From 4deec21643ea9b94247ebde8d189252980b17420 Mon Sep 17 00:00:00 2001
From: Nicolas Simonds
Date: Sun, 13 Apr 2025 00:30:34 +1000
Subject: [PATCH 1/3] Fix up docker-compose command

Lose deprecated config options, add quotes to int-like args
---
 tests/docker-compose.yml | 4 ++--
 tox.ini                  | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index da9f9ba..9865695 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -1,9 +1,9 @@
-version: '2.1'
+---
 services:
   redis:
     image: docker.io/redis:latest
     command: "--appendonly yes"
     ports:
-      - 6379:6379
+      - "6379:6379"
     volumes:
       - /tmp/redis-data:/data
diff --git a/tox.ini b/tox.ini
index 2a62ff8..55c87b5 100644
--- a/tox.ini
+++ b/tox.ini
@@ -8,7 +8,7 @@ envdir =
     pep8: {toxworkdir}/py3
 usedevelop = True
 allowlist_externals =
-    docker-compose
+    docker
 deps =
     pytest
     pytest-cov
@@ -23,6 +23,6 @@ commands =
     pycodestyle --ignore=E252,W503,W504 spinach tests
 
 [testenv:py3]
-commands_pre = docker-compose -f {toxinidir}/tests/docker-compose.yml up -d
+commands_pre = docker compose -f {toxinidir}/tests/docker-compose.yml up -d
 commands = pytest tests {posargs}
-commands_post = docker-compose -f {toxinidir}/tests/docker-compose.yml down
+commands_post = docker compose -f {toxinidir}/tests/docker-compose.yml down

From b8eb951c3315f361e5c8cefc84558040515d07e3 Mon Sep 17 00:00:00 2001
From: Nicolas Simonds
Date: Sun, 13 Apr 2025 01:24:39 +1000
Subject: [PATCH 2/3] Add Broker.start_at() method

To give a consistent entrypoint for everyone running `datetime.now()`
---
 spinach/brokers/base.py   | 6 ++++++
 spinach/brokers/memory.py | 3 +--
 spinach/brokers/redis.py  | 8 +++-----
 tests/conftest.py         | 1 -
 4 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/spinach/brokers/base.py b/spinach/brokers/base.py
index 486b03b..542e93f 100644
--- a/spinach/brokers/base.py
+++ b/spinach/brokers/base.py
@@ -1,3 +1,4 @@
+import math
 from abc import ABC, abstractmethod
 from datetime import datetime, timezone
 from logging import getLogger
@@ -184,5 +185,10 @@ def _get_broker_info(self) -> Dict[str, Union[None, str, int]]:
             rv['last_seen_at'] = int(time.time())
         return rv
 
+    @classmethod
+    def start_at(cls) -> datetime:
+        now = math.ceil(datetime.now(timezone.utc).timestamp())
+        return datetime.fromtimestamp(now, tz=timezone.utc)
+
     def __repr__(self):
         return '<{}: {}>'.format(self.__class__.__name__, self._id)
diff --git a/spinach/brokers/memory.py b/spinach/brokers/memory.py
index fed5aed..1dc6edd 100644
--- a/spinach/brokers/memory.py
+++ b/spinach/brokers/memory.py
@@ -1,4 +1,3 @@
-from datetime import datetime, timezone
 from logging import getLogger
 from queue import Queue, Empty
 import sched
@@ -96,7 +95,7 @@ def register_periodic_tasks(self, tasks: Iterable[Task]):
         )
 
     def _schedule_periodic_task(self, task: Task):
-        at = datetime.now(timezone.utc)
+        at = self.start_at()
         job = Job(task.name, task.queue, at, task.max_retries)
         self.enqueue_jobs([job])
         self._scheduler.enter(
diff --git a/spinach/brokers/redis.py b/spinach/brokers/redis.py
index e3b1ef6..726d997 100644
--- a/spinach/brokers/redis.py
+++ b/spinach/brokers/redis.py
@@ -1,7 +1,5 @@
-from datetime import datetime, timezone
 import json
 from logging import getLogger
-import math
 from os import path
 import socket
 import threading
@@ -150,7 +148,7 @@ def move_future_jobs(self) -> int:
             self.namespace,
             self._to_namespaced(FUTURE_JOBS_KEY),
             self._to_namespaced(NOTIFICATIONS_KEY),
-            math.ceil(datetime.now(timezone.utc).timestamp()),
+            self.start_at().timestamp(),
             JobStatus.QUEUED.value,
             self._to_namespaced(PERIODIC_TASKS_HASH_KEY),
             self._to_namespaced(PERIODIC_TASKS_QUEUE_KEY),
@@ -316,7 +314,7 @@ def register_periodic_tasks(self, tasks: Iterable[Task]):
         self._number_periodic_tasks = len(_tasks)
         self._run_script(
             self._register_periodic_tasks,
-            math.ceil(datetime.now(timezone.utc).timestamp()),
+            self.start_at().timestamp(),
             self._to_namespaced(PERIODIC_TASKS_HASH_KEY),
             self._to_namespaced(PERIODIC_TASKS_QUEUE_KEY),
             *_tasks
@@ -361,7 +359,7 @@ def next_future_periodic_delta(self) -> Optional[float]:
         if not rv:
             return None
 
-        now = datetime.now(timezone.utc).timestamp()
+        now = self.start_at().timestamp()
         next_event_time = rv[0][1]
         if next_event_time < now:
             return 0
diff --git a/tests/conftest.py b/tests/conftest.py
index 32c41dd..28e49ab 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -32,7 +32,6 @@ def fromtimestamp(cls, *args, **kwargs):
         return datetime.fromtimestamp(*args, **kwargs)
 
     monkeypatch.setattr('spinach.brokers.base.datetime', MyDatetime)
-    monkeypatch.setattr('spinach.brokers.redis.datetime', MyDatetime)
     monkeypatch.setattr('spinach.job.datetime', MyDatetime)
     monkeypatch.setattr('spinach.engine.datetime', MyDatetime)
     monkeypatch.setattr('spinach.task.datetime', MyDatetime)

From 5580fbde749c960768e330f60fea1f9c897cc38e Mon Sep 17 00:00:00 2001
From: Nicolas Simonds
Date: Thu, 17 Apr 2025 14:16:13 +1000
Subject: [PATCH 3/3] feat: add `periodicity_start` parameter to Tasks

Adds a new, optional argument to task creation, `periodicity_start`,
which gives Task authors finer control over the scheduling of periodic
tasks. Specifically, it allows Tasks to pin their start time to a
defined wall-clock time, albeit on a best-effort basis, like all Tasks.

Fixes: Issue #48
---
 doc/user/tasks.rst                                 | 21 +++++--
 spinach/brokers/memory.py                          |  2 +-
 .../redis_scripts/register_periodic_tasks.lua      |  9 ++-
 spinach/task.py                                    | 62 +++++++++++++++++--
 tests/test_brokers.py                              | 48 ++++++++++++++
 tests/test_redis_brokers.py                        | 20 +++++-
 tests/test_task.py                                 | 12 ++--
 7 files changed, 151 insertions(+), 23 deletions(-)

diff --git a/doc/user/tasks.rst b/doc/user/tasks.rst
index 52b45f4..60a4b12 100644
--- a/doc/user/tasks.rst
+++ b/doc/user/tasks.rst
@@ -125,13 +125,22 @@ Tasks marked as periodic get automatically scheduled. To run a task every 5 seco
 .. literalinclude:: ../../examples/periodic.py
 
 Periodic tasks get scheduled by the workers themselves, there is no need to run an additional
-process only for that. Of course having multiple workers on multiple machine is fine and will not
-result in duplicated tasks.
+process only for that. Having multiple workers on multiple machines is fine and will not result in
+duplicated tasks.
 
-Periodic tasks run at most every `period`. If the system scheduling periodic tasks gets delayed,
-nothing compensates for the time lost. This has the added benefit of periodic tasks not being
-scheduled if all the workers are down for a prolonged amount of time. When they get back online,
-workers won't have a storm of periodic tasks to execute.
+An optional `periodicity_start` parameter may be passed that will "snap" the initial task start
+to a given wall-clock boundary, e.g.::
+
+    @tasks.task(name='snapped', periodicity=timedelta(hours=1), periodicity_start=timedelta(minutes=15))
+    def foo(a, b):
+        pass
+
+This will run the task hourly, starting at the next quarter-hour boundary (minute 0, 15, 30 or 45).
+
+Periodic tasks run at most every `periodicity`. If the system scheduling periodic tasks gets
+delayed, nothing compensates for the time lost. This has the added benefit of periodic tasks not
+being scheduled if all the workers are down for a prolonged amount of time. When they get back
+online, workers won't have a storm of periodic tasks to execute.
 
 Tasks Registry
 --------------
diff --git a/spinach/brokers/memory.py b/spinach/brokers/memory.py
index 1dc6edd..aa3f4bb 100644
--- a/spinach/brokers/memory.py
+++ b/spinach/brokers/memory.py
@@ -88,7 +88,7 @@ def register_periodic_tasks(self, tasks: Iterable[Task]):
         """Register tasks that need to be scheduled periodically."""
         for task in tasks:
             self._scheduler.enter(
-                int(task.periodicity.total_seconds()),
+                int(task.periodicity.total_seconds()) + task.periodicity_start,
                 0,
                 self._schedule_periodic_task,
                 argument=(task,)
diff --git a/spinach/brokers/redis_scripts/register_periodic_tasks.lua b/spinach/brokers/redis_scripts/register_periodic_tasks.lua
index 5567c76..0f4d1f1 100644
--- a/spinach/brokers/redis_scripts/register_periodic_tasks.lua
+++ b/spinach/brokers/redis_scripts/register_periodic_tasks.lua
@@ -20,13 +20,18 @@ for i=4, #ARGV do
 
     if redis.call('hexists', periodic_tasks_hash, task["name"]) == 0 then
         -- the periodic task is new, add it to the queue
+        next_event_time = next_event_time + task["periodicity_start"]
         redis.call('zadd', periodic_tasks_queue, next_event_time, task["name"])
     else
         local existing_task_json = redis.call('hget', periodic_tasks_hash, task["name"])
         local existing_task = cjson.decode(existing_task_json)
+        -- the periodic task already existed but the periodicity or
+        -- periodicity_start changed, so it is reset
+        if existing_task["periodicity_start"] ~= task["periodicity_start"] then
+            next_event_time = next_event_time + task["periodicity_start"]
+            existing_task["periodicity"] = nil
+        end
         if existing_task["periodicity"] ~= task["periodicity"] then
-            -- the periodic task already existed but the periodicity changed
-            -- so it is reset
             redis.call('zadd', periodic_tasks_queue, next_event_time, task["name"])
         end
     end
diff --git a/spinach/task.py b/spinach/task.py
index 5d12fab..e8e1dae 100644
--- a/spinach/task.py
+++ b/spinach/task.py
@@ -14,17 +14,19 @@ class Task:
 
     __slots__ = [
         'func', 'name', 'queue', 'max_retries', 'periodicity',
-        'max_concurrency',
+        'periodicity_start', 'max_concurrency',
     ]
 
     def __init__(self, func: Callable, name: str, queue: str,
                  max_retries: Number, periodicity: Optional[timedelta],
+                 periodicity_start: Optional[timedelta]=None,
                  max_concurrency: Optional[int]=None):
         self.func = func
         self.name = name
         self.queue = queue
         self.max_retries = max_retries
         self.periodicity = periodicity
+        self.periodicity_start = 0
         self.max_concurrency = max_concurrency
 
         # Prevent initialisation with max_concurrency set and
@@ -36,6 +38,18 @@ def __init__(self, func: Callable, name: str, queue: str,
             )
             if max_concurrency < 1:
                 raise ValueError("max_concurrency must be greater than zero")
+        # "snap" periodicity to the next periodicity_start value
+        if periodicity_start is not None:
+            if periodicity is None:
+                raise ValueError(
+                    "periodicity must be set if periodicity_start is set"
+                )
+            ps = int(periodicity_start.total_seconds())
+            if ps > periodicity.total_seconds():
+                raise ValueError("periodicity_start must be < periodicity")
+            if ps > 43200:
+                raise ValueError("periodicity_start must be <= 12 hours")
+            self.periodicity_start = self._snap_to(periodicity_start)
 
     def serialize(self):
         periodicity = (int(self.periodicity.total_seconds())
@@ -45,6 +59,7 @@ def serialize(self):
             'queue': self.queue,
             'max_retries': self.max_retries,
             'periodicity': periodicity,
+            'periodicity_start': self.periodicity_start,
             'max_concurrency': self.max_concurrency or -1,
         }, sort_keys=True)
 
@@ -53,9 +68,9 @@ def task_name(self):
         return self.name
 
     def __repr__(self):
-        return 'Task({}, {}, {}, {}, {}, {})'.format(
+        return 'Task({}, {}, {}, {}, {}, {}, {})'.format(
             self.func, self.name, self.queue, self.max_retries,
-            self.periodicity, self.max_concurrency,
+            self.periodicity, self.periodicity_start, self.max_concurrency,
         )
 
     def __eq__(self, other):
@@ -67,6 +82,28 @@ def __eq__(self, other):
             return False
         return True
+
+    def _snap_to(self, snap_interval: timedelta) -> int:
+        # NOTE(nic): we treat the timedelta object as three distinct snap
+        # values, packed together. When applying them, we snap to the
+        # largest defined unit, and add the smaller units unconditionally.
+        # This is because while an hourly periodic with a snap of 15m15s
+        # would *technically* snap to the top of the hour, the intent of such
+        # a snap interval is pretty clearly "run this 15 seconds after the
+        # next quarter-hour". It's also simpler.
+        now = datetime.now(tz=timezone.utc).replace(microsecond=0)
+        epoch = datetime.fromtimestamp(0, tz=timezone.utc)
+        delta = epoch + snap_interval
+        if delta.hour:
+            mod = (now - epoch) % timedelta(hours=delta.hour)
+        elif delta.minute:
+            mod = (now - epoch) % timedelta(minutes=delta.minute)
+        elif delta.second:
+            mod = (now - epoch) % timedelta(seconds=delta.second)
+
+        if not mod:
+            return 0
+        return int((snap_interval - mod).total_seconds())
 
 
 Schedulable = Union[str, Callable, Task]
 
@@ -78,6 +115,8 @@ class Tasks:
     :arg max_retries: default retry policy for tasks
     :arg periodicity: for periodic tasks, delay between executions as a
          timedelta
+    :arg periodicity_start: for periodic tasks, snap the start time to the
+         given interval, expressed as a timedelta
     :arg max_concurrency: maximum number of simultaneous Jobs that can be
          started for this Task. Requires max_retries to be also set.
     """
@@ -87,11 +126,13 @@ class Tasks:
 
     def __init__(self, queue: Optional[str]=None,
                  max_retries: Optional[Number]=None,
                  periodicity: Optional[timedelta]=None,
+                 periodicity_start: Optional[timedelta]=None,
                  max_concurrency: Optional[int]=None):
         self._tasks = {}
         self.queue = queue
         self.max_retries = max_retries
         self.periodicity = periodicity
+        self.periodicity_start = periodicity_start
         self.max_concurrency = max_concurrency
         self._spin = None
@@ -122,6 +163,7 @@ def get(self, name: Schedulable) -> Task:
     def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
              queue: Optional[str]=None, max_retries: Optional[Number]=None,
              periodicity: Optional[timedelta]=None,
+             periodicity_start: Optional[timedelta]=None,
              max_concurrency: Optional[int]=None):
         """Decorator to register a task function.
 
@@ -131,6 +173,8 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
            not provided
         :arg periodicity: for periodic tasks, delay between executions as a
              timedelta
+        :arg periodicity_start: for periodic tasks, snap the start time to the
+             given interval, expressed as a timedelta
         :arg max_concurrency: maximum number of simultaneous Jobs that can be
              started for this Task. Requires max_retries to be also set.
 
@@ -143,10 +187,12 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
             return functools.partial(self.task, name=name, queue=queue,
                                      max_retries=max_retries,
                                      periodicity=periodicity,
+                                     periodicity_start=periodicity_start,
                                      max_concurrency=max_concurrency)
 
         self.add(func, name=name, queue=queue, max_retries=max_retries,
-                 periodicity=periodicity, max_concurrency=max_concurrency)
+                 periodicity=periodicity, periodicity_start=periodicity_start,
+                 max_concurrency=max_concurrency)
 
         # Add an attribute to the function to be able to conveniently use it as
         # spin.schedule(function) instead of spin.schedule('task_name')
@@ -157,6 +203,7 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
     def add(self, func: Callable, name: Optional[str]=None,
             queue: Optional[str]=None, max_retries: Optional[Number]=None,
             periodicity: Optional[timedelta]=None,
+            periodicity_start: Optional[timedelta]=None,
            max_concurrency: Optional[int]=None):
         """Register a task function.
 
@@ -167,6 +214,8 @@ def add(self, func: Callable, name: Optional[str]=None,
            not provided
         :arg periodicity: for periodic tasks, delay between executions as a
              timedelta
+        :arg periodicity_start: for periodic tasks, snap the start time to the
+             given interval, expressed as a timedelta
         :arg max_concurrency: maximum number of simultaneous Jobs that can be
              started for this Task. Requires max_retries to be also set.
 
@@ -192,6 +241,8 @@ def add(self, func: Callable, name: Optional[str]=None,
 
         if periodicity is None:
             periodicity = self.periodicity
+        if periodicity_start is None:
+            periodicity_start = self.periodicity_start
         if max_concurrency is None:
             max_concurrency = self.max_concurrency
 
@@ -200,7 +251,8 @@ def add(self, func: Callable, name: Optional[str]=None,
                              'Spinach for internal use')
 
         self._tasks[name] = Task(
-            func, name, queue, max_retries, periodicity, max_concurrency
+            func, name, queue, max_retries, periodicity, periodicity_start,
+            max_concurrency
         )
 
     def _require_attached_tasks(self):
diff --git a/tests/test_brokers.py b/tests/test_brokers.py
index 5d70f69..7ec20c0 100644
--- a/tests/test_brokers.py
+++ b/tests/test_brokers.py
@@ -137,3 +137,51 @@ def test_periodic_tasks(broker):
     assert r[0][1] == 'foo'
     assert r[1][1] == 'bar'
     assert r[0][0] == r[1][0] - 5
+
+
+@pytest.fixture(params=tuple(range(1, 12)))
+def periodicity(request):
+    value = request.param
+    yield value
+
+
+def test_periodic_tasks_with_snap_values(broker, patch_now, periodicity):
+    set_now(datetime(2025, 4, 15, 0, 26, 21, 193337))
+    next_p = periodicity * 2
+    tasks = [
+        Task(print, 'foo', 'q1', 0, timedelta(hours=periodicity),
+             timedelta(minutes=15)),
+        Task(print, 'bar', 'q1', 0, timedelta(hours=next_p),
+             timedelta(minutes=15, seconds=5)),
+    ]
+    broker.register_periodic_tasks(tasks)
+    r = broker.inspect_periodic_tasks()
+    assert r[0][1] == 'foo'
+    assert r[1][1] == 'bar'
+    assert r[0][0] == r[1][0] - (3600 * periodicity) - 5
+
+
+def test_periodic_tasks_with_snap_values_roll(broker, patch_now, periodicity):
+    set_now(datetime(2025, 4, 17, 23, 51, 21, 288362))
+    next_p = periodicity * 2
+    tasks = [
+        Task(print, 'foo', 'q1', 0, timedelta(hours=periodicity),
+             timedelta(minutes=15)),
+        Task(print, 'bar', 'q1', 0, timedelta(hours=next_p),
+             timedelta(minutes=15, seconds=5)),
+    ]
+    broker.register_periodic_tasks(tasks)
+    r = broker.inspect_periodic_tasks()
+    assert r[0][1] == 'foo'
+    assert r[1][1] == 'bar'
+    assert r[0][0] == r[1][0] - (3600 * periodicity) - 5
+
+
+def test_periodic_tasks_with_snap_seconds_invalid_values():
+    with pytest.raises(ValueError, match="periodicity must be set"):
+        Task(print, 'foo', 'q1', 0, None, timedelta(minutes=15))
+    with pytest.raises(ValueError, match="must be < periodicity"):
+        Task(print, 'foo', 'q1', 0, timedelta(minutes=5),
+             timedelta(minutes=15))
+    with pytest.raises(ValueError, match="must be <= 12 hours"):
+        Task(print, 'foo', 'q1', 0, timedelta(hours=13), timedelta(hours=13))
diff --git a/tests/test_redis_brokers.py b/tests/test_redis_brokers.py
index 6b31e92..fc552ef 100644
--- a/tests/test_redis_brokers.py
+++ b/tests/test_redis_brokers.py
@@ -432,16 +432,16 @@ def test_old_periodic_tasks(broker):
 
     assert broker._number_periodic_tasks == 2
     assert broker._r.hgetall(periodic_tasks_hash_key) == {
         b'foo': b'{"max_concurrency": -1, "max_retries": 0, "name": "foo", '
-                b'"periodicity": 5, "queue": "q1"}',
+                b'"periodicity": 5, "periodicity_start": 0, "queue": "q1"}',
         b'bar': b'{"max_concurrency": -1, "max_retries": 0, "name": "bar", '
-                b'"periodicity": 10, "queue": "q1"}'
+                b'"periodicity": 10, "periodicity_start": 0, "queue": "q1"}'
     }
 
     broker.register_periodic_tasks([tasks[1]])
     assert broker._number_periodic_tasks == 1
     assert broker._r.hgetall(periodic_tasks_hash_key) == {
         b'bar': b'{"max_concurrency": -1, "max_retries": 0, "name": "bar", '
-                b'"periodicity": 10, "queue": "q1"}'
+                b'"periodicity": 10, "periodicity_start": 0, "queue": "q1"}'
     }
 
@@ -455,3 +455,17 @@ def test_idempotency_token(_, broker):
     jobs = broker.get_jobs_from_queue('foo_queue', max_jobs=10)
     job_1.status = JobStatus.RUNNING
     assert jobs == [job_1]
+
+
+def test_periodic_tasks_changing_snap_value_reschedules(broker, patch_now):
+    tasks = [Task(print, 'foo', 'q1', 0, timedelta(hours=1))]
+    broker.register_periodic_tasks(tasks)
+    old = broker.inspect_periodic_tasks()
+    tasks = [
+        Task(print, 'foo', 'q1', 0, timedelta(hours=1), timedelta(minutes=15))
+    ]
+    broker.register_periodic_tasks(tasks)
+    new = broker.inspect_periodic_tasks()
+    # Ensure that the task is "the same", but the scheduled time has moved
+    assert old[0][1] == new[0][1]
+    assert (old[0][0] + tasks[0].periodicity_start) == new[0][0]
diff --git a/tests/test_task.py b/tests/test_task.py
index 0dc14b9..82fe2b1 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -39,9 +39,9 @@ def test_task_eq(task):
 
 def test_task_repr(task):
     task_repr = repr(task)
-    expected = 'Task({}, {}, {}, {}, {}, {})'.format(
+    expected = 'Task({}, {}, {}, {}, {}, {}, {})'.format(
         task.func, task.name, task.queue, task.max_retries,
-        task.periodicity, task.max_concurrency,
+        task.periodicity, task.periodicity_start, task.max_concurrency,
     )
     assert task_repr == expected
 
@@ -67,16 +67,16 @@ def test_tasks_add(task):
 
 def test_task_serialize(task):
     expected = (
         '{"max_concurrency": -1, "max_retries": 0, '
-        '"name": "write_to_stdout", '
-        '"periodicity": null, "queue": "foo_queue"}'
+        '"name": "write_to_stdout", "periodicity": null, '
+        '"periodicity_start": 0, "queue": "foo_queue"}'
     )
     assert task.serialize() == expected
 
     task.periodicity = timedelta(minutes=5)
     expected = (
         '{"max_concurrency": -1, "max_retries": 0, '
-        '"name": "write_to_stdout", '
-        '"periodicity": 300, "queue": "foo_queue"}'
+        '"name": "write_to_stdout", "periodicity": 300, '
+        '"periodicity_start": 0, "queue": "foo_queue"}'
     )
     assert task.serialize() == expected
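
A standalone illustration of the snapping arithmetic introduced by patch 3.
This is a sketch for review only: the `snap_offset` name, the fixed
timestamps, and the explicit `else` branch for sub-second intervals are
local to this example rather than part of the patches (the patched
`Task._snap_to()` leaves the sub-second case unhandled)::

    from datetime import datetime, timedelta, timezone

    def snap_offset(snap_interval: timedelta, now: datetime) -> int:
        # Mirror of Task._snap_to(): take the time since the epoch modulo
        # the largest unit defined in the interval, then subtract that
        # remainder from the full interval so the smaller units are added
        # on top of the boundary.
        now = now.replace(microsecond=0)
        epoch = datetime.fromtimestamp(0, tz=timezone.utc)
        delta = epoch + snap_interval
        if delta.hour:
            mod = (now - epoch) % timedelta(hours=delta.hour)
        elif delta.minute:
            mod = (now - epoch) % timedelta(minutes=delta.minute)
        elif delta.second:
            mod = (now - epoch) % timedelta(seconds=delta.second)
        else:
            mod = timedelta(0)  # sub-second interval: nothing to snap to
        if not mod:
            return 0
        return int((snap_interval - mod).total_seconds())

    # At 00:26:21 UTC a 15-minute snap lands on 00:30:00, 219 seconds away.
    now = datetime(2025, 4, 15, 0, 26, 21, tzinfo=timezone.utc)
    assert snap_offset(timedelta(minutes=15), now) == 219
    # A 15m5s snap lands 5 seconds after the quarter-hour, at 00:30:05.
    assert snap_offset(timedelta(minutes=15, seconds=5), now) == 224

This also shows why a 15m5s interval snaps to the quarter-hour rather than
to the hour: only the largest defined unit (minutes) drives the modulo, and
the 5 seconds ride along in the subtraction.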