|
|
""" |
|
|
Built-in fault detector implementations for the Tennessee Eastman Process. |
|
|
|
|
|
This module provides several ready-to-use fault detectors ranging from |
|
|
simple threshold checks to statistical methods. These serve as both |
|
|
practical tools and examples for implementing custom detectors. |
|
|
|
|
|
Available Detectors: |
|
|
- ThresholdDetector: Fast safety limit checking |
|
|
- EWMADetector: Exponentially weighted moving average change detection |
|
|
- CUSUMDetector: Cumulative sum control chart for drift detection |
|
|
- PCADetector: Principal Component Analysis with T² and SPE statistics |
|
|
- StatisticalDetector: Multi-statistic ensemble detector |
|
|
- SlidingWindowDetector: Simple sliding window comparison
- CompositeDetector: Multi-detector voting ensemble
- PassthroughDetector: Always-normal baseline for comparison
|
|
|
|
|
Example: |
|
|
>>> from tep.detector_base import FaultDetectorRegistry |
|
|
>>> |
|
|
>>> # Create a detector |
|
|
>>> detector = FaultDetectorRegistry.create("threshold") |
|
|
>>> |
|
|
>>> # Or with custom parameters |
|
|
>>> detector = FaultDetectorRegistry.create("pca", window_size=300) |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import Dict, List, Optional, Any, Tuple |
|
|
|
|
|
from .detector_base import ( |
|
|
BaseFaultDetector, |
|
|
DetectionResult, |
|
|
FaultDetectorRegistry, |
|
|
register_detector, |
|
|
) |
|
|
from .constants import NUM_MEASUREMENTS
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="threshold", |
|
|
description="Fast threshold-based detection using process safety limits" |
|
|
) |
|
|
class ThresholdDetector(BaseFaultDetector): |
|
|
""" |
|
|
Simple threshold detector based on process safety limits. |
|
|
|
|
|
Checks measurements against known safe operating ranges and flags |
|
|
violations. Very fast (no window needed) and useful as a first |
|
|
line of defense. |
|
|
|
|
|
This detector can identify when the process is moving toward unsafe |
|
|
conditions, which often indicates specific fault types. |
|
|
|
|
|
Attributes: |
|
|
limits: Dict mapping XMEAS index to (low, high) thresholds |
|
|
fault_mapping: Dict mapping violated sensor to likely fault class |
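
    Example:
        A minimal sketch (indices are 0-based into the XMEAS vector; the
        in-range filler values below are illustrative only):

        >>> det = ThresholdDetector()
        >>> xmeas = np.zeros(41)
        >>> xmeas[7], xmeas[11], xmeas[14] = 10.0, 5.0, 4.0   # in range
        >>> xmeas[6] = 3000.0             # pressure above the 2900 limit
        >>> det.detect(xmeas, step=0).fault_class
        4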
|
|
""" |
|
|
|
|
|
name = "threshold" |
|
|
description = "Threshold-based safety limit detector" |
|
|
window_size = 1 |
|
|
detect_interval = 1 |
|
|
async_mode = False |
|
|
|
|
|
def __init__(self, **kwargs): |
|
|
super().__init__(**kwargs) |
|
|
|
|
|
|
|
|
|
|
|
        # Conservative operating envelopes, keyed by 0-based XMEAS index
        # (None means that side is unbounded).
        self.limits: Dict[int, Tuple[Optional[float], Optional[float]]] = {
            6: (None, 2900.0),   # XMEAS(7): reactor pressure (kPa)
            7: (3.0, 22.0),      # XMEAS(8): reactor level
            8: (None, 170.0),    # XMEAS(9): reactor temperature (deg C)
            11: (2.0, 11.0),     # XMEAS(12): separator level
            14: (2.0, 7.0),      # XMEAS(15): stripper level
        }
|
|
|
|
|
|
|
|
        # Heuristic mapping from the violated sensor (0-based XMEAS
        # index) to the most likely fault class.
        self.fault_mapping: Dict[int, int] = {
            6: 4,    # reactor pressure -> fault 4
            7: 1,    # reactor level -> fault 1
            8: 4,    # reactor temperature -> fault 4
            11: 7,   # separator level -> fault 7
            14: 6,   # stripper level -> fault 6
        }
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Check measurements against thresholds.""" |
|
|
violations = [] |
|
|
max_severity = 0.0 |
|
|
|
|
|
for idx, (low, high) in self.limits.items(): |
|
|
val = xmeas[idx] |
|
|
severity = 0.0 |
|
|
|
|
|
if low is not None and val < low: |
|
|
severity = (low - val) / max(abs(low), 1.0) |
|
|
violations.append((idx, severity)) |
|
|
elif high is not None and val > high: |
|
|
severity = (val - high) / max(abs(high), 1.0) |
|
|
violations.append((idx, severity)) |
|
|
|
|
|
max_severity = max(max_severity, severity) |
|
|
|
|
|
        if violations:
            # Attribute the fault to the most severe violation.
            violations.sort(key=lambda x: -x[1])
            top_sensor = violations[0][0]
            fault_class = self.fault_mapping.get(top_sensor, 1)

            # Map severity onto a confidence in [0.5, 0.99].
            confidence = min(0.5 + max_severity * 0.5, 0.99)
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=fault_class, |
|
|
confidence=confidence, |
|
|
step=step, |
|
|
contributing_sensors=[v[0] for v in violations[:3]], |
|
|
statistics={"max_severity": max_severity} |
|
|
) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=0.9, |
|
|
step=step |
|
|
) |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"limits": self.limits, |
|
|
"fault_mapping": self.fault_mapping, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="ewma", |
|
|
description="Exponentially Weighted Moving Average change detector" |
|
|
) |
|
|
class EWMADetector(BaseFaultDetector): |
|
|
""" |
|
|
EWMA-based change detection. |
|
|
|
|
|
Tracks exponentially weighted moving averages and variances of |
|
|
all measurements. Detects anomalies when current values deviate |
|
|
significantly from the running statistics. |
|
|
|
|
|
    Responds quickly to abrupt shifts and persistent step changes;
    very slow drifts may be absorbed as the running statistics adapt
    (CUSUMDetector is better suited to those).
|
|
|
|
|
Attributes: |
|
|
alpha: Smoothing factor (0-1). Higher = faster response. |
|
|
threshold: Z-score threshold for anomaly detection. |
|
|
warmup_steps: Steps before detection begins. |
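
    Example:
        A minimal sketch: after running on steady data, a large jump in
        one sensor drives its z-score past ``threshold``:

        >>> det = EWMADetector(alpha=0.1, threshold=3.0, warmup_steps=10)
        >>> steady = np.ones(41)
        >>> for t in range(50):
        ...     _ = det.detect(steady, step=t)
        >>> spiked = steady.copy()
        >>> spiked[0] = 10.0
        >>> det.detect(spiked, step=50).fault_class
        1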
|
|
""" |
|
|
|
|
|
name = "ewma" |
|
|
description = "EWMA-based change detection" |
|
|
window_size = 1 |
|
|
detect_interval = 1 |
|
|
async_mode = False |
|
|
|
|
|
def __init__(self, alpha: float = 0.1, threshold: float = 3.0, |
|
|
warmup_steps: int = 100, **kwargs): |
|
|
super().__init__(**kwargs) |
|
|
self.alpha = alpha |
|
|
self.threshold = threshold |
|
|
self.warmup_steps = warmup_steps |
|
|
|
|
|
self._ewma: Optional[np.ndarray] = None |
|
|
self._ewma_var: Optional[np.ndarray] = None |
|
|
self._step_count = 0 |
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Detect using EWMA statistics.""" |
|
|
self._step_count += 1 |
|
|
|
|
|
|
|
|
        # First sample: initialize the running mean and a small variance.
        if self._ewma is None:
            self._ewma = xmeas.copy()
            self._ewma_var = np.ones(NUM_MEASUREMENTS) * 0.01
            return DetectionResult(-1, 0.0, step)

        # Score the new sample against the current running statistics...
        diff = xmeas - self._ewma
        std = np.sqrt(self._ewma_var + 1e-8)
        z_scores = np.abs(diff) / std

        # ...then fold it into the EWMA mean and variance.
        self._ewma = self.alpha * xmeas + (1 - self.alpha) * self._ewma
        self._ewma_var = self.alpha * diff**2 + (1 - self.alpha) * self._ewma_var
|
|
|
|
|
|
|
|
if self._step_count < self.warmup_steps: |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
|
|
|
max_z = float(np.max(z_scores)) |
|
|
top_sensors = np.argsort(z_scores)[-5:][::-1].tolist() |
|
|
|
|
|
if max_z > self.threshold: |
|
|
|
|
|
confidence = min(0.5 + (max_z - self.threshold) * 0.1, 0.99) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=1, |
|
|
confidence=confidence, |
|
|
step=step, |
|
|
contributing_sensors=top_sensors[:3], |
|
|
statistics={ |
|
|
"max_z_score": max_z, |
|
|
"mean_z_score": float(np.mean(z_scores)), |
|
|
} |
|
|
) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=min(0.5 + (self.threshold - max_z) * 0.1, 0.95), |
|
|
step=step, |
|
|
statistics={"max_z_score": max_z} |
|
|
) |
|
|
|
|
|
def _reset_impl(self): |
|
|
"""Reset EWMA state.""" |
|
|
self._ewma = None |
|
|
self._ewma_var = None |
|
|
self._step_count = 0 |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"alpha": self.alpha, |
|
|
"threshold": self.threshold, |
|
|
"warmup_steps": self.warmup_steps, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="cusum", |
|
|
description="Cumulative Sum control chart for drift detection" |
|
|
) |
|
|
class CUSUMDetector(BaseFaultDetector): |
|
|
""" |
|
|
CUSUM (Cumulative Sum) control chart detector. |
|
|
|
|
|
Accumulates deviations from expected values to detect persistent |
|
|
shifts. Very effective for detecting slow drifts that might be |
|
|
missed by instantaneous checks. |
|
|
|
|
|
Attributes: |
|
|
k: Slack parameter (allowance). Typically 0.5 * expected shift. |
|
|
h: Decision threshold. Higher = fewer false alarms. |
|
|
target: Target values (learned from initial data or provided). |
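
    Example:
        A standalone sketch of the recurrence applied per sensor in
        ``detect``: a sustained shift of ``d`` sigmas grows the positive
        sum by roughly ``d - k`` per step, so it crosses ``h`` after
        about ``h / (d - k)`` steps.

        >>> k, h, s_pos = 0.5, 5.0, 0.0
        >>> for z in [2.0] * 4:            # persistent +2 sigma shift
        ...     s_pos = max(0.0, s_pos + z - k)
        >>> s_pos, s_pos > h               # alarm on the fourth step
        (6.0, True)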
|
|
""" |
|
|
|
|
|
name = "cusum" |
|
|
description = "CUSUM control chart for drift detection" |
|
|
window_size = 100 |
|
|
detect_interval = 1 |
|
|
async_mode = False |
|
|
|
|
|
def __init__(self, k: float = 0.5, h: float = 5.0, **kwargs): |
|
|
super().__init__(**kwargs) |
|
|
self.k = k |
|
|
self.h = h |
|
|
|
|
|
self._target: Optional[np.ndarray] = None |
|
|
self._std: Optional[np.ndarray] = None |
|
|
self._cusum_pos: Optional[np.ndarray] = None |
|
|
self._cusum_neg: Optional[np.ndarray] = None |
|
|
self._baseline_learned = False |
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Detect using CUSUM statistics.""" |
|
|
|
|
|
        # Learn the baseline from the first full window.
        if not self._baseline_learned:
|
|
if not self.window_ready: |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
window = self.window |
|
|
self._target = np.mean(window, axis=0) |
|
|
self._std = np.std(window, axis=0) + 1e-8 |
|
|
self._cusum_pos = np.zeros(NUM_MEASUREMENTS) |
|
|
self._cusum_neg = np.zeros(NUM_MEASUREMENTS) |
|
|
self._baseline_learned = True |
|
|
|
|
|
|
|
|
        # Standardize against the learned baseline.
        z = (xmeas - self._target) / self._std

        # Two-sided tabular CUSUM: accumulate deviations beyond the
        # slack k, clipped at zero.
        self._cusum_pos = np.maximum(0, self._cusum_pos + z - self.k)
        self._cusum_neg = np.maximum(0, self._cusum_neg - z - self.k)
|
|
|
|
|
|
|
|
max_pos = float(np.max(self._cusum_pos)) |
|
|
max_neg = float(np.max(self._cusum_neg)) |
|
|
max_cusum = max(max_pos, max_neg) |
|
|
|
|
|
top_pos = np.argsort(self._cusum_pos)[-3:][::-1].tolist() |
|
|
top_neg = np.argsort(self._cusum_neg)[-3:][::-1].tolist() |
|
|
|
|
|
if max_cusum > self.h: |
|
|
confidence = min(0.5 + (max_cusum - self.h) / self.h * 0.3, 0.99) |
|
|
|
|
|
|
|
|
contributing = list(set(top_pos + top_neg))[:5] |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=1, |
|
|
confidence=confidence, |
|
|
step=step, |
|
|
contributing_sensors=contributing, |
|
|
statistics={ |
|
|
"max_cusum_pos": max_pos, |
|
|
"max_cusum_neg": max_neg, |
|
|
} |
|
|
) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=0.9, |
|
|
step=step, |
|
|
statistics={ |
|
|
"max_cusum_pos": max_pos, |
|
|
"max_cusum_neg": max_neg, |
|
|
} |
|
|
) |
|
|
|
|
|
def _reset_impl(self): |
|
|
"""Reset CUSUM state.""" |
|
|
self._target = None |
|
|
self._std = None |
|
|
self._cusum_pos = None |
|
|
self._cusum_neg = None |
|
|
self._baseline_learned = False |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"k": self.k, |
|
|
"h": self.h, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="pca", |
|
|
description="PCA-based detection with T² and SPE statistics" |
|
|
) |
|
|
class PCADetector(BaseFaultDetector): |
|
|
""" |
|
|
Principal Component Analysis detector. |
|
|
|
|
|
Projects measurements onto principal components learned from normal |
|
|
operation. Uses two statistics for detection: |
|
|
- T² (Hotelling's): Variation within the principal component space |
|
|
- SPE (Q): Residual variation not captured by the model |
|
|
|
|
|
This is a classic multivariate statistical process monitoring technique. |
|
|
|
|
|
Attributes: |
|
|
n_components: Number of principal components to retain |
|
|
t2_threshold: Threshold for T² statistic |
|
|
spe_threshold: Threshold for SPE statistic |
|
|
auto_train: If True, train on first full window |
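
    Example:
        A minimal sketch with synthetic training data (the detector is
        normally trained on its own window via ``auto_train``):

        >>> rng = np.random.default_rng(0)
        >>> det = PCADetector(n_components=10, auto_train=False)
        >>> det.train(rng.normal(size=(200, 41)))
        >>> faulty = np.zeros(41)
        >>> faulty[0] = 25.0               # far outside the training data
        >>> det.detect(faulty, step=0).fault_class
        1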
|
|
""" |
|
|
|
|
|
name = "pca" |
|
|
description = "PCA-based T² and SPE fault detection" |
|
|
window_size = 200 |
|
|
window_sample_interval = 1 |
|
|
detect_interval = 10 |
|
|
async_mode = False |
|
|
|
|
|
def __init__(self, n_components: int = 10, t2_threshold: float = 15.0, |
|
|
spe_threshold: float = 25.0, auto_train: bool = True, **kwargs): |
|
|
super().__init__(**kwargs) |
|
|
self.n_components = n_components |
|
|
self.t2_threshold = t2_threshold |
|
|
self.spe_threshold = spe_threshold |
|
|
self.auto_train = auto_train |
|
|
|
|
|
self._mean: Optional[np.ndarray] = None |
|
|
self._std: Optional[np.ndarray] = None |
|
|
self._components: Optional[np.ndarray] = None |
|
|
self._eigenvalues: Optional[np.ndarray] = None |
|
|
self._trained = False |
|
|
|
|
|
def train(self, data: np.ndarray): |
|
|
""" |
|
|
Train PCA model on normal operation data. |
|
|
|
|
|
Args: |
|
|
data: Training data of shape (n_samples, 41) |
|
|
""" |
|
|
self._mean = np.mean(data, axis=0) |
|
|
self._std = np.std(data, axis=0) |
|
|
self._std[self._std < 1e-8] = 1.0 |
|
|
|
|
|
|
|
|
        # Z-score the training data so all sensors are comparable.
        normalized = (data - self._mean) / self._std
|
|
|
|
|
|
|
|
        n_samples = len(data)
        # PCA via SVD of the normalized data: rows of Vt are the
        # principal directions.
        U, S, Vt = np.linalg.svd(normalized, full_matrices=False)

        # Retain the leading components and their eigenvalues
        # (sample variances along each component).
        n_comp = min(self.n_components, len(S))
        self._components = Vt[:n_comp]
        self._eigenvalues = (S[:n_comp]**2) / (n_samples - 1)
|
|
|
|
|
self._trained = True |
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Detect using PCA statistics.""" |
|
|
|
|
|
if not self._trained: |
|
|
if self.auto_train and self.window_ready: |
|
|
self.train(self.window) |
|
|
else: |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
|
|
|
        x = (xmeas - self._mean) / self._std

        # Project onto the retained principal components.
        scores = x @ self._components.T

        # Hotelling's T^2: Mahalanobis distance within the PC subspace.
        t2 = float(np.sum(scores**2 / self._eigenvalues))

        # SPE (Q statistic): squared reconstruction error outside the
        # PC subspace.
        reconstruction = scores @ self._components
        residual = x - reconstruction
        spe = float(np.sum(residual**2))

        # Rank sensors by their contribution to the residual.
        contributions = np.abs(residual)
        top_sensors = np.argsort(contributions)[-5:][::-1].tolist()
|
|
|
|
|
|
|
|
t2_violation = t2 > self.t2_threshold |
|
|
spe_violation = spe > self.spe_threshold |
|
|
|
|
|
if t2_violation or spe_violation: |
|
|
|
|
|
t2_ratio = t2 / self.t2_threshold if self.t2_threshold > 0 else 0 |
|
|
spe_ratio = spe / self.spe_threshold if self.spe_threshold > 0 else 0 |
|
|
max_ratio = max(t2_ratio, spe_ratio) |
|
|
|
|
|
confidence = min(0.5 + (max_ratio - 1) * 0.2, 0.99) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=1, |
|
|
confidence=confidence, |
|
|
step=step, |
|
|
contributing_sensors=top_sensors[:3], |
|
|
statistics={ |
|
|
"T2": t2, |
|
|
"SPE": spe, |
|
|
"T2_threshold": self.t2_threshold, |
|
|
"SPE_threshold": self.spe_threshold, |
|
|
} |
|
|
) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=0.85, |
|
|
step=step, |
|
|
statistics={"T2": t2, "SPE": spe} |
|
|
) |
|
|
|
|
|
def _reset_impl(self): |
|
|
"""Reset PCA state (keeps trained model).""" |
|
|
pass |
|
|
|
|
|
def reset_model(self): |
|
|
"""Clear the trained PCA model.""" |
|
|
self._mean = None |
|
|
self._std = None |
|
|
self._components = None |
|
|
self._eigenvalues = None |
|
|
self._trained = False |
|
|
|
|
|
@property |
|
|
def is_trained(self) -> bool: |
|
|
"""Check if model has been trained.""" |
|
|
return self._trained |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"n_components": self.n_components, |
|
|
"t2_threshold": self.t2_threshold, |
|
|
"spe_threshold": self.spe_threshold, |
|
|
"auto_train": self.auto_train, |
|
|
"is_trained": self._trained, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="statistical", |
|
|
description="Multi-statistic ensemble detector" |
|
|
) |
|
|
class StatisticalDetector(BaseFaultDetector): |
|
|
""" |
|
|
Ensemble detector combining multiple statistical tests. |
|
|
|
|
|
Computes various statistics on the measurement window and flags |
|
|
anomalies when multiple tests indicate problems. More robust than |
|
|
single-statistic methods. |
|
|
|
|
|
    Tests performed:
    - Mean shift detection (window mean vs. baseline, in baseline sigmas)
    - Variance change detection (two-sided ratio; see the sketch below)
    - Trend detection (mean shift between the window's two halves)
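
    Example:
        A standalone sketch of the two-sided variance test used in
        ``detect``: the symmetric ratio flags inflated and collapsed
        variance alike.

        >>> base_var = 1.0
        >>> max(3.2 / base_var, base_var / 3.2) > 2.0   # inflated
        True
        >>> max(0.3 / base_var, base_var / 0.3) > 2.0   # collapsed
        True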
|
|
""" |
|
|
|
|
|
name = "statistical" |
|
|
description = "Multi-statistic ensemble detector" |
|
|
window_size = 120 |
|
|
window_sample_interval = 1 |
|
|
detect_interval = 30 |
|
|
async_mode = False |
|
|
|
|
|
def __init__(self, mean_threshold: float = 2.5, var_threshold: float = 2.0, |
|
|
votes_required: int = 2, **kwargs): |
|
|
super().__init__(**kwargs) |
|
|
self.mean_threshold = mean_threshold |
|
|
self.var_threshold = var_threshold |
|
|
self.votes_required = votes_required |
|
|
|
|
|
self._baseline_mean: Optional[np.ndarray] = None |
|
|
self._baseline_std: Optional[np.ndarray] = None |
|
|
self._baseline_var: Optional[np.ndarray] = None |
|
|
self._baseline_learned = False |
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Detect using multiple statistical tests.""" |
|
|
if not self.window_ready: |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
window = self.window |
|
|
|
|
|
|
|
|
        # Learn the baseline from the first full window.
        if not self._baseline_learned:
|
|
self._baseline_mean = np.mean(window, axis=0) |
|
|
self._baseline_std = np.std(window, axis=0) + 1e-8 |
|
|
self._baseline_var = np.var(window, axis=0) + 1e-8 |
|
|
self._baseline_learned = True |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
|
|
|
current_mean = np.mean(window, axis=0) |
|
|
current_var = np.var(window, axis=0) + 1e-8 |
|
|
|
|
|
|
|
|
        # Test 1: mean shift, in units of baseline standard deviations.
        mean_z = np.abs(current_mean - self._baseline_mean) / self._baseline_std
        mean_violation = np.any(mean_z > self.mean_threshold)
        mean_score = float(np.max(mean_z))
|
|
|
|
|
|
|
|
        # Test 2: variance change; the two-sided ratio catches inflation
        # and collapse alike.
        var_ratio = np.maximum(current_var / self._baseline_var,
                               self._baseline_var / current_var)
        var_violation = np.any(var_ratio > self.var_threshold)
        var_score = float(np.max(var_ratio))
|
|
|
|
|
|
|
|
        # Test 3: trend, as the mean shift between the window's halves.
        half = self.window_size // 2
        first_half_mean = np.mean(window[:half], axis=0)
        second_half_mean = np.mean(window[half:], axis=0)
        trend = np.abs(second_half_mean - first_half_mean) / self._baseline_std
        trend_violation = np.any(trend > self.mean_threshold)
        trend_score = float(np.max(trend))
|
|
|
|
|
|
|
|
        # Each violated test contributes one vote.
        votes = sum([mean_violation, var_violation, trend_violation])
|
|
|
|
|
|
|
|
        # Heuristic sensor ranking: sum the raw test scores.
        all_scores = mean_z + var_ratio + trend
        top_sensors = np.argsort(all_scores)[-5:][::-1].tolist()
|
|
|
|
|
if votes >= self.votes_required: |
|
|
confidence = min(0.4 + votes * 0.2, 0.95) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=1, |
|
|
confidence=confidence, |
|
|
step=step, |
|
|
contributing_sensors=top_sensors[:3], |
|
|
statistics={ |
|
|
"mean_score": mean_score, |
|
|
"var_score": var_score, |
|
|
"trend_score": trend_score, |
|
|
"votes": votes, |
|
|
} |
|
|
) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=0.8, |
|
|
step=step, |
|
|
statistics={ |
|
|
"mean_score": mean_score, |
|
|
"var_score": var_score, |
|
|
"trend_score": trend_score, |
|
|
"votes": votes, |
|
|
} |
|
|
) |
|
|
|
|
|
def _reset_impl(self): |
|
|
"""Reset statistical baselines.""" |
|
|
self._baseline_mean = None |
|
|
self._baseline_std = None |
|
|
self._baseline_var = None |
|
|
self._baseline_learned = False |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"mean_threshold": self.mean_threshold, |
|
|
"var_threshold": self.var_threshold, |
|
|
"votes_required": self.votes_required, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="sliding_window", |
|
|
description="Simple sliding window comparison detector" |
|
|
) |
|
|
class SlidingWindowDetector(BaseFaultDetector): |
|
|
""" |
|
|
Simple sliding window detector comparing recent vs older data. |
|
|
|
|
|
Splits the window into two halves and detects when the recent |
|
|
half differs significantly from the older half. Simple but |
|
|
effective for detecting step changes. |
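
    Example:
        A standalone sketch of the half-window comparison: a step change
        between the halves yields large z-scores.

        >>> old = np.zeros((30, 41))
        >>> new = 0.5 * np.ones((30, 41))
        >>> z = np.abs(new.mean(axis=0) - old.mean(axis=0)) / (old.std(axis=0) + 1e-8)
        >>> bool(np.all(z > 3.0))
        True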
|
|
""" |
|
|
|
|
|
name = "sliding_window" |
|
|
description = "Sliding window comparison detector" |
|
|
window_size = 60 |
|
|
detect_interval = 10 |
|
|
async_mode = False |
|
|
|
|
|
def __init__(self, threshold: float = 3.0, **kwargs): |
|
|
super().__init__(**kwargs) |
|
|
self.threshold = threshold |
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Detect by comparing window halves.""" |
|
|
if not self.window_ready: |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
window = self.window |
|
|
half = self.window_size // 2 |
|
|
|
|
|
|
|
|
        # Compare the older half of the window against the newer half.
        old_half = window[:half]
        new_half = window[half:]

        old_mean = np.mean(old_half, axis=0)
        new_mean = np.mean(new_half, axis=0)
        old_std = np.std(old_half, axis=0) + 1e-8

        # Z-score of the recent mean relative to the older half's spread.
        z_scores = np.abs(new_mean - old_mean) / old_std
|
|
max_z = float(np.max(z_scores)) |
|
|
top_sensors = np.argsort(z_scores)[-3:][::-1].tolist() |
|
|
|
|
|
if max_z > self.threshold: |
|
|
confidence = min(0.5 + (max_z - self.threshold) * 0.1, 0.95) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=1, |
|
|
confidence=confidence, |
|
|
step=step, |
|
|
contributing_sensors=top_sensors, |
|
|
statistics={"max_z_score": max_z} |
|
|
) |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=0.85, |
|
|
step=step, |
|
|
statistics={"max_z_score": max_z} |
|
|
) |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return {"threshold": self.threshold} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="composite", |
|
|
description="Combines multiple detectors with voting" |
|
|
) |
|
|
class CompositeDetector(BaseFaultDetector): |
|
|
""" |
|
|
Combines multiple detectors using voting. |
|
|
|
|
|
Runs several detectors and aggregates their outputs. Can be |
|
|
configured to require unanimous agreement or majority voting. |
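
    Example:
        A minimal sketch combining two of the detectors above:

        >>> combo = CompositeDetector(min_votes=2)
        >>> combo.add_detector(ThresholdDetector())
        >>> combo.add_detector(EWMADetector())
        >>> combo.get_parameters()["n_detectors"]
        2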
|
|
""" |
|
|
|
|
|
name = "composite" |
|
|
description = "Multi-detector voting ensemble" |
|
|
window_size = 1 |
|
|
detect_interval = 1 |
|
|
async_mode = False |
|
|
|
|
|
    def __init__(self, detectors: Optional[List[BaseFaultDetector]] = None,
                 min_votes: int = 2, **kwargs):
|
|
super().__init__(**kwargs) |
|
|
self._sub_detectors = detectors or [] |
|
|
self.min_votes = min_votes |
|
|
|
|
|
def add_detector(self, detector: BaseFaultDetector): |
|
|
"""Add a sub-detector.""" |
|
|
self._sub_detectors.append(detector) |
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Aggregate sub-detector results.""" |
|
|
if not self._sub_detectors: |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
results = [] |
|
|
for detector in self._sub_detectors: |
|
|
result = detector.process(xmeas, step) |
|
|
if result.is_ready: |
|
|
results.append(result) |
|
|
|
|
|
if not results: |
|
|
return DetectionResult(-1, 0.0, step) |
|
|
|
|
|
|
|
|
        # Tally fault votes among the detectors that produced a result.
        fault_votes = sum(1 for r in results if r.is_fault)
        total_votes = len(results)
|
|
|
|
|
|
|
|
if fault_votes >= self.min_votes: |
|
|
fault_results = [r for r in results if r.is_fault] |
|
|
avg_confidence = np.mean([r.confidence for r in fault_results]) |
|
|
|
|
|
|
|
|
            # Majority fault class among the agreeing detectors.
            fault_classes = [r.fault_class for r in fault_results]
            most_common = max(set(fault_classes), key=fault_classes.count)
|
|
|
|
|
|
|
|
            # Pool the contributing sensors reported by the voters.
            all_sensors = []
            for r in fault_results:
                if r.contributing_sensors:
                    all_sensors.extend(r.contributing_sensors)
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=most_common, |
|
|
confidence=float(avg_confidence), |
|
|
step=step, |
|
|
contributing_sensors=list(set(all_sensors))[:5], |
|
|
statistics={ |
|
|
"fault_votes": fault_votes, |
|
|
"total_votes": total_votes, |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
normal_results = [r for r in results if r.is_normal] |
|
|
if normal_results: |
|
|
avg_confidence = np.mean([r.confidence for r in normal_results]) |
|
|
else: |
|
|
avg_confidence = 0.5 |
|
|
|
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=float(avg_confidence), |
|
|
step=step, |
|
|
statistics={ |
|
|
"fault_votes": fault_votes, |
|
|
"total_votes": total_votes, |
|
|
} |
|
|
) |
|
|
|
|
|
def _reset_impl(self): |
|
|
"""Reset all sub-detectors.""" |
|
|
for detector in self._sub_detectors: |
|
|
detector.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"min_votes": self.min_votes, |
|
|
"n_detectors": len(self._sub_detectors), |
|
|
"detector_names": [d.name for d in self._sub_detectors], |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_detector( |
|
|
name="passthrough", |
|
|
description="Always reports normal (baseline for comparison)" |
|
|
) |
|
|
class PassthroughDetector(BaseFaultDetector): |
|
|
""" |
|
|
Baseline detector that always reports normal operation. |
|
|
|
|
|
Useful for: |
|
|
- Testing the detector framework |
|
|
- Establishing a false-alarm-free baseline |
|
|
- Placeholder when no detection is wanted |
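
    Example:
        A trivial check that it always reports normal:

        >>> PassthroughDetector().detect(np.zeros(41), step=0).fault_class
        0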
|
|
""" |
|
|
|
|
|
name = "passthrough" |
|
|
description = "Baseline detector (always normal)" |
|
|
window_size = 1 |
|
|
detect_interval = 1 |
|
|
async_mode = False |
|
|
|
|
|
def detect(self, xmeas: np.ndarray, step: int) -> DetectionResult: |
|
|
"""Always return normal.""" |
|
|
return DetectionResult( |
|
|
fault_class=0, |
|
|
confidence=1.0, |
|
|
step=step |
|
|
) |
|
|
|