pdmlabs.preprocessing.record_level.aggregator

Time-series aggregation preprocessor for temporal downsampling.

MeanAggregator performs time-window downsampling by computing the mean value within fixed time periods. Useful for:
  • Reducing sampling frequency and data volume

  • Smoothing high-frequency noise

  • Aligning data to a fixed temporal grid

Classes

MeanAggregator(event_preferences[, period])

Downsample time-series data by computing mean over fixed time windows.

class pdmlabs.preprocessing.record_level.aggregator.MeanAggregator(event_preferences: EventPreferences, period: str = '10T')

Bases: RecordLevelPreProcessorInterface

Downsample time-series data by computing mean over fixed time windows.

This preprocessor aggregates time-indexed data into larger time buckets, computing the mean value for each feature in each bucket. It reduces data volume and can smooth high-frequency noise.
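To make the bucketing concrete, here is a minimal pandas-only sketch of what "mean per time bucket" means. It does not call pdmlabs, and the data and variable names are illustrative assumptions: each timestamp is floored to the start of its 10-minute window, and the mean is taken per window.

```python
import pandas as pd

# Thirty one-minute samples starting at midnight (illustrative data).
times = pd.date_range("2024-01-01", periods=30, freq="min")
df = pd.DataFrame({"vibration": range(30)}, index=times)

# Assign each timestamp to a bucket by flooring it to the window start.
bucket_start = df.index.floor("10min")

# Take the mean of every feature within each bucket.
bucketed = df.groupby(bucket_start).mean()

print(bucketed["vibration"].tolist())  # [4.5, 14.5, 24.5]
```

Three buckets of ten samples each yield three rows, so the data volume drops by a factor of ten.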

period

Pandas resampling frequency string (e.g. '10T' = 10 minutes, '1H' = 1 hour, '1D' = 1 day). See the pandas resample documentation for all valid frequencies. Note that pandas 2.2 and later deprecate the 'T' and 'H' aliases in favor of 'min' and 'h'.

Type:

str
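A period string can be checked before it is passed to the aggregator. The sketch below uses pandas' own `to_offset` parser; the helper `is_valid_period` is a hypothetical name and not part of pdmlabs, and whether MeanAggregator validates the string itself is not stated here.

```python
from pandas.tseries.frequencies import to_offset


def is_valid_period(period: str) -> bool:
    """Return True if pandas can parse `period` as a frequency string."""
    try:
        to_offset(period)
    except ValueError:
        # pandas raises ValueError for strings it cannot interpret.
        return False
    return True


# 'min' is used instead of 'T' to avoid the deprecated alias.
checks = {p: is_valid_period(p) for p in ("10min", "1D", "bogus")}
```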

Examples

>>> from pdmlabs.preprocessing.record_level.aggregator import MeanAggregator
>>> import pandas as pd
>>>
>>> # Minute-level data with 3 sensors
>>> times = pd.date_range('2024-01-01', periods=600, freq='1T')
>>> df_train = pd.DataFrame({
...     'vibration': range(600),
...     'temperature': [20 + i * 0.01 for i in range(600)],
...     'pressure': [100 + i * 0.005 for i in range(600)]
... }, index=times)
>>>
>>> aggregator = MeanAggregator(
...     event_preferences={'failure': [], 'reset': []},
...     period='10T'  # Aggregate to 10-minute intervals
... )
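>>> # events_df (event log) and df_test (time-indexed test data) are assumed to be defined elsewhere
>>> aggregator.fit([df_train], ['sensor_array_1'], events_df)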
>>> df_test_agg = aggregator.transform(df_test, 'sensor_array_1', events_df)
>>> # df_test_agg now has one row per 10-minute window with averaged values

fit(historic_data: list[DataFrame], historic_sources: list[str], event_data: DataFrame, anomaly_ranges=None) → None

Fit the aggregator (a no-op placeholder).

Mean aggregation is stateless, so fit() does nothing. The aggregation period is fixed at initialization.

Parameters:
  • historic_data (list[pd.DataFrame]) – Ignored.

  • historic_sources (list[str]) – Ignored.

  • event_data (pd.DataFrame) – Ignored.

  • anomaly_ranges – Ignored.
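The stateless contract described above can be illustrated with a hypothetical stand-in class. `StatelessAggregatorSketch` is not the pdmlabs implementation; it only mirrors the documented signature to show that fitting leaves the configuration untouched.

```python
import pandas as pd


class StatelessAggregatorSketch:
    """Hypothetical stand-in that mirrors the documented fit() contract."""

    def __init__(self, period: str = "10min"):
        # The only configuration is fixed here, at construction time.
        self.period = period

    def fit(self, historic_data, historic_sources, event_data, anomaly_ranges=None) -> None:
        # Nothing is learned from the data, so every argument is ignored.
        return None

    def get_params(self) -> dict:
        return {"period": self.period}


sketch = StatelessAggregatorSketch(period="10min")
params_before = sketch.get_params()
fit_result = sketch.fit([pd.DataFrame()], ["sensor_array_1"], pd.DataFrame())
params_after = sketch.get_params()
```

Because nothing is learned, `params_before` and `params_after` are identical, and calling `fit` any number of times does not change later results.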

get_params()

Return hyperparameters.

Returns:

{'period': resampling frequency string}

Return type:

dict

Examples

>>> print(aggregator.get_params())
{'period': '10T'}

transform(target_data: DataFrame, source: str, event_data: DataFrame) → DataFrame

Aggregate time-series data by computing mean over time windows.

Parameters:
  • target_data (pd.DataFrame) – Time-indexed DataFrame to aggregate. Must have a DatetimeIndex.

  • source (str) – Source identifier (unused).

  • event_data (pd.DataFrame) – Event log (unused).

Returns:

Aggregated data with one row per time period. Any rows that become all-NaN after aggregation are dropped.

Return type:

pd.DataFrame
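The all-NaN rule matters when the input has gaps. The following pandas-only sketch assumes the transform behaves roughly like `resample(period).mean()` followed by `dropna(how="all")`; that equivalence is an assumption for illustration, not a statement about the pdmlabs source.

```python
import pandas as pd

# Two ten-minute bursts of one-minute data separated by a 20-minute gap.
first = pd.date_range("2024-01-01 00:00", periods=10, freq="min")
second = pd.date_range("2024-01-01 00:30", periods=10, freq="min")
df = pd.DataFrame(
    {"vibration": [float(i) for i in range(20)]},
    index=first.append(second),
)

# Resampling creates a row for every window, including the two empty ones.
resampled = df.resample("10min").mean()

# Dropping all-NaN rows keeps only the windows that contain data.
aggregated = resampled.dropna(how="all")
```

Here `resampled` has four rows and `aggregated` has two, so the output index is not guaranteed to be evenly spaced when the input contains gaps.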

Examples

>>> # Original: 600 rows (1 minute intervals)
>>> df_test_agg = aggregator.transform(df_test, 'sensor_array_1', events_df)
>>> # Result: ~60 rows (1 row per 10-minute window)
>>> print(df_test_agg.shape)
(61, 3)

transform_one(new_sample: Series, source: str, is_event: bool) → Series

Aggregation is not supported for single samples.

Parameters:
  • new_sample (pd.Series) – Single row (unused).

  • source (str) – Source identifier (unused).

  • is_event (bool) – Event flag (unused).

Returns:

Always None, because single-sample aggregation is not supported.

Return type:

None

Raises:

NotImplementedError – Not raised explicitly; the method returns None instead.