sharding: implement UID-based sharding for list and watch (KEP-5866)

This commit is contained in:
Jefftree 2026-03-12 14:35:03 -04:00
parent 5f94c5bb7d
commit b8a17e1ce8
28 changed files with 1908 additions and 21 deletions

View file

@ -2165,6 +2165,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.36"), Default: false, LockToDefault: true, PreRelease: featuregate.Deprecated},
},
genericfeatures.ShardedListAndWatch: {
{Version: version.MustParse("1.36"), Default: false, PreRelease: featuregate.Alpha},
},
genericfeatures.SizeBasedListCostEstimate: {
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
},
@ -2640,6 +2644,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
genericfeatures.SeparateCacheWatchRPC: {},
genericfeatures.ShardedListAndWatch: {},
genericfeatures.SizeBasedListCostEstimate: {},
genericfeatures.StorageVersionAPI: {genericfeatures.APIServerIdentity},

View file

@ -0,0 +1,59 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internalversion
import (
"fmt"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/sharding"
)
// Convert_v1_ListOptions_To_internalversion_ListOptions converts the
// wire-format v1.ListOptions into the internal ListOptions.
//
// It first delegates to autoConvert_v1_ListOptions_To_internalversion_ListOptions
// and then, when the incoming shardSelector string is non-empty, parses it with
// sharding.Parse and stores the resulting selector on out. An empty string
// leaves out.ShardSelector untouched by this function. A parse failure is
// returned wrapped with the "invalid shard selector" prefix.
func Convert_v1_ListOptions_To_internalversion_ListOptions(in *v1.ListOptions, out *ListOptions, s conversion.Scope) error {
	err := autoConvert_v1_ListOptions_To_internalversion_ListOptions(in, out, s)
	if err != nil {
		return err
	}

	raw := in.ShardSelector
	if raw == "" {
		// Nothing to parse: the request carried no shard selector.
		return nil
	}

	parsed, parseErr := sharding.Parse(raw)
	if parseErr != nil {
		return fmt.Errorf("invalid shard selector: %w", parseErr)
	}
	out.ShardSelector = parsed
	return nil
}
// Convert_internalversion_ListOptions_To_v1_ListOptions converts the internal
// ListOptions into the wire-format v1.ListOptions.
//
// It first delegates to autoConvert_internalversion_ListOptions_To_v1_ListOptions.
// The parsed shard selector is then serialized with String() into
// out.ShardSelector, but only when a selector is present and it is not the
// empty (match-everything) selector.
func Convert_internalversion_ListOptions_To_v1_ListOptions(in *ListOptions, out *v1.ListOptions, s conversion.Scope) error {
	err := autoConvert_internalversion_ListOptions_To_v1_ListOptions(in, out, s)
	if err != nil {
		return err
	}

	if sel := in.ShardSelector; sel != nil && !sel.Empty() {
		out.ShardSelector = sel.String()
	}
	return nil
}

View file

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/sharding"
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@ -89,6 +90,9 @@ type ListOptions struct {
// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward
// compatibility reasons) and to false otherwise.
SendInitialEvents *bool
// ShardSelector is the parsed shard selector from the request.
ShardSelector sharding.Selector
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View file

@ -94,6 +94,13 @@ type ListInterface interface {
SetRemainingItemCount(c *int64)
}
// ShardedListInterface can be implemented by list types to indicate that they
// represent a sharded subset of the full collection rather than the complete list.
type ShardedListInterface interface {
// GetShardInfo returns the shard information attached to the list, or nil
// when none has been set.
GetShardInfo() *ShardInfo
// SetShardInfo replaces the shard information attached to the list.
SetShardInfo(*ShardInfo)
}
// Type exposes the type and APIVersion of versioned or internal API objects.
// TODO: move this, and TypeMeta and ListMeta, to a different package
type Type interface {
@ -113,6 +120,8 @@ func (meta *ListMeta) GetContinue() string { return meta.Continue
func (meta *ListMeta) SetContinue(c string) { meta.Continue = c }
func (meta *ListMeta) GetRemainingItemCount() *int64 { return meta.RemainingItemCount }
func (meta *ListMeta) SetRemainingItemCount(c *int64) { meta.RemainingItemCount = c }
// GetShardInfo returns the ShardInfo pointer stored on the list metadata.
func (meta *ListMeta) GetShardInfo() *ShardInfo { return meta.ShardInfo }
// SetShardInfo stores s as the ShardInfo pointer on the list metadata.
func (meta *ListMeta) SetShardInfo(s *ShardInfo) { meta.ShardInfo = s }
func (obj *TypeMeta) GetObjectKind() schema.ObjectKind { return obj }

View file

@ -92,6 +92,26 @@ type ListMeta struct {
// should not rely on the remainingItemCount to be set or to be exact.
// +optional
RemainingItemCount *int64 `json:"remainingItemCount,omitempty" protobuf:"bytes,4,opt,name=remainingItemCount"`
// shardInfo is set when the list is a filtered subset of the full collection,
// as selected by a shard selector on the request. It echoes back the selector
// so clients can verify which shard they received and merge sharded responses.
// Clients should not cache sharded list responses as a full representation
// of the collection.
//
// This is an alpha field and requires enabling the ShardedListAndWatch feature gate.
// +featureGate=ShardedListAndWatch
// +optional
ShardInfo *ShardInfo `json:"shardInfo,omitempty" protobuf:"bytes,5,opt,name=shardInfo"`
}
// ShardInfo describes the shard selector that was applied to produce a list response.
// Its presence on a list response indicates the list is a filtered subset.
// It is carried on ListMeta through the optional shardInfo field.
type ShardInfo struct {
// selector is the shard selector string from the request, echoed back so clients
// can verify which shard they received and merge responses from multiple shards.
// +required
Selector string `json:"selector" protobuf:"bytes,1,opt,name=selector"`
}
// Field path constants that are specific to the internal API
@ -430,6 +450,38 @@ type ListOptions struct {
// compatibility reasons) and to false otherwise.
// +optional
SendInitialEvents *bool `json:"sendInitialEvents,omitempty" protobuf:"varint,11,opt,name=sendInitialEvents"`
// shardSelector restricts the list of returned objects using a CEL-based
// shard selector expression. The format uses the shardRange() function
// combined with || (logical OR) to specify one or more hash ranges:
//
// shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')
// shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') || shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')
//
// Field paths use CEL-style object-rooted syntax (e.g. "object.metadata.uid"),
// NOT the fieldSelector format ("metadata.uid"). Currently supported paths:
// - object.metadata.uid
// - object.metadata.namespace
//
// hexStart and hexEnd are single-quoted CEL string literals with a '0x' prefix,
// defining the inclusive lower and exclusive upper bounds over the 64-bit FNV-1a
// hash space. The full range is [0x0, 0x10000000000000000), where the exclusive
// upper bound equals 2^64.
//
// Examples:
// 2-shard split:
// shard 0: shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')
// shard 1: shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')
// 4-shard split:
// shard 0: shardRange(object.metadata.uid, '0x0000000000000000', '0x4000000000000000')
// shard 1: shardRange(object.metadata.uid, '0x4000000000000000', '0x8000000000000000')
// shard 2: shardRange(object.metadata.uid, '0x8000000000000000', '0xc000000000000000')
// shard 3: shardRange(object.metadata.uid, '0xc000000000000000', '0x10000000000000000')
//
// This is an alpha field and requires enabling the ShardedListAndWatch feature gate.
// +featureGate=ShardedListAndWatch
// +optional
ShardSelector string `json:"shardSelector,omitempty" protobuf:"bytes,15,opt,name=shardSelector"`
}
const (

View file

@ -0,0 +1,50 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
)
// ResolveFieldValue returns the string value of a metadata field of obj,
// selected by fieldPath.
//
// Paths are written in CEL-style, object-rooted form
// ("object.metadata.<field>"); this is not the fieldSelector form
// ("metadata.<field>"). Exactly two paths are recognised:
//   - "object.metadata.uid"
//   - "object.metadata.namespace"
//
// An error is returned when the object's metadata cannot be accessed or when
// fieldPath is not one of the supported paths. The metadata accessor is
// obtained before the path is inspected, so an inaccessible object is
// reported even for an unsupported path.
func ResolveFieldValue(obj runtime.Object, fieldPath string) (string, error) {
	objMeta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("failed to access object metadata: %w", err)
	}

	if fieldPath == "object.metadata.uid" {
		return string(objMeta.GetUID()), nil
	}
	if fieldPath == "object.metadata.namespace" {
		return objMeta.GetNamespace(), nil
	}
	return "", fmt.Errorf("unsupported field path: %q", fieldPath)
}

View file

@ -0,0 +1,64 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
// TestResolveFieldValue checks that the two supported field paths resolve to
// the object's UID and namespace, and that any other path yields an error.
func TestResolveFieldValue(t *testing.T) {
	obj := &testObject{
		ObjectMeta: metav1.ObjectMeta{
			UID:       types.UID("test-uid-123"),
			Name:      "test-name",
			Namespace: "test-namespace",
		},
	}

	type testCase struct {
		fieldPath string
		want      string
		wantErr   bool
	}
	cases := []testCase{
		{fieldPath: "object.metadata.uid", want: "test-uid-123"},
		{fieldPath: "object.metadata.namespace", want: "test-namespace"},
		{fieldPath: "object.metadata.name", wantErr: true},
		{fieldPath: "object.metadata.labels", wantErr: true},
		{fieldPath: "invalid.path", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.fieldPath, func(t *testing.T) {
			got, err := ResolveFieldValue(obj, tc.fieldPath)
			switch {
			case tc.wantErr && err == nil:
				t.Errorf("expected error for fieldPath %q", tc.fieldPath)
			case tc.wantErr:
				// The expected failure was observed; nothing else to check.
			case err != nil:
				t.Fatalf("unexpected error: %v", err)
			case got != tc.want:
				t.Errorf("ResolveFieldValue(%q) = %q, want %q", tc.fieldPath, got, tc.want)
			}
		})
	}
}

View file

@ -0,0 +1,30 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"fmt"
"hash/fnv"
)
// HashField returns the 64-bit FNV-1a hash of value formatted as exactly 16
// lowercase hexadecimal digits, left-padded with zeros and without a "0x"
// prefix.
func HashField(value string) string {
	hasher := fnv.New64a()
	// hash.Hash documents that Write never returns an error, so the result
	// is intentionally not checked.
	hasher.Write([]byte(value))
	sum := hasher.Sum64()
	return fmt.Sprintf("%016x", sum)
}

View file

@ -0,0 +1,58 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"testing"
)
// TestHashField verifies the output format of HashField (16 lowercase hex
// characters), its determinism, and — via golden vectors — that the hash is
// 64-bit FNV-1a. Pinning the actual values matters because shard boundaries
// chosen by clients are only meaningful for one specific hash function.
func TestHashField(t *testing.T) {
	// Golden FNV-1a 64-bit vectors. The "ab" vector starts with a zero
	// nibble and therefore also exercises the zero padding.
	golden := []struct {
		input string
		want  string
	}{
		{"", "cbf29ce484222325"},
		{"a", "af63dc4c8601ec8c"},
		{"ab", "089c4407b545986a"},
		{"abc", "e71fa2190541574b"},
	}
	for _, g := range golden {
		if got := HashField(g.input); got != g.want {
			t.Errorf("HashField(%q) = %q, want %q", g.input, got, g.want)
		}
	}

	tests := []struct {
		input string
	}{
		{""},
		{"abc"},
		{"test-uid-12345"},
		{"aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"},
	}
	for _, tt := range tests {
		result := HashField(tt.input)
		if len(result) != 16 {
			t.Errorf("HashField(%q) returned %q (len %d), expected 16 hex chars", tt.input, result, len(result))
		}
		// Verify all chars are lowercase hex.
		for _, c := range result {
			if (c < '0' || c > '9') && (c < 'a' || c > 'f') {
				t.Errorf("HashField(%q) returned %q which contains non-hex char %q", tt.input, result, string(c))
			}
		}
	}

	// Determinism
	h1 := HashField("test")
	h2 := HashField("test")
	if h1 != h2 {
		t.Errorf("HashField is not deterministic: %q != %q", h1, h2)
	}

	// Different inputs produce different outputs
	h3 := HashField("different")
	if h1 == h3 {
		t.Errorf("HashField produced same hash for different inputs: %q", h1)
	}
}

