diff --git a/doc/09-object-types.md b/doc/09-object-types.md index 23bbcff47..2979fe4a4 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1214,6 +1214,7 @@ Configuration Attributes: port | Number | **Required.** Elasticsearch port. Defaults to `9200`. index | String | **Required.** Prefix for the index names. Defaults to `icinga2`. enable\_send\_perfdata | Boolean | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Elasticsearch before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`. username | String | **Optional.** Basic auth username if Elasticsearch is hidden behind an HTTP proxy. @@ -1310,6 +1311,7 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** GELF receiver host address. Defaults to `127.0.0.1`. port | Number | **Optional.** GELF receiver port. Defaults to `12201`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`. source | String | **Optional.** Source name for this instance. Defaults to `icinga2`. enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1340,6 +1342,7 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** Graphite Carbon host address. Defaults to `127.0.0.1`. port | Number | **Optional.** Graphite Carbon port. Defaults to `2003`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to Graphite before disconnecting. Defaults to `10s`. host\_name\_template | String | **Optional.** Metric prefix for host name. Defaults to `icinga2.$host.name$.host.$host.check_command$`. service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`. enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`. @@ -1682,6 +1685,7 @@ Configuration Attributes: service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol. enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data. enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1745,6 +1749,7 @@ Configuration Attributes: service\_template | Dictionary | **Required.** Service template to define the influxDB line protocol. enable\_send\_thresholds | Boolean | **Optional.** Whether to send warn, crit, min & max tagged data. enable\_send\_metadata | Boolean | **Optional.** Whether to send check metadata e.g. states, execution time, latency etc. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to InfluxDB before disconnecting. Defaults to `10s`. flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to InfluxDB. Defaults to `10s`. flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to InfluxDB. Defaults to `1024`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. @@ -1860,6 +1865,7 @@ Configuration Attributes: --------------------------|-----------------------|---------------------------------- host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`. port | Number | **Optional.** OpenTSDB port. Defaults to `4242`. + diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`. enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`. enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`. host_template | Dictionary | **Optional.** Specify additional tags to be included with host metrics. This requires a sub-dictionary named `tags`. Also specify a naming prefix by setting `metric`. More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags) and [OpenTSDB Metric Prefix](14-features.md#opentsdb-metric-prefix). More information can be found in [OpenTSDB custom tags](14-features.md#opentsdb-custom-tags). Defaults to an `empty Dictionary`. diff --git a/lib/base/i2-base.hpp b/lib/base/i2-base.hpp index caf67f6f4..440862ff1 100644 --- a/lib/base/i2-base.hpp +++ b/lib/base/i2-base.hpp @@ -76,5 +76,10 @@ #define BOOST_BIND_NO_PLACEHOLDERS #include +#include + +namespace icinga { +using namespace std::chrono_literals; +} // namespace icinga #endif /* I2BASE_H */ diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt index 3bbbe3b43..c53e9e9b6 100644 --- a/lib/perfdata/CMakeLists.txt +++ b/lib/perfdata/CMakeLists.txt @@ -19,6 +19,7 @@ set(perfdata_SOURCES influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp + perfdatawriterconnection.cpp perfdatawriterconnection.hpp ) if(ICINGA2_UNITY_BUILD) diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp index 9445a16e0..044656966 100644 --- a/lib/perfdata/elasticsearchwriter.cpp +++ b/lib/perfdata/elasticsearchwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/elasticsearchwriter.hpp" +#include "base/defer.hpp" #include "perfdata/elasticsearchwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/compatutility.hpp" @@ -9,30 +10,14 @@ #include "icinga/macroprocessor.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" -#include "base/defer.hpp" -#include "base/io-engine.hpp" -#include "base/tcpsocket.hpp" #include "base/stream.hpp" #include "base/base64.hpp" #include "base/json.hpp" #include "base/utility.hpp" -#include "base/networkstream.hpp" #include "base/perfdatavalue.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include @@ -78,12 +63,25 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array:: status->Set("elasticsearchwriter", new Dictionary(std::move(nodes))); } +void ElasticsearchWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + if (GetEnableTls()) { + try { + m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogCritical, "ElasticsearchWriter") + << "Unable to create SSL context: " << ex.what(); + throw; + } + } +} + void ElasticsearchWriter::Resume() { ObjectImpl::Resume(); - m_EventPrefix = "icinga2.event."; - Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' resumed."; @@ -96,6 +94,8 @@ void ElasticsearchWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; + /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -120,12 +120,17 @@ void ElasticsearchWriter::Pause() m_HandleNotifications.disconnect(); m_FlushTimer->Stop(true); - m_WorkQueue.Join(); - { - std::unique_lock lock (m_DataBufferMutex); + std::promise queueDonePromise; + m_WorkQueue.Enqueue([&]() { Flush(); - } + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + + m_WorkQueue.Join(); Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' paused."; @@ -269,6 +274,10 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -308,6 +317,10 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -358,6 +371,10 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "ElasticsearchWriter") @@ -379,15 +396,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& { AssertOnWorkQueue(); - /* Atomically buffer the data point. */ - std::unique_lock lock(m_DataBufferMutex); - /* Format the timestamps to dynamically select the date datatype inside the index. */ fields->Set("@timestamp", FormatTimestamp(ts)); fields->Set("timestamp", FormatTimestamp(ts)); - - String eventType = m_EventPrefix + type; - fields->Set("type", eventType); + fields->Set("type", "icinga2.event." + type); /* Every payload needs a line describing the index. * We do it this way to avoid problems with a near full queue. @@ -408,19 +420,21 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& } } +/** + * Queues a Flush on the work-queue if there isn't one queued already. + */ void ElasticsearchWriter::FlushTimeout() { - /* Prevent new data points from being added to the array, there is a - * race condition where they could disappear. - */ - std::unique_lock lock(m_DataBufferMutex); - - /* Flush if there are any data available. */ - if (m_DataBuffer.size() > 0) { - Log(LogDebug, "ElasticsearchWriter") - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; } + + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{ + [&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); } + }; + Flush(); + }); } void ElasticsearchWriter::Flush() @@ -466,22 +480,6 @@ void ElasticsearchWriter::SendRequest(const String& body) url->SetPath(path); - OptionalTlsStream stream; - - try { - stream = Connect(); - } catch (const std::exception& ex) { - Log(LogWarning, "ElasticsearchWriter") - << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false); - return; - } - - Defer s ([&stream]() { - if (stream.first) { - stream.first->next_layer().shutdown(); - } - }); - http::request request (http::verb::post, std::string(url->Format(true)), 10); request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); @@ -511,37 +509,14 @@ void ElasticsearchWriter::SendRequest(const String& body) << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" ) << " to '" << url->Format() << "'."; + decltype(m_Connection->Send(request)) response; try { - if (stream.first) { - http::write(*stream.first, request); - stream.first->flush(); - } else { - http::write(*stream.second, request); - stream.second->flush(); - } - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; + response = m_Connection->Send(request); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "ElasticsearchWriter") << ex.what(); + return; } - http::parser parser; - beast::flat_buffer buf; - - try { - if (stream.first) { - http::read(*stream.first, buf, parser); - } else { - http::read(*stream.second, buf, parser); - } - } catch (const std::exception& ex) { - Log(LogWarning, "ElasticsearchWriter") - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false); - throw; - } - - auto& response (parser.get()); - if (response.result_int() > 299) { if (response.result() == http::status::unauthorized) { /* More verbose error logging with Elasticsearch is hidden behind a proxy. */ @@ -589,66 +564,6 @@ void ElasticsearchWriter::SendRequest(const String& body) } } -OptionalTlsStream ElasticsearchWriter::Connect() -{ - Log(LogNotice, "ElasticsearchWriter") - << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - - OptionalTlsStream stream; - bool tls = GetEnableTls(); - - if (tls) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Unable to create SSL context."; - throw; - } - - stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - if (tls) { - auto& tlsStream (stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed."; - throw; - } - - if (!GetInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - return stream; -} - void ElasticsearchWriter::AssertOnWorkQueue() { ASSERT(m_WorkQueue.IsWorkerThread()); diff --git a/lib/perfdata/elasticsearchwriter.hpp b/lib/perfdata/elasticsearchwriter.hpp index c67c4324b..e83f9c935 100644 --- a/lib/perfdata/elasticsearchwriter.hpp +++ b/lib/perfdata/elasticsearchwriter.hpp @@ -5,11 +5,10 @@ #define ELASTICSEARCHWRITER_H #include "perfdata/elasticsearchwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" #include "base/workqueue.hpp" -#include "base/timer.hpp" -#include "base/tlsstream.hpp" +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -29,16 +28,18 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: - String m_EventPrefix; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications; Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; std::vector m_DataBuffer; - std::mutex m_DataBufferMutex; + Shared::Ptr m_SslContext; + PerfdataWriterConnection::Ptr m_Connection; void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -51,7 +52,6 @@ private: void Enqueue(const Checkable::Ptr& checkable, const String& type, const Dictionary::Ptr& fields, double ts); - OptionalTlsStream Connect(); void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); void FlushTimeout(); diff --git a/lib/perfdata/elasticsearchwriter.ti b/lib/perfdata/elasticsearchwriter.ti index 45c468804..c5e7bc3e1 100644 --- a/lib/perfdata/elasticsearchwriter.ti +++ b/lib/perfdata/elasticsearchwriter.ti @@ -40,7 +40,10 @@ class ElasticsearchWriter : ConfigObject [config] String cert_path; [config] String key_path; - [config] int flush_interval { + [config] double disconnect_timeout { + default {{{ return 10; }}} + }; + [config] double flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp index c2bff71dd..6f8567f70 100644 --- a/lib/perfdata/gelfwriter.cpp +++ b/lib/perfdata/gelfwriter.cpp @@ -6,28 +6,19 @@ #include "icinga/service.hpp" #include "icinga/notification.hpp" #include "icinga/checkcommand.hpp" -#include "icinga/macroprocessor.hpp" #include "icinga/compatutility.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/context.hpp" #include "base/exception.hpp" #include "base/json.hpp" #include "base/statsfunction.hpp" #include #include -#include "base/io-engine.hpp" -#include -#include -#include -#include using namespace icinga; @@ -62,7 +53,7 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf nodes.emplace_back(gelfwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", gelfwriter->GetConnected() }, + { "connected", gelfwriter->m_Connection->IsConnected() }, { "source", gelfwriter->GetSource() } })); @@ -73,6 +64,22 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf status->Set("gelfwriter", new Dictionary(std::move(nodes))); } +void GelfWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + /* Initialize connection */ + if (GetEnableTls()) { + try { + m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "Unable to create SSL context."; + throw; + } + } +} + void GelfWriter::Resume() { ObjectImpl::Resume(); @@ -83,12 +90,7 @@ void GelfWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - /* Timer for reconnecting */ - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -113,18 +115,15 @@ void GelfWriter::Pause() m_HandleNotifications.disconnect(); m_HandleStateChanges.disconnect(); - m_ReconnectTimer->Stop(true); + std::promise queueDonePromise; - m_WorkQueue.Enqueue([this]() { - try { - ReconnectInternal(); - } catch (const std::exception&) { - Log(LogInformation, "GelfWriter") - << "Unable to connect, not flushing buffers. Data may be lost."; - } - }, PriorityImmediate); + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); - m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow); m_WorkQueue.Join(); Log(LogInformation, "GelfWriter") @@ -142,126 +141,6 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp) { Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false); Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true); - - DisconnectInternal(); -} - -void GelfWriter::Reconnect() -{ - AssertOnWorkQueue(); - - if (IsPaused()) { - SetConnected(false); - return; - } - - ReconnectInternal(); -} - -void GelfWriter::ReconnectInternal() -{ - double startTime = Utility::GetTime(); - - CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'"); - - SetShouldConnect(true); - - if (GetConnected()) - return; - - Log(LogNotice, "GelfWriter") - << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'."; - - bool ssl = GetEnableTls(); - - if (ssl) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "Unable to create SSL context."; - throw; - } - - m_Stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - m_Stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'"; - throw; - } - - if (ssl) { - auto& tlsStream (m_Stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "TLS handshake with host '" << GetHost() << " failed.'"; - throw; - } - - if (!GetInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - SetConnected(true); - - Log(LogInformation, "GelfWriter") - << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - -void GelfWriter::ReconnectTimerHandler() -{ - m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal); -} - -void GelfWriter::Disconnect() -{ - AssertOnWorkQueue(); - - DisconnectInternal(); -} - -void GelfWriter::DisconnectInternal() -{ - if (!GetConnected()) - return; - - if (m_Stream.first) { - boost::system::error_code ec; - m_Stream.first->next_layer().shutdown(ec); - - // https://stackoverflow.com/a/25703699 - // As long as the error code's category is not an SSL category, then the protocol was securely shutdown - if (ec.category() == boost::asio::error::get_ssl_category()) { - Log(LogCritical, "GelfWriter") - << "TLS shutdown with host '" << GetHost() << "' could not be done securely."; - } - } else if (m_Stream.second) { - m_Stream.second->close(); - } - - SetConnected(false); - } void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -298,6 +177,10 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_command", checkCommand->GetName()); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -405,6 +288,10 @@ void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, Noti fields->Set("_check_command", checkable->GetCheckCommand()->GetName()); m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -447,6 +334,10 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_source", cr->GetCheckSource()); m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing state change '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -473,26 +364,15 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g msgbuf << gelfMessage; msgbuf << '\0'; - String log = msgbuf.str(); - - if (!GetConnected()) - return; + auto log = msgbuf.str(); try { Log(LogDebug, "GelfWriter") << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'."; - if (m_Stream.first) { - boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str())); - m_Stream.first->flush(); - } else { - boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str())); - m_Stream.second->flush(); - } - } catch (const std::exception& ex) { - Log(LogCritical, "GelfWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - - throw ex; + m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()}); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "GelfWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index e24b6e6ea..f7d2a10c3 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -5,12 +5,10 @@ #define GELFWRITER_H #include "perfdata/gelfwriter-ti.hpp" -#include "icinga/service.hpp" +#include "perfdata/perfdatawriterconnection.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" #include "base/workqueue.hpp" -#include namespace icinga { @@ -30,15 +28,16 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: - OptionalTlsStream m_Stream; + PerfdataWriterConnection::Ptr m_Connection; WorkQueue m_WorkQueue{10000000, 1}; + Shared::Ptr m_SslContext; boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges; - Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr, @@ -48,13 +47,6 @@ private: String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts); void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage); - void ReconnectTimerHandler(); - - void Disconnect(); - void DisconnectInternal(); - void Reconnect(); - void ReconnectInternal(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti index b04debbb4..46c194d1a 100644 --- a/lib/perfdata/gelfwriter.ti +++ b/lib/perfdata/gelfwriter.ti @@ -25,9 +25,8 @@ class GelfWriter : ConfigObject default {{{ return false; }}} }; - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; [config] bool enable_ha { default {{{ return false; }}} diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index 652b7d3d1..e00cd9275 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -7,16 +7,13 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -65,7 +62,7 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& nodes.emplace_back(graphitewriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", graphitewriter->GetConnected() } + { "connected", graphitewriter->m_Connection->IsConnected() } })); perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems)); @@ -88,12 +85,7 @@ void GraphiteWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - /* Timer for reconnecting */ - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -108,20 +100,17 @@ void GraphiteWriter::Resume() void GraphiteWriter::Pause() { m_HandleCheckResults.disconnect(); - m_ReconnectTimer->Stop(true); - try { - ReconnectInternal(); - } catch (const std::exception&) { - Log(LogInformation, "GraphiteWriter") - << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload."; + std::promise queueDonePromise; - ObjectImpl::Pause(); - return; - } + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); m_WorkQueue.Join(); - DisconnectInternal(); Log(LogInformation, "GraphiteWriter") << "'" << GetName() << "' paused."; @@ -150,105 +139,6 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, "GraphiteWriter") << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp)); - - if (GetConnected()) { - m_Stream->close(); - - SetConnected(false); - } -} - -/** - * Reconnect method, stops when the feature is paused in HA zones. - * - * Called inside the WQ. - */ -void GraphiteWriter::Reconnect() -{ - AssertOnWorkQueue(); - - if (IsPaused()) { - SetConnected(false); - return; - } - - ReconnectInternal(); -} - -/** - * Reconnect method, connects to a TCP Stream - */ -void GraphiteWriter::ReconnectInternal() -{ - double startTime = Utility::GetTime(); - - CONTEXT("Reconnecting to Graphite '" << GetName() << "'"); - - SetShouldConnect(true); - - if (GetConnected()) - return; - - Log(LogNotice, "GraphiteWriter") - << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; - - m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); - - try { - icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "GraphiteWriter") - << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'"; - - SetConnected(false); - - throw; - } - - SetConnected(true); - - Log(LogInformation, "GraphiteWriter") - << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - -/** - * Reconnect handler called by the timer. - * - * Enqueues a reconnect task into the WQ. - */ -void GraphiteWriter::ReconnectTimerHandler() -{ - if (IsPaused()) - return; - - m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh); -} - -/** - * Disconnect the stream. - * - * Called inside the WQ. - */ -void GraphiteWriter::Disconnect() -{ - AssertOnWorkQueue(); - - DisconnectInternal(); -} - -/** - * Disconnect the stream. - * - * Called outside the WQ. - */ -void GraphiteWriter::DisconnectInternal() -{ - if (!GetConnected()) - return; - - m_Stream->close(); - - SetConnected(false); } /** @@ -302,11 +192,11 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C } m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() { - CONTEXT("Processing check result for '" << checkable->GetName() << "'"); + if (m_Connection->IsStopped()) { + return; + } - /* TODO: Deal with missing connection here. Needs refactoring - * into parsing the actual performance data and then putting it - * into a queue for re-inserting. */ + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); for (auto& [name, val] : metadata) { SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd()); @@ -394,19 +284,11 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p // do not send \n to debug log msgbuf << "\n"; - std::unique_lock lock(m_StreamMutex); - - if (!GetConnected()) - return; - try { - asio::write(*m_Stream, asio::buffer(msgbuf.str())); - m_Stream->flush(); - } catch (const std::exception& ex) { - Log(LogCritical, "GraphiteWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - - throw ex; + m_Connection->Send(asio::buffer(msgbuf.str())); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "GraphiteWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index b28db8172..470fcc07d 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -5,13 +5,10 @@ #define GRAPHITEWRITER_H #include "perfdata/graphitewriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" #include "base/workqueue.hpp" -#include -#include +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -38,12 +35,10 @@ protected: void Pause() override; private: - Shared::Ptr m_Stream; - std::mutex m_StreamMutex; + PerfdataWriterConnection::Ptr m_Connection; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults; - Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts); @@ -52,13 +47,6 @@ private: static String EscapeMetricLabel(const String& str); static Value EscapeMacroMetric(const Value& value); - void ReconnectTimerHandler(); - - void Disconnect(); - void DisconnectInternal(); - void Reconnect(); - void ReconnectInternal(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti index d89e879dd..f0d9bfb80 100644 --- a/lib/perfdata/graphitewriter.ti +++ b/lib/perfdata/graphitewriter.ti @@ -27,9 +27,8 @@ class GraphiteWriter : ConfigObject [config] bool enable_send_thresholds; [config] bool enable_send_metadata; - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; [config] bool enable_ha { default {{{ return false; }}} diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index 47d945573..5b6f376de 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/influxdbcommonwriter.hpp" +#include "base/defer.hpp" #include "perfdata/influxdbcommonwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/service.hpp" @@ -9,36 +10,15 @@ #include "icinga/icingaapplication.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" -#include "base/defer.hpp" -#include "base/io-engine.hpp" -#include "base/tcpsocket.hpp" -#include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" -#include "base/convert.hpp" -#include "base/utility.hpp" -#include "base/stream.hpp" #include "base/json.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" -#include "base/statsfunction.hpp" -#include "base/tlsutility.hpp" #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include -#include #include #include @@ -80,6 +60,21 @@ void InfluxdbCommonWriter::OnConfigLoaded() } } +void InfluxdbCommonWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + if (GetSslEnable()) { + try { + m_SslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + } catch (const std::exception& ex) { + Log(LogCritical, GetReflectionType()->GetName()) + << "Unable to create SSL context: " << ex.what(); + throw; + } + } +} + void InfluxdbCommonWriter::Resume() { ObjectImpl::Resume(); @@ -97,6 +92,8 @@ void InfluxdbCommonWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetSslInsecureNoverify()}; + /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -114,7 +111,15 @@ void InfluxdbCommonWriter::Pause() << "Processing pending tasks and flushing data buffers."; m_FlushTimer->Stop(true); - m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + + std::promise queueDonePromise; + m_WorkQueue.Enqueue([&]() { + FlushWQ(); + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); @@ -136,68 +141,6 @@ void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, GetReflectionType()->GetName()) << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp)); - - //TODO: Close the connection, if we keep it open. -} - -OptionalTlsStream InfluxdbCommonWriter::Connect() -{ - Log(LogNotice, GetReflectionType()->GetName()) - << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - - OptionalTlsStream stream; - bool ssl = GetSslEnable(); - - if (ssl) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Unable to create SSL context."; - throw; - } - - stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - if (ssl) { - auto& tlsStream (stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "TLS handshake with host '" << GetHost() << "' failed."; - throw; - } - - if (!GetSslInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - return stream; } void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -261,6 +204,10 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c } m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); double ts = cr->GetExecutionEnd(); @@ -411,19 +358,19 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic } } +/** + * Queues a Flush on the work-queue and restarts the timer. + */ void InfluxdbCommonWriter::FlushTimeout() { - m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh); -} + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; + } -void InfluxdbCommonWriter::FlushTimeoutWQ() -{ - AssertOnWorkQueue(); - - Log(LogDebug, GetReflectionType()->GetName()) - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - - FlushWQ(); + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; + FlushWQ(); + }); } void InfluxdbCommonWriter::FlushWQ() @@ -444,55 +391,16 @@ void InfluxdbCommonWriter::FlushWQ() m_DataBuffer.clear(); m_DataBufferSize = 0; - OptionalTlsStream stream; - - try { - stream = Connect(); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false); - return; - } - - Defer s ([&stream]() { - if (stream.first) { - stream.first->next_layer().shutdown(); - } - }); - auto request (AssembleRequest(std::move(body))); + decltype(m_Connection->Send(request)) response; try { - if (stream.first) { - http::write(*stream.first, request); - stream.first->flush(); - } else { - http::write(*stream.second, request); - stream.second->flush(); - } - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; + response = m_Connection->Send(request); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, GetReflectionType()->GetName()) << ex.what(); + return; } - http::parser parser; - beast::flat_buffer buf; - - try { - if (stream.first) { - http::read(*stream.first, buf, parser); - } else { - http::read(*stream.second, buf, parser); - } - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); - throw; - } - - auto& response (parser.get()); - if (response.result() != http::status::no_content) { Log(LogCritical, GetReflectionType()->GetName()) << "Unexpected response code: " << response.result() << ", InfluxDB error message:\n" << response.body(); diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index 35caa2f25..cfda02501 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -5,18 +5,13 @@ #define INFLUXDBCOMMONWRITER_H #include "perfdata/influxdbcommonwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" #include "base/perfdatavalue.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" -#include "base/tlsstream.hpp" #include "base/workqueue.hpp" #include "remote/url.hpp" -#include -#include +#include "perfdata/perfdatawriterconnection.hpp" #include -#include namespace icinga { @@ -39,6 +34,7 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; @@ -50,22 +46,22 @@ protected: private: boost::signals2::connection m_HandleCheckResults; Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; std::atomic_size_t m_DataBufferSize{0}; + Shared::Ptr m_SslContext; + PerfdataWriterConnection::Ptr m_Connection; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); - void FlushTimeoutWQ(); void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); - OptionalTlsStream Connect(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti index 12e867824..7074bceaf 100644 --- a/lib/perfdata/influxdbcommonwriter.ti +++ b/lib/perfdata/influxdbcommonwriter.ti @@ -52,13 +52,16 @@ abstract class InfluxdbCommonWriter : ConfigObject }); }}} }; + [config] double disconnect_timeout { + default {{{ return 10; }}} + }; [config] bool enable_send_thresholds { default {{{ return false; }}} }; [config] bool enable_send_metadata { default {{{ return false; }}} }; - [config] int flush_interval { + [config] double flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 002639792..1b2f82a7d 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -7,17 +7,12 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "icinga/compatutility.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" -#include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -36,6 +31,8 @@ void OpenTsdbWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); + m_WorkQueue.SetName("OpenTsdbWriter, " + GetName()); + if (!GetEnableHa()) { Log(LogDebug, "OpenTsdbWriter") << "HA functionality disabled. Won't pause connection: " << GetName(); @@ -51,14 +48,26 @@ void OpenTsdbWriter::OnConfigLoaded() * * @param status Key value pairs for feature stats */ -void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&) +void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { DictionaryData nodes; for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType()) { - nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({ - { "connected", opentsdbwriter->GetConnected() } - })); + size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength(); + double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0; + + nodes.emplace_back( + opentsdbwriter->GetName(), + new Dictionary({ + { "connected", opentsdbwriter->m_Connection->IsConnected() }, + {"work_queue_items", workQueueItems}, + {"work_queue_item_rate", workQueueItemRate} + } + ) + ); + + perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_items", workQueueItems)); + perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate)); } status->Set("opentsdbwriter", new Dictionary(std::move(nodes))); @@ -74,13 +83,14 @@ void OpenTsdbWriter::Resume() Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' resumed."; + m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) { + Log(LogDebug, "OpenTsdbWriter") + << "Exception during OpenTsdb operation: " << DiagnosticInformation(exp); + }); + ReadConfigTemplate(); - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { CheckResultHandler(checkable, cr); @@ -93,60 +103,24 @@ void OpenTsdbWriter::Resume() void OpenTsdbWriter::Pause() { m_HandleCheckResults.disconnect(); - m_ReconnectTimer->Stop(true); + + std::promise queueDonePromise; + + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + + m_WorkQueue.Join(); Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; - m_Stream->close(); - - SetConnected(false); - ObjectImpl::Pause(); } -/** - * Reconnect handler called by the timer. - * Handles TLS - */ -void OpenTsdbWriter::ReconnectTimerHandler() -{ - if (IsPaused()) - return; - - SetShouldConnect(true); - - if (GetConnected()) - return; - - double startTime = Utility::GetTime(); - - Log(LogNotice, "OpenTsdbWriter") - << "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'."; - - /* - * We're using telnet as input method. Future PRs may change this into using the HTTP API. - * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet - */ - m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); - - try { - icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "OpenTsdbWriter") - << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - - SetConnected(false); - - return; - } - - SetConnected(true); - - Log(LogInformation, "OpenTsdbWriter") - << "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - /** * Registered check result handler processing data. * Calculates tags from the config. @@ -251,7 +225,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_hostName = EscapeTag(host->GetName()); tags["host"] = escaped_hostName; - double ts = cr->GetExecutionEnd(); + std::vector> metadata; if (service) { @@ -262,40 +236,55 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String escaped_serviceName = EscapeMetric(serviceName); metric = "icinga.service." + escaped_serviceName; } - - SendMetric(checkable, metric + ".state", tags, service->GetState(), ts); + metadata.emplace_back("state", service->GetState()); } else { if (!config_tmpl_metric.IsEmpty()) { metric = config_tmpl_metric; } else { metric = "icinga.host"; } - SendMetric(checkable, metric + ".state", tags, host->GetState(), ts); + metadata.emplace_back("state", host->GetState()); } - SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts); - SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts); - SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts); - SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts); + metadata.emplace_back("state_type", checkable->GetStateType()); + metadata.emplace_back("reachable", checkable->IsReachable()); + metadata.emplace_back("downtime_depth", checkable->GetDowntimeDepth()); + metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement()); - SendPerfdata(checkable, metric, tags, cr, ts); + m_WorkQueue.Enqueue( + [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable { + if (m_Connection->IsStopped()) { + return; + } - metric = "icinga.check"; + double ts = cr->GetExecutionEnd(); - if (service) { - tags["type"] = "service"; - String serviceName = service->GetShortName(); - String escaped_serviceName = EscapeTag(serviceName); - tags["service"] = escaped_serviceName; - } else { - tags["type"] = "host"; - } + for (auto& [name, val] : metadata) { + AddMetric(checkable, metric + "." + name, tags, val, ts); + } - SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); - SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); - SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); - SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); + AddPerfdata(checkable, metric, tags, cr, ts); + + metric = "icinga.check"; + + if (service) { + tags["type"] = "service"; + String serviceName = service->GetShortName(); + String escaped_serviceName = EscapeTag(serviceName); + tags["service"] = escaped_serviceName; + } else { + tags["type"] = "host"; + } + + AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts); + AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts); + AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts); + AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts); + + SendMsgBuffer(); + } + ); } /** @@ -307,9 +296,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C * @param cr Check result containing performance data * @param ts Timestamp when the check result was received */ -void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts) { + ASSERT(m_WorkQueue.IsWorkerThread()); + Array::Ptr perfdata = cr->GetPerformanceData(); if (!perfdata) @@ -350,21 +341,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& tags_new["label"] = escaped_key; } - SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); + AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts); if (!pdv->GetCrit().IsEmpty()) - SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); + AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts); if (!pdv->GetWarn().IsEmpty()) - SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); + AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts); if (!pdv->GetMin().IsEmpty()) - SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); + AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts); if (!pdv->GetMax().IsEmpty()) - SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); + AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts); } } /** - * Send given metric to OpenTSDB + * Add given metric to the data buffer to be later sent to OpenTSDB * * @param checkable Host/service object * @param metric Full metric name @@ -372,9 +363,11 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& * @param value Floating point metric value * @param ts Timestamp where the metric was received from the check result */ -void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric, +void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts) { + ASSERT(m_WorkQueue.IsWorkerThread()); + String tags_string = ""; for (auto& tag : tags) { @@ -394,22 +387,21 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m /* do not send \n to debug log */ msgbuf << "\n"; - String put = msgbuf.str(); + m_MsgBuf.append(msgbuf.str()); +} - ObjectLock olock(this); +void OpenTsdbWriter::SendMsgBuffer() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); - if (!GetConnected()) - return; + Log(LogDebug, "OpenTsdbWriter") + << "Flushing data buffer to OpenTsdb."; try { - Log(LogDebug, "OpenTsdbWriter") - << "Checkable '" << checkable->GetName() << "' sending message '" << put << "'."; - - boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str())); - m_Stream->flush(); - } catch (const std::exception& ex) { - Log(LogCritical, "OpenTsdbWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "OpenTsdbWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index cd3f2efc4..5db298540 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -5,11 +5,9 @@ #define OPENTSDBWRITER_H #include "perfdata/opentsdbwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" -#include +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -36,24 +34,24 @@ protected: void Pause() override; private: - Shared::Ptr m_Stream; + WorkQueue m_WorkQueue{10000000, 1}; + std::string m_MsgBuf; + PerfdataWriterConnection::Ptr m_Connection; boost::signals2::connection m_HandleCheckResults; - Timer::Ptr m_ReconnectTimer; Dictionary::Ptr m_ServiceConfigTemplate; Dictionary::Ptr m_HostConfigTemplate; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); - void SendMetric(const Checkable::Ptr& checkable, const String& metric, + void AddMetric(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, double value, double ts); - void SendPerfdata(const Checkable::Ptr& checkable, const String& metric, + void SendMsgBuffer(); + void AddPerfdata(const Checkable::Ptr& checkable, const String& metric, const std::map& tags, const CheckResult::Ptr& cr, double ts); static String EscapeTag(const String& str); static String EscapeMetric(const String& str); - void ReconnectTimerHandler(); - void ReadConfigTemplate(); }; diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index 56bc8cdf4..dcad57168 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -31,10 +31,8 @@ class OpenTsdbWriter : ConfigObject [config] bool enable_generic_metrics { default {{{ return false; }}} }; - - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; }; diff --git a/lib/perfdata/perfdatawriterconnection.cpp b/lib/perfdata/perfdatawriterconnection.cpp new file mode 100644 index 000000000..f8807c9ce --- /dev/null +++ b/lib/perfdata/perfdatawriterconnection.cpp @@ -0,0 +1,209 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "perfdata/perfdatawriterconnection.hpp" +#include "base/tcpsocket.hpp" +#include +#include +#include +#include + +using namespace icinga; +using HttpResponse = PerfdataWriterConnection::HttpResponse; + +PerfdataWriterConnection::PerfdataWriterConnection( + const ConfigObject::Ptr& parent, + String host, + String port, + Shared::Ptr sslContext, + bool verifyPeerCertificate +) + : PerfdataWriterConnection( + parent->GetReflectionType()->GetName(), + parent->GetName(), + std::move(host), + std::move(port), + std::move(sslContext), + verifyPeerCertificate + ) {}; + +PerfdataWriterConnection::PerfdataWriterConnection( + String logFacility, + String parentName, + String host, + String port, + Shared::Ptr sslContext, + bool verifyPeerCertificate +) + : m_VerifyPeerCertificate(verifyPeerCertificate), + m_SslContext(std::move(sslContext)), + m_LogFacility(std::move(logFacility)), + m_ParentName(std::move(parentName)), + m_Host(std::move(host)), + m_Port(std::move(port)), + m_ReconnectTimer(IoEngine::Get().GetIoContext()), + m_Strand(IoEngine::Get().GetIoContext()), + m_Stream(MakeStream()) +{ +} + +/** + * Get the current state of the connection. + */ +bool PerfdataWriterConnection::IsConnected() const +{ + return m_Connected; +} + +bool PerfdataWriterConnection::IsStopped() const +{ + return m_Stopped; +} + +void PerfdataWriterConnection::Disconnect() +{ + if (m_Stopped.exchange(true, std::memory_order_relaxed)) { + return; + } + + std::promise promise; + + IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { + try { + /* Cancel any outstanding operations of the other coroutine. + * Since we're on the same strand we're hopefully guaranteed that all cancellations + * result in exceptions thrown by the yield_context, even if its already queued for + * completion. + */ + std::visit( + [](const auto& stream) { + if (stream->lowest_layer().is_open()) { + stream->lowest_layer().cancel(); + } + }, + m_Stream + ); + m_ReconnectTimer.cancel(); + + Disconnect(std::move(yc)); + promise.set_value(); + } catch (const std::exception& ex) { + promise.set_exception(std::current_exception()); + } + }); + + promise.get_future().get(); +} + +AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const +{ + AsioTlsOrTcpStream ret; + if (m_SslContext) { + ret = Shared::Make(IoEngine::Get().GetIoContext(), *m_SslContext); + } else { + ret = Shared::Make(IoEngine::Get().GetIoContext()); + } + + return ret; +} + +/** + * Wait for the next attempt after an error, using a backoff algorithm. + * + * The waits between retries are doubled for each failure, up to a maximum of 32s, until it is + * reset by a successful attempt. + */ +void PerfdataWriterConnection::BackoffWait(const boost::asio::yield_context& yc) +{ + m_ReconnectTimer.expires_after(m_RetryTimeout); + if (m_RetryTimeout <= FinalRetryWait / 2) { + m_RetryTimeout *= 2; + } + m_ReconnectTimer.async_wait(yc); +} + +void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& yc) +{ + if (m_Connected) { + return; + } + + std::visit( + [&](auto& stream) { + ::Connect(stream->lowest_layer(), m_Host, m_Port, yc); + + if constexpr (std::is_same_v, Shared::Ptr>) { + using type = boost::asio::ssl::stream_base::handshake_type; + + stream->next_layer().async_handshake(type::client, yc); + + if (m_VerifyPeerCertificate) { + if (!stream->next_layer().IsVerifyOK()) { + BOOST_THROW_EXCEPTION( + std::runtime_error{ + "TLS certificate validation failed: " + stream->next_layer().GetVerifyError() + } + ); + } + } + } + }, + m_Stream + ); + + m_Connected = true; +} + +void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc) +{ + if (!m_Connected.exchange(false, std::memory_order_relaxed)) { + return; + } + + std::visit( + [&](auto& stream) { + if constexpr (std::is_same_v, Shared::Ptr>) { + stream->GracefulDisconnect(m_Strand, yc); + } else { + stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both); + stream->lowest_layer().close(); + } + }, + m_Stream + ); + + m_Stream = MakeStream(); +} + +void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, const boost::asio::yield_context& yc) +{ + std::visit( + [&](auto& stream) { + boost::asio::async_write(*stream, buf, yc); + stream->async_flush(yc); + }, + m_Stream + ); +} + +HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc) +{ + boost::beast::http::response response; + std::visit( + [&](auto& stream) { + boost::beast::http::request_serializer sr{request}; + boost::beast::http::async_write(*stream, sr, yc); + stream->async_flush(yc); + + boost::beast::flat_buffer buf; + boost::beast::http::async_read(*stream, buf, response, yc); + }, + m_Stream + ); + + if (!response.keep_alive()) { + Disconnect(yc); + } + + return response; +} diff --git a/lib/perfdata/perfdatawriterconnection.hpp b/lib/perfdata/perfdatawriterconnection.hpp new file mode 100644 index 000000000..729878a29 --- /dev/null +++ b/lib/perfdata/perfdatawriterconnection.hpp @@ -0,0 +1,157 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include "base/io-engine.hpp" +#include "base/tlsstream.hpp" +#include +#include +#include +#include +#include + +namespace icinga { + +/** + * Class handling the connection to the various Perfdata backends. + */ +class PerfdataWriterConnection final : public Object +{ + static constexpr auto InitialRetryWait = 50ms; + static constexpr auto FinalRetryWait = 32s; + +public: + DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection); + + struct Stopped : std::exception + { + [[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; } + }; + + using HttpRequest = boost::beast::http::request; + using HttpResponse = boost::beast::http::response; + + PerfdataWriterConnection( + const ConfigObject::Ptr& parent, + String host, + String port, + Shared::Ptr sslContext = nullptr, + bool verifyPeerCertificate = true + ); + + PerfdataWriterConnection( + String logFacility, + String parentName, + String host, + String port, + Shared::Ptr sslContext = nullptr, + bool verifyPeerCertificate = true + ); + + /** + * Send the given data buffer to the server. + * + * To support each Buffer type this function needs an overload of the WriteMessage method. + * If the selected WriteMessage functions returns something, Send() will return that result. + * + * @param buf The buffer to send + * @return the return value returned by the WriteMessage overload for Buffer, otherwise void + */ + template + auto Send(Buffer&& buf) + { + if (m_Stopped) { + BOOST_THROW_EXCEPTION(Stopped{}); + } + + using RetType = decltype(WriteMessage(std::declval(), std::declval())); + std::promise promise; + + IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) { + while (true) { + try { + EnsureConnected(yc); + + if constexpr (std::is_void_v) { + WriteMessage(std::forward(buf), yc); + promise.set_value(); + } else { + promise.set_value(WriteMessage(std::forward(buf), yc)); + } + + m_RetryTimeout = InitialRetryWait; + return; + } catch (const std::exception& ex) { + if (m_Stopped) { + promise.set_exception(std::make_exception_ptr(Stopped{})); + return; + } + + Log(LogCritical, m_LogFacility) + << "Error while " << (m_Connected ? "sending" : "connecting") << " to '" << m_Host << ":" + << m_Port << "' for '" << m_ParentName << "': " << ex.what(); + + m_Stream = MakeStream(); + m_Connected = false; + + try { + BackoffWait(yc); + } catch (const std::exception&) { + promise.set_exception(std::make_exception_ptr(Stopped{})); + return; + } + } + } + }); + + return promise.get_future().get(); + } + + void Disconnect(); + + /** + * Cancels ongoing operations either after a timeout or a future became ready. + * + * This will disconnect and set a flag so that no further Send() requests are accepted. + * + * @param future The future to wait for + * @param timeout The timeout after which ongoing operations are canceled + */ + template + void CancelAfterTimeout(const std::future& future, const std::chrono::duration& timeout) + { + future.wait_for(timeout); + Disconnect(); + } + + bool IsConnected() const; + bool IsStopped() const; + +private: + AsioTlsOrTcpStream MakeStream() const; + void BackoffWait(const boost::asio::yield_context& yc); + void EnsureConnected(const boost::asio::yield_context& yc); + void Disconnect(boost::asio::yield_context yc); + + void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc); + HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc); + + std::atomic_bool m_Stopped{false}; + std::atomic_bool m_Connected{false}; + + bool m_VerifyPeerCertificate; + Shared::Ptr m_SslContext; + + String m_LogFacility; + String m_ParentName; + String m_Host; + String m_Port; + + std::chrono::milliseconds m_RetryTimeout{InitialRetryWait}; + boost::asio::steady_timer m_ReconnectTimer; + boost::asio::io_context::strand m_Strand; + AsioTlsOrTcpStream m_Stream; +}; + +} // namespace icinga diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f628fc8a4..d1f0906cb 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -140,6 +140,18 @@ if(ICINGA2_WITH_NOTIFICATION) ) endif() +if(ICINGA2_WITH_PERFDATA) + list(APPEND base_test_SOURCES + perfdata-elasticsearchwriter.cpp + perfdata-gelfwriter.cpp + perfdata-graphitewriter.cpp + perfdata-influxdbwriter.cpp + perfdata-opentsdbwriter.cpp + perfdata-perfdatawriterconnection.cpp + $ + ) +endif() + if(ICINGA2_UNITY_BUILD) mkunity_target(base test base_test_SOURCES) endif() diff --git a/test/base-testloggerfixture.hpp b/test/base-testloggerfixture.hpp index 1d1426efc..d01384fa8 100644 --- a/test/base-testloggerfixture.hpp +++ b/test/base-testloggerfixture.hpp @@ -11,6 +11,12 @@ #include #include +#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout)) +#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout)) + +#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout)) +#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout)) + namespace icinga { class TestLogger : public Logger diff --git a/test/notification-notificationcomponent.cpp b/test/notification-notificationcomponent.cpp index c2ef994a0..a67882ab7 100644 --- a/test/notification-notificationcomponent.cpp +++ b/test/notification-notificationcomponent.cpp @@ -5,6 +5,7 @@ #include "base/defer.hpp" #include "remote/apilistener.hpp" #include "test/base-testloggerfixture.hpp" +#include "test/utils.hpp" #include "config/configcompiler.hpp" #include "notification/notificationcomponent.hpp" @@ -194,22 +195,7 @@ object NotificationComponent "nc" {} void ReceiveCheckResults(std::size_t num, ServiceState state) { - StoppableWaitGroup::Ptr wg = new StoppableWaitGroup(); - - for (auto i = 0UL; i < num; ++i) { - CheckResult::Ptr cr = new CheckResult(); - - cr->SetState(state); - - double now = Utility::GetTime(); - cr->SetActive(false); - cr->SetScheduleStart(now); - cr->SetScheduleEnd(now); - cr->SetExecutionStart(now); - cr->SetExecutionEnd(now); - - BOOST_REQUIRE(m_Host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok); - } + ::ReceiveCheckResults(m_Host, num, state); } double GetLastNotificationTimestamp() { return m_Notification->GetLastNotification(); } diff --git a/test/perfdata-elasticsearchwriter.cpp b/test/perfdata-elasticsearchwriter.cpp new file mode 100644 index 000000000..ac6abac8d --- /dev/null +++ b/test/perfdata-elasticsearchwriter.cpp @@ -0,0 +1,57 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "perfdata/elasticsearchwriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_elasticsearchwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto resp = GetSplitDecodedRequestBody(); + SendResponse(); + + // ElasticsearchWriter wants to send the same message twice, once for the check result + // and once for the "state change". + resp = GetSplitDecodedRequestBody(); + SendResponse(); + + // Just some basic sanity tests. It's not important to check if everything is entirely + // correct here. + BOOST_REQUIRE_GT(resp->GetLength(), 1); + Dictionary::Ptr cr = resp->Get(1); + BOOST_CHECK(cr->Contains("@timestamp")); + BOOST_CHECK_EQUAL(cr->Get("check_command"), "dummy"); + BOOST_CHECK_EQUAL(cr->Get("host"), "h1"); + + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now try to pause. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'ElasticsearchWriter' paused\\.", 10s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-gelfwriter.cpp b/test/perfdata-gelfwriter.cpp new file mode 100644 index 000000000..8da07bc4a --- /dev/null +++ b/test/perfdata-gelfwriter.cpp @@ -0,0 +1,48 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "perfdata/gelfwriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_gelfwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + Dictionary::Ptr resp = JsonDecode(GetDataUntil('\0')); + + // Just some basic sanity tests. It's not important to check if everything is entirely + // correct here. + BOOST_CHECK_CLOSE(resp->Get("timestamp").Get(), Utility::GetTime(), 0.5); + BOOST_CHECK_EQUAL(resp->Get("_check_command"), "dummy"); + BOOST_CHECK_EQUAL(resp->Get("_hostname"), "h1"); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now stop reading and try to pause OpenTsdbWriter. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 1s); + REQUIRE_LOG_MESSAGE("'GelfWriter' paused\\.", 1s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-graphitewriter.cpp b/test/perfdata-graphitewriter.cpp new file mode 100644 index 000000000..9b5789fe2 --- /dev/null +++ b/test/perfdata-graphitewriter.cpp @@ -0,0 +1,47 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "base/perfdatavalue.hpp" +#include "perfdata/graphitewriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_graphitewriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto msg = GetDataUntil('\n'); + + // Just some basic sanity tests. It's not important to check if everything is entirely correct here. + std::string_view cmpStr{"icinga2.h1.host.dummy.perfdata.dummy.value 42"}; + BOOST_REQUIRE_EQUAL(msg.substr(0, cmpStr.length()), cmpStr); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now stop reading and try to pause OpenTsdbWriter. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'GraphiteWriter' paused\\.", 10s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-influxdbwriter.cpp b/test/perfdata-influxdbwriter.cpp new file mode 100644 index 000000000..43837b50f --- /dev/null +++ b/test/perfdata-influxdbwriter.cpp @@ -0,0 +1,50 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "perfdata/influxdb2writer.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_influxdbwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto req = GetSplitRequestBody(','); + SendResponse(boost::beast::http::status::no_content); + + // Just some basic sanity tests. It's not important to check if everything is entirely + // correct here. + BOOST_REQUIRE_EQUAL(req.size(), 3); + BOOST_CHECK_EQUAL(req[0], "dummy"); + BOOST_CHECK_EQUAL(req[1], "hostname=h1"); + std::string_view perfData = "metric=dummy value=42"; + BOOST_CHECK_EQUAL(req[2].substr(0, perfData.length()), perfData); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now try to pause. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'Influxdb2Writer' paused\\.", 1s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-opentsdbwriter.cpp b/test/perfdata-opentsdbwriter.cpp new file mode 100644 index 000000000..c3ec47d6d --- /dev/null +++ b/test/perfdata-opentsdbwriter.cpp @@ -0,0 +1,53 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include +#include "base/perfdatavalue.hpp" +#include "perfdata/opentsdbwriter.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatawriterfixture.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +BOOST_FIXTURE_TEST_SUITE(perfdata_opentsdbwriter, PerfdataWriterFixture, + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +BOOST_AUTO_TEST_CASE(connect) +{ + ResumeWriter(); + + ReceiveCheckResults(1, ServiceState::ServiceCritical); + + Accept(); + auto msg = GetDataUntil('\n'); + std::vector splitMsg; + boost::split(splitMsg, msg, boost::is_any_of(" ")); + + // Just some basic sanity tests. It's not important to check if everything is entirely correct here. + BOOST_REQUIRE_EQUAL(splitMsg.size(), 5); + BOOST_REQUIRE_EQUAL(splitMsg[0], "put"); + BOOST_REQUIRE_EQUAL(splitMsg[1], "icinga.host.state"); + BOOST_REQUIRE_CLOSE(boost::lexical_cast(splitMsg[2]), Utility::GetTime(), 1); + BOOST_REQUIRE_EQUAL(splitMsg[3], "1"); + BOOST_REQUIRE_EQUAL(splitMsg[4], "host=h1"); + PauseWriter(); +} + +BOOST_AUTO_TEST_CASE(pause_with_pending_work) +{ + ResumeWriter(); + + // Process check-results until the writer is stuck. + BOOST_REQUIRE_MESSAGE(GetWriterStuck(10s), "Failed to get Writer stuck."); + + // Now stop reading and try to pause OpenTsdbWriter. + PauseWriter(); + + REQUIRE_LOG_MESSAGE("Connection stopped\\.", 10s); + REQUIRE_LOG_MESSAGE("'OpenTsdbWriter' paused\\.", 10s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-perfdatatargetfixture.hpp b/test/perfdata-perfdatatargetfixture.hpp new file mode 100644 index 000000000..871610ff5 --- /dev/null +++ b/test/perfdata-perfdatatargetfixture.hpp @@ -0,0 +1,198 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include "base/io-engine.hpp" +#include "base/json.hpp" +#include "base/tlsstream.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace icinga { + +/** + * A fixture that provides methods to simulate a perfdata target + */ +class PerfdataWriterTargetFixture +{ +public: + PerfdataWriterTargetFixture() + : icinga::PerfdataWriterTargetFixture(Shared::Make(IoEngine::Get().GetIoContext())) + { + } + + explicit PerfdataWriterTargetFixture(const Shared::Ptr& sslCtx) + : icinga::PerfdataWriterTargetFixture(Shared::Make(IoEngine::Get().GetIoContext(), *sslCtx)) + { + m_SslContext = sslCtx; + } + + explicit PerfdataWriterTargetFixture(AsioTlsOrTcpStream stream) + : m_Stream(std::move(stream)), + m_Acceptor( + IoEngine::Get().GetIoContext(), + boost::asio::ip::tcp::endpoint{boost::asio::ip::address_v4::loopback(), 0} + ) + { + } + + unsigned short GetPort() { return m_Acceptor.local_endpoint().port(); } + + void Accept() + { + BOOST_REQUIRE_NO_THROW( + std::visit([&](auto& stream) { return m_Acceptor.accept(stream->lowest_layer()); }, m_Stream) + ); + } + + void Handshake() + { + BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); + using handshake_type = UnbufferedAsioTlsStream::handshake_type; + auto& stream = std::get::Ptr>(m_Stream); + BOOST_REQUIRE_NO_THROW(stream->next_layer().handshake(handshake_type::server)); + BOOST_REQUIRE(stream->next_layer().IsVerifyOK()); + } + + void Shutdown() + { + BOOST_REQUIRE(std::holds_alternative::Ptr>(m_Stream)); + auto& stream = std::get::Ptr>(m_Stream); + try { + stream->next_layer().shutdown(); + } catch (const std::exception& ex) { + if (const auto* se = dynamic_cast(&ex); + !se || se->code() != boost::asio::error::eof) { + BOOST_FAIL("Exception in shutdown(): " << ex.what()); + } + } + + ResetStream(); + } + + void ResetStream() + { + if (std::holds_alternative::Ptr>(m_Stream)) { + m_Stream = Shared::Make(IoEngine::Get().GetIoContext(), *m_SslContext); + } else { + m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); + } + } + + /** + * Reads the HTTP request body from the stream, with an optional limit on the number of bytes to read. + * + * @param bytes The maximum number of bytes to read from the request body. If 0, there is no limit and the entire body will be read. + * + * @return The HTTP request read from the stream. + */ + boost::beast::http::request GetRequest(std::size_t bytes = 0) + { + using namespace boost::beast; + + boost::beast::flat_buffer buf; + if (bytes > 0) { + buf = boost::beast::flat_buffer{bytes}; + } + boost::system::error_code ec; + http::request_parser parser; + parser.body_limit(-1); + std::visit( + [&](auto& stream) { + http::read(*stream, buf, parser, ec); + }, + m_Stream + ); + if (bytes > 0) { + BOOST_REQUIRE_MESSAGE( + !ec || ec == http::error::buffer_overflow, + "Reading request body with a buffer limit of '" << bytes << + "' should either succeed or fail with a buffer_overflow error, but got: " << ec.message() + ); + } else { + BOOST_REQUIRE_MESSAGE(!ec, "Error while reading request body: " << ec.message()); + BOOST_REQUIRE_MESSAGE(parser.is_done(), "Parser did not finish reading the request, but no error was set."); + } + return parser.release(); + } + + auto GetSplitRequestBody(char delim) + { + auto request = GetRequest(); + std::vector result{}; + boost::split(result, request.body(), boost::is_any_of(std::string{delim})); + return result; + } + + auto GetSplitDecodedRequestBody() + { + Array::Ptr result = new Array; + for (const auto& line : GetSplitRequestBody('\n')) { + if (!line.empty()) { + result->Add(JsonDecode(line)); + } + } + return result; + } + + template + std::string GetDataUntil(T&& delim) + { + using namespace boost::asio::ip; + + std::size_t delimLength{1}; + if constexpr (!std::is_same_v, char>) { + delimLength = std::string_view{delim}.size(); + } + + boost::asio::streambuf buf; + boost::system::error_code ec; + auto bytesRead = std::visit( + [&](auto& stream) { return boost::asio::read_until(*stream, buf, std::forward(delim), ec); }, m_Stream + ); + BOOST_REQUIRE_MESSAGE(!ec, ec.message()); + + std::string ret{ + boost::asio::buffers_begin(buf.data()), boost::asio::buffers_begin(buf.data()) + bytesRead - delimLength + }; + buf.consume(bytesRead); + + return ret; + } + + void SendResponse(boost::beast::http::status status = boost::beast::http::status::ok) + { + using namespace boost::asio::ip; + using namespace boost::beast; + + boost::system::error_code ec; + http::response response; + response.result(status); + response.prepare_payload(); + std::visit( + [&](auto& stream) { + http::write(*stream, response, ec); + BOOST_REQUIRE_MESSAGE(!ec, ec.message()); + stream->flush(ec); + BOOST_REQUIRE_MESSAGE(!ec, ec.message()); + }, + m_Stream + ); + } + +private: + AsioTlsOrTcpStream m_Stream; + boost::asio::ip::tcp::acceptor m_Acceptor; + Shared::Ptr m_SslContext; +}; + +} // namespace icinga diff --git a/test/perfdata-perfdatawriterconnection.cpp b/test/perfdata-perfdatawriterconnection.cpp new file mode 100644 index 000000000..649754421 --- /dev/null +++ b/test/perfdata-perfdatawriterconnection.cpp @@ -0,0 +1,335 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "perfdata/perfdatawriterconnection.hpp" +#include "test/perfdata-perfdatatargetfixture.hpp" +#include "test/remote-certificate-fixture.hpp" +#include "test/test-ctest.hpp" +#include "test/test-thread.hpp" +#include "test/utils.hpp" + +using namespace icinga; + +class TlsPerfdataWriterFixture : public CertificateFixture, public PerfdataWriterTargetFixture +{ +public: + TlsPerfdataWriterFixture() : PerfdataWriterTargetFixture(MakeContext("server")) + { + m_PdwSslContext = MakeContext("client"); + + m_Conn = new PerfdataWriterConnection{"Test", "test", "127.0.0.1", std::to_string(GetPort()), m_PdwSslContext}; + } + + auto& GetConnection() { return *m_Conn; } + +private: + Shared::Ptr MakeContext(const std::string& name) + { + auto testCert = EnsureCertFor(name); + return SetupSslContext( + testCert.crtFile, + testCert.keyFile, + m_CaCrtFile.string(), + "", + DEFAULT_TLS_CIPHERS, + DEFAULT_TLS_PROTOCOLMIN, + DebugInfo() + ); + } + + Shared::Ptr m_PdwSslContext; + PerfdataWriterConnection::Ptr m_Conn; +}; + +BOOST_FIXTURE_TEST_SUITE(perfdata_connection, TlsPerfdataWriterFixture, + *CTestProperties("FIXTURES_REQUIRED ssl_certs") + *boost::unit_test::label("perfdata") + *boost::unit_test::label("network") +) + +/* If there is no acceptor listening on the other side, connecting should fail. + */ +BOOST_AUTO_TEST_CASE(connection_refused) +{ + std::promise p; + TestThread timeoutThread{[&]() { + auto f = p.get_future(); + GetConnection().CancelAfterTimeout(f, 50ms); + }}; + + BOOST_REQUIRE_THROW( + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}), PerfdataWriterConnection::Stopped + ); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); +} + +/* The PerfdataWriterConnection connects automatically when sending the first data. + * In case of http we also need to support disconnecting and reconnecting. + */ +BOOST_AUTO_TEST_CASE(ensure_connected) +{ + std::promise disconnectedPromise; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil('\0'); + Shutdown(); + disconnectedPromise.get_future().get(); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + }}; + + BOOST_REQUIRE_NO_THROW(GetConnection().Send(boost::asio::const_buffer{"foobar", 7})); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + disconnectedPromise.set_value(); + + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* Verify that data can still be sent while CancelAfterTimeout is waiting and the timeout + * can be aborted when all data has been sent successfully. + */ +BOOST_AUTO_TEST_CASE(finish_during_timeout) +{ + std::promise p; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil('\0'); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + ret = GetDataUntil('\0'); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + // This is done here instead of the main thread after send, because we need to + // synchronize the asserts done in the timeoutThread after this point. + p.set_value(); + Shutdown(); + }}; + + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}); + + TestThread timeoutThread{[&]() { + auto f = p.get_future(); + GetConnection().CancelAfterTimeout(f, 50ms); + BOOST_REQUIRE(f.wait_for(0ms) == std::future_status::ready); + BOOST_REQUIRE(!GetConnection().IsConnected()); + }}; + + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* For the client, even a hanging server will accept the connection immediately, since it's done + * in the kernel. But in that case the TLS handshake will be stuck, so we need to verify that a + * handshake can be interrupted by CancelAfterTimeout(). + */ +BOOST_AUTO_TEST_CASE(stuck_in_handshake) +{ + TestThread mockTargetThread{[&]() { Accept(); }}; + + std::promise p; + TestThread timeoutThread{[&]() { + auto f = p.get_future(); + GetConnection().CancelAfterTimeout(f, 50ms); + BOOST_REQUIRE(f.wait_for(0ms) == std::future_status::timeout); + }}; + + BOOST_REQUIRE_THROW( + GetConnection().Send(boost::asio::const_buffer{"foobar", 7}), PerfdataWriterConnection::Stopped + ); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* When the disconnect timeout runs out while sending something to a slow or blocking server, we + * expect the send to be aborted after a timeout with an 'operation cancelled' exception, in + * order to not delay the shutdown of a perfdata writer indefinitely. + * No orderly TLS shutdown can be performed in this case, because the stream has been truncated. + * The server will need to handle this one on their own. + */ +BOOST_AUTO_TEST_CASE(stuck_sending) +{ + std::promise shutdownPromise; + std::promise dataReadPromise; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil("#"); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + dataReadPromise.set_value(); + + // There's still a full buffer waiting to be read, but we're pretending to be dead and + // close the socket at this point. + shutdownPromise.get_future().get(); + ResetStream(); + }}; + + TestThread timeoutThread{[&]() { + // Synchronize with when mockTargetThread has read the initial data. + // This should especially help with timing on slow machines like the ARM GHA runners. + dataReadPromise.get_future().get(); + BOOST_REQUIRE(GetConnection().IsConnected()); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + BOOST_REQUIRE(!GetConnection().IsConnected()); + }}; + + // Allocate a large string that will fill the buffers on both sides of the connection, in + // order to make Send() block. + auto randomData = GetRandomString("foobar#", 4UL * 1024 * 1024); + auto buf = boost::asio::const_buffer{randomData.data(), randomData.size()}; + BOOST_REQUIRE_THROW(GetConnection().Send(buf), PerfdataWriterConnection::Stopped); + shutdownPromise.set_value(); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* This simulates a server that is stuck after receiving a HTTP request and before sending their + * response. Here, the simulated server is polite and still responds to a shutdown request, but + * in reality a server might not even do that. That case should be handled by our + * AsioTlsStream::GracefulDisconnect() function with an additional 10s timeout. + */ +BOOST_AUTO_TEST_CASE(stuck_reading_response) +{ + std::promise shutdownPromise; + std::promise requestReadPromise; + + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetRequest(); + BOOST_REQUIRE_EQUAL(ret.body(), "bar"); + requestReadPromise.set_value(); + // Do not send a response but react to the shutdown to be polite. + shutdownPromise.get_future().get(); + Shutdown(); + }}; + + TestThread timeoutThread{[&]() { + // Synchronize with after mockTargetThread has read the request + requestReadPromise.get_future().get(); + BOOST_REQUIRE(GetConnection().IsConnected()); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + BOOST_REQUIRE(!GetConnection().IsConnected()); + }}; + + boost::beast::http::request request; + request.body() = "bar"; + request.method(boost::beast::http::verb::get); + request.target("foo"); + request.prepare_payload(); + BOOST_REQUIRE_THROW(GetConnection().Send(request), PerfdataWriterConnection::Stopped); + shutdownPromise.set_value(); + + REQUIRE_JOINS_WITHIN(timeoutThread, 1s); + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* This test simulates a server that closes the connection and reappears at a later time. + * PerfdataWriterConnection should detect the disconnect, catch the exception and attempt to + * reconnect without exiting Send(). + */ +BOOST_AUTO_TEST_CASE(reconnect_failed) +{ + TestThread mockTargetThread{[&]() { + Accept(); + Handshake(); + auto ret = GetDataUntil("#"); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + + ResetStream(); + + Accept(); + Handshake(); + + ret = GetDataUntil("#"); + BOOST_REQUIRE_EQUAL(ret, "foobar"); + ret = GetDataUntil("\n"); + + Shutdown(); + }}; + + // Allocate a large string that will fill the buffers on both sides of the connection, in + // order to make Send() block. + auto randomData = GetRandomString("foobar#", 4UL * 1024 * 1024); + randomData.push_back('\n'); + BOOST_REQUIRE_NO_THROW(GetConnection().Send(boost::asio::const_buffer{randomData.data(), randomData.size()})); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +/* This tests if retrying an http send will reproducibly lead to the exact same message being + * received. Normally this us guaranteed by the interface only accepting a const reference, but + * since on older boost versions the async_write() functions also accept non-const references, it + * doesn't hurt to ensure this with a test-case. + */ +BOOST_AUTO_TEST_CASE(http_send_retry) +{ + TestThread mockTargetThread{[&] { + Accept(); + Handshake(); + + /* Read only the first 512 bytes of the request body, since we don't want to unblock the client yet. + */ + auto request = GetRequest(512); + BOOST_REQUIRE_MESSAGE( + request.method() == boost::beast::http::verb::post, + "Request method is not POST: " << request.method_string() + ); + BOOST_REQUIRE_MESSAGE(request.target() == "foo", "Request target is not 'foo': " << request.target()); + BOOST_REQUIRE_MESSAGE( + request.body().compare(0, 7, "foobar#") == 0, + "Request body does not start with 'foobar#': " << request.body().substr(0, 7) + ); + + ResetStream(); + Accept(); + Handshake(); + + /* Read the entire response now and verify that we still get the expected body, + * even though the first read was only partial. + */ + request = GetRequest(); + BOOST_REQUIRE_MESSAGE( + request.method() == boost::beast::http::verb::post, + "Request method is not POST: " << request.method_string() + ); + BOOST_REQUIRE_MESSAGE(request.target() == "foo", "Request target is not 'foo': " << request.target()); + BOOST_REQUIRE_MESSAGE( + request.body().compare(0, 7, "foobar#") == 0, + "Request body does not start with 'foobar#': " << request.body().substr(0, 7) + ); + + /* The body size is 4MB + 7 bytes (7 bytes for the "foobar#" prefix of the generated message) + */ + BOOST_REQUIRE_MESSAGE( + request.body().size() == (4UL * 1024 * 1024) + 7, + "Request body is not the expected size: " << request.body().size() + ); + + SendResponse(); + + Shutdown(); + }}; + + boost::beast::http::request request{boost::beast::http::verb::post, "foo", 10}; + request.set(boost::beast::http::field::host, "localhost:" + std::to_string(GetPort())); + + /* Allocate a large string that will fill the buffers on both sides of the connection, in + * order to make Send() block. + */ + request.body() = GetRandomString("foobar#", 4UL * 1024 * 1024); + request.prepare_payload(); + BOOST_REQUIRE_NO_THROW(GetConnection().Send(request)); + BOOST_REQUIRE_NO_THROW(GetConnection().Disconnect()); + + REQUIRE_JOINS_WITHIN(mockTargetThread, 1s); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/perfdata-perfdatawriterfixture.hpp b/test/perfdata-perfdatawriterfixture.hpp new file mode 100644 index 000000000..e70b2123c --- /dev/null +++ b/test/perfdata-perfdatawriterfixture.hpp @@ -0,0 +1,139 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include "base/perfdatavalue.hpp" +#include "config/configcompiler.hpp" +#include "config/configitem.hpp" +#include "icinga/host.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/perfdata-perfdatatargetfixture.hpp" +#include "test/utils.hpp" +#include + +namespace icinga { + +template +class PerfdataWriterFixture : public PerfdataWriterTargetFixture, public TestLoggerFixture +{ +public: + PerfdataWriterFixture() : m_Writer(new Writer) + { + auto createObjects = [&]() { + String config = R"CONFIG( +object CheckCommand "dummy" { + command = "/bin/echo" +} +object Host "h1" { + address = "h1" + check_command = "dummy" + enable_notifications = true + enable_active_checks = false + enable_passive_checks = true +} +)CONFIG"; + + std::unique_ptr expr = ConfigCompiler::CompileText("", config); + expr->Evaluate(*ScriptFrame::GetCurrentFrame()); + }; + + ConfigItem::RunWithActivationContext(new Function("CreateTestObjects", createObjects)); + + m_Host = Host::GetByName("h1"); + BOOST_REQUIRE(m_Host); + + m_Writer->SetPort(std::to_string(GetPort())); + m_Writer->SetName(m_Writer->GetReflectionType()->GetName()); + m_Writer->SetDisconnectTimeout(0.05); + m_Writer->Register(); + + auto hasFlushInterval = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushInterval(0.05)) {}); + if constexpr (decltype(hasFlushInterval(std::declval()))::value) { + m_Writer->SetFlushInterval(0.05); + } + + auto hasFlushThreshold = boost::hana::is_valid([](auto&& obj) -> decltype(obj.SetFlushThreshold(1)) {}); + if constexpr (decltype(hasFlushThreshold(std::declval()))::value) { + m_Writer->SetFlushThreshold(1); + } + } + + void ReceiveCheckResults( + std::size_t num, + ServiceState state, + const std::function& fn = {} + ) + { + ::ReceiveCheckResults(m_Host, num, state, fn); + } + + std::size_t GetWorkQueueLength() + { + Array::Ptr dummy = new Array; + Dictionary::Ptr status = new Dictionary; + m_Writer->StatsFunc(status, dummy); + ObjectLock lock{status}; + // Unpack the single-key top-level dictionary + Dictionary::Ptr writer = status->Begin()->second; + BOOST_REQUIRE(writer); + Dictionary::Ptr values = writer->Get(m_Writer->GetName()); + BOOST_REQUIRE(values); + BOOST_REQUIRE(values->Contains("work_queue_items")); + return values->Get("work_queue_items"); + } + + /** + * Processes check results until the writer's work queue is no longer moving. + * + * @param timeout Time after which to give up trying to get the writer stuck + * @return true if the writer is now stuck + */ + bool GetWriterStuck(std::chrono::milliseconds timeout) + { + auto start = std::chrono::steady_clock::now(); + std::size_t unchangedCount = 0; + while(true){ + ReceiveCheckResults(10, ServiceCritical, [&](const CheckResult::Ptr& cr) { + cr->GetPerformanceData()->Add(new PerfdataValue{GetRandomString("", 4096), 1}); + }); + + if (std::chrono::steady_clock::now() - start >= timeout) { + return false; + } + + auto numWq = GetWorkQueueLength(); + if (numWq >= 10) { + std::this_thread::sleep_for(1ms); + if (numWq == GetWorkQueueLength()) { + if (unchangedCount < 5) { + ++unchangedCount; + continue; + } + return true; + } + + unchangedCount = 0; + } + } + } + + void ResumeWriter() + { + static_cast(m_Writer)->OnConfigLoaded(); + m_Writer->SetActive(true); + m_Writer->Activate(); + BOOST_REQUIRE(!m_Writer->IsPaused()); + } + + void PauseWriter() { static_cast(m_Writer)->Pause(); } + + auto GetWriter() { return m_Writer; } + +private: + Host::Ptr m_Host; + typename Writer::Ptr m_Writer; +}; + +} // namespace icinga diff --git a/test/test-thread.hpp b/test/test-thread.hpp new file mode 100644 index 000000000..f78383282 --- /dev/null +++ b/test/test-thread.hpp @@ -0,0 +1,60 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include +#include +#include +#include + +#define REQUIRE_JOINS_WITHIN(t, timeout) \ + BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") +#define CHECK_JOINS_WITHIN(t, timeout) \ + BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") +#define TEST_JOINS_WITHIN(t, timeout) \ + BOOST_REQUIRE_MESSAGE(t.TryJoinWithin(timeout), "Thread not joinable within timeout.") + +#define REQUIRE_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") +#define CHECK_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") +#define TEST_JOINABLE(t) BOOST_REQUIRE_MESSAGE(t.Joinable(), "Thread not joinable.") + +namespace icinga { + +class TestThread +{ +public: + explicit TestThread(std::function fn) : TestThread(std::move(fn), std::promise{}) {} + + bool Joinable() + { + auto status = m_JoinFuture.wait_for(std::chrono::milliseconds{0}); + return status == std::future_status::ready; + } + + template + bool TryJoinWithin(std::chrono::duration timeout) + { + auto status = m_JoinFuture.wait_for(timeout); + if (status == std::future_status::ready) { + m_Thread.join(); + return true; + } + return false; + } + +private: + explicit TestThread(std::function fn, std::promise joinPromise) + : m_JoinFuture(joinPromise.get_future()), + m_Thread([fn = std::move(fn), jp = std::move(joinPromise)]() mutable { + fn(); + jp.set_value(); + }) + { + } + + std::future m_JoinFuture; + std::thread m_Thread; +}; + +} // namespace icinga diff --git a/test/utils.cpp b/test/utils.cpp index a0aba80d0..95a9936cd 100644 --- a/test/utils.cpp +++ b/test/utils.cpp @@ -2,8 +2,10 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "utils.hpp" +#include "base/perfdatavalue.hpp" #include #include +#include #include #include @@ -66,3 +68,58 @@ GlobalTimezoneFixture::~GlobalTimezoneFixture() #endif tzset(); } + +std::string GetRandomString(std::string prefix, std::size_t length) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution distribution('!', '~'); + + for (auto i = 0U; i < length; i++) { + prefix += static_cast(distribution(gen)); + } + + return prefix; +} + +/** + * Make our test host receive a number of check-results. + * + * @param num The number of check-results to receive + * @param state The state the check results should have + * @param fn A function that will be passed the current check-result + */ +void ReceiveCheckResults( + const icinga::Checkable::Ptr& host, + std::size_t num, + icinga::ServiceState state, + const std::function& fn +) +{ + using namespace icinga; + + StoppableWaitGroup::Ptr wg = new StoppableWaitGroup(); + + for (auto i = 0UL; i < num; ++i) { + CheckResult::Ptr cr = new CheckResult(); + + cr->SetState(state); + + double now = Utility::GetTime(); + cr->SetActive(false); + cr->SetScheduleStart(now); + cr->SetScheduleEnd(now); + cr->SetExecutionStart(now); + cr->SetExecutionEnd(now); + + Array::Ptr perfData = new Array; + perfData->Add(new PerfdataValue{"dummy", 42}); + cr->SetPerformanceData(perfData); + + if (fn) { + fn(cr); + } + + BOOST_REQUIRE(host->ProcessCheckResult(cr, wg) == Checkable::ProcessingResult::Ok); + } +} diff --git a/test/utils.hpp b/test/utils.hpp index 67d2575a2..ec8b245d4 100644 --- a/test/utils.hpp +++ b/test/utils.hpp @@ -3,7 +3,9 @@ #pragma once +#include "icinga/host.hpp" #include +#include #include tm make_tm(std::string s); @@ -24,3 +26,12 @@ struct GlobalTimezoneFixture char *tz; }; + +std::string GetRandomString(std::string prefix, std::size_t length); + +void ReceiveCheckResults( + const icinga::Checkable::Ptr& host, + std::size_t num, + icinga::ServiceState state, + const std::function& fn = {} +);