|
|
""" |
|
|
Example controller plugins for the Tennessee Eastman Process. |
|
|
|
|
|
This module provides a range of example controllers demonstrating the |
|
|
plugin system, from single-loop controllers to full process control. |
|
|
|
|
|
Controllers are organized by complexity: |
|
|
1. Single-loop: Control one MV based on one measurement |
|
|
2. Subsystem: Control a related group of MVs (e.g., reactor) |
|
|
3. Composition: Control product quality via cascade loops |
|
|
4. Full process: Control all MVs with coordinated strategy |
|
|
|
|
|
All controllers inherit from BaseController and are registered with |
|
|
the ControllerRegistry for easy discovery and instantiation. |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import Dict, List, Optional, Any |
|
|
from dataclasses import dataclass |
|
|
|
|
|
from .controller_base import ( |
|
|
BaseController, |
|
|
ControllerRegistry, |
|
|
register_controller, |
|
|
CompositeController, |
|
|
) |
|
|
from .controllers import PIController |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="reactor_temp", |
|
|
description="Single-loop reactor temperature control via cooling water" |
|
|
) |
|
|
class ReactorTemperatureController(BaseController): |
|
|
""" |
|
|
Single-loop controller for reactor temperature. |
|
|
|
|
|
Controls reactor temperature (XMEAS 9) by adjusting reactor |
|
|
cooling water flow (XMV 10). This is a critical safety loop. |
|
|
|
|
|
This is the simplest example of a controller plugin - it manages |
|
|
just one MV based on one measurement. |
|
|
""" |
|
|
|
|
|
name = "reactor_temp" |
|
|
description = "Single-loop reactor temperature control via cooling water" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [10] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
setpoint: float = 120.40, |
|
|
gain: float = -1.56, |
|
|
taui: float = 0.403, |
|
|
): |
|
|
""" |
|
|
Initialize reactor temperature controller. |
|
|
|
|
|
Args: |
|
|
setpoint: Target reactor temperature (deg C) |
|
|
gain: Controller gain (negative for reverse action) |
|
|
taui: Integral time constant (hours) |
|
|
""" |
|
|
self.setpoint = setpoint |
|
|
self.gain = gain |
|
|
self.taui = taui |
|
|
|
|
|
self._controller = PIController( |
|
|
setpoint=setpoint, |
|
|
gain=gain, |
|
|
taui=taui, |
|
|
scale=100.0 / 150.0, |
|
|
output_min=0.0, |
|
|
output_max=100.0, |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
"""Calculate cooling water valve position.""" |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
|
|
|
if step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
reactor_temp = xmeas[8] |
|
|
xmv_new[9] = self._controller.calculate( |
|
|
reactor_temp, xmv_new[9], dt |
|
|
) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
"""Reset controller state.""" |
|
|
self._controller.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
"""Get controller parameters.""" |
|
|
return { |
|
|
"setpoint": self.setpoint, |
|
|
"gain": self.gain, |
|
|
"taui": self.taui, |
|
|
} |
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="separator_level", |
|
|
description="Single-loop separator level control" |
|
|
) |
|
|
class SeparatorLevelController(BaseController): |
|
|
""" |
|
|
Single-loop controller for separator level. |
|
|
|
|
|
Controls separator level (XMEAS 12) by adjusting separator |
|
|
underflow valve (XMV 7). |
|
|
""" |
|
|
|
|
|
name = "separator_level" |
|
|
description = "Single-loop separator level control" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [7] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
setpoint: float = 50.0, |
|
|
gain: float = -2.06, |
|
|
): |
|
|
""" |
|
|
Initialize separator level controller. |
|
|
|
|
|
Args: |
|
|
setpoint: Target level (%) |
|
|
gain: Controller gain (negative for reverse action) |
|
|
""" |
|
|
self.setpoint = setpoint |
|
|
self.gain = gain |
|
|
|
|
|
self._controller = PIController( |
|
|
setpoint=setpoint, |
|
|
gain=gain, |
|
|
taui=0.0, |
|
|
scale=100.0 / 70.0, |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
"""Calculate separator underflow valve position.""" |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
if step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
sep_level = xmeas[11] |
|
|
xmv_new[6] = self._controller.calculate( |
|
|
sep_level, xmv_new[6], dt |
|
|
) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
"""Reset controller state.""" |
|
|
self._controller.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return {"setpoint": self.setpoint, "gain": self.gain} |
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="stripper_level", |
|
|
description="Single-loop stripper level control" |
|
|
) |
|
|
class StripperLevelController(BaseController): |
|
|
""" |
|
|
Single-loop controller for stripper level. |
|
|
|
|
|
Controls stripper level (XMEAS 15) by adjusting stripper |
|
|
product flow valve (XMV 8). |
|
|
""" |
|
|
|
|
|
name = "stripper_level" |
|
|
description = "Single-loop stripper level control" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [8] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
setpoint: float = 50.0, |
|
|
gain: float = -1.62, |
|
|
): |
|
|
self.setpoint = setpoint |
|
|
self.gain = gain |
|
|
|
|
|
self._controller = PIController( |
|
|
setpoint=setpoint, |
|
|
gain=gain, |
|
|
taui=0.0, |
|
|
scale=100.0 / 70.0, |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
if step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
strip_level = xmeas[14] |
|
|
xmv_new[7] = self._controller.calculate( |
|
|
strip_level, xmv_new[7], dt |
|
|
) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
self._controller.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return {"setpoint": self.setpoint, "gain": self.gain} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="reactor_subsystem", |
|
|
description="Reactor subsystem control (temperature, level, cooling)" |
|
|
) |
|
|
class ReactorSubsystemController(BaseController): |
|
|
""" |
|
|
Subsystem controller for the reactor section. |
|
|
|
|
|
Controls: |
|
|
- Reactor temperature via cooling water (XMV 10) |
|
|
- Reactor level via A+C feed (XMV 4) |
|
|
|
|
|
Uses cascade control where the reactor level controller adjusts |
|
|
the A+C feed flow setpoint. |
|
|
""" |
|
|
|
|
|
name = "reactor_subsystem" |
|
|
description = "Reactor subsystem control (temperature, level, cooling)" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [4, 10] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
temp_setpoint: float = 120.40, |
|
|
level_setpoint: float = 75.0, |
|
|
): |
|
|
""" |
|
|
Initialize reactor subsystem controller. |
|
|
|
|
|
Args: |
|
|
temp_setpoint: Target reactor temperature (deg C) |
|
|
level_setpoint: Target reactor level (%) |
|
|
""" |
|
|
self.temp_setpoint = temp_setpoint |
|
|
self.level_setpoint = level_setpoint |
|
|
|
|
|
|
|
|
self.cw_temp_setpoint = 94.599 |
|
|
self._ctrl_cw_temp = PIController( |
|
|
setpoint=self.cw_temp_setpoint, |
|
|
gain=-1.56, |
|
|
taui=1452.0 / 3600.0, |
|
|
scale=100.0 / 150.0, |
|
|
) |
|
|
|
|
|
|
|
|
self._ctrl_reactor_temp = PIController( |
|
|
setpoint=temp_setpoint, |
|
|
gain=28.3, |
|
|
taui=982.0 / 3600.0, |
|
|
scale=100.0 / 150.0, |
|
|
) |
|
|
|
|
|
|
|
|
self.ac_feed_setpoint = 9.3477 |
|
|
self._ctrl_ac_feed = PIController( |
|
|
setpoint=self.ac_feed_setpoint, |
|
|
gain=1.0, |
|
|
taui=0.0, |
|
|
scale=100.0 / 15.25, |
|
|
) |
|
|
|
|
|
|
|
|
self._ctrl_reactor_level = PIController( |
|
|
setpoint=level_setpoint, |
|
|
gain=1.11, |
|
|
taui=3168.0 / 3600.0, |
|
|
scale=100.0 / 50.0, |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
if step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
|
|
|
|
|
|
|
|
|
dxmv = self._ctrl_reactor_temp.calculate_change(xmeas[8], dt) |
|
|
self.cw_temp_setpoint += dxmv * 150.0 / 100.0 |
|
|
self._ctrl_cw_temp.setpoint = self.cw_temp_setpoint |
|
|
|
|
|
|
|
|
xmv_new[9] = self._ctrl_cw_temp.calculate( |
|
|
xmeas[20], xmv_new[9], dt |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
dxmv = self._ctrl_reactor_level.calculate_change(xmeas[7], dt) |
|
|
self.ac_feed_setpoint += dxmv * 15.25 / 100.0 |
|
|
self._ctrl_ac_feed.setpoint = self.ac_feed_setpoint |
|
|
|
|
|
|
|
|
xmv_new[3] = self._ctrl_ac_feed.calculate( |
|
|
xmeas[3], xmv_new[3], dt |
|
|
) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
self.cw_temp_setpoint = 94.599 |
|
|
self.ac_feed_setpoint = 9.3477 |
|
|
self._ctrl_cw_temp.reset() |
|
|
self._ctrl_reactor_temp.reset() |
|
|
self._ctrl_ac_feed.reset() |
|
|
self._ctrl_reactor_level.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"temp_setpoint": self.temp_setpoint, |
|
|
"level_setpoint": self.level_setpoint, |
|
|
"cw_temp_setpoint": self.cw_temp_setpoint, |
|
|
"ac_feed_setpoint": self.ac_feed_setpoint, |
|
|
} |
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="separator_subsystem", |
|
|
description="Separator subsystem control (level, pressure)" |
|
|
) |
|
|
class SeparatorSubsystemController(BaseController): |
|
|
""" |
|
|
Subsystem controller for the separator section. |
|
|
|
|
|
Controls: |
|
|
- Separator level via underflow (XMV 7) |
|
|
- Condenser cooling water (XMV 11) |
|
|
""" |
|
|
|
|
|
name = "separator_subsystem" |
|
|
description = "Separator subsystem control (level, pressure)" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [7, 11] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
level_setpoint: float = 50.0, |
|
|
): |
|
|
self.level_setpoint = level_setpoint |
|
|
|
|
|
self._ctrl_level = PIController( |
|
|
setpoint=level_setpoint, |
|
|
gain=-2.06, |
|
|
taui=0.0, |
|
|
scale=100.0 / 70.0, |
|
|
) |
|
|
|
|
|
self._ctrl_condenser = PIController( |
|
|
setpoint=22.949, |
|
|
gain=1.09, |
|
|
taui=2600.0 / 3600.0, |
|
|
scale=100.0 / 46.0, |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
if step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
|
|
|
|
|
|
xmv_new[6] = self._ctrl_level.calculate( |
|
|
xmeas[11], xmv_new[6], dt |
|
|
) |
|
|
|
|
|
|
|
|
xmv_new[10] = self._ctrl_condenser.calculate( |
|
|
xmeas[16], xmv_new[10], dt |
|
|
) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
self._ctrl_level.reset() |
|
|
self._ctrl_condenser.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return {"level_setpoint": self.level_setpoint} |
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="feed_subsystem", |
|
|
description="Feed subsystem control (D, E, A, A+C feeds)" |
|
|
) |
|
|
class FeedSubsystemController(BaseController): |
|
|
""" |
|
|
Subsystem controller for feed flows. |
|
|
|
|
|
Controls: |
|
|
- D feed flow (XMV 1) |
|
|
- E feed flow (XMV 2) |
|
|
- A feed flow (XMV 3) |
|
|
- A+C feed flow (XMV 4) |
|
|
|
|
|
These are typically flow control loops that track setpoints |
|
|
from higher-level composition controllers. |
|
|
""" |
|
|
|
|
|
name = "feed_subsystem" |
|
|
description = "Feed subsystem control (D, E, A, A+C feeds)" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [1, 2, 3, 4] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
d_feed_sp: float = 3664.0, |
|
|
e_feed_sp: float = 4509.3, |
|
|
a_feed_sp: float = 0.25052, |
|
|
ac_feed_sp: float = 9.3477, |
|
|
): |
|
|
self.d_feed_sp = d_feed_sp |
|
|
self.e_feed_sp = e_feed_sp |
|
|
self.a_feed_sp = a_feed_sp |
|
|
self.ac_feed_sp = ac_feed_sp |
|
|
|
|
|
self._ctrl_d = PIController( |
|
|
setpoint=d_feed_sp, gain=1.0, taui=0.0, |
|
|
scale=100.0 / 5811.0 |
|
|
) |
|
|
self._ctrl_e = PIController( |
|
|
setpoint=e_feed_sp, gain=1.0, taui=0.0, |
|
|
scale=100.0 / 8354.0 |
|
|
) |
|
|
self._ctrl_a = PIController( |
|
|
setpoint=a_feed_sp, gain=1.0, taui=0.0, |
|
|
scale=100.0 / 1.017 |
|
|
) |
|
|
self._ctrl_ac = PIController( |
|
|
setpoint=ac_feed_sp, gain=1.0, taui=0.0, |
|
|
scale=100.0 / 15.25 |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
if step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
|
|
|
xmv_new[0] = self._ctrl_d.calculate(xmeas[1], xmv_new[0], dt) |
|
|
xmv_new[1] = self._ctrl_e.calculate(xmeas[2], xmv_new[1], dt) |
|
|
xmv_new[2] = self._ctrl_a.calculate(xmeas[0], xmv_new[2], dt) |
|
|
xmv_new[3] = self._ctrl_ac.calculate(xmeas[3], xmv_new[3], dt) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
self._ctrl_d.reset() |
|
|
self._ctrl_e.reset() |
|
|
self._ctrl_a.reset() |
|
|
self._ctrl_ac.reset() |
|
|
|
|
|
def set_setpoints( |
|
|
self, |
|
|
d_feed: float = None, |
|
|
e_feed: float = None, |
|
|
a_feed: float = None, |
|
|
ac_feed: float = None |
|
|
): |
|
|
"""Update feed flow setpoints.""" |
|
|
if d_feed is not None: |
|
|
self.d_feed_sp = d_feed |
|
|
self._ctrl_d.setpoint = d_feed |
|
|
if e_feed is not None: |
|
|
self.e_feed_sp = e_feed |
|
|
self._ctrl_e.setpoint = e_feed |
|
|
if a_feed is not None: |
|
|
self.a_feed_sp = a_feed |
|
|
self._ctrl_a.setpoint = a_feed |
|
|
if ac_feed is not None: |
|
|
self.ac_feed_sp = ac_feed |
|
|
self._ctrl_ac.setpoint = ac_feed |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"d_feed_sp": self.d_feed_sp, |
|
|
"e_feed_sp": self.e_feed_sp, |
|
|
"a_feed_sp": self.a_feed_sp, |
|
|
"ac_feed_sp": self.ac_feed_sp, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="product_quality", |
|
|
description="Product quality control via stripper temperature cascade" |
|
|
) |
|
|
class ProductQualityController(BaseController): |
|
|
""" |
|
|
Product quality controller using cascade control. |
|
|
|
|
|
Controls product E composition (XMEAS 38) by adjusting stripper |
|
|
temperature, which in turn adjusts steam flow (XMV 9). |
|
|
|
|
|
This is a slow outer loop (15-minute sample time) that cascades |
|
|
to a faster temperature loop (3-second sample time). |
|
|
""" |
|
|
|
|
|
name = "product_quality" |
|
|
description = "Product quality control via stripper temperature cascade" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [9] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
product_e_setpoint: float = 0.8357, |
|
|
): |
|
|
""" |
|
|
Initialize product quality controller. |
|
|
|
|
|
Args: |
|
|
product_e_setpoint: Target product E composition (mol fraction) |
|
|
""" |
|
|
self.product_e_setpoint = product_e_setpoint |
|
|
|
|
|
|
|
|
self.stripper_temp_setpoint = 65.731 |
|
|
|
|
|
|
|
|
self.steam_flow_setpoint = 230.31 |
|
|
|
|
|
|
|
|
self._ctrl_product_e = PIController( |
|
|
setpoint=product_e_setpoint, |
|
|
gain=-3.26, |
|
|
taui=12408.0 / 3600.0, |
|
|
scale=100.0 / 1.6, |
|
|
) |
|
|
|
|
|
|
|
|
self._ctrl_stripper_temp = PIController( |
|
|
setpoint=self.stripper_temp_setpoint, |
|
|
gain=0.169, |
|
|
taui=236.0 / 3600.0, |
|
|
scale=100.0 / 130.0, |
|
|
) |
|
|
|
|
|
|
|
|
self._ctrl_steam = PIController( |
|
|
setpoint=self.steam_flow_setpoint, |
|
|
gain=0.41, |
|
|
taui=0.0, |
|
|
scale=100.0 / 460.0, |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
|
|
|
if step % 3 == 0: |
|
|
dt3 = 3.0 / 3600.0 |
|
|
|
|
|
|
|
|
self._ctrl_steam.setpoint = self.steam_flow_setpoint |
|
|
|
|
|
|
|
|
xmv_new[8] = self._ctrl_steam.calculate( |
|
|
xmeas[18], xmv_new[8], dt3 |
|
|
) |
|
|
|
|
|
|
|
|
dxmv = self._ctrl_stripper_temp.calculate_change(xmeas[17], dt3) |
|
|
self.steam_flow_setpoint += dxmv * 460.0 / 100.0 |
|
|
|
|
|
|
|
|
if step % 900 == 0: |
|
|
dt900 = 900.0 / 3600.0 |
|
|
|
|
|
|
|
|
dxmv = self._ctrl_product_e.calculate_change(xmeas[37], dt900) |
|
|
self.stripper_temp_setpoint += dxmv * 130.0 / 100.0 |
|
|
self._ctrl_stripper_temp.setpoint = self.stripper_temp_setpoint |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
self.stripper_temp_setpoint = 65.731 |
|
|
self.steam_flow_setpoint = 230.31 |
|
|
self._ctrl_product_e.reset() |
|
|
self._ctrl_stripper_temp.reset() |
|
|
self._ctrl_steam.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"product_e_setpoint": self.product_e_setpoint, |
|
|
"stripper_temp_setpoint": self.stripper_temp_setpoint, |
|
|
"steam_flow_setpoint": self.steam_flow_setpoint, |
|
|
} |
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="reactor_composition", |
|
|
description="Reactor feed composition control (A, D, E in feed)" |
|
|
) |
|
|
class ReactorCompositionController(BaseController): |
|
|
""" |
|
|
Reactor feed composition controller. |
|
|
|
|
|
Controls reactor feed composition by adjusting feed flow setpoints. |
|
|
Uses slow (6-minute) cascade loops. |
|
|
|
|
|
Controls: |
|
|
- Feed A composition via A feed (affects XMV 3 setpoint) |
|
|
- Feed D composition via D feed (affects XMV 1 setpoint) |
|
|
- Feed E composition via E feed (affects XMV 2 setpoint) |
|
|
|
|
|
Note: This controller outputs setpoints, not direct MV values. |
|
|
It should be used with a FeedSubsystemController to execute |
|
|
the flow control. |
|
|
""" |
|
|
|
|
|
name = "reactor_composition" |
|
|
description = "Reactor feed composition control (A, D, E in feed)" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = [1, 2, 3] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
comp_a_setpoint: float = 32.188, |
|
|
comp_d_setpoint: float = 6.882, |
|
|
comp_e_setpoint: float = 18.776, |
|
|
): |
|
|
self.comp_a_setpoint = comp_a_setpoint |
|
|
self.comp_d_setpoint = comp_d_setpoint |
|
|
self.comp_e_setpoint = comp_e_setpoint |
|
|
|
|
|
|
|
|
self.a_feed_setpoint = 0.25052 |
|
|
self.d_feed_setpoint = 3664.0 |
|
|
self.e_feed_setpoint = 4509.3 |
|
|
|
|
|
|
|
|
self._ctrl_comp_a = PIController( |
|
|
setpoint=comp_a_setpoint, gain=18.0, |
|
|
taui=3168.0 / 3600.0, scale=100.0 / 100.0 |
|
|
) |
|
|
self._ctrl_comp_d = PIController( |
|
|
setpoint=comp_d_setpoint, gain=8.3, |
|
|
taui=3168.0 / 3600.0, scale=100.0 / 100.0 |
|
|
) |
|
|
self._ctrl_comp_e = PIController( |
|
|
setpoint=comp_e_setpoint, gain=2.37, |
|
|
taui=5069.0 / 3600.0, scale=100.0 / 100.0 |
|
|
) |
|
|
|
|
|
|
|
|
self._ctrl_a_feed = PIController( |
|
|
setpoint=self.a_feed_setpoint, gain=1.0, |
|
|
taui=0.0, scale=100.0 / 1.017 |
|
|
) |
|
|
self._ctrl_d_feed = PIController( |
|
|
setpoint=self.d_feed_setpoint, gain=1.0, |
|
|
taui=0.0, scale=100.0 / 5811.0 |
|
|
) |
|
|
self._ctrl_e_feed = PIController( |
|
|
setpoint=self.e_feed_setpoint, gain=1.0, |
|
|
taui=0.0, scale=100.0 / 8354.0 |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
|
|
|
if step % 3 == 0: |
|
|
dt3 = 3.0 / 3600.0 |
|
|
|
|
|
self._ctrl_a_feed.setpoint = self.a_feed_setpoint |
|
|
self._ctrl_d_feed.setpoint = self.d_feed_setpoint |
|
|
self._ctrl_e_feed.setpoint = self.e_feed_setpoint |
|
|
|
|
|
xmv_new[2] = self._ctrl_a_feed.calculate(xmeas[0], xmv_new[2], dt3) |
|
|
xmv_new[0] = self._ctrl_d_feed.calculate(xmeas[1], xmv_new[0], dt3) |
|
|
xmv_new[1] = self._ctrl_e_feed.calculate(xmeas[2], xmv_new[1], dt3) |
|
|
|
|
|
|
|
|
if step % 360 == 0: |
|
|
dt360 = 360.0 / 3600.0 |
|
|
|
|
|
|
|
|
dxmv = self._ctrl_comp_a.calculate_change(xmeas[22], dt360) |
|
|
self.a_feed_setpoint += dxmv * 1.017 / 100.0 |
|
|
|
|
|
|
|
|
dxmv = self._ctrl_comp_d.calculate_change(xmeas[25], dt360) |
|
|
self.d_feed_setpoint += dxmv * 5811.0 / 100.0 |
|
|
|
|
|
|
|
|
dxmv = self._ctrl_comp_e.calculate_change(xmeas[26], dt360) |
|
|
self.e_feed_setpoint += dxmv * 8354.0 / 100.0 |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
self.a_feed_setpoint = 0.25052 |
|
|
self.d_feed_setpoint = 3664.0 |
|
|
self.e_feed_setpoint = 4509.3 |
|
|
self._ctrl_comp_a.reset() |
|
|
self._ctrl_comp_d.reset() |
|
|
self._ctrl_comp_e.reset() |
|
|
self._ctrl_a_feed.reset() |
|
|
self._ctrl_d_feed.reset() |
|
|
self._ctrl_e_feed.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"comp_a_setpoint": self.comp_a_setpoint, |
|
|
"comp_d_setpoint": self.comp_d_setpoint, |
|
|
"comp_e_setpoint": self.comp_e_setpoint, |
|
|
"a_feed_setpoint": self.a_feed_setpoint, |
|
|
"d_feed_setpoint": self.d_feed_setpoint, |
|
|
"e_feed_setpoint": self.e_feed_setpoint, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="proportional_only", |
|
|
description="Simple proportional-only control for all loops" |
|
|
) |
|
|
class ProportionalOnlyController(BaseController): |
|
|
""" |
|
|
Simple proportional-only controller for all MVs. |
|
|
|
|
|
This is a minimal full-process controller that uses P-only control |
|
|
for all loops. Useful as a baseline or for testing. |
|
|
""" |
|
|
|
|
|
name = "proportional_only" |
|
|
description = "Simple proportional-only control for all loops" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = list(range(1, 12)) |
|
|
|
|
|
|
|
|
DEFAULT_CONFIG = { |
|
|
|
|
|
0: (1, 3664.0, 1.0, 100.0 / 5811.0), |
|
|
1: (2, 4509.3, 1.0, 100.0 / 8354.0), |
|
|
2: (0, 0.25052, 1.0, 100.0 / 1.017), |
|
|
3: (3, 9.3477, 1.0, 100.0 / 15.25), |
|
|
4: (4, 26.902, -0.083, 100.0 / 53.0), |
|
|
5: (9, 0.33712, 1.22, 100.0 / 1.0), |
|
|
6: (11, 50.0, -2.06, 100.0 / 70.0), |
|
|
7: (14, 50.0, -1.62, 100.0 / 70.0), |
|
|
8: (18, 230.31, 0.41, 100.0 / 460.0), |
|
|
9: (20, 94.599, -1.56, 100.0 / 150.0), |
|
|
10: (16, 22.949, 1.09, 100.0 / 46.0), |
|
|
} |
|
|
|
|
|
def __init__(self): |
|
|
self._controllers = {} |
|
|
for mv_idx, (meas_idx, sp, gain, scale) in self.DEFAULT_CONFIG.items(): |
|
|
self._controllers[mv_idx] = PIController( |
|
|
setpoint=sp, |
|
|
gain=gain, |
|
|
taui=0.0, |
|
|
scale=scale, |
|
|
) |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
if step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
for mv_idx, (meas_idx, _, _, _) in self.DEFAULT_CONFIG.items(): |
|
|
xmv_new[mv_idx] = self._controllers[mv_idx].calculate( |
|
|
xmeas[meas_idx], xmv_new[mv_idx], dt |
|
|
) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
for ctrl in self._controllers.values(): |
|
|
ctrl.reset() |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
f"xmv{i+1}_setpoint": self._controllers[i].setpoint |
|
|
for i in self._controllers |
|
|
} |
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="economic_mpc", |
|
|
description="Economic MPC-style controller (simplified demonstration)" |
|
|
) |
|
|
class EconomicMPCController(BaseController): |
|
|
""" |
|
|
Simplified economic MPC-style controller. |
|
|
|
|
|
This demonstrates how an MPC-style controller could be implemented. |
|
|
It doesn't use actual MPC optimization, but shows the structure |
|
|
for integrating more advanced control strategies. |
|
|
|
|
|
The controller prioritizes: |
|
|
1. Safety (reactor pressure/temperature limits) |
|
|
2. Product quality (product E composition) |
|
|
3. Production rate (maximize throughput) |
|
|
""" |
|
|
|
|
|
name = "economic_mpc" |
|
|
description = "Economic MPC-style controller (simplified demonstration)" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = list(range(1, 12)) |
|
|
|
|
|
|
|
|
REACTOR_TEMP_MAX = 150.0 |
|
|
REACTOR_TEMP_MIN = 100.0 |
|
|
REACTOR_PRESSURE_MAX = 2900.0 |
|
|
REACTOR_PRESSURE_MIN = 2700.0 |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
production_rate_target: float = 1.0, |
|
|
product_e_target: float = 0.8357, |
|
|
): |
|
|
""" |
|
|
Initialize economic MPC controller. |
|
|
|
|
|
Args: |
|
|
production_rate_target: Target production rate (fraction of nominal) |
|
|
product_e_target: Target product E composition |
|
|
""" |
|
|
self.production_rate_target = production_rate_target |
|
|
self.product_e_target = product_e_target |
|
|
|
|
|
|
|
|
self.base_setpoints = { |
|
|
"d_feed": 3664.0, |
|
|
"e_feed": 4509.3, |
|
|
"a_feed": 0.25052, |
|
|
"ac_feed": 9.3477, |
|
|
"recycle": 26.902, |
|
|
"purge": 0.33712, |
|
|
"sep_level": 50.0, |
|
|
"strip_level": 50.0, |
|
|
"steam": 230.31, |
|
|
"reactor_cw": 94.599, |
|
|
"cond_cw": 22.949, |
|
|
} |
|
|
|
|
|
|
|
|
self._init_controllers() |
|
|
|
|
|
def _init_controllers(self): |
|
|
"""Initialize all PI controllers.""" |
|
|
self._controllers = { |
|
|
0: PIController(self.base_setpoints["d_feed"], 1.0, 0.0, scale=100.0/5811.0), |
|
|
1: PIController(self.base_setpoints["e_feed"], 1.0, 0.0, scale=100.0/8354.0), |
|
|
2: PIController(self.base_setpoints["a_feed"], 1.0, 0.0, scale=100.0/1.017), |
|
|
3: PIController(self.base_setpoints["ac_feed"], 1.0, 0.0, scale=100.0/15.25), |
|
|
4: PIController(self.base_setpoints["recycle"], -0.083, 1.0/3600.0, scale=100.0/53.0), |
|
|
5: PIController(self.base_setpoints["purge"], 1.22, 0.0, scale=100.0/1.0), |
|
|
6: PIController(self.base_setpoints["sep_level"], -2.06, 0.0, scale=100.0/70.0), |
|
|
7: PIController(self.base_setpoints["strip_level"], -1.62, 0.0, scale=100.0/70.0), |
|
|
8: PIController(self.base_setpoints["steam"], 0.41, 0.0, scale=100.0/460.0), |
|
|
9: PIController(self.base_setpoints["reactor_cw"], -1.56, 1452.0/3600.0, scale=100.0/150.0), |
|
|
10: PIController(self.base_setpoints["cond_cw"], 1.09, 2600.0/3600.0, scale=100.0/46.0), |
|
|
} |
|
|
|
|
|
|
|
|
self._meas_idx = {0: 1, 1: 2, 2: 0, 3: 3, 4: 4, 5: 9, 6: 11, 7: 14, 8: 18, 9: 20, 10: 16} |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
xmv_new = xmv.copy() |
|
|
|
|
|
|
|
|
reactor_temp = xmeas[8] |
|
|
reactor_pressure = xmeas[6] |
|
|
|
|
|
safety_mode = False |
|
|
if reactor_temp > self.REACTOR_TEMP_MAX - 5: |
|
|
|
|
|
xmv_new[9] = min(xmv_new[9] + 2.0, 100.0) |
|
|
safety_mode = True |
|
|
elif reactor_temp < self.REACTOR_TEMP_MIN + 5: |
|
|
|
|
|
xmv_new[9] = max(xmv_new[9] - 2.0, 0.0) |
|
|
safety_mode = True |
|
|
|
|
|
if reactor_pressure > self.REACTOR_PRESSURE_MAX - 50: |
|
|
|
|
|
xmv_new[5] = min(xmv_new[5] + 5.0, 100.0) |
|
|
safety_mode = True |
|
|
|
|
|
|
|
|
if not safety_mode and step % 3 == 0: |
|
|
dt = 3.0 / 3600.0 |
|
|
|
|
|
|
|
|
rate_factor = self.production_rate_target |
|
|
self._controllers[0].setpoint = self.base_setpoints["d_feed"] * rate_factor |
|
|
self._controllers[1].setpoint = self.base_setpoints["e_feed"] * rate_factor |
|
|
|
|
|
|
|
|
for mv_idx, ctrl in self._controllers.items(): |
|
|
meas_idx = self._meas_idx[mv_idx] |
|
|
xmv_new[mv_idx] = ctrl.calculate(xmeas[meas_idx], xmv_new[mv_idx], dt) |
|
|
|
|
|
return xmv_new |
|
|
|
|
|
def reset(self): |
|
|
self._init_controllers() |
|
|
|
|
|
def set_production_rate(self, rate: float): |
|
|
"""Set production rate target (fraction of nominal).""" |
|
|
self.production_rate_target = np.clip(rate, 0.5, 1.2) |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return { |
|
|
"production_rate_target": self.production_rate_target, |
|
|
"product_e_target": self.product_e_target, |
|
|
"base_setpoints": self.base_setpoints.copy(), |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@register_controller( |
|
|
name="passthrough", |
|
|
description="Passthrough controller - holds MVs at current values" |
|
|
) |
|
|
class PassthroughController(BaseController): |
|
|
""" |
|
|
Passthrough controller that holds MVs at their current values. |
|
|
|
|
|
Useful for testing or as a fallback in composite controllers. |
|
|
""" |
|
|
|
|
|
name = "passthrough" |
|
|
description = "Passthrough controller - holds MVs at current values" |
|
|
version = "1.0.0" |
|
|
controlled_mvs = None |
|
|
|
|
|
def calculate( |
|
|
self, |
|
|
xmeas: np.ndarray, |
|
|
xmv: np.ndarray, |
|
|
step: int |
|
|
) -> np.ndarray: |
|
|
return xmv.copy() |
|
|
|
|
|
def reset(self): |
|
|
pass |
|
|
|
|
|
def get_parameters(self) -> Dict[str, Any]: |
|
|
return {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_composite_controller( |
|
|
subsystems: List[str], |
|
|
fallback: str = "passthrough" |
|
|
) -> CompositeController: |
|
|
""" |
|
|
Create a composite controller from named subsystem controllers. |
|
|
|
|
|
Args: |
|
|
subsystems: List of subsystem controller names to include |
|
|
fallback: Name of fallback controller for uncontrolled MVs |
|
|
|
|
|
Returns: |
|
|
CompositeController instance |
|
|
|
|
|
Example: |
|
|
>>> ctrl = create_composite_controller( |
|
|
... ["reactor_subsystem", "separator_subsystem"], |
|
|
... fallback="passthrough" |
|
|
... ) |
|
|
""" |
|
|
|
|
|
MV_ASSIGNMENTS = { |
|
|
"reactor_temp": [10], |
|
|
"separator_level": [7], |
|
|
"stripper_level": [8], |
|
|
"reactor_subsystem": [4, 10], |
|
|
"separator_subsystem": [7, 11], |
|
|
"feed_subsystem": [1, 2, 3, 4], |
|
|
"product_quality": [9], |
|
|
"reactor_composition": [1, 2, 3], |
|
|
} |
|
|
|
|
|
fallback_ctrl = ControllerRegistry.create(fallback) |
|
|
composite = CompositeController(fallback_controller=fallback_ctrl) |
|
|
|
|
|
for name in subsystems: |
|
|
ctrl = ControllerRegistry.create(name) |
|
|
mvs = MV_ASSIGNMENTS.get(name, ctrl.controlled_mvs or []) |
|
|
if mvs: |
|
|
composite.add_controller(ctrl, mvs) |
|
|
|
|
|
return composite |
|
|
|