mirror of
https://github.com/Icinga/icinga2.git
synced 2026-05-28 04:12:13 -04:00
IcingaDB: track object changesets efficiently
This commit is contained in:
parent
105c066649
commit
eab507f892
8 changed files with 746 additions and 28 deletions
|
|
@ -144,6 +144,22 @@ void DependencyGroup::LoadParents(std::set<Checkable::Ptr>& parents) const
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve any child Checkable from the current dependency group.
|
||||
*
|
||||
* @return - Returns the first child Checkable found in this group, or nullptr if there are no children.
|
||||
*/
|
||||
Checkable::Ptr DependencyGroup::GetAnyChild() const
|
||||
{
|
||||
std::lock_guard lock(m_Mutex);
|
||||
for (auto& [_, children] : m_Members) {
|
||||
if (!children.empty()) {
|
||||
return children.begin()->second->GetChild();
|
||||
}
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the number of dependency objects in the current dependency group.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -164,6 +164,7 @@ public:
|
|||
void RemoveDependency(const Dependency::Ptr& dependency);
|
||||
std::vector<Dependency::Ptr> GetDependenciesForChild(const Checkable* child) const;
|
||||
void LoadParents(std::set<Checkable::Ptr>& parents) const;
|
||||
Checkable::Ptr GetAnyChild() const;
|
||||
size_t GetDependenciesCount() const;
|
||||
|
||||
void SetIcingaDBIdentifier(const String& identifier);
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp)
|
|||
mkembedconfig_target(icingadb-itl.conf icingadb-itl.cpp)
|
||||
|
||||
set(icingadb_SOURCES
|
||||
icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp
|
||||
icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp icingadb-worker.cpp redisconnection.cpp icingadb-ti.hpp
|
||||
icingadbchecktask.cpp icingadb-itl.cpp
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -26,6 +26,18 @@
|
|||
|
||||
using namespace icinga;
|
||||
|
||||
/**
|
||||
* Checks if the given Redis key is a state key (ends with ":state").
|
||||
*
|
||||
* @param key The Redis key to check.
|
||||
*
|
||||
* @return true if the key is a state key, false otherwise.
|
||||
*/
|
||||
bool IcingaDB::IsStateKey(const RedisConnection::QueryArg& key)
|
||||
{
|
||||
return boost::algorithm::ends_with(static_cast<std::string_view>(key), ":state");
|
||||
}
|
||||
|
||||
String IcingaDB::FormatCheckSumBinary(const String& str)
|
||||
{
|
||||
char output[20*2+1];
|
||||
|
|
@ -170,30 +182,8 @@ Dictionary::Ptr IcingaDB::SerializeVars(const Dictionary::Ptr& vars)
|
|||
*/
|
||||
Dictionary::Ptr IcingaDB::SerializeDependencyEdgeState(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep)
|
||||
{
|
||||
String edgeStateId;
|
||||
// The edge state ID is computed a bit differently depending on whether this is for a redundancy group or not.
|
||||
// For redundancy groups, the state ID is supposed to represent the connection state between the redundancy group
|
||||
// and the parent Checkable of the given dependency. Hence, the outcome will always be different for each parent
|
||||
// Checkable of the redundancy group.
|
||||
if (dependencyGroup->IsRedundancyGroup()) {
|
||||
edgeStateId = HashValue(new Array{
|
||||
dependencyGroup->GetIcingaDBIdentifier(),
|
||||
GetObjectIdentifier(dep->GetParent()),
|
||||
});
|
||||
} else if (dependencyGroup->GetIcingaDBIdentifier().IsEmpty()) {
|
||||
// For non-redundant dependency groups, on the other hand, all dependency objects within that group will
|
||||
// always have the same parent Checkable. Likewise, the state ID will be always the same as well it doesn't
|
||||
// matter which dependency object is used to compute it. Therefore, it's sufficient to compute it only once
|
||||
// and all the other dependency objects can reuse the cached state ID.
|
||||
edgeStateId = HashValue(new Array{dependencyGroup->GetCompositeKey(), GetObjectIdentifier(dep->GetParent())});
|
||||
dependencyGroup->SetIcingaDBIdentifier(edgeStateId);
|
||||
} else {
|
||||
// Use the already computed state ID for the dependency group.
|
||||
edgeStateId = dependencyGroup->GetIcingaDBIdentifier();
|
||||
}
|
||||
|
||||
return new Dictionary{
|
||||
{"id", std::move(edgeStateId)},
|
||||
{"id", GetDependencyEdgeStateId(dependencyGroup, dep)},
|
||||
{"environment_id", m_EnvironmentId},
|
||||
{"failed", !dep->IsAvailable(DependencyState) || !dep->GetParent()->IsReachable()}
|
||||
};
|
||||
|
|
@ -220,6 +210,42 @@ Dictionary::Ptr IcingaDB::SerializeRedundancyGroupState(const Checkable::Ptr& ch
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the dependency edge state ID for the given dependency object.
|
||||
*
|
||||
* The edge state ID is computed a bit differently depending on whether this is for a redundancy group or not.
|
||||
* For redundancy groups, the state ID is supposed to represent the connection state between the redundancy group
|
||||
* and the parent Checkable of the given dependency. Hence, the outcome will always be different for each parent
|
||||
* Checkable of the redundancy group.
|
||||
*
|
||||
* For non-redundant dependency groups, on the other hand, all dependency objects within that group will
|
||||
* always have the same parent Checkable. Likewise, the state ID will be always the same as well it doesn't
|
||||
* matter which dependency object is used to compute it. Therefore, it's sufficient to compute it only once
|
||||
* and all the other dependency objects can reuse the cached state ID. Thus, this function will cache the just
|
||||
* computed state ID in the dependency group object itself for later reuse.
|
||||
*
|
||||
* @param dependencyGroup The dependency group the dependency is part of.
|
||||
* @param dep The dependency object to compute the state ID for.
|
||||
*
|
||||
* @return The computed edge state ID.
|
||||
*/
|
||||
String IcingaDB::GetDependencyEdgeStateId(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep)
|
||||
{
|
||||
if (dependencyGroup->IsRedundancyGroup()) {
|
||||
return HashValue(new Array{
|
||||
dependencyGroup->GetIcingaDBIdentifier(),
|
||||
GetObjectIdentifier(dep->GetParent()),
|
||||
});
|
||||
}
|
||||
if (dependencyGroup->GetIcingaDBIdentifier().IsEmpty()) {
|
||||
auto edgeStateId = HashValue(new Array{dependencyGroup->GetCompositeKey(), GetObjectIdentifier(dep->GetParent())});
|
||||
dependencyGroup->SetIcingaDBIdentifier(edgeStateId);
|
||||
return edgeStateId;
|
||||
}
|
||||
// Use the already computed state ID for the dependency group.
|
||||
return dependencyGroup->GetIcingaDBIdentifier();
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the given filter to its Redis value representation.
|
||||
*
|
||||
|
|
|
|||
489
lib/icingadb/icingadb-worker.cpp
Normal file
489
lib/icingadb/icingadb-worker.cpp
Normal file
|
|
@ -0,0 +1,489 @@
|
|||
// SPDX-FileCopyrightText: 2025 Icinga GmbH <https://icinga.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#include "icingadb/icingadb.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include <vector>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
PendingQueueItem::PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits)
|
||||
: DirtyBits{dirtyBits & DirtyBitsAll}, ID{std::move(id)}, EnqueueTime{std::chrono::steady_clock::now()}
|
||||
{
|
||||
}
|
||||
|
||||
PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits)
|
||||
: PendingQueueItem{std::make_pair(obj, nullptr), bits}, Object{obj}
|
||||
{
|
||||
}
|
||||
PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup)
|
||||
: PendingQueueItem{std::make_pair(nullptr, depGroup), 0}, DepGroup{depGroup}
|
||||
{
|
||||
}
|
||||
|
||||
PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child)
|
||||
: PendingQueueItem{std::make_pair(child, depGroup), 0}, DepGroup{depGroup}, Child{child}
|
||||
{
|
||||
}
|
||||
|
||||
RelationsDeletionItem::RelationsDeletionItem(const String& id, RelationsKeySet relations)
|
||||
: PendingQueueItem{id, 0}, Relations{std::move(relations)}
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Background worker thread procedure for processing pending items.
|
||||
*
|
||||
* This function runs in a separate thread and continuously processes pending items that have been
|
||||
* enqueued for Redis updates. It waits for new items to be added to the pending items container,
|
||||
* and processes them one at a time, ensuring that the Redis connection is active and not overloaded
|
||||
* with too many pending queries. The function also implements a delay mechanism to allow for potential
|
||||
* additional changes to be merged into the same item before processing it.
|
||||
*/
|
||||
void IcingaDB::PendingItemsThreadProc()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
namespace ch = std::chrono;
|
||||
|
||||
// Limits the number of pending queries the Rcon can have at any given time to reduce the memory overhead to
|
||||
// the absolute minimum necessary, since the size of the pending queue items is much smaller than the size
|
||||
// of the actual Redis queries. Thus, this will slow down the worker thread a bit from generating too many
|
||||
// Redis queries when the Redis connection is saturated.
|
||||
constexpr std::size_t maxPendingQueries = 128;
|
||||
|
||||
std::unique_lock lock(m_PendingItemsMutex);
|
||||
// Wait until the initial config dump is done. IcingaDB::OnConnectedHandler will notify us once it's finished.
|
||||
while (GetActive() && !m_ConfigDumpDone) m_PendingItemsCV.wait(lock);
|
||||
|
||||
// Predicate to determine whether the worker thread is allowed to process pending items.
|
||||
auto canContinue = [this] {
|
||||
if (!GetActive()) {
|
||||
return true;
|
||||
}
|
||||
return !m_PendingItems.empty() && m_RconWorker && m_RconWorker->IsConnected() && m_RconWorker->GetPendingQueryCount() < maxPendingQueries;
|
||||
};
|
||||
|
||||
while (true) {
|
||||
// Even if someone notifies us, we still need to verify whether the precondition is actually fulfilled.
|
||||
// However, in case we don't receive any notification, we still want to wake up periodically on our own
|
||||
// to check whether we can proceed (e.g. the Redis connection might have become available again and there
|
||||
// was no activity on the pending items queue to trigger a notification). Thus, we use a timed wait here.
|
||||
while (!canContinue()) m_PendingItemsCV.wait_for(lock, 100ms);
|
||||
|
||||
if (!GetActive()) {
|
||||
break;
|
||||
}
|
||||
if (auto retryAfter = DequeueAndProcessOne(lock); retryAfter > 0ms) {
|
||||
m_PendingItemsCV.wait_for(lock, retryAfter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dequeue and process a single pending item.
|
||||
*
|
||||
* This function processes a single pending item from the pending items container. It iterates over
|
||||
* the items in insertion order and checks if the first item is old enough to be processed (at least
|
||||
* 1000ms old) unless we're being shutting down. If the item can be processed, it attempts to acquire
|
||||
* a lock on the associated config object (if applicable) and processes the item accordingly.
|
||||
*
|
||||
* If the item cannot be processed right now because it's too new, the function returns a duration
|
||||
* indicating how long to wait before retrying. Also, if no progress was made during this iteration
|
||||
* (i.e., no item was processed), it returns a short delay to avoid busy-looping.
|
||||
*
|
||||
* @param lock A unique lock on the pending items mutex (must be acquired before calling this function).
|
||||
*
|
||||
* @return A duration indicating how long to wait before retrying.
|
||||
*/
|
||||
std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<std::mutex>& lock)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
namespace ch = std::chrono;
|
||||
|
||||
bool madeProgress = false; // Did we make any progress in this iteration?
|
||||
ch::duration<double> retryAfter{0}; // If we can't process anything right now, how long to wait before retrying?
|
||||
auto now = ch::steady_clock::now();
|
||||
|
||||
auto& seqView = m_PendingItems.get<1>();
|
||||
for (auto it(seqView.begin()); it != seqView.end(); ++it) {
|
||||
if (it != seqView.begin()) {
|
||||
if (std::holds_alternative<RelationsDeletionItem>(*it)) {
|
||||
// We don't know whether the previous items are related to this deletion item or not,
|
||||
// thus we can't just process this right now when there are older items in the queue.
|
||||
// Otherwise, we might delete something that is going to be updated/created.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
auto age = now - std::visit([](const auto& item) { return item.EnqueueTime; }, *it);
|
||||
if (GetActive() && 1000ms > age) {
|
||||
if (it == seqView.begin()) {
|
||||
retryAfter = 1000ms - age;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
ConfigObject::Ptr cobj;
|
||||
if (auto* citem = std::get_if<PendingConfigItem>(&*it); citem) {
|
||||
cobj = citem->Object;
|
||||
}
|
||||
|
||||
ObjectLock olock(cobj, std::defer_lock);
|
||||
if (cobj && !olock.TryLock()) {
|
||||
continue; // Can't lock the object right now, try the next one.
|
||||
}
|
||||
|
||||
PendingItemVariant itemToProcess = *it;
|
||||
seqView.erase(it);
|
||||
madeProgress = true;
|
||||
|
||||
lock.unlock();
|
||||
try {
|
||||
std::visit([this](const auto& item) { ProcessPendingItem(item); }, itemToProcess);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "IcingaDB")
|
||||
<< "Exception while processing pending item of type index '" << itemToProcess.index() << "': "
|
||||
<< DiagnosticInformation(ex, GetActive());
|
||||
}
|
||||
lock.lock();
|
||||
break;
|
||||
}
|
||||
|
||||
if (!madeProgress && retryAfter == 0ms) {
|
||||
// We haven't made any progress, so give it a short delay before retrying.
|
||||
retryAfter = 10ms;
|
||||
}
|
||||
return retryAfter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single pending object.
|
||||
*
|
||||
* This function processes a single pending object based on its dirty bits. It checks if the object is a
|
||||
* @c ConfigObject and performs the appropriate actions such as sending configuration updates, state updates,
|
||||
* or deletions to the Redis connection. The function handles different types of objects, including @c Checkable
|
||||
* objects, and ensures that the correct updates are sent based on the dirty bits set for the object.
|
||||
*
|
||||
* @param item The pending item containing the object and its dirty bits.
|
||||
*/
|
||||
void IcingaDB::ProcessPendingItem(const PendingConfigItem& item)
|
||||
{
|
||||
if (item.DirtyBits & ConfigDelete) {
|
||||
auto redisKeyPair = GetSyncableTypeRedisKeys(item.Object->GetReflectionType());
|
||||
m_RconWorker->FireAndForgetQueries(
|
||||
{
|
||||
{"HDEL", redisKeyPair.ObjectKey, GetObjectIdentifier(item.Object)},
|
||||
{"HDEL", redisKeyPair.ChecksumKey, GetObjectIdentifier(item.Object)},
|
||||
{
|
||||
"XADD",
|
||||
"icinga:runtime",
|
||||
"MAXLEN",
|
||||
"~",
|
||||
"1000000",
|
||||
"*",
|
||||
"redis_key",
|
||||
redisKeyPair.ObjectKey,
|
||||
"id",
|
||||
GetObjectIdentifier(item.Object),
|
||||
"runtime_type",
|
||||
"delete"
|
||||
}
|
||||
},
|
||||
RedisConnection::QueryPriority::Config
|
||||
);
|
||||
}
|
||||
|
||||
if (item.DirtyBits & ConfigUpdate) {
|
||||
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
|
||||
std::vector<Dictionary::Ptr> runtimeUpdates;
|
||||
CreateConfigUpdate(item.Object, GetSyncableTypeRedisKeys(item.Object->GetReflectionType()), hMSets, runtimeUpdates, true);
|
||||
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
|
||||
}
|
||||
|
||||
if (auto checkable = dynamic_pointer_cast<Checkable>(item.Object); checkable) {
|
||||
if (item.DirtyBits & FullState) {
|
||||
UpdateState(checkable, item.DirtyBits);
|
||||
}
|
||||
if (item.DirtyBits & NextUpdate) {
|
||||
SendNextUpdate(checkable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single pending dependency group state item.
|
||||
*
|
||||
* This function processes a single pending dependency group state item by updating the dependencies
|
||||
* state for the associated dependency group. It selects any child checkable from the dependency group
|
||||
* to initiate the state update process.
|
||||
*
|
||||
* @param item The pending dependency group state item containing the dependency group.
|
||||
*/
|
||||
void IcingaDB::ProcessPendingItem(const PendingDependencyGroupStateItem& item) const
|
||||
{
|
||||
// For dependency group state updates, we don't actually care which child triggered the update,
|
||||
// since all children share the same dependency group state. Thus, we can just pick any child to
|
||||
// start the update from.
|
||||
if (auto child = item.DepGroup->GetAnyChild(); child) {
|
||||
UpdateDependenciesState(child, item.DepGroup);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single pending dependency edge item.
|
||||
*
|
||||
* This function fully serializes a single pending dependency edge item (child registration)
|
||||
* and sends all the resulting Redis queries in a single transaction. The dependencies (edges)
|
||||
* to serialize are determined by the dependency group and child checkable the provided item represents.
|
||||
*
|
||||
* @param item The pending dependency edge item containing the dependency group and child checkable.
|
||||
*/
|
||||
void IcingaDB::ProcessPendingItem(const PendingDependencyEdgeItem& item)
|
||||
{
|
||||
std::vector<Dictionary::Ptr> runtimeUpdates;
|
||||
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
|
||||
InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup);
|
||||
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single pending deletion item.
|
||||
*
|
||||
* This function processes a single pending deletion item by deleting the specified sub-keys
|
||||
* from Redis based on the provided deletion keys map. It ensures that the object's ID is
|
||||
* removed from the specified Redis keys and their corresponding checksum keys if indicated.
|
||||
*
|
||||
* @param item The pending deletion item containing the ID and deletion keys map.
|
||||
*/
|
||||
void IcingaDB::ProcessPendingItem(const RelationsDeletionItem& item)
|
||||
{
|
||||
ASSERT(std::holds_alternative<std::string>(item.ID)); // Relation deletion items must have real IDs.
|
||||
|
||||
auto id = std::get<std::string>(item.ID);
|
||||
for (auto [configKey, checksumKey] : item.Relations) {
|
||||
if (IsStateKey(configKey)) {
|
||||
DeleteState(id, configKey, checksumKey);
|
||||
} else {
|
||||
DeleteRelationship(id, configKey, checksumKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue a configuration object for processing in the pending objects thread.
|
||||
*
|
||||
* @param object The configuration object to be enqueued for processing.
|
||||
* @param bits The dirty bits indicating the type of changes to be processed for the object.
|
||||
*/
|
||||
void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits)
|
||||
{
|
||||
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
|
||||
return; // No need to enqueue anything if we're not connected.
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
if (auto [it, inserted] = m_PendingItems.insert(PendingConfigItem{object, bits}); !inserted) {
|
||||
m_PendingItems.modify(it, [bits](PendingItemVariant& itemToProcess) {
|
||||
std::visit(
|
||||
[&bits](auto& item) {
|
||||
if (bits & ConfigDelete) {
|
||||
item.DirtyBits &= ~(ConfigUpdate | FullState);
|
||||
} else if (bits & ConfigUpdate) {
|
||||
item.DirtyBits &= ~ConfigDelete;
|
||||
}
|
||||
item.DirtyBits |= bits & DirtyBitsAll;
|
||||
},
|
||||
itemToProcess
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
m_PendingItemsCV.notify_one();
|
||||
}
|
||||
|
||||
void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup)
|
||||
{
|
||||
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
m_PendingItems.insert(PendingDependencyGroupStateItem{depGroup});
|
||||
}
|
||||
m_PendingItemsCV.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue the registration of a dependency child to a dependency group.
|
||||
*
|
||||
* This function adds a pending item to the queue for processing the registration of a child checkable
|
||||
* to a dependency group. If there is no active Redis connection available, this function is a no-op.
|
||||
*
|
||||
* @param depGroup The dependency group to which the child is being registered.
|
||||
* @param child The child checkable being registered to the dependency group.
|
||||
*/
|
||||
void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child)
|
||||
{
|
||||
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
m_PendingItems.insert(PendingDependencyEdgeItem{depGroup, child});
|
||||
}
|
||||
m_PendingItemsCV.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue the removal of a dependency child from a dependency group.
|
||||
*
|
||||
* This function handles the removal of a child checkable from a dependency group by first checking if there
|
||||
* are any pending registration items for the same child and dependency group. If such an item exists, it is
|
||||
* removed from the pending items queue, effectively canceling the registration. If there is also a pending
|
||||
* dependency group state update triggered by the same child, it is either removed or updated to use a different
|
||||
* child if the group is not being removed entirely. If no pending registration exists, the function proceeds
|
||||
* to enqueue the necessary deletions in Redis for the dependencies and related nodes and edges.
|
||||
*
|
||||
* @param depGroup The dependency group from which the child is being removed.
|
||||
* @param dependencies The list of dependencies associated with the child being removed.
|
||||
* @param removeGroup A flag indicating whether the entire dependency group should be removed.
|
||||
*/
|
||||
void IcingaDB::EnqueueDependencyChildRemoved(
|
||||
const DependencyGroup::Ptr& depGroup,
|
||||
const std::vector<Dependency::Ptr>& dependencies,
|
||||
bool removeGroup
|
||||
)
|
||||
{
|
||||
if (dependencies.empty() || !GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
|
||||
return; // No need to enqueue anything if we're not connected or there are no dependencies.
|
||||
}
|
||||
|
||||
Checkable::Ptr child(dependencies.front()->GetChild());
|
||||
bool cancelledRegistration = false;
|
||||
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
if (auto it(m_PendingItems.find(std::make_pair(child, depGroup))); it != m_PendingItems.end()) {
|
||||
cancelledRegistration = true;
|
||||
m_PendingItems.erase(it);
|
||||
if (removeGroup) {
|
||||
// If we're removing the entire group registration, we can also drop any pending dependency group
|
||||
// state update triggered previously as it should no longer have any children left.
|
||||
m_PendingItems.erase(std::make_pair(nullptr, depGroup));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!child->HasAnyDependencies()) {
|
||||
// If the child Checkable has no parent and reverse dependencies, we can safely remove the dependency node.
|
||||
// This might be a no-op in some cases (e.g. if the child's only dependency was the one that we just canceled
|
||||
// above), but since we can't reliably determine whether the node exists in Redis or not, we just enqueue the
|
||||
// deletion anyway.
|
||||
EnqueueRelationsDeletion(GetObjectIdentifier(child), {{CONFIG_REDIS_KEY_PREFIX "dependency:node", ""}});
|
||||
}
|
||||
|
||||
if (cancelledRegistration && depGroup->GetIcingaDBIdentifier().IsEmpty()) {
|
||||
// If we had a pending registration that we just canceled above, and the dependency group has no
|
||||
// IcingaDB identifier yet, then there's no need to proceed with any deletions, as the dependency
|
||||
// group was never serialized to Redis in the first place.
|
||||
return;
|
||||
}
|
||||
|
||||
if (depGroup->GetIcingaDBIdentifier().IsEmpty()) {
|
||||
// An empty IcingaDB identifier indicates that the worker thread has just picked up the registration of the
|
||||
// first child (removed from the pending items queue) but hasn't yet entered the InsertCheckableDependencies()
|
||||
// function to actually fill in the IcingaDB identifier. Thus, we need to generate and set it here to ensure
|
||||
// that the relation deletions below use the correct identifier.
|
||||
if (depGroup->IsRedundancyGroup()) {
|
||||
// Keep this with IcingaDB::InsertCheckableDependencies in sync!
|
||||
depGroup->SetIcingaDBIdentifier(HashValue(new Array{m_EnvironmentId, depGroup->GetCompositeKey()}));
|
||||
} else {
|
||||
// This will set the IcingaDB identifier of the dependency group as a side effect.
|
||||
(void)GetDependencyEdgeStateId(depGroup, dependencies.front());
|
||||
}
|
||||
}
|
||||
|
||||
std::set<Checkable*> detachedParents;
|
||||
for (const auto& dependency : dependencies) {
|
||||
const auto& parent(dependency->GetParent());
|
||||
if (auto [_, inserted] = detachedParents.insert(dependency->GetParent().get()); inserted) {
|
||||
String edgeId;
|
||||
if (depGroup->IsRedundancyGroup()) {
|
||||
// If the redundancy group has no members left, it's going to be removed as well, so we need to
|
||||
// delete dependency edges from that group to the parent Checkables.
|
||||
if (removeGroup) {
|
||||
EnqueueRelationsDeletion(
|
||||
GetDependencyEdgeStateId(depGroup, dependency),
|
||||
{
|
||||
{CONFIG_REDIS_KEY_PREFIX "dependency:edge", ""},
|
||||
{CONFIG_REDIS_KEY_PREFIX "dependency:edge:state", ""},
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Remove the connection from the child Checkable to the redundancy group.
|
||||
edgeId = HashValue(new Array{GetObjectIdentifier(child), depGroup->GetIcingaDBIdentifier()});
|
||||
} else {
|
||||
// Remove the edge between the parent and child Checkable linked through the removed dependency.
|
||||
edgeId = HashValue(new Array{GetObjectIdentifier(child), GetObjectIdentifier(parent)});
|
||||
}
|
||||
|
||||
EnqueueRelationsDeletion(edgeId, {{CONFIG_REDIS_KEY_PREFIX "dependency:edge", ""}});
|
||||
|
||||
// The total_children and affects_children columns might now have different outcome, so update the parent
|
||||
// Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's
|
||||
// not worth traversing the whole tree way up and sending config updates for each one of them, as the next
|
||||
// Redis config dump is going to fix it anyway.
|
||||
EnqueueConfigObject(parent, ConfigUpdate);
|
||||
|
||||
if (!parent->HasAnyDependencies()) {
|
||||
// If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry.
|
||||
EnqueueRelationsDeletion(GetObjectIdentifier(parent), {{CONFIG_REDIS_KEY_PREFIX "dependency:node", ""}});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (removeGroup && depGroup->IsRedundancyGroup()) {
|
||||
EnqueueRelationsDeletion(
|
||||
depGroup->GetIcingaDBIdentifier(),
|
||||
{
|
||||
{CONFIG_REDIS_KEY_PREFIX "dependency:node", ""},
|
||||
{CONFIG_REDIS_KEY_PREFIX "redundancygroup", ""},
|
||||
{CONFIG_REDIS_KEY_PREFIX "redundancygroup:state", ""},
|
||||
{CONFIG_REDIS_KEY_PREFIX "dependency:edge:state", ""}
|
||||
}
|
||||
);
|
||||
} else if (removeGroup) {
|
||||
// Note: The Icinga DB identifier of a non-redundant dependency group is used as the edge state ID
|
||||
// and shared by all of its dependency objects. See also SerializeDependencyEdgeState() for details.
|
||||
EnqueueRelationsDeletion(depGroup->GetIcingaDBIdentifier(), {{CONFIG_REDIS_KEY_PREFIX "dependency:edge:state", ""}});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue a relation deletion for processing in the pending objects thread.
|
||||
*
|
||||
* This function adds a relation deletion item to the set of pending items to be processed by the
|
||||
* pending items worker thread. The relation deletion item contains the ID of the relation to be
|
||||
* deleted and a map of Redis keys from which to delete the relation. If the relation deletion item
|
||||
* is already in the set, it updates the deletion keys accordingly.
|
||||
*
|
||||
* @param id The ID of the relation to be deleted.
|
||||
* @param relations A map of Redis keys from which to delete the relation.
|
||||
*/
|
||||
void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeySet& relations)
|
||||
{
|
||||
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
|
||||
return; // No need to enqueue anything if we're not connected.
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(m_PendingItemsMutex);
|
||||
if (auto [it, inserted] = m_PendingItems.insert(RelationsDeletionItem{id, relations}); !inserted) {
|
||||
m_PendingItems.modify(it, [&relations](PendingItemVariant& val) {
|
||||
auto& item = std::get<RelationsDeletionItem>(val);
|
||||
item.Relations.insert(relations.begin(), relations.end());
|
||||
});
|
||||
}
|
||||
}
|
||||
m_PendingItemsCV.notify_one();
|
||||
}
|
||||
|
|
@ -131,6 +131,7 @@ void IcingaDB::Start(bool runtimeCreated)
|
|||
Ptr keepAlive (this);
|
||||
|
||||
m_HistoryThread = std::async(std::launch::async, [this, keepAlive]() { ForwardHistoryEntries(); });
|
||||
m_PendingItemsThread = std::thread([this, keepAlive] { PendingItemsThreadProc(); });
|
||||
}
|
||||
|
||||
void IcingaDB::ExceptionHandler(boost::exception_ptr exp)
|
||||
|
|
@ -154,9 +155,11 @@ void IcingaDB::OnConnectedHandler()
|
|||
|
||||
UpdateAllConfigObjects();
|
||||
|
||||
m_ConfigDumpDone = true;
|
||||
|
||||
m_ConfigDumpDone.store(true);
|
||||
m_ConfigDumpInProgress = false;
|
||||
// Notify the pending items worker to let it know that the config dump is done,
|
||||
// and it can start processing pending items.
|
||||
m_PendingItemsCV.notify_one();
|
||||
}
|
||||
|
||||
void IcingaDB::PublishStatsTimerHandler(void)
|
||||
|
|
@ -192,6 +195,8 @@ void IcingaDB::Stop(bool runtimeRemoved)
|
|||
Log(LogInformation, "IcingaDB")
|
||||
<< "Flushing history data buffer to Redis.";
|
||||
|
||||
m_PendingItemsCV.notify_all(); // Wake up the pending items worker to let it exit cleanly.
|
||||
|
||||
if (m_HistoryThread.wait_for(std::chrono::minutes(1)) == std::future_status::timeout) {
|
||||
Log(LogCritical, "IcingaDB")
|
||||
<< "Flushing takes more than one minute (while we're about to shut down). Giving up and discarding "
|
||||
|
|
@ -199,6 +204,7 @@ void IcingaDB::Stop(bool runtimeRemoved)
|
|||
}
|
||||
|
||||
m_StatsTimer->Stop(true);
|
||||
m_PendingItemsThread.join();
|
||||
|
||||
Log(LogInformation, "IcingaDB")
|
||||
<< "'" << GetName() << "' stopped.";
|
||||
|
|
|
|||
|
|
@ -16,6 +16,9 @@
|
|||
#include "icinga/service.hpp"
|
||||
#include "icinga/downtime.hpp"
|
||||
#include "remote/messageorigin.hpp"
|
||||
#include <boost/multi_index_container.hpp>
|
||||
#include <boost/multi_index/ordered_index.hpp>
|
||||
#include <boost/multi_index/sequenced_index.hpp>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <future>
|
||||
|
|
@ -32,6 +35,132 @@ namespace icinga
|
|||
#define CONFIG_REDIS_KEY_PREFIX "icinga:"
|
||||
#define CHECKSUM_REDIS_KEY_PREFIX CONFIG_REDIS_KEY_PREFIX "checksum:"
|
||||
|
||||
/**
|
||||
* Dirty bits for config/state changes.
|
||||
*
|
||||
* These are used to mark objects as "dirty" in order to trigger appropriate updates in Redis.
|
||||
* Each bit represents a different type of change that requires a specific action to be taken.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
enum DirtyBits : uint32_t
|
||||
{
|
||||
ConfigUpdate = 1<<0, // Trigger a Redis config update for the object.
|
||||
ConfigDelete = 1<<1, // Send a deletion command for the object to Redis.
|
||||
VolatileState = 1<<2, // Send a volatile state update to Redis (affects only checkables).
|
||||
RuntimeState = 1<<3, // Send a runtime state update to Redis (affects only checkables).
|
||||
NextUpdate = 1<<4, // Update the `icinga:nextupdate:{host,service}` Redis keys (affects only checkables).
|
||||
|
||||
FullState = VolatileState | RuntimeState, // A combination of all (non-dependency) state-related dirty bits.
|
||||
|
||||
// All valid dirty bits combined used for masking input values.
|
||||
DirtyBitsAll = ConfigUpdate | ConfigDelete | FullState | NextUpdate
|
||||
};
|
||||
|
||||
/**
|
||||
* A variant type representing the identifier of a pending item.
|
||||
*
|
||||
* This variant can hold either a string representing a real Redis hash key or a pair consisting of
|
||||
* a configuration object pointer and a dependency group pointer. A pending item identified by the
|
||||
* latter variant type operates primarily on the associated configuration object or dependency group,
|
||||
* thus the pairs are used for uniqueness in the pending items container.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
using PendingItemKey = std::variant<std::string /* Redis hash keys */, std::pair<ConfigObject::Ptr, DependencyGroup::Ptr>>;
|
||||
|
||||
/**
|
||||
* A pending queue item.
|
||||
*
|
||||
* This struct represents a generic pending item in the queue that is associated with a unique identifier
|
||||
* and dirty bits indicating the type of updates required in Redis. The @c EnqueueTime field records the
|
||||
* time when the item was added to the queue, which can be useful for tracking how long an item waits before
|
||||
* being processed. This base struct is extended by more specific pending item types that operate on different
|
||||
* kinds of objects, such as configuration objects or dependency groups.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingQueueItem
|
||||
{
|
||||
uint32_t DirtyBits;
|
||||
PendingItemKey ID;
|
||||
const std::chrono::steady_clock::time_point EnqueueTime;
|
||||
|
||||
PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits);
|
||||
};
|
||||
|
||||
/**
|
||||
* A pending configuration object item.
|
||||
*
|
||||
* This struct represents a pending item in the queue that is associated with a configuration object.
|
||||
* It contains a pointer to the configuration object and the dirty bits indicating the type of updates
|
||||
* required for that object in Redis. A pending configuration item operates primarily on config objects,
|
||||
* thus the @c ID field in the base struct is only used for uniqueness in the pending items container.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingConfigItem : PendingQueueItem
|
||||
{
|
||||
ConfigObject::Ptr Object;
|
||||
|
||||
PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits);
|
||||
};
|
||||
|
||||
/**
|
||||
* A pending dependency group state item.
|
||||
*
|
||||
* This struct represents a pending item in the queue that is associated with a dependency group.
|
||||
* It contains a pointer to the dependency group for which state updates are required. The dirty bits
|
||||
* in the base struct are not used for this item type, as the operation is specific to updating the
|
||||
* state of the dependency group itself.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingDependencyGroupStateItem : PendingQueueItem
|
||||
{
|
||||
DependencyGroup::Ptr DepGroup;
|
||||
|
||||
explicit PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup);
|
||||
};
|
||||
|
||||
/**
|
||||
* A pending dependency edge item.
|
||||
*
|
||||
* This struct represents a pending dependency child registration into a dependency group.
|
||||
* It contains a pointer to the dependency group and the checkable child being registered.
|
||||
* The dirty bits in the base struct are not used for this item type, as the operation is specific
|
||||
* to registering the child into the dependency group.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct PendingDependencyEdgeItem : PendingQueueItem
|
||||
{
|
||||
DependencyGroup::Ptr DepGroup;
|
||||
Checkable::Ptr Child;
|
||||
|
||||
PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child);
|
||||
};
|
||||
|
||||
// Set of Redis keys from which to delete a relation, along with their checksums (if any).
|
||||
using RelationsKeySet = std::set<std::pair<RedisConnection::QueryArg, RedisConnection::QueryArg>>;
|
||||
|
||||
/**
|
||||
* A pending relations deletion item.
|
||||
*
|
||||
* This struct represents a pending item in the queue that is associated with the deletion of relations
|
||||
* in Redis. It contains a map of Redis keys from which the relation identified by the @c ID field should
|
||||
* be deleted. The @c ID field represents the unique identifier of the relation to be deleted, and the
|
||||
* @c Relations map specifies the Redis keys and whether to delete the corresponding checksum keys.
|
||||
*
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
struct RelationsDeletionItem : PendingQueueItem
|
||||
{
|
||||
RelationsKeySet Relations;
|
||||
|
||||
RelationsDeletionItem(const String& id, RelationsKeySet relations);
|
||||
};
|
||||
|
||||
/**
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
|
|
@ -174,6 +303,7 @@ private:
|
|||
static void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<RedisConnection::QueryArg>& keys, RedisConnection::QueryPriority priority);
|
||||
static std::vector<RedisConnection::QueryArg> GetTypeOverwriteKeys(const Type::Ptr& type, bool skipChecksums = false);
|
||||
static void AddDataToHmSets(std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, const RedisConnection::QueryArg& redisKey, const String& id, const Dictionary::Ptr& data);
|
||||
static bool IsStateKey(const RedisConnection::QueryArg& key);
|
||||
static String FormatCheckSumBinary(const String& str);
|
||||
static String FormatCommandLine(const Value& commandLine);
|
||||
static QueryArgPair GetCheckableStateKeys(const Type::Ptr& type);
|
||||
|
|
@ -192,6 +322,7 @@ private:
|
|||
static Dictionary::Ptr SerializeVars(const Dictionary::Ptr& vars);
|
||||
static Dictionary::Ptr SerializeDependencyEdgeState(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep);
|
||||
static Dictionary::Ptr SerializeRedundancyGroupState(const Checkable::Ptr& child, const DependencyGroup::Ptr& redundancyGroup);
|
||||
static String GetDependencyEdgeStateId(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep);
|
||||
|
||||
static String HashValue(const Value& value);
|
||||
static String HashValue(const Value& value, const std::set<String>& propertiesBlacklist, bool propertiesWhitelist = false);
|
||||
|
|
@ -253,7 +384,7 @@ private:
|
|||
Bulker<RedisConnection::Query> m_HistoryBulker {4096, std::chrono::milliseconds(250)};
|
||||
|
||||
bool m_ConfigDumpInProgress;
|
||||
bool m_ConfigDumpDone;
|
||||
std::atomic_bool m_ConfigDumpDone{false};
|
||||
|
||||
/**
|
||||
* The primary Redis connection used to send history and heartbeat queries.
|
||||
|
|
@ -290,6 +421,55 @@ private:
|
|||
// initialization, the value is read-only and can be accessed without further synchronization.
|
||||
static String m_EnvironmentId;
|
||||
static std::mutex m_EnvironmentIdInitMutex;
|
||||
|
||||
// A variant type that can hold any of the pending item types used in the pending items container.
|
||||
using PendingItemVariant = std::variant<
|
||||
PendingConfigItem,
|
||||
PendingDependencyGroupStateItem,
|
||||
PendingDependencyEdgeItem,
|
||||
RelationsDeletionItem
|
||||
>;
|
||||
|
||||
struct PendingItemKeyExtractor
|
||||
{
|
||||
// The type of the key extracted from a pending item required by Boost.MultiIndex.
|
||||
using result_type = const PendingItemKey&;
|
||||
|
||||
result_type operator()(const PendingItemVariant& item) const
|
||||
{
|
||||
return std::visit([](const auto& pendingItem) -> result_type { return pendingItem.ID; }, item);
|
||||
}
|
||||
};
|
||||
|
||||
// A multi-index container for managing pending items with unique IDs and maintaining insertion order.
|
||||
// The first index is an ordered unique index based on the pending item key, allowing for efficient
|
||||
// lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the
|
||||
// order of insertion, enabling FIFO processing of pending items.
|
||||
using PendingItemsSet = boost::multi_index_container<
|
||||
PendingItemVariant,
|
||||
boost::multi_index::indexed_by<
|
||||
boost::multi_index::ordered_unique<PendingItemKeyExtractor>, // std::variant has operator< defined.
|
||||
boost::multi_index::sequenced<>
|
||||
>
|
||||
>;
|
||||
|
||||
std::thread m_PendingItemsThread; // The background worker thread (consumer of m_PendingItems).
|
||||
PendingItemsSet m_PendingItems; // Container for pending items with dirty bits (access protected by m_PendingItemsMutex).
|
||||
std::mutex m_PendingItemsMutex; // Mutex to protect access to m_PendingItems.
|
||||
std::condition_variable m_PendingItemsCV; // Condition variable to forcefully wake up the worker thread.
|
||||
|
||||
void PendingItemsThreadProc();
|
||||
std::chrono::duration<double> DequeueAndProcessOne(std::unique_lock<std::mutex>& lock);
|
||||
void ProcessPendingItem(const PendingConfigItem& item);
|
||||
void ProcessPendingItem(const PendingDependencyGroupStateItem& item) const;
|
||||
void ProcessPendingItem(const PendingDependencyEdgeItem& item);
|
||||
void ProcessPendingItem(const RelationsDeletionItem& item);
|
||||
|
||||
void EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits);
|
||||
void EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup);
|
||||
void EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child);
|
||||
void EnqueueDependencyChildRemoved(const DependencyGroup::Ptr& depGroup, const std::vector<Dependency::Ptr>& dependencies, bool removeGroup);
|
||||
void EnqueueRelationsDeletion(const String& id, const RelationsKeySet& relations);
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -195,7 +195,7 @@ struct RedisConnInfo final : SharedObject
|
|||
|
||||
int GetQueryCount(RingBuffer::SizeType span);
|
||||
|
||||
inline int GetPendingQueryCount() const
|
||||
inline std::size_t GetPendingQueryCount() const
|
||||
{
|
||||
return m_PendingQueries;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue