diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index c0ca22db520..4f6b9914a81 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -683,6 +683,7 @@ type Cfg struct { IndexSnapshotEnabled bool // Enable remote index snapshots IndexSnapshotBucketURL string // Go CDK bucket URL for snapshot storage (s3://, gs://, azblob://, mem://, file:///) IndexSnapshotStorageKV bool // Store snapshots in the same KV used by the storage backend instead of an object-storage bucket. Mutually exclusive with index_snapshot_bucket_url; requires enable_kv_leases. + IndexSnapshotKVChunkConcurrency int // Per-file chunk I/O fan-out for KV-backed snapshots. 0 / 1 = serial. Used only when index_snapshot_storage_kv is true. IndexSnapshotThreshold int // Min doc count to use remote snapshots (must be >= IndexFileThreshold, default: 5000) IndexSnapshotMaxAge time.Duration // Max snapshot age before deletion (must be >= MaxFileIndexAge, default: 7d) IndexSnapshotCleanupGracePeriod time.Duration // Time a new snapshot must exist before its predecessor in the same Grafana-version group is eligible for cleanup (default: 30m) diff --git a/pkg/setting/setting_unified_storage.go b/pkg/setting/setting_unified_storage.go index 5f3013a9433..764a91a322a 100644 --- a/pkg/setting/setting_unified_storage.go +++ b/pkg/setting/setting_unified_storage.go @@ -262,6 +262,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() { cfg.IndexSnapshotEnabled = section.Key("index_snapshot_enabled").MustBool(false) cfg.IndexSnapshotBucketURL = section.Key("index_snapshot_bucket_url").String() cfg.IndexSnapshotStorageKV = section.Key("index_snapshot_storage_kv").MustBool(false) + cfg.IndexSnapshotKVChunkConcurrency = section.Key("index_snapshot_kv_chunk_concurrency").MustInt(1) cfg.IndexSnapshotThreshold = section.Key("index_snapshot_threshold").MustInt(5000) if cfg.IndexSnapshotThreshold < cfg.IndexFileThreshold { cfg.Logger.Warn("index_snapshot_threshold is smaller than index_file_threshold, overriding", "configured", cfg.IndexSnapshotThreshold, "index_file_threshold", cfg.IndexFileThreshold) diff --git a/pkg/storage/unified/search/kv_remote_index_store.go b/pkg/storage/unified/search/kv_remote_index_store.go index 9ea441af0a7..c570aeaa000 100644 --- a/pkg/storage/unified/search/kv_remote_index_store.go +++ b/pkg/storage/unified/search/kv_remote_index_store.go @@ -13,6 +13,7 @@ import ( "time" "github.com/oklog/ulid/v2" + "golang.org/x/sync/errgroup" "github.com/grafana/grafana/pkg/apimachinery/validation" "github.com/grafana/grafana/pkg/infra/log" @@ -57,6 +58,12 @@ const ( minKVChunkSize int64 = 1024 * 1024 maxKVChunkSize int64 = 1024 * 1024 * 1024 + // maxKVChunkConcurrency caps the per-file parallelism for chunk I/O. + // The cap is a sanity bound: values much higher than this hit + // diminishing returns and risk overloading the KV backend or + // exhausting connection pools. + maxKVChunkConcurrency = 32 + // kvChunkSuffixFormat zero-pads the chunk index so listing returns // chunks in numeric order. Six digits supports up to a million chunks // per file, which is comfortable headroom even at the minimum chunk @@ -85,6 +92,12 @@ type KVRemoteIndexStoreConfig struct { // length, so writers can change ChunkSize between deployments without // breaking existing snapshots. Zero falls back to defaultKVChunkSize. ChunkSize int64 + + // ChunkConcurrency bounds the number of chunks uploaded or downloaded + // in parallel per snapshot file. Zero means 1 (fully serial). On the + // read side, chunk 0 is always read serially first (it reveals the + // writer's chunk size); only chunks 1..N-1 are parallelized. + ChunkConcurrency int } // KVRemoteIndexStore implements RemoteIndexStore on top of a kv.KV. @@ -127,12 +140,13 @@ type KVRemoteIndexStoreConfig struct { // not surface incomplete uploads; the cleanup pass uses // ListIndexKeysIncludingIncomplete to get that wider view. type KVRemoteIndexStore struct { - store kv.KV - leaseMgr *lease.Manager - buildLockOpts LockOptions - cleanupLockOpts LockOptions - chunkSize int64 - log log.Logger + store kv.KV + leaseMgr *lease.Manager + buildLockOpts LockOptions + cleanupLockOpts LockOptions + chunkSize int64 + chunkConcurrency int + log log.Logger } // NewKVRemoteIndexStore creates a KVRemoteIndexStore from cfg. Returns an @@ -148,13 +162,18 @@ func NewKVRemoteIndexStore(cfg KVRemoteIndexStoreConfig) (*KVRemoteIndexStore, e if chunkSize < minKVChunkSize || chunkSize > maxKVChunkSize { return nil, fmt.Errorf("chunk size %d out of range [%d, %d]", chunkSize, minKVChunkSize, maxKVChunkSize) } + chunkConcurrency := cmp.Or(cfg.ChunkConcurrency, 1) + if chunkConcurrency < 1 || chunkConcurrency > maxKVChunkConcurrency { + return nil, fmt.Errorf("chunk concurrency %d out of range [1, %d]", chunkConcurrency, maxKVChunkConcurrency) + } return &KVRemoteIndexStore{ - store: cfg.KV, - leaseMgr: cfg.LeaseManager, - buildLockOpts: cfg.BuildLock, - cleanupLockOpts: cfg.CleanupLock, - chunkSize: chunkSize, - log: log.New("kv-remote-index-store"), + store: cfg.KV, + leaseMgr: cfg.LeaseManager, + buildLockOpts: cfg.BuildLock, + cleanupLockOpts: cfg.CleanupLock, + chunkSize: chunkSize, + chunkConcurrency: chunkConcurrency, + log: log.New("kv-remote-index-store"), }, nil } @@ -258,18 +277,20 @@ func (s *KVRemoteIndexStore) WriteSnapshotFile(ctx context.Context, nsResource r } numChunks := (size + s.chunkSize - 1) / s.chunkSize + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(s.chunkConcurrency) for chunkIdx := range numChunks { offset := chunkIdx * s.chunkSize length := s.chunkSize if remaining := size - offset; remaining < length { length = remaining } - section := io.NewSectionReader(src, offset, length) - if err := s.writeChunk(ctx, nsResource, indexKey, relPath, chunkIdx, section); err != nil { - return err - } + g.Go(func() error { + section := io.NewSectionReader(src, offset, length) + return s.writeChunk(gctx, nsResource, indexKey, relPath, chunkIdx, section) + }) } - return nil + return g.Wait() } // writeChunk uploads a single chunk to its KV key. The reader is fully @@ -326,26 +347,40 @@ func (s *KVRemoteIndexStore) ReadSnapshotFile(ctx context.Context, nsResource re // Chunk 0 covered some but not all of the file, so its size IS the // writer's chunk size; fetch the rest accordingly. chunkSize := n0 - written := n0 - for chunkIdx := int64(1); written < expectedSize; chunkIdx++ { - // Cap each subsequent chunk at chunkSize+1 to detect over-read - // without buffering the whole chunk first. - n, err := s.readChunk(ctx, nsResource, indexKey, relPath, chunkIdx, dst, chunkSize+1) - if err != nil { - if errors.Is(err, kv.ErrNotFound) { - return fmt.Errorf("chunk %d of %s missing: %w", chunkIdx, relPath, err) + numChunks := (expectedSize + chunkSize - 1) / chunkSize + + // Parallel reads write to their assigned slot in dst via + // io.OffsetWriter, so chunks can land in any order. + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(s.chunkConcurrency) + for chunkIdx := int64(1); chunkIdx < numChunks; chunkIdx++ { + offset := chunkIdx * chunkSize + chunkMax := chunkSize + if chunkIdx == numChunks-1 { + // Tail chunk may be short; cap at the remaining bytes. + chunkMax = expectedSize - offset + } + g.Go(func() error { + // Cap each chunk at chunkMax+1 to detect over-read without + // buffering the whole chunk first. + out := io.NewOffsetWriter(dst, offset) + n, err := s.readChunk(gctx, nsResource, indexKey, relPath, chunkIdx, out, chunkMax+1) + if err != nil { + if errors.Is(err, kv.ErrNotFound) { + return fmt.Errorf("chunk %d of %s missing: %w", chunkIdx, relPath, err) + } + return err } - return err - } - if n == 0 { - return fmt.Errorf("chunk %d of %s is empty", chunkIdx, relPath) - } - written += n - if written > expectedSize { - return fmt.Errorf("remote object %s exceeds expected size %d: %w", relPath, expectedSize, resource.ErrWriteLimitExceeded) - } + if n == 0 { + return fmt.Errorf("chunk %d of %s is empty", chunkIdx, relPath) + } + if n > chunkMax { + return fmt.Errorf("remote object %s chunk %d exceeds expected size: %w", relPath, chunkIdx, resource.ErrWriteLimitExceeded) + } + return nil + }) } - return nil + return g.Wait() } // readChunk fetches one chunk into dst, capped at maxBytes to detect diff --git a/pkg/storage/unified/search/kv_remote_index_store_test.go b/pkg/storage/unified/search/kv_remote_index_store_test.go index 46ebb25df29..a27b5f10a53 100644 --- a/pkg/storage/unified/search/kv_remote_index_store_test.go +++ b/pkg/storage/unified/search/kv_remote_index_store_test.go @@ -734,6 +734,29 @@ func TestKVRemoteIndexStore_New_RejectsInvalidChunkSize(t *testing.T) { require.Equal(t, defaultKVChunkSize, s.chunkSize) } +func TestKVRemoteIndexStore_New_RejectsInvalidChunkConcurrency(t *testing.T) { + store := newTestBadgerKV(t) + mgr := newTestLeaseManager(t, store, "owner") + + // Negative is rejected. + _, err := NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr, ChunkConcurrency: -1}) + require.ErrorContains(t, err, "chunk concurrency") + + // Above maximum is rejected. + _, err = NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr, ChunkConcurrency: maxKVChunkConcurrency + 1}) + require.ErrorContains(t, err, "chunk concurrency") + + // Zero defaults to serial (1). + s, err := NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr}) + require.NoError(t, err) + require.Equal(t, 1, s.chunkConcurrency) + + // Valid value is preserved. + s, err = NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{KV: store, LeaseManager: mgr, ChunkConcurrency: 8}) + require.NoError(t, err) + require.Equal(t, 8, s.chunkConcurrency) +} + func TestKVRemoteIndexStore_ChunkedRoundTrip(t *testing.T) { // Round-trips files spanning several chunk-count regimes through the // public Write/Read API and verifies byte-for-byte identity. The small @@ -768,6 +791,33 @@ func TestKVRemoteIndexStore_ChunkedRoundTrip(t *testing.T) { } } +// TestKVRemoteIndexStore_ParallelChunkIO_RoundTrip verifies that the +// parallel write and read paths produce the same bytes as the serial +// path. A many-chunk file is round-tripped with a range of concurrency +// values; each must reconstruct dst byte-for-byte. +func TestKVRemoteIndexStore_ParallelChunkIO_RoundTrip(t *testing.T) { + ns := newTestNsResource() + ctx := t.Context() + const chunkSize int64 = 4096 + // Pick a size that produces a partial tail chunk to exercise the + // numChunks-1 short-chunk branch on the read side. + const size int64 = chunkSize*7 + 123 + + for _, concurrency := range []int{1, 2, 4, 8} { + t.Run(fmt.Sprintf("concurrency=%d", concurrency), func(t *testing.T) { + store := newChunkSizedConcurrentTestStore(t, chunkSize, concurrency) + + key := ulid.Make() + src, want := newTempFileWithContent(t, size) + require.NoError(t, store.WriteSnapshotFile(ctx, ns, key, "f", src)) + + dst := newTempOSFile(t) + require.NoError(t, store.ReadSnapshotFile(ctx, ns, key, "f", dst, size)) + require.Equal(t, want, readAllFromFile(t, dst)) + }) + } +} + func TestKVRemoteIndexStore_ChunkSizeIndependence(t *testing.T) { // A file written with one ChunkSize must round-trip through a reader // configured with a different ChunkSize. The reader recovers the @@ -868,6 +918,16 @@ func newChunkSizedTestStoreOn(t *testing.T, store kv.KV, chunkSize int64) *KVRem return s } +// newChunkSizedConcurrentTestStore is like newChunkSizedTestStore but +// also sets ChunkConcurrency so callers can exercise the parallel I/O +// path with a controlled fan-out. +func newChunkSizedConcurrentTestStore(t *testing.T, chunkSize int64, concurrency int) *KVRemoteIndexStore { + t.Helper() + s := newChunkSizedTestStore(t, chunkSize) + s.chunkConcurrency = concurrency + return s +} + // newTempFileWithContent writes size bytes of a deterministic pattern to a // fresh temp file and returns both the open file (positioned at 0) and the // bytes it contains, so callers can use the file as a snapshot source and diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index 70cf04d6129..0273bc6c09f 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -611,8 +611,9 @@ func buildKVSnapshotStore(cfg *setting.Cfg, backend resource.StorageBackend, log } store, err := search.NewKVRemoteIndexStore(search.KVRemoteIndexStoreConfig{ - KV: kvBackend.KV(), - LeaseManager: leaseMgr, + KV: kvBackend.KV(), + LeaseManager: leaseMgr, + ChunkConcurrency: cfg.IndexSnapshotKVChunkConcurrency, }) if err != nil { return nil, fmt.Errorf("building KV remote index store: %w", err)