View file

@ -0,0 +1,42 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import "fmt"
// ParseFunc is the signature for shard selector parsers.
type ParseFunc func(string) (Selector, error)
// registeredParser is the parser installed through RegisterParser. It is nil
// until a parser has been registered, in which case Parse returns an error.
// It is read and written without any locking.
var registeredParser ParseFunc
// RegisterParser registers the shard selector parser implementation.
// This is called by k8s.io/apiserver to inject the CEL-based parser.
//
// Each call overwrites the previously registered parser; passing nil leaves
// Parse without a parser again.
//
// This registration mechanism is an alpha-level internal API and is subject
// to change or removal in future releases. Do not depend on it.
func RegisterParser(p ParseFunc) {
registeredParser = p
}
// Parse turns a shard selector string into a Selector by handing it to the
// parser installed with RegisterParser. If no parser has been registered an
// error is returned and the input is not inspected.
func Parse(s string) (Selector, error) {
	parser := registeredParser
	if parser != nil {
		return parser(s)
	}
	return nil, fmt.Errorf("no shard selector parser registered")
}

View file

@ -0,0 +1,59 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"testing"
)
// TestParseWithoutRegisteredParser checks that Parse fails when no parser is
// installed. The package-level parser is restored when the test returns.
func TestParseWithoutRegisteredParser(t *testing.T) {
	saved := registeredParser
	defer func() { registeredParser = saved }()

	registeredParser = nil
	if _, err := Parse("anything"); err == nil {
		t.Error("Parse() without registered parser should return error")
	}
}
// TestRegisterParserAndParse checks that Parse delegates to the parser
// installed with RegisterParser and returns that parser's selector. The
// package-level parser is restored when the test returns.
func TestRegisterParserAndParse(t *testing.T) {
	saved := registeredParser
	defer func() { registeredParser = saved }()

	var invoked bool
	stub := func(s string) (Selector, error) {
		invoked = true
		req := ShardRangeRequirement{
			Key:   "object.metadata.uid",
			Start: "0x0",
			End:   "0x8",
		}
		return NewSelector(req), nil
	}
	RegisterParser(stub)

	sel, err := Parse("test")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !invoked {
		t.Error("registered parser was not called")
	}
	if reqs := sel.Requirements(); len(reqs) != 1 || reqs[0].Key != "object.metadata.uid" {
		t.Errorf("unexpected requirements: %+v", reqs)
	}
}

View file

@ -0,0 +1,128 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"fmt"
"strings"
"k8s.io/apimachinery/pkg/runtime"
)
// Selector represents a shard selector that can match objects based on
// hash ranges of their metadata fields. It follows the labels.Selector
// pattern from the Kubernetes API.
//
// This package provides two implementations: the match-everything selector
// returned by Everything, and a range-based selector built by NewSelector.
type Selector interface {
// Matches returns true if the given object matches the shard selector.
// The range-based implementation returns an error when the selected field
// cannot be resolved from obj.
Matches(obj runtime.Object) (bool, error)
// Empty returns true if the selector matches everything (no filtering).
Empty() bool
// String returns the wire-format string representation that can be
// round-tripped through Parse. The match-everything selector renders as
// the empty string.
String() string
// Requirements returns the list of shard range requirements. The
// match-everything selector returns nil; the range-based selector returns
// a copy of its requirements.
Requirements() []ShardRangeRequirement
// DeepCopySelector returns a deep copy of the selector.
DeepCopySelector() Selector
}
// Everything returns a selector that matches all objects.
// Each call returns a newly allocated selector whose Empty method reports
// true and whose String method returns "".
func Everything() Selector {
return &everythingSelector{}
}
// everythingSelector is the Selector that performs no filtering. It carries
// no state.
type everythingSelector struct{}
// Matches always reports a match and never returns an error; obj is ignored.
func (s *everythingSelector) Matches(_ runtime.Object) (bool, error) { return true, nil }
// Empty always reports true.
func (s *everythingSelector) Empty() bool { return true }
// String returns the empty string.
func (s *everythingSelector) String() string { return "" }
// Requirements returns nil because there are no shard ranges.
func (s *everythingSelector) Requirements() []ShardRangeRequirement { return nil }
// DeepCopySelector returns a new everythingSelector.
func (s *everythingSelector) DeepCopySelector() Selector { return &everythingSelector{} }
// shardSelector implements Selector with one or more shard range requirements.
type shardSelector struct {
// requirements holds the hash ranges. Matches treats them as alternatives:
// an object matches when its hash falls inside any one range. Matches
// resolves the field using the Key of the first requirement only.
requirements []ShardRangeRequirement
}
// Matches reports whether obj falls into at least one of the selector's hash
// ranges.
//
// A selector without requirements matches everything. Otherwise the field
// named by the first requirement's Key is resolved from obj, hashed with
// HashField, prefixed with "0x", and compared against each [Start, End) range
// using hexLess. An error from ResolveFieldValue is returned unchanged
// together with false.
func (s *shardSelector) Matches(obj runtime.Object) (bool, error) {
	reqs := s.requirements
	if len(reqs) == 0 {
		return true, nil
	}

	// The parser enforces a single shared key across all requirements, so
	// the field lookup and the hash are computed only once.
	fieldValue, err := ResolveFieldValue(obj, reqs[0].Key)
	if err != nil {
		return false, err
	}
	hashed := "0x" + HashField(fieldValue)

	for i := range reqs {
		if hexLess(hashed, reqs[i].Start) {
			// Below the inclusive lower bound of this range.
			continue
		}
		if hexLess(hashed, reqs[i].End) {
			return true, nil
		}
	}
	return false, nil
}
// hexLess reports whether the lowercase hexadecimal number a is numerically
// smaller than b.
//
// Both operands may carry an optional "0x" prefix and any number of leading
// zeros; these are stripped before comparing so that, for example, "0x0",
// "0x00" and "0x0000000000000000" are all treated as the same value. After
// normalization a shorter digit string is the smaller number, and digit
// strings of equal length are ordered lexicographically, which is the numeric
// order for lowercase hex.
//
// Without this normalization a pure length comparison mis-orders
// non-canonical values, e.g. it would consider "0x100" smaller than "0x00ff".
func hexLess(a, b string) bool {
	a = strings.TrimLeft(strings.TrimPrefix(a, "0x"), "0")
	b = strings.TrimLeft(strings.TrimPrefix(b, "0x"), "0")
	if len(a) != len(b) {
		return len(a) < len(b)
	}
	return a < b
}
// Empty reports whether the selector has no requirements, in which case
// Matches accepts every object.
func (s *shardSelector) Empty() bool {
return len(s.requirements) == 0
}
// String renders the selector in its wire format: one
// "shardRange(<key>, '<start>', '<end>')" term per requirement, joined by
// " || ". A selector without requirements renders as the empty string.
func (s *shardSelector) String() string {
	var b strings.Builder
	for i, req := range s.requirements {
		if i > 0 {
			b.WriteString(" || ")
		}
		fmt.Fprintf(&b, "shardRange(%s, '%s', '%s')", req.Key, req.Start, req.End)
	}
	return b.String()
}
// Requirements returns a copy of the selector's shard range requirements, so
// callers may modify the returned slice without affecting the selector.
func (s *shardSelector) Requirements() []ShardRangeRequirement {
	out := make([]ShardRangeRequirement, 0, len(s.requirements))
	return append(out, s.requirements...)
}
// DeepCopySelector returns a new shardSelector backed by its own copy of the
// requirements slice.
func (s *shardSelector) DeepCopySelector() Selector {
	cloned := make([]ShardRangeRequirement, len(s.requirements))
	for i := range s.requirements {
		cloned[i] = s.requirements[i]
	}
	return &shardSelector{requirements: cloned}
}
// NewSelector creates a Selector from the given requirements.
// If no requirements are provided, returns Everything().
//
// The requirements are copied into the selector. When a caller spreads an
// existing slice (NewSelector(reqs...)), Go passes that slice through
// unchanged, so without the copy later writes to the caller's slice would
// silently alter the selector. Copying keeps NewSelector consistent with
// Requirements and DeepCopySelector, which also never share their backing
// array.
func NewSelector(reqs ...ShardRangeRequirement) Selector {
	if len(reqs) == 0 {
		return Everything()
	}
	owned := make([]ShardRangeRequirement, len(reqs))
	copy(owned, reqs)
	return &shardSelector{
		requirements: owned,
	}
}

