Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions doc/user/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,22 @@ Tasks marked as periodic get automatically scheduled. To run a task every 5 seco
.. literalinclude:: ../../examples/periodic.py

Periodic tasks get scheduled by the workers themselves, there is no need to run an additional
process only for that. Of course having multiple workers on multiple machine is fine and will not
result in duplicated tasks.
process only for that. Having multiple workers on multiple machines is fine and will not result in
duplicated tasks.

Periodic tasks run at most every `period`. If the system scheduling periodic tasks gets delayed,
nothing compensates for the time lost. This has the added benefit of periodic tasks not being
scheduled if all the workers are down for a prolonged amount of time. When they get back online,
workers won't have a storm of periodic tasks to execute.
An optional `periodicity_start` parameter may be passed that will "snap" the initial task start
to a given value, e.g.::

@tasks.task(name='snapped', periodicity=timedelta(hours=1), periodicity_start=timedelta(minutes=15))
def foo(a, b):
pass

This will run the task hourly, starting at the next 15-minute boundary (0, 15, 30, 45).

Periodic tasks run at most every `periodicity`. If the system scheduling periodic tasks gets
delayed, nothing compensates for the time lost. This has the added benefit of periodic tasks not
being scheduled if all the workers are down for a prolonged amount of time. When they get back
online, workers won't have a storm of periodic tasks to execute.

Tasks Registry
--------------
Expand Down
6 changes: 6 additions & 0 deletions spinach/brokers/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from logging import getLogger
Expand Down Expand Up @@ -184,5 +185,10 @@ def _get_broker_info(self) -> Dict[str, Union[None, str, int]]:
rv['last_seen_at'] = int(time.time())
return rv

@classmethod
def start_at(cls) -> datetime:
    """Return the current UTC time, rounded up to the next whole second.

    Rounding up (rather than truncating) guarantees the returned instant
    is never earlier than "now", so a job stamped with it is not
    considered overdue the moment it is created.  The microsecond part
    is always zero.
    """
    raw_ts = datetime.now(timezone.utc).timestamp()
    return datetime.fromtimestamp(math.ceil(raw_ts), tz=timezone.utc)

def __repr__(self):
return '<{}: {}>'.format(self.__class__.__name__, self._id)
5 changes: 2 additions & 3 deletions spinach/brokers/memory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime, timezone
from logging import getLogger
from queue import Queue, Empty
import sched
Expand Down Expand Up @@ -89,14 +88,14 @@ def register_periodic_tasks(self, tasks: Iterable[Task]):
"""Register tasks that need to be scheduled periodically."""
for task in tasks:
self._scheduler.enter(
int(task.periodicity.total_seconds()),
int(task.periodicity.total_seconds()) + task.periodicity_start,
0,
self._schedule_periodic_task,
argument=(task,)
)

def _schedule_periodic_task(self, task: Task):
at = datetime.now(timezone.utc)
at = self.start_at()
job = Job(task.name, task.queue, at, task.max_retries)
self.enqueue_jobs([job])
self._scheduler.enter(
Expand Down
8 changes: 3 additions & 5 deletions spinach/brokers/redis.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from datetime import datetime, timezone
import json
from logging import getLogger
import math
from os import path
import socket
import threading
Expand Down Expand Up @@ -150,7 +148,7 @@ def move_future_jobs(self) -> int:
self.namespace,
self._to_namespaced(FUTURE_JOBS_KEY),
self._to_namespaced(NOTIFICATIONS_KEY),
math.ceil(datetime.now(timezone.utc).timestamp()),
self.start_at().timestamp(),
JobStatus.QUEUED.value,
self._to_namespaced(PERIODIC_TASKS_HASH_KEY),
self._to_namespaced(PERIODIC_TASKS_QUEUE_KEY),
Expand Down Expand Up @@ -316,7 +314,7 @@ def register_periodic_tasks(self, tasks: Iterable[Task]):
self._number_periodic_tasks = len(_tasks)
self._run_script(
self._register_periodic_tasks,
math.ceil(datetime.now(timezone.utc).timestamp()),
self.start_at().timestamp(),
self._to_namespaced(PERIODIC_TASKS_HASH_KEY),
self._to_namespaced(PERIODIC_TASKS_QUEUE_KEY),
*_tasks
Expand Down Expand Up @@ -361,7 +359,7 @@ def next_future_periodic_delta(self) -> Optional[float]:
if not rv:
return None

now = datetime.now(timezone.utc).timestamp()
now = self.start_at()
next_event_time = rv[0][1]
if next_event_time < now:
return 0
Expand Down
9 changes: 7 additions & 2 deletions spinach/brokers/redis_scripts/register_periodic_tasks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ for i=4, #ARGV do

if redis.call('hexists', periodic_tasks_hash, task["name"]) == 0 then
-- the periodic task is new, add it to the queue
next_event_time = next_event_time + task["periodicity_start"]
redis.call('zadd', periodic_tasks_queue, next_event_time, task["name"])
else
local existing_task_json = redis.call('hget', periodic_tasks_hash, task["name"])
local existing_task = cjson.decode(existing_task_json)
-- the periodic task already existed but the periodicity or
-- periodicity_start changed, so it is reset
if existing_task["periodicity_start"] ~= task["periodicity_start"] then
next_event_time = next_event_time + task["periodicity_start"]
existing_task["periodicity"] = nil
end
if existing_task["periodicity"] ~= task["periodicity"] then
-- the periodic task already existed but the periodicity changed
-- so it is reset
redis.call('zadd', periodic_tasks_queue, next_event_time, task["name"])
end
end
Expand Down
62 changes: 57 additions & 5 deletions spinach/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ class Task:

__slots__ = [
'func', 'name', 'queue', 'max_retries', 'periodicity',
'max_concurrency',
'periodicity_start', 'max_concurrency',
]

def __init__(self, func: Callable, name: str, queue: str,
max_retries: Number, periodicity: Optional[timedelta],
periodicity_start: Optional[timedelta]=None,
max_concurrency: Optional[int]=None):
self.func = func
self.name = name
self.queue = queue
self.max_retries = max_retries
self.periodicity = periodicity
self.periodicity_start = 0
self.max_concurrency = max_concurrency

# Prevent initialisation with max_concurrency set and
Expand All @@ -36,6 +38,18 @@ def __init__(self, func: Callable, name: str, queue: str,
)
if max_concurrency < 1:
raise ValueError("max_concurrency must be greater than zero")
# "snap" periodicity to the next periodicity_start value
if periodicity_start is not None:
if periodicity is None:
raise ValueError(
"periodicity must be set if periodicity_start is set"
)
ps = int(periodicity_start.total_seconds())
if ps > periodicity.total_seconds():
raise ValueError("periodicity_start must be < periodicity")
if ps > 43200:
raise ValueError("periodicity_start must be <= 12 hours")
self.periodicity_start = self._snap_to(periodicity_start)

def serialize(self):
periodicity = (int(self.periodicity.total_seconds())
Expand All @@ -45,6 +59,7 @@ def serialize(self):
'queue': self.queue,
'max_retries': self.max_retries,
'periodicity': periodicity,
'periodicity_start': self.periodicity_start,
'max_concurrency': self.max_concurrency or -1,
}, sort_keys=True)

Expand All @@ -53,9 +68,9 @@ def task_name(self):
return self.name

def __repr__(self):
return 'Task({}, {}, {}, {}, {}, {})'.format(
return 'Task({}, {}, {}, {}, {}, {}, {})'.format(
self.func, self.name, self.queue, self.max_retries,
self.periodicity, self.max_concurrency,
self.periodicity, self.periodicity_start, self.max_concurrency,
)

def __eq__(self, other):
Expand All @@ -67,6 +82,28 @@ def __eq__(self, other):
return False
return True

def _snap_to(self, snap_interval: timedelta) -> int:
# NOTE(nic): we treat the timedelta object as three distinct snap
# values, packed together. When applying them, we snap to the
# largest defined unit, and add the smaller units unconditionally.
# This is because while an hourly periodic with a snap of 15m15s
# would *technically* snap to the top of the hour, the intent of such
# a snap interval is pretty clearly "run this 15 seconds after the
# next quarter-hour". It's also simpler.
now = datetime.now(tz=timezone.utc).replace(microsecond=0)
epoch = datetime.fromtimestamp(0, tz=timezone.utc)
delta = epoch + snap_interval
if delta.hour:
mod = (now - epoch) % timedelta(hours=delta.hour)
elif delta.minute:
mod = (now - epoch) % timedelta(minutes=delta.minute)
elif delta.second:
mod = (now - epoch) % timedelta(seconds=delta.second)

if not mod:
return 0
return int((snap_interval - mod).total_seconds())


Schedulable = Union[str, Callable, Task]

Expand All @@ -78,6 +115,8 @@ class Tasks:
:arg max_retries: default retry policy for tasks
:arg periodicity: for periodic tasks, delay between executions as a
timedelta
:arg periodicity_start: for periodic tasks, clamp the start time to the
given interval, expressed as a timedelta
:arg max_concurrency: maximum number of simultaneous Jobs that can be
started for this Task. Requires max_retries to be also set.
"""
Expand All @@ -87,11 +126,13 @@ class Tasks:
def __init__(self, queue: Optional[str]=None,
max_retries: Optional[Number]=None,
periodicity: Optional[timedelta]=None,
periodicity_start: Optional[timedelta]=None,
max_concurrency: Optional[int]=None):
self._tasks = {}
self.queue = queue
self.max_retries = max_retries
self.periodicity = periodicity
self.periodicity_start = periodicity_start
self.max_concurrency = max_concurrency
self._spin = None

Expand Down Expand Up @@ -122,6 +163,7 @@ def get(self, name: Schedulable) -> Task:
def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
queue: Optional[str]=None, max_retries: Optional[Number]=None,
periodicity: Optional[timedelta]=None,
periodicity_start: Optional[timedelta]=None,
max_concurrency: Optional[int]=None):
"""Decorator to register a task function.

Expand All @@ -131,6 +173,8 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
not provided
:arg periodicity: for periodic tasks, delay between executions as a
timedelta
:arg periodicity_start: for periodic tasks, clamp the start time to the
given interval, expressed as a timedelta
:arg max_concurrency: maximum number of simultaneous Jobs that can be
started for this Task. Requires max_retries to be also set.

Expand All @@ -143,10 +187,12 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
return functools.partial(self.task, name=name, queue=queue,
max_retries=max_retries,
periodicity=periodicity,
periodicity_start=periodicity_start,
max_concurrency=max_concurrency)

self.add(func, name=name, queue=queue, max_retries=max_retries,
periodicity=periodicity, max_concurrency=max_concurrency)
periodicity=periodicity, periodicity_start=periodicity_start,
max_concurrency=max_concurrency)

# Add an attribute to the function to be able to conveniently use it as
# spin.schedule(function) instead of spin.schedule('task_name')
Expand All @@ -157,6 +203,7 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
def add(self, func: Callable, name: Optional[str]=None,
queue: Optional[str]=None, max_retries: Optional[Number]=None,
periodicity: Optional[timedelta]=None,
periodicity_start: Optional[timedelta]=None,
max_concurrency: Optional[int]=None):
"""Register a task function.

Expand All @@ -167,6 +214,8 @@ def add(self, func: Callable, name: Optional[str]=None,
not provided
:arg periodicity: for periodic tasks, delay between executions as a
timedelta
:arg periodicity_start: for periodic tasks, clamp the start time to the
given interval, expressed as a timedelta
:arg max_concurrency: maximum number of simultaneous Jobs that can be
started for this Task. Requires max_retries to be also set.

Expand All @@ -192,6 +241,8 @@ def add(self, func: Callable, name: Optional[str]=None,

if periodicity is None:
periodicity = self.periodicity
if periodicity_start is None:
periodicity_start = self.periodicity_start
if max_concurrency is None:
max_concurrency = self.max_concurrency

Expand All @@ -200,7 +251,8 @@ def add(self, func: Callable, name: Optional[str]=None,
'Spinach for internal use')

self._tasks[name] = Task(
func, name, queue, max_retries, periodicity, max_concurrency
func, name, queue, max_retries, periodicity, periodicity_start,
max_concurrency
)

def _require_attached_tasks(self):
Expand Down
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def fromtimestamp(cls, *args, **kwargs):
return datetime.fromtimestamp(*args, **kwargs)

monkeypatch.setattr('spinach.brokers.base.datetime', MyDatetime)
monkeypatch.setattr('spinach.brokers.redis.datetime', MyDatetime)
monkeypatch.setattr('spinach.job.datetime', MyDatetime)
monkeypatch.setattr('spinach.engine.datetime', MyDatetime)
monkeypatch.setattr('spinach.task.datetime', MyDatetime)
Expand Down
4 changes: 2 additions & 2 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
version: '2.1'
---
services:
redis:
image: docker.io/redis:latest
command: "--appendonly yes"
ports:
- 6379:6379
- "6379:6379"
volumes:
- /tmp/redis-data:/data
48 changes: 48 additions & 0 deletions tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,51 @@ def test_periodic_tasks(broker):
assert r[0][1] == 'foo'
assert r[1][1] == 'bar'
assert r[0][0] == r[1][0] - 5


@pytest.fixture(params=tuple(range(1, 12)))
def periodicity(request):
    """Parametrized fixture yielding a periodicity in hours, 1 through 11."""
    yield request.param


def test_periodic_tasks_with_snap_values(broker, patch_now, periodicity):
    """Two snapped periodic tasks must be scheduled in order, exactly one
    `periodicity` plus the 5-second snap difference apart."""
    set_now(datetime(2025, 4, 15, 0, 26, 21, 193337))
    tasks = [
        Task(print, 'foo', 'q1', 0, timedelta(hours=periodicity),
             timedelta(minutes=15)),
        Task(print, 'bar', 'q1', 0, timedelta(hours=periodicity * 2),
             timedelta(minutes=15, seconds=5)),
    ]
    broker.register_periodic_tasks(tasks)
    first, second = broker.inspect_periodic_tasks()[:2]
    assert first[1] == 'foo'
    assert second[1] == 'bar'
    assert second[0] - first[0] == (3600 * periodicity) + 5


def test_periodic_tasks_with_snap_values_roll(broker, patch_now, periodicity):
    """Same as test_periodic_tasks_with_snap_values, but with a wall clock
    close to midnight so the snapped start rolls over into the next day."""
    set_now(datetime(2025, 4, 17, 23, 51, 21, 288362))
    tasks = [
        Task(print, 'foo', 'q1', 0, timedelta(hours=periodicity),
             timedelta(minutes=15)),
        Task(print, 'bar', 'q1', 0, timedelta(hours=periodicity * 2),
             timedelta(minutes=15, seconds=5)),
    ]
    broker.register_periodic_tasks(tasks)
    first, second = broker.inspect_periodic_tasks()[:2]
    assert first[1] == 'foo'
    assert second[1] == 'bar'
    assert second[0] - first[0] == (3600 * periodicity) + 5


def test_periodic_tasks_with_snap_seconds_invalid_values():
    """Task() must reject inconsistent periodicity/periodicity_start pairs."""
    quarter_hour = timedelta(minutes=15)
    # periodicity_start without periodicity is meaningless
    with pytest.raises(ValueError, match="periodicity must be set"):
        Task(print, 'foo', 'q1', 0, None, quarter_hour)
    # snap offset must fit inside one period
    with pytest.raises(ValueError, match="must be < periodicity"):
        Task(print, 'foo', 'q1', 0, timedelta(minutes=5), quarter_hour)
    # snap offset is capped at 12 hours
    with pytest.raises(ValueError, match="must be <= 12 hours"):
        Task(print, 'foo', 'q1', 0, timedelta(hours=13), timedelta(hours=13))
Loading