mirror of
https://github.com/restic/restic.git
synced 2026-05-28 04:35:41 -04:00
feat(backends/s3): add warmup support for check command
Follow-up from https://github.com/restic/restic/pull/5173 See also https://github.com/restic/restic/issues/3202
This commit is contained in:
parent
f000da3b35
commit
ae7aad82cd
8 changed files with 98 additions and 15 deletions
5
changelog/unreleased/issue-3202
Normal file
5
changelog/unreleased/issue-3202
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
Enhancement: Add warmup support for check command on S3 backend
|
||||
|
||||
https://github.com/restic/restic/pull/5248
|
||||
https://github.com/restic/restic/issues/3202
|
||||
https://github.com/restic/restic/issues/2504
|
||||
|
|
@ -383,10 +383,9 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts global.Options, args
|
|||
}
|
||||
|
||||
if readDataFilter != nil {
|
||||
p := printer.NewCounter("packs")
|
||||
errChan := make(chan error)
|
||||
|
||||
go chkr.ReadPacks(ctx, readDataFilter, p, errChan)
|
||||
go chkr.ReadPacks(ctx, readDataFilter, printer, errChan)
|
||||
|
||||
for err := range errChan {
|
||||
errorsFound = true
|
||||
|
|
@ -396,7 +395,6 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts global.Options, args
|
|||
salvagePacks.Insert(err.PackID)
|
||||
}
|
||||
}
|
||||
p.Done()
|
||||
}
|
||||
|
||||
if len(salvagePacks) > 0 {
|
||||
|
|
|
|||
|
|
@ -291,6 +291,7 @@ Archive** storage classes is available:
|
|||
- Currently, only the following commands are known to work:
|
||||
|
||||
- ``backup``
|
||||
- ``check`
|
||||
- ``copy``
|
||||
- ``prune``
|
||||
- ``restore``
|
||||
|
|
|
|||
|
|
@ -293,10 +293,10 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er
|
|||
// with an unmodified parameter list
|
||||
// Otherwise it calculates the packfiles needed, gets their sizes from the full
|
||||
// packfile set and submits them to repository.ReadPacks()
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, printer progress.Printer, errChan chan<- error) {
|
||||
// no snapshot filtering, pass through
|
||||
if !c.IsFiltered() {
|
||||
c.Checker.ReadPacks(ctx, filter, p, errChan)
|
||||
c.Checker.ReadPacks(ctx, filter, printer, errChan)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -315,5 +315,5 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID
|
|||
return filter(filteredPacks)
|
||||
}
|
||||
|
||||
c.Checker.ReadPacks(ctx, packfileFilter, p, errChan)
|
||||
c.Checker.ReadPacks(ctx, packfileFilter, printer, errChan)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -18,10 +19,12 @@ import (
|
|||
"github.com/restic/restic/internal/checker"
|
||||
"github.com/restic/restic/internal/data"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/repository/hashing"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
)
|
||||
|
||||
var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz")
|
||||
|
|
@ -61,7 +64,7 @@ func checkData(chkr *checker.Checker) []error {
|
|||
func(ctx context.Context, errCh chan<- error) {
|
||||
chkr.ReadPacks(ctx, func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, nil, errCh)
|
||||
}, &progress.NoopPrinter{}, errCh)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
|
@ -434,6 +437,61 @@ func TestCheckerModifiedData(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// warmupBackend simulates a backend where all handles needs to be warmed up.
|
||||
type warmupBackend struct {
|
||||
backend.Backend
|
||||
handlesToWarmup []backend.Handle
|
||||
handlesAwaited []backend.Handle
|
||||
}
|
||||
|
||||
func (be *warmupBackend) Warmup(ctx context.Context, h []backend.Handle) ([]backend.Handle, error) {
|
||||
if be.handlesToWarmup == nil {
|
||||
be.handlesToWarmup = []backend.Handle{}
|
||||
}
|
||||
be.handlesToWarmup = append(be.handlesToWarmup, h...)
|
||||
return h, nil
|
||||
}
|
||||
|
||||
func (be *warmupBackend) WarmupWait(ctx context.Context, h []backend.Handle) error {
|
||||
if be.handlesAwaited == nil {
|
||||
be.handlesAwaited = []backend.Handle{}
|
||||
}
|
||||
be.handlesAwaited = append(be.handlesAwaited, h...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCheckerWarmup(t *testing.T) {
|
||||
defer feature.TestSetFlag(t, feature.Flag, feature.S3Restore, true)()
|
||||
|
||||
repo, _, be := repository.TestRepositoryWithVersion(t, 0)
|
||||
_ = archiver.TestSnapshot(t, repo, ".", nil)
|
||||
wBackend := &warmupBackend{Backend: be}
|
||||
checkRepo := repository.TestOpenBackend(t, wBackend)
|
||||
chkr := checker.New(checkRepo, false)
|
||||
|
||||
_, errs := chkr.LoadIndex(context.TODO(), nil)
|
||||
if len(errs) > 0 {
|
||||
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if err := chkr.LoadSnapshots(context.TODO(), &data.SnapshotFilter{}, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
errs = checkData(chkr)
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("expected no data error, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if len(wBackend.handlesToWarmup) == 0 {
|
||||
t.Errorf("found no handles to warmup")
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(wBackend.handlesToWarmup, wBackend.handlesAwaited) {
|
||||
t.Errorf("expected to wait for all cold handles")
|
||||
}
|
||||
}
|
||||
|
||||
// loadTreesOnceRepository allows each tree to be loaded only once
|
||||
type loadTreesOnceRepository struct {
|
||||
*repository.Repository
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/restic/restic/internal/data"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
)
|
||||
|
||||
// TestCheckRepo runs the checker on repo.
|
||||
|
|
@ -55,7 +56,7 @@ func TestCheckRepo(t testing.TB, repo checkerRepository) {
|
|||
errChan = make(chan error)
|
||||
go chkr.ReadPacks(context.TODO(), func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, nil, errChan)
|
||||
}, &progress.NoopPrinter{}, errChan)
|
||||
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
"github.com/restic/restic/internal/repository/index"
|
||||
"github.com/restic/restic/internal/repository/pack"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
|
@ -199,7 +200,7 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
|
|||
}
|
||||
|
||||
// ReadPacks loads data from specified packs and checks the integrity.
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
|
||||
func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID]int64) map[restic.ID]int64, printer progress.Printer, errChan chan<- error) {
|
||||
defer close(errChan)
|
||||
|
||||
// compute pack size using index entries
|
||||
|
|
@ -209,7 +210,30 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID
|
|||
return
|
||||
}
|
||||
packs = filter(packs)
|
||||
|
||||
p := printer.NewCounter("packs")
|
||||
p.SetMax(uint64(len(packs)))
|
||||
defer p.Done()
|
||||
|
||||
packSet := restic.NewIDSet()
|
||||
for pack := range packs {
|
||||
packSet.Insert(pack)
|
||||
}
|
||||
|
||||
if feature.Flag.Enabled(feature.S3Restore) {
|
||||
job, err := c.repo.StartWarmup(ctx, packSet)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
if job.HandleCount() != 0 {
|
||||
printer.P("warming up %d packs from cold storage, this may take a while...", job.HandleCount())
|
||||
if err := job.Wait(ctx); err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
type checkTask struct {
|
||||
|
|
@ -258,11 +282,6 @@ func (c *Checker) ReadPacks(ctx context.Context, filter func(packs map[restic.ID
|
|||
})
|
||||
}
|
||||
|
||||
packSet := restic.NewIDSet()
|
||||
for pack := range packs {
|
||||
packSet.Insert(pack)
|
||||
}
|
||||
|
||||
// push packs to ch
|
||||
for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) {
|
||||
size := packs[pbs.PackID]
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/restic/restic/internal/crypto"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
|
||||
"github.com/restic/chunker"
|
||||
)
|
||||
|
|
@ -188,7 +189,7 @@ func TestCheckRepo(t testing.TB, repo *Repository) {
|
|||
errChan = make(chan error)
|
||||
go chkr.ReadPacks(context.TODO(), func(packs map[restic.ID]int64) map[restic.ID]int64 {
|
||||
return packs
|
||||
}, nil, errChan)
|
||||
}, &progress.NoopPrinter{}, errChan)
|
||||
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
|
|
|
|||
Loading…
Reference in a new issue