IcingaDB: enqueue config runtime updates to the worker queue

This commit is contained in:
Yonas Habteab 2025-10-30 18:02:20 +01:00
parent 26d23bd569
commit d364ad981e
2 changed files with 60 additions and 110 deletions

View file

@ -1322,19 +1322,19 @@ void IcingaDB::InsertCheckableDependencies(
* Update the state information of a checkable in Redis.
*
* What is updated exactly depends on the mode parameter:
* - Volatile: Update the volatile state information stored in icinga:host:state or icinga:service:state as well as
* the corresponding checksum stored in icinga:checksum:host:state or icinga:checksum:service:state.
* - RuntimeOnly: Write a runtime update to the icinga:runtime:state stream. It is up to the caller to ensure that
* - VolatileState: Update the volatile state information stored in icinga:host:state or icinga:service:state as well
* as the corresponding checksum stored in icinga:checksum:host:state or icinga:checksum:service:state.
* - RuntimeState: Write a runtime update to the icinga:runtime:state stream. It is up to the caller to ensure that
* identical volatile state information was already written before to avoid inconsistencies. This mode is only
* useful to upgrade a previous Volatile to a Full operation, otherwise Full should be used.
* - Full: Perform an update of all state information in Redis, that is updating the volatile information and sending
* a corresponding runtime update so that this state update gets written through to the persistent database by a
* running icingadb process.
* - FullState: Perform an update of all state information in Redis, that is updating the volatile information and
* sending a corresponding runtime update so that this state update gets written through to the persistent database
* by a running icingadb process.
*
* @param checkable State of this checkable is updated in Redis
* @param mode Mode of operation (StateUpdate::Volatile, StateUpdate::RuntimeOnly, or StateUpdate::Full)
* @param mode Mode of operation (DirtyBits:VolatileState, DirtyBits::RuntimeState, or DirtyBits::FullState)
*/
void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode)
void IcingaDB::UpdateState(const Checkable::Ptr& checkable, uint32_t mode)
{
if (!m_RconWorker || !m_RconWorker->IsConnected())
return;
@ -1344,7 +1344,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode)
String checksum = HashValue(stateAttrs);
auto [redisStateKey, redisChecksumKey] = GetCheckableStateKeys(checkable->GetReflectionType());
if (mode & StateUpdate::Volatile) {
if (mode & VolatileState) {
String objectKey = GetObjectIdentifier(checkable);
m_RconWorker->FireAndForgetQueries({
{"HSET", redisStateKey, objectKey, JsonEncode(stateAttrs)},
@ -1352,7 +1352,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode)
}, Prio::RuntimeStateSync);
}
if (mode & StateUpdate::RuntimeOnly) {
if (mode & RuntimeState) {
ObjectLock olock(stateAttrs);
RedisConnection::Query streamadd({
@ -1446,28 +1446,6 @@ void IcingaDB::UpdateDependenciesState(const Checkable::Ptr& checkable, const De
}
}
// Used to update a single object, used for runtime updates
void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate)
{
if (!m_RconWorker || !m_RconWorker->IsConnected())
return;
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(object, GetSyncableTypeRedisKeys(object->GetReflectionType()), hMSets, runtimeUpdates, runtimeUpdate);
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) {
UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile);
}
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
if (checkable) {
SendNextUpdate(checkable);
}
}
void IcingaDB::AddObjectDataToRuntimeUpdates(std::vector<Dictionary::Ptr>& runtimeUpdates,
String objectKey, String redisKey, const Dictionary::Ptr& data)
{
@ -1833,26 +1811,11 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
if (!m_RconWorker || !m_RconWorker->IsConnected())
return;
Type::Ptr type = object->GetReflectionType();
String objectKey = GetObjectIdentifier(object);
auto redisKeyPair = GetSyncableTypeRedisKeys(type);
m_RconWorker->FireAndForgetQueries({
{"HDEL", redisKeyPair.ObjectKey, objectKey},
{"HDEL", redisKeyPair.ChecksumKey, objectKey},
{
"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*",
"redis_key", redisKeyPair.ObjectKey, "id", objectKey, "runtime_type", "delete"
}
}, Prio::Config);
CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object);
if (customVarObject) {
Dictionary::Ptr vars = customVarObject->GetVars();
SendCustomVarsChanged(object, vars, nullptr);
if (auto customVarObject = dynamic_pointer_cast<CustomVarObject>(object); customVarObject) {
SendCustomVarsChanged(object, customVarObject->GetVars(), nullptr);
}
Type::Ptr type = object->GetReflectionType();
if (type == Host::TypeInstance || type == Service::TypeInstance) {
Checkable::Ptr checkable = static_pointer_cast<Checkable>(object);
@ -1860,17 +1823,9 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
m_RconWorker->FireAndForgetQuery({
"ZREM",
service ? "icinga:nextupdate:service" : "icinga:nextupdate:host",
GetObjectIdentifier(checkable)
}, Prio::CheckResult);
auto [configStateKey, checksumStateKey] = GetCheckableStateKeys(checkable->GetReflectionType());
m_RconWorker->FireAndForgetQueries({
{"HDEL", configStateKey, objectKey},
{"HDEL", checksumStateKey, objectKey}
}, Prio::RuntimeStateSync);
EnqueueRelationsDeletion(GetObjectIdentifier(checkable), {{configStateKey, checksumStateKey}});
EnqueueConfigObject(object, ConfigDelete | NextUpdate); // Send also ZREM for next update
if (service) {
SendGroupsChanged<ServiceGroup>(checkable, service->GetGroups(), nullptr);
@ -1881,6 +1836,8 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
return;
}
EnqueueConfigObject(object, ConfigDelete);
if (type == TimePeriod::TypeInstance) {
TimePeriod::Ptr timeperiod = static_pointer_cast<TimePeriod>(object);
SendTimePeriodRangesChanged(timeperiod, timeperiod->GetRanges(), nullptr);
@ -1941,7 +1898,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul
tie(host, service) = GetHostService(checkable);
UpdateState(checkable, StateUpdate::RuntimeOnly);
EnqueueConfigObject(checkable, RuntimeState);
int hard_state{};
if (!cr) {
@ -2122,7 +2079,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
return;
}
SendConfigUpdate(downtime, true);
EnqueueConfigObject(downtime, ConfigUpdate);
auto checkable (downtime->GetCheckable());
auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy()));
@ -2132,7 +2089,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
tie(host, service) = GetHostService(checkable);
/* Update checkable state as in_downtime may have changed. */
UpdateState(checkable, StateUpdate::Full);
EnqueueConfigObject(checkable, FullState);
RedisConnection::Query xAdd ({
"XADD", "icinga:history:stream:downtime", "*",
@ -2221,7 +2178,7 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
return;
/* Update checkable state as in_downtime may have changed. */
UpdateState(checkable, StateUpdate::Full);
EnqueueConfigObject(checkable, FullState);
RedisConnection::Query xAdd ({
"XADD", "icinga:history:stream:downtime", "*",
@ -2309,6 +2266,9 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
// Update the checkable state to so that the "last_comment_id" is correctly reflected.
EnqueueConfigObject(checkable, FullState);
RedisConnection::Query xAdd ({
"XADD", "icinga:history:stream:comment", "*",
"comment_id", GetObjectIdentifier(comment),
@ -2351,7 +2311,6 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
}
m_HistoryBulker.ProduceOne(std::move(xAdd));
UpdateState(checkable, StateUpdate::Full);
}
void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
@ -2381,6 +2340,9 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
Service::Ptr service;
tie(host, service) = GetHostService(checkable);
// Update the checkable state to so that the "last_comment_id" is correctly reflected.
EnqueueConfigObject(checkable, FullState);
RedisConnection::Query xAdd ({
"XADD", "icinga:history:stream:comment", "*",
"comment_id", GetObjectIdentifier(comment),
@ -2431,7 +2393,6 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
}
m_HistoryBulker.ProduceOne(std::move(xAdd));
UpdateState(checkable, StateUpdate::Full);
}
void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange)
@ -2501,12 +2462,12 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change
m_HistoryBulker.ProduceOne(std::move(xAdd));
}
void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable)
void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) const
{
if (!m_RconWorker || !m_RconWorker->IsConnected())
return;
if (checkable->GetEnableActiveChecks()) {
if (checkable->GetEnableActiveChecks() && !checkable->GetExtension("ConfigObjectDeleted")) {
m_RconWorker->FireAndForgetQuery(
{
"ZADD",
@ -2539,7 +2500,7 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str
tie(host, service) = GetHostService(checkable);
/* Update checkable state as is_acknowledged may have changed. */
UpdateState(checkable, StateUpdate::Full);
EnqueueConfigObject(checkable, FullState);
RedisConnection::Query xAdd ({
"XADD", "icinga:history:stream:acknowledgement", "*",
@ -2597,7 +2558,7 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const
tie(host, service) = GetHostService(checkable);
/* Update checkable state as is_acknowledged may have changed. */
UpdateState(checkable, StateUpdate::Full);
EnqueueConfigObject(checkable, FullState);
RedisConnection::Query xAdd ({
"XADD", "icinga:history:stream:acknowledgement", "*",
@ -2722,8 +2683,10 @@ void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notificatio
for (const auto& userName : deletedUsers) {
String id = HashValue(new Array({m_EnvironmentId, "user", userName, notification->GetName()}));
DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:user");
DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:recipient");
EnqueueRelationsDeletion(id,{
{CONFIG_REDIS_KEY_PREFIX "notification:user", ""},
{CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""},
});
}
}
@ -2737,12 +2700,14 @@ void IcingaDB::SendNotificationUserGroupsChanged(const Notification::Ptr& notifi
for (const auto& userGroupName : deletedUserGroups) {
UserGroup::Ptr userGroup = UserGroup::GetByName(userGroupName);
String id = HashValue(new Array({m_EnvironmentId, "usergroup", userGroupName, notification->GetName()}));
DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:usergroup");
DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "notification:recipient");
EnqueueRelationsDeletion(id, {
{CONFIG_REDIS_KEY_PREFIX "notification:usergroup", ""},
{CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""}
});
for (const User::Ptr& user : userGroup->GetMembers()) {
String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", user->GetName(), userGroupName, notification->GetName()}));
DeleteRelationship(userId, CONFIG_REDIS_KEY_PREFIX "notification:recipient");
EnqueueRelationsDeletion(userId, {{CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""}});
}
}
}
@ -2756,7 +2721,7 @@ void IcingaDB::SendTimePeriodRangesChanged(const TimePeriod::Ptr& timeperiod, co
for (const auto& rangeKey : deletedKeys) {
String id = HashValue(new Array({m_EnvironmentId, rangeKey, oldValues->Get(rangeKey), timeperiod->GetName()}));
DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "timeperiod:range");
EnqueueRelationsDeletion(id, {{CONFIG_REDIS_KEY_PREFIX "timeperiod:range", ""}});
}
}
@ -2769,7 +2734,7 @@ void IcingaDB::SendTimePeriodIncludesChanged(const TimePeriod::Ptr& timeperiod,
for (const auto& includeName : deletedIncludes) {
String id = HashValue(new Array({m_EnvironmentId, includeName, timeperiod->GetName()}));
DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "timeperiod:override:include");
EnqueueRelationsDeletion(id, {{CONFIG_REDIS_KEY_PREFIX "timeperiod:override:include", ""}});
}
}
@ -2782,7 +2747,7 @@ void IcingaDB::SendTimePeriodExcludesChanged(const TimePeriod::Ptr& timeperiod,
for (const auto& excludeName : deletedExcludes) {
String id = HashValue(new Array({m_EnvironmentId, excludeName, timeperiod->GetName()}));
DeleteRelationship(id, CONFIG_REDIS_KEY_PREFIX "timeperiod:override:exclude");
EnqueueRelationsDeletion(id, {{CONFIG_REDIS_KEY_PREFIX "timeperiod:override:exclude", ""}});
}
}
@ -2806,14 +2771,14 @@ void IcingaDB::SendGroupsChanged(const ConfigObject::Ptr& object, const Array::P
for (const auto& groupName : deletedGroups) {
typename T::Ptr group = ConfigObject::GetObject<T>(groupName);
String id = HashValue(new Array({m_EnvironmentId, group->GetName(), object->GetName()}));
DeleteRelationship(id, keyType);
EnqueueRelationsDeletion(id, {{keyType, ""}});
if (std::is_same<T, UserGroup>::value) {
if constexpr (std::is_same_v<T, UserGroup>) {
UserGroup::Ptr userGroup = dynamic_pointer_cast<UserGroup>(group);
for (const auto& notification : userGroup->GetNotifications()) {
String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", object->GetName(), groupName, notification->GetName()}));
DeleteRelationship(userId, CONFIG_REDIS_KEY_PREFIX "notification:recipient");
EnqueueRelationsDeletion(userId, {{CONFIG_REDIS_KEY_PREFIX "notification:recipient", ""}});
}
}
}
@ -2832,7 +2797,7 @@ void IcingaDB::SendCommandEnvChanged(
for (const auto& envvarKey : GetDictionaryDeletedKeys(oldValues, newValues)) {
String id = HashValue(new Array({m_EnvironmentId, envvarKey, command->GetName()}));
DeleteRelationship(id, cmdRedisKeys.EnvObjectKey, cmdRedisKeys.EnvChecksumKey);
EnqueueRelationsDeletion(id, {{cmdRedisKeys.EnvObjectKey, cmdRedisKeys.EnvChecksumKey}});
}
}
@ -2849,7 +2814,7 @@ void IcingaDB::SendCommandArgumentsChanged(
for (const auto& argumentKey : GetDictionaryDeletedKeys(oldValues, newValues)) {
String id = HashValue(new Array({m_EnvironmentId, argumentKey, command->GetName()}));
DeleteRelationship(id, cmdRedisKeys.ArgObjectKey, cmdRedisKeys.ArgChecksumKey);
EnqueueRelationsDeletion(id, {{cmdRedisKeys.ArgObjectKey, cmdRedisKeys.ArgChecksumKey}});
}
}
@ -2869,7 +2834,7 @@ void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dict
for (const auto& varId : GetDictionaryDeletedKeys(oldVars, newVars)) {
String id = HashValue(new Array({m_EnvironmentId, varId, object->GetName()}));
DeleteRelationship(id, customvarKey);
EnqueueRelationsDeletion(id, {{customvarKey, ""}});
}
}
@ -3041,7 +3006,6 @@ IcingaDB::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType,
typeName = typeNameOverride.ToLower();
return {GetObjectIdentifier(object), JsonEncode(attrs)};
//m_Rcon->FireAndForgetQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)});
}
void IcingaDB::StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type)
@ -3055,7 +3019,7 @@ void IcingaDB::ReachabilityChangeHandler(const std::set<Checkable::Ptr>& childre
{
for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
for (auto& checkable : children) {
rw->UpdateState(checkable, StateUpdate::Full);
rw->EnqueueConfigObject(checkable, FullState);
for (const auto& dependencyGroup : checkable->GetDependencyGroups()) {
rw->EnqueueDependencyGroupStateUpdate(dependencyGroup);
}
@ -3074,17 +3038,13 @@ void IcingaDB::VersionChangedHandler(const ConfigObject::Ptr& object)
}
if (object->IsActive()) {
// Create or update the object config
for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
if (rw)
rw->SendConfigUpdate(object, true);
// A runtime config change triggers also a full state update as well as next update event.
rw->EnqueueConfigObject(object, ConfigUpdate | FullState | NextUpdate);
}
} else if (!object->IsActive() &&
object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp
// Delete object config
} else if (!object->IsActive() && object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp
for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
if (rw)
rw->SendConfigDelete(object);
rw->SendConfigDelete(object);
}
}
}
@ -3144,16 +3104,14 @@ void IcingaDB::FlappingChangeHandler(const Checkable::Ptr& checkable, double cha
void IcingaDB::NewCheckResultHandler(const Checkable::Ptr& checkable)
{
for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
rw->UpdateState(checkable, StateUpdate::Volatile);
rw->SendNextUpdate(checkable);
rw->EnqueueConfigObject(checkable, VolatileState);
}
}
void IcingaDB::NextCheckUpdatedHandler(const Checkable::Ptr& checkable)
{
for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
rw->UpdateState(checkable, StateUpdate::Volatile);
rw->SendNextUpdate(checkable);
rw->EnqueueConfigObject(checkable, VolatileState | NextUpdate);
}
}
@ -3186,7 +3144,7 @@ void IcingaDB::DependencyGroupChildRemovedHandler(const DependencyGroup::Ptr& de
void IcingaDB::HostProblemChangedHandler(const Service::Ptr& service) {
for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
/* Host state changes affect is_handled and severity of services. */
rw->UpdateState(service, StateUpdate::Full);
rw->EnqueueConfigObject(service, FullState);
}
}
@ -3322,7 +3280,7 @@ void IcingaDB::DeleteState(const String& id, RedisConnection::QueryArg redisObjK
}
/**
* Add the provided data to the Redis HMSETs map.
* Add the provided data to the provided map of HMSET queries.
*
* @param hMSets The map of HMSETs to add the provided data to.
* @param redisKey The Redis key to which the HMSET query should be added.

View file

@ -233,13 +233,6 @@ private:
std::mutex m_Mutex;
};
enum StateUpdate
{
Volatile = 1ull << 0,
RuntimeOnly = 1ull << 1,
Full = Volatile | RuntimeOnly,
};
void OnConnectedHandler();
void PublishStatsTimerHandler();
@ -253,8 +246,7 @@ private:
void InsertObjectDependencies(const ConfigObject::Ptr& object,
std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
void UpdateDependenciesState(const Checkable::Ptr& checkable, const DependencyGroup::Ptr& dependencyGroup) const;
void UpdateState(const Checkable::Ptr& checkable, StateUpdate mode);
void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate);
void UpdateState(const Checkable::Ptr& checkable, uint32_t mode);
void CreateConfigUpdate(const ConfigObject::Ptr& object, const QueryArgPair& redisKeyPair,
std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object);
@ -274,7 +266,7 @@ private:
void SendAddedComment(const Comment::Ptr& comment);
void SendRemovedComment(const Comment::Ptr& comment);
void SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange);
void SendNextUpdate(const Checkable::Ptr& checkable);
void SendNextUpdate(const Checkable::Ptr& checkable) const;
void SendAcknowledgementSet(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool persistent, double changeTime, double expiry);
void SendAcknowledgementCleared(const Checkable::Ptr& checkable, const String& removedBy, double changeTime, double ackLastChange);
void SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues);