diff --git a/internal/dump/common.go b/internal/dump/common.go index b4741302e..8c02af04b 100644 --- a/internal/dump/common.go +++ b/internal/dump/common.go @@ -104,9 +104,11 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error { func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error { wg, ctx := errgroup.WithContext(ctx) - limit := d.repo.Connections() - 1 // See below for the -1. + limit := int(d.repo.Connections()) + wg.SetLimit(1 + limit) // +1 for the writer. blobs := make(chan (<-chan []byte), limit) + // Writer. wg.Go(func() error { for ch := range blobs { select { @@ -122,7 +124,6 @@ func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) }) // Start short-lived goroutines to load blobs. - // There will be at most 1+cap(blobs) calling LoadBlob at any moment. loop: for _, id := range node.Content { // This needs to be buffered, so that loaders can quit diff --git a/internal/repository/index/master_index.go b/internal/repository/index/master_index.go index a41119b58..c0a5095e3 100644 --- a/internal/repository/index/master_index.go +++ b/internal/repository/index/master_index.go @@ -459,27 +459,24 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked[restic. return nil }) - // a worker receives an index from ch, and saves the index - worker := func() error { - for idx := range saveCh { - idx.Finalize() - if len(idx.packs) == 0 { - continue - } - if _, err := idx.SaveIndex(wgCtx, repo); err != nil { - return err - } - } - return nil - } - + var savers errgroup.Group // encoding an index can take quite some time such that this can be CPU- or IO-bound // do not add repo.Connections() here as there are already the loader goroutines. - workerCount := runtime.GOMAXPROCS(0) - // run workers on ch - for i := 0; i < workerCount; i++ { - wg.Go(worker) + savers.SetLimit(runtime.GOMAXPROCS(0)) + + for idx := range saveCh { + savers.Go(func() error { + idx.Finalize() + if len(idx.packs) == 0 { + return nil + } + _, err := idx.SaveIndex(wgCtx, repo) + return err + }) } + + wg.Go(savers.Wait) + err := wg.Wait() p.Done() if err != nil { @@ -514,6 +511,8 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove obsolete := restic.NewIDSet() wg, wgCtx := errgroup.WithContext(ctx) + // keep concurrency bounded as we're on a fallback path + wg.SetLimit(1 + int(repo.Connections())) ch := make(chan *Index) wg.Go(func() error { @@ -553,23 +552,14 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove return nil }) - // a worker receives an index from ch, and saves the index - worker := func() error { - for idx := range ch { + for idx := range ch { + wg.Go(func() error { idx.Finalize() - if _, err := idx.SaveIndex(wgCtx, repo); err != nil { - return err - } - } - return nil + _, err := idx.SaveIndex(wgCtx, repo) + return err + }) } - // keep concurrency bounded as we're on a fallback path - workerCount := int(repo.Connections()) - // run workers on ch - for i := 0; i < workerCount; i++ { - wg.Go(worker) - } err := wg.Wait() p.Done() // the index no longer matches to stored state diff --git a/internal/restic/parallel.go b/internal/restic/parallel.go index eacb54bae..a151a49d7 100644 --- a/internal/restic/parallel.go +++ b/internal/restic/parallel.go @@ -52,43 +52,31 @@ func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, f return wg.Wait() } -// ParallelRemove deletes the given fileList of fileType in parallel -// if callback returns an error, then it will abort. +// ParallelRemove deletes the given fileList of fileType in parallel. +// If report returns an error, it aborts. func ParallelRemove[FT FileTypes](ctx context.Context, repo RemoverUnpacked[FT], fileList IDSet, fileType FT, report func(id ID, err error) error, bar *progress.Counter) error { - fileChan := make(chan ID) wg, ctx := errgroup.WithContext(ctx) - wg.Go(func() error { - defer close(fileChan) - for id := range fileList { - select { - case fileChan <- id: - case <-ctx.Done(): - return ctx.Err() - } - } - return nil - }) + wg.SetLimit(int(repo.Connections())) // deleting files is IO-bound bar.SetMax(uint64(len(fileList))) - // deleting files is IO-bound - workerCount := repo.Connections() - for i := 0; i < int(workerCount); i++ { +loop: + for id := range fileList { + select { + case <-ctx.Done(): + break loop + default: + } + wg.Go(func() error { - for id := range fileChan { - err := repo.RemoveUnpacked(ctx, fileType, id) - if err == nil { - // increment counter only if no error - bar.Add(1) - } - if report != nil { - err = report(id, err) - } - if err != nil { - return err - } + err := repo.RemoveUnpacked(ctx, fileType, id) + if err == nil { + bar.Add(1) } - return nil + if report != nil { + err = report(id, err) + } + return err }) } return wg.Wait()