diff --git a/apps/predbat/component_base.py b/apps/predbat/component_base.py index 76ed1fea3..ecc25f555 100644 --- a/apps/predbat/component_base.py +++ b/apps/predbat/component_base.py @@ -150,6 +150,7 @@ async def start(self): """ seconds = 0 first = True + self.log(f"{self.__class__.__name__}: Starting...") while not self.api_stop and not self.fatal_error: try: if first or seconds % 60 == 0: diff --git a/apps/predbat/component_callback_server.py b/apps/predbat/component_callback_server.py new file mode 100644 index 000000000..139b8d563 --- /dev/null +++ b/apps/predbat/component_callback_server.py @@ -0,0 +1,177 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2025 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +# fmt: off +# pylint: disable=consider-using-f-string +# pylint: disable=line-too-long +# pylint: disable=attribute-defined-outside-init + +""" +Component Callback Server + +This server runs within Predbat and provides a callback endpoint for remote +components to access the base Predbat object's methods. +""" + +import asyncio +import pickle +import inspect +import traceback +import aiohttp.web +import time + + +class ComponentCallbackServer: + """ + HTTP server that handles callbacks from remote components. + + This server provides endpoints for remote components to call back and access + the main Predbat base object's methods (get_arg, get_state_wrapper, etc.) + """ + + def __init__(self, base, port): + """ + Initialize the callback server. + + Args: + base: The main Predbat base object + port: Port to bind to + """ + self.base = base + self.port = port + self.started = False + self.runner = None + self.site = None + self.log = self.base.log + self.stop_api = False + + async def start(self): + """ + Start the callback server. + """ + print('Here') + self.log("CallBackServer: Initializing ComponentCallbackServer...") + try: + # Create web application + app = aiohttp.web.Application() + + self.log("Starting ComponentCallbackServer...") + + # Add routes + app.router.add_post('/base/call', self._handle_base_call) + app.router.add_get('/health', self._handle_health) + + self.log("CallBackServer: Setting up ComponentCallbackServer runner...") + # Create runner + self.runner = aiohttp.web.AppRunner(app) + await self.runner.setup() + + self.log(f"CallBackServer: Creating ComponentCallbackServer site port {self.port}...") + + # Create site and start + self.site = aiohttp.web.TCPSite(self.runner, '0.0.0.0', self.port) + + try: + await self.site.start() + except OSError as e: + error_msg = f"Failed to bind callback server to port {self.port}: {e}" + self.log(error_msg) + raise + + self.started = True + self.log(f"CallBackServer: started on port {self.port}") + + except Exception as e: + self.log(f"CallBackServer: Error: Failed to start callback server: {e}") + self.log(traceback.format_exc()) + raise + + while not self.stop_api: + await asyncio.sleep(1) + + if self.runner: + self.log("CallBackServer: Stopping callback server") + await self.runner.cleanup() + self.started = False + + + async def _handle_base_call(self, request): + """ + Handle base API call from remote component. + + Expected pickled request: { + "method": str, + "args": tuple, + "kwargs": dict + } + """ + try: + # Read and unpickle request + data = await request.read() + req = pickle.loads(data) + + method = req["method"] + args = req.get("args", ()) + kwargs = req.get("kwargs", {}) + + # Invoke method on base object + try: + method_fn = getattr(self.base, method) + result = method_fn(*args, **kwargs) + + # Check if result is a coroutine and await it + if inspect.iscoroutine(result): + result = await result + + # Return pickled result + return aiohttp.web.Response( + body=pickle.dumps(result, protocol=4) + ) + + except Exception as e: + error_msg = str(e) + self.log(f"CallBackServer: Error: Base call {method} failed: {error_msg}") + self.log(traceback.format_exc()) + + # Return error + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + except Exception as e: + error_msg = f"Failed to process base call: {e}" + self.log(f"CallBackServer: Error: {error_msg}") + self.log(traceback.format_exc()) + + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + async def _handle_health(self, request): + """Handle health check request.""" + return aiohttp.web.json_response({"status": "ok"}) + + def wait_started(self, timeout=5*60): + """ + Wait for the server to start. + + Args: + timeout: Maximum time to wait in seconds + + Raises: + TimeoutError: If server doesn't start within timeout + """ + elapsed = 0 + while not self.started and elapsed < timeout: + time.sleep(1) + elapsed += 1 + + if not self.started: + raise TimeoutError("Callback server failed to start within timeout") + + async def stop(self): + """Stop the callback server.""" + self.stop_api = True + await asyncio.sleep(0.1) # Give time for any ongoing requests to finish diff --git a/apps/predbat/component_client.py b/apps/predbat/component_client.py new file mode 100644 index 000000000..a1b90d02d --- /dev/null +++ b/apps/predbat/component_client.py @@ -0,0 +1,378 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2025 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +# fmt: off +# pylint: disable=consider-using-f-string +# pylint: disable=line-too-long +# pylint: disable=attribute-defined-outside-init + +""" +Component Client for Remote Component Execution + +WARNING: This client uses pickle for serialization. Only use in trusted networks +(e.g. Kubernetes cluster) as pickle can execute arbitrary code. +""" + +import asyncio +import pickle +import time +import uuid +import traceback +from datetime import datetime, timezone +import aiohttp + + +class ComponentClient: + """ + Client proxy that forwards component operations to a remote ComponentServer. + + This class inherits from ComponentBase and acts as a transparent proxy for + remote components, handling all communication with the component server. + """ + def __init__(self, base, server_url, remote_class, **component_kwargs): + """ + Initialize the component client. + + Args: + base: The main Predbat base object + server_url: URL of the component server + remote_class: Name of the remote component class + **component_kwargs: Initialization kwargs to pass to remote component + """ + # Store parameters before calling parent init + self.server_url = server_url + self.remote_class = remote_class + self.component_kwargs = component_kwargs + self.client_id = None + self.session = None + self.last_ping_time = 0 + self.restart_lock = asyncio.Lock() + self.component_started = False + + # Initialize base attributes (mimic ComponentBase) + self.base = base + self.log = base.log + self.api_started = False + self.api_stop = False + self.last_success_timestamp = None + self.local_tz = base.local_tz + self.prefix = base.prefix + self.args = base.args + self.currenty_symbols = base.currency_symbols + self.count_errors = 0 + + def wait_api_started(self, timeout=5*60): + """ + Wait for the component's API to be started (self.api_started == True). + Returns True if started, False if timeout. + """ + start = time.time() + self.log(f"ComponentClient: Waiting for API to start (timeout {timeout}s)...") + while not self.api_started: + if time.time() - start > timeout: + return False + time.sleep(1) + return True + + async def start(self): + """ + Start the component client. + + This method: + 1. Gets or creates a client ID + 2. Creates HTTP session + 3. Starts the remote component + 4. Runs the main loop + """ + try: + # Get or create client ID + client_id_entity = f"{self.prefix}.client_id" + self.client_id = self.base.get_state_wrapper(client_id_entity, default=None) + + if not self.client_id: + # Generate new client ID + self.client_id = str(uuid.uuid4()) + self.log(f"ComponentClient: Generated new client ID: {self.client_id}") + + # Save client ID + self.base.set_state_wrapper( + client_id_entity, + self.client_id, + attributes={ + "created": datetime.now(timezone.utc).isoformat(), + "server_url": self.server_url + } + ) + else: + self.log(f"ComponentClient: Using existing client ID: {self.client_id}") + + # Get callback URL from config + callback_url = self.base.get_arg("component_client_callback_url", "http://localhost:5054") + + # Create HTTP session + self.session = aiohttp.ClientSession() + + # Start remote component + await self._start_remote_component(callback_url) + + # Run main loop (same as ComponentBase.start()) + seconds = 0 + first = True + while not self.api_stop and not self.fatal_error: + try: + if first or seconds % 60 == 0: + if await self.run(seconds, first): + if not self.api_started: + self.api_started = True + self.log(f"ComponentClient ({self.remote_class}): Started") + else: + self.count_errors += 1 + first = False + except Exception as e: + self.log(f"Error: ComponentClient ({self.remote_class}): {e}") + self.log("Error: " + traceback.format_exc()) + + seconds += 5 + await asyncio.sleep(5) + + self.log(f"ComponentClient ({self.remote_class}): Finalizing...") + await self.final() + + self.api_started = False + self.log(f"ComponentClient ({self.remote_class}): Stopped") + + except Exception as e: + self.log(f"Error: ComponentClient.start() failed: {e}") + self.log("Error: " + traceback.format_exc()) + self.api_started = False + + async def _start_remote_component(self, callback_url): + """ + Start the component on the remote server. + + Args: + callback_url: URL for server to call back to + """ + try: + self.log(f"ComponentClient: Starting remote component {self.remote_class} on {self.server_url} with callback {callback_url} args {self.component_kwargs}") + + # Prepare request + request_data = pickle.dumps({ + "client_id": self.client_id, + "component_class": self.remote_class, + "callback_url": callback_url, + "init_kwargs": self.component_kwargs + }, protocol=4) + + # Send start request + timeout = aiohttp.ClientTimeout(total=30) + async with self.session.post( + f"{self.server_url}/component/start", + data=request_data, + timeout=timeout + ) as resp: + response_data = await resp.read() + result = pickle.loads(response_data) + + # Check for error + if isinstance(result, dict) and "error" in result: + error_msg = f"Failed to start remote component: {result['error']}" + self.log(f"Error: {error_msg}") + raise RuntimeError(error_msg) + + self.component_started = True + self.log(f"ComponentClient: Remote component {self.remote_class} started successfully") + + except Exception as e: + self.log(f"Error: Failed to start remote component: {e}") + self.log("Error: " + traceback.format_exc()) + raise + + async def _call_remote_with_retry(self, method, *args, **kwargs): + """ + Call a remote method with retry logic. + + Args: + method: Method name to call + *args: Positional arguments + **kwargs: Keyword arguments + + Returns: + Result from remote call, or False on failure + """ + start_time = time.time() + backoff = 1 + timeout_config = self.base.get_arg("component_server_timeout", 1800) + + while True: + try: + # Prepare request + request_data = pickle.dumps({ + "client_id": self.client_id, + "component_class": self.remote_class, + "method": method, + "args": args, + "kwargs": kwargs + }, protocol=4) + + # Send request + timeout = aiohttp.ClientTimeout(total=30) + async with self.session.post( + f"{self.server_url}/component/call", + data=request_data, + timeout=timeout + ) as resp: + response_data = await resp.read() + result = pickle.loads(response_data) + + # Check for "Component not found" error - trigger restart + if isinstance(result, dict) and "error" in result: + if "Component not found" in result["error"]: + self.log(f"Warn: Remote component not found, restarting...") + + # Use lock to prevent duplicate restarts + async with self.restart_lock: + # Double-check component is still missing + if not self.component_started: + callback_url = self.base.get_arg("component_client_callback_url", "http://localhost:5054") + await self._start_remote_component(callback_url) + + # Retry the call + continue + else: + # Other error + self.log(f"Error: Remote call {method} failed: {result['error']}") + return False + + # Success + return result + + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + # Check if we've exceeded total timeout + elapsed = time.time() - start_time + if elapsed > timeout_config: + self.log(f"Error: Remote call {method} timed out after {elapsed}s") + return False + + # Log and retry with backoff + self.log(f"Warn: Remote call {method} failed ({e}), retrying in {backoff}s...") + await asyncio.sleep(backoff) + backoff = min(backoff * 2, 30) # Exponential backoff, max 30s + + except Exception as e: + self.log(f"Error: Remote call {method} failed: {e}") + self.log("Error: " + traceback.format_exc()) + return False + + async def run(self, seconds, first): + """ + Run method called by the main loop. + + Args: + seconds: Seconds since start + first: True if first run + + Returns: + True on success, False on failure + """ + # Send ping if needed + poll_interval = self.base.get_arg("component_server_poll_interval", 300) + if time.time() - self.last_ping_time > poll_interval: + return await self._send_ping() + return True + + async def _send_ping(self): + """Send health check ping to server.""" + try: + request_data = pickle.dumps({ + "client_id": self.client_id, + "component_class": self.remote_class + }, protocol=4) + + timeout = aiohttp.ClientTimeout(total=10) + async with self.session.post( + f"{self.server_url}/component/ping", + data=request_data, + timeout=timeout + ) as resp: + response_data = await resp.read() + result = pickle.loads(response_data) + + self.last_ping_time = time.time() + + # Log if component is not alive + if isinstance(result, dict) and not result.get("alive", True): + self.log(f"Warn: Remote component reports not alive") + return False + return True + + except Exception as e: + self.log(f"Warn: Failed to ping server: {e}") + return False + + async def final(self): + """ + Final cleanup before stopping. + """ + try: + # Send stop request to server + request_data = pickle.dumps({ + "client_id": self.client_id, + "component_class": self.remote_class + }, protocol=4) + + timeout = aiohttp.ClientTimeout(total=10) + async with self.session.post( + f"{self.server_url}/component/stop", + data=request_data, + timeout=timeout + ) as resp: + await resp.read() + + except Exception as e: + self.log(f"Warn: Failed to stop remote component: {e}") + + # Close session + if self.session: + await self.session.close() + + async def stop(self): + """ + Stop the component gracefully. + """ + self.api_stop = True + self.api_started = False + await asyncio.sleep(0.1) + + @property + def fatal_error(self): + """Check if a fatal error has occurred.""" + return self.base.fatal_error + + def is_alive(self): + """Check if component is alive.""" + return self.api_started + + def last_updated_time(self): + """Get last updated time.""" + return self.last_success_timestamp + + def update_success_timestamp(self): + """Update last success timestamp.""" + self.last_success_timestamp = datetime.now(timezone.utc) + + # Event handlers - forward to remote component + + async def select_event(self, entity_id, value): + """Handle select entity event.""" + await self._call_remote_with_retry("select_event", entity_id, value) + + async def number_event(self, entity_id, value): + """Handle number entity event.""" + await self._call_remote_with_retry("number_event", entity_id, value) + + async def switch_event(self, entity_id, service): + """Handle switch entity event.""" + await self._call_remote_with_retry("switch_event", entity_id, service) diff --git a/apps/predbat/component_server.py b/apps/predbat/component_server.py new file mode 100644 index 000000000..eb9bab67f --- /dev/null +++ b/apps/predbat/component_server.py @@ -0,0 +1,656 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2025 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +# fmt: off +# pylint: disable=consider-using-f-string +# pylint: disable=line-too-long +# pylint: disable=attribute-defined-outside-init + +""" +Component Server for Remote Component Execution + +WARNING: This server uses pickle for serialization. Only use in trusted networks +(e.g. Kubernetes cluster) as pickle can execute arbitrary code. Do not expose +this server to untrusted networks or the internet. +""" + +import asyncio +import pickle +import logging +import importlib +import traceback +from datetime import datetime, timezone +import aiohttp +import aiohttp.web +import os +class BaseMock: + """ + Mock base object that forwards all calls back to the client's Predbat instance. + + This class mimics the PredBat base object interface but implements it by making + HTTP callbacks to the client for all operations. + """ + + def __init__(self, callback_url, component_name, client_id): + """ + Initialize BaseMock with callback URL. + + Args: + callback_url: URL of the client's callback server + """ + self.callback_url = callback_url + self.session = aiohttp.ClientSession() + self.fatal_error = False + self.component_name = component_name + self.client_id = client_id + + async def initialize(self): + # Cache immutable attributes on initialization + self.local_tz = await self._remote_call("get_local_attr", "local_tz") + self.prefix = await self._remote_call("get_local_attr", "prefix") + self.args = await self._remote_call("get_local_attr", "args") + self.currency_symbols = await self._remote_call("get_local_attr", "currency_symbols") + self.num_cars = await self._remote_call("get_local_attr", "num_cars") + self.plan_interval_minutes = await self._remote_call("get_local_attr", "plan_interval_minutes") + + # Config root + self.config_root = self.component_name + "_" + self.client_id + os.makedirs(self.config_root, exist_ok=True) + + async def _remote_call(self, method, *args, **kwargs): + """ + Make a remote call to the client's base object. + + Args: + method: Method name to call + *args: Positional arguments + **kwargs: Keyword arguments + + Returns: + Result from the remote call + """ + try: + request_data = pickle.dumps({ + "method": method, + "args": args, + "kwargs": kwargs + }, protocol=4) + + # Get or create session for current event loop + try: + loop = asyncio.get_running_loop() + if not hasattr(self, '_loop_sessions'): + self._loop_sessions = {} + if loop not in self._loop_sessions: + self._loop_sessions[loop] = aiohttp.ClientSession() + session = self._loop_sessions[loop] + except RuntimeError: + # No running loop, use default session + session = self.session + + async with session.post( + f"{self.callback_url}/base/call", + data=request_data, + timeout=aiohttp.ClientTimeout(total=30) + ) as resp: + response_data = await resp.read() + result = pickle.loads(response_data) + + # Check if error was returned + if isinstance(result, dict) and "error" in result: + raise RuntimeError(f"Remote call failed: {result['error']}") + + return result + except Exception as e: + logging.error(f"BaseMock._remote_call({method}) failed: {e}") + raise + + # Base API methods - all forward to client + def log(self, msg): + logging.info(f"{self.component_name} ({self.client_id}): {msg}") + + async def async_get_arg(self, arg, default=None, indirect=True, combine=False, attribute=None, index=None, domain=None, can_override=True, required_unit=None): + """Get configuration argument from client (async).""" + return await self._remote_call( + "get_arg", arg, default=default, indirect=indirect, combine=combine, + attribute=attribute, index=index, domain=domain, can_override=can_override, required_unit=required_unit + ) + + + # Async versions + async def async_get_arg(self, arg, default=None, indirect=True, combine=False, attribute=None, index=None, domain=None, can_override=True, required_unit=None): + return await self._remote_call( + "get_arg", arg, default=default, indirect=indirect, combine=combine, + attribute=attribute, index=index, domain=domain, can_override=can_override, required_unit=required_unit + ) + async def async_set_arg(self, arg, value): + return await self._remote_call("set_arg", arg, value) + + async def async_get_state_wrapper(self, entity_id=None, default=None, attribute=None, refresh=False, required_unit=None): + return await self._remote_call("get_state_wrapper", entity_id, default=default, attribute=attribute, refresh=refresh, required_unit=required_unit) + + async def async_set_state_wrapper(self, entity_id, state, attributes={}, required_unit=None): + return await self._remote_call("set_state_wrapper", entity_id, state, attributes=attributes, required_unit=required_unit) + + async def async_get_history_wrapper(self, entity_id, days=30, required=True, tracked=True): + return await self._remote_call("get_history_wrapper", entity_id, days=days, required=required, tracked=tracked) + + async def async_get_ha_config(self, name, default): + return await self._remote_call("get_ha_config", name, default) + + async def async_dashboard_item(self, entity, state, attributes, app=None): + return await self._remote_call("dashboard_item", entity, state, attributes, app=app) + + @property + async def async_now_utc(self): + return await self._remote_call("get_local_attr", "now_utc") + + @property + async def async_midnight_utc(self): + return await self._remote_call("get_local_attr", "midnight_utc") + + @property + async def async_minutes_now(self): + return await self._remote_call("get_local_attr", "minutes_now") + + @property + async def async_arg_errors(self): + return await self._remote_call("get_local_attr", "arg_errors") + + # Sync wrappers (for non-async contexts only) + def _run_async(self, coro): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop, safe to use asyncio.run + return asyncio.run(coro) + else: + # Already in an event loop - must schedule and wait + # Use a new thread to run asyncio.run to avoid blocking the event loop + import threading + import queue + + result_queue = queue.Queue() + exception_queue = queue.Queue() + + def run_in_thread(): + try: + # Create a new event loop in this thread + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + result = new_loop.run_until_complete(coro) + result_queue.put(result) + finally: + new_loop.close() + except Exception as e: + exception_queue.put(e) + + thread = threading.Thread(target=run_in_thread, daemon=True) + thread.start() + thread.join(timeout=2*60) # 2 minute timeout + + if not exception_queue.empty(): + raise exception_queue.get() + + if not result_queue.empty(): + return result_queue.get() + + raise TimeoutError("Async operation timed out after 30 seconds") + + def get_arg(self, *args, **kwargs): + return self._run_async(self.async_get_arg(*args, **kwargs)) + + def set_arg(self, *args, **kwargs): + return self._run_async(self.async_set_arg(*args, **kwargs)) + + def get_state_wrapper(self, *args, **kwargs): + return self._run_async(self.async_get_state_wrapper(*args, **kwargs)) + + def set_state_wrapper(self, *args, **kwargs): + return self._run_async(self.async_set_state_wrapper(*args, **kwargs)) + + def get_history_wrapper(self, *args, **kwargs): + return self._run_async(self.async_get_history_wrapper(*args, **kwargs)) + + def get_ha_config(self, *args, **kwargs): + return self._run_async(self.async_get_ha_config(*args, **kwargs)) + + def dashboard_item(self, *args, **kwargs): + return self._run_async(self.async_dashboard_item(*args, **kwargs)) + + @property + def now_utc(self): + return self._run_async(self.async_now_utc) + + @property + def midnight_utc(self): + return self._run_async(self.async_midnight_utc) + + @property + def minutes_now(self): + return self._run_async(self.async_minutes_now) + + @property + def arg_errors(self): + return self._run_async(self.async_arg_errors) + + async def cleanup(self): + """Cleanup session""" + await self.session.close() + # Clean up per-loop sessions + if hasattr(self, '_loop_sessions'): + for session in self._loop_sessions.values(): + await session.close() + self._loop_sessions.clear() + + +class ComponentServer: + """ + HTTP server that hosts remote components and forwards their base API calls + back to the client Predbat instance. + """ + + def __init__(self, timeout, component_loader): + """ + Initialize the component server. + + Args: + timeout: Timeout in seconds for component inactivity + component_classes: Dict mapping class names to class objects + component_loader: Optional function(class_name) -> class for lazy loading + """ + self.timeout = timeout + self.component_loader = component_loader + self.components = {} # Key: client_id_component_class, Value: metadata dict + self.shutdown_flag = False + self.active_calls = 0 + self.active_calls_lock = asyncio.Lock() + self.logger = logging.getLogger("ComponentServer") + self.app = None + self.timeout_task = None + + async def handle_component_start(self, request): + """ + Handle component start request. + + Expected pickled request: { + "client_id": str, + "component_class": str, + "callback_url": str, + "init_kwargs": dict + } + """ + if self.shutdown_flag: + return aiohttp.web.Response( + body=pickle.dumps({"error": "Server is shutting down"}, protocol=4) + ) + + try: + # Unpickle request + data = await request.read() + req = pickle.loads(data) + + client_id = req["client_id"] + component_class = req["component_class"] + callback_url = req["callback_url"] + init_kwargs = req["init_kwargs"] + component_args = init_kwargs.get("component_args", {}) + instance_key = f"{client_id}_{component_class}" + + if instance_key in self.components: + self.logger.info(f"Component {instance_key} already started") + return aiohttp.web.Response( + body=pickle.dumps({"success": True}, protocol=4) + ) + + self.logger.info(f"Starting component {component_class} for client {client_id} class {component_class} with callback {callback_url} args {init_kwargs}") + + # Validate callback URL is reachable + try: + async with aiohttp.ClientSession() as session: + async with session.get( + f"{callback_url}/health", + timeout=aiohttp.ClientTimeout(total=5) + ) as resp: + if resp.status != 200: + raise Exception(f"Health check returned {resp.status}") + except Exception as e: + error_msg = f"Callback URL {callback_url} not reachable: {e}" + error_msg += traceback.format_exc() + self.logger.error(error_msg) + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + # Import and instantiate component + try: + # Get component class from registry or lazy loader + cls = self.component_loader(component_class) + if not cls: + raise ValueError(f"Component class {component_class} not registered") + self.logger.info(f"Loaded component class {component_class}") + + # Create BaseMock + base_mock = BaseMock(callback_url, component_class, client_id) + self.logger.info(f"Initializing BaseMock for component {component_class}") + await base_mock.initialize() + self.logger.info(f"BaseMock initialized for component {component_class}") + + # Instantiate component + component = cls(base_mock, **component_args) + self.logger.info(f"Instantiated component {component_class}") + + # Store component metadata + self.components[instance_key] = { + "component": component, + "base_mock": base_mock, + "callback_url": callback_url, + "last_ping": datetime.now(timezone.utc), + "task": asyncio.create_task(component.start()) + } + + self.logger.info(f"Component {instance_key} started successfully") + component.api_started = True + + return aiohttp.web.Response( + body=pickle.dumps({"success": True}, protocol=4) + ) + + except Exception as e: + error_msg = f"Failed to start component: {e}" + self.logger.error(error_msg) + self.logger.error(traceback.format_exc()) + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + except Exception as e: + error_msg = f"Failed to process start request: {e}" + self.logger.error(error_msg) + self.logger.error(traceback.format_exc()) + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + async def handle_component_call(self, request): + """ + Handle component method call. + + Expected pickled request: { + "client_id": str, + "component_class": str, + "method": str, + "args": tuple, + "kwargs": dict + } + """ + if self.shutdown_flag: + return aiohttp.web.Response( + body=pickle.dumps({"error": "Server is shutting down"}, protocol=4) + ) + + # Increment active calls counter + async with self.active_calls_lock: + self.active_calls += 1 + + try: + # Unpickle request + data = await request.read() + req = pickle.loads(data) + + client_id = req["client_id"] + component_class = req["component_class"] + method = req["method"] + args = req.get("args", ()) + kwargs = req.get("kwargs", {}) + + instance_key = f"{client_id}_{component_class}" + + self.logger.info(f"ComponentServer: Handling call to {method} for component {instance_key} class {component_class} args {args} kwargs {kwargs}") + + # Check if component exists + if instance_key not in self.components: + return aiohttp.web.Response( + body=pickle.dumps({"error": "Component not found"}, protocol=4) + ) + + component_meta = self.components[instance_key] + component = component_meta["component"] + + # Invoke method + try: + method_fn = getattr(component, method) + + # Check if method is a coroutine function + if asyncio.iscoroutinefunction(method_fn): + result = await method_fn(*args, **kwargs) + else: + result = method_fn(*args, **kwargs) + + return aiohttp.web.Response( + body=pickle.dumps(result, protocol=4) + ) + + except Exception as e: + error_msg = str(e) + self.logger.error(f"Component method {method} failed: {error_msg}") + self.logger.error(traceback.format_exc()) + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + except Exception as e: + error_msg = f"Failed to process call request: {e}" + self.logger.error(error_msg) + self.logger.error(traceback.format_exc()) + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + finally: + # Decrement active calls counter + async with self.active_calls_lock: + self.active_calls -= 1 + + async def handle_component_ping(self, request): + """ + Handle component ping (health check). + + Expected pickled request: { + "client_id": str, + "component_class": str + } + """ + try: + data = await request.read() + req = pickle.loads(data) + + client_id = req["client_id"] + component_class = req["component_class"] + instance_key = f"{client_id}_{component_class}" + + if instance_key in self.components: + # Update last ping time + self.components[instance_key]["last_ping"] = datetime.now(timezone.utc) + + # Check if component is alive + component = self.components[instance_key]["component"] + alive = component.is_alive() + + return aiohttp.web.Response( + body=pickle.dumps({"alive": alive}, protocol=4) + ) + else: + return aiohttp.web.Response( + body=pickle.dumps({"error": "Component not found"}, protocol=4) + ) + + except Exception as e: + error_msg = f"Failed to process ping request: {e}" + self.logger.error(error_msg) + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + async def handle_component_stop(self, request): + """ + Handle component stop request. + + Expected pickled request: { + "client_id": str, + "component_class": str + } + """ + try: + data = await request.read() + req = pickle.loads(data) + + client_id = req["client_id"] + component_class = req["component_class"] + instance_key = f"{client_id}_{component_class}" + + if instance_key in self.components: + await self._stop_component(instance_key) + + return aiohttp.web.Response( + body=pickle.dumps({"success": True}, protocol=4) + ) + else: + return aiohttp.web.Response( + body=pickle.dumps({"error": "Component not found"}, protocol=4) + ) + + except Exception as e: + error_msg = f"Failed to process stop request: {e}" + self.logger.error(error_msg) + return aiohttp.web.Response( + body=pickle.dumps({"error": error_msg}, protocol=4) + ) + + async def _stop_component(self, instance_key): + """Stop a component and clean up.""" + if instance_key not in self.components: + return + + self.logger.info(f"Stopping component {instance_key}") + + component_meta = self.components[instance_key] + component = component_meta["component"] + task = component_meta["task"] + base_mock = component_meta["base_mock"] + + # Stop component + try: + await component.stop() + except Exception as e: + self.logger.error(f"Error stopping component: {e}") + + # Cancel task + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Cleanup base mock + try: + await base_mock.cleanup() + except Exception as e: + self.logger.error(f"Error cleaning up base mock: {e}") + + # Remove from dict + del self.components[instance_key] + + self.logger.info(f"Component {instance_key} stopped") + + async def _timeout_checker(self): + """Background task to check for timed out components.""" + self.logger.info("Timeout checker started") + + while not self.shutdown_flag: + try: + await asyncio.sleep(60) # Check every 60 seconds + + now = datetime.now(timezone.utc) + timed_out = [] + + for instance_key, meta in self.components.items(): + last_ping = meta["last_ping"] + elapsed = (now - last_ping).total_seconds() + + if elapsed > self.timeout: + timed_out.append(instance_key) + self.logger.warning(f"Component {instance_key} timed out (no ping for {elapsed}s)") + + # Stop timed out components + for instance_key in timed_out: + await self._stop_component(instance_key) + + except Exception as e: + self.logger.error(f"Error in timeout checker: {e}") + self.logger.error(traceback.format_exc()) + + self.logger.info("Timeout checker stopped") + + async def shutdown(self): + """Graceful shutdown of the server.""" + self.logger.info("Shutdown initiated") + + # Set shutdown flag to reject new calls + self.shutdown_flag = True + + # Wait for active calls to complete (max 30 seconds) + wait_time = 0 + while wait_time < 30: + async with self.active_calls_lock: + if self.active_calls == 0: + break + + await asyncio.sleep(1) + wait_time += 1 + + if self.active_calls > 0: + self.logger.warning(f"Forcing shutdown with {self.active_calls} active calls") + + # Stop all components + instance_keys = list(self.components.keys()) + for instance_key in instance_keys: + await self._stop_component(instance_key) + + # Cancel timeout checker + if self.timeout_task and not self.timeout_task.done(): + self.timeout_task.cancel() + try: + await self.timeout_task + except asyncio.CancelledError: + pass + + self.logger.info("Shutdown complete") + + async def run(self, host, port): + """ + Run the component server. + + Args: + host: Host to bind to + port: Port to bind to + """ + # Create web application + self.app = aiohttp.web.Application() + + # Add routes + self.app.router.add_post('/component/start', self.handle_component_start) + self.app.router.add_post('/component/call', self.handle_component_call) + self.app.router.add_post('/component/ping', self.handle_component_ping) + self.app.router.add_post('/component/stop', self.handle_component_stop) + + # Start timeout checker + self.timeout_task = asyncio.create_task(self._timeout_checker()) + + self.logger.info(f"Component server starting on {host}:{port}") + + # Run web server + await aiohttp.web._run_app(self.app, host=host, port=port) diff --git a/apps/predbat/components.py b/apps/predbat/components.py index eccbf0f32..fdf7544a1 100644 --- a/apps/predbat/components.py +++ b/apps/predbat/components.py @@ -20,6 +20,8 @@ from db_manager import DatabaseManager from fox import FoxAPI from web_mcp import PredbatMCPServer +from component_client import ComponentClient +from component_callback_server import ComponentCallbackServer from datetime import datetime, timezone, timedelta import asyncio import os @@ -88,6 +90,27 @@ "required_or": ["solcast_host", "forecast_solar", "pv_forecast_today"], "phase": 1, }, + "octopus": { + "class": OctopusAPI, + "name": "Octopus Energy Direct", + "event_filter": "predbat_octopus_", + "args": { + "key": { + "required": True, + "config": "octopus_api_key", + }, + "account_id": { + "required": True, + "config": "octopus_api_account", + }, + "automatic": { + "required": False, + "default": True, + "config": "octopus_automatic", + }, + }, + "phase": 1, + }, "gecloud": { "class": GECloudDirect, "name": "GivEnergy Cloud Direct", @@ -133,27 +156,6 @@ }, "phase": 1, }, - "octopus": { - "class": OctopusAPI, - "name": "Octopus Energy Direct", - "event_filter": "predbat_octopus_", - "args": { - "key": { - "required": True, - "config": "octopus_api_key", - }, - "account_id": { - "required": True, - "config": "octopus_api_account", - }, - "automatic": { - "required": False, - "default": True, - "config": "octopus_automatic", - }, - }, - "phase": 1, - }, "ohme": { "class": OhmeAPI, "name": "Ohme Charger", @@ -222,9 +224,38 @@ def __init__(self, base): self.component_tasks = {} self.base = base self.log = base.log + self.callback_server = None def initialize(self, only=None, phase=0): """Initialize components without starting them""" + + # Initialize callback server for remote components (phase 0 only) + if phase == 0 and only is None: + # Get component_server config from args (apps.yaml) + component_server_config = self.base.args.get("component_server", {}) + component_server_components = component_server_config.get("components", []) + + if component_server_components: + server_url = component_server_config.get("url", "") + if not server_url: + self.log("Warn: component_server.components configured but component_server.url is empty, disabling remote components") + component_server_components = [] + else: + # Start callback server + try: + self.log("Components: Starting component callback server for remote components") + port = component_server_config.get("callback_port", 5054) + self.callback_server = ComponentCallbackServer(self.base, port) + self.log("Components: Starting component callback server for remote components") + self.base.create_task(self.callback_server.start()) + self.log("Components: Waiting for component callback server to start...") + self.callback_server.wait_started(timeout=30) + self.log(f"Components: Callback server started on port {port}") + except Exception as e: + self.log(f"Components: Error: Failed to start callback server: {e}") + self.callback_server = None + component_server_components = [] + for component_name, component_info in COMPONENT_LIST.items(): if only and component_name != only: continue @@ -260,7 +291,31 @@ def initialize(self, only=None, phase=0): have_all_args = False if have_all_args: self.log(f"Initializing {component_info['name']} interface") - self.components[component_name] = component_info["class"](self.base, **arg_dict) + + # Check if this component should run remotely + component_server_config = self.base.args.get("component_server", {}) + component_server_components = component_server_config.get("components", []) + + if component_name in component_server_components and self.callback_server: + # Create remote component client + server_url = component_server_config.get("url", "") + callback_url = component_server_config.get("callback_url", "") + callback_port = component_server_config.get("callback_port", 5054) + poll_interval = component_server_config.get("poll_interval", 300) + timeout = component_server_config.get("timeout", 1800) + + args_data = {"callback_url": callback_url, "callback_port": callback_port, "poll_interval": poll_interval, "timeout": timeout, "component_args": arg_dict} + + self.log(f"Creating remote component client for {component_name} at {server_url}") + self.components[component_name] = ComponentClient( + self.base, + server_url, + component_name, + **args_data, + ) + else: + # Create local component + self.components[component_name] = component_info["class"](self.base, **arg_dict) def start(self, only=None, phase=0): """Start all initialized components""" @@ -288,6 +343,12 @@ def start(self, only=None, phase=0): return not failed async def stop(self, only=None): + # Stop callback server first if stopping all components + if not only and self.callback_server: + self.log("Stopping component callback server") + await self.callback_server.stop() + self.callback_server = None + for component_name, component_info in reversed(list(self.components.items())): if only and component_name != only: continue diff --git a/apps/predbat/fox.py b/apps/predbat/fox.py index 76c9fc3de..5253c5df9 100644 --- a/apps/predbat/fox.py +++ b/apps/predbat/fox.py @@ -341,6 +341,7 @@ async def get_device_detail(self, deviceSN): result = await self.request_get(GET_DEVICE_INFO, post=False, datain=query) if result is not None: self.device_detail[deviceSN] = result + return result async def get_device_settings(self, deviceSN): """ @@ -1260,6 +1261,8 @@ class MockBase: def __init__(self): self.local_tz = datetime.now().astimezone().tzinfo + self.prefix = "predbat" + self.args = {} def log(self, message): print(f"LOG: {message}") @@ -1277,29 +1280,36 @@ async def test_fox_api(api_key): # Create a mock base object mock_base = MockBase() - sn = "609H50204BPM048" + sn = "603J303046YP036" # Create FoxAPI instance with a lambda that returns the API key - fox_api = FoxAPI(api_key, False, mock_base) - # device_List = await fox_api.get_device_list() - # print(f"Device List: {device_List}") + args = { + "key": api_key, + "automatic": False, + } + fox_api = FoxAPI(mock_base, **args) + device_List = await fox_api.get_device_list() + print(f"Device List: {device_List}") + # return(1) # await fox_api.start() # res = await fox_api.get_device_settings(sn) # print(res) # res = await fox_api.get_battery_charging_time(sn) + # print("Battery Charging Time:") + # print(res) + # return 1 + # res = await fox_api.get_device_detail(sn) + # print(res) + # res = await fox_api.get_scheduler(sn, checkBattery=False) # print(res) - res = await fox_api.get_device_detail(sn) - print(res) - res = await fox_api.get_scheduler(sn, checkBattery=False) - print(res) - return 1 # res = await fox_api.compute_schedule(sn) # res = await fox_api.publish_data() # res = await fox_api.set_device_setting(sn, "dummy", 42) # print(res) - # res = await fox_api.get_scheduler(sn) - # groups = res.get('groups', []) + res = await fox_api.get_scheduler(sn, checkBattery=False) + print(res) + groups = res.get("groups", []) # {'endHour': 0, 'fdPwr': 0, 'minSocOnGrid': 10, 'workMode': 'Invalid', 'fdSoc': 10, 'enable': 0, 'startHour': 0, 'maxSoc': 100, 'startMinute': 0, 'endMinute': 0}, # new_slot = groups[0].copy() new_slot = {} @@ -1310,7 +1320,7 @@ async def test_fox_api(api_key): new_slot["endHour"] = 12 new_slot["endMinute"] = 00 new_slot["fdSoc"] = 10 - new_slot["fdPwr"] = 5000 + new_slot["fdPwr"] = 8000 new_slot["minSocOnGrid"] = 10 new_slot2 = {} new_slot2["enable"] = 1 diff --git a/apps/predbat/octopus.py b/apps/predbat/octopus.py index 2a05cb4fc..b02ba0361 100644 --- a/apps/predbat/octopus.py +++ b/apps/predbat/octopus.py @@ -17,6 +17,7 @@ from config import TIME_FORMAT import json import pytz +import traceback user_agent_value = "predbat-octopus-energy" integration_context_header = "Ha-Integration-Context" @@ -337,6 +338,7 @@ def initialize(self, key, account_id, automatic): if not os.path.exists(self.cache_path): os.makedirs(self.cache_path) self.cache_file = self.cache_path + "/octopus.yaml" + self.log("Octopus API: Initialized with account ID {} cache {}".format(self.account_id, self.cache_file)) async def select_event(self, entity_id, value): if entity_id == self.get_entity_name("select", "intelligent_target_time"): @@ -364,6 +366,7 @@ async def run(self, seconds, first): """ Main run loop """ + self.log("Octopus API: enter seconds {} first {}".format(seconds, first)) if first: # Load cached data await self.load_octopus_cache() diff --git a/apps/predbat/predbat.py b/apps/predbat/predbat.py index 92454d767..1ea4df69d 100644 --- a/apps/predbat/predbat.py +++ b/apps/predbat/predbat.py @@ -30,7 +30,7 @@ THIS_VERSION = "v8.29.1" # fmt: off -PREDBAT_FILES = ["predbat.py", "hass.py", "config.py", "prediction.py", "gecloud.py","utils.py", "inverter.py", "ha.py", "download.py", "web.py", "web_helper.py", "predheat.py", "futurerate.py", "octopus.py", "solcast.py","execute.py", "plan.py", "fetch.py", "output.py", "userinterface.py", "energydataservice.py", "alertfeed.py", "compare.py", "db_manager.py", "db_engine.py", "plugin_system.py", "ohme.py", "components.py", "fox.py", "carbon.py", "web_mcp.py", "component_base.py"] +PREDBAT_FILES = ["predbat.py", "hass.py", "config.py", "prediction.py", "gecloud.py","utils.py", "inverter.py", "ha.py", "download.py", "web.py", "web_helper.py", "predheat.py", "futurerate.py", "octopus.py", "solcast.py","execute.py", "plan.py", "fetch.py", "output.py", "userinterface.py", "energydataservice.py", "alertfeed.py", "compare.py", "db_manager.py", "db_engine.py", "plugin_system.py", "ohme.py", "components.py", "fox.py", "carbon.py", "web_mcp.py", "component_base.py", "component_server.py", "component_client.py", "component_callback_server.py"] # fmt: on from download import predbat_update_move, predbat_update_download, check_install @@ -79,6 +79,12 @@ class PredBat(hass.Hass, Octopus, Energidataservice, Fetch, Plan, Execute, Outpu The battery prediction class itself """ + def get_local_attr(self, attr_name): + """ + Get a local attribute by name + """ + return getattr(self, attr_name, None) + def download_predbat_releases_url(self, url): """ Download release data from github, but use the cache for 2 hours diff --git a/apps/predbat/run_component_server.py b/apps/predbat/run_component_server.py new file mode 100755 index 000000000..88fa44e3d --- /dev/null +++ b/apps/predbat/run_component_server.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2025 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +""" +Component Server Launcher + +WARNING: This server uses pickle for serialization. Only use in trusted networks +(e.g. Kubernetes cluster) as pickle can execute arbitrary code. Do not expose +this server to untrusted networks or the internet. + +This script launches the ComponentServer to host remote Predbat components. +""" + +import sys +import os +import argparse +import asyncio +import logging +import signal +import importlib + +# Add apps/predbat to path so we can import modules +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "apps", "predbat")) + +from predbat import THIS_VERSION +from component_server import ComponentServer +from components import COMPONENT_LIST + + +def get_component_class(component_name): + """ + Lazy-load a component class by name using importlib to avoid circular import issues. + + Args: + class_name: Name of the component class (e.g., "OctopusAPI") + + Returns: + Component class or None if not found + """ + + component_def = COMPONENT_LIST.get(component_name) + if not component_def: + return None + + try: + return component_def["class"] + except Exception as e: + logging.error(f"Failed to load component class {component_def['class']}: {e}") + import traceback + + traceback.print_exc() + return None + + +def setup_logging(log_level, log_file=None): + """ + Setup logging configuration. + + Args: + log_level: Logging level (DEBUG, INFO, WARNING, ERROR) + log_file: Optional log file path + """ + handlers = [logging.StreamHandler(sys.stdout)] + + if log_file: + handlers.append(logging.FileHandler(log_file)) + + logging.basicConfig(level=getattr(logging, log_level), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=handlers) + + +def main(): + """Main entry point for the component server.""" + parser = argparse.ArgumentParser(description="Predbat Component Server - Remote component execution host") + parser.add_argument("--host", default="0.0.0.0", help="Server bind address (default: 0.0.0.0)") + parser.add_argument("--port", type=int, default=5053, help="Server port (default: 5053)") + parser.add_argument("--timeout", type=int, default=1800, help="Component inactivity timeout in seconds (default: 1800)") + parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Logging level (default: INFO)") + parser.add_argument("--log-file", help="Optional log file path") + + args = parser.parse_args() + + # Setup logging + setup_logging(args.log_level, args.log_file) + logger = logging.getLogger(__name__) + + logger.info("=" * 80) + logger.info(f"Predbat Component Server (Predbat VERSION {THIS_VERSION})") + logger.info("=" * 80) + logger.info(f"Host: {args.host}") + logger.info(f"Port: {args.port}") + logger.info(f"Timeout: {args.timeout} seconds") + logger.info(f"Log Level: {args.log_level}") + if args.log_file: + logger.info(f"Log File: {args.log_file}") + logger.info(f"Registered Components: OctopusAPI") + logger.info("=" * 80) + logger.warning("WARNING: Using pickle serialization - only use in trusted networks!") + logger.info("=" * 80) + + # Create server instance with lazy component loader + server = ComponentServer(timeout=args.timeout, component_loader=get_component_class) # Lazy loader function + + # Setup signal handlers for graceful shutdown + shutdown_event = asyncio.Event() + + def signal_handler(signum, frame): + """Handle shutdown signals.""" + logger.info(f"Received signal {signum}, initiating shutdown...") + shutdown_event.set() + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + # Run server + async def run_with_shutdown(): + """Run server with shutdown handling.""" + # Start server in background + server_task = asyncio.create_task(server.run(host=args.host, port=args.port)) + + # Wait for shutdown signal + await shutdown_event.wait() + + # Trigger graceful shutdown + await server.shutdown() + + # Cancel server task if still running + if not server_task.done(): + server_task.cancel() + try: + await server_task + except asyncio.CancelledError: + pass + + try: + asyncio.run(run_with_shutdown()) + except KeyboardInterrupt: + logger.info("Keyboard interrupt received") + except Exception as e: + logger.error(f"Server error: {e}") + import traceback + + logger.error(traceback.format_exc()) + sys.exit(1) + + logger.info("Component server stopped") + + +if __name__ == "__main__": + main() diff --git a/docs/COMPONENT_SERVER.md b/docs/COMPONENT_SERVER.md new file mode 100644 index 000000000..ed03b917d --- /dev/null +++ b/docs/COMPONENT_SERVER.md @@ -0,0 +1,194 @@ +# Distributed Component Server/Client System + +This feature allows Predbat components to run on a separate server, useful for Kubernetes deployments or load distribution. + +## Architecture + +- **Component Server**: Standalone server that hosts remote component instances (e.g., OctopusAPI) +- **Component Client**: Proxy in main Predbat that forwards operations to the server via REST +- **Callback Server**: HTTP server in Predbat that handles base API callbacks from remote components + +## Security Warning + +⚠️ **WARNING**: This system uses pickle for serialization. Only use in trusted networks (e.g., Kubernetes cluster) as pickle can execute arbitrary code. Do not expose to untrusted networks or the internet. + +## Configuration + +Add this section to your `apps.yaml` under the Predbat app configuration: + +```yaml +predbat: + module: predbat + class: PredBat + # ... other configuration ... + + # Component server configuration (infrastructure settings) + component_server: + # URL of the component server + url: "http://componentserver:5053" + + # List of components to run remotely (component names from components.py) + components: + - octopus + + # Callback URL where this Predbat instance can be reached by the server + # If not specified, server will use X-Forwarded-For or remote IP + callback_url: "http://predbat:5054" + + # Port for callback server (default: 5054) + callback_port: 5054 + + # How often client pings server (seconds, default: 300) + poll_interval: 300 + + # Component timeout - server stops component after this many seconds of no pings (default: 1800) + timeout: 1800 +``` + +### Configuration Keys + +| Key | Type | Required | Default | Description | +|-----|------|----------|---------|-------------| +| `url` | string | Yes | - | HTTP URL of component server | +| `components` | list | Yes | - | Component names to run remotely | +| `callback_url` | string | No | Auto-detect | URL where Predbat callback server is accessible | +| `callback_port` | int | No | 5054 | Port for Predbat callback server | +| `poll_interval` | int | No | 300 | Ping interval in seconds | +| `timeout` | int | No | 1800 | Component inactivity timeout in seconds | + +## Running the Component Server + +```bash +cd apps/predbat +./run_component_server.py --host 0.0.0.0 --port 5053 --timeout 1800 --log-level INFO +``` + +### Command Line Options + +- `--host`: Server bind address (default: 0.0.0.0) +- `--port`: Server port (default: 5053) +- `--timeout`: Component inactivity timeout in seconds (default: 1800) +- `--log-level`: Logging level - DEBUG, INFO, WARNING, ERROR (default: INFO) +- `--log-file`: Optional log file path + +## How It Works + +1. **Startup**: + - Predbat starts callback server on configured port + - For components in `component_server_components` list, creates `ComponentClient` instead of actual component + - Client connects to component server and requests component start + +2. **Operation**: + - Client forwards all method calls (run, events, etc.) to server via HTTP POST + - Server invokes methods on actual component instance + - When component needs base API (get_arg, get_state_wrapper, etc.), server calls back to Predbat + - Results are pickled and returned to client + +3. **Health Monitoring**: + - Client sends ping every `component_server_poll_interval` seconds + - Server auto-stops components that haven't pinged in `timeout` seconds + - Client auto-restarts components if server reports "Component not found" + +4. **Shutdown**: + - Client sends stop request to server + - Server gracefully stops component and cleans up resources + +## Supported Components + +Currently registered for remote execution: + +- OctopusAPI + +To add more components, edit `COMPONENT_CLASSES` in `run_component_server.py`. + +## Kubernetes Deployment Example + +### Component Server Deployment + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: predbat-component-server +spec: + replicas: 1 + selector: + matchLabels: + app: predbat-component-server + template: + metadata: + labels: + app: predbat-component-server + spec: + containers: + - name: server + image: predbat:latest + command: ["python3", "/app/apps/predbat/run_component_server.py"] + args: ["--host", "0.0.0.0", "--port", "5053", "--timeout", "1800"] + ports: + - containerPort: 5053 +--- +apiVersion: v1 +kind: Service +metadata: + name: componentserver +spec: + selector: + app: predbat-component-server + ports: + - port: 5053 + targetPort: 5053 +``` + +### Predbat Deployment + +Ensure Predbat has a Service exposing port 5054 for callbacks: + +```yaml +apiVersion: v1 +kind: Service +metadata: + name: predbat +spec: + selector: + app: predbat + ports: + - name: web + port: 5052 + targetPort: 5052 + - name: callback + port: 5054 + targetPort: 5054 +``` + +## Troubleshooting + +### Component won't start + +- Check callback URL is reachable from component server +- Verify component server URL is correct +- Check logs for connection errors + +### Component keeps restarting + +- Check ping interval isn't too long relative to timeout +- Verify network connectivity between server and client +- Review server logs for timeout messages + +### Performance issues + +- Reduce number of remote components (network overhead per call) +- Increase timeout values if network is slow +- Check server resources (CPU, memory) + +## Testing + +The feature is optional - Predbat works normally when `component_server_components` is empty or `component_server_url` is not set. + +To test: + +1. Start component server manually +2. Configure Predbat with server URL and component list +3. Restart Predbat +4. Check logs for "Creating remote component client" messages +5. Verify component functionality (e.g., Octopus rates updating)