Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions chimerapy/engine/chimerapyrc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ config:
deque-length: 10000
interval: 10
logging-enabled: false
sync:
attempts: 10
27 changes: 27 additions & 0 deletions chimerapy/engine/clock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from datetime import datetime, timedelta
import ntplib
import time

from chimerapy.engine import config

# Globals
time_delta = timedelta(0)


def sync():
global time_delta
ntp_client = ntplib.NTPClient()
for i in range(config.get("sync.attempts")):
try:
response = ntp_client.request("pool.ntp.org")
time_delta = timedelta(seconds=response.offset)
except:
time.sleep(0.5)


def utcnow() -> datetime:
return datetime.utcnow() + time_delta


def now() -> datetime:
return utcnow()
5 changes: 3 additions & 2 deletions chimerapy/engine/data_protocols.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
from typing import Dict
from dataclasses import dataclass, field

from dataclasses_json import DataClassJsonMixin

from chimerapy.engine import clock


@dataclass
class NodePubEntry(DataClassJsonMixin):
Expand All @@ -19,7 +20,7 @@ class NodePubTable(DataClassJsonMixin):
@dataclass
class NodeDiagnostics(DataClassJsonMixin):
timestamp: str = field(
default_factory=lambda: str(datetime.datetime.now().isoformat())
default_factory=lambda: str(clock.now().isoformat())
) # ISO str
latency: float = 0 # ms
payload_size: float = 0 # KB
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/eventbus/eventbus.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import uuid
import asyncio
from datetime import datetime
from collections import deque
from concurrent.futures import Future
from typing import Any, Generic, Type, Callable, Awaitable, Optional, Literal, TypeVar

from aioreactive import AsyncObservable, AsyncObserver, AsyncSubject
from dataclasses import dataclass, field

from chimerapy.engine import clock
from .. import _logger
from ..networking.async_loop_thread import AsyncLoopThread

Expand All @@ -21,7 +21,7 @@ class Event:
type: str
data: Optional[Any] = None
id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
timestamp: str = field(default_factory=lambda: clock.now().isoformat())


class EventBus(AsyncObservable):
Expand Down
5 changes: 3 additions & 2 deletions chimerapy/engine/logger/common.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
from datetime import datetime
from logging import Filter, Formatter, Handler, StreamHandler
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Union, Dict

from chimerapy.engine import clock

MAX_BYTES_PER_FILE = 100 * 1024 * 1024 # 100MB


Expand Down Expand Up @@ -117,7 +118,7 @@ def emit(self, record: logging.LogRecord):
@staticmethod
def timestamp() -> str:
"""Return the current timestamp in the format YYYY-MM-DD_HH-MM-SS."""
return datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
return clock.now().strftime("%Y-%m-%d_%H-%M-%S")


class MultiplexedRotatingFileHandler(MultiplexedEntityHandler):
Expand Down
3 changes: 2 additions & 1 deletion chimerapy/engine/manager/http_server_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import traceback
import datetime
from concurrent.futures import Future
from typing import List, Dict

Expand Down Expand Up @@ -143,6 +144,7 @@ async def _register_worker_route(self, request: web.Request):
"port": self.port,
},
"config": config.config,
"manager_datetime_now": datetime.datetime.utcnow().isoformat(),
}

# Broadcast changes
Expand All @@ -168,7 +170,6 @@ async def _update_nodes_status(self, request: web.Request):
update_dataclass(self.state.workers[worker_state.id], worker_state)
else:
logger.error(f"{self}: non-registered Worker update: {worker_state.id}")
# logger.debug(f"{self}: Nodes status update to: {self.state.workers}")

return web.HTTPOk()

Expand Down
4 changes: 0 additions & 4 deletions chimerapy/engine/manager/worker_handler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,9 @@ def _get_worker_ip(self, worker_id: str) -> str:

async def _register_worker(self, worker_state: WorkerState) -> bool:

