Port service/proxy subresource from Endpoints to EndpointSlice

This commit is contained in:
Dan Winship 2025-10-24 11:10:50 -04:00
parent 7766ce7300
commit b2d07eeff8
7 changed files with 203 additions and 159 deletions

View file

@ -1,70 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "k8s.io/kubernetes/pkg/apis/core"
)
// Tweak is a function that modifies a Endpoints.
type Tweak func(*api.Endpoints)
// MakeEndpoints helps construct Endpoints objects (which pass API validation)
// more legibly and tersely than a Go struct definition.
func MakeEndpoints(name string, addrs []api.EndpointAddress, ports []api.EndpointPort, tweaks ...Tweak) *api.Endpoints {
// NOTE: Any field that would be populated by defaulting needs to be
// present and valid here.
eps := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceDefault,
},
Subsets: []api.EndpointSubset{{
Addresses: addrs,
Ports: ports,
}},
}
for _, tweak := range tweaks {
tweak(eps)
}
return eps
}
// MakeEndpointAddress helps construct EndpointAddress objects which pass API
// validation.
func MakeEndpointAddress(ip string, pod string) api.EndpointAddress {
return api.EndpointAddress{
IP: ip,
TargetRef: &api.ObjectReference{
Name: pod,
Namespace: metav1.NamespaceDefault,
},
}
}
// MakeEndpointPort helps construct EndpointPort objects which pass API
// validation.
func MakeEndpointPort(name string, port int) api.EndpointPort {
return api.EndpointPort{
Name: name,
Port: int32(port),
}
}

View file

