1 /* $NetBSD: syncprov.c,v 1.3 2021/08/14 16:15:02 christos Exp $ */ 2 3 /* $OpenLDAP$ */ 4 /* syncprov.c - syncrepl provider */ 5 /* This work is part of OpenLDAP Software <http://www.openldap.org/>. 6 * 7 * Copyright 2004-2021 The OpenLDAP Foundation. 8 * All rights reserved. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted only as authorized by the OpenLDAP 12 * Public License. 13 * 14 * A copy of this license is available in the file LICENSE in the 15 * top-level directory of the distribution or, alternatively, at 16 * <http://www.OpenLDAP.org/license.html>. 17 */ 18 /* ACKNOWLEDGEMENTS: 19 * This work was initially developed by Howard Chu for inclusion in 20 * OpenLDAP Software. 21 */ 22 23 #include <sys/cdefs.h> 24 __RCSID("$NetBSD: syncprov.c,v 1.3 2021/08/14 16:15:02 christos Exp $"); 25 26 #include "portable.h" 27 28 #ifdef SLAPD_OVER_SYNCPROV 29 30 #include <ac/string.h> 31 #include "lutil.h" 32 #include "slap.h" 33 #include "slap-config.h" 34 #include "ldap_rq.h" 35 36 #ifdef LDAP_DEVEL 37 #define CHECK_CSN 1 38 #endif 39 40 /* A modify request on a particular entry */ 41 typedef struct modinst { 42 struct modinst *mi_next; 43 Operation *mi_op; 44 } modinst; 45 46 typedef struct modtarget { 47 struct modinst *mt_mods; 48 struct modinst *mt_tail; 49 struct berval mt_dn; 50 ldap_pvt_thread_mutex_t mt_mutex; 51 } modtarget; 52 53 /* All the info of a psearch result that's shared between 54 * multiple queues 55 */ 56 typedef struct resinfo { 57 struct syncres *ri_list; 58 Entry *ri_e; 59 struct berval ri_dn; 60 struct berval ri_ndn; 61 struct berval ri_uuid; 62 struct berval ri_csn; 63 struct berval ri_cookie; 64 char ri_isref; 65 ldap_pvt_thread_mutex_t ri_mutex; 66 } resinfo; 67 68 /* A queued result of a persistent search */ 69 typedef struct syncres { 70 struct syncres *s_next; /* list of results on this psearch queue */ 71 struct syncres *s_rilist; /* list of psearches using this result */ 72 
resinfo *s_info; 73 char s_mode; 74 } syncres; 75 76 /* Record of a persistent search */ 77 typedef struct syncops { 78 struct syncops *s_next; 79 struct syncprov_info_t *s_si; 80 struct berval s_base; /* ndn of search base */ 81 ID s_eid; /* entryID of search base */ 82 Operation *s_op; /* search op */ 83 int s_rid; 84 int s_sid; 85 struct berval s_filterstr; 86 int s_flags; /* search status */ 87 #define PS_IS_REFRESHING 0x01 88 #define PS_IS_DETACHED 0x02 89 #define PS_WROTE_BASE 0x04 90 #define PS_FIND_BASE 0x08 91 #define PS_FIX_FILTER 0x10 92 #define PS_TASK_QUEUED 0x20 93 94 int s_inuse; /* reference count */ 95 struct syncres *s_res; 96 struct syncres *s_restail; 97 void *s_pool_cookie; 98 ldap_pvt_thread_mutex_t s_mutex; 99 } syncops; 100 101 /* A received sync control */ 102 typedef struct sync_control { 103 struct sync_cookie sr_state; 104 int sr_rhint; 105 } sync_control; 106 107 #if 0 /* moved back to slap.h */ 108 #define o_sync o_ctrlflag[slap_cids.sc_LDAPsync] 109 #endif 110 /* o_sync_mode uses data bits of o_sync */ 111 #define o_sync_mode o_ctrlflag[slap_cids.sc_LDAPsync] 112 113 #define SLAP_SYNC_NONE (LDAP_SYNC_NONE<<SLAP_CONTROL_SHIFT) 114 #define SLAP_SYNC_REFRESH (LDAP_SYNC_REFRESH_ONLY<<SLAP_CONTROL_SHIFT) 115 #define SLAP_SYNC_PERSIST (LDAP_SYNC_RESERVED<<SLAP_CONTROL_SHIFT) 116 #define SLAP_SYNC_REFRESH_AND_PERSIST (LDAP_SYNC_REFRESH_AND_PERSIST<<SLAP_CONTROL_SHIFT) 117 118 /* Record of which searches matched at premodify step */ 119 typedef struct syncmatches { 120 struct syncmatches *sm_next; 121 syncops *sm_op; 122 } syncmatches; 123 124 /* Session log data */ 125 typedef struct slog_entry { 126 struct berval se_uuid; 127 struct berval se_csn; 128 int se_sid; 129 ber_tag_t se_tag; 130 } slog_entry; 131 132 typedef struct sessionlog { 133 BerVarray sl_mincsn; 134 int *sl_sids; 135 int sl_numcsns; 136 int sl_num; 137 int sl_size; 138 int sl_playing; 139 TAvlnode *sl_entries; 140 ldap_pvt_thread_rdwr_t sl_mutex; 141 } sessionlog; 142 143 
/* Accesslog callback data */ 144 typedef struct syncprov_accesslog_deletes { 145 Operation *op; 146 SlapReply *rs; 147 sync_control *srs; 148 BerVarray ctxcsn; 149 int numcsns, *sids; 150 Avlnode *uuids; 151 BerVarray uuid_list; 152 int ndel, list_len; 153 char *uuid_buf; 154 } syncprov_accesslog_deletes; 155 156 /* The main state for this overlay */ 157 typedef struct syncprov_info_t { 158 syncops *si_ops; 159 struct berval si_contextdn; 160 struct berval si_logbase; 161 BerVarray si_ctxcsn; /* ldapsync context */ 162 int *si_sids; 163 int si_numcsns; 164 int si_chkops; /* checkpointing info */ 165 int si_chktime; 166 int si_numops; /* number of ops since last checkpoint */ 167 int si_nopres; /* Skip present phase */ 168 int si_usehint; /* use reload hint */ 169 int si_active; /* True if there are active mods */ 170 int si_dirty; /* True if the context is dirty, i.e changes 171 * have been made without updating the csn. */ 172 time_t si_chklast; /* time of last checkpoint */ 173 Avlnode *si_mods; /* entries being modified */ 174 sessionlog *si_logs; 175 ldap_pvt_thread_rdwr_t si_csn_rwlock; 176 ldap_pvt_thread_mutex_t si_ops_mutex; 177 ldap_pvt_thread_mutex_t si_mods_mutex; 178 ldap_pvt_thread_mutex_t si_resp_mutex; 179 } syncprov_info_t; 180 181 typedef struct opcookie { 182 slap_overinst *son; 183 syncmatches *smatches; 184 modtarget *smt; 185 Entry *se; 186 struct berval sdn; /* DN of entry, for deletes */ 187 struct berval sndn; 188 struct berval suuid; /* UUID of entry */ 189 struct berval sctxcsn; 190 short osid; /* sid of op csn */ 191 short rsid; /* sid of relay */ 192 short sreference; /* Is the entry a reference? 
*/ 193 syncres ssres; 194 } opcookie; 195 196 typedef struct fbase_cookie { 197 struct berval *fdn; /* DN of a modified entry, for scope testing */ 198 syncops *fss; /* persistent search we're testing against */ 199 int fbase; /* if TRUE we found the search base and it's still valid */ 200 int fscope; /* if TRUE then fdn is within the psearch scope */ 201 } fbase_cookie; 202 203 static AttributeName csn_anlist[3]; 204 static AttributeName uuid_anlist[2]; 205 206 static AttributeDescription *ad_reqType, *ad_reqResult, *ad_reqDN, 207 *ad_reqEntryUUID, *ad_minCSN, *ad_reqNewDN; 208 209 /* Build a LDAPsync intermediate state control */ 210 static int 211 syncprov_state_ctrl( 212 Operation *op, 213 SlapReply *rs, 214 Entry *e, 215 int entry_sync_state, 216 LDAPControl **ctrls, 217 int num_ctrls, 218 int send_cookie, 219 struct berval *cookie ) 220 { 221 Attribute* a; 222 int ret; 223 224 BerElementBuffer berbuf; 225 BerElement *ber = (BerElement *)&berbuf; 226 LDAPControl *cp; 227 struct berval bv; 228 struct berval entryuuid_bv = BER_BVNULL; 229 230 ber_init2( ber, 0, LBER_USE_DER ); 231 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 232 233 for ( a = e->e_attrs; a != NULL; a = a->a_next ) { 234 AttributeDescription *desc = a->a_desc; 235 if ( desc == slap_schema.si_ad_entryUUID ) { 236 entryuuid_bv = a->a_nvals[0]; 237 break; 238 } 239 } 240 241 /* FIXME: what if entryuuid is NULL or empty ? 
*/ 242 243 if ( send_cookie && cookie ) { 244 ber_printf( ber, "{eOON}", 245 entry_sync_state, &entryuuid_bv, cookie ); 246 } else { 247 ber_printf( ber, "{eON}", 248 entry_sync_state, &entryuuid_bv ); 249 } 250 251 ret = ber_flatten2( ber, &bv, 0 ); 252 if ( ret == 0 ) { 253 cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx ); 254 cp->ldctl_oid = LDAP_CONTROL_SYNC_STATE; 255 cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 256 cp->ldctl_value.bv_val = (char *)&cp[1]; 257 cp->ldctl_value.bv_len = bv.bv_len; 258 AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len ); 259 ctrls[num_ctrls] = cp; 260 } 261 ber_free_buf( ber ); 262 263 if ( ret < 0 ) { 264 Debug( LDAP_DEBUG_TRACE, 265 "slap_build_sync_ctrl: ber_flatten2 failed (%d)\n", 266 ret ); 267 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 268 return LDAP_OTHER; 269 } 270 271 return LDAP_SUCCESS; 272 } 273 274 /* Build a LDAPsync final state control */ 275 static int 276 syncprov_done_ctrl( 277 Operation *op, 278 SlapReply *rs, 279 LDAPControl **ctrls, 280 int num_ctrls, 281 int send_cookie, 282 struct berval *cookie, 283 int refreshDeletes ) 284 { 285 int ret; 286 BerElementBuffer berbuf; 287 BerElement *ber = (BerElement *)&berbuf; 288 LDAPControl *cp; 289 struct berval bv; 290 291 ber_init2( ber, NULL, LBER_USE_DER ); 292 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 293 294 ber_printf( ber, "{" ); 295 if ( send_cookie && cookie ) { 296 ber_printf( ber, "O", cookie ); 297 } 298 if ( refreshDeletes == LDAP_SYNC_REFRESH_DELETES ) { 299 ber_printf( ber, "b", refreshDeletes ); 300 } 301 ber_printf( ber, "N}" ); 302 303 ret = ber_flatten2( ber, &bv, 0 ); 304 if ( ret == 0 ) { 305 cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx ); 306 cp->ldctl_oid = LDAP_CONTROL_SYNC_DONE; 307 cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 308 cp->ldctl_value.bv_val = (char *)&cp[1]; 309 cp->ldctl_value.bv_len = bv.bv_len; 310 
AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len ); 311 ctrls[num_ctrls] = cp; 312 } 313 314 ber_free_buf( ber ); 315 316 if ( ret < 0 ) { 317 Debug( LDAP_DEBUG_TRACE, 318 "syncprov_done_ctrl: ber_flatten2 failed (%d)\n", 319 ret ); 320 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 321 return LDAP_OTHER; 322 } 323 324 return LDAP_SUCCESS; 325 } 326 327 static int 328 syncprov_sendinfo( 329 Operation *op, 330 SlapReply *rs, 331 int type, 332 struct berval *cookie, 333 int refreshDone, 334 BerVarray syncUUIDs, 335 int refreshDeletes ) 336 { 337 BerElementBuffer berbuf; 338 BerElement *ber = (BerElement *)&berbuf; 339 struct berval rspdata; 340 341 int ret; 342 343 ber_init2( ber, NULL, LBER_USE_DER ); 344 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 345 346 if ( type ) { 347 switch ( type ) { 348 case LDAP_TAG_SYNC_NEW_COOKIE: 349 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: " 350 "sending a new cookie=%s\n", 351 op->o_log_prefix, cookie->bv_val ); 352 ber_printf( ber, "tO", type, cookie ); 353 break; 354 case LDAP_TAG_SYNC_REFRESH_DELETE: 355 case LDAP_TAG_SYNC_REFRESH_PRESENT: 356 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: " 357 "%s cookie=%s\n", 358 op->o_log_prefix, 359 type == LDAP_TAG_SYNC_REFRESH_DELETE ? "refreshDelete" : "refreshPresent", 360 cookie ? cookie->bv_val : "" ); 361 ber_printf( ber, "t{", type ); 362 if ( cookie ) { 363 ber_printf( ber, "O", cookie ); 364 } 365 if ( refreshDone == 0 ) { 366 ber_printf( ber, "b", refreshDone ); 367 } 368 ber_printf( ber, "N}" ); 369 break; 370 case LDAP_TAG_SYNC_ID_SET: 371 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: " 372 "%s syncIdSet cookie=%s\n", 373 op->o_log_prefix, refreshDeletes ? "delete" : "present", 374 cookie ? 
cookie->bv_val : "" ); 375 ber_printf( ber, "t{", type ); 376 if ( cookie ) { 377 ber_printf( ber, "O", cookie ); 378 } 379 if ( refreshDeletes == 1 ) { 380 ber_printf( ber, "b", refreshDeletes ); 381 } 382 ber_printf( ber, "[W]", syncUUIDs ); 383 ber_printf( ber, "N}" ); 384 break; 385 default: 386 Debug( LDAP_DEBUG_TRACE, 387 "%s syncprov_sendinfo: invalid syncinfo type (%d)\n", 388 op->o_log_prefix, type ); 389 return LDAP_OTHER; 390 } 391 } 392 393 ret = ber_flatten2( ber, &rspdata, 0 ); 394 395 if ( ret < 0 ) { 396 Debug( LDAP_DEBUG_TRACE, 397 "syncprov_sendinfo: ber_flatten2 failed (%d)\n", 398 ret ); 399 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 400 return LDAP_OTHER; 401 } 402 403 rs->sr_rspoid = LDAP_SYNC_INFO; 404 rs->sr_rspdata = &rspdata; 405 send_ldap_intermediate( op, rs ); 406 rs->sr_rspdata = NULL; 407 ber_free_buf( ber ); 408 409 return LDAP_SUCCESS; 410 } 411 412 /* Find a modtarget in an AVL tree */ 413 static int 414 sp_avl_cmp( const void *c1, const void *c2 ) 415 { 416 const modtarget *m1, *m2; 417 int rc; 418 419 m1 = c1; m2 = c2; 420 rc = m1->mt_dn.bv_len - m2->mt_dn.bv_len; 421 422 if ( rc ) return rc; 423 return ber_bvcmp( &m1->mt_dn, &m2->mt_dn ); 424 } 425 426 static int 427 sp_uuid_cmp( const void *l, const void *r ) 428 { 429 const struct berval *left = l, *right = r; 430 431 return ber_bvcmp( left, right ); 432 } 433 434 static int 435 syncprov_sessionlog_cmp( const void *l, const void *r ) 436 { 437 const slog_entry *left = l, *right = r; 438 int ret = ber_bvcmp( &left->se_csn, &right->se_csn ); 439 if ( !ret ) 440 ret = ber_bvcmp( &left->se_uuid, &right->se_uuid ); 441 /* Only time we have two modifications with same CSN is when we detect a 442 * rename during replication. 443 * We invert the test here because LDAP_REQ_MODDN is 444 * numerically greater than LDAP_REQ_MODIFY but we 445 * want it to occur first. 
446 */ 447 if ( !ret ) 448 ret = right->se_tag - left->se_tag; 449 450 return ret; 451 } 452 453 /* syncprov_findbase: 454 * finds the true DN of the base of a search (with alias dereferencing) and 455 * checks to make sure the base entry doesn't get replaced with a different 456 * entry (e.g., swapping trees via ModDN, or retargeting an alias). If a 457 * change is detected, any persistent search on this base must be terminated / 458 * reloaded. 459 * On the first call, we just save the DN and entryID. On subsequent calls 460 * we compare the DN and entryID with the saved values. 461 */ 462 static int 463 findbase_cb( Operation *op, SlapReply *rs ) 464 { 465 slap_callback *sc = op->o_callback; 466 467 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 468 fbase_cookie *fc = sc->sc_private; 469 470 /* If no entryID, we're looking for the first time. 471 * Just store whatever we got. 472 */ 473 if ( fc->fss->s_eid == NOID ) { 474 fc->fbase = 2; 475 fc->fss->s_eid = rs->sr_entry->e_id; 476 ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname ); 477 478 } else if ( rs->sr_entry->e_id == fc->fss->s_eid && 479 dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) { 480 481 /* OK, the DN is the same and the entryID is the same. */ 482 fc->fbase = 1; 483 } 484 } 485 if ( rs->sr_err != LDAP_SUCCESS ) { 486 Debug( LDAP_DEBUG_ANY, "findbase failed! 
%d\n", rs->sr_err ); 487 } 488 return LDAP_SUCCESS; 489 } 490 491 static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL }; 492 static struct berval generic_filterstr = BER_BVC("(objectclass=*)"); 493 494 static int 495 syncprov_findbase( Operation *op, fbase_cookie *fc ) 496 { 497 /* Use basic parameters from syncrepl search, but use 498 * current op's threadctx / tmpmemctx 499 */ 500 ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex ); 501 if ( fc->fss->s_flags & PS_FIND_BASE ) { 502 slap_callback cb = {0}; 503 Operation fop; 504 SlapReply frs = { REP_RESULT }; 505 int rc; 506 507 fc->fss->s_flags ^= PS_FIND_BASE; 508 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 509 510 fop = *fc->fss->s_op; 511 512 fop.o_bd = fop.o_bd->bd_self; 513 fop.o_hdr = op->o_hdr; 514 fop.o_time = op->o_time; 515 fop.o_tincr = op->o_tincr; 516 fop.o_extra = op->o_extra; 517 518 cb.sc_response = findbase_cb; 519 cb.sc_private = fc; 520 521 fop.o_sync_mode = 0; /* turn off sync mode */ 522 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 523 fop.o_callback = &cb; 524 fop.o_tag = LDAP_REQ_SEARCH; 525 fop.ors_scope = LDAP_SCOPE_BASE; 526 fop.ors_limit = NULL; 527 fop.ors_slimit = 1; 528 fop.ors_tlimit = SLAP_NO_LIMIT; 529 fop.ors_attrs = slap_anlist_no_attrs; 530 fop.ors_attrsonly = 1; 531 fop.ors_filter = &generic_filter; 532 fop.ors_filterstr = generic_filterstr; 533 534 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findbase: searching\n", op->o_log_prefix ); 535 rc = fop.o_bd->be_search( &fop, &frs ); 536 } else { 537 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 538 fc->fbase = 1; 539 } 540 541 /* After the first call, see if the fdn resides in the scope */ 542 if ( fc->fbase == 1 ) { 543 switch ( fc->fss->s_op->ors_scope ) { 544 case LDAP_SCOPE_BASE: 545 fc->fscope = dn_match( fc->fdn, &fc->fss->s_base ); 546 break; 547 case LDAP_SCOPE_ONELEVEL: { 548 struct berval pdn; 549 dnParent( fc->fdn, &pdn ); 550 fc->fscope = dn_match( &pdn, &fc->fss->s_base ); 551 break; } 552 case 
LDAP_SCOPE_SUBTREE: 553 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ); 554 break; 555 case LDAP_SCOPE_SUBORDINATE: 556 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) && 557 !dn_match( fc->fdn, &fc->fss->s_base ); 558 break; 559 } 560 } 561 562 if ( fc->fbase ) 563 return LDAP_SUCCESS; 564 565 /* If entryID has changed, then the base of this search has 566 * changed. Invalidate the psearch. 567 */ 568 return LDAP_NO_SUCH_OBJECT; 569 } 570 571 /* syncprov_findcsn: 572 * This function has three different purposes, but they all use a search 573 * that filters on entryCSN so they're combined here. 574 * 1: at startup time, after a contextCSN has been read from the database, 575 * we search for all entries with CSN >= contextCSN in case the contextCSN 576 * was not checkpointed at the previous shutdown. 577 * 578 * 2: when the current contextCSN is known and we have a sync cookie, we search 579 * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN. 580 * If an entry is found, the cookie CSN is valid, otherwise it is stale. 581 * 582 * 3: during a refresh phase, we search for all entries with CSN <= the cookie 583 * CSN, and generate Present records for them. We always collect this result 584 * in SyncID sets, even if there's only one match. 
585 */ 586 typedef enum find_csn_t { 587 FIND_MAXCSN = 1, 588 FIND_CSN = 2, 589 FIND_PRESENT = 3 590 } find_csn_t; 591 592 static int 593 findmax_cb( Operation *op, SlapReply *rs ) 594 { 595 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 596 struct berval *maxcsn = op->o_callback->sc_private; 597 Attribute *a = attr_find( rs->sr_entry->e_attrs, 598 slap_schema.si_ad_entryCSN ); 599 600 if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 && 601 slap_parse_csn_sid( &a->a_vals[0] ) == slap_serverID ) { 602 maxcsn->bv_len = a->a_vals[0].bv_len; 603 strcpy( maxcsn->bv_val, a->a_vals[0].bv_val ); 604 } 605 } 606 return LDAP_SUCCESS; 607 } 608 609 static int 610 findcsn_cb( Operation *op, SlapReply *rs ) 611 { 612 slap_callback *sc = op->o_callback; 613 614 /* We just want to know that at least one exists, so it's OK if 615 * we exceed the unchecked limit. 616 */ 617 if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED || 618 (rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) { 619 sc->sc_private = (void *)1; 620 } 621 return LDAP_SUCCESS; 622 } 623 624 /* Build a list of entryUUIDs for sending in a SyncID set */ 625 626 #define UUID_LEN 16 627 628 typedef struct fpres_cookie { 629 int num; 630 BerVarray uuids; 631 char *last; 632 } fpres_cookie; 633 634 static int 635 findpres_cb( Operation *op, SlapReply *rs ) 636 { 637 slap_callback *sc = op->o_callback; 638 fpres_cookie *pc = sc->sc_private; 639 Attribute *a; 640 int ret = SLAP_CB_CONTINUE; 641 642 switch ( rs->sr_type ) { 643 case REP_SEARCH: 644 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID ); 645 if ( a ) { 646 pc->uuids[pc->num].bv_val = pc->last; 647 AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val, 648 pc->uuids[pc->num].bv_len ); 649 pc->num++; 650 pc->last = pc->uuids[pc->num].bv_val; 651 pc->uuids[pc->num].bv_val = NULL; 652 } 653 ret = LDAP_SUCCESS; 654 if ( pc->num != SLAP_SYNCUUID_SET_SIZE ) 655 break; 656 /* FALLTHRU */ 657 case REP_RESULT: 658 ret = 
rs->sr_err; 659 if ( pc->num ) { 660 ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 661 0, pc->uuids, 0 ); 662 pc->uuids[pc->num].bv_val = pc->last; 663 pc->num = 0; 664 pc->last = pc->uuids[0].bv_val; 665 } 666 break; 667 default: 668 break; 669 } 670 return ret; 671 } 672 673 static int 674 syncprov_findcsn( Operation *op, find_csn_t mode, struct berval *csn ) 675 { 676 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 677 syncprov_info_t *si = on->on_bi.bi_private; 678 679 slap_callback cb = {0}; 680 Operation fop; 681 SlapReply frs = { REP_RESULT }; 682 char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; 683 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; 684 struct berval maxcsn; 685 Filter cf; 686 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 687 fpres_cookie pcookie; 688 sync_control *srs = NULL; 689 struct slap_limits_set fc_limits; 690 int i, rc = LDAP_SUCCESS, findcsn_retry = 1; 691 int maxid; 692 693 if ( mode != FIND_MAXCSN ) { 694 srs = op->o_controls[slap_cids.sc_LDAPsync]; 695 } 696 697 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findcsn: mode=%s csn=%s\n", 698 op->o_log_prefix, 699 mode == FIND_MAXCSN ? 700 "FIND_MAXCSN" : 701 mode == FIND_CSN ? 702 "FIND_CSN" : 703 "FIND_PRESENT", 704 csn ? 
csn->bv_val : "" ); 705 706 fop = *op; 707 fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */ 708 /* We want pure entries, not referrals */ 709 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 710 711 cf.f_ava = &eq; 712 cf.f_av_desc = slap_schema.si_ad_entryCSN; 713 BER_BVZERO( &cf.f_av_value ); 714 cf.f_next = NULL; 715 716 fop.o_callback = &cb; 717 fop.ors_limit = NULL; 718 fop.ors_tlimit = SLAP_NO_LIMIT; 719 fop.ors_filter = &cf; 720 fop.ors_filterstr.bv_val = buf; 721 722 again: 723 switch( mode ) { 724 case FIND_MAXCSN: 725 cf.f_choice = LDAP_FILTER_GE; 726 /* If there are multiple CSNs, use the one with our serverID */ 727 for ( i=0; i<si->si_numcsns; i++) { 728 if ( slap_serverID == si->si_sids[i] ) { 729 maxid = i; 730 break; 731 } 732 } 733 if ( i == si->si_numcsns ) { 734 /* No match: this is multimaster, and none of the content in the DB 735 * originated locally. Treat like no CSN. 736 */ 737 return LDAP_NO_SUCH_OBJECT; 738 } 739 cf.f_av_value = si->si_ctxcsn[maxid]; 740 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 741 "(entryCSN>=%s)", cf.f_av_value.bv_val ); 742 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 743 return LDAP_OTHER; 744 } 745 fop.ors_attrsonly = 0; 746 fop.ors_attrs = csn_anlist; 747 fop.ors_slimit = SLAP_NO_LIMIT; 748 cb.sc_private = &maxcsn; 749 cb.sc_response = findmax_cb; 750 strcpy( cbuf, cf.f_av_value.bv_val ); 751 maxcsn.bv_val = cbuf; 752 maxcsn.bv_len = cf.f_av_value.bv_len; 753 break; 754 case FIND_CSN: 755 if ( BER_BVISEMPTY( &cf.f_av_value )) { 756 cf.f_av_value = *csn; 757 } 758 fop.o_dn = op->o_bd->be_rootdn; 759 fop.o_ndn = op->o_bd->be_rootndn; 760 fop.o_req_dn = op->o_bd->be_suffix[0]; 761 fop.o_req_ndn = op->o_bd->be_nsuffix[0]; 762 /* Look for exact match the first time */ 763 if ( findcsn_retry ) { 764 cf.f_choice = LDAP_FILTER_EQUALITY; 765 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 766 "(entryCSN=%s)", cf.f_av_value.bv_val ); 767 /* On retry, look for <= */ 768 } else { 769 
cf.f_choice = LDAP_FILTER_LE; 770 fop.ors_limit = &fc_limits; 771 memset( &fc_limits, 0, sizeof( fc_limits )); 772 fc_limits.lms_s_unchecked = 1; 773 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 774 "(entryCSN<=%s)", cf.f_av_value.bv_val ); 775 } 776 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 777 return LDAP_OTHER; 778 } 779 fop.ors_attrsonly = 1; 780 fop.ors_attrs = slap_anlist_no_attrs; 781 fop.ors_slimit = 1; 782 cb.sc_private = NULL; 783 cb.sc_response = findcsn_cb; 784 break; 785 case FIND_PRESENT: 786 fop.ors_filter = op->ors_filter; 787 fop.ors_filterstr = op->ors_filterstr; 788 fop.ors_attrsonly = 0; 789 fop.ors_attrs = uuid_anlist; 790 fop.ors_slimit = SLAP_NO_LIMIT; 791 cb.sc_private = &pcookie; 792 cb.sc_response = findpres_cb; 793 pcookie.num = 0; 794 795 /* preallocate storage for a full set */ 796 pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) * 797 sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN, 798 op->o_tmpmemctx ); 799 pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1); 800 pcookie.uuids[0].bv_val = pcookie.last; 801 pcookie.uuids[0].bv_len = UUID_LEN; 802 for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) { 803 pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN; 804 pcookie.uuids[i].bv_len = UUID_LEN; 805 } 806 break; 807 } 808 809 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 810 fop.o_bd->be_search( &fop, &frs ); 811 fop.o_bd->bd_info = (BackendInfo *)on; 812 813 switch( mode ) { 814 case FIND_MAXCSN: 815 if ( ber_bvcmp( &si->si_ctxcsn[maxid], &maxcsn )) { 816 #ifdef CHECK_CSN 817 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 818 assert( !syn->ssyn_validate( syn, &maxcsn )); 819 #endif 820 ber_bvreplace( &si->si_ctxcsn[maxid], &maxcsn ); 821 si->si_numops++; /* ensure a checkpoint */ 822 } 823 break; 824 case FIND_CSN: 825 /* If matching CSN was not found, invalidate the context. 
*/ 826 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findcsn: csn%s=%s %sfound\n", 827 op->o_log_prefix, 828 cf.f_choice == LDAP_FILTER_EQUALITY ? "=" : "<", 829 cf.f_av_value.bv_val, cb.sc_private ? "" : "not " ); 830 if ( !cb.sc_private ) { 831 /* If we didn't find an exact match, then try for <= */ 832 if ( findcsn_retry ) { 833 findcsn_retry = 0; 834 rs_reinit( &frs, REP_RESULT ); 835 goto again; 836 } 837 rc = LDAP_NO_SUCH_OBJECT; 838 } 839 break; 840 case FIND_PRESENT: 841 op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx ); 842 break; 843 } 844 845 return rc; 846 } 847 848 static void free_resinfo( syncres *sr ) 849 { 850 syncres **st; 851 int freeit = 0; 852 ldap_pvt_thread_mutex_lock( &sr->s_info->ri_mutex ); 853 for (st = &sr->s_info->ri_list; *st; st = &(*st)->s_rilist) { 854 if (*st == sr) { 855 *st = sr->s_rilist; 856 break; 857 } 858 } 859 if ( !sr->s_info->ri_list ) 860 freeit = 1; 861 ldap_pvt_thread_mutex_unlock( &sr->s_info->ri_mutex ); 862 if ( freeit ) { 863 ldap_pvt_thread_mutex_destroy( &sr->s_info->ri_mutex ); 864 if ( sr->s_info->ri_e ) 865 entry_free( sr->s_info->ri_e ); 866 if ( !BER_BVISNULL( &sr->s_info->ri_cookie )) 867 ch_free( sr->s_info->ri_cookie.bv_val ); 868 ch_free( sr->s_info ); 869 } 870 } 871 872 #define FS_UNLINK 1 873 #define FS_LOCK 2 874 875 static int 876 syncprov_free_syncop( syncops *so, int flags ) 877 { 878 syncres *sr, *srnext; 879 GroupAssertion *ga, *gnext; 880 881 if ( flags & FS_LOCK ) 882 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 883 /* already being freed, or still in use */ 884 if ( !so->s_inuse || --so->s_inuse > 0 ) { 885 if ( flags & FS_LOCK ) 886 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 887 return 0; 888 } 889 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 890 if (( flags & FS_UNLINK ) && so->s_si ) { 891 syncops **sop; 892 ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex ); 893 for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) { 894 if ( *sop == so ) { 895 *sop = so->s_next; 896 break; 897 } 898 } 
899 ldap_pvt_thread_mutex_unlock( &so->s_si->si_ops_mutex ); 900 } 901 if ( so->s_flags & PS_IS_DETACHED ) { 902 filter_free( so->s_op->ors_filter ); 903 for ( ga = so->s_op->o_groups; ga; ga=gnext ) { 904 gnext = ga->ga_next; 905 ch_free( ga ); 906 } 907 ch_free( so->s_op ); 908 } 909 ch_free( so->s_base.bv_val ); 910 for ( sr=so->s_res; sr; sr=srnext ) { 911 srnext = sr->s_next; 912 free_resinfo( sr ); 913 ch_free( sr ); 914 } 915 ldap_pvt_thread_mutex_destroy( &so->s_mutex ); 916 ch_free( so ); 917 return 1; 918 } 919 920 /* Send a persistent search response */ 921 static int 922 syncprov_sendresp( Operation *op, resinfo *ri, syncops *so, int mode ) 923 { 924 SlapReply rs = { REP_SEARCH }; 925 struct berval cookie, csns[2]; 926 Entry e_uuid = {0}; 927 Attribute a_uuid = {0}; 928 929 if ( so->s_op->o_abandon ) 930 return SLAPD_ABANDON; 931 932 rs.sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, op->o_tmpmemctx ); 933 rs.sr_ctrls[1] = NULL; 934 rs.sr_flags = REP_CTRLS_MUSTBEFREED; 935 csns[0] = ri->ri_csn; 936 BER_BVZERO( &csns[1] ); 937 slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, 938 slap_serverID ? 
slap_serverID : -1, NULL ); 939 940 #ifdef LDAP_DEBUG 941 if ( so->s_sid > 0 ) { 942 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: to=%03x, cookie=%s\n", 943 op->o_log_prefix, so->s_sid, cookie.bv_val ); 944 } else { 945 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: cookie=%s\n", 946 op->o_log_prefix, cookie.bv_val ); 947 } 948 #endif 949 950 e_uuid.e_attrs = &a_uuid; 951 a_uuid.a_desc = slap_schema.si_ad_entryUUID; 952 a_uuid.a_nvals = &ri->ri_uuid; 953 rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, 954 mode, rs.sr_ctrls, 0, 1, &cookie ); 955 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 956 957 rs.sr_entry = &e_uuid; 958 if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) { 959 e_uuid = *ri->ri_e; 960 e_uuid.e_private = NULL; 961 } 962 963 switch( mode ) { 964 case LDAP_SYNC_ADD: 965 if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 966 rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); 967 rs.sr_err = send_search_reference( op, &rs ); 968 ber_bvarray_free( rs.sr_ref ); 969 break; 970 } 971 /* fallthru */ 972 case LDAP_SYNC_MODIFY: 973 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: sending %s, dn=%s\n", 974 op->o_log_prefix, 975 mode == LDAP_SYNC_ADD ? 
"LDAP_SYNC_ADD" : "LDAP_SYNC_MODIFY", 976 e_uuid.e_nname.bv_val ); 977 rs.sr_attrs = op->ors_attrs; 978 rs.sr_err = send_search_entry( op, &rs ); 979 break; 980 case LDAP_SYNC_DELETE: 981 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: " 982 "sending LDAP_SYNC_DELETE, dn=%s\n", 983 op->o_log_prefix, ri->ri_dn.bv_val ); 984 e_uuid.e_attrs = NULL; 985 e_uuid.e_name = ri->ri_dn; 986 e_uuid.e_nname = ri->ri_ndn; 987 if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 988 struct berval bv = BER_BVNULL; 989 rs.sr_ref = &bv; 990 rs.sr_err = send_search_reference( op, &rs ); 991 } else { 992 rs.sr_err = send_search_entry( op, &rs ); 993 } 994 break; 995 default: 996 assert(0); 997 } 998 return rs.sr_err; 999 } 1000 1001 static void 1002 syncprov_qstart( syncops *so ); 1003 1004 /* Play back queued responses */ 1005 static int 1006 syncprov_qplay( Operation *op, syncops *so ) 1007 { 1008 syncres *sr; 1009 int rc = 0; 1010 1011 do { 1012 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1013 sr = so->s_res; 1014 /* Exit loop with mutex held */ 1015 if ( !sr ) 1016 break; 1017 so->s_res = sr->s_next; 1018 if ( !so->s_res ) 1019 so->s_restail = NULL; 1020 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1021 1022 if ( !so->s_op->o_abandon ) { 1023 1024 if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) { 1025 SlapReply rs = { REP_INTERMEDIATE }; 1026 1027 rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE, 1028 &sr->s_info->ri_cookie, 0, NULL, 0 ); 1029 } else { 1030 rc = syncprov_sendresp( op, sr->s_info, so, sr->s_mode ); 1031 } 1032 } 1033 1034 free_resinfo( sr ); 1035 ch_free( sr ); 1036 1037 if ( so->s_op->o_abandon ) 1038 continue; 1039 1040 /* Exit loop with mutex held */ 1041 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1042 break; 1043 1044 } while (1); 1045 1046 /* We now only send one change at a time, to prevent one 1047 * psearch from hogging all the CPU. Resubmit this task if 1048 * there are more responses queued and no errors occurred. 
1049 */ 1050 1051 if ( rc == 0 && so->s_res ) { 1052 syncprov_qstart( so ); 1053 } 1054 1055 return rc; 1056 } 1057 1058 /* task for playing back queued responses */ 1059 static void * 1060 syncprov_qtask( void *ctx, void *arg ) 1061 { 1062 syncops *so = arg; 1063 OperationBuffer opbuf; 1064 Operation *op; 1065 BackendDB be; 1066 int rc; 1067 1068 op = &opbuf.ob_op; 1069 *op = *so->s_op; 1070 op->o_hdr = &opbuf.ob_hdr; 1071 op->o_controls = opbuf.ob_controls; 1072 memset( op->o_controls, 0, sizeof(opbuf.ob_controls) ); 1073 op->o_sync = SLAP_CONTROL_IGNORED; 1074 1075 *op->o_hdr = *so->s_op->o_hdr; 1076 1077 op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, 1); 1078 op->o_tmpmfuncs = &slap_sl_mfuncs; 1079 op->o_threadctx = ctx; 1080 1081 /* syncprov_qplay expects a fake db */ 1082 be = *so->s_op->o_bd; 1083 be.be_flags |= SLAP_DBFLAG_OVERLAY; 1084 op->o_bd = &be; 1085 LDAP_SLIST_FIRST(&op->o_extra) = NULL; 1086 op->o_callback = NULL; 1087 1088 rc = syncprov_qplay( op, so ); 1089 1090 /* if an error occurred, or no responses left, task is no longer queued */ 1091 if ( !rc && !so->s_res ) 1092 rc = 1; 1093 1094 /* decrement use count... 
*/ 1095 if ( !syncprov_free_syncop( so, FS_UNLINK )) { 1096 if ( rc ) 1097 /* if we didn't unlink, and task is no longer queued, clear flag */ 1098 so->s_flags ^= PS_TASK_QUEUED; 1099 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1100 } 1101 1102 return NULL; 1103 } 1104 1105 /* Start the task to play back queued psearch responses */ 1106 static void 1107 syncprov_qstart( syncops *so ) 1108 { 1109 so->s_flags |= PS_TASK_QUEUED; 1110 so->s_inuse++; 1111 ldap_pvt_thread_pool_submit2( &connection_pool, 1112 syncprov_qtask, so, &so->s_pool_cookie ); 1113 } 1114 1115 /* Queue a persistent search response */ 1116 static int 1117 syncprov_qresp( opcookie *opc, syncops *so, int mode ) 1118 { 1119 syncres *sr; 1120 resinfo *ri; 1121 int srsize; 1122 struct berval csn = opc->sctxcsn; 1123 1124 sr = ch_malloc( sizeof( syncres )); 1125 sr->s_next = NULL; 1126 sr->s_mode = mode; 1127 if ( !opc->ssres.s_info ) { 1128 1129 srsize = sizeof( resinfo ); 1130 if ( csn.bv_len ) 1131 srsize += csn.bv_len + 1; 1132 1133 if ( opc->se ) { 1134 Attribute *a; 1135 ri = ch_malloc( srsize ); 1136 ri->ri_dn = opc->se->e_name; 1137 ri->ri_ndn = opc->se->e_nname; 1138 a = attr_find( opc->se->e_attrs, slap_schema.si_ad_entryUUID ); 1139 if ( a ) 1140 ri->ri_uuid = a->a_nvals[0]; 1141 else 1142 ri->ri_uuid.bv_len = 0; 1143 if ( csn.bv_len ) { 1144 ri->ri_csn.bv_val = (char *)(ri + 1); 1145 ri->ri_csn.bv_len = csn.bv_len; 1146 memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); 1147 ri->ri_csn.bv_val[csn.bv_len] = '\0'; 1148 } else { 1149 ri->ri_csn.bv_val = NULL; 1150 } 1151 } else { 1152 srsize += opc->suuid.bv_len + 1153 opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; 1154 ri = ch_malloc( srsize ); 1155 ri->ri_dn.bv_val = (char *)(ri + 1); 1156 ri->ri_dn.bv_len = opc->sdn.bv_len; 1157 ri->ri_ndn.bv_val = lutil_strcopy( ri->ri_dn.bv_val, 1158 opc->sdn.bv_val ) + 1; 1159 ri->ri_ndn.bv_len = opc->sndn.bv_len; 1160 ri->ri_uuid.bv_val = lutil_strcopy( ri->ri_ndn.bv_val, 1161 opc->sndn.bv_val ) + 1; 1162 
ri->ri_uuid.bv_len = opc->suuid.bv_len; 1163 AC_MEMCPY( ri->ri_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1164 if ( csn.bv_len ) { 1165 ri->ri_csn.bv_val = ri->ri_uuid.bv_val + ri->ri_uuid.bv_len; 1166 memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); 1167 ri->ri_csn.bv_val[csn.bv_len] = '\0'; 1168 } else { 1169 ri->ri_csn.bv_val = NULL; 1170 } 1171 } 1172 ri->ri_list = &opc->ssres; 1173 ri->ri_e = opc->se; 1174 ri->ri_csn.bv_len = csn.bv_len; 1175 ri->ri_isref = opc->sreference; 1176 BER_BVZERO( &ri->ri_cookie ); 1177 ldap_pvt_thread_mutex_init( &ri->ri_mutex ); 1178 opc->se = NULL; 1179 opc->ssres.s_info = ri; 1180 } 1181 ri = opc->ssres.s_info; 1182 sr->s_info = ri; 1183 ldap_pvt_thread_mutex_lock( &ri->ri_mutex ); 1184 sr->s_rilist = ri->ri_list; 1185 ri->ri_list = sr; 1186 if ( mode == LDAP_SYNC_NEW_COOKIE && BER_BVISNULL( &ri->ri_cookie )) { 1187 syncprov_info_t *si = opc->son->on_bi.bi_private; 1188 1189 slap_compose_sync_cookie( NULL, &ri->ri_cookie, si->si_ctxcsn, 1190 so->s_rid, slap_serverID ? slap_serverID : -1, NULL ); 1191 } 1192 Debug( LDAP_DEBUG_SYNC, "%s syncprov_qresp: " 1193 "set up a new syncres mode=%d csn=%s\n", 1194 so->s_op->o_log_prefix, mode, csn.bv_val ? 
csn.bv_val : "" ); 1195 ldap_pvt_thread_mutex_unlock( &ri->ri_mutex ); 1196 1197 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1198 if ( !so->s_res ) { 1199 so->s_res = sr; 1200 } else { 1201 so->s_restail->s_next = sr; 1202 } 1203 so->s_restail = sr; 1204 1205 /* If the base of the psearch was modified, check it next time round */ 1206 if ( so->s_flags & PS_WROTE_BASE ) { 1207 so->s_flags ^= PS_WROTE_BASE; 1208 so->s_flags |= PS_FIND_BASE; 1209 } 1210 if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) { 1211 syncprov_qstart( so ); 1212 } 1213 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1214 return LDAP_SUCCESS; 1215 } 1216 1217 static int 1218 syncprov_drop_psearch( syncops *so, int lock ) 1219 { 1220 if ( so->s_flags & PS_IS_DETACHED ) { 1221 if ( lock ) 1222 ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex ); 1223 so->s_op->o_conn->c_n_ops_executing--; 1224 so->s_op->o_conn->c_n_ops_completed++; 1225 LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, Operation, 1226 o_next ); 1227 if ( lock ) 1228 ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); 1229 } 1230 return syncprov_free_syncop( so, FS_LOCK ); 1231 } 1232 1233 static int 1234 syncprov_ab_cleanup( Operation *op, SlapReply *rs ) 1235 { 1236 slap_callback *sc = op->o_callback; 1237 op->o_callback = sc->sc_next; 1238 syncprov_drop_psearch( sc->sc_private, 0 ); 1239 op->o_tmpfree( sc, op->o_tmpmemctx ); 1240 return 0; 1241 } 1242 1243 static int 1244 syncprov_op_abandon( Operation *op, SlapReply *rs ) 1245 { 1246 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1247 syncprov_info_t *si = on->on_bi.bi_private; 1248 syncops *so, **sop; 1249 1250 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1251 for ( sop=&si->si_ops; (so = *sop); sop = &(*sop)->s_next ) { 1252 if ( so->s_op->o_connid == op->o_connid && 1253 so->s_op->o_msgid == op->orn_msgid ) { 1254 so->s_op->o_abandon = 1; 1255 *sop = so->s_next; 1256 break; 1257 } 1258 } 1259 ldap_pvt_thread_mutex_unlock( 
&si->si_ops_mutex );
	if ( so ) {
		/* Is this really a Cancel exop? */
		if ( op->o_tag != LDAP_REQ_ABANDON ) {
			so->s_op->o_cancel = SLAP_CANCEL_ACK;
			rs->sr_err = LDAP_CANCELLED;
			send_ldap_result( so->s_op, rs );
			if ( so->s_flags & PS_IS_DETACHED ) {
				/* Defer the drop to syncprov_ab_cleanup, which runs
				 * as this operation's cleanup callback.
				 */
				slap_callback *cb;
				cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx );
				cb->sc_cleanup = syncprov_ab_cleanup;
				cb->sc_next = op->o_callback;
				cb->sc_private = so;
				op->o_callback = cb;
				return SLAP_CB_CONTINUE;
			}
		}
		syncprov_drop_psearch( so, 0 );
	}
	return SLAP_CB_CONTINUE;
}

/*
 * Find which persistent searches are affected by this operation.
 *
 * op     - the write operation being processed
 * opc    - per-operation cookie; receives the DN/UUID of the target entry
 *          and the list of matching psearches (opc->smatches)
 * saveit - nonzero: only record which psearches match the entry as it is
 *          now (one syncmatches node and one s_inuse reference each);
 *          zero: queue a response on every affected psearch and consume
 *          the matches recorded by an earlier saveit call
 *
 * When saveit is zero, the queued mode per psearch is:
 *   in scope and filter matches, previously matched  -> LDAP_SYNC_MODIFY
 *   in scope and filter matches, not matched before  -> LDAP_SYNC_ADD
 *   no longer matches, previously matched            -> LDAP_SYNC_DELETE
 *   otherwise                                        -> LDAP_SYNC_NEW_COOKIE
 *
 * op->o_bd is restored to its value on entry before returning.
 */
static void
syncprov_matchops( Operation *op, opcookie *opc, int saveit )
{
	slap_overinst *on = opc->son;
	syncprov_info_t *si = on->on_bi.bi_private;

	fbase_cookie fc;
	syncops **pss;
	Entry *e = NULL;
	Attribute *a;
	int rc, gonext;
	struct berval newdn;
	int freefdn = 0;
	BackendDB *b0 = op->o_bd, db;

	fc.fdn = &op->o_req_ndn;
	/* compute new DN */
	if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
		struct berval pdn;
		if ( op->orr_nnewSup ) pdn = *op->orr_nnewSup;
		else dnParent( fc.fdn, &pdn );
		build_new_dn( &newdn, &pdn, &op->orr_nnewrdn, op->o_tmpmemctx );
		fc.fdn = &newdn;
		freefdn = 1;
	}
	if ( op->o_tag != LDAP_REQ_ADD ) {
		/* Use a local copy of the BackendDB when op->o_bd is not
		 * flagged as an overlay db; b0 is put back before returning.
		 */
		if ( !SLAP_ISOVERLAY( op->o_bd )) {
			db = *op->o_bd;
			op->o_bd = &db;
		}
		rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on );
		/* If we're sending responses now, make a copy and unlock the DB */
		if ( e && !saveit ) {
			if ( !opc->se )
				opc->se = entry_dup( e );
			overlay_entry_release_ov( op, e, 0, on );
			e = opc->se;
		}
		if ( rc ) {
			Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
				"%s check, error finding entry dn=%s in database\n",
				op->o_log_prefix,
				saveit ? "initial" : "final", fc.fdn->bv_val );
			op->o_bd = b0;
			return;
		}
	} else {
		/* Add: the entry comes with the request */
		e = op->ora_e;
		if ( !saveit ) {
			if ( !opc->se )
				opc->se = entry_dup( e );
			e = opc->se;
		}
	}

	if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
		/* Remember DN, NDN, referral status and UUID of the entry */
		ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
		ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
		opc->sreference = is_entry_referral( e );
		a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
		if ( a )
			ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
		Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
			"%srecording uuid for dn=%s on opc=%p\n",
			op->o_log_prefix, a ? "" : "not ", opc->sdn.bv_val, opc );
	} else if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
		/* After a rename, replace the saved DNs with the new ones */
		op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
		op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
		ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
		ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
	}

	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
	/* gonext is cleared whenever the current psearch has been unlinked,
	 * in which case *pss already refers to the following one.
	 */
	for (pss = &si->si_ops; *pss; pss = gonext ?
					&(*pss)->s_next : pss)
	{
		Operation op2;
		Opheader oh;
		syncmatches *sm;
		int found = 0;
		syncops *snext, *ss = *pss;

		gonext = 1;
		if ( ss->s_op->o_abandon )
			continue;

		/* Don't send ops back to the originator */
		if ( opc->osid > 0 && opc->osid == ss->s_sid ) {
			Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
				"skipping original sid %03x\n",
				ss->s_op->o_log_prefix, opc->osid );
			continue;
		}

		/* Don't send ops back to the messenger */
		if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) {
			Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: "
				"skipping relayed sid %03x\n",
				ss->s_op->o_log_prefix, opc->rsid );
			continue;
		}

		/* validate base */
		fc.fss = ss;
		fc.fbase = 0;
		fc.fscope = 0;

		/* If the base of the search is missing, signal a refresh */
		rc = syncprov_findbase( op, &fc );
		if ( rc != LDAP_SUCCESS ) {
			SlapReply rs = {REP_RESULT};
			send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED,
				"search base has changed" );
			/* save the successor before ss may be freed */
			snext = ss->s_next;
			if ( syncprov_drop_psearch( ss, 1 ) )
				*pss = snext;
			gonext = 0;
			continue;
		}

		/* If we're sending results now, look for this op in old matches */
		if ( !saveit ) {
			syncmatches *old;

			/* Did we modify the search base? */
			if ( dn_match( &op->o_req_ndn, &ss->s_base )) {
				ldap_pvt_thread_mutex_lock( &ss->s_mutex );
				ss->s_flags |= PS_WROTE_BASE;
				ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
			}

			/* "old" starts out as the list head cast to a node; this
			 * works because sm_next is the first member of syncmatches,
			 * so old->sm_next updates opc->smatches itself.
			 */
			for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm;
				old=sm, sm=sm->sm_next ) {
				if ( sm->sm_op == ss ) {
					found = 1;
					old->sm_next = sm->sm_next;
					op->o_tmpfree( sm, op->o_tmpmemctx );
					break;
				}
			}
		}

		if ( fc.fscope ) {
			/* Evaluate the psearch filter against the entry using a
			 * copy of the psearch operation combined with this
			 * operation's header, backend and o_extra.
			 */
			ldap_pvt_thread_mutex_lock( &ss->s_mutex );
			op2 = *ss->s_op;
			oh = *op->o_hdr;
			oh.oh_conn = ss->s_op->o_conn;
			oh.oh_connid = ss->s_op->o_connid;
			op2.o_bd = op->o_bd->bd_self;
			op2.o_hdr = &oh;
			op2.o_extra = op->o_extra;
			op2.o_callback = NULL;
			if (ss->s_flags & PS_FIX_FILTER) {
				/* Skip the AND/GE clause that we stuck on in front. We
				   would lose deletes/mods that happen during the refresh
				   phase otherwise (ITS#6555) */
				op2.ors_filter = ss->s_op->ors_filter->f_and->f_next;
			}
			rc = test_filter( &op2, e, op2.ors_filter );
			ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
		}

		Debug( LDAP_DEBUG_TRACE, "%s syncprov_matchops: "
			"sid %03x fscope %d rc %d\n",
			ss->s_op->o_log_prefix, ss->s_sid, fc.fscope, rc );

		/* check if current o_req_dn is in scope and matches filter */
		if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) {
			if ( saveit ) {
				/* record the match and hold a reference on the psearch */
				sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
				sm->sm_next = opc->smatches;
				sm->sm_op = ss;
				ldap_pvt_thread_mutex_lock( &ss->s_mutex );
				++ss->s_inuse;
				ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
				opc->smatches = sm;
			} else {
				/* if found send UPDATE else send ADD */
				syncprov_qresp( opc, ss,
					found ?
					LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
			}
		} else if ( !saveit && found ) {
			/* send DELETE */
			syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
		} else if ( !saveit ) {
			/* entry is not relevant to this psearch: cookie update only */
			syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
		}
		if ( !saveit && found ) {
			/* Decrement s_inuse, was incremented when called
			 * with saveit == TRUE
			 */
			snext = ss->s_next;
			if ( syncprov_free_syncop( ss, FS_LOCK ) ) {
				*pss = snext;
				gonext = 0;
			}
		}
	}
	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );

	if ( op->o_tag != LDAP_REQ_ADD && e ) {
		if ( !SLAP_ISOVERLAY( op->o_bd )) {
			op->o_bd = &db;
		}
		/* with saveit the entry is still the one obtained from the
		 * backend above and has not been released yet
		 */
		if ( saveit )
			overlay_entry_release_ov( op, e, 0, on );
		op->o_bd = b0;
	}
	if ( !saveit ) {
		/* Drop this function's hold on the shared resinfo if one was
		 * built by syncprov_qresp; otherwise free the unused entry copy.
		 */
		if ( opc->ssres.s_info )
			free_resinfo( &opc->ssres );
		else if ( opc->se )
			entry_free( opc->se );
	}
	if ( freefdn ) {
		op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx );
	}
	op->o_bd = b0;
}

/*
 * Cleanup callback for a write operation: decrements si_active, releases
 * the references held through opc->smatches, removes the operation from
 * the per-DN modification table, and frees the cookie's strings and the
 * callback itself.
 */
static int
syncprov_op_cleanup( Operation *op, SlapReply *rs )
{
	slap_callback *cb = op->o_callback;
	opcookie *opc = cb->sc_private;
	slap_overinst *on = opc->son;
	syncprov_info_t *si = on->on_bi.bi_private;
	syncmatches *sm, *snext;
	modtarget *mt;

	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
	if ( si->si_active )
		si->si_active--;
	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );

	for (sm = opc->smatches; sm; sm=snext) {
		snext = sm->sm_next;
		syncprov_free_syncop( sm->sm_op, FS_LOCK|FS_UNLINK );
		op->o_tmpfree( sm, op->o_tmpmemctx );
	}

	/* Remove op from lock table */
	mt = opc->smt;
	if ( mt ) {
		/* the modinst is taken from the memory directly after the cookie */
		modinst *mi = (modinst *)(opc+1), **m2;
		ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
		/* NOTE(review): the loop has no end condition of its own; it
		 * assumes mi is always present on mt->mt_mods.
		 */
		for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) {
			if ( *m2 == mi ) {
				*m2 = mi->mi_next;
				if ( mt->mt_tail == mi )
					mt->mt_tail = ( m2
== &mt->mt_mods ) ? NULL : (modinst *)m2;
				/* The cast above treats the address of the previous node's
				 * mi_next field as the previous node itself, which is valid
				 * because mi_next is the first member of modinst.
				 */
				break;
			}
		}
		/* If there are more, promote the next one */
		if ( mt->mt_mods ) {
			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
		} else {
			/* last modification on this target: remove and free it */
			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
			ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
			ldap_avl_delete( &si->si_mods, mt, sp_avl_cmp );
			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
			ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
			ch_free( mt->mt_dn.bv_val );
			ch_free( mt );
		}
	}
	if ( !BER_BVISNULL( &opc->suuid ))
		op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );
	if ( !BER_BVISNULL( &opc->sndn ))
		op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
	if ( !BER_BVISNULL( &opc->sdn ))
		op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
	op->o_callback = cb->sc_next;
	op->o_tmpfree(cb, op->o_tmpmemctx);

	return 0;
}

/*
 * Write the current contextCSN values to the database.
 *
 * op - operation used as the template for the internal request
 * on - this overlay instance
 *
 * Issues an internal modify that replaces contextCSN on the entry
 * si->si_contextdn with si->si_ctxcsn.  If the backend answers
 * LDAP_NO_SUCH_OBJECT and SLAP_SYNC_SUBENTRY() holds for the database,
 * the entry is created with an internal add instead.  The results of
 * both requests are otherwise not examined.
 */
static void
syncprov_checkpoint( Operation *op, slap_overinst *on )
{
	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
	Modifications mod;
	Operation opm;
	SlapReply rsm = {REP_RESULT};
	slap_callback cb = {0};
	BackendDB be;
	BackendInfo *bi;

#ifdef CHECK_CSN
	Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;

	int i;
	for ( i=0; i<si->si_numcsns; i++ ) {
		assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
	}
#endif

	Debug( LDAP_DEBUG_SYNC, "%s syncprov_checkpoint: running checkpoint\n",
		op->o_log_prefix );

	/* A single replace of contextCSN; the values are borrowed from si,
	 * not copied.
	 */
	mod.sml_numvals = si->si_numcsns;
	mod.sml_values = si->si_ctxcsn;
	mod.sml_nvalues = NULL;
	mod.sml_desc = slap_schema.si_ad_contextCSN;
	mod.sml_op = LDAP_MOD_REPLACE;
	mod.sml_flags = SLAP_MOD_INTERNAL;
	mod.sml_next = NULL;

	cb.sc_response = slap_null_cb;
	opm = *op;
	opm.o_tag = LDAP_REQ_MODIFY;
	opm.o_callback = &cb;
	opm.orm_modlist = &mod;
	opm.orm_no_opattrs = 1;
	if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) {
		be = *on->on_info->oi_origdb;
		opm.o_bd = &be;
	}
	opm.o_req_dn = si->si_contextdn;
	opm.o_req_ndn = si->si_contextdn;
	/* bd_info is switched to on_info->oi_orig for the backend calls
	 * below and put back afterwards.
	 * NOTE(review): presumably so the request goes to the underlying
	 * backend without passing through the overlay stack -- confirm.
	 */
	bi = opm.o_bd->bd_info;
	opm.o_bd->bd_info = on->on_info->oi_orig;
	opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
	opm.o_no_schema_check = 1;
	opm.o_dont_replicate = 1;
	opm.o_opid = -1;
	opm.o_bd->be_modify( &opm, &rsm );

	if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT &&
		SLAP_SYNC_SUBENTRY( opm.o_bd )) {
		const char	*text;
		char txtbuf[SLAP_TEXT_BUFLEN];
		size_t textlen = sizeof txtbuf;
		Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL );
		rs_reinit( &rsm, REP_RESULT );
		slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen);
		opm.ora_e = e;
		opm.o_bd->be_add( &opm, &rsm );
		/* release the entry only if be_add left ora_e pointing at it */
		if ( e == opm.ora_e )
			be_entry_release_w( &opm, opm.ora_e );
	}
	opm.o_bd->bd_info = bi;

	/* free anything that was chained onto our stack-allocated mod */
	if ( mod.sml_next != NULL ) {
		slap_mods_free( mod.sml_next, 1 );
	}
#ifdef CHECK_CSN
	for ( i=0; i<si->si_numcsns; i++ ) {
		assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
	}
#endif
}

/*
 * Add the operation's CSN and the target entry's UUID (from the
 * operation's opcookie) to the session log, expiring the oldest
 * records once the log holds more than sl_size entries.
 */
