Search: parallel chunk I/O for KV-backed snapshots (#126090)

WriteSnapshotFile and ReadSnapshotFile previously walked chunks serially.
On the write side every chunk is independent, and on the read side chunks
1..N-1 are independent once chunk 0 has been fetched (its size reveals the
writer's chunk size). Both paths now run their chunk loops through an
errgroup with a configurable fan-out.

Concurrency is exposed as index_snapshot_kv_chunk_concurrency in cfg
and plumbed to KVRemoteIndexStoreConfig.ChunkConcurrency. The default
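is 1, which is the previous serial behavior. Operators can raise it in
configuration to measure throughput tradeoffs without a code change; 0 is
treated as 1, and negative values or values above 32 are rejected when the
store is constructed.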

Parallel reads write to dst via io.NewOffsetWriter, one per chunk, so
no per-chunk buffer is held in memory. *os.File.WriteAt is safe under
concurrent use to non-overlapping regions, which is what our slots are
by construction.
This commit is contained in:
Peter Štibraný 2026-06-09 20:12:08 +02:00 committed by GitHub
parent 3e351e9f9c
commit a960388db2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 135 additions and 37 deletions

View file

@ -683,6 +683,7 @@ type Cfg struct {
IndexSnapshotEnabled bool // Enable remote index snapshots
IndexSnapshotBucketURL string // Go CDK bucket URL for snapshot storage (s3://, gs://, azblob://, mem://, file:///)
IndexSnapshotStorageKV bool // Store snapshots in the same KV used by the storage backend instead of an object-storage bucket. Mutually exclusive with index_snapshot_bucket_url; requires enable_kv_leases.
IndexSnapshotKVChunkConcurrency int // Per-file chunk I/O fan-out for KV-backed snapshots. 0 / 1 = serial. Used only when index_snapshot_storage_kv is true.
IndexSnapshotThreshold int // Min doc count to use remote snapshots (must be >= IndexFileThreshold, default: 5000)
IndexSnapshotMaxAge time.Duration // Max snapshot age before deletion (must be >= MaxFileIndexAge, default: 7d)
IndexSnapshotCleanupGracePeriod time.Duration // Time a new snapshot must exist before its predecessor in the same Grafana-version group is eligible for cleanup (default: 30m)

View file

@ -262,6 +262,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
cfg.IndexSnapshotEnabled = section.Key("index_snapshot_enabled").MustBool(false)
cfg.IndexSnapshotBucketURL = section.Key("index_snapshot_bucket_url").String()
cfg.IndexSnapshotStorageKV = section.Key("index_snapshot_storage_kv").MustBool(false)
cfg.IndexSnapshotKVChunkConcurrency = section.Key("index_snapshot_kv_chunk_concurrency").MustInt(1)
cfg.IndexSnapshotThreshold = section.Key("index_snapshot_threshold").MustInt(5000)
if cfg.IndexSnapshotThreshold < cfg.IndexFileThreshold {
cfg.Logger.Warn("index_snapshot_threshold is smaller than index_file_threshold, overriding", "configured", cfg.IndexSnapshotThreshold, "index_file_threshold", cfg.IndexFileThreshold)

View file

@ -13,6 +13,7 @@ import (
"time"
"github.com/oklog/ulid/v2"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/apimachinery/validation"
"github.com/grafana/grafana/pkg/infra/log"
@ -57,6 +58,12 @@ const (
minKVChunkSize int64 = 1024 * 1024
maxKVChunkSize int64 = 1024 * 1024 * 1024
// maxKVChunkConcurrency caps the per-file parallelism for chunk I/O.
// The cap is a sanity bound: values much higher than this hit
// diminishing returns and risk overloading the KV backend or
// exhausting connection pools.
maxKVChunkConcurrency = 32
// kvChunkSuffixFormat zero-pads the chunk index so listing returns
// chunks in numeric order. Six digits supports up to a million chunks
// per file, which is comfortable headroom even at the minimum chunk
@ -85,6 +92,12 @@ type KVRemoteIndexStoreConfig struct {
// length, so writers can change ChunkSize between deployments without
// breaking existing snapshots. Zero falls back to defaultKVChunkSize.
ChunkSize int64
// ChunkConcurrency bounds the number of chunks uploaded or downloaded
// in parallel per snapshot file. Zero means 1 (fully serial). On the
// read side, chunk 0 is always read serially first (it reveals the
// writer's chunk size); only chunks 1..N-1 are parallelized.
ChunkConcurrency int
}
// KVRemoteIndexStore implements RemoteIndexStore on top of a kv.KV.
@ -127,12 +140,13 @@ type KVRemoteIndexStoreConfig struct {
// not surface incomplete uploads; the cleanup pass uses
// ListIndexKeysIncludingIncomplete to get that wider view.
type KVRemoteIndexStore struct {
store kv.KV
leaseMgr *lease.Manager
buildLockOpts LockOptions
cleanupLockOpts LockOptions
chunkSize int64
log log.Logger
store kv.KV
leaseMgr *lease.Manager
buildLockOpts LockOptions
cleanupLockOpts LockOptions
chunkSize int64
chunkConcurrency int
log log.Logger
}
// NewKVRemoteIndexStore creates a KVRemoteIndexStore from cfg. Returns an
@ -148,13 +162,18 @@ func NewKVRemoteIndexStore(cfg KVRemoteIndexStoreConfig) (*KVRemoteIndexStore, e
if chunkSize < minKVChunkSize || chunkSize > maxKVChunkSize {
return nil, fmt.Errorf("chunk size %d out of range [%d, %d]", chunkSize, minKVChunkSize, maxKVChunkSize)
}
chunkConcurrency := cmp.Or(cfg.ChunkConcurrency, 1)
if chunkConcurrency < 1 || chunkConcurrency > maxKVChunkConcurrency {
return nil, fmt.Errorf("chunk concurrency %d out of range [1, %d]", chunkConcurrency, maxKVChunkConcurrency)
}
return &KVRemoteIndexStore{
store: cfg.KV,
leaseMgr: cfg.LeaseManager,
buildLockOpts: cfg.BuildLock,
cleanupLockOpts: cfg.CleanupLock,
chunkSize: chunkSize,
log: log.New("kv-remote-index-store"),
store: cfg.KV,
leaseMgr: cfg.LeaseManager,
buildLockOpts: cfg.BuildLock,
cleanupLockOpts: cfg.CleanupLock,
chunkSize: chunkSize,
chunkConcurrency: chunkConcurrency,
log: log.New("kv-remote-index-store"),
}, nil
}
@ -258,18 +277,20 @@ func (s *KVRemoteIndexStore) WriteSnapshotFile(ctx context.Context, nsResource r
}
numChunks := (size + s.chunkSize - 1) / s.chunkSize
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(s.chunkConcurrency)
for chunkIdx := range numChunks {
offset := chunkIdx * s.chunkSize
length := s.chunkSize
if remaining := size - offset; remaining < length {
length = remaining
}
section := io.NewSectionReader(src, offset, length)
if err := s.writeChunk(ctx, nsResource, indexKey, relPath, chunkIdx, section); err != nil {
return err
}
g.Go(func() error {
section := io.NewSectionReader(src, offset, length)
return s.writeChunk(gctx, nsResource, indexKey, relPath, chunkIdx, section)
})
}
return nil
return g.Wait()
}
// writeChunk uploads a single chunk to its KV key. The reader is fully
@ -326,26 +347,40 @@ func (s *KVRemoteIndexStore) ReadSnapshotFile(ctx context.Context, nsResource re
// Chunk 0 covered some but not all of the file, so its size IS the
// writer's chunk size; fetch the rest accordingly.
chunkSize := n0
written := n0
for chunkIdx := int64(1); written < expectedSize; chunkIdx++ {
// Cap each subsequent chunk at chunkSize+1 to detect over-read
// without buffering the whole chunk first.
n, err := s.readChunk(ctx, nsResource, indexKey, relPath, chunkIdx, dst, chunkSize+1)
if err != nil {
if errors.Is(err, kv.ErrNotFound) {
return fmt.Errorf("chunk %d of %s missing: %w", chunkIdx, relPath, err)
numChunks := (expectedSize + chunkSize - 1) / chunkSize
// Parallel reads write to their assigned slot in dst via
// io.OffsetWriter, so chunks can land in any order.
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(s.chunkConcurrency)
for chunkIdx := int64(1); chunkIdx < numChunks; chunkIdx++ {
offset := chunkIdx * chunkSize
chunkMax := chunkSize
if chunkIdx == numChunks-1 {
// Tail chunk may be short; cap at the remaining bytes.
chunkMax = expectedSize - offset
}
g.Go(func() error {
// Cap each chunk at chunkMax+1 to detect over-read without
// buffering the whole chunk first.
out := io.NewOffsetWriter(dst, offset)
n, err := s.readChunk(gctx, nsResource, indexKey, relPath, chunkIdx, out, chunkMax+1)
if err != nil {
if errors.Is(err, kv.ErrNotFound) {
return fmt.Errorf("chunk %d of %s missing: %w", chunkIdx, relPath, err)
}
return err
}
return err
}
if n == 0 {
return fmt.Errorf("chunk %d of %s is empty", chunkIdx, relPath)
}
written += n
if written > expectedSize {
return fmt.Errorf("remote object %s exceeds expected size %d: %w", relPath, expectedSize, resource.ErrWriteLimitExceeded)
}
if n == 0 {
return fmt.Errorf("chunk %d of %s is empty", chunkIdx, relPath)
}
if n > chunkMax {
return fmt.Errorf("remote object %s chunk %d exceeds expected size: %w", relPath, chunkIdx, resource.ErrWriteLimitExceeded)
}
return nil
})
}
return nil
return g.Wait()
}
// readChunk fetches one chunk into dst, capped at maxBytes to detect

View file

@ -734,6 +734,29 @@ func TestKVRemoteIndexStore_New_RejectsInvalidChunkSize(t *testing.T) {
require.Equal(t, defaultKVChunkSize, s.chunkSize)
}
// TestKVRemoteIndexStore_New_RejectsInvalidChunkConcurrency checks how
// NewKVRemoteIndexStore treats the ChunkConcurrency setting: negative
// values and values above maxKVChunkConcurrency produce an error that
// mentions "chunk concurrency", zero is stored as 1, and an in-range
// value is stored unchanged.
func TestKVRemoteIndexStore_New_RejectsInvalidChunkConcurrency(t *testing.T) {
	kvStore := newTestBadgerKV(t)
	leases := newTestLeaseManager(t, kvStore, "owner")

	// build constructs a store whose config varies only in ChunkConcurrency.
	build := func(concurrency int) (*KVRemoteIndexStore, error) {
		return NewKVRemoteIndexStore(KVRemoteIndexStoreConfig{
			KV:               kvStore,
			LeaseManager:     leases,
			ChunkConcurrency: concurrency,
		})
	}

	// Out-of-range values: one below the minimum, one above the maximum.
	for _, invalid := range []int{-1, maxKVChunkConcurrency + 1} {
		_, err := build(invalid)
		require.ErrorContains(t, err, "chunk concurrency")
	}

	// Accepted values: zero falls back to serial (1); 8 is kept as given.
	accepted := []struct{ configured, want int }{
		{configured: 0, want: 1},
		{configured: 8, want: 8},
	}
	for _, tc := range accepted {
		s, err := build(tc.configured)
		require.NoError(t, err)
		require.Equal(t, tc.want, s.chunkConcurrency)
	}
}
func TestKVRemoteIndexStore_ChunkedRoundTrip(t *testing.T) {
// Round-trips files spanning several chunk-count regimes through the
// public Write/Read API and verifies byte-for-byte identity. The small
@ -768,6 +791,33 @@ func TestKVRemoteIndexStore_ChunkedRoundTrip(t *testing.T) {
}
}
// TestKVRemoteIndexStore_ParallelChunkIO_RoundTrip writes a multi-chunk
// file and reads it back at several chunk-concurrency settings, requiring
// the bytes that land in dst to equal the bytes that were written. The
// serial setting (1) and the parallel settings are held to the same
// expectation.
func TestKVRemoteIndexStore_ParallelChunkIO_RoundTrip(t *testing.T) {
	const (
		chunkSize int64 = 4096
		// Seven full chunks plus a 123-byte remainder: the final chunk is
		// shorter than chunkSize, so the reader's short tail-chunk branch
		// is exercised.
		size int64 = chunkSize*7 + 123
	)

	nsRes := newTestNsResource()
	ctx := t.Context()

	for _, fanOut := range []int{1, 2, 4, 8} {
		t.Run(fmt.Sprintf("concurrency=%d", fanOut), func(t *testing.T) {
			s := newChunkSizedConcurrentTestStore(t, chunkSize, fanOut)
			indexKey := ulid.Make()
			srcFile, expected := newTempFileWithContent(t, size)

			err := s.WriteSnapshotFile(ctx, nsRes, indexKey, "f", srcFile)
			require.NoError(t, err)

			dstFile := newTempOSFile(t)
			err = s.ReadSnapshotFile(ctx, nsRes, indexKey, "f", dstFile, size)
			require.NoError(t, err)

			got := readAllFromFile(t, dstFile)
			require.Equal(t, expected, got)
		})
	}
}
func TestKVRemoteIndexStore_ChunkSizeIndependence(t *testing.T) {
// A file written with one ChunkSize must round-trip through a reader
// configured with a different ChunkSize. The reader recovers the
@ -868,6 +918,16 @@ func newChunkSizedTestStoreOn(t *testing.T, store kv.KV, chunkSize int64) *KVRem
return s
}
// newChunkSizedConcurrentTestStore returns a store built by
// newChunkSizedTestStore with its chunkConcurrency field overridden, so
// tests can drive the chunk I/O paths at a chosen fan-out.
func newChunkSizedConcurrentTestStore(t *testing.T, chunkSize int64, concurrency int) *KVRemoteIndexStore {
	t.Helper()

	store := newChunkSizedTestStore(t, chunkSize)
	// The field is assigned directly on the already-built store, so this
	// helper performs no range check on concurrency.
	store.chunkConcurrency = concurrency
	return store
}
// newTempFileWithContent writes size bytes of a deterministic pattern to a
// fresh temp file and returns both the open file (positioned at 0) and the
// bytes it contains, so callers can use the file as a snapshot source and

View file

@ -611,8 +611,9 @@ func buildKVSnapshotStore(cfg *setting.Cfg, backend resource.StorageBackend, log
}
store, err := search.NewKVRemoteIndexStore(search.KVRemoteIndexStoreConfig{
KV: kvBackend.KV(),
LeaseManager: leaseMgr,
KV: kvBackend.KV(),
LeaseManager: leaseMgr,
ChunkConcurrency: cfg.IndexSnapshotKVChunkConcurrency,
})
if err != nil {
return nil, fmt.Errorf("building KV remote index store: %w", err)