diff --git a/servers/lloadd/Makefile.in b/servers/lloadd/Makefile.in
index 54beefffe9..736e2bbeba 100644
--- a/servers/lloadd/Makefile.in
+++ b/servers/lloadd/Makefile.in
@@ -22,7 +22,7 @@ NT_SRCS = nt_svc.c
NT_OBJS = nt_svc.o ../../libraries/liblutil/slapdmsg.res
SRCS = main.c globals.c backend.c config.c connection.c client.c daemon.c \
- ch_malloc.c init.c user.c sl_malloc.c upstream.c value.c \
+ ch_malloc.c init.c operation.c user.c sl_malloc.c upstream.c value.c \
libevent_support.c \
$(@PLAT@_SRCS)
diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c
new file mode 100644
index 0000000000..c5a7217f17
--- /dev/null
+++ b/servers/lloadd/operation.c
@@ -0,0 +1,155 @@
+/* $OpenLDAP$ */
+/* This work is part of OpenLDAP Software .
+ *
+ * Copyright 1998-2020 The OpenLDAP Foundation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * .
+ */
+
+#include "portable.h"
+
+#include "lutil.h"
+#include "slap.h"
+
+int
+operation_client_cmp( const void *left, const void *right )
+{
+ const Operation *l = left, *r = right;
+
+ assert( l->o_client == r->o_client );
+ return ( l->o_client_msgid < r->o_client_msgid ) ?
+ -1 :
+ ( l->o_client_msgid > r->o_client_msgid );
+}
+
+int
+operation_upstream_cmp( const void *left, const void *right )
+{
+ const Operation *l = left, *r = right;
+
+ assert( l->o_upstream == r->o_upstream );
+ return ( l->o_upstream_msgid < r->o_upstream_msgid ) ?
+ -1 :
+ ( l->o_upstream_msgid > r->o_upstream_msgid );
+}
+
+void
+operation_destroy( Operation *op )
+{
+ Connection *c;
+
+ /* TODO: this is a stopgap and there are many races here, just get
+ * something in to test with until we implement the freelist */
+ if ( op->o_client ) {
+ c = op->o_client;
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ tavl_delete( &c->c_ops, op, operation_client_cmp );
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ }
+
+ if ( op->o_upstream ) {
+ c = op->o_upstream;
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ tavl_delete( &c->c_ops, op, operation_upstream_cmp );
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ }
+
+ ch_free( op );
+}
+
+Operation *
+operation_init( Connection *c, BerElement *ber )
+{
+ Operation *op;
+ ber_tag_t tag;
+ ber_len_t len;
+ int rc;
+
+ op = ch_calloc( 1, sizeof(Operation) );
+ op->o_client = c;
+
+ tag = ber_get_int( ber, &op->o_client_msgid );
+ if ( tag != LDAP_TAG_MSGID ) {
+ goto fail;
+ }
+
+ rc = tavl_insert( &c->c_ops, op, operation_client_cmp, avl_dup_error );
+ if ( rc ) {
+ Debug( LDAP_DEBUG_PACKETS, "operation_init: "
+ "several operations with same msgid=%d in-flight "
+ "from the client\n",
+ op->o_client_msgid );
+ goto fail;
+ }
+
+ tag = op->o_tag = ber_skip_element( ber, &op->o_request );
+ switch ( tag ) {
+ case LBER_ERROR:
+ rc = -1;
+ break;
+ }
+ if ( rc ) {
+ tavl_delete( &c->c_ops, op, operation_client_cmp );
+ goto fail;
+ }
+
+ tag = ber_peek_tag( ber, &len );
+ if ( tag == LDAP_TAG_CONTROLS ) {
+ ber_skip_element( ber, &op->o_ctrls );
+ }
+
+ return op;
+
+fail:
+ ch_free( op );
+ return NULL;
+}
+
+void *
+operation_process( void *ctx, void *arg )
+{
+ Operation *op = arg;
+ BerElement *output;
+ Connection *c;
+ ber_int_t msgid;
+ int rc;
+
+ c = backend_select( op );
+ if ( !c ) {
+ Debug( LDAP_DEBUG_STATS, "operation_process: "
+ "no available connection found\n" );
+ goto fail;
+ }
+ op->o_upstream = c;
+
+ c->c_pendingber = output = ber_alloc();
+ if ( !output ) {
+ goto fail;
+ }
+
+ op->o_upstream_msgid = msgid = c->c_next_msgid++;
+ rc = tavl_insert( &c->c_ops, op, operation_upstream_cmp, avl_dup_error );
+ assert( rc == LDAP_SUCCESS );
+
+ ber_start_seq( output, LDAP_TAG_MESSAGE );
+ ber_put_int( output, msgid, LDAP_TAG_MSGID );
+ ber_put_berval( output, &op->o_request, op->o_tag );
+ if ( !BER_BVISNULL( &op->o_ctrls ) ) {
+ ber_put_berval( output, &op->o_ctrls, LDAP_TAG_CONTROLS );
+ }
+ ber_put_seq( output );
+
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ upstream_write_cb( -1, 0, c );
+
+ return NULL;
+fail:
+ return NULL;
+}
diff --git a/servers/lloadd/proto-slap.h b/servers/lloadd/proto-slap.h
index cb648ad087..68f372dc7d 100644
--- a/servers/lloadd/proto-slap.h
+++ b/servers/lloadd/proto-slap.h
@@ -41,6 +41,7 @@ struct config_reply_s; /* config.h */
*/
LDAP_SLAPD_F (void *) backend_connect( void *ctx, void *arg );
+LDAP_SLAPD_F (Connection *) backend_select( Operation *op );
/*
* ch_malloc.c
@@ -132,6 +133,15 @@ LDAP_SLAPD_F (int) lload_libevent_init( void );
LDAP_SLAPD_V (int) slapd_register_slp;
LDAP_SLAPD_V (const char *) slapd_slp_attrs;
+/*
+ * operation.c
+ */
+LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r );
+LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r );
+LDAP_SLAPD_F (void *) operation_process( void *ctx, void *arg );
+LDAP_SLAPD_F (Operation *) operation_init( Connection *c, BerElement *ber );
+LDAP_SLAPD_F (void) operation_destroy( Operation *op );
+
/*
* sl_malloc.c
*/
diff --git a/servers/lloadd/slap.h b/servers/lloadd/slap.h
index 7eebeeaca0..27cfc4b17a 100644
--- a/servers/lloadd/slap.h
+++ b/servers/lloadd/slap.h
@@ -98,6 +98,7 @@ typedef unsigned long slap_mask_t;
typedef struct Backend Backend;
typedef struct Connection Connection;
+typedef struct Operation Operation;
/* end of forward declarations */
typedef union Sockaddr {
@@ -288,6 +289,8 @@ struct Connection {
BerElement *c_currentber; /* ber we're attempting to read */
BerElement *c_pendingber; /* ber we're attempting to write */
+ TAvlnode *c_ops; /* Operations pending on the connection */
+
#define CONN_IS_TLS 1
#define CONN_IS_CLIENT 4
#define CONN_IS_IPC 8
@@ -303,6 +306,16 @@ struct Connection {
void *c_private;
};
+struct Operation {
+ Connection *o_client, *o_upstream;
+
+ ber_int_t o_client_msgid, o_upstream_msgid;
+ ber_tag_t o_tag;
+
+ BerElement *o_ber;
+ BerValue o_request, o_ctrls;
+};
+
#ifdef LDAP_DEBUG
#ifdef LDAP_SYSLOG
#ifdef LOG_LOCAL4