mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
tsdb/wlog: Remove any temproary checkpoints when creating a Checkpoint (#17598)
* RemoveTmpDirs function to tsdbutil * Refactor db to use RemoveTmpDirs and no longer cleanup checkpoint tmp dirs * Use RemoveTmpDirs in wlog checkpoint to cleanup all checkpoint tmp folders * Add tests for RemoveTmpDirs * Ensure db.Open will still cleanup extra temporary checkpoints Signed-off-by: Kyle Eckhart <kgeckhart@users.noreply.github.com>
This commit is contained in:
parent
b908cc48a2
commit
ae062151cd
5 changed files with 280 additions and 36 deletions
32
tsdb/db.go
32
tsdb/db.go
|
|
@ -929,9 +929,13 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
|
|||
|
||||
for _, tmpDir := range []string{walDir, dir} {
|
||||
// Remove tmp dirs.
|
||||
if err := removeBestEffortTmpDirs(l, tmpDir); err != nil {
|
||||
if err := tsdbutil.RemoveTmpDirs(l, tmpDir, isTmpDir); err != nil {
|
||||
return nil, fmt.Errorf("remove tmp dirs: %w", err)
|
||||
}
|
||||
// Remove any temporary checkpoints that might have been interrupted during creation.
|
||||
if err := wlog.DeleteTempCheckpoints(l, tmpDir); err != nil {
|
||||
return nil, fmt.Errorf("delete temp checkpoints: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
db := &DB{
|
||||
|
|
@ -1115,26 +1119,6 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
|
|||
return db, nil
|
||||
}
|
||||
|
||||
func removeBestEffortTmpDirs(l *slog.Logger, dir string) error {
|
||||
files, err := os.ReadDir(dir)
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, f := range files {
|
||||
if isTmpDir(f) {
|
||||
if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil {
|
||||
l.Error("failed to delete tmp block dir", "dir", filepath.Join(dir, f.Name()), "err", err)
|
||||
continue
|
||||
}
|
||||
l.Info("Found and deleted tmp block dir", "dir", filepath.Join(dir, f.Name()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (db *DB) StartTime() (int64, error) {
|
||||
db.mtx.RLock()
|
||||
|
|
@ -2538,8 +2522,7 @@ func isBlockDir(fi fs.DirEntry) bool {
|
|||
return err == nil
|
||||
}
|
||||
|
||||
// isTmpDir returns true if the given file-info contains a block ULID, a checkpoint prefix,
|
||||
// or a chunk snapshot prefix and a tmp extension.
|
||||
// isTmpDir returns true if the given file-info contains a block ULID, or a chunk snapshot prefix and a tmp extension.
|
||||
func isTmpDir(fi fs.DirEntry) bool {
|
||||
if !fi.IsDir() {
|
||||
return false
|
||||
|
|
@ -2548,9 +2531,6 @@ func isTmpDir(fi fs.DirEntry) bool {
|
|||
fn := fi.Name()
|
||||
ext := filepath.Ext(fn)
|
||||
if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy {
|
||||
if strings.HasPrefix(fn, wlog.CheckpointPrefix) {
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(fn, chunkSnapshotPrefix) {
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
45
tsdb/tsdbutil/remove_tmp_dirs.go
Normal file
45
tsdb/tsdbutil/remove_tmp_dirs.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
// Copyright 2018 The Prometheus Authors
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdbutil
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// RemoveTmpDirs attempts to remove directories in the specified directory which match the isTmpDir predicate.
|
||||
// Errors encountered during reading the directory that other than non-existence are returned. All other errors
|
||||
// encountered during removal of tmp directories are logged but do not cause early termination.
|
||||
func RemoveTmpDirs(l *slog.Logger, dir string, isTmpDir func(fi fs.DirEntry) bool) error {
|
||||
files, err := os.ReadDir(dir)
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, f := range files {
|
||||
if isTmpDir(f) {
|
||||
if err := os.RemoveAll(filepath.Join(dir, f.Name())); err != nil {
|
||||
l.Error("failed to delete tmp dir", "dir", filepath.Join(dir, f.Name()), "err", err)
|
||||
continue
|
||||
}
|
||||
l.Info("Found and deleted tmp dir", "dir", filepath.Join(dir, f.Name()))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
124
tsdb/tsdbutil/remove_tmp_dirs_test.go
Normal file
124
tsdb/tsdbutil/remove_tmp_dirs_test.go
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
// Copyright 2018 The Prometheus Authors
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdbutil
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/promslog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRemoveTmpDirs(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
isTmpDir func(fi fs.DirEntry) bool
|
||||
setup func(t *testing.T, dir string)
|
||||
expectedDirs []string // Directories that should remain after cleanup
|
||||
}{
|
||||
{
|
||||
name: "remove directories with tmp prefix",
|
||||
isTmpDir: func(fi fs.DirEntry) bool {
|
||||
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
|
||||
},
|
||||
setup: func(t *testing.T, dir string) {
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir1"), 0o755))
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "tmpdir2"), 0o755))
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir"), 0o755))
|
||||
},
|
||||
expectedDirs: []string{"normaldir"},
|
||||
},
|
||||
{
|
||||
name: "remove directories with specific suffix",
|
||||
isTmpDir: func(fi fs.DirEntry) bool {
|
||||
return fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp")
|
||||
},
|
||||
setup: func(t *testing.T, dir string) {
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "data.tmp"), 0o755))
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "cache.tmp"), 0o755))
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "permanent"), 0o755))
|
||||
},
|
||||
expectedDirs: []string{"permanent"},
|
||||
},
|
||||
{
|
||||
name: "no temporary directories to remove",
|
||||
isTmpDir: func(fi fs.DirEntry) bool {
|
||||
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
|
||||
},
|
||||
setup: func(t *testing.T, dir string) {
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir1"), 0o755))
|
||||
require.NoError(t, os.Mkdir(filepath.Join(dir, "normaldir2"), 0o755))
|
||||
},
|
||||
expectedDirs: []string{"normaldir1", "normaldir2"},
|
||||
},
|
||||
{
|
||||
name: "empty directory",
|
||||
isTmpDir: func(fi fs.DirEntry) bool {
|
||||
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
|
||||
},
|
||||
setup: func(_ *testing.T, _ string) {}, // No setup needed - directory is empty
|
||||
expectedDirs: []string{},
|
||||
},
|
||||
{
|
||||
name: "directory with files only (no directories)",
|
||||
isTmpDir: func(fi fs.DirEntry) bool {
|
||||
return fi.IsDir() && strings.HasPrefix(fi.Name(), "tmp")
|
||||
},
|
||||
setup: func(t *testing.T, dir string) {
|
||||
require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile1.txt"), []byte("test"), 0o644))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(dir, "tmpfile2.txt"), []byte("test"), 0o644))
|
||||
},
|
||||
expectedDirs: []string{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
testDir := t.TempDir()
|
||||
|
||||
if tt.setup != nil {
|
||||
tt.setup(t, testDir)
|
||||
}
|
||||
|
||||
require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), testDir, tt.isTmpDir))
|
||||
|
||||
entries, err := os.ReadDir(testDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Get actual remaining directories
|
||||
var actualDirs []string
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
actualDirs = append(actualDirs, entry.Name())
|
||||
}
|
||||
}
|
||||
|
||||
require.ElementsMatch(t, tt.expectedDirs, actualDirs, "Remaining directories don't match expected")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveTmpDirs_NonExistentDirectory(t *testing.T) {
|
||||
testDir := t.TempDir()
|
||||
nonExistent := filepath.Join(testDir, "does_not_exist")
|
||||
|
||||
require.NoError(t, RemoveTmpDirs(promslog.NewNopLogger(), nonExistent, func(_ fs.DirEntry) bool {
|
||||
return true
|
||||
}))
|
||||
}
|
||||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"math"
|
||||
"os"
|
||||
|
|
@ -31,6 +32,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
)
|
||||
|
||||
// CheckpointStats returns stats about a created checkpoint.
|
||||
|
|
@ -80,8 +82,16 @@ func DeleteCheckpoints(dir string, maxIndex int) error {
|
|||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// CheckpointPrefix is the prefix used for checkpoint files.
|
||||
const CheckpointPrefix = "checkpoint."
|
||||
// checkpointTempFileSuffix is the suffix used when creating temporary checkpoint files.
|
||||
const checkpointTempFileSuffix = ".tmp"
|
||||
|
||||
// DeleteTempCheckpoints deletes all temporary checkpoint directories in the given directory.
|
||||
func DeleteTempCheckpoints(logger *slog.Logger, dir string) error {
|
||||
if err := tsdbutil.RemoveTmpDirs(logger, dir, isTempDir); err != nil {
|
||||
return fmt.Errorf("remove previous temporary checkpoint dirs: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL.
|
||||
// It includes the most recent checkpoint if it exists.
|
||||
|
|
@ -123,13 +133,13 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
|||
defer sgmReader.Close()
|
||||
}
|
||||
|
||||
cpdir := checkpointDir(w.Dir(), to)
|
||||
cpdirtmp := cpdir + ".tmp"
|
||||
|
||||
if err := os.RemoveAll(cpdirtmp); err != nil {
|
||||
return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err)
|
||||
if err := DeleteTempCheckpoints(logger, w.Dir()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cpdir := checkpointDir(w.Dir(), to)
|
||||
cpdirtmp := cpdir + checkpointTempFileSuffix
|
||||
|
||||
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
|
||||
return nil, fmt.Errorf("create checkpoint dir: %w", err)
|
||||
}
|
||||
|
|
@ -394,8 +404,11 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
|||
return stats, nil
|
||||
}
|
||||
|
||||
// checkpointPrefix is the prefix used for checkpoint files.
|
||||
const checkpointPrefix = "checkpoint."
|
||||
|
||||
func checkpointDir(dir string, i int) string {
|
||||
return filepath.Join(dir, fmt.Sprintf(CheckpointPrefix+"%08d", i))
|
||||
return filepath.Join(dir, fmt.Sprintf(checkpointPrefix+"%08d", i))
|
||||
}
|
||||
|
||||
type checkpointRef struct {
|
||||
|
|
@ -411,13 +424,13 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) {
|
|||
|
||||
for i := range files {
|
||||
fi := files[i]
|
||||
if !strings.HasPrefix(fi.Name(), CheckpointPrefix) {
|
||||
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
|
||||
continue
|
||||
}
|
||||
if !fi.IsDir() {
|
||||
return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name())
|
||||
}
|
||||
idx, err := strconv.Atoi(fi.Name()[len(CheckpointPrefix):])
|
||||
idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
|
@ -431,3 +444,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) {
|
|||
|
||||
return refs, nil
|
||||
}
|
||||
|
||||
func isTempDir(fi fs.DirEntry) bool {
|
||||
return strings.HasPrefix(fi.Name(), checkpointPrefix) && strings.HasSuffix(fi.Name(), checkpointTempFileSuffix)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -417,3 +417,81 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestCheckpointDeletesTemporaryCheckpoints(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
// Create one tmp checkpoint directory
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00001000.tmp"), 0o777))
|
||||
|
||||
w, err := New(nil, nil, dir, compression.None)
|
||||
require.NoError(t, err)
|
||||
defer w.Close()
|
||||
|
||||
_, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1000, func(_ chunks.HeadSeriesRef) bool { return true }, 1000)
|
||||
require.NoError(t, err)
|
||||
|
||||
files, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
var actualDirectories []string
|
||||
for _, f := range files {
|
||||
if !f.IsDir() {
|
||||
continue
|
||||
}
|
||||
actualDirectories = append(actualDirectories, f.Name())
|
||||
}
|
||||
require.Equal(t, []string{"checkpoint.00001000"}, actualDirectories)
|
||||
}
|
||||
|
||||
func TestDeleteTempCheckpoints(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
checkpointDirectoriesToCreate []string
|
||||
expectedDirectories []string
|
||||
}{
|
||||
{
|
||||
name: "no tmp checkpoints",
|
||||
checkpointDirectoriesToCreate: nil,
|
||||
expectedDirectories: nil,
|
||||
},
|
||||
{
|
||||
name: "one tmp checkpoint",
|
||||
checkpointDirectoriesToCreate: []string{"checkpoint.00001000.tmp"},
|
||||
expectedDirectories: nil,
|
||||
},
|
||||
{
|
||||
name: "many tmp checkpoints",
|
||||
checkpointDirectoriesToCreate: []string{"checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000.tmp"},
|
||||
expectedDirectories: nil,
|
||||
},
|
||||
{
|
||||
name: "mix of tmp and regular checkpoints",
|
||||
checkpointDirectoriesToCreate: []string{"checkpoint.00000001", "checkpoint.00000001.tmp", "checkpoint.00001000.tmp", "checkpoint.00002000"},
|
||||
expectedDirectories: []string{"checkpoint.00000001", "checkpoint.00002000"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
for _, fn := range tc.checkpointDirectoriesToCreate {
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, fn), 0o777))
|
||||
}
|
||||
|
||||
require.NoError(t, DeleteTempCheckpoints(promslog.NewNopLogger(), dir))
|
||||
|
||||
files, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
var actualDirectories []string
|
||||
for _, f := range files {
|
||||
if !f.IsDir() {
|
||||
continue
|
||||
}
|
||||
actualDirectories = append(actualDirectories, f.Name())
|
||||
}
|
||||
require.Equal(t, tc.expectedDirectories, actualDirectories)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue