Skip to content

Commit ee3868d

Browse files
committed
GH-422, GH-606 Add support for generating attributes using provider in Consumer
1 parent 6093aef commit ee3868d

File tree

3 files changed

+58
-7
lines changed

3 files changed

+58
-7
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java

+6
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,12 @@ public static String determinePrefixToUse(Message<?> inputMessage) {
286286
}
287287
}
288288

289+
public static CloudEventAttributes generateAttributesWithProvider(MessageHeaders headers, CloudEventAttributesProvider provider) {
290+
CloudEventAttributes attributes = new CloudEventAttributes(headers);
291+
provider.generateDefaultCloudEventHeaders(attributes);
292+
return attributes;
293+
}
294+
289295
public static CloudEventAttributes generateAttributes(Message<?> inputMessage, Object result, String applicationName) {
290296
CloudEventAttributes attributes = new CloudEventAttributes(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage));
291297
if (attributes.isValidCloudEvent()) {

spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java

+39-7
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,28 @@
1616

1717
package io.spring.cloudevent;
1818

19+
import java.net.URI;
20+
import java.util.List;
1921
import java.util.Map;
2022
import java.util.function.Consumer;
2123
import java.util.function.Function;
2224

25+
import org.springframework.beans.factory.annotation.Value;
2326
import org.springframework.boot.SpringApplication;
2427
import org.springframework.boot.autoconfigure.SpringBootApplication;
28+
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
29+
import org.springframework.boot.web.client.RestTemplateBuilder;
2530
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
2631
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
2732
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
33+
import org.springframework.cloud.function.web.util.HeaderUtils;
2834
import org.springframework.context.annotation.Bean;
35+
import org.springframework.http.RequestEntity;
2936
import org.springframework.messaging.Message;
37+
import org.springframework.messaging.MessageHeaders;
3038
import org.springframework.messaging.support.MessageBuilder;
39+
import org.springframework.util.Assert;
40+
import org.springframework.web.client.RestTemplate;
3141

3242
/**
3343
* Sample application that demonstrates how user functions can be triggered by cloud event.
@@ -120,11 +130,33 @@ public Function<SpringReleaseEvent, SpringReleaseEvent> consumeAndProduceCloudEv
120130
};
121131
}
122132

123-
// @Bean
124-
// public Consumer<SpringReleaseEvent> pojoConsumer(CloudEventAttributesProvider provider) {
125-
// return event -> {
126-
//
127-
// provider.generateDefaultCloudEventHeaders(attributes);
128-
// };
129-
// }
133+
@Bean
134+
public Consumer<Message<SpringReleaseEvent>> pojoConsumer(CloudEventAttributesProvider provider, RestTemplateBuilder builder) {
135+
return eventMessage -> {
136+
RequestEntity<SpringReleaseEvent> entity = RequestEntity.post(URI.create("http://foo.com"))
137+
.headers(HeaderUtils.fromMessage(
138+
new MessageHeaders(CloudEventMessageUtils.generateAttributesWithProvider(eventMessage.getHeaders(), provider))))
139+
.body(eventMessage.getPayload());
140+
List<String> sourceHeader = entity.getHeaders().get("ce-source");
141+
List<String> typeHeader = entity.getHeaders().get("ce-type");
142+
Assert.isTrue(sourceHeader.get(0).equals("https://interface21.com/"), "'source' must be https://interface21.com/");
143+
Assert.isTrue(typeHeader.get(0).equals("com.interface21"), "'source' must be com.interface21");
144+
};
145+
}
146+
147+
148+
@Bean
149+
@ConditionalOnExpression("'${K_SINK:}'!=''")
150+
public Consumer<Message<Map<String, Object>>> sink(CloudEventAttributesProvider provider,
151+
RestTemplateBuilder builder, @Value("${K_SINK}") String url) {
152+
RestTemplate client = builder.build();
153+
return eventMessage -> {
154+
RequestEntity<Map<String, Object>> entity = RequestEntity.post(URI.create("http://foo.com"))
155+
.headers(HeaderUtils.fromMessage(
156+
new MessageHeaders(CloudEventMessageUtils.generateAttributesWithProvider(eventMessage.getHeaders(), provider))))
157+
.body(eventMessage.getPayload());
158+
client.exchange(entity, byte[].class);
159+
};
160+
}
161+
130162
}

spring-cloud-function-samples/function-sample-cloudevent/src/test/java/io/spring/cloudevent/CloudeventDemoApplicationRESTTests.java

+13
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.context.annotation.Configuration;
3737
import org.springframework.http.HttpHeaders;
3838
import org.springframework.http.HttpMethod;
39+
import org.springframework.http.HttpStatus;
3940
import org.springframework.http.MediaType;
4041
import org.springframework.http.RequestEntity;
4142
import org.springframework.http.ResponseEntity;
@@ -314,6 +315,18 @@ public void testAsStructuralPojoToPojo() throws Exception {
314315
// .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
315316
}
316317

318+
@Test
319+
public void testPojoConsumer() throws Exception {
320+
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
321+
322+
HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON);
323+
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
324+
325+
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/pojoConsumer"));
326+
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
327+
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
328+
}
329+
317330
private URI constructURI(String path) throws Exception {
318331
return new URI("http://localhost:" + System.getProperty("server.port") + path);
319332
}

0 commit comments

Comments
 (0)