@@ -35,6 +35,7 @@ import (
35
35
"k8s.io/apimachinery/pkg/util/runtime"
36
36
"k8s.io/client-go/informers"
37
37
v2 "k8s.io/client-go/informers/core/v1"
38
+ networkingv1 "k8s.io/client-go/informers/networking/v1"
38
39
"k8s.io/client-go/kubernetes"
39
40
kclientcmd "k8s.io/client-go/tools/clientcmd"
40
41
"k8s.io/klog/v2"
@@ -303,6 +304,10 @@ func main() {
303
304
return SyncServicePods (ctx , kdb , factory .Core ().V1 ().Services (), factory .Core ().V1 ().Pods ())
304
305
})
305
306
307
+ g .Go (func () error {
308
+ return SyncIngressServiceUuid (ctx , kdb , factory .Networking ().V1 ().Ingresses ())
309
+ })
310
+
306
311
err = internal .SyncPrometheusConfig (ctx , db , & cfg .Prometheus , clusterInstance .Uuid )
307
312
if err != nil {
308
313
klog .Error (errors .Wrap (err , "cannot sync prometheus config" ))
@@ -702,3 +707,103 @@ func SyncServicePods(ctx context.Context, db *kdatabase.Database, serviceList v2
702
707
703
708
return g .Wait ()
704
709
}
710
+
711
+ func SyncIngressServiceUuid (ctx context.Context , db * kdatabase.Database , ingressList networkingv1.IngressInformer ) error {
712
+ // TODO: Respect delete events. At the moment, service link entries will only be deleted if the corresponding ingress is deleted.
713
+ ingressBackendServices := make (chan any )
714
+ ingressRules := make (chan any )
715
+
716
+ g , ctx := errgroup .WithContext (ctx )
717
+
718
+ g .Go (func () error {
719
+ return db .UpsertStreamed (ctx , ingressBackendServices )
720
+ })
721
+
722
+ g .Go (func () error {
723
+ return db .UpsertStreamed (ctx , ingressRules )
724
+ })
725
+
726
+ g .Go (func () error {
727
+ ch := cachev1 .Multiplexers ().Services ().UpsertEvents ().Out ()
728
+ for {
729
+ select {
730
+ case service , more := <- ch :
731
+ if ! more {
732
+ return nil
733
+ }
734
+
735
+ ingresses , err := ingressList .Lister ().Ingresses (service .(* schemav1.Service ).Namespace ).List (labels .Everything ())
736
+ if err != nil {
737
+ return err
738
+ }
739
+
740
+ for _ , ingress := range ingresses {
741
+ if ingress .Spec .DefaultBackend != nil {
742
+ if ingress .Spec .DefaultBackend .Service != nil {
743
+ select {
744
+ case ingressBackendServices <- schemav1.IngressBackendService {
745
+ ServiceUuid : service .(* schemav1.Service ).Uuid ,
746
+ IngressUuid : schemav1 .EnsureUUID (ingress .UID ),
747
+ ServiceName : ingress .Spec .DefaultBackend .Service .Name ,
748
+ ServicePortName : ingress .Spec .DefaultBackend .Service .Port .Name ,
749
+ ServicePortNumber : ingress .Spec .DefaultBackend .Service .Port .Number ,
750
+ }:
751
+ case <- ctx .Done ():
752
+ return ctx .Err ()
753
+ }
754
+ }
755
+ }
756
+
757
+ for _ , rules := range ingress .Spec .Rules {
758
+ if rules .HTTP == nil {
759
+ continue
760
+ }
761
+
762
+ for _ , ruleValue := range rules .HTTP .Paths {
763
+ if ruleValue .Backend .Service == nil {
764
+ continue
765
+ }
766
+
767
+ serviceName := ruleValue .Backend .Service .Name
768
+ if service .(* schemav1.Service ).Name == serviceName {
769
+ serviceUuid := service .(* schemav1.Service ).Uuid
770
+ ingressUuid := schemav1 .EnsureUUID (ingress .UID )
771
+ ingressRuleUuid := schemav1 .NewUUID (ingressUuid , rules .Host + ruleValue .Path + serviceName )
772
+
773
+ select {
774
+ case ingressBackendServices <- schemav1.IngressBackendService {
775
+ ServiceUuid : serviceUuid ,
776
+ IngressUuid : ingressUuid ,
777
+ IngressRuleUuid : ingressRuleUuid ,
778
+ ServiceName : serviceName ,
779
+ ServicePortName : ruleValue .Backend .Service .Port .Name ,
780
+ ServicePortNumber : ruleValue .Backend .Service .Port .Number ,
781
+ }:
782
+ case <- ctx .Done ():
783
+ return ctx .Err ()
784
+ }
785
+
786
+ select {
787
+ case ingressRules <- schemav1.IngressRule {
788
+ Uuid : ingressRuleUuid ,
789
+ BackendUuid : serviceUuid ,
790
+ IngressUuid : ingressUuid ,
791
+ Host : schemav1 .NewNullableString (rules .Host ),
792
+ Path : schemav1 .NewNullableString (ruleValue .Path ),
793
+ PathType : string (* ruleValue .PathType ),
794
+ }:
795
+ case <- ctx .Done ():
796
+ return ctx .Err ()
797
+ }
798
+ }
799
+ }
800
+ }
801
+ }
802
+ case <- ctx .Done ():
803
+ return ctx .Err ()
804
+ }
805
+ }
806
+ })
807
+
808
+ return g .Wait ()
809
+ }
0 commit comments