Skip to content

Commit 544abdb

Browse files
committed
learning reactor
1 parent 92d53a9 commit 544abdb

File tree

8 files changed

+292
-28
lines changed

8 files changed

+292
-28
lines changed

springboot-reactor-demo/pom.xml

+8-5
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
<parent>
77
<groupId>org.springframework.boot</groupId>
88
<artifactId>spring-boot-starter-parent</artifactId>
9-
<version>2.1.3.RELEASE</version>
9+
<version>2.2.2.RELEASE</version>
1010
<relativePath/> <!-- lookup parent from repository -->
1111
</parent>
12+
1213
<groupId>me.zaccoding</groupId>
1314
<artifactId>demo</artifactId>
1415
<version>0.0.1-SNAPSHOT</version>
@@ -24,10 +25,6 @@
2425
<groupId>org.springframework.boot</groupId>
2526
<artifactId>spring-boot-starter-aop</artifactId>
2627
</dependency>
27-
<!--<dependency>
28-
<groupId>org.springframework.boot</groupId>
29-
<artifactId>spring-boot-starter-web</artifactId>
30-
</dependency>-->
3128
<dependency>
3229
<groupId>org.springframework.boot</groupId>
3330
<artifactId>spring-boot-starter-webflux</artifactId>
@@ -58,6 +55,12 @@
5855
<groupId>org.springframework.boot</groupId>
5956
<artifactId>spring-boot-starter-test</artifactId>
6057
<scope>test</scope>
58+
<exclusions>
59+
<exclusion>
60+
<groupId>org.junit.vintage</groupId>
61+
<artifactId>junit-vintage-engine</artifactId>
62+
</exclusion>
63+
</exclusions>
6164
</dependency>
6265
<dependency>
6366
<groupId>io.projectreactor</groupId>

springboot-reactor-demo/src/test/java/demo/DemoApplicationTests.java

