icinga2/lib/icingadb/icingadb.cpp
Yonas Habteab cbb4147055 RedisConnection: simplify query prioritization logic
As opposed to the previous version, which used a complex data structure
to correctly manage the query priorities, this version uses two separate
queues for the high and normal priority writes. All high priority writes
are processed in FIFO order and take precedence over all queries from the
normal priority queue. The latter queue is only processed when the high
priority queue is empty.
2026-04-02 16:37:57 +02:00

342 lines
9.5 KiB
C++

// SPDX-FileCopyrightText: 2012 Icinga GmbH <https://icinga.com>
// SPDX-License-Identifier: GPL-2.0-or-later
#include "icingadb/icingadb.hpp"
#include "icingadb/icingadb-ti.cpp"
#include "icingadb/redisconnection.hpp"
#include "remote/apilistener.hpp"
#include "remote/eventqueue.hpp"
#include "base/configuration.hpp"
#include "base/json.hpp"
#include "base/perfdatavalue.hpp"
#include "base/statsfunction.hpp"
#include "base/tlsutility.hpp"
#include "base/utility.hpp"
#include "icinga/checkable.hpp"
#include "icinga/host.hpp"
#include <boost/algorithm/string.hpp>
#include <fstream>
#include <memory>
#include <utility>
using namespace icinga;
// Going by its name, presumably the default maximum number of events. It is not referenced in the
// code shown here — NOTE(review): confirm where it is consumed before changing it.
#define MAX_EVENTS_DEFAULT 5000
// Static members shared by all IcingaDB objects: the environment ID (filled in once by
// InitEnvironmentId()) and the mutex guarding its initialization and persistence.
String IcingaDB::m_EnvironmentId;
std::mutex IcingaDB::m_EnvironmentIdInitMutex;
// Registers the IcingaDB type via the REGISTER_TYPE macro.
REGISTER_TYPE(IcingaDB);
/**
 * Constructs an IcingaDB object without any Redis connection; the connections are created in Start().
 */
IcingaDB::IcingaDB()
	: m_Rcon(nullptr)
{
	m_WorkQueue.SetName("IcingaDB");

	// No connection is published yet.
	m_RconLocked.store(nullptr);
}
/**
 * Validates the object's configuration.
 *
 * In addition to the generated checks, this rejects a username without a password and, when TLS
 * is enabled, a client certificate without its private key (or vice versa). It also makes sure
 * that the environment ID can be determined. Every failure is reported as a ValidationError.
 *
 * @param types Which attribute groups to validate; the checks here only run for FAConfig.
 * @param utils Validation helpers, forwarded to the base class.
 */
