Rework isccc_ccmsg to support multiple messages per tcp read

Previously, only a single controlconf message would be processed from a
single TCP read even if the TCP read buffer contained multiple messages.
Refactor the isccc_ccmsg unit to store the extra buffer in the internal
buffer and use the already read data first before reading from the
network again.

Co-authored-by: Ondřej Surý <ondrej@isc.org>
Co-authored-by: Dominik Thalhammer <dominik@thalhammer.it>
This commit is contained in:
Dominik Thalhammer 2023-11-09 10:26:43 +01:00 committed by Ondřej Surý
parent e13728413a
commit 24ae1157e8
No known key found for this signature in database
GPG key ID: 2820F37E873DEA41
3 changed files with 65 additions and 54 deletions

View file

@ -305,8 +305,7 @@ rndc_recvdone(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
fatal("recv failed: %s", isc_result_totext(result));
}
source.rstart = isc_buffer_base(ccmsg->buffer);
source.rend = isc_buffer_used(ccmsg->buffer);
isccc_ccmsg_toregion(ccmsg, &source);
DO("parse message",
isccc_cc_fromwire(&source, &response, algorithm, &secret));
@ -381,8 +380,7 @@ rndc_recvnonce(isc_nmhandle_t *handle ISC_ATTR_UNUSED, isc_result_t result,
fatal("recv failed: %s", isc_result_totext(result));
}
source.rstart = isc_buffer_base(ccmsg->buffer);
source.rend = isc_buffer_used(ccmsg->buffer);
isccc_ccmsg_toregion(ccmsg, &source);
DO("parse message",
isccc_cc_fromwire(&source, &response, algorithm, &secret));

View file

@ -42,6 +42,35 @@
#define CCMSG_MAGIC ISC_MAGIC('C', 'C', 'm', 's')
#define VALID_CCMSG(foo) ISC_MAGIC_VALID(foo, CCMSG_MAGIC)
/*
* Try parsing a message from the internal read_buffer and set state
* accordingly. Returns true if a message was successfully parsed, false if not.
* If no message could be parsed the ccmsg struct remains untouched.
*/
static isc_result_t
try_parse_message(isccc_ccmsg_t *ccmsg) {
REQUIRE(ccmsg != NULL);
uint32_t len = 0;
if (isc_buffer_peekuint32(ccmsg->buffer, &len) != ISC_R_SUCCESS) {
return ISC_R_NOMORE;
}
if (len == 0) {
return ISC_R_UNEXPECTEDEND;
}
if (len > ccmsg->maxsize) {
return ISC_R_RANGE;
}
if (isc_buffer_remaininglength(ccmsg->buffer) < sizeof(uint32_t) + len)
{
return ISC_R_NOMORE;
}
/* Skip the size we just peeked */
isc_buffer_forward(ccmsg->buffer, sizeof(uint32_t));
ccmsg->size = len;
return ISC_R_SUCCESS;
}
static void
recv_data(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
void *arg) {
@ -56,49 +85,19 @@ recv_data(isc_nmhandle_t *handle, isc_result_t eresult, isc_region_t *region,
REQUIRE(region != NULL);
if (!ccmsg->length_received) {
if (region->length < sizeof(uint32_t)) {
eresult = ISC_R_UNEXPECTEDEND;
goto done;
}
ccmsg->size = ntohl(*(uint32_t *)region->base);
if (ccmsg->size == 0) {
eresult = ISC_R_UNEXPECTEDEND;
goto done;
}
if (ccmsg->size > ccmsg->maxsize) {
eresult = ISC_R_RANGE;
goto done;
}
isc_region_consume(region, sizeof(uint32_t));
isc_buffer_allocate(ccmsg->mctx, &ccmsg->buffer, ccmsg->size);
ccmsg->length_received = true;
}
/*
* If there's no more data, wait for more
*/
if (region->length == 0) {
return;
}
/* We have some data in the buffer, read it */
size_t size = ISC_MIN(isc_buffer_availablelength(ccmsg->buffer),
region->length);
isc_buffer_putmem(ccmsg->buffer, region->base, size);
isc_region_consume(region, size);
if (isc_buffer_usedlength(ccmsg->buffer) == ccmsg->size) {
/* Copy the received data to our reassembly buffer */
eresult = isc_buffer_copyregion(ccmsg->buffer, region);
if (eresult != ISC_R_SUCCESS) {
goto done;
}
isc_region_consume(region, region->length);
/* Wait for more data to come */
return;
/* Try to parse a single message of the buffer */
eresult = try_parse_message(ccmsg);
/* No results from parsing, we need more data */
if (eresult == ISC_R_NOMORE) {
return;
}
done:
isc_nm_read_stop(handle);
@ -120,6 +119,10 @@ isccc_ccmsg_init(isc_mem_t *mctx, isc_nmhandle_t *handle,
.mctx = mctx,
};
/* Preallocate the buffer to maximum single TCP read */
isc_buffer_allocate(ccmsg->mctx, &ccmsg->buffer,
UINT16_MAX + sizeof(uint16_t));
isc_nmhandle_attach(handle, &ccmsg->handle);
}
@ -134,15 +137,25 @@ void
isccc_ccmsg_readmessage(isccc_ccmsg_t *ccmsg, isc_nm_cb_t cb, void *cbarg) {
REQUIRE(VALID_CCMSG(ccmsg));
if (ccmsg->buffer != NULL) {
isc_buffer_free(&ccmsg->buffer);
if (ccmsg->size != 0) {
/* Remove the previously read message from the buffer */
isc_buffer_forward(ccmsg->buffer, ccmsg->size);
ccmsg->size = 0;
isc_buffer_trycompact(ccmsg->buffer);
}
ccmsg->recv_cb = cb;
ccmsg->recv_cbarg = cbarg;
ccmsg->length_received = false;
isc_nm_read(ccmsg->handle, recv_data, ccmsg);
/* If we have previous data still in the buffer, try to parse it */
isc_result_t result = try_parse_message(ccmsg);
if (result == ISC_R_NOMORE) {
/* We need to read more data */
isc_nm_read(ccmsg->handle, recv_data, ccmsg);
return;
}
ccmsg->recv_cb(ccmsg->handle, result, ccmsg->recv_cbarg);
}
static void
@ -187,18 +200,19 @@ isccc_ccmsg_disconnect(isccc_ccmsg_t *ccmsg) {
void
isccc_ccmsg_invalidate(isccc_ccmsg_t *ccmsg) {
REQUIRE(VALID_CCMSG(ccmsg));
REQUIRE(ccmsg->handle == NULL);
ccmsg->magic = 0;
if (ccmsg->buffer != NULL) {
isc_buffer_free(&ccmsg->buffer);
}
isc_buffer_free(&ccmsg->buffer);
}
void
isccc_ccmsg_toregion(isccc_ccmsg_t *ccmsg, isccc_region_t *ccregion) {
REQUIRE(VALID_CCMSG(ccmsg));
REQUIRE(ccmsg->buffer);
REQUIRE(isc_buffer_remaininglength(ccmsg->buffer) >= ccmsg->size);
ccregion->rstart = isc_buffer_base(ccmsg->buffer);
ccregion->rend = isc_buffer_used(ccmsg->buffer);
ccregion->rstart = isc_buffer_current(ccmsg->buffer);
ccregion->rend = ccregion->rstart + ccmsg->size;
}

View file

@ -45,7 +45,6 @@ typedef struct isccc_ccmsg {
/* private (don't touch!) */
unsigned int magic;
uint32_t size;
bool length_received;
isc_buffer_t *buffer;
unsigned int maxsize;
isc_mem_t *mctx;