"""
Built-in fault detector implementations for the Tennessee Eastman Process.
This module provides several ready-to-use fault detectors ranging from
simple threshold checks to statistical methods. These serve as both
practical tools and examples for implementing custom detectors.
Available Detectors:
- ThresholdDetector: Fast safety limit checking
- EWMADetector: Exponentially weighted moving average change detection
- CUSUMDetector: Cumulative sum control chart for drift detection
- PCADetector: Principal Component Analysis with T² and SPE statistics
- StatisticalDetector: Multi-statistic ensemble detector
- SlidingWindowDetector: Simple sliding window comparison
- CompositeDetector: Multi-detector voting ensemble
- PassthroughDetector: Always-normal baseline (for testing/comparison)
Example:
>>> from tep.detector_base import FaultDetectorRegistry
>>>
>>> # Create a detector
>>> detector = FaultDetectorRegistry.create("threshold")
>>>
>>> # Or with custom parameters
>>> detector = FaultDetectorRegistry.create("pca", window_size=300)
"""
import numpy as np
from typing import Dict, List, Optional, Any, Tuple
from .detector_base import (
BaseFaultDetector,
DetectionResult,
FaultDetectorRegistry,
register_detector,
)
from .constants import SAFETY_LIMITS, NUM_MEASUREMENTS
# =============================================================================
# Threshold Detector
# =============================================================================
@register_detector(
name="threshold",
description="Fast threshold-based detection using process safety limits"
)
class ThresholdDetector(BaseFaultDetector):
"""
Simple threshold detector based on process safety limits.
Checks measurements against known safe operating ranges and flags
violations. Very fast (no window needed) and useful as a first
line of defense.
This detector can identify when the process is moving toward unsafe
conditions, which often indicates specific fault types.
Attributes:
limits: Dict mapping XMEAS index to (low, high) thresholds
fault_mapping: Dict mapping violated sensor to likely fault class
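Example (illustrative; a real XMEAS vector comes from the running
simulator, and the values below are made up):
>>> detector = FaultDetectorRegistry.create("threshold")
>>> xmeas = np.zeros(41)
>>> xmeas[7], xmeas[11], xmeas[14] = 10.0, 5.0, 5.0  # levels in range
>>> xmeas[8] = 180.0  # reactor temperature above the 170 deg C limit
>>> detector.detect(xmeas, step=0).fault_class  # maps to IDV(4)
4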
"""
name = "threshold"
description = "Threshold-based safety limit detector"
window_size = 1
detect_interval = 1
async_mode = False
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Default thresholds from process safety limits
# Index is 0-based XMEAS index
self.limits: Dict[int, Tuple[Optional[float], Optional[float]]] = {
6: (None, 2900.0), # Reactor pressure max (kPa)
7: (3.0, 22.0), # Reactor level (%)
8: (None, 170.0), # Reactor temperature max (deg C)
11: (2.0, 11.0), # Separator level (%)
14: (2.0, 7.0), # Stripper level (%)
}
# Map sensor violations to likely fault classes
self.fault_mapping: Dict[int, int] = {
6: 4, # Pressure issues -> IDV(4) cooling water
7: 1, # Reactor level -> IDV(1) feed ratio
8: 4, # Temperature -> IDV(4) cooling water
11: 7, # Separator level -> IDV(7) header pressure
14: 6, # Stripper level -> IDV(6) A feed loss
}
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Check measurements against thresholds."""
violations = []
max_severity = 0.0
for idx, (low, high) in self.limits.items():
val = xmeas[idx]
severity = 0.0
if low is not None and val < low:
severity = (low - val) / max(abs(low), 1.0)
violations.append((idx, severity))
elif high is not None and val > high:
severity = (val - high) / max(abs(high), 1.0)
violations.append((idx, severity))
max_severity = max(max_severity, severity)
if violations:
# Sort by severity
violations.sort(key=lambda x: -x[1])
top_sensor = violations[0][0]
fault_class = self.fault_mapping.get(top_sensor, 1)
# Confidence based on severity
confidence = min(0.5 + max_severity * 0.5, 0.99)
return DetectionResult(
fault_class=fault_class,
confidence=confidence,
step=step,
contributing_sensors=[v[0] for v in violations[:3]],
statistics={"max_severity": max_severity}
)
return DetectionResult(
fault_class=0,
confidence=0.9,
step=step
)
def get_parameters(self) -> Dict[str, Any]:
return {
"limits": self.limits,
"fault_mapping": self.fault_mapping,
}
# =============================================================================
# EWMA Detector
# =============================================================================
@register_detector(
name="ewma",
description="Exponentially Weighted Moving Average change detector"
)
class EWMADetector(BaseFaultDetector):
"""
EWMA-based change detection.
Tracks exponentially weighted moving averages and variances of
all measurements. Detects anomalies when current values deviate
significantly from the running statistics.
Good for detecting gradual changes and persistent deviations.
Attributes:
alpha: Smoothing factor (0-1). Higher = faster response.
threshold: Z-score threshold for anomaly detection.
warmup_steps: Steps before detection begins.
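Example (sketch; a flat signal followed by a step change):
>>> det = EWMADetector(alpha=0.1, threshold=3.0, warmup_steps=10)
>>> x = np.ones(41)
>>> for t in range(20):
...     _ = det.detect(x, step=t)  # warmup, then tracking a flat signal
>>> det.detect(1.5 * x, step=20).fault_class  # large jump -> fault
1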
"""
name = "ewma"
description = "EWMA-based change detection"
window_size = 1 # Stateful, doesn't need window buffer
detect_interval = 1
async_mode = False
def __init__(self, alpha: float = 0.1, threshold: float = 3.0,
warmup_steps: int = 100, **kwargs):
super().__init__(**kwargs)
self.alpha = alpha
self.threshold = threshold
self.warmup_steps = warmup_steps
self._ewma: Optional[np.ndarray] = None
self._ewma_var: Optional[np.ndarray] = None
self._step_count = 0
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Detect using EWMA statistics."""
self._step_count += 1
# Initialize on first call
if self._ewma is None:
self._ewma = xmeas.copy()
self._ewma_var = np.ones(NUM_MEASUREMENTS) * 0.01
return DetectionResult(-1, 0.0, step)
# Compute deviation before updating
diff = xmeas - self._ewma
std = np.sqrt(self._ewma_var + 1e-8)
z_scores = np.abs(diff) / std
# Update EWMA statistics
self._ewma = self.alpha * xmeas + (1 - self.alpha) * self._ewma
self._ewma_var = self.alpha * diff**2 + (1 - self.alpha) * self._ewma_var
# Don't flag during warmup
if self._step_count < self.warmup_steps:
return DetectionResult(-1, 0.0, step)
# Find anomalies
max_z = float(np.max(z_scores))
top_sensors = np.argsort(z_scores)[-5:][::-1].tolist()
if max_z > self.threshold:
# Confidence scales with how far above threshold
confidence = min(0.5 + (max_z - self.threshold) * 0.1, 0.99)
return DetectionResult(
fault_class=1, # Generic fault indication
confidence=confidence,
step=step,
contributing_sensors=top_sensors[:3],
statistics={
"max_z_score": max_z,
"mean_z_score": float(np.mean(z_scores)),
}
)
return DetectionResult(
fault_class=0,
confidence=min(0.5 + (self.threshold - max_z) * 0.1, 0.95),
step=step,
statistics={"max_z_score": max_z}
)
def _reset_impl(self):
"""Reset EWMA state."""
self._ewma = None
self._ewma_var = None
self._step_count = 0
def get_parameters(self) -> Dict[str, Any]:
return {
"alpha": self.alpha,
"threshold": self.threshold,
"warmup_steps": self.warmup_steps,
}
# =============================================================================
# CUSUM Detector
# =============================================================================
@register_detector(
name="cusum",
description="Cumulative Sum control chart for drift detection"
)
class CUSUMDetector(BaseFaultDetector):
"""
CUSUM (Cumulative Sum) control chart detector.
Accumulates deviations from expected values to detect persistent
shifts. Very effective for detecting slow drifts that might be
missed by instantaneous checks.
Attributes:
k: Slack parameter (allowance). Typically 0.5 * expected shift.
h: Decision threshold. Higher = fewer false alarms.
target: Target values (learned from initial data or provided).
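Example (sketch; process() buffers the baseline window before detect()
runs, and stream is a hypothetical iterable of XMEAS vectors):
>>> det = FaultDetectorRegistry.create("cusum", k=0.5, h=5.0)
>>> for t, x in enumerate(stream):
...     result = det.process(x, t)
...     if result.is_ready and result.is_fault:
...         print(t, result.statistics["max_cusum_pos"])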
"""
name = "cusum"
description = "CUSUM control chart for drift detection"
window_size = 100 # For learning baseline
detect_interval = 1
async_mode = False
def __init__(self, k: float = 0.5, h: float = 5.0, **kwargs):
super().__init__(**kwargs)
self.k = k
self.h = h
self._target: Optional[np.ndarray] = None
self._std: Optional[np.ndarray] = None
self._cusum_pos: Optional[np.ndarray] = None
self._cusum_neg: Optional[np.ndarray] = None
self._baseline_learned = False
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Detect using CUSUM statistics."""
# Learn baseline from window
if not self._baseline_learned:
if not self.window_ready:
return DetectionResult(-1, 0.0, step)
window = self.window
self._target = np.mean(window, axis=0)
self._std = np.std(window, axis=0) + 1e-8
self._cusum_pos = np.zeros(NUM_MEASUREMENTS)
self._cusum_neg = np.zeros(NUM_MEASUREMENTS)
self._baseline_learned = True
# Normalized deviation
z = (xmeas - self._target) / self._std
# Update CUSUM
self._cusum_pos = np.maximum(0, self._cusum_pos + z - self.k)
self._cusum_neg = np.maximum(0, self._cusum_neg - z - self.k)
# Check for violations
max_pos = float(np.max(self._cusum_pos))
max_neg = float(np.max(self._cusum_neg))
max_cusum = max(max_pos, max_neg)
top_pos = np.argsort(self._cusum_pos)[-3:][::-1].tolist()
top_neg = np.argsort(self._cusum_neg)[-3:][::-1].tolist()
if max_cusum > self.h:
confidence = min(0.5 + (max_cusum - self.h) / self.h * 0.3, 0.99)
# Combine top contributors (dedupe while preserving severity order)
contributing = list(dict.fromkeys(top_pos + top_neg))[:5]
return DetectionResult(
fault_class=1, # Generic drift detected
confidence=confidence,
step=step,
contributing_sensors=contributing,
statistics={
"max_cusum_pos": max_pos,
"max_cusum_neg": max_neg,
}
)
return DetectionResult(
fault_class=0,
confidence=0.9,
step=step,
statistics={
"max_cusum_pos": max_pos,
"max_cusum_neg": max_neg,
}
)
def _reset_impl(self):
"""Reset CUSUM state."""
self._target = None
self._std = None
self._cusum_pos = None
self._cusum_neg = None
self._baseline_learned = False
def get_parameters(self) -> Dict[str, Any]:
return {
"k": self.k,
"h": self.h,
}
# =============================================================================
# PCA Detector
# =============================================================================
@register_detector(
name="pca",
description="PCA-based detection with T² and SPE statistics"
)
class PCADetector(BaseFaultDetector):
"""
Principal Component Analysis detector.
Projects measurements onto principal components learned from normal
operation. Uses two statistics for detection:
- T² (Hotelling's): Variation within the principal component space
- SPE (Q): Residual variation not captured by the model
This is a classic multivariate statistical process monitoring technique.
Attributes:
n_components: Number of principal components to retain
t2_threshold: Threshold for T² statistic
spe_threshold: Threshold for SPE statistic
auto_train: If True, train on first full window
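Example (sketch; normal_data stands in for an (n_samples, 41) array
collected during fault-free operation, xmeas for a live measurement):
>>> det = PCADetector(n_components=10)
>>> det.train(normal_data)
>>> det.is_trained
True
>>> result = det.detect(xmeas, step=0)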
"""
name = "pca"
description = "PCA-based T² and SPE fault detection"
window_size = 200
window_sample_interval = 1
detect_interval = 10
async_mode = False
def __init__(self, n_components: int = 10, t2_threshold: float = 15.0,
spe_threshold: float = 25.0, auto_train: bool = True, **kwargs):
super().__init__(**kwargs)
self.n_components = n_components
self.t2_threshold = t2_threshold
self.spe_threshold = spe_threshold
self.auto_train = auto_train
self._mean: Optional[np.ndarray] = None
self._std: Optional[np.ndarray] = None
self._components: Optional[np.ndarray] = None
self._eigenvalues: Optional[np.ndarray] = None
self._trained = False
def train(self, data: np.ndarray):
"""
Train PCA model on normal operation data.
Args:
data: Training data of shape (n_samples, 41)
"""
self._mean = np.mean(data, axis=0)
self._std = np.std(data, axis=0)
self._std[self._std < 1e-8] = 1.0 # Avoid division by zero
# Normalize
normalized = (data - self._mean) / self._std
# SVD for PCA
n_samples = len(data)
U, S, Vt = np.linalg.svd(normalized, full_matrices=False)
# Keep top components
n_comp = min(self.n_components, len(S))
self._components = Vt[:n_comp]
self._eigenvalues = (S[:n_comp]**2) / (n_samples - 1)
self._trained = True
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Detect using PCA statistics."""
# Auto-train if needed
if not self._trained:
if self.auto_train and self.window_ready:
self.train(self.window)
else:
return DetectionResult(-1, 0.0, step)
# Normalize current measurement
x = (xmeas - self._mean) / self._std
# Project onto principal components
scores = x @ self._components.T
# T² statistic (Hotelling's T-squared)
t2 = float(np.sum(scores**2 / self._eigenvalues))
# SPE/Q statistic (squared prediction error)
reconstruction = scores @ self._components
residual = x - reconstruction
spe = float(np.sum(residual**2))
# Contribution analysis
contributions = np.abs(residual)
top_sensors = np.argsort(contributions)[-5:][::-1].tolist()
# Check thresholds
t2_violation = t2 > self.t2_threshold
spe_violation = spe > self.spe_threshold
if t2_violation or spe_violation:
# Compute confidence based on how much thresholds are exceeded
t2_ratio = t2 / self.t2_threshold if self.t2_threshold > 0 else 0
spe_ratio = spe / self.spe_threshold if self.spe_threshold > 0 else 0
max_ratio = max(t2_ratio, spe_ratio)
confidence = min(0.5 + (max_ratio - 1) * 0.2, 0.99)
return DetectionResult(
fault_class=1, # Anomaly detected
confidence=confidence,
step=step,
contributing_sensors=top_sensors[:3],
statistics={
"T2": t2,
"SPE": spe,
"T2_threshold": self.t2_threshold,
"SPE_threshold": self.spe_threshold,
}
)
return DetectionResult(
fault_class=0,
confidence=0.85,
step=step,
statistics={"T2": t2, "SPE": spe}
)
def _reset_impl(self):
"""Reset PCA state (keeps trained model)."""
pass # Model is retained across resets
def reset_model(self):
"""Clear the trained PCA model."""
self._mean = None
self._std = None
self._components = None
self._eigenvalues = None
self._trained = False
@property
def is_trained(self) -> bool:
"""Check if model has been trained."""
return self._trained
def get_parameters(self) -> Dict[str, Any]:
return {
"n_components": self.n_components,
"t2_threshold": self.t2_threshold,
"spe_threshold": self.spe_threshold,
"auto_train": self.auto_train,
"is_trained": self._trained,
}
# =============================================================================
# Statistical Detector
# =============================================================================
@register_detector(
name="statistical",
description="Multi-statistic ensemble detector"
)
class StatisticalDetector(BaseFaultDetector):
"""
Ensemble detector combining multiple statistical tests.
Computes various statistics on the measurement window and flags
anomalies when multiple tests indicate problems. More robust than
single-statistic methods.
Tests performed:
- Mean shift detection
- Variance change detection (F-test style ratio)
- Recent trend detection (first vs. second half of the window)
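Example (sketch; stream is a hypothetical iterable of XMEAS vectors):
>>> det = FaultDetectorRegistry.create("statistical", votes_required=2)
>>> for t, x in enumerate(stream):
...     result = det.process(x, t)
...     if result.is_ready and result.is_fault:
...         print(t, result.statistics["votes"])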
"""
name = "statistical"
description = "Multi-statistic ensemble detector"
window_size = 120
window_sample_interval = 1
detect_interval = 30
async_mode = False
def __init__(self, mean_threshold: float = 2.5, var_threshold: float = 2.0,
votes_required: int = 2, **kwargs):
super().__init__(**kwargs)
self.mean_threshold = mean_threshold
self.var_threshold = var_threshold
self.votes_required = votes_required
self._baseline_mean: Optional[np.ndarray] = None
self._baseline_std: Optional[np.ndarray] = None
self._baseline_var: Optional[np.ndarray] = None
self._baseline_learned = False
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Detect using multiple statistical tests."""
if not self.window_ready:
return DetectionResult(-1, 0.0, step)
window = self.window
# Learn baseline from first window
if not self._baseline_learned:
self._baseline_mean = np.mean(window, axis=0)
self._baseline_std = np.std(window, axis=0) + 1e-8
self._baseline_var = np.var(window, axis=0) + 1e-8
self._baseline_learned = True
return DetectionResult(-1, 0.0, step)
# Current window statistics
current_mean = np.mean(window, axis=0)
current_var = np.var(window, axis=0) + 1e-8
# Test 1: Mean shift
mean_z = np.abs(current_mean - self._baseline_mean) / self._baseline_std
mean_violation = np.any(mean_z > self.mean_threshold)
mean_score = float(np.max(mean_z))
# Test 2: Variance change (F-test approximation)
var_ratio = np.maximum(current_var / self._baseline_var,
self._baseline_var / current_var)
var_violation = np.any(var_ratio > self.var_threshold)
var_score = float(np.max(var_ratio))
# Test 3: Recent trend
half = self.window_size // 2
first_half_mean = np.mean(window[:half], axis=0)
second_half_mean = np.mean(window[half:], axis=0)
trend = np.abs(second_half_mean - first_half_mean) / self._baseline_std
trend_violation = np.any(trend > self.mean_threshold)
trend_score = float(np.max(trend))
# Count votes
votes = sum([mean_violation, var_violation, trend_violation])
# Rank contributing sensors by a rough combined score (z-scores and
# variance ratios have different scales; used only for ordering)
all_scores = mean_z + var_ratio + trend
top_sensors = np.argsort(all_scores)[-5:][::-1].tolist()
if votes >= self.votes_required:
confidence = min(0.4 + votes * 0.2, 0.95)
return DetectionResult(
fault_class=1,
confidence=confidence,
step=step,
contributing_sensors=top_sensors[:3],
statistics={
"mean_score": mean_score,
"var_score": var_score,
"trend_score": trend_score,
"votes": votes,
}
)
return DetectionResult(
fault_class=0,
confidence=0.8,
step=step,
statistics={
"mean_score": mean_score,
"var_score": var_score,
"trend_score": trend_score,
"votes": votes,
}
)
def _reset_impl(self):
"""Reset statistical baselines."""
self._baseline_mean = None
self._baseline_std = None
self._baseline_var = None
self._baseline_learned = False
def get_parameters(self) -> Dict[str, Any]:
return {
"mean_threshold": self.mean_threshold,
"var_threshold": self.var_threshold,
"votes_required": self.votes_required,
}
# =============================================================================
# Sliding Window Detector
# =============================================================================
@register_detector(
name="sliding_window",
description="Simple sliding window comparison detector"
)
class SlidingWindowDetector(BaseFaultDetector):
"""
Simple sliding window detector comparing recent vs older data.
Splits the window into two halves and detects when the recent
half differs significantly from the older half. Simple but
effective for detecting step changes.
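Example (sketch; stream is a hypothetical iterable of XMEAS vectors):
>>> det = FaultDetectorRegistry.create("sliding_window", threshold=3.0)
>>> for t, x in enumerate(stream):
...     result = det.process(x, t)
...     if result.is_ready:
...         print(t, result.statistics["max_z_score"])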
"""
name = "sliding_window"
description = "Sliding window comparison detector"
window_size = 60
detect_interval = 10
async_mode = False
def __init__(self, threshold: float = 3.0, **kwargs):
super().__init__(**kwargs)
self.threshold = threshold
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Detect by comparing window halves."""
if not self.window_ready:
return DetectionResult(-1, 0.0, step)
window = self.window
half = self.window_size // 2
# Split window
old_half = window[:half]
new_half = window[half:]
# Compare means
old_mean = np.mean(old_half, axis=0)
new_mean = np.mean(new_half, axis=0)
old_std = np.std(old_half, axis=0) + 1e-8
z_scores = np.abs(new_mean - old_mean) / old_std
max_z = float(np.max(z_scores))
top_sensors = np.argsort(z_scores)[-3:][::-1].tolist()
if max_z > self.threshold:
confidence = min(0.5 + (max_z - self.threshold) * 0.1, 0.95)
return DetectionResult(
fault_class=1,
confidence=confidence,
step=step,
contributing_sensors=top_sensors,
statistics={"max_z_score": max_z}
)
return DetectionResult(
fault_class=0,
confidence=0.85,
step=step,
statistics={"max_z_score": max_z}
)
def get_parameters(self) -> Dict[str, Any]:
return {"threshold": self.threshold}
# =============================================================================
# Composite Detector
# =============================================================================
@register_detector(
name="composite",
description="Combines multiple detectors with voting"
)
class CompositeDetector(BaseFaultDetector):
"""
Combines multiple detectors using voting.
Runs several detectors and aggregates their outputs. Can be
configured to require unanimous agreement or majority voting.
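Example (sketch; builds a two-of-three vote from built-in detectors;
xmeas would come from the running simulator):
>>> combo = CompositeDetector(min_votes=2)
>>> combo.add_detector(FaultDetectorRegistry.create("threshold"))
>>> combo.add_detector(FaultDetectorRegistry.create("ewma"))
>>> combo.add_detector(FaultDetectorRegistry.create("cusum"))
>>> result = combo.detect(xmeas, step=0)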
"""
name = "composite"
description = "Multi-detector voting ensemble"
window_size = 1 # Managed by sub-detectors
detect_interval = 1
async_mode = False
def __init__(self, detectors: Optional[List[BaseFaultDetector]] = None,
min_votes: int = 2, **kwargs):
super().__init__(**kwargs)
self._sub_detectors = detectors or []
self.min_votes = min_votes
def add_detector(self, detector: BaseFaultDetector):
"""Add a sub-detector."""
self._sub_detectors.append(detector)
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Aggregate sub-detector results."""
if not self._sub_detectors:
return DetectionResult(-1, 0.0, step)
results = []
for detector in self._sub_detectors:
result = detector.process(xmeas, step)
if result.is_ready:
results.append(result)
if not results:
return DetectionResult(-1, 0.0, step)
# Count fault votes
fault_votes = sum(1 for r in results if r.is_fault)
total_votes = len(results)
# Aggregate confidence
if fault_votes >= self.min_votes:
fault_results = [r for r in results if r.is_fault]
avg_confidence = np.mean([r.confidence for r in fault_results])
# Get most common fault class
fault_classes = [r.fault_class for r in fault_results]
most_common = max(set(fault_classes), key=fault_classes.count)
# Aggregate contributing sensors
all_sensors = []
for r in fault_results:
if r.contributing_sensors:
all_sensors.extend(r.contributing_sensors)
return DetectionResult(
fault_class=most_common,
confidence=float(avg_confidence),
step=step,
contributing_sensors=list(dict.fromkeys(all_sensors))[:5],
statistics={
"fault_votes": fault_votes,
"total_votes": total_votes,
}
)
# No fault consensus
normal_results = [r for r in results if r.is_normal]
if normal_results:
avg_confidence = np.mean([r.confidence for r in normal_results])
else:
avg_confidence = 0.5
return DetectionResult(
fault_class=0,
confidence=float(avg_confidence),
step=step,
statistics={
"fault_votes": fault_votes,
"total_votes": total_votes,
}
)
def _reset_impl(self):
"""Reset all sub-detectors."""
for detector in self._sub_detectors:
detector.reset()
def get_parameters(self) -> Dict[str, Any]:
return {
"min_votes": self.min_votes,
"n_detectors": len(self._sub_detectors),
"detector_names": [d.name for d in self._sub_detectors],
}
# =============================================================================
# Passthrough Detector (for testing/baseline)
# =============================================================================
@register_detector(
name="passthrough",
description="Always reports normal (baseline for comparison)"
)
class PassthroughDetector(BaseFaultDetector):
"""
Baseline detector that always reports normal operation.
Useful for:
- Testing the detector framework
- Establishing a false-alarm-free baseline
- Placeholder when no detection is wanted
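Example:
>>> det = FaultDetectorRegistry.create("passthrough")
>>> det.detect(np.zeros(41), step=0).fault_class
0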
"""
name = "passthrough"
description = "Baseline detector (always normal)"
window_size = 1
detect_interval = 1
async_mode = False
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult:
"""Always return normal."""
return DetectionResult(
fault_class=0,
confidence=1.0,
step=step
)