Skip to content

Commit c20cd61

Browse files
committed
Add a sample app with Spring Kafka
If you use Spring Cloud Function 3.1.3 then Kafka should work with Spring Cloud Streams out of the box already. This sample adds support for vanilla Spring Kafka with `@KafkaListener` where the listener can listen for and emit `CloudEvent`. Some issues with existing messaging support came to light and these have been ironed out in the process. Signed-off-by: Dave Syer <[email protected]>
1 parent e2b1310 commit c20cd61

File tree

18 files changed

+765
-20
lines changed

18 files changed

+765
-20
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ release.properties
1212
*.iml
1313
.classpath
1414
.project
15+
.factorypath
1516
.settings/
1617
.vscode/
1718
.attach_pid*

examples/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
<module>spring-reactive</module>
2929
<module>spring-rsocket</module>
3030
<module>spring-function</module>
31+
<module>spring-kafka</module>
3132
</modules>
3233

3334
</project>

examples/spring-kafka/README.md

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Spring Kafka + CloudEvents sample
2+
3+
## Build
4+
5+
```shell
6+
mvn package
7+
```
8+
9+
## Start Consumer
10+
11+
```shell
12+
mvn spring-boot:run
13+
```
14+
15+
You can try sending a request using any kafka client, or using the intergration tests in this project. You send to the "in" topic and it echos back a cloud event on the "out" topic. The listener is implemented like this (the request and response are modelled directly as a `CloudEvent`):
16+
17+
```java
18+
@KafkaListener(id = "listener", topics = "in", clientIdPrefix = "demo")
19+
@SendTo("out")
20+
public CloudEvent listen(CloudEvent event) {
21+
return ...;
22+
}
23+
```
24+
25+
and to make that work we need to install the Kafka message converter as a `@Bean`:
26+
27+
```java
28+
@Bean
29+
public CloudEventRecordMessageConverter recordMessageConverter() {
30+
return new CloudEventRecordMessageConverter();
31+
}
32+
```

examples/spring-kafka/pom.xml

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<parent>
4+
<artifactId>cloudevents-examples</artifactId>
5+
<groupId>io.cloudevents</groupId>
6+
<version>2.1.0-SNAPSHOT</version>
7+
</parent>
8+
<modelVersion>4.0.0</modelVersion>
9+
10+
<artifactId>cloudevents-spring-kafka-example</artifactId>
11+
12+
<properties>
13+
<spring-boot.version>2.4.3</spring-boot.version>
14+
<testcontainers.version>1.15.2</testcontainers.version>
15+
</properties>
16+
17+
<dependencyManagement>
18+
<dependencies>
19+
<dependency>
20+
<groupId>org.springframework.boot</groupId>
21+
<artifactId>spring-boot-dependencies</artifactId>
22+
<version>${spring-boot.version}</version>
23+
<type>pom</type>
24+
<scope>import</scope>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.testcontainers</groupId>
28+
<artifactId>testcontainers-bom</artifactId>
29+
<version>${testcontainers.version}</version>
30+
<type>pom</type>
31+
<scope>import</scope>
32+
</dependency>
33+
</dependencies>
34+
</dependencyManagement>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.springframework.boot</groupId>
39+
<artifactId>spring-boot-starter</artifactId>
40+
</dependency>
41+
<dependency>
42+
<groupId>io.cloudevents</groupId>
43+
<artifactId>cloudevents-spring</artifactId>
44+
<version>${project.version}</version>
45+
</dependency>
46+
<dependency>
47+
<groupId>org.springframework.kafka</groupId>
48+
<artifactId>spring-kafka</artifactId>
49+
</dependency>
50+
<dependency>
51+
<groupId>io.cloudevents</groupId>
52+
<artifactId>cloudevents-json-jackson</artifactId>
53+
<version>${project.version}</version>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.springframework.boot</groupId>
57+
<artifactId>spring-boot-starter-test</artifactId>
58+
<scope>test</scope>
59+
</dependency>
60+
<dependency>
61+
<groupId>org.springframework.kafka</groupId>
62+
<artifactId>spring-kafka-test</artifactId>
63+
<scope>test</scope>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.testcontainers</groupId>
67+
<artifactId>kafka</artifactId>
68+
<scope>test</scope>
69+
</dependency>
70+
</dependencies>
71+
72+
<build>
73+
<plugins>
74+
<plugin>
75+
<groupId>org.springframework.boot</groupId>
76+
<artifactId>spring-boot-maven-plugin</artifactId>
77+
<version>${spring-boot.version}</version>
78+
</plugin>
79+
</plugins>
80+
</build>
81+
82+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.cloudevents.examples.spring;
2+
3+
import java.net.URI;
4+
import java.util.UUID;
5+
6+
import org.apache.kafka.clients.admin.NewTopic;
7+
import org.springframework.boot.SpringApplication;
8+
import org.springframework.boot.autoconfigure.SpringBootApplication;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
import org.springframework.kafka.annotation.KafkaListener;
12+
import org.springframework.kafka.config.TopicBuilder;
13+
import org.springframework.messaging.handler.annotation.SendTo;
14+
15+
import io.cloudevents.CloudEvent;
16+
import io.cloudevents.core.builder.CloudEventBuilder;
17+
import io.cloudevents.spring.kafka.CloudEventRecordMessageConverter;
18+
19+
@SpringBootApplication
20+
public class DemoApplication {
21+
22+
public static void main(String[] args) throws Exception {
23+
SpringApplication.run(DemoApplication.class, args);
24+
}
25+
26+
@KafkaListener(id = "listener", topics = "in", clientIdPrefix = "demo")
27+
@SendTo("out")
28+
public CloudEvent listen(CloudEvent event) {
29+
System.err.println("Echo: " + event);
30+
return CloudEventBuilder.from(event).withId(UUID.randomUUID().toString())
31+
.withSource(URI.create("https://spring.io/foos")).withType("io.spring.event.Foo")
32+
.withData(event.getData().toBytes()).build();
33+
}
34+
35+
@Bean
36+
public NewTopic topicOut() {
37+
return TopicBuilder.name("out").build();
38+
}
39+
40+
@Bean
41+
public NewTopic topicIn() {
42+
return TopicBuilder.name("in").build();
43+
}
44+
45+
@Configuration
46+
public static class CloudEventMessageConverterConfiguration {
47+
/**
48+
* Configure a RecordMessageConverter for Spring Kafka to pick up and use to
49+
* convert to and from CloudEvent and Message.
50+
*/
51+
@Bean
52+
public CloudEventRecordMessageConverter recordMessageConverter() {
53+
return new CloudEventRecordMessageConverter();
54+
}
55+
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2019-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.cloudevents.examples.spring;
17+
18+
class Foo {
19+
20+
private String value;
21+
22+
public Foo() {
23+
}
24+
25+
public Foo(String value) {
26+
this.value = value;
27+
}
28+
29+
public String getValue() {
30+
return this.value;
31+
}
32+
33+
public void setValue(String value) {
34+
this.value = value;
35+
}
36+
37+
@Override
38+
public String toString() {
39+
return "Foo [value=" + this.value + "]";
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2+
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
3+
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
4+
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

0 commit comments

Comments
 (0)