Use buffered files

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2026-01-25 12:36:51 -08:00
parent 9eb78735cf
commit 3bc827df44
9 changed files with 1524 additions and 9 deletions

View file

@ -297,7 +297,9 @@ func (*ChunkDiskMapper) RemoveMasks(sourceEncoding chunkenc.Encoding) chunkenc.E
return chunkenc.Encoding(restored)
}
// openMMapFiles opens all files within dir for mmapping.
// openMMapFiles opens all files within dir for reading.
// Despite the name, this function now uses the configured file reading mode
// (mmap or buffered) based on fileutil.GetFileReaderConfig().
func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
cdm.closers = map[int]io.Closer{}
@ -322,12 +324,12 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
chkFileIndices := make([]int, 0, len(files))
for seq, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
f, err := fileutil.OpenBufferedFileReader(fn)
if err != nil {
return fmt.Errorf("mmap files, file: %s: %w", fn, err)
return fmt.Errorf("open file %s: %w", fn, err)
}
cdm.closers[seq] = f
cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: f}
chkFileIndices = append(chkFileIndices, seq)
}
@ -596,7 +598,7 @@ func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err erro
return nil
}
// cut creates a new m-mapped file. The write lock should be held before calling this.
// cut creates a new chunk file for writing. The write lock should be held before calling this.
// It returns the file sequence and the offset in that file to start writing chunks.
func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) {
// Sync current tail to disk and close.
@ -625,7 +627,9 @@ func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) {
cdm.readPathMtx.Unlock()
}
mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
// Open the file for buffered reading.
// We specify the full preallocated size so the reader knows the expected file size.
readFile, err := fileutil.OpenBufferedFileReaderWithSize(newFile.Name(), MaxHeadChunkFileSize)
if err != nil {
return 0, 0, err
}
@ -639,8 +643,8 @@ func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) {
cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
}
cdm.closers[cdm.curFileSequence] = mmapFile
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
cdm.closers[cdm.curFileSequence] = readFile
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: readFile}
cdm.readPathMtx.Unlock()
cdm.curFileMaxt = 0

View file