logger.debug(f"{self}: worker_state: {worker_state} BEFORE")
evented_worker_state = make_evented(
worker_state, event_bus=self.eventbus, event_name="ManagerState.changed"
)
logger.debug(
f"{self}: worker_state: {worker_state}: {evented_worker_state} AFTER"
)
self.state.workers[worker_state.id] = evented_worker_state
logger.debug(
f"Manager registered <Worker id={worker_state.id}"
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/networking/data_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import pickle
import uuid
import blosc
import datetime
from typing import Any, Literal, Dict, List

# Third-party Imports
import numpy as np
import simplejpeg

# Internal Imports
from chimerapy.engine import clock
from chimerapy.engine._logger import getLogger

logger = getLogger("chimerapy-engine")
Expand Down Expand Up @@ -40,7 +40,7 @@ def __init__(self):
self._container["meta"] = {
"value": {
"ownership": [],
"created": datetime.datetime.now(),
"created": clock.now(),
"transmitted": None,
"received": None,
},
Expand Down
15 changes: 6 additions & 9 deletions chimerapy/engine/node/node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pathlib
import logging
import uuid
import datetime
import os
import tempfile
import asyncio
Expand All @@ -15,7 +14,7 @@

# Internal Imports
from chimerapy.engine import _logger
from chimerapy.engine import config
from chimerapy.engine import clock
from ..states import NodeState
from ..networking import DataChunk
from ..networking.async_loop_thread import AsyncLoopThread
Expand Down Expand Up @@ -89,7 +88,7 @@ def __init__(
# Generic Node needs
self.logger: logging.Logger = logging.getLogger("chimerapy-engine-node")
self.logging_level: int = logging.DEBUG
self.start_time = datetime.datetime.now()
self.start_time = clock.now()

# Default values
self.node_config = NodeConfig()
Expand Down Expand Up @@ -200,7 +199,7 @@ def save_video(self, name: str, data: np.ndarray, fps: int):
return False

if self.recorder.enabled:
timestamp = datetime.datetime.now()
timestamp = clock.now()
video_entry = {
"uuid": uuid.uuid4(),
"name": name,
Expand Down Expand Up @@ -251,7 +250,7 @@ def save_audio(
"channels": channels,
"format": format,
"rate": rate,
"timestamp": datetime.datetime.now(),
"timestamp": clock.now(),
}
self.recorder.submit(audio_entry)

Expand All @@ -271,7 +270,7 @@ def save_tabular(
"name": name,
"data": data,
"dtype": "tabular",
"timestamp": datetime.datetime.now(),
"timestamp": clock.now(),
}
self.recorder.submit(tabular_entry)

Expand All @@ -289,7 +288,7 @@ def save_image(self, name: str, data: np.ndarray):
"name": name,
"data": data,
"dtype": "image",
"timestamp": datetime.datetime.now(),
"timestamp": clock.now(),
}
self.recorder.submit(image_entry)

Expand Down Expand Up @@ -422,8 +421,6 @@ def run(
self.worker_comms.in_node_config(
state=self.state, eventbus=self.eventbus, logger=self.logger
)
if self.worker_comms.worker_config:
config.update_defaults(self.worker_comms.worker_config)
elif not self.state.logdir:
self.state.logdir = pathlib.Path(tempfile.mktemp())

Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/node/poller_service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import threading
import logging
import datetime
from typing import Optional, Dict, Tuple, List

import zmq

from chimerapy.engine import clock
from chimerapy.engine import _logger
from ..states import NodeState
from ..networking import Subscriber, DataChunk
Expand Down Expand Up @@ -149,7 +149,7 @@ def poll_inputs(self):
serial_data_chunk = s.recv()
data_chunk = DataChunk.from_bytes(serial_data_chunk)
meta = data_chunk.get("meta")
meta["value"]["received"] = datetime.datetime.now()
meta["value"]["received"] = clock.now()
data_chunk.update("meta", meta)

# Update the latest value
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/node/processor_service.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import threading
import traceback
import datetime
import logging
import asyncio
from typing import Dict, List, Optional, Callable, Coroutine, Any, Literal

from chimerapy.engine import _logger
from chimerapy.engine import clock
from ..networking.client import Client
from ..networking.enums import NODE_MESSAGE
from ..networking import DataChunk
Expand Down Expand Up @@ -285,7 +285,7 @@ async def safe_step(self, data_chunks: Dict[str, DataChunk] = {}):

# Add timestamp and step id to the DataChunk
meta = output_data_chunk.get("meta")
meta["value"]["transmitted"] = datetime.datetime.now()
meta["value"]["transmitted"] = clock.now()
output_data_chunk.update("meta", meta)

# Send out the output to the OutputsHandler
Expand Down
4 changes: 2 additions & 2 deletions chimerapy/engine/node/profiler_service.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import os
import pickle
import logging
import datetime
from collections import deque
from typing import Dict, Optional, Any, List

import pandas as pd
from psutil import Process

from chimerapy.engine import config
from chimerapy.engine import clock
from ..data_protocols import NodeDiagnostics
from ..async_timer import AsyncTimer
from ..networking.data_chunk import DataChunk
Expand Down Expand Up @@ -79,7 +79,7 @@ async def diagnostics_report(self):
return None

# Get the timestamp
timestamp = datetime.datetime.now().isoformat()
timestamp = clock.now().isoformat()

# Get process-wide information
memory = self.process.memory_info()
Expand Down
12 changes: 12 additions & 0 deletions chimerapy/engine/node/worker_comms_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import pathlib
import logging
import tempfile
import datetime
from typing import Dict, Optional

from chimerapy.engine import config
from chimerapy.engine import clock
from ..networking import Client
from ..states import NodeState
from ..networking.enums import GENERAL_MESSAGE, WORKER_MESSAGE, NODE_MESSAGE
Expand All @@ -25,6 +28,7 @@ def __init__(
host: str,
port: int,
node_config: NodeConfig,
manager_worker_timedelta: Optional[datetime.timedelta] = None,
worker_logdir: Optional[pathlib.Path] = None,
worker_config: Optional[Dict] = None,
logging_level: int = logging.INFO,
Expand All @@ -42,6 +46,7 @@ def __init__(
self.worker_config = worker_config
self.logging_level = logging_level
self.node_config = node_config
self.manager_worker_timedelta = manager_worker_timedelta

# Optional
self.state = state
Expand Down Expand Up @@ -77,6 +82,13 @@ def in_node_config(
# Then add observers
self.add_observers()

# Update config if possible
if self.worker_config:
config.update_defaults(self.worker_config)

# Set clock if possible
clock.sync()

def add_observers(self):
assert self.state and self.eventbus and self.logger

Expand Down
6 changes: 6 additions & 0 deletions chimerapy/engine/worker/events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pathlib
import datetime
from dataclasses import dataclass, field
from typing import Dict, Any
from enum import Enum
Expand All @@ -13,6 +14,11 @@ class BroadcastEvent:
data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ManagerWorkerTimeDeltaEvent:
manager_worker_timedelta: datetime.timedelta


@dataclass
class SendMessageEvent:
client_id: str
Expand Down
Loading