pdmlabs.preprocessing.record_level.aggregator
Time-series aggregation preprocessor for temporal downsampling.
MeanAggregator performs time-window downsampling by computing the average (mean) value within fixed time periods. Useful for:
- Reducing sampling frequency / data volume
- Smoothing high-frequency noise
- Aligning data to a fixed temporal grid
Classes
| MeanAggregator | Downsample time-series data by computing mean over fixed time windows. |
- class pdmlabs.preprocessing.record_level.aggregator.MeanAggregator(event_preferences: EventPreferences, period: str = '10T')
Bases: RecordLevelPreProcessorInterface
Downsample time-series data by computing the mean over fixed time windows.
This preprocessor aggregates time-indexed data into larger time buckets, computing the mean value for each feature in each bucket. It reduces data volume and can smooth high-frequency noise.
- period
Pandas resampling frequency string (e.g. '10T' = 10 minutes, '1H' = 1 hour, '1D' = 1 day). See the pandas resample documentation for all valid frequencies. Note that pandas 2.2 and later deprecate the 'T' and 'H' aliases in favor of 'min' and 'h'.
- Type:
str
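To make the meaning of a frequency string concrete, the sketch below uses plain pandas (not pdmlabs) to parse one into a fixed-width window. It uses '10min' rather than '10T' on the assumption that a recent pandas release is installed; the variable names are illustrative only.

```python
import pandas as pd
from pandas.tseries.frequencies import to_offset

# Parse a frequency string into a pandas offset object.
# '10min' is used instead of '10T' because newer pandas versions
# deprecate the single-letter 'T' alias.
offset = to_offset("10min")

# Fixed-width offsets (minutes, seconds, days) convert to a Timedelta.
window = pd.Timedelta(offset)

# How many 1-minute samples fall into one aggregation window.
samples_per_window = window // pd.Timedelta("1min")
print(offset, window, samples_per_window)
```

A period of '10min' applied to minute-level data therefore averages ten samples per output row.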
Examples
>>> from pdmlabs.preprocessing.record_level.aggregator import MeanAggregator
>>> import pandas as pd
>>>
>>> # Minute-level data with 3 sensors
>>> times = pd.date_range('2024-01-01', periods=600, freq='1T')
>>> df_train = pd.DataFrame({
...     'vibration': range(600),
...     'temperature': [20 + i * 0.01 for i in range(600)],
...     'pressure': [100 + i * 0.005 for i in range(600)]
... }, index=times)
>>>
>>> # df_test (a time-indexed DataFrame like df_train) and events_df
>>> # are assumed to be defined elsewhere
>>> aggregator = MeanAggregator(
...     event_preferences={'failure': [], 'reset': []},
...     period='10T'  # Aggregate to 10-minute intervals
... )
>>> aggregator.fit([df_train], ['sensor_array_1'], events_df)
>>> df_test_agg = aggregator.transform(df_test, 'sensor_array_1', events_df)
>>> # df_test_agg now has one row per 10-minute window with averaged values
- fit(historic_data: list[DataFrame], historic_sources: list[str], event_data: DataFrame, anomaly_ranges=None) -> None
Fit the aggregator (a no-op placeholder).
Mean aggregation is stateless, so fit() does nothing. The aggregation period is fixed at initialization.
- Parameters:
historic_data (list[pd.DataFrame]) – Ignored.
historic_sources (list[str]) – Ignored.
event_data (pd.DataFrame) – Ignored.
anomaly_ranges – Ignored.
- get_params()
Return hyperparameters.
- Returns:
{'period': resampling frequency string}
- Return type:
dict
Examples
>>> print(aggregator.get_params())
{'period': '10T'}
- transform(target_data: DataFrame, source: str, event_data: DataFrame) -> DataFrame
Aggregate time-series data by computing mean over time windows.
- Parameters:
target_data (pd.DataFrame) – Time-indexed DataFrame to aggregate. Must have a DatetimeIndex.
source (str) – Source identifier (unused).
event_data (pd.DataFrame) – Event log (unused).
- Returns:
Aggregated data with one row per time period. Any rows that become all-NaN after aggregation are dropped.
- Return type:
pd.DataFrame
Examples
>>> # Original: 600 rows (1-minute intervals)
>>> df_test_agg = aggregator.transform(df_test, 'sensor_array_1', events_df)
>>> # Result: ~60 rows (1 row per 10-minute window); the exact count
>>> # depends on how the timestamps align with window boundaries
>>> print(df_test_agg.shape)
(61, 3)
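The documented behavior (mean per fixed window, all-NaN rows dropped) can be approximated with plain pandas. The sketch below is an assumption about what transform does, not the pdmlabs source; `mean_downsample` is a hypothetical stand-in, and '10min' is used in place of '10T' for compatibility with recent pandas releases.

```python
import numpy as np
import pandas as pd


def mean_downsample(df: pd.DataFrame, period: str = "10min") -> pd.DataFrame:
    """Hypothetical stand-in for MeanAggregator.transform (assumed behavior)."""
    if not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("target data must have a DatetimeIndex")
    # Mean of every column per window, then drop windows with no data at all.
    return df.resample(period).mean().dropna(how="all")


times = pd.date_range("2024-01-01", periods=600, freq="1min")
df = pd.DataFrame(
    {
        "vibration": np.arange(600, dtype=float),
        "temperature": 20 + 0.01 * np.arange(600),
    },
    index=times,
)

# Data starting exactly on a window boundary.
aligned = mean_downsample(df)

# Same data shifted by 5 minutes, so it straddles one extra window.
shifted_df = df.copy()
shifted_df.index = df.index + pd.Timedelta(minutes=5)
shifted = mean_downsample(shifted_df)

# Data with a 100-minute gap: the empty windows are all-NaN and get dropped.
gappy = mean_downsample(df.drop(df.index[100:200]))

print(len(aligned), len(shifted), len(gappy))
```

Under these assumptions, the row count depends on both the alignment of the first timestamp and any gaps in the input, which is why the shape of the result is only approximately rows / samples-per-window.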
- transform_one(new_sample: Series, source: str, is_event: bool) -> Series
Aggregation is not supported for single samples.
- Parameters:
new_sample (pd.Series) – Single row (unused).
source (str) – Source identifier (unused).
is_event (bool) – Event flag (unused).
- Returns:
Single-sample aggregation is not supported.
- Return type:
None
- Raises:
NotImplementedError – Not raised explicitly; the unsupported operation is indicated by returning None.
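Because transform_one is documented as returning None, a caller that receives samples one at a time has to do its own windowing. One possible approach, sketched below, is to buffer samples and emit the mean once a window is complete. `WindowBuffer` is a hypothetical helper and not part of pdmlabs; it assumes samples arrive in timestamp order.

```python
import pandas as pd


class WindowBuffer:
    """Hypothetical helper (not part of pdmlabs): averages streamed samples per window."""

    def __init__(self, period: str = "10min"):
        self.period = period
        self.window_start = None  # start timestamp of the window being filled
        self.rows = []            # samples collected for the current window

    def push(self, timestamp, sample: pd.Series):
        """Add a sample; return the previous window's mean once it is complete, else None."""
        start = pd.Timestamp(timestamp).floor(self.period)
        finished = None
        if self.window_start is not None and start > self.window_start:
            # A sample from a later window arrived, so the current one is complete.
            finished = pd.DataFrame(self.rows).mean()
            finished.name = self.window_start
            self.rows = []
        if self.window_start is None or start > self.window_start:
            self.window_start = start
        self.rows.append(sample)
        return finished


buffer = WindowBuffer("10min")
emitted = []
for i, ts in enumerate(pd.date_range("2024-01-01", periods=25, freq="1min")):
    result = buffer.push(ts, pd.Series({"vibration": float(i)}))
    if result is not None:
        emitted.append(result)

print(len(emitted))
```

The last, partially filled window is never emitted in this sketch; a real pipeline would need a flush step or a timeout to handle it.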