Skip to content

Commit d8d2f19

Browse files
Yannic92thjaeckle
authored andcommitted
Fix timeout of acknowledgement aggregator
* Before this the ReceiveTimeout did only play a role if only one acknowledgement was requested or non of multiple acknowledgements were deilvered. Because after receiving the first acknowledgement ReceiveTimeout will be canceled and therefore the second acknowledgement could take much longer Signed-off-by: Yannic Klem <[email protected]>
1 parent c11f9d4 commit d8d2f19

File tree

2 files changed

+56
-9
lines changed

2 files changed

+56
-9
lines changed

internal/models/acks/src/main/java/org/eclipse/ditto/internal/models/acks/AcknowledgementAggregatorActor.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,8 @@
4848
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
4949
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
5050

51-
import akka.actor.AbstractActor;
51+
import akka.actor.AbstractActorWithTimers;
5252
import akka.actor.Props;
53-
import akka.actor.ReceiveTimeout;
5453

5554
/**
5655
* Actor which is created for an {@code ThingModifyCommand} containing {@code AcknowledgementRequests} responsible for
@@ -59,7 +58,7 @@
5958
*
6059
* @since 1.1.0
6160
*/
62-
public final class AcknowledgementAggregatorActor extends AbstractActor {
61+
public final class AcknowledgementAggregatorActor extends AbstractActorWithTimers {
6362

6463
private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
6564

@@ -84,7 +83,7 @@ private AcknowledgementAggregatorActor(final EntityId entityId,
8483
);
8584

8685
timeout = getTimeout(requestCommandHeaders, maxTimeout);
87-
getContext().setReceiveTimeout(timeout);
86+
timers().startSingleTimer(Control.WAITING_FOR_ACKS_TIMED_OUT, Control.WAITING_FOR_ACKS_TIMED_OUT, timeout);
8887

8988
final Set<AcknowledgementRequest> acknowledgementRequests = requestCommandHeaders.getAcknowledgementRequests();
9089
ackregator = AcknowledgementAggregator.getInstance(entityId, correlationId, timeout, headerTranslator);
@@ -146,7 +145,7 @@ public Receive createReceive() {
146145
.match(Acknowledgement.class, this::handleAcknowledgement)
147146
.match(Acknowledgements.class, this::handleAcknowledgements)
148147
.match(DittoRuntimeException.class, this::handleDittoRuntimeException)
149-
.match(ReceiveTimeout.class, this::handleReceiveTimeout)
148+
.match(Control.class, Control.WAITING_FOR_ACKS_TIMED_OUT::equals, this::handleReceiveTimeout)
150149
.matchAny(m -> log.warning("Received unexpected message: <{}>", m))
151150
.build();
152151
}
@@ -218,7 +217,7 @@ private static Optional<JsonValue> getPayload(final CommandResponse<?> response)
218217
return result;
219218
}
220219

221-
private void handleReceiveTimeout(final ReceiveTimeout receiveTimeout) {
220+
private void handleReceiveTimeout(final Control receiveTimeout) {
222221
log.withCorrelationId(correlationId).info("Timed out waiting for all requested acknowledgements, " +
223222
"completing Acknowledgements with timeouts...");
224223
completeAcknowledgements(null);
@@ -321,4 +320,8 @@ private static Duration getTimeout(final DittoHeaders headers, final Duration ma
321320
.orElse(maxTimeout);
322321
}
323322

323+
private enum Control {
324+
WAITING_FOR_ACKS_TIMED_OUT;
325+
}
326+
324327
}

internal/models/acks/src/test/java/org/eclipse/ditto/internal/models/acks/AcknowledgementAggregatorActorTest.java

+47-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.assertj.core.api.Assertions.assertThat;
1616

1717
import java.time.Duration;
18+
import java.util.concurrent.TimeUnit;
1819
import java.util.function.Consumer;
1920

2021
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
@@ -23,12 +24,12 @@
2324
import org.eclipse.ditto.base.model.common.ResponseType;
2425
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
2526
import org.eclipse.ditto.base.model.headers.DittoHeaders;
26-
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
27-
import org.eclipse.ditto.things.model.ThingId;
28-
import org.eclipse.ditto.protocol.HeaderTranslator;
2927
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
3028
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
3129
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayCommandTimeoutException;
30+
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
31+
import org.eclipse.ditto.protocol.HeaderTranslator;
32+
import org.eclipse.ditto.things.model.ThingId;
3233
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
3334
import org.eclipse.ditto.things.model.signals.commands.modify.DeleteThing;
3435
import org.eclipse.ditto.things.model.signals.commands.modify.DeleteThingResponse;
@@ -188,6 +189,49 @@ public void keepCommandHeaders() {
188189
.build()
189190
);
190191
assertThat(acks.getSize()).isEqualTo(2);
192+
assertThat(acks.getAcknowledgement(label1).map(Acknowledgement::getHttpStatus)).contains(
193+
HttpStatus.UNAUTHORIZED);
194+
assertThat(acks.getAcknowledgement(label2).map(Acknowledgement::getHttpStatus)).contains(
195+
HttpStatus.PAYMENT_REQUIRED);
196+
}};
197+
}
198+
199+
@Test
200+
public void awaitsOnlyUntilTimeout() throws InterruptedException {
201+
new TestKit(actorSystem) {{
202+
// GIVEN
203+
final Duration timeout = Duration.ofSeconds(5);
204+
final String tag = "tag";
205+
final String correlationId = "awaitsOnlyUntilTimeout";
206+
final ThingId thingId = ThingId.of("thing:id");
207+
final AcknowledgementLabel label1 = AcknowledgementLabel.of("ack1");
208+
final AcknowledgementLabel label2 = AcknowledgementLabel.of("ack2");
209+
final ThingModifyCommand<?> command = DeleteThing.of(thingId, DittoHeaders.newBuilder()
210+
.correlationId(correlationId)
211+
.timeout(timeout)
212+
.acknowledgementRequest(AcknowledgementRequest.of(label1), AcknowledgementRequest.of(label2))
213+
.build());
214+
final ActorRef underTest = childActorOf(getAcknowledgementAggregatorProps(command, this));
215+
216+
// WHEN
217+
final Acknowledgement ack1 = Acknowledgement.of(label1, thingId, HttpStatus.UNAUTHORIZED,
218+
DittoHeaders.newBuilder().correlationId(correlationId).putHeader(tag, label1.toString()).build());
219+
final Acknowledgement ack2 = Acknowledgement.of(label2, thingId, HttpStatus.PAYMENT_REQUIRED,
220+
DittoHeaders.newBuilder().correlationId(correlationId).putHeader(tag, label2.toString()).build());
221+
TimeUnit.SECONDS.sleep(
222+
timeout.toSeconds() / 2 + 1); // Wait more than half the time before sending first ack
223+
underTest.tell(ack1, ActorRef.noSender());
224+
TimeUnit.SECONDS.sleep(timeout.toSeconds() / 2 +
225+
1); // Wait more than half the time before sending second ack. This should not be taken into account.
226+
underTest.tell(ack2, ActorRef.noSender());
227+
228+
// THEN
229+
final Acknowledgements acks = expectMsgClass(Acknowledgements.class);
230+
assertThat(acks.getSize()).isEqualTo(2);
231+
assertThat(acks.getAcknowledgement(label1).map(Acknowledgement::getHttpStatus)).contains(
232+
HttpStatus.UNAUTHORIZED);
233+
assertThat(acks.getAcknowledgement(label2).map(Acknowledgement::getHttpStatus)).contains(
234+
HttpStatus.REQUEST_TIMEOUT);
191235
}};
192236
}
193237

0 commit comments

Comments
 (0)