Skip to content

Commit 271a3b4

Browse files
committed
Upgrade to RfJava2 for testing reactive streams, close #1380
Motivation: ReactiveStreamsTest#testConnectionDoesNotGetClosed randomly fails. The server sometimes receives chunks out of order. Modifications: The issue seems to be a RxJava1 one. Upgrade to RxJava2. Result: No more erroneous random test failures
1 parent 919e478 commit 271a3b4

File tree

5 files changed

+30
-31
lines changed

5 files changed

+30
-31
lines changed

client/pom.xml

+5-2
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,15 @@
5050
<dependency>
5151
<groupId>org.reactivestreams</groupId>
5252
<artifactId>reactive-streams</artifactId>
53-
<version>1.0.0</version>
5453
</dependency>
5554
<dependency>
5655
<groupId>com.typesafe.netty</groupId>
5756
<artifactId>netty-reactive-streams</artifactId>
58-
<version>2.0.0</version>
57+
</dependency>
58+
<dependency>
59+
<groupId>io.reactivex.rxjava2</groupId>
60+
<artifactId>rxjava</artifactId>
61+
<scope>test</scope>
5962
</dependency>
6063
</dependencies>
6164
</project>

client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsTest.java

+6-19
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.Unpooled;
2121
import io.netty.handler.codec.http.HttpHeaders;
22+
import io.reactivex.Flowable;
2223

2324
import java.io.ByteArrayOutputStream;
2425
import java.io.File;
@@ -60,16 +61,12 @@
6061
import org.testng.annotations.BeforeClass;
6162
import org.testng.annotations.Test;
6263

63-
import rx.Observable;
64-
import rx.RxReactiveStreams;
65-
6664
public class ReactiveStreamsTest {
6765

6866
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsTest.class);
6967

7068
public static Publisher<ByteBuf> createPublisher(final byte[] bytes, final int chunkSize) {
71-
Observable<ByteBuf> observable = Observable.from(new ByteBufIterable(bytes, chunkSize));
72-
return RxReactiveStreams.toPublisher(observable);
69+
return Flowable.fromIterable(new ByteBufIterable(bytes, chunkSize));
7370
}
7471

