@@ -6,28 +6,23 @@ use std::sync::{Arc, Mutex};
66use std:: thread;
77use std:: time:: { Duration , Instant } ;
88
9- use futures:: future:: Loop ;
10- use futures:: sync:: oneshot:: { self , Receiver , Sender } ;
11- use futures:: { future, Async , Future , Sink , Stream } ;
12- use grpc:: {
9+ use futures:: channel:: oneshot:: { self , Receiver , Sender } ;
10+ use futures:: prelude:: * ;
11+ use grpcio:: {
1312 CallOption , Channel , ChannelBuilder , Client as GrpcClient , EnvBuilder , Environment , WriteFlags ,
1413} ;
15- use grpc_proto :: testing:: control:: { ClientConfig , ClientType , RpcType } ;
16- use grpc_proto :: testing:: messages:: SimpleRequest ;
17- use grpc_proto :: testing:: services_grpc:: BenchmarkServiceClient ;
18- use grpc_proto :: testing:: stats:: ClientStats ;
19- use grpc_proto :: util as proto_util;
14+ use grpcio_proto :: testing:: control:: { ClientConfig , ClientType , RpcType } ;
15+ use grpcio_proto :: testing:: messages:: SimpleRequest ;
16+ use grpcio_proto :: testing:: services_grpc:: BenchmarkServiceClient ;
17+ use grpcio_proto :: testing:: stats:: ClientStats ;
18+ use grpcio_proto :: util as proto_util;
2019use rand:: { self , SeedableRng } ;
2120use rand_distr:: { Distribution , Exp } ;
2221use rand_xorshift:: XorShiftRng ;
23- use tokio_timer:: { Sleep , Timer } ;
2422
2523use crate :: bench;
26- use crate :: error:: Error ;
2724use crate :: util:: { self , CpuRecorder , Histogram } ;
2825
29- type BoxFuture < T , E > = Box < dyn Future < Item = T , Error = E > + Send > ;
30-
3126fn gen_req ( cfg : & ClientConfig ) -> SimpleRequest {
3227 let mut req = SimpleRequest :: default ( ) ;
3328 let payload_config = cfg. get_payload_config ( ) ;
@@ -87,7 +82,6 @@ struct ExecutorContext<B> {
8782 keep_running : Arc < AtomicBool > ,
8883 histogram : Arc < Mutex < Histogram > > ,
8984 backoff : B ,
90- timer : Timer ,
9185 _trace : Sender < ( ) > ,
9286}
9387
@@ -97,15 +91,13 @@ impl<B: Backoff> ExecutorContext<B> {
9791 histogram : Arc < Mutex < Histogram > > ,
9892 keep_running : Arc < AtomicBool > ,
9993 backoff : B ,
100- timer : Timer ,
10194 ) -> ( ExecutorContext < B > , Receiver < ( ) > ) {
10295 let ( tx, rx) = oneshot:: channel ( ) ;
10396 (
10497 ExecutorContext {
10598 keep_running,
10699 histogram,
107100 backoff,
108- timer,
109101 _trace : tx,
110102 } ,
111103 rx,
@@ -118,8 +110,8 @@ impl<B: Backoff> ExecutorContext<B> {
118110 his. observe ( f) ;
119111 }
120112
121- fn backoff_async ( & mut self ) -> Option < Sleep > {
122- self . backoff . backoff_time ( ) . map ( |dur| self . timer . sleep ( dur ) )
113+ fn backoff_async ( & mut self ) -> Option < futures_timer :: Delay > {
114+ self . backoff . backoff_time ( ) . map ( futures_timer :: Delay :: new )
123115 }
124116
125117 fn backoff ( & mut self ) {
@@ -151,50 +143,36 @@ impl<B: Backoff + Send + 'static> GenericExecutor<B> {
151143 }
152144 }
153145
154- fn execute_stream ( self ) {
146+ fn execute_stream ( mut self ) {
155147 let client = self . client . clone ( ) ;
156148 let keep_running = self . ctx . keep_running . clone ( ) ;
157- let ( sender, receiver) = self
149+ let ( mut sender, mut receiver) = self
158150 . client
159151 . duplex_streaming (
160152 & bench:: METHOD_BENCHMARK_SERVICE_GENERIC_CALL ,
161153 CallOption :: default ( ) ,
162154 )
163155 . unwrap ( ) ;
164- let f = future:: loop_fn (
165- ( sender, self , receiver) ,
166- move |( sender, mut executor, receiver) | {
156+ let f = async move {
157+ loop {
167158 let latency_timer = Instant :: now ( ) ;
168- let send = sender. send ( ( executor. req . clone ( ) , WriteFlags :: default ( ) ) ) ;
169- send. map_err ( Error :: from) . and_then ( move |sender| {
170- receiver
171- . into_future ( )
172- . map_err ( |( e, _) | Error :: from ( e) )
173- . and_then ( move |( _, r) | {
174- executor. ctx . observe_latency ( latency_timer. elapsed ( ) ) ;
175- let mut time = executor. ctx . backoff_async ( ) ;
176- let mut res = Some ( ( sender, executor, r) ) ;
177- future:: poll_fn ( move || {
178- if let Some ( ref mut t) = time {
179- try_ready ! ( t. poll( ) ) ;
180- }
181- time. take ( ) ;
182- let r = res. take ( ) . unwrap ( ) ;
183- let l = if r. 1 . ctx . keep_running ( ) {
184- Loop :: Continue ( r)
185- } else {
186- Loop :: Break ( r)
187- } ;
188- Ok ( Async :: Ready ( l) )
189- } )
190- } )
191- } )
192- } ,
193- )
194- . and_then ( |( mut s, e, r) | {
195- future:: poll_fn ( move || s. close ( ) . map_err ( Error :: from) ) . map ( |_| ( e, r) )
196- } )
197- . and_then ( |( e, r) | r. into_future ( ) . map ( |_| e) . map_err ( |( e, _) | Error :: from ( e) ) ) ;
159+ sender
160+ . send ( ( self . req . clone ( ) , WriteFlags :: default ( ) ) )
161+ . await ?;
162+ receiver. try_next ( ) . await ?;
163+ self . ctx . observe_latency ( latency_timer. elapsed ( ) ) ;
164+ let mut time = self . ctx . backoff_async ( ) ;
165+ if let Some ( t) = & mut time {
166+ t. await ;
167+ }
168+ if !self . ctx . keep_running ( ) {
169+ break ;
170+ }
171+ }
172+ sender. close ( ) . await ?;
173+ receiver. try_next ( ) . await ?;
174+ Ok ( ( ) )
175+ } ;
198176 spawn ! ( client, keep_running, "streaming ping pong" , f)
199177 }
200178}
@@ -228,74 +206,52 @@ impl<B: Backoff + Send + 'static> RequestExecutor<B> {
228206 } ) ;
229207 }
230208
231- fn execute_unary_async ( self ) {
209+ fn execute_unary_async ( mut self ) {
232210 let client = self . client . clone ( ) ;
233211 let keep_running = self . ctx . keep_running . clone ( ) ;
234- let f = future:: loop_fn ( self , move |mut executor| {
235- let latency_timer = Instant :: now ( ) ;
236- let handler = executor. client . unary_call_async ( & executor. req ) . unwrap ( ) ;
237-
238- handler. map_err ( Error :: from) . and_then ( move |_| {
212+ let f = async move {
213+ loop {
214+ let latency_timer = Instant :: now ( ) ;
215+ self . client . unary_call_async ( & self . req ) ?. await ?;
239216 let elapsed = latency_timer. elapsed ( ) ;
240- executor. ctx . observe_latency ( elapsed) ;
241- let mut time = executor. ctx . backoff_async ( ) ;
242- let mut res = Some ( executor) ;
243- future:: poll_fn ( move || {
244- if let Some ( ref mut t) = time {
245- try_ready ! ( t. poll( ) ) ;
246- }
247- time. take ( ) ;
248- let executor = res. take ( ) . unwrap ( ) ;
249- let l = if executor. ctx . keep_running ( ) {
250- Loop :: Continue ( executor)
251- } else {
252- Loop :: Break ( ( ) )
253- } ;
254- Ok ( Async :: Ready ( l) )
255- } )
256- } )
257- } ) ;
217+ self . ctx . observe_latency ( elapsed) ;
218+ let mut time = self . ctx . backoff_async ( ) ;
219+ if let Some ( t) = & mut time {
220+ t. await ;
221+ }
222+ if !self . ctx . keep_running ( ) {
223+ break ;
224+ }
225+ }
226+ Ok ( ( ) )
227+ } ;
258228 spawn ! ( client, keep_running, "unary async" , f)
259229 }
260230
261- fn execute_stream_ping_pong ( self ) {
231+ fn execute_stream_ping_pong ( mut self ) {
262232 let client = self . client . clone ( ) ;
263233 let keep_running = self . ctx . keep_running . clone ( ) ;
264- let ( sender, receiver) = self . client . streaming_call ( ) . unwrap ( ) ;
265- let f = future:: loop_fn (
266- ( sender, self , receiver) ,
267- move |( sender, mut executor, receiver) | {
234+ let ( mut sender, mut receiver) = self . client . streaming_call ( ) . unwrap ( ) ;
235+ let f = async move {
236+ loop {
268237 let latency_timer = Instant :: now ( ) ;
269- let send = sender. send ( ( executor. req . clone ( ) , WriteFlags :: default ( ) ) ) ;
270- send. map_err ( Error :: from) . and_then ( move |sender| {
271- receiver
272- . into_future ( )
273- . map_err ( |( e, _) | Error :: from ( e) )
274- . and_then ( move |( _, r) | {
275- executor. ctx . observe_latency ( latency_timer. elapsed ( ) ) ;
276- let mut time = executor. ctx . backoff_async ( ) ;
277- let mut res = Some ( ( sender, executor, r) ) ;
278- future:: poll_fn ( move || {
279- if let Some ( ref mut t) = time {
280- try_ready ! ( t. poll( ) ) ;
281- }
282- time. take ( ) ;
283- let r = res. take ( ) . unwrap ( ) ;
284- let l = if r. 1 . ctx . keep_running ( ) {
285- Loop :: Continue ( r)
286- } else {
287- Loop :: Break ( r)
288- } ;
289- Ok ( Async :: Ready ( l) )
290- } )
291- } )
292- } )
293- } ,
294- )
295- . and_then ( |( mut s, e, r) | {
296- future:: poll_fn ( move || s. close ( ) . map_err ( Error :: from) ) . map ( |_| ( e, r) )
297- } )
298- . and_then ( |( e, r) | r. into_future ( ) . map ( |_| e) . map_err ( |( e, _) | Error :: from ( e) ) ) ;
238+ sender
239+ . send ( ( self . req . clone ( ) , WriteFlags :: default ( ) ) )
240+ . await ?;
241+ receiver. try_next ( ) . await ?;
242+ self . ctx . observe_latency ( latency_timer. elapsed ( ) ) ;
243+ let mut time = self . ctx . backoff_async ( ) ;
244+ if let Some ( t) = & mut time {
245+ t. await ;
246+ }
247+ if !self . ctx . keep_running ( ) {
248+ break ;
249+ }
250+ }
251+ sender. close ( ) . await ?;
252+ receiver. try_next ( ) . await ?;
253+ Ok ( ( ) )
254+ } ;
299255 spawn ! ( client, keep_running, "streaming ping pong" , f) ;
300256 }
301257}
@@ -390,26 +346,23 @@ impl Client {
390346 his_param. get_resolution ( ) ,
391347 his_param. get_max_possible ( ) ,
392348 ) ) ) ;
393- let timer = Timer :: default ( ) ;
394349 let keep_running = Arc :: new ( AtomicBool :: new ( true ) ) ;
395350 let mut running_reqs = Vec :: with_capacity ( client_channels * outstanding_rpcs_per_channel) ;
396351
397352 for ch in channels {
398353 for _ in 0 ..cfg. get_outstanding_rpcs_per_channel ( ) {
399354 let his = his. clone ( ) ;
400- let timer = timer. clone ( ) ;
401355 let ch = ch. clone ( ) ;
402356 let rx = if load_params. has_poisson ( ) {
403357 let lambda = load_params. get_poisson ( ) . get_offered_load ( )
404358 / client_channels as f64
405359 / outstanding_rpcs_per_channel as f64 ;
406360 let poisson = Poisson :: new ( lambda) ;
407- let ( ctx, rx) = ExecutorContext :: new ( his, keep_running. clone ( ) , poisson, timer ) ;
361+ let ( ctx, rx) = ExecutorContext :: new ( his, keep_running. clone ( ) , poisson) ;
408362 execute ( ctx, ch, client_type, cfg) ;
409363 rx
410364 } else {
411- let ( ctx, rx) =
412- ExecutorContext :: new ( his, keep_running. clone ( ) , ClosedLoop , timer) ;
365+ let ( ctx, rx) = ExecutorContext :: new ( his, keep_running. clone ( ) , ClosedLoop ) ;
413366 execute ( ctx, ch, client_type, cfg) ;
414367 rx
415368 } ;
@@ -442,18 +395,9 @@ impl Client {
442395 stats
443396 }
444397
445- pub fn shutdown ( & mut self ) -> BoxFuture < ( ) , Error > {
398+ pub fn shutdown ( & mut self ) -> impl Future < Output = ( ) > + Send {
446399 self . keep_running . store ( false , Ordering :: Relaxed ) ;
447- let mut tasks = self . running_reqs . take ( ) . unwrap ( ) ;
448- let mut idx = tasks. len ( ) ;
449- Box :: new ( future:: poll_fn ( move || {
450- while idx > 0 {
451- if let Ok ( Async :: NotReady ) = tasks[ idx - 1 ] . poll ( ) {
452- return Ok ( Async :: NotReady ) ;
453- }
454- idx -= 1 ;
455- }
456- Ok ( Async :: Ready ( ( ) ) )
457- } ) )
400+ let tasks = self . running_reqs . take ( ) . unwrap ( ) ;
401+ futures:: future:: join_all ( tasks) . map ( |_| ( ) )
458402 }
459403}
0 commit comments