Skip to content

Commit a706b9b

Browse files
authored
Add Unit Test and Docs for GcsStreamingMessageSource Comparator (spring-attic#2119)
Followup to spring-attic#2116; this adds an additional unit test and also mentions it in the docs.
1 parent 0fe079c commit a706b9b

File tree

3 files changed

+83
-24
lines changed

3 files changed

+83
-24
lines changed

docs/src/main/asciidoc/spring-integration-storage.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public MessageSource<InputStream> streamingAdapter(Storage gcs) {
7474
}
7575
----
7676

77+
If you would like to process the files in your bucket in a specific order, you may pass in a `Comparator<BlobInfo>` to the constructor `GcsStreamingMessageSource` to sort the files being processed.
78+
7779
==== Outbound channel adapter
7880

7981
The outbound channel adapter allows files to be written to Google Cloud Storage.

spring-cloud-gcp-storage/src/main/java/org/springframework/cloud/gcp/storage/integration/inbound/GcsStreamingMessageSource.java

+8
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ public GcsStreamingMessageSource(RemoteFileTemplate<BlobInfo> template) {
4444
this(template, null);
4545
}
4646

47+
/**
48+
* Creates a {@link GcsStreamingMessageSource} with a {@code comparator} which controls the order
49+
* that files are processed in.
50+
* @param template template making remote file calls to Google Cloud Storage
51+
* @param comparator defines the order that files should be processed based on {@link BlobInfo}.
52+
*
53+
* @since 1.2
54+
*/
4755
public GcsStreamingMessageSource(RemoteFileTemplate<BlobInfo> template, Comparator<BlobInfo> comparator) {
4856
super(template, comparator);
4957
doSetFilter(new GcsPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "gcsStreamingMessageSource"));

spring-cloud-gcp-storage/src/test/java/org/springframework/cloud/gcp/storage/integration/inbound/GcsStreamingMessageSourceTests.java

+73-24
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.cloud.gcp.storage.integration.inbound;
1818

1919
import java.io.InputStream;
20+
import java.util.Comparator;
2021
import java.util.stream.Collectors;
2122
import java.util.stream.Stream;
2223

@@ -60,24 +61,58 @@
6061
public class GcsStreamingMessageSourceTests {
6162

6263
@Autowired
63-
private PollableChannel gcsChannel;
64+
private PollableChannel unsortedChannel;
65+
66+
@Autowired
67+
private PollableChannel sortedChannel;
6468

6569
@Test
6670
public void testInboundStreamingChannelAdapter() {
67-
Message<?> message = this.gcsChannel.receive(5000);
71+
Message<?> message = this.unsortedChannel.receive(5000);
72+
assertThat(message).isNotNull();
73+
assertThat(message.getPayload()).isInstanceOf(InputStream.class);
74+
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("gamma");
75+
76+
message = this.unsortedChannel.receive(5000);
77+
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("beta");
78+
assertThat(message.getPayload()).isInstanceOf(InputStream.class);
79+
80+
message = this.unsortedChannel.receive(5000);
81+
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("alpha/alpha");
82+
assertThat(message.getPayload()).isInstanceOf(InputStream.class);
6883

84+
message = this.unsortedChannel.receive(10);
85+
assertThat(message).isNull();
86+
}
87+
88+
@Test
89+
public void testSortedInboundChannelAdapter() {
90+
// This uses the channel adapter with a custom comparator.
91+
// Files will be processed in ascending order by name: alpha/alpha, beta, gamma
92+
Message<?> message = this.sortedChannel.receive(5000);
6993
assertThat(message).isNotNull();
7094
assertThat(message.getPayload()).isInstanceOf(InputStream.class);
71-
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("folder1/gcsfilename");
95+
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("alpha/alpha");
7296

73-
message = this.gcsChannel.receive(5000);
74-
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("secondfilename");
97+
message = this.sortedChannel.receive(5000);
98+
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("beta");
7599
assertThat(message.getPayload()).isInstanceOf(InputStream.class);
76100

77-
message = this.gcsChannel.receive(10);
101+
message = this.sortedChannel.receive(5000);
102+
assertThat(message.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("gamma");
103+
assertThat(message.getPayload()).isInstanceOf(InputStream.class);
104+
105+
message = this.sortedChannel.receive(10);
78106
assertThat(message).isNull();
79107
}
80108

109+
private static Blob createBlob(String bucket, String name) {
110+
Blob blob = mock(Blob.class);
111+
willAnswer((invocationOnMock) -> bucket).given(blob).getBucket();
112+
willAnswer((invocationOnMock) -> name).given(blob).getName();
113+
return blob;
114+
}
115+
81116
/**
82117
* Spring config for the tests.
83118
*/
@@ -87,35 +122,30 @@ public static class Config {
87122

88123
@Bean
89124
public Storage gcsClient() {
90-
Blob blob1 = mock(Blob.class);
91-
Blob blob2 = mock(Blob.class);
92-
93-
willAnswer((invocationOnMock) -> "gcsbucket").given(blob1).getBucket();
94-
willAnswer((invocationOnMock) -> "folder1/gcsfilename").given(blob1).getName();
95-
willAnswer((invocationOnMock) -> "gcsbucket").given(blob2).getBucket();
96-
willAnswer((invocationOnMock) -> "secondfilename").given(blob2).getName();
97-
98125
Storage gcs = mock(Storage.class);
99126

100127
willAnswer((invocationOnMock) ->
101128
new PageImpl<>(null, null,
102-
Stream.of(blob1, blob2)
129+
Stream.of(
130+
createBlob("gcsbucket", "gamma"),
131+
createBlob("gcsbucket", "beta"),
132+
createBlob("gcsbucket", "alpha/alpha"))
103133
.collect(Collectors.toList())))
104134
.given(gcs).list(eq("gcsbucket"));
105135

106-
ReadChannel channel1 = mock(ReadChannel.class);
107-
ReadChannel channel2 = mock(ReadChannel.class);
108-
willAnswer((invocationOnMock) -> channel1)
109-
.given(gcs).reader(eq("gcsbucket"), eq("folder1/gcsfilename"));
110-
willAnswer((invocationOnMock) -> channel2)
111-
.given(gcs).reader(eq("gcsbucket"), eq("secondfilename"));
136+
willAnswer((invocationOnMock) -> mock(ReadChannel.class))
137+
.given(gcs).reader(eq("gcsbucket"), eq("alpha/alpha"));
138+
willAnswer((invocationOnMock) -> mock(ReadChannel.class))
139+
.given(gcs).reader(eq("gcsbucket"), eq("beta"));
140+
willAnswer((invocationOnMock) -> mock(ReadChannel.class))
141+
.given(gcs).reader(eq("gcsbucket"), eq("gamma"));
112142

113143
return gcs;
114144
}
115145

116146
@Bean
117-
@InboundChannelAdapter(value = "gcsChannel", poller = @Poller(fixedDelay = "100"))
118-
public MessageSource<InputStream> adapter(Storage gcs) {
147+
@InboundChannelAdapter(value = "unsortedChannel", poller = @Poller(fixedDelay = "100"))
148+
public MessageSource<InputStream> unsortedChannelAdapter(Storage gcs) {
119149
GcsStreamingMessageSource adapter =
120150
new GcsStreamingMessageSource(new RemoteFileTemplate<>(new GcsSessionFactory(gcs)));
121151
adapter.setRemoteDirectory("gcsbucket");
@@ -125,7 +155,26 @@ public MessageSource<InputStream> adapter(Storage gcs) {
125155
}
126156

127157
@Bean
128-
public PollableChannel gcsChannel() {
158+
@InboundChannelAdapter(value = "sortedChannel", poller = @Poller(fixedDelay = "100"))
159+
public MessageSource<InputStream> sortedChannelAdapter(Storage gcs) {
160+
GcsStreamingMessageSource adapter =
161+
new GcsStreamingMessageSource(
162+
new RemoteFileTemplate<>(new GcsSessionFactory(gcs)),
163+
Comparator.comparing(blob -> blob.getName()));
164+
165+
adapter.setRemoteDirectory("gcsbucket");
166+
adapter.setFilter(new AcceptOnceFileListFilter<>());
167+
168+
return adapter;
169+
}
170+
171+
@Bean
172+
public PollableChannel unsortedChannel() {
173+
return new QueueChannel();
174+
}
175+
176+
@Bean
177+
public PollableChannel sortedChannel() {
129178
return new QueueChannel();
130179
}
131180
}

0 commit comments

Comments
 (0)