For an overview of how to use php-resque, see `README.md`.
The following is a step-by-step breakdown of how php-resque operates.
What happens when you call `Resque\Resque::enqueue()`?

- `Resque\Resque::enqueue()` calls `Resque\JobHandler::create()` with the same arguments it received.
- `Resque\JobHandler::create()` checks that your `$args` (the third argument) are either `null` or in an array.
- `Resque\JobHandler::create()` generates a job ID (a "token" in most of the docs).
- `Resque\JobHandler::create()` pushes the job to the requested queue (first argument).
- `Resque\JobHandler::create()`, if status monitoring is enabled for the job (fourth argument), calls `Resque\Job\Status::create()` with the job ID as its only argument.
- `Resque\Job\Status::create()` creates a key in Redis with the job ID in its name, and the current status (as well as a couple of timestamps) as its value, then returns control to `Resque\JobHandler::create()`.
- `Resque\JobHandler::create()` returns control to `Resque\Resque::enqueue()`, with the job ID as a return value.
- `Resque\Resque::enqueue()` triggers the `afterEnqueue` event, then returns control to your application, again with the job ID as its return value.
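
The enqueue steps above can be sketched in plain PHP. This is a minimal illustration, not php-resque's actual code: `FakeRedis`, `sketchCreate()`, `sketchEnqueue()`, the key names, and the `WAITING` string are all hypothetical, and an in-memory array stands in for Redis so the example runs without a server.

```php
<?php
// Hypothetical in-memory stand-in for Redis (not part of php-resque).
final class FakeRedis
{
    public array $lists = []; // queue name => list of JSON-encoded jobs
    public array $keys = [];  // plain key => value storage

    public function rpush(string $key, string $value): void
    {
        $this->lists[$key][] = $value;
    }

    public function set(string $key, string $value): void
    {
        $this->keys[$key] = $value;
    }
}

// Mirrors the steps described for JobHandler::create().
function sketchCreate(FakeRedis $redis, string $queue, string $class, $args = null, bool $monitor = false): string
{
    // $args must be null or an array.
    if ($args !== null && !is_array($args)) {
        throw new InvalidArgumentException('Supplied $args must be an array.');
    }

    // Generate the job ID (the "token").
    $id = bin2hex(random_bytes(16));

    // Push the job to the requested queue. Wrapping $args in an array matches
    // the later description of getArguments() returning args[0].
    $job = ['class' => $class, 'args' => [$args], 'id' => $id];
    $redis->rpush('queue:' . $queue, json_encode($job, JSON_THROW_ON_ERROR));

    // Optional status tracking: a key named after the job ID holding the
    // current status and a couple of timestamps (field names are assumed).
    if ($monitor) {
        $status = ['status' => 'WAITING', 'updated' => time(), 'started' => time()];
        $redis->set('job:' . $id . ':status', json_encode($status, JSON_THROW_ON_ERROR));
    }

    return $id;
}

// Mirrors Resque::enqueue(): delegate to create(), fire afterEnqueue, return the ID.
function sketchEnqueue(FakeRedis $redis, string $queue, string $class, $args = null, bool $monitor = false, array $listeners = []): string
{
    $id = sketchCreate($redis, $queue, $class, $args, $monitor);

    foreach ($listeners as $listener) {
        $listener(['class' => $class, 'args' => $args, 'queue' => $queue, 'id' => $id]);
    }

    return $id;
}
```

Calling `sketchEnqueue(new FakeRedis(), 'default', 'My_Job', ['name' => 'Chris'], true)` returns the generated ID, which is the value an application would keep if it wanted to look up the job's status later.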
How do the workers process the queues?

- `Resque\Worker\ResqueWorker::work()`, the main loop of the worker process, calls `Resque\Worker\ResqueWorker->reserve()` to check for a job.
- `Resque\Worker\ResqueWorker->reserve()` checks whether to use blocking pops or not (from `BLOCKING`), then acts accordingly:
  - Blocking Pop
    - `Resque\Worker\ResqueWorker->reserve()` calls `Resque\JobHandler::reserveBlocking()` with the entire queue list and the timeout (from `INTERVAL`) as arguments.
    - `Resque\JobHandler::reserveBlocking()` calls `Resque\Resque::blpop()` (which in turn calls Redis' `blpop`, after prepping the queue list for the call, then processes the response for consistency with other aspects of the library, before finally returning control [and the queue/content of the retrieved job, if any] to `Resque\JobHandler::reserveBlocking()`).
    - `Resque\JobHandler::reserveBlocking()` checks whether the job content is an array (it should contain the job's type [class], payload [args], and ID), and aborts processing if not.
    - `Resque\JobHandler::reserveBlocking()` creates a new `Resque\JobHandler` object with the queue and content as constructor arguments to initialize the job itself, and returns it, along with control of the process, to `Resque\Worker\ResqueWorker->reserve()`.
  - Queue Polling
    - `Resque\Worker\ResqueWorker->reserve()` iterates through the queue list, calling `Resque\JobHandler::reserve()` with the current queue's name as the sole argument on each pass.
    - `Resque\JobHandler::reserve()` passes the queue name on to `Resque\Resque::pop()`, which in turn calls Redis' `lpop` with the same argument, then returns control (and the job content, if any) to `Resque\JobHandler::reserve()`.
    - `Resque\JobHandler::reserve()` checks whether the job content is an array (as before, it should contain the job's type [class], payload [args], and ID), and aborts processing if not.
    - `Resque\JobHandler::reserve()` creates a new `Resque\JobHandler` object in the same manner as above, and also returns this object (along with control of the process) to `Resque\Worker\ResqueWorker->reserve()`.
- In either case, `Resque\Worker\ResqueWorker->reserve()` returns the new `Resque\JobHandler` object, along with control, up to `Resque\Worker\ResqueWorker::work()`; if no job is found, it simply returns `FALSE`.
- No Jobs
  - If blocking mode is not enabled, `Resque\Worker\ResqueWorker::work()` sleeps for `INTERVAL` seconds; it calls `usleep()` for this, so fractional seconds are supported.
- Job Reserved
  - `Resque\Worker\ResqueWorker::work()` triggers a `beforeFork` event.
  - `Resque\Worker\ResqueWorker::work()` calls `Resque\Worker\ResqueWorker->workingOn()` with the new `Resque\JobHandler` object as its argument.
  - `Resque\Worker\ResqueWorker->workingOn()` does some reference assignments to help keep track of the worker/job relationship, then updates the job status from `WAITING` to `RUNNING`.
  - `Resque\Worker\ResqueWorker->workingOn()` stores the new `Resque\JobHandler` object's payload in a Redis key associated with the worker itself (this is to prevent the job from being lost indefinitely, but does rely on that PID never being allocated on that host to a different worker process), then returns control to `Resque\Worker\ResqueWorker::work()`.
  - `Resque\Worker\ResqueWorker::work()` forks a child process to run the actual `perform()`.
  - The next steps differ between the worker and the child, now running in separate processes:
    - Worker
      - The worker waits for the job process to complete.
      - If the exit status is not 0, the worker calls `Resque\JobHandler->fail()` with a `Resque\Exceptions\DirtyExitException` as its only argument.
      - `Resque\JobHandler->fail()` triggers an `onFailure` event.
      - `Resque\JobHandler->fail()` updates the job status from `RUNNING` to `FAILED`.
      - `Resque\JobHandler->fail()` calls `Resque\FailureHandler::create()` with the job payload, the `Resque\Exceptions\DirtyExitException`, the internal ID of the worker, and the queue name as arguments.
      - `Resque\FailureHandler::create()` creates a new object of whatever type has been set as the `Resque\FailureHandler` "backend" handler; by default, this is a `Resque\FailureHandler_Redis` object, whose constructor simply collects the data passed into `Resque\FailureHandler::create()` and pushes it into Redis in the `failed` queue.
      - `Resque\JobHandler->fail()` increments two failure counters in Redis: one for a total count, and one for the worker.
      - `Resque\JobHandler->fail()` returns control to the worker (still in `Resque\Worker\ResqueWorker::work()`) without a value.
    - Job
      - `Resque\Job\PID` is created, registering the PID of the actual process doing the job.
      - The job calls `Resque\Worker\ResqueWorker->perform()` with the `Resque\JobHandler` as its only argument.
      - `Resque\Worker\ResqueWorker->perform()` sets up a `try...catch` block so it can properly handle exceptions by marking jobs as failed (by calling `Resque\JobHandler->fail()`, as above).
      - Inside the `try...catch`, `Resque\Worker\ResqueWorker->perform()` triggers an `afterFork` event.
      - Still inside the `try...catch`, `Resque\Worker\ResqueWorker->perform()` calls `Resque\JobHandler->perform()` with no arguments.
      - `Resque\JobHandler->perform()` calls `Resque\JobHandler->getInstance()` with no arguments.
      - If `Resque\JobHandler->getInstance()` has already been called, it returns the existing instance; otherwise:
      - `Resque\JobHandler->getInstance()` checks that the job's class (type) exists and has a `perform()` method; if not, in either case, it throws an exception which will be caught by `Resque\Worker\ResqueWorker->perform()`.
      - `Resque\JobHandler->getInstance()` creates an instance of the job's class, and initializes it with a reference to the `Resque\JobHandler` itself, the job's arguments (which it gets by calling `Resque\JobHandler->getArguments()`, which in turn simply returns the value of `args[0]`, or an empty array if no arguments were passed), and the queue name.
      - `Resque\JobHandler->getInstance()` returns control, along with the job class instance, to `Resque\JobHandler->perform()`.
      - `Resque\JobHandler->perform()` sets up its own `try...catch` block to handle `Resque\Exceptions\DoNotPerformException` exceptions; any other exceptions are passed up to `Resque\Worker\ResqueWorker->perform()`.
      - `Resque\JobHandler->perform()` triggers a `beforePerform` event.
      - `Resque\JobHandler->perform()` calls `setUp()` on the instance, if it exists.
      - `Resque\JobHandler->perform()` calls `perform()` on the instance.
      - `Resque\JobHandler->perform()` calls `tearDown()` on the instance, if it exists.
      - `Resque\JobHandler->perform()` triggers an `afterPerform` event.
      - The `try...catch` block ends, suppressing `Resque\Exceptions\DoNotPerformException` exceptions by returning control, and the value `FALSE`, to `Resque\Worker\ResqueWorker->perform()`; any other situation returns the value `TRUE` along with control, instead.
      - The `try...catch` block in `Resque\Worker\ResqueWorker->perform()` ends.
      - `Resque\Worker\ResqueWorker->perform()` updates the job status from `RUNNING` to `COMPLETE`, then returns control, with no value, to the worker (again still in `Resque\Worker\ResqueWorker::work()`).
      - `Resque\Job\PID` is removed; the forked process will then terminate cleanly.
      - `Resque\Worker\ResqueWorker::work()` calls `exit(0)` to terminate the job process.
    - SPECIAL CASE: Non-forking OS (Windows)
      - Same as the job above, except it doesn't call `exit(0)` when done.
  - `Resque\Worker\ResqueWorker::work()` calls `Resque\Worker\ResqueWorker->doneWorking()` with no arguments.
  - `Resque\Worker\ResqueWorker->doneWorking()` increments two processed counters in Redis: one for a total count, and one for the worker.
  - `Resque\Worker\ResqueWorker->doneWorking()` deletes the Redis key set in `Resque\Worker\ResqueWorker->workingOn()`, then returns control, with no value, to `Resque\Worker\ResqueWorker::work()`.
  - `Resque\Worker\ResqueWorker::work()` returns control to the beginning of the main loop, where it will wait for the next job to become available, and start this process all over again.
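
The worker cycle above can be condensed into a short, self-contained sketch. It follows the queue-polling path and runs the job in-process, as in the non-forking special case. It leaves out events, Redis status keys, the `Resque\Job\PID` bookkeeping, and `DoNotPerformException` handling. `EchoJob`, `sketchReserve()`, `sketchPerform()`, and `sketchWorkOnce()` are hypothetical names, and plain PHP arrays stand in for the Redis queues and counters.

```php
<?php
// Hypothetical job class: perform() is required, setUp()/tearDown() are optional.
final class EchoJob
{
    public static array $log = []; // records the call order for inspection
    public array $args = [];

    public function setUp(): void
    {
        self::$log[] = 'setUp';
    }

    public function perform(): void
    {
        self::$log[] = 'perform:' . ($this->args['name'] ?? '');
    }

    public function tearDown(): void
    {
        self::$log[] = 'tearDown';
    }
}

/**
 * Queue polling: try each queue in order and return the first usable job,
 * or false when nothing is available.
 */
function sketchReserve(array &$queues)
{
    foreach (array_keys($queues) as $name) {
        $raw = array_shift($queues[$name]); // stands in for Redis LPOP
        if ($raw === null) {
            continue; // this queue is empty
        }
        $payload = json_decode($raw, true);
        if (!is_array($payload)) {
            continue; // job content must be an array
        }
        return ['queue' => $name, 'payload' => $payload];
    }
    return false;
}

// Roughly what getInstance() and perform() do for a reserved job.
function sketchPerform(array $job): bool
{
    $class = $job['payload']['class'];
    if (!class_exists($class) || !method_exists($class, 'perform')) {
        throw new RuntimeException("Job class $class is missing or has no perform() method.");
    }

    $instance = new $class();
    $instance->args = $job['payload']['args'][0] ?? []; // args[0], or an empty array

    if (method_exists($instance, 'setUp')) {
        $instance->setUp();
    }
    $instance->perform();
    if (method_exists($instance, 'tearDown')) {
        $instance->tearDown();
    }
    return true;
}

// One pass of the main loop: reserve, run, record the outcome.
function sketchWorkOnce(array &$queues, array &$stats): string
{
    $job = sketchReserve($queues);
    if ($job === false) {
        return 'idle'; // a real worker would sleep for INTERVAL here
    }

    try {
        sketchPerform($job);
        $status = 'COMPLETE';
    } catch (Throwable $e) {
        $status = 'FAILED';
        $stats['failed']++; // fail() increments the failure counters
    }

    $stats['processed']++; // doneWorking() increments the processed counters
    return $status;
}
```

Calling `sketchWorkOnce()` repeatedly against an array of queues plays the role of the main loop: each call handles at most one job and reports `COMPLETE`, `FAILED`, or `idle`.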