Skip to content

Commit bed42fb

Browse files
committed
WIP: Add a channel management client
1 parent d31b343 commit bed42fb

12 files changed

+479
-25
lines changed
Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
package com.eatthepath.pushy.apns;
2+
3+
import com.eatthepath.json.JsonParser;
4+
import com.eatthepath.json.JsonSerializer;
5+
import com.eatthepath.uuid.FastUUID;
6+
import io.netty.channel.Channel;
7+
import io.netty.handler.codec.http.HttpMethod;
8+
import io.netty.handler.codec.http.HttpResponseStatus;
9+
import io.netty.handler.codec.http2.DefaultHttp2Headers;
10+
import io.netty.handler.codec.http2.EmptyHttp2Headers;
11+
import io.netty.handler.codec.http2.Http2Headers;
12+
import io.netty.util.AsciiString;
13+
import io.netty.util.concurrent.Future;
14+
import io.netty.util.concurrent.GenericFutureListener;
15+
16+
import java.nio.charset.StandardCharsets;
17+
import java.text.ParseException;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.UUID;
22+
import java.util.concurrent.CompletableFuture;
23+
24+
class ApnsChannelManagementClient {
25+
26+
private final ApnsChannelPool channelPool;
27+
28+
private static final AsciiString PATH_PREFIX = AsciiString.of("/1/apps/");
29+
private static final AsciiString SINGLE_CHANNEL_PATH_SUFFIX = AsciiString.of("/channels");
30+
private static final AsciiString ALL_CHANNELS_PATH_SUFFIX = AsciiString.of("/all-channels");
31+
32+
private static final AsciiString CHANNEL_ID_HEADER = AsciiString.of("apns-channel-id");
33+
34+
private static final String CHANNELS_BODY_KEY = "channels";
35+
private static final String MESSAGE_STORAGE_POLICY_BODY_KEY = "message-storage-policy";
36+
private static final String PUSH_TYPE_BODY_KEY = "push-type";
37+
38+
private static final ApnsChannelPoolMetricsListener NO_OP_METRICS_LISTENER = new ApnsChannelPoolMetricsListener() {
39+
40+
@Override
41+
public void handleConnectionAdded() {
42+
}
43+
44+
@Override
45+
public void handleConnectionRemoved() {
46+
}
47+
48+
@Override
49+
public void handleConnectionCreationFailed() {
50+
}
51+
};
52+
53+
private static class SimpleCreateChannelResponse implements CreateChannelResponse {
54+
55+
private final String channelId;
56+
57+
private final int status;
58+
private final UUID requestId;
59+
60+
private SimpleCreateChannelResponse(final String channelId, final int status, final UUID requestId) {
61+
this.channelId = channelId;
62+
this.status = status;
63+
this.requestId = requestId;
64+
}
65+
66+
@Override
67+
public String getChannelId() {
68+
return channelId;
69+
}
70+
71+
@Override
72+
public int getStatus() {
73+
return status;
74+
}
75+
76+
@Override
77+
public UUID getRequestId() {
78+
return requestId;
79+
}
80+
}
81+
82+
private static class SimpleGetChannelConfigurationResponse implements GetChannelConfigurationResponse {
83+
84+
private final MessageStoragePolicy messageStoragePolicy;
85+
86+
private final int status;
87+
private final UUID requestId;
88+
89+
private SimpleGetChannelConfigurationResponse(final MessageStoragePolicy messageStoragePolicy,
90+
final int status,
91+
final UUID requestId) {
92+
93+
this.messageStoragePolicy = messageStoragePolicy;
94+
this.status = status;
95+
this.requestId = requestId;
96+
}
97+
98+
@Override
99+
public MessageStoragePolicy getMessageStoragePolicy() {
100+
return messageStoragePolicy;
101+
}
102+
103+
public int getStatus() {
104+
return status;
105+
}
106+
107+
public UUID getRequestId() {
108+
return requestId;
109+
}
110+
}
111+
112+
private static class SimpleDeleteChannelResponse implements DeleteChannelResponse {
113+
114+
private final int status;
115+
private final UUID requestId;
116+
117+
private SimpleDeleteChannelResponse(final int status, final UUID requestId) {
118+
this.status = status;
119+
this.requestId = requestId;
120+
}
121+
122+
@Override
123+
public int getStatus() {
124+
return status;
125+
}
126+
127+
@Override
128+
public UUID getRequestId() {
129+
return requestId;
130+
}
131+
}
132+
133+
private static class SimpleGetChannelIdsResponse implements GetChannelIdsResponse {
134+
135+
private final List<String> channelIds;
136+
137+
private final int status;
138+
private final UUID requestId;
139+
140+
private SimpleGetChannelIdsResponse(final List<String> channelIds, final int status, final UUID requestId) {
141+
this.channelIds = channelIds;
142+
this.status = status;
143+
this.requestId = requestId;
144+
}
145+
146+
@Override
147+
public List<String> getChannelIds() {
148+
return channelIds;
149+
}
150+
151+
@Override
152+
public int getStatus() {
153+
return status;
154+
}
155+
156+
@Override
157+
public UUID getRequestId() {
158+
return requestId;
159+
}
160+
}
161+
162+
ApnsChannelManagementClient(final ApnsClientConfiguration clientConfiguration, final ApnsClientResources clientResources) {
163+
final ApnsChannelManagementChannelFactory channelFactory =
164+
new ApnsChannelManagementChannelFactory(clientConfiguration, clientResources);
165+
166+
this.channelPool = new ApnsChannelPool(channelFactory,
167+
1,
168+
clientResources.getEventLoopGroup().next(),
169+
NO_OP_METRICS_LISTENER);
170+
}
171+
172+
173+
public CompletableFuture<CreateChannelResponse> createChannel(final String bundleId,
174+
final MessageStoragePolicy messageStoragePolicy) {
175+
176+
return createChannel(bundleId, messageStoragePolicy, null);
177+
}
178+
179+
public CompletableFuture<CreateChannelResponse> createChannel(final String bundleId,
180+
final MessageStoragePolicy messageStoragePolicy,
181+
final UUID apnsRequestId) {
182+
183+
final Map<String, Object> requestBody = new HashMap<>();
184+
requestBody.put(MESSAGE_STORAGE_POLICY_BODY_KEY, messageStoragePolicy.getCode());
185+
requestBody.put(PUSH_TYPE_BODY_KEY, "LiveActivity");
186+
187+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.POST,
188+
EmptyHttp2Headers.INSTANCE,
189+
PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX),
190+
JsonSerializer.writeJsonTextAsString(requestBody),
191+
apnsRequestId);
192+
193+
sendRequest(request);
194+
195+
return request.getResponseFuture()
196+
.thenApply(http2Response -> {
197+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
198+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
199+
200+
if (status.code() == 201) {
201+
return new SimpleCreateChannelResponse(http2Response.getHeaders().get(CHANNEL_ID_HEADER).toString(),
202+
status.code(),
203+
apnsRequestIdFromResponse);
204+
} else {
205+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
206+
}
207+
});
208+
}
209+
210+
public CompletableFuture<GetChannelConfigurationResponse> getChannelConfiguration(final String bundleId,
211+
final String channelId) {
212+
213+
return getChannelConfiguration(bundleId, channelId, null);
214+
}
215+
216+
public CompletableFuture<GetChannelConfigurationResponse> getChannelConfiguration(final String bundleId,
217+
final String channelId,
218+
final UUID apnsRequestId) {
219+
220+
final Http2Headers headers = new DefaultHttp2Headers();
221+
headers.add(CHANNEL_ID_HEADER, channelId);
222+
223+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.GET,
224+
headers,
225+
PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX),
226+
null,
227+
apnsRequestId);
228+
229+
sendRequest(request);
230+
231+
return request.getResponseFuture()
232+
.thenApply(http2Response -> {
233+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
234+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
235+
236+
if (status.code() == 200) {
237+
try {
238+
final Map<String, Object> parsedResponse =
239+
new JsonParser().parseJsonObject(new String(http2Response.getData(), StandardCharsets.UTF_8));
240+
241+
final MessageStoragePolicy messageStoragePolicy =
242+
MessageStoragePolicy.getFromCode((Integer) parsedResponse.get(MESSAGE_STORAGE_POLICY_BODY_KEY));
243+
244+
return new SimpleGetChannelConfigurationResponse(messageStoragePolicy, status.code(), apnsRequestIdFromResponse);
245+
} catch (final ParseException | IllegalArgumentException e) {
246+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, e);
247+
}
248+
} else {
249+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
250+
}
251+
});
252+
}
253+
254+
public CompletableFuture<DeleteChannelResponse> deleteChannel(final String bundleId, final String channelId) {
255+
return deleteChannel(bundleId, channelId);
256+
}
257+
258+
public CompletableFuture<DeleteChannelResponse> deleteChannel(final String bundleId,
259+
final String channelId,
260+
final UUID apnsRequestId) {
261+
262+
final Http2Headers headers = new DefaultHttp2Headers();
263+
headers.add(CHANNEL_ID_HEADER, channelId);
264+
265+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.DELETE,
266+
headers,
267+
PATH_PREFIX.concat(bundleId).concat(SINGLE_CHANNEL_PATH_SUFFIX),
268+
null,
269+
apnsRequestId);
270+
271+
sendRequest(request);
272+
273+
return request.getResponseFuture()
274+
.thenApply(http2Response -> {
275+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
276+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
277+
278+
if (status.code() == 204) {
279+
return new SimpleDeleteChannelResponse(status.code(), apnsRequestIdFromResponse);
280+
} else {
281+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
282+
}
283+
});
284+
}
285+
286+
public CompletableFuture<GetChannelIdsResponse> getChannelIds(final String bundleId) {
287+
return getChannelIds(bundleId);
288+
}
289+
290+
public CompletableFuture<GetChannelIdsResponse> getChannelIds(final String bundleId, final UUID apnsRequestId) {
291+
292+
final ApnsChannelManagementRequest request = new ApnsChannelManagementRequest(HttpMethod.GET,
293+
EmptyHttp2Headers.INSTANCE,
294+
PATH_PREFIX.concat(bundleId).concat(ALL_CHANNELS_PATH_SUFFIX),
295+
null,
296+
apnsRequestId);
297+
298+
sendRequest(request);
299+
300+
return request.getResponseFuture()
301+
.thenApply(http2Response -> {
302+
final HttpResponseStatus status = HttpResponseStatus.parseLine(http2Response.getHeaders().status());
303+
final UUID apnsRequestIdFromResponse = getApnsRequestId(http2Response.getHeaders());
304+
305+
if (status.code() == 200) {
306+
try {
307+
final Map<String, Object> parsedResponse =
308+
new JsonParser().parseJsonObject(new String(http2Response.getData(), StandardCharsets.UTF_8));
309+
310+
@SuppressWarnings("unchecked") final List<String> channelIds =
311+
(List<String>) parsedResponse.get(CHANNELS_BODY_KEY);
312+
313+
return new SimpleGetChannelIdsResponse(channelIds, status.code(), apnsRequestIdFromResponse);
314+
} catch (final ParseException | IllegalArgumentException e) {
315+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse, e);
316+
}
317+
} else {
318+
throw new ChannelManagementException(status.code(), apnsRequestIdFromResponse);
319+
}
320+
});
321+
}
322+
323+
private void sendRequest(final ApnsChannelManagementRequest request) {
324+
this.channelPool.acquire().addListener((GenericFutureListener<Future<Channel>>) acquireFuture -> {
325+
if (acquireFuture.isSuccess()) {
326+
final Channel channel = acquireFuture.getNow();
327+
328+
channel.writeAndFlush(request);
329+
channelPool.release(channel);
330+
} else {
331+
request.getResponseFuture().completeExceptionally(acquireFuture.cause());
332+
}
333+
});
334+
}
335+
336+
private static UUID getApnsRequestId(final Http2Headers headers) {
337+
final CharSequence uuidSequence = headers.get(ApnsChannelManagementHandler.APNS_REQUEST_ID_HEADER);
338+
339+
try {
340+
return uuidSequence != null ? FastUUID.parseUUID(uuidSequence) : null;
341+
} catch (final IllegalArgumentException e) {
342+
return null;
343+
}
344+
}
345+
346+
Future<Void> close() {
347+
return channelPool.close();
348+
}
349+
}

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelManagementHandler.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class ApnsChannelManagementHandler extends Http2ConnectionHandler implements Htt
7171

