diff --git a/internal/rechunker/rechunker.go b/internal/rechunker/rechunker.go index 7498a50a9..634262882 100644 --- a/internal/rechunker/rechunker.go +++ b/internal/rechunker/rechunker.go @@ -42,10 +42,10 @@ type ChunkedFile struct { } type Index interface { - BlobSize(blobID restic.ID) (size uint) - BlobToPack(blobID restic.ID) (packID restic.ID) - PackToBlobs(packID restic.ID) (blobs []restic.Blob) - Packs() (packIDs restic.IDSet) + BlobSize(blobID restic.ID) (size uint) // blob ID -> blob size + BlobToPack(blobID restic.ID) (packID restic.ID) // blob ID -> pack ID + PackToBlobs(packID restic.ID) (blobs []restic.Blob) // pack ID -> list of blobs to be loaded from the pack + Packs() (packIDs restic.IDSet) // set of all pack IDs } func NewRechunker(cfg Config) *Rechunker { @@ -230,9 +230,20 @@ func (rc *Rechunker) Rechunk(ctx context.Context, srcRepo, dstRepo restic.Reposi debug.Log("Starting uploader") defer debug.Log("Closing uploader") + workerCfg := WorkerConfig{ + Pol: rc.cfg.Pol, + + Downloader: downloader, + Uploader: uploader, + BufferPool: bufferPool, + + NewCursor: scheduler.newCursor, + UpdateCursor: scheduler.updateCursor, + } + wg, ctx := errgroup.WithContext(ctx) - rc.runWorkers(ctx, wg, numWorkers, downloader, uploader, scheduler.Next, scheduler.progressCursor, bufferPool, p) - rc.runWorkers(ctx, wg, 1, downloader, uploader, scheduler.NextPriority, scheduler.progressCursor, bufferPool, p) + rc.runWorkers(ctx, wg, numWorkers, workerCfg, scheduler.Next, p) + rc.runWorkers(ctx, wg, 1, workerCfg, scheduler.NextPriority, p) return wg.Wait() }) @@ -266,24 +277,18 @@ func (rc *Rechunker) setupCache(ctx context.Context, srcRepo PackLoader, schedul repo, cache = WrapWithCache(ctx, srcRepo, rc.cfg.CacheSize, numDownloaders, rc.idx, scheduler.BlobReady, scheduler.BlobUnready) // register cache.Ignore as scheduler's obsolete blob callback for early cache eviction - scheduler.SetObsoleteBlobCallback(cache.Ignore) + scheduler.SetIgnoreBlobsCallback(cache.Ignore) 
return repo, cache } func (rc *Rechunker) runWorkers(ctx context.Context, wg *errgroup.Group, numWorkers int, - downloader restic.BlobLoader, uploader restic.BlobSaver, receiveJob func(context.Context) (*ChunkedFile, bool, error), - cursorProgressor func(cursor, uint) (cursor, error), bufferPool *BufferPool, p *Progress) { + workerCfg WorkerConfig, receiveJob func(context.Context) (*ChunkedFile, bool, error), + p *Progress) { for range numWorkers { wg.Go(func() error { debug.Log("Starting worker") - worker := NewWorker( - rc.cfg.Pol, - downloader, - uploader, - bufferPool, - cursorProgressor, - ) + worker := NewWorker(workerCfg) for { debug.Log("receiving job") diff --git a/internal/rechunker/scheduler.go b/internal/rechunker/scheduler.go index 1c33cf00c..6294664b0 100644 --- a/internal/rechunker/scheduler.go +++ b/internal/rechunker/scheduler.go @@ -21,12 +21,12 @@ type Scheduler struct { regularList []*ChunkedFile priorityList []*ChunkedFile - filesContaining map[restic.ID][]*ChunkedFile - blobsToPrepare map[restic.ID]int + prefixLookup map[restic.ID][]*ChunkedFile // blob ID -> files that contain the blob as prefix + remainingPrefixBlobs map[restic.ID]int // file hashval -> remaining count until all its blobs ready - remainingBlobNeeds map[restic.ID]int + remainingBlobUsage map[restic.ID]int // blob ID -> remaining blob usage until the end - obsoleteBlobCB func(ids restic.IDs) + ignoreBlobsCB func(ids restic.IDs) push chan struct{} done chan struct{} @@ -40,25 +40,25 @@ func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePrior if !usePriority { s := &Scheduler{ - idx: idx, - regularList: files, - done: make(chan struct{}), - filesContaining: filesContaining, - blobsToPrepare: blobsToPrepare, - remainingBlobNeeds: remainingBlobNeeds, + idx: idx, + regularList: files, + done: make(chan struct{}), + prefixLookup: filesContaining, + remainingPrefixBlobs: blobsToPrepare, + remainingBlobUsage: remainingBlobNeeds, } s.createRegularCh(ctx, wg, 
nil) return s } s := &Scheduler{ - idx: idx, - regularList: files, - push: make(chan struct{}, 1), - done: make(chan struct{}), - filesContaining: filesContaining, - blobsToPrepare: blobsToPrepare, - remainingBlobNeeds: remainingBlobNeeds, + idx: idx, + regularList: files, + push: make(chan struct{}, 1), + done: make(chan struct{}), + prefixLookup: filesContaining, + remainingPrefixBlobs: blobsToPrepare, + remainingBlobUsage: remainingBlobNeeds, } set := restic.IDSet{} @@ -82,23 +82,23 @@ func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePrior const FILE_HEAD_LENGTH = 25 func createSchedulerState(files []*ChunkedFile) (map[restic.ID][]*ChunkedFile, map[restic.ID]int, map[restic.ID]int) { - blobCount := map[restic.ID]int{} - filesContaining := map[restic.ID][]*ChunkedFile{} - blobsToPrepare := map[restic.ID]int{} + blobUsage := map[restic.ID]int{} + prefixLookup := map[restic.ID][]*ChunkedFile{} + numPrefixBlobs := map[restic.ID]int{} for _, file := range files { prefixLen := min(FILE_HEAD_LENGTH, len(file.IDs)) - blobSet := restic.NewIDSet(file.IDs[:prefixLen]...) - blobsToPrepare[file.hashval] = len(blobSet) + prefixSet := restic.NewIDSet(file.IDs[:prefixLen]...) 
+ numPrefixBlobs[file.hashval] = len(prefixSet) for _, blob := range file.IDs { - blobCount[blob]++ + blobUsage[blob]++ } - for b := range blobSet { - filesContaining[b] = append(filesContaining[b], file) + for b := range prefixSet { + prefixLookup[b] = append(prefixLookup[b], file) } } - return filesContaining, blobsToPrepare, blobCount + return prefixLookup, numPrefixBlobs, blobUsage } func (s *Scheduler) Next(ctx context.Context) (*ChunkedFile, bool, error) { @@ -114,7 +114,7 @@ func (s *Scheduler) NextPriority(ctx context.Context) (*ChunkedFile, bool, error return file, from != 0, err } -func (s *Scheduler) PushPriority(files []*ChunkedFile) { +func (s *Scheduler) pushPriority(files []*ChunkedFile) { if s.priorityCh == nil { return } @@ -221,14 +221,14 @@ func (s *Scheduler) BlobReady(ids restic.IDs) { s.mu.Lock() for _, id := range ids { - for _, file := range s.filesContaining[id] { - n := s.blobsToPrepare[file.hashval] + for _, file := range s.prefixLookup[id] { + n := s.remainingPrefixBlobs[file.hashval] if n > 0 { n-- if n == 0 { readyFiles = append(readyFiles, file) } - s.blobsToPrepare[file.hashval] = n + s.remainingPrefixBlobs[file.hashval] = n } } } @@ -238,7 +238,7 @@ func (s *Scheduler) BlobReady(ids restic.IDs) { return } - s.PushPriority(readyFiles) + s.pushPriority(readyFiles) if debugStats != nil { dAdds := map[string]int{} @@ -261,22 +261,22 @@ func (s *Scheduler) BlobUnready(ids restic.IDs) { s.mu.Lock() for _, id := range ids { - filesToUpdate := s.filesContaining[id] + filesToUpdate := s.prefixLookup[id] for _, file := range filesToUpdate { - // files with blobsToPrepare==0 is not tracked - if s.blobsToPrepare[file.hashval] > 0 { - s.blobsToPrepare[file.hashval]++ + // files with remainingPrefixBlobs==0 are not tracked + if s.remainingPrefixBlobs[file.hashval] > 0 { + s.remainingPrefixBlobs[file.hashval]++ } } } s.mu.Unlock() } -func (s *Scheduler) SetObsoleteBlobCallback(cb func(restic.IDs)) { +func (s *Scheduler) SetIgnoreBlobsCallback(cb func(restic.IDs)) { s.mu.Lock() defer s.mu.Unlock() -
s.obsoleteBlobCB = cb + s.ignoreBlobsCB = cb } func (s *Scheduler) newCursor(blobs restic.IDs) cursor { @@ -290,18 +290,14 @@ func (s *Scheduler) newCursor(blobs restic.IDs) cursor { } } -// progressCursor computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage. -func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error) { +// updateCursor computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage. +func (s *Scheduler) updateCursor(c cursor, bytesProcessed uint) (cursor, error) { start := c end, err := c.Advance(bytesProcessed) if err != nil { return cursor{}, err } - if s.obsoleteBlobCB == nil { - return end, nil - } - if start.BlobIdx == end.BlobIdx { return end, nil } @@ -310,8 +306,8 @@ func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error var obsolete restic.IDs s.mu.Lock() for _, b := range blobs { - s.remainingBlobNeeds[b]-- - if s.remainingBlobNeeds[b] == 0 { + s.remainingBlobUsage[b]-- + if s.remainingBlobUsage[b] == 0 { obsolete = append(obsolete, b) } } @@ -321,7 +317,9 @@ func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error return end, nil } - s.obsoleteBlobCB(obsolete) + if s.ignoreBlobsCB != nil { + s.ignoreBlobsCB(obsolete) + } return end, nil } diff --git a/internal/rechunker/worker.go b/internal/rechunker/worker.go index 33aace50d..9f433ca62 100644 --- a/internal/rechunker/worker.go +++ b/internal/rechunker/worker.go @@ -22,22 +22,34 @@ type Worker struct { downloader restic.BlobLoader uploader restic.BlobSaver - cursorProgressor func(cursor cursor, bytesProcessed uint) (cursor, error) + newCursor func(blobs restic.IDs) cursor + updateCursor func(c cursor, numBytes uint) (cursor, error) +} +type WorkerConfig struct { + Pol chunker.Pol + + Downloader restic.BlobLoader + Uploader restic.BlobSaver + BufferPool *BufferPool + + NewCursor func(blobs restic.IDs) 
cursor + UpdateCursor func(c cursor, numBytes uint) (cursor, error) } -func NewWorker(pol chunker.Pol, downloader restic.BlobLoader, uploader restic.BlobSaver, - bufferPool *BufferPool, - cursorProgressor func(cursor, uint) (cursor, error), -) *Worker { +func NewWorker(cfg WorkerConfig) *Worker { + if cfg.BufferPool == nil { + cfg.BufferPool = NewBufferPool(3) + } return &Worker{ - pool: bufferPool, + pool: cfg.BufferPool, - chunker: chunker.New(nil, pol), - pol: pol, - downloader: downloader, - uploader: uploader, + chunker: chunker.New(nil, cfg.Pol), + pol: cfg.Pol, + downloader: cfg.Downloader, + uploader: cfg.Uploader, - cursorProgressor: cursorProgressor, + newCursor: cfg.NewCursor, + updateCursor: cfg.UpdateCursor, } } @@ -77,7 +89,10 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res w.chunker.Reset(reader, w.pol) - c := cursor{blobs: srcBlobs} + var c cursor + if w.newCursor != nil { + c = w.newCursor(srcBlobs) + } for { // bring buffer from bufferPool @@ -93,9 +108,8 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res return err } - // if cursor progressor callback is given, run it - if w.cursorProgressor != nil { - c, err = w.cursorProgressor(c, chunk.Length) + if w.updateCursor != nil { + c, err = w.updateCursor(c, chunk.Length) if err != nil { return err }