static void
syncprov_add_slog( Operation *op )
{
	opcookie *opc = op->o_callback->sc_private;
	slap_overinst *on = opc->son;
	syncprov_info_t *si = on->on_bi.bi_private;
	sessionlog *sl;
	slog_entry *se;
	char uuidstr[40];
	int rc;

	sl = si->si_logs;
	{
		if ( BER_BVISEMPTY( &op->o_csn ) ) {
			/* During the syncrepl refresh phase we can receive operations
			 * without a csn.  We cannot reliably determine the consumers
			 * state with respect to such operations, so we ignore them and
			 * wipe out anything in the log if we see them.
1653 */ 1654 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); 1655 /* can only do this if no one else is reading the log at the moment */ 1656 if ( !sl->sl_playing ) { 1657 ldap_tavl_free( sl->sl_entries, (AVL_FREE)ch_free ); 1658 sl->sl_num = 0; 1659 sl->sl_entries = NULL; 1660 } 1661 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 1662 return; 1663 } 1664 1665 /* Allocate a record. UUIDs are not NUL-terminated. */ 1666 se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + 1667 op->o_csn.bv_len + 1 ); 1668 se->se_tag = op->o_tag; 1669 1670 se->se_uuid.bv_val = (char *)(&se[1]); 1671 AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1672 se->se_uuid.bv_len = opc->suuid.bv_len; 1673 1674 se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len; 1675 AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len ); 1676 se->se_csn.bv_val[op->o_csn.bv_len] = '\0'; 1677 se->se_csn.bv_len = op->o_csn.bv_len; 1678 se->se_sid = slap_parse_csn_sid( &se->se_csn ); 1679 1680 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); 1681 if ( LogTest( LDAP_DEBUG_SYNC ) ) { 1682 uuidstr[0] = 0; 1683 if ( !BER_BVISEMPTY( &opc->suuid ) ) { 1684 lutil_uuidstr_from_normalized( opc->suuid.bv_val, opc->suuid.bv_len, 1685 uuidstr, 40 ); 1686 } 1687 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1688 "adding csn=%s to sessionlog, uuid=%s\n", 1689 op->o_log_prefix, se->se_csn.bv_val, uuidstr ); 1690 } 1691 if ( !sl->sl_entries ) { 1692 if ( !sl->sl_mincsn ) { 1693 sl->sl_numcsns = 1; 1694 sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval )); 1695 sl->sl_sids = ch_malloc( sizeof( int )); 1696 sl->sl_sids[0] = se->se_sid; 1697 ber_dupbv( sl->sl_mincsn, &se->se_csn ); 1698 BER_BVZERO( &sl->sl_mincsn[1] ); 1699 } 1700 } 1701 rc = ldap_tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, ldap_avl_dup_error ); 1702 if ( rc ) { 1703 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1704 "duplicate sessionlog entry ignored: csn=%s, uuid=%s\n", 1705 op->o_log_prefix, 
se->se_csn.bv_val, uuidstr ); 1706 ch_free( se ); 1707 goto leave; 1708 } 1709 sl->sl_num++; 1710 if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) { 1711 TAvlnode *edge = ldap_tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); 1712 while ( sl->sl_num > sl->sl_size ) { 1713 int i; 1714 TAvlnode *next = ldap_tavl_next( edge, TAVL_DIR_RIGHT ); 1715 se = edge->avl_data; 1716 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1717 "expiring csn=%s from sessionlog (sessionlog size=%d)\n", 1718 op->o_log_prefix, se->se_csn.bv_val, sl->sl_num ); 1719 for ( i=0; i<sl->sl_numcsns; i++ ) 1720 if ( sl->sl_sids[i] >= se->se_sid ) 1721 break; 1722 if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) { 1723 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1724 "adding csn=%s to mincsn\n", 1725 op->o_log_prefix, se->se_csn.bv_val ); 1726 slap_insert_csn_sids( (struct sync_cookie *)sl, 1727 i, se->se_sid, &se->se_csn ); 1728 } else { 1729 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1730 "updating mincsn for sid=%d csn=%s to %s\n", 1731 op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val ); 1732 ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn ); 1733 } 1734 ldap_tavl_delete( &sl->sl_entries, se, syncprov_sessionlog_cmp ); 1735 ch_free( se ); 1736 edge = next; 1737 sl->sl_num--; 1738 } 1739 } 1740 leave: 1741 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 1742 } 1743 } 1744 1745 /* Just set a flag if we found the matching entry */ 1746 static int 1747 playlog_cb( Operation *op, SlapReply *rs ) 1748 { 1749 if ( rs->sr_type == REP_SEARCH ) { 1750 op->o_callback->sc_private = (void *)1; 1751 } 1752 return rs->sr_err; 1753 } 1754 1755 /* 1756 * Check whether the last nmods UUIDs in the uuids list exist in the database 1757 * and (still) match the op filter, zero out the bv_len of any that still exist 1758 * and return the number of UUIDs we have confirmed are gone now. 
1759 */ 1760 static int 1761 check_uuidlist_presence( 1762 Operation *op, 1763 struct berval *uuids, 1764 int len, 1765 int nmods ) 1766 { 1767 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1768 Operation fop = *op; 1769 SlapReply frs = { REP_RESULT }; 1770 Filter mf, af; 1771 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 1772 slap_callback cb = {0}; 1773 int i, mods = nmods; 1774 1775 fop.o_sync_mode = 0; 1776 fop.o_callback = &cb; 1777 fop.ors_limit = NULL; 1778 fop.ors_tlimit = SLAP_NO_LIMIT; 1779 fop.ors_attrs = slap_anlist_all_attributes; 1780 fop.ors_attrsonly = 0; 1781 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 1782 1783 af.f_choice = LDAP_FILTER_AND; 1784 af.f_next = NULL; 1785 af.f_and = &mf; 1786 mf.f_choice = LDAP_FILTER_EQUALITY; 1787 mf.f_ava = &eq; 1788 mf.f_av_desc = slap_schema.si_ad_entryUUID; 1789 mf.f_next = fop.ors_filter; 1790 1791 fop.ors_filter = ⁡ 1792 1793 cb.sc_response = playlog_cb; 1794 1795 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 1796 for ( i=0; i<nmods; i++ ) { 1797 mf.f_av_value = uuids[ len - 1 - i ]; 1798 cb.sc_private = NULL; 1799 fop.ors_slimit = 1; 1800 1801 if ( BER_BVISEMPTY( &mf.f_av_value ) ) { 1802 mods--; 1803 continue; 1804 } 1805 1806 rs_reinit( &frs, REP_RESULT ); 1807 fop.o_bd->be_search( &fop, &frs ); 1808 if ( cb.sc_private ) { 1809 uuids[ len - 1 - i ].bv_len = 0; 1810 mods--; 1811 } 1812 } 1813 fop.o_bd->bd_info = (BackendInfo *)on; 1814 1815 return mods; 1816 } 1817 1818 /* 1819 * On each entry we get from the DB: 1820 * - if it's an ADD, skip 1821 * - check we've not handled it yet, skip if we have 1822 * - check if it's a DELETE or missing from the DB now 1823 * - send a new syncinfo entry 1824 * - remember we've handled it already 1825 * 1826 * If we exhaust the list, clear it, forgetting entries we've handled so far. 
 */
/*
 * Search response callback; op->o_callback->sc_private is the
 * syncprov_accesslog_deletes progress record.
 *
 * Non-entry responses and entries that are skipped leave rs->sr_err
 * untouched.  A log entry that lacks reqType, entryCSN or reqEntryUUID
 * (or, for a modrdn, reqDN or reqNewDN) sets rs->sr_err to
 * LDAP_CONSTRAINT_VIOLATION.  The return value is always rs->sr_err.
 */
static int
syncprov_accesslog_uuid_cb( Operation *op, SlapReply *rs )
{
	slap_callback *sc = op->o_callback;
	syncprov_accesslog_deletes *uuid_progress = sc->sc_private;
	Attribute *a, *attrs;
	sync_control *srs = uuid_progress->srs;
	struct berval *bv, csn[2] = {}, uuid[2] = {},
				  add = BER_BVC("add"),
				  delete = BER_BVC("delete"),
				  modrdn = BER_BVC("modrdn");
	int cmp, sid, i, is_delete = 0, rc;

	if ( rs->sr_type != REP_SEARCH ) {
		return rs->sr_err;
	}
	attrs = rs->sr_entry->e_attrs;

	a = attr_find( attrs, ad_reqType );
	if ( !a || a->a_numvals == 0 ) {
		rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
		return rs->sr_err;
	}

	/* adds are skipped */
	if ( bvmatch( &a->a_nvals[0], &add ) ) {
		return rs->sr_err;
	}

	if ( bvmatch( &a->a_nvals[0], &delete ) ) {
		is_delete = 1;
	}

	/* A rename counts as a delete when the entry started under the
	 * search base and its new DN is no longer under it.
	 */
	if ( bvmatch( &a->a_nvals[0], &modrdn ) ) {
		a = attr_find( attrs, ad_reqDN );
		if ( !a || a->a_numvals == 0 ) {
			rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
			return rs->sr_err;
		}

		/* Was it present in the first place? If not, skip: */
		if ( !dnIsSuffix( &a->a_nvals[0], &uuid_progress->op->o_req_ndn ) ) {
			return rs->sr_err;
		}

		a = attr_find( attrs, ad_reqNewDN );
		if ( !a || a->a_numvals == 0 ) {
			rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
			return rs->sr_err;
		}

		/* Has it gone away? */
		if ( !dnIsSuffix( &a->a_nvals[0], &uuid_progress->op->o_req_ndn ) ) {
			is_delete = 1;
		}
	}

	/*
	 * Only pick entries that are both:
	 */
	a = attr_find( attrs, slap_schema.si_ad_entryCSN );
	if ( !a || a->a_numvals == 0 ) {
		rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
		return rs->sr_err;
	}
	csn[0] = a->a_nvals[0];

	sid = slap_parse_csn_sid( &csn[0] );

	/*
	 * newer than cookieCSN (srs->sr_state.ctxcsn)
	 */
	/* cmp stays 1 (newer) when the cookie has no CSN for this sid */
	cmp = 1;
	for ( i=0; i<srs->sr_state.numcsns; i++ ) {
		if ( sid == srs->sr_state.sids[i] ) {
			cmp = ber_bvcmp( &csn[0], &srs->sr_state.ctxcsn[i] );
			break;
		}
	}
	if ( cmp <= 0 ) {
		Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
			"cmp %d, csn %s too old\n",
			op->o_log_prefix, cmp, csn[0].bv_val );
		return rs->sr_err;
	}

	/*
	 * not newer than snapshot ctxcsn (uuid_progress->ctxcsn)
	 */
	/* cmp stays 0 (not newer) when the snapshot has no CSN for this sid */
	cmp = 0;
	for ( i=0; i<uuid_progress->numcsns; i++ ) {
		if ( sid == uuid_progress->sids[i] ) {
			cmp = ber_bvcmp( &csn[0], &uuid_progress->ctxcsn[i] );
			break;
		}
	}
	if ( cmp > 0 ) {
		Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
			"cmp %d, csn %s too new\n",
			op->o_log_prefix, cmp, csn[0].bv_val );
		return rs->sr_err;
	}

	a = attr_find( attrs, ad_reqEntryUUID );
	if ( !a || a->a_numvals == 0 ) {
		rs->sr_err = LDAP_CONSTRAINT_VIOLATION;
		return rs->sr_err;
	}
	uuid[0] = a->a_nvals[0];

	bv = ldap_avl_find( uuid_progress->uuids, uuid, sp_uuid_cmp );
	if ( bv ) {
		/* Already checked or sent, no change */
		Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
			"uuid %s already checked\n",
			op->o_log_prefix, a->a_vals[0].bv_val );
		return rs->sr_err;
	}

	/* Not logged as a delete: ask the database whether the entry still
	 * exists and matches.  For this single UUID the helper returns 1
	 * when the entry is gone and 0 when it is still there.
	 */
	if ( !is_delete ) {
		is_delete = check_uuidlist_presence( uuid_progress->op, uuid, 1, 1 );
	}
	Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: "
		"uuid %s is %s present\n",
		op->o_log_prefix, a->a_vals[0].bv_val,
		is_delete ? "no longer" : "still" );

	/* Remember the UUID in the next free slot of the progress record */
	i = uuid_progress->ndel++;

	bv = &uuid_progress->uuid_list[i];
	bv->bv_val = &uuid_progress->uuid_buf[i*UUID_LEN];
	bv->bv_len = a->a_nvals[0].bv_len;
	/* NOTE(review): slots are UUID_LEN bytes apart; this copy assumes the
	 * normalized reqEntryUUID value is no longer than that -- confirm.
	 */
	AC_MEMCPY( bv->bv_val, a->a_nvals[0].bv_val, a->a_nvals[0].bv_len );

	rc = ldap_avl_insert( &uuid_progress->uuids, bv, sp_uuid_cmp, ldap_avl_dup_error );
	assert( rc == LDAP_SUCCESS );

	if ( is_delete ) {
		struct berval cookie;

		slap_compose_sync_cookie( op, &cookie, srs->sr_state.ctxcsn,
			srs->sr_state.rid, slap_serverID ? slap_serverID : -1, csn );
		syncprov_sendinfo( uuid_progress->op, uuid_progress->rs,
			LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuid, 1 );
		op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
	}

	/* All slots used: empty the lookup tree and start filling again */
	if ( uuid_progress->ndel >= uuid_progress->list_len ) {
		int ndel;

		assert( uuid_progress->ndel == uuid_progress->list_len );
		ndel = ldap_avl_free( uuid_progress->uuids, NULL );
		assert( ndel == uuid_progress->ndel );
		uuid_progress->uuids = NULL;
		uuid_progress->ndel = 0;
	}

	return rs->sr_err;
}

static int
syncprov_play_sessionlog( Operation *op, SlapReply *rs, sync_control *srs,
	BerVarray ctxcsn, int numcsns, int *sids,
	struct berval *mincsn, int minsid )
{
	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
	sessionlog *sl = si->si_logs;
	int i, j, ndel, num, nmods, mmods, do_play = 0, rc = -1;
	BerVarray uuids, csns;
	struct berval uuid[2] = {}, csn[2] = {};
	slog_entry *se;
	TAvlnode *entry;
	char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
	struct berval delcsn[2];

	ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
	/* Are there any log entries, and is the consumer state
	 * present in
the session log? 2006 */ 2007 if ( !sl->sl_num ) { 2008 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2009 return rc; 2010 } 2011 assert( sl->sl_num > 0 ); 2012 2013 for ( i=0; i<sl->sl_numcsns; i++ ) { 2014 /* SID not present == new enough */ 2015 if ( minsid < sl->sl_sids[i] ) { 2016 do_play = 1; 2017 break; 2018 } 2019 /* SID present */ 2020 if ( minsid == sl->sl_sids[i] ) { 2021 /* new enough? */ 2022 if ( ber_bvcmp( mincsn, &sl->sl_mincsn[i] ) >= 0 ) 2023 do_play = 1; 2024 break; 2025 } 2026 } 2027 /* SID not present == new enough */ 2028 if ( i == sl->sl_numcsns ) 2029 do_play = 1; 2030 2031 if ( !do_play ) { 2032 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2033 return rc; 2034 } 2035 2036 num = sl->sl_num; 2037 i = 0; 2038 nmods = 0; 2039 sl->sl_playing++; 2040 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2041 2042 uuids = op->o_tmpalloc( (num) * sizeof( struct berval ) + 2043 num * UUID_LEN, op->o_tmpmemctx ); 2044 uuids[0].bv_val = (char *)(uuids + num); 2045 csns = op->o_tmpalloc( (num) * sizeof( struct berval ) + 2046 num * LDAP_PVT_CSNSTR_BUFSIZE, op->o_tmpmemctx ); 2047 csns[0].bv_val = (char *)(csns + num); 2048 2049 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2050 /* Make a copy of the relevant UUIDs. Put the Deletes up front 2051 * and everything else at the end. Do this first so we can 2052 * let the write side manage the sessionlog again. 2053 */ 2054 assert( sl->sl_entries ); 2055 2056 /* Find first relevant log entry. 
If greater than mincsn, backtrack one entry */ 2057 { 2058 slog_entry te = {0}; 2059 te.se_csn = *mincsn; 2060 entry = ldap_tavl_find3( sl->sl_entries, &te, syncprov_sessionlog_cmp, &ndel ); 2061 } 2062 if ( ndel > 0 && entry ) 2063 entry = ldap_tavl_next( entry, TAVL_DIR_LEFT ); 2064 /* if none, just start at beginning */ 2065 if ( !entry ) 2066 entry = ldap_tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); 2067 2068 do { 2069 char uuidstr[40] = {}; 2070 slog_entry *se = entry->avl_data; 2071 int k; 2072 2073 /* Make sure writes can still make progress */ 2074 ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex ); 2075 ndel = 1; 2076 for ( k=0; k<srs->sr_state.numcsns; k++ ) { 2077 if ( se->se_sid == srs->sr_state.sids[k] ) { 2078 ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn[k] ); 2079 break; 2080 } 2081 } 2082 if ( ndel <= 0 ) { 2083 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2084 continue; 2085 } 2086 ndel = 0; 2087 for ( k=0; k<numcsns; k++ ) { 2088 if ( se->se_sid == sids[k] ) { 2089 ndel = ber_bvcmp( &se->se_csn, &ctxcsn[k] ); 2090 break; 2091 } 2092 } 2093 if ( ndel > 0 ) { 2094 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: " 2095 "cmp %d, csn %s too new, we're finished\n", 2096 op->o_log_prefix, ndel, se->se_csn.bv_val ); 2097 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2098 break; 2099 } 2100 if ( se->se_tag == LDAP_REQ_DELETE ) { 2101 j = i; 2102 i++; 2103 } else { 2104 if ( se->se_tag == LDAP_REQ_ADD ) { 2105 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2106 continue; 2107 } 2108 nmods++; 2109 j = num - nmods; 2110 } 2111 uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN); 2112 AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN); 2113 uuids[j].bv_len = UUID_LEN; 2114 2115 csns[j].bv_val = csns[0].bv_val + (j * LDAP_PVT_CSNSTR_BUFSIZE); 2116 AC_MEMCPY(csns[j].bv_val, se->se_csn.bv_val, se->se_csn.bv_len); 2117 csns[j].bv_len = se->se_csn.bv_len; 2118 /* We're printing it */ 2119 csns[j].bv_val[csns[j].bv_len] = '\0'; 2120 2121 if ( LogTest( 
LDAP_DEBUG_SYNC ) ) { 2122 lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len, 2123 uuidstr, 40 ); 2124 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: " 2125 "picking a %s entry uuid=%s cookie=%s\n", 2126 op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified", 2127 uuidstr, csns[j].bv_val ); 2128 } 2129 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2130 } while ( (entry = ldap_tavl_next( entry, TAVL_DIR_RIGHT )) != NULL ); 2131 ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex ); 2132 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); 2133 sl->sl_playing--; 2134 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2135 2136 ndel = i; 2137 2138 /* Zero out unused slots */ 2139 for ( i=ndel; i < num - nmods; i++ ) 2140 uuids[i].bv_len = 0; 2141 2142 /* Mods must be validated to see if they belong in this delete set. 2143 */ 2144 2145 mmods = nmods; 2146 /* Strip any duplicates */ 2147 for ( i=0; i<nmods; i++ ) { 2148 for ( j=0; j<ndel; j++ ) { 2149 if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) { 2150 uuids[num - 1 - i].bv_len = 0; 2151 mmods --; 2152 break; 2153 } 2154 } 2155 if ( uuids[num - 1 - i].bv_len == 0 ) continue; 2156 for ( j=0; j<i; j++ ) { 2157 if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) { 2158 uuids[num - 1 - i].bv_len = 0; 2159 mmods --; 2160 break; 2161 } 2162 } 2163 } 2164 2165 /* Check mods now */ 2166 if ( mmods ) { 2167 check_uuidlist_presence( op, uuids, num, nmods ); 2168 } 2169 2170 /* ITS#8768 Send entries sorted by CSN order */ 2171 i = j = 0; 2172 while ( i < ndel || j < nmods ) { 2173 struct berval cookie; 2174 int index; 2175 2176 /* Skip over duplicate mods */ 2177 if ( j < nmods && BER_BVISEMPTY( &uuids[ num - 1 - j ] ) ) { 2178 j++; 2179 continue; 2180 } 2181 index = num - 1 - j; 2182 2183 if ( i >= ndel ) { 2184 j++; 2185 } else if ( j >= nmods ) { 2186 index = i++; 2187 /* Take the oldest by CSN order */ 2188 } else if ( ber_bvcmp( &csns[index], &csns[i] ) < 0 ) { 2189 j++; 2190 } else { 2191 index = 
i++; 2192 } 2193 2194 uuid[0] = uuids[index]; 2195 csn[0] = csns[index]; 2196 2197 slap_compose_sync_cookie( op, &cookie, srs->sr_state.ctxcsn, 2198 srs->sr_state.rid, slap_serverID ? slap_serverID : -1, csn ); 2199 if ( LogTest( LDAP_DEBUG_SYNC ) ) { 2200 char uuidstr[40]; 2201 lutil_uuidstr_from_normalized( uuid[0].bv_val, uuid[0].bv_len, 2202 uuidstr, 40 ); 2203 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: " 2204 "sending a new disappearing entry uuid=%s cookie=%s\n", 2205 op->o_log_prefix, uuidstr, cookie.bv_val ); 2206 } 2207 2208 /* TODO: we might batch those that share the same CSN (think present 2209 * phase), but would have to limit how many we send out at once */ 2210 syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuid, 1 ); 2211 } 2212 op->o_tmpfree( uuids, op->o_tmpmemctx ); 2213 op->o_tmpfree( csns, op->o_tmpmemctx ); 2214 2215 return LDAP_SUCCESS; 2216 } 2217 2218 static int 2219 syncprov_play_accesslog( Operation *op, SlapReply *rs, sync_control *srs, 2220 BerVarray ctxcsn, int numcsns, int *sids, 2221 struct berval *mincsn, int minsid ) 2222 { 2223 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2224 syncprov_info_t *si = on->on_bi.bi_private; 2225 Operation fop; 2226 SlapReply frs = { REP_RESULT }; 2227 slap_callback cb = {}; 2228 Filter *f; 2229 syncprov_accesslog_deletes uuid_progress = { 2230 .op = op, 2231 .rs = rs, 2232 .srs = srs, 2233 .ctxcsn = ctxcsn, 2234 .numcsns = numcsns, 2235 .sids = sids, 2236 }; 2237 struct berval oldestcsn = BER_BVNULL, newestcsn = ctxcsn[0], 2238 basedn, filterpattern = BER_BVC( 2239 "(&" 2240 "(entryCSN>=%s)" 2241 "(entryCSN<=%s)" 2242 "(reqResult=0)" 2243 "(|" 2244 "(reqDN:dnSubtreeMatch:=%s)" 2245 "(reqNewDN:dnSubtreeMatch:=%s)" 2246 ")" 2247 "(|" 2248 "(objectclass=auditWriteObject)" 2249 "(objectclass=auditExtended)" 2250 "))" ); 2251 BackendDB *db; 2252 Entry *e; 2253 Attribute *a; 2254 int i, rc = -1; 2255 2256 assert( !BER_BVISNULL( &si->si_logbase ) ); 2257 2258 for ( i=1; i 
< numcsns; i++ ) { 2259 if ( ber_bvcmp( &newestcsn, &ctxcsn[i] ) < 0 ) { 2260 newestcsn = ctxcsn[i]; 2261 } 2262 } 2263 2264 db = select_backend( &si->si_logbase, 0 ); 2265 if ( !db ) { 2266 Debug( LDAP_DEBUG_ANY, "%s syncprov_play_accesslog: " 2267 "No database configured to hold accesslog dn=%s\n", 2268 op->o_log_prefix, si->si_logbase.bv_val ); 2269 return LDAP_NO_SUCH_OBJECT; 2270 } 2271 2272 fop = *op; 2273 fop.o_sync_mode = 0; 2274 fop.o_bd = db; 2275 rc = be_entry_get_rw( &fop, &si->si_logbase, NULL, ad_minCSN, 0, &e ); 2276 if ( rc ) { 2277 return rc; 2278 } 2279 2280 a = attr_find( e->e_attrs, ad_minCSN ); 2281 if ( !a ) { 2282 be_entry_release_rw( &fop, e, 0 ); 2283 return LDAP_NO_SUCH_ATTRIBUTE; 2284 } 2285 for ( i=0; i < a->a_numvals; i++ ) { 2286 if ( BER_BVISEMPTY( &oldestcsn ) || 2287 ber_bvcmp( &oldestcsn, &a->a_nvals[i] ) > 0 ) { 2288 oldestcsn = a->a_nvals[i]; 2289 } 2290 } 2291 2292 filter_escape_value_x( &op->o_req_ndn, &basedn, fop.o_tmpmemctx ); 2293 /* filter_escape_value_x sets output to BVNULL if input value is empty, 2294 * supply our own copy */ 2295 if ( BER_BVISEMPTY( &basedn ) ) { 2296 basedn.bv_val = ""; 2297 } 2298 fop.o_req_ndn = fop.o_req_dn = si->si_logbase; 2299 fop.ors_filterstr.bv_val = fop.o_tmpalloc( 2300 filterpattern.bv_len + 2301 oldestcsn.bv_len + newestcsn.bv_len + 2 * basedn.bv_len, 2302 fop.o_tmpmemctx ); 2303 fop.ors_filterstr.bv_len = sprintf( fop.ors_filterstr.bv_val, 2304 filterpattern.bv_val, 2305 oldestcsn.bv_val, newestcsn.bv_val, basedn.bv_val, basedn.bv_val ); 2306 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_accesslog: " 2307 "prepared filter '%s', base='%s'\n", 2308 op->o_log_prefix, fop.ors_filterstr.bv_val, si->si_logbase.bv_val ); 2309 f = str2filter_x( &fop, fop.ors_filterstr.bv_val ); 2310 assert( f != NULL ); 2311 fop.ors_filter = f; 2312 2313 if ( !BER_BVISEMPTY( &basedn ) ) { 2314 fop.o_tmpfree( basedn.bv_val, fop.o_tmpmemctx ); 2315 } 2316 be_entry_release_rw( &fop, e, 0 ); 2317 2318 /* 2319 * Allocate 
memory for list_len uuids for use by the callback, populate 2320 * with entries that we have sent or checked still match the filter. 2321 * A disappearing entry gets its uuid sent as a delete. 2322 * 2323 * in the callback, we need: 2324 * - original op and rs so we can send the message 2325 * - sync_control 2326 * - the uuid buffer and list and their length 2327 * - number of uuids we already have in the list 2328 * - the lookup structure so we don't have to check/send a uuid twice 2329 * (AVL?) 2330 */ 2331 uuid_progress.list_len = SLAP_SYNCUUID_SET_SIZE; 2332 uuid_progress.uuid_list = fop.o_tmpalloc( (uuid_progress.list_len) * sizeof(struct berval), fop.o_tmpmemctx ); 2333 uuid_progress.uuid_buf = fop.o_tmpalloc( (uuid_progress.list_len) * UUID_LEN, fop.o_tmpmemctx ); 2334 2335 cb.sc_private = &uuid_progress; 2336 cb.sc_response = syncprov_accesslog_uuid_cb; 2337 2338 fop.o_callback = &cb; 2339 2340 rc = fop.o_bd->be_search( &fop, &frs ); 2341 2342 fop.o_tmpfree( uuid_progress.uuid_buf, fop.o_tmpmemctx ); 2343 fop.o_tmpfree( uuid_progress.uuid_list, fop.o_tmpmemctx ); 2344 fop.o_tmpfree( fop.ors_filterstr.bv_val, fop.o_tmpmemctx ); 2345 filter_free_x( &fop, f, 1 ); 2346 2347 return rc; 2348 } 2349 2350 static int 2351 syncprov_new_ctxcsn( opcookie *opc, syncprov_info_t *si, int csn_changed, int numvals, BerVarray vals ) 2352 { 2353 unsigned i; 2354 int j, sid; 2355 2356 for ( i=0; i<numvals; i++ ) { 2357 sid = slap_parse_csn_sid( &vals[i] ); 2358 for ( j=0; j<si->si_numcsns; j++ ) { 2359 if ( sid < si->si_sids[j] ) 2360 break; 2361 if ( sid == si->si_sids[j] ) { 2362 if ( ber_bvcmp( &vals[i], &si->si_ctxcsn[j] ) > 0 ) { 2363 ber_bvreplace( &si->si_ctxcsn[j], &vals[i] ); 2364 csn_changed = 1; 2365 } 2366 break; 2367 } 2368 } 2369 2370 if ( j == si->si_numcsns || sid != si->si_sids[j] ) { 2371 slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn, 2372 j, sid, &vals[i] ); 2373 csn_changed = 1; 2374 } 2375 } 2376 if ( csn_changed ) 2377 si->si_dirty = 0; 2378 
ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
	/* ^ releases si_csn_rwlock, which is not acquired anywhere in this
	 * function; syncprov_op_response takes the write lock before calling */

	if ( csn_changed ) {
		syncops *ss;
		ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
		for ( ss = si->si_ops; ss; ss = ss->s_next ) {
			if ( ss->s_op->o_abandon )
				continue;
			/* Send the updated csn to all syncrepl consumers,
			 * including the server from which it originated.
			 * The syncrepl consumer and syncprov provider on
			 * the originating server may be configured to store
			 * their csn values in different entries.
			 */
			syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
		}
		ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
	}
	return csn_changed;
}

/*
 * Response callback for write operations (installed as sc_response by
 * syncprov_op_mod).
 *
 * Only acts when rs->sr_err is LDAP_SUCCESS. In that case, while holding
 * si_resp_mutex, it:
 *  - folds the operation's commit CSN into si_ctxcsn/si_sids,
 *  - for contextCSN replace-mods arriving on a sync connection, and for
 *    ops flagged o_dont_replicate, updates the op counter and stops,
 *  - decides whether a checkpoint is due (si_chkops / si_chktime) and
 *    runs syncprov_checkpoint,
 *  - notifies persistent searches (syncprov_matchops, or a DELETE
 *    response for each recorded match),
 *  - adds a session log record when si_logs is set.
 *
 * Always returns SLAP_CB_CONTINUE.
 */
static int
syncprov_op_response( Operation *op, SlapReply *rs )
{
	opcookie *opc = op->o_callback->sc_private;
	slap_overinst *on = opc->son;
	syncprov_info_t		*si = on->on_bi.bi_private;
	syncmatches *sm;

	if ( rs->sr_err == LDAP_SUCCESS )
	{
		struct berval maxcsn;
		char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
		int do_check = 0, have_psearches, foundit, csn_changed = 0;

		ldap_pvt_thread_mutex_lock( &si->si_resp_mutex );

		/* Update our context CSN */
		cbuf[0] = '\0';
		maxcsn.bv_val = cbuf;
		maxcsn.bv_len = sizeof(cbuf);
		/* Write lock is released on every path below: either explicitly
		 * or inside syncprov_new_ctxcsn */
		ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock );

		slap_get_commit_csn( op, &maxcsn, &foundit );
		if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) {
			/* syncrepl queues the CSN values in the db where
			 * it is configured, not where the changes are made.
			 * So look for a value in the glue db if we didn't
			 * find any in this db.
			 */
			BackendDB *be = op->o_bd;
			op->o_bd = select_backend( &be->be_nsuffix[0], 1);
			maxcsn.bv_val = cbuf;
			maxcsn.bv_len = sizeof(cbuf);
			slap_get_commit_csn( op, &maxcsn, &foundit );
			op->o_bd = be;
		}
		if ( !BER_BVISEMPTY( &maxcsn ) ) {
			int i, sid;
#ifdef CHECK_CSN
			Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
			assert( !syn->ssyn_validate( syn, &maxcsn ));
#endif
			sid = slap_parse_csn_sid( &maxcsn );
			/* si_sids is scanned in ascending order; stop at the slot
			 * for this SID or at the first larger one */
			for ( i=0; i<si->si_numcsns; i++ ) {
				if ( sid < si->si_sids[i] )
					break;
				if ( sid == si->si_sids[i] ) {
					if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) {
						ber_bvreplace( &si->si_ctxcsn[i], &maxcsn );
						csn_changed = 1;
					}
					break;
				}
			}
			/* It's a new SID for us */
			if ( i == si->si_numcsns || sid != si->si_sids[i] ) {
				slap_insert_csn_sids((struct sync_cookie *)&(si->si_ctxcsn),
					i, sid, &maxcsn );
				csn_changed = 1;
			}
		}

		/* Don't do any processing for consumer contextCSN updates */
		if ( SLAPD_SYNC_IS_SYNCCONN( op->o_connid ) &&
			op->o_tag == LDAP_REQ_MODIFY &&
			op->orm_modlist &&
			op->orm_modlist->sml_op == LDAP_MOD_REPLACE &&
			op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) {
			/* Catch contextCSN updates from syncrepl. We have to look at
			 * all the attribute values, as there may be more than one csn
			 * that changed, and only one can be passed in the csn queue.
			 */
			/* syncprov_new_ctxcsn drops si_csn_rwlock for us */
			csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed,
				op->orm_modlist->sml_numvals, op->orm_modlist->sml_values );
			if ( csn_changed )
				si->si_numops++;
			goto leave;
		}
		if ( op->o_dont_replicate ) {
			if ( csn_changed )
				si->si_numops++;
			ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
			goto leave;
		}

		/* If we're adding the context entry, parse all of its contextCSNs */
		if ( op->o_tag == LDAP_REQ_ADD &&
			dn_match( &op->o_req_ndn, &si->si_contextdn )) {
			Attribute *a = attr_find( op->ora_e->e_attrs, slap_schema.si_ad_contextCSN );
			if ( a ) {
				/* syncprov_new_ctxcsn drops si_csn_rwlock; the jump to
				 * "added" skips the checkpoint bookkeeping and the
				 * explicit unlock below */
				csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, a->a_numvals, a->a_vals );
				if ( csn_changed )
					si->si_numops++;
				goto added;
			}
		}

		if ( csn_changed )
			si->si_numops++;
		if ( si->si_chkops || si->si_chktime ) {
			/* Never checkpoint adding the context entry,
			 * it will deadlock
			 */
			if ( op->o_tag != LDAP_REQ_ADD ||
				!dn_match( &op->o_req_ndn, &si->si_contextdn )) {
				if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
					do_check = 1;
					si->si_numops = 0;
				}
				if ( si->si_chktime &&
					(op->o_time - si->si_chklast >= si->si_chktime )) {
					/* si_chklast == 0 means no checkpoint time has been
					 * recorded yet: just arm it instead of checkpointing */
					if ( si->si_chklast ) {
						do_check = 1;
						si->si_chklast = op->o_time;
					} else {
						si->si_chklast = 1;
					}
				}
			}
		}
		si->si_dirty = !csn_changed;
		ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );

added:
		if ( do_check ) {
			ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
			syncprov_checkpoint( op, on );
			ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
		}

		/* only update consumer ctx if this is a newer csn */
		if ( csn_changed ) {
			/* NOTE(review): maxcsn.bv_val was pointed at the local cbuf
			 * above; unless slap_get_commit_csn repoints it, sctxcsn
			 * refers to this stack buffer and is only usable while
			 * this function is running - confirm no later reader */
			opc->sctxcsn = maxcsn;
		}

		/* Handle any persistent searches */
		ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
		have_psearches = ( si->si_ops != NULL );
		ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
		if ( have_psearches ) {
			switch(op->o_tag) {
			case LDAP_REQ_ADD:
			case LDAP_REQ_MODIFY:
			case LDAP_REQ_MODRDN:
			case LDAP_REQ_EXTENDED:
				syncprov_matchops( op, opc, 0 );
				break;
			case LDAP_REQ_DELETE:
				/* for each match in opc->smatches:
				 *   send DELETE msg
				 */
				for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
					if ( sm->sm_op->s_op->o_abandon )
						continue;
					syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
				}
				if ( opc->ssres.s_info )
					free_resinfo( &opc->ssres );
				break;
			}
		}

		/* Add any log records */
		if ( si->si_logs ) {
			syncprov_add_slog( op );
		}
leave:		ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex );
	}
	return SLAP_CB_CONTINUE;
}

/* We don't use a subentry to store the context CSN any more.
 * We expose the current context CSN as an operational attribute
 * of the suffix entry.
 */
/*
 * Compare handler: answers compares of contextCSN on the context entry
 * (si_contextdn) from the in-memory si_ctxcsn values; every other compare
 * is left alone (SLAP_CB_CONTINUE).
 */
static int
syncprov_op_compare( Operation *op, SlapReply *rs )
{
	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
	syncprov_info_t		*si = on->on_bi.bi_private;
	int rc = SLAP_CB_CONTINUE;

	if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) &&
		op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN )
	{
		/* Transient entry with a single contextCSN attribute that
		 * borrows (does not copy) the si_ctxcsn array */
		Entry e = {0};
		Attribute a = {0};

		e.e_name = si->si_contextdn;
		e.e_nname = si->si_contextdn;
		e.e_attrs = &a;

		a.a_desc = slap_schema.si_ad_contextCSN;

		/* Read lock covers every use of the borrowed values below */
		ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );

		a.a_vals = si->si_ctxcsn;
		a.a_nvals = a.a_vals;
		a.a_numvals = si->si_numcsns;

		rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc,
			&op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL );
		if ( !
rs->sr_err ) {
			/* access_allowed returned 0: compare access denied */
			rs->sr_err = LDAP_INSUFFICIENT_ACCESS;
			goto return_results;
		}

		if ( get_assert( op ) &&
			( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) )
		{
			rs->sr_err = LDAP_ASSERTION_FAILED;
			goto return_results;
		}


		rs->sr_err = LDAP_COMPARE_FALSE;

		if ( attr_valfind( &a,
			SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH |
				SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH,
				&op->oq_compare.rs_ava->aa_value, NULL, op->o_tmpmemctx ) == 0 )
		{
			rs->sr_err = LDAP_COMPARE_TRUE;
		}

return_results:;

		ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );

		send_ldap_result( op, rs );

		/* The compare verdict has been sent; report plain success to the
		 * caller. Other error codes are returned unchanged. */
		if( rs->sr_err == LDAP_COMPARE_FALSE || rs->sr_err == LDAP_COMPARE_TRUE ) {
			rs->sr_err = LDAP_SUCCESS;
		}
		rc = rs->sr_err;
	}

	return rc;
}

/*
 * Set up syncprov's per-operation state for a write operation.
 *
 * Allocates one block holding a slap_callback, an opcookie and (only when
 * persistent searches are registered) a modinst, and pushes the callback
 * on op->o_callback with syncprov_op_response / syncprov_op_cleanup.
 * si_active is incremented under si_ops_mutex.
 *
 * When persistent searches exist, writers to the same normalized DN are
 * serialized through a modtarget kept in the si_mods AVL tree: the op is
 * appended to the target's queue and waits until it reaches the head
 * (or until an op from the same thread is found in the queue).
 *
 * Returns SLAP_CB_CONTINUE normally, SLAPD_ABANDON when slapd is shutting
 * down or the op is abandoned while waiting.
 *
 * NOTE(review): neither SLAPD_ABANDON return decrements si_active here.
 * On the o_abandon path the callback (and with it syncprov_op_cleanup) is
 * unlinked and freed first. Confirm where the increment is balanced,
 * since syncprov_op_search waits for si_active to reach zero.
 */
