1 /* $NetBSD: connection.c,v 1.2 2021/08/14 16:14:58 christos Exp $ */
2
3 /* $OpenLDAP$ */
4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5 *
6 * Copyright 1998-2021 The OpenLDAP Foundation.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted only as authorized by the OpenLDAP
11 * Public License.
12 *
13 * A copy of this license is available in the file LICENSE in the
14 * top-level directory of the distribution or, alternatively, at
15 * <http://www.OpenLDAP.org/license.html>.
16 */
17 /* Portions Copyright (c) 1995 Regents of the University of Michigan.
18 * All rights reserved.
19 *
20 * Redistribution and use in source and binary forms are permitted
21 * provided that this notice is preserved and that due credit is given
22 * to the University of Michigan at Ann Arbor. The name of the University
23 * may not be used to endorse or promote products derived from this
24 * software without specific prior written permission. This software
25 * is provided ``as is'' without express or implied warranty.
26 */
27
28 #include <sys/cdefs.h>
29 __RCSID("$NetBSD: connection.c,v 1.2 2021/08/14 16:14:58 christos Exp $");
30
31 #include "portable.h"
32
33 #include <stdio.h>
34 #ifdef HAVE_LIMITS_H
35 #include <limits.h>
36 #endif
37
38 #include <ac/socket.h>
39 #include <ac/errno.h>
40 #include <ac/string.h>
41 #include <ac/time.h>
42 #include <ac/unistd.h>
43
44 #include "lload.h"
45
46 #include "lutil.h"
47 #include "lutil_ldap.h"
48
49 static unsigned long conn_nextid = 0;
50
51 static void
lload_connection_assign_nextid(LloadConnection * conn)52 lload_connection_assign_nextid( LloadConnection *conn )
53 {
54 conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED );
55 }
56
57 /*
58 * We start off with the connection muted and c_currentber holding the pdu we
59 * received.
60 *
61 * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait
62 * on reading or after we process lload_conn_max_pdus_per_cycle pdus so as to
63 * maintain fairness and not hog the worker thread forever.
64 *
65 * If we've run out of pdus immediately available from the stream or hit the
66 * budget, we unmute the connection.
67 *
68 * c->c_pdu_cb might return an 'error' and not free the connection. That can
69 * happen when changing the state or when client is blocked on writing and
70 * already has a pdu pending on the same operation, it's their job to make sure
71 * we're woken up again.
72 */
73 void *
handle_pdus(void * ctx,void * arg)74 handle_pdus( void *ctx, void *arg )
75 {
76 LloadConnection *c = arg;
77 int pdus_handled = 0;
78 epoch_t epoch;
79
80 /* A reference was passed on to us */
81 assert( IS_ALIVE( c, c_refcnt ) );
82
83 epoch = epoch_join();
84 for ( ;; ) {
85 BerElement *ber;
86 ber_tag_t tag;
87 ber_len_t len;
88
89 if ( c->c_pdu_cb( c ) ) {
90 /* Error/reset, get rid ouf our reference and bail */
91 goto done;
92 }
93
94 if ( !IS_ALIVE( c, c_live ) ) {
95 break;
96 }
97
98 if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) {
99 /* Do not read now, re-enable read event instead */
100 break;
101 }
102
103 ber = c->c_currentber;
104 if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
105 Debug( LDAP_DEBUG_ANY, "handle_pdus: "
106 "connid=%lu, ber_alloc failed\n",
107 c->c_connid );
108 CONNECTION_LOCK_DESTROY(c);
109 goto done;
110 }
111 c->c_currentber = ber;
112
113 checked_lock( &c->c_io_mutex );
114 if ( (lload_features & LLOAD_FEATURE_PAUSE) &&
115 (c->c_io_state & LLOAD_C_READ_PAUSE) ) {
116 goto pause;
117 }
118 tag = ber_get_next( c->c_sb, &len, ber );
119 checked_unlock( &c->c_io_mutex );
120 if ( tag != LDAP_TAG_MESSAGE ) {
121 int err = sock_errno();
122
123 if ( err != EWOULDBLOCK && err != EAGAIN ) {
124 if ( err || tag == LBER_ERROR ) {
125 char ebuf[128];
126 Debug( LDAP_DEBUG_ANY, "handle_pdus: "
127 "ber_get_next on fd=%d failed errno=%d (%s)\n",
128 c->c_fd, err,
129 sock_errstr( err, ebuf, sizeof(ebuf) ) );
130 } else {
131 Debug( LDAP_DEBUG_STATS, "handle_pdus: "
132 "ber_get_next on fd=%d connid=%lu received "
133 "a strange PDU tag=%lx\n",
134 c->c_fd, c->c_connid, tag );
135 }
136
137 c->c_currentber = NULL;
138 ber_free( ber, 1 );
139 CONNECTION_LOCK_DESTROY(c);
140 goto done;
141 }
142 break;
143 }
144
145 assert( IS_ALIVE( c, c_refcnt ) );
146 epoch_leave( epoch );
147 epoch = epoch_join();
148 assert( IS_ALIVE( c, c_refcnt ) );
149 }
150
151 checked_lock( &c->c_io_mutex );
152 if ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
153 !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
154 event_add( c->c_read_event, c->c_read_timeout );
155 Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
156 "re-enabled read event on connid=%lu\n",
157 c->c_connid );
158 }
159 pause:
160 c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
161 checked_unlock( &c->c_io_mutex );
162
163 done:
164 RELEASE_REF( c, c_refcnt, c->c_destroy );
165 epoch_leave( epoch );
166 return NULL;
167 }
168
169 /*
170 * Initial read on the connection, if we get an LDAP PDU, submit the
171 * processing of this and successive ones to the work queue.
172 *
173 * If we can't submit it to the queue (overload), process this one and return
174 * to the event loop immediately after.
175 */
176 void
connection_read_cb(evutil_socket_t s,short what,void * arg)177 connection_read_cb( evutil_socket_t s, short what, void *arg )
178 {
179 LloadConnection *c = arg;
180 BerElement *ber;
181 ber_tag_t tag;
182 ber_len_t len;
183 epoch_t epoch;
184 int pause;
185
186 if ( !IS_ALIVE( c, c_live ) ) {
187 event_del( c->c_read_event );
188 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
189 "suspended read event on a dead connid=%lu\n",
190 c->c_connid );
191 return;
192 }
193
194 if ( what & EV_TIMEOUT ) {
195 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
196 "connid=%lu, timeout reached, destroying\n",
197 c->c_connid );
198 /* Make sure the connection stays around for us to unlock it */
199 epoch = epoch_join();
200 CONNECTION_LOCK_DESTROY(c);
201 epoch_leave( epoch );
202 return;
203 }
204
205 if ( !acquire_ref( &c->c_refcnt ) ) {
206 return;
207 }
208 epoch = epoch_join();
209
210 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
211 "connection connid=%lu ready to read\n",
212 c->c_connid );
213
214 ber = c->c_currentber;
215 if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
216 Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
217 "connid=%lu, ber_alloc failed\n",
218 c->c_connid );
219 goto out;
220 }
221 c->c_currentber = ber;
222
223 checked_lock( &c->c_io_mutex );
224 assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) );
225 tag = ber_get_next( c->c_sb, &len, ber );
226 pause = c->c_io_state & LLOAD_C_READ_PAUSE;
227 checked_unlock( &c->c_io_mutex );
228
229 if ( tag != LDAP_TAG_MESSAGE ) {
230 int err = sock_errno();
231
232 if ( err != EWOULDBLOCK && err != EAGAIN ) {
233 if ( err || tag == LBER_ERROR ) {
234 char ebuf[128];
235 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
236 "ber_get_next on fd=%d failed errno=%d (%s)\n",
237 c->c_fd, err,
238 sock_errstr( err, ebuf, sizeof(ebuf) ) );
239 } else {
240 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
241 "ber_get_next on fd=%d connid=%lu received "
242 "a strange PDU tag=%lx\n",
243 c->c_fd, c->c_connid, tag );
244 }
245
246 c->c_currentber = NULL;
247 ber_free( ber, 1 );
248
249 event_del( c->c_read_event );
250 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
251 "suspended read event on dying connid=%lu\n",
252 c->c_connid );
253 CONNECTION_LOCK_DESTROY(c);
254 goto out;
255 }
256 if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) {
257 event_add( c->c_read_event, c->c_read_timeout );
258 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
259 "re-enabled read event on connid=%lu\n",
260 c->c_connid );
261 }
262 goto out;
263 }
264
265 checked_lock( &c->c_io_mutex );
266 c->c_io_state |= LLOAD_C_READ_HANDOVER;
267 checked_unlock( &c->c_io_mutex );
268 event_del( c->c_read_event );
269
270 if ( !lload_conn_max_pdus_per_cycle ||
271 ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
272 /* If we're overloaded or configured as such, process one and resume in
273 * the next cycle. */
274 int rc = c->c_pdu_cb( c );
275
276 checked_lock( &c->c_io_mutex );
277 c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
278 if ( rc == LDAP_SUCCESS &&
279 ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
280 !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) {
281 event_add( c->c_read_event, c->c_read_timeout );
282 }
283 checked_unlock( &c->c_io_mutex );
284 goto out;
285 }
286
287 Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
288 "suspended read event on connid=%lu\n",
289 c->c_connid );
290
291 /*
292 * We have scheduled a call to handle_pdus to take care of handling this
293 * and further requests, its reference is now owned by that task.
294 */
295 epoch_leave( epoch );
296 return;
297
298 out:
299 RELEASE_REF( c, c_refcnt, c->c_destroy );
300 epoch_leave( epoch );
301 }
302
303 void
connection_write_cb(evutil_socket_t s,short what,void * arg)304 connection_write_cb( evutil_socket_t s, short what, void *arg )
305 {
306 LloadConnection *c = arg;
307 epoch_t epoch;
308
309 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
310 "considering writing to%s connid=%lu what=%hd\n",
311 c->c_live ? " live" : " dead", c->c_connid, what );
312 if ( !IS_ALIVE( c, c_live ) ) {
313 return;
314 }
315
316 if ( what & EV_TIMEOUT ) {
317 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
318 "connid=%lu, timeout reached, destroying\n",
319 c->c_connid );
320 /* Make sure the connection stays around for us to unlock it */
321 epoch = epoch_join();
322 CONNECTION_LOCK_DESTROY(c);
323 epoch_leave( epoch );
324 return;
325 }
326
327 /* Before we acquire any locks */
328 event_del( c->c_write_event );
329
330 if ( !acquire_ref( &c->c_refcnt ) ) {
331 return;
332 }
333
334 /* If what == 0, we have a caller as opposed to being a callback */
335 if ( what ) {
336 epoch = epoch_join();
337 }
338
339 checked_lock( &c->c_io_mutex );
340 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
341 "have something to write to connection connid=%lu\n",
342 c->c_connid );
343
344 /* We might have been beaten to flushing the data by another thread */
345 if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
346 int err = sock_errno();
347
348 if ( err != EWOULDBLOCK && err != EAGAIN ) {
349 char ebuf[128];
350 checked_unlock( &c->c_io_mutex );
351 Debug( LDAP_DEBUG_ANY, "connection_write_cb: "
352 "ber_flush on fd=%d failed errno=%d (%s)\n",
353 c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
354 CONNECTION_LOCK_DESTROY(c);
355 goto done;
356 }
357
358 if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
359 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
360 "connection connid=%lu blocked on writing, marking "
361 "paused\n",
362 c->c_connid );
363 }
364 c->c_io_state |= LLOAD_C_READ_PAUSE;
365
366 /* TODO: Do not reset write timeout unless we wrote something */
367 event_add( c->c_write_event, lload_write_timeout );
368 } else {
369 c->c_pendingber = NULL;
370 if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
371 c->c_io_state ^= LLOAD_C_READ_PAUSE;
372 Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
373 "Unpausing connection connid=%lu\n",
374 c->c_connid );
375 if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) {
376 event_add( c->c_read_event, c->c_read_timeout );
377 }
378 }
379 }
380 checked_unlock( &c->c_io_mutex );
381
382 done:
383 RELEASE_REF( c, c_refcnt, c->c_destroy );
384 if ( what ) {
385 epoch_leave( epoch );
386 }
387 }
388
389 void
connection_destroy(LloadConnection * c)390 connection_destroy( LloadConnection *c )
391 {
392 assert( c );
393 Debug( LDAP_DEBUG_CONNS, "connection_destroy: "
394 "destroying connection connid=%lu\n",
395 c->c_connid );
396
397 CONNECTION_ASSERT_LOCKED(c);
398 assert( c->c_live == 0 );
399 assert( c->c_refcnt == 0 );
400 assert( c->c_state == LLOAD_C_INVALID );
401
402 ber_sockbuf_free( c->c_sb );
403
404 if ( c->c_currentber ) {
405 ber_free( c->c_currentber, 1 );
406 c->c_currentber = NULL;
407 }
408 if ( c->c_pendingber ) {
409 ber_free( c->c_pendingber, 1 );
410 c->c_pendingber = NULL;
411 }
412
413 if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
414 ber_memfree( c->c_sasl_bind_mech.bv_val );
415 BER_BVZERO( &c->c_sasl_bind_mech );
416 }
417 #ifdef HAVE_CYRUS_SASL
418 if ( c->c_sasl_defaults ) {
419 lutil_sasl_freedefs( c->c_sasl_defaults );
420 c->c_sasl_defaults = NULL;
421 }
422 if ( c->c_sasl_authctx ) {
423 #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */
424 if ( c->c_sasl_cbinding ) {
425 ch_free( c->c_sasl_cbinding );
426 }
427 #endif
428 sasl_dispose( &c->c_sasl_authctx );
429 }
430 #endif /* HAVE_CYRUS_SASL */
431
432 CONNECTION_UNLOCK(c);
433
434 ldap_pvt_thread_mutex_destroy( &c->c_io_mutex );
435 ldap_pvt_thread_mutex_destroy( &c->c_mutex );
436
437 ch_free( c );
438
439 listeners_reactivate();
440 }
441
442 /*
443 * Called holding mutex, will walk cq calling cb on all connections whose
444 * c_connid <= cq_last->c_connid that still exist at the time we get to them.
445 */
446 void
connections_walk_last(ldap_pvt_thread_mutex_t * cq_mutex,lload_c_head * cq,LloadConnection * cq_last,CONNCB cb,void * arg)447 connections_walk_last(
448 ldap_pvt_thread_mutex_t *cq_mutex,
449 lload_c_head *cq,
450 LloadConnection *cq_last,
451 CONNCB cb,
452 void *arg )
453 {
454 LloadConnection *c = cq_last;
455 uintptr_t last_connid;
456
457 if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
458 return;
459 }
460 assert_locked( cq_mutex );
461
462 last_connid = c->c_connid;
463 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
464
465 while ( !acquire_ref( &c->c_refcnt ) ) {
466 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
467 if ( c->c_connid >= last_connid ) {
468 assert_locked( cq_mutex );
469 return;
470 }
471 }
472
473 /*
474 * Notes:
475 * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
476 * order
477 * - the connection with the highest c_connid is passed in cq_last
478 * - we can only use cq when we hold cq_mutex
479 * - connections might be added to or removed from cq while we're busy
480 * processing connections
481 * - we need a way to detect we've finished looping around cq for some
482 * definition of looping around
483 */
484 do {
485 int rc;
486
487 checked_unlock( cq_mutex );
488
489 rc = cb( c, arg );
490 RELEASE_REF( c, c_refcnt, c->c_destroy );
491
492 checked_lock( cq_mutex );
493 if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) {
494 break;
495 }
496
497 do {
498 LloadConnection *old = c;
499 c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
500 if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) {
501 assert_locked( cq_mutex );
502 return;
503 }
504 } while ( !acquire_ref( &c->c_refcnt ) );
505 } while ( c->c_connid <= last_connid );
506 assert_locked( cq_mutex );
507 }
508
509 void
connections_walk(ldap_pvt_thread_mutex_t * cq_mutex,lload_c_head * cq,CONNCB cb,void * arg)510 connections_walk(
511 ldap_pvt_thread_mutex_t *cq_mutex,
512 lload_c_head *cq,
513 CONNCB cb,
514 void *arg )
515 {
516 LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq );
517 return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
518 }
519
520 int
lload_connection_close(LloadConnection * c,void * arg)521 lload_connection_close( LloadConnection *c, void *arg )
522 {
523 int gentle = *(int *)arg;
524 LloadOperation *op;
525
526 Debug( LDAP_DEBUG_CONNS, "lload_connection_close: "
527 "marking connection connid=%lu closing\n",
528 c->c_connid );
529
530 /* We were approached from the connection list */
531 assert( IS_ALIVE( c, c_refcnt ) );
532
533 CONNECTION_LOCK(c);
534 if ( !gentle || !c->c_ops ) {
535 CONNECTION_DESTROY(c);
536 return LDAP_SUCCESS;
537 }
538
539 /* The first thing we do is make sure we don't get new Operations in */
540 c->c_state = LLOAD_C_CLOSING;
541
542 do {
543 TAvlnode *node = ldap_tavl_end( c->c_ops, TAVL_DIR_LEFT );
544 op = node->avl_data;
545
546 /* Close operations that would need client action to resolve,
547 * only SASL binds in progress do that right now */
548 if ( op->o_client_msgid || op->o_upstream_msgid ) {
549 break;
550 }
551
552 CONNECTION_UNLOCK(c);
553 operation_unlink( op );
554 CONNECTION_LOCK(c);
555 } while ( c->c_ops );
556
557 CONNECTION_UNLOCK(c);
558 return LDAP_SUCCESS;
559 }
560
561 LloadConnection *
lload_connection_init(ber_socket_t s,const char * peername,int flags)562 lload_connection_init( ber_socket_t s, const char *peername, int flags )
563 {
564 LloadConnection *c;
565
566 assert( peername != NULL );
567
568 if ( s == AC_SOCKET_INVALID ) {
569 Debug( LDAP_DEBUG_ANY, "lload_connection_init: "
570 "init of socket fd=%ld invalid\n",
571 (long)s );
572 return NULL;
573 }
574
575 assert( s >= 0 );
576
577 c = ch_calloc( 1, sizeof(LloadConnection) );
578
579 c->c_fd = s;
580 c->c_sb = ber_sockbuf_alloc();
581 ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
582
583 #ifdef LDAP_PF_LOCAL
584 if ( flags & CONN_IS_IPC ) {
585 #ifdef LDAP_DEBUG
586 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
587 LBER_SBIOD_LEVEL_PROVIDER, (void *)"ipc_" );
588 #endif
589 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd,
590 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
591 } else
592 #endif /* LDAP_PF_LOCAL */
593 {
594 #ifdef LDAP_DEBUG
595 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
596 LBER_SBIOD_LEVEL_PROVIDER, (void *)"tcp_" );
597 #endif
598 ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp,
599 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
600 }
601
602 #ifdef LDAP_DEBUG
603 ber_sockbuf_add_io(
604 c->c_sb, &ber_sockbuf_io_debug, INT_MAX, (void *)"lload_" );
605 #endif
606
607 c->c_next_msgid = 1;
608 c->c_refcnt = c->c_live = 1;
609 c->c_destroy = connection_destroy;
610
611 LDAP_CIRCLEQ_ENTRY_INIT( c, c_next );
612
613 ldap_pvt_thread_mutex_init( &c->c_mutex );
614 ldap_pvt_thread_mutex_init( &c->c_io_mutex );
615
616 lload_connection_assign_nextid( c );
617
618 Debug( LDAP_DEBUG_CONNS, "lload_connection_init: "
619 "connection connid=%lu allocated for socket fd=%d peername=%s\n",
620 c->c_connid, s, peername );
621
622 c->c_state = LLOAD_C_ACTIVE;
623
624 return c;
625 }
626