𧬠Implementing Survival Analysis Methods#
Survival analysis methods model time-to-failure while handling censored data (systems that havenβt failed yet). Theyβre ideal for real-world scenarios where failure times are unknown for some equipment.
When to Use:
- You have time-to-failure data with censoring indicators
- You want to model both time AND event occurrence jointly
- You need to handle right-censored data (equipment still operational)
- Works only with Supervised_SA_PdMExperiment
Key Requirement: Training labels are tuples (time, event) where event β {0=censored, 1=failed}
Survival Analysis Concepts#
Survival Data Structure:
# Each sample has TWO labels:
labels = [
(100, 1), # Failed at time 100
(200, 0), # Censored (still alive) at time 200
(150, 1), # Failed at time 150
(250, 0), # Censored at time 250
]
Time: When failure occurred or when observation ended
Event: 1 if failure occurred, 0 if censored (still operating)
Cox Proportional Hazards:
Models the hazard function (instantaneous failure rate):
Combines baseline hazard with feature effects
Outputs survival curves: probability of surviving beyond time t
Interface Overview#
Survival methods inherit from SupervisedMethodInterface:
from pdmlabs.method.supervised_method import SupervisedMethodInterface
from pdmlabs.pdm_evaluation_types.types import EventPreferences
class MySurvivalMethod(SupervisedMethodInterface):
def __init__(self, event_preferences: EventPreferences, **kwargs):
super().__init__(event_preferences=event_preferences)
Key Characteristics:
- Has fit() method on (time, event) tuples
- Models both failure time AND occurrence
- Returns structured prediction: survival curves
- Works with Supervised_SA_PdMExperiment only
Example: Cox Proportional Hazards#
The CoxPH is a reference implementation of survival analysis.
File Location: pdmlabs/method/CoxModel.py
What It Does: - Fits Cox PH model to time-to-failure data with censoring - Maintains separate models per data source - Returns survival function predictions (probability of surviving past each time point)
Implementation Details:
class CoxPH(SupervisedMethodInterface):
def __init__(self, event_preferences: EventPreferences,
save_model=False, *args, **kwargs):
super().__init__(event_preferences=event_preferences)
self.model_per_source = {}
self.avail_times_per_source = {} # Store time points for curves
self.initial_args = args
self.initial_kwargs = kwargs
self.save_model = save_model
Training Phase:
def fit(self, historic_data, historic_sources, event_data, anomaly_ranges):
"""
Train Cox PH model on survival data.
anomaly_ranges structure for survival analysis:
- List per source
- Each element is list of (rul, event) tuples
"""
for data, source, labels in zip(historic_data, historic_sources, anomaly_ranges):
# Convert labels [(rul1, event1), (rul2, event2), ...] to sklearn format
from sksurv.util import Surv
rul_times = [lb[0] for lb in labels]
events = [lb[1] for lb in labels]
y_df = pd.DataFrame({'event': events, 'RUL': rul_times})
y = Surv.from_dataframe("event", "RUL", y_df)
# Train Cox PH model
model = CoxPHSurvivalAnalysis(*self.initial_args, **self.initial_kwargs)
model.fit(data, y)
self.model_per_source[source] = model
self.avail_times_per_source[source] = np.unique(rul_times)
Prediction Phase:
def predict(self, target_data, source, event_data):
"""
Predict survival curves for test data.
Returns:
Array of shape (n_samples, 2, n_timepoints)
- First slice: survival probabilities
- Second slice: time points
"""
model = self.model_per_source[source]
times = self.avail_times_per_source[source]
# Get survival functions at each time point
survival_funcs = model.predict_survival_function(target_data)
# Stack into (n, 2, T) format
result = np.stack([survival_funcs, np.tile(times, ...)], axis=1)
return result
Creating Your Own Survival Analysis Method#
Follow this template:
Step 1: Create File
Create pdmlabs/method/my_survival_method.py:
import pandas as pd
import numpy as np
from sksurv.linear_model import CoxPHSurvivalAnalysis # Survival model
from pdmlabs.method.supervised_method import SupervisedMethodInterface
from pdmlabs.pdm_evaluation_types.types import EventPreferences
class MySurvivalMethod(SupervisedMethodInterface):
"""Survival analysis for predictive maintenance with censoring.
This method models time-to-failure while accounting for censored
observations (equipment that hasn't failed yet).
"""
def __init__(self,
event_preferences: EventPreferences,
alpha: float = 0.05,
*args,
**kwargs):
super().__init__(event_preferences=event_preferences)
self.alpha = alpha # Confidence interval level
self.initial_args = args
self.initial_kwargs = kwargs
self.model_per_source = {}
self.available_times_per_source = {}
Step 2: Implement fit()
Train on survival data with censoring:
def fit(self, historic_data: list[pd.DataFrame],
historic_sources: list[str],
event_data: pd.DataFrame,
anomaly_ranges: list[list]) -> None:
"""
Train survival model for each source.
Args:
historic_data: Training features (one per source)
historic_sources: Source names
event_data: Event log
anomaly_ranges: **Survival tuples** [(time1, event1), (time2, event2), ...]
- time: time to failure or censoring
- event: 1 if failed, 0 if censored
Censoring example:
- (100, 1) β Failed at time 100
- (200, 0) β Still operational at time 200 (censored)
"""
from sksurv.util import Surv
for data, source, labels in zip(historic_data, historic_sources, anomaly_ranges):
# Extract times and events from tuples
times = [lb[0] for lb in labels]
events = [lb[1] for lb in labels]
# Convert to sksurv format
y_df = pd.DataFrame({
'event': events,
'time': times
})
y = Surv.from_dataframe("event", "time", y_df)
# Train survival model
model = CoxPHSurvivalAnalysis(alpha=self.alpha, *self.initial_args, **self.initial_kwargs)
model.fit(data, y)
self.model_per_source[source] = model
self.available_times_per_source[source] = np.unique(np.sort(times))
Step 3: Implement predict() - Survival Curves
Predict survival probabilities:
def predict(self, target_data: pd.DataFrame, source: str, event_data: pd.DataFrame):
"""
Predict survival curves for test samples.
Args:
target_data: Test features
source: Source identifier
event_data: Event log
Returns:
Array of shape (n_samples, 2, n_timepoints):
- [:, 0, :] = survival probabilities at each time
- [:, 1, :] = time points
"""
if source not in self.model_per_source:
raise ValueError(f"No model for source '{source}'")
model = self.model_per_source[source]
times = self.available_times_per_source[source]
# Get survival function (probability of survival at each time point)
survival_functions = model.predict_survival_function(target_data)
# survival_functions has shape (n_samples, n_timepoints)
n_samples = survival_functions.shape[0]
n_times = len(times)
# Stack into output format: (n_samples, 2, n_times)
# First dimension: survival probabilities
# Second dimension: time values
result = np.stack([
survival_functions.values, # Survival probabilities
np.tile(times, (n_samples, 1)) # Repeated times
], axis=1)
return result
Step 4: Implement predict_one() - Single Sample
Predict survival curve for one sample:
def predict_one(self, new_sample: pd.Series, source: str, is_event: bool) -> float:
"""
Predict survival curve for a single sample.
Args:
new_sample: Single features as Series
source: Source identifier
is_event: Event flag (context)
Returns:
Survival curve data
"""
if source not in self.model_per_source:
raise ValueError(f"No model for source '{source}'")
model = self.model_per_source[source]
times = self.available_times_per_source[source]
sample_array = new_sample.to_numpy().reshape(1, -1)
survival_func = model.predict_survival_function(sample_array)
n_times = len(times)
result = np.stack([
survival_func.values.flatten(),
times
], axis=0)
return result
Step 5: Implement remaining methods
def get_params(self) -> dict:
"""Return hyperparameters."""
first_source = list(self.model_per_source.keys())[0]
model = self.model_per_source[first_source]
return {
**model.get_params(),
'alpha': self.alpha,
}
def __str__(self) -> str:
return 'MySurvivalMethod'
def get_library(self) -> str:
return 'no_save'
def get_all_models(self):
return self.model_per_source
Survival Data Preparation#
Creating Survival Labels from Event Data:
If you have event timestamps, compute survival tuples:
def compute_survival_label(sample_time, failure_time, observation_end):
"""
Create survival label (time, event).
Args:
sample_time: When sample was collected
failure_time: When failure occurred (or None if not yet happened)
observation_end: When observation window ended
Returns:
(time_to_event, event_indicator)
"""
if failure_time is not None:
# Equipment failed - record time to failure
time_to_event = failure_time - sample_time
return (time_to_event, 1) # event=1 (failure occurred)
else:
# Equipment still operational - censored
time_to_event = observation_end - sample_time
return (time_to_event, 0) # event=0 (censored/right-censored)
Testing Your Implementation#
With your survival dataset prepared, test your custom survival method using run_experiment:
from pdmlabs.utils.dataset import Dataset
from pdmlabs.experiment.batch.SA_experiment import Supervised_SA_PdMExperiment
from pdmlabs.RunExperiment import run_experiment
from my_survival_method import MySurvivalMethod
from pdmlabs.pdm_evaluation_types.types import EventPreferences
# 1. Load data (must have survival tuples with censoring indicators)
df = pd.read_csv('your_survival_data.csv')
dataset_handler = Dataset(
data=df,
datetime_column="timestamp",
source_column="source",
train_sources=0.6,
val_sources=0.2,
test_sources=0.2
)
ds_sa, _ = dataset_handler.get_SA_dataset()
# 2. Define hyperparameters for your survival method
method_param_space = {
'alpha': [0.01, 0.05, 0.1],
}
# 3. Run experiment with run_experiment
best_params = run_experiment(
dataset=ds_sa,
methods=[MySurvivalMethod],
param_space_dict_per_method=[method_param_space],
method_names=['MySurvivalMethod'],
experiments=[Supervised_SA_PdMExperiment],
experiment_names=['Survival Analysis'],
MAX_RUNS=12,
MAX_JOBS=2,
INITIAL_RANDOM=2,
profile_size=10,
optimization_param='C_index',
maximize=True
)
# 5. Check results
print(f"Best parameters: {best_params[0]}")
Next Steps#
Review
CoxModel(CoxPH) inpdmlabs/method/CoxModel.pyExplore sksurv documentation (https://scikit-survival.readthedocs.io/)
Check
Supervised_SA_PdMExperimentinpdmlabs/experiment/batch/Review dataset SA preparation in
pdmlabs/utils/dataset.py::get_SA_dataset()