static int
syncprov_op_mod( Operation *op, SlapReply *rs )
{
	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
	syncprov_info_t		*si = on->on_bi.bi_private;
	slap_callback *cb;
	opcookie *opc;
	int have_psearches, cbsize;

	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
	have_psearches = ( si->si_ops != NULL );
	si->si_active++;
	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );

	/* Single allocation laid out as: slap_callback | opcookie | [modinst] */
	cbsize = sizeof(slap_callback) + sizeof(opcookie) +
		(have_psearches ? sizeof(modinst) : 0 );

	cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx);
	opc = (opcookie *)(cb+1);
	opc->son = on;
	cb->sc_response = syncprov_op_response;
	cb->sc_cleanup = syncprov_op_cleanup;
	cb->sc_private = opc;
	cb->sc_next = op->o_callback;
	op->o_callback = cb;

	/* -1 = unknown; filled from the op's CSN and sync cookie if present */
	opc->osid = -1;
	opc->rsid = -1;
	if ( op->o_csn.bv_val ) {
		opc->osid = slap_parse_csn_sid( &op->o_csn );
	}
	if ( op->o_controls ) {
		struct sync_cookie *scook =
		op->o_controls[slap_cids.sc_LDAPsync];
		if ( scook )
			opc->rsid = scook->sid;
	}

	if ( op->o_dont_replicate )
		return SLAP_CB_CONTINUE;

	/* If there are active persistent searches, lock this operation.
	 * See seqmod.c for the locking logic on its own.
	 */
	if ( have_psearches ) {
		modtarget *mt, mtdummy;
		modinst *mi;

		/* The modinst lives right after the opcookie in the cb block */
		mi = (modinst *)(opc+1);
		mi->mi_op = op;

		/* See if we're already modifying this entry... */
		mtdummy.mt_dn = op->o_req_ndn;
retry:
		ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
		mt = ldap_avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
		if ( mt ) {
			ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
			if ( mt->mt_mods == NULL ) {
				/* Cannot reuse this mt, as another thread is about
				 * to release it in syncprov_op_cleanup. Wait for them
				 * to finish; our own insert is required to succeed.
				 */
				ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
				ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
				ldap_pvt_thread_yield();
				goto retry;
			}
		}
		if ( mt ) {
			/* mt->mt_mutex is still held from the lookup above; append
			 * ourselves, then give up the tree lock */
			mt->mt_tail->mi_next = mi;
			mt->mt_tail = mi;
			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
			/* wait for this op to get to head of list */
			while ( mt->mt_mods != mi ) {
				modinst *m2;
				/* don't wait on other mods from the same thread */
				for ( m2 = mt->mt_mods; m2; m2 = m2->mi_next ) {
					if ( m2->mi_op->o_threadctx == op->o_threadctx ) {
						break;
					}
				}
				if ( m2 )
					break;

				ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
				/* FIXME: if dynamic config can delete overlays or
				 * databases we'll have to check for cleanup here.
				 * Currently it's not an issue because there are
				 * no dynamic config deletes...
				 */
				if ( slapd_shutdown )
					return SLAPD_ABANDON;

				if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
					ldap_pvt_thread_yield();
				ldap_pvt_thread_mutex_lock( &mt->mt_mutex );

				/* clean up if the caller is giving up */
				if ( op->o_abandon ) {
					modinst **m2;
					slap_callback **sc;
					/* Unlink mi from the queue. When mi was the tail, the
					 * new tail is the node whose mi_next field m2 points
					 * at; the cast is valid because mi_next is the first
					 * member of modinst. */
					for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) {
						if ( *m2 == mi ) {
							*m2 = mi->mi_next;
							if ( mt->mt_tail == mi )
								mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2;
							break;
						}
					}
					/* Unlink our callback from the op before freeing it */
					for (sc = &op->o_callback; ; sc = &(*sc)->sc_next) {
						if ( *sc == cb ) {
							*sc = cb->sc_next;
							break;
						}
					}
					op->o_tmpfree( cb, op->o_tmpmemctx );
					ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
					return SLAPD_ABANDON;
				}
			}
			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
		} else {
			/* Record that we're modifying this entry now */
			mt = ch_malloc( sizeof(modtarget) );
			mt->mt_mods = mi;
			mt->mt_tail = mi;
			ber_dupbv( &mt->mt_dn, &mi->mi_op->o_req_ndn );
			ldap_pvt_thread_mutex_init( &mt->mt_mutex );
			ldap_avl_insert( &si->si_mods, mt, sp_avl_cmp, ldap_avl_dup_error );
			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
		}
		opc->smt = mt;
	}

	/* Pre-operation matching pass (third argument 1); the response
	 * callback calls syncprov_matchops again with 0. Not done for ADD. */
	if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
		syncprov_matchops( op, opc, 1 );

	return SLAP_CB_CONTINUE;
}

/*
 * Extended-operation hook: exops that exop_is_write() reports as writes
 * get the same setup as other write operations; all others pass through.
 */
static int
syncprov_op_extended( Operation *op, SlapReply *rs )
{
	if ( exop_is_write( op ))
		return syncprov_op_mod( op, rs );

	return SLAP_CB_CONTINUE;
}

/* Per-search state handed to syncprov_search_response via sc_private */
typedef struct searchstate {
	slap_overinst *ss_on;	/* the overlay instance */
	syncops *ss_so;		/* psearch record; NULL when not a persistent search */
	BerVarray ss_ctxcsn;	/* snapshot of the contextCSN values */
	int *ss_sids;		/* SIDs, indexed in parallel with ss_ctxcsn */
	int ss_numcsns;		/* number of entries in ss_ctxcsn / ss_sids */
#define	SS_PRESENT	0x01
#define	SS_CHANGED	0x02
	int ss_flags;		/* SS_* bits */
} searchstate;

/*
 * Backing store for a detached psearch operation. syncprov_detach_op
 * allocates it with offsetof(sob_extra) plus room for the optional
 * attribute list and the copied DN / filter strings, which are laid out
 * starting at sob_extra.
 */
typedef struct SyncOperationBuffer {
	Operation		sob_op;
	Opheader		sob_hdr;
	OpExtra			sob_oe;
	AttributeName	sob_extra;	/* not always present */
	/* Further data allocated here */
} SyncOperationBuffer;

/*
 * Replace the frontend-owned search Operation of a persistent search with
 * a heap copy (stored in so->s_op) holding only the fields the psearch
 * needs, link the copy into the connection's c_ops list, and mark the
 * original op abandoned.
 */
static void
syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
{
	SyncOperationBuffer *sopbuf2;
	Operation *op2;
	int i, alen = 0;
	size_t size;
	char *ptr;
	GroupAssertion *g1, *g2;

	/* count the search attrs */
	for (i=0; op->ors_attrs && !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++)
{
		/* +1 for each name's terminating NUL */
		alen += op->ors_attrs[i].an_name.bv_len + 1;
	}
	/* Make a new copy of the operation */
	/* Fixed part up to sob_extra, then (if any attrs were requested) the
	 * AttributeName array with its terminator plus the name strings, then
	 * the NUL-terminated req_dn, req_ndn, authz ndn and filter string */
	size = offsetof( SyncOperationBuffer, sob_extra ) +
		(i ? ( (i+1) * sizeof(AttributeName) + alen) : 0) +
		op->o_req_dn.bv_len + 1 +
		op->o_req_ndn.bv_len + 1 +
		op->o_ndn.bv_len + 1 +
		so->s_filterstr.bv_len + 1;
	sopbuf2 = ch_calloc( 1, size );
	op2 = &sopbuf2->sob_op;
	op2->o_hdr = &sopbuf2->sob_hdr;
	LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe;

	/* Copy the fields we care about explicitly, leave the rest alone */
	*op2->o_hdr = *op->o_hdr;
	op2->o_tag = op->o_tag;
	op2->o_time = op->o_time;
	op2->o_bd = on->on_info->oi_origdb;
	op2->o_request = op->o_request;
	op2->o_managedsait = op->o_managedsait;
	LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on;
	LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL;

	/* ptr walks the variable-length tail of the buffer */
	ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra );
	if ( i ) {
		op2->ors_attrs = (AttributeName *) ptr;
		/* name strings start right after the i+1 array slots */
		ptr = (char *) &op2->ors_attrs[i+1];
		for (i=0; !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) {
			op2->ors_attrs[i] = op->ors_attrs[i];
			op2->ors_attrs[i].an_name.bv_val = ptr;
			ptr = lutil_strcopy( ptr, op->ors_attrs[i].an_name.bv_val ) + 1;
		}
		BER_BVZERO( &op2->ors_attrs[i].an_name );
	}

	op2->o_authz = op->o_authz;
	op2->o_ndn.bv_val = ptr;
	ptr = lutil_strcopy(ptr, op->o_ndn.bv_val) + 1;
	/* the detached op uses the normalized DN for o_dn as well */
	op2->o_dn = op2->o_ndn;
	op2->o_req_dn.bv_len = op->o_req_dn.bv_len;
	op2->o_req_dn.bv_val = ptr;
	ptr = lutil_strcopy(ptr, op->o_req_dn.bv_val) + 1;
	op2->o_req_ndn.bv_len = op->o_req_ndn.bv_len;
	op2->o_req_ndn.bv_val = ptr;
	ptr = lutil_strcopy(ptr, op->o_req_ndn.bv_val) + 1;
	op2->ors_filterstr.bv_val = ptr;
	strcpy( ptr, so->s_filterstr.bv_val );
	op2->ors_filterstr.bv_len = so->s_filterstr.bv_len;

	/* Skip the AND/GE clause that we stuck on in front */
	if ( so->s_flags & PS_FIX_FILTER ) {
		op2->ors_filter = op->ors_filter->f_and->f_next;
		/* flag is known to be set here, so XOR clears it */
		so->s_flags ^= PS_FIX_FILTER;
	} else {
		op2->ors_filter = op->ors_filter;
	}
	op2->ors_filter = filter_dup( op2->ors_filter, NULL );
	so->s_op = op2;

	/* Copy any cached group ACLs individually */
	op2->o_groups = NULL;
	for ( g1=op->o_groups; g1; g1=g1->ga_next ) {
		g2 = ch_malloc( sizeof(GroupAssertion) + g1->ga_len );
		*g2 = *g1;
		strcpy( g2->ga_ndn, g1->ga_ndn );
		/* prepend: the copied list ends up in reverse order */
		g2->ga_next = op2->o_groups;
		op2->o_groups = g2;
	}
	/* Don't allow any further group caching */
	op2->o_do_not_cache = 1;

	/* Add op2 to conn so abandon will find us */
	op->o_conn->c_n_ops_executing++;
	op->o_conn->c_n_ops_completed--;
	LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next );
	so->s_flags |= PS_IS_DETACHED;

	/* Prevent anyone else from trying to send a result for this op */
	op->o_abandon = 1;
}

/*
 * Response callback for sync searches; sc_private is a searchstate.
 *
 * Entries / references: entries whose entryCSN is newer than the
 * snapshotted contextCSN (refresh-only searches) or not newer than the
 * consumer's cookie are suppressed by returning LDAP_SUCCESS; everything
 * else gets a sync state control attached (with a cookie when both
 * si_nopres and si_usehint are set and an entryCSN was found).
 *
 * Final result with LDAP_SUCCESS: a refresh-only search gets a sync done
 * control; a refreshAndPersist search gets a refresh-done info message,
 * is detached from the frontend and has any queued responses started.
 *
 * Returns SLAP_CB_CONTINUE unless noted above; SLAPD_ABANDON if the op
 * was abandoned before it could be detached.
 */
static int
syncprov_search_response( Operation *op, SlapReply *rs )
{
	searchstate *ss = op->o_callback->sc_private;
	slap_overinst *on = ss->ss_on;
	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
	sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync];

	if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) {
		Attribute *a;
		/* If we got a referral without a referral object, there's
		 * something missing that we cannot replicate. Just ignore it.
		 * The consumer will abort because we didn't send the expected
		 * control.
		 */
		if ( !rs->sr_entry ) {
			assert( rs->sr_entry != NULL );
			Debug( LDAP_DEBUG_ANY, "%s syncprov_search_response: "
				"bogus referral in context\n", op->o_log_prefix );
			return SLAP_CB_CONTINUE;
		}
		/* entryCSN may be stored on the entry or supplied separately
		 * among the operational attributes */
		a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN );
		if ( a == NULL && rs->sr_operational_attrs != NULL ) {
			a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN );
		}
		if ( a ) {
			int i, sid;
			sid = slap_parse_csn_sid( &a->a_nvals[0] );

			/* If not a persistent search */
			if ( !ss->ss_so ) {
				/* Make sure entry is less than the snapshot'd contextCSN */
				for ( i=0; i<ss->ss_numcsns; i++ ) {
					if ( sid == ss->ss_sids[i] && ber_bvcmp( &a->a_nvals[0],
						&ss->ss_ctxcsn[i] ) > 0 ) {
						Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: "
							"Entry %s CSN %s greater than snapshot %s\n",
							op->o_log_prefix,
							rs->sr_entry->e_name.bv_val,
							a->a_nvals[0].bv_val,
							ss->ss_ctxcsn[i].bv_val );
						/* returning LDAP_SUCCESS instead of
						 * SLAP_CB_CONTINUE drops this entry */
						return LDAP_SUCCESS;
					}
				}
			}

			/* Don't send old entries twice */
			if ( srs->sr_state.ctxcsn ) {
				for ( i=0; i<srs->sr_state.numcsns; i++ ) {
					if ( sid == srs->sr_state.sids[i] &&
						ber_bvcmp( &a->a_nvals[0],
							&srs->sr_state.ctxcsn[i] )<= 0 ) {
						Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: "
							"Entry %s CSN %s older or equal to ctx %s\n",
							op->o_log_prefix,
							rs->sr_entry->e_name.bv_val,
							a->a_nvals[0].bv_val,
							srs->sr_state.ctxcsn[i].bv_val );
						return LDAP_SUCCESS;
					}
				}
			}
		}
		/* Two slots: [0] is passed to syncprov_state_ctrl to fill in,
		 * [1] terminates the list */
		rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
			op->o_tmpmemctx );
		rs->sr_ctrls[1] = NULL;
		rs->sr_flags |= REP_CTRLS_MUSTBEFREED;
		/* If we're in delta-sync mode, always send a cookie */
		if ( si->si_nopres && si->si_usehint && a ) {
			struct berval cookie;
			slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid,
				slap_serverID ? slap_serverID : -1, NULL );
			rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
				LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie );
			op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
		} else {
			rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
				LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL );
		}
	} else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) {
		struct berval cookie = BER_BVNULL;

		/* A cookie is only composed when something changed and we have
		 * at least one contextCSN; otherwise it stays BER_BVNULL */
		if ( ( ss->ss_flags & SS_CHANGED ) &&
			ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) {
			slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn,
				srs->sr_state.rid,
				slap_serverID ? slap_serverID : -1, NULL );

			Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: cookie=%s\n",
				op->o_log_prefix, cookie.bv_val );
		}

		/* Is this a regular refresh?
		 * Note: refresh never gets here if there were no changes
		 */
		if ( !ss->ss_so ) {
			rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
				op->o_tmpmemctx );
			rs->sr_ctrls[1] = NULL;
			rs->sr_flags |= REP_CTRLS_MUSTBEFREED;
			rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
				0, 1, &cookie, ( ss->ss_flags & SS_PRESENT ) ?  LDAP_SYNC_REFRESH_PRESENTS :
					LDAP_SYNC_REFRESH_DELETES );
			op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
		} else {
			/* It's RefreshAndPersist, transition to Persist phase */
			syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ?
				LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
				( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL,
				1, NULL, 0 );
			if ( !BER_BVISNULL( &cookie ))
				op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );

			/* Detach this Op from frontend control */
			ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );

			/* But not if this connection was closed along the way */
			if ( op->o_abandon ) {
				ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
				/* syncprov_ab_cleanup will free this syncop */
				return SLAPD_ABANDON;

			} else {
				/* s_mutex is taken while c_mutex is held and kept past
				 * the c_mutex release, until the queue has been started */
				ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
				/* Turn off the refreshing flag */
				ss->ss_so->s_flags ^= PS_IS_REFRESHING;

				Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: "
					"detaching op\n", op->o_log_prefix );
				syncprov_detach_op( op, ss->ss_so, on );

				ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );

				/* If there are queued responses, fire them off */
				if ( ss->ss_so->s_res )
					syncprov_qstart( ss->ss_so );
				ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
			}

			return LDAP_SUCCESS;
		}
	}

	return SLAP_CB_CONTINUE;
}

