diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index 4f7fb8414..6bad770ab 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -22,11 +22,12 @@ #include "remote/eventqueue.hpp" #include "base/json.hpp" #include "base/statsfunction.hpp" +#include using namespace icinga; //TODO Make configurable and figure out a sane default -#define MAX_EVENTS 5000 +#define MAX_EVENTS_DEFAULT 5000 REGISTER_TYPE(RedisWriter); @@ -178,29 +179,20 @@ void RedisWriter::UpdateSubscriptions(void) redisReply *keysReply = reply->element[1]; for (size_t i = 0; i < keysReply->elements; i++) { + if (boost::algorithm::ends_with(keysReply->element[i]->str, ":limit")) + continue; redisReply *keyReply = keysReply->element[i]; VERIFY(keyReply->type == REDIS_REPLY_STRING); - String key = keyReply->str; - - try { - boost::shared_ptr redisReply = ExecuteQuery({ "SMEMBERS", key }); - VERIFY(redisReply->type == REDIS_REPLY_ARRAY); - - RedisSubscriptionInfo rsi; - - for (size_t j = 0; j < redisReply->elements; j++) { - rsi.EventTypes.insert(redisReply->element[j]->str); - } + RedisSubscriptionInfo rsi; + String key = keysReply->element[i]->str; + if (!RedisWriter::GetSubscriptionTypes(key, rsi)) Log(LogInformation, "RedisWriter") - << "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes)); - + << "Subscription \"" << key<< "\" has no types listed."; + else m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi; - } catch (const std::exception& ex) { - Log(LogWarning, "RedisWriter") - << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex); - } + } } while (cursor != 0); @@ -208,6 +200,25 @@ void RedisWriter::UpdateSubscriptions(void) << "Current Redis event subscriptions: " << m_Subscriptions.size(); } +int RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi) +{ + try { + boost::shared_ptr redisReply = ExecuteQuery({ "SMEMBERS", key }); + VERIFY(redisReply->type == REDIS_REPLY_ARRAY); + + for (size_t j = 0; j < redisReply->elements; j++) { + rsi.EventTypes.insert(redisReply->element[j]->str); + } + + Log(LogInformation, "RedisWriter") + << "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes)); + + } catch (const std::exception& ex) { + Log(LogWarning, "RedisWriter") + << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex); + } +} + void RedisWriter::PublishStatsTimerHandler(void) { m_WorkQueue.Enqueue(boost::bind(&RedisWriter::PublishStats, this)); @@ -282,9 +293,19 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event) String body = JsonEncode(event); + boost::shared_ptr maxExists = ExecuteQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" }); + long maxEvents = MAX_EVENTS_DEFAULT; + if (maxExists->integer) { + boost::shared_ptr redisReply = ExecuteQuery({ "GET", "icinga:subscription:" + name + ":limit"}); + VERIFY(redisReply->type == REDIS_REPLY_STRING); + Log(LogInformation, "RedisWriter") + << "Got limit " << redisReply->str << " for " << name; + maxEvents = Convert::ToLong(redisReply->str); + } + ExecuteQuery({ "MULTI" }); ExecuteQuery({ "LPUSH", "icinga:event:" + name, body }); - ExecuteQuery({ "LTRIM", "icinga:event:" + name, "0", String(MAX_EVENTS - 1)}); + ExecuteQuery({ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)}); ExecuteQuery({ "EXEC" }); } } diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index 39bd6fc4b..24c47f9dd 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -59,6 +59,7 @@ private: void UpdateSubscriptionsTimerHandler(void); void UpdateSubscriptions(void); + int GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi); void PublishStatsTimerHandler(void); void PublishStats(void);