mirror of
https://github.com/Icinga/icinga2.git
synced 2026-06-10 17:21:03 -04:00
Use PerfdataWriterConnection in perfdata writers
This commit is contained in:
parent
2e2576c7c5
commit
da2fc9df01
15 changed files with 219 additions and 708 deletions
|
|
@ -2,6 +2,7 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "perfdata/elasticsearchwriter.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "perfdata/elasticsearchwriter-ti.cpp"
|
||||
#include "remote/url.hpp"
|
||||
#include "icinga/compatutility.hpp"
|
||||
|
|
@ -9,30 +10,14 @@
|
|||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/base64.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/asio/ssl/context.hpp>
|
||||
#include <boost/beast/core/flat_buffer.hpp>
|
||||
#include <boost/beast/http/field.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/parser.hpp>
|
||||
#include <boost/beast/http/read.hpp>
|
||||
#include <boost/beast/http/status.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <boost/beast/http/verb.hpp>
|
||||
#include <boost/beast/http/write.hpp>
|
||||
#include <boost/scoped_array.hpp>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
|
|
@ -78,12 +63,25 @@ void ElasticsearchWriter::StatsFunc(const Dictionary::Ptr& status, const Array::
|
|||
status->Set("elasticsearchwriter", new Dictionary(std::move(nodes)));
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl::Start(runtimeCreated);
|
||||
|
||||
if (GetEnableTls()) {
|
||||
try {
|
||||
m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "ElasticsearchWriter")
|
||||
<< "Unable to create SSL context: " << ex.what();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::Resume()
|
||||
{
|
||||
ObjectImpl<ElasticsearchWriter>::Resume();
|
||||
|
||||
m_EventPrefix = "icinga2.event.";
|
||||
|
||||
Log(LogInformation, "ElasticsearchWriter")
|
||||
<< "'" << GetName() << "' resumed.";
|
||||
|
||||
|
|
@ -96,6 +94,8 @@ void ElasticsearchWriter::Resume()
|
|||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
|
||||
|
||||
/* Register for new metrics. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
|
||||
|
|
@ -120,12 +120,17 @@ void ElasticsearchWriter::Pause()
|
|||
m_HandleNotifications.disconnect();
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
m_WorkQueue.Join();
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (m_DataBufferMutex);
|
||||
std::promise<void> queueDonePromise;
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
Flush();
|
||||
}
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Join();
|
||||
|
||||
Log(LogInformation, "ElasticsearchWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
|
@ -269,6 +274,10 @@ void ElasticsearchWriter::CheckResultHandler(const Checkable::Ptr& checkable, co
|
|||
AddTemplateTags(fields, checkable, cr);
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Elasticwriter processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
AddCheckResult(fields, checkable, cr);
|
||||
|
|
@ -308,6 +317,10 @@ void ElasticsearchWriter::StateChangeHandler(const Checkable::Ptr& checkable, co
|
|||
AddTemplateTags(fields, checkable, cr);
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Elasticwriter processing state change '" << checkable->GetName() << "'");
|
||||
|
||||
AddCheckResult(fields, checkable, cr);
|
||||
|
|
@ -358,6 +371,10 @@ void ElasticsearchWriter::NotificationSentToAllUsersHandler(const Checkable::Ptr
|
|||
AddTemplateTags(fields, checkable, cr);
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Elasticwriter processing notification to all users '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "ElasticsearchWriter")
|
||||
|
|
@ -379,15 +396,10 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
|
|||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
/* Atomically buffer the data point. */
|
||||
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
|
||||
|
||||
/* Format the timestamps to dynamically select the date datatype inside the index. */
|
||||
fields->Set("@timestamp", FormatTimestamp(ts));
|
||||
fields->Set("timestamp", FormatTimestamp(ts));
|
||||
|
||||
String eventType = m_EventPrefix + type;
|
||||
fields->Set("type", eventType);
|
||||
fields->Set("type", "icinga2.event." + type);
|
||||
|
||||
/* Every payload needs a line describing the index.
|
||||
* We do it this way to avoid problems with a near full queue.
|
||||
|
|
@ -408,19 +420,21 @@ void ElasticsearchWriter::Enqueue(const Checkable::Ptr& checkable, const String&
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a Flush on the work-queue if there isn't one queued already.
|
||||
*/
|
||||
void ElasticsearchWriter::FlushTimeout()
|
||||
{
|
||||
/* Prevent new data points from being added to the array, there is a
|
||||
* race condition where they could disappear.
|
||||
*/
|
||||
std::unique_lock<std::mutex> lock(m_DataBufferMutex);
|
||||
|
||||
/* Flush if there are any data available. */
|
||||
if (m_DataBuffer.size() > 0) {
|
||||
Log(LogDebug, "ElasticsearchWriter")
|
||||
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
||||
Flush();
|
||||
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
Defer resetFlushTimer{
|
||||
[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }
|
||||
};
|
||||
Flush();
|
||||
});
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::Flush()
|
||||
|
|
@ -466,22 +480,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
|
|||
|
||||
url->SetPath(path);
|
||||
|
||||
OptionalTlsStream stream;
|
||||
|
||||
try {
|
||||
stream = Connect();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Flush failed, cannot connect to Elasticsearch: " << DiagnosticInformation(ex, false);
|
||||
return;
|
||||
}
|
||||
|
||||
Defer s ([&stream]() {
|
||||
if (stream.first) {
|
||||
stream.first->next_layer().shutdown();
|
||||
}
|
||||
});
|
||||
|
||||
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
|
||||
|
||||
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
|
||||
|
|
@ -511,37 +509,14 @@ void ElasticsearchWriter::SendRequest(const String& body)
|
|||
<< "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
|
||||
<< " to '" << url->Format() << "'.";
|
||||
|
||||
decltype(m_Connection->Send(request)) response;
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::write(*stream.first, request);
|
||||
stream.first->flush();
|
||||
} else {
|
||||
http::write(*stream.second, request);
|
||||
stream.second->flush();
|
||||
}
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
response = m_Connection->Send(request);
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "ElasticsearchWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
|
||||
http::parser<false, http::string_body> parser;
|
||||
beast::flat_buffer buf;
|
||||
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::read(*stream.first, buf, parser);
|
||||
} else {
|
||||
http::read(*stream.second, buf, parser);
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
|
||||
throw;
|
||||
}
|
||||
|
||||
auto& response (parser.get());
|
||||
|
||||
if (response.result_int() > 299) {
|
||||
if (response.result() == http::status::unauthorized) {
|
||||
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */
|
||||
|
|
@ -589,66 +564,6 @@ void ElasticsearchWriter::SendRequest(const String& body)
|
|||
}
|
||||
}
|
||||
|
||||
OptionalTlsStream ElasticsearchWriter::Connect()
|
||||
{
|
||||
Log(LogNotice, "ElasticsearchWriter")
|
||||
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
OptionalTlsStream stream;
|
||||
bool tls = GetEnableTls();
|
||||
|
||||
if (tls) {
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext;
|
||||
|
||||
try {
|
||||
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
|
||||
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
|
||||
|
||||
} else {
|
||||
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
|
||||
try {
|
||||
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (tls) {
|
||||
auto& tlsStream (stream.first->next_layer());
|
||||
|
||||
try {
|
||||
tlsStream.handshake(tlsStream.client);
|
||||
} catch (const std::exception&) {
|
||||
Log(LogWarning, "ElasticsearchWriter")
|
||||
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!GetInsecureNoverify()) {
|
||||
if (!tlsStream.GetPeerCertificate()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
|
||||
}
|
||||
|
||||
if (!tlsStream.IsVerifyOK()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(
|
||||
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
void ElasticsearchWriter::AssertOnWorkQueue()
|
||||
{
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
|
|
|
|||
|
|
@ -5,11 +5,10 @@
|
|||
#define ELASTICSEARCHWRITER_H
|
||||
|
||||
#include "perfdata/elasticsearchwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -29,16 +28,18 @@ public:
|
|||
|
||||
protected:
|
||||
void OnConfigLoaded() override;
|
||||
void Start(bool runtimeCreated) override;
|
||||
void Resume() override;
|
||||
void Pause() override;
|
||||
|
||||
private:
|
||||
String m_EventPrefix;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::atomic_bool m_FlushTimerInQueue{false};
|
||||
std::vector<String> m_DataBuffer;
|
||||
std::mutex m_DataBufferMutex;
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
|
||||
void AddCheckResult(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void AddTemplateTags(const Dictionary::Ptr& fields, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
|
|
@ -51,7 +52,6 @@ private:
|
|||
void Enqueue(const Checkable::Ptr& checkable, const String& type,
|
||||
const Dictionary::Ptr& fields, double ts);
|
||||
|
||||
OptionalTlsStream Connect();
|
||||
void AssertOnWorkQueue();
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
void FlushTimeout();
|
||||
|
|
|
|||
|
|
@ -40,7 +40,10 @@ class ElasticsearchWriter : ConfigObject
|
|||
[config] String cert_path;
|
||||
[config] String key_path;
|
||||
|
||||
[config] int flush_interval {
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] double flush_interval {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] int flush_threshold {
|
||||
|
|
|
|||
|
|
@ -6,28 +6,19 @@
|
|||
#include "icinga/service.hpp"
|
||||
#include "icinga/notification.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/compatutility.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/context.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <utility>
|
||||
#include "base/io-engine.hpp"
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <boost/asio/error.hpp>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
|
|
@ -62,7 +53,7 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
|
|||
nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
{ "work_queue_item_rate", workQueueItemRate },
|
||||
{ "connected", gelfwriter->GetConnected() },
|
||||
{ "connected", gelfwriter->m_Connection->IsConnected() },
|
||||
{ "source", gelfwriter->GetSource() }
|
||||
}));
|
||||
|
||||
|
|
@ -73,6 +64,22 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
|
|||
status->Set("gelfwriter", new Dictionary(std::move(nodes)));
|
||||
}
|
||||
|
||||
void GelfWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl::Start(runtimeCreated);
|
||||
|
||||
/* Initialize connection */
|
||||
if (GetEnableTls()) {
|
||||
try {
|
||||
m_SslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void GelfWriter::Resume()
|
||||
{
|
||||
ObjectImpl<GelfWriter>::Resume();
|
||||
|
|
@ -83,12 +90,7 @@ void GelfWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
/* Timer for reconnecting */
|
||||
m_ReconnectTimer = Timer::Create();
|
||||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
|
||||
|
||||
/* Register event handlers. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
|
|
@ -113,18 +115,15 @@ void GelfWriter::Pause()
|
|||
m_HandleNotifications.disconnect();
|
||||
m_HandleStateChanges.disconnect();
|
||||
|
||||
m_ReconnectTimer->Stop(true);
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
m_WorkQueue.Enqueue([this]() {
|
||||
try {
|
||||
ReconnectInternal();
|
||||
} catch (const std::exception&) {
|
||||
Log(LogInformation, "GelfWriter")
|
||||
<< "Unable to connect, not flushing buffers. Data may be lost.";
|
||||
}
|
||||
}, PriorityImmediate);
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Enqueue([this]() { DisconnectInternal(); }, PriorityLow);
|
||||
m_WorkQueue.Join();
|
||||
|
||||
Log(LogInformation, "GelfWriter")
|
||||
|
|
@ -142,126 +141,6 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
|
|||
{
|
||||
Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);
|
||||
Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);
|
||||
|
||||
DisconnectInternal();
|
||||
}
|
||||
|
||||
void GelfWriter::Reconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
if (IsPaused()) {
|
||||
SetConnected(false);
|
||||
return;
|
||||
}
|
||||
|
||||
ReconnectInternal();
|
||||
}
|
||||
|
||||
void GelfWriter::ReconnectInternal()
|
||||
{
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
CONTEXT("Reconnecting to Graylog Gelf '" << GetName() << "'");
|
||||
|
||||
SetShouldConnect(true);
|
||||
|
||||
if (GetConnected())
|
||||
return;
|
||||
|
||||
Log(LogNotice, "GelfWriter")
|
||||
<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
bool ssl = GetEnableTls();
|
||||
|
||||
if (ssl) {
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext;
|
||||
|
||||
try {
|
||||
sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
|
||||
m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
|
||||
|
||||
} else {
|
||||
m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
|
||||
try {
|
||||
icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (ssl) {
|
||||
auto& tlsStream (m_Stream.first->next_layer());
|
||||
|
||||
try {
|
||||
tlsStream.handshake(tlsStream.client);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GelfWriter")
|
||||
<< "TLS handshake with host '" << GetHost() << " failed.'";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!GetInsecureNoverify()) {
|
||||
if (!tlsStream.GetPeerCertificate()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
|
||||
}
|
||||
|
||||
if (!tlsStream.IsVerifyOK()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(
|
||||
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SetConnected(true);
|
||||
|
||||
Log(LogInformation, "GelfWriter")
|
||||
<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||
}
|
||||
|
||||
void GelfWriter::ReconnectTimerHandler()
|
||||
{
|
||||
m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
|
||||
}
|
||||
|
||||
void GelfWriter::Disconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
DisconnectInternal();
|
||||
}
|
||||
|
||||
void GelfWriter::DisconnectInternal()
|
||||
{
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
if (m_Stream.first) {
|
||||
boost::system::error_code ec;
|
||||
m_Stream.first->next_layer().shutdown(ec);
|
||||
|
||||
// https://stackoverflow.com/a/25703699
|
||||
// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
|
||||
if (ec.category() == boost::asio::error::get_ssl_category()) {
|
||||
Log(LogCritical, "GelfWriter")
|
||||
<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
|
||||
}
|
||||
} else if (m_Stream.second) {
|
||||
m_Stream.second->close();
|
||||
}
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
}
|
||||
|
||||
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||
|
|
@ -298,6 +177,10 @@ void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const Check
|
|||
fields->Set("_check_command", checkCommand->GetName());
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("GELF Processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
|
|
@ -405,6 +288,10 @@ void GelfWriter::NotificationToUserHandler(const Checkable::Ptr& checkable, Noti
|
|||
fields->Set("_check_command", checkable->GetCheckCommand()->GetName());
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, ts, fields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("GELF Processing notification to all users '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
|
|
@ -447,6 +334,10 @@ void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const Check
|
|||
fields->Set("_check_source", cr->GetCheckSource());
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, fields = std::move(fields), ts = cr->GetExecutionEnd()]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("GELF Processing state change '" << checkable->GetName() << "'");
|
||||
|
||||
Log(LogDebug, "GelfWriter")
|
||||
|
|
@ -473,26 +364,15 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
|
|||
msgbuf << gelfMessage;
|
||||
msgbuf << '\0';
|
||||
|
||||
String log = msgbuf.str();
|
||||
|
||||
if (!GetConnected())
|
||||
return;
|
||||
auto log = msgbuf.str();
|
||||
|
||||
try {
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
|
||||
|
||||
if (m_Stream.first) {
|
||||
boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
|
||||
m_Stream.first->flush();
|
||||
} else {
|
||||
boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
|
||||
m_Stream.second->flush();
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "GelfWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
throw ex;
|
||||
m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()});
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "GelfWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,12 +5,10 @@
|
|||
#define GELFWRITER_H
|
||||
|
||||
#include "perfdata/gelfwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -30,15 +28,16 @@ public:
|
|||
|
||||
protected:
|
||||
void OnConfigLoaded() override;
|
||||
void Start(bool runtimeCreated) override;
|
||||
void Resume() override;
|
||||
void Pause() override;
|
||||
|
||||
private:
|
||||
OptionalTlsStream m_Stream;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void NotificationToUserHandler(const Checkable::Ptr& checkable, NotificationType notificationType, const CheckResult::Ptr& cr,
|
||||
|
|
@ -48,13 +47,6 @@ private:
|
|||
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
|
||||
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
|
||||
|
||||
void ReconnectTimerHandler();
|
||||
|
||||
void Disconnect();
|
||||
void DisconnectInternal();
|
||||
void Reconnect();
|
||||
void ReconnectInternal();
|
||||
|
||||
void AssertOnWorkQueue();
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
|
|
|
|||
|
|
@ -25,9 +25,8 @@ class GelfWriter : ConfigObject
|
|||
default {{{ return false; }}}
|
||||
};
|
||||
|
||||
[no_user_modify] bool connected;
|
||||
[no_user_modify] bool should_connect {
|
||||
default {{{ return true; }}}
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] bool enable_ha {
|
||||
default {{{ return false; }}}
|
||||
|
|
|
|||
|
|
@ -7,16 +7,13 @@
|
|||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/icingaapplication.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
|
@ -65,7 +62,7 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
|
|||
nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
{ "work_queue_item_rate", workQueueItemRate },
|
||||
{ "connected", graphitewriter->GetConnected() }
|
||||
{ "connected", graphitewriter->m_Connection->IsConnected() }
|
||||
}));
|
||||
|
||||
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
|
||||
|
|
@ -88,12 +85,7 @@ void GraphiteWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
/* Timer for reconnecting */
|
||||
m_ReconnectTimer = Timer::Create();
|
||||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
|
||||
/* Register event handlers. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
|
|
@ -108,20 +100,17 @@ void GraphiteWriter::Resume()
|
|||
void GraphiteWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_ReconnectTimer->Stop(true);
|
||||
|
||||
try {
|
||||
ReconnectInternal();
|
||||
} catch (const std::exception&) {
|
||||
Log(LogInformation, "GraphiteWriter")
|
||||
<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
ObjectImpl<GraphiteWriter>::Pause();
|
||||
return;
|
||||
}
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Join();
|
||||
DisconnectInternal();
|
||||
|
||||
Log(LogInformation, "GraphiteWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
|
@ -150,105 +139,6 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
|
|||
|
||||
Log(LogDebug, "GraphiteWriter")
|
||||
<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
|
||||
|
||||
if (GetConnected()) {
|
||||
m_Stream->close();
|
||||
|
||||
SetConnected(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect method, stops when the feature is paused in HA zones.
|
||||
*
|
||||
* Called inside the WQ.
|
||||
*/
|
||||
void GraphiteWriter::Reconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
if (IsPaused()) {
|
||||
SetConnected(false);
|
||||
return;
|
||||
}
|
||||
|
||||
ReconnectInternal();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect method, connects to a TCP Stream
|
||||
*/
|
||||
void GraphiteWriter::ReconnectInternal()
|
||||
{
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
CONTEXT("Reconnecting to Graphite '" << GetName() << "'");
|
||||
|
||||
SetShouldConnect(true);
|
||||
|
||||
if (GetConnected())
|
||||
return;
|
||||
|
||||
Log(LogNotice, "GraphiteWriter")
|
||||
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
|
||||
try {
|
||||
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "GraphiteWriter")
|
||||
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
SetConnected(true);
|
||||
|
||||
Log(LogInformation, "GraphiteWriter")
|
||||
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect handler called by the timer.
|
||||
*
|
||||
* Enqueues a reconnect task into the WQ.
|
||||
*/
|
||||
void GraphiteWriter::ReconnectTimerHandler()
|
||||
{
|
||||
if (IsPaused())
|
||||
return;
|
||||
|
||||
m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect the stream.
|
||||
*
|
||||
* Called inside the WQ.
|
||||
*/
|
||||
void GraphiteWriter::Disconnect()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
DisconnectInternal();
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect the stream.
|
||||
*
|
||||
* Called outside the WQ.
|
||||
*/
|
||||
void GraphiteWriter::DisconnectInternal()
|
||||
{
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
m_Stream->close();
|
||||
|
||||
SetConnected(false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -302,11 +192,11 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
}
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, prefix = std::move(prefix), metadata = std::move(metadata)]() {
|
||||
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* TODO: Deal with missing connection here. Needs refactoring
|
||||
* into parsing the actual performance data and then putting it
|
||||
* into a queue for re-inserting. */
|
||||
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
for (auto& [name, val] : metadata) {
|
||||
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
|
||||
|
|
@ -394,19 +284,11 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
|
|||
// do not send \n to debug log
|
||||
msgbuf << "\n";
|
||||
|
||||
std::unique_lock<std::mutex> lock(m_StreamMutex);
|
||||
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
try {
|
||||
asio::write(*m_Stream, asio::buffer(msgbuf.str()));
|
||||
m_Stream->flush();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "GraphiteWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
throw ex;
|
||||
m_Connection->Send(asio::buffer(msgbuf.str()));
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "GraphiteWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,13 +5,10 @@
|
|||
#define GRAPHITEWRITER_H
|
||||
|
||||
#include "perfdata/graphitewriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <fstream>
|
||||
#include <mutex>
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -38,12 +35,10 @@ protected:
|
|||
void Pause() override;
|
||||
|
||||
private:
|
||||
Shared<AsioTcpStream>::Ptr m_Stream;
|
||||
std::mutex m_StreamMutex;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
|
||||
|
|
@ -52,13 +47,6 @@ private:
|
|||
static String EscapeMetricLabel(const String& str);
|
||||
static Value EscapeMacroMetric(const Value& value);
|
||||
|
||||
void ReconnectTimerHandler();
|
||||
|
||||
void Disconnect();
|
||||
void DisconnectInternal();
|
||||
void Reconnect();
|
||||
void ReconnectInternal();
|
||||
|
||||
void AssertOnWorkQueue();
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
|
|
|
|||
|
|
@ -27,9 +27,8 @@ class GraphiteWriter : ConfigObject
|
|||
[config] bool enable_send_thresholds;
|
||||
[config] bool enable_send_metadata;
|
||||
|
||||
[no_user_modify] bool connected;
|
||||
[no_user_modify] bool should_connect {
|
||||
default {{{ return true; }}}
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] bool enable_ha {
|
||||
default {{{ return false; }}}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "perfdata/influxdbcommonwriter.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "perfdata/influxdbcommonwriter-ti.cpp"
|
||||
#include "remote/url.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
|
|
@ -9,36 +10,15 @@
|
|||
#include "icinga/icingaapplication.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include "base/tlsutility.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <boost/asio/ssl/context.hpp>
|
||||
#include <boost/beast/core/flat_buffer.hpp>
|
||||
#include <boost/beast/http/field.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/parser.hpp>
|
||||
#include <boost/beast/http/read.hpp>
|
||||
#include <boost/beast/http/status.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <boost/beast/http/verb.hpp>
|
||||
#include <boost/beast/http/write.hpp>
|
||||
#include <boost/math/special_functions/fpclassify.hpp>
|
||||
#include <boost/regex.hpp>
|
||||
#include <boost/scoped_array.hpp>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
|
|
@ -80,6 +60,21 @@ void InfluxdbCommonWriter::OnConfigLoaded()
|
|||
}
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl::Start(runtimeCreated);
|
||||
|
||||
if (GetSslEnable()) {
|
||||
try {
|
||||
m_SslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, GetReflectionType()->GetName())
|
||||
<< "Unable to create SSL context: " << ex.what();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::Resume()
|
||||
{
|
||||
ObjectImpl<InfluxdbCommonWriter>::Resume();
|
||||
|
|
@ -97,6 +92,8 @@ void InfluxdbCommonWriter::Resume()
|
|||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetSslInsecureNoverify()};
|
||||
|
||||
/* Register for new metrics. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
|
||||
|
|
@ -114,7 +111,15 @@ void InfluxdbCommonWriter::Pause()
|
|||
<< "Processing pending tasks and flushing data buffers.";
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
m_WorkQueue.Enqueue([this]() { FlushWQ(); }, PriorityLow);
|
||||
|
||||
std::promise<void> queueDonePromise;
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
FlushWQ();
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
/* Wait for the flush to complete, implicitly waits for all WQ tasks enqueued prior to pausing. */
|
||||
m_WorkQueue.Join();
|
||||
|
|
@ -136,68 +141,6 @@ void InfluxdbCommonWriter::ExceptionHandler(boost::exception_ptr exp)
|
|||
|
||||
Log(LogDebug, GetReflectionType()->GetName())
|
||||
<< "Exception during InfluxDB operation: " << DiagnosticInformation(std::move(exp));
|
||||
|
||||
//TODO: Close the connection, if we keep it open.
|
||||
}
|
||||
|
||||
OptionalTlsStream InfluxdbCommonWriter::Connect()
|
||||
{
|
||||
Log(LogNotice, GetReflectionType()->GetName())
|
||||
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
OptionalTlsStream stream;
|
||||
bool ssl = GetSslEnable();
|
||||
|
||||
if (ssl) {
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext;
|
||||
|
||||
try {
|
||||
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Unable to create SSL context.";
|
||||
throw;
|
||||
}
|
||||
|
||||
stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
|
||||
|
||||
} else {
|
||||
stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
|
||||
try {
|
||||
icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (ssl) {
|
||||
auto& tlsStream (stream.first->next_layer());
|
||||
|
||||
try {
|
||||
tlsStream.handshake(tlsStream.client);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "TLS handshake with host '" << GetHost() << "' failed.";
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!GetSslInsecureNoverify()) {
|
||||
if (!tlsStream.GetPeerCertificate()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("InfluxDB didn't present any TLS certificate."));
|
||||
}
|
||||
|
||||
if (!tlsStream.IsVerifyOK()) {
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(
|
||||
"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||
|
|
@ -261,6 +204,10 @@ void InfluxdbCommonWriter::CheckResultHandler(const Checkable::Ptr& checkable, c
|
|||
}
|
||||
|
||||
m_WorkQueue.Enqueue([this, checkable, cr, tmpl = std::move(tmpl), metadataFields = std::move(fields)]() {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
double ts = cr->GetExecutionEnd();
|
||||
|
|
@ -411,19 +358,19 @@ void InfluxdbCommonWriter::SendMetric(const Checkable::Ptr& checkable, const Dic
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a Flush on the work-queue and restarts the timer.
|
||||
*/
|
||||
void InfluxdbCommonWriter::FlushTimeout()
|
||||
{
|
||||
m_WorkQueue.Enqueue([this]() { FlushTimeoutWQ(); }, PriorityHigh);
|
||||
}
|
||||
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::FlushTimeoutWQ()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
Log(LogDebug, GetReflectionType()->GetName())
|
||||
<< "Timer expired writing " << m_DataBuffer.size() << " data points";
|
||||
|
||||
FlushWQ();
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
|
||||
FlushWQ();
|
||||
});
|
||||
}
|
||||
|
||||
void InfluxdbCommonWriter::FlushWQ()
|
||||
|
|
@ -444,55 +391,16 @@ void InfluxdbCommonWriter::FlushWQ()
|
|||
m_DataBuffer.clear();
|
||||
m_DataBufferSize = 0;
|
||||
|
||||
OptionalTlsStream stream;
|
||||
|
||||
try {
|
||||
stream = Connect();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Flush failed, cannot connect to InfluxDB: " << DiagnosticInformation(ex, false);
|
||||
return;
|
||||
}
|
||||
|
||||
Defer s ([&stream]() {
|
||||
if (stream.first) {
|
||||
stream.first->next_layer().shutdown();
|
||||
}
|
||||
});
|
||||
|
||||
auto request (AssembleRequest(std::move(body)));
|
||||
|
||||
decltype(m_Connection->Send(request)) response;
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::write(*stream.first, request);
|
||||
stream.first->flush();
|
||||
} else {
|
||||
http::write(*stream.second, request);
|
||||
stream.second->flush();
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
throw;
|
||||
response = m_Connection->Send(request);
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, GetReflectionType()->GetName()) << ex.what();
|
||||
return;
|
||||
}
|
||||
|
||||
http::parser<false, http::string_body> parser;
|
||||
beast::flat_buffer buf;
|
||||
|
||||
try {
|
||||
if (stream.first) {
|
||||
http::read(*stream.first, buf, parser);
|
||||
} else {
|
||||
http::read(*stream.second, buf, parser);
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
|
||||
throw;
|
||||
}
|
||||
|
||||
auto& response (parser.get());
|
||||
|
||||
if (response.result() != http::status::no_content) {
|
||||
Log(LogWarning, GetReflectionType()->GetName())
|
||||
<< "Unexpected response code: " << response.result();
|
||||
|
|
|
|||
|
|
@ -5,18 +5,13 @@
|
|||
#define INFLUXDBCOMMONWRITER_H
|
||||
|
||||
#include "perfdata/influxdbcommonwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include "remote/url.hpp"
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include <atomic>
|
||||
#include <fstream>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -39,6 +34,7 @@ public:
|
|||
|
||||
protected:
|
||||
void OnConfigLoaded() override;
|
||||
void Start(bool runtimeCreated) override;
|
||||
void Resume() override;
|
||||
void Pause() override;
|
||||
|
||||
|
|
@ -50,22 +46,22 @@ protected:
|
|||
private:
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::atomic_bool m_FlushTimerInQueue{false};
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::vector<String> m_DataBuffer;
|
||||
std::atomic_size_t m_DataBufferSize{0};
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void SendMetric(const Checkable::Ptr& checkable, const Dictionary::Ptr& tmpl,
|
||||
const String& label, const Dictionary::Ptr& fields, double ts);
|
||||
void FlushTimeout();
|
||||
void FlushTimeoutWQ();
|
||||
void FlushWQ();
|
||||
|
||||
static String EscapeKeyOrTagValue(const String& str);
|
||||
static String EscapeValue(const Value& value);
|
||||
|
||||
OptionalTlsStream Connect();
|
||||
|
||||
void AssertOnWorkQueue();
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
|
|
|
|||
|
|
@ -52,13 +52,16 @@ abstract class InfluxdbCommonWriter : ConfigObject
|
|||
});
|
||||
}}}
|
||||
};
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] bool enable_send_thresholds {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] bool enable_send_metadata {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] int flush_interval {
|
||||
[config] double flush_interval {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
[config] int flush_threshold {
|
||||
|
|
|
|||
|
|
@ -7,17 +7,12 @@
|
|||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/macroprocessor.hpp"
|
||||
#include "icinga/icingaapplication.hpp"
|
||||
#include "icinga/compatutility.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/configtype.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/convert.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/application.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/networkstream.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
|
@ -64,7 +59,7 @@ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
|
|||
nodes.emplace_back(
|
||||
opentsdbwriter->GetName(),
|
||||
new Dictionary({
|
||||
{"connected", opentsdbwriter->GetConnected()},
|
||||
{ "connected", opentsdbwriter->m_Connection->IsConnected() },
|
||||
{"work_queue_items", workQueueItems},
|
||||
{"work_queue_item_rate", workQueueItemRate}
|
||||
}
|
||||
|
|
@ -95,11 +90,7 @@ void OpenTsdbWriter::Resume()
|
|||
|
||||
ReadConfigTemplate();
|
||||
|
||||
m_ReconnectTimer = Timer::Create();
|
||||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
|
||||
m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
|
||||
CheckResultHandler(checkable, cr);
|
||||
|
|
@ -112,62 +103,24 @@ void OpenTsdbWriter::Resume()
|
|||
void OpenTsdbWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_ReconnectTimer->Stop(true);
|
||||
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
queueDonePromise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
auto timeout = std::chrono::duration<double>{GetDisconnectTimeout()};
|
||||
m_Connection->CancelAfterTimeout(queueDonePromise.get_future(), timeout);
|
||||
|
||||
m_WorkQueue.Join();
|
||||
|
||||
Log(LogInformation, "OpentsdbWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
||||
m_Stream->close();
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
ObjectImpl<OpenTsdbWriter>::Pause();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnect handler called by the timer.
|
||||
* Handles TLS
|
||||
*/
|
||||
void OpenTsdbWriter::ReconnectTimerHandler()
|
||||
{
|
||||
if (IsPaused())
|
||||
return;
|
||||
|
||||
SetShouldConnect(true);
|
||||
|
||||
if (GetConnected())
|
||||
return;
|
||||
|
||||
double startTime = Utility::GetTime();
|
||||
|
||||
Log(LogNotice, "OpenTsdbWriter")
|
||||
<< "Reconnecting to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
/*
|
||||
* We're using telnet as input method. Future PRs may change this into using the HTTP API.
|
||||
* http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
|
||||
*/
|
||||
m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
|
||||
try {
|
||||
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "OpenTsdbWriter")
|
||||
<< "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
|
||||
SetConnected(false);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
SetConnected(true);
|
||||
|
||||
Log(LogInformation, "OpenTsdbWriter")
|
||||
<< "Finished reconnecting to OpenTSDB in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||
}
|
||||
|
||||
/**
|
||||
* Registered check result handler processing data.
|
||||
* Calculates tags from the config.
|
||||
|
|
@ -300,7 +253,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement());
|
||||
|
||||
m_WorkQueue.Enqueue(
|
||||
[this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata), ts]() mutable {
|
||||
[this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata)]() mutable {
|
||||
if (m_Connection->IsStopped()) {
|
||||
return;
|
||||
}
|
||||
|
||||
double ts = cr->GetExecutionEnd();
|
||||
|
||||
for (auto& [name, val] : metadata) {
|
||||
|
|
@ -437,18 +394,14 @@ void OpenTsdbWriter::SendMsgBuffer()
|
|||
{
|
||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||
|
||||
if (!GetConnected())
|
||||
return;
|
||||
|
||||
Log(LogDebug, "OpenTsdbWriter")
|
||||
<< "Flushing data buffer to OpenTsdb.";
|
||||
|
||||
try {
|
||||
boost::asio::write(*m_Stream, boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
|
||||
m_Stream->flush();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "OpenTsdbWriter")
|
||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "OpenTsdbWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,11 +5,9 @@
|
|||
#define OPENTSDBWRITER_H
|
||||
|
||||
#include "perfdata/opentsdbwriter-ti.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "base/configobject.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include <fstream>
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
|
@ -38,10 +36,9 @@ protected:
|
|||
private:
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::string m_MsgBuf;
|
||||
Shared<AsioTcpStream>::Ptr m_Stream;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
Dictionary::Ptr m_ServiceConfigTemplate;
|
||||
Dictionary::Ptr m_HostConfigTemplate;
|
||||
|
|
@ -55,8 +52,6 @@ private:
|
|||
static String EscapeTag(const String& str);
|
||||
static String EscapeMetric(const String& str);
|
||||
|
||||
void ReconnectTimerHandler();
|
||||
|
||||
void ReadConfigTemplate();
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -31,10 +31,8 @@ class OpenTsdbWriter : ConfigObject
|
|||
[config] bool enable_generic_metrics {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
|
||||
[no_user_modify] bool connected;
|
||||
[no_user_modify] bool should_connect {
|
||||
default {{{ return true; }}}
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue