diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/cbor.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/cbor.go index 118579ec355..e5730e3c503 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/cbor.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/cbor.go @@ -68,8 +68,9 @@ type Serializer interface { var _ Serializer = &serializer{} type options struct { - strict bool - transcode bool + strict bool + transcode bool + streamingCollectionsEncoding bool } type Option func(*options) @@ -92,6 +93,13 @@ func Transcode(s bool) Option { } } +// StreamingCollectionsEncoding is used for testing purposes only. +func StreamingCollectionsEncoding(s bool) Option { + return func(opts *options) { + opts.streamingCollectionsEncoding = s + } +} + type serializer struct { metaFactory metaFactory creater runtime.ObjectCreater @@ -114,6 +122,7 @@ func newSerializer(metaFactory metaFactory, creater runtime.ObjectCreater, typer typer: typer, } s.options.transcode = true + s.options.streamingCollectionsEncoding = true for _, o := range options { o(&s.options) } @@ -147,15 +156,25 @@ func (s *serializer) EncodeNondeterministic(obj runtime.Object, w io.Writer) err } func (s *serializer) encode(mode modes.EncMode, obj runtime.Object, w io.Writer) error { + if _, err := w.Write(selfDescribedCBOR); err != nil { + return err + } + + if s.options.streamingCollectionsEncoding { + ok, err := streamEncodeCollections(obj, w, mode) + if err != nil { + return err + } + if ok { + return nil + } + } + var v interface{} = obj if u, ok := obj.(runtime.Unstructured); ok { v = u.UnstructuredContent() } - if _, err := w.Write(selfDescribedCBOR); err != nil { - return err - } - return mode.MarshalTo(v, w) } diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections.go new file mode 100644 index 00000000000..97804e86741 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections.go @@ -0,0 +1,288 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cbor + +import ( + "encoding/json" + "fmt" + "io" + "maps" + "math/rand" + "slices" + "sort" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/klog/v2" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes" + + "github.com/fxamacker/cbor/v2" +) + +func streamEncodeCollections(obj runtime.Object, w io.Writer, mode modes.EncMode) (bool, error) { + list, ok := obj.(*unstructured.UnstructuredList) + if ok { + return true, streamingEncodeUnstructuredList(w, list, mode) + } + if _, ok := obj.(cbor.Marshaler); ok { + klog.InfoS("is implement cbor.Marshaler", "obj", obj) + return false, nil + } + if _, ok := obj.(json.Marshaler); ok { + klog.InfoS("is implement json.Marshaler", "obj", obj) + return false, nil + } + typeMeta, listMeta, items, err := getListMeta(obj) + if err == nil { + return true, streamingEncodeList(w, typeMeta, listMeta, items, mode) + } + klog.ErrorS(err, "getListMeta err", "obj", obj) + return false, nil +} + +// getListMeta implements list extraction logic for cbor stream serialization. +func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) { + listValue, err := conversion.EnforcePtr(list) + if err != nil { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err + } + listType := listValue.Type() + if listType.NumField() != 3 { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields") + } + // TypeMeta + typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta) + if !ok { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type") + } + if !listType.Field(0).Anonymous { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be embedded`) + } + if jsonTag, jsonTagExists := listType.Field(0).Tag.Lookup("json"); !jsonTagExists { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag`) + } else if jsonTag != "" && jsonTag != ",inline" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be "" or ",inline"`) + } + // ListMeta + listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta) + if !ok { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type") + } + if listType.Field(1).Tag.Get("json") != "metadata,omitempty" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`) + } + // Items + items, err := meta.ExtractList(list) + if err != nil { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err + } + if listType.Field(2).Tag.Get("json") != "items" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`) + } + return typeMeta, listMeta, items, nil +} + +type cborMapEntry struct { + key string + write func() error +} + +func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object, mode modes.EncMode) error { + var entries []cborMapEntry + + if typeMeta.Kind != "" { + entries = append(entries, cborMapEntry{ + key: "kind", + write: func() error { + return encodeKeyValuePair(w, "kind", typeMeta.Kind, mode) + }, + }) + } + entries = append(entries, cborMapEntry{ + key: "items", + write: func() error { + if err := mode.MarshalTo("items", w); err != nil { + return err + } + if items == nil { + _, err := w.Write([]byte{0xf6}) // CBOR null + return err + } + if err := writeArrayHead(w, len(items)); err != nil { + return err + } + for _, item := range items { + if err := mode.MarshalTo(item, w); err != nil { + return err + } + } + return nil + }, + }) + entries = append(entries, cborMapEntry{ + key: "metadata", + write: func() error { + return encodeKeyValuePair(w, "metadata", listMeta, mode) + }, + }) + if typeMeta.APIVersion != "" { + entries = append(entries, cborMapEntry{ + key: "apiVersion", + write: func() error { + return encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, mode) + }, + }) + } + + // For nondeterministic modes (SortFastShuffle), randomize the initial offset + // of the encoding for-loop. + start := 0 + if !mode.IsDeterministic() && len(entries) > 0 { + start = rand.Intn(len(entries)) + } + + if err := writeMapHead(w, len(entries)); err != nil { + return err + } + + for i := 0; i < len(entries); i++ { + entry := entries[(start+i)%len(entries)] + if err := entry.write(); err != nil { + return err + } + } + return nil +} + +func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList, mode modes.EncMode) error { + keys := slices.Collect(maps.Keys(list.Object)) + if _, exists := list.Object["items"]; !exists { + keys = append(keys, "items") + } + // Sort keys only for deterministic modes (SortBytewiseLexical): + // shorter lengths come first, then lexicographic by content. + // For nondeterministic modes (SortFastShuffle), randomize the initial offset + // of the encoding for-loop (essentially what SortFastShuffle does for structs). + start := 0 + if mode.IsDeterministic() { + sort.Slice(keys, func(i, j int) bool { + if len(keys[i]) != len(keys[j]) { + return len(keys[i]) < len(keys[j]) + } + return keys[i] < keys[j] + }) + } + + if err := writeMapHead(w, len(keys)); err != nil { + return err + } + + for i := 0; i < len(keys); i++ { + key := keys[(start+i)%len(keys)] + if err := mode.MarshalTo(key, w); err != nil { + return err + } + if key == "items" { + if err := writeArrayHead(w, len(list.Items)); err != nil { + return err + } + for _, item := range list.Items { + if err := mode.MarshalTo(item.Object, w); err != nil { + return err + } + } + } else { + if err := mode.MarshalTo(list.Object[key], w); err != nil { + return err + } + } + } + return nil +} + +func encodeKeyValuePair(w io.Writer, key string, value interface{}, mode modes.EncMode) error { + if err := mode.MarshalTo(key, w); err != nil { + return err + } + if err := mode.MarshalTo(value, w); err != nil { + return err + } + return nil +} + +// writeMapHead writes a CBOR map header for a map with n entries. +// Uses major type 5 (0xa0 base), following RFC 8949 Section 3.1. +func writeMapHead(w io.Writer, n int) error { + return writeCollectionHead(w, 0xa0, int64(n)) +} + +// writeArrayHead writes a CBOR array header for an array with n elements. +// Uses major type 4 (0x80 base), following RFC 8949 Section 3.1. +func writeArrayHead(w io.Writer, n int) error { + return writeCollectionHead(w, 0x80, int64(n)) +} + +// writeCollectionHead writes a CBOR collection (array or map) header encoding +// the number of elements n, following RFC 8949 Section 3 additional info rules: +// +// - base: the prefix byte for the collection type. +// For maps: 0xa0 (major type 5), for arrays: 0x80 (major type 4). +// +// The extended form prefixes are derived from base using bitwise OR: +// - base|24: 1-byte length follows (additional info 24) +// - base|25: 2-byte length follows (additional info 25) +// - base|26: 4-byte length follows (additional info 26) +// - base|27: 8-byte length follows (additional info 27) +// +// Encoding table (map example, base=0xa0): +// +// n <= 23: 1 byte — 0xa0|n +// n <= 0xFF: 2 bytes — 0xb8 (0xa0|24), n +// n <= 0xFFFF: 3 bytes — 0xb9 (0xa0|25), n>>8, n +// n <= 0xFFFFFFFF: 5 bytes — 0xba (0xa0|26), n>>24..n +// n > 0xFFFFFFFF: 9 bytes — 0xbb (0xa0|27), n>>56..n +func writeCollectionHead(w io.Writer, base byte, n int64) error { + switch { + case n <= 23: + // Additional info 0–23: length is encoded directly in the low 5 bits. + _, err := w.Write([]byte{base + byte(n)}) + return err + case n <= 0xFF: + // Additional info 24: one additional byte carries the length. + _, err := w.Write([]byte{base | 24, byte(n)}) + return err + case n <= 0xFFFF: + // Additional info 25: two additional bytes carry the length (big-endian). + _, err := w.Write([]byte{base | 25, byte(n >> 8), byte(n)}) + return err + case n <= 0xFFFFFFFF: + // Additional info 26: four additional bytes carry the length (big-endian). + _, err := w.Write([]byte{base | 26, byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n)}) + return err + default: + // Additional info 27: eight additional bytes carry the length (big-endian). + _, err := w.Write([]byte{ + base | 27, byte(n >> 56), byte(n >> 48), byte(n >> 40), byte(n >> 32), byte(n >> 24), byte(n >> 16), + byte(n >> 8), byte(n), + }) + return err + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections_test.go new file mode 100644 index 00000000000..72749c59fab --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections_test.go @@ -0,0 +1,1076 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cbor + +import ( + "bytes" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + + "sigs.k8s.io/randfill" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes" +) + +// TestStreamingCollectionsEncoding verifies that streaming encoding produces +// output identical to normal non-streaming encoding, and that the streaming +// encoder actually uses multiple Write calls (not just buffering everything). +func TestStreamingCollectionsEncoding(t *testing.T) { + var buf writeCountingBuffer + var remainingItems int64 = 1 + for _, tc := range []struct { + name string + in runtime.Object + cannotStream bool + }{ + // Preserving the distinction between integers and floating-point numbers + { + name: "Struct with floats", + in: &StructWithFloatsList{ + Items: []StructWithFloats{ + { + Int: 1, + Float32: float32(1), + Float64: 1.1, + }, + }, + }, + }, + { + name: "Unstructured object float", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "int": 1, + "float32": float32(1), + "float64": 1.1, + }, + }, + }, + { + name: "Unstructured items float", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "int": 1, + "float32": float32(1), + "float64": 1.1, + }, + }, + }, + }, + }, + // Handling structs with duplicate field names (JSON tag names) without producing duplicate keys in the encoded output + { + name: "StructWithDuplicatedTags", + in: &StructWithDuplicatedTagsList{ + Items: []StructWithDuplicatedTags{ + { + Key1: "key1", + Key2: "key2", + }, + }, + }, + }, + // Encoding Go strings containing invalid UTF-8 sequences without error + { + name: "UnstructuredList object invalid UTF-8", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "key": "\x80", // first byte is a continuation byte + }, + }, + }, + { + name: "UnstructuredList items invalid UTF-8", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "key": "\x80", + }, + }, + }, + }, + }, + // Preserving the distinction between absent, present-but-null, and present-and-empty states for slices and maps + { + name: "CarpList items nil", + in: &testapigroupv1.CarpList{ + Items: nil, + }, + }, + { + name: "CarpList slice nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: nil, + }, + }, + }, + }, + }, + { + name: "CarpList map nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: nil, + }, + }, + }, + }, + }, + { + name: "UnstructuredList items nil", + in: &unstructured.UnstructuredList{ + Items: nil, + }, + }, + { + name: "UnstructuredList items slice nil", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "slice": ([]string)(nil), + }, + }, + }, + }, + }, + { + name: "UnstructuredList items map nil", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "map": (map[string]string)(nil), + }, + }, + }, + }, + }, + { + name: "UnstructuredList object nil", + in: &unstructured.UnstructuredList{ + Object: nil, + }, + }, + { + name: "UnstructuredList object slice nil", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "slice": ([]string)(nil), + }, + }, + }, + { + name: "UnstructuredList object map nil", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "map": (map[string]string)(nil), + }, + }, + }, + { + name: "CarpList items empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{}, + }, + }, + { + name: "CarpList slice empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: []testapigroupv1.CarpCondition{}, + }, + }, + }, + }, + }, + { + name: "CarpList map empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: map[string]string{}, + }, + }, + }, + }, + }, + { + name: "UnstructuredList items empty", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{}, + }, + }, + { + name: "UnstructuredList items slice empty", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "slice": []string{}, + }, + }, + }, + }, + }, + { + name: "UnstructuredList items map empty", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "map": map[string]string{}, + }, + }, + }, + }, + }, + { + name: "UnstructuredList object empty", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{}, + }, + }, + { + name: "UnstructuredList object slice empty", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "slice": []string{}, + }, + }, + }, + { + name: "UnstructuredList object map empty", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "map": map[string]string{}, + }, + }, + }, + // Handling structs implementing json.Marshaler method + { + name: "List with json.Marshaler cannot be streamed", + in: &ListWithMarshalJSONList{}, + cannotStream: true, + }, + { + name: "Struct with json.Marshaler", + in: &StructWithMarshalJSONList{ + Items: []StructWithMarshalJSON{ + {}, + }, + }, + }, + // Handling structs implementing json.Marshaler but NOT cbor.Marshaler + { + name: "List with json.Marshaler but no cbor.Marshaler cannot be streamed", + in: &ListWithMarshalJSONNoCBORList{}, + cannotStream: true, + }, + { + name: "Struct with json.Marshaler but no cbor.Marshaler", + in: &StructWithMarshalJSONNoCBORList{ + Items: []StructWithMarshalJSONNoCBOR{ + {}, + }, + }, + }, + // Handling raw bytes. + { + name: "Struct with raw bytes", + in: &StructWithRawBytesList{ + Items: []StructWithRawBytes{ + { + Slice: []byte{0x01, 0x02, 0x03}, + Array: [3]byte{0x01, 0x02, 0x03}, + }, + }, + }, + }, + { + name: "UnstructuredList object raw bytes", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "slice": []byte{0x01, 0x02, 0x03}, + "array": [3]byte{0x01, 0x02, 0x03}, + }, + }, + }, + { + name: "UnstructuredList items raw bytes", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "slice": []byte{0x01, 0x02, 0x03}, + "array": [3]byte{0x01, 0x02, 0x03}, + }, + }, + }, + }, + }, + // Other scenarios: + { + name: "List just kind", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + }, + }, + }, + { + name: "List just apiVersion", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + }, + }, + }, + { + name: "List no elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + }, + { + name: "List one element with continue", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + Continue: "abc", + RemainingItemCount: &remainingItems, + }, + Items: []testapigroupv1.Carp{ + { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }, + }, + }, + }, + }, + { + name: "List two elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{ + { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }, + }, + { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default2", + }, + }, + }, + }, + }, + { + name: "List with extra field cannot be streamed", + in: &ListWithAdditionalFields{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + cannotStream: true, + }, + { + name: "Not a collection cannot be streamed", + in: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + }, + cannotStream: true, + }, + { + name: "UnstructuredList empty", + in: &unstructured.UnstructuredList{}, + }, + { + name: "UnstructuredList just kind", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List"}, + }, + }, + { + name: "UnstructuredList just apiVersion", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "v1"}, + }, + }, + { + name: "UnstructuredList no elements", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{"resourceVersion": "2345"}, + }, + Items: []unstructured.Unstructured{}, + }, + }, + { + name: "UnstructuredList one element with continue", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + "continue": "abc", + "remainingItemCount": "1", + }, + }, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod", + "namespace": "default", + }, + }, + }, + }, + }, + }, + { + name: "UnstructuredList two elements", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + }, + }, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod", + "namespace": "default", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod2", + "namespace": "default", + }, + }, + }, + }, + }, + }, + { + name: "UnstructuredList conflict on items", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "items": []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "name": "pod", + }, + }, + }, + }, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "name": "pod2", + }, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + buf.Reset() + s := NewSerializer(nil, nil, StreamingCollectionsEncoding(true)) + if err := s.Encode(tc.in, &buf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var normalBuf bytes.Buffer + normalS := NewSerializer(nil, nil, StreamingCollectionsEncoding(false)) + if err := normalS.Encode(tc.in, &normalBuf); err != nil { + t.Fatalf("normal encode error: %v", err) + } + + if diff := cmp.Diff(buf.Bytes(), normalBuf.Bytes()); diff != "" { + t.Errorf("streaming and normal encoding differ:\n%s", diff) + } + + expectStreaming := !tc.cannotStream + if expectStreaming && buf.writeCount <= 2 { + t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) + } + if !expectStreaming && buf.writeCount > 2 { + t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount) + } + }) + } +} + +type StructWithFloatsList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithFloats `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *StructWithFloatsList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithFloats struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Int int + Float32 float32 + Float64 float64 +} + +func (s *StructWithFloats) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithDuplicatedTagsList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithDuplicatedTags `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *StructWithDuplicatedTagsList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithDuplicatedTags struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Key1 string `json:"key"` + Key2 string `json:"key"` //nolint:govet +} + +func (s *StructWithDuplicatedTags) DeepCopyObject() runtime.Object { + return nil +} + +type ListWithMarshalJSONList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []string `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *ListWithMarshalJSONList) DeepCopyObject() runtime.Object { + return nil +} + +func (l *ListWithMarshalJSONList) MarshalJSON() ([]byte, error) { + return []byte(`"marshallJSON"`), nil +} + +type StructWithMarshalJSONList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithMarshalJSON `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (s *StructWithMarshalJSONList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithMarshalJSON struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` +} + +func (l *StructWithMarshalJSON) DeepCopyObject() runtime.Object { + return nil +} + +func (l *StructWithMarshalJSON) MarshalJSON() ([]byte, error) { + return []byte(`"marshallJSON"`), nil +} + +type ListWithMarshalJSONNoCBORList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithMarshalJSONNoCBOR `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *ListWithMarshalJSONNoCBORList) DeepCopyObject() runtime.Object { + return nil +} + +func (l *ListWithMarshalJSONNoCBORList) MarshalJSON() ([]byte, error) { + return []byte(`"marshalJSONNoCBOR"`), nil +} + +type StructWithMarshalJSONNoCBORList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithMarshalJSONNoCBOR `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (s *StructWithMarshalJSONNoCBORList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithMarshalJSONNoCBOR struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` +} + +func (l *StructWithMarshalJSONNoCBOR) DeepCopyObject() runtime.Object { + return nil +} + +func (l *StructWithMarshalJSONNoCBOR) MarshalJSON() ([]byte, error) { + return []byte(`"marshalJSONNoCBOR"`), nil +} + +type StructWithRawBytesList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithRawBytes `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (s *StructWithRawBytesList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithRawBytes struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Slice []byte + Array [3]byte +} + +func (s *StructWithRawBytes) DeepCopyObject() runtime.Object { + return nil +} + +type ListWithAdditionalFields struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []testapigroupv1.Carp `json:"items" protobuf:"bytes,2,rep,name=items"` + AdditionalField int +} + +func (s *ListWithAdditionalFields) DeepCopyObject() runtime.Object { + return nil +} + +type writeCountingBuffer struct { + writeCount int + bytes.Buffer +} + +func (b *writeCountingBuffer) Write(data []byte) (int, error) { + b.writeCount++ + return b.Buffer.Write(data) +} + +func (b *writeCountingBuffer) Reset() { + b.writeCount = 0 + b.Buffer.Reset() +} + +func TestFuzzCollectionsEncoding(t *testing.T) { + disableFuzzFieldsV1 := func(field *metav1.FieldsV1, c randfill.Continue) {} + fuzzUnstructuredList := func(list *unstructured.UnstructuredList, c randfill.Continue) { + list.Object = map[string]interface{}{ + "kind": "List", + "apiVersion": "v1", + c.String(0): c.String(0), + c.String(0): int64(c.Intn(1000000)), // Limit to int64 range + c.String(0): c.Bool(), + "metadata": map[string]interface{}{ + "resourceVersion": fmt.Sprintf("%d", c.Intn(1000000)), // String format + "continue": c.String(0), + "remainingItemCount": fmt.Sprintf("%d", c.Intn(1000000)), // String format + c.String(0): c.String(0), + }, + } + c.Fill(&list.Items) + } + fuzzMap := func(kvs map[string]interface{}, c randfill.Continue) { + kvs[c.String(0)] = c.Bool() + kvs[c.String(0)] = int64(c.Intn(1000000)) // Limit to int64 range + kvs[c.String(0)] = c.String(0) + } + f := randfill.New().Funcs(disableFuzzFieldsV1, fuzzUnstructuredList, fuzzMap) + streamingBuffer := &bytes.Buffer{} + normalSerializer := NewSerializer(nil, nil) + normalBuffer := &bytes.Buffer{} + t.Run("CarpList", func(t *testing.T) { + for range 1000 { + list := &testapigroupv1.CarpList{} + f.Fill(list) + streamingBuffer.Reset() + normalBuffer.Reset() + if _, err := streamingBuffer.Write(selfDescribedCBOR); err != nil { + t.Fatalf("unexpected error: %v", err) + } + ok, err := streamEncodeCollections(list, streamingBuffer, modes.Encode) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.Bytes(), streamingBuffer.Bytes()); diff != "" { + t.Logf("normal: %x", normalBuffer.Bytes()) + t.Logf("streaming: %x", streamingBuffer.Bytes()) + t.Errorf("not matching:\n%s", diff) + } + } + }) + t.Run("UnstructuredList", func(t *testing.T) { + for range 1000 { + list := &unstructured.UnstructuredList{} + f.Fill(list) + streamingBuffer.Reset() + normalBuffer.Reset() + if _, err := streamingBuffer.Write(selfDescribedCBOR); err != nil { + t.Fatalf("unexpected error: %v", err) + } + ok, err := streamEncodeCollections(list, streamingBuffer, modes.Encode) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.Bytes(), streamingBuffer.Bytes()); diff != "" { + t.Logf("normal: %x", normalBuffer.Bytes()) + t.Logf("streaming: %x", streamingBuffer.Bytes()) + t.Errorf("not matching:\n%s", diff) + } + } + }) + // Test EncodeNondeterministic: key order may differ, but output must decode to the same object. + t.Run("CarpList/Nondeterministic", func(t *testing.T) { + for i := range 100 { + list := &testapigroupv1.CarpList{} + f.Fill(list) + + // Encode with deterministic mode as reference. + var detBuffer bytes.Buffer + ok, err := streamEncodeCollections(list, &detBuffer, modes.Encode) + if err != nil { + t.Fatalf("deterministic encode error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + + // Decode deterministic output to interface{} for semantic comparison. + var detObj interface{} + if err := modes.Decode.Unmarshal(detBuffer.Bytes(), &detObj); err != nil { + t.Fatalf("decode deterministic output: %v", err) + } + + // Encode with nondeterministic mode. + streamingBuffer.Reset() + ok, err = streamEncodeCollections(list, streamingBuffer, modes.EncodeNondeterministic) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + + // Semantic correctness: decoded nondeterministic output must equal deterministic reference. + payload := streamingBuffer.Bytes() + var ndetObj interface{} + if err := modes.Decode.Unmarshal(payload, &ndetObj); err != nil { + t.Fatalf("decode nondeterministic output: %v", err) + } + if diff := cmp.Diff(detObj, ndetObj); diff != "" { + t.Errorf("trial %d: semantic mismatch between deterministic and nondeterministic:\n%s", i, diff) + } + } + }) + t.Run("UnstructuredList/Nondeterministic", func(t *testing.T) { + for i := range 100 { + list := &unstructured.UnstructuredList{} + f.Fill(list) + + // Encode with deterministic mode as reference. + var detBuffer bytes.Buffer + ok, err := streamEncodeCollections(list, &detBuffer, modes.Encode) + if err != nil { + t.Fatalf("deterministic encode error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + + // Decode deterministic output to map[string]interface{} for semantic comparison. + var detObj map[string]interface{} + if err := modes.Decode.Unmarshal(detBuffer.Bytes(), &detObj); err != nil { + t.Fatalf("decode deterministic output: %v", err) + } + + // Encode with nondeterministic mode. + streamingBuffer.Reset() + ok, err = streamEncodeCollections(list, streamingBuffer, modes.EncodeNondeterministic) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + + // Semantic correctness: decoded nondeterministic output must equal deterministic reference. + payload := streamingBuffer.Bytes() + var ndetObj map[string]interface{} + if err := modes.Decode.Unmarshal(payload, &ndetObj); err != nil { + t.Fatalf("decode nondeterministic output: %v", err) + } + if diff := cmp.Diff(detObj, ndetObj); diff != "" { + t.Errorf("trial %d: semantic mismatch between deterministic and nondeterministic:\n%s", i, diff) + } + } + }) +} + +// TestStreamEncodeCollectionsDeterministic verifies that streamEncodeCollections +// with modes.Encode produces output identical to the normal non-streaming encoder. +func TestStreamEncodeCollectionsDeterministic(t *testing.T) { + for _, tc := range []struct { + name string + in runtime.Object + }{ + { + name: "CarpList with all top-level fields", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "CarpList", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "42", + }, + Items: []testapigroupv1.Carp{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}}, + }, + }, + }, + { + name: "UnstructuredList with all top-level fields", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "kind": "List", + "apiVersion": "v1", + "metadata": map[string]interface{}{"resourceVersion": "42"}, + }, + Items: []unstructured.Unstructured{ + {Object: map[string]interface{}{"name": "a"}}, + {Object: map[string]interface{}{"name": "b"}}, + {Object: map[string]interface{}{"name": "b"}}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Encode with streaming enabled. + var streamingBuf bytes.Buffer + ok, err := streamEncodeCollections(tc.in, &streamingBuf, modes.Encode) + if err != nil { + t.Fatalf("streamEncodeCollections error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to handle %T", tc.in) + } + + // Encode with normal non-streaming encoder. + var normalBuf bytes.Buffer + if err := modes.Encode.MarshalTo(tc.in, &normalBuf); err != nil { + t.Fatalf("normal encode error: %v", err) + } + + // Output must be identical. + if diff := cmp.Diff(normalBuf.Bytes(), streamingBuf.Bytes()); diff != "" { + t.Logf("normal: %x", normalBuf.Bytes()) + t.Logf("streaming: %x", streamingBuf.Bytes()) + t.Errorf("streaming output differs from normal encoding:\n%s", diff) + } + }) + } +} + +// TestStreamEncodeCollectionsNondeterministic verifies that streamEncodeCollections +// with modes.EncodeNondeterministic (SortFastShuffle): +// 1. Semantic correctness: the output decodes to the same object as deterministic encoding. +// 2. Non-idempotence: across multiple trials the key order is observed to vary +// (probabilistic; uses a multi-key object to make the probability of flake negligible). +func TestStreamEncodeCollectionsNondeterministic(t *testing.T) { + // A list with kind+apiVersion+metadata+items = 4 keys. + // With SortFastShuffle the number of possible orderings is 4! = 24. + // Over 200 trials the probability of seeing only 1 unique ordering is (1/24)^199 ≈ 0. + const nTrials = 200 + + for _, tc := range []struct { + name string + in runtime.Object + }{ + { + name: "CarpList", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "CarpList", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "42", + }, + Items: []testapigroupv1.Carp{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}}, + }, + }, + }, + { + name: "UnstructuredList", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "kind": "List", + "apiVersion": "v1", + "metadata": map[string]interface{}{"resourceVersion": "42"}, + }, + Items: []unstructured.Unstructured{ + {Object: map[string]interface{}{"name": "a"}}, + {Object: map[string]interface{}{"name": "b"}}, + {Object: map[string]interface{}{"name": "c"}}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Encode once with deterministic mode as reference for semantic equality. + var detBuf bytes.Buffer + if _, err := detBuf.Write(selfDescribedCBOR); err != nil { + t.Fatal(err) + } + ok, err := streamEncodeCollections(tc.in, &detBuf, modes.Encode) + if err != nil { + t.Fatalf("deterministic encode error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to handle %T", tc.in) + } + var detObj map[string]interface{} + if err := modes.Decode.Unmarshal(detBuf.Bytes()[len(selfDescribedCBOR):], &detObj); err != nil { + t.Fatalf("decode deterministic output: %v", err) + } + + // Run nTrials of nondeterministic encoding. + uniqueOutputs := make(map[string]struct{}) + for i := range nTrials { + var buf bytes.Buffer + if _, err := buf.Write(selfDescribedCBOR); err != nil { + t.Fatal(err) + } + ok, err := streamEncodeCollections(tc.in, &buf, modes.EncodeNondeterministic) + if err != nil { + t.Fatalf("trial %d: nondeterministic encode error: %v", i, err) + } + if !ok { + t.Fatalf("trial %d: expected streaming encoder to handle %T", i, tc.in) + } + payload := buf.Bytes()[len(selfDescribedCBOR):] + + // Semantic correctness: decoded value must equal the deterministic reference. + var ndetObj map[string]interface{} + if err := modes.Decode.Unmarshal(payload, &ndetObj); err != nil { + t.Fatalf("trial %d: decode nondeterministic output: %v", i, err) + } + if diff := cmp.Diff(detObj, ndetObj); diff != "" { + t.Errorf("trial %d: semantic mismatch between deterministic and nondeterministic:\n%s", i, diff) + } + + uniqueOutputs[string(payload)] = struct{}{} + } + + // Non-idempotence: must have observed at least 2 distinct byte sequences. + if len(uniqueOutputs) < 2 { + t.Errorf("nondeterministic encoding produced only %d unique byte sequence(s) over %d trials; expected varied output", len(uniqueOutputs), nTrials) + } + t.Logf("%s: observed %d unique encodings over %d trials", tc.name, len(uniqueOutputs), nTrials) + }) + } +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes/encode.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes/encode.go index 815dbe6660a..3287dfd31c2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes/encode.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes/encode.go @@ -107,6 +107,7 @@ var encode = EncMode{ } return encode }(), + deterministic: true, } var Encode = EncMode{ @@ -122,6 +123,7 @@ var Encode = EncMode{ } return em }(), + deterministic: true, } var EncodeNondeterministic = EncMode{ @@ -134,16 +136,22 @@ var EncodeNondeterministic = EncMode{ } return em }(), + deterministic: false, } type EncMode struct { - delegate cbor.UserBufferEncMode + delegate cbor.UserBufferEncMode + deterministic bool } func (em EncMode) options() cbor.EncOptions { return em.delegate.EncOptions() } +func (em EncMode) IsDeterministic() bool { + return em.deterministic +} + func (em EncMode) MarshalTo(v interface{}, w io.Writer) error { if buf, ok := w.(*buffer); ok { return em.delegate.MarshalToBuffer(v, &buf.Buffer)