pdmlabs.postprocessing.Moving2T

Moving 2-threshold adaptive thresholding post-processor.

Moving2Thresholder converts anomaly scores to binary labels (0/1) using an adaptive threshold based on the distribution of historical scores. The threshold is recalculated for each new score based on recent history:

threshold = mean(non-anomalies) + factor * std(non-anomalies)

Previously detected anomalies can optionally be excluded from the threshold statistics. The threshold is estimated in two passes so that extreme scores distort it less.

Useful when:
  • Baseline (normal operation) scores shift over time

  • You want adaptive thresholds that adjust to data changes

  • You need binary anomaly labels from continuous scores
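The following minimal sketch illustrates why a threshold computed over recent history helps when the baseline shifts. It is not the library's implementation: `moving_threshold` is a hypothetical helper that applies the formula above in a single pass, and the use of population standard deviation is an assumption.

```python
from statistics import fmean, pstdev

def moving_threshold(scores, factor, window=None):
    """Hypothetical helper: mean + factor * std over the most recent scores."""
    recent = scores if window is None else scores[-window:]
    return fmean(recent) + factor * pstdev(recent)  # population std is an assumption

early = [0.50, 0.55, 0.45, 0.52, 0.48]
late = [2.00, 2.05, 1.95, 2.02, 1.98]  # baseline has drifted upward

fixed = moving_threshold(early, factor=3)                      # frozen on early data
adaptive = moving_threshold(early + late, factor=3, window=5)  # follows the drift

print(all(s > fixed for s in late))     # True: every drifted score breaches the frozen threshold
print(any(s > adaptive for s in late))  # False: none breach the windowed threshold
```

A threshold frozen on the early data flags every score after the shift, while the windowed threshold rises with the new baseline.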

Functions

Moving2T(MAerror, factor[, hscaleCount])

Calculate adaptive threshold using two-pass statistical method.

Moving2Texclude(MAerror, anomalies, factor[, hscaleCount])

Exclude previously detected anomalies before applying Moving2T threshold.

Classes

Moving2Thresholder(event_preferences[, ...])

Threshold anomaly scores adaptively using moving mean/std approach.

pdmlabs.postprocessing.Moving2T.Moving2T(MAerror, factor, hscaleCount=1000)

Calculate adaptive threshold using two-pass statistical method.

Two-pass approach for robust threshold estimation:
  • Pass 1: compute mean + factor*std from all scores and flag the scores above that threshold.

  • Pass 2: recompute mean + factor*std from the scores below the pass 1 threshold (the non-outliers).

  • Return the pass 2 threshold and whether the last score exceeds it.

Because pass 2 ignores the scores flagged in pass 1, extreme scores inflate the final threshold less than they would in a single pass.
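The two passes described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `two_pass_threshold` is a hypothetical name, population standard deviation is assumed, and windowing by `hscaleCount` and NaN handling are left out.

```python
from statistics import fmean, pstdev

def two_pass_threshold(scores, factor):
    """Hypothetical sketch: return (is_anomaly, threshold) for the last score."""
    # Pass 1: threshold from every score.
    first = fmean(scores) + factor * pstdev(scores)
    # Pass 2: recompute using only the scores below the pass 1 threshold.
    kept = [s for s in scores if s < first]
    if not kept:  # degenerate case, e.g. all scores identical
        return False, scores[-1]
    second = fmean(kept) + factor * pstdev(kept)
    return scores[-1] > second, second

scores = [0.5, 0.6, 0.55, 0.5, 0.6, 0.55, 0.5, 0.6, 0.55, 3.0]
is_anom, thresh = two_pass_threshold(scores, factor=2)
print(is_anom, round(thresh, 2))  # True 0.63
```

With these scores, pass 1 alone gives a threshold of about 2.27 because the 3.0 spike inflates both the mean and the standard deviation. Pass 2 drops the spike and settles near 0.63, so a later moderate excursion would still be caught.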

Parameters:
  • MAerror (list[float]) – All anomaly scores so far.

  • factor (float) – Std multiplier for threshold calculation. factor=1 gives a 1-sigma threshold, factor=2 a 2-sigma threshold, and factor=3 a 3-sigma threshold (flags only scores that are very unlikely under normal operation).

  • hscaleCount (int, optional) – Number of recent scores to consider. None uses all history. Defaults to 1000.
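To make the effect of factor concrete, the sketch below counts how many scores exceed mean + factor*std for each multiplier. It uses a single pass and population standard deviation, both simplifying assumptions, and does not call the library.

```python
from statistics import fmean, pstdev

scores = [0.4, 0.5, 0.6, 0.5, 0.4, 0.6, 0.5, 0.5, 0.9, 1.2]
mean, std = fmean(scores), pstdev(scores)  # population std is an assumption

flagged = {}
for factor in (1, 2, 3):
    threshold = mean + factor * std
    flagged[factor] = [s for s in scores if s > threshold]

print(flagged)  # {1: [0.9, 1.2], 2: [1.2], 3: []}
```

Raising factor raises the threshold, so fewer scores are flagged.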

Returns:

(is_anomaly_bool, threshold_value)
  • First element is True only if the last score exceeds the threshold and a stable threshold could be computed (not all-NaN or otherwise degenerate)

  • Second element is the calculated threshold

Return type:

tuple

Edge cases handled:
  • If only 1 unique score: threshold = that score

  • If pass 2 has no values: return last score as threshold

  • If all scores are outliers: threshold determined from remaining scores

Examples

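>>> from pdmlabs.postprocessing.Moving2T import Moving2T
>>> scores = [0.5, 0.6, 0.55, 0.7, 0.8, 2.5, 3.0]
>>> is_anom, thresh = Moving2T(scores, factor=2.0, hscaleCount=None)
>>> # Pass 1 (formula above, population std): mean ~1.24, std ~0.97, threshold ~3.18
>>> # No score exceeds ~3.18, so pass 2 sees the same scores and yields the same threshold
>>> # By that formula 3.0 does not exceed ~3.18, so is_anom is expected to be False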

pdmlabs.postprocessing.Moving2T.Moving2Texclude(MAerror, anomalies, factor, hscaleCount=1000)

Exclude previously detected anomalies before applying Moving2T threshold.

This helper removes scores that were already flagged as anomalies from the statistics calculation, making thresholds more robust to clustered anomalies.
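The exclusion step can be illustrated with the sketch below. It is a simplification, not the library's code: `threshold_excluding_flagged` is a hypothetical name, the threshold is computed in a single pass rather than by calling Moving2T, and population standard deviation is assumed.

```python
from statistics import fmean, pstdev

def threshold_excluding_flagged(scores, flags, factor):
    """Hypothetical sketch: (is_anomaly, threshold) for the last score."""
    # `flags` covers the first len(flags) scores; later scores count as unflagged.
    normal = [s for i, s in enumerate(scores) if i >= len(flags) or not flags[i]]
    threshold = fmean(normal) + factor * pstdev(normal)
    return scores[-1] > threshold, threshold

scores = [0.5, 0.6, 0.55, 2.0, 2.1, 0.5, 0.6, 0.55, 0.9]
flags = [0, 0, 0, 1, 1, 0, 0, 0]  # the cluster at indices 3 and 4 was already flagged

print(threshold_excluding_flagged(scores, flags, factor=2)[0])  # True
print(threshold_excluding_flagged(scores, [], factor=2)[0])     # False
```

Without exclusion, the earlier cluster pulls the mean above 0.9, so the new score goes unnoticed. With the cluster removed, the threshold drops to about 0.86 and the score is flagged.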

Parameters:
  • MAerror (list[float]) – All anomaly scores so far.

  • anomalies (list[int/bool]) – Binary indicators (0/False=normal, 1/True=anomaly) for the first len(anomalies) scores.

  • factor (float) – Std multiplier for threshold calculation.

  • hscaleCount (int, optional) – Number of recent scores to consider. Defaults to 1000.

Returns:

(is_anomaly_bool, threshold_value)
  • First element indicates if last score is anomaly

  • Second element is the calculated threshold

Return type:

tuple

Examples

>>> scores = [0.5, 0.6, 2.0, 0.55, 3.0]  # Indices 2, 4 are anomalies
>>> anomalies = [0, 0, 1, 0, 1]  # Marking detected anomalies
>>> is_anom, thresh = Moving2Texclude(scores, anomalies[:-1], factor=2)
>>> # Calculates threshold using only normal scores (0.5, 0.6, 0.55)

class pdmlabs.postprocessing.Moving2T.Moving2Thresholder(event_preferences: EventPreferences, factor: float = 3, history_window=None, exclude=False)

Bases: PostProcessorInterface

Threshold anomaly scores adaptively using moving mean/std approach.

Converts continuous anomaly scores to binary labels (0=normal, 1=anomaly) using dynamic thresholds calculated from recent score history. Optionally excludes previously flagged anomalies from statistics.

factor

Multiplier for standard deviation in threshold calculation. Higher values = higher threshold = fewer anomalies detected.

Type:

float

history_window

Number of historical scores to consider (None=all).

Type:

int or None

exclude

If True, exclude previously detected anomalies from threshold statistics (more robust to anomaly clusters).

Type:

bool

anomaly_scores_dict

Maintains history of scores per source.

Type:

dict

Examples

>>> from pdmlabs.postprocessing.Moving2T import Moving2Thresholder
>>> processor = Moving2Thresholder(
...     event_preferences={'failure': [], 'reset': []},
...     factor=3.0,
...     history_window=100,
...     exclude=True
... )
>>> processor.fit([df_train], ['bearing_1'], events_df)
>>>
>>> scores = [0.5, 0.6, 0.55, 1.2, 0.7, 2.5, 0.8]  # Has spike at 2.5
>>> binary_labels = processor.transform(scores, 'bearing_1', events_df)
>>> # Result: [0, 0, 0, 0, 0, 1, 0]  (only 2.5 crosses threshold)

fit(historic_data: list[DataFrame], historic_sources: list[str], event_data: DataFrame, anomaly_ranges=None) → None

No-op fit (thresholds computed on-the-fly during transform).

Parameters:
  • historic_data (list[pd.DataFrame]) – Ignored.

  • historic_sources (list[str]) – Ignored.

  • event_data (pd.DataFrame) – Ignored.

  • anomaly_ranges – Ignored.

get_params()

Return hyperparameters.

Returns:

{'factor': std multiplier, 'history_window': window size, 'exclude': whether to exclude anomalies from stats}

Return type:

dict

transform(scores: list[float], source: str, event_data: DataFrame) → list[float]

Convert scores to binary anomaly labels with adaptive thresholds.

Processes scores sequentially, computing a threshold for each score from the history accumulated for that source (limited to history_window when set). Returns 1 if score > threshold, 0 otherwise.
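The sequential, per-source behaviour can be sketched as below. `OnlineThresholder`, `label_one` and `label_many` are hypothetical names used only for illustration; the sketch uses a single-pass threshold with population standard deviation instead of the two-pass method and does not implement anomaly exclusion.

```python
from collections import defaultdict
from statistics import fmean, pstdev

class OnlineThresholder:
    """Hypothetical stand-in that keeps one score history per source."""

    def __init__(self, factor, window=None):
        self.factor = factor
        self.window = window
        self.history = defaultdict(list)

    def label_one(self, score, source):
        hist = self.history[source]
        hist.append(score)  # the new score joins the history before thresholding
        recent = hist if self.window is None else hist[-self.window:]
        threshold = fmean(recent) + self.factor * pstdev(recent)
        return 1 if score > threshold else 0

    def label_many(self, scores, source):
        return [self.label_one(s, source) for s in scores]

th = OnlineThresholder(factor=2)
labels = th.label_many(
    [0.5, 0.6, 0.55, 0.5, 0.6, 0.55, 0.5, 0.6, 0.55, 3.0], "bearing_1"
)
print(labels)                          # [0, 0, 0, 0, 0, 0, 0, 0, 0, 1]
print(th.label_one(3.0, "bearing_2"))  # 0: a new source starts with an empty history
```

Keeping a separate history per source means a spike on one source does not shift the threshold of another.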

Parameters:
  • scores (list[float]) – Anomaly scores to threshold.

  • source (str) – Source identifier (used to maintain separate histories).

  • event_data (pd.DataFrame) – Event log (unused).

Returns:

Binary labels, one per input score (1 = anomaly, 0 = normal).

Return type:

list[float]

Examples

>>> scores = [0.5, 0.6, 0.55, 1.2, 0.7, 2.5, 0.8]
>>> labels = processor.transform(scores, 'bearing_1', events_df)
>>> # Returns [0, 0, 0, 0, 0, 1, 0]  (threshold rises as history grows)

transform_one(score_point: float, source: str, is_event: bool) → float

Threshold single score using adaptive threshold (online mode).

Parameters:
  • score_point (float) – Single anomaly score to threshold.

  • source (str) – Source identifier (used to maintain separate histories).

  • is_event (bool) – Event flag (unused).

Returns:

1 if score > threshold, 0 otherwise.

Return type:

float