mirror of
https://github.com/restic/restic.git
synced 2026-05-28 04:35:41 -04:00
Refactor: Change rechunker.Index to interface
Change rechunker.Index to interface type, so that the index is expandable to custom types
This commit is contained in:
parent
b431cd08b7
commit
eb8e127dcd
4 changed files with 55 additions and 29 deletions
|
|
@ -15,7 +15,7 @@ type BlobCache struct {
|
|||
mu sync.RWMutex
|
||||
c *simplelru.LRU[restic.ID, []byte]
|
||||
|
||||
idx *Index
|
||||
idx Index
|
||||
|
||||
free, size int
|
||||
|
||||
|
|
@ -31,7 +31,7 @@ type BlobCache struct {
|
|||
const overhead = len(restic.ID{}) + 64
|
||||
|
||||
func NewBlobCache(ctx context.Context, size int, numDownloaders int,
|
||||
repo PackLoader, idx *Index,
|
||||
repo PackLoader, idx Index,
|
||||
onReady func(blobIDs restic.IDs), onEvict func(blobIDs restic.IDs)) *BlobCache {
|
||||
if size < 32*(1<<20) {
|
||||
panic("Blob cache size should be at least 32 MiB!!")
|
||||
|
|
@ -115,7 +115,7 @@ func (c *BlobCache) startDownloaders(ctx context.Context, numDownloaders int,
|
|||
// filter out ignored blobs
|
||||
c.mu.RLock()
|
||||
var filtered []restic.Blob
|
||||
for _, blob := range c.idx.PackToBlobs[packID] {
|
||||
for _, blob := range c.idx.PackToBlobs(packID) {
|
||||
ignored := c.ignored.Has(blob.ID)
|
||||
ready := c.c.Contains(blob.ID)
|
||||
if !ignored && !ready {
|
||||
|
|
@ -260,13 +260,13 @@ func (c *BlobCache) asyncGet(ctx context.Context, id restic.ID, buf []byte) <-ch
|
|||
}
|
||||
|
||||
func (c *BlobCache) requestDownload(ctx context.Context, id restic.ID) error {
|
||||
packID, ok := c.idx.BlobToPack[id]
|
||||
if !ok {
|
||||
packID := c.idx.BlobToPack(id)
|
||||
if packID.IsNull() {
|
||||
return fmt.Errorf("unknown blob: %v", id.Str())
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
ok = c.waitList.Has(packID)
|
||||
ok := c.waitList.Has(packID)
|
||||
if !ok {
|
||||
// queue pack download
|
||||
c.waitList.Insert(packID)
|
||||
|
|
@ -332,7 +332,7 @@ type PackLoader interface {
|
|||
LoadBlobsFromPack(context.Context, restic.ID, []restic.Blob, func(restic.BlobHandle, []byte, error) error) error
|
||||
}
|
||||
|
||||
func WrapWithCache(ctx context.Context, repo PackLoader, cacheSize int, numDownloaders int, idx *Index,
|
||||
func WrapWithCache(ctx context.Context, repo PackLoader, cacheSize int, numDownloaders int, idx Index,
|
||||
onReady, onEvict func(restic.IDs)) (*BlobLoaderWithCache, *BlobCache) {
|
||||
r := &BlobLoaderWithCache{
|
||||
repo: repo,
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
|
||||
type Rechunker struct {
|
||||
cfg Config
|
||||
idx *Index
|
||||
idx Index
|
||||
|
||||
filesList []*ChunkedFile
|
||||
totalSize uint64
|
||||
|
|
@ -41,11 +41,11 @@ type ChunkedFile struct {
|
|||
hashval restic.ID
|
||||
}
|
||||
|
||||
// Index is immutable after Plan() returns.
|
||||
type Index struct {
|
||||
BlobSize map[restic.ID]uint // blob ID -> blob size
|
||||
BlobToPack map[restic.ID]restic.ID // blob ID -> pack ID
|
||||
PackToBlobs map[restic.ID][]restic.Blob // pack ID -> list of blobs to be loaded from the pack
|
||||
type Index interface {
|
||||
BlobSize(blobID restic.ID) (size uint)
|
||||
BlobToPack(blobID restic.ID) (packID restic.ID)
|
||||
PackToBlobs(packID restic.ID) (blobs []restic.Blob)
|
||||
Packs() (packIDs restic.IDSet)
|
||||
}
|
||||
|
||||
func NewRechunker(cfg Config) *Rechunker {
|
||||
|
|
@ -129,7 +129,33 @@ func gatherFileContents(ctx context.Context, repo restic.Loader, rootTrees resti
|
|||
return filesList, totalSize, nil
|
||||
}
|
||||
|
||||
func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id restic.ID) []restic.PackedBlob) (*Index, error) {
|
||||
type index struct {
|
||||
blobSize map[restic.ID]uint // blob ID -> blob size
|
||||
blobIdx map[restic.ID]restic.ID // blob ID -> pack ID
|
||||
packIdx map[restic.ID][]restic.Blob // pack ID -> list of blobs to be loaded from the pack
|
||||
}
|
||||
|
||||
func (i *index) BlobSize(id restic.ID) uint {
|
||||
return i.blobSize[id]
|
||||
}
|
||||
|
||||
func (i *index) BlobToPack(id restic.ID) restic.ID {
|
||||
return i.blobIdx[id]
|
||||
}
|
||||
|
||||
func (i *index) PackToBlobs(id restic.ID) []restic.Blob {
|
||||
return i.packIdx[id]
|
||||
}
|
||||
|
||||
func (i *index) Packs() restic.IDSet {
|
||||
ids := restic.NewIDSet()
|
||||
for id := range i.packIdx {
|
||||
ids.Insert(id)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id restic.ID) []restic.PackedBlob) (Index, error) {
|
||||
// collect blob usage info
|
||||
blobCount := map[restic.ID]int{}
|
||||
for _, file := range filesList {
|
||||
|
|
@ -145,8 +171,8 @@ func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id
|
|||
|
||||
// build blob lookup info
|
||||
blobSize := map[restic.ID]uint{}
|
||||
blobToPack := map[restic.ID]restic.ID{}
|
||||
packToBlobs := map[restic.ID][]restic.Blob{}
|
||||
blobIdx := map[restic.ID]restic.ID{}
|
||||
packIdx := map[restic.ID][]restic.Blob{}
|
||||
for blob := range blobCount {
|
||||
packs := lookupBlob(restic.DataBlob, blob)
|
||||
if len(packs) == 0 {
|
||||
|
|
@ -155,14 +181,14 @@ func createIndex(filesList []*ChunkedFile, lookupBlob func(t restic.BlobType, id
|
|||
pb := packs[0]
|
||||
|
||||
blobSize[pb.Blob.ID] = pb.DataLength()
|
||||
blobToPack[pb.Blob.ID] = pb.PackID
|
||||
packToBlobs[pb.PackID] = append(packToBlobs[pb.PackID], pb.Blob)
|
||||
blobIdx[pb.Blob.ID] = pb.PackID
|
||||
packIdx[pb.PackID] = append(packIdx[pb.PackID], pb.Blob)
|
||||
}
|
||||
|
||||
idx := &Index{
|
||||
BlobSize: blobSize,
|
||||
BlobToPack: blobToPack,
|
||||
PackToBlobs: packToBlobs,
|
||||
idx := &index{
|
||||
blobSize: blobSize,
|
||||
blobIdx: blobIdx,
|
||||
packIdx: packIdx,
|
||||
}
|
||||
|
||||
return idx, nil
|
||||
|
|
@ -373,7 +399,7 @@ func (rc *Rechunker) TotalSize() uint64 {
|
|||
}
|
||||
|
||||
func (rc *Rechunker) PackCount() int {
|
||||
return len(rc.idx.PackToBlobs)
|
||||
return len(rc.idx.Packs())
|
||||
}
|
||||
|
||||
func (rc *Rechunker) TotalAddedToDstRepo() uint64 {
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import (
|
|||
type Scheduler struct {
|
||||
mu sync.Mutex
|
||||
|
||||
idx *Index
|
||||
idx Index
|
||||
|
||||
regularCh <-chan *ChunkedFile
|
||||
priorityCh <-chan *ChunkedFile
|
||||
|
|
@ -31,7 +31,7 @@ type Scheduler struct {
|
|||
done chan struct{}
|
||||
}
|
||||
|
||||
func NewScheduler(ctx context.Context, files []*ChunkedFile, idx *Index, usePriority bool) *Scheduler {
|
||||
func NewScheduler(ctx context.Context, files []*ChunkedFile, idx Index, usePriority bool) *Scheduler {
|
||||
debug.Log(("Running NewScheduler()"))
|
||||
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
|
@ -281,7 +281,7 @@ func (s *Scheduler) SetObsoleteBlobCallback(cb func(restic.IDs)) {
|
|||
// ReadProgress computes progress of cursor for a file, while inferring src blob consumption and using that info to track blob usage.
|
||||
func (s *Scheduler) ReadProgress(cursor Cursor, bytesProcessed uint) (Cursor, error) {
|
||||
start := cursor
|
||||
end, err := AdvanceCursor(cursor, bytesProcessed, s.idx.BlobSize)
|
||||
end, err := AdvanceCursor(start, bytesProcessed, s.idx.BlobSize)
|
||||
if err != nil {
|
||||
return Cursor{}, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -235,10 +235,10 @@ type Cursor struct {
|
|||
Offset uint
|
||||
}
|
||||
|
||||
func AdvanceCursor(c Cursor, numBytes uint, blobSizes map[restic.ID]uint) (Cursor, error) {
|
||||
func AdvanceCursor(c Cursor, numBytes uint, blobSizes func(restic.ID) uint) (Cursor, error) {
|
||||
for c.BlobIdx < len(c.blobs) {
|
||||
blobSize, ok := blobSizes[c.blobs[c.BlobIdx]]
|
||||
if !ok {
|
||||
blobSize := blobSizes(c.blobs[c.BlobIdx])
|
||||
if blobSize == 0 {
|
||||
return Cursor{}, fmt.Errorf("blob %v not in blobSizes", c.blobs[c.BlobIdx].Str())
|
||||
}
|
||||
r := blobSize - c.Offset
|
||||
|
|
|
|||
Loading…
Reference in a new issue