// SPDX-FileCopyrightText: 2026 Icinga GmbH // SPDX-License-Identifier: GPL-3.0-or-later #include "perfdata/otlpmetricswriter.hpp" #include "perfdata/otlpmetricswriter-ti.cpp" #include "base/base64.hpp" #include "base/defer.hpp" #include "base/json.hpp" #include "base/object-packer.hpp" #include "base/perfdatavalue.hpp" #include "base/statsfunction.hpp" #include "icinga/checkable.hpp" #include "icinga/checkcommand.hpp" #include "icinga/macroprocessor.hpp" #include "icinga/service.hpp" #include using namespace icinga; REGISTER_TYPE(OTLPMetricsWriter); REGISTER_STATSFUNCTION(OTLPMetricsWriter, &OTLPMetricsWriter::StatsFunc); // Represent our currently supported metric streams. // // Note: These and all other attribute keys used within this compilation unit follow // the OTel general naming guidelines[^1] and conventions[^2]. // // [^1]: https://opentelemetry.io/docs/specs/semconv/general/metrics/#general-guidelines // [^2]: https://opentelemetry.io/docs/specs/semconv/general/naming static constexpr std::string_view l_PerfdataMetric = "state_check.perfdata"; static constexpr std::string_view l_ThresholdMetric = "state_check.threshold"; void OTLPMetricsWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) { DictionaryData statusData; for (const Ptr& otlpWriter : ConfigType::GetObjectsByType()) { std::size_t workQueueSize = otlpWriter->m_WorkQueue.GetLength(); double workQueueItemRate = otlpWriter->m_WorkQueue.GetTaskCount(60) / 60.0; std::size_t dataPointsCount = otlpWriter->m_DataPointsCount.load(std::memory_order_relaxed); uint64_t messageSize = otlpWriter->m_RecordedBytes.load(std::memory_order_relaxed); const auto name = otlpWriter->GetName(); statusData.emplace_back(name, new Dictionary{ {"work_queue_items", workQueueSize}, {"work_queue_item_rate", workQueueItemRate}, {"data_buffer_items", dataPointsCount}, {"data_buffer_bytes", messageSize}, }); perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_work_queue_items", workQueueSize, true)); perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_work_queue_item_rate", workQueueItemRate)); perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_data_buffer_items", dataPointsCount, true)); perfdata->Add(new PerfdataValue("otlpmetricswriter_" + name + "_data_buffer_bytes", messageSize, false, "bytes")); } status->Set("otlpmetricswriter", new Dictionary{std::move(statusData)}); } void OTLPMetricsWriter::OnConfigLoaded() { ObjectImpl::OnConfigLoaded(); m_WorkQueue.SetName("OTLPMetricsWriter, " + GetName()); if (!GetEnableHa()) { Log(LogDebug, "OTLPMetricsWriter") << "HA functionality disabled. Won't pause connection: " << GetName(); SetHAMode(HARunEverywhere); } else { SetHAMode(HARunOnce); } } void OTLPMetricsWriter::Start(bool runtimeCreated) { ObjectImpl::Start(runtimeCreated); OTelConnInfo connInfo; connInfo.EnableTls = GetEnableTls(); connInfo.VerifyPeerCertificate = !GetTlsInsecureNoverify(); connInfo.Host = GetHost(); connInfo.Port = GetPort(); connInfo.TlsCaCrt = GetTlsCaFile(); connInfo.TlsCrt = GetTlsCertFile(); connInfo.TlsKey = GetTlsKeyFile(); connInfo.MetricsEndpoint = GetMetricsEndpoint(); if (auto auth = GetBasicAuth(); auth) { connInfo.BasicAuth = Base64::Encode(auth->Get("username") + ":" + auth->Get("password")); } m_Exporter.reset(new OTel{connInfo}); } void OTLPMetricsWriter::Resume() { ObjectImpl::Resume(); Log(LogInformation, "OTLPMetricsWriter") << "'" << GetName() << "' resumed."; m_WorkQueue.SetExceptionCallback([](boost::exception_ptr exp) { Log(LogCritical, "OTLPMetricsWriter") << "Exception while producing OTel metric: " << DiagnosticInformation(exp); }); m_FlushTimer = Timer::Create(); m_FlushTimer->SetInterval(GetFlushInterval()); m_FlushTimer->OnTimerExpired.connect([this](const Timer* const&) { if (m_TimerFlushInProgress.exchange(true, std::memory_order_relaxed)) { // Previous timer-initiated flush still in progress, skip this one. return; } m_WorkQueue.Enqueue([this] { Defer resetTimerFlag{[this] { m_TimerFlushInProgress.store(false, std::memory_order_relaxed); }}; Flush(true); }); }); m_FlushTimer->Start(); m_Exporter->Start(); m_CheckResultsSlot = Checkable::OnNewCheckResult.connect([this]( const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& ) { CheckResultHandler(checkable, cr); }); m_ActiveChangedSlot = OnActiveChanged.connect([this](const ConfigObject::Ptr& obj, const Value&) { auto checkable = dynamic_pointer_cast(obj); if (!checkable || checkable->IsActive()) { return; } m_WorkQueue.Enqueue([this, checkable] { m_Metrics.erase(checkable.get()); }); }); } void OTLPMetricsWriter::Pause() { m_CheckResultsSlot.disconnect(); m_ActiveChangedSlot.disconnect(); m_FlushTimer->Stop(true); std::promise promise; auto future = promise.get_future(); m_WorkQueue.Enqueue([this, &promise] { Flush(); promise.set_value(); }, PriorityLow); if (auto status = future.wait_for(std::chrono::seconds(GetDisconnectTimeout())); status != std::future_status::ready) { Log(LogWarning, "OTLPMetricsWriter") << "Disconnect timeout reached while flushing OTel metrics, discarding '" << m_DataPointsCount << "' data points ('" << m_RecordedBytes << "' bytes)."; } m_Exporter->Stop(); m_WorkQueue.Join(); m_Metrics.clear(); Log(LogInformation, "OTLPMetricsWriter") << "'" << GetName() << "' paused."; ObjectImpl::Pause(); } void OTLPMetricsWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) { if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata() || !cr->GetPerformanceData()) { return; } m_WorkQueue.Enqueue([this, checkable, cr] { if (m_Exporter->Stopped()) { return; } CONTEXT("Processing check result for '" << checkable->GetName() << "'."); auto startTime = cr->GetScheduleStart(); auto endTime = cr->GetExecutionEnd(); Array::Ptr perfdata = cr->GetPerformanceData(); ObjectLock olock(perfdata); for (const Value& val : perfdata) { PerfdataValue::Ptr pdv; if (val.IsObjectType()) { pdv = val; } else { try { pdv = PerfdataValue::Parse(val); } catch (const std::exception&) { Log(LogWarning, "OTLPMetricsWriter") << "Ignoring invalid perfdata for checkable '" << checkable->GetName() << "' and command '" << checkable->GetCheckCommand()->GetName() << "' with value: " << val; continue; } } OTel::AttrsMap attrs{{"perfdata_label", pdv->GetLabel()}}; if (auto unit = pdv->GetUnit(); !unit.IsEmpty()) { attrs.emplace("unit", std::move(unit)); } AddBytesAndFlushIfNeeded(Record(checkable, cr, l_PerfdataMetric, pdv->GetValue(), startTime, endTime, std::move(attrs))); if (GetEnableSendThresholds()) { std::array, 4> thresholds{{ {"critical", pdv->GetCrit()}, {"warning", pdv->GetWarn()}, {"min", pdv->GetMin()}, {"max", pdv->GetMax()}, }}; for (auto& [label, threshold] : thresholds) { if (!threshold.IsEmpty()) { attrs = { {"perfdata_label", pdv->GetLabel()}, {"threshold_type", std::move(label)}, }; AddBytesAndFlushIfNeeded( Record( checkable, cr, l_ThresholdMetric, Convert::ToDouble(threshold), startTime, endTime, std::move(attrs) ) ); } } } } }); } void OTLPMetricsWriter::Flush(bool fromTimer) { // If previous export is still in progress and this flush is requested from timer, skip it. // For manual flushes (e.g., due to reaching flush threshold), we want to block until // the previous export is done before returning to the caller (blocking is handled in OTel::Export()). if (fromTimer && m_Exporter->Exporting()) { return; } Log(LogDebug, "OTLPMetricsWriter") << "Flushing OTel metrics to OpenTelemetry backend" << (fromTimer ? " (timer expired)." : "."); auto request = std::make_unique(); for (auto& [checkable, resourceMetrics] : m_Metrics) { if (resourceMetrics) { request->mutable_resource_metrics()->AddAllocated(resourceMetrics.release()); } } if (request->resource_metrics_size() == 0) { Log(LogDebug, "OTLPMetricsWriter") << "Not flushing OTel metrics: No data points recorded."; return; } m_Exporter->Export(std::move(request)); m_RecordedBytes.store(0, std::memory_order_relaxed); m_DataPointsCount.store(0, std::memory_order_relaxed); } void OTLPMetricsWriter::AddBytesAndFlushIfNeeded(std::size_t newBytes) { auto existingBytes = m_RecordedBytes.fetch_add(newBytes, std::memory_order_relaxed); if (auto bytes{existingBytes + newBytes}; bytes >= static_cast(GetFlushThreshold())) { Log(LogDebug, "OTLPMetricsWriter") << "Flush threshold reached, flushing '" << bytes << "' bytes of OTel metrics."; Flush(); } } /** * Record a data point for the specified OTel metric associated with the given configuration object. * * This method records a data point of type T for the specified metric name associated with the * provided configuration object. If the metric does not exist for the object, it is created. * * @tparam T The type of the data point to record (e.g., int64_t, double). * * @param checkable The configuration object to associate the metric with. * @param cr The check result associated with the metric data point, used for macro resolution in attributes. * @param metric The OTel metric enum value indicating which metric stream to record the data point for. * @param value The data point value to record. * @param startTime The start time of the data point in seconds. * @param endTime The end time of the data point in seconds. * @param attrs The attributes associated with the data point. * * @return The number of bytes recorded for this data point, which contributes to the flush threshold. */ template std::size_t OTLPMetricsWriter::Record( const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, std::string_view metric, T value, double startTime, double endTime, OTel::AttrsMap attrs ) { std::size_t bytes = 0; auto& resourceMetrics = m_Metrics[checkable.get()]; if (!resourceMetrics) { using namespace std::string_view_literals; resourceMetrics = std::make_unique(); OTel::PopulateResourceAttrs(resourceMetrics); auto* resource = resourceMetrics->mutable_resource(); auto* attr = resource->add_attributes(); OTel::SetAttribute(*attr, "service.namespace"sv, GetServiceNamespace()); auto [host, service] = GetHostService(checkable); attr = resource->add_attributes(); OTel::SetAttribute(*attr, "icinga2.host.name"sv, host->GetName()); // Add entity reference (https://opentelemetry.io/docs/specs/otel/entities/data-model/). auto* entity = resource->add_entity_refs(); entity->mutable_id_keys()->Add("icinga2.host.name"); if (service) { entity->set_type("service"); entity->mutable_id_keys()->Add("icinga2.service.name"); attr = resource->add_attributes(); OTel::SetAttribute(*attr, "icinga2.service.name"sv, service->GetShortName()); } else { entity->set_type("host"); } attr = resource->add_attributes(); OTel::SetAttribute(*attr, "icinga2.command.name"sv, checkable->GetCheckCommand()->GetName()); if (Dictionary::Ptr tmpl = service ? GetServiceResourceAttributes() : GetHostResourceAttributes(); tmpl) { MacroProcessor::ResolverList resolvers{{"host", host}}; if (service) { resolvers.emplace_back("service", service); } ObjectLock olock(tmpl); for (const Dictionary::Pair& pair : tmpl) { String missingMacro; auto resolvedVal = MacroProcessor::ResolveMacros(pair.second, resolvers, cr, &missingMacro); if (missingMacro.IsEmpty()) { attr = resource->add_attributes(); try { OTel::SetAttribute(*attr, "icinga2.custom." + pair.first, resolvedVal); } catch (const std::exception& ex) { Log(LogWarning, "OTLPMetricsWriter") << "Ignoring invalid resource attribute '" << pair.first << "' for checkable '" << checkable->GetName() << "': " << ex.what(); // Remove the last attribute from the list which is the one we just attempted to set. resource->mutable_attributes()->RemoveLast(); } } } } bytes = resourceMetrics->ByteSizeLong(); } auto* sm = resourceMetrics->mutable_scope_metrics(0); auto* metrics = sm->mutable_metrics(); auto it = std::find_if(metrics->begin(), metrics->end(), [metric](const auto& m) { return m.name() == metric; }); OTel::Gauge* gaugePtr = nullptr; if (it == metrics->end()) { OTel::ValidateName(metric); auto* metricPtr = sm->add_metrics(); metricPtr->set_name(std::string(metric)); bytes += metricPtr->ByteSizeLong(); // Account for metric name size in bytes. gaugePtr = metricPtr->mutable_gauge(); } else { gaugePtr = it->mutable_gauge(); } bytes += OTel::Record(*gaugePtr, value, startTime, endTime, std::move(attrs)); m_DataPointsCount.fetch_add(1, std::memory_order_relaxed); return bytes; } void OTLPMetricsWriter::ValidatePort(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidatePort(lvalue, utils); if (auto p = lvalue(); p < 1 || p > 65535) { BOOST_THROW_EXCEPTION(ValidationError(this, {"port"}, "Port must be in the range 1-65535.")); } } void OTLPMetricsWriter::ValidateFlushInterval(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateFlushInterval(lvalue, utils); if (lvalue() < 1) { BOOST_THROW_EXCEPTION(ValidationError(this, {"flush_interval"}, "Flush interval must be at least 1 second.")); } } void OTLPMetricsWriter::ValidateFlushThreshold(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateFlushThreshold(lvalue, utils); if (lvalue() < 1) { BOOST_THROW_EXCEPTION(ValidationError(this, {"flush_threshold"}, "Flush threshold must be at least 1.")); } // Protobuf limits the size of messages to be serialiazed/deserialized to max 2GiB. Thus, we can't accept // a flush threshold that would exceed that limit with a reasonable safe margin of 10MiB for any other // overhead in the message not accounted for in @c m_RecordedBytes. // See https://protobuf.dev/programming-guides/proto-limits/#total. constexpr std::size_t maxMessageSize = 2ULL * 1024 * 1024 * 1024 - 10 * 1024 * 1024; if (static_cast(lvalue()) > maxMessageSize) { BOOST_THROW_EXCEPTION(ValidationError( this, {"flush_threshold"}, "Flush threshold too high, would exceed Protobuf message size limit of 2GiB (1.9GiB max allowed)." )); } } void OTLPMetricsWriter::ValidateHostResourceAttributes(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateHostResourceAttributes(lvalue, utils); if (const auto& tags{lvalue()}; tags) { ValidateResourceAttributes(tags, "host_resource_attributes"); } } void OTLPMetricsWriter::ValidateServiceResourceAttributes(const Lazy& lvalue, const ValidationUtils& utils) { ObjectImpl::ValidateServiceResourceAttributes(lvalue, utils); if (const auto& tags{lvalue()}; tags) { ValidateResourceAttributes(tags, "service_resource_attributes"); } } void OTLPMetricsWriter::ValidateResourceAttributes(const Dictionary::Ptr& tmpl, const String& attrName) { ObjectLock olock(tmpl); for (const auto& pair : tmpl) { if (!MacroProcessor::ValidateMacroString(pair.second)) { BOOST_THROW_EXCEPTION(ValidationError( this, {attrName, pair.first}, "Closing $ not found in macro format string '" + pair.second + "'." )); } } }