gate: add wait duration native histogram metric

Add a prometheus.Histogram to the Gate that records how long each
Start() call waited before acquiring the semaphore. Uses native
histogram for better resolution without fixed bucket boundaries.

Fixes #11365

Signed-off-by: Ogulcan Aydogan <ogulcanaydogan@hotmail.com>
This commit is contained in:
Ogulcan Aydogan 2026-03-24 12:00:13 +00:00
parent 9a3ac8910b
commit c6ca77cc1c
No known key found for this signature in database
GPG key ID: 00C439F668BFC3F6
3 changed files with 129 additions and 6 deletions

View file

@ -51,7 +51,7 @@ func NewReadHandler(logger *slog.Logger, r prometheus.Registerer, queryable stor
queryable: queryable,
config: config,
remoteReadSampleLimit: remoteReadSampleLimit,
remoteReadGate: gate.New(remoteReadConcurrencyLimit),
remoteReadGate: gate.New(remoteReadConcurrencyLimit, prometheus.WrapRegistererWithPrefix("prometheus_remote_read_handler_", r)),
remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame,
marshalPool: &sync.Pool{},

View file

@ -13,27 +13,47 @@
package gate
import "context"
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// A Gate controls the maximum number of concurrently running and waiting queries.
type Gate struct {
ch chan struct{}
ch chan struct{}
waitDuration prometheus.Histogram
}
// New returns a query gate that limits the number of queries
// being concurrently executed.
func New(length int) *Gate {
return &Gate{
// being concurrently executed. If reg is non-nil, a histogram tracking
// gate wait duration is registered.
func New(length int, reg prometheus.Registerer) *Gate {
g := &Gate{
ch: make(chan struct{}, length),
waitDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "gate_wait_duration_seconds",
Help: "How long a request spent waiting for the gate to open before proceeding.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
}),
}
if reg != nil {
reg.MustRegister(g.waitDuration)
}
return g
}
// Start blocks until the gate has a free spot or the context is done.
func (g *Gate) Start(ctx context.Context) error {
start := time.Now()
select {
case <-ctx.Done():
return ctx.Err()
case g.ch <- struct{}{}:
g.waitDuration.Observe(time.Since(start).Seconds())
return nil
}
}

103
util/gate/gate_test.go Normal file
View file

@ -0,0 +1,103 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gate
import (
"context"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)
func TestGateStart(t *testing.T) {
g := New(1, nil)
require.NoError(t, g.Start(context.Background()))
g.Done()
}
func TestGateStartBlocksWhenFull(t *testing.T) {
g := New(1, nil)
require.NoError(t, g.Start(context.Background()))
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := g.Start(ctx)
require.ErrorIs(t, err, context.DeadlineExceeded)
g.Done()
}
func TestGateWaitDurationObserved(t *testing.T) {
reg := prometheus.NewRegistry()
g := New(1, reg)
require.NoError(t, g.Start(context.Background()))
g.Done()
mfs, err := reg.Gather()
require.NoError(t, err)
var hist *dto.Histogram
for _, mf := range mfs {
if mf.GetName() == "gate_wait_duration_seconds" {
hist = mf.GetMetric()[0].GetHistogram()
break
}
}
require.NotNil(t, hist, "expected gate_wait_duration_seconds histogram to be present")
require.Equal(t, uint64(1), hist.GetSampleCount())
}
func TestGateWaitDurationReflectsActualWait(t *testing.T) {
reg := prometheus.NewRegistry()
g := New(1, reg)
// Fill the gate.
require.NoError(t, g.Start(context.Background()))
// Release after a short delay so the next Start observes non-zero wait.
go func() {
time.Sleep(100 * time.Millisecond)
g.Done()
}()
require.NoError(t, g.Start(context.Background()))
g.Done()
mfs, err := reg.Gather()
require.NoError(t, err)
var hist *dto.Histogram
for _, mf := range mfs {
if mf.GetName() == "gate_wait_duration_seconds" {
hist = mf.GetMetric()[0].GetHistogram()
break
}
}
require.NotNil(t, hist)
// Two Start calls observed: one fast, one ~100ms.
require.Equal(t, uint64(2), hist.GetSampleCount())
require.Greater(t, hist.GetSampleSum(), 0.05, "expected meaningful wait time to be recorded")
}
func TestGateDonePanicsWhenNoneStarted(t *testing.T) {
g := New(1, nil)
require.Panics(t, func() { g.Done() })
}