Refactor OpenTsdbWriter to use a WorkQueue

This commit is contained in:
Johannes Schmidt 2025-12-12 13:27:02 +01:00
parent 7955d04ae3
commit 240142bcbd
2 changed files with 84 additions and 42 deletions

View file

@ -36,6 +36,8 @@ void OpenTsdbWriter::OnConfigLoaded()
{
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
m_WorkQueue.SetName("OpenTsdbWriter, " + GetName());
if (!GetEnableHa()) {
Log(LogDebug, "OpenTsdbWriter")
<< "HA functionality disabled. Won't pause connection: " << GetName();
@ -51,14 +53,26 @@ void OpenTsdbWriter::OnConfigLoaded()
*
* @param status Key value pairs for feature stats
*/
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
DictionaryData nodes;
for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
{ "connected", opentsdbwriter->GetConnected() }
}));
size_t workQueueItems = opentsdbwriter->m_WorkQueue.GetLength();
double workQueueItemRate = opentsdbwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
nodes.emplace_back(
opentsdbwriter->GetName(),
new Dictionary({
{"connected", opentsdbwriter->GetConnected()},
{"work_queue_items", workQueueItems},
{"work_queue_item_rate", workQueueItemRate}
}
)
);
perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_items", workQueueItems));
perfdata->Add(new PerfdataValue("opentsdbwriter_" + opentsdbwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
}
status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
@ -74,6 +88,11 @@ void OpenTsdbWriter::Resume()
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' resumed.";
m_WorkQueue.SetExceptionCallback([](const boost::exception_ptr& exp) {
Log(LogDebug, "OpenTsdbWriter")
<< "Exception during OpenTsdb operation: " << DiagnosticInformation(exp);
});
ReadConfigTemplate();
m_ReconnectTimer = Timer::Create();
@ -95,6 +114,8 @@ void OpenTsdbWriter::Pause()
m_HandleCheckResults.disconnect();
m_ReconnectTimer->Stop(true);
m_WorkQueue.Join();
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' paused.";
@ -251,7 +272,7 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
String escaped_hostName = EscapeTag(host->GetName());
tags["host"] = escaped_hostName;
double ts = cr->GetExecutionEnd();
std::vector<std::pair<String, double>> metadata;
if (service) {
@ -262,40 +283,51 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
String escaped_serviceName = EscapeMetric(serviceName);
metric = "icinga.service." + escaped_serviceName;
}
SendMetric(checkable, metric + ".state", tags, service->GetState(), ts);
metadata.emplace_back("state", service->GetState());
} else {
if (!config_tmpl_metric.IsEmpty()) {
metric = config_tmpl_metric;
} else {
metric = "icinga.host";
}
SendMetric(checkable, metric + ".state", tags, host->GetState(), ts);
metadata.emplace_back("state", host->GetState());
}
SendMetric(checkable, metric + ".state_type", tags, checkable->GetStateType(), ts);
SendMetric(checkable, metric + ".reachable", tags, checkable->IsReachable(), ts);
SendMetric(checkable, metric + ".downtime_depth", tags, checkable->GetDowntimeDepth(), ts);
SendMetric(checkable, metric + ".acknowledgement", tags, checkable->GetAcknowledgement(), ts);
metadata.emplace_back("state_type", checkable->GetStateType());
metadata.emplace_back("reachable", checkable->IsReachable());
metadata.emplace_back("downtime_depth", checkable->GetDowntimeDepth());
metadata.emplace_back("acknowledgement", checkable->GetAcknowledgement());
SendPerfdata(checkable, metric, tags, cr, ts);
m_WorkQueue.Enqueue(
[this, checkable, service, cr, metric = std::move(metric), tags = std::move(tags), metadata = std::move(metadata), ts]() mutable {
double ts = cr->GetExecutionEnd();
metric = "icinga.check";
for (auto& [name, val] : metadata) {
AddMetric(checkable, metric + "." + name, tags, val, ts);
}
if (service) {
tags["type"] = "service";
String serviceName = service->GetShortName();
String escaped_serviceName = EscapeTag(serviceName);
tags["service"] = escaped_serviceName;
} else {
tags["type"] = "host";
}
AddPerfdata(checkable, metric, tags, cr, ts);
SendMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
SendMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
SendMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
metric = "icinga.check";
if (service) {
tags["type"] = "service";
String serviceName = service->GetShortName();
String escaped_serviceName = EscapeTag(serviceName);
tags["service"] = escaped_serviceName;
} else {
tags["type"] = "host";
}
AddMetric(checkable, metric + ".current_attempt", tags, checkable->GetCheckAttempt(), ts);
AddMetric(checkable, metric + ".max_check_attempts", tags, checkable->GetMaxCheckAttempts(), ts);
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
SendMsgBuffer();
}
);
}
/**
@ -307,9 +339,11 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
* @param cr Check result containing performance data
* @param ts Timestamp when the check result was received
*/
void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
void OpenTsdbWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
{
ASSERT(m_WorkQueue.IsWorkerThread());
Array::Ptr perfdata = cr->GetPerformanceData();
if (!perfdata)
@ -350,21 +384,21 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
tags_new["label"] = escaped_key;
}
SendMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
AddMetric(checkable, metric_name, tags_new, pdv->GetValue(), ts);
if (!pdv->GetCrit().IsEmpty())
SendMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
AddMetric(checkable, metric_name + "_crit", tags_new, pdv->GetCrit(), ts);
if (!pdv->GetWarn().IsEmpty())
SendMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
AddMetric(checkable, metric_name + "_warn", tags_new, pdv->GetWarn(), ts);
if (!pdv->GetMin().IsEmpty())
SendMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
AddMetric(checkable, metric_name + "_min", tags_new, pdv->GetMin(), ts);
if (!pdv->GetMax().IsEmpty())
SendMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
AddMetric(checkable, metric_name + "_max", tags_new, pdv->GetMax(), ts);
}
}
/**
* Send given metric to OpenTSDB
* Add given metric to the data buffer to be later sent to OpenTSDB
*
* @param checkable Host/service object
* @param metric Full metric name
@ -372,9 +406,11 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
* @param value Floating point metric value
* @param ts Timestamp where the metric was received from the check result
*/
void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts)
{
ASSERT(m_WorkQueue.IsWorkerThread());
String tags_string = "";
for (auto& tag : tags) {
@ -394,18 +430,21 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
/* do not send \n to debug log */
msgbuf << "\n";
String put = msgbuf.str();
m_MsgBuf.append(msgbuf.str());
}
ObjectLock olock(this);
void OpenTsdbWriter::SendMsgBuffer()
{
ASSERT(m_WorkQueue.IsWorkerThread());
if (!GetConnected())
return;
try {
Log(LogDebug, "OpenTsdbWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
Log(LogDebug, "OpenTsdbWriter")
<< "Flushing data buffer to OpenTsdb.";
boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
try {
boost::asio::write(*m_Stream, boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "OpenTsdbWriter")

View file

@ -36,6 +36,8 @@ protected:
void Pause() override;
private:
WorkQueue m_WorkQueue{10000000, 1};
std::string m_MsgBuf;
Shared<AsioTcpStream>::Ptr m_Stream;
boost::signals2::connection m_HandleCheckResults;
@ -45,9 +47,10 @@ private:
Dictionary::Ptr m_HostConfigTemplate;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
void SendMetric(const Checkable::Ptr& checkable, const String& metric,
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts);
void SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
void SendMsgBuffer();
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);
static String EscapeTag(const String& str);
static String EscapeMetric(const String& str);