Skip to content

Commit ed56070

Browse files
committed
* added messages publisher
1 parent 417acf8 commit ed56070

18 files changed

+239
-42
lines changed

app.iml

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.9.5" level="project" />
2727
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.9.0" level="project" />
2828
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.9.5" level="project" />
29+
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.9.5" level="project" />
30+
<orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.18" level="project" />
2931
<orderEntry type="library" scope="PROVIDED" name="Maven: org.projectlombok:lombok:1.16.8" level="project" />
3032
<orderEntry type="library" name="Maven: com.rabbitmq:amqp-client:5.2.0" level="project" />
3133
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.25" level="project" />

docker-compose.yml

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
version: '3'
2+
services:
3+
queue:
4+
image: rabbitmq:3.7-rc-management
5+
ports:
6+
- "5672:5672"
7+
- "15672:15672"
8+
- "25672:25672"
9+
- "4369:4369"

pom.xml

+9-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
<properties>
1010
<akkastreams.version>2.5.11</akkastreams.version>
11+
<jackson.version>2.9.5</jackson.version>
12+
<rabbitmq.version>5.2.0</rabbitmq.version>
1113
</properties>
1214

1315
<dependencies>
@@ -33,7 +35,12 @@
3335
<dependency>
3436
<groupId>com.fasterxml.jackson.core</groupId>
3537
<artifactId>jackson-databind</artifactId>
36-
<version>2.9.5</version>
38+
<version>${jackson.version}</version>
39+
</dependency>
40+
<dependency>
41+
<groupId>com.fasterxml.jackson.dataformat</groupId>
42+
<artifactId>jackson-dataformat-yaml</artifactId>
43+
<version>${jackson.version}</version>
3744
</dependency>
3845

3946
<!-- LOMBOK -->
@@ -48,7 +55,7 @@
4855
<dependency>
4956
<groupId>com.rabbitmq</groupId>
5057
<artifactId>amqp-client</artifactId>
51-
<version>5.2.0</version>
58+
<version>${rabbitmq.version}</version>
5259
</dependency>
5360
</dependencies>
5461
<build>

settings.yml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
host: 192.168.99.100
2+
port: 5672
3+
user: guest
4+
pass: guest
5+
queue: hello

src/main/java/tech/volhvporechja/akka/demo/actors/Contracts/FetchStartMessage.java

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import lombok.Builder;
55
import lombok.Getter;
66

7+
/**
8+
* Сообщение говорящее о том, что пора зачинать поток заданий
9+
* содержит все параметры, чтобы начать это делать.
10+
*/
711
@Getter
812
@AllArgsConstructor
913
@Builder

src/main/java/tech/volhvporechja/akka/demo/actors/Contracts/FetcherInitMessage.java

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import lombok.Builder;
55
import lombok.Getter;
66

7+
/**
8+
* Инициализирующее сообщние
9+
* содержит параметры в какой начальной конфигурации надо инициализироваться систему
10+
*/
711
@Getter
812
@AllArgsConstructor
913
@Builder

src/main/java/tech/volhvporechja/akka/demo/actors/Contracts/GreetingConfigMessage.java

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import lombok.Getter;
55
import lombok.Setter;
66

7+
/**
8+
* Сообщение по реконфигурации шаблонов печати
9+
*/
710
@Getter
811
@Setter
912
@AllArgsConstructor

src/main/java/tech/volhvporechja/akka/demo/actors/Contracts/GreetingMessage.java

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import lombok.AllArgsConstructor;
44
import lombok.Getter;
55

6+
/**
7+
* Сообщение для печати
8+
*/
69
@Getter
710
@AllArgsConstructor
811
public class GreetingMessage {

src/main/java/tech/volhvporechja/akka/demo/actors/Contracts/IncomingMessage.java

-13
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package tech.volhvporechja.akka.demo.actors.Contracts;
2+
3+
public class MessagesTypes {
4+
public static final String WELCOME = "welcome";
5+
public static final String CONFIG = "config";
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package tech.volhvporechja.akka.demo.actors.Contracts;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import lombok.AllArgsConstructor;
6+
import lombok.Builder;
7+
import lombok.Getter;
8+
import lombok.Setter;
9+
10+
/**
11+
* Контракт сообщения получаемого из RabbitMQ
12+
*/
13+
@Getter
14+
@Setter
15+
@AllArgsConstructor
16+
@Builder
17+
public class QueueIncomingMessage {
18+
private String type;
19+
private String load;
20+
21+
public byte[] marshall() throws JsonProcessingException {
22+
ObjectMapper mapper = new ObjectMapper();
23+
return mapper.writeValueAsString(this).getBytes();
24+
}
25+
}

src/main/java/tech/volhvporechja/akka/demo/actors/Contracts/WelcomeMessage.java

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import lombok.AllArgsConstructor;
44
import lombok.Getter;
55

6+
/**
7+
* Сообщение для шаблона и последующей печати
8+
*/
69
@Getter
710
@AllArgsConstructor
811
public class WelcomeMessage {

src/main/java/tech/volhvporechja/akka/demo/actors/FetcherActor.java

+21-11
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,47 @@ public class FetcherActor extends AbstractActor {
2222
private Router broadcastRouter;
2323
private Router router;
2424

25+
// Blueprint нашего аткора, то как его следует инстанцировать
2526
static public Props props() {
2627
return Props.create(FetcherActor.class, FetcherActor::new);
2728
}
2829

2930
/**
30-
* Processor for actors system initialization
31+
* Логика инициалиазции нашей системы
32+
* Передаем сообщение в котором храниться конфигурация нашей системы
3133
*
3234
* @param config initialization config
3335
*/
34-
private void intializePrinterSystem(FetcherInitMessage config) {
36+
private void initializePrinterSystem(FetcherInitMessage config) {
37+
// готовим наши "принтеры"
3538
List<Routee> printerRoutees = new ArrayList<>();
3639
for (int i = 0; i < config.getPrintersCount(); i++) {
3740
ActorRef printer = getContext().actorOf(PrinterActor.props(i), "printer-" + i);
3841
getContext().watch(printer);
3942
printerRoutees.add(new ActorRefRoutee(printer));
4043
}
4144

45+
// готовим наши шаблонизаторы
4246
Router printerRouter = new Router(new RoundRobinRoutingLogic(), printerRoutees);
43-
4447
List<Routee> greeterRoutees = new ArrayList<>();
4548
for (int i = 0; i < config.getGreetersCount(); i++) {
4649
ActorRef greeter = getContext().actorOf(GreeterActor.props("Aloha, %s!!", printerRouter, i), "greeter-" + i);
4750
getContext().watch(greeter);
4851
greeterRoutees.add(new ActorRefRoutee(greeter));
4952
}
5053

54+
// Роутер для распространения конфигурационного сообщения
5155
broadcastRouter = new Router(new BroadcastRoutingLogic(), greeterRoutees);
56+
57+
// Роутер для отправки сообщений предназначенных для шаблонизации и печати
5258
router = new Router(new RoundRobinRoutingLogic(), greeterRoutees);
5359
}
5460

5561
/**
56-
* Processor for listening for queue
62+
* Метод прослушивания очереди
63+
* !! Disclaimer: используется блокирующее API для доступа к RabbitMQ
64+
* поток, который будет исполнять этот код будет потерян для пула.
65+
* Но так как такой актор только один, для демонстрационных целей - ничего страшного
5766
*
5867
* @param config listening config
5968
*/
@@ -78,18 +87,18 @@ public void handleDelivery(String consumerTag, Envelope envelope,
7887
String message = new String(body, "UTF-8");
7988
log.info(" [x] Received '" + message + "'");
8089

81-
IncomingMessage request = null;
90+
QueueIncomingMessage request = null;
8291
try {
8392
ObjectMapper mapper = new ObjectMapper();
84-
request = mapper.readValue(message, IncomingMessage.class);
93+
request = mapper.readValue(message, QueueIncomingMessage.class);
8594
} catch (Exception ex) {
8695
log.error(ex, "Unable to parse request.");
8796
}
8897

8998
if (request != null)
90-
if (request.getType().equals("welcome"))
99+
if (request.getType().equals(MessagesTypes.WELCOME))
91100
router.route(new WelcomeMessage(request.getLoad()), getSelf());
92-
else if (request.getType().equals("config"))
101+
else if (request.getType().equals(MessagesTypes.CONFIG))
93102
broadcastRouter.route(new GreetingConfigMessage(request.getLoad()), getSelf());
94103
else
95104
log.warning("Unknown message type");
@@ -103,12 +112,13 @@ else if (request.getType().equals("config"))
103112
}
104113
}
105114

115+
// Билдер нашего обраотчика сообщений
106116
@Override
107117
public Receive createReceive() {
108118
return receiveBuilder()
109-
.match(FetchStartMessage.class, this::listen)
110-
.match(FetcherInitMessage.class, this::intializePrinterSystem)
111-
.matchAny(o -> log.info("received unknown message"))
119+
.match(FetchStartMessage.class, this::listen) // Обработчик сообщения начала процесса прослушивания заданий на печать
120+
.match(FetcherInitMessage.class, this::initializePrinterSystem) // Обработчик сообщения инициализации системы
121+
.matchAny(o -> log.info("received unknown message")) // Обработчик неизвестного сообщения
112122
.build();
113123
}
114124
}

src/main/java/tech/volhvporechja/akka/demo/actors/GreeterActor.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,33 @@
99
import tech.volhvporechja.akka.demo.actors.Contracts.GreetingMessage;
1010
import tech.volhvporechja.akka.demo.actors.Contracts.WelcomeMessage;
1111

12+
/**
13+
* Наш шаблонизатор
14+
* на борту: шаблон и логика обработки двух типов сообщений.
15+
*/
1216
public class GreeterActor extends AbstractActor {
1317
private final int id;
1418
private LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
1519
private String messageTemplate;
1620
private Router printer;
1721

18-
static public Props props(String message, Router printer, int id) {
19-
return Props.create(GreeterActor.class, () -> new GreeterActor(message, printer, id));
22+
/**
23+
* Blueprint, описывающий как надо собирать нашего Актора
24+
* @param template печатная форма
25+
* @param printersRouter роутер, дающий нам доступ к принтерам
26+
* @param id идентификтор эксземпляра этого шаблонизатора
27+
* @return
28+
*/
29+
static public Props props(String template, Router printersRouter, int id) {
30+
return Props.create(GreeterActor.class, () -> new GreeterActor(template, printersRouter, id));
2031
}
2132

33+
/**
34+
* JUST CTOR
35+
* @param messageTemplate
36+
* @param router
37+
* @param id
38+
*/
2239
public GreeterActor(String messageTemplate, Router router, int id) {
2340
this.messageTemplate = messageTemplate;
2441
this.printer = router;
@@ -28,15 +45,15 @@ public GreeterActor(String messageTemplate, Router router, int id) {
2845
@Override
2946
public Receive createReceive() {
3047
return receiveBuilder()
31-
.match(GreetingConfigMessage.class, wtg -> { // Reconfiguration
48+
.match(GreetingConfigMessage.class, wtg -> { // Реконфигурируем
3249
log.info(String.format("GREETER-%s: Reconfiguring greeter [%s]->[%s]", id, messageTemplate, wtg.getFormatString()));
3350
messageTemplate = wtg.getFormatString();
3451
})
35-
.match(WelcomeMessage.class, wtg -> { // Execution
52+
.match(WelcomeMessage.class, wtg -> { // Рендерим печатную псевдоформу с шаблоном
3653
log.info(String.format("GREETER-%s: Received data [%s]", id, wtg.getWho()));
3754
printer.route(new GreetingMessage(String.format(messageTemplate, wtg.getWho())), getSelf());
3855
})
39-
.matchAny(o -> log.info("received unknown message"))
56+
.matchAny(o -> log.info("received unknown message")) // Обработчик неизвестного сообщения
4057
.build();
4158
}
4259
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package tech.volhvporechja.akka.demo.actors;
2+
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import tech.volhvporechja.akka.demo.actors.Contracts.MessagesTypes;
7+
import tech.volhvporechja.akka.demo.actors.Contracts.QueueIncomingMessage;
8+
import tech.volhvporechja.akka.demo.actors.config.RabbitMQConfig;
9+
10+
import java.io.IOException;
11+
import java.util.concurrent.TimeoutException;
12+
13+
public class MessagesPublisher {
14+
15+
private static String[] names = {"Alex", "Ben", "Carrol", "Diana", "Evan", "Martin Fawler"};
16+
17+
public static void main(String[] args) throws IOException, TimeoutException {
18+
RabbitMQConfig config = RabbitMQConfig.Load("settings.yml");
19+
20+
ConnectionFactory factory = new ConnectionFactory();
21+
factory.setHost(config.getHost());
22+
factory.setPort(config.getPort());
23+
factory.setUsername(config.getUser());
24+
factory.setPassword(config.getPass());
25+
26+
try (Connection conn = factory.newConnection()) {
27+
try (Channel channel = conn.createChannel()) {
28+
channel.queueDeclare(config.getQueue(), false, false, false, null);
29+
30+
QueueIncomingMessage.QueueIncomingMessageBuilder welcomeMessageBuilder = QueueIncomingMessage.builder().
31+
type(MessagesTypes.WELCOME);
32+
for (int i = 0; i < 5e6; i++) {
33+
final byte[] message = welcomeMessageBuilder
34+
.load(names[i % names.length])
35+
.build().marshall();
36+
37+
channel.basicPublish("", config.getQueue(), null, message);
38+
}
39+
40+
byte[] reconfig = QueueIncomingMessage.builder()
41+
.type(MessagesTypes.CONFIG)
42+
.load("Heyyo, %s")
43+
.build().marshall();
44+
channel.basicPublish("", config.getQueue(), null, reconfig);
45+
46+
for (int i = 0; i < 5e6; i++) {
47+
final byte[] message = welcomeMessageBuilder
48+
.load(names[i % names.length])
49+
.build().marshall();
50+
51+
channel.basicPublish("", config.getQueue(), null, message);
52+
}
53+
}
54+
}
55+
}
56+
}

src/main/java/tech/volhvporechja/akka/demo/actors/PrinterActor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ public Receive createReceive() {
3131
})
3232
.match(ReceiveTimeout.class, r -> {
3333
log.info("Soooo boooring!!");
34-
// Switch off
34+
// Switch off - просто висячий код, который позволяет выключить таймер
35+
// для этой адской машины скуки
3536
// getContext().setReceiveTimeout(Duration.Undefined());
3637
})
37-
.matchAny(o -> log.info("received unknown message"))
38+
.matchAny(o -> log.info("received unknown message")) // Обработчик неизвестного сообщения
3839
.build();
3940
}
4041
}

0 commit comments

Comments
 (0)