Skip to content

Commit

Permalink
feat(multicluster): propagate service labels to endpoints (#13583)
Browse files Browse the repository at this point in the history
When Kubernetes creates endpoints for a service, it propagates the
labels from the parent service to the endpoints, allowing these labels
to be used for discovery. When using Linkerd mirrored endpoints,
this ability was lost until now.

This change simply mimics the vanilla Kubernetes behavior by propagating
the remote service labels to mirror endpoints.

https://github.com/kubernetes/kubernetes/blob/a716ea756d87f60900dbbb500fc27ae30f7bd384/pkg/controller/endpoint/endpoints_controller.go#L506

Signed-off-by: Maxime Brunet <[email protected]>
  • Loading branch information
maxbrunet authored Jan 23, 2025
1 parent 6714331 commit a979b4a
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 103 deletions.
15 changes: 11 additions & 4 deletions multicluster/service-mirror/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,16 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServiceLabels(remoteService *c
return labels
}

// Provides labels for mirror endpoint. Copies all labels from the exported
// service to the mirror endpoint (except labels with the "SvcMirrorPrefix").
func (rcsw *RemoteClusterServiceWatcher) getMirrorEndpointLabels(exportedService *corev1.Service) map[string]string {
labels := rcsw.getCommonServiceLabels(exportedService)

labels[consts.RemoteClusterNameLabel] = rcsw.link.Spec.TargetClusterName

return labels
}

// Provides labels for federated services. Copies all labels from the remote
// service to the federated service (except labels with the "SvcMirrorPrefix").
func (rcsw *RemoteClusterServiceWatcher) getFederatedServiceLabels(remoteService *corev1.Service) map[string]string {
Expand Down Expand Up @@ -1000,10 +1010,7 @@ func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Cont
ObjectMeta: metav1.ObjectMeta{
Name: localServiceName,
Namespace: exportedService.Namespace,
Labels: map[string]string{
consts.MirroredResourceLabel: "true",
consts.RemoteClusterNameLabel: rcsw.link.Spec.TargetClusterName,
},
Labels: rcsw.getMirrorEndpointLabels(exportedService),
Annotations: map[string]string{
consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.Spec.TargetClusterDomain),
},
Expand Down
5 changes: 1 addition & 4 deletions multicluster/service-mirror/cluster_watcher_headless.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,7 @@ func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx conte
ObjectMeta: metav1.ObjectMeta{
Name: headlessMirrorServiceName,
Namespace: exportedService.Namespace,
Labels: map[string]string{
consts.MirroredResourceLabel: "true",
consts.RemoteClusterNameLabel: rcsw.link.Spec.TargetClusterName,
},
Labels: rcsw.getMirrorEndpointLabels(exportedService),
Annotations: map[string]string{
consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.Spec.TargetClusterDomain),
},
Expand Down
60 changes: 34 additions & 26 deletions multicluster/service-mirror/cluster_watcher_mirroring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type mirroringTestCase struct {
}

func (tc *mirroringTestCase) run(t *testing.T) {
t.Helper()
t.Run(tc.description, func(t *testing.T) {
t.Helper()

q := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
localAPI, err := tc.environment.runEnvironment(q)
Expand All @@ -52,7 +54,7 @@ func (tc *mirroringTestCase) run(t *testing.T) {
}

if err := diffServices(expected, actual); err != nil {
t.Fatal(err)
t.Fatalf("service %s/%s: %v", expected.Namespace, expected.Name, err)
}
}
}
Expand All @@ -68,7 +70,7 @@ func (tc *mirroringTestCase) run(t *testing.T) {
}

if err := diffEndpoints(expected, actual); err != nil {
t.Fatal(err)
t.Fatalf("endpoint %s/%s: %v", expected.Namespace, expected.Name, err)
}
}
}
Expand Down Expand Up @@ -99,6 +101,7 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) {
"service-one-remote",
"ns1",
"111",
map[string]string{"lk": "lv"},
[]corev1.ServicePort{
{
Name: "port1",
Expand All @@ -113,7 +116,7 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) {
}),
},
expectedLocalEndpoints: []*corev1.Endpoints{
endpoints("service-one-remote", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
endpoints("service-one-remote", "ns1", map[string]string{"lk": "lv"}, "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
{
Name: "port1",
Port: 888,
Expand All @@ -135,6 +138,7 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) {
"service-one-remote",
"ns2",
"111",
map[string]string{"lk": "lv"},
[]corev1.ServicePort{
{
Name: "port1",
Expand All @@ -152,6 +156,7 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) {
"service-one-remote",
"ns2",
"112",
map[string]string{"lk": "lv"},
[]corev1.ServicePort{
{
Name: "port1",
Expand All @@ -167,7 +172,7 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) {
),
},
expectedLocalEndpoints: []*corev1.Endpoints{
headlessMirrorEndpoints("service-one-remote", "ns2", "gateway-identity", []corev1.EndpointPort{
headlessMirrorEndpoints("service-one-remote", "ns2", map[string]string{"lk": "lv"}, "gateway-identity", []corev1.EndpointPort{
{
Name: "port1",
Port: 555,
Expand All @@ -182,6 +187,7 @@ func TestRemoteServiceCreatedMirroring(t *testing.T) {
endpointMirrorEndpoints(
"service-one-remote",
"ns2",
map[string]string{"lk": "lv"},
"pod-0",
"192.0.2.129",
"gateway-identity",
Expand Down Expand Up @@ -388,7 +394,7 @@ func TestLocalNamespaceCreatedAfterServiceExport(t *testing.T) {
remoteAPI, err := k8s.NewFakeAPI(
asYaml(gateway("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod)),
asYaml(remoteService("service-one", "ns1", "111", map[string]string{consts.DefaultExportedServiceSelector: "true"}, []corev1.ServicePort{})),
asYaml(endpoints("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{})),
asYaml(endpoints("service-one", "ns1", nil, "192.0.2.127", "gateway-identity", []corev1.EndpointPort{})),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -480,7 +486,7 @@ func TestServiceCreatedGatewayAlive(t *testing.T) {
remoteAPI, err := k8s.NewFakeAPI(
asYaml(gateway("gateway", "gateway-ns", "1", "192.0.0.1", "gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod)),
asYaml(remoteService("svc", "ns", "1", map[string]string{consts.DefaultExportedServiceSelector: "true"}, []corev1.ServicePort{})),
asYaml(endpoints("svc", "ns", "192.0.0.1", "gateway-identity", []corev1.EndpointPort{})),
asYaml(endpoints("svc", "ns", nil, "192.0.0.1", "gateway-identity", []corev1.EndpointPort{})),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -631,7 +637,7 @@ func TestServiceCreatedGatewayDown(t *testing.T) {
remoteAPI, err := k8s.NewFakeAPI(
asYaml(gateway("gateway", "gateway-ns", "1", "192.0.0.1", "gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod)),
asYaml(remoteService("svc", "ns", "1", map[string]string{consts.DefaultExportedServiceSelector: "true"}, []corev1.ServicePort{})),
asYaml(endpoints("svc", "ns", "192.0.0.1", "gateway-identity", []corev1.EndpointPort{})),
asYaml(endpoints("svc", "ns", nil, "192.0.0.1", "gateway-identity", []corev1.EndpointPort{})),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -747,7 +753,7 @@ func TestRemoteServiceUpdatedMirroring(t *testing.T) {
description: "updates service ports on both service and endpoints",
environment: updateServiceWithChangedPorts,
expectedLocalServices: []*corev1.Service{
mirrorService("test-service-remote", "test-namespace", "currentServiceResVersion",
mirrorService("test-service-remote", "test-namespace", "currentServiceResVersion", nil,
[]corev1.ServicePort{
{
Name: "port1",
Expand All @@ -763,7 +769,7 @@ func TestRemoteServiceUpdatedMirroring(t *testing.T) {
},

expectedLocalEndpoints: []*corev1.Endpoints{
endpoints("test-service-remote", "test-namespace", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
endpoints("test-service-remote", "test-namespace", nil, "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
{
Name: "port1",
Port: 888,
Expand Down Expand Up @@ -847,7 +853,7 @@ func TestRemoteEndpointsUpdatedMirroring(t *testing.T) {
description: "updates headless mirror service with new remote Endpoints hosts",
environment: updateEndpointsWithChangedHosts,
expectedLocalServices: []*corev1.Service{
headlessMirrorService("service-two-remote", "eptest", "222", []corev1.ServicePort{
headlessMirrorService("service-two-remote", "eptest", "222", nil, []corev1.ServicePort{
{
Name: "port1",
Protocol: "TCP",
Expand All @@ -859,7 +865,7 @@ func TestRemoteEndpointsUpdatedMirroring(t *testing.T) {
Port: 666,
},
}),
endpointMirrorService("pod-0", "service-two-remote", "eptest", "333", []corev1.ServicePort{
endpointMirrorService("pod-0", "service-two-remote", "eptest", "333", nil, []corev1.ServicePort{
{
Name: "port1",
Protocol: "TCP",
Expand All @@ -871,7 +877,7 @@ func TestRemoteEndpointsUpdatedMirroring(t *testing.T) {
Port: 666,
},
}),
endpointMirrorService("pod-1", "service-two-remote", "eptest", "112", []corev1.ServicePort{
endpointMirrorService("pod-1", "service-two-remote", "eptest", "112", nil, []corev1.ServicePort{
{
Name: "port1",
Protocol: "TCP",
Expand Down Expand Up @@ -906,6 +912,7 @@ func TestRemoteEndpointsUpdatedMirroring(t *testing.T) {
endpointMirrorEndpoints(
"service-two-remote",
"eptest",
nil,
"pod-0",
"192.0.2.127",
"gateway-identity",
Expand All @@ -924,6 +931,7 @@ func TestRemoteEndpointsUpdatedMirroring(t *testing.T) {
endpointMirrorEndpoints(
"service-two-remote",
"eptest",
nil,
"pod-1",
"192.0.2.127",
"gateway-identity",
Expand Down Expand Up @@ -965,15 +973,15 @@ func TestGcOrphanedServicesMirroring(t *testing.T) {
description: "deletes mirrored resources that are no longer present on the remote cluster",
environment: gcTriggered,
expectedLocalServices: []*corev1.Service{
mirrorService("test-service-1-remote", "test-namespace", "", nil),
headlessMirrorService("test-headless-service-remote", "test-namespace", "", nil),
endpointMirrorService("pod-0", "test-headless-service-remote", "test-namespace", "", nil),
mirrorService("test-service-1-remote", "test-namespace", "", nil, nil),
headlessMirrorService("test-headless-service-remote", "test-namespace", "", nil, nil),
endpointMirrorService("pod-0", "test-headless-service-remote", "test-namespace", "", nil, nil),
},

expectedLocalEndpoints: []*corev1.Endpoints{
endpoints("test-service-1-remote", "test-namespace", "", "", nil),
headlessMirrorEndpoints("test-headless-service-remote", "test-namespace", "", nil),
endpointMirrorEndpoints("test-headless-service-remote", "test-namespace", "pod-0", "", "", nil),
endpoints("test-service-1-remote", "test-namespace", nil, "", "", nil),
headlessMirrorEndpoints("test-headless-service-remote", "test-namespace", nil, "", nil),
endpointMirrorEndpoints("test-headless-service-remote", "test-namespace", nil, "pod-0", "", "", nil),
},
},
} {
Expand Down Expand Up @@ -1003,27 +1011,27 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase {
description: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType),
environment: onAddOrUpdateRemoteServiceUpdated(isAdd),
expectedEventsInQueue: []interface{}{&RemoteExportedServiceUpdated{
localService: mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil),
localEndpoints: endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
localService: mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil, nil),
localEndpoints: endpoints("test-service-remote", "test-namespace", nil, "0.0.0.0", "", nil),
remoteUpdate: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{
consts.DefaultExportedServiceSelector: "true",
}, nil),
}},
expectedLocalServices: []*corev1.Service{
mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil),
mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil, nil),
},
expectedLocalEndpoints: []*corev1.Endpoints{
endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
endpoints("test-service-remote", "test-namespace", nil, "0.0.0.0", "", nil),
},
},
{
description: fmt.Sprintf("not enqueue any events as this update does not really tell us anything new (res version is the same...) (%s)", testType),
environment: onAddOrUpdateSameResVersion(isAdd),
expectedLocalServices: []*corev1.Service{
mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil),
mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil, nil),
},
expectedLocalEndpoints: []*corev1.Endpoints{
endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
endpoints("test-service-remote", "test-namespace", nil, "0.0.0.0", "", nil),
},
},
{
Expand All @@ -1035,10 +1043,10 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase {
}},

expectedLocalServices: []*corev1.Service{
mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil),
mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil, nil),
},
expectedLocalEndpoints: []*corev1.Endpoints{
endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
endpoints("test-service-remote", "test-namespace", nil, "0.0.0.0", "", nil),
},
},
}
Expand Down
Loading

0 comments on commit a979b4a

Please sign in to comment.