Skip to content

Commit 29b121b

Browse files
committed
spring-cloudGH-422 Initial support for CloudEvents
Added initial implementation of MessageConverter At the moment there seem that MessageConverter(s) would be the only thing needed to integrate Cloud Events with various elements of Spring
1 parent b90a54d commit 29b121b

File tree

4 files changed

+82
-7
lines changed

4 files changed

+82
-7
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,9 @@ else if (value instanceof Mono) {
688688
inputValue = this.extractValueFromOriginalValueHolderIfNecessary(value);
689689
}
690690

691+
if (inputValue instanceof Message && !this.isInputTypeMessage()) {
692+
inputValue = ((Message) inputValue).getPayload();
693+
}
691694
Object result = ((Function) this.target).apply(inputValue);
692695

693696
return value instanceof OriginalMessageHolder
@@ -985,7 +988,9 @@ private Object convertInputMessageIfNecessary(Message message, Type type) {
985988
convertedInput = message;
986989
}
987990
else {
988-
convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();
991+
if (!(convertedInput instanceof Message)) {
992+
convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();
993+
}
989994
}
990995
}
991996
return convertedInput;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.function.context.config;
18+
19+
import java.lang.reflect.Type;
20+
import java.util.Collection;
21+
import java.util.Map;
22+
23+
import org.springframework.cloud.function.json.JsonMapper;
24+
import org.springframework.lang.Nullable;
25+
import org.springframework.messaging.Message;
26+
import org.springframework.messaging.MessageHeaders;
27+
import org.springframework.messaging.converter.MessageConverter;
28+
import org.springframework.messaging.support.MessageBuilder;
29+
import org.springframework.util.MimeType;
30+
31+
/**
32+
* Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the
33+
* actual conversion via {@link JsonMapper} instance.
34+
*
35+
* @author Oleg Zhurakousky
36+
*
37+
* @since 3.1.0
38+
*/
39+
public class CloudEventJsonMessageConverter extends JsonMessageConverter {
40+
41+
private final JsonMapper mapper;
42+
43+
public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
44+
super(jsonMapper, new MimeType("application", "cloudevents+json"));
45+
this.mapper = jsonMapper;
46+
}
47+
48+
@Override
49+
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
50+
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
51+
String jsonString = (String) message.getPayload();
52+
Map<String, Object> mapEvent = this.mapper.fromJson(jsonString, Map.class);
53+
Object payload = this.mapper.fromJson(this.mapper.toJson(mapEvent.get("data")), convertToType);
54+
mapEvent.remove("data");
55+
return MessageBuilder.withPayload(payload).copyHeaders(mapEvent).build();
56+
}
57+
58+
@Override
59+
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers,
60+
@Nullable Object conversionHint) {
61+
throw new UnsupportedOperationException("Temporarily not supported as this converter is work in progress");
62+
}
63+
64+
}

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ public FunctionRegistry functionCatalog(List<MessageConverter> messageConverters
9797
.collect(Collectors.toList());
9898

9999
mcList.add(new JsonMessageConverter(jsonMapper));
100+
mcList.add(new CloudEventJsonMessageConverter(jsonMapper));
100101
mcList.add(new ByteArrayMessageConverter());
101102
mcList.add(new StringMessageConverter());
102103
mcList.add(new PrimitiveTypesFromStringMessageConverter(conversionService));
103104

104-
105105
if (!CollectionUtils.isEmpty(mcList)) {
106106
messageConverter = new SmartCompositeMessageConverter(mcList);
107107
}

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/JsonMessageConverter.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,20 @@ protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @
7878
return message.getPayload();
7979
}
8080
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
81-
try {
82-
return this.jsonMapper.fromJson(message.getPayload(), convertToType);
81+
if (targetClass == byte[].class && message.getPayload() instanceof String) {
82+
return ((String) message.getPayload()).getBytes(StandardCharsets.UTF_8);
8383
}
84-
catch (Exception e) {
85-
if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) {
86-
return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
84+
else {
85+
try {
86+
return this.jsonMapper.fromJson(message.getPayload(), convertToType);
87+
}
88+
catch (Exception e) {
89+
if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) {
90+
return new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
91+
}
8792
}
8893
}
94+
8995
return null;
9096
}
9197

0 commit comments

Comments
 (0)