From dab2522acc538c4a5bb9402c1db90c6d25f53cfe Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Fri, 26 May 2017 17:03:49 +0200 Subject: [PATCH] InfluxDB: Optimize work queue event handling refs #5219 --- lib/perfdata/influxdbwriter.cpp | 28 +++++++++++++++++----------- lib/perfdata/influxdbwriter.hpp | 3 +-- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index a55c0312f..7e7137da8 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #include #include @@ -52,7 +53,6 @@ REGISTER_TYPE(InfluxdbWriter); REGISTER_STATSFUNCTION(InfluxdbWriter, &InfluxdbWriter::StatsFunc); -//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments. InfluxdbWriter::InfluxdbWriter(void) : m_WorkQueue(10000000, 1) { } @@ -64,7 +64,7 @@ void InfluxdbWriter::OnConfigLoaded(void) m_WorkQueue.SetName("InfluxdbWriter, " + GetName()); } -void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) +void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { Dictionary::Ptr nodes = new Dictionary(); @@ -73,13 +73,16 @@ void InfluxdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) double workQueueItemRate = influxdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; size_t dataBufferItems = influxdbwriter->m_DataBuffer.size(); - //TODO: Collect more stats Dictionary::Ptr stats = new Dictionary(); stats->Set("work_queue_items", workQueueItems); stats->Set("work_queue_item_rate", workQueueItemRate); stats->Set("data_buffer_items", dataBufferItems); nodes->Set(influxdbwriter->GetName(), stats); + + perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); + perfdata->Add(new PerfdataValue("influxdbwriter_" + influxdbwriter->GetName() + "_data_queue_items", dataBufferItems)); } status->Set("influxdbwriter", nodes); @@ -173,6 +176,13 @@ Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket) void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { + m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::InternalCheckResultHandler, this, checkable, cr)); +} + +void InfluxdbWriter::InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + AssertOnWorkQueue(); + CONTEXT("Processing check result for '" + checkable->GetName() + "'"); if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) @@ -329,6 +339,10 @@ String InfluxdbWriter::EscapeField(const String& str) if (boost::regex_match(str.GetData(), boolean_false)) return "false"; + // Handle NaNs + if (boost::math::isnan(str)) + return 0; + // Otherwise it's a string and needs escaping and quoting String result = str; boost::algorithm::replace_all(result, "\"", "\\\""); @@ -409,14 +423,6 @@ void InfluxdbWriter::Flush(void) String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); - // Asynchronously flush the metric body to InfluxDB - m_WorkQueue.Enqueue(boost::bind(&InfluxdbWriter::FlushHandler, this, body)); -} - -void InfluxdbWriter::FlushHandler(const String& body) -{ - AssertOnWorkQueue(); - TcpSocket::Ptr socket; Stream::Ptr stream = Connect(socket); diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index f0bbf305c..dba570c45 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -62,13 +62,12 @@ private: boost::mutex m_DataBufferMutex; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + void InternalCheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendPerfdata(const Dictionary::Ptr& tmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, double ts); void SendMetric(const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(void); void Flush(void); - void FlushHandler(const String& body); - static String FormatInteger(int val); static String FormatBoolean(bool val);