This commit is contained in:
Donggyu Kim 2026-05-20 21:52:17 +01:00 committed by GitHub
commit d575d62b6e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 2060 additions and 6 deletions

View file

@ -0,0 +1,26 @@
Enhancement: Add rechunk copy feature
Restic didn't rechunk data blobs when copying snapshots between repositories
with different chunker parameters. Instead, it copied the blobs as-is,
which impaired deduplication powered by [Content Defined Chunking](https://restic.net/blog/2015-09-12/restic-foundation1-cdc/).
To work around this issue, users had to manually restore the snapshots somewhere
and then back them up again to the new repository. This workaround was
inefficient, cumbersome, and prone to altering the original metadata.
The `copy` command now supports a `--rechunk` option, which rechunks the data while copying.
Currently, `copy --rechunk` does not automatically skip previously copied snapshots. Also,
it does not remember which files were rechunked in previous runs, so it will
process every file again in the next run (although it will not add new
data blobs to the repository in that case; this is what deduplication is for).
Therefore, `copy --rechunk` is currently best suited for one-time migration between repositories.
For incremental copy scenarios, a plain `copy` between repositories with the same chunker
parameters is ideal.
`copy --rechunk` has a few additional options. The `--force` option forces a rechunk copy
even when the chunker parameters of the source and destination repositories are the same,
and the `--cache-size` option specifies the size of the in-memory blob cache used during the rechunk copy.
The `--add-tag` option adds tags to the copied snapshots in the destination repository.
https://github.com/restic/restic/issues/5473
https://forum.restic.net/t/is-it-possible-to-re-chunk-after-a-restic-copy/6072
https://forum.restic.net/t/copy-snapshots-between-repositories-without-copy-chunker-params/7320

View file

@ -10,7 +10,9 @@ import (
"github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/feature"
"github.com/restic/restic/internal/global"
"github.com/restic/restic/internal/rechunker"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui"
@ -65,11 +67,28 @@ Exit status is 12 if the password is incorrect.
// CopyOptions bundles the options of the copy command: the secondary
// repository options, the snapshot filter, and the rechunk-copy options.
type CopyOptions struct {
global.SecondaryRepoOptions
data.SnapshotFilter
RechunkCopyOptions
}
// AddFlags registers every flag of the copy command on f: the secondary
// repository flags, the snapshot filter flags, and the rechunk-copy flags.
func (opts *CopyOptions) AddFlags(f *pflag.FlagSet) {
	secondary := &opts.SecondaryRepoOptions
	secondary.AddFlags(f, "destination", "to copy snapshots from")

	filter := &opts.SnapshotFilter
	initMultiSnapshotFilter(f, filter, true)

	rechunk := &opts.RechunkCopyOptions
	rechunk.AddFlags(f)
}
// RechunkCopyOptions holds the options that only apply to `copy --rechunk`.
type RechunkCopyOptions struct {
Rechunk bool // set by --rechunk: rechunk files while copying
ForceRechunk bool // set by --force: rechunk even if both repositories use the same chunker polynomial
AddTags data.TagLists // set by --add-tag: tags added to the copied snapshots
CacheSize int // set by --cache-size: in-memory blob cache size in MiB (0 disables the cache)
isIntegrationTest bool // skip check for RESTIC_FEATURES=rechunk-copy during integration test
}
// AddFlags registers the rechunk-copy flags (--rechunk, --force, --add-tag
// and --cache-size) on f.
func (opts *RechunkCopyOptions) AddFlags(f *pflag.FlagSet) {
	// default size of the in-memory blob cache, in MiB
	const defaultCacheSize = 4096

	// switches
	f.BoolVar(&opts.Rechunk, "rechunk", false,
		"rechunk files when copying")
	f.BoolVar(&opts.ForceRechunk, "force", false,
		"force rechunk even when src and dst repo have same chunker polynomials; to be used with --rechunk")

	// tuning and tagging
	f.Var(&opts.AddTags, "add-tag",
		"add `tags` for the copied snapshots in the format `tag[,tag,...]` (can be specified multiple times). Used with --rechunk")
	f.IntVar(&opts.CacheSize, "cache-size", defaultCacheSize,
		"for rechunk copy, specify in-memory blob cache size in MiBs (0 to disable cache). Used with --rechunk")
}
// collectAllSnapshots: select all snapshot trees to be copied
@ -106,6 +125,17 @@ func collectAllSnapshots(ctx context.Context, opts CopyOptions,
}
func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args []string, term ui.Terminal) error {
// Rechunk-copy guardrails
if opts.Rechunk {
debug.Log("Rechunk option enabled")
if !feature.Flag.Enabled(feature.RechunkCopy) && !opts.isIntegrationTest {
return errors.Fatal("rechunk-copy feature flag is not set. Currently, rechunk-copy is alpha feature (disabled by default).")
}
if opts.CacheSize != 0 && opts.CacheSize < 100 {
return errors.Fatal("blob cache size must be at least 100 MiB")
}
}
printer := ui.NewProgressPrinter(false, gopts.Verbosity, term)
secondaryGopts, isFromRepo, err := opts.SecondaryRepoOptions.FillGlobalOpts(ctx, gopts, "destination")
if err != nil {
@ -128,6 +158,11 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [
}
defer unlock()
// if rechunk is enabled, ensure srcRepo and dstRepo have different ChunkerPolynomials
if opts.Rechunk && !opts.ForceRechunk && srcRepo.Config().ChunkerPolynomial == dstRepo.Config().ChunkerPolynomial {
return errors.Fatal("source repo and destination repo have same chunker polynomials; run without `--rechunk`, or set `--force` flag to proceed with rechunk anyway")
}
srcSnapshotLister, err := restic.MemorizeList(ctx, srcRepo, restic.SnapshotFile)
if err != nil {
return err
@ -161,8 +196,23 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [
selectedSnapshots := collectAllSnapshots(ctx, opts, srcSnapshotLister, srcRepo, dstSnapshotByOriginal, args, printer)
if err := copyTreeBatched(ctx, srcRepo, dstRepo, selectedSnapshots, printer); err != nil {
return err
if !opts.Rechunk {
if err := copyTreeBatched(ctx, srcRepo, dstRepo, selectedSnapshots, printer); err != nil {
return err
}
} else {
rechnker := rechunker.NewRechunker(rechunker.Config{
CacheSize: opts.CacheSize * (1 << 20),
Pol: dstRepo.Config().ChunkerPolynomial,
})
progress := rechunker.NewProgress(
term,
printer,
ui.CalculateProgressInterval(!gopts.Quiet, gopts.JSON, term.CanUpdateStatus()),
)
if err := rechunkCopy(ctx, srcRepo, dstRepo, selectedSnapshots, rechnker, printer, progress, opts.AddTags.Flatten()); err != nil {
return err
}
}
return ctx.Err()
@ -342,3 +392,62 @@ func copySaveSnapshot(ctx context.Context, sn *data.Snapshot, dstRepo restic.Rep
printer.P("snapshot %s saved, copied from source snapshot %s", newID.Str(), sn.ID().Str())
return nil
}
// rechunkCopy copies the selected snapshots from srcRepo to dstRepo while
// rechunking the file contents with rechnker.
//
// It runs in four phases: Plan (scan the files to process), Rechunk (write
// the rechunked data to dstRepo), RewriteTrees (rewrite the trees for
// dstRepo), and finally saving one snapshot per source snapshot, with tags
// added to each of them.
//
// The progress reporter is stopped exactly once on every path that reaches or
// fails before the end of the Rechunk phase, so that the periodic status
// update does not keep running after an error.
func rechunkCopy(ctx context.Context, srcRepo, dstRepo restic.Repository, selectedSnapshots iter.Seq[*data.Snapshot],
	rechnker *rechunker.Rechunker, printer progress.Printer, progress *rechunker.Progress, tags data.TagList) error {

	printer.V("Gathering snapshots...")
	var snapshots []*data.Snapshot
	var rootTrees restic.IDs
	debug.Log("Gathering root trees from selectedSnapshots()")
	for sn := range selectedSnapshots {
		snapshots = append(snapshots, sn)
		rootTrees = append(rootTrees, *sn.Tree)
	}

	printer.V("Scanning files to process... ")
	debug.Log("Running Plan()")
	if err := rechnker.Plan(ctx, srcRepo, rootTrees); err != nil {
		// stop the progress updater; nothing has been shown yet
		progress.Done()
		return err
	}

	printer.V("\n[Pre-run Summary]")
	// num_snapshots, num_distinct_files, total_size, num_packs,
	printer.V("Number of snapshots: %v", len(rootTrees))
	printer.V("Number of distinct files to process: %v", rechnker.NumFiles())
	printer.V(" - Total size (including duplicate blobs): %v", ui.FormatBytes(rechnker.TotalSize()))
	printer.V("Number of packs to download: %v\n\n", rechnker.NumPacks())

	debug.Log("Running Rechunk()")
	progress.Start(rechnker.NumFiles(), rechnker.TotalSize())
	err := rechnker.Rechunk(ctx, srcRepo, dstRepo, progress)
	// always finish progress reporting, even if Rechunk failed; otherwise the
	// updater is never stopped and the status lines are never cleared
	progress.Done()
	if err != nil {
		return err
	}

	printer.V("Rewriting trees...")
	debug.Log("Running RewriteTrees()")
	if _, err = rechnker.RewriteTrees(ctx, srcRepo, dstRepo, rootTrees); err != nil {
		return err
	}

	printer.V("Writing snapshots...")
	for _, sn := range snapshots {
		newTreeID, err := rechnker.GetRewrittenTree(*sn.Tree)
		if err != nil {
			return err
		}
		// the snapshot is modified in place before it is saved to dstRepo
		sn.Tree = &newTreeID
		sn.AddTags(tags)
		if err = copySaveSnapshot(ctx, sn, dstRepo, printer); err != nil {
			return err
		}
	}

	printer.P("Additional data stored to the repository: %v", ui.FormatBytes(rechnker.TotalAddedToDstRepo()))
	return nil
}

View file

@ -30,11 +30,35 @@ func testRunCopy(t testing.TB, srcGopts global.Options, dstGopts global.Options)
}))
}
// testRunRechunkCopy runs `copy --rechunk` from the repository described by
// srcGopts into the repository described by dstGopts and fails the test if
// the command returns an error.
//
// The destination repository becomes the primary repository of the command
// and the source repository is passed as the secondary one. The feature flag
// check is bypassed via isIntegrationTest.
func testRunRechunkCopy(t testing.TB, srcGopts global.Options, dstGopts global.Options) {
	gopts := srcGopts
	gopts.Repo = dstGopts.Repo
	gopts.Password = dstGopts.Password
	gopts.InsecureNoPassword = dstGopts.InsecureNoPassword

	copyOpts := CopyOptions{
		SecondaryRepoOptions: global.SecondaryRepoOptions{
			Repo:               srcGopts.Repo,
			Password:           srcGopts.Password,
			InsecureNoPassword: srcGopts.InsecureNoPassword,
		},
		RechunkCopyOptions: RechunkCopyOptions{
			Rechunk:           true,
			isIntegrationTest: true,
		},
	}

	rtest.OK(t, withTermStatus(t, gopts, func(ctx context.Context, gopts global.Options) error {
		// use the context handed in by withTermStatus instead of a detached
		// context.TODO(), so the command shares the helper's context
		return runCopy(ctx, copyOpts, gopts, nil, gopts.Term)
	}))
}
func TestCopy(t *testing.T) {
env, cleanup := withTestEnvironment(t)
defer cleanup()
env2, cleanup2 := withTestEnvironment(t)
defer cleanup2()
env3, cleanup3 := withTestEnvironment(t) // test env for rechunk-copy
defer cleanup3()
testSetupBackupData(t, env)
opts := BackupOptions{}
@ -46,8 +70,12 @@ func TestCopy(t *testing.T) {
testRunInit(t, env2.gopts)
testRunCopy(t, env.gopts, env2.gopts)
testRunInit(t, env3.gopts)
testRunRechunkCopy(t, env.gopts, env3.gopts)
snapshotIDs := testListSnapshots(t, env.gopts, 3)
copiedSnapshotIDs := testListSnapshots(t, env2.gopts, 3)
rechunkCopiedSnapshotIDs := testListSnapshots(t, env3.gopts, 3)
// Check that the copies size seems reasonable
stat := dirStats(t, env.repo)
@ -61,12 +89,15 @@ func TestCopy(t *testing.T) {
// Check integrity of the copy
testRunCheck(t, env2.gopts)
testRunCheck(t, env3.gopts)
// Check that the copied snapshots have the same tree contents as the old ones (= identical tree hash)
origRestores := make(map[string]struct{})
origRestores2 := make(map[string]struct{})
for i, snapshotID := range snapshotIDs {
restoredir := filepath.Join(env.base, fmt.Sprintf("restore%d", i))
origRestores[restoredir] = struct{}{}
origRestores2[restoredir] = struct{}{}
testRunRestore(t, env.gopts, restoredir, snapshotID.String())
}
for i, snapshotID := range copiedSnapshotIDs {
@ -84,7 +115,24 @@ func TestCopy(t *testing.T) {
rtest.Assert(t, foundMatch, "found no counterpart for snapshot %v", snapshotID)
}
// Check that the rechunk-copied snapshots have the same tree contents as the old ones
for i, snapshotID := range rechunkCopiedSnapshotIDs {
restoredir := filepath.Join(env3.base, fmt.Sprintf("restore%d", i))
testRunRestore(t, env3.gopts, restoredir, snapshotID.String())
foundMatch := false
for cmpdir := range origRestores2 {
diff := directoriesContentsDiff(t, restoredir, cmpdir)
if diff == "" {
delete(origRestores2, cmpdir)
foundMatch = true
}
}
rtest.Assert(t, foundMatch, "found no counterpart for snapshot %v", snapshotID)
}
rtest.Assert(t, len(origRestores) == 0, "found not copied snapshots")
rtest.Assert(t, len(origRestores2) == 0, "found not rechunk-copied snapshots")
// check that snapshots were properly batched while copying
_, _, countBlobs := testPackAndBlobCounts(t, env.gopts)
@ -166,6 +214,8 @@ func TestCopyUnstableJSON(t *testing.T) {
defer cleanup()
env2, cleanup2 := withTestEnvironment(t)
defer cleanup2()
env3, cleanup3 := withTestEnvironment(t) // test env for rechunk-copy
defer cleanup3()
// contains a symlink created using `ln -s '../i/'$'\355\246\361''d/samba' broken-symlink`
datafile := filepath.Join("testdata", "copy-unstable-json.tar.gz")
@ -175,6 +225,11 @@ func TestCopyUnstableJSON(t *testing.T) {
testRunCopy(t, env.gopts, env2.gopts)
testRunCheck(t, env2.gopts)
testListSnapshots(t, env2.gopts, 1)
testRunInit(t, env3.gopts)
testRunRechunkCopy(t, env.gopts, env3.gopts)
testRunCheck(t, env3.gopts)
testListSnapshots(t, env3.gopts, 1)
}
func TestCopyToEmptyPassword(t *testing.T) {
@ -184,14 +239,22 @@ func TestCopyToEmptyPassword(t *testing.T) {
defer cleanup2()
env2.gopts.Password = ""
env2.gopts.InsecureNoPassword = true
env3, cleanup3 := withTestEnvironment(t) // test env for rechunk-copy
defer cleanup3()
env3.gopts.Password = ""
env3.gopts.InsecureNoPassword = true
testSetupBackupData(t, env)
testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9")}, BackupOptions{}, env.gopts)
testRunInit(t, env2.gopts)
testRunCopy(t, env.gopts, env2.gopts)
testRunInit(t, env3.gopts)
testRunRechunkCopy(t, env.gopts, env3.gopts)
testListSnapshots(t, env.gopts, 1)
testListSnapshots(t, env2.gopts, 1)
testListSnapshots(t, env3.gopts, 1)
testRunCheck(t, env2.gopts)
testRunCheck(t, env3.gopts)
}

View file

@ -240,7 +240,7 @@ messages can appear some time after the snapshot content was copied.
source and destination repository. This *may incur higher bandwidth usage
and costs* than expected during normal backup runs.
.. important:: The copying process does not re-chunk files, which may break
.. important:: The plain copy command does not re-chunk files, which may break
deduplication between the files copied and files already stored in the
destination repository. This means that copied files, which existed in
both the source and destination repository, *may occupy up to twice their
@ -292,9 +292,11 @@ Ensuring deduplication for copied snapshots
-------------------------------------------
Even though the copy command can transfer snapshots between arbitrary repositories,
deduplication between snapshots from the source and destination repository may not work.
To ensure proper deduplication, both repositories have to use the same parameters for
splitting large files into smaller chunks, which requires additional setup steps. With
deduplication between snapshots from the source and destination repository may not work
with the plain copy command. There are two ways to ensure proper deduplication between
repositories. The first is to use the ``--rechunk`` option described below. The second is
to make both repositories use the same parameters for splitting large files into smaller
chunks, which requires additional setup steps. With
the same parameters restic will for both repositories split identical files into
identical chunks and therefore deduplication also works for snapshots copied between
these repositories.
@ -309,6 +311,36 @@ using the same chunker parameters as the source repository:
Note that it is not possible to change the chunker parameters of an existing repository.
Rechunk copy between repositories with different chunker parameters
-------------------------------------------------------------------
The ``copy --rechunk`` command re-chunks files using the destination repository's chunker parameters
when copying snapshots. A few command-line options can be used together with ``--rechunk``.
The first is ``--force``, which forces re-chunking even when the chunker parameters of
the source and destination repositories are the same.
The second is ``--cache-size``. The rechunk copy uses an in-memory cache for
re-chunking, whose default size is 4096 MiB. You can adjust the cache size
to match your system's RAM and desired memory usage. Note that a small cache size leads to
frequent re-downloads of packs, which is especially undesirable for remote source repositories.
The third is ``--add-tag``, which adds tags to the copied snapshots in the destination repository.
All of the following commands are valid.
.. code-block:: console
$ restic -r /srv/dst-repo copy --rechunk --from-repo /srv/src-repo
$ restic -r /srv/dst-repo copy --rechunk --from-repo /srv/src-repo --host luigi --path /srv/data --tag foo,bar
$ restic -r /srv/dst-repo copy --rechunk --add-tag my-rechunk --from-repo /srv/src-repo 34c9e85f 2714b65a
$ restic -r /srv/dst-repo copy --rechunk --cache-size 8192 --from-repo /srv/src-repo # set cache size to 8192 MiB
.. note:: Although the ``copy --rechunk`` command can provide on-demand deduplication between
repositories with different chunker parameters, it has a few disadvantages compared
to the plain copy. The rechunk copy is slower because it re-assembles
all files and performs the same computations that are done during a backup. Also, as of now,
the rechunk copy does not skip snapshots that were already copied, so you should
manually specify the exact snapshots to copy. Therefore, it is recommended to use
repositories with the same chunker parameters if you plan to copy regularly between your repositories.
Removing files from snapshots
=============================

View file

@ -10,6 +10,7 @@ const (
DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout"
DeviceIDForHardlinks FlagName = "device-id-for-hardlinks"
ExplicitS3AnonymousAuth FlagName = "explicit-s3-anonymous-auth"
RechunkCopy FlagName = "rechunk-copy"
SafeForgetKeepTags FlagName = "safe-forget-keep-tags"
S3Restore FlagName = "s3-restore"
)
@ -21,6 +22,7 @@ func init() {
DeprecateS3LegacyLayout: {Type: Stable, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use restic 0.17.3 to migrate if necessary."},
DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"},
ExplicitS3AnonymousAuth: {Type: Stable, Description: "forbid anonymous S3 authentication unless `-o s3.unsafe-anonymous-auth=true` is set"},
RechunkCopy: {Type: Alpha, Description: "enable rechunk-copy command, where it rechunks data blobs while copying snapshots. This command is not stable yet, so use with caution."},
SafeForgetKeepTags: {Type: Stable, Description: "prevent deleting all snapshots if the tag passed to `forget --keep-tags tagname` does not exist"},
S3Restore: {Type: Alpha, Description: "restore S3 objects from cold storage classes when `-o s3.enable-restore=true` is set"},
})

View file

@ -0,0 +1,344 @@
package rechunker
import (
"context"
"fmt"
"sync"
"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
// BlobCache is an in-memory cache of blobs keyed by blob ID. Blobs are
// fetched pack-wise by background downloaders and evicted oldest-first when
// the accounted size budget is exhausted.
type BlobCache struct {
mu sync.RWMutex // guards c, free, waitList, inProgress and ignored
c *simplelru.LRU[restic.ID, []byte] // cached blob data by blob ID
idx Index // blob <-> pack lookup used to decide what to download
free, size int // remaining and total budget; each blob is charged cap(data)+overhead
waitList restic.IDSet // set of packs waiting for download
inProgress map[restic.ID]chan struct{} // blob ready event channel; open by requestDownload(), closed by downloaders
downloadCh chan restic.ID // pack download request channel; produced by requestDownload(), consumed by downloaders
ignored restic.IDSet // set of ignored blobs; blobs in this set are excluded from download
cancel func() // this function is called at Close(), cancelling cache context
}
// overhead is the fixed per-blob charge added on top of the blob's capacity
// when accounting cache usage: the size of a blob ID plus 64 bytes.
// NOTE(review): the rationale for the 64-byte constant is not visible here.
const overhead = len(restic.ID{}) + 64
// NewBlobCache creates a blob cache with the given size budget and starts
// numDownloaders background downloaders that load packs from repo.
//
// onReady and onEvict, if non-nil, are called by the downloaders with the IDs
// of blobs that became available or were evicted. The cache context is
// derived from ctx and cancelled by Close. NewBlobCache panics if size is
// below 32 MiB or if the underlying LRU cannot be created.
func NewBlobCache(ctx context.Context, size int, numDownloaders int,
	repo PackLoader, idx Index,
	onReady func(blobIDs restic.IDs), onEvict func(blobIDs restic.IDs)) *BlobCache {
	const minSize = 32 * (1 << 20)
	if size < minSize {
		panic("Blob cache size should be at least 32 MiB!!")
	}
	debug.Log("Creating blob cache of size %v", size)

	cacheCtx, cancel := context.WithCancel(ctx)
	cache := &BlobCache{
		idx:        idx,
		size:       size,
		free:       size,
		waitList:   restic.NewIDSet(),
		inProgress: make(map[restic.ID]chan struct{}),
		downloadCh: make(chan restic.ID),
		ignored:    restic.NewIDSet(),
		cancel:     cancel,
	}

	// every removal from the LRU gives the blob's accounted size back
	store, err := simplelru.NewLRU(size, func(_ restic.ID, v []byte) {
		cache.free += cap(v) + overhead
	})
	if err != nil {
		panic(err)
	}
	cache.c = store

	// the downloaders fetch blobs through repo's LoadBlobsFromPack
	cache.startDownloaders(cacheCtx, numDownloaders, createDownloadFn(cacheCtx, repo), onReady, onEvict)
	return cache
}
type (
	// blobMap maps blob IDs to their data.
	blobMap = map[restic.ID][]byte
	// downloadFn loads the given blobs of one pack and returns their data.
	downloadFn func(packID restic.ID, blobs []restic.Blob) (blobMap, error)
)

// createDownloadFn returns a downloadFn that loads blobs via
// repo.LoadBlobsFromPack. Each blob is copied into a freshly allocated buffer
// of exactly its length before it is stored in the result.
func createDownloadFn(ctx context.Context, repo PackLoader) downloadFn {
	return func(packID restic.ID, blobs []restic.Blob) (blobMap, error) {
		loaded := blobMap{}

		collect := func(handle restic.BlobHandle, buf []byte, err error) error {
			if err != nil {
				return err
			}
			// buf is copied rather than retained
			data := make([]byte, len(buf))
			copy(data, buf)
			loaded[handle.ID] = data
			return nil
		}

		if err := repo.LoadBlobsFromPack(ctx, packID, blobs, collect); err != nil {
			return blobMap{}, err
		}
		return loaded, nil
	}
}
// startDownloaders starts numDownloaders goroutines that serve pack download
// requests received on c.downloadCh until ctx is cancelled or one of them
// fails.
//
// For each requested pack a downloader loads all blobs of the pack that are
// neither ignored nor already cached, stores them in the cache (evicting the
// oldest entries if the budget is exhausted), removes the pack from the wait
// list and wakes the goroutines waiting for blobs of that pack. onEvict and
// onReady, if non-nil, are called outside the cache lock.
//
// The pack is removed from the wait list and its waiters are woken even when
// nothing had to be downloaded, and also for blobs of the pack that were not
// part of this download. A woken waiter re-checks the cache and re-requests
// the pack if its blob is (no longer) present; without this, a pack could
// stay in the wait list forever and later requests for it would never be
// served.
//
// NOTE(review): the errgroup is never waited on, so an error returned by a
// downloader only cancels the other downloaders; it is not reported to
// callers waiting for a blob. Waiters for ignored blobs are deliberately not
// woken, since they would only re-request a blob that is never downloaded.
func (c *BlobCache) startDownloaders(ctx context.Context, numDownloaders int,
	download downloadFn, onReady, onEvict func(blobIDs restic.IDs)) {

	// wake signals everybody waiting for a non-ignored blob of the pack.
	// It must be called with c.mu held for writing.
	wake := func(packBlobs []restic.Blob) {
		for _, blob := range packBlobs {
			if c.ignored.Has(blob.ID) {
				continue
			}
			if ch, ok := c.inProgress[blob.ID]; ok {
				close(ch)
				delete(c.inProgress, blob.ID)
			}
		}
	}

	wg, ctx := errgroup.WithContext(ctx)

	for range numDownloaders {
		wg.Go(func() error {
			debug.Log("Starting blob cache downloader")
			defer debug.Log("Stopping blob cache downloader")

			for {
				// listen to pack download request
				var packID restic.ID
				select {
				case <-ctx.Done():
					return ctx.Err()
				case packID = <-c.downloadCh:
				}

				packBlobs := c.idx.PackToBlobs(packID)

				// filter out ignored blobs and blobs that are already cached
				c.mu.RLock()
				var filtered []restic.Blob
				for _, blob := range packBlobs {
					if !c.ignored.Has(blob.ID) && !c.c.Contains(blob.ID) {
						filtered = append(filtered, blob)
					}
				}
				c.mu.RUnlock()

				// nothing to download: still complete the request, so the
				// pack does not stay in the wait list and waiters re-check
				if len(filtered) == 0 {
					c.mu.Lock()
					delete(c.waitList, packID)
					wake(packBlobs)
					c.mu.Unlock()
					continue
				}

				// download blobs from the repo
				debug.Log("Starting download of %v blobs in pack %v", len(filtered), packID.Str())
				blobs, err := download(packID, filtered)
				if err != nil {
					debug.Log("Download of pack %v failed: %v", packID.Str(), err)
					return err
				}

				// pop the pack from the waitlist,
				// store downloaded blobs to the cache,
				// and notify that blobs are ready
				var ready, evicted restic.IDs
				c.mu.Lock()
				delete(c.waitList, packID)
				for id, data := range blobs {
					size := cap(data) + overhead
					for size > c.free { // evict old blobs if there is not enough free space
						oldest, _, ok := c.c.RemoveOldest()
						if !ok {
							err := fmt.Errorf("not enough cache size to store a blob; needs at least %d bytes, but has only %d bytes", size, c.free)
							wake(packBlobs)
							c.mu.Unlock()
							return err
						}
						evicted = append(evicted, oldest)
					}
					c.c.Add(id, data)
					c.free -= size
					ready = append(ready, id)
				}
				wake(packBlobs)
				currentCacheUsage := c.size - c.free // for debug logging
				c.mu.Unlock()

				// execute callbacks
				if len(evicted) > 0 {
					if onEvict != nil {
						onEvict(evicted)
					}
					debug.Log("%v blobs are evicted.", len(evicted))
				}
				if onReady != nil {
					onReady(ready)
				}

				debug.Log("Pack %v loaded. Current cache usage: %v", packID.Str(), currentCacheUsage)
				debug.Log("Pack %v includes the following blobs: \n%v", packID.Str(), ready.String())

				// debugStats: track maximum memory usage
				if debugStats != nil {
					debugStats.UpdateMax("max_cache_usage", currentCacheUsage)
				}
			}
		})
	}
}
// Get looks up blob id in the cache.
//
// On a hit the blob is copied into buf (a new buffer is allocated if buf is
// too small), the entry's recency is updated, and the copy is returned with a
// nil channel. On a miss Get returns a nil slice and a channel on which the
// blob is delivered once it has been downloaded.
func (c *BlobCache) Get(ctx context.Context, id restic.ID, buf []byte) ([]byte, <-chan []byte) {
	c.mu.Lock()
	cached, hit := c.c.Get(id) // lookup with recency update
	c.mu.Unlock()

	if !hit {
		// miss: hand out the channel the downloaded blob will be delivered on
		debug.Log("Cache miss. Requesting async get for blob %v", id.Str())
		return nil, c.asyncGet(ctx, id, buf)
	}

	// hit: return a copy of the cached blob immediately
	if len(cached) > cap(buf) {
		debug.Log("Allocating new buf, as it has smaller capacity than chunk size.")
		buf = make([]byte, len(cached))
	} else {
		buf = buf[:len(cached)]
	}
	copy(buf, cached)
	debug.Log("Cache hit. Returning blob %v", id.Str())
	return buf, nil
}
// asyncGet starts a goroutine that waits until blob id is available in the
// cache, requesting a download of its pack if necessary, and then sends a
// copy of the blob (written into buf if it is large enough) on the returned
// channel. The channel is buffered with capacity 1.
//
// NOTE(review): the errgroup is never waited on in this function, so an error
// from requestDownload or a cancelled ctx ends the goroutine without sending
// anything on the channel; receivers have to watch ctx themselves.
func (c *BlobCache) asyncGet(ctx context.Context, id restic.ID, buf []byte) <-chan []byte {
wg, ctx := errgroup.WithContext(ctx)
out := make(chan []byte, 1)
wg.Go(func() error {
for {
// Peek does not update recency; both lookups happen under one read lock
c.mu.RLock()
blob, ready := c.c.Peek(id)
finish, inProgress := c.inProgress[id]
c.mu.RUnlock()
if ready { // case A: blob is now ready in the cache
if cap(buf) < len(blob) {
debug.Log("Allocating new buf, as it has smaller capacity than chunk size.")
buf = make([]byte, len(blob))
} else {
buf = buf[:len(blob)]
}
copy(buf, blob)
debug.Log("Blob %v is now ready in the cache. Passing blob data to channel.", id.Str())
out <- buf
return nil
}
if inProgress { // case B: blob is queued, but not yet ready
debug.Log("Waiting for blob %v to be ready in the cache.", id.Str())
select {
case <-ctx.Done():
return ctx.Err()
case <-finish: // wait until download complete
continue
}
}
// case C: blob is not queued
// add to the download queue
debug.Log("Requesting download of the pack containing blob %v", id.Str())
err := c.requestDownload(ctx, id)
if err != nil {
return err
}
// loop again: the next iteration either finds the blob or waits for it
}
})
return out
}
// requestDownload asks the downloaders to load the pack that contains blob id
// and registers a "blob ready" channel for the blob in c.inProgress.
//
// It returns an error if the blob is not known to the index, or ctx.Err() if
// ctx is cancelled before a downloader accepts the request. If the pack is
// already on the wait list, only the ready channel is registered; whoever put
// the pack on the list sends the request.
//
// The cache is re-checked under the lock: the blob may have been stored
// between the caller's lookup and this call. Queueing the pack in that case
// would request a download with nothing to load and leave a ready channel
// behind that nobody closes, which blocks later requests for the same blob.
func (c *BlobCache) requestDownload(ctx context.Context, id restic.ID) error {
	packID := c.idx.BlobToPack(id)
	if packID.IsNull() {
		return fmt.Errorf("unknown blob: %v", id.Str())
	}

	c.mu.Lock()
	if c.c.Contains(id) {
		// the blob arrived in the meantime; the caller finds it on its next lookup
		c.mu.Unlock()
		return nil
	}
	alreadyQueued := c.waitList.Has(packID)
	if !alreadyQueued {
		// queue pack download
		c.waitList.Insert(packID)
	}
	if _, inProgress := c.inProgress[id]; !inProgress {
		c.inProgress[id] = make(chan struct{})
	}
	c.mu.Unlock()

	if alreadyQueued { // somebody else has already queued pack download; it will handle download request
		return nil
	}

	// send packID to inform the downloader
	select {
	case <-ctx.Done():
		return ctx.Err()
	case c.downloadCh <- packID:
		return nil
	}
}
// Ignore marks the given blobs as ignored, which excludes them from future
// downloads, and removes them from the cache if they are present.
func (c *BlobCache) Ignore(ids restic.IDs) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for i := range ids {
		blobID := ids[i]
		c.ignored.Insert(blobID)
		c.c.Remove(blobID) // the result is irrelevant: the blob may not be cached
	}

	// debugStats: count ignored blobs
	if debugStats != nil {
		debugStats.Add("ignored_blob_count", len(ids))
	}
}
// Close cancels the cache context. It is a no-op on a nil cache.
func (c *BlobCache) Close() {
	if c != nil {
		c.cancel()
	}
}
// BlobLoaderWithCache loads blobs through a BlobCache.
type BlobLoaderWithCache struct {
	repo  PackLoader
	cache *BlobCache
}

// LoadBlob returns the data of blob id, using buf as the destination buffer
// if it is large enough. On a cache miss it waits until the blob has been
// downloaded or ctx is cancelled, in which case ctx.Err() is returned.
//
// Hit and miss are told apart by the channel returned from the cache, not by
// the returned slice: a cache hit on an empty blob with a nil buf yields a
// nil slice, which must not be mistaken for a miss.
func (l *BlobLoaderWithCache) LoadBlob(ctx context.Context, _ restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
	blob, ch := l.cache.Get(ctx, id, buf)
	if ch == nil { // cache hit
		return blob, nil
	}

	// cache miss: wait for blob to be downloaded
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case blob = <-ch:
		return blob, nil
	}
}
// PackLoader is the part of a repository the blob cache needs: loading a set
// of blobs from one pack and passing each of them to a callback.
type PackLoader interface {
LoadBlobsFromPack(context.Context, restic.ID, []restic.Blob, func(restic.BlobHandle, []byte, error) error) error
}
// WrapWithCache creates a new blob cache for repo and returns a blob loader
// backed by that cache together with the cache itself.
func WrapWithCache(ctx context.Context, repo PackLoader, cacheSize int, numDownloaders int, idx Index,
	onReady, onEvict func(restic.IDs)) (*BlobLoaderWithCache, *BlobCache) {
	cache := NewBlobCache(ctx, cacheSize, numDownloaders, repo, idx, onReady, onEvict)
	loader := &BlobLoaderWithCache{repo: repo, cache: cache}

	debug.Log("Wrapped the repository with blob cache.")
	return loader, cache
}

103
internal/rechunker/debug.go Normal file
View file

@ -0,0 +1,103 @@
package rechunker
import (
"maps"
"strings"
"sync"
"github.com/restic/restic/internal/debug"
)
// debugStats is the package-wide collector for debug statistics.
var debugStats = NewStats(true)

// Stats is a set of named integer counters guarded by a mutex.
// All methods are no-ops on a nil *Stats.
type Stats struct {
	d  map[string]int
	mu sync.Mutex
}

// NewStats returns an empty Stats, or nil if enable is false.
func NewStats(enable bool) *Stats {
	if !enable {
		return nil
	}
	return &Stats{d: make(map[string]int)}
}

// Add increases counter k by v.
func (s *Stats) Add(k string, v int) {
	if s == nil {
		return
	}

	s.mu.Lock()
	s.d[k] += v
	s.mu.Unlock()
}

// AddMap increases every counter named in m by its value in m.
func (s *Stats) AddMap(m map[string]int) {
	if s == nil {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	for name, delta := range m {
		s.d[name] += delta
	}
}

// UpdateMax raises counter k to v if its current value is smaller than v.
func (s *Stats) UpdateMax(k string, v int) {
	if s == nil {
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	if current := s.d[k]; v > current {
		s.d[k] = v
	}
}

// Dump returns a copy of all counters. It returns nil for a nil *Stats.
func (s *Stats) Dump() (note map[string]int) {
	if s == nil {
		return nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	note = make(map[string]int)
	maps.Copy(note, s.d)
	return note
}
// debugPrintRechunkReport writes a summary of the collected debug statistics
// to the debug log. It does nothing if statistics are disabled, and only
// reports if the blob cache is enabled in rc's configuration.
//
// Counters whose name starts with "load:" hold per-blob download counts; the
// rest of the name is the blob ID, of which at most the first 10 characters
// are logged.
func debugPrintRechunkReport(rc *Rechunker) {
	if debugStats == nil {
		return
	}

	dNote := debugStats.Dump()

	if rc.cfg.CacheSize > 0 {
		debug.Log("List of blobs downloaded more than once:")
		numBlobRedundant := 0
		redundantDownloadCount := 0
		for k, count := range dNote {
			id, isLoad := strings.CutPrefix(k, "load:")
			if !isLoad || count <= 1 {
				continue
			}
			// shorten the ID for logging; never slice beyond the key's end
			if len(id) > 10 {
				id = id[:10]
			}
			debug.Log("%v: Downloaded %d times", id, count)
			numBlobRedundant++
			redundantDownloadCount += count
		}
		debug.Log("[summary_blobcache] Number of redundantly downloaded blobs is %d, whose overall download count is %d", numBlobRedundant, redundantDownloadCount)
		debug.Log("[summary_blobcache] Peak memory usage by blob cache: %v/%v bytes", dNote["max_cache_usage"], rc.cfg.CacheSize)
		if dNote["total_blob_count"] != dNote["ignored_blob_count"] {
			debug.Log("[summary_blobcache] WARNING: Number of successfully ignored blob at the end: %v/%v", dNote["ignored_blob_count"], dNote["total_blob_count"])
		}
	}
}

View file

@ -0,0 +1,95 @@
package rechunker
import (
"fmt"
"sync"
"time"
"github.com/restic/restic/internal/ui"
"github.com/restic/restic/internal/ui/progress"
)
// Progress tracks and reports the progress of a rechunk run: the number of
// files finished and the number of bytes processed, against their totals.
type Progress struct {
updater progress.Updater // periodically calls update
m sync.Mutex // guards the counters and show
filesFinished int
filesTotal int
bytesProcessed uint64
bytesTotal uint64
printer progress.Printer // receives the final summary
term ui.Terminal // receives the live status lines
show bool // true between Start and the final update
}
// NewProgress creates a Progress that writes status lines to term and the
// final summary to printer. The updater, created here, reports through the
// Progress's update method at the given interval.
func NewProgress(term ui.Terminal, printer progress.Printer, interval time.Duration) *Progress {
	p := new(Progress)
	p.term, p.printer = term, printer
	p.updater = *progress.NewUpdater(interval, p.update)
	return p
}
// Start sets the totals to report against and enables status output.
func (p *Progress) Start(fileCount int, totalSize uint64) {
	p.m.Lock()
	p.filesTotal, p.bytesTotal = fileCount, totalSize
	p.show = true
	p.m.Unlock()
}
// AddFile adds count to the number of finished files.
func (p *Progress) AddFile(count int) {
	p.m.Lock()
	p.filesFinished += count
	p.m.Unlock()
}
// AddBlob adds size to the number of processed bytes.
func (p *Progress) AddBlob(size uint64) {
	p.m.Lock()
	p.bytesProcessed += size
	p.m.Unlock()
}
// update is the report callback handed to the updater in NewProgress.
//
// While output is enabled, a regular update shows two status lines on the
// terminal; the final update clears the status, prints the same information
// through the printer and disables further output. While output is disabled,
// the status is cleared.
func (p *Progress) update(duration time.Duration, final bool) {
	p.m.Lock()
	defer p.m.Unlock()

	if !p.show {
		p.term.SetStatus(nil)
		return
	}

	elapsed := ui.FormatDuration(duration)
	processed := ui.FormatBytes(p.bytesProcessed)
	total := ui.FormatBytes(p.bytesTotal)
	percent := ui.FormatPercent(p.bytesProcessed, p.bytesTotal)

	if !final {
		p.term.SetStatus([]string{
			fmt.Sprintf("[%s] %v/%v distinct files processed",
				elapsed, p.filesFinished, p.filesTotal),
			fmt.Sprintf("%s %s/%s", percent, processed, total),
		})
		return
	}

	// final report: replace the live status with a permanent summary
	p.term.SetStatus(nil)
	p.printer.P("[%s] %v/%v distinct files processed\n", elapsed, p.filesFinished, p.filesTotal)
	p.printer.P("%s %s/%s\n", percent, processed, total)
	p.show = false
}
// Done stops the updater. It is a no-op on a nil Progress.
func (p *Progress) Done() {
	if p != nil {
		p.updater.Done()
	}
}

View file

@ -0,0 +1,421 @@
package rechunker
import (
"context"
"crypto/sha256"
"fmt"
"runtime"
"slices"
"sync"
"sync/atomic"
"github.com/restic/chunker"
"github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/walker"
"golang.org/x/sync/errgroup"
)
// Rechunker holds the state of a rechunk run across its phases.
// Plan fills idx, filesList and totalSize and sets rechunkReady.
type Rechunker struct {
cfg Config
idx Index // blob/pack lookup built by Plan
filesList []*ChunkedFile // distinct files to process, sorted by chunk count (descending) after Plan
totalSize uint64 // sum of the sizes of the distinct files
rechunkReady bool // set once Plan has completed successfully
rechunkMap map[restic.ID]restic.IDs // hashOfIDs of srcBlobIDs -> dstBlobIDs
rechunkMapLock sync.Mutex
totalAddedToDstRepo atomic.Uint64
rewriteTreeMap map[restic.ID]restic.ID // original tree ID (in src repo) -> rewritten tree ID (in dst repo)
}
// Config holds the settings of a Rechunker.
type Config struct {
CacheSize int // size of the blob cache; a value > 0 is treated as "cache enabled" by the debug report
Pol chunker.Pol // chunker polynomial (the copy command passes the destination repository's polynomial)
}
// ChunkedFile describes the content of one distinct file: the IDs of its
// blobs and the hash of that ID list (as computed by HashOfIDs).
type ChunkedFile struct {
restic.IDs
hashval restic.ID
}
// Index provides the blob and pack lookups used during rechunking.
type Index interface {
BlobSize(blobID restic.ID) (size uint) // blob ID -> blob size
BlobToPack(blobID restic.ID) (packID restic.ID) // blob ID -> pack ID
PackToBlobs(packID restic.ID) (blobs []restic.Blob) // pack ID -> list of blobs to be loaded from the pack
Packs() (packIDs restic.IDSet) // set of all pack IDs
}
// NewRechunker returns a Rechunker with the given configuration and empty
// rechunk and tree-rewrite maps.
func NewRechunker(cfg Config) *Rechunker {
	rc := &Rechunker{cfg: cfg}
	rc.rechunkMap = make(map[restic.ID]restic.IDs)
	rc.rewriteTreeMap = make(map[restic.ID]restic.ID)
	return rc
}
// Plan prepares a rechunk run for the given root trees: it gathers the
// distinct file contents reachable from rootTrees, builds the internal index
// for them, and sorts the file list by chunk count in descending order.
// On success the Rechunker is marked as ready.
func (rc *Rechunker) Plan(ctx context.Context, srcRepo restic.Repository, rootTrees restic.IDs) error {
	var err error

	debug.Log("Gathering distinct file Contents from target snapshots")
	if rc.filesList, rc.totalSize, err = gatherFileContents(ctx, srcRepo, rootTrees); err != nil {
		return err
	}

	debug.Log("Building the internal index for use in Rechunk()")
	if rc.idx, err = createIndex(rc.filesList, srcRepo.LookupBlob); err != nil {
		return err
	}

	debug.Log("Sorting the file list by their chunk counts (descending order)")
	byChunkCountDesc := func(a, b *ChunkedFile) int {
		return len(b.IDs) - len(a.IDs)
	}
	slices.SortFunc(rc.filesList, byChunkCountDesc)

	rc.rechunkReady = true
	return nil
}
// gatherFileContents walks all trees reachable from rootTrees and collects
// the content of every distinct file. Two files are considered the same if
// the hash of their blob ID lists (HashOfIDs) is equal. Each tree is visited
// only once.
//
// It returns the list of distinct files and the sum of their node sizes.
func gatherFileContents(ctx context.Context, repo restic.Loader, rootTrees restic.IDs) (filesList []*ChunkedFile, totalSize uint64, err error) {
	var mu sync.Mutex
	seenFiles := restic.NewIDSet()
	seenTrees := restic.NewIDSet()

	// skip trees that have been visited before
	skipTree := func(id restic.ID) bool {
		if seenTrees.Has(id) {
			return true
		}
		seenTrees.Insert(id)
		return false
	}

	// record every file content that has not been seen yet
	processTree := func(_ restic.ID, err error, nodes data.TreeNodeIterator) error {
		if err != nil {
			return err
		}
		for item := range nodes {
			if item.Error != nil {
				return item.Error
			}
			if item.Node == nil || item.Node.Type != data.NodeTypeFile {
				continue
			}

			hashval := HashOfIDs(item.Node.Content)

			mu.Lock()
			if !seenFiles.Has(hashval) {
				seenFiles.Insert(hashval)
				filesList = append(filesList, &ChunkedFile{
					IDs:     item.Node.Content,
					hashval: hashval,
				})
				totalSize += item.Node.Size
			}
			mu.Unlock()
		}
		return nil
	}

	// Stream through all subtrees in target rootTrees
	if err = data.StreamTrees(ctx, repo, rootTrees, nil, skipTree, processTree); err != nil {
		return nil, 0, err
	}
	return filesList, totalSize, nil
}
// index is the map-backed Index implementation built by createIndex.
type index struct {
blobSize map[restic.ID]uint // blob ID -> blob size
blobIdx map[restic.ID]restic.ID // blob ID -> pack ID
packIdx map[restic.ID][]restic.Blob // pack ID -> list of blobs to be loaded from the pack
}
// BlobSize returns the recorded size of blob id, or 0 if the blob is not in
// the index.
func (i *index) BlobSize(id restic.ID) uint {
return i.blobSize[id]
}
// BlobToPack returns the ID of the pack recorded for blob id, or the zero ID
// if the blob is not in the index.
func (i *index) BlobToPack(id restic.ID) restic.ID {
return i.blobIdx[id]
}
// PackToBlobs returns the indexed blobs that are stored in pack id, or nil if
// the pack is not in the index.
func (i *index) PackToBlobs(id restic.ID) []restic.Blob {
return i.packIdx[id]
}
// Packs returns a freshly built set of the IDs of all packs in the index.
func (i *index) Packs() restic.IDSet {
	packs := make(restic.IDSet, len(i.packIdx))
	for packID := range i.packIdx {
		packs.Insert(packID)
	}
	return packs
}
// createIndex builds the Index for all blobs referenced by filesList. Each
// distinct blob is resolved through lookupBlob; the first returned location is
// used. An error is returned when a blob cannot be found.
func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id restic.ID) []restic.PackedBlob) (Index, error) {
	// count how often each distinct blob is referenced
	usage := make(map[restic.ID]int)
	for _, f := range filesList {
		for _, id := range f.IDs {
			usage[id]++
		}
	}

	// debugStats: record the number of blobs used
	if debugStats != nil {
		debugStats.Add("total_blob_count", len(usage))
	}

	// resolve every distinct blob to its size and pack
	result := &index{
		blobSize: make(map[restic.ID]uint),
		blobIdx:  make(map[restic.ID]restic.ID),
		packIdx:  make(map[restic.ID][]restic.Blob),
	}
	for id := range usage {
		locations := lookupBlob(restic.DataBlob, id)
		if len(locations) == 0 {
			return nil, fmt.Errorf("can't find blob from source repo: %v", id)
		}

		loc := locations[0]
		result.blobSize[loc.Blob.ID] = loc.DataLength()
		result.blobIdx[loc.Blob.ID] = loc.PackID
		result.packIdx[loc.PackID] = append(result.packIdx[loc.PackID], loc.Blob)
	}

	return result, nil
}
// Rechunk reads every planned file from srcRepo, rechunks it with the
// configured polynomial and stores the resulting blobs in dstRepo. The
// source-to-destination blob mapping is recorded for RewriteTrees().
//
// Plan() must have completed successfully beforehand, and the polynomial of
// dstRepo must match the Rechunker's. p may be nil.
func (rc *Rechunker) Rechunk(ctx context.Context, srcRepo, dstRepo restic.Repository, p *Progress) error {
	if dstRepo.Config().ChunkerPolynomial != rc.cfg.Pol {
		return fmt.Errorf("chunker polynomial of dstRepo does not match with Rechunker's one")
	}
	if !rc.rechunkReady {
		return fmt.Errorf("Plan() must be run first before Rechunk()")
	}
	rc.rechunkReady = false

	debug.Log("Rechunk start.")
	defer debug.Log("Rechunk done.")

	// The scheduler goroutines are only stopped through their context. Bind
	// them to this call so that they do not stay blocked on their channels
	// when the workers fail while the caller's context is still alive.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	numWorkers := min(runtime.GOMAXPROCS(0), int(srcRepo.Connections()))
	numDownloaders := numWorkers
	debug.Log("srcRepo.Connections(): %v", srcRepo.Connections())

	// set up scheduler
	scheduler := rc.setupScheduler(ctx)

	// set up blob cache
	var downloader restic.BlobLoader
	var cache *BlobCache
	if rc.cfg.CacheSize > 0 {
		downloader, cache = rc.setupCache(ctx, srcRepo, scheduler, numDownloaders)
		defer cache.Close()
	} else {
		downloader = srcRepo
	}

	// run rechunk workers
	bufferPool := NewBufferPool(3 * (numWorkers + 1))
	err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		debug.Log("Starting uploader")
		defer debug.Log("Closing uploader")

		workerCfg := WorkerConfig{
			Pol:          rc.cfg.Pol,
			Downloader:   downloader,
			Uploader:     uploader,
			BufferPool:   bufferPool,
			NewCursor:    scheduler.newCursor,
			UpdateCursor: scheduler.updateCursor,
		}

		wg, ctx := errgroup.WithContext(ctx)
		rc.runWorkers(ctx, wg, numWorkers, workerCfg, scheduler.Next, p)
		// one extra worker that only serves the priority channel
		rc.runWorkers(ctx, wg, 1, workerCfg, scheduler.NextPriority, p)
		return wg.Wait()
	})
	if err != nil {
		return err
	}

	debugPrintRechunkReport(rc)
	return nil
}
// setupScheduler creates the Scheduler that hands the planned files to the
// workers.
func (rc *Rechunker) setupScheduler(ctx context.Context) (scheduler *Scheduler) {
	debug.Log("Running file dispatcher")

	// Priority dispatch is used exactly when the blob cache is enabled: (small)
	// files with all their blobs ready in the cache are then prioritized.
	// Without the cache, dispatch order simply follows the filesList.
	usePriority := rc.cfg.CacheSize > 0
	return NewScheduler(ctx, rc.filesList, rc.idx, usePriority)
}
// setupCache wraps srcRepo with a blob cache and connects the cache to the
// scheduler. The returned loader's LoadBlob() is transparently mediated by
// the cache.
func (rc *Rechunker) setupCache(ctx context.Context, srcRepo PackLoader, scheduler *Scheduler, numDownloaders int) (repo restic.BlobLoader, cache *BlobCache) {
	debug.Log("Creating blob cache: cacheSize %v", rc.cfg.CacheSize)

	// the cache reports loaded and evicted blobs to the scheduler
	cachedRepo, blobCache := WrapWithCache(ctx, srcRepo, rc.cfg.CacheSize, numDownloaders, rc.idx, scheduler.BlobReady, scheduler.BlobUnready)

	// blobs the scheduler declares obsolete are handed to cache.Ignore for early eviction
	scheduler.SetIgnoreBlobsCallback(blobCache.Ignore)

	return cachedRepo, blobCache
}
// runWorkers starts numWorkers goroutines in wg. Each one creates its own
// Worker and processes the files delivered by receiveJob until receiveJob
// reports that there are no more files or an error occurs. The result of
// every file is recorded in rc.rechunkMap. p may be nil.
func (rc *Rechunker) runWorkers(ctx context.Context, wg *errgroup.Group, numWorkers int,
	workerCfg WorkerConfig, receiveJob func(context.Context) (*ChunkedFile, bool, error),
	p *Progress) {
	loop := func() error {
		debug.Log("Starting worker")
		worker := NewWorker(workerCfg)

		for {
			debug.Log("receiving job")
			file, more, err := receiveJob(ctx)
			switch {
			case err != nil:
				return err
			case !more:
				return nil
			}

			name := file.hashval.Str()
			debug.Log("Starting file %v", name)
			result, err := worker.RunFile(ctx, file.IDs, p)
			if err != nil {
				return err
			}
			debug.Log("Finished file %v", name)

			if p != nil {
				p.AddFile(1)
			}
			rc.totalAddedToDstRepo.Add(result.addedToRepository)

			// the map is shared between all workers
			rc.rechunkMapLock.Lock()
			rc.rechunkMap[file.hashval] = result.dstBlobs
			rc.rechunkMapLock.Unlock()
		}
	}

	for i := 0; i < numWorkers; i++ {
		wg.Go(loop)
	}
}
// wrappedBlobSaver is a function type with a SaveBlob method that calls the
// function itself, so that a plain function (e.g. a closure adding custom
// behavior around another saver) can be passed where a blob saver is expected.
type wrappedBlobSaver func(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, sizeInRepo int, err error)
// SaveBlob calls s with the given arguments and returns its results unchanged.
func (s wrappedBlobSaver) SaveBlob(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, sizeInRepo int, err error) {
return s(ctx, tpe, buf, id, storeDuplicate)
}
// RewriteTrees rewrites the given trees of srcRepo into dstRepo, replacing the
// content of every file node by the rechunked blobs recorded during Rechunk().
// It returns the new tree IDs in the order of treeIDs. Trees that were already
// rewritten by this Rechunker are not processed again.
//
// It panics when a file node is encountered whose content has no recorded
// rechunk result.
func (rc *Rechunker) RewriteTrees(ctx context.Context, srcRepo, dstRepo restic.Repository, treeIDs restic.IDs) (restic.IDs, error) {
	// swapContent replaces the blob list of file nodes; all other nodes
	// (including nil) pass through untouched.
	swapContent := func(node *data.Node, _ string) *data.Node {
		if node == nil || node.Type != data.NodeTypeFile {
			return node
		}

		dstBlobs, ok := rc.rechunkMap[HashOfIDs(node.Content)]
		if !ok {
			panic(fmt.Errorf("can't find from rechunkBlobsMap: %v", node.Content.String()))
		}
		node.Content = dstBlobs
		return node
	}

	rewriter := walker.NewTreeRewriter(walker.RewriteOpts{
		RewriteNode:                swapContent,
		AllowUnstableSerialization: true,
	})

	rewritten := restic.IDs{}
	err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error {
		// wrap the uploader so that the total size of newly stored tree blobs can be tracked
		var countingSaver wrappedBlobSaver = func(ctx context.Context, tpe restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, sizeInRepo int, err error) {
			newID, known, sizeInRepo, err = uploader.SaveBlob(ctx, tpe, buf, id, storeDuplicate)
			if err == nil && !known {
				rc.totalAddedToDstRepo.Add(uint64(sizeInRepo))
			}
			return newID, known, sizeInRepo, err
		}

		for _, treeID := range treeIDs {
			// reuse the result if the identical tree has already been processed
			dstTree, done := rc.rewriteTreeMap[treeID]
			if !done {
				var err error
				dstTree, err = rewriter.RewriteTree(ctx, srcRepo, countingSaver, "/", treeID)
				if err != nil {
					return err
				}
				rc.rewriteTreeMap[treeID] = dstTree
			}
			rewritten = append(rewritten, dstTree)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return rewritten, nil
}
// GetRewrittenTree returns the ID of the tree in the destination repository
// that RewriteTrees() produced for originalTree. It returns an error if no
// such tree has been recorded.
func (rc *Rechunker) GetRewrittenTree(originalTree restic.ID) (restic.ID, error) {
	if rewritten, found := rc.rewriteTreeMap[originalTree]; found {
		return rewritten, nil
	}
	return restic.ID{}, fmt.Errorf("rewritten tree does not exist for original tree %v", originalTree)
}
// TotalSize returns the summed node size of the distinct files gathered by
// Plan().
func (rc *Rechunker) TotalSize() uint64 {
return rc.totalSize
}
// NumFiles returns the number of distinct files gathered by Plan().
func (rc *Rechunker) NumFiles() int {
return len(rc.filesList)
}
// NumPacks returns the number of source packs that hold blobs of the planned
// files. Like NumFiles() and TotalSize(), it returns 0 when no plan exists
// (Plan() was not run yet or failed) instead of dereferencing a nil index.
func (rc *Rechunker) NumPacks() int {
	if rc.idx == nil {
		return 0
	}
	return len(rc.idx.Packs())
}
// TotalAddedToDstRepo returns the accumulated size reported for blobs that
// Rechunk() and RewriteTrees() newly stored in the destination repository.
func (rc *Rechunker) TotalAddedToDstRepo() uint64 {
return rc.totalAddedToDstRepo.Load()
}
// HashOfIDs maps a list of IDs to a single ID: the SHA-256 digest of all IDs
// concatenated in order. An empty list yields the digest of the empty input.
func HashOfIDs(ids restic.IDs) restic.ID {
	h := sha256.New()
	for i := range ids {
		// hash.Hash.Write never returns an error
		_, _ = h.Write(ids[i][:])
	}

	var sum restic.ID
	h.Sum(sum[:0]) // the digest is written into sum's backing array
	return sum
}

View file

@ -0,0 +1,193 @@
package rechunker
import (
"context"
"fmt"
"testing"
"github.com/restic/chunker"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"github.com/restic/restic/internal/walker"
)
// prepareData creates a temporary directory filled with pseudo-random test
// files (an empty one, three of different sizes, and duplicates of two of
// them in a nested directory) and returns its path.
func prepareData(t *testing.T) string {
	dir := rtest.TempDir(t)

	small := string(rtest.Random(1, 10_000))
	medium := string(rtest.Random(2, 10_000_000))
	large := string(rtest.Random(3, 100_000_000))

	layout := archiver.TestDir{
		"zero":  archiver.TestFile{Content: ""},
		"one":   archiver.TestFile{Content: small},
		"two":   archiver.TestFile{Content: medium},
		"three": archiver.TestFile{Content: large},
		"dir1": archiver.TestDir{
			"dir2": archiver.TestDir{
				"dup_1": archiver.TestFile{Content: small},
				"dup_3": archiver.TestFile{Content: large},
			},
		},
	}
	archiver.TestCreateFiles(t, dir, layout)

	return dir
}
// gatherNodesByPath walks the tree root in repo and returns all visited nodes
// keyed by their path. The test fails immediately if the walk reports an error.
func gatherNodesByPath(t *testing.T, repo restic.BlobLoader, root restic.ID) map[string]*data.Node {
	t.Helper()

	nodes := make(map[string]*data.Node)
	collect := func(_ restic.ID, path string, node *data.Node, nodeErr error) error {
		if node != nil {
			nodes[path] = node
		}
		return nodeErr
	}

	if err := walker.Walk(t.Context(), repo, root, walker.WalkVisitor{ProcessNode: collect}); err != nil {
		t.Fatal(err)
	}
	return nodes
}
// buildRechunkMapByMatchingPath builds the expected rechunk mapping: for every
// file node in srcNodes, the hash of its content maps to the content of the
// node with the same path in dstNodes. The test fails if a path is missing
// from dstNodes.
func buildRechunkMapByMatchingPath(t *testing.T, srcNodes, dstNodes map[string]*data.Node) map[restic.ID]restic.IDs {
	t.Helper()

	expected := make(map[restic.ID]restic.IDs)
	for path, srcNode := range srcNodes {
		if srcNode.Type != data.NodeTypeFile {
			continue
		}
		dstNode, found := dstNodes[path]
		if !found {
			t.Fatalf("%v expected in dstNodes, but not found", path)
		}
		expected[HashOfIDs(srcNode.Content)] = dstNode.Content
	}
	return expected
}
// TestRechunker backs up the same data into a source repository and into a
// reference repository that uses a different chunker polynomial. It then runs
// Plan / Rechunk / RewriteTrees from the source into a third repository and
// checks that the result matches the reference: the same data blobs, the same
// per-file blob lists, and otherwise unchanged node metadata.
func TestRechunker(t *testing.T) {
	// generate random polynomials
	srcChunkerParam, err := chunker.RandomPolynomial()
	if err != nil {
		t.Fatal(err)
	}
	dstChunkerParam, err := chunker.RandomPolynomial()
	if err != nil {
		t.Fatal(err)
	}

	// prepare test data
	tempdir := prepareData(t)

	// prepare repositories
	srcRepo := TestRepositoryWithPol(t, srcChunkerParam)
	dstWantsRepo := TestRepositoryWithPol(t, dstChunkerParam)
	dstTestsRepo := TestRepositoryWithPol(t, dstChunkerParam)
	srcSn := archiver.TestSnapshot(t, srcRepo, tempdir, nil)
	dstWantsSn := archiver.TestSnapshot(t, dstWantsRepo, tempdir, nil)
	srcNodes := gatherNodesByPath(t, srcRepo, *srcSn.Tree)
	dstWantsNodes := gatherNodesByPath(t, dstWantsRepo, *dstWantsSn.Tree)
	wantedRechunkMap := buildRechunkMapByMatchingPath(t, srcNodes, dstWantsNodes)

	// run rechunk copy
	rechunker := NewRechunker(Config{
		// 1<<30 still fits into a 32-bit int, whereas the previous value
		// 4*(1<<30) is a constant overflow (compile error) on 32-bit platforms.
		CacheSize: 1 << 30,
		Pol:       dstChunkerParam,
	})
	t.Run("Plan running", func(t *testing.T) {
		err := rechunker.Plan(t.Context(), srcRepo, restic.IDs{*srcSn.Tree})
		if err != nil {
			t.Fatal(err)
		}
	})
	t.Run("Rechunk running", func(t *testing.T) {
		err := rechunker.Rechunk(t.Context(), srcRepo, dstTestsRepo, nil)
		if err != nil {
			t.Fatal(err)
		}
	})
	var testsTree restic.ID
	t.Run("RewriteTrees running", func(t *testing.T) {
		newID, err := rechunker.RewriteTrees(t.Context(), srcRepo, dstTestsRepo, restic.IDs{*srcSn.Tree})
		if err != nil {
			t.Fatal(err)
		}
		testsTree = newID[0]
	})

	// compare dstTestsRepo (rechunker result) vs dstWantsRepo (reference result)

	// 1) check if all expected data blobs are stored
	t.Run("data blob verification", func(t *testing.T) {
		inCtx, stop := context.WithCancelCause(t.Context())
		defer stop(nil) // release the context on every path

		err := dstWantsRepo.ListBlobs(inCtx, func(pb restic.PackedBlob) {
			if pb.Type == restic.DataBlob {
				_, found := dstTestsRepo.LookupBlobSize(restic.DataBlob, pb.ID)
				if !found {
					stop(fmt.Errorf("blob %v expected but not found", pb.ID.Str()))
				}
			}
		})
		// Report the recorded cause (the missing blob) rather than the bare
		// cancellation error, and report it even if ListBlobs returned nil.
		if cause := context.Cause(inCtx); cause != nil {
			t.Error(cause)
		} else if err != nil {
			t.Error(err)
		}
	})

	// 2) check if rechunk is done correctly by comparing rechunkMap
	t.Run("rechunk mapping verification", func(t *testing.T) {
		testedRechunkMap := rechunker.rechunkMap
		for k, v := range wantedRechunkMap {
			wants := HashOfIDs(v)
			tests := HashOfIDs(testedRechunkMap[k])
			if wants != tests {
				t.Errorf("rechunk result for src file %v does not match: %v expected, but got %v", k.Str(), wants.Str(), tests.Str())
			}
		}
	})

	// 3) check if tree is rewritten correctly by comparing tree nodes
	t.Run("tree verification", func(t *testing.T) {
		testsNodes := gatherNodesByPath(t, dstTestsRepo, testsTree)

		// (i) compare Content field with dstWantsNodes
		for path, node := range dstWantsNodes {
			if node.Type != data.NodeTypeFile {
				continue
			}
			testsNode, ok := testsNodes[path]
			if !ok {
				t.Errorf("node for path %v does not exist", path)
				continue
			}
			wants := HashOfIDs(node.Content)
			tests := HashOfIDs(testsNode.Content)
			if wants != tests {
				t.Errorf("node content for path %v does not match: %v expected, but got %v", path, wants.Str(), tests.Str())
			}
		}

		// (ii) compare remaining fields with srcNodes
		for path, wantsNode := range srcNodes {
			testsNode, ok := testsNodes[path]
			if !ok {
				t.Errorf("node for path %v does not exist", path)
				continue
			}
			// copy nodes and clear rewritten fields for comparison
			wants, tests := *wantsNode, *testsNode
			wants.Content, tests.Content = nil, nil
			wants.Subtree, tests.Subtree = nil, nil
			if !wants.Equals(tests) {
				t.Errorf("node fields for path %v does not match", path)
			}
		}
	})
}

View file

@ -0,0 +1,393 @@
package rechunker
import (
"context"
"fmt"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
// Scheduler hands files to the rechunk workers through a regular channel and,
// when priority dispatch is enabled, a priority channel. It also tracks how
// many prefix blobs each file is still waiting for and how often each blob is
// still going to be used.
type Scheduler struct {
// mu guards priorityList, remainingPrefixBlobs, remainingBlobUsage and
// the assignment of ignoreBlobsCB.
mu sync.Mutex
// idx provides the blob sizes used by the cursors.
idx Index
// regularCh delivers the files of regularList in order.
regularCh <-chan *ChunkedFile
// priorityCh delivers files pushed via pushPriority; nil when priority
// dispatch is disabled.
priorityCh <-chan *ChunkedFile
regularList []*ChunkedFile
// priorityList collects pushed files until the priority goroutine picks them up.
priorityList []*ChunkedFile
prefixLookup map[restic.ID][]*ChunkedFile // blob ID -> files that contain the blob as prefix
remainingPrefixBlobs map[restic.ID]int // file hashval -> remaining count until all its blobs ready
remainingBlobUsage map[restic.ID]int // blob ID -> remaining blob usage until the end
// ignoreBlobsCB, if set, is called with blobs whose remaining usage dropped to zero.
ignoreBlobsCB func(ids restic.IDs)
// push signals the priority goroutine that priorityList has new entries.
push chan struct{}
// done is closed when the regular channel goroutine ends.
done chan struct{}
}
// NewScheduler creates a Scheduler for files and starts its dispatch
// goroutines, which run until all files are dispatched or ctx is done.
//
// Without usePriority only the regular channel is served. With usePriority a
// priority channel is served as well, and a shared visited-set makes sure that
// each file is dispatched through only one of the two channels.
func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePriority bool) *Scheduler {
	debug.Log("Running NewScheduler()")
	wg, ctx := errgroup.WithContext(ctx)
	prefixLookup, prefixBlobs, blobUsage := createSchedulerState(files)

	s := &Scheduler{
		idx:                  idx,
		regularList:          files,
		done:                 make(chan struct{}),
		prefixLookup:         prefixLookup,
		remainingPrefixBlobs: prefixBlobs,
		remainingBlobUsage:   blobUsage,
	}

	if !usePriority {
		s.createRegularCh(ctx, wg, nil)
		return s
	}

	// capacity 1: one pending wake-up signal is enough
	s.push = make(chan struct{}, 1)

	// visited reports whether a file was already claimed for dispatch and
	// claims it otherwise; it is shared by both channel goroutines.
	var (
		seenMu sync.Mutex
		seen   = restic.IDSet{}
	)
	visited := func(id restic.ID) bool {
		seenMu.Lock()
		defer seenMu.Unlock()
		if seen.Has(id) {
			return true
		}
		seen.Insert(id)
		return false
	}

	s.createRegularCh(ctx, wg, visited)
	s.createPriorityCh(ctx, wg, visited)
	return s
}
// FILE_HEAD_LENGTH is the maximum number of leading blobs of a file that are
// treated as its prefix when the scheduler state is created.
// NOTE(review): the name is not MixedCaps; it is exported, so renaming it
// would need to be coordinated with its users.
const FILE_HEAD_LENGTH = 25
// createSchedulerState derives the scheduler's bookkeeping from files. It
// returns, in this order:
//   - blob ID -> files that have the blob among their first FILE_HEAD_LENGTH blobs
//   - file hashval -> number of distinct blobs among its first FILE_HEAD_LENGTH blobs
//   - blob ID -> total number of occurrences over all files
func createSchedulerState(files []*ChunkedFile) (map[restic.ID][]*ChunkedFile, map[restic.ID]int, map[restic.ID]int) {
	prefixLookup := make(map[restic.ID][]*ChunkedFile)
	numPrefixBlobs := make(map[restic.ID]int)
	blobUsage := make(map[restic.ID]int)

	for _, f := range files {
		for _, id := range f.IDs {
			blobUsage[id]++
		}

		// the prefix is deduplicated so that a repeated blob counts once
		head := f.IDs[:min(FILE_HEAD_LENGTH, len(f.IDs))]
		distinct := restic.NewIDSet(head...)
		numPrefixBlobs[f.hashval] = len(distinct)
		for id := range distinct {
			prefixLookup[id] = append(prefixLookup[id], f)
		}
	}

	return prefixLookup, numPrefixBlobs, blobUsage
}
// Next returns the next file to process, preferring the priority channel over
// the regular channel. The boolean result is false when PrioritySelect did
// not deliver a file (it reported source 0).
func (s *Scheduler) Next(ctx context.Context) (*ChunkedFile, bool, error) {
file, from, err := PrioritySelect(ctx, s.priorityCh, s.regularCh)
return file, from != 0, err
}
// NextPriority returns the next file from the priority channel only. The
// boolean result is false when there is no priority channel or when it did
// not deliver a file.
func (s *Scheduler) NextPriority(ctx context.Context) (*ChunkedFile, bool, error) {
	ch := s.priorityCh
	if ch == nil {
		return nil, false, nil
	}

	next, source, err := PrioritySelect(ctx, ch, nil)
	delivered := source != 0
	return next, delivered, err
}
// pushPriority queues files for the priority channel and wakes up the
// priority goroutine. It does nothing when there is no priority channel.
func (s *Scheduler) pushPriority(files []*ChunkedFile) {
	if s.priorityCh == nil {
		return
	}

	s.mu.Lock()
	s.priorityList = append(s.priorityList, files...)
	// Non-blocking signal: if one is already pending, the goroutine will pick
	// up the whole list anyway.
	select {
	case s.push <- struct{}{}:
	default:
	}
	s.mu.Unlock()
}
// popPriority takes over all currently queued priority files and leaves the
// queue empty.
func (s *Scheduler) popPriority() []*ChunkedFile {
	s.mu.Lock()
	pending := s.priorityList
	s.priorityList = nil
	s.mu.Unlock()

	return pending
}
// createRegularCh starts the goroutine that sends the files of regularList,
// in order, through the regular channel. Files for which visited (if non-nil)
// returns true are skipped. When the goroutine ends, it closes the channel
// and then s.done.
func (s *Scheduler) createRegularCh(ctx context.Context, wg *errgroup.Group, visited func(id restic.ID) bool) {
	debug.Log("Running scheduler for regular channel")

	out := make(chan *ChunkedFile)
	s.regularCh = out

	feed := func() error {
		defer close(s.done)
		defer close(out)

		for _, f := range s.regularList {
			alreadyClaimed := visited != nil && visited(f.hashval)
			if alreadyClaimed {
				continue
			}

			select {
			case out <- f:
				debug.Log("Sent file %v through regular channel", f.hashval.Str())
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
	wg.Go(feed)
}
// createPriorityCh starts the goroutine that serves the priority channel. It
// repeatedly takes over the files queued by pushPriority and sends those that
// visited (if non-nil) has not seen yet. The goroutine ends, closing the
// channel, when ctx is done or when s.done is closed while it has no files
// left to send.
func (s *Scheduler) createPriorityCh(ctx context.Context, wg *errgroup.Group, visited func(id restic.ID) bool) {
debug.Log("Running scheduler for priority channel")
ch := make(chan *ChunkedFile)
wg.Go(func() error {
defer close(ch)
// list holds the files taken over from the queue but not yet sent
var list []*ChunkedFile
for {
if len(list) == 0 {
// nothing to send: wait for a push signal or for the end
select {
case <-ctx.Done():
return ctx.Err()
case <-s.push:
list = s.popPriority()
debug.Log("Detected priority files whose count is %v", len(list))
// re-check the length; the popped list may be empty
continue
case <-s.done:
debug.Log("Closing scheduler for priority channel")
return nil
}
}
file := list[0]
list = list[1:]
// skip files that were already claimed for dispatch
if visited != nil && visited(file.hashval) {
continue
}
select {
case <-ctx.Done():
return ctx.Err()
case ch <- file:
debug.Log("Sent file %v through priority channel", file.hashval.Str())
}
}
})
s.priorityCh = ch
}
// BlobReady tells the scheduler that the blobs ids have become available.
//
// Every file that has one of the blobs in its prefix gets its remaining
// prefix blob count decreased by one. Files whose count reaches zero are
// pushed to the priority channel; they are not tracked any further.
func (s *Scheduler) BlobReady(ids restic.IDs) {
	if s.priorityCh == nil {
		// if there is no priority chan, it is of no meaning to track the state
		return
	}

	// debugStats: count every reported blob load. This has to happen before
	// the early return below, otherwise loads that do not complete any file's
	// prefix would never be counted.
	if debugStats != nil {
		dAdds := map[string]int{}
		for _, id := range ids {
			dAdds["load:"+id.String()]++
		}
		debugStats.AddMap(dAdds)
	}

	var readyFiles []*ChunkedFile
	s.mu.Lock()
	for _, id := range ids {
		for _, file := range s.prefixLookup[id] {
			n := s.remainingPrefixBlobs[file.hashval]
			if n > 0 {
				n--
				if n == 0 {
					readyFiles = append(readyFiles, file)
				}
				s.remainingPrefixBlobs[file.hashval] = n
			}
		}
	}
	s.mu.Unlock()

	if len(readyFiles) == 0 {
		return
	}
	// pushPriority takes s.mu itself, so it is called after unlocking
	s.pushPriority(readyFiles)
}
// BlobUnready tells the scheduler that the blobs ids are no longer available.
//
// Every file that has one of the blobs in its prefix gets its remaining
// prefix blob count increased by one again. Files whose count is zero are
// left alone; they are no longer tracked.
func (s *Scheduler) BlobUnready(ids restic.IDs) {
	if s.priorityCh == nil {
		// if there is no priority chan, it is of no meaning to track progress
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	for _, id := range ids {
		for _, f := range s.prefixLookup[id] {
			if n := s.remainingPrefixBlobs[f.hashval]; n > 0 {
				s.remainingPrefixBlobs[f.hashval] = n + 1
			}
		}
	}
}
// SetIgnoreBlobsCallback registers cb as the function that updateCursor calls
// with the blobs whose remaining usage count has dropped to zero.
func (s *Scheduler) SetIgnoreBlobsCallback(cb func(restic.IDs)) {
s.mu.Lock()
defer s.mu.Unlock()
s.ignoreBlobsCB = cb
}
// newCursor returns a cursor positioned at the start of the blob sequence
// blobs, using the scheduler's index to look up blob sizes. On a nil
// Scheduler it returns the zero cursor.
func (s *Scheduler) newCursor(blobs restic.IDs) cursor {
if s == nil {
return cursor{}
}
return cursor{
blobs: blobs,
blobSize: s.idx.BlobSize,
}
}
// updateCursor advances c by bytesProcessed bytes and returns the new cursor.
//
// Every source blob that the cursor has completely passed is counted as one
// use: its remaining usage count is decreased, and blobs whose count reaches
// zero are reported to the callback registered via SetIgnoreBlobsCallback.
// An error is returned when the cursor cannot be advanced.
func (s *Scheduler) updateCursor(c cursor, bytesProcessed uint) (cursor, error) {
	start := c
	end, err := c.Advance(bytesProcessed)
	if err != nil {
		return cursor{}, err
	}
	if start.BlobIdx == end.BlobIdx {
		// no blob boundary was crossed
		return end, nil
	}

	consumed := c.blobs[start.BlobIdx:end.BlobIdx]
	var obsolete restic.IDs

	s.mu.Lock()
	for _, b := range consumed {
		s.remainingBlobUsage[b]--
		if s.remainingBlobUsage[b] == 0 {
			obsolete = append(obsolete, b)
		}
	}
	// The callback is written under s.mu, so it is read under s.mu as well.
	ignoreBlobs := s.ignoreBlobsCB
	s.mu.Unlock()

	// call the callback without holding the lock
	if ignoreBlobs != nil && len(obsolete) > 0 {
		ignoreBlobs(obsolete)
	}
	return end, nil
}
// cursor is a position within the byte stream formed by a sequence of blobs.
type cursor struct {
// blobs is the blob sequence the cursor moves over.
blobs restic.IDs
// BlobIdx is the index into blobs of the blob the cursor is in;
// len(blobs) once the cursor has reached the end.
BlobIdx int
// Offset is the byte offset within the blob at BlobIdx.
Offset uint
// blobSize returns the size of a blob; 0 is treated as "unknown blob".
blobSize func(restic.ID) uint
}
// Advance returns the cursor moved forward by numBytes bytes. A cursor
// without a blob sequence always yields the zero cursor. An error is returned
// when a blob of unknown (zero) size is encountered or when numBytes reaches
// beyond the end of the sequence.
func (c cursor) Advance(numBytes uint) (cursor, error) {
	if c.blobs == nil {
		return cursor{}, nil
	}

	remaining := numBytes
	for c.BlobIdx < len(c.blobs) {
		id := c.blobs[c.BlobIdx]
		size := c.blobSize(id)
		if size == 0 {
			return cursor{}, fmt.Errorf("unknown blob %v", id.Str())
		}

		// bytes left in the current blob
		left := size - c.Offset
		if remaining < left {
			// the new position lies within the current blob
			c.Offset += remaining
			return c, nil
		}

		// consume the rest of this blob and move on to the next one
		remaining -= left
		c.BlobIdx++
		c.Offset = 0
	}

	if remaining != 0 {
		return cursor{}, fmt.Errorf("cursor out of range; %d bytes over end position", remaining)
	}
	return c, nil
}
// PrioritySelect receives one item from two channels, preferring first.
//
// It first tries first alone without blocking; only if first has nothing to
// offer does it wait on both channels. from tells where the item came from:
// 1 for first, 2 for second, and 0 (with a nil item) when the channel that
// became ready turned out to be closed. If ctx is done, its error is returned.
func PrioritySelect(ctx context.Context, first <-chan *ChunkedFile, second <-chan *ChunkedFile) (item *ChunkedFile, from int, err error) {
	// non-blocking attempt on the preferred channel
	select {
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	case v, ok := <-first:
		if ok {
			return v, 1, nil
		}
		return nil, 0, nil
	default:
	}

	// nothing available right away: wait for whichever channel is ready first
	select {
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	case v, ok := <-first:
		if ok {
			return v, 1, nil
		}
	case v, ok := <-second:
		if ok {
			return v, 2, nil
		}
	}
	return nil, 0, nil
}

View file

@ -0,0 +1,30 @@
package rechunker
import (
"context"
"testing"
"github.com/restic/chunker"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
// TestRepositoryWithPol creates a repository on a test backend and
// initializes it with the stable repository version, the test password and
// the chunker polynomial pol. The test fails immediately if the repository
// cannot be created or initialized.
func TestRepositoryWithPol(t *testing.T, pol chunker.Pol) restic.Repository {
	t.Helper()

	be := repository.TestBackend(t)
	repo, err := repository.New(be, repository.Options{})
	if err != nil {
		// name this helper in the message so failures point at the right function
		t.Fatalf("TestRepositoryWithPol(): new repo failed: %v", err)
	}

	var version uint = restic.StableRepoVersion
	err = repo.Init(context.TODO(), version, test.TestPassword, &pol)
	if err != nil {
		t.Fatalf("TestRepositoryWithPol(): initialize repo failed: %v", err)
	}

	return repo
}

View file

@ -0,0 +1,243 @@
package rechunker
import (
"context"
"io"
"github.com/restic/chunker"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
// FileResult is the outcome of rechunking one file.
type FileResult struct {
// dstBlobs lists, in order, the IDs of the blobs the file was rechunked into.
dstBlobs restic.IDs
// addedToRepository is the summed size reported for those blobs that were
// not yet known to the uploader.
addedToRepository uint64
}
// Worker rechunks files one at a time: it reads a file's source blobs through
// downloader, splits the data with its chunker and stores the chunks through
// uploader.
type Worker struct {
// pool supplies the buffers for blob loading and chunking.
pool *BufferPool
// chunker is reset for every file and reused.
chunker *chunker.Chunker
pol chunker.Pol
downloader restic.BlobLoader
uploader restic.BlobSaver
// newCursor and updateCursor are optional hooks (may be nil) for tracking
// the read position within a file's source blobs.
newCursor func(blobs restic.IDs) cursor
updateCursor func(c cursor, numBytes uint) (cursor, error)
}
// WorkerConfig holds the parameters for NewWorker.
type WorkerConfig struct {
// Pol is the chunker polynomial used for rechunking.
Pol chunker.Pol
// Downloader loads the source blobs.
Downloader restic.BlobLoader
// Uploader stores the rechunked blobs.
Uploader restic.BlobSaver
// BufferPool is optional; NewWorker creates a small pool when it is nil.
BufferPool *BufferPool
// NewCursor and UpdateCursor are optional position tracking hooks.
NewCursor func(blobs restic.IDs) cursor
UpdateCursor func(c cursor, numBytes uint) (cursor, error)
}
// NewWorker creates a Worker from cfg. When cfg has no buffer pool, the
// worker gets a private pool with a capacity of 3 buffers.
func NewWorker(cfg WorkerConfig) *Worker {
	pool := cfg.BufferPool
	if pool == nil {
		pool = NewBufferPool(3)
	}

	w := new(Worker)
	w.pool = pool
	// the chunker gets its actual reader when a file is processed
	w.chunker = chunker.New(nil, cfg.Pol)
	w.pol = cfg.Pol
	w.downloader = cfg.Downloader
	w.uploader = cfg.Uploader
	w.newCursor = cfg.NewCursor
	w.updateCursor = cfg.UpdateCursor
	return w
}
// RunFile rechunks one file given as the ordered list of its source blobs and
// returns the resulting destination blobs together with the size that was
// newly added to the repository. p may be nil.
//
// A reader goroutine loads and rechunks the data while a writer goroutine
// stores the chunks; the first error of either one aborts both.
func (w *Worker) RunFile(ctx context.Context, srcBlobs restic.IDs, p *Progress) (FileResult, error) {
	buf := w.pool.Get()
	// Both goroutines have ended when this function returns, so the buffer
	// can go back to the pool on the error path as well.
	defer w.pool.Put(buf)

	// Run worker pipeline (reader and writer)
	wg, ctx := errgroup.WithContext(ctx)

	// The reader uses the group's context, so that a failure of the writer
	// also cancels blob loads that are still in flight.
	reader := NewBlobSequenceReader(ctx, srcBlobs, w.downloader, buf)

	chChunk := make(chan chunker.Chunk)  // chunk passing channel from reader to writer
	chResult := make(chan FileResult, 1) // file rechunk result channel

	// Run reader goroutine
	w.runReader(ctx, wg, srcBlobs, reader, chChunk)
	// Run writer goroutine
	w.runWriter(ctx, wg, chChunk, chResult, p)

	if err := wg.Wait(); err != nil {
		return FileResult{}, err
	}

	// the writer has sent its result before ending without error
	return <-chResult, nil
}
// runReader starts the reader goroutine in wg. It rechunks the data of reader
// with the worker's polynomial and sends each chunk to out, which it closes
// when it ends. If cursor hooks are configured, the position within srcBlobs
// is advanced by the length of every chunk.
func (w *Worker) runReader(ctx context.Context, wg *errgroup.Group, srcBlobs restic.IDs, reader *BlobSequenceReader, out chan<- chunker.Chunk) {
	debug.Log("Starting reader goroutine")

	produce := func() error {
		defer close(out)

		w.chunker.Reset(reader, w.pol)

		var pos cursor
		if w.newCursor != nil {
			pos = w.newCursor(srcBlobs)
		}

		for {
			// every chunk gets its own buffer from the pool; the writer
			// returns it after use
			scratch := w.pool.Get()

			// rechunk with new parameter
			chunk, err := w.chunker.Next(scratch)
			switch {
			case err == io.EOF: // reached EOF; all done
				w.pool.Put(scratch)
				return nil
			case err != nil:
				return err
			}

			if w.updateCursor != nil {
				if pos, err = w.updateCursor(pos, chunk.Length); err != nil {
					return err
				}
			}

			// send a rechunked chunk to the writer
			select {
			case out <- chunk:
				debug.Log("Sending a new chunk of size %v to writer", chunk.Length)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	wg.Go(produce)
}
// runWriter starts the writer goroutine in wg. It stores every chunk received
// from in as a data blob and returns the chunk's buffer to the pool. When in
// is closed, it sends the collected result to out. out is closed when the
// goroutine ends. p may be nil.
func (w *Worker) runWriter(ctx context.Context, wg *errgroup.Group, in <-chan chunker.Chunk, out chan<- FileResult, p *Progress) {
	debug.Log("Starting writer goroutine")

	consume := func() error {
		defer close(out)

		stored := restic.IDs{}
		var added uint64

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()

			case chunk, more := <-in:
				if !more { // EOF: the reader is done, deliver the result
					out <- FileResult{
						dstBlobs:          stored,
						addedToRepository: added,
					}
					return nil
				}

				// save chunk to destination repo
				id, known, size, err := w.uploader.SaveBlob(ctx, restic.DataBlob, chunk.Data, restic.ID{}, false)
				if err != nil {
					return err
				}
				if !known {
					added += uint64(size)
					debug.Log("Stored new dst chunk %v into dstRepo", id.Str())
				}
				if p != nil {
					p.AddBlob(uint64(chunk.Length))
				}

				// recycle used buffer into bufferPool
				w.pool.Put(chunk.Data)
				stored = append(stored, id)
			}
		}
	}
	wg.Go(consume)
}
// BlobSequenceReader presents a sequence of data blobs as one continuous byte
// stream. The blobs are loaded one by one, each only when its data is needed.
type BlobSequenceReader struct {
// ctx is used for loading blobs; it is kept here because Read() cannot
// take a context.
ctx context.Context
downloader restic.BlobLoader
// blobs holds the blobs that have not been loaded yet.
blobs restic.IDs
data []byte // data of the current blob being read
buf []byte // reused buffer space
}
// NewBlobSequenceReader returns a reader over the data of blobs, in order.
// Blobs are loaded through downloader using ctx; buf is passed to every load
// as the buffer to load into.
func NewBlobSequenceReader(ctx context.Context, blobs restic.IDs, downloader restic.BlobLoader, buf []byte) *BlobSequenceReader {
return &BlobSequenceReader{
ctx: ctx,
blobs: blobs,
downloader: downloader,
buf: buf,
}
}
// Read implements io.Reader. It copies data of the current blob into p,
// loading the next blob first when the current one is exhausted. A single
// call never crosses a blob boundary. io.EOF is returned once all blobs have
// been read completely.
func (r *BlobSequenceReader) Read(p []byte) (n int, err error) {
	if len(r.data) > 0 {
		// still data left from the current blob
		n = copy(p, r.data)
		r.data = r.data[n:]
		return n, nil
	}

	// out of data; load the next blob
	if len(r.blobs) == 0 {
		return 0, io.EOF
	}

	// bring the blob data from backend
	r.data, err = r.downloader.LoadBlob(r.ctx, restic.DataBlob, r.blobs[0], r.buf)
	if err != nil {
		return 0, err
	}
	r.blobs = r.blobs[1:]

	n = copy(p, r.data)
	r.data = r.data[n:]
	return n, nil
}
// BufferPool is a bounded pool of reusable byte buffers, backed by a buffered
// channel.
type BufferPool struct {
// c holds the buffers that are currently available for reuse.
c chan []byte
}
// NewBufferPool returns an empty BufferPool that can hold up to cap buffers.
func NewBufferPool(cap int) *BufferPool {
return &BufferPool{
c: make(chan []byte, cap),
}
}
// Get returns a buffer of length zero. A pooled buffer is reused if one is
// available; otherwise a new one with capacity chunker.MaxSize is allocated.
// Get never blocks.
func (p *BufferPool) Get() []byte {
	select {
	case recycled := <-p.c:
		return recycled[:0]
	default:
	}

	debug.Log("Allocating new buffer")
	return make([]byte, 0, chunker.MaxSize)
}
// Put hands buf back to the pool for reuse. It never blocks: when the pool is
// already full, buf is simply dropped.
func (p *BufferPool) Put(buf []byte) {
select {
case p.c <- buf:
default:
debug.Log("bufferPool is full; discarding the buffer")
}
}