pdmlabs.pipeline.pipeline

PdMPipeline: Core data processing and component orchestration for predictive maintenance.

This module defines the PdMPipeline class, which orchestrates the complete anomaly detection pipeline for predictive maintenance experiments. A pipeline combines:

  • Dataset (with features, events, preferences, evaluation parameters)

  • Processing steps: preprocessor → method → postprocessor → thresholder

  • Event mapping logic (failures, resets, source relationships)

The pipeline is the contract between raw data and experiment execution. It encapsulates:

  • The transformation steps applied to features

  • How anomalies are detected (via the method)

  • How anomaly scores are refined (postprocessor)

  • How scores are converted to binary predictions (thresholder)

  • Event semantics (what constitutes a failure, reset, or normal boundary)

Architecture:

Raw Data (DataFrame)
  ↓
[Preprocessor] - Normalization, feature engineering, windowing
  ↓
[Method] - Anomaly detection (returns anomaly scores)
  ↓
[Postprocessor] - Score refinement (smoothing, aggregation, fusion)
  ↓
[Thresholder] - Binary decision boundary (0/1 predictions)
  ↓
Predictions (binary labels)
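The four stages compose as plain function application. The following is a minimal sketch, assuming each stage is a simple callable; the functions preprocess, score, postprocess, and threshold are illustrative stand-ins and not the pdmlabs interfaces:

```python
from statistics import mean, pstdev


def preprocess(values):
    # Hypothetical preprocessor: z-score normalization.
    mu = mean(values)
    sigma = pstdev(values) or 1.0  # avoid division by zero on constant input
    return [(v - mu) / sigma for v in values]


def score(values):
    # Hypothetical method: the anomaly score is the absolute normalized value.
    return [abs(v) for v in values]


def postprocess(scores, window=2):
    # Hypothetical postprocessor: trailing moving average to smooth scores.
    smoothed = []
    for i in range(len(scores)):
        chunk = scores[max(0, i - window + 1): i + 1]
        smoothed.append(sum(chunk) / len(chunk))
    return smoothed


def threshold(scores, cutoff=1.0):
    # Hypothetical thresholder: static cutoff producing 0/1 predictions.
    return [1 if s > cutoff else 0 for s in scores]


raw = [10.0, 10.2, 9.9, 10.1, 25.0, 24.0]
predictions = threshold(postprocess(score(preprocess(raw))))
```

Each stage only sees the output of the previous one, which is why the steps can be swapped independently.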

Key Concepts:

Events: Timestamps with type, description, and source that define:
  • Failures: when equipment actually fails

  • Resets: when systems restart/reinitialize episodes

  • Sources: which subsystem/sensor an event applies to

Event Preferences: Rules for how events propagate to target sources:
  • Direct (source → target_source)

  • Broadcast (* applies to all targets)

  • Same-source (= only to same source)

Caching: Event extraction is cached on first call for performance.
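One common way to get compute-once behavior in Python is functools.cached_property. The sketch below uses a toy class, EventIndex, which is hypothetical and not part of pdmlabs; it illustrates the caching idea only and does not describe how PdMPipeline builds its cache:

```python
from functools import cached_property


class EventIndex:
    """Toy event index that groups timestamps by source on first access."""

    def __init__(self, events):
        self._events = events      # list of (source, timestamp) pairs
        self.build_count = 0       # counts how often the index is built

    @cached_property
    def by_source(self):
        # Runs once; the result is stored on the instance afterwards.
        self.build_count += 1
        index = {}
        for source, timestamp in self._events:
            index.setdefault(source, []).append(timestamp)
        return index

    def dates_for(self, source):
        # Dictionary lookup against the cached index.
        return self.by_source.get(source, [])


idx = EventIndex([("bearing_1", "2024-01-10"), ("motor_A", "2024-01-12")])
first = idx.dates_for("bearing_1")
second = idx.dates_for("motor_A")
```

After both calls, build_count is still 1: the second lookup reuses the index built by the first.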

Classes

PdMPipeline(steps, dataset, auc_resolution)

Orchestrates the complete predictive maintenance anomaly detection pipeline.

PdMPipelineSteps

Type definition for the four-stage anomaly detection pipeline.

class pdmlabs.pipeline.pipeline.PdMPipeline(steps: pdmlabs.pipeline.pipeline.PdMPipelineSteps, dataset: dict, auc_resolution: int, experiment_type=<class 'pdmlabs.method.semi_supervised_method.SemiSupervisedMethodInterface'>)

Bases: object

Orchestrates the complete predictive maintenance anomaly detection pipeline.

A PdMPipeline combines a dataset with four processing steps (preprocessor, method, postprocessor, thresholder) to form a complete anomaly detection system. It also manages event mappings (failures, resets) and evaluation parameters.

The pipeline serves as the contract between raw data and experiment execution. It:

  • Defines what transformations apply to features

  • Specifies which anomaly detection method to use

  • Caches event extraction results for performance

  • Provides utilities to query failure/reset dates by device source

dataset

Complete dataset specification with keys:

  • ‘event_data’: DataFrame with columns [date, type, description, source]

  • ‘event_preferences’: Dict mapping ‘failure’ and ‘reset’ to event rules

  • ‘dates’: Target dates/indices (for temporal alignment)

  • ‘predictive_horizon’: Lead time for detection [int or list]

  • ‘slide’: Sliding window size for VUS metrics

  • ‘lead’: Detection lead time threshold

  • ‘beta’: Weighting parameter for metrics

  • ‘historic_data’: Training data files/DataFrames

  • ‘target_data’: Test data files/DataFrames

  • ‘historic_sources’: Source labels for training data

  • ‘target_sources’: Source labels for test data
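Because the dataset is a plain dict, a quick key check before constructing a pipeline can catch typos early. The helper below is a hypothetical sketch, not a pdmlabs function; the key names come from the description above, while treating every key as mandatory and the sample values are assumptions made for illustration:

```python
# Key names are taken from the attribute description; whether all of them
# are mandatory is an assumption of this sketch.
EXPECTED_KEYS = {
    "event_data", "event_preferences", "dates", "predictive_horizon",
    "slide", "lead", "beta", "historic_data", "target_data",
    "historic_sources", "target_sources",
}


def missing_dataset_keys(dataset: dict) -> list[str]:
    """Return the expected keys that are absent from the dataset, sorted."""
    return sorted(EXPECTED_KEYS - set(dataset))


# Deliberately incomplete example with made-up values.
partial_dataset = {
    "event_preferences": {"failure": [], "reset": []},
    "predictive_horizon": 24,
    "slide": 10,
    "lead": 2,
    "beta": 1.0,
    "historic_sources": ["bearing_1"],
    "target_sources": ["bearing_1"],
}
missing = missing_dataset_keys(partial_dataset)
```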

Type:

dict

steps

Dictionary with preprocessor, method, postprocessor, thresholder.

Type:

PdMPipelineSteps

auc_resolution

Resolution for threshold sweep in evaluation (e.g., 100).

Type:

int
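To make the idea of a sweep resolution concrete, the sketch below assumes the resolution is the number of evenly spaced candidate thresholds between the lowest and highest anomaly score. The function threshold_grid is hypothetical, and how pdmlabs actually spaces its thresholds is not stated on this page:

```python
def threshold_grid(scores, resolution):
    """Return `resolution` evenly spaced thresholds spanning the score range."""
    lo, hi = min(scores), max(scores)
    if resolution < 2 or hi == lo:
        # Degenerate cases: a single threshold is all that makes sense.
        return [lo]
    step = (hi - lo) / (resolution - 1)
    return [lo + i * step for i in range(resolution)]


grid = threshold_grid([0.1, 0.4, 0.9], resolution=5)
```

A higher resolution gives a finer curve at the cost of more evaluations per experiment.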

experiment_type

Default method class, used if no method is provided in steps. Defaults to SemiSupervisedMethodInterface.

event_preferences

Parsed event rules for failures/resets.

Type:

dict

event_data

Parsed event log with timestamps and sources.

Type:

pd.DataFrame

preprocessor, method, postprocessor, thresholder

Pipeline component instances.

Examples

>>> from pdmlabs.method.isolation_forest import IsolationForest
>>> from pdmlabs.preprocessing.no_preprocessor import NoPreprocessor
>>> from pdmlabs.postprocessing.no_postprocessor import NoPostprocessor
>>> from pdmlabs.thresholding.static_threshold import StaticThreshold
>>>
>>> steps = {
...     'preprocessor': NoPreprocessor(event_preferences={...}),
...     'method': IsolationForest,
...     'postprocessor': NoPostprocessor(event_preferences={...}),
...     'thresholder': StaticThreshold(threshold_value=0.5, event_preferences={...})
... }
>>> pipeline = PdMPipeline(
...     steps=steps,
...     dataset=my_dataset,
...     auc_resolution=100
... )
>>> failure_dates = pipeline.extract_failure_dates_for_source('bearing_1')
>>> reset_dates = pipeline.extract_reset_dates_for_source('bearing_1')
extract_failure_dates_for_source(source: str) → list[Timestamp]

Extract all failure timestamps for a specific source (device/subsystem).

Queries the event log to find all events matching the failure preferences that should apply to the given source. Uses caching for performance on repeated calls.

Event preferences define rules like:

  • Direct: specific source fires failure event

  • Broadcast (*): all sources affected when event fires

  • Group: multiple source IDs affected by same event

Parameters:

source (str) – Source/device identifier (e.g., ‘bearing_1’, ‘motor_A’).

Returns:

Sorted list of unique failure timestamps for this source.

Empty list if no failures defined or source not in preferences.

Return type:

list[pd.Timestamp]
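The "sorted list of unique timestamps" contract can be illustrated with plain Python. The sketch below is an assumption-laden stand-in: dates_for_source is a hypothetical helper, the event log is a list of dicts rather than a DataFrame, and the matching rule (exact match on source, description, and type) is chosen for illustration only:

```python
from datetime import datetime


def dates_for_source(event_log, applicable_events):
    """Collect unique timestamps of log rows matching the applicable events.

    applicable_events holds [source, description, type] entries that are
    assumed to apply to the queried source.
    """
    applicable = {tuple(event) for event in applicable_events}
    hits = {
        row["date"]
        for row in event_log
        if (row["source"], row["description"], row["type"]) in applicable
    }
    return sorted(hits)  # the set removes duplicates, sorted() orders them


log = [
    {"date": datetime(2024, 1, 15, 9, 15), "type": "failure",
     "description": "overheat", "source": "bearing_1"},
    {"date": datetime(2024, 1, 10, 14, 30), "type": "failure",
     "description": "overheat", "source": "bearing_1"},
    {"date": datetime(2024, 1, 10, 14, 30), "type": "failure",
     "description": "overheat", "source": "bearing_1"},  # duplicate row
    {"date": datetime(2024, 1, 12, 0, 0), "type": "reset",
     "description": "restart", "source": "bearing_1"},
]
failure_dates = dates_for_source(log, [["bearing_1", "overheat", "failure"]])
```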

Caching:

Failure dates are cached on first call via expanded_event_preferences. Subsequent calls are O(1) lookups. Cache persists across calls to extract_reset_dates_for_source.

Examples

>>> pipeline = PdMPipeline(...)
>>> failure_times = pipeline.extract_failure_dates_for_source('bearing_1')
>>> print(f"Failures at: {failure_times}")
Failures at: [Timestamp('2024-01-10 14:30:00'), Timestamp('2024-01-15 09:15:00')]
>>> # For broadcast events (* applies to all sources):
>>> failures_motor = pipeline.extract_failure_dates_for_source('motor_A')
>>> failures_bearing = pipeline.extract_failure_dates_for_source('bearing_1')
>>> # Both may include times from * events
extract_reset_dates_for_source(source) → list[Timestamp]

Extract all reset timestamps for a specific source (device/subsystem).

Queries the event log to find all events matching the reset preferences that should apply to the given source. Reset events typically mark:

  • Equipment restarts / reinitializations

  • Episode boundaries (e.g., new mission, device replacement)

  • Scenario transitions for evaluation

Similar to extract_failure_dates_for_source, but applies the ‘reset’ rules from event_preferences to event_data. Uses the same caching mechanism.

Parameters:

source (str) – Source/device identifier (e.g., ‘bearing_1’, ‘motor_A’).

Returns:

Sorted list of unique reset timestamps for this source.

Empty list if no resets defined or source not in preferences.

Return type:

list[pd.Timestamp]

Caching:

Resets are cached alongside failures on first call. Repeated calls are O(1).

Examples

>>> pipeline = PdMPipeline(...)
>>> reset_times = pipeline.extract_reset_dates_for_source('bearing_1')
>>> print(f"Resets at: {reset_times}")
Resets at: [Timestamp('2024-01-05 08:00:00'), Timestamp('2024-01-20 16:30:00')]
>>> # Use to segment data into episodes:
>>> failures = pipeline.extract_failure_dates_for_source('bearing_1')
>>> resets = pipeline.extract_reset_dates_for_source('bearing_1')
>>> # Resets define episode boundaries, failures are ground truth within each episode
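
Segmenting by resets can be done with a binary search over the sorted reset list. This is a minimal sketch using the timestamps from the examples above; episode_index is a hypothetical helper, and counting an event at the exact reset time as part of the new episode is an assumption:

```python
from bisect import bisect_right
from datetime import datetime


def episode_index(timestamp, resets):
    """Number of resets at or before timestamp (0 = before the first reset).

    Requires `resets` to be sorted in ascending order.
    """
    return bisect_right(resets, timestamp)


resets = [datetime(2024, 1, 5, 8, 0), datetime(2024, 1, 20, 16, 30)]
failures = [datetime(2024, 1, 10, 14, 30), datetime(2024, 1, 15, 9, 15)]

# Group failures by the episode they fall into.
episodes = {}
for failure in failures:
    episodes.setdefault(episode_index(failure, resets), []).append(failure)
```

Here both failures fall between the two resets, so they land in the same episode.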
find_affected_sources(given_expanded_preferences, get_affected) → dict[str, List[List[str]]]

Map event preferences to affected sources (internal caching utility).

Processes expanded event preferences and populates a dictionary indicating which sources are affected by each event. Handles three preference types:

  1. Same-source (=): event affects only its source of origin

  2. Broadcast (*): event affects all known sources

  3. List: event affects specific named target sources

This method is called once per pipeline to build a cache used by extract_failure_dates_for_source() and extract_reset_dates_for_source().

Parameters:
  • given_expanded_preferences (list) – List of expanded preference objects (typically all ‘failure’ or ‘reset’ preferences).

  • get_affected (dict) – Dictionary being populated with structure: {source_name: [[event_source, event_description, event_type], …]}

Returns:

The same dict, populated in place.

Maps each source to a list of [source, description, type] entries indicating which events apply to that source.

Return type:

dict[str, List[List[str]]]

Examples

>>> preferences = [...]  # expanded preferences
>>> affected = {}  # Will be populated
>>> result = pipeline.find_affected_sources(preferences, affected)
>>> print(result['bearing_1'])  # Events affecting this source
[['source_A', 'overheat', 'failure'], ['*', 'shutdown', 'failure']]
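
The shape of the resulting mapping can be sketched independently of pdmlabs. In the example below, build_affected and the preference format (a dict with source, description, type, and target keys) are assumptions made for illustration; the actual structure of expanded preference objects is not shown on this page:

```python
def build_affected(preferences, all_sources, affected=None):
    """Populate {target_source: [[event_source, description, type], ...]}."""
    affected = {} if affected is None else affected
    for pref in preferences:
        event = [pref["source"], pref["description"], pref["type"]]
        if pref["target"] == "*":
            targets = all_sources          # broadcast: every known source
        elif pref["target"] == "=":
            targets = [pref["source"]]     # same-source: origin only
        else:
            targets = pref["target"]       # explicit list of target names
        for target in targets:
            affected.setdefault(target, []).append(list(event))
    return affected


prefs = [
    {"source": "source_A", "description": "overheat", "type": "failure",
     "target": ["bearing_1"]},
    {"source": "plant", "description": "shutdown", "type": "failure",
     "target": "*"},
    {"source": "motor_A", "description": "trip", "type": "failure",
     "target": "="},
]
cache = {}
result = build_affected(prefs, ["bearing_1", "motor_A"], cache)
```

Passing the dictionary in and returning the same object mirrors the populate-and-return pattern described for get_affected.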
get_step_by_name(step_name: str)

Get a specific pipeline step by name.

Parameters:

step_name (str) – Name of the step to retrieve. Must be one of: ‘preprocessor’, ‘method’, ‘postprocessor’, ‘thresholder’.

Returns:

The requested pipeline component.

Return type:

RecordLevelPreProcessorInterface | MethodInterface | PostProcessorInterface | ThresholderInterface

Raises:

KeyError – If step_name is not found in the pipeline.

Examples

>>> pipeline = PdMPipeline(...)
>>> method = pipeline.get_step_by_name('method')
>>> preprocessor = pipeline.get_step_by_name('preprocessor')
get_steps() → PdMPipelineSteps

Return the pipeline steps (preprocessor, method, postprocessor, thresholder).

Returns:

Dictionary containing all four pipeline components.

Return type:

PdMPipelineSteps

Examples

>>> pipeline = PdMPipeline(...)
>>> steps = pipeline.get_steps()
>>> method = steps['method']
get_steps_as_str()

Generate a string representation of the pipeline steps for display/logging.

Creates a human-readable identifier of the pipeline configuration, useful for:

  • MLflow experiment naming and logging

  • Cache keys in optimization

  • Result comparison and tracking

The format is:

preprocessor_<name>_method_<name>_postprocessor_<name>_thresholder_<name>

Returns:

Concatenated names of all four pipeline components.

Return type:

str

Examples

>>> pipeline = PdMPipeline(...)
>>> config_str = pipeline.get_steps_as_str()
>>> print(config_str)
preprocessor_StandardScaler_method_IsolationForest_postprocessor_Smoother_thresholder_StaticThreshold
>>> # Use in MLflow experiment names:
>>> exp_name = f"pdm-{pipeline.get_steps_as_str()}"
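
The documented format can be reproduced with a few lines of Python. The sketch below assumes that each <name> is the class name of the corresponding step; steps_as_str and the empty placeholder classes are hypothetical and exist only to make the example runnable:

```python
STEP_ORDER = ("preprocessor", "method", "postprocessor", "thresholder")


def steps_as_str(steps):
    """Join '<step>_<ClassName>' parts in pipeline order with underscores."""
    parts = []
    for name in STEP_ORDER:
        step = steps[name]
        # Accept either a class or an instance for each step.
        cls = step if isinstance(step, type) else type(step)
        parts.append(f"{name}_{cls.__name__}")
    return "_".join(parts)


# Placeholder classes standing in for real pipeline components.
class StandardScaler: ...
class IsolationForest: ...
class Smoother: ...
class StaticThreshold: ...


config_str = steps_as_str({
    "preprocessor": StandardScaler(),
    "method": IsolationForest,
    "postprocessor": Smoother(),
    "thresholder": StaticThreshold(),
})
```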
old_extract_failure_dates_for_source(source) → list[Timestamp]
class pdmlabs.pipeline.pipeline.PdMPipelineSteps

Bases: TypedDict

Type definition for the four-stage anomaly detection pipeline.

preprocessor

Transforms raw features (e.g., normalization, feature engineering, windowing). Fitted on training data, applied to test data.

Type:

RecordLevelPreProcessorInterface

method

Anomaly detection model that returns anomaly scores. Can be supervised, semi-supervised, or unsupervised depending on availability of labels.

Type:

MethodInterface

postprocessor

Refines anomaly scores (e.g., smoothing, source fusion, aggregation). Applied after method predictions.

Type:

PostProcessorInterface

thresholder

Converts anomaly scores to binary predictions using a decision threshold. Can be fixed, adaptive, or learned.

Type:

ThresholderInterface

method: MethodInterface
postprocessor: PostProcessorInterface
preprocessor: RecordLevelPreProcessorInterface
thresholder: ThresholderInterface
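
For readers unfamiliar with TypedDict, the sketch below shows what a four-key step dictionary looks like at runtime. StepsSketch and missing_steps are hypothetical names, and typing.Any stands in for the real pdmlabs interface types, which are not reproduced here:

```python
from typing import Any, TypedDict


class StepsSketch(TypedDict):
    # Any is a placeholder for the actual interface types.
    preprocessor: Any
    method: Any
    postprocessor: Any
    thresholder: Any


def missing_steps(steps: dict) -> list[str]:
    """List the declared step names that are absent, in declaration order."""
    return [name for name in StepsSketch.__annotations__ if name not in steps]


complete: StepsSketch = {
    "preprocessor": object(),
    "method": object(),
    "postprocessor": object(),
    "thresholder": object(),
}
gaps = missing_steps({"method": object()})
```

A TypedDict is an ordinary dict at runtime; the declared keys are enforced by static type checkers, not by the interpreter, which is why an explicit check like missing_steps can still be useful.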