diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 9c6462887..0ed37eb5b 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -96,7 +96,6 @@ type Archiver struct {
 	FS      fs.FS
 	Options Options
 
-	blobSaver *blobSaver
 	fileSaver *fileSaver
 	treeSaver *treeSaver
 	mu        sync.Mutex
@@ -145,11 +144,6 @@ type Options struct {
 	// turned out to be a good default for most situations).
 	ReadConcurrency uint
 
-	// SaveBlobConcurrency sets how many blobs are hashed and saved
-	// concurrently. If it's set to zero, the default is the number of CPUs
-	// available in the system.
-	SaveBlobConcurrency uint
-
 	// SaveTreeConcurrency sets how many trees are marshalled and saved to the
 	// repo concurrently.
 	SaveTreeConcurrency uint
@@ -165,12 +159,6 @@ func (o Options) ApplyDefaults() Options {
 		o.ReadConcurrency = 2
 	}
 
-	if o.SaveBlobConcurrency == 0 {
-		// blob saving is CPU bound due to hash checking and encryption
-		// the actual upload is handled by the repository itself
-		o.SaveBlobConcurrency = uint(runtime.GOMAXPROCS(0))
-	}
-
 	if o.SaveTreeConcurrency == 0 {
 		// can either wait for a file, wait for a tree, serialize a tree or wait for saveblob
 		// the last two are cpu-bound and thus mutually exclusive.
@@ -834,24 +822,20 @@ func (arch *Archiver) loadParentTree(ctx context.Context, sn *data.Snapshot) *da
 }
 
 // runWorkers starts the worker pools, which are stopped when the context is cancelled.
-func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaver) {
-	arch.blobSaver = newBlobSaver(ctx, wg, uploader, arch.Options.SaveBlobConcurrency)
-
+func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync) {
 	arch.fileSaver = newFileSaver(ctx, wg,
-		arch.blobSaver.Save,
+		uploader,
 		arch.Repo.Config().ChunkerPolynomial,
-		arch.Options.ReadConcurrency, arch.Options.SaveBlobConcurrency)
+		arch.Options.ReadConcurrency)
 	arch.fileSaver.CompleteBlob = arch.CompleteBlob
 	arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
 
-	arch.treeSaver = newTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.blobSaver.Save, arch.Error)
+	arch.treeSaver = newTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, uploader, arch.Error)
 }
 
 func (arch *Archiver) stopWorkers() {
-	arch.blobSaver.TriggerShutdown()
 	arch.fileSaver.TriggerShutdown()
 	arch.treeSaver.TriggerShutdown()
-	arch.blobSaver = nil
 	arch.fileSaver = nil
 	arch.treeSaver = nil
 }
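Note: this patch moves blob-level concurrency out of the archiver and into the repository, which now hands runWorkers an uploader directly. The restic.BlobSaverAsync interface itself is not part of this diff; inferred from the call sites in this patch, its shape is presumably along these lines (a sketch, not the authoritative definition):

    // Sketch of the uploader interfaces assumed by this patch; the real
    // definitions live in the restic package and are not shown in this diff.
    package restic

    import "context"

    type BlobSaverAsync interface {
        // SaveBlobAsync queues buf for saving and invokes cb exactly once with
        // the resulting blob ID, whether the blob was already known, and the
        // space it occupies in the repository.
        SaveBlobAsync(ctx context.Context, t BlobType, buf []byte, id ID,
            storeDuplicate bool, cb func(newID ID, known bool, sizeInRepo int, err error))
    }

    // BlobSaverWithAsync presumably combines the synchronous and asynchronous
    // savers, matching how failSaveSaver below implements both methods.
    type BlobSaverWithAsync interface {
        BlobSaver
        BlobSaverAsync
    }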
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index cefe87285..eb60e1174 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -2084,8 +2084,6 @@ func TestArchiverContextCanceled(t *testing.T) {
 type TrackFS struct {
 	fs.FS
 
-	errorOn map[string]error
-
 	opened map[string]uint
 	m      sync.Mutex
 }
@@ -2101,33 +2099,53 @@ func (m *TrackFS) OpenFile(name string, flag int, metadataOnly bool) (fs.File, e
 
 type failSaveRepo struct {
 	archiverRepo
 	failAfter int32
-	cnt       int32
+	cnt       atomic.Int32
 	err       error
 }
 
 func (f *failSaveRepo) WithBlobUploader(ctx context.Context, fn func(ctx context.Context, uploader restic.BlobSaverWithAsync) error) error {
 	return f.archiverRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
-		return fn(ctx, &failSaveSaver{saver: uploader, failSaveRepo: f})
+		return fn(ctx, &failSaveSaver{saver: uploader, failSaveRepo: f, semaphore: make(chan struct{}, 1)})
 	})
 }
 
 type failSaveSaver struct {
-	saver        restic.BlobSaver
+	saver        restic.BlobSaverWithAsync
 	failSaveRepo *failSaveRepo
+	semaphore    chan struct{}
 }
 
 func (f *failSaveSaver) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) {
-	val := atomic.AddInt32(&f.failSaveRepo.cnt, 1)
+	val := f.failSaveRepo.cnt.Add(1)
 	if val >= f.failSaveRepo.failAfter {
-		return restic.Hash(buf), false, 0, f.failSaveRepo.err
+		return restic.ID{}, false, 0, f.failSaveRepo.err
 	}
 
 	return f.saver.SaveBlob(ctx, t, buf, id, storeDuplicate)
 }
 
-func (f *failSaveSaver) SaveBlobAsync(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, sizeInRepo int, err error)) {
-	newID, known, sizeInRepo, err := f.SaveBlob(ctx, tpe, buf, id, storeDuplicate)
-	cb(newID, known, sizeInRepo, err)
+func (f *failSaveSaver) SaveBlobAsync(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, size int, err error)) {
+	// limit concurrency to make test reliable
+	f.semaphore <- struct{}{}
+
+	val := f.failSaveRepo.cnt.Add(1)
+	if val >= f.failSaveRepo.failAfter {
+		// use a canceled context to make SaveBlobAsync fail
+		var cancel context.CancelCauseFunc
+		ctx, cancel = context.WithCancelCause(ctx)
+		cancel(f.failSaveRepo.err)
+	}
+
+	f.saver.SaveBlobAsync(ctx, t, buf, id, storeDuplicate, func(newID restic.ID, known bool, size int, err error) {
+		if val >= f.failSaveRepo.failAfter {
+			if err == nil {
+				panic("expected error")
+			}
+			err = f.failSaveRepo.err
+		}
+		cb(newID, known, size, err)
+		<-f.semaphore
+	})
 }
 
@@ -2152,6 +2170,7 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
 				filepath.FromSlash("dir/baz"): 1,
 				filepath.FromSlash("dir/foo"): 1,
 			},
+			err: testErr,
 		},
 		{
 			src: TestDir{
@@ -2177,7 +2196,7 @@
 				filepath.FromSlash("dir/file9"): 0,
 			},
 			// fails after four to seven files were opened, as the ReadConcurrency allows for
-			// two queued files and SaveBlobConcurrency for one blob queued for saving.
+			// two queued files and one blob queued for saving.
 			failAfter: 4,
 			err:       testErr,
 		},
@@ -2198,10 +2217,6 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
 				opened: make(map[string]uint),
 			}
 
-			if testFS.errorOn == nil {
-				testFS.errorOn = make(map[string]error)
-			}
-
 			testRepo := &failSaveRepo{
 				archiverRepo: repo,
 				failAfter:    int32(test.failAfter),
@@ -2210,9 +2225,12 @@
 
 			// at most two files may be queued
 			arch := New(testRepo, testFS, Options{
-				ReadConcurrency:     2,
-				SaveBlobConcurrency: 1,
+				ReadConcurrency: 2,
 			})
+			arch.Error = func(item string, err error) error {
+				t.Logf("archiver error for %q: %v", item, err)
+				return err
+			}
 
 			_, _, _, err := arch.Snapshot(ctx, []string{"."}, SnapshotOptions{Time: time.Now()})
 			if !errors.Is(err, test.err) {
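The failure injection in failSaveSaver.SaveBlobAsync above works by canceling the passed-in context with a cause rather than returning an error directly, then mapping the callback's error back to the injected one. A standalone illustration of the standard-library mechanism it relies on (Go 1.20+; testErr is made up for the example):

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    func main() {
        testErr := errors.New("test error")

        // Canceling with a cause makes any context-aware operation fail ...
        ctx, cancel := context.WithCancelCause(context.Background())
        cancel(testErr)

        // ... ctx.Err() only reports the generic cancellation ...
        fmt.Println(ctx.Err()) // context canceled

        // ... while the original error stays recoverable via context.Cause,
        // which is why the test can still assert errors.Is(err, testErr).
        fmt.Println(errors.Is(context.Cause(ctx), testErr)) // true
    }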
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
deleted file mode 100644
index 356a32ce2..000000000
--- a/internal/archiver/blob_saver.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package archiver
-
-import (
-	"context"
-	"fmt"
-
-	"github.com/restic/restic/internal/debug"
-	"github.com/restic/restic/internal/restic"
-	"golang.org/x/sync/errgroup"
-)
-
-// saver allows saving a blob.
-type saver interface {
-	SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error)
-}
-
-// blobSaver concurrently saves incoming blobs to the repo.
-type blobSaver struct {
-	repo saver
-	ch   chan<- saveBlobJob
-}
-
-// newBlobSaver returns a new blob. A worker pool is started, it is stopped
-// when ctx is cancelled.
-func newBlobSaver(ctx context.Context, wg *errgroup.Group, repo saver, workers uint) *blobSaver {
-	ch := make(chan saveBlobJob)
-	s := &blobSaver{
-		repo: repo,
-		ch:   ch,
-	}
-
-	for i := uint(0); i < workers; i++ {
-		wg.Go(func() error {
-			return s.worker(ctx, ch)
-		})
-	}
-
-	return s
-}
-
-func (s *blobSaver) TriggerShutdown() {
-	close(s.ch)
-}
-
-// Save stores a blob in the repo. It checks the index and the known blobs
-// before saving anything. It takes ownership of the buffer passed in.
-func (s *blobSaver) Save(ctx context.Context, t restic.BlobType, buf *buffer, filename string, cb func(res saveBlobResponse)) {
-	select {
-	case s.ch <- saveBlobJob{BlobType: t, buf: buf, fn: filename, cb: cb}:
-	case <-ctx.Done():
-		debug.Log("not sending job, context is cancelled")
-	}
-}
-
-type saveBlobJob struct {
-	restic.BlobType
-	buf *buffer
-	fn  string
-	cb  func(res saveBlobResponse)
-}
-
-type saveBlobResponse struct {
-	id         restic.ID
-	length     int
-	sizeInRepo int
-	known      bool
-}
-
-func (s *blobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
-	id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
-
-	if err != nil {
-		return saveBlobResponse{}, err
-	}
-
-	return saveBlobResponse{
-		id:         id,
-		length:     len(buf),
-		sizeInRepo: sizeInRepo,
-		known:      known,
-	}, nil
-}
-
-func (s *blobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
-	for {
-		var job saveBlobJob
-		var ok bool
-		select {
-		case <-ctx.Done():
-			return nil
-		case job, ok = <-jobs:
-			if !ok {
-				return nil
-			}
-		}
-
-		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
-		if err != nil {
-			debug.Log("saveBlob returned error, exiting: %v", err)
-			return fmt.Errorf("failed to save blob from file %q: %w", job.fn, err)
-		}
-		job.cb(res)
-		job.buf.Release()
-	}
-}
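For reference, the file deleted above implemented a common Go shape: a fixed pool of errgroup workers draining a job channel, shut down either by closing the channel (TriggerShutdown) or by context cancellation. A condensed, generic sketch of that pattern, illustrative only and not restic API:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // startPool mirrors the structure of the deleted newBlobSaver: n workers
    // drain jobs until the channel is closed or the context is canceled.
    func startPool(ctx context.Context, wg *errgroup.Group, n int, jobs <-chan string) {
        for i := 0; i < n; i++ {
            wg.Go(func() error {
                for {
                    select {
                    case <-ctx.Done():
                        return nil
                    case job, ok := <-jobs:
                        if !ok {
                            return nil // channel closed: clean shutdown
                        }
                        fmt.Println("processed", job)
                    }
                }
            })
        }
    }

    func main() {
        wg, ctx := errgroup.WithContext(context.Background())
        jobs := make(chan string)
        startPool(ctx, wg, 4, jobs)
        for _, j := range []string{"a", "b", "c"} {
            jobs <- j
        }
        close(jobs) // the equivalent of the removed TriggerShutdown
        _ = wg.Wait()
    }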
diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go
deleted file mode 100644
index e23ed12e5..000000000
--- a/internal/archiver/blob_saver_test.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package archiver
-
-import (
-	"context"
-	"fmt"
-	"runtime"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"testing"
-
-	"github.com/restic/restic/internal/errors"
-	"github.com/restic/restic/internal/restic"
-	rtest "github.com/restic/restic/internal/test"
-	"golang.org/x/sync/errgroup"
-)
-
-var errTest = errors.New("test error")
-
-type saveFail struct {
-	cnt    int32
-	failAt int32
-}
-
-func (b *saveFail) SaveBlob(_ context.Context, _ restic.BlobType, _ []byte, id restic.ID, _ bool) (restic.ID, bool, int, error) {
-	val := atomic.AddInt32(&b.cnt, 1)
-	if val == b.failAt {
-		return restic.ID{}, false, 0, errTest
-	}
-
-	return id, false, 0, nil
-}
-
-func TestBlobSaver(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	wg, ctx := errgroup.WithContext(ctx)
-	saver := &saveFail{}
-
-	b := newBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
-
-	var wait sync.WaitGroup
-	var results []saveBlobResponse
-	var lock sync.Mutex
-
-	wait.Add(20)
-	for i := 0; i < 20; i++ {
-		buf := &buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-		idx := i
-		lock.Lock()
-		results = append(results, saveBlobResponse{})
-		lock.Unlock()
-		b.Save(ctx, restic.DataBlob, buf, "file", func(res saveBlobResponse) {
-			lock.Lock()
-			results[idx] = res
-			lock.Unlock()
-			wait.Done()
-		})
-	}
-
-	wait.Wait()
-	for i, sbr := range results {
-		if sbr.known {
-			t.Errorf("blob %v is known, that should not be the case", i)
-		}
-	}
-
-	b.TriggerShutdown()
-
-	err := wg.Wait()
-	if err != nil {
-		t.Fatal(err)
-	}
-}
-
-func TestBlobSaverError(t *testing.T) {
-	var tests = []struct {
-		blobs  int
-		failAt int
-	}{
-		{20, 2},
-		{20, 5},
-		{20, 15},
-		{200, 150},
-	}
-
-	for _, test := range tests {
-		t.Run("", func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
-
-			wg, ctx := errgroup.WithContext(ctx)
-			saver := &saveFail{
-				failAt: int32(test.failAt),
-			}
-
-			b := newBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
-
-			for i := 0; i < test.blobs; i++ {
-				buf := &buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-				b.Save(ctx, restic.DataBlob, buf, "errfile", func(res saveBlobResponse) {})
-			}
-
-			b.TriggerShutdown()
-
-			err := wg.Wait()
-			if err == nil {
-				t.Errorf("expected error not found")
-			}
-
-			rtest.Assert(t, errors.Is(err, errTest), "unexpected error %v", err)
-			rtest.Assert(t, strings.Contains(err.Error(), "errfile"), "expected error to contain 'errfile' got: %v", err)
-		})
-	}
-}
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 8370bee4d..4d0603e34 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"runtime"
 	"sync"
 
 	"github.com/restic/chunker"
@@ -15,13 +16,10 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-// saveBlobFn saves a blob to a repo.
-type saveBlobFn func(context.Context, restic.BlobType, *buffer, string, func(res saveBlobResponse))
-
 // fileSaver concurrently saves incoming files to the repo.
 type fileSaver struct {
 	saveFilePool *bufferPool
-	saveBlob     saveBlobFn
+	uploader     restic.BlobSaverAsync
 
 	pol chunker.Pol
 
@@ -34,15 +32,17 @@
 
 // newFileSaver returns a new file saver. A worker pool with fileWorkers is
 // started, it is stopped when ctx is cancelled.
-func newFileSaver(ctx context.Context, wg *errgroup.Group, save saveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *fileSaver {
+func newFileSaver(ctx context.Context, wg *errgroup.Group, uploader restic.BlobSaverAsync, pol chunker.Pol, fileWorkers uint) *fileSaver {
 	ch := make(chan saveFileJob)
 
+	// TODO find a way to get rid of this parameter
+	blobWorkers := uint(runtime.GOMAXPROCS(0))
 	debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
 
 	poolSize := fileWorkers + blobWorkers
 
 	s := &fileSaver{
-		saveBlob:     save,
+		uploader:     uploader,
 		saveFilePool: newBufferPool(int(poolSize), chunker.MaxSize),
 		pol:          pol,
 		ch:           ch,
@@ -203,15 +203,20 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 			node.Content = append(node.Content, restic.ID{})
 			lock.Unlock()
 
-			s.saveBlob(ctx, restic.DataBlob, buf, target, func(sbr saveBlobResponse) {
-				lock.Lock()
-				if !sbr.known {
-					fnr.stats.DataBlobs++
-					fnr.stats.DataSize += uint64(sbr.length)
-					fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
+			s.uploader.SaveBlobAsync(ctx, restic.DataBlob, buf.Data, restic.ID{}, false, func(newID restic.ID, known bool, sizeInRepo int, err error) {
+				defer buf.Release()
+				if err != nil {
+					completeError(err)
+					return
 				}
-				node.Content[pos] = sbr.id
+
+				lock.Lock()
+				if !known {
+					fnr.stats.DataBlobs++
+					fnr.stats.DataSize += uint64(len(buf.Data))
+					fnr.stats.DataSizeInRepo += uint64(sizeInRepo)
+				}
+				node.Content[pos] = newID
 				lock.Unlock()
 
 				completeBlob()
 
diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go
index 5aab78558..59a996d76 100644
--- a/internal/archiver/file_saver_test.go
+++ b/internal/archiver/file_saver_test.go
@@ -11,7 +11,6 @@ import (
 	"github.com/restic/chunker"
 	"github.com/restic/restic/internal/data"
 	"github.com/restic/restic/internal/fs"
-	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/test"
 	"golang.org/x/sync/errgroup"
 )
@@ -34,22 +33,13 @@ func createTestFiles(t testing.TB, num int) (files []string) {
 
 func startFileSaver(ctx context.Context, t testing.TB, _ fs.FS) (*fileSaver, context.Context, *errgroup.Group) {
 	wg, ctx := errgroup.WithContext(ctx)
 
-	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *buffer, _ string, cb func(saveBlobResponse)) {
-		cb(saveBlobResponse{
-			id:         restic.Hash(buf.Data),
-			length:     len(buf.Data),
-			sizeInRepo: len(buf.Data),
-			known:      false,
-		})
-	}
-
 	workers := uint(runtime.NumCPU())
 	pol, err := chunker.RandomPolynomial()
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	s := newFileSaver(ctx, wg, saveBlob, pol, workers, workers)
+	s := newFileSaver(ctx, wg, &noopSaver{}, pol, workers)
 	s.NodeFromFileInfo = func(snPath, filename string, meta ToNoder, ignoreXattrListError bool) (*data.Node, error) {
 		return meta.ToNode(ignoreXattrListError, t.Logf)
 	}
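Two details of the saveFile hunk above deserve a note: ownership of the chunk buffer now passes to the uploader, so it is released inside the completion callback (defer buf.Release()), and callbacks may fire out of order, which is why a slot is reserved in node.Content under the lock and later filled by position. A minimal standalone illustration of the slot-reservation pattern (not restic code):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var (
            mu      sync.Mutex
            content []string
            wg      sync.WaitGroup
        )

        // saveAsync completes on another goroutine, in no particular order.
        saveAsync := func(chunk string, cb func(id string)) {
            go cb("id-of-" + chunk)
        }

        for _, chunk := range []string{"a", "b", "c"} {
            mu.Lock()
            pos := len(content)
            content = append(content, "") // reserve this chunk's slot
            mu.Unlock()

            wg.Add(1)
            saveAsync(chunk, func(id string) {
                defer wg.Done()
                mu.Lock()
                content[pos] = id // fill the reserved slot; order is preserved
                mu.Unlock()
            })
        }

        wg.Wait()
        fmt.Println(content) // [id-of-a id-of-b id-of-c]
    }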
diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go
index d0e802765..8b38b5eb2 100644
--- a/internal/archiver/tree_saver.go
+++ b/internal/archiver/tree_saver.go
@@ -12,7 +12,7 @@ import (
 // treeSaver concurrently saves incoming trees to the repo.
 type treeSaver struct {
-	saveBlob saveBlobFn
+	uploader restic.BlobSaverAsync
 	errFn    ErrorFunc
 
 	ch chan<- saveTreeJob
 }
@@ -20,12 +20,12 @@
 
 // newTreeSaver returns a new tree saver. A worker pool with treeWorkers is
 // started, it is stopped when ctx is cancelled.
-func newTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob saveBlobFn, errFn ErrorFunc) *treeSaver {
+func newTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, uploader restic.BlobSaverAsync, errFn ErrorFunc) *treeSaver {
 	ch := make(chan saveTreeJob)
 
 	s := &treeSaver{
 		ch:       ch,
-		saveBlob: saveBlob,
+		uploader: uploader,
 		errFn:    errFn,
 	}
 
@@ -129,21 +129,35 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite
 		return nil, stats, err
 	}
 
-	b := &buffer{Data: buf}
-	ch := make(chan saveBlobResponse, 1)
-	s.saveBlob(ctx, restic.TreeBlob, b, job.target, func(res saveBlobResponse) {
-		ch <- res
+	var (
+		known      bool
+		length     int
+		sizeInRepo int
+		id         restic.ID
+	)
+
+	ch := make(chan struct{}, 1)
+	s.uploader.SaveBlobAsync(ctx, restic.TreeBlob, buf, restic.ID{}, false, func(newID restic.ID, cbKnown bool, cbSizeInRepo int, cbErr error) {
+		known = cbKnown
+		length = len(buf)
+		sizeInRepo = cbSizeInRepo
+		id = newID
+		err = cbErr
+		ch <- struct{}{}
 	})
 
 	select {
-	case sbr := <-ch:
-		if !sbr.known {
+	case <-ch:
+		if err != nil {
+			return nil, stats, err
+		}
+		if !known {
 			stats.TreeBlobs++
-			stats.TreeSize += uint64(sbr.length)
-			stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
+			stats.TreeSize += uint64(length)
+			stats.TreeSizeInRepo += uint64(sizeInRepo)
 		}
-		node.Subtree = &sbr.id
+		node.Subtree = &id
 		return node, stats, nil
 	case <-ctx.Done():
 		return nil, stats, ctx.Err()
 	}
diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go
index 2a4826444..4724f5d5c 100644
--- a/internal/archiver/tree_saver_test.go
+++ b/internal/archiver/tree_saver_test.go
@@ -13,13 +13,10 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-func treeSaveHelper(_ context.Context, _ restic.BlobType, buf *buffer, _ string, cb func(res saveBlobResponse)) {
-	cb(saveBlobResponse{
-		id:         restic.NewRandomID(),
-		known:      false,
-		length:     len(buf.Data),
-		sizeInRepo: len(buf.Data),
-	})
+type noopSaver struct{}
+
+func (n *noopSaver) SaveBlobAsync(_ context.Context, _ restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool, cb func(newID restic.ID, known bool, sizeInRepo int, err error)) {
+	cb(restic.Hash(buf), false, len(buf), nil)
 }
 
 func setupTreeSaver() (context.Context, context.CancelFunc, *treeSaver, func() error) {
@@ -30,7 +27,7 @@ func setupTreeSaver() (context.Context, context.CancelFunc, *treeSaver, func() e
 		return err
 	}
 
-	b := newTreeSaver(ctx, wg, uint(runtime.NumCPU()), treeSaveHelper, errFn)
+	b := newTreeSaver(ctx, wg, uint(runtime.NumCPU()), &noopSaver{}, errFn)
 
 	shutdown := func() error {
 		b.TriggerShutdown()
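The treeSaver.save rewrite above shows the cost of the callback API at a call site that needs a synchronous answer: captured locals plus a one-shot channel. If more callers end up doing this dance, it could be factored into a small helper along these lines (a hypothetical sketch against the assumed BlobSaverAsync interface, not part of this diff):

    package archiver

    import (
        "context"

        "github.com/restic/restic/internal/restic"
    )

    // saveBlobSync is a hypothetical helper that turns SaveBlobAsync back into
    // a blocking call, mirroring the channel dance in treeSaver.save.
    func saveBlobSync(ctx context.Context, up restic.BlobSaverAsync, t restic.BlobType, buf []byte) (restic.ID, bool, int, error) {
        var (
            id    restic.ID
            known bool
            size  int
            err   error
        )

        // Buffer of one: the callback must never block, even if this function
        // has already returned on the ctx.Done branch below.
        done := make(chan struct{}, 1)
        up.SaveBlobAsync(ctx, t, buf, restic.ID{}, false, func(newID restic.ID, cbKnown bool, cbSize int, cbErr error) {
            id, known, size, err = newID, cbKnown, cbSize, cbErr
            done <- struct{}{}
        })

        select {
        case <-done:
            return id, known, size, err
        case <-ctx.Done():
            return restic.ID{}, false, 0, ctx.Err()
        }
    }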