@@ -94,6 +94,7 @@ func (r *SvcDiscoveryRegistryImpl) initializeConnMap() error {
94
94
if err != nil {
95
95
return err
96
96
}
97
+ r .connMap = make (map [string ][]* grpc.ClientConn )
97
98
for _ , kv := range resp .Kvs {
98
99
prefix , addr := r .splitEndpoint (string (kv .Key ))
99
100
conn , err := grpc .DialContext (context .Background (), addr , append (r .dialOptions , grpc .WithResolvers (r .resolver ))... )
@@ -102,7 +103,6 @@ func (r *SvcDiscoveryRegistryImpl) initializeConnMap() error {
102
103
}
103
104
r .connMap [prefix ] = append (r .connMap [prefix ], conn )
104
105
}
105
-
106
106
return nil
107
107
}
108
108
@@ -213,23 +213,10 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
213
213
func (r * SvcDiscoveryRegistryImpl ) watchServiceChanges () {
214
214
watchChan := r .client .Watch (context .Background (), r .rootDirectory , clientv3 .WithPrefix ())
215
215
for range watchChan {
216
+ r .mu .RLock ()
216
217
r .initializeConnMap ()
218
+ r .mu .RUnlock ()
217
219
}
218
-
219
- //watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix())
220
- //updatedPrefixes := make(map[string]struct{}) // Create a set to track updated prefixes
221
- //for watchResp := range watchChan {
222
- // for _, event := range watchResp.Events {
223
- // prefix, _ := r.splitEndpoint(string(event.Kv.Key))
224
- // if _, alreadyUpdated := updatedPrefixes[prefix]; !alreadyUpdated {
225
- // updatedPrefixes[prefix] = struct{}{} // Mark this prefix as updated
226
- // fmt.Println("refreshConnMap prefix", prefix, event)
227
- // r.refreshConnMap(prefix)
228
- // } else {
229
- // fmt.Println("no refreshConnMap prefix", prefix, event)
230
- // }
231
- // }
232
- //}
233
220
}
234
221
235
222
// refreshConnMap fetches the latest endpoints and updates the local map
0 commit comments