From 25bf5fdaf57748c8522f718480396d0a01f0cc3a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 21 Aug 2015 17:44:24 +0200 Subject: [PATCH] Timeout sample appends --- retrieval/target.go | 44 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index 2bab854442..622ae505fe 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -150,7 +150,7 @@ type Target struct { // Closing scraperStopped signals that scraping has been stopped. scraperStopped chan struct{} // Channel to buffer ingested samples. - ingestedSamples chan model.Samples + ingestedSamples chan model.Vector // Mutex protects the members below. sync.RWMutex @@ -376,6 +376,26 @@ func (t *Target) StopScraper() { log.Debugf("Scraper for target %v stopped.", t) } +func (t *Target) ingest(s model.Vector) error { + t.RLock() + deadline := t.deadline + t.RUnlock() + // Since the regular case is that ingestedSamples is ready to receive, + // first try without setting a timeout so that we don't need to allocate + // a timer most of the time. + select { + case t.ingestedSamples <- s: + return nil + default: + select { + case t.ingestedSamples <- s: + return nil + case <-time.After(deadline / 10): + return errIngestChannelFull + } + } +} + const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { @@ -422,13 +442,27 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { }, } - var samples model.Vector + t.ingestedSamples = make(chan model.Vector, ingestedSamplesCap) - for { - if err = sdec.Decode(&samples); err != nil { - break + go func() { + for { + // TODO(fabxc): Changex the SampleAppender interface to return an error + // so we can proceed based on the status and don't leak goroutines trying + // to append a single sample after dropping all the other ones. + // + // This will also allow use to reuse this vector and save allocations. + var samples model.Vector + if err = sdec.Decode(&samples); err != nil { + break + } + if err = t.ingest(samples); err != nil { + break + } } + close(t.ingestedSamples) + }() + for samples := range t.ingestedSamples { for _, s := range samples { if honorLabels { // Merge the metric with the baseLabels for labels not already set in the