Skip to content

Commit fb13033

Browse files
authored
chore(next): Fix kafka starter (#121)
1 parent e3803d9 commit fb13033

File tree

5 files changed

+36
-20
lines changed

5 files changed

+36
-20
lines changed

starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableEventListeners.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.reactivecommons.async.kafka.config.RCKafkaConfig;
44
import org.reactivecommons.async.kafka.config.RCKafkaEventListenerConfig;
5+
import org.reactivecommons.async.kafka.config.RCKafkaHandlersConfiguration;
56
import org.springframework.context.annotation.Configuration;
67
import org.springframework.context.annotation.Import;
78

@@ -14,7 +15,7 @@
1415
@Retention(RetentionPolicy.RUNTIME)
1516
@Target({ElementType.TYPE})
1617
@Documented
17-
@Import({RCKafkaEventListenerConfig.class, RCKafkaConfig.class})
18+
@Import({RCKafkaEventListenerConfig.class, RCKafkaHandlersConfiguration.class, RCKafkaConfig.class})
1819
@Configuration
1920
public @interface EnableEventListeners {
2021
}

starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/annotations/EnableNotificationListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.reactivecommons.async.kafka.annotations;
22

33
import org.reactivecommons.async.kafka.config.RCKafkaConfig;
4+
import org.reactivecommons.async.kafka.config.RCKafkaHandlersConfiguration;
45
import org.reactivecommons.async.kafka.config.RCKafkaNotificationEventListenerConfig;
56
import org.springframework.context.annotation.Configuration;
67
import org.springframework.context.annotation.Import;
@@ -15,7 +16,7 @@
1516
@Retention(RetentionPolicy.RUNTIME)
1617
@Target({ElementType.TYPE})
1718
@Documented
18-
@Import({RCKafkaNotificationEventListenerConfig.class, RCKafkaConfig.class})
19+
@Import({RCKafkaNotificationEventListenerConfig.class, RCKafkaHandlersConfiguration.class, RCKafkaConfig.class})
1920
@Configuration
2021
public @interface EnableNotificationListener {
2122
}

starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaConfig.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,22 +70,6 @@ public ConnectionManager kafkaConnectionManager(AsyncKafkaPropsDomain props,
7070
return connectionManager;
7171
}
7272

73-
@Bean
74-
public DomainHandlers buildHandlers(AsyncKafkaPropsDomain props, ApplicationContext context,
75-
HandlerRegistry primaryRegistry, DefaultCommandHandler<?> commandHandler) {
76-
DomainHandlers handlers = new DomainHandlers();
77-
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
78-
if (!registries.containsValue(primaryRegistry)) {
79-
registries.put("primaryHandlerRegistry", primaryRegistry);
80-
}
81-
props.forEach((domain, properties) -> {
82-
HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domain, registries, commandHandler);
83-
handlers.add(domain, resolver);
84-
});
85-
return handlers;
86-
}
87-
88-
8973
// Sender
9074
@Bean
9175
@ConditionalOnMissingBean(DomainEventBus.class)

starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/RCKafkaEventListenerConfig.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.reactivecommons.async.kafka.config;
22

3-
import lombok.RequiredArgsConstructor;
43
import org.reactivecommons.async.commons.converters.MessageConverter;
54
import org.reactivecommons.async.commons.ext.CustomReporter;
65
import org.reactivecommons.async.kafka.config.props.AsyncKafkaProps;
@@ -14,7 +13,6 @@
1413
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1514

1615
@Configuration
17-
@RequiredArgsConstructor
1816
public class RCKafkaEventListenerConfig {
1917

2018
@Bean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.reactivecommons.async.kafka.config;
2+
3+
4+
import org.reactivecommons.async.api.DefaultCommandHandler;
5+
import org.reactivecommons.async.api.HandlerRegistry;
6+
import org.reactivecommons.async.commons.HandlerResolver;
7+
import org.reactivecommons.async.commons.HandlerResolverBuilder;
8+
import org.reactivecommons.async.kafka.config.props.AsyncKafkaPropsDomain;
9+
import org.springframework.context.ApplicationContext;
10+
import org.springframework.context.annotation.Bean;
11+
import org.springframework.context.annotation.Configuration;
12+
13+
import java.util.Map;
14+
15+
@Configuration
16+
public class RCKafkaHandlersConfiguration {
17+
18+
@Bean
19+
public DomainHandlers buildHandlers(AsyncKafkaPropsDomain props, ApplicationContext context,
20+
HandlerRegistry primaryRegistry, DefaultCommandHandler<?> commandHandler) {
21+
DomainHandlers handlers = new DomainHandlers();
22+
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);
23+
if (!registries.containsValue(primaryRegistry)) {
24+
registries.put("primaryHandlerRegistry", primaryRegistry);
25+
}
26+
props.forEach((domain, properties) -> {
27+
HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domain, registries, commandHandler);
28+
handlers.add(domain, resolver);
29+
});
30+
return handlers;
31+
}
32+
}

0 commit comments

Comments
 (0)