mirror of https://github.com/restic/restic.git, synced 2026-02-03 04:20:45 -05:00
automatically batch snapshots in copy
parent f95dc73d38
commit cf409b7c66
4 changed files with 24 additions and 27 deletions
@@ -63,11 +63,9 @@ Exit status is 12 if the password is incorrect.
 type CopyOptions struct {
 	global.SecondaryRepoOptions
 	data.SnapshotFilter
-	batch bool
 }

 func (opts *CopyOptions) AddFlags(f *pflag.FlagSet) {
-	f.BoolVar(&opts.batch, "batch", false, "batch all snapshots to be copied into one step to optimize use of packfiles")
 	opts.SecondaryRepoOptions.AddFlags(f, "destination", "to copy snapshots from")
 	initMultiSnapshotFilter(f, &opts.SnapshotFilter, true)
 }
@@ -161,7 +159,7 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts global.Options, args [

 	selectedSnapshots := collectAllSnapshots(ctx, opts, srcSnapshotLister, srcRepo, dstSnapshotByOriginal, args, printer)

-	if err := copyTreeBatched(ctx, srcRepo, dstRepo, selectedSnapshots, opts, printer); err != nil {
+	if err := copyTreeBatched(ctx, srcRepo, dstRepo, selectedSnapshots, printer); err != nil {
 		return err
 	}

@@ -187,35 +185,33 @@ func similarSnapshots(sna *data.Snapshot, snb *data.Snapshot) bool {
 	return true
 }

-// copyTreeBatched: copy multiple snapshot trees in one go, using calls to
-// repository.RepackInner() for all selected snapshot trees and thereby packing the packfiles optimally.
-// Usually each snapshot creates at least one tree packfile and one data packfile.
+// copyTreeBatched copies multiple snapshots in one go. Snapshots are written after
+// data equivalent to at least 10 packfiles was written.
 func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
-	selectedSnapshots []*data.Snapshot, opts CopyOptions, printer progress.Printer) error {
+	selectedSnapshots []*data.Snapshot, printer progress.Printer) error {

 	// remember already processed trees across all snapshots
 	visitedTrees := restic.NewIDSet()
-	batchSize := 1
-	if opts.batch {
-		batchSize = len(selectedSnapshots)
-	}

 	for len(selectedSnapshots) > 0 {
 		var batch []*data.Snapshot
+		batchSize := uint64(0)
+		targetSize := uint64(dstRepo.PackSize()) * 10
 		// call WithBlobUploader() once and then loop over all selectedSnapshots
 		err := dstRepo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaver) error {
-			for len(selectedSnapshots) > 0 && len(batch) < batchSize {
+			for len(selectedSnapshots) > 0 && batchSize < targetSize {
 				sn := selectedSnapshots[0]
 				selectedSnapshots = selectedSnapshots[1:]
 				batch = append(batch, sn)

 				printer.P("\n%v", sn)
 				printer.P(" copy started, this may take a while...")
-				err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, printer, uploader)
+				sizeBlobs, err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, printer, uploader)
 				if err != nil {
 					return err
 				}
 				debug.Log("tree copied")
+				batchSize += sizeBlobs
 			}

 			return nil
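The hunk above is the core of the change: the manual --batch flag is replaced by an automatic, size-based rule where snapshots keep being added to the current batch until the blobs written for it amount to roughly ten destination packfiles. As a rough illustration of just that rule, here is a minimal, self-contained Go sketch; the snapshot type and the batchSnapshots helper are simplified stand-ins rather than restic's API, and the per-snapshot sizes are assumed up front, whereas the real code only learns them after copyTree returns.

package main

import "fmt"

// snapshot is a simplified stand-in for restic's data.Snapshot; the size field
// is an assumption for this sketch (the real code gets it back from copyTree).
type snapshot struct {
	id   string
	size uint64 // bytes of new blobs this snapshot contributes
}

// batchSnapshots drains the queue into batches, closing a batch once the data
// written for it reaches roughly targetPacks destination packfiles, mirroring
// the "batchSize < targetSize" loop in copyTreeBatched above.
func batchSnapshots(queue []snapshot, packSize, targetPacks uint64) [][]snapshot {
	targetSize := packSize * targetPacks
	var batches [][]snapshot
	for len(queue) > 0 {
		var batch []snapshot
		written := uint64(0)
		for len(queue) > 0 && written < targetSize {
			sn := queue[0]
			queue = queue[1:]
			batch = append(batch, sn)
			written += sn.size // in the real code: batchSize += sizeBlobs
		}
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	queue := []snapshot{
		{"a", 120 << 20}, {"b", 80 << 20}, {"c", 50 << 20}, {"d", 200 << 20},
	}
	// with a 16 MiB pack size and a factor of 10, a batch closes after ~160 MiB
	for i, batch := range batchSnapshots(queue, 16<<20, 10) {
		fmt.Printf("batch %d: %d snapshots\n", i, len(batch))
	}
}

In the command itself each outer iteration corresponds to one WithBlobUploader call, and the snapshots of a batch are presumably saved once its blobs have been flushed (that part of copyTreeBatched is not shown in this hunk).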
@@ -237,7 +233,7 @@ func copyTreeBatched(ctx context.Context, srcRepo restic.Repository, dstRepo res
 }

 func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
-	visitedTrees restic.IDSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaver) error {
+	visitedTrees restic.IDSet, rootTreeID restic.ID, printer progress.Printer, uploader restic.BlobSaver) (uint64, error) {

 	wg, wgCtx := errgroup.WithContext(ctx)

@@ -281,21 +277,20 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
 	})
 	err := wg.Wait()
 	if err != nil {
-		return err
+		return 0, err
 	}

-	copyStats(srcRepo, copyBlobs, packList, printer)
+	sizeBlobs := copyStats(srcRepo, copyBlobs, packList, printer)
 	bar := printer.NewCounter("packs copied")
 	err = repository.CopyBlobs(ctx, srcRepo, dstRepo, uploader, packList, copyBlobs, bar, printer.P)
 	if err != nil {
-		return errors.Fatalf("%s", err)
+		return 0, errors.Fatalf("%s", err)
 	}
-	return nil
+	return sizeBlobs, nil
 }

 // copyStats: print statistics for the blobs to be copied
-func copyStats(srcRepo restic.Repository, copyBlobs restic.BlobSet, packList restic.IDSet, printer progress.Printer) {
-
+func copyStats(srcRepo restic.Repository, copyBlobs restic.BlobSet, packList restic.IDSet, printer progress.Printer) uint64 {
 	// count and size
 	countBlobs := 0
 	sizeBlobs := uint64(0)
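These two hunks thread the total on-disk size of the copied blobs back up the call chain: copyStats now returns the size it already computes for its log line, and copyTree forwards it as a (uint64, error) pair so copyTreeBatched can grow its batch counter. A rough, self-contained sketch of that pattern follows; the blob type and the copyOne helper are made up for illustration and are not restic's types.

package main

import (
	"errors"
	"fmt"
)

// blob is a made-up stand-in; restic tracks blobs with their packed on-disk size.
type blob struct {
	id       string
	diskSize uint64
}

// reportBlobs prints what would be copied and returns the total size,
// mirroring how copyStats now returns sizeBlobs instead of only printing it.
func reportBlobs(blobs []blob, packCount int) uint64 {
	var size uint64
	for _, b := range blobs {
		size += b.diskSize
	}
	fmt.Printf("  copy %d blobs with disk size %d bytes in %d packfiles\n", len(blobs), size, packCount)
	return size
}

// copyOne stands in for copyTree: it now returns the copied size alongside the error.
func copyOne(blobs []blob) (uint64, error) {
	if len(blobs) == 0 {
		return 0, errors.New("nothing to copy")
	}
	return reportBlobs(blobs, 1), nil
}

func main() {
	size, err := copyOne([]blob{{"b1", 1 << 20}, {"b2", 3 << 20}})
	if err != nil {
		panic(err)
	}
	fmt.Println("copied bytes:", size) // feeds the batch counter in the real code
}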
@@ -309,6 +304,7 @@ func copyStats(srcRepo restic.Repository, copyBlobs restic.BlobSet, packList res

 	printer.V(" copy %d blobs with disk size %s in %d packfiles\n",
 		countBlobs, ui.FormatBytes(uint64(sizeBlobs)), len(packList))
+	return sizeBlobs
 }

 func copySaveSnapshot(ctx context.Context, sn *data.Snapshot, dstRepo restic.Repository, printer progress.Printer) error {
@@ -105,7 +105,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsed
 	if repo.Config().Version < 2 && opts.RepackUncompressed {
 		return nil, fmt.Errorf("compression requires at least repository format version 2")
 	}
-	if opts.SmallPackBytes > uint64(repo.packSize()) {
+	if opts.SmallPackBytes > uint64(repo.PackSize()) {
 		return nil, fmt.Errorf("repack-smaller-than exceeds repository packsize")
 	}

@@ -329,12 +329,12 @@ func decidePackAction(ctx context.Context, opts PruneOptions, repo *Repository,
 	var repackSmallCandidates []packInfoWithID
 	repoVersion := repo.Config().Version
 	// only repack very small files by default
-	targetPackSize := repo.packSize() / 25
+	targetPackSize := repo.PackSize() / 25
 	if opts.SmallPackBytes > 0 {
 		targetPackSize = uint(opts.SmallPackBytes)
 	} else if opts.RepackSmall {
 		// consider files with at least 80% of the target size as large enough
-		targetPackSize = repo.packSize() / 5 * 4
+		targetPackSize = repo.PackSize() / 5 * 4
 	}

 	// loop over all packs and decide what to do
@@ -154,8 +154,8 @@ func (r *Repository) Config() restic.Config {
 	return r.cfg
 }

-// packSize return the target size of a pack file when uploading
-func (r *Repository) packSize() uint {
+// PackSize return the target size of a pack file when uploading
+func (r *Repository) PackSize() uint {
 	return r.opts.PackSize
 }

@@ -590,8 +590,8 @@ func (r *Repository) startPackUploader(ctx context.Context, wg *errgroup.Group)
 	innerWg, ctx := errgroup.WithContext(ctx)
 	r.packerWg = innerWg
 	r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections())
-	r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker)
-	r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.packerCount, r.uploader.QueuePacker)
+	r.treePM = newPackerManager(r.key, restic.TreeBlob, r.PackSize(), r.packerCount, r.uploader.QueuePacker)
+	r.dataPM = newPackerManager(r.key, restic.DataBlob, r.PackSize(), r.packerCount, r.uploader.QueuePacker)

 	wg.Go(func() error {
 		return innerWg.Wait()
@@ -18,6 +18,7 @@ type Repository interface {
 	// Connections returns the maximum number of concurrent backend operations
 	Connections() uint
 	Config() Config
+	PackSize() uint
 	Key() *crypto.Key

 	LoadIndex(ctx context.Context, p TerminalCounterFactory) error
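The last two files make the pack size reachable from the copy command: the repository's unexported packSize() becomes the exported PackSize(), and the restic.Repository interface gains the method so callers that only hold the interface can size their batches from it. Below is a minimal sketch of that shape, with a heavily trimmed stand-in interface rather than restic's real one.

package main

import "fmt"

// Repository is a heavily trimmed stand-in for restic's repository interface;
// the real interface has many more methods.
type Repository interface {
	PackSize() uint
}

type repo struct {
	packSize uint
}

// PackSize is exported so code outside the repository package, such as the
// copy command, can read the target pack size through the interface.
func (r *repo) PackSize() uint {
	return r.packSize
}

func main() {
	var dst Repository = &repo{packSize: 16 << 20}
	// the copy command closes a batch after roughly ten packfiles worth of data
	targetSize := uint64(dst.PackSize()) * 10
	fmt.Println("batch target size in bytes:", targetSize)
}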