From 240142bcbda1535f8aec6f49c3ff5afb8ea94571 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Fri, 12 Dec 2025 13:27:02 +0100 Subject: [PATCH] Refactor OpenTsdbWriter to use a WorkQueue --- lib/perfdata/opentsdbwriter.cpp | 119 +++++++++++++++++++++----------- lib/perfdata/opentsdbwriter.hpp | 7 +- 2 files changed, 84 insertions(+), 42 deletions(-) diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 002639792..95b43ba76 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -36,6 +36,8 @@ void OpenTsdbWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); + m_WorkQueue.SetName("OpenTsdbWriter, " + GetName()); + if (!GetEnableHa()) { Log(LogDebug, "OpenTsdbWriter") << "HA functionality disabled. Won't pause connection: " << GetName(); @@ -51,14 +53,26 @@ void OpenTsdbWriter::OnConfigLoaded() * * @param status Key value pairs for feature stats */ -void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) +void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { DictionaryData nodes; for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType()) { - nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({ - { "connected", opentsdbwriter->GetConnected() } - })); + size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + nodes.emplace_back( + opentsdbwriter->GetName(), + new Dictionary({ + {"connected", opentsdbwriter->GetConnected()}, + {"work_queue_items", workQueueItems}, + {"work_queue_item_rate", workQueueItemRate} + } + ) + ); + + perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); } status->Set("opentsdbwriter", new 
Dictionary(std::move(nodes))); @@ -74,6 +88,11 @@ void OpenTsdbWriter::Resume() Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' resumed."; + m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) { + Log(LogDebug, "OpenTsdbWriter") + << "Exception during OpenTsdb operation: " << DiagnosticInformation(exp); + }); + ReadConfigTemplate(); m_ReconnectTimer = Timer::Create(); @@ -95,6 +114,8 @@ void OpenTsdbWriter::Pause() m_HandleCheckResults.disconnect(); m_ReconnectTimer->Stop(true); + m_WorkQueue.Join(); + Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; @@ -251,7 +272,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_hostName = EscapeTag(host->GetName()); tags["host"] = escaped_hostName; - double ts = cr->GetExecutionEnd(); + std::vector> metadata; if (service) { @@ -262,40 +283,51 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_serviceName = EscapeMetric(serviceName); metric = "icinga.service." 
+ escaped_serviceName; } - - SendMetric(checkable, metric + ".state", tags, service->GetState(), ts); + metadata.emplace_back("state", service->GetState()); } else { if (!config_tmpl_metric.IsEmpty()) { metric = config_tmpl_metric; } else { metric = "icinga.host"; } - SendMetric(checkable, metric + ".state", tags, host->GetState(), ts); + metadata.emplace_back("state", host->GetState()); } - SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts); - SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts); - SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts); - SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts); + metadata.emplace_back("state_type", checkable->GetStateType()); + metadata.emplace_back("reachable", checkable->IsReachable()); + metadata.emplace_back("downtime_depth", checkable->GetDowntimeDepth()); + metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement()); - SendPerfdata(checkable, metric, tags, cr, ts); + m_WorkQueue.Enqueue( + [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable { + double ts = cr->GetExecutionEnd(); - metric = "icinga.check"; + for (auto& [name, val] : metadata) { + AddMetric(checkable, metric + "." 
+ name, tags, val, ts); + } - if (service) { - tags["type"] = "service"; - String serviceName = service->GetShortName(); - String escaped_serviceName = EscapeTag(serviceName); - tags["service"] = escaped_serviceName; - } else { - tags["type"] = "host"; - } + AddPerfdata(checkable, metric, tags, cr, ts); - SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); - SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); - SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); - SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); + metric = "icinga.check"; + + if (service) { + tags["type"] = "service"; + String serviceName = service->GetShortName(); + String escaped_serviceName = EscapeTag(serviceName); + tags["service"] = escaped_serviceName; + } else { + tags["type"] = "host"; + } + + AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); + AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); + AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); + AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); + + SendMsgBuffer(); + } + ); } /** @@ -307,9 +339,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C * @param cr Check result containing performance data * @param ts Timestamp when the check result was received */ -void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts) { + ASSERT(m_WorkQueue.IsWorkerThread()); + Array::Ptr perfdata = cr->GetPerformanceData(); if (!perfdata) @@ -350,21 +384,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& 
tags_new["label"] = escaped_key; } - SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); + AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); if (!pdv->GetCrit().IsEmpty()) - SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); + AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); if (!pdv->GetWarn().IsEmpty()) - SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); + AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); if (!pdv->GetMin().IsEmpty()) - SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); + AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); if (!pdv->GetMax().IsEmpty()) - SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); + AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); } } /** - * Send given metric to OpenTSDB + * Add given metric to the data buffer to be later sent to OpenTSDB * * @param checkable Host/service object * @param metric Full metric name @@ -372,9 +406,11 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& * @param value Floating point metric value * @param ts Timestamp where the metric was received from the check result */ -void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts) { + ASSERT(m_WorkQueue.IsWorkerThread()); + String tags_string = ""; for (auto& tag : tags) { @@ -394,18 +430,21 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m /* do not send \n to debug log */ msgbuf << "\n"; - String put = msgbuf.str(); + m_MsgBuf.append(msgbuf.str()); +} - ObjectLock olock(this); +void OpenTsdbWriter::SendMsgBuffer() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); if (!GetConnected()) return; - try { - 
Log(LogDebug, "OpenTsdbWriter") - << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'."; + Log(LogDebug, "OpenTsdbWriter") + << "Flushing data buffer to OpenTsdb."; - boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str())); + try { + boost::asio::write(*m_Stream, boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); m_Stream->flush(); } catch (const std::exception& ex) { Log(LogCritical, "OpenTsdbWriter") diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index cd3f2efc4..eb85f4436 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -36,6 +36,8 @@ protected: void Pause() override; private: + WorkQueue m_WorkQueue{10000000, 1}; + std::string m_MsgBuf; Shared::Ptr m_Stream; boost::signals2::connection m_HandleCheckResults; @@ -45,9 +47,10 @@ private: Dictionary::Ptr m_HostConfigTemplate; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void SendMetric(const Checkable::Ptr& checkable, const String& metric, + void AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts); - void SendPerfdata(const Checkable::Ptr& checkable, const String& metric, + void SendMsgBuffer(); + void AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts); static String EscapeTag(const String& str); static String EscapeMetric(const String& str);