From 415140bc3671326dd817407dc4c16a4f972c2f2a Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 13 Jan 2026 14:04:01 +0100 Subject: [PATCH] Add common `OTel` type/lib --- CMakeLists.txt | 12 + icinga-app/CMakeLists.txt | 4 + lib/CMakeLists.txt | 4 + lib/otel/CMakeLists.txt | 43 +++ lib/otel/otel.cpp | 649 +++++++++++++++++++++++++++++++++++++ lib/otel/otel.hpp | 190 +++++++++++ lib/remote/httpmessage.cpp | 8 + lib/remote/httpmessage.hpp | 15 + test/CMakeLists.txt | 4 + 9 files changed, 929 insertions(+) create mode 100644 lib/otel/CMakeLists.txt create mode 100644 lib/otel/otel.cpp create mode 100644 lib/otel/otel.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5ecdba054..ed657d615 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,7 @@ option(ICINGA2_WITH_LIVESTATUS "Build the Livestatus module" ${ICINGA2_MASTER}) option(ICINGA2_WITH_NOTIFICATION "Build the notification module" ON) option(ICINGA2_WITH_PERFDATA "Build the perfdata module" ${ICINGA2_MASTER}) option(ICINGA2_WITH_ICINGADB "Build the IcingaDB module" ${ICINGA2_MASTER}) +option(ICINGA2_WITH_OPENTELEMETRY "Build the OpenTelemetry integration module" ${ICINGA2_MASTER}) option (USE_SYSTEMD "Configure icinga as native systemd service instead of a SysV initscript" OFF) @@ -207,6 +208,17 @@ set(HAVE_EDITLINE "${EDITLINE_FOUND}") find_package(Termcap) set(HAVE_TERMCAP "${TERMCAP_FOUND}") +if(ICINGA2_WITH_OPENTELEMETRY) + # Newer Protobuf versions provide a CMake config package that we should prefer, since it implicitly + # links against all its dependencies (like absl, etc.) that would otherwise need to be linked manually. + # Thus, first try to find Protobuf in config mode and only fall back to module mode if that fails. + find_package(Protobuf CONFIG) + if(NOT Protobuf_FOUND) + find_package(Protobuf REQUIRED) + endif() + list(APPEND base_DEPS protobuf::libprotobuf-lite) +endif() + include_directories( ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/lib ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_BINARY_DIR}/lib diff --git a/icinga-app/CMakeLists.txt b/icinga-app/CMakeLists.txt index a93589395..88ad8bfdc 100644 --- a/icinga-app/CMakeLists.txt +++ b/icinga-app/CMakeLists.txt @@ -50,6 +50,10 @@ if(ICINGA2_WITH_NOTIFICATION) list(APPEND icinga_app_SOURCES $) endif() +if(ICINGA2_WITH_OPENTELEMETRY) + list(APPEND icinga_app_SOURCES $) +endif() + if(ICINGA2_WITH_PERFDATA) list(APPEND icinga_app_SOURCES $) endif() diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 2eb3d1832..ebb0ce404 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -50,6 +50,10 @@ if(ICINGA2_WITH_NOTIFICATION) add_subdirectory(notification) endif() +if(ICINGA2_WITH_OPENTELEMETRY) + add_subdirectory(otel) +endif() + if(ICINGA2_WITH_PERFDATA) add_subdirectory(perfdata) endif() diff --git a/lib/otel/CMakeLists.txt b/lib/otel/CMakeLists.txt new file mode 100644 index 000000000..dca82a91d --- /dev/null +++ b/lib/otel/CMakeLists.txt @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: 2026 Icinga GmbH +# SPDX-License-Identifier: GPL-3.0-or-later + +set(ICINGA2_OPENTELEMETRY_PROTOS_DIR "${icinga2_SOURCE_DIR}/third-party/opentelemetry-proto") +protobuf_generate( + LANGUAGE cpp + # According to the Protobuf docs[^1], the Protobuf compiler generates with the "LITE_RUNTIME" option much + # smaller code than the default optimze_for=SPEED option, which includes code for reflection, descriptors, + # and other features not needed by any part of the Icinga 2 OpenTelemetry integration. Thus, we use the "lite" + # option to generate code that only depend on the libprotobuf-lite instead of the full libprotobuf library. + # + # The only downside of using the lite runtime is that we won't be able to use any debugging capabilities + # provided by the full Protobuf runtime (like the DebugString() method on messages for easy printing, + # which heavily relies on reflection). + # + # [^1]: https://protobuf.dev/programming-guides/proto3/#options + PLUGIN_OPTIONS lite + OUT_VAR otel_PROTO_SRCS + IMPORT_DIRS "${ICINGA2_OPENTELEMETRY_PROTOS_DIR}" + PROTOS + "${ICINGA2_OPENTELEMETRY_PROTOS_DIR}/opentelemetry/proto/collector/metrics/v1/metrics_service.proto" + "${ICINGA2_OPENTELEMETRY_PROTOS_DIR}/opentelemetry/proto/common/v1/common.proto" + "${ICINGA2_OPENTELEMETRY_PROTOS_DIR}/opentelemetry/proto/metrics/v1/metrics.proto" + "${ICINGA2_OPENTELEMETRY_PROTOS_DIR}/opentelemetry/proto/resource/v1/resource.proto" +) + +set(otel_SOURCES + otel.cpp otel.hpp + ${otel_PROTO_SRCS} +) + +add_library(otel OBJECT ${otel_SOURCES}) +add_dependencies(otel base remote) +target_include_directories(otel + SYSTEM PUBLIC + ${Protobuf_INCLUDE_DIRS} + ${CMAKE_CURRENT_BINARY_DIR} +) + +set_target_properties( + otel PROPERTIES + FOLDER Lib +) diff --git a/lib/otel/otel.cpp b/lib/otel/otel.cpp new file mode 100644 index 000000000..7d9c55765 --- /dev/null +++ b/lib/otel/otel.cpp @@ -0,0 +1,649 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "otel/otel.hpp" +#include "base/application.hpp" +#include "base/defer.hpp" +#include "base/tcpsocket.hpp" +#include "base/tlsutility.hpp" +#include +#include +#include +#include + +using namespace icinga; + +namespace http = boost::beast::http; +namespace v1_metrics = opentelemetry::proto::metrics::v1; + +// The max buffer size used to batch Protobuf writes to Asio streams. +static constexpr std::size_t l_BufferSize = 64UL * 1024; +// The OpenTelemetry schema convention URL used in the exported metrics. +// See https://opentelemetry.io/docs/specs/semconv/ +static constexpr std::string_view l_OTelSchemaConv = "https://opentelemetry.io/schemas/1.39.0"; + +template std::size_t OTel::Record(Gauge&, int64_t, double, double, AttrsMap); +template std::size_t OTel::Record(Gauge&, double, double, double, AttrsMap); +template void OTel::SetAttribute(Attribute&, std::string_view&&, String&&); +template void OTel::SetAttribute(Attribute&, String&&, Value&); + +/** + * Calculate the exponential backoff duration for retrying failed exports or reconnections. + * + * This method calculates the backoff duration based on the number of retry attempts using an exponential + * backoff strategy as per OTel specifications. The backoff duration starts at a minimum value and doubles + * with each attempt, up to a maximum cap (30s). This helps to avoid overwhelming the OpenTelemetry backend + * with rapid retry attempts in case of transient errors. + * + * @param attempt The current retry attempt number (starting from 1). + * + * @return The calculated backoff duration in milliseconds. + */ +static constexpr std::chrono::milliseconds Backoff(uint64_t attempt) +{ + using namespace std::chrono; + + constexpr milliseconds MaxBackoffMs = seconds(30); + constexpr milliseconds MinBackoffMs = milliseconds(100); + + // 2^attempt may overflow, so we cap it to a safe value within the 64-bit range, + // which is sufficient to reach MaxBackoffMs from MinBackoffMs. + constexpr uint64_t maxSafeAttempt = 16; // 2^16 * 100ms = 6553.6s > 30s + auto exponential = MinBackoffMs * (1ULL << std::min(attempt, maxSafeAttempt)); + if (exponential >= MaxBackoffMs) { + return MaxBackoffMs; + } + return duration_cast(exponential); +} + +OTel::OTel(OTelConnInfo& connInfo): OTel{connInfo, IoEngine::Get().GetIoContext()} +{ +} + +OTel::OTel(OTelConnInfo& connInfo, boost::asio::io_context& io) + : m_ConnInfo{std::move(connInfo)}, + m_Strand{io}, + m_Export{io}, + m_RetryExportAndConnTimer{io}, + m_Exporting{false}, + m_Stopped{false} +{ + if (m_ConnInfo.EnableTls) { + m_TlsContext = MakeAsioSslContext(m_ConnInfo.TlsCrt, m_ConnInfo.TlsKey, m_ConnInfo.TlsCaCrt); + } +} + +void OTel::Start() +{ + if (m_Stopped.exchange(false)) { + ResetExporting(true); + } + + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive = ConstPtr(this)](boost::asio::yield_context yc) { + ExportLoop(yc); + }); +} + +/** + * Stop the OTel exporter and disconnect from the OpenTelemetry backend. + * + * This method blocks until the exporter has fully stopped and disconnected from the backend. + * It cancels any ongoing export operations and clears all its internal state, so that it can be + * safely restarted later if needed. + */ +void OTel::Stop() +{ + if (m_Stopped.exchange(true)) { + return; + } + + std::promise promise; + IoEngine::SpawnCoroutine(m_Strand, [this, &promise, keepAlive = ConstPtr(this)](boost::asio::yield_context& yc) { + m_Export.Set(); + m_RetryExportAndConnTimer.cancel(); + + if (!m_Stream) { + promise.set_value(); + return; + } + + std::visit([this, &yc](auto& stream) { + { + Timeout writerTimeout(m_Strand, boost::posix_time::seconds(5), [&stream] { + boost::system::error_code ec; + stream->lowest_layer().cancel(ec); + }); + m_Export.WaitForClear(yc); + } + + using StreamType = std::decay_t; + if constexpr (std::is_same_v::Ptr>) { + stream->GracefulDisconnect(m_Strand, yc); + } else { + static_assert(std::is_same_v::Ptr>, "Unknown stream type"); + boost::system::error_code ec; + stream->lowest_layer().shutdown(AsioTcpStream::lowest_layer_type::shutdown_both, ec); + stream->lowest_layer().close(ec); + } + }, *m_Stream); + + Log(LogInformation, "OTelExporter") + << "Disconnected from OpenTelemetry backend."; + + m_Stream.reset(); + m_Request.reset(); + promise.set_value(); + }); + promise.get_future().wait(); +} + +/** + * Export the given OTel metrics request to the OpenTelemetry backend. + * + * This method initiates the export of the provided OTel metrics request to the configured + * OpenTelemetry backend. If an export is already in progress, it waits for the previous + * export to complete before proceeding with the new export request (blocking the caller). + * + * @param request The OTel metrics request to export. + */ +void OTel::Export(std::unique_ptr&& request) +{ + std::unique_lock lock(m_Mutex); + if (m_Exporting) { + Log(LogWarning, "OTelExporter") + << "Received export request while previous export is still in progress. Waiting for it to complete."; + + m_ExportCV.wait(lock, [this] { return m_Stopped || !m_Exporting; }); + if (m_Stopped) { + return; + } + } + m_Exporting = true; + lock.unlock(); + + // Access to m_Request is serialized via m_Strand, so we must post the actual export operation to it. + boost::asio::post(m_Strand, [this, keepAlive = ConstPtr(this), request = std::move(request)]() mutable { + m_Request = std::move(request); + m_Export.Set(); + }); +} + +/** + * Populate the standard OTel resource attributes in the given ResourceMetrics Protobuf object. + * + * This method populates the standard OTel resource attributes as per OTel specifications[^1][^2] + * into the provided ResourceMetrics Protobuf object. It sets attributes such as service name, + * instance ID, version, and telemetry SDK information. + * + * @param rm The ResourceMetrics Protobuf object to populate. + * + * [^1]: https://opentelemetry.io/docs/specs/semconv/resource/#telemetry-sdk + * [^2]: https://opentelemetry.io/docs/specs/semconv/resource/service/ + */ +void OTel::PopulateResourceAttrs(const std::unique_ptr& rm) +{ + using namespace std::string_view_literals; + + rm->set_schema_url(l_OTelSchemaConv.data()); + auto* resource = rm->mutable_resource(); + + auto* attr = resource->add_attributes(); + SetAttribute(*attr, "service.name"sv, "Icinga 2"sv); + + auto instanceID = Application::GetEnvironmentId(); + if (instanceID.IsEmpty()) { + instanceID = "unknown"; + } + attr = resource->add_attributes(); + SetAttribute(*attr, "service.instance.id"sv, std::move(instanceID)); + + attr = resource->add_attributes(); + SetAttribute(*attr, "service.version"sv, Application::GetAppVersion()); + + attr = resource->add_attributes(); + // We don't actually use OTel SDKs here, but to comply with OTel specs, we need to provide these attributes anyway. + SetAttribute(*attr, "telemetry.sdk.language"sv, "cpp"sv); + + attr = resource->add_attributes(); + SetAttribute(*attr, "telemetry.sdk.name"sv, "Icinga 2 OTel Integration"sv); + + attr = resource->add_attributes(); + SetAttribute(*attr, "telemetry.sdk.version"sv, Application::GetAppVersion()); + + auto* ism = rm->add_scope_metrics(); + ism->set_schema_url(l_OTelSchemaConv.data()); + ism->mutable_scope()->set_name("icinga2"); + ism->mutable_scope()->set_version(Application::GetAppVersion()); +} + +/** + * Establish a connection to the OpenTelemetry backend endpoint. + * + * In case of connection failures, it retries as per OTel spec[^1] with exponential backoff until a successful + * connection is established or the exporter is stopped. Therefore, @c m_Stream is not guaranteed to be valid + * after this method returns, so the caller must check it before using it. + * + * @param yc The Boost.Asio yield context for asynchronous operations. + * + * [^1]: https://opentelemetry.io/docs/specs/otlp/#otlphttp-connection + */ +void OTel::Connect(boost::asio::yield_context& yc) +{ + Log(LogInformation, "OTelExporter") + << "Connecting to OpenTelemetry backend on host '" << m_ConnInfo.Host << ":" << m_ConnInfo.Port << "'."; + + for (uint64_t attempt = 1; !m_Stopped; ++attempt) { + try { + boost::asio::ip::tcp::socket socket{m_Strand.context()}; + icinga::Connect(socket, m_ConnInfo.Host, std::to_string(m_ConnInfo.Port), yc); + + if (m_ConnInfo.EnableTls) { + auto tlsStream = Shared::Make(m_Strand.context(), *m_TlsContext, m_ConnInfo.Host); + tlsStream->lowest_layer() = std::move(socket); + tlsStream->next_layer().async_handshake(AsioTlsStream::next_layer_type::client, yc); + + if (m_ConnInfo.VerifyPeerCertificate && !tlsStream->next_layer().IsVerifyOK()) { + BOOST_THROW_EXCEPTION(std::runtime_error( + "TLS certificate validation failed: " + tlsStream->next_layer().GetVerifyError() + )); + } + m_Stream = std::move(tlsStream); + } else { + auto tcpStream = Shared::Make(m_Strand.context()); + tcpStream->lowest_layer() = std::move(socket); + m_Stream = std::move(tcpStream); + } + + Log(LogInformation, "OTelExporter") + << "Successfully connected to OpenTelemetry backend."; + return; + } catch (const std::exception& ex) { + Log(m_Stopped ? LogDebug : LogCritical, "OTelExporter") + << "Cannot connect to OpenTelemetry backend '" << m_ConnInfo.Host << ":" << m_ConnInfo.Port + << "' (attempt #" << attempt << "): " << ex.what(); + + if (!m_Stopped) { + boost::system::error_code ec; + m_RetryExportAndConnTimer.expires_after(Backoff(attempt)); + m_RetryExportAndConnTimer.async_wait(yc[ec]); + } + } + } +} + +/** + * Main export loop for exporting OTel metrics to the configured backend. + * + * This method runs in a loop, waiting for new metrics to be available for export. In case of export failures, + * it retries the export as per OTel spec[^1] with exponential backoff until the export succeeds or the exporter + * is stopped. After a successful export, it clears the exported metrics from @c m_Request to make room for new metrics. + * + * @param yc The Asio yield context for asynchronous operations. + * + * [^1]: https://opentelemetry.io/docs/specs/otlp/#retryable-response-codes + */ +void OTel::ExportLoop(boost::asio::yield_context& yc) +{ + Defer cleanup{[this] { + m_Export.Clear(); + ResetExporting(true /* notify all */); + }}; + + namespace ch = std::chrono; + + while (!m_Stopped) { + m_Export.WaitForSet(yc); + if (!m_Stream) { + Connect(yc); + } + + for (uint64_t attempt = 1; m_Stream && !m_Stopped; ++attempt) { + try { + ExportImpl(yc); + m_Request.reset(); + m_Export.Clear(); + ResetExporting(false /* notify one */); + break; + } catch (const RetryableExportError& ex) { + ch::milliseconds retryAfter; + if (auto throttle = ex.Throttle(); throttle > 0ms) { + retryAfter = throttle; + } else { + retryAfter = Backoff(attempt); + } + + Log(LogWarning, "OTelExporter") + << "Failed to export metrics to OpenTelemetry backend (attempt #" << attempt << "). Retrying in " + << retryAfter.count() << "ms."; + + boost::system::error_code ec; + m_RetryExportAndConnTimer.expires_after(retryAfter); + m_RetryExportAndConnTimer.async_wait(yc[ec]); + } catch (const std::exception& ex) { + LogSeverity severity = LogCritical; + const auto* ser{dynamic_cast(&ex)}; + // Since we don't have a proper connection health check mechanism, we assume that certain errors + // indicate a broken connection and force a reconnect in those cases. For the `end_of_stream` case, + // we downgrade the log severity to debug level since this is a normal occurrence when using an OTEL + // collector compatible backend that don't honor keep-alive connections (e.g., OpenSearch Data Prepper). + if (m_Stopped || (ser && ser->code() == http::error::end_of_stream)) { + severity = LogDebug; + } + Log{severity, "OTelExporter", DiagnosticInformation(ex, false)}; + m_Stream.reset(); // Force reconnect on next export attempt. + } + } + } +} + +void OTel::ExportImpl(boost::asio::yield_context& yc) const +{ + AsioProtobufOutStream outputS{*m_Stream, m_ConnInfo, yc}; + [[maybe_unused]] auto serialized = m_Request->SerializeToZeroCopyStream(&outputS); + ASSERT(serialized); + // Must have completed chunk writing successfully, otherwise reading the response will hang forever. + VERIFY(outputS.WriterDone()); + + IncomingHttpResponse responseMsg{*m_Stream}; + responseMsg.Parse(yc); + + if (auto ct = responseMsg[http::field::content_type]; ct != "application/x-protobuf") { + if (responseMsg.result() == http::status::ok) { + // Some OpenTelemetry Collector compatible backends (e.g., Prometheus OTLP Receiver) respond with 200 OK + // but without the expected Protobuf content type. So, don't do anything here since the request succeeded. + return; + } + Log(LogWarning, "OTelExporter") + << "Unexpected Content-Type from OpenTelemetry backend '" << ct << "' (" << responseMsg.reason() << "):\n" + << responseMsg.body(); + } else if (responseMsg.result_int() >= 200 && responseMsg.result_int() <= 299) { + // We've got a valid Protobuf response, so we've to deserialize the body to check for partial success. + // See https://opentelemetry.io/docs/specs/otlp/#partial-success-1. + google::protobuf::Arena arena; + auto* response = MetricsResponse::default_instance().New(&arena); + [[maybe_unused]] auto deserialized = response->ParseFromString(responseMsg.body()); + ASSERT(deserialized); + + if (response->has_partial_success()) { + const auto& ps = response->partial_success(); + const auto& msg = ps.error_message(); + if (ps.rejected_data_points() > 0 || !msg.empty()) { + Log(LogWarning, "OTelExporter") + << "OpenTelemetry backend reported partial success: " << (msg.empty() ? "" : msg) + << " (" << ps.rejected_data_points() << " metric data points rejected)."; + } + } + } else if (IsRetryableExportError(responseMsg.result())) { + uint64_t throttleSeconds = 0; + if (auto throttle = responseMsg[http::field::retry_after]; !throttle.empty()) { + try { + throttleSeconds = boost::lexical_cast(throttle); + } catch (const std::exception& ex) { + Log(LogWarning, "OTelExporter") + << "Failed to parse 'Retry-After' header from OpenTelemetry backend response: " << ex.what(); + } + } + BOOST_THROW_EXCEPTION(RetryableExportError{throttleSeconds}); + } else { + Log(LogWarning, "OTelExporter") + << "OpenTelemetry backend responded with non-success and non-retryable status code " + << responseMsg.result_int() << " (" << responseMsg.reason() << ").\n" << responseMsg.body(); + } +} + +/** + * Reset the exporting state and notify waiters. + * + * This method resets the internal exporting state to indicate that no export is currently + * in progress. It then notifies either one or all waiters waiting for the export to complete, + * based on the @c notifyAll parameter. + * + * @param notifyAll If true, notifies all waiters; otherwise, notifies only one waiter. + */ +void OTel::ResetExporting(bool notifyAll) +{ + { + std::lock_guard lock(m_Mutex); + m_Exporting = false; + } + if (notifyAll) { + m_ExportCV.notify_all(); + } else { + m_ExportCV.notify_one(); + } +} + +/** + * Validate the given OTel metric name according to OTel naming conventions[^1]. + * Here's the ABNF definition for reference: + * @verbatim + * instrument-name = ALPHA 0*254 ("_" / "." / "-" / "/" / ALPHA / DIGIT) + * ALPHA = %x41-5A / %x61-7A; A-Z / a-z + * DIGIT = %x30-39 ; 0-9 + * @endverbatim + * + * @param name The metric name to validate. + * + * @throws std::invalid_argument if the metric name is invalid. + * + * [^1]: https://opentelemetry.io/docs/specs/otel/metrics/api/#instrument-name-syntax + */ +void OTel::ValidateName(std::string_view name) +{ + if (name.empty() || name.size() > 255) { + BOOST_THROW_EXCEPTION(std::invalid_argument("OTel instrument name must be between 1 and 255 characters long.")); + } + + auto isAlpha = [](char c) { return ('A' <= c && c <= 'Z') || ('a' <= c && c <= 'z'); }; + auto isDigit = [](char c) { return '0' <= c && c <= '9'; }; + for (std::size_t i = 0; i < name.size(); ++i) { + auto c = name[i]; + if (i == 0 && !isAlpha(c)) { + BOOST_THROW_EXCEPTION(std::invalid_argument("OTel instrument name must start with an alphabetic character.")); + } + if (!isAlpha(c) && !isDigit(c) && c != '_' && c != '.' && c != '-' && c != '/') { + BOOST_THROW_EXCEPTION(std::invalid_argument( + "OTel instrument name contains invalid character '" + std::string(1, c) + "'." + )); + } + } +} + +/** + * Set the given OTel attribute key-value pair in the provided @c Attribute Protobuf object. + * + * This method sets the given key-value pair in the provided KeyValue Protobuf object according to + * OTel specifications[^1]. While the OTel specs[^2] allows a wider range of attr value types, we + * only support the most common/scalar types (Boolean, Number (double), and String) for simplicity. + * + * @param attr The OTel attribute Protobuf object to set the value for. + * @param key The attribute key to set. Must not be empty. + * @param value The Value object containing the value to set in the attribute. + * + * @throws std::invalid_argument if key is empty or if @c Value represents an unsupported attribute value type. + * + * [^1]: https://opentelemetry.io/docs/specs/otel/common/#attribute + * [^2]: https://opentelemetry.io/docs/specs/otel/common/#anyvalue + */ +template +void OTel::SetAttribute(Attribute& attr, Key&& key, AttrVal&& value) +{ + if (begin(key) == end(key)) { + BOOST_THROW_EXCEPTION(std::invalid_argument("OTel attribute key must not be empty.")); + } + + if constexpr (std::is_rvalue_reference_v && std::is_same_v, String>) { + attr.set_key(std::move(key.GetData())); + } else { + attr.set_key(std::string{std::forward(key)}); + } + + constexpr bool isRvalReference = std::is_rvalue_reference_v; + if constexpr (isRvalReference && std::is_same_v, String>) { + attr.mutable_value()->set_string_value(std::move(value.GetData())); + } else if constexpr (std::is_constructible_v) { + attr.mutable_value()->set_string_value(std::string{std::forward(value)}); + } else { + switch (value.GetType()) { + case ValueBoolean: + attr.mutable_value()->set_bool_value(value.template Get()); + break; + case ValueNumber: + attr.mutable_value()->set_double_value(value.template Get()); + break; + case ValueString: + if (isRvalReference) { + attr.mutable_value()->set_string_value(std::move(value.template Get().GetData())); + } else { + attr.mutable_value()->set_string_value(value.template Get().GetData()); + } + break; + default: + BOOST_THROW_EXCEPTION(std::invalid_argument( + "OTel attribute value must be of type Boolean, Number, or String, got '" + value.GetTypeName() + "'." + )); + } + } +} + +/** + * Record a data point in the given OTel Gauge metric stream with the provided value, timestamps, and attributes. + * + * This method adds a new data point to the provided Gauge Protobuf object with the given value, start and end + * timestamps, and a set of attributes. The value can be either an int64_t or a double, depending on the type + * of the Gauge. The timestamps are expected to be in seconds and will be converted to nanoseconds as required + * by OTel specifications. The attributes are provided as a map of key-value pairs and will be set in the data + * point according to OTel attribute specs. + * + * @tparam T The type of the data point value, which must be either int64_t or double. + * + * @param gauge The Gauge Protobuf object to record the data point in. + * @param data The value of the data point to record. + * @param start The start timestamp of the data point in seconds. + * @param end The end timestamp of the data point in seconds. + * @param attrs A map of attribute key-value pairs to set in the data point. + * + * @return The size in bytes of the recorded data point after serialization. + * + * @throws std::invalid_argument if any attribute key is empty or has an unsupported value type. + */ +template +std::size_t OTel::Record(Gauge& gauge, T data, double start, double end, AttrsMap attrs) +{ + namespace ch = std::chrono; + + auto* dataPoint = gauge.add_data_points(); + if constexpr (std::is_same_v) { + dataPoint->set_as_double(data); + } else { + dataPoint->set_as_int(data); + } + + dataPoint->set_start_time_unix_nano( + static_cast(ch::duration_cast(ch::duration(start)).count()) + ); + dataPoint->set_time_unix_nano( + static_cast(ch::duration_cast(ch::duration(end)).count()) + ); + + for (auto it{attrs.begin()}; it != attrs.end(); /* NOPE */) { + auto* attr = dataPoint->add_attributes(); + auto node = attrs.extract(it++); + SetAttribute(*attr, node.key(), node.mapped()); + } + return dataPoint->ByteSizeLong(); +} + +/** + * Determine if the given HTTP status code represents a retryable export error as per OTel specs[^1]. + * + * @param status The HTTP status code to check. + * + * @return true if the status code indicates a retryable error; false otherwise. + * + * [^1]: https://opentelemetry.io/docs/specs/otlp/#retryable-response-codes + */ +bool OTel::IsRetryableExportError(const http::status status) +{ + return status == http::status::too_many_requests + || status == http::status::bad_gateway + || status == http::status::service_unavailable + || status == http::status::gateway_timeout; +} + +AsioProtobufOutStream::AsioProtobufOutStream(const AsioTlsOrTcpStream& stream, const OTelConnInfo& connInfo, boost::asio::yield_context yc) + : m_Writer{stream}, m_YieldContext{std::move(yc)} +{ + m_Writer.method(http::verb::post); + m_Writer.target(connInfo.MetricsEndpoint); + m_Writer.set(http::field::host, connInfo.Host + ":" + std::to_string(connInfo.Port)); + m_Writer.set(http::field::content_type, "application/x-protobuf"); + if (!connInfo.BasicAuth.IsEmpty()) { + m_Writer.set(http::field::authorization, "Basic " + connInfo.BasicAuth); + } + m_Writer.StartStreaming(); +} + +bool AsioProtobufOutStream::Next(void** data, int* size) +{ + if (m_Buffered == l_BufferSize) { + Flush(); + } + // Prepare a new buffer segment that the Protobuf serializer can write into. + // The buffer size is fixed to l_BufferSize, and as seen above, we flush if the previous buffer + // segment was fully used (which is always the case on each Next call after the initial one), so + // we'll end up reusing the same memory region for each Next call because when we flush, we also + // consume the committed data, and that region becomes writable again. + auto buf = m_Writer.Prepare(l_BufferSize - m_Buffered); + *data = buf.data(); + *size = static_cast(l_BufferSize); + m_Buffered = l_BufferSize; + return true; +} + +void AsioProtobufOutStream::BackUp(int count) +{ + // Make sure we've not already finalized the HTTP body because BackUp + // is supposed to be called only after a preceding (final) Next call. + ASSERT(!m_Writer.Done()); + ASSERT(static_cast(count) <= m_Buffered); + ASSERT(m_Buffered == l_BufferSize); + // If the last prepared buffer segment was not fully used, we need to adjust the buffered size, + // so that we don't commit unused memory regions with the below Flush() call. If count is zero, + // this adjustment is a no-op, and indicates that the entire buffer was used and there won't be + // any subsequent Next calls anymore (i.e., the Protobuf serialization is complete). + m_Buffered -= count; + Flush(true); +} + +int64_t AsioProtobufOutStream::ByteCount() const +{ + return m_Pos + static_cast(m_Buffered); +} + +/** + * Flush any buffered data to the underlying Asio stream. + * + * If the `finish` parameter is set to true, it indicates that no more data will + * be buffered/generated, and the HTTP body will be finalized accordingly. + * + * @param finish Whether this is the final flush operation. + */ +void AsioProtobufOutStream::Flush(bool finish) +{ + ASSERT(m_Buffered > 0 || finish); + m_Writer.Commit(m_Buffered); + m_Writer.Flush(m_YieldContext, finish); + m_Pos += static_cast(m_Buffered); + m_Buffered = 0; +} + +/** + * Check if the underlying HTTP request writer has completed writing. + * + * @return true if the writer has finished writing; false otherwise. + */ +bool AsioProtobufOutStream::WriterDone() +{ + return m_Writer.Done(); +} diff --git a/lib/otel/otel.hpp b/lib/otel/otel.hpp new file mode 100644 index 000000000..44ef0df3e --- /dev/null +++ b/lib/otel/otel.hpp @@ -0,0 +1,190 @@ +// SPDX-FileCopyrightText: 2026 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#pragma once + +#include "base/io-engine.hpp" +#include "base/tlsstream.hpp" +#include "base/shared.hpp" +#include "base/shared-object.hpp" +#include "base/string.hpp" +#include "remote/httpmessage.hpp" +#include "otel/opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" +#include +#include +#include +#include +#include +#include +#include + +namespace icinga +{ + +/** + * Connection parameters for connecting to an OpenTelemetry collector endpoint. + * + * @ingroup otel + */ +struct OTelConnInfo +{ + bool EnableTls{false}; + bool VerifyPeerCertificate{true}; + int Port; + String Host; + String TlsCaCrt; + String TlsCrt; + String TlsKey; + String MetricsEndpoint; + String BasicAuth; // Base64-encoded "username:password" string for basic authentication. +}; + +/** + * OTel implements the OpenTelemetry Protocol (OTLP) exporter. + * + * This class manages the connection to an OpenTelemetry collector or compatible backend and + * handles exporting (currently only metrics) in OTLP Protobuf format over HTTP. It supports + * TLS connections, basic authentication, and implements retry logic for transient errors as + * per OTel specs. + * + * @ingroup otel + */ +class OTel : public SharedObject +{ +public: + DECLARE_PTR_TYPEDEFS(OTel); + + // Protobuf request and response types for exporting metrics. + using MetricsRequest = opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; + using MetricsResponse = opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse; + // Protobuf attribute type used for OTel resource and data point attributes. + using Attribute = opentelemetry::proto::common::v1::KeyValue; + // Protobuf Gauge type used for representing OTel Gauge metric streams. + using Gauge = opentelemetry::proto::metrics::v1::Gauge; + + /** + * Represents a collection of OTel attributes[^1] as key-value pairs. + * + * [^1]: https://opentelemetry.io/docs/specs/otel/common/#attribute + */ + using AttrsMap = std::map; + + explicit OTel(OTelConnInfo& connInfo); + + void Start(); + void Stop(); + void Export(std::unique_ptr&& request); + + bool Exporting() const + { + std::lock_guard lock(m_Mutex); + return m_Exporting; + } + + bool Stopped() const { return m_Stopped.load(); } + + static void PopulateResourceAttrs(const std::unique_ptr& rm); + static void ValidateName(std::string_view name); + template && ( + std::is_same_v, Value> || + std::is_constructible_v + ) + >> + static void SetAttribute(Attribute& attr, Key&& key, AttrVal&& value); + static bool IsRetryableExportError(boost::beast::http::status status); + + template, int64_t> || std::is_same_v, double>> + > + [[nodiscard]] static std::size_t Record(Gauge& gauge, T data, double start, double end, AttrsMap attrs); + +private: + OTel(OTelConnInfo& connInfo, boost::asio::io_context& io); + + void Connect(boost::asio::yield_context& yc); + void ExportLoop(boost::asio::yield_context& yc); + void ExportImpl(boost::asio::yield_context& yc) const; + + void ResetExporting(bool notifyAll = false); + + const OTelConnInfo m_ConnInfo; + std::optional m_Stream; + Shared::Ptr m_TlsContext; + boost::asio::io_context::strand m_Strand; + + AsioDualEvent m_Export; // Event to signal when a new export request is available. + // Timer for scheduling retries of failed exports and reconnection attempts. + boost::asio::steady_timer m_RetryExportAndConnTimer; + + // Mutex and condition variable for synchronizing concurrent export requests. + mutable std::mutex m_Mutex; + std::condition_variable m_ExportCV; + std::unique_ptr m_Request; // Current export request being processed (if any). + bool m_Exporting; // Whether an export operation is in progress. + std::atomic_bool m_Stopped; // Whether someone has requested to stop the exporter. +}; +extern template std::size_t OTel::Record(Gauge&, int64_t, double, double, AttrsMap); +extern template std::size_t OTel::Record(Gauge&, double, double, double, AttrsMap); +extern template void OTel::SetAttribute(Attribute&, std::string_view&&, String&&); +extern template void OTel::SetAttribute(Attribute&, String&&, Value&); + +/** + * A zero-copy output stream that writes directly to an Asio [TLS] stream. + * + * This class implements the @c google::protobuf::io::ZeroCopyOutputStream interface, allowing Protobuf + * serializers to write data directly to an Asio [TLS] stream without unnecessary copying of data. It + * doesn't buffer data internally, but instead writes it in chunks to the underlying stream using an HTTP + * request writer (@c HttpRequestWriter) in a Protobuf binary format. It is not safe to be reused across + * multiple export calls. + * + * @ingroup otel + */ +class AsioProtobufOutStream final : public google::protobuf::io::ZeroCopyOutputStream +{ +public: + AsioProtobufOutStream(const AsioTlsOrTcpStream& stream, const OTelConnInfo& connInfo, boost::asio::yield_context yc); + + bool Next(void** data, int* size) override; + void BackUp(int count) override; + int64_t ByteCount() const override; + + bool WriterDone(); + +private: + void Flush(bool finish = false); + + int64_t m_Pos{0}; // Monotonically increasing byte position in the stream (excluding m_Buffered bytes). + std::size_t m_Buffered{0}; // Number of uncommitted bytes currently buffered. + OutgoingHttpRequest m_Writer; + boost::asio::yield_context m_YieldContext; // Yield context for async operations. +}; + +/** + * Exception class representing a retryable export error. + * + * This exception is thrown when an export attempt to an OpenTelemetry collector fails + * with a retryable error status. It carries an optional HTTP throttle[^1] duration indicating + * how long to wait before retrying the export. + * + * [^1]: https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + * + * @ingroup otel + */ +struct RetryableExportError : std::exception +{ + explicit RetryableExportError(uint64_t throttle): m_Throttle{throttle} + { + } + + [[nodiscard]] std::chrono::seconds Throttle() const { return m_Throttle; } + const char* what() const noexcept override + { + return "OTel::RetryableExportError()"; + } + +private: + std::chrono::seconds m_Throttle; +}; + +} // namespace icinga diff --git a/lib/remote/httpmessage.cpp b/lib/remote/httpmessage.cpp index 7641e75ab..221c8945e 100644 --- a/lib/remote/httpmessage.cpp +++ b/lib/remote/httpmessage.cpp @@ -93,6 +93,14 @@ void IncomingHttpMessage::ParseBody( Base::body() = std::move(m_Parser.release().body()); } +template +void IncomingHttpMessage::Parse(boost::asio::yield_context& yc) +{ + boost::beast::flat_buffer buf; + ParseHeader(buf, yc); + ParseBody(buf, yc); +} + HttpApiRequest::HttpApiRequest(Shared::Ptr stream) : IncomingHttpMessage(std::move(stream)) { } diff --git a/lib/remote/httpmessage.hpp b/lib/remote/httpmessage.hpp index 30f11442e..80e61de39 100644 --- a/lib/remote/httpmessage.hpp +++ b/lib/remote/httpmessage.hpp @@ -178,6 +178,14 @@ public: */ void ParseBody(boost::beast::flat_buffer& buf, boost::asio::yield_context yc); + /** + * Parse the entire message (header and body) using the internal parser object. + * + * This is just a convenience wrapper around @c ParseHeader() and @c ParseBody() that consecutively calls + * both of them. It can be used when you don't need to do anything with the header before parsing the body. + */ + void Parse(boost::asio::yield_context& yc); + ParserType& Parser() { return m_Parser; } private: @@ -251,6 +259,13 @@ public: [[nodiscard]] bool HasSerializationStarted() const { return m_SerializationStarted; } + /** + * Check if the message has been fully serialized. + * + * @return true if the message is fully serialized; false otherwise. + */ + [[nodiscard]] bool Done() { return m_Serializer.is_done(); } + /** * Sends the contents of a file. * diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d1f0906cb..c20e0aaf5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -68,6 +68,10 @@ if(ICINGA2_WITH_NOTIFICATION) list(APPEND types_test_SOURCES $) endif() +if(ICINGA2_WITH_OPENTELEMETRY) + list(APPEND types_test_SOURCES $) +endif() + if(ICINGA2_WITH_PERFDATA) list(APPEND types_test_SOURCES $) endif()