Skip to content

Commit 27d0d8a

Browse files
committed
spring-cloudGH-422 Add RabbitMQ instructions for Cloud Events interaction
1 parent 97347bf commit 27d0d8a

File tree

9 files changed

+106
-52
lines changed

9 files changed

+106
-52
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.springframework.cloud.function.context.catalog;
1818

19+
import java.lang.reflect.Array;
1920
import java.lang.reflect.Field;
2021
import java.lang.reflect.ParameterizedType;
2122
import java.lang.reflect.Type;
2223
import java.lang.reflect.TypeVariable;
2324
import java.lang.reflect.WildcardType;
2425
import java.util.ArrayList;
26+
import java.util.Arrays;
2527
import java.util.Collection;
2628
import java.util.Collections;
2729
import java.util.HashMap;
@@ -170,7 +172,7 @@ <T> T doLookup(Class<?> type, String functionDefinition, String[] expectedOutput
170172
function = this.compose(type, functionDefinition);
171173
}
172174

173-
if (function != null) {
175+
if (function != null && !ObjectUtils.isEmpty(expectedOutputMimeTypes)) {
174176
function.expectedOutputContentType = expectedOutputMimeTypes;
175177
}
176178
else if (logger.isDebugEnabled()) {
@@ -804,6 +806,10 @@ else if (this.skipInputConversion) {
804806
: new OriginalMessageHolder(((Message) input).getPayload(), (Message<?>) input);
805807
}
806808
else if (input instanceof Message) {
809+
if (((Message) input).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")
810+
&& !this.isInputTypeMessage()) { //TODO rework
811+
return null;
812+
}
807813
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
808814
if (convertedInput == null) { // give ConversionService a chance
809815
convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false);
@@ -866,7 +872,10 @@ else if (output instanceof Message) {
866872
else if (output instanceof Collection && this.isOutputTypeMessage()) {
867873
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType);
868874
}
869-
else if (!ObjectUtils.isEmpty(contentType)) {
875+
else if (ObjectUtils.isArray(output) && !(output instanceof byte[])) {
876+
convertedOutput = this.convertMultipleOutputValuesIfNecessary(output, ObjectUtils.isEmpty(contentType) ? null : contentType);
877+
}
878+
else {
870879
convertedOutput = messageConverter.toMessage(output,
871880
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType[0])));
872881
}
@@ -1043,14 +1052,15 @@ private Object convertOutputMessageIfNecessary(Object output, String expectedOut
10431052
*/
10441053
@SuppressWarnings("unchecked")
10451054
private Object convertMultipleOutputValuesIfNecessary(Object output, String[] contentType) {
1046-
Collection outputCollection = (Collection) output;
1047-
Collection convertedOutputCollection = output instanceof List ? new ArrayList<>() : new TreeSet<>();
1055+
Collection outputCollection = ObjectUtils.isArray(output) ? Arrays.asList(output) : (Collection) output;
1056+
Collection convertedOutputCollection = outputCollection instanceof List ? new ArrayList<>() : new TreeSet<>();
1057+
Type type = this.isOutputTypeMessage() ? FunctionTypeUtils.getGenericType(this.outputType) : this.outputType;
10481058
for (Object outToConvert : outputCollection) {
1049-
Object result = this.convertOutputIfNecessary(outToConvert, this.outputType, contentType);
1050-
Assert.notNull(result, () -> "Failed to convert output '" + output + "'");
1059+
Object result = this.convertOutputIfNecessary(outToConvert, type, contentType);
1060+
Assert.notNull(result, () -> "Failed to convert output '" + outToConvert + "'");
10511061
convertedOutputCollection.add(result);
10521062
}
1053-
return convertedOutputCollection;
1063+
return ObjectUtils.isArray(output) ? convertedOutputCollection.toArray() : convertedOutputCollection;
10541064
}
10551065

10561066
/*

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/CloudEventJsonMessageConverter.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
package org.springframework.cloud.function.context.config;
1818

1919
import java.lang.reflect.Type;
20+
import java.nio.charset.StandardCharsets;
2021
import java.util.Collection;
2122
import java.util.Map;
2223

2324
import org.springframework.cloud.function.json.JsonMapper;
2425
import org.springframework.lang.Nullable;
2526
import org.springframework.messaging.Message;
26-
import org.springframework.messaging.MessageHeaders;
2727
import org.springframework.messaging.converter.MessageConverter;
2828
import org.springframework.messaging.support.MessageBuilder;
2929
import org.springframework.util.MimeType;
@@ -55,20 +55,16 @@ protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @
5555
return message.getPayload();
5656
}
5757
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
58-
String jsonString = (String) message.getPayload();
58+
String jsonString = message.getPayload() instanceof String
59+
? (String) message.getPayload()
60+
: new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
5961
Map<String, Object> mapEvent = this.mapper.fromJson(jsonString, Map.class);
6062
Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType);
6163
mapEvent.remove("data");
6264
return MessageBuilder.withPayload(payload).copyHeaders(mapEvent).build();
6365
}
6466
}
6567

66-
@Override
67-
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers,
68-
@Nullable Object conversionHint) {
69-
throw new UnsupportedOperationException("Temporarily not supported as this converter is work in progress");
70-
}
71-
7268
private boolean isBinary(Message<?> message) {
7369
Map<String, Object> headers = message.getHeaders();
7470
return headers.containsKey("source") && headers.containsKey("specversion") && headers.containsKey("type");

spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwarePojoFunctionRegistryTests.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ public void testWithPojoFunctionImplementingFunction() {
6262
Function<Message<String>, String> f2message = catalog.lookup("myFunction");
6363
assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE");
6464

65-
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunction", "application/json");
66-
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
67-
6865
Function<Flux<String>, Flux<String>> f3 = catalog.lookup("myFunction");
6966
assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO");
67+
68+
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunction", "application/json");
69+
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
7070
}
7171

7272
@Test
@@ -85,11 +85,11 @@ public void testWithPojoFunction() {
8585
Function<Message<String>, String> f2message = catalog.lookup("myFunctionLike");
8686
assertThat(f2message.apply(MessageBuilder.withPayload("message").build())).isEqualTo("MESSAGE");
8787

88-
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunctionLike", "application/json");
89-
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
90-
9188
Function<Flux<String>, Flux<String>> f3 = catalog.lookup("myFunctionLike");
9289
assertThat(f3.apply(Flux.just("foo")).blockFirst()).isEqualTo("FOO");
90+
91+
Function<Message<String>, Message<byte[]>> f2messageReturned = catalog.lookup("myFunctionLike", "application/json");
92+
assertThat(new String(f2messageReturned.apply(MessageBuilder.withPayload("message").build()).getPayload())).isEqualTo("\"MESSAGE\"");
9393
}
9494

9595
@Test
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,37 @@
1-
## Cloud Events with Spring samples
1+
## Examples of Cloud Events with Spring
22

33
### Introduction
44
The current example uses spring-cloud-function framework as its core which allows users to only worry about functional aspects of
55
their requirement while taking care-off non-functional aspects. For more information on Spring Cloud Function please visit
66
our https://spring.io/projects/spring-cloud-function[project page].
7-
The example provides dependency and instructions to demonstrate several distinct invocation models:
87

9-
- Direct function invocation
10-
- Function as a REST endpoint
11-
- Function as message handler (e.g., Kafka, RabbitMQ etc)
12-
- Function invocation via RSocket
8+
The example provides dependencies and instructions to demonstrate several distinct invocation models:
9+
10+
- _Direct function invocation_
11+
- _Function as a REST endpoint_
12+
- _Function as message handler (e.g., Kafka, RabbitMQ etc)_
13+
- _Function invocation via RSocket_
1314

1415
The POM file defines all the necessary dependency in a segregated way, so you can choose the one you're interested in.
1516

16-
#### Direct function invocation
17+
### Direct function invocation
18+
TBD
1719

18-
#### Function as a REST endpoint
20+
### Function as a REST endpoint
1921

2022
Given that SCF allows function to be exposed as REST endpoints, you can post cloud event to any of the
21-
functions by using function name as path (e.g., localhost:8080/<function_name>)
23+
functions by using function name as path (e.g., `localhost:8080/<function_name>`)
2224

2325
Here is an example of curl command posting a cloud event in binary-mode:
2426

2527
[source, text]
2628
----
2729
curl -w'\n' localhost:8080/asPOJO \
28-
-H "ce-Specversion: 1.0" \
29-
-H "ce-Type: com.example.springevent" \
30-
-H "ce-Source: spring.io/spring-event" \
30+
-H "ce-specversion: 1.0" \
31+
-H "ce-type: com.example.springevent" \
32+
-H "ce-source: spring.io/spring-event" \
3133
-H "Content-Type: application/json" \
32-
-H "ce-Id: 0001" \
34+
-H "ce-id: 0001" \
3335
-d '{"releaseDate":"24-03-2004", "releaseName":"Spring Framework", "version":"1.0"}'
3436
----
3537

@@ -38,11 +40,7 @@ And here is an example of curl command posting a cloud event in structured-mode:
3840
[source, text]
3941
----
4042
curl -w'\n' localhost:8080/asString \
41-
-H "ce-Specversion: 1.0" \
42-
-H "ce-Type: com.example.springevent" \
43-
-H "ce-Source: spring.io/spring-event" \
4443
-H "Content-Type: application/cloudevents+json" \
45-
-H "ce-Id: 0001" \
4644
-d '{
4745
"specversion" : "1.0",
4846
"type" : "org.springframework",
@@ -57,15 +55,65 @@ curl -w'\n' localhost:8080/asString \
5755
}'
5856
----
5957

60-
#### Function as message handler (e.g., Kafka, RabbitMQ etc)
58+
### Function as message handler (e.g., Kafka, RabbitMQ etc)
59+
60+
Streaming support for Apache Kafka and RabbitMQ is provided via https://spring.io/projects/spring-cloud-stream[Spring Cloud Stream] framework.
61+
In fact we're only mentioning Apache Kafka and RabbitMQ here as an example.
62+
Streaming support is automatically provided for any existing binders (e.g., Solace, Google PubSub, Amazon Kinesis and many more).
63+
Please see project page for for additional details on available binders.
64+
65+
Binders are components of Spring Cloud Stream responsible to bind user code (e.g., java function) to message broker destinations, so execution
66+
is triggered by messages posted to the broker destination and results of execution are sent back to the broker destinations. Binders also provide
67+
support for _consumer groups_, _partitioning_ and many other features. For more information on Spring Cloud Stream, Binders and available features
68+
please visit our https://docs.spring.io/spring-cloud-stream/docs/3.1.0-SNAPSHOT/reference/html/[documentation page].
69+
70+
*RabbitMQ*
71+
By simply declaring the following dependency
72+
[source, xml]
73+
----
74+
<dependency>
75+
<groupId>org.springframework.cloud</groupId>
76+
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
77+
<version>3.1.0-SNAPSHOT</version>
78+
</dependency>
79+
----
80+
. . . any function can now act as message handler bound to RabitMQ message broker. All you need to do is identify which function you intend to bind
81+
by identifying it via `spring.cloud.function.definition` property.
82+
[source, text]
83+
----
84+
--spring.cloud.function.definition=asPOJOMessage
85+
----
86+
87+
See link:src/main/resources/application.properties[application.properties] for more details.
88+
89+
Assuming RabbitMQ broker is running on localhost:default_port, start the application and navigate to
90+
http://localhost:15672/#/exchanges[RabbitMQ Dashboard]. Select `asPOJOMessage-in-0` exchange and:
91+
92+
. . . post a binary-mode message by filling all the required Cloud Events headers and posting `data` element as _payload_ (see the screenshot below).
93+
94+
image::images\rabbit-send-binary.png[]
6195

62-
Streaming support for Kafka and Rabbit is provided via Spring Cloud Stream framework (link). In fact we're only mentioning Kafka and Rabbit here as an example.
63-
Streaming support is automatically provided for any existing binders (e.g., Solace, GCP, AWS etc) (link)
64-
Binders are components of SCSt responsible to bind user code (e.g., function) to broker destinations so execution is triggered
65-
by messages on broker destination and results of execution are sent to broker destinations. Binders also provide support consumer
66-
groups and partitioning for both Kafka and RabbitMQ messaging systems.
96+
. . . post a structured-mode message by filling `contentType` header to the value of `application/cloudevents+json` while providing the
97+
entire structure of Cloud Event message as _payload_ (see the screenshot below).
98+
99+
[source, json]
100+
----
101+
{
102+
"specversion" : "1.0",
103+
"type" : "org.springframework",
104+
"source" : "https://spring.io/",
105+
"id" : "A234-1234-1234",
106+
"datacontenttype" : "application/json",
107+
"data" : {
108+
"version" : "1.0",
109+
"releaseName" : "Spring Framework",
110+
"releaseDate" : "24-03-2004"
111+
}
112+
}
113+
----
67114

115+
image::images\rabbit-send-structured.png[]
68116

69-
#### Function invocation via RSocket
117+
### Function invocation via RSocket
70118

71119
TBD
Loading
Loading

spring-cloud-function-samples/function-sample-cloudevent/pom.xml

+5-5
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@
4545
<!-- end RSocket -->
4646

4747
<!-- RabbitMQ - only needed if you intend to invoke via RabbitMQ -->
48-
<!-- <dependency> -->
49-
<!-- <groupId>org.springframework.cloud</groupId> -->
50-
<!-- <artifactId>spring-cloud-stream-binder-rabbit</artifactId> -->
51-
<!-- <version>3.1.0-SNAPSHOT</version> -->
52-
<!-- </dependency> -->
48+
<dependency>
49+
<groupId>org.springframework.cloud</groupId>
50+
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
51+
<version>3.1.0-SNAPSHOT</version>
52+
</dependency>
5353
<!-- end RabbitMQ -->
5454

5555
<!-- Kafka - only needed if you intend to invoke via RabbitMQ -->
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1+
spring.cloud.function.definition=asPOJOMessage

spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void testAsStracturalFormatToPOJO() throws Exception {
152152
" \"releaseDate\" : \"24-03-2004\"\n" +
153153
" }\n" +
154154
"}";
155-
155+
System.out.println(payload);
156156
HttpHeaders headers = new HttpHeaders();
157157
headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));
158158

0 commit comments

Comments
 (0)