mirror of
https://github.com/Icinga/icinga2.git
synced 2026-05-28 04:12:13 -04:00
Merge 4680a2d76f into f63bbec4ab
This commit is contained in:
commit
ce96c475d9
10 changed files with 463 additions and 28 deletions
|
|
@ -49,11 +49,12 @@ void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perf
|
|||
for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
|
||||
size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
|
||||
double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||
auto connection = gelfwriter->m_LockedConnection.load();
|
||||
|
||||
nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
{ "work_queue_item_rate", workQueueItemRate },
|
||||
{ "connected", gelfwriter->m_Connection->IsConnected() },
|
||||
{ "connected", connection && connection->IsConnected() },
|
||||
{ "source", gelfwriter->GetSource() }
|
||||
}));
|
||||
|
||||
|
|
@ -90,7 +91,8 @@ void GelfWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
|
||||
m_LockedConnection.store(new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()});
|
||||
m_Connection = m_LockedConnection.load();
|
||||
|
||||
/* Register event handlers. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ protected:
|
|||
|
||||
private:
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
Locked<PerfdataWriterConnection::Ptr> m_LockedConnection;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
|
||||
|
||||
|
|
|
|||
|
|
@ -58,11 +58,12 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
|
|||
for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
|
||||
size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
|
||||
double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||
auto connection = graphitewriter->m_LockedConnection.load();
|
||||
|
||||
nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
|
||||
{ "work_queue_items", workQueueItems },
|
||||
{ "work_queue_item_rate", workQueueItemRate },
|
||||
{ "connected", graphitewriter->m_Connection->IsConnected() }
|
||||
{ "connected", connection && connection->IsConnected() }
|
||||
}));
|
||||
|
||||
perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
|
||||
|
|
@ -85,7 +86,8 @@ void GraphiteWriter::Resume()
|
|||
/* Register exception handler for WQ tasks. */
|
||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
m_LockedConnection.store(new PerfdataWriterConnection{this, GetHost(), GetPort()});
|
||||
m_Connection = m_LockedConnection.load();
|
||||
|
||||
/* Register event handlers. */
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ protected:
|
|||
|
||||
private:
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
Locked<PerfdataWriterConnection::Ptr> m_LockedConnection;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
|
|
|
|||
|
|
@ -55,11 +55,12 @@ void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
|
|||
for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
|
||||
size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength();
|
||||
double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
|
||||
auto connection = opentsdbwriter->m_LockedConnection.load();
|
||||
|
||||
nodes.emplace_back(
|
||||
opentsdbwriter->GetName(),
|
||||
new Dictionary({
|
||||
{ "connected", opentsdbwriter->m_Connection->IsConnected() },
|
||||
{ "connected", connection && connection->IsConnected() },
|
||||
{"work_queue_items", workQueueItems},
|
||||
{"work_queue_item_rate", workQueueItemRate}
|
||||
}
|
||||
|
|
@ -90,7 +91,8 @@ void OpenTsdbWriter::Resume()
|
|||
|
||||
ReadConfigTemplate();
|
||||
|
||||
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
|
||||
m_LockedConnection.store(new PerfdataWriterConnection{this, GetHost(), GetPort()});
|
||||
m_Connection = m_LockedConnection.load();
|
||||
|
||||
m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
|
||||
CheckResultHandler(checkable, cr);
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ private:
|
|||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::string m_MsgBuf;
|
||||
PerfdataWriterConnection::Ptr m_Connection;
|
||||
Locked<PerfdataWriterConnection::Ptr> m_LockedConnection;
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
|
||||
|
|
|
|||
|
|
@ -3,9 +3,13 @@
|
|||
|
||||
#include "perfdata/perfdatawriterconnection.hpp"
|
||||
#include "base/tcpsocket.hpp"
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/use_future.hpp>
|
||||
#include <boost/beast/http/read.hpp>
|
||||
#include <boost/beast/http/write.hpp>
|
||||
#include <chrono>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
|
||||
using namespace icinga;
|
||||
|
|
@ -43,8 +47,225 @@ PerfdataWriterConnection::PerfdataWriterConnection(
|
|||
m_Port(std::move(port)),
|
||||
m_ReconnectTimer(IoEngine::Get().GetIoContext()),
|
||||
m_Strand(IoEngine::Get().GetIoContext()),
|
||||
m_SendCv(IoEngine::Get().GetIoContext()),
|
||||
m_Stream(MakeStream())
|
||||
{
|
||||
IoEngine::SpawnCoroutine(
|
||||
m_Strand, [this, self = PerfdataWriterConnection::Ptr{this}](const boost::asio::yield_context& yc) {
|
||||
while (!m_Stopped) {
|
||||
if (m_SendFunction) {
|
||||
m_SendFunction(yc);
|
||||
}
|
||||
m_SendCv.Wait(yc);
|
||||
}
|
||||
});
|
||||
|
||||
StartWatchdog();
|
||||
}
|
||||
|
||||
PerfdataWriterConnection::~PerfdataWriterConnection()
|
||||
{
|
||||
if (m_WatchdogTimer) {
|
||||
m_WatchdogTimer->Stop(true);
|
||||
}
|
||||
}
|
||||
|
||||
const char* PerfdataWriterConnection::SendPhaseName(SendPhase phase)
|
||||
{
|
||||
switch (phase) {
|
||||
case SendPhase::Idle: return "Idle";
|
||||
case SendPhase::Queued: return "Queued";
|
||||
case SendPhase::Connecting: return "Connecting";
|
||||
case SendPhase::Writing: return "Writing";
|
||||
case SendPhase::Finalizing: return "Finalizing";
|
||||
}
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::StartWatchdog()
|
||||
{
|
||||
m_WatchdogTimer = Timer::Create();
|
||||
m_WatchdogTimer->SetInterval(1.0);
|
||||
m_WatchdogTimer->OnTimerExpired.connect([this](const Timer * const&) { OnWatchdogTimer(); });
|
||||
m_WatchdogTimer->Start();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
std::string FormatThreadId(std::thread::id id)
|
||||
{
|
||||
if (id == std::thread::id{}) {
|
||||
return "?";
|
||||
}
|
||||
std::ostringstream oss;
|
||||
oss << id;
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
void AppendProbeReport(
|
||||
Log& msg,
|
||||
const char* label,
|
||||
const PerfdataWriterConnection::ProbeState& state,
|
||||
std::chrono::steady_clock::time_point now,
|
||||
std::chrono::steady_clock::duration stuckThreshold)
|
||||
{
|
||||
auto epoch = std::chrono::steady_clock::time_point{};
|
||||
|
||||
msg << ", " << label << ":";
|
||||
|
||||
if (state.lastCompletedAt != epoch) {
|
||||
auto sinceTick = std::chrono::duration_cast<std::chrono::milliseconds>(now - state.lastCompletedAt);
|
||||
msg << " last tick " << sinceTick.count() << "ms ago on thread "
|
||||
<< FormatThreadId(state.lastCompletedThread);
|
||||
} else {
|
||||
msg << " no tick observed yet";
|
||||
}
|
||||
|
||||
if (state.probePending) {
|
||||
auto probeAge = std::chrono::duration_cast<std::chrono::milliseconds>(now - state.probeRequestedAt);
|
||||
msg << ", probe pending " << probeAge.count() << "ms";
|
||||
if (probeAge >= stuckThreshold) {
|
||||
msg << " — wedged";
|
||||
}
|
||||
} else {
|
||||
msg << ", probe responsive";
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void PerfdataWriterConnection::OnWatchdogTimer()
|
||||
{
|
||||
if (m_Stopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
SendDiagnostics diag;
|
||||
ProbeState strandProbe;
|
||||
ProbeState ioContextProbe;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
diag = m_Diag;
|
||||
strandProbe = m_StrandProbe;
|
||||
ioContextProbe = m_IoContextProbe;
|
||||
}
|
||||
|
||||
/* Probe both executors. The snapshots above reflect the state observed
|
||||
* before this tick's probes are queued, so they capture whether the
|
||||
* *previous* probes ever ran. */
|
||||
ProbeStrand();
|
||||
ProbeIoContext();
|
||||
|
||||
if (diag.phase == SendPhase::Idle) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
auto totalElapsed = now - diag.startedAt;
|
||||
|
||||
if (totalElapsed < stuckThreshold) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::chrono::steady_clock::time_point phaseStart;
|
||||
switch (diag.phase) {
|
||||
case SendPhase::Queued: phaseStart = diag.startedAt; break;
|
||||
case SendPhase::Connecting: phaseStart = diag.spawnedAt; break;
|
||||
case SendPhase::Writing: phaseStart = diag.writingAt; break;
|
||||
case SendPhase::Finalizing: phaseStart = diag.doneAt; break;
|
||||
case SendPhase::Idle: return;
|
||||
}
|
||||
|
||||
auto inPhaseFor = std::chrono::duration_cast<std::chrono::milliseconds>(now - phaseStart);
|
||||
auto totalMs = std::chrono::duration_cast<std::chrono::milliseconds>(totalElapsed);
|
||||
|
||||
Log msg(LogWarning, m_LogFacility);
|
||||
msg << "Send to '" << m_Host << ":" << m_Port << "' for '" << m_ParentName
|
||||
<< "' has been in flight for " << totalMs.count() << "ms"
|
||||
<< " (phase: " << SendPhaseName(diag.phase) << " for " << inPhaseFor.count() << "ms";
|
||||
|
||||
AppendProbeReport(msg, "strand", strandProbe, now, stuckThreshold);
|
||||
AppendProbeReport(msg, "io_context", ioContextProbe, now, stuckThreshold);
|
||||
|
||||
msg << ").";
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::ProbeStrand()
|
||||
{
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
if (m_StrandProbe.probePending) {
|
||||
/* The previous probe hasn't completed yet — don't pile up new
|
||||
* posts. probeRequestedAt continues to age, so the watchdog will
|
||||
* report a growing stall on each tick. */
|
||||
return;
|
||||
}
|
||||
m_StrandProbe.probePending = true;
|
||||
m_StrandProbe.probeRequestedAt = now;
|
||||
}
|
||||
|
||||
boost::asio::post(m_Strand, [this, keepAlive = Ptr(this)]() {
|
||||
auto completedAt = std::chrono::steady_clock::now();
|
||||
auto tid = std::this_thread::get_id();
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_StrandProbe.probePending = false;
|
||||
m_StrandProbe.lastCompletedAt = completedAt;
|
||||
m_StrandProbe.lastCompletedThread = tid;
|
||||
});
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::ProbeIoContext()
|
||||
{
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
if (m_IoContextProbe.probePending) {
|
||||
return;
|
||||
}
|
||||
m_IoContextProbe.probePending = true;
|
||||
m_IoContextProbe.probeRequestedAt = now;
|
||||
}
|
||||
|
||||
/* Post directly to the io_context — bypassing m_Strand — so we can tell
|
||||
* a strand-local wedge from io_context worker-pool exhaustion. */
|
||||
boost::asio::post(IoEngine::Get().GetIoContext(), [this, keepAlive = Ptr(this)]() {
|
||||
auto completedAt = std::chrono::steady_clock::now();
|
||||
auto tid = std::this_thread::get_id();
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_IoContextProbe.probePending = false;
|
||||
m_IoContextProbe.lastCompletedAt = completedAt;
|
||||
m_IoContextProbe.lastCompletedThread = tid;
|
||||
});
|
||||
}
|
||||
|
||||
void PerfdataWriterConnection::LogSlowSend(const SendDiagnostics& diag, std::chrono::steady_clock::duration totalElapsed) const
|
||||
{
|
||||
auto toMs = [](std::chrono::steady_clock::duration d) {
|
||||
return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
|
||||
};
|
||||
|
||||
Log msg(LogWarning, m_LogFacility);
|
||||
msg << "Slow Send to '" << m_Host << ":" << m_Port << "' for '" << m_ParentName
|
||||
<< "' took " << toMs(totalElapsed) << "ms";
|
||||
|
||||
auto epoch = std::chrono::steady_clock::time_point{};
|
||||
|
||||
if (diag.spawnedAt > diag.startedAt) {
|
||||
msg << ", spawn " << toMs(diag.spawnedAt - diag.startedAt) << "ms";
|
||||
} else if (diag.spawnedAt == epoch) {
|
||||
msg << ", spawn still pending";
|
||||
}
|
||||
|
||||
if (diag.writingAt > diag.spawnedAt && diag.spawnedAt != epoch) {
|
||||
msg << ", connect " << toMs(diag.writingAt - diag.spawnedAt) << "ms";
|
||||
}
|
||||
|
||||
if (diag.doneAt > diag.writingAt && diag.writingAt != epoch) {
|
||||
msg << ", write " << toMs(diag.doneAt - diag.writingAt) << "ms";
|
||||
}
|
||||
|
||||
msg << " (phase before exit: " << SendPhaseName(diag.phase) << ").";
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -62,11 +283,11 @@ bool PerfdataWriterConnection::IsStopped() const
|
|||
|
||||
void PerfdataWriterConnection::Disconnect()
|
||||
{
|
||||
if (m_Stopped.exchange(true, std::memory_order_relaxed)) {
|
||||
if (m_Stopped.exchange(true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::promise<void> promise;
|
||||
SyncResult<void> ret;
|
||||
|
||||
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
|
||||
try {
|
||||
|
|
@ -76,9 +297,13 @@ void PerfdataWriterConnection::Disconnect()
|
|||
* completion.
|
||||
*/
|
||||
std::visit(
|
||||
[](const auto& stream) {
|
||||
[&](const auto& stream) {
|
||||
if (stream->lowest_layer().is_open()) {
|
||||
stream->lowest_layer().cancel();
|
||||
if (m_Connected) {
|
||||
stream->lowest_layer().cancel();
|
||||
} else {
|
||||
stream->lowest_layer().close();
|
||||
}
|
||||
}
|
||||
},
|
||||
m_Stream
|
||||
|
|
@ -86,13 +311,13 @@ void PerfdataWriterConnection::Disconnect()
|
|||
m_ReconnectTimer.cancel();
|
||||
|
||||
Disconnect(std::move(yc));
|
||||
promise.set_value();
|
||||
ret.SetValue();
|
||||
} catch (const std::exception& ex) {
|
||||
promise.set_exception(std::current_exception());
|
||||
ret.SetException(std::current_exception());
|
||||
}
|
||||
});
|
||||
|
||||
promise.get_future().get();
|
||||
ret.Get();
|
||||
}
|
||||
|
||||
AsioTlsOrTcpStream PerfdataWriterConnection::MakeStream() const
|
||||
|
|
@ -133,6 +358,10 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context&
|
|||
::Connect(stream->lowest_layer(), m_Host, m_Port, yc);
|
||||
|
||||
if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
|
||||
if (m_Stopped) {
|
||||
BOOST_THROW_EXCEPTION(Stopped{});
|
||||
}
|
||||
|
||||
using type = boost::asio::ssl::stream_base::handshake_type;
|
||||
|
||||
stream->next_layer().async_handshake(type::client, yc);
|
||||
|
|
@ -156,7 +385,7 @@ void PerfdataWriterConnection::EnsureConnected(const boost::asio::yield_context&
|
|||
|
||||
void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
|
||||
{
|
||||
if (!m_Connected.exchange(false, std::memory_order_relaxed)) {
|
||||
if (!m_Connected.exchange(false)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -165,8 +394,9 @@ void PerfdataWriterConnection::Disconnect(boost::asio::yield_context yc)
|
|||
if constexpr (std::is_same_v<std::decay_t<decltype(stream)>, Shared<AsioTlsStream>::Ptr>) {
|
||||
stream->GracefulDisconnect(m_Strand, yc);
|
||||
} else {
|
||||
stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both);
|
||||
stream->lowest_layer().close();
|
||||
boost::system::error_code ec;
|
||||
stream->lowest_layer().shutdown(boost::asio::socket_base::shutdown_both, ec);
|
||||
stream->lowest_layer().close(ec);
|
||||
}
|
||||
},
|
||||
m_Stream
|
||||
|
|
|
|||
|
|
@ -4,12 +4,19 @@
|
|||
#pragma once
|
||||
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include <atomic>
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/beast/http/message.hpp>
|
||||
#include <boost/beast/http/string_body.hpp>
|
||||
#include <chrono>
|
||||
#include <future>
|
||||
#include <utility>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
namespace icinga {
|
||||
|
||||
|
|
@ -21,6 +28,59 @@ class PerfdataWriterConnection final : public Object
|
|||
static constexpr auto InitialRetryWait = 50ms;
|
||||
static constexpr auto FinalRetryWait = 32s;
|
||||
|
||||
template<typename T>
|
||||
class SyncResult
|
||||
{
|
||||
using ValueType = std::variant<std::monostate, std::conditional_t<std::is_void_v<T>, bool, T>, std::exception_ptr>;
|
||||
|
||||
public:
|
||||
template<typename U, typename V = T, typename = std::enable_if_t<!std::is_void_v<V>>>
|
||||
void SetValue(U&& v)
|
||||
{
|
||||
std::lock_guard lock(m_Mutex);
|
||||
m_Value = std::forward<U>(v);
|
||||
m_Cv.notify_one();
|
||||
}
|
||||
|
||||
template<typename V = T, typename = std::enable_if_t<std::is_void_v<V>>>
|
||||
void SetValue()
|
||||
{
|
||||
std::lock_guard lock(m_Mutex);
|
||||
m_Value = true;
|
||||
m_Cv.notify_one();
|
||||
}
|
||||
|
||||
void SetException(std::exception_ptr ep)
|
||||
{
|
||||
std::lock_guard lock(m_Mutex);
|
||||
m_Value = ValueType{ep};
|
||||
m_Cv.notify_one();
|
||||
}
|
||||
|
||||
T Get()
|
||||
{
|
||||
std::unique_lock l(m_Mutex);
|
||||
m_Cv.wait(l, [&] { return !std::holds_alternative<std::monostate>(m_Value); });
|
||||
if (std::holds_alternative<std::exception_ptr>(m_Value)) {
|
||||
std::rethrow_exception(std::get<std::exception_ptr>(m_Value));
|
||||
}
|
||||
|
||||
if constexpr (std::is_void_v<T>) {
|
||||
return;
|
||||
} else {
|
||||
return std::move(std::get<T>(m_Value));
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::mutex m_Mutex;
|
||||
std::condition_variable m_Cv;
|
||||
ValueType m_Value;
|
||||
};
|
||||
|
||||
static constexpr auto slowThreshold = std::chrono::milliseconds(200);
|
||||
static constexpr auto stuckThreshold = std::chrono::seconds(5);
|
||||
|
||||
public:
|
||||
DECLARE_PTR_TYPEDEFS(PerfdataWriterConnection);
|
||||
|
||||
|
|
@ -29,6 +89,52 @@ public:
|
|||
[[nodiscard]] const char* what() const noexcept final { return "Connection stopped."; }
|
||||
};
|
||||
|
||||
enum class SendPhase : uint8_t {
|
||||
Idle = 0,
|
||||
Queued,
|
||||
Connecting,
|
||||
Writing,
|
||||
Finalizing,
|
||||
};
|
||||
|
||||
struct SendDiagnostics {
|
||||
SendPhase phase{SendPhase::Idle};
|
||||
std::chrono::steady_clock::time_point startedAt{};
|
||||
std::chrono::steady_clock::time_point spawnedAt{};
|
||||
std::chrono::steady_clock::time_point writingAt{};
|
||||
std::chrono::steady_clock::time_point doneAt{};
|
||||
};
|
||||
|
||||
/**
|
||||
* Executor-liveness probe state.
|
||||
*
|
||||
* The watchdog periodically posts a no-op onto an executor (m_Strand or
|
||||
* the bare io_context). If the executor is draining its queue, the post
|
||||
* completes and lastCompletedAt advances. If the executor is wedged the
|
||||
* post never runs and probePending stays true with a growing
|
||||
* probeRequestedAt age.
|
||||
*
|
||||
* Two probes are maintained in parallel to triangulate where a wedge sits:
|
||||
* - m_StrandProbe: posts onto m_Strand. Detects strand-local wedges
|
||||
* (some handler hogging this strand) as well as io_context-level ones.
|
||||
* - m_IoContextProbe: posts onto the bare io_context. Detects worker-pool
|
||||
* exhaustion (all io_context threads parked elsewhere). If the strand
|
||||
* probe stalls but this one runs promptly, the wedge is strand-local.
|
||||
*
|
||||
* lastCompletedThread records which io_context worker last serviced the
|
||||
* executor, to be cross-referenced with future per-thread instrumentation.
|
||||
*/
|
||||
struct ProbeState {
|
||||
bool probePending{false};
|
||||
std::chrono::steady_clock::time_point probeRequestedAt{};
|
||||
std::chrono::steady_clock::time_point lastCompletedAt{};
|
||||
std::thread::id lastCompletedThread{};
|
||||
};
|
||||
|
||||
~PerfdataWriterConnection() override;
|
||||
|
||||
static const char* SendPhaseName(SendPhase phase);
|
||||
|
||||
using HttpRequest = boost::beast::http::request<boost::beast::http::string_body>;
|
||||
using HttpResponse = boost::beast::http::response<boost::beast::http::string_body>;
|
||||
|
||||
|
|
@ -65,26 +171,58 @@ public:
|
|||
BOOST_THROW_EXCEPTION(Stopped{});
|
||||
}
|
||||
|
||||
using RetType = decltype(WriteMessage(std::declval<Buffer>(), std::declval<boost::asio::yield_context>()));
|
||||
std::promise<RetType> promise;
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
|
||||
using RetType = decltype(WriteMessage(std::declval<Buffer>(), std::declval<boost::asio::yield_context>()));
|
||||
SyncResult<RetType> ret;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_Diag.phase = SendPhase::Queued;
|
||||
m_Diag.startedAt = start;
|
||||
}
|
||||
|
||||
m_SendFunction = [&](const boost::asio::yield_context& yc) {
|
||||
{
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_Diag.spawnedAt = now;
|
||||
m_Diag.phase = SendPhase::Connecting;
|
||||
}
|
||||
|
||||
IoEngine::SpawnCoroutine(m_Strand, [&](boost::asio::yield_context yc) {
|
||||
while (true) {
|
||||
try {
|
||||
EnsureConnected(yc);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_Diag.phase = SendPhase::Writing;
|
||||
m_Diag.writingAt = std::chrono::steady_clock::now();
|
||||
}
|
||||
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
WriteMessage(std::forward<Buffer>(buf), yc);
|
||||
promise.set_value();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_Diag.phase = SendPhase::Finalizing;
|
||||
m_Diag.doneAt = std::chrono::steady_clock::now();
|
||||
}
|
||||
ret.SetValue();
|
||||
} else {
|
||||
promise.set_value(WriteMessage(std::forward<Buffer>(buf), yc));
|
||||
auto result = WriteMessage(std::forward<Buffer>(buf), yc);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_Diag.phase = SendPhase::Finalizing;
|
||||
m_Diag.doneAt = std::chrono::steady_clock::now();
|
||||
}
|
||||
ret.SetValue(std::move(result));
|
||||
}
|
||||
|
||||
m_RetryTimeout = InitialRetryWait;
|
||||
return;
|
||||
} catch (const std::exception& ex) {
|
||||
if (m_Stopped) {
|
||||
promise.set_exception(std::make_exception_ptr(Stopped{}));
|
||||
ret.SetException(std::make_exception_ptr(Stopped{}));
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -95,17 +233,52 @@ public:
|
|||
m_Stream = MakeStream();
|
||||
m_Connected = false;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
m_Diag.phase = SendPhase::Connecting;
|
||||
m_Diag.spawnedAt = std::chrono::steady_clock::now();
|
||||
}
|
||||
|
||||
try {
|
||||
BackoffWait(yc);
|
||||
} catch (const std::exception&) {
|
||||
promise.set_exception(std::make_exception_ptr(Stopped{}));
|
||||
ret.SetException(std::make_exception_ptr(Stopped{}));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
return promise.get_future().get();
|
||||
boost::asio::post(m_Strand, [this](){ m_SendCv.NotifyAll(); });
|
||||
|
||||
auto resetAndLog = [this, start]() {
|
||||
SendDiagnostics diag;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_DiagMutex};
|
||||
diag = m_Diag;
|
||||
m_Diag.phase = SendPhase::Idle;
|
||||
}
|
||||
|
||||
auto totalElapsed = std::chrono::steady_clock::now() - start;
|
||||
if (totalElapsed >= slowThreshold) {
|
||||
LogSlowSend(diag, totalElapsed);
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
ret.Get();
|
||||
resetAndLog();
|
||||
return;
|
||||
} else {
|
||||
auto result = ret.Get();
|
||||
resetAndLog();
|
||||
return result;
|
||||
}
|
||||
} catch (const std::exception&) {
|
||||
resetAndLog();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void Disconnect();
|
||||
|
|
@ -137,6 +310,12 @@ private:
|
|||
void WriteMessage(boost::asio::const_buffer, const boost::asio::yield_context& yc);
|
||||
HttpResponse WriteMessage(const HttpRequest& request, const boost::asio::yield_context& yc);
|
||||
|
||||
void StartWatchdog();
|
||||
void OnWatchdogTimer();
|
||||
void ProbeStrand();
|
||||
void ProbeIoContext();
|
||||
void LogSlowSend(const SendDiagnostics& diag, std::chrono::steady_clock::duration totalElapsed) const;
|
||||
|
||||
std::atomic_bool m_Stopped{false};
|
||||
std::atomic_bool m_Connected{false};
|
||||
|
||||
|
|
@ -151,7 +330,15 @@ private:
|
|||
std::chrono::milliseconds m_RetryTimeout{InitialRetryWait};
|
||||
boost::asio::steady_timer m_ReconnectTimer;
|
||||
boost::asio::io_context::strand m_Strand;
|
||||
AsioConditionVariable m_SendCv;
|
||||
std::function<void(const boost::asio::yield_context&)> m_SendFunction;
|
||||
AsioTlsOrTcpStream m_Stream;
|
||||
|
||||
mutable std::mutex m_DiagMutex;
|
||||
SendDiagnostics m_Diag;
|
||||
ProbeState m_StrandProbe;
|
||||
ProbeState m_IoContextProbe;
|
||||
Timer::Ptr m_WatchdogTimer;
|
||||
};
|
||||
|
||||
} // namespace icinga
|
||||
|
|
|
|||
|
|
@ -71,11 +71,20 @@ public:
|
|||
BOOST_REQUIRE(stream->next_layer().IsVerifyOK());
|
||||
}
|
||||
|
||||
void Shutdown()
|
||||
void Shutdown(bool wait = false)
|
||||
{
|
||||
BOOST_REQUIRE(std::holds_alternative<Shared<AsioTlsStream>::Ptr>(m_Stream));
|
||||
auto& stream = std::get<Shared<AsioTlsStream>::Ptr>(m_Stream);
|
||||
try {
|
||||
if (wait) {
|
||||
std::array<std::byte, 128> buf{};
|
||||
boost::asio::mutable_buffer readBuf (buf.data(), buf.size());
|
||||
boost::system::error_code ec;
|
||||
|
||||
do {
|
||||
stream->read_some(readBuf, ec);
|
||||
} while (!ec);
|
||||
}
|
||||
stream->next_layer().shutdown();
|
||||
} catch (const std::exception& ex) {
|
||||
if (const auto* se = dynamic_cast<const boost::system::system_error*>(&ex);
|
||||
|
|
|
|||
|
|
@ -207,7 +207,7 @@ BOOST_AUTO_TEST_CASE(stuck_reading_response)
|
|||
requestReadPromise.set_value();
|
||||
// Do not send a response but react to the shutdown to be polite.
|
||||
shutdownPromise.get_future().get();
|
||||
Shutdown();
|
||||
Shutdown(true);
|
||||
}};
|
||||
|
||||
TestThread timeoutThread{[&]() {
|
||||
|
|
@ -315,7 +315,7 @@ BOOST_AUTO_TEST_CASE(http_send_retry)
|
|||
|
||||
SendResponse();
|
||||
|
||||
Shutdown();
|
||||
Shutdown(true);
|
||||
}};
|
||||
|
||||
boost::beast::http::request<boost::beast::http::string_body> request{boost::beast::http::verb::post, "foo", 10};
|
||||
|
|
|
|||
Loading…
Reference in a new issue