1 /* $NetBSD: connection.c,v 1.2 2021/08/14 16:14:58 christos Exp $ */ 2 3 /* $OpenLDAP$ */ 4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>. 5 * 6 * Copyright 1998-2021 The OpenLDAP Foundation. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted only as authorized by the OpenLDAP 11 * Public License. 12 * 13 * A copy of this license is available in the file LICENSE in the 14 * top-level directory of the distribution or, alternatively, at 15 * <http://www.OpenLDAP.org/license.html>. 16 */ 17 /* Portions Copyright (c) 1995 Regents of the University of Michigan. 18 * All rights reserved. 19 * 20 * Redistribution and use in source and binary forms are permitted 21 * provided that this notice is preserved and that due credit is given 22 * to the University of Michigan at Ann Arbor. The name of the University 23 * may not be used to endorse or promote products derived from this 24 * software without specific prior written permission. This software 25 * is provided ``as is'' without express or implied warranty. 26 */ 27 28 #include <sys/cdefs.h> 29 __RCSID("$NetBSD: connection.c,v 1.2 2021/08/14 16:14:58 christos Exp $"); 30 31 #include "portable.h" 32 33 #include <stdio.h> 34 #ifdef HAVE_LIMITS_H 35 #include <limits.h> 36 #endif 37 38 #include <ac/socket.h> 39 #include <ac/errno.h> 40 #include <ac/string.h> 41 #include <ac/time.h> 42 #include <ac/unistd.h> 43 44 #include "lload.h" 45 46 #include "lutil.h" 47 #include "lutil_ldap.h" 48 49 static unsigned long conn_nextid = 0; 50 51 static void 52 lload_connection_assign_nextid( LloadConnection *conn ) 53 { 54 conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED ); 55 } 56 57 /* 58 * We start off with the connection muted and c_currentber holding the pdu we 59 * received. 60 * 61 * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait 62 * on reading or after we process lload_conn_max_pdus_per_cycle pdus so as to 63 * maintain fairness and not hog the worker thread forever. 64 * 65 * If we've run out of pdus immediately available from the stream or hit the 66 * budget, we unmute the connection. 67 * 68 * c->c_pdu_cb might return an 'error' and not free the connection. That can 69 * happen when changing the state or when client is blocked on writing and 70 * already has a pdu pending on the same operation, it's their job to make sure 71 * we're woken up again. 72 */ 73 void * 74 handle_pdus( void *ctx, void *arg ) 75 { 76 LloadConnection *c = arg; 77 int pdus_handled = 0; 78 epoch_t epoch; 79 80 /* A reference was passed on to us */ 81 assert( IS_ALIVE( c, c_refcnt ) ); 82 83 epoch = epoch_join(); 84 for ( ;; ) { 85 BerElement *ber; 86 ber_tag_t tag; 87 ber_len_t len; 88 89 if ( c->c_pdu_cb( c ) ) { 90 /* Error/reset, get rid ouf our reference and bail */ 91 goto done; 92 } 93 94 if ( !IS_ALIVE( c, c_live ) ) { 95 break; 96 } 97 98 if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) { 99 /* Do not read now, re-enable read event instead */ 100 break; 101 } 102 103 ber = c->c_currentber; 104 if ( ber == NULL && (ber = ber_alloc()) == NULL ) { 105 Debug( LDAP_DEBUG_ANY, "handle_pdus: " 106 "connid=%lu, ber_alloc failed\n", 107 c->c_connid ); 108 CONNECTION_LOCK_DESTROY(c); 109 goto done; 110 } 111 c->c_currentber = ber; 112 113 checked_lock( &c->c_io_mutex ); 114 if ( (lload_features & LLOAD_FEATURE_PAUSE) && 115 (c->c_io_state & LLOAD_C_READ_PAUSE) ) { 116 goto pause; 117 } 118 tag = ber_get_next( c->c_sb, &len, ber ); 119 checked_unlock( &c->c_io_mutex ); 120 if ( tag != LDAP_TAG_MESSAGE ) { 121 int err = sock_errno(); 122 123 if ( err != EWOULDBLOCK && err != EAGAIN ) { 124 if ( err || tag == LBER_ERROR ) { 125 char ebuf[128]; 126 Debug( LDAP_DEBUG_ANY, "handle_pdus: " 127 "ber_get_next on fd=%d failed errno=%d (%s)\n", 128 c->c_fd, err, 129 sock_errstr( err, ebuf, sizeof(ebuf) ) ); 130 } else { 131 Debug( LDAP_DEBUG_STATS, "handle_pdus: " 132 "ber_get_next on fd=%d connid=%lu received " 133 "a strange PDU tag=%lx\n", 134 c->c_fd, c->c_connid, tag ); 135 } 136 137 c->c_currentber = NULL; 138 ber_free( ber, 1 ); 139 CONNECTION_LOCK_DESTROY(c); 140 goto done; 141 } 142 break; 143 } 144 145 assert( IS_ALIVE( c, c_refcnt ) ); 146 epoch_leave( epoch ); 147 epoch = epoch_join(); 148 assert( IS_ALIVE( c, c_refcnt ) ); 149 } 150 151 checked_lock( &c->c_io_mutex ); 152 if ( !(lload_features & LLOAD_FEATURE_PAUSE) || 153 !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { 154 event_add( c->c_read_event, c->c_read_timeout ); 155 Debug( LDAP_DEBUG_CONNS, "handle_pdus: " 156 "re-enabled read event on connid=%lu\n", 157 c->c_connid ); 158 } 159 pause: 160 c->c_io_state &= ~LLOAD_C_READ_HANDOVER; 161 checked_unlock( &c->c_io_mutex ); 162 163 done: 164 RELEASE_REF( c, c_refcnt, c->c_destroy ); 165 epoch_leave( epoch ); 166 return NULL; 167 } 168 169 /* 170 * Initial read on the connection, if we get an LDAP PDU, submit the 171 * processing of this and successive ones to the work queue. 172 * 173 * If we can't submit it to the queue (overload), process this one and return 174 * to the event loop immediately after. 175 */ 176 void 177 connection_read_cb( evutil_socket_t s, short what, void *arg ) 178 { 179 LloadConnection *c = arg; 180 BerElement *ber; 181 ber_tag_t tag; 182 ber_len_t len; 183 epoch_t epoch; 184 int pause; 185 186 if ( !IS_ALIVE( c, c_live ) ) { 187 event_del( c->c_read_event ); 188 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " 189 "suspended read event on a dead connid=%lu\n", 190 c->c_connid ); 191 return; 192 } 193 194 if ( what & EV_TIMEOUT ) { 195 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " 196 "connid=%lu, timeout reached, destroying\n", 197 c->c_connid ); 198 /* Make sure the connection stays around for us to unlock it */ 199 epoch = epoch_join(); 200 CONNECTION_LOCK_DESTROY(c); 201 epoch_leave( epoch ); 202 return; 203 } 204 205 if ( !acquire_ref( &c->c_refcnt ) ) { 206 return; 207 } 208 epoch = epoch_join(); 209 210 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " 211 "connection connid=%lu ready to read\n", 212 c->c_connid ); 213 214 ber = c->c_currentber; 215 if ( ber == NULL && (ber = ber_alloc()) == NULL ) { 216 Debug( LDAP_DEBUG_ANY, "connection_read_cb: " 217 "connid=%lu, ber_alloc failed\n", 218 c->c_connid ); 219 goto out; 220 } 221 c->c_currentber = ber; 222 223 checked_lock( &c->c_io_mutex ); 224 assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ); 225 tag = ber_get_next( c->c_sb, &len, ber ); 226 pause = c->c_io_state & LLOAD_C_READ_PAUSE; 227 checked_unlock( &c->c_io_mutex ); 228 229 if ( tag != LDAP_TAG_MESSAGE ) { 230 int err = sock_errno(); 231 232 if ( err != EWOULDBLOCK && err != EAGAIN ) { 233 if ( err || tag == LBER_ERROR ) { 234 char ebuf[128]; 235 Debug( LDAP_DEBUG_STATS, "connection_read_cb: " 236 "ber_get_next on fd=%d failed errno=%d (%s)\n", 237 c->c_fd, err, 238 sock_errstr( err, ebuf, sizeof(ebuf) ) ); 239 } else { 240 Debug( LDAP_DEBUG_STATS, "connection_read_cb: " 241 "ber_get_next on fd=%d connid=%lu received " 242 "a strange PDU tag=%lx\n", 243 c->c_fd, c->c_connid, tag ); 244 } 245 246 c->c_currentber = NULL; 247 ber_free( ber, 1 ); 248 249 event_del( c->c_read_event ); 250 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " 251 "suspended read event on dying connid=%lu\n", 252 c->c_connid ); 253 CONNECTION_LOCK_DESTROY(c); 254 goto out; 255 } 256 if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) { 257 event_add( c->c_read_event, c->c_read_timeout ); 258 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " 259 "re-enabled read event on connid=%lu\n", 260 c->c_connid ); 261 } 262 goto out; 263 } 264 265 checked_lock( &c->c_io_mutex ); 266 c->c_io_state |= LLOAD_C_READ_HANDOVER; 267 checked_unlock( &c->c_io_mutex ); 268 event_del( c->c_read_event ); 269 270 if ( !lload_conn_max_pdus_per_cycle || 271 ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) { 272 /* If we're overloaded or configured as such, process one and resume in 273 * the next cycle. */ 274 int rc = c->c_pdu_cb( c ); 275 276 checked_lock( &c->c_io_mutex ); 277 c->c_io_state &= ~LLOAD_C_READ_HANDOVER; 278 if ( rc == LDAP_SUCCESS && 279 ( !(lload_features & LLOAD_FEATURE_PAUSE) || 280 !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) { 281 event_add( c->c_read_event, c->c_read_timeout ); 282 } 283 checked_unlock( &c->c_io_mutex ); 284 goto out; 285 } 286 287 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " 288 "suspended read event on connid=%lu\n", 289 c->c_connid ); 290 291 /* 292 * We have scheduled a call to handle_pdus to take care of handling this 293 * and further requests, its reference is now owned by that task. 294 */ 295 epoch_leave( epoch ); 296 return; 297 298 out: 299 RELEASE_REF( c, c_refcnt, c->c_destroy ); 300 epoch_leave( epoch ); 301 } 302 303 void 304 connection_write_cb( evutil_socket_t s, short what, void *arg ) 305 { 306 LloadConnection *c = arg; 307 epoch_t epoch; 308 309 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " 310 "considering writing to%s connid=%lu what=%hd\n", 311 c->c_live ? " live" : " dead", c->c_connid, what ); 312 if ( !IS_ALIVE( c, c_live ) ) { 313 return; 314 } 315 316 if ( what & EV_TIMEOUT ) { 317 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " 318 "connid=%lu, timeout reached, destroying\n", 319 c->c_connid ); 320 /* Make sure the connection stays around for us to unlock it */ 321 epoch = epoch_join(); 322 CONNECTION_LOCK_DESTROY(c); 323 epoch_leave( epoch ); 324 return; 325 } 326 327 /* Before we acquire any locks */ 328 event_del( c->c_write_event ); 329 330 if ( !acquire_ref( &c->c_refcnt ) ) { 331 return; 332 } 333 334 /* If what == 0, we have a caller as opposed to being a callback */ 335 if ( what ) { 336 epoch = epoch_join(); 337 } 338 339 checked_lock( &c->c_io_mutex ); 340 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " 341 "have something to write to connection connid=%lu\n", 342 c->c_connid ); 343 344 /* We might have been beaten to flushing the data by another thread */ 345 if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) { 346 int err = sock_errno(); 347 348 if ( err != EWOULDBLOCK && err != EAGAIN ) { 349 char ebuf[128]; 350 checked_unlock( &c->c_io_mutex ); 351 Debug( LDAP_DEBUG_ANY, "connection_write_cb: " 352 "ber_flush on fd=%d failed errno=%d (%s)\n", 353 c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); 354 CONNECTION_LOCK_DESTROY(c); 355 goto done; 356 } 357 358 if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { 359 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " 360 "connection connid=%lu blocked on writing, marking " 361 "paused\n", 362 c->c_connid ); 363 } 364 c->c_io_state |= LLOAD_C_READ_PAUSE; 365 366 /* TODO: Do not reset write timeout unless we wrote something */ 367 event_add( c->c_write_event, lload_write_timeout ); 368 } else { 369 c->c_pendingber = NULL; 370 if ( c->c_io_state & LLOAD_C_READ_PAUSE ) { 371 c->c_io_state ^= LLOAD_C_READ_PAUSE; 372 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " 373 "Unpausing connection connid=%lu\n", 374 c->c_connid ); 375 if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) { 376 event_add( c->c_read_event, c->c_read_timeout ); 377 } 378 } 379 } 380 checked_unlock( &c->c_io_mutex ); 381 382 done: 383 RELEASE_REF( c, c_refcnt, c->c_destroy ); 384 if ( what ) { 385 epoch_leave( epoch ); 386 } 387 } 388 389 void 390 connection_destroy( LloadConnection *c ) 391 { 392 assert( c ); 393 Debug( LDAP_DEBUG_CONNS, "connection_destroy: " 394 "destroying connection connid=%lu\n", 395 c->c_connid ); 396 397 CONNECTION_ASSERT_LOCKED(c); 398 assert( c->c_live == 0 ); 399 assert( c->c_refcnt == 0 ); 400 assert( c->c_state == LLOAD_C_INVALID ); 401 402 ber_sockbuf_free( c->c_sb ); 403 404 if ( c->c_currentber ) { 405 ber_free( c->c_currentber, 1 ); 406 c->c_currentber = NULL; 407 } 408 if ( c->c_pendingber ) { 409 ber_free( c->c_pendingber, 1 ); 410 c->c_pendingber = NULL; 411 } 412 413 if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { 414 ber_memfree( c->c_sasl_bind_mech.bv_val ); 415 BER_BVZERO( &c->c_sasl_bind_mech ); 416 } 417 #ifdef HAVE_CYRUS_SASL 418 if ( c->c_sasl_defaults ) { 419 lutil_sasl_freedefs( c->c_sasl_defaults ); 420 c->c_sasl_defaults = NULL; 421 } 422 if ( c->c_sasl_authctx ) { 423 #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */ 424 if ( c->c_sasl_cbinding ) { 425 ch_free( c->c_sasl_cbinding ); 426 } 427 #endif 428 sasl_dispose( &c->c_sasl_authctx ); 429 } 430 #endif /* HAVE_CYRUS_SASL */ 431 432 CONNECTION_UNLOCK(c); 433 434 ldap_pvt_thread_mutex_destroy( &c->c_io_mutex ); 435 ldap_pvt_thread_mutex_destroy( &c->c_mutex ); 436 437 ch_free( c ); 438 439 listeners_reactivate(); 440 } 441 442 /* 443 * Called holding mutex, will walk cq calling cb on all connections whose 444 * c_connid <= cq_last->c_connid that still exist at the time we get to them. 445 */ 446 void 447 connections_walk_last( 448 ldap_pvt_thread_mutex_t *cq_mutex, 449 lload_c_head *cq, 450 LloadConnection *cq_last, 451 CONNCB cb, 452 void *arg ) 453 { 454 LloadConnection *c = cq_last; 455 uintptr_t last_connid; 456 457 if ( LDAP_CIRCLEQ_EMPTY( cq ) ) { 458 return; 459 } 460 assert_locked( cq_mutex ); 461 462 last_connid = c->c_connid; 463 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); 464 465 while ( !acquire_ref( &c->c_refcnt ) ) { 466 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); 467 if ( c->c_connid >= last_connid ) { 468 assert_locked( cq_mutex ); 469 return; 470 } 471 } 472 473 /* 474 * Notes: 475 * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid 476 * order 477 * - the connection with the highest c_connid is passed in cq_last 478 * - we can only use cq when we hold cq_mutex 479 * - connections might be added to or removed from cq while we're busy 480 * processing connections 481 * - we need a way to detect we've finished looping around cq for some 482 * definition of looping around 483 */ 484 do { 485 int rc; 486 487 checked_unlock( cq_mutex ); 488 489 rc = cb( c, arg ); 490 RELEASE_REF( c, c_refcnt, c->c_destroy ); 491 492 checked_lock( cq_mutex ); 493 if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) { 494 break; 495 } 496 497 do { 498 LloadConnection *old = c; 499 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); 500 if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) { 501 assert_locked( cq_mutex ); 502 return; 503 } 504 } while ( !acquire_ref( &c->c_refcnt ) ); 505 } while ( c->c_connid <= last_connid ); 506 assert_locked( cq_mutex ); 507 } 508 509 void 510 connections_walk( 511 ldap_pvt_thread_mutex_t *cq_mutex, 512 lload_c_head *cq, 513 CONNCB cb, 514 void *arg ) 515 { 516 LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq ); 517 return connections_walk_last( cq_mutex, cq, cq_last, cb, arg ); 518 } 519 520 int 521 lload_connection_close( LloadConnection *c, void *arg ) 522 { 523 int gentle = *(int *)arg; 524 LloadOperation *op; 525 526 Debug( LDAP_DEBUG_CONNS, "lload_connection_close: " 527 "marking connection connid=%lu closing\n", 528 c->c_connid ); 529 530 /* We were approached from the connection list */ 531 assert( IS_ALIVE( c, c_refcnt ) ); 532 533 CONNECTION_LOCK(c); 534 if ( !gentle || !c->c_ops ) { 535 CONNECTION_DESTROY(c); 536 return LDAP_SUCCESS; 537 } 538 539 /* The first thing we do is make sure we don't get new Operations in */ 540 c->c_state = LLOAD_C_CLOSING; 541 542 do { 543 TAvlnode *node = ldap_tavl_end( c->c_ops, TAVL_DIR_LEFT ); 544 op = node->avl_data; 545 546 /* Close operations that would need client action to resolve, 547 * only SASL binds in progress do that right now */ 548 if ( op->o_client_msgid || op->o_upstream_msgid ) { 549 break; 550 } 551 552 CONNECTION_UNLOCK(c); 553 operation_unlink( op ); 554 CONNECTION_LOCK(c); 555 } while ( c->c_ops ); 556 557 CONNECTION_UNLOCK(c); 558 return LDAP_SUCCESS; 559 } 560 561 LloadConnection * 562 lload_connection_init( ber_socket_t s, const char *peername, int flags ) 563 { 564 LloadConnection *c; 565 566 assert( peername != NULL ); 567 568 if ( s == AC_SOCKET_INVALID ) { 569 Debug( LDAP_DEBUG_ANY, "lload_connection_init: " 570 "init of socket fd=%ld invalid\n", 571 (long)s ); 572 return NULL; 573 } 574 575 assert( s >= 0 ); 576 577 c = ch_calloc( 1, sizeof(LloadConnection) ); 578 579 c->c_fd = s; 580 c->c_sb = ber_sockbuf_alloc(); 581 ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s ); 582 583 #ifdef LDAP_PF_LOCAL 584 if ( flags & CONN_IS_IPC ) { 585 #ifdef LDAP_DEBUG 586 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug, 587 LBER_SBIOD_LEVEL_PROVIDER, (void *)"ipc_" ); 588 #endif 589 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd, 590 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s ); 591 } else 592 #endif /* LDAP_PF_LOCAL */ 593 { 594 #ifdef LDAP_DEBUG 595 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug, 596 LBER_SBIOD_LEVEL_PROVIDER, (void *)"tcp_" ); 597 #endif 598 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp, 599 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s ); 600 } 601 602 #ifdef LDAP_DEBUG 603 ber_sockbuf_add_io( 604 c->c_sb, &ber_sockbuf_io_debug, INT_MAX, (void *)"lload_" ); 605 #endif 606 607 c->c_next_msgid = 1; 608 c->c_refcnt = c->c_live = 1; 609 c->c_destroy = connection_destroy; 610 611 LDAP_CIRCLEQ_ENTRY_INIT( c, c_next ); 612 613 ldap_pvt_thread_mutex_init( &c->c_mutex ); 614 ldap_pvt_thread_mutex_init( &c->c_io_mutex ); 615 616 lload_connection_assign_nextid( c ); 617 618 Debug( LDAP_DEBUG_CONNS, "lload_connection_init: " 619 "connection connid=%lu allocated for socket fd=%d peername=%s\n", 620 c->c_connid, s, peername ); 621 622 c->c_state = LLOAD_C_ACTIVE; 623 624 return c; 625 } 626