Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/OpenQA/Task/Job/Restart.pm
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package OpenQA::Task::Job::Restart;
use Mojo::Base 'Mojolicious::Plugin', -signatures;

use Time::Seconds;
use OpenQA::Events;

sub register ($self, $app, @args) {
$app->minion->add_task(restart_job => \&_restart_job);
Expand All @@ -17,6 +18,11 @@ sub restart_delay { $ENV{OPENQA_JOB_RESTART_DELAY} // 5 }
sub restart_openqa_job ($minion_job, $openqa_job) {
my $cloned_job_or_error = $openqa_job->auto_duplicate;
my $is_ok = ref $cloned_job_or_error || $cloned_job_or_error =~ qr/(already.*clone|direct parent)/i;
if (ref $cloned_job_or_error) {
my %event_data = (id => $openqa_job->id, result => $cloned_job_or_error->{cluster_cloned}, auto => 1);
OpenQA::Events->singleton->emit_event('openqa_job_restart', data => \%event_data);
}

$minion_job->note(
ref $cloned_job_or_error
? (cluster_cloned => $cloned_job_or_error->{cluster_cloned})
Expand All @@ -33,8 +39,11 @@ sub _restart_job ($minion_job, @args) {
my $openqa_job = $app->schema->resultset('Jobs')->find($openqa_job_id);
return $minion_job->finish("Job $openqa_job_id does not exist.") unless $openqa_job;

_init_amqp_plugin($app);
# duplicate job and finish normally if no error was returned or job can not be cloned
my ($is_ok, $cloned_job_or_error) = restart_openqa_job($minion_job, $openqa_job);
_wait_for_event_publish($app);

return $minion_job->finish(ref $cloned_job_or_error ? undef : $cloned_job_or_error) if $is_ok;

# retry a certain number of times, maybe the transaction failed due to a conflict
Expand All @@ -46,4 +55,16 @@ sub _restart_job ($minion_job, @args) {
$minion_job->retry({delay => restart_delay});
}

# Loads the AMQP plugin into the given app and runs one iteration of the
# IOLoop singleton afterwards. Does nothing and returns undef when AMQP is not
# enabled in the app config.
#
# NOTE(review): the single tick presumably gives the callback the plugin
# schedules via next_tick on registration a chance to run - confirm.
sub _init_amqp_plugin ($app) {
    my $amqp_enabled = $app->config->{amqp}{enabled};
    return undef if !$amqp_enabled;

    # the plugin needs to be loaded again from the forked process
    $app->plugin('AMQP');
    my $loop = Mojo::IOLoop->singleton;
    return $loop->one_tick;
}

# Runs the IOLoop until the `amqp_handled` event is emitted on the
# OpenQA::Events singleton. Does nothing and returns undef when AMQP is not
# enabled in the app config.
#
# NOTE(review): nothing here bounds the wait; if no `amqp_handled` event is
# ever emitted the one-time handler stays subscribed and the loop is only left
# when it stops for another reason - confirm callers only get here after an
# event has actually been emitted.
sub _wait_for_event_publish ($app) {
    my $amqp_enabled = $app->config->{amqp}{enabled};
    return undef if !$amqp_enabled;

    # leave the loop as soon as the AMQP plugin reports the event as handled
    my $stop_loop = sub { Mojo::IOLoop->stop };
    my $events = OpenQA::Events->singleton;
    $events->once(amqp_handled => $stop_loop);
    return Mojo::IOLoop->start;
}

1;
20 changes: 15 additions & 5 deletions lib/OpenQA/WebAPI/Plugin/AMQP.pm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ sub new ($class, @args) {
sub register ($self, $app, @args) {
$self->{app} = $app;
$self->{config} = $app->config;
my $config = $self->{config}->{amqp};
$config->{enabled} = 1; # Needed for reloading the plugin later in the forked process
Mojo::IOLoop->singleton->next_tick(
sub {
# register for events
Expand Down Expand Up @@ -68,18 +70,26 @@ sub publish_amqp ($self, $topic, $event_data, $headers = {}, $remaining_attempts

$remaining_attempts //= $config->{publish_attempts};
$retry_delay //= $config->{publish_retry_delay};
$publisher->publish_p($event_data, $headers, routing_key => $topic)->then(sub { log_debug "$topic published" })
->catch(
$publisher->publish_p($event_data, $headers, routing_key => $topic)->then(
sub {
log_debug "$topic published";
OpenQA::Events->singleton->emit('amqp_handled');
}
)->catch(
sub ($error) {
my $left = looks_like_number $remaining_attempts && $remaining_attempts > 1 ? $remaining_attempts - 1 : 0;
my $delay = $retry_delay * $config->{publish_retry_delay_factor};
my ($event_id, $job_id) = ($event_data->{id} // 'none', $event_data->{job_id});
my $additional_info = $job_id ? ", job ID: $job_id" : '';
my $log_msg = "Publishing $topic failed: $error (event ID: $event_id$additional_info, $left attempts left)";
return log_error $log_msg unless $left;
my $retry_function = sub ($loop) { $self->publish_amqp($topic, $event_data, $headers, $left, $delay) };
log_info $log_msg;
Mojo::IOLoop->timer($retry_delay => $retry_function) if $left;
if ($left) {
log_info $log_msg;
Mojo::IOLoop->timer($retry_delay => $retry_function);
return;
}
OpenQA::Events->singleton->emit('amqp_handled');
return log_error $log_msg;
})->finally(sub { undef $publisher });
}

Expand Down
60 changes: 57 additions & 3 deletions t/10-jobs.t
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use OpenQA::Jobs::Constants;
use OpenQA::Test::Case;
use Test::MockModule 'strict';
use Test::Mojo;
use Test::Output;
use Test::Warnings qw(:report_warnings warning);
use Mojo::File 'path';
use Mojo::File qw(path tempdir);
use Mojo::JSON qw(decode_json encode_json);
use OpenQA::Test::Utils qw(perform_minion_jobs);
use OpenQA::Test::Utils qw(perform_minion_jobs mock_io_loop);
use OpenQA::Test::TimeLimit '30';

binmode(STDOUT, ':encoding(UTF-8)');
Expand Down Expand Up @@ -899,7 +900,8 @@ subtest 'job setting based retriggering' => sub {
my $finalize_job_count_before = @{$get_jobs->('finalize_job_results')};
$job->update({state => SCHEDULED, result => NONE});
$job->done(result => FAILED);
perform_minion_jobs($minion);
stdout_like { perform_minion_jobs($minion) } qr/Job \d+ duplicated as \d+/,
'check debug message from auto_duplicate';
is $jobs->count, $jobs_nr + 2, 'job retriggered as it FAILED (with retry)';
$job->update;
$job->discard_changes;
Expand Down Expand Up @@ -931,6 +933,58 @@ subtest 'job setting based retriggering' => sub {
is $lastest_job->latest_job->id, $lastest_job->id, 'found the latest job from latest job itself';
};

# Verifies that an automatic job restart performed within a Minion task emits
# exactly one `openqa_job_restart` event and that this event is handed to the
# AMQP plugin for publishing. The actual publishing (`publish_amqp`) and the
# blocking `Mojo::IOLoop->start` call are mocked.
subtest 'AMQP event emission for job restarts within Minion tasks' => sub {
    my $plugin_mock = Test::MockModule->new('OpenQA::WebAPI::Plugin::AMQP');
    my $conf = "[global]\nplugins=AMQP\n[amqp]\npublish_attempts = 2\npublish_retry_delay = 0\n";
    my $tempdir = tempdir;
    # localize the override so the temporary config does not leak into later subtests
    local $ENV{OPENQA_CONFIG} = $tempdir;
    path($ENV{OPENQA_CONFIG})->make_path->child('openqa.ini')->spew($conf);
    my %published;
    my @event_body;
    my $io_loop_mock = Test::MockModule->new('Mojo::IOLoop');
    $io_loop_mock->redefine(start => sub { });
    $plugin_mock->redefine(
        publish_amqp => sub ($self, $topic, $data) {
            $published{$topic} = $data;
            Mojo::IOLoop->next_tick(sub { OpenQA::Events->singleton->emit('amqp_handled'); });
        });

    my $events_mock = Test::MockModule->new('OpenQA::Events');
    $events_mock->redefine(
        # accept any number of event arguments like the real `emit` does; events such as
        # `amqp_handled` are emitted without payload and must not trip the signature's arity check
        emit => sub ($self, $type, @args) {
            @event_body = ($type, @args) if $type eq 'openqa_job_restart';
            $events_mock->original('emit')->($self, $type, @args);
        });
    # use a new app; otherwise it runs slowly, maybe trying to run non-mocked stuff
    my $t = Test::Mojo->new('OpenQA::WebAPI');
    my $minion = $t->app->minion;
    is $t->app->config->{amqp}->{enabled}, 1, 'AMQP enabled from config file';

    my %retry_settings = %settings;
    $retry_settings{TEST} = 'test_amqp_restart';
    $retry_settings{RETRY} = '1';
    my $job = $jobs->create_from_settings(\%retry_settings);
    $job->discard_changes;
    my $job_id = $job->id;

    %published = ();
    $job->done(result => FAILED);
    stdout_like { perform_minion_jobs($minion) } qr/Job \d+ duplicated as \d+/,
      'job duplication logged when restarting via Minion task';
    is scalar keys %published, 1, 'exactly one job restart event emitted';
    my $event = $published{'suse.openqa.job.restart'};
    is $event->{id}, $job_id, 'event contains original job ID';
    is $event->{auto}, 1, 'event marked as auto restart';
    # the payload captured from `emit` is expected to be [user ID, connection, type, data]
    my ($user_id, $connection, $type, $data) = @{$event_body[1]};
    is $user_id, undef, 'user_id is undef for Minion restart';
    is $connection, undef, 'connection is undef for Minion restart';
    is $type, 'openqa_job_restart', 'type matches event type';
    is_deeply $data, $event, 'published event equals emitted event';
    ok exists $data->{result}->{$job_id}, 'old job id is in result';
    $job->discard_changes;
    is $job->clone_id, $data->{result}->{$job_id}, 'clone_id points to reported id';
};

subtest '"race" between status updates and stale job detection' => sub {
my $job = $jobs->create({TEST => 'test-job'});
is_deeply $job->update_status({}), {result => 0}, 'status update rejected for scheduled job';
Expand Down