Skip to content

Commit c4c29da

Browse files
authored
Merge pull request #13962 from jdaugherty/rxjava3
2 parents d561aa1 + f31f863 commit c4c29da

File tree

13 files changed

+740
-0
lines changed

13 files changed

+740
-0
lines changed

gradle.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ plexusSisuInjectVersion=2.6.0
1717
slf4jVersion=2.0.16
1818
rxJava1Version=1.3.8
1919
rxJava2Version=2.2.21
20+
rxJava3Version=3.1.10
2021
gparsVersion=1.2.1
2122

2223
org.gradle.caching=true

grails-async/rxjava3/build.gradle

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
dependencies {
2+
implementation platform(project(':grails-bom'))
3+
4+
api project(':grails-async-core')
5+
api "io.reactivex.rxjava3:rxjava:$rxJava3Version"
6+
7+
implementation 'org.apache.groovy:groovy'
8+
implementation 'org.slf4j:slf4j-api'
9+
10+
testImplementation 'org.spockframework:spock-core'
11+
12+
testRuntimeOnly 'org.slf4j:slf4j-nop' // Get rid of warning about missing slf4j implementation during test task
13+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package org.grails.async.factory.rxjava3
2+
3+
import grails.async.Promise
4+
import groovy.transform.AutoFinal
5+
import groovy.transform.CompileStatic
6+
import groovy.transform.PackageScope
7+
import io.reactivex.rxjava3.core.Observable
8+
import io.reactivex.rxjava3.core.Observer
9+
import io.reactivex.rxjava3.core.Scheduler
10+
import io.reactivex.rxjava3.core.Single
11+
import io.reactivex.rxjava3.core.SingleEmitter
12+
import io.reactivex.rxjava3.core.SingleOnSubscribe
13+
import io.reactivex.rxjava3.disposables.Disposable
14+
import io.reactivex.rxjava3.functions.Consumer
15+
import io.reactivex.rxjava3.functions.Function
16+
import io.reactivex.rxjava3.subjects.ReplaySubject
17+
import io.reactivex.rxjava3.subjects.Subject
18+
import org.grails.async.factory.BoundPromise
19+
20+
import java.util.concurrent.ExecutionException
21+
import java.util.concurrent.TimeUnit
22+
import java.util.concurrent.TimeoutException
23+
24+
/**
25+
* Promise based on RxJava 3.x
26+
*
27+
* @since 7.0
28+
*/
29+
@AutoFinal
30+
@CompileStatic
31+
@PackageScope
32+
class RxPromise<T> implements Promise<T> {
33+
34+
protected final Subject<T> subject
35+
protected final RxPromiseFactory promiseFactory
36+
protected final Observable<T> observable
37+
38+
protected Disposable subscription
39+
protected boolean finished = false
40+
41+
RxPromise(RxPromiseFactory promiseFactory, Closure callable, Scheduler scheduler) {
42+
this(promiseFactory, Single.create( { SingleEmitter<? super T> singleSubscriber ->
43+
try {
44+
singleSubscriber.onSuccess((T)callable.call())
45+
} catch (Throwable t) {
46+
singleSubscriber.onError(t)
47+
}
48+
} as SingleOnSubscribe<T>)
49+
.subscribeOn(scheduler))
50+
}
51+
52+
RxPromise(RxPromiseFactory promiseFactory, Observable single) {
53+
this(promiseFactory, single, ReplaySubject.create(1))
54+
}
55+
56+
RxPromise(RxPromiseFactory promiseFactory, Single single) {
57+
this(promiseFactory, single, ReplaySubject.create(1))
58+
}
59+
60+
RxPromise(RxPromiseFactory promiseFactory, Single single, Subject subject) {
61+
this(promiseFactory, single.toObservable(), subject)
62+
}
63+
64+
RxPromise(RxPromiseFactory promiseFactory, Observable observable, Subject subject) {
65+
this.observable = observable
66+
this.promiseFactory = promiseFactory
67+
observable.subscribe(new Observer<T>() {
68+
@Override
69+
void onSubscribe(Disposable d) {
70+
subscription = d
71+
}
72+
73+
@Override
74+
void onNext(T t) {
75+
subject.onNext(t)
76+
}
77+
78+
@Override
79+
void onError(Throwable e) {
80+
subject.onError(e)
81+
}
82+
83+
@Override
84+
void onComplete() {
85+
finished = true
86+
}
87+
})
88+
this.subject = subject
89+
}
90+
91+
Observable<T> toObservable() {
92+
return this.observable
93+
}
94+
95+
@Override
96+
Promise<T> accept(T value) {
97+
return new BoundPromise<T>(value)
98+
}
99+
100+
@Override
101+
Promise<T> onComplete(Closure<T> callable) {
102+
def decoratedCallable = promiseFactory.applyDecorators(callable, null)
103+
return new RxPromise<T>(promiseFactory, subject.map(decoratedCallable as Function<T, T>))
104+
}
105+
106+
@Override
107+
Promise<T> onError(Closure<T> callable) {
108+
def decoratedCallable = promiseFactory.applyDecorators(callable, null)
109+
return new RxPromise<T>(promiseFactory, subject.doOnError(decoratedCallable as Consumer<Throwable>))
110+
}
111+
112+
@Override
113+
Promise<T> then(Closure<T> callable) {
114+
return onComplete(callable)
115+
}
116+
117+
@Override
118+
boolean cancel(boolean mayInterruptIfRunning) {
119+
if(subscription != null) {
120+
subscription.dispose()
121+
return subscription.isDisposed()
122+
}
123+
return false
124+
}
125+
126+
@Override
127+
boolean isCancelled() {
128+
if(subscription == null) {
129+
return false
130+
}
131+
else {
132+
return subscription.isDisposed()
133+
}
134+
}
135+
136+
@Override
137+
boolean isDone() {
138+
return finished
139+
}
140+
141+
@Override
142+
T get() throws InterruptedException, ExecutionException {
143+
return subject.blockingFirst()
144+
}
145+
146+
@Override
147+
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
148+
try {
149+
return subject.timeout(timeout, unit).blockingFirst()
150+
} catch (Throwable e) {
151+
if(e.cause instanceof TimeoutException) {
152+
throw e.cause
153+
}
154+
else {
155+
throw e
156+
}
157+
}
158+
}
159+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package org.grails.async.factory.rxjava3
2+
3+
import grails.async.Promise
4+
import grails.async.PromiseList
5+
import grails.async.factory.AbstractPromiseFactory
6+
import groovy.transform.AutoFinal
7+
import groovy.transform.CompileStatic
8+
import io.reactivex.rxjava3.core.Observable
9+
import io.reactivex.rxjava3.core.Single
10+
import io.reactivex.rxjava3.schedulers.Schedulers
11+
import org.grails.async.factory.BoundPromise
12+
13+
import java.util.concurrent.TimeUnit
14+
15+
/**
16+
* An RxJava {@link grails.async.PromiseFactory} implementation
17+
*/
18+
@AutoFinal
19+
@CompileStatic
20+
class RxPromiseFactory extends AbstractPromiseFactory {
21+
22+
@Override
23+
<T> Promise<T> createPromise(Class<T> returnType) {
24+
new RxPromise<T>(this, Single.just(null))
25+
}
26+
27+
@Override
28+
Promise<Object> createPromise() {
29+
new RxPromise<Object>(this, Single.just(null))
30+
}
31+
32+
@Override
33+
<T> Promise<T> createPromise(Closure<T>[] closures) {
34+
if(closures.length == 1) {
35+
return new RxPromise<T>(this, closures[0], Schedulers.io())
36+
}
37+
else {
38+
def promiseList = new PromiseList()
39+
for (Closure closure : closures) {
40+
promiseList.add(closure)
41+
}
42+
return promiseList
43+
}
44+
}
45+
46+
@Override
47+
<T> List<T> waitAll(List<Promise<T>> promises) {
48+
return promises.collect { Promise<T> p -> p.get() }
49+
}
50+
51+
@Override
52+
<T> List<T> waitAll(List<Promise<T>> promises, long timeout, TimeUnit units) {
53+
promises.collect { Promise<T> p -> p.get(timeout, units) }
54+
}
55+
56+
@Override
57+
<T> Promise<T> onComplete(List<Promise<T>> promises, Closure<T> callable) {
58+
new RxPromise<T>(this, Observable.concat(
59+
promises.collect { Promise p ->
60+
if(p instanceof BoundPromise) {
61+
return Observable.just(((BoundPromise)p).value) as Observable<T>
62+
}
63+
else {
64+
return ((RxPromise)p).toObservable() as Observable<T>
65+
}
66+
}
67+
).toList())
68+
.onComplete(callable)
69+
}
70+
71+
@Override
72+
<T> Promise<List<T>> onError(List<Promise<T>> promises, Closure<?> callable) {
73+
new RxPromise(this, Observable.concat(
74+
promises.collect { ((RxPromise<T>)it).toObservable() }
75+
).toList())
76+
.onError(callable) as Promise<List<T>>
77+
}
78+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.grails.async.factory.rxjava3.RxPromiseFactory
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package org.grails.async.factory.rxjava3
2+
3+
import grails.async.PromiseList
4+
import spock.lang.Specification
5+
import spock.util.concurrent.PollingConditions
6+
7+
class RxPromiseListSpec extends Specification {
8+
9+
void 'Test promise list handling'() {
10+
11+
when: 'a list of promises is created'
12+
def list = new PromiseList()
13+
list << { 1 }
14+
list << { 2 }
15+
list << { 3 }
16+
def result = null
17+
list.onComplete { result = it }
18+
19+
then: 'then the result from onComplete is correct'
20+
new PollingConditions(timeout: 5).eventually {
21+
result == [1,2,3]
22+
}
23+
}
24+
25+
void 'Test promise list handling with some async operations and some values'() {
26+
27+
when: 'a list of promises is created'
28+
def list = new PromiseList()
29+
list << { 1 }
30+
list << 2
31+
list << { 3 }
32+
def result = null
33+
list.onComplete { result = it }
34+
35+
then: 'then the result from onComplete is correct'
36+
new PollingConditions(timeout: 5).eventually {
37+
result == [1,2,3]
38+
}
39+
}
40+
41+
void 'Test promise list with then chaining'() {
42+
43+
when: 'a promise list is used with then chaining'
44+
def list = new PromiseList()
45+
list << { 1 }
46+
def promise = list
47+
.then { it << 2; it }
48+
.then {
49+
Thread.dumpStack()
50+
it << 3; it
51+
}
52+
def result = promise.get()
53+
54+
then: 'An appropriately populated list is produced'
55+
result == [1,2,3]
56+
57+
}
58+
59+
void 'Test promise list with an exception'() {
60+
61+
given: 'a promise list with a promise that throws an exception'
62+
def list = new PromiseList()
63+
list << { 1 }
64+
list << { throw new RuntimeException('bad') }
65+
list << { 3 }
66+
67+
when: 'the list is completed'
68+
def result = null
69+
Throwable error = null
70+
list.onComplete { result = it }
71+
list.onError { error = it }.get()
72+
list.get()
73+
74+
then: 'the onError handler is invoked with the exception'
75+
thrown(RuntimeException)
76+
!result
77+
error
78+
error.message == 'bad'
79+
}
80+
}

0 commit comments

Comments
 (0)