mirror of
https://github.com/prometheus/prometheus.git
synced 2026-04-24 23:58:56 -04:00
discovery/k8s: Dedup EPS for *DualStack policies
In case of {Prefer,Require}DualStack policies in Services, K8s will
create two `EndpointSlices` resources for each IP family address type
specified. This created duplicate targets.
Signed-off-by: Pranshu Srivastava <rexagod@gmail.com>
This commit is contained in:
parent
5d3f9ee39b
commit
a87b603464
2 changed files with 225 additions and 5 deletions
|
|
@ -306,12 +306,16 @@ func (e *EndpointSlice) buildEndpointSlice(eps v1.EndpointSlice) *targetgroup.Gr
|
|||
|
||||
addObjectMetaLabels(tg.Labels, eps.ObjectMeta, RoleEndpointSlice)
|
||||
|
||||
e.addServiceLabels(eps, tg)
|
||||
svc := e.addServiceLabels(eps, tg)
|
||||
|
||||
if e.withNamespaceMetadata {
|
||||
tg.Labels = addNamespaceLabels(tg.Labels, e.namespaceInf, e.logger, eps.Namespace)
|
||||
}
|
||||
|
||||
if nonPrimaryIPFamilySlice(svc, eps) {
|
||||
return tg
|
||||
}
|
||||
|
||||
type podEntry struct {
|
||||
pod *apiv1.Pod
|
||||
servicePorts []v1.EndpointPort
|
||||
|
|
@ -504,7 +508,28 @@ func (e *EndpointSlice) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
|
|||
return obj.(*apiv1.Pod)
|
||||
}
|
||||
|
||||
func (e *EndpointSlice) addServiceLabels(esa v1.EndpointSlice, tg *targetgroup.Group) {
|
||||
// nonPrimaryIPFamilySlice reports whether eps is the secondary slice of a
|
||||
// dual-stack service, i.e. its address type does not match the service's
|
||||
// primary IP family. Targets from such slices would duplicate those of the
|
||||
// primary slice.
|
||||
func nonPrimaryIPFamilySlice(svc *apiv1.Service, eps v1.EndpointSlice) bool {
|
||||
if svc == nil {
|
||||
return false
|
||||
}
|
||||
policy := svc.Spec.IPFamilyPolicy
|
||||
if policy == nil {
|
||||
return false
|
||||
}
|
||||
if *policy != apiv1.IPFamilyPolicyPreferDualStack && *policy != apiv1.IPFamilyPolicyRequireDualStack {
|
||||
return false
|
||||
}
|
||||
if len(svc.Spec.IPFamilies) == 0 {
|
||||
return false
|
||||
}
|
||||
return string(eps.AddressType) != string(svc.Spec.IPFamilies[0])
|
||||
}
|
||||
|
||||
func (e *EndpointSlice) addServiceLabels(esa v1.EndpointSlice, tg *targetgroup.Group) *apiv1.Service {
|
||||
var (
|
||||
found bool
|
||||
name string
|
||||
|
|
@ -515,18 +540,19 @@ func (e *EndpointSlice) addServiceLabels(esa v1.EndpointSlice, tg *targetgroup.G
|
|||
// kubernetes.io/service-name label.
|
||||
name, found = esa.Labels[v1.LabelServiceName]
|
||||
if !found {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
obj, exists, err := e.serviceStore.GetByKey(namespacedName(ns, name))
|
||||
if err != nil {
|
||||
e.logger.Error("retrieving service failed", "err", err)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
if !exists {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
svc := obj.(*apiv1.Service)
|
||||
|
||||
tg.Labels = tg.Labels.Merge(serviceLabels(svc))
|
||||
return svc
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1611,3 +1611,197 @@ func TestEndpointsSlicesDiscoveryWithUpdatedNamespaceMetadata(t *testing.T) {
|
|||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func makeDualStackService(name, namespace string, policy corev1.IPFamilyPolicy, families []corev1.IPFamily) *corev1.Service {
|
||||
return &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
IPFamilyPolicy: &policy,
|
||||
IPFamilies: families,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func makeDualStackEndpointSlice(name, namespace, svcName string, addrType v1.AddressType, addr string) *v1.EndpointSlice {
|
||||
return &v1.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
Labels: map[string]string{v1.LabelServiceName: svcName},
|
||||
},
|
||||
AddressType: addrType,
|
||||
Ports: []v1.EndpointPort{
|
||||
{
|
||||
Name: strptr("http"),
|
||||
Port: int32ptr(9000),
|
||||
Protocol: protocolptr(corev1.ProtocolTCP),
|
||||
},
|
||||
},
|
||||
Endpoints: []v1.Endpoint{
|
||||
{
|
||||
Addresses: []string{addr},
|
||||
Conditions: v1.EndpointConditions{Ready: boolptr(true)},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func dualStackGroupLabels(namespace, addrType, sliceName, svcName string) model.LabelSet {
|
||||
return model.LabelSet{
|
||||
"__meta_kubernetes_namespace": model.LabelValue(namespace),
|
||||
"__meta_kubernetes_endpointslice_address_type": model.LabelValue(addrType),
|
||||
"__meta_kubernetes_endpointslice_name": model.LabelValue(sliceName),
|
||||
"__meta_kubernetes_endpointslice_label_kubernetes_io_service_name": model.LabelValue(svcName),
|
||||
"__meta_kubernetes_endpointslice_labelpresent_kubernetes_io_service_name": "true",
|
||||
"__meta_kubernetes_service_name": model.LabelValue(svcName),
|
||||
}
|
||||
}
|
||||
|
||||
func dualStackTarget(addr string) model.LabelSet {
|
||||
return model.LabelSet{
|
||||
"__address__": model.LabelValue(addr),
|
||||
"__meta_kubernetes_endpointslice_endpoint_conditions_ready": "true",
|
||||
"__meta_kubernetes_endpointslice_port": "9000",
|
||||
"__meta_kubernetes_endpointslice_port_name": "http",
|
||||
"__meta_kubernetes_endpointslice_port_protocol": "TCP",
|
||||
}
|
||||
}
|
||||
|
||||
// TestEndpointSliceDiscoveryDualStackPreferredSkipsSecondarySlice verifies
|
||||
// that when a service has ipFamilyPolicy=PreferDualStack with IPv4 as the
|
||||
// primary family, only the IPv4 EndpointSlice generates targets. The IPv6
|
||||
// (secondary) EndpointSlice produces an empty target group.
|
||||
func TestEndpointSliceDiscoveryDualStackPreferredSkipsSecondarySlice(t *testing.T) {
|
||||
t.Parallel()
|
||||
svc := makeDualStackService("testsvc", "default", corev1.IPFamilyPolicyPreferDualStack,
|
||||
[]corev1.IPFamily{corev1.IPv4Protocol, corev1.IPv6Protocol})
|
||||
epsIPv4 := makeDualStackEndpointSlice("testsvc-ipv4", "default", "testsvc", v1.AddressTypeIPv4, "1.2.3.4")
|
||||
epsIPv6 := makeDualStackEndpointSlice("testsvc-ipv6", "default", "testsvc", v1.AddressTypeIPv6, "fd00::1")
|
||||
|
||||
n, _ := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, svc, epsIPv4, epsIPv6)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testsvc-ipv4": {
|
||||
Targets: []model.LabelSet{dualStackTarget("1.2.3.4:9000")},
|
||||
Labels: dualStackGroupLabels("default", "IPv4", "testsvc-ipv4", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv4",
|
||||
},
|
||||
"endpointslice/default/testsvc-ipv6": {
|
||||
Targets: nil,
|
||||
Labels: dualStackGroupLabels("default", "IPv6", "testsvc-ipv6", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv6",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
// TestEndpointSliceDiscoveryDualStackRequiredSkipsSecondarySlice verifies the
|
||||
// same deduplication behaviour for ipFamilyPolicy=RequireDualStack.
|
||||
func TestEndpointSliceDiscoveryDualStackRequiredSkipsSecondarySlice(t *testing.T) {
|
||||
t.Parallel()
|
||||
svc := makeDualStackService("testsvc", "default", corev1.IPFamilyPolicyRequireDualStack,
|
||||
[]corev1.IPFamily{corev1.IPv4Protocol, corev1.IPv6Protocol})
|
||||
epsIPv4 := makeDualStackEndpointSlice("testsvc-ipv4", "default", "testsvc", v1.AddressTypeIPv4, "1.2.3.4")
|
||||
epsIPv6 := makeDualStackEndpointSlice("testsvc-ipv6", "default", "testsvc", v1.AddressTypeIPv6, "fd00::1")
|
||||
|
||||
n, _ := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, svc, epsIPv4, epsIPv6)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testsvc-ipv4": {
|
||||
Targets: []model.LabelSet{dualStackTarget("1.2.3.4:9000")},
|
||||
Labels: dualStackGroupLabels("default", "IPv4", "testsvc-ipv4", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv4",
|
||||
},
|
||||
"endpointslice/default/testsvc-ipv6": {
|
||||
Targets: nil,
|
||||
Labels: dualStackGroupLabels("default", "IPv6", "testsvc-ipv6", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv6",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
func TestEndpointSliceDiscoverySingleStackIPv4Unaffected(t *testing.T) {
|
||||
t.Parallel()
|
||||
svc := makeDualStackService("testsvc", "default", corev1.IPFamilyPolicySingleStack,
|
||||
[]corev1.IPFamily{corev1.IPv4Protocol})
|
||||
epsIPv4 := makeDualStackEndpointSlice("testsvc-ipv4", "default", "testsvc", v1.AddressTypeIPv4, "1.2.3.4")
|
||||
|
||||
n, _ := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, svc, epsIPv4)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 1,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testsvc-ipv4": {
|
||||
Targets: []model.LabelSet{dualStackTarget("1.2.3.4:9000")},
|
||||
Labels: dualStackGroupLabels("default", "IPv4", "testsvc-ipv4", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv4",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
// TestEndpointSliceDiscoverySingleStackIPv6Unaffected verifies that a
|
||||
// single-stack IPv6 service is not affected by the dual-stack deduplication
|
||||
// logic and still generates targets.
|
||||
func TestEndpointSliceDiscoverySingleStackIPv6Unaffected(t *testing.T) {
|
||||
t.Parallel()
|
||||
svc := makeDualStackService("testsvc", "default", corev1.IPFamilyPolicySingleStack,
|
||||
[]corev1.IPFamily{corev1.IPv6Protocol})
|
||||
epsIPv6 := makeDualStackEndpointSlice("testsvc-ipv6", "default", "testsvc", v1.AddressTypeIPv6, "fd00::1")
|
||||
|
||||
n, _ := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, svc, epsIPv6)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 1,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testsvc-ipv6": {
|
||||
Targets: []model.LabelSet{dualStackTarget("[fd00::1]:9000")},
|
||||
Labels: dualStackGroupLabels("default", "IPv6", "testsvc-ipv6", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv6",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
||||
// TestEndpointSliceDiscoveryDualStackPreferredIPv6Primary verifies that when
|
||||
// IPv6 is the primary family of a dual-stack service, the IPv6 EndpointSlice
|
||||
// generates targets and the IPv4 (secondary) EndpointSlice produces an empty
|
||||
// target group.
|
||||
func TestEndpointSliceDiscoveryDualStackPreferredIPv6Primary(t *testing.T) {
|
||||
t.Parallel()
|
||||
svc := makeDualStackService("testsvc", "default", corev1.IPFamilyPolicyPreferDualStack,
|
||||
[]corev1.IPFamily{corev1.IPv6Protocol, corev1.IPv4Protocol})
|
||||
epsIPv6 := makeDualStackEndpointSlice("testsvc-ipv6", "default", "testsvc", v1.AddressTypeIPv6, "fd00::1")
|
||||
epsIPv4 := makeDualStackEndpointSlice("testsvc-ipv4", "default", "testsvc", v1.AddressTypeIPv4, "1.2.3.4")
|
||||
|
||||
n, _ := makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{Names: []string{"default"}}, svc, epsIPv6, epsIPv4)
|
||||
|
||||
k8sDiscoveryTest{
|
||||
discovery: n,
|
||||
expectedMaxItems: 2,
|
||||
expectedRes: map[string]*targetgroup.Group{
|
||||
"endpointslice/default/testsvc-ipv6": {
|
||||
Targets: []model.LabelSet{dualStackTarget("[fd00::1]:9000")},
|
||||
Labels: dualStackGroupLabels("default", "IPv6", "testsvc-ipv6", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv6",
|
||||
},
|
||||
"endpointslice/default/testsvc-ipv4": {
|
||||
Targets: nil,
|
||||
Labels: dualStackGroupLabels("default", "IPv4", "testsvc-ipv4", "testsvc"),
|
||||
Source: "endpointslice/default/testsvc-ipv4",
|
||||
},
|
||||
},
|
||||
}.Run(t)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue