diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 9798965ba47..6cfc6e15573 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -2165,6 +2165,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.36"), Default: false, LockToDefault: true, PreRelease: featuregate.Deprecated}, }, + genericfeatures.ShardedListAndWatch: { + {Version: version.MustParse("1.36"), Default: false, PreRelease: featuregate.Alpha}, + }, + genericfeatures.SizeBasedListCostEstimate: { {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, @@ -2640,6 +2644,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature genericfeatures.SeparateCacheWatchRPC: {}, + genericfeatures.ShardedListAndWatch: {}, + genericfeatures.SizeBasedListCostEstimate: {}, genericfeatures.StorageVersionAPI: {genericfeatures.APIServerIdentity}, diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go new file mode 100644 index 00000000000..79020b277ce --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/conversion.go @@ -0,0 +1,59 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package internalversion + +import ( + "fmt" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/sharding" +) + +// Convert_v1_ListOptions_To_internalversion_ListOptions handles conversion from +// the wire-format v1.ListOptions to the internal ListOptions, including shard +// selector parsing. +func Convert_v1_ListOptions_To_internalversion_ListOptions(in *v1.ListOptions, out *ListOptions, s conversion.Scope) error { + if err := autoConvert_v1_ListOptions_To_internalversion_ListOptions(in, out, s); err != nil { + return err + } + + // Parse the new shardSelector field into a ShardSelector if set. + if in.ShardSelector != "" { + sel, err := sharding.Parse(in.ShardSelector) + if err != nil { + return fmt.Errorf("invalid shard selector: %w", err) + } + out.ShardSelector = sel + } + + return nil +} + +// Convert_internalversion_ListOptions_To_v1_ListOptions handles conversion from +// internal ListOptions to the wire-format v1.ListOptions. 
+func Convert_internalversion_ListOptions_To_v1_ListOptions(in *ListOptions, out *v1.ListOptions, s conversion.Scope) error { + if err := autoConvert_internalversion_ListOptions_To_v1_ListOptions(in, out, s); err != nil { + return err + } + + if in.ShardSelector != nil && !in.ShardSelector.Empty() { + out.ShardSelector = in.ShardSelector.String() + } + + return nil +} diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go index 8c60e7d2a81..3a58d055e3b 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/sharding" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -89,6 +90,9 @@ type ListOptions struct { // Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward // compatibility reasons) and to false otherwise. SendInitialEvents *bool + + // ShardSelector is the parsed shard selector from the request. + ShardSelector sharding.Selector } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go index 92d3ed5e012..6ae1b095fc5 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.go @@ -94,6 +94,13 @@ type ListInterface interface { SetRemainingItemCount(c *int64) } +// ShardedListInterface can be implemented by list types to indicate that they +// represent a sharded subset of the full collection rather than the complete list. 
+type ShardedListInterface interface { + GetShardInfo() *ShardInfo + SetShardInfo(*ShardInfo) +} + // Type exposes the type and APIVersion of versioned or internal API objects. // TODO: move this, and TypeMeta and ListMeta, to a different package type Type interface { @@ -113,6 +120,8 @@ func (meta *ListMeta) GetContinue() string { return meta.Continue func (meta *ListMeta) SetContinue(c string) { meta.Continue = c } func (meta *ListMeta) GetRemainingItemCount() *int64 { return meta.RemainingItemCount } func (meta *ListMeta) SetRemainingItemCount(c *int64) { meta.RemainingItemCount = c } +func (meta *ListMeta) GetShardInfo() *ShardInfo { return meta.ShardInfo } +func (meta *ListMeta) SetShardInfo(s *ShardInfo) { meta.ShardInfo = s } func (obj *TypeMeta) GetObjectKind() schema.ObjectKind { return obj } diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go index f26b57065b7..c8b87f20c8e 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go @@ -92,6 +92,26 @@ type ListMeta struct { // should not rely on the remainingItemCount to be set or to be exact. // +optional RemainingItemCount *int64 `json:"remainingItemCount,omitempty" protobuf:"bytes,4,opt,name=remainingItemCount"` + + // shardInfo is set when the list is a filtered subset of the full collection, + // as selected by a shard selector on the request. It echoes back the selector + // so clients can verify which shard they received and merge sharded responses. + // Clients should not cache sharded list responses as a full representation + // of the collection. + // + // This is an alpha field and requires enabling the ShardedListAndWatch feature gate. 
+ // +featureGate=ShardedListAndWatch + // +optional + ShardInfo *ShardInfo `json:"shardInfo,omitempty" protobuf:"bytes,5,opt,name=shardInfo"` +} + +// ShardInfo describes the shard selector that was applied to produce a list response. +// Its presence on a list response indicates the list is a filtered subset. +type ShardInfo struct { + // selector is the shard selector string from the request, echoed back so clients + // can verify which shard they received and merge responses from multiple shards. + // +required + Selector string `json:"selector" protobuf:"bytes,1,opt,name=selector"` } // Field path constants that are specific to the internal API @@ -430,6 +450,38 @@ type ListOptions struct { // compatibility reasons) and to false otherwise. // +optional SendInitialEvents *bool `json:"sendInitialEvents,omitempty" protobuf:"varint,11,opt,name=sendInitialEvents"` + + // shardSelector restricts the list of returned objects using a CEL-based + // shard selector expression. The format uses the shardRange() function + // combined with || (logical OR) to specify one or more hash ranges: + // + // shardRange(object.metadata.uid, '0x0', '0x8000000000000000') + // shardRange(object.metadata.uid, '0x0', '0x8000000000000000') || shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000') + // + // Field paths use CEL-style object-rooted syntax (e.g. "object.metadata.uid"), + // NOT the fieldSelector format ("metadata.uid"). Currently supported paths: + // - object.metadata.uid + // - object.metadata.namespace + // + // hexStart and hexEnd are single-quoted CEL string literals with a '0x' prefix, + // defining the inclusive lower and exclusive upper bounds over the 64-bit FNV-1a + // hash space. The full range is [0x0, 0x10000000000000000), where the exclusive + // upper bound equals 2^64. 
+ // + // Examples: + // 2-shard split: + // shard 0: shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') + // shard 1: shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000') + // 4-shard split: + // shard 0: shardRange(object.metadata.uid, '0x0000000000000000', '0x4000000000000000') + // shard 1: shardRange(object.metadata.uid, '0x4000000000000000', '0x8000000000000000') + // shard 2: shardRange(object.metadata.uid, '0x8000000000000000', '0xc000000000000000') + // shard 3: shardRange(object.metadata.uid, '0xc000000000000000', '0x10000000000000000') + // + // This is an alpha field and requires enabling the ShardedListAndWatch feature gate. + // +featureGate=ShardedListAndWatch + // +optional + ShardSelector string `json:"shardSelector,omitempty" protobuf:"bytes,15,opt,name=shardSelector"` } const ( diff --git a/staging/src/k8s.io/apimachinery/pkg/sharding/accessor.go b/staging/src/k8s.io/apimachinery/pkg/sharding/accessor.go new file mode 100644 index 00000000000..fab98ffca43 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/sharding/accessor.go @@ -0,0 +1,50 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sharding + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" +) + +// ResolveFieldValue extracts a metadata field value from a runtime.Object +// based on the given field path. 
+// +// Field paths use CEL-style object-rooted syntax ("object.metadata."), +// which differs from the fieldSelector format ("metadata."). The +// "object." prefix anchors the path to the resource being filtered. +// +// Supported field paths: +// - "object.metadata.uid" +// - "object.metadata.namespace" +func ResolveFieldValue(obj runtime.Object, fieldPath string) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", fmt.Errorf("failed to access object metadata: %w", err) + } + + switch fieldPath { + case "object.metadata.uid": + return string(accessor.GetUID()), nil + case "object.metadata.namespace": + return accessor.GetNamespace(), nil + default: + return "", fmt.Errorf("unsupported field path: %q", fieldPath) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/sharding/accessor_test.go b/staging/src/k8s.io/apimachinery/pkg/sharding/accessor_test.go new file mode 100644 index 00000000000..bee6d390259 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/sharding/accessor_test.go @@ -0,0 +1,64 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sharding + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +func TestResolveFieldValue(t *testing.T) { + obj := &testObject{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("test-uid-123"), + Name: "test-name", + Namespace: "test-namespace", + }, + } + + tests := []struct { + fieldPath string + want string + wantErr bool + }{ + {"object.metadata.uid", "test-uid-123", false}, + {"object.metadata.namespace", "test-namespace", false}, + {"object.metadata.name", "", true}, + {"object.metadata.labels", "", true}, + {"invalid.path", "", true}, + } + + for _, tt := range tests { + t.Run(tt.fieldPath, func(t *testing.T) { + got, err := ResolveFieldValue(obj, tt.fieldPath) + if tt.wantErr { + if err == nil { + t.Errorf("expected error for fieldPath %q", tt.fieldPath) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != tt.want { + t.Errorf("ResolveFieldValue(%q) = %q, want %q", tt.fieldPath, got, tt.want) + } + }) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/sharding/hash.go b/staging/src/k8s.io/apimachinery/pkg/sharding/hash.go new file mode 100644 index 00000000000..ae4dcfe2f6a --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/sharding/hash.go @@ -0,0 +1,30 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
// HashField hashes value with 64-bit FNV-1a and returns the sum as exactly 16
// lowercase hexadecimal characters, zero-padded on the left and without a "0x"
// prefix.
func HashField(value string) string {
	hasher := fnv.New64a()
	// hash.Hash documents that Write never returns an error, so the result
	// is discarded on purpose.
	_, _ = hasher.Write([]byte(value))
	sum := hasher.Sum64()
	return fmt.Sprintf("%016x", sum)
}
+*/ + +package sharding + +import ( + "testing" +) + +func TestHashField(t *testing.T) { + tests := []struct { + input string + }{ + {""}, + {"abc"}, + {"test-uid-12345"}, + {"aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}, + } + + for _, tt := range tests { + result := HashField(tt.input) + if len(result) != 16 { + t.Errorf("HashField(%q) returned %q (len %d), expected 16 hex chars", tt.input, result, len(result)) + } + // Verify all chars are hex + for _, c := range result { + if (c < '0' || c > '9') && (c < 'a' || c > 'f') { + t.Errorf("HashField(%q) returned %q which contains non-hex char %q", tt.input, result, string(c)) + } + } + } + + // Determinism + h1 := HashField("test") + h2 := HashField("test") + if h1 != h2 { + t.Errorf("HashField is not deterministic: %q != %q", h1, h2) + } + + // Different inputs produce different outputs + h3 := HashField("different") + if h1 == h3 { + t.Errorf("HashField produced same hash for different inputs: %q", h1) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/sharding/parser.go b/staging/src/k8s.io/apimachinery/pkg/sharding/parser.go new file mode 100644 index 00000000000..c11b8681a21 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/sharding/parser.go @@ -0,0 +1,42 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sharding + +import "fmt" + +// ParseFunc is the signature for shard selector parsers. 
+type ParseFunc func(string) (Selector, error) + +var registeredParser ParseFunc + +// RegisterParser registers the shard selector parser implementation. +// This is called by k8s.io/apiserver to inject the CEL-based parser. +// +// This registration mechanism is an alpha-level internal API and is subject +// to change or removal in future releases. Do not depend on it. +func RegisterParser(p ParseFunc) { + registeredParser = p +} + +// Parse parses a shard selector string into a Selector. +// A parser must be registered via RegisterParser before calling Parse. +func Parse(s string) (Selector, error) { + if registeredParser == nil { + return nil, fmt.Errorf("no shard selector parser registered") + } + return registeredParser(s) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/sharding/parser_test.go b/staging/src/k8s.io/apimachinery/pkg/sharding/parser_test.go new file mode 100644 index 00000000000..bc90ea2b941 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/sharding/parser_test.go @@ -0,0 +1,59 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sharding + +import ( + "testing" +) + +func TestParseWithoutRegisteredParser(t *testing.T) { + old := registeredParser + defer func() { registeredParser = old }() + + registeredParser = nil + _, err := Parse("anything") + if err == nil { + t.Error("Parse() without registered parser should return error") + } +} + +func TestRegisterParserAndParse(t *testing.T) { + old := registeredParser + defer func() { registeredParser = old }() + + called := false + RegisterParser(func(s string) (Selector, error) { + called = true + return NewSelector(ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: "0x0", + End: "0x8", + }), nil + }) + + sel, err := Parse("test") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !called { + t.Error("registered parser was not called") + } + reqs := sel.Requirements() + if len(reqs) != 1 || reqs[0].Key != "object.metadata.uid" { + t.Errorf("unexpected requirements: %+v", reqs) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/sharding/selector.go b/staging/src/k8s.io/apimachinery/pkg/sharding/selector.go new file mode 100644 index 00000000000..a454f263f16 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/sharding/selector.go @@ -0,0 +1,128 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sharding + +import ( + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/runtime" +) + +// Selector represents a shard selector that can match objects based on +// hash ranges of their metadata fields. It follows the labels.Selector +// pattern from the Kubernetes API. +type Selector interface { + // Matches returns true if the given object matches the shard selector. + Matches(obj runtime.Object) (bool, error) + + // Empty returns true if the selector matches everything (no filtering). + Empty() bool + + // String returns the wire-format string representation that can be + // round-tripped through Parse. + String() string + + // Requirements returns the list of shard range requirements. + Requirements() []ShardRangeRequirement + + // DeepCopySelector returns a deep copy of the selector. + DeepCopySelector() Selector +} + +// Everything returns a selector that matches all objects. +func Everything() Selector { + return &everythingSelector{} +} + +type everythingSelector struct{} + +func (s *everythingSelector) Matches(_ runtime.Object) (bool, error) { return true, nil } +func (s *everythingSelector) Empty() bool { return true } +func (s *everythingSelector) String() string { return "" } +func (s *everythingSelector) Requirements() []ShardRangeRequirement { return nil } +func (s *everythingSelector) DeepCopySelector() Selector { return &everythingSelector{} } + +// shardSelector implements Selector with one or more shard range requirements. +type shardSelector struct { + requirements []ShardRangeRequirement +} + +func (s *shardSelector) Matches(obj runtime.Object) (bool, error) { + if len(s.requirements) == 0 { + return true, nil + } + // All requirements share the same key (enforced by the parser), + // so resolve the field value and compute the hash once. 
// hexLess reports whether the hexadecimal number a is numerically less than b.
//
// Both arguments are lowercase hex strings with an optional "0x" prefix. They
// may differ in width: HashField always produces 16 zero-padded digits, while
// range bounds can be written in any width (for example "0x0", "0x8" or the
// 17-digit upper bound "0x10000000000000000"). Comparing raw string lengths
// would therefore rank "0x0000000000000005" above "0x8", so the prefix and all
// leading zeros are stripped before comparing.
func hexLess(a, b string) bool {
	a, b = significantHexDigits(a), significantHexDigits(b)
	// Without leading zeros, more digits always means a larger number.
	if len(a) != len(b) {
		return len(a) < len(b)
	}
	// Equal digit counts: lowercase hex digits sort in numeric order
	// ('0'-'9' precede 'a'-'f'), so a byte-wise comparison is numeric.
	return a < b
}

// significantHexDigits returns s without its optional "0x" prefix and without
// leading zeros. The value zero is returned as the empty string.
func significantHexDigits(s string) string {
	return strings.TrimLeft(strings.TrimPrefix(s, "0x"), "0")
}
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sharding + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" +) + +// testObject is a minimal runtime.Object for testing. +type testObject struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` +} + +func (t *testObject) DeepCopyObject() runtime.Object { + return &testObject{ + TypeMeta: t.TypeMeta, + ObjectMeta: *t.ObjectMeta.DeepCopy(), + } +} + +func TestSelectorMatches(t *testing.T) { + obj := &testObject{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("test-uid-123"), + Name: "test-name", + Namespace: "test-namespace", + }, + } + + hash := "0x" + HashField("test-uid-123") + + tests := []struct { + name string + selector Selector + wantMatch bool + }{ + { + name: "everything matches", + selector: Everything(), + wantMatch: true, + }, + { + name: "empty selector matches", + selector: NewSelector(), + wantMatch: true, + }, + { + name: "full range matches", + selector: NewSelector(ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: "0x0000000000000000", + End: "0x10000000000000000", + }), + wantMatch: true, + }, + { + name: "hash in specific range", + selector: NewSelector(ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: hash, + End: hash + "f", // hash + "f" is always > hash + }), + wantMatch: true, + }, + { + name: "hash below start", + selector: NewSelector(ShardRangeRequirement{ 
+ Key: "object.metadata.uid", + Start: "0xffffffffffffffff", + End: "0x10000000000000000", + }), + wantMatch: hash >= "0xffffffffffffffff", + }, + { + name: "hash at or above end", + selector: NewSelector(ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: "0x0000000000000000", + End: "0x0000000000000001", + }), + wantMatch: hash < "0x0000000000000001", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + matched, err := tt.selector.Matches(obj) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if matched != tt.wantMatch { + t.Errorf("Matches() = %v, want %v (hash=%s)", matched, tt.wantMatch, hash) + } + }) + } +} + +func TestSelectorEmpty(t *testing.T) { + if !Everything().Empty() { + t.Error("Everything() should be empty") + } + if !NewSelector().Empty() { + t.Error("NewSelector() with no args should be empty") + } + + sel := NewSelector(ShardRangeRequirement{Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"}) + if sel.Empty() { + t.Error("selector with requirement should not be empty") + } +} + +func TestSelectorString(t *testing.T) { + sel := NewSelector(ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: "0x00", + End: "0x80", + }) + expected := "shardRange(object.metadata.uid, '0x00', '0x80')" + if sel.String() != expected { + t.Errorf("String() = %q, want %q", sel.String(), expected) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/sharding/types.go b/staging/src/k8s.io/apimachinery/pkg/sharding/types.go new file mode 100644 index 00000000000..4684fc497fb --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/sharding/types.go @@ -0,0 +1,32 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sharding + +// ShardRangeRequirement represents a single shard range requirement. +// It specifies a field path to hash and a hex range [Start, End) for filtering. +// The hash space is FNV-1a 64-bit: [0x0000000000000000, 0x10000000000000000). +// Both Start and End must be specified (no empty/unbounded values). +type ShardRangeRequirement struct { + // Key is the field path, e.g. "object.metadata.uid" + Key string + // Start is the inclusive lower bound as a 0x-prefixed lowercase hex string. + // Minimum value is "0x0000000000000000". + Start string + // End is the exclusive upper bound as a 0x-prefixed lowercase hex string. + // Maximum value is "0x10000000000000000" (2^64). + End string +} diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index d5236087b77..dbe776e8af4 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -227,6 +227,14 @@ const ( // This prevents watch cache from being starved by other watches. SeparateCacheWatchRPC featuregate.Feature = "SeparateCacheWatchRPC" + // owner: @jefftree + // kep: https://kep.k8s.io/5866 + // + // Enables the shard selector parameter on List/Watch requests, + // allowing clients to receive a filtered subset of objects based + // on hash ranges of metadata fields (e.g. UID). + ShardedListAndWatch featuregate.Feature = "ShardedListAndWatch" + // owner: @serathius // // Enables APF to use size of objects for estimating request cost. 
@@ -465,6 +473,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.36"), Default: false, LockToDefault: true, PreRelease: featuregate.Deprecated}, }, + ShardedListAndWatch: { + {Version: version.MustParse("1.36"), Default: false, PreRelease: featuregate.Alpha}, + }, + SizeBasedListCostEstimate: { {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 7e7e67536d7..2700c3a6e5b 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -53,6 +53,8 @@ import ( flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/client-go/tools/cache" + _ "k8s.io/apiserver/pkg/sharding" // registers CEL-based shard selector parser + "k8s.io/klog/v2" ) @@ -393,6 +395,9 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, } p.Limit = options.Limit p.Continue = options.Continue + if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) { + p.ShardSelector = options.ShardSelector + } list := e.NewListFunc() qualifiedResource := e.qualifiedResourceFromContext(ctx) storageOpts := storage.ListOptions{ @@ -1429,13 +1434,21 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti if options != nil { resourceVersion = options.ResourceVersion predicate.AllowWatchBookmarks = options.AllowWatchBookmarks + if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) { + predicate.ShardSelector = options.ShardSelector + } } return e.WatchPredicate(ctx, predicate, resourceVersion, options.SendInitialEvents) } // WatchPredicate starts a watch for the items that matches. 
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string, sendInitialEvents *bool) (watch.Interface, error) { - storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true, SendInitialEvents: sendInitialEvents} + storageOpts := storage.ListOptions{ + ResourceVersion: resourceVersion, + Predicate: p, + Recursive: true, + SendInitialEvents: sendInitialEvents, + } // if we're not already namespace-scoped, see if the field selector narrows the scope of the watch if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 { diff --git a/staging/src/k8s.io/apiserver/pkg/sharding/parser.go b/staging/src/k8s.io/apiserver/pkg/sharding/parser.go new file mode 100644 index 00000000000..a7ac0dda309 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/sharding/parser.go @@ -0,0 +1,219 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sharding + +import ( + "fmt" + "strings" + + celparser "github.com/google/cel-go/parser" + + "github.com/google/cel-go/common" + "github.com/google/cel-go/common/ast" + "github.com/google/cel-go/common/operators" + + apisharding "k8s.io/apimachinery/pkg/sharding" +) + +// Parse parses a CEL-based shard selector expression into a Selector. 
+// +// The expression format is: +// +// shardRange(object.metadata.uid, '0x0', '0x8000000000000000') +// shardRange(object.metadata.uid, '0x0', '0x8000000000000000') || shardRange(...) +// +// Only the shardRange() function and || operator are permitted. The CEL +// expression is parsed but never evaluated — the AST is walked to extract +// shard range requirements. +func Parse(expr string) (apisharding.Selector, error) { + expr = strings.TrimSpace(expr) + if expr == "" { + return nil, fmt.Errorf("empty shard selector is not allowed; omit the parameter for unfiltered lists") + } + + p, err := celparser.NewParser(celparser.Macros( /* no macros */ )) + if err != nil { + return nil, fmt.Errorf("failed to create CEL parser: %w", err) + } + + parsed, errs := p.Parse(common.NewTextSource(expr)) + if errs != nil && len(errs.GetErrors()) > 0 { + return nil, fmt.Errorf("CEL parse error: %s", errs.GetErrors()[0].Message) + } + + reqs, err := walkExpr(parsed.Expr()) + if err != nil { + return nil, err + } + + // Validate that all requirements use the same field key. + for i := 1; i < len(reqs); i++ { + if reqs[i].Key != reqs[0].Key { + return nil, fmt.Errorf("all shard ranges must use the same field, got %q and %q", reqs[0].Key, reqs[i].Key) + } + } + + return apisharding.NewSelector(reqs...), nil +} + +// walkExpr recursively walks the CEL AST and extracts ShardRangeRequirement values. +// The root expression must be either a shardRange() call or a chain of || operators +// combining shardRange() calls. 
+func walkExpr(e ast.Expr) ([]apisharding.ShardRangeRequirement, error) { + if e.Kind() == ast.CallKind { + call := e.AsCall() + fn := call.FunctionName() + + if fn == operators.LogicalOr { + // _||_ operator: recurse into both sides + args := call.Args() + if len(args) != 2 { + return nil, fmt.Errorf("|| operator requires exactly 2 arguments") + } + left, err := walkExpr(args[0]) + if err != nil { + return nil, err + } + right, err := walkExpr(args[1]) + if err != nil { + return nil, err + } + return append(left, right...), nil + } + + if fn == "shardRange" { + req, err := parseShardRangeCall(call) + if err != nil { + return nil, err + } + return []apisharding.ShardRangeRequirement{req}, nil + } + + return nil, fmt.Errorf("unsupported function %q; only shardRange() and || are allowed", fn) + } + + return nil, fmt.Errorf("unexpected expression kind %v; expected shardRange() call or || operator", e.Kind()) +} + +// parseShardRangeCall extracts a ShardRangeRequirement from a shardRange(field, start, end) call. 
+func parseShardRangeCall(call ast.CallExpr) (apisharding.ShardRangeRequirement, error) { + args := call.Args() + if len(args) != 3 { + return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() requires exactly 3 arguments, got %d", len(args)) + } + + // Arg 0: field path (select chain like object.metadata.uid) + fieldPath, err := extractFieldPath(args[0]) + if err != nil { + return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() first argument: %w", err) + } + + // Validate field path + switch fieldPath { + case "object.metadata.uid", "object.metadata.namespace": + // ok + default: + return apisharding.ShardRangeRequirement{}, fmt.Errorf("unsupported field path %q; supported: object.metadata.uid, object.metadata.namespace", fieldPath) + } + + // Arg 1: hex start (string literal) + hexStart, err := extractHexLiteral(args[1], "hexStart") + if err != nil { + return apisharding.ShardRangeRequirement{}, err + } + + // Arg 2: hex end (string literal) + hexEnd, err := extractHexLiteral(args[2], "hexEnd") + if err != nil { + return apisharding.ShardRangeRequirement{}, err + } + + // Validate start < end. Hex values are canonical (16 or 17 digits). + // A 17-digit value (only 0x10000000000000000) is always greater than a 16-digit value, + // so compare lengths first, then lexicographically within the same length. + if len(hexStart) > len(hexEnd) || (len(hexStart) == len(hexEnd) && hexStart >= hexEnd) { + return apisharding.ShardRangeRequirement{}, fmt.Errorf("shard range start %s must be less than end %s", hexStart, hexEnd) + } + + return apisharding.ShardRangeRequirement{ + Key: fieldPath, + Start: hexStart, + End: hexEnd, + }, nil +} + +// extractFieldPath walks a select chain (object.metadata.uid) and returns the dot-joined path. 
+func extractFieldPath(e ast.Expr) (string, error) { + switch e.Kind() { + case ast.IdentKind: + return e.AsIdent(), nil + case ast.SelectKind: + sel := e.AsSelect() + operand, err := extractFieldPath(sel.Operand()) + if err != nil { + return "", err + } + return operand + "." + sel.FieldName(), nil + default: + return "", fmt.Errorf("expected field path (e.g. object.metadata.uid), got expression kind %v", e.Kind()) + } +} + +// extractHexLiteral extracts a hex string from a CEL string literal. +// The literal must be a single-quoted string like '0xff' with a 0x prefix. +func extractHexLiteral(e ast.Expr, name string) (string, error) { + if e.Kind() != ast.LiteralKind { + return "", fmt.Errorf("%s must be a string literal (e.g. '0xff'), got expression kind %v", name, e.Kind()) + } + + val := e.AsLiteral() + s, ok := val.Value().(string) + if !ok { + return "", fmt.Errorf("%s must be a string literal, got %T", name, val.Value()) + } + + if !strings.HasPrefix(s, "0x") { + return "", fmt.Errorf("%s must have '0x' prefix, got %q", name, s) + } + + hex := s[2:] + if hex == "" { + return "", fmt.Errorf("%s: hex value is required after '0x'", name) + } + if len(hex) > 17 { + return "", fmt.Errorf("%s: hex value too long (%d chars, max 17): %q", name, len(hex), hex) + } + + for _, c := range hex { + if (c < '0' || c > '9') && (c < 'a' || c > 'f') { + return "", fmt.Errorf("%s: invalid hex character %q in %q", name, string(c), s) + } + } + + // Require exactly 16 hex digits, except for the special case + // "0x10000000000000000" (2^64) which is the exclusive upper bound. 
+ if len(hex) == 17 { + if s != "0x10000000000000000" { + return "", fmt.Errorf("%s: 17-digit hex value must be '0x10000000000000000' (2^64), got %q", name, s) + } + } else if len(hex) != 16 { + padded := "0x" + strings.Repeat("0", 16-len(hex)) + hex + return "", fmt.Errorf("%s must be a 0x-prefixed 16-digit hex value, got %q (did you mean %q?)", name, s, padded) + } + + return s, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/sharding/parser_test.go b/staging/src/k8s.io/apiserver/pkg/sharding/parser_test.go new file mode 100644 index 00000000000..bf412a45bc1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/sharding/parser_test.go @@ -0,0 +1,237 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sharding + +import ( + "testing" + + apisharding "k8s.io/apimachinery/pkg/sharding" +) + +func TestParse(t *testing.T) { + tests := []struct { + name string + input string + wantErr bool + wantReq []apisharding.ShardRangeRequirement + }{ + { + name: "empty string", + input: "", + wantErr: true, + }, + { + name: "single requirement with uid", + input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')", + wantReq: []apisharding.ShardRangeRequirement{ + {Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"}, + }, + }, + { + name: "full range with 2^64 end", + input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x10000000000000000')", + wantReq: []apisharding.ShardRangeRequirement{ + {Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x10000000000000000"}, + }, + }, + { + name: "namespace field", + input: "shardRange(object.metadata.namespace, '0x00000000000000aa', '0x00000000000000ff')", + wantReq: []apisharding.ShardRangeRequirement{ + {Key: "object.metadata.namespace", Start: "0x00000000000000aa", End: "0x00000000000000ff"}, + }, + }, + { + name: "two requirements with ||", + input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') || shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')", + wantReq: []apisharding.ShardRangeRequirement{ + {Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"}, + {Key: "object.metadata.uid", Start: "0x8000000000000000", End: "0x10000000000000000"}, + }, + }, + { + name: "three requirements with ||", + input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x0000000000000005') || shardRange(object.metadata.uid, '0x0000000000000005', '0x000000000000000a') || shardRange(object.metadata.uid, '0x000000000000000a', '0x000000000000000f')", + wantReq: []apisharding.ShardRangeRequirement{ + {Key: "object.metadata.uid", Start: "0x0000000000000000", End: 
"0x0000000000000005"}, + {Key: "object.metadata.uid", Start: "0x0000000000000005", End: "0x000000000000000a"}, + {Key: "object.metadata.uid", Start: "0x000000000000000a", End: "0x000000000000000f"}, + }, + }, + { + name: "no spaces", + input: "shardRange(object.metadata.uid,'0x0000000000000000','0x8000000000000000')", + wantReq: []apisharding.ShardRangeRequirement{ + {Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"}, + }, + }, + { + name: "extra whitespace", + input: " shardRange( object.metadata.uid , '0x0000000000000000' , '0x8000000000000000' ) ", + wantReq: []apisharding.ShardRangeRequirement{ + {Key: "object.metadata.uid", Start: "0x0000000000000000", End: "0x8000000000000000"}, + }, + }, + // Error cases + { + name: "missing 0x prefix on start", + input: "shardRange(object.metadata.uid, '0000000000000000', '0x8000000000000000')", + wantErr: true, + }, + { + name: "missing 0x prefix on end", + input: "shardRange(object.metadata.uid, '0x0000000000000000', '8000000000000000')", + wantErr: true, + }, + { + name: "empty after 0x prefix", + input: "shardRange(object.metadata.uid, '0x', '0x8000000000000000')", + wantErr: true, + }, + { + name: "name field unsupported", + input: "shardRange(object.metadata.name, '0x00', '0x80')", + wantErr: true, + }, + { + name: "unsupported field", + input: "shardRange(object.metadata.labels, '0x00', '0x80')", + wantErr: true, + }, + { + name: "unknown function", + input: "invalidFunc(object.metadata.uid, '0x00', '0x80')", + wantErr: true, + }, + { + name: "hex too long (18 chars)", + input: "shardRange(object.metadata.uid, '0x000000000000000000', '0x80')", + wantErr: true, + }, + { + name: "invalid hex char", + input: "shardRange(object.metadata.uid, '0x0g', '0x80')", + wantErr: true, + }, + { + name: "&& operator not allowed", + input: "shardRange(object.metadata.uid, '0x0', '0x8') && shardRange(object.metadata.uid, '0x8', '0xf')", + wantErr: true, + }, + { + name: "integer literal instead of 
string", + input: "shardRange(object.metadata.uid, 0, 8)", + wantErr: true, + }, + { + name: "comparison operator", + input: "object.metadata.uid > '0x0'", + wantErr: true, + }, + { + name: "wrong number of arguments", + input: "shardRange(object.metadata.uid, '0x0')", + wantErr: true, + }, + { + name: "nested function call", + input: "shardRange(object.metadata.uid, hex(0), '0x8')", + wantErr: true, + }, + { + name: "mixed field keys", + input: "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') || shardRange(object.metadata.namespace, '0x8000000000000000', '0x10000000000000000')", + wantErr: true, + }, + { + name: "short hex rejected", + input: "shardRange(object.metadata.uid, '0x0', '0x8000000000000000')", + wantErr: true, + }, + { + name: "short hex rejected on end", + input: "shardRange(object.metadata.uid, '0x0000000000000000', '0xff')", + wantErr: true, + }, + { + name: "start equals end", + input: "shardRange(object.metadata.uid, '0x8000000000000000', '0x8000000000000000')", + wantErr: true, + }, + { + name: "start greater than end", + input: "shardRange(object.metadata.uid, '0xffffffffffffffff', '0x0000000000000000')", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sel, err := Parse(tt.input) + if tt.wantErr { + if err == nil { + t.Errorf("Parse(%q) expected error, got nil", tt.input) + } + return + } + if err != nil { + t.Fatalf("Parse(%q) unexpected error: %v", tt.input, err) + } + + reqs := sel.Requirements() + if len(reqs) != len(tt.wantReq) { + t.Fatalf("Parse(%q) got %d requirements, want %d", tt.input, len(reqs), len(tt.wantReq)) + } + for i, req := range reqs { + if req != tt.wantReq[i] { + t.Errorf("Parse(%q) requirement[%d] = %+v, want %+v", tt.input, i, req, tt.wantReq[i]) + } + } + }) + } +} + +func TestParseRoundTrip(t *testing.T) { + tests := []string{ + "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000')", + "shardRange(object.metadata.uid, 
'0x8000000000000000', '0x10000000000000000')", + "shardRange(object.metadata.uid, '0x0000000000000000', '0x10000000000000000')", + "shardRange(object.metadata.namespace, '0x00000000000000aa', '0x00000000000000ff')", + "shardRange(object.metadata.uid, '0x0000000000000000', '0x8000000000000000') || shardRange(object.metadata.uid, '0x8000000000000000', '0x10000000000000000')", + } + + for _, input := range tests { + t.Run(input, func(t *testing.T) { + sel, err := Parse(input) + if err != nil { + t.Fatalf("Parse(%q) error: %v", input, err) + } + output := sel.String() + if output != input { + t.Errorf("Parse(%q).String() = %q, want %q", input, output, input) + } + // Round-trip — parse(string(parse(input))) should be stable. + sel2, err := Parse(output) + if err != nil { + t.Fatalf("Parse(%q) (round-trip) error: %v", output, err) + } + if sel.String() != sel2.String() { + t.Errorf("round-trip unstable: %q -> %q -> %q", input, output, sel2.String()) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/sharding/register.go b/staging/src/k8s.io/apiserver/pkg/sharding/register.go new file mode 100644 index 00000000000..31c47b5ab7d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/sharding/register.go @@ -0,0 +1,23 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package sharding + +import apisharding "k8s.io/apimachinery/pkg/sharding" + +func init() { + apisharding.RegisterParser(Parse) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go index 808491dbd60..8711596a3cf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go @@ -371,10 +371,10 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event return e } - curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) + curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.Object) oldObjPasses := false if event.PrevObject != nil { - oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields) + oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObject) } if !curObjPasses && !oldObjPasses { // Watcher is not interested in that object. 
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go index 224391835e1..7d68dfbc61e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -47,7 +48,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { var lock sync.RWMutex var w *cacheWatcher count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true } forget := func(drainWatcher bool) { lock.Lock() defer lock.Unlock() @@ -77,7 +78,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } func TestCacheWatcherHandlesFiltering(t *testing.T) { - filter := func(_ string, _ labels.Set, field fields.Set) bool { + filter := func(_ string, _ labels.Set, field fields.Set, _ runtime.Object) bool { return field["spec.nodeName"] == "host" } forget := func(bool) {} @@ -209,7 +210,7 @@ TestCase: func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { var w *cacheWatcher done := make(chan struct{}) - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true } forget := func(drainWatcher bool) { w.setDrainInputBufferLocked(drainWatcher) w.stopLocked() @@ -303,7 +304,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) { t.Fatal(err) } - filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true } + filter := func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool { return true } forget 
:= func(_ bool) {} deadline := time.Now().Add(time.Minute) w := newCacheWatcher(numObjects+1, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") @@ -343,7 +344,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) { } func TestTimeBucketWatchersBasic(t *testing.T) { - filter := func(_ string, _ labels.Set, _ fields.Set) bool { + filter := func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool { return true } forget := func(bool) {} @@ -404,7 +405,7 @@ func TestCacheWatcherDraining(t *testing.T) { var lock sync.RWMutex var w *cacheWatcher count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true } forget := func(drainWatcher bool) { lock.Lock() defer lock.Unlock() @@ -445,7 +446,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { var lock sync.RWMutex var w *cacheWatcher count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true } forget := func(drainWatcher bool) { lock.Lock() defer lock.Unlock() @@ -479,7 +480,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T var lock sync.RWMutex var w *cacheWatcher count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, runtime.Object) bool { return true } forget := func(drainWatcher bool) { lock.Lock() defer lock.Unlock() @@ -543,7 +544,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) { watchInitializationSignal := utilflowcontrol.NewInitializationSignal() ctx := utilflowcontrol.WithInitializationSignal(context.Background(), watchInitializationSignal) count := 0 - filter := func(string, labels.Set, fields.Set) bool { return true } + filter := func(string, labels.Set, fields.Set, 
runtime.Object) bool { return true } forget := func(drainWatcher bool) { lock.Lock() defer lock.Unlock() @@ -606,7 +607,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) { func TestBookmarkAfterResourceVersionWatchers(t *testing.T) { newWatcher := func(id string, deadline time.Time) *cacheWatcher { - w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id) + w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id) w.setBookmarkAfterResourceVersion(10) return w } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index c2993872371..1923984f22e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" @@ -247,7 +248,7 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchersThreadUnsafe() [][]*cache return expiredWatchers } -type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool +type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set, obj runtime.Object) bool type indexedTriggerFunc struct { indexName string @@ -602,7 +603,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // to compute watcher.forget function (which has to happen under lock). 
watcher := newCacheWatcher( chanSize, - filterWithAttrsAndPrefixFunction(key, pred), + filterWithAttrsAndPrefixFunction(key, pred, c.groupResource), emptyFunc, c.versioner, deadline, @@ -676,6 +677,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return newImmediateCloseWatcher(), nil } + isSharded := pred.ShardSelector != nil && !pred.ShardSelector.Empty() + if isSharded { + metrics.RecordShardedWatchStarted(c.groupResource) + originalForget := watcher.forget + watcher.forget = func(drainWatcher bool) { + metrics.RecordShardedWatchStopped(c.groupResource) + originalForget(drainWatcher) + } + } + go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion) return watcher, nil } @@ -794,7 +805,11 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio if !ok { return fmt.Errorf("non *store.Element returned from storage: %v", obj) } - if opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) { + shardMatch, err := opts.Predicate.MatchesSharding(elem.Object) + if err != nil { + return fmt.Errorf("shard matching failed: %w", err) + } + if shardMatch && opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) { selectedObjects = append(selectedObjects, elem.Object) lastSelectedObjectKey = elem.Key } @@ -825,6 +840,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return err } } + opts.Predicate.SetShardInfoOnList(listObj) metrics.RecordListCacheMetrics(c.groupResource, indexUsed, len(resp.Items), listVal.Len()) return nil } @@ -1203,11 +1219,23 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, } } -func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc { - filterFunc := func(objKey string, label labels.Set, field fields.Set) bool { +func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate, groupResource schema.GroupResource) 
filterWithAttrsFunc { + isSharded := p.ShardSelector != nil && !p.ShardSelector.Empty() + filterFunc := func(objKey string, label labels.Set, field fields.Set, obj runtime.Object) bool { if !hasPathPrefix(objKey, key) { return false } + matches, err := p.MatchesSharding(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("shard matching failed for %v: %w", groupResource, err)) + return false + } + if !matches { + if isSharded { + metrics.RecordWatchFilteredEvent(groupResource) + } + return false + } return p.MatchesObjectAttributes(label, field) } return filterFunc diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index e98d6fe2cdf..fa8ea1011b0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -41,6 +41,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/sharding" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/version" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -2648,7 +2650,7 @@ func TestForgetWatcher(t *testing.T) { } w := newCacheWatcher( 0, - func(_ string, _ labels.Set, _ fields.Set) bool { return true }, + func(_ string, _ labels.Set, _ fields.Set, _ runtime.Object) bool { return true }, nil, storage.APIObjectVersioner{}, testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute), @@ -3356,3 +3358,230 @@ func (f *fakeSnapshotter) RemoveLess(rv uint64) {} func (f *fakeSnapshotter) Len() int { return 0 } + +// --- Sharding unit tests for filterWithAttrsAndPrefixFunction --- + +// selectorIncludingUID builds a shard selector whose range contains the hash of uid. 
+func selectorIncludingUID(uid string) sharding.Selector { + hash := "0x" + sharding.HashField(uid) + return sharding.NewSelector(sharding.ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: hash, + End: incrementHex(hash), + }) +} + +// selectorExcludingUID builds a shard selector whose range does NOT contain the hash of uid. +func selectorExcludingUID(uid string) sharding.Selector { + hash := "0x" + sharding.HashField(uid) + return sharding.NewSelector(sharding.ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: "0x0000000000000000", + End: hash, + }) +} + +func incrementHex(hex string) string { + b := []byte(hex) + for i := len(b) - 1; i >= 2; i-- { + if b[i] < '9' { + b[i]++ + return string(b) + } else if b[i] == '9' { + b[i] = 'a' + return string(b) + } else if b[i] < 'f' { + b[i]++ + return string(b) + } + b[i] = '0' + } + return "0x10000000000000000" +} + +func makeExamplePod(name, namespace, uid string, podLabels map[string]string) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + UID: types.UID(uid), + Labels: podLabels, + }, + } +} + +func TestFilterWithAttrsAndPrefixFunction_ShardMatchAndLabels(t *testing.T) { + uid := "test-uid-1" + pod := makeExamplePod("pod1", "default", uid, map[string]string{"app": "web"}) + gr := schema.GroupResource{Resource: "pods"} + + pred := storage.SelectionPredicate{ + Label: labels.SelectorFromSet(labels.Set{"app": "web"}), + Field: fields.Everything(), + ShardSelector: selectorIncludingUID(uid), + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if !filter("/pods/default/pod1", labels.Set{"app": "web"}, fields.Set{"metadata.name": "pod1", "metadata.namespace": "default"}, pod) { + t.Error("expected filter to match: shard matches and labels match") + } +} + +func TestFilterWithAttrsAndPrefixFunction_ShardMatchButLabelMismatch(t *testing.T) { + uid := "test-uid-2" + pod := 
makeExamplePod("pod2", "default", uid, map[string]string{"app": "api"}) + gr := schema.GroupResource{Resource: "pods"} + + pred := storage.SelectionPredicate{ + Label: labels.SelectorFromSet(labels.Set{"app": "web"}), + Field: fields.Everything(), + ShardSelector: selectorIncludingUID(uid), + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if filter("/pods/default/pod2", labels.Set{"app": "api"}, fields.Set{"metadata.name": "pod2", "metadata.namespace": "default"}, pod) { + t.Error("expected filter to reject: shard matches but labels don't") + } +} + +func TestFilterWithAttrsAndPrefixFunction_ShardMismatch(t *testing.T) { + uid := "test-uid-3" + pod := makeExamplePod("pod3", "default", uid, map[string]string{"app": "web"}) + gr := schema.GroupResource{Resource: "pods"} + + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + ShardSelector: selectorExcludingUID(uid), + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if filter("/pods/default/pod3", labels.Set{"app": "web"}, fields.Set{"metadata.name": "pod3", "metadata.namespace": "default"}, pod) { + t.Error("expected filter to reject: shard does not match") + } +} + +func TestFilterWithAttrsAndPrefixFunction_NilShardSelector(t *testing.T) { + pod := makeExamplePod("pod4", "default", "uid-4", map[string]string{"app": "web"}) + gr := schema.GroupResource{Resource: "pods"} + + pred := storage.SelectionPredicate{ + Label: labels.SelectorFromSet(labels.Set{"app": "web"}), + Field: fields.Everything(), + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if !filter("/pods/default/pod4", labels.Set{"app": "web"}, fields.Set{"metadata.name": "pod4", "metadata.namespace": "default"}, pod) { + t.Error("expected filter to match: nil ShardSelector should pass everything 
through") + } +} + +func TestFilterWithAttrsAndPrefixFunction_EverythingSelector(t *testing.T) { + pod := makeExamplePod("pod5", "default", "uid-5", nil) + gr := schema.GroupResource{Resource: "pods"} + + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + ShardSelector: sharding.Everything(), + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if !filter("/pods/default/pod5", labels.Set{}, fields.Set{"metadata.name": "pod5", "metadata.namespace": "default"}, pod) { + t.Error("expected filter to match: Everything() selector should pass all objects") + } +} + +func TestFilterWithAttrsAndPrefixFunction_WrongPrefix(t *testing.T) { + uid := "test-uid-6" + pod := makeExamplePod("pod6", "default", uid, nil) + gr := schema.GroupResource{Resource: "pods"} + + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + ShardSelector: selectorIncludingUID(uid), + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if filter("/configmaps/default/cm1", nil, nil, pod) { + t.Error("expected filter to reject: key prefix doesn't match") + } +} + +func TestFilterWithAttrsAndPrefixFunction_ShardErrorDropsEvent(t *testing.T) { + obj := &runtime.Unknown{} + gr := schema.GroupResource{Resource: "pods"} + + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + ShardSelector: sharding.NewSelector(sharding.ShardRangeRequirement{ + Key: "object.metadata.uid", + Start: "0x0000000000000000", + End: "0x10000000000000000", + }), + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if filter("/pods/default/pod-err", nil, nil, obj) { + t.Error("expected filter to reject: shard matching should fail on unstructured object") + } +} + +func 
TestFilterWithAttrsAndPrefixFunction_NamespaceSharding(t *testing.T) { + ns := "my-namespace" + pod := makeExamplePod("pod-ns", ns, "some-uid", nil) + + nsHash := "0x" + sharding.HashField(ns) + sel := sharding.NewSelector(sharding.ShardRangeRequirement{ + Key: "object.metadata.namespace", + Start: nsHash, + End: incrementHex(nsHash), + }) + + gr := schema.GroupResource{Resource: "pods"} + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + ShardSelector: sel, + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if !filter("/pods/my-namespace/pod-ns", labels.Set{}, fields.Set{"metadata.name": "pod-ns", "metadata.namespace": ns}, pod) { + t.Error("expected filter to match: namespace-based shard selector should match") + } +} + +func TestFilterWithAttrsAndPrefixFunction_NamespaceShardingMismatch(t *testing.T) { + ns := "other-namespace" + pod := makeExamplePod("pod-ns2", ns, "some-uid-2", nil) + + targetHash := "0x" + sharding.HashField("my-namespace") + sel := sharding.NewSelector(sharding.ShardRangeRequirement{ + Key: "object.metadata.namespace", + Start: targetHash, + End: incrementHex(targetHash), + }) + + gr := schema.GroupResource{Resource: "pods"} + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + ShardSelector: sel, + GetAttrs: storage.DefaultNamespaceScopedAttr, + } + filter := filterWithAttrsAndPrefixFunction("/pods/", pred, gr) + + if filter("/pods/other-namespace/pod-ns2", labels.Set{}, fields.Set{"metadata.name": "pod-ns2", "metadata.namespace": ns}, pod) { + t.Error("expected filter to reject: namespace hash doesn't fall in shard range") + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go index 83941f46fa6..0981d45739f 100644 --- 
a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go @@ -20,6 +20,8 @@ import ( "sync" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" ) @@ -185,6 +187,26 @@ var ( Help: "Counter for status of consistency checks between etcd and watch cache", StabilityLevel: compbasemetrics.ALPHA, }, []string{"group", "resource", "status"}) + + WatchShardsTotal = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Namespace: namespace, + Name: "watch_shards_total", + Help: "Number of active sharded watch connections broken by resource type.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"group", "resource"}, + ) + + WatchFilteredEventsTotal = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Namespace: namespace, + Name: "watch_filtered_events_total", + Help: "Counter of events filtered out by shard selector during watch dispatch, broken by resource type.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"group", "resource"}, + ) ) var registerMetrics sync.Once @@ -208,6 +230,10 @@ func Register() { legacyregistry.MustRegister(WatchCacheReadWait) legacyregistry.MustRegister(ConsistentReadTotal) legacyregistry.MustRegister(StorageConsistencyCheckTotal) + if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) { + legacyregistry.MustRegister(WatchShardsTotal) + legacyregistry.MustRegister(WatchFilteredEventsTotal) + } }) } @@ -225,7 +251,22 @@ func RecordResourceVersion(groupResource schema.GroupResource, resourceVersion u watchCacheResourceVersion.WithLabelValues(groupResource.Group, groupResource.Resource).Set(float64(resourceVersion % 1000000000000000)) } -// RecordsWatchCacheCapacityChange record watchCache capacity resize(increase or decrease) operations. 
+// RecordShardedWatchStarted increments the active sharded watch gauge for the given resource. +func RecordShardedWatchStarted(groupResource schema.GroupResource) { + WatchShardsTotal.WithLabelValues(groupResource.Group, groupResource.Resource).Inc() +} + +// RecordShardedWatchStopped decrements the active sharded watch gauge for the given resource. +func RecordShardedWatchStopped(groupResource schema.GroupResource) { + WatchShardsTotal.WithLabelValues(groupResource.Group, groupResource.Resource).Dec() +} + +// RecordWatchFilteredEvent increments the counter for events filtered by shard selector. +func RecordWatchFilteredEvent(groupResource schema.GroupResource) { + WatchFilteredEventsTotal.WithLabelValues(groupResource.Group, groupResource.Resource).Inc() +} + +// RecordsWatchCacheCapacityChange records watchCache capacity resize(increase or decrease) operations. func RecordsWatchCacheCapacityChange(groupResource schema.GroupResource, old, new int) { WatchCacheCapacity.WithLabelValues(groupResource.Group, groupResource.Resource).Set(float64(new)) if old < new { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 798567d1cd1..e55fdcde875 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -895,7 +895,11 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption if err != nil { return err } - return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount) + if err := s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount); err != nil { + return err + } + opts.Predicate.SetShardInfoOnList(listObj) + return nil } func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (resp kubernetes.ListResponse, err error) { diff --git 
a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index 480b5a89342..419375c5f0b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -20,9 +20,11 @@ import ( "context" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/sharding" "k8s.io/apiserver/pkg/endpoints/request" ) @@ -82,12 +84,17 @@ type SelectionPredicate struct { Limit int64 Continue string AllowWatchBookmarks bool + // ShardSelector is the parsed shard selector for filtering objects by hash range. + ShardSelector sharding.Selector } // Matches returns true if the given object's labels and fields (as // returned by s.GetAttrs) match s.Label and s.Field. An error is // returned if s.GetAttrs fails. func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { + if matched, err := s.MatchesSharding(obj); err != nil || !matched { + return matched, err + } if s.Empty() { return true, nil } @@ -168,6 +175,24 @@ func (s *SelectionPredicate) MatcherIndex(ctx context.Context) []MatchValue { return result } +// MatchesSharding returns true if the given object matches the sharding configuration. +// If ShardSelector is set and non-empty, it delegates to ShardSelector.Matches(). +func (s *SelectionPredicate) MatchesSharding(obj runtime.Object) (bool, error) { + if s.ShardSelector != nil && !s.ShardSelector.Empty() { + return s.ShardSelector.Matches(obj) + } + return true, nil +} + +// SetShardInfoOnList sets shard metadata on the list response if sharding is active. 
+func (s *SelectionPredicate) SetShardInfoOnList(listObj runtime.Object) { + if s.ShardSelector != nil && !s.ShardSelector.Empty() { + if setter, ok := listObj.(metav1.ShardedListInterface); ok { + setter.SetShardInfo(&metav1.ShardInfo{Selector: s.ShardSelector.String()}) + } + } +} + func isNamespaceScopedRequest(ctx context.Context) (string, bool) { re, _ := request.RequestInfoFrom(ctx) if re == nil || len(re.Namespace) == 0 { diff --git a/test/e2e/framework/pod/wait_test.go b/test/e2e/framework/pod/wait_test.go index bc932bd2d0f..6de9600e99b 100644 --- a/test/e2e/framework/pod/wait_test.go +++ b/test/e2e/framework/pod/wait_test.go @@ -160,6 +160,7 @@ The function passed to Eventually returned the following error: ResourceVersion: "", Continue: "", RemainingItemCount: nil, + ShardInfo: nil, }, Status: "Failure", Message: "pods \"no-such-pod\" not found", @@ -191,6 +192,7 @@ The function passed to Eventually returned the following error: ResourceVersion: "", Continue: "", RemainingItemCount: nil, + ShardInfo: nil, }, Status: "Failure", Message: "pods \"no-such-pod\" not found", @@ -222,6 +224,7 @@ The function passed to Eventually returned the following error: ResourceVersion: "", Continue: "", RemainingItemCount: nil, + ShardInfo: nil, }, Status: "Failure", Message: "pods \"no-such-pod\" not found", @@ -248,6 +251,7 @@ The function passed to Eventually returned the following error: ResourceVersion: "", Continue: "", RemainingItemCount: nil, + ShardInfo: nil, }, Status: "Failure", Message: "pods \"no-such-pod\" not found", @@ -279,6 +283,7 @@ The function passed to Eventually returned the following error: ResourceVersion: "", Continue: "", RemainingItemCount: nil, + ShardInfo: nil, }, Status: "Failure", Message: "pods \"no-such-pod\" not found", @@ -305,6 +310,7 @@ The function passed to Eventually returned the following error: ResourceVersion: "", Continue: "", RemainingItemCount: nil, + ShardInfo: nil, }, Status: "Failure", Message: "pods \"no-such-pod\" not 
found", diff --git a/test/integration/apiserver/sharding_test.go b/test/integration/apiserver/sharding_test.go new file mode 100644 index 00000000000..a765db2f390 --- /dev/null +++ b/test/integration/apiserver/sharding_test.go @@ -0,0 +1,315 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "math/big" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/sharding" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/test/integration/framework" +) + +// calculateShardRange divides the 64-bit hash space evenly across shards and +// returns the hex-encoded start (inclusive) and end (exclusive) for the given index. 
+func calculateShardRange(index, total int) (start, end string) { + maxVal := new(big.Int).Lsh(big.NewInt(1), 64) // 2^64 + span := new(big.Int).Div(maxVal, big.NewInt(int64(total))) + + startVal := new(big.Int).Mul(span, big.NewInt(int64(index))) + endVal := new(big.Int).Mul(span, big.NewInt(int64(index+1))) + if index == total-1 { + endVal = maxVal // last shard covers remainder + } + + start = fmt.Sprintf("0x%016x", startVal) + end = fmt.Sprintf("0x%016x", endVal) + return start, end +} + +func shardSelectorString(index, total int) string { + start, end := calculateShardRange(index, total) + return fmt.Sprintf("shardRange(object.metadata.uid, '%s', '%s')", start, end) +} + +// hexLess compares two lowercase hex strings numerically, matching the +// server-side sharding comparison logic. +func hexLess(a, b string) bool { + if len(a) != len(b) { + return len(a) < len(b) + } + return a < b +} + +// objectInShard returns true if the object's UID hash falls within the given shard range. +func objectInShard(uid string, index, total int) bool { + start, end := calculateShardRange(index, total) + hash := "0x" + sharding.HashField(uid) + if hexLess(hash, start) { + return false + } + if !hexLess(hash, end) { + return false + } + return true +} + +func TestShardedList(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, true) + + ctx, client, _, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(client, "shard-list", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + // Create a batch of configmaps. 
+ const numObjects = 20 + created := make([]*v1.ConfigMap, 0, numObjects) + for range numObjects { + cm, err := client.CoreV1().ConfigMaps(ns.Name).Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "shard-test-", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create configmap: %v", err) + } + created = append(created, cm) + } + + const numShards = 3 + allFound := make(map[string]bool) + + for shard := range numShards { + selector := shardSelectorString(shard, numShards) + list, err := client.CoreV1().ConfigMaps(ns.Name).List(ctx, metav1.ListOptions{ + ShardSelector: selector, + }) + if err != nil { + t.Fatalf("shard %d: failed to list: %v", shard, err) + } + + // The response must include shard info with the selector echoed back. + if list.ShardInfo == nil { + t.Errorf("shard %d: expected shardInfo to be set", shard) + } else if list.ShardInfo.Selector != selector { + t.Errorf("shard %d: expected shardInfo.selector=%q, got %q", shard, selector, list.ShardInfo.Selector) + } + + for _, cm := range list.Items { + uid := string(cm.UID) + if !objectInShard(uid, shard, numShards) { + t.Errorf("shard %d: object %s (UID %s) should not be in this shard", shard, cm.Name, uid) + } + if allFound[uid] { + t.Errorf("shard %d: object %s (UID %s) appeared in multiple shards", shard, cm.Name, uid) + } + allFound[uid] = true + } + } + + // Every created object must appear in exactly one shard. + for _, cm := range created { + if !allFound[string(cm.UID)] { + t.Errorf("object %s (UID %s) was not returned by any shard", cm.Name, cm.UID) + } + } +} + +func TestShardedWatch(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, true) + + ctx, client, _, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(client, "shard-watch", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + // Get initial resource version. 
+ list, err := client.CoreV1().ConfigMaps(ns.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("initial list failed: %v", err) + } + rv := list.ResourceVersion + + const numShards = 2 + + // Start a watch per shard. + watchers := make([]watch.Interface, numShards) + for shard := range numShards { + w, err := client.CoreV1().ConfigMaps(ns.Name).Watch(ctx, metav1.ListOptions{ + ResourceVersion: rv, + ShardSelector: shardSelectorString(shard, numShards), + }) + if err != nil { + t.Fatalf("shard %d: failed to start watch: %v", shard, err) + } + defer w.Stop() + watchers[shard] = w + } + + // Create objects and track which shard should see each one. + const numObjects = 10 + expectedShard := make(map[string]int) // UID -> shard index + + for range numObjects { + cm, err := client.CoreV1().ConfigMaps(ns.Name).Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "shard-watch-", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create configmap: %v", err) + } + for shard := range numShards { + if objectInShard(string(cm.UID), shard, numShards) { + expectedShard[string(cm.UID)] = shard + break + } + } + } + + // Collect events from each watcher. + received := make([]map[string]bool, numShards) + for i := range received { + received[i] = make(map[string]bool) + } + + for shard := range numShards { + collectEvents(t, watchers[shard], received[shard], expectedShard) + } + + // Verify every object was seen by exactly the right shard. 
+ for uid, expectedIdx := range expectedShard { + if !received[expectedIdx][uid] { + t.Errorf("UID %s: expected in shard %d but not received", uid, expectedIdx) + } + for other := range numShards { + if other != expectedIdx && received[other][uid] { + t.Errorf("UID %s: received in shard %d but expected only in shard %d", uid, other, expectedIdx) + } + } + } +} + +func collectEvents(t *testing.T, w watch.Interface, seen map[string]bool, expected map[string]int) { + t.Helper() + timeout := time.After(30 * time.Second) + remaining := 0 + for range expected { + remaining++ + } + // We only need events for UIDs in our expected set that belong to this watcher. + // But we don't know which watcher this is, so just collect all ADDED events until + // we've seen enough or timed out. + for { + select { + case evt, ok := <-w.ResultChan(): + if !ok { + return + } + if evt.Type == watch.Added { + cm, ok := evt.Object.(*v1.ConfigMap) + if !ok { + continue + } + seen[string(cm.UID)] = true + } + case <-timeout: + return + } + } +} + +func TestShardedListFeatureGateDisabled(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, false) + + ctx, client, _, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(client, "shard-disabled", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + // Create a configmap so the namespace is non-empty. + _, err := client.CoreV1().ConfigMaps(ns.Name).Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "test"}, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create configmap: %v", err) + } + + // List with a shard selector — when the gate is off, the selector should be + // ignored and all objects returned (not sharded). 
+ list, err := client.CoreV1().ConfigMaps(ns.Name).List(ctx, metav1.ListOptions{ + ShardSelector: shardSelectorString(0, 2), + }) + if err != nil { + t.Fatalf("list failed: %v", err) + } + + if list.ShardInfo != nil { + t.Errorf("expected shardInfo=nil when feature gate is disabled") + } + if len(list.Items) != 1 { + t.Errorf("expected 1 item (selector ignored), got %d", len(list.Items)) + } +} + +func TestShardedListComplete(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ShardedListAndWatch, true) + + ctx, client, _, tearDownFn := setup(t) + defer tearDownFn() + + ns := framework.CreateNamespaceOrDie(client, "shard-complete", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + // An empty/everything selector should return all objects without sharded=true. + const numObjects = 5 + for range numObjects { + _, err := client.CoreV1().ConfigMaps(ns.Name).Create(ctx, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "complete-", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create configmap: %v", err) + } + } + + // A single shard covering the full range (shard 0 of 1) has empty start and end, + // which means no selector string — just use a normal list. + list, err := client.CoreV1().ConfigMaps(ns.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("list failed: %v", err) + } + if list.ShardInfo != nil { + t.Errorf("expected shardInfo=nil for unsharded list") + } + if len(list.Items) != numObjects { + t.Errorf("expected %d items, got %d", numObjects, len(list.Items)) + } +}