Skip to content
This repository was archived by the owner on Dec 4, 2017. It is now read-only.

Commit 2b2dba9

Browse files
committed
add ReactiveLoop
1 parent 253d1fe commit 2b2dba9

File tree

18 files changed

+990
-4
lines changed

18 files changed

+990
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package elasta.composer.utils;
2+
3+
import elasta.commons.Utils;
4+
import elasta.core.intfs.CallableUnckd;
5+
import io.reactivex.Observable;
6+
import io.reactivex.Single;
7+
import io.reactivex.functions.Function;
8+
import io.reactivex.functions.Predicate;
9+
import io.reactivex.subjects.PublishSubject;
10+
11+
import java.util.Objects;
12+
13+
/**
14+
* Created by sohan on 2017-09-02.
15+
*/
16+
@FunctionalInterface
17+
public interface ReactiveLoop<T> {
18+
Observable<T> start();
19+
20+
static <T> ReactiveLoop<T> create(CreateParams<T> params) {
21+
return () -> {
22+
try {
23+
final PublishSubject<T> subject = PublishSubject.create();
24+
25+
params.initialState.call()
26+
.doOnEvent((t, throwable) -> handle(t, throwable, subject))
27+
.subscribe();
28+
29+
return subject
30+
.doOnNext(t -> {
31+
32+
if (Utils.not(params.hasNext.test(t))) {
33+
subject.onComplete();
34+
return;
35+
}
36+
37+
params.nextState.apply(t)
38+
.doOnEvent((newState, throwable) -> handle(newState, throwable, subject))
39+
.subscribe();
40+
41+
});
42+
} catch (Throwable ex) {
43+
return Observable.error(ex);
44+
}
45+
};
46+
}
47+
48+
static <T> void handle(T state, Throwable throwable, PublishSubject<T> subject) {
49+
if (throwable != null) {
50+
subject.onError(throwable);
51+
} else {
52+
subject.onNext(state);
53+
}
54+
}
55+
56+
57+
final class CreateParams<T> {
58+
private final CallableUnckd<Single<T>> initialState;
59+
private final Predicate<T> hasNext;
60+
private final Function<T, Single<T>> nextState;
61+
62+
public CreateParams(CallableUnckd<Single<T>> initialState, Predicate<T> hasNext, Function<T, Single<T>> nextState) {
63+
Objects.requireNonNull(initialState);
64+
Objects.requireNonNull(hasNext);
65+
Objects.requireNonNull(nextState);
66+
this.initialState = initialState;
67+
this.hasNext = hasNext;
68+
this.nextState = nextState;
69+
}
70+
}
71+
}

proxy-config.json

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"targetScheme": "http",
3+
"targetHost": "localhost",
4+
"targetPort": 52054,
5+
"targetPrefix": "",
6+
"uri": "/*",
7+
"port": 153
8+
}

settings.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ include 'elasta-eventbus'
1616
include 'elasta-authorization'
1717
include 'tracker'
1818
include 'tracker-server'
19+
include 'simple-proxy'
1920

simple-proxy/build.gradle

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
group 'com.github.codefacts'
2+
version '1.0-SNAPSHOT'
3+
4+
buildscript {
5+
repositories {
6+
jcenter()
7+
}
8+
dependencies {
9+
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.1'
10+
}
11+
}
12+
13+
apply plugin: 'com.github.johnrengelman.shadow'
14+
apply plugin: 'java'
15+
16+
sourceCompatibility = 1.8
17+
18+
repositories {
19+
mavenCentral()
20+
}
21+
22+
dependencies {
23+
24+
compile project(':commons')
25+
compile project(':elasta-core')
26+
compile project(':vertx-utils')
27+
compile project(':elasta-module')
28+
compile project(':elasta-eventbus')
29+
30+
compile group: 'io.vertx', name: 'vertx-core', version: '3.4.2'
31+
compile group: 'io.vertx', name: 'vertx-web', version: '3.4.2'
32+
33+
compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.1.2'
34+
35+
compile group: 'com.koushikdutta.async', name: 'androidasync', version: '2.1.7'
36+
37+
compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.8.2'
38+
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.8.2'
39+
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.8.2'
40+
41+
compileOnly "org.projectlombok:lombok:1.16.14"
42+
43+
testCompile group: 'junit', name: 'junit', version: '4.11'
44+
testCompileOnly "org.projectlombok:lombok:1.16.14"
45+
46+
testCompile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.8.1'
47+
48+
compile group: 'org.apache.commons', name: 'commons-csv', version: '1.4'
49+
}
50+
51+
jar {
52+
manifest {
53+
attributes 'Main-Class': 'simple.proxy.Proxy'
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package simple.proxy;
2+
3+
import java.io.IOException;
4+
5+
/**
6+
* Created by sohan on 2017-08-05.
7+
*/
8+
final public class InvalidConfigurationException extends RuntimeException {
9+
public InvalidConfigurationException(IOException e) {
10+
super(e);
11+
}
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package simple.proxy;
2+
3+
import io.reactivex.Observable;
4+
5+
import java.rmi.UnexpectedException;
6+
import java.util.concurrent.TimeUnit;
7+
8+
/**
9+
* Created by sohan on 2017-08-08.
10+
*/
11+
public interface Nana {
12+
static void main(String[] args) throws InterruptedException {
13+
Observable
14+
.create(e -> {
15+
try {
16+
17+
System.out.println("### onNext: value and isDisposed" + e.isDisposed());
18+
e.onNext("value");
19+
20+
// Observable.timer(5, TimeUnit.SECONDS)
21+
// .subscribe(aLong -> {
22+
// System.out.println("## 22: " + e.isDisposed());
23+
// e.onNext("22");
24+
// if (!e.isDisposed()) {
25+
// e.onError(new UnexpectedException("error"));
26+
// }
27+
// });
28+
29+
} catch (Exception ex) {
30+
System.out.println(ex.toString());
31+
// ex.printStackTrace();
32+
}
33+
})
34+
.doOnNext(o -> System.out.println("#### got11: " + o))
35+
.flatMap(o -> Observable.timer(200, TimeUnit.MILLISECONDS).map(aLong -> o)
36+
.doOnNext(o1 -> System.out.println("###from timer")))
37+
.doOnNext(o -> System.out.println("#### got22: " + o))
38+
.timeout(0, TimeUnit.MILLISECONDS)
39+
.subscribe(o -> {
40+
}, throwable -> System.out.println("##### " + throwable));
41+
Thread.sleep(1000000);
42+
}
43+
}

0 commit comments

Comments
 (0)