1 /* $NetBSD: message_queue.c,v 1.2 2021/08/14 16:14:59 christos Exp $ */ 2 3 /* message_queue.c - routines to maintain the per-connection lists 4 * of pending operations */ 5 /* $OpenLDAP$ */ 6 /* This work is part of OpenLDAP Software <http://www.openldap.org/>. 7 * 8 * Copyright 2016-2021 The OpenLDAP Foundation. 9 * Portions Copyright 2016 Symas Corporation. 10 * All rights reserved. 11 * 12 * Redistribution and use in source and binary forms, with or without 13 * modification, are permitted only as authorized by the OpenLDAP 14 * Public License. 15 * 16 * A copy of this license is available in the file LICENSE in the 17 * top-level directory of the distribution or, alternatively, at 18 * <http://www.OpenLDAP.org/license.html>. 19 */ 20 21 /* ACKNOWLEDGEMENTS: 22 * This work was developed by Symas Corporation 23 * based on back-meta module for inclusion in OpenLDAP Software. 24 * This work was sponsored by Ericsson. */ 25 26 #include <sys/cdefs.h> 27 __RCSID("$NetBSD: message_queue.c,v 1.2 2021/08/14 16:14:59 christos Exp $"); 28 29 #include "portable.h" 30 31 #include <stdio.h> 32 33 #include <ac/socket.h> 34 #include <ac/string.h> 35 #include <ac/time.h> 36 37 #include "lutil.h" 38 #include "slap.h" 39 #include "../back-ldap/back-ldap.h" 40 #include "back-asyncmeta.h" 41 #include "../../../libraries/liblber/lber-int.h" 42 #include "lutil.h" 43 44 45 typedef struct listptr { 46 void *reserved; 47 struct listptr *next; 48 } listptr; 49 50 typedef struct listhead { 51 struct listptr *list; 52 int cnt; 53 } listhead; 54 55 #ifndef LH_MAX 56 #define LH_MAX 16 57 #endif 58 59 static void asyncmeta_memctx_put(void *threadctx, void *memctx) 60 { 61 slap_sl_mem_setctx(threadctx, NULL); 62 slap_sl_mem_destroy((void *)1, memctx); 63 } 64 65 int asyncmeta_new_bm_context(Operation *op, 66 SlapReply *rs, 67 bm_context_t **new_bc, 68 int ntargets, 69 a_metainfo_t *mi) 70 { 71 int i; 72 *new_bc = op->o_tmpcalloc( 1, sizeof( bm_context_t ), op->o_tmpmemctx ); 73 74 (*new_bc)->op = op; 75 (*new_bc)->copy_op = *op; 76 (*new_bc)->candidates = op->o_tmpcalloc(ntargets, sizeof(SlapReply),op->o_tmpmemctx); 77 (*new_bc)->msgids = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx); 78 (*new_bc)->nretries = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx); 79 (*new_bc)->c_peer_name = op->o_conn->c_peer_name; 80 (*new_bc)->is_root = be_isroot( op ); 81 82 switch(op->o_tag) { 83 case LDAP_REQ_COMPARE: 84 { 85 AttributeAssertion *ava = op->o_tmpcalloc( 1, sizeof(AttributeAssertion), op->o_tmpmemctx ); 86 *ava = *op->orc_ava; 87 op->orc_ava = ava; 88 } 89 break; 90 case LDAP_REQ_MODRDN: 91 if (op->orr_newSup != NULL) { 92 struct berval *bv = op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx ); 93 *bv = *op->orr_newSup; 94 op->orr_newSup = bv; 95 } 96 97 if (op->orr_nnewSup != NULL) { 98 struct berval *bv = op->o_tmpalloc( sizeof( struct berval ), op->o_tmpmemctx ); 99 *bv = *op->orr_nnewSup; 100 op->orr_nnewSup = bv; 101 } 102 break; 103 default: 104 break; 105 } 106 for (i = 0; i < ntargets; i++) { 107 (*new_bc)->msgids[i] = META_MSGID_UNDEFINED; 108 } 109 for (i = 0; i < ntargets; i++) { 110 (*new_bc)->nretries[i] = mi->mi_targets[i]->mt_nretries; 111 } 112 return LDAP_SUCCESS; 113 } 114 115 void asyncmeta_free_op(Operation *op) 116 { 117 assert (op != NULL); 118 switch (op->o_tag) { 119 case LDAP_REQ_SEARCH: 120 break; 121 case LDAP_REQ_ADD: 122 if ( op->ora_modlist != NULL ) { 123 slap_mods_free(op->ora_modlist, 0 ); 124 } 125 126 if ( op->ora_e != NULL ) { 127 entry_free( op->ora_e ); 128 } 129 130 break; 131 case LDAP_REQ_MODIFY: 132 if ( op->orm_modlist != NULL ) { 133 slap_mods_free(op->orm_modlist, 1 ); 134 } 135 break; 136 case LDAP_REQ_MODRDN: 137 if ( op->orr_modlist != NULL ) { 138 slap_mods_free(op->orr_modlist, 1 ); 139 } 140 break; 141 case LDAP_REQ_COMPARE: 142 break; 143 case LDAP_REQ_DELETE: 144 break; 145 default: 146 Debug( LDAP_DEBUG_TRACE, "==> asyncmeta_free_op : other message type" ); 147 } 148 149 connection_op_finish( op ); 150 slap_op_free( op, op->o_threadctx ); 151 } 152 153 154 155 156 void asyncmeta_clear_bm_context(bm_context_t *bc) 157 { 158 159 Operation *op = bc->op; 160 void *thrctx, *memctx; 161 int i; 162 163 if ( bc->bc_mc && bc->bc_mc->mc_info ) { 164 for (i = 0; i < bc->bc_mc->mc_info->mi_ntargets; i++) { 165 if (bc->candidates[ i ].sr_text != NULL) { 166 ch_free( (char *)bc->candidates[ i ].sr_text ); 167 bc->candidates[ i ].sr_text = NULL; 168 } 169 } 170 } 171 172 if (op->o_conn->c_conn_idx == -1) 173 return; 174 memctx = op->o_tmpmemctx; 175 thrctx = op->o_threadctx; 176 while (op->o_bd == bc->copy_op.o_bd) 177 ldap_pvt_thread_yield(); 178 asyncmeta_free_op(op); 179 asyncmeta_memctx_put(thrctx, memctx); 180 } 181 182 int asyncmeta_add_message_queue(a_metaconn_t *mc, bm_context_t *bc) 183 { 184 a_metainfo_t *mi = mc->mc_info; 185 int max_pending_ops = (mi->mi_max_pending_ops == 0) ? META_BACK_CFG_MAX_PENDING_OPS : mi->mi_max_pending_ops; 186 187 Debug( LDAP_DEBUG_TRACE, "add_message_queue: mc %p, pending_ops %d, max_pending %d\n", 188 mc, mc->pending_ops, max_pending_ops ); 189 190 assert(bc->bc_mc == NULL); 191 if (mc->pending_ops >= max_pending_ops) { 192 return LDAP_BUSY; 193 } 194 bc->bc_mc = mc; 195 196 slap_sl_mem_setctx(bc->op->o_threadctx, NULL); 197 LDAP_STAILQ_INSERT_TAIL( &mc->mc_om_list, bc, bc_next); 198 mc->pending_ops++; 199 return LDAP_SUCCESS; 200 } 201 202 203 void 204 asyncmeta_drop_bc(a_metaconn_t *mc, bm_context_t *bc) 205 { 206 bm_context_t *om; 207 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) { 208 if (om == bc) { 209 LDAP_STAILQ_REMOVE(&mc->mc_om_list, om, bm_context_t, bc_next); 210 mc->pending_ops--; 211 break; 212 } 213 } 214 assert(om == bc); 215 assert(bc->bc_mc == mc); 216 } 217 218 219 bm_context_t * 220 asyncmeta_find_message(ber_int_t msgid, a_metaconn_t *mc, int candidate) 221 { 222 bm_context_t *om; 223 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) { 224 if (om->candidates[candidate].sr_msgid == msgid && !om->bc_invalid) { 225 break; 226 } 227 } 228 return om; 229 } 230 231 bm_context_t * 232 asyncmeta_bc_in_queue(a_metaconn_t *mc, bm_context_t *bc) 233 { 234 bm_context_t *om; 235 LDAP_STAILQ_FOREACH( om, &mc->mc_om_list, bc_next ) { 236 if (om == bc) { 237 return bc; 238 } 239 } 240 return NULL; 241 } 242