mirror of
https://github.com/Icinga/icinga2.git
synced 2026-05-28 04:12:13 -04:00
Merge e52320fbeb into f63bbec4ab
This commit is contained in:
commit
3ea0d82b78
10 changed files with 157 additions and 31 deletions
|
|
@ -1317,6 +1317,8 @@ Configuration Attributes:
|
|||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`.
|
||||
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
|
||||
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
|
||||
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
|
||||
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
|
||||
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
|
||||
enable\_tls | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`.
|
||||
insecure\_noverify | Boolean | **Optional.** Disable TLS peer verification.
|
||||
|
|
@ -1350,6 +1352,8 @@ Configuration Attributes:
|
|||
service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`.
|
||||
enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`.
|
||||
enable\_send\_metadata | Boolean | **Optional.** Send additional metadata metrics. Defaults to `false`.
|
||||
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
|
||||
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
|
||||
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
|
||||
|
||||
Additional usage examples can be found [here](14-features.md#graphite-carbon-cache-writer).
|
||||
|
|
@ -1865,6 +1869,8 @@ Configuration Attributes:
|
|||
--------------------------|-----------------------|----------------------------------
|
||||
host | String | **Optional.** OpenTSDB host address. Defaults to `127.0.0.1`.
|
||||
port | Number | **Optional.** OpenTSDB port. Defaults to `4242`.
|
||||
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
|
||||
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
|
||||
disconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to OpenTSDB before disconnecting. Defaults to `10s`.
|
||||
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
|
||||
enable_generic_metrics | Boolean | **Optional.** Re-use metric names to store different perfdata values for a particular check. Use tags to distinguish perfdata instead of metric name. Defaults to `false`.
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "perfdata/gelfwriter.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "perfdata/gelfwriter-ti.cpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/notification.hpp"
|
||||
|
|
@ -90,6 +91,13 @@ void GelfWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
/* Set up timer for periodically flushing m_MsgBuf */
|
||||
m_FlushTimer = Timer::Create();
|
||||
m_FlushTimer->SetInterval(GetFlushInterval());
|
||||
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
|
||||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
|
||||
|
||||
/* Register event handlers. */
|
||||
|
|
@ -115,6 +123,8 @@ void GelfWriter::Pause()
|
|||
m_HandleNotifications.disconnect();
|
||||
m_HandleStateChanges.disconnect();
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
|
|
@ -360,19 +370,38 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
|
|||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
std::ostringstream msgbuf;
|
||||
msgbuf << gelfMessage;
|
||||
msgbuf << '\0';
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "Checkable '" << checkable->GetName() << "' sending message '" << gelfMessage << "'.";
|
||||
|
||||
auto log = msgbuf.str();
|
||||
m_MsgBuf.GetData().reserve(m_MsgBuf.GetLength() + gelfMessage.GetLength() + 1);
|
||||
m_MsgBuf += gelfMessage;
|
||||
m_MsgBuf += '\0';
|
||||
|
||||
try {
|
||||
Log(LogDebug, "GelfWriter")
|
||||
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
|
||||
|
||||
m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()});
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "GelfWriter") << ex.what();
|
||||
return;
|
||||
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
|
||||
Flush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a Flush on the work-queue if none is queued yet.
|
||||
*/
|
||||
void GelfWriter::FlushTimeout()
|
||||
{
|
||||
// exchange() sets the flag and returns its previous value: true means a flush task is already queued, so skip.
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||

||||
// The actual flush is performed by a task on m_WorkQueue, not directly in this function.
m_WorkQueue.Enqueue([&]() {
|
||||
// Clears the flag again so a later call can queue a new flush.
// NOTE(review): presumably Defer runs its callback when this scope ends, even if Flush() throws - confirm in base/defer.hpp.
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
|
||||
Flush();
|
||||
});
|
||||
}
|
||||
|
||||
// Hands the contents of m_MsgBuf to the connection and leaves the buffer empty.
void GelfWriter::Flush()
|
||||
{
|
||||
try {
|
||||
// std::exchange takes the buffered data out and resets the buffer to an empty value in one step.
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
// A stopped connection is only logged at debug level; the data taken out above is not put back into m_MsgBuf.
Log(LogDebug, "GelfWriter") << ex.what();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,9 @@ protected:
|
|||
|
||||
private:
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::atomic_bool m_FlushTimerInQueue{false};
|
||||
String m_MsgBuf;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
|
||||
|
|
@ -46,6 +49,8 @@ private:
|
|||
|
||||
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
|
||||
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
|
||||
void FlushTimeout();
|
||||
void Flush();
|
||||
|
||||
void AssertOnWorkQueue();
|
||||
|
||||
|
|
|
|||
|
|
@ -24,6 +24,12 @@ class GelfWriter : ConfigObject
|
|||
[config] bool enable_send_perfdata {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] int flush_interval {
|
||||
default {{{ return 15; }}}
|
||||
};
|
||||
[config] std::size_t flush_threshold {
|
||||
default {{{ return 2 * 1024 * 1024; }}}
|
||||
};
|
||||
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "perfdata/graphitewriter.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "perfdata/graphitewriter-ti.cpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
|
|
@ -85,6 +86,13 @@ void GraphiteWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
/* Set up timer for periodically flushing m_MsgBuf */
|
||||
m_FlushTimer = Timer::Create();
|
||||
m_FlushTimer->SetInterval(GetFlushInterval());
|
||||
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
|
||||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
|
||||
/* Register event handlers. */
|
||||
|
|
@ -101,6 +109,8 @@ void GraphiteWriter::Pause()
|
|||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
|
|
@ -199,10 +209,10 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
|
||||
|
||||
for (auto& [name, val] : metadata) {
|
||||
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
|
||||
AddMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
|
||||
}
|
||||
|
||||
SendPerfdata(checkable, prefix + ".perfdata", cr);
|
||||
AddPerfdata(checkable, prefix + ".perfdata", cr);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -213,7 +223,7 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
* @param prefix Metric prefix string
|
||||
* @param cr Check result including performance data
|
||||
*/
|
||||
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
|
||||
void GraphiteWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
|
|
@ -245,17 +255,17 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
|
|||
String escapedKey = EscapeMetricLabel(pdv->GetLabel());
|
||||
double ts = cr->GetExecutionEnd();
|
||||
|
||||
SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
|
||||
AddMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
|
||||
|
||||
if (GetEnableSendThresholds()) {
|
||||
if (!pdv->GetCrit().IsEmpty())
|
||||
SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
|
||||
AddMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
|
||||
if (!pdv->GetWarn().IsEmpty())
|
||||
SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
|
||||
AddMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
|
||||
if (!pdv->GetMin().IsEmpty())
|
||||
SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
|
||||
AddMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
|
||||
if (!pdv->GetMax().IsEmpty())
|
||||
SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
|
||||
AddMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -269,7 +279,7 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
|
|||
* @param value Metric value
|
||||
* @param ts Timestamp when the check result was created
|
||||
*/
|
||||
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
|
||||
void GraphiteWriter::AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
|
|
@ -284,11 +294,34 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
|
|||
// do not send \n to debug log
|
||||
msgbuf << "\n";
|
||||
|
||||
m_MsgBuf += std::move(msgbuf).str();
|
||||
|
||||
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
|
||||
Flush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a Flush on the work-queue if none is queued yet.
|
||||
*/
|
||||
void GraphiteWriter::FlushTimeout()
|
||||
{
|
||||
// exchange() sets the flag and returns its previous value: true means a flush task is already queued, so skip.
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||

||||
// The actual flush is performed by a task on m_WorkQueue, not directly in this function.
m_WorkQueue.Enqueue([&]() {
|
||||
// Clears the flag again so a later call can queue a new flush.
// NOTE(review): presumably Defer runs its callback when this scope ends, even if Flush() throws - confirm in base/defer.hpp.
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
|
||||
Flush();
|
||||
});
|
||||
}
|
||||
|
||||
void GraphiteWriter::Flush()
|
||||
{
|
||||
try {
|
||||
m_Connection->Send(asio::buffer(msgbuf.str()));
|
||||
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "GraphiteWriter") << ex.what();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,13 +36,18 @@ protected:
|
|||
|
||||
private:
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::atomic_bool m_FlushTimerInQueue{false};
|
||||
String m_MsgBuf;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
|
||||
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
|
||||
void AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
|
||||
void AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
|
||||
void FlushTimeout();
|
||||
void Flush();
|
||||
static String EscapeMetric(const String& str);
|
||||
static String EscapeMetricLabel(const String& str);
|
||||
static Value EscapeMacroMetric(const Value& value);
|
||||
|
|
|
|||
|
|
@ -24,8 +24,14 @@ class GraphiteWriter : ConfigObject
|
|||
[config] String service_name_template {
|
||||
default {{{ return "icinga2.$host.name$.services.$service.name$.$service.check_command$"; }}}
|
||||
};
|
||||
[config] bool enable_send_thresholds;
|
||||
[config] bool enable_send_metadata;
|
||||
[config] bool enable_send_thresholds;
|
||||
[config] bool enable_send_metadata;
|
||||
[config] int flush_interval {
|
||||
default {{{ return 15; }}}
|
||||
};
|
||||
[config] std::size_t flush_threshold {
|
||||
default {{{ return 2 * 1024 * 1024; }}}
|
||||
};
|
||||
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||
|
||||
#include "perfdata/opentsdbwriter.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "perfdata/opentsdbwriter-ti.cpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
|
|
@ -88,6 +89,13 @@ void OpenTsdbWriter::Resume()
|
|||
<< "Exception during OpenTsdb operation: " << DiagnosticInformation(exp);
|
||||
});
|
||||
|
||||
/* Set up timer for periodically flushing m_MsgBuf */
|
||||
m_FlushTimer = Timer::Create();
|
||||
m_FlushTimer->SetInterval(GetFlushInterval());
|
||||
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
|
||||
m_FlushTimer->Start();
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
ReadConfigTemplate();
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
|
|
@ -104,6 +112,8 @@ void OpenTsdbWriter::Pause()
|
|||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
|
||||
std::promise<void> queueDonePromise;
|
||||
|
||||
m_WorkQueue.Enqueue([&]() {
|
||||
|
|
@ -282,7 +292,9 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
|
||||
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
|
||||
|
||||
SendMsgBuffer();
|
||||
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
|
||||
SendMsgBuffer();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -387,7 +399,22 @@ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& me
|
|||
|
||||
/* do not send \n to debug log */
|
||||
msgbuf << "\n";
|
||||
m_MsgBuf.append(msgbuf.str());
|
||||
m_MsgBuf += msgbuf.str();
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a Flush on the work-queue if none is queued yet.
|
||||
*/
|
||||
void OpenTsdbWriter::FlushTimeout()
|
||||
{
|
||||
// exchange() sets the flag and returns its previous value: true means a flush task is already queued, so skip.
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||

||||
// The buffer is sent by a task on m_WorkQueue, not directly in this function.
m_WorkQueue.Enqueue([&]() {
|
||||
// Clears the flag again so a later call can queue a new flush.
// NOTE(review): presumably Defer runs its callback when this scope ends, even if SendMsgBuffer() throws - confirm in base/defer.hpp.
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
|
||||
SendMsgBuffer();
|
||||
});
|
||||
}
|
||||
|
||||
void OpenTsdbWriter::SendMsgBuffer()
|
||||
|
|
@ -398,7 +425,7 @@ void OpenTsdbWriter::SendMsgBuffer()
|
|||
<< "Flushing data buffer to OpenTsdb.";
|
||||
|
||||
try {
|
||||
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
|
||||
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
|
||||
} catch (const PerfdataWriterConnection::Stopped& ex) {
|
||||
Log(LogDebug, "OpenTsdbWriter") << ex.what();
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -35,7 +35,9 @@ protected:
|
|||
|
||||
private:
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::string m_MsgBuf;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::atomic_bool m_FlushTimerInQueue{false};
|
||||
String m_MsgBuf;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
|
|
@ -46,6 +48,7 @@ private:
|
|||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
|
||||
const std::map<String, String>& tags, double value, double ts);
|
||||
void FlushTimeout();
|
||||
void SendMsgBuffer();
|
||||
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
|
||||
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
|
||||
|
|
|
|||
|
|
@ -31,6 +31,12 @@ class OpenTsdbWriter : ConfigObject
|
|||
[config] bool enable_generic_metrics {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] int flush_interval {
|
||||
default {{{ return 15; }}}
|
||||
};
|
||||
[config] std::size_t flush_threshold {
|
||||
default {{{ return 2 * 1024 * 1024; }}}
|
||||
};
|
||||
[config] double disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in a new issue