View file

@ -0,0 +1,141 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
)
// testObject is a minimal runtime.Object for testing.
// It embeds TypeMeta and ObjectMeta so tests can set the UID, name and
// namespace that the selector code reads.
type testObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
}
// DeepCopyObject returns a new testObject with a copied TypeMeta and a deep
// copy of the ObjectMeta.
func (t *testObject) DeepCopyObject() runtime.Object {
	clone := new(testObject)
	clone.TypeMeta = t.TypeMeta
	clone.ObjectMeta = *t.ObjectMeta.DeepCopy()
	return clone
}
// TestSelectorMatches exercises Selector.Matches against a single object for
// the match-everything selector, the full hash range, a range starting at the
// object's own hash, and two boundary ranges whose expected outcome is derived
// from the object's actual hash.
func TestSelectorMatches(t *testing.T) {
	obj := &testObject{
		ObjectMeta: metav1.ObjectMeta{
			UID:       types.UID("test-uid-123"),
			Name:      "test-name",
			Namespace: "test-namespace",
		},
	}
	hash := "0x" + HashField("test-uid-123")

	// uidRange builds a single-range selector over the object's UID.
	uidRange := func(start, end string) Selector {
		return NewSelector(ShardRangeRequirement{
			Key:   "object.metadata.uid",
			Start: start,
			End:   end,
		})
	}

	cases := []struct {
		name      string
		selector  Selector
		wantMatch bool
	}{
		{name: "everything matches", selector: Everything(), wantMatch: true},
		{name: "empty selector matches", selector: NewSelector(), wantMatch: true},
		{
			name:      "full range matches",
			selector:  uidRange("0x0000000000000000", "0x10000000000000000"),
			wantMatch: true,
		},
		{
			name: "hash in specific range",
			// hash + "f" is always > hash
			selector:  uidRange(hash, hash+"f"),
			wantMatch: true,
		},
		{
			name:      "hash below start",
			selector:  uidRange("0xffffffffffffffff", "0x10000000000000000"),
			wantMatch: hash >= "0xffffffffffffffff",
		},
		{
			name:      "hash at or above end",
			selector:  uidRange("0x0000000000000000", "0x0000000000000001"),
			wantMatch: hash < "0x0000000000000001",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := tc.selector.Matches(obj)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if got != tc.wantMatch {
				t.Errorf("Matches() = %v, want %v (hash=%s)", got, tc.wantMatch, hash)
			}
		})
	}
}
// TestSelectorEmpty checks that Empty is true for Everything() and for
// NewSelector() without arguments, and false once a requirement is present.
func TestSelectorEmpty(t *testing.T) {
	if everything := Everything(); !everything.Empty() {
		t.Error("Everything() should be empty")
	}
	if noArgs := NewSelector(); !noArgs.Empty() {
		t.Error("NewSelector() with no args should be empty")
	}

	req := ShardRangeRequirement{
		Key:   "object.metadata.uid",
		Start: "0x0000000000000000",
		End:   "0x8000000000000000",
	}
	if NewSelector(req).Empty() {
		t.Error("selector with requirement should not be empty")
	}
}
func TestSelectorString(t *testing.T) {
sel := NewSelector(ShardRangeRequirement{
Key: "object.metadata.uid",
Start: "0x00",
End: "0x80",
})
expected := "shardRange(object.metadata.uid, '0x00', '0x80')"
if sel.String() != expected {
t.Errorf("String() = %q, want %q", sel.String(), expected)
}
}

View file

@ -0,0 +1,32 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
// ShardRangeRequirement represents a single shard range requirement.
// It specifies a field path to hash and a hex range [Start, End) for filtering.
// The hash space is FNV-1a 64-bit: [0x0000000000000000, 0x10000000000000000).
// Both Start and End must be specified (no empty/unbounded values).
// The struct itself performs no validation of its fields.
type ShardRangeRequirement struct {
// Key is the field path, e.g. "object.metadata.uid"
Key string
// Start is the inclusive lower bound as a 0x-prefixed lowercase hex string.
// Minimum value is "0x0000000000000000".
Start string
// End is the exclusive upper bound as a 0x-prefixed lowercase hex string.
// Maximum value is "0x10000000000000000" (2^64).
End string
}

View file

@ -227,6 +227,14 @@ const (
// This prevents watch cache from being starved by other watches.
SeparateCacheWatchRPC featuregate.Feature = "SeparateCacheWatchRPC"
// owner: @jefftree
// kep: https://kep.k8s.io/5866
//
// Enables the shard selector parameter on List/Watch requests,
// allowing clients to receive a filtered subset of objects based
// on hash ranges of metadata fields (e.g. UID).
ShardedListAndWatch featuregate.Feature = "ShardedListAndWatch"
// owner: @serathius
//
// Enables APF to use size of objects for estimating request cost.
@ -465,6 +473,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.36"), Default: false, LockToDefault: true, PreRelease: featuregate.Deprecated},
},
ShardedListAndWatch: {
{Version: version.MustParse("1.36"), Default: false, PreRelease: featuregate.Alpha},
},
SizeBasedListCostEstimate: {
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
},

View file