static int
syncprov_op_search( Operation *op, SlapReply *rs )
{
	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
	slap_callback	*cb;
	int gotstate = 0, changed = 0, do_present = 0;
	syncops *sop = NULL;
	searchstate *ss;
	sync_control *srs;
	BerVarray ctxcsn;
	int i, *sids, numcsns;
	struct berval mincsn, maxcsn;
	int minsid, maxsid;
	int dirty = 0;

	/* Searches without the sync control's refresh bit are not ours */
	if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;

	if ( op->ors_deref & LDAP_DEREF_SEARCHING ) {
		send_ldap_error( op, rs, LDAP_PROTOCOL_ERROR, "illegal value for derefAliases" );
		return rs->sr_err;
	}

	srs = op->o_controls[slap_cids.sc_LDAPsync];
	Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: "
		"got a 
%ssearch with a cookie=%s\n", 3074 op->o_log_prefix, 3075 op->o_sync_mode & SLAP_SYNC_PERSIST ? "persistent ": "", 3076 srs->sr_state.octet_str.bv_val ); 3077 3078 /* If this is a persistent search, set it up right away */ 3079 if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) { 3080 syncops so = {0}; 3081 fbase_cookie fc; 3082 opcookie opc; 3083 slap_callback sc = {0}; 3084 3085 fc.fss = &so; 3086 fc.fbase = 0; 3087 so.s_eid = NOID; 3088 so.s_op = op; 3089 so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE; 3090 /* syncprov_findbase expects to be called as a callback... */ 3091 sc.sc_private = &opc; 3092 opc.son = on; 3093 ldap_pvt_thread_mutex_init( &so.s_mutex ); 3094 cb = op->o_callback; 3095 op->o_callback = ≻ 3096 rs->sr_err = syncprov_findbase( op, &fc ); 3097 op->o_callback = cb; 3098 ldap_pvt_thread_mutex_destroy( &so.s_mutex ); 3099 3100 if ( rs->sr_err != LDAP_SUCCESS ) { 3101 send_ldap_result( op, rs ); 3102 return rs->sr_err; 3103 } 3104 sop = ch_malloc( sizeof( syncops )); 3105 *sop = so; 3106 sop->s_rid = srs->sr_state.rid; 3107 sop->s_sid = srs->sr_state.sid; 3108 /* set refcount=2 to prevent being freed out from under us 3109 * by abandons that occur while we're running here 3110 */ 3111 sop->s_inuse = 2; 3112 3113 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 3114 while ( si->si_active ) { 3115 /* Wait for active mods to finish before proceeding, as they 3116 * may already have inspected the si_ops list looking for 3117 * consumers to replicate the change to. Using the log 3118 * doesn't help, as we may finish playing it before the 3119 * active mods gets added to it. 
3120 */ 3121 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3122 if ( slapd_shutdown ) { 3123 ch_free( sop ); 3124 return SLAPD_ABANDON; 3125 } 3126 if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool )) 3127 ldap_pvt_thread_yield(); 3128 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 3129 } 3130 if ( op->o_abandon ) { 3131 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3132 ch_free( sop ); 3133 return SLAPD_ABANDON; 3134 } 3135 ldap_pvt_thread_mutex_init( &sop->s_mutex ); 3136 sop->s_next = si->si_ops; 3137 sop->s_si = si; 3138 si->si_ops = sop; 3139 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3140 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3141 "registered persistent search\n", op->o_log_prefix ); 3142 } 3143 3144 /* snapshot the ctxcsn 3145 * Note: this must not be done before the psearch setup. (ITS#8365) 3146 */ 3147 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 3148 numcsns = si->si_numcsns; 3149 if ( numcsns ) { 3150 ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx ); 3151 sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx ); 3152 for ( i=0; i<numcsns; i++ ) 3153 sids[i] = si->si_sids[i]; 3154 } else { 3155 ctxcsn = NULL; 3156 sids = NULL; 3157 } 3158 dirty = si->si_dirty; 3159 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 3160 3161 /* If we have a cookie, handle the PRESENT lookups */ 3162 if ( srs->sr_state.ctxcsn ) { 3163 sessionlog *sl; 3164 int i, j; 3165 3166 /* If we don't have any CSN of our own yet, bail out. 
3167 */ 3168 if ( !numcsns ) { 3169 rs->sr_err = LDAP_UNWILLING_TO_PERFORM; 3170 rs->sr_text = "consumer has state info but provider doesn't!"; 3171 goto bailout; 3172 } 3173 3174 if ( !si->si_nopres ) 3175 do_present = SS_PRESENT; 3176 3177 /* If there are SIDs we don't recognize in the cookie, drop them */ 3178 for (i=0; i<srs->sr_state.numcsns; ) { 3179 for (j=i; j<numcsns; j++) { 3180 if ( srs->sr_state.sids[i] <= sids[j] ) { 3181 break; 3182 } 3183 } 3184 /* not found */ 3185 if ( j == numcsns || srs->sr_state.sids[i] != sids[j] ) { 3186 char *tmp = srs->sr_state.ctxcsn[i].bv_val; 3187 srs->sr_state.numcsns--; 3188 for ( j=i; j<srs->sr_state.numcsns; j++ ) { 3189 srs->sr_state.ctxcsn[j] = srs->sr_state.ctxcsn[j+1]; 3190 srs->sr_state.sids[j] = srs->sr_state.sids[j+1]; 3191 } 3192 srs->sr_state.ctxcsn[j].bv_val = tmp; 3193 srs->sr_state.ctxcsn[j].bv_len = 0; 3194 continue; 3195 } 3196 i++; 3197 } 3198 3199 if (srs->sr_state.numcsns != numcsns) { 3200 /* consumer doesn't have the right number of CSNs */ 3201 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3202 "consumer cookie is missing a csn we track\n", 3203 op->o_log_prefix ); 3204 changed = SS_CHANGED; 3205 if ( srs->sr_state.ctxcsn ) { 3206 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx ); 3207 srs->sr_state.ctxcsn = NULL; 3208 } 3209 if ( srs->sr_state.sids ) { 3210 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx ); 3211 srs->sr_state.sids = NULL; 3212 } 3213 srs->sr_state.numcsns = 0; 3214 goto shortcut; 3215 } 3216 3217 /* Find the smallest CSN which differs from contextCSN */ 3218 mincsn.bv_len = 0; 3219 maxcsn.bv_len = 0; 3220 for ( i=0,j=0; i<srs->sr_state.numcsns; i++ ) { 3221 int newer; 3222 while ( srs->sr_state.sids[i] != sids[j] ) j++; 3223 if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn, 3224 &srs->sr_state.ctxcsn[i] ) < 0 ) { 3225 maxcsn = srs->sr_state.ctxcsn[i]; 3226 maxsid = sids[j]; 3227 } 3228 newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ); 3229 /* If our 
state is newer, tell consumer about changes */ 3230 if ( newer < 0) { 3231 changed = SS_CHANGED; 3232 if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn, 3233 &srs->sr_state.ctxcsn[i] ) > 0 ) { 3234 mincsn = srs->sr_state.ctxcsn[i]; 3235 minsid = sids[j]; 3236 } 3237 } else if ( newer > 0 && sids[j] == slap_serverID ) { 3238 /* our state is older, complain to consumer */ 3239 rs->sr_err = LDAP_UNWILLING_TO_PERFORM; 3240 rs->sr_text = "consumer state is newer than provider!"; 3241 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3242 "consumer %d state %s is newer than provider %d state %s\n", 3243 op->o_log_prefix, sids[i], srs->sr_state.ctxcsn[i].bv_val, 3244 sids[j], /* == slap_serverID */ 3245 ctxcsn[j].bv_val); 3246 bailout: 3247 if ( sop ) { 3248 syncops **sp = &si->si_ops; 3249 3250 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 3251 while ( *sp != sop ) 3252 sp = &(*sp)->s_next; 3253 *sp = sop->s_next; 3254 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3255 ch_free( sop ); 3256 } 3257 rs->sr_ctrls = NULL; 3258 send_ldap_result( op, rs ); 3259 return rs->sr_err; 3260 } 3261 } 3262 if ( BER_BVISEMPTY( &mincsn )) { 3263 mincsn = maxcsn; 3264 minsid = maxsid; 3265 } 3266 3267 /* If nothing has changed, shortcut it */ 3268 if ( !changed && !dirty ) { 3269 do_present = 0; 3270 no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { 3271 LDAPControl *ctrls[2]; 3272 3273 ctrls[0] = NULL; 3274 ctrls[1] = NULL; 3275 syncprov_done_ctrl( op, rs, ctrls, 0, 0, 3276 NULL, LDAP_SYNC_REFRESH_DELETES ); 3277 rs->sr_ctrls = ctrls; 3278 rs->sr_err = LDAP_SUCCESS; 3279 send_ldap_result( op, rs ); 3280 rs->sr_ctrls = NULL; 3281 return rs->sr_err; 3282 } 3283 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3284 "no change, skipping log replay\n", 3285 op->o_log_prefix ); 3286 goto shortcut; 3287 } 3288 3289 if ( !BER_BVISNULL( &si->si_logbase ) ) { 3290 do_present = 0; 3291 if ( syncprov_play_accesslog( op, rs, srs, ctxcsn, 3292 numcsns, sids, &mincsn, minsid ) ) { 
3293 do_present = SS_PRESENT; 3294 } 3295 } else if ( si->si_logs ) { 3296 do_present = 0; 3297 if ( syncprov_play_sessionlog( op, rs, srs, ctxcsn, 3298 numcsns, sids, &mincsn, minsid ) ) { 3299 do_present = SS_PRESENT; 3300 } 3301 } 3302 /* 3303 * If sessionlog wasn't useful, see if we can find at least one entry 3304 * that hasn't changed based on the cookie. 3305 * 3306 * TODO: Using mincsn only (rather than the whole cookie) will 3307 * under-approximate the set of entries that haven't changed, but we 3308 * can't look up CSNs by serverid with the current indexing support. 3309 * 3310 * As a result, dormant serverids in the cluster become mincsns and 3311 * more likely to make syncprov_findcsn(,FIND_CSN,) fail -> triggering 3312 * an expensive refresh... 3313 */ 3314 if ( !do_present ) { 3315 gotstate = 1; 3316 } else if ( syncprov_findcsn( op, FIND_CSN, &mincsn ) != LDAP_SUCCESS ) { 3317 /* No, so a reload is required */ 3318 /* the 2.2 consumer doesn't send this hint */ 3319 if ( si->si_usehint && srs->sr_rhint == 0 ) { 3320 if ( ctxcsn ) 3321 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 3322 if ( sids ) 3323 op->o_tmpfree( sids, op->o_tmpmemctx ); 3324 rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED; 3325 rs->sr_text = "sync cookie is stale"; 3326 goto bailout; 3327 } 3328 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3329 "failed to find entry with csn=%s, ignoring cookie\n", 3330 op->o_log_prefix, mincsn.bv_val ); 3331 if ( srs->sr_state.ctxcsn ) { 3332 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx ); 3333 srs->sr_state.ctxcsn = NULL; 3334 } 3335 if ( srs->sr_state.sids ) { 3336 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx ); 3337 srs->sr_state.sids = NULL; 3338 } 3339 srs->sr_state.numcsns = 0; 3340 } else { 3341 gotstate = 1; 3342 /* If changed and doing Present lookup, send Present UUIDs */ 3343 if ( syncprov_findcsn( op, FIND_PRESENT, 0 ) != LDAP_SUCCESS ) { 3344 if ( ctxcsn ) 3345 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 3346 if ( 
sids ) 3347 op->o_tmpfree( sids, op->o_tmpmemctx ); 3348 goto bailout; 3349 } 3350 } 3351 } else { 3352 /* The consumer knows nothing, we know nothing. OK. */ 3353 if (!numcsns) 3354 goto no_change; 3355 /* No consumer state, assume something has changed */ 3356 changed = SS_CHANGED; 3357 } 3358 3359 shortcut: 3360 /* Append CSN range to search filter, save original filter 3361 * for persistent search evaluation 3362 */ 3363 if ( sop ) { 3364 ldap_pvt_thread_mutex_lock( &sop->s_mutex ); 3365 sop->s_filterstr = op->ors_filterstr; 3366 /* correct the refcount that was set to 2 before */ 3367 sop->s_inuse--; 3368 } 3369 3370 /* If something changed, find the changes */ 3371 if ( gotstate && ( changed || dirty ) ) { 3372 Filter *fand, *fava; 3373 3374 fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 3375 fand->f_choice = LDAP_FILTER_AND; 3376 fand->f_next = NULL; 3377 fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 3378 fand->f_and = fava; 3379 fava->f_choice = LDAP_FILTER_GE; 3380 fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx ); 3381 fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN; 3382 #ifdef LDAP_COMP_MATCH 3383 fava->f_ava->aa_cf = NULL; 3384 #endif 3385 ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx ); 3386 fava->f_next = op->ors_filter; 3387 op->ors_filter = fand; 3388 filter2bv_x( op, op->ors_filter, &op->ors_filterstr ); 3389 if ( sop ) { 3390 sop->s_flags |= PS_FIX_FILTER; 3391 } 3392 } 3393 if ( sop ) { 3394 ldap_pvt_thread_mutex_unlock( &sop->s_mutex ); 3395 } 3396 3397 /* Let our callback add needed info to returned entries */ 3398 cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx); 3399 ss = (searchstate *)(cb+1); 3400 ss->ss_on = on; 3401 ss->ss_so = sop; 3402 ss->ss_flags = do_present | changed; 3403 ss->ss_ctxcsn = ctxcsn; 3404 ss->ss_numcsns = numcsns; 3405 ss->ss_sids = sids; 3406 cb->sc_response = syncprov_search_response; 3407 cb->sc_private = ss; 3408 
cb->sc_next = op->o_callback; 3409 op->o_callback = cb; 3410 3411 /* If this is a persistent search and no changes were reported during 3412 * the refresh phase, just invoke the response callback to transition 3413 * us into persist phase 3414 */ 3415 if ( !changed && !dirty ) { 3416 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3417 "nothing changed, finishing up initial search early\n", 3418 op->o_log_prefix ); 3419 rs->sr_err = LDAP_SUCCESS; 3420 rs->sr_nentries = 0; 3421 send_ldap_result( op, rs ); 3422 return rs->sr_err; 3423 } 3424 return SLAP_CB_CONTINUE; 3425 } 3426 3427 static int 3428 syncprov_operational( 3429 Operation *op, 3430 SlapReply *rs ) 3431 { 3432 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 3433 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3434 3435 /* This prevents generating unnecessarily; frontend will strip 3436 * any statically stored copy. 3437 */ 3438 if ( op->o_sync != SLAP_CONTROL_NONE ) 3439 return SLAP_CB_CONTINUE; 3440 3441 if ( rs->sr_entry && 3442 dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) { 3443 3444 if ( SLAP_OPATTRS( rs->sr_attr_flags ) || 3445 ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) { 3446 Attribute *a, **ap = NULL; 3447 3448 for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) { 3449 if ( a->a_desc == slap_schema.si_ad_contextCSN ) 3450 break; 3451 } 3452 3453 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 3454 if ( si->si_ctxcsn ) { 3455 if ( !a ) { 3456 for ( ap = &rs->sr_operational_attrs; *ap; 3457 ap=&(*ap)->a_next ); 3458 3459 a = attr_alloc( slap_schema.si_ad_contextCSN ); 3460 *ap = a; 3461 } 3462 3463 if ( !ap ) { 3464 if ( rs_entry2modifiable( op, rs, on )) { 3465 a = attr_find( rs->sr_entry->e_attrs, 3466 slap_schema.si_ad_contextCSN ); 3467 } 3468 if ( a->a_nvals != a->a_vals ) { 3469 ber_bvarray_free( a->a_nvals ); 3470 } 3471 a->a_nvals = NULL; 3472 ber_bvarray_free( a->a_vals ); 3473 a->a_vals = NULL; 3474 a->a_numvals = 0; 3475 } 3476 attr_valadd( a, 
si->si_ctxcsn, si->si_ctxcsn, si->si_numcsns ); 3477 } 3478 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 3479 } 3480 } 3481 return SLAP_CB_CONTINUE; 3482 } 3483 3484 static int 3485 syncprov_setup_accesslog(void) 3486 { 3487 const char *text; 3488 int rc = -1; 3489 3490 if ( !ad_reqType ) { 3491 if ( slap_str2ad( "reqType", &ad_reqType, &text ) ) { 3492 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3493 "couldn't get definition for attribute reqType, " 3494 "is accessslog configured?\n" ); 3495 return rc; 3496 } 3497 } 3498 3499 if ( !ad_reqResult ) { 3500 if ( slap_str2ad( "reqResult", &ad_reqResult, &text ) ) { 3501 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3502 "couldn't get definition for attribute reqResult, " 3503 "is accessslog configured?\n" ); 3504 return rc; 3505 } 3506 } 3507 3508 if ( !ad_reqDN ) { 3509 if ( slap_str2ad( "reqDN", &ad_reqDN, &text ) ) { 3510 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3511 "couldn't get definition for attribute reqDN, " 3512 "is accessslog configured?\n" ); 3513 return rc; 3514 } 3515 } 3516 3517 if ( !ad_reqEntryUUID ) { 3518 if ( slap_str2ad( "reqEntryUUID", &ad_reqEntryUUID, &text ) ) { 3519 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3520 "couldn't get definition for attribute reqEntryUUID, " 3521 "is accessslog configured?\n" ); 3522 return rc; 3523 } 3524 } 3525 3526 if ( !ad_reqNewDN ) { 3527 if ( slap_str2ad( "reqNewDN", &ad_reqNewDN, &text ) ) { 3528 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3529 "couldn't get definition for attribute reqNewDN, " 3530 "is accessslog configured?\n" ); 3531 return rc; 3532 } 3533 } 3534 3535 if ( !ad_minCSN ) { 3536 if ( slap_str2ad( "minCSN", &ad_minCSN, &text ) ) { 3537 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3538 "couldn't get definition for attribute minCSN, " 3539 "is accessslog configured?\n" ); 3540 return rc; 3541 } 3542 } 3543 3544 return LDAP_SUCCESS; 3545 } 3546 3547 enum { 3548 SP_CHKPT = 1, 3549 SP_SESSL, 
3550 SP_NOPRES, 3551 SP_USEHINT, 3552 SP_LOGDB 3553 }; 3554 3555 static ConfigDriver sp_cf_gen; 3556 3557 static ConfigTable spcfg[] = { 3558 { "syncprov-checkpoint", "ops> <minutes", 3, 3, 0, ARG_MAGIC|SP_CHKPT, 3559 sp_cf_gen, "( OLcfgOvAt:1.1 NAME 'olcSpCheckpoint' " 3560 "DESC 'ContextCSN checkpoint interval in ops and minutes' " 3561 "EQUALITY caseIgnoreMatch " 3562 "SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL }, 3563 { "syncprov-sessionlog", "ops", 2, 2, 0, ARG_INT|ARG_MAGIC|SP_SESSL, 3564 sp_cf_gen, "( OLcfgOvAt:1.2 NAME 'olcSpSessionlog' " 3565 "DESC 'Session log size in ops' " 3566 "EQUALITY integerMatch " 3567 "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL }, 3568 { "syncprov-nopresent", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_NOPRES, 3569 sp_cf_gen, "( OLcfgOvAt:1.3 NAME 'olcSpNoPresent' " 3570 "DESC 'Omit Present phase processing' " 3571 "EQUALITY booleanMatch " 3572 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 3573 { "syncprov-reloadhint", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_USEHINT, 3574 sp_cf_gen, "( OLcfgOvAt:1.4 NAME 'olcSpReloadHint' " 3575 "DESC 'Observe Reload Hint in Request control' " 3576 "EQUALITY booleanMatch " 3577 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 3578 { "syncprov-sessionlog-source", NULL, 2, 2, 0, ARG_DN|ARG_QUOTE|ARG_MAGIC|SP_LOGDB, 3579 sp_cf_gen, "( OLcfgOvAt:1.5 NAME 'olcSpSessionlogSource' " 3580 "DESC 'On startup, try loading sessionlog from this subtree' " 3581 "SYNTAX OMsDN SINGLE-VALUE )", NULL, NULL }, 3582 { NULL, NULL, 0, 0, 0, ARG_IGNORED } 3583 }; 3584 3585 static ConfigOCs spocs[] = { 3586 { "( OLcfgOvOc:1.1 " 3587 "NAME 'olcSyncProvConfig' " 3588 "DESC 'SyncRepl Provider configuration' " 3589 "SUP olcOverlayConfig " 3590 "MAY ( olcSpCheckpoint " 3591 "$ olcSpSessionlog " 3592 "$ olcSpNoPresent " 3593 "$ olcSpReloadHint " 3594 "$ olcSpSessionlogSource " 3595 ") )", 3596 Cft_Overlay, spcfg }, 3597 { NULL, 0, NULL } 3598 }; 3599 3600 static int 3601 sp_cf_gen(ConfigArgs *c) 3602 { 3603 
slap_overinst *on = (slap_overinst *)c->bi; 3604 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3605 int rc = 0; 3606 3607 if ( c->op == SLAP_CONFIG_EMIT ) { 3608 switch ( c->type ) { 3609 case SP_CHKPT: 3610 if ( si->si_chkops || si->si_chktime ) { 3611 struct berval bv; 3612 /* we assume si_chktime is a multiple of 60 3613 * because the parsed value was originally 3614 * multiplied by 60 */ 3615 bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ), 3616 "%d %d", si->si_chkops, si->si_chktime/60 ); 3617 if ( bv.bv_len >= sizeof( c->cr_msg ) ) { 3618 rc = 1; 3619 } else { 3620 bv.bv_val = c->cr_msg; 3621 value_add_one( &c->rvalue_vals, &bv ); 3622 } 3623 } else { 3624 rc = 1; 3625 } 3626 break; 3627 case SP_SESSL: 3628 if ( si->si_logs ) { 3629 c->value_int = si->si_logs->sl_size; 3630 } else { 3631 rc = 1; 3632 } 3633 break; 3634 case SP_NOPRES: 3635 if ( si->si_nopres ) { 3636 c->value_int = 1; 3637 } else { 3638 rc = 1; 3639 } 3640 break; 3641 case SP_USEHINT: 3642 if ( si->si_usehint ) { 3643 c->value_int = 1; 3644 } else { 3645 rc = 1; 3646 } 3647 break; 3648 case SP_LOGDB: 3649 if ( BER_BVISEMPTY( &si->si_logbase ) ) { 3650 rc = 1; 3651 } else { 3652 value_add_one( &c->rvalue_vals, &si->si_logbase ); 3653 value_add_one( &c->rvalue_nvals, &si->si_logbase ); 3654 } 3655 break; 3656 } 3657 return rc; 3658 } else if ( c->op == LDAP_MOD_DELETE ) { 3659 switch ( c->type ) { 3660 case SP_CHKPT: 3661 si->si_chkops = 0; 3662 si->si_chktime = 0; 3663 break; 3664 case SP_SESSL: 3665 if ( si->si_logs ) 3666 si->si_logs->sl_size = 0; 3667 break; 3668 case SP_NOPRES: 3669 si->si_nopres = 0; 3670 break; 3671 case SP_USEHINT: 3672 si->si_usehint = 0; 3673 break; 3674 case SP_LOGDB: 3675 if ( !BER_BVISNULL( &si->si_logbase ) ) { 3676 ch_free( si->si_logbase.bv_val ); 3677 BER_BVZERO( &si->si_logbase ); 3678 } 3679 break; 3680 } 3681 return rc; 3682 } 3683 switch ( c->type ) { 3684 case SP_CHKPT: 3685 if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) { 3686 
snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint ops # \"%s\"", 3687 c->argv[0], c->argv[1] ); 3688 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3689 "%s: %s\n", c->log, c->cr_msg ); 3690 return ARG_BAD_CONF; 3691 } 3692 if ( si->si_chkops <= 0 ) { 3693 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint ops # \"%d\"", 3694 c->argv[0], si->si_chkops ); 3695 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3696 "%s: %s\n", c->log, c->cr_msg ); 3697 return ARG_BAD_CONF; 3698 } 3699 if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) { 3700 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint time \"%s\"", 3701 c->argv[0], c->argv[1] ); 3702 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3703 "%s: %s\n", c->log, c->cr_msg ); 3704 return ARG_BAD_CONF; 3705 } 3706 if ( si->si_chktime <= 0 ) { 3707 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint time \"%d\"", 3708 c->argv[0], si->si_chkops ); 3709 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3710 "%s: %s\n", c->log, c->cr_msg ); 3711 return ARG_BAD_CONF; 3712 } 3713 si->si_chktime *= 60; 3714 break; 3715 case SP_SESSL: { 3716 sessionlog *sl; 3717 int size = c->value_int; 3718 3719 if ( size < 0 ) { 3720 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s size %d is negative", 3721 c->argv[0], size ); 3722 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3723 "%s: %s\n", c->log, c->cr_msg ); 3724 return ARG_BAD_CONF; 3725 } 3726 if ( size && !BER_BVISNULL( &si->si_logbase ) ) { 3727 Debug( LDAP_DEBUG_ANY, "syncprov_config: while configuring " 3728 "internal sessionlog, accesslog source has already been " 3729 "configured, this results in wasteful operation\n" ); 3730 } 3731 sl = si->si_logs; 3732 if ( !sl ) { 3733 if ( !size ) break; 3734 sl = ch_calloc( 1, sizeof( sessionlog )); 3735 ldap_pvt_thread_rdwr_init( &sl->sl_mutex ); 3736 si->si_logs = sl; 3737 } 3738 sl->sl_size = size; 3739 } 3740 break; 3741 case SP_NOPRES: 3742 si->si_nopres = c->value_int; 3743 break; 3744 case 
SP_USEHINT: 3745 si->si_usehint = c->value_int; 3746 break; 3747 case SP_LOGDB: 3748 if ( si->si_logs ) { 3749 Debug( LDAP_DEBUG_ANY, "syncprov_config: while configuring " 3750 "accesslog source, internal sessionlog has already been " 3751 "configured, this results in wasteful operation\n" ); 3752 } 3753 if ( CONFIG_ONLINE_ADD( c ) ) { 3754 if ( !select_backend( &c->value_ndn, 0 ) ) { 3755 snprintf( c->cr_msg, sizeof( c->cr_msg ), 3756 "<%s> no matching backend found for suffix", 3757 c->argv[0] ); 3758 Debug( LDAP_DEBUG_ANY, "%s: %s \"%s\"\n", 3759 c->log, c->cr_msg, c->value_dn.bv_val ); 3760 rc = 1; 3761 break; 3762 } 3763 ch_free( c->value_ndn.bv_val ); 3764 } 3765 si->si_logbase = c->value_ndn; 3766 rc = syncprov_setup_accesslog(); 3767 ch_free( c->value_dn.bv_val ); 3768 break; 3769 } 3770 return rc; 3771 } 3772 3773 /* ITS#3456 we cannot run this search on the main thread, must use a 3774 * child thread in order to insure we have a big enough stack. 3775 */ 3776 static void * 3777 syncprov_db_otask( 3778 void *ptr 3779 ) 3780 { 3781 syncprov_findcsn( ptr, FIND_MAXCSN, 0 ); 3782 return NULL; 3783 } 3784 3785 static int 3786 syncprov_db_ocallback( 3787 Operation *op, 3788 SlapReply *rs 3789 ) 3790 { 3791 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 3792 if ( rs->sr_entry->e_name.bv_len ) 3793 op->o_callback->sc_private = (void *)1; 3794 } 3795 return LDAP_SUCCESS; 3796 } 3797 3798 /* ITS#9015 see if the DB is really empty */ 3799 static void * 3800 syncprov_db_otask2( 3801 void *ptr 3802 ) 3803 { 3804 Operation *op = ptr; 3805 SlapReply rs = {REP_RESULT}; 3806 slap_callback cb = {0}; 3807 int rc; 3808 3809 cb.sc_response = syncprov_db_ocallback; 3810 3811 op->o_managedsait = SLAP_CONTROL_CRITICAL; 3812 op->o_callback = &cb; 3813 op->o_tag = LDAP_REQ_SEARCH; 3814 op->ors_scope = LDAP_SCOPE_SUBTREE; 3815 op->ors_limit = NULL; 3816 op->ors_slimit = 1; 3817 op->ors_tlimit = SLAP_NO_LIMIT; 3818 op->ors_attrs = slap_anlist_no_attrs; 3819 
op->ors_attrsonly = 1; 3820 op->ors_deref = LDAP_DEREF_NEVER; 3821 op->ors_filter = &generic_filter; 3822 op->ors_filterstr = generic_filterstr; 3823 rc = op->o_bd->be_search( op, &rs ); 3824 if ( rc == LDAP_SIZELIMIT_EXCEEDED || cb.sc_private ) 3825 op->ors_slimit = 2; 3826 return NULL; 3827 } 3828 3829 /* Read any existing contextCSN from the underlying db. 3830 * Then search for any entries newer than that. If no value exists, 3831 * just generate it. Cache whatever result. 3832 */ 3833 static int 3834 syncprov_db_open( 3835 BackendDB *be, 3836 ConfigReply *cr 3837 ) 3838 { 3839 slap_overinst *on = (slap_overinst *) be->bd_info; 3840 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3841 3842 Connection conn = { 0 }; 3843 OperationBuffer opbuf; 3844 Operation *op; 3845 Entry *e = NULL; 3846 Attribute *a; 3847 int rc; 3848 void *thrctx = NULL; 3849 3850 if ( !SLAP_LASTMOD( be )) { 3851 Debug( LDAP_DEBUG_ANY, 3852 "syncprov_db_open: invalid config, lastmod must be enabled\n" ); 3853 return -1; 3854 } 3855 3856 if ( slapMode & SLAP_TOOL_MODE ) { 3857 return 0; 3858 } 3859 3860 rc = overlay_register_control( be, LDAP_CONTROL_SYNC ); 3861 if ( rc ) { 3862 return rc; 3863 } 3864 3865 Debug( LDAP_DEBUG_SYNC, "syncprov_db_open: " 3866 "starting syncprov for suffix %s\n", 3867 be->be_suffix[0].bv_val ); 3868 3869 thrctx = ldap_pvt_thread_pool_context(); 3870 connection_fake_init2( &conn, &opbuf, thrctx, 0 ); 3871 op = &opbuf.ob_op; 3872 op->o_bd = be; 3873 op->o_dn = be->be_rootdn; 3874 op->o_ndn = be->be_rootndn; 3875 3876 if ( SLAP_SYNC_SUBENTRY( be )) { 3877 build_new_dn( &si->si_contextdn, be->be_nsuffix, 3878 (struct berval *)&slap_ldapsync_cn_bv, NULL ); 3879 } else { 3880 si->si_contextdn = be->be_nsuffix[0]; 3881 } 3882 rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL, 3883 slap_schema.si_ad_contextCSN, 0, &e, on ); 3884 3885 if ( e ) { 3886 ldap_pvt_thread_t tid; 3887 3888 a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN ); 3889 if ( a 
) { 3890 ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL ); 3891 si->si_numcsns = a->a_numvals; 3892 si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL ); 3893 slap_sort_csn_sids( si->si_ctxcsn, si->si_sids, si->si_numcsns, NULL ); 3894 } 3895 overlay_entry_release_ov( op, e, 0, on ); 3896 if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) { 3897 op->o_tag = LDAP_REQ_SEARCH; 3898 op->o_req_dn = be->be_suffix[0]; 3899 op->o_req_ndn = be->be_nsuffix[0]; 3900 op->ors_scope = LDAP_SCOPE_SUBTREE; 3901 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op ); 3902 ldap_pvt_thread_join( tid, NULL ); 3903 } 3904 } 3905 3906 /* Didn't find a contextCSN, should we generate one? */ 3907 if ( !si->si_ctxcsn ) { 3908 char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ]; 3909 struct berval csn; 3910 3911 if ( SLAP_SINGLE_SHADOW( op->o_bd ) ) { 3912 /* Not in charge of this serverID, don't generate anything. */ 3913 goto out; 3914 } 3915 if ( !SLAP_SYNC_SUBENTRY( be ) && rc != LDAP_SUCCESS 3916 && rc != LDAP_NO_SUCH_ATTRIBUTE ) { 3917 /* If the DB is genuinely empty, don't generate one either. */ 3918 goto out; 3919 } 3920 if ( !si->si_contextdn.bv_len ) { 3921 ldap_pvt_thread_t tid; 3922 /* a glue entry here with no contextCSN might mean an empty DB. 3923 * we need to search for children, to be sure. 
3924 */ 3925 op->o_req_dn = be->be_suffix[0]; 3926 op->o_req_ndn = be->be_nsuffix[0]; 3927 op->o_bd->bd_info = (BackendInfo *)on->on_info; 3928 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask2, op ); 3929 ldap_pvt_thread_join( tid, NULL ); 3930 if ( op->ors_slimit == 1 ) 3931 goto out; 3932 } 3933 3934 csn.bv_val = csnbuf; 3935 csn.bv_len = sizeof( csnbuf ); 3936 slap_get_csn( op, &csn, 0 ); 3937 value_add_one( &si->si_ctxcsn, &csn ); 3938 si->si_numcsns = 1; 3939 si->si_sids = ch_malloc( sizeof(int) ); 3940 si->si_sids[0] = slap_serverID; 3941 Debug( LDAP_DEBUG_SYNC, "syncprov_db_open: " 3942 "generated a new ctxcsn=%s for suffix %s\n", 3943 csn.bv_val, be->be_suffix[0].bv_val ); 3944 3945 /* make sure we do a checkpoint on close */ 3946 si->si_numops++; 3947 } 3948 3949 /* Initialize the sessionlog mincsn */ 3950 if ( si->si_logs && si->si_numcsns ) { 3951 sessionlog *sl = si->si_logs; 3952 int i; 3953 ber_bvarray_dup_x( &sl->sl_mincsn, si->si_ctxcsn, NULL ); 3954 sl->sl_numcsns = si->si_numcsns; 3955 sl->sl_sids = ch_malloc( si->si_numcsns * sizeof(int) ); 3956 for ( i=0; i < si->si_numcsns; i++ ) 3957 sl->sl_sids[i] = si->si_sids[i]; 3958 } 3959 3960 if ( !BER_BVISNULL( &si->si_logbase ) ) { 3961 BackendDB *db = select_backend( &si->si_logbase, 0 ); 3962 if ( !db ) { 3963 Debug( LDAP_DEBUG_ANY, "syncprov_db_open: " 3964 "configured accesslog database dn='%s' not present\n", 3965 si->si_logbase.bv_val ); 3966 return -1; 3967 } 3968 } 3969 3970 out: 3971 op->o_bd->bd_info = (BackendInfo *)on; 3972 return 0; 3973 } 3974 3975 /* Write the current contextCSN into the underlying db. 
*/
/* In tool mode nothing is done.  Otherwise a checkpoint is written when
 * si_numops is non-zero.  When built with SLAP_CONFIG_DELETE and slapd
 * is not shutting down, every registered persistent search is sent
 * LDAP_UNAVAILABLE, any queued task for it is retracted, the psearch is
 * dropped, and the sync control is unregistered from this database.
 * Always returns 0.
 */
