pdmlabs.postprocessing.moving_average

Moving average smoothing post-processor for score noise reduction.

MovingAveragePostProcessor applies a rolling window average to anomaly scores, smoothing sharp spikes and reducing high-frequency noise. The first window_length scores are returned unchanged to avoid NaN values.

Useful when:

  • Scores are noisy or volatile and stable detections are wanted.

  • Transient spikes need to be filtered out.

  • Scores should be smoothed before thresholding.

Note: Smoothing creates temporal dependence: each output score depends on the scores that precede it in the window.
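To illustrate the "smooth before thresholding" use case, here is a minimal plain-Python sketch. It does not use pdmlabs: the helpers `smooth` and `alarms`, the threshold value, and the sample scores are all hypothetical, and the window is assumed to be a trailing one that includes the current score.

```python
from statistics import fmean


def smooth(scores, window_length):
    """Trailing rolling mean; the first window_length scores pass through.

    Assumption: the window ends at, and includes, the current score.
    """
    out = list(scores[:window_length])
    for i in range(window_length, len(scores)):
        out.append(fmean(scores[i - window_length + 1 : i + 1]))
    return out


def alarms(scores, threshold):
    """Return the indices whose score exceeds the threshold."""
    return [i for i, s in enumerate(scores) if s > threshold]


raw = [0.1, 0.1, 0.1, 0.1, 0.9, 0.1, 0.1]  # one transient spike at index 4
smoothed = smooth(raw, window_length=3)

raw_alarms = alarms(raw, threshold=0.5)            # the spike crosses the threshold
smoothed_alarms = alarms(smoothed, threshold=0.5)  # the spike is averaged below it
```

In this sketch the raw scores raise one alarm while the smoothed scores raise none. A spike that falls within the first window_length scores would not be damped, because those scores are passed through unchanged.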

Classes

MovingAveragePostProcessor(...)

Smooth anomaly scores using rolling window mean.

class pdmlabs.postprocessing.moving_average.MovingAveragePostProcessor(event_preferences: EventPreferences, window_length: int)

Bases: PostProcessorInterface

Smooth anomaly scores using rolling window mean.

This post-processor reduces score variance by averaging values within a fixed-size sliding window. The first window_length scores are returned unchanged (to avoid NaN values); each later score is replaced by the mean of its window.

window_length

Number of scores in rolling window.

Type:

int

scores_buffer_per_source

Maintains recent scores per source for online/streaming mode.

Type:

dict

Examples

>>> from pdmlabs.postprocessing.moving_average import MovingAveragePostProcessor
>>>
>>> processor = MovingAveragePostProcessor(
...     event_preferences={'failure': [], 'reset': []},
...     window_length=5
... )
>>> processor.fit([df_train], ['bearing_1'], events_df)
>>>
>>> scores = [0.1, 0.2, 0.8, 0.9, 0.3, 0.4, 0.5]
>>> smoothed = processor.transform(scores, 'bearing_1', events_df)
>>> # First 5 scores unchanged, then rolling mean applied

fit(historic_data: list[DataFrame], historic_sources: list[str], event_data: DataFrame, anomaly_ranges=None) → None

No-op fit (the moving average learns nothing from historic data).

Parameters:
  • historic_data (list[pd.DataFrame]) – Ignored.

  • historic_sources (list[str]) – Ignored.

  • event_data (pd.DataFrame) – Ignored.

  • anomaly_ranges – Ignored.

get_params()

Return hyperparameters.

Returns:

{'window_length': window size in scores}

Return type:

dict

transform(scores: list[float], source: str, event_data: DataFrame) → list[float]

Apply rolling window average to smooth scores.

Parameters:
  • scores (list[float]) – Anomaly scores to smooth.

  • source (str) – Source identifier (unused).

  • event_data (pd.DataFrame) – Event log (unused).

Returns:

Smoothed scores (same length as input). The first window_length scores are unchanged; the remaining scores are rolling means.

Return type:

list[float]
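The batch behaviour described here can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the pdmlabs implementation: the function name `moving_average_batch` is hypothetical, and the window is assumed to be a trailing one that ends at, and includes, the current score.

```python
def moving_average_batch(scores, window_length):
    """Return a list of the same length as scores.

    The first window_length scores pass through unchanged; every later
    score is replaced by the mean of the window_length scores ending at it.
    (Trailing, inclusive window alignment is an assumption.)
    """
    smoothed = [float(s) for s in scores[:window_length]]
    for i in range(window_length, len(scores)):
        window = scores[i - window_length + 1 : i + 1]
        smoothed.append(sum(window) / window_length)
    return smoothed


result = moving_average_batch([1, 2, 10, 3, 4, 5], window_length=3)
# index 3: mean(2, 10, 3) = 5.0
# index 4: mean(10, 3, 4) = 17/3, about 5.67
# index 5: mean(3, 4, 5)  = 4.0
```

Under these assumptions the spike at index 2 is not damped itself, because it lies within the first window_length scores, and it keeps raising the following outputs until it leaves the window.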

Examples

>>> processor = MovingAveragePostProcessor(event_preferences={...}, window_length=3)
>>> scores = [1, 2, 10, 3, 4, 5]  # Has spike at position 2
>>> smoothed = processor.transform(scores, 'sensor_1', events_df)
>>> # Result: [1, 2, 10, 5.0, 5.67, 4.0]  (first 3 unchanged, then rolling means; 5.67 is 17/3 rounded)

transform_one(score_point: float, source: str, is_event: bool) → float

Apply moving average to single score (online mode).

Maintains a buffer of recent scores per source. Returns the score unchanged until window_length scores are buffered, then returns the mean of the last window_length scores.

Parameters:
  • score_point (float) – Single anomaly score to process.

  • source (str) – Source identifier (used to maintain separate buffers).

  • is_event (bool) – Event flag (unused).

Returns:

If fewer than window_length scores are buffered for the source: the score, unchanged.

Otherwise: the mean of the last window_length scores.

Return type:

float
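The per-source buffering described for online mode can be sketched as follows. This is a hypothetical stand-in, not the pdmlabs class: the name `OnlineMovingAverage` and its `buffers` attribute are invented for illustration, the `is_event` argument is omitted because it is documented as unused, and the buffer length is assumed to be checked before the new score is appended, so that the first window_length scores pass through as in batch mode.

```python
from collections import defaultdict, deque


class OnlineMovingAverage:
    """Hypothetical sketch of a streaming moving average with per-source buffers."""

    def __init__(self, window_length):
        self.window_length = window_length
        # One bounded buffer per source; deque(maxlen=...) evicts the oldest score.
        self.buffers = defaultdict(lambda: deque(maxlen=window_length))

    def transform_one(self, score_point, source):
        buffer = self.buffers[source]
        # Assumption: the warm-up check happens before the new score is stored.
        warmed_up = len(buffer) >= self.window_length
        buffer.append(score_point)
        if not warmed_up:
            return score_point
        # Mean of the last window_length scores, including the current one.
        return sum(buffer) / len(buffer)


online = OnlineMovingAverage(window_length=3)
stream = [online.transform_one(s, "sensor_1") for s in [1, 2, 10, 3, 4, 5]]
other = online.transform_one(7, "sensor_2")  # separate buffer, still warming up
```

With this check order, feeding scores one at a time yields the same values as the batch transform on the same sequence, and scores from one source never enter another source's window.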