Skip to content

Commit 85a2220

Browse files
fix(fds): simplifies sync logic (#134)
My understanding of the current client-server flow is as follows: - client connects to the server - it requests push of predefined types (ExportedSvc) as initial push request - SotW - on subscriber(client) connect, server does another SotW push Not only we are sending the same data twice, but we also need to make sure that handlers we define are aligned with initial push requests. If we add new handler, we need to explictly define it as part of initial push. It also needs to be defined (again) in onSubscribe on the FDS server side. This PR moves the initial sync responsibility to the client, ensuring each handler automatically requests the data it needs. This avoids redundant pushes and reduces code duplication. Signed-off-by: bartoszmajsak <[email protected]> Signed-off-by: bartoszmajsak <[email protected]>
1 parent 514e1bb commit 85a2220

File tree

4 files changed

+15
-30
lines changed

4 files changed

+15
-30
lines changed

Diff for: cmd/federation-controller/main.go

-9
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"syscall"
2525
"time"
2626

27-
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
2827
routev1client "github.com/openshift/client-go/route/clientset/versioned"
2928
istiokube "istio.io/istio/pkg/kube"
3029
istiolog "istio.io/istio/pkg/log"
@@ -251,13 +250,8 @@ func startReconciler(ctx context.Context, cfg *config.Federation, serviceLister
251250
}
252251

253252
func startFederationServer(ctx context.Context, cfg *config.Federation, serviceLister v1.ServiceLister, fdsPushRequests chan xds.PushRequest) {
254-
triggerFDSPushOnNewSubscription := func() {
255-
fdsPushRequests <- xds.PushRequest{TypeUrl: xds.ExportedServiceTypeUrl}
256-
}
257-
258253
federationServer := adss.NewServer(
259254
fdsPushRequests,
260-
triggerFDSPushOnNewSubscription,
261255
fds.NewExportedServicesGenerator(*cfg, serviceLister),
262256
)
263257

@@ -316,9 +310,6 @@ func startFDSClient(ctx context.Context, remote config.Remote, meshConfigPushReq
316310
PeerName: remote.Name,
317311
DiscoveryAddr: discoveryAddr,
318312
Authority: remote.ServiceFQDN(),
319-
InitialDiscoveryRequests: []*discovery.DiscoveryRequest{{
320-
TypeUrl: xds.ExportedServiceTypeUrl,
321-
}},
322313
Handlers: map[string]adsc.ResponseHandler{
323314
xds.ExportedServiceTypeUrl: fds.NewImportedServiceHandler(importedServiceStore, meshConfigPushRequests),
324315
},

Diff for: internal/pkg/xds/adsc/adsc.go

+13-14
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ const (
3535
)
3636

3737
type ADSCConfig struct {
38-
PeerName string
39-
DiscoveryAddr string
40-
Authority string
41-
InitialDiscoveryRequests []*discovery.DiscoveryRequest
42-
Handlers map[string]ResponseHandler
43-
ReconnectDelay time.Duration
38+
PeerName string
39+
DiscoveryAddr string
40+
Authority string
41+
Handlers map[string]ResponseHandler
42+
ReconnectDelay time.Duration
4443
}
4544

4645
type ADSC struct {
@@ -66,17 +65,17 @@ func New(opts *ADSCConfig) (*ADSC, error) {
6665
}
6766

6867
func (a *ADSC) Run(ctx context.Context) error {
69-
var err error
70-
7168
client := discovery.NewAggregatedDiscoveryServiceClient(a.conn)
72-
a.stream, err = client.StreamAggregatedResources(ctx)
73-
if err != nil {
74-
return err
69+
70+
var err error
71+
if a.stream, err = client.StreamAggregatedResources(ctx); err != nil {
72+
return fmt.Errorf("failed setting resource stream: %w", err)
7573
}
7674

77-
for _, r := range a.cfg.InitialDiscoveryRequests {
78-
if sendErr := a.Send(r); sendErr != nil {
79-
a.log.Errorf("failed sending initial discovery request: %+v", sendErr)
75+
for k, _ := range a.cfg.Handlers {
76+
discoveryRequest := &discovery.DiscoveryRequest{TypeUrl: k}
77+
if sendErr := a.Send(discoveryRequest); sendErr != nil {
78+
a.log.Errorf("[%s] failed requesting initial discovery sync: %+v", k, sendErr)
8079
}
8180
}
8281

Diff for: internal/pkg/xds/adss/adss_handler.go

-4
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ type adsServer struct {
4848
handlers map[string]RequestHandler
4949
subscribers sync.Map
5050
nextSubscriberID atomic.Uint64
51-
onNewSubscriber func()
5251
}
5352

5453
// subscriber represents a client that is subscribed to XDS resources.
@@ -72,9 +71,6 @@ func (adss *adsServer) StreamAggregatedResources(downstream DiscoveryStream) err
7271

7372
adss.subscribers.Store(sub.id, sub)
7473

75-
if adss.onNewSubscriber != nil {
76-
adss.onNewSubscriber()
77-
}
7874
go adss.recvFromStream(int64(sub.id), downstream)
7975

8076
<-ctx.Done()

Diff for: internal/pkg/xds/adss/grpc_server.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,14 @@ type Server struct {
3131
pushRequests <-chan xds.PushRequest
3232
}
3333

34-
func NewServer(pushRequests <-chan xds.PushRequest, onNewSubscriber func(), handlers ...RequestHandler) *Server {
34+
func NewServer(pushRequests <-chan xds.PushRequest, handlers ...RequestHandler) *Server {
3535
grpcServer := grpc.NewServer()
3636
handlerMap := make(map[string]RequestHandler)
3737
for _, g := range handlers {
3838
handlerMap[g.GetTypeUrl()] = g
3939
}
4040
ads := &adsServer{
41-
handlers: handlerMap,
42-
onNewSubscriber: onNewSubscriber,
41+
handlers: handlerMap,
4342
}
4443

4544
discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, ads)

0 commit comments

Comments
 (0)