RedisConnection: enhance WriteQueueItem & related usages

This commit is contained in:
Yonas Habteab 2025-10-31 12:21:21 +01:00
parent cbb4147055
commit bbb7d0249e
2 changed files with 138 additions and 97 deletions

View file

@ -137,7 +137,7 @@ void RedisConnection::FireAndForgetQuery(Query query, QueryAffects affects, bool
auto item (Shared<Query>::Make(std::move(query)));
asio::post(m_Strand, [this, item, highPriority, affects, ctime = Utility::GetTime()]() {
m_Queues.Push(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects}, highPriority);
m_Queues.Push(WriteQueueItem{item, ctime, affects}, highPriority);
m_QueuedWrites.Set();
IncreasePendingQueries(1);
});
@ -160,7 +160,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Que
auto item (Shared<Queries>::Make(std::move(queries)));
asio::post(m_Strand, [this, item, affects, ctime = Utility::GetTime()]() {
m_Queues.Push(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects}, false);
m_Queues.Push(WriteQueueItem{item, ctime, affects}, false);
m_QueuedWrites.Set();
IncreasePendingQueries(item->size());
});
@ -185,7 +185,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
asio::post(m_Strand, [this, item, affects, ctime = Utility::GetTime()]() {
m_Queues.Push(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects}, false);
m_Queues.Push(WriteQueueItem{item, ctime, affects}, false);
m_QueuedWrites.Set();
IncreasePendingQueries(1);
});
@ -216,7 +216,7 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(Queries queries, Q
auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
asio::post(m_Strand, [this, item, highPriority, affects, ctime = Utility::GetTime()]() {
m_Queues.Push(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects}, highPriority);
m_Queues.Push(WriteQueueItem{item, ctime, affects}, highPriority);
m_QueuedWrites.Set();
IncreasePendingQueries(item->first.size());
});
@ -229,7 +229,7 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(Queries queries, Q
void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback)
{
asio::post(m_Strand, [this, callback, ctime = Utility::GetTime()]() {
m_Queues.Push(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime}, false);
m_Queues.Push(WriteQueueItem{callback, ctime, {}}, false);
m_QueuedWrites.Set();
});
}
@ -450,7 +450,15 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
m_QueuedWrites.Wait(yc);
while (m_Queues.HasWrites()) {
WriteItem(yc, m_Queues.PopFront());
auto queuedWrite(m_Queues.PopFront());
std::visit(
[this, &yc, &queuedWrite](const auto& item) {
if (WriteItem(item, yc)) {
RecordAffected(queuedWrite.Affects, Utility::GetTime());
}
},
queuedWrite.Item
);
}
m_QueuedWrites.Clear();
@ -493,111 +501,138 @@ void RedisConnection::LogStats(asio::yield_context& yc)
}
/**
* Send next and schedule receiving the response
* Write a single Redis query in a fire-and-forget manner.
*
* @param next Redis queries
* @param item Redis query
*
* @return true on success, false on failure.
*/
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
bool RedisConnection::WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc)
{
if (next.FireAndForgetQuery) {
auto& item (*next.FireAndForgetQuery);
DecreasePendingQueries(1);
DecreasePendingQueries(1);
try {
WriteOne(item, yc);
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item, msg);
msg << " which has been fired and forgotten: " << ex.what();
try {
WriteOne(*item, yc);
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(*item, msg);
msg << " which has been fired and forgotten: " << ex.what();
return;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
return false;
}
if (next.FireAndForgetQueries) {
auto& item (*next.FireAndForgetQueries);
size_t i = 0;
DecreasePendingQueries(item.size());
try {
for (auto& query : item) {
WriteOne(query, yc);
++i;
}
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item[i], msg);
msg << " which has been fired and forgotten: " << ex.what();
return;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
} else {
m_Queues.FutureResponseActions.back().Amount += item.size();
}
m_QueuedReads.Set();
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
if (next.GetResultOfQuery) {
auto& item (*next.GetResultOfQuery);
DecreasePendingQueries(1);
m_QueuedReads.Set();
return true;
}
try {
WriteOne(item.first, yc);
} catch (const std::exception&) {
item.second.set_exception(std::current_exception());
/**
* Write multiple Redis queries in a fire-and-forget manner.
*
* @param item Redis queries
*
* @return true on success, false on failure.
*/
bool RedisConnection::WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc)
{
size_t i = 0;
return;
DecreasePendingQueries(item->size());
try {
for (auto& query : *item) {
WriteOne(query, yc);
++i;
}
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery((*item)[i], msg);
msg << " which has been fired and forgotten: " << ex.what();
m_Queues.ReplyPromises.emplace(std::move(item.second));
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
return false;
}
if (next.GetResultsOfQueries) {
auto& item (*next.GetResultsOfQueries);
DecreasePendingQueries(item.first.size());
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->size(), ResponseAction::Ignore});
} else {
m_Queues.FutureResponseActions.back().Amount += item->size();
}
try {
for (auto& query : item.first) {
WriteOne(query, yc);
}
} catch (const std::exception&) {
item.second.set_exception(std::current_exception());
m_QueuedReads.Set();
return true;
}
return;
/**
* Write a single Redis query and enqueue a response promise to be fulfilled once the response has been received.
*
* @param item Redis query and promise for the response
*/
bool RedisConnection::WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc)
{
DecreasePendingQueries(1);
try {
WriteOne(item->first, yc);
} catch (const std::exception&) {
item->second.set_exception(std::current_exception());
return false;
}
m_Queues.ReplyPromises.push(std::move(item->second));
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
return true;
}
/**
* Write multiple Redis queries and enqueue a response promise to be fulfilled once all responses have been received.
*
* @param item Redis queries and promise for the responses.
*
* @return true on success, false on failure.
*/
bool RedisConnection::WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc)
{
DecreasePendingQueries(item->first.size());
try {
for (auto& query : item->first) {
WriteOne(query, yc);
}
} catch (const std::exception&) {
item->second.set_exception(std::current_exception());
m_Queues.RepliesPromises.emplace(std::move(item.second));
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
m_QueuedReads.Set();
return false;
}
if (next.Callback) {
next.Callback(yc);
}
m_Queues.RepliesPromises.emplace(std::move(item->second));
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->first.size(), ResponseAction::DeliverBulk});
RecordAffected(next.Affects, Utility::GetTime());
m_QueuedReads.Set();
return true;
}
/**
* Invokes the provided callback immediately.
*
* @param item Callback to execute
*/
bool RedisConnection::WriteItem(const QueryCallback& item, boost::asio::yield_context& yc)
{
item(yc);
return true;
}
/**

View file

@ -43,6 +43,7 @@
#include <utility>
#include <variant>
#include <vector>
#include <variant>
namespace icinga
{
@ -219,19 +220,20 @@ struct RedisConnInfo final : SharedObject
ResponseAction Action;
};
using FireAndForgetQ = Shared<Query>::Ptr; // A single query that does not expect a result.
using FireAndForgetQs = Shared<Queries>::Ptr; // Multiple queries that do not expect results.
using QueryWithPromise = Shared<std::pair<Query, std::promise<Reply>>>::Ptr; // A single query expecting a result.
using QueriesWithPromise = Shared<std::pair<Queries, std::promise<Replies>>>::Ptr; // Multiple queries expecting results.
using QueryCallback = std::function<void(boost::asio::yield_context&)>; // A callback to be executed.
/**
* Something to be send to Redis.
* An item in the write queue to be sent to Redis.
*
* @ingroup icingadb
*/
struct WriteQueueItem
{
Shared<Query>::Ptr FireAndForgetQuery;
Shared<Queries>::Ptr FireAndForgetQueries;
Shared<std::pair<Query, std::promise<Reply>>>::Ptr GetResultOfQuery;
Shared<std::pair<Queries, std::promise<Replies>>>::Ptr GetResultsOfQueries;
std::function<void(boost::asio::yield_context&)> Callback;
std::variant<FireAndForgetQ, FireAndForgetQs, QueryWithPromise, QueriesWithPromise, QueryCallback> Item;
double CTime; // When was this item queued?
QueryAffects Affects;
};
@ -261,7 +263,11 @@ struct RedisConnInfo final : SharedObject
void ReadLoop(boost::asio::yield_context& yc);
void WriteLoop(boost::asio::yield_context& yc);
void LogStats(boost::asio::yield_context& yc);
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
bool WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc);
bool WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc);
bool WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc);
bool WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc);
bool WriteItem(const QueryCallback& item, boost::asio::yield_context& yc);
Reply ReadOne(boost::asio::yield_context& yc);
void WriteOne(Query& query, boost::asio::yield_context& yc);