Refactor: Rename variables and functions

Rename variables and functions for readability
Create WorkerConfig for cleaner argument passing
This commit is contained in:
Donggyu Kim 2026-04-26 18:46:11 +09:00
parent e004f1865d
commit b93abf8f90
3 changed files with 93 additions and 76 deletions

View file

@ -42,10 +42,10 @@ type ChunkedFile struct {
}
type Index interface {
BlobSize(blobID restic.ID) (size uint)
BlobToPack(blobID restic.ID) (packID restic.ID)
PackToBlobs(packID restic.ID) (blobs []restic.Blob)
Packs() (packIDs restic.IDSet)
BlobSize(blobID restic.ID) (size uint) // blob ID -> blob size
BlobToPack(blobID restic.ID) (packID restic.ID) // blob ID -> pack ID
PackToBlobs(packID restic.ID) (blobs []restic.Blob) // pack ID -> list of blobs to be loaded from the pack
Packs() (packIDs restic.IDSet) // set of all pack IDs
}
func NewRechunker(cfg Config) *Rechunker {
@ -230,9 +230,20 @@ func (rc *Rechunker) Rechunk(ctx context.Context, srcRepo, dstRepo restic.Reposi
debug.Log("Starting uploader")
defer debug.Log("Closing uploader")
workerCfg := WorkerConfig{
Pol: rc.cfg.Pol,
Downloader: downloader,
Uploader: uploader,
BufferPool: bufferPool,
NewCursor: scheduler.newCursor,
UpdateCursor: scheduler.updateCursor,
}
wg, ctx := errgroup.WithContext(ctx)
rc.runWorkers(ctx, wg, numWorkers, downloader, uploader, scheduler.Next, scheduler.progressCursor, bufferPool, p)
rc.runWorkers(ctx, wg, 1, downloader, uploader, scheduler.NextPriority, scheduler.progressCursor, bufferPool, p)
rc.runWorkers(ctx, wg, numWorkers, workerCfg, scheduler.Next, p)
rc.runWorkers(ctx, wg, 1, workerCfg, scheduler.NextPriority, p)
return wg.Wait()
})
@ -266,24 +277,18 @@ func (rc *Rechunker) setupCache(ctx context.Context, srcRepo PackLoader, schedul
repo, cache = WrapWithCache(ctx, srcRepo, rc.cfg.CacheSize, numDownloaders, rc.idx, scheduler.BlobReady, scheduler.BlobUnready)
// register cache.Ignore as the scheduler's ignore-blobs callback so that blobs with no remaining usage can be evicted from the cache early
scheduler.SetObsoleteBlobCallback(cache.Ignore)
scheduler.SetIgnoreBlobsCallback(cache.Ignore)
return repo, cache
}
func (rc *Rechunker) runWorkers(ctx context.Context, wg *errgroup.Group, numWorkers int,
downloader restic.BlobLoader, uploader restic.BlobSaver, receiveJob func(context.Context) (*ChunkedFile, bool, error),
cursorProgressor func(cursor, uint) (cursor, error), bufferPool *BufferPool, p *Progress) {
workerCfg WorkerConfig, receiveJob func(context.Context) (*ChunkedFile, bool, error),
p *Progress) {
for range numWorkers {
wg.Go(func() error {
debug.Log("Starting worker")
worker := NewWorker(
rc.cfg.Pol,
downloader,
uploader,
bufferPool,
cursorProgressor,
)
worker := NewWorker(workerCfg)
for {
debug.Log("receiving job")

View file

@ -21,12 +21,12 @@ type Scheduler struct {
regularList []*ChunkedFile
priorityList []*ChunkedFile
filesContaining map[restic.ID][]*ChunkedFile
blobsToPrepare map[restic.ID]int
prefixLookup map[restic.ID][]*ChunkedFile // blob ID -> files that contain the blob as prefix
remainingPrefixBlobs map[restic.ID]int // file hashval -> remaining count until all its blobs ready
remainingBlobNeeds map[restic.ID]int
remainingBlobUsage map[restic.ID]int // blob ID -> remaining blob usage until the end
obsoleteBlobCB func(ids restic.IDs)
ignoreBlobsCB func(ids restic.IDs)
push chan struct{}
done chan struct{}
@ -40,25 +40,25 @@ func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePrior
if !usePriority {
s := &Scheduler{
idx: idx,
regularList: files,
done: make(chan struct{}),
filesContaining: filesContaining,
blobsToPrepare: blobsToPrepare,
remainingBlobNeeds: remainingBlobNeeds,
idx: idx,
regularList: files,
done: make(chan struct{}),
prefixLookup: filesContaining,
remainingPrefixBlobs: blobsToPrepare,
remainingBlobUsage: remainingBlobNeeds,
}
s.createRegularCh(ctx, wg, nil)
return s
}
s := &Scheduler{
idx: idx,
regularList: files,
push: make(chan struct{}, 1),
done: make(chan struct{}),
filesContaining: filesContaining,
blobsToPrepare: blobsToPrepare,
remainingBlobNeeds: remainingBlobNeeds,
idx: idx,
regularList: files,
push: make(chan struct{}, 1),
done: make(chan struct{}),
prefixLookup: filesContaining,
remainingPrefixBlobs: blobsToPrepare,
remainingBlobUsage: remainingBlobNeeds,
}
set := restic.IDSet{}
@ -82,23 +82,23 @@ func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePrior
const FILE_HEAD_LENGTH = 25
func createSchedulerState(files []*ChunkedFile) (map[restic.ID][]*ChunkedFile, map[restic.ID]int, map[restic.ID]int) {
blobCount := map[restic.ID]int{}
filesContaining := map[restic.ID][]*ChunkedFile{}
blobsToPrepare := map[restic.ID]int{}
blobUsage := map[restic.ID]int{}
prefixLookup := map[restic.ID][]*ChunkedFile{}
numPrefixBlobs := map[restic.ID]int{}
for _, file := range files {
prefixLen := min(FILE_HEAD_LENGTH, len(file.IDs))
blobSet := restic.NewIDSet(file.IDs[:prefixLen]...)
blobsToPrepare[file.hashval] = len(blobSet)
prefixSet := restic.NewIDSet(file.IDs[:prefixLen]...)
numPrefixBlobs[file.hashval] = len(prefixSet)
for _, blob := range file.IDs {
blobCount[blob]++
blobUsage[blob]++
}
for b := range blobSet {
filesContaining[b] = append(filesContaining[b], file)
for b := range prefixSet {
prefixLookup[b] = append(prefixLookup[b], file)
}
}
return filesContaining, blobsToPrepare, blobCount
return prefixLookup, numPrefixBlobs, blobUsage
}
func (s *Scheduler) Next(ctx context.Context) (*ChunkedFile, bool, error) {
@ -114,7 +114,7 @@ func (s *Scheduler) NextPriority(ctx context.Context) (*ChunkedFile, bool, error
return file, from != 0, err
}
func (s *Scheduler) PushPriority(files []*ChunkedFile) {
func (s *Scheduler) pushPriority(files []*ChunkedFile) {
if s.priorityCh == nil {
return
}
@ -221,14 +221,14 @@ func (s *Scheduler) BlobReady(ids restic.IDs) {
s.mu.Lock()
for _, id := range ids {
for _, file := range s.filesContaining[id] {
n := s.blobsToPrepare[file.hashval]
for _, file := range s.prefixLookup[id] {
n := s.remainingPrefixBlobs[file.hashval]
if n > 0 {
n--
if n == 0 {
readyFiles = append(readyFiles, file)
}
s.blobsToPrepare[file.hashval] = n
s.remainingPrefixBlobs[file.hashval] = n
}
}
}
@ -238,7 +238,7 @@ func (s *Scheduler) BlobReady(ids restic.IDs) {
return
}
s.PushPriority(readyFiles)
s.pushPriority(readyFiles)
if debugStats != nil {
dAdds := map[string]int{}
@ -261,22 +261,22 @@ func (s *Scheduler) BlobUnready(ids restic.IDs) {
s.mu.Lock()
for _, id := range ids {
filesToUpdate := s.filesContaining[id]
filesToUpdate := s.prefixLookup[id]
for _, file := range filesToUpdate {
// files with remainingPrefixBlobs==0 are not tracked
if s.blobsToPrepare[file.hashval] > 0 {
s.blobsToPrepare[file.hashval]++
if s.remainingPrefixBlobs[file.hashval] > 0 {
s.remainingPrefixBlobs[file.hashval]++
}
}
}
s.mu.Unlock()
}
func (s *Scheduler) SetObsoleteBlobCallback(cb func(restic.IDs)) {
func (s *Scheduler) SetIgnoreBlobsCallback(cb func(restic.IDs)) {
s.mu.Lock()
defer s.mu.Unlock()
s.obsoleteBlobCB = cb
s.ignoreBlobsCB = cb
}
func (s *Scheduler) newCursor(blobs restic.IDs) cursor {
@ -290,18 +290,14 @@ func (s *Scheduler) newCursor(blobs restic.IDs) cursor {
}
}
// progressCursor computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage.
func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error) {
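// updateCursor advances a file's cursor by bytesProcessed, infers from the cursor movement which src blobs were consumed, and uses that to track remaining blob usage; blobs whose remaining usage drops to zero are passed to the ignore-blobs callback, if one is set.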
func (s *Scheduler) updateCursor(c cursor, bytesProcessed uint) (cursor, error) {
start := c
end, err := c.Advance(bytesProcessed)
if err != nil {
return cursor{}, err
}
if s.obsoleteBlobCB == nil {
return end, nil
}
if start.BlobIdx == end.BlobIdx {
return end, nil
}
@ -310,8 +306,8 @@ func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error
var obsolete restic.IDs
s.mu.Lock()
for _, b := range blobs {
s.remainingBlobNeeds[b]--
if s.remainingBlobNeeds[b] == 0 {
s.remainingBlobUsage[b]--
if s.remainingBlobUsage[b] == 0 {
obsolete = append(obsolete, b)
}
}
@ -321,7 +317,9 @@ func (s *Scheduler) progressCursor(c cursor, bytesProcessed uint) (cursor, error
return end, nil
}
s.obsoleteBlobCB(obsolete)
if s.ignoreBlobsCB != nil {
s.ignoreBlobsCB(obsolete)
}
return end, nil
}

View file

@ -22,22 +22,34 @@ type Worker struct {
downloader restic.BlobLoader
uploader restic.BlobSaver
cursorProgressor func(cursor cursor, bytesProcessed uint) (cursor, error)
newCursor func(blobs restic.IDs) cursor
updateCursor func(c cursor, numBytes uint) (cursor, error)
}
// WorkerConfig bundles the arguments of NewWorker into a single value so
// that callers can build it once and reuse it for every worker they start.
type WorkerConfig struct {
// Pol is the chunker polynomial; NewWorker passes it to chunker.New and
// keeps it on the worker for later chunker resets.
Pol chunker.Pol
// Downloader is stored as the worker's blob loader.
// NOTE(review): presumably used to read source blobs - confirm against the worker's reader.
Downloader restic.BlobLoader
// Uploader is stored as the worker's blob saver.
// NOTE(review): presumably used to store the re-chunked blobs - confirm against the worker's writer.
Uploader restic.BlobSaver
// BufferPool is optional; when nil, NewWorker substitutes NewBufferPool(3).
BufferPool *BufferPool
// NewCursor is optional; when set, the reader calls it with the file's
// source blob IDs to obtain the initial cursor. When nil, the reader starts
// from the zero-value cursor.
NewCursor func(blobs restic.IDs) cursor
// UpdateCursor is optional; when set, the reader calls it after each chunk
// with the current cursor and the chunk length, continues with the returned
// cursor, and aborts with the returned error if it is non-nil.
UpdateCursor func(c cursor, numBytes uint) (cursor, error)
}
func NewWorker(pol chunker.Pol, downloader restic.BlobLoader, uploader restic.BlobSaver,
bufferPool *BufferPool,
cursorProgressor func(cursor, uint) (cursor, error),
) *Worker {
func NewWorker(cfg WorkerConfig) *Worker {
if cfg.BufferPool == nil {
cfg.BufferPool = NewBufferPool(3)
}
return &Worker{
pool: bufferPool,
pool: cfg.BufferPool,
chunker: chunker.New(nil, pol),
pol: pol,
downloader: downloader,
uploader: uploader,
chunker: chunker.New(nil, cfg.Pol),
pol: cfg.Pol,
downloader: cfg.Downloader,
uploader: cfg.Uploader,
cursorProgressor: cursorProgressor,
newCursor: cfg.NewCursor,
updateCursor: cfg.UpdateCursor,
}
}
@ -77,7 +89,10 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res
w.chunker.Reset(reader, w.pol)
c := cursor{blobs: srcBlobs}
var c cursor
if w.newCursor != nil {
c = w.newCursor(srcBlobs)
}
for {
// bring buffer from bufferPool
@ -93,9 +108,8 @@ func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs res
return err
}
// if cursor progressor callback is given, run it
if w.cursorProgressor != nil {
c, err = w.cursorProgressor(c, chunk.Length)
if w.updateCursor != nil {
c, err = w.updateCursor(c, chunk.Length)
if err != nil {
return err
}