@ -53,6 +53,8 @@ import (
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/tools/cache"
_ "k8s.io/apiserver/pkg/sharding" // registers CEL-based shard selector parser
"k8s.io/klog/v2"
)
@ -393,6 +395,9 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
}
p.Limit = options.Limit
p.Continue = options.Continue
if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
p.ShardSelector = options.ShardSelector
}
list := e.NewListFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx)
storageOpts := storage.ListOptions{
@ -1429,13 +1434,21 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
if options != nil {
resourceVersion = options.ResourceVersion
predicate.AllowWatchBookmarks = options.AllowWatchBookmarks
if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
predicate.ShardSelector = options.ShardSelector
}
}
return e.WatchPredicate(ctx, predicate, resourceVersion, options.SendInitialEvents)
}
// WatchPredicate starts a watch for the items that matches.
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string, sendInitialEvents *bool) (watch.Interface, error) {
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true, SendInitialEvents: sendInitialEvents}
storageOpts := storage.ListOptions{
ResourceVersion: resourceVersion,
Predicate: p,
Recursive: true,
SendInitialEvents: sendInitialEvents,
}
// if we're not already namespace-scoped, see if the field selector narrows the scope of the watch
if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 {

View file

@ -0,0 +1,219 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"fmt"
"strings"
celparser "github.com/google/cel-go/parser"
"github.com/google/cel-go/common"
"github.com/google/cel-go/common/ast"
"github.com/google/cel-go/common/operators"
apisharding "k8s.io/apimachinery/pkg/sharding"
)
// Parse turns a CEL-syntax shard selector expression into a Selector.
//
// Accepted expressions are a single shardRange() call or several of them
// joined with ||, for example:
//
//	shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')
//	shardRange(object.metadata.uid, ...) || shardRange(object.metadata.uid, ...)
//
// The expression is only parsed (with CEL macros disabled), never evaluated:
// the resulting AST is walked to collect the shard ranges. Surrounding
// whitespace is ignored. An error is returned for an empty expression, a CEL
// syntax error (only the first one is reported), any construct other than
// shardRange()/||, or ranges that do not all refer to the same field.
func Parse(expr string) (apisharding.Selector, error) {
	trimmed := strings.TrimSpace(expr)
	if len(trimmed) == 0 {
		return nil, fmt.Errorf("empty shard selector is not allowed; omit the parameter for unfiltered lists")
	}

	celP, err := celparser.NewParser(celparser.Macros( /* no macros */ ))
	if err != nil {
		return nil, fmt.Errorf("failed to create CEL parser: %w", err)
	}

	source := common.NewTextSource(trimmed)
	parsed, issues := celP.Parse(source)
	if issues != nil && len(issues.GetErrors()) > 0 {
		first := issues.GetErrors()[0]
		return nil, fmt.Errorf("CEL parse error: %s", first.Message)
	}

	ranges, err := walkExpr(parsed.Expr())
	if err != nil {
		return nil, err
	}

	// Every range has to hash the same field; compare each one against the first.
	for idx := 1; idx < len(ranges); idx++ {
		if ranges[idx].Key == ranges[0].Key {
			continue
		}
		return nil, fmt.Errorf("all shard ranges must use the same field, got %q and %q", ranges[0].Key, ranges[idx].Key)
	}

	return apisharding.NewSelector(ranges...), nil
}
// walkExpr collects the shard range requirements described by a CEL AST node.
//
// The node must be a call expression: either the logical-or operator, whose
// two operands are walked recursively (left first) and concatenated, or a
// shardRange() call, which yields exactly one requirement. Any other
// expression kind or function name is rejected with an error.
func walkExpr(e ast.Expr) ([]apisharding.ShardRangeRequirement, error) {
	if e.Kind() != ast.CallKind {
		return nil, fmt.Errorf("unexpected expression kind %v; expected shardRange() call or || operator", e.Kind())
	}

	call := e.AsCall()
	switch fn := call.FunctionName(); fn {
	case operators.LogicalOr:
		// _||_ operator: gather the requirements of both operands.
		operands := call.Args()
		if len(operands) != 2 {
			return nil, fmt.Errorf("|| operator requires exactly 2 arguments")
		}
		lhs, err := walkExpr(operands[0])
		if err != nil {
			return nil, err
		}
		rhs, err := walkExpr(operands[1])
		if err != nil {
			return nil, err
		}
		return append(lhs, rhs...), nil

	case "shardRange":
		single, err := parseShardRangeCall(call)
		if err != nil {
			return nil, err
		}
		return []apisharding.ShardRangeRequirement{single}, nil

	default:
		return nil, fmt.Errorf("unsupported function %q; only shardRange() and || are allowed", fn)
	}
}
// parseShardRangeCall converts a shardRange(field, start, end) call node into
// a ShardRangeRequirement.
//
// It checks, in order: that there are exactly three arguments; that the first
// is a field path equal to object.metadata.uid or object.metadata.namespace;
// that the second and third are accepted by extractHexLiteral; and that start
// is strictly below end. On any failure the zero requirement is returned
// together with an error.
func parseShardRangeCall(call ast.CallExpr) (apisharding.ShardRangeRequirement, error) {
	var none apisharding.ShardRangeRequirement

	args := call.Args()
	if n := len(args); n != 3 {
		return none, fmt.Errorf("shardRange() requires exactly 3 arguments, got %d", n)
	}

	// Argument 0: a select chain such as object.metadata.uid.
	fieldPath, err := extractFieldPath(args[0])
	if err != nil {
		return none, fmt.Errorf("shardRange() first argument: %w", err)
	}
	if fieldPath != "object.metadata.uid" && fieldPath != "object.metadata.namespace" {
		return none, fmt.Errorf("unsupported field path %q; supported: object.metadata.uid, object.metadata.namespace", fieldPath)
	}

	// Arguments 1 and 2: hex string literals for the range bounds.
	hexStart, err := extractHexLiteral(args[1], "hexStart")
	if err != nil {
		return none, err
	}
	hexEnd, err := extractHexLiteral(args[2], "hexEnd")
	if err != nil {
		return none, err
	}

	// The bounds are canonical (16 digits, or the single 17-digit value
	// 0x10000000000000000), so a longer string is the larger number and
	// equal-length strings order lexicographically.
	startLen, endLen := len(hexStart), len(hexEnd)
	inOrder := startLen < endLen || (startLen == endLen && hexStart < hexEnd)
	if !inOrder {
		return none, fmt.Errorf("shard range start %s must be less than end %s", hexStart, hexEnd)
	}

	return apisharding.ShardRangeRequirement{Key: fieldPath, Start: hexStart, End: hexEnd}, nil
}
// extractFieldPath walks a select chain such as object.metadata.uid and
// returns the dot-joined path.
//
// Only identifiers and plain field selections are accepted. Any other
// expression kind yields an error. The caller is responsible for checking the
// returned path against the set of supported fields.
func extractFieldPath(e ast.Expr) (string, error) {
	switch e.Kind() {
	case ast.IdentKind:
		return e.AsIdent(), nil
	case ast.SelectKind:
		sel := e.AsSelect()
		// A has(x.y) presence test is represented as a Select node flagged
		// test-only. It is not a field reference, so refuse it rather than
		// letting it pass as the path "x.y". This only matters if the parser
		// has the has() macro enabled.
		if sel.IsTestOnly() {
			return "", fmt.Errorf("expected field path (e.g. object.metadata.uid), got presence test on field %q", sel.FieldName())
		}
		operand, err := extractFieldPath(sel.Operand())
		if err != nil {
			return "", err
		}
		return operand + "." + sel.FieldName(), nil
	default:
		return "", fmt.Errorf("expected field path (e.g. object.metadata.uid), got expression kind %v", e.Kind())
	}
}
// extractHexLiteral returns the canonical hex bound carried by a CEL string
// literal, including its "0x" prefix.
//
// The literal must be a string that starts with "0x" followed by lowercase hex
// digits only. Exactly 16 digits are required, with one exception: the
// 17-digit value "0x10000000000000000" (2^64), the exclusive upper bound.
// Shorter values are rejected with a hint showing the zero-padded form. name
// identifies the argument in error messages.
func extractHexLiteral(e ast.Expr, name string) (string, error) {
	if kind := e.Kind(); kind != ast.LiteralKind {
		return "", fmt.Errorf("%s must be a string literal (e.g. '0xff'), got expression kind %v", name, kind)
	}
	raw := e.AsLiteral().Value()
	s, isString := raw.(string)
	if !isString {
		return "", fmt.Errorf("%s must be a string literal, got %T", name, raw)
	}

	if !strings.HasPrefix(s, "0x") {
		return "", fmt.Errorf("%s must have '0x' prefix, got %q", name, s)
	}
	digits := strings.TrimPrefix(s, "0x")
	switch {
	case len(digits) == 0:
		return "", fmt.Errorf("%s: hex value is required after '0x'", name)
	case len(digits) > 17:
		return "", fmt.Errorf("%s: hex value too long (%d chars, max 17): %q", name, len(digits), digits)
	}

	// Only lowercase digits are canonical; uppercase A-F is rejected too.
	for _, r := range digits {
		isDecimal := r >= '0' && r <= '9'
		isLowerAF := r >= 'a' && r <= 'f'
		if !isDecimal && !isLowerAF {
			return "", fmt.Errorf("%s: invalid hex character %q in %q", name, string(r), s)
		}
	}

	switch len(digits) {
	case 16:
		// Canonical width, nothing further to check.
	case 17:
		// The only 17-digit value allowed is 2^64.
		if s != "0x10000000000000000" {
			return "", fmt.Errorf("%s: 17-digit hex value must be '0x10000000000000000' (2^64), got %q", name, s)
		}
	default:
		// Fewer than 16 digits: suggest the zero-padded spelling.
		padded := "0x" + strings.Repeat("0", 16-len(digits)) + digits
		return "", fmt.Errorf("%s must be a 0x-prefixed 16-digit hex value, got %q (did you mean %q?)", name, s, padded)
	}
	return s, nil
}

View file

