|
6 | 6 | from simplesignals.process import WorkerProcessBase |
7 | 7 | from time import sleep |
8 | 8 | import logging |
9 | | -import multiprocessing |
10 | 9 |
|
11 | 10 |
|
12 | 11 | logger = logging.getLogger(__name__) |
|
15 | 14 | DEFAULT_QUEUE_NAME = 'default' |
16 | 15 |
|
17 | 16 |
|
18 | | -def run_next_task(job): |
19 | | - """Updates a job by running its next task""" |
| 17 | +def process_job(queue_name): |
| 18 | + """This function grabs the next available job for a given queue, and runs its next task.""" |
| 19 | + |
| 20 | + with transaction.atomic(): |
| 21 | + job = Job.objects.get_ready_or_none(queue_name) |
| 22 | + if not job: |
| 23 | + return |
| 24 | + |
| 25 | + logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task) |
| 26 | + job.state = Job.STATES.PROCESSING |
| 27 | + job.save() |
| 28 | + |
20 | 29 | try: |
21 | 30 | task_function = import_by_path(job.next_task) |
22 | 31 | task_function(job) |
@@ -46,23 +55,6 @@ def run_next_task(job): |
46 | 55 | raise |
47 | 56 |
|
48 | 57 |
|
49 | | -def process_job(queue_name): |
50 | | - """This function grabs the next available job for a given queue, and runs its next task.""" |
51 | | - |
52 | | - with transaction.atomic(): |
53 | | - job = Job.objects.get_ready_or_none(queue_name) |
54 | | - if not job: |
55 | | - return |
56 | | - |
57 | | - logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task) |
58 | | - job.state = Job.STATES.PROCESSING |
59 | | - job.save() |
60 | | - |
61 | | - child = multiprocessing.Process(target=run_next_task, args=(job,)) |
62 | | - child.start() |
63 | | - child.join() |
64 | | - |
65 | | - |
66 | 58 | class Worker(WorkerProcessBase): |
67 | 59 |
|
68 | 60 | process_title = "jobworker" |
|
0 commit comments