diff --git a/lib/icingadb/icingadb-worker.cpp b/lib/icingadb/icingadb-worker.cpp index 5c151c285..ecf905ac0 100644 --- a/lib/icingadb/icingadb-worker.cpp +++ b/lib/icingadb/icingadb-worker.cpp @@ -85,6 +85,7 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lock retryAfter{0}; // If we can't process anything right now, how long to wait before retrying? @@ -93,7 +94,7 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lock(); for (auto it(seqView.begin()); it != seqView.end(); ++it) { if (it != seqView.begin()) { - if (dynamic_cast(it->get())) { + if (std::holds_alternative(it->Item)) { // We don't know whether the previous items are related to this deletion item or not, // thus we can't just process this right now when there are older items in the queue. // Otherwise, we might delete something that is going to be updated/created. @@ -101,14 +102,19 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lockEnqueueTime; 1000ms > age) { + if (auto age = now - it->EnqueueTime; 1000ms > age) { if (it == seqView.begin()) { retryAfter = 1000ms - age; } break; } - ConfigObject::Ptr cobj = (*it)->GetObjectToLock(); + ConfigObject::Ptr cobj; + if (auto confPtr = std::get_if(&it->Item); confPtr) { + cobj = confPtr->Object; + } else if (auto edgePtr = std::get_if(&it->Item)) { + cobj = edgePtr->Child; + } ObjectLock olock(cobj, std::defer_lock); if (cobj && !olock.TryLock()) { continue; // Can't lock the object right now, try the next one. @@ -119,14 +125,15 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lockExecute(*this); - } catch (const std::exception& ex) { - icingadb::task_queue::PendingQueueItem& itemRef = *itemToProcess; // For typeid(operand of typeid must not have any side effects). - Log(LogCritical, "IcingaDB") - << "Exception while processing pending item of type '" << typeid(itemRef).name() << "': " - << DiagnosticInformation(ex, GetActive()); - } + std::visit([this](auto &item) { + try { + ProcessQueueItem(item); + } catch (const std::exception& ex) { + Log(LogCritical, "IcingaDB") + << "Exception while processing pending item of type '" << typeid(decltype(item)).name() << "': " + << DiagnosticInformation(ex, GetActive()); + } + }, itemToProcess.Item); lock.lock(); break; } @@ -138,11 +145,6 @@ std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lockGetReflectionType()); - icingadb.m_RconWorker->FireAndForgetQueries( +void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingConfigItem& item) +{ + namespace queue = icingadb::task_queue; + + if (item.DirtyBits & queue::ConfigDelete) { + auto redisKeyPair = GetSyncableTypeRedisKeys(item.Object->GetReflectionType()); + m_RconWorker->FireAndForgetQueries( { - {"HDEL", redisKeyPair.ObjectKey, icingadb.GetObjectIdentifier(Object)}, - {"HDEL", redisKeyPair.ChecksumKey, icingadb.GetObjectIdentifier(Object)}, + {"HDEL", redisKeyPair.ObjectKey, GetObjectIdentifier(item.Object)}, + {"HDEL", redisKeyPair.ChecksumKey, GetObjectIdentifier(item.Object)}, { "XADD", "icinga:runtime", @@ -169,7 +174,7 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const "redis_key", redisKeyPair.ObjectKey, "id", - icingadb.GetObjectIdentifier(Object), + GetObjectIdentifier(item.Object), "runtime_type", "delete" } @@ -177,19 +182,19 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const ); } - if (DirtyBits & ConfigUpdate) { + if (item.DirtyBits & queue::ConfigUpdate) { std::map hMSets; std::vector runtimeUpdates; - icingadb.CreateConfigUpdate(Object, icingadb.GetSyncableTypeRedisKeys(Object->GetReflectionType()), hMSets, runtimeUpdates, true); - icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates); + CreateConfigUpdate(item.Object, GetSyncableTypeRedisKeys(item.Object->GetReflectionType()), hMSets, runtimeUpdates, true); + ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); } - if (auto checkable = dynamic_pointer_cast(Object); checkable) { - if (DirtyBits & FullState) { - icingadb.UpdateState(checkable, DirtyBits); + if (auto checkable = dynamic_pointer_cast(item.Object); checkable) { + if (item.DirtyBits & queue::FullState) { + UpdateState(checkable, item.DirtyBits); } - if (DirtyBits & NextUpdate) { - icingadb.SendNextUpdate(checkable); + if (item.DirtyBits & queue::NextUpdate) { + SendNextUpdate(checkable); } } } @@ -201,15 +206,15 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const * dependency group in Redis. It selects any child checkable from the dependency group to initiate * the state update, as all children share the same dependency group state. * - * @param icingadb The IcingaDB instance to use for executing Redis queries. + * @param item The queue item to process. */ -void icingadb::task_queue::PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const +void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingDependencyGroupStateItem& item) const { // For dependency group state updates, we don't actually care which child triggered the update, // since all children share the same dependency group state. Thus, we can just pick any child to // start the update from. - if (auto child = DepGroup->GetAnyChild(); child) { - icingadb.UpdateDependenciesState(child, DepGroup); + if (auto child = item.DepGroup->GetAnyChild(); child) { + UpdateDependenciesState(child, item.DepGroup); } } @@ -219,14 +224,14 @@ void icingadb::task_queue::PendingDependencyGroupStateItem::Execute(IcingaDB& ic * This function processes the pending dependency edge item and ensures that the necessary Redis * operations are performed to register the child checkable as part of the dependency group. * - * @param icingadb The IcingaDB instance to use for executing Redis queries. + * @param item The queue item to process. */ -void icingadb::task_queue::PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const +void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingDependencyEdgeItem& item) { std::vector runtimeUpdates; std::map hMSets; - icingadb.InsertCheckableDependencies(Child, hMSets, &runtimeUpdates, DepGroup); - icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates); + InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup); + ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); } /** @@ -236,15 +241,15 @@ void icingadb::task_queue::PendingDependencyEdgeItem::Execute(IcingaDB& icingadb * from Redis. It iterates over the map of Redis keys and deletes the relations associated with * the given ID. * - * @param icingadb The IcingaDB instance to use for executing Redis queries. + * @param item The queue item to process. */ -void icingadb::task_queue::RelationsDeletionItem::Execute(IcingaDB& icingadb) const +void IcingaDB::ProcessQueueItem(const icingadb::task_queue::RelationsDeletionItem& item) { - for (const auto& [configKey, checksumKey] : Relations) { - if (icingadb.IsStateKey(configKey)) { - icingadb.DeleteState(ID, configKey, checksumKey); + for (const auto& [configKey, checksumKey] : item.Relations) { + if (IsStateKey(configKey)) { + DeleteState(item.ID, configKey, checksumKey); } else { - icingadb.DeleteRelationship(ID, configKey, checksumKey); + DeleteRelationship(item.ID, configKey, checksumKey); } } } @@ -257,22 +262,23 @@ void icingadb::task_queue::RelationsDeletionItem::Execute(IcingaDB& icingadb) co */ void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits) { + namespace queue = icingadb::task_queue; + if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { return; // No need to enqueue anything if we're not connected. } - namespace queue = icingadb::task_queue; { std::lock_guard lock(m_PendingItemsMutex); - if (auto [it, inserted] = m_PendingItems.insert(std::make_shared(object, bits)); !inserted) { - m_PendingItems.modify(it, [bits](const std::shared_ptr& item) { - auto configItem = dynamic_cast(item.get()); + if (auto [it, inserted] = m_PendingItems.emplace(queue::PendingConfigItem{object, bits}); !inserted) { + m_PendingItems.modify(it, [bits](queue::PendingQueueItem& item) { + auto& configItem = std::get(item.Item); if (bits & queue::ConfigDelete) { - configItem->DirtyBits &= ~(queue::ConfigUpdate | queue::FullState); + configItem.DirtyBits &= ~(queue::ConfigUpdate | queue::FullState); } else if (bits & queue::ConfigUpdate) { - configItem->DirtyBits &= ~queue::ConfigDelete; + configItem.DirtyBits &= ~queue::ConfigDelete; } - configItem->DirtyBits |= bits & queue::DirtyBitsAll; + configItem.DirtyBits |= bits & queue::DirtyBitsAll; }); } } @@ -284,7 +290,7 @@ void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& dep if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { { std::lock_guard lock(m_PendingItemsMutex); - m_PendingItems.insert(std::make_shared(depGroup)); + m_PendingItems.emplace(icingadb::task_queue::PendingDependencyGroupStateItem{depGroup}); } m_PendingItemsCV.notify_one(); } @@ -304,7 +310,7 @@ void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depG if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { { std::lock_guard lock(m_PendingItemsMutex); - m_PendingItems.insert(std::make_shared(depGroup, child)); + m_PendingItems.emplace(icingadb::task_queue::PendingDependencyEdgeItem{depGroup, child}); } m_PendingItemsCV.notify_one(); } @@ -330,6 +336,8 @@ void IcingaDB::EnqueueDependencyChildRemoved( bool removeGroup ) { + namespace queue = icingadb::task_queue; + if (dependencies.empty() || !GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { return; // No need to enqueue anything if we're not connected or there are no dependencies. } @@ -339,13 +347,12 @@ void IcingaDB::EnqueueDependencyChildRemoved( { std::lock_guard lock(m_PendingItemsMutex); - if (auto it(m_PendingItems.find(std::make_pair(child, depGroup))); it != m_PendingItems.end()) { + if (m_PendingItems.erase(std::make_pair(depGroup.get(), child.get())) > 0) { cancelledRegistration = true; - m_PendingItems.erase(it); if (removeGroup) { // If we're removing the entire group registration, we can also drop any pending dependency group // state update triggered previously as it should no longer have any children left. - m_PendingItems.erase(std::make_pair(nullptr, depGroup)); + m_PendingItems.erase(depGroup.get()); } } } @@ -410,7 +417,7 @@ void IcingaDB::EnqueueDependencyChildRemoved( // Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's // not worth traversing the whole tree way up and sending config updates for each one of them, as the next // Redis config dump is going to fix it anyway. - EnqueueConfigObject(parent, icingadb::task_queue::ConfigUpdate); + EnqueueConfigObject(parent, queue::ConfigUpdate); if (!parent->HasAnyDependencies()) { // If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry. @@ -447,18 +454,20 @@ void IcingaDB::EnqueueDependencyChildRemoved( * @param id The ID of the relation to be deleted. * @param relations A map of Redis keys from which to delete the relation. */ -void IcingaDB::EnqueueRelationsDeletion(const String& id, const icingadb::task_queue::RelationsDeletionItem::RelationsKeySet& relations) +void IcingaDB::EnqueueRelationsDeletion(const String& id, icingadb::task_queue::RelationsDeletionItem::RelationsKeySet relations) { + namespace queue = icingadb::task_queue; + if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { return; // No need to enqueue anything if we're not connected. } { std::lock_guard lock(m_PendingItemsMutex); - if (auto [it, inserted] = m_PendingItems.insert(std::make_shared(id, relations)); !inserted) { - m_PendingItems.modify(it, [&relations](std::shared_ptr& val) { - auto item = dynamic_cast(val.get()); - item->Relations.insert(relations.begin(), relations.end()); + if (auto [it, inserted] = m_PendingItems.emplace(queue::RelationsDeletionItem{id, relations}); !inserted) { + m_PendingItems.modify(it, [&relations](queue::PendingQueueItem& val) { + auto& item = std::get(val.Item); + item.Relations.merge(std::move(relations)); }); } } diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 81c6c2a51..0aa42176d 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -17,7 +17,6 @@ #include "icinga/downtime.hpp" #include "remote/messageorigin.hpp" #include -#include #include #include #include @@ -61,92 +60,47 @@ enum DirtyBits : uint32_t DirtyBitsAll = ConfigUpdate | ConfigDelete | FullState | NextUpdate }; -/** - * A pending queue item. - * - * This struct represents a generic pending item in the queue that is associated with a unique identifier - * and dirty bits indicating the type of updates required in Redis. The @c EnqueueTime field records the - * time when the item was added to the queue, which can be useful for tracking how long an item waits before - * being processed. This base struct is extended by more specific pending item types that operate on different - * kinds of objects, such as configuration objects or dependency groups. - * - * @ingroup icingadb - */ -struct PendingQueueItem -{ - /** - * A variant type representing the identifier of a pending item. - * - * This variant can hold either a string representing a real Redis hash key or a pair consisting of - * a configuration object pointer and a dependency group pointer. A pending item identified by the - * latter variant type operates primarily on the associated configuration object or dependency group, - * thus the pairs are used for uniqueness in the pending items container. - */ - using Key = std::variant>; - - virtual ~PendingQueueItem() = default; - - const std::chrono::steady_clock::time_point EnqueueTime{std::chrono::steady_clock::now()}; - - virtual Key GetID() const = 0; - virtual ConfigObject::Ptr GetObjectToLock() const { return nullptr; }; - virtual void Execute(IcingaDB& icingadb) const = 0; -}; - -// A multi-index container for managing pending items with unique IDs and maintaining insertion order. -// The first index is an ordered unique index based on the pending item key, allowing for efficient -// lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the -// order of insertion, enabling FIFO processing of pending items. -using PendingItemsSet = boost::multi_index_container< - std::shared_ptr, - boost::multi_index::indexed_by< - boost::multi_index::ordered_unique< - boost::multi_index::const_mem_fun - >, - boost::multi_index::sequenced<> - > ->; /** * A pending configuration object item. * * This struct represents a pending item in the queue that is associated with a configuration object. * It contains a pointer to the configuration object and the dirty bits indicating the type of updates - * required for that object in Redis. A pending configuration item operates primarily on config objects, - * thus the @c ID field in the base struct is only used for uniqueness in the pending items container. + * required for that object in Redis. * * @ingroup icingadb */ -struct PendingConfigItem : PendingQueueItem +struct PendingConfigItem { ConfigObject::Ptr Object; uint32_t DirtyBits; PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits); - Key GetID() const override { return std::make_pair(Object, nullptr); } - ConfigObject::Ptr GetObjectToLock() const override; - void Execute(IcingaDB& icingadb) const override; + [[nodiscard]] ConfigObject* GetQueueLookupKey() const + { + return Object.get(); + } }; /** * A pending dependency group state item. * * This struct represents a pending item in the queue that is associated with a dependency group. - * It contains a pointer to the dependency group for which state updates are required. The dirty bits - * in the base struct are not used for this item type, as the operation is specific to updating the - * state of the dependency group itself. + * It contains a pointer to the dependency group for which state updates are required. * * @ingroup icingadb */ -struct PendingDependencyGroupStateItem : PendingQueueItem +struct PendingDependencyGroupStateItem { DependencyGroup::Ptr DepGroup; explicit PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup); - Key GetID() const override { return std::make_pair(nullptr, DepGroup.get()); } - void Execute(IcingaDB& icingadb) const override; + [[nodiscard]] DependencyGroup* GetQueueLookupKey() const + { + return DepGroup.get(); + } }; /** @@ -154,21 +108,20 @@ struct PendingDependencyGroupStateItem : PendingQueueItem * * This struct represents a pending dependency child registration into a dependency group. * It contains a pointer to the dependency group and the checkable child being registered. - * The dirty bits in the base struct are not used for this item type, as the operation is specific - * to registering the child into the dependency group. * * @ingroup icingadb */ -struct PendingDependencyEdgeItem : PendingQueueItem +struct PendingDependencyEdgeItem { DependencyGroup::Ptr DepGroup; Checkable::Ptr Child; PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); - Key GetID() const override { return std::make_pair(Child.get(), DepGroup.get()); } - ConfigObject::Ptr GetObjectToLock() const override { return Child; } - void Execute(IcingaDB& icingadb) const override; + [[nodiscard]] std::pair GetQueueLookupKey() const + { + return {DepGroup.get(), Child.get()}; + } }; /** @@ -181,7 +134,7 @@ struct PendingDependencyEdgeItem : PendingQueueItem * * @ingroup icingadb */ -struct RelationsDeletionItem : PendingQueueItem +struct RelationsDeletionItem { std::string ID; // Set of Redis keys from which to delete a relation, along with their checksums (if any). @@ -190,10 +143,79 @@ struct RelationsDeletionItem : PendingQueueItem RelationsDeletionItem(const String& id, const RelationsKeySet& relations); - Key GetID() const override { return ID; } - void Execute(IcingaDB& icingadb) const override; + [[nodiscard]] std::string_view GetQueueLookupKey() const + { + return ID; + } }; +/** + * A pending queue item. + * + * This struct represents a generic pending item in the queue. The @c EnqueueTime field records the + * time when the item was added to the queue, which can be useful for tracking how long an item waits before + * being processed. This base struct wraps one of the available more specific pending item types that operate + * on different kinds of objects, such as configuration objects or dependency groups. + * + * @ingroup icingadb + */ +struct PendingQueueItem +{ + using ItemVariant = std::variant< + PendingConfigItem, + PendingDependencyGroupStateItem, + PendingDependencyEdgeItem, + RelationsDeletionItem + >; + + ItemVariant Item; + std::chrono::steady_clock::time_point EnqueueTime{std::chrono::steady_clock::now()}; + + template, PendingQueueItem>> + > + explicit PendingQueueItem(T&& item) : Item(std::forward(item)) {} +}; + +template +struct KeyExtractor /* not implemented, only the specialization for std::variant is used */; + +/** + * Helper to use @c PendingQueueItem in @c boost::multi_index_container. + * + * It implements a key extractor that @c multi_index_container will invoke for each element to create an index. + * Every individual type in @c PendingQueueItem::ItemVariant implements a method @c GetQueueLookupKey() that will + * be invoked by this key extractor. The return value from the individual type is then returned as part of a second + * variant type (its element types are automatically deduced from the individual @c GetQueueLookupKey() methods). + * @c multi_index_container will then just use the standard comparison operators on it for building the index. + * + * @ingroup icingadb + */ +template +struct KeyExtractor> { + using result_type = std::variant().GetQueueLookupKey())...>; + + result_type operator()(const PendingQueueItem& queueItem) const + { + return std::visit([](const auto& innerItem) -> result_type { + return innerItem.GetQueueLookupKey(); + }, queueItem.Item); + }; +}; + +// A multi-index container for managing pending items with unique IDs and maintaining insertion order. +// The first index is an ordered unique index based on the pending item key, allowing for efficient +// lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the +// order of insertion, enabling FIFO processing of pending items. +using PendingItemsSet = boost::multi_index_container< + PendingQueueItem, + boost::multi_index::indexed_by< + boost::multi_index::ordered_unique>, + boost::multi_index::sequenced<> + > +>; + } // namespace icingadb::task_queue /** @@ -453,16 +475,16 @@ private: void PendingItemsThreadProc(); std::chrono::duration DequeueAndProcessOne(std::unique_lock& lock); + void ProcessQueueItem(const icingadb::task_queue::PendingConfigItem& item); + void ProcessQueueItem(const icingadb::task_queue::PendingDependencyGroupStateItem& item) const; + void ProcessQueueItem(const icingadb::task_queue::PendingDependencyEdgeItem& item); + void ProcessQueueItem(const icingadb::task_queue::RelationsDeletionItem& item); + void EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits); void EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup); void EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); void EnqueueDependencyChildRemoved(const DependencyGroup::Ptr& depGroup, const std::vector& dependencies, bool removeGroup); - void EnqueueRelationsDeletion(const String& id, const icingadb::task_queue::RelationsDeletionItem::RelationsKeySet& relations); - - friend struct icingadb::task_queue::PendingConfigItem; - friend struct icingadb::task_queue::PendingDependencyGroupStateItem; - friend struct icingadb::task_queue::PendingDependencyEdgeItem; - friend struct icingadb::task_queue::RelationsDeletionItem; + void EnqueueRelationsDeletion(const String& id, icingadb::task_queue::RelationsDeletionItem::RelationsKeySet relations); }; }