7572
private Tomcat tomcat;
@@ -236,11 +233,7 @@ public void testConnectionDoesNotGetClosed() throws Exception {
236233

237234
byte[] responseBody = response.getResponseBodyAsBytes();
238235
responseBody = response.getResponseBodyAsBytes();
239-
assertEquals(
240-
Integer.valueOf(response.getHeader("X-" + CONTENT_LENGTH)).intValue(),
241-
LARGE_IMAGE_BYTES.length,
242-
"Server side payload length invalid"
243-
);
236+
assertEquals(Integer.valueOf(response.getHeader("X-" + CONTENT_LENGTH)).intValue(), LARGE_IMAGE_BYTES.length, "Server side payload length invalid");
244237
assertEquals(responseBody.length, LARGE_IMAGE_BYTES.length, "Client side payload length invalid");
245238
assertEquals(response.getHeader(CONTENT_MD5), expectedMd5, "Server side payload MD5 invalid");
246239
assertEquals(TestUtils.md5(responseBody), expectedMd5, "Client side payload MD5 invalid");
@@ -249,11 +242,7 @@ public void testConnectionDoesNotGetClosed() throws Exception {
249242
response = requestBuilder.execute().get();
250243
assertEquals(response.getStatusCode(), 200);
251244
responseBody = response.getResponseBodyAsBytes();
252-
assertEquals(
253-
Integer.valueOf(response.getHeader("X-" + CONTENT_LENGTH)).intValue(),
254-
LARGE_IMAGE_BYTES.length,
255-
"Server side payload length invalid"
256-
);
245+
assertEquals(Integer.valueOf(response.getHeader("X-" + CONTENT_LENGTH)).intValue(), LARGE_IMAGE_BYTES.length, "Server side payload length invalid");
257246
assertEquals(responseBody.length, LARGE_IMAGE_BYTES.length, "Client side payload length invalid");
258247

259248
try {
@@ -285,9 +274,7 @@ public static void main(String[] args) throws Exception {
285274
@Test(groups = "standalone", expectedExceptions = ExecutionException.class)
286275
public void testFailingStream() throws Exception {
287276
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
288-
Observable<ByteBuf> failingObservable = Observable.error(new FailedStream());
289-
Publisher<ByteBuf> failingPublisher = RxReactiveStreams.toPublisher(failingObservable);
290-
277+
Publisher<ByteBuf> failingPublisher = Flowable.error(new FailedStream());
291278
client.preparePut(getTargetUrl()).setBody(failingPublisher).execute().get();
292279
}
293280
}
@@ -520,7 +507,7 @@ public ByteBufIterable(byte[] payload, int chunkSize) {
520507
@Override
521508
public Iterator<ByteBuf> iterator() {
522509
return new Iterator<ByteBuf>() {
523-
private volatile int currentIndex = 0;
510+
private int currentIndex = 0;
524511

525512
@Override
526513
public boolean hasNext() {

extras/rxjava/pom.xml

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
<dependency>
1313
<groupId>io.reactivex</groupId>
1414
<artifactId>rxjava</artifactId>
15-
<version>1.2.9</version>
1615
</dependency>
1716
</dependencies>
1817
</project>

extras/rxjava2/pom.xml

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
<dependency>
1313
<groupId>io.reactivex.rxjava2</groupId>
1414
<artifactId>rxjava</artifactId>
15-
<version>2.0.8</version>
1615
</dependency>
1716
</dependencies>
1817
</project>

pom.xml

+19-8
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,26 @@
257257
<version>${netty.version}</version>
258258
<optional>true</optional>
259259
</dependency>
260+
<dependency>
261+
<groupId>org.reactivestreams</groupId>
262+
<artifactId>reactive-streams</artifactId>
263+
<version>${reactive-streams.version}</version>
264+
</dependency>
265+
<dependency>
266+
<groupId>com.typesafe.netty</groupId>
267+
<artifactId>netty-reactive-streams</artifactId>
268+
<version>${netty-reactive-streams.version}</version>
269+
</dependency>
260270
<dependency>
261271
<groupId>io.reactivex</groupId>
262272
<artifactId>rxjava</artifactId>
263273
<version>${rxjava.version}</version>
264274
</dependency>
275+
<dependency>
276+
<groupId>io.reactivex.rxjava2</groupId>
277+
<artifactId>rxjava</artifactId>
278+
<version>${rxjava2.version}</version>
279+
</dependency>
265280
</dependencies>
266281
</dependencyManagement>
267282
<dependencies>
@@ -349,12 +364,6 @@
349364
<version>${privilegedaccessor.version}</version>
350365
<scope>test</scope>
351366
</dependency>
352-
<dependency>
353-
<groupId>io.reactivex</groupId>
354-
<artifactId>rxjava-reactive-streams</artifactId>
355-
<version>${rxjava-reactive-streams.version}</version>
356-
<scope>test</scope>
357-
</dependency>
358367
<dependency>
359368
<groupId>org.powermock</groupId>
360369
<artifactId>powermock-module-testng</artifactId>
@@ -375,15 +384,17 @@
375384
<target.property>1.8</target.property>
376385
<netty.version>4.1.13.Final</netty.version>
377386
<slf4j.version>1.7.25</slf4j.version>
387+
<reactive-streams.version>1.0.0</reactive-streams.version>
388+
<netty-reactive-streams.version>2.0.0</netty-reactive-streams.version>
389+
<rxjava.version>1.3.0</rxjava.version>
390+
<rxjava2.version>2.1.2</rxjava2.version>
378391
<logback.version>1.2.3</logback.version>
379392
<testng.version>6.9.10</testng.version>
380393
<jetty.version>9.4.6.v20170531</jetty.version>
381394
<tomcat.version>8.5.14</tomcat.version>
382395
<commons-io.version>2.4</commons-io.version>
383396
<commons-fileupload.version>1.3</commons-fileupload.version>
384397
<privilegedaccessor.version>1.2.2</privilegedaccessor.version>
385-
<rxjava-reactive-streams.version>1.2.1</rxjava-reactive-streams.version>
386-
<rxjava.version>1.3.0</rxjava.version>
387398
<powermock.version>1.6.4</powermock.version>
388399
</properties>
389400
</project>

0 commit comments

Comments
 (0)