icinga2/lib/perfdata/otlpmetricswriter.cpp

444 lines
15 KiB
C++

// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
// SPDX-License-Identifier: GPL-3.0-or-later
#include "perfdata/otlpmetricswriter.hpp"
#include "perfdata/otlpmetricswriter-ti.cpp"
#include "base/base64.hpp"
#include "base/defer.hpp"
#include "base/json.hpp"
#include "base/object-packer.hpp"
#include "base/perfdatavalue.hpp"
#include "base/statsfunction.hpp"
#include "icinga/checkable.hpp"
#include "icinga/checkcommand.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/service.hpp"
#include <future>
using namespace icinga;
REGISTER_TYPE(OTLPMetricsWriter);
REGISTER_STATSFUNCTION(OTLPMetricsWriter, &OTLPMetricsWriter::StatsFunc);
// Represent our currently supported metric streams.
//
// Note: These and all other attribute keys used within this compilation unit follow
// the OTel general naming guidelines[^1] and conventions[^2].
//
// [^1]: https://opentelemetry.io/docs/specs/semconv/general/metrics/#general-guidelines
// [^2]: https://opentelemetry.io/docs/specs/semconv/general/naming
static constexpr std::string_view l_PerfdataMetric = "state_check.perfdata";
static constexpr std::string_view l_ThresholdMetric = "state_check.threshold";
void OTLPMetricsWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
DictionaryData statusData;
for (const Ptr& otlpWriter : ConfigType::GetObjectsByType<OTLPMetricsWriter>()) {
std::size_t workQueueSize = otlpWriter->m_WorkQueue.GetLength();
double workQueueItemRate = otlpWriter->m_WorkQueue.GetTaskCount(60) / 60.0;
std::size_t dataPointsCount = otlpWriter->m_DataPointsCount.load(std::memory_order_relaxed);
uint64_t messageSize = otlpWriter->m_RecordedBytes.load(std::memory_order_relaxed);
const auto name = otlpWriter->GetName();
statusData.emplace_back(name, new Dictionary{
{"work_queue_items", workQueueSize},
{"work_queue_item_rate", workQueueItemRate},
{"data_buffer_items", dataPointsCount},
{"data_buffer_bytes", messageSize},
});
perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_work_queue_items", workQueueSize, true));
perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_work_queue_item_rate", workQueueItemRate));
perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_data_buffer_items", dataPointsCount, true));
perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_data_buffer_bytes", messageSize, false, "bytes"));
}
status->Set("otlpmetricswriter", new Dictionary{std::move(statusData)});
}
void OTLPMetricsWriter::OnConfigLoaded()
{
ObjectImpl::OnConfigLoaded();
m_WorkQueue.SetName("OTLPMetricsWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "OTLPMetricsWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
SetHAMode(HARunEverywhere);
} else {
SetHAMode(HARunOnce);
}
}
void OTLPMetricsWriter::Start(bool runtimeCreated)
{
ObjectImpl::Start(runtimeCreated);
OTelConnInfo connInfo;
connInfo.EnableTls = GetEnableTls();
connInfo.VerifyPeerCertificate = !GetTlsInsecureNoverify();
connInfo.Host = GetHost();
connInfo.Port = GetPort();
connInfo.TlsCaCrt = GetTlsCaFile();
connInfo.TlsCrt = GetTlsCertFile();
connInfo.TlsKey = GetTlsKeyFile();
connInfo.MetricsEndpoint = GetMetricsEndpoint();
if (auto auth = GetBasicAuth(); auth) {
connInfo.BasicAuth = Base64::Encode(auth->Get("username") + ":" + auth->Get("password"));
}
m_Exporter.reset(new OTel{connInfo});
}
void OTLPMetricsWriter::Resume()
{
ObjectImpl::Resume();
Log(LogInformation, "OTLPMetricsWriter")
<< "'" << GetName() << "' resumed.";
m_WorkQueue.SetExceptionCallback([](boost::exception_ptr exp) {
Log(LogCritical, "OTLPMetricsWriter")
<< "Exception while producing OTel metric: " << DiagnosticInformation(exp);
});
m_FlushTimer = Timer::Create();
m_FlushTimer->SetInterval(GetFlushInterval());
m_FlushTimer->OnTimerExpired.connect([this](const Timer* const&) {
if (m_TimerFlushInProgress.exchange(true, std::memory_order_relaxed)) {
// Previous timer-initiated flush still in progress, skip this one.
return;
}
m_WorkQueue.Enqueue([this] {
Defer resetTimerFlag{[this] { m_TimerFlushInProgress.store(false, std::memory_order_relaxed); }};
Flush(true);
});
});
m_FlushTimer->Start();
m_Exporter->Start();
m_CheckResultsSlot = Checkable::OnNewCheckResult.connect([this](
const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr,
const MessageOrigin::Ptr&
) {
CheckResultHandler(checkable, cr);
});
m_ActiveChangedSlot = OnActiveChanged.connect([this](const ConfigObject::Ptr& obj, const Value&) {
auto checkable = dynamic_pointer_cast<Checkable>(obj);
if (!checkable || checkable->IsActive()) {
return;
}
m_WorkQueue.Enqueue([this, checkable] { m_Metrics.erase(checkable.get()); });
});
}
void OTLPMetricsWriter::Pause()
{
m_CheckResultsSlot.disconnect();
m_ActiveChangedSlot.disconnect();
m_FlushTimer->Stop(true);
std::promise<void> promise;
auto future = promise.get_future();
m_WorkQueue.Enqueue([this, &promise] {
Flush();
promise.set_value();
}, PriorityLow);
if (auto status = future.wait_for(std::chrono::seconds(GetDisconnectTimeout())); status != std::future_status::ready) {
Log(LogWarning, "OTLPMetricsWriter")
<< "Disconnect timeout reached while flushing OTel metrics, discarding '" << m_DataPointsCount
<< "' data points ('" << m_RecordedBytes << "' bytes).";
}
m_Exporter->Stop();
m_WorkQueue.Join();
m_Metrics.clear();
Log(LogInformation, "OTLPMetricsWriter")
<< "'" << GetName() << "' paused.";
ObjectImpl::Pause();
}
void OTLPMetricsWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata() || !cr->GetPerformanceData()) {
return;
}
m_WorkQueue.Enqueue([this, checkable, cr] {
if (m_Exporter->Stopped()) {
return;
}
CONTEXT("Processing check result for '" << checkable->GetName() << "'.");
auto startTime = cr->GetScheduleStart();
auto endTime = cr->GetExecutionEnd();
Array::Ptr perfdata = cr->GetPerformanceData();
ObjectLock olock(perfdata);
for (const Value& val : perfdata) {
PerfdataValue::Ptr pdv;
if (val.IsObjectType<PerfdataValue>()) {
pdv = val;
} else {
try {
pdv = PerfdataValue::Parse(val);
} catch (const std::exception&) {
Log(LogWarning, "OTLPMetricsWriter")
<< "Ignoring invalid perfdata for checkable '" << checkable->GetName() << "' and command '"
<< checkable->GetCheckCommand()->GetName() << "' with value: " << val;
continue;
}
}
OTel::AttrsMap attrs{{"perfdata_label", pdv->GetLabel()}};
if (auto unit = pdv->GetUnit(); !unit.IsEmpty()) {
attrs.emplace("unit", std::move(unit));
}
AddBytesAndFlushIfNeeded(Record(checkable, cr, l_PerfdataMetric, pdv->GetValue(), startTime, endTime, std::move(attrs)));
if (GetEnableSendThresholds()) {
std::array<std::pair<String, Value>, 4> thresholds{{
{"critical", pdv->GetCrit()},
{"warning", pdv->GetWarn()},
{"min", pdv->GetMin()},
{"max", pdv->GetMax()},
}};
for (auto& [label, threshold] : thresholds) {
if (!threshold.IsEmpty()) {
attrs = {
{"perfdata_label", pdv->GetLabel()},
{"threshold_type", std::move(label)},
};
AddBytesAndFlushIfNeeded(
Record(
checkable,
cr,
l_ThresholdMetric,
Convert::ToDouble(threshold),
startTime,
endTime,
std::move(attrs)
)
);
}
}
}
}
});
}
void OTLPMetricsWriter::Flush(bool fromTimer)
{
// If previous export is still in progress and this flush is requested from timer, skip it.
// For manual flushes (e.g., due to reaching flush threshold), we want to block until
// the previous export is done before returning to the caller (blocking is handled in OTel::Export()).
if (fromTimer && m_Exporter->Exporting()) {
return;
}
Log(LogDebug, "OTLPMetricsWriter")
<< "Flushing OTel metrics to OpenTelemetry backend" << (fromTimer ? " (timer expired)." : ".");
auto request = std::make_unique<OTel::MetricsRequest>();
for (auto& [checkable, resourceMetrics] : m_Metrics) {
if (resourceMetrics) {
request->mutable_resource_metrics()->AddAllocated(resourceMetrics.release());
}
}
if (request->resource_metrics_size() == 0) {
Log(LogDebug, "OTLPMetricsWriter")
<< "Not flushing OTel metrics: No data points recorded.";
return;
}
m_Exporter->Export(std::move(request));
m_RecordedBytes.store(0, std::memory_order_relaxed);
m_DataPointsCount.store(0, std::memory_order_relaxed);
}
void OTLPMetricsWriter::AddBytesAndFlushIfNeeded(std::size_t newBytes)
{
auto existingBytes = m_RecordedBytes.fetch_add(newBytes, std::memory_order_relaxed);
if (auto bytes{existingBytes + newBytes}; bytes >= static_cast<uint64_t>(GetFlushThreshold())) {
Log(LogDebug, "OTLPMetricsWriter")
<< "Flush threshold reached, flushing '" << bytes << "' bytes of OTel metrics.";
Flush();
}
}
/**
* Record a data point for the specified OTel metric associated with the given configuration object.
*
* This method records a data point of type T for the specified metric name associated with the
* provided configuration object. If the metric does not exist for the object, it is created.
*
* @tparam T The type of the data point to record (e.g., int64_t, double).
*
* @param checkable The configuration object to associate the metric with.
* @param cr The check result associated with the metric data point, used for macro resolution in attributes.
* @param metric The OTel metric enum value indicating which metric stream to record the data point for.
* @param value The data point value to record.
* @param startTime The start time of the data point in seconds.
* @param endTime The end time of the data point in seconds.
* @param attrs The attributes associated with the data point.
*
* @return The number of bytes recorded for this data point, which contributes to the flush threshold.
*/
template<typename T>
std::size_t OTLPMetricsWriter::Record(
const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr,
std::string_view metric,
T value,
double startTime,
double endTime,
OTel::AttrsMap attrs
)
{
std::size_t bytes = 0;
auto& resourceMetrics = m_Metrics[checkable.get()];
if (!resourceMetrics) {
using namespace std::string_view_literals;
resourceMetrics = std::make_unique<opentelemetry::proto::metrics::v1::ResourceMetrics>();
OTel::PopulateResourceAttrs(resourceMetrics);
auto* resource = resourceMetrics->mutable_resource();
auto* attr = resource->add_attributes();
OTel::SetAttribute(*attr, "service.namespace"sv, GetServiceNamespace());
auto [host, service] = GetHostService(checkable);
attr = resource->add_attributes();
OTel::SetAttribute(*attr, "icinga2.host.name"sv, host->GetName());
// Add entity reference (https://opentelemetry.io/docs/specs/otel/entities/data-model/).
auto* entity = resource->add_entity_refs();
entity->mutable_id_keys()->Add("icinga2.host.name");
if (service) {
entity->set_type("service");
entity->mutable_id_keys()->Add("icinga2.service.name");
attr = resource->add_attributes();
OTel::SetAttribute(*attr, "icinga2.service.name"sv, service->GetShortName());
} else {
entity->set_type("host");
}
attr = resource->add_attributes();
OTel::SetAttribute(*attr, "icinga2.command.name"sv, checkable->GetCheckCommand()->GetName());
if (Dictionary::Ptr tmpl = service ? GetServiceResourceAttributes() : GetHostResourceAttributes(); tmpl) {
MacroProcessor::ResolverList resolvers{{"host", host}};
if (service) {
resolvers.emplace_back("service", service);
}
ObjectLock olock(tmpl);
for (const Dictionary::Pair& pair : tmpl) {
String missingMacro;
auto resolvedVal = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missingMacro);
if (missingMacro.IsEmpty()) {
attr = resource->add_attributes();
try {
OTel::SetAttribute(*attr, "icinga2.custom." + pair.first, resolvedVal);
} catch (const std::exception& ex) {
Log(LogWarning, "OTLPMetricsWriter")
<< "Ignoring invalid resource attribute '" << pair.first << "' for checkable '"
<< checkable->GetName() << "': " << ex.what();
// Remove the last attribute from the list which is the one we just attempted to set.
resource->mutable_attributes()->RemoveLast();
}
}
}
}
bytes = resourceMetrics->ByteSizeLong();
}
auto* sm = resourceMetrics->mutable_scope_metrics(0);
auto* metrics = sm->mutable_metrics();
auto it = std::find_if(metrics->begin(), metrics->end(), [metric](const auto& m) { return m.name() == metric; });
OTel::Gauge* gaugePtr = nullptr;
if (it == metrics->end()) {
OTel::ValidateName(metric);
auto* metricPtr = sm->add_metrics();
metricPtr->set_name(std::string(metric));
bytes += metricPtr->ByteSizeLong(); // Account for metric name size in bytes.
gaugePtr = metricPtr->mutable_gauge();
} else {
gaugePtr = it->mutable_gauge();
}
bytes += OTel::Record(*gaugePtr, value, startTime, endTime, std::move(attrs));
m_DataPointsCount.fetch_add(1, std::memory_order_relaxed);
return bytes;
}
void OTLPMetricsWriter::ValidatePort(const Lazy<int>& lvalue, const ValidationUtils& utils)
{
ObjectImpl::ValidatePort(lvalue, utils);
if (auto p = lvalue(); p < 1 || p > 65535) {
BOOST_THROW_EXCEPTION(ValidationError(this, {"port"}, "Port must be in the range 1-65535."));
}
}
void OTLPMetricsWriter::ValidateFlushInterval(const Lazy<int>& lvalue, const ValidationUtils& utils)
{
ObjectImpl::ValidateFlushInterval(lvalue, utils);
if (lvalue() < 1) {
BOOST_THROW_EXCEPTION(ValidationError(this, {"flush_interval"}, "Flush interval must be at least 1 second."));
}
}
void OTLPMetricsWriter::ValidateFlushThreshold(const Lazy<int64_t>& lvalue, const ValidationUtils& utils)
{
ObjectImpl::ValidateFlushThreshold(lvalue, utils);
if (lvalue() < 1) {
BOOST_THROW_EXCEPTION(ValidationError(this, {"flush_threshold"}, "Flush threshold must be at least 1."));
}
// Protobuf limits the size of messages to be serialiazed/deserialized to max 2GiB. Thus, we can't accept
// a flush threshold that would exceed that limit with a reasonable safe margin of 10MiB for any other
// overhead in the message not accounted for in @c m_RecordedBytes.
// See https://protobuf.dev/programming-guides/proto-limits/#total.
constexpr std::size_t maxMessageSize = 2ULL * 1024 * 1024 * 1024 - 10 * 1024 * 1024;
if (static_cast<uint64_t>(lvalue()) > maxMessageSize) {
BOOST_THROW_EXCEPTION(ValidationError(
this,
{"flush_threshold"},
"Flush threshold too high, would exceed Protobuf message size limit of 2GiB (1.9GiB max allowed)."
));
}
}
void OTLPMetricsWriter::ValidateHostResourceAttributes(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
{
ObjectImpl::ValidateHostResourceAttributes(lvalue, utils);
if (const auto& tags{lvalue()}; tags) {
ValidateResourceAttributes(tags, "host_resource_attributes");
}
}
void OTLPMetricsWriter::ValidateServiceResourceAttributes(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
{
ObjectImpl::ValidateServiceResourceAttributes(lvalue, utils);
if (const auto& tags{lvalue()}; tags) {
ValidateResourceAttributes(tags, "service_resource_attributes");
}
}
void OTLPMetricsWriter::ValidateResourceAttributes(const Dictionary::Ptr& tmpl, const String& attrName)
{
ObjectLock olock(tmpl);
for (const auto& pair : tmpl) {
if (!MacroProcessor::ValidateMacroString(pair.second)) {
BOOST_THROW_EXCEPTION(ValidationError(
this,
{attrName, pair.first},
"Closing $ not found in macro format string '" + pair.second + "'."
));
}
}
}