7272
private Throwable connectionErrorCause;
7373

74-
private static final AsciiString APNS_ID_HEADER = new AsciiString("apns-id");
74+
static final AsciiString APNS_REQUEST_ID_HEADER = new AsciiString("apns-request-id");
7575
private static final AsciiString APNS_AUTHORIZATION_HEADER = new AsciiString("authorization");
7676

7777
private static final IOException STREAMS_EXHAUSTED_EXCEPTION =
@@ -240,6 +240,10 @@ protected Http2Headers getHeadersForRequest(final ApnsChannelManagementRequest r
240240
.method(request.getMethod().asciiName())
241241
.path(request.getPath());
242242

243+
if (request.getApnsRequestId() != null) {
244+
headers.add(APNS_REQUEST_ID_HEADER, request.getApnsRequestId().toString());
245+
}
246+
243247
if (this.authenticationToken == null) {
244248
log.debug("Generated a new authentication token for channel {} at stream {}", context.channel(), streamId);
245249
this.authenticationToken = new AuthenticationToken(this.signingKey, Instant.now());
@@ -294,21 +298,6 @@ private void handleEndOfStream(final Http2Stream stream) {
294298
ByteBufUtil.getBytes(stream.getProperty(this.responseDataPropertyKey))));
295299
}
296300

297-
private static UUID getApnsIdFromHeaders(final Http2Headers headers) {
298-
return getUUIDFromHeaders(headers, APNS_ID_HEADER);
299-
}
300-
301-
private static UUID getUUIDFromHeaders(final Http2Headers headers, final AsciiString header) {
302-
final CharSequence uuidSequence = headers.get(header);
303-
304-
try {
305-
return uuidSequence != null ? FastUUID.parseUUID(uuidSequence) : null;
306-
} catch (final IllegalArgumentException e) {
307-
log.error("Failed to parse `{}` header: {}", header, uuidSequence, e);
308-
return null;
309-
}
310-
}
311-
312301
@Override
313302
public void onPriorityRead(final ChannelHandlerContext ctx, final int streamId, final int streamDependency, final short weight, final boolean exclusive) {
314303
}

0 commit comments

Comments
 (0)