diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 5a263551ef..0894c0dfc1 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -156,6 +156,13 @@ func getIngressPodZone(svc *apiv1.Service) string { } } } + if svc.Spec.TrafficDistribution != nil && *svc.Spec.TrafficDistribution == apiv1.ServiceTrafficDistributionPreferClose { + if foundZone, ok := k8s.IngressNodeDetails.GetLabels()[apiv1.LabelTopologyZone]; ok { + klog.V(3).Infof("Svc has traffic distribution enabled, try to use zone %q where controller pod is running for Service %q ", foundZone, svcKey) + return foundZone + } + } + return emptyZone } diff --git a/internal/ingress/controller/endpointslices.go b/internal/ingress/controller/endpointslices.go index ed46e2c853..1b8a3b571a 100644 --- a/internal/ingress/controller/endpointslices.go +++ b/internal/ingress/controller/endpointslices.go @@ -44,6 +44,10 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c return upsServers } + // we need to check if there is at least one endpoint with controller zone + // if we use traffic distribution + useTrafficDistribution := s.Spec.TrafficDistribution != nil && *s.Spec.TrafficDistribution == corev1.ServiceTrafficDistributionPreferClose + // using a map avoids duplicated upstream servers when the service // contains multiple port definitions sharing the same targetport processedUpstreamServers := make(map[string]struct{}) @@ -115,6 +119,27 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c useTopologyHints = false if zoneForHints != emptyZone { useTopologyHints = true + + // check if endpointslices have zone hints with controller zone + if useTrafficDistribution { + foundEndpointsForZone := false + for _, ep := range eps.Endpoints { + if ep.Hints == nil { + continue + } + for _, epzone := range ep.Hints.ForZones { + if epzone.Name == zoneForHints { + foundEndpointsForZone = true + break + } + } + } + if !foundEndpointsForZone { + klog.V(3).Infof("No endpoints found for zone %q in Service %q", zoneForHints, svcKey) + useTopologyHints = false + } + } + // check if all endpointslices have zone hints for _, ep := range eps.Endpoints { if ep.Hints == nil || len(ep.Hints.ForZones) == 0 { diff --git a/internal/ingress/controller/endpointslices_test.go b/internal/ingress/controller/endpointslices_test.go index 69ef3ef1b9..ca29a4dd3b 100644 --- a/internal/ingress/controller/endpointslices_test.go +++ b/internal/ingress/controller/endpointslices_test.go @@ -20,6 +20,8 @@ import ( "fmt" "testing" + "k8s.io/utils/ptr" + corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -916,6 +918,168 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, }, }, + { + "should return one endpoint which belongs to zone", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromString("port-1"), + }, + }, + TrafficDistribution: ptr.To(corev1.ServiceTrafficDistributionPreferClose), + }, + }, + &corev1.ServicePort{ + Name: "port-1", + TargetPort: intstr.FromString("port-1"), + }, + corev1.ProtocolTCP, + "eu-west-1b", + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1b", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1a", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.3"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1c", + }}, + }}[0], + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-1"}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + }, + }, + }, + { + "should return all endpoints because no endpoints with controller zone", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromString("port-1"), + }, + }, + TrafficDistribution: ptr.To(corev1.ServiceTrafficDistributionPreferClose), + }, + }, + &corev1.ServicePort{ + Name: "port-1", + TargetPort: intstr.FromString("port-1"), + }, + corev1.ProtocolTCP, + "eu-west-1b", + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1a", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1c", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.3"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1c", + }}, + }}[0], + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-1"}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + }, + { + Address: "1.1.1.2", + Port: "80", + }, + { + Address: "1.1.1.3", + Port: "80", + }, + }, + }, } for _, testCase := range tests { diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index ce24160fdc..376484649b 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -232,7 +232,7 @@ Takes the form ":port". If not provided, no admission controller is starte disableSyncEvents = flags.Bool("disable-sync-events", false, "Disables the creation of 'Sync' event resources") - enableTopologyAwareRouting = flags.Bool("enable-topology-aware-routing", false, "Enable topology aware routing feature, needs service object annotation service.kubernetes.io/topology-mode sets to auto.") + enableTopologyAwareRouting = flags.Bool("enable-topology-aware-routing", false, "Enable topology aware routing feature, needs service object annotation service.kubernetes.io/topology-mode sets to auto or trafficDistribution.") ) flags.StringVar(&nginx.MaxmindMirror, "maxmind-mirror", "", `Maxmind mirror url (example: http://geoip.local/databases.`)