diff --git a/lib/base/stream.cpp b/lib/base/stream.cpp index 72ca82c1a..3048f096c 100644 --- a/lib/base/stream.cpp +++ b/lib/base/stream.cpp @@ -91,6 +91,16 @@ bool Stream::WaitForData(int timeout) return IsDataAvailable() || IsEof(); } +void Stream::SetCorked(bool corked) +{ + m_Corked = corked; +} + +bool Stream::IsCorked() const +{ + return m_Corked; +} + static void StreamDummyCallback() { } diff --git a/lib/base/stream.hpp b/lib/base/stream.hpp index 8a140a586..bb4129138 100644 --- a/lib/base/stream.hpp +++ b/lib/base/stream.hpp @@ -127,6 +127,9 @@ public: bool WaitForData(); bool WaitForData(int timeout); + virtual void SetCorked(bool corked); + bool IsCorked() const; + virtual bool SupportsWaiting() const; virtual bool IsDataAvailable() const; @@ -143,6 +146,8 @@ private: boost::mutex m_Mutex; boost::condition_variable m_CV; + + bool m_Corked{false}; }; } diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index 2846e8962..e93f119d8 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -151,12 +151,17 @@ void TlsStream::OnEvent(int revents) char buffer[64 * 1024]; if (m_CurrentAction == TlsActionNone) { - if (revents & (POLLIN | POLLERR | POLLHUP)) + bool corked = IsCorked(); + if (!corked && (revents & (POLLIN | POLLERR | POLLHUP))) m_CurrentAction = TlsActionRead; else if (m_SendQ->GetAvailableBytes() > 0 && (revents & POLLOUT)) m_CurrentAction = TlsActionWrite; else { - ChangeEvents(POLLIN); + if (corked) + ChangeEvents(0); + else + ChangeEvents(POLLIN); + return; } } @@ -399,6 +404,18 @@ bool TlsStream::IsDataAvailable() const return m_RecvQ->GetAvailableBytes() > 0; } +void TlsStream::SetCorked(bool corked) +{ + Stream::SetCorked(corked); + + boost::mutex::scoped_lock lock(m_Mutex); + + if (corked) + m_CurrentAction = TlsActionNone; + else + ChangeEvents(POLLIN | POLLOUT); +} + Socket::Ptr TlsStream::GetSocket() const { return m_Socket; diff --git a/lib/base/tlsstream.hpp b/lib/base/tlsstream.hpp index 3528d430b..ad5b92d0f 100644 --- a/lib/base/tlsstream.hpp +++ b/lib/base/tlsstream.hpp @@ -70,6 +70,8 @@ public: bool SupportsWaiting() const override; bool IsDataAvailable() const override; + void SetCorked(bool corked) override; + bool IsVerifyOK() const; String GetVerifyError() const; diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index 177ea4079..a51873975 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -96,7 +96,6 @@ void HttpServerConnection::Disconnect() bool HttpServerConnection::ProcessMessage() { - bool res; HttpResponse response(m_Stream, m_CurrentRequest); @@ -174,6 +173,8 @@ bool HttpServerConnection::ProcessMessage() return res; } + m_Stream->SetCorked(true); + m_RequestQueue.Enqueue(std::bind(&HttpServerConnection::ProcessMessageAsync, HttpServerConnection::Ptr(this), m_CurrentRequest, response, m_AuthenticatedUser)); @@ -328,6 +329,7 @@ void HttpServerConnection::ProcessMessageAsync(HttpRequest& request, HttpRespons response.Finish(); m_PendingRequests--; + m_Stream->SetCorked(false); } void HttpServerConnection::DataAvailableHandler() diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index c1895934c..978cd9658 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -155,6 +155,8 @@ void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString) try { MessageHandler(jsonString); + + m_Stream->SetCorked(false); } catch (const std::exception& ex) { Log(LogWarning, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity @@ -255,6 +257,8 @@ bool JsonRpcConnection::ProcessMessage() if (srs != StatusNewItem) return false; + m_Stream->SetCorked(true); + l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message)); return true;