Add flush_threshold/flush_interval logic to GelfWriter

This commit is contained in:
Johannes Schmidt 2026-04-30 14:29:45 +02:00
parent e09ccfcfcd
commit 2c0033d0a6
4 changed files with 54 additions and 12 deletions

View file

@ -1317,6 +1317,8 @@ Configuration Attributes:
diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`.
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
enable\_tls | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`.
insecure\_noverify | Boolean | **Optional.** Disable TLS peer verification.

View file

@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
#include "perfdata/gelfwriter.hpp"
#include "base/defer.hpp"
#include "perfdata/gelfwriter-ti.cpp"
#include "icinga/service.hpp"
#include "icinga/notification.hpp"
@ -90,6 +91,13 @@ void GelfWriter::Resume()
/* Register exception handler for WQ tasks. */
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
/* Setup timer for periodically flushing m_DataBuffer */
m_FlushTimer = Timer::Create();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
m_FlushTimer->Start();
m_FlushTimer->Reschedule(0);
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
/* Register event handlers. */
@ -115,6 +123,8 @@ void GelfWriter::Pause()
m_HandleNotifications.disconnect();
m_HandleStateChanges.disconnect();
m_FlushTimer->Stop(true);
std::promise<void> queueDonePromise;
m_WorkQueue.Enqueue([&]() {
@ -360,19 +370,38 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
{
AssertOnWorkQueue();
std::ostringstream msgbuf;
msgbuf << gelfMessage;
msgbuf << '\0';
Log(LogDebug, "GelfWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << gelfMessage << "'.";
auto log = msgbuf.str();
m_MsgBuf.GetData().reserve(m_MsgBuf.GetLength() + gelfMessage.GetLength() + 1);
m_MsgBuf += gelfMessage;
m_MsgBuf += '\0';
try {
Log(LogDebug, "GelfWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()});
} catch (const PerfdataWriterConnection::Stopped& ex) {
Log(LogDebug, "GelfWriter") << ex.what();
return;
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
Flush();
}
}
/**
* Queues a Flush on the work-queue if none is queued yet.
*/
void GelfWriter::FlushTimeout()
{
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
return;
}
m_WorkQueue.Enqueue([&]() {
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
Flush();
});
}
void GelfWriter::Flush()
{
try {
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
} catch (const PerfdataWriterConnection::Stopped& ex) {
Log(LogDebug, "GelfWriter") << ex.what();
}
}

View file

@ -34,6 +34,9 @@ protected:
private:
PerfdataWriterConnection::Ptr m_Connection;
Timer::Ptr m_FlushTimer;
std::atomic_bool m_FlushTimerInQueue{false};
String m_MsgBuf;
WorkQueue m_WorkQueue{10000000, 1};
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
@ -46,6 +49,8 @@ private:
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
void FlushTimeout();
void Flush();
void AssertOnWorkQueue();

View file

@ -24,6 +24,12 @@ class GelfWriter : ConfigObject
[config] bool enable_send_perfdata {
default {{{ return false; }}}
};
[config] int flush_interval {
default {{{ return 15; }}}
};
[config] std::size_t flush_threshold {
default {{{ return 2 * 1024 * 1024; }}}
};
[config] double disconnect_timeout {
default {{{ return 10; }}}