From eab507f89230166ddc9b4d018bbb2a51d65d8020 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 30 Oct 2025 17:22:07 +0100 Subject: [PATCH] IcingaDB: track object changesets efficiently --- lib/icinga/dependency-group.cpp | 16 + lib/icinga/dependency.hpp | 1 + lib/icingadb/CMakeLists.txt | 2 +- lib/icingadb/icingadb-utility.cpp | 72 +++-- lib/icingadb/icingadb-worker.cpp | 489 ++++++++++++++++++++++++++++++ lib/icingadb/icingadb.cpp | 10 +- lib/icingadb/icingadb.hpp | 182 ++++++++++- lib/icingadb/redisconnection.hpp | 2 +- 8 files changed, 746 insertions(+), 28 deletions(-) create mode 100644 lib/icingadb/icingadb-worker.cpp diff --git a/lib/icinga/dependency-group.cpp b/lib/icinga/dependency-group.cpp index ff93bfd44..880cea344 100644 --- a/lib/icinga/dependency-group.cpp +++ b/lib/icinga/dependency-group.cpp @@ -144,6 +144,22 @@ void DependencyGroup::LoadParents(std::set& parents) const } } +/** + * Retrieve any child Checkable from the current dependency group. + * + * @return - Returns the first child Checkable found in this group, or nullptr if there are no children. + */ +Checkable::Ptr DependencyGroup::GetAnyChild() const +{ + std::lock_guard lock(m_Mutex); + for (auto& [_, children] : m_Members) { + if (!children.empty()) { + return children.begin()->second->GetChild(); + } + } + return nullptr; +} + /** * Retrieve the number of dependency objects in the current dependency group. * diff --git a/lib/icinga/dependency.hpp b/lib/icinga/dependency.hpp index 7ef1866dd..28d148e3e 100644 --- a/lib/icinga/dependency.hpp +++ b/lib/icinga/dependency.hpp @@ -164,6 +164,7 @@ public: void RemoveDependency(const Dependency::Ptr& dependency); std::vector GetDependenciesForChild(const Checkable* child) const; void LoadParents(std::set& parents) const; + Checkable::Ptr GetAnyChild() const; size_t GetDependenciesCount() const; void SetIcingaDBIdentifier(const String& identifier); diff --git a/lib/icingadb/CMakeLists.txt b/lib/icingadb/CMakeLists.txt index 04cf6a92d..49ac16b89 100644 --- a/lib/icingadb/CMakeLists.txt +++ b/lib/icingadb/CMakeLists.txt @@ -6,7 +6,7 @@ mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp) mkembedconfig_target(icingadb-itl.conf icingadb-itl.cpp) set(icingadb_SOURCES - icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp + icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp icingadb-worker.cpp redisconnection.cpp icingadb-ti.hpp icingadbchecktask.cpp icingadb-itl.cpp ) diff --git a/lib/icingadb/icingadb-utility.cpp b/lib/icingadb/icingadb-utility.cpp index c588e93bf..74dc9afe7 100644 --- a/lib/icingadb/icingadb-utility.cpp +++ b/lib/icingadb/icingadb-utility.cpp @@ -26,6 +26,18 @@ using namespace icinga; +/** + * Checks if the given Redis key is a state key (ends with ":state"). + * + * @param key The Redis key to check. + * + * @return true if the key is a state key, false otherwise. + */ +bool IcingaDB::IsStateKey(const RedisConnection::QueryArg& key) +{ + return boost::algorithm::ends_with(static_cast(key), ":state"); +} + String IcingaDB::FormatCheckSumBinary(const String& str) { char output[20*2+1]; @@ -170,30 +182,8 @@ Dictionary::Ptr IcingaDB::SerializeVars(const Dictionary::Ptr& vars) */ Dictionary::Ptr IcingaDB::SerializeDependencyEdgeState(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep) { - String edgeStateId; - // The edge state ID is computed a bit differently depending on whether this is for a redundancy group or not. - // For redundancy groups, the state ID is supposed to represent the connection state between the redundancy group - // and the parent Checkable of the given dependency. Hence, the outcome will always be different for each parent - // Checkable of the redundancy group. - if (dependencyGroup->IsRedundancyGroup()) { - edgeStateId = HashValue(new Array{ - dependencyGroup->GetIcingaDBIdentifier(), - GetObjectIdentifier(dep->GetParent()), - }); - } else if (dependencyGroup->GetIcingaDBIdentifier().IsEmpty()) { - // For non-redundant dependency groups, on the other hand, all dependency objects within that group will - // always have the same parent Checkable. Likewise, the state ID will be always the same as well it doesn't - // matter which dependency object is used to compute it. Therefore, it's sufficient to compute it only once - // and all the other dependency objects can reuse the cached state ID. - edgeStateId = HashValue(new Array{dependencyGroup->GetCompositeKey(), GetObjectIdentifier(dep->GetParent())}); - dependencyGroup->SetIcingaDBIdentifier(edgeStateId); - } else { - // Use the already computed state ID for the dependency group. - edgeStateId = dependencyGroup->GetIcingaDBIdentifier(); - } - return new Dictionary{ - {"id", std::move(edgeStateId)}, + {"id", GetDependencyEdgeStateId(dependencyGroup, dep)}, {"environment_id", m_EnvironmentId}, {"failed", !dep->IsAvailable(DependencyState) || !dep->GetParent()->IsReachable()} }; @@ -220,6 +210,42 @@ Dictionary::Ptr IcingaDB::SerializeRedundancyGroupState(const Checkable::Ptr& ch }; } +/** + * Computes the dependency edge state ID for the given dependency object. + * + * The edge state ID is computed a bit differently depending on whether this is for a redundancy group or not. + * For redundancy groups, the state ID is supposed to represent the connection state between the redundancy group + * and the parent Checkable of the given dependency. Hence, the outcome will always be different for each parent + * Checkable of the redundancy group. + * + * For non-redundant dependency groups, on the other hand, all dependency objects within that group will + * always have the same parent Checkable. Likewise, the state ID will be always the same as well it doesn't + * matter which dependency object is used to compute it. Therefore, it's sufficient to compute it only once + * and all the other dependency objects can reuse the cached state ID. Thus, this function will cache the just + * computed state ID in the dependency group object itself for later reuse. + * + * @param dependencyGroup The dependency group the dependency is part of. + * @param dep The dependency object to compute the state ID for. + * + * @return The computed edge state ID. + */ +String IcingaDB::GetDependencyEdgeStateId(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep) +{ + if (dependencyGroup->IsRedundancyGroup()) { + return HashValue(new Array{ + dependencyGroup->GetIcingaDBIdentifier(), + GetObjectIdentifier(dep->GetParent()), + }); + } + if (dependencyGroup->GetIcingaDBIdentifier().IsEmpty()) { + auto edgeStateId = HashValue(new Array{dependencyGroup->GetCompositeKey(), GetObjectIdentifier(dep->GetParent())}); + dependencyGroup->SetIcingaDBIdentifier(edgeStateId); + return edgeStateId; + } + // Use the already computed state ID for the dependency group. + return dependencyGroup->GetIcingaDBIdentifier(); +} + /** * Converts the given filter to its Redis value representation. * diff --git a/lib/icingadb/icingadb-worker.cpp b/lib/icingadb/icingadb-worker.cpp new file mode 100644 index 000000000..6c1323d4d --- /dev/null +++ b/lib/icingadb/icingadb-worker.cpp @@ -0,0 +1,489 @@ +// SPDX-FileCopyrightText: 2025 Icinga GmbH +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "icingadb/icingadb.hpp" +#include "base/logger.hpp" +#include + +using namespace icinga; + +PendingQueueItem::PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits) + : DirtyBits{dirtyBits & DirtyBitsAll}, ID{std::move(id)}, EnqueueTime{std::chrono::steady_clock::now()} +{ +} + +PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits) + : PendingQueueItem{std::make_pair(obj, nullptr), bits}, Object{obj} +{ +} +PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup) + : PendingQueueItem{std::make_pair(nullptr, depGroup), 0}, DepGroup{depGroup} +{ +} + +PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child) + : PendingQueueItem{std::make_pair(child, depGroup), 0}, DepGroup{depGroup}, Child{child} +{ +} + +RelationsDeletionItem::RelationsDeletionItem(const String& id, RelationsKeySet relations) + : PendingQueueItem{id, 0}, Relations{std::move(relations)} +{ +} + +/** + * Background worker thread procedure for processing pending items. + * + * This function runs in a separate thread and continuously processes pending items that have been + * enqueued for Redis updates. It waits for new items to be added to the pending items container, + * and processes them one at a time, ensuring that the Redis connection is active and not overloaded + * with too many pending queries. The function also implements a delay mechanism to allow for potential + * additional changes to be merged into the same item before processing it. + */ +void IcingaDB::PendingItemsThreadProc() +{ + using namespace std::chrono_literals; + namespace ch = std::chrono; + + // Limits the number of pending queries the Rcon can have at any given time to reduce the memory overhead to + // the absolute minimum necessary, since the size of the pending queue items is much smaller than the size + // of the actual Redis queries. Thus, this will slow down the worker thread a bit from generating too many + // Redis queries when the Redis connection is saturated. + constexpr std::size_t maxPendingQueries = 128; + + std::unique_lock lock(m_PendingItemsMutex); + // Wait until the initial config dump is done. IcingaDB::OnConnectedHandler will notify us once it's finished. + while (GetActive() && !m_ConfigDumpDone) m_PendingItemsCV.wait(lock); + + // Predicate to determine whether the worker thread is allowed to process pending items. + auto canContinue = [this] { + if (!GetActive()) { + return true; + } + return !m_PendingItems.empty() && m_RconWorker && m_RconWorker->IsConnected() && m_RconWorker->GetPendingQueryCount() < maxPendingQueries; + }; + + while (true) { + // Even if someone notifies us, we still need to verify whether the precondition is actually fulfilled. + // However, in case we don't receive any notification, we still want to wake up periodically on our own + // to check whether we can proceed (e.g. the Redis connection might have become available again and there + // was no activity on the pending items queue to trigger a notification). Thus, we use a timed wait here. + while (!canContinue()) m_PendingItemsCV.wait_for(lock, 100ms); + + if (!GetActive()) { + break; + } + if (auto retryAfter = DequeueAndProcessOne(lock); retryAfter > 0ms) { + m_PendingItemsCV.wait_for(lock, retryAfter); + } + } +} + +/** + * Dequeue and process a single pending item. + * + * This function processes a single pending item from the pending items container. It iterates over + * the items in insertion order and checks if the first item is old enough to be processed (at least + * 1000ms old) unless we're being shutting down. If the item can be processed, it attempts to acquire + * a lock on the associated config object (if applicable) and processes the item accordingly. + * + * If the item cannot be processed right now because it's too new, the function returns a duration + * indicating how long to wait before retrying. Also, if no progress was made during this iteration + * (i.e., no item was processed), it returns a short delay to avoid busy-looping. + * + * @param lock A unique lock on the pending items mutex (must be acquired before calling this function). + * + * @return A duration indicating how long to wait before retrying. + */ +std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lock& lock) +{ + using namespace std::chrono_literals; + namespace ch = std::chrono; + + bool madeProgress = false; // Did we make any progress in this iteration? + ch::duration retryAfter{0}; // If we can't process anything right now, how long to wait before retrying? + auto now = ch::steady_clock::now(); + + auto& seqView = m_PendingItems.get<1>(); + for (auto it(seqView.begin()); it != seqView.end(); ++it) { + if (it != seqView.begin()) { + if (std::holds_alternative(*it)) { + // We don't know whether the previous items are related to this deletion item or not, + // thus we can't just process this right now when there are older items in the queue. + // Otherwise, we might delete something that is going to be updated/created. + break; + } + } + + auto age = now - std::visit([](const auto& item) { return item.EnqueueTime; }, *it); + if (GetActive() && 1000ms > age) { + if (it == seqView.begin()) { + retryAfter = 1000ms - age; + } + break; + } + + ConfigObject::Ptr cobj; + if (auto* citem = std::get_if(&*it); citem) { + cobj = citem->Object; + } + + ObjectLock olock(cobj, std::defer_lock); + if (cobj && !olock.TryLock()) { + continue; // Can't lock the object right now, try the next one. + } + + PendingItemVariant itemToProcess = *it; + seqView.erase(it); + madeProgress = true; + + lock.unlock(); + try { + std::visit([this](const auto& item) { ProcessPendingItem(item); }, itemToProcess); + } catch (const std::exception& ex) { + Log(LogCritical, "IcingaDB") + << "Exception while processing pending item of type index '" << itemToProcess.index() << "': " + << DiagnosticInformation(ex, GetActive()); + } + lock.lock(); + break; + } + + if (!madeProgress && retryAfter == 0ms) { + // We haven't made any progress, so give it a short delay before retrying. + retryAfter = 10ms; + } + return retryAfter; +} + +/** + * Process a single pending object. + * + * This function processes a single pending object based on its dirty bits. It checks if the object is a + * @c ConfigObject and performs the appropriate actions such as sending configuration updates, state updates, + * or deletions to the Redis connection. The function handles different types of objects, including @c Checkable + * objects, and ensures that the correct updates are sent based on the dirty bits set for the object. + * + * @param item The pending item containing the object and its dirty bits. + */ +void IcingaDB::ProcessPendingItem(const PendingConfigItem& item) +{ + if (item.DirtyBits & ConfigDelete) { + auto redisKeyPair = GetSyncableTypeRedisKeys(item.Object->GetReflectionType()); + m_RconWorker->FireAndForgetQueries( + { + {"HDEL", redisKeyPair.ObjectKey, GetObjectIdentifier(item.Object)}, + {"HDEL", redisKeyPair.ChecksumKey, GetObjectIdentifier(item.Object)}, + { + "XADD", + "icinga:runtime", + "MAXLEN", + "~", + "1000000", + "*", + "redis_key", + redisKeyPair.ObjectKey, + "id", + GetObjectIdentifier(item.Object), + "runtime_type", + "delete" + } + }, + RedisConnection::QueryPriority::Config + ); + } + + if (item.DirtyBits & ConfigUpdate) { + std::map hMSets; + std::vector runtimeUpdates; + CreateConfigUpdate(item.Object, GetSyncableTypeRedisKeys(item.Object->GetReflectionType()), hMSets, runtimeUpdates, true); + ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); + } + + if (auto checkable = dynamic_pointer_cast(item.Object); checkable) { + if (item.DirtyBits & FullState) { + UpdateState(checkable, item.DirtyBits); + } + if (item.DirtyBits & NextUpdate) { + SendNextUpdate(checkable); + } + } +} + +/** + * Process a single pending dependency group state item. + * + * This function processes a single pending dependency group state item by updating the dependencies + * state for the associated dependency group. It selects any child checkable from the dependency group + * to initiate the state update process. + * + * @param item The pending dependency group state item containing the dependency group. + */ +void IcingaDB::ProcessPendingItem(const PendingDependencyGroupStateItem& item) const +{ + // For dependency group state updates, we don't actually care which child triggered the update, + // since all children share the same dependency group state. Thus, we can just pick any child to + // start the update from. + if (auto child = item.DepGroup->GetAnyChild(); child) { + UpdateDependenciesState(child, item.DepGroup); + } +} + +/** + * Process a single pending dependency edge item. + * + * This function fully serializes a single pending dependency edge item (child registration) + * and sends all the resulting Redis queries in a single transaction. The dependencies (edges) + * to serialize are determined by the dependency group and child checkable the provided item represents. + * + * @param item The pending dependency edge item containing the dependency group and child checkable. + */ +void IcingaDB::ProcessPendingItem(const PendingDependencyEdgeItem& item) +{ + std::vector runtimeUpdates; + std::map hMSets; + InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup); + ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); +} + +/** + * Process a single pending deletion item. + * + * This function processes a single pending deletion item by deleting the specified sub-keys + * from Redis based on the provided deletion keys map. It ensures that the object's ID is + * removed from the specified Redis keys and their corresponding checksum keys if indicated. + * + * @param item The pending deletion item containing the ID and deletion keys map. + */ +void IcingaDB::ProcessPendingItem(const RelationsDeletionItem& item) +{ + ASSERT(std::holds_alternative(item.ID)); // Relation deletion items must have real IDs. + + auto id = std::get(item.ID); + for (auto [configKey, checksumKey] : item.Relations) { + if (IsStateKey(configKey)) { + DeleteState(id, configKey, checksumKey); + } else { + DeleteRelationship(id, configKey, checksumKey); + } + } +} + +/** + * Enqueue a configuration object for processing in the pending objects thread. + * + * @param object The configuration object to be enqueued for processing. + * @param bits The dirty bits indicating the type of changes to be processed for the object. + */ +void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits) +{ + if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { + return; // No need to enqueue anything if we're not connected. + } + + { + std::lock_guard lock(m_PendingItemsMutex); + if (auto [it, inserted] = m_PendingItems.insert(PendingConfigItem{object, bits}); !inserted) { + m_PendingItems.modify(it, [bits](PendingItemVariant& itemToProcess) { + std::visit( + [&bits](auto& item) { + if (bits & ConfigDelete) { + item.DirtyBits &= ~(ConfigUpdate | FullState); + } else if (bits & ConfigUpdate) { + item.DirtyBits &= ~ConfigDelete; + } + item.DirtyBits |= bits & DirtyBitsAll; + }, + itemToProcess + ); + }); + } + } + m_PendingItemsCV.notify_one(); +} + +void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup) +{ + if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { + { + std::lock_guard lock(m_PendingItemsMutex); + m_PendingItems.insert(PendingDependencyGroupStateItem{depGroup}); + } + m_PendingItemsCV.notify_one(); + } +} + +/** + * Enqueue the registration of a dependency child to a dependency group. + * + * This function adds a pending item to the queue for processing the registration of a child checkable + * to a dependency group. If there is no active Redis connection available, this function is a no-op. + * + * @param depGroup The dependency group to which the child is being registered. + * @param child The child checkable being registered to the dependency group. + */ +void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child) +{ + if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { + { + std::lock_guard lock(m_PendingItemsMutex); + m_PendingItems.insert(PendingDependencyEdgeItem{depGroup, child}); + } + m_PendingItemsCV.notify_one(); + } +} + +/** + * Enqueue the removal of a dependency child from a dependency group. + * + * This function handles the removal of a child checkable from a dependency group by first checking if there + * are any pending registration items for the same child and dependency group. If such an item exists, it is + * removed from the pending items queue, effectively canceling the registration. If there is also a pending + * dependency group state update triggered by the same child, it is either removed or updated to use a different + * child if the group is not being removed entirely. If no pending registration exists, the function proceeds + * to enqueue the necessary deletions in Redis for the dependencies and related nodes and edges. + * + * @param depGroup The dependency group from which the child is being removed. + * @param dependencies The list of dependencies associated with the child being removed. + * @param removeGroup A flag indicating whether the entire dependency group should be removed. + */ +void IcingaDB::EnqueueDependencyChildRemoved( + const DependencyGroup::Ptr& depGroup, + const std::vector& dependencies, + bool removeGroup +) +{ + if (dependencies.empty() || !GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { + return; // No need to enqueue anything if we're not connected or there are no dependencies. + } + + Checkable::Ptr child(dependencies.front()->GetChild()); + bool cancelledRegistration = false; + + { + std::lock_guard lock(m_PendingItemsMutex); + if (auto it(m_PendingItems.find(std::make_pair(child, depGroup))); it != m_PendingItems.end()) { + cancelledRegistration = true; + m_PendingItems.erase(it); + if (removeGroup) { + // If we're removing the entire group registration, we can also drop any pending dependency group + // state update triggered previously as it should no longer have any children left. + m_PendingItems.erase(std::make_pair(nullptr, depGroup)); + } + } + } + + if (!child->HasAnyDependencies()) { + // If the child Checkable has no parent and reverse dependencies, we can safely remove the dependency node. + // This might be a no-op in some cases (e.g. if the child's only dependency was the one that we just canceled + // above), but since we can't reliably determine whether the node exists in Redis or not, we just enqueue the + // deletion anyway. + EnqueueRelationsDeletion(GetObjectIdentifier(child), {{CONFIG_REDIS_KEY_PREFIX "dependency:node", ""}}); + } + + if (cancelledRegistration && depGroup->GetIcingaDBIdentifier().IsEmpty()) { + // If we had a pending registration that we just canceled above, and the dependency group has no + // IcingaDB identifier yet, then there's no need to proceed with any deletions, as the dependency + // group was never serialized to Redis in the first place. + return; + } + + if (depGroup->GetIcingaDBIdentifier().IsEmpty()) { + // An empty IcingaDB identifier indicates that the worker thread has just picked up the registration of the + // first child (removed from the pending items queue) but hasn't yet entered the InsertCheckableDependencies() + // function to actually fill in the IcingaDB identifier. Thus, we need to generate and set it here to ensure + // that the relation deletions below use the correct identifier. + if (depGroup->IsRedundancyGroup()) { + // Keep this with IcingaDB::InsertCheckableDependencies in sync! + depGroup->SetIcingaDBIdentifier(HashValue(new Array{m_EnvironmentId, depGroup->GetCompositeKey()})); + } else { + // This will set the IcingaDB identifier of the dependency group as a side effect. + (void)GetDependencyEdgeStateId(depGroup, dependencies.front()); + } + } + + std::set detachedParents; + for (const auto& dependency : dependencies) { + const auto& parent(dependency->GetParent()); + if (auto [_, inserted] = detachedParents.insert(dependency->GetParent().get()); inserted) { + String edgeId; + if (depGroup->IsRedundancyGroup()) { + // If the redundancy group has no members left, it's going to be removed as well, so we need to + // delete dependency edges from that group to the parent Checkables. + if (removeGroup) { + EnqueueRelationsDeletion( + GetDependencyEdgeStateId(depGroup, dependency), + { + {CONFIG_REDIS_KEY_PREFIX "dependency:edge", ""}, + {CONFIG_REDIS_KEY_PREFIX "dependency:edge:state", ""}, + } + ); + } + + // Remove the connection from the child Checkable to the redundancy group. + edgeId = HashValue(new Array{GetObjectIdentifier(child), depGroup->GetIcingaDBIdentifier()}); + } else { + // Remove the edge between the parent and child Checkable linked through the removed dependency. + edgeId = HashValue(new Array{GetObjectIdentifier(child), GetObjectIdentifier(parent)}); + } + + EnqueueRelationsDeletion(edgeId, {{CONFIG_REDIS_KEY_PREFIX "dependency:edge", ""}}); + + // The total_children and affects_children columns might now have different outcome, so update the parent + // Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's + // not worth traversing the whole tree way up and sending config updates for each one of them, as the next + // Redis config dump is going to fix it anyway. + EnqueueConfigObject(parent, ConfigUpdate); + + if (!parent->HasAnyDependencies()) { + // If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry. + EnqueueRelationsDeletion(GetObjectIdentifier(parent), {{CONFIG_REDIS_KEY_PREFIX "dependency:node", ""}}); + } + } + } + + if (removeGroup && depGroup->IsRedundancyGroup()) { + EnqueueRelationsDeletion( + depGroup->GetIcingaDBIdentifier(), + { + {CONFIG_REDIS_KEY_PREFIX "dependency:node", ""}, + {CONFIG_REDIS_KEY_PREFIX "redundancygroup", ""}, + {CONFIG_REDIS_KEY_PREFIX "redundancygroup:state", ""}, + {CONFIG_REDIS_KEY_PREFIX "dependency:edge:state", ""} + } + ); + } else if (removeGroup) { + // Note: The Icinga DB identifier of a non-redundant dependency group is used as the edge state ID + // and shared by all of its dependency objects. See also SerializeDependencyEdgeState() for details. + EnqueueRelationsDeletion(depGroup->GetIcingaDBIdentifier(), {{CONFIG_REDIS_KEY_PREFIX "dependency:edge:state", ""}}); + } +} + +/** + * Enqueue a relation deletion for processing in the pending objects thread. + * + * This function adds a relation deletion item to the set of pending items to be processed by the + * pending items worker thread. The relation deletion item contains the ID of the relation to be + * deleted and a map of Redis keys from which to delete the relation. If the relation deletion item + * is already in the set, it updates the deletion keys accordingly. + * + * @param id The ID of the relation to be deleted. + * @param relations A map of Redis keys from which to delete the relation. + */ +void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeySet& relations) +{ + if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { + return; // No need to enqueue anything if we're not connected. + } + + { + std::lock_guard lock(m_PendingItemsMutex); + if (auto [it, inserted] = m_PendingItems.insert(RelationsDeletionItem{id, relations}); !inserted) { + m_PendingItems.modify(it, [&relations](PendingItemVariant& val) { + auto& item = std::get(val); + item.Relations.insert(relations.begin(), relations.end()); + }); + } + } + m_PendingItemsCV.notify_one(); +} diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index 26014d966..5f37a965f 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -131,6 +131,7 @@ void IcingaDB::Start(bool runtimeCreated) Ptr keepAlive (this); m_HistoryThread = std::async(std::launch::async, [this, keepAlive]() { ForwardHistoryEntries(); }); + m_PendingItemsThread = std::thread([this, keepAlive] { PendingItemsThreadProc(); }); } void IcingaDB::ExceptionHandler(boost::exception_ptr exp) @@ -154,9 +155,11 @@ void IcingaDB::OnConnectedHandler() UpdateAllConfigObjects(); - m_ConfigDumpDone = true; - + m_ConfigDumpDone.store(true); m_ConfigDumpInProgress = false; + // Notify the pending items worker to let it know that the config dump is done, + // and it can start processing pending items. + m_PendingItemsCV.notify_one(); } void IcingaDB::PublishStatsTimerHandler(void) @@ -192,6 +195,8 @@ void IcingaDB::Stop(bool runtimeRemoved) Log(LogInformation, "IcingaDB") << "Flushing history data buffer to Redis."; + m_PendingItemsCV.notify_all(); // Wake up the pending items worker to let it exit cleanly. + if (m_HistoryThread.wait_for(std::chrono::minutes(1)) == std::future_status::timeout) { Log(LogCritical, "IcingaDB") << "Flushing takes more than one minute (while we're about to shut down). Giving up and discarding " @@ -199,6 +204,7 @@ void IcingaDB::Stop(bool runtimeRemoved) } m_StatsTimer->Stop(true); + m_PendingItemsThread.join(); Log(LogInformation, "IcingaDB") << "'" << GetName() << "' stopped."; diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index d47935fd8..cf126b2df 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -16,6 +16,9 @@ #include "icinga/service.hpp" #include "icinga/downtime.hpp" #include "remote/messageorigin.hpp" +#include +#include +#include #include #include #include @@ -32,6 +35,132 @@ namespace icinga #define CONFIG_REDIS_KEY_PREFIX "icinga:" #define CHECKSUM_REDIS_KEY_PREFIX CONFIG_REDIS_KEY_PREFIX "checksum:" +/** + * Dirty bits for config/state changes. + * + * These are used to mark objects as "dirty" in order to trigger appropriate updates in Redis. + * Each bit represents a different type of change that requires a specific action to be taken. + * + * @ingroup icingadb + */ +enum DirtyBits : uint32_t +{ + ConfigUpdate = 1<<0, // Trigger a Redis config update for the object. + ConfigDelete = 1<<1, // Send a deletion command for the object to Redis. + VolatileState = 1<<2, // Send a volatile state update to Redis (affects only checkables). + RuntimeState = 1<<3, // Send a runtime state update to Redis (affects only checkables). + NextUpdate = 1<<4, // Update the `icinga:nextupdate:{host,service}` Redis keys (affects only checkables). + + FullState = VolatileState | RuntimeState, // A combination of all (non-dependency) state-related dirty bits. + + // All valid dirty bits combined used for masking input values. + DirtyBitsAll = ConfigUpdate | ConfigDelete | FullState | NextUpdate +}; + +/** + * A variant type representing the identifier of a pending item. + * + * This variant can hold either a string representing a real Redis hash key or a pair consisting of + * a configuration object pointer and a dependency group pointer. A pending item identified by the + * latter variant type operates primarily on the associated configuration object or dependency group, + * thus the pairs are used for uniqueness in the pending items container. + * + * @ingroup icingadb + */ +using PendingItemKey = std::variant>; + +/** + * A pending queue item. + * + * This struct represents a generic pending item in the queue that is associated with a unique identifier + * and dirty bits indicating the type of updates required in Redis. The @c EnqueueTime field records the + * time when the item was added to the queue, which can be useful for tracking how long an item waits before + * being processed. This base struct is extended by more specific pending item types that operate on different + * kinds of objects, such as configuration objects or dependency groups. + * + * @ingroup icingadb + */ +struct PendingQueueItem +{ + uint32_t DirtyBits; + PendingItemKey ID; + const std::chrono::steady_clock::time_point EnqueueTime; + + PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits); +}; + +/** + * A pending configuration object item. + * + * This struct represents a pending item in the queue that is associated with a configuration object. + * It contains a pointer to the configuration object and the dirty bits indicating the type of updates + * required for that object in Redis. A pending configuration item operates primarily on config objects, + * thus the @c ID field in the base struct is only used for uniqueness in the pending items container. + * + * @ingroup icingadb + */ +struct PendingConfigItem : PendingQueueItem +{ + ConfigObject::Ptr Object; + + PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits); +}; + +/** + * A pending dependency group state item. + * + * This struct represents a pending item in the queue that is associated with a dependency group. + * It contains a pointer to the dependency group for which state updates are required. The dirty bits + * in the base struct are not used for this item type, as the operation is specific to updating the + * state of the dependency group itself. + * + * @ingroup icingadb + */ +struct PendingDependencyGroupStateItem : PendingQueueItem +{ + DependencyGroup::Ptr DepGroup; + + explicit PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup); +}; + +/** + * A pending dependency edge item. + * + * This struct represents a pending dependency child registration into a dependency group. + * It contains a pointer to the dependency group and the checkable child being registered. + * The dirty bits in the base struct are not used for this item type, as the operation is specific + * to registering the child into the dependency group. + * + * @ingroup icingadb + */ +struct PendingDependencyEdgeItem : PendingQueueItem +{ + DependencyGroup::Ptr DepGroup; + Checkable::Ptr Child; + + PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); +}; + +// Set of Redis keys from which to delete a relation, along with their checksums (if any). +using RelationsKeySet = std::set>; + +/** + * A pending relations deletion item. + * + * This struct represents a pending item in the queue that is associated with the deletion of relations + * in Redis. It contains a map of Redis keys from which the relation identified by the @c ID field should + * be deleted. The @c ID field represents the unique identifier of the relation to be deleted, and the + * @c Relations map specifies the Redis keys and whether to delete the corresponding checksum keys. + * + * @ingroup icingadb + */ +struct RelationsDeletionItem : PendingQueueItem +{ + RelationsKeySet Relations; + + RelationsDeletionItem(const String& id, RelationsKeySet relations); +}; + /** * @ingroup icingadb */ @@ -174,6 +303,7 @@ private: static void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector& keys, RedisConnection::QueryPriority priority); static std::vector GetTypeOverwriteKeys(const Type::Ptr& type, bool skipChecksums = false); static void AddDataToHmSets(std::map& hMSets, const RedisConnection::QueryArg& redisKey, const String& id, const Dictionary::Ptr& data); + static bool IsStateKey(const RedisConnection::QueryArg& key); static String FormatCheckSumBinary(const String& str); static String FormatCommandLine(const Value& commandLine); static QueryArgPair GetCheckableStateKeys(const Type::Ptr& type); @@ -192,6 +322,7 @@ private: static Dictionary::Ptr SerializeVars(const Dictionary::Ptr& vars); static Dictionary::Ptr SerializeDependencyEdgeState(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep); static Dictionary::Ptr SerializeRedundancyGroupState(const Checkable::Ptr& child, const DependencyGroup::Ptr& redundancyGroup); + static String GetDependencyEdgeStateId(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep); static String HashValue(const Value& value); static String HashValue(const Value& value, const std::set& propertiesBlacklist, bool propertiesWhitelist = false); @@ -253,7 +384,7 @@ private: Bulker m_HistoryBulker {4096, std::chrono::milliseconds(250)}; bool m_ConfigDumpInProgress; - bool m_ConfigDumpDone; + std::atomic_bool m_ConfigDumpDone{false}; /** * The primary Redis connection used to send history and heartbeat queries. @@ -290,6 +421,55 @@ private: // initialization, the value is read-only and can be accessed without further synchronization. static String m_EnvironmentId; static std::mutex m_EnvironmentIdInitMutex; + + // A variant type that can hold any of the pending item types used in the pending items container. + using PendingItemVariant = std::variant< + PendingConfigItem, + PendingDependencyGroupStateItem, + PendingDependencyEdgeItem, + RelationsDeletionItem + >; + + struct PendingItemKeyExtractor + { + // The type of the key extracted from a pending item required by Boost.MultiIndex. + using result_type = const PendingItemKey&; + + result_type operator()(const PendingItemVariant& item) const + { + return std::visit([](const auto& pendingItem) -> result_type { return pendingItem.ID; }, item); + } + }; + + // A multi-index container for managing pending items with unique IDs and maintaining insertion order. + // The first index is an ordered unique index based on the pending item key, allowing for efficient + // lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the + // order of insertion, enabling FIFO processing of pending items. + using PendingItemsSet = boost::multi_index_container< + PendingItemVariant, + boost::multi_index::indexed_by< + boost::multi_index::ordered_unique, // std::variant has operator< defined. + boost::multi_index::sequenced<> + > + >; + + std::thread m_PendingItemsThread; // The background worker thread (consumer of m_PendingItems). + PendingItemsSet m_PendingItems; // Container for pending items with dirty bits (access protected by m_PendingItemsMutex). + std::mutex m_PendingItemsMutex; // Mutex to protect access to m_PendingItems. + std::condition_variable m_PendingItemsCV; // Condition variable to forcefully wake up the worker thread. + + void PendingItemsThreadProc(); + std::chrono::duration DequeueAndProcessOne(std::unique_lock& lock); + void ProcessPendingItem(const PendingConfigItem& item); + void ProcessPendingItem(const PendingDependencyGroupStateItem& item) const; + void ProcessPendingItem(const PendingDependencyEdgeItem& item); + void ProcessPendingItem(const RelationsDeletionItem& item); + + void EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits); + void EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup); + void EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); + void EnqueueDependencyChildRemoved(const DependencyGroup::Ptr& depGroup, const std::vector& dependencies, bool removeGroup); + void EnqueueRelationsDeletion(const String& id, const RelationsKeySet& relations); }; } diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index 3c587d2de..20a7cc4a6 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -195,7 +195,7 @@ struct RedisConnInfo final : SharedObject int GetQueryCount(RingBuffer::SizeType span); - inline int GetPendingQueryCount() const + inline std::size_t GetPendingQueryCount() const { return m_PendingQueries; }