Skip to content

Commit 8fceb15

Browse files
committed
feat(rawhandler): Allow raw message handler to listening events
1 parent 454e18d commit 8fceb15

File tree

16 files changed

+99
-13
lines changed

16 files changed

+99
-13
lines changed

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import lombok.AccessLevel;
55
import lombok.Getter;
66
import lombok.NoArgsConstructor;
7+
import org.reactivecommons.api.domain.RawMessage;
78
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
89
import org.reactivecommons.async.api.handlers.CloudEventHandler;
910
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
1011
import org.reactivecommons.async.api.handlers.DomainEventHandler;
1112
import org.reactivecommons.async.api.handlers.QueryHandler;
1213
import org.reactivecommons.async.api.handlers.QueryHandlerDelegate;
14+
import org.reactivecommons.async.api.handlers.RawEventHandler;
1315
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
1416
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
1517
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
@@ -37,7 +39,8 @@ public static HandlerRegistry register() {
3739
return instance;
3840
}
3941

40-
public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, DomainEventHandler<T> handler, Class<T> eventClass) {
42+
public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, DomainEventHandler<T> handler,
43+
Class<T> eventClass) {
4144
domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>())
4245
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
4346
return this;
@@ -49,6 +52,12 @@ public HandlerRegistry listenDomainCloudEvent(String domain, String eventName, C
4952
return this;
5053
}
5154

55+
public HandlerRegistry listenDomainRawEvent(String domain, String eventName, RawEventHandler handler) {
56+
domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>())
57+
.add(new RegisteredEventListener<>(eventName, handler, RawMessage.class));
58+
return this;
59+
}
60+
5261
public <T> HandlerRegistry listenEvent(String eventName, DomainEventHandler<T> handler, Class<T> eventClass) {
5362
domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>())
5463
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
@@ -61,7 +70,8 @@ public HandlerRegistry listenCloudEvent(String eventName, CloudEventHandler hand
6170
return this;
6271
}
6372

64-
public <T> HandlerRegistry listenNotificationEvent(String eventName, DomainEventHandler<T> handler, Class<T> eventClass) {
73+
public <T> HandlerRegistry listenNotificationEvent(String eventName, DomainEventHandler<T> handler,
74+
Class<T> eventClass) {
6575
eventNotificationListener.add(new RegisteredEventListener<>(eventName, handler, eventClass));
6676
return this;
6777
}
@@ -71,7 +81,8 @@ public HandlerRegistry listenNotificationCloudEvent(String eventName, CloudEvent
7181
return this;
7282
}
7383

74-
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, DomainEventHandler<T> handler, Class<T> eventClass) {
84+
public <T> HandlerRegistry handleDynamicEvents(String eventNamePattern, DomainEventHandler<T> handler,
85+
Class<T> eventClass) {
7586
dynamicEventHandlers.add(new RegisteredEventListener<>(eventNamePattern, handler, eventClass));
7687
return this;
7788
}
@@ -102,7 +113,8 @@ public <R> HandlerRegistry serveQuery(String resource, QueryHandlerDelegate<Void
102113
}
103114

104115
public <R> HandlerRegistry serveCloudEventQuery(String resource, QueryHandler<R, CloudEvent> handler) {
105-
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), CloudEvent.class));
116+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message),
117+
CloudEvent.class));
106118
return this;
107119
}
108120

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package org.reactivecommons.async.api.handlers;
2+
3+
import org.reactivecommons.api.domain.RawMessage;
4+
5+
public interface RawEventHandler<T extends RawMessage> extends EventHandler<T> {
6+
}

async/async-commons/src/main/java/org/reactivecommons/async/commons/communications/Message.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package org.reactivecommons.async.commons.communications;
22

3+
import org.reactivecommons.api.domain.RawMessage;
4+
35
import java.util.Map;
46

57
/**
68
* Simple Internal Message representation
79
*
810
* @author Daniel Bustamante Ospina
911
*/
10-
public interface Message {
12+
public interface Message extends RawMessage {
1113

1214
byte[] getBody();
1315

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaDomainEventBus.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import lombok.AllArgsConstructor;
55
import org.reactivecommons.api.domain.DomainEvent;
66
import org.reactivecommons.api.domain.DomainEventBus;
7+
import org.reactivecommons.api.domain.RawMessage;
78
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
89
import org.reactivestreams.Publisher;
910

@@ -31,4 +32,14 @@ public Publisher<Void> emit(CloudEvent event) {
3132
public Publisher<Void> emit(String domain, CloudEvent event) {
3233
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
3334
}
35+
36+
@Override
37+
public Publisher<Void> emit(RawMessage event) {
38+
return sender.send(event);
39+
}
40+
41+
@Override
42+
public Publisher<Void> emit(String domain, RawMessage event) {
43+
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
44+
}
3445
}

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/KafkaMessage.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
public class KafkaMessage implements Message {
1616
private final byte[] body;
1717
private final Properties properties;
18+
private final String type;
1819

1920
@Data
2021
public static class KafkaMessageProperties implements Properties {
@@ -30,7 +31,11 @@ public String getContentType() {
3031
}
3132

3233
public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> record) {
33-
return new KafkaMessage(record.value(), createMessageProps(record));
34+
return fromDelivery(record, null);
35+
}
36+
37+
public static KafkaMessage fromDelivery(ReceiverRecord<String, byte[]> record, String type) {
38+
return new KafkaMessage(record.value(), createMessageProps(record), type);
3439
}
3540

3641
private static Properties createMessageProps(ReceiverRecord<String, byte[]> record) {

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/converters/json/KafkaJacksonMessageConverter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public KafkaJacksonMessageConverter(ObjectMapper objectMapper) {
2222

2323
@Override
2424
public Message toMessage(Object object) {
25+
if (object instanceof KafkaMessage) {
26+
return (KafkaMessage) object;
27+
}
2528
byte[] bytes;
2629
try {
2730
String jsonString = this.objectMapper.writeValueAsString(object);
@@ -30,7 +33,7 @@ public Message toMessage(Object object) {
3033
throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e);
3134
}
3235
KafkaMessageProperties props = buildProperties(object);
33-
return new KafkaMessage(bytes, props);
36+
return new KafkaMessage(bytes, props, null);
3437
}
3538

3639
private KafkaMessageProperties buildProperties(Object message) {

async/async-kafka/src/main/java/org/reactivecommons/async/kafka/listeners/GenericMessageListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ protected Mono<ReceiverRecord<String, byte[]>> handle(ReceiverRecord<String, byt
112112
try {
113113
final String executorPath = getExecutorPath(msj);
114114
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
115-
final Message message = KafkaMessage.fromDelivery(msj);
115+
final Message message = KafkaMessage.fromDelivery(msj, executorPath);
116116

117117
Mono<Object> flow = Mono.defer(() -> handler.apply(message))
118118
.transform(enrichPostProcess(message));

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.cloudevents.CloudEvent;
44
import org.reactivecommons.api.domain.DomainEvent;
55
import org.reactivecommons.api.domain.DomainEventBus;
6+
import org.reactivecommons.api.domain.RawMessage;
67
import org.reactivecommons.async.commons.config.BrokerConfig;
78
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
89
import org.reactivestreams.Publisher;
@@ -49,4 +50,15 @@ public Publisher<Void> emit(String domain, CloudEvent event) {
4950
throw new UnsupportedOperationException("Not implemented yet");
5051
}
5152

53+
@Override
54+
public Publisher<Void> emit(RawMessage rawEvent) {
55+
return sender.sendWithConfirm(rawEvent, exchange, rawEvent.getType(),
56+
Collections.emptyMap(), persistentEvents)
57+
.onErrorMap(err -> new RuntimeException("Event send failure: " + rawEvent.getType(), err));
58+
}
59+
60+
@Override
61+
public Publisher<Void> emit(String domain, RawMessage event) {
62+
throw new UnsupportedOperationException("Not implemented yet");
63+
}
5264
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitMessage.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
public class RabbitMessage implements Message {
1212
private final byte[] body;
1313
private final Properties properties;
14+
private final String type;
1415

1516
@Data
1617
public static class RabbitMessageProperties implements Properties {
@@ -22,7 +23,11 @@ public static class RabbitMessageProperties implements Properties {
2223
}
2324

2425
public static RabbitMessage fromDelivery(Delivery delivery) {
25-
return new RabbitMessage(delivery.getBody(), createMessageProps(delivery));
26+
return fromDelivery(delivery, null);
27+
}
28+
29+
public static RabbitMessage fromDelivery(Delivery delivery, String executorPath) {
30+
return new RabbitMessage(delivery.getBody(), createMessageProps(delivery), executorPath);
2631
}
2732

2833
private static Message.Properties createMessageProps(Delivery msj) {

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/RabbitJacksonMessageConverter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ public RabbitJacksonMessageConverter(ObjectMapper objectMapper) {
1818

1919
@Override
2020
public Message toMessage(Object object) {
21+
if (object instanceof RabbitMessage) {
22+
return (RabbitMessage) object;
23+
}
2124
byte[] bytes;
2225
try {
2326
String jsonString = this.objectMapper.writeValueAsString(object);
@@ -29,10 +32,10 @@ public Message toMessage(Object object) {
2932
if (object instanceof CloudEvent) {
3033
props.setContentType(APPLICATION_CLOUD_EVENT_JSON);
3134
} else {
32-
props.setContentType(CONTENT_TYPE);
35+
props.setContentType(APPLICATION_JSON);
3336
}
3437
props.setContentEncoding(StandardCharsets.UTF_8.name());
3538
props.setContentLength(bytes.length);
36-
return new RabbitMessage(bytes, props);
39+
return new RabbitMessage(bytes, props, null);
3740
}
3841
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationEventListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import lombok.extern.java.Log;
55
import org.reactivecommons.async.api.handlers.CloudEventHandler;
66
import org.reactivecommons.async.api.handlers.DomainEventHandler;
7+
import org.reactivecommons.async.api.handlers.RawEventHandler;
78
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
89
import org.reactivecommons.async.commons.DiscardNotifier;
910
import org.reactivecommons.async.commons.EventExecutor;
@@ -109,6 +110,9 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListene
109110
if (registeredEventListener.getHandler() instanceof CloudEventHandler) {
110111
return messageConverter::readCloudEvent;
111112
}
113+
if (registeredEventListener.getHandler() instanceof RawEventHandler) {
114+
return message -> message;
115+
}
112116
throw new RuntimeException("Unknown handler type");
113117
}
114118
}

async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/GenericMessageListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
116116
try {
117117
final String executorPath = getExecutorPath(msj);
118118
final Function<Message, Mono<Object>> handler = getExecutor(executorPath);
119-
final Message message = RabbitMessage.fromDelivery(msj);
119+
final Message message = RabbitMessage.fromDelivery(msj, executorPath);
120120

121121
Mono<Object> flow = defer(() -> handler.apply(message))
122122
.transform(enrichPostProcess(message));

async/async-rabbit/src/test/java/org/reactivecommons/async/helpers/TestStubs.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ public static Message mockMessage() {
1515
properties.getHeaders().put(CORRELATION_ID, "correlation");
1616
properties.getHeaders().put(SERVED_QUERY_ID, "my-query");
1717
return new RabbitMessage("{\"id\":\"id\",\"name\":\"name\",\"date\":\"2020-10-22T17:03:26.062Z\"}".getBytes()
18-
, properties);
18+
, properties, null);
1919
}
2020
}

domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ public interface DomainEventBus {
99

1010
Publisher<Void> emit(CloudEvent event);
1111
Publisher<Void> emit(String domain, CloudEvent event);
12+
13+
Publisher<Void> emit(RawMessage event);
14+
Publisher<Void> emit(String domain, RawMessage event);
1215
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.reactivecommons.api.domain;
2+
3+
public interface RawMessage {
4+
String getType();
5+
}

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/GenericDomainEventBus.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import lombok.RequiredArgsConstructor;
55
import org.reactivecommons.api.domain.DomainEvent;
66
import org.reactivecommons.api.domain.DomainEventBus;
7+
import org.reactivecommons.api.domain.RawMessage;
78
import org.reactivecommons.async.starter.exceptions.InvalidConfigurationException;
89
import org.reactivestreams.Publisher;
910
import reactor.core.publisher.Mono;
@@ -44,4 +45,18 @@ public Publisher<Void> emit(String domain, CloudEvent event) {
4445
}
4546
return domainEventBus.emit(event);
4647
}
48+
49+
@Override
50+
public Publisher<Void> emit(RawMessage event) {
51+
return emit(DEFAULT_DOMAIN, event);
52+
}
53+
54+
@Override
55+
public Publisher<Void> emit(String domain, RawMessage event) {
56+
DomainEventBus domainEventBus = domainEventBuses.get(domain);
57+
if (domainEventBus == null) {
58+
return Mono.error(() -> new InvalidConfigurationException("Domain not found: " + domain));
59+
}
60+
return domainEventBus.emit(event);
61+
}
4762
}

0 commit comments

Comments
 (0)