mirror of
https://github.com/Icinga/icinga2.git
synced 2026-04-27 00:57:01 -04:00
Add PerfdataWriterConnection class
This commit is contained in:
parent
11d099f4bb
commit
2e2576c7c5
3 changed files with 367 additions and 0 deletions
|
|
@ -19,6 +19,7 @@ set(perfdata_SOURCES
|
|||
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
|
||||
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
|
||||
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
|
||||
perfdatawriterconnection.cpp perfdatawriterconnection.hpp
|
||||
)
|
||||
|
||||
if(ICINGA2_UNITY_BUILD)
|
||||
|
|
|
|||
209
lib/perfdata/perfdatawriterconnection.cpp
Normal file
209
lib/perfdata/perfdatawriterconnection.cpp
Normal file
|
|
@ -0,0 +1,209 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include <boost/asio/use_future.hpp>
|
||||
#include <boost/beast/http/read.hpp>
|
||||
#include <boost/beast/http/write.hpp>
|
||||
#include <utility>
|
||||
|
||||
using namespace icinga;
|
||||
using HttpResponse = PerfdataWriterConnection::HttpResponse;
|
||||
|
||||
PerfdataWriterConnection::PerfdataWriterConnection(
|
||||
const ConfigObject::Ptr& parent,
|
||||
String host,
|
||||
String port,
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext,
|
||||
bool verifyPeerCertificate
|
||||
)
|
||||
: PerfdataWriterConnection(
|
||||
parent->GetReflectionType()->GetName(),
|
||||
parent->GetName(),
|
||||
std::move(host),
|
||||
std::move(port),
|
||||
std::move(sslContext),
|
||||
verifyPeerCertificate
|
||||
) {};
|
||||
|
||||
PerfdataWriterConnection::PerfdataWriterConnection(
|
||||
String logFacility,
|
||||
String parentName,
|
||||
String host,
|
||||
String port,
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext,
|
||||
bool verifyPeerCertificate
|
||||
)
|
||||
: m_VerifyPeerCertificate(verifyPeerCertificate),
|
||||
m_SslContext(std::move(sslContext)),
|
||||
m_LogFacility(std::move(logFacility)),
|
||||
m_ParentName(std::move(parentName)),
|
||||
m_Host(std::move(host)),
|
||||
m_Port(std::move(port)),
|
||||
m_ReconnectTimer(IoEngine::Get().GetIoContext()),
|
||||
m_Strand(IoEngine::Get().GetIoContext()),
|
||||
m_Stream(MakeStream())
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current state of the connection.
|
||||
*/
|
||||
bool PerfdataWriterConnection::IsConnected() const
|
||||
{
|
||||
return m_Connected;
|
||||
}
|
||||
|
||||
bool PerfdataWriterConnection::IsStopped() const
|
||||
{
|
||||
return m_Stopped;
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::Disconnect()
|
||||
{
|
||||
if (m_Stopped.exchange(true, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::promise<void> promise;
|
||||
|
||||
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
|
||||
try {
|
||||
/* Cancel any outstanding operations of the other coroutine.
|
||||
* Since we're on the same strand we're hopefully guaranteed that all cancellations
|
||||
* result in exceptions thrown by the yield_context, even if its already queued for
|
||||
* completion.
|
||||
*/
|
||||
std::visit(
|
||||
[](const auto& stream) {
|
||||
if (stream->lowest_layer().is_open()) {
|
||||
stream->lowest_layer().cancel();
|
||||
}
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
m_ReconnectTimer.cancel();
|
||||
|
||||
Disconnect(std::move(yc));
|
||||
promise.set_value();
|
||||
} catch (const std::exception& ex) {
|
||||
promise.set_exception(std::current_exception());
|
||||
}
|
||||
});
|
||||
|
||||
promise.get_future().get();
|
||||
}
|
||||
|
||||
AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const
|
||||
{
|
||||
AsioTlsOrTcpStream ret;
|
||||
if (m_SslContext) {
|
||||
ret = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *m_SslContext);
|
||||
} else {
|
||||
ret = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the next attempt after an error, using a backoff algorithm.
|
||||
*
|
||||
* The waits between retries are doubled for each failure, up to a maximum of 32s, until it is
|
||||
* reset by a successful attempt.
|
||||
*/
|
||||
void PerfdataWriterConnection::BackoffWait(const boost::asio::yield_context& yc)
|
||||
{
|
||||
m_ReconnectTimer.expires_after(m_RetryTimeout);
|
||||
if (m_RetryTimeout <= FinalRetryWait / 2) {
|
||||
m_RetryTimeout *= 2;
|
||||
}
|
||||
m_ReconnectTimer.async_wait(yc);
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context& yc)
|
||||
{
|
||||
if (m_Connected) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::visit(
|
||||
[&](auto& stream) {
|
||||
::Connect(stream->lowest_layer(), m_Host, m_Port, yc);
|
||||
|
||||
if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
|
||||
using type = boost::asio::ssl::stream_base::handshake_type;
|
||||
|
||||
stream->next_layer().async_handshake(type::client, yc);
|
||||
|
||||
if (m_VerifyPeerCertificate) {
|
||||
if (!stream->next_layer().IsVerifyOK()) {
|
||||
BOOST_THROW_EXCEPTION(
|
||||
std::runtime_error{
|
||||
"TLS certificate validation failed: " + stream->next_layer().GetVerifyError()
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
|
||||
m_Connected = true;
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
|
||||
{
|
||||
if (!m_Connected.exchange(false, std::memory_order_relaxed)) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::visit(
|
||||
[&](auto& stream) {
|
||||
if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
|
||||
stream->GracefulDisconnect(m_Strand, yc);
|
||||
} else {
|
||||
stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both);
|
||||
stream->lowest_layer().close();
|
||||
}
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
|
||||
m_Stream = MakeStream();
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::WriteMessage(boost::asio::const_buffer buf, const boost::asio::yield_context& yc)
|
||||
{
|
||||
std::visit(
|
||||
[&](auto& stream) {
|
||||
boost::asio::async_write(*stream, buf, yc);
|
||||
stream->async_flush(yc);
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
}
|
||||
|
||||
HttpResponse PerfdataWriterConnection::WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc)
|
||||
{
|
||||
boost::beast::http::response<boost::beast::http::string_body> response;
|
||||
std::visit(
|
||||
[&](auto& stream) {
|
||||
boost::beast::http::request_serializer<boost::beast::http::string_body> sr{request};
|
||||
boost::beast::http::async_write(*stream, sr, yc);
|
||||
stream->async_flush(yc);
|
||||
|
||||
boost::beast::flat_buffer buf;
|
||||
boost::beast::http::async_read(*stream, buf, response, yc);
|
||||
},
|
||||
m_Stream
|
||||
);
|
||||
|
||||
if (!response.keep_alive()) {
|
||||
Disconnect(yc);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
157
lib/perfdata/perfdatawriterconnection.hpp
Normal file
157
lib/perfdata/perfdatawriterconnection.hpp
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
// SPDX-FileCopyrightText: 2026 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <future>
|
||||
|
||||
namespace icinga {
|
||||
|
||||
/**
|
||||
* Class handling the connection to the various Perfdata backends.
|
||||
*/
|
||||
class PerfdataWriterConnection final : public Object
|
||||
{
|
||||
static constexpr auto InitialRetryWait = 50ms;
|
||||
static constexpr auto FinalRetryWait = 32s;
|
||||
|
||||
public:
|
||||
DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection);
|
||||
|
||||
struct Stopped : std::exception
|
||||
{
|
||||
[[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; }
|
||||
};
|
||||
|
||||
using HttpRequest = boost::beast::http::request<boost::beast::http::string_body>;
|
||||
using HttpResponse = boost::beast::http::response<boost::beast::http::string_body>;
|
||||
|
||||
PerfdataWriterConnection(
|
||||
const ConfigObject::Ptr& parent,
|
||||
String host,
|
||||
String port,
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext = nullptr,
|
||||
bool verifyPeerCertificate = true
|
||||
);
|
||||
|
||||
PerfdataWriterConnection(
|
||||
String logFacility,
|
||||
String parentName,
|
||||
String host,
|
||||
String port,
|
||||
Shared<boost::asio::ssl::context>::Ptr sslContext = nullptr,
|
||||
bool verifyPeerCertificate = true
|
||||
);
|
||||
|
||||
/**
|
||||
* Send the given data buffer to the server.
|
||||
*
|
||||
* To support each Buffer type this function needs an overload of the WriteMessage method.
|
||||
* If the selected WriteMessage functions returns something, Send() will return that result.
|
||||
*
|
||||
* @param buf The buffer to send
|
||||
* @return the return value returned by the WriteMessage overload for Buffer, otherwise void
|
||||
*/
|
||||
template<typename Buffer>
|
||||
auto Send(Buffer&& buf)
|
||||
{
|
||||
if (m_Stopped) {
|
||||
BOOST_THROW_EXCEPTION(Stopped{});
|
||||
}
|
||||
|
||||
using RetType = decltype(WriteMessage(std::declval<Buffer>(), std::declval<boost::asio::yield_context>()));
|
||||
std::promise<RetType> promise;
|
||||
|
||||
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
|
||||
while (true) {
|
||||
try {
|
||||
EnsureConnected(yc);
|
||||
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
WriteMessage(std::forward<Buffer>(buf), yc);
|
||||
promise.set_value();
|
||||
} else {
|
||||
promise.set_value(WriteMessage(std::forward<Buffer>(buf), yc));
|
||||
}
|
||||
|
||||
m_RetryTimeout = InitialRetryWait;
|
||||
return;
|
||||
} catch (const std::exception& ex) {
|
||||
if (m_Stopped) {
|
||||
promise.set_exception(std::make_exception_ptr(Stopped{}));
|
||||
return;
|
||||
}
|
||||
|
||||
Log(LogCritical, m_LogFacility)
|
||||
<< "Error while " << (m_Connected ? "sending" : "connecting") << " to '" << m_Host << ":"
|
||||
<< m_Port << "' for '" << m_ParentName << "': " << ex.what();
|
||||
|
||||
m_Stream = MakeStream();
|
||||
m_Connected = false;
|
||||
|
||||
try {
|
||||
BackoffWait(yc);
|
||||
} catch (const std::exception&) {
|
||||
promise.set_exception(std::make_exception_ptr(Stopped{}));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return promise.get_future().get();
|
||||
}
|
||||
|
||||
void Disconnect();
|
||||
|
||||
/**
|
||||
* Cancels ongoing operations either after a timeout or a future became ready.
|
||||
*
|
||||
* This will disconnect and set a flag so that no further Send() requests are accepted.
|
||||
*
|
||||
* @param future The future to wait for
|
||||
* @param timeout The timeout after which ongoing operations are canceled
|
||||
*/
|
||||
template<class Rep, class Period>
|
||||
void CancelAfterTimeout(const std::future<void>& future, const std::chrono::duration<Rep, Period>& timeout)
|
||||
{
|
||||
future.wait_for(timeout);
|
||||
Disconnect();
|
||||
}
|
||||
|
||||
bool IsConnected() const;
|
||||
bool IsStopped() const;
|
||||
|
||||
private:
|
||||
AsioTlsOrTcpStream MakeStream() const;
|
||||
void BackoffWait(const boost::asio::yield_context& yc);
|
||||
void EnsureConnected(const boost::asio::yield_context& yc);
|
||||
void Disconnect(boost::asio::yield_context yc);
|
||||
|
||||
void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc);
|
||||
HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc);
|
||||
|
||||
std::atomic_bool m_Stopped{false};
|
||||
std::atomic_bool m_Connected{false};
|
||||
|
||||
bool m_VerifyPeerCertificate;
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
|
||||
String m_LogFacility;
|
||||
String m_ParentName;
|
||||
String m_Host;
|
||||
String m_Port;
|
||||
|
||||
std::chrono::milliseconds m_RetryTimeout{InitialRetryWait};
|
||||
boost::asio::steady_timer m_ReconnectTimer;
|
||||
boost::asio::io_context::strand m_Strand;
|
||||
AsioTlsOrTcpStream m_Stream;
|
||||
};
|
||||
|
||||
} // namespace icinga
|
||||
Loading…
Reference in a new issue