mirror of
https://github.com/restic/restic.git
synced 2026-05-28 04:35:41 -04:00
Refactor: Change function names
Change function names in rechunker to
This commit is contained in:
parent
eb8e127dcd
commit
e004f1865d
4 changed files with 71 additions and 55 deletions
|
|
@ -417,7 +417,7 @@ func rechunkCopy(ctx context.Context, srcRepo, dstRepo restic.Repository, select
|
|||
printer.V("Number of snapshots: %v", len(rootTrees))
|
||||
printer.V("Number of distinct files to process: %v", rechnker.NumFiles())
|
||||
printer.V(" - Total size (including duplicate blobs): %v", ui.FormatBytes(rechnker.TotalSize()))
|
||||
printer.V("Number of packs to download: %v\n\n", rechnker.PackCount())
|
||||
printer.V("Number of packs to download: %v\n\n", rechnker.NumPacks())
|
||||
|
||||
debug.Log("Running Rechunk()")
|
||||
progress.Start(rechnker.NumFiles(), rechnker.TotalSize())
|
||||
|
|
|
|||
|
|
@ -231,8 +231,8 @@ func (rc *Rechunker) Rechunk(ctx context.Context, srcRepo, dstRepo restic.Reposi
|
|||
defer debug.Log("Closing uploader")
|
||||
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
rc.runWorkers(ctx, wg, numWorkers, downloader, uploader, scheduler.Next, scheduler.ReadProgress, bufferPool, p)
|
||||
rc.runWorkers(ctx, wg, 1, downloader, uploader, scheduler.NextPriority, scheduler.ReadProgress, bufferPool, p)
|
||||
rc.runWorkers(ctx, wg, numWorkers, downloader, uploader, scheduler.Next, scheduler.progressCursor, bufferPool, p)
|
||||
rc.runWorkers(ctx, wg, 1, downloader, uploader, scheduler.NextPriority, scheduler.progressCursor, bufferPool, p)
|
||||
|
||||
return wg.Wait()
|
||||
})
|
||||
|
|
@ -273,7 +273,7 @@ func (rc *Rechunker) setupCache(ctx context.Context, srcRepo PackLoader, schedul
|
|||
|
||||
func (rc *Rechunker) runWorkers(ctx context.Context, wg *errgroup.Group, numWorkers int,
|
||||
downloader restic.BlobLoader, uploader restic.BlobSaver, receiveJob func(context.Context) (*ChunkedFile, bool, error),
|
||||
cursorProgressor func(Cursor, uint) (Cursor, error), bufferPool *BufferPool, p *Progress) {
|
||||
cursorProgressor func(cursor, uint) (cursor, error), bufferPool *BufferPool, p *Progress) {
|
||||
for range numWorkers {
|
||||
wg.Go(func() error {
|
||||
debug.Log("Starting worker")
|
||||
|
|
@ -382,10 +382,6 @@ func (rc *Rechunker) RewriteTrees(ctx context.Context, srcRepo, dstRepo restic.R
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (rc *Rechunker) NumFiles() int {
|
||||
return len(rc.filesList)
|
||||
}
|
||||
|
||||
func (rc *Rechunker) GetRewrittenTree(originalTree restic.ID) (restic.ID, error) {
|
||||
newID, ok := rc.rewriteTreeMap[originalTree]
|
||||
if !ok {
|
||||
|
|
@ -398,7 +394,11 @@ func (rc *Rechunker) TotalSize() uint64 {
|
|||
return rc.totalSize
|
||||
}
|
||||
|
||||
func (rc *Rechunker) PackCount() int {
|
||||
func (rc *Rechunker) NumFiles() int {
|
||||
return len(rc.filesList)
|
||||
}
|
||||
|
||||
func (rc *Rechunker) NumPacks() int {
|
||||
return len(rc.idx.Packs())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package rechunker
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
|
|
@ -278,12 +279,23 @@ func (s *Scheduler) SetObsoleteBlobCallback(cb func(restic.IDs)) {
|
|||
s.obsoleteBlobCB = cb
|
||||
}
|
||||
|
||||
// ReadProgress computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage.
|
||||
func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, error) {
|
||||
start := cursor
|
||||
end, err := AdvanceCursor(start, bytesProcessed, s.idx.BlobSize)
|
||||
func (s *Scheduler) newCursor(blobs restic.IDs) cursor {
|
||||
if s == nil {
|
||||
return cursor{}
|
||||
}
|
||||
|
||||
return cursor{
|
||||
blobs: blobs,
|
||||
blobSize: s.idx.BlobSize,
|
||||
}
|
||||
}
|
||||
|
||||
// progressCursor computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage.
|
||||
func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error) {
|
||||
start := c
|
||||
end, err := c.Advance(bytesProcessed)
|
||||
if err != nil {
|
||||
return Cursor{}, err
|
||||
return cursor{}, err
|
||||
}
|
||||
|
||||
if s.obsoleteBlobCB == nil {
|
||||
|
|
@ -294,7 +306,7 @@ func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, er
|
|||
return end, nil
|
||||
}
|
||||
|
||||
blobs := cursor.blobs[start.BlobIdx:end.BlobIdx]
|
||||
blobs := c.blobs[start.BlobIdx:end.BlobIdx]
|
||||
var obsolete restic.IDs
|
||||
s.mu.Lock()
|
||||
for _, b := range blobs {
|
||||
|
|
@ -314,6 +326,43 @@ func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, er
|
|||
return end, nil
|
||||
}
|
||||
|
||||
type cursor struct {
|
||||
blobs restic.IDs
|
||||
BlobIdx int
|
||||
Offset uint
|
||||
blobSize func(restic.ID) uint
|
||||
}
|
||||
|
||||
func (c cursor) Advance(numBytes uint) (cursor, error) {
|
||||
if c.blobs == nil {
|
||||
return cursor{}, nil
|
||||
}
|
||||
|
||||
for c.BlobIdx < len(c.blobs) {
|
||||
blobSize := c.blobSize(c.blobs[c.BlobIdx])
|
||||
if blobSize == 0 {
|
||||
return cursor{}, fmt.Errorf("unknown blob %v", c.blobs[c.BlobIdx].Str())
|
||||
}
|
||||
r := blobSize - c.Offset
|
||||
|
||||
if numBytes < r {
|
||||
c.Offset += numBytes
|
||||
numBytes = 0
|
||||
break
|
||||
}
|
||||
|
||||
numBytes -= r
|
||||
c.BlobIdx++
|
||||
c.Offset = 0
|
||||
}
|
||||
|
||||
if numBytes != 0 {
|
||||
return cursor{}, fmt.Errorf("cursor out of range; %d bytes over end position", numBytes)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// PrioritySelect selects from two channels with priority; first channel first.
|
||||
func PrioritySelect(ctx context.Context, first <-chan *ChunkedFile, second <-chan *ChunkedFile) (item *ChunkedFile, from int, err error) {
|
||||
// First, try to pull from channel 'first' only. If 'first' is not ready now, try both channels.
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package rechunker
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/restic/chunker"
|
||||
|
|
@ -23,12 +22,12 @@ type Worker struct {
|
|||
downloader restic.BlobLoader
|
||||
uploader restic.BlobSaver
|
||||
|
||||
cursorProgressor func(cursor Cursor, bytesProcessed uint) (Cursor, error)
|
||||
cursorProgressor func(cursor cursor, bytesProcessed uint) (cursor, error)
|
||||
}
|
||||
|
||||
func NewWorker(pol chunker.Pol, downloader restic.BlobLoader, uploader restic.BlobSaver,
|
||||
bufferPool *BufferPool,
|
||||
cursorProgressor func(Cursor, uint) (Cursor, error),
|
||||
cursorProgressor func(cursor, uint) (cursor, error),
|
||||
) *Worker {
|
||||
return &Worker{
|
||||
pool: bufferPool,
|
||||
|
|
@ -78,14 +77,14 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res
|
|||
|
||||
w.chunker.Reset(reader, w.pol)
|
||||
|
||||
cursor := Cursor{blobs: srcBlobs}
|
||||
c := cursor{blobs: srcBlobs}
|
||||
|
||||
for {
|
||||
// bring buffer from bufferPool
|
||||
buf := w.pool.Get()
|
||||
|
||||
// rechunk with new parameter
|
||||
c, err := w.chunker.Next(buf)
|
||||
chunk, err := w.chunker.Next(buf)
|
||||
if err == io.EOF { // reached EOF; all done
|
||||
w.pool.Put(buf)
|
||||
return nil
|
||||
|
|
@ -96,7 +95,7 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res
|
|||
|
||||
// if cursor progressor callback is given, run it
|
||||
if w.cursorProgressor != nil {
|
||||
cursor, err = w.cursorProgressor(cursor, c.Length)
|
||||
c, err = w.cursorProgressor(c, chunk.Length)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -106,8 +105,8 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res
|
|||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case out <- c:
|
||||
debug.Log("Sending a new chunk of size %v to writer", c.Length)
|
||||
case out <- chunk:
|
||||
debug.Log("Sending a new chunk of size %v to writer", chunk.Length)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
@ -228,35 +227,3 @@ func (p *BufferPool) Put(buf []byte) {
|
|||
debug.Log("bufferPool is full; discarding the buffer")
|
||||
}
|
||||
}
|
||||
|
||||
type Cursor struct {
|
||||
blobs restic.IDs
|
||||
BlobIdx int
|
||||
Offset uint
|
||||
}
|
||||
|
||||
func AdvanceCursor(c Cursor, numBytes uint, blobSizes func(restic.ID) uint) (Cursor, error) {
|
||||
for c.BlobIdx < len(c.blobs) {
|
||||
blobSize := blobSizes(c.blobs[c.BlobIdx])
|
||||
if blobSize == 0 {
|
||||
return Cursor{}, fmt.Errorf("blob %v not in blobSizes", c.blobs[c.BlobIdx].Str())
|
||||
}
|
||||
r := blobSize - c.Offset
|
||||
|
||||
if numBytes < r {
|
||||
c.Offset += numBytes
|
||||
numBytes = 0
|
||||
break
|
||||
}
|
||||
|
||||
numBytes -= r
|
||||
c.BlobIdx++
|
||||
c.Offset = 0
|
||||
}
|
||||
|
||||
if numBytes != 0 {
|
||||
return Cursor{}, fmt.Errorf("cursor out of range; %d bytes over end position", numBytes)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue