restic/internal/dump/common.go
Michael Eischer 350f29d921 data: replace Tree with TreeNodeIterator
The TreeNodeIterator decodes nodes while iterating over a tree blob.
This should reduce peak memory usage as now only the serialized tree
blob and a single node have to be alive at the same time. Using the
iterator has implications for the error handling however. Now it is
necessary that all loops that iterate through a tree check for errors
before using the node returned by the iterator.

The other change is that it is no longer possible to iterate over a tree
multiple times. Instead it must be loaded a second time. This only
affects the tree rewriting code.
2026-01-31 20:03:38 +01:00

163 lines
3.4 KiB
Go

package dump
import (
"context"
"io"
"path"
"github.com/restic/restic/internal/bloblru"
"github.com/restic/restic/internal/data"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/walker"
"golang.org/x/sync/errgroup"
)
// A Dumper writes trees and files from a repository to a Writer
// in an archive format.
//
// Instances are created with New. DumpTree emits a whole tree in the
// configured archive format, while WriteNode writes the contents of a
// single file node without any archive framing.
type Dumper struct {
// cache is consulted (via GetOrCompute, keyed by blob ID) before a data
// blob is loaded from repo; see writeNode.
cache *bloblru.Cache
// format selects the archive writer used by DumpTree; the values
// handled there are "tar" and "zip".
format string
// repo is the source that blobs are loaded from. Its Connections value
// also bounds the number of concurrent loads in writeNode.
repo restic.Loader
// w is the destination Writer; WriteNode writes file contents to it
// directly.
w io.Writer
}
// New creates a Dumper that loads data from repo and writes to w.
//
// format names the archive format used by DumpTree; only "tar" and "zip"
// are handled there, and any other value makes DumpTree panic. Each Dumper
// gets its own blob cache, constructed with a size argument of 64 << 20.
func New(format string, repo restic.Loader, w io.Writer) *Dumper {
	d := new(Dumper)
	d.cache = bloblru.New(64 << 20)
	d.format = format
	d.repo = repo
	d.w = w
	return d
}
// DumpTree dumps the nodes yielded by tree, together with everything below
// them, in the archive format configured for d.
//
// rootPath is joined in front of the name of every top-level node. The work
// is split between two goroutines connected by a channel: one walks the tree
// and sends nodes, the other consumes them with the tar or zip writer.
// DumpTree returns the first error reported by either goroutine, or nil.
//
// It panics (inside the consuming goroutine) if d's format is neither "tar"
// nor "zip".
func (d *Dumper) DumpTree(ctx context.Context, tree data.TreeNodeIterator, rootPath string) error {
	g, gctx := errgroup.WithContext(ctx)

	// The buffer smooths over differences between how fast nodes are
	// discovered and how fast they can be written out.
	nodes := make(chan *data.Node, 10)

	// Producer: walk the tree and feed nodes into the channel.
	g.Go(func() error {
		return sendTrees(gctx, d.repo, tree, rootPath, nodes)
	})

	// Consumer: turn the received nodes into archive entries.
	g.Go(func() error {
		if d.format == "tar" {
			return d.dumpTar(gctx, nodes)
		}
		if d.format == "zip" {
			return d.dumpZip(gctx, nodes)
		}
		panic("unknown dump format")
	})

	return g.Wait()
}
// sendTrees sends every node yielded by nodes, each followed by its
// descendants, on ch.
//
// The Path of each top-level node is set to rootPath joined with the node's
// name before it is sent. Iteration stops at the first item that carries an
// error, or at the first error from sendNodes, and that error is returned.
// ch is closed when sendTrees returns, whether or not an error occurred.
func sendTrees(ctx context.Context, repo restic.BlobLoader, nodes data.TreeNodeIterator, rootPath string, ch chan *data.Node) error {
	defer close(ch)

	for entry := range nodes {
		// The iterator reports decoding problems per item; the node must
		// not be touched before the error has been checked.
		if err := entry.Error; err != nil {
			return err
		}

		top := entry.Node
		top.Path = path.Join(rootPath, top.Name)

		err := sendNodes(ctx, repo, top, ch)
		if err != nil {
			return err
		}
	}

	return nil
}
// sendNodes sends root on ch and, if root is a directory, walks its subtree
// and sends the files, directories and symlinks found there as well.
//
// Every walked node gets its Path set to root.Path joined with the path
// reported by the walker; nodes of other types are skipped after that.
// Each send is abandoned with ctx.Err() once ctx is done. Errors passed to
// the visitor by the walker are returned unchanged.
func sendNodes(ctx context.Context, repo restic.BlobLoader, root *data.Node, ch chan *data.Node) error {
	// emit hands a single node to the consumer, giving up when ctx is done.
	emit := func(n *data.Node) error {
		select {
		case ch <- n:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	if err := emit(root); err != nil {
		return err
	}

	// Only directories have children to descend into.
	if root.Type != data.NodeTypeDir {
		return nil
	}

	visit := func(_ restic.ID, nodepath string, node *data.Node, err error) error {
		switch {
		case err != nil:
			return err
		case node == nil:
			return nil
		}

		node.Path = path.Join(root.Path, nodepath)

		switch node.Type {
		case data.NodeTypeFile, data.NodeTypeDir, data.NodeTypeSymlink:
			return emit(node)
		default:
			// Any other node type is not forwarded to the consumer.
			return nil
		}
	}

	// NOTE(review): root.Subtree is dereferenced without a nil check; a
	// directory node lacking a subtree ID would panic here — confirm that
	// callers rule this out.
	return walker.Walk(ctx, repo, *root.Subtree, walker.WalkVisitor{ProcessNode: visit})
}
// WriteNode writes a file node's contents directly to d's Writer,
// without caring about d's format.
//
// It delegates to writeNode with d.w as the destination and returns the
// error reported by that call.
func (d *Dumper) WriteNode(ctx context.Context, node *data.Node) error {
return d.writeNode(ctx, d.w, node)
}
// writeNode writes the blobs listed in node.Content to w, in that order.
//
// Blobs are fetched by short-lived loader goroutines, one per blob, while a
// single writer goroutine writes them out. Ordering is preserved by queueing
// one result channel per blob on blobs in content order; the writer receives
// from these channels in the order they were queued.
//
// The returned error is the result of the errgroup's Wait, i.e. the first
// error returned by the writer or by one of the loaders, or nil.
func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *data.Node) error {
wg, ctx := errgroup.WithContext(ctx)
// Bound the number of concurrently running loaders by the value the
// repository reports via Connections. Once the limit is reached, wg.Go
// blocks until a running goroutine has finished.
limit := int(d.repo.Connections())
wg.SetLimit(1 + limit) // +1 for the writer.
// blobs carries the per-blob result channels from the loop below to the
// writer, in content order.
blobs := make(chan (<-chan []byte), limit)
// Writer.
wg.Go(func() error {
for ch := range blobs {
select {
// A done ctx also covers a failed loader: such a loader returns its
// error without sending on ch, and the errgroup then cancels ctx.
case <-ctx.Done():
return ctx.Err()
case blob := <-ch:
if _, err := w.Write(blob); err != nil {
return err
}
}
}
// blobs was closed and drained: every queued blob has been written.
return nil
})
// Start short-lived goroutines to load blobs.
loop:
for _, id := range node.Content {
// This needs to be buffered, so that loaders can quit
// without waiting for the writer.
ch := make(chan []byte, 1)
wg.Go(func() error {
blob, err := d.cache.GetOrCompute(id, func() ([]byte, error) {
return d.repo.LoadBlob(ctx, restic.DataBlob, id, nil)
})
// Only a successfully loaded blob is handed to the writer; on failure
// the returned error is what notifies the errgroup.
if err == nil {
ch <- blob
}
return err
})
// Queue the result channel for the writer, or stop queueing further
// blobs once ctx is done.
select {
case blobs <- ch:
case <-ctx.Done():
break loop
}
}
// Closing blobs ends the writer's range loop after the queued channels
// have been processed.
close(blobs)
return wg.Wait()
}