pdmlabs.postprocessing.Moving2T
Moving 2-threshold adaptive thresholding post-processor.
Moving2Thresholder converts anomaly scores to binary labels (0/1) using an adaptive threshold based on the distribution of historical scores. The threshold is recalculated for each new score based on recent history:
threshold = mean(non-anomalies) + factor * std(non-anomalies)
Previously detected anomalies can optionally be excluded from the threshold statistics. The threshold is estimated in two passes for more robust estimation.
Useful when:
- Baseline (normal operation) scores shift over time
- Want adaptive thresholds that adjust to data changes
- Need binary anomaly labels from continuous scores
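As a rough illustration of the formula above, the following minimal sketch computes mean + factor * std over a recent window. The helper name `simple_threshold` is hypothetical and not part of pdmlabs, and the use of the population standard deviation (`statistics.pstdev`) is an assumption, since this page does not say which estimator the library uses.

```python
from statistics import fmean, pstdev


def simple_threshold(scores, factor, window=None):
    """Return mean + factor * std over the last `window` scores.

    Hypothetical helper for illustration only. `window` is assumed to be a
    positive int, or None to use the full history.
    """
    recent = scores if window is None else scores[-window:]
    # Assumption: population std; the library may use a different estimator.
    return fmean(recent) + factor * pstdev(recent)


baseline = [1.0, 3.0, 1.0, 3.0]          # mean 2.0, std 1.0
shifted = [11.0, 13.0, 11.0, 13.0]       # same spread, higher baseline

print(simple_threshold(baseline, factor=2.0))                      # 4.0
print(simple_threshold(baseline + shifted, factor=2.0, window=4))  # 14.0
```

With a window of 4, the threshold follows the shifted baseline (14.0) instead of staying tied to the older scores.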
Functions
| Moving2T | Calculate adaptive threshold using two-pass statistical method. |
| Moving2Texclude | Exclude previously detected anomalies before applying Moving2T threshold. |
Classes
| Moving2Thresholder | Threshold anomaly scores adaptively using moving mean/std approach. |
- pdmlabs.postprocessing.Moving2T.Moving2T(MAerror, factor, hscaleCount=1000)
Calculate adaptive threshold using two-pass statistical method.
Two-pass approach for robust threshold estimation:
- Pass 1: Compute mean + factor*std from all scores, flag scores > threshold
- Pass 2: Compute mean + factor*std from scores < Pass 1 threshold (non-outliers)
- Return Pass 2 threshold and check if last score exceeds it
This two-pass method makes thresholds resistant to outlier inflation.
- Parameters:
MAerror (list[float]) – All anomaly scores so far.
factor (float) – Std multiplier for threshold calculation.
- factor=1: 1-sigma threshold
- factor=2: 2-sigma threshold
- factor=3: 3-sigma threshold (very unlikely anomalies)
hscaleCount (int, optional) – Number of recent scores to consider. None uses all history. Defaults to 1000.
- Returns:
- (is_anomaly_bool, threshold_value)
First element is True if the last score exceeds the threshold and a stable threshold was obtained (not all-NaN or degenerate)
Second element is the calculated threshold
- Return type:
tuple
- Edge cases handled:
If only 1 unique score: threshold = that score
If pass 2 has no values: return last score as threshold
If all scores are outliers: threshold determined from remaining scores
Examples
>>> scores = [0.5, 0.6, 0.55, 0.7, 0.8, 2.5, 3.0]
>>> is_anom, thresh = Moving2T(scores, factor=2.0, hscaleCount=None)
>>> # Pass 1 computes mean + 2*std over all seven scores
>>> # Pass 2 recomputes it over the scores below the Pass 1 threshold
>>> # is_anom reports whether the last score (3.0) exceeds the Pass 2 threshold
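The two passes described above can be sketched as follows. This is not the pdmlabs source: `two_pass_threshold` is a hypothetical name, the population standard deviation is an assumption, and the handling of an empty second pass follows the edge case listed above (the last score is returned as the threshold).

```python
from statistics import fmean, pstdev


def two_pass_threshold(scores, factor, window=None):
    """Hypothetical two-pass sketch returning (is_anomaly, threshold)."""
    recent = list(scores if window is None else scores[-window:])

    # Pass 1: threshold from every score in the window.
    first = fmean(recent) + factor * pstdev(recent)

    # Pass 2: recompute using only scores below the Pass 1 threshold.
    kept = [s for s in recent if s < first]
    if not kept:
        # Assumption based on the documented edge case: fall back to the
        # last score as the threshold, which can never flag itself.
        return False, recent[-1]

    second = fmean(kept) + factor * pstdev(kept)
    return recent[-1] > second, second


scores = [1.0, 3.0] * 4 + [20.0]
# Pass 1: mean 4.0, std about 5.73, threshold about 15.47, so 20.0 is dropped.
# Pass 2: mean 2.0, std 1.0 over the remaining eight scores.
print(two_pass_threshold(scores, factor=2.0))  # (True, 4.0)
```

In this sketch the single spike raises the Pass 1 threshold to roughly 15.47, while the Pass 2 threshold of 4.0 reflects only the normal scores.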
- pdmlabs.postprocessing.Moving2T.Moving2Texclude(MAerror, anomalies, factor, hscaleCount=1000)
Exclude previously detected anomalies before applying Moving2T threshold.
This helper removes scores that were already flagged as anomalies from the statistics calculation, making thresholds more robust to clustered anomalies.
- Parameters:
MAerror (list[float]) – All anomaly scores so far.
anomalies (list[int/bool]) – Binary indicators (0/False=normal, 1/True=anomaly) for the first len(anomalies) scores.
factor (float) – Std multiplier for threshold calculation.
hscaleCount (int, optional) – History window size (number of recent scores to consider). Defaults to 1000.
- Returns:
- (is_anomaly_bool, threshold_value)
First element indicates whether the last score is an anomaly
Second element is the calculated threshold
- Return type:
tuple
Examples
>>> scores = [0.5, 0.6, 2.0, 0.55, 3.0]  # Indices 2, 4 are anomalies
>>> anomalies = [0, 0, 1, 0, 1]  # Marking detected anomalies
>>> is_anom, thresh = Moving2Texclude(scores, anomalies[:-1], factor=2)
>>> # Calculates threshold using only normal scores (0.5, 0.6, 0.55)
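The exclusion step can be pictured as a filter applied before thresholding. In the sketch below, `drop_flagged` is a hypothetical helper, and keeping the unlabelled tail (scores beyond `len(anomalies)`, here the newest score) is an assumption. The page does not say whether the library includes that score in the statistics.

```python
def drop_flagged(scores, anomalies):
    """Remove scores already flagged as anomalies (hypothetical helper).

    `anomalies` labels the first len(anomalies) scores; any scores after
    that are unlabelled and, by assumption, kept unchanged.
    """
    kept = [s for s, flagged in zip(scores, anomalies) if not flagged]
    return kept + list(scores[len(anomalies):])


scores = [0.5, 0.6, 2.0, 0.55, 3.0]
anomalies = [0, 0, 1, 0]           # labels for all but the newest score

print(drop_flagged(scores, anomalies))  # [0.5, 0.6, 0.55, 3.0]
```

The filtered history would then be passed to a thresholding routine, so the earlier spike of 2.0 no longer inflates the mean and standard deviation.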
- class pdmlabs.postprocessing.Moving2T.Moving2Thresholder(event_preferences: EventPreferences, factor: float = 3, history_window=None, exclude=False)
Bases: PostProcessorInterface
Threshold anomaly scores adaptively using moving mean/std approach.
Converts continuous anomaly scores to binary labels (0=normal, 1=anomaly) using dynamic thresholds calculated from recent score history. Optionally excludes previously flagged anomalies from statistics.
- factor
Multiplier for standard deviation in threshold calculation. Higher values = higher threshold = fewer anomalies detected.
- Type:
float
- history_window
Number of historical scores to consider (None=all).
- Type:
int or None
- exclude
If True, exclude previously detected anomalies from threshold statistics (more robust to anomaly clusters).
- Type:
bool
- anomaly_scores_dict
Maintains history of scores per source.
- Type:
dict
Examples
>>> from pdmlabs.postprocessing.Moving2T import Moving2Thresholder
>>> processor = Moving2Thresholder(
...     event_preferences={'failure': [], 'reset': []},
...     factor=3.0,
...     history_window=100,
...     exclude=True
... )
>>> processor.fit([df_train], ['bearing_1'], events_df)
>>>
>>> scores = [0.5, 0.6, 0.55, 1.2, 0.7, 2.5, 0.8]  # Has spike at 2.5
>>> binary_labels = processor.transform(scores, 'bearing_1', events_df)
>>> # Result: [0, 0, 0, 0, 0, 1, 0] (only 2.5 crosses threshold)
- fit(historic_data: list[DataFrame], historic_sources: list[str], event_data: DataFrame, anomaly_ranges=None) None
No-op fit (thresholds computed on-the-fly during transform).
- Parameters:
historic_data (list[pd.DataFrame]) – Ignored.
historic_sources (list[str]) – Ignored.
event_data (pd.DataFrame) – Ignored.
anomaly_ranges – Ignored.
- get_params()
Return hyperparameters.
- Returns:
- {'factor': std multiplier, 'history_window': window size, 'exclude': whether to exclude anomalies from stats}
- Return type:
dict
- transform(scores: list[float], source: str, event_data: DataFrame) list[float]
Convert scores to binary anomaly labels with adaptive thresholds.
Processes scores sequentially, computing a threshold for each score from that source's score history (limited to history_window when set). Returns 1 if score > threshold, 0 otherwise.
- Parameters:
scores (list[float]) – Anomaly scores to threshold.
source (str) – Source identifier (used to maintain separate histories).
event_data (pd.DataFrame) – Event log (unused).
- Returns:
Binary labels (1 = anomaly, 0 = normal).
- Return type:
list[float]
Examples
>>> scores = [0.5, 0.6, 0.55, 1.2, 0.7, 2.5, 0.8]
>>> labels = processor.transform(scores, 'bearing_1', events_df)
>>> # Returns [0, 0, 0, 0, 0, 1, 0] (threshold rises as history grows)
- transform_one(score_point: float, source: str, is_event: bool) float
Threshold single score using adaptive threshold (online mode).
- Parameters:
score_point (float) – Single anomaly score to threshold.
source (str) – Source identifier (used to maintain separate histories).
is_event (bool) – Event flag (unused).
- Returns:
1 if score > threshold, 0 otherwise.
- Return type:
float
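To make the online mode and the per-source histories more concrete, here is a minimal stand-in. `OnlineThresholderSketch` is a hypothetical class, not the pdmlabs implementation. It assumes that the current score is appended to the history before the threshold is computed, that the population standard deviation is used, and that an empty second pass falls back to the full window. It returns an int label, whereas the documented return type is float.

```python
from collections import defaultdict
from statistics import fmean, pstdev


class OnlineThresholderSketch:
    """Hypothetical stand-in that keeps one score history per source."""

    def __init__(self, factor=3.0, history_window=None):
        self.factor = factor
        self.history_window = history_window
        self.histories = defaultdict(list)  # source -> scores seen so far

    def transform_one(self, score, source):
        history = self.histories[source]
        history.append(score)  # assumption: current score joins the history
        if self.history_window is None:
            recent = history
        else:
            recent = history[-self.history_window:]

        # Pass 1 over the whole window, Pass 2 over the non-outliers.
        first = fmean(recent) + self.factor * pstdev(recent)
        kept = [s for s in recent if s < first] or recent
        threshold = fmean(kept) + self.factor * pstdev(kept)
        return 1 if score > threshold else 0


sketch = OnlineThresholderSketch(factor=2.0)
stream = [1.0, 3.0] * 4 + [20.0]
labels = [sketch.transform_one(s, "pump_a") for s in stream]
print(labels)                               # [0, 0, 0, 0, 0, 0, 0, 0, 1]
print(sketch.transform_one(20.0, "pump_b"))  # 0: a new source has no history
```

The same score of 20.0 is labelled 1 for `pump_a`, which has an established baseline, and 0 for `pump_b`, whose history starts with that score.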