Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/Illuminate/Contracts/Queue/KeepsJobsAlive.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

namespace Illuminate\Contracts\Queue;

interface KeepsJobsAlive
{
/**
* Inform the queue driver that the given job is still being processed.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $seconds
* @return void
*/
public function keepAlive(Job $job, int $seconds);
}
2 changes: 2 additions & 0 deletions src/Illuminate/Queue/Console/WorkCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class WorkCommand extends Command
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : The number of seconds to sleep when no job is available}
{--rest=0 : The number of seconds to rest between jobs}
{--keepalive=0 : The number of seconds between keepalive heartbeats for supported queue drivers}
{--timeout=60 : The number of seconds a child process can run}
{--tries=1 : The number of times to attempt a job before logging it failed}
{--json : Output the queue worker information as JSON}';
Expand Down Expand Up @@ -171,6 +172,7 @@ protected function gatherWorkerOptions()
$this->option('max-time'),
$this->option('rest'),
$this->option('stop-when-empty-for'),
$this->option('keepalive'),
);
}

Expand Down
140 changes: 111 additions & 29 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Illuminate\Contracts\Queue\Interruptible;
use Illuminate\Contracts\Queue\KeepsJobsAlive;
use Illuminate\Database\DetectsLostConnections;
use Illuminate\Queue\Events\JobAttempted;
use Illuminate\Queue\Events\JobExceptionOccurred;
Expand Down Expand Up @@ -238,14 +239,14 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
}

// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// register the signal deadlines and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

if ($supportsAsyncSignals) {
$this->registerTimeoutHandler($job, $options);
$this->listenForJobSignals($job, $options);
}

// If the daemon should run (not in maintenance mode, etc.), then we can run
Expand All @@ -268,7 +269,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
}

if ($supportsAsyncSignals) {
$this->resetTimeoutHandler();
$this->resetJobSignals();
}

// Finally, we will check to see if we have exceeded our memory limits or if
Expand All @@ -283,50 +284,109 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
}

/**
* Register the worker timeout handler.
* Handle a timed out job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return never
*/
protected function handleJobTimeout($job, WorkerOptions $options)
{
if ($job) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(), $job, (int) $options->maxTries, $e = $this->timeoutExceededException($job)
);

$this->markJobAsFailedIfWillExceedMaxExceptions(
$job->getConnectionName(), $job, $e
);

$this->markJobAsFailedIfItShouldFailOnTimeout(
$job->getConnectionName(), $job, $e
);

$this->events->dispatch(new JobTimedOut(
$job->getConnectionName(), $job
));
}

$this->kill(static::$timedOutExitCode ?? static::EXIT_ERROR, $options, WorkerStopReason::TimedOut);
}

/**
* Inform the queue connection that the given job is still being processed.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $keepAlive
* @return void
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
protected function handleJobKeepAlive($job, $keepAlive)
{
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () use ($job, $options) {
if ($job) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(), $job, (int) $options->maxTries, $e = $this->timeoutExceededException($job)
);
$this->manager->connection($job->getConnectionName())->keepAlive($job, $keepAlive);
}

$this->markJobAsFailedIfWillExceedMaxExceptions(
$job->getConnectionName(), $job, $e
);
/**
* Register the alarm signal handler for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function listenForJobSignals($job, WorkerOptions $options)
{
$now = $this->currentTime();

$this->markJobAsFailedIfItShouldFailOnTimeout(
$job->getConnectionName(), $job, $e
);
// Track the next absolute deadlines for the timeout and keepalive alarm.
$timeout = max($this->timeoutForJob($job, $options), 0);
$timeoutAt = $timeout > 0 ? $now + $timeout : null;

$this->events->dispatch(new JobTimedOut(
$job->getConnectionName(), $job
));
$keepAliveFrequency = max($this->keepAliveForJob($job, $options), 0);
$keepAliveAt = $keepAliveFrequency > 0 ? $now + $keepAliveFrequency : null;

$scheduleNextAlarm = function ($now) use ($timeoutAt, &$keepAliveAt) {
// Schedule the next alarm for whichever deadline comes first: timeout or keepalive.
$next = $keepAliveAt;

if (is_null($next) || (! is_null($timeoutAt) && $timeoutAt < $next)) {
$next = $timeoutAt;
}

if (is_null($next)) {
$next = 0;
}

$secondsUntilNext = max($next - $now, 1);

// Account for floating point drift from high-resolution timestamps without firing early.
pcntl_alarm((int) ceil($secondsUntilNext - 1e-9));
};

pcntl_signal(SIGALRM, function () use ($job, $options, $timeoutAt, $keepAliveFrequency, &$keepAliveAt, $scheduleNextAlarm) {
$now = $this->currentTime();

if (! is_null($timeoutAt) && $now >= $timeoutAt) {
$this->handleJobTimeout($job, $options);
}

if (! is_null($keepAliveAt) && $now >= $keepAliveAt) {
$this->handleJobKeepAlive($job, $keepAliveFrequency);

// Keepalive continues to run on its configured interval from the prior deadline.
$keepAliveAt += $keepAliveFrequency;
}

$this->kill(static::$timedOutExitCode ?? static::EXIT_ERROR, $options, WorkerStopReason::TimedOut);
$scheduleNextAlarm($now);
}, true);

pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
$scheduleNextAlarm($now);
}

/**
* Reset the worker timeout handler.
* Reset the alarm signal for the current job.
*
* @return void
*/
protected function resetTimeoutHandler()
protected function resetJobSignals()
{
pcntl_alarm(0);
}
Expand All @@ -340,7 +400,29 @@ protected function resetTimeoutHandler()
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
if (! $job) {
return 0;
}

return ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}

/**
* Get the keepalive interval for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function keepAliveForJob($job, WorkerOptions $options)
{
if (! $job) {
return 0;
}

return $this->manager->connection($job->getConnectionName()) instanceof KeepsJobsAlive
? $options->keepAlive
: 0;
}

/**
Expand Down
10 changes: 10 additions & 0 deletions src/Illuminate/Queue/WorkerOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ class WorkerOptions
*/
public $timeout;

/**
* The number of seconds between keepalive heartbeats for supported queue connections.
*
* @var int
*/
public $keepAlive;

/**
* The number of seconds to wait in between polling the queue.
*
Expand Down Expand Up @@ -103,6 +110,7 @@ class WorkerOptions
* @param int $maxTime
* @param int $rest
* @param int $stopWhenEmptyFor
* @param int $keepAlive The number of seconds between keepalive heartbeats.
*/
public function __construct(
$name = 'default',
Expand All @@ -117,6 +125,7 @@ public function __construct(
$maxTime = 0,
$rest = 0,
$stopWhenEmptyFor = 0,
$keepAlive = 0,
) {
$this->name = $name;
$this->backoff = $backoff;
Expand All @@ -125,6 +134,7 @@ public function __construct(
$this->force = $force;
$this->memory = $memory;
$this->timeout = $timeout;
$this->keepAlive = $keepAlive;
$this->maxTries = $maxTries;
$this->stopWhenEmpty = $stopWhenEmpty;
$this->stopWhenEmptyFor = $stopWhenEmptyFor;
Expand Down
Loading