Source code for bigdl.chronos.detector.anomaly.th_detector

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import math
import numpy as np
from bigdl.chronos.detector.anomaly.abstract import AnomalyDetector
from bigdl.nano.utils.common import invalidInputError

from abc import ABC, abstractmethod


class Distance(ABC):
    """
    The Base Distance Class.
    """

    @abstractmethod
    def abs_dist(self, x, y):
        """
        Calculate the distance between x and y. a and b should be in same shape.

        :param x: the first tensor
        :param y: the second tensor
        :return: the absolute distance between x and y
        """
        pass


class EuclideanDistance(Distance):
    """
    Euclidean Distance Measure
    """

    def __init__(self):
        pass

    def abs_dist(self, x, y):
        return np.linalg.norm(x - y)


def estimate_pattern_th(y,
                        yhat,
                        mode="default",
                        ratio=0.01,
                        dist_measure=EuclideanDistance()):
    """
    Estimate the absolute distance threshold based on y and y_hat.

    :param y: actual values
    :param yhat: predicted values
    :param mode: types of ways to find threshold
        "default" : fit data to a uniform distribution (the percentile way)
        "gaussian": fit data to a gaussian distribution
    :param ratio: the ratio of anomaly to consider as anomaly.
    :param dist_measure: measure of distance
    :return: the threshold
    """
    invalidInputError(y.shape == yhat.shape, "y shape doesn't match yhat shape")
    diff = [dist_measure.abs_dist(m, n) for m, n in zip(y, yhat)]
    if mode == "default":
        threshold = np.percentile(diff, (1 - ratio) * 100)
        return threshold
    elif mode == "gaussian":
        from scipy.stats import norm
        mu, sigma = norm.fit(diff)
        t = norm.ppf(1 - ratio)
        return t * sigma + mu
    else:
        invalidInputError(False, f"Does not support ${mode}")


def estimate_trend_th(y,
                      mode="default",
                      ratio=0.01):
    """
    Estimate the min and max threshold based on y.

    :param y: actual values
    :param mode: types of ways to find threshold
        "default" : fit data to a uniform distribution (the percentile way)
        "gaussian": fit data to a gaussian distribution
    :param ratio: the ratio of anomaly to consider as anomaly.
    :return: tuple, the threshold (min, max)
    """
    if mode == "default":
        max_threshold = np.percentile(y, (1 - ratio) * 100)
        min_threshold = np.percentile(y, ratio * 100)
        return (min_threshold, max_threshold)
    elif mode == "gaussian":
        from scipy.stats import norm
        mu, sigma = norm.fit(y)
        max_t = norm.ppf(1 - ratio)
        min_t = norm.ppf(ratio)
        return (min_t * sigma + mu, max_t * sigma + mu)
    else:
        invalidInputError(False, f"Does not support ${mode}")


def detect_pattern_anomaly(y, yhat, th, dist_measure):
    anomaly_indexes = []
    for i, (y_i, yhat_i) in enumerate(zip(y, yhat)):
        if dist_measure.abs_dist(y_i, yhat_i) > th:
            anomaly_indexes.append(i)
    return anomaly_indexes


def detect_trend_anomaly(y, th):
    threshold_min = np.full_like(y, fill_value=th[0])
    threshold_max = np.full_like(y, fill_value=th[1])
    return detect_trend_anomaly_arr(y, (threshold_min, threshold_max))


def detect_trend_anomaly_arr(y, th_arr):
    min_diff = y - th_arr[0]
    max_diff = y - th_arr[1]
    anomaly_indexes = np.logical_or(min_diff < 0, max_diff > 0)
    anomaly_scores = np.zeros_like(y)
    anomaly_scores[anomaly_indexes] = 1
    return list(set(np.where(anomaly_scores > 0)[0]))


def detect_anomaly(y,
                   yhat=None,
                   pattern_th=math.inf,
                   trend_th=(-math.inf, math.inf),
                   dist_measure=EuclideanDistance()):
    """
    Detect anomalies. Each sample can have 1 or more dimensions.

    :param y: the values to detect. shape could be 1-D (num_samples,)
        or 2-D array (num_samples, features)
    :param yhat: the predicted values, a tensor with same shape as y,
        default to be None
    :param pattern_th: a single value, specify absolute distance threshold
                       between y and yhat
    :param trend_th: a tuple composed of min_threshold and max_threshold,
                     specify min and max threshold for y
    :param dist_measure: measure of distance
    :return: dict, the anomaly values indexes in the samples, including pattern and trend type
    """
    pattern_anomaly_indexes, trend_anomaly_indexes = [], []
    pattern_anomaly_scores, trend_anomaly_scores = np.zeros_like(y), np.zeros_like(y)
    # Detect pattern anomaly
    if yhat is not None:
        invalidInputError(isinstance(pattern_th, int) or isinstance(pattern_th, float),
                          f"Pattern threshold format {type(pattern_th)} is not supported, "
                          "please specify int or float value.")
        pattern_anomaly_indexes = detect_pattern_anomaly(y, yhat, pattern_th, dist_measure)

    # Detect trend anomaly
    invalidInputError(isinstance(trend_th, tuple) and len(trend_th) == 2,
                      "Trend threshold is supposed to be a tuple of two elements.")
    if (isinstance(trend_th[0], int) or isinstance(trend_th[0], float)) \
            and (isinstance(trend_th[1], int) or isinstance(trend_th[1], float)):
        # min / max values are scalars
        invalidInputError(trend_th[0] <= trend_th[1],
                          "Trend threshold is composed of (min, max), max should not be smaller.")
        trend_anomaly_indexes = detect_trend_anomaly(y, trend_th)
    elif trend_th[0].shape == y.shape and trend_th[1].shape == y.shape:
        # min max values are arrays
        invalidInputError(np.all((trend_th[1] - trend_th[0]) >= 0),
                          "In trend threshold (min, max), each data point in max tensor"
                          " should not be smaller.")
        trend_anomaly_indexes = detect_trend_anomaly_arr(y, trend_th)
    else:
        invalidInputError(False, f"Threshold format ${str(trend_th)} is not supported")

    pattern_anomaly_scores[pattern_anomaly_indexes] = 1
    trend_anomaly_scores[trend_anomaly_indexes] = 1

    anomaly_indexes = list(set(pattern_anomaly_indexes + trend_anomaly_indexes))
    anomaly_scores = np.zeros_like(y)
    anomaly_scores[anomaly_indexes] = 1

    index_dict = {'pattern anomaly index': pattern_anomaly_indexes,
                  'trend anomaly index': trend_anomaly_indexes,
                  'anomaly index': anomaly_indexes}
    score_dict = {'pattern anomaly score': pattern_anomaly_scores,
                  'trend anomaly score': trend_anomaly_scores,
                  'anomaly score': anomaly_scores}

    return index_dict, score_dict


