9
9
import java .util .List ;
10
10
import java .util .Map ;
11
11
import java .util .concurrent .CompletableFuture ;
12
- import java .util .concurrent .ExecutionException ;
13
12
import java .util .concurrent .atomic .AtomicInteger ;
14
13
import java .util .concurrent .locks .Lock ;
15
14
import java .util .concurrent .locks .ReentrantLock ;
16
15
import java .util .function .Consumer ;
17
- import java .util .function .Supplier ;
16
+
17
+ import io .reactivex .Completable ;
18
+ import io .reactivex .Single ;
18
19
19
20
public class HubConnection {
20
21
private static final String RECORD_SEPARATOR = "\u001e " ;
@@ -32,13 +33,13 @@ public class HubConnection {
32
33
private Logger logger ;
33
34
private List <Consumer <Exception >> onClosedCallbackList ;
34
35
private boolean skipNegotiate ;
35
- private Supplier < CompletableFuture < String > > accessTokenProvider ;
36
+ private Single < String > accessTokenProvider ;
36
37
private Map <String , String > headers = new HashMap <>();
37
38
private ConnectionState connectionState = null ;
38
39
private HttpClient httpClient ;
39
40
private String stopError ;
40
41
41
- HubConnection (String url , Transport transport , boolean skipNegotiate , Logger logger , HttpClient httpClient , Supplier < CompletableFuture < String > > accessTokenProvider ) {
42
+ HubConnection (String url , Transport transport , boolean skipNegotiate , Logger logger , HttpClient httpClient , Single < String > accessTokenProvider ) {
42
43
if (url == null || url .isEmpty ()) {
43
44
throw new IllegalArgumentException ("A valid url is required." );
44
45
}
@@ -49,7 +50,7 @@ public class HubConnection {
49
50
if (accessTokenProvider != null ) {
50
51
this .accessTokenProvider = accessTokenProvider ;
51
52
} else {
52
- this .accessTokenProvider = () -> CompletableFuture . completedFuture ( null );
53
+ this .accessTokenProvider = Single . just ( "" );
53
54
}
54
55
55
56
if (httpClient != null ) {
@@ -153,14 +154,11 @@ private CompletableFuture<NegotiateResponse> handleNegotiate(String url) {
153
154
}
154
155
155
156
if (negotiateResponse .getAccessToken () != null ) {
156
- this .accessTokenProvider = () -> CompletableFuture . completedFuture (negotiateResponse .getAccessToken ());
157
+ this .accessTokenProvider = Single . just (negotiateResponse .getAccessToken ());
157
158
String token = "" ;
158
- try {
159
- // We know the future is already completed in this case
160
- // It's fine to call get() on it.
161
- token = this .accessTokenProvider .get ().get ();
162
- } catch (InterruptedException | ExecutionException e ) {
163
- }
159
+ // We know the Single is non blocking in this case
160
+ // It's fine to call blockingGet() on it.
161
+ token = this .accessTokenProvider .blockingGet ();
164
162
this .headers .put ("Authorization" , "Bearer " + token );
165
163
}
166
164
@@ -179,20 +177,21 @@ public HubConnectionState getConnectionState() {
179
177
180
178
/**
181
179
* Starts a connection to the server.
182
- * @return A completable future that completes when the connection has been established.
180
+ * @return A Completable that completes when the connection has been established.
183
181
*/
184
- public CompletableFuture < Void > start () {
182
+ public Completable start () {
185
183
if (hubConnectionState != HubConnectionState .DISCONNECTED ) {
186
- return CompletableFuture . completedFuture ( null );
184
+ return Completable . complete ( );
187
185
}
188
186
189
187
handshakeReceived = false ;
190
- CompletableFuture <Void > tokenFuture = accessTokenProvider .get ()
191
- .thenAccept ((token ) -> {
192
- if (token != null ) {
193
- this .headers .put ("Authorization" , "Bearer " + token );
194
- }
195
- });
188
+ CompletableFuture <Void > tokenFuture = new CompletableFuture <>();
189
+ accessTokenProvider .subscribe (token -> {
190
+ if (token != null && !token .isEmpty ()) {
191
+ this .headers .put ("Authorization" , "Bearer " + token );
192
+ }
193
+ tokenFuture .complete (null );
194
+ });
196
195
197
196
stopError = null ;
198
197
CompletableFuture <String > negotiate = null ;
@@ -202,7 +201,7 @@ public CompletableFuture<Void> start() {
202
201
negotiate = tokenFuture .thenCompose ((v ) -> CompletableFuture .completedFuture (baseUrl ));
203
202
}
204
203
205
- return negotiate .thenCompose (( url ) -> {
204
+ return Completable . fromFuture ( negotiate .thenCompose (url -> {
206
205
logger .log (LogLevel .Debug , "Starting HubConnection." );
207
206
if (transport == null ) {
208
207
transport = new WebSocketTransport (headers , httpClient , logger );
@@ -211,27 +210,21 @@ public CompletableFuture<Void> start() {
211
210
transport .setOnReceive (this .callback );
212
211
transport .setOnClose ((message ) -> stopConnection (message ));
213
212
214
- try {
215
- return transport .start (url ).thenCompose ((future ) -> {
216
- String handshake = HandshakeProtocol .createHandshakeRequestMessage (
217
- new HandshakeRequestMessage (protocol .getName (), protocol .getVersion ()));
218
- return transport .send (handshake ).thenRun (() -> {
219
- hubConnectionStateLock .lock ();
220
- try {
221
- hubConnectionState = HubConnectionState .CONNECTED ;
222
- connectionState = new ConnectionState (this );
223
- logger .log (LogLevel .Information , "HubConnection started." );
224
- } finally {
225
- hubConnectionStateLock .unlock ();
226
- }
227
- });
213
+ return transport .start (url ).thenCompose ((future ) -> {
214
+ String handshake = HandshakeProtocol .createHandshakeRequestMessage (
215
+ new HandshakeRequestMessage (protocol .getName (), protocol .getVersion ()));
216
+ return transport .send (handshake ).thenRun (() -> {
217
+ hubConnectionStateLock .lock ();
218
+ try {
219
+ hubConnectionState = HubConnectionState .CONNECTED ;
220
+ connectionState = new ConnectionState (this );
221
+ logger .log (LogLevel .Information , "HubConnection started." );
222
+ } finally {
223
+ hubConnectionStateLock .unlock ();
224
+ }
228
225
});
229
- } catch (RuntimeException e ) {
230
- throw e ;
231
- } catch (Exception e ) {
232
- throw new RuntimeException (e );
233
- }
234
- });
226
+ });
227
+ }));
235
228
}
236
229
237
230
private CompletableFuture <String > startNegotiate (String url , int negotiateAttempts ) {
@@ -268,7 +261,7 @@ private CompletableFuture<String> startNegotiate(String url, int negotiateAttemp
268
261
/**
269
262
* Stops a connection to the server.
270
263
* @param errorMessage An error message if the connected needs to be stopped because of an error.
271
- * @return A completable future that completes when the connection has been stopped.
264
+ * @return A Completable that completes when the connection has been stopped.
272
265
*/
273
266
private CompletableFuture <Void > stop (String errorMessage ) {
274
267
hubConnectionStateLock .lock ();
@@ -292,10 +285,10 @@ private CompletableFuture<Void> stop(String errorMessage) {
292
285
293
286
/**
294
287
* Stops a connection to the server.
295
- * @return A completable future that completes when the connection has been stopped.
288
+ * @return A Completable that completes when the connection has been stopped.
296
289
*/
297
- public CompletableFuture < Void > stop () {
298
- return stop (null );
290
+ public Completable stop () {
291
+ return Completable . fromFuture ( stop (null ) );
299
292
}
300
293
301
294
private void stopConnection (String errorMessage ) {
@@ -344,7 +337,7 @@ public void send(String method, Object... args) throws Exception {
344
337
sendHubMessage (invocationMessage );
345
338
}
346
339
347
- public <T > CompletableFuture <T > invoke (Class <T > returnType , String method , Object ... args ) throws Exception {
340
+ public <T > Single <T > invoke (Class <T > returnType , String method , Object ... args ) throws Exception {
348
341
String id = connectionState .getNextInvocationId ();
349
342
InvocationMessage invocationMessage = new InvocationMessage (id , method , args );
350
343
@@ -372,7 +365,7 @@ public <T> CompletableFuture<T> invoke(Class<T> returnType, String method, Objec
372
365
// where the map doesn't have the future yet when the response is returned
373
366
sendHubMessage (invocationMessage );
374
367
375
- return future ;
368
+ return Single . fromFuture ( future ) ;
376
369
}
377
370
378
371
private void sendHubMessage (HubMessage message ) throws Exception {
0 commit comments