Skip to content

Commit d5c80c4

Browse files
committed
WIP: Add a channel management client
1 parent f112d4e commit d5c80c4

16 files changed

+772
-80
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@
123123
<artifactId>junit-jupiter</artifactId>
124124
<version>${junit.version}</version>
125125
</dependency>
126+
<dependency>
127+
<groupId>org.wiremock</groupId>
128+
<artifactId>wiremock</artifactId>
129+
<version>${wiremock.version}</version>
130+
</dependency>
126131
<dependency>
127132
<groupId>org.slf4j</groupId>
128133
<artifactId>slf4j-simple</artifactId>
@@ -147,6 +152,7 @@
147152
<netty.version>4.1.119.Final</netty.version>
148153
<junit.version>5.12.1</junit.version>
149154
<slf4j.version>1.7.21</slf4j.version>
155+
<wiremock.version>3.12.1</wiremock.version>
150156
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
151157
</properties>
152158

pushy/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@
6767
<artifactId>slf4j-simple</artifactId>
6868
<scope>test</scope>
6969
</dependency>
70+
<dependency>
71+
<groupId>org.wiremock</groupId>
72+
<artifactId>wiremock</artifactId>
73+
<scope>test</scope>
74+
</dependency>
7075
<dependency>
7176
<groupId>io.netty</groupId>
7277
<artifactId>netty-transport-native-epoll</artifactId>
Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
package com.eatthepath.pushy.apns;
2+
3+
import com.eatthepath.json.JsonParser;
4+
import com.eatthepath.json.JsonSerializer;
5+
import com.eatthepath.uuid.FastUUID;
6+
import io.netty.channel.Channel;
7+
import io.netty.handler.codec.http.HttpMethod;
8+
import io.netty.handler.codec.http.HttpResponseStatus;
9+
import io.netty.handler.codec.http2.DefaultHttp2Headers;
10+
import io.netty.handler.codec.http2.EmptyHttp2Headers;
11+
import io.netty.handler.codec.http2.Http2Headers;
12+
import io.netty.util.AsciiString;
13+
import io.netty.util.concurrent.Future;
14+
import io.netty.util.concurrent.GenericFutureListener;
15+
16+
import java.nio.charset.StandardCharsets;
17+
import java.text.ParseException;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.UUID;
22+
import java.util.concurrent.CompletableFuture;
23+
24+
class ApnsChannelManagementClient {
25+
26+
private final ApnsChannelPool channelPool;
27+
28+
private static final AsciiString PATH_PREFIX = AsciiString.of("/1/apps/");
29+
private static final AsciiString SINGLE_CHANNEL_PATH_SUFFIX = AsciiString.of("/channels");
30+
private static final AsciiString ALL_CHANNELS_PATH_SUFFIX = AsciiString.of("/all-channels");
31+
32+
private static final AsciiString CHANNEL_ID_HEADER = AsciiString.of("apns-channel-id");
33+
34+
private static final String CHANNELS_BODY_KEY = "channels";
35+
private static final String MESSAGE_STORAGE_POLICY_BODY_KEY = "message-storage-policy";
36+
private static final String PUSH_TYPE_BODY_KEY = "push-type";
37+
38+
private static final ApnsChannelPoolMetricsListener NO_OP_METRICS_LISTENER = new ApnsChannelPoolMetricsListener() {
39+
40+
@Override
41+
public void handleConnectionAdded() {
42+
}
43+
44+
@Override
45+
public void handleConnectionRemoved() {
46+
}
47+
48+
@Override
49+
public void handleConnectionCreationFailed() {
50+
}
51+
};
52+
53+
private static class SimpleCreateChannelResponse implements CreateChannelResponse {
54+
55+
private final String channelId;
56+
57+
private final int status;
58+
private final UUID requestId;
59+
60+
private SimpleCreateChannelResponse(final String channelId, final int status, final UUID requestId) {
61+
this.channelId = channelId;
62+
this.status = status;
63+
this.requestId = requestId;
64+
}
65+
66+
@Override
67+
public String getChannelId() {
68+
return channelId;
69+
}
70+
71+
@Override
72+
public int getStatus() {
73+
return status;
74+
}
75+
76+
@Override
77+
public UUID getRequestId() {
78+
return requestId;
79+
}
80+
}
81+
82+
private static class SimpleGetChannelConfigurationResponse implements GetChannelConfigurationResponse {
83+
84+
private final MessageStoragePolicy messageStoragePolicy;
85+
86+
private final int status;
87+
private final UUID requestId;
88+
89+
private SimpleGetChannelConfigurationResponse(final MessageStoragePolicy messageStoragePolicy,
90+
final int status,
91+
final UUID requestId) {
92+
93+
this.messageStoragePolicy = messageStoragePolicy;
94+
this.status = status;
95+
this.requestId = requestId;
96+
}
97+
98+
@Override
99+
public MessageStoragePolicy getMessageStoragePolicy() {
100+
return messageStoragePolicy;
101+
}
102+
103+
public int getStatus() {
104+
return status;
105+
}
106+
107+
public UUID getRequestId() {
108+
return requestId;
109+
}
110+
}
111+
112+
private static class SimpleDeleteChannelResponse implements DeleteChannelResponse {
113+
114+
private final int status;
115+
private final UUID requestId;
116+
117+
private SimpleDeleteChannelResponse(final int status, final UUID requestId) {
118+
this.status = status;
119+
this.requestId = requestId;
120+
}
121+
122+
@Override
123+
public int getStatus() {
124+
return status;
125+
}
126+
127+
@Override
128+
public UUID getRequestId() {
129+
return requestId;
130+
}
131+
}
132+
133+
private static class SimpleGetChannelIdsResponse implements GetChannelIdsResponse {
134+
135+
private final List<String> channelIds;
136+
137+
private final int status;
138+
private final UUID requestId;
139+
140+
private SimpleGetChannelIdsResponse(final List<String> channelIds, final int status, final UUID requestId) {
141+
this.channelIds = channelIds;
142+
this.status = status;
143+
this.requestId = requestId;
144+
}
145+
146+
@Override
147+
public List<String> getChannelIds() {
148+
return channelIds;
149+
}
150+
151+
@Override
152+
public int getStatus() {
153+
return status;
154+
}
155+
156+
@Override
157+
public UUID getRequestId() {
158+
return requestId;
159+
}
160+
}
161+
162+
ApnsChannelManagementClient(final ApnsClientConfiguration clientConfiguration, final ApnsClientResources clientResources) {
163+
final ApnsChannelManagementChannelFactory channelFactory =
164+
new ApnsChannelManagementChannelFactory(clientConfiguration, clientResources);
165+
166+
this.channelPool = new ApnsChannelPool(channelFactory,
167+
1,
168+
clientResources.getEventLoopGroup().next(),
169+
NO_OP_METRICS_LISTENER);
170+
}
171+
172+
173+
public CompletableFuture<CreateChannelResponse> createChannel(final String bundleId,
174+
final MessageStoragePolicy messageStoragePolicy) {
175+
176+
return createChannel(bundleId, messageStoragePolicy, null);
177+
}
178+
179+
public CompletableFuture<CreateChannelResponse> createChannel(final String bundleId,
180+
final MessageStoragePolicy messageStoragePolicy,
181+
final UUID apnsRequestId) {
182+
183+
final Map<String, Object> requestBody = new HashMap<>();
184+
requestBody.put(MESSAGE_STORAGE_POLICY_BODY_KEY, messageStoragePolicy.getCode());
185+
requestBody.put(PUSH_TYPE_BODY_KEY, "LiveActivity");
186+
187+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.POST,
188+
EmptyHttp2Headers.INSTANCE,
189+
PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX),
190+
JsonSerializer.writeJsonTextAsString(requestBody),
191+
apnsRequestId);
192+
193+
sendRequest(request);
194+
195+
return request.getResponseFuture()
196+
.thenApply(http2Response -> {
197+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
198+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
199+
200+
if (status.code() == 201) {
201+
return new SimpleCreateChannelResponse(http2Response.getHeaders().get(CHANNEL_ID_HEADER).toString(),
202+
status.code(),
203+
apnsRequestIdFromResponse);
204+
} else {
205+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
206+
}
207+
});
208+
}
209+
210+
public CompletableFuture<GetChannelConfigurationResponse> getChannelConfiguration(final String bundleId,
211+
final String channelId) {
212+
213+
return getChannelConfiguration(bundleId, channelId, null);
214+
}
215+
216+
public CompletableFuture<GetChannelConfigurationResponse> getChannelConfiguration(final String bundleId,
217+
final String channelId,
218+
final UUID apnsRequestId) {
219+
220+
final Http2Headers headers = new DefaultHttp2Headers();
221+
headers.add(CHANNEL_ID_HEADER, channelId);
222+
223+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.GET,
224+
headers,
225+
PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX),
226+
null,
227+
apnsRequestId);
228+
229+
sendRequest(request);
230+
231+
return request.getResponseFuture()
232+
.thenApply(http2Response -> {
233+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
234+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
235+
236+
if (status.code() == 200) {
237+
try {
238+
final Map<String, Object> parsedResponse =
239+
new JsonParser().parseJsonObject(new String(http2Response.getData(), StandardCharsets.UTF_8));
240+
241+
final MessageStoragePolicy messageStoragePolicy =
242+
MessageStoragePolicy.getFromCode(((Long) parsedResponse.get(MESSAGE_STORAGE_POLICY_BODY_KEY)).intValue());
243+
244+
return new SimpleGetChannelConfigurationResponse(messageStoragePolicy, status.code(), apnsRequestIdFromResponse);
245+
} catch (final ParseException | IllegalArgumentException e) {
246+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, e);
247+
}
248+
} else {
249+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
250+
}
251+
});
252+
}
253+
254+
public CompletableFuture<DeleteChannelResponse> deleteChannel(final String bundleId, final String channelId) {
255+
return deleteChannel(bundleId, channelId, null);
256+
}
257+
258+
public CompletableFuture<DeleteChannelResponse> deleteChannel(final String bundleId,
259+
final String channelId,
260+
final UUID apnsRequestId) {
261+
262+
final Http2Headers headers = new DefaultHttp2Headers();
263+
headers.add(CHANNEL_ID_HEADER, channelId);
264+
265+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.DELETE,
266+
headers,
267+
PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX),
268+
null,
269+
apnsRequestId);
270+
271+
sendRequest(request);
272+
273+
return request.getResponseFuture()
274+
.thenApply(http2Response -> {
275+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
276+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
277+
278+
if (status.code() == 204) {
279+
return new SimpleDeleteChannelResponse(status.code(), apnsRequestIdFromResponse);
280+
} else {
281+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
282+
}
283+
});
284+
}
285+
286+
public CompletableFuture<GetChannelIdsResponse> getChannelIds(final String bundleId) {
287+
return getChannelIds(bundleId, null);
288+
}
289+
290+
public CompletableFuture<GetChannelIdsResponse> getChannelIds(final String bundleId, final UUID apnsRequestId) {
291+
292+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.GET,
293+
EmptyHttp2Headers.INSTANCE,
294+
PATH_PREFIX.concat(bundleId).concat(ALL_CHANNELS_PATH_SUFFIX),
295+
null,
296+
apnsRequestId);
297+
298+
sendRequest(request);
299+
300+
return request.getResponseFuture()
301+
.thenApply(http2Response -> {
302+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
303+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
304+
305+
if (status.code() == 200) {
306+
try {
307+
final Map<String, Object> parsedResponse =
308+
new JsonParser().parseJsonObject(new String(http2Response.getData(), StandardCharsets.UTF_8));
309+
310+
@SuppressWarnings("unchecked") final List<String> channelIds =
311+
(List<String>) parsedResponse.get(CHANNELS_BODY_KEY);
312+
313+
return new SimpleGetChannelIdsResponse(channelIds, status.code(), apnsRequestIdFromResponse);
314+
} catch (final ParseException | IllegalArgumentException e) {
315+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, e);
316+
}
317+
} else {
318+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
319+
}
320+
});
321+
}
322+
323+
private void sendRequest(final ApnsChannelManagementRequest request) {
324+
this.channelPool.acquire().addListener((GenericFutureListener<Future<Channel>>) acquireFuture -> {
325+
if (acquireFuture.isSuccess()) {
326+
final Channel channel = acquireFuture.getNow();
327+
328+
channel.writeAndFlush(request);
329+
channelPool.release(channel);
330+
} else {
331+
request.getResponseFuture().completeExceptionally(acquireFuture.cause());
332+
}
333+
});
334+
}
335+
336+
private static UUID getApnsRequestId(final Http2Headers headers) {
337+
final CharSequence uuidSequence = headers.get(ApnsChannelManagementHandler.APNS_REQUEST_ID_HEADER);
338+
339+
try {
340+
return uuidSequence != null ? FastUUID.parseUUID(uuidSequence) : null;
341+
} catch (final IllegalArgumentException e) {
342+
return null;
343+
}
344+
}
345+
346+
Future<Void> close() {
347+
return channelPool.close();
348+
}
349+
}

0 commit comments

Comments
 (0)