From ae062151cd52526dbfb875c72a9f7cb7f2377cf8 Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Tue, 17 Feb 2026 03:23:54 -0500 Subject: [PATCH] tsdb/wlog: Remove any temproary checkpoints when creating a Checkpoint (#17598) * RemoveTmpDirs function to tsdbutil * Refactor db to use RemoveTmpDirs and no longer cleanup checkpoint tmp dirs * Use RemoveTmpDirs in wlog checkpoint to cleanup all checkpoint tmp folders * Add tests for RemoveTmpDirs * Ensure db.Open will still cleanup extra temporary checkpoints Signed-off-by: Kyle Eckhart --- tsdb/db.go | 32 ++----- tsdb/tsdbutil/remove_tmp_dirs.go | 45 ++++++++++ tsdb/tsdbutil/remove_tmp_dirs_test.go | 124 ++++++++++++++++++++++++++ tsdb/wlog/checkpoint.go | 37 +++++--- tsdb/wlog/checkpoint_test.go | 78 ++++++++++++++++ 5 files changed, 280 insertions(+), 36 deletions(-) create mode 100644 tsdb/tsdbutil/remove_tmp_dirs.go create mode 100644 tsdb/tsdbutil/remove_tmp_dirs_test.go diff --git a/tsdb/db.go b/tsdb/db.go index 1d73628bfd..a4a4a77f3c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -929,9 +929,13 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn for _, tmpDir := range []string{walDir, dir} { // Remove tmp dirs. - if err := removeBestEffortTmpDirs(l, tmpDir); err != nil { + if err := tsdbutil.RemoveTmpDirs(l, tmpDir, isTmpDir); err != nil { return nil, fmt.Errorf("remove tmp dirs: %w", err) } + // Remove any temporary checkpoints that might have been interrupted during creation. + if err := wlog.DeleteTempCheckpoints(l, tmpDir); err != nil { + return nil, fmt.Errorf("delete temp checkpoints: %w", err) + } } db := &DB{ @@ -1115,26 +1119,6 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn return db, nil } -func removeBestEffortTmpDirs(l *slog.Logger, dir string) error { - files, err := os.ReadDir(dir) - if os.IsNotExist(err) { - return nil - } - if err != nil { - return err - } - for _, f := range files { - if isTmpDir(f) { - if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil { - l.Error("failed to delete tmp block dir", "dir", filepath.Join(dir, f.Name()), "err", err) - continue - } - l.Info("Found and deleted tmp block dir", "dir", filepath.Join(dir, f.Name())) - } - } - return nil -} - // StartTime implements the Storage interface. func (db *DB) StartTime() (int64, error) { db.mtx.RLock() @@ -2538,8 +2522,7 @@ func isBlockDir(fi fs.DirEntry) bool { return err == nil } -// isTmpDir returns true if the given file-info contains a block ULID, a checkpoint prefix, -// or a chunk snapshot prefix and a tmp extension. +// isTmpDir returns true if the given file-info contains a block ULID, or a chunk snapshot prefix and a tmp extension. func isTmpDir(fi fs.DirEntry) bool { if !fi.IsDir() { return false @@ -2548,9 +2531,6 @@ func isTmpDir(fi fs.DirEntry) bool { fn := fi.Name() ext := filepath.Ext(fn) if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy { - if strings.HasPrefix(fn, wlog.CheckpointPrefix) { - return true - } if strings.HasPrefix(fn, chunkSnapshotPrefix) { return true } diff --git a/tsdb/tsdbutil/remove_tmp_dirs.go b/tsdb/tsdbutil/remove_tmp_dirs.go new file mode 100644 index 0000000000..a95db3159e --- /dev/null +++ b/tsdb/tsdbutil/remove_tmp_dirs.go @@ -0,0 +1,45 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdbutil + +import ( + "io/fs" + "log/slog" + "os" + "path/filepath" +) + +// RemoveTmpDirs attempts to remove directories in the specified directory which match the isTmpDir predicate. +// Errors encountered during reading the directory that other than non-existence are returned. All other errors +// encountered during removal of tmp directories are logged but do not cause early termination. +func RemoveTmpDirs(l *slog.Logger, dir string, isTmpDir func(fi fs.DirEntry) bool) error { + files, err := os.ReadDir(dir) + if os.IsNotExist(err) { + return nil + } + if err != nil { + return err + } + for _, f := range files { + if isTmpDir(f) { + if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil { + l.Error("failed to delete tmp dir", "dir", filepath.Join(dir, f.Name()), "err", err) + continue + } + l.Info("Found and deleted tmp dir", "dir", filepath.Join(dir, f.Name())) + } + } + return nil +} diff --git a/tsdb/tsdbutil/remove_tmp_dirs_test.go b/tsdb/tsdbutil/remove_tmp_dirs_test.go new file mode 100644 index 0000000000..4ab282d3b3 --- /dev/null +++ b/tsdb/tsdbutil/remove_tmp_dirs_test.go @@ -0,0 +1,124 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdbutil + +import ( + "io/fs" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" +) + +func TestRemoveTmpDirs(t *testing.T) { + tests := []struct { + name string + isTmpDir func(fi fs.DirEntry) bool + setup func(t *testing.T, dir string) + expectedDirs []string // Directories that should remain after cleanup + }{ + { + name: "remove directories with tmp prefix", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir1"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir2"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir"), 0o755)) + }, + expectedDirs: []string{"normaldir"}, + }, + { + name: "remove directories with specific suffix", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "data.tmp"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "cache.tmp"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "permanent"), 0o755)) + }, + expectedDirs: []string{"permanent"}, + }, + { + name: "no temporary directories to remove", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir1"), 0o755)) + require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir2"), 0o755)) + }, + expectedDirs: []string{"normaldir1", "normaldir2"}, + }, + { + name: "empty directory", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(_ *testing.T, _ string) {}, // No setup needed - directory is empty + expectedDirs: []string{}, + }, + { + name: "directory with files only (no directories)", + isTmpDir: func(fi fs.DirEntry) bool { + return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp") + }, + setup: func(t *testing.T, dir string) { + require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile1.txt"), []byte("test"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile2.txt"), []byte("test"), 0o644)) + }, + expectedDirs: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testDir := t.TempDir() + + if tt.setup != nil { + tt.setup(t, testDir) + } + + require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), testDir, tt.isTmpDir)) + + entries, err := os.ReadDir(testDir) + require.NoError(t, err) + + // Get actual remaining directories + var actualDirs []string + for _, entry := range entries { + if entry.IsDir() { + actualDirs = append(actualDirs, entry.Name()) + } + } + + require.ElementsMatch(t, tt.expectedDirs, actualDirs, "Remaining directories don't match expected") + }) + } +} + +func TestRemoveTmpDirs_NonExistentDirectory(t *testing.T) { + testDir := t.TempDir() + nonExistent := filepath.Join(testDir, "does_not_exist") + + require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), nonExistent, func(_ fs.DirEntry) bool { + return true + })) +} diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index 6742141fbc..3a4e194fec 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "io/fs" "log/slog" "math" "os" @@ -31,6 +32,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/tsdb/tsdbutil" ) // CheckpointStats returns stats about a created checkpoint. @@ -80,8 +82,16 @@ func DeleteCheckpoints(dir string, maxIndex int) error { return errors.Join(errs...) } -// CheckpointPrefix is the prefix used for checkpoint files. -const CheckpointPrefix = "checkpoint." +// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files. +const checkpointTempFileSuffix = ".tmp" + +// DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory. +func DeleteTempCheckpoints(logger *slog.Logger, dir string) error { + if err := tsdbutil.RemoveTmpDirs(logger, dir, isTempDir); err != nil { + return fmt.Errorf("remove previous temporary checkpoint dirs: %w", err) + } + return nil +} // Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL. // It includes the most recent checkpoint if it exists. @@ -123,13 +133,13 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He defer sgmReader.Close() } - cpdir := checkpointDir(w.Dir(), to) - cpdirtmp := cpdir + ".tmp" - - if err := os.RemoveAll(cpdirtmp); err != nil { - return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err) + if err := DeleteTempCheckpoints(logger, w.Dir()); err != nil { + return nil, err } + cpdir := checkpointDir(w.Dir(), to) + cpdirtmp := cpdir + checkpointTempFileSuffix + if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { return nil, fmt.Errorf("create checkpoint dir: %w", err) } @@ -394,8 +404,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He return stats, nil } +// checkpointPrefix is the prefix used for checkpoint files. +const checkpointPrefix = "checkpoint." + func checkpointDir(dir string, i int) string { - return filepath.Join(dir, fmt.Sprintf(CheckpointPrefix+"%08d", i)) + return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i)) } type checkpointRef struct { @@ -411,13 +424,13 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { for i := range files { fi := files[i] - if !strings.HasPrefix(fi.Name(), CheckpointPrefix) { + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { continue } if !fi.IsDir() { return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name()) } - idx, err := strconv.Atoi(fi.Name()[len(CheckpointPrefix):]) + idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) if err != nil { continue } @@ -431,3 +444,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { return refs, nil } + +func isTempDir(fi fs.DirEntry) bool { + return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix) +} diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 97ca2e768d..a348239ec7 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -417,3 +417,81 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { }) require.NoError(t, err) } + +func TestCheckpointDeletesTemporaryCheckpoints(t *testing.T) { + dir := t.TempDir() + + // Create one tmp checkpoint directory + require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00001000.tmp"), 0o777)) + + w, err := New(nil, nil, dir, compression.None) + require.NoError(t, err) + defer w.Close() + + _, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1000, func(_ chunks.HeadSeriesRef) bool { return true }, 1000) + require.NoError(t, err) + + files, err := os.ReadDir(dir) + require.NoError(t, err) + + var actualDirectories []string + for _, f := range files { + if !f.IsDir() { + continue + } + actualDirectories = append(actualDirectories, f.Name()) + } + require.Equal(t, []string{"checkpoint.00001000"}, actualDirectories) +} + +func TestDeleteTempCheckpoints(t *testing.T) { + testCases := []struct { + name string + checkpointDirectoriesToCreate []string + expectedDirectories []string + }{ + { + name: "no tmp checkpoints", + checkpointDirectoriesToCreate: nil, + expectedDirectories: nil, + }, + { + name: "one tmp checkpoint", + checkpointDirectoriesToCreate: []string{"checkpoint.00001000.tmp"}, + expectedDirectories: nil, + }, + { + name: "many tmp checkpoints", + checkpointDirectoriesToCreate: []string{"checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000.tmp"}, + expectedDirectories: nil, + }, + { + name: "mix of tmp and regular checkpoints", + checkpointDirectoriesToCreate: []string{"checkpoint.00000001", "checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000"}, + expectedDirectories: []string{"checkpoint.00000001", "checkpoint.00002000"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + for _, fn := range tc.checkpointDirectoriesToCreate { + require.NoError(t, os.MkdirAll(filepath.Join(dir, fn), 0o777)) + } + + require.NoError(t, DeleteTempCheckpoints(promslog.NewNopLogger(), dir)) + + files, err := os.ReadDir(dir) + require.NoError(t, err) + + var actualDirectories []string + for _, f := range files { + if !f.IsDir() { + continue + } + actualDirectories = append(actualDirectories, f.Name()) + } + require.Equal(t, tc.expectedDirectories, actualDirectories) + }) + } +}