8
8
9
9
package org .opensearch .rest ;
10
10
11
- import org .apache .hc .core5 .http .ConnectionClosedException ;
12
11
import org .opensearch .client .Request ;
13
12
import org .opensearch .client .Response ;
14
13
import org .opensearch .client .StreamingRequest ;
15
14
import org .opensearch .client .StreamingResponse ;
16
15
import org .opensearch .test .rest .OpenSearchRestTestCase ;
17
16
import org .junit .After ;
18
17
18
+ import java .io .IOException ;
19
19
import java .io .InterruptedIOException ;
20
+ import java .io .UncheckedIOException ;
20
21
import java .nio .ByteBuffer ;
21
22
import java .nio .charset .StandardCharsets ;
22
23
import java .time .Duration ;
23
- import java .util .concurrent .Executors ;
24
- import java .util .concurrent .ScheduledExecutorService ;
25
- import java .util .concurrent .TimeUnit ;
26
24
import java .util .concurrent .atomic .AtomicInteger ;
27
25
import java .util .stream .Stream ;
28
26
29
27
import reactor .core .publisher .Flux ;
30
- import reactor .test .subscriber .TestSubscriber ;
28
+ import reactor .test .StepVerifier ;
29
+ import reactor .test .scheduler .VirtualTimeScheduler ;
31
30
32
- import static org .hamcrest .CoreMatchers .anyOf ;
33
31
import static org .hamcrest .CoreMatchers .equalTo ;
34
- import static org .hamcrest .CoreMatchers .instanceOf ;
35
- import static org .hamcrest .CoreMatchers .not ;
36
- import static org .hamcrest .collection .IsEmptyCollection .empty ;
37
32
38
33
public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase {
39
34
@ After
@@ -49,6 +44,8 @@ public void tearDown() throws Exception {
49
44
}
50
45
51
46
public void testCloseClientStreamingRequest () throws Exception {
47
+ final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
48
+
52
49
final AtomicInteger id = new AtomicInteger (0 );
53
50
final Stream <String > stream = Stream .generate (
54
51
() -> "{ \" index\" : { \" _index\" : \" test-stress-streaming\" , \" _id\" : \" "
@@ -57,39 +54,28 @@ public void testCloseClientStreamingRequest() throws Exception {
57
54
+ "{ \" name\" : \" josh\" }\n "
58
55
);
59
56
57
+ final Duration delay = Duration .ofMillis (1 );
60
58
final StreamingRequest <ByteBuffer > streamingRequest = new StreamingRequest <>(
61
59
"POST" ,
62
60
"/_bulk/stream" ,
63
- Flux .fromStream (stream ).delayElements (Duration . ofMillis ( 500 ) ).map (s -> ByteBuffer .wrap (s .getBytes (StandardCharsets .UTF_8 )))
61
+ Flux .fromStream (stream ).delayElements (delay , scheduler ).map (s -> ByteBuffer .wrap (s .getBytes (StandardCharsets .UTF_8 )))
64
62
);
65
63
streamingRequest .addParameter ("refresh" , "true" );
66
64
67
65
final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
68
- TestSubscriber <ByteBuffer > subscriber = TestSubscriber .create ();
69
- streamingResponse .getBody ().subscribe (subscriber );
70
-
71
- final ScheduledExecutorService executor = Executors .newSingleThreadScheduledExecutor ();
72
- try {
73
- // Await for subscriber to receive at least one chunk
74
- assertBusy (() -> assertThat (subscriber .getReceivedOnNext (), not (empty ())));
75
-
76
- // Close client forceably
77
- executor .schedule (() -> {
78
- client ().close ();
79
- return null ;
80
- }, 2 , TimeUnit .SECONDS );
66
+ scheduler .advanceTimeBy (delay ); /* emit first element */
81
67
82
- // Await for subscriber to terminate
83
- subscriber . block ( Duration . ofSeconds ( 10 ));
84
- assertThat (
85
- subscriber . expectTerminalError (),
86
- anyOf ( instanceOf ( InterruptedIOException . class ), instanceOf ( ConnectionClosedException . class ))
87
- );
88
- } finally {
89
- executor . shutdown ();
90
- if ( executor . awaitTermination ( 1 , TimeUnit . SECONDS ) == false ) {
91
- executor . shutdownNow ();
92
- }
93
- }
68
+ StepVerifier . create ( Flux . from ( streamingResponse . getBody ()). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 )))
69
+ . expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " ))
70
+ . then (() -> {
71
+ try {
72
+ client (). close ();
73
+ } catch ( final IOException ex ) {
74
+ throw new UncheckedIOException ( ex );
75
+ }
76
+ })
77
+ . then (() -> scheduler . advanceTimeBy ( delay ))
78
+ . expectErrorMatches ( t -> t instanceof InterruptedIOException )
79
+ . verify ();
94
80
}
95
81
}
0 commit comments