From d364ad981e8eb9f1ba289fa225dc352a0d2328fc Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 30 Oct 2025 18:02:20 +0100 Subject: [PATCH] IcingaDB: enqueue config runtime updates to the worker queue --- lib/icingadb/icingadb-objects.cpp | 158 +++++++++++------------------- lib/icingadb/icingadb.hpp | 12 +-- 2 files changed, 60 insertions(+), 110 deletions(-) diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index eaac6e920..6fe7a3196 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -1322,19 +1322,19 @@ void IcingaDB::InsertCheckableDependencies( * Update the state information of a checkable in Redis. * * What is updated exactly depends on the mode parameter: - * - Volatile: Update the volatile state information stored in icinga:host:state or icinga:service:state as well as - * the corresponding checksum stored in icinga:checksum:host:state or icinga:checksum:service:state. - * - RuntimeOnly: Write a runtime update to the icinga:runtime:state stream. It is up to the caller to ensure that + * - VolatileState: Update the volatile state information stored in icinga:host:state or icinga:service:state as well + * as the corresponding checksum stored in icinga:checksum:host:state or icinga:checksum:service:state. + * - RuntimeState: Write a runtime update to the icinga:runtime:state stream. It is up to the caller to ensure that * identical volatile state information was already written before to avoid inconsistencies. This mode is only * useful to upgrade a previous Volatile to a Full operation, otherwise Full should be used. - * - Full: Perform an update of all state information in Redis, that is updating the volatile information and sending - * a corresponding runtime update so that this state update gets written through to the persistent database by a - * running icingadb process. + * - FullState: Perform an update of all state information in Redis, that is updating the volatile information and + * sending a corresponding runtime update so that this state update gets written through to the persistent database + * by a running icingadb process. * * @param checkable State of this checkable is updated in Redis - * @param mode Mode of operation (StateUpdate::Volatile, StateUpdate::RuntimeOnly, or StateUpdate::Full) + * @param mode Mode of operation (DirtyBits:VolatileState, DirtyBits::RuntimeState, or DirtyBits::FullState) */ -void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode) +void IcingaDB::UpdateState(const Checkable::Ptr& checkable, uint32_t mode) { if (!m_RconWorker || !m_RconWorker->IsConnected()) return; @@ -1344,7 +1344,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode) String checksum = HashValue(stateAttrs); auto [redisStateKey, redisChecksumKey] = GetCheckableStateKeys(checkable->GetReflectionType()); - if (mode & StateUpdate::Volatile) { + if (mode & VolatileState) { String objectKey = GetObjectIdentifier(checkable); m_RconWorker->FireAndForgetQueries({ {"HSET", redisStateKey, objectKey, JsonEncode(stateAttrs)}, @@ -1352,7 +1352,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode) }, Prio::RuntimeStateSync); } - if (mode & StateUpdate::RuntimeOnly) { + if (mode & RuntimeState) { ObjectLock olock(stateAttrs); RedisConnection::Query streamadd({ @@ -1446,28 +1446,6 @@ void IcingaDB::UpdateDependenciesState(const Checkable::Ptr& checkable, const De } } -// Used to update a single object, used for runtime updates -void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate) -{ - if (!m_RconWorker || !m_RconWorker->IsConnected()) - return; - - std::map hMSets; - std::vector runtimeUpdates; - - CreateConfigUpdate(object, GetSyncableTypeRedisKeys(object->GetReflectionType()), hMSets, runtimeUpdates, runtimeUpdate); - Checkable::Ptr checkable = dynamic_pointer_cast(object); - if (checkable) { - UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile); - } - - ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); - - if (checkable) { - SendNextUpdate(checkable); - } -} - void IcingaDB::AddObjectDataToRuntimeUpdates(std::vector& runtimeUpdates, String objectKey, String redisKey, const Dictionary::Ptr& data) { @@ -1833,26 +1811,11 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) if (!m_RconWorker || !m_RconWorker->IsConnected()) return; - Type::Ptr type = object->GetReflectionType(); - String objectKey = GetObjectIdentifier(object); - auto redisKeyPair = GetSyncableTypeRedisKeys(type); - - m_RconWorker->FireAndForgetQueries({ - {"HDEL", redisKeyPair.ObjectKey, objectKey}, - {"HDEL", redisKeyPair.ChecksumKey, objectKey}, - { - "XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*", - "redis_key", redisKeyPair.ObjectKey, "id", objectKey, "runtime_type", "delete" - } - }, Prio::Config); - - CustomVarObject::Ptr customVarObject = dynamic_pointer_cast(object); - - if (customVarObject) { - Dictionary::Ptr vars = customVarObject->GetVars(); - SendCustomVarsChanged(object, vars, nullptr); + if (auto customVarObject = dynamic_pointer_cast(object); customVarObject) { + SendCustomVarsChanged(object, customVarObject->GetVars(), nullptr); } + Type::Ptr type = object->GetReflectionType(); if (type == Host::TypeInstance || type == Service::TypeInstance) { Checkable::Ptr checkable = static_pointer_cast(object); @@ -1860,17 +1823,9 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) Service::Ptr service; tie(host, service) = GetHostService(checkable); - m_RconWorker->FireAndForgetQuery({ - "ZREM", - service ? "icinga:nextupdate:service" : "icinga:nextupdate:host", - GetObjectIdentifier(checkable) - }, Prio::CheckResult); - auto [configStateKey, checksumStateKey] = GetCheckableStateKeys(checkable->GetReflectionType()); - m_RconWorker->FireAndForgetQueries({ - {"HDEL", configStateKey, objectKey}, - {"HDEL", checksumStateKey, objectKey} - }, Prio::RuntimeStateSync); + EnqueueRelationsDeletion(GetObjectIdentifier(checkable), {{configStateKey, checksumStateKey}}); + EnqueueConfigObject(object, ConfigDelete | NextUpdate); // Send also ZREM for next update if (service) { SendGroupsChanged(checkable, service->GetGroups(), nullptr); @@ -1881,6 +1836,8 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) return; } + EnqueueConfigObject(object, ConfigDelete); + if (type == TimePeriod::TypeInstance) { TimePeriod::Ptr timeperiod = static_pointer_cast(object); SendTimePeriodRangesChanged(timeperiod, timeperiod->GetRanges(), nullptr); @@ -1941,7 +1898,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul tie(host, service) = GetHostService(checkable); - UpdateState(checkable, StateUpdate::RuntimeOnly); + EnqueueConfigObject(checkable, RuntimeState); int hard_state{}; if (!cr) { @@ -2122,7 +2079,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) return; } - SendConfigUpdate(downtime, true); + EnqueueConfigObject(downtime, ConfigUpdate); auto checkable (downtime->GetCheckable()); auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); @@ -2132,7 +2089,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) tie(host, service) = GetHostService(checkable); /* Update checkable state as in_downtime may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); RedisConnection::Query xAdd ({ "XADD", "icinga:history:stream:downtime", "*", @@ -2221,7 +2178,7 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) return; /* Update checkable state as in_downtime may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); RedisConnection::Query xAdd ({ "XADD", "icinga:history:stream:downtime", "*", @@ -2309,6 +2266,9 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment) Service::Ptr service; tie(host, service) = GetHostService(checkable); + // Update the checkable state to so that the "last_comment_id" is correctly reflected. + EnqueueConfigObject(checkable, FullState); + RedisConnection::Query xAdd ({ "XADD", "icinga:history:stream:comment", "*", "comment_id", GetObjectIdentifier(comment), @@ -2351,7 +2311,6 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment) } m_HistoryBulker.ProduceOne(std::move(xAdd)); - UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) @@ -2381,6 +2340,9 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) Service::Ptr service; tie(host, service) = GetHostService(checkable); + // Update the checkable state to so that the "last_comment_id" is correctly reflected. + EnqueueConfigObject(checkable, FullState); + RedisConnection::Query xAdd ({ "XADD", "icinga:history:stream:comment", "*", "comment_id", GetObjectIdentifier(comment), @@ -2431,7 +2393,6 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) } m_HistoryBulker.ProduceOne(std::move(xAdd)); - UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange) @@ -2501,12 +2462,12 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change m_HistoryBulker.ProduceOne(std::move(xAdd)); } -void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) +void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) const { if (!m_RconWorker || !m_RconWorker->IsConnected()) return; - if (checkable->GetEnableActiveChecks()) { + if (checkable->GetEnableActiveChecks() && !checkable->GetExtension("ConfigObjectDeleted")) { m_RconWorker->FireAndForgetQuery( { "ZADD", @@ -2539,7 +2500,7 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str tie(host, service) = GetHostService(checkable); /* Update checkable state as is_acknowledged may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); RedisConnection::Query xAdd ({ "XADD", "icinga:history:stream:acknowledgement", "*", @@ -2597,7 +2558,7 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const tie(host, service) = GetHostService(checkable); /* Update checkable state as is_acknowledged may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); RedisConnection::Query xAdd ({ "XADD", "icinga:history:stream:acknowledgement", "*", @@ -2722,8 +2683,10 @@ void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notificatio for (const auto& userName : deletedUsers) { String id = HashValue(new Array({m_EnvironmentId, "user", userName, notification->GetName()})); - DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:user"); - DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:recipient"); + EnqueueRelationsDeletion(id,{ + {CONFIG_REDIS_KEY_PREFIX "notification:user", ""}, + {CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""}, + }); } } @@ -2737,12 +2700,14 @@ void IcingaDB::SendNotificationUserGroupsChanged(const Notification::Ptr& notifi for (const auto& userGroupName : deletedUserGroups) { UserGroup::Ptr userGroup = UserGroup::GetByName(userGroupName); String id = HashValue(new Array({m_EnvironmentId, "usergroup", userGroupName, notification->GetName()})); - DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:usergroup"); - DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:recipient"); + EnqueueRelationsDeletion(id, { + {CONFIG_REDIS_KEY_PREFIX "notification:usergroup", ""}, + {CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""} + }); for (const User::Ptr& user : userGroup->GetMembers()) { String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", user->GetName(), userGroupName, notification->GetName()})); - DeleteRelationship(userId, CONFIG_REDIS_KEY_PREFIX "notification:recipient"); + EnqueueRelationsDeletion(userId, {{CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""}}); } } } @@ -2756,7 +2721,7 @@ void IcingaDB::SendTimePeriodRangesChanged(const TimePeriod::Ptr& timeperiod, co for (const auto& rangeKey : deletedKeys) { String id = HashValue(new Array({m_EnvironmentId, rangeKey, oldValues->Get(rangeKey), timeperiod->GetName()})); - DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "timeperiod:range"); + EnqueueRelationsDeletion(id, {{CONFIG_REDIS_KEY_PREFIX "timeperiod:range", ""}}); } } @@ -2769,7 +2734,7 @@ void IcingaDB::SendTimePeriodIncludesChanged(const TimePeriod::Ptr& timeperiod, for (const auto& includeName : deletedIncludes) { String id = HashValue(new Array({m_EnvironmentId, includeName, timeperiod->GetName()})); - DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "timeperiod:override:include"); + EnqueueRelationsDeletion(id, {{CONFIG_REDIS_KEY_PREFIX "timeperiod:override:include", ""}}); } } @@ -2782,7 +2747,7 @@ void IcingaDB::SendTimePeriodExcludesChanged(const TimePeriod::Ptr& timeperiod, for (const auto& excludeName : deletedExcludes) { String id = HashValue(new Array({m_EnvironmentId, excludeName, timeperiod->GetName()})); - DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "timeperiod:override:exclude"); + EnqueueRelationsDeletion(id, {{CONFIG_REDIS_KEY_PREFIX "timeperiod:override:exclude", ""}}); } } @@ -2806,14 +2771,14 @@ void IcingaDB::SendGroupsChanged(const ConfigObject::Ptr& object, const Array::P for (const auto& groupName : deletedGroups) { typename T::Ptr group = ConfigObject::GetObject(groupName); String id = HashValue(new Array({m_EnvironmentId, group->GetName(), object->GetName()})); - DeleteRelationship(id, keyType); + EnqueueRelationsDeletion(id, {{keyType, ""}}); - if (std::is_same::value) { + if constexpr (std::is_same_v) { UserGroup::Ptr userGroup = dynamic_pointer_cast(group); for (const auto& notification : userGroup->GetNotifications()) { String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", object->GetName(), groupName, notification->GetName()})); - DeleteRelationship(userId, CONFIG_REDIS_KEY_PREFIX "notification:recipient"); + EnqueueRelationsDeletion(userId, {{CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""}}); } } } @@ -2832,7 +2797,7 @@ void IcingaDB::SendCommandEnvChanged( for (const auto& envvarKey : GetDictionaryDeletedKeys(oldValues, newValues)) { String id = HashValue(new Array({m_EnvironmentId, envvarKey, command->GetName()})); - DeleteRelationship(id, cmdRedisKeys.EnvObjectKey, cmdRedisKeys.EnvChecksumKey); + EnqueueRelationsDeletion(id, {{cmdRedisKeys.EnvObjectKey, cmdRedisKeys.EnvChecksumKey}}); } } @@ -2849,7 +2814,7 @@ void IcingaDB::SendCommandArgumentsChanged( for (const auto& argumentKey : GetDictionaryDeletedKeys(oldValues, newValues)) { String id = HashValue(new Array({m_EnvironmentId, argumentKey, command->GetName()})); - DeleteRelationship(id, cmdRedisKeys.ArgObjectKey, cmdRedisKeys.ArgChecksumKey); + EnqueueRelationsDeletion(id, {{cmdRedisKeys.ArgObjectKey, cmdRedisKeys.ArgChecksumKey}}); } } @@ -2869,7 +2834,7 @@ void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dict for (const auto& varId : GetDictionaryDeletedKeys(oldVars, newVars)) { String id = HashValue(new Array({m_EnvironmentId, varId, object->GetName()})); - DeleteRelationship(id, customvarKey); + EnqueueRelationsDeletion(id, {{customvarKey, ""}}); } } @@ -3041,7 +3006,6 @@ IcingaDB::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, typeName = typeNameOverride.ToLower(); return {GetObjectIdentifier(object), JsonEncode(attrs)}; - //m_Rcon->FireAndForgetQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); } void IcingaDB::StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) @@ -3055,7 +3019,7 @@ void IcingaDB::ReachabilityChangeHandler(const std::set& childre { for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { for (auto& checkable : children) { - rw->UpdateState(checkable, StateUpdate::Full); + rw->EnqueueConfigObject(checkable, FullState); for (const auto& dependencyGroup : checkable->GetDependencyGroups()) { rw->EnqueueDependencyGroupStateUpdate(dependencyGroup); } @@ -3074,17 +3038,13 @@ void IcingaDB::VersionChangedHandler(const ConfigObject::Ptr& object) } if (object->IsActive()) { - // Create or update the object config for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { - if (rw) - rw->SendConfigUpdate(object, true); + // A runtime config change triggers also a full state update as well as next update event. + rw->EnqueueConfigObject(object, ConfigUpdate | FullState | NextUpdate); } - } else if (!object->IsActive() && - object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp - // Delete object config + } else if (!object->IsActive() && object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { - if (rw) - rw->SendConfigDelete(object); + rw->SendConfigDelete(object); } } } @@ -3144,16 +3104,14 @@ void IcingaDB::FlappingChangeHandler(const Checkable::Ptr& checkable, double cha void IcingaDB::NewCheckResultHandler(const Checkable::Ptr& checkable) { for (auto& rw : ConfigType::GetObjectsByType()) { - rw->UpdateState(checkable, StateUpdate::Volatile); - rw->SendNextUpdate(checkable); + rw->EnqueueConfigObject(checkable, VolatileState); } } void IcingaDB::NextCheckUpdatedHandler(const Checkable::Ptr& checkable) { for (auto& rw : ConfigType::GetObjectsByType()) { - rw->UpdateState(checkable, StateUpdate::Volatile); - rw->SendNextUpdate(checkable); + rw->EnqueueConfigObject(checkable, VolatileState | NextUpdate); } } @@ -3186,7 +3144,7 @@ void IcingaDB::DependencyGroupChildRemovedHandler(const DependencyGroup::Ptr& de void IcingaDB::HostProblemChangedHandler(const Service::Ptr& service) { for (auto& rw : ConfigType::GetObjectsByType()) { /* Host state changes affect is_handled and severity of services. */ - rw->UpdateState(service, StateUpdate::Full); + rw->EnqueueConfigObject(service, FullState); } } @@ -3322,7 +3280,7 @@ void IcingaDB::DeleteState(const String& id, RedisConnection::QueryArg redisObjK } /** - * Add the provided data to the Redis HMSETs map. + * Add the provided data to the provided map of HMSET queries. * * @param hMSets The map of HMSETs to add the provided data to. * @param redisKey The Redis key to which the HMSET query should be added. diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index d87d71be4..28b0dd232 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -233,13 +233,6 @@ private: std::mutex m_Mutex; }; - enum StateUpdate - { - Volatile = 1ull << 0, - RuntimeOnly = 1ull << 1, - Full = Volatile | RuntimeOnly, - }; - void OnConnectedHandler(); void PublishStatsTimerHandler(); @@ -253,8 +246,7 @@ private: void InsertObjectDependencies(const ConfigObject::Ptr& object, std::map& hMSets, std::vector& runtimeUpdates, bool runtimeUpdate); void UpdateDependenciesState(const Checkable::Ptr& checkable, const DependencyGroup::Ptr& dependencyGroup) const; - void UpdateState(const Checkable::Ptr& checkable, StateUpdate mode); - void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate); + void UpdateState(const Checkable::Ptr& checkable, uint32_t mode); void CreateConfigUpdate(const ConfigObject::Ptr& object, const QueryArgPair& redisKeyPair, std::map& hMSets, std::vector& runtimeUpdates, bool runtimeUpdate); void SendConfigDelete(const ConfigObject::Ptr& object); @@ -274,7 +266,7 @@ private: void SendAddedComment(const Comment::Ptr& comment); void SendRemovedComment(const Comment::Ptr& comment); void SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange); - void SendNextUpdate(const Checkable::Ptr& checkable); + void SendNextUpdate(const Checkable::Ptr& checkable) const; void SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry); void SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange); void SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues);