Skip to content

Commit 0cb134e

Browse files
authored
fix(circular-dependency): Extract listener config and remove handler resolver from replies listener (#129)
* fix(circular-dependency): Extract listener config and remove handler resolver from replies listener * fix(log): add error log details * fix(test): Fix unit tests and update gradle
1 parent 5c8738c commit 0cb134e

File tree

22 files changed

+133
-98
lines changed

22 files changed

+133
-98
lines changed

async/async-commons/src/main/java/org/reactivecommons/async/commons/HandlerResolverBuilder.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import java.util.Map;
1313
import java.util.concurrent.ConcurrentHashMap;
1414
import java.util.concurrent.ConcurrentMap;
15-
import java.util.logging.Level;
1615
import java.util.stream.Stream;
1716

1817
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
@@ -45,43 +44,51 @@ public static HandlerResolver buildResolver(String domain,
4544
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
4645
ConcurrentHashMap::putAll);
4746

48-
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain, registries);
47+
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain,
48+
registries);
4949

50-
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers = getEventHandlersWithDynamics(domain, registries);
50+
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers =
51+
getEventHandlersWithDynamics(domain, registries);
5152

52-
return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, commandHandlers) {
53+
return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener,
54+
commandHandlers) {
5355
@Override
5456
@SuppressWarnings("unchecked")
5557
public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
5658
final RegisteredCommandHandler<T, D> handler = super.getCommandHandler(path);
57-
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
59+
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler,
60+
Object.class);
5861
}
5962
};
6063
}
6164

6265

6366
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventsToBind = getEventsToBind(domain, registries);
64-
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers = getEventHandlersWithDynamics(domain, registries);
67+
final ConcurrentMap<String, RegisteredEventListener<?, ?>> eventHandlers =
68+
getEventHandlersWithDynamics(domain, registries);
6569

66-
return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(), new ConcurrentHashMap<>()) {
70+
return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(),
71+
new ConcurrentHashMap<>()) {
6772
@Override
6873
@SuppressWarnings("unchecked")
6974
public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
7075
final RegisteredCommandHandler<T, D> handler = super.getCommandHandler(path);
71-
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
76+
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler,
77+
Object.class);
7278
}
7379
};
7480
}
7581

76-
private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventHandlersWithDynamics(String domain, Map<String, HandlerRegistry> registries) {
82+
private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventHandlersWithDynamics(String domain,
83+
Map<String,
84+
HandlerRegistry> registries) {
7785
// event handlers and dynamic handlers
7886
return registries
7987
.values().stream()
8088
.flatMap(r -> {
8189
if (r.getDomainEventListeners().containsKey(domain)) {
8290
return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r));
8391
}
84-
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
8592
return Stream.empty();
8693
})
8794
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
@@ -95,14 +102,14 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
95102
return Stream.of();
96103
}
97104

98-
private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventsToBind(String domain, Map<String, HandlerRegistry> registries) {
105+
private static ConcurrentMap<String, RegisteredEventListener<?, ?>> getEventsToBind(String domain, Map<String,
106+
HandlerRegistry> registries) {
99107
return registries
100108
.values().stream()
101109
.flatMap(r -> {
102110
if (r.getDomainEventListeners().containsKey(domain)) {
103111
return r.getDomainEventListeners().get(domain).stream();
104112
}
105-
log.log(Level.WARNING, "Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
106113
return Stream.empty();
107114
})
108115
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),

async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/LoggerSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class LoggerSubscriber<T> extends BaseSubscriber<T> {
1414

1515
private final String flowName;
1616
private static final String ON_COMPLETE_MSG = "%s: ##On Complete Hook!!";
17-
private static final String ON_ERROR_MSG = "%s: ##On Error Hook!!";
17+
private static final String ON_ERROR_MSG = "%s: ##On Error Hook!! %s";
1818
private static final String ON_CANCEL_MSG = "%s: ##On Cancel Hook!!";
1919
private static final String ON_FINALLY_MSG = "%s: ##On Finally Hook! Signal type: %s";
2020

@@ -29,7 +29,7 @@ protected void hookOnComplete() {
2929

3030
@Override
3131
protected void hookOnError(Throwable throwable) {
32-
log.log(Level.SEVERE, format(ON_ERROR_MSG), throwable);
32+
log.log(Level.SEVERE, format(ON_ERROR_MSG, throwable.getMessage()), throwable);
3333
}
3434

3535
@Override

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ plugins {
1515
id 'org.sonarqube' version '6.0.1.5171'
1616
id 'org.springframework.boot' version '3.4.1' apply false
1717
id 'io.github.gradle-nexus.publish-plugin' version '2.0.0'
18-
id 'co.com.bancolombia.cleanArchitecture' version '3.20.7'
18+
id 'co.com.bancolombia.cleanArchitecture' version '3.20.8'
1919
}
2020

2121
repositories {

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
44
networkTimeout=10000
55
validateDistributionUrl=true
66
zipStoreBase=GRADLE_USER_HOME

main.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,5 +176,5 @@ tasks.register('generateMergedReport', JacocoReport) {
176176
}
177177

178178
tasks.named('wrapper') {
179-
gradleVersion = '8.11'
179+
gradleVersion = '8.11.1'
180180
}

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/broker/BrokerProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public interface BrokerProvider<T extends GenericAsyncProps> {
1313

1414
DomainEventBus getDomainBus();
1515

16-
DirectAsyncGateway getDirectAsyncGateway(HandlerResolver resolver);
16+
DirectAsyncGateway getDirectAsyncGateway();
1717

1818
void listenDomainEvents(HandlerResolver resolver);
1919

@@ -23,7 +23,7 @@ public interface BrokerProvider<T extends GenericAsyncProps> {
2323

2424
void listenQueries(HandlerResolver resolver);
2525

26-
void listenReplies(HandlerResolver resolver);
26+
void listenReplies();
2727

2828
Mono<RCHealth> healthCheck();
2929
}

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/config/ReactiveCommonsConfig.java

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
11
package org.reactivecommons.async.starter.config;
22

3+
import com.fasterxml.jackson.databind.ObjectMapper;
34
import io.micrometer.core.instrument.MeterRegistry;
45
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
56
import lombok.RequiredArgsConstructor;
67
import lombok.extern.java.Log;
7-
import org.reactivecommons.async.api.DefaultCommandHandler;
8-
import org.reactivecommons.async.api.DefaultQueryHandler;
9-
import org.reactivecommons.async.api.HandlerRegistry;
10-
import org.reactivecommons.async.commons.HandlerResolver;
11-
import org.reactivecommons.async.commons.HandlerResolverBuilder;
128
import org.reactivecommons.async.commons.config.BrokerConfig;
139
import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier;
1410
import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier;
15-
import org.reactivecommons.async.commons.ext.CustomReporter;
16-
import org.reactivecommons.async.commons.ext.DefaultCustomReporter;
1711
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
1812
import org.reactivecommons.async.starter.broker.BrokerProvider;
1913
import org.reactivecommons.async.starter.broker.BrokerProviderFactory;
@@ -27,7 +21,6 @@
2721
import org.springframework.context.annotation.ComponentScan;
2822
import org.springframework.context.annotation.Configuration;
2923
import org.springframework.context.annotation.Import;
30-
import reactor.core.publisher.Mono;
3124

3225
import java.util.Map;
3326

@@ -66,24 +59,6 @@ public ConnectionManager buildConnectionManager(ApplicationContext context) {
6659
return connectionManager;
6760
}
6861

69-
@Bean
70-
@SuppressWarnings({"rawtypes", "unchecked"})
71-
public DomainHandlers buildHandlers(ApplicationContext context,
72-
HandlerRegistry primaryRegistry, DefaultCommandHandler<?> commandHandler) {
73-
DomainHandlers handlers = new DomainHandlers();
74-
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
75-
if (!registries.containsValue(primaryRegistry)) {
76-
registries.put("primaryHandlerRegistry", primaryRegistry);
77-
}
78-
final Map<String, GenericAsyncPropsDomain> props = context.getBeansOfType(GenericAsyncPropsDomain.class);
79-
props.forEach((beanName, properties) -> properties.forEach((domain, asyncProps) -> {
80-
String domainName = (String) domain;
81-
HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domainName, registries, commandHandler);
82-
handlers.add(domainName, resolver);
83-
}));
84-
return handlers;
85-
}
86-
8762
@Bean
8863
@ConditionalOnMissingBean
8964
public BrokerConfig brokerConfig() {
@@ -98,29 +73,8 @@ public ObjectMapperSupplier objectMapperSupplier() {
9873

9974
@Bean
10075
@ConditionalOnMissingBean
101-
public CustomReporter reactiveCommonsCustomErrorReporter() {
102-
return new DefaultCustomReporter();
103-
}
104-
105-
@Bean
106-
@ConditionalOnMissingBean
107-
@SuppressWarnings("rawtypes")
108-
public DefaultQueryHandler defaultHandler() {
109-
return (DefaultQueryHandler<Object, Object>) command ->
110-
Mono.error(new RuntimeException("No Handler Registered"));
111-
}
112-
113-
@Bean
114-
@ConditionalOnMissingBean
115-
@SuppressWarnings("rawtypes")
116-
public DefaultCommandHandler defaultCommandHandler() {
117-
return message -> Mono.error(new RuntimeException("No Handler Registered"));
118-
}
119-
120-
@Bean
121-
@ConditionalOnMissingBean
122-
public HandlerRegistry defaultHandlerRegistry() {
123-
return HandlerRegistry.register();
76+
public ObjectMapper defaultReactiveCommonsObjectMapper(ObjectMapperSupplier supplier) {
77+
return supplier.get();
12478
}
12579

12680
@Bean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.reactivecommons.async.starter.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.java.Log;
5+
import org.reactivecommons.async.api.DefaultCommandHandler;
6+
import org.reactivecommons.async.api.DefaultQueryHandler;
7+
import org.reactivecommons.async.api.HandlerRegistry;
8+
import org.reactivecommons.async.commons.HandlerResolver;
9+
import org.reactivecommons.async.commons.HandlerResolverBuilder;
10+
import org.reactivecommons.async.commons.ext.CustomReporter;
11+
import org.reactivecommons.async.commons.ext.DefaultCustomReporter;
12+
import org.reactivecommons.async.starter.props.GenericAsyncPropsDomain;
13+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
14+
import org.springframework.context.ApplicationContext;
15+
import org.springframework.context.annotation.Bean;
16+
import org.springframework.context.annotation.Configuration;
17+
import reactor.core.publisher.Mono;
18+
19+
import java.util.Map;
20+
21+
@Log
22+
@Configuration
23+
@RequiredArgsConstructor
24+
public class ReactiveCommonsListenersConfig {
25+
26+
@Bean
27+
@SuppressWarnings({"rawtypes", "unchecked"})
28+
public DomainHandlers buildHandlers(ApplicationContext context,
29+
HandlerRegistry primaryRegistry, DefaultCommandHandler<?> commandHandler) {
30+
DomainHandlers handlers = new DomainHandlers();
31+
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
32+
if (!registries.containsValue(primaryRegistry)) {
33+
registries.put("primaryHandlerRegistry", primaryRegistry);
34+
}
35+
final Map<String, GenericAsyncPropsDomain> props = context.getBeansOfType(GenericAsyncPropsDomain.class);
36+
props.forEach((beanName, properties) -> properties.forEach((domain, asyncProps) -> {
37+
String domainName = (String) domain;
38+
HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domainName, registries, commandHandler);
39+
handlers.add(domainName, resolver);
40+
}));
41+
return handlers;
42+
}
43+
44+
@Bean
45+
@ConditionalOnMissingBean
46+
public CustomReporter reactiveCommonsCustomErrorReporter() {
47+
return new DefaultCustomReporter();
48+
}
49+
50+
@Bean
51+
@ConditionalOnMissingBean
52+
@SuppressWarnings("rawtypes")
53+
public DefaultQueryHandler defaultHandler() {
54+
return (DefaultQueryHandler<Object, Object>) command ->
55+
Mono.error(new RuntimeException("No Handler Registered"));
56+
}
57+
58+
@Bean
59+
@ConditionalOnMissingBean
60+
@SuppressWarnings("rawtypes")
61+
public DefaultCommandHandler defaultCommandHandler() {
62+
return message -> Mono.error(new RuntimeException("No Handler Registered"));
63+
}
64+
65+
@Bean
66+
@ConditionalOnMissingBean
67+
public HandlerRegistry defaultHandlerRegistry() {
68+
return HandlerRegistry.register();
69+
}
70+
}

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/CommandsListenerConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22

33

44
import org.reactivecommons.async.commons.HandlerResolver;
5-
import org.reactivecommons.async.starter.config.ConnectionManager;
65
import org.reactivecommons.async.starter.broker.BrokerProvider;
6+
import org.reactivecommons.async.starter.config.ConnectionManager;
77
import org.reactivecommons.async.starter.config.DomainHandlers;
88
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
9+
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
910
import org.springframework.context.annotation.Configuration;
1011
import org.springframework.context.annotation.Import;
1112

1213
@Configuration
13-
@Import(ReactiveCommonsConfig.class)
14+
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
1415
public class CommandsListenerConfig extends AbstractListenerConfig {
1516

1617
public CommandsListenerConfig(ConnectionManager manager, DomainHandlers handlers) {

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/EventsListenerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
import org.reactivecommons.async.starter.broker.BrokerProvider;
77
import org.reactivecommons.async.starter.config.DomainHandlers;
88
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
9+
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
910
import org.springframework.context.annotation.Configuration;
1011
import org.springframework.context.annotation.Import;
1112

1213
@Configuration
13-
@Import(ReactiveCommonsConfig.class)
14+
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
1415
public class EventsListenerConfig extends AbstractListenerConfig {
1516

1617
public EventsListenerConfig(ConnectionManager manager, DomainHandlers handlers) {

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/NotificationEventsListenerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
import org.reactivecommons.async.starter.broker.BrokerProvider;
77
import org.reactivecommons.async.starter.config.DomainHandlers;
88
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
9+
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
910
import org.springframework.context.annotation.Configuration;
1011
import org.springframework.context.annotation.Import;
1112

1213
@Configuration
13-
@Import(ReactiveCommonsConfig.class)
14+
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
1415
public class NotificationEventsListenerConfig extends AbstractListenerConfig {
1516

1617
public NotificationEventsListenerConfig(ConnectionManager manager, DomainHandlers handlers) {

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/listeners/QueriesListenerConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22

33

44
import org.reactivecommons.async.commons.HandlerResolver;
5-
import org.reactivecommons.async.starter.config.ConnectionManager;
65
import org.reactivecommons.async.starter.broker.BrokerProvider;
6+
import org.reactivecommons.async.starter.config.ConnectionManager;
77
import org.reactivecommons.async.starter.config.DomainHandlers;
88
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
9+
import org.reactivecommons.async.starter.config.ReactiveCommonsListenersConfig;
910
import org.springframework.context.annotation.Configuration;
1011
import org.springframework.context.annotation.Import;
1112

1213
@Configuration
13-
@Import(ReactiveCommonsConfig.class)
14+
@Import({ReactiveCommonsConfig.class, ReactiveCommonsListenersConfig.class})
1415
public class QueriesListenerConfig extends AbstractListenerConfig {
1516

1617
public QueriesListenerConfig(ConnectionManager manager, DomainHandlers handlers) {

starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/senders/DirectAsyncGatewayConfig.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import lombok.extern.java.Log;
55
import org.reactivecommons.async.api.DirectAsyncGateway;
66
import org.reactivecommons.async.starter.config.ConnectionManager;
7-
import org.reactivecommons.async.starter.config.DomainHandlers;
87
import org.reactivecommons.async.starter.config.ReactiveCommonsConfig;
98
import org.springframework.context.annotation.Bean;
109
import org.springframework.context.annotation.Configuration;
@@ -20,10 +19,10 @@
2019
public class DirectAsyncGatewayConfig {
2120

2221
@Bean
23-
public DirectAsyncGateway genericDirectAsyncGateway(ConnectionManager manager, DomainHandlers handlers) {
22+
public DirectAsyncGateway genericDirectAsyncGateway(ConnectionManager manager) {
2423
ConcurrentMap<String, DirectAsyncGateway> directAsyncGateways = new ConcurrentHashMap<>();
2524
manager.forDomain((domain, provider) -> directAsyncGateways.put(domain,
26-
provider.getDirectAsyncGateway(handlers.get(domain))));
25+
provider.getDirectAsyncGateway()));
2726
return new GenericDirectAsyncGateway(directAsyncGateways);
2827
}
2928
}

0 commit comments

Comments
 (0)