Source code for bigdl.chronos.detector.anomaly.dbscan_detector
#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from bigdl.chronos.detector.anomaly.abstract import AnomalyDetector
from bigdl.chronos.detector.anomaly.util import INTEL_EXT_DBSCAN
import numpy as np
[docs]class DBScanDetector(AnomalyDetector):
"""
Example:
>>> #The dataset to detect is y
>>> y = numpy.array(...)
>>> ad = DBScanDetector(eps=0.1, min_samples=6)
>>> ad.fit(y)
>>> anomaly_scores = ad.score()
>>> anomaly_indexes = ad.anomaly_indexes()
"""
def __init__(self,
eps=0.01,
min_samples=6,
**argv):
"""
Initialize a DBScanDetector.
:param eps: The maximum distance between two samples for one to be considered
as the neighborhood of the other.
It is a parameter of DBSCAN, refer to sklearn.cluster.DBSCAN docs for more details.
:param min_samples: The number of samples (or total weight) in a neighborhood
for a point to be considered as a core point.
It is a parameter of DBSCAN, refer to sklearn.cluster.DBSCAN docs for more details.
:param argv: Other parameters used in DBSCAN.
Refer to sklearn.cluster.DBSCAN docs for more details.
"""
self.eps = eps
self.min_samples = min_samples
self.argv = argv
self.anomaly_indexes_ = None
self.anomaly_scores_ = None
def check_data(self, arr):
if len(arr.shape) > 1:
from bigdl.nano.utils.common import invalidInputError
invalidInputError(False,
"Only univariate time series is supported")
[docs] def fit(self, y, use_sklearnex=True):
"""
Fit the model
:param y: the input time series. y must be 1-D numpy array.
:param use_sklearnex: bool, If scikit-learn-intelex is not installed,
DBScanDetector will fallback to use stock sklearn.
"""
self.check_data(y)
self.anomaly_scores_ = np.zeros_like(y)
with INTEL_EXT_DBSCAN(use_sklearnex=use_sklearnex,
algorithm_list=['DBSCAN']) as DBSCAN:
clusters = DBSCAN(eps=self.eps, min_samples=self.min_samples)\
.fit(y.reshape(-1, 1), **self.argv)
labels = clusters.labels_
outlier_indexes = np.where(labels == -1)[0]
self.anomaly_indexes_ = outlier_indexes
self.anomaly_scores_[self.anomaly_indexes_] = 1
[docs] def score(self):
"""
Gets the anomaly scores for each sample.
Each anomaly score is either 0 or 1, where 1 indicates an anomaly.
:return: anomaly score for each sample, in an array format with the same size as input
"""
if self.anomaly_indexes_ is None:
from bigdl.nano.utils.common import invalidInputError
invalidInputError(False,
"Please call fit first")
return self.anomaly_scores_
[docs] def anomaly_indexes(self):
"""
Gets the indexes of the anomalies.
:return: the indexes of the anomalies.
"""
if self.anomaly_indexes_ is None:
from bigdl.nano.utils.common import invalidInputError
invalidInputError(False,
"Please call fit first")
return self.anomaly_indexes_