From 7955d04ae3ffccdd768507f5b0ad081e9322507d Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 2 Feb 2026 13:41:35 +0100 Subject: [PATCH 1/9] Import std::chrono_literals into icinga namespace --- lib/base/i2-base.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/base/i2-base.hpp b/lib/base/i2-base.hpp index caf67f6f4..440862ff1 100644 --- a/lib/base/i2-base.hpp +++ b/lib/base/i2-base.hpp @@ -76,5 +76,10 @@ #define BOOST_BIND_NO_PLACEHOLDERS #include +#include + +namespace icinga { +using namespace std::chrono_literals; +} // namespace icinga #endif /* I2BASE_H */ From 240142bcbda1535f8aec6f49c3ff5afb8ea94571 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Fri, 12 Dec 2025 13:27:02 +0100 Subject: [PATCH 2/9] Refactor OpenTsdbWriter to use a WorkQueue --- lib/perfdata/opentsdbwriter.cpp | 119 +++++++++++++++++++++----------- lib/perfdata/opentsdbwriter.hpp | 7 +- 2 files changed, 84 insertions(+), 42 deletions(-) diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 002639792..95b43ba76 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -36,6 +36,8 @@ void OpenTsdbWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); + m_WorkQueue.SetName("OpenTsdbWriter, " + GetName()); + if (!GetEnableHa()) { Log(LogDebug, "OpenTsdbWriter") << "HA functionality disabled. Won't pause connection: " << GetName(); @@ -51,14 +53,26 @@ void OpenTsdbWriter::OnConfigLoaded() * * @param status Key value pairs for feature stats */ -void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) +void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { DictionaryData nodes; for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType()) { - nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({ - { "connected", opentsdbwriter->GetConnected() } - })); + size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + nodes.emplace_back( + opentsdbwriter->GetName(), + new Dictionary({ + {"connected", opentsdbwriter->GetConnected()}, + {"work_queue_items", workQueueItems}, + {"work_queue_item_rate", workQueueItemRate} + } + ) + ); + + perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); } status->Set("opentsdbwriter", new Dictionary(std::move(nodes))); @@ -74,6 +88,11 @@ void OpenTsdbWriter::Resume() Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' resumed."; + m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) { + Log(LogDebug, "OpenTsdbWriter") + << "Exception during OpenTsdb operation: " << DiagnosticInformation(exp); + }); + ReadConfigTemplate(); m_ReconnectTimer = Timer::Create(); @@ -95,6 +114,8 @@ void OpenTsdbWriter::Pause() m_HandleCheckResults.disconnect(); m_ReconnectTimer->Stop(true); + m_WorkQueue.Join(); + Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; @@ -251,7 +272,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_hostName = EscapeTag(host->GetName()); tags["host"] = escaped_hostName; - double ts = cr->GetExecutionEnd(); + std::vector> metadata; if (service) { @@ -262,40 +283,51 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_serviceName = EscapeMetric(serviceName); metric = "icinga.service." + escaped_serviceName; } - - SendMetric(checkable, metric + ".state", tags, service->GetState(), ts); + metadata.emplace_back("state", service->GetState()); } else { if (!config_tmpl_metric.IsEmpty()) { metric = config_tmpl_metric; } else { metric = "icinga.host"; } - SendMetric(checkable, metric + ".state", tags, host->GetState(), ts); + metadata.emplace_back("state", host->GetState()); } - SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts); - SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts); - SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts); - SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts); + metadata.emplace_back("state_type", checkable->GetStateType()); + metadata.emplace_back("reachable", checkable->IsReachable()); + metadata.emplace_back("downtime_depth", checkable->GetDowntimeDepth()); + metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement()); - SendPerfdata(checkable, metric, tags, cr, ts); + m_WorkQueue.Enqueue( + [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata), ts]() mutable { + double ts = cr->GetExecutionEnd(); - metric = "icinga.check"; + for (auto& [name, val] : metadata) { + AddMetric(checkable, metric + "." + name, tags, val, ts); + } - if (service) { - tags["type"] = "service"; - String serviceName = service->GetShortName(); - String escaped_serviceName = EscapeTag(serviceName); - tags["service"] = escaped_serviceName; - } else { - tags["type"] = "host"; - } + AddPerfdata(checkable, metric, tags, cr, ts); - SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); - SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); - SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); - SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); + metric = "icinga.check"; + + if (service) { + tags["type"] = "service"; + String serviceName = service->GetShortName(); + String escaped_serviceName = EscapeTag(serviceName); + tags["service"] = escaped_serviceName; + } else { + tags["type"] = "host"; + } + + AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); + AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); + AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); + AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); + + SendMsgBuffer(); + } + ); } /** @@ -307,9 +339,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C * @param cr Check result containing performance data * @param ts Timestamp when the check result was received */ -void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts) { + ASSERT(m_WorkQueue.IsWorkerThread()); + Array::Ptr perfdata = cr->GetPerformanceData(); if (!perfdata) @@ -350,21 +384,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& tags_new["label"] = escaped_key; } - SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); + AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); if (!pdv->GetCrit().IsEmpty()) - SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); + AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); if (!pdv->GetWarn().IsEmpty()) - SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); + AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); if (!pdv->GetMin().IsEmpty()) - SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); + AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); if (!pdv->GetMax().IsEmpty()) - SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); + AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); } } /** - * Send given metric to OpenTSDB + * Add given metric to the data buffer to be later sent to OpenTSDB * * @param checkable Host/service object * @param metric Full metric name @@ -372,9 +406,11 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& * @param value Floating point metric value * @param ts Timestamp where the metric was received from the check result */ -void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts) { + ASSERT(m_WorkQueue.IsWorkerThread()); + String tags_string = ""; for (auto& tag : tags) { @@ -394,18 +430,21 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m /* do not send \n to debug log */ msgbuf << "\n"; - String put = msgbuf.str(); + m_MsgBuf.append(msgbuf.str()); +} - ObjectLock olock(this); +void OpenTsdbWriter::SendMsgBuffer() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); if (!GetConnected()) return; - try { - Log(LogDebug, "OpenTsdbWriter") - << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'."; + Log(LogDebug, "OpenTsdbWriter") + << "Flushing data buffer to OpenTsdb."; - boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str())); + try { + boost::asio::write(*m_Stream, boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); m_Stream->flush(); } catch (const std::exception& ex) { Log(LogCritical, "OpenTsdbWriter") diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index cd3f2efc4..eb85f4436 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -36,6 +36,8 @@ protected: void Pause() override; private: + WorkQueue m_WorkQueue{10000000, 1}; + std::string m_MsgBuf; Shared::Ptr m_Stream; boost::signals2::connection m_HandleCheckResults; @@ -45,9 +47,10 @@ private: Dictionary::Ptr m_HostConfigTemplate; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void SendMetric(const Checkable::Ptr& checkable, const String& metric, + void AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts); - void SendPerfdata(const Checkable::Ptr& checkable, const String& metric, + void SendMsgBuffer(); + void AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts); static String EscapeTag(const String& str); static String EscapeMetric(const String& str); From e339a229bac0fc562df8e7c88c71bae607c32079 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Fri, 30 Jan 2026 15:38:22 +0100 Subject: [PATCH 3/9] Add TestThread class to not get unit-tests stuck in `join()`s --- test/test-thread.hpp | 60 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 test/test-thread.hpp diff --git a/test/test-thread.hpp b/test/test-thread.hpp new file mode 100644 index 000000000..f78383282 --- /dev/null +++ b/test/test-thread.hpp @@ -0,0 +1,60 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include +#include +#include + +#define REQUIRE_JOINS_WITHIN(t, timeout) \ + BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") +#define CHECK_JOINS_WITHIN(t, timeout) \ + BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") +#define TEST_JOINS_WITHIN(t, timeout) \ + BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") + +#define REQUIRE_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") +#define CHECK_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") +#define TEST_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") + +namespace icinga { + +class TestThread +{ +public: + explicit TestThread(std::function fn) : TestThread(std::move(fn), std::promise{}) {} + + bool Joinable() + { + auto status = m_JoinFuture.wait_for(std::chrono::milliseconds{0}); + return status == std::future_status::ready; + } + + template + bool TryJoinWithin(std::chrono::duration timeout) + { + auto status = m_JoinFuture.wait_for(timeout); + if (status == std::future_status::ready) { + m_Thread.join(); + return true; + } + return false; + } + +private: + explicit TestThread(std::function fn, std::promise joinPromise) + : m_JoinFuture(joinPromise.get_future()), + m_Thread([fn = std::move(fn), jp = std::move(joinPromise)]() mutable { + fn(); + jp.set_value(); + }) + { + } + + std::future m_JoinFuture; + std::thread m_Thread; +}; + +} // namespace icinga From 6fc91309833a4418b6416cca9cb82be498d0871f Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 23 Feb 2026 09:06:26 +0100 Subject: [PATCH 4/9] Move and extend ReceiveCheckResults function --- test/notification-notificationcomponent.cpp | 18 +-------- test/utils.cpp | 43 +++++++++++++++++++++ test/utils.hpp | 9 +++++ 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/test/notification-notificationcomponent.cpp b/test/notification-notificationcomponent.cpp index c2ef994a0..a67882ab7 100644 --- a/test/notification-notificationcomponent.cpp +++ b/test/notification-notificationcomponent.cpp @@ -5,6 +5,7 @@ #include "base/defer.hpp" #include "remote/apilistener.hpp" #include "test/base-testloggerfixture.hpp" +#include "test/utils.hpp" #include "config/configcompiler.hpp" #include "notification/notificationcomponent.hpp" @@ -194,22 +195,7 @@ object NotificationComponent "nc" {} void ReceiveCheckResults(std::size_t num, ServiceState state) { - StoppableWaitGroup::Ptr wg = new StoppableWaitGroup(); - - for (auto i = 0UL; i < num; ++i) { - CheckResult::Ptr cr = new CheckResult(); - - cr->SetState(state); - - double now = Utility::GetTime(); - cr->SetActive(false); - cr->SetScheduleStart(now); - cr->SetScheduleEnd(now); - cr->SetExecutionStart(now); - cr->SetExecutionEnd(now); - - BOOST_REQUIRE(m_Host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok); - } + ::ReceiveCheckResults(m_Host, num, state); } double GetLastNotificationTimestamp() { return m_Notification->GetLastNotification(); } diff --git a/test/utils.cpp b/test/utils.cpp index a0aba80d0..f056a51d9 100644 --- a/test/utils.cpp +++ b/test/utils.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "utils.hpp" +#include "base/perfdatavalue.hpp" #include #include #include @@ -66,3 +67,45 @@ GlobalTimezoneFixture::~GlobalTimezoneFixture() #endif tzset(); } + +/** + * Make our test host receive a number of check-results. + * + * @param num The number of check-results to receive + * @param state The state the check results should have + * @param fn A function that will be passed the current check-result + */ +void ReceiveCheckResults( + const icinga::Checkable::Ptr& host, + std::size_t num, + icinga::ServiceState state, + const std::function& fn +) +{ + using namespace icinga; + + StoppableWaitGroup::Ptr wg = new StoppableWaitGroup(); + + for (auto i = 0UL; i < num; ++i) { + CheckResult::Ptr cr = new CheckResult(); + + cr->SetState(state); + + double now = Utility::GetTime(); + cr->SetActive(false); + cr->SetScheduleStart(now); + cr->SetScheduleEnd(now); + cr->SetExecutionStart(now); + cr->SetExecutionEnd(now); + + Array::Ptr perfData = new Array; + perfData->Add(new PerfdataValue{"dummy", 42}); + cr->SetPerformanceData(perfData); + + if (fn) { + fn(cr); + } + + BOOST_REQUIRE(host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok); + } +} diff --git a/test/utils.hpp b/test/utils.hpp index 67d2575a2..e0bdf1129 100644 --- a/test/utils.hpp +++ b/test/utils.hpp @@ -3,7 +3,9 @@ #pragma once +#include "icinga/host.hpp" #include +#include #include tm make_tm(std::string s); @@ -24,3 +26,10 @@ struct GlobalTimezoneFixture char *tz; }; + +void ReceiveCheckResults( + const icinga::Checkable::Ptr& host, + std::size_t num, + icinga::ServiceState state, + const std::function& fn = {} +); From 11d099f4bb339345d71b40f64106fa4fc488654b Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 29 Sep 2025 11:00:27 +0200 Subject: [PATCH 5/9] Add Assert-Macros for the TestLogger Also add a Clear() function to clear existing log content. --- test/base-testloggerfixture.hpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/base-testloggerfixture.hpp b/test/base-testloggerfixture.hpp index 1d1426efc..d01384fa8 100644 --- a/test/base-testloggerfixture.hpp +++ b/test/base-testloggerfixture.hpp @@ -11,6 +11,12 @@ #include #include +#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout)) +#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout)) + +#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout)) +#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout)) + namespace icinga { class TestLogger : public Logger From 2e2576c7c5bb48e5ffbcf34981662706d2f75ba9 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 2 Feb 2026 13:43:10 +0100 Subject: [PATCH 6/9] Add PerfdataWriterConnection class --- lib/perfdata/CMakeLists.txt | 1 + lib/perfdata/perfdatawriterconnection.cpp | 209 ++++++++++++++++++++++ lib/perfdata/perfdatawriterconnection.hpp | 157 ++++++++++++++++ 3 files changed, 367 insertions(+) create mode 100644 lib/perfdata/perfdatawriterconnection.cpp create mode 100644 lib/perfdata/perfdatawriterconnection.hpp diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index 3bbbe3b43..c53e9e9b6 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -19,6 +19,7 @@ set(perfdata_SOURCES influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp + perfdatawriterconnection.cpp perfdatawriterconnection.hpp ) if(ICINGA2_UNITY_BUILD) diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp new file mode 100644 index 000000000..f8807c9ce --- /dev/null +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -0,0 +1,209 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "perfdata/perfdatawriterconnection.hpp" +#include "base/tcpsocket.hpp" +#include +#include +#include +#include + +using namespace icinga; +using HttpResponse = PerfdataWriterConnection::HttpResponse; + +PerfdataWriterConnection::PerfdataWriterConnection( + const ConfigObject::Ptr& parent, + String host, + String port, + Shared::Ptr sslContext, + bool verifyPeerCertificate +) + : PerfdataWriterConnection( + parent->GetReflectionType()->GetName(), + parent->GetName(), + std::move(host), + std::move(port), + std::move(sslContext), + verifyPeerCertificate + ) {}; + +PerfdataWriterConnection::PerfdataWriterConnection( + String logFacility, + String parentName, + String host, + String port, + Shared::Ptr sslContext, + bool verifyPeerCertificate +) + : m_VerifyPeerCertificate(verifyPeerCertificate), + m_SslContext(std::move(sslContext)), + m_LogFacility(std::move(logFacility)), + m_ParentName(std::move(parentName)), + m_Host(std::move(host)), + m_Port(std::move(port)), + m_ReconnectTimer(IoEngine::Get().GetIoContext()), + m_Strand(IoEngine::Get().GetIoContext()), + m_Stream(MakeStream()) +{ +} + +/** + * Get the current state of the connection. + */ +bool PerfdataWriterConnection::IsConnected() const +{ + return m_Connected; +} + +bool PerfdataWriterConnection::IsStopped() const +{ + return m_Stopped; +} + +void PerfdataWriterConnection::Disconnect() +{ + if (m_Stopped.exchange(true, std::memory_order_relaxed)) { + return; + } + + std::promise promise; + + IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { + try { + /* Cancel any outstanding operations of the other coroutine. + * Since we're on the same strand we're hopefully guaranteed that all cancellations + * result in exceptions thrown by the yield_context, even if its already queued for + * completion. + */ + std::visit( + [](const auto& stream) { + if (stream->lowest_layer().is_open()) { + stream->lowest_layer().cancel(); + } + }, + m_Stream + ); + m_ReconnectTimer.cancel(); + + Disconnect(std::move(yc)); + promise.set_value(); + } catch (const std::exception& ex) { + promise.set_exception(std::current_exception()); + } + }); + + promise.get_future().get(); +} + +AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const +{ + AsioTlsOrTcpStream ret; + if (m_SslContext) { + ret = Shared::Make(IoEngine::Get().GetIoContext(), *m_SslContext); + } else { + ret = Shared::Make(IoEngine::Get().GetIoContext()); + } + + return ret; +} + +/** + * Wait for the next attempt after an error, using a backoff algorithm. + * + * The waits between retries are doubled for each failure, up to a maximum of 32s, until it is + * reset by a successful attempt. + */ +void PerfdataWriterConnection::BackoffWait(const boost::asio::yield_context& yc) +{ + m_ReconnectTimer.expires_after(m_RetryTimeout); + if (m_RetryTimeout <= FinalRetryWait / 2) { + m_RetryTimeout *= 2; + } + m_ReconnectTimer.async_wait(yc); +} + +void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& yc) +{ + if (m_Connected) { + return; + } + + std::visit( + [&](auto& stream) { + ::Connect(stream->lowest_layer(), m_Host, m_Port, yc); + + if constexpr (std::is_same_v, Shared::Ptr>) { + using type = boost::asio::ssl::stream_base::handshake_type; + + stream->next_layer().async_handshake(type::client, yc); + + if (m_VerifyPeerCertificate) { + if (!stream->next_layer().IsVerifyOK()) { + BOOST_THROW_EXCEPTION( + std::runtime_error{ + "TLS certificate validation failed: " + stream->next_layer().GetVerifyError() + } + ); + } + } + } + }, + m_Stream + ); + + m_Connected = true; +} + +void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) +{ + if (!m_Connected.exchange(false, std::memory_order_relaxed)) { + return; + } + + std::visit( + [&](auto& stream) { + if constexpr (std::is_same_v, Shared::Ptr>) { + stream->GracefulDisconnect(m_Strand, yc); + } else { + stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both); + stream->lowest_layer().close(); + } + }, + m_Stream + ); + + m_Stream = MakeStream(); +} + +void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, const boost::asio::yield_context& yc) +{ + std::visit( + [&](auto& stream) { + boost::asio::async_write(*stream, buf, yc); + stream->async_flush(yc); + }, + m_Stream + ); +} + +HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc) +{ + boost::beast::http::response response; + std::visit( + [&](auto& stream) { + boost::beast::http::request_serializer sr{request}; + boost::beast::http::async_write(*stream, sr, yc); + stream->async_flush(yc); + + boost::beast::flat_buffer buf; + boost::beast::http::async_read(*stream, buf, response, yc); + }, + m_Stream + ); + + if (!response.keep_alive()) { + Disconnect(yc); + } + + return response; +} diff --git a/lib/perfdata/perfdatawriterconnection.hpp b/lib/perfdata/perfdatawriterconnection.hpp new file mode 100644 index 000000000..729878a29 --- /dev/null +++ b/lib/perfdata/perfdatawriterconnection.hpp @@ -0,0 +1,157 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include "base/io-engine.hpp" +#include "base/tlsstream.hpp" +#include +#include +#include +#include +#include + +namespace icinga { + +/** + * Class handling the connection to the various Perfdata backends. + */ +class PerfdataWriterConnection final : public Object +{ + static constexpr auto InitialRetryWait = 50ms; + static constexpr auto FinalRetryWait = 32s; + +public: + DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection); + + struct Stopped : std::exception + { + [[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; } + }; + + using HttpRequest = boost::beast::http::request; + using HttpResponse = boost::beast::http::response; + + PerfdataWriterConnection( + const ConfigObject::Ptr& parent, + String host, + String port, + Shared::Ptr sslContext = nullptr, + bool verifyPeerCertificate = true + ); + + PerfdataWriterConnection( + String logFacility, + String parentName, + String host, + String port, + Shared::Ptr sslContext = nullptr, + bool verifyPeerCertificate = true + ); + + /** + * Send the given data buffer to the server. + * + * To support each Buffer type this function needs an overload of the WriteMessage method. + * If the selected WriteMessage functions returns something, Send() will return that result. + * + * @param buf The buffer to send + * @return the return value returned by the WriteMessage overload for Buffer, otherwise void + */ + template + auto Send(Buffer&& buf) + { + if (m_Stopped) { + BOOST_THROW_EXCEPTION(Stopped{}); + } + + using RetType = decltype(WriteMessage(std::declval(), std::declval())); + std::promise promise; + + IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { + while (true) { + try { + EnsureConnected(yc); + + if constexpr (std::is_void_v) { + WriteMessage(std::forward(buf), yc); + promise.set_value(); + } else { + promise.set_value(WriteMessage(std::forward(buf), yc)); + } + + m_RetryTimeout = InitialRetryWait; + return; + } catch (const std::exception& ex) { + if (m_Stopped) { + promise.set_exception(std::make_exception_ptr(Stopped{})); + return; + } + + Log(LogCritical, m_LogFacility) + << "Error while " << (m_Connected ? "sending" : "connecting") << " to '" << m_Host << ":" + << m_Port << "' for '" << m_ParentName << "': " << ex.what(); + + m_Stream = MakeStream(); + m_Connected = false; + + try { + BackoffWait(yc); + } catch (const std::exception&) { + promise.set_exception(std::make_exception_ptr(Stopped{})); + return; + } + } + } + }); + + return promise.get_future().get(); + } + + void Disconnect(); + + /** + * Cancels ongoing operations either after a timeout or a future became ready. + * + * This will disconnect and set a flag so that no further Send() requests are accepted. + * + * @param future The future to wait for + * @param timeout The timeout after which ongoing operations are canceled + */ + template + void CancelAfterTimeout(const std::future& future, const std::chrono::duration& timeout) + { + future.wait_for(timeout); + Disconnect(); + } + + bool IsConnected() const; + bool IsStopped() const; + +private: + AsioTlsOrTcpStream MakeStream() const; + void BackoffWait(const boost::asio::yield_context& yc); + void EnsureConnected(const boost::asio::yield_context& yc); + void Disconnect(boost::asio::yield_context yc); + + void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc); + HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc); + + std::atomic_bool m_Stopped{false}; + std::atomic_bool m_Connected{false}; + + bool m_VerifyPeerCertificate; + Shared::Ptr m_SslContext; + + String m_LogFacility; + String m_ParentName; + String m_Host; + String m_Port; + + std::chrono::milliseconds m_RetryTimeout{InitialRetryWait}; + boost::asio::steady_timer m_ReconnectTimer; + boost::asio::io_context::strand m_Strand; + AsioTlsOrTcpStream m_Stream; +}; + +} // namespace icinga From da2fc9df01f85aff96c9eba13e07b203b82dac24 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 2 Feb 2026 13:43:31 +0100 Subject: [PATCH 7/9] Use PerfdataWriterConnection in perfdata writers --- lib/perfdata/elasticsearchwriter.cpp | 199 +++++++------------------ lib/perfdata/elasticsearchwriter.hpp | 12 +- lib/perfdata/elasticsearchwriter.ti | 5 +- lib/perfdata/gelfwriter.cpp | 204 ++++++-------------------- lib/perfdata/gelfwriter.hpp | 18 +-- lib/perfdata/gelfwriter.ti | 5 +- lib/perfdata/graphitewriter.cpp | 152 +++---------------- lib/perfdata/graphitewriter.hpp | 18 +-- lib/perfdata/graphitewriter.ti | 5 +- lib/perfdata/influxdbcommonwriter.cpp | 184 ++++++----------------- lib/perfdata/influxdbcommonwriter.hpp | 16 +- lib/perfdata/influxdbcommonwriter.ti | 5 +- lib/perfdata/opentsdbwriter.cpp | 87 +++-------- lib/perfdata/opentsdbwriter.hpp | 11 +- lib/perfdata/opentsdbwriter.ti | 6 +- 15 files changed, 219 insertions(+), 708 deletions(-) diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp index 9445a16e0..044656966 100644 --- a/lib/perfdata/elasticsearchwriter.cpp +++ b/lib/perfdata/elasticsearchwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/elasticsearchwriter.hpp" +#include "base/defer.hpp" #include "perfdata/elasticsearchwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/compatutility.hpp" @@ -9,30 +10,14 @@ #include "icinga/macroprocessor.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" -#include "base/defer.hpp" -#include "base/io-engine.hpp" -#include "base/tcpsocket.hpp" #include "base/stream.hpp" #include "base/base64.hpp" #include "base/json.hpp" #include "base/utility.hpp" -#include "base/networkstream.hpp" #include "base/perfdatavalue.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include @@ -78,12 +63,25 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array:: status->Set("elasticsearchwriter", new Dictionary(std::move(nodes))); } +void ElasticsearchWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + if (GetEnableTls()) { + try { + m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogCritical, "ElasticsearchWriter") + << "Unable to create SSL context: " << ex.what(); + throw; + } + } +} + void ElasticsearchWriter::Resume() { ObjectImpl::Resume(); - m_EventPrefix = "icinga2.event."; - Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' resumed."; @@ -96,6 +94,8 @@ void ElasticsearchWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; + /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -120,12 +120,17 @@ void ElasticsearchWriter::Pause() m_HandleNotifications.disconnect(); m_FlushTimer->Stop(true); - m_WorkQueue.Join(); - { - std::unique_lock lock (m_DataBufferMutex); + std::promise queueDonePromise; + m_WorkQueue.Enqueue([&]() { Flush(); - } + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + + m_WorkQueue.Join(); Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' paused."; @@ -269,6 +274,10 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -308,6 +317,10 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -358,6 +371,10 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "ElasticsearchWriter") @@ -379,15 +396,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& { AssertOnWorkQueue(); - /* Atomically buffer the data point. */ - std::unique_lock lock(m_DataBufferMutex); - /* Format the timestamps to dynamically select the date datatype inside the index. */ fields->Set("@timestamp", FormatTimestamp(ts)); fields->Set("timestamp", FormatTimestamp(ts)); - - String eventType = m_EventPrefix + type; - fields->Set("type", eventType); + fields->Set("type", "icinga2.event." + type); /* Every payload needs a line describing the index. * We do it this way to avoid problems with a near full queue. @@ -408,19 +420,21 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& } } +/** + * Queues a Flush on the work-queue if there isn't one queued already. + */ void ElasticsearchWriter::FlushTimeout() { - /* Prevent new data points from being added to the array, there is a - * race condition where they could disappear. - */ - std::unique_lock lock(m_DataBufferMutex); - - /* Flush if there are any data available. */ - if (m_DataBuffer.size() > 0) { - Log(LogDebug, "ElasticsearchWriter") - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; } + + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{ + [&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); } + }; + Flush(); + }); } void ElasticsearchWriter::Flush() @@ -466,22 +480,6 @@ void ElasticsearchWriter::SendRequest(const String& body) url->SetPath(path); - OptionalTlsStream stream; - - try { - stream = Connect(); - } catch (const std::exception& ex) { - Log(LogWarning, "ElasticsearchWriter") - << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false); - return; - } - - Defer s ([&stream]() { - if (stream.first) { - stream.first->next_layer().shutdown(); - } - }); - http::request request (http::verb::post, std::string(url->Format(true)), 10); request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); @@ -511,37 +509,14 @@ void ElasticsearchWriter::SendRequest(const String& body) << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" ) << " to '" << url->Format() << "'."; + decltype(m_Connection->Send(request)) response; try { - if (stream.first) { - http::write(*stream.first, request); - stream.first->flush(); - } else { - http::write(*stream.second, request); - stream.second->flush(); - } - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; + response = m_Connection->Send(request); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "ElasticsearchWriter") << ex.what(); + return; } - http::parser parser; - beast::flat_buffer buf; - - try { - if (stream.first) { - http::read(*stream.first, buf, parser); - } else { - http::read(*stream.second, buf, parser); - } - } catch (const std::exception& ex) { - Log(LogWarning, "ElasticsearchWriter") - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false); - throw; - } - - auto& response (parser.get()); - if (response.result_int() > 299) { if (response.result() == http::status::unauthorized) { /* More verbose error logging with Elasticsearch is hidden behind a proxy. */ @@ -589,66 +564,6 @@ void ElasticsearchWriter::SendRequest(const String& body) } } -OptionalTlsStream ElasticsearchWriter::Connect() -{ - Log(LogNotice, "ElasticsearchWriter") - << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - - OptionalTlsStream stream; - bool tls = GetEnableTls(); - - if (tls) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Unable to create SSL context."; - throw; - } - - stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - if (tls) { - auto& tlsStream (stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed."; - throw; - } - - if (!GetInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - return stream; -} - void ElasticsearchWriter::AssertOnWorkQueue() { ASSERT(m_WorkQueue.IsWorkerThread()); diff --git a/lib/perfdata/elasticsearchwriter.hpp b/lib/perfdata/elasticsearchwriter.hpp index c67c4324b..e83f9c935 100644 --- a/lib/perfdata/elasticsearchwriter.hpp +++ b/lib/perfdata/elasticsearchwriter.hpp @@ -5,11 +5,10 @@ #define ELASTICSEARCHWRITER_H #include "perfdata/elasticsearchwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" #include "base/workqueue.hpp" -#include "base/timer.hpp" -#include "base/tlsstream.hpp" +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -29,16 +28,18 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: - String m_EventPrefix; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications; Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; std::vector m_DataBuffer; - std::mutex m_DataBufferMutex; + Shared::Ptr m_SslContext; + PerfdataWriterConnection::Ptr m_Connection; void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -51,7 +52,6 @@ private: void Enqueue(const Checkable::Ptr& checkable, const String& type, const Dictionary::Ptr& fields, double ts); - OptionalTlsStream Connect(); void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); void FlushTimeout(); diff --git a/lib/perfdata/elasticsearchwriter.ti b/lib/perfdata/elasticsearchwriter.ti index 45c468804..c5e7bc3e1 100644 --- a/lib/perfdata/elasticsearchwriter.ti +++ b/lib/perfdata/elasticsearchwriter.ti @@ -40,7 +40,10 @@ class ElasticsearchWriter : ConfigObject [config] String cert_path; [config] String key_path; - [config] int flush_interval { + [config] double disconnect_timeout { + default {{{ return 10; }}} + }; + [config] double flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp index c2bff71dd..6f8567f70 100644 --- a/lib/perfdata/gelfwriter.cpp +++ b/lib/perfdata/gelfwriter.cpp @@ -6,28 +6,19 @@ #include "icinga/service.hpp" #include "icinga/notification.hpp" #include "icinga/checkcommand.hpp" -#include "icinga/macroprocessor.hpp" #include "icinga/compatutility.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/context.hpp" #include "base/exception.hpp" #include "base/json.hpp" #include "base/statsfunction.hpp" #include #include -#include "base/io-engine.hpp" -#include -#include -#include -#include using namespace icinga; @@ -62,7 +53,7 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf nodes.emplace_back(gelfwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", gelfwriter->GetConnected() }, + { "connected", gelfwriter->m_Connection->IsConnected() }, { "source", gelfwriter->GetSource() } })); @@ -73,6 +64,22 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf status->Set("gelfwriter", new Dictionary(std::move(nodes))); } +void GelfWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + /* Initialize connection */ + if (GetEnableTls()) { + try { + m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "Unable to create SSL context."; + throw; + } + } +} + void GelfWriter::Resume() { ObjectImpl::Resume(); @@ -83,12 +90,7 @@ void GelfWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - /* Timer for reconnecting */ - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -113,18 +115,15 @@ void GelfWriter::Pause() m_HandleNotifications.disconnect(); m_HandleStateChanges.disconnect(); - m_ReconnectTimer->Stop(true); + std::promise queueDonePromise; - m_WorkQueue.Enqueue([this]() { - try { - ReconnectInternal(); - } catch (const std::exception&) { - Log(LogInformation, "GelfWriter") - << "Unable to connect, not flushing buffers. Data may be lost."; - } - }, PriorityImmediate); + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); - m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow); m_WorkQueue.Join(); Log(LogInformation, "GelfWriter") @@ -142,126 +141,6 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp) { Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false); Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true); - - DisconnectInternal(); -} - -void GelfWriter::Reconnect() -{ - AssertOnWorkQueue(); - - if (IsPaused()) { - SetConnected(false); - return; - } - - ReconnectInternal(); -} - -void GelfWriter::ReconnectInternal() -{ - double startTime = Utility::GetTime(); - - CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'"); - - SetShouldConnect(true); - - if (GetConnected()) - return; - - Log(LogNotice, "GelfWriter") - << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'."; - - bool ssl = GetEnableTls(); - - if (ssl) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "Unable to create SSL context."; - throw; - } - - m_Stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - m_Stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'"; - throw; - } - - if (ssl) { - auto& tlsStream (m_Stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "TLS handshake with host '" << GetHost() << " failed.'"; - throw; - } - - if (!GetInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - SetConnected(true); - - Log(LogInformation, "GelfWriter") - << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - -void GelfWriter::ReconnectTimerHandler() -{ - m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal); -} - -void GelfWriter::Disconnect() -{ - AssertOnWorkQueue(); - - DisconnectInternal(); -} - -void GelfWriter::DisconnectInternal() -{ - if (!GetConnected()) - return; - - if (m_Stream.first) { - boost::system::error_code ec; - m_Stream.first->next_layer().shutdown(ec); - - // https://stackoverflow.com/a/25703699 - // As long as the error code's category is not an SSL category, then the protocol was securely shutdown - if (ec.category() == boost::asio::error::get_ssl_category()) { - Log(LogCritical, "GelfWriter") - << "TLS shutdown with host '" << GetHost() << "' could not be done securely."; - } - } else if (m_Stream.second) { - m_Stream.second->close(); - } - - SetConnected(false); - } void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -298,6 +177,10 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_command", checkCommand->GetName()); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -405,6 +288,10 @@ void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, Noti fields->Set("_check_command", checkable->GetCheckCommand()->GetName()); m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -447,6 +334,10 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_source", cr->GetCheckSource()); m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing state change '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -473,26 +364,15 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g msgbuf << gelfMessage; msgbuf << '\0'; - String log = msgbuf.str(); - - if (!GetConnected()) - return; + auto log = msgbuf.str(); try { Log(LogDebug, "GelfWriter") << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'."; - if (m_Stream.first) { - boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str())); - m_Stream.first->flush(); - } else { - boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str())); - m_Stream.second->flush(); - } - } catch (const std::exception& ex) { - Log(LogCritical, "GelfWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - - throw ex; + m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()}); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "GelfWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index e24b6e6ea..f7d2a10c3 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -5,12 +5,10 @@ #define GELFWRITER_H #include "perfdata/gelfwriter-ti.hpp" -#include "icinga/service.hpp" +#include "perfdata/perfdatawriterconnection.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" #include "base/workqueue.hpp" -#include namespace icinga { @@ -30,15 +28,16 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: - OptionalTlsStream m_Stream; + PerfdataWriterConnection::Ptr m_Connection; WorkQueue m_WorkQueue{10000000, 1}; + Shared::Ptr m_SslContext; boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges; - Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr, @@ -48,13 +47,6 @@ private: String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts); void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage); - void ReconnectTimerHandler(); - - void Disconnect(); - void DisconnectInternal(); - void Reconnect(); - void ReconnectInternal(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti index b04debbb4..46c194d1a 100644 --- a/lib/perfdata/gelfwriter.ti +++ b/lib/perfdata/gelfwriter.ti @@ -25,9 +25,8 @@ class GelfWriter : ConfigObject default {{{ return false; }}} }; - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; [config] bool enable_ha { default {{{ return false; }}} diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index 652b7d3d1..e00cd9275 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -7,16 +7,13 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -65,7 +62,7 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& nodes.emplace_back(graphitewriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", graphitewriter->GetConnected() } + { "connected", graphitewriter->m_Connection->IsConnected() } })); perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems)); @@ -88,12 +85,7 @@ void GraphiteWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - /* Timer for reconnecting */ - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -108,20 +100,17 @@ void GraphiteWriter::Resume() void GraphiteWriter::Pause() { m_HandleCheckResults.disconnect(); - m_ReconnectTimer->Stop(true); - try { - ReconnectInternal(); - } catch (const std::exception&) { - Log(LogInformation, "GraphiteWriter") - << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload."; + std::promise queueDonePromise; - ObjectImpl::Pause(); - return; - } + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); m_WorkQueue.Join(); - DisconnectInternal(); Log(LogInformation, "GraphiteWriter") << "'" << GetName() << "' paused."; @@ -150,105 +139,6 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, "GraphiteWriter") << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp)); - - if (GetConnected()) { - m_Stream->close(); - - SetConnected(false); - } -} - -/** - * Reconnect method, stops when the feature is paused in HA zones. - * - * Called inside the WQ. - */ -void GraphiteWriter::Reconnect() -{ - AssertOnWorkQueue(); - - if (IsPaused()) { - SetConnected(false); - return; - } - - ReconnectInternal(); -} - -/** - * Reconnect method, connects to a TCP Stream - */ -void GraphiteWriter::ReconnectInternal() -{ - double startTime = Utility::GetTime(); - - CONTEXT("Reconnecting to Graphite '" << GetName() << "'"); - - SetShouldConnect(true); - - if (GetConnected()) - return; - - Log(LogNotice, "GraphiteWriter") - << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; - - m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); - - try { - icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "GraphiteWriter") - << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'"; - - SetConnected(false); - - throw; - } - - SetConnected(true); - - Log(LogInformation, "GraphiteWriter") - << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - -/** - * Reconnect handler called by the timer. - * - * Enqueues a reconnect task into the WQ. - */ -void GraphiteWriter::ReconnectTimerHandler() -{ - if (IsPaused()) - return; - - m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh); -} - -/** - * Disconnect the stream. - * - * Called inside the WQ. - */ -void GraphiteWriter::Disconnect() -{ - AssertOnWorkQueue(); - - DisconnectInternal(); -} - -/** - * Disconnect the stream. - * - * Called outside the WQ. - */ -void GraphiteWriter::DisconnectInternal() -{ - if (!GetConnected()) - return; - - m_Stream->close(); - - SetConnected(false); } /** @@ -302,11 +192,11 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C } m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() { - CONTEXT("Processing check result for '" << checkable->GetName() << "'"); + if (m_Connection->IsStopped()) { + return; + } - /* TODO: Deal with missing connection here. Needs refactoring - * into parsing the actual performance data and then putting it - * into a queue for re-inserting. */ + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); for (auto& [name, val] : metadata) { SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd()); @@ -394,19 +284,11 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p // do not send \n to debug log msgbuf << "\n"; - std::unique_lock lock(m_StreamMutex); - - if (!GetConnected()) - return; - try { - asio::write(*m_Stream, asio::buffer(msgbuf.str())); - m_Stream->flush(); - } catch (const std::exception& ex) { - Log(LogCritical, "GraphiteWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - - throw ex; + m_Connection->Send(asio::buffer(msgbuf.str())); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "GraphiteWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index b28db8172..470fcc07d 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -5,13 +5,10 @@ #define GRAPHITEWRITER_H #include "perfdata/graphitewriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" #include "base/workqueue.hpp" -#include -#include +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -38,12 +35,10 @@ protected: void Pause() override; private: - Shared::Ptr m_Stream; - std::mutex m_StreamMutex; + PerfdataWriterConnection::Ptr m_Connection; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults; - Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts); @@ -52,13 +47,6 @@ private: static String EscapeMetricLabel(const String& str); static Value EscapeMacroMetric(const Value& value); - void ReconnectTimerHandler(); - - void Disconnect(); - void DisconnectInternal(); - void Reconnect(); - void ReconnectInternal(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti index d89e879dd..f0d9bfb80 100644 --- a/lib/perfdata/graphitewriter.ti +++ b/lib/perfdata/graphitewriter.ti @@ -27,9 +27,8 @@ class GraphiteWriter : ConfigObject [config] bool enable_send_thresholds; [config] bool enable_send_metadata; - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; [config] bool enable_ha { default {{{ return false; }}} diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index b097dfa8a..62af9d1d9 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/influxdbcommonwriter.hpp" +#include "base/defer.hpp" #include "perfdata/influxdbcommonwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/service.hpp" @@ -9,36 +10,15 @@ #include "icinga/icingaapplication.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" -#include "base/defer.hpp" -#include "base/io-engine.hpp" -#include "base/tcpsocket.hpp" -#include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" -#include "base/convert.hpp" -#include "base/utility.hpp" -#include "base/stream.hpp" #include "base/json.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" -#include "base/statsfunction.hpp" -#include "base/tlsutility.hpp" #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include -#include #include #include @@ -80,6 +60,21 @@ void InfluxdbCommonWriter::OnConfigLoaded() } } +void InfluxdbCommonWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + if (GetSslEnable()) { + try { + m_SslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + } catch (const std::exception& ex) { + Log(LogCritical, GetReflectionType()->GetName()) + << "Unable to create SSL context: " << ex.what(); + throw; + } + } +} + void InfluxdbCommonWriter::Resume() { ObjectImpl::Resume(); @@ -97,6 +92,8 @@ void InfluxdbCommonWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetSslInsecureNoverify()}; + /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -114,7 +111,15 @@ void InfluxdbCommonWriter::Pause() << "Processing pending tasks and flushing data buffers."; m_FlushTimer->Stop(true); - m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + + std::promise queueDonePromise; + m_WorkQueue.Enqueue([&]() { + FlushWQ(); + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); @@ -136,68 +141,6 @@ void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, GetReflectionType()->GetName()) << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp)); - - //TODO: Close the connection, if we keep it open. -} - -OptionalTlsStream InfluxdbCommonWriter::Connect() -{ - Log(LogNotice, GetReflectionType()->GetName()) - << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - - OptionalTlsStream stream; - bool ssl = GetSslEnable(); - - if (ssl) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Unable to create SSL context."; - throw; - } - - stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - if (ssl) { - auto& tlsStream (stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "TLS handshake with host '" << GetHost() << "' failed."; - throw; - } - - if (!GetSslInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - return stream; } void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -261,6 +204,10 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c } m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); double ts = cr->GetExecutionEnd(); @@ -411,19 +358,19 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic } } +/** + * Queues a Flush on the work-queue and restarts the timer. + */ void InfluxdbCommonWriter::FlushTimeout() { - m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh); -} + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; + } -void InfluxdbCommonWriter::FlushTimeoutWQ() -{ - AssertOnWorkQueue(); - - Log(LogDebug, GetReflectionType()->GetName()) - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - - FlushWQ(); + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; + FlushWQ(); + }); } void InfluxdbCommonWriter::FlushWQ() @@ -444,55 +391,16 @@ void InfluxdbCommonWriter::FlushWQ() m_DataBuffer.clear(); m_DataBufferSize = 0; - OptionalTlsStream stream; - - try { - stream = Connect(); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false); - return; - } - - Defer s ([&stream]() { - if (stream.first) { - stream.first->next_layer().shutdown(); - } - }); - auto request (AssembleRequest(std::move(body))); + decltype(m_Connection->Send(request)) response; try { - if (stream.first) { - http::write(*stream.first, request); - stream.first->flush(); - } else { - http::write(*stream.second, request); - stream.second->flush(); - } - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; + response = m_Connection->Send(request); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, GetReflectionType()->GetName()) << ex.what(); + return; } - http::parser parser; - beast::flat_buffer buf; - - try { - if (stream.first) { - http::read(*stream.first, buf, parser); - } else { - http::read(*stream.second, buf, parser); - } - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); - throw; - } - - auto& response (parser.get()); - if (response.result() != http::status::no_content) { Log(LogWarning, GetReflectionType()->GetName()) << "Unexpected response code: " << response.result(); diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index 35caa2f25..cfda02501 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -5,18 +5,13 @@ #define INFLUXDBCOMMONWRITER_H #include "perfdata/influxdbcommonwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" #include "base/perfdatavalue.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" -#include "base/tlsstream.hpp" #include "base/workqueue.hpp" #include "remote/url.hpp" -#include -#include +#include "perfdata/perfdatawriterconnection.hpp" #include -#include namespace icinga { @@ -39,6 +34,7 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; @@ -50,22 +46,22 @@ protected: private: boost::signals2::connection m_HandleCheckResults; Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; std::atomic_size_t m_DataBufferSize{0}; + Shared::Ptr m_SslContext; + PerfdataWriterConnection::Ptr m_Connection; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); - void FlushTimeoutWQ(); void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); - OptionalTlsStream Connect(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti index 12e867824..7074bceaf 100644 --- a/lib/perfdata/influxdbcommonwriter.ti +++ b/lib/perfdata/influxdbcommonwriter.ti @@ -52,13 +52,16 @@ abstract class InfluxdbCommonWriter : ConfigObject }); }}} }; + [config] double disconnect_timeout { + default {{{ return 10; }}} + }; [config] bool enable_send_thresholds { default {{{ return false; }}} }; [config] bool enable_send_metadata { default {{{ return false; }}} }; - [config] int flush_interval { + [config] double flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 95b43ba76..1b2f82a7d 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -7,17 +7,12 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "icinga/compatutility.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" -#include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -64,7 +59,7 @@ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& nodes.emplace_back( opentsdbwriter->GetName(), new Dictionary({ - {"connected", opentsdbwriter->GetConnected()}, + { "connected", opentsdbwriter->m_Connection->IsConnected() }, {"work_queue_items", workQueueItems}, {"work_queue_item_rate", workQueueItemRate} } @@ -95,11 +90,7 @@ void OpenTsdbWriter::Resume() ReadConfigTemplate(); - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { CheckResultHandler(checkable, cr); @@ -112,62 +103,24 @@ void OpenTsdbWriter::Resume() void OpenTsdbWriter::Pause() { m_HandleCheckResults.disconnect(); - m_ReconnectTimer->Stop(true); + + std::promise queueDonePromise; + + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); m_WorkQueue.Join(); Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; - m_Stream->close(); - - SetConnected(false); - ObjectImpl::Pause(); } -/** - * Reconnect handler called by the timer. - * Handles TLS - */ -void OpenTsdbWriter::ReconnectTimerHandler() -{ - if (IsPaused()) - return; - - SetShouldConnect(true); - - if (GetConnected()) - return; - - double startTime = Utility::GetTime(); - - Log(LogNotice, "OpenTsdbWriter") - << "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'."; - - /* - * We're using telnet as input method. Future PRs may change this into using the HTTP API. - * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet - */ - m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); - - try { - icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "OpenTsdbWriter") - << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - - SetConnected(false); - - return; - } - - SetConnected(true); - - Log(LogInformation, "OpenTsdbWriter") - << "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - /** * Registered check result handler processing data. * Calculates tags from the config. @@ -300,7 +253,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement()); m_WorkQueue.Enqueue( - [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata), ts]() mutable { + [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable { + if (m_Connection->IsStopped()) { + return; + } + double ts = cr->GetExecutionEnd(); for (auto& [name, val] : metadata) { @@ -437,18 +394,14 @@ void OpenTsdbWriter::SendMsgBuffer() { ASSERT(m_WorkQueue.IsWorkerThread()); - if (!GetConnected()) - return; - Log(LogDebug, "OpenTsdbWriter") << "Flushing data buffer to OpenTsdb."; try { - boost::asio::write(*m_Stream, boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); - m_Stream->flush(); - } catch (const std::exception& ex) { - Log(LogCritical, "OpenTsdbWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "OpenTsdbWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index eb85f4436..5db298540 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -5,11 +5,9 @@ #define OPENTSDBWRITER_H #include "perfdata/opentsdbwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" -#include +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -38,10 +36,9 @@ protected: private: WorkQueue m_WorkQueue{10000000, 1}; std::string m_MsgBuf; - Shared::Ptr m_Stream; + PerfdataWriterConnection::Ptr m_Connection; boost::signals2::connection m_HandleCheckResults; - Timer::Ptr m_ReconnectTimer; Dictionary::Ptr m_ServiceConfigTemplate; Dictionary::Ptr m_HostConfigTemplate; @@ -55,8 +52,6 @@ private: static String EscapeTag(const String& str); static String EscapeMetric(const String& str); - void ReconnectTimerHandler(); - void ReadConfigTemplate(); }; diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index 56bc8cdf4..dcad57168 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -31,10 +31,8 @@ class OpenTsdbWriter : ConfigObject [config] bool enable_generic_metrics { default {{{ return false; }}} }; - - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; }; From 2ac0b8aeb4824182a7cefec096c848d09186cf6a Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 25 Feb 2026 08:25:52 +0100 Subject: [PATCH 8/9] Add documentation for the new disconnect_timeout config options --- doc/09-object-types.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/doc/09-object-types.md b/doc/09-object-types.md index 23bbcff47..2979fe4a4 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1214,6 +1214,7 @@ Configuration Attributes: port | Number | **Required.** Elasticsearch port. Defaults to `9200`. index | String | **Required.** Prefix for the index names. Defaults to `icinga2`. enable\_send\_perfdata | Boolean | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Elasticsearch before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`. username | String | **Optional.** Basic auth username if Elasticsearch is hidden behind an HTTP proxy. @@ -1310,6 +1311,7 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** GELF receiver host address. Defaults to `127.0.0.1`. port | Number | **Optional.** GELF receiver port. Defaults to `12201`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`. source | String | **Optional.** Source name for this instance. Defaults to `icinga2`. enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1340,6 +1342,7 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** Graphite Carbon host address. Defaults to `127.0.0.1`. port | Number | **Optional.** Graphite Carbon port. Defaults to `2003`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Graphite before disconnecting. Defaults to `10s`. host\_name\_template | String | **Optional.** Metric prefix for host name. Defaults to `icinga2.$host.name$.host.$host.check_command$`. service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`. enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`. @@ -1682,6 +1685,7 @@ Configuration Attributes: service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol. enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data. enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1745,6 +1749,7 @@ Configuration Attributes: service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol. enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data. enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1860,6 +1865,7 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`. port | Number | **Optional.** OpenTSDB port. Defaults to `4242`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`. host_template | Dictionary | **Optional.** Specify additional tags to be included with host metrics. This requires a sub-dictionary named `tags`. Also specify a naming prefix by setting `metric`. More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags) and [OpenTSDB Metric Prefix](14-features.md#opentsdb-metric-prefix). More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags). Defaults to an `empty Dictionary`. From 75b2ec6d969df54ba3bece5ede112220b3d577cd Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 2 Feb 2026 13:42:11 +0100 Subject: [PATCH 9/9] Add unit-tests for PerfdataWriterConnection There's a set of two tests for each perfdatawriter, just to make sure they can connect and send data that looks reasonably correct, and to make sure pausing actually works while the connection is stuck. Then there's a more in-depth suite of tests for PerfdataWriterConnection itself, to verify that connection handling works well in all types of scenarios. Co-authored-by: Yonas Habteab --- test/CMakeLists.txt | 12 + test/perfdata-elasticsearchwriter.cpp | 57 ++++ test/perfdata-gelfwriter.cpp | 48 +++ test/perfdata-graphitewriter.cpp | 47 +++ test/perfdata-influxdbwriter.cpp | 50 +++ test/perfdata-opentsdbwriter.cpp | 53 ++++ test/perfdata-perfdatatargetfixture.hpp | 198 ++++++++++++ test/perfdata-perfdatawriterconnection.cpp | 335 +++++++++++++++++++++ test/perfdata-perfdatawriterfixture.hpp | 139 +++++++++ test/utils.cpp | 14 + test/utils.hpp | 2 + 11 files changed, 955 insertions(+) create mode 100644 test/perfdata-elasticsearchwriter.cpp create mode 100644 test/perfdata-gelfwriter.cpp create mode 100644 test/perfdata-graphitewriter.cpp create mode 100644 test/perfdata-influxdbwriter.cpp create mode 100644 test/perfdata-opentsdbwriter.cpp create mode 100644 test/perfdata-perfdatatargetfixture.hpp create mode 100644 test/perfdata-perfdatawriterconnection.cpp create mode 100644 test/perfdata-perfdatawriterfixture.hpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f628fc8a4..d1f0906cb 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -140,6 +140,18 @@ if(ICINGA2_WITH_NOTIFICATION) ) endif() +if(ICINGA2_WITH_PERFDATA) + list(APPEND base_test_SOURCES + perfdata-elasticsearchwriter.cpp + perfdata-gelfwriter.cpp + perfdata-graphitewriter.cpp + perfdata-influxdbwriter.cpp + perfdata-opentsdbwriter.cpp + perfdata-perfdatawriterconnection.cpp + $ + ) +endif() + if(ICINGA2_UNITY_BUILD) mkunity_target(base test base_test_SOURCES) endif() diff --git a/test/perfdata-elasticsearchwriter.cpp b/test/perfdata-elasticsearchwriter.cpp new file mode 100644 index 000000000..ac6abac8d --- /dev/null +++ b/test/perfdata-elasticsearchwriter.cpp @@ -0,0 +1,57 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "perfdata/elasticsearchwriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_elasticsearchwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto resp = GetSplitDecodedRequestBody(); + SendResponse(); + + // ElasticsearchWriter wants to send the same message twice, once for the check result + // and once for the "state change". + resp = GetSplitDecodedRequestBody(); + SendResponse(); + + // Just some basic sanity tests. It's not important to check if everything is entirely + // correct here. + BOOST_REQUIRE_GT(resp->GetLength(), 1); + Dictionary::Ptr cr = resp->Get(1); + BOOST_CHECK(cr->Contains("@timestamp")); + BOOST_CHECK_EQUAL(cr->Get("check_command"), "dummy"); + BOOST_CHECK_EQUAL(cr->Get("host"), "h1"); + + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now try to pause. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'ElasticsearchWriter' paused\\.", 10s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-gelfwriter.cpp b/test/perfdata-gelfwriter.cpp new file mode 100644 index 000000000..8da07bc4a --- /dev/null +++ b/test/perfdata-gelfwriter.cpp @@ -0,0 +1,48 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "perfdata/gelfwriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_gelfwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + Dictionary::Ptr resp = JsonDecode(GetDataUntil('\0')); + + // Just some basic sanity tests. It's not important to check if everything is entirely + // correct here. + BOOST_CHECK_CLOSE(resp->Get("timestamp").Get(), Utility::GetTime(), 0.5); + BOOST_CHECK_EQUAL(resp->Get("_check_command"), "dummy"); + BOOST_CHECK_EQUAL(resp->Get("_hostname"), "h1"); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now stop reading and try to pause OpenTsdbWriter. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 1s); + REQUIRE_LOG_MESSAGE("'GelfWriter' paused\\.", 1s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-graphitewriter.cpp b/test/perfdata-graphitewriter.cpp new file mode 100644 index 000000000..9b5789fe2 --- /dev/null +++ b/test/perfdata-graphitewriter.cpp @@ -0,0 +1,47 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "base/perfdatavalue.hpp" +#include "perfdata/graphitewriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_graphitewriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto msg = GetDataUntil('\n'); + + // Just some basic sanity tests. It's not important to check if everything is entirely correct here. + std::string_view cmpStr{"icinga2.h1.host.dummy.perfdata.dummy.value 42"}; + BOOST_REQUIRE_EQUAL(msg.substr(0, cmpStr.length()), cmpStr); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now stop reading and try to pause OpenTsdbWriter. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'GraphiteWriter' paused\\.", 10s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-influxdbwriter.cpp b/test/perfdata-influxdbwriter.cpp new file mode 100644 index 000000000..43837b50f --- /dev/null +++ b/test/perfdata-influxdbwriter.cpp @@ -0,0 +1,50 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "perfdata/influxdb2writer.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_influxdbwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto req = GetSplitRequestBody(','); + SendResponse(boost::beast::http::status::no_content); + + // Just some basic sanity tests. It's not important to check if everything is entirely + // correct here. + BOOST_REQUIRE_EQUAL(req.size(), 3); + BOOST_CHECK_EQUAL(req[0], "dummy"); + BOOST_CHECK_EQUAL(req[1], "hostname=h1"); + std::string_view perfData = "metric=dummy value=42"; + BOOST_CHECK_EQUAL(req[2].substr(0, perfData.length()), perfData); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now try to pause. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'Influxdb2Writer' paused\\.", 1s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-opentsdbwriter.cpp b/test/perfdata-opentsdbwriter.cpp new file mode 100644 index 000000000..c3ec47d6d --- /dev/null +++ b/test/perfdata-opentsdbwriter.cpp @@ -0,0 +1,53 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "base/perfdatavalue.hpp" +#include "perfdata/opentsdbwriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_opentsdbwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto msg = GetDataUntil('\n'); + std::vector splitMsg; + boost::split(splitMsg, msg, boost::is_any_of(" ")); + + // Just some basic sanity tests. It's not important to check if everything is entirely correct here. + BOOST_REQUIRE_EQUAL(splitMsg.size(), 5); + BOOST_REQUIRE_EQUAL(splitMsg[0], "put"); + BOOST_REQUIRE_EQUAL(splitMsg[1], "icinga.host.state"); + BOOST_REQUIRE_CLOSE(boost::lexical_cast(splitMsg[2]), Utility::GetTime(), 1); + BOOST_REQUIRE_EQUAL(splitMsg[3], "1"); + BOOST_REQUIRE_EQUAL(splitMsg[4], "host=h1"); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now stop reading and try to pause OpenTsdbWriter. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'OpenTsdbWriter' paused\\.", 10s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-perfdatatargetfixture.hpp b/test/perfdata-perfdatatargetfixture.hpp new file mode 100644 index 000000000..871610ff5 --- /dev/null +++ b/test/perfdata-perfdatatargetfixture.hpp @@ -0,0 +1,198 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include "base/io-engine.hpp" +#include "base/json.hpp" +#include "base/tlsstream.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace icinga { + +/** + * A fixture that provides methods to simulate a perfdata target + */ +class PerfdataWriterTargetFixture +{ +public: + PerfdataWriterTargetFixture() + : icinga::PerfdataWriterTargetFixture(Shared::Make(IoEngine::Get().GetIoContext())) + { + } + + explicit PerfdataWriterTargetFixture(const Shared::Ptr& sslCtx) + : icinga::PerfdataWriterTargetFixture(Shared::Make(IoEngine::Get().GetIoContext(), *sslCtx)) + { + m_SslContext = sslCtx; + } + + explicit PerfdataWriterTargetFixture(AsioTlsOrTcpStream stream) + : m_Stream(std::move(stream)), + m_Acceptor( + IoEngine::Get().GetIoContext(), + boost::asio::ip::tcp::endpoint{boost::asio::ip::address_v4::loopback(), 0} + ) + { + } + + unsigned short GetPort() { return m_Acceptor.local_endpoint().port(); } + + void Accept() + { + BOOST_REQUIRE_NO_THROW( + std::visit([&](auto& stream) { return m_Acceptor.accept(stream->lowest_layer()); }, m_Stream) + ); + } + + void Handshake() + { + BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); + using handshake_type = UnbufferedAsioTlsStream::handshake_type; + auto& stream = std::get::Ptr>(m_Stream); + BOOST_REQUIRE_NO_THROW(stream->next_layer().handshake(handshake_type::server)); + BOOST_REQUIRE(stream->next_layer().IsVerifyOK()); + } + + void Shutdown() + { + BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); + auto& stream = std::get::Ptr>(m_Stream); + try { + stream->next_layer().shutdown(); + } catch (const std::exception& ex) { + if (const auto* se = dynamic_cast(&ex); + !se || se->code() != boost::asio::error::eof) { + BOOST_FAIL("Exception in shutdown(): " << ex.what()); + } + } + + ResetStream(); + } + + void ResetStream() + { + if (std::holds_alternative::Ptr>(m_Stream)) { + m_Stream = Shared::Make(IoEngine::Get().GetIoContext(), *m_SslContext); + } else { + m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); + } + } + + /** + * Reads the HTTP request body from the stream, with an optional limit on the number of bytes to read. + * + * @param bytes The maximum number of bytes to read from the request body. If 0, there is no limit and the entire body will be read. + * + * @return The HTTP request read from the stream. + */ + boost::beast::http::request GetRequest(std::size_t bytes = 0) + { + using namespace boost::beast; + + boost::beast::flat_buffer buf; + if (bytes > 0) { + buf = boost::beast::flat_buffer{bytes}; + } + boost::system::error_code ec; + http::request_parser parser; + parser.body_limit(-1); + std::visit( + [&](auto& stream) { + http::read(*stream, buf, parser, ec); + }, + m_Stream + ); + if (bytes > 0) { + BOOST_REQUIRE_MESSAGE( + !ec || ec == http::error::buffer_overflow, + "Reading request body with a buffer limit of '" << bytes << + "' should either succeed or fail with a buffer_overflow error, but got: " << ec.message() + ); + } else { + BOOST_REQUIRE_MESSAGE(!ec, "Error while reading request body: " << ec.message()); + BOOST_REQUIRE_MESSAGE(parser.is_done(), "Parser did not finish reading the request, but no error was set."); + } + return parser.release(); + } + + auto GetSplitRequestBody(char delim) + { + auto request = GetRequest(); + std::vector result{}; + boost::split(result, request.body(), boost::is_any_of(std::string{delim})); + return result; + } + + auto GetSplitDecodedRequestBody() + { + Array::Ptr result = new Array; + for (const auto& line : GetSplitRequestBody('\n')) { + if (!line.empty()) { + result->Add(JsonDecode(line)); + } + } + return result; + } + + template + std::string GetDataUntil(T&& delim) + { + using namespace boost::asio::ip; + + std::size_t delimLength{1}; + if constexpr (!std::is_same_v, char>) { + delimLength = std::string_view{delim}.size(); + } + + boost::asio::streambuf buf; + boost::system::error_code ec; + auto bytesRead = std::visit( + [&](auto& stream) { return boost::asio::read_until(*stream, buf, std::forward(delim), ec); }, m_Stream + ); + BOOST_REQUIRE_MESSAGE(!ec, ec.message()); + + std::string ret{ + boost::asio::buffers_begin(buf.data()), boost::asio::buffers_begin(buf.data()) + bytesRead - delimLength + }; + buf.consume(bytesRead); + + return ret; + } + + void SendResponse(boost::beast::http::status status = boost::beast::http::status::ok) + { + using namespace boost::asio::ip; + using namespace boost::beast; + + boost::system::error_code ec; + http::response response; + response.result(status); + response.prepare_payload(); + std::visit( + [&](auto& stream) { + http::write(*stream, response, ec); + BOOST_REQUIRE_MESSAGE(!ec, ec.message()); + stream->flush(ec); + BOOST_REQUIRE_MESSAGE(!ec, ec.message()); + }, + m_Stream + ); + } + +private: + AsioTlsOrTcpStream m_Stream; + boost::asio::ip::tcp::acceptor m_Acceptor; + Shared::Ptr m_SslContext; +}; + +} // namespace icinga diff --git a/test/perfdata-perfdatawriterconnection.cpp b/test/perfdata-perfdatawriterconnection.cpp new file mode 100644 index 000000000..649754421 --- /dev/null +++ b/test/perfdata-perfdatawriterconnection.cpp @@ -0,0 +1,335 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "perfdata/perfdatawriterconnection.hpp" +#include "test/perfdata-perfdatatargetfixture.hpp" +#include "test/remote-certificate-fixture.hpp" +#include "test/test-ctest.hpp" +#include "test/test-thread.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +class TlsPerfdataWriterFixture : public CertificateFixture, public PerfdataWriterTargetFixture +{ +public: + TlsPerfdataWriterFixture() : PerfdataWriterTargetFixture(MakeContext("server")) + { + m_PdwSslContext = MakeContext("client"); + + m_Conn = new PerfdataWriterConnection{"Test", "test", "127.0.0.1", std::to_string(GetPort()), m_PdwSslContext}; + } + + auto& GetConnection() { return *m_Conn; } + +private: + Shared::Ptr MakeContext(const std::string& name) + { + auto testCert = EnsureCertFor(name); + return SetupSslContext( + testCert.crtFile, + testCert.keyFile, + m_CaCrtFile.string(), + "", + DEFAULT_TLS_CIPHERS, + DEFAULT_TLS_PROTOCOLMIN, + DebugInfo() + ); + } + + Shared::Ptr m_PdwSslContext; + PerfdataWriterConnection::Ptr m_Conn; +}; + +BOOST_FIXTURE_TEST_SUITE(perfdata_connection, TlsPerfdataWriterFixture, + *CTestProperties("FIXTURES_REQUIRED ssl_certs") + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +/* If there is no acceptor listening on the other side, connecting should fail. + */ +BOOST_AUTO_TEST_CASE(connection_refused) +{ + std::promise p; + TestThread timeoutThread{[&]() { + auto f = p.get_future(); + GetConnection().CancelAfterTimeout(f, 50ms); + }}; + + BOOST_REQUIRE_THROW( + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}), PerfdataWriterConnection::Stopped + ); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); +} + +/* The PerfdataWriterConnection connects automatically when sending the first data. + * In case of http we also need to support disconnecting and reconnecting. + */ +BOOST_AUTO_TEST_CASE(ensure_connected) +{ + std::promise disconnectedPromise; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil('\0'); + Shutdown(); + disconnectedPromise.get_future().get(); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + }}; + + BOOST_REQUIRE_NO_THROW(GetConnection().Send(boost::asio::const_buffer{"foobar", 7})); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + disconnectedPromise.set_value(); + + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* Verify that data can still be sent while CancelAfterTimeout is waiting and the timeout + * can be aborted when all data has been sent successfully. + */ +BOOST_AUTO_TEST_CASE(finish_during_timeout) +{ + std::promise p; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil('\0'); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + ret = GetDataUntil('\0'); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + // This is done here instead of the main thread after send, because we need to + // synchronize the asserts done in the timeoutThread after this point. + p.set_value(); + Shutdown(); + }}; + + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}); + + TestThread timeoutThread{[&]() { + auto f = p.get_future(); + GetConnection().CancelAfterTimeout(f, 50ms); + BOOST_REQUIRE(f.wait_for(0ms) == std::future_status::ready); + BOOST_REQUIRE(!GetConnection().IsConnected()); + }}; + + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* For the client, even a hanging server will accept the connection immediately, since it's done + * in the kernel. But in that case the TLS handshake will be stuck, so we need to verify that a + * handshake can be interrupted by CancelAfterTimeout(). + */ +BOOST_AUTO_TEST_CASE(stuck_in_handshake) +{ + TestThread mockTargetThread{[&]() { Accept(); }}; + + std::promise p; + TestThread timeoutThread{[&]() { + auto f = p.get_future(); + GetConnection().CancelAfterTimeout(f, 50ms); + BOOST_REQUIRE(f.wait_for(0ms) == std::future_status::timeout); + }}; + + BOOST_REQUIRE_THROW( + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}), PerfdataWriterConnection::Stopped + ); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* When the disconnect timeout runs out while sending something to a slow or blocking server, we + * expect the send to be aborted after a timeout with an 'operation cancelled' exception, in + * order to not delay the shutdown of a perfdata writer indefinitely. + * No orderly TLS shutdown can be performed in this case, because the stream has been truncated. + * The server will need to handle this one on their own. + */ +BOOST_AUTO_TEST_CASE(stuck_sending) +{ + std::promise shutdownPromise; + std::promise dataReadPromise; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil("#"); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + dataReadPromise.set_value(); + + // There's still a full buffer waiting to be read, but we're pretending to be dead and + // close the socket at this point. + shutdownPromise.get_future().get(); + ResetStream(); + }}; + + TestThread timeoutThread{[&]() { + // Synchronize with when mockTargetThread has read the initial data. + // This should especially help with timing on slow machines like the ARM GHA runners. + dataReadPromise.get_future().get(); + BOOST_REQUIRE(GetConnection().IsConnected()); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + BOOST_REQUIRE(!GetConnection().IsConnected()); + }}; + + // Allocate a large string that will fill the buffers on both sides of the connection, in + // order to make Send() block. + auto randomData = GetRandomString("foobar#", 4UL * 1024 * 1024); + auto buf = boost::asio::const_buffer{randomData.data(), randomData.size()}; + BOOST_REQUIRE_THROW(GetConnection().Send(buf), PerfdataWriterConnection::Stopped); + shutdownPromise.set_value(); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* This simulates a server that is stuck after receiving a HTTP request and before sending their + * response. Here, the simulated server is polite and still responds to a shutdown request, but + * in reality a server might not even do that. That case should be handled by our + * AsioTlsStream::GracefulDisconnect() function with an additional 10s timeout. + */ +BOOST_AUTO_TEST_CASE(stuck_reading_response) +{ + std::promise shutdownPromise; + std::promise requestReadPromise; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetRequest(); + BOOST_REQUIRE_EQUAL(ret.body(), "bar"); + requestReadPromise.set_value(); + // Do not send a response but react to the shutdown to be polite. + shutdownPromise.get_future().get(); + Shutdown(); + }}; + + TestThread timeoutThread{[&]() { + // Synchronize with after mockTargetThread has read the request + requestReadPromise.get_future().get(); + BOOST_REQUIRE(GetConnection().IsConnected()); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + BOOST_REQUIRE(!GetConnection().IsConnected()); + }}; + + boost::beast::http::request request; + request.body() = "bar"; + request.method(boost::beast::http::verb::get); + request.target("foo"); + request.prepare_payload(); + BOOST_REQUIRE_THROW(GetConnection().Send(request), PerfdataWriterConnection::Stopped); + shutdownPromise.set_value(); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* This test simulates a server that closes the connection and reappears at a later time. + * PerfdataWriterConnection should detect the disconnect, catch the exception and attempt to + * reconnect without exiting Send(). + */ +BOOST_AUTO_TEST_CASE(reconnect_failed) +{ + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil("#"); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + + ResetStream(); + + Accept(); + Handshake(); + + ret = GetDataUntil("#"); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + ret = GetDataUntil("\n"); + + Shutdown(); + }}; + + // Allocate a large string that will fill the buffers on both sides of the connection, in + // order to make Send() block. + auto randomData = GetRandomString("foobar#", 4UL * 1024 * 1024); + randomData.push_back('\n'); + BOOST_REQUIRE_NO_THROW(GetConnection().Send(boost::asio::const_buffer{randomData.data(), randomData.size()})); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* This tests if retrying an http send will reproducibly lead to the exact same message being + * received. Normally this us guaranteed by the interface only accepting a const reference, but + * since on older boost versions the async_write() functions also accept non-const references, it + * doesn't hurt to ensure this with a test-case. + */ +BOOST_AUTO_TEST_CASE(http_send_retry) +{ + TestThread mockTargetThread{[&] { + Accept(); + Handshake(); + + /* Read only the first 512 bytes of the request body, since we don't want to unblock the client yet. + */ + auto request = GetRequest(512); + BOOST_REQUIRE_MESSAGE( + request.method() == boost::beast::http::verb::post, + "Request method is not POST: " << request.method_string() + ); + BOOST_REQUIRE_MESSAGE(request.target() == "foo", "Request target is not 'foo': " << request.target()); + BOOST_REQUIRE_MESSAGE( + request.body().compare(0, 7, "foobar#") == 0, + "Request body does not start with 'foobar#': " << request.body().substr(0, 7) + ); + + ResetStream(); + Accept(); + Handshake(); + + /* Read the entire response now and verify that we still get the expected body, + * even though the first read was only partial. + */ + request = GetRequest(); + BOOST_REQUIRE_MESSAGE( + request.method() == boost::beast::http::verb::post, + "Request method is not POST: " << request.method_string() + ); + BOOST_REQUIRE_MESSAGE(request.target() == "foo", "Request target is not 'foo': " << request.target()); + BOOST_REQUIRE_MESSAGE( + request.body().compare(0, 7, "foobar#") == 0, + "Request body does not start with 'foobar#': " << request.body().substr(0, 7) + ); + + /* The body size is 4MB + 7 bytes (7 bytes for the "foobar#" prefix of the generated message) + */ + BOOST_REQUIRE_MESSAGE( + request.body().size() == (4UL * 1024 * 1024) + 7, + "Request body is not the expected size: " << request.body().size() + ); + + SendResponse(); + + Shutdown(); + }}; + + boost::beast::http::request request{boost::beast::http::verb::post, "foo", 10}; + request.set(boost::beast::http::field::host, "localhost:" + std::to_string(GetPort())); + + /* Allocate a large string that will fill the buffers on both sides of the connection, in + * order to make Send() block. + */ + request.body() = GetRandomString("foobar#", 4UL * 1024 * 1024); + request.prepare_payload(); + BOOST_REQUIRE_NO_THROW(GetConnection().Send(request)); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-perfdatawriterfixture.hpp b/test/perfdata-perfdatawriterfixture.hpp new file mode 100644 index 000000000..e70b2123c --- /dev/null +++ b/test/perfdata-perfdatawriterfixture.hpp @@ -0,0 +1,139 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include "base/perfdatavalue.hpp" +#include "config/configcompiler.hpp" +#include "config/configitem.hpp" +#include "icinga/host.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatatargetfixture.hpp" +#include "test/utils.hpp" +#include + +namespace icinga { + +template +class PerfdataWriterFixture : public PerfdataWriterTargetFixture, public TestLoggerFixture +{ +public: + PerfdataWriterFixture() : m_Writer(new Writer) + { + auto createObjects = [&]() { + String config = R"CONFIG( +object CheckCommand "dummy" { + command = "/bin/echo" +} +object Host "h1" { + address = "h1" + check_command = "dummy" + enable_notifications = true + enable_active_checks = false + enable_passive_checks = true +} +)CONFIG"; + + std::unique_ptr expr = ConfigCompiler::CompileText("", config); + expr->Evaluate(*ScriptFrame::GetCurrentFrame()); + }; + + ConfigItem::RunWithActivationContext(new Function("CreateTestObjects", createObjects)); + + m_Host = Host::GetByName("h1"); + BOOST_REQUIRE(m_Host); + + m_Writer->SetPort(std::to_string(GetPort())); + m_Writer->SetName(m_Writer->GetReflectionType()->GetName()); + m_Writer->SetDisconnectTimeout(0.05); + m_Writer->Register(); + + auto hasFlushInterval = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushInterval(0.05)) {}); + if constexpr (decltype(hasFlushInterval(std::declval()))::value) { + m_Writer->SetFlushInterval(0.05); + } + + auto hasFlushThreshold = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushThreshold(1)) {}); + if constexpr (decltype(hasFlushThreshold(std::declval()))::value) { + m_Writer->SetFlushThreshold(1); + } + } + + void ReceiveCheckResults( + std::size_t num, + ServiceState state, + const std::function& fn = {} + ) + { + ::ReceiveCheckResults(m_Host, num, state, fn); + } + + std::size_t GetWorkQueueLength() + { + Array::Ptr dummy = new Array; + Dictionary::Ptr status = new Dictionary; + m_Writer->StatsFunc(status, dummy); + ObjectLock lock{status}; + // Unpack the single-key top-level dictionary + Dictionary::Ptr writer = status->Begin()->second; + BOOST_REQUIRE(writer); + Dictionary::Ptr values = writer->Get(m_Writer->GetName()); + BOOST_REQUIRE(values); + BOOST_REQUIRE(values->Contains("work_queue_items")); + return values->Get("work_queue_items"); + } + + /** + * Processes check results until the writer's work queue is no longer moving. + * + * @param timeout Time after which to give up trying to get the writer stuck + * @return true if the writer is now stuck + */ + bool GetWriterStuck(std::chrono::milliseconds timeout) + { + auto start = std::chrono::steady_clock::now(); + std::size_t unchangedCount = 0; + while(true){ + ReceiveCheckResults(10, ServiceCritical, [&](const CheckResult::Ptr& cr) { + cr->GetPerformanceData()->Add(new PerfdataValue{GetRandomString("", 4096), 1}); + }); + + if (std::chrono::steady_clock::now() - start >= timeout) { + return false; + } + + auto numWq = GetWorkQueueLength(); + if (numWq >= 10) { + std::this_thread::sleep_for(1ms); + if (numWq == GetWorkQueueLength()) { + if (unchangedCount < 5) { + ++unchangedCount; + continue; + } + return true; + } + + unchangedCount = 0; + } + } + } + + void ResumeWriter() + { + static_cast(m_Writer)->OnConfigLoaded(); + m_Writer->SetActive(true); + m_Writer->Activate(); + BOOST_REQUIRE(!m_Writer->IsPaused()); + } + + void PauseWriter() { static_cast(m_Writer)->Pause(); } + + auto GetWriter() { return m_Writer; } + +private: + Host::Ptr m_Host; + typename Writer::Ptr m_Writer; +}; + +} // namespace icinga diff --git a/test/utils.cpp b/test/utils.cpp index f056a51d9..95a9936cd 100644 --- a/test/utils.cpp +++ b/test/utils.cpp @@ -5,6 +5,7 @@ #include "base/perfdatavalue.hpp" #include #include +#include #include #include @@ -68,6 +69,19 @@ GlobalTimezoneFixture::~GlobalTimezoneFixture() tzset(); } +std::string GetRandomString(std::string prefix, std::size_t length) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution distribution('!', '~'); + + for (auto i = 0U; i < length; i++) { + prefix += static_cast(distribution(gen)); + } + + return prefix; +} + /** * Make our test host receive a number of check-results. * diff --git a/test/utils.hpp b/test/utils.hpp index e0bdf1129..ec8b245d4 100644 --- a/test/utils.hpp +++ b/test/utils.hpp @@ -27,6 +27,8 @@ struct GlobalTimezoneFixture char *tz; }; +std::string GetRandomString(std::string prefix, std::size_t length); + void ReceiveCheckResults( const icinga::Checkable::Ptr& host, std::size_t num,