Skip to content

Commit 7711063

Browse files
committed
Use AMQP outcomes in publisher callback
1 parent c17a126 commit 7711063

File tree

12 files changed

+106
-34
lines changed

12 files changed

+106
-34
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ The following metrics are recorded:
132132
* the number of open consumers
133133
* the total number of published messages
134134
* the total number of accepted published messages
135-
* the total number of failed published messages
135+
* the total number of rejected published messages
136+
* the total number of released published messages
136137
* the total number of consumed messages
137138
* the total number of accepted consumed messages
138139
* the total number of requeued consumed messages
@@ -167,9 +168,12 @@ rabbitmq_amqp_published_total 2.0
167168
# HELP rabbitmq_amqp_published_accepted_total
168169
# TYPE rabbitmq_amqp_published_accepted_total counter
169170
rabbitmq_amqp_published_accepted_total 1.0
170-
# HELP rabbitmq_amqp_published_failed_total
171-
# TYPE rabbitmq_amqp_published_failed_total counter
172-
rabbitmq_amqp_published_failed_total 1.0
171+
# HELP rabbitmq_amqp_published_rejected_total
172+
# TYPE rabbitmq_amqp_published_rejected_total counter
173+
rabbitmq_amqp_published_rejected_total 1.0
174+
# HELP rabbitmq_amqp_published_released_total
175+
# TYPE rabbitmq_amqp_published_released_total counter
176+
rabbitmq_amqp_published_released_total 1.0
173177
# HELP rabbitmq_amqp_publishers
174178
# TYPE rabbitmq_amqp_publishers gauge
175179
rabbitmq_amqp_publishers 1.0

src/main/java/com/rabbitmq/client/amqp/Publisher.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,17 @@ interface Context {
102102
enum Status {
103103
/** The message has been accepted by the broker. */
104104
ACCEPTED,
105-
/** The broker could not handle the message properly. */
106-
FAILED
105+
/**
106+
* At least one queue the message was routed to rejected the message. This happens when the
107+
* queue length is exceeded and the queue's overflow behaviour is set to reject-publish or when
108+
* a target classic queue is unavailable.
109+
*/
110+
REJECTED,
111+
/**
112+
* The broker could not route the message to any queue.
113+
*
114+
* <p>This is likely to be due to a topology misconfiguration.
115+
*/
116+
RELEASED
107117
}
108118
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,21 +113,42 @@ public void publish(Message message, Callback callback) {
113113
try {
114114
// FIXME set a timeout for publishing settlement
115115
tracker.settlementFuture().get();
116-
status =
117-
tracker.remoteState() == DeliveryState.accepted() ? Status.ACCEPTED : Status.FAILED;
116+
status = mapDeliveryState(tracker.remoteState());
118117
} catch (InterruptedException | ExecutionException e) {
119-
status = Status.FAILED;
118+
status = Status.REJECTED;
120119
}
121120
DefaultContext defaultContext = new DefaultContext(message, status);
122-
this.metricsCollector.publishDisposition(
123-
status == Status.ACCEPTED
124-
? MetricsCollector.PublishDisposition.ACCEPTED
125-
: MetricsCollector.PublishDisposition.FAILED);
121+
this.metricsCollector.publishDisposition(mapToPublishDisposition(status));
126122
callback.handle(defaultContext);
127123
});
128124
this.metricsCollector.publish();
129125
}
130126