@ -0,0 +1,237 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import (
"testing"
apisharding "k8s.io/apimachinery/pkg/sharding"
)
// TestParse checks that Parse accepts well-formed shard selector expressions
// and rejects malformed ones.
//
// Error cases only assert that an error is returned, so each input is written
// to be invalid in exactly one way (the one named by the case). In particular,
// hex bounds are canonical 16-digit values unless the bound itself is what is
// being tested; otherwise a case could keep passing for an unrelated reason
// after the check it names has regressed.
func TestParse(t *testing.T) {
	tests := []struct {
		name    string
		input   string
		wantErr bool
		wantReq []apisharding.ShardRangeRequirement
	}{
		{
			name:    "empty string",
			input:   "",
			wantErr: true,
		},
		{
			name:  "single requirement with uid",
			input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')",
			wantReq: []apisharding.ShardRangeRequirement{
				{Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"},
			},
		},
		{
			name:  "full range with 2^64 end",
			input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x10000000000000000')",
			wantReq: []apisharding.ShardRangeRequirement{
				{Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x10000000000000000"},
			},
		},
		{
			name:  "namespace field",
			input: "shardRange(object.metadata.namespace, '0x00000000000000aa', '0x00000000000000ff')",
			wantReq: []apisharding.ShardRangeRequirement{
				{Key: "object.metadata.namespace", Start: "0x00000000000000aa", End: "0x00000000000000ff"},
			},
		},
		{
			name:  "two requirements with ||",
			input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') || shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')",
			wantReq: []apisharding.ShardRangeRequirement{
				{Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"},
				{Key: "object.metadata.uid", Start: "0x8000000000000000", End: "0x10000000000000000"},
			},
		},
		{
			name:  "three requirements with ||",
			input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x0000000000000005') || shardRange(object.metadata.uid, '0x0000000000000005', '0x000000000000000a') || shardRange(object.metadata.uid, '0x000000000000000a', '0x000000000000000f')",
			wantReq: []apisharding.ShardRangeRequirement{
				{Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x0000000000000005"},
				{Key: "object.metadata.uid", Start: "0x0000000000000005", End: "0x000000000000000a"},
				{Key: "object.metadata.uid", Start: "0x000000000000000a", End: "0x000000000000000f"},
			},
		},
		{
			name:  "no spaces",
			input: "shardRange(object.metadata.uid,'0x0000000000000000','0x8000000000000000')",
			wantReq: []apisharding.ShardRangeRequirement{
				{Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"},
			},
		},
		{
			name:  "extra whitespace",
			input: " shardRange( object.metadata.uid , '0x0000000000000000' , '0x8000000000000000' ) ",
			wantReq: []apisharding.ShardRangeRequirement{
				{Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"},
			},
		},
		// Error cases
		{
			name:    "missing 0x prefix on start",
			input:   "shardRange(object.metadata.uid, '0000000000000000', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "missing 0x prefix on end",
			input:   "shardRange(object.metadata.uid, '0x0000000000000000', '8000000000000000')",
			wantErr: true,
		},
		{
			name:    "empty after 0x prefix",
			input:   "shardRange(object.metadata.uid, '0x', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "name field unsupported",
			input:   "shardRange(object.metadata.name, '0x0000000000000000', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "unsupported field",
			input:   "shardRange(object.metadata.labels, '0x0000000000000000', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "unknown function",
			input:   "invalidFunc(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "hex too long (18 chars)",
			input:   "shardRange(object.metadata.uid, '0x000000000000000000', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "invalid hex char",
			input:   "shardRange(object.metadata.uid, '0x000000000000000g', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "&& operator not allowed",
			input:   "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') && shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')",
			wantErr: true,
		},
		{
			name:    "integer literal instead of string",
			input:   "shardRange(object.metadata.uid, 0, 8)",
			wantErr: true,
		},
		{
			name:    "comparison operator",
			input:   "object.metadata.uid > '0x0'",
			wantErr: true,
		},
		{
			name:    "wrong number of arguments",
			input:   "shardRange(object.metadata.uid, '0x0000000000000000')",
			wantErr: true,
		},
		{
			name:    "nested function call",
			input:   "shardRange(object.metadata.uid, hex(0), '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "mixed field keys",
			input:   "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') || shardRange(object.metadata.namespace, '0x8000000000000000', '0x10000000000000000')",
			wantErr: true,
		},
		{
			name:    "short hex rejected",
			input:   "shardRange(object.metadata.uid, '0x0', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "short hex rejected on end",
			input:   "shardRange(object.metadata.uid, '0x0000000000000000', '0xff')",
			wantErr: true,
		},
		{
			name:    "start equals end",
			input:   "shardRange(object.metadata.uid, '0x8000000000000000', '0x8000000000000000')",
			wantErr: true,
		},
		{
			name:    "start greater than end",
			input:   "shardRange(object.metadata.uid, '0xffffffffffffffff', '0x0000000000000000')",
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sel, err := Parse(tt.input)
			if tt.wantErr {
				if err == nil {
					t.Errorf("Parse(%q) expected error, got nil", tt.input)
				}
				return
			}
			if err != nil {
				t.Fatalf("Parse(%q) unexpected error: %v", tt.input, err)
			}
			reqs := sel.Requirements()
			if len(reqs) != len(tt.wantReq) {
				t.Fatalf("Parse(%q) got %d requirements, want %d", tt.input, len(reqs), len(tt.wantReq))
			}
			// Requirements are compared positionally: order must follow the input.
			for i, req := range reqs {
				if req != tt.wantReq[i] {
					t.Errorf("Parse(%q) requirement[%d] = %+v, want %+v", tt.input, i, req, tt.wantReq[i])
				}
			}
		})
	}
}
// TestParseRoundTrip verifies that canonical selector expressions survive
// Parse followed by String unchanged, and that re-parsing the rendered form
// yields the same rendering again.
func TestParseRoundTrip(t *testing.T) {
	canonical := []string{
		"shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')",
		"shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')",
		"shardRange(object.metadata.uid, '0x0000000000000000', '0x10000000000000000')",
		"shardRange(object.metadata.namespace, '0x00000000000000aa', '0x00000000000000ff')",
		"shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') || shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')",
	}
	for _, expr := range canonical {
		t.Run(expr, func(t *testing.T) {
			first, err := Parse(expr)
			if err != nil {
				t.Fatalf("Parse(%q) error: %v", expr, err)
			}

			// The inputs are already canonical, so rendering must reproduce them.
			rendered := first.String()
			if rendered != expr {
				t.Errorf("Parse(%q).String() = %q, want %q", expr, rendered, expr)
			}

			// Parsing the rendered form again must be a fixed point.
			second, err := Parse(rendered)
			if err != nil {
				t.Fatalf("Parse(%q) (round-trip) error: %v", rendered, err)
			}
			if again := second.String(); first.String() != again {
				t.Errorf("round-trip unstable: %q -> %q -> %q", expr, rendered, again)
			}
		})
	}
}

View file

@ -0,0 +1,23 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharding
import apisharding "k8s.io/apimachinery/pkg/sharding"
// init hands this package's Parse function to the apimachinery sharding
// package via RegisterParser when the package is loaded.
func init() {
apisharding.RegisterParser(Parse)
}

View file

@ -371,10 +371,10 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
return e
}
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.Object)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObject)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.

View file

@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -47,7 +48,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
var lock sync.RWMutex
var w *cacheWatcher
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true }
forget := func(drainWatcher bool) {
lock.Lock()
defer lock.Unlock()
@ -77,7 +78,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
}
func TestCacheWatcherHandlesFiltering(t *testing.T) {
filter := func(_ string, _ labels.Set, field fields.Set) bool {
filter := func(_ string, _ labels.Set, field fields.Set, _ runtime.Object) bool {
return field["spec.nodeName"] == "host"
}
forget := func(bool) {}
@ -209,7 +210,7 @@ TestCase:
func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
var w *cacheWatcher
done := make(chan struct{})
filter := func(string, labels.Set, fields.Set) bool { return true }
filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true }
forget := func(drainWatcher bool) {
w.setDrainInputBufferLocked(drainWatcher)
w.stopLocked()
@ -303,7 +304,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
t.Fatal(err)
}
filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true }
filter := func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool { return true }
forget := func(_ bool) {}
deadline := time.Now().Add(time.Minute)
w := newCacheWatcher(numObjects+1, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
@ -343,7 +344,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
}
func TestTimeBucketWatchersBasic(t *testing.T) {
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
filter := func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool {
return true
}
forget := func(bool) {}
@ -404,7 +405,7 @@ func TestCacheWatcherDraining(t *testing.T) {
var lock sync.RWMutex
var w *cacheWatcher
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true }
forget := func(drainWatcher bool) {
lock.Lock()
defer lock.Unlock()
@ -445,7 +446,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
var lock sync.RWMutex
var w *cacheWatcher
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true }
forget := func(drainWatcher bool) {
lock.Lock()
defer lock.Unlock()
@ -479,7 +480,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T
var lock sync.RWMutex
var w *cacheWatcher
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true }
forget := func(drainWatcher bool) {
lock.Lock()
defer lock.Unlock()
@ -543,7 +544,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
watchInitializationSignal := utilflowcontrol.NewInitializationSignal()
ctx := utilflowcontrol.WithInitializationSignal(context.Background(), watchInitializationSignal)
count := 0
filter := func(string, labels.Set, fields.Set) bool { return true }
filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true }
forget := func(drainWatcher bool) {
lock.Lock()
defer lock.Unlock()
@ -606,7 +607,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
newWatcher := func(id string, deadline time.Time) *cacheWatcher {
w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
w.setBookmarkAfterResourceVersion(10)
return w
}

View file

@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
@ -247,7 +248,7 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchersThreadUnsafe() [][]*cache
return expiredWatchers
}
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, obj runtime.Object) bool
type indexedTriggerFunc struct {
indexName string
@ -602,7 +603,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// to compute watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher(
chanSize,
filterWithAttrsAndPrefixFunction(key, pred),
filterWithAttrsAndPrefixFunction(key, pred, c.groupResource),
emptyFunc,
c.versioner,
deadline,
@ -676,6 +677,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newImmediateCloseWatcher(), nil
}
isSharded := pred.ShardSelector != nil && !pred.ShardSelector.Empty()
if isSharded {
metrics.RecordShardedWatchStarted(c.groupResource)
originalForget := watcher.forget
watcher.forget = func(drainWatcher bool) {
metrics.RecordShardedWatchStopped(c.groupResource)
originalForget(drainWatcher)
}
}
go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)
return watcher, nil
}
@ -794,7 +805,11 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if !ok {
return fmt.Errorf("non *store.Element returned from storage: %v", obj)
}
if opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
shardMatch, err := opts.Predicate.MatchesSharding(elem.Object)
if err != nil {
return fmt.Errorf("shard matching failed: %w", err)
}
if shardMatch && opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
@ -825,6 +840,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
return err
}
}
opts.Predicate.SetShardInfoOnList(listObj)
metrics.RecordListCacheMetrics(c.groupResource, indexUsed, len(resp.Items), listVal.Len())
return nil
}
@ -1203,11 +1219,23 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
}
}
func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate, groupResource schema.GroupResource) filterWithAttrsFunc {
isSharded := p.ShardSelector != nil && !p.ShardSelector.Empty()
filterFunc := func(objKey string, label labels.Set, field fields.Set, obj runtime.Object) bool {
if !hasPathPrefix(objKey, key) {
return false
}
matches, err := p.MatchesSharding(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("shard matching failed for %v: %w", groupResource, err))
return false
}
if !matches {
if isSharded {
metrics.RecordWatchFilteredEvent(groupResource)
}
return false
}
return p.MatchesObjectAttributes(label, field)
}
return filterFunc

View file

