From 27170841487d6d8a7c9a50af930dcedb9503a9ee Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Tue, 12 May 2026 09:37:17 +0200 Subject: [PATCH 1/8] Fix potential nullptr-dereference in perfdata writer stats functions --- lib/perfdata/gelfwriter.cpp | 6 ++++-- lib/perfdata/gelfwriter.hpp | 1 + lib/perfdata/graphitewriter.cpp | 6 ++++-- lib/perfdata/graphitewriter.hpp | 1 + lib/perfdata/opentsdbwriter.cpp | 6 ++++-- lib/perfdata/opentsdbwriter.hpp | 1 + 6 files changed, 15 insertions(+), 6 deletions(-) diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp index 6f8567f70..2828839ee 100644 --- a/lib/perfdata/gelfwriter.cpp +++ b/lib/perfdata/gelfwriter.cpp @@ -49,11 +49,12 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength(); double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + auto connection = gelfwriter->m_LockedConnection.load(); nodes.emplace_back(gelfwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", gelfwriter->m_Connection->IsConnected() }, + { "connected", connection && connection->IsConnected() }, { "source", gelfwriter->GetSource() } })); @@ -90,7 +91,8 @@ void GelfWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; + m_LockedConnection.store(new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}); + m_Connection = m_LockedConnection.load(); /* Register event handlers. 
*/ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index f7d2a10c3..3663dfbe9 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -34,6 +34,7 @@ protected: private: PerfdataWriterConnection::Ptr m_Connection; + Locked m_LockedConnection; WorkQueue m_WorkQueue{10000000, 1}; Shared::Ptr m_SslContext; diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index e00cd9275..71eaeb359 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -58,11 +58,12 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength(); double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0; + auto connection = graphitewriter->m_LockedConnection.load(); nodes.emplace_back(graphitewriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", graphitewriter->m_Connection->IsConnected() } + { "connected", connection && connection->IsConnected() } })); perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems)); @@ -85,7 +86,8 @@ void GraphiteWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; + m_LockedConnection.store(new PerfdataWriterConnection{this, GetHost(), GetPort()}); + m_Connection = m_LockedConnection.load(); /* Register event handlers. 
*/ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index 470fcc07d..2785e5e29 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -36,6 +36,7 @@ protected: private: PerfdataWriterConnection::Ptr m_Connection; + Locked m_LockedConnection; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults; diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 1b2f82a7d..c79e09d7a 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -55,11 +55,12 @@ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType()) { size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength(); double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + auto connection = opentsdbwriter->m_LockedConnection.load(); nodes.emplace_back( opentsdbwriter->GetName(), new Dictionary({ - { "connected", opentsdbwriter->m_Connection->IsConnected() }, + { "connected", connection && connection->IsConnected() }, {"work_queue_items", workQueueItems}, {"work_queue_item_rate", workQueueItemRate} } @@ -90,7 +91,8 @@ void OpenTsdbWriter::Resume() ReadConfigTemplate(); - m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; + m_LockedConnection.store(new PerfdataWriterConnection{this, GetHost(), GetPort()}); + m_Connection = m_LockedConnection.load(); m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { CheckResultHandler(checkable, cr); diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index 5db298540..0dc00f9d0 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ 
-37,6 +37,7 @@ private: WorkQueue m_WorkQueue{10000000, 1}; std::string m_MsgBuf; PerfdataWriterConnection::Ptr m_Connection; + Locked m_LockedConnection; boost::signals2::connection m_HandleCheckResults; From 339434c8b4e3a20c64440d07d96a2931edc7cdc2 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Thu, 2 Apr 2026 13:32:43 +0200 Subject: [PATCH 2/8] Read until end of file in response reading test-cases --- test/perfdata-perfdatatargetfixture.hpp | 11 ++++++++++- test/perfdata-perfdatawriterconnection.cpp | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/test/perfdata-perfdatatargetfixture.hpp b/test/perfdata-perfdatatargetfixture.hpp index bac1c504d..8c2a381bc 100644 --- a/test/perfdata-perfdatatargetfixture.hpp +++ b/test/perfdata-perfdatatargetfixture.hpp @@ -71,11 +71,20 @@ public: BOOST_REQUIRE(stream->next_layer().IsVerifyOK()); } - void Shutdown() + void Shutdown(bool wait = false) { BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); auto& stream = std::get::Ptr>(m_Stream); try { + if (wait) { + std::array buf{}; + boost::asio::mutable_buffer readBuf (buf.data(), buf.size()); + boost::system::error_code ec; + + do { + stream->read_some(readBuf, ec); + } while (!ec); + } stream->next_layer().shutdown(); } catch (const std::exception& ex) { if (const auto* se = dynamic_cast(&ex); diff --git a/test/perfdata-perfdatawriterconnection.cpp b/test/perfdata-perfdatawriterconnection.cpp index 16ed299a9..0f2435198 100644 --- a/test/perfdata-perfdatawriterconnection.cpp +++ b/test/perfdata-perfdatawriterconnection.cpp @@ -207,7 +207,7 @@ BOOST_AUTO_TEST_CASE(stuck_reading_response) requestReadPromise.set_value(); // Do not send a response but react to the shutdown to be polite. 
shutdownPromise.get_future().get(); - Shutdown(); + Shutdown(true); }}; TestThread timeoutThread{[&]() { @@ -315,7 +315,7 @@ BOOST_AUTO_TEST_CASE(http_send_retry) SendResponse(); - Shutdown(); + Shutdown(true); }}; boost::beast::http::request request{boost::beast::http::verb::post, "foo", 10}; From 55eb326a562b791082c2c0f42f4f3fee029668fd Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 22 Apr 2026 11:38:29 +0200 Subject: [PATCH 3/8] Fix a race-condition when perfdata writer is stuck in handshake The issue occurs when ::Connect in `EnsureConnected()` returns after `Disconnect()` has already set `m_Stopped` to true. By adding a check and throwing an exception before entering `async_handshake()` the behavior should now always be consistent. --- lib/perfdata/perfdatawriterconnection.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index 46000c28f..e8a56e899 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -62,7 +62,7 @@ bool PerfdataWriterConnection::IsStopped() const void PerfdataWriterConnection::Disconnect() { - if (m_Stopped.exchange(true, std::memory_order_relaxed)) { + if (m_Stopped.exchange(true)) { return; } @@ -133,6 +133,10 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& ::Connect(stream->lowest_layer(), m_Host, m_Port, yc); if constexpr (std::is_same_v, Shared::Ptr>) { + if (m_Stopped) { + BOOST_THROW_EXCEPTION(Stopped{}); + } + using type = boost::asio::ssl::stream_base::handshake_type; stream->next_layer().async_handshake(type::client, yc); From db7a056ceeb76c39c85ba57300d810200d43d56b Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 22 Apr 2026 11:42:31 +0200 Subject: [PATCH 4/8] Fix ineffective cancel() when stuck in perfdata writer handshake --- lib/perfdata/perfdatawriterconnection.cpp | 10 +++++++--- 1 file changed, 7 
insertions(+), 3 deletions(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index e8a56e899..c8cc80e9d 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -76,9 +76,13 @@ void PerfdataWriterConnection::Disconnect() * completion. */ std::visit( - [](const auto& stream) { + [&](const auto& stream) { if (stream->lowest_layer().is_open()) { - stream->lowest_layer().cancel(); + if (m_Connected) { + stream->lowest_layer().cancel(); + } else { + stream->lowest_layer().close(); + } } }, m_Stream @@ -160,7 +164,7 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) { - if (!m_Connected.exchange(false, std::memory_order_relaxed)) { + if (!m_Connected.exchange(false)) { return; } From 3cc5ee212213028f1c415687273e0e24fd0937a0 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Tue, 26 May 2026 12:04:26 +0200 Subject: [PATCH 5/8] Ignore errors for `close()` and TCP-`shutdown()` This makes the TCP-path also ignore all errors/exceptions, same as the TLS-path's `GracefulDisconnect()`. 
--- lib/perfdata/perfdatawriterconnection.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index c8cc80e9d..4b0838d0a 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -173,8 +173,9 @@ void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) if constexpr (std::is_same_v, Shared::Ptr>) { stream->GracefulDisconnect(m_Strand, yc); } else { - stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both); - stream->lowest_layer().close(); + boost::system::error_code ec; + stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both, ec); + stream->lowest_layer().close(ec); } }, m_Stream From 22b368ca29ceeaa6d5ca7cf36bc5d3b4daa15f5a Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 29 Apr 2026 12:30:28 +0200 Subject: [PATCH 6/8] Fix `PerfdataWriterConnection` segfaults on non-X86 architectures The issue is that std::promise internally also uses thread-local storage, in a call to `std::call_once` in `std::promise::set_value()`. The theory is that since all paths in `Send()` run this `std::call_once` routine and, from then on, the coroutine function looks like a normal function, the compiler inlined `set_value()` and moved the common parts of it to a common location for all paths before the suspension point in WriteMessage(yc). When the coroutine is finally resumed, it is likely that this happens on a different thread, which still has `__once_callable` in `std::call_once` set to `nullptr`, leading to the segmentation fault. The fix is to not use std::promise across coroutine suspension points and instead reimplement the functionality we require from it in a small helper class `SyncResult` that does not require any thread-local storage. 
--- lib/perfdata/perfdatawriterconnection.cpp | 8 +-- lib/perfdata/perfdatawriterconnection.hpp | 63 ++++++++++++++++++++--- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index 4b0838d0a..8b50f061a 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -66,7 +66,7 @@ void PerfdataWriterConnection::Disconnect() return; } - std::promise promise; + SyncResult ret; IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { try { @@ -90,13 +90,13 @@ void PerfdataWriterConnection::Disconnect() m_ReconnectTimer.cancel(); Disconnect(std::move(yc)); - promise.set_value(); + ret.SetValue(); } catch (const std::exception& ex) { - promise.set_exception(std::current_exception()); + ret.SetException(std::current_exception()); } }); - promise.get_future().get(); + ret.Get(); } AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const diff --git a/lib/perfdata/perfdatawriterconnection.hpp b/lib/perfdata/perfdatawriterconnection.hpp index 729878a29..9f91f037e 100644 --- a/lib/perfdata/perfdatawriterconnection.hpp +++ b/lib/perfdata/perfdatawriterconnection.hpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace icinga { @@ -21,6 +22,56 @@ class PerfdataWriterConnection final : public Object static constexpr auto InitialRetryWait = 50ms; static constexpr auto FinalRetryWait = 32s; + template + class SyncResult + { + using ValueType = std::variant, bool, T>, std::exception_ptr>; + + public: + template>> + void SetValue(U&& v) + { + std::lock_guard lock(m_Mutex); + m_Value = std::forward(v); + m_Cv.notify_one(); + } + + template>> + void SetValue() + { + std::lock_guard lock(m_Mutex); + m_Value = true; + m_Cv.notify_one(); + } + + void SetException(std::exception_ptr ep) + { + std::lock_guard lock(m_Mutex); + m_Value = ValueType{ep}; + m_Cv.notify_one(); + } + + T Get() + { + std::unique_lock 
l(m_Mutex); + m_Cv.wait(l, [&] { return !std::holds_alternative(m_Value); }); + if (std::holds_alternative(m_Value)) { + std::rethrow_exception(std::get(m_Value)); + } + + if constexpr (std::is_void_v) { + return; + } else { + return std::move(std::get(m_Value)); + } + } + + private: + std::mutex m_Mutex; + std::condition_variable m_Cv; + ValueType m_Value; + }; + public: DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection); @@ -66,7 +117,7 @@ public: } using RetType = decltype(WriteMessage(std::declval(), std::declval())); - std::promise promise; + SyncResult ret; IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { while (true) { @@ -75,16 +126,16 @@ public: if constexpr (std::is_void_v) { WriteMessage(std::forward(buf), yc); - promise.set_value(); + ret.SetValue(); } else { - promise.set_value(WriteMessage(std::forward(buf), yc)); + ret.SetValue(WriteMessage(std::forward(buf), yc)); } m_RetryTimeout = InitialRetryWait; return; } catch (const std::exception& ex) { if (m_Stopped) { - promise.set_exception(std::make_exception_ptr(Stopped{})); + ret.SetException(std::make_exception_ptr(Stopped{})); return; } @@ -98,14 +149,14 @@ public: try { BackoffWait(yc); } catch (const std::exception&) { - promise.set_exception(std::make_exception_ptr(Stopped{})); + ret.SetException(std::make_exception_ptr(Stopped{})); return; } } } }); - return promise.get_future().get(); + return ret.Get(); } void Disconnect(); From 1831a00d2d1e5c840d7de60485efadc79942a881 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 18 May 2026 10:21:29 +0200 Subject: [PATCH 7/8] Reuse coroutines instead of spawning a new one on each Send --- lib/perfdata/perfdatawriterconnection.cpp | 10 ++++++++++ lib/perfdata/perfdatawriterconnection.hpp | 8 ++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index 8b50f061a..7392c8f93 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp 
+++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -43,8 +43,18 @@ PerfdataWriterConnection::PerfdataWriterConnection( m_Port(std::move(port)), m_ReconnectTimer(IoEngine::Get().GetIoContext()), m_Strand(IoEngine::Get().GetIoContext()), + m_SendCv(IoEngine::Get().GetIoContext()), m_Stream(MakeStream()) { + IoEngine::SpawnCoroutine( + m_Strand, [this, self = PerfdataWriterConnection::Ptr{this}](const boost::asio::yield_context& yc) { + while (!m_Stopped) { + if (m_SendFunction) { + m_SendFunction(yc); + } + m_SendCv.Wait(yc); + } + }); } /** diff --git a/lib/perfdata/perfdatawriterconnection.hpp b/lib/perfdata/perfdatawriterconnection.hpp index 9f91f037e..b0cde2fc8 100644 --- a/lib/perfdata/perfdatawriterconnection.hpp +++ b/lib/perfdata/perfdatawriterconnection.hpp @@ -119,7 +119,7 @@ public: using RetType = decltype(WriteMessage(std::declval(), std::declval())); SyncResult ret; - IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { + m_SendFunction = [&](const boost::asio::yield_context& yc) { while (true) { try { EnsureConnected(yc); @@ -154,7 +154,9 @@ public: } } } - }); + }; + + boost::asio::post(m_Strand, [this](){ m_SendCv.NotifyAll(); }); return ret.Get(); } @@ -202,6 +204,8 @@ private: std::chrono::milliseconds m_RetryTimeout{InitialRetryWait}; boost::asio::steady_timer m_ReconnectTimer; boost::asio::io_context::strand m_Strand; + AsioConditionVariable m_SendCv; + std::function m_SendFunction; AsioTlsOrTcpStream m_Stream; }; From 4680a2d76fb335274a77f4c24f3b27e5181030a6 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Tue, 5 May 2026 09:09:24 +0200 Subject: [PATCH 8/8] Add time-measurements and debug output to PerfdataWriterConnection --- lib/perfdata/perfdatawriterconnection.cpp | 211 ++++++++++++++++++++++ lib/perfdata/perfdatawriterconnection.hpp | 136 +++++++++++++- 2 files changed, 345 insertions(+), 2 deletions(-) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp index 
7392c8f93..67e0cefab 100644 --- a/lib/perfdata/perfdatawriterconnection.cpp +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -3,9 +3,13 @@ #include "perfdata/perfdatawriterconnection.hpp" #include "base/tcpsocket.hpp" +#include #include #include #include +#include +#include +#include #include using namespace icinga; @@ -55,6 +59,213 @@ PerfdataWriterConnection::PerfdataWriterConnection( m_SendCv.Wait(yc); } }); + + StartWatchdog(); +} + +PerfdataWriterConnection::~PerfdataWriterConnection() +{ + if (m_WatchdogTimer) { + m_WatchdogTimer->Stop(true); + } +} + +const char* PerfdataWriterConnection::SendPhaseName(SendPhase phase) +{ + switch (phase) { + case SendPhase::Idle: return "Idle"; + case SendPhase::Queued: return "Queued"; + case SendPhase::Connecting: return "Connecting"; + case SendPhase::Writing: return "Writing"; + case SendPhase::Finalizing: return "Finalizing"; + } + return "Unknown"; +} + +void PerfdataWriterConnection::StartWatchdog() +{ + m_WatchdogTimer = Timer::Create(); + m_WatchdogTimer->SetInterval(1.0); + m_WatchdogTimer->OnTimerExpired.connect([this](const Timer * const&) { OnWatchdogTimer(); }); + m_WatchdogTimer->Start(); +} + +namespace { + +std::string FormatThreadId(std::thread::id id) +{ + if (id == std::thread::id{}) { + return "?"; + } + std::ostringstream oss; + oss << id; + return oss.str(); +} + +void AppendProbeReport( + Log& msg, + const char* label, + const PerfdataWriterConnection::ProbeState& state, + std::chrono::steady_clock::time_point now, + std::chrono::steady_clock::duration stuckThreshold) +{ + auto epoch = std::chrono::steady_clock::time_point{}; + + msg << ", " << label << ":"; + + if (state.lastCompletedAt != epoch) { + auto sinceTick = std::chrono::duration_cast(now - state.lastCompletedAt); + msg << " last tick " << sinceTick.count() << "ms ago on thread " + << FormatThreadId(state.lastCompletedThread); + } else { + msg << " no tick observed yet"; + } + + if (state.probePending) { + auto probeAge = 
std::chrono::duration_cast(now - state.probeRequestedAt); + msg << ", probe pending " << probeAge.count() << "ms"; + if (probeAge >= stuckThreshold) { + msg << " — wedged"; + } + } else { + msg << ", probe responsive"; + } +} + +} // namespace + +void PerfdataWriterConnection::OnWatchdogTimer() +{ + if (m_Stopped) { + return; + } + + SendDiagnostics diag; + ProbeState strandProbe; + ProbeState ioContextProbe; + { + std::lock_guard lock{m_DiagMutex}; + diag = m_Diag; + strandProbe = m_StrandProbe; + ioContextProbe = m_IoContextProbe; + } + + /* Probe both executors. The snapshots above reflect the state observed + * before this tick's probes are queued, so they capture whether the + * *previous* probes ever ran. */ + ProbeStrand(); + ProbeIoContext(); + + if (diag.phase == SendPhase::Idle) { + return; + } + + auto now = std::chrono::steady_clock::now(); + auto totalElapsed = now - diag.startedAt; + + if (totalElapsed < stuckThreshold) { + return; + } + + std::chrono::steady_clock::time_point phaseStart; + switch (diag.phase) { + case SendPhase::Queued: phaseStart = diag.startedAt; break; + case SendPhase::Connecting: phaseStart = diag.spawnedAt; break; + case SendPhase::Writing: phaseStart = diag.writingAt; break; + case SendPhase::Finalizing: phaseStart = diag.doneAt; break; + case SendPhase::Idle: return; + } + + auto inPhaseFor = std::chrono::duration_cast(now - phaseStart); + auto totalMs = std::chrono::duration_cast(totalElapsed); + + Log msg(LogWarning, m_LogFacility); + msg << "Send to '" << m_Host << ":" << m_Port << "' for '" << m_ParentName + << "' has been in flight for " << totalMs.count() << "ms" + << " (phase: " << SendPhaseName(diag.phase) << " for " << inPhaseFor.count() << "ms"; + + AppendProbeReport(msg, "strand", strandProbe, now, stuckThreshold); + AppendProbeReport(msg, "io_context", ioContextProbe, now, stuckThreshold); + + msg << ")."; +} + +void PerfdataWriterConnection::ProbeStrand() +{ + auto now = std::chrono::steady_clock::now(); + { + 
std::lock_guard lock{m_DiagMutex}; + if (m_StrandProbe.probePending) { + /* The previous probe hasn't completed yet — don't pile up new + * posts. probeRequestedAt continues to age, so the watchdog will + * report a growing stall on each tick. */ + return; + } + m_StrandProbe.probePending = true; + m_StrandProbe.probeRequestedAt = now; + } + + boost::asio::post(m_Strand, [this, keepAlive = Ptr(this)]() { + auto completedAt = std::chrono::steady_clock::now(); + auto tid = std::this_thread::get_id(); + std::lock_guard lock{m_DiagMutex}; + m_StrandProbe.probePending = false; + m_StrandProbe.lastCompletedAt = completedAt; + m_StrandProbe.lastCompletedThread = tid; + }); +} + +void PerfdataWriterConnection::ProbeIoContext() +{ + auto now = std::chrono::steady_clock::now(); + { + std::lock_guard lock{m_DiagMutex}; + if (m_IoContextProbe.probePending) { + return; + } + m_IoContextProbe.probePending = true; + m_IoContextProbe.probeRequestedAt = now; + } + + /* Post directly to the io_context — bypassing m_Strand — so we can tell + * a strand-local wedge from io_context worker-pool exhaustion. 
*/ + boost::asio::post(IoEngine::Get().GetIoContext(), [this, keepAlive = Ptr(this)]() { + auto completedAt = std::chrono::steady_clock::now(); + auto tid = std::this_thread::get_id(); + std::lock_guard lock{m_DiagMutex}; + m_IoContextProbe.probePending = false; + m_IoContextProbe.lastCompletedAt = completedAt; + m_IoContextProbe.lastCompletedThread = tid; + }); +} + +void PerfdataWriterConnection::LogSlowSend(const SendDiagnostics& diag, std::chrono::steady_clock::duration totalElapsed) const +{ + auto toMs = [](std::chrono::steady_clock::duration d) { + return std::chrono::duration_cast(d).count(); + }; + + Log msg(LogWarning, m_LogFacility); + msg << "Slow Send to '" << m_Host << ":" << m_Port << "' for '" << m_ParentName + << "' took " << toMs(totalElapsed) << "ms"; + + auto epoch = std::chrono::steady_clock::time_point{}; + + if (diag.spawnedAt > diag.startedAt) { + msg << ", spawn " << toMs(diag.spawnedAt - diag.startedAt) << "ms"; + } else if (diag.spawnedAt == epoch) { + msg << ", spawn still pending"; + } + + if (diag.writingAt > diag.spawnedAt && diag.spawnedAt != epoch) { + msg << ", connect " << toMs(diag.writingAt - diag.spawnedAt) << "ms"; + } + + if (diag.doneAt > diag.writingAt && diag.writingAt != epoch) { + msg << ", write " << toMs(diag.doneAt - diag.writingAt) << "ms"; + } + + msg << " (phase before exit: " << SendPhaseName(diag.phase) << ")."; } /** diff --git a/lib/perfdata/perfdatawriterconnection.hpp b/lib/perfdata/perfdatawriterconnection.hpp index b0cde2fc8..aeee0a18d 100644 --- a/lib/perfdata/perfdatawriterconnection.hpp +++ b/lib/perfdata/perfdatawriterconnection.hpp @@ -4,13 +4,19 @@ #pragma once #include "base/io-engine.hpp" +#include "base/logger.hpp" +#include "base/timer.hpp" #include "base/tlsstream.hpp" +#include #include #include #include #include +#include #include #include +#include +#include namespace icinga { @@ -72,6 +78,9 @@ class PerfdataWriterConnection final : public Object ValueType m_Value; }; + static constexpr auto 
slowThreshold = std::chrono::milliseconds(200); + static constexpr auto stuckThreshold = std::chrono::seconds(5); + public: DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection); @@ -80,6 +89,52 @@ public: [[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; } }; + enum class SendPhase : uint8_t { + Idle = 0, + Queued, + Connecting, + Writing, + Finalizing, + }; + + struct SendDiagnostics { + SendPhase phase{SendPhase::Idle}; + std::chrono::steady_clock::time_point startedAt{}; + std::chrono::steady_clock::time_point spawnedAt{}; + std::chrono::steady_clock::time_point writingAt{}; + std::chrono::steady_clock::time_point doneAt{}; + }; + + /** + * Executor-liveness probe state. + * + * The watchdog periodically posts a no-op onto an executor (m_Strand or + * the bare io_context). If the executor is draining its queue, the post + * completes and lastCompletedAt advances. If the executor is wedged the + * post never runs and probePending stays true with a growing + * probeRequestedAt age. + * + * Two probes are maintained in parallel to triangulate where a wedge sits: + * - m_StrandProbe: posts onto m_Strand. Detects strand-local wedges + * (some handler hogging this strand) as well as io_context-level ones. + * - m_IoContextProbe: posts onto the bare io_context. Detects worker-pool + * exhaustion (all io_context threads parked elsewhere). If the strand + * probe stalls but this one runs promptly, the wedge is strand-local. + * + * lastCompletedThread records which io_context worker last serviced the + * executor, to be cross-referenced with future per-thread instrumentation. 
+ */ + struct ProbeState { + bool probePending{false}; + std::chrono::steady_clock::time_point probeRequestedAt{}; + std::chrono::steady_clock::time_point lastCompletedAt{}; + std::thread::id lastCompletedThread{}; + }; + + ~PerfdataWriterConnection() override; + + static const char* SendPhaseName(SendPhase phase); + using HttpRequest = boost::beast::http::request; using HttpResponse = boost::beast::http::response; @@ -116,19 +171,51 @@ public: BOOST_THROW_EXCEPTION(Stopped{}); } + auto start = std::chrono::steady_clock::now(); + using RetType = decltype(WriteMessage(std::declval(), std::declval())); SyncResult ret; + { + std::lock_guard lock{m_DiagMutex}; + m_Diag.phase = SendPhase::Queued; + m_Diag.startedAt = start; + } + m_SendFunction = [&](const boost::asio::yield_context& yc) { + { + auto now = std::chrono::steady_clock::now(); + std::lock_guard lock{m_DiagMutex}; + m_Diag.spawnedAt = now; + m_Diag.phase = SendPhase::Connecting; + } + while (true) { try { EnsureConnected(yc); + { + std::lock_guard lock{m_DiagMutex}; + m_Diag.phase = SendPhase::Writing; + m_Diag.writingAt = std::chrono::steady_clock::now(); + } + if constexpr (std::is_void_v) { WriteMessage(std::forward(buf), yc); + { + std::lock_guard lock{m_DiagMutex}; + m_Diag.phase = SendPhase::Finalizing; + m_Diag.doneAt = std::chrono::steady_clock::now(); + } ret.SetValue(); } else { - ret.SetValue(WriteMessage(std::forward(buf), yc)); + auto result = WriteMessage(std::forward(buf), yc); + { + std::lock_guard lock{m_DiagMutex}; + m_Diag.phase = SendPhase::Finalizing; + m_Diag.doneAt = std::chrono::steady_clock::now(); + } + ret.SetValue(std::move(result)); } m_RetryTimeout = InitialRetryWait; @@ -146,6 +233,12 @@ public: m_Stream = MakeStream(); m_Connected = false; + { + std::lock_guard lock{m_DiagMutex}; + m_Diag.phase = SendPhase::Connecting; + m_Diag.spawnedAt = std::chrono::steady_clock::now(); + } + try { BackoffWait(yc); } catch (const std::exception&) { @@ -158,7 +251,34 @@ public: 
boost::asio::post(m_Strand, [this](){ m_SendCv.NotifyAll(); }); - return ret.Get(); + auto resetAndLog = [this, start]() { + SendDiagnostics diag; + { + std::lock_guard lock{m_DiagMutex}; + diag = m_Diag; + m_Diag.phase = SendPhase::Idle; + } + + auto totalElapsed = std::chrono::steady_clock::now() - start; + if (totalElapsed >= slowThreshold) { + LogSlowSend(diag, totalElapsed); + } + }; + + try { + if constexpr (std::is_void_v) { + ret.Get(); + resetAndLog(); + return; + } else { + auto result = ret.Get(); + resetAndLog(); + return result; + } + } catch (const std::exception&) { + resetAndLog(); + throw; + } } void Disconnect(); @@ -190,6 +310,12 @@ private: void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc); HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc); + void StartWatchdog(); + void OnWatchdogTimer(); + void ProbeStrand(); + void ProbeIoContext(); + void LogSlowSend(const SendDiagnostics& diag, std::chrono::steady_clock::duration totalElapsed) const; + std::atomic_bool m_Stopped{false}; std::atomic_bool m_Connected{false}; @@ -207,6 +333,12 @@ private: AsioConditionVariable m_SendCv; std::function m_SendFunction; AsioTlsOrTcpStream m_Stream; + + mutable std::mutex m_DiagMutex; + SendDiagnostics m_Diag; + ProbeState m_StrandProbe; + ProbeState m_IoContextProbe; + Timer::Ptr m_WatchdogTimer; }; } // namespace icinga