From e004f1865dde7d40850847ee8b9bc758af42c98d Mon Sep 17 00:00:00 2001 From: Donggyu Kim Date: Sun, 26 Apr 2026 17:39:11 +0900 Subject: [PATCH] Refactor: Change function names Change function names in rechunker to --- cmd/restic/cmd_copy.go | 2 +- internal/rechunker/rechunker.go | 16 ++++----- internal/rechunker/scheduler.go | 61 +++++++++++++++++++++++++++++---- internal/rechunker/worker.go | 47 ++++--------------------- 4 files changed, 71 insertions(+), 55 deletions(-) diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 54dbc6167..4971b1257 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -417,7 +417,7 @@ func rechunkCopy(ctx context.Context, srcRepo, dstRepo restic.Repository, select printer.V("Number of snapshots: %v", len(rootTrees)) printer.V("Number of distinct files to process: %v", rechnker.NumFiles()) printer.V(" - Total size (including duplicate blobs): %v", ui.FormatBytes(rechnker.TotalSize())) - printer.V("Number of packs to download: %v\n\n", rechnker.PackCount()) + printer.V("Number of packs to download: %v\n\n", rechnker.NumPacks()) debug.Log("Running Rechunk()") progress.Start(rechnker.NumFiles(), rechnker.TotalSize()) diff --git a/internal/rechunker/rechunker.go b/internal/rechunker/rechunker.go index e991d2a61..7498a50a9 100644 --- a/internal/rechunker/rechunker.go +++ b/internal/rechunker/rechunker.go @@ -231,8 +231,8 @@ func (rc *Rechunker) Rechunk(ctx context.Context, srcRepo, dstRepo restic.Reposi defer debug.Log("Closing uploader") wg, ctx := errgroup.WithContext(ctx) - rc.runWorkers(ctx, wg, numWorkers, downloader, uploader, scheduler.Next, scheduler.ReadProgress, bufferPool, p) - rc.runWorkers(ctx, wg, 1, downloader, uploader, scheduler.NextPriority, scheduler.ReadProgress, bufferPool, p) + rc.runWorkers(ctx, wg, numWorkers, downloader, uploader, scheduler.Next, scheduler.progressCursor, bufferPool, p) + rc.runWorkers(ctx, wg, 1, downloader, uploader, scheduler.NextPriority, scheduler.progressCursor, bufferPool, p) return wg.Wait() }) @@ -273,7 +273,7 @@ func (rc *Rechunker) setupCache(ctx context.Context, srcRepo PackLoader, schedul func (rc *Rechunker) runWorkers(ctx context.Context, wg *errgroup.Group, numWorkers int, downloader restic.BlobLoader, uploader restic.BlobSaver, receiveJob func(context.Context) (*ChunkedFile, bool, error), - cursorProgressor func(Cursor, uint) (Cursor, error), bufferPool *BufferPool, p *Progress) { + cursorProgressor func(cursor, uint) (cursor, error), bufferPool *BufferPool, p *Progress) { for range numWorkers { wg.Go(func() error { debug.Log("Starting worker") @@ -382,10 +382,6 @@ func (rc *Rechunker) RewriteTrees(ctx context.Context, srcRepo, dstRepo restic.R return result, nil } -func (rc *Rechunker) NumFiles() int { - return len(rc.filesList) -} - func (rc *Rechunker) GetRewrittenTree(originalTree restic.ID) (restic.ID, error) { newID, ok := rc.rewriteTreeMap[originalTree] if !ok { @@ -398,7 +394,11 @@ func (rc *Rechunker) TotalSize() uint64 { return rc.totalSize } -func (rc *Rechunker) PackCount() int { +func (rc *Rechunker) NumFiles() int { + return len(rc.filesList) +} + +func (rc *Rechunker) NumPacks() int { return len(rc.idx.Packs()) } diff --git a/internal/rechunker/scheduler.go b/internal/rechunker/scheduler.go index 61063cf29..1c33cf00c 100644 --- a/internal/rechunker/scheduler.go +++ b/internal/rechunker/scheduler.go @@ -2,6 +2,7 @@ package rechunker import ( "context" + "fmt" "sync" "github.com/restic/restic/internal/debug" @@ -278,12 +279,23 @@ func (s *Scheduler) SetObsoleteBlobCallback(cb func(restic.IDs)) { s.obsoleteBlobCB = cb } -// ReadProgress computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage. -func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, error) { - start := cursor - end, err := AdvanceCursor(start, bytesProcessed, s.idx.BlobSize) +func (s *Scheduler) newCursor(blobs restic.IDs) cursor { + if s == nil { + return cursor{} + } + + return cursor{ + blobs: blobs, + blobSize: s.idx.BlobSize, + } +} + +// progressCursor computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage. +func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error) { + start := c + end, err := c.Advance(bytesProcessed) if err != nil { - return Cursor{}, err + return cursor{}, err } if s.obsoleteBlobCB == nil { @@ -294,7 +306,7 @@ func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, er return end, nil } - blobs := cursor.blobs[start.BlobIdx:end.BlobIdx] + blobs := c.blobs[start.BlobIdx:end.BlobIdx] var obsolete restic.IDs s.mu.Lock() for _, b := range blobs { @@ -314,6 +326,43 @@ func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, er return end, nil } +type cursor struct { + blobs restic.IDs + BlobIdx int + Offset uint + blobSize func(restic.ID) uint +} + +func (c cursor) Advance(numBytes uint) (cursor, error) { + if c.blobs == nil { + return cursor{}, nil + } + + for c.BlobIdx < len(c.blobs) { + blobSize := c.blobSize(c.blobs[c.BlobIdx]) + if blobSize == 0 { + return cursor{}, fmt.Errorf("unknown blob %v", c.blobs[c.BlobIdx].Str()) + } + r := blobSize - c.Offset + + if numBytes < r { + c.Offset += numBytes + numBytes = 0 + break + } + + numBytes -= r + c.BlobIdx++ + c.Offset = 0 + } + + if numBytes != 0 { + return cursor{}, fmt.Errorf("cursor out of range; %d bytes over end position", numBytes) + } + + return c, nil +} + // PrioritySelect selects from two channels with priority; first channel first. func PrioritySelect(ctx context.Context, first <-chan *ChunkedFile, second <-chan *ChunkedFile) (item *ChunkedFile, from int, err error) { // First, try to pull from channel 'first' only. If 'first' is not ready now, try both channels. diff --git a/internal/rechunker/worker.go b/internal/rechunker/worker.go index 124451ffe..33aace50d 100644 --- a/internal/rechunker/worker.go +++ b/internal/rechunker/worker.go @@ -2,7 +2,6 @@ package rechunker import ( "context" - "fmt" "io" "github.com/restic/chunker" @@ -23,12 +22,12 @@ type Worker struct { downloader restic.BlobLoader uploader restic.BlobSaver - cursorProgressor func(cursor Cursor, bytesProcessed uint) (Cursor, error) + cursorProgressor func(cursor cursor, bytesProcessed uint) (cursor, error) } func NewWorker(pol chunker.Pol, downloader restic.BlobLoader, uploader restic.BlobSaver, bufferPool *BufferPool, - cursorProgressor func(Cursor, uint) (Cursor, error), + cursorProgressor func(cursor, uint) (cursor, error), ) *Worker { return &Worker{ pool: bufferPool, @@ -78,14 +77,14 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res w.chunker.Reset(reader, w.pol) - cursor := Cursor{blobs: srcBlobs} + c := cursor{blobs: srcBlobs} for { // bring buffer from bufferPool buf := w.pool.Get() // rechunk with new parameter - c, err := w.chunker.Next(buf) + chunk, err := w.chunker.Next(buf) if err == io.EOF { // reached EOF; all done w.pool.Put(buf) return nil @@ -96,7 +95,7 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res // if cursor progressor callback is given, run it if w.cursorProgressor != nil { - cursor, err = w.cursorProgressor(cursor, c.Length) + c, err = w.cursorProgressor(c, chunk.Length) if err != nil { return err } @@ -106,8 +105,8 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res select { case <-ctx.Done(): return ctx.Err() - case out <- c: - debug.Log("Sending a new chunk of size %v to writer", c.Length) + case out <- chunk: + debug.Log("Sending a new chunk of size %v to writer", chunk.Length) } } }) @@ -228,35 +227,3 @@ func (p *BufferPool) Put(buf []byte) { debug.Log("bufferPool is full; discarding the buffer") } } - -type Cursor struct { - blobs restic.IDs - BlobIdx int - Offset uint -} - -func AdvanceCursor(c Cursor, numBytes uint, blobSizes func(restic.ID) uint) (Cursor, error) { - for c.BlobIdx < len(c.blobs) { - blobSize := blobSizes(c.blobs[c.BlobIdx]) - if blobSize == 0 { - return Cursor{}, fmt.Errorf("blob %v not in blobSizes", c.blobs[c.BlobIdx].Str()) - } - r := blobSize - c.Offset - - if numBytes < r { - c.Offset += numBytes - numBytes = 0 - break - } - - numBytes -= r - c.BlobIdx++ - c.Offset = 0 - } - - if numBytes != 0 { - return Cursor{}, fmt.Errorf("cursor out of range; %d bytes over end position", numBytes) - } - - return c, nil -}