diff --git a/main.go b/main.go index 2c27f2f533..9b6a52decd 100644 --- a/main.go +++ b/main.go @@ -24,13 +24,13 @@ import ( "github.com/golang/glog" - clientmodel "github.com/prometheus/client_golang/model" registry "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notification" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/rules/manager" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote/opentsdb" @@ -52,7 +52,7 @@ var ( remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.") remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.") - samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.") + samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 0, "Deprecated. Has no effect anymore.") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") @@ -67,23 +67,7 @@ var ( printVersion = flag.Bool("version", false, "Print version information.") ) -// Instrumentation. -var ( - samplesQueueCapDesc = registry.NewDesc( - "prometheus_samples_queue_capacity", - "Capacity of the queue for unwritten samples.", - nil, nil, - ) - samplesQueueLenDesc = registry.NewDesc( - "prometheus_samples_queue_length", - "Current number of items in the queue for unwritten samples. Each item comprises all samples exposed by one target as one metric family (i.e. metrics of the same name).", - nil, nil, - ) -) - type prometheus struct { - incomingSamples chan clientmodel.Samples - ruleManager manager.RuleManager targetManager retrieval.TargetManager notificationHandler *notification.NotificationHandler @@ -103,16 +87,6 @@ func NewPrometheus() *prometheus { glog.Fatalf("Error loading configuration from %s: %v", *configFile, err) } - incomingSamples := make(chan clientmodel.Samples, *samplesQueueCapacity) - - ingester := &retrieval.MergeLabelsIngester{ - Labels: conf.GlobalLabels(), - CollisionPrefix: clientmodel.ExporterLabelPrefix, - Ingester: retrieval.ChannelIngester(incomingSamples), - } - targetManager := retrieval.NewTargetManager(ingester) - targetManager.AddTargetsFromConfig(conf) - notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity) o := &local.MemorySeriesStorageOptions{ @@ -129,8 +103,25 @@ func NewPrometheus() *prometheus { glog.Fatal("Error opening memory series storage: ", err) } + var sampleAppender storage.SampleAppender + var remoteTSDBQueue *remote.TSDBQueueManager + if *remoteTSDBUrl == "" { + glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage") + sampleAppender = memStorage + } else { + openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout) + remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 100*1024) + sampleAppender = storage.Tee{ + Appender1: remoteTSDBQueue, + Appender2: memStorage, + } + } + + targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels()) + targetManager.AddTargetsFromConfig(conf) + ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{ - Results: incomingSamples, + SampleAppender: sampleAppender, NotificationHandler: notificationHandler, EvaluationInterval: conf.EvaluationInterval(), Storage: memStorage, @@ -140,14 +131,6 @@ func NewPrometheus() *prometheus { glog.Fatal("Error loading rule files: ", err) } - var remoteTSDBQueue *remote.TSDBQueueManager - if *remoteTSDBUrl == "" { - glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage") - } else { - openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout) - remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512) - } - flags := map[string]string{} flag.VisitAll(func(f *flag.Flag) { flags[f.Name] = f.Value.String() @@ -183,8 +166,6 @@ func NewPrometheus() *prometheus { } p := &prometheus{ - incomingSamples: incomingSamples, - ruleManager: ruleManager, targetManager: targetManager, notificationHandler: notificationHandler, @@ -193,7 +174,7 @@ func NewPrometheus() *prometheus { webService: webService, } - webService.QuitDelegate = p.Close + webService.QuitChan = make(chan struct{}) return p } @@ -206,7 +187,6 @@ func (p *prometheus) Serve() { } go p.ruleManager.Run() go p.notificationHandler.Run() - go p.interruptHandler() p.storage.Start() @@ -217,15 +197,18 @@ func (p *prometheus) Serve() { } }() - for samples := range p.incomingSamples { - p.storage.AppendSamples(samples) - if p.remoteTSDBQueue != nil { - p.remoteTSDBQueue.Queue(samples) - } + notifier := make(chan os.Signal) + signal.Notify(notifier, os.Interrupt, syscall.SIGTERM) + select { + case <-notifier: + glog.Warning("Received SIGTERM, exiting gracefully...") + case <-p.webService.QuitChan: + glog.Warning("Received termination request via web service, exiting gracefully...") } - // The following shut-down operations have to happen after - // incomingSamples is drained. So do not move them into close(). + p.targetManager.Stop() + p.ruleManager.Stop() + if err := p.storage.Stop(); err != nil { glog.Error("Error stopping local storage: ", err) } @@ -238,35 +221,8 @@ func (p *prometheus) Serve() { glog.Info("See you next time!") } -// Close cleanly shuts down the Prometheus server. -func (p *prometheus) Close() { - p.closeOnce.Do(p.close) -} - -func (p *prometheus) interruptHandler() { - notifier := make(chan os.Signal) - signal.Notify(notifier, os.Interrupt, syscall.SIGTERM) - <-notifier - - glog.Warning("Received SIGTERM, exiting gracefully...") - p.Close() -} - -func (p *prometheus) close() { - glog.Info("Shutdown has been requested; subsytems are closing:") - p.targetManager.Stop() - p.ruleManager.Stop() - - close(p.incomingSamples) - // Note: Before closing the remaining subsystems (storage, ...), we have - // to wait until p.incomingSamples is actually drained. Therefore, - // remaining shut-downs happen in Serve(). -} - // Describe implements registry.Collector. func (p *prometheus) Describe(ch chan<- *registry.Desc) { - ch <- samplesQueueCapDesc - ch <- samplesQueueLenDesc p.notificationHandler.Describe(ch) p.storage.Describe(ch) if p.remoteTSDBQueue != nil { @@ -276,16 +232,6 @@ func (p *prometheus) Describe(ch chan<- *registry.Desc) { // Collect implements registry.Collector. func (p *prometheus) Collect(ch chan<- registry.Metric) { - ch <- registry.MustNewConstMetric( - samplesQueueCapDesc, - registry.GaugeValue, - float64(cap(p.incomingSamples)), - ) - ch <- registry.MustNewConstMetric( - samplesQueueLenDesc, - registry.GaugeValue, - float64(len(p.incomingSamples)), - ) p.notificationHandler.Collect(ch) p.storage.Collect(ch) if p.remoteTSDBQueue != nil { diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index cfc7514b84..ce96aa2e88 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -14,11 +14,27 @@ package retrieval import ( + "time" + clientmodel "github.com/prometheus/client_golang/model" ) -type nopIngester struct{} +type nopAppender struct{} -func (i nopIngester) Ingest(clientmodel.Samples) error { - return nil +func (a nopAppender) Append(*clientmodel.Sample) { +} + +type slowAppender struct{} + +func (a slowAppender) Append(*clientmodel.Sample) { + time.Sleep(time.Millisecond) + return +} + +type collectResultAppender struct { + result clientmodel.Samples +} + +func (a *collectResultAppender) Append(s *clientmodel.Sample) { + a.result = append(a.result, s) } diff --git a/retrieval/ingester.go b/retrieval/ingester.go deleted file mode 100644 index 7bcda4da6f..0000000000 --- a/retrieval/ingester.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package retrieval - -import ( - "errors" - "time" - - "github.com/prometheus/client_golang/extraction" - - clientmodel "github.com/prometheus/client_golang/model" -) - -const ingestTimeout = 100 * time.Millisecond // TODO(beorn7): Adjust this to a fraction of the actual HTTP timeout. - -var errIngestChannelFull = errors.New("ingestion channel full") - -// MergeLabelsIngester merges a labelset ontop of a given extraction result and -// passes the result on to another ingester. Label collisions are avoided by -// appending a label prefix to any newly merged colliding labels. -type MergeLabelsIngester struct { - Labels clientmodel.LabelSet - CollisionPrefix clientmodel.LabelName - - Ingester extraction.Ingester -} - -// Ingest ingests the provided extraction result by merging in i.Labels and then -// handing it over to i.Ingester. -func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error { - for _, s := range samples { - s.Metric.MergeFromLabelSet(i.Labels, i.CollisionPrefix) - } - - return i.Ingester.Ingest(samples) -} - -// ChannelIngester feeds results into a channel without modifying them. -type ChannelIngester chan<- clientmodel.Samples - -// Ingest ingests the provided extraction result by sending it to its channel. -// If the channel was not able to receive the samples within the ingestTimeout, -// an error is returned. This is important to fail fast and to not pile up -// ingestion requests in case of overload. -func (i ChannelIngester) Ingest(s clientmodel.Samples) error { - // Since the regular case is that i is ready to receive, first try - // without setting a timeout so that we don't need to allocate a timer - // most of the time. - select { - case i <- s: - return nil - default: - select { - case i <- s: - return nil - case <-time.After(ingestTimeout): - return errIngestChannelFull - } - } -} diff --git a/retrieval/target.go b/retrieval/target.go index f734f5f078..3fc758a004 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -14,6 +14,7 @@ package retrieval import ( + "errors" "fmt" "math/rand" "net/http" @@ -29,6 +30,7 @@ import ( clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/utility" ) @@ -41,6 +43,8 @@ const ( // ScrapeTimeMetricName is the metric name for the synthetic scrape duration // variable. scrapeDurationMetricName clientmodel.LabelValue = "scrape_duration_seconds" + // Capacity of the channel to buffer samples during ingestion. + ingestedSamplesCap = 256 // Constants for instrumentation. namespace = "prometheus" @@ -48,6 +52,8 @@ const ( ) var ( + errIngestChannelFull = errors.New("ingestion channel full") + localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"} targetIntervalLength = prometheus.NewSummaryVec( @@ -100,6 +106,8 @@ const ( // For the future, the Target protocol will abstract away the exact means that // metrics are retrieved and deserialized from the given instance to which it // refers. +// +// Target implements extraction.Ingester. type Target interface { // Return the last encountered scrape error, if any. LastError() error @@ -125,9 +133,11 @@ type Target interface { // begins). SetBaseLabelsFrom(Target) // Scrape target at the specified interval. - RunScraper(extraction.Ingester, time.Duration) + RunScraper(storage.SampleAppender, time.Duration) // Stop scraping, synchronous. StopScraper() + // Ingest implements extraction.Ingester. + Ingest(clientmodel.Samples) error } // target is a Target that refers to a singular HTTP or HTTPS endpoint. @@ -144,10 +154,12 @@ type target struct { scraperStopped chan struct{} // Channel to queue base labels to be replaced. newBaseLabels chan clientmodel.LabelSet + // Channel to buffer ingested samples. + ingestedSamples chan clientmodel.Samples url string // What is the deadline for the HTTP or HTTPS against this endpoint. - Deadline time.Duration + deadline time.Duration // Any base labels that are added to this target and its metrics. baseLabels clientmodel.LabelSet // The HTTP client used to scrape the target's endpoint. @@ -163,52 +175,42 @@ type target struct { // NewTarget creates a reasonably configured target for querying. func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target { - target := &target{ + t := &target{ url: url, - Deadline: deadline, - baseLabels: baseLabels, + deadline: deadline, httpClient: utility.NewDeadlineClient(deadline), scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), newBaseLabels: make(chan clientmodel.LabelSet, 1), } - - return target + labels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())} + for baseLabel, baseValue := range baseLabels { + labels[baseLabel] = baseValue + } + t.baseLabels = labels + return t } -func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) { - healthMetric := clientmodel.Metric{} - durationMetric := clientmodel.Metric{} - for label, value := range t.baseLabels { - healthMetric[label] = value - durationMetric[label] = value +// Ingest implements Target and extraction.Ingester. +func (t *target) Ingest(s clientmodel.Samples) error { + // Since the regular case is that ingestedSamples is ready to receive, + // first try without setting a timeout so that we don't need to allocate + // a timer most of the time. + select { + case t.ingestedSamples <- s: + return nil + default: + select { + case t.ingestedSamples <- s: + return nil + case <-time.After(t.deadline / 10): + return errIngestChannelFull + } } - healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName) - durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName) - healthMetric[InstanceLabel] = clientmodel.LabelValue(t.InstanceIdentifier()) - durationMetric[InstanceLabel] = clientmodel.LabelValue(t.InstanceIdentifier()) - - healthValue := clientmodel.SampleValue(0) - if healthy { - healthValue = clientmodel.SampleValue(1) - } - - healthSample := &clientmodel.Sample{ - Metric: healthMetric, - Timestamp: timestamp, - Value: healthValue, - } - durationSample := &clientmodel.Sample{ - Metric: durationMetric, - Timestamp: timestamp, - Value: clientmodel.SampleValue(float64(scrapeDuration) / float64(time.Second)), - } - - ingester.Ingest(clientmodel.Samples{healthSample, durationSample}) } // RunScraper implements Target. -func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) { +func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) { defer func() { // Need to drain t.newBaseLabels to not make senders block during shutdown. for { @@ -237,7 +239,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration t.Lock() // Writing t.lastScrape requires the lock. t.lastScrape = time.Now() t.Unlock() - t.scrape(ingester) + t.scrape(sampleAppender) // Explanation of the contraption below: // @@ -271,7 +273,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration targetIntervalLength.WithLabelValues(interval.String()).Observe( float64(took) / float64(time.Second), // Sub-second precision. ) - t.scrape(ingester) + t.scrape(sampleAppender) } } } @@ -285,7 +287,7 @@ func (t *target) StopScraper() { const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` -func (t *target) scrape(ingester extraction.Ingester) (err error) { +func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { timestamp := clientmodel.Now() defer func(start time.Time) { t.Lock() // Writing t.state and t.lastError requires the lock. @@ -296,7 +298,7 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { } t.lastError = err t.Unlock() - t.recordScrapeHealth(ingester, timestamp, err == nil, time.Since(start)) + t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start)) }(time.Now()) req, err := http.NewRequest("GET", t.URL(), nil) @@ -319,21 +321,23 @@ func (t *target) scrape(ingester extraction.Ingester) (err error) { return err } - baseLabels := clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())} - for baseLabel, baseValue := range t.baseLabels { - baseLabels[baseLabel] = baseValue - } + t.ingestedSamples = make(chan clientmodel.Samples, ingestedSamplesCap) - i := &MergeLabelsIngester{ - Labels: baseLabels, - CollisionPrefix: clientmodel.ExporterLabelPrefix, - - Ingester: ingester, - } processOptions := &extraction.ProcessOptions{ Timestamp: timestamp, } - return processor.ProcessSingle(resp.Body, i, processOptions) + go func() { + err = processor.ProcessSingle(resp.Body, t, processOptions) + close(t.ingestedSamples) + }() + + for samples := range t.ingestedSamples { + for _, s := range samples { + s.Metric.MergeFromLabelSet(t.baseLabels, clientmodel.ExporterLabelPrefix) + sampleAppender.Append(s) + } + } + return err } // LastError implements Target. @@ -412,3 +416,33 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) { } t.newBaseLabels <- newTarget.BaseLabels() } + +func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) { + healthMetric := clientmodel.Metric{} + durationMetric := clientmodel.Metric{} + for label, value := range t.baseLabels { + healthMetric[label] = value + durationMetric[label] = value + } + healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName) + durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName) + + healthValue := clientmodel.SampleValue(0) + if healthy { + healthValue = clientmodel.SampleValue(1) + } + + healthSample := &clientmodel.Sample{ + Metric: healthMetric, + Timestamp: timestamp, + Value: healthValue, + } + durationSample := &clientmodel.Sample{ + Metric: durationMetric, + Timestamp: timestamp, + Value: clientmodel.SampleValue(float64(scrapeDuration) / float64(time.Second)), + } + + sampleAppender.Append(healthSample) + sampleAppender.Append(durationSample) +} diff --git a/retrieval/target_provider.go b/retrieval/target_provider.go index 0b3fbebf01..43580e4456 100644 --- a/retrieval/target_provider.go +++ b/retrieval/target_provider.go @@ -59,22 +59,23 @@ type TargetProvider interface { } type sdTargetProvider struct { - job config.JobConfig - - targets []Target + job config.JobConfig + globalLabels clientmodel.LabelSet + targets []Target lastRefresh time.Time refreshInterval time.Duration } // NewSdTargetProvider constructs a new sdTargetProvider for a job. -func NewSdTargetProvider(job config.JobConfig) *sdTargetProvider { +func NewSdTargetProvider(job config.JobConfig, globalLabels clientmodel.LabelSet) *sdTargetProvider { i, err := utility.StringToDuration(job.GetSdRefreshInterval()) if err != nil { panic(fmt.Sprintf("illegal refresh duration string %s: %s", job.GetSdRefreshInterval(), err)) } return &sdTargetProvider{ job: job, + globalLabels: globalLabels, refreshInterval: i, } } @@ -101,6 +102,9 @@ func (p *sdTargetProvider) Targets() ([]Target, error) { baseLabels := clientmodel.LabelSet{ clientmodel.JobLabel: clientmodel.LabelValue(p.job.GetName()), } + for n, v := range p.globalLabels { + baseLabels[n] = v + } targets := make([]Target, 0, len(response.Answer)) endpoint := &url.URL{ diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 3731a56cf3..5b96b4cc30 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -15,6 +15,7 @@ package retrieval import ( "errors" + "fmt" "net/http" "net/http/httptest" "testing" @@ -25,15 +26,6 @@ import ( "github.com/prometheus/prometheus/utility" ) -type collectResultIngester struct { - result clientmodel.Samples -} - -func (i *collectResultIngester) Ingest(s clientmodel.Samples) error { - i.result = s - return nil -} - func TestTargetHidesURLAuth(t *testing.T) { testVectors := []string{"http://secret:data@host.com/query?args#fragment", "https://example.net/foo", "http://foo.com:31337/bar"} testResults := []string{"host.com:80", "example.net:443", "foo.com:31337"} @@ -60,7 +52,7 @@ func TestTargetScrapeUpdatesState(t *testing.T) { url: "bad schema", httpClient: utility.NewDeadlineClient(0), } - testTarget.scrape(nopIngester{}) + testTarget.scrape(nopAppender{}) if testTarget.state != Unhealthy { t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state) } @@ -71,7 +63,11 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n")) + for i := 0; i < 2*ingestedSamplesCap; i++ { + w.Write([]byte( + fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i), + )) + } }, ), ) @@ -79,11 +75,11 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { testTarget := NewTarget( server.URL, - 100*time.Millisecond, + 10*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}, ).(*target) - testTarget.scrape(ChannelIngester(make(chan clientmodel.Samples))) // Capacity 0. + testTarget.scrape(slowAppender{}) if testTarget.state != Unhealthy { t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.state) } @@ -93,17 +89,15 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { } func TestTargetRecordScrapeHealth(t *testing.T) { - testTarget := target{ - url: "http://example.url", - baseLabels: clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}, - httpClient: utility.NewDeadlineClient(0), - } + testTarget := NewTarget( + "http://example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}, + ).(*target) now := clientmodel.Now() - ingester := &collectResultIngester{} - testTarget.recordScrapeHealth(ingester, now, true, 2*time.Second) + appender := &collectResultAppender{} + testTarget.recordScrapeHealth(appender, now, true, 2*time.Second) - result := ingester.result + result := appender.result if len(result) != 2 { t.Fatalf("Expected two samples, got %d", len(result)) @@ -154,11 +148,11 @@ func TestTargetScrapeTimeout(t *testing.T) { defer server.Close() testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) - ingester := nopIngester{} + appender := nopAppender{} // scrape once without timeout signal <- true - if err := testTarget.(*target).scrape(ingester); err != nil { + if err := testTarget.(*target).scrape(appender); err != nil { t.Fatal(err) } @@ -167,12 +161,12 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again signal <- true - if err := testTarget.(*target).scrape(ingester); err != nil { + if err := testTarget.(*target).scrape(appender); err != nil { t.Fatal(err) } // now timeout - if err := testTarget.(*target).scrape(ingester); err == nil { + if err := testTarget.(*target).scrape(appender); err == nil { t.Fatal("expected scrape to timeout") } else { signal <- true // let handler continue @@ -180,7 +174,7 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again without timeout signal <- true - if err := testTarget.(*target).scrape(ingester); err != nil { + if err := testTarget.(*target).scrape(appender); err != nil { t.Fatal(err) } } @@ -196,10 +190,10 @@ func TestTargetScrape404(t *testing.T) { defer server.Close() testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) - ingester := nopIngester{} + appender := nopAppender{} want := errors.New("server returned HTTP status 404 Not Found") - got := testTarget.(*target).scrape(ingester) + got := testTarget.(*target).scrape(appender) if got == nil || want.Error() != got.Error() { t.Fatalf("want err %q, got %q", want, got) } @@ -213,7 +207,7 @@ func TestTargetRunScraperScrapes(t *testing.T) { scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), } - go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond)) + go testTarget.RunScraper(nopAppender{}, time.Duration(time.Millisecond)) // Enough time for a scrape to happen. time.Sleep(2 * time.Millisecond) @@ -248,11 +242,11 @@ func BenchmarkScrape(b *testing.B) { 100*time.Millisecond, clientmodel.LabelSet{"dings": "bums"}, ) - ingester := nopIngester{} + appender := nopAppender{} b.ResetTimer() for i := 0; i < b.N; i++ { - if err := testTarget.(*target).scrape(ingester); err != nil { + if err := testTarget.(*target).scrape(appender); err != nil { b.Fatal(err) } } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 5df7dd686c..32f64947ac 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -17,11 +17,11 @@ import ( "sync" "github.com/golang/glog" - "github.com/prometheus/client_golang/extraction" clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" ) // TargetManager manages all scrape targets. All methods are goroutine-safe. @@ -35,16 +35,18 @@ type TargetManager interface { } type targetManager struct { - sync.Mutex // Protects poolByJob. - poolsByJob map[string]*TargetPool - ingester extraction.Ingester + sync.Mutex // Protects poolByJob. + globalLabels clientmodel.LabelSet + sampleAppender storage.SampleAppender + poolsByJob map[string]*TargetPool } // NewTargetManager returns a newly initialized TargetManager ready to use. -func NewTargetManager(ingester extraction.Ingester) TargetManager { +func NewTargetManager(sampleAppender storage.SampleAppender, globalLabels clientmodel.LabelSet) TargetManager { return &targetManager{ - ingester: ingester, - poolsByJob: make(map[string]*TargetPool), + sampleAppender: sampleAppender, + globalLabels: globalLabels, + poolsByJob: make(map[string]*TargetPool), } } @@ -54,11 +56,11 @@ func (m *targetManager) targetPoolForJob(job config.JobConfig) *TargetPool { if !ok { var provider TargetProvider if job.SdName != nil { - provider = NewSdTargetProvider(job) + provider = NewSdTargetProvider(job, m.globalLabels) } interval := job.ScrapeInterval() - targetPool = NewTargetPool(m, provider, m.ingester, interval) + targetPool = NewTargetPool(provider, m.sampleAppender, interval) glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName()) m.poolsByJob[job.GetName()] = targetPool @@ -102,6 +104,9 @@ func (m *targetManager) AddTargetsFromConfig(config config.Config) { baseLabels := clientmodel.LabelSet{ clientmodel.JobLabel: clientmodel.LabelValue(job.GetName()), } + for n, v := range m.globalLabels { + baseLabels[n] = v + } if targetGroup.Labels != nil { for _, label := range targetGroup.Labels.Label { baseLabels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue()) diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 095f697ef0..683386f8ca 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -21,9 +21,8 @@ import ( clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/client_golang/extraction" - pb "github.com/prometheus/prometheus/config/generated" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/config" ) @@ -62,13 +61,13 @@ func (t fakeTarget) LastScrape() time.Time { return t.lastScrape } -func (t fakeTarget) scrape(i extraction.Ingester) error { +func (t fakeTarget) scrape(storage.SampleAppender) error { t.scrapeCount++ return nil } -func (t fakeTarget) RunScraper(ingester extraction.Ingester, interval time.Duration) { +func (t fakeTarget) RunScraper(storage.SampleAppender, time.Duration) { return } @@ -82,8 +81,10 @@ func (t fakeTarget) State() TargetState { func (t *fakeTarget) SetBaseLabelsFrom(newTarget Target) {} +func (t *fakeTarget) Ingest(clientmodel.Samples) error { return nil } + func testTargetManager(t testing.TB) { - targetManager := NewTargetManager(nopIngester{}) + targetManager := NewTargetManager(nopAppender{}, nil) testJob1 := config.JobConfig{ JobConfig: pb.JobConfig{ Name: proto.String("test_job1"), diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index d8aa2613e9..9b1b005b65 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -19,7 +19,8 @@ import ( "time" "github.com/golang/glog" - "github.com/prometheus/client_golang/extraction" + + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/utility" ) @@ -35,7 +36,7 @@ type TargetPool struct { manager TargetManager targetsByURL map[string]Target interval time.Duration - ingester extraction.Ingester + sampleAppender storage.SampleAppender addTargetQueue chan Target targetProvider TargetProvider @@ -44,11 +45,10 @@ type TargetPool struct { } // NewTargetPool creates a TargetPool, ready to be started by calling Run. -func NewTargetPool(m TargetManager, p TargetProvider, ing extraction.Ingester, i time.Duration) *TargetPool { +func NewTargetPool(p TargetProvider, app storage.SampleAppender, i time.Duration) *TargetPool { return &TargetPool{ - manager: m, interval: i, - ingester: ing, + sampleAppender: app, targetsByURL: make(map[string]Target), addTargetQueue: make(chan Target, targetAddQueueSize), targetProvider: p, @@ -100,7 +100,7 @@ func (p *TargetPool) addTarget(target Target) { defer p.Unlock() p.targetsByURL[target.URL()] = target - go target.RunScraper(p.ingester, p.interval) + go target.RunScraper(p.sampleAppender, p.interval) } // ReplaceTargets replaces the old targets by the provided new ones but reuses @@ -118,7 +118,7 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) { oldTarget.SetBaseLabelsFrom(newTarget) } else { p.targetsByURL[newTarget.URL()] = newTarget - go newTarget.RunScraper(p.ingester, p.interval) + go newTarget.RunScraper(p.sampleAppender, p.interval) } } diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 202b59944b..2c9d3738e9 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -80,7 +80,7 @@ func testTargetPool(t testing.TB) { } for i, scenario := range scenarios { - pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1)) + pool := NewTargetPool(nil, nopAppender{}, time.Duration(1)) for _, input := range scenario.inputs { target := target{ @@ -112,7 +112,7 @@ func TestTargetPool(t *testing.T) { } func TestTargetPoolReplaceTargets(t *testing.T) { - pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1)) + pool := NewTargetPool(nil, nopAppender{}, time.Duration(1)) oldTarget1 := &target{ url: "example1", state: Unhealthy, diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 0eeb3aae40..46165ee527 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -249,8 +249,10 @@ type ( } ) +// VectorMatchCardinality is an enum describing vector matches (1:1, n:1, 1:n, n:m). type VectorMatchCardinality int +// Constants for VectorMatchCardinality enum. const ( MatchOneToOne VectorMatchCardinality = iota MatchManyToOne @@ -880,7 +882,9 @@ func (node *VectorArithExpr) evalVectors(timestamp clientmodel.Timestamp, lhs, r Timestamp: timestamp, } result = append(result, ns) - added[hash] = added[hash] // Set existance to true. + if _, ok := added[hash]; !ok { + added[hash] = nil // Set existence to true. + } } } diff --git a/rules/helpers_test.go b/rules/helpers_test.go index 144e6a2b34..037fec86e6 100644 --- a/rules/helpers_test.go +++ b/rules/helpers_test.go @@ -63,7 +63,9 @@ func storeMatrix(storage local.Storage, matrix ast.Matrix) { }) } } - storage.AppendSamples(pendingSamples) + for _, s := range pendingSamples { + storage.Append(s) + } storage.WaitForIndexing() } diff --git a/rules/manager/manager.go b/rules/manager/manager.go index 34f6873cd8..8dfdc561a7 100644 --- a/rules/manager/manager.go +++ b/rules/manager/manager.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notification" "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/templates" ) @@ -94,7 +95,7 @@ type ruleManager struct { interval time.Duration storage local.Storage - results chan<- clientmodel.Samples + sampleAppender storage.SampleAppender notificationHandler *notification.NotificationHandler prometheusURL string @@ -106,7 +107,7 @@ type RuleManagerOptions struct { Storage local.Storage NotificationHandler *notification.NotificationHandler - Results chan<- clientmodel.Samples + SampleAppender storage.SampleAppender PrometheusURL string } @@ -120,7 +121,7 @@ func NewRuleManager(o *RuleManagerOptions) RuleManager { interval: o.EvaluationInterval, storage: o.Storage, - results: o.Results, + sampleAppender: o.SampleAppender, notificationHandler: o.NotificationHandler, prometheusURL: o.PrometheusURL, } @@ -145,7 +146,7 @@ func (m *ruleManager) Run() { select { case <-ticker.C: start := time.Now() - m.runIteration(m.results) + m.runIteration() iterationDuration.Observe(float64(time.Since(start) / time.Millisecond)) case <-m.done: return @@ -213,7 +214,7 @@ func (m *ruleManager) queueAlertNotifications(rule *rules.AlertingRule, timestam m.notificationHandler.SubmitReqs(notifications) } -func (m *ruleManager) runIteration(results chan<- clientmodel.Samples) { +func (m *ruleManager) runIteration() { now := clientmodel.Now() wg := sync.WaitGroup{} @@ -232,20 +233,10 @@ func (m *ruleManager) runIteration(results chan<- clientmodel.Samples) { vector, err := rule.Eval(now, m.storage) duration := time.Since(start) - samples := make(clientmodel.Samples, len(vector)) - for i, s := range vector { - samples[i] = &clientmodel.Sample{ - Metric: s.Metric.Metric, - Value: s.Value, - Timestamp: s.Timestamp, - } - } - if err != nil { evalFailures.Inc() glog.Warningf("Error while evaluating rule %q: %s", rule, err) - } else { - m.results <- samples + return } switch r := rule.(type) { @@ -261,9 +252,16 @@ func (m *ruleManager) runIteration(results chan<- clientmodel.Samples) { default: panic(fmt.Sprintf("Unknown rule type: %T", rule)) } + + for _, s := range vector { + m.sampleAppender.Append(&clientmodel.Sample{ + Metric: s.Metric.Metric, + Value: s.Value, + Timestamp: s.Timestamp, + }) + } }(rule) } - wg.Wait() } diff --git a/storage/local/interface.go b/storage/local/interface.go index bdfa5698d3..34ed30c7b5 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -24,16 +24,15 @@ import ( ) // Storage ingests and manages samples, along with various indexes. All methods -// except AppendSamples are goroutine-safe. +// are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { prometheus.Collector - // AppendSamples stores a group of new samples. Multiple samples for the - // same fingerprint need to be submitted in chronological order, from - // oldest to newest (both in the same call to AppendSamples and across - // multiple calls). When AppendSamples has returned, the appended - // samples might not be queryable immediately. (Use WaitForIndexing to - // wait for complete processing.) This method is not goroutine-safe. - AppendSamples(clientmodel.Samples) + // Append stores a sample in the Storage. Multiple samples for the same + // fingerprint need to be submitted in chronological order, from oldest + // to newest. When Append has returned, the appended sample might not be + // queryable immediately. (Use WaitForIndexing to wait for complete + // processing.) + Append(*clientmodel.Sample) // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader diff --git a/storage/local/storage.go b/storage/local/storage.go index db14aa8404..e8dbe97dce 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,7 +16,6 @@ package local import ( "container/list" - "sync" "sync/atomic" "time" @@ -37,9 +36,6 @@ const ( fpMaxSweepTime = 6 * time.Hour maxEvictInterval = time.Minute - - appendWorkers = 16 // Should be enough to not make appending samples a bottleneck. - appendQueueCap = 2 * appendWorkers ) var ( @@ -65,10 +61,6 @@ type memorySeriesStorage struct { checkpointInterval time.Duration checkpointDirtySeriesLimit int - appendQueue chan *clientmodel.Sample - appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. - appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed. - persistQueueLen int64 // The number of chunks that need persistence. persistQueueCap int // If persistQueueLen reaches this threshold, ingestion will stall. // Note that internally, the chunks to persist are not organized in a queue-like data structure, @@ -135,9 +127,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, - appendLastTimestamp: clientmodel.Earliest, - appendQueue: make(chan *clientmodel.Sample, appendQueueCap), - persistQueueLen: persistQueueLen, persistQueueCap: o.PersistenceQueueCapacity, persistence: p, @@ -187,15 +176,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { }), } - for i := 0; i < appendWorkers; i++ { - go func() { - for sample := range s.appendQueue { - s.appendSample(sample) - s.appendWaitGroup.Done() - } - }() - } - return s, nil } @@ -209,11 +189,6 @@ func (s *memorySeriesStorage) Start() { func (s *memorySeriesStorage) Stop() error { glog.Info("Stopping local storage...") - glog.Info("Draining append queue...") - close(s.appendQueue) - s.appendWaitGroup.Wait() - glog.Info("Append queue drained.") - glog.Info("Stopping maintenance loop...") close(s.loopStopping) <-s.loopStopped @@ -236,9 +211,6 @@ func (s *memorySeriesStorage) Stop() error { // WaitForIndexing implements Storage. func (s *memorySeriesStorage) WaitForIndexing() { - // First let all goroutines appending samples stop. - s.appendWaitGroup.Wait() - // Only then wait for the persistence to index them. s.persistence.waitForIndexing() } @@ -363,32 +335,18 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint } } -// AppendSamples implements Storage. -func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) { - for _, sample := range samples { - if s.getPersistQueueLen() >= s.persistQueueCap { - glog.Warningf("%d chunks waiting for persistence, sample ingestion suspended.", s.getPersistQueueLen()) - for s.getPersistQueueLen() >= s.persistQueueCap { - time.Sleep(time.Second) - } - glog.Warning("Sample ingestion resumed.") +// Append implements Storage. +func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) { + if s.getPersistQueueLen() >= s.persistQueueCap { + glog.Warningf( + "%d chunks waiting for persistence, sample ingestion suspended.", + s.getPersistQueueLen(), + ) + for s.getPersistQueueLen() >= s.persistQueueCap { + time.Sleep(time.Second) } - if sample.Timestamp != s.appendLastTimestamp { - // Timestamp has changed. We have to wait for processing - // of all appended samples before proceeding. Otherwise, - // we might violate the storage contract that each - // sample appended to a given series has to have a - // timestamp greater or equal to the previous sample - // appended to that series. - s.appendWaitGroup.Wait() - s.appendLastTimestamp = sample.Timestamp - } - s.appendWaitGroup.Add(1) - s.appendQueue <- sample + glog.Warning("Sample ingestion resumed.") } -} - -func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { fp := sample.Metric.Fingerprint() s.fpLocker.Lock(fp) series := s.getOrCreateSeries(fp, sample.Metric) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 21956519c1..8f155060d1 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -48,8 +48,9 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) { } fingerprints[i] = metric.Fingerprint() } - - storage.AppendSamples(samples) + for _, s := range samples { + storage.Append(s) + } storage.WaitForIndexing() newMatcher := func(matchType metric.MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *metric.LabelMatcher { @@ -166,7 +167,9 @@ func TestLoop(t *testing.T) { t.Fatalf("Error creating storage: %s", err) } storage.Start() - storage.AppendSamples(samples) + for _, s := range samples { + storage.Append(s) + } storage.WaitForIndexing() series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint()) cdsBefore := len(series.chunkDescs) @@ -192,7 +195,9 @@ func testChunk(t *testing.T, encoding chunkEncoding) { s, closer := NewTestStorage(t, encoding) defer closer.Close() - s.AppendSamples(samples) + for _, sample := range samples { + s.Append(sample) + } s.WaitForIndexing() for m := range s.(*memorySeriesStorage).fpToSeries.iter() { @@ -240,7 +245,9 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { s, closer := NewTestStorage(t, encoding) defer closer.Close() - s.AppendSamples(samples) + for _, sample := range samples { + s.Append(sample) + } s.WaitForIndexing() fp := clientmodel.Metric{}.Fingerprint() @@ -331,7 +338,9 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { s, closer := NewTestStorage(t, encoding) defer closer.Close() - s.AppendSamples(samples) + for _, sample := range samples { + s.Append(sample) + } s.WaitForIndexing() fp := clientmodel.Metric{}.Fingerprint() @@ -483,7 +492,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods. - s.AppendSamples(samples) + for _, sample := range samples { + s.Append(sample) + } s.WaitForIndexing() fp := clientmodel.Metric{}.Fingerprint() @@ -518,7 +529,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } // Recreate series. - s.AppendSamples(samples) + for _, sample := range samples { + s.Append(sample) + } s.WaitForIndexing() series, ok := ms.fpToSeries.get(fp) @@ -592,7 +605,9 @@ func benchmarkAppend(b *testing.B, encoding chunkEncoding) { s, closer := NewTestStorage(b, encoding) defer closer.Close() - s.AppendSamples(samples) + for _, sample := range samples { + s.Append(sample) + } } func BenchmarkAppendType0(b *testing.B) { @@ -616,7 +631,9 @@ func testFuzz(t *testing.T, encoding chunkEncoding) { defer c.Close() samples := createRandomSamples("test_fuzz", 1000) - s.AppendSamples(samples) + for _, sample := range samples { + s.Append(sample) + } return verifyStorage(t, s, samples, 24*7*time.Hour) } @@ -672,9 +689,13 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { start := samplesPerRun * i end := samplesPerRun * (i + 1) middle := (start + end) / 2 - s.AppendSamples(samples[start:middle]) + for _, sample := range samples[start:middle] { + s.Append(sample) + } verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod) - s.AppendSamples(samples[middle:end]) + for _, sample := range samples[middle:end] { + s.Append(sample) + } verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod) } } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 71bae548e2..475b4b1067 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -53,7 +53,7 @@ type TSDBClient interface { // by the provided TSDBClient. type TSDBQueueManager struct { tsdb TSDBClient - queue chan clientmodel.Samples + queue chan *clientmodel.Sample pendingSamples clientmodel.Samples sendSemaphore chan bool drained chan bool @@ -69,7 +69,7 @@ type TSDBQueueManager struct { func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { return &TSDBQueueManager{ tsdb: tsdb, - queue: make(chan clientmodel.Samples, queueCapacity), + queue: make(chan *clientmodel.Sample, queueCapacity), sendSemaphore: make(chan bool, maxConcurrentSends), drained: make(chan bool), @@ -112,17 +112,14 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { } } -// Queue queues a sample batch to be sent to the TSDB. It drops the most -// recently queued samples on the floor if the queue is full. -func (t *TSDBQueueManager) Queue(s clientmodel.Samples) { - if len(s) == 0 { - return - } +// Append queues a sample to be sent to the TSDB. It drops the sample on the +// floor if the queue is full. It implements storage.SampleAppender. +func (t *TSDBQueueManager) Append(s *clientmodel.Sample) { select { case t.queue <- s: default: - t.samplesCount.WithLabelValues(dropped).Add(float64(len(s))) - glog.Warningf("TSDB queue full, discarding %d samples", len(s)) + t.samplesCount.WithLabelValues(dropped).Inc() + glog.Warning("TSDB queue full, discarding sample.") } } @@ -195,7 +192,7 @@ func (t *TSDBQueueManager) Run() { return } - t.pendingSamples = append(t.pendingSamples, s...) + t.pendingSamples = append(t.pendingSamples, s) for len(t.pendingSamples) >= maxSamplesPerSend { go t.sendSamples(t.pendingSamples[:maxSamplesPerSend]) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 83429efa84..344d77ce4c 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -63,13 +63,16 @@ func TestSampleDelivery(t *testing.T) { c := &TestTSDBClient{} c.expectSamples(samples[:len(samples)/2]) - m := NewTSDBQueueManager(c, 1) + m := NewTSDBQueueManager(c, len(samples)/2) // These should be received by the client. - m.Queue(samples[:len(samples)/2]) + for _, s := range samples[:len(samples)/2] { + m.Append(s) + } // These will be dropped because the queue is full. - m.Queue(samples[len(samples)/2:]) - + for _, s := range samples[len(samples)/2:] { + m.Append(s) + } go m.Run() defer m.Stop() diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000000..4984f9d68e --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,38 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + clientmodel "github.com/prometheus/client_golang/model" +) + +// SampleAppender is the interface to append samples to both, local and remote +// storage. +type SampleAppender interface { + Append(*clientmodel.Sample) +} + +// Tee is a SampleAppender that appends every sample to two other +// SampleAppenders. +type Tee struct { + Appender1, Appender2 SampleAppender +} + +// Append implements SampleAppender. It appends the provided sample first +// to Appender1, then to Appender2, waiting for each to return before +// proceeding. +func (t Tee) Append(s *clientmodel.Sample) { + t.Appender1.Append(s) + t.Appender2.Append(s) +} diff --git a/templates/templates_test.go b/templates/templates_test.go index ccb7899bc8..5492115181 100644 --- a/templates/templates_test.go +++ b/templates/templates_test.go @@ -154,19 +154,17 @@ func TestTemplateExpansion(t *testing.T) { storage, closer := local.NewTestStorage(t, 1) defer closer.Close() - storage.AppendSamples(clientmodel.Samples{ - { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "metric", - "instance": "a"}, - Value: 11, - }, - { - Metric: clientmodel.Metric{ - clientmodel.MetricNameLabel: "metric", - "instance": "b"}, - Value: 21, - }, + storage.Append(&clientmodel.Sample{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "metric", + "instance": "a"}, + Value: 11, + }) + storage.Append(&clientmodel.Sample{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "metric", + "instance": "b"}, + Value: 21, }) storage.WaitForIndexing() diff --git a/web/web.go b/web/web.go index cff25c55d6..9c37e12cbd 100644 --- a/web/web.go +++ b/web/web.go @@ -48,7 +48,7 @@ type WebService struct { AlertsHandler *AlertsHandler ConsolesHandler *ConsolesHandler - QuitDelegate func() + QuitChan chan struct{} } // ServeForever serves the HTTP endpoints and only returns upon errors. @@ -109,7 +109,7 @@ func (ws WebService) quitHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Requesting termination... Goodbye!") - ws.QuitDelegate() + close(ws.QuitChan) } func getTemplateFile(name string) (string, error) {