static int
syncprov_db_close(
	BackendDB *be,
	ConfigReply *cr
)
{
	slap_overinst   *on = (slap_overinst *) be->bd_info;
	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
#ifdef SLAP_CONFIG_DELETE
	syncops *so, *sonext;
#endif /* SLAP_CONFIG_DELETE */

	if ( slapMode & SLAP_TOOL_MODE ) {
		return 0;
	}
	if ( si->si_numops ) {
		/* Build a throwaway internal operation running as the rootdn
		 * to carry the checkpoint write.
		 */
		Connection conn = {0};
		OperationBuffer opbuf;
		Operation *op;
		void *thrctx;

		thrctx = ldap_pvt_thread_pool_context();
		connection_fake_init2( &conn, &opbuf, thrctx, 0 );
		op = &opbuf.ob_op;
		op->o_bd = be;
		op->o_dn = be->be_rootdn;
		op->o_ndn = be->be_rootndn;
		syncprov_checkpoint( op, on );
	}

#ifdef SLAP_CONFIG_DELETE
	if ( !slapd_shutdown ) {
		ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
		for ( so=si->si_ops, sonext=so; so; so=sonext ) {
			SlapReply rs = {REP_RESULT};
			rs.sr_err = LDAP_UNAVAILABLE;
			ldap_pvt_thread_mutex_lock( &so->s_mutex );
			send_ldap_result( so->s_op, &rs );
			/* fetch the successor before the psearch may be dropped */
			sonext=so->s_next;
			if ( so->s_flags & PS_TASK_QUEUED )
				ldap_pvt_thread_pool_retract( so->s_pool_cookie );
			ldap_pvt_thread_mutex_unlock( &so->s_mutex );
			/* NOTE(review): a zero return presumably means the syncops
			 * is still referenced elsewhere; it is detached from this
			 * overlay instance by clearing s_si - confirm against
			 * syncprov_drop_psearch.
			 */
			if ( !syncprov_drop_psearch( so, 0 ))
				so->s_si = NULL;
		}
		si->si_ops=NULL;
		ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
	}
	overlay_unregister_control( be, LDAP_CONTROL_SYNC );
#endif /* SLAP_CONFIG_DELETE */

	return 0;
}

/* Allocate and initialise the per-database syncprov state and fill in
 * the file-level csn_anlist / uuid_anlist attribute lists.  Refuses to
 * be configured as a global overlay (returns 1); returns 0 otherwise.
 */
static int
syncprov_db_init(
	BackendDB *be,
	ConfigReply *cr
)
{
	slap_overinst	*on = (slap_overinst *)be->bd_info;
	syncprov_info_t	*si;

	if ( SLAP_ISGLOBALOVERLAY( be ) ) {
		Debug( LDAP_DEBUG_ANY,
			"syncprov must be instantiated within a database.\n" );
		return 1;
	}

	si = ch_calloc(1, sizeof(syncprov_info_t));
	on->on_bi.bi_private = si;
ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock );
	ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
	ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
	ldap_pvt_thread_mutex_init( &si->si_resp_mutex );

	/* attribute lists: entryCSN + entryUUID, and entryUUID alone */
	csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
	csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname;
	csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID;
	csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname;

	uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID;
	uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname;

	return 0;
}

/* Release everything hanging off the per-database syncprov state:
 * the session log (entries tree, mincsn array, sid array, lock), the
 * cached contextCSN values and sids, the locks, and the state itself.
 * Always returns 0.
 */
static int
syncprov_db_destroy(
	BackendDB *be,
	ConfigReply *cr
)
{
	slap_overinst	*on = (slap_overinst *)be->bd_info;
	syncprov_info_t	*si = (syncprov_info_t *)on->on_bi.bi_private;

	if ( si ) {
		if ( si->si_logs ) {
			sessionlog *sl = si->si_logs;

			ldap_tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
			if ( sl->sl_mincsn )
				ber_bvarray_free( sl->sl_mincsn );
			if ( sl->sl_sids )
				ch_free( sl->sl_sids );

			ldap_pvt_thread_rdwr_destroy(&si->si_logs->sl_mutex);
			ch_free( si->si_logs );
		}
		if ( si->si_ctxcsn )
			ber_bvarray_free( si->si_ctxcsn );
		if ( si->si_sids )
			ch_free( si->si_sids );
		/* locks are destroyed in the reverse order of their creation */
		ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex );
		ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
		ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
		ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock );
		ch_free( si );
	}

	return 0;
}

/* Parse an incoming Sync request control and attach the result to op.
 *
 * Rejects (LDAP_PROTOCOL_ERROR, with rs->sr_text set) a repeated sync
 * control, a combination with pagedResults, an absent or empty value,
 * an unknown mode, and any BER decoding failure.  On success a
 * sync_control allocated from the operation's temporary memory context
 * is stored in op->o_controls[], o_sync records the criticality and the
 * requested mode is OR-ed into o_sync_mode.  A cookie that fails to
 * parse, or whose rid is -1, is treated as if no cookie CSNs were sent.
 */
static int syncprov_parseCtrl (
	Operation *op,
	SlapReply *rs,
	LDAPControl *ctrl )
{
	ber_tag_t tag;
	BerElementBuffer berbuf;
	BerElement *ber = (BerElement *)&berbuf;
	ber_int_t mode;
	ber_len_t len;
	struct berval cookie = BER_BVNULL;
	sync_control *sr;
	int rhint = 0;

	if ( op->o_sync != SLAP_CONTROL_NONE ) {
		rs->sr_text = "Sync control specified multiple times";
		return LDAP_PROTOCOL_ERROR;
	}

	if ( op->o_pagedresults != SLAP_CONTROL_NONE ) {
		rs->sr_text = "Sync control specified with pagedResults control";
		return LDAP_PROTOCOL_ERROR;
	}

	if ( BER_BVISNULL( &ctrl->ldctl_value ) ) {
		rs->sr_text = "Sync control value is absent";
		return LDAP_PROTOCOL_ERROR;
	}

	if ( BER_BVISEMPTY( &ctrl->ldctl_value ) ) {
		rs->sr_text = "Sync control value is empty";
		return LDAP_PROTOCOL_ERROR;
	}

	/* Parse the control value
	 *      syncRequestValue ::= SEQUENCE {
	 *              mode   ENUMERATED {
	 *                      -- 0 unused
	 *                      refreshOnly		(1),
	 *                      -- 2 reserved
	 *                      refreshAndPersist	(3)
	 *              },
	 *              cookie  syncCookie OPTIONAL
	 *      }
	 */

	ber_init2( ber, &ctrl->ldctl_value, 0 );

	if ( (tag = ber_scanf( ber, "{i" /*}*/, &mode )) == LBER_ERROR ) {
		rs->sr_text = "Sync control : mode decoding error";
		return LDAP_PROTOCOL_ERROR;
	}

	/* translate the protocol mode into the internal o_sync_mode bits */
	switch( mode ) {
	case LDAP_SYNC_REFRESH_ONLY:
		mode = SLAP_SYNC_REFRESH;
		break;
	case LDAP_SYNC_REFRESH_AND_PERSIST:
		mode = SLAP_SYNC_REFRESH_AND_PERSIST;
		break;
	default:
		rs->sr_text = "Sync control : unknown update mode";
		return LDAP_PROTOCOL_ERROR;
	}

	tag = ber_peek_tag( ber, &len );

	/* both remaining elements are optional; decode whichever is present */
	if ( tag == LDAP_TAG_SYNC_COOKIE ) {
		if (( ber_scanf( ber, /*{*/ "m", &cookie )) == LBER_ERROR ) {
			rs->sr_text = "Sync control : cookie decoding error";
			return LDAP_PROTOCOL_ERROR;
		}
		tag = ber_peek_tag( ber, &len );
	}
	if ( tag == LDAP_TAG_RELOAD_HINT ) {
		if (( ber_scanf( ber, /*{*/ "b", &rhint )) == LBER_ERROR ) {
			rs->sr_text = "Sync control : rhint decoding error";
			return LDAP_PROTOCOL_ERROR;
		}
	}
	if (( ber_scanf( ber, /*{*/ "}")) == LBER_ERROR ) {
		rs->sr_text = "Sync control : decoding error";
		return LDAP_PROTOCOL_ERROR;
	}
	sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx );
	sr->sr_rhint = rhint;
	if (!BER_BVISNULL(&cookie)) {
		ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx );
		/* If parse fails, pretend no cookie was sent */
		if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) ||
			sr->sr_state.rid == -1 ) {
			if ( sr->sr_state.ctxcsn ) {
				ber_bvarray_free_x( sr->sr_state.ctxcsn, op->o_tmpmemctx );
				sr->sr_state.ctxcsn = NULL;
			}
			sr->sr_state.numcsns = 0;
		}
	}

	op->o_controls[slap_cids.sc_LDAPsync] = sr;

	op->o_sync = ctrl->ldctl_iscritical
		? SLAP_CONTROL_CRITICAL
		: SLAP_CONTROL_NONCRITICAL;

	op->o_sync_mode |= mode;	/* o_sync_mode shares o_sync */

	return LDAP_SUCCESS;
}

/* This overlay is set up for dynamic loading via moduleload. For static
 * configuration, you'll need to arrange for the slap_overinst to be
 * initialized and registered by some other function inside slapd.
4213 */ 4214 4215 static slap_overinst syncprov; 4216 4217 int 4218 syncprov_initialize() 4219 { 4220 int rc; 4221 4222 rc = register_supported_control( LDAP_CONTROL_SYNC, 4223 SLAP_CTRL_SEARCH, NULL, 4224 syncprov_parseCtrl, &slap_cids.sc_LDAPsync ); 4225 if ( rc != LDAP_SUCCESS ) { 4226 Debug( LDAP_DEBUG_ANY, 4227 "syncprov_init: Failed to register control %d\n", rc ); 4228 return rc; 4229 } 4230 4231 syncprov.on_bi.bi_type = "syncprov"; 4232 syncprov.on_bi.bi_flags = SLAPO_BFLAG_SINGLE; 4233 syncprov.on_bi.bi_db_init = syncprov_db_init; 4234 syncprov.on_bi.bi_db_destroy = syncprov_db_destroy; 4235 syncprov.on_bi.bi_db_open = syncprov_db_open; 4236 syncprov.on_bi.bi_db_close = syncprov_db_close; 4237 4238 syncprov.on_bi.bi_op_abandon = syncprov_op_abandon; 4239 syncprov.on_bi.bi_op_cancel = syncprov_op_abandon; 4240 4241 syncprov.on_bi.bi_op_add = syncprov_op_mod; 4242 syncprov.on_bi.bi_op_compare = syncprov_op_compare; 4243 syncprov.on_bi.bi_op_delete = syncprov_op_mod; 4244 syncprov.on_bi.bi_op_modify = syncprov_op_mod; 4245 syncprov.on_bi.bi_op_modrdn = syncprov_op_mod; 4246 syncprov.on_bi.bi_op_search = syncprov_op_search; 4247 syncprov.on_bi.bi_extended = syncprov_op_extended; 4248 syncprov.on_bi.bi_operational = syncprov_operational; 4249 4250 syncprov.on_bi.bi_cf_ocs = spocs; 4251 4252 generic_filter.f_desc = slap_schema.si_ad_objectClass; 4253 4254 rc = config_register_schema( spcfg, spocs ); 4255 if ( rc ) return rc; 4256 4257 return overlay_register( &syncprov ); 4258 } 4259 4260 #if SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC 4261 int 4262 init_module( int argc, char *argv[] ) 4263 { 4264 return syncprov_initialize(); 4265 } 4266 #endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */ 4267 4268 #endif /* defined(SLAPD_OVER_SYNCPROV) */ 4269