index: convert to modern Go iterators

This commit is contained in:
Michael Eischer 2025-10-10 23:16:20 +02:00
parent 0624b656b8
commit 38c543457e
11 changed files with 125 additions and 102 deletions

View file

@ -73,9 +73,13 @@ func runList(ctx context.Context, gopts global.Options, args []string, term ui.T
if err != nil {
return err
}
return idx.Each(ctx, func(blobs restic.PackedBlob) {
for blobs := range idx.Values() {
if ctx.Err() != nil {
return ctx.Err()
}
printer.S("%v %v", blobs.Type, blobs.ID)
})
}
return nil
})
default:
return errors.Fatal("invalid type")

View file

@ -92,17 +92,20 @@ func (c *Checker) LoadIndex(ctx context.Context, p restic.TerminalCounterFactory
debug.Log("process blobs")
cnt := 0
err = idx.Each(ctx, func(blob restic.PackedBlob) {
for blob := range idx.Values() {
if ctx.Err() != nil {
return ctx.Err()
}
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
packToIndex[blob.PackID] = restic.NewIDSet()
}
packToIndex[blob.PackID].Insert(id)
})
}
debug.Log("%d blobs processed", cnt)
return err
return nil
})
if err != nil {
// failed to load the index

View file

@ -1,7 +1,6 @@
package index
import (
"context"
"sort"
"github.com/restic/restic/internal/restic"
@ -120,17 +119,17 @@ func (a *AssociatedSet[T]) For(cb func(bh restic.BlobHandle, val T)) {
cb(k, v)
}
_ = a.idx.Each(context.Background(), func(pb restic.PackedBlob) {
for pb := range a.idx.Values() {
if _, ok := a.overflow[pb.BlobHandle]; ok {
// already reported via overflow set
return
continue
}
val, known := a.Get(pb.BlobHandle)
if known {
cb(pb.BlobHandle, val)
}
})
}
}
// List returns a sorted slice of all BlobHandle in the set.

View file

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"io"
"iter"
"math"
"sync"
"time"
@ -169,9 +170,9 @@ func (idx *Index) Lookup(bh restic.BlobHandle, pbs []restic.PackedBlob) []restic
idx.m.RLock()
defer idx.m.RUnlock()
idx.byType[bh.Type].foreachWithID(bh.ID, func(e *indexEntry) {
for e := range idx.byType[bh.Type].valuesWithID(bh.ID) {
pbs = append(pbs, idx.toPackedBlob(e, bh.Type))
})
}
return pbs
}
@ -200,23 +201,22 @@ func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found
return uint(crypto.PlaintextLength(int(e.length))), true
}
// Each passes all blobs known to the index to the callback fn. This blocks any
// Values returns an iterator over all blobs known to the index. This blocks any
// modification of the index.
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
idx.m.RLock()
defer idx.m.RUnlock()
func (idx *Index) Values() iter.Seq[restic.PackedBlob] {
return func(yield func(restic.PackedBlob) bool) {
idx.m.RLock()
defer idx.m.RUnlock()
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
if ctx.Err() != nil {
return false
for typ := range idx.byType {
m := &idx.byType[typ]
for e := range m.values() {
if !yield(idx.toPackedBlob(e, restic.BlobType(typ))) {
return
}
}
fn(idx.toPackedBlob(e, restic.BlobType(typ)))
return true
})
}
}
return ctx.Err()
}
type EachByPackResult struct {
@ -244,15 +244,14 @@ func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
for e := range m.values() {
packID := idx.packs[e.packIndex]
if !idx.final || !packBlacklist.Has(packID) {
v := byPack[packID]
v[typ] = append(v[typ], e)
byPack[packID] = v
}
return true
})
}
}
for packID, packByType := range byPack {
@ -309,7 +308,7 @@ func (idx *Index) generatePackList() ([]packJSON, error) {
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
for e := range m.values() {
packID := idx.packs[e.packIndex]
if packID.IsNull() {
panic("null pack id")
@ -331,9 +330,7 @@ func (idx *Index) generatePackList() ([]packJSON, error) {
Length: uint(e.length),
UncompressedLength: uint(e.uncompressedLength),
})
return true
})
}
}
return list, nil
@ -475,23 +472,23 @@ func (idx *Index) merge(idx2 *Index) error {
// helper func to test if identical entry is contained in idx
hasIdenticalEntry := func(e2 *indexEntry) (found bool) {
m.foreachWithID(e2.id, func(e *indexEntry) {
for e := range m.valuesWithID(e2.id) {
b := idx.toPackedBlob(e, restic.BlobType(typ))
b2 := idx2.toPackedBlob(e2, restic.BlobType(typ))
if b == b2 {
found = true
break
}
})
}
return found
}
m2.foreach(func(e2 *indexEntry) bool {
for e2 := range m2.values() {
if !hasIdenticalEntry(e2) {
// packIndex needs to be changed as idx2.pack was appended to idx.pack, see above
m.add(e2.id, e2.packIndex+packlen, e2.offset, e2.length, e2.uncompressedLength)
}
return true
})
}
}
idx.ids = append(idx.ids, idx2.ids...)

View file

@ -325,11 +325,11 @@ func TestIndexUnserialize(t *testing.T) {
}
func listPack(t testing.TB, idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) {
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
for pb := range idx.Values() {
if pb.PackID.Equal(id) {
pbs = append(pbs, pb)
}
}))
}
return pbs
}

View file

@ -2,6 +2,7 @@ package index
import (
"hash/maphash"
"iter"
"github.com/restic/restic/internal/restic"
)
@ -53,31 +54,37 @@ func (m *indexMap) add(id restic.ID, packIdx int, offset, length uint32, uncompr
m.numentries++
}
// foreach calls fn for all entries in the map, until fn returns false.
func (m *indexMap) foreach(fn func(*indexEntry) bool) {
blockCount := m.blockList.Size()
for i := uint(1); i < blockCount; i++ {
if !fn(m.resolve(i)) {
return
// values returns an iterator over all entries in the map.
func (m *indexMap) values() iter.Seq[*indexEntry] {
return func(yield func(*indexEntry) bool) {
blockCount := m.blockList.Size()
for i := uint(1); i < blockCount; i++ {
if !yield(m.resolve(i)) {
return
}
}
}
}
// foreachWithID calls fn for all entries with the given id.
func (m *indexMap) foreachWithID(id restic.ID, fn func(*indexEntry)) {
if len(m.buckets) == 0 {
return
}
h := m.hash(id)
ei := m.buckets[h]
for ei != 0 {
e := m.resolve(ei)
ei = e.next
if e.id != id {
continue
// valuesWithID returns an iterator over all entries with the given id.
func (m *indexMap) valuesWithID(id restic.ID) iter.Seq[*indexEntry] {
return func(yield func(*indexEntry) bool) {
if len(m.buckets) == 0 {
return
}
h := m.hash(id)
ei := m.buckets[h]
for ei != 0 {
e := m.resolve(ei)
ei = e.next
if e.id != id {
continue
}
if !yield(e) {
return
}
}
fn(e)
}
}

View file

@ -36,7 +36,9 @@ func TestIndexMapForeach(t *testing.T) {
var m indexMap
// Don't crash on empty map.
m.foreach(func(*indexEntry) bool { return true })
for range m.values() {
// empty iteration
}
for i := 0; i < N; i++ {
var id restic.ID
@ -45,7 +47,7 @@ func TestIndexMapForeach(t *testing.T) {
}
seen := make(map[int]struct{})
m.foreach(func(e *indexEntry) bool {
for e := range m.values() {
i := int(e.id[0])
rtest.Assert(t, i < N, "unknown id %v in indexMap", e.id)
rtest.Equals(t, i, e.packIndex)
@ -54,16 +56,15 @@ func TestIndexMapForeach(t *testing.T) {
rtest.Equals(t, i/2, int(e.uncompressedLength))
seen[i] = struct{}{}
return true
})
}
rtest.Equals(t, N, len(seen))
ncalls := 0
m.foreach(func(*indexEntry) bool {
for range m.values() {
ncalls++
return false
})
break
}
rtest.Equals(t, 1, ncalls)
}
@ -81,7 +82,9 @@ func TestIndexMapForeachWithID(t *testing.T) {
// No result (and no crash) for empty map.
n := 0
m.foreachWithID(id, func(*indexEntry) { n++ })
for range m.valuesWithID(id) {
n++
}
rtest.Equals(t, 0, n)
// Test insertion and retrieval of duplicates.
@ -97,10 +100,10 @@ func TestIndexMapForeachWithID(t *testing.T) {
n = 0
var packs [ndups]bool
m.foreachWithID(id, func(e *indexEntry) {
for e := range m.valuesWithID(id) {
packs[e.packIndex] = true
n++
})
}
rtest.Equals(t, ndups, n)
for i := range packs {

View file

@ -3,6 +3,7 @@ package index
import (
"context"
"fmt"
"iter"
"runtime"
"sync"
@ -224,18 +225,21 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index {
return list
}
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
// Values returns an iterator over all blobs known to the index. This blocks any
// modification of the index.
func (mi *MasterIndex) Values() iter.Seq[restic.PackedBlob] {
return func(yield func(restic.PackedBlob) bool) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if err := idx.Each(ctx, fn); err != nil {
return err
for _, idx := range mi.idx {
for pb := range idx.Values() {
if !yield(pb) {
return
}
}
}
}
return nil
}
// MergeFinalIndexes merges all final indexes together.
@ -610,13 +614,13 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
if len(packBlob) == 0 {
continue
}
err := mi.Each(ctx, func(pb restic.PackedBlob) {
for pb := range mi.Values() {
if ctx.Err() != nil {
return
}
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
})
if err != nil {
return
}
// pass on packs

View file

@ -170,9 +170,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, ids, mIdx.IDs())
blobCount := 0
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
for range mIdx.Values() {
blobCount++
}))
}
rtest.Equals(t, 2, blobCount)
blobs := mIdx.Lookup(bhInIdx1)
@ -204,9 +204,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobCount = 0
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
for range mIdx.Values() {
blobCount++
}))
}
rtest.Equals(t, 2, blobCount)
}
@ -325,9 +325,9 @@ func BenchmarkMasterIndexEach(b *testing.B) {
for i := 0; i < b.N; i++ {
entries := 0
rtest.OK(b, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
for range mIdx.Values() {
entries++
}))
}
}
}
@ -385,21 +385,21 @@ func testIndexSave(t *testing.T, version uint) {
idx := index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
blobs := make(map[restic.PackedBlob]struct{})
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
for pb := range idx.Values() {
blobs[pb] = struct{}{}
}))
}
rtest.OK(t, test.saver(idx, unpacked))
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
for pb := range idx.Values() {
if _, ok := blobs[pb]; ok {
delete(blobs, pb)
} else {
t.Fatalf("unexpected blobs %v", pb)
}
}))
}
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
checker.TestCheckRepo(t, repo)
@ -418,9 +418,9 @@ func testIndexSavePartial(t *testing.T, version uint) {
idx := index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
blobs := make(map[restic.PackedBlob]struct{})
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
for pb := range idx.Values() {
blobs[pb] = struct{}{}
}))
}
// add+remove new snapshot and track its pack files
packsBefore := listPacks(t, repo)
@ -437,13 +437,13 @@ func testIndexSavePartial(t *testing.T, version uint) {
// check blobs
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
for pb := range idx.Values() {
if _, ok := blobs[pb]; ok {
delete(blobs, pb)
} else {
t.Fatalf("unexpected blobs %v", pb)
}
}))
}
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
// remove pack files to make check happy

View file

@ -648,7 +648,13 @@ func (r *Repository) LookupBlobSize(tpe restic.BlobType, id restic.ID) (uint, bo
// ListBlobs runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
func (r *Repository) ListBlobs(ctx context.Context, fn func(restic.PackedBlob)) error {
	for blob := range r.idx.Values() {
		// check for cancellation between blobs; the iterator itself has no ctx
		if ctx.Err() != nil {
			return ctx.Err()
		}
		fn(blob)
	}
	return nil
}
func (r *Repository) ListPacksFromIndex(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
@ -690,13 +696,13 @@ func (r *Repository) loadIndexWithCallback(ctx context.Context, p restic.Termina
defer cancel()
invalidIndex := false
err := r.idx.Each(ctx, func(blob restic.PackedBlob) {
for blob := range r.idx.Values() {
if ctx.Err() != nil {
return ctx.Err()
}
if blob.IsCompressed() {
invalidIndex = true
}
})
if err != nil {
return err
}
if invalidIndex {
return errors.New("index uses feature not supported by repository version 1")

View file

@ -397,13 +397,13 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) {
idx, err := loadIndex(context.TODO(), repo, id)
rtest.OK(t, err)
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
for pb := range idx.Values() {
if _, ok := packEntries[pb.PackID]; !ok {
packEntries[pb.PackID] = make(map[restic.ID]struct{})
}
packEntries[pb.PackID][id] = struct{}{}
}))
}
return nil
})
if err != nil {