mirror of
https://github.com/Icinga/icinga2.git
synced 2026-06-13 18:50:22 -04:00
Add OTLPMetricsWriter
This commit is contained in:
parent
c34e03078a
commit
60fe45cd6e
6 changed files with 584 additions and 1 deletions
41
etc/icinga2/features-available/otlpmetrics.conf
Normal file
41
etc/icinga2/features-available/otlpmetrics.conf
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* The OpenTelemetry Metrics Writer feature allows Icinga 2 to export metrics from performance
|
||||
* data to an OpenTelemetry Collector or compatible backend.
|
||||
*
|
||||
* For more information, see the official documentation:
|
||||
* https://icinga.com/docs/icinga-2/latest/doc/14-features/#otlpmetrics-writer
|
||||
*/
|
||||
object OTLPMetricsWriter "otlp-metrics" {
|
||||
// host = "127.0.0.1"
|
||||
// port = 4318
|
||||
// metrics_endpoint = "/v1/metrics"
|
||||
# Optionally, you can set a namespace to be used as OTel service.namespace attribute for all exported metrics.
|
||||
// service_namespace = "icinga"
|
||||
|
||||
# By default, basic AUTH is disabled. Uncomment and set the following lines to enable it.
|
||||
// basic_auth = {
|
||||
// username = "otel_user"
|
||||
// password = "otel_password"
|
||||
// }
|
||||
|
||||
# These are the default settings used by the OTel writer. Adjust them as needed.
|
||||
# Please refer to the documentation for more details on each option.
|
||||
// enable_ha = false
|
||||
// flush_interval = 15s
|
||||
// flush_threshold = 32*1024*1024
|
||||
# When stopping Icinga 2, this timeout defines how long to wait for any pending OTel
|
||||
# metrics to be sent before disconnecting and discarding them.
|
||||
// disconnect_timeout = 10s
|
||||
|
||||
# Allow the OTLP writer to send the check thresholds as OTel metrics to the configured endpoint.
|
||||
# By default, this is disabled but you can enable it to have the thresholds available in the `state_check.threshold` OTel metric.
|
||||
// enable_send_thresholds = false
|
||||
|
||||
# You can enable TLS encryption by uncommenting and configuring the following options.
|
||||
# By default, the OTel writer uses unencrypted connections (plain HTTP requests).
|
||||
// enable_tls = false
|
||||
// tls_insecure_noverify = false
|
||||
// tls_ca_file = "/path/to/otel/ca.crt"
|
||||
// tls_cert_file = "/path/to/otel/client.crt"
|
||||
// tls_key_file = "/path/to/otel/client.key"
|
||||
}
|
||||
|
|
@ -22,6 +22,13 @@ set(perfdata_SOURCES
|
|||
perfdatawriterconnection.cpp perfdatawriterconnection.hpp
|
||||
)
|
||||
|
||||
if(ICINGA2_WITH_OPENTELEMETRY)
|
||||
mkclass_target(otlpmetricswriter.ti otlpmetricswriter-ti.cpp otlpmetricswriter-ti.hpp)
|
||||
list(APPEND perfdata_SOURCES
|
||||
otlpmetricswriter.cpp otlpmetricswriter.hpp otlpmetricswriter-ti.hpp
|
||||
)
|
||||
endif()
|
||||
|
||||
if(ICINGA2_UNITY_BUILD)
|
||||
mkunity_target(perfdata perfdata perfdata_SOURCES)
|
||||
endif()
|
||||
|
|
@ -29,6 +36,20 @@ endif()
|
|||
add_library(perfdata OBJECT ${perfdata_SOURCES})
|
||||
|
||||
add_dependencies(perfdata base config icinga)
|
||||
if(ICINGA2_WITH_OPENTELEMETRY)
|
||||
add_dependencies(perfdata otel)
|
||||
# All the Protobuf generated files within the otel target use relative include paths that won't be
|
||||
# resolved unless we also add the include directories of the otel target. Meaning, we include some
|
||||
# of the header files (not the generated ones) from otel within the otlpwriter and these headers
|
||||
# again include the generated headers and the generated headers in return include other generated
|
||||
# headers using relative paths like this:
|
||||
# #include "opentelemetry/proto/metrics/v1/metrics.pb.h"
|
||||
#
|
||||
# This path can only be resolved if the parent directory of "opentelemetry" is added to the compiler's
|
||||
# include search paths, which is done by the CMakefile of the otel target and we only need to propagate
|
||||
# its include directories to the perfdata target.
|
||||
target_include_directories(perfdata PUBLIC $<TARGET_PROPERTY:otel,INCLUDE_DIRECTORIES>)
|
||||
endif()
|
||||
|
||||
set_target_properties (
|
||||
perfdata PROPERTIES
|
||||
|
|
@ -65,6 +86,13 @@ install_if_not_exists(
|
|||
${ICINGA2_CONFIGDIR}/features-available
|
||||
)
|
||||
|
||||
if(ICINGA2_WITH_OPENTELEMETRY)
|
||||
install_if_not_exists(
|
||||
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/otlpmetrics.conf
|
||||
${ICINGA2_CONFIGDIR}/features-available
|
||||
)
|
||||
endif()
|
||||
|
||||
install_if_not_exists(
|
||||
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/perfdata.conf
|
||||
${ICINGA2_CONFIGDIR}/features-available
|
||||
|
|
|
|||
385
lib/perfdata/otlpmetricswriter.cpp
Normal file
385
lib/perfdata/otlpmetricswriter.cpp
Normal file
|
|
@ -0,0 +1,385 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "perfdata/otlpmetricswriter.hpp"
|
||||
#include "perfdata/otlpmetricswriter-ti.cpp"
|
||||
#include "base/base64.hpp"
|
||||
#include "base/defer.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include "base/object-packer.hpp"
|
||||
#include "base/perfdatavalue.hpp"
|
||||
#include "base/statsfunction.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/service.hpp"
|
||||
#include <future>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
REGISTER_TYPE(OTLPMetricsWriter);
|
||||
|
||||
REGISTER_STATSFUNCTION(OTLPMetricsWriter, &OTLPMetricsWriter::StatsFunc);
|
||||
|
||||
// Represent our currently supported metric streams.
|
||||
//
|
||||
// Note: These and all other attribute keys used within this compilation unit follow
|
||||
// the OTel general naming guidelines[^1] and conventions[^2].
|
||||
//
|
||||
// [^1]: https://opentelemetry.io/docs/specs/semconv/general/metrics/#general-guidelines
|
||||
// [^2]: https://opentelemetry.io/docs/specs/semconv/general/naming
|
||||
static constexpr std::string_view l_PerfdataMetric = "state_check.perfdata";
|
||||
static constexpr std::string_view l_ThresholdMetric = "state_check.threshold";
|
||||
|
||||
/**
 * Publishes runtime statistics for every configured OTLPMetricsWriter object.
 *
 * For each writer, the current work queue length, the work queue task rate over the
 * last 60 seconds, and the number of buffered data points and bytes are reported.
 *
 * @param status Dictionary that receives an "otlpmetricswriter" entry keyed by writer name.
 * @param perfdata Array that receives one PerfdataValue per reported statistic.
 */
void OTLPMetricsWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
	DictionaryData perWriterStatus;

	for (const Ptr& writer : ConfigType::GetObjectsByType<OTLPMetricsWriter>()) {
		std::size_t queuedItems = writer->m_WorkQueue.GetLength();
		double queuedItemRate = writer->m_WorkQueue.GetTaskCount(60) / 60.0;
		std::size_t bufferedPoints = writer->m_DataPointsCount.load(std::memory_order_relaxed);
		uint64_t bufferedBytes = writer->m_RecordedBytes.load(std::memory_order_relaxed);

		const auto writerName = writer->GetName();
		// All perfdata labels of one writer share this prefix.
		const auto labelPrefix = "otlpmetricswriter_" + writerName;

		perWriterStatus.emplace_back(writerName, new Dictionary{
			{"work_queue_items", queuedItems},
			{"work_queue_item_rate", queuedItemRate},
			{"data_buffer_items", bufferedPoints},
			{"data_buffer_bytes", bufferedBytes},
		});

		perfdata->Add(new PerfdataValue(labelPrefix + "_work_queue_items", queuedItems, true));
		perfdata->Add(new PerfdataValue(labelPrefix + "_work_queue_item_rate", queuedItemRate));
		perfdata->Add(new PerfdataValue(labelPrefix + "_data_buffer_items", bufferedPoints, true));
		perfdata->Add(new PerfdataValue(labelPrefix + "_data_buffer_bytes", bufferedBytes, false, "bytes"));
	}

	status->Set("otlpmetricswriter", new Dictionary{std::move(perWriterStatus)});
}
|
||||
|
||||
void OTLPMetricsWriter::OnConfigLoaded()
|
||||
{
|
||||
ObjectImpl::OnConfigLoaded();
|
||||
|
||||
m_WorkQueue.SetName("OTLPMetricsWriter, " + GetName());
|
||||
|
||||
if (!GetEnableHa()) {
|
||||
Log(LogDebug, "OTLPMetricsWriter")
|
||||
<< "HA functionality disabled. Won't pause connection: " << GetName();
|
||||
|
||||
SetHAMode(HARunEverywhere);
|
||||
} else {
|
||||
SetHAMode(HARunOnce);
|
||||
}
|
||||
}
|
||||
|
||||
void OTLPMetricsWriter::Start(bool runtimeCreated)
|
||||
{
|
||||
ObjectImpl::Start(runtimeCreated);
|
||||
|
||||
OTelConnInfo connInfo;
|
||||
connInfo.EnableTls = GetEnableTls();
|
||||
connInfo.VerifyPeerCertificate = !GetTlsInsecureNoverify();
|
||||
connInfo.Host = GetHost();
|
||||
connInfo.Port = GetPort();
|
||||
connInfo.TlsCaCrt = GetTlsCaFile();
|
||||
connInfo.TlsCrt = GetTlsCertFile();
|
||||
connInfo.TlsKey = GetTlsKeyFile();
|
||||
connInfo.MetricsEndpoint = GetMetricsEndpoint();
|
||||
if (auto auth = GetBasicAuth(); auth) {
|
||||
connInfo.BasicAuth = Base64::Encode(auth->Get("username") + ":" + auth->Get("password"));
|
||||
}
|
||||
|
||||
m_Exporter.reset(new OTel{connInfo});
|
||||
}
|
||||
|
||||
void OTLPMetricsWriter::Resume()
|
||||
{
|
||||
ObjectImpl::Resume();
|
||||
|
||||
Log(LogInformation, "OTLPMetricsWriter")
|
||||
<< "'" << GetName() << "' resumed.";
|
||||
|
||||
m_WorkQueue.SetExceptionCallback([](boost::exception_ptr exp) {
|
||||
Log(LogCritical, "OTLPMetricsWriter")
|
||||
<< "Exception while producing OTel metric: " << DiagnosticInformation(exp);
|
||||
});
|
||||
|
||||
m_FlushTimer = Timer::Create();
|
||||
m_FlushTimer->SetInterval(GetFlushInterval());
|
||||
m_FlushTimer->OnTimerExpired.connect([this](const Timer* const&) {
|
||||
if (m_TimerFlushInProgress.exchange(true, std::memory_order_relaxed)) {
|
||||
// Previous timer-initiated flush still in progress, skip this one.
|
||||
return;
|
||||
}
|
||||
m_WorkQueue.Enqueue([this] {
|
||||
Defer resetTimerFlag{[this] { m_TimerFlushInProgress.store(false, std::memory_order_relaxed); }};
|
||||
Flush(true);
|
||||
});
|
||||
});
|
||||
m_FlushTimer->Start();
|
||||
m_Exporter->Start();
|
||||
|
||||
m_CheckResultsSlot = Checkable::OnNewCheckResult.connect([this](
|
||||
const Checkable::Ptr& checkable,
|
||||
const CheckResult::Ptr& cr,
|
||||
const MessageOrigin::Ptr&
|
||||
) {
|
||||
CheckResultHandler(checkable, cr);
|
||||
});
|
||||
m_ActiveChangedSlot = OnActiveChanged.connect([this](const ConfigObject::Ptr& obj, const Value&) {
|
||||
auto checkable = dynamic_pointer_cast<Checkable>(obj);
|
||||
if (!checkable || checkable->IsActive()) {
|
||||
return;
|
||||
}
|
||||
m_WorkQueue.Enqueue([this, checkable] { m_Metrics.erase(checkable.get()); });
|
||||
});
|
||||
}
|
||||
|
||||
void OTLPMetricsWriter::Pause()
|
||||
{
|
||||
m_CheckResultsSlot.disconnect();
|
||||
m_ActiveChangedSlot.disconnect();
|
||||
|
||||
m_FlushTimer->Stop(true);
|
||||
|
||||
std::promise<void> promise;
|
||||
auto future = promise.get_future();
|
||||
m_WorkQueue.Enqueue([this, &promise] {
|
||||
Flush();
|
||||
promise.set_value();
|
||||
}, PriorityLow);
|
||||
|
||||
if (auto status = future.wait_for(std::chrono::seconds(GetDisconnectTimeout())); status != std::future_status::ready) {
|
||||
Log(LogWarning, "OTLPMetricsWriter")
|
||||
<< "Disconnect timeout reached while flushing OTel metrics, discarding '" << m_DataPointsCount
|
||||
<< "' data points ('" << m_RecordedBytes << "' bytes).";
|
||||
}
|
||||
m_Exporter->Stop();
|
||||
m_WorkQueue.Join();
|
||||
|
||||
m_Metrics.clear();
|
||||
|
||||
Log(LogInformation, "OTLPMetricsWriter")
|
||||
<< "'" << GetName() << "' paused.";
|
||||
|
||||
ObjectImpl::Pause();
|
||||
}
|
||||
|
||||
/**
 * Handles a new check result by queueing a task that records its performance data.
 *
 * Nothing is queued when performance data is disabled globally or for the checkable,
 * or when the check result carries no performance data.
 *
 * @param checkable The checkable the check result belongs to.
 * @param cr The check result whose performance data should be recorded.
 */
void OTLPMetricsWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
	if (!IcingaApplication::GetInstance()->GetEnablePerfdata()) {
		return;
	}
	if (!checkable->GetEnablePerfdata()) {
		return;
	}
	if (!cr->GetPerformanceData()) {
		return;
	}

	m_WorkQueue.Enqueue([this, checkable, cr] {
		if (m_Exporter->Stopped()) {
			return;
		}
		CONTEXT("Processing check result for '" << checkable->GetName() << "'.");

		auto scheduleStart = cr->GetScheduleStart();
		auto executionEnd = cr->GetExecutionEnd();

		Array::Ptr values = cr->GetPerformanceData();
		ObjectLock olock(values);
		for (const Value& rawValue : values) {
			// Entries are either PerfdataValue objects already or still need to be parsed.
			PerfdataValue::Ptr pdv;
			if (!rawValue.IsObjectType<PerfdataValue>()) {
				try {
					pdv = PerfdataValue::Parse(rawValue);
				} catch (const std::exception&) {
					Log(LogWarning, "OTLPMetricsWriter")
						<< "Ignoring invalid perfdata for checkable '" << checkable->GetName() << "' and command '"
						<< checkable->GetCheckCommand()->GetName() << "' with value: " << rawValue;
					continue;
				}
			} else {
				pdv = rawValue;
			}

			// Record the perfdata value itself.
			OTel::AttrsMap pointAttrs{{"perfdata_label", pdv->GetLabel()}};
			if (auto unit = pdv->GetUnit(); !unit.IsEmpty()) {
				pointAttrs.emplace("unit", std::move(unit));
			}
			std::size_t pointBytes = Record(
				checkable,
				l_PerfdataMetric,
				pdv->GetValue(),
				scheduleStart,
				executionEnd,
				std::move(pointAttrs)
			);
			AddBytesAndFlushIfNeeded(pointBytes);

			if (!GetEnableSendThresholds()) {
				continue;
			}

			// Record every threshold that is set as a data point of the threshold metric.
			std::array<std::pair<String, Value>, 4> thresholds{{
				{"critical", pdv->GetCrit()},
				{"warning", pdv->GetWarn()},
				{"min", pdv->GetMin()},
				{"max", pdv->GetMax()},
			}};
			for (auto& [kind, limit] : thresholds) {
				if (limit.IsEmpty()) {
					continue;
				}

				OTel::AttrsMap thresholdAttrs{
					{"perfdata_label", pdv->GetLabel()},
					{"threshold_type", std::move(kind)},
				};
				std::size_t thresholdBytes = Record(
					checkable,
					l_ThresholdMetric,
					Convert::ToDouble(limit),
					scheduleStart,
					executionEnd,
					std::move(thresholdAttrs)
				);
				AddBytesAndFlushIfNeeded(thresholdBytes);
			}
		}
	});
}
|
||||
|
||||
void OTLPMetricsWriter::Flush(bool fromTimer)
|
||||
{
|
||||
// If previous export is still in progress and this flush is requested from timer, skip it.
|
||||
// For manual flushes (e.g., due to reaching flush threshold), we want to block until
|
||||
// the previous export is done before returning to the caller (blocking is handled in OTel::Export()).
|
||||
if (fromTimer && m_Exporter->Exporting()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Log(LogDebug, "OTLPMetricsWriter")
|
||||
<< "Flushing OTel metrics to OpenTelemetry backend" << (fromTimer ? " (timer expired)." : ".");
|
||||
|
||||
auto request = std::make_unique<OTel::MetricsRequest>();
|
||||
for (auto& [checkable, resourceMetrics] : m_Metrics) {
|
||||
if (resourceMetrics) {
|
||||
request->mutable_resource_metrics()->AddAllocated(resourceMetrics.release());
|
||||
}
|
||||
}
|
||||
if (request->resource_metrics_size() == 0) {
|
||||
Log(LogDebug, "OTLPMetricsWriter")
|
||||
<< "Not flushing OTel metrics: No data points recorded.";
|
||||
return;
|
||||
}
|
||||
m_Exporter->Export(std::move(request));
|
||||
m_RecordedBytes.store(0, std::memory_order_relaxed);
|
||||
m_DataPointsCount.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void OTLPMetricsWriter::AddBytesAndFlushIfNeeded(std::size_t newBytes)
|
||||
{
|
||||
auto existingBytes = m_RecordedBytes.fetch_add(newBytes, std::memory_order_relaxed);
|
||||
if (auto bytes{existingBytes + newBytes}; bytes >= static_cast<uint64_t>(GetFlushThreshold())) {
|
||||
Log(LogDebug, "OTLPMetricsWriter")
|
||||
<< "Flush threshold reached, flushing '" << bytes << "' bytes of OTel metrics.";
|
||||
Flush();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a data point for the specified OTel metric associated with the given configuration object.
|
||||
*
|
||||
* This method records a data point of type T for the specified metric name associated with the
|
||||
* provided configuration object. If the metric does not exist for the object, it is created.
|
||||
*
|
||||
* @tparam T The type of the data point to record (e.g., int64_t, double).
|
||||
*
|
||||
* @param checkable The configuration object to associate the metric with.
|
||||
* @param metric The name of the OTel metric stream to record the data point for (e.g. "state_check.perfdata").
|
||||
* @param value The data point value to record.
|
||||
* @param startTime The start time of the data point in seconds.
|
||||
* @param endTime The end time of the data point in seconds.
|
||||
* @param attrs The attributes associated with the data point.
|
||||
*
|
||||
* @return The number of bytes recorded for this data point, which contributes to the flush threshold.
|
||||
*/
|
||||
template<typename T>
|
||||
std::size_t OTLPMetricsWriter::Record(
|
||||
const Checkable::Ptr& checkable,
|
||||
std::string_view metric,
|
||||
T value,
|
||||
double startTime,
|
||||
double endTime,
|
||||
OTel::AttrsMap attrs
|
||||
)
|
||||
{
|
||||
std::size_t bytes = 0;
|
||||
auto& resourceMetrics = m_Metrics[checkable.get()];
|
||||
if (!resourceMetrics) {
|
||||
using namespace std::string_view_literals;
|
||||
|
||||
resourceMetrics = std::make_unique<opentelemetry::proto::metrics::v1::ResourceMetrics>();
|
||||
OTel::PopulateResourceAttrs(resourceMetrics);
|
||||
|
||||
auto* resource = resourceMetrics->mutable_resource();
|
||||
auto* attr = resource->add_attributes();
|
||||
OTel::SetAttribute(*attr, "service.namespace"sv, GetServiceNamespace());
|
||||
|
||||
auto [host, service] = GetHostService(checkable);
|
||||
attr = resource->add_attributes();
|
||||
OTel::SetAttribute(*attr, "icinga2.host.name"sv, host->GetName());
|
||||
|
||||
// Add entity reference (https://opentelemetry.io/docs/specs/otel/entities/data-model/).
|
||||
auto* entity = resource->add_entity_refs();
|
||||
entity->mutable_id_keys()->Add("icinga2.host.name");
|
||||
if (service) {
|
||||
entity->set_type("service");
|
||||
entity->mutable_id_keys()->Add("icinga2.service.name");
|
||||
|
||||
attr = resource->add_attributes();
|
||||
OTel::SetAttribute(*attr, "icinga2.service.name"sv, service->GetShortName());
|
||||
} else {
|
||||
entity->set_type("host");
|
||||
}
|
||||
attr = resource->add_attributes();
|
||||
OTel::SetAttribute(*attr, "icinga2.command.name"sv, checkable->GetCheckCommand()->GetName());
|
||||
bytes = resourceMetrics->ByteSizeLong();
|
||||
}
|
||||
|
||||
auto* sm = resourceMetrics->mutable_scope_metrics(0);
|
||||
auto* metrics = sm->mutable_metrics();
|
||||
auto it = std::find_if(metrics->begin(), metrics->end(), [metric](const auto& m) { return m.name() == metric; });
|
||||
OTel::Gauge* gaugePtr = nullptr;
|
||||
if (it == metrics->end()) {
|
||||
OTel::ValidateName(metric);
|
||||
auto* metricPtr = sm->add_metrics();
|
||||
metricPtr->set_name(std::string(metric));
|
||||
bytes += metricPtr->ByteSizeLong(); // Account for metric name size in bytes.
|
||||
gaugePtr = metricPtr->mutable_gauge();
|
||||
} else {
|
||||
gaugePtr = it->mutable_gauge();
|
||||
}
|
||||
bytes += OTel::Record(*gaugePtr, value, startTime, endTime, std::move(attrs));
|
||||
m_DataPointsCount.fetch_add(1, std::memory_order_relaxed);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
void OTLPMetricsWriter::ValidatePort(const Lazy<int>& lvalue, const ValidationUtils& utils)
|
||||
{
|
||||
ObjectImpl::ValidatePort(lvalue, utils);
|
||||
if (auto p = lvalue(); p < 1 || p > 65535) {
|
||||
BOOST_THROW_EXCEPTION(ValidationError(this, {"port"}, "Port must be in the range 1-65535."));
|
||||
}
|
||||
}
|
||||
|
||||
void OTLPMetricsWriter::ValidateFlushInterval(const Lazy<int>& lvalue, const ValidationUtils& utils)
|
||||
{
|
||||
ObjectImpl::ValidateFlushInterval(lvalue, utils);
|
||||
if (lvalue() < 1) {
|
||||
BOOST_THROW_EXCEPTION(ValidationError(this, {"flush_interval"}, "Flush interval must be at least 1 second."));
|
||||
}
|
||||
}
|
||||
|
||||
void OTLPMetricsWriter::ValidateFlushThreshold(const Lazy<int64_t>& lvalue, const ValidationUtils& utils)
|
||||
{
|
||||
ObjectImpl::ValidateFlushThreshold(lvalue, utils);
|
||||
if (lvalue() < 1) {
|
||||
BOOST_THROW_EXCEPTION(ValidationError(this, {"flush_threshold"}, "Flush threshold must be at least 1."));
|
||||
}
|
||||
// Protobuf limits the size of messages to be serialiazed/deserialized to max 2GiB. Thus, we can't accept
|
||||
// a flush threshold that would exceed that limit with a reasonable safe margin of 10MiB for any other
|
||||
// overhead in the message not accounted for in @c m_RecordedBytes.
|
||||
// See https://protobuf.dev/programming-guides/proto-limits/#total.
|
||||
constexpr std::size_t maxMessageSize = 2ULL * 1024 * 1024 * 1024 - 10 * 1024 * 1024;
|
||||
if (static_cast<uint64_t>(lvalue()) > maxMessageSize) {
|
||||
BOOST_THROW_EXCEPTION(ValidationError(
|
||||
this,
|
||||
{"flush_threshold"},
|
||||
"Flush threshold too high, would exceed Protobuf message size limit of 2GiB (1.9GiB max allowed)."
|
||||
));
|
||||
}
|
||||
}
|
||||
61
lib/perfdata/otlpmetricswriter.hpp
Normal file
61
lib/perfdata/otlpmetricswriter.hpp
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "perfdata/otlpmetricswriter-ti.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "otel/otel.hpp"
|
||||
#include <unordered_map>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
/**
 * Writes performance data of check results as OTel metrics to an OTLP endpoint.
 *
 * Check results are processed on an internal work queue, buffered per checkable and
 * flushed either periodically by a timer or once the configured byte threshold is reached.
 */
class OTLPMetricsWriter final : public ObjectImpl<OTLPMetricsWriter>
{
public:
	DECLARE_OBJECT(OTLPMetricsWriter);
	DECLARE_OBJECTNAME(OTLPMetricsWriter);

	// Reports work queue and buffer statistics of all writer objects.
	static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);

	void Start(bool runtimeCreated) override;
	void OnConfigLoaded() override;
	void Resume() override;
	void Pause() override;

protected:
	void ValidatePort(const Lazy<int>& lvalue, const ValidationUtils& utils) override;
	void ValidateFlushInterval(const Lazy<int>& lvalue, const ValidationUtils& utils) override;
	void ValidateFlushThreshold(const Lazy<int64_t>& lvalue, const ValidationUtils& utils) override;

private:
	// Queues the processing of a new check result.
	void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);

	// Hands the buffered metrics over to the exporter.
	void Flush(bool fromTimer = false);

	// Accounts for newly recorded bytes and flushes once the flush threshold is reached.
	void AddBytesAndFlushIfNeeded(std::size_t newBytes = 0);

	// Records a single data point; the returned byte count must be passed on for accounting.
	template<typename T>
	[[nodiscard]] std::size_t Record(
		const Checkable::Ptr& checkable,
		std::string_view metric,
		T value,
		double startTime,
		double endTime,
		OTel::AttrsMap attrs
	);

	// Total bytes recorded in the current OTel message.
	std::atomic_uint64_t m_RecordedBytes{0};
	// Total data points recorded in the current OTel message.
	std::atomic_uint64_t m_DataPointsCount{0};

	// Checkables and their associated OTel ResourceMetrics that are being recorded for the current OTel message.
	std::unordered_map<Checkable*, std::unique_ptr<opentelemetry::proto::metrics::v1::ResourceMetrics>> m_Metrics;

	// NOTE(review): presumably (max queued items, worker thread count) - confirm against WorkQueue.
	WorkQueue m_WorkQueue{10'000'000, 1};

	// Signal connections established in Resume() and disconnected in Pause().
	boost::signals2::connection m_CheckResultsSlot;
	boost::signals2::connection m_ActiveChangedSlot;

	OTel::Ptr m_Exporter;
	Timer::Ptr m_FlushTimer;

	// Whether a timer-initiated flush is in progress.
	std::atomic_bool m_TimerFlushInProgress{false};
};
|
||||
|
||||
} // namespace icinga
|
||||
68
lib/perfdata/otlpmetricswriter.ti
Normal file
68
lib/perfdata/otlpmetricswriter.ti
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "base/configobject.hpp"
|
||||
|
||||
library perfdata;
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
class OTLPMetricsWriter : ConfigObject
|
||||
{
|
||||
activation_priority 100;
|
||||
|
||||
[config, required, no_user_modify] String host {
|
||||
default {{{ return "127.0.0.1"; }}}
|
||||
};
|
||||
[config, no_user_modify] int port {
|
||||
default {{{ return 4318; }}}
|
||||
};
|
||||
[config, required, no_user_modify] String metrics_endpoint {
|
||||
default {{{ return "/v1/metrics"; }}}
|
||||
};
|
||||
|
||||
[config, required] String service_namespace {
|
||||
default {{{ return "icinga"; }}}
|
||||
};
|
||||
|
||||
[config, no_user_view, no_user_modify] Dictionary::Ptr basic_auth;
|
||||
|
||||
[config] int flush_interval {
|
||||
default {{{ return 15; }}}
|
||||
};
|
||||
[config] int64_t flush_threshold {
|
||||
default {{{ return 32 * 1024 * 1024; }}}
|
||||
};
|
||||
[config] bool enable_ha {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] bool enable_send_thresholds {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config] int disconnect_timeout {
|
||||
default {{{ return 10; }}}
|
||||
};
|
||||
|
||||
[config, no_user_modify] bool enable_tls {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config, no_user_modify] bool tls_insecure_noverify {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
[config, no_user_modify] String tls_ca_file;
|
||||
[config, no_user_modify] String tls_cert_file;
|
||||
[config, no_user_modify] String tls_key_file;
|
||||
};
|
||||
|
||||
// If `basic_auth` is set, both `username` and `password` are mandatory strings.
validator OTLPMetricsWriter
{
	Dictionary basic_auth {
		required username;
		String username;

		required password;
		String password;
	};
};
|
||||
|
||||
} // namespace icinga
|
||||
|
|
@ -152,7 +152,7 @@ static std::string FieldTypeToIcingaName(const Field& field, bool inner)
|
|||
if (field.Attributes & FAEnum)
|
||||
return "Number";
|
||||
|
||||
if (ftype == "int" || ftype == "double")
|
||||
if (ftype == "int" || ftype == "int64_t" || ftype == "double")
|
||||
return "Number";
|
||||
else if (ftype == "bool")
|
||||
return "Boolean";
|
||||
|
|
|
|||
Loading…
Reference in a new issue