-16
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package demo.blogs;
2+
3+
import demo.helper.TestHelper;
4+
import java.util.Random;
5+
import java.util.concurrent.Callable;
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.function.BiFunction;
8+
import java.util.function.Consumer;
9+
import org.junit.jupiter.api.Test;
10+
import org.reactivestreams.Subscriber;
11+
import org.reactivestreams.Subscription;
12+
import reactor.core.publisher.BaseSubscriber;
13+
import reactor.core.publisher.Flux;
14+
import reactor.core.publisher.Mono;
15+
import reactor.core.publisher.SynchronousSink;
16+
17+
/**
18+
* https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber?category=699082
19+
*/
20+
public class FluxMonoSubscriber {
21+
22+
@Test
23+
public void testFluxMono() throws Exception {
24+
Flux.just(1, 2, 3); // --1-2-3-|-->
25+
Flux.just(); // --|-->
26+
Flux.range(9, 4); // // --9-10-11-12-|-->
27+
Mono.just(1); // --1-|-->
28+
Mono.empty(); // --|-->
29+
Mono.justOrEmpty(null); // --|-->
30+
Mono.justOrEmpty(1); // --1-|-->
31+
32+
TestHelper.doTask("Generate sequence with subscriber", () -> {
33+
Flux.just(1, 2, 3)
34+
.doOnNext(i -> TestHelper.printfln("doOnNext(%d)", i))
35+
.subscribe(i -> TestHelper.printfln("subscribe(%d)", i));
36+
});
37+
// ===============================================
38+
//> Generate flux with subscriber
39+
//===============================================
40+
//doOnNext(1)
41+
//subscribe(1)
42+
//doOnNext(2)
43+
//subscribe(2)
44+
//doOnNext(3)
45+
//subscribe(3)
46+
47+
TestHelper.doTask("Generate sequence and subscribe after print something", () -> {
48+
Flux<Integer> flux = Flux.just(1, 2, 3).doOnNext(i -> TestHelper.printfln("doOnNext(%d)", i));
49+
TestHelper.printfln("> Generate sequence");
50+
flux.subscribe(i -> TestHelper.printfln("subscribe(%d)", i));
51+
});
52+
// ===============================================
53+
//> Generate sequence and subscribe after print something
54+
//===============================================
55+
//> Generate sequence
56+
//doOnNext(1)
57+
//subscribe(1)
58+
//doOnNext(2)
59+
//subscribe(2)
60+
//doOnNext(3)
61+
//subscribe(3)
62+
}
63+
64+
@Test
65+
public void testSubscriber() throws Exception {
66+
TestHelper.doTask("Example of Subscriber", () -> {
67+
Flux<Integer> flux = Flux.just(1, 2, 3);
68+
final CountDownLatch countDownLatch = new CountDownLatch(1);
69+
flux.subscribe(new Subscriber<Integer>() {
70+
private Subscription subscription;
71+
72+
@Override
73+
public void onSubscribe(Subscription s) {
74+
TestHelper.printfln("onSubscribe()");
75+
this.subscription = s;
76+
// Publisher 에게 데이터 요청
77+
this.subscription.request(1);
78+
}
79+
80+
@Override
81+
public void onNext(Integer integer) {
82+
TestHelper.printfln("onNext(%d)", integer);
83+
this.subscription.request(1); // pull model
84+
// this.subscription.request(Long.MAX_VALUE); // push model
85+
}
86+
87+
@Override
88+
public void onError(Throwable t) {
89+
TestHelper.printfln("onError(%s)", t.getMessage());
90+
}
91+
92+
@Override
93+
public void onComplete() {
94+
TestHelper.printfln("onComplete()");
95+
countDownLatch.countDown();
96+
}
97+
});
98+
countDownLatch.await();
99+
});
100+
// ===============================================
101+
//> Example of Subscriber
102+
//===============================================
103+
//onSubscribe()
104+
//onNext(1)
105+
//onNext(2)
106+
//onNext(3)
107+
//onComplete()
108+
}
109+
110+
@Test
111+
public void testFluxGenerate() throws Exception {
112+
TestHelper.doTask("Flux#generate() example", () -> {
113+
Consumer<SynchronousSink<Integer>> randGen = new Consumer<SynchronousSink<Integer>>() {
114+
private int emitCount = 0;
115+
private Random rand = new Random();
116+
117+
@Override
118+
public void accept(SynchronousSink<Integer> sink) {
119+
emitCount++;
120+
int randData = rand.nextInt(100) + 1;
121+
TestHelper.printfln("Generator sink::next(%d)", randData);
122+
sink.next(randData);
123+
124+
if (emitCount == 10) {
125+
TestHelper.printfln("Generator sink complete");
126+
sink.complete();
127+
}
128+
}
129+
};
130+
131+
Flux<Integer> flux = Flux.generate(randGen);
132+
TestHelper.printfln("> Generate sequence");
133+
flux.subscribe(new BaseSubscriber<Integer>() {
134+
private int receiveCount = 0;
135+
136+
@Override
137+
protected void hookOnSubscribe(Subscription subscription) {
138+
TestHelper.printfln("Subscriber#onSubscribe request first 3 items");
139+
request(3L);
140+
}
141+
142+
@Override
143+
protected void hookOnNext(Integer value) {
144+
TestHelper.printfln("Subscriber#onNext(%d)", value);
145+
receiveCount++;
146+
147+
if (receiveCount % 3 == 0) {
148+
TestHelper.printfln("Subscriber request next 3 items");
149+
request(3L);
150+
}
151+
}
152+
153+
@Override
154+
protected void hookOnComplete() {
155+
TestHelper.printfln("Subscriber#onComplete()");
156+
}
157+
});
158+
});
159+
160+
//===============================================
161+
//> Flux#generate() example
162+
//===============================================
163+
//> Generate sequence
164+
//Subscriber#onSubscribe request first 3 items
165+
//Generator sink::next(28)
166+
//Subscriber#onNext(28)
167+
//Generator sink::next(16)
168+
//Subscriber#onNext(16)
169+
//Generator sink::next(56)
170+
//Subscriber#onNext(56)
171+
//Subscriber request next 3 items
172+
173+
//Generator sink::next(81)
174+
//Subscriber#onNext(81)
175+
//Generator sink::next(59)
176+
//Subscriber#onNext(59)
177+
//Generator sink::next(55)
178+
//Subscriber#onNext(55)
179+
//Subscriber request next 3 items
180+
181+
//Generator sink::next(28)
182+
//Subscriber#onNext(28)
183+
//Generator sink::next(95)
184+
//Subscriber#onNext(95)
185+
//Generator sink::next(14)
186+
//Subscriber#onNext(14)
187+
//Subscriber request next 3 items
188+
189+
//Generator sink::next(96)
190+
//Subscriber#onNext(96)
191+
//Generator sink complete
192+
//Subscriber#onComplete()
193+
194+
TestHelper.doTask("Flux#generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) example", () -> {
195+
// Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator
196+
final CountDownLatch countDownLatch = new CountDownLatch(1);
197+
Flux<String> flux = Flux.generate(new Callable<Integer>() {
198+
private final Integer val = 1;
199+
200+
@Override
201+
public Integer call() throws Exception {
202+
TestHelper.printfln("StateSupplier#call():%d", val);
203+
return val;
204+
}
205+
}, new BiFunction<Integer, SynchronousSink<String>, Integer>() {
206+
private final String format = "3 x %d = %d";
207+
208+
@Override
209+
public Integer apply(Integer state, SynchronousSink<String> sink) {
210+
sink.next(String.format(format, state, state));
211+
212+
if (state == 10) {
213+
sink.complete();
214+
countDownLatch.countDown();
215+
}
216+
217+
return state + 1;
218+
}
219+
});
220+
221+
flux.subscribe(s -> TestHelper.printfln("Subscriber %s", s));
222+
countDownLatch.await();
223+
});
224+
225+
//===============================================
226+
//> Flux#generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) example
227+
//===============================================
228+
//StateSupplier#call():1
229+
//Subscriber 3 x 1 = 1
230+
//Subscriber 3 x 2 = 2
231+
//Subscriber 3 x 3 = 3
232+
//Subscriber 3 x 4 = 4
233+
//Subscriber 3 x 5 = 5
234+
//Subscriber 3 x 6 = 6
235+
//Subscriber 3 x 7 = 7
236+
//Subscriber 3 x 8 = 8
237+
//Subscriber 3 x 9 = 9
238+
//Subscriber 3 x 10 = 10
239+
}
240+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* learning reactor from https://javacan.tistory.com/category/Reactive
3+
*/
4+
package demo.blogs;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package demo.helper;
2+
3+
/**
4+
*
5+
*/
6+
public class TestHelper {
7+
8+
public static void doTask(String title, TestTask task) {
9+
printfln("===============================================");
10+
printfln("> %s", title);
11+
printfln("===============================================");
12+
try {
13+
task.run();
14+
} catch (Exception e) {
15+
e.printStackTrace(System.err);
16+
}
17+
}
18+
19+
public static void printfln(String format, Object... args) {
20+
System.out.flush();
21+
System.err.flush();
22+
23+
System.out.printf(format, args);
24+
System.out.println();
25+
26+
System.out.flush();
27+
System.err.flush();
28+
}
29+
30+
@FunctionalInterface
31+
public interface TestTask {
32+
33+
void run() throws Exception;
34+
}
35+
}

springboot-reactor-demo/src/test/java/demo/publisher1/Publisher1Tests.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44

55
import demo.dto.Pair;
6-
import demo.util.ThreadUtil;
76
import java.util.concurrent.CountDownLatch;
8-
import javax.swing.TransferHandler;
9-
import org.junit.Test;
7+
import org.junit.jupiter.api.Test;
108
import reactor.core.Disposable;
119
import reactor.core.publisher.Flux;
1210

springboot-reactor-demo/src/test/java/demo/reactor/BasicLearnTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import java.util.List;
99
import java.util.Map;
1010
import java.util.function.Consumer;
11-
import org.junit.Test;
11+
import org.junit.jupiter.api.Test;
1212
import reactor.core.publisher.Flux;
1313
import reactor.core.publisher.GroupedFlux;
1414
import reactor.core.publisher.Mono;

springboot-reactor-demo/src/test/java/demo/reactor/FluxTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import java.util.Random;
77
import java.util.concurrent.TimeUnit;
88
import java.util.stream.Collectors;
9-
import org.junit.Before;
10-
import org.junit.Test;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
1111
import reactor.core.publisher.EmitterProcessor;
1212
import reactor.core.publisher.Flux;
1313
import reactor.core.publisher.FluxSink;
@@ -23,7 +23,7 @@ public class FluxTest {
2323
FluxSink<String> fluxSink;
2424
Flux<String> flux;
2525

26-
@Before
26+
@BeforeEach
2727
public void setUp() {
2828
this.emitterProcessor = EmitterProcessor.create();
2929
this.fluxSink = emitterProcessor.sink(OverflowStrategy.DROP);

0 commit comments

Comments
 (0)