From 7621454fa3a1b81b7ea419622d8279a2c42fceec Mon Sep 17 00:00:00 2001 From: mazzma12 Date: Fri, 8 Sep 2023 11:17:03 +0200 Subject: [PATCH] Add anomaly detector based on standard deviation range from mean --- src/adtk/detector/__init__.py | 2 ++ src/adtk/detector/_detector_1d.py | 59 +++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/src/adtk/detector/__init__.py b/src/adtk/detector/__init__.py index c3bdd7d..d6e5db8 100644 --- a/src/adtk/detector/__init__.py +++ b/src/adtk/detector/__init__.py @@ -18,6 +18,7 @@ PersistAD, QuantileAD, SeasonalAD, + VolatilityRangeAD, ThresholdAD, VolatilityShiftAD, ) @@ -56,6 +57,7 @@ def print_all_models() -> None: "ThresholdAD", "QuantileAD", "InterQuartileRangeAD", + "VolatilityRangeAD", "GeneralizedESDTestAD", "PersistAD", "LevelShiftAD", diff --git a/src/adtk/detector/_detector_1d.py b/src/adtk/detector/_detector_1d.py index 0e2b8ff..153e81c 100644 --- a/src/adtk/detector/_detector_1d.py +++ b/src/adtk/detector/_detector_1d.py @@ -271,6 +271,65 @@ def _predict_core(self, s: pd.Series) -> pd.Series: return predicted +class VolatilityRangeAD(_TrainableUnivariateDetector): + """Anomaly detector based on standard deviation range. + + This detector flags anomalies for values that are outside the range of [mean - c * std, mean + c * std]. + + Parameters + ---------- + c : float, optional (default=1.0) + The multiplier for the standard deviation to set the range for anomaly detection. + """ + + def __init__( + self, + c: Union[ + Optional[float], Tuple[Optional[float], Optional[float]] + ] = 3.0, + ) -> None: + super().__init__() + self.c = c + + @property + def _param_names(self) -> Tuple[str, ...]: + return ("c",) + + def _fit_core(self, s: pd.Series) -> None: + if s.count() == 0: + raise RuntimeError("Valid values are not enough for training.") + mean_val = s.mean() + std_val = s.std() + + self.abs_low_ = ( + ( + mean_val + - std_val + * (self.c if (not isinstance(self.c, tuple)) else self.c[0]) + ) + if ( + (self.c if (not isinstance(self.c, tuple)) else self.c[0]) + is not None + ) + else -float("inf") + ) + self.abs_high_ = ( + mean_val + + std_val + * (self.c if (not isinstance(self.c, tuple)) else self.c[1]) + if ( + (self.c if (not isinstance(self.c, tuple)) else self.c[1]) + is not None + ) + else float("inf") + ) + + def _predict_core(self, s: pd.Series) -> pd.Series: + predicted = (s > self.abs_high_) | (s < self.abs_low_) + predicted[s.isna()] = np.nan + return predicted + + class GeneralizedESDTestAD(_TrainableUnivariateDetector): """Detector that detects anomaly based on generalized ESD test.