diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index ceb4be69e..6d99ea167 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -9,6 +9,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/restic/restic/internal/data" @@ -35,12 +36,12 @@ type ErrorFunc func(file string, err error) error // ItemStats collects some statistics about a particular file or directory. type ItemStats struct { - DataBlobs int // number of new data blobs added for this item - DataSize uint64 // sum of the sizes of all new data blobs - DataSizeInRepo uint64 // sum of the bytes added to the repo (including compression and crypto overhead) - TreeBlobs int // number of new tree blobs added for this item - TreeSize uint64 // sum of the sizes of all new tree blobs - TreeSizeInRepo uint64 // sum of the bytes added to the repo (including compression and crypto overhead) + DataBlobs atomic.Int64 // number of new data blobs added for this item + DataSize atomic.Uint64 // sum of the sizes of all new data blobs + DataSizeInRepo atomic.Uint64 // sum of the bytes added to the repo (including compression and crypto overhead) + TreeBlobs atomic.Int64 // number of new tree blobs added for this item + TreeSize atomic.Uint64 // sum of the sizes of all new tree blobs + TreeSizeInRepo atomic.Uint64 // sum of the bytes added to the repo (including compression and crypto overhead) } type ChangeStats struct { @@ -57,14 +58,17 @@ type Summary struct { ItemStats } -// Add adds other to the current ItemStats. -func (s *ItemStats) Add(other ItemStats) { - s.DataBlobs += other.DataBlobs - s.DataSize += other.DataSize - s.DataSizeInRepo += other.DataSizeInRepo - s.TreeBlobs += other.TreeBlobs - s.TreeSize += other.TreeSize - s.TreeSizeInRepo += other.TreeSizeInRepo +// Add atomically adds other to the current ItemStats. +func (s *ItemStats) Add(other *ItemStats) { + if other == nil { + return + } + s.DataBlobs.Add(other.DataBlobs.Load()) + s.DataSize.Add(other.DataSize.Load()) + s.DataSizeInRepo.Add(other.DataSizeInRepo.Load()) + s.TreeBlobs.Add(other.TreeBlobs.Load()) + s.TreeSize.Add(other.TreeSize.Load()) + s.TreeSizeInRepo.Add(other.TreeSizeInRepo.Load()) } // ToNoder returns a data.Node for a File. @@ -114,7 +118,7 @@ type Archiver struct { // // CompleteItem may be called asynchronously from several different // goroutines! - CompleteItem func(item string, previous, current *data.Node, s ItemStats, d time.Duration) + CompleteItem func(item string, previous, current *data.Node, s *ItemStats, d time.Duration) // StartFile is called when a file is being processed by a worker. StartFile func(filename string) @@ -180,7 +184,7 @@ func New(repo archiverRepo, filesystem fs.FS, opts Options) *Archiver { FS: filesystem, Options: opts.ApplyDefaults(), - CompleteItem: func(string, *data.Node, *data.Node, ItemStats, time.Duration) {}, + CompleteItem: func(string, *data.Node, *data.Node, *ItemStats, time.Duration) {}, StartFile: func(string) {}, CompleteBlob: func(uint64) {}, } @@ -210,7 +214,7 @@ func (arch *Archiver) error(item string, err error) error { return errf } -func (arch *Archiver) trackItem(item string, previous, current *data.Node, s ItemStats, d time.Duration) { +func (arch *Archiver) trackItem(item string, previous, current *data.Node, s *ItemStats, d time.Duration) { arch.CompleteItem(item, previous, current, s, d) arch.mu.Lock() @@ -395,7 +399,7 @@ type futureNodeResult struct { snPath, target string node *data.Node - stats ItemStats + stats *ItemStats err error } @@ -514,7 +518,7 @@ func (arch *Archiver) save(ctx context.Context, snPath, target string, previous if previous != nil && !fileChanged(fi, previous, arch.ChangeIgnoreFlags) { if arch.allBlobsPresent(previous) { debug.Log("%v hasn't changed, using old list of blobs", target) - arch.trackItem(snPath, previous, previous, ItemStats{}, time.Since(start)) + arch.trackItem(snPath, previous, previous, &ItemStats{}, time.Since(start)) arch.CompleteBlob(previous.Size) node, err := arch.nodeFromFileInfo(snPath, target, meta, false) if err != nil { @@ -567,8 +571,8 @@ func (arch *Archiver) save(ctx context.Context, snPath, target string, previous fn = arch.fileSaver.Save(ctx, snPath, target, meta, func() { arch.StartFile(snPath) }, func() { - arch.trackItem(snPath, nil, nil, ItemStats{}, 0) - }, func(node *data.Node, stats ItemStats) { + arch.trackItem(snPath, nil, nil, &ItemStats{}, 0) + }, func(node *data.Node, stats *ItemStats) { arch.trackItem(snPath, previous, node, stats, time.Since(start)) }) @@ -585,7 +589,7 @@ func (arch *Archiver) save(ctx context.Context, snPath, target string, previous } fn, err = arch.saveDir(ctx, snPath, target, meta, oldSubtree, - func(node *data.Node, stats ItemStats) { + func(node *data.Node, stats *ItemStats) { arch.trackItem(snItem, previous, node, stats, time.Since(start)) }) if err != nil { @@ -728,7 +732,7 @@ func (arch *Archiver) saveTree(ctx context.Context, snPath string, atree *tree, } // not a leaf node, archive subtree - fn, _, err := arch.saveTree(ctx, join(snPath, name), &subatree, oldSubtree, func(n *data.Node, is ItemStats) { + fn, _, err := arch.saveTree(ctx, join(snPath, name), &subatree, oldSubtree, func(n *data.Node, is *ItemStats) { arch.trackItem(snItem, oldNode, n, is, time.Since(start)) }) if err != nil { @@ -886,7 +890,7 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps arch.runWorkers(wgCtx, wg, uploader) debug.Log("starting snapshot") - fn, nodeCount, err := arch.saveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot), func(_ *data.Node, is ItemStats) { + fn, nodeCount, err := arch.saveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot), func(_ *data.Node, is *ItemStats) { arch.trackItem("/", nil, nil, is, time.Since(start)) }) if err != nil { @@ -954,10 +958,10 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps DirsNew: arch.summary.Dirs.New, DirsChanged: arch.summary.Dirs.Changed, DirsUnmodified: arch.summary.Dirs.Unchanged, - DataBlobs: arch.summary.ItemStats.DataBlobs, - TreeBlobs: arch.summary.ItemStats.TreeBlobs, - DataAdded: arch.summary.ItemStats.DataSize + arch.summary.ItemStats.TreeSize, - DataAddedPacked: arch.summary.ItemStats.DataSizeInRepo + arch.summary.ItemStats.TreeSizeInRepo, + DataBlobs: int(arch.summary.ItemStats.DataBlobs.Load()), + TreeBlobs: int(arch.summary.ItemStats.TreeBlobs.Load()), + DataAdded: arch.summary.ItemStats.DataSize.Load() + arch.summary.ItemStats.TreeSize.Load(), + DataAddedPacked: arch.summary.ItemStats.DataSizeInRepo.Load() + arch.summary.ItemStats.TreeSizeInRepo.Load(), TotalFilesProcessed: arch.summary.Files.New + arch.summary.Files.Changed + arch.summary.Files.Unchanged, TotalBytesProcessed: arch.summary.ProcessedBytes, } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index b13734655..0ff6c4429 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -38,12 +38,12 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (string, *repository.Repos return tempdir, repo } -func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS) (*data.Node, ItemStats) { +func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS) (*data.Node, *ItemStats) { var ( completeReadingCallback bool completeCallbackNode *data.Node - completeCallbackStats ItemStats + completeCallbackStats *ItemStats completeCallback bool startCallback bool @@ -67,7 +67,7 @@ func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS } } - complete := func(node *data.Node, stats ItemStats) { + complete := func(node *data.Node, stats *ItemStats) { completeCallback = true completeCallbackNode = node completeCallbackStats = stats @@ -117,7 +117,7 @@ func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS } if completeCallbackStats != fnr.stats { - t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", fnr.stats, completeCallbackStats) + t.Errorf("different stats returned for complete callback") } return fnr.node, fnr.stats @@ -139,17 +139,17 @@ func TestArchiverSaveFile(t *testing.T) { node, stats := saveFile(t, repo, filepath.Join(tempdir, "file"), fs.Track{FS: fs.Local{}}) TestEnsureFileContent(ctx, t, repo, "file", node, testfile) - if stats.DataSize != uint64(len(testfile.Content)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) + if stats.DataSize.Load() != uint64(len(testfile.Content)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(testfile.Content) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(testfile.Content) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -181,17 +181,17 @@ func TestArchiverSaveFileReaderFS(t *testing.T) { node, stats := saveFile(t, repo, filename, readerFs) TestEnsureFileContent(ctx, t, repo, "file", node, TestFile{Content: test.Data}) - if stats.DataSize != uint64(len(test.Data)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) + if stats.DataSize.Load() != uint64(len(test.Data)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(test.Data) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(test.Data) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -250,17 +250,17 @@ func TestArchiverSave(t *testing.T) { TestEnsureFileContent(ctx, t, repo, "file", fnr.node, testfile) stats := fnr.stats - if stats.DataSize != uint64(len(testfile.Content)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) + if stats.DataSize.Load() != uint64(len(testfile.Content)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(testfile.Content) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(testfile.Content) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -328,17 +328,17 @@ func TestArchiverSaveReaderFS(t *testing.T) { TestEnsureFileContent(ctx, t, repo, "file", fnr.node, TestFile{Content: test.Data}) stats := fnr.stats - if stats.DataSize != uint64(len(test.Data)) { - t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) + if stats.DataSize.Load() != uint64(len(test.Data)) { + t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize.Load()) } - if stats.DataBlobs <= 0 && len(test.Data) > 0 { - t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 && len(test.Data) > 0 { + t.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } }) } @@ -360,22 +360,78 @@ func BenchmarkArchiverSaveFileSmall(b *testing.B) { _, stats := saveFile(b, repo, filepath.Join(tempdir, "file"), fs.Track{FS: fs.Local{}}) b.StopTimer() - if stats.DataSize != fileSize { - b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize) + if stats.DataSize.Load() != fileSize { + b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize.Load()) } - if stats.DataBlobs <= 0 { - b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 { + b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } b.StartTimer() } } +func BenchmarkArchiverSaveFileConcurrent(b *testing.B) { + const fileSize = 4 * 1024 + const numFiles = 64 + + d := TestDir{} + for i := 0; i < numFiles; i++ { + d[fmt.Sprintf("file%d", i)] = TestFile{ + Content: string(rtest.Random(23+i, fileSize)), + } + } + + b.SetBytes(fileSize * numFiles) + + for i := 0; i < b.N; i++ { + b.StopTimer() + tempdir, repo := prepareTempdirRepoSrc(b, d) + + arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) + arch.Error = func(item string, err error) error { + b.Errorf("archiver error for %v: %v", item, err) + return err + } + b.StartTimer() + + err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + wg, ctx := errgroup.WithContext(ctx) + arch.runWorkers(ctx, wg, uploader) + + var futures []futureNode + for j := 0; j < numFiles; j++ { + filename := filepath.Join(tempdir, fmt.Sprintf("file%d", j)) + file, err := arch.FS.OpenFile(filename, fs.O_NOFOLLOW, false) + if err != nil { + b.Fatal(err) + } + + fn := arch.fileSaver.Save(ctx, "/", filename, file, func() {}, func() {}, func(*data.Node, *ItemStats) {}) + futures = append(futures, fn) + } + + for _, fn := range futures { + fnr := fn.take(ctx) + if fnr.err != nil { + b.Fatal(fnr.err) + } + } + + arch.stopWorkers() + return wg.Wait() + }) + if err != nil { + b.Fatal(err) + } + } +} + func BenchmarkArchiverSaveFileLarge(b *testing.B) { const fileSize = 40*1024*1024 + 1287898 d := TestDir{"file": TestFile{ @@ -392,17 +448,17 @@ func BenchmarkArchiverSaveFileLarge(b *testing.B) { _, stats := saveFile(b, repo, filepath.Join(tempdir, "file"), fs.Track{FS: fs.Local{}}) b.StopTimer() - if stats.DataSize != fileSize { - b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize) + if stats.DataSize.Load() != fileSize { + b.Errorf("wrong stats returned in DataSize, want %d, got %d", fileSize, stats.DataSize.Load()) } - if stats.DataBlobs <= 0 { - b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() <= 0 { + b.Errorf("wrong stats returned in DataBlobs, want > 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + b.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.TreeBlobs.Load() != 0 { + b.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } b.StartTimer() } @@ -850,7 +906,7 @@ func TestArchiverSaveDir(t *testing.T) { defer back() var treeID restic.ID - err := repo.WithBlobUploader(context.TODO(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + err := repo.WithBlobUploader(context.Background(), func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { wg, ctx := errgroup.WithContext(ctx) arch.runWorkers(ctx, wg, uploader) meta, err := testFS.OpenFile(test.target, fs.O_NOFOLLOW, true) @@ -863,17 +919,17 @@ func TestArchiverSaveDir(t *testing.T) { node, stats := fnr.node, fnr.stats t.Logf("stats: %v", stats) - if stats.DataSize != 0 { - t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + if stats.DataSize.Load() != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize.Load()) } - if stats.DataBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize == 0 { - t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() == 0 { + t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs <= 0 { - t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs) + if stats.TreeBlobs.Load() <= 0 { + t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs.Load()) } node.Name = targetNodeName @@ -933,31 +989,31 @@ func TestArchiverSaveDirIncremental(t *testing.T) { node, stats := fnr.node, fnr.stats if i == 0 { // operation must have added new tree data - if stats.DataSize != 0 { - t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + if stats.DataSize.Load() != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize.Load()) } - if stats.DataBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize == 0 { - t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() == 0 { + t.Errorf("wrong stats returned in TreeSize, want > 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs <= 0 { - t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs) + if stats.TreeBlobs.Load() <= 0 { + t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs.Load()) } } else { // operation must not have added any new data - if stats.DataSize != 0 { - t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize) + if stats.DataSize.Load() != 0 { + t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize.Load()) } - if stats.DataBlobs != 0 { - t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs) + if stats.DataBlobs.Load() != 0 { + t.Errorf("wrong stats returned in DataBlobs, want 0, got %d", stats.DataBlobs.Load()) } - if stats.TreeSize != 0 { - t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize) + if stats.TreeSize.Load() != 0 { + t.Errorf("wrong stats returned in TreeSize, want 0, got %d", stats.TreeSize.Load()) } - if stats.TreeBlobs != 0 { - t.Errorf("wrong stats returned in TreeBlobs, want 0, got %d", stats.TreeBlobs) + if stats.TreeBlobs.Load() != 0 { + t.Errorf("wrong stats returned in TreeBlobs, want 0, got %d", stats.TreeBlobs.Load()) } } @@ -971,6 +1027,18 @@ func TestArchiverSaveDirIncremental(t *testing.T) { } } +// newItemStats creates an ItemStats with the given values pre-loaded into the atomic fields. +func newItemStats(dataBlobs int64, dataSize, dataSizeInRepo uint64, treeBlobs int64, treeSize, treeSizeInRepo uint64) ItemStats { + var s ItemStats + s.DataBlobs.Store(dataBlobs) + s.DataSize.Store(dataSize) + s.DataSizeInRepo.Store(dataSizeInRepo) + s.TreeBlobs.Store(treeBlobs) + s.TreeSize.Store(treeSize) + s.TreeSizeInRepo.Store(treeSizeInRepo) + return s +} + // bothZeroOrNeither fails the test if only one of exp, act is zero. func bothZeroOrNeither(tb testing.TB, exp, act uint64) { tb.Helper() @@ -1006,7 +1074,7 @@ func TestArchiverSaveTree(t *testing.T) { "targetfile": TestFile{Content: "foobar"}, }, stat: Summary{ - ItemStats: ItemStats{1, 6, 32 + 6, 0, 0, 0}, + ItemStats: newItemStats(1, 6, 32+6, 0, 0, 0), ProcessedBytes: 6, Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, @@ -1023,7 +1091,7 @@ func TestArchiverSaveTree(t *testing.T) { "filesymlink": TestSymlink{Target: "targetfile"}, }, stat: Summary{ - ItemStats: ItemStats{1, 6, 32 + 6, 0, 0, 0}, + ItemStats: newItemStats(1, 6, 32+6, 0, 0, 0), ProcessedBytes: 6, Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, @@ -1048,7 +1116,7 @@ func TestArchiverSaveTree(t *testing.T) { }, }, stat: Summary{ - ItemStats: ItemStats{0, 0, 0, 1, 0x154, 0x16a}, + ItemStats: newItemStats(0, 0, 0, 1, 0x154, 0x16a), ProcessedBytes: 0, Files: ChangeStats{0, 0, 0}, Dirs: ChangeStats{1, 0, 0}, @@ -1077,7 +1145,7 @@ func TestArchiverSaveTree(t *testing.T) { }, }, stat: Summary{ - ItemStats: ItemStats{1, 6, 32 + 6, 3, 0x47f, 0x4c1}, + ItemStats: newItemStats(1, 6, 32+6, 3, 0x47f, 0x4c1), ProcessedBytes: 6, Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{3, 0, 0}, @@ -1136,11 +1204,11 @@ func TestArchiverSaveTree(t *testing.T) { } TestEnsureTree(context.TODO(), t, "/", repo, treeID, want) stat := arch.summary - bothZeroOrNeither(t, uint64(test.stat.DataBlobs), uint64(stat.DataBlobs)) - bothZeroOrNeither(t, uint64(test.stat.TreeBlobs), uint64(stat.TreeBlobs)) - bothZeroOrNeither(t, test.stat.DataSize, stat.DataSize) - bothZeroOrNeither(t, test.stat.DataSizeInRepo, stat.DataSizeInRepo) - bothZeroOrNeither(t, test.stat.TreeSizeInRepo, stat.TreeSizeInRepo) + bothZeroOrNeither(t, uint64(test.stat.DataBlobs.Load()), uint64(stat.DataBlobs.Load())) + bothZeroOrNeither(t, uint64(test.stat.TreeBlobs.Load()), uint64(stat.TreeBlobs.Load())) + bothZeroOrNeither(t, test.stat.DataSize.Load(), stat.DataSize.Load()) + bothZeroOrNeither(t, test.stat.DataSizeInRepo.Load(), stat.DataSizeInRepo.Load()) + bothZeroOrNeither(t, test.stat.TreeSizeInRepo.Load(), stat.TreeSizeInRepo.Load()) rtest.Equals(t, test.stat.ProcessedBytes, stat.ProcessedBytes) rtest.Equals(t, test.stat.Files, stat.Files) rtest.Equals(t, test.stat.Dirs, stat.Dirs) @@ -1684,10 +1752,10 @@ func checkSnapshotStats(t *testing.T, sn *data.Snapshot, stat Summary) { rtest.Equals(t, stat.Dirs.Unchanged, sn.Summary.DirsUnmodified, "DirsUnmodified") rtest.Equals(t, stat.ProcessedBytes, sn.Summary.TotalBytesProcessed, "TotalBytesProcessed") rtest.Equals(t, stat.Files.New+stat.Files.Changed+stat.Files.Unchanged, sn.Summary.TotalFilesProcessed, "TotalFilesProcessed") - bothZeroOrNeither(t, uint64(stat.DataBlobs), uint64(sn.Summary.DataBlobs)) - bothZeroOrNeither(t, uint64(stat.TreeBlobs), uint64(sn.Summary.TreeBlobs)) - bothZeroOrNeither(t, stat.DataSize+stat.TreeSize, sn.Summary.DataAdded) - bothZeroOrNeither(t, stat.DataSizeInRepo+stat.TreeSizeInRepo, sn.Summary.DataAddedPacked) + bothZeroOrNeither(t, uint64(stat.DataBlobs.Load()), uint64(sn.Summary.DataBlobs)) + bothZeroOrNeither(t, uint64(stat.TreeBlobs.Load()), uint64(sn.Summary.TreeBlobs)) + bothZeroOrNeither(t, stat.DataSize.Load()+stat.TreeSize.Load(), sn.Summary.DataAdded) + bothZeroOrNeither(t, stat.DataSizeInRepo.Load()+stat.TreeSizeInRepo.Load(), sn.Summary.DataAddedPacked) } func TestArchiverParent(t *testing.T) { @@ -1706,7 +1774,7 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, ProcessedBytes: 2102152, - ItemStats: ItemStats{3, 0x201593, 0x201632, 1, 0, 0}, + ItemStats: newItemStats(3, 0x201593, 0x201632, 1, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 0, 1}, @@ -1725,7 +1793,7 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{1, 0, 0}, Dirs: ChangeStats{0, 0, 0}, ProcessedBytes: 2102152, - ItemStats: ItemStats{3, 0x201593, 0x201632, 1, 0, 0}, + ItemStats: newItemStats(3, 0x201593, 0x201632, 1, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 0, 1}, @@ -1744,7 +1812,7 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{2, 0, 0}, Dirs: ChangeStats{1, 0, 0}, ProcessedBytes: 2469, - ItemStats: ItemStats{2, 0xe1c, 0xcd9, 2, 0, 0}, + ItemStats: newItemStats(2, 0xe1c, 0xcd9, 2, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 0, 2}, @@ -1767,13 +1835,13 @@ func TestArchiverParent(t *testing.T) { Files: ChangeStats{2, 0, 0}, Dirs: ChangeStats{1, 0, 0}, ProcessedBytes: 2469, - ItemStats: ItemStats{2, 0xe13, 0xcf8, 2, 0, 0}, + ItemStats: newItemStats(2, 0xe13, 0xcf8, 2, 0, 0), }, statSecond: Summary{ Files: ChangeStats{0, 1, 0}, Dirs: ChangeStats{0, 1, 0}, ProcessedBytes: 6, - ItemStats: ItemStats{1, 0x305, 0x233, 2, 0, 0}, + ItemStats: newItemStats(1, 0x305, 0x233, 2, 0, 0), }, }, } diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 3407cae16..29f69d821 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -59,7 +59,7 @@ func (s *fileSaver) TriggerShutdown() { } // fileCompleteFunc is called when the file has been saved. -type fileCompleteFunc func(*data.Node, ItemStats) +type fileCompleteFunc func(*data.Node, *ItemStats) // Save stores the file f and returns the data once it has been completed. The // file is closed by Save. completeReading is only called if the file was read @@ -107,6 +107,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat fnr := futureNodeResult{ snPath: snPath, target: target, + stats: &ItemStats{}, } var lock sync.Mutex remaining := 0 @@ -141,7 +142,7 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat isCompleted = true fnr.err = fmt.Errorf("failed to save %v: %w", target, err) fnr.node = nil - fnr.stats = ItemStats{} + fnr.stats = &ItemStats{} finish(fnr) } } @@ -208,9 +209,9 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat lock.Lock() if !known { - fnr.stats.DataBlobs++ - fnr.stats.DataSize += uint64(len(buf.Data)) - fnr.stats.DataSizeInRepo += uint64(sizeInRepo) + fnr.stats.DataBlobs.Add(1) + fnr.stats.DataSize.Add(uint64(len(buf.Data))) + fnr.stats.DataSizeInRepo.Add(uint64(sizeInRepo)) } node.Content[pos] = newID lock.Unlock() diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 4dbf78548..f1daa1a81 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -54,7 +54,7 @@ func TestFileSaver(t *testing.T) { startFn := func() {} completeReadingFn := func() {} - completeFn := func(*data.Node, ItemStats) {} + completeFn := func(*data.Node, *ItemStats) {} files := createTestFiles(t, 15) testFs := fs.Local{} diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index 8b38b5eb2..305e08b91 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -73,7 +73,7 @@ type saveTreeJob struct { } // save stores the nodes as a tree in the repo. -func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, ItemStats, error) { +func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, *ItemStats, error) { var stats ItemStats node := job.node nodes := job.nodes @@ -92,7 +92,7 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite if fnr.err != nil { debug.Log("err for %v: %v", fnr.snPath, fnr.err) if fnr.err == context.Canceled { - return nil, stats, fnr.err + return nil, &stats, fnr.err } fnr.err = s.errFn(fnr.target, fnr.err) @@ -101,7 +101,7 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite continue } - return nil, stats, fnr.err + return nil, &stats, fnr.err } // when the error is ignored, the node could not be saved, so ignore it @@ -119,14 +119,14 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite } if err != nil { debug.Log("insert %v failed: %v", fnr.node.Name, err) - return nil, stats, err + return nil, &stats, err } lastNode = fnr.node } buf, err := builder.Finalize() if err != nil { - return nil, stats, err + return nil, &stats, err } var ( @@ -149,18 +149,18 @@ func (s *treeSaver) save(ctx context.Context, job *saveTreeJob) (*data.Node, Ite select { case <-ch: if err != nil { - return nil, stats, err + return nil, &stats, err } if !known { - stats.TreeBlobs++ - stats.TreeSize += uint64(length) - stats.TreeSizeInRepo += uint64(sizeInRepo) + stats.TreeBlobs.Add(1) + stats.TreeSize.Add(uint64(length)) + stats.TreeSizeInRepo.Add(uint64(sizeInRepo)) } node.Subtree = &id - return node, stats, nil + return node, &stats, nil case <-ctx.Done(): - return nil, stats, ctx.Err() + return nil, &stats, ctx.Err() } } diff --git a/internal/ui/backup/json.go b/internal/ui/backup/json.go index b46bbdc5f..338efd39d 100644 --- a/internal/ui/backup/json.go +++ b/internal/ui/backup/json.go @@ -39,20 +39,20 @@ func (b *JSONProgress) error(status interface{}) { } // Update updates the status lines. -func (b *JSONProgress) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) { +func (b *JSONProgress) Update(total, processed Counter, errors uint32, currentFiles map[string]struct{}, start time.Time, secs uint64) { status := statusUpdate{ MessageType: "status", SecondsElapsed: uint64(time.Since(start) / time.Second), SecondsRemaining: secs, - TotalFiles: total.Files, - FilesDone: processed.Files, - TotalBytes: total.Bytes, - BytesDone: processed.Bytes, + TotalFiles: total.Files.Load(), + FilesDone: processed.Files.Load(), + TotalBytes: total.Bytes.Load(), + BytesDone: processed.Bytes.Load(), ErrorCount: errors, } - if total.Bytes > 0 { - status.PercentDone = float64(processed.Bytes) / float64(total.Bytes) + if status.TotalBytes > 0 { + status.PercentDone = float64(processed.Bytes.Load()) / float64(total.Bytes.Load()) } for filename := range currentFiles { @@ -88,7 +88,7 @@ func (b *JSONProgress) Error(item string, err error) error { // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemStats, d time.Duration) { +func (b *JSONProgress) CompleteItem(messageType, item string, s *archiver.ItemStats, d time.Duration) { if b.v < 2 { return } @@ -100,10 +100,10 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "new", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, - MetadataSize: s.TreeSize, - MetadataSizeInRepo: s.TreeSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), + MetadataSize: s.TreeSize.Load(), + MetadataSizeInRepo: s.TreeSizeInRepo.Load(), }) case "dir unchanged": b.print(verboseUpdate{ @@ -117,10 +117,10 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "modified", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, - MetadataSize: s.TreeSize, - MetadataSizeInRepo: s.TreeSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), + MetadataSize: s.TreeSize.Load(), + MetadataSizeInRepo: s.TreeSizeInRepo.Load(), }) case "file new": b.print(verboseUpdate{ @@ -128,8 +128,8 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "new", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), }) case "file unchanged": b.print(verboseUpdate{ @@ -143,8 +143,8 @@ func (b *JSONProgress) CompleteItem(messageType, item string, s archiver.ItemSta Action: "modified", Item: item, Duration: d.Seconds(), - DataSize: s.DataSize, - DataSizeInRepo: s.DataSizeInRepo, + DataSize: s.DataSize.Load(), + DataSizeInRepo: s.DataSizeInRepo.Load(), }) } } @@ -177,10 +177,10 @@ func (b *JSONProgress) Finish(snapshotID restic.ID, summary *archiver.Summary, d DirsNew: summary.Dirs.New, DirsChanged: summary.Dirs.Changed, DirsUnmodified: summary.Dirs.Unchanged, - DataBlobs: summary.ItemStats.DataBlobs, - TreeBlobs: summary.ItemStats.TreeBlobs, - DataAdded: summary.ItemStats.DataSize + summary.ItemStats.TreeSize, - DataAddedPacked: summary.ItemStats.DataSizeInRepo + summary.ItemStats.TreeSizeInRepo, + DataBlobs: int(summary.ItemStats.DataBlobs.Load()), + TreeBlobs: int(summary.ItemStats.TreeBlobs.Load()), + DataAdded: summary.ItemStats.DataSize.Load() + summary.ItemStats.TreeSize.Load(), + DataAddedPacked: summary.ItemStats.DataSizeInRepo.Load() + summary.ItemStats.TreeSizeInRepo.Load(), TotalFilesProcessed: summary.Files.New + summary.Files.Changed + summary.Files.Unchanged, TotalBytesProcessed: summary.ProcessedBytes, BackupStart: summary.BackupStart, @@ -204,7 +204,7 @@ type statusUpdate struct { FilesDone uint64 `json:"files_done,omitempty"` TotalBytes uint64 `json:"total_bytes,omitempty"` BytesDone uint64 `json:"bytes_done,omitempty"` - ErrorCount uint `json:"error_count,omitempty"` + ErrorCount uint32 `json:"error_count,omitempty"` CurrentFiles []string `json:"current_files,omitempty"` } diff --git a/internal/ui/backup/progress.go b/internal/ui/backup/progress.go index 17ccfa8f4..f9dfacecc 100644 --- a/internal/ui/backup/progress.go +++ b/internal/ui/backup/progress.go @@ -2,6 +2,7 @@ package backup import ( "sync" + "sync/atomic" "time" "github.com/restic/restic/internal/archiver" @@ -13,10 +14,10 @@ import ( // A ProgressPrinter can print various progress messages. // It must be safe to call its methods from concurrent goroutines. type ProgressPrinter interface { - Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) + Update(total, processed Counter, errors uint32, currentFiles map[string]struct{}, start time.Time, secs uint64) Error(item string, err error) error ScannerError(item string, err error) error - CompleteItem(messageType string, item string, s archiver.ItemStats, d time.Duration) + CompleteItem(messageType string, item string, s *archiver.ItemStats, d time.Duration) ReportTotal(start time.Time, s archiver.ScanStats) Finish(snapshotID restic.ID, summary *archiver.Summary, dryRun bool) Reset() @@ -25,7 +26,7 @@ type ProgressPrinter interface { } type Counter struct { - Files, Dirs, Bytes uint64 + Files, Dirs, Bytes atomic.Uint64 } // Progress reports progress for the `backup` command. @@ -36,11 +37,11 @@ type Progress struct { start time.Time estimator rateEstimator - scanStarted, scanFinished bool + scanStarted, scanFinished atomic.Bool currentFiles map[string]struct{} processed, total Counter - errors uint + errors atomic.Uint32 printer ProgressPrinter } @@ -56,25 +57,25 @@ func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress { if final { p.printer.Reset() } else { - p.mu.Lock() - defer p.mu.Unlock() - if !p.scanStarted { + if !p.scanStarted.Load() { return } var secondsRemaining uint64 - if p.scanFinished { + if p.scanFinished.Load() { rate := p.estimator.rate(time.Now()) tooSlowCutoff := 1024. if rate <= tooSlowCutoff { secondsRemaining = 0 } else { - todo := float64(p.total.Bytes - p.processed.Bytes) + todo := float64(p.total.Bytes.Load() - p.processed.Bytes.Load()) secondsRemaining = uint64(todo / rate) } } - p.printer.Update(p.total, p.processed, p.errors, p.currentFiles, p.start, secondsRemaining) + p.mu.Lock() + defer p.mu.Unlock() + p.printer.Update(p.total, p.processed, p.errors.Load(), p.currentFiles, p.start, secondsRemaining) } }) return p @@ -82,10 +83,8 @@ func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress { // Error is the error callback function for the archiver, it prints the error and returns nil. func (p *Progress) Error(item string, err error) error { - p.mu.Lock() - p.errors++ - p.scanStarted = true - p.mu.Unlock() + p.errors.Add(1) + p.scanStarted.Store(true) return p.printer.Error(item, err) } @@ -97,24 +96,16 @@ func (p *Progress) StartFile(filename string) { p.currentFiles[filename] = struct{}{} } -func (p *Progress) addProcessed(c Counter) { - p.processed.Files += c.Files - p.processed.Dirs += c.Dirs - p.processed.Bytes += c.Bytes - p.estimator.recordBytes(time.Now(), c.Bytes) - p.scanStarted = true -} - // CompleteBlob is called for all saved blobs for files. func (p *Progress) CompleteBlob(bytes uint64) { - p.mu.Lock() - p.addProcessed(Counter{Bytes: bytes}) - p.mu.Unlock() + p.processed.Bytes.Add(bytes) + p.estimator.recordBytes(time.Now(), bytes) + p.scanStarted.Store(true) } // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (p *Progress) CompleteItem(item string, previous, current *data.Node, s archiver.ItemStats, d time.Duration) { +func (p *Progress) CompleteItem(item string, previous, current *data.Node, s *archiver.ItemStats, d time.Duration) { if current == nil { // error occurred, tell the status display to remove the line p.mu.Lock() @@ -125,9 +116,8 @@ func (p *Progress) CompleteItem(item string, previous, current *data.Node, s arc switch current.Type { case data.NodeTypeDir: - p.mu.Lock() - p.addProcessed(Counter{Dirs: 1}) - p.mu.Unlock() + p.processed.Dirs.Add(1) + p.scanStarted.Store(true) switch { case previous == nil: @@ -139,8 +129,10 @@ func (p *Progress) CompleteItem(item string, previous, current *data.Node, s arc } case data.NodeTypeFile: + p.processed.Files.Add(1) + p.scanStarted.Store(true) + p.mu.Lock() - p.addProcessed(Counter{Files: 1}) delete(p.currentFiles, item) p.mu.Unlock() @@ -157,14 +149,13 @@ func (p *Progress) CompleteItem(item string, previous, current *data.Node, s arc // ReportTotal sets the total stats up to now func (p *Progress) ReportTotal(item string, s archiver.ScanStats) { - p.mu.Lock() - defer p.mu.Unlock() - - p.total = Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes} - p.scanStarted = true + p.total.Files.Store(uint64(s.Files)) + p.total.Dirs.Store(uint64(s.Dirs)) + p.total.Bytes.Store(uint64(s.Bytes)) + p.scanStarted.Store(true) if item == "" { - p.scanFinished = true + p.scanFinished.Store(true) p.printer.ReportTotal(p.start, s) } } diff --git a/internal/ui/backup/progress_test.go b/internal/ui/backup/progress_test.go index 7b53f2116..0a75629d9 100644 --- a/internal/ui/backup/progress_test.go +++ b/internal/ui/backup/progress_test.go @@ -18,12 +18,12 @@ type mockPrinter struct { id restic.ID } -func (p *mockPrinter) Update(_, _ Counter, _ uint, _ map[string]struct{}, _ time.Time, _ uint64) { +func (p *mockPrinter) Update(_, _ Counter, _ uint32, _ map[string]struct{}, _ time.Time, _ uint64) { } func (p *mockPrinter) Error(_ string, err error) error { return err } func (p *mockPrinter) ScannerError(_ string, err error) error { return err } -func (p *mockPrinter) CompleteItem(messageType string, _ string, _ archiver.ItemStats, _ time.Duration) { +func (p *mockPrinter) CompleteItem(messageType string, _ string, _ *archiver.ItemStats, _ time.Duration) { p.Lock() defer p.Unlock() @@ -56,10 +56,10 @@ func TestProgress(t *testing.T) { // "dir unchanged" node := data.Node{Type: data.NodeTypeDir} - prog.CompleteItem("foo", &node, &node, archiver.ItemStats{}, 0) + prog.CompleteItem("foo", &node, &node, &archiver.ItemStats{}, 0) // "file new" node.Type = data.NodeTypeFile - prog.CompleteItem("foo", nil, &node, archiver.ItemStats{}, 0) + prog.CompleteItem("foo", nil, &node, &archiver.ItemStats{}, 0) time.Sleep(10 * time.Millisecond) id := restic.NewRandomID() diff --git a/internal/ui/backup/rate_estimator.go b/internal/ui/backup/rate_estimator.go index 5291fbae1..b72a3334d 100644 --- a/internal/ui/backup/rate_estimator.go +++ b/internal/ui/backup/rate_estimator.go @@ -2,6 +2,7 @@ package backup import ( "container/list" + "sync" "time" ) @@ -12,7 +13,9 @@ type rateBucket struct { } // rateEstimator represents an estimate of the time to complete an operation. +// It is safe for concurrent use. type rateEstimator struct { + mu sync.Mutex buckets *list.List start time.Time totalBytes uint64 @@ -71,6 +74,9 @@ func (r *rateEstimator) recordBytes(now time.Time, bytes uint64) { if bytes == 0 { return } + r.mu.Lock() + defer r.mu.Unlock() + var tail *rateBucket if r.buckets.Len() > 0 { tail = r.buckets.Back().Value.(*rateBucket) @@ -88,6 +94,9 @@ func (r *rateEstimator) recordBytes(now time.Time, bytes uint64) { // rate returns an estimated bytes per second rate at a given time, or zero // if there is not enough data to compute a rate. func (r *rateEstimator) rate(now time.Time) float64 { + r.mu.Lock() + defer r.mu.Unlock() + r.trim(now) if !r.start.Before(now) { return 0 diff --git a/internal/ui/backup/text.go b/internal/ui/backup/text.go index 8b416da7d..e047ffd95 100644 --- a/internal/ui/backup/text.go +++ b/internal/ui/backup/text.go @@ -32,20 +32,20 @@ func NewTextProgress(term ui.Terminal, verbosity uint) *TextProgress { } // Update updates the status lines. -func (b *TextProgress) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) { +func (b *TextProgress) Update(total, processed Counter, errors uint32, currentFiles map[string]struct{}, start time.Time, secs uint64) { var status string - if total.Files == 0 && total.Dirs == 0 { + if total.Files.Load() == 0 && total.Dirs.Load() == 0 { // no total count available yet status = fmt.Sprintf("[%s] %v files, %s, %d errors", ui.FormatDuration(time.Since(start)), - processed.Files, ui.FormatBytes(processed.Bytes), errors, + processed.Files.Load(), ui.FormatBytes(processed.Bytes.Load()), errors, ) } else { var eta, percent string - if secs > 0 && processed.Bytes < total.Bytes { + if secs > 0 && processed.Bytes.Load() < total.Bytes.Load() { eta = fmt.Sprintf(" ETA %s", ui.FormatSeconds(secs)) - percent = ui.FormatPercent(processed.Bytes, total.Bytes) + percent = ui.FormatPercent(processed.Bytes.Load(), total.Bytes.Load()) percent += " " } @@ -53,10 +53,10 @@ func (b *TextProgress) Update(total, processed Counter, errors uint, currentFile status = fmt.Sprintf("[%s] %s%v files %s, total %v files %v, %d errors%s", ui.FormatDuration(time.Since(start)), percent, - processed.Files, - ui.FormatBytes(processed.Bytes), - total.Files, - ui.FormatBytes(total.Bytes), + processed.Files.Load(), + ui.FormatBytes(processed.Bytes.Load()), + total.Files.Load(), + ui.FormatBytes(total.Bytes.Load()), errors, eta, ) @@ -89,28 +89,28 @@ func (b *TextProgress) Error(_ string, err error) error { // CompleteItem is the status callback function for the archiver when a // file/dir has been saved successfully. -func (b *TextProgress) CompleteItem(messageType, item string, s archiver.ItemStats, d time.Duration) { +func (b *TextProgress) CompleteItem(messageType, item string, s *archiver.ItemStats, d time.Duration) { item = ui.Quote(item) switch messageType { case "dir new": b.VV("new %v, saved in %.3fs (%v added, %v stored, %v metadata)", - item, d.Seconds(), ui.FormatBytes(s.DataSize), - ui.FormatBytes(s.DataSizeInRepo), ui.FormatBytes(s.TreeSizeInRepo)) + item, d.Seconds(), ui.FormatBytes(s.DataSize.Load()), + ui.FormatBytes(s.DataSizeInRepo.Load()), ui.FormatBytes(s.TreeSizeInRepo.Load())) case "dir unchanged": b.VV("unchanged %v", item) case "dir modified": b.VV("modified %v, saved in %.3fs (%v added, %v stored, %v metadata)", - item, d.Seconds(), ui.FormatBytes(s.DataSize), - ui.FormatBytes(s.DataSizeInRepo), ui.FormatBytes(s.TreeSizeInRepo)) + item, d.Seconds(), ui.FormatBytes(s.DataSize.Load()), + ui.FormatBytes(s.DataSizeInRepo.Load()), ui.FormatBytes(s.TreeSizeInRepo.Load())) case "file new": b.VV("new %v, saved in %.3fs (%v added, %v stored)", item, - d.Seconds(), ui.FormatBytes(s.DataSize), ui.FormatBytes(s.DataSizeInRepo)) + d.Seconds(), ui.FormatBytes(s.DataSize.Load()), ui.FormatBytes(s.DataSizeInRepo.Load())) case "file unchanged": b.VV("unchanged %v", item) case "file modified": b.VV("modified %v, saved in %.3fs (%v added, %v stored)", item, - d.Seconds(), ui.FormatBytes(s.DataSize), ui.FormatBytes(s.DataSizeInRepo)) + d.Seconds(), ui.FormatBytes(s.DataSize.Load()), ui.FormatBytes(s.DataSizeInRepo.Load())) } } @@ -134,15 +134,15 @@ func (b *TextProgress) Finish(id restic.ID, summary *archiver.Summary, dryRun bo b.P("\n") b.P("Files: %5d new, %5d changed, %5d unmodified\n", summary.Files.New, summary.Files.Changed, summary.Files.Unchanged) b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", summary.Dirs.New, summary.Dirs.Changed, summary.Dirs.Unchanged) - b.V("Data Blobs: %5d new\n", summary.ItemStats.DataBlobs) - b.V("Tree Blobs: %5d new\n", summary.ItemStats.TreeBlobs) + b.V("Data Blobs: %5d new\n", summary.ItemStats.DataBlobs.Load()) + b.V("Tree Blobs: %5d new\n", summary.ItemStats.TreeBlobs.Load()) verb := "Added" if dryRun { verb = "Would add" } b.P("%s to the repository: %-5s (%-5s stored)\n", verb, - ui.FormatBytes(summary.ItemStats.DataSize+summary.ItemStats.TreeSize), - ui.FormatBytes(summary.ItemStats.DataSizeInRepo+summary.ItemStats.TreeSizeInRepo)) + ui.FormatBytes(summary.ItemStats.DataSize.Load()+summary.ItemStats.TreeSize.Load()), + ui.FormatBytes(summary.ItemStats.DataSizeInRepo.Load()+summary.ItemStats.TreeSizeInRepo.Load())) b.P("\n") b.P("processed %v files, %v in %s", summary.Files.New+summary.Files.Changed+summary.Files.Unchanged,