@ -1924,7 +1924,7 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) {
func TestDelayedCompaction(t *testing.T) {
// The delay is chosen in such a way as to not slow down the tests, but also to make
// the effective compaction duration negligible compared to it, so that the duration comparisons make sense.
delay := 1000 * time.Millisecond
delay := 3000 * time.Millisecond
waitUntilCompactedAndCheck := func(db *DB) {
t.Helper()

View file

@ -0,0 +1,256 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"fmt"
"io"
"os"
"sync"
)
// BufferedFile provides buffered access to a file with optional caching.
// It implements an interface compatible with byte slice access patterns
// used by chunks and index readers.
type BufferedFile struct {
f *os.File
size int64
fileID uint64
cache *FileCache
// Mutex for file operations (pread is thread-safe on most systems,
// but we need to protect against concurrent close)
mu sync.RWMutex
closed bool
}
// OpenBufferedFile opens a file for buffered reading with optional caching.
// If cache is nil, every read goes directly to disk.
func OpenBufferedFile(path string, cache *FileCache) (*BufferedFile, error) {
return OpenBufferedFileWithSize(path, 0, cache)
}
// OpenBufferedFileWithSize opens a file for buffered reading with an expected size.
// If size is 0 or negative, the actual file size is used.
func OpenBufferedFileWithSize(path string, size int, cache *FileCache) (*BufferedFile, error) {
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
fileSize := int64(size)
if fileSize <= 0 {
info, err := f.Stat()
if err != nil {
f.Close()
return nil, fmt.Errorf("stat: %w", err)
}
fileSize = info.Size()
}
var fileID uint64
if cache != nil {
fileID = cache.NextFileID()
}
return &BufferedFile{
f: f,
size: fileSize,
fileID: fileID,
cache: cache,
}, nil
}
// Len returns the total size of the file.
func (bf *BufferedFile) Len() int {
return int(bf.size)
}
// Range returns a byte slice for the given range [start, end).
// The returned slice is a copy and safe to use after the file is closed.
func (bf *BufferedFile) Range(start, end int) []byte {
bf.mu.RLock()
defer bf.mu.RUnlock()
if bf.closed {
return nil
}
length := end - start
if length <= 0 {
return nil
}
// Clamp to file size
if int64(end) > bf.size {
end = int(bf.size)
length = end - start
}
if length <= 0 {
return nil
}
result := make([]byte, length)
if bf.cache != nil {
bf.readWithCache(start, result)
} else {
bf.readDirect(start, result)
}
return result
}
// readWithCache reads data using the cache.
func (bf *BufferedFile) readWithCache(offset int, buf []byte) {
blockSize := bf.cache.BlockSize()
remaining := len(buf)
bufOffset := 0
for remaining > 0 {
block := int64(offset / blockSize)
blockStart := int(block) * blockSize
offsetInBlock := offset - blockStart
// Try to get from cache
cached := bf.cache.Get(bf.fileID, block)
if cached == nil {
// Cache miss - read the block from disk
cached = bf.readBlock(block)
if cached != nil {
bf.cache.Put(bf.fileID, block, cached)
}
}
if cached == nil {
// Failed to read, fill with zeros or return partial
break
}
// Copy from cached block to result
available := len(cached) - offsetInBlock
if available <= 0 {
break
}
toCopy := remaining
if toCopy > available {
toCopy = available
}
copy(buf[bufOffset:bufOffset+toCopy], cached[offsetInBlock:offsetInBlock+toCopy])
bufOffset += toCopy
offset += toCopy
remaining -= toCopy
}
}
// readBlock reads a full block from disk.
func (bf *BufferedFile) readBlock(block int64) []byte {
blockSize := bf.cache.BlockSize()
offset := block * int64(blockSize)
// Determine how much to read
toRead := int64(blockSize)
if offset+toRead > bf.size {
toRead = bf.size - offset
}
if toRead <= 0 {
return nil
}
buf := make([]byte, toRead)
n, err := bf.f.ReadAt(buf, offset)
if err != nil && err != io.EOF {
return nil
}
return buf[:n]
}
// readDirect reads data directly from disk without caching.
func (bf *BufferedFile) readDirect(offset int, buf []byte) {
n, err := bf.f.ReadAt(buf, int64(offset))
if err != nil && err != io.EOF {
// Zero out unread portion
for i := n; i < len(buf); i++ {
buf[i] = 0
}
}
}
// ReadAt implements io.ReaderAt for compatibility.
func (bf *BufferedFile) ReadAt(p []byte, off int64) (n int, err error) {
bf.mu.RLock()
defer bf.mu.RUnlock()
if bf.closed {
return 0, os.ErrClosed
}
if bf.cache != nil {
bf.readWithCache(int(off), p)
// Determine actual bytes read
if off+int64(len(p)) > bf.size {
n = int(bf.size - off)
if n < 0 {
n = 0
}
if n < len(p) {
return n, io.EOF
}
}
return len(p), nil
}
return bf.f.ReadAt(p, off)
}
// File returns the underlying os.File.
func (bf *BufferedFile) File() *os.File {
return bf.f
}
// Bytes returns the entire file content as a byte slice.
// WARNING: This loads the entire file into memory.
// Use Range() for large files.
func (bf *BufferedFile) Bytes() []byte {
return bf.Range(0, int(bf.size))
}
// Close closes the file and invalidates any cached data.
func (bf *BufferedFile) Close() error {
bf.mu.Lock()
defer bf.mu.Unlock()
if bf.closed {
return nil
}
bf.closed = true
// Invalidate cache entries for this file
if bf.cache != nil {
bf.cache.InvalidateFile(bf.fileID)
}
return bf.f.Close()
}
// IsClosed returns true if the file has been closed.
func (bf *BufferedFile) IsClosed() bool {
bf.mu.RLock()
defer bf.mu.RUnlock()
return bf.closed
}

View file

@ -0,0 +1,285 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"bytes"
"os"
"path/filepath"
"testing"
)
func TestBufferedFile_BasicRead(t *testing.T) {
// Create a temp file with known content
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
data := make([]byte, 10000)
for i := range data {
data[i] = byte(i % 256)
}
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
// Test without cache
bf, err := OpenBufferedFile(path, nil)
if err != nil {
t.Fatal(err)
}
defer bf.Close()
if bf.Len() != len(data) {
t.Errorf("expected len %d, got %d", len(data), bf.Len())
}
// Read full content
got := bf.Bytes()
if !bytes.Equal(got, data) {
t.Error("full content mismatch")
}
// Read range
got = bf.Range(100, 200)
if !bytes.Equal(got, data[100:200]) {
t.Error("range content mismatch")
}
}
func TestBufferedFile_WithCache(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
// Create file larger than block size to test multiple blocks
data := make([]byte, DefaultBlockSize*3+1000)
for i := range data {
data[i] = byte(i % 256)
}
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
cache := NewFileCache(FileCacheOptions{
MaxSize: DefaultCacheSize,
BlockSize: DefaultBlockSize,
})
bf, err := OpenBufferedFile(path, cache)
if err != nil {
t.Fatal(err)
}
defer bf.Close()
// First read - should populate cache
got := bf.Range(0, 100)
if !bytes.Equal(got, data[:100]) {
t.Error("first read mismatch")
}
// Check cache stats
hits, misses, _, _ := cache.Stats()
if misses != 1 {
t.Errorf("expected 1 miss, got %d", misses)
}
// Second read from same block - should hit cache
got = bf.Range(50, 150)
if !bytes.Equal(got, data[50:150]) {
t.Error("second read mismatch")
}
hits, _, _, _ = cache.Stats()
if hits != 1 {
t.Errorf("expected 1 hit, got %d", hits)
}
// Read spanning multiple blocks
start := DefaultBlockSize - 100
end := DefaultBlockSize + 100
got = bf.Range(start, end)
if !bytes.Equal(got, data[start:end]) {
t.Error("cross-block read mismatch")
}
}
func TestBufferedFile_ReadAt(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
data := []byte("Hello, World! This is test data.")
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
bf, err := OpenBufferedFile(path, nil)
if err != nil {
t.Fatal(err)
}
defer bf.Close()
buf := make([]byte, 5)
n, err := bf.ReadAt(buf, 7)
if err != nil {
t.Fatal(err)
}
if n != 5 {
t.Errorf("expected to read 5 bytes, got %d", n)
}
if string(buf) != "World" {
t.Errorf("expected 'World', got '%s'", string(buf))
}
}
func TestBufferedFile_Close(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
data := []byte("test data")
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
cache := NewFileCache(FileCacheOptions{
MaxSize: 1024 * 1024,
BlockSize: 1024,
})
bf, err := OpenBufferedFile(path, cache)
if err != nil {
t.Fatal(err)
}
// Read to populate cache
bf.Range(0, 5)
// Close should invalidate cache
bf.Close()
// Verify cache is cleared for this file
// (Size should be 0 if only one file was cached)
if cache.Size() != 0 {
t.Errorf("expected cache to be cleared after close, size: %d", cache.Size())
}
// Reading after close should return nil
got := bf.Range(0, 5)
if got != nil {
t.Error("expected nil after close")
}
}
func TestBufferedFile_EdgeCases(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
data := []byte("short")
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
bf, err := OpenBufferedFile(path, nil)
if err != nil {
t.Fatal(err)
}
defer bf.Close()
// Empty range
got := bf.Range(2, 2)
if len(got) != 0 {
t.Error("expected empty slice for equal start/end")
}
// Invalid range (start > end)
got = bf.Range(3, 2)
if got != nil {
t.Error("expected nil for invalid range")
}
// Range beyond file size
got = bf.Range(0, 100)
if len(got) != len(data) {
t.Errorf("expected range clamped to file size, got %d bytes", len(got))
}
}
func TestBufferedFileReader(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
data := []byte("test data for buffered reader")
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
// Configure the buffered reader
SetBufferedFileReaderConfig(BufferedFileReaderConfig{
CacheSize: 1024 * 1024,
BlockSize: 1024,
})
reader, err := OpenBufferedFileReader(path)
if err != nil {
t.Fatal(err)
}
defer reader.Close()
if reader.Len() != len(data) {
t.Errorf("expected len %d, got %d", len(data), reader.Len())
}
got := reader.Range(0, 4)
if string(got) != "test" {
t.Errorf("expected 'test', got '%s'", string(got))
}
// Check that we're using the cache
hits, misses, size, _ := GlobalCacheStats()
if hits+misses == 0 {
t.Error("expected cache to be used")
}
if size == 0 {
t.Error("expected cache to have data")
}
}
func TestBufferedFileReaderConfig(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
data := []byte("test data")
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
// Set custom config
SetBufferedFileReaderConfig(BufferedFileReaderConfig{
CacheSize: 512 * 1024,
BlockSize: 4096,
})
reader, err := OpenBufferedFileReader(path)
if err != nil {
t.Fatal(err)
}
reader.Close()
// Verify config was applied
cfg := GetBufferedFileReaderConfig()
if cfg.CacheSize != 512*1024 {
t.Errorf("expected CacheSize 512KB, got %d", cfg.CacheSize)
}
if cfg.BlockSize != 4096 {
t.Errorf("expected BlockSize 4096, got %d", cfg.BlockSize)
}
}

266
tsdb/fileutil/file_cache.go Normal file
View file

@ -0,0 +1,266 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"container/list"
"sync"
"sync/atomic"
)
const (
// DefaultBlockSize is the size of each cached block (64KiB).
// This is chosen to balance between cache granularity and overhead.
DefaultBlockSize = 64 * 1024
// DefaultCacheSize is the default maximum cache size (512MiB).
DefaultCacheSize = 512 * 1024 * 1024
)
// cacheKey uniquely identifies a block in the cache.
type cacheKey struct {
fileID uint64
block int64 // block index (offset / blockSize)
}
// cacheEntry holds the cached data and its position in the LRU list.
type cacheEntry struct {
key cacheKey
data []byte
elem *list.Element
size int // actual size of data (may be less than blockSize for last block)
}
// FileCache is a shared LRU cache for file blocks.
// It provides configurable memory limits and efficient eviction.
type FileCache struct {
mu sync.RWMutex
maxSize int64
currentSize int64
blockSize int
entries map[cacheKey]*cacheEntry
lru *list.List // front = most recently used
// Buffer pool for allocating blocks
pool sync.Pool
// Metrics
hits atomic.Uint64
misses atomic.Uint64
// File ID counter for unique identification
nextFileID atomic.Uint64
}
// FileCacheOptions configures the file cache.
type FileCacheOptions struct {
MaxSize int64 // Maximum cache size in bytes
BlockSize int // Size of each cached block
}
// DefaultFileCacheOptions returns the default cache configuration.
func DefaultFileCacheOptions() FileCacheOptions {
return FileCacheOptions{
MaxSize: DefaultCacheSize,
BlockSize: DefaultBlockSize,
}
}
// NewFileCache creates a new file cache with the given options.
func NewFileCache(opts FileCacheOptions) *FileCache {
if opts.MaxSize <= 0 {
opts.MaxSize = DefaultCacheSize
}
if opts.BlockSize <= 0 {
opts.BlockSize = DefaultBlockSize
}
fc := &FileCache{
maxSize: opts.MaxSize,
blockSize: opts.BlockSize,
entries: make(map[cacheKey]*cacheEntry),
lru: list.New(),
}
fc.pool = sync.Pool{
New: func() interface{} {
return make([]byte, fc.blockSize)
},
}
return fc
}
// NextFileID returns a unique file ID for cache key generation.
func (fc *FileCache) NextFileID() uint64 {
return fc.nextFileID.Add(1)
}
// BlockSize returns the configured block size.
func (fc *FileCache) BlockSize() int {
return fc.blockSize
}
// Get retrieves a block from the cache.
// Returns nil if the block is not cached.
func (fc *FileCache) Get(fileID uint64, block int64) []byte {
key := cacheKey{fileID: fileID, block: block}
fc.mu.RLock()
entry, ok := fc.entries[key]
fc.mu.RUnlock()
if !ok {
fc.misses.Add(1)
return nil
}
// Move to front (most recently used)
fc.mu.Lock()
// Re-check after acquiring write lock
entry, ok = fc.entries[key]
if ok {
fc.lru.MoveToFront(entry.elem)
}
fc.mu.Unlock()
if ok {
fc.hits.Add(1)
return entry.data[:entry.size]
}
fc.misses.Add(1)
return nil
}
// Put adds a block to the cache.
// If the cache is full, the least recently used blocks are evicted.
func (fc *FileCache) Put(fileID uint64, block int64, data []byte) {
key := cacheKey{fileID: fileID, block: block}
dataSize := len(data)
fc.mu.Lock()
defer fc.mu.Unlock()
// Check if already exists
if entry, ok := fc.entries[key]; ok {
// Update existing entry
fc.lru.MoveToFront(entry.elem)
// If size changed, update
if entry.size != dataSize {
fc.currentSize += int64(dataSize - entry.size)
entry.size = dataSize
copy(entry.data, data)
}
return
}
// Evict if necessary
for fc.currentSize+int64(fc.blockSize) > fc.maxSize && fc.lru.Len() > 0 {
fc.evictLocked()
}
// Allocate from pool
buf := fc.pool.Get().([]byte)
copy(buf, data)
entry := &cacheEntry{
key: key,
data: buf,
size: dataSize,
}
entry.elem = fc.lru.PushFront(entry)
fc.entries[key] = entry
fc.currentSize += int64(fc.blockSize) // Account for full block allocation
}
// evictLocked removes the least recently used entry.
// Caller must hold fc.mu.
func (fc *FileCache) evictLocked() {
elem := fc.lru.Back()
if elem == nil {
return
}
entry := elem.Value.(*cacheEntry)
fc.lru.Remove(elem)
delete(fc.entries, entry.key)
fc.currentSize -= int64(fc.blockSize)
// Return buffer to pool
fc.pool.Put(entry.data)
}
// InvalidateFile removes all cached blocks for a specific file.
// Call this when a file is closed or deleted.
func (fc *FileCache) InvalidateFile(fileID uint64) {
fc.mu.Lock()
defer fc.mu.Unlock()
// Collect entries to remove
var toRemove []*cacheEntry
for key, entry := range fc.entries {
if key.fileID == fileID {
toRemove = append(toRemove, entry)
}
}
// Remove them
for _, entry := range toRemove {
fc.lru.Remove(entry.elem)
delete(fc.entries, entry.key)
fc.currentSize -= int64(fc.blockSize)
fc.pool.Put(entry.data)
}
}
// Clear removes all entries from the cache.
func (fc *FileCache) Clear() {
fc.mu.Lock()
defer fc.mu.Unlock()
for _, entry := range fc.entries {
fc.pool.Put(entry.data)
}
fc.entries = make(map[cacheKey]*cacheEntry)
fc.lru.Init()
fc.currentSize = 0
}
// Stats returns cache statistics.
func (fc *FileCache) Stats() (hits, misses uint64, size, maxSize int64) {
fc.mu.RLock()
size = fc.currentSize
fc.mu.RUnlock()
return fc.hits.Load(), fc.misses.Load(), size, fc.maxSize
}
// Size returns the current cache size in bytes.
func (fc *FileCache) Size() int64 {
fc.mu.RLock()
defer fc.mu.RUnlock()
return fc.currentSize
}
// SetMaxSize updates the maximum cache size and evicts entries if necessary.
func (fc *FileCache) SetMaxSize(maxSize int64) {
fc.mu.Lock()
defer fc.mu.Unlock()
fc.maxSize = maxSize
for fc.currentSize > fc.maxSize && fc.lru.Len() > 0 {
fc.evictLocked()
}
}

