Merge branch 'main' into krajo/duration-arithmetic

This commit is contained in:
Fiona Liao 2025-02-18 10:59:59 +00:00
commit 74d7d375ac
58 changed files with 2035 additions and 532 deletions

View file

@ -0,0 +1,30 @@
---
name: Dependabot auto-merge
on: pull_request
concurrency:
group: ${{ github.workflow }}-${{ (github.event.pull_request && github.event.pull_request.number) || github.ref || github.run_id }}
cancel-in-progress: true
permissions:
contents: read
jobs:
dependabot:
permissions:
contents: write
pull-requests: write
runs-on: ubuntu-latest
if: ${{ github.event.pull_request.user.login == 'dependabot[bot]' && github.repository_owner == 'prometheus' }}
steps:
- name: Dependabot metadata
id: metadata
uses: dependabot/fetch-metadata@d7267f607e9d3fb96fc2fbe83e0af444713e90b7 # v2.3.0
with:
github-token: "${{ secrets.GITHUB_TOKEN }}"
- name: Enable auto-merge for Dependabot PRs
if: ${{steps.metadata.outputs.update-type == 'version-update:semver-minor' || steps.metadata.outputs.update-type == 'version-update:semver-patch'}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}

View file

@ -14,7 +14,7 @@ jobs:
image: quay.io/prometheus/golang-builder:1.23-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- uses: ./.github/promci/actions/setup_environment
with:
enable_npm: true
@ -30,7 +30,7 @@ jobs:
image: quay.io/prometheus/golang-builder:1.23-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- uses: ./.github/promci/actions/setup_environment
- run: go test --tags=dedupelabels ./...
- run: GOARCH=386 go test ./...
@ -63,7 +63,7 @@ jobs:
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- uses: ./.github/promci/actions/setup_environment
with:
enable_go: false
@ -122,7 +122,7 @@ jobs:
thread: [ 0, 1, 2 ]
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- uses: ./.github/promci/actions/build
with:
promu_opts: "-p linux/amd64 -p windows/amd64 -p linux/arm64 -p darwin/amd64 -p darwin/arm64 -p linux/386"
@ -147,7 +147,7 @@ jobs:
# should also be updated.
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- uses: ./.github/promci/actions/build
with:
parallelism: 12
@ -209,7 +209,7 @@ jobs:
if: github.event_name == 'push' && github.event.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- uses: ./.github/promci/actions/publish_main
with:
docker_hub_login: ${{ secrets.docker_hub_login }}
@ -226,7 +226,7 @@ jobs:
(github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v3.'))
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- uses: ./.github/promci/actions/publish_release
with:
docker_hub_login: ${{ secrets.docker_hub_login }}
@ -241,7 +241,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: prometheus/promci@52c7012f5f0070d7281b8db4a119e21341d43c91 # v0.4.5
- uses: prometheus/promci@c3c93a50d581b928af720f0134b2b2dad32a6c41 # v0.4.6
- name: Install nodejs
uses: actions/setup-node@39370e3970a6d050c480ffad4ff0ed4d3fdee5af # v4.1.0
with:

View file

@ -126,6 +126,9 @@ linters-settings:
- allowTypesBefore: "*testing.T,testing.TB"
- name: context-keys-type
- name: dot-imports
- name: early-return
arguments:
- "preserveScope"
# A lot of false positives: incorrectly identifies channel draining as "empty code block".
# See https://github.com/mgechev/revive/issues/386
- name: empty-block
@ -137,6 +140,8 @@ linters-settings:
- name: exported
- name: increment-decrement
- name: indent-error-flow
arguments:
- "preserveScope"
- name: package-comments
# TODO(beorn7): Currently, we have a lot of missing package doc comments. Maybe we should have them.
disabled: true
@ -144,6 +149,8 @@ linters-settings:
- name: receiver-naming
- name: redefines-builtin-id
- name: superfluous-else
arguments:
- "preserveScope"
- name: time-naming
- name: unexported-return
- name: unreachable-code

View file

@ -231,7 +231,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
logger.Info("Experimental additional scrape metrics enabled")
case "metadata-wal-records":
c.scrape.AppendMetadata = true
logger.Info("Experimental metadata records in WAL enabled, required for remote write 2.0")
logger.Info("Experimental metadata records in WAL enabled")
case "promql-per-step-stats":
c.enablePerStepStats = true
logger.Info("Experimental per-step statistics reporting")
@ -699,7 +699,7 @@ func main() {
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.AppendMetadata)
remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

View file

@ -149,7 +149,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
_, ts, v := p.Series()
if ts == nil {
l := labels.Labels{}
p.Metric(&l)
p.Labels(&l)
return fmt.Errorf("expected timestamp for series %v, got none", l)
}
if *ts < t {
@ -163,7 +163,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
}
l := labels.Labels{}
p.Metric(&l)
p.Labels(&l)
lb.Reset(l)
for name, value := range customLabels {

View file

@ -992,11 +992,11 @@ func checkDuplicates(groups []rulefmt.RuleGroup) []compareRuleType {
return duplicates
}
func ruleMetric(rule rulefmt.RuleNode) string {
if rule.Alert.Value != "" {
return rule.Alert.Value
func ruleMetric(rule rulefmt.Rule) string {
if rule.Alert != "" {
return rule.Alert
}
return rule.Record.Value
return rule.Record
}
var checkMetricsUsage = strings.TrimSpace(`

View file

@ -826,17 +826,31 @@ func checkErr(err error) int {
}
func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) int {
inputFile, err := fileutil.OpenMmapFile(path)
var buf []byte
info, err := os.Stat(path)
if err != nil {
return checkErr(err)
}
defer inputFile.Close()
if info.Mode()&(os.ModeNamedPipe|os.ModeCharDevice) != 0 {
// Read the pipe chunks by chunks as it cannot be mmap-ed
buf, err = os.ReadFile(path)
if err != nil {
return checkErr(err)
}
} else {
inputFile, err := fileutil.OpenMmapFile(path)
if err != nil {
return checkErr(err)
}
defer inputFile.Close()
buf = inputFile.Bytes()
}
if err := os.MkdirAll(outputDir, 0o777); err != nil {
return checkErr(fmt.Errorf("create output dir: %w", err))
}
return checkErr(backfill(5000, inputFile.Bytes(), outputDir, humanReadable, quiet, maxBlockDuration, customLabels))
return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels))
}
func displayHistogram(dataType string, datas []int, total int) {

View file

@ -0,0 +1,69 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !windows
package main
import (
"bytes"
"io"
"math"
"os"
"path"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb"
)
func TestTSDBDumpOpenMetricsRoundTripPipe(t *testing.T) {
initialMetrics, err := os.ReadFile("testdata/dump-openmetrics-roundtrip-test.prom")
require.NoError(t, err)
initialMetrics = normalizeNewLine(initialMetrics)
pipeDir := t.TempDir()
dbDir := t.TempDir()
// create pipe
pipe := path.Join(pipeDir, "pipe")
err = syscall.Mkfifo(pipe, 0o666)
require.NoError(t, err)
go func() {
// open pipe to write
in, err := os.OpenFile(pipe, os.O_WRONLY, os.ModeNamedPipe)
require.NoError(t, err)
defer func() { require.NoError(t, in.Close()) }()
_, err = io.Copy(in, bytes.NewReader(initialMetrics))
require.NoError(t, err)
}()
// Import samples from OM format
code := backfillOpenMetrics(pipe, dbDir, false, false, 2*time.Hour, map[string]string{})
require.Equal(t, 0, code)
db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
})
// Dump the blocks into OM format
dumpedMetrics := getDumpedSamples(t, dbDir, "", math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics)
// Should get back the initial metrics.
require.Equal(t, string(initialMetrics), dumpedMetrics)
}

View file

@ -458,11 +458,10 @@ func (d *Discovery) vmToLabelSet(ctx context.Context, client client, vm virtualM
networkInterface, err = client.getVMScaleSetVMNetworkInterfaceByID(ctx, nicID, vm.ScaleSet, vm.InstanceID)
}
if err != nil {
if errors.Is(err, errorNotFound) {
d.logger.Warn("Network interface does not exist", "name", nicID, "err", err)
} else {
if !errors.Is(err, errorNotFound) {
return nil, err
}
d.logger.Warn("Network interface does not exist", "name", nicID, "err", err)
// Get out of this routine because we cannot continue without a network interface.
return nil, nil
}

View file

@ -194,13 +194,12 @@ func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
events, err := d.client.ListEvents(ctx, &eventsOpts)
if err != nil {
var e *linodego.Error
if errors.As(err, &e) && e.Code == http.StatusUnauthorized {
// If we get a 401, the token doesn't have `events:read_only` scope.
// Disable event polling and fallback to doing a full refresh every interval.
d.eventPollingEnabled = false
} else {
if !(errors.As(err, &e) && e.Code == http.StatusUnauthorized) {
return nil, err
}
// If we get a 401, the token doesn't have `events:read_only` scope.
// Disable event polling and fallback to doing a full refresh every interval.
d.eventPollingEnabled = false
} else {
// Event polling tells us changes the Linode API is aware of. Actions issued outside of the Linode API,
// such as issuing a `shutdown` at the VM's console instead of using the API to power off an instance,

View file

@ -101,12 +101,12 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.
// Register the metrics.
// We have to do this after setting all options, so that the name of the Manager is set.
if metrics, err := NewManagerMetrics(registerer, mgr.name); err == nil {
mgr.metrics = metrics
} else {
metrics, err := NewManagerMetrics(registerer, mgr.name)
if err != nil {
logger.Error("Failed to create discovery manager metrics", "manager", mgr.name, "err", err)
return nil
}
mgr.metrics = metrics
return mgr
}

View file

@ -237,17 +237,16 @@ func (d *DockerDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, er
if len(networks) == 0 {
// Try to lookup shared networks
for {
if containerNetworkMode.IsContainer() {
tmpContainer, exists := allContainers[containerNetworkMode.ConnectedContainer()]
if !exists {
break
}
networks = tmpContainer.NetworkSettings.Networks
containerNetworkMode = container.NetworkMode(tmpContainer.HostConfig.NetworkMode)
if len(networks) > 0 {
break
}
} else {
if !containerNetworkMode.IsContainer() {
break
}
tmpContainer, exists := allContainers[containerNetworkMode.ConnectedContainer()]
if !exists {
break
}
networks = tmpContainer.NetworkSettings.Networks
containerNetworkMode = container.NetworkMode(tmpContainer.HostConfig.NetworkMode)
if len(networks) > 0 {
break
}
}

View file

@ -35,28 +35,30 @@ import (
const (
instanceLabelPrefix = metaLabelPrefix + "instance_"
instanceBootTypeLabel = instanceLabelPrefix + "boot_type"
instanceHostnameLabel = instanceLabelPrefix + "hostname"
instanceIDLabel = instanceLabelPrefix + "id"
instanceImageArchLabel = instanceLabelPrefix + "image_arch"
instanceImageIDLabel = instanceLabelPrefix + "image_id"
instanceImageNameLabel = instanceLabelPrefix + "image_name"
instanceLocationClusterID = instanceLabelPrefix + "location_cluster_id"
instanceLocationHypervisorID = instanceLabelPrefix + "location_hypervisor_id"
instanceLocationNodeID = instanceLabelPrefix + "location_node_id"
instanceNameLabel = instanceLabelPrefix + "name"
instanceOrganizationLabel = instanceLabelPrefix + "organization_id"
instancePrivateIPv4Label = instanceLabelPrefix + "private_ipv4"
instanceProjectLabel = instanceLabelPrefix + "project_id"
instancePublicIPv4Label = instanceLabelPrefix + "public_ipv4"
instancePublicIPv6Label = instanceLabelPrefix + "public_ipv6"
instanceSecurityGroupIDLabel = instanceLabelPrefix + "security_group_id"
instanceSecurityGroupNameLabel = instanceLabelPrefix + "security_group_name"
instanceStateLabel = instanceLabelPrefix + "status"
instanceTagsLabel = instanceLabelPrefix + "tags"
instanceTypeLabel = instanceLabelPrefix + "type"
instanceZoneLabel = instanceLabelPrefix + "zone"
instanceRegionLabel = instanceLabelPrefix + "region"
instanceBootTypeLabel = instanceLabelPrefix + "boot_type"
instanceHostnameLabel = instanceLabelPrefix + "hostname"
instanceIDLabel = instanceLabelPrefix + "id"
instanceImageArchLabel = instanceLabelPrefix + "image_arch"
instanceImageIDLabel = instanceLabelPrefix + "image_id"
instanceImageNameLabel = instanceLabelPrefix + "image_name"
instanceLocationClusterID = instanceLabelPrefix + "location_cluster_id"
instanceLocationHypervisorID = instanceLabelPrefix + "location_hypervisor_id"
instanceLocationNodeID = instanceLabelPrefix + "location_node_id"
instanceNameLabel = instanceLabelPrefix + "name"
instanceOrganizationLabel = instanceLabelPrefix + "organization_id"
instancePrivateIPv4Label = instanceLabelPrefix + "private_ipv4"
instanceProjectLabel = instanceLabelPrefix + "project_id"
instancePublicIPv4Label = instanceLabelPrefix + "public_ipv4"
instancePublicIPv6Label = instanceLabelPrefix + "public_ipv6"
instancePublicIPv4AddressesLabel = instanceLabelPrefix + "public_ipv4_addresses"
instancePublicIPv6AddressesLabel = instanceLabelPrefix + "public_ipv6_addresses"
instanceSecurityGroupIDLabel = instanceLabelPrefix + "security_group_id"
instanceSecurityGroupNameLabel = instanceLabelPrefix + "security_group_name"
instanceStateLabel = instanceLabelPrefix + "status"
instanceTagsLabel = instanceLabelPrefix + "tags"
instanceTypeLabel = instanceLabelPrefix + "type"
instanceZoneLabel = instanceLabelPrefix + "zone"
instanceRegionLabel = instanceLabelPrefix + "region"
)
type instanceDiscovery struct {
@ -175,14 +177,42 @@ func (d *instanceDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group,
}
addr := ""
if len(server.PublicIPs) > 0 {
var ipv4Addresses []string
var ipv6Addresses []string
for _, ip := range server.PublicIPs {
if ip.Family == instance.ServerIPIPFamilyInet {
ipv4Addresses = append(ipv4Addresses, ip.Address.String())
} else if ip.Family == instance.ServerIPIPFamilyInet6 {
ipv6Addresses = append(ipv6Addresses, ip.Address.String())
}
}
if len(ipv6Addresses) > 0 {
labels[instancePublicIPv6AddressesLabel] = model.LabelValue(
separator +
strings.Join(ipv6Addresses, separator) +
separator)
}
if len(ipv4Addresses) > 0 {
labels[instancePublicIPv4AddressesLabel] = model.LabelValue(
separator +
strings.Join(ipv4Addresses, separator) +
separator)
}
}
if server.IPv6 != nil { //nolint:staticcheck
labels[instancePublicIPv6Label] = model.LabelValue(server.IPv6.Address.String()) //nolint:staticcheck
addr = server.IPv6.Address.String() //nolint:staticcheck
}
if server.PublicIP != nil { //nolint:staticcheck
labels[instancePublicIPv4Label] = model.LabelValue(server.PublicIP.Address.String()) //nolint:staticcheck
addr = server.PublicIP.Address.String() //nolint:staticcheck
if server.PublicIP.Family != instance.ServerIPIPFamilyInet6 { //nolint:staticcheck
labels[instancePublicIPv4Label] = model.LabelValue(server.PublicIP.Address.String()) //nolint:staticcheck
}
addr = server.PublicIP.Address.String() //nolint:staticcheck
}
if server.PrivateIP != nil {

View file

@ -60,7 +60,7 @@ api_url: %s
tg := tgs[0]
require.NotNil(t, tg)
require.NotNil(t, tg.Targets)
require.Len(t, tg.Targets, 3)
require.Len(t, tg.Targets, 4)
for i, lbls := range []model.LabelSet{
{
@ -125,6 +125,8 @@ api_url: %s
"__meta_scaleway_instance_organization_id": "20b3d507-96ac-454c-a795-bc731b46b12f",
"__meta_scaleway_instance_project_id": "20b3d507-96ac-454c-a795-bc731b46b12f",
"__meta_scaleway_instance_public_ipv4": "51.158.183.115",
"__meta_scaleway_instance_public_ipv4_addresses": ",51.158.183.115,",
"__meta_scaleway_instance_public_ipv6_addresses": ",2001:bc8:1640:1568:dc00:ff:fe21:91b,",
"__meta_scaleway_instance_region": "nl-ams",
"__meta_scaleway_instance_security_group_id": "984414da-9fc2-49c0-a925-fed6266fe092",
"__meta_scaleway_instance_security_group_name": "Default security group",
@ -132,6 +134,30 @@ api_url: %s
"__meta_scaleway_instance_type": "DEV1-S",
"__meta_scaleway_instance_zone": "nl-ams-1",
},
{
"__address__": "163.172.136.10:80",
"__meta_scaleway_instance_boot_type": "local",
"__meta_scaleway_instance_hostname": "multiple-ips",
"__meta_scaleway_instance_id": "658abbf4-e6c6-4239-a483-3307763cf6e0",
"__meta_scaleway_instance_image_arch": "x86_64",
"__meta_scaleway_instance_image_id": "f583f58c-1ea5-44ab-a1e6-2b2e7df32a86",
"__meta_scaleway_instance_image_name": "Ubuntu 24.04 Noble Numbat",
"__meta_scaleway_instance_location_cluster_id": "7",
"__meta_scaleway_instance_location_hypervisor_id": "801",
"__meta_scaleway_instance_location_node_id": "95",
"__meta_scaleway_instance_name": "multiple-ips",
"__meta_scaleway_instance_organization_id": "ee7bd9e1-9cbd-4724-b2f4-19e50f3cf38b",
"__meta_scaleway_instance_project_id": "ee7bd9e1-9cbd-4724-b2f4-19e50f3cf38b",
"__meta_scaleway_instance_public_ipv4": "163.172.136.10",
"__meta_scaleway_instance_public_ipv4_addresses": ",163.172.136.10,212.47.248.223,51.15.231.134,",
"__meta_scaleway_instance_public_ipv6_addresses": ",2001:bc8:710:4a69:dc00:ff:fe58:40c1,2001:bc8:710:d::,2001:bc8:710:5417::,",
"__meta_scaleway_instance_region": "fr-par",
"__meta_scaleway_instance_security_group_id": "0fe819c3-274d-472a-b3f5-ddb258d2d8bb",
"__meta_scaleway_instance_security_group_name": "Default security group",
"__meta_scaleway_instance_status": "running",
"__meta_scaleway_instance_type": "PLAY2-PICO",
"__meta_scaleway_instance_zone": "fr-par-1",
},
} {
t.Run(fmt.Sprintf("item %d", i), func(t *testing.T) {
require.Equal(t, lbls, tg.Targets[i])

View file

@ -356,6 +356,222 @@
"placement_group": null,
"private_nics": [],
"zone": "nl-ams-1"
},
{
"id": "658abbf4-e6c6-4239-a483-3307763cf6e0",
"name": "multiple-ips",
"arch": "x86_64",
"commercial_type": "PLAY2-PICO",
"boot_type": "local",
"organization": "ee7bd9e1-9cbd-4724-b2f4-19e50f3cf38b",
"project": "ee7bd9e1-9cbd-4724-b2f4-19e50f3cf38b",
"hostname": "multiple-ips",
"image": {
"id": "f583f58c-1ea5-44ab-a1e6-2b2e7df32a86",
"name": "Ubuntu 24.04 Noble Numbat",
"organization": "51b656e3-4865-41e8-adbc-0c45bdd780db",
"project": "51b656e3-4865-41e8-adbc-0c45bdd780db",
"root_volume": {
"id": "cfab1e2e-fa24-480a-a372-61b19e8e2fda",
"name": "Ubuntu 24.04 Noble Numbat",
"volume_type": "unified",
"size": 10000000000
},
"extra_volumes": {
},
"public": true,
"arch": "x86_64",
"creation_date": "2024-04-26T09:24:38.624912+00:00",
"modification_date": "2024-04-26T09:24:38.624912+00:00",
"default_bootscript": null,
"from_server": "",
"state": "available",
"tags": [
],
"zone": "fr-par-1"
},
"volumes": {
"0": {
"boot": false,
"id": "7d4dc5ae-3f3c-4f9c-91cf-4a47a2193e94",
"name": "Ubuntu 24.04 Noble Numbat",
"volume_type": "b_ssd",
"export_uri": null,
"organization": "ee7bd9e1-9cbd-4724-b2f4-19e50f3cf38b",
"project": "ee7bd9e1-9cbd-4724-b2f4-19e50f3cf38b",
"server": {
"id": "658abbf4-e6c6-4239-a483-3307763cf6e0",
"name": "multiple-ips"
},
"size": 10000000000,
"state": "available",
"creation_date": "2024-06-07T13:33:17.697162+00:00",
"modification_date": "2024-06-07T13:33:17.697162+00:00",
"tags": [
],
"zone": "fr-par-1"
}
},
"tags": [
],
"state": "running",
"protected": false,
"state_detail": "booted",
"public_ip": {
"id": "63fd9ede-58d7-482c-b21d-1d79d81a90dc",
"address": "163.172.136.10",
"dynamic": false,
"gateway": "62.210.0.1",
"netmask": "32",
"family": "inet",
"provisioning_mode": "dhcp",
"tags": [
],
"state": "attached",
"ipam_id": "700644ed-f6a2-4c64-8508-bb867bc07673"
},
"public_ips": [
{
"id": "63fd9ede-58d7-482c-b21d-1d79d81a90dc",
"address": "163.172.136.10",
"dynamic": false,
"gateway": "62.210.0.1",
"netmask": "32",
"family": "inet",
"provisioning_mode": "dhcp",
"tags": [
],
"state": "attached",
"ipam_id": "700644ed-f6a2-4c64-8508-bb867bc07673"
},
{
"id": "eed4575b-90e5-4102-b956-df874c911e2b",
"address": "212.47.248.223",
"dynamic": false,
"gateway": "62.210.0.1",
"netmask": "32",
"family": "inet",
"provisioning_mode": "manual",
"tags": [
],
"state": "attached",
"ipam_id": "e2bdef64-828b-4f4a-a56b-954a85759adf"
},
{
"id": "fca8b329-6c0e-4f9c-aa88-d5c197b5919a",
"address": "51.15.231.134",
"dynamic": false,
"gateway": "62.210.0.1",
"netmask": "32",
"family": "inet",
"provisioning_mode": "manual",
"tags": [
],
"state": "attached",
"ipam_id": "e56808db-b348-4b7e-ad23-995ae08dc1a1"
},
{
"id": "3ffa6774-124c-4e64-8afb-148c15304b25",
"address": "2001:bc8:710:4a69:dc00:ff:fe58:40c1",
"dynamic": false,
"gateway": "fe80::dc00:ff:fe58:40c2",
"netmask": "64",
"family": "inet6",
"provisioning_mode": "slaac",
"tags": [
],
"state": "attached",
"ipam_id": "d97773d9-fd2c-4085-92bb-5c471abd132e"
},
{
"id": "28fcf539-8492-4603-b627-de0501ce8489",
"address": "2001:bc8:710:d::",
"dynamic": false,
"gateway": "fe80::dc00:ff:fe58:40c2",
"netmask": "64",
"family": "inet6",
"provisioning_mode": "manual",
"tags": [
],
"state": "attached",
"ipam_id": "005d19ac-203c-4034-ab16-9ff4caadbdd5"
},
{
"id": "db6fafda-3a12-403d-8c9c-1a1cb1c315ba",
"address": "2001:bc8:710:5417::",
"dynamic": false,
"gateway": "fe80::dc00:ff:fe58:40c2",
"netmask": "64",
"family": "inet6",
"provisioning_mode": "manual",
"tags": [
],
"state": "attached",
"ipam_id": "8c62fac8-4134-462c-ba48-71ce4f8ac939"
}
],
"mac_address": "de:00:00:58:40:c1",
"routed_ip_enabled": true,
"ipv6": null,
"extra_networks": [
],
"dynamic_ip_required": false,
"enable_ipv6": false,
"private_ip": null,
"creation_date": "2024-06-07T13:33:17.697162+00:00",
"modification_date": "2024-06-07T13:33:26.021167+00:00",
"bootscript": {
"id": "fdfe150f-a870-4ce4-b432-9f56b5b995c1",
"public": true,
"title": "x86_64 mainline 4.4.230 rev1",
"architecture": "x86_64",
"organization": "11111111-1111-4111-8111-111111111111",
"project": "11111111-1111-4111-8111-111111111111",
"kernel": "http://10.194.3.9/kernel/x86_64-mainline-lts-4.4-4.4.230-rev1/vmlinuz-4.4.230",
"dtb": "",
"initrd": "http://10.194.3.9/initrd/initrd-Linux-x86_64-v3.14.6.gz",
"bootcmdargs": "LINUX_COMMON scaleway boot=local nbd.max_part=16",
"default": true,
"zone": "fr-par-1"
},
"security_group": {
"id": "0fe819c3-274d-472a-b3f5-ddb258d2d8bb",
"name": "Default security group"
},
"location": {
"zone_id": "par1",
"platform_id": "14",
"cluster_id": "7",
"hypervisor_id": "801",
"node_id": "95"
},
"maintenances": [
],
"allowed_actions": [
"poweroff",
"terminate",
"reboot",
"stop_in_place",
"backup"
],
"placement_group": null,
"private_nics": [
],
"zone": "fr-par-1"
}
]
}

View file

@ -2338,6 +2338,8 @@ The following meta labels are available on targets during [relabeling](#relabel_
* `__meta_scaleway_instance_project_id`: project id of the server
* `__meta_scaleway_instance_public_ipv4`: the public IPv4 address of the server
* `__meta_scaleway_instance_public_ipv6`: the public IPv6 address of the server
* `__meta_scaleway_instance_public_ipv4_addresses`: the public IPv4 addresses of the server
* `__meta_scaleway_instance_public_ipv6_addresses`: the public IPv6 addresses of the server
* `__meta_scaleway_instance_region`: the region of the server
* `__meta_scaleway_instance_security_group_id`: the ID of the security group of the server
* `__meta_scaleway_instance_security_group_name`: the name of the security group of the server

View file

@ -114,8 +114,7 @@ Fall back to serving the old (Prometheus 2.x) web UI instead of the new UI. The
When enabled, Prometheus will store metadata in-memory and keep track of
metadata changes as WAL records on a per-series basis.
This must be used if
you are also using remote write 2.0 as it will only gather metadata from the WAL.
This must be used if you would like to send metadata using the new remote write 2.0.
## Delay compaction start time

View file

@ -319,7 +319,14 @@ the input samples, including the original labels, are returned in the result
vector. `by` and `without` are only used to bucket the input vector. Similar to
`min` and `max`, they only operate on float samples, considering `NaN` values
to be farthest from the top or bottom, respectively. Histogram samples in the
input vector are ignored, flagged by an info-level annotation.
input vector are ignored, flagged by an info-level annotation.
If used in an instant query, `topk` and `bottomk` return series ordered by
value in descending or ascending order, respectively. If used with `by` or
`without`, then series within each bucket are sorted by value, and series in
the same bucket are returned consecutively, but there is no guarantee that
buckets of series will be returned in any particular order. No sorting applies
to range queries.
`limitk` and `limit_ratio` also return a subset of the input samples, including
the original labels in the result vector. The subset is selected in a
@ -334,7 +341,7 @@ the input samples, while `limit_ratio(-0.9, ...)` returns precisely the
remaining approximately 90% of the input samples not returned by
`limit_ratio(0.1, ...)`.
`group` and `count` do not do not interact with the sample values,
`group` and `count` do not interact with the sample values,
they work in the same way for float samples and histogram samples.
`count_values` outputs one time series per unique sample value. Each series has

View file

@ -288,11 +288,11 @@ func valuesToSamples(timestamp time.Time, value interface{}) (prompb.Sample, err
var valueInt64 int64
var ok bool
if valueFloat64, ok = value.(float64); !ok {
if valueInt64, ok = value.(int64); ok {
valueFloat64 = float64(valueInt64)
} else {
valueInt64, ok = value.(int64)
if !ok {
return prompb.Sample{}, fmt.Errorf("unable to convert sample value to float64: %v", value)
}
valueFloat64 = float64(valueInt64)
}
return prompb.Sample{

View file

@ -92,7 +92,7 @@ type RuleGroups struct {
}
type ruleGroups struct {
Groups []yaml.Node `yaml:"groups"`
Groups []ruleGroupNode `yaml:"groups"`
}
// Validate validates all rules in the rule groups.
@ -128,9 +128,9 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {
set[g.Name] = struct{}{}
for i, r := range g.Rules {
for _, node := range g.Rules[i].Validate() {
var ruleName yaml.Node
if r.Alert.Value != "" {
for _, node := range r.Validate(node.Groups[j].Rules[i]) {
var ruleName string
if r.Alert != "" {
ruleName = r.Alert
} else {
ruleName = r.Record
@ -138,7 +138,7 @@ func (g *RuleGroups) Validate(node ruleGroups) (errs []error) {
errs = append(errs, &Error{
Group: g.Name,
Rule: i + 1,
RuleName: ruleName.Value,
RuleName: ruleName,
Err: node,
})
}
@ -154,7 +154,18 @@ type RuleGroup struct {
Interval model.Duration `yaml:"interval,omitempty"`
QueryOffset *model.Duration `yaml:"query_offset,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []RuleNode `yaml:"rules"`
Rules []Rule `yaml:"rules"`
Labels map[string]string `yaml:"labels,omitempty"`
}
// ruleGroupNode adds yaml.v3 layer to support line and columns outputs for invalid rule groups.
type ruleGroupNode struct {
yaml.Node
Name string `yaml:"name"`
Interval model.Duration `yaml:"interval,omitempty"`
QueryOffset *model.Duration `yaml:"query_offset,omitempty"`
Limit int `yaml:"limit,omitempty"`
Rules []ruleNode `yaml:"rules"`
Labels map[string]string `yaml:"labels,omitempty"`
}
@ -169,8 +180,8 @@ type Rule struct {
Annotations map[string]string `yaml:"annotations,omitempty"`
}
// RuleNode adds yaml.v3 layer to support line and column outputs for invalid rules.
type RuleNode struct {
// ruleNode adds yaml.v3 layer to support line and column outputs for invalid rules.
type ruleNode struct {
Record yaml.Node `yaml:"record,omitempty"`
Alert yaml.Node `yaml:"alert,omitempty"`
Expr yaml.Node `yaml:"expr"`
@ -181,64 +192,64 @@ type RuleNode struct {
}
// Validate the rule and return a list of encountered errors.
func (r *RuleNode) Validate() (nodes []WrappedError) {
if r.Record.Value != "" && r.Alert.Value != "" {
func (r *Rule) Validate(node ruleNode) (nodes []WrappedError) {
if r.Record != "" && r.Alert != "" {
nodes = append(nodes, WrappedError{
err: errors.New("only one of 'record' and 'alert' must be set"),
node: &r.Record,
nodeAlt: &r.Alert,
node: &node.Record,
nodeAlt: &node.Alert,
})
}
if r.Record.Value == "" && r.Alert.Value == "" {
if r.Record == "" && r.Alert == "" {
nodes = append(nodes, WrappedError{
err: errors.New("one of 'record' or 'alert' must be set"),
node: &r.Record,
nodeAlt: &r.Alert,
node: &node.Record,
nodeAlt: &node.Alert,
})
}
if r.Expr.Value == "" {
if r.Expr == "" {
nodes = append(nodes, WrappedError{
err: errors.New("field 'expr' must be set in rule"),
node: &r.Expr,
node: &node.Expr,
})
} else if _, err := parser.ParseExpr(r.Expr.Value); err != nil {
} else if _, err := parser.ParseExpr(r.Expr); err != nil {
nodes = append(nodes, WrappedError{
err: fmt.Errorf("could not parse expression: %w", err),
node: &r.Expr,
node: &node.Expr,
})
}
if r.Record.Value != "" {
if r.Record != "" {
if len(r.Annotations) > 0 {
nodes = append(nodes, WrappedError{
err: errors.New("invalid field 'annotations' in recording rule"),
node: &r.Record,
node: &node.Record,
})
}
if r.For != 0 {
nodes = append(nodes, WrappedError{
err: errors.New("invalid field 'for' in recording rule"),
node: &r.Record,
node: &node.Record,
})
}
if r.KeepFiringFor != 0 {
nodes = append(nodes, WrappedError{
err: errors.New("invalid field 'keep_firing_for' in recording rule"),
node: &r.Record,
node: &node.Record,
})
}
if !model.IsValidMetricName(model.LabelValue(r.Record.Value)) {
if !model.IsValidMetricName(model.LabelValue(r.Record)) {
nodes = append(nodes, WrappedError{
err: fmt.Errorf("invalid recording rule name: %s", r.Record.Value),
node: &r.Record,
err: fmt.Errorf("invalid recording rule name: %s", r.Record),
node: &node.Record,
})
}
// While record is a valid UTF-8 it's common mistake to put PromQL expression in the record name.
// Disallow "{}" chars.
if strings.Contains(r.Record.Value, "{") || strings.Contains(r.Record.Value, "}") {
if strings.Contains(r.Record, "{") || strings.Contains(r.Record, "}") {
nodes = append(nodes, WrappedError{
err: fmt.Errorf("braces present in the recording rule name; should it be in expr?: %s", r.Record.Value),
node: &r.Record,
err: fmt.Errorf("braces present in the recording rule name; should it be in expr?: %s", r.Record),
node: &node.Record,
})
}
}
@ -274,8 +285,8 @@ func (r *RuleNode) Validate() (nodes []WrappedError) {
// testTemplateParsing checks if the templates used in labels and annotations
// of the alerting rules are parsed correctly.
func testTemplateParsing(rl *RuleNode) (errs []error) {
if rl.Alert.Value == "" {
func testTemplateParsing(rl *Rule) (errs []error) {
if rl.Alert == "" {
// Not an alerting rule.
return errs
}
@ -292,7 +303,7 @@ func testTemplateParsing(rl *RuleNode) (errs []error) {
tmpl := template.NewTemplateExpander(
context.TODO(),
strings.Join(append(defs, text), ""),
"__alert_"+rl.Alert.Value,
"__alert_"+rl.Alert,
tmplData,
model.Time(timestamp.FromTime(time.Now())),
nil,

View file

@ -33,6 +33,33 @@ func TestParseFileSuccess(t *testing.T) {
require.Empty(t, errs, "unexpected errors parsing file")
}
func TestParseFileSuccessWithAliases(t *testing.T) {
exprString := `sum without(instance) (rate(errors_total[5m]))
/
sum without(instance) (rate(requests_total[5m]))
`
rgs, errs := ParseFile("testdata/test_aliases.yaml", false)
require.Empty(t, errs, "unexpected errors parsing file")
for _, rg := range rgs.Groups {
require.Equal(t, "HighAlert", rg.Rules[0].Alert)
require.Equal(t, "critical", rg.Rules[0].Labels["severity"])
require.Equal(t, "stuff's happening with {{ $.labels.service }}", rg.Rules[0].Annotations["description"])
require.Equal(t, "new_metric", rg.Rules[1].Record)
require.Equal(t, "HighAlert", rg.Rules[2].Alert)
require.Equal(t, "critical", rg.Rules[2].Labels["severity"])
require.Equal(t, "stuff's happening with {{ $.labels.service }}", rg.Rules[0].Annotations["description"])
require.Equal(t, "HighAlert2", rg.Rules[3].Alert)
require.Equal(t, "critical", rg.Rules[3].Labels["severity"])
for _, rule := range rg.Rules {
require.Equal(t, exprString, rule.Expr)
}
}
}
func TestParseFileFailure(t *testing.T) {
for _, c := range []struct {
filename string

View file

@ -0,0 +1,57 @@
groups:
- name: my-group-name
interval: 30s # defaults to global interval
rules:
- &highalert
alert: &alertname HighAlert
expr: &expr |
sum without(instance) (rate(errors_total[5m]))
/
sum without(instance) (rate(requests_total[5m]))
for: 5m
labels:
severity: &severity critical
annotations:
description: &description "stuff's happening with {{ $.labels.service }}"
# Mix recording rules in the same list
- record: &recordname "new_metric"
expr: *expr
labels:
abc: edf
uvw: xyz
- alert: *alertname
expr: *expr
for: 5m
labels:
severity: *severity
annotations:
description: *description
- <<: *highalert
alert: HighAlert2
- name: my-another-name
interval: 30s # defaults to global interval
rules:
- alert: *alertname
expr: *expr
for: 5m
labels:
severity: *severity
annotations:
description: *description
- record: *recordname
expr: *expr
- alert: *alertname
expr: *expr
labels:
severity: *severity
annotations:
description: *description
- <<: *highalert
alert: HighAlert2

View file

@ -203,7 +203,7 @@ func benchParse(b *testing.B, data []byte, parser string) {
b.Fatal("not implemented entry", t)
}
_ = p.Metric(&res)
p.Labels(&res)
_ = p.CreatedTimestamp()
for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
}

View file

@ -57,11 +57,10 @@ type Parser interface {
// The returned byte slice becomes invalid after the next call to Next.
Comment() []byte
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
// Labels writes the labels of the current sample into the passed labels.
// The values of the "le" labels of classic histograms and "quantile" labels
// of summaries should follow the OpenMetrics formatting rules.
Metric(l *labels.Labels) string
Labels(l *labels.Labels)
// Exemplar writes the exemplar of the current sample into the passed
// exemplar. It can be called repeatedly to retrieve multiple exemplars

View file

@ -19,6 +19,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
@ -210,6 +211,17 @@ func requireEntries(t *testing.T, exp, got []parsedEntry) {
t.Helper()
testutil.RequireEqualWithOptions(t, exp, got, []cmp.Option{
// We reuse slices so we sometimes have empty vs nil differences
// we need to ignore with cmpopts.EquateEmpty().
// However we have to filter out labels, as only
// one comparer per type has to be specified,
// and RequireEqualWithOptions uses
// cmp.Comparer(labels.Equal).
cmp.FilterValues(func(x, y any) bool {
_, xIsLabels := x.(labels.Labels)
_, yIsLabels := y.(labels.Labels)
return !xIsLabels && !yIsLabels
}, cmpopts.EquateEmpty()),
cmp.AllowUnexported(parsedEntry{}),
})
}
@ -230,15 +242,20 @@ func testParse(t *testing.T, p Parser) (ret []parsedEntry) {
case EntryInvalid:
t.Fatal("entry invalid not expected")
case EntrySeries, EntryHistogram:
var ts *int64
if et == EntrySeries {
m, got.t, got.v = p.Series()
got.m = string(m)
m, ts, got.v = p.Series()
} else {
m, got.t, got.shs, got.fhs = p.Histogram()
got.m = string(m)
m, ts, got.shs, got.fhs = p.Histogram()
}
if ts != nil {
// TODO(bwplotka): Change to 0 in the interface for set check to
// avoid pointer mangling.
got.t = int64p(*ts)
}
got.m = string(m)
p.Labels(&got.lset)
p.Metric(&got.lset)
// Parser reuses int pointer.
if ct := p.CreatedTimestamp(); ct != nil {
got.ct = int64p(*ct)

View file

@ -67,8 +67,7 @@ type NHCBParser struct {
h *histogram.Histogram
fh *histogram.FloatHistogram
// For Metric.
lset labels.Labels
metricString string
lset labels.Labels
// For Type.
bName []byte
typ model.MetricType
@ -141,13 +140,12 @@ func (p *NHCBParser) Comment() []byte {
return p.parser.Comment()
}
func (p *NHCBParser) Metric(l *labels.Labels) string {
func (p *NHCBParser) Labels(l *labels.Labels) {
if p.state == stateEmitting {
*l = p.lsetNHCB
return p.metricStringNHCB
return
}
*l = p.lset
return p.metricString
}
func (p *NHCBParser) Exemplar(ex *exemplar.Exemplar) bool {
@ -200,7 +198,7 @@ func (p *NHCBParser) Next() (Entry, error) {
switch p.entry {
case EntrySeries:
p.bytes, p.ts, p.value = p.parser.Series()
p.metricString = p.parser.Metric(&p.lset)
p.parser.Labels(&p.lset)
// Check the label set to see if we can continue or need to emit the NHCB.
var isNHCB bool
if p.compareLabels() {
@ -224,7 +222,7 @@ func (p *NHCBParser) Next() (Entry, error) {
return p.entry, p.err
case EntryHistogram:
p.bytes, p.ts, p.h, p.fh = p.parser.Histogram()
p.metricString = p.parser.Metric(&p.lset)
p.parser.Labels(&p.lset)
p.storeExponentialLabels()
case EntryType:
p.bName, p.typ = p.parser.Type()

View file

@ -931,7 +931,7 @@ func createTestPromHistogram() string {
return `# HELP test_histogram1 Test histogram 1
# TYPE test_histogram1 histogram
test_histogram1_count 175 1234568
test_histogram1_sum 0.0008280461746287094 1234768
test_histogram1_sum 0.0008280461746287094 1234568
test_histogram1_bucket{le="-0.0004899999999999998"} 2 1234568
test_histogram1_bucket{le="-0.0003899999999999998"} 4 1234568
test_histogram1_bucket{le="-0.0002899999999999998"} 16 1234568

View file

@ -197,11 +197,9 @@ func (p *OpenMetricsParser) Comment() []byte {
return p.text
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *OpenMetricsParser) Metric(l *labels.Labels) string {
// Copy the buffer to a string: this is only necessary for the return value.
s := string(p.series)
// Labels writes the labels of the current sample into the passed labels.
func (p *OpenMetricsParser) Labels(l *labels.Labels) {
s := yoloString(p.series)
p.builder.Reset()
metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start])
@ -220,8 +218,6 @@ func (p *OpenMetricsParser) Metric(l *labels.Labels) string {
p.builder.Sort()
*l = p.builder.Labels()
return s
}
// Exemplar writes the exemplar of the current sample into the passed exemplar.

View file

@ -223,11 +223,9 @@ func (p *PromParser) Comment() []byte {
return p.text
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *PromParser) Metric(l *labels.Labels) string {
// Copy the buffer to a string: this is only necessary for the return value.
s := string(p.series)
// Labels writes the labels of the current sample into the passed labels.
func (p *PromParser) Labels(l *labels.Labels) {
s := yoloString(p.series)
p.builder.Reset()
metricName := unreplace(s[p.offsets[0]-p.start : p.offsets[1]-p.start])
@ -246,8 +244,6 @@ func (p *PromParser) Metric(l *labels.Labels) string {
p.builder.Sort()
*l = p.builder.Labels()
return s
}
// Exemplar implements the Parser interface. However, since the classic
@ -506,6 +502,10 @@ func yoloString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b))
}
func yoloBytes(b string) []byte {
return unsafe.Slice(unsafe.StringData(b), len(b))
}
func parseFloat(s string) (float64, error) {
// Keep to pre-Go 1.13 float formats.
if strings.ContainsAny(s, "pP_") {

View file

@ -15,7 +15,6 @@ package textparse
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
@ -25,7 +24,6 @@ import (
"sync"
"unicode/utf8"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/prometheus/common/model"
@ -45,24 +43,24 @@ var floatFormatBufPool = sync.Pool{
},
}
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
// protobuf format and then present it as it if were parsed by a
// Prometheus-2-style text parser. This is only done so that we can easily plug
// in the protobuf format into Prometheus 2. For future use (with the final
// format that will be used for native histograms), we have to revisit the
// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing
// could be used in a similar fashion (byte-slice pointers into the raw
// payload), which requires some hand-coded protobuf handling. But the current
// parsers all expect the full series name (metric name plus label pairs) as one
// string, which is not how things are represented in the protobuf format. If
// the re-arrangement work is actually causing problems (which has to be seen),
// that expectation needs to be changed.
// ProtobufParser parses the old Prometheus protobuf format and present it
// as the text-style textparse.Parser interface.
//
// It uses a tailored streaming protobuf dto.MetricStreamingDecoder that
// reuses internal protobuf structs and allows direct unmarshalling to Prometheus
// types like labels.
type ProtobufParser struct {
in []byte // The input to parse.
inPos int // Position within the input.
metricPos int // Position within Metric slice.
dec *dto.MetricStreamingDecoder
// Used for both the string returned by Series and Histogram, as well as,
// metric family for Type, Unit and Help.
entryBytes *bytes.Buffer
lset labels.Labels
builder labels.ScratchBuilder // Held here to reduce allocations when building Labels.
// fieldPos is the position within a Summary or (legacy) Histogram. -2
// is the count. -1 is the sum. Otherwise it is the index within
// is the count. -1 is the sum. Otherwise, it is the index within
// quantiles/buckets.
fieldPos int
fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed.
@ -78,27 +76,20 @@ type ProtobufParser struct {
// that we have to decode the next MetricFamily.
state Entry
builder labels.ScratchBuilder // held here to reduce allocations when building Labels
mf *dto.MetricFamily
// Whether to also parse a classic histogram that is also present as a
// native histogram.
parseClassicHistograms bool
// The following are just shenanigans to satisfy the Parser interface.
metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric.
}
// NewProtobufParser returns a parser for the payload in the byte slice.
func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable) Parser {
return &ProtobufParser{
in: b,
dec: dto.NewMetricStreamingDecoder(b),
entryBytes: &bytes.Buffer{},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16), // TODO(bwplotka): Try base builder.
state: EntryInvalid,
mf: &dto.MetricFamily{},
metricBytes: &bytes.Buffer{},
parseClassicHistograms: parseClassicHistograms,
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
}
}
@ -106,19 +97,18 @@ func NewProtobufParser(b []byte, parseClassicHistograms bool, st *labels.SymbolT
// value, the timestamp if set, and the value of the current sample.
func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
var (
m = p.mf.GetMetric()[p.metricPos]
ts = m.GetTimestampMs()
ts = &p.dec.TimestampMs // To save memory allocations, never nil.
v float64
)
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
v = m.GetCounter().GetValue()
v = p.dec.GetCounter().GetValue()
case dto.MetricType_GAUGE:
v = m.GetGauge().GetValue()
v = p.dec.GetGauge().GetValue()
case dto.MetricType_UNTYPED:
v = m.GetUntyped().GetValue()
v = p.dec.GetUntyped().GetValue()
case dto.MetricType_SUMMARY:
s := m.GetSummary()
s := p.dec.GetSummary()
switch p.fieldPos {
case -2:
v = float64(s.GetSampleCount())
@ -133,7 +123,7 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
}
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
// This should only happen for a classic histogram.
h := m.GetHistogram()
h := p.dec.GetHistogram()
switch p.fieldPos {
case -2:
v = h.GetSampleCountFloat()
@ -159,8 +149,8 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
default:
panic("encountered unexpected metric type, this is a bug")
}
if ts != 0 {
return p.metricBytes.Bytes(), &ts, v
if *ts != 0 {
return p.entryBytes.Bytes(), ts, v
}
// TODO(beorn7): We assume here that ts==0 means no timestamp. That's
// not true in general, but proto3 originally has no distinction between
@ -171,7 +161,7 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
// away from gogo-protobuf to an actively maintained protobuf
// implementation. Once that's done, we can simply use the `optional`
// keyword and check for the unset state explicitly.
return p.metricBytes.Bytes(), nil, v
return p.entryBytes.Bytes(), nil, v
}
// Histogram returns the bytes of a series with a native histogram as a value,
@ -186,47 +176,56 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
// value.
func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
var (
m = p.mf.GetMetric()[p.metricPos]
ts = m.GetTimestampMs()
h = m.GetHistogram()
ts = &p.dec.TimestampMs // To save memory allocations, never nil.
h = p.dec.GetHistogram()
)
if p.parseClassicHistograms && len(h.GetBucket()) > 0 {
p.redoClassic = true
}
if h.GetSampleCountFloat() > 0 || h.GetZeroCountFloat() > 0 {
// It is a float histogram.
fh := histogram.FloatHistogram{
Count: h.GetSampleCountFloat(),
Sum: h.GetSampleSum(),
ZeroThreshold: h.GetZeroThreshold(),
ZeroCount: h.GetZeroCountFloat(),
Schema: h.GetSchema(),
Count: h.GetSampleCountFloat(),
Sum: h.GetSampleSum(),
ZeroThreshold: h.GetZeroThreshold(),
ZeroCount: h.GetZeroCountFloat(),
Schema: h.GetSchema(),
// Decoder reuses slices, so we need to copy.
PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())),
PositiveBuckets: h.GetPositiveCount(),
PositiveBuckets: make([]float64, len(h.GetPositiveCount())),
NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())),
NegativeBuckets: h.GetNegativeCount(),
NegativeBuckets: make([]float64, len(h.GetNegativeCount())),
}
for i, span := range h.GetPositiveSpan() {
fh.PositiveSpans[i].Offset = span.GetOffset()
fh.PositiveSpans[i].Length = span.GetLength()
}
for i, cnt := range h.GetPositiveCount() {
fh.PositiveBuckets[i] = cnt
}
for i, span := range h.GetNegativeSpan() {
fh.NegativeSpans[i].Offset = span.GetOffset()
fh.NegativeSpans[i].Length = span.GetLength()
}
if p.mf.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
for i, cnt := range h.GetNegativeCount() {
fh.NegativeBuckets[i] = cnt
}
if p.dec.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
fh.CounterResetHint = histogram.GaugeType
}
fh.Compact(0)
if ts != 0 {
return p.metricBytes.Bytes(), &ts, nil, &fh
if *ts != 0 {
return p.entryBytes.Bytes(), ts, nil, &fh
}
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
// general, but proto3 has no distinction between unset and
// default. Need to avoid in the final format.
return p.metricBytes.Bytes(), nil, nil, &fh
return p.entryBytes.Bytes(), nil, nil, &fh
}
// TODO(bwplotka): Create sync.Pool for those structs.
sh := histogram.Histogram{
Count: h.GetSampleCount(),
Sum: h.GetSampleSum(),
@ -234,41 +233,47 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his
ZeroCount: h.GetZeroCount(),
Schema: h.GetSchema(),
PositiveSpans: make([]histogram.Span, len(h.GetPositiveSpan())),
PositiveBuckets: h.GetPositiveDelta(),
PositiveBuckets: make([]int64, len(h.GetPositiveDelta())),
NegativeSpans: make([]histogram.Span, len(h.GetNegativeSpan())),
NegativeBuckets: h.GetNegativeDelta(),
NegativeBuckets: make([]int64, len(h.GetNegativeDelta())),
}
for i, span := range h.GetPositiveSpan() {
sh.PositiveSpans[i].Offset = span.GetOffset()
sh.PositiveSpans[i].Length = span.GetLength()
}
for i, cnt := range h.GetPositiveDelta() {
sh.PositiveBuckets[i] = cnt
}
for i, span := range h.GetNegativeSpan() {
sh.NegativeSpans[i].Offset = span.GetOffset()
sh.NegativeSpans[i].Length = span.GetLength()
}
if p.mf.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
for i, cnt := range h.GetNegativeDelta() {
sh.NegativeBuckets[i] = cnt
}
if p.dec.GetType() == dto.MetricType_GAUGE_HISTOGRAM {
sh.CounterResetHint = histogram.GaugeType
}
sh.Compact(0)
if ts != 0 {
return p.metricBytes.Bytes(), &ts, &sh, nil
if *ts != 0 {
return p.entryBytes.Bytes(), ts, &sh, nil
}
return p.metricBytes.Bytes(), nil, &sh, nil
return p.entryBytes.Bytes(), nil, &sh, nil
}
// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Help() ([]byte, []byte) {
return p.metricBytes.Bytes(), []byte(p.mf.GetHelp())
return p.entryBytes.Bytes(), yoloBytes(p.dec.GetHelp())
}
// Type returns the metric name and type in the current entry.
// Must only be called after Next returned a type entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Type() ([]byte, model.MetricType) {
n := p.metricBytes.Bytes()
switch p.mf.GetType() {
n := p.entryBytes.Bytes()
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
return n, model.MetricTypeCounter
case dto.MetricType_GAUGE:
@ -287,7 +292,7 @@ func (p *ProtobufParser) Type() ([]byte, model.MetricType) {
// Must only be called after Next returned a unit entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Unit() ([]byte, []byte) {
return p.metricBytes.Bytes(), []byte(p.mf.GetUnit())
return p.entryBytes.Bytes(), []byte(p.dec.GetUnit())
}
// Comment always returns nil because comments aren't supported by the protobuf
@ -296,24 +301,9 @@ func (p *ProtobufParser) Comment() []byte {
return nil
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *ProtobufParser) Metric(l *labels.Labels) string {
p.builder.Reset()
p.builder.Add(labels.MetricName, p.getMagicName())
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
p.builder.Add(lp.GetName(), lp.GetValue())
}
if needed, name, value := p.getMagicLabel(); needed {
p.builder.Add(name, value)
}
// Sort labels to maintain the sorted labels invariant.
p.builder.Sort()
*l = p.builder.Labels()
return p.metricBytes.String()
// Labels writes the labels of the current sample into the passed labels.
func (p *ProtobufParser) Labels(l *labels.Labels) {
*l = p.lset.Copy()
}
// Exemplar writes the exemplar of the current sample into the passed
@ -326,15 +316,14 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
// We only ever return one exemplar per (non-native-histogram) series.
return false
}
m := p.mf.GetMetric()[p.metricPos]
var exProto *dto.Exemplar
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
exProto = m.GetCounter().GetExemplar()
exProto = p.dec.GetCounter().GetExemplar()
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
isClassic := p.state == EntrySeries
if !isClassic && len(m.GetHistogram().GetExemplars()) > 0 {
exs := m.GetHistogram().GetExemplars()
if !isClassic && len(p.dec.GetHistogram().GetExemplars()) > 0 {
exs := p.dec.GetHistogram().GetExemplars()
for p.exemplarPos < len(exs) {
exProto = exs[p.exemplarPos]
p.exemplarPos++
@ -346,7 +335,7 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
return false
}
} else {
bb := m.GetHistogram().GetBucket()
bb := p.dec.GetHistogram().GetBucket()
if p.fieldPos < 0 {
if isClassic {
return false // At _count or _sum.
@ -394,13 +383,13 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
// invalid (as timestamp e.g. negative value) on counters, summaries or histograms.
func (p *ProtobufParser) CreatedTimestamp() *int64 {
var ct *types.Timestamp
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER:
ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
ct = p.dec.GetCounter().GetCreatedTimestamp()
case dto.MetricType_SUMMARY:
ct = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp()
ct = p.dec.GetSummary().GetCreatedTimestamp()
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
ct = p.dec.GetHistogram().GetCreatedTimestamp()
default:
}
ctAsTime, err := types.TimestampFromProto(ct)
@ -418,31 +407,34 @@ func (p *ProtobufParser) CreatedTimestamp() *int64 {
func (p *ProtobufParser) Next() (Entry, error) {
p.exemplarReturned = false
switch p.state {
// Invalid state occurs on:
// * First Next() call.
// * Recursive call that tells Next to move to the next metric family.
case EntryInvalid:
p.metricPos = 0
p.exemplarPos = 0
p.fieldPos = -2
n, err := readDelimited(p.in[p.inPos:], p.mf)
p.inPos += n
if err != nil {
if err := p.dec.NextMetricFamily(); err != nil {
return p.state, err
}
// Skip empty metric families.
if len(p.mf.GetMetric()) == 0 {
return p.Next()
if err := p.dec.NextMetric(); err != nil {
// Skip empty metric families.
if errors.Is(err, io.EOF) {
return p.Next()
}
return EntryInvalid, err
}
// We are at the beginning of a metric family. Put only the name
// into metricBytes and validate only name, help, and type for now.
name := p.mf.GetName()
// into entryBytes and validate only name, help, and type for now.
name := p.dec.GetName()
if !model.IsValidMetricName(model.LabelValue(name)) {
return EntryInvalid, fmt.Errorf("invalid metric name: %s", name)
}
if help := p.mf.GetHelp(); !utf8.ValidString(help) {
if help := p.dec.GetHelp(); !utf8.ValidString(help) {
return EntryInvalid, fmt.Errorf("invalid help for metric %q: %s", name, help)
}
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_COUNTER,
dto.MetricType_GAUGE,
dto.MetricType_HISTOGRAM,
@ -451,11 +443,11 @@ func (p *ProtobufParser) Next() (Entry, error) {
dto.MetricType_UNTYPED:
// All good.
default:
return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.mf.GetType())
return EntryInvalid, fmt.Errorf("unknown metric type for metric %q: %s", name, p.dec.GetType())
}
unit := p.mf.GetUnit()
unit := p.dec.GetUnit()
if len(unit) > 0 {
if p.mf.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") {
if p.dec.GetType() == dto.MetricType_COUNTER && strings.HasSuffix(name, "_total") {
if !strings.HasSuffix(name[:len(name)-6], unit) || len(name)-6 < len(unit)+1 || name[len(name)-6-len(unit)-1] != '_' {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of counter %q", unit, name)
}
@ -463,12 +455,11 @@ func (p *ProtobufParser) Next() (Entry, error) {
return EntryInvalid, fmt.Errorf("unit %q not a suffix of metric %q", unit, name)
}
}
p.metricBytes.Reset()
p.metricBytes.WriteString(name)
p.entryBytes.Reset()
p.entryBytes.WriteString(name)
p.state = EntryHelp
case EntryHelp:
if p.mf.Unit != "" {
if p.dec.Unit != "" {
p.state = EntryUnit
} else {
p.state = EntryType
@ -476,48 +467,78 @@ func (p *ProtobufParser) Next() (Entry, error) {
case EntryUnit:
p.state = EntryType
case EntryType:
t := p.mf.GetType()
t := p.dec.GetType()
if (t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) &&
isNativeHistogram(p.mf.GetMetric()[0].GetHistogram()) {
isNativeHistogram(p.dec.GetHistogram()) {
p.state = EntryHistogram
} else {
p.state = EntrySeries
}
if err := p.updateMetricBytes(); err != nil {
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
case EntryHistogram, EntrySeries:
if p.redoClassic {
p.redoClassic = false
p.state = EntrySeries
p.fieldPos = -3
p.fieldsDone = false
}
t := p.mf.GetType()
if p.state == EntrySeries && !p.fieldsDone &&
(t == dto.MetricType_SUMMARY ||
t == dto.MetricType_HISTOGRAM ||
t == dto.MetricType_GAUGE_HISTOGRAM) {
p.fieldPos++
} else {
p.metricPos++
case EntrySeries:
// Potentially a second series in the metric family.
t := p.dec.GetType()
if t == dto.MetricType_SUMMARY ||
t == dto.MetricType_HISTOGRAM ||
t == dto.MetricType_GAUGE_HISTOGRAM {
// Non-trivial series (complex metrics, with magic suffixes).
// Did we iterate over all the classic representations fields?
// NOTE: p.fieldsDone is updated on p.onSeriesOrHistogramUpdate.
if !p.fieldsDone {
// Still some fields to iterate over.
p.fieldPos++
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
return p.state, nil
}
// Reset histogram fields.
p.fieldPos = -2
p.fieldsDone = false
p.exemplarPos = 0
// If this is a metric family containing native
// histograms, we have to switch back to native
// histograms after parsing a classic histogram.
if p.state == EntrySeries &&
(t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) &&
isNativeHistogram(p.mf.GetMetric()[0].GetHistogram()) {
// histograms, it means we are here thanks to redoClassic state.
// Return to native histograms for the consistent flow.
if (t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM) &&
isNativeHistogram(p.dec.GetHistogram()) {
p.state = EntryHistogram
}
}
if p.metricPos >= len(p.mf.GetMetric()) {
p.state = EntryInvalid
return p.Next()
// Is there another series?
if err := p.dec.NextMetric(); err != nil {
if errors.Is(err, io.EOF) {
p.state = EntryInvalid
return p.Next()
}
return EntryInvalid, err
}
if err := p.updateMetricBytes(); err != nil {
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
case EntryHistogram:
// Was Histogram() called and parseClassicHistograms is true?
if p.redoClassic {
p.redoClassic = false
p.fieldPos = -3
p.fieldsDone = false
p.state = EntrySeries
return p.Next() // Switch to classic histogram.
}
// Is there another series?
if err := p.dec.NextMetric(); err != nil {
if errors.Is(err, io.EOF) {
p.state = EntryInvalid
return p.Next()
}
return EntryInvalid, err
}
if err := p.onSeriesOrHistogramUpdate(); err != nil {
return EntryInvalid, err
}
default:
@ -526,30 +547,39 @@ func (p *ProtobufParser) Next() (Entry, error) {
return p.state, nil
}
func (p *ProtobufParser) updateMetricBytes() error {
b := p.metricBytes
b.Reset()
b.WriteString(p.getMagicName())
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
b.WriteByte(model.SeparatorByte)
n := lp.GetName()
if !model.LabelName(n).IsValid() {
return fmt.Errorf("invalid label name: %s", n)
}
b.WriteString(n)
b.WriteByte(model.SeparatorByte)
v := lp.GetValue()
if !utf8.ValidString(v) {
return fmt.Errorf("invalid label value: %s", v)
}
b.WriteString(v)
// onSeriesOrHistogramUpdate updates internal state before returning
// a series or histogram. It updates:
// * p.lset.
// * p.entryBytes.
// * p.fieldsDone depending on p.fieldPos.
func (p *ProtobufParser) onSeriesOrHistogramUpdate() error {
p.builder.Reset()
p.builder.Add(labels.MetricName, p.getMagicName())
if err := p.dec.Label(&p.builder); err != nil {
return err
}
if needed, n, v := p.getMagicLabel(); needed {
b.WriteByte(model.SeparatorByte)
b.WriteString(n)
b.WriteByte(model.SeparatorByte)
b.WriteString(v)
if needed, name, value := p.getMagicLabel(); needed {
p.builder.Add(name, value)
}
// Sort labels to maintain the sorted labels invariant.
p.builder.Sort()
p.builder.Overwrite(&p.lset)
// entryBytes has to be unique for each series.
p.entryBytes.Reset()
p.lset.Range(func(l labels.Label) {
if l.Name == labels.MetricName {
p.entryBytes.WriteString(l.Value)
return
}
p.entryBytes.WriteByte(model.SeparatorByte)
p.entryBytes.WriteString(l.Name)
p.entryBytes.WriteByte(model.SeparatorByte)
p.entryBytes.WriteString(l.Value)
})
return nil
}
@ -557,36 +587,37 @@ func (p *ProtobufParser) updateMetricBytes() error {
// ("_count", "_sum", "_bucket") if needed according to the current parser
// state.
func (p *ProtobufParser) getMagicName() string {
t := p.mf.GetType()
t := p.dec.GetType()
if p.state == EntryHistogram || (t != dto.MetricType_HISTOGRAM && t != dto.MetricType_GAUGE_HISTOGRAM && t != dto.MetricType_SUMMARY) {
return p.mf.GetName()
return p.dec.GetName()
}
if p.fieldPos == -2 {
return p.mf.GetName() + "_count"
return p.dec.GetName() + "_count"
}
if p.fieldPos == -1 {
return p.mf.GetName() + "_sum"
return p.dec.GetName() + "_sum"
}
if t == dto.MetricType_HISTOGRAM || t == dto.MetricType_GAUGE_HISTOGRAM {
return p.mf.GetName() + "_bucket"
return p.dec.GetName() + "_bucket"
}
return p.mf.GetName()
return p.dec.GetName()
}
// getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if
// so, its name and value. It also sets p.fieldsDone if applicable.
func (p *ProtobufParser) getMagicLabel() (bool, string, string) {
// Native histogram or _count and _sum series.
if p.state == EntryHistogram || p.fieldPos < 0 {
return false, "", ""
}
switch p.mf.GetType() {
switch p.dec.GetType() {
case dto.MetricType_SUMMARY:
qq := p.mf.GetMetric()[p.metricPos].GetSummary().GetQuantile()
qq := p.dec.GetSummary().GetQuantile()
q := qq[p.fieldPos]
p.fieldsDone = p.fieldPos == len(qq)-1
return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile())
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
bb := p.mf.GetMetric()[p.metricPos].GetHistogram().GetBucket()
bb := p.dec.GetHistogram().GetBucket()
if p.fieldPos >= len(bb) {
p.fieldsDone = true
return true, model.BucketLabel, "+Inf"
@ -598,29 +629,6 @@ func (p *ProtobufParser) getMagicLabel() (bool, string, string) {
return false, "", ""
}
var errInvalidVarint = errors.New("protobufparse: invalid varint encountered")
// readDelimited is essentially doing what the function of the same name in
// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is
// specific to a MetricFamily, utilizes the more efficient gogo-protobuf
// unmarshaling, and acts on a byte slice directly without any additional
// staging buffers.
func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
if len(b) == 0 {
return 0, io.EOF
}
messageLength, varIntLength := proto.DecodeVarint(b)
if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
return 0, errInvalidVarint
}
totalLength := varIntLength + int(messageLength)
if totalLength > len(b) {
return 0, fmt.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
}
mf.Reset()
return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
}
// formatOpenMetricsFloat works like the usual Go string formatting of a float
// but appends ".0" if the resulting number would otherwise contain neither a
// "." nor an "e".

View file

@ -1246,7 +1246,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5",
m: "rpc_durations_seconds\xffquantile\xff0.5\xffservice\xffexponential",
v: 6.442786329648548e-07,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -1255,7 +1255,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9",
m: "rpc_durations_seconds\xffquantile\xff0.9\xffservice\xffexponential",
v: 1.9435742936658396e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -1264,7 +1264,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99",
m: "rpc_durations_seconds\xffquantile\xff0.99\xffservice\xffexponential",
v: 4.0471608667037015e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -2199,7 +2199,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5",
m: "rpc_durations_seconds\xffquantile\xff0.5\xffservice\xffexponential",
v: 6.442786329648548e-07,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -2208,7 +2208,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9",
m: "rpc_durations_seconds\xffquantile\xff0.9\xffservice\xffexponential",
v: 1.9435742936658396e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
@ -2217,7 +2217,7 @@ func TestProtobufParse(t *testing.T) {
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99",
m: "rpc_durations_seconds\xffquantile\xff0.99\xffservice\xffexponential",
v: 4.0471608667037015e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",

View file

@ -0,0 +1,780 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io_prometheus_client //nolint:revive
import (
"encoding/binary"
"errors"
"fmt"
"io"
"unicode/utf8"
"unsafe"
proto "github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
type MetricStreamingDecoder struct {
in []byte
inPos int
// TODO(bwplotka): Switch to generator/plugin that won't have those fields accessible e.g. OpaqueAPI
// We leverage the fact those two don't collide.
*MetricFamily // Without Metric, guarded by overridden GetMetric method.
*Metric // Without Label, guarded by overridden GetLabel method.
mfData []byte
metrics []pos
metricIndex int
mData []byte
labels []pos
}
// NewMetricStreamingDecoder returns a Go iterator that unmarshals given protobuf bytes one
// metric family and metric at the time, allowing efficient streaming.
//
// Do not modify MetricStreamingDecoder between iterations as it's reused to save allocations.
// GetGauge, GetCounter, etc are also cached, which means GetGauge will work for counter
// if previously gauge was parsed. It's up to the caller to use Type to decide what
// method to use when checking the value.
//
// TODO(bwplotka): io.Reader approach is possible too, but textparse has access to whole scrape for now.
func NewMetricStreamingDecoder(data []byte) *MetricStreamingDecoder {
return &MetricStreamingDecoder{
in: data,
MetricFamily: &MetricFamily{},
Metric: &Metric{},
metrics: make([]pos, 0, 100),
}
}
var errInvalidVarint = errors.New("clientpb: invalid varint encountered")
func (m *MetricStreamingDecoder) NextMetricFamily() error {
b := m.in[m.inPos:]
if len(b) == 0 {
return io.EOF
}
messageLength, varIntLength := proto.DecodeVarint(b) // TODO(bwplotka): Get rid of gogo.
if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
return errInvalidVarint
}
totalLength := varIntLength + int(messageLength)
if totalLength > len(b) {
return fmt.Errorf("clientpb: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
}
m.resetMetricFamily()
m.mfData = b[varIntLength:totalLength]
m.inPos += totalLength
return m.MetricFamily.unmarshalWithoutMetrics(m, m.mfData)
}
// resetMetricFamily resets all the fields in m to equal the zero value, but re-using slice memory.
func (m *MetricStreamingDecoder) resetMetricFamily() {
m.metrics = m.metrics[:0]
m.metricIndex = 0
m.MetricFamily.Reset()
}
func (m *MetricStreamingDecoder) NextMetric() error {
if m.metricIndex >= len(m.metrics) {
return io.EOF
}
m.resetMetric()
m.mData = m.mfData[m.metrics[m.metricIndex].start:m.metrics[m.metricIndex].end]
if err := m.Metric.unmarshalWithoutLabels(m, m.mData); err != nil {
return err
}
m.metricIndex++
return nil
}
// resetMetric resets all the fields in m to equal the zero value, but re-using slices memory.
func (m *MetricStreamingDecoder) resetMetric() {
m.labels = m.labels[:0]
m.TimestampMs = 0
// TODO(bwplotka): Autogenerate reset functions.
if m.Metric.Counter != nil {
m.Metric.Counter.Value = 0
m.Metric.Counter.CreatedTimestamp = nil
m.Metric.Counter.Exemplar = nil
}
if m.Metric.Gauge != nil {
m.Metric.Gauge.Value = 0
}
if m.Metric.Histogram != nil {
m.Metric.Histogram.SampleCount = 0
m.Metric.Histogram.SampleCountFloat = 0
m.Metric.Histogram.SampleSum = 0
m.Metric.Histogram.Bucket = m.Metric.Histogram.Bucket[:0]
m.Metric.Histogram.CreatedTimestamp = nil
m.Metric.Histogram.Schema = 0
m.Metric.Histogram.ZeroThreshold = 0
m.Metric.Histogram.ZeroCount = 0
m.Metric.Histogram.ZeroCountFloat = 0
m.Metric.Histogram.NegativeSpan = m.Metric.Histogram.NegativeSpan[:0]
m.Metric.Histogram.NegativeDelta = m.Metric.Histogram.NegativeDelta[:0]
m.Metric.Histogram.NegativeCount = m.Metric.Histogram.NegativeCount[:0]
m.Metric.Histogram.PositiveSpan = m.Metric.Histogram.PositiveSpan[:0]
m.Metric.Histogram.PositiveDelta = m.Metric.Histogram.PositiveDelta[:0]
m.Metric.Histogram.PositiveCount = m.Metric.Histogram.PositiveCount[:0]
m.Metric.Histogram.Exemplars = m.Metric.Histogram.Exemplars[:0]
}
if m.Metric.Summary != nil {
m.Metric.Summary.SampleCount = 0
m.Metric.Summary.SampleSum = 0
m.Metric.Summary.Quantile = m.Metric.Summary.Quantile[:0]
m.Metric.Summary.CreatedTimestamp = nil
}
}
func (m *MetricStreamingDecoder) GetMetric() {
panic("don't use GetMetric, use Metric directly")
}
func (m *MetricStreamingDecoder) GetLabel() {
panic("don't use GetLabel, use Label instead")
}
// Label parses labels into labels scratch builder. Metric name is missing
// given the protobuf metric model and has to be deduced from the metric family name.
// TODO: The method name intentionally hide MetricStreamingDecoder.Metric.Label
// field to avoid direct use (it's not parsed). In future generator will generate
// structs tailored for streaming decoding.
func (m *MetricStreamingDecoder) Label(b *labels.ScratchBuilder) error {
for _, l := range m.labels {
if err := parseLabel(m.mData[l.start:l.end], b); err != nil {
return err
}
}
return nil
}
// parseLabels is essentially LabelPair.Unmarshal but directly adding into scratch builder
// and reusing strings.
func parseLabel(dAtA []byte, b *labels.ScratchBuilder) error {
var name, value string
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return errors.New("proto: LabelPair: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
name = yoloString(dAtA[iNdEx:postIndex])
if !model.LabelName(name).IsValid() {
return fmt.Errorf("invalid label name: %s", name)
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
value = yoloString(dAtA[iNdEx:postIndex])
if !utf8.ValidString(value) {
return fmt.Errorf("invalid label value: %s", value)
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetrics(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
b.Add(name, value)
return nil
}
func yoloString(b []byte) string {
return unsafe.String(unsafe.SliceData(b), len(b))
}
type pos struct {
start, end int
}
func (m *Metric) unmarshalWithoutLabels(p *MetricStreamingDecoder, dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return errors.New("proto: Metric: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Metric: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Label", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
p.labels = append(p.labels, pos{start: iNdEx, end: postIndex})
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Gauge", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Gauge == nil {
m.Gauge = &Gauge{}
}
if err := m.Gauge.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Counter", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Counter == nil {
m.Counter = &Counter{}
}
if err := m.Counter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Summary", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Summary == nil {
m.Summary = &Summary{}
}
if err := m.Summary.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Untyped", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Untyped == nil {
m.Untyped = &Untyped{}
}
if err := m.Untyped.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field TimestampMs", wireType)
}
m.TimestampMs = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.TimestampMs |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Histogram", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Histogram == nil {
m.Histogram = &Histogram{}
}
if err := m.Histogram.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetrics(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *MetricFamily) unmarshalWithoutMetrics(buf *MetricStreamingDecoder, dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return errors.New("proto: MetricFamily: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MetricFamily: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Help = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
}
m.Type = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Type |= MetricType(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metric", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
buf.metrics = append(buf.metrics, pos{start: iNdEx, end: postIndex})
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Unit = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetrics(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}

View file

@ -0,0 +1,171 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io_prometheus_client //nolint:revive
import (
"bytes"
"encoding/binary"
"errors"
"io"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
)
const (
testGauge = `name: "go_build_info"
help: "Build information about the main Go module."
type: GAUGE
metric: <
label: <
name: "checksum"
value: ""
>
label: <
name: "path"
value: "github.com/prometheus/client_golang"
>
label: <
name: "version"
value: "(devel)"
>
gauge: <
value: 1
>
>
metric: <
label: <
name: "checksum"
value: ""
>
label: <
name: "path"
value: "github.com/prometheus/prometheus"
>
label: <
name: "version"
value: "v3.0.0"
>
gauge: <
value: 2
>
>
`
testCounter = `name: "go_memstats_alloc_bytes_total"
help: "Total number of bytes allocated, even if freed."
type: COUNTER
unit: "bytes"
metric: <
counter: <
value: 1.546544e+06
exemplar: <
label: <
name: "dummyID"
value: "42"
>
value: 12
timestamp: <
seconds: 1625851151
nanos: 233181499
>
>
>
>
`
)
func TestMetricStreamingDecoder(t *testing.T) {
varintBuf := make([]byte, binary.MaxVarintLen32)
buf := bytes.Buffer{}
for _, m := range []string{testGauge, testCounter} {
mf := &MetricFamily{}
require.NoError(t, proto.UnmarshalText(m, mf))
// From proto message to binary protobuf.
protoBuf, err := proto.Marshal(mf)
require.NoError(t, err)
// Write first length, then binary protobuf.
varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))
buf.Write(varintBuf[:varintLength])
buf.Write(protoBuf)
}
d := NewMetricStreamingDecoder(buf.Bytes())
require.NoError(t, d.NextMetricFamily())
nextFn := func() error {
for {
err := d.NextMetric()
if errors.Is(err, io.EOF) {
if err := d.NextMetricFamily(); err != nil {
return err
}
continue
}
return err
}
}
var firstMetricLset labels.Labels
{
require.NoError(t, nextFn())
require.Equal(t, "go_build_info", d.GetName())
require.Equal(t, "Build information about the main Go module.", d.GetHelp())
require.Equal(t, MetricType_GAUGE, d.GetType())
require.Equal(t, float64(1), d.GetGauge().GetValue())
b := labels.NewScratchBuilder(0)
require.NoError(t, d.Label(&b))
firstMetricLset = b.Labels()
require.Equal(t, `{checksum="", path="github.com/prometheus/client_golang", version="(devel)"}`, firstMetricLset.String())
}
{
require.NoError(t, nextFn())
require.Equal(t, "go_build_info", d.GetName())
require.Equal(t, "Build information about the main Go module.", d.GetHelp())
require.Equal(t, MetricType_GAUGE, d.GetType())
require.Equal(t, float64(2), d.GetGauge().GetValue())
b := labels.NewScratchBuilder(0)
require.NoError(t, d.Label(&b))
require.Equal(t, `{checksum="", path="github.com/prometheus/prometheus", version="v3.0.0"}`, b.Labels().String())
}
{
// Different mf now.
require.NoError(t, nextFn())
require.Equal(t, "go_memstats_alloc_bytes_total", d.GetName())
require.Equal(t, "Total number of bytes allocated, even if freed.", d.GetHelp())
require.Equal(t, "bytes", d.GetUnit())
require.Equal(t, MetricType_COUNTER, d.GetType())
require.Equal(t, 1.546544e+06, d.Metric.GetCounter().GetValue())
b := labels.NewScratchBuilder(0)
require.NoError(t, d.Label(&b))
require.Equal(t, `{}`, b.Labels().String())
}
require.Equal(t, io.EOF, nextFn())
// Expect labels and metricBytes to be static and reusable even after parsing.
require.Equal(t, `{checksum="", path="github.com/prometheus/client_golang", version="(devel)"}`, firstMetricLset.String())
}

View file

@ -1686,6 +1686,28 @@ func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr
VectorSelector: vs,
}
for _, s := range mat {
// Set any "NotCounterReset" and "CounterReset" hints in native
// histograms to "UnknownCounterReset" because we might
// otherwise miss a counter reset happening in samples not
// returned by the subquery, or we might over-detect counter
// resets if the sample with a counter reset is returned
// multiple times by a high-res subquery. This intentionally
// does not attempt to be clever (like detecting if we are
// really missing underlying samples or returning underlying
// samples multiple times) because subqueries on counters are
// inherently problematic WRT counter reset handling, so we
// cannot really solve the problem for good. We only want to
// avoid problems that happen due to the explicitly set counter
// reset hints and go back to the behavior we already know from
// float samples.
for i, hp := range s.Histograms {
switch hp.H.CounterResetHint {
case histogram.NotCounterReset, histogram.CounterReset:
h := *hp.H // Shallow copy is sufficient, we only change CounterResetHint.
h.CounterResetHint = histogram.UnknownCounterReset
s.Histograms[i].H = &h
}
}
vs.Series = append(vs.Series, NewStorageSeries(s))
}
return ms, mat.TotalSamples(), ws
@ -3691,11 +3713,11 @@ func formatDate(t time.Time) string {
// unwrapParenExpr does the AST equivalent of removing parentheses around a expression.
func unwrapParenExpr(e *parser.Expr) {
for {
if p, ok := (*e).(*parser.ParenExpr); ok {
*e = p.Expr
} else {
p, ok := (*e).(*parser.ParenExpr)
if !ok {
break
}
*e = p.Expr
}
}

View file

@ -134,3 +134,19 @@ eval instant at 20s min_over_time(metric_total[15s:10s])
eval instant at 20m min_over_time(rate(metric_total[5m])[20m:1m])
{} 0.12119047619047618
clear
# This native histogram series has a counter reset at 5m.
# TODO(beorn7): Write this more nicely once https://github.com/prometheus/prometheus/issues/15984 is fixed.
load 1m
native_histogram {{sum:100 count:100}} {{sum:103 count:103}} {{sum:106 count:106}} {{sum:109 count:109}} {{sum:112 count:112}} {{sum:3 count:3 counter_reset_hint:reset}} {{sum:6 count:6}}+{{sum:3 count:3}}x5
# This sub-query has to detect the counter reset even if it does
# not include the exact sample where the counter reset has happened.
eval instant at 10m increase(native_histogram[10m:3m])
{} {{count:25 sum:25}}
# This sub-query must not detect the counter reset multiple times
# even though the sample where the counter reset happened is returned
# by the sub-query multiple times.
eval instant at 10m increase(native_histogram[10m:15s])
{} {{count:30.769230769230766 sum:30.769230769230766}}

View file

@ -1110,9 +1110,6 @@ func buildDependencyMap(rules []Rule) dependencyMap {
return dependencies
}
inputs := make(map[string][]Rule, len(rules))
outputs := make(map[string][]Rule, len(rules))
var indeterminate bool
for _, rule := range rules {
@ -1120,26 +1117,46 @@ func buildDependencyMap(rules []Rule) dependencyMap {
break
}
name := rule.Name()
outputs[name] = append(outputs[name], rule)
parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error {
if n, ok := node.(*parser.VectorSelector); ok {
// Find the name matcher for the rule.
var nameMatcher *labels.Matcher
if n.Name != "" {
nameMatcher = labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, n.Name)
} else {
for _, m := range n.LabelMatchers {
if m.Name == model.MetricNameLabel {
nameMatcher = m
break
}
}
}
// A wildcard metric expression means we cannot reliably determine if this rule depends on any other,
// which means we cannot safely run any rules concurrently.
if n.Name == "" && len(n.LabelMatchers) > 0 {
if nameMatcher == nil {
indeterminate = true
return nil
}
// Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
// if they run concurrently.
if n.Name == alertMetricName || n.Name == alertForStateMetricName {
if nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName) {
indeterminate = true
return nil
}
inputs[n.Name] = append(inputs[n.Name], rule)
// Find rules which depend on the output of this rule.
for _, other := range rules {
if other == rule {
continue
}
otherName := other.Name()
if nameMatcher.Matches(otherName) {
dependencies[other] = append(dependencies[other], rule)
}
}
}
return nil
})
@ -1149,13 +1166,5 @@ func buildDependencyMap(rules []Rule) dependencyMap {
return nil
}
for output, outRules := range outputs {
for _, outRule := range outRules {
if inRules, found := inputs[output]; found && len(inRules) > 0 {
dependencies[outRule] = append(dependencies[outRule], inRules...)
}
}
}
return dependencies
}

View file

@ -324,16 +324,16 @@ func (m *Manager) LoadGroups(
rules := make([]Rule, 0, len(rg.Rules))
for _, r := range rg.Rules {
expr, err := m.opts.GroupLoader.Parse(r.Expr.Value)
expr, err := m.opts.GroupLoader.Parse(r.Expr)
if err != nil {
return nil, []error{fmt.Errorf("%s: %w", fn, err)}
}
mLabels := FromMaps(rg.Labels, r.Labels)
if r.Alert.Value != "" {
if r.Alert != "" {
rules = append(rules, NewAlertingRule(
r.Alert.Value,
r.Alert,
expr,
time.Duration(r.For),
time.Duration(r.KeepFiringFor),
@ -347,7 +347,7 @@ func (m *Manager) LoadGroups(
continue
}
rules = append(rules, NewRecordingRule(
r.Record.Value,
r.Record,
expr,
mLabels,
))

View file

@ -842,7 +842,7 @@ func TestUpdate(t *testing.T) {
// Change group rules and reload.
for i, g := range rgs.Groups {
for j, r := range g.Rules {
rgs.Groups[i].Rules[j].Expr.SetString(fmt.Sprintf("%s * 0", r.Expr.Value))
rgs.Groups[i].Rules[j].Expr = fmt.Sprintf("%s * 0", r.Expr)
}
}
reloadAndValidate(rgs, t, tmpFile, ruleManager, ogs)
@ -869,9 +869,9 @@ func formatRules(r *rulefmt.RuleGroups) ruleGroupsTest {
rtmp := []rulefmt.Rule{}
for _, r := range g.Rules {
rtmp = append(rtmp, rulefmt.Rule{
Record: r.Record.Value,
Alert: r.Alert.Value,
Expr: r.Expr.Value,
Record: r.Record,
Alert: r.Alert,
Expr: r.Expr,
For: r.For,
Labels: r.Labels,
Annotations: r.Annotations,
@ -1601,10 +1601,14 @@ func TestDependencyMap(t *testing.T) {
require.NoError(t, err)
rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{})
expr, err = parser.ParseExpr(`sum by (user) ({__name__=~"user:requests.+5m"})`)
require.NoError(t, err)
rule5 := NewRecordingRule("user:requests:sum5m", expr, labels.Labels{})
group := NewGroup(GroupOptions{
Name: "rule_group",
Interval: time.Second,
Rules: []Rule{rule, rule2, rule3, rule4},
Rules: []Rule{rule, rule2, rule3, rule4, rule5},
Opts: opts,
})
@ -1619,13 +1623,17 @@ func TestDependencyMap(t *testing.T) {
require.Equal(t, []Rule{rule}, depMap.dependencies(rule2))
require.False(t, depMap.isIndependent(rule2))
require.Zero(t, depMap.dependents(rule3))
require.Equal(t, []Rule{rule5}, depMap.dependents(rule3))
require.Zero(t, depMap.dependencies(rule3))
require.True(t, depMap.isIndependent(rule3))
require.False(t, depMap.isIndependent(rule3))
require.Zero(t, depMap.dependents(rule4))
require.Equal(t, []Rule{rule}, depMap.dependencies(rule4))
require.False(t, depMap.isIndependent(rule4))
require.Zero(t, depMap.dependents(rule5))
require.Equal(t, []Rule{rule3}, depMap.dependencies(rule5))
require.False(t, depMap.isIndependent(rule5))
}
func TestNoDependency(t *testing.T) {

View file

@ -1714,7 +1714,7 @@ loop:
lset = ce.lset
hash = ce.hash
} else {
p.Metric(&lset)
p.Labels(&lset)
hash = lset.Hash()
// Hash label set as it is seen local to the target. Then add target labels
@ -1962,12 +1962,24 @@ func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) boo
// Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample or bucket limit errors.
// Switch error cases for Sample and Bucket limits are checked first since they're more common
// during normal operation (e.g., accidental cardinality explosion, sudden traffic spikes).
// Current case ordering prevents exercising other cases when limits are exceeded.
// Remaining error cases typically occur only a few times, often during initial setup.
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
switch {
case err == nil:
return true, nil
case errors.Is(err, storage.ErrNotFound):
return false, storage.ErrNotFound
case errors.Is(err, errSampleLimit):
// Keep on parsing output if we hit the limit, so we report the correct
// total number of samples scraped.
*sampleLimitErr = err
return false, nil
case errors.Is(err, errBucketLimit):
// Keep on parsing output if we hit the limit, so we report the bucket
// total number of samples scraped.
*bucketLimitErr = err
return false, nil
case errors.Is(err, storage.ErrOutOfOrderSample):
appErrs.numOutOfOrder++
sl.l.Debug("Out of order sample", "series", string(met))
@ -1983,16 +1995,8 @@ func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucke
sl.l.Debug("Out of bounds metric", "series", string(met))
sl.metrics.targetScrapeSampleOutOfBounds.Inc()
return false, nil
case errors.Is(err, errSampleLimit):
// Keep on parsing output if we hit the limit, so we report the correct
// total number of samples scraped.
*sampleLimitErr = err
return false, nil
case errors.Is(err, errBucketLimit):
// Keep on parsing output if we hit the limit, so we report the correct
// total number of samples scraped.
*bucketLimitErr = err
return false, nil
case errors.Is(err, storage.ErrNotFound):
return false, storage.ErrNotFound
default:
return false, err
}

View file

@ -1895,6 +1895,7 @@ func TestScrapeLoopAppend(t *testing.T) {
}
func requireEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) {
t.Helper()
testutil.RequireEqualWithOptions(t, expected, actual,
[]cmp.Option{cmp.Comparer(equalFloatSamples), cmp.AllowUnexported(histogramSample{})},
msgAndArgs...)
@ -1988,7 +1989,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
var lset labels.Labels
p.Next()
p.Metric(&lset)
p.Labels(&lset)
hash := lset.Hash()
// Create a fake entry in the cache

View file

@ -349,11 +349,10 @@ func (tokenProvider *tokenProvider) getToken(ctx context.Context) error {
func (tokenProvider *tokenProvider) updateRefreshTime(accessToken azcore.AccessToken) error {
tokenExpiryTimestamp := accessToken.ExpiresOn.UTC()
deltaExpirytime := time.Now().Add(time.Until(tokenExpiryTimestamp) / 2)
if deltaExpirytime.After(time.Now().UTC()) {
tokenProvider.refreshTime = deltaExpirytime
} else {
if !deltaExpirytime.After(time.Now().UTC()) {
return errors.New("access token expiry is less than the current time")
}
tokenProvider.refreshTime = deltaExpirytime
return nil
}

View file

@ -135,7 +135,7 @@ func TestBasicContentNegotiation(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
var (
@ -243,7 +243,7 @@ func TestSampleDelivery(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
var (
@ -362,7 +362,7 @@ func TestMetadataDelivery(t *testing.T) {
func TestWALMetadataDelivery(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
cfg := config.DefaultQueueConfig

View file

@ -93,7 +93,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases {
t.Run("", func(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs,

View file

@ -64,7 +64,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL bool) *Storage {
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
if l == nil {
l = promslog.NewNopLogger()
}
@ -76,7 +76,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC
deduper: deduper,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
return s
}

View file

@ -29,7 +29,7 @@ import (
func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) {
func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) {
func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@ -154,7 +154,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
// ApplyConfig runs concurrently with Notify
// See https://github.com/prometheus/prometheus/issues/12747
func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) {
s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil)
var wg sync.WaitGroup
wg.Add(2000)

View file

@ -15,7 +15,6 @@ package remote
import (
"context"
"errors"
"fmt"
"log/slog"
"math"
@ -67,7 +66,6 @@ type WriteStorage struct {
externalLabels labels.Labels
dir string
queues map[string]*QueueManager
metadataInWAL bool
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
@ -79,7 +77,7 @@ type WriteStorage struct {
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage {
func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage {
if logger == nil {
logger = promslog.NewNopLogger()
}
@ -95,7 +93,6 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
interner: newPool(),
scraper: sm,
quit: make(chan struct{}),
metadataInWAL: metadataInWal,
highestTimestamp: &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
@ -149,9 +146,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newQueues := make(map[string]*QueueManager)
newHashes := []string{}
for _, rwConf := range conf.RemoteWriteConfigs {
if rwConf.ProtobufMessage == config.RemoteWriteProtoMsgV2 && !rws.metadataInWAL {
return errors.New("invalid remote write configuration, if you are using remote write version 2.0 the `--enable-feature=metadata-wal-records` feature flag must be enabled")
}
hash, err := toHash(rwConf)
if err != nil {
return err

View file

@ -117,7 +117,7 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@ -143,7 +143,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
hash, err := toHash(cfg)
require.NoError(t, err)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
@ -165,7 +165,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@ -206,7 +206,7 @@ func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -222,7 +222,7 @@ func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
@ -250,7 +250,7 @@ func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -274,7 +274,7 @@ func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),

View file

@ -235,6 +235,12 @@ type DB struct {
appenderPool sync.Pool
bufPool sync.Pool
// These pools are used during WAL replay.
walReplaySeriesPool zeropool.Pool[[]record.RefSeries]
walReplaySamplesPool zeropool.Pool[[]record.RefSample]
walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
nextRef *atomic.Uint64
series *stripeSeries
// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
@ -426,11 +432,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
decoded = make(chan interface{}, 10)
errCh = make(chan error, 1)
seriesPool zeropool.Pool[[]record.RefSeries]
samplesPool zeropool.Pool[[]record.RefSample]
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
)
go func() {
@ -440,7 +441,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get()[:0]
series := db.walReplaySeriesPool.Get()[:0]
series, err = dec.Series(rec, series)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -452,7 +453,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- series
case record.Samples:
samples := samplesPool.Get()[:0]
samples := db.walReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -464,7 +465,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- samples
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
histograms := histogramsPool.Get()[:0]
histograms := db.walReplayHistogramsPool.Get()[:0]
histograms, err = dec.HistogramSamples(rec, histograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -476,7 +477,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
decoded <- histograms
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
floatHistograms := floatHistogramsPool.Get()[:0]
floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0]
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
if err != nil {
errCh <- &wlog.CorruptionErr{
@ -521,7 +522,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
}
}
seriesPool.Put(v)
db.walReplaySeriesPool.Put(v)
case []record.RefSample:
for _, entry := range v {
// Update the lastTs for the series based
@ -535,7 +536,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
samplesPool.Put(v)
db.walReplaySamplesPool.Put(v)
case []record.RefHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
@ -549,7 +550,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
histogramsPool.Put(v)
db.walReplayHistogramsPool.Put(v)
case []record.RefFloatHistogramSample:
for _, entry := range v {
// Update the lastTs for the series based
@ -563,7 +564,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series.lastTs = entry.T
}
}
floatHistogramsPool.Put(v)
db.walReplayFloatHistogramsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -1026,12 +1027,11 @@ func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.L
return 0, storage.ErrOutOfOrderCT
}
if ct > series.lastTs {
series.lastTs = ct
} else {
if ct <= series.lastTs {
// discard the sample if it's out of order.
return 0, storage.ErrOutOfOrderCT
}
series.lastTs = ct
switch {
case h != nil:
@ -1091,12 +1091,11 @@ func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
return 0, storage.ErrOutOfOrderSample
}
if ct > series.lastTs {
series.lastTs = ct
} else {
if ct <= series.lastTs {
// discard the sample if it's out of order.
return 0, storage.ErrOutOfOrderCT
}
series.lastTs = ct
// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{

View file

@ -91,7 +91,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *
t.Helper()
dbDir := t.TempDir()
rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@ -737,7 +737,7 @@ func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
logger := promslog.NewNopLogger()
reg := prometheus.NewRegistry()
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@ -757,7 +757,7 @@ func TestLockfile(t *testing.T) {
func Test_ExistingWAL_NextRef(t *testing.T) {
dbDir := t.TempDir()
rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil)
defer func() {
require.NoError(t, rs.Close())
}()

View file

@ -94,6 +94,16 @@ type Head struct {
bytesPool zeropool.Pool[[]byte]
memChunkPool sync.Pool
// These pools are used during WAL/WBL replay.
wlReplaySeriesPool zeropool.Pool[[]record.RefSeries]
wlReplaySamplesPool zeropool.Pool[[]record.RefSample]
wlReplaytStonesPool zeropool.Pool[[]tombstones.Stone]
wlReplayExemplarsPool zeropool.Pool[[]record.RefExemplar]
wlReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample]
wlReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
wlReplayMetadataPool zeropool.Pool[[]record.RefMetadata]
wlReplayMmapMarkersPool zeropool.Pool[[]record.RefMmapMarker]
// All series addressable by their ID or hash.
series *stripeSeries

View file

@ -46,6 +46,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
@ -440,27 +441,41 @@ func BenchmarkLoadWLs(b *testing.B) {
// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
//
// Using an absolute path for BENCHMARK_LOAD_REAL_WLS_DIR is recommended.
//
// Because WLs loading may alter BENCHMARK_LOAD_REAL_WLS_DIR which can affect benchmark results and to ensure consistency,
// a copy of BENCHMARK_LOAD_REAL_WLS_DIR is made for each iteration and deleted at the end.
// Make sure there is sufficient disk space for that.
func BenchmarkLoadRealWLs(b *testing.B) {
dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
if dir == "" {
srcDir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
if srcDir == "" {
b.SkipNow()
}
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wal.Close() })
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wbl.Close() })
// Load the WAL.
for i := 0; i < b.N; i++ {
b.StopTimer()
dir := b.TempDir()
require.NoError(b, fileutil.CopyDirs(srcDir, dir))
wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wal.Close() })
wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
require.NoError(b, err)
b.Cleanup(func() { wbl.Close() })
b.StartTimer()
opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
require.NoError(b, err)
require.NoError(b, h.Init(0))
b.StopTimer()
require.NoError(b, os.RemoveAll(dir))
}
}

View file

@ -39,7 +39,6 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/zeropool"
)
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
@ -73,14 +72,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
decoded = make(chan interface{}, 10)
decodeErr, seriesCreationErr error
seriesPool zeropool.Pool[[]record.RefSeries]
samplesPool zeropool.Pool[[]record.RefSample]
tstonesPool zeropool.Pool[[]tombstones.Stone]
exemplarsPool zeropool.Pool[[]record.RefExemplar]
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
metadataPool zeropool.Pool[[]record.RefMetadata]
)
defer func() {
@ -140,7 +131,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series := seriesPool.Get()[:0]
series := h.wlReplaySeriesPool.Get()[:0]
series, err = dec.Series(rec, series)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -152,7 +143,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- series
case record.Samples:
samples := samplesPool.Get()[:0]
samples := h.wlReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -164,7 +155,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- samples
case record.Tombstones:
tstones := tstonesPool.Get()[:0]
tstones := h.wlReplaytStonesPool.Get()[:0]
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -176,7 +167,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- tstones
case record.Exemplars:
exemplars := exemplarsPool.Get()[:0]
exemplars := h.wlReplayExemplarsPool.Get()[:0]
exemplars, err = dec.Exemplars(rec, exemplars)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -188,7 +179,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- exemplars
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
hists := histogramsPool.Get()[:0]
hists := h.wlReplayHistogramsPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -200,7 +191,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- hists
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
hists := floatHistogramsPool.Get()[:0]
hists := h.wlReplayFloatHistogramsPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -212,7 +203,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decoded <- hists
case record.Metadata:
meta := metadataPool.Get()[:0]
meta := h.wlReplayMetadataPool.Get()[:0]
meta, err := dec.Metadata(rec, meta)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -251,7 +242,7 @@ Outer:
idx := uint64(mSeries.ref) % uint64(concurrency)
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
}
seriesPool.Put(v)
h.wlReplaySeriesPool.Put(v)
case []record.RefSample:
samples := v
minValidTime := h.minValidTime.Load()
@ -287,7 +278,7 @@ Outer:
}
samples = samples[m:]
}
samplesPool.Put(v)
h.wlReplaySamplesPool.Put(v)
case []tombstones.Stone:
for _, s := range v {
for _, itv := range s.Intervals {
@ -301,12 +292,12 @@ Outer:
h.tombstones.AddInterval(s.Ref, itv)
}
}
tstonesPool.Put(v)
h.wlReplaytStonesPool.Put(v)
case []record.RefExemplar:
for _, e := range v {
exemplarsInput <- e
}
exemplarsPool.Put(v)
h.wlReplayExemplarsPool.Put(v)
case []record.RefHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
@ -342,7 +333,7 @@ Outer:
}
samples = samples[m:]
}
histogramsPool.Put(v)
h.wlReplayHistogramsPool.Put(v)
case []record.RefFloatHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
@ -378,7 +369,7 @@ Outer:
}
samples = samples[m:]
}
floatHistogramsPool.Put(v)
h.wlReplayFloatHistogramsPool.Put(v)
case []record.RefMetadata:
for _, m := range v {
s := h.series.getByID(m.Ref)
@ -392,7 +383,7 @@ Outer:
Help: m.Help,
}
}
metadataPool.Put(v)
h.wlReplayMetadataPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -659,12 +650,8 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
shards = make([][]record.RefSample, concurrency)
histogramShards = make([][]histogramRecord, concurrency)
decodedCh = make(chan interface{}, 10)
decodeErr error
samplesPool zeropool.Pool[[]record.RefSample]
markersPool zeropool.Pool[[]record.RefMmapMarker]
histogramSamplesPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramSamplesPool zeropool.Pool[[]record.RefFloatHistogramSample]
decodedCh = make(chan interface{}, 10)
decodeErr error
)
defer func() {
@ -700,7 +687,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
rec := r.Record()
switch dec.Type(rec) {
case record.Samples:
samples := samplesPool.Get()[:0]
samples := h.wlReplaySamplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -712,7 +699,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- samples
case record.MmapMarkers:
markers := markersPool.Get()[:0]
markers := h.wlReplayMmapMarkersPool.Get()[:0]
markers, err = dec.MmapMarkers(rec, markers)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -724,7 +711,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- markers
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
hists := histogramSamplesPool.Get()[:0]
hists := h.wlReplayHistogramsPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -736,7 +723,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- hists
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
hists := floatHistogramSamplesPool.Get()[:0]
hists := h.wlReplayFloatHistogramsPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -787,7 +774,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
samplesPool.Put(v)
h.wlReplaySamplesPool.Put(v)
case []record.RefMmapMarker:
markers := v
for _, rm := range markers {
@ -842,7 +829,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
histogramSamplesPool.Put(v)
h.wlReplayHistogramsPool.Put(v)
case []record.RefFloatHistogramSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
@ -874,7 +861,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
floatHistogramSamplesPool.Put(v)
h.wlReplayFloatHistogramsPool.Put(v)
default:
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
}

View file

@ -275,12 +275,11 @@ func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
pos := int(txr.txIDFirst)
for txr.txIDCount > 0 {
if txr.txIDs[pos] < bound {
txr.txIDFirst++
txr.txIDCount--
} else {
if txr.txIDs[pos] >= bound {
break
}
txr.txIDFirst++
txr.txIDCount--
pos++
if pos == len(txr.txIDs) {

View file

@ -495,7 +495,7 @@ func TestEndpoints(t *testing.T) {
remote := remote.NewStorage(promslog.New(&promslogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil
}, dbDir, 1*time.Second, nil, false)
}, dbDir, 1*time.Second, nil)
err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{

View file

@ -403,7 +403,7 @@ func TestFederationWithNativeHistograms(t *testing.T) {
}
require.NoError(t, err)
if et == textparse.EntryHistogram || et == textparse.EntrySeries {
p.Metric(&l)
p.Labels(&l)
}
switch et {
case textparse.EntryHelp:

View file

@ -332,14 +332,16 @@ function App() {
justify="space-between"
wrap="nowrap"
>
<Group gap={65} wrap="nowrap">
<Group gap={40} wrap="nowrap">
<Link
to="/"
style={{ textDecoration: "none", color: "white" }}
>
<Group gap={10} wrap="nowrap">
<img src={PrometheusLogo} height={30} />
<Text fz={20}>Prometheus{agentMode && " Agent"}</Text>
<Text hiddenFrom="sm" fz={20}>Prometheus</Text>
<Text visibleFrom="md" fz={20}>Prometheus</Text>
<Text fz={20}>{agentMode && "Agent"}</Text>
</Group>
</Link>
<Group gap={12} visibleFrom="sm" wrap="nowrap">

View file

@ -1,14 +1,8 @@
import {
useMantineColorScheme,
SegmentedControl,
rem,
MantineColorScheme,
Tooltip,
} from "@mantine/core";
import { useMantineColorScheme, rem, ActionIcon } from "@mantine/core";
import {
IconMoonFilled,
IconSunFilled,
IconUserFilled,
IconBrightnessFilled,
} from "@tabler/icons-react";
import { FC } from "react";
@ -20,45 +14,28 @@ export const ThemeSelector: FC = () => {
};
return (
<SegmentedControl
color="gray.7"
size="xs"
// styles={{ root: { backgroundColor: "var(--mantine-color-gray-7)" } }}
styles={{
root: {
padding: 3,
backgroundColor: "var(--mantine-color-gray-6)",
},
}}
withItemsBorders={false}
value={colorScheme}
onChange={(v) => setColorScheme(v as MantineColorScheme)}
data={[
{
value: "light",
label: (
<Tooltip label="Use light theme" offset={15}>
<IconSunFilled {...iconProps} />
</Tooltip>
),
},
{
value: "dark",
label: (
<Tooltip label="Use dark theme" offset={15}>
<IconMoonFilled {...iconProps} />
</Tooltip>
),
},
{
value: "auto",
label: (
<Tooltip label="Use browser-preferred theme" offset={15}>
<IconUserFilled {...iconProps} />
</Tooltip>
),
},
]}
/>
<ActionIcon
color="gray"
title={`Switch to ${colorScheme === "light" ? "dark" : colorScheme === "dark" ? "browser-preferred" : "light"} theme`}
aria-label={`Switch to ${colorScheme === "light" ? "dark" : colorScheme === "dark" ? "browser-preferred" : "light"} theme`}
size={32}
onClick={() =>
setColorScheme(
colorScheme === "light"
? "dark"
: colorScheme === "dark"
? "auto"
: "light"
)
}
>
{colorScheme === "light" ? (
<IconSunFilled {...iconProps} />
) : colorScheme === "dark" ? (
<IconMoonFilled {...iconProps} />
) : (
<IconBrightnessFilled {...iconProps} />
)}
</ActionIcon>
);
};

View file

@ -294,7 +294,7 @@ class Cache {
constructor(config?: CacheConfig) {
const maxAge = {
ttl: config && config.maxAge ? config.maxAge : 5 * 60 * 1000,
ttlAutopurge: false,
ttlAutopurge: true,
};
this.completeAssociation = new LRUCache<string, Map<string, Set<string>>>(maxAge);
this.metricMetadata = {};