Add flush_threshold/flush_interval logic to OpenTsdbWriter

This commit is contained in:
Johannes Schmidt 2026-04-30 15:10:58 +02:00
parent 2c0033d0a6
commit e52320fbeb
4 changed files with 42 additions and 4 deletions

View file

@ -1869,6 +1869,8 @@ Configuration Attributes:
--------------------------|-----------------------|----------------------------------
host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`.
port | Number | **Optional.** OpenTSDB port. Defaults to `4242`.
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`.

View file

@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
#include "perfdata/opentsdbwriter.hpp"
#include "base/defer.hpp"
#include "perfdata/opentsdbwriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/checkcommand.hpp"
@ -88,6 +89,13 @@ void OpenTsdbWriter::Resume()
<< "Exception during OpenTsdb operation: " << DiagnosticInformation(exp);
});
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = Timer::Create();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
ReadConfigTemplate();
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
@ -104,6 +112,8 @@ void OpenTsdbWriter::Pause()
{
m_HandleCheckResults.disconnect();
m_FlushTimer->Stop(true);
std::promise<void> queueDonePromise;
m_WorkQueue.Enqueue([&]() {
@ -282,7 +292,9 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
SendMsgBuffer();
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
SendMsgBuffer();
}
}
);
}
@ -387,7 +399,22 @@ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& me
/* do not send \n to debug log */
msgbuf << "\n";
m_MsgBuf.append(msgbuf.str());
m_MsgBuf += msgbuf.str();
}
/**
* Queues a Flush on the work-queue if none is queued yet.
*/
void OpenTsdbWriter::FlushTimeout()
{
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
return;
}
m_WorkQueue.Enqueue([&]() {
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
SendMsgBuffer();
});
}
void OpenTsdbWriter::SendMsgBuffer()
@ -398,7 +425,7 @@ void OpenTsdbWriter::SendMsgBuffer()
<< "Flushing data buffer to OpenTsdb.";
try {
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
} catch (const PerfdataWriterConnection::Stopped& ex) {
Log(LogDebug, "OpenTsdbWriter") << ex.what();
return;

View file

@ -35,7 +35,9 @@ protected:
private:
WorkQueue m_WorkQueue{10000000, 1};
std::string m_MsgBuf;
Timer::Ptr m_FlushTimer;
std::atomic_bool m_FlushTimerInQueue{false};
String m_MsgBuf;
PerfdataWriterConnection::Ptr m_Connection;
boost::signals2::connection m_HandleCheckResults;
@ -46,6 +48,7 @@ private:
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts);
void FlushTimeout();
void SendMsgBuffer();
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);

View file

@ -31,6 +31,12 @@ class OpenTsdbWriter : ConfigObject
[config] bool enable_generic_metrics {
default {{{ return false; }}}
};
[config] int flush_interval {
default {{{ return 15; }}}
};
[config] std::size_t flush_threshold {
default {{{ return 2 * 1024 * 1024; }}}
};
[config] double disconnect_timeout {
default {{{ return 10; }}}
};