diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index ecd42c3db..f2ef1d082 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -4,11 +4,13 @@ import ( "bytes" "context" "crypto/sha256" + "fmt" "io" "math/rand" "path/filepath" "strings" "sync" + "sync/atomic" "testing" "time" @@ -487,3 +489,65 @@ func TestNoDoubleInit(t *testing.T) { err = repo.Init(context.TODO(), r.Config().Version, rtest.TestPassword, &pol) rtest.Assert(t, strings.Contains(err.Error(), "repository already contains snapshots"), "expected already contains snapshots error, got %q", err) } + +func TestSaveBlobAsync(t *testing.T) { + repo, _, _ := repository.TestRepositoryWithVersion(t, 2) + ctx := context.Background() + + type result struct { + id restic.ID + known bool + size int + err error + } + numCalls := 10 + results := make([]result, numCalls) + var resultsMutex sync.Mutex + + err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + var wg sync.WaitGroup + wg.Add(numCalls) + for i := 0; i < numCalls; i++ { + // Use unique data for each call + testData := []byte(fmt.Sprintf("test blob data %d", i)) + uploader.SaveBlobAsync(ctx, restic.DataBlob, testData, restic.ID{}, false, + func(newID restic.ID, known bool, size int, err error) { + defer wg.Done() + resultsMutex.Lock() + results[i] = result{newID, known, size, err} + resultsMutex.Unlock() + }) + } + wg.Wait() + return nil + }) + rtest.OK(t, err) + + for i, result := range results { + testData := []byte(fmt.Sprintf("test blob data %d", i)) + expectedID := restic.Hash(testData) + rtest.Assert(t, result.err == nil, "result %d: unexpected error %v", i, result.err) + rtest.Assert(t, result.id.Equal(expectedID), "result %d: expected ID %v, got %v", i, expectedID, result.id) + rtest.Assert(t, !result.known, "result %d: expected unknown blob", i) + } +} + +func TestSaveBlobAsyncErrorHandling(t *testing.T) { + repo, _, _ := repository.TestRepositoryWithVersion(t, 2) + ctx, cancel := context.WithCancel(context.Background()) + + var callbackCalled atomic.Bool + + err := repo.WithBlobUploader(ctx, func(ctx context.Context, uploader restic.BlobSaverWithAsync) error { + cancel() + // Callback must be called even if the context is canceled + uploader.SaveBlobAsync(ctx, restic.DataBlob, []byte("test blob data"), restic.ID{}, false, + func(newID restic.ID, known bool, size int, err error) { + callbackCalled.Store(true) + }) + return nil + }) + + rtest.Assert(t, errors.Is(err, context.Canceled), "expected context canceled error, got %v", err) + rtest.Assert(t, callbackCalled.Load(), "callback was not called") +}