@ -41,6 +41,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/sharding"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -2648,7 +2650,7 @@ func TestForgetWatcher(t *testing.T) {
}
w := newCacheWatcher(
0,
func(_ string, _ labels.Set, _ fields.Set) bool { return true },
func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool { return true },
nil,
storage.APIObjectVersioner{},
testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute),
@ -3356,3 +3358,230 @@ func (f *fakeSnapshotter) RemoveLess(rv uint64) {}
func (f *fakeSnapshotter) Len() int {
return 0
}
// --- Sharding unit tests for filterWithAttrsAndPrefixFunction ---
// selectorIncludingUID returns a shard selector on object.metadata.uid whose
// range starts at the hash of uid and ends at the next hex value, so that the
// hash of uid falls inside it.
func selectorIncludingUID(uid string) sharding.Selector {
	lower := "0x" + sharding.HashField(uid)
	requirement := sharding.ShardRangeRequirement{
		Key:   "object.metadata.uid",
		Start: lower,
		End:   incrementHex(lower),
	}
	return sharding.NewSelector(requirement)
}
// selectorExcludingUID returns a shard selector on object.metadata.uid whose
// range runs from zero up to the hash of uid, so that the hash of uid itself
// is NOT inside it.
func selectorExcludingUID(uid string) sharding.Selector {
	upper := "0x" + sharding.HashField(uid)
	requirement := sharding.ShardRangeRequirement{
		Key:   "object.metadata.uid",
		Start: "0x0000000000000000",
		End:   upper,
	}
	return sharding.NewSelector(requirement)
}
// incrementHex returns hex plus one, for a "0x"-prefixed string of lowercase
// hex digits. The first two bytes are treated as the prefix and never touched.
// When every digit is 'f' (or there are no digits) the carry runs off the left
// edge and the fixed value "0x10000000000000000" (2^64) is returned.
func incrementHex(hex string) string {
	digits := []byte(hex)
	for pos := len(digits) - 1; pos >= 2; pos-- {
		switch c := digits[pos]; {
		case c < '9':
			digits[pos] = c + 1
		case c == '9':
			// Step over the gap between '9' and 'a' in ASCII.
			digits[pos] = 'a'
		case c < 'f':
			digits[pos] = c + 1
		default:
			// 'f' wraps to '0' and the carry moves one digit to the left.
			digits[pos] = '0'
			continue
		}
		return string(digits)
	}
	return "0x10000000000000000"
}
// makeExamplePod builds an example.Pod carrying only object metadata: the
// given name, namespace, UID and labels.
func makeExamplePod(name, namespace, uid string, podLabels map[string]string) *example.Pod {
	meta := metav1.ObjectMeta{
		Name:      name,
		Namespace: namespace,
		UID:       types.UID(uid),
		Labels:    podLabels,
	}
	return &example.Pod{ObjectMeta: meta}
}
// TestFilterWithAttrsAndPrefixFunction_ShardMatchAndLabels checks that an
// object is accepted when both the shard selector and the label selector match.
func TestFilterWithAttrsAndPrefixFunction_ShardMatchAndLabels(t *testing.T) {
	const podUID = "test-uid-1"
	obj := makeExamplePod("pod1", "default", podUID, map[string]string{"app": "web"})

	predicate := storage.SelectionPredicate{
		Label:         labels.SelectorFromSet(labels.Set{"app": "web"}),
		Field:         fields.Everything(),
		ShardSelector: selectorIncludingUID(podUID),
		GetAttrs:      storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	objLabels := labels.Set{"app": "web"}
	objFields := fields.Set{"metadata.name": "pod1", "metadata.namespace": "default"}
	if matched := accepts("/pods/default/pod1", objLabels, objFields, obj); !matched {
		t.Error("expected filter to match: shard matches and labels match")
	}
}
// TestFilterWithAttrsAndPrefixFunction_ShardMatchButLabelMismatch checks that
// an object inside the shard range is still rejected when its labels do not
// satisfy the label selector.
func TestFilterWithAttrsAndPrefixFunction_ShardMatchButLabelMismatch(t *testing.T) {
	const podUID = "test-uid-2"
	obj := makeExamplePod("pod2", "default", podUID, map[string]string{"app": "api"})

	predicate := storage.SelectionPredicate{
		Label:         labels.SelectorFromSet(labels.Set{"app": "web"}),
		Field:         fields.Everything(),
		ShardSelector: selectorIncludingUID(podUID),
		GetAttrs:      storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	objLabels := labels.Set{"app": "api"}
	objFields := fields.Set{"metadata.name": "pod2", "metadata.namespace": "default"}
	if matched := accepts("/pods/default/pod2", objLabels, objFields, obj); matched {
		t.Error("expected filter to reject: shard matches but labels don't")
	}
}
// TestFilterWithAttrsAndPrefixFunction_ShardMismatch checks that an object
// whose UID hash lies outside the shard range is rejected even though the
// label and field selectors accept everything.
func TestFilterWithAttrsAndPrefixFunction_ShardMismatch(t *testing.T) {
	const podUID = "test-uid-3"
	obj := makeExamplePod("pod3", "default", podUID, map[string]string{"app": "web"})

	predicate := storage.SelectionPredicate{
		Label:         labels.Everything(),
		Field:         fields.Everything(),
		ShardSelector: selectorExcludingUID(podUID),
		GetAttrs:      storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	objLabels := labels.Set{"app": "web"}
	objFields := fields.Set{"metadata.name": "pod3", "metadata.namespace": "default"}
	if matched := accepts("/pods/default/pod3", objLabels, objFields, obj); matched {
		t.Error("expected filter to reject: shard does not match")
	}
}
// TestFilterWithAttrsAndPrefixFunction_NilShardSelector checks that a
// predicate without a ShardSelector filters on labels and fields only.
func TestFilterWithAttrsAndPrefixFunction_NilShardSelector(t *testing.T) {
	obj := makeExamplePod("pod4", "default", "uid-4", map[string]string{"app": "web"})

	// ShardSelector is deliberately left unset.
	predicate := storage.SelectionPredicate{
		Label:    labels.SelectorFromSet(labels.Set{"app": "web"}),
		Field:    fields.Everything(),
		GetAttrs: storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	objLabels := labels.Set{"app": "web"}
	objFields := fields.Set{"metadata.name": "pod4", "metadata.namespace": "default"}
	if matched := accepts("/pods/default/pod4", objLabels, objFields, obj); !matched {
		t.Error("expected filter to match: nil ShardSelector should pass everything through")
	}
}
// TestFilterWithAttrsAndPrefixFunction_EverythingSelector checks that the
// sharding.Everything() selector does not filter out any object.
func TestFilterWithAttrsAndPrefixFunction_EverythingSelector(t *testing.T) {
	obj := makeExamplePod("pod5", "default", "uid-5", nil)

	predicate := storage.SelectionPredicate{
		Label:         labels.Everything(),
		Field:         fields.Everything(),
		ShardSelector: sharding.Everything(),
		GetAttrs:      storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	objFields := fields.Set{"metadata.name": "pod5", "metadata.namespace": "default"}
	if matched := accepts("/pods/default/pod5", labels.Set{}, objFields, obj); !matched {
		t.Error("expected filter to match: Everything() selector should pass all objects")
	}
}
// TestFilterWithAttrsAndPrefixFunction_WrongPrefix checks that an event whose
// storage key is outside the watched prefix is rejected, even when the object
// itself would satisfy the shard selector.
func TestFilterWithAttrsAndPrefixFunction_WrongPrefix(t *testing.T) {
	const podUID = "test-uid-6"
	obj := makeExamplePod("pod6", "default", podUID, nil)

	predicate := storage.SelectionPredicate{
		Label:         labels.Everything(),
		Field:         fields.Everything(),
		ShardSelector: selectorIncludingUID(podUID),
		GetAttrs:      storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	// The key lives under /configmaps/, not under the /pods/ prefix.
	if matched := accepts("/configmaps/default/cm1", nil, nil, obj); matched {
		t.Error("expected filter to reject: key prefix doesn't match")
	}
}
// TestFilterWithAttrsAndPrefixFunction_ShardErrorDropsEvent checks that the
// filter rejects an event when shard matching cannot be evaluated for the
// object.
func TestFilterWithAttrsAndPrefixFunction_ShardErrorDropsEvent(t *testing.T) {
	// A bare runtime.Unknown is used as the object shard matching is expected
	// to fail on.
	obj := &runtime.Unknown{}
	gr := schema.GroupResource{Resource: "pods"}
	pred := storage.SelectionPredicate{
		Label: labels.Everything(),
		Field: fields.Everything(),
		// The range spans the whole hash space, so presumably a rejection can
		// only come from the error path, not from a hash that falls outside
		// the range.
		ShardSelector: sharding.NewSelector(sharding.ShardRangeRequirement{
			Key:   "object.metadata.uid",
			Start: "0x0000000000000000",
			End:   "0x10000000000000000",
		}),
		GetAttrs: storage.DefaultNamespaceScopedAttr,
	}
	filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr)
	if filter("/pods/default/pod-err", nil, nil, obj) {
		t.Error("expected filter to reject: shard matching should fail on a runtime.Unknown object")
	}
}
// TestFilterWithAttrsAndPrefixFunction_NamespaceSharding checks that a shard
// selector keyed on object.metadata.namespace accepts an object whose
// namespace hash falls inside the range.
func TestFilterWithAttrsAndPrefixFunction_NamespaceSharding(t *testing.T) {
	const namespace = "my-namespace"
	obj := makeExamplePod("pod-ns", namespace, "some-uid", nil)

	// Range starting at the hash of the pod's own namespace.
	lower := "0x" + sharding.HashField(namespace)
	requirement := sharding.ShardRangeRequirement{
		Key:   "object.metadata.namespace",
		Start: lower,
		End:   incrementHex(lower),
	}

	predicate := storage.SelectionPredicate{
		Label:         labels.Everything(),
		Field:         fields.Everything(),
		ShardSelector: sharding.NewSelector(requirement),
		GetAttrs:      storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	objFields := fields.Set{"metadata.name": "pod-ns", "metadata.namespace": namespace}
	if matched := accepts("/pods/my-namespace/pod-ns", labels.Set{}, objFields, obj); !matched {
		t.Error("expected filter to match: namespace-based shard selector should match")
	}
}
// TestFilterWithAttrsAndPrefixFunction_NamespaceShardingMismatch checks that a
// namespace-keyed shard selector rejects an object living in a different
// namespace than the one the range was built around.
func TestFilterWithAttrsAndPrefixFunction_NamespaceShardingMismatch(t *testing.T) {
	const namespace = "other-namespace"
	obj := makeExamplePod("pod-ns2", namespace, "some-uid-2", nil)

	// Range built around the hash of "my-namespace", not the pod's namespace.
	lower := "0x" + sharding.HashField("my-namespace")
	requirement := sharding.ShardRangeRequirement{
		Key:   "object.metadata.namespace",
		Start: lower,
		End:   incrementHex(lower),
	}

	predicate := storage.SelectionPredicate{
		Label:         labels.Everything(),
		Field:         fields.Everything(),
		ShardSelector: sharding.NewSelector(requirement),
		GetAttrs:      storage.DefaultNamespaceScopedAttr,
	}
	accepts := filterWithAttrsAndPrefixFunction("/pods/", predicate, schema.GroupResource{Resource: "pods"})

	objFields := fields.Set{"metadata.name": "pod-ns2", "metadata.namespace": namespace}
	if matched := accepts("/pods/other-namespace/pod-ns2", labels.Set{}, objFields, obj); matched {
		t.Error("expected filter to reject: namespace hash doesn't fall in shard range")
	}
}

View file

@ -20,6 +20,8 @@ import (
"sync"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
@ -185,6 +187,26 @@ var (
Help: "Counter for status of consistency checks between etcd and watch cache",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"group", "resource", "status"})
// WatchShardsTotal is a gauge, labelled by group and resource, that is
// incremented by RecordShardedWatchStarted and decremented by
// RecordShardedWatchStopped. Register only registers it when the
// ShardedListAndWatch feature gate is enabled.
// NOTE(review): the metric name ends in "_total" although this is a gauge;
// Prometheus naming guidance reserves that suffix for counters. Confirm the
// name before the metric graduates from ALPHA.
WatchShardsTotal = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Name: "watch_shards_total",
Help: "Number of active sharded watch connections broken by resource type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"group", "resource"},
)
// WatchFilteredEventsTotal is a counter, labelled by group and resource, that
// is incremented by RecordWatchFilteredEvent. Register only registers it when
// the ShardedListAndWatch feature gate is enabled.
WatchFilteredEventsTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Name: "watch_filtered_events_total",
Help: "Counter of events filtered out by shard selector during watch dispatch, broken by resource type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"group", "resource"},
)
)
var registerMetrics sync.Once
@ -208,6 +230,10 @@ func Register() {
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
legacyregistry.MustRegister(StorageConsistencyCheckTotal)
if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
legacyregistry.MustRegister(WatchShardsTotal)
legacyregistry.MustRegister(WatchFilteredEventsTotal)
}
})
}
@ -225,7 +251,22 @@ func RecordResourceVersion(groupResource schema.GroupResource, resourceVersion u
watchCacheResourceVersion.WithLabelValues(groupResource.Group, groupResource.Resource).Set(float64(resourceVersion % 1000000000000000))
}
// RecordsWatchCacheCapacityChange record watchCache capacity resize(increase or decrease) operations.
// RecordShardedWatchStarted adds one to the WatchShardsTotal gauge for the
// group/resource pair of groupResource.
func RecordShardedWatchStarted(groupResource schema.GroupResource) {
	group, resource := groupResource.Group, groupResource.Resource
	WatchShardsTotal.WithLabelValues(group, resource).Inc()
}
// RecordShardedWatchStopped subtracts one from the WatchShardsTotal gauge for
// the group/resource pair of groupResource.
func RecordShardedWatchStopped(groupResource schema.GroupResource) {
	group, resource := groupResource.Group, groupResource.Resource
	WatchShardsTotal.WithLabelValues(group, resource).Dec()
}
// RecordWatchFilteredEvent adds one to the WatchFilteredEventsTotal counter
// for the group/resource pair of groupResource.
func RecordWatchFilteredEvent(groupResource schema.GroupResource) {
	group, resource := groupResource.Group, groupResource.Resource
	WatchFilteredEventsTotal.WithLabelValues(group, resource).Inc()
}
// RecordsWatchCacheCapacityChange records watchCache capacity resize(increase or decrease) operations.
func RecordsWatchCacheCapacityChange(groupResource schema.GroupResource, old, new int) {
WatchCacheCapacity.WithLabelValues(groupResource.Group, groupResource.Resource).Set(float64(new))
if old < new {

View file

@ -895,7 +895,11 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil {
return err
}
return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount)
if err := s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount); err != nil {
return err
}
opts.Predicate.SetShardInfoOnList(listObj)
return nil
}
func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (resp kubernetes.ListResponse, err error) {

View file

@ -20,9 +20,11 @@ import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/sharding"
"k8s.io/apiserver/pkg/endpoints/request"
)
@ -82,12 +84,17 @@ type SelectionPredicate struct {
Limit int64
Continue string
AllowWatchBookmarks bool
// ShardSelector is the parsed shard selector for filtering objects by hash range.
ShardSelector sharding.Selector
}
// Matches returns true if the given object's labels and fields (as
// returned by s.GetAttrs) match s.Label and s.Field. An error is
// returned if s.GetAttrs fails.
func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
if matched, err := s.MatchesSharding(obj); err != nil || !matched {
return matched, err
}
if s.Empty() {
return true, nil
}
@ -168,6 +175,24 @@ func (s *SelectionPredicate) MatcherIndex(ctx context.Context) []MatchValue {
return result
}
// MatchesSharding reports whether obj is selected by the predicate's shard
// selector. A predicate with no ShardSelector, or with an empty one, matches
// every object; otherwise the decision (and any error) comes from
// ShardSelector.Matches.
func (s *SelectionPredicate) MatchesSharding(obj runtime.Object) (bool, error) {
	if s.ShardSelector == nil || s.ShardSelector.Empty() {
		return true, nil
	}
	return s.ShardSelector.Matches(obj)
}
// SetShardInfoOnList records the active shard selector on listObj. It does
// nothing when the predicate has no (or an empty) ShardSelector, or when
// listObj does not implement metav1.ShardedListInterface. Otherwise it stores
// a ShardInfo whose Selector is the selector's string form.
func (s *SelectionPredicate) SetShardInfoOnList(listObj runtime.Object) {
	if s.ShardSelector == nil || s.ShardSelector.Empty() {
		return
	}
	sharded, ok := listObj.(metav1.ShardedListInterface)
	if !ok {
		return
	}
	info := &metav1.ShardInfo{Selector: s.ShardSelector.String()}
	sharded.SetShardInfo(info)
}
func isNamespaceScopedRequest(ctx context.Context) (string, bool) {
re, _ := request.RequestInfoFrom(ctx)
if re == nil || len(re.Namespace) == 0 {

View file

@ -160,6 +160,7 @@ The function passed to Eventually returned the following error:
ResourceVersion: "",
Continue: "",
RemainingItemCount: nil,
ShardInfo: nil,
},
Status: "Failure",
Message: "pods \"no-such-pod\" not found",
@ -191,6 +192,7 @@ The function passed to Eventually returned the following error:
ResourceVersion: "",
Continue: "",
RemainingItemCount: nil,
ShardInfo: nil,
},
Status: "Failure",
Message: "pods \"no-such-pod\" not found",
@ -222,6 +224,7 @@ The function passed to Eventually returned the following error:
ResourceVersion: "",
Continue: "",
RemainingItemCount: nil,
ShardInfo: nil,
},
Status: "Failure",
Message: "pods \"no-such-pod\" not found",
@ -248,6 +251,7 @@ The function passed to Eventually returned the following error:
ResourceVersion: "",
Continue: "",
RemainingItemCount: nil,
ShardInfo: nil,
},
Status: "Failure",
Message: "pods \"no-such-pod\" not found",
@ -279,6 +283,7 @@ The function passed to Eventually returned the following error:
ResourceVersion: "",
Continue: "",
RemainingItemCount: nil,
ShardInfo: nil,
},
Status: "Failure",
Message: "pods \"no-such-pod\" not found",
@ -305,6 +310,7 @@ The function passed to Eventually returned the following error:
ResourceVersion: "",
Continue: "",
RemainingItemCount: nil,
ShardInfo: nil,
},
Status: "Failure",
Message: "pods \"no-such-pod\" not found",

