@@ -321,7 +321,7 @@ func TestGetOnWaitChannel(t *testing.T) {
321321 servers [0 ].stop ()
322322}
323323
324- func TestOneConnectionDown (t * testing.T ) {
324+ func TestOneServerDown (t * testing.T ) {
325325 // Start 2 servers.
326326 numServers := 2
327327 servers , r := startServers (t , numServers , math .MaxUint32 )
@@ -363,7 +363,7 @@ func TestOneConnectionDown(t *testing.T) {
363363 go func () {
364364 time .Sleep (sleepDuration )
365365 // After sleepDuration, invoke RPC.
366- // server[0] is killed around the same time to make it racey between balancer and gRPC internals.
366+ // server[0] is killed around the same time to make it racy between balancer and gRPC internals.
367367 Invoke (context .Background (), "/foo/bar" , & req , & reply , cc , FailFast (false ))
368368 wg .Done ()
369369 }()
@@ -374,3 +374,65 @@ func TestOneConnectionDown(t *testing.T) {
374374 servers [i ].stop ()
375375 }
376376}
377+
378+ func TestOneAddressRemoval (t * testing.T ) {
379+ // Start 2 servers.
380+ numServers := 2
381+ servers , r := startServers (t , numServers , math .MaxUint32 )
382+ cc , err := Dial ("foo.bar.com" , WithBalancer (RoundRobin (r )), WithBlock (), WithInsecure (), WithCodec (testCodec {}))
383+ if err != nil {
384+ t .Fatalf ("Failed to create ClientConn: %v" , err )
385+ }
386+ // Add servers[1] to the service discovery.
387+ var updates []* naming.Update
388+ updates = append (updates , & naming.Update {
389+ Op : naming .Add ,
390+ Addr : "127.0.0.1:" + servers [1 ].port ,
391+ })
392+ r .w .inject (updates )
393+ req := "port"
394+ var reply string
395+ // Loop until servers[1] is up
396+ for {
397+ if err := Invoke (context .Background (), "/foo/bar" , & req , & reply , cc ); err != nil && ErrorDesc (err ) == servers [1 ].port {
398+ break
399+ }
400+ time .Sleep (10 * time .Millisecond )
401+ }
402+
403+ var wg sync.WaitGroup
404+ numRPC := 100
405+ sleepDuration := 10 * time .Millisecond
406+ wg .Add (1 )
407+ go func () {
408+ time .Sleep (sleepDuration )
409+ // After sleepDuration, delete server[0].
410+ var updates []* naming.Update
411+ updates = append (updates , & naming.Update {
412+ Op : naming .Delete ,
413+ Addr : "127.0.0.1:" + servers [0 ].port ,
414+ })
415+ r .w .inject (updates )
416+ wg .Done ()
417+ }()
418+
419+ // All non-failfast RPCs should not fail because there's at least one connection available.
420+ for i := 0 ; i < numRPC ; i ++ {
421+ wg .Add (1 )
422+ go func () {
423+ var reply string
424+ time .Sleep (sleepDuration )
425+ // After sleepDuration, invoke RPC.
426+ // server[0] is removed around the same time to make it racy between balancer and gRPC internals.
427+ if err := Invoke (context .Background (), "/foo/bar" , & expectedRequest , & reply , cc , FailFast (false )); err != nil {
428+ t .Errorf ("grpc.Invoke(_, _, _, _, _) = %v, want not nil" , err )
429+ }
430+ wg .Done ()
431+ }()
432+ }
433+ wg .Wait ()
434+ cc .Close ()
435+ for i := 0 ; i < numServers ; i ++ {
436+ servers [i ].stop ()
437+ }
438+ }
0 commit comments