Skip to content

Commit c3e0bef

Browse files
perlpunkd3flex
authored andcommitted
Enable gru tasks to emit AMQP messages
Start Mojo::IOLoop and let it end when the AMQP plugin has finished (or failed) publishing the event
1 parent b060060 commit c3e0bef

File tree

2 files changed

+34
-5
lines changed

2 files changed

+34
-5
lines changed

lib/OpenQA/Task/Job/Restart.pm

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@ sub _restart_job ($minion_job, @args) {
4040
my $openqa_job = $app->schema->resultset('Jobs')->find($openqa_job_id);
4141
return $minion_job->finish("Job $openqa_job_id does not exist.") unless $openqa_job;
4242

43+
_init_amqp_plugin($app);
4344
# duplicate job and finish normally if no error was returned or job can not be cloned
4445
my ($is_ok, $cloned_job_or_error) = restart_openqa_job($minion_job, $openqa_job);
46+
_wait_for_event_publish($app);
47+
4548
return $minion_job->finish(ref $cloned_job_or_error ? undef : $cloned_job_or_error) if $is_ok;
4649

4750
# retry a certain number of times, maybe the transaction failed due to a conflict
@@ -53,4 +56,20 @@ sub _restart_job ($minion_job, @args) {
5356
$minion_job->retry({delay => restart_delay});
5457
}
5558

59+
sub _init_amqp_plugin ($app) {
60+
return undef unless $app->config->{amqp}->{enabled};
61+
$app->plugin('AMQP'); # Needs to be loaded again from forked process
62+
Mojo::IOLoop->singleton->one_tick;
63+
}
64+
65+
sub _wait_for_event_publish ($app) {
66+
return undef unless $app->config->{amqp}->{enabled};
67+
OpenQA::Events->singleton->once(
68+
'amqp_handled',
69+
sub {
70+
Mojo::IOLoop->singleton->stop;
71+
});
72+
Mojo::IOLoop->singleton->start;
73+
}
74+
5675
1;

lib/OpenQA/WebAPI/Plugin/AMQP.pm

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ sub new ($class, @args) {
2828
sub register ($self, $app, @args) {
2929
$self->{app} = $app;
3030
$self->{config} = $app->config;
31+
my $config = $self->{config}->{amqp};
32+
$config->{enabled} = 1; # Needed for reloading the plugin later in the forked process
3133
Mojo::IOLoop->singleton->next_tick(
3234
sub {
3335
# register for events
@@ -68,18 +70,26 @@ sub publish_amqp ($self, $topic, $event_data, $headers = {}, $remaining_attempts
6870

6971
$remaining_attempts //= $config->{publish_attempts};
7072
$retry_delay //= $config->{publish_retry_delay};
71-
$publisher->publish_p($event_data, $headers, routing_key => $topic)->then(sub { log_debug "$topic published" })
72-
->catch(
73+
$publisher->publish_p($event_data, $headers, routing_key => $topic)->then(
74+
sub {
75+
log_debug "$topic published";
76+
OpenQA::Events->singleton->emit('amqp_handled');
77+
}
78+
)->catch(
7379
sub ($error) {
7480
my $left = looks_like_number $remaining_attempts && $remaining_attempts > 1 ? $remaining_attempts - 1 : 0;
7581
my $delay = $retry_delay * $config->{publish_retry_delay_factor};
7682
my ($event_id, $job_id) = ($event_data->{id} // 'none', $event_data->{job_id});
7783
my $additional_info = $job_id ? ", job ID: $job_id" : '';
7884
my $log_msg = "Publishing $topic failed: $error (event ID: $event_id$additional_info, $left attempts left)";
79-
return log_error $log_msg unless $left;
8085
my $retry_function = sub ($loop) { $self->publish_amqp($topic, $event_data, $headers, $left, $delay) };
81-
log_info $log_msg;
82-
Mojo::IOLoop->timer($retry_delay => $retry_function) if $left;
86+
if ($left) {
87+
log_info $log_msg;
88+
Mojo::IOLoop->timer($retry_delay => $retry_function);
89+
return;
90+
}
91+
OpenQA::Events->singleton->emit('amqp_handled');
92+
return log_error $log_msg;
8393
})->finally(sub { undef $publisher });
8494
}
8595

0 commit comments

Comments
 (0)