From a960388db2b917852bb51f26cdbbf05971fac472 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Tue, 9 Jun 2026 20:12:08 +0200 Subject: [PATCH] Search: parallel chunk I/O for KV-backed snapshots (#126090) WriteSnapshotFile and ReadSnapshotFile previously walked chunks serially. On the write side every chunk is independent, and on the read side chunks 1..N-1 are independent once chunk 0 has been fetched (its size reveals the writer's chunk size). Both paths now run their chunk loops through an errgroup with a configurable fan-out. Concurrency is exposed as index_snapshot_kv_chunk_concurrency in cfg and plumbed to KVRemoteIndexStoreConfig.ChunkConcurrency. The default is 1, which is the previous serial behavior. Operators can raise it at runtime to measure throughput tradeoffs without redeploying. Parallel reads write to dst via io.NewOffsetWriter, one per chunk, so no per-chunk buffer is held in memory. *os.File.WriteAt is safe under concurrent use to non-overlapping regions, which is what our slots are by construction. --- pkg/setting/setting.go | 1 + pkg/setting/setting_unified_storage.go | 1 + .../unified/search/kv_remote_index_store.go | 105 ++++++++++++------ .../search/kv_remote_index_store_test.go | 60 ++++++++++ pkg/storage/unified/sql/service.go | 5 +- 5 files changed, 135 insertions(+), 37 deletions(-) diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index c0ca22db520..4f6b9914a81 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -683,6 +683,7 @@ type Cfg struct { IndexSnapshotEnabled bool // Enable remote index snapshots IndexSnapshotBucketURL string // Go CDK bucket URL for snapshot storage (s3://, gs://, azblob://, mem://, file:///) IndexSnapshotStorageKV bool // Store snapshots in the same KV used by the storage backend instead of an object-storage bucket. Mutually exclusive with index_snapshot_bucket_url; requires enable_kv_leases. + IndexSnapshotKVChunkConcurrency int // Per-file chunk I/O fan-out for KV-backed snapshots. 0 / 1 = serial. Used only when index_snapshot_storage_kv is true. IndexSnapshotThreshold int // Min doc count to use remote snapshots (must be >= IndexFileThreshold, default: 5000) IndexSnapshotMaxAge time.Duration // Max snapshot age before deletion (must be >= MaxFileIndexAge, default: 7d) IndexSnapshotCleanupGracePeriod time.Duration // Time a new snapshot must exist before its predecessor in the same Grafana-version group is eligible for cleanup (default: 30m) diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go index 5f3013a9433..764a91a322a 100644 --- a/pkg/setting/setting_unified_storage.go +++ b/pkg/setting/setting_unified_storage.go @@ -262,6 +262,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() { cfg.IndexSnapshotEnabled = section.Key("index_snapshot_enabled").MustBool(false) cfg.IndexSnapshotBucketURL = section.Key("index_snapshot_bucket_url").String() cfg.IndexSnapshotStorageKV = section.Key("index_snapshot_storage_kv").MustBool(false) + cfg.IndexSnapshotKVChunkConcurrency = section.Key("index_snapshot_kv_chunk_concurrency").MustInt(1) cfg.IndexSnapshotThreshold = section.Key("index_snapshot_threshold").MustInt(5000) if cfg.IndexSnapshotThreshold < cfg.IndexFileThreshold { cfg.Logger.Warn("index_snapshot_threshold is smaller than index_file_threshold, overriding", "configured", cfg.IndexSnapshotThreshold, "index_file_threshold", cfg.IndexFileThreshold) diff --git a/pkg/storage/unified/search/kv_remote_index_store.go b/pkg/storage/unified/search/kv_remote_index_store.go index 9ea441af0a7..c570aeaa000 100644 --- a/pkg/storage/unified/search/kv_remote_index_store.go +++ b/pkg/storage/unified/search/kv_remote_index_store.go @@ -13,6 +13,7 @@ import ( "time" "github.com/oklog/ulid/v2" + "golang.org/x/sync/errgroup" "github.com/grafana/grafana/pkg/apimachinery/validation" "github.com/grafana/grafana/pkg/infra/log" @@ -57,6 +58,12 @@ const ( minKVChunkSize int64 = 1024 * 1024 maxKVChunkSize int64 = 1024 * 1024 * 1024 + // maxKVChunkConcurrency caps the per-file parallelism for chunk I/O. + // The cap is a sanity bound: values much higher than this hit + // diminishing returns and risk overloading the KV backend or + // exhausting connection pools. + maxKVChunkConcurrency = 32 + // kvChunkSuffixFormat zero-pads the chunk index so listing returns // chunks in numeric order. Six digits supports up to a million chunks // per file, which is comfortable headroom even at the minimum chunk @@ -85,6 +92,12 @@ type KVRemoteIndexStoreConfig struct { // length, so writers can change ChunkSize between deployments without // breaking existing snapshots. Zero falls back to defaultKVChunkSize. ChunkSize int64 + + // ChunkConcurrency bounds the number of chunks uploaded or downloaded + // in parallel per snapshot file. Zero means 1 (fully serial). On the + // read side, chunk 0 is always read serially first (it reveals the + // writer's chunk size); only chunks 1..N-1 are parallelized. + ChunkConcurrency int } // KVRemoteIndexStore implements RemoteIndexStore on top of a kv.KV. @@ -127,12 +140,13 @@ type KVRemoteIndexStoreConfig struct { // not surface incomplete uploads; the cleanup pass uses // ListIndexKeysIncludingIncomplete to get that wider view. type KVRemoteIndexStore struct { - store kv.KV - leaseMgr *lease.Manager - buildLockOpts LockOptions - cleanupLockOpts LockOptions - chunkSize int64 - log log.Logger + store kv.KV + leaseMgr *lease.Manager + buildLockOpts LockOptions + cleanupLockOpts LockOptions + chunkSize int64 + chunkConcurrency int + log log.Logger } // NewKVRemoteIndexStore creates a KVRemoteIndexStore from cfg. Returns an @@ -148,13 +162,18 @@ func NewKVRemoteIndexStore(cfg KVRemoteIndexStoreConfig) (*KVRemoteIndexStore, e if chunkSize < minKVChunkSize || chunkSize > maxKVChunkSize { return nil, fmt.Errorf("chunk size %d out of range [%d, %d]", chunkSize, minKVChunkSize, maxKVChunkSize) } + chunkConcurrency := cmp.Or(cfg.ChunkConcurrency, 1) + if chunkConcurrency < 1 || chunkConcurrency > maxKVChunkConcurrency { + return nil, fmt.Errorf("chunk concurrency %d out of range [1, %d]", chunkConcurrency, maxKVChunkConcurrency) + } return &KVRemoteIndexStore{ - store: cfg.KV, - leaseMgr: cfg.LeaseManager, - buildLockOpts: cfg.BuildLock, - cleanupLockOpts: cfg.CleanupLock, - chunkSize: chunkSize, - log: log.New("kv-remote-index-store"), + store: cfg.KV, + leaseMgr: cfg.LeaseManager, + buildLockOpts: cfg.BuildLock, + cleanupLockOpts: cfg.CleanupLock, + chunkSize: chunkSize, + chunkConcurrency: chunkConcurrency, + log: log.New("kv-remote-index-store"), }, nil } @@ -258,18 +277,20 @@ func (s *KVRemoteIndexStore) WriteSnapshotFile(ctx context.Context, nsResource r } numChunks := (size + s.chunkSize - 1) / s.chunkSize + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(s.chunkConcurrency) for chunkIdx := range numChunks { offset := chunkIdx * s.chunkSize length := s.chunkSize if remaining := size - offset; remaining < length { length = remaining } - section := io.NewSectionReader(src, offset, length) - if err := s.writeChunk(ctx, nsResource, indexKey, relPath, chunkIdx, section); err != nil { - return err - } + g.Go(func() error { + section := io.NewSectionReader(src, offset, length) + return s.writeChunk(gctx, nsResource, indexKey, relPath, chunkIdx, section) + }) } - return nil + return g.Wait() } // writeChunk uploads a single chunk to its KV key. The reader is fully @@ -326,26 +347,40 @@ func (s *KVRemoteIndexStore) ReadSnapshotFile(ctx context.Context, nsResource re // Chunk 0 covered some but not all of the file, so its size IS the // writer's chunk size; fetch the rest accordingly. chunkSize := n0 - written := n0 - for chunkIdx := int64(1); written < expectedSize; chunkIdx++ { - // Cap each subsequent chunk at chunkSize+1 to detect over-read - // without buffering the whole chunk first. - n, err := s.readChunk(ctx, nsResource, indexKey, relPath, chunkIdx, dst, chunkSize+1) - if err != nil { - if errors.Is(err, kv.ErrNotFound) { - return fmt.Errorf("chunk %d of %s missing: %w", chunkIdx, relPath, err) + numChunks := (expectedSize + chunkSize - 1) / chunkSize + + // Parallel reads write to their assigned slot in dst via + // io.OffsetWriter, so chunks can land in any order. + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(s.chunkConcurrency) + for chunkIdx := int64(1); chunkIdx < numChunks; chunkIdx++ { + offset := chunkIdx * chunkSize + chunkMax := chunkSize + if chunkIdx == numChunks-1 { + // Tail chunk may be short; cap at the remaining bytes. + chunkMax = expectedSize - offset + } + g.Go(func() error { + // Cap each chunk at chunkMax+1 to detect over-read without + // buffering the whole chunk first. + out := io.NewOffsetWriter(dst, offset) + n, err := s.readChunk(gctx, nsResource, indexKey, relPath, chunkIdx, out, chunkMax+1) + if err != nil { + if errors.Is(err, kv.ErrNotFound) { + return fmt.Errorf("chunk %d of %s missing: %w", chunkIdx, relPath, err) + } + return err } - return err - } - if n == 0 { - return fmt.Errorf("chunk %d of %s is empty", chunkIdx, relPath) - } - written += n - if written > expectedSize { - return fmt.Errorf("remote object %s exceeds expected size %d: %w", relPath, expectedSize, resource.ErrWriteLimitExceeded) - } + if n == 0 { + return fmt.Errorf("chunk %d of %s is empty", chunkIdx, relPath) + } + if n > chunkMax { + return fmt.Errorf("remote object %s chunk %d exceeds expected size: %w", relPath, chunkIdx, resource.ErrWriteLimitExceeded) + } + return nil + }) } - return nil + return g.Wait() } // readChunk fetches one chunk into dst, capped at maxBytes to detect diff --git a/pkg/storage/unified/search/kv_remote_index_store_test.go b/pkg/storage/unified/search/kv_remote_index_store_test.go index 46ebb25df29..a27b5f10a53 100644 --- a/pkg/storage/unified/search/kv_remote_index_store_test.go +++ b/pkg/storage/unified/search/kv_remote_index_store_test.go @@ -734,6 +734,29 @@ func TestKVRemoteIndexStore_New_RejectsInvalidChunkSize(t *testing.T) { require.Equal(t, defaultKVChunkSize, s.chunkSize) } +func TestKVRemoteIndexStore_New_RejectsInvalidChunkConcurrency(t *testing.T) { + store := newTestBadgerKV(t) + mgr := newTestLeaseManager(t, store, "owner") + + // Negative is rejected. + _, err := NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr, ChunkConcurrency: -1}) + require.ErrorContains(t, err, "chunk concurrency") + + // Above maximum is rejected. + _, err = NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr, ChunkConcurrency: maxKVChunkConcurrency + 1}) + require.ErrorContains(t, err, "chunk concurrency") + + // Zero defaults to serial (1). + s, err := NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr}) + require.NoError(t, err) + require.Equal(t, 1, s.chunkConcurrency) + + // Valid value is preserved. + s, err = NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr, ChunkConcurrency: 8}) + require.NoError(t, err) + require.Equal(t, 8, s.chunkConcurrency) +} + func TestKVRemoteIndexStore_ChunkedRoundTrip(t *testing.T) { // Round-trips files spanning several chunk-count regimes through the // public Write/Read API and verifies byte-for-byte identity. The small @@ -768,6 +791,33 @@ func TestKVRemoteIndexStore_ChunkedRoundTrip(t *testing.T) { } } +// TestKVRemoteIndexStore_ParallelChunkIO_RoundTrip verifies that the +// parallel write and read paths produce the same bytes as the serial +// path. A many-chunk file is round-tripped with a range of concurrency +// values; each must reconstruct dst byte-for-byte. +func TestKVRemoteIndexStore_ParallelChunkIO_RoundTrip(t *testing.T) { + ns := newTestNsResource() + ctx := t.Context() + const chunkSize int64 = 4096 + // Pick a size that produces a partial tail chunk to exercise the + // numChunks-1 short-chunk branch on the read side. + const size int64 = chunkSize*7 + 123 + + for _, concurrency := range []int{1, 2, 4, 8} { + t.Run(fmt.Sprintf("concurrency=%d", concurrency), func(t *testing.T) { + store := newChunkSizedConcurrentTestStore(t, chunkSize, concurrency) + + key := ulid.Make() + src, want := newTempFileWithContent(t, size) + require.NoError(t, store.WriteSnapshotFile(ctx, ns, key, "f", src)) + + dst := newTempOSFile(t) + require.NoError(t, store.ReadSnapshotFile(ctx, ns, key, "f", dst, size)) + require.Equal(t, want, readAllFromFile(t, dst)) + }) + } +} + func TestKVRemoteIndexStore_ChunkSizeIndependence(t *testing.T) { // A file written with one ChunkSize must round-trip through a reader // configured with a different ChunkSize. The reader recovers the @@ -868,6 +918,16 @@ func newChunkSizedTestStoreOn(t *testing.T, store kv.KV, chunkSize int64) *KVRem return s } +// newChunkSizedConcurrentTestStore is like newChunkSizedTestStore but +// also sets ChunkConcurrency so callers can exercise the parallel I/O +// path with a controlled fan-out. +func newChunkSizedConcurrentTestStore(t *testing.T, chunkSize int64, concurrency int) *KVRemoteIndexStore { + t.Helper() + s := newChunkSizedTestStore(t, chunkSize) + s.chunkConcurrency = concurrency + return s +} + // newTempFileWithContent writes size bytes of a deterministic pattern to a // fresh temp file and returns both the open file (positioned at 0) and the // bytes it contains, so callers can use the file as a snapshot source and diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index 70cf04d6129..0273bc6c09f 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -611,8 +611,9 @@ func buildKVSnapshotStore(cfg *setting.Cfg, backend resource.StorageBackend, log } store, err := search.NewKVRemoteIndexStore(search.KVRemoteIndexStoreConfig{ - KV: kvBackend.KV(), - LeaseManager: leaseMgr, + KV: kvBackend.KV(), + LeaseManager: leaseMgr, + ChunkConcurrency: cfg.IndexSnapshotKVChunkConcurrency, }) if err != nil { return nil, fmt.Errorf("building KV remote index store: %w", err)