IcingaDB: use a statically initialized list of syncable types for Redis keys

This commit is contained in:
Yonas Habteab 2026-03-16 08:38:58 +01:00
parent cb872ee5c8
commit fa514e6336
3 changed files with 92 additions and 88 deletions

View file

@ -16,12 +16,9 @@
#include "base/array.hpp"
#include "base/exception.hpp"
#include "base/utility.hpp"
#include "base/object-packer.hpp"
#include "icinga/command.hpp"
#include "icinga/compatutility.hpp"
#include "icinga/customvarobject.hpp"
#include "icinga/dependency.hpp"
#include "icinga/host.hpp"
#include "icinga/service.hpp"
#include "icinga/hostgroup.hpp"
#include "icinga/servicegroup.hpp"
@ -38,7 +35,6 @@
#include <cstdint>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
@ -48,8 +44,6 @@ using namespace icinga;
using Prio = RedisConnection::QueryPriority;
std::unordered_set<Type*> IcingaDB::m_IndexedTypes;
INITIALIZE_ONCE(&IcingaDB::ConfigStaticInitialize);
// A list of all types for which we want to sync custom variables, along with their corresponding Redis key.
@ -69,38 +63,58 @@ static constexpr std::pair<const Type::Ptr&, std::string_view> l_CustomVarKeys[]
{EventCommand::TypeInstance, CONFIG_REDIS_KEY_PREFIX "eventcommand:customvar"},
};
std::vector<Type::Ptr> IcingaDB::GetTypes()
// A list of all serializable types that we want to index in Redis, along with their corresponding object and checksum Redis keys.
// They're sorted in the order they should be synced in the initial config sync.
static constexpr std::pair<const Type::Ptr&, IcingaDB::QueryArgPair> l_SyncableTypes[] = {
// First sync the checkables to get their states ASAP.
{Host::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "host", CHECKSUM_REDIS_KEY_PREFIX "host"}},
{Service::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "service", CHECKSUM_REDIS_KEY_PREFIX "service"}},
// Then sync them for similar reasons.
{Downtime::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "downtime", CHECKSUM_REDIS_KEY_PREFIX "downtime"}},
{Comment::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "comment", CHECKSUM_REDIS_KEY_PREFIX "comment"}},
{HostGroup::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "hostgroup", CHECKSUM_REDIS_KEY_PREFIX "hostgroup"}},
{ServiceGroup::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "servicegroup", CHECKSUM_REDIS_KEY_PREFIX "servicegroup"}},
{CheckCommand::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "checkcommand", CHECKSUM_REDIS_KEY_PREFIX "checkcommand"}},
{Endpoint::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "endpoint", CHECKSUM_REDIS_KEY_PREFIX "endpoint"}},
{EventCommand::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "eventcommand", CHECKSUM_REDIS_KEY_PREFIX "eventcommand"}},
{Notification::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "notification", CHECKSUM_REDIS_KEY_PREFIX "notification"}},
{NotificationCommand::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "notificationcommand", CHECKSUM_REDIS_KEY_PREFIX "notificationcommand"}},
{TimePeriod::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "timeperiod", CHECKSUM_REDIS_KEY_PREFIX "timeperiod"}},
{User::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "user", CHECKSUM_REDIS_KEY_PREFIX "user"}},
{UserGroup::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "usergroup", CHECKSUM_REDIS_KEY_PREFIX "usergroup"}},
{Zone::TypeInstance, {CONFIG_REDIS_KEY_PREFIX "zone", CHECKSUM_REDIS_KEY_PREFIX "zone"}},
};
const IcingaDB::QueryArgPair& IcingaDB::GetSyncableTypeRedisKeys(const Type::Ptr& type)
{
// The initial config sync will queue the types in the following order.
return {
// Sync them first to get their states ASAP.
Host::TypeInstance,
Service::TypeInstance,
auto it = std::find_if(std::begin(l_SyncableTypes), std::end(l_SyncableTypes), [&type](const auto& pair) {
return pair.first == type;
});
if (it == std::end(l_SyncableTypes)) {
BOOST_THROW_EXCEPTION(std::invalid_argument{"Could not find Redis keys for type " + type->ToString()});
}
return it->second;
}
// Then sync them for similar reasons.
Downtime::TypeInstance,
Comment::TypeInstance,
HostGroup::TypeInstance,
ServiceGroup::TypeInstance,
CheckCommand::TypeInstance,
Endpoint::TypeInstance,
EventCommand::TypeInstance,
Notification::TypeInstance,
NotificationCommand::TypeInstance,
TimePeriod::TypeInstance,
User::TypeInstance,
UserGroup::TypeInstance,
Zone::TypeInstance
};
/**
* Get all syncable types, along with their corresponding Redis keys as a list of pairs.
*
* The first element of the pair is the type, the second element is another pair containing the Redis
* key for the object and the Redis key for the checksum.
*
* TODO: Once we upgrade to C++20, this should return a std::span instead to avoid the unnecessary copy.
*
* @return A list of all syncable types, along with their corresponding Redis keys.
*/
std::vector<IcingaDB::SyncableTypeInfo> IcingaDB::GetSyncableTypes()
{
return {std::begin(l_SyncableTypes), std::end(l_SyncableTypes)};
}
void IcingaDB::ConfigStaticInitialize()
{
for (auto& type : GetTypes()) {
m_IndexedTypes.emplace(type.get());
}
/* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
Checkable::OnStateChange.connect([](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
IcingaDB::StateChangeHandler(checkable, cr, type);
@ -214,8 +228,6 @@ void IcingaDB::UpdateAllConfigObjects()
WorkQueue upq(25000, Configuration::Concurrency, LogNotice);
upq.SetName("IcingaDB:ConfigDump");
std::vector<Type::Ptr> types = GetTypes();
m_Rcon->SuppressQueryKind(Prio::CheckResult);
m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync);
@ -254,8 +266,11 @@ void IcingaDB::UpdateAllConfigObjects()
m_DumpedGlobals.DependencyGroup.Reset();
});
upq.ParallelFor(types, false, [this](const Type::Ptr& type) {
String lcType = type->GetName().ToLower();
upq.ParallelFor(l_SyncableTypes, false, [this](const SyncableTypeInfo& info) {
// No structured binding is allowed here till C++20 (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=85889).
Type::Ptr type;
QueryArgPair redisKeyPair;
std::tie(type, redisKeyPair) = info;
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
if (!ctype)
return;
@ -265,17 +280,16 @@ void IcingaDB::UpdateAllConfigObjects()
DeleteKeys(rcon, GetTypeOverwriteKeys(type), Prio::Config);
WorkQueue upqObjectType(25000, Configuration::Concurrency, LogNotice);
upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType);
upqObjectType.SetName("IcingaDB:ConfigDump:" + type->GetName().ToLower());
std::map<String, String> redisCheckSums;
String configCheckSum = CHECKSUM_REDIS_KEY_PREFIX + lcType;
upqObjectType.Enqueue([&rcon, &configCheckSum, &redisCheckSums]() {
upqObjectType.Enqueue([&rcon, &redisKeyPair, &redisCheckSums]() {
String cursor = "0";
do {
Array::Ptr res = rcon->GetResultOfQuery({
"HSCAN", configCheckSum, cursor, "COUNT", "1000"
"HSCAN", redisKeyPair.ChecksumKey, cursor, "COUNT", "1000"
}, Prio::Config);
AddKvsToMap(res->Get(1), redisCheckSums);
@ -285,10 +299,9 @@ void IcingaDB::UpdateAllConfigObjects()
});
auto objectChunks (ChunkObjects(ctype->GetObjects(), 500));
String configObject = CONFIG_REDIS_KEY_PREFIX + lcType;
// Skimmed away attributes and checksums HMSETs' keys and values by Redis key.
std::map<String, RedisConnection::Queries> ourContentRaw {{configCheckSum, {}}, {configObject, {}}};
std::map<RedisConnection::QueryArg, RedisConnection::Queries> ourContentRaw {{redisKeyPair.ChecksumKey, {}}, {redisKeyPair.ObjectKey, {}}};
std::mutex ourContentMutex;
upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) {
@ -312,7 +325,7 @@ void IcingaDB::UpdateAllConfigObjects()
size_t bulkCounter = 0;
for (const ConfigObject::Ptr& object : chunk) {
if (lcType != GetLowerCaseTypeNameDB(object))
if (type != object->GetReflectionType())
continue;
// If we encounter not yet activated objects, i.e. they are currently being loaded and are about to
@ -325,7 +338,7 @@ void IcingaDB::UpdateAllConfigObjects()
}
std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(object, lcType, hMSets, runtimeUpdates, false);
CreateConfigUpdate(object, redisKeyPair, hMSets, runtimeUpdates, false);
// Write out inital state for checkables
if (!static_cast<std::string_view>(configStateKey).empty()) {
@ -379,7 +392,7 @@ void IcingaDB::UpdateAllConfigObjects()
}
Log(LogNotice, "IcingaDB")
<< "Dumped " << bulkCounter << " objects of type " << lcType;
<< "Dumped " << bulkCounter << " objects of " << type->ToString();
});
upqObjectType.Join();
@ -392,7 +405,7 @@ void IcingaDB::UpdateAllConfigObjects()
}
}
std::map<String, std::map<String, String>> ourContent;
std::map<RedisConnection::QueryArg, std::map<String, String>> ourContent;
for (auto& source : ourContentRaw) {
auto& dest (ourContent[source.first]);
@ -413,8 +426,8 @@ void IcingaDB::UpdateAllConfigObjects()
upqObjectType.Join();
ourContentRaw.clear();
auto& ourCheckSums (ourContent[configCheckSum]);
auto& ourObjects (ourContent[configObject]);
auto& ourCheckSums (ourContent[redisKeyPair.ChecksumKey]);
auto& ourObjects (ourContent[redisKeyPair.ObjectKey]);
RedisConnection::Query setChecksum, setObject, delChecksum, delObject;
auto redisCurrent (redisCheckSums.begin());
@ -425,8 +438,8 @@ void IcingaDB::UpdateAllConfigObjects()
auto flushSets ([&]() {
auto affectedConfig (setObject.size() / 2u);
setChecksum.insert(setChecksum.begin(), {"HMSET", configCheckSum});
setObject.insert(setObject.begin(), {"HMSET", configObject});
setChecksum.insert(setChecksum.begin(), {"HMSET", redisKeyPair.ChecksumKey});
setObject.insert(setObject.begin(), {"HMSET", redisKeyPair.ObjectKey});
RedisConnection::Queries transaction;
@ -444,8 +457,8 @@ void IcingaDB::UpdateAllConfigObjects()
auto flushDels ([&]() {
auto affectedConfig (delObject.size());
delChecksum.insert(delChecksum.begin(), {"HDEL", configCheckSum});
delObject.insert(delObject.begin(), {"HDEL", configObject});
delChecksum.insert(delChecksum.begin(), {"HDEL", redisKeyPair.ChecksumKey});
delObject.insert(delObject.begin(), {"HDEL", redisKeyPair.ObjectKey});
RedisConnection::Queries transaction;
@ -517,7 +530,9 @@ void IcingaDB::UpdateAllConfigObjects()
flushSets();
}
for (auto& key : GetTypeDumpSignalKeys(type)) {
auto keys = GetTypeOverwriteKeys(type, true);
keys.emplace_back(redisKeyPair.ObjectKey);
for (auto& key : keys) {
rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config);
}
rcon->Sync();
@ -648,22 +663,16 @@ std::vector<RedisConnection::QueryArg> IcingaDB::GetTypeOverwriteKeys(const Type
return keys;
}
std::vector<RedisConnection::QueryArg> IcingaDB::GetTypeDumpSignalKeys(const Type::Ptr& type)
{
auto keys = GetTypeOverwriteKeys(type, true);
keys.emplace_back(CONFIG_REDIS_KEY_PREFIX + type->GetName().ToLower());
return keys;
}
template<typename ConfigType>
static ConfigObject::Ptr GetObjectByName(const String& name)
{
return ConfigObject::GetObject<ConfigType>(name);
}
void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const String& typeName,
void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object,
std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate)
{
auto typeName = GetLowerCaseTypeNameDB(object);
String objectKey = GetObjectIdentifier(object);
String objectKeyName = typeName + "_id";
@ -1496,12 +1505,10 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
if (!m_Rcon || !m_Rcon->IsConnected())
return;
String typeName = GetLowerCaseTypeNameDB(object);
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
std::vector<Dictionary::Ptr> runtimeUpdates;
CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate);
CreateConfigUpdate(object, GetSyncableTypeRedisKeys(object->GetReflectionType()), hMSets, runtimeUpdates, runtimeUpdate);
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) {
UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile);
@ -1838,7 +1845,7 @@ bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& a
* icinga:config:object:downtime) need to be prepended. There is nothing to indicate success or failure.
*/
void
IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String& typeName,
IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const QueryArgPair& redisKeyPair,
std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate)
{
/* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup.
@ -1854,11 +1861,11 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String& type
if (!PrepareObject(object, attr))
return;
InsertObjectDependencies(object, typeName, hMSets, runtimeUpdates, runtimeUpdate);
InsertObjectDependencies(object, hMSets, runtimeUpdates, runtimeUpdate);
String objectKey = GetObjectIdentifier(object);
auto& attrs (hMSets[CONFIG_REDIS_KEY_PREFIX + typeName]);
auto& chksms (hMSets[CHECKSUM_REDIS_KEY_PREFIX + typeName]);
auto& attrs (hMSets[redisKeyPair.ObjectKey]);
auto& chksms (hMSets[redisKeyPair.ChecksumKey]);
attrs.emplace_back(objectKey);
attrs.emplace_back(JsonEncode(attr));
@ -1870,7 +1877,7 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String& type
/* Send an update event to subscribers. */
if (runtimeUpdate) {
attr->Set("checksum", checksum);
AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, CONFIG_REDIS_KEY_PREFIX + typeName, attr);
AddObjectDataToRuntimeUpdates(runtimeUpdates, objectKey, std::string(redisKeyPair.ObjectKey), attr);
}
}
@ -1880,15 +1887,15 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
return;
Type::Ptr type = object->GetReflectionType();
String typeName = type->GetName().ToLower();
String objectKey = GetObjectIdentifier(object);
auto redisKeyPair = GetSyncableTypeRedisKeys(type);
m_Rcon->FireAndForgetQueries({
{"HDEL", CONFIG_REDIS_KEY_PREFIX + typeName, objectKey},
{"HDEL", CHECKSUM_REDIS_KEY_PREFIX + typeName, objectKey},
{"HDEL", redisKeyPair.ObjectKey, objectKey},
{"HDEL", redisKeyPair.ChecksumKey, objectKey},
{
"XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*",
"redis_key", CONFIG_REDIS_KEY_PREFIX + typeName, "id", objectKey, "runtime_type", "delete"
"redis_key", redisKeyPair.ObjectKey, "id", objectKey, "runtime_type", "delete"
}
}, Prio::Config);
@ -2901,7 +2908,8 @@ void IcingaDB::SendCommandArgumentsChanged(
void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) {
const auto& type = object->GetReflectionType();
if (m_IndexedTypes.find(type.get()) == m_IndexedTypes.end()) {
std::string_view customvarKey = GetRedisCustomVarKey(type);
if (customvarKey.empty()) {
return;
}
@ -2909,11 +2917,6 @@ void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dict
return;
}
std::string_view customvarKey = GetRedisCustomVarKey(type);
if (customvarKey.empty()) {
BOOST_THROW_EXCEPTION(std::invalid_argument("Unsupported customvar object of type: " + type->GetName()));
}
Dictionary::Ptr oldVars = SerializeVars(oldValues);
Dictionary::Ptr newVars = SerializeVars(newValues);
@ -3207,8 +3210,10 @@ void IcingaDB::ReachabilityChangeHandler(const std::set<Checkable::Ptr>& childre
void IcingaDB::VersionChangedHandler(const ConfigObject::Ptr& object)
{
Type::Ptr type = object->GetReflectionType();
if (m_IndexedTypes.find(type.get()) == m_IndexedTypes.end()) {
auto it = std::find_if(std::begin(l_SyncableTypes), std::end(l_SyncableTypes), [&type](const auto& pair) {
return pair.first == type;
});
if (it == std::end(l_SyncableTypes)) {
return;
}

View file

@ -84,7 +84,7 @@ void IcingaDB::Start(bool runtimeCreated)
m_Rcon = new RedisConnection(connInfo);
m_RconLocked.store(m_Rcon);
for (const Type::Ptr& type : GetTypes()) {
for (const auto& [type, _] : GetSyncableTypes()) {
auto ctype (dynamic_cast<ConfigType*>(type.get()));
if (!ctype)
continue;

View file

@ -121,13 +121,13 @@ private:
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
void InsertCheckableDependencies(const Checkable::Ptr& checkable, std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets,
std::vector<Dictionary::Ptr>* runtimeUpdates, const DependencyGroup::Ptr& onlyDependencyGroup = nullptr);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String& typeName,
void InsertObjectDependencies(const ConfigObject::Ptr& object,
std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
void UpdateDependenciesState(const Checkable::Ptr& checkable, const DependencyGroup::Ptr& onlyDependencyGroup = nullptr,
std::set<DependencyGroup*>* seenGroups = nullptr) const;
void UpdateState(const Checkable::Ptr& checkable, StateUpdate mode);
void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate);
void CreateConfigUpdate(const ConfigObject::Ptr& object, const String& type,
void CreateConfigUpdate(const ConfigObject::Ptr& object, const QueryArgPair& redisKeyPair,
std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, std::vector<Dictionary::Ptr>& runtimeUpdates, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStateChange(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type);
@ -171,7 +171,6 @@ private:
static Dictionary::Ptr GetStats();
/* utilities */
static std::vector<RedisConnection::QueryArg> GetTypeDumpSignalKeys(const Type::Ptr& type);
static void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<RedisConnection::QueryArg>& keys, RedisConnection::QueryPriority priority);
static std::vector<RedisConnection::QueryArg> GetTypeOverwriteKeys(const Type::Ptr& type, bool skipChecksums = false);
static void AddDataToHmSets(std::map<RedisConnection::QueryArg, RedisConnection::Query>& hMSets, const RedisConnection::QueryArg& redisKey, const String& id, const Dictionary::Ptr& data);
@ -240,7 +239,9 @@ private:
void ExceptionHandler(boost::exception_ptr exp);
static std::vector<Type::Ptr> GetTypes();
using SyncableTypeInfo = std::pair<const Type::Ptr, QueryArgPair>;
static std::vector<SyncableTypeInfo> GetSyncableTypes();
static const QueryArgPair& GetSyncableTypeRedisKeys(const Type::Ptr& type);
static void InitEnvironmentId();
static void PersistEnvironmentId();
@ -271,8 +272,6 @@ private:
// initialization, the value is read-only and can be accessed without further synchronization.
static String m_EnvironmentId;
static std::mutex m_EnvironmentIdInitMutex;
static std::unordered_set<Type*> m_IndexedTypes;
};
}