Skip to content

Commit 2d28b57

Browse files
PubsubTemplate example
Resolving dependency issues, simplifying it Reformating code according to Spring code style
1 parent dd44846 commit 2d28b57

File tree

23 files changed

+327
-241
lines changed

23 files changed

+327
-241
lines changed

spring-cloud-gcp-core/pom.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<project xmlns="http://maven.apache.org/POM/4.0.0"
3-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xmlns="http://maven.apache.org/POM/4.0.0"
44
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
55
<modelVersion>4.0.0</modelVersion>
66
<parent>
@@ -21,8 +21,8 @@
2121
<artifactId>commons-logging</artifactId>
2222
</dependency>
2323
<dependency>
24-
<groupId>com.google.auth</groupId>
25-
<artifactId>google-auth-library-oauth2-http</artifactId>
24+
<groupId>com.google.cloud</groupId>
25+
<artifactId>google-cloud</artifactId>
2626
</dependency>
2727
</dependencies>
2828
</project>

spring-cloud-gcp-core/src/main/java/org/springframework/cloud/gcp/core/autoconfig/CredentialsAutoConfiguration.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ public void setEnvironment(Environment environment) {
4545
}
4646

4747
@Bean
48-
public GoogleCredentials googleCredentials(ResourceLoader resourceLoader) throws Exception{
48+
public GoogleCredentials googleCredentials(ResourceLoader resourceLoader)
49+
throws Exception {
4950
if (!StringUtils.isEmpty(this.environment.getProperty(AUTH_LOCATION_KEY))) {
50-
return GoogleCredentials.fromStream(
51-
new FileInputStream(resourceLoader
52-
.getResource(this.environment.getProperty(AUTH_LOCATION_KEY)).getFile()));
51+
return GoogleCredentials.fromStream(new FileInputStream(resourceLoader
52+
.getResource(this.environment.getProperty(AUTH_LOCATION_KEY))
53+
.getFile()));
5354
}
5455
else {
5556
return GoogleCredentials.getApplicationDefault();

spring-cloud-gcp-core/src/main/java/org/springframework/cloud/gcp/support/GcpHeaders.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
*/
2424
public abstract class GcpHeaders {
2525

26-
private static final String PREFIX = "gcp_";
26+
private static final String PREFIX = "gcp_";
2727

28-
public static final String CONSUMER = PREFIX + "pub_sub_consumer";
28+
public static final String CONSUMER = PREFIX + "pub_sub_consumer";
2929
}

spring-cloud-gcp-dependencies/pom.xml

+2-49
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
<description>Spring Cloud GCP Dependencies</description>
1717

1818
<properties>
19-
<google-cloud-java.version>0.17.2-alpha</google-cloud-java.version>
20-
<grpc.version>1.2.0</grpc.version>
21-
<google-pubsub.api>0.1.5</google-pubsub.api>
19+
<google-cloud-java.version>0.18.0-alpha</google-cloud-java.version>
2220
</properties>
2321

2422
<dependencyManagement>
@@ -33,52 +31,7 @@
3331
<artifactId>google-cloud</artifactId>
3432
<version>${google-cloud-java.version}</version>
3533
</dependency>
36-
<dependency>
37-
<groupId>io.netty</groupId>
38-
<artifactId>netty-tcnative-boringssl-static</artifactId>
39-
<version>1.1.33.Fork19</version>
40-
</dependency>
41-
<dependency>
42-
<groupId>org.springframework.cloud</groupId>
43-
<artifactId>spring-cloud-gcp-pubsub</artifactId>
44-
<version>${project.version}</version>
45-
</dependency>
46-
<dependency>
47-
<groupId>com.google.api.grpc</groupId>
48-
<artifactId>grpc-google-cloud-pubsub-v1</artifactId>
49-
<version>${google-pubsub.api}</version>
50-
<exclusions>
51-
<exclusion>
52-
<groupId>io.grpc</groupId>
53-
<artifactId>grpc-all</artifactId>
54-
</exclusion>
55-
</exclusions>
56-
</dependency>
57-
<dependency>
58-
<groupId>com.google.auth</groupId>
59-
<artifactId>google-auth-library-oauth2-http</artifactId>
60-
<version>0.6.0</version>
61-
</dependency>
62-
<dependency>
63-
<groupId>com.google.cloud</groupId>
64-
<artifactId>google-cloud-storage</artifactId>
65-
<version>1.0.0</version>
66-
</dependency>
67-
<dependency>
68-
<groupId>io.grpc</groupId>
69-
<artifactId>grpc-netty</artifactId>
70-
<version>${grpc.version}</version>
71-
</dependency>
72-
<dependency>
73-
<groupId>io.grpc</groupId>
74-
<artifactId>grpc-stub</artifactId>
75-
<version>${grpc.version}</version>
76-
</dependency>
77-
<dependency>
78-
<groupId>io.grpc</groupId>
79-
<artifactId>grpc-auth</artifactId>
80-
<version>${grpc.version}</version>
81-
</dependency>
34+
8235
<dependency>
8336
<groupId>io.projectreactor</groupId>
8437
<artifactId>reactor-core</artifactId>

spring-cloud-gcp-pubsub/.jdk8

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
# This file makes the animal-sniffer plugin consider the project's code is Java 8.
1+
# This file makes the animal-sniffer plugin consider the project's code is Java 8.

spring-cloud-gcp-pubsub/pom.xml

+5-11
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@
1313
<description>Spring Cloud GCP PubSub Module</description>
1414

1515
<dependencies>
16-
<dependency>
17-
<groupId>com.google.api.grpc</groupId>
18-
<artifactId>grpc-google-cloud-pubsub-v1</artifactId>
19-
</dependency>
16+
2017
<dependency>
2118
<groupId>com.google.cloud</groupId>
2219
<artifactId>google-cloud</artifactId>
@@ -42,20 +39,17 @@
4239
<artifactId>spring-test</artifactId>
4340
<scope>test</scope>
4441
</dependency>
45-
<!-- SI channel adapters -->
42+
43+
<!-- SI channel adapter dependencies -->
4644
<dependency>
4745
<groupId>org.springframework.integration</groupId>
4846
<artifactId>spring-integration-core</artifactId>
47+
<optional>true</optional>
4948
</dependency>
50-
<dependency>
51-
<groupId>com.google.api</groupId>
52-
<artifactId>api-common</artifactId>
53-
<!-- This shouldn't be required here. -->
54-
<version>1.0.0</version>
55-
</dependency>
5649
<dependency>
5750
<groupId>org.springframework.cloud</groupId>
5851
<artifactId>spring-cloud-gcp-core</artifactId>
52+
<optional>true</optional>
5953
</dependency>
6054
</dependencies>
6155
</project>

spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/BlockingPubSubSender.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@
2323
/**
2424
* @author Vinicius Carvalho
2525
*/
26-
public interface BlockingPubSubSender extends PubSubSender<String, Iterable<String>, Collection<? extends Message<?>>> {
26+
public interface BlockingPubSubSender
27+
extends PubSubSender<String, Iterable<String>, Collection<? extends Message<?>>> {
2728
}

spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/DefaultPubSubSender.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public Mono<String> send(String destination, Object payload) {
4141
}
4242

4343
@Override
44-
public Mono<String> send(String destination, Object payload, Map<String, Object> headers) {
44+
public Mono<String> send(String destination, Object payload,
45+
Map<String, Object> headers) {
4546
throw new UnsupportedOperationException("not yet implemented");
4647
}
4748

@@ -63,7 +64,6 @@ protected void doStart() {
6364
protected void doStop() {
6465
}
6566

66-
6767
class DefaultBlockingPubSubSender implements BlockingPubSubSender {
6868

6969
@Override
@@ -72,7 +72,8 @@ public String send(String destination, Object payload) {
7272
}
7373

7474
@Override
75-
public String send(String destination, Object payload, Map<String, Object> headers) {
75+
public String send(String destination, Object payload,
76+
Map<String, Object> headers) {
7677
return DefaultPubSubSender.this.send(destination, payload, headers).block();
7778
}
7879

@@ -82,8 +83,10 @@ public String send(String destination, Message<?> message) {
8283
}
8384

8485
@Override
85-
public Iterable<String> sendAll(String destination, Collection<? extends Message<?>> messages) {
86-
return DefaultPubSubSender.this.sendAll(destination, Flux.fromIterable(messages)).toIterable();
86+
public Iterable<String> sendAll(String destination,
87+
Collection<? extends Message<?>> messages) {
88+
return DefaultPubSubSender.this
89+
.sendAll(destination, Flux.fromIterable(messages)).toIterable();
8790
}
8891
}
8992
}

spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/PubSubSender.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public interface PubSubSender<S, M, F> {
2727

2828
S send(String destination, Object payload);
2929

30-
S send(String destination, Object payload, Map<String,Object> headers);
30+
S send(String destination, Object payload, Map<String, Object> headers);
3131

3232
S send(String destination, Message<?> message);
3333

spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/ReactivePubSubSender.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@
2424
/**
2525
* @author Vinicius Carvalho
2626
*/
27-
public interface ReactivePubSubSender extends PubSubSender<Mono<String>, Flux<String>, Flux<? extends Message<?>>> {
27+
public interface ReactivePubSubSender
28+
extends PubSubSender<Mono<String>, Flux<String>, Flux<? extends Message<?>>> {
2829
}

spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/converters/PubSubHeaderMapper.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.LinkedHashMap;
2222
import java.util.Map;
2323

24-
2524
import org.springframework.beans.factory.InitializingBean;
2625
import org.springframework.cloud.gcp.pubsub.converters.support.BooleanConverter;
2726
import org.springframework.cloud.gcp.pubsub.converters.support.DateConverter;
@@ -35,12 +34,13 @@
3534
/**
3635
* @author Vinicius Carvalho
3736
*/
38-
public class PubSubHeaderMapper implements HeaderMapper<Map<String,String>>, InitializingBean{
39-
40-
private String datePattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
37+
public class PubSubHeaderMapper
38+
implements HeaderMapper<Map<String, String>>, InitializingBean {
4139

4240
private final Map<Class<?>, HeaderConverter<?>> converterMap = new LinkedHashMap<>();
4341

42+
private String datePattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
43+
4444
public String getDatePattern() {
4545
return datePattern;
4646
}
@@ -55,27 +55,27 @@ public void afterPropertiesSet() throws Exception {
5555
converterMap.put(Integer.class, new IntegerConverter());
5656
converterMap.put(Long.class, new LongConverter());
5757
converterMap.put(Float.class, new FloatConverter());
58-
converterMap.put(Double.class,new DoubleConverter());
58+
converterMap.put(Double.class, new DoubleConverter());
5959
converterMap.put(Date.class, new DateConverter(datePattern));
6060
}
6161

6262
@Override
6363
public void fromHeaders(MessageHeaders headers, Map<String, String> target) {
64-
for (Map.Entry<String,Object> entry : headers.entrySet()) {
65-
target.put(entry.getKey(),encode(entry.getValue()));
64+
for (Map.Entry<String, Object> entry : headers.entrySet()) {
65+
target.put(entry.getKey(), encode(entry.getValue()));
6666
}
6767
}
6868

6969
@Override
7070
public MessageHeaders toHeaders(Map<String, String> source) {
7171
Map<String, Object> headerMap = new HashMap<>();
72-
for (Map.Entry<String,String> entry : source.entrySet()) {
73-
headerMap.put(entry.getKey(),decode(entry.getValue()));
72+
for (Map.Entry<String, String> entry : source.entrySet()) {
73+
headerMap.put(entry.getKey(), decode(entry.getValue()));
7474
}
7575
return new MessageHeaders(headerMap);
7676
}
7777

78-
@SuppressWarnings({"rawtypes", "unchecked"})
78+
@SuppressWarnings({ "rawtypes", "unchecked" })
7979
private String encode(Object value) {
8080
if (value instanceof String) {
8181
return (String) value;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2017 original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.springframework.cloud.gcp.pubsub.core;
19+
20+
import org.springframework.messaging.Message;
21+
22+
/**
23+
* @author Vinicius Carvalho
24+
*/
25+
public interface PubSubOperations {
26+
27+
String send(String topic, Message message);
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2017 original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.springframework.cloud.gcp.pubsub.core;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
23+
import com.google.api.gax.grpc.ExecutorProvider;
24+
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
25+
import com.google.auth.oauth2.GoogleCredentials;
26+
import com.google.cloud.pubsub.spi.v1.Publisher;
27+
import com.google.protobuf.ByteString;
28+
import com.google.pubsub.v1.PubsubMessage;
29+
import com.google.pubsub.v1.TopicName;
30+
31+
import org.springframework.beans.factory.InitializingBean;
32+
import org.springframework.cloud.gcp.pubsub.integration.converters.SimpleMessageConverter;
33+
import org.springframework.messaging.Message;
34+
import org.springframework.messaging.converter.MessageConverter;
35+
36+
/**
37+
* @author Vinicius Carvalho
38+
*/
39+
public class PubSubTemplate implements PubSubOperations, InitializingBean {
40+
41+
private final String projectId;
42+
43+
private final GoogleCredentials credentials;
44+
45+
private ConcurrentHashMap<String, Publisher> publishers = new ConcurrentHashMap<>();
46+
47+
private ExecutorProvider executorProvider;
48+
49+
private MessageConverter converter;
50+
51+
private int concurrentProducers = 1;
52+
53+
public PubSubTemplate(GoogleCredentials credentials, String projectId) {
54+
this.projectId = projectId;
55+
this.credentials = credentials;
56+
this.executorProvider = InstantiatingExecutorProvider.newBuilder()
57+
.setExecutorThreadCount(concurrentProducers).build();
58+
this.converter = new SimpleMessageConverter();
59+
}
60+
61+
@Override
62+
public String send(final String topic, Message message) throws RuntimeException {
63+
64+
Publisher publisher = publishers.computeIfAbsent(topic, s -> {
65+
try {
66+
return Publisher.defaultBuilder(TopicName.create(projectId, topic))
67+
.setExecutorProvider(executorProvider).build();
68+
}
69+
catch (IOException e) {
70+
e.printStackTrace();
71+
}
72+
return null;
73+
});
74+
75+
try {
76+
return publisher.publish(PubsubMessage.newBuilder()
77+
.setData(ByteString
78+
.copyFrom(message.getPayload().toString().getBytes()))
79+
.build()).get();
80+
}
81+
catch (Exception e) {
82+
throw new RuntimeException(e);
83+
}
84+
}
85+
86+
@Override
87+
public void afterPropertiesSet() throws Exception {
88+
89+
}
90+
}

0 commit comments

Comments
 (0)