From ba07d974ecb280b2d822507114f2228841a0d102 Mon Sep 17 00:00:00 2001 From: Hugo Osvaldo Barrera Date: Sat, 28 Mar 2026 02:24:11 +0100 Subject: [PATCH 1/2] Add benchmark with concurrent archivers Current benchmarks all work with a single archiver, and are not suitable for testing changes which might affect contention. Add a benchmark with multiple small files and multiple concurrent archivers. --- internal/archiver/archiver_test.go | 56 ++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index b13734655..98698a7c0 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -376,6 +376,62 @@ func BenchmarkArchiverSaveFileSmall(b *testing.B) { } } +func BenchmarkArchiverSaveFileConcurrent(b *testing.B) { + const fileSize = 4 * 1024 + const numFiles = 64 + + d := TestDir{} + for i := 0; i < numFiles; i++ { + d[fmt.Sprintf("file%d", i)] = TestFile{ + Content: string(rtest.Random(23+i, fileSize)), + } + } + + b.SetBytes(fileSize * numFiles) + + for i := 0; i < b.N; i++ { + b.StopTimer() + tempdir, repo := prepareTempdirRepoSrc(b, d) + + arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) + arch.Error = func(item string, err error) error { + b.Errorf("archiver error for %v: %v", item, err) + return err + } + b.StartTimer() + + err := repo.WithBlobUploader(context.Background(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + wg, ctx := errgroup.WithContext(ctx) + arch.runWorkers(ctx, wg, uploader) + + var futures []futureNode + for j := 0; j < numFiles; j++ { + filename := filepath.Join(tempdir, fmt.Sprintf("file%d", j)) + file, err := arch.FS.OpenFile(filename, fs.O_NOFOLLOW, false) + if err != nil { + b.Fatal(err) + } + + fn := arch.fileSaver.Save(ctx, "/", filename, file, func() {}, func() {}, func(*data.Node, ItemStats) {}) + futures = append(futures, fn) + } + + for _, fn := range futures { + fnr := fn.take(ctx) + if fnr.err != nil { + b.Fatal(fnr.err) + } + } + + arch.stopWorkers() + return wg.Wait() + }) + if err != nil { + b.Fatal(err) + } + } +} + func BenchmarkArchiverSaveFileLarge(b *testing.B) { const fileSize = 40*1024*1024 + 1287898 d := TestDir{"file": TestFile{ From 402d8857771553dd558f57cafce6437642386e77 Mon Sep 17 00:00:00 2001 From: Hugo Osvaldo Barrera Date: Sat, 24 Feb 2024 23:16:09 +0100 Subject: [PATCH 2/2] Don't use locks for counting stats Counting statistics uses locks each time an archiver needs to update stats. Use lockless stats counting, using locks only for measuring network speeed (this bit is stateful and can't trivially be made lockless). To be frank, I was curious if this was even feasible, and went out to implement it mostly to know if it was. Benchmarks show a mere 1.6% improvement in performance, with no real additional algorithmic complexity for it. Depends-On: https://github.com/restic/restic/pull/5764 --- internal/archiver/archiver.go | 60 ++++---- internal/archiver/archiver_test.go | 206 ++++++++++++++------------- internal/archiver/file_saver.go | 11 +- internal/archiver/file_saver_test.go | 2 +- internal/archiver/tree_saver.go | 22 +-- internal/ui/backup/json.go | 50 +++---- internal/ui/backup/progress.go | 65 ++++----- internal/ui/backup/progress_test.go | 8 +- internal/ui/backup/rate_estimator.go | 9 ++ internal/ui/backup/text.go | 40 +++--- 10 files changed, 245 insertions(+), 228 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 996e79e6a..29e83100e 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -9,6 +9,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/restic/restic/internal/data" @@ -35,12 +36,12 @@ type ErrorFunc func(file string, err error) error // ItemStats collects some statistics about a particular file or directory. type ItemStats struct { - DataBlobs int // number of new data blobs added for this item - DataSize uint64 // sum of the sizes of all new data blobs - DataSizeInRepo uint64 // sum of the bytes added to the repo (including compression and crypto overhead) - TreeBlobs int // number of new tree blobs added for this item - TreeSize uint64 // sum of the sizes of all new tree blobs - TreeSizeInRepo uint64 // sum of the bytes added to the repo (including compression and crypto overhead) + DataBlobs atomic.Int64 // number of new data blobs added for this item + DataSize atomic.Uint64 // sum of the sizes of all new data blobs + DataSizeInRepo atomic.Uint64 // sum of the bytes added to the repo (including compression and crypto overhead) + TreeBlobs atomic.Int64 // number of new tree blobs added for this item + TreeSize atomic.Uint64 // sum of the sizes of all new tree blobs + TreeSizeInRepo atomic.Uint64 // sum of the bytes added to the repo (including compression and crypto overhead) } type ChangeStats struct { @@ -57,14 +58,17 @@ type Summary struct { ItemStats } -// Add adds other to the current ItemStats. -func (s *ItemStats) Add(other ItemStats) { - s.DataBlobs += other.DataBlobs - s.DataSize += other.DataSize - s.DataSizeInRepo += other.DataSizeInRepo - s.TreeBlobs += other.TreeBlobs - s.TreeSize += other.TreeSize - s.TreeSizeInRepo += other.TreeSizeInRepo +// Add atomically adds other to the current ItemStats. +func (s *ItemStats) Add(other *ItemStats) { + if other == nil { + return + } + s.DataBlobs.Add(other.DataBlobs.Load()) + s.DataSize.Add(other.DataSize.Load()) + s.DataSizeInRepo.Add(other.DataSizeInRepo.Load()) + s.TreeBlobs.Add(other.TreeBlobs.Load()) + s.TreeSize.Add(other.TreeSize.Load()) + s.TreeSizeInRepo.Add(other.TreeSizeInRepo.Load()) } // ToNoder returns a data.Node for a File. @@ -114,7 +118,7 @@ type Archiver struct { // // CompleteItem may be called asynchronously from several different // goroutines! - CompleteItem func(item string, previous, current *data.Node, s ItemStats, d time.Duration) + CompleteItem func(item string, previous, current *data.Node, s *ItemStats, d time.Duration) // StartFile is called when a file is being processed by a worker. StartFile func(filename string) @@ -180,7 +184,7 @@ func New(repo archiverRepo, filesystem fs.FS, opts Options) *Archiver { FS: filesystem, Options: opts.ApplyDefaults(), - CompleteItem: func(string, *data.Node, *data.Node, ItemStats, time.Duration) {}, + CompleteItem: func(string, *data.Node, *data.Node, *ItemStats, time.Duration) {}, StartFile: func(string) {}, CompleteBlob: func(uint64) {}, } @@ -210,7 +214,7 @@ func (arch *Archiver) error(item string, err error) error { return errf } -func (arch *Archiver) trackItem(item string, previous, current *data.Node, s ItemStats, d time.Duration) { +func (arch *Archiver) trackItem(item string, previous, current *data.Node, s *ItemStats, d time.Duration) { arch.CompleteItem(item, previous, current, s, d) arch.mu.Lock() @@ -395,7 +399,7 @@ type futureNodeResult struct { snPath, target string node *data.Node - stats ItemStats + stats *ItemStats err error } @@ -514,7 +518,7 @@ func (arch *Archiver) save(ctx context.Context, snPath, target string, previous if previous != nil && !fileChanged(fi, previous, arch.ChangeIgnoreFlags) { if arch.allBlobsPresent(previous) { debug.Log("%v hasn't changed, using old list of blobs", target) - arch.trackItem(snPath, previous, previous, ItemStats{}, time.Since(start)) + arch.trackItem(snPath, previous, previous, &ItemStats{}, time.Since(start)) arch.CompleteBlob(previous.Size) node, err := arch.nodeFromFileInfo(snPath, target, meta, false) if err != nil { @@ -567,8 +571,8 @@ func (arch *Archiver) save(ctx context.Context, snPath, target string, previous fn = arch.fileSaver.Save(ctx, snPath, target, meta, func() { arch.StartFile(snPath) }, func() { - arch.trackItem(snPath, nil, nil, ItemStats{}, 0) - }, func(node *data.Node, stats ItemStats) { + arch.trackItem(snPath, nil, nil, &ItemStats{}, 0) + }, func(node *data.Node, stats *ItemStats) { arch.trackItem(snPath, previous, node, stats, time.Since(start)) }) @@ -585,7 +589,7 @@ func (arch *Archiver) save(ctx context.Context, snPath, target string, previous } fn, err = arch.saveDir(ctx, snPath, target, meta, oldSubtree, - func(node *data.Node, stats ItemStats) { + func(node *data.Node, stats *ItemStats) { arch.trackItem(snItem, previous, node, stats, time.Since(start)) }) if err != nil { @@ -728,7 +732,7 @@ func (arch *Archiver) saveTree(ctx context.Context, snPath string, atree *tree, } // not a leaf node, archive subtree - fn, _, err := arch.saveTree(ctx, join(snPath, name), &subatree, oldSubtree, func(n *data.Node, is ItemStats) { + fn, _, err := arch.saveTree(ctx, join(snPath, name), &subatree, oldSubtree, func(n *data.Node, is *ItemStats) { arch.trackItem(snItem, oldNode, n, is, time.Since(start)) }) if err != nil { @@ -886,7 +890,7 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps arch.runWorkers(wgCtx, wg, uploader) debug.Log("starting snapshot") - fn, nodeCount, err := arch.saveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot), func(_ *data.Node, is ItemStats) { + fn, nodeCount, err := arch.saveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot), func(_ *data.Node, is *ItemStats) { arch.trackItem("/", nil, nil, is, time.Since(start)) }) if err != nil { @@ -954,10 +958,10 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps DirsNew: arch.summary.Dirs.New, DirsChanged: arch.summary.Dirs.Changed, DirsUnmodified: arch.summary.Dirs.Unchanged, - DataBlobs: arch.summary.ItemStats.DataBlobs, - TreeBlobs: arch.summary.ItemStats.TreeBlobs, - DataAdded: arch.summary.ItemStats.DataSize + arch.summary.ItemStats.TreeSize, - DataAddedPacked: arch.summary.ItemStats.DataSizeInRepo + arch.summary.ItemStats.TreeSizeInRepo, + DataBlobs: int(arch.summary.ItemStats.DataBlobs.Load()), + TreeBlobs: int(arch.summary.ItemStats.TreeBlobs.Load()), + DataAdded: arch.summary.ItemStats.DataSize.Load() + arch.summary.ItemStats.TreeSize.Load(), + DataAddedPacked: arch.summary.ItemStats.DataSizeInRepo.Load() + arch.summary.ItemStats.TreeSizeInRepo.Load(), TotalFilesProcessed: arch.summary.Files.New + arch.summary.Files.Changed + arch.summary.Files.Unchanged, TotalBytesProcessed: arch.summary.ProcessedBytes, } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 98698a7c0..0ff6c4429 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -38,12 +38,12 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (string, *repository.Repos return tempdir, repo } -func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS) (*data.Node, ItemStats) { +func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS) (*data.Node, *ItemStats) { var ( completeReadingCallback bool completeCallbackNode *data.Node - completeCallbackStats ItemStats + completeCallbackStats *ItemStats completeCallback bool startCallback bool @@ -67,7 +67,7 @@ func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS } } - complete := func(node *data.Node, stats ItemStats) { + complete := func(node *data.Node, stats *ItemStats) { completeCallback = true completeCallbackNode = node completeCallbackStats = stats @@ -117,7 +117,7 @@ func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS } if completeCallbackStats != fnr.stats { - t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", fnr.stats, completeCallbackStats) + t.Errorf("different stats returned for complete callback") } return fnr.node, fnr.stats @@ -139,17 +139,17 @@ func TestArchiverSaveFile(t *testing.T) { node, stats := saveFile(t, repo, filepath.Join(tempdir, "file"), fs.Track{FS: fs.Local{}}) TestEnsureFileContent(ctx, t, repo, "file", node, testfile) - if stats.DataSize != uint64(len(testfile.Content)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) + if stats.DataSize.Load() != uint64(len(testfile.Content)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(testfile.Content) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(testfile.Content) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -181,17 +181,17 @@ func TestArchiverSaveFileReaderFS(t *testing.T) { node, stats := saveFile(t, repo, filename, readerFs) TestEnsureFileContent(ctx, t, repo, "file", node, TestFile{Content: test.Data}) - if stats.DataSize != uint64(len(test.Data)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) + if stats.DataSize.Load() != uint64(len(test.Data)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(test.Data) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(test.Data) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -250,17 +250,17 @@ func TestArchiverSave(t *testing.T) { TestEnsureFileContent(ctx, t, repo, "file", fnr.node, testfile) stats := fnr.stats - if stats.DataSize != uint64(len(testfile.Content)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) + if stats.DataSize.Load() != uint64(len(testfile.Content)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(testfile.Content) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(testfile.Content) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -328,17 +328,17 @@ func TestArchiverSaveReaderFS(t *testing.T) { TestEnsureFileContent(ctx, t, repo, "file", fnr.node, TestFile{Content: test.Data}) stats := fnr.stats - if stats.DataSize != uint64(len(test.Data)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) + if stats.DataSize.Load() != uint64(len(test.Data)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(test.Data) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(test.Data) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -360,17 +360,17 @@ func BenchmarkArchiverSaveFileSmall(b *testing.B) { _, stats := saveFile(b, repo, filepath.Join(tempdir, "file"), fs.Track{FS: fs.Local{}}) b.StopTimer() - if stats.DataSize != fileSize { - b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize) + if stats.DataSize.Load() != fileSize { + b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize.Load()) } - if stats.DataBlobs <= 0 { - b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 { + b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } b.StartTimer() } @@ -400,7 +400,7 @@ func BenchmarkArchiverSaveFileConcurrent(b *testing.B) { } b.StartTimer() - err := repo.WithBlobUploader(context.Background(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) @@ -412,7 +412,7 @@ func BenchmarkArchiverSaveFileConcurrent(b *testing.B) { b.Fatal(err) } - fn := arch.fileSaver.Save(ctx, "/", filename, file, func() {}, func() {}, func(*data.Node, ItemStats) {}) + fn := arch.fileSaver.Save(ctx, "/", filename, file, func() {}, func() {}, func(*data.Node, *ItemStats) {}) futures = append(futures, fn) } @@ -448,17 +448,17 @@ func BenchmarkArchiverSaveFileLarge(b *testing.B) { _, stats := saveFile(b, repo, filepath.Join(tempdir, "file"), fs.Track{FS: fs.Local{}}) b.StopTimer() - if stats.DataSize != fileSize { - b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize) + if stats.DataSize.Load() != fileSize { + b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize.Load()) } - if stats.DataBlobs <= 0 { - b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 { + b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } b.StartTimer() } @@ -906,7 +906,7 @@ func TestArchiverSaveDir(t *testing.T) { defer back() var treeID restic.ID - err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + err := repo.WithBlobUploader(context.Background(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) meta, err := testFS.OpenFile(test.target, fs.O_NOFOLLOW, true) @@ -919,17 +919,17 @@ func TestArchiverSaveDir(t *testing.T) { node, stats := fnr.node, fnr.stats t.Logf("stats: %v", stats) - if stats.DataSize != 0 { - t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + if stats.DataSize.Load() != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize.Load()) } - if stats.DataBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize == 0 { - t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() == 0 { + t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs <= 0 { - t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs) + if stats.TreeBlobs.Load() <= 0 { + t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs.Load()) } node.Name = targetNodeName @@ -989,31 +989,31 @@ func TestArchiverSaveDirIncremental(t *testing.T) { node, stats := fnr.node, fnr.stats if i == 0 { // operation must have added new tree data - if stats.DataSize != 0 { - t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + if stats.DataSize.Load() != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize.Load()) } - if stats.DataBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize == 0 { - t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() == 0 { + t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs <= 0 { - t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs) + if stats.TreeBlobs.Load() <= 0 { + t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs.Load()) } } else { // operation must not have added any new data - if stats.DataSize != 0 { - t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + if stats.DataSize.Load() != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize.Load()) } - if stats.DataBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in TreeBlobs, want 0, got %d", stats.TreeBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in TreeBlobs, want 0, got %d", stats.TreeBlobs.Load()) } } @@ -1027,6 +1027,18 @@ func TestArchiverSaveDirIncremental(t *testing.T) { } } +// newItemStats creates an ItemStats with the given values pre-loaded into the atomic fields. +func newItemStats(dataBlobs int64, dataSize, dataSizeInRepo uint64, treeBlobs int64, treeSize, treeSizeInRepo uint64) ItemStats { + var s ItemStats + s.DataBlobs.Store(dataBlobs) + s.DataSize.Store(dataSize) + s.DataSizeInRepo.Store(dataSizeInRepo) + s.TreeBlobs.Store(treeBlobs) + s.TreeSize.Store(treeSize) + s.TreeSizeInRepo.Store(treeSizeInRepo) + return s +} + // bothZeroOrNeither fails the test if only one of exp, act is zero. func bothZeroOrNeither(tb testing.TB, exp, act uint64) { tb.Helper() @@ -1062,7 +1074,7 @@ func TestArchiverSaveTree(t *testing.T) { "targetfile": TestFile{Content: "foobar"}, }, stat: Summary{ - ItemStats: ItemStats{1, 6, 32 + 6, 0, 0, 0}, + ItemStats: newItemStats(1, 6, 32+6, 0, 0, 0), ProcessedBytes: 6, Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, @@ -1079,7 +1091,7 @@ func TestArchiverSaveTree(t *testing.T) { "filesymlink": TestSymlink{Target: "targetfile"}, }, stat: Summary{ - ItemStats: ItemStats{1, 6, 32 + 6, 0, 0, 0}, + ItemStats: newItemStats(1, 6, 32+6, 0, 0, 0), ProcessedBytes: 6, Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, @@ -1104,7 +1116,7 @@ func TestArchiverSaveTree(t *testing.T) { }, }, stat: Summary{ - ItemStats: ItemStats{0, 0, 0, 1, 0x154, 0x16a}, + ItemStats: newItemStats(0, 0, 0, 1, 0x154, 0x16a), ProcessedBytes: 0, Files: ChangeStats{0, 0, 0}, Dirs: ChangeStats{1, 0, 0}, @@ -1133,7 +1145,7 @@ func TestArchiverSaveTree(t *testing.T) { }, }, stat: Summary{ - ItemStats: ItemStats{1, 6, 32 + 6, 3, 0x47f, 0x4c1}, + ItemStats: newItemStats(1, 6, 32+6, 3, 0x47f, 0x4c1), ProcessedBytes: 6, Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{3, 0, 0}, @@ -1192,11 +1204,11 @@ func TestArchiverSaveTree(t *testing.T) { } TestEnsureTree(context.TODO(), t, "/", repo, treeID, want) stat := arch.summary - bothZeroOrNeither(t, uint64(test.stat.DataBlobs), uint64(stat.DataBlobs)) - bothZeroOrNeither(t, uint64(test.stat.TreeBlobs), uint64(stat.TreeBlobs)) - bothZeroOrNeither(t, test.stat.DataSize, stat.DataSize) - bothZeroOrNeither(t, test.stat.DataSizeInRepo, stat.DataSizeInRepo) - bothZeroOrNeither(t, test.stat.TreeSizeInRepo, stat.TreeSizeInRepo) + bothZeroOrNeither(t, uint64(test.stat.DataBlobs.Load()), uint64(stat.DataBlobs.Load())) + bothZeroOrNeither(t, uint64(test.stat.TreeBlobs.Load()), uint64(stat.TreeBlobs.Load())) + bothZeroOrNeither(t, test.stat.DataSize.Load(), stat.DataSize.Load()) + bothZeroOrNeither(t, test.stat.DataSizeInRepo.Load(), stat.DataSizeInRepo.Load()) + bothZeroOrNeither(t, test.stat.TreeSizeInRepo.Load(), stat.TreeSizeInRepo.Load()) rtest.Equals(t, test.stat.ProcessedBytes, stat.ProcessedBytes) rtest.Equals(t, test.stat.Files, stat.Files) rtest.Equals(t, test.stat.Dirs, stat.Dirs) @@ -1740,10 +1752,10 @@ func checkSnapshotStats(t *testing.T, sn *data.Snapshot, stat Summary) { rtest.Equals(t, stat.Dirs.Unchanged, sn.Summary.DirsUnmodified, "DirsUnmodified") rtest.Equals(t, stat.ProcessedBytes, sn.Summary.TotalBytesProcessed, "TotalBytesProcessed") rtest.Equals(t, stat.Files.New+stat.Files.Changed+stat.Files.Unchanged, sn.Summary.TotalFilesProcessed, "TotalFilesProcessed") - bothZeroOrNeither(t, uint64(stat.DataBlobs), uint64(sn.Summary.DataBlobs)) - bothZeroOrNeither(t, uint64(stat.TreeBlobs), uint64(sn.Summary.TreeBlobs)) - bothZeroOrNeither(t, stat.DataSize+stat.TreeSize, sn.Summary.DataAdded) - bothZeroOrNeither(t, stat.DataSizeInRepo+stat.TreeSizeInRepo, sn.Summary.DataAddedPacked) + bothZeroOrNeither(t, uint64(stat.DataBlobs.Load()), uint64(sn.Summary.DataBlobs)) + bothZeroOrNeither(t, uint64(stat.TreeBlobs.Load()), uint64(sn.Summary.TreeBlobs)) + bothZeroOrNeither(t, stat.DataSize.Load()+stat.TreeSize.Load(), sn.Summary.DataAdded) + bothZeroOrNeither(t, stat.DataSizeInRepo.Load()+stat.TreeSizeInRepo.Load(), sn.Summary.DataAddedPacked) } func TestArchiverParent(t *testing.T) { @@ -1762,7 +1774,7 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, ProcessedBytes: 2102152, - ItemStats: ItemStats{3, 0x201593, 0x201632, 1, 0, 0}, + ItemStats: newItemStats(3, 0x201593, 0x201632, 1, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 0, 1}, @@ -1781,7 +1793,7 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, ProcessedBytes: 2102152, - ItemStats: ItemStats{3, 0x201593, 0x201632, 1, 0, 0}, + ItemStats: newItemStats(3, 0x201593, 0x201632, 1, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 0, 1}, @@ -1800,7 +1812,7 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{2, 0, 0}, Dirs: ChangeStats{1, 0, 0}, ProcessedBytes: 2469, - ItemStats: ItemStats{2, 0xe1c, 0xcd9, 2, 0, 0}, + ItemStats: newItemStats(2, 0xe1c, 0xcd9, 2, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 0, 2}, @@ -1823,13 +1835,13 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{2, 0, 0}, Dirs: ChangeStats{1, 0, 0}, ProcessedBytes: 2469, - ItemStats: ItemStats{2, 0xe13, 0xcf8, 2, 0, 0}, + ItemStats: newItemStats(2, 0xe13, 0xcf8, 2, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 1, 0}, Dirs: ChangeStats{0, 1, 0}, ProcessedBytes: 6, - ItemStats: ItemStats{1, 0x305, 0x233, 2, 0, 0}, + ItemStats: newItemStats(1, 0x305, 0x233, 2, 0, 0), }, }, } diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 84e175d82..732f47f70 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -59,7 +59,7 @@ func (s *fileSaver) TriggerShutdown() { } // fileCompleteFunc is called when the file has been saved. -type fileCompleteFunc func(*data.Node, ItemStats) +type fileCompleteFunc func(*data.Node, *ItemStats) // Save stores the file f and returns the data once it has been completed. The // file is closed by Save. completeReading is only called if the file was read @@ -107,6 +107,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat fnr := futureNodeResult{ snPath: snPath, target: target, + stats: &ItemStats{}, } var lock sync.Mutex remaining := 0 @@ -141,7 +142,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat isCompleted = true fnr.err = fmt.Errorf("failed to save %v: %w", target, err) fnr.node = nil - fnr.stats = ItemStats{} + fnr.stats = &ItemStats{} finish(fnr) } } @@ -206,9 +207,9 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat lock.Lock() if !known { - fnr.stats.DataBlobs++ - fnr.stats.DataSize += uint64(len(buf.Data)) - fnr.stats.DataSizeInRepo += uint64(sizeInRepo) + fnr.stats.DataBlobs.Add(1) + fnr.stats.DataSize.Add(uint64(len(buf.Data))) + fnr.stats.DataSizeInRepo.Add(uint64(sizeInRepo)) } node.Content[pos] = newID lock.Unlock() diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 4dbf78548..f1daa1a81 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -54,7 +54,7 @@ func TestFileSaver(t *testing.T) { startFn := func() {} completeReadingFn := func() {} - completeFn := func(*data.Node, ItemStats) {} + completeFn := func(*data.Node, *ItemStats) {} files := createTestFiles(t, 15) testFs := fs.Local{} diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index 8b38b5eb2..305e08b91 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -73,7 +73,7 @@ type saveTreeJob struct { } // save stores the nodes as a tree in the repo. -func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, ItemStats, error) { +func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, *ItemStats, error) { var stats ItemStats node := job.node nodes := job.nodes @@ -92,7 +92,7 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite if fnr.err != nil { debug.Log("err for %v: %v", fnr.snPath, fnr.err) if fnr.err == context.Canceled { - return nil, stats, fnr.err + return nil, &stats, fnr.err } fnr.err = s.errFn(fnr.target, fnr.err) @@ -101,7 +101,7 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite continue } - return nil, stats, fnr.err + return nil, &stats, fnr.err } // when the error is ignored, the node could not be saved, so ignore it @@ -119,14 +119,14 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite } if err != nil { debug.Log("insert %v failed: %v", fnr.node.Name, err) - return nil, stats, err + return nil, &stats, err } lastNode = fnr.node } buf, err := builder.Finalize() if err != nil { - return nil, stats, err + return nil, &stats, err } var ( @@ -149,18 +149,18 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite select { case <-ch: if err != nil { - return nil, stats, err + return nil, &stats, err } if !known { - stats.TreeBlobs++ - stats.TreeSize += uint64(length) - stats.TreeSizeInRepo += uint64(sizeInRepo) + stats.TreeBlobs.Add(1) + stats.TreeSize.Add(uint64(length)) + stats.TreeSizeInRepo.Add(uint64(sizeInRepo)) } node.Subtree = &id - return node, stats, nil + return node, &stats, nil case <-ctx.Done(): - return nil, stats, ctx.Err() + return nil, &stats, ctx.Err() } } diff --git a/internal/ui/backup/json.go b/internal/ui/backup/json.go index b46bbdc5f..338efd39d 100644 --- a/internal/ui/backup/json.go +++ b/internal/ui/backup/json.go @@ -39,20 +39,20 @@ func (b *JSONProgress) error(status interface{}) { } // Update updates the status lines. -func (b *JSONProgress) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) { +func (b *JSONProgress) Update(total, processed Counter, errors uint32, currentFiles map[string]struct{}, start time.Time, secs uint64) { status := statusUpdate{ MessageType: "status", SecondsElapsed: uint64(time.Since(start) / time.Second), SecondsRemaining: secs, - TotalFiles: total.Files, - FilesDone: processed.Files, - TotalBytes: total.Bytes, - BytesDone: processed.Bytes, + TotalFiles: total.Files.Load(), + FilesDone: processed.Files.Load(), + TotalBytes: total.Bytes.Load(), + BytesDone: processed.Bytes.Load(), ErrorCount: errors, } - if total.Bytes > 0 { - status.PercentDone = float64(processed.Bytes) / float64(total.Bytes) + if status.TotalBytes > 0 { + status.PercentDone = float64(processed.Bytes.Load()) / float64(total.Bytes.Load()) } for filename := range currentFiles { @@ -88,7 +88,7 @@ func (b *JSONProgress) Error(item string, err error) error { // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemStats, d time.Duration) { +func (b *JSONProgress) CompleteItem(messageType, item string, s *archiver.ItemStats, d time.Duration) { if b.v < 2 { return } @@ -100,10 +100,10 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "new", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, - MetadataSize: s.TreeSize, - MetadataSizeInRepo: s.TreeSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), + MetadataSize: s.TreeSize.Load(), + MetadataSizeInRepo: s.TreeSizeInRepo.Load(), }) case "dir unchanged": b.print(verboseUpdate{ @@ -117,10 +117,10 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "modified", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, - MetadataSize: s.TreeSize, - MetadataSizeInRepo: s.TreeSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), + MetadataSize: s.TreeSize.Load(), + MetadataSizeInRepo: s.TreeSizeInRepo.Load(), }) case "file new": b.print(verboseUpdate{ @@ -128,8 +128,8 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "new", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), }) case "file unchanged": b.print(verboseUpdate{ @@ -143,8 +143,8 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "modified", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), }) } } @@ -177,10 +177,10 @@ func (b *JSONProgress) Finish(snapshotID restic.ID, summary *archiver.Summary, d DirsNew: summary.Dirs.New, DirsChanged: summary.Dirs.Changed, DirsUnmodified: summary.Dirs.Unchanged, - DataBlobs: summary.ItemStats.DataBlobs, - TreeBlobs: summary.ItemStats.TreeBlobs, - DataAdded: summary.ItemStats.DataSize + summary.ItemStats.TreeSize, - DataAddedPacked: summary.ItemStats.DataSizeInRepo + summary.ItemStats.TreeSizeInRepo, + DataBlobs: int(summary.ItemStats.DataBlobs.Load()), + TreeBlobs: int(summary.ItemStats.TreeBlobs.Load()), + DataAdded: summary.ItemStats.DataSize.Load() + summary.ItemStats.TreeSize.Load(), + DataAddedPacked: summary.ItemStats.DataSizeInRepo.Load() + summary.ItemStats.TreeSizeInRepo.Load(), TotalFilesProcessed: summary.Files.New + summary.Files.Changed + summary.Files.Unchanged, TotalBytesProcessed: summary.ProcessedBytes, BackupStart: summary.BackupStart, @@ -204,7 +204,7 @@ type statusUpdate struct { FilesDone uint64 `json:"files_done,omitempty"` TotalBytes uint64 `json:"total_bytes,omitempty"` BytesDone uint64 `json:"bytes_done,omitempty"` - ErrorCount uint `json:"error_count,omitempty"` + ErrorCount uint32 `json:"error_count,omitempty"` CurrentFiles []string `json:"current_files,omitempty"` } diff --git a/internal/ui/backup/progress.go b/internal/ui/backup/progress.go index 17ccfa8f4..f9dfacecc 100644 --- a/internal/ui/backup/progress.go +++ b/internal/ui/backup/progress.go @@ -2,6 +2,7 @@ package backup import ( "sync" + "sync/atomic" "time" "github.com/restic/restic/internal/archiver" @@ -13,10 +14,10 @@ import ( // A ProgressPrinter can print various progress messages. // It must be safe to call its methods from concurrent goroutines. type ProgressPrinter interface { - Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) + Update(total, processed Counter, errors uint32, currentFiles map[string]struct{}, start time.Time, secs uint64) Error(item string, err error) error ScannerError(item string, err error) error - CompleteItem(messageType string, item string, s archiver.ItemStats, d time.Duration) + CompleteItem(messageType string, item string, s *archiver.ItemStats, d time.Duration) ReportTotal(start time.Time, s archiver.ScanStats) Finish(snapshotID restic.ID, summary *archiver.Summary, dryRun bool) Reset() @@ -25,7 +26,7 @@ type ProgressPrinter interface { } type Counter struct { - Files, Dirs, Bytes uint64 + Files, Dirs, Bytes atomic.Uint64 } // Progress reports progress for the `backup` command. @@ -36,11 +37,11 @@ type Progress struct { start time.Time estimator rateEstimator - scanStarted, scanFinished bool + scanStarted, scanFinished atomic.Bool currentFiles map[string]struct{} processed, total Counter - errors uint + errors atomic.Uint32 printer ProgressPrinter } @@ -56,25 +57,25 @@ func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress { if final { p.printer.Reset() } else { - p.mu.Lock() - defer p.mu.Unlock() - if !p.scanStarted { + if !p.scanStarted.Load() { return } var secondsRemaining uint64 - if p.scanFinished { + if p.scanFinished.Load() { rate := p.estimator.rate(time.Now()) tooSlowCutoff := 1024. if rate <= tooSlowCutoff { secondsRemaining = 0 } else { - todo := float64(p.total.Bytes - p.processed.Bytes) + todo := float64(p.total.Bytes.Load() - p.processed.Bytes.Load()) secondsRemaining = uint64(todo / rate) } } - p.printer.Update(p.total, p.processed, p.errors, p.currentFiles, p.start, secondsRemaining) + p.mu.Lock() + defer p.mu.Unlock() + p.printer.Update(p.total, p.processed, p.errors.Load(), p.currentFiles, p.start, secondsRemaining) } }) return p @@ -82,10 +83,8 @@ func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress { // Error is the error callback function for the archiver, it prints the error and returns nil. func (p *Progress) Error(item string, err error) error { - p.mu.Lock() - p.errors++ - p.scanStarted = true - p.mu.Unlock() + p.errors.Add(1) + p.scanStarted.Store(true) return p.printer.Error(item, err) } @@ -97,24 +96,16 @@ func (p *Progress) StartFile(filename string) { p.currentFiles[filename] = struct{}{} } -func (p *Progress) addProcessed(c Counter) { - p.processed.Files += c.Files - p.processed.Dirs += c.Dirs - p.processed.Bytes += c.Bytes - p.estimator.recordBytes(time.Now(), c.Bytes) - p.scanStarted = true -} - // CompleteBlob is called for all saved blobs for files. func (p *Progress) CompleteBlob(bytes uint64) { - p.mu.Lock() - p.addProcessed(Counter{Bytes: bytes}) - p.mu.Unlock() + p.processed.Bytes.Add(bytes) + p.estimator.recordBytes(time.Now(), bytes) + p.scanStarted.Store(true) } // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (p *Progress) CompleteItem(item string, previous, current *data.Node, s archiver.ItemStats, d time.Duration) { +func (p *Progress) CompleteItem(item string, previous, current *data.Node, s *archiver.ItemStats, d time.Duration) { if current == nil { // error occurred, tell the status display to remove the line p.mu.Lock() @@ -125,9 +116,8 @@ func (p *Progress) CompleteItem(item string, previous, current *data.Node, s arc switch current.Type { case data.NodeTypeDir: - p.mu.Lock() - p.addProcessed(Counter{Dirs: 1}) - p.mu.Unlock() + p.processed.Dirs.Add(1) + p.scanStarted.Store(true) switch { case previous == nil: @@ -139,8 +129,10 @@ func (p *Progress) CompleteItem(item string, previous, current *data.Node, s arc } case data.NodeTypeFile: + p.processed.Files.Add(1) + p.scanStarted.Store(true) + p.mu.Lock() - p.addProcessed(Counter{Files: 1}) delete(p.currentFiles, item) p.mu.Unlock() @@ -157,14 +149,13 @@ func (p *Progress) CompleteItem(item string, previous, current *data.Node, s arc // ReportTotal sets the total stats up to now func (p *Progress) ReportTotal(item string, s archiver.ScanStats) { - p.mu.Lock() - defer p.mu.Unlock() - - p.total = Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes} - p.scanStarted = true + p.total.Files.Store(uint64(s.Files)) + p.total.Dirs.Store(uint64(s.Dirs)) + p.total.Bytes.Store(uint64(s.Bytes)) + p.scanStarted.Store(true) if item == "" { - p.scanFinished = true + p.scanFinished.Store(true) p.printer.ReportTotal(p.start, s) } } diff --git a/internal/ui/backup/progress_test.go b/internal/ui/backup/progress_test.go index 7b53f2116..0a75629d9 100644 --- a/internal/ui/backup/progress_test.go +++ b/internal/ui/backup/progress_test.go @@ -18,12 +18,12 @@ type mockPrinter struct { id restic.ID } -func (p *mockPrinter) Update(_, _ Counter, _ uint, _ map[string]struct{}, _ time.Time, _ uint64) { +func (p *mockPrinter) Update(_, _ Counter, _ uint32, _ map[string]struct{}, _ time.Time, _ uint64) { } func (p *mockPrinter) Error(_ string, err error) error { return err } func (p *mockPrinter) ScannerError(_ string, err error) error { return err } -func (p *mockPrinter) CompleteItem(messageType string, _ string, _ archiver.ItemStats, _ time.Duration) { +func (p *mockPrinter) CompleteItem(messageType string, _ string, _ *archiver.ItemStats, _ time.Duration) { p.Lock() defer p.Unlock() @@ -56,10 +56,10 @@ func TestProgress(t *testing.T) { // "dir unchanged" node := data.Node{Type: data.NodeTypeDir} - prog.CompleteItem("foo", &node, &node, archiver.ItemStats{}, 0) + prog.CompleteItem("foo", &node, &node, &archiver.ItemStats{}, 0) // "file new" node.Type = data.NodeTypeFile - prog.CompleteItem("foo", nil, &node, archiver.ItemStats{}, 0) + prog.CompleteItem("foo", nil, &node, &archiver.ItemStats{}, 0) time.Sleep(10 * time.Millisecond) id := restic.NewRandomID() diff --git a/internal/ui/backup/rate_estimator.go b/internal/ui/backup/rate_estimator.go index 5291fbae1..b72a3334d 100644 --- a/internal/ui/backup/rate_estimator.go +++ b/internal/ui/backup/rate_estimator.go @@ -2,6 +2,7 @@ package backup import ( "container/list" + "sync" "time" ) @@ -12,7 +13,9 @@ type rateBucket struct { } // rateEstimator represents an estimate of the time to complete an operation. +// It is safe for concurrent use. type rateEstimator struct { + mu sync.Mutex buckets *list.List start time.Time totalBytes uint64 @@ -71,6 +74,9 @@ func (r *rateEstimator) recordBytes(now time.Time, bytes uint64) { if bytes == 0 { return } + r.mu.Lock() + defer r.mu.Unlock() + var tail *rateBucket if r.buckets.Len() > 0 { tail = r.buckets.Back().Value.(*rateBucket) @@ -88,6 +94,9 @@ func (r *rateEstimator) recordBytes(now time.Time, bytes uint64) { // rate returns an estimated bytes per second rate at a given time, or zero // if there is not enough data to compute a rate. func (r *rateEstimator) rate(now time.Time) float64 { + r.mu.Lock() + defer r.mu.Unlock() + r.trim(now) if !r.start.Before(now) { return 0 diff --git a/internal/ui/backup/text.go b/internal/ui/backup/text.go index 8b416da7d..e047ffd95 100644 --- a/internal/ui/backup/text.go +++ b/internal/ui/backup/text.go @@ -32,20 +32,20 @@ func NewTextProgress(term ui.Terminal, verbosity uint) *TextProgress { } // Update updates the status lines. -func (b *TextProgress) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) { +func (b *TextProgress) Update(total, processed Counter, errors uint32, currentFiles map[string]struct{}, start time.Time, secs uint64) { var status string - if total.Files == 0 && total.Dirs == 0 { + if total.Files.Load() == 0 && total.Dirs.Load() == 0 { // no total count available yet status = fmt.Sprintf("[%s] %v files, %s, %d errors", ui.FormatDuration(time.Since(start)), - processed.Files, ui.FormatBytes(processed.Bytes), errors, + processed.Files.Load(), ui.FormatBytes(processed.Bytes.Load()), errors, ) } else { var eta, percent string - if secs > 0 && processed.Bytes < total.Bytes { + if secs > 0 && processed.Bytes.Load() < total.Bytes.Load() { eta = fmt.Sprintf(" ETA %s", ui.FormatSeconds(secs)) - percent = ui.FormatPercent(processed.Bytes, total.Bytes) + percent = ui.FormatPercent(processed.Bytes.Load(), total.Bytes.Load()) percent += " " } @@ -53,10 +53,10 @@ func (b *TextProgress) Update(total, processed Counter, errors uint, currentFile status = fmt.Sprintf("[%s] %s%v files %s, total %v files %v, %d errors%s", ui.FormatDuration(time.Since(start)), percent, - processed.Files, - ui.FormatBytes(processed.Bytes), - total.Files, - ui.FormatBytes(total.Bytes), + processed.Files.Load(), + ui.FormatBytes(processed.Bytes.Load()), + total.Files.Load(), + ui.FormatBytes(total.Bytes.Load()), errors, eta, ) @@ -89,28 +89,28 @@ func (b *TextProgress) Error(_ string, err error) error { // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (b *TextProgress) CompleteItem(messageType, item string, s archiver.ItemStats, d time.Duration) { +func (b *TextProgress) CompleteItem(messageType, item string, s *archiver.ItemStats, d time.Duration) { item = ui.Quote(item) switch messageType { case "dir new": b.VV("new %v, saved in %.3fs (%v added, %v stored, %v metadata)", - item, d.Seconds(), ui.FormatBytes(s.DataSize), - ui.FormatBytes(s.DataSizeInRepo), ui.FormatBytes(s.TreeSizeInRepo)) + item, d.Seconds(), ui.FormatBytes(s.DataSize.Load()), + ui.FormatBytes(s.DataSizeInRepo.Load()), ui.FormatBytes(s.TreeSizeInRepo.Load())) case "dir unchanged": b.VV("unchanged %v", item) case "dir modified": b.VV("modified %v, saved in %.3fs (%v added, %v stored, %v metadata)", - item, d.Seconds(), ui.FormatBytes(s.DataSize), - ui.FormatBytes(s.DataSizeInRepo), ui.FormatBytes(s.TreeSizeInRepo)) + item, d.Seconds(), ui.FormatBytes(s.DataSize.Load()), + ui.FormatBytes(s.DataSizeInRepo.Load()), ui.FormatBytes(s.TreeSizeInRepo.Load())) case "file new": b.VV("new %v, saved in %.3fs (%v added, %v stored)", item, - d.Seconds(), ui.FormatBytes(s.DataSize), ui.FormatBytes(s.DataSizeInRepo)) + d.Seconds(), ui.FormatBytes(s.DataSize.Load()), ui.FormatBytes(s.DataSizeInRepo.Load())) case "file unchanged": b.VV("unchanged %v", item) case "file modified": b.VV("modified %v, saved in %.3fs (%v added, %v stored)", item, - d.Seconds(), ui.FormatBytes(s.DataSize), ui.FormatBytes(s.DataSizeInRepo)) + d.Seconds(), ui.FormatBytes(s.DataSize.Load()), ui.FormatBytes(s.DataSizeInRepo.Load())) } } @@ -134,15 +134,15 @@ func (b *TextProgress) Finish(id restic.ID, summary *archiver.Summary, dryRun bo b.P("\n") b.P("Files: %5d new, %5d changed, %5d unmodified\n", summary.Files.New, summary.Files.Changed, summary.Files.Unchanged) b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", summary.Dirs.New, summary.Dirs.Changed, summary.Dirs.Unchanged) - b.V("Data Blobs: %5d new\n", summary.ItemStats.DataBlobs) - b.V("Tree Blobs: %5d new\n", summary.ItemStats.TreeBlobs) + b.V("Data Blobs: %5d new\n", summary.ItemStats.DataBlobs.Load()) + b.V("Tree Blobs: %5d new\n", summary.ItemStats.TreeBlobs.Load()) verb := "Added" if dryRun { verb = "Would add" } b.P("%s to the repository: %-5s (%-5s stored)\n", verb, - ui.FormatBytes(summary.ItemStats.DataSize+summary.ItemStats.TreeSize), - ui.FormatBytes(summary.ItemStats.DataSizeInRepo+summary.ItemStats.TreeSizeInRepo)) + ui.FormatBytes(summary.ItemStats.DataSize.Load()+summary.ItemStats.TreeSize.Load()), + ui.FormatBytes(summary.ItemStats.DataSizeInRepo.Load()+summary.ItemStats.TreeSizeInRepo.Load())) b.P("\n") b.P("processed %v files, %v in %s", summary.Files.New+summary.Files.Changed+summary.Files.Unchanged,