Skip to content

Commit 07cb3df

Browse files
refactor endpoint code to eliminate code duplication (#52726)
* wip Signed-off-by: Rama Chavali <[email protected]> * refactor endpoint code to eliminate code duplication Signed-off-by: Rama Chavali <[email protected]> * fix bad merge Signed-off-by: Rama Chavali <[email protected]> --------- Signed-off-by: Rama Chavali <[email protected]>
1 parent 54d3d57 commit 07cb3df

File tree

2 files changed

+19
-42
lines changed

2 files changed

+19
-42
lines changed

pilot/pkg/model/push_context.go

+2
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,8 @@ type ConsolidatedDestRule struct {
297297
from []types.NamespacedName
298298
}
299299

300+
type EdsUpdateFn func(shard ShardKey, hostname string, namespace string, entry []*IstioEndpoint)
301+
300302
// XDSUpdater is used for direct updates of the xDS model and incremental push.
301303
// Pilot uses multiple registries - for example each K8S cluster is a registry
302304
// instance. Each registry is responsible for tracking a set

pilot/pkg/serviceregistry/serviceentry/controller.go

+17-42
Original file line numberDiff line numberDiff line change
@@ -324,12 +324,12 @@ func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.E
324324
if event == model.EventAdd {
325325
s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
326326
}
327-
s.edsUpdate(allInstances)
327+
s.edsUpdate(allInstances, true)
328328
return
329329
}
330330

331331
// update eds cache only
332-
s.edsCacheUpdate(allInstances)
332+
s.edsUpdate(allInstances, false)
333333

334334
pushReq := &model.PushRequest{
335335
Full: true,
@@ -451,7 +451,7 @@ func (s *Controller) serviceEntryHandler(old, curr config.Config, event model.Ev
451451
fullPush := len(configsUpdated) > 0
452452
// if not full push needed, at least one service unchanged
453453
if !fullPush {
454-
s.edsUpdate(serviceInstances)
454+
s.edsUpdate(serviceInstances, true)
455455
return
456456
}
457457

@@ -468,7 +468,7 @@ func (s *Controller) serviceEntryHandler(old, curr config.Config, event model.Ev
468468
keys.Insert(instancesKey{hostname: svc.Hostname, namespace: curr.Namespace})
469469
}
470470

471-
s.queueEdsEvent(keys, s.doEdsCacheUpdate)
471+
s.queueEdsEvent(keys, false)
472472

473473
pushReq := &model.PushRequest{
474474
Full: true,
@@ -604,7 +604,7 @@ func (s *Controller) WorkloadInstanceHandler(wi *model.WorkloadInstance, event m
604604

605605
s.mutex.Unlock()
606606

607-
s.edsUpdate(append(instances, instancesDeleted...))
607+
s.edsUpdate(append(instances, instancesDeleted...), true)
608608

609609
// ServiceEntry with WorkloadEntry results in STRICT_DNS cluster with hardcoded endpoints
610610
// need to update CDS to refresh endpoints
@@ -691,7 +691,7 @@ func (s *Controller) ResyncEDS() {
691691
s.mutex.RLock()
692692
allInstances := s.serviceInstances.getAll()
693693
s.mutex.RUnlock()
694-
s.edsUpdate(allInstances)
694+
s.edsUpdate(allInstances, true)
695695
// HACK to workaround Service syncing after WorkloadEntry: https://github.com/istio/istio/issues/45114
696696
s.workloadInstances.ForEach(func(wi *model.WorkloadInstance) {
697697
if wi.Kind == model.WorkloadEntryKind {
@@ -703,35 +703,27 @@ func (s *Controller) ResyncEDS() {
703703
// edsUpdate triggers an EDS push serially such that we can prevent all instances
704704
// got at t1 can accidentally override that got at t2 if multiple threads are
705705
// running this function. Queueing ensures latest updated wins.
706-
func (s *Controller) edsUpdate(instances []*model.ServiceInstance) {
706+
func (s *Controller) edsUpdate(instances []*model.ServiceInstance, pushEds bool) {
707707
// Find all keys we need to lookup
708708
keys := sets.NewWithLength[instancesKey](len(instances))
709709
for _, i := range instances {
710710
keys.Insert(makeInstanceKey(i))
711711
}
712-
s.queueEdsEvent(keys, s.doEdsUpdate)
713-
}
714-
715-
// edsCacheUpdate updates eds cache serially such that we can prevent allinstances
716-
// got at t1 can accidentally override that got at t2 if multiple threads are
717-
// running this function. Queueing ensures latest updated wins.
718-
func (s *Controller) edsCacheUpdate(instances []*model.ServiceInstance) {
719-
// Find all keys we need to lookup
720-
keys := map[instancesKey]struct{}{}
721-
for _, i := range instances {
722-
keys[makeInstanceKey(i)] = struct{}{}
723-
}
724-
s.queueEdsEvent(keys, s.doEdsCacheUpdate)
712+
s.queueEdsEvent(keys, pushEds)
725713
}
726714

727715
// queueEdsEvent processes eds events sequentially for the passed keys and invokes the passed function.
728-
func (s *Controller) queueEdsEvent(keys sets.Set[instancesKey], edsFn func(keys sets.Set[instancesKey])) {
716+
func (s *Controller) queueEdsEvent(keys sets.Set[instancesKey], pushEds bool) {
729717
// wait for the cache update finished
730718
waitCh := make(chan struct{})
731719
// trigger update eds endpoint shards
732720
s.edsQueue.Push(func() error {
733721
defer close(waitCh)
734-
edsFn(keys)
722+
xdsUpdateFn := s.XdsUpdater.EDSCacheUpdate
723+
if pushEds {
724+
xdsUpdateFn = s.XdsUpdater.EDSUpdate
725+
}
726+
s.doEdsUpdate(keys, xdsUpdateFn)
735727
return nil
736728
})
737729
select {
@@ -744,34 +736,17 @@ func (s *Controller) queueEdsEvent(keys sets.Set[instancesKey], edsFn func(keys
744736
}
745737
}
746738

747-
// doEdsCacheUpdate invokes XdsUpdater's EDSCacheUpdate to update endpoint shards.
748-
func (s *Controller) doEdsCacheUpdate(keys sets.Set[instancesKey]) {
739+
func (s *Controller) doEdsUpdate(keys sets.Set[instancesKey], xdsUpdateFn model.EdsUpdateFn) {
749740
endpoints := s.buildEndpoints(keys)
750741
shard := model.ShardKeyFromRegistry(s)
751742

752743
for k := range keys {
753744
if eps, ok := endpoints[k]; ok {
754745
// Update the cache with the generated endpoints.
755-
s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, eps)
756-
} else {
757-
// Handle deletions by sending a nil endpoints update.
758-
s.XdsUpdater.EDSCacheUpdate(shard, string(k.hostname), k.namespace, nil)
759-
}
760-
}
761-
}
762-
763-
// doEdsUpdate invokes XdsUpdater's eds update to trigger eds push.
764-
func (s *Controller) doEdsUpdate(keys sets.Set[instancesKey]) {
765-
endpoints := s.buildEndpoints(keys)
766-
shard := model.ShardKeyFromRegistry(s)
767-
768-
for k := range keys {
769-
if eps, ok := endpoints[k]; ok {
770-
// Update with the generated endpoints.
771-
s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, eps)
746+
xdsUpdateFn(shard, string(k.hostname), k.namespace, eps)
772747
} else {
773748
// Handle deletions by sending a nil endpoints update.
774-
s.XdsUpdater.EDSUpdate(shard, string(k.hostname), k.namespace, nil)
749+
xdsUpdateFn(shard, string(k.hostname), k.namespace, nil)
775750
}
776751
}
777752
}

0 commit comments

Comments
 (0)