View file

@ -0,0 +1,315 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"math/big"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/sharding"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/test/integration/framework"
)
// calculateShardRange splits the 2^64 hash space into total slices of equal
// width (2^64 / total, rounded down) and returns the bounds of slice index as
// "0x"-prefixed lowercase hex strings zero-padded to at least 16 digits.
// start is the inclusive lower bound and end is the exclusive upper bound.
//
// The last slice (index == total-1) always ends at 2^64 so that the remainder
// left by the integer division is covered; that bound needs 17 hex digits.
// total must be non-zero, otherwise the division panics.
func calculateShardRange(index, total int) (start, end string) {
	hashSpace := new(big.Int).Lsh(big.NewInt(1), 64) // 2^64
	width := new(big.Int).Div(hashSpace, big.NewInt(int64(total)))

	lower := new(big.Int).Mul(width, big.NewInt(int64(index)))

	// Default to the top of the hash space; only non-final slices end on a
	// multiple of the slice width.
	upper := hashSpace
	if index != total-1 {
		upper = new(big.Int).Mul(width, big.NewInt(int64(index+1)))
	}

	return fmt.Sprintf("0x%016x", lower), fmt.Sprintf("0x%016x", upper)
}
// shardSelectorString builds the shardRange(...) selector expression over
// object.metadata.uid for shard index out of total, using the bounds produced
// by calculateShardRange as the quoted lower and upper arguments.
func shardSelectorString(index, total int) string {
	lower, upper := calculateShardRange(index, total)
	return fmt.Sprintf("shardRange(object.metadata.uid, '%s', '%s')", lower, upper)
}
// hexLess reports whether hex string a orders before hex string b. A shorter
// string always orders first; strings of equal length are compared
// lexicographically. This agrees with numeric order as long as both operands
// use the same prefix and letter case and the longer one carries no redundant
// leading zeros. The original note states this mirrors the server-side
// sharding comparison.
func hexLess(a, b string) bool {
	switch la, lb := len(a), len(b); {
	case la < lb:
		return true
	case la > lb:
		return false
	default:
		return a < b
	}
}
// objectInShard reports whether the hash of uid, as produced by
// sharding.HashField and prefixed with "0x", lies in the half-open range
// [start, end) that calculateShardRange assigns to shard index out of total.
func objectInShard(uid string, index, total int) bool {
	lower, upper := calculateShardRange(index, total)
	hashed := "0x" + sharding.HashField(uid)
	// In range means: not below the inclusive lower bound, and strictly below
	// the exclusive upper bound.
	return !hexLess(hashed, lower) && hexLess(hashed, upper)
}
// TestShardedList enables the ShardedListAndWatch gate, creates a batch of
// ConfigMaps, and lists them once per shard selector. It checks that each
// response echoes the selector in ShardInfo, that every returned object hashes
// into the requested shard, that no object is returned by two shards, and that
// every created object is returned by some shard.
func TestShardedList(t *testing.T) {
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, true)

	ctx, client, _, tearDownFn := setup(t)
	defer tearDownFn()

	ns := framework.CreateNamespaceOrDie(client, "shard-list", t)
	defer framework.DeleteNamespaceOrDie(client, ns, t)

	configMaps := client.CoreV1().ConfigMaps(ns.Name)

	// Populate the namespace with server-named configmaps.
	const numObjects = 20
	created := make([]*v1.ConfigMap, 0, numObjects)
	for i := 0; i < numObjects; i++ {
		template := &v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "shard-test-",
			},
		}
		cm, err := configMaps.Create(ctx, template, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("failed to create configmap: %v", err)
		}
		created = append(created, cm)
	}

	const numShards = 3
	seenUIDs := make(map[string]bool)
	for shard := 0; shard < numShards; shard++ {
		selector := shardSelectorString(shard, numShards)
		list, err := configMaps.List(ctx, metav1.ListOptions{
			ShardSelector: selector,
		})
		if err != nil {
			t.Fatalf("shard %d: failed to list: %v", shard, err)
		}

		// The server must report back the selector it applied.
		switch {
		case list.ShardInfo == nil:
			t.Errorf("shard %d: expected shardInfo to be set", shard)
		case list.ShardInfo.Selector != selector:
			t.Errorf("shard %d: expected shardInfo.selector=%q, got %q", shard, selector, list.ShardInfo.Selector)
		}

		for i := range list.Items {
			item := &list.Items[i]
			uid := string(item.UID)
			if !objectInShard(uid, shard, numShards) {
				t.Errorf("shard %d: object %s (UID %s) should not be in this shard", shard, item.Name, uid)
			}
			if seenUIDs[uid] {
				t.Errorf("shard %d: object %s (UID %s) appeared in multiple shards", shard, item.Name, uid)
			}
			seenUIDs[uid] = true
		}
	}

	// The union of all shards must cover everything that was created.
	for _, cm := range created {
		if seenUIDs[string(cm.UID)] {
			continue
		}
		t.Errorf("object %s (UID %s) was not returned by any shard", cm.Name, cm.UID)
	}
}
// TestShardedWatch enables the ShardedListAndWatch gate, opens one watch per
// shard selector starting from the resource version of an initial list, then
// creates ConfigMaps and verifies that each object's ADDED event is delivered
// to the watcher of the shard its UID hashes into and to no other watcher.
func TestShardedWatch(t *testing.T) {
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, true)

	ctx, client, _, tearDownFn := setup(t)
	defer tearDownFn()

	ns := framework.CreateNamespaceOrDie(client, "shard-watch", t)
	defer framework.DeleteNamespaceOrDie(client, ns, t)

	configMaps := client.CoreV1().ConfigMaps(ns.Name)

	// The watches start from the resource version reported by this list.
	initial, err := configMaps.List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatalf("initial list failed: %v", err)
	}
	startRV := initial.ResourceVersion

	const numShards = 2

	// Open every shard's watch before creating anything. The watches are
	// stopped when the test function returns.
	watchers := make([]watch.Interface, 0, numShards)
	for shard := 0; shard < numShards; shard++ {
		w, err := configMaps.Watch(ctx, metav1.ListOptions{
			ResourceVersion: startRV,
			ShardSelector:   shardSelectorString(shard, numShards),
		})
		if err != nil {
			t.Fatalf("shard %d: failed to start watch: %v", shard, err)
		}
		defer w.Stop()
		watchers = append(watchers, w)
	}

	// Create the objects, recording for each UID the first shard whose range
	// contains its hash.
	const numObjects = 10
	expectedShard := make(map[string]int) // UID -> shard index
	for i := 0; i < numObjects; i++ {
		template := &v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "shard-watch-",
			},
		}
		cm, err := configMaps.Create(ctx, template, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("failed to create configmap: %v", err)
		}
		uid := string(cm.UID)
		for shard := 0; shard < numShards; shard++ {
			if objectInShard(uid, shard, numShards) {
				expectedShard[uid] = shard
				break
			}
		}
	}

	// Drain each watcher in turn into its own set of observed UIDs.
	received := make([]map[string]bool, numShards)
	for shard, w := range watchers {
		received[shard] = make(map[string]bool)
		collectEvents(t, w, received[shard], expectedShard)
	}

	// Each UID must show up on its own shard's watcher and nowhere else.
	for uid, owner := range expectedShard {
		if !received[owner][uid] {
			t.Errorf("UID %s: expected in shard %d but not received", uid, owner)
		}
		for other := 0; other < numShards; other++ {
			if other == owner {
				continue
			}
			if received[other][uid] {
				t.Errorf("UID %s: received in shard %d but expected only in shard %d", uid, other, owner)
			}
		}
	}
}
// collectEvents reads events from w and records in seen the UID of every
// ADDED event whose object is a *v1.ConfigMap. Other event types and other
// object types are ignored.
//
// It returns when any of the following happens:
//   - the watch result channel is closed,
//   - 30 seconds have elapsed since the call started, or
//   - every UID that is a key of expected has been recorded in seen. At that
//     point no further ADDED event for a tracked object can change what the
//     caller observes, so waiting longer is pointless.
//
// expected maps UID -> owning shard index. The helper is not told which shard
// w serves, so a watcher that correctly receives only its own shard's subset
// still runs until the timeout (or channel close); the early exit only fires
// when this watcher has observed every tracked UID.
func collectEvents(t *testing.T, w watch.Interface, seen map[string]bool, expected map[string]int) {
	t.Helper()

	// A stoppable timer so an early return does not leave it pending.
	deadline := time.NewTimer(30 * time.Second)
	defer deadline.Stop()

	// Number of tracked UIDs this watcher has not recorded yet.
	remaining := 0
	for uid := range expected {
		if !seen[uid] {
			remaining++
		}
	}

	for remaining > 0 {
		select {
		case evt, ok := <-w.ResultChan():
			if !ok {
				return
			}
			if evt.Type != watch.Added {
				continue
			}
			cm, ok := evt.Object.(*v1.ConfigMap)
			if !ok {
				continue
			}
			uid := string(cm.UID)
			// Only the first sighting of a tracked UID counts toward completion.
			if _, tracked := expected[uid]; tracked && !seen[uid] {
				remaining--
			}
			seen[uid] = true
		case <-deadline.C:
			return
		}
	}
}
// TestShardedListFeatureGateDisabled turns the ShardedListAndWatch gate off,
// creates a single ConfigMap, and lists with a shard selector. It expects the
// selector to be ignored: the response carries no ShardInfo and contains the
// one created object.
func TestShardedListFeatureGateDisabled(t *testing.T) {
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, false)

	ctx, client, _, tearDownFn := setup(t)
	defer tearDownFn()

	ns := framework.CreateNamespaceOrDie(client, "shard-disabled", t)
	defer framework.DeleteNamespaceOrDie(client, ns, t)

	configMaps := client.CoreV1().ConfigMaps(ns.Name)

	// Seed the namespace with exactly one object.
	seed := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "test"},
	}
	if _, err := configMaps.Create(ctx, seed, metav1.CreateOptions{}); err != nil {
		t.Fatalf("failed to create configmap: %v", err)
	}

	// Ask for shard 0 of 2; with the gate off this should behave like a
	// plain, unsharded list.
	list, err := configMaps.List(ctx, metav1.ListOptions{
		ShardSelector: shardSelectorString(0, 2),
	})
	if err != nil {
		t.Fatalf("list failed: %v", err)
	}

	if list.ShardInfo != nil {
		t.Errorf("expected shardInfo=nil when feature gate is disabled")
	}
	if got := len(list.Items); got != 1 {
		t.Errorf("expected 1 item (selector ignored), got %d", got)
	}
}
// TestShardedListComplete enables the ShardedListAndWatch gate, creates a few
// ConfigMaps, and issues a list with no shard selector at all. It expects an
// ordinary response: no ShardInfo and every created object present.
func TestShardedListComplete(t *testing.T) {
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, true)

	ctx, client, _, tearDownFn := setup(t)
	defer tearDownFn()

	ns := framework.CreateNamespaceOrDie(client, "shard-complete", t)
	defer framework.DeleteNamespaceOrDie(client, ns, t)

	configMaps := client.CoreV1().ConfigMaps(ns.Name)

	const numObjects = 5
	for i := 0; i < numObjects; i++ {
		template := &v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "complete-",
			},
		}
		if _, err := configMaps.Create(ctx, template, metav1.CreateOptions{}); err != nil {
			t.Fatalf("failed to create configmap: %v", err)
		}
	}

	// List with empty options: no selector is sent, so the response must not
	// be marked as sharded.
	list, err := configMaps.List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatalf("list failed: %v", err)
	}

	if list.ShardInfo != nil {
		t.Errorf("expected shardInfo=nil for unsharded list")
	}
	if got := len(list.Items); got != numObjects {
		t.Errorf("expected %d items, got %d", numObjects, got)
	}
}