1 /* $NetBSD: message_queue.c,v 1.2 2021/08/14 16:14:59 christos Exp $ */
2
3 /* message_queue.c - routines to maintain the per-connection lists
4 * of pending operations */
5 /* $OpenLDAP$ */
6 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
7 *
8 * Copyright 2016-2021 The OpenLDAP Foundation.
9 * Portions Copyright 2016 Symas Corporation.
10 * All rights reserved.
11 *
12 * Redistribution and use in source and binary forms, with or without
13 * modification, are permitted only as authorized by the OpenLDAP
14 * Public License.
15 *
16 * A copy of this license is available in the file LICENSE in the
17 * top-level directory of the distribution or, alternatively, at
18 * <http://www.OpenLDAP.org/license.html>.
19 */
20
21 /* ACKNOWLEDGEMENTS:
22 * This work was developed by Symas Corporation
23 * based on back-meta module for inclusion in OpenLDAP Software.
24 * This work was sponsored by Ericsson. */
25
26 #include <sys/cdefs.h>
27 __RCSID("$NetBSD: message_queue.c,v 1.2 2021/08/14 16:14:59 christos Exp $");
28
29 #include "portable.h"
30
31 #include <stdio.h>
32
33 #include <ac/socket.h>
34 #include <ac/string.h>
35 #include <ac/time.h>
36
37 #include "lutil.h"
38 #include "slap.h"
39 #include "../back-ldap/back-ldap.h"
40 #include "back-asyncmeta.h"
41 #include "../../../libraries/liblber/lber-int.h"
42 #include "lutil.h"
43
44
45 typedef struct listptr {
46 void *reserved;
47 struct listptr *next;
48 } listptr;
49
50 typedef struct listhead {
51 struct listptr *list;
52 int cnt;
53 } listhead;
54
55 #ifndef LH_MAX
56 #define LH_MAX 16
57 #endif
58
asyncmeta_memctx_put(void * threadctx,void * memctx)59 static void asyncmeta_memctx_put(void *threadctx, void *memctx)
60 {
61 slap_sl_mem_setctx(threadctx, NULL);
62 slap_sl_mem_destroy((void *)1, memctx);
63 }
64
asyncmeta_new_bm_context(Operation * op,SlapReply * rs,bm_context_t ** new_bc,int ntargets,a_metainfo_t * mi)65 int asyncmeta_new_bm_context(Operation *op,
66 SlapReply *rs,
67 bm_context_t **new_bc,
68 int ntargets,
69 a_metainfo_t *mi)
70 {
71 int i;
72 *new_bc = op->o_tmpcalloc( 1, sizeof( bm_context_t ), op->o_tmpmemctx );
73
74 (*new_bc)->op = op;
75 (*new_bc)->copy_op = *op;
76 (*new_bc)->candidates = op->o_tmpcalloc(ntargets, sizeof(SlapReply),op->o_tmpmemctx);
77 (*new_bc)->msgids = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx);
78 (*new_bc)->nretries = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx);
79 (*new_bc)->c_peer_name = op->o_conn->c_peer_name;
80 (*new_bc)->is_root = be_isroot( op );
81
82 switch(op->o_tag) {
83 case LDAP_REQ_COMPARE:
84 {
85 AttributeAssertion *ava = op->o_tmpcalloc( 1, sizeof(AttributeAssertion), op->o_tmpmemctx );
86 *ava = *op->orc_ava;
87 op->orc_ava = ava;
88 }
89 break;
90 case LDAP_REQ_MODRDN:
91 if (op->orr_newSup != NULL) {
92 struct berval *bv = op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );
93 *bv = *op->orr_newSup;
94 op->orr_newSup = bv;
95 }
96
97 if (op->orr_nnewSup != NULL) {
98 struct berval *bv = op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx );
99 *bv = *op->orr_nnewSup;
100 op->orr_nnewSup = bv;
101 }
102 break;
103 default:
104 break;
105 }
106 for (i = 0; i < ntargets; i++) {
107 (*new_bc)->msgids[i] = META_MSGID_UNDEFINED;
108 }
109 for (i = 0; i < ntargets; i++) {
110 (*new_bc)->nretries[i] = mi->mi_targets[i]->mt_nretries;
111 }
112 return LDAP_SUCCESS;
113 }
114
asyncmeta_free_op(Operation * op)115 void asyncmeta_free_op(Operation *op)
116 {
117 assert (op != NULL);
118 switch (op->o_tag) {
119 case LDAP_REQ_SEARCH:
120 break;
121 case LDAP_REQ_ADD:
122 if ( op->ora_modlist != NULL ) {
123 slap_mods_free(op->ora_modlist, 0 );
124 }
125
126 if ( op->ora_e != NULL ) {
127 entry_free( op->ora_e );
128 }
129
130 break;
131 case LDAP_REQ_MODIFY:
132 if ( op->orm_modlist != NULL ) {
133 slap_mods_free(op->orm_modlist, 1 );
134 }
135 break;
136 case LDAP_REQ_MODRDN:
137 if ( op->orr_modlist != NULL ) {
138 slap_mods_free(op->orr_modlist, 1 );
139 }
140 break;
141 case LDAP_REQ_COMPARE:
142 break;
143 case LDAP_REQ_DELETE:
144 break;
145 default:
146 Debug( LDAP_DEBUG_TRACE, "==> asyncmeta_free_op : other message type" );
147 }
148
149 connection_op_finish( op );
150 slap_op_free( op, op->o_threadctx );
151 }
152
153
154
155
asyncmeta_clear_bm_context(bm_context_t * bc)156 void asyncmeta_clear_bm_context(bm_context_t *bc)
157 {
158
159 Operation *op = bc->op;
160 void *thrctx, *memctx;
161 int i;
162
163 if ( bc->bc_mc && bc->bc_mc->mc_info ) {
164 for (i = 0; i < bc->bc_mc->mc_info->mi_ntargets; i++) {
165 if (bc->candidates[ i ].sr_text != NULL) {
166 ch_free( (char *)bc->candidates[ i ].sr_text );
167 bc->candidates[ i ].sr_text = NULL;
168 }
169 }
170 }
171
172 if (op->o_conn->c_conn_idx == -1)
173 return;
174 memctx = op->o_tmpmemctx;
175 thrctx = op->o_threadctx;
176 while (op->o_bd == bc->copy_op.o_bd)
177 ldap_pvt_thread_yield();
178 asyncmeta_free_op(op);
179 asyncmeta_memctx_put(thrctx, memctx);
180 }
181
asyncmeta_add_message_queue(a_metaconn_t * mc,bm_context_t * bc)182 int asyncmeta_add_message_queue(a_metaconn_t *mc, bm_context_t *bc)
183 {
184 a_metainfo_t *mi = mc->mc_info;
185 int max_pending_ops = (mi->mi_max_pending_ops == 0) ? META_BACK_CFG_MAX_PENDING_OPS : mi->mi_max_pending_ops;
186
187 Debug( LDAP_DEBUG_TRACE, "add_message_queue: mc %p, pending_ops %d, max_pending %d\n",
188 mc, mc->pending_ops, max_pending_ops );
189
190 assert(bc->bc_mc == NULL);
191 if (mc->pending_ops >= max_pending_ops) {
192 return LDAP_BUSY;
193 }
194 bc->bc_mc = mc;
195
196 slap_sl_mem_setctx(bc->op->o_threadctx, NULL);
197 LDAP_STAILQ_INSERT_TAIL( &mc->mc_om_list, bc, bc_next);
198 mc->pending_ops++;
199 return LDAP_SUCCESS;
200 }
201
202
203 void
asyncmeta_drop_bc(a_metaconn_t * mc,bm_context_t * bc)204 asyncmeta_drop_bc(a_metaconn_t *mc, bm_context_t *bc)
205 {
206 bm_context_t *om;
207 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
208 if (om == bc) {
209 LDAP_STAILQ_REMOVE(&mc->mc_om_list, om, bm_context_t, bc_next);
210 mc->pending_ops--;
211 break;
212 }
213 }
214 assert(om == bc);
215 assert(bc->bc_mc == mc);
216 }
217
218
219 bm_context_t *
asyncmeta_find_message(ber_int_t msgid,a_metaconn_t * mc,int candidate)220 asyncmeta_find_message(ber_int_t msgid, a_metaconn_t *mc, int candidate)
221 {
222 bm_context_t *om;
223 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
224 if (om->candidates[candidate].sr_msgid == msgid && !om->bc_invalid) {
225 break;
226 }
227 }
228 return om;
229 }
230
231 bm_context_t *
asyncmeta_bc_in_queue(a_metaconn_t * mc,bm_context_t * bc)232 asyncmeta_bc_in_queue(a_metaconn_t *mc, bm_context_t *bc)
233 {
234 bm_context_t *om;
235 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) {
236 if (om == bc) {
237 return bc;
238 }
239 }
240 return NULL;
241 }
242