18
18
19
19
package org .apache .flink .client ;
20
20
21
+ import static org .junit .Assert .assertEquals ;
21
22
import static org .junit .Assert .assertTrue ;
22
23
import static org .junit .Assert .fail ;
23
24
25
+ import akka .actor .ActorRef ;
26
+ import akka .actor .ActorSystem ;
27
+ import akka .actor .Props ;
28
+ import akka .actor .UntypedActor ;
29
+ import akka .testkit .JavaTestKit ;
30
+ import org .apache .commons .cli .CommandLine ;
31
+ import org .apache .flink .runtime .event .job .RecentJobEvent ;
24
32
import org .apache .flink .runtime .jobgraph .JobID ;
33
+ import org .apache .flink .runtime .messages .EventCollectorMessages ;
34
+ import org .apache .flink .runtime .messages .JobManagerMessages ;
35
+ import org .apache .flink .runtime .messages .JobResult ;
36
+ import org .junit .AfterClass ;
25
37
import org .junit .BeforeClass ;
26
38
import org .junit .Test ;
27
39
40
+ import java .util .ArrayList ;
41
+
28
42
//TODO: Update test case
29
43
public class CliFrontendListCancelTest {
44
+
45
+ private static ActorSystem actorSystem ;
46
+
47
+ @ BeforeClass
48
+ public static void setup (){
49
+ actorSystem = ActorSystem .create ("TestingActorSystem" );
50
+ }
51
+
52
+ @ AfterClass
53
+ public static void teardown (){
54
+ JavaTestKit .shutdownActorSystem (actorSystem );
55
+ actorSystem = null ;
56
+ }
30
57
31
58
@ BeforeClass
32
59
public static void init () {
@@ -57,9 +84,11 @@ public void testCancel() {
57
84
{
58
85
JobID jid = new JobID ();
59
86
String jidString = jid .toString ();
87
+
88
+ final ActorRef jm = actorSystem .actorOf (Props .create (CliJobManager .class , jid ));
60
89
61
90
String [] parameters = {"-i" , jidString };
62
- InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend ();
91
+ InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend (jm );
63
92
int retCode = testFrontend .cancel (parameters );
64
93
assertTrue (retCode == 0 );
65
94
}
@@ -75,6 +104,8 @@ public void testCancel() {
75
104
@ Test
76
105
public void testList () {
77
106
try {
107
+ final ActorRef jm = actorSystem .actorOf (Props .create (CliJobManager .class , (Object )null ));
108
+
78
109
// test unrecognized option
79
110
{
80
111
String [] parameters = {"-v" , "-k" };
@@ -94,7 +125,7 @@ public void testList() {
94
125
// test list properly
95
126
{
96
127
String [] parameters = {"-r" , "-s" };
97
- InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend ();
128
+ InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend (jm );
98
129
int retCode = testFrontend .list (parameters );
99
130
assertTrue (retCode == 0 );
100
131
}
@@ -108,20 +139,36 @@ public void testList() {
108
139
109
140
110
141
protected static final class InfoListTestCliFrontend extends CliFrontendTestUtils .TestingCliFrontend {
142
+ private ActorRef jobmanager ;
111
143
112
- public InfoListTestCliFrontend () {
144
+ public InfoListTestCliFrontend (ActorRef jobmanager ){
145
+ this .jobmanager = jobmanager ;
113
146
}
114
147
115
- public int getTotalNumberOfRegisteredSlots () {
116
- return 1 ;
148
+ @ Override
149
+ public ActorRef getJobManager (CommandLine line ){
150
+ return jobmanager ;
117
151
}
152
+ }
118
153
119
- @ Override
120
- public int getNumberOfSlotsAvailableToScheduler () throws IOException {
154
+ protected static final class CliJobManager extends UntypedActor {
155
+ private final JobID jobID ;
156
+
157
+ public CliJobManager (final JobID jobID ){
158
+ this .jobID = jobID ;
159
+ }
121
160
122
161
@ Override
123
- public int getBlobServerPort () {
124
- throw new UnsupportedOperationException ();
162
+ public void onReceive (Object message ) throws Exception {
163
+ if (message instanceof JobManagerMessages .RequestAvailableSlots$ ){
164
+ getSender ().tell (1 , getSelf ());
165
+ }else if (message instanceof EventCollectorMessages .RequestRecentJobEvents$ ) {
166
+ getSender ().tell (new EventCollectorMessages .RecentJobs (new ArrayList <RecentJobEvent >()), getSelf ());
167
+ }else if (message instanceof JobManagerMessages .CancelJob ){
168
+ JobManagerMessages .CancelJob cancelJob = (JobManagerMessages .CancelJob ) message ;
169
+ assertEquals (jobID , cancelJob .jobID ());
170
+ getSender ().tell (new JobResult .JobCancelResult (JobResult .SUCCESS (), null ), getSelf ());
171
+ }
125
172
}
126
173
}
127
174
}
0 commit comments