xref: /netbsd-src/external/bsd/openldap/dist/servers/slapd/back-asyncmeta/message_queue.c (revision 549b59ed3ccf0d36d3097190a0db27b770f3a839)
1 /*	$NetBSD: message_queue.c,v 1.2 2021/08/14 16:14:59 christos Exp $	*/
2 
3 /* message_queue.c - routines to maintain the per-connection lists
4  * of pending operations */
5 /* $OpenLDAP$ */
6 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
7  *
8  * Copyright 2016-2021 The OpenLDAP Foundation.
9  * Portions Copyright 2016 Symas Corporation.
10  * All rights reserved.
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted only as authorized by the OpenLDAP
14  * Public License.
15  *
16  * A copy of this license is available in the file LICENSE in the
17  * top-level directory of the distribution or, alternatively, at
18  * <http://www.OpenLDAP.org/license.html>.
19  */
20 
21 /* ACKNOWLEDGEMENTS:
22  * This work was developed by Symas Corporation
23  * based on back-meta module for inclusion in OpenLDAP Software.
24  * This work was sponsored by Ericsson. */
25 
26 #include <sys/cdefs.h>
27 __RCSID("$NetBSD: message_queue.c,v 1.2 2021/08/14 16:14:59 christos Exp $");
28 
29 #include "portable.h"
30 
31 #include <stdio.h>
32 
33 #include <ac/socket.h>
34 #include <ac/string.h>
35 #include <ac/time.h>
36 
37 #include "lutil.h"
38 #include "slap.h"
39 #include "../back-ldap/back-ldap.h"
40 #include "back-asyncmeta.h"
41 #include "../../../libraries/liblber/lber-int.h"
42 #include "lutil.h"
43 
44 
45 typedef struct listptr {
46 	void *reserved;
47 	struct listptr *next;
48 } listptr;
49 
50 typedef struct listhead {
51 	struct listptr *list;
52 	int cnt;
53 } listhead;
54 
55 #ifndef LH_MAX
56 #define LH_MAX	16
57 #endif
58 
asyncmeta_memctx_put(void * threadctx,void * memctx)59 static void asyncmeta_memctx_put(void *threadctx, void *memctx)
60 {
61 	slap_sl_mem_setctx(threadctx, NULL);
62 	slap_sl_mem_destroy((void *)1, memctx);
63 }
64 
asyncmeta_new_bm_context(Operation * op,SlapReply * rs,bm_context_t ** new_bc,int ntargets,a_metainfo_t * mi)65 int asyncmeta_new_bm_context(Operation *op,
66 			     SlapReply *rs,
67 			     bm_context_t **new_bc,
68 			     int ntargets,
69 			     a_metainfo_t	*mi)
70 {
71 	int i;
72 	*new_bc = op->o_tmpcalloc( 1, sizeof( bm_context_t ), op->o_tmpmemctx );
73 
74 	(*new_bc)->op = op;
75 	(*new_bc)->copy_op = *op;
76 	(*new_bc)->candidates = op->o_tmpcalloc(ntargets, sizeof(SlapReply),op->o_tmpmemctx);
77 	(*new_bc)->msgids = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx);
78 	(*new_bc)->nretries = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx);
79 	(*new_bc)->c_peer_name = op->o_conn->c_peer_name;
80 	(*new_bc)->is_root = be_isroot( op );
81 
82 	switch(op->o_tag) {
83 	case LDAP_REQ_COMPARE:
84 		{
85 		AttributeAssertion *ava = op->o_tmpcalloc( 1, sizeof(AttributeAssertion), op->o_tmpmemctx );
86 		*ava = *op->orc_ava;
87 		op->orc_ava = ava;
88 		}
89 		break;
90 	case LDAP_REQ_MODRDN:
91 		if (op->orr_newSup != NULL) {
92 			struct berval *bv = op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );
93 			*bv = *op->orr_newSup;
94 			op->orr_newSup = bv;
95 		}
96 
97 		if (op->orr_nnewSup != NULL) {
98 			struct berval *bv = op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );
99 			*bv = *op->orr_nnewSup;
100 			op->orr_nnewSup = bv;
101 		}
102 		break;
103 	default:
104 		break;
105 	}
106 	for (i = 0; i < ntargets; i++) {
107 		(*new_bc)->msgids[i] = META_MSGID_UNDEFINED;
108 	}
109 	for (i = 0; i < ntargets; i++) {
110 		(*new_bc)->nretries[i] = mi->mi_targets[i]->mt_nretries;
111 	}
112 	return LDAP_SUCCESS;
113 }
114 
asyncmeta_free_op(Operation * op)115 void asyncmeta_free_op(Operation *op)
116 {
117 	assert (op != NULL);
118 	switch (op->o_tag) {
119 	case LDAP_REQ_SEARCH:
120 		break;
121 	case LDAP_REQ_ADD:
122 		if ( op->ora_modlist != NULL ) {
123 			slap_mods_free(op->ora_modlist, 0 );
124 		}
125 
126 		if ( op->ora_e != NULL ) {
127 			entry_free( op->ora_e );
128 		}
129 
130 		break;
131 	case LDAP_REQ_MODIFY:
132 		if ( op->orm_modlist != NULL ) {
133 			slap_mods_free(op->orm_modlist, 1 );
134 		}
135 		break;
136 	case LDAP_REQ_MODRDN:
137 		if ( op->orr_modlist != NULL ) {
138 			slap_mods_free(op->orr_modlist, 1 );
139 		}
140 		break;
141 	case LDAP_REQ_COMPARE:
142 		break;
143 	case LDAP_REQ_DELETE:
144 		break;
145 	default:
146 		Debug( LDAP_DEBUG_TRACE, "==> asyncmeta_free_op : other message type" );
147 	}
148 
149 	connection_op_finish( op );
150 	slap_op_free( op, op->o_threadctx );
151 }
152 
153 
154 
155 
asyncmeta_clear_bm_context(bm_context_t * bc)156 void asyncmeta_clear_bm_context(bm_context_t *bc)
157 {
158 
159 	Operation *op = bc->op;
160 	void *thrctx, *memctx;
161 	int i;
162 
163 	if ( bc->bc_mc && bc->bc_mc->mc_info ) {
164 		for (i = 0; i < bc->bc_mc->mc_info->mi_ntargets; i++) {
165 			if (bc->candidates[ i ].sr_text != NULL) {
166 				ch_free( (char *)bc->candidates[ i ].sr_text );
167 				bc->candidates[ i ].sr_text = NULL;
168 			}
169 		}
170 	}
171 
172 	if (op->o_conn->c_conn_idx == -1)
173 		return;
174 	memctx = op->o_tmpmemctx;
175 	thrctx = op->o_threadctx;
176 	while (op->o_bd == bc->copy_op.o_bd)
177 		ldap_pvt_thread_yield();
178 	asyncmeta_free_op(op);
179 	asyncmeta_memctx_put(thrctx, memctx);
180 }
181 
asyncmeta_add_message_queue(a_metaconn_t * mc,bm_context_t * bc)182 int asyncmeta_add_message_queue(a_metaconn_t *mc, bm_context_t *bc)
183 {
184 	a_metainfo_t *mi = mc->mc_info;
185 	int max_pending_ops = (mi->mi_max_pending_ops == 0) ? META_BACK_CFG_MAX_PENDING_OPS : mi->mi_max_pending_ops;
186 
187 	Debug( LDAP_DEBUG_TRACE, "add_message_queue: mc %p, pending_ops %d, max_pending %d\n",
188 		mc, mc->pending_ops, max_pending_ops );
189 
190 	assert(bc->bc_mc == NULL);
191 	if (mc->pending_ops >= max_pending_ops) {
192 		return LDAP_BUSY;
193 	}
194 	bc->bc_mc = mc;
195 
196 	slap_sl_mem_setctx(bc->op->o_threadctx, NULL);
197 	LDAP_STAILQ_INSERT_TAIL( &mc->mc_om_list, bc, bc_next);
198 	mc->pending_ops++;
199 	return LDAP_SUCCESS;
200 }
201 
202 
203 void
asyncmeta_drop_bc(a_metaconn_t * mc,bm_context_t * bc)204 asyncmeta_drop_bc(a_metaconn_t *mc, bm_context_t *bc)
205 {
206 	bm_context_t *om;
207 	LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
208 		if (om == bc) {
209 			LDAP_STAILQ_REMOVE(&mc->mc_om_list, om, bm_context_t, bc_next);
210 			mc->pending_ops--;
211 			break;
212 		}
213 	}
214 	assert(om == bc);
215 	assert(bc->bc_mc == mc);
216 }
217 
218 
219 bm_context_t *
asyncmeta_find_message(ber_int_t msgid,a_metaconn_t * mc,int candidate)220 asyncmeta_find_message(ber_int_t msgid, a_metaconn_t *mc, int candidate)
221 {
222 	bm_context_t *om;
223 	LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
224 		if (om->candidates[candidate].sr_msgid == msgid && !om->bc_invalid) {
225 			break;
226 		}
227 	}
228 	return om;
229 }
230 
231 bm_context_t *
asyncmeta_bc_in_queue(a_metaconn_t * mc,bm_context_t * bc)232 asyncmeta_bc_in_queue(a_metaconn_t *mc, bm_context_t *bc)
233 {
234 	bm_context_t *om;
235 	LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
236 		if (om == bc) {
237 			return bc;
238 		}
239 	}
240 	return NULL;
241 }
242