diff --git a/chimerapy/engine/chimerapyrc.yaml b/chimerapy/engine/chimerapyrc.yaml index 7f826f48..bcc2d9b9 100644 --- a/chimerapy/engine/chimerapyrc.yaml +++ b/chimerapy/engine/chimerapyrc.yaml @@ -39,3 +39,5 @@ config: deque-length: 10000 interval: 10 logging-enabled: false + sync: + attempts: 10 diff --git a/chimerapy/engine/clock.py b/chimerapy/engine/clock.py new file mode 100644 index 00000000..59fc1c24 --- /dev/null +++ b/chimerapy/engine/clock.py @@ -0,0 +1,27 @@ +from datetime import datetime, timedelta +import ntplib +import time + +from chimerapy.engine import config + +# Globals +time_delta = timedelta(0) + + +def sync(): + global time_delta + ntp_client = ntplib.NTPClient() + for i in range(config.get("sync.attempts")): + try: + response = ntp_client.request("pool.ntp.org") + time_delta = timedelta(seconds=response.offset) + except: + time.sleep(0.5) + + +def utcnow() -> datetime: + return datetime.utcnow() + time_delta + + +def now() -> datetime: + return utcnow() diff --git a/chimerapy/engine/data_protocols.py b/chimerapy/engine/data_protocols.py index 1d85553d..1467d0f3 100644 --- a/chimerapy/engine/data_protocols.py +++ b/chimerapy/engine/data_protocols.py @@ -1,9 +1,10 @@ -import datetime from typing import Dict from dataclasses import dataclass, field from dataclasses_json import DataClassJsonMixin +from chimerapy.engine import clock + @dataclass class NodePubEntry(DataClassJsonMixin): @@ -19,7 +20,7 @@ class NodePubTable(DataClassJsonMixin): @dataclass class NodeDiagnostics(DataClassJsonMixin): timestamp: str = field( - default_factory=lambda: str(datetime.datetime.now().isoformat()) + default_factory=lambda: str(clock.now().isoformat()) ) # ISO str latency: float = 0 # ms payload_size: float = 0 # KB diff --git a/chimerapy/engine/eventbus/eventbus.py b/chimerapy/engine/eventbus/eventbus.py index 08023c3c..c655fa9e 100644 --- a/chimerapy/engine/eventbus/eventbus.py +++ b/chimerapy/engine/eventbus/eventbus.py @@ -1,6 +1,5 @@ import uuid import asyncio -from datetime import datetime from collections import deque from concurrent.futures import Future from typing import Any, Generic, Type, Callable, Awaitable, Optional, Literal, TypeVar @@ -8,6 +7,7 @@ from aioreactive import AsyncObservable, AsyncObserver, AsyncSubject from dataclasses import dataclass, field +from chimerapy.engine import clock from .. import _logger from ..networking.async_loop_thread import AsyncLoopThread @@ -21,7 +21,7 @@ class Event: type: str data: Optional[Any] = None id: str = field(default_factory=lambda: str(uuid.uuid4())) - timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) + timestamp: str = field(default_factory=lambda: clock.now().isoformat()) class EventBus(AsyncObservable): diff --git a/chimerapy/engine/logger/common.py b/chimerapy/engine/logger/common.py index ea1ad302..45b0b4a9 100644 --- a/chimerapy/engine/logger/common.py +++ b/chimerapy/engine/logger/common.py @@ -1,10 +1,11 @@ import logging -from datetime import datetime from logging import Filter, Formatter, Handler, StreamHandler from logging.handlers import RotatingFileHandler from pathlib import Path from typing import Union, Dict +from chimerapy.engine import clock + MAX_BYTES_PER_FILE = 100 * 1024 * 1024 # 100MB @@ -117,7 +118,7 @@ def emit(self, record: logging.LogRecord): @staticmethod def timestamp() -> str: """Return the current timestamp in the format YYYY-MM-DD_HH-MM-SS.""" - return datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + return clock.now().strftime("%Y-%m-%d_%H-%M-%S") class MultiplexedRotatingFileHandler(MultiplexedEntityHandler): diff --git a/chimerapy/engine/manager/http_server_service.py b/chimerapy/engine/manager/http_server_service.py index c5ad83d3..ff29068a 100644 --- a/chimerapy/engine/manager/http_server_service.py +++ b/chimerapy/engine/manager/http_server_service.py @@ -1,4 +1,5 @@ import traceback +import datetime from concurrent.futures import Future from typing import List, Dict @@ -143,6 +144,7 @@ async def _register_worker_route(self, request: web.Request): "port": self.port, }, "config": config.config, + "manager_datetime_now": datetime.datetime.utcnow().isoformat(), } # Broadcast changes @@ -168,7 +170,6 @@ async def _update_nodes_status(self, request: web.Request): update_dataclass(self.state.workers[worker_state.id], worker_state) else: logger.error(f"{self}: non-registered Worker update: {worker_state.id}") - # logger.debug(f"{self}: Nodes status update to: {self.state.workers}") return web.HTTPOk() diff --git a/chimerapy/engine/manager/worker_handler_service.py b/chimerapy/engine/manager/worker_handler_service.py index 539b54b2..1e77cbca 100644 --- a/chimerapy/engine/manager/worker_handler_service.py +++ b/chimerapy/engine/manager/worker_handler_service.py @@ -105,13 +105,9 @@ def _get_worker_ip(self, worker_id: str) -> str: async def _register_worker(self, worker_state: WorkerState) -> bool: - logger.debug(f"{self}: worker_state: {worker_state} BEFORE") evented_worker_state = make_evented( worker_state, event_bus=self.eventbus, event_name="ManagerState.changed" ) - logger.debug( - f"{self}: worker_state: {worker_state}: {evented_worker_state} AFTER" - ) self.state.workers[worker_state.id] = evented_worker_state logger.debug( f"Manager registered bool: name="worker", host=self.state.ip, port=self.state.port, + manager_worker_timedelta=self.manager_worker_timedelta, worker_logdir=self.state.tempfolder, worker_config=config.config, node_config=node_config, diff --git a/pyproject.toml b/pyproject.toml index 0460a29a..df9194e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,8 @@ dependencies = [ 'aiofiles', 'zeroconf', 'aioreactive', - 'psutil' + 'psutil', + 'ntplib' ] [project.optional-dependencies] diff --git a/test/manager/test_manager.py b/test/manager/test_manager.py index f3136bf8..a88e2532 100644 --- a/test/manager/test_manager.py +++ b/test/manager/test_manager.py @@ -68,7 +68,8 @@ def test_sending_package(self, manager, _worker, config_graph): assert manager.workers[_worker.id].nodes[node_id].fsm != "NULL" # @pytest.mark.parametrize("context", ["multiprocessing", "threading"]) - @pytest.mark.parametrize("context", ["threading"]) + # @pytest.mark.parametrize("context", ["threading"]) + @pytest.mark.parametrize("context", ["multiprocessing"]) def test_manager_lifecycle(self, manager_with_worker, context): manager, worker = manager_with_worker @@ -84,16 +85,9 @@ def test_manager_lifecycle(self, manager_with_worker, context): manager.commit_graph(graph, mapping, context=context).result(timeout=30) assert manager.start().result() - - # time.sleep(3) - assert manager.record().result() - # for i in range(50): - # logger.debug(manager.state) - # time.sleep(1) - - time.sleep(20) + time.sleep(10) assert manager.stop().result() assert manager.collect().result()