From eb8e127dcd10c0222065cd06e72fd33b568f4c25 Mon Sep 17 00:00:00 2001 From: Donggyu Kim Date: Sun, 26 Apr 2026 17:19:11 +0900 Subject: [PATCH] Refactor: Change rechunker.Index to interface Change rechunker.Index to interface type, so that the index is expandable to custom types --- internal/rechunker/blob_cache.go | 14 ++++---- internal/rechunker/rechunker.go | 58 +++++++++++++++++++++++--------- internal/rechunker/scheduler.go | 6 ++-- internal/rechunker/worker.go | 6 ++-- 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/internal/rechunker/blob_cache.go b/internal/rechunker/blob_cache.go index e402df636..218104cc4 100644 --- a/internal/rechunker/blob_cache.go +++ b/internal/rechunker/blob_cache.go @@ -15,7 +15,7 @@ type BlobCache struct { mu sync.RWMutex c *simplelru.LRU[restic.ID, []byte] - idx *Index + idx Index free, size int @@ -31,7 +31,7 @@ type BlobCache struct { const overhead = len(restic.ID{}) + 64 func NewBlobCache(ctx context.Context, size int, numDownloaders int, - repo PackLoader, idx *Index, + repo PackLoader, idx Index, onReady func(blobIDs restic.IDs), onEvict func(blobIDs restic.IDs)) *BlobCache { if size < 32*(1<<20) { panic("Blob cache size should be at least 32 MiB!!") @@ -115,7 +115,7 @@ func (c *BlobCache) startDownloaders(ctx context.Context, numDownloaders int, // filter out ignored blobs c.mu.RLock() var filtered []restic.Blob - for _, blob := range c.idx.PackToBlobs[packID] { + for _, blob := range c.idx.PackToBlobs(packID) { ignored := c.ignored.Has(blob.ID) ready := c.c.Contains(blob.ID) if !ignored && !ready { @@ -260,13 +260,13 @@ func (c *BlobCache) asyncGet(ctx context.Context, id restic.ID, buf []byte) <-ch } func (c *BlobCache) requestDownload(ctx context.Context, id restic.ID) error { - packID, ok := c.idx.BlobToPack[id] - if !ok { + packID := c.idx.BlobToPack(id) + if packID.IsNull() { return fmt.Errorf("unknown blob: %v", id.Str()) } c.mu.Lock() - ok = c.waitList.Has(packID) + ok := c.waitList.Has(packID) if !ok { // queue pack download c.waitList.Insert(packID) @@ -332,7 +332,7 @@ type PackLoader interface { LoadBlobsFromPack(context.Context, restic.ID, []restic.Blob, func(restic.BlobHandle, []byte, error) error) error } -func WrapWithCache(ctx context.Context, repo PackLoader, cacheSize int, numDownloaders int, idx *Index, +func WrapWithCache(ctx context.Context, repo PackLoader, cacheSize int, numDownloaders int, idx Index, onReady, onEvict func(restic.IDs)) (*BlobLoaderWithCache, *BlobCache) { r := &BlobLoaderWithCache{ repo: repo, diff --git a/internal/rechunker/rechunker.go b/internal/rechunker/rechunker.go index ba1385630..e991d2a61 100644 --- a/internal/rechunker/rechunker.go +++ b/internal/rechunker/rechunker.go @@ -19,7 +19,7 @@ import ( type Rechunker struct { cfg Config - idx *Index + idx Index filesList []*ChunkedFile totalSize uint64 @@ -41,11 +41,11 @@ type ChunkedFile struct { hashval restic.ID } -// Index is immutable after Plan() returns. -type Index struct { - BlobSize map[restic.ID]uint // blob ID -> blob size - BlobToPack map[restic.ID]restic.ID // blob ID -> pack ID - PackToBlobs map[restic.ID][]restic.Blob // pack ID -> list of blobs to be loaded from the pack +type Index interface { + BlobSize(blobID restic.ID) (size uint) + BlobToPack(blobID restic.ID) (packID restic.ID) + PackToBlobs(packID restic.ID) (blobs []restic.Blob) + Packs() (packIDs restic.IDSet) } func NewRechunker(cfg Config) *Rechunker { @@ -129,7 +129,33 @@ func gatherFileContents(ctx context.Context, repo restic.Loader, rootTrees resti return filesList, totalSize, nil } -func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id restic.ID) []restic.PackedBlob) (*Index, error) { +type index struct { + blobSize map[restic.ID]uint // blob ID -> blob size + blobIdx map[restic.ID]restic.ID // blob ID -> pack ID + packIdx map[restic.ID][]restic.Blob // pack ID -> list of blobs to be loaded from the pack +} + +func (i *index) BlobSize(id restic.ID) uint { + return i.blobSize[id] +} + +func (i *index) BlobToPack(id restic.ID) restic.ID { + return i.blobIdx[id] +} + +func (i *index) PackToBlobs(id restic.ID) []restic.Blob { + return i.packIdx[id] +} + +func (i *index) Packs() restic.IDSet { + ids := restic.NewIDSet() + for id := range i.packIdx { + ids.Insert(id) + } + return ids +} + +func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id restic.ID) []restic.PackedBlob) (Index, error) { // collect blob usage info blobCount := map[restic.ID]int{} for _, file := range filesList { @@ -145,8 +171,8 @@ func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id // build blob lookup info blobSize := map[restic.ID]uint{} - blobToPack := map[restic.ID]restic.ID{} - packToBlobs := map[restic.ID][]restic.Blob{} + blobIdx := map[restic.ID]restic.ID{} + packIdx := map[restic.ID][]restic.Blob{} for blob := range blobCount { packs := lookupBlob(restic.DataBlob, blob) if len(packs) == 0 { @@ -155,14 +181,14 @@ func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id pb := packs[0] blobSize[pb.Blob.ID] = pb.DataLength() - blobToPack[pb.Blob.ID] = pb.PackID - packToBlobs[pb.PackID] = append(packToBlobs[pb.PackID], pb.Blob) + blobIdx[pb.Blob.ID] = pb.PackID + packIdx[pb.PackID] = append(packIdx[pb.PackID], pb.Blob) } - idx := &Index{ - BlobSize: blobSize, - BlobToPack: blobToPack, - PackToBlobs: packToBlobs, + idx := &index{ + blobSize: blobSize, + blobIdx: blobIdx, + packIdx: packIdx, } return idx, nil @@ -373,7 +399,7 @@ func (rc *Rechunker) TotalSize() uint64 { } func (rc *Rechunker) PackCount() int { - return len(rc.idx.PackToBlobs) + return len(rc.idx.Packs()) } func (rc *Rechunker) TotalAddedToDstRepo() uint64 { diff --git a/internal/rechunker/scheduler.go b/internal/rechunker/scheduler.go index 4343b2872..61063cf29 100644 --- a/internal/rechunker/scheduler.go +++ b/internal/rechunker/scheduler.go @@ -12,7 +12,7 @@ import ( type Scheduler struct { mu sync.Mutex - idx *Index + idx Index regularCh <-chan *ChunkedFile priorityCh <-chan *ChunkedFile @@ -31,7 +31,7 @@ type Scheduler struct { done chan struct{} } -func NewScheduler(ctx context.Context, files []*ChunkedFile, idx *Index, usePriority bool) *Scheduler { +func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePriority bool) *Scheduler { debug.Log(("Running NewScheduler()")) wg, ctx := errgroup.WithContext(ctx) @@ -281,7 +281,7 @@ func (s *Scheduler) SetObsoleteBlobCallback(cb func(restic.IDs)) { // ReadProgress computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage. func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, error) { start := cursor - end, err := AdvanceCursor(cursor, bytesProcessed, s.idx.BlobSize) + end, err := AdvanceCursor(start, bytesProcessed, s.idx.BlobSize) if err != nil { return Cursor{}, err } diff --git a/internal/rechunker/worker.go b/internal/rechunker/worker.go index 5174618d9..124451ffe 100644 --- a/internal/rechunker/worker.go +++ b/internal/rechunker/worker.go @@ -235,10 +235,10 @@ type Cursor struct { Offset uint } -func AdvanceCursor(c Cursor, numBytes uint, blobSizes map[restic.ID]uint) (Cursor, error) { +func AdvanceCursor(c Cursor, numBytes uint, blobSizes func(restic.ID) uint) (Cursor, error) { for c.BlobIdx < len(c.blobs) { - blobSize, ok := blobSizes[c.blobs[c.BlobIdx]] - if !ok { + blobSize := blobSizes(c.blobs[c.BlobIdx]) + if blobSize == 0 { return Cursor{}, fmt.Errorf("blob %v not in blobSizes", c.blobs[c.BlobIdx].Str()) } r := blobSize - c.Offset