pdmlabs.pipeline.pipeline
PdMPipeline: Core data processing and component orchestration for predictive maintenance.
This module defines the PdMPipeline class, which orchestrates the complete anomaly detection pipeline for predictive maintenance experiments. A pipeline combines:
- A dataset (with features, events, event preferences, and evaluation parameters)
- Processing steps: preprocessor → method → postprocessor → thresholder
- Event mapping logic (failures, resets, source relationships)
The pipeline is the contract between raw data and experiment execution. It encapsulates:
- The transformation steps applied to features
- How anomalies are detected (via the method)
- How anomaly scores are refined (postprocessor)
- How scores are converted to binary predictions (thresholder)
- Event semantics (what constitutes a failure, reset, or normal boundary)
Architecture:
Raw Data (DataFrame)
  ↓
[Preprocessor] - Normalization, feature engineering, windowing
  ↓
[Method] - Anomaly detection (returns anomaly scores)
  ↓
[Postprocessor] - Score refinement (smoothing, aggregation, fusion)
  ↓
[Thresholder] - Binary decision boundary (0/1 predictions)
  ↓
Predictions (binary labels)
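The four-stage flow in the diagram can be sketched as plain function composition. This is a minimal, self-contained illustration, not the pdmlabs implementation: the Toy* classes and their fit/transform/score/process/predict method names are hypothetical and may not match the real interfaces. It does show the key ordering constraint, that the preprocessor and method are fitted on historic data and only then applied to target data.

```python
from typing import List

# Hypothetical toy stages; the real pdmlabs interfaces may use other method names.


class ToyMinMaxPreprocessor:
    """Scales values into [0, 1] using the min/max seen during fit."""

    def fit(self, values: List[float]) -> None:
        self.lo, self.hi = min(values), max(values)

    def transform(self, values: List[float]) -> List[float]:
        span = (self.hi - self.lo) or 1.0  # avoid division by zero on constant data
        return [(v - self.lo) / span for v in values]


class ToyMeanDistanceMethod:
    """Anomaly score = absolute distance from the (preprocessed) training mean."""

    def fit(self, values: List[float]) -> None:
        self.mean = sum(values) / len(values)

    def score(self, values: List[float]) -> List[float]:
        return [abs(v - self.mean) for v in values]


class ToyMovingAverage:
    """Smooths scores with a trailing window of `window` points."""

    def __init__(self, window: int = 2) -> None:
        self.window = window

    def process(self, scores: List[float]) -> List[float]:
        smoothed = []
        for i in range(len(scores)):
            win = scores[max(0, i - self.window + 1): i + 1]
            smoothed.append(sum(win) / len(win))
        return smoothed


class ToyStaticThreshold:
    """Flags scores strictly above a fixed threshold."""

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold

    def predict(self, scores: List[float]) -> List[int]:
        return [1 if s > self.threshold else 0 for s in scores]


def run_pipeline(historic, target, pre, method, post, thr) -> List[int]:
    # Fit the preprocessor and method on historic (training) data only.
    pre.fit(historic)
    method.fit(pre.transform(historic))
    # Apply the fitted stages to target (test) data, then refine and threshold.
    scores = method.score(pre.transform(target))
    return thr.predict(post.process(scores))


preds = run_pipeline(
    historic=[0.0, 1.0, 2.0, 3.0, 4.0],
    target=[2.0, 2.0, 10.0, 10.0],
    pre=ToyMinMaxPreprocessor(),
    method=ToyMeanDistanceMethod(),
    post=ToyMovingAverage(window=2),
    thr=ToyStaticThreshold(threshold=0.5),
)
```

Keeping each stage behind its own small object is what lets a pipeline swap, for example, the thresholder without touching the method.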
Key Concepts:
- Events: Timestamps with type, description, and source that define:
  - Failures: when equipment actually fails
  - Resets: when systems restart or reinitialize episodes
  - Sources: which subsystem/sensor an event applies to
- Event Preferences: Rules for how events propagate to target sources:
  - Direct (source → target_source)
  - Broadcast (* applies to all targets)
  - Same-source (= applies only to the same source)
Caching: Event extraction is cached on first call for performance.
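A minimal sketch of the three propagation rules as a single predicate, assuming a hypothetical encoding in which a rule's target field is "*" (broadcast), "=" (same-source), or a list of named target sources (direct). The function name and the encoding are illustrative only; the actual pdmlabs preference objects may be structured differently.

```python
from typing import List, Union

# Hypothetical encoding of a rule's target field:
#   "*"         -> broadcast: applies to every target source
#   "="         -> same-source: applies only to the source that emitted the event
#   ["a", "b"]  -> direct: applies to the explicitly named target sources
TargetSpec = Union[str, List[str]]


def event_applies_to(event_source: str, target_spec: TargetSpec, target: str) -> bool:
    """Return True if an event from `event_source` should be attributed to `target`."""
    if target_spec == "*":
        return True
    if target_spec == "=":
        return event_source == target
    if isinstance(target_spec, list):
        return target in target_spec
    raise ValueError(f"unsupported target spec: {target_spec!r}")
```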
Classes
- PdMPipeline: Orchestrates the complete predictive maintenance anomaly detection pipeline.
- PdMPipelineSteps: Type definition for the four-stage anomaly detection pipeline.
- class pdmlabs.pipeline.pipeline.PdMPipeline(steps: pdmlabs.pipeline.pipeline.PdMPipelineSteps, dataset: dict, auc_resolution: int, experiment_type=<class 'pdmlabs.method.semi_supervised_method.SemiSupervisedMethodInterface'>)
Bases: object
Orchestrates the complete predictive maintenance anomaly detection pipeline.
A PdMPipeline combines a dataset with four processing steps (preprocessor, method, postprocessor, thresholder) to form a complete anomaly detection system. It also manages event mappings (failures, resets) and evaluation parameters.
The pipeline serves as the contract between raw data and experiment execution. It:
- Defines which transformations apply to features
- Specifies which anomaly detection method to use
- Caches event extraction results for performance
- Provides utilities to query failure/reset dates by device source
- dataset
Complete dataset specification with keys:
  - ‘event_data’: DataFrame with columns [date, type, description, source]
  - ‘event_preferences’: Dict mapping ‘failure’ and ‘reset’ to event rules
  - ‘dates’: Target dates/indices (for temporal alignment)
  - ‘predictive_horizon’: Lead time for detection [int or list]
  - ‘slide’: Sliding window size for VUS metrics
  - ‘lead’: Detection lead time threshold
  - ‘beta’: Weighting parameter for metrics
  - ‘historic_data’: Training data files/DataFrames
  - ‘target_data’: Test data files/DataFrames
  - ‘historic_sources’: Source labels for training data
  - ‘target_sources’: Source labels for test data
- Type:
dict
- steps
Dictionary with preprocessor, method, postprocessor, thresholder.
- Type:
PdMPipelineSteps
- auc_resolution
Resolution for threshold sweep in evaluation (e.g., 100).
- Type:
int
- experiment_type
Default method class if not provided in steps.
- event_preferences
Parsed event rules for failures/resets.
- Type:
dict
- event_data
Parsed event log with timestamps and sources.
- Type:
pd.DataFrame
- preprocessor, method, postprocessor, thresholder
Pipeline component instances.
Examples
>>> from pdmlabs.pipeline.pipeline import PdMPipeline
>>> from pdmlabs.method.isolation_forest import IsolationForest
>>> from pdmlabs.preprocessing.no_preprocessor import NoPreprocessor
>>> from pdmlabs.postprocessing.no_postprocessor import NoPostprocessor
>>> from pdmlabs.thresholding.static_threshold import StaticThreshold
>>>
>>> steps = {
...     'preprocessor': NoPreprocessor(event_preferences={...}),
...     'method': IsolationForest,
...     'postprocessor': NoPostprocessor(event_preferences={...}),
...     'thresholder': StaticThreshold(threshold_value=0.5, event_preferences={...})
... }
>>> pipeline = PdMPipeline(
...     steps=steps,
...     dataset=my_dataset,
...     auc_resolution=100
... )
>>> failure_dates = pipeline.extract_failure_dates_for_source('bearing_1')
>>> reset_dates = pipeline.extract_reset_dates_for_source('bearing_1')
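The constructor example assumes an existing my_dataset dict. A minimal sketch of one, built from the keys listed for the dataset attribute, follows. The values are illustrative placeholders: the event_preferences rule format and the form of dates are not specified here, so they are left empty or None, and missing_dataset_keys is a hypothetical helper, not part of pdmlabs.

```python
import pandas as pd

# Keys listed in the `dataset` attribute description.
REQUIRED_KEYS = (
    "event_data", "event_preferences", "dates", "predictive_horizon",
    "slide", "lead", "beta", "historic_data", "target_data",
    "historic_sources", "target_sources",
)


def missing_dataset_keys(dataset: dict) -> list:
    """Return the required keys absent from `dataset`, in declaration order."""
    return [key for key in REQUIRED_KEYS if key not in dataset]


days = pd.date_range("2024-01-01", periods=20, freq="D")
historic = pd.DataFrame({"vibration": 0.1}, index=days[:10])  # training period
target = pd.DataFrame({"vibration": 0.1}, index=days[10:])    # test period

my_dataset = {
    # Event log with the documented columns.
    "event_data": pd.DataFrame({
        "date": pd.to_datetime(["2024-01-15 09:15:00"]),
        "type": ["failure"],
        "description": ["overheat"],
        "source": ["bearing_1"],
    }),
    "event_preferences": {"failure": [], "reset": []},  # placeholder: rule schema not shown
    "dates": None,               # placeholder: alignment format not specified here
    "predictive_horizon": 3,     # illustrative values from here on
    "slide": 2,
    "lead": 1,
    "beta": 1.0,
    "historic_data": [historic],
    "target_data": [target],
    "historic_sources": ["bearing_1"],
    "target_sources": ["bearing_1"],
}
```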
- extract_failure_dates_for_source(source: str) → list[Timestamp]
Extract all failure timestamps for a specific source (device/subsystem).
Queries the event log to find all events matching the failure preferences that should apply to the given source. Uses caching for performance on repeated calls.
Event preferences define rules like:
- Direct: a specific source fires the failure event
- Broadcast (*): all sources are affected when the event fires
- Group: multiple source IDs are affected by the same event
- Parameters:
source (str) – Source/device identifier (e.g., ‘bearing_1’, ‘motor_A’).
- Returns:
- Sorted list of unique failure timestamps for this source.
Empty list if no failures defined or source not in preferences.
- Return type:
list[pd.Timestamp]
- Caching:
Failure dates are cached on first call via expanded_event_preferences. Subsequent calls are O(1) lookups. Cache persists across calls to extract_reset_dates_for_source.
Examples
>>> pipeline = PdMPipeline(...)
>>> failure_times = pipeline.extract_failure_dates_for_source('bearing_1')
>>> print(f"Failures at: {failure_times}")
Failures at: [Timestamp('2024-01-10 14:30:00'), Timestamp('2024-01-15 09:15:00')]
>>> # For broadcast events (* applies to all sources):
>>> failures_motor = pipeline.extract_failure_dates_for_source('motor_A')
>>> failures_bearing = pipeline.extract_failure_dates_for_source('bearing_1')
>>> # Both may include times from * events
- extract_reset_dates_for_source(source) → list[Timestamp]
Extract all reset timestamps for a specific source (device/subsystem).
Queries the event log to find all events matching the reset preferences that should apply to the given source. Reset events typically mark:
- Equipment restarts / reinitializations
- Episode boundaries (e.g., new mission, device replacement)
- Scenario transitions for evaluation
Similar to extract_failure_dates_for_source, but matches the ‘reset’ preferences from event_preferences against event_data. Uses the same caching mechanism.
- Parameters:
source (str) – Source/device identifier (e.g., ‘bearing_1’, ‘motor_A’).
- Returns:
- Sorted list of unique reset timestamps for this source.
Empty list if no resets defined or source not in preferences.
- Return type:
list[pd.Timestamp]
- Caching:
Resets are cached alongside failures on first call. Repeated calls are O(1).
Examples
>>> pipeline = PdMPipeline(...)
>>> reset_times = pipeline.extract_reset_dates_for_source('bearing_1')
>>> print(f"Resets at: {reset_times}")
Resets at: [Timestamp('2024-01-05 08:00:00'), Timestamp('2024-01-20 16:30:00')]
>>> # Use to segment data into episodes:
>>> failures = pipeline.extract_failure_dates_for_source('bearing_1')
>>> resets = pipeline.extract_reset_dates_for_source('bearing_1')
>>> # Resets define episode boundaries, failures are ground truth within each episode
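A minimal sketch of the episode segmentation hinted at in the last example, assuming each reset closes the current episode and opens a new one, and that a timestamp equal to a reset belongs to the new episode. split_into_episodes is a hypothetical helper built on the standard bisect module; the dates reuse the sample failure and reset timestamps above.

```python
import bisect
from typing import Dict, List

import pandas as pd


def split_into_episodes(timestamps, resets, failures) -> Dict[int, Dict[str, List[pd.Timestamp]]]:
    """Group timestamps into episodes bounded by resets and attach each episode's failures.

    Episode k holds the timestamps preceded by exactly k resets
    (a reset at the same instant counts, so it opens the new episode).
    """
    resets = sorted(resets)
    episodes: Dict[int, Dict[str, List[pd.Timestamp]]] = {}
    for ts in timestamps:
        k = bisect.bisect_right(resets, ts)  # number of resets at or before ts
        episodes.setdefault(k, {"timestamps": [], "failures": []})["timestamps"].append(ts)
    for failure in sorted(failures):
        k = bisect.bisect_right(resets, failure)
        if k in episodes:  # ignore failures in episodes with no observed data
            episodes[k]["failures"].append(failure)
    return episodes


resets = [pd.Timestamp("2024-01-05 08:00:00"), pd.Timestamp("2024-01-20 16:30:00")]
failures = [pd.Timestamp("2024-01-10 14:30:00"), pd.Timestamp("2024-01-15 09:15:00")]
episodes = split_into_episodes(
    pd.date_range("2024-01-01", "2024-01-25", freq="4D"), resets, failures
)
```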
- find_affected_sources(given_expanded_preferences, get_affected) → dict[str, List[List[str]]]
Map event preferences to affected sources (internal caching utility).
Processes expanded event preferences and populates a dictionary indicating which sources are affected by each event. Handles three preference types:
1. Direct (=): event affects only its source of origin
2. Broadcast (*): event affects all known sources
3. List: event affects specific named target sources
This method is called once per pipeline to build a cache used by extract_failure_dates_for_source() and extract_reset_dates_for_source().
- Parameters:
given_expanded_preferences (list) – List of expanded preference objects (typically all ‘failure’ or ‘reset’ preferences).
get_affected (dict) – Dictionary being populated with structure: {source_name: [[event_source, event_description, event_type], …]}
- Returns:
- The same dict, populated in place:
Maps each source to a list of [source, description, type] entries indicating which events apply to that source.
- Return type:
dict[str, List[List[str]]]
Examples
>>> preferences = [...expanded preferences...]
>>> affected = {}  # Will be populated
>>> result = pipeline.find_affected_sources(preferences, affected)
>>> print(result['bearing_1'])  # Events affecting this source
[['source_A', 'overheat', 'failure'], ['*', 'shutdown', 'failure']]
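The mapping performed by find_affected_sources can be sketched as a free function, assuming each expanded preference is a dict with hypothetical keys source, description, type, and target_sources (holding "=", "*", or a list of names), and that the set of known sources is passed in explicitly. The output follows the {source_name: [[event_source, event_description, event_type], …]} structure described for get_affected; the real method's inputs and signature differ.

```python
from typing import Dict, List


def find_affected_sources_sketch(preferences: List[dict], known_sources: List[str],
                                 affected: Dict[str, List[List[str]]]) -> Dict[str, List[List[str]]]:
    """Populate `affected` in place with the [source, description, type] entries per target."""
    for pref in preferences:
        entry = [pref["source"], pref["description"], pref["type"]]
        target = pref["target_sources"]
        if target == "=":      # only the source of origin
            targets = [pref["source"]]
        elif target == "*":    # every known source
            targets = known_sources
        else:                  # explicitly named targets
            targets = list(target)
        for name in targets:
            affected.setdefault(name, []).append(list(entry))  # copy to avoid aliasing
    return affected


prefs = [
    {"source": "source_A", "description": "overheat", "type": "failure", "target_sources": ["bearing_1"]},
    {"source": "*", "description": "shutdown", "type": "failure", "target_sources": "*"},
    {"source": "motor_A", "description": "stall", "type": "failure", "target_sources": "="},
]
affected: Dict[str, List[List[str]]] = {}
result = find_affected_sources_sketch(prefs, ["bearing_1", "motor_A"], affected)
```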
- get_step_by_name(step_name: str)
Get a specific pipeline step by name.
- Parameters:
step_name (str) – Name of the step to retrieve. Must be one of: ‘preprocessor’, ‘method’, ‘postprocessor’, ‘thresholder’.
- Returns:
The requested pipeline component.
- Return type:
RecordLevelPreProcessorInterface | MethodInterface | PostProcessorInterface | ThresholderInterface
- Raises:
KeyError – If step_name is not found in the pipeline.
Examples
>>> pipeline = PdMPipeline(...)
>>> method = pipeline.get_step_by_name('method')
>>> preprocessor = pipeline.get_step_by_name('preprocessor')
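Name-based lookup with the documented KeyError can be sketched over a plain dict. VALID_STEPS and this free function are hypothetical stand-ins for the method, shown only to illustrate the validation order: unknown names fail before the dict is touched.

```python
from typing import Any, Dict

VALID_STEPS = ("preprocessor", "method", "postprocessor", "thresholder")


def get_step_by_name(steps: Dict[str, Any], step_name: str) -> Any:
    """Return one pipeline component, raising KeyError for unknown names."""
    if step_name not in VALID_STEPS:
        raise KeyError(f"unknown step {step_name!r}; expected one of {VALID_STEPS}")
    return steps[step_name]  # also raises KeyError if a valid step was never set
```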
- get_steps() → PdMPipelineSteps
Return the pipeline steps (preprocessor, method, postprocessor, thresholder).
- Returns:
Dictionary containing all four pipeline components.
- Return type:
PdMPipelineSteps
Examples
>>> pipeline = PdMPipeline(...)
>>> steps = pipeline.get_steps()
>>> method = steps['method']
- get_steps_as_str()
Generate a string representation of the pipeline steps for display/logging.
Creates a human-readable identifier of the pipeline configuration, useful for:
- MLflow experiment naming and logging
- Cache keys in optimization
- Result comparison and tracking
- The format is:
preprocessor_<name>_method_<name>_postprocessor_<name>_thresholder_<name>
- Returns:
Concatenated names of all four pipeline components.
- Return type:
str
Examples
>>> pipeline = PdMPipeline(...)
>>> config_str = pipeline.get_steps_as_str()
>>> print(config_str)
preprocessor_StandardScaler_method_IsolationForest_postprocessor_Smoother_thresholder_StaticThreshold
>>> # Use in MLflow experiment names:
>>> exp_name = f"pdm-{pipeline.get_steps_as_str()}"
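A minimal sketch of how the documented string format could be assembled, assuming each <name> is the component's class name. Because the constructor example passes the method as a class (IsolationForest) rather than an instance, the helper accepts either. The stub classes are hypothetical placeholders that only borrow their names from the sample output.

```python
def component_name(component) -> str:
    """Class name of an instance, or the name of a class passed directly."""
    return component.__name__ if isinstance(component, type) else type(component).__name__


def steps_as_str(steps: dict) -> str:
    order = ("preprocessor", "method", "postprocessor", "thresholder")
    return "_".join(f"{key}_{component_name(steps[key])}" for key in order)


# Hypothetical stub classes that only borrow names from the sample output.
class StandardScaler: ...
class IsolationForest: ...
class Smoother: ...
class StaticThreshold: ...


config_str = steps_as_str({
    "preprocessor": StandardScaler(),
    "method": IsolationForest,  # passed as a class, as in the constructor example
    "postprocessor": Smoother(),
    "thresholder": StaticThreshold(),
})
```

Fixing the key order, rather than iterating the dict, keeps the identifier stable if the steps dict is built in a different order.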
- old_extract_failure_dates_for_source(source) → list[Timestamp]
- class pdmlabs.pipeline.pipeline.PdMPipelineSteps
Bases: TypedDict
Type definition for the four-stage anomaly detection pipeline.
- preprocessor
Transforms raw features (e.g., normalization, feature engineering, windowing). Fitted on training data, applied to test data.
- Type:
RecordLevelPreProcessorInterface
- method
Anomaly detection model that returns anomaly scores. Can be supervised, semi-supervised, or unsupervised depending on availability of labels.
- Type:
MethodInterface
- postprocessor
Refines anomaly scores (e.g., smoothing, source fusion, aggregation). Applied after method predictions.
- Type:
PostProcessorInterface
- thresholder
Converts anomaly scores to binary predictions using a decision threshold. Can be fixed, adaptive, or learned.
- Type:
ThresholderInterface
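For readers unfamiliar with TypedDict, a structurally similar definition is sketched below. The Protocol classes are hypothetical stand-ins for the pdmlabs interfaces, and their method names are assumptions. A TypedDict only constrains keys and value types for static type checkers; at runtime the value is an ordinary dict.

```python
from typing import List, Protocol, TypedDict


# Hypothetical structural stand-ins for the pdmlabs interfaces; method names are assumptions.
class PreProcessorLike(Protocol):
    def fit(self, values: List[float]) -> None: ...
    def transform(self, values: List[float]) -> List[float]: ...


class MethodLike(Protocol):
    def fit(self, values: List[float]) -> None: ...
    def score(self, values: List[float]) -> List[float]: ...


class PostProcessorLike(Protocol):
    def process(self, scores: List[float]) -> List[float]: ...


class ThresholderLike(Protocol):
    def predict(self, scores: List[float]) -> List[int]: ...


class PipelineStepsSketch(TypedDict):
    preprocessor: PreProcessorLike
    method: MethodLike
    postprocessor: PostProcessorLike
    thresholder: ThresholderLike


class PassThrough:
    """Implements every method above, so it structurally matches all four Protocols."""

    def fit(self, values: List[float]) -> None:
        pass

    def transform(self, values: List[float]) -> List[float]:
        return values

    def score(self, values: List[float]) -> List[float]:
        return values

    def process(self, scores: List[float]) -> List[float]:
        return scores

    def predict(self, scores: List[float]) -> List[int]:
        return [int(s > 0.5) for s in scores]


p = PassThrough()
steps: PipelineStepsSketch = {"preprocessor": p, "method": p, "postprocessor": p, "thresholder": p}
```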