127+
private Status mapDeliveryState(DeliveryState in) {
128+
if (in.isAccepted()) {
129+
return Status.ACCEPTED;
130+
} else if (in.getType() == DeliveryState.Type.REJECTED) {
131+
return Status.REJECTED;
132+
} else if (in.getType() == DeliveryState.Type.RELEASED) {
133+
return Status.RELEASED;
134+
} else {
135+
LOGGER.warn("Delivery state not supported: " + in.getType());
136+
throw new IllegalStateException("This delivery state is not supported: " + in.getType());
137+
}
138+
}
139+
140+
private static MetricsCollector.PublishDisposition mapToPublishDisposition(Status status) {
141+
if (status == Status.ACCEPTED) {
142+
return MetricsCollector.PublishDisposition.ACCEPTED;
143+
} else if (status == Status.REJECTED) {
144+
return MetricsCollector.PublishDisposition.REJECTED;
145+
} else if (status == Status.RELEASED) {
146+
return MetricsCollector.PublishDisposition.RELEASED;
147+
} else {
148+
return null;
149+
}
150+
}
151+
131152
void recoverAfterConnectionFailure() {
132153
this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
133154
this.sender =

src/main/java/com/rabbitmq/client/amqp/metrics/MetricsCollector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public interface MetricsCollector {
4141

4242
enum PublishDisposition {
4343
ACCEPTED,
44-
FAILED
44+
REJECTED,
45+
RELEASED
4546
}
4647

4748
enum ConsumeDisposition {

src/main/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollector.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class MicrometerMetricsCollector implements MetricsCollector {
2929
private final AtomicLong connections;
3030
private final AtomicLong publishers;
3131
private final AtomicLong consumers;
32-
private final Counter publish, publishAccepted, publishFailed;
32+
private final Counter publish, publishAccepted, publishRejected, publishReleased;
3333
private final Counter consume, consumeAccepted, consumeRequeued, consumeDiscarded;
3434

3535
public MicrometerMetricsCollector(MeterRegistry registry) {
@@ -52,7 +52,8 @@ public MicrometerMetricsCollector(
5252
this.consumers = registry.gauge(prefix + ".consumers", tags, new AtomicLong(0));
5353
this.publish = registry.counter(prefix + ".published", tags);
5454
this.publishAccepted = registry.counter(prefix + ".published_accepted", tags);
55-
this.publishFailed = registry.counter(prefix + ".published_failed", tags);
55+
this.publishRejected = registry.counter(prefix + ".published_rejected", tags);
56+
this.publishReleased = registry.counter(prefix + ".published_released", tags);
5657
this.consume = registry.counter(prefix + ".consumed", tags);
5758
this.consumeAccepted = registry.counter(prefix + ".consumed_accepted", tags);
5859
this.consumeRequeued = registry.counter(prefix + ".consumed_requeued", tags);
@@ -100,8 +101,11 @@ public void publishDisposition(PublishDisposition disposition) {
100101
case ACCEPTED:
101102
this.publishAccepted.increment();
102103
break;
103-
case FAILED:
104-
this.publishFailed.increment();
104+
case REJECTED:
105+
this.publishRejected.increment();
106+
break;
107+
case RELEASED:
108+
this.publishReleased.increment();
105109
break;
106110
default:
107111
break;

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public ClientRejected(String condition, String description, Map<String, Object>
217217

218218
@Override
219219
public Type getType() {
220-
return Type.RELEASED;
220+
return Type.REJECTED;
221221
}
222222

223223
@Override

src/test/java/com/rabbitmq/client/amqp/impl/AddressFormatTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void exchangeKeyInAddress(TestInfo info) {
4747
publisher.publish(
4848
publisher.message(),
4949
ctx -> {
50-
if (ctx.status() == Publisher.Status.FAILED) {
50+
if (ctx.status() == Publisher.Status.RELEASED) {
5151
failedLatch.countDown();
5252
}
5353
});
@@ -88,7 +88,7 @@ void exchangeInAddress(TestInfo info) {
8888
publisher.publish(
8989
publisher.message(),
9090
ctx -> {
91-
if (ctx.status() == Publisher.Status.FAILED) {
91+
if (ctx.status() == Publisher.Status.RELEASED) {
9292
failedLatch.countDown();
9393
}
9494
});
@@ -158,14 +158,14 @@ void exchangeKeyInToField(TestInfo info) {
158158
publisher.publish(
159159
publisher.message().toAddress().exchange(e).message(),
160160
ctx -> {
161-
if (ctx.status() == Publisher.Status.FAILED) {
161+
if (ctx.status() == Publisher.Status.RELEASED) {
162162
failedLatch.countDown();
163163
}
164164
});
165165
publisher.publish(
166166
publisher.message().toAddress().exchange(e).key("foo").message(),
167167
ctx -> {
168-
if (ctx.status() == Publisher.Status.FAILED) {
168+
if (ctx.status() == Publisher.Status.RELEASED) {
169169
failedLatch.countDown();
170170
}
171171
});

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,33 @@ void declareQueueWithUnsupportedArgument() {
667667
});
668668
}
669669

670+
@Test
671+
void publishedMessageShouldBeRejectedWhenQueueLimitIsReached(TestInfo info) {
672+
Management management = connection.management();
673+
String q = TestUtils.name(info);
674+
int maxLength = 10;
675+
try {
676+
management
677+
.queue(q)
678+
.maxLength(maxLength)
679+
.overflowStrategy(Management.OverFlowStrategy.REJECT_PUBLISH)
680+
.declare();
681+
CountDownLatch rejectedLatch = new CountDownLatch(1);
682+
Publisher.Callback callback =
683+
context -> {
684+
if (context.status() == Publisher.Status.REJECTED) {
685+
rejectedLatch.countDown();
686+
}
687+
};
688+
Publisher publisher = connection.publisherBuilder().queue(q).build();
689+
IntStream.range(0, maxLength + 1)
690+
.forEach(ignored -> publisher.publish(publisher.message(), callback));
691+
Assertions.assertThat(rejectedLatch).completes();
692+
} finally {
693+
management.queueDeletion().delete(q);
694+
}
695+
}
696+
670697
private static String uuid() {
671698
return UUID.randomUUID().toString();
672699
}

src/test/java/com/rabbitmq/client/amqp/impl/MetricsCollectorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.*;
21-
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.PublishDisposition.FAILED;
21+
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.PublishDisposition.RELEASED;
2222
import static java.lang.String.format;
2323
import static org.mockito.ArgumentMatchers.any;
2424
import static org.mockito.Mockito.*;
@@ -113,8 +113,8 @@ void metricsShouldBeCollected() {
113113
disposed(disposed));
114114
verify(metricsCollector, times(2)).publish();
115115
Assertions.assertThat(disposed).completes();
116-
// the last message could not be routed, so its disposition state is failed
117-
verify(metricsCollector, times(1)).publishDisposition(FAILED);
116+
// the last message could not be routed, so its disposition state is RELEASED
117+
verify(metricsCollector, times(1)).publishDisposition(RELEASED);
118118
verify(metricsCollector, times(2)).publishDisposition(any());
119119

120120
verify(metricsCollector, never()).consume();

src/test/java/com/rabbitmq/client/amqp/impl/ResourceListenerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
import static com.rabbitmq.client.amqp.Publisher.Status.FAILED;
20+
import static com.rabbitmq.client.amqp.Publisher.Status.RELEASED;
2121
import static org.assertj.core.api.Assertions.anyOf;
2222
import static org.assertj.core.api.Assertions.fail;
2323

@@ -95,7 +95,7 @@ void publisherIsClosedOnExchangeDeletion(boolean toExchange, TestInfo info)
9595
.is(
9696
anyOf(
9797
new Condition<>(s -> outboundMessageStatus.isEmpty(), "no status"),
98-
new Condition<>(s -> outboundMessageStatus.contains(FAILED), "only failed")));
98+
new Condition<>(s -> outboundMessageStatus.contains(RELEASED), "only released")));
9999
// Assertions.assertThat(outboundMessageStatus).containsOnly(Publisher.Status.FAILED);
100100
Assertions.assertThat(closedCause.get()).isNotNull().isInstanceOf(AmqpException.class);
101101
}

0 commit comments

Comments
 (0)