From da2fc9df01f85aff96c9eba13e07b203b82dac24 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 2 Feb 2026 13:43:31 +0100 Subject: [PATCH] Use PerfdataWriterConnection in perfdata writers --- lib/perfdata/elasticsearchwriter.cpp | 199 +++++++------------------ lib/perfdata/elasticsearchwriter.hpp | 12 +- lib/perfdata/elasticsearchwriter.ti | 5 +- lib/perfdata/gelfwriter.cpp | 204 ++++++-------------------- lib/perfdata/gelfwriter.hpp | 18 +-- lib/perfdata/gelfwriter.ti | 5 +- lib/perfdata/graphitewriter.cpp | 152 +++---------------- lib/perfdata/graphitewriter.hpp | 18 +-- lib/perfdata/graphitewriter.ti | 5 +- lib/perfdata/influxdbcommonwriter.cpp | 184 ++++++----------------- lib/perfdata/influxdbcommonwriter.hpp | 16 +- lib/perfdata/influxdbcommonwriter.ti | 5 +- lib/perfdata/opentsdbwriter.cpp | 87 +++-------- lib/perfdata/opentsdbwriter.hpp | 11 +- lib/perfdata/opentsdbwriter.ti | 6 +- 15 files changed, 219 insertions(+), 708 deletions(-) diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp index 9445a16e0..044656966 100644 --- a/lib/perfdata/elasticsearchwriter.cpp +++ b/lib/perfdata/elasticsearchwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/elasticsearchwriter.hpp" +#include "base/defer.hpp" #include "perfdata/elasticsearchwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/compatutility.hpp" @@ -9,30 +10,14 @@ #include "icinga/macroprocessor.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" -#include "base/defer.hpp" -#include "base/io-engine.hpp" -#include "base/tcpsocket.hpp" #include "base/stream.hpp" #include "base/base64.hpp" #include "base/json.hpp" #include "base/utility.hpp" -#include "base/networkstream.hpp" #include "base/perfdatavalue.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include @@ -78,12 +63,25 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array:: status->Set("elasticsearchwriter", new Dictionary(std::move(nodes))); } +void ElasticsearchWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + if (GetEnableTls()) { + try { + m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogCritical, "ElasticsearchWriter") + << "Unable to create SSL context: " << ex.what(); + throw; + } + } +} + void ElasticsearchWriter::Resume() { ObjectImpl::Resume(); - m_EventPrefix = "icinga2.event."; - Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' resumed."; @@ -96,6 +94,8 @@ void ElasticsearchWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; + /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -120,12 +120,17 @@ void ElasticsearchWriter::Pause() m_HandleNotifications.disconnect(); m_FlushTimer->Stop(true); - m_WorkQueue.Join(); - { - std::unique_lock lock (m_DataBufferMutex); + std::promise queueDonePromise; + m_WorkQueue.Enqueue([&]() { Flush(); - } + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); + + m_WorkQueue.Join(); Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' paused."; @@ -269,6 +274,10 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -308,6 +317,10 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'"); AddCheckResult(fields, checkable, cr); @@ -358,6 +371,10 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr AddTemplateTags(fields, checkable, cr); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "ElasticsearchWriter") @@ -379,15 +396,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& { AssertOnWorkQueue(); - /* Atomically buffer the data point. */ - std::unique_lock lock(m_DataBufferMutex); - /* Format the timestamps to dynamically select the date datatype inside the index. */ fields->Set("@timestamp", FormatTimestamp(ts)); fields->Set("timestamp", FormatTimestamp(ts)); - - String eventType = m_EventPrefix + type; - fields->Set("type", eventType); + fields->Set("type", "icinga2.event." + type); /* Every payload needs a line describing the index. * We do it this way to avoid problems with a near full queue. @@ -408,19 +420,21 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String& } } +/** + * Queues a Flush on the work-queue if there isn't one queued already. + */ void ElasticsearchWriter::FlushTimeout() { - /* Prevent new data points from being added to the array, there is a - * race condition where they could disappear. - */ - std::unique_lock lock(m_DataBufferMutex); - - /* Flush if there are any data available. */ - if (m_DataBuffer.size() > 0) { - Log(LogDebug, "ElasticsearchWriter") - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - Flush(); + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; } + + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{ + [&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); } + }; + Flush(); + }); } void ElasticsearchWriter::Flush() @@ -466,22 +480,6 @@ void ElasticsearchWriter::SendRequest(const String& body) url->SetPath(path); - OptionalTlsStream stream; - - try { - stream = Connect(); - } catch (const std::exception& ex) { - Log(LogWarning, "ElasticsearchWriter") - << "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false); - return; - } - - Defer s ([&stream]() { - if (stream.first) { - stream.first->next_layer().shutdown(); - } - }); - http::request request (http::verb::post, std::string(url->Format(true)), 10); request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); @@ -511,37 +509,14 @@ void ElasticsearchWriter::SendRequest(const String& body) << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" ) << " to '" << url->Format() << "'."; + decltype(m_Connection->Send(request)) response; try { - if (stream.first) { - http::write(*stream.first, request); - stream.first->flush(); - } else { - http::write(*stream.second, request); - stream.second->flush(); - } - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; + response = m_Connection->Send(request); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "ElasticsearchWriter") << ex.what(); + return; } - http::parser parser; - beast::flat_buffer buf; - - try { - if (stream.first) { - http::read(*stream.first, buf, parser); - } else { - http::read(*stream.second, buf, parser); - } - } catch (const std::exception& ex) { - Log(LogWarning, "ElasticsearchWriter") - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false); - throw; - } - - auto& response (parser.get()); - if (response.result_int() > 299) { if (response.result() == http::status::unauthorized) { /* More verbose error logging with Elasticsearch is hidden behind a proxy. */ @@ -589,66 +564,6 @@ void ElasticsearchWriter::SendRequest(const String& body) } } -OptionalTlsStream ElasticsearchWriter::Connect() -{ - Log(LogNotice, "ElasticsearchWriter") - << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - - OptionalTlsStream stream; - bool tls = GetEnableTls(); - - if (tls) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Unable to create SSL context."; - throw; - } - - stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - if (tls) { - auto& tlsStream (stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception&) { - Log(LogWarning, "ElasticsearchWriter") - << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed."; - throw; - } - - if (!GetInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - return stream; -} - void ElasticsearchWriter::AssertOnWorkQueue() { ASSERT(m_WorkQueue.IsWorkerThread()); diff --git a/lib/perfdata/elasticsearchwriter.hpp b/lib/perfdata/elasticsearchwriter.hpp index c67c4324b..e83f9c935 100644 --- a/lib/perfdata/elasticsearchwriter.hpp +++ b/lib/perfdata/elasticsearchwriter.hpp @@ -5,11 +5,10 @@ #define ELASTICSEARCHWRITER_H #include "perfdata/elasticsearchwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" #include "base/workqueue.hpp" -#include "base/timer.hpp" -#include "base/tlsstream.hpp" +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -29,16 +28,18 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: - String m_EventPrefix; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications; Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; std::vector m_DataBuffer; - std::mutex m_DataBufferMutex; + Shared::Ptr m_SslContext; + PerfdataWriterConnection::Ptr m_Connection; void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); @@ -51,7 +52,6 @@ private: void Enqueue(const Checkable::Ptr& checkable, const String& type, const Dictionary::Ptr& fields, double ts); - OptionalTlsStream Connect(); void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); void FlushTimeout(); diff --git a/lib/perfdata/elasticsearchwriter.ti b/lib/perfdata/elasticsearchwriter.ti index 45c468804..c5e7bc3e1 100644 --- a/lib/perfdata/elasticsearchwriter.ti +++ b/lib/perfdata/elasticsearchwriter.ti @@ -40,7 +40,10 @@ class ElasticsearchWriter : ConfigObject [config] String cert_path; [config] String key_path; - [config] int flush_interval { + [config] double disconnect_timeout { + default {{{ return 10; }}} + }; + [config] double flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp index c2bff71dd..6f8567f70 100644 --- a/lib/perfdata/gelfwriter.cpp +++ b/lib/perfdata/gelfwriter.cpp @@ -6,28 +6,19 @@ #include "icinga/service.hpp" #include "icinga/notification.hpp" #include "icinga/checkcommand.hpp" -#include "icinga/macroprocessor.hpp" #include "icinga/compatutility.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/context.hpp" #include "base/exception.hpp" #include "base/json.hpp" #include "base/statsfunction.hpp" #include #include -#include "base/io-engine.hpp" -#include -#include -#include -#include using namespace icinga; @@ -62,7 +53,7 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf nodes.emplace_back(gelfwriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", gelfwriter->GetConnected() }, + { "connected", gelfwriter->m_Connection->IsConnected() }, { "source", gelfwriter->GetSource() } })); @@ -73,6 +64,22 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf status->Set("gelfwriter", new Dictionary(std::move(nodes))); } +void GelfWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + /* Initialize connection */ + if (GetEnableTls()) { + try { + m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception& ex) { + Log(LogWarning, "GelfWriter") + << "Unable to create SSL context."; + throw; + } + } +} + void GelfWriter::Resume() { ObjectImpl::Resume(); @@ -83,12 +90,7 @@ void GelfWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - /* Timer for reconnecting */ - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()}; /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -113,18 +115,15 @@ void GelfWriter::Pause() m_HandleNotifications.disconnect(); m_HandleStateChanges.disconnect(); - m_ReconnectTimer->Stop(true); + std::promise queueDonePromise; - m_WorkQueue.Enqueue([this]() { - try { - ReconnectInternal(); - } catch (const std::exception&) { - Log(LogInformation, "GelfWriter") - << "Unable to connect, not flushing buffers. Data may be lost."; - } - }, PriorityImmediate); + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); - m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow); m_WorkQueue.Join(); Log(LogInformation, "GelfWriter") @@ -142,126 +141,6 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp) { Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false); Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true); - - DisconnectInternal(); -} - -void GelfWriter::Reconnect() -{ - AssertOnWorkQueue(); - - if (IsPaused()) { - SetConnected(false); - return; - } - - ReconnectInternal(); -} - -void GelfWriter::ReconnectInternal() -{ - double startTime = Utility::GetTime(); - - CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'"); - - SetShouldConnect(true); - - if (GetConnected()) - return; - - Log(LogNotice, "GelfWriter") - << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'."; - - bool ssl = GetEnableTls(); - - if (ssl) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "Unable to create SSL context."; - throw; - } - - m_Stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - m_Stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'"; - throw; - } - - if (ssl) { - auto& tlsStream (m_Stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception& ex) { - Log(LogWarning, "GelfWriter") - << "TLS handshake with host '" << GetHost() << " failed.'"; - throw; - } - - if (!GetInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - SetConnected(true); - - Log(LogInformation, "GelfWriter") - << "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - -void GelfWriter::ReconnectTimerHandler() -{ - m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal); -} - -void GelfWriter::Disconnect() -{ - AssertOnWorkQueue(); - - DisconnectInternal(); -} - -void GelfWriter::DisconnectInternal() -{ - if (!GetConnected()) - return; - - if (m_Stream.first) { - boost::system::error_code ec; - m_Stream.first->next_layer().shutdown(ec); - - // https://stackoverflow.com/a/25703699 - // As long as the error code's category is not an SSL category, then the protocol was securely shutdown - if (ec.category() == boost::asio::error::get_ssl_category()) { - Log(LogCritical, "GelfWriter") - << "TLS shutdown with host '" << GetHost() << "' could not be done securely."; - } - } else if (m_Stream.second) { - m_Stream.second->close(); - } - - SetConnected(false); - } void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -298,6 +177,10 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_command", checkCommand->GetName()); m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -405,6 +288,10 @@ void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, Noti fields->Set("_check_command", checkable->GetCheckCommand()->GetName()); m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -447,6 +334,10 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check fields->Set("_check_source", cr->GetCheckSource()); m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("GELF Processing state change '" << checkable->GetName() << "'"); Log(LogDebug, "GelfWriter") @@ -473,26 +364,15 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g msgbuf << gelfMessage; msgbuf << '\0'; - String log = msgbuf.str(); - - if (!GetConnected()) - return; + auto log = msgbuf.str(); try { Log(LogDebug, "GelfWriter") << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'."; - if (m_Stream.first) { - boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str())); - m_Stream.first->flush(); - } else { - boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str())); - m_Stream.second->flush(); - } - } catch (const std::exception& ex) { - Log(LogCritical, "GelfWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - - throw ex; + m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()}); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "GelfWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index e24b6e6ea..f7d2a10c3 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -5,12 +5,10 @@ #define GELFWRITER_H #include "perfdata/gelfwriter-ti.hpp" -#include "icinga/service.hpp" +#include "perfdata/perfdatawriterconnection.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" #include "base/workqueue.hpp" -#include namespace icinga { @@ -30,15 +28,16 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; private: - OptionalTlsStream m_Stream; + PerfdataWriterConnection::Ptr m_Connection; WorkQueue m_WorkQueue{10000000, 1}; + Shared::Ptr m_SslContext; boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges; - Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr, @@ -48,13 +47,6 @@ private: String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts); void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage); - void ReconnectTimerHandler(); - - void Disconnect(); - void DisconnectInternal(); - void Reconnect(); - void ReconnectInternal(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti index b04debbb4..46c194d1a 100644 --- a/lib/perfdata/gelfwriter.ti +++ b/lib/perfdata/gelfwriter.ti @@ -25,9 +25,8 @@ class GelfWriter : ConfigObject default {{{ return false; }}} }; - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; [config] bool enable_ha { default {{{ return false; }}} diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index 652b7d3d1..e00cd9275 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -7,16 +7,13 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -65,7 +62,7 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& nodes.emplace_back(graphitewriter->GetName(), new Dictionary({ { "work_queue_items", workQueueItems }, { "work_queue_item_rate", workQueueItemRate }, - { "connected", graphitewriter->GetConnected() } + { "connected", graphitewriter->m_Connection->IsConnected() } })); perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems)); @@ -88,12 +85,7 @@ void GraphiteWriter::Resume() /* Register exception handler for WQ tasks. */ m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - /* Timer for reconnecting */ - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; /* Register event handlers. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, @@ -108,20 +100,17 @@ void GraphiteWriter::Resume() void GraphiteWriter::Pause() { m_HandleCheckResults.disconnect(); - m_ReconnectTimer->Stop(true); - try { - ReconnectInternal(); - } catch (const std::exception&) { - Log(LogInformation, "GraphiteWriter") - << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload."; + std::promise queueDonePromise; - ObjectImpl::Pause(); - return; - } + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); m_WorkQueue.Join(); - DisconnectInternal(); Log(LogInformation, "GraphiteWriter") << "'" << GetName() << "' paused."; @@ -150,105 +139,6 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, "GraphiteWriter") << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp)); - - if (GetConnected()) { - m_Stream->close(); - - SetConnected(false); - } -} - -/** - * Reconnect method, stops when the feature is paused in HA zones. - * - * Called inside the WQ. - */ -void GraphiteWriter::Reconnect() -{ - AssertOnWorkQueue(); - - if (IsPaused()) { - SetConnected(false); - return; - } - - ReconnectInternal(); -} - -/** - * Reconnect method, connects to a TCP Stream - */ -void GraphiteWriter::ReconnectInternal() -{ - double startTime = Utility::GetTime(); - - CONTEXT("Reconnecting to Graphite '" << GetName() << "'"); - - SetShouldConnect(true); - - if (GetConnected()) - return; - - Log(LogNotice, "GraphiteWriter") - << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; - - m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); - - try { - icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "GraphiteWriter") - << "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'"; - - SetConnected(false); - - throw; - } - - SetConnected(true); - - Log(LogInformation, "GraphiteWriter") - << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - -/** - * Reconnect handler called by the timer. - * - * Enqueues a reconnect task into the WQ. - */ -void GraphiteWriter::ReconnectTimerHandler() -{ - if (IsPaused()) - return; - - m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh); -} - -/** - * Disconnect the stream. - * - * Called inside the WQ. - */ -void GraphiteWriter::Disconnect() -{ - AssertOnWorkQueue(); - - DisconnectInternal(); -} - -/** - * Disconnect the stream. - * - * Called outside the WQ. - */ -void GraphiteWriter::DisconnectInternal() -{ - if (!GetConnected()) - return; - - m_Stream->close(); - - SetConnected(false); } /** @@ -302,11 +192,11 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C } m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() { - CONTEXT("Processing check result for '" << checkable->GetName() << "'"); + if (m_Connection->IsStopped()) { + return; + } - /* TODO: Deal with missing connection here. Needs refactoring - * into parsing the actual performance data and then putting it - * into a queue for re-inserting. */ + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); for (auto& [name, val] : metadata) { SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd()); @@ -394,19 +284,11 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p // do not send \n to debug log msgbuf << "\n"; - std::unique_lock lock(m_StreamMutex); - - if (!GetConnected()) - return; - try { - asio::write(*m_Stream, asio::buffer(msgbuf.str())); - m_Stream->flush(); - } catch (const std::exception& ex) { - Log(LogCritical, "GraphiteWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - - throw ex; + m_Connection->Send(asio::buffer(msgbuf.str())); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "GraphiteWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index b28db8172..470fcc07d 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -5,13 +5,10 @@ #define GRAPHITEWRITER_H #include "perfdata/graphitewriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" #include "base/workqueue.hpp" -#include -#include +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -38,12 +35,10 @@ protected: void Pause() override; private: - Shared::Ptr m_Stream; - std::mutex m_StreamMutex; + PerfdataWriterConnection::Ptr m_Connection; WorkQueue m_WorkQueue{10000000, 1}; boost::signals2::connection m_HandleCheckResults; - Timer::Ptr m_ReconnectTimer; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts); @@ -52,13 +47,6 @@ private: static String EscapeMetricLabel(const String& str); static Value EscapeMacroMetric(const Value& value); - void ReconnectTimerHandler(); - - void Disconnect(); - void DisconnectInternal(); - void Reconnect(); - void ReconnectInternal(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti index d89e879dd..f0d9bfb80 100644 --- a/lib/perfdata/graphitewriter.ti +++ b/lib/perfdata/graphitewriter.ti @@ -27,9 +27,8 @@ class GraphiteWriter : ConfigObject [config] bool enable_send_thresholds; [config] bool enable_send_metadata; - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; [config] bool enable_ha { default {{{ return false; }}} diff --git a/lib/perfdata/influxdbcommonwriter.cpp b/lib/perfdata/influxdbcommonwriter.cpp index b097dfa8a..62af9d1d9 100644 --- a/lib/perfdata/influxdbcommonwriter.cpp +++ b/lib/perfdata/influxdbcommonwriter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-2.0-or-later #include "perfdata/influxdbcommonwriter.hpp" +#include "base/defer.hpp" #include "perfdata/influxdbcommonwriter-ti.cpp" #include "remote/url.hpp" #include "icinga/service.hpp" @@ -9,36 +10,15 @@ #include "icinga/icingaapplication.hpp" #include "icinga/checkcommand.hpp" #include "base/application.hpp" -#include "base/defer.hpp" -#include "base/io-engine.hpp" -#include "base/tcpsocket.hpp" -#include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" -#include "base/convert.hpp" -#include "base/utility.hpp" -#include "base/stream.hpp" #include "base/json.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" -#include "base/statsfunction.hpp" -#include "base/tlsutility.hpp" #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include -#include #include #include @@ -80,6 +60,21 @@ void InfluxdbCommonWriter::OnConfigLoaded() } } +void InfluxdbCommonWriter::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + if (GetSslEnable()) { + try { + m_SslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + } catch (const std::exception& ex) { + Log(LogCritical, GetReflectionType()->GetName()) + << "Unable to create SSL context: " << ex.what(); + throw; + } + } +} + void InfluxdbCommonWriter::Resume() { ObjectImpl::Resume(); @@ -97,6 +92,8 @@ void InfluxdbCommonWriter::Resume() m_FlushTimer->Start(); m_FlushTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetSslInsecureNoverify()}; + /* Register for new metrics. */ m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { @@ -114,7 +111,15 @@ void InfluxdbCommonWriter::Pause() << "Processing pending tasks and flushing data buffers."; m_FlushTimer->Stop(true); - m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow); + + std::promise queueDonePromise; + m_WorkQueue.Enqueue([&]() { + FlushWQ(); + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); /* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */ m_WorkQueue.Join(); @@ -136,68 +141,6 @@ void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, GetReflectionType()->GetName()) << "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp)); - - //TODO: Close the connection, if we keep it open. -} - -OptionalTlsStream InfluxdbCommonWriter::Connect() -{ - Log(LogNotice, GetReflectionType()->GetName()) - << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - - OptionalTlsStream stream; - bool ssl = GetSslEnable(); - - if (ssl) { - Shared::Ptr sslContext; - - try { - sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Unable to create SSL context."; - throw; - } - - stream.first = Shared::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost()); - - } else { - stream.second = Shared::Make(IoEngine::Get().GetIoContext()); - } - - try { - icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; - } - - if (ssl) { - auto& tlsStream (stream.first->next_layer()); - - try { - tlsStream.handshake(tlsStream.client); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "TLS handshake with host '" << GetHost() << "' failed."; - throw; - } - - if (!GetSslInsecureNoverify()) { - if (!tlsStream.GetPeerCertificate()) { - BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate.")); - } - - if (!tlsStream.IsVerifyOK()) { - BOOST_THROW_EXCEPTION(std::runtime_error( - "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError()) - )); - } - } - } - - return stream; } void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -261,6 +204,10 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c } m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() { + if (m_Connection->IsStopped()) { + return; + } + CONTEXT("Processing check result for '" << checkable->GetName() << "'"); double ts = cr->GetExecutionEnd(); @@ -411,19 +358,19 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic } } +/** + * Queues a Flush on the work-queue and restarts the timer. + */ void InfluxdbCommonWriter::FlushTimeout() { - m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh); -} + if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) { + return; + } -void InfluxdbCommonWriter::FlushTimeoutWQ() -{ - AssertOnWorkQueue(); - - Log(LogDebug, GetReflectionType()->GetName()) - << "Timer expired writing " << m_DataBuffer.size() << " data points"; - - FlushWQ(); + m_WorkQueue.Enqueue([&]() { + Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }}; + FlushWQ(); + }); } void InfluxdbCommonWriter::FlushWQ() @@ -444,55 +391,16 @@ void InfluxdbCommonWriter::FlushWQ() m_DataBuffer.clear(); m_DataBufferSize = 0; - OptionalTlsStream stream; - - try { - stream = Connect(); - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false); - return; - } - - Defer s ([&stream]() { - if (stream.first) { - stream.first->next_layer().shutdown(); - } - }); - auto request (AssembleRequest(std::move(body))); + decltype(m_Connection->Send(request)) response; try { - if (stream.first) { - http::write(*stream.first, request); - stream.first->flush(); - } else { - http::write(*stream.second, request); - stream.second->flush(); - } - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw; + response = m_Connection->Send(request); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, GetReflectionType()->GetName()) << ex.what(); + return; } - http::parser parser; - beast::flat_buffer buf; - - try { - if (stream.first) { - http::read(*stream.first, buf, parser); - } else { - http::read(*stream.second, buf, parser); - } - } catch (const std::exception& ex) { - Log(LogWarning, GetReflectionType()->GetName()) - << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); - throw; - } - - auto& response (parser.get()); - if (response.result() != http::status::no_content) { Log(LogWarning, GetReflectionType()->GetName()) << "Unexpected response code: " << response.result(); diff --git a/lib/perfdata/influxdbcommonwriter.hpp b/lib/perfdata/influxdbcommonwriter.hpp index 35caa2f25..cfda02501 100644 --- a/lib/perfdata/influxdbcommonwriter.hpp +++ b/lib/perfdata/influxdbcommonwriter.hpp @@ -5,18 +5,13 @@ #define INFLUXDBCOMMONWRITER_H #include "perfdata/influxdbcommonwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" #include "base/perfdatavalue.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" -#include "base/tlsstream.hpp" #include "base/workqueue.hpp" #include "remote/url.hpp" -#include -#include +#include "perfdata/perfdatawriterconnection.hpp" #include -#include namespace icinga { @@ -39,6 +34,7 @@ public: protected: void OnConfigLoaded() override; + void Start(bool runtimeCreated) override; void Resume() override; void Pause() override; @@ -50,22 +46,22 @@ protected: private: boost::signals2::connection m_HandleCheckResults; Timer::Ptr m_FlushTimer; + std::atomic_bool m_FlushTimerInQueue{false}; WorkQueue m_WorkQueue{10000000, 1}; std::vector m_DataBuffer; std::atomic_size_t m_DataBufferSize{0}; + Shared::Ptr m_SslContext; + PerfdataWriterConnection::Ptr m_Connection; void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl, const String& label, const Dictionary::Ptr& fields, double ts); void FlushTimeout(); - void FlushTimeoutWQ(); void FlushWQ(); static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); - OptionalTlsStream Connect(); - void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); diff --git a/lib/perfdata/influxdbcommonwriter.ti b/lib/perfdata/influxdbcommonwriter.ti index 12e867824..7074bceaf 100644 --- a/lib/perfdata/influxdbcommonwriter.ti +++ b/lib/perfdata/influxdbcommonwriter.ti @@ -52,13 +52,16 @@ abstract class InfluxdbCommonWriter : ConfigObject }); }}} }; + [config] double disconnect_timeout { + default {{{ return 10; }}} + }; [config] bool enable_send_thresholds { default {{{ return false; }}} }; [config] bool enable_send_metadata { default {{{ return false; }}} }; - [config] int flush_interval { + [config] double flush_interval { default {{{ return 10; }}} }; [config] int flush_threshold { diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index 95b43ba76..1b2f82a7d 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -7,17 +7,12 @@ #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" -#include "icinga/compatutility.hpp" -#include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" #include "base/logger.hpp" #include "base/convert.hpp" -#include "base/utility.hpp" #include "base/perfdatavalue.hpp" -#include "base/application.hpp" #include "base/stream.hpp" -#include "base/networkstream.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" #include @@ -64,7 +59,7 @@ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& nodes.emplace_back( opentsdbwriter->GetName(), new Dictionary({ - {"connected", opentsdbwriter->GetConnected()}, + { "connected", opentsdbwriter->m_Connection->IsConnected() }, {"work_queue_items", workQueueItems}, {"work_queue_item_rate", workQueueItemRate} } @@ -95,11 +90,7 @@ void OpenTsdbWriter::Resume() ReadConfigTemplate(); - m_ReconnectTimer = Timer::Create(); - m_ReconnectTimer->SetInterval(10); - m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); - m_ReconnectTimer->Start(); - m_ReconnectTimer->Reschedule(0); + m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()}; m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { CheckResultHandler(checkable, cr); @@ -112,62 +103,24 @@ void OpenTsdbWriter::Resume() void OpenTsdbWriter::Pause() { m_HandleCheckResults.disconnect(); - m_ReconnectTimer->Stop(true); + + std::promise queueDonePromise; + + m_WorkQueue.Enqueue([&]() { + queueDonePromise.set_value(); + }, PriorityLow); + + auto timeout = std::chrono::duration{GetDisconnectTimeout()}; + m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout); m_WorkQueue.Join(); Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; - m_Stream->close(); - - SetConnected(false); - ObjectImpl::Pause(); } -/** - * Reconnect handler called by the timer. - * Handles TLS - */ -void OpenTsdbWriter::ReconnectTimerHandler() -{ - if (IsPaused()) - return; - - SetShouldConnect(true); - - if (GetConnected()) - return; - - double startTime = Utility::GetTime(); - - Log(LogNotice, "OpenTsdbWriter") - << "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'."; - - /* - * We're using telnet as input method. Future PRs may change this into using the HTTP API. - * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet - */ - m_Stream = Shared::Make(IoEngine::Get().GetIoContext()); - - try { - icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "OpenTsdbWriter") - << "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - - SetConnected(false); - - return; - } - - SetConnected(true); - - Log(LogInformation, "OpenTsdbWriter") - << "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; -} - /** * Registered check result handler processing data. * Calculates tags from the config. @@ -300,7 +253,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement()); m_WorkQueue.Enqueue( - [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata), ts]() mutable { + [this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable { + if (m_Connection->IsStopped()) { + return; + } + double ts = cr->GetExecutionEnd(); for (auto& [name, val] : metadata) { @@ -437,18 +394,14 @@ void OpenTsdbWriter::SendMsgBuffer() { ASSERT(m_WorkQueue.IsWorkerThread()); - if (!GetConnected()) - return; - Log(LogDebug, "OpenTsdbWriter") << "Flushing data buffer to OpenTsdb."; try { - boost::asio::write(*m_Stream, boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); - m_Stream->flush(); - } catch (const std::exception& ex) { - Log(LogCritical, "OpenTsdbWriter") - << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; + m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{}))); + } catch (const PerfdataWriterConnection::Stopped& ex) { + Log(LogDebug, "OpenTsdbWriter") << ex.what(); + return; } } diff --git a/lib/perfdata/opentsdbwriter.hpp b/lib/perfdata/opentsdbwriter.hpp index eb85f4436..5db298540 100644 --- a/lib/perfdata/opentsdbwriter.hpp +++ b/lib/perfdata/opentsdbwriter.hpp @@ -5,11 +5,9 @@ #define OPENTSDBWRITER_H #include "perfdata/opentsdbwriter-ti.hpp" -#include "icinga/service.hpp" +#include "icinga/checkable.hpp" #include "base/configobject.hpp" -#include "base/tcpsocket.hpp" -#include "base/timer.hpp" -#include +#include "perfdata/perfdatawriterconnection.hpp" namespace icinga { @@ -38,10 +36,9 @@ protected: private: WorkQueue m_WorkQueue{10000000, 1}; std::string m_MsgBuf; - Shared::Ptr m_Stream; + PerfdataWriterConnection::Ptr m_Connection; boost::signals2::connection m_HandleCheckResults; - Timer::Ptr m_ReconnectTimer; Dictionary::Ptr m_ServiceConfigTemplate; Dictionary::Ptr m_HostConfigTemplate; @@ -55,8 +52,6 @@ private: static String EscapeTag(const String& str); static String EscapeMetric(const String& str); - void ReconnectTimerHandler(); - void ReadConfigTemplate(); }; diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index 56bc8cdf4..dcad57168 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -31,10 +31,8 @@ class OpenTsdbWriter : ConfigObject [config] bool enable_generic_metrics { default {{{ return false; }}} }; - - [no_user_modify] bool connected; - [no_user_modify] bool should_connect { - default {{{ return true; }}} + [config] double disconnect_timeout { + default {{{ return 10; }}} }; };