[docs]class ThresholdDetector(AnomalyDetector): """ Example: >>> #The dataset is split into x_train, x_test, y_train, y_test >>> forecaster = Forecaster(...) >>> forecaster.fit(x=x_train, y=y_train, ...) >>> y_pred = forecaster.predict(x_test) >>> td = ThresholdDetector() >>> td.fit(y_test, y_pred) >>> anomaly_scores = td.score() >>> anomaly_indexes = td.anomaly_indexes() """ def __init__(self): """ Initialize a ThresholdDetector. """ # to detect pattern anomaly, specify absolute distance threshold between y_true and y_pred self.pattern_th = math.inf # to detect trend anomaly, specify min and max threshold for y_true self.trend_th = (-math.inf, math.inf) self.ratio = 0.01 self.dist_measure = EuclideanDistance() self.mode = "default" self.anomaly_indexes_ = None self.anomaly_scores_ = None
[docs] def set_params(self, mode="default", ratio=0.01, pattern_threshold=math.inf, trend_threshold=(-math.inf, math.inf), dist_measure=EuclideanDistance()): """ Set parameters for ThresholdDetector :param mode: mode can be "default" or "gaussian". "default" : fit data according to a uniform distribution "gaussian": fit data according to a gaussian distribution :param ratio: the ratio of anomaly to consider as anomaly. :param pattern_threshold: a single value, specify absolute distance threshold between real data and predicted data to detect pattern anomaly. :param trend_threshold: a tuple composed of min_threshold and max_threshold, specify min and max threshold for real data to detect trend anomaly. :param dist_measure: measure of distance """ self.ratio = ratio self.dist_measure = dist_measure self.mode = mode self.pattern_th = pattern_threshold invalidInputError(isinstance(trend_threshold, tuple) and len(trend_threshold) == 2, "Trend threshold is supposed to be a tuple of two elements.") self.trend_th = trend_threshold
[docs] def fit(self, y, y_pred=None): """ Fit the model :param y: the values to detect. shape could be 1-D (num_samples,) or 2-D array (num_samples, features) :param y_pred: the predicted values, a tensor with same shape as y, default to be None. """ if not isinstance(self.trend_th[0], np.ndarray) and self.trend_th[0] == -math.inf and \ not isinstance(self.trend_th[1], np.ndarray) and self.trend_th[1] == math.inf: self.trend_th = estimate_trend_th(y, mode=self.mode, ratio=self.ratio) if y_pred is not None and self.pattern_th == math.inf: self.pattern_th = estimate_pattern_th(y, y_pred, mode=self.mode, ratio=self.ratio, dist_measure=self.dist_measure) # calculate anomalies in advance in case score does not specify input anomalies = detect_anomaly(y, y_pred, self.pattern_th, self.trend_th, self.dist_measure) self.anomaly_indexes_ = anomalies[0] self.anomaly_scores_ = anomalies[1]
[docs] def score(self, y=None, y_pred=None): """ Gets the anomaly scores for each sample. Each anomaly score is either 0 or 1, where 1 indicates an anomaly. :param y: new time series to detect anomaly. If y is None, returns anomalies in y_pred. Moreover, if both y and y_hat are None, returns anomalies in the fit input. :param y_pred: predicted values corresponding to y :return: dict, anomaly score for each sample composed of pattern and trend type, in an array format with the same size as input """ if self.anomaly_scores_ is None: invalidInputError(False, "please call fit before calling score") if y is None and y_pred is None: return self.anomaly_scores_ elif y is None: _, score_dict = detect_anomaly(y=y_pred, trend_th=self.trend_th, dist_measure=self.dist_measure) return score_dict else: _, score_dict = detect_anomaly(y, yhat=y_pred, pattern_th=self.pattern_th, trend_th=self.trend_th, dist_measure=self.dist_measure) return score_dict
[docs] def anomaly_indexes(self, y=None, y_pred=None): """ Gets the indexes of the anomalies. :param y: new time series to detect anomaly. If y is None, returns anomalies in y_pred. Moreover, if both y and y_hat are None, returns anomalies in the fit input. :param y_pred: predicted values corresponding to y :return: dict, anomaly indexes composed of pattern and trend type """ if self.anomaly_indexes_ is None: invalidInputError(False, "please call fit before calling score") if y is None and y_pred is None: return self.anomaly_indexes_ elif y is None: index_dict, _ = detect_anomaly(y=y_pred, trend_th=self.trend_th, dist_measure=self.dist_measure) return index_dict else: index_dict, _ = detect_anomaly(y, yhat=y_pred, pattern_th=self.pattern_th, trend_th=self.trend_th, dist_measure=self.dist_measure) return index_dict