View file

@ -0,0 +1,143 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"github.com/prometheus/client_golang/prometheus"
)
// FileCacheMetrics holds Prometheus metrics for the file cache.
type FileCacheMetrics struct {
cacheHits prometheus.Counter
cacheMisses prometheus.Counter
cacheSize prometheus.Gauge
cacheMaxSize prometheus.Gauge
cacheEvictions prometheus.Counter
cacheHitRatio prometheus.GaugeFunc
}
// NewFileCacheMetrics creates metrics for a FileCache.
// The returned metrics are not registered; call Register() on the collector
// or register individual metrics manually.
func NewFileCacheMetrics(cache *FileCache) *FileCacheMetrics {
m := &FileCacheMetrics{
cacheHits: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_file_cache_hits_total",
Help: "Total number of file cache hits.",
}),
cacheMisses: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_file_cache_misses_total",
Help: "Total number of file cache misses.",
}),
cacheSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_file_cache_size_bytes",
Help: "Current size of the file cache in bytes.",
}),
cacheMaxSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_file_cache_max_size_bytes",
Help: "Maximum size of the file cache in bytes.",
}),
cacheEvictions: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_file_cache_evictions_total",
Help: "Total number of cache evictions.",
}),
}
if cache != nil {
m.cacheHitRatio = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_file_cache_hit_ratio",
Help: "Cache hit ratio (hits / (hits + misses)).",
}, func() float64 {
hits, misses, _, _ := cache.Stats()
total := hits + misses
if total == 0 {
return 0
}
return float64(hits) / float64(total)
})
}
return m
}
// Describe implements prometheus.Collector.
func (m *FileCacheMetrics) Describe(ch chan<- *prometheus.Desc) {
m.cacheHits.Describe(ch)
m.cacheMisses.Describe(ch)
m.cacheSize.Describe(ch)
m.cacheMaxSize.Describe(ch)
m.cacheEvictions.Describe(ch)
if m.cacheHitRatio != nil {
m.cacheHitRatio.Describe(ch)
}
}
// Collect implements prometheus.Collector.
func (m *FileCacheMetrics) Collect(ch chan<- prometheus.Metric) {
m.cacheHits.Collect(ch)
m.cacheMisses.Collect(ch)
m.cacheSize.Collect(ch)
m.cacheMaxSize.Collect(ch)
m.cacheEvictions.Collect(ch)
if m.cacheHitRatio != nil {
m.cacheHitRatio.Collect(ch)
}
}
// Update updates the metrics from the cache.
// Call this periodically to keep metrics current.
func (m *FileCacheMetrics) Update(cache *FileCache) {
if cache == nil {
return
}
hits, misses, size, maxSize := cache.Stats()
m.cacheHits.Add(0) // Counter maintains its own value; this just ensures it exists
m.cacheMisses.Add(0) // Counter maintains its own value; this just ensures it exists
m.cacheSize.Set(float64(size))
m.cacheMaxSize.Set(float64(maxSize))
// Note: For accurate hit/miss counters, we'd need to track deltas
// or have the cache directly increment the prometheus counters.
_ = hits
_ = misses
}
// FileCacheWithMetrics wraps a FileCache and updates Prometheus metrics.
type FileCacheWithMetrics struct {
*FileCache
metrics *FileCacheMetrics
}
// NewFileCacheWithMetrics creates a new FileCache with Prometheus metrics.
func NewFileCacheWithMetrics(opts FileCacheOptions, reg prometheus.Registerer) (*FileCacheWithMetrics, error) {
cache := NewFileCache(opts)
metrics := NewFileCacheMetrics(cache)
if reg != nil {
if err := reg.Register(metrics); err != nil {
return nil, err
}
}
return &FileCacheWithMetrics{
FileCache: cache,
metrics: metrics,
}, nil
}
// Metrics returns the metrics for this cache.
func (c *FileCacheWithMetrics) Metrics() *FileCacheMetrics {
return c.metrics
}

View file

@ -0,0 +1,253 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"bytes"
"sync"
"testing"
)
func TestFileCache_BasicOperations(t *testing.T) {
cache := NewFileCache(FileCacheOptions{
MaxSize: 1024 * 1024, // 1MB
BlockSize: 1024, // 1KB blocks
})
// Test put and get
fileID := cache.NextFileID()
data := make([]byte, 1024)
for i := range data {
data[i] = byte(i % 256)
}
cache.Put(fileID, 0, data)
got := cache.Get(fileID, 0)
if got == nil {
t.Fatal("expected to get cached data, got nil")
}
if !bytes.Equal(got, data) {
t.Error("cached data doesn't match original")
}
// Test cache miss
got = cache.Get(fileID, 1)
if got != nil {
t.Error("expected nil for uncached block")
}
// Test stats
hits, misses, size, maxSize := cache.Stats()
if hits != 1 {
t.Errorf("expected 1 hit, got %d", hits)
}
if misses != 1 {
t.Errorf("expected 1 miss, got %d", misses)
}
if size != 1024 {
t.Errorf("expected size 1024, got %d", size)
}
if maxSize != 1024*1024 {
t.Errorf("expected maxSize 1MB, got %d", maxSize)
}
}
func TestFileCache_Eviction(t *testing.T) {
// Create cache that can hold exactly 2 blocks
cache := NewFileCache(FileCacheOptions{
MaxSize: 2048,
BlockSize: 1024,
})
fileID := cache.NextFileID()
data := make([]byte, 1024)
// Add 3 blocks - should evict the first one
for i := 0; i < 3; i++ {
for j := range data {
data[j] = byte(i)
}
cache.Put(fileID, int64(i), data)
}
// Block 0 should be evicted (LRU)
got := cache.Get(fileID, 0)
if got != nil {
t.Error("expected block 0 to be evicted")
}
// Blocks 1 and 2 should still be present
got = cache.Get(fileID, 1)
if got == nil {
t.Error("expected block 1 to be present")
}
got = cache.Get(fileID, 2)
if got == nil {
t.Error("expected block 2 to be present")
}
}
func TestFileCache_LRUOrder(t *testing.T) {
cache := NewFileCache(FileCacheOptions{
MaxSize: 3072, // 3 blocks
BlockSize: 1024,
})
fileID := cache.NextFileID()
data := make([]byte, 1024)
// Add 3 blocks
for i := 0; i < 3; i++ {
cache.Put(fileID, int64(i), data)
}
// Access block 0 to make it most recently used
cache.Get(fileID, 0)
// Add block 3 - should evict block 1 (now LRU)
cache.Put(fileID, 3, data)
// Block 1 should be evicted
if cache.Get(fileID, 1) != nil {
t.Error("expected block 1 to be evicted")
}
// Blocks 0, 2, 3 should still be present
if cache.Get(fileID, 0) == nil {
t.Error("expected block 0 to be present")
}
if cache.Get(fileID, 2) == nil {
t.Error("expected block 2 to be present")
}
if cache.Get(fileID, 3) == nil {
t.Error("expected block 3 to be present")
}
}
func TestFileCache_InvalidateFile(t *testing.T) {
cache := NewFileCache(FileCacheOptions{
MaxSize: 1024 * 1024,
BlockSize: 1024,
})
fileID1 := cache.NextFileID()
fileID2 := cache.NextFileID()
data := make([]byte, 1024)
// Add blocks for two files
cache.Put(fileID1, 0, data)
cache.Put(fileID1, 1, data)
cache.Put(fileID2, 0, data)
cache.Put(fileID2, 1, data)
// Invalidate file 1
cache.InvalidateFile(fileID1)
// File 1 blocks should be gone
if cache.Get(fileID1, 0) != nil {
t.Error("expected file1 block 0 to be invalidated")
}
if cache.Get(fileID1, 1) != nil {
t.Error("expected file1 block 1 to be invalidated")
}
// File 2 blocks should still be present
if cache.Get(fileID2, 0) == nil {
t.Error("expected file2 block 0 to be present")
}
if cache.Get(fileID2, 1) == nil {
t.Error("expected file2 block 1 to be present")
}
}
func TestFileCache_Clear(t *testing.T) {
cache := NewFileCache(FileCacheOptions{
MaxSize: 1024 * 1024,
BlockSize: 1024,
})
fileID := cache.NextFileID()
data := make([]byte, 1024)
cache.Put(fileID, 0, data)
cache.Put(fileID, 1, data)
cache.Clear()
if cache.Size() != 0 {
t.Errorf("expected size 0 after clear, got %d", cache.Size())
}
if cache.Get(fileID, 0) != nil {
t.Error("expected nil after clear")
}
}
func TestFileCache_ConcurrentAccess(t *testing.T) {
cache := NewFileCache(FileCacheOptions{
MaxSize: 1024 * 1024,
BlockSize: 1024,
})
fileID := cache.NextFileID()
data := make([]byte, 1024)
for i := range data {
data[i] = byte(i % 256)
}
var wg sync.WaitGroup
// Concurrent writes
for i := 0; i < 100; i++ {
wg.Add(1)
go func(block int64) {
defer wg.Done()
cache.Put(fileID, block, data)
}(int64(i % 10))
}
// Concurrent reads
for i := 0; i < 100; i++ {
wg.Add(1)
go func(block int64) {
defer wg.Done()
cache.Get(fileID, block)
}(int64(i % 10))
}
wg.Wait()
}
func TestFileCache_SetMaxSize(t *testing.T) {
cache := NewFileCache(FileCacheOptions{
MaxSize: 4096, // 4 blocks
BlockSize: 1024,
})
fileID := cache.NextFileID()
data := make([]byte, 1024)
// Fill cache with 4 blocks
for i := 0; i < 4; i++ {
cache.Put(fileID, int64(i), data)
}
if cache.Size() != 4096 {
t.Errorf("expected size 4096, got %d", cache.Size())
}
// Reduce max size - should evict blocks
cache.SetMaxSize(2048)
if cache.Size() > 2048 {
t.Errorf("expected size <= 2048, got %d", cache.Size())
}
}

