diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index e5e3db39ae..607e422868 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -968,8 +968,17 @@ remote_write: // TestRemoteWrite_ReshardingWithoutDeadlock ensures that resharding (scaling up) doesn't block when the shards are full. // See: https://github.com/prometheus/prometheus/issues/17384. +// +// The following shows key resharding metrics before and after the fix. +// In v3.7.0, the deadlock prevented the resharding logic from observing the true incoming data rate. +// +// | Metric | v3.7.0 | after the fix | +// |---------------------|---------------|---------------------| +// | dataInRate | 0.6 | 307.2 | +// | dataPendingRate | 0.2 | 306.8 | +// | dataPending | 0 | 1228.8 | +// | desiredShards | 0.6 | 369.2 |. func TestRemoteWrite_ReshardingWithoutDeadlock(t *testing.T) { - t.Skip("flaky test, see https://github.com/prometheus/prometheus/issues/17489") t.Parallel() tmpDir := t.TempDir() @@ -984,7 +993,8 @@ func TestRemoteWrite_ReshardingWithoutDeadlock(t *testing.T) { config := fmt.Sprintf(` global: - scrape_interval: 100ms + # Using a smaller interval may cause the scrape to time out. + scrape_interval: 1s scrape_configs: - job_name: 'self' static_configs: @@ -995,6 +1005,8 @@ remote_write: queue_config: # Speed up the queue being full. capacity: 1 + # Helps keep the “time to send one sample” low so it doesn’t influence the resharding logic. + max_samples_per_send: 1 `, port, server.URL) require.NoError(t, os.WriteFile(configFile, []byte(config), 0o777)) @@ -1003,36 +1015,52 @@ remote_write: configFile, port, fmt.Sprintf("--storage.tsdb.path=%s", tmpDir), + "--log.level=debug", ) require.NoError(t, prom.Start()) - var checkInitialDesiredShardsOnce sync.Once - require.Eventually(t, func() bool { + const desiredShardsMetric = "prometheus_remote_storage_shards_desired" + getMetrics := func() ([]byte, error) { r, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port)) if err != nil { - return false + return nil, err } defer r.Body.Close() if r.StatusCode != http.StatusOK { - return false + return nil, fmt.Errorf("unexpected status code: %d", r.StatusCode) } metrics, err := io.ReadAll(r.Body) + if err != nil { + return nil, err + } + return metrics, nil + } + + // Ensure the initial desired shards is 1. + require.Eventually(t, func() bool { + metrics, err := getMetrics() if err != nil { return false } + initialDesiredShards, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, desiredShardsMetric) + if err != nil { + return false + } + return initialDesiredShards == 1.0 + }, 10*time.Second, 100*time.Millisecond) - checkInitialDesiredShardsOnce.Do(func() { - s, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_shards_desired") - require.NoError(t, err) - require.Equal(t, 1.0, s) - }) - - desiredShards, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_shards_desired") - if err != nil || desiredShards <= 1 { + // Ensure scaling up is triggered after some time. + require.Eventually(t, func() bool { + metrics, err := getMetrics() + if err != nil { + return false + } + desiredShards, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, desiredShardsMetric) + if err != nil || desiredShards <= 1.0 { return false } return true // 3*shardUpdateDuration to allow for the resharding logic to run. - }, 30*time.Second, 1*time.Second) + }, 30*time.Second, time.Second) }