Skip to content

Commit 0843e68

Browse files
committed
kafka unique message test but not working
1 parent 1beb8f1 commit 0843e68

7 files changed

+256
-5
lines changed

springboot-kafka-demo/README.md

+12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@
88
broker.id=0
99
listeners=PLAINTEXT://192.168.116.128:9092
1010
log.dirs=/tmp/kafka-logs
11+
```
12+
13+
> zokeeper start
14+
15+
```aidl
16+
$ bin/zookeeper-server-start.sh config/zookeeper.properties
17+
```
18+
19+
> kafka start
20+
21+
```aidl
22+
$ bin/kafka-server-start.sh config/server.properties
1123
```
1224

1325
> Create topic
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package demo.uniquemessage;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
import lombok.NoArgsConstructor;
6+
import lombok.Setter;
7+
8+
/**
9+
* @author zacconding
10+
* @Date 2018-12-19
11+
* @GitHub : https://github.com/zacscoding
12+
*/
13+
@Getter
14+
@Setter
15+
@NoArgsConstructor
16+
@AllArgsConstructor
17+
public class UniqueMessage {
18+
19+
private String id;
20+
private String content;
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package demo.uniquemessage;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.springframework.context.annotation.Profile;
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.messaging.handler.annotation.Payload;
7+
import org.springframework.stereotype.Component;
8+
9+
/**
10+
* @author zacconding
11+
* @Date 2018-12-19
12+
* @GitHub : https://github.com/zacscoding
13+
*/
14+
@Profile("uniquemessage")
15+
@Slf4j(topic = "[CONSUMER]")
16+
@Component
17+
public class UniqueMessageConsumer {
18+
19+
@KafkaListener(topics = "unique-message", groupId = "1")
20+
/*public void listen(@Payload UniqueMessage message
21+
, @Headers MessageHeaders headers) {*/
22+
// public void listen(@Payload UniqueMessage message) {
23+
public void listen(@Payload String message) {
24+
/*log.info("## [Consumer] receive message. id : {} / content : {}"
25+
, message.getId(), message.getContent());*/
26+
27+
log.info("## [Consumer] receive message. {}"
28+
, message);
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package demo.uniquemessage;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import org.apache.kafka.clients.consumer.ConsumerConfig;
6+
import org.apache.kafka.common.serialization.StringDeserializer;
7+
import org.springframework.beans.factory.annotation.Value;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.kafka.annotation.EnableKafka;
12+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
13+
import org.springframework.kafka.core.ConsumerFactory;
14+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
15+
import org.springframework.kafka.support.serializer.JsonDeserializer;
16+
17+
/**
18+
* @author zacconding
19+
* @Date 2018-12-19
20+
* @GitHub : https://github.com/zacscoding
21+
*/
22+
@Profile("uniquemessage")
23+
@EnableKafka // enable detection of @KafkaListener
24+
@Configuration
25+
public class UniqueMessageConsumerConfiguration {
26+
27+
@Value("${unique-message.kafka.bootstrap-servers}")
28+
private String bootstrapAddress;
29+
30+
@Bean
31+
public ConsumerFactory<String, String> consumerFactory() {
32+
Map<String, Object> props = new HashMap<>();
33+
34+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
35+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
36+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
37+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
38+
39+
return new DefaultKafkaConsumerFactory<>(props);
40+
}
41+
42+
@Bean
43+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
44+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
45+
factory.setConsumerFactory(consumerFactory());
46+
return factory;
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package demo.uniquemessage;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import java.util.Random;
5+
import java.util.UUID;
6+
import java.util.concurrent.TimeUnit;
7+
import javax.annotation.PostConstruct;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.apache.kafka.clients.producer.ProducerRecord;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.context.annotation.Profile;
12+
import org.springframework.kafka.core.KafkaTemplate;
13+
import org.springframework.kafka.support.KafkaHeaders;
14+
import org.springframework.kafka.support.SendResult;
15+
import org.springframework.stereotype.Component;
16+
import org.springframework.util.concurrent.ListenableFuture;
17+
import org.springframework.util.concurrent.ListenableFutureCallback;
18+
19+
/**
20+
* @author zacconding
21+
* @Date 2018-12-19
22+
* @GitHub : https://github.com/zacscoding
23+
*/
24+
@Profile("uniquemessage")
25+
@Slf4j(topic = "[PRODUCER]")
26+
@Component
27+
public class UniqueMessageProducer {
28+
29+
@Autowired
30+
private KafkaTemplate<String, UniqueMessage> kafkaTemplate;
31+
private final String topic = "unique-message";
32+
private final ObjectMapper objectMapper = new ObjectMapper();
33+
private Random random = new Random();
34+
35+
@PostConstruct
36+
public void setUp() {
37+
Thread produceTask = new Thread(() -> {
38+
log.info("## Start producer task");
39+
try {
40+
while (!Thread.currentThread().isInterrupted()) {
41+
UniqueMessage message = new UniqueMessage(generateRandomId(), geneateMessage());
42+
43+
ProducerRecord<String, UniqueMessage> record = new ProducerRecord<>(
44+
topic, message);
45+
record.headers().add(KafkaHeaders.MESSAGE_KEY, message.getId().getBytes());
46+
ListenableFuture<SendResult<String, UniqueMessage>> result = kafkaTemplate
47+
.send(record);
48+
49+
result.addCallback(new ListenableFutureCallback<SendResult<String, UniqueMessage>>() {
50+
@Override
51+
public void onFailure(Throwable error) {
52+
log.info("## Failed to send {}. > {}", message, error.getMessage());
53+
}
54+
55+
@Override
56+
public void onSuccess(SendResult<String, UniqueMessage> result) {
57+
log.info("## Success to send {}", result.toString());
58+
}
59+
});
60+
TimeUnit.SECONDS.sleep(3L);
61+
}
62+
} catch (Exception e) {
63+
log.error("## Exception occur while producing message", e);
64+
}
65+
});
66+
67+
produceTask.setDaemon(true);
68+
produceTask.start();
69+
}
70+
71+
private String generateRandomId() {
72+
return String.valueOf(
73+
random.nextInt(5)
74+
);
75+
}
76+
77+
private String geneateMessage() {
78+
return UUID.randomUUID().toString();
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package demo.uniquemessage;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import org.apache.kafka.clients.producer.ProducerConfig;
6+
import org.apache.kafka.common.serialization.StringSerializer;
7+
import org.springframework.beans.factory.annotation.Value;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.context.annotation.Profile;
11+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
12+
import org.springframework.kafka.core.KafkaTemplate;
13+
import org.springframework.kafka.core.ProducerFactory;
14+
import org.springframework.kafka.support.serializer.JsonSerializer;
15+
16+
/**
17+
* @author zacconding
18+
* @Date 2018-12-19
19+
* @GitHub : https://github.com/zacscoding
20+
*/
21+
@Profile("uniquemessage")
22+
@Configuration
23+
public class UniqueMessageProducerConfiguration {
24+
25+
@Value("${unique-message.kafka.bootstrap-servers}")
26+
private String bootstrapAddress;
27+
28+
@Bean
29+
public ProducerFactory<String, UniqueMessage> producerFactory() {
30+
Map<String, Object> configProps = new HashMap<>();
31+
32+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
33+
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
34+
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
35+
36+
return new DefaultKafkaProducerFactory<>(configProps);
37+
}
38+
39+
@Bean
40+
public KafkaTemplate<String, UniqueMessage> kafkaTemplate() {
41+
return new KafkaTemplate<>(producerFactory());
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,30 @@
1-
# spring.profiles.active=basic
1+
# Task profiles
2+
3+
#spring.profiles.active=basic
24
#spring.profiles.active=rpc
3-
# working rpc2!!
4-
spring.profiles.active=rpc2
5+
#spring.profiles.active=rpc2
6+
spring.profiles.active=uniquemessage
7+
8+
logging.level.kafka=trace
9+
10+
# kafka for basic
11+
512
# kafka.bootstrapAddress=localhost:9092
613
# basic.kafka.bootstrap-servers=192.168.116.128:9092
7-
## for rpc
14+
15+
# kafka for rpc
816
# https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1
917
rpc.kafka.bootstrap-servers=192.168.5.78:9092
1018
rpc.kafka.topic.request-topic=request-topic
1119
rpc.kafka.topic.requestreply-topic=requestreply-topic
1220
rpc.kafka.consumergroup=requestreplygorup
13-
logging.level.kafka=trace
21+
22+
23+
# kafka for unique-message
24+
unique-message.kafka.bootstrap-servers=192.168.5.78:9092
25+
26+
27+
28+
29+
30+

0 commit comments

Comments
 (0)