diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections.go index 09505e3eda3..97804e86741 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections.go @@ -1,5 +1,5 @@ /* -Copyright 2026 The Kubernetes Authors. +Copyright The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/conversion" + "k8s.io/klog/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -42,15 +43,18 @@ func streamEncodeCollections(obj runtime.Object, w io.Writer, mode modes.EncMode return true, streamingEncodeUnstructuredList(w, list, mode) } if _, ok := obj.(cbor.Marshaler); ok { + klog.InfoS("is implement cbor.Marshaler", "obj", obj) return false, nil } if _, ok := obj.(json.Marshaler); ok { + klog.InfoS("is implement json.Marshaler", "obj", obj) return false, nil } typeMeta, listMeta, items, err := getListMeta(obj) if err == nil { return true, streamingEncodeList(w, typeMeta, listMeta, items, mode) } + klog.ErrorS(err, "getListMeta err", "obj", obj) return false, nil } @@ -69,8 +73,13 @@ func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runti if !ok { return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type") } - if listType.Field(0).Tag.Get("json") != ",inline" { - return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`) + if !listType.Field(0).Anonymous { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be embedded`) + } + if jsonTag, jsonTagExists := listType.Field(0).Tag.Lookup("json"); !jsonTagExists { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag`) + } else if jsonTag != "" && jsonTag != ",inline" { + return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be "" or ",inline"`) } // ListMeta listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta) @@ -222,13 +231,13 @@ func encodeKeyValuePair(w io.Writer, key string, value interface{}, mode modes.E // writeMapHead writes a CBOR map header for a map with n entries. // Uses major type 5 (0xa0 base), following RFC 8949 Section 3.1. func writeMapHead(w io.Writer, n int) error { - return writeCollectionHead(w, 0xa0, n) + return writeCollectionHead(w, 0xa0, int64(n)) } // writeArrayHead writes a CBOR array header for an array with n elements. // Uses major type 4 (0x80 base), following RFC 8949 Section 3.1. func writeArrayHead(w io.Writer, n int) error { - return writeCollectionHead(w, 0x80, n) + return writeCollectionHead(w, 0x80, int64(n)) } // writeCollectionHead writes a CBOR collection (array or map) header encoding @@ -250,7 +259,7 @@ func writeArrayHead(w io.Writer, n int) error { // n <= 0xFFFF: 3 bytes — 0xb9 (0xa0|25), n>>8, n // n <= 0xFFFFFFFF: 5 bytes — 0xba (0xa0|26), n>>24..n // n > 0xFFFFFFFF: 9 bytes — 0xbb (0xa0|27), n>>56..n -func writeCollectionHead(w io.Writer, base byte, n int) error { +func writeCollectionHead(w io.Writer, base byte, n int64) error { switch { case n <= 23: // Additional info 0–23: length is encoded directly in the low 5 bits. diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections_test.go index 47295bd966e..72749c59fab 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/cbor/collections_test.go @@ -1,5 +1,5 @@ /* -Copyright 2026 The Kubernetes Authors. +Copyright The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,7 +21,6 @@ import ( "fmt" "testing" - "github.com/fxamacker/cbor/v2" "github.com/google/go-cmp/cmp" "sigs.k8s.io/randfill" @@ -33,16 +32,10 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer/cbor/internal/modes" ) -func TestCollectionsEncoding(t *testing.T) { - t.Run("Normal", func(t *testing.T) { - testCollectionsEncoding(t, false) - }) - t.Run("Streaming", func(t *testing.T) { - testCollectionsEncoding(t, true) - }) -} - -func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { +// TestStreamingCollectionsEncoding verifies that streaming encoding produces +// output identical to normal non-streaming encoding, and that the streaming +// encoder actually uses multiple Write calls (not just buffering everything). +func TestStreamingCollectionsEncoding(t *testing.T) { var buf writeCountingBuffer var remainingItems int64 = 1 for _, tc := range []struct { @@ -390,10 +383,12 @@ func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { RemainingItemCount: &remainingItems, }, Items: []testapigroupv1.Carp{ - {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ - Name: "pod", - Namespace: "default", - }}, + { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }, + }, }, }, }, @@ -408,14 +403,18 @@ func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { ResourceVersion: "2345", }, Items: []testapigroupv1.Carp{ - {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ - Name: "pod", - Namespace: "default", - }}, - {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ - Name: "pod2", - Namespace: "default2", - }}, + { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }, + }, + { + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default2", + }, + }, }, }, }, @@ -462,18 +461,22 @@ func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { { name: "UnstructuredList no elements", in: &unstructured.UnstructuredList{ - Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{"resourceVersion": "2345"}}, - Items: []unstructured.Unstructured{}, + Object: map[string]interface{}{ + "kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{"resourceVersion": "2345"}, + }, + Items: []unstructured.Unstructured{}, }, }, { name: "UnstructuredList one element with continue", in: &unstructured.UnstructuredList{ - Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ - "resourceVersion": "2345", - "continue": "abc", - "remainingItemCount": "1", - }}, + Object: map[string]interface{}{ + "kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + "continue": "abc", + "remainingItemCount": "1", + }, + }, Items: []unstructured.Unstructured{ { Object: map[string]interface{}{ @@ -491,9 +494,11 @@ func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { { name: "UnstructuredList two elements", in: &unstructured.UnstructuredList{ - Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ - "resourceVersion": "2345", - }}, + Object: map[string]interface{}{ + "kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + }, + }, Items: []unstructured.Unstructured{ { Object: map[string]interface{}{ @@ -521,13 +526,15 @@ func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { { name: "UnstructuredList conflict on items", in: &unstructured.UnstructuredList{ - Object: map[string]interface{}{"items": []unstructured.Unstructured{ - { - Object: map[string]interface{}{ - "name": "pod", + Object: map[string]interface{}{ + "items": []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "name": "pod", + }, }, }, - }}, + }, Items: []unstructured.Unstructured{ { Object: map[string]interface{}{ @@ -540,7 +547,7 @@ func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { } { t.Run(tc.name, func(t *testing.T) { buf.Reset() - s := NewSerializer(nil, nil, StreamingCollectionsEncoding(streamingEnabled)) + s := NewSerializer(nil, nil, StreamingCollectionsEncoding(true)) if err := s.Encode(tc.in, &buf); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -555,7 +562,7 @@ func testCollectionsEncoding(t *testing.T, streamingEnabled bool) { t.Errorf("streaming and normal encoding differ:\n%s", diff) } - expectStreaming := !tc.cannotStream && streamingEnabled + expectStreaming := !tc.cannotStream if expectStreaming && buf.writeCount <= 2 { t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) } @@ -739,19 +746,20 @@ func TestFuzzCollectionsEncoding(t *testing.T) { "kind": "List", "apiVersion": "v1", c.String(0): c.String(0), - c.String(0): c.Uint64(), + c.String(0): int64(c.Intn(1000000)), // Limit to int64 range c.String(0): c.Bool(), "metadata": map[string]interface{}{ - "resourceVersion": fmt.Sprintf("%d", c.Uint64()), + "resourceVersion": fmt.Sprintf("%d", c.Intn(1000000)), // String format "continue": c.String(0), - "remainingItemCount": fmt.Sprintf("%d", c.Uint64()), + "remainingItemCount": fmt.Sprintf("%d", c.Intn(1000000)), // String format c.String(0): c.String(0), - }} + }, + } c.Fill(&list.Items) } fuzzMap := func(kvs map[string]interface{}, c randfill.Continue) { kvs[c.String(0)] = c.Bool() - kvs[c.String(0)] = c.Uint64() + kvs[c.String(0)] = int64(c.Intn(1000000)) // Limit to int64 range kvs[c.String(0)] = c.String(0) } f := randfill.New().Funcs(disableFuzzFieldsV1, fuzzUnstructuredList, fuzzMap) @@ -759,7 +767,7 @@ func TestFuzzCollectionsEncoding(t *testing.T) { normalSerializer := NewSerializer(nil, nil) normalBuffer := &bytes.Buffer{} t.Run("CarpList", func(t *testing.T) { - for i := 0; i < 1000; i++ { + for range 1000 { list := &testapigroupv1.CarpList{} f.Fill(list) streamingBuffer.Reset() @@ -785,7 +793,7 @@ func TestFuzzCollectionsEncoding(t *testing.T) { } }) t.Run("UnstructuredList", func(t *testing.T) { - for i := 0; i < 1000; i++ { + for range 1000 { list := &unstructured.UnstructuredList{} f.Fill(list) streamingBuffer.Reset() @@ -810,126 +818,96 @@ func TestFuzzCollectionsEncoding(t *testing.T) { } } }) - // Test EncodeNondeterministic: key order may differ from Encode, so we only - // verify the output is structurally valid CBOR (has selfDescribedCBOR prefix). + // Test EncodeNondeterministic: key order may differ, but output must decode to the same object. t.Run("CarpList/Nondeterministic", func(t *testing.T) { - for i := 0; i < 100; i++ { + for i := range 100 { list := &testapigroupv1.CarpList{} f.Fill(list) - streamingBuffer.Reset() - if _, err := streamingBuffer.Write(selfDescribedCBOR); err != nil { - t.Fatalf("unexpected error: %v", err) + + // Encode with deterministic mode as reference. + var detBuffer bytes.Buffer + ok, err := streamEncodeCollections(list, &detBuffer, modes.Encode) + if err != nil { + t.Fatalf("deterministic encode error: %v", err) } - ok, err := streamEncodeCollections(list, streamingBuffer, modes.EncodeNondeterministic) + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + + // Decode deterministic output to interface{} for semantic comparison. + var detObj interface{} + if err := modes.Decode.Unmarshal(detBuffer.Bytes(), &detObj); err != nil { + t.Fatalf("decode deterministic output: %v", err) + } + + // Encode with nondeterministic mode. + streamingBuffer.Reset() + ok, err = streamEncodeCollections(list, streamingBuffer, modes.EncodeNondeterministic) if err != nil { t.Fatalf("unexpected error: %v", err) } if !ok { t.Fatalf("expected streaming encoder to encode %T", list) } - if !bytes.HasPrefix(streamingBuffer.Bytes(), selfDescribedCBOR) { - t.Errorf("streaming output missing selfDescribedCBOR prefix") + + // Semantic correctness: decoded nondeterministic output must equal deterministic reference. + payload := streamingBuffer.Bytes() + var ndetObj interface{} + if err := modes.Decode.Unmarshal(payload, &ndetObj); err != nil { + t.Fatalf("decode nondeterministic output: %v", err) + } + if diff := cmp.Diff(detObj, ndetObj); diff != "" { + t.Errorf("trial %d: semantic mismatch between deterministic and nondeterministic:\n%s", i, diff) } } }) t.Run("UnstructuredList/Nondeterministic", func(t *testing.T) { - for i := 0; i < 100; i++ { + for i := range 100 { list := &unstructured.UnstructuredList{} f.Fill(list) - streamingBuffer.Reset() - if _, err := streamingBuffer.Write(selfDescribedCBOR); err != nil { - t.Fatalf("unexpected error: %v", err) + + // Encode with deterministic mode as reference. + var detBuffer bytes.Buffer + ok, err := streamEncodeCollections(list, &detBuffer, modes.Encode) + if err != nil { + t.Fatalf("deterministic encode error: %v", err) } - ok, err := streamEncodeCollections(list, streamingBuffer, modes.EncodeNondeterministic) + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + + // Decode deterministic output to map[string]interface{} for semantic comparison. + var detObj map[string]interface{} + if err := modes.Decode.Unmarshal(detBuffer.Bytes(), &detObj); err != nil { + t.Fatalf("decode deterministic output: %v", err) + } + + // Encode with nondeterministic mode. + streamingBuffer.Reset() + ok, err = streamEncodeCollections(list, streamingBuffer, modes.EncodeNondeterministic) if err != nil { t.Fatalf("unexpected error: %v", err) } if !ok { t.Fatalf("expected streaming encoder to encode %T", list) } - if !bytes.HasPrefix(streamingBuffer.Bytes(), selfDescribedCBOR) { - t.Errorf("streaming output missing selfDescribedCBOR prefix") + + // Semantic correctness: decoded nondeterministic output must equal deterministic reference. + payload := streamingBuffer.Bytes() + var ndetObj map[string]interface{} + if err := modes.Decode.Unmarshal(payload, &ndetObj); err != nil { + t.Fatalf("decode nondeterministic output: %v", err) + } + if diff := cmp.Diff(detObj, ndetObj); diff != "" { + t.Errorf("trial %d: semantic mismatch between deterministic and nondeterministic:\n%s", i, diff) } } }) } -// extractCBORMapKeys decodes the top-level CBOR map keys (as strings) from data in -// insertion order, skipping the selfDescribedCBOR tag prefix (0xd9d9f7) if present. -// Only the immediate keys of the outermost map are returned; values are skipped via -// cbor streaming decoder. Keys must be CBOR byte strings (major type 2) or text -// strings (major type 3) with a short length (≤23 bytes). -func extractCBORMapKeys(t *testing.T, data []byte) []string { - t.Helper() - // Skip selfDescribedCBOR tag prefix if present. - if bytes.HasPrefix(data, selfDescribedCBOR) { - data = data[len(selfDescribedCBOR):] - } - if len(data) == 0 { - return nil - } - // Parse map header manually to get the number of entries and - // advance pos past the header byte(s). - pos := 0 - mapByte := data[pos] - pos++ - if mapByte>>5 != 5 { - t.Fatalf("extractCBORMapKeys: expected major type 5 (map), got byte 0x%02x", mapByte) - } - addInfo := mapByte & 0x1f - var mapSize int - switch { - case addInfo <= 23: - mapSize = int(addInfo) - case addInfo == 24: - mapSize = int(data[pos]) - pos++ - case addInfo == 25: - mapSize = int(data[pos])<<8 | int(data[pos+1]) - pos += 2 - default: - t.Fatalf("extractCBORMapKeys: unsupported map size additional info %d", addInfo) - } - // Use a streaming decoder to read key+value pairs one at a time. - // This correctly handles each value's variable byte length. - dec := cbor.NewDecoder(bytes.NewReader(data[pos:])) - keys := make([]string, 0, mapSize) - for i := 0; i < mapSize; i++ { - // Decode the key as a RawMessage, then extract the string from the raw bytes. - var rawKey cbor.RawMessage - if err := dec.Decode(&rawKey); err != nil { - t.Fatalf("extractCBORMapKeys: decode key %d: %v", i, err) - } - if len(rawKey) == 0 { - t.Fatalf("extractCBORMapKeys: empty raw key at index %d", i) - } - keyMajor := rawKey[0] >> 5 - if keyMajor != 2 && keyMajor != 3 { - t.Fatalf("extractCBORMapKeys: key %d has unexpected major type %d (byte 0x%02x)", i, keyMajor, rawKey[0]) - } - // Extract the string content: skip the header byte(s). - keyHdrLen := 1 - if rawKey[0]&0x1f == 24 { - keyHdrLen = 2 // 1 type byte + 1 length byte - } - keys = append(keys, string(rawKey[keyHdrLen:])) - // Skip the value. - var rawVal cbor.RawMessage - if err := dec.Decode(&rawVal); err != nil { - t.Fatalf("extractCBORMapKeys: decode value for key %q: %v", keys[len(keys)-1], err) - } - } - return keys -} - // TestStreamEncodeCollectionsDeterministic verifies that streamEncodeCollections -// with modes.Encode (SortBytewiseLexical) produces: -// 1. Idempotent output: the same input always encodes to identical bytes. -// 2. Correct key order: top-level map keys follow SortBytewiseLexical -// (shorter length first, then lexicographic within same length). +// with modes.Encode produces output identical to the normal non-streaming encoder. func TestStreamEncodeCollectionsDeterministic(t *testing.T) { - wantKeyOrder := []string{"kind", "items", "metadata", "apiVersion"} - for _, tc := range []struct { name string in runtime.Object @@ -946,6 +924,8 @@ func TestStreamEncodeCollectionsDeterministic(t *testing.T) { }, Items: []testapigroupv1.Carp{ {ObjectMeta: metav1.ObjectMeta{Name: "a"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}}, }, }, }, @@ -959,35 +939,34 @@ func TestStreamEncodeCollectionsDeterministic(t *testing.T) { }, Items: []unstructured.Unstructured{ {Object: map[string]interface{}{"name": "a"}}, + {Object: map[string]interface{}{"name": "b"}}, + {Object: map[string]interface{}{"name": "b"}}, }, }, }, } { t.Run(tc.name, func(t *testing.T) { - // 1. Idempotence: encode twice, bytes must match. - var buf1, buf2 bytes.Buffer - for _, buf := range []*bytes.Buffer{&buf1, &buf2} { - if _, err := buf.Write(selfDescribedCBOR); err != nil { - t.Fatal(err) - } - ok, err := streamEncodeCollections(tc.in, buf, modes.Encode) - if err != nil { - t.Fatalf("streamEncodeCollections error: %v", err) - } - if !ok { - t.Fatalf("expected streaming encoder to handle %T", tc.in) - } + // Encode with streaming enabled. + var streamingBuf bytes.Buffer + ok, err := streamEncodeCollections(tc.in, &streamingBuf, modes.Encode) + if err != nil { + t.Fatalf("streamEncodeCollections error: %v", err) } - if diff := cmp.Diff(buf1.Bytes(), buf2.Bytes()); diff != "" { - t.Errorf("deterministic encoding is not idempotent:\n%s", diff) + if !ok { + t.Fatalf("expected streaming encoder to handle %T", tc.in) } - // 2. Key order follows SortBytewiseLexical. - // Expected order for {kind(4), items(5), metadata(8), apiVersion(10)}: - // shorter length first → kind < items < metadata < apiVersion - gotKeys := extractCBORMapKeys(t, buf1.Bytes()) - if diff := cmp.Diff(wantKeyOrder, gotKeys); diff != "" { - t.Errorf("top-level key order does not follow SortBytewiseLexical:\n%s", diff) + // Encode with normal non-streaming encoder. + var normalBuf bytes.Buffer + if err := modes.Encode.MarshalTo(tc.in, &normalBuf); err != nil { + t.Fatalf("normal encode error: %v", err) + } + + // Output must be identical. + if diff := cmp.Diff(normalBuf.Bytes(), streamingBuf.Bytes()); diff != "" { + t.Logf("normal: %x", normalBuf.Bytes()) + t.Logf("streaming: %x", streamingBuf.Bytes()) + t.Errorf("streaming output differs from normal encoding:\n%s", diff) } }) } @@ -1020,6 +999,8 @@ func TestStreamEncodeCollectionsNondeterministic(t *testing.T) { }, Items: []testapigroupv1.Carp{ {ObjectMeta: metav1.ObjectMeta{Name: "a"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}}, }, }, }, @@ -1033,6 +1014,8 @@ func TestStreamEncodeCollectionsNondeterministic(t *testing.T) { }, Items: []unstructured.Unstructured{ {Object: map[string]interface{}{"name": "a"}}, + {Object: map[string]interface{}{"name": "b"}}, + {Object: map[string]interface{}{"name": "c"}}, }, }, }, @@ -1057,7 +1040,7 @@ func TestStreamEncodeCollectionsNondeterministic(t *testing.T) { // Run nTrials of nondeterministic encoding. uniqueOutputs := make(map[string]struct{}) - for i := 0; i < nTrials; i++ { + for i := range nTrials { var buf bytes.Buffer if _, err := buf.Write(selfDescribedCBOR); err != nil { t.Fatal(err)