View file

@ -0,0 +1,133 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"sync"
)
// BufferedFileReaderConfig holds configuration for buffered file reading.
type BufferedFileReaderConfig struct {
// CacheSize is the maximum cache size in bytes.
// Default is 512MiB.
CacheSize int64
// BlockSize is the size of each cached block.
// Default is 64KiB.
BlockSize int
}
// DefaultBufferedFileReaderConfig returns the default configuration.
func DefaultBufferedFileReaderConfig() BufferedFileReaderConfig {
return BufferedFileReaderConfig{
CacheSize: DefaultCacheSize,
BlockSize: DefaultBlockSize,
}
}
// bufferedFileReaderManager manages shared resources for buffered file reading.
type bufferedFileReaderManager struct {
mu sync.RWMutex
config BufferedFileReaderConfig
cache *FileCache
}
var globalManager = &bufferedFileReaderManager{
config: DefaultBufferedFileReaderConfig(),
}
// SetBufferedFileReaderConfig sets the global buffered file reader configuration.
// This should be called before opening any files, typically during
// database initialization. A new cache is created or updated based on the config.
func SetBufferedFileReaderConfig(cfg BufferedFileReaderConfig) {
globalManager.mu.Lock()
defer globalManager.mu.Unlock()
globalManager.config = cfg
if globalManager.cache == nil {
globalManager.cache = NewFileCache(FileCacheOptions{
MaxSize: cfg.CacheSize,
BlockSize: cfg.BlockSize,
})
} else {
// Update cache settings
globalManager.cache.SetMaxSize(cfg.CacheSize)
}
}
// GetBufferedFileReaderConfig returns the current global buffered file reader configuration.
func GetBufferedFileReaderConfig() BufferedFileReaderConfig {
globalManager.mu.RLock()
defer globalManager.mu.RUnlock()
return globalManager.config
}
// GetGlobalCache returns the global file cache.
func GetGlobalCache() *FileCache {
globalManager.mu.RLock()
defer globalManager.mu.RUnlock()
return globalManager.cache
}
// ensureCache ensures the global cache is initialized.
func ensureCache() *FileCache {
globalManager.mu.Lock()
defer globalManager.mu.Unlock()
if globalManager.cache == nil {
globalManager.cache = NewFileCache(FileCacheOptions{
MaxSize: globalManager.config.CacheSize,
BlockSize: globalManager.config.BlockSize,
})
}
return globalManager.cache
}
// OpenBufferedFileReader opens a file for buffered reading with caching.
// This is the primary entry point for opening files in the TSDB.
func OpenBufferedFileReader(path string) (*BufferedFile, error) {
return OpenBufferedFileReaderWithSize(path, 0)
}
// OpenBufferedFileReaderWithSize opens a file for buffered reading with an expected size.
func OpenBufferedFileReaderWithSize(path string, size int) (*BufferedFile, error) {
cache := ensureCache()
return OpenBufferedFileWithSize(path, size, cache)
}
// ClearGlobalCache clears the global file cache.
// This can be useful during testing or when memory pressure is high.
func ClearGlobalCache() {
globalManager.mu.RLock()
cache := globalManager.cache
globalManager.mu.RUnlock()
if cache != nil {
cache.Clear()
}
}
// GlobalCacheStats returns statistics for the global file cache.
// Returns zeros if cache is not initialized.
func GlobalCacheStats() (hits, misses uint64, size, maxSize int64) {
globalManager.mu.RLock()
cache := globalManager.cache
globalManager.mu.RUnlock()
if cache != nil {
return cache.Stats()
}
return 0, 0, 0, 0
}

