From e52320fbebb4a1efd6b1880559691dfe979602e3 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Thu, 30 Apr 2026 15:10:58 +0200 Subject: [PATCH] Add `flush_threshold/flush_interval` logic to `OpenTsdbWriter` --- doc/09-object-types.md | 2 ++ lib/perfdata/opentsdbwriter.cpp | 33 ++++++++++++++++++++++++++++++--- lib/perfdata/opentsdbwriter.hpp | 5 ++++- lib/perfdata/opentsdbwriter.ti | 6 ++++++ 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/doc/09-object-types.md b/doc/09-object-types.md index e4c260f27..791b25811 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1869,6 +1869,8 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`. port | Number | **Optional.** OpenTSDB port. Defaults to `4242`. + flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`. + flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`. diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`. diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 1b2f82a7d..e5ea9ea51 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/opentsdbwriter.hpp" +#include "base/defer.hpp" #include "perfdata/opentsdbwriter-ti.cpp" #include "icinga/service.hpp" #include "icinga/checkcommand.hpp" @@ -88,6 +89,13 @@ void OpenTsdbWriter::Resume() << "Exception during OpenTsdb operation: " << DiagnosticInformation(exp); }); + /* Setup timer for periodically flushing m_DataBuffer */ + m_FlushTimer = Timer::Create(); + m_FlushTimer->SetInterval(GetFlushInterval()); + m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); }); + m_FlushTimer->Start(); + m_FlushTimer->Reschedule(0); + ReadConfigTemplate(); m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; @@ -104,6 +112,8 @@ void OpenTsdbWriter::Pause() { m_HandleCheckResults.disconnect(); + m_FlushTimer->Stop(true); + std::promise queueDonePromise; m_WorkQueue.Enqueue([&]() { @@ -282,7 +292,9 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); - SendMsgBuffer(); + if (GetFlushThreshold() <= m_MsgBuf.GetLength()) { + SendMsgBuffer(); + } } ); } @@ -387,7 +399,22 @@ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& me /* do not send \n to debug log */ msgbuf << "\n"; - m_MsgBuf.append(msgbuf.str()); + m_MsgBuf += msgbuf.str(); +} + +/** + * Queues a Flush on the work-queue if none is queued yet. + */ +void OpenTsdbWriter::FlushTimeout() +{ + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; + } + + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; + SendMsgBuffer(); + }); } void OpenTsdbWriter::SendMsgBuffer() @@ -398,7 +425,7 @@ void OpenTsdbWriter::SendMsgBuffer() << "Flushing data buffer to OpenTsdb."; try { - m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); + m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {}))); } catch (const PerfdataWriterConnection::Stopped& ex) { Log(LogDebug, "OpenTsdbWriter") << ex.what(); return; diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index 5db298540..04fe0779e 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -35,7 +35,9 @@ protected: private: WorkQueue m_WorkQueue{10000000, 1}; - std::string m_MsgBuf; + Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; + String m_MsgBuf; PerfdataWriterConnection::Ptr m_Connection; boost::signals2::connection m_HandleCheckResults; @@ -46,6 +48,7 @@ private: void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts); + void FlushTimeout(); void SendMsgBuffer(); void AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts); diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index dcad57168..c0935a887 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -31,6 +31,12 @@ class OpenTsdbWriter : ConfigObject [config] bool enable_generic_metrics { default {{{ return false; }}} }; + [config] int flush_interval { + default {{{ return 15; }}} + }; + [config] std::size_t flush_threshold { + default {{{ return 2 * 1024 * 1024; }}} + }; [config] double disconnect_timeout { default {{{ return 10; }}} };