1
1
package com .github .wesleyegberto .collections .reactive ;
2
2
3
+ import java .util .concurrent .Flow ;
3
4
import java .util .concurrent .Flow .Subscriber ;
4
5
import java .util .concurrent .Flow .Subscription ;
5
6
import java .util .concurrent .SubmissionPublisher ;
6
7
import java .util .stream .IntStream ;
7
8
8
9
public class FlowTest {
9
- public static void main (String [] args ) {
10
- try (SubmissionPublisher <String > publisher = new SubmissionPublisher <>()) {
11
- publisher .subscribe (new MySubscriber ());
12
-
13
- IntStream .range (0 , 15 )
14
- .mapToObj (String ::valueOf )
15
- .forEach (publisher ::submit );
16
- }
17
- }
10
+ public static void main (String [] args ) throws InterruptedException {
11
+ try (SubmissionPublisher <String > publisher = new SubmissionPublisher <>()) {
12
+ publisher .subscribe (new MySubscriber ());
13
+
14
+ IntStream .range (0 , 15 )
15
+ .mapToObj (String ::valueOf )
16
+ .forEach (publisher ::submit );
17
+ }
18
+ Thread .sleep (10000 );
19
+ }
18
20
}
19
21
20
22
class MySubscriber implements Subscriber <String > {
21
- @ Override
22
- public void onComplete () {
23
- System .out .println ("Completed" );
24
- }
25
-
26
- @ Override
27
- public void onError (Throwable err ) {
28
- System .out .println ("Error: " + err .getMessage ());
29
- }
30
-
31
- @ Override
32
- public void onNext (String item ) {
33
- System .out .println ("Next: " + item );
34
- }
35
-
36
- @ Override
37
- public void onSubscribe (Subscription subscription ) {
38
- System .out .println ("Subscribed, requesting 10 items..." );
39
- subscription .request (10 );
40
- }
23
+ private Subscription subscription ;
24
+
25
+ @ Override
26
+ public void onComplete () {
27
+ System .out .println ("Completed" );
28
+ }
29
+
30
+ @ Override
31
+ public void onError (Throwable err ) {
32
+ System .out .println ("Error: " + err .getMessage ());
33
+ }
34
+
35
+ @ Override
36
+ public void onNext (String item ) {
37
+ System .out .println ("Next: " + item );
38
+ requestItem (subscription );
39
+ }
40
+
41
+ @ Override
42
+ public void onSubscribe (Subscription subscription ) {
43
+ this .subscription = subscription ;
44
+ requestItem (subscription );
45
+ }
46
+
47
+ private void requestItem (Subscription subscription ) {
48
+ System .out .println ("Subscribed, requesting 1 items..." );
49
+ subscription .request (1 );
50
+ }
41
51
42
52
}
0 commit comments