// SPDX-FileCopyrightText: 2026 Icinga GmbH // SPDX-License-Identifier: GPL-3.0-or-later #include "otel/otel.hpp" #include "base/application.hpp" #include "base/defer.hpp" #include "base/tcpsocket.hpp" #include "base/tlsutility.hpp" #include #include #include #include using namespace icinga; namespace http = boost::beast::http; namespace v1_metrics = opentelemetry::proto::metrics::v1; // The max buffer size used to batch Protobuf writes to Asio streams. static constexpr std::size_t l_BufferSize = 64UL * 1024; // The OpenTelemetry schema convention URL used in the exported metrics. // See https://opentelemetry.io/docs/specs/semconv/ static constexpr std::string_view l_OTelSchemaConv = "https://opentelemetry.io/schemas/1.39.0"; template std::size_t OTel::Record(Gauge&, int64_t, double, double, AttrsMap); template std::size_t OTel::Record(Gauge&, double, double, double, AttrsMap); template void OTel::SetAttribute(Attribute&, std::string_view&&, String&&); template void OTel::SetAttribute(Attribute&, String&&, Value&); /** * Calculate the exponential backoff duration for retrying failed exports or reconnections. * * This method calculates the backoff duration based on the number of retry attempts using an exponential * backoff strategy as per OTel specifications. The backoff duration starts at a minimum value and doubles * with each attempt, up to a maximum cap (30s). This helps to avoid overwhelming the OpenTelemetry backend * with rapid retry attempts in case of transient errors. * * @param attempt The current retry attempt number (starting from 1). * * @return The calculated backoff duration in milliseconds. */ static constexpr std::chrono::milliseconds Backoff(uint64_t attempt) { using namespace std::chrono; constexpr milliseconds MaxBackoffMs = seconds(30); constexpr milliseconds MinBackoffMs = milliseconds(100); // 2^attempt may overflow, so we cap it to a safe value within the 64-bit range, // which is sufficient to reach MaxBackoffMs from MinBackoffMs. constexpr uint64_t maxSafeAttempt = 16; // 2^16 * 100ms = 6553.6s > 30s auto exponential = MinBackoffMs * (1ULL << std::min(attempt, maxSafeAttempt)); if (exponential >= MaxBackoffMs) { return MaxBackoffMs; } return duration_cast(exponential); } OTel::OTel(OTelConnInfo& connInfo): OTel{connInfo, IoEngine::Get().GetIoContext()} { } OTel::OTel(OTelConnInfo& connInfo, boost::asio::io_context& io) : m_ConnInfo{std::move(connInfo)}, m_Strand{io}, m_ExportAsioCV{io}, m_RetryExportAndConnTimer{io}, m_Exporting{false}, m_Stopped{false} { if (m_ConnInfo.EnableTls) { m_TlsContext = MakeAsioSslContext(m_ConnInfo.TlsCrt, m_ConnInfo.TlsKey, m_ConnInfo.TlsCaCrt); } } void OTel::Start() { if (m_Stopped.exchange(false)) { ResetExporting(true); } IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive = ConstPtr(this)](boost::asio::yield_context yc) { ExportLoop(yc); }); } /** * Stop the OTel exporter and disconnect from the OpenTelemetry backend. * * This method blocks until the exporter has fully stopped and disconnected from the backend. * It cancels any ongoing export operations and clears all its internal state, so that it can be * safely restarted later if needed. */ void OTel::Stop() { if (m_Stopped.exchange(true)) { return; } std::promise promise; IoEngine::SpawnCoroutine(m_Strand, [this, &promise, keepAlive = ConstPtr(this)](boost::asio::yield_context& yc) { m_ExportAsioCV.NotifyAll(); // Wake up the export loop if it's waiting for new export requests. m_RetryExportAndConnTimer.cancel(); if (!m_Stream) { promise.set_value(); return; } // We only wait for ongoing export operations to complete if we're currently exporting, // otherwise there will be nothing that would wake us up from the `WaitForClear` sleep // below, and we would end up blocking indefinitely, so we have to check the exporting // state here first. if (Exporting()) { Timeout writerTimeout(m_Strand, boost::posix_time::seconds(5), [this] { boost::system::error_code ec; std::visit([&ec](auto& stream) { stream->lowest_layer().cancel(ec); }, *m_Stream); }); while (m_Request) { m_ExportAsioCV.Wait(yc); } } // Check if the stream is still valid before attempting to disconnect, since the above lowest_layer.cancel() // may have caused the export loop to detect a broken connection and reset the stream already. if (m_Stream) { if (auto* tlsStreamPtr = std::get_if::Ptr>(&*m_Stream); tlsStreamPtr) { (*tlsStreamPtr)->GracefulDisconnect(m_Strand, yc); } else if (auto* tcpStreamPtr = std::get_if::Ptr>(&*m_Stream); tcpStreamPtr) { boost::system::error_code ec; (*tcpStreamPtr)->lowest_layer().shutdown(AsioTcpStream::lowest_layer_type::shutdown_both, ec); (*tcpStreamPtr)->lowest_layer().close(ec); } } Log(LogInformation, "OTelExporter") << "Disconnected from OpenTelemetry backend."; m_Stream.reset(); promise.set_value(); }); promise.get_future().wait(); } /** * Export the given OTel metrics request to the OpenTelemetry backend. * * This method initiates the export of the provided OTel metrics request to the configured * OpenTelemetry backend. If an export is already in progress, it waits for the previous * export to complete before proceeding with the new export request (blocking the caller). * * @param request The OTel metrics request to export. */ void OTel::Export(std::unique_ptr&& request) { std::unique_lock lock(m_Mutex); if (m_Exporting) { Log(LogWarning, "OTelExporter") << "Received export request while previous export is still in progress. Waiting for it to complete."; m_ExportCV.wait(lock, [this] { return m_Stopped || !m_Exporting; }); if (m_Stopped) { return; } } m_Exporting = true; lock.unlock(); // Access to m_Request is serialized via m_Strand, so we must post the actual export operation to it. boost::asio::post(m_Strand, [this, keepAlive = ConstPtr(this), request = std::move(request)]() mutable { m_Request = std::move(request); m_ExportAsioCV.NotifyAll(); }); } /** * Populate the standard OTel resource attributes in the given ResourceMetrics Protobuf object. * * This method populates the standard OTel resource attributes as per OTel specifications[^1][^2] * into the provided ResourceMetrics Protobuf object. It sets attributes such as service name, * instance ID, version, and telemetry SDK information. * * @param rm The ResourceMetrics Protobuf object to populate. * * [^1]: https://opentelemetry.io/docs/specs/semconv/resource/#telemetry-sdk * [^2]: https://opentelemetry.io/docs/specs/semconv/resource/service/ */ void OTel::PopulateResourceAttrs(const std::unique_ptr& rm) { using namespace std::string_view_literals; rm->set_schema_url(l_OTelSchemaConv.data()); auto* resource = rm->mutable_resource(); auto* attr = resource->add_attributes(); SetAttribute(*attr, "service.name"sv, "Icinga 2"sv); auto instanceID = Application::GetEnvironmentId(); if (instanceID.IsEmpty()) { instanceID = "unknown"; } attr = resource->add_attributes(); SetAttribute(*attr, "service.instance.id"sv, std::move(instanceID)); attr = resource->add_attributes(); SetAttribute(*attr, "service.version"sv, Application::GetAppVersion()); attr = resource->add_attributes(); // We don't actually use OTel SDKs here, but to comply with OTel specs, we need to provide these attributes anyway. SetAttribute(*attr, "telemetry.sdk.language"sv, "cpp"sv); attr = resource->add_attributes(); SetAttribute(*attr, "telemetry.sdk.name"sv, "Icinga 2 OTel Integration"sv); attr = resource->add_attributes(); SetAttribute(*attr, "telemetry.sdk.version"sv, Application::GetAppVersion()); auto* ism = rm->add_scope_metrics(); ism->set_schema_url(l_OTelSchemaConv.data()); ism->mutable_scope()->set_name("icinga2"); ism->mutable_scope()->set_version(Application::GetAppVersion()); } /** * Establish a connection to the OpenTelemetry backend endpoint. * * In case of connection failures, it retries as per OTel spec[^1] with exponential backoff until a successful * connection is established or the exporter is stopped. Therefore, @c m_Stream is not guaranteed to be valid * after this method returns, so the caller must check it before using it. * * @param yc The Boost.Asio yield context for asynchronous operations. * * [^1]: https://opentelemetry.io/docs/specs/otlp/#otlphttp-connection */ void OTel::Connect(boost::asio::yield_context& yc) { Log(LogInformation, "OTelExporter") << "Connecting to OpenTelemetry backend on host '" << m_ConnInfo.Host << ":" << m_ConnInfo.Port << "'."; for (uint64_t attempt = 1; !m_Stopped; ++attempt) { try { decltype(m_Stream) stream; if (m_ConnInfo.EnableTls) { stream = Shared::Make(m_Strand.context(), *m_TlsContext, m_ConnInfo.Host); } else { stream = Shared::Make(m_Strand.context()); } Timeout timeout{m_Strand, boost::posix_time::seconds(10), [this, stream] { Log(LogCritical, "OTelExporter") << "Timeout while connecting to OpenTelemetry backend '" << m_ConnInfo.Host << ":" << m_ConnInfo.Port << "', cancelling attempt."; boost::system::error_code ec; std::visit([&ec](auto& s) { s->lowest_layer().cancel(ec); }, *stream); }}; std::visit([this, &yc](auto& streamArg) { icinga::Connect(streamArg->lowest_layer(), m_ConnInfo.Host, std::to_string(m_ConnInfo.Port), yc); if constexpr (std::is_same_v, Shared::Ptr>) { streamArg->next_layer().async_handshake(AsioTlsStream::next_layer_type::client, yc); if (m_ConnInfo.VerifyPeerCertificate && !streamArg->next_layer().IsVerifyOK()) { BOOST_THROW_EXCEPTION(std::runtime_error( "TLS certificate validation failed: " + streamArg->next_layer().GetVerifyError() )); } } }, *stream); m_Stream = std::move(stream); Log(LogInformation, "OTelExporter") << "Successfully connected to OpenTelemetry backend."; return; } catch (const std::exception& ex) { Log(m_Stopped ? LogDebug : LogCritical, "OTelExporter") << "Cannot connect to OpenTelemetry backend '" << m_ConnInfo.Host << ":" << m_ConnInfo.Port << "' (attempt #" << attempt << "): " << ex.what(); if (!m_Stopped) { boost::system::error_code ec; m_RetryExportAndConnTimer.expires_after(Backoff(attempt)); m_RetryExportAndConnTimer.async_wait(yc[ec]); } } } } /** * Main export loop for exporting OTel metrics to the configured backend. * * This method runs in a loop, waiting for new metrics to be available for export. In case of export failures, * it retries the export as per OTel spec[^1] with exponential backoff until the export succeeds or the exporter * is stopped. After a successful export, it clears the exported metrics from @c m_Request to make room for new metrics. * * @param yc The Asio yield context for asynchronous operations. * * [^1]: https://opentelemetry.io/docs/specs/otlp/#retryable-response-codes */ void OTel::ExportLoop(boost::asio::yield_context& yc) { Defer cleanup{[this] { m_Request.reset(); m_ExportAsioCV.NotifyAll(); ResetExporting(true /* notify all */); }}; namespace ch = std::chrono; while (true) { // Wait for a new export request to be available. If the exporter is stopped while waiting, // we will be notified without a new request, so we also check the stopped state here to // avoid waiting indefinitely in that case. while (!m_Request && !m_Stopped) { m_ExportAsioCV.Wait(yc); } if (m_Stopped) { break; } if (!m_Stream) { Connect(yc); } for (uint64_t attempt = 1; m_Stream && !m_Stopped; ++attempt) { try { ExportImpl(yc); m_Request.reset(); m_ExportAsioCV.NotifyAll(); ResetExporting(false /* notify one */); break; } catch (const RetryableExportError& ex) { ch::milliseconds retryAfter; if (auto throttle = ex.Throttle(); throttle > 0ms) { retryAfter = throttle; } else { retryAfter = Backoff(attempt); } Log(LogWarning, "OTelExporter") << "Failed to export metrics to OpenTelemetry backend (attempt #" << attempt << "). Retrying in " << retryAfter.count() << "ms."; boost::system::error_code ec; m_RetryExportAndConnTimer.expires_after(retryAfter); m_RetryExportAndConnTimer.async_wait(yc[ec]); } catch (const std::exception& ex) { LogSeverity severity = LogCritical; const auto* ser{dynamic_cast(&ex)}; // Since we don't have a proper connection health check mechanism, we assume that certain errors // indicate a broken connection and force a reconnect in those cases. For the `end_of_stream` case, // we downgrade the log severity to debug level since this is a normal occurrence when using an OTEL // collector compatible backend that don't honor keep-alive connections (e.g., OpenSearch Data Prepper). if (m_Stopped || (ser && ser->code() == http::error::end_of_stream)) { severity = LogDebug; } Log{severity, "OTelExporter", DiagnosticInformation(ex, false)}; m_Stream.reset(); // Force reconnect on next export attempt. } } } } void OTel::ExportImpl(boost::asio::yield_context& yc) const { AsioProtobufOutStream outputS{*m_Stream, m_ConnInfo, yc}; [[maybe_unused]] auto serialized = m_Request->SerializeToZeroCopyStream(&outputS); ASSERT(serialized); // Must have completed chunk writing successfully, otherwise reading the response will hang forever. if (!outputS.WriterDone()) { BOOST_THROW_EXCEPTION(std::runtime_error("BUG: Protobuf output stream writer did not complete successfully.")); } IncomingHttpResponse responseMsg{*m_Stream}; responseMsg.Parse(yc); if (auto ct = responseMsg[http::field::content_type]; ct != "application/x-protobuf") { if (responseMsg.result() == http::status::ok) { // Some OpenTelemetry Collector compatible backends (e.g., Prometheus OTLP Receiver) respond with 200 OK // but without the expected Protobuf content type. So, don't do anything here since the request succeeded. return; } Log(LogWarning, "OTelExporter") << "Unexpected Content-Type from OpenTelemetry backend '" << ct << "' (" << responseMsg.reason() << "):\n" << responseMsg.body(); } else if (responseMsg.result_int() >= 200 && responseMsg.result_int() <= 299) { // We've got a valid Protobuf response, so we've to deserialize the body to check for partial success. // See https://opentelemetry.io/docs/specs/otlp/#partial-success-1. google::protobuf::Arena arena; auto* response = MetricsResponse::default_instance().New(&arena); [[maybe_unused]] auto deserialized = response->ParseFromString(responseMsg.body()); ASSERT(deserialized); if (response->has_partial_success()) { const auto& ps = response->partial_success(); const auto& msg = ps.error_message(); if (ps.rejected_data_points() > 0 || !msg.empty()) { Log(LogWarning, "OTelExporter") << "OpenTelemetry backend reported partial success: " << (msg.empty() ? "" : msg) << " (" << ps.rejected_data_points() << " metric data points rejected)."; } } } else if (IsRetryableExportError(responseMsg.result())) { uint64_t throttleSeconds = 0; if (auto throttle = responseMsg[http::field::retry_after]; !throttle.empty()) { try { throttleSeconds = boost::lexical_cast(throttle); } catch (const std::exception& ex) { Log(LogWarning, "OTelExporter") << "Failed to parse 'Retry-After' header from OpenTelemetry backend response: " << ex.what(); } } BOOST_THROW_EXCEPTION(RetryableExportError{throttleSeconds}); } else { Log(LogWarning, "OTelExporter") << "OpenTelemetry backend responded with non-success and non-retryable status code " << responseMsg.result_int() << " (" << responseMsg.reason() << ").\n" << responseMsg.body(); } } /** * Reset the exporting state and notify waiters. * * This method resets the internal exporting state to indicate that no export is currently * in progress. It then notifies either one or all waiters waiting for the export to complete, * based on the @c notifyAll parameter. * * @param notifyAll If true, notifies all waiters; otherwise, notifies only one waiter. */ void OTel::ResetExporting(bool notifyAll) { { std::lock_guard lock(m_Mutex); m_Exporting = false; } if (notifyAll) { m_ExportCV.notify_all(); } else { m_ExportCV.notify_one(); } } /** * Validate the given OTel metric name according to OTel naming conventions[^1]. * Here's the ABNF definition for reference: * @verbatim * instrument-name = ALPHA 0*254 ("_" / "." / "-" / "/" / ALPHA / DIGIT) * ALPHA = %x41-5A / %x61-7A; A-Z / a-z * DIGIT = %x30-39 ; 0-9 * @endverbatim * * @param name The metric name to validate. * * @throws std::invalid_argument if the metric name is invalid. * * [^1]: https://opentelemetry.io/docs/specs/otel/metrics/api/#instrument-name-syntax */ void OTel::ValidateName(std::string_view name) { if (name.empty() || name.size() > 255) { BOOST_THROW_EXCEPTION(std::invalid_argument("OTel instrument name must be between 1 and 255 characters long.")); } auto isAlpha = [](char c) { return ('A' <= c && c <= 'Z') || ('a' <= c && c <= 'z'); }; auto isDigit = [](char c) { return '0' <= c && c <= '9'; }; for (std::size_t i = 0; i < name.size(); ++i) { auto c = name[i]; if (i == 0 && !isAlpha(c)) { BOOST_THROW_EXCEPTION(std::invalid_argument("OTel instrument name must start with an alphabetic character.")); } if (!isAlpha(c) && !isDigit(c) && c != '_' && c != '.' && c != '-' && c != '/') { BOOST_THROW_EXCEPTION(std::invalid_argument( "OTel instrument name contains invalid character '" + std::string(1, c) + "'." )); } } } /** * Set the given OTel attribute key-value pair in the provided @c Attribute Protobuf object. * * This method sets the given key-value pair in the provided KeyValue Protobuf object according to * OTel specifications[^1]. While the OTel specs[^2] allows a wider range of attr value types, we * only support the most common/scalar types (Boolean, Number (double), and String) for simplicity. * * @param attr The OTel attribute Protobuf object to set the value for. * @param key The attribute key to set. Must not be empty. * @param value The Value object containing the value to set in the attribute. * * @throws std::invalid_argument if key is empty or if @c Value represents an unsupported attribute value type. * * [^1]: https://opentelemetry.io/docs/specs/otel/common/#attribute * [^2]: https://opentelemetry.io/docs/specs/otel/common/#anyvalue */ template void OTel::SetAttribute(Attribute& attr, Key&& key, AttrVal&& value) { if (begin(key) == end(key)) { BOOST_THROW_EXCEPTION(std::invalid_argument("OTel attribute key must not be empty.")); } if constexpr (std::is_rvalue_reference_v && std::is_same_v, String>) { attr.set_key(std::move(key.GetData())); } else { attr.set_key(std::string{std::forward(key)}); } constexpr bool isRvalReference = std::is_rvalue_reference_v; if constexpr (isRvalReference && std::is_same_v, String>) { attr.mutable_value()->set_string_value(std::move(value.GetData())); } else if constexpr (std::is_constructible_v) { attr.mutable_value()->set_string_value(std::string{std::forward(value)}); } else { switch (value.GetType()) { case ValueBoolean: attr.mutable_value()->set_bool_value(value.template Get()); break; case ValueNumber: attr.mutable_value()->set_double_value(value.template Get()); break; case ValueString: if (isRvalReference) { attr.mutable_value()->set_string_value(std::move(value.template Get().GetData())); } else { attr.mutable_value()->set_string_value(value.template Get().GetData()); } break; default: BOOST_THROW_EXCEPTION(std::invalid_argument( "OTel attribute value must be of type Boolean, Number, or String, got '" + value.GetTypeName() + "'." )); } } } /** * Record a data point in the given OTel Gauge metric stream with the provided value, timestamps, and attributes. * * This method adds a new data point to the provided Gauge Protobuf object with the given value, start and end * timestamps, and a set of attributes. The value can be either an int64_t or a double, depending on the type * of the Gauge. The timestamps are expected to be in seconds and will be converted to nanoseconds as required * by OTel specifications. The attributes are provided as a map of key-value pairs and will be set in the data * point according to OTel attribute specs. * * @tparam T The type of the data point value, which must be either int64_t or double. * * @param gauge The Gauge Protobuf object to record the data point in. * @param data The value of the data point to record. * @param start The start timestamp of the data point in seconds. * @param end The end timestamp of the data point in seconds. * @param attrs A map of attribute key-value pairs to set in the data point. * * @return The size in bytes of the recorded data point after serialization. * * @throws std::invalid_argument if any attribute key is empty or has an unsupported value type. */ template std::size_t OTel::Record(Gauge& gauge, T data, double start, double end, AttrsMap attrs) { namespace ch = std::chrono; auto* dataPoint = gauge.add_data_points(); if constexpr (std::is_same_v) { dataPoint->set_as_double(data); } else { dataPoint->set_as_int(data); } dataPoint->set_start_time_unix_nano( static_cast(ch::duration_cast(ch::duration(start)).count()) ); dataPoint->set_time_unix_nano( static_cast(ch::duration_cast(ch::duration(end)).count()) ); while (!attrs.empty()) { auto* attr = dataPoint->add_attributes(); auto node = attrs.extract(attrs.begin()); SetAttribute(*attr, std::move(node.key()), std::move(node.mapped())); } return dataPoint->ByteSizeLong(); } /** * Determine if the given HTTP status code represents a retryable export error as per OTel specs[^1]. * * @param status The HTTP status code to check. * * @return true if the status code indicates a retryable error; false otherwise. * * [^1]: https://opentelemetry.io/docs/specs/otlp/#retryable-response-codes */ bool OTel::IsRetryableExportError(const http::status status) { return status == http::status::too_many_requests || status == http::status::bad_gateway || status == http::status::service_unavailable || status == http::status::gateway_timeout; } AsioProtobufOutStream::AsioProtobufOutStream(const AsioTlsOrTcpStream& stream, const OTelConnInfo& connInfo, boost::asio::yield_context yc) : m_Writer{stream}, m_YieldContext{std::move(yc)} { m_Writer.method(http::verb::post); m_Writer.target(connInfo.MetricsEndpoint); m_Writer.set(http::field::host, connInfo.Host + ":" + std::to_string(connInfo.Port)); m_Writer.set(http::field::content_type, "application/x-protobuf"); if (!connInfo.BasicAuth.IsEmpty()) { m_Writer.set(http::field::authorization, "Basic " + connInfo.BasicAuth); } m_Writer.StartStreaming(); } bool AsioProtobufOutStream::Next(void** data, int* size) { if (m_Buffered == l_BufferSize) { Flush(); } // Prepare a new buffer segment that the Protobuf serializer can write into. // The buffer size is fixed to l_BufferSize, and as seen above, we flush if the previous buffer // segment was fully used (which is always the case on each Next call after the initial one), so // we'll end up reusing the same memory region for each Next call because when we flush, we also // consume the committed data, and that region becomes writable again. auto buf = m_Writer.Prepare(l_BufferSize - m_Buffered); *data = buf.data(); *size = static_cast(l_BufferSize); m_Buffered = l_BufferSize; return true; } void AsioProtobufOutStream::BackUp(int count) { // Make sure we've not already finalized the HTTP body because BackUp // is supposed to be called only after a preceding (final) Next call. ASSERT(!m_Writer.Done()); ASSERT(static_cast(count) <= m_Buffered); ASSERT(m_Buffered == l_BufferSize); // If the last prepared buffer segment was not fully used, we need to adjust the buffered size, // so that we don't commit unused memory regions with the below Flush() call. If count is zero, // this adjustment is a no-op, and indicates that the entire buffer was used and there won't be // any subsequent Next calls anymore (i.e., the Protobuf serialization is complete). m_Buffered -= count; Flush(true); } int64_t AsioProtobufOutStream::ByteCount() const { return m_Pos + static_cast(m_Buffered); } /** * Flush any buffered data to the underlying Asio stream. * * If the `finish` parameter is set to true, it indicates that no more data will * be buffered/generated, and the HTTP body will be finalized accordingly. * * @param finish Whether this is the final flush operation. */ void AsioProtobufOutStream::Flush(bool finish) { ASSERT(m_Buffered > 0 || finish); m_Writer.Commit(m_Buffered); m_Writer.Flush(m_YieldContext, finish); m_Pos += static_cast(m_Buffered); m_Buffered = 0; } /** * Check if the underlying HTTP request writer has completed writing. * * @return true if the writer has finished writing; false otherwise. */ bool AsioProtobufOutStream::WriterDone() { return m_Writer.Done(); }