|
| 1 | +# License: MIT |
| 2 | +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH |
| 3 | + |
| 4 | +"""Load shedding actor.""" |
| 5 | + |
| 6 | +import asyncio |
| 7 | +import random |
| 8 | +import sys |
| 9 | +import termios |
| 10 | +import tty |
| 11 | +from dataclasses import dataclass |
| 12 | +from datetime import datetime, timezone |
| 13 | +from heapq import heappop, heappush |
| 14 | +from typing import AsyncGenerator |
| 15 | +from unittest.mock import patch |
| 16 | + |
| 17 | +from frequenz.channels import Broadcast, Receiver, Sender |
| 18 | +from frequenz.quantities import Percentage, Power |
| 19 | + |
| 20 | +from frequenz.sdk import microgrid |
| 21 | +from frequenz.sdk.actor import Actor, run |
| 22 | +from frequenz.sdk.timeseries import Sample |
| 23 | + |
| 24 | +# Mock configuration |
| 25 | +CONF_STATE = {} |
| 26 | + |
| 27 | + |
| 28 | +def mock_set_consumer(name: str, power: float) -> None: |
| 29 | + """Mock setting consumer power by storing the state in a dictionary. |
| 30 | +
|
| 31 | + Args: |
| 32 | + name: Consumer name. |
| 33 | + power: Power value to set. |
| 34 | + """ |
| 35 | + CONF_STATE[name] = power |
| 36 | + |
| 37 | + |
| 38 | +def _log(msg: str) -> None: |
| 39 | + print(msg, end="\n\r") |
| 40 | + |
| 41 | + |
| 42 | +class PowerMockActor(Actor): |
| 43 | + """Power Mock Actor. |
| 44 | +
|
| 45 | + Asynchronously listens to user key presses 'm' and 'n' to increase and decrease power of a |
| 46 | + static consumer. |
| 47 | + """ |
| 48 | + |
| 49 | + def __init__(self, consumer_name: str) -> None: |
| 50 | + """Initialize the actor.""" |
| 51 | + super().__init__() |
| 52 | + self.consumer_name = consumer_name |
| 53 | + self.power_step = Power.from_kilowatts(1.0) |
| 54 | + |
| 55 | + async def _run(self) -> None: |
| 56 | + _log("Press 'm' to increase power or 'n' to decrease power for the consumer.") |
| 57 | + |
| 58 | + while True: |
| 59 | + # Call _read_key in a thread to avoid blocking the event loop |
| 60 | + key = await asyncio.to_thread(self._read_key) |
| 61 | + if key == "m": |
| 62 | + CONF_STATE[self.consumer_name] = ( |
| 63 | + CONF_STATE.get(self.consumer_name, 0) + self.power_step.as_watts() |
| 64 | + ) |
| 65 | + _log( |
| 66 | + f"Increased {self.consumer_name} power to " |
| 67 | + f"{CONF_STATE[self.consumer_name]/1000.0} kW" |
| 68 | + ) |
| 69 | + elif key == "n": |
| 70 | + CONF_STATE[self.consumer_name] = max( |
| 71 | + 0, |
| 72 | + CONF_STATE.get(self.consumer_name, 0) - self.power_step.as_watts(), |
| 73 | + ) |
| 74 | + _log( |
| 75 | + f"Decreased {self.consumer_name} power to " |
| 76 | + f"{CONF_STATE[self.consumer_name]/1000.0} kW" |
| 77 | + ) |
| 78 | + elif key == "q": |
| 79 | + sys.exit() |
| 80 | + else: |
| 81 | + _log("Invalid key. Use 'm' or 'n'.") |
| 82 | + |
| 83 | + def _read_key(self) -> str: |
| 84 | + """Read a single key press without waiting for Enter.""" |
| 85 | + fd = sys.stdin.fileno() |
| 86 | + old_settings = termios.tcgetattr(fd) |
| 87 | + try: |
| 88 | + tty.setraw(fd) |
| 89 | + key = sys.stdin.read(1) |
| 90 | + finally: |
| 91 | + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) |
| 92 | + return key |
| 93 | + |
| 94 | + |
| 95 | +@dataclass(order=True) |
| 96 | +class Consumer: |
| 97 | + """Consumer dataclass.""" |
| 98 | + |
| 99 | + priority: int |
| 100 | + name: str |
| 101 | + power: Power |
| 102 | + enabled: bool = False |
| 103 | + |
| 104 | + |
| 105 | +class LoadSheddingActor(Actor): |
| 106 | + """Simple load shedding actor.""" |
| 107 | + |
| 108 | + def __init__( |
| 109 | + self, |
| 110 | + max_peak: Power, |
| 111 | + consumers: list[Consumer], |
| 112 | + grid_meter_receiver: Receiver[Sample[Power]], |
| 113 | + ): |
| 114 | + """Initialize the actor.""" |
| 115 | + super().__init__() |
| 116 | + self.max_peak = max_peak |
| 117 | + self.disable_tolerance = self.max_peak * 0.9 |
| 118 | + self.enable_tolerance = self.max_peak * 0.8 |
| 119 | + self.grid_meter_receiver = grid_meter_receiver |
| 120 | + |
| 121 | + self.enabled_consumers: list[Consumer] = [] |
| 122 | + self.disabled_consumers: list[Consumer] = [] |
| 123 | + |
| 124 | + for c in consumers: |
| 125 | + heappush(self.disabled_consumers, c) |
| 126 | + |
| 127 | + async def _enable_consumer(self, consumer: Consumer) -> None: |
| 128 | + if not consumer.enabled: |
| 129 | + consumer.enabled = True |
| 130 | + heappush(self.enabled_consumers, consumer) |
| 131 | + _log(f"+++{consumer.name}, +{consumer.power}") |
| 132 | + # This is a mock function to set the consumer power, |
| 133 | + # in a real system this would be replaced with the actual implementation |
| 134 | + mock_set_consumer(consumer.name, consumer.power.as_watts()) |
| 135 | + |
| 136 | + async def _disable_consumer(self, consumer: Consumer) -> None: |
| 137 | + if consumer.enabled: |
| 138 | + consumer.enabled = False |
| 139 | + heappush(self.disabled_consumers, consumer) |
| 140 | + _log(f"---{consumer.name}, -{consumer.power}") |
| 141 | + # This is a mock function to set the consumer power, |
| 142 | + # in a real system this would be replaced with the actual implementation |
| 143 | + mock_set_consumer(consumer.name, 0) |
| 144 | + |
| 145 | + async def _adjust_loads(self, current_load: Power) -> None: |
| 146 | + while current_load > self.disable_tolerance and self.enabled_consumers: |
| 147 | + enabled_consumer: Consumer = heappop(self.enabled_consumers) |
| 148 | + await self._disable_consumer(enabled_consumer) |
| 149 | + current_load -= enabled_consumer.power |
| 150 | + |
| 151 | + temp_disabled: list[Consumer] = [] |
| 152 | + while self.disabled_consumers: |
| 153 | + disabled_consumer: Consumer = heappop(self.disabled_consumers) |
| 154 | + if current_load + disabled_consumer.power <= self.enable_tolerance: |
| 155 | + await self._enable_consumer(disabled_consumer) |
| 156 | + current_load += disabled_consumer.power |
| 157 | + else: |
| 158 | + heappush(temp_disabled, disabled_consumer) |
| 159 | + break |
| 160 | + |
| 161 | + while temp_disabled: |
| 162 | + heappush(self.disabled_consumers, heappop(temp_disabled)) |
| 163 | + |
| 164 | + async def _run(self) -> None: |
| 165 | + async for power_sample in self.grid_meter_receiver: |
| 166 | + if power_sample.value: |
| 167 | + _log( |
| 168 | + f"Power: {power_sample.value}, " |
| 169 | + f"Peak: {self.max_peak} ({self.disable_tolerance} / {self.enable_tolerance})" |
| 170 | + f", Enabled: {', '.join(c.name for c in self.enabled_consumers)}\r" |
| 171 | + ) |
| 172 | + await self._adjust_loads(power_sample.value) |
| 173 | + |
| 174 | + |
| 175 | +async def mock_sender( |
| 176 | + sender: Sender[Sample[Power]], |
| 177 | +) -> AsyncGenerator[Sample[Power], None]: |
| 178 | + """Mock implementation of a grid meter receiver. |
| 179 | +
|
| 180 | + It sends power values every second. |
| 181 | + """ |
| 182 | + current_load = Power.from_kilowatts(0.0) |
| 183 | + |
| 184 | + def compute_power() -> Power: |
| 185 | + """Compute current grid power based on mock state.""" |
| 186 | + return Power.from_watts(sum(CONF_STATE.values())) |
| 187 | + |
| 188 | + while True: |
| 189 | + current_load = compute_power() |
| 190 | + # Add +- 8% noise to the current load |
| 191 | + current_load += current_load * Percentage.from_fraction( |
| 192 | + random.uniform(-0.08, 0.08) |
| 193 | + ) |
| 194 | + await sender.send( |
| 195 | + Sample(timestamp=datetime.now(tz=timezone.utc), value=current_load) |
| 196 | + ) |
| 197 | + await asyncio.sleep(1) |
| 198 | + |
| 199 | + |
| 200 | +async def main() -> None: |
| 201 | + """Program entry point.""" |
| 202 | + consumers = [ |
| 203 | + Consumer(priority=1, name="Fan2", power=Power.from_kilowatts(2.5)), |
| 204 | + Consumer(priority=2, name="Drier1", power=Power.from_kilowatts(3.0)), |
| 205 | + Consumer(priority=2, name="Drier2", power=Power.from_kilowatts(2.0)), |
| 206 | + Consumer(priority=3, name="Conveyor1", power=Power.from_kilowatts(1.5)), |
| 207 | + Consumer(priority=3, name="Conveyor2", power=Power.from_kilowatts(1.0)), |
| 208 | + Consumer(priority=4, name="Auger", power=Power.from_kilowatts(2.0)), |
| 209 | + Consumer(priority=4, name="HopperMixer", power=Power.from_kilowatts(2.5)), |
| 210 | + Consumer(priority=5, name="SiloVentilation", power=Power.from_kilowatts(1.0)), |
| 211 | + Consumer(priority=5, name="LoaderArm", power=Power.from_kilowatts(3.0)), |
| 212 | + Consumer(priority=6, name="SeedCleaner", power=Power.from_kilowatts(2.5)), |
| 213 | + Consumer(priority=6, name="Sprayer", power=Power.from_kilowatts(2.0)), |
| 214 | + Consumer(priority=7, name="Grinder", power=Power.from_kilowatts(3.0)), |
| 215 | + Consumer(priority=7, name="Shaker", power=Power.from_kilowatts(1.5)), |
| 216 | + Consumer(priority=8, name="Sorter", power=Power.from_kilowatts(2.0)), |
| 217 | + ] |
| 218 | + |
| 219 | + for consumer in consumers: |
| 220 | + mock_set_consumer(consumer.name, 0) |
| 221 | + |
| 222 | + grid_meter_receiver = microgrid.grid().power.new_receiver() |
| 223 | + |
| 224 | + actor_instance = LoadSheddingActor( |
| 225 | + max_peak=Power.from_kilowatts(30), |
| 226 | + consumers=consumers, |
| 227 | + grid_meter_receiver=grid_meter_receiver, |
| 228 | + ) |
| 229 | + |
| 230 | + user_input_actor = PowerMockActor(consumer_name="static_consumer") |
| 231 | + |
| 232 | + await run(actor_instance, user_input_actor) |
| 233 | + |
| 234 | + |
| 235 | +if __name__ == "__main__": |
| 236 | + with patch("frequenz.sdk.microgrid.grid") as mock_grid: |
| 237 | + chan = Broadcast[Sample[Power]](name="grid_power") |
| 238 | + mock_grid.return_value.power.new_receiver = chan.new_receiver |
| 239 | + |
| 240 | + async def begin() -> None: |
| 241 | + """Start main & mock sender.""" |
| 242 | + await asyncio.gather( |
| 243 | + main(), |
| 244 | + mock_sender(chan.new_sender()), |
| 245 | + ) |
| 246 | + |
| 247 | + asyncio.run(begin()) |
0 commit comments