Skip to content

chore(next): Create shared starter #124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ jobs:
distribution: temurin
java-version: 17
- name: Execute build test jacocoTestReport and sonar analysis
if: endsWith(github.REF, '/master') == true
if: endsWith(github.REF, '/master') == true || github.event.pull_request.head.repo.fork == false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: ./gradlew build test jacocoTestReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
run: ./gradlew clean build generateMergedReport sonar --refresh-dependencies --no-daemon --continue -Denv.ci=true
- name: Execute build test jacocoTestReport pull request
if: endsWith(github.REF, '/merge') == true
if: github.event.pull_request.head.repo.fork == true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: ./gradlew build test jacocoTestReport --refresh-dependencies --no-daemon --continue -Denv.ci=true
run: ./gradlew clean build generateMergedReport --refresh-dependencies --no-daemon --continue -Denv.ci=true
4 changes: 2 additions & 2 deletions async/async-commons/async-commons.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ dependencies {
compileOnly 'io.projectreactor:reactor-core'
api 'com.fasterxml.jackson.core:jackson-databind'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'commons-io:commons-io:2.16.1'
implementation 'commons-io:commons-io:2.17.0'
implementation 'io.cloudevents:cloudevents-json-jackson:4.0.1'

testImplementation 'io.projectreactor:reactor-test'
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.reactivecommons.async.commons.config;

import lombok.Getter;

import java.time.Duration;
import java.util.UUID;

@Getter
public class BrokerConfig {
private final String routingKey = UUID.randomUUID().toString().replaceAll("-", "");
private final boolean persistentQueries;
Expand All @@ -24,24 +27,4 @@ public BrokerConfig(boolean persistentQueries, boolean persistentCommands, boole
this.replyTimeout = replyTimeout;
}

public boolean isPersistentQueries() {
return persistentQueries;
}

public boolean isPersistentCommands() {
return persistentCommands;
}

public boolean isPersistentEvents() {
return persistentEvents;
}

public Duration getReplyTimeout() {
return replyTimeout;
}

public String getRoutingKey() {
return routingKey;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.reactivecommons.async.kafka;

import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.api.From;
import reactor.core.publisher.Mono;

public class KafkaDirectAsyncGateway implements DirectAsyncGateway {

public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> sendCommand(Command<T> command, String targetName, long delayMillis, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName, long delayMillis, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public <T> Mono<Void> reply(T response, From from) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,26 @@

@AllArgsConstructor
public class KafkaDomainEventBus implements DomainEventBus {
public static final String NOT_IMPLEMENTED_YET = "Not implemented yet";
private final ReactiveMessageSender sender;

@Override
public <T> Publisher<Void> emit(DomainEvent<T> event) {
return sender.send(event);
}

@Override
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}

@Override
public Publisher<Void> emit(CloudEvent event) {
return sender.send(event);
}

@Override
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.reactivecommons.async.kafka;

import io.cloudevents.CloudEvent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.api.From;

import static org.junit.jupiter.api.Assertions.assertThrows;

@ExtendWith(MockitoExtension.class)
class KafkaDirectAsyncGatewayTest {
private final DirectAsyncGateway directAsyncGateway = new KafkaDirectAsyncGateway();
private final String targetName = "targetName";
private final String domain = "domain";
private final long delay = 1000L;
@Mock
private CloudEvent cloudEvent;
@Mock
private Command<String> command;
@Mock
private AsyncQuery<String> query;
@Mock
private From from;

@Test
void allMethodsAreNotImplemented() {
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName, domain));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName, delay));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(cloudEvent, targetName, delay, domain));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName, domain));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName, delay));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.sendCommand(command, targetName, delay, domain));

assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(cloudEvent, targetName, CloudEvent.class));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(cloudEvent, targetName, CloudEvent.class, domain));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(query, targetName, CloudEvent.class));
assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.requestReply(query, targetName, CloudEvent.class, domain));

assertThrows(UnsupportedOperationException.class, () -> directAsyncGateway.reply(targetName, from));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.reactivecommons.async.kafka;

import io.cloudevents.CloudEvent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.async.kafka.communications.ReactiveMessageSender;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class KafkaDomainEventBusTest {
@Mock
private DomainEvent<String> domainEvent;
@Mock
private CloudEvent cloudEvent;
@Mock
private ReactiveMessageSender sender;
@InjectMocks
private KafkaDomainEventBus kafkaDomainEventBus;
private final String domain = "domain";

@Test
void shouldEmitDomainEvent() {
// Arrange
when(sender.send(domainEvent)).thenReturn(Mono.empty());
// Act
Mono<Void> flow = Mono.from(kafkaDomainEventBus.emit(domainEvent));
// Assert
StepVerifier.create(flow)
.verifyComplete();
}

@Test
void shouldEmitCloudEvent() {
// Arrange
when(sender.send(cloudEvent)).thenReturn(Mono.empty());
// Act
Mono<Void> flow = Mono.from(kafkaDomainEventBus.emit(cloudEvent));
// Assert
StepVerifier.create(flow)
.verifyComplete();
}

@Test
void operationsShouldNotBeAbleForDomains() {
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, domainEvent));
assertThrows(UnsupportedOperationException.class, () -> kafkaDomainEventBus.emit(domain, cloudEvent));
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.reactivecommons.async.rabbit;

import io.cloudevents.CloudEvent;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;

import java.util.Collections;

Expand All @@ -29,7 +29,12 @@ public RabbitDomainEventBus(ReactiveMessageSender sender, String exchange, Broke
@Override
public <T> Mono<Void> emit(DomainEvent<T> event) {
return sender.sendWithConfirm(event, exchange, event.getName(), Collections.emptyMap(), persistentEvents)
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
.onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err));
}

@Override
public <T> Publisher<Void> emit(String domain, DomainEvent<T> event) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
Expand All @@ -39,4 +44,9 @@ public Publisher<Void> emit(CloudEvent cloudEvent) {
.onErrorMap(err -> new RuntimeException("Event send failure: " + cloudEvent.getType(), err));
}

@Override
public Publisher<Void> emit(String domain, CloudEvent event) {
throw new UnsupportedOperationException("Not implemented yet");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.EventExecutor;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -51,9 +51,6 @@ public ApplicationNotificationListener(ReactiveMessageListener receiver,
}

protected Mono<Void> setUpBindings(TopologyCreator creator) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
.type("topic")
.durable(true));

final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declare(
queue(queueName)
Expand All @@ -65,6 +62,10 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
.flatMap(listener -> creator.bind(binding(exchangeName, listener.getPath(), queueName)));

if (createTopology) {
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(exchange(exchangeName)
.type("topic")
.durable(true));

return declareExchange
.then(declareQueue)
.thenMany(bindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
}

private void onTerminate() {
messageFlux.doOnTerminate(this::onTerminate)
messageFlux
.doOnTerminate(this::onTerminate)
.subscribe(new LoggerSubscriber<>(getClass().getName()));
}

Expand Down
Loading
Loading