mirror of
https://github.com/Icinga/icinga2.git
synced 2026-02-18 18:19:13 -05:00
Merge pull request #10460 from Icinga/apilistener-shutdown-conns-on-stop
Deterministically shut down all connections on ApiListener::Stop()
This commit is contained in:
commit
dcef22e125
43 changed files with 220 additions and 25 deletions
|
|
@ -26,6 +26,11 @@ void StoppableWaitGroup::unlock_shared()
|
|||
}
|
||||
}
|
||||
|
||||
bool StoppableWaitGroup::IsLockable() const
|
||||
{
|
||||
return !m_Stopped.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disallow new shared locks, wait for all existing ones.
|
||||
*/
|
||||
|
|
@ -33,6 +38,6 @@ void StoppableWaitGroup::Join()
|
|||
{
|
||||
std::unique_lock lock (m_Mutex);
|
||||
|
||||
m_Stopped = true;
|
||||
m_Stopped.store(true, std::memory_order_relaxed);
|
||||
m_CV.wait(lock, [this] { return !m_SharedLocks; });
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "base/object.hpp"
|
||||
#include "base/atomic.hpp"
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <mutex>
|
||||
|
|
@ -22,6 +23,8 @@ public:
|
|||
|
||||
virtual bool try_lock_shared() = 0;
|
||||
virtual void unlock_shared() = 0;
|
||||
|
||||
virtual bool IsLockable() const = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
@ -42,13 +45,16 @@ public:
|
|||
|
||||
bool try_lock_shared() override;
|
||||
void unlock_shared() override;
|
||||
|
||||
bool IsLockable() const override;
|
||||
|
||||
void Join();
|
||||
|
||||
private:
|
||||
std::mutex m_Mutex;
|
||||
std::condition_variable m_CV;
|
||||
uint_fast32_t m_SharedLocks = 0;
|
||||
bool m_Stopped = false;
|
||||
Atomic<bool> m_Stopped = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ thread_local ApiUser::Ptr ActionsHandler::AuthenticatedApiUser;
|
|||
REGISTER_URLHANDLER("/v1/actions", ActionsHandler);
|
||||
|
||||
bool ActionsHandler::HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
@ -88,7 +89,28 @@ bool ActionsHandler::HandleRequest(
|
|||
if (params)
|
||||
verbose = HttpUtility::GetLastParameter(params, "verbose");
|
||||
|
||||
std::shared_lock wgLock{*waitGroup, std::try_to_lock};
|
||||
if (!wgLock) {
|
||||
HttpUtility::SendJsonError(response, params, 503, "Shutting down.");
|
||||
return true;
|
||||
}
|
||||
|
||||
for (ConfigObject::Ptr obj : objs) {
|
||||
if (!waitGroup->IsLockable()) {
|
||||
if (wgLock) {
|
||||
wgLock.unlock();
|
||||
}
|
||||
|
||||
results.emplace_back(new Dictionary({
|
||||
{ "type", obj->GetReflectionType()->GetName() },
|
||||
{ "name", obj->GetName() },
|
||||
{ "code", 503 },
|
||||
{ "status", "Action skipped: Shutting down."}
|
||||
}));
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
results.emplace_back(action->Invoke(obj, params));
|
||||
} catch (const std::exception& ex) {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ public:
|
|||
static thread_local ApiUser::Ptr AuthenticatedApiUser;
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -368,7 +368,12 @@ void ApiListener::Stop(bool runtimeDeleted)
|
|||
m_Timer->Stop(true);
|
||||
m_RenewOwnCertTimer->Stop(true);
|
||||
|
||||
StopListener();
|
||||
|
||||
DisconnectJsonRpcConnections();
|
||||
|
||||
m_WaitGroup->Join();
|
||||
|
||||
ObjectImpl<ApiListener>::Stop(runtimeDeleted);
|
||||
|
||||
Log(LogInformation, "ApiListener")
|
||||
|
|
@ -486,13 +491,38 @@ bool ApiListener::AddListener(const String& node, const String& service)
|
|||
Log(LogInformation, "ApiListener")
|
||||
<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
|
||||
|
||||
IoEngine::SpawnCoroutine(io, [this, acceptor](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor); });
|
||||
auto strand = Shared<asio::io_context::strand>::Make(io);
|
||||
|
||||
boost::signals2::scoped_connection closeSignal = m_OnListenerShutdown.connect([strand, acceptor]() {
|
||||
boost::asio::post(*strand, [acceptor] {
|
||||
try {
|
||||
acceptor->close();
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogCritical, "ApiListener")
|
||||
<< "Failed to close acceptor socket: " << ex.what();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
IoEngine::SpawnCoroutine(*strand, [this, acceptor, closeSignal = std::move(closeSignal)](asio::yield_context yc) {
|
||||
ListenerCoroutineProc(yc, acceptor);
|
||||
});
|
||||
|
||||
UpdateStatusFile(localEndpoint);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the listener(s).
|
||||
*/
|
||||
void ApiListener::StopListener()
|
||||
{
|
||||
m_OnListenerShutdown();
|
||||
|
||||
m_ListenerWaitGroup->Join();
|
||||
}
|
||||
|
||||
void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Shared<boost::asio::ip::tcp::acceptor>::Ptr& server)
|
||||
{
|
||||
namespace asio = boost::asio;
|
||||
|
|
@ -506,7 +536,14 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
|
|||
lastModified = Utility::GetFileCreationTime(crlPath);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
std::shared_lock wgLock(*m_ListenerWaitGroup, std::try_to_lock);
|
||||
if (!wgLock) {
|
||||
Log(LogCritical, "ApiListener")
|
||||
<< "Could not lock the listener wait group.";
|
||||
return;
|
||||
}
|
||||
|
||||
while (server->is_open()) {
|
||||
try {
|
||||
asio::ip::tcp::socket socket (io);
|
||||
|
||||
|
|
@ -546,6 +583,12 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
|
|||
NewClientHandler(yc, strand, sslConn, String(), RoleServer);
|
||||
});
|
||||
} catch (const std::exception& ex) {
|
||||
auto se (dynamic_cast<const boost::system::system_error*>(&ex));
|
||||
|
||||
if (se && se->code() == boost::asio::error::operation_aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
Log(LogCritical, "ApiListener")
|
||||
<< "Cannot accept new connection: " << ex.what();
|
||||
}
|
||||
|
|
@ -828,6 +871,11 @@ void ApiListener::NewClientHandlerInternal(
|
|||
throw;
|
||||
}
|
||||
|
||||
std::shared_lock wgLock(*m_ListenerWaitGroup, std::try_to_lock);
|
||||
if (!wgLock) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (ctype == ClientJsonRpc) {
|
||||
Log(LogNotice, "ApiListener", "New JSON-RPC client");
|
||||
|
||||
|
|
@ -846,7 +894,7 @@ void ApiListener::NewClientHandlerInternal(
|
|||
return;
|
||||
}
|
||||
|
||||
JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);
|
||||
JsonRpcConnection::Ptr aclient = new JsonRpcConnection(m_WaitGroup, identity, verify_ok, client, role);
|
||||
|
||||
if (endpoint) {
|
||||
endpoint->AddClient(aclient);
|
||||
|
|
@ -869,7 +917,7 @@ void ApiListener::NewClientHandlerInternal(
|
|||
} else {
|
||||
Log(LogNotice, "ApiListener", "New HTTP client");
|
||||
|
||||
HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client);
|
||||
HttpServerConnection::Ptr aclient = new HttpServerConnection(m_WaitGroup, identity, verify_ok, client);
|
||||
AddHttpClient(aclient);
|
||||
aclient->Start();
|
||||
shutdownSslConn.Cancel();
|
||||
|
|
@ -1760,6 +1808,20 @@ std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const
|
|||
return m_AnonymousClients;
|
||||
}
|
||||
|
||||
void ApiListener::DisconnectJsonRpcConnections()
|
||||
{
|
||||
for (auto endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
|
||||
for (const auto& client : endpoint->GetClients()) {
|
||||
client->Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_lock lock(m_AnonymousClientsLock);
|
||||
for (const auto & client : m_AnonymousClients){
|
||||
client->Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_HttpClientsLock);
|
||||
|
|
|
|||
|
|
@ -114,6 +114,7 @@ public:
|
|||
bool AddAnonymousClient(const JsonRpcConnection::Ptr& aclient);
|
||||
void RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient);
|
||||
std::set<JsonRpcConnection::Ptr> GetAnonymousClients() const;
|
||||
void DisconnectJsonRpcConnections();
|
||||
|
||||
void AddHttpClient(const HttpServerConnection::Ptr& aclient);
|
||||
void RemoveHttpClient(const HttpServerConnection::Ptr& aclient);
|
||||
|
|
@ -191,12 +192,16 @@ private:
|
|||
static ApiListener::Ptr m_Instance;
|
||||
static std::atomic<bool> m_UpdatedObjectAuthority;
|
||||
|
||||
boost::signals2::signal<void()> m_OnListenerShutdown;
|
||||
StoppableWaitGroup::Ptr m_ListenerWaitGroup = new StoppableWaitGroup();
|
||||
|
||||
void ApiTimerHandler();
|
||||
void ApiReconnectTimerHandler();
|
||||
void CleanupCertificateRequestsTimerHandler();
|
||||
void CheckApiPackageIntegrity();
|
||||
|
||||
bool AddListener(const String& node, const String& service);
|
||||
void StopListener();
|
||||
void AddConnection(const Endpoint::Ptr& endpoint);
|
||||
|
||||
void NewClientHandler(
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ using namespace icinga;
|
|||
REGISTER_URLHANDLER("/v1/config/files", ConfigFilesHandler);
|
||||
|
||||
bool ConfigFilesHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(ConfigFilesHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ using namespace icinga;
|
|||
REGISTER_URLHANDLER("/v1/config/packages", ConfigPackagesHandler);
|
||||
|
||||
bool ConfigPackagesHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(ConfigPackagesHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ REGISTER_URLHANDLER("/v1/config/stages", ConfigStagesHandler);
|
|||
std::atomic<bool> ConfigStagesHandler::m_RunningPackageUpdates (false);
|
||||
|
||||
bool ConfigStagesHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(ConfigStagesHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ static void EnsureFrameCleanupTimer()
|
|||
}
|
||||
|
||||
bool ConsoleHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(ConsoleHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ using namespace icinga;
|
|||
REGISTER_URLHANDLER("/v1/objects", CreateObjectHandler);
|
||||
|
||||
bool CreateObjectHandler::HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
@ -102,6 +103,12 @@ bool CreateObjectHandler::HandleRequest(
|
|||
return true;
|
||||
}
|
||||
|
||||
std::shared_lock wgLock{*waitGroup, std::try_to_lock};
|
||||
if (!wgLock) {
|
||||
HttpUtility::SendJsonError(response, params, 503, "Shutting down.");
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Object creation can cause multiple errors and optionally diagnostic information.
|
||||
* We can't use SendJsonError() here.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(CreateObjectHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ using namespace icinga;
|
|||
REGISTER_URLHANDLER("/v1/objects", DeleteObjectHandler);
|
||||
|
||||
bool DeleteObjectHandler::HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
@ -78,7 +79,30 @@ bool DeleteObjectHandler::HandleRequest(
|
|||
|
||||
bool success = true;
|
||||
|
||||
std::shared_lock wgLock{*waitGroup, std::try_to_lock};
|
||||
if (!wgLock) {
|
||||
HttpUtility::SendJsonError(response, params, 503, "Shutting down.");
|
||||
return true;
|
||||
}
|
||||
|
||||
for (ConfigObject::Ptr obj : objs) {
|
||||
if (!waitGroup->IsLockable()) {
|
||||
if (wgLock) {
|
||||
wgLock.unlock();
|
||||
}
|
||||
|
||||
results.emplace_back(new Dictionary({
|
||||
{ "type", type->GetName() },
|
||||
{ "name", obj->GetName() },
|
||||
{ "code", 503 },
|
||||
{ "status", "Action skipped: Shutting down."}
|
||||
}));
|
||||
|
||||
success = false;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
int code;
|
||||
String status;
|
||||
Array::Ptr errors = new Array();
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(DeleteObjectHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ void Endpoint::RemoveClient(const JsonRpcConnection::Ptr& client)
|
|||
std::unique_lock<std::mutex> lock(m_ClientsLock);
|
||||
m_Clients.erase(client);
|
||||
|
||||
Log(LogWarning, "ApiListener")
|
||||
Log(LogInformation, "ApiListener")
|
||||
<< "Removing API client for endpoint '" << GetName() << "'. " << m_Clients.size() << " API clients left.";
|
||||
|
||||
SetConnecting(false);
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ const std::map<String, EventType> l_EventTypes ({
|
|||
const String l_ApiQuery ("<API query>");
|
||||
|
||||
bool EventsHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(EventsHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ void HttpHandler::Register(const Url::Ptr& url, const HttpHandler::Ptr& handler)
|
|||
}
|
||||
|
||||
void HttpHandler::ProcessRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
@ -108,7 +109,7 @@ void HttpHandler::ProcessRequest(
|
|||
*/
|
||||
try {
|
||||
for (const HttpHandler::Ptr& handler : handlers) {
|
||||
if (handler->HandleRequest(stream, user, request, url, response, params, yc, server)) {
|
||||
if (handler->HandleRequest(waitGroup, stream, user, request, url, response, params, yc, server)) {
|
||||
processed = true;
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(HttpHandler);
|
||||
|
||||
virtual bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
@ -39,6 +40,7 @@ public:
|
|||
|
||||
static void Register(const Url::Ptr& url, const HttpHandler::Ptr& handler);
|
||||
static void ProcessRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -35,13 +35,13 @@ using namespace icinga;
|
|||
|
||||
auto const l_ServerHeader ("Icinga/" + Application::GetAppVersion());
|
||||
|
||||
HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream)
|
||||
: HttpServerConnection(identity, authenticated, stream, IoEngine::Get().GetIoContext())
|
||||
HttpServerConnection::HttpServerConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream)
|
||||
: HttpServerConnection(waitGroup, identity, authenticated, stream, IoEngine::Get().GetIoContext())
|
||||
{
|
||||
}
|
||||
|
||||
HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, boost::asio::io_context& io)
|
||||
: m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(io), m_ShuttingDown(false), m_HasStartedStreaming(false),
|
||||
HttpServerConnection::HttpServerConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, boost::asio::io_context& io)
|
||||
: m_WaitGroup(waitGroup), m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(io), m_ShuttingDown(false), m_HasStartedStreaming(false),
|
||||
m_CheckLivenessTimer(io)
|
||||
{
|
||||
if (authenticated) {
|
||||
|
|
@ -419,6 +419,7 @@ bool ProcessRequest(
|
|||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
HttpServerConnection& server,
|
||||
bool& hasStartedStreaming,
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
std::chrono::steady_clock::duration& cpuBoundWorkTime,
|
||||
boost::asio::yield_context& yc
|
||||
)
|
||||
|
|
@ -431,7 +432,7 @@ bool ProcessRequest(
|
|||
CpuBoundWork handlingRequest (yc);
|
||||
cpuBoundWorkTime = std::chrono::steady_clock::now() - start;
|
||||
|
||||
HttpHandler::ProcessRequest(stream, authenticatedUser, request, response, yc, server);
|
||||
HttpHandler::ProcessRequest(waitGroup, stream, authenticatedUser, request, response, yc, server);
|
||||
} catch (const std::exception& ex) {
|
||||
if (hasStartedStreaming) {
|
||||
return false;
|
||||
|
|
@ -477,7 +478,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
|
|||
*/
|
||||
beast::flat_buffer buf;
|
||||
|
||||
for (;;) {
|
||||
while (m_WaitGroup->IsLockable()) {
|
||||
m_Seen = Utility::GetTime();
|
||||
|
||||
http::parser<true, http::string_body> parser;
|
||||
|
|
@ -548,7 +549,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
|
|||
|
||||
m_Seen = std::numeric_limits<decltype(m_Seen)>::max();
|
||||
|
||||
if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, cpuBoundWorkTime, yc)) {
|
||||
if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, m_WaitGroup, cpuBoundWorkTime, yc)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@
|
|||
#include "remote/apiuser.hpp"
|
||||
#include "base/string.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include "base/wait-group.hpp"
|
||||
#include <memory>
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
|
|
@ -25,14 +26,15 @@ class HttpServerConnection final : public Object
|
|||
public:
|
||||
DECLARE_PTR_TYPEDEFS(HttpServerConnection);
|
||||
|
||||
HttpServerConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream);
|
||||
HttpServerConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
|
||||
const Shared<AsioTlsStream>::Ptr& stream);
|
||||
|
||||
void Start();
|
||||
void StartStreaming();
|
||||
|
||||
bool Disconnected();
|
||||
|
||||
private:
|
||||
WaitGroup::Ptr m_WaitGroup;
|
||||
ApiUser::Ptr m_ApiUser;
|
||||
Shared<AsioTlsStream>::Ptr m_Stream;
|
||||
double m_Seen;
|
||||
|
|
@ -42,7 +44,8 @@ private:
|
|||
bool m_HasStartedStreaming;
|
||||
boost::asio::deadline_timer m_CheckLivenessTimer;
|
||||
|
||||
HttpServerConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, boost::asio::io_context& io);
|
||||
HttpServerConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
|
||||
const Shared<AsioTlsStream>::Ptr& stream, boost::asio::io_context& io);
|
||||
|
||||
void Disconnect(boost::asio::yield_context yc);
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ using namespace icinga;
|
|||
REGISTER_URLHANDLER("/", InfoHandler);
|
||||
|
||||
bool InfoHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(InfoHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -29,17 +29,17 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
|
|||
|
||||
static RingBuffer l_TaskStats (15 * 60);
|
||||
|
||||
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
|
||||
JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
|
||||
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role)
|
||||
: JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
|
||||
: JsonRpcConnection(waitGroup, identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
|
||||
{
|
||||
}
|
||||
|
||||
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
|
||||
JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
|
||||
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io)
|
||||
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
|
||||
m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io),
|
||||
m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
|
||||
m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_WaitGroup(waitGroup),
|
||||
m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
|
||||
{
|
||||
if (authenticated)
|
||||
|
|
@ -284,7 +284,7 @@ void JsonRpcConnection::Disconnect()
|
|||
ApiListener::GetInstance()->RemoveAnonymousClient(this);
|
||||
}
|
||||
|
||||
Log(LogWarning, "JsonRpcConnection")
|
||||
Log(LogInformation, "JsonRpcConnection")
|
||||
<< "API client disconnected for identity '" << m_Identity << "'";
|
||||
});
|
||||
}
|
||||
|
|
@ -303,6 +303,11 @@ void JsonRpcConnection::Disconnect()
|
|||
*/
|
||||
void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
|
||||
{
|
||||
std::shared_lock wgLock(*m_WaitGroup, std::try_to_lock);
|
||||
if (!wgLock) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (m_Endpoint && message->Contains("ts")) {
|
||||
double ts = message->Get("ts");
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@
|
|||
#include "base/atomic.hpp"
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
#include "base/wait-group.hpp"
|
||||
#include "base/timer.hpp"
|
||||
#include "base/workqueue.hpp"
|
||||
#include <memory>
|
||||
|
|
@ -43,7 +44,8 @@ class JsonRpcConnection final : public Object
|
|||
public:
|
||||
DECLARE_PTR_TYPEDEFS(JsonRpcConnection);
|
||||
|
||||
JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role);
|
||||
JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated,
|
||||
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role);
|
||||
|
||||
void Start();
|
||||
|
||||
|
|
@ -78,9 +80,11 @@ private:
|
|||
AsioEvent m_OutgoingMessagesQueued;
|
||||
AsioEvent m_WriterDone;
|
||||
Atomic<bool> m_ShuttingDown;
|
||||
WaitGroup::Ptr m_WaitGroup;
|
||||
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
|
||||
|
||||
JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
|
||||
JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated,
|
||||
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
|
||||
|
||||
void HandleIncomingMessages(boost::asio::yield_context yc);
|
||||
void WriteOutgoingMessages(boost::asio::yield_context yc);
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ using namespace icinga;
|
|||
REGISTER_URLHANDLER("/v1/debug/malloc_info", MallocInfoHandler);
|
||||
|
||||
bool MallocInfoHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream&,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(MallocInfoHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ using namespace icinga;
|
|||
REGISTER_URLHANDLER("/v1/objects", ModifyObjectHandler);
|
||||
|
||||
bool ModifyObjectHandler::HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
@ -104,12 +105,31 @@ bool ModifyObjectHandler::HandleRequest(
|
|||
|
||||
ArrayData results;
|
||||
|
||||
std::shared_lock wgLock{*waitGroup, std::try_to_lock};
|
||||
if (!wgLock) {
|
||||
HttpUtility::SendJsonError(response, params, 503, "Shutting down.");
|
||||
return true;
|
||||
}
|
||||
|
||||
for (ConfigObject::Ptr obj : objs) {
|
||||
Dictionary::Ptr result1 = new Dictionary();
|
||||
|
||||
result1->Set("type", type->GetName());
|
||||
result1->Set("name", obj->GetName());
|
||||
|
||||
if (!waitGroup->IsLockable()) {
|
||||
if (wgLock) {
|
||||
wgLock.unlock();
|
||||
}
|
||||
|
||||
result1->Set("code", 503);
|
||||
result1->Set("status", "Action skipped: Shutting down.");
|
||||
|
||||
results.emplace_back(std::move(result1));
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
String key;
|
||||
|
||||
// Lock the object name of the given type to prevent from being modified/deleted concurrently.
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(ModifyObjectHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -89,6 +89,7 @@ Dictionary::Ptr ObjectQueryHandler::SerializeObjectAttrs(const Object::Ptr& obje
|
|||
}
|
||||
|
||||
bool ObjectQueryHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(ObjectQueryHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -69,6 +69,7 @@ public:
|
|||
};
|
||||
|
||||
bool StatusHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(StatusHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@ public:
|
|||
};
|
||||
|
||||
bool TemplateQueryHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(TemplateQueryHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ public:
|
|||
};
|
||||
|
||||
bool TypeQueryHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(TypeQueryHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -57,6 +57,7 @@ public:
|
|||
};
|
||||
|
||||
bool VariableQueryHandler::HandleRequest(
|
||||
const WaitGroup::Ptr&,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ public:
|
|||
DECLARE_PTR_TYPEDEFS(VariableQueryHandler);
|
||||
|
||||
bool HandleRequest(
|
||||
const WaitGroup::Ptr& waitGroup,
|
||||
AsioTlsStream& stream,
|
||||
const ApiUser::Ptr& user,
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
|
|
|
|||
Loading…
Reference in a new issue