@ -390,6 +390,8 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}
func (c CompletedConfig) StorageProviders(client *kubernetes.Clientset) ([]controlplaneapiserver.RESTStorageProvider, error) {
discoveryRESTStorageProvider := &discoveryrest.StorageProvider{}
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: *c.ControlPlane.NewCoreGenericConfig(),
Proxy: corerest.ProxyConfig{
@ -402,6 +404,9 @@ func (c CompletedConfig) StorageProviders(client *kubernetes.Clientset) ([]contr
NodePortRange: c.Extra.ServiceNodePortRange,
IPRepairInterval: c.Extra.RepairServicesInterval,
},
OtherProviders: corerest.OtherProviders{
EndpointSliceListerProvider: discoveryRESTStorageProvider,
},
}, c.ControlPlane.Generic.Authorization.Authorizer)
if err != nil {
return nil, err
@ -423,7 +428,7 @@ func (c CompletedConfig) StorageProviders(client *kubernetes.Clientset) ([]contr
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{Authorizer: c.ControlPlane.Generic.Authorization.Authorizer},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
discoveryRESTStorageProvider,
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},

View file

@ -553,7 +553,7 @@ func TestGenericStorageProviders(t *testing.T) {
switch kube[k].(type) {
case autoscalingrest.RESTStorageProvider,
batchrest.RESTStorageProvider,
discoveryrest.StorageProvider,
*discoveryrest.StorageProvider,
networkingrest.RESTStorageProvider,
noderest.RESTStorageProvider,
policyrest.RESTStorageProvider,

View file

@ -70,8 +70,9 @@ import (
type Config struct {
GenericConfig
Proxy ProxyConfig
Services ServicesConfig
Proxy ProxyConfig
Services ServicesConfig
OtherProviders OtherProviders
}
type ProxyConfig struct {
@ -88,6 +89,10 @@ type ServicesConfig struct {
IPRepairInterval time.Duration
}
type OtherProviders struct {
EndpointSliceListerProvider servicestore.EndpointSliceListerProvider
}
type rangeRegistries struct {
clusterIP rangeallocation.RangeRegistry
secondaryClusterIP rangeallocation.RangeRegistry
@ -207,7 +212,7 @@ func (p *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.AP
p.primaryServiceClusterIPAllocator.IPFamily(),
p.serviceClusterIPAllocators,
p.serviceNodePortAllocator,
endpointsStorage,
p.OtherProviders.EndpointSliceListerProvider,
podStorage.Pod,
p.Proxy.Transport)
if err != nil {

View file

@ -23,10 +23,14 @@ import (
"net"
"net/http"
"net/url"
"sort"
"strconv"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -37,6 +41,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/discovery"
"k8s.io/kubernetes/pkg/printers"
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
printerstorage "k8s.io/kubernetes/pkg/printers/storage"
@ -47,8 +52,8 @@ import (
"sigs.k8s.io/structured-merge-diff/v6/fieldpath"
)
type EndpointsStorage interface {
rest.Getter
type EndpointSliceListerProvider interface {
EndpointSliceLister() rest.Lister
}
type PodStorage interface {
@ -60,7 +65,7 @@ type REST struct {
primaryIPFamily api.IPFamily
secondaryIPFamily api.IPFamily
alloc Allocators
endpoints EndpointsStorage
endpointSlices EndpointSliceListerProvider
pods PodStorage
proxyTransport http.RoundTripper
}
@ -79,7 +84,7 @@ func NewREST(
serviceIPFamily api.IPFamily,
ipAllocs map[api.IPFamily]ipallocator.Interface,
portAlloc portallocator.Interface,
endpoints EndpointsStorage,
endpointSlices EndpointSliceListerProvider,
pods PodStorage,
proxyTransport http.RoundTripper) (*REST, *StatusREST, *svcreg.ProxyREST, error) {
@ -120,7 +125,7 @@ func NewREST(
primaryIPFamily: primaryIPFamily,
secondaryIPFamily: secondaryIPFamily,
alloc: makeAlloc(serviceIPFamily, ipAllocs, portAlloc),
endpoints: endpoints,
endpointSlices: endpointSlices,
pods: pods,
proxyTransport: proxyTransport,
}
@ -435,58 +440,79 @@ func (r *REST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.
}
}
obj, err := r.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
obj, err := r.endpointSlices.EndpointSliceLister().List(ctx, &metainternalversion.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set{discoveryv1.LabelServiceName: svcName})})
if err != nil {
return nil, nil, err
}
eps := obj.(*api.Endpoints)
if len(eps.Subsets) == 0 {
slices := obj.(*discovery.EndpointSliceList).Items
if len(slices) == 0 {
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
} else if len(slices) > 1 {
// If there are multiple slices, we want to look at them in a random
// order, but we need to look at all of the slices of the primary IP
// family first.
preferredAddressType := discovery.AddressType(r.primaryIPFamily)
randomOrder := rand.Perm(len(slices))
sort.Slice(slices, func(i, j int) bool {
if slices[i].AddressType != slices[j].AddressType {
return slices[i].AddressType == preferredAddressType
}
return randomOrder[i] < randomOrder[j]
})
}
// Pick a random Subset to start searching from.
ssSeed := rand.Intn(len(eps.Subsets))
// Find a Subset that has the port.
for ssi := 0; ssi < len(eps.Subsets); ssi++ {
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
if len(ss.Addresses) == 0 {
continue
}
for i := range ss.Ports {
if ss.Ports[i].Name == portStr {
addrSeed := rand.Intn(len(ss.Addresses))
// This is a little wonky, but it's expensive to test for the presence of a Pod
// So we repeatedly try at random and validate it, this means that for an invalid
// service with a lot of endpoints we're going to potentially make a lot of calls,
// but in the expected case we'll only make one.
for try := 0; try < len(ss.Addresses); try++ {
addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
// We only proxy to addresses that are actually pods.
if err := isValidAddress(ctx, &addr, r.pods); err != nil {
utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
continue
}
ip := addr.IP
port := int(ss.Ports[i].Port)
return &url.URL{
Scheme: svcScheme,
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, r.proxyTransport, nil
// Find a slice that has the port.
for _, slice := range slices {
for i := range slice.Ports {
if slice.Ports[i].Name == nil || *slice.Ports[i].Name != portStr {
continue
}
if slice.Ports[i].Port == nil {
continue
}
if len(slice.Endpoints) == 0 {
continue
}
// Starting from a random index, find a Ready endpoint.
// This is a little wonky, but it's expensive to test for the presence of a Pod
// So we repeatedly try at random and validate it, this means that for an invalid
// service with a lot of endpoints we're going to potentially make a lot of calls,
// but in the expected case we'll only make one.
offset := rand.Intn(len(slice.Endpoints))
for epi := range slice.Endpoints {
ep := &slice.Endpoints[(epi+offset)%len(slice.Endpoints)]
if ep.Conditions.Ready != nil && !*ep.Conditions.Ready {
continue
}
utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
// We only proxy to addresses that are actually pods.
if err := isValidAddress(ctx, ep, r.pods); err != nil {
utilruntime.HandleError(fmt.Errorf("endpoint %v isn't valid (%w)", ep, err))
continue
}
// (Addresses is an array but only Addresses[0] is used.)
ip := ep.Addresses[0]
port := int(*slice.Ports[i].Port)
return &url.URL{
Scheme: svcScheme,
Host: net.JoinHostPort(ip, strconv.Itoa(port)),
}, r.proxyTransport, nil
}
}
}
return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
func isValidAddress(ctx context.Context, addr *api.EndpointAddress, pods rest.Getter) error {
if addr.TargetRef == nil {
return fmt.Errorf("Address has no target ref, skipping: %v", addr)
func isValidAddress(ctx context.Context, ep *discovery.Endpoint, pods rest.Getter) error {
if ep.TargetRef == nil {
return fmt.Errorf("endpoint has no target ref, skipping: %v", ep)
}
if genericapirequest.NamespaceValue(ctx) != addr.TargetRef.Namespace {
return fmt.Errorf("Address namespace doesn't match context namespace")
if genericapirequest.NamespaceValue(ctx) != ep.TargetRef.Namespace {
return fmt.Errorf("endpoint namespace doesn't match context namespace")
}
obj, err := pods.Get(ctx, addr.TargetRef.Name, &metav1.GetOptions{})
obj, err := pods.Get(ctx, ep.TargetRef.Name, &metav1.GetOptions{})
if err != nil {
return err
}
@ -495,14 +521,15 @@ func isValidAddress(ctx context.Context, addr *api.EndpointAddress, pods rest.Ge
return fmt.Errorf("failed to cast to pod: %v", obj)
}
if pod == nil {
return fmt.Errorf("pod is missing, skipping (%s/%s)", addr.TargetRef.Namespace, addr.TargetRef.Name)
return fmt.Errorf("pod is missing, skipping (%s/%s)", ep.TargetRef.Namespace, ep.TargetRef.Name)
}
for _, podIP := range pod.Status.PodIPs {
if podIP.IP == addr.IP {
// (Addresses is an array but only Addresses[0] is used.)
if podIP.IP == ep.Addresses[0] {
return nil
}
}
return fmt.Errorf("pod ip(s) doesn't match endpoint ip, skipping: %v vs %s (%s/%s)", pod.Status.PodIPs, addr.IP, addr.TargetRef.Namespace, addr.TargetRef.Name)
return fmt.Errorf("pod ip(s) doesn't match endpoint ip, skipping: %v vs %s (%s/%s)", pod.Status.PodIPs, ep.Addresses[0], ep.TargetRef.Namespace, ep.TargetRef.Name)
}
// normalizeClusterIPs adjust clusterIPs based on ClusterIP. This must not

View file

@ -38,25 +38,34 @@ import (
"k8s.io/apiserver/pkg/registry/generic"
genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing"
"k8s.io/apiserver/pkg/registry/rest"
epstest "k8s.io/kubernetes/pkg/api/endpoints/testing"
svctest "k8s.io/kubernetes/pkg/api/service/testing"
api "k8s.io/kubernetes/pkg/apis/core"
endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
"k8s.io/kubernetes/pkg/apis/discovery"
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
endpointslicestore "k8s.io/kubernetes/pkg/registry/discovery/endpointslice/storage"
"k8s.io/kubernetes/pkg/registry/registrytest"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
type cleanupFunc func()
type dummyEndpointSliceListerProvider struct {
lister rest.Lister
}
func (d *dummyEndpointSliceListerProvider) EndpointSliceLister() rest.Lister {
return d.lister
}
// Most tests will use this to create a registry to run tests against.
func newStorage(t *testing.T, ipFamilies []api.IPFamily) (*wrapperRESTForTests, *StatusREST, cleanupFunc) {
return newStorageWithPods(t, ipFamilies, nil, nil)
}
func newStorageWithPods(t *testing.T, ipFamilies []api.IPFamily, pods []api.Pod, endpoints []*api.Endpoints) (*wrapperRESTForTests, *StatusREST, cleanupFunc) {
func newStorageWithPods(t *testing.T, ipFamilies []api.IPFamily, pods []api.Pod, endpointSlices []*discovery.EndpointSlice) (*wrapperRESTForTests, *StatusREST, cleanupFunc) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{
StorageConfig: etcdStorage.ForResource(schema.GroupResource{Resource: "services"}),
@ -101,32 +110,37 @@ func newStorageWithPods(t *testing.T, ipFamilies []api.IPFamily, pods []api.Pod,
}
}
endpointsStorage, err := endpointstore.NewREST(generic.RESTOptions{
StorageConfig: etcdStorage,
etcdDiscoveryStorage, discoveryServer := registrytest.NewEtcdStorage(t, discovery.GroupName)
endpointSliceStorage, err := endpointslicestore.NewREST(generic.RESTOptions{
StorageConfig: etcdDiscoveryStorage,
Decorator: generic.UndecoratedStorage,
ResourcePrefix: "endpoints",
ResourcePrefix: "endpointslices",
})
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
if endpoints != nil && len(endpoints) > 0 {
ctx := genericregistrytest.NewNamespaceScopeContext(endpointsStorage.Store, metav1.NamespaceDefault)
for ix := range endpoints {
key, _ := endpointsStorage.KeyFunc(ctx, endpoints[ix].Name)
if err := endpointsStorage.Store.Storage.Create(ctx, key, endpoints[ix], nil, 0, false); err != nil {
t.Fatalf("Couldn't create endpoint: %v", err)
endpointSliceListerProvider := &dummyEndpointSliceListerProvider{endpointSliceStorage}
if len(endpointSlices) > 0 {
ctx := genericregistrytest.NewNamespaceScopeContext(endpointSliceStorage.Store, metav1.NamespaceDefault)
for ix := range endpointSlices {
key, _ := endpointSliceStorage.KeyFunc(ctx, endpointSlices[ix].Name)
if err := endpointSliceStorage.Store.Storage.Create(ctx, key, endpointSlices[ix], nil, 0, false); err != nil {
t.Fatalf("Couldn't create endpointslice: %v", err)
}
}
}
serviceStorage, statusStorage, _, err := NewREST(restOptions, ipFamilies[0], ipAllocs, portAlloc, endpointsStorage, podStorage.Pod, nil)
serviceStorage, statusStorage, _, err := NewREST(restOptions, ipFamilies[0], ipAllocs, portAlloc, endpointSliceListerProvider, podStorage.Pod, nil)
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
cleanup := func() {
server.Terminate(t)
discoveryServer.Terminate(t)
serviceStorage.Destroy()
endpointSliceStorage.Store.Destroy()
}
return &wrapperRESTForTests{serviceStorage}, statusStorage, cleanup
@ -11605,30 +11619,84 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
makePod("no-endpoints", "9.9.9.9"), // to prove this does not get chosen
}
endpoints := []*api.Endpoints{
epstest.MakeEndpoints("unnamed",
[]api.EndpointAddress{
epstest.MakeEndpointAddress("1.2.3.4", "unnamed"),
endpoints := []*discovery.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Name: "unnamed-ep",
Namespace: metav1.NamespaceDefault,
Labels: map[string]string{
"kubernetes.io/service-name": "unnamed",
},
},
[]api.EndpointPort{
epstest.MakeEndpointPort("", 80),
}),
epstest.MakeEndpoints("unnamed2",
[]api.EndpointAddress{
epstest.MakeEndpointAddress("1.2.3.5", "unnamed"),
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"1.2.3.4"},
TargetRef: &api.ObjectReference{
Name: "unnamed",
Namespace: metav1.NamespaceDefault,
},
}},
Ports: []discovery.EndpointPort{{
Name: ptr.To(""),
Port: ptr.To[int32](80),
}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "unnamed2-ep",
Namespace: metav1.NamespaceDefault,
Labels: map[string]string{
"kubernetes.io/service-name": "unnamed2",
},
},
[]api.EndpointPort{
epstest.MakeEndpointPort("", 80),
}),
epstest.MakeEndpoints("named",
[]api.EndpointAddress{
epstest.MakeEndpointAddress("1.2.3.6", "named"),
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"1.2.3.5"},
TargetRef: &api.ObjectReference{
Name: "unnamed",
Namespace: metav1.NamespaceDefault,
},
}},
Ports: []discovery.EndpointPort{{
Name: ptr.To(""),
Port: ptr.To[int32](80),
}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "named-ep",
Namespace: metav1.NamespaceDefault,
Labels: map[string]string{
"kubernetes.io/service-name": "named",
},
},
[]api.EndpointPort{
epstest.MakeEndpointPort("p", 80),
epstest.MakeEndpointPort("q", 81),
}),
epstest.MakeEndpoints("no-endpoints", nil, nil), // to prove this does not get chosen
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"1.2.3.6"},
TargetRef: &api.ObjectReference{
Name: "named",
Namespace: metav1.NamespaceDefault,
},
}},
Ports: []discovery.EndpointPort{{
Name: ptr.To("p"),
Port: ptr.To[int32](80),
}, {
Name: ptr.To("q"),
Port: ptr.To[int32](81),
}},
},
// to prove this does not get chosen
{
ObjectMeta: metav1.ObjectMeta{
Name: "no-endpoints-ep",
Namespace: metav1.NamespaceDefault,
Labels: map[string]string{
"kubernetes.io/service-name": "no-endpoints",
},
},
AddressType: discovery.AddressTypeIPv4,
},
}
storage, _, cleanup := newStorageWithPods(t, []api.IPFamily{api.IPv4Protocol}, pods, endpoints)

View file

@ -28,10 +28,12 @@ import (
)
// StorageProvider is a REST storage provider for discovery.k8s.io.
type StorageProvider struct{}
type StorageProvider struct {
endpointSliceLister rest.Lister
}
// NewRESTStorage returns a new storage provider.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
func (p *StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(discovery.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
// If you add a version here, be sure to add an entry in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go with specific priorities.
// TODO refactor the plumbing to provide the information in the APIGroupInfo
@ -45,7 +47,7 @@ func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.AP
return apiGroupInfo, nil
}
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
func (p *StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
storage := map[string]rest.Storage{}
if resource := "endpointslices"; apiResourceConfigSource.ResourceEnabled(discoveryv1.SchemeGroupVersion.WithResource(resource)) {
@ -54,12 +56,19 @@ func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIReso
return storage, err
}
storage[resource] = endpointSliceStorage
p.endpointSliceLister = endpointSliceStorage
}
return storage, nil
}
// GroupName is the group name for the storage provider.
func (p StorageProvider) GroupName() string {
func (p *StorageProvider) GroupName() string {
return discovery.GroupName
}
// EndpointSliceLister returns a rest.Lister for EndpointSlices (or nil if NewRESTStorage
// has not yet been called).
func (p *StorageProvider) EndpointSliceLister() rest.Lister {
return p.endpointSliceLister
}