mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-09 08:32:26 -04:00
test: Enable parallel execution for chunk write queue tests (#17338)
* test(tsdb): Enable parallel execution for chunk write queue tests Signed-off-by: Harsh <harshmastic@gmail.com>
This commit is contained in:
parent
1a5da4fbe0
commit
f312fde4a2
4 changed files with 19 additions and 0 deletions
|
|
@ -27,6 +27,7 @@ import (
|
|||
)
|
||||
|
||||
func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) {
|
||||
t.Parallel()
|
||||
var blockWriterWg sync.WaitGroup
|
||||
blockWriterWg.Add(1)
|
||||
|
||||
|
|
@ -55,6 +56,7 @@ func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) {
|
||||
t.Parallel()
|
||||
var (
|
||||
gotSeriesRef HeadSeriesRef
|
||||
gotMint, gotMaxt int64
|
||||
|
|
@ -97,6 +99,7 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
sizeLimit := 100
|
||||
unblockChunkWriterCh := make(chan struct{}, sizeLimit)
|
||||
|
||||
|
|
@ -183,6 +186,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
|
||||
t.Parallel()
|
||||
testError := errors.New("test error")
|
||||
chunkWriter := func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error {
|
||||
return testError
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
)
|
||||
|
||||
func TestReaderWithInvalidBuffer(t *testing.T) {
|
||||
t.Parallel()
|
||||
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
|
||||
r := &Reader{bs: []ByteSlice{b}}
|
||||
|
||||
|
|
@ -31,6 +32,7 @@ func TestReaderWithInvalidBuffer(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWriterWithDefaultSegmentSize(t *testing.T) {
|
||||
t.Parallel()
|
||||
chk1, err := ChunkFromSamples([]Sample{
|
||||
sample{t: 10, f: 11},
|
||||
sample{t: 20, f: 12},
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
|
||||
func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
|
||||
t.Parallel()
|
||||
hrw := createChunkDiskMapper(t, "")
|
||||
defer func() {
|
||||
require.NoError(t, hrw.Close())
|
||||
|
|
@ -181,6 +182,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
|
|||
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
|
||||
// * Non-empty current file leads to creation of another file after truncation.
|
||||
func TestChunkDiskMapper_Truncate(t *testing.T) {
|
||||
t.Parallel()
|
||||
hrw := createChunkDiskMapper(t, "")
|
||||
defer func() {
|
||||
require.NoError(t, hrw.Close())
|
||||
|
|
@ -275,6 +277,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
|
|||
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
|
||||
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
|
||||
func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
|
||||
t.Parallel()
|
||||
hrw := createChunkDiskMapper(t, "")
|
||||
defer func() {
|
||||
require.NoError(t, hrw.Close())
|
||||
|
|
@ -359,6 +362,7 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) {
|
||||
t.Parallel()
|
||||
hrw := createChunkDiskMapper(t, "")
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, hrw.Close())
|
||||
|
|
@ -411,6 +415,7 @@ func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) {
|
|||
// TestHeadReadWriter_TruncateAfterFailedIterateChunks tests for
|
||||
// https://github.com/prometheus/prometheus/issues/7753
|
||||
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
||||
t.Parallel()
|
||||
hrw := createChunkDiskMapper(t, "")
|
||||
defer func() {
|
||||
require.NoError(t, hrw.Close())
|
||||
|
|
@ -442,6 +447,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
hrw := createChunkDiskMapper(t, "")
|
||||
|
||||
timeRange := 0
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ func (q *writeJobQueue) assertInvariants(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestQueuePushPopSingleGoroutine(t *testing.T) {
|
||||
t.Parallel()
|
||||
seed := time.Now().UnixNano()
|
||||
t.Log("seed:", seed)
|
||||
r := rand.New(rand.NewSource(seed))
|
||||
|
|
@ -115,6 +116,7 @@ func TestQueuePushPopSingleGoroutine(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestQueuePushBlocksOnFullQueue(t *testing.T) {
|
||||
t.Parallel()
|
||||
queue := newWriteJobQueue(5, 5)
|
||||
|
||||
pushTime := make(chan time.Time)
|
||||
|
|
@ -152,6 +154,7 @@ func TestQueuePushBlocksOnFullQueue(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestQueuePopBlocksOnEmptyQueue(t *testing.T) {
|
||||
t.Parallel()
|
||||
queue := newWriteJobQueue(5, 5)
|
||||
|
||||
popTime := make(chan time.Time)
|
||||
|
|
@ -192,6 +195,7 @@ func TestQueuePopBlocksOnEmptyQueue(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestQueuePopUnblocksOnClose(t *testing.T) {
|
||||
t.Parallel()
|
||||
queue := newWriteJobQueue(5, 5)
|
||||
|
||||
popTime := make(chan time.Time)
|
||||
|
|
@ -231,6 +235,7 @@ func TestQueuePopUnblocksOnClose(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestQueuePopAfterCloseReturnsAllElements(t *testing.T) {
|
||||
t.Parallel()
|
||||
const count = 10
|
||||
|
||||
queue := newWriteJobQueue(count, count)
|
||||
|
|
@ -257,6 +262,7 @@ func TestQueuePopAfterCloseReturnsAllElements(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestQueuePushPopManyGoroutines(t *testing.T) {
|
||||
t.Parallel()
|
||||
const readGoroutines = 5
|
||||
const writeGoroutines = 10
|
||||
const writes = 500
|
||||
|
|
@ -303,6 +309,7 @@ func TestQueuePushPopManyGoroutines(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestQueueSegmentIsKeptEvenIfEmpty(t *testing.T) {
|
||||
t.Parallel()
|
||||
queue := newWriteJobQueue(1024, 64)
|
||||
|
||||
require.True(t, queue.push(chunkWriteJob{seriesRef: 1}))
|
||||
|
|
|
|||
Loading…
Reference in a new issue