From cfc15cf103cada55de14b8fd2986c00e67b00c93 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 29 Jan 2016 15:30:57 +0100 Subject: [PATCH 01/10] Update common/model vendoring --- vendor/github.com/prometheus/common/model/time.go | 14 ++++++++++++-- vendor/vendor.json | 4 ++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/vendor/github.com/prometheus/common/model/time.go b/vendor/github.com/prometheus/common/model/time.go index e2e9ff5747..548968aebe 100644 --- a/vendor/github.com/prometheus/common/model/time.go +++ b/vendor/github.com/prometheus/common/model/time.go @@ -163,10 +163,10 @@ func (t *Time) UnmarshalJSON(b []byte) error { // This type should not propagate beyond the scope of input/output processing. type Duration time.Duration -var durationRE = regexp.MustCompile("^([0-9]+)(d|h|m|s|ms)$") +var durationRE = regexp.MustCompile("^([0-9]+)(y|w|d|h|m|s|ms)$") // StringToDuration parses a string into a time.Duration, assuming that a year -// a day always has 24h. +// always has 365d, a week always has 7d, and a day always has 24h. 
func ParseDuration(durationStr string) (Duration, error) { matches := durationRE.FindStringSubmatch(durationStr) if len(matches) != 3 { @@ -177,6 +177,10 @@ func ParseDuration(durationStr string) (Duration, error) { dur = time.Duration(n) * time.Millisecond ) switch unit := matches[2]; unit { + case "y": + dur *= 1000 * 60 * 60 * 24 * 365 + case "w": + dur *= 1000 * 60 * 60 * 24 * 7 case "d": dur *= 1000 * 60 * 60 * 24 case "h": @@ -199,6 +203,8 @@ func (d Duration) String() string { unit = "ms" ) factors := map[string]int64{ + "y": 1000 * 60 * 60 * 24 * 365, + "w": 1000 * 60 * 60 * 24 * 7, "d": 1000 * 60 * 60 * 24, "h": 1000 * 60 * 60, "m": 1000 * 60, @@ -207,6 +213,10 @@ func (d Duration) String() string { } switch int64(0) { + case ms % factors["y"]: + unit = "y" + case ms % factors["w"]: + unit = "w" case ms % factors["d"]: unit = "d" case ms % factors["h"]: diff --git a/vendor/vendor.json b/vendor/vendor.json index a5c1ee1152..f2405ebabd 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -174,8 +174,8 @@ }, { "path": "github.com/prometheus/common/model", - "revision": "b0d797186bfbaf6d785031c6c2d32f75c720007d", - "revisionTime": "2016-01-22T12:15:42+01:00" + "revision": "0e53cc19aa67dd2e8587a26e28643cb152f5403d", + "revisionTime": "2016-01-29T15:16:16+01:00" }, { "path": "github.com/prometheus/common/route", From a7408bfb471caa54fbe7ae6c228e1f0db8eae674 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 29 Jan 2016 15:23:11 +0100 Subject: [PATCH 02/10] Unify duration parsing It's actually happening in several places (and for flags, we use the standard Go time.Duration...). This at least reduces all our home-grown parsing to one place (in model). 
--- config/config.go | 115 ++++++++++++------------------- config/config_test.go | 42 +++++------ promql/parse.go | 4 +- promql/printer.go | 9 ++- promql/test.go | 9 ++- retrieval/discovery/file_test.go | 4 +- retrieval/target_test.go | 16 ++--- retrieval/targetmanager_test.go | 8 +-- rules/alerting.go | 6 +- util/strutil/strconv.go | 62 ----------------- web/api/v1/api.go | 5 +- 11 files changed, 95 insertions(+), 185 deletions(-) diff --git a/config/config.go b/config/config.go index 81882b0638..19bec5423f 100644 --- a/config/config.go +++ b/config/config.go @@ -25,8 +25,6 @@ import ( "github.com/prometheus/common/model" "gopkg.in/yaml.v2" - - "github.com/prometheus/prometheus/util/strutil" ) var ( @@ -75,9 +73,9 @@ var ( // DefaultGlobalConfig is the default global configuration. DefaultGlobalConfig = GlobalConfig{ - ScrapeInterval: Duration(1 * time.Minute), - ScrapeTimeout: Duration(10 * time.Second), - EvaluationInterval: Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), + ScrapeTimeout: model.Duration(10 * time.Second), + EvaluationInterval: model.Duration(1 * time.Minute), } // DefaultScrapeConfig is the default scrape configuration. @@ -99,13 +97,13 @@ var ( // DefaultDNSSDConfig is the default DNS SD configuration. DefaultDNSSDConfig = DNSSDConfig{ - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), Type: "SRV", } // DefaultFileSDConfig is the default file SD configuration. DefaultFileSDConfig = FileSDConfig{ - RefreshInterval: Duration(5 * time.Minute), + RefreshInterval: model.Duration(5 * time.Minute), } // DefaultConsulSDConfig is the default Consul SD configuration. @@ -116,30 +114,30 @@ var ( // DefaultServersetSDConfig is the default Serverset SD configuration. DefaultServersetSDConfig = ServersetSDConfig{ - Timeout: Duration(10 * time.Second), + Timeout: model.Duration(10 * time.Second), } // DefaultNerveSDConfig is the default Nerve SD configuration. 
DefaultNerveSDConfig = NerveSDConfig{ - Timeout: Duration(10 * time.Second), + Timeout: model.Duration(10 * time.Second), } // DefaultMarathonSDConfig is the default Marathon SD configuration. DefaultMarathonSDConfig = MarathonSDConfig{ - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), } // DefaultKubernetesSDConfig is the default Kubernetes SD configuration DefaultKubernetesSDConfig = KubernetesSDConfig{ KubeletPort: 10255, - RequestTimeout: Duration(10 * time.Second), - RetryInterval: Duration(1 * time.Second), + RequestTimeout: model.Duration(10 * time.Second), + RetryInterval: model.Duration(1 * time.Second), } // DefaultEC2SDConfig is the default EC2 SD configuration. DefaultEC2SDConfig = EC2SDConfig{ Port: 80, - RefreshInterval: Duration(60 * time.Second), + RefreshInterval: model.Duration(60 * time.Second), } ) @@ -281,11 +279,11 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { // objects. type GlobalConfig struct { // How frequently to scrape targets by default. - ScrapeInterval Duration `yaml:"scrape_interval,omitempty"` + ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"` // The default timeout when scraping targets. - ScrapeTimeout Duration `yaml:"scrape_timeout,omitempty"` + ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"` // How frequently to evaluate rules by default. - EvaluationInterval Duration `yaml:"evaluation_interval,omitempty"` + EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"` // The labels to add to any timeseries that this Prometheus instance scrapes. ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"` @@ -344,9 +342,9 @@ type ScrapeConfig struct { // A set of query parameters with which the target is scraped. Params url.Values `yaml:"params,omitempty"` // How frequently to scrape the targets of this scrape config. 
- ScrapeInterval Duration `yaml:"scrape_interval,omitempty"` + ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"` // The timeout for scraping targets of this config. - ScrapeTimeout Duration `yaml:"scrape_timeout,omitempty"` + ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"` // The HTTP resource path on which to fetch metrics from targets. MetricsPath string `yaml:"metrics_path,omitempty"` // The URL scheme with which to fetch metrics from targets. @@ -532,10 +530,10 @@ func (tg *TargetGroup) UnmarshalJSON(b []byte) error { // DNSSDConfig is the configuration for DNS based service discovery. type DNSSDConfig struct { - Names []string `yaml:"names"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` - Type string `yaml:"type"` - Port int `yaml:"port"` // Ignored for SRV records + Names []string `yaml:"names"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` + Type string `yaml:"type"` + Port int `yaml:"port"` // Ignored for SRV records // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` } @@ -565,8 +563,8 @@ func (c *DNSSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { // FileSDConfig is the configuration for file based discovery. type FileSDConfig struct { - Names []string `yaml:"names"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` + Names []string `yaml:"names"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -624,9 +622,9 @@ func (c *ConsulSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error // ServersetSDConfig is the configuration for Twitter serversets in Zookeeper based discovery. 
type ServersetSDConfig struct { - Servers []string `yaml:"servers"` - Paths []string `yaml:"paths"` - Timeout Duration `yaml:"timeout,omitempty"` + Servers []string `yaml:"servers"` + Paths []string `yaml:"paths"` + Timeout model.Duration `yaml:"timeout,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -656,9 +654,9 @@ func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) err // NerveSDConfig is the configuration for AirBnB's Nerve in Zookeeper based discovery. type NerveSDConfig struct { - Servers []string `yaml:"servers"` - Paths []string `yaml:"paths"` - Timeout Duration `yaml:"timeout,omitempty"` + Servers []string `yaml:"servers"` + Paths []string `yaml:"paths"` + Timeout model.Duration `yaml:"timeout,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -688,8 +686,8 @@ func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { // MarathonSDConfig is the configuration for services running on Marathon. type MarathonSDConfig struct { - Servers []string `yaml:"servers,omitempty"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` + Servers []string `yaml:"servers,omitempty"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -712,15 +710,15 @@ func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro // KubernetesSDConfig is the configuration for Kubernetes service discovery. 
type KubernetesSDConfig struct { - APIServers []URL `yaml:"api_servers"` - KubeletPort int `yaml:"kubelet_port,omitempty"` - InCluster bool `yaml:"in_cluster,omitempty"` - BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` - BearerToken string `yaml:"bearer_token,omitempty"` - BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - RetryInterval Duration `yaml:"retry_interval,omitempty"` - RequestTimeout Duration `yaml:"request_timeout,omitempty"` - TLSConfig TLSConfig `yaml:"tls_config,omitempty"` + APIServers []URL `yaml:"api_servers"` + KubeletPort int `yaml:"kubelet_port,omitempty"` + InCluster bool `yaml:"in_cluster,omitempty"` + BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` + BearerToken string `yaml:"bearer_token,omitempty"` + BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + RetryInterval model.Duration `yaml:"retry_interval,omitempty"` + RequestTimeout model.Duration `yaml:"request_timeout,omitempty"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -749,11 +747,11 @@ func (c *KubernetesSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) er // EC2SDConfig is the configuration for EC2 based service discovery. type EC2SDConfig struct { - Region string `yaml:"region"` - AccessKey string `yaml:"access_key,omitempty"` - SecretKey string `yaml:"secret_key,omitempty"` - RefreshInterval Duration `yaml:"refresh_interval,omitempty"` - Port int `yaml:"port"` + Region string `yaml:"region"` + AccessKey string `yaml:"access_key,omitempty"` + SecretKey string `yaml:"secret_key,omitempty"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` + Port int `yaml:"port"` // Catches all undefined fields and must be empty after parsing. 
XXX map[string]interface{} `yaml:",inline"` } @@ -883,28 +881,3 @@ func (re Regexp) MarshalYAML() (interface{}, error) { } return nil, nil } - -// Duration encapsulates a time.Duration and makes it YAML marshallable. -// -// TODO(fabxc): Since we have custom types for most things, including timestamps, -// we might want to move this into our model as well, eventually. -type Duration time.Duration - -// UnmarshalYAML implements the yaml.Unmarshaler interface. -func (d *Duration) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - dur, err := strutil.StringToDuration(s) - if err != nil { - return err - } - *d = Duration(dur) - return nil -} - -// MarshalYAML implements the yaml.Marshaler interface. -func (d Duration) MarshalYAML() (interface{}, error) { - return strutil.DurationToString(time.Duration(d)), nil -} diff --git a/config/config_test.go b/config/config_test.go index 8c27f1b547..477bee7064 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -28,9 +28,9 @@ import ( var expectedConf = &Config{ GlobalConfig: GlobalConfig{ - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, - EvaluationInterval: Duration(30 * time.Second), + EvaluationInterval: model.Duration(30 * time.Second), ExternalLabels: model.LabelSet{ "monitor": "codelab", @@ -49,7 +49,7 @@ var expectedConf = &Config{ JobName: "prometheus", HonorLabels: true, - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -73,11 +73,11 @@ var expectedConf = &Config{ FileSDConfigs: []*FileSDConfig{ { Names: []string{"foo/*.slow.json", "foo/*.slow.yml", "single/file.yml"}, - RefreshInterval: Duration(10 * time.Minute), + RefreshInterval: model.Duration(10 * time.Minute), }, { Names: 
[]string{"bar/*.yaml"}, - RefreshInterval: Duration(5 * time.Minute), + RefreshInterval: model.Duration(5 * time.Minute), }, }, @@ -108,8 +108,8 @@ var expectedConf = &Config{ { JobName: "service-x", - ScrapeInterval: Duration(50 * time.Second), - ScrapeTimeout: Duration(5 * time.Second), + ScrapeInterval: model.Duration(50 * time.Second), + ScrapeTimeout: model.Duration(5 * time.Second), BasicAuth: &BasicAuth{ Username: "admin_name", @@ -124,14 +124,14 @@ var expectedConf = &Config{ "first.dns.address.domain.com", "second.dns.address.domain.com", }, - RefreshInterval: Duration(15 * time.Second), + RefreshInterval: model.Duration(15 * time.Second), Type: "SRV", }, { Names: []string{ "first.dns.address.domain.com", }, - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), Type: "SRV", }, }, @@ -180,7 +180,7 @@ var expectedConf = &Config{ { JobName: "service-y", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -198,8 +198,8 @@ var expectedConf = &Config{ { JobName: "service-z", - ScrapeInterval: Duration(15 * time.Second), - ScrapeTimeout: Duration(10 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), + ScrapeTimeout: model.Duration(10 * time.Second), MetricsPath: "/metrics", Scheme: "http", @@ -214,7 +214,7 @@ var expectedConf = &Config{ { JobName: "service-kubernetes", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -228,15 +228,15 @@ var expectedConf = &Config{ Password: "mypassword", }, KubeletPort: 10255, - RequestTimeout: Duration(10 * time.Second), - RetryInterval: Duration(1 * time.Second), + RequestTimeout: model.Duration(10 * time.Second), + RetryInterval: model.Duration(1 * time.Second), }, }, }, { 
JobName: "service-marathon", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -247,14 +247,14 @@ var expectedConf = &Config{ Servers: []string{ "http://marathon.example.com:8080", }, - RefreshInterval: Duration(30 * time.Second), + RefreshInterval: model.Duration(30 * time.Second), }, }, }, { JobName: "service-ec2", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -265,7 +265,7 @@ var expectedConf = &Config{ Region: "us-east-1", AccessKey: "access", SecretKey: "secret", - RefreshInterval: Duration(60 * time.Second), + RefreshInterval: model.Duration(60 * time.Second), Port: 80, }, }, @@ -273,7 +273,7 @@ var expectedConf = &Config{ { JobName: "service-nerve", - ScrapeInterval: Duration(15 * time.Second), + ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, MetricsPath: DefaultScrapeConfig.MetricsPath, @@ -283,7 +283,7 @@ var expectedConf = &Config{ { Servers: []string{"localhost"}, Paths: []string{"/monitoring"}, - Timeout: Duration(10 * time.Second), + Timeout: model.Duration(10 * time.Second), }, }, }, diff --git a/promql/parse.go b/promql/parse.go index 198aa43800..a239869734 100644 --- a/promql/parse.go +++ b/promql/parse.go @@ -1140,12 +1140,12 @@ func (p *parser) unquoteString(s string) string { } func parseDuration(ds string) (time.Duration, error) { - dur, err := strutil.StringToDuration(ds) + dur, err := model.ParseDuration(ds) if err != nil { return 0, err } if dur == 0 { return 0, fmt.Errorf("duration must be greater than 0") } - return dur, nil + return time.Duration(dur), nil } diff --git a/promql/printer.go b/promql/printer.go index 35b40b00ee..7be0de04e4 100644 --- a/promql/printer.go +++ b/promql/printer.go @@ 
-22,7 +22,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/metric" - "github.com/prometheus/prometheus/util/strutil" ) // Tree returns a string of the tree structure of the given node. @@ -104,7 +103,7 @@ func (node *AlertStmt) String() string { s := fmt.Sprintf("ALERT %s", node.Name) s += fmt.Sprintf("\n\tIF %s", node.Expr) if node.Duration > 0 { - s += fmt.Sprintf("\n\tFOR %s", strutil.DurationToString(node.Duration)) + s += fmt.Sprintf("\n\tFOR %s", model.Duration(node.Duration)) } if len(node.Labels) > 0 { s += fmt.Sprintf("\n\tLABELS %s", node.Labels) @@ -178,9 +177,9 @@ func (node *MatrixSelector) String() string { } offset := "" if node.Offset != time.Duration(0) { - offset = fmt.Sprintf(" OFFSET %s", strutil.DurationToString(node.Offset)) + offset = fmt.Sprintf(" OFFSET %s", model.Duration(node.Offset)) } - return fmt.Sprintf("%s[%s]%s", vecSelector.String(), strutil.DurationToString(node.Range), offset) + return fmt.Sprintf("%s[%s]%s", vecSelector.String(), model.Duration(node.Range), offset) } func (node *NumberLiteral) String() string { @@ -210,7 +209,7 @@ func (node *VectorSelector) String() string { } offset := "" if node.Offset != time.Duration(0) { - offset = fmt.Sprintf(" OFFSET %s", strutil.DurationToString(node.Offset)) + offset = fmt.Sprintf(" OFFSET %s", model.Duration(node.Offset)) } if len(labelStrings) == 0 { diff --git a/promql/test.go b/promql/test.go index 04ad8b8bed..024725df29 100644 --- a/promql/test.go +++ b/promql/test.go @@ -26,7 +26,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" - "github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -98,11 +97,11 @@ func (t *Test) parseLoad(lines []string, i int) (int, *loadCmd, error) { } parts := patLoad.FindStringSubmatch(lines[i]) - gap, err := strutil.StringToDuration(parts[1]) + gap, err := model.ParseDuration(parts[1]) if err != nil { return 
i, nil, raise(i, "invalid step definition %q: %s", parts[1], err) } - cmd := newLoadCmd(gap) + cmd := newLoadCmd(time.Duration(gap)) for i+1 < len(lines) { i++ defLine := lines[i] @@ -141,11 +140,11 @@ func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) { return i, nil, err } - offset, err := strutil.StringToDuration(at) + offset, err := model.ParseDuration(at) if err != nil { return i, nil, raise(i, "invalid step definition %q: %s", parts[1], err) } - ts := testStartTime.Add(offset) + ts := testStartTime.Add(time.Duration(offset)) cmd := newEvalCmd(expr, ts, ts, 0) switch mod { diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 74270ad029..4c1407666a 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" ) @@ -22,7 +24,7 @@ func testFileSD(t *testing.T, ext string) { // whether file watches work as expected. 
var conf config.FileSDConfig conf.Names = []string{"fixtures/_*" + ext} - conf.RefreshInterval = config.Duration(1 * time.Hour) + conf.RefreshInterval = model.Duration(1 * time.Hour) var ( fsd = NewFileDiscovery(&conf) diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 51cb331f23..acacc86aa4 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -420,8 +420,8 @@ func TestURLParams(t *testing.T) { target := NewTarget( &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeInterval: model.Duration(1 * time.Minute), + ScrapeTimeout: model.Duration(1 * time.Second), Scheme: serverURL.Scheme, Params: url.Values{ "foo": []string{"bar", "baz"}, @@ -441,7 +441,7 @@ func TestURLParams(t *testing.T) { func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.LabelSet) *Target { cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(deadline), + ScrapeTimeout: model.Duration(deadline), } c, _ := newHTTPClient(cfg) t := &Target{ @@ -481,7 +481,7 @@ func TestNewHTTPBearerToken(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), BearerToken: "1234", } c, err := newHTTPClient(cfg) @@ -509,7 +509,7 @@ func TestNewHTTPBearerTokenFile(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), BearerTokenFile: "testdata/bearertoken.txt", } c, err := newHTTPClient(cfg) @@ -536,7 +536,7 @@ func TestNewHTTPBasicAuth(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), BasicAuth: &config.BasicAuth{ Username: "user", Password: "password123", @@ -566,7 +566,7 @@ func TestNewHTTPCACert(t *testing.T) 
{ defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), TLSConfig: config.TLSConfig{ CAFile: "testdata/ca.cer", }, @@ -599,7 +599,7 @@ func TestNewHTTPClientCert(t *testing.T) { defer server.Close() cfg := &config.ScrapeConfig{ - ScrapeTimeout: config.Duration(1 * time.Second), + ScrapeTimeout: model.Duration(1 * time.Second), TLSConfig: config.TLSConfig{ CAFile: "testdata/ca.cer", CertFile: "testdata/client.cer", diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 85abdd31ef..b192a10c0d 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -75,7 +75,7 @@ func TestPrefixedTargetProvider(t *testing.T) { func TestTargetManagerChan(t *testing.T) { testJob1 := &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), TargetGroups: []*config.TargetGroup{{ Targets: []model.LabelSet{ {model.AddressLabel: "example.org:80"}, @@ -204,7 +204,7 @@ func TestTargetManagerChan(t *testing.T) { func TestTargetManagerConfigUpdate(t *testing.T) { testJob1 := &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), Params: url.Values{ "testParam": []string{"paramValue", "secondValue"}, }, @@ -234,7 +234,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) { } testJob2 := &config.ScrapeConfig{ JobName: "test_job2", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), TargetGroups: []*config.TargetGroup{ { Targets: []model.LabelSet{ @@ -288,7 +288,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) { // Test that targets without host:port addresses are dropped. 
testJob3 := &config.ScrapeConfig{ JobName: "test_job1", - ScrapeInterval: config.Duration(1 * time.Minute), + ScrapeInterval: model.Duration(1 * time.Minute), TargetGroups: []*config.TargetGroup{{ Targets: []model.LabelSet{ {model.AddressLabel: "example.net:80"}, diff --git a/rules/alerting.go b/rules/alerting.go index 127c922a1b..a2a389d315 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -58,7 +58,7 @@ func (s AlertState) String() string { case StateFiring: return "firing" } - panic(fmt.Errorf("unknown alert state: %v", s)) + panic(fmt.Errorf("unknown alert state: %v", s.String())) } // Alert is the user-level representation of a single instance of an alerting rule. @@ -255,7 +255,7 @@ func (rule *AlertingRule) String() string { s := fmt.Sprintf("ALERT %s", rule.name) s += fmt.Sprintf("\n\tIF %s", rule.vector) if rule.holdDuration > 0 { - s += fmt.Sprintf("\n\tFOR %s", strutil.DurationToString(rule.holdDuration)) + s += fmt.Sprintf("\n\tFOR %s", model.Duration(rule.holdDuration)) } if len(rule.labels) > 0 { s += fmt.Sprintf("\n\tLABELS %s", rule.labels) @@ -277,7 +277,7 @@ func (rule *AlertingRule) HTMLSnippet(pathPrefix string) template.HTML { s := fmt.Sprintf("ALERT %s", pathPrefix+strutil.GraphLinkForExpression(alertMetric.String()), rule.name) s += fmt.Sprintf("\n IF %s", pathPrefix+strutil.GraphLinkForExpression(rule.vector.String()), rule.vector) if rule.holdDuration > 0 { - s += fmt.Sprintf("\n FOR %s", strutil.DurationToString(rule.holdDuration)) + s += fmt.Sprintf("\n FOR %s", model.Duration(rule.holdDuration)) } if len(rule.labels) > 0 { s += fmt.Sprintf("\n LABELS %s", rule.labels) diff --git a/util/strutil/strconv.go b/util/strutil/strconv.go index 1b7edf66bc..6bb25afdc3 100644 --- a/util/strutil/strconv.go +++ b/util/strutil/strconv.go @@ -17,75 +17,13 @@ import ( "fmt" "net/url" "regexp" - "strconv" "strings" - "time" ) var ( - durationRE = regexp.MustCompile("^([0-9]+)([ywdhms]+)$") invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) 
) -// DurationToString formats a time.Duration as a string with the assumption that -// a year always has 365 days and a day always has 24h. (The former doesn't work -// in leap years, the latter is broken by DST switches, not to speak about leap -// seconds, but those are not even treated properly by the duration strings in -// the standard library.) -func DurationToString(duration time.Duration) string { - seconds := int64(duration / time.Second) - factors := map[string]int64{ - "y": 60 * 60 * 24 * 365, - "d": 60 * 60 * 24, - "h": 60 * 60, - "m": 60, - "s": 1, - } - unit := "s" - switch int64(0) { - case seconds % factors["y"]: - unit = "y" - case seconds % factors["d"]: - unit = "d" - case seconds % factors["h"]: - unit = "h" - case seconds % factors["m"]: - unit = "m" - } - return fmt.Sprintf("%v%v", seconds/factors[unit], unit) -} - -// StringToDuration parses a string into a time.Duration, assuming that a year -// always has 365d, a week 7d, a day 24h. See DurationToString for problems with -// that. -func StringToDuration(durationStr string) (duration time.Duration, err error) { - matches := durationRE.FindStringSubmatch(durationStr) - if len(matches) != 3 { - err = fmt.Errorf("not a valid duration string: %q", durationStr) - return - } - durationSeconds, _ := strconv.Atoi(matches[1]) - duration = time.Duration(durationSeconds) * time.Second - unit := matches[2] - switch unit { - case "y": - duration *= 60 * 60 * 24 * 365 - case "w": - duration *= 60 * 60 * 24 * 7 - case "d": - duration *= 60 * 60 * 24 - case "h": - duration *= 60 * 60 - case "m": - duration *= 60 - case "s": - duration *= 1 - default: - return 0, fmt.Errorf("invalid time unit in duration string: %q", unit) - } - return -} - // TableLinkForExpression creates an escaped relative link to the table view of // the provided expression. 
func TableLinkForExpression(expr string) string { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 8acc9c7c9f..823665f3b2 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -18,7 +18,6 @@ import ( "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/util/httputil" - "github.com/prometheus/prometheus/util/strutil" ) type status string @@ -324,8 +323,8 @@ func parseDuration(s string) (time.Duration, error) { if d, err := strconv.ParseFloat(s, 64); err == nil { return time.Duration(d * float64(time.Second)), nil } - if d, err := strutil.StringToDuration(s); err == nil { - return d, nil + if d, err := model.ParseDuration(s); err == nil { + return time.Duration(d), nil } return 0, fmt.Errorf("cannot parse %q to a valid duration", s) } From ec08c9a391a4fda86be6230ffd8a4832c47913e0 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 27 Jan 2016 19:07:46 +0100 Subject: [PATCH 03/10] Rework the way to communicate backpressure (AKA suspended ingestion) This gives up on the idea to communicate through the Append() call (by either not returning as it is now or returning an error as suggested/explored elsewhere). Here I have added a NeedsThrottling() call, which has the advantage that it can be called before a whole _batch_ of Append()'s. Scrapes will happen completely or not at all. Same for rule group evaluations. That's a highly desired behavior (as discussed elsewhere). The code is even simpler now as the whole ingestion buffer could be removed. Logging of throttled mode has been streamlined and will create at most one message per minute. 
--- cmd/prometheus/config.go | 4 +- retrieval/helpers_test.go | 15 +++--- retrieval/target.go | 89 +++++++++++------------------- retrieval/target_test.go | 21 +++++--- retrieval/targetmanager.go | 1 + rules/manager.go | 18 ++++++- storage/local/interface.go | 3 ++ storage/local/storage.go | 96 ++++++++++++++++++++++++--------- storage/remote/queue_manager.go | 3 +- storage/remote/remote.go | 7 +++ storage/storage.go | 36 ++++++++++++- 11 files changed, 188 insertions(+), 105 deletions(-) diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 19d51c9892..3ca28985e5 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -107,7 +107,7 @@ func init() { ) cfg.fs.IntVar( &cfg.storage.MemoryChunks, "storage.local.memory-chunks", 1024*1024, - "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.", + "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily. Sample ingestion will be throttled if the configured value is exceeded by more than 10%.", ) cfg.fs.DurationVar( &cfg.storage.PersistenceRetentionPeriod, "storage.local.retention", 15*24*time.Hour, @@ -115,7 +115,7 @@ func init() { ) cfg.fs.IntVar( &cfg.storage.MaxChunksToPersist, "storage.local.max-chunks-to-persist", 512*1024, - "How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.", + "How many chunks can be waiting for persistence before sample ingestion will be throttled. 
Many chunks waiting to be persisted will increase the checkpoint size.", ) cfg.fs.DurationVar( &cfg.storage.CheckpointInterval, "storage.local.checkpoint-interval", 5*time.Minute, diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 880e6230c6..1b74b2ea4a 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -14,8 +14,6 @@ package retrieval import ( - "time" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -26,14 +24,13 @@ type nopAppender struct{} func (a nopAppender) Append(*model.Sample) { } -type slowAppender struct{} - -func (a slowAppender) Append(*model.Sample) { - time.Sleep(time.Millisecond) +func (a nopAppender) NeedsThrottling() bool { + return false } type collectResultAppender struct { - result model.Samples + result model.Samples + throttled bool } func (a *collectResultAppender) Append(s *model.Sample) { @@ -45,6 +42,10 @@ func (a *collectResultAppender) Append(s *model.Sample) { a.result = append(a.result, s) } +func (a *collectResultAppender) NeedsThrottling() bool { + return a.throttled +} + // fakeTargetProvider implements a TargetProvider and allows manual injection // of TargetGroups through the update channel. 
type fakeTargetProvider struct { diff --git a/retrieval/target.go b/retrieval/target.go index 6139c39320..133009f4ce 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -48,7 +48,7 @@ const ( ) var ( - errIngestChannelFull = errors.New("ingestion channel full") + errSkippedScrape = errors.New("scrape skipped due to throttled ingestion") targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -59,10 +59,19 @@ var ( }, []string{interval}, ) + targetSkippedScrapes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "target_skipped_scrapes_total", + Help: "Total number of scrapes that were skipped because the metric storage was throttled.", + }, + []string{interval}, + ) ) func init() { prometheus.MustRegister(targetIntervalLength) + prometheus.MustRegister(targetSkippedScrapes) } // TargetHealth describes the health state of a target. @@ -151,8 +160,6 @@ type Target struct { scraperStopping chan struct{} // Closing scraperStopped signals that scraping has been stopped. scraperStopped chan struct{} - // Channel to buffer ingested samples. - ingestedSamples chan model.Vector // Mutex protects the members below. sync.RWMutex @@ -166,8 +173,6 @@ type Target struct { baseLabels model.LabelSet // Internal labels, such as scheme. internalLabels model.LabelSet - // What is the deadline for the HTTP or HTTPS against this endpoint. - deadline time.Duration // The time between two scrapes. 
scrapeInterval time.Duration // Whether the target's labels have precedence over the base labels @@ -237,7 +242,6 @@ func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels, metaLabels model.L t.url.RawQuery = params.Encode() t.scrapeInterval = time.Duration(cfg.ScrapeInterval) - t.deadline = time.Duration(cfg.ScrapeTimeout) t.honorLabels = cfg.HonorLabels t.metaLabels = metaLabels @@ -361,6 +365,11 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { targetIntervalLength.WithLabelValues(intervalStr).Observe( float64(took) / float64(time.Second), // Sub-second precision. ) + if sampleAppender.NeedsThrottling() { + targetSkippedScrapes.WithLabelValues(intervalStr).Inc() + t.status.setLastError(errSkippedScrape) + continue + } t.scrape(sampleAppender) } } @@ -377,26 +386,6 @@ func (t *Target) StopScraper() { log.Debugf("Scraper for target %v stopped.", t) } -func (t *Target) ingest(s model.Vector) error { - t.RLock() - deadline := t.deadline - t.RUnlock() - // Since the regular case is that ingestedSamples is ready to receive, - // first try without setting a timeout so that we don't need to allocate - // a timer most of the time. - select { - case t.ingestedSamples <- s: - return nil - default: - select { - case t.ingestedSamples <- s: - return nil - case <-time.After(deadline / 10): - return errIngestChannelFull - } - } -} - const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` func (t *Target) scrape(appender storage.SampleAppender) (err error) { @@ -414,20 +403,20 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { // so the relabeling rules are applied to the correct label set. 
if len(t.metricRelabelConfigs) > 0 { appender = relabelAppender{ - app: appender, - relabelings: t.metricRelabelConfigs, + SampleAppender: appender, + relabelings: t.metricRelabelConfigs, } } if t.honorLabels { appender = honorLabelsAppender{ - app: appender, - labels: baseLabels, + SampleAppender: appender, + labels: baseLabels, } } else { appender = ruleLabelsAppender{ - app: appender, - labels: baseLabels, + SampleAppender: appender, + labels: baseLabels, } } @@ -460,27 +449,11 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { }, } - t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap) - - go func() { - for { - // TODO(fabxc): Change the SampleAppender interface to return an error - // so we can proceed based on the status and don't leak goroutines trying - // to append a single sample after dropping all the other ones. - // - // This will also allow use to reuse this vector and save allocations. - var samples model.Vector - if err = sdec.Decode(&samples); err != nil { - break - } - if err = t.ingest(samples); err != nil { - break - } + var samples model.Vector + for { + if err = sdec.Decode(&samples); err != nil { + break } - close(t.ingestedSamples) - }() - - for samples := range t.ingestedSamples { for _, s := range samples { appender.Append(s) } @@ -495,7 +468,7 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { // Merges the ingested sample's metric with the label set. On a collision the // value of the ingested label is stored in a label prefixed with 'exported_'. 
type ruleLabelsAppender struct { - app storage.SampleAppender + storage.SampleAppender labels model.LabelSet } @@ -507,11 +480,11 @@ func (app ruleLabelsAppender) Append(s *model.Sample) { s.Metric[ln] = lv } - app.app.Append(s) + app.SampleAppender.Append(s) } type honorLabelsAppender struct { - app storage.SampleAppender + storage.SampleAppender labels model.LabelSet } @@ -525,13 +498,13 @@ func (app honorLabelsAppender) Append(s *model.Sample) { } } - app.app.Append(s) + app.SampleAppender.Append(s) } // Applies a set of relabel configurations to the sample's metric // before actually appending it. type relabelAppender struct { - app storage.SampleAppender + storage.SampleAppender relabelings []*config.RelabelConfig } @@ -547,7 +520,7 @@ func (app relabelAppender) Append(s *model.Sample) { } s.Metric = model.Metric(labels) - app.app.Append(s) + app.SampleAppender.Append(s) } // URL returns a copy of the target's URL. diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 51cb331f23..ed36d2450b 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -139,12 +139,12 @@ func TestTargetScrapeUpdatesState(t *testing.T) { } } -func TestTargetScrapeWithFullChannel(t *testing.T) { +func TestTargetScrapeWithThrottledStorage(t *testing.T) { server := httptest.NewServer( http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - for i := 0; i < 2*ingestedSamplesCap; i++ { + for i := 0; i < 10; i++ { w.Write([]byte( fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i), )) @@ -155,15 +155,21 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { defer server.Close() testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"}) - // Affects full channel but not HTTP fetch - testTarget.deadline = 0 - testTarget.scrape(slowAppender{}) + go testTarget.RunScraper(&collectResultAppender{throttled: true}) + + // Enough time for a scrape to happen. 
+ time.Sleep(20 * time.Millisecond) + + testTarget.StopScraper() + // Wait for it to take effect. + time.Sleep(20 * time.Millisecond) + if testTarget.status.Health() != HealthBad { t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) } - if testTarget.status.LastError() != errIngestChannelFull { - t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError()) + if testTarget.status.LastError() != errSkippedScrape { + t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError()) } } @@ -450,7 +456,6 @@ func newTestTarget(targetURL string, deadline time.Duration, baseLabels model.La Host: strings.TrimLeft(targetURL, "http://"), Path: "/metrics", }, - deadline: deadline, status: &TargetStatus{}, scrapeInterval: 1 * time.Millisecond, httpClient: c, diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 8a453e70a3..0da126f8e1 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -165,6 +165,7 @@ func (tm *TargetManager) Run() { }) tm.running = true + log.Info("Target manager started.") } // handleUpdates receives target group updates and handles them in the diff --git a/rules/manager.go b/rules/manager.go index 2fea605a92..e9e7096e7b 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -66,9 +66,19 @@ var ( iterationDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: namespace, Name: "evaluator_duration_seconds", - Help: "The duration for all evaluations to execute.", + Help: "The duration of rule group evaluations.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }) + iterationsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "evaluator_iterations_skipped_total", + Help: "The total number of rule group evaluations skipped due to throttled metric storage.", + }) + iterationsScheduled = 
prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "evaluator_iterations_total", + Help: "The total number of scheduled rule group evaluations, whether skipped or executed.", + }) ) func init() { @@ -78,6 +88,7 @@ func init() { evalFailures.WithLabelValues(string(ruleTypeRecording)) prometheus.MustRegister(iterationDuration) + prometheus.MustRegister(iterationsSkipped) prometheus.MustRegister(evalFailures) prometheus.MustRegister(evalDuration) } @@ -133,6 +144,11 @@ func (g *Group) run() { } iter := func() { + iterationsScheduled.Inc() + if g.opts.SampleAppender.NeedsThrottling() { + iterationsSkipped.Inc() + return + } start := time.Now() g.eval() diff --git a/storage/local/interface.go b/storage/local/interface.go index 6c7df5ca2e..e260cc7627 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -34,6 +34,9 @@ type Storage interface { // from the provided Sample as those labels are considered equivalent to // a label not present at all. Append(*model.Sample) + // NeedsThrottling returns true if the Storage has too many chunks in memory + // already or has too many chunks waiting for persistence. + NeedsThrottling() bool // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader diff --git a/storage/local/storage.go b/storage/local/storage.go index 3d5aeed562..29700163a2 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -47,9 +47,9 @@ const ( persintenceUrgencyScoreForLeavingRushedMode = 0.7 // This factor times -storage.local.memory-chunks is the number of - // memory chunks we tolerate before suspending ingestion (TODO!). It is - // also a basis for calculating the persistenceUrgencyScore. - toleranceFactorForMemChunks = 1.1 + // memory chunks we tolerate before throttling the storage. It is also a + // basis for calculating the persistenceUrgencyScore. 
+ toleranceFactorMemChunks = 1.1 // This factor times -storage.local.max-chunks-to-persist is the minimum // required number of chunks waiting for persistence before the number // of chunks in memory may influence the persistenceUrgencyScore. (In @@ -121,9 +121,10 @@ type syncStrategy func() bool type memorySeriesStorage struct { // numChunksToPersist has to be aligned for atomic operations. - numChunksToPersist int64 // The number of chunks waiting for persistence. - maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall. - rushed bool // Whether the storage is in rushed mode. + numChunksToPersist int64 // The number of chunks waiting for persistence. + maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. + rushed bool // Whether the storage is in rushed mode. + throttled chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging). fpLocker *fingerprintLocker fpToSeries *seriesMap @@ -180,6 +181,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { loopStopping: make(chan struct{}), loopStopped: make(chan struct{}), + throttled: make(chan struct{}, 1), maxMemoryChunks: o.MemoryChunks, dropAfter: o.PersistenceRetentionPeriod, checkpointInterval: o.CheckpointInterval, @@ -306,6 +308,7 @@ func (s *memorySeriesStorage) Start() (err error) { } go s.handleEvictList() + go s.logThrottling() go s.loop() return nil @@ -571,16 +574,6 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { delete(sample.Metric, ln) } } - if s.getNumChunksToPersist() >= s.maxChunksToPersist { - log.Warnf( - "%d chunks waiting for persistence, sample ingestion suspended.", - s.getNumChunksToPersist(), - ) - for s.getNumChunksToPersist() >= s.maxChunksToPersist { - time.Sleep(time.Second) - } - log.Warn("Sample ingestion resumed.") - } rawFP := sample.Metric.FastFingerprint() s.fpLocker.Lock(rawFP) fp, err := s.mapper.mapFP(rawFP, sample.Metric) @@ 
-616,6 +609,57 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { s.incNumChunksToPersist(completedChunksCount) } +// NeedsThrottling implements Storage. +func (s *memorySeriesStorage) NeedsThrottling() bool { + if s.getNumChunksToPersist() > s.maxChunksToPersist || + float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks { + select { + case s.throttled <- struct{}{}: + default: // Do nothing, signal already pending. + } + return true + } + return false +} + +// logThrottling handles logging of throttled events and has to be started as a +// goroutine. It stops once s.loopStopping is closed. +// +// Logging strategy: Whenever NeedsThrottling() is called and returns true, a signal +// is sent to s.throttled. If that happens for the first time, an Error is +// logged that the storage is now throttled. As long as signals continue to be +// sent via s.throttled at least once per minute, nothing else is logged. Once +// no signal has arrived for a minute, an Info is logged that the storage is not +// throttled anymore. This resets things to the initial state, i.e. once a +// signal arrives again, the Error will be logged again. +func (s *memorySeriesStorage) logThrottling() { + timer := time.NewTimer(time.Minute) + timer.Stop() + + for { + select { + case <-s.throttled: + if !timer.Reset(time.Minute) { + log. + With("chunksToPersist", s.getNumChunksToPersist()). + With("maxChunksToPersist", s.maxChunksToPersist). + With("memoryChunks", atomic.LoadInt64(&numMemChunks)). + With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)). + Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.") + } + case <-timer.C: + log. + With("chunksToPersist", s.getNumChunksToPersist()). + With("maxChunksToPersist", s.maxChunksToPersist). + With("memoryChunks", atomic.LoadInt64(&numMemChunks)). + With("maxToleratedMemChunks", int(float64(s.maxMemoryChunks)*toleranceFactorMemChunks)).
+ Info("Storage does not need throttling anymore.") + case <-s.loopStopping: + return + } + } +} + func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries { series, ok := s.fpToSeries.get(fp) if !ok { @@ -1210,7 +1254,7 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { if chunksToPersist > maxChunksToPersist*factorMinChunksToPersist { score = math.Max( score, - (memChunks/maxMemChunks-1)/(toleranceFactorForMemChunks-1), + (memChunks/maxMemChunks-1)/(toleranceFactorMemChunks-1), ) } if score > 1 { @@ -1230,11 +1274,11 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { s.rushedMode.Set(0) log. With("urgencyScore", score). - With("chunksToPersist", chunksToPersist). - With("maxChunksToPersist", maxChunksToPersist). - With("memoryChunks", memChunks). - With("maxMemoryChunks", maxMemChunks). - Warn("Storage has left rushed mode.") + With("chunksToPersist", int(chunksToPersist)). + With("maxChunksToPersist", int(maxChunksToPersist)). + With("memoryChunks", int(memChunks)). + With("maxMemoryChunks", int(maxMemChunks)). + Info("Storage has left rushed mode.") return score } if score > persintenceUrgencyScoreForEnteringRushedMode { @@ -1243,10 +1287,10 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { s.rushedMode.Set(1) log. With("urgencyScore", score). - With("chunksToPersist", chunksToPersist). - With("maxChunksToPersist", maxChunksToPersist). - With("memoryChunks", memChunks). - With("maxMemoryChunks", maxMemChunks). + With("chunksToPersist", int(chunksToPersist)). + With("maxChunksToPersist", int(maxChunksToPersist)). + With("memoryChunks", int(memChunks)). + With("maxMemoryChunks", int(maxMemChunks)). 
Warn("Storage has entered rushed mode.") return 1 } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 768f2f3d51..ae3528c7de 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -132,8 +132,7 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue } // Append queues a sample to be sent to the remote storage. It drops the -// sample on the floor if the queue is full. It implements -// storage.SampleAppender. +// sample on the floor if the queue is full. func (t *StorageQueueManager) Append(s *model.Sample) { select { case t.queue <- s: diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 91d057a638..d295f8f37c 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -124,6 +124,13 @@ func (s *Storage) Append(smpl *model.Sample) { } } +// NeedsThrottling implements storage.SampleAppender. It will always return +// false as a remote storage drops samples on the floor if backlogging instead +// of asking for throttling. +func (s *Storage) NeedsThrottling() bool { + return false +} + // Describe implements prometheus.Collector. func (s *Storage) Describe(ch chan<- *prometheus.Desc) { for _, q := range s.queues { diff --git a/storage/storage.go b/storage/storage.go index 9f509e2dc6..71b6bcfa86 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -18,9 +18,32 @@ import ( ) // SampleAppender is the interface to append samples to both, local and remote -// storage. +// storage. All methods are goroutine-safe. type SampleAppender interface { + // Append appends a sample to the underlying storage. Depending on the + // storage implementation, there are different guarantees for the fate + // of the sample after Append has returned. Remote storage + // implementation will simply drop samples if they cannot keep up with + // sending samples. Local storage implementations will only drop metrics + // upon unrecoverable errors. 
Reporting any errors is done via metrics + // and logs and not the concern of the caller. Append(*model.Sample) + // NeedsThrottling returns true if the underlying storage wishes to not + // receive any more samples. Append will still work but might lead to + // undue resource usage. It is recommended to call NeedsThrottling once + // before an upcoming batch of Append calls (e.g. a full scrape of a + // target or the evaluation of a rule group) and only proceed with the + // batch if NeedsThrottling returns false. In that way, the result of a + // scrape or of an evaluation of a rule group will always be appended + // completely or not at all, and the work of scraping or evaluation will + // not be performed in vain. Also, a call of NeedsThrottling is + // potentially expensive, so limiting the number of calls is reasonable. + // + // Only SampleAppenders for which it is considered critical to receive + // each and every sample should ever return true. SampleAppenders that + // tolerate not receiving all samples should always return false and + // instead drop samples as they see fit to avoid overload. + NeedsThrottling() bool } // Fanout is a SampleAppender that appends every sample to each SampleAppender @@ -35,3 +58,14 @@ func (f Fanout) Append(s *model.Sample) { a.Append(s) } } + +// NeedsThrottling returns true if at least one of the SampleAppenders in the +// Fanout slice is throttled. 
+func (f Fanout) NeedsThrottling() bool { + for _, a := range f { + if a.NeedsThrottling() { + return true + } + } + return false +} From 59f1e722df7242f665cdb28d337c8874b1e3f80e Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 2 Feb 2016 14:01:44 +0100 Subject: [PATCH 04/10] Return error on sample appending --- retrieval/target.go | 36 +++++++++++++++++++++++---------- storage/local/interface.go | 2 +- storage/local/storage.go | 10 ++++++--- storage/remote/queue_manager.go | 4 +++- storage/remote/remote.go | 5 +++-- storage/storage.go | 12 ++++++++--- 6 files changed, 48 insertions(+), 21 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index 133009f4ce..f45287dd25 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/httputil" ) @@ -449,15 +450,29 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { }, } - var samples model.Vector + var ( + samples model.Vector + numOutOfOrder int + ) for { if err = sdec.Decode(&samples); err != nil { break } for _, s := range samples { - appender.Append(s) + err := appender.Append(s) + if err != nil { + if err == local.ErrOutOfOrderSample { + numOutOfOrder++ + } else { + log.Warnf("Error inserting sample %v: %s", s, err) + } + } + } } + if numOutOfOrder > 0 { + log.Warnf("Error on ingesting %d out-of-order samples") + } if err == io.EOF { return nil @@ -472,7 +487,7 @@ type ruleLabelsAppender struct { labels model.LabelSet } -func (app ruleLabelsAppender) Append(s *model.Sample) { +func (app ruleLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if v, ok := s.Metric[ln]; ok && v != "" { s.Metric[model.ExportedLabelPrefix+ln] = v @@ -480,7 +495,7 @@ func (app ruleLabelsAppender) Append(s *model.Sample) { s.Metric[ln] = lv } - 
app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } type honorLabelsAppender struct { @@ -491,14 +506,14 @@ type honorLabelsAppender struct { // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Append(s *model.Sample) { +func (app honorLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if _, ok := s.Metric[ln]; !ok { s.Metric[ln] = lv } } - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // Applies a set of relabel configurations to the sample's metric @@ -508,19 +523,18 @@ type relabelAppender struct { relabelings []*config.RelabelConfig } -func (app relabelAppender) Append(s *model.Sample) { +func (app relabelAppender) Append(s *model.Sample) error { labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...) if err != nil { - log.Errorf("Error while relabeling metric %s: %s", s.Metric, err) - return + return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err) } // Check if the timeseries was dropped. if labels == nil { - return + return nil } s.Metric = model.Metric(labels) - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // URL returns a copy of the target's URL. diff --git a/storage/local/interface.go b/storage/local/interface.go index e260cc7627..454c2d9d5f 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -33,7 +33,7 @@ type Storage interface { // processing.) The implementation might remove labels with empty value // from the provided Sample as those labels are considered equivalent to // a label not present at all. - Append(*model.Sample) + Append(*model.Sample) error // NeedsThrottling returns true if the Storage has too many chunks in memory // already or has too many chunks waiting for persistence. 
NeedsThrottling() bool diff --git a/storage/local/storage.go b/storage/local/storage.go index 29700163a2..e7a4c292a1 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -567,8 +567,10 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } +var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") + // Append implements Storage. -func (s *memorySeriesStorage) Append(sample *model.Sample) { +func (s *memorySeriesStorage) Append(sample *model.Sample) error { for ln, lv := range sample.Metric { if len(lv) == 0 { delete(sample.Metric, ln) @@ -594,11 +596,11 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { // It would be even better to also compare the sample values here, but // we don't have efficient access to a series's last value. if sample.Timestamp != series.lastTime { - log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime) s.outOfOrderSamplesCount.Inc() + return ErrOutOfOrderSample } s.fpLocker.Unlock(fp) - return + return nil } completedChunksCount := series.add(&model.SamplePair{ Value: sample.Value, @@ -607,6 +609,8 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { s.fpLocker.Unlock(fp) s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) + + return nil } // NeedsThrottling implements Storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index ae3528c7de..4ed739707f 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -133,13 +133,15 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue // Append queues a sample to be sent to the remote storage. It drops the // sample on the floor if the queue is full. -func (t *StorageQueueManager) Append(s *model.Sample) { +// Always returns nil. 
+func (t *StorageQueueManager) Append(s *model.Sample) error { select { case t.queue <- s: default: t.samplesCount.WithLabelValues(dropped).Inc() log.Warn("Remote storage queue full, discarding sample.") } + return nil } // Stop stops sending samples to the remote storage and waits for pending diff --git a/storage/remote/remote.go b/storage/remote/remote.go index d295f8f37c..6c0ddba9de 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -104,8 +104,8 @@ func (s *Storage) Stop() { } } -// Append implements storage.SampleAppender. -func (s *Storage) Append(smpl *model.Sample) { +// Append implements storage.SampleAppender. Always returns nil. +func (s *Storage) Append(smpl *model.Sample) error { s.mtx.RLock() var snew model.Sample @@ -122,6 +122,7 @@ func (s *Storage) Append(smpl *model.Sample) { for _, q := range s.queues { q.Append(&snew) } + return nil } // NeedsThrottling implements storage.SampleAppender. It will always return diff --git a/storage/storage.go b/storage/storage.go index 71b6bcfa86..86730d6437 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -27,7 +27,7 @@ type SampleAppender interface { // sending samples. Local storage implementations will only drop metrics // upon unrecoverable errors. Reporting any errors is done via metrics // and logs and not the concern of the caller. - Append(*model.Sample) + Append(*model.Sample) error // NeedsThrottling returns true if the underlying storage wishes to not // receive any more samples. Append will still work but might lead to // undue resource usage. It is recommended to call NeedsThrottling once @@ -53,10 +53,16 @@ type Fanout []SampleAppender // Append implements SampleAppender. It appends the provided sample to all // SampleAppenders in the Fanout slice and waits for each append to complete // before proceeding with the next. -func (f Fanout) Append(s *model.Sample) { +// If any of the SampleAppenders returns an error, the first one is returned +// at the end. 
+func (f Fanout) Append(s *model.Sample) error { + var err error for _, a := range f { - a.Append(s) + if e := a.Append(s); e != nil && err == nil { + err = e + } } + return err } // NeedsThrottling returns true if at least one of the SampleAppenders in the From d0d2c38c68be373d6c6e3726083fb262175e152f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 3 Feb 2016 10:17:08 +0100 Subject: [PATCH 05/10] Fix tests for append API changes --- retrieval/helpers_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 1b74b2ea4a..d19f6b41f1 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -21,7 +21,8 @@ import ( type nopAppender struct{} -func (a nopAppender) Append(*model.Sample) { +func (a nopAppender) Append(*model.Sample) error { + return nil } func (a nopAppender) NeedsThrottling() bool { @@ -33,13 +34,14 @@ type collectResultAppender struct { throttled bool } -func (a *collectResultAppender) Append(s *model.Sample) { +func (a *collectResultAppender) Append(s *model.Sample) error { for ln, lv := range s.Metric { if len(lv) == 0 { delete(s.Metric, ln) } } a.result = append(a.result, s) + return nil } func (a *collectResultAppender) NeedsThrottling() bool { From 1f877f3d2abc7d87e7cbfadeb7ca8c24ce8dc029 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 3 Feb 2016 10:39:34 +0100 Subject: [PATCH 06/10] Fix deadlock, structure target logging --- retrieval/target.go | 5 +++-- storage/local/storage.go | 2 +- storage/storage.go | 3 +-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index f45287dd25..bf9adb0c69 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -453,6 +453,7 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { var ( samples model.Vector numOutOfOrder int + logger = log.With("target", t.InstanceIdentifier()) ) for { if err = sdec.Decode(&samples); err != 
nil { @@ -464,14 +465,14 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { if err == local.ErrOutOfOrderSample { numOutOfOrder++ } else { - log.Warnf("Error inserting sample %v: %s", s, err) + logger.With("sample", s).Warnf("Error inserting sample: %s", err) } } } } if numOutOfOrder > 0 { - log.Warnf("Error on ingesting %d out-of-order samples") + logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") } if err == io.EOF { diff --git a/storage/local/storage.go b/storage/local/storage.go index e7a4c292a1..62453d89d5 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -591,6 +591,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { series := s.getOrCreateSeries(fp, sample.Metric) if sample.Timestamp <= series.lastTime { + s.fpLocker.Unlock(fp) // Don't log and track equal timestamps, as they are a common occurrence // when using client-side timestamps (e.g. Pushgateway or federation). // It would be even better to also compare the sample values here, but @@ -599,7 +600,6 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { s.outOfOrderSamplesCount.Inc() return ErrOutOfOrderSample } - s.fpLocker.Unlock(fp) return nil } completedChunksCount := series.add(&model.SamplePair{ diff --git a/storage/storage.go b/storage/storage.go index 86730d6437..5acae673e2 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -25,8 +25,7 @@ type SampleAppender interface { // of the sample after Append has returned. Remote storage // implementation will simply drop samples if they cannot keep up with // sending samples. Local storage implementations will only drop metrics - // upon unrecoverable errors. Reporting any errors is done via metrics - // and logs and not the concern of the caller. + // upon unrecoverable errors. Append(*model.Sample) error // NeedsThrottling returns true if the underlying storage wishes to not // receive any more samples. 
Append will still work but might lead to From 0b02315517d929b3da00606c005e6337b7e375eb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 4 Feb 2016 11:56:14 +0100 Subject: [PATCH 07/10] Sanitize POST URL for AM integration --- notification/notification.go | 3 ++- notification/notification_test.go | 37 +++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/notification/notification.go b/notification/notification.go index 3279bd2c52..7bf63019ad 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "net/http" + "strings" "sync" "time" @@ -239,7 +240,7 @@ func (n *Handler) setMore() { } func (n *Handler) postURL() string { - return n.opts.AlertmanagerURL + alertPushEndpoint + return strings.TrimRight(n.opts.AlertmanagerURL, "/") + alertPushEndpoint } func (n *Handler) send(alerts ...*model.Alert) error { diff --git a/notification/notification_test.go b/notification/notification_test.go index 29d057b65d..7c6de0a7ad 100644 --- a/notification/notification_test.go +++ b/notification/notification_test.go @@ -25,6 +25,43 @@ import ( "github.com/prometheus/common/model" ) +func TestHandlerPostURL(t *testing.T) { + var cases = []struct { + in, out string + }{ + { + in: "http://localhost:9093", + out: "http://localhost:9093/api/v1/alerts", + }, + { + in: "http://localhost:9093/", + out: "http://localhost:9093/api/v1/alerts", + }, + { + in: "http://localhost:9093/prefix", + out: "http://localhost:9093/prefix/api/v1/alerts", + }, + { + in: "http://localhost:9093/prefix//", + out: "http://localhost:9093/prefix/api/v1/alerts", + }, + { + in: "http://localhost:9093/prefix//", + out: "http://localhost:9093/prefix/api/v1/alerts", + }, + } + h := &Handler{ + opts: &HandlerOptions{}, + } + + for _, c := range cases { + h.opts.AlertmanagerURL = c.in + if res := h.postURL(); res != c.out { + t.Errorf("Expected post URL %q for %q but got %q", c.out, c.in, res) + } + } +} + 
func TestHandlerNextBatch(t *testing.T) { h := New(&HandlerOptions{}) From f1f8317fa5a98410bb9c1705f91b47c4154679d0 Mon Sep 17 00:00:00 2001 From: Tobias Schmidt Date: Thu, 4 Feb 2016 23:42:55 -0500 Subject: [PATCH 08/10] Fix detection of flapping alerts Alerts in the resolve retention period must be transitioned to the active state again when their condition is met. --- rules/alerting.go | 4 ++-- rules/manager_test.go | 33 +++++++++++++++++++++------------ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/rules/alerting.go b/rules/alerting.go index a2a389d315..fdd3d00a88 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -39,7 +39,7 @@ const ( type AlertState int const ( - // StateInactive is the state of an alert that is either firing nor pending. + // StateInactive is the state of an alert that is neither firing nor pending. StateInactive AlertState = iota // StatePending is the state of an alert that has been active for less than // the configured threshold duration. 
@@ -159,7 +159,7 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine) (model.Vector, fp := smpl.Metric.Fingerprint() resultFPs[fp] = struct{}{} - if alert, ok := r.active[fp]; ok { + if alert, ok := r.active[fp]; ok && alert.State != StateInactive { alert.Value = smpl.Value continue } diff --git a/rules/manager_test.go b/rules/manager_test.go index 463388e154..40e57203a3 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -27,14 +27,8 @@ import ( func TestAlertingRule(t *testing.T) { suite, err := promql.NewTest(t, ` load 5m - http_requests{job="api-server", instance="0", group="production"} 0+10x10 - http_requests{job="api-server", instance="1", group="production"} 0+20x10 - http_requests{job="api-server", instance="0", group="canary"} 0+30x10 - http_requests{job="api-server", instance="1", group="canary"} 0+40x10 - http_requests{job="app-server", instance="0", group="production"} 0+50x10 - http_requests{job="app-server", instance="1", group="production"} 0+60x10 - http_requests{job="app-server", instance="0", group="canary"} 0+70x10 - http_requests{job="app-server", instance="1", group="canary"} 0+80x10 + http_requests{job="app-server", instance="0", group="canary"} 75 85 95 105 105 95 85 + http_requests{job="app-server", instance="1", group="canary"} 80 90 100 110 120 130 140 `) if err != nil { t.Fatal(err) @@ -79,17 +73,32 @@ func TestAlertingRule(t *testing.T) { }, { time: 10 * time.Minute, result: []string{ + `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="1", job="app-server", severity="critical"} => 0 @[%v]`, + }, + }, + { + time: 15 * time.Minute, + result: []string{ `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 0 @[%v]`, }, }, { - time: 15 * time.Minute, - 
result: nil, + time: 20 * time.Minute, + result: []string{}, }, { - time: 20 * time.Minute, - result: nil, + time: 25 * time.Minute, + result: []string{ + `ALERTS{alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, + }, + }, + { + time: 30 * time.Minute, + result: []string{ + `ALERTS{alertname="HTTPRequestRateLow", alertstate="pending", group="canary", instance="0", job="app-server", severity="critical"} => 0 @[%v]`, + `ALERTS{alertname="HTTPRequestRateLow", alertstate="firing", group="canary", instance="0", job="app-server", severity="critical"} => 1 @[%v]`, + }, }, } From 211cb10f1343c72f796b0d17523d7f00b74d0f30 Mon Sep 17 00:00:00 2001 From: Tobias Schmidt Date: Tue, 2 Feb 2016 22:09:26 -0500 Subject: [PATCH 09/10] Use https://prometheus.io --- web/ui/templates/_base.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/ui/templates/_base.html b/web/ui/templates/_base.html index 61ddc04543..c9d7f4d06a 100644 --- a/web/ui/templates/_base.html +++ b/web/ui/templates/_base.html @@ -41,7 +41,7 @@
  • Graph
  • Status
  • - Help + Help
  • From 328e1d733ac78f9d9b81a11bf3bbc3d8e44bc283 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 5 Feb 2016 11:30:31 +0100 Subject: [PATCH 10/10] Change `make` to `make build` For building from source as an end user running the tests is not necessary. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8531a6b567..a7c3be6440 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ You can also clone the repository yourself and build using `make`: $ cd $GOPATH/src/github.com/prometheus $ git clone https://github.com/prometheus/prometheus.git $ cd prometheus - $ make + $ make build $ ./prometheus -config.file=your_config.yml The Makefile provides several targets: