Skip to content

Commit c444eac

Browse files
committed
Fix flaky org.opensearch.rest.ReactorNetty4StreamingStressIT.testCloseClientStreamingRequest test case
Signed-off-by: Andriy Redko <[email protected]>
1 parent 5642ce7 commit c444eac

File tree

1 file changed

+13
-30
lines changed

1 file changed

+13
-30
lines changed

plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingStressIT.java

+13-30
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,20 @@
1616
import org.opensearch.test.rest.OpenSearchRestTestCase;
1717
import org.junit.After;
1818

19+
import java.io.IOException;
1920
import java.io.InterruptedIOException;
21+
import java.io.UncheckedIOException;
2022
import java.nio.ByteBuffer;
2123
import java.nio.charset.StandardCharsets;
2224
import java.time.Duration;
23-
import java.util.concurrent.Executors;
24-
import java.util.concurrent.ScheduledExecutorService;
25-
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.atomic.AtomicInteger;
2726
import java.util.stream.Stream;
2827

2928
import reactor.core.publisher.Flux;
29+
import reactor.test.StepVerifier;
3030
import reactor.test.subscriber.TestSubscriber;
3131

32-
import static org.hamcrest.CoreMatchers.anyOf;
3332
import static org.hamcrest.CoreMatchers.equalTo;
34-
import static org.hamcrest.CoreMatchers.instanceOf;
35-
import static org.hamcrest.CoreMatchers.not;
36-
import static org.hamcrest.collection.IsEmptyCollection.empty;
3733

3834
public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase {
3935
@After
@@ -68,28 +64,15 @@ public void testCloseClientStreamingRequest() throws Exception {
6864
TestSubscriber<ByteBuffer> subscriber = TestSubscriber.create();
6965
streamingResponse.getBody().subscribe(subscriber);
7066

71-
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
72-
try {
73-
// Await for subscriber to receive at least one chunk
74-
assertBusy(() -> assertThat(subscriber.getReceivedOnNext(), not(empty())));
75-
76-
// Close client forceably
77-
executor.schedule(() -> {
78-
client().close();
79-
return null;
80-
}, 2, TimeUnit.SECONDS);
81-
82-
// Await for subscriber to terminate
83-
subscriber.block(Duration.ofSeconds(10));
84-
assertThat(
85-
subscriber.expectTerminalError(),
86-
anyOf(instanceOf(InterruptedIOException.class), instanceOf(ConnectionClosedException.class))
87-
);
88-
} finally {
89-
executor.shutdown();
90-
if (executor.awaitTermination(1, TimeUnit.SECONDS) == false) {
91-
executor.shutdownNow();
92-
}
93-
}
67+
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
68+
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
69+
.then(() -> {
70+
try {
71+
client().close();
72+
} catch (final IOException ex) {
73+
throw new UncheckedIOException(ex);
74+
}
75+
})
76+
.expectErrorMatches(t -> t instanceof InterruptedIOException || t instanceof ConnectionClosedException);
9477
}
9578
}

0 commit comments

Comments
 (0)