@@ -21,42 +21,71 @@ import fs2.Stream
21
21
import higherkindness .mu .rpc .common ._
22
22
import higherkindness .mu .rpc .fs2 .Utils .service .ProtoRPCService
23
23
import munit .CatsEffectSuite
24
+ import org .log4s .{getLogger , Logger }
24
25
26
+ import java .io .IOException
25
27
import scala .concurrent .duration ._
26
28
27
29
class RPCTests extends CatsEffectSuite {
28
30
31
+ private val logger : Logger = getLogger
32
+
29
33
import higherkindness .mu .rpc .fs2 .Utils .database ._
30
34
import higherkindness .mu .rpc .fs2 .Utils .implicits ._
31
35
36
+ val retryFiveTimes : retry.RetryPolicy [IO ] =
37
+ retry.RetryPolicies .limitRetries[IO ](5 )
38
+
39
+ implicit class IOOps [Result ](private val io : IO [Result ]) {
40
+ def withRetry : IO [Result ] =
41
+ retry
42
+ .retryingOnSomeErrors[Result ]
43
+ .apply[IO , Throwable ](
44
+ retryFiveTimes,
45
+ {
46
+ case _ : IOException => IO .pure(true )
47
+ case _ => IO .pure(false )
48
+ },
49
+ (e, details) =>
50
+ details match {
51
+ case retry.RetryDetails .WillDelayAndRetry (_, retries : Int , _) =>
52
+ IO (logger.info(e)(s " Failed, retried $retries times " ))
53
+ case _ => IO .unit
54
+ }
55
+ )(io)
56
+ }
57
+
32
58
val behaviourOf : String = " mu-rpc client with fs2.Stream"
33
59
34
60
test(" mu-rpc server should allow to startup a server and check if it's alive" ) {
35
- grpcServer.use(_.isShutdown).assertEquals(false )
61
+ grpcServer.use(_.isShutdown).withRetry. assertEquals(false )
36
62
}
37
63
38
64
test(" mu-rpc server should allow to get the port where it's running" ) {
39
- grpcServer.use(_.getPort).assertEquals(SC .port)
65
+ grpcServer.use(_.getPort).withRetry. assertEquals(SC .port)
40
66
}
41
67
42
68
test(behaviourOf + " be able to run unary services" ) {
43
69
grpcServer
44
70
.flatMap(_ => muAvroRPCServiceClient)
45
71
.use(_.unary(a1))
72
+ .withRetry
46
73
.assertEquals(c1)
47
74
}
48
75
49
76
test(behaviourOf + " be able to run unary services with avro schemas" ) {
50
77
grpcServer
51
78
.flatMap(_ => muAvroWithSchemaRPCServiceClient)
52
79
.use(_.unaryWithSchema(a1))
80
+ .withRetry
53
81
.assertEquals(c1)
54
82
}
55
83
56
84
test(behaviourOf + " be able to run server streaming services" ) {
57
85
grpcServer
58
86
.flatMap(_ => muProtoRPCServiceClient)
59
87
.use(_.serverStreaming(b1).flatMap(_.compile.toList))
88
+ .withRetry
60
89
.assertEquals(cList)
61
90
}
62
91
@@ -79,12 +108,14 @@ class RPCTests extends CatsEffectSuite {
79
108
clientProgram(" Thrown" , s)
80
109
.assertEquals(List (C (" UNKNOWN" , a1)))
81
110
}
111
+ .withRetry
82
112
}
83
113
84
114
test(behaviourOf + " be able to run client streaming services" ) {
85
115
grpcServer
86
116
.flatMap(_ => muProtoRPCServiceClient)
87
117
.use(_.clientStreaming(Stream .fromIterator[IO ](aList.iterator, 1 )))
118
+ .withRetry
88
119
.assertEquals(dResult33)
89
120
}
90
121
@@ -94,6 +125,7 @@ class RPCTests extends CatsEffectSuite {
94
125
.use(
95
126
_.biStreaming(Stream .fromIterator[IO ](eList.iterator, 1 )).flatMap(_.compile.toList)
96
127
)
128
+ .withRetry
97
129
.map(_.distinct)
98
130
.assertEquals(eList)
99
131
}
@@ -107,6 +139,7 @@ class RPCTests extends CatsEffectSuite {
107
139
_.biStreamingWithSchema(Stream .fromIterator[IO ](eList.iterator, 1 ))
108
140
.flatMap(_.compile.toList)
109
141
)
142
+ .withRetry
110
143
.map(_.distinct)
111
144
.assertEquals(eList)
112
145
}
@@ -147,7 +180,7 @@ class RPCTests extends CatsEffectSuite {
147
180
.clientStreaming(Stream .fromIterator[IO ](aList.iterator, 1 ))
148
181
.assertEquals(dResult33)
149
182
)
150
- } yield ()).use_.timeoutTo(opTimeout, IO .println(" ERROR on multi-op fs2!" ))
183
+ } yield ()).use_.withRetry. timeoutTo(opTimeout, IO .println(" ERROR on multi-op fs2!" ))
151
184
}
152
185
153
186
val behaviourOfC : String = behaviourOf + " and compression enabled"
@@ -156,27 +189,31 @@ class RPCTests extends CatsEffectSuite {
156
189
grpcServer
157
190
.flatMap(_ => muCompressedAvroRPCServiceClient)
158
191
.use(_.unaryCompressed(a1))
192
+ .withRetry
159
193
.assertEquals(c1)
160
194
}
161
195
162
196
test(behaviourOfC + " be able to run unary services with avro schema" ) {
163
197
grpcServer
164
198
.flatMap(_ => muCompressedAvroWithSchemaRPCServiceClient)
165
199
.use(_.unaryCompressedWithSchema(a1))
200
+ .withRetry
166
201
.assertEquals(c1)
167
202
}
168
203
169
204
test(behaviourOfC + " be able to run server streaming services" ) {
170
205
grpcServer
171
206
.flatMap(_ => muCompressedProtoRPCServiceClient)
172
207
.use(_.serverStreamingCompressed(b1).flatMap(_.compile.toList))
208
+ .withRetry
173
209
.assertEquals(cList)
174
210
}
175
211
176
212
test(behaviourOfC + " be able to run client streaming services" ) {
177
213
grpcServer
178
214
.flatMap(_ => muCompressedProtoRPCServiceClient)
179
215
.use(_.clientStreamingCompressed(Stream .fromIterator[IO ](aList.iterator, 1 )))
216
+ .withRetry
180
217
.assertEquals(dResult33)
181
218
}
182
219
@@ -187,6 +224,7 @@ class RPCTests extends CatsEffectSuite {
187
224
_.biStreamingCompressed(Stream .fromIterator[IO ](eList.iterator, 1 ))
188
225
.flatMap(_.compile.toList)
189
226
)
227
+ .withRetry
190
228
.map(_.distinct)
191
229
.assertEquals(eList)
192
230
}
@@ -200,6 +238,7 @@ class RPCTests extends CatsEffectSuite {
200
238
_.biStreamingCompressedWithSchema(Stream .fromIterator[IO ](eList.iterator, 1 ))
201
239
.flatMap(_.compile.toList)
202
240
)
241
+ .withRetry
203
242
.map(_.distinct)
204
243
.assertEquals(eList)
205
244
}
@@ -244,7 +283,7 @@ class RPCTests extends CatsEffectSuite {
244
283
.clientStreamingCompressed(Stream .fromIterator[IO ](aList.iterator, 1 ))
245
284
.assertEquals(dResult33)
246
285
)
247
- } yield ()).use_.timeout(opTimeout)
286
+ } yield ()).use_.withRetry. timeout(opTimeout)
248
287
}
249
288
250
289
}
0 commit comments