void IcingaDB::Validate(int types, const ValidationUtils& utils)
{
	ObjectImpl<IcingaDB>::Validate(types, utils);

	// Everything below only concerns the config attributes.
	if ((types & FAConfig) == 0) {
		return;
	}

	const bool usernameWithoutPassword = !GetUsername().IsEmpty() && GetPassword().IsEmpty();
	if (usernameWithoutPassword) {
		BOOST_THROW_EXCEPTION(ValidationError(this, std::vector<String>(),
			"Redis password must be set, if username is provided."));
	}

	// With TLS, a client certificate and its key must be given together (or not at all).
	const bool incompleteClientCert = GetEnableTls() && (GetCertPath().IsEmpty() != GetKeyPath().IsEmpty());
	if (incompleteClientCert) {
		BOOST_THROW_EXCEPTION(ValidationError(this, std::vector<String>(), "Either both a client certificate (cert_path) and its private key (key_path) or none of them must be given."));
	}

	// Surface environment ID problems (corrupt ID file, missing CA, ...) as validation errors.
	try {
		InitEnvironmentId();
	} catch (const std::exception& err) {
		BOOST_THROW_EXCEPTION(ValidationError(this, std::vector<String>(), err.what()));
	}
}
/**
* Starts the component.
*/
void IcingaDB::Start(bool runtimeCreated)
{
ObjectImpl<IcingaDB>::Start(runtimeCreated);
VERIFY(!m_EnvironmentId.IsEmpty());
PersistEnvironmentId();
Log(LogInformation, "IcingaDB")
<< "'" << GetName() << "' started.";
m_ConfigDumpInProgress = false;
m_ConfigDumpDone = false;
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
RedisConnInfo::ConstPtr connInfo = GetRedisConnInfo();
m_Rcon = new RedisConnection(connInfo);
m_RconLocked.store(m_Rcon);
m_RconWorker = new RedisConnection(connInfo, m_Rcon);
for (const auto& [type, _] : GetSyncableTypes()) {
auto ctype (dynamic_cast<ConfigType*>(type.get()));
if (!ctype)
continue;
RedisConnection::Ptr con = new RedisConnection(connInfo, m_Rcon);
con->SetConnectedCallback([this, con](boost::asio::yield_context&) {
con->SetConnectedCallback(nullptr);
size_t pending = --m_PendingRcons;
Log(LogDebug, "IcingaDB") << pending << " pending child connections remaining";
if (pending == 0) {
m_WorkQueue.Enqueue([this]() { OnConnectedHandler(); });
}
});
m_Rcons[ctype] = std::move(con);
}
m_PendingRcons = m_Rcons.size();
m_Rcon->SetConnectedCallback([this](boost::asio::yield_context&) {
m_Rcon->SetConnectedCallback(nullptr);
m_RconWorker->Start();
for (auto& kv : m_Rcons) {
kv.second->Start();
}
});
m_Rcon->Start();
m_StatsTimer = Timer::Create();
m_StatsTimer->SetInterval(1);
m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); });
m_StatsTimer->Start();
m_WorkQueue.SetName("IcingaDB");
Ptr keepAlive (this);
m_HistoryThread = std::async(std::launch::async, [this, keepAlive]() { ForwardHistoryEntries(); });
m_PendingItemsThread = std::thread([this, keepAlive] { PendingItemsThreadProc(); });
}
/**
 * Exception callback of the work queue: reports a failed Redis operation.
 *
 * A short critical message is always logged. The full diagnostic information is logged at debug level only.
 *
 * @param exp The exception that escaped a work queue task.
 */
void IcingaDB::ExceptionHandler(boost::exception_ptr exp)
{
	Log(LogCritical, "IcingaDB")
		<< "Exception during redis query. Verify that Redis is operational.";

	Log(LogDebug, "IcingaDB") << "Exception during redis operation: " << DiagnosticInformation(exp);
}
/**
 * Runs the initial config dump once all Redis child connections are established.
 *
 * Must run on the work queue; it is enqueued there by the last child connection's connected
 * callback in Start(). Does nothing if a dump is already running or has finished. Otherwise it:
 * - publishes the stats (now reporting the dump as in progress),
 * - dumps all config objects,
 * - marks the dump as done,
 * - wakes up the pending items worker.
 *
 * NOTE(review): if UpdateAllConfigObjects() throws, m_ConfigDumpInProgress stays true and the
 * pending items worker is not notified from here.
 */
void IcingaDB::OnConnectedHandler()
{
	AssertOnWorkQueue();

	const bool dumpRunningOrDone = m_ConfigDumpInProgress || m_ConfigDumpDone;
	if (dumpRunningOrDone) {
		return;
	}

	// Config dump
	m_ConfigDumpInProgress = true;
	PublishStats(); // announces config_dump_in_progress = true
	UpdateAllConfigObjects();

	m_ConfigDumpDone.store(true);
	m_ConfigDumpInProgress = false;

	// The pending items worker waits for the config dump to be done before processing pending items.
	// NOTE(review): the flag is set and the CV is notified without holding the CV's mutex. Confirm
	// that PendingItemsThreadProc() cannot miss this wake-up (e.g. it uses a timed wait).
	m_PendingItemsCV.notify_one();
}
/**
 * Callback of m_StatsTimer (connected in Start()); simply delegates to PublishStats().
 */
void IcingaDB::PublishStatsTimerHandler()
{
	PublishStats();
}
void IcingaDB::PublishStats()
{
if (!m_Rcon || !m_Rcon->IsConnected())
return;
Dictionary::Ptr status = GetStats();
status->Set("config_dump_in_progress", m_ConfigDumpInProgress);
status->Set("timestamp", TimestampToMilliseconds(Utility::GetTime()));
status->Set("icingadb_environment", m_EnvironmentId);
RedisConnection::Query query {"XADD", "icinga:stats", "MAXLEN", "1", "*"};
{
ObjectLock statusLock (status);
for (auto& kv : status) {
query.emplace_back(kv.first);
query.emplace_back(JsonEncode(kv.second));
}
}
m_Rcon->FireAndForgetQuery(std::move(query), {}, true /* high priority */);
}
/**
 * Stops the component.
 *
 * The steps, in order:
 * - wake up the pending items worker so it can exit,
 * - wait up to one minute for the history forwarding thread to flush its queued entries (logging
 *   the number of discarded queries on timeout),
 * - stop the stats timer,
 * - join the pending items thread,
 * - stop the base object.
 *
 * NOTE(review): per the std::async contract, the future in m_HistoryThread still blocks in its
 * destructor until the task has finished, even after the timeout here.
 *
 * @param runtimeRemoved Forwarded to ObjectImpl<IcingaDB>::Stop().
 */
void IcingaDB::Stop(bool runtimeRemoved)
{
	Log(LogInformation, "IcingaDB")
		<< "Flushing history data buffer to Redis.";

	// Wake up the pending items worker to let it exit cleanly.
	m_PendingItemsCV.notify_all();

	const auto flushTimeout = std::chrono::minutes(1);
	const bool flushTimedOut = m_HistoryThread.wait_for(flushTimeout) == std::future_status::timeout;

	if (flushTimedOut) {
		Log(LogCritical, "IcingaDB")
			<< "Flushing takes more than one minute (while we're about to shut down). Giving up and discarding "
			<< m_HistoryBulker.Size() << " queued history queries.";
	}

	m_StatsTimer->Stop(true);
	m_PendingItemsThread.join();

	Log(LogInformation, "IcingaDB")
		<< "'" << GetName() << "' stopped.";

	ObjectImpl<IcingaDB>::Stop(runtimeRemoved);
}
void IcingaDB::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<IcingaDB>::ValidateTlsProtocolmin(lvalue, utils);
try {
ResolveTlsProtocolVersion(lvalue());
} catch (const std::exception& ex) {
BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, ex.what()));
}
}
void IcingaDB::ValidateConnectTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<IcingaDB>::ValidateConnectTimeout(lvalue, utils);
if (lvalue() <= 0) {
BOOST_THROW_EXCEPTION(ValidationError(this, { "connect_timeout" }, "Value must be greater than 0."));
}
}
/**
 * Asserts (via ASSERT) that the calling thread is the worker thread of m_WorkQueue.
 *
 * Used as a sanity check by code that must only run on the work queue, such as OnConnectedHandler().
 */
void IcingaDB::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
/**
 * Forgets all IDs recorded by IsNew(), so every ID counts as new again. Guarded by m_Mutex.
 */
void IcingaDB::DumpedGlobals::Reset()
{
	std::unique_lock<std::mutex> guard (m_Mutex);
	m_Ids.clear();
}
/**
 * Returns the environment ID shared by all IcingaDB objects (as initialized by InitEnvironmentId()).
 */
String IcingaDB::GetEnvironmentId() const
{
	return IcingaDB::m_EnvironmentId;
}
/**
 * Records an ID and tells whether it was seen for the first time. Guarded by m_Mutex.
 *
 * @param id The ID to record.
 * @return true if id had not been recorded before (or since the last Reset()), false otherwise.
 */
bool IcingaDB::DumpedGlobals::IsNew(const String& id)
{
	std::unique_lock<std::mutex> guard (m_Mutex);

	const bool inserted = m_Ids.emplace(id).second;
	return inserted;
}
/**
* Initializes the m_EnvironmentId attribute or throws an exception on failure to do so. Can be called concurrently.
*/
void IcingaDB::InitEnvironmentId()
{
// Initialize m_EnvironmentId once across all IcingaDB objects. In theory, this could be done using
// std::call_once, however, due to a bug in libstdc++ (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=66146),
// this can result in a deadlock when an exception is thrown (which is explicitly allowed by the standard).
std::unique_lock<std::mutex> lock (m_EnvironmentIdInitMutex);
if (m_EnvironmentId.IsEmpty()) {
String path = Configuration::DataDir + "/icingadb.env";
String envId;
if (Utility::PathExists(path)) {
envId = Utility::LoadJsonFile(path);
if (envId.GetLength() != 2*SHA_DIGEST_LENGTH) {
throw std::runtime_error("environment ID stored at " + path + " is corrupt: wrong length.");
}
for (unsigned char c : envId) {
if (!std::isxdigit(c)) {
throw std::runtime_error("environment ID stored at " + path + " is corrupt: invalid hex string.");
}
}
} else {
String caPath = ApiListener::GetDefaultCaPath();
if (!Utility::PathExists(caPath)) {
throw std::runtime_error("Cannot find the CA certificate at '" + caPath + "'. "
"Please ensure the ApiListener is enabled first using 'icinga2 api setup'.");
}
std::shared_ptr<X509> cert = GetX509Certificate(caPath);
unsigned int n;
unsigned char digest[EVP_MAX_MD_SIZE];
if (X509_pubkey_digest(cert.get(), EVP_sha1(), digest, &n) != 1) {
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("X509_pubkey_digest")
<< errinfo_openssl_error(ERR_peek_error()));
}
envId = BinaryToHex(digest, n);
}
m_EnvironmentId = envId.ToLower();
}
}
/**
* Ensures that the environment ID is persisted on disk or throws an exception on failure to do so.
* Can be called concurrently.
*/
void IcingaDB::PersistEnvironmentId()
{
String path = Configuration::DataDir + "/icingadb.env";
std::unique_lock<std::mutex> lock (m_EnvironmentIdInitMutex);
if (!Utility::PathExists(path)) {
Utility::SaveJsonFile(path, 0600, m_EnvironmentId);
}
}
/**
* Constructs a RedisConnInfo object from the IcingaDB configuration.
*
* @return The RedisConnInfo object
*/
/**
 * Constructs a RedisConnInfo object from the IcingaDB configuration.
 *
 * Copies the following attributes of this object into a fresh RedisConnInfo:
 * - the connection target (host, port, path, database index),
 * - the credentials,
 * - the TLS settings,
 * - the connect timeout,
 * - the debug info.
 *
 * @return The RedisConnInfo object
 */
RedisConnInfo::ConstPtr IcingaDB::GetRedisConnInfo() const
{
	RedisConnInfo::Ptr info = new RedisConnInfo();

	// Where to connect to.
	info->Host = GetHost();
	info->Port = GetPort();
	info->Path = GetPath();
	info->DbIndex = GetDbIndex();

	// Authentication.
	info->User = GetUsername();
	info->Password = GetPassword();

	// TLS.
	info->EnableTls = GetEnableTls();
	info->TlsCertPath = GetCertPath();
	info->TlsKeyPath = GetKeyPath();
	info->TlsCaPath = GetCaPath();
	info->TlsCrlPath = GetCrlPath();
	info->TlsCipherList = GetCipherList();
	info->TlsProtocolMin = GetTlsProtocolmin();
	info->TlsInsecureNoverify = GetInsecureNoverify();

	// Miscellaneous.
	info->ConnectTimeout = GetConnectTimeout();
	info->DbgInfo = GetDebugInfo();

	return info;
}