Skip to content

Commit 678e084

Browse files
committed
feat: support traffic distribution
1 parent 8ce4af2 commit 678e084

File tree

4 files changed

+193
-1
lines changed

4 files changed

+193
-1
lines changed

internal/ingress/controller/controller.go

+7
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,13 @@ func getIngressPodZone(svc *apiv1.Service) string {
156156
}
157157
}
158158
}
159+
if svc.Spec.TrafficDistribution != nil && *svc.Spec.TrafficDistribution == apiv1.ServiceTrafficDistributionPreferClose {
160+
if foundZone, ok := k8s.IngressNodeDetails.GetLabels()[apiv1.LabelTopologyZone]; ok {
161+
klog.V(3).Infof("Svc has traffic distribution enabled, try to use zone %q where controller pod is running for Service %q ", foundZone, svcKey)
162+
return foundZone
163+
}
164+
}
165+
159166
return emptyZone
160167
}
161168

internal/ingress/controller/endpointslices.go

+22
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
4444
return upsServers
4545
}
4646

47+
// we need to check if there is at least one endpoint with controller zone
48+
// if we use traffic distribution
49+
useTrafficDistribution := s.Spec.TrafficDistribution != nil && *s.Spec.TrafficDistribution == corev1.ServiceTrafficDistributionPreferClose
50+
4751
// using a map avoids duplicated upstream servers when the service
4852
// contains multiple port definitions sharing the same targetport
4953
processedUpstreamServers := make(map[string]struct{})
@@ -115,6 +119,24 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c
115119
useTopologyHints = false
116120
if zoneForHints != emptyZone {
117121
useTopologyHints = true
122+
123+
// check if endpointslices have zone hints with controller zone
124+
if useTrafficDistribution {
125+
foundEndpointsForZone := false
126+
for _, ep := range eps.Endpoints {
127+
for _, epzone := range ep.Hints.ForZones {
128+
if epzone.Name == zoneForHints {
129+
foundEndpointsForZone = true
130+
break
131+
}
132+
}
133+
}
134+
if !foundEndpointsForZone {
135+
klog.V(3).Infof("No endpoints found for zone %q in Service %q", zoneForHints, svcKey)
136+
useTopologyHints = false
137+
}
138+
}
139+
118140
// check if all endpointslices have zone hints
119141
for _, ep := range eps.Endpoints {
120142
if ep.Hints == nil || len(ep.Hints.ForZones) == 0 {

internal/ingress/controller/endpointslices_test.go

+163
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"fmt"
21+
"k8s.io/utils/ptr"
2122
"testing"
2223

2324
corev1 "k8s.io/api/core/v1"
@@ -916,6 +917,168 @@ func TestGetEndpointsFromSlices(t *testing.T) {
916917
},
917918
},
918919
},
920+
{
921+
"should return one endpoint which belongs to zone",
922+
&corev1.Service{
923+
Spec: corev1.ServiceSpec{
924+
Type: corev1.ServiceTypeClusterIP,
925+
ClusterIP: "1.1.1.1",
926+
Ports: []corev1.ServicePort{
927+
{
928+
Name: "default",
929+
TargetPort: intstr.FromString("port-1"),
930+
},
931+
},
932+
TrafficDistribution: ptr.To(corev1.ServiceTrafficDistributionPreferClose),
933+
},
934+
},
935+
&corev1.ServicePort{
936+
Name: "port-1",
937+
TargetPort: intstr.FromString("port-1"),
938+
},
939+
corev1.ProtocolTCP,
940+
"eu-west-1b",
941+
func(string) ([]*discoveryv1.EndpointSlice, error) {
942+
return []*discoveryv1.EndpointSlice{{
943+
ObjectMeta: metav1.ObjectMeta{
944+
Labels: map[string]string{discoveryv1.LabelServiceName: "default"},
945+
},
946+
Endpoints: []discoveryv1.Endpoint{
947+
{
948+
Addresses: []string{"1.1.1.1"},
949+
Conditions: discoveryv1.EndpointConditions{
950+
Ready: &[]bool{true}[0],
951+
},
952+
Hints: &[]discoveryv1.EndpointHints{{
953+
ForZones: []discoveryv1.ForZone{{
954+
Name: "eu-west-1b",
955+
}},
956+
}}[0],
957+
},
958+
{
959+
Addresses: []string{"1.1.1.2"},
960+
Conditions: discoveryv1.EndpointConditions{
961+
Ready: &[]bool{true}[0],
962+
},
963+
Hints: &[]discoveryv1.EndpointHints{{
964+
ForZones: []discoveryv1.ForZone{{
965+
Name: "eu-west-1a",
966+
}},
967+
}}[0],
968+
},
969+
{
970+
Addresses: []string{"1.1.1.3"},
971+
Conditions: discoveryv1.EndpointConditions{
972+
Ready: &[]bool{true}[0],
973+
},
974+
Hints: &[]discoveryv1.EndpointHints{{
975+
ForZones: []discoveryv1.ForZone{{
976+
Name: "eu-west-1c",
977+
}},
978+
}}[0],
979+
},
980+
},
981+
Ports: []discoveryv1.EndpointPort{
982+
{
983+
Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0],
984+
Port: &[]int32{80}[0],
985+
Name: &[]string{"port-1"}[0],
986+
},
987+
},
988+
}}, nil
989+
},
990+
[]ingress.Endpoint{
991+
{
992+
Address: "1.1.1.1",
993+
Port: "80",
994+
},
995+
},
996+
},
997+
{
998+
"should return all endpoints because no endpoints with controller zone",
999+
&corev1.Service{
1000+
Spec: corev1.ServiceSpec{
1001+
Type: corev1.ServiceTypeClusterIP,
1002+
ClusterIP: "1.1.1.1",
1003+
Ports: []corev1.ServicePort{
1004+
{
1005+
Name: "default",
1006+
TargetPort: intstr.FromString("port-1"),
1007+
},
1008+
},
1009+
TrafficDistribution: ptr.To(corev1.ServiceTrafficDistributionPreferClose),
1010+
},
1011+
},
1012+
&corev1.ServicePort{
1013+
Name: "port-1",
1014+
TargetPort: intstr.FromString("port-1"),
1015+
},
1016+
corev1.ProtocolTCP,
1017+
"eu-west-1b",
1018+
func(string) ([]*discoveryv1.EndpointSlice, error) {
1019+
return []*discoveryv1.EndpointSlice{{
1020+
ObjectMeta: metav1.ObjectMeta{
1021+
Labels: map[string]string{discoveryv1.LabelServiceName: "default"},
1022+
},
1023+
Endpoints: []discoveryv1.Endpoint{
1024+
{
1025+
Addresses: []string{"1.1.1.1"},
1026+
Conditions: discoveryv1.EndpointConditions{
1027+
Ready: &[]bool{true}[0],
1028+
},
1029+
Hints: &[]discoveryv1.EndpointHints{{
1030+
ForZones: []discoveryv1.ForZone{{
1031+
Name: "eu-west-1a",
1032+
}},
1033+
}}[0],
1034+
},
1035+
{
1036+
Addresses: []string{"1.1.1.2"},
1037+
Conditions: discoveryv1.EndpointConditions{
1038+
Ready: &[]bool{true}[0],
1039+
},
1040+
Hints: &[]discoveryv1.EndpointHints{{
1041+
ForZones: []discoveryv1.ForZone{{
1042+
Name: "eu-west-1c",
1043+
}},
1044+
}}[0],
1045+
},
1046+
{
1047+
Addresses: []string{"1.1.1.3"},
1048+
Conditions: discoveryv1.EndpointConditions{
1049+
Ready: &[]bool{true}[0],
1050+
},
1051+
Hints: &[]discoveryv1.EndpointHints{{
1052+
ForZones: []discoveryv1.ForZone{{
1053+
Name: "eu-west-1c",
1054+
}},
1055+
}}[0],
1056+
},
1057+
},
1058+
Ports: []discoveryv1.EndpointPort{
1059+
{
1060+
Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0],
1061+
Port: &[]int32{80}[0],
1062+
Name: &[]string{"port-1"}[0],
1063+
},
1064+
},
1065+
}}, nil
1066+
},
1067+
[]ingress.Endpoint{
1068+
{
1069+
Address: "1.1.1.1",
1070+
Port: "80",
1071+
},
1072+
{
1073+
Address: "1.1.1.2",
1074+
Port: "80",
1075+
},
1076+
{
1077+
Address: "1.1.1.3",
1078+
Port: "80",
1079+
},
1080+
},
1081+
},
9191082
}
9201083

9211084
for _, testCase := range tests {

pkg/flags/flags.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ Takes the form "<host>:port". If not provided, no admission controller is starte
232232

233233
disableSyncEvents = flags.Bool("disable-sync-events", false, "Disables the creation of 'Sync' event resources")
234234

235-
enableTopologyAwareRouting = flags.Bool("enable-topology-aware-routing", false, "Enable topology aware routing feature, needs service object annotation service.kubernetes.io/topology-mode sets to auto.")
235+
enableTopologyAwareRouting = flags.Bool("enable-topology-aware-routing", false, "Enable topology aware routing feature, needs service object annotation service.kubernetes.io/topology-mode sets to auto or trafficDistribution.")
236236
)
237237

238238
flags.StringVar(&nginx.MaxmindMirror, "maxmind-mirror", "", `Maxmind mirror url (example: http://geoip.local/databases.`)

0 commit comments

Comments
 (0)