diff --git a/changelog/unreleased/issue-5473 b/changelog/unreleased/issue-5473 new file mode 100644 index 000000000..1d79ae079 --- /dev/null +++ b/changelog/unreleased/issue-5473 @@ -0,0 +1,26 @@ +Enhancement: Add rechunk copy feature + +Restic didn't rechunk data blobs when copying snapshots between repositories +with different chunker parameters. Instead, it copied the blobs as-is, +which impaired deduplication powered by [Content Defined Chunking](https://restic.net/blog/2015-09-12/restic-foundation1-cdc/). +To mitigate this issue, users had to manually restore the snapshots somewhere, +and then backup them again to the new repository. This workaround was +inefficient, prone to tamper with the original metadata, and bothersome. + +It now supports `--rechunk` option in `copy` command, in which the data are rechunked while copying. +Currently, it does not automatically skip previously copied snapshots. Also, +it does not remember which files had been rechunked in previous runs, so it will +try to rework on every file again in the next run (though it would not add new +data blobs to the repository in that case; this is what deduplication is for). +Therefore, current `copy --rechunk` is adequate for one-time migration between repositories. +For incremental copy scenarios, `copy` between repositories with same chunker +parameter is ideal. + +`copy --rechunk` has a few additional options. It has `--force` option to force rechunk copy +even when the chunker parameters are same between source and destination repositories, +and `--cache-size` option to specify the in-memory blob cache size during rechunk copy process. +Also, it has `--add-tag` option to add tags to the copied snapshots in the destination repo. + +https://github.com/restic/restic/issues/5473 +https://forum.restic.net/t/is-it-possible-to-re-chunk-after-a-restic-copy/6072 +https://forum.restic.net/t/copy-snapshots-between-repositories-without-copy-chunker-params/7320 diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index d17ded7c9..4971b1257 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -10,7 +10,9 @@ import ( "github.com/restic/restic/internal/data" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/feature" "github.com/restic/restic/internal/global" + "github.com/restic/restic/internal/rechunker" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui" @@ -65,11 +67,28 @@ Exit status is 12 if the password is incorrect. type CopyOptions struct { global.SecondaryRepoOptions data.SnapshotFilter + RechunkCopyOptions } func (opts *CopyOptions) AddFlags(f *pflag.FlagSet) { opts.SecondaryRepoOptions.AddFlags(f, "destination", "to copy snapshots from") initMultiSnapshotFilter(f, &opts.SnapshotFilter, true) + opts.RechunkCopyOptions.AddFlags(f) +} + +type RechunkCopyOptions struct { + Rechunk bool + ForceRechunk bool + AddTags data.TagLists + CacheSize int + isIntegrationTest bool // skip check for RESTIC_FEATURES=rechunk-copy during integration test +} + +func (opts *RechunkCopyOptions) AddFlags(f *pflag.FlagSet) { + f.BoolVar(&opts.Rechunk, "rechunk", false, "rechunk files when copying") + f.BoolVar(&opts.ForceRechunk, "force", false, "force rechunk even when src and dst repo have same chunker polynomials; to be used with --rechunk") + f.Var(&opts.AddTags, "add-tag", "add `tags` for the copied snapshots in the format `tag[,tag,...]` (can be specified multiple times). Used with --rechunk") + f.IntVar(&opts.CacheSize, "cache-size", 4096, "for rechunk copy, specify in-memory blob cache size in MiBs (0 to disable cache). Used with --rechunk") } // collectAllSnapshots: select all snapshot trees to be copied @@ -106,6 +125,17 @@ func collectAllSnapshots(ctx context.Context, opts CopyOptions, } func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args []string, term ui.Terminal) error { + // Rechunk-copy guardrails + if opts.Rechunk { + debug.Log("Rechunk option enabled") + if !feature.Flag.Enabled(feature.RechunkCopy) && !opts.isIntegrationTest { + return errors.Fatal("rechunk-copy feature flag is not set. Currently, rechunk-copy is alpha feature (disabled by default).") + } + if opts.CacheSize != 0 && opts.CacheSize < 100 { + return errors.Fatal("blob cache size must be at least 100 MiB") + } + } + printer := ui.NewProgressPrinter(false, gopts.Verbosity, term) secondaryGopts, isFromRepo, err := opts.SecondaryRepoOptions.FillGlobalOpts(ctx, gopts, "destination") if err != nil { @@ -128,6 +158,11 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [ } defer unlock() + // if rechunk is enabled, ensure srcRepo and dstRepo have different ChunkerPolynomials + if opts.Rechunk && !opts.ForceRechunk && srcRepo.Config().ChunkerPolynomial == dstRepo.Config().ChunkerPolynomial { + return errors.Fatal("source repo and destination repo have same chunker polynomials; run without `--rechunk`, or set `--force` flag to proceed with rechunk anyway") + } + srcSnapshotLister, err := restic.MemorizeList(ctx, srcRepo, restic.SnapshotFile) if err != nil { return err @@ -161,8 +196,23 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [ selectedSnapshots := collectAllSnapshots(ctx, opts, srcSnapshotLister, srcRepo, dstSnapshotByOriginal, args, printer) - if err := copyTreeBatched(ctx, srcRepo, dstRepo, selectedSnapshots, printer); err != nil { - return err + if !opts.Rechunk { + if err := copyTreeBatched(ctx, srcRepo, dstRepo, selectedSnapshots, printer); err != nil { + return err + } + } else { + rechnker := rechunker.NewRechunker(rechunker.Config{ + CacheSize: opts.CacheSize * (1 << 20), + Pol: dstRepo.Config().ChunkerPolynomial, + }) + progress := rechunker.NewProgress( + term, + printer, + ui.CalculateProgressInterval(!gopts.Quiet, gopts.JSON, term.CanUpdateStatus()), + ) + if err := rechunkCopy(ctx, srcRepo, dstRepo, selectedSnapshots, rechnker, printer, progress, opts.AddTags.Flatten()); err != nil { + return err + } } return ctx.Err() @@ -342,3 +392,62 @@ func copySaveSnapshot(ctx context.Context, sn *data.Snapshot, dstRepo restic.Rep printer.P("snapshot %s saved, copied from source snapshot %s", newID.Str(), sn.ID().Str()) return nil } + +func rechunkCopy(ctx context.Context, srcRepo, dstRepo restic.Repository, selectedSnapshots iter.Seq[*data.Snapshot], + rechnker *rechunker.Rechunker, printer progress.Printer, progress *rechunker.Progress, tags data.TagList) error { + printer.V("Gathering snapshots...") + var snapshots []*data.Snapshot + var rootTrees restic.IDs + debug.Log("Gathering root trees from selectedSnapshots()") + selectedSnapshots(func(sn *data.Snapshot) bool { + snapshots = append(snapshots, sn) + rootTrees = append(rootTrees, *sn.Tree) + return true + }) + + printer.V("Scanning files to process... ") + debug.Log("Running Plan()") + err := rechnker.Plan(ctx, srcRepo, rootTrees) + if err != nil { + return err + } + + printer.V("\n[Pre-run Summary]") + // num_snapshots, num_distinct_files, total_size, num_packs, + printer.V("Number of snapshots: %v", len(rootTrees)) + printer.V("Number of distinct files to process: %v", rechnker.NumFiles()) + printer.V(" - Total size (including duplicate blobs): %v", ui.FormatBytes(rechnker.TotalSize())) + printer.V("Number of packs to download: %v\n\n", rechnker.NumPacks()) + + debug.Log("Running Rechunk()") + progress.Start(rechnker.NumFiles(), rechnker.TotalSize()) + err = rechnker.Rechunk(ctx, srcRepo, dstRepo, progress) + if err != nil { + return err + } + progress.Done() + + printer.V("Rewriting trees...") + debug.Log("Running RewriteTrees()") + _, err = rechnker.RewriteTrees(ctx, srcRepo, dstRepo, rootTrees) + if err != nil { + return err + } + + printer.V("Writing snapshots...") + for _, sn := range snapshots { + newTreeID, err := rechnker.GetRewrittenTree(*sn.Tree) + if err != nil { + return err + } + sn.Tree = &newTreeID + sn.AddTags(tags) + if err = copySaveSnapshot(ctx, sn, dstRepo, printer); err != nil { + return err + } + } + + printer.P("Additional data stored to the repository: %v", ui.FormatBytes(rechnker.TotalAddedToDstRepo())) + + return nil +} diff --git a/cmd/restic/cmd_copy_integration_test.go b/cmd/restic/cmd_copy_integration_test.go index 6105acfe4..15fdfdc75 100644 --- a/cmd/restic/cmd_copy_integration_test.go +++ b/cmd/restic/cmd_copy_integration_test.go @@ -30,11 +30,35 @@ func testRunCopy(t testing.TB, srcGopts global.Options, dstGopts global.Options) })) } +func testRunRechunkCopy(t testing.TB, srcGopts global.Options, dstGopts global.Options) { + gopts := srcGopts + gopts.Repo = dstGopts.Repo + gopts.Password = dstGopts.Password + gopts.InsecureNoPassword = dstGopts.InsecureNoPassword + copyOpts := CopyOptions{ + SecondaryRepoOptions: global.SecondaryRepoOptions{ + Repo: srcGopts.Repo, + Password: srcGopts.Password, + InsecureNoPassword: srcGopts.InsecureNoPassword, + }, + RechunkCopyOptions: RechunkCopyOptions{ + Rechunk: true, + isIntegrationTest: true, + }, + } + + rtest.OK(t, withTermStatus(t, gopts, func(ctx context.Context, gopts global.Options) error { + return runCopy(context.TODO(), copyOpts, gopts, nil, gopts.Term) + })) +} + func TestCopy(t *testing.T) { env, cleanup := withTestEnvironment(t) defer cleanup() env2, cleanup2 := withTestEnvironment(t) defer cleanup2() + env3, cleanup3 := withTestEnvironment(t) // test env for rechunk-copy + defer cleanup3() testSetupBackupData(t, env) opts := BackupOptions{} @@ -46,8 +70,12 @@ func TestCopy(t *testing.T) { testRunInit(t, env2.gopts) testRunCopy(t, env.gopts, env2.gopts) + testRunInit(t, env3.gopts) + testRunRechunkCopy(t, env.gopts, env3.gopts) + snapshotIDs := testListSnapshots(t, env.gopts, 3) copiedSnapshotIDs := testListSnapshots(t, env2.gopts, 3) + rechunkCopiedSnapshotIDs := testListSnapshots(t, env3.gopts, 3) // Check that the copies size seems reasonable stat := dirStats(t, env.repo) @@ -61,12 +89,15 @@ func TestCopy(t *testing.T) { // Check integrity of the copy testRunCheck(t, env2.gopts) + testRunCheck(t, env3.gopts) // Check that the copied snapshots have the same tree contents as the old ones (= identical tree hash) origRestores := make(map[string]struct{}) + origRestores2 := make(map[string]struct{}) for i, snapshotID := range snapshotIDs { restoredir := filepath.Join(env.base, fmt.Sprintf("restore%d", i)) origRestores[restoredir] = struct{}{} + origRestores2[restoredir] = struct{}{} testRunRestore(t, env.gopts, restoredir, snapshotID.String()) } for i, snapshotID := range copiedSnapshotIDs { @@ -84,7 +115,24 @@ func TestCopy(t *testing.T) { rtest.Assert(t, foundMatch, "found no counterpart for snapshot %v", snapshotID) } + // Check that the rechunk-copied snapshots have the same tree contents as the old ones + for i, snapshotID := range rechunkCopiedSnapshotIDs { + restoredir := filepath.Join(env3.base, fmt.Sprintf("restore%d", i)) + testRunRestore(t, env3.gopts, restoredir, snapshotID.String()) + foundMatch := false + for cmpdir := range origRestores2 { + diff := directoriesContentsDiff(t, restoredir, cmpdir) + if diff == "" { + delete(origRestores2, cmpdir) + foundMatch = true + } + } + + rtest.Assert(t, foundMatch, "found no counterpart for snapshot %v", snapshotID) + } + rtest.Assert(t, len(origRestores) == 0, "found not copied snapshots") + rtest.Assert(t, len(origRestores2) == 0, "found not rechunk-copied snapshots") // check that snapshots were properly batched while copying _, _, countBlobs := testPackAndBlobCounts(t, env.gopts) @@ -166,6 +214,8 @@ func TestCopyUnstableJSON(t *testing.T) { defer cleanup() env2, cleanup2 := withTestEnvironment(t) defer cleanup2() + env3, cleanup3 := withTestEnvironment(t) // test env for rechunk-copy + defer cleanup3() // contains a symlink created using `ln -s '../i/'$'\355\246\361''d/samba' broken-symlink` datafile := filepath.Join("testdata", "copy-unstable-json.tar.gz") @@ -175,6 +225,11 @@ func TestCopyUnstableJSON(t *testing.T) { testRunCopy(t, env.gopts, env2.gopts) testRunCheck(t, env2.gopts) testListSnapshots(t, env2.gopts, 1) + + testRunInit(t, env3.gopts) + testRunRechunkCopy(t, env.gopts, env3.gopts) + testRunCheck(t, env3.gopts) + testListSnapshots(t, env3.gopts, 1) } func TestCopyToEmptyPassword(t *testing.T) { @@ -184,14 +239,22 @@ func TestCopyToEmptyPassword(t *testing.T) { defer cleanup2() env2.gopts.Password = "" env2.gopts.InsecureNoPassword = true + env3, cleanup3 := withTestEnvironment(t) // test env for rechunk-copy + defer cleanup3() + env3.gopts.Password = "" + env3.gopts.InsecureNoPassword = true testSetupBackupData(t, env) testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9")}, BackupOptions{}, env.gopts) testRunInit(t, env2.gopts) testRunCopy(t, env.gopts, env2.gopts) + testRunInit(t, env3.gopts) + testRunRechunkCopy(t, env.gopts, env3.gopts) testListSnapshots(t, env.gopts, 1) testListSnapshots(t, env2.gopts, 1) + testListSnapshots(t, env3.gopts, 1) testRunCheck(t, env2.gopts) + testRunCheck(t, env3.gopts) } diff --git a/doc/045_working_with_repos.rst b/doc/045_working_with_repos.rst index 9aff3ab0f..80baa0175 100644 --- a/doc/045_working_with_repos.rst +++ b/doc/045_working_with_repos.rst @@ -240,7 +240,7 @@ messages can appear some time after the snapshot content was copied. source and destination repository. This *may incur higher bandwidth usage and costs* than expected during normal backup runs. -.. important:: The copying process does not re-chunk files, which may break +.. important:: The plain copy command does not re-chunk files, which may break deduplication between the files copied and files already stored in the destination repository. This means that copied files, which existed in both the source and destination repository, *may occupy up to twice their @@ -292,9 +292,11 @@ Ensuring deduplication for copied snapshots ------------------------------------------- Even though the copy command can transfer snapshots between arbitrary repositories, -deduplication between snapshots from the source and destination repository may not work. -To ensure proper deduplication, both repositories have to use the same parameters for -splitting large files into smaller chunks, which requires additional setup steps. With +deduplication between snapshots from the source and destination repository may not work +with plain copy command. There are two methods to ensure proper deduplication between +repositories. First one is to use ``--rechunk`` option described below. Second one is +to make both repositories use the same parameters for splitting large files into smaller +chunks, which requires additional setup steps. With the same parameters restic will for both repositories split identical files into identical chunks and therefore deduplication also works for snapshots copied between these repositories. @@ -309,6 +311,36 @@ using the same chunker parameters as the source repository: Note that it is not possible to change the chunker parameters of an existing repository. +Rechunk copy between repositories with different chunker parameters +------------------------------------------------------------------- + +The ``copy --rechunk`` command re-chunks files with destination repository's chunker parameters +when copying snapshots. There are a few command-line options used with ``--rechunk``. +First is ``--force``, which forces it to rechunk files even when the chunker parameters are same for +the source and destination repositories. +Second is ``--cache-size``. The rechunk-copy command uses in-memory cache for +rechunking, whose default size is 4096 MiB. You can customize the cache size, +adapting for your system's RAM size and desired memory usage. Note that a small cache size will lead to +frequent re-download of packs, which is especially undesirable for remote source repositories. +Third is ``--add-tag``, which adds tags to the copied snapshots in the destination repo. + +The below commands are all valid ones. + +.. code-block:: console + + $ restic -r /srv/dst-repo copy --rechunk --from-repo /srv/src-repo + $ restic -r /srv/dst-repo copy --rechunk --from-repo /srv/src-repo --host luigi --path /srv/data --tag foo,bar + $ restic -r /srv/dst-repo copy --rechunk --add-tag my-rechunk --from-repo /srv/src-repo 34c9e85f 2714b65a + $ restic -r /srv/dst-repo copy --rechunk --cache-size 8192 --from-repo /srv/src-repo # set cache size to 8192 MiB + +.. note:: Although the ``copy --rechunk`` command can provide on-demand deduplication between + repositories with different chunker parameters, there are a few disadvantages compared + to the plain copy. The rechunk copy is slower because it re-assembles + all files and does the same all computations which are done during backup. Also, as of now, + the rechunk copy does not support skipping redundant snapshots, so you should + manually specify the exact snapshots to copy. Therefore, it is recommended to use + repositories with the same chunker parameter if you plan to copy regularly between your repositories. + Removing files from snapshots ============================= diff --git a/internal/feature/registry.go b/internal/feature/registry.go index a7368fa75..fb373ea6a 100644 --- a/internal/feature/registry.go +++ b/internal/feature/registry.go @@ -10,6 +10,7 @@ const ( DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout" DeviceIDForHardlinks FlagName = "device-id-for-hardlinks" ExplicitS3AnonymousAuth FlagName = "explicit-s3-anonymous-auth" + RechunkCopy FlagName = "rechunk-copy" SafeForgetKeepTags FlagName = "safe-forget-keep-tags" S3Restore FlagName = "s3-restore" ) @@ -21,6 +22,7 @@ func init() { DeprecateS3LegacyLayout: {Type: Stable, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use restic 0.17.3 to migrate if necessary."}, DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"}, ExplicitS3AnonymousAuth: {Type: Stable, Description: "forbid anonymous S3 authentication unless `-o s3.unsafe-anonymous-auth=true` is set"}, + RechunkCopy: {Type: Alpha, Description: "enable rechunk-copy command, where it rechunks data blobs while copying snapshots. This command is not stable yet, so use with caution."}, SafeForgetKeepTags: {Type: Stable, Description: "prevent deleting all snapshots if the tag passed to `forget --keep-tags tagname` does not exist"}, S3Restore: {Type: Alpha, Description: "restore S3 objects from cold storage classes when `-o s3.enable-restore=true` is set"}, }) diff --git a/internal/rechunker/blob_cache.go b/internal/rechunker/blob_cache.go new file mode 100644 index 000000000..218104cc4 --- /dev/null +++ b/internal/rechunker/blob_cache.go @@ -0,0 +1,344 @@ +package rechunker + +import ( + "context" + "fmt" + "sync" + + "github.com/hashicorp/golang-lru/v2/simplelru" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" +) + +type BlobCache struct { + mu sync.RWMutex + c *simplelru.LRU[restic.ID, []byte] + + idx Index + + free, size int + + waitList restic.IDSet // set of packs waiting for download + inProgress map[restic.ID]chan struct{} // blob ready event channel; open by requestDownload(), closed by downloaders + downloadCh chan restic.ID // pack download request channel; produced by requestDownload(), consumed by downloaders + + ignored restic.IDSet // set of ignored blobs; blobs in this set are excluded from download + + cancel func() // this function is called at Close(), cancelling cache context +} + +const overhead = len(restic.ID{}) + 64 + +func NewBlobCache(ctx context.Context, size int, numDownloaders int, + repo PackLoader, idx Index, + onReady func(blobIDs restic.IDs), onEvict func(blobIDs restic.IDs)) *BlobCache { + if size < 32*(1<<20) { + panic("Blob cache size should be at least 32 MiB!!") + } + debug.Log("Creating blob cache of size %v", size) + + ctx, cancel := context.WithCancel(ctx) + + c := &BlobCache{ + idx: idx, + + size: size, + free: size, + + waitList: restic.NewIDSet(), + inProgress: map[restic.ID]chan struct{}{}, + downloadCh: make(chan restic.ID), + + ignored: restic.NewIDSet(), + + cancel: cancel, + } + + lru, err := simplelru.NewLRU(size, func(_ restic.ID, v []byte) { + c.free += cap(v) + overhead + }) + if err != nil { + panic(err) + } + c.c = lru + + // create download function that uses repo's LoadBlobsFromPack + download := createDownloadFn(ctx, repo) + + c.startDownloaders(ctx, numDownloaders, download, onReady, onEvict) + + return c +} + +type blobMap = map[restic.ID][]byte +type downloadFn func(packID restic.ID, blobs []restic.Blob) (blobMap, error) + +func createDownloadFn(ctx context.Context, repo PackLoader) downloadFn { + return func(packID restic.ID, blobs []restic.Blob) (blobMap, error) { + bm := blobMap{} + err := repo.LoadBlobsFromPack(ctx, packID, blobs, + func(blob restic.BlobHandle, buf []byte, err error) error { + if err != nil { + return err + } + newBuf := make([]byte, len(buf)) + copy(newBuf, buf) + bm[blob.ID] = newBuf + + return nil + }) + if err != nil { + return blobMap{}, err + } + return bm, nil + } +} + +func (c *BlobCache) startDownloaders(ctx context.Context, numDownloaders int, + download downloadFn, onReady, onEvict func(blobIDs restic.IDs)) { + wg, ctx := errgroup.WithContext(ctx) + for range numDownloaders { + wg.Go(func() error { + debug.Log("Starting blob cache downloader") + defer debug.Log("Stopping blob cache downloader") + + for { + // listen to pack download request + var packID restic.ID + select { + case <-ctx.Done(): + return ctx.Err() + case packID = <-c.downloadCh: + } + + // filter out ignored blobs + c.mu.RLock() + var filtered []restic.Blob + for _, blob := range c.idx.PackToBlobs(packID) { + ignored := c.ignored.Has(blob.ID) + ready := c.c.Contains(blob.ID) + if !ignored && !ready { + filtered = append(filtered, blob) + } + } + c.mu.RUnlock() + + // skip if no blobs to download + if len(filtered) == 0 { + continue + } + + // download blobs from the repo + debug.Log("Starting download of %v blobs in pack %v", len(filtered), packID.Str()) + blobs, err := download(packID, filtered) + if err != nil { + return err + } + + // pop the pack from the waitlist, + // store downloaded blobs to the cache, + // and notify that blobs are ready + var ready, evicted restic.IDs + c.mu.Lock() + delete(c.waitList, packID) + for id, data := range blobs { + size := cap(data) + overhead + for size > c.free { // evict old blobs if there is not enough free space + id, _, ok := c.c.RemoveOldest() + if ok { + evicted = append(evicted, id) + } else { + defer c.mu.Unlock() + return fmt.Errorf("not enough cache size to store a blob; needs at least %d bytes, but has only %d bytes", size, c.free) + } + } + c.c.Add(id, data) + c.free -= size + if _, ok := c.inProgress[id]; ok { + close(c.inProgress[id]) + delete(c.inProgress, id) + } + ready = append(ready, id) + } + currentCacheUsage := c.size - c.free // for debug logging + c.mu.Unlock() + + // execute callbacks + if len(evicted) > 0 { + if onEvict != nil { + onEvict(evicted) + } + debug.Log("%v blobs are evicted.", len(evicted)) + } + if onReady != nil { + onReady(ready) + } + + debug.Log("Pack %v loaded. Current cache usage: %v", packID.Str(), currentCacheUsage) + debug.Log("Pack %v includes the following blobs: \n%v", packID.Str(), ready.String()) + + // debugStats: track maximum memory usage + if debugStats != nil { + debugStats.UpdateMax("max_cache_usage", currentCacheUsage) + } + } + }) + } +} + +func (c *BlobCache) Get(ctx context.Context, id restic.ID, buf []byte) ([]byte, <-chan []byte) { + c.mu.Lock() + blob, ok := c.c.Get(id) // try to retrieve blob, with recency update + c.mu.Unlock() + + if ok { // case 1: when blob exists in cache: return that blob immediately + if cap(buf) < len(blob) { + debug.Log("Allocating new buf, as it has smaller capacity than chunk size.") + buf = make([]byte, len(blob)) + } else { + buf = buf[:len(blob)] + } + copy(buf, blob) + + debug.Log("Cache hit. Returning blob %v", id.Str()) + return buf, nil + } + + // case 2: when blob does not exist in cache: return chOut (where downloaded blob will be delievered) + debug.Log("Cache miss. Requesting async get for blob %v", id.Str()) + chOut := c.asyncGet(ctx, id, buf) + + return nil, chOut +} + +func (c *BlobCache) asyncGet(ctx context.Context, id restic.ID, buf []byte) <-chan []byte { + wg, ctx := errgroup.WithContext(ctx) + out := make(chan []byte, 1) + + wg.Go(func() error { + for { + c.mu.RLock() + blob, ready := c.c.Peek(id) + finish, inProgress := c.inProgress[id] + c.mu.RUnlock() + + if ready { // case A: blob is now ready in the cache + if cap(buf) < len(blob) { + debug.Log("Allocating new buf, as it has smaller capacity than chunk size.") + buf = make([]byte, len(blob)) + } else { + buf = buf[:len(blob)] + } + copy(buf, blob) + + debug.Log("Blob %v is now ready in the cache. Passing blob data to channel.", id.Str()) + out <- buf + return nil + } + if inProgress { // case B: blob is queued, but not yet ready + debug.Log("Waiting for blob %v to be ready in the cache.", id.Str()) + select { + case <-ctx.Done(): + return ctx.Err() + case <-finish: // wait until download complete + continue + } + } + + // case C: blob is not queued + // add to the download queue + debug.Log("Requesting download of the pack containing blob %v", id.Str()) + err := c.requestDownload(ctx, id) + if err != nil { + return err + } + } + }) + + return out +} + +func (c *BlobCache) requestDownload(ctx context.Context, id restic.ID) error { + packID := c.idx.BlobToPack(id) + if packID.IsNull() { + return fmt.Errorf("unknown blob: %v", id.Str()) + } + + c.mu.Lock() + ok := c.waitList.Has(packID) + if !ok { + // queue pack download + c.waitList.Insert(packID) + } + if _, inProgress := c.inProgress[id]; !inProgress { + c.inProgress[id] = make(chan struct{}) + } + c.mu.Unlock() + + if ok { // somebody else has already queued pack download; it will handle download request + return nil + } + + // send packID to inform the downloader + select { + case <-ctx.Done(): + return ctx.Err() + case c.downloadCh <- packID: + return nil + } +} + +func (c *BlobCache) Ignore(ids restic.IDs) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, id := range ids { + c.ignored.Insert(id) + _ = c.c.Remove(id) + } + + if debugStats != nil { + debugStats.Add("ignored_blob_count", len(ids)) + } +} + +func (c *BlobCache) Close() { + if c == nil { + return + } + + c.cancel() +} + +type BlobLoaderWithCache struct { + repo PackLoader + cache *BlobCache +} + +func (l *BlobLoaderWithCache) LoadBlob(ctx context.Context, _ restic.BlobType, id restic.ID, buf []byte) ([]byte, error) { + blob, ch := l.cache.Get(ctx, id, buf) + if blob == nil { // wait for blob to be downloaded + select { + case <-ctx.Done(): + return nil, ctx.Err() + case blob = <-ch: + } + } + return blob, nil +} + +type PackLoader interface { + LoadBlobsFromPack(context.Context, restic.ID, []restic.Blob, func(restic.BlobHandle, []byte, error) error) error +} + +func WrapWithCache(ctx context.Context, repo PackLoader, cacheSize int, numDownloaders int, idx Index, + onReady, onEvict func(restic.IDs)) (*BlobLoaderWithCache, *BlobCache) { + r := &BlobLoaderWithCache{ + repo: repo, + cache: NewBlobCache(ctx, cacheSize, numDownloaders, repo, idx, onReady, onEvict), + } + + debug.Log("Wrapped the repository with blob cache.") + return r, r.cache +} diff --git a/internal/rechunker/debug.go b/internal/rechunker/debug.go new file mode 100644 index 000000000..5a5e8b655 --- /dev/null +++ b/internal/rechunker/debug.go @@ -0,0 +1,103 @@ +package rechunker + +import ( + "maps" + "strings" + "sync" + + "github.com/restic/restic/internal/debug" +) + +// global data structure for debug trace +var debugStats = NewStats(true) + +type Stats struct { + d map[string]int + mu sync.Mutex +} + +func NewStats(enable bool) *Stats { + if enable { + return &Stats{ + d: map[string]int{}, + } + } + return nil +} + +func (n *Stats) Add(k string, v int) { + if n == nil { + return + } + + n.mu.Lock() + defer n.mu.Unlock() + + n.d[k] += v +} + +func (n *Stats) AddMap(m map[string]int) { + if n == nil { + return + } + + n.mu.Lock() + defer n.mu.Unlock() + + for k, v := range m { + n.d[k] += v + } +} + +func (n *Stats) UpdateMax(k string, v int) { + if n == nil { + return + } + + n.mu.Lock() + defer n.mu.Unlock() + + if n.d[k] < v { + n.d[k] = v + } +} + +func (n *Stats) Dump() (note map[string]int) { + if n == nil { + return + } + + n.mu.Lock() + defer n.mu.Unlock() + + note = map[string]int{} + maps.Copy(note, n.d) + + return note +} + +func debugPrintRechunkReport(rc *Rechunker) { + if debugStats == nil { + return + } + + dNote := debugStats.Dump() + + if rc.cfg.CacheSize > 0 { + debug.Log("List of blobs downloaded more than once:") + numBlobRedundant := 0 + redundantDownloadCount := 0 + for k := range dNote { + if strings.HasPrefix(k, "load:") && dNote[k] > 1 { + debug.Log("%v: Downloaded %d times", k[5:15], dNote[k]) + numBlobRedundant++ + redundantDownloadCount += dNote[k] + } + } + debug.Log("[summary_blobcache] Number of redundantly downloaded blobs is %d, whose overall download count is %d", numBlobRedundant, redundantDownloadCount) + debug.Log("[summary_blobcache] Peak memory usage by blob cache: %v/%v bytes", dNote["max_cache_usage"], rc.cfg.CacheSize) + if dNote["total_blob_count"] != dNote["ignored_blob_count"] { + debug.Log("[summary_blobcache] WARNING: Number of successfully ignored blob at the end: %v/%v", dNote["ignored_blob_count"], dNote["total_blob_count"]) + } + } +} diff --git a/internal/rechunker/progress.go b/internal/rechunker/progress.go new file mode 100644 index 000000000..ca99c998a --- /dev/null +++ b/internal/rechunker/progress.go @@ -0,0 +1,95 @@ +package rechunker + +import ( + "fmt" + "sync" + "time" + + "github.com/restic/restic/internal/ui" + "github.com/restic/restic/internal/ui/progress" +) + +type Progress struct { + updater progress.Updater + m sync.Mutex + + filesFinished int + filesTotal int + bytesProcessed uint64 + bytesTotal uint64 + + printer progress.Printer + term ui.Terminal + show bool +} + +func NewProgress(term ui.Terminal, printer progress.Printer, interval time.Duration) *Progress { + p := &Progress{ + term: term, + printer: printer, + } + p.updater = *progress.NewUpdater(interval, p.update) + + return p +} + +func (p *Progress) Start(fileCount int, totalSize uint64) { + p.m.Lock() + defer p.m.Unlock() + + p.filesTotal = fileCount + p.bytesTotal = totalSize + p.show = true +} + +func (p *Progress) AddFile(count int) { + p.m.Lock() + defer p.m.Unlock() + + p.filesFinished += count +} + +func (p *Progress) AddBlob(size uint64) { + p.m.Lock() + defer p.m.Unlock() + + p.bytesProcessed += size +} + +func (p *Progress) update(duration time.Duration, final bool) { + p.m.Lock() + defer p.m.Unlock() + + if p.show && !final { + formattedDuration := ui.FormatDuration(duration) + formattedBytesProcessed := ui.FormatBytes(p.bytesProcessed) + formattedBytesTotal := ui.FormatBytes(p.bytesTotal) + percent := ui.FormatPercent(p.bytesProcessed, p.bytesTotal) + progress := []string{ + fmt.Sprintf("[%s] %v/%v distinct files processed", + formattedDuration, p.filesFinished, p.filesTotal), + fmt.Sprintf("%s %s/%s", percent, formattedBytesProcessed, formattedBytesTotal), + } + p.term.SetStatus(progress) + } else if p.show && final { + formattedDuration := ui.FormatDuration(duration) + formattedBytesProcessed := ui.FormatBytes(p.bytesProcessed) + formattedBytesTotal := ui.FormatBytes(p.bytesTotal) + percent := ui.FormatPercent(p.bytesProcessed, p.bytesTotal) + + p.term.SetStatus(nil) + p.printer.P("[%s] %v/%v distinct files processed\n", formattedDuration, p.filesFinished, p.filesTotal) + p.printer.P("%s %s/%s\n", percent, formattedBytesProcessed, formattedBytesTotal) + p.show = false + } else { + p.term.SetStatus(nil) + } +} + +func (p *Progress) Done() { + if p == nil { + return + } + + p.updater.Done() +} diff --git a/internal/rechunker/rechunker.go b/internal/rechunker/rechunker.go new file mode 100644 index 000000000..634262882 --- /dev/null +++ b/internal/rechunker/rechunker.go @@ -0,0 +1,421 @@ +package rechunker + +import ( + "context" + "crypto/sha256" + "fmt" + "runtime" + "slices" + "sync" + "sync/atomic" + + "github.com/restic/chunker" + "github.com/restic/restic/internal/data" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/walker" + "golang.org/x/sync/errgroup" +) + +type Rechunker struct { + cfg Config + idx Index + + filesList []*ChunkedFile + totalSize uint64 + rechunkReady bool + + rechunkMap map[restic.ID]restic.IDs // hashOfIDs of srcBlobIDs -> dstBlobIDs + rechunkMapLock sync.Mutex + totalAddedToDstRepo atomic.Uint64 + rewriteTreeMap map[restic.ID]restic.ID // original tree ID (in src repo) -> rewritten tree ID (in dst repo) +} + +type Config struct { + CacheSize int + Pol chunker.Pol +} + +type ChunkedFile struct { + restic.IDs + hashval restic.ID +} + +type Index interface { + BlobSize(blobID restic.ID) (size uint) // blob ID -> blob size + BlobToPack(blobID restic.ID) (packID restic.ID) // blob ID -> pack ID + PackToBlobs(packID restic.ID) (blobs []restic.Blob) // pack ID -> list of blobs to be loaded from the pack + Packs() (packIDs restic.IDSet) // set of all pack IDs +} + +func NewRechunker(cfg Config) *Rechunker { + return &Rechunker{ + cfg: cfg, + rechunkMap: map[restic.ID]restic.IDs{}, + rewriteTreeMap: map[restic.ID]restic.ID{}, + } +} + +func (rc *Rechunker) Plan(ctx context.Context, srcRepo restic.Repository, rootTrees restic.IDs) error { + var err error + debug.Log("Gathering distinct file Contents from target snapshots") + rc.filesList, rc.totalSize, err = gatherFileContents(ctx, srcRepo, rootTrees) + if err != nil { + return err + } + + debug.Log("Building the internal index for use in Rechunk()") + rc.idx, err = createIndex(rc.filesList, srcRepo.LookupBlob) + if err != nil { + return err + } + + debug.Log("Sorting the file list by their chunk counts (descending order)") + slices.SortFunc(rc.filesList, func(a, b *ChunkedFile) int { + return len(b.IDs) - len(a.IDs) // descending order + }) + + rc.rechunkReady = true + + return nil +} + +func gatherFileContents(ctx context.Context, repo restic.Loader, rootTrees restic.IDs) (filesList []*ChunkedFile, totalSize uint64, err error) { + mu := sync.Mutex{} + visitedFiles := restic.NewIDSet() + visitedTrees := restic.NewIDSet() + + // Stream through all subtrees in target rootTrees and gather all distinct file Contents + err = data.StreamTrees(ctx, repo, rootTrees, nil, func(id restic.ID) bool { + visited := visitedTrees.Has(id) + visitedTrees.Insert(id) + return visited + }, func(_ restic.ID, err error, nodes data.TreeNodeIterator) error { + if err != nil { + return err + } + + for item := range nodes { + if item.Error != nil { + return item.Error + } + if item.Node == nil || item.Node.Type != data.NodeTypeFile { + continue + } + + hashval := HashOfIDs(item.Node.Content) + + mu.Lock() + if visitedFiles.Has(hashval) { + mu.Unlock() + continue + } + visitedFiles.Insert(hashval) + + filesList = append(filesList, &ChunkedFile{ + item.Node.Content, + hashval, + }) + totalSize += item.Node.Size + mu.Unlock() + } + + return nil + }) + if err != nil { + return nil, 0, err + } + + return filesList, totalSize, nil +} + +type index struct { + blobSize map[restic.ID]uint // blob ID -> blob size + blobIdx map[restic.ID]restic.ID // blob ID -> pack ID + packIdx map[restic.ID][]restic.Blob // pack ID -> list of blobs to be loaded from the pack +} + +func (i *index) BlobSize(id restic.ID) uint { + return i.blobSize[id] +} + +func (i *index) BlobToPack(id restic.ID) restic.ID { + return i.blobIdx[id] +} + +func (i *index) PackToBlobs(id restic.ID) []restic.Blob { + return i.packIdx[id] +} + +func (i *index) Packs() restic.IDSet { + ids := restic.NewIDSet() + for id := range i.packIdx { + ids.Insert(id) + } + return ids +} + +func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id restic.ID) []restic.PackedBlob) (Index, error) { + // collect blob usage info + blobCount := map[restic.ID]int{} + for _, file := range filesList { + for _, blob := range file.IDs { + blobCount[blob]++ + } + } + + // debugStats: record the number of blobs used + if debugStats != nil { + debugStats.Add("total_blob_count", len(blobCount)) + } + + // build blob lookup info + blobSize := map[restic.ID]uint{} + blobIdx := map[restic.ID]restic.ID{} + packIdx := map[restic.ID][]restic.Blob{} + for blob := range blobCount { + packs := lookupBlob(restic.DataBlob, blob) + if len(packs) == 0 { + return nil, fmt.Errorf("can't find blob from source repo: %v", blob) + } + pb := packs[0] + + blobSize[pb.Blob.ID] = pb.DataLength() + blobIdx[pb.Blob.ID] = pb.PackID + packIdx[pb.PackID] = append(packIdx[pb.PackID], pb.Blob) + } + + idx := &index{ + blobSize: blobSize, + blobIdx: blobIdx, + packIdx: packIdx, + } + + return idx, nil +} + +func (rc *Rechunker) Rechunk(ctx context.Context, srcRepo, dstRepo restic.Repository, p *Progress) error { + if dstRepo.Config().ChunkerPolynomial != rc.cfg.Pol { + return fmt.Errorf("chunker polynomial of dstRepo does not match with Rechunker's one") + } + + if !rc.rechunkReady { + return fmt.Errorf("Plan() must be run first before Rechunk()") + } + rc.rechunkReady = false + + debug.Log("Rechunk start.") + defer debug.Log("Rechunk done.") + + numWorkers := min(runtime.GOMAXPROCS(0), int(srcRepo.Connections())) + numDownloaders := numWorkers + debug.Log("srcRepo.Connections(): %v", srcRepo.Connections()) + + // set up scheduler + scheduler := rc.setupScheduler(ctx) + + // set up blob cache + var downloader restic.BlobLoader + var cache *BlobCache + if rc.cfg.CacheSize > 0 { + downloader, cache = rc.setupCache(ctx, srcRepo, scheduler, numDownloaders) + defer cache.Close() + } else { + downloader = srcRepo + } + + // run rechunk workers + bufferPool := NewBufferPool(3 * (numWorkers + 1)) + err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + debug.Log("Starting uploader") + defer debug.Log("Closing uploader") + + workerCfg := WorkerConfig{ + Pol: rc.cfg.Pol, + + Downloader: downloader, + Uploader: uploader, + BufferPool: bufferPool, + + NewCursor: scheduler.newCursor, + UpdateCursor: scheduler.updateCursor, + } + + wg, ctx := errgroup.WithContext(ctx) + rc.runWorkers(ctx, wg, numWorkers, workerCfg, scheduler.Next, p) + rc.runWorkers(ctx, wg, 1, workerCfg, scheduler.NextPriority, p) + + return wg.Wait() + }) + if err != nil { + return err + } + + debugPrintRechunkReport(rc) + + return nil +} + +func (rc *Rechunker) setupScheduler(ctx context.Context) (scheduler *Scheduler) { + debug.Log("Running file dispatcher") + + // If the blob cache is enabled, priority dispatch will be used. + // With priority dispatch, (small) files with all their blobs ready in the cache are prioritized. + // if the blob cache is disabled, dispatch order simply follows the filesList. + if rc.cfg.CacheSize > 0 { + scheduler = NewScheduler(ctx, rc.filesList, rc.idx, true) + } else { + scheduler = NewScheduler(ctx, rc.filesList, rc.idx, false) + } + return scheduler +} + +func (rc *Rechunker) setupCache(ctx context.Context, srcRepo PackLoader, scheduler *Scheduler, numDownloaders int) (repo restic.BlobLoader, cache *BlobCache) { + debug.Log("Creating blob cache: cacheSize %v", rc.cfg.CacheSize) + + // wrap srcRepo with cache. Now repo's LoadBlob() method will be transparently mediated by blob cache + repo, cache = WrapWithCache(ctx, srcRepo, rc.cfg.CacheSize, numDownloaders, rc.idx, scheduler.BlobReady, scheduler.BlobUnready) + + // register cache.Ignore as scheduler's obsolete blob callback for early cache eviction + scheduler.SetIgnoreBlobsCallback(cache.Ignore) + + return repo, cache +} + +func (rc *Rechunker) runWorkers(ctx context.Context, wg *errgroup.Group, numWorkers int, + workerCfg WorkerConfig, receiveJob func(context.Context) (*ChunkedFile, bool, error), + p *Progress) { + for range numWorkers { + wg.Go(func() error { + debug.Log("Starting worker") + worker := NewWorker(workerCfg) + + for { + debug.Log("receiving job") + file, ok, err := receiveJob(ctx) + if err != nil { + return err + } + if !ok { + return nil + } + + debug.Log("Starting file %v", file.hashval.Str()) + result, err := worker.RunFile(ctx, file.IDs, p) + if err != nil { + return err + } + debug.Log("Finished file %v", file.hashval.Str()) + if p != nil { + p.AddFile(1) + } + + rc.totalAddedToDstRepo.Add(result.addedToRepository) + rc.rechunkMapLock.Lock() + rc.rechunkMap[file.hashval] = result.dstBlobs + rc.rechunkMapLock.Unlock() + } + }) + } +} + +// wrapper type for BlobSaver where you can define custom SaveBlob() +type wrappedBlobSaver func(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, sizeInRepo int, err error) + +func (s wrappedBlobSaver) SaveBlob(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, sizeInRepo int, err error) { + return s(ctx, tpe, buf, id, storeDuplicate) +} + +func (rc *Rechunker) RewriteTrees(ctx context.Context, srcRepo, dstRepo restic.Repository, treeIDs restic.IDs) (restic.IDs, error) { + result := restic.IDs{} + + rewriter := walker.NewTreeRewriter(walker.RewriteOpts{ + RewriteNode: func(node *data.Node, _ string) *data.Node { + if node == nil { + return nil + } + if node.Type != data.NodeTypeFile { + return node + } + + hashval := HashOfIDs(node.Content) + dstBlobs, ok := rc.rechunkMap[hashval] + if !ok { + panic(fmt.Errorf("can't find from rechunkBlobsMap: %v", node.Content.String())) + } + node.Content = dstBlobs + return node + }, + AllowUnstableSerialization: true, + }) + + err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + // wrap dstRepo so that total uploaded tree blobs size can be tracked + saver := wrappedBlobSaver(func(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, sizeInRepo int, err error) { + newID, known, sizeInRepo, err = uploader.SaveBlob(ctx, tpe, buf, id, storeDuplicate) + if err != nil { + return + } + if !known { + rc.totalAddedToDstRepo.Add(uint64(sizeInRepo)) + } + return + }) + + for _, treeID := range treeIDs { + // check if the identical tree has already been processed + newID, ok := rc.rewriteTreeMap[treeID] + if ok { + result = append(result, newID) + continue + } + + newID, err := rewriter.RewriteTree(ctx, srcRepo, saver, "/", treeID) + if err != nil { + return err + } + rc.rewriteTreeMap[treeID] = newID + result = append(result, newID) + } + + return nil + }) + if err != nil { + return nil, err + } + + return result, nil +} + +func (rc *Rechunker) GetRewrittenTree(originalTree restic.ID) (restic.ID, error) { + newID, ok := rc.rewriteTreeMap[originalTree] + if !ok { + return restic.ID{}, fmt.Errorf("rewritten tree does not exist for original tree %v", originalTree) + } + return newID, nil +} + +func (rc *Rechunker) TotalSize() uint64 { + return rc.totalSize +} + +func (rc *Rechunker) NumFiles() int { + return len(rc.filesList) +} + +func (rc *Rechunker) NumPacks() int { + return len(rc.idx.Packs()) +} + +func (rc *Rechunker) TotalAddedToDstRepo() uint64 { + return rc.totalAddedToDstRepo.Load() +} + +// HashOfIDs computes a sha256 hash of the concatenation of all values of `restic.IDs`, making a mapping from `restic.IDs` to `restic.ID`. +func HashOfIDs(ids restic.IDs) restic.ID { + c := make([]byte, 0, len(ids)*32) + for _, id := range ids { + c = append(c, id[:]...) + } + return sha256.Sum256(c) +} diff --git a/internal/rechunker/rechunker_test.go b/internal/rechunker/rechunker_test.go new file mode 100644 index 000000000..9a8b3b9f9 --- /dev/null +++ b/internal/rechunker/rechunker_test.go @@ -0,0 +1,193 @@ +package rechunker + +import ( + "context" + "fmt" + "testing" + + "github.com/restic/chunker" + + "github.com/restic/restic/internal/archiver" + "github.com/restic/restic/internal/data" + "github.com/restic/restic/internal/restic" + rtest "github.com/restic/restic/internal/test" + "github.com/restic/restic/internal/walker" +) + +// prepareData prepares random data for rechunker test. +func prepareData(t *testing.T) string { + tempdir := rtest.TempDir(t) + data := map[int][]byte{ + 1: rtest.Random(1, 10_000), + 2: rtest.Random(2, 10_000_000), + 3: rtest.Random(3, 100_000_000), + } + repo := archiver.TestDir{ + "zero": archiver.TestFile{Content: ""}, + "one": archiver.TestFile{Content: string(data[1])}, + "two": archiver.TestFile{Content: string(data[2])}, + "three": archiver.TestFile{Content: string(data[3])}, + "dir1": archiver.TestDir{ + "dir2": archiver.TestDir{ + "dup_1": archiver.TestFile{Content: string(data[1])}, + "dup_3": archiver.TestFile{Content: string(data[3])}, + }, + }, + } + archiver.TestCreateFiles(t, tempdir, repo) + + return tempdir +} + +func gatherNodesByPath(t *testing.T, repo restic.BlobLoader, root restic.ID) map[string]*data.Node { + t.Helper() + + result := map[string]*data.Node{} + err := walker.Walk(t.Context(), repo, root, walker.WalkVisitor{ + ProcessNode: func(parentTreeID restic.ID, path string, node *data.Node, nodeErr error) (err error) { + if node != nil { + result[path] = node + } + return nodeErr + }, + }) + if err != nil { + t.Fatal(err) + } + + return result +} + +func buildRechunkMapByMatchingPath(t *testing.T, srcNodes, dstNodes map[string]*data.Node) map[restic.ID]restic.IDs { + t.Helper() + + rechunkMap := map[restic.ID]restic.IDs{} + + for k, v := range srcNodes { + if v.Type != data.NodeTypeFile { + continue + } + if _, ok := dstNodes[k]; !ok { + t.Fatalf("%v expected in dstNodes, but not found", k) + } + rechunkMap[HashOfIDs(v.Content)] = dstNodes[k].Content + } + + return rechunkMap +} + +func TestRechunker(t *testing.T) { + // generate reandom polynomials + srcChunkerParam, _ := chunker.RandomPolynomial() + dstChunkerParam, _ := chunker.RandomPolynomial() + + // prepare test data + tempdir := prepareData(t) + + // prepare repositories + srcRepo := TestRepositoryWithPol(t, srcChunkerParam) + dstWantsRepo := TestRepositoryWithPol(t, dstChunkerParam) + dstTestsRepo := TestRepositoryWithPol(t, dstChunkerParam) + + srcSn := archiver.TestSnapshot(t, srcRepo, tempdir, nil) + dstWantsSn := archiver.TestSnapshot(t, dstWantsRepo, tempdir, nil) + + srcNodes := gatherNodesByPath(t, srcRepo, *srcSn.Tree) + dstWantsNodes := gatherNodesByPath(t, dstWantsRepo, *dstWantsSn.Tree) + wantedRechunkMap := buildRechunkMapByMatchingPath(t, srcNodes, dstWantsNodes) + + // run rechunk copy + rechunker := NewRechunker(Config{ + CacheSize: 4 * (1 << 30), + Pol: dstChunkerParam, + }) + + t.Run("Plan running", func(t *testing.T) { + err := rechunker.Plan(t.Context(), srcRepo, restic.IDs{*srcSn.Tree}) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("Rechunk running", func(t *testing.T) { + err := rechunker.Rechunk(t.Context(), srcRepo, dstTestsRepo, nil) + if err != nil { + t.Fatal(err) + } + }) + + var testsTree restic.ID + t.Run("RewriteTrees running", func(t *testing.T) { + newID, err := rechunker.RewriteTrees(t.Context(), srcRepo, dstTestsRepo, restic.IDs{*srcSn.Tree}) + if err != nil { + t.Fatal(err) + } + testsTree = newID[0] + }) + + // compare dstTestsRepo (rechunker result) vs dstWantsRepo (reference result) + // 1) check if all expected data blobs are stored + t.Run("data blob verification", func(t *testing.T) { + inCtx, stop := context.WithCancelCause(t.Context()) + err := dstWantsRepo.ListBlobs(inCtx, func(pb restic.PackedBlob) { + if pb.Type == restic.DataBlob { + _, found := dstTestsRepo.LookupBlobSize(restic.DataBlob, pb.ID) + if !found { + stop(fmt.Errorf("blob %v expected but not found", pb.ID.Str())) + } + } + }) + if err != nil { + t.Error(err) + } + }) + + // 2) check if rechunk is done correctly by comparing rechunkMap + t.Run("rechunk mapping verification", func(t *testing.T) { + testedRechunkMap := rechunker.rechunkMap + for k, v := range wantedRechunkMap { + wants := HashOfIDs(v) + tests := HashOfIDs(testedRechunkMap[k]) + if wants != tests { + t.Errorf("rechunk result for src file %v does not match: %v expected, but got %v", k.Str(), wants.Str(), tests.Str()) + } + } + }) + + // 3) check if tree is rewritten correctly by comparing tree nodes + t.Run("tree verification", func(t *testing.T) { + testsNodes := gatherNodesByPath(t, dstTestsRepo, testsTree) + + // (i) compare Content field with dstWantsNodes + for path, node := range dstWantsNodes { + if node.Type != data.NodeTypeFile { + continue + } + if _, ok := testsNodes[path]; !ok { + t.Errorf("node for path %v does not exist", path) + continue + } + wants := HashOfIDs(node.Content) + tests := HashOfIDs(testsNodes[path].Content) + if wants != tests { + t.Errorf("node content for path %v does not match: %v expected, but got %v", path, wants.Str(), tests.Str()) + } + } + + // (ii) compare remaining fields with srcNodes + for path, wantsNode := range srcNodes { + testsNode, ok := testsNodes[path] + if !ok { + t.Errorf("node for path %v does not exist", path) + continue + } + // copy nodes and clear rewritten fields for comparison + wants, tests := *wantsNode, *testsNode + wants.Content, tests.Content = nil, nil + wants.Subtree, tests.Subtree = nil, nil + if !wants.Equals(tests) { + t.Errorf("node fields for path %v does not match", path) + } + } + }) +} diff --git a/internal/rechunker/scheduler.go b/internal/rechunker/scheduler.go new file mode 100644 index 000000000..6294664b0 --- /dev/null +++ b/internal/rechunker/scheduler.go @@ -0,0 +1,393 @@ +package rechunker + +import ( + "context" + "fmt" + "sync" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" +) + +type Scheduler struct { + mu sync.Mutex + + idx Index + + regularCh <-chan *ChunkedFile + priorityCh <-chan *ChunkedFile + + regularList []*ChunkedFile + priorityList []*ChunkedFile + + prefixLookup map[restic.ID][]*ChunkedFile // blob ID -> files that contain the blob as prefix + remainingPrefixBlobs map[restic.ID]int // file hashval -> remaining count until all its blobs ready + + remainingBlobUsage map[restic.ID]int // blob ID -> remaining blob usage until the end + + ignoreBlobsCB func(ids restic.IDs) + + push chan struct{} + done chan struct{} +} + +func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePriority bool) *Scheduler { + debug.Log(("Running NewScheduler()")) + + wg, ctx := errgroup.WithContext(ctx) + filesContaining, blobsToPrepare, remainingBlobNeeds := createSchedulerState(files) + + if !usePriority { + s := &Scheduler{ + idx: idx, + regularList: files, + done: make(chan struct{}), + prefixLookup: filesContaining, + remainingPrefixBlobs: blobsToPrepare, + remainingBlobUsage: remainingBlobNeeds, + } + s.createRegularCh(ctx, wg, nil) + return s + } + + s := &Scheduler{ + idx: idx, + regularList: files, + push: make(chan struct{}, 1), + done: make(chan struct{}), + prefixLookup: filesContaining, + remainingPrefixBlobs: blobsToPrepare, + remainingBlobUsage: remainingBlobNeeds, + } + + set := restic.IDSet{} + mu := sync.Mutex{} + visited := func(id restic.ID) bool { + mu.Lock() + visited := set.Has(id) + if !visited { + set.Insert(id) + } + mu.Unlock() + return visited + } + + s.createRegularCh(ctx, wg, visited) + s.createPriorityCh(ctx, wg, visited) + + return s +} + +const FILE_HEAD_LENGTH = 25 + +func createSchedulerState(files []*ChunkedFile) (map[restic.ID][]*ChunkedFile, map[restic.ID]int, map[restic.ID]int) { + blobUsage := map[restic.ID]int{} + prefixLookup := map[restic.ID][]*ChunkedFile{} + numPrefixBlobs := map[restic.ID]int{} + + for _, file := range files { + prefixLen := min(FILE_HEAD_LENGTH, len(file.IDs)) + prefixSet := restic.NewIDSet(file.IDs[:prefixLen]...) + numPrefixBlobs[file.hashval] = len(prefixSet) + for _, blob := range file.IDs { + blobUsage[blob]++ + } + for b := range prefixSet { + prefixLookup[b] = append(prefixLookup[b], file) + } + } + + return prefixLookup, numPrefixBlobs, blobUsage +} + +func (s *Scheduler) Next(ctx context.Context) (*ChunkedFile, bool, error) { + file, from, err := PrioritySelect(ctx, s.priorityCh, s.regularCh) + return file, from != 0, err +} + +func (s *Scheduler) NextPriority(ctx context.Context) (*ChunkedFile, bool, error) { + if s.priorityCh == nil { + return nil, false, nil + } + file, from, err := PrioritySelect(ctx, s.priorityCh, nil) + return file, from != 0, err +} + +func (s *Scheduler) pushPriority(files []*ChunkedFile) { + if s.priorityCh == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.priorityList = append(s.priorityList, files...) + + select { + case s.push <- struct{}{}: + default: + } +} + +func (s *Scheduler) popPriority() []*ChunkedFile { + s.mu.Lock() + defer s.mu.Unlock() + + l := s.priorityList + s.priorityList = nil + + return l +} + +func (s *Scheduler) createRegularCh(ctx context.Context, wg *errgroup.Group, visited func(id restic.ID) bool) { + debug.Log("Running scheduler for regular channel") + ch := make(chan *ChunkedFile) + wg.Go(func() error { + defer close(s.done) + defer close(ch) + + for _, file := range s.regularList { + if visited != nil && visited(file.hashval) { + continue + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- file: + debug.Log("Sent file %v through regular channel", file.hashval.Str()) + } + } + + return nil + }) + + s.regularCh = ch +} + +func (s *Scheduler) createPriorityCh(ctx context.Context, wg *errgroup.Group, visited func(id restic.ID) bool) { + debug.Log("Running scheduler for priority channel") + ch := make(chan *ChunkedFile) + wg.Go(func() error { + defer close(ch) + + var list []*ChunkedFile + for { + if len(list) == 0 { + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.push: + list = s.popPriority() + debug.Log("Detected priority files whose count is %v", len(list)) + continue + case <-s.done: + debug.Log("Closing scheduler for priority channel") + return nil + } + } + + file := list[0] + list = list[1:] + + if visited != nil && visited(file.hashval) { + continue + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- file: + debug.Log("Sent file %v through priority channel", file.hashval.Str()) + } + } + }) + + s.priorityCh = ch +} + +func (s *Scheduler) BlobReady(ids restic.IDs) { + // when a new blob is ready, files containing that blob as their prefix + // has their blobsToPrepare decreased by one. + // The list of files whose blobs are all prepared is pushed to priority chan. + + if s.priorityCh == nil { + // if there is no priority chan, it is of no meaning to track the state + return + } + + var readyFiles []*ChunkedFile + + s.mu.Lock() + for _, id := range ids { + for _, file := range s.prefixLookup[id] { + n := s.remainingPrefixBlobs[file.hashval] + if n > 0 { + n-- + if n == 0 { + readyFiles = append(readyFiles, file) + } + s.remainingPrefixBlobs[file.hashval] = n + } + } + } + s.mu.Unlock() + + if len(readyFiles) == 0 { + return + } + + s.pushPriority(readyFiles) + + if debugStats != nil { + dAdds := map[string]int{} + for _, id := range ids { + dAdds["load:"+id.String()]++ + } + debugStats.AddMap(dAdds) + } +} + +func (s *Scheduler) BlobUnready(ids restic.IDs) { + // when a blob is evicted, files containing that blob as their prefix + // has their blobsToPrepare increased by one. However, ignore files + // once they have reached blobsToPrepare value zero; they are no longer tracked. + + if s.priorityCh == nil { + // if there is no priority chan, it is of no meaning to track progress + return + } + + s.mu.Lock() + for _, id := range ids { + filesToUpdate := s.prefixLookup[id] + for _, file := range filesToUpdate { + // files with blobsToPrepare==0 is not tracked + if s.remainingPrefixBlobs[file.hashval] > 0 { + s.remainingPrefixBlobs[file.hashval]++ + } + } + } + s.mu.Unlock() +} + +func (s *Scheduler) SetIgnoreBlobsCallback(cb func(restic.IDs)) { + s.mu.Lock() + defer s.mu.Unlock() + + s.ignoreBlobsCB = cb +} + +func (s *Scheduler) newCursor(blobs restic.IDs) cursor { + if s == nil { + return cursor{} + } + + return cursor{ + blobs: blobs, + blobSize: s.idx.BlobSize, + } +} + +// updateCursor computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage. +func (s *Scheduler) updateCursor(c cursor, bytesProcessed uint) (cursor, error) { + start := c + end, err := c.Advance(bytesProcessed) + if err != nil { + return cursor{}, err + } + + if start.BlobIdx == end.BlobIdx { + return end, nil + } + + blobs := c.blobs[start.BlobIdx:end.BlobIdx] + var obsolete restic.IDs + s.mu.Lock() + for _, b := range blobs { + s.remainingBlobUsage[b]-- + if s.remainingBlobUsage[b] == 0 { + obsolete = append(obsolete, b) + } + } + s.mu.Unlock() + + if len(obsolete) == 0 { + return end, nil + } + + if s.ignoreBlobsCB != nil { + s.ignoreBlobsCB(obsolete) + } + + return end, nil +} + +type cursor struct { + blobs restic.IDs + BlobIdx int + Offset uint + blobSize func(restic.ID) uint +} + +func (c cursor) Advance(numBytes uint) (cursor, error) { + if c.blobs == nil { + return cursor{}, nil + } + + for c.BlobIdx < len(c.blobs) { + blobSize := c.blobSize(c.blobs[c.BlobIdx]) + if blobSize == 0 { + return cursor{}, fmt.Errorf("unknown blob %v", c.blobs[c.BlobIdx].Str()) + } + r := blobSize - c.Offset + + if numBytes < r { + c.Offset += numBytes + numBytes = 0 + break + } + + numBytes -= r + c.BlobIdx++ + c.Offset = 0 + } + + if numBytes != 0 { + return cursor{}, fmt.Errorf("cursor out of range; %d bytes over end position", numBytes) + } + + return c, nil +} + +// PrioritySelect selects from two channels with priority; first channel first. +func PrioritySelect(ctx context.Context, first <-chan *ChunkedFile, second <-chan *ChunkedFile) (item *ChunkedFile, from int, err error) { + // First, try to pull from channel 'first' only. If 'first' is not ready now, try both channels. + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + case i, ok := <-first: + if ok { + item = i + from = 1 + } + default: + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + case i, ok := <-first: + if ok { + item = i + from = 1 + } + case i, ok := <-second: + if ok { + item = i + from = 2 + } + } + } + + return item, from, nil +} diff --git a/internal/rechunker/testing.go b/internal/rechunker/testing.go new file mode 100644 index 000000000..75e7e465b --- /dev/null +++ b/internal/rechunker/testing.go @@ -0,0 +1,30 @@ +package rechunker + +import ( + "context" + "testing" + + "github.com/restic/chunker" + "github.com/restic/restic/internal/repository" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" +) + +func TestRepositoryWithPol(t *testing.T, pol chunker.Pol) restic.Repository { + t.Helper() + + be := repository.TestBackend(t) + + repo, err := repository.New(be, repository.Options{}) + if err != nil { + t.Fatalf("TestRepository(): new repo failed: %v", err) + } + + var version uint = restic.StableRepoVersion + err = repo.Init(context.TODO(), version, test.TestPassword, &pol) + if err != nil { + t.Fatalf("TestRepository(): initialize repo failed: %v", err) + } + + return repo +} diff --git a/internal/rechunker/worker.go b/internal/rechunker/worker.go new file mode 100644 index 000000000..9f433ca62 --- /dev/null +++ b/internal/rechunker/worker.go @@ -0,0 +1,243 @@ +package rechunker + +import ( + "context" + "io" + + "github.com/restic/chunker" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" +) + +type FileResult struct { + dstBlobs restic.IDs + addedToRepository uint64 +} +type Worker struct { + pool *BufferPool + + chunker *chunker.Chunker + pol chunker.Pol + downloader restic.BlobLoader + uploader restic.BlobSaver + + newCursor func(blobs restic.IDs) cursor + updateCursor func(c cursor, numBytes uint) (cursor, error) +} +type WorkerConfig struct { + Pol chunker.Pol + + Downloader restic.BlobLoader + Uploader restic.BlobSaver + BufferPool *BufferPool + + NewCursor func(blobs restic.IDs) cursor + UpdateCursor func(c cursor, numBytes uint) (cursor, error) +} + +func NewWorker(cfg WorkerConfig) *Worker { + if cfg.BufferPool == nil { + cfg.BufferPool = NewBufferPool(3) + } + return &Worker{ + pool: cfg.BufferPool, + + chunker: chunker.New(nil, cfg.Pol), + pol: cfg.Pol, + downloader: cfg.Downloader, + uploader: cfg.Uploader, + + newCursor: cfg.NewCursor, + updateCursor: cfg.UpdateCursor, + } +} + +func (w *Worker) RunFile(ctx context.Context, srcBlobs restic.IDs, p *Progress) (FileResult, error) { + buf := w.pool.Get() + + // setup reader + reader := NewBlobSequenceReader(ctx, srcBlobs, w.downloader, buf) + + // Run worker pipeline (reader and writer) + wg, ctx := errgroup.WithContext(ctx) + + chChunk := make(chan chunker.Chunk) // chunk passing channel from reader to writer + chResult := make(chan FileResult, 1) // file rechunk result channel + + // Run reader goroutine + w.runReader(ctx, wg, srcBlobs, reader, chChunk) + + // Run writer goroutine + w.runWriter(ctx, wg, chChunk, chResult, p) + + if err := wg.Wait(); err != nil { + return FileResult{}, err + } + + result := <-chResult + + w.pool.Put(buf) + + return result, nil +} + +func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs restic.IDs, reader *BlobSequenceReader, out chan<- chunker.Chunk) { + debug.Log("Starting reader goroutine") + wg.Go(func() error { + defer close(out) + + w.chunker.Reset(reader, w.pol) + + var c cursor + if w.newCursor != nil { + c = w.newCursor(srcBlobs) + } + + for { + // bring buffer from bufferPool + buf := w.pool.Get() + + // rechunk with new parameter + chunk, err := w.chunker.Next(buf) + if err == io.EOF { // reached EOF; all done + w.pool.Put(buf) + return nil + } + if err != nil { + return err + } + + if w.updateCursor != nil { + c, err = w.updateCursor(c, chunk.Length) + if err != nil { + return err + } + } + + // send a rechunked chunk to the writer + select { + case <-ctx.Done(): + return ctx.Err() + case out <- chunk: + debug.Log("Sending a new chunk of size %v to writer", chunk.Length) + } + } + }) +} + +func (w *Worker) runWriter(ctx context.Context, wg *errgroup.Group, in <-chan chunker.Chunk, out chan<- FileResult, p *Progress) { + debug.Log("Starting writer goroutine") + wg.Go(func() error { + defer close(out) + + dstBlobs := restic.IDs{} + var addedSize uint64 + + for { + // receive chunk from the reader + var c chunker.Chunk + var ok bool + select { + case <-ctx.Done(): + return ctx.Err() + case c, ok = <-in: + if !ok { // EOF + out <- FileResult{ + dstBlobs: dstBlobs, + addedToRepository: addedSize, + } + return nil + } + } + + // save chunk to destination repo + dstBlobID, known, size, err := w.uploader.SaveBlob(ctx, restic.DataBlob, c.Data, restic.ID{}, false) + if err != nil { + return err + } + if !known { + addedSize += uint64(size) + debug.Log("Stored new dst chunk %v into dstRepo", dstBlobID.Str()) + } + + if p != nil { + p.AddBlob(uint64(c.Length)) + } + + // recycle used buffer into bufferPool + w.pool.Put(c.Data) + + dstBlobs = append(dstBlobs, dstBlobID) + } + }) +} + +type BlobSequenceReader struct { + ctx context.Context + downloader restic.BlobLoader + + blobs restic.IDs + + data []byte // data of the current blob being read + buf []byte // reused buffer space +} + +func NewBlobSequenceReader(ctx context.Context, blobs restic.IDs, downloader restic.BlobLoader, buf []byte) *BlobSequenceReader { + return &BlobSequenceReader{ + ctx: ctx, + blobs: blobs, + downloader: downloader, + buf: buf, + } +} + +func (r *BlobSequenceReader) Read(p []byte) (n int, err error) { + if len(r.data) == 0 { + // out of data; load the next blob + if len(r.blobs) == 0 { + return 0, io.EOF + } + + // bring the blob data from backend + r.data, err = r.downloader.LoadBlob(r.ctx, restic.DataBlob, r.blobs[0], r.buf) + if err != nil { + return 0, err + } + + r.blobs = r.blobs[1:] + } + + // copy data from currentBuf to p + n = copy(p, r.data) + r.data = r.data[n:] + return n, nil +} + +type BufferPool struct { + c chan []byte +} + +func NewBufferPool(cap int) *BufferPool { + return &BufferPool{ + c: make(chan []byte, cap), + } +} + +func (p *BufferPool) Get() []byte { + select { + case buf := <-p.c: + return buf[:0] + default: + debug.Log("Allocating new buffer") + return make([]byte, 0, chunker.MaxSize) + } +} + +func (p *BufferPool) Put(buf []byte) { + select { + case p.c <- buf: + default: + debug.Log("bufferPool is full; discarding the buffer") + } +}