xref: /netbsd-src/external/bsd/openldap/dist/servers/lloadd/connection.c (revision 549b59ed3ccf0d36d3097190a0db27b770f3a839)
1 /*	$NetBSD: connection.c,v 1.2 2021/08/14 16:14:58 christos Exp $	*/
2 
3 /* $OpenLDAP$ */
4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5  *
6  * Copyright 1998-2021 The OpenLDAP Foundation.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted only as authorized by the OpenLDAP
11  * Public License.
12  *
13  * A copy of this license is available in the file LICENSE in the
14  * top-level directory of the distribution or, alternatively, at
15  * <http://www.OpenLDAP.org/license.html>.
16  */
17 /* Portions Copyright (c) 1995 Regents of the University of Michigan.
18  * All rights reserved.
19  *
20  * Redistribution and use in source and binary forms are permitted
21  * provided that this notice is preserved and that due credit is given
22  * to the University of Michigan at Ann Arbor. The name of the University
23  * may not be used to endorse or promote products derived from this
24  * software without specific prior written permission. This software
25  * is provided ``as is'' without express or implied warranty.
26  */
27 
28 #include <sys/cdefs.h>
29 __RCSID("$NetBSD: connection.c,v 1.2 2021/08/14 16:14:58 christos Exp $");
30 
31 #include "portable.h"
32 
33 #include <stdio.h>
34 #ifdef HAVE_LIMITS_H
35 #include <limits.h>
36 #endif
37 
38 #include <ac/socket.h>
39 #include <ac/errno.h>
40 #include <ac/string.h>
41 #include <ac/time.h>
42 #include <ac/unistd.h>
43 
44 #include "lload.h"
45 
46 #include "lutil.h"
47 #include "lutil_ldap.h"
48 
49 static unsigned long conn_nextid = 0;
50 
51 static void
lload_connection_assign_nextid(LloadConnection * conn)52 lload_connection_assign_nextid( LloadConnection *conn )
53 {
54     conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED );
55 }
56 
57 /*
58  * We start off with the connection muted and c_currentber holding the pdu we
59  * received.
60  *
61  * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait
62  * on reading or after we process lload_conn_max_pdus_per_cycle pdus so as to
63  * maintain fairness and not hog the worker thread forever.
64  *
65  * If we've run out of pdus immediately available from the stream or hit the
66  * budget, we unmute the connection.
67  *
68  * c->c_pdu_cb might return an 'error' and not free the connection. That can
69  * happen when changing the state or when client is blocked on writing and
70  * already has a pdu pending on the same operation, it's their job to make sure
71  * we're woken up again.
72  */
73 void *
handle_pdus(void * ctx,void * arg)74 handle_pdus( void *ctx, void *arg )
75 {
76     LloadConnection *c = arg;
77     int pdus_handled = 0;
78     epoch_t epoch;
79 
80     /* A reference was passed on to us */
81     assert( IS_ALIVE( c, c_refcnt ) );
82 
83     epoch = epoch_join();
84     for ( ;; ) {
85         BerElement *ber;
86         ber_tag_t tag;
87         ber_len_t len;
88 
89         if ( c->c_pdu_cb( c ) ) {
90             /* Error/reset, get rid ouf our reference and bail */
91             goto done;
92         }
93 
94         if ( !IS_ALIVE( c, c_live ) ) {
95             break;
96         }
97 
98         if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) {
99             /* Do not read now, re-enable read event instead */
100             break;
101         }
102 
103         ber = c->c_currentber;
104         if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
105             Debug( LDAP_DEBUG_ANY, "handle_pdus: "
106                     "connid=%lu, ber_alloc failed\n",
107                     c->c_connid );
108             CONNECTION_LOCK_DESTROY(c);
109             goto done;
110         }
111         c->c_currentber = ber;
112 
113         checked_lock( &c->c_io_mutex );
114         if ( (lload_features & LLOAD_FEATURE_PAUSE) &&
115                 (c->c_io_state & LLOAD_C_READ_PAUSE) ) {
116             goto pause;
117         }
118         tag = ber_get_next( c->c_sb, &len, ber );
119         checked_unlock( &c->c_io_mutex );
120         if ( tag != LDAP_TAG_MESSAGE ) {
121             int err = sock_errno();
122 
123             if ( err != EWOULDBLOCK && err != EAGAIN ) {
124                 if ( err || tag == LBER_ERROR ) {
125                     char ebuf[128];
126                     Debug( LDAP_DEBUG_ANY, "handle_pdus: "
127                             "ber_get_next on fd=%d failed errno=%d (%s)\n",
128                             c->c_fd, err,
129                             sock_errstr( err, ebuf, sizeof(ebuf) ) );
130                 } else {
131                     Debug( LDAP_DEBUG_STATS, "handle_pdus: "
132                             "ber_get_next on fd=%d connid=%lu received "
133                             "a strange PDU tag=%lx\n",
134                             c->c_fd, c->c_connid, tag );
135                 }
136 
137                 c->c_currentber = NULL;
138                 ber_free( ber, 1 );
139                 CONNECTION_LOCK_DESTROY(c);
140                 goto done;
141             }
142             break;
143         }
144 
145         assert( IS_ALIVE( c, c_refcnt ) );
146         epoch_leave( epoch );
147         epoch = epoch_join();
148         assert( IS_ALIVE( c, c_refcnt ) );
149     }
150 
151     checked_lock( &c->c_io_mutex );
152     if ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
153             !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
154         event_add( c->c_read_event, c->c_read_timeout );
155         Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
156                 "re-enabled read event on connid=%lu\n",
157                 c->c_connid );
158     }
159 pause:
160     c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
161     checked_unlock( &c->c_io_mutex );
162 
163 done:
164     RELEASE_REF( c, c_refcnt, c->c_destroy );
165     epoch_leave( epoch );
166     return NULL;
167 }
168 
169 /*
170  * Initial read on the connection, if we get an LDAP PDU, submit the
171  * processing of this and successive ones to the work queue.
172  *
173  * If we can't submit it to the queue (overload), process this one and return
174  * to the event loop immediately after.
175  */
176 void
connection_read_cb(evutil_socket_t s,short what,void * arg)177 connection_read_cb( evutil_socket_t s, short what, void *arg )
178 {
179     LloadConnection *c = arg;
180     BerElement *ber;
181     ber_tag_t tag;
182     ber_len_t len;
183     epoch_t epoch;
184     int pause;
185 
186     if ( !IS_ALIVE( c, c_live ) ) {
187         event_del( c->c_read_event );
188         Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
189                 "suspended read event on a dead connid=%lu\n",
190                 c->c_connid );
191         return;
192     }
193 
194     if ( what & EV_TIMEOUT ) {
195         Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
196                 "connid=%lu, timeout reached, destroying\n",
197                 c->c_connid );
198         /* Make sure the connection stays around for us to unlock it */
199         epoch = epoch_join();
200         CONNECTION_LOCK_DESTROY(c);
201         epoch_leave( epoch );
202         return;
203     }
204 
205     if ( !acquire_ref( &c->c_refcnt ) ) {
206         return;
207     }
208     epoch = epoch_join();
209 
210     Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
211             "connection connid=%lu ready to read\n",
212             c->c_connid );
213 
214     ber = c->c_currentber;
215     if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
216         Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
217                 "connid=%lu, ber_alloc failed\n",
218                 c->c_connid );
219         goto out;
220     }
221     c->c_currentber = ber;
222 
223     checked_lock( &c->c_io_mutex );
224     assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) );
225     tag = ber_get_next( c->c_sb, &len, ber );
226     pause = c->c_io_state & LLOAD_C_READ_PAUSE;
227     checked_unlock( &c->c_io_mutex );
228 
229     if ( tag != LDAP_TAG_MESSAGE ) {
230         int err = sock_errno();
231 
232         if ( err != EWOULDBLOCK && err != EAGAIN ) {
233             if ( err || tag == LBER_ERROR ) {
234                 char ebuf[128];
235                 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
236                         "ber_get_next on fd=%d failed errno=%d (%s)\n",
237                         c->c_fd, err,
238                         sock_errstr( err, ebuf, sizeof(ebuf) ) );
239             } else {
240                 Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
241                         "ber_get_next on fd=%d connid=%lu received "
242                         "a strange PDU tag=%lx\n",
243                         c->c_fd, c->c_connid, tag );
244             }
245 
246             c->c_currentber = NULL;
247             ber_free( ber, 1 );
248 
249             event_del( c->c_read_event );
250             Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
251                     "suspended read event on dying connid=%lu\n",
252                     c->c_connid );
253             CONNECTION_LOCK_DESTROY(c);
254             goto out;
255         }
256         if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) {
257             event_add( c->c_read_event, c->c_read_timeout );
258             Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
259                     "re-enabled read event on connid=%lu\n",
260                     c->c_connid );
261         }
262         goto out;
263     }
264 
265     checked_lock( &c->c_io_mutex );
266     c->c_io_state |= LLOAD_C_READ_HANDOVER;
267     checked_unlock( &c->c_io_mutex );
268     event_del( c->c_read_event );
269 
270     if ( !lload_conn_max_pdus_per_cycle ||
271             ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
272         /* If we're overloaded or configured as such, process one and resume in
273          * the next cycle. */
274         int rc = c->c_pdu_cb( c );
275 
276         checked_lock( &c->c_io_mutex );
277         c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
278         if ( rc == LDAP_SUCCESS &&
279                 ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
280                         !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) {
281             event_add( c->c_read_event, c->c_read_timeout );
282         }
283         checked_unlock( &c->c_io_mutex );
284         goto out;
285     }
286 
287     Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
288             "suspended read event on connid=%lu\n",
289             c->c_connid );
290 
291     /*
292      * We have scheduled a call to handle_pdus to take care of handling this
293      * and further requests, its reference is now owned by that task.
294      */
295     epoch_leave( epoch );
296     return;
297 
298 out:
299     RELEASE_REF( c, c_refcnt, c->c_destroy );
300     epoch_leave( epoch );
301 }
302 
303 void
connection_write_cb(evutil_socket_t s,short what,void * arg)304 connection_write_cb( evutil_socket_t s, short what, void *arg )
305 {
306     LloadConnection *c = arg;
307     epoch_t epoch;
308 
309     Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
310             "considering writing to%s connid=%lu what=%hd\n",
311             c->c_live ? " live" : " dead", c->c_connid, what );
312     if ( !IS_ALIVE( c, c_live ) ) {
313         return;
314     }
315 
316     if ( what & EV_TIMEOUT ) {
317         Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
318                 "connid=%lu, timeout reached, destroying\n",
319                 c->c_connid );
320         /* Make sure the connection stays around for us to unlock it */
321         epoch = epoch_join();
322         CONNECTION_LOCK_DESTROY(c);
323         epoch_leave( epoch );
324         return;
325     }
326 
327     /* Before we acquire any locks */
328     event_del( c->c_write_event );
329 
330     if ( !acquire_ref( &c->c_refcnt ) ) {
331         return;
332     }
333 
334     /* If what == 0, we have a caller as opposed to being a callback */
335     if ( what ) {
336         epoch = epoch_join();
337     }
338 
339     checked_lock( &c->c_io_mutex );
340     Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
341             "have something to write to connection connid=%lu\n",
342             c->c_connid );
343 
344     /* We might have been beaten to flushing the data by another thread */
345     if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
346         int err = sock_errno();
347 
348         if ( err != EWOULDBLOCK && err != EAGAIN ) {
349             char ebuf[128];
350             checked_unlock( &c->c_io_mutex );
351             Debug( LDAP_DEBUG_ANY, "connection_write_cb: "
352                     "ber_flush on fd=%d failed errno=%d (%s)\n",
353                     c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
354             CONNECTION_LOCK_DESTROY(c);
355             goto done;
356         }
357 
358         if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
359             Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
360                     "connection connid=%lu blocked on writing, marking "
361                     "paused\n",
362                     c->c_connid );
363         }
364         c->c_io_state |= LLOAD_C_READ_PAUSE;
365 
366         /* TODO: Do not reset write timeout unless we wrote something */
367         event_add( c->c_write_event, lload_write_timeout );
368     } else {
369         c->c_pendingber = NULL;
370         if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
371             c->c_io_state ^= LLOAD_C_READ_PAUSE;
372             Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
373                     "Unpausing connection connid=%lu\n",
374                     c->c_connid );
375             if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) {
376                 event_add( c->c_read_event, c->c_read_timeout );
377             }
378         }
379     }
380     checked_unlock( &c->c_io_mutex );
381 
382 done:
383     RELEASE_REF( c, c_refcnt, c->c_destroy );
384     if ( what ) {
385         epoch_leave( epoch );
386     }
387 }
388 
389 void
connection_destroy(LloadConnection * c)390 connection_destroy( LloadConnection *c )
391 {
392     assert( c );
393     Debug( LDAP_DEBUG_CONNS, "connection_destroy: "
394             "destroying connection connid=%lu\n",
395             c->c_connid );
396 
397     CONNECTION_ASSERT_LOCKED(c);
398     assert( c->c_live == 0 );
399     assert( c->c_refcnt == 0 );
400     assert( c->c_state == LLOAD_C_INVALID );
401 
402     ber_sockbuf_free( c->c_sb );
403 
404     if ( c->c_currentber ) {
405         ber_free( c->c_currentber, 1 );
406         c->c_currentber = NULL;
407     }
408     if ( c->c_pendingber ) {
409         ber_free( c->c_pendingber, 1 );
410         c->c_pendingber = NULL;
411     }
412 
413     if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
414         ber_memfree( c->c_sasl_bind_mech.bv_val );
415         BER_BVZERO( &c->c_sasl_bind_mech );
416     }
417 #ifdef HAVE_CYRUS_SASL
418     if ( c->c_sasl_defaults ) {
419         lutil_sasl_freedefs( c->c_sasl_defaults );
420         c->c_sasl_defaults = NULL;
421     }
422     if ( c->c_sasl_authctx ) {
423 #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */
424         if ( c->c_sasl_cbinding ) {
425             ch_free( c->c_sasl_cbinding );
426         }
427 #endif
428         sasl_dispose( &c->c_sasl_authctx );
429     }
430 #endif /* HAVE_CYRUS_SASL */
431 
432     CONNECTION_UNLOCK(c);
433 
434     ldap_pvt_thread_mutex_destroy( &c->c_io_mutex );
435     ldap_pvt_thread_mutex_destroy( &c->c_mutex );
436 
437     ch_free( c );
438 
439     listeners_reactivate();
440 }
441 
442 /*
443  * Called holding mutex, will walk cq calling cb on all connections whose
444  * c_connid <= cq_last->c_connid that still exist at the time we get to them.
445  */
446 void
connections_walk_last(ldap_pvt_thread_mutex_t * cq_mutex,lload_c_head * cq,LloadConnection * cq_last,CONNCB cb,void * arg)447 connections_walk_last(
448         ldap_pvt_thread_mutex_t *cq_mutex,
449         lload_c_head *cq,
450         LloadConnection *cq_last,
451         CONNCB cb,
452         void *arg )
453 {
454     LloadConnection *c = cq_last;
455     uintptr_t last_connid;
456 
457     if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
458         return;
459     }
460     assert_locked( cq_mutex );
461 
462     last_connid = c->c_connid;
463     c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
464 
465     while ( !acquire_ref( &c->c_refcnt ) ) {
466         c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
467         if ( c->c_connid >= last_connid ) {
468             assert_locked( cq_mutex );
469             return;
470         }
471     }
472 
473     /*
474      * Notes:
475      * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
476      *   order
477      * - the connection with the highest c_connid is passed in cq_last
478      * - we can only use cq when we hold cq_mutex
479      * - connections might be added to or removed from cq while we're busy
480      *   processing connections
481      * - we need a way to detect we've finished looping around cq for some
482      *   definition of looping around
483      */
484     do {
485         int rc;
486 
487         checked_unlock( cq_mutex );
488 
489         rc = cb( c, arg );
490         RELEASE_REF( c, c_refcnt, c->c_destroy );
491 
492         checked_lock( cq_mutex );
493         if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) {
494             break;
495         }
496 
497         do {
498             LloadConnection *old = c;
499             c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
500             if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) {
501                 assert_locked( cq_mutex );
502                 return;
503             }
504         } while ( !acquire_ref( &c->c_refcnt ) );
505     } while ( c->c_connid <= last_connid );
506     assert_locked( cq_mutex );
507 }
508 
509 void
connections_walk(ldap_pvt_thread_mutex_t * cq_mutex,lload_c_head * cq,CONNCB cb,void * arg)510 connections_walk(
511         ldap_pvt_thread_mutex_t *cq_mutex,
512         lload_c_head *cq,
513         CONNCB cb,
514         void *arg )
515 {
516     LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq );
517     return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
518 }
519 
520 int
lload_connection_close(LloadConnection * c,void * arg)521 lload_connection_close( LloadConnection *c, void *arg )
522 {
523     int gentle = *(int *)arg;
524     LloadOperation *op;
525 
526     Debug( LDAP_DEBUG_CONNS, "lload_connection_close: "
527             "marking connection connid=%lu closing\n",
528             c->c_connid );
529 
530     /* We were approached from the connection list */
531     assert( IS_ALIVE( c, c_refcnt ) );
532 
533     CONNECTION_LOCK(c);
534     if ( !gentle || !c->c_ops ) {
535         CONNECTION_DESTROY(c);
536         return LDAP_SUCCESS;
537     }
538 
539     /* The first thing we do is make sure we don't get new Operations in */
540     c->c_state = LLOAD_C_CLOSING;
541 
542     do {
543         TAvlnode *node = ldap_tavl_end( c->c_ops, TAVL_DIR_LEFT );
544         op = node->avl_data;
545 
546         /* Close operations that would need client action to resolve,
547          * only SASL binds in progress do that right now */
548         if ( op->o_client_msgid || op->o_upstream_msgid ) {
549             break;
550         }
551 
552         CONNECTION_UNLOCK(c);
553         operation_unlink( op );
554         CONNECTION_LOCK(c);
555     } while ( c->c_ops );
556 
557     CONNECTION_UNLOCK(c);
558     return LDAP_SUCCESS;
559 }
560 
561 LloadConnection *
lload_connection_init(ber_socket_t s,const char * peername,int flags)562 lload_connection_init( ber_socket_t s, const char *peername, int flags )
563 {
564     LloadConnection *c;
565 
566     assert( peername != NULL );
567 
568     if ( s == AC_SOCKET_INVALID ) {
569         Debug( LDAP_DEBUG_ANY, "lload_connection_init: "
570                 "init of socket fd=%ld invalid\n",
571                 (long)s );
572         return NULL;
573     }
574 
575     assert( s >= 0 );
576 
577     c = ch_calloc( 1, sizeof(LloadConnection) );
578 
579     c->c_fd = s;
580     c->c_sb = ber_sockbuf_alloc();
581     ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
582 
583 #ifdef LDAP_PF_LOCAL
584     if ( flags & CONN_IS_IPC ) {
585 #ifdef LDAP_DEBUG
586         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
587                 LBER_SBIOD_LEVEL_PROVIDER, (void *)"ipc_" );
588 #endif
589         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd,
590                 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
591     } else
592 #endif /* LDAP_PF_LOCAL */
593     {
594 #ifdef LDAP_DEBUG
595         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
596                 LBER_SBIOD_LEVEL_PROVIDER, (void *)"tcp_" );
597 #endif
598         ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp,
599                 LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
600     }
601 
602 #ifdef LDAP_DEBUG
603     ber_sockbuf_add_io(
604             c->c_sb, &ber_sockbuf_io_debug, INT_MAX, (void *)"lload_" );
605 #endif
606 
607     c->c_next_msgid = 1;
608     c->c_refcnt = c->c_live = 1;
609     c->c_destroy = connection_destroy;
610 
611     LDAP_CIRCLEQ_ENTRY_INIT( c, c_next );
612 
613     ldap_pvt_thread_mutex_init( &c->c_mutex );
614     ldap_pvt_thread_mutex_init( &c->c_io_mutex );
615 
616     lload_connection_assign_nextid( c );
617 
618     Debug( LDAP_DEBUG_CONNS, "lload_connection_init: "
619             "connection connid=%lu allocated for socket fd=%d peername=%s\n",
620             c->c_connid, s, peername );
621 
622     c->c_state = LLOAD_C_ACTIVE;
623 
624     return c;
625 }
626