Skip to content

Commit 91fb351

Browse files
committed
Polishing #2045
Use thenApplyAsync(…) instead of supplyAsync(…) to reduce allocations. Adopt tests. Original pull request: #2048.
1 parent ae99229 commit 91fb351

File tree

3 files changed

+37
-35
lines changed

3 files changed

+37
-35
lines changed

src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java

+8-13
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public CompletionStage<Map<RedisURI, Partitions>> loadViews(Iterable<RedisURI> s
105105
Requests requestedTopology = connections.requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS);
106106
Requests requestedInfo = connections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS);
107107
return CompletableFuture.allOf(requestedTopology.allCompleted(), requestedInfo.allCompleted())
108-
.thenCompose(ignore -> getNodeSpecificViewsAsync(requestedTopology, requestedInfo))
108+
.thenApplyAsync(ignore -> getNodeSpecificViews(requestedTopology, requestedInfo),
109+
clientResources.eventExecutorGroup())
109110
.thenCompose(views -> {
110111
if (discovery && isEventLoopActive()) {
111112

@@ -124,11 +125,12 @@ public CompletionStage<Map<RedisURI, Partitions>> loadViews(Iterable<RedisURI> s
124125

125126
Requests additionalTopology = newConnections
126127
.requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedTopology);
127-
Requests additionalClients = newConnections
128-
.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedInfo);
128+
Requests additionalInfo = newConnections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS)
129+
.mergeWith(requestedInfo);
129130
return CompletableFuture
130-
.allOf(additionalTopology.allCompleted(), additionalClients.allCompleted())
131-
.thenCompose(ignore2 -> getNodeSpecificViewsAsync(additionalTopology, additionalClients));
131+
.allOf(additionalTopology.allCompleted(), additionalInfo.allCompleted())
132+
.thenApplyAsync(ignore2 -> getNodeSpecificViews(additionalTopology, additionalInfo),
133+
clientResources.eventExecutorGroup());
132134
});
133135
}
134136

@@ -281,13 +283,6 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ
281283
return new NodeTopologyViews(views);
282284
}
283285

284-
private CompletableFuture<NodeTopologyViews> getNodeSpecificViewsAsync(Requests requestedTopology, Requests requestedInfo) {
285-
// use computation thread pool
286-
// ref: https://github.com/lettuce-io/lettuce-core/issues/2045
287-
return CompletableFuture.supplyAsync(() -> getNodeSpecificViews(requestedTopology, requestedInfo),
288-
clientResources.eventExecutorGroup());
289-
}
290-
291286
private static boolean validNode(RedisClusterNode redisClusterNode) {
292287

293288
if (redisClusterNode.is(RedisClusterNode.NodeFlag.NOADDR)) {
@@ -376,7 +371,7 @@ private boolean isEventLoopActive() {
376371

377372
private static Set<RedisURI> difference(Set<RedisURI> allKnown, Set<RedisURI> seed) {
378373

379-
Set<RedisURI> result = new TreeSet<>(TopologyComparators.RedisURIComparator.INSTANCE);
374+
Set<RedisURI> result = new TreeSet<>(TopologyComparators.RedisURIComparator.INSTANCE);
380375

381376
for (RedisURI e : allKnown) {
382377
if (!seed.contains(e)) {

src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java

+28-21
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class ClusterTopologyRefreshUnitTests {
7474

7575
private static final String NODE_1_VIEW = "1 127.0.0.1:7380 master,myself - 0 1401258245007 2 disconnected 8000-11999\n"
7676
+ "2 127.0.0.1:7381 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n";
77+
7778
private static final String NODE_2_VIEW = "1 127.0.0.1:7380 master - 0 1401258245007 2 disconnected 8000-11999\n"
7879
+ "2 127.0.0.1:7381 master,myself - 111 1401258245007 222 connected 7000 12000 12002-16383\n";
7980

@@ -114,6 +115,11 @@ void before() {
114115
when(clientResources.timer()).thenReturn(timer);
115116
when(clientResources.socketAddressResolver()).thenReturn(SocketAddressResolver.create(DnsResolver.unresolved()));
116117
when(clientResources.eventExecutorGroup()).thenReturn(eventExecutors);
118+
doAnswer(invocation -> {
119+
((Runnable) invocation.getArgument(0)).run();
120+
return null;
121+
}).when(eventExecutors).execute(any(Runnable.class));
122+
117123
when(connection1.async()).thenReturn(asyncCommands1);
118124
when(connection2.async()).thenReturn(asyncCommands2);
119125
when(connection1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
@@ -299,10 +305,10 @@ void shouldAttemptToConnectOnlyOnce() {
299305

300306
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
301307
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
302-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
308+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
303309
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
304310
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
305-
.thenReturn(completedWithException(new RedisException("connection failed")));
311+
.thenReturn(completedWithException(new RedisException("connection failed")));
306312

307313
sut.loadViews(seed, Duration.ofSeconds(1), true);
308314

@@ -346,10 +352,10 @@ void shouldFailIfNoNodeConnects() {
346352

347353
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
348354
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
349-
.thenReturn(completedWithException(new RedisException("connection failed")));
355+
.thenReturn(completedWithException(new RedisException("connection failed")));
350356
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
351357
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
352-
.thenReturn(completedWithException(new RedisException("connection failed")));
358+
.thenReturn(completedWithException(new RedisException("connection failed")));
353359

354360
try {
355361
sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join();
@@ -373,10 +379,10 @@ void shouldShouldDiscoverNodes() {
373379

374380
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
375381
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
376-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
382+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
377383
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
378384
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
379-
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
385+
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
380386

381387
sut.loadViews(seed, Duration.ofSeconds(1), true);
382388

@@ -393,7 +399,7 @@ void shouldShouldNotDiscoverNodes() {
393399

394400
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
395401
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
396-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
402+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
397403

398404
sut.loadViews(seed, Duration.ofSeconds(1), false);
399405

@@ -410,11 +416,11 @@ void shouldNotFailOnDuplicateSeedNodes() {
410416

411417
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
412418
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
413-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
419+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
414420

415421
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
416422
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
417-
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
423+
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
418424

419425
sut.loadViews(seed, Duration.ofSeconds(1), true);
420426

@@ -431,10 +437,10 @@ void shouldCloseConnections() {
431437

432438
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
433439
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
434-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
440+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
435441
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
436442
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
437-
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
443+
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
438444

439445
sut.loadViews(seed, Duration.ofSeconds(1), true);
440446

@@ -449,7 +455,7 @@ void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() {
449455

450456
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
451457
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
452-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
458+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
453459

454460
Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), false).toCompletableFuture()
455461
.join();
@@ -469,18 +475,18 @@ void discoveredAdditionalNodesShouldBeOrderedUsingClientCount() {
469475

470476
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
471477
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
472-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
478+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
473479
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
474480
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
475-
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
481+
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
476482

477483
Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join();
478484

479485
Partitions partitions = partitionsMap.values().iterator().next();
480486

481487
List<RedisClusterNode> nodes = TopologyComparators.sortByClientCount(partitions);
482488

483-
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381),
489+
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).contains(RedisURI.create("127.0.0.1", 7381),
484490
seed.get(0));
485491
}
486492

@@ -491,7 +497,7 @@ void undiscoveredAdditionalNodesShouldBeLastUsingLatency() {
491497

492498
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
493499
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
494-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
500+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
495501

496502
Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), false).toCompletableFuture()
497503
.join();
@@ -511,18 +517,18 @@ void discoveredAdditionalNodesShouldBeOrderedUsingLatency() {
511517

512518
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
513519
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
514-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
520+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
515521
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
516522
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
517-
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
523+
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
518524

519525
Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join();
520526

521527
Partitions partitions = partitionsMap.values().iterator().next();
522528

523529
List<RedisClusterNode> nodes = TopologyComparators.sortByLatency(partitions);
524530

525-
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381),
531+
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).contains(RedisURI.create("127.0.0.1", 7381),
526532
seed.get(0));
527533
}
528534

@@ -533,10 +539,10 @@ void shouldPropagateCommandFailures() {
533539

534540
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
535541
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
536-
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
542+
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
537543
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
538544
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
539-
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
545+
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
540546

541547
reset(connection1, connection2);
542548

@@ -618,4 +624,5 @@ private static <T> ConnectionFuture<T> completedWithException(Exception e) {
618624

619625
return ConnectionFuture.from(InetSocketAddress.createUnresolved(TestSettings.host(), TestSettings.port()), future);
620626
}
627+
621628
}

src/test/java/io/lettuce/core/cluster/topology/TopologyRefreshIntegrationTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ void adaptiveTopologyUpdateIsRateLimited() {
205205
}
206206

207207
@Test
208-
void adaptiveTopologyUpdatetUsesTimeout() {
208+
void adaptiveTopologyUpdateUsesTimeout() {
209209

210210
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()//
211211
.adaptiveRefreshTriggersTimeout(500, TimeUnit.MILLISECONDS)//

0 commit comments

Comments
 (0)