View file

@ -0,0 +1,175 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"bytes"
"os"
"path/filepath"
"testing"
)
// TestBufferedFileReaderCorrectness tests that buffered file reader returns correct data.
func TestBufferedFileReaderCorrectness(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
// Create a file with substantial data that spans multiple cache blocks
data := make([]byte, DefaultBlockSize*5+12345) // ~5.2 blocks
for i := range data {
data[i] = byte((i * 7) % 256)
}
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
// Test ranges to verify
testRanges := []struct {
start, end int
}{
{0, 100}, // Start of file
{len(data) - 100, len(data)}, // End of file
{DefaultBlockSize - 50, DefaultBlockSize + 50}, // Cross block boundary
{0, len(data)}, // Full file
{1000, 2000}, // Middle of file
{DefaultBlockSize * 2, DefaultBlockSize*2 + 1000}, // Later block
}
// Configure buffered reader
SetBufferedFileReaderConfig(BufferedFileReaderConfig{
CacheSize: DefaultCacheSize,
BlockSize: DefaultBlockSize,
})
bufferedReader, err := OpenBufferedFileReader(path)
if err != nil {
t.Fatal(err)
}
defer bufferedReader.Close()
for _, tr := range testRanges {
bufferedResult := bufferedReader.Range(tr.start, tr.end)
expected := data[tr.start:tr.end]
if !bytes.Equal(expected, bufferedResult) {
t.Errorf("range [%d:%d] doesn't match original data", tr.start, tr.end)
}
}
// Verify cache was used
hits, misses, _, _ := GlobalCacheStats()
t.Logf("Cache stats: hits=%d, misses=%d", hits, misses)
if hits+misses == 0 {
t.Error("expected cache to be used")
}
}
// TestBufferedReaderMemoryControl verifies that the cache respects size limits.
func TestBufferedReaderMemoryControl(t *testing.T) {
dir := t.TempDir()
// Create multiple files
numFiles := 10
fileSize := DefaultBlockSize * 10 // 10 blocks per file
paths := make([]string, numFiles)
for i := 0; i < numFiles; i++ {
path := filepath.Join(dir, "test"+string(rune('0'+i))+".bin")
data := make([]byte, fileSize)
for j := range data {
data[j] = byte(i + j%256)
}
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
paths[i] = path
}
// Configure cache to hold only a fraction of total data
maxCacheSize := int64(DefaultBlockSize * 20) // Only 20 blocks (2 files worth)
SetBufferedFileReaderConfig(BufferedFileReaderConfig{
CacheSize: maxCacheSize,
BlockSize: DefaultBlockSize,
})
// Open all files and read from them
readers := make([]*BufferedFile, numFiles)
for i, path := range paths {
r, err := OpenBufferedFileReader(path)
if err != nil {
t.Fatal(err)
}
readers[i] = r
// Read the full file to populate cache
_ = r.Bytes()
}
// Check cache size doesn't exceed limit
cache := GetGlobalCache()
if cache == nil {
t.Fatal("expected cache to be initialized")
}
if cache.Size() > maxCacheSize {
t.Errorf("cache size %d exceeds max %d", cache.Size(), maxCacheSize)
}
// Clean up
for _, r := range readers {
r.Close()
}
}
// TestCacheInvalidationOnClose verifies that closing a file clears its cache entries.
func TestCacheInvalidationOnClose(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "test.bin")
data := make([]byte, DefaultBlockSize*3)
for i := range data {
data[i] = byte(i % 256)
}
if err := os.WriteFile(path, data, 0644); err != nil {
t.Fatal(err)
}
SetBufferedFileReaderConfig(BufferedFileReaderConfig{
CacheSize: DefaultCacheSize,
BlockSize: DefaultBlockSize,
})
// Clear any previous cache state
ClearGlobalCache()
reader, err := OpenBufferedFileReader(path)
if err != nil {
t.Fatal(err)
}
// Read to populate cache
_ = reader.Bytes()
cache := GetGlobalCache()
sizeBeforeClose := cache.Size()
if sizeBeforeClose == 0 {
t.Error("expected cache to have data before close")
}
// Close should invalidate cache
reader.Close()
sizeAfterClose := cache.Size()
if sizeAfterClose != 0 {
t.Errorf("expected cache to be cleared after close, got size %d", sizeAfterClose)
}
}