mirror of
https://github.com/Icinga/icinga2.git
synced 2026-06-09 00:32:12 -04:00
IcingaDB: use key extractor for worker queue
This commit restructures the queue items so that each one now has a method `GetQueueLookupKey()` that is used to derive which elements of the queue are considered to be equal. For this, there is a key extractor for the `multi_index_container` that takes the `variant` from the queue item, calls that method on it, and puts the result in a second variant type. The types in that variant type are automatically deduced from the return types of the individual methods.
This commit is contained in:
parent
2048450159
commit
855f6c7c0c
2 changed files with 168 additions and 137 deletions
|
|
@ -85,6 +85,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
|
|||
{
|
||||
using namespace std::chrono_literals;
|
||||
namespace ch = std::chrono;
|
||||
namespace queue = icingadb::task_queue;
|
||||
|
||||
bool madeProgress = false; // Did we make any progress in this iteration?
|
||||
ch::duration<double> retryAfter{0}; // If we can't process anything right now, how long to wait before retrying?
|
||||
|
|
@ -93,7 +94,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
|
|||
auto& seqView = m_PendingItems.get<1>();
|
||||
for (auto it(seqView.begin()); it != seqView.end(); ++it) {
|
||||
if (it != seqView.begin()) {
|
||||
if (dynamic_cast<const icingadb::task_queue::RelationsDeletionItem*>(it->get())) {
|
||||
if (std::holds_alternative<queue::RelationsDeletionItem>(it->Item)) {
|
||||
// We don't know whether the previous items are related to this deletion item or not,
|
||||
// thus we can't just process this right now when there are older items in the queue.
|
||||
// Otherwise, we might delete something that is going to be updated/created.
|
||||
|
|
@ -101,14 +102,19 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
|
|||
}
|
||||
}
|
||||
|
||||
if (auto age = now - (*it)->EnqueueTime; 1000ms > age) {
|
||||
if (auto age = now - it->EnqueueTime; 1000ms > age) {
|
||||
if (it == seqView.begin()) {
|
||||
retryAfter = 1000ms - age;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
ConfigObject::Ptr cobj = (*it)->GetObjectToLock();
|
||||
ConfigObject::Ptr cobj;
|
||||
if (auto confPtr = std::get_if<queue::PendingConfigItem>(&it->Item); confPtr) {
|
||||
cobj = confPtr->Object;
|
||||
} else if (auto edgePtr = std::get_if<queue::PendingDependencyEdgeItem>(&it->Item)) {
|
||||
cobj = edgePtr->Child;
|
||||
}
|
||||
ObjectLock olock(cobj, std::defer_lock);
|
||||
if (cobj && !olock.TryLock()) {
|
||||
continue; // Can't lock the object right now, try the next one.
|
||||
|
|
@ -119,14 +125,15 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
|
|||
madeProgress = true;
|
||||
|
||||
lock.unlock();
|
||||
try {
|
||||
itemToProcess->Execute(*this);
|
||||
} catch (const std::exception& ex) {
|
||||
icingadb::task_queue::PendingQueueItem& itemRef = *itemToProcess; // For typeid(operand of typeid must not have any side effects).
|
||||
Log(LogCritical, "IcingaDB")
|
||||
<< "Exception while processing pending item of type '" << typeid(itemRef).name() << "': "
|
||||
<< DiagnosticInformation(ex, GetActive());
|
||||
}
|
||||
std::visit([this](auto &item) {
|
||||
try {
|
||||
ProcessQueueItem(item);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "IcingaDB")
|
||||
<< "Exception while processing pending item of type '" << typeid(decltype(item)).name() << "': "
|
||||
<< DiagnosticInformation(ex, GetActive());
|
||||
}
|
||||
}, itemToProcess.Item);
|
||||
lock.lock();
|
||||
break;
|
||||
}
|
||||
|
|
@ -138,11 +145,6 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
|
|||
return retryAfter;
|
||||
}
|
||||
|
||||
ConfigObject::Ptr icingadb::task_queue::PendingConfigItem::GetObjectToLock() const
|
||||
{
|
||||
return Object;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the pending configuration item.
|
||||
*
|
||||
|
|
@ -150,15 +152,18 @@ ConfigObject::Ptr icingadb::task_queue::PendingConfigItem::GetObjectToLock() con
|
|||
* on the dirty bits set for the associated configuration object. It handles configuration deletions, updates,
|
||||
* and state updates for checkable objects.
|
||||
*
|
||||
* @param icingadb The IcingaDB instance to use for executing Redis queries.
|
||||
* @param item The queue item to process.
|
||||
*/
|
||||
void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const {
|
||||
if (DirtyBits & ConfigDelete) {
|
||||
auto redisKeyPair = icingadb.GetSyncableTypeRedisKeys(Object->GetReflectionType());
|
||||
icingadb.m_RconWorker->FireAndForgetQueries(
|
||||
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingConfigItem& item)
|
||||
{
|
||||
namespace queue = icingadb::task_queue;
|
||||
|
||||
if (item.DirtyBits & queue::ConfigDelete) {
|
||||
auto redisKeyPair = GetSyncableTypeRedisKeys(item.Object->GetReflectionType());
|
||||
m_RconWorker->FireAndForgetQueries(
|
||||
{
|
||||
{"HDEL", redisKeyPair.ObjectKey, icingadb.GetObjectIdentifier(Object)},
|
||||
{"HDEL", redisKeyPair.ChecksumKey, icingadb.GetObjectIdentifier(Object)},
|
||||
{"HDEL", redisKeyPair.ObjectKey, GetObjectIdentifier(item.Object)},
|
||||
{"HDEL", redisKeyPair.ChecksumKey, GetObjectIdentifier(item.Object)},
|
||||
{
|
||||
"XADD",
|
||||
"icinga:runtime",
|
||||
|
|
@ -169,7 +174,7 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const
|
|||
"redis_key",
|
||||
redisKeyPair.ObjectKey,
|
||||
"id",
|
||||
icingadb.GetObjectIdentifier(Object),
|
||||
GetObjectIdentifier(item.Object),
|
||||
"runtime_type",
|
||||
"delete"
|
||||
}
|
||||
|
|
@ -177,19 +182,19 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const
|
|||
);
|
||||
}
|
||||
|
||||
if (DirtyBits & ConfigUpdate) {
|
||||
if (item.DirtyBits & queue::ConfigUpdate) {
|
||||
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
|
||||
std::vector<Dictionary::Ptr> runtimeUpdates;
|
||||
icingadb.CreateConfigUpdate(Object, icingadb.GetSyncableTypeRedisKeys(Object->GetReflectionType()), hMSets, runtimeUpdates, true);
|
||||
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
|
||||
CreateConfigUpdate(item.Object, GetSyncableTypeRedisKeys(item.Object->GetReflectionType()), hMSets, runtimeUpdates, true);
|
||||
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
|
||||
}
|
||||
|
||||
if (auto checkable = dynamic_pointer_cast<Checkable>(Object); checkable) {
|
||||
if (DirtyBits & FullState) {
|
||||
icingadb.UpdateState(checkable, DirtyBits);
|
||||
if (auto checkable = dynamic_pointer_cast<Checkable>(item.Object); checkable) {
|
||||
if (item.DirtyBits & queue::FullState) {
|
||||
UpdateState(checkable, item.DirtyBits);
|
||||
}
|
||||
if (DirtyBits & NextUpdate) {
|
||||
icingadb.SendNextUpdate(checkable);
|
||||
if (item.DirtyBits & queue::NextUpdate) {
|
||||
SendNextUpdate(checkable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -201,15 +206,15 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const
|
|||
* dependency group in Redis. It selects any child checkable from the dependency group to initiate
|
||||
* the state update, as all children share the same dependency group state.
|
||||
*
|
||||
* @param icingadb The IcingaDB instance to use for executing Redis queries.
|
||||
* @param item The queue item to process.
|
||||
*/
|
||||
void icingadb::task_queue::PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const
|
||||
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingDependencyGroupStateItem& item) const
|
||||
{
|
||||
// For dependency group state updates, we don't actually care which child triggered the update,
|
||||
// since all children share the same dependency group state. Thus, we can just pick any child to
|
||||
// start the update from.
|
||||
if (auto child = DepGroup->GetAnyChild(); child) {
|
||||
icingadb.UpdateDependenciesState(child, DepGroup);
|
||||
if (auto child = item.DepGroup->GetAnyChild(); child) {
|
||||
UpdateDependenciesState(child, item.DepGroup);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -219,14 +224,14 @@ void icingadb::task_queue::PendingDependencyGroupStateItem::Execute(IcingaDB& ic
|
|||
* This function processes the pending dependency edge item and ensures that the necessary Redis
|
||||
* operations are performed to register the child checkable as part of the dependency group.
|
||||
*
|
||||
* @param icingadb The IcingaDB instance to use for executing Redis queries.
|
||||
* @param item The queue item to process.
|
||||
*/
|
||||
void icingadb::task_queue::PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const
|
||||
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingDependencyEdgeItem& item)
|
||||
{
|
||||
std::vector<Dictionary::Ptr> runtimeUpdates;
|
||||
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
|
||||
icingadb.InsertCheckableDependencies(Child, hMSets, &runtimeUpdates, DepGroup);
|
||||
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
|
||||
InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup);
|
||||
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -236,15 +241,15 @@ void icingadb::task_queue::PendingDependencyEdgeItem::Execute(IcingaDB& icingadb
|
|||
* from Redis. It iterates over the map of Redis keys and deletes the relations associated with
|
||||
* the given ID.
|
||||
*
|
||||
* @param icingadb The IcingaDB instance to use for executing Redis queries.
|
||||
* @param item The queue item to process.
|
||||
*/
|
||||
void icingadb::task_queue::RelationsDeletionItem::Execute(IcingaDB& icingadb) const
|
||||
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::RelationsDeletionItem& item)
|
||||
{
|
||||
for (const auto& [configKey, checksumKey] : Relations) {
|
||||
if (icingadb.IsStateKey(configKey)) {
|
||||
icingadb.DeleteState(ID, configKey, checksumKey);
|
||||
for (const auto& [configKey, checksumKey] : item.Relations) {
|
||||
if (IsStateKey(configKey)) {
|
||||
DeleteState(item.ID, configKey, checksumKey);
|
||||
} else {
|
||||
icingadb.DeleteRelationship(ID, configKey, checksumKey);
|
||||
DeleteRelationship(item.ID, configKey, checksumKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -257,22 +262,23 @@ void icingadb::task_queue::RelationsDeletionItem::Execute(IcingaDB& icingadb) co
|
|||
*/
|
||||
void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits)
|
||||
{
|
||||
namespace queue = icingadb::task_queue;
|
||||
|
||||
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
|
||||
return; // No need to enqueue anything if we're not connected.
|
||||
}
|
||||
namespace queue = icingadb::task_queue;
|
||||
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<queue::PendingConfigItem>(object, bits)); !inserted) {
|
||||
m_PendingItems.modify(it, [bits](const std::shared_ptr<queue::PendingQueueItem>& item) {
|
||||
auto configItem = dynamic_cast<queue::PendingConfigItem*>(item.get());
|
||||
if (auto [it, inserted] = m_PendingItems.emplace(queue::PendingConfigItem{object, bits}); !inserted) {
|
||||
m_PendingItems.modify(it, [bits](queue::PendingQueueItem& item) {
|
||||
auto& configItem = std::get<queue::PendingConfigItem>(item.Item);
|
||||
if (bits & queue::ConfigDelete) {
|
||||
configItem->DirtyBits &= ~(queue::ConfigUpdate | queue::FullState);
|
||||
configItem.DirtyBits &= ~(queue::ConfigUpdate | queue::FullState);
|
||||
} else if (bits & queue::ConfigUpdate) {
|
||||
configItem->DirtyBits &= ~queue::ConfigDelete;
|
||||
configItem.DirtyBits &= ~queue::ConfigDelete;
|
||||
}
|
||||
configItem->DirtyBits |= bits & queue::DirtyBitsAll;
|
||||
configItem.DirtyBits |= bits & queue::DirtyBitsAll;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -284,7 +290,7 @@ void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& dep
|
|||
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
m_PendingItems.insert(std::make_shared<icingadb::task_queue::PendingDependencyGroupStateItem>(depGroup));
|
||||
m_PendingItems.emplace(icingadb::task_queue::PendingDependencyGroupStateItem{depGroup});
|
||||
}
|
||||
m_PendingItemsCV.notify_one();
|
||||
}
|
||||
|
|
@ -304,7 +310,7 @@ void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depG
|
|||
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
m_PendingItems.insert(std::make_shared<icingadb::task_queue::PendingDependencyEdgeItem>(depGroup, child));
|
||||
m_PendingItems.emplace(icingadb::task_queue::PendingDependencyEdgeItem{depGroup, child});
|
||||
}
|
||||
m_PendingItemsCV.notify_one();
|
||||
}
|
||||
|
|
@ -330,6 +336,8 @@ void IcingaDB::EnqueueDependencyChildRemoved(
|
|||
bool removeGroup
|
||||
)
|
||||
{
|
||||
namespace queue = icingadb::task_queue;
|
||||
|
||||
if (dependencies.empty() || !GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
|
||||
return; // No need to enqueue anything if we're not connected or there are no dependencies.
|
||||
}
|
||||
|
|
@ -339,13 +347,12 @@ void IcingaDB::EnqueueDependencyChildRemoved(
|
|||
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
if (auto it(m_PendingItems.find(std::make_pair(child, depGroup))); it != m_PendingItems.end()) {
|
||||
if (m_PendingItems.erase(std::make_pair(depGroup.get(), child.get())) > 0) {
|
||||
cancelledRegistration = true;
|
||||
m_PendingItems.erase(it);
|
||||
if (removeGroup) {
|
||||
// If we're removing the entire group registration, we can also drop any pending dependency group
|
||||
// state update triggered previously as it should no longer have any children left.
|
||||
m_PendingItems.erase(std::make_pair(nullptr, depGroup));
|
||||
m_PendingItems.erase(depGroup.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -410,7 +417,7 @@ void IcingaDB::EnqueueDependencyChildRemoved(
|
|||
// Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's
|
||||
// not worth traversing the whole tree way up and sending config updates for each one of them, as the next
|
||||
// Redis config dump is going to fix it anyway.
|
||||
EnqueueConfigObject(parent, icingadb::task_queue::ConfigUpdate);
|
||||
EnqueueConfigObject(parent, queue::ConfigUpdate);
|
||||
|
||||
if (!parent->HasAnyDependencies()) {
|
||||
// If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry.
|
||||
|
|
@ -447,18 +454,20 @@ void IcingaDB::EnqueueDependencyChildRemoved(
|
|||
* @param id The ID of the relation to be deleted.
|
||||
* @param relations A map of Redis keys from which to delete the relation.
|
||||
*/
|
||||
void IcingaDB::EnqueueRelationsDeletion(const String& id, const icingadb::task_queue::RelationsDeletionItem::RelationsKeySet& relations)
|
||||
void IcingaDB::EnqueueRelationsDeletion(const String& id, icingadb::task_queue::RelationsDeletionItem::RelationsKeySet relations)
|
||||
{
|
||||
namespace queue = icingadb::task_queue;
|
||||
|
||||
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
|
||||
return; // No need to enqueue anything if we're not connected.
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<icingadb::task_queue::RelationsDeletionItem>(id, relations)); !inserted) {
|
||||
m_PendingItems.modify(it, [&relations](std::shared_ptr<icingadb::task_queue::PendingQueueItem>& val) {
|
||||
auto item = dynamic_cast<icingadb::task_queue::RelationsDeletionItem*>(val.get());
|
||||
item->Relations.insert(relations.begin(), relations.end());
|
||||
if (auto [it, inserted] = m_PendingItems.emplace(queue::RelationsDeletionItem{id, relations}); !inserted) {
|
||||
m_PendingItems.modify(it, [&relations](queue::PendingQueueItem& val) {
|
||||
auto& item = std::get<queue::RelationsDeletionItem>(val.Item);
|
||||
item.Relations.merge(std::move(relations));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@
|
|||
#include "icinga/downtime.hpp"
|
||||
#include "remote/messageorigin.hpp"
|
||||
#include <boost/multi_index_container.hpp>
|
||||
#include <boost/multi_index/mem_fun.hpp>
|
||||
#include <boost/multi_index/ordered_index.hpp>
|
||||
#include <boost/multi_index/sequenced_index.hpp>
|
||||
#include <atomic>
|
||||
|
|
@ -61,92 +60,47 @@ enum DirtyBits : uint32_t
|
|||
DirtyBitsAll = ConfigUpdate | ConfigDelete | FullState | NextUpdate
|
||||
};
|
||||
|
||||
/**
|
||||
* A pending queue item.
|
||||
*
|
||||
* This struct represents a generic pending item in the queue that is associated with a unique identifier
|
||||
* and dirty bits indicating the type of updates required in Redis. The @c EnqueueTime field records the
|
||||
* time when the item was added to the queue, which can be useful for tracking how long an item waits before
|
||||
* being processed. This base struct is extended by more specific pending item types that operate on different
|
||||
* kinds of objects, such as configuration objects or dependency groups.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingQueueItem
|
||||
{
|
||||
/**
|
||||
* A variant type representing the identifier of a pending item.
|
||||
*
|
||||
* This variant can hold either a string representing a real Redis hash key or a pair consisting of
|
||||
* a configuration object pointer and a dependency group pointer. A pending item identified by the
|
||||
* latter variant type operates primarily on the associated configuration object or dependency group,
|
||||
* thus the pairs are used for uniqueness in the pending items container.
|
||||
*/
|
||||
using Key = std::variant<std::string /* Redis hash keys */, std::pair<ConfigObject::Ptr, DependencyGroup::Ptr>>;
|
||||
|
||||
virtual ~PendingQueueItem() = default;
|
||||
|
||||
const std::chrono::steady_clock::time_point EnqueueTime{std::chrono::steady_clock::now()};
|
||||
|
||||
virtual Key GetID() const = 0;
|
||||
virtual ConfigObject::Ptr GetObjectToLock() const { return nullptr; };
|
||||
virtual void Execute(IcingaDB& icingadb) const = 0;
|
||||
};
|
||||
|
||||
// A multi-index container for managing pending items with unique IDs and maintaining insertion order.
|
||||
// The first index is an ordered unique index based on the pending item key, allowing for efficient
|
||||
// lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the
|
||||
// order of insertion, enabling FIFO processing of pending items.
|
||||
using PendingItemsSet = boost::multi_index_container<
|
||||
std::shared_ptr<PendingQueueItem>,
|
||||
boost::multi_index::indexed_by<
|
||||
boost::multi_index::ordered_unique<
|
||||
boost::multi_index::const_mem_fun<PendingQueueItem, PendingQueueItem::Key, &PendingQueueItem::GetID>
|
||||
>,
|
||||
boost::multi_index::sequenced<>
|
||||
>
|
||||
>;
|
||||
|
||||
/**
|
||||
* A pending configuration object item.
|
||||
*
|
||||
* This struct represents a pending item in the queue that is associated with a configuration object.
|
||||
* It contains a pointer to the configuration object and the dirty bits indicating the type of updates
|
||||
* required for that object in Redis. A pending configuration item operates primarily on config objects,
|
||||
* thus the @c ID field in the base struct is only used for uniqueness in the pending items container.
|
||||
* required for that object in Redis.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingConfigItem : PendingQueueItem
|
||||
struct PendingConfigItem
|
||||
{
|
||||
ConfigObject::Ptr Object;
|
||||
uint32_t DirtyBits;
|
||||
|
||||
PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits);
|
||||
|
||||
Key GetID() const override { return std::make_pair(Object, nullptr); }
|
||||
ConfigObject::Ptr GetObjectToLock() const override;
|
||||
void Execute(IcingaDB& icingadb) const override;
|
||||
[[nodiscard]] ConfigObject* GetQueueLookupKey() const
|
||||
{
|
||||
return Object.get();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* A pending dependency group state item.
|
||||
*
|
||||
* This struct represents a pending item in the queue that is associated with a dependency group.
|
||||
* It contains a pointer to the dependency group for which state updates are required. The dirty bits
|
||||
* in the base struct are not used for this item type, as the operation is specific to updating the
|
||||
* state of the dependency group itself.
|
||||
* It contains a pointer to the dependency group for which state updates are required.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingDependencyGroupStateItem : PendingQueueItem
|
||||
struct PendingDependencyGroupStateItem
|
||||
{
|
||||
DependencyGroup::Ptr DepGroup;
|
||||
|
||||
explicit PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup);
|
||||
|
||||
Key GetID() const override { return std::make_pair(nullptr, DepGroup.get()); }
|
||||
void Execute(IcingaDB& icingadb) const override;
|
||||
[[nodiscard]] DependencyGroup* GetQueueLookupKey() const
|
||||
{
|
||||
return DepGroup.get();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
@ -154,21 +108,20 @@ struct PendingDependencyGroupStateItem : PendingQueueItem
|
|||
*
|
||||
* This struct represents a pending dependency child registration into a dependency group.
|
||||
* It contains a pointer to the dependency group and the checkable child being registered.
|
||||
* The dirty bits in the base struct are not used for this item type, as the operation is specific
|
||||
* to registering the child into the dependency group.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingDependencyEdgeItem : PendingQueueItem
|
||||
struct PendingDependencyEdgeItem
|
||||
{
|
||||
DependencyGroup::Ptr DepGroup;
|
||||
Checkable::Ptr Child;
|
||||
|
||||
PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child);
|
||||
|
||||
Key GetID() const override { return std::make_pair(Child.get(), DepGroup.get()); }
|
||||
ConfigObject::Ptr GetObjectToLock() const override { return Child; }
|
||||
void Execute(IcingaDB& icingadb) const override;
|
||||
[[nodiscard]] std::pair<DependencyGroup*, Checkable*> GetQueueLookupKey() const
|
||||
{
|
||||
return {DepGroup.get(), Child.get()};
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
@ -181,7 +134,7 @@ struct PendingDependencyEdgeItem : PendingQueueItem
|
|||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct RelationsDeletionItem : PendingQueueItem
|
||||
struct RelationsDeletionItem
|
||||
{
|
||||
std::string ID;
|
||||
// Set of Redis keys from which to delete a relation, along with their checksums (if any).
|
||||
|
|
@ -190,10 +143,79 @@ struct RelationsDeletionItem : PendingQueueItem
|
|||
|
||||
RelationsDeletionItem(const String& id, const RelationsKeySet& relations);
|
||||
|
||||
Key GetID() const override { return ID; }
|
||||
void Execute(IcingaDB& icingadb) const override;
|
||||
[[nodiscard]] std::string_view GetQueueLookupKey() const
|
||||
{
|
||||
return ID;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* A pending queue item.
|
||||
*
|
||||
* This struct represents a generic pending item in the queue. The @c EnqueueTime field records the
|
||||
* time when the item was added to the queue, which can be useful for tracking how long an item waits before
|
||||
* being processed. This base struct wraps one of the available more specific pending item types that operate
|
||||
* on different kinds of objects, such as configuration objects or dependency groups.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingQueueItem
|
||||
{
|
||||
using ItemVariant = std::variant<
|
||||
PendingConfigItem,
|
||||
PendingDependencyGroupStateItem,
|
||||
PendingDependencyEdgeItem,
|
||||
RelationsDeletionItem
|
||||
>;
|
||||
|
||||
ItemVariant Item;
|
||||
std::chrono::steady_clock::time_point EnqueueTime{std::chrono::steady_clock::now()};
|
||||
|
||||
template<typename T,
|
||||
// don't hide the default copy and move constructors
|
||||
typename = std::enable_if_t<!std::is_same_v<std::decay_t<T>, PendingQueueItem>>
|
||||
>
|
||||
explicit PendingQueueItem(T&& item) : Item(std::forward<T>(item)) {}
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
struct KeyExtractor /* not implemented, only the specialization for std::variant is used */;
|
||||
|
||||
/**
|
||||
* Helper to use @c PendingQueueItem in @c boost::multi_index_container.
|
||||
*
|
||||
* It implements a key extractor that @c multi_index_container will invoke for each element to create an index.
|
||||
* Every individual type in @c PendingQueueItem::ItemVariant implements a method @c GetQueueLookupKey() that will
|
||||
* be invoked by this key extractor. The return value from the individual type is then returned as part of a second
|
||||
* variant type (its element types are automatically deduced from the individual @c GetQueueLookupKey() methods).
|
||||
* @c multi_index_container will then just use the standard comparison operators on it for building the index.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
template<typename ...Ts>
|
||||
struct KeyExtractor<std::variant<Ts...>> {
|
||||
using result_type = std::variant<decltype(std::declval<Ts>().GetQueueLookupKey())...>;
|
||||
|
||||
result_type operator()(const PendingQueueItem& queueItem) const
|
||||
{
|
||||
return std::visit([](const auto& innerItem) -> result_type {
|
||||
return innerItem.GetQueueLookupKey();
|
||||
}, queueItem.Item);
|
||||
};
|
||||
};
|
||||
|
||||
// A multi-index container for managing pending items with unique IDs and maintaining insertion order.
|
||||
// The first index is an ordered unique index based on the pending item key, allowing for efficient
|
||||
// lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the
|
||||
// order of insertion, enabling FIFO processing of pending items.
|
||||
using PendingItemsSet = boost::multi_index_container<
|
||||
PendingQueueItem,
|
||||
boost::multi_index::indexed_by<
|
||||
boost::multi_index::ordered_unique<KeyExtractor<PendingQueueItem::ItemVariant>>,
|
||||
boost::multi_index::sequenced<>
|
||||
>
|
||||
>;
|
||||
|
||||
} // namespace icingadb::task_queue
|
||||
|
||||
/**
|
||||
|
|
@ -453,16 +475,16 @@ private:
|
|||
void PendingItemsThreadProc();
|
||||
std::chrono::duration<double> DequeueAndProcessOne(std::unique_lock<std::mutex>& lock);
|
||||
|
||||
void ProcessQueueItem(const icingadb::task_queue::PendingConfigItem& item);
|
||||
void ProcessQueueItem(const icingadb::task_queue::PendingDependencyGroupStateItem& item) const;
|
||||
void ProcessQueueItem(const icingadb::task_queue::PendingDependencyEdgeItem& item);
|
||||
void ProcessQueueItem(const icingadb::task_queue::RelationsDeletionItem& item);
|
||||
|
||||
void EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits);
|
||||
void EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup);
|
||||
void EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child);
|
||||
void EnqueueDependencyChildRemoved(const DependencyGroup::Ptr& depGroup, const std::vector<Dependency::Ptr>& dependencies, bool removeGroup);
|
||||
void EnqueueRelationsDeletion(const String& id, const icingadb::task_queue::RelationsDeletionItem::RelationsKeySet& relations);
|
||||
|
||||
friend struct icingadb::task_queue::PendingConfigItem;
|
||||
friend struct icingadb::task_queue::PendingDependencyGroupStateItem;
|
||||
friend struct icingadb::task_queue::PendingDependencyEdgeItem;
|
||||
friend struct icingadb::task_queue::RelationsDeletionItem;
|
||||
void EnqueueRelationsDeletion(const String& id, icingadb::task_queue::RelationsDeletionItem::RelationsKeySet relations);
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue