1 /* $NetBSD: daemon.c,v 1.2 2021/08/14 16:14:58 christos Exp $ */
2
3 /* $OpenLDAP$ */
4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5 *
6 * Copyright 1998-2021 The OpenLDAP Foundation.
7 * Portions Copyright 2007 by Howard Chu, Symas Corporation.
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted only as authorized by the OpenLDAP
12 * Public License.
13 *
14 * A copy of this license is available in the file LICENSE in the
15 * top-level directory of the distribution or, alternatively, at
16 * <http://www.OpenLDAP.org/license.html>.
17 */
18 /* Portions Copyright (c) 1995 Regents of the University of Michigan.
19 * All rights reserved.
20 *
21 * Redistribution and use in source and binary forms are permitted
22 * provided that this notice is preserved and that due credit is given
23 * to the University of Michigan at Ann Arbor. The name of the University
24 * may not be used to endorse or promote products derived from this
25 * software without specific prior written permission. This software
26 * is provided ``as is'' without express or implied warranty.
27 */
28
29 #include <sys/cdefs.h>
30 __RCSID("$NetBSD: daemon.c,v 1.2 2021/08/14 16:14:58 christos Exp $");
31
32 #include "portable.h"
33
34 #include <stdio.h>
35
36 #include <ac/ctype.h>
37 #include <ac/errno.h>
38 #include <ac/socket.h>
39 #include <ac/string.h>
40 #include <ac/time.h>
41 #include <ac/unistd.h>
42
43 #include <event2/event.h>
44 #include <event2/dns.h>
45 #include <event2/listener.h>
46
47 #include "lload.h"
48 #include "ldap_pvt_thread.h"
49 #include "lutil.h"
50
51 #include "ldap_rq.h"
52
53 #ifdef HAVE_SYSTEMD_SD_DAEMON_H
54 #include <systemd/sd-daemon.h>
55 #endif
56
57 #ifdef LDAP_PF_LOCAL
58 #include <sys/stat.h>
59 /* this should go in <ldap.h> as soon as it is accepted */
60 #define LDAPI_MOD_URLEXT "x-mod"
61 #endif /* LDAP_PF_LOCAL */
62
#ifndef BALANCER_MODULE
/*
 * Address family handed to getaddrinfo() as the ai_family hint when listener
 * URLs are resolved: unspecified when IPv6 support is compiled in, IPv4
 * otherwise.
 */
#ifdef LDAP_PF_INET6
int slap_inet4or6 = AF_UNSPEC;
#else /* ! INETv6 */
int slap_inet4or6 = AF_INET;
#endif /* ! INETv6 */

/* globals */
time_t starttime;
struct runqueue_s slapd_rq;

#ifdef LDAP_TCP_BUFFER
/*
 * Fallback receive/send buffer sizes used by lload_listener_activate() for
 * listeners whose own sl_tcp_rmem/sl_tcp_wmem is not positive. A value that
 * is not positive means "do not touch the socket buffer".
 */
int slapd_tcp_rmem;
int slapd_tcp_wmem;
#endif /* LDAP_TCP_BUFFER */

/*
 * Shutdown flags. lloadd_io_task() raises slapd_abrupt_shutdown when its
 * event loop ends while slapd_gentle_shutdown is not set.
 */
volatile sig_atomic_t slapd_shutdown = 0;
volatile sig_atomic_t slapd_gentle_shutdown = 0;
volatile sig_atomic_t slapd_abrupt_shutdown = 0;
#endif /* !BALANCER_MODULE */
83
/*
 * Incremented by listener_error_cb() each time it mutes a listener after an
 * EMFILE/ENFILE error, decremented by listeners_reactivate() for each
 * listener it re-enables. Both modify it while holding
 * lload_daemon[0].sd_mutex.
 */
static int emfile;

ldap_pvt_thread_mutex_t lload_wait_mutex;
ldap_pvt_thread_cond_t lload_wait_cond;
ldap_pvt_thread_cond_t lload_pause_cond;

/* Upper bound on the number of I/O daemon threads (size of lload_daemon[]) */
#ifndef SLAPD_MAX_DAEMON_THREADS
#define SLAPD_MAX_DAEMON_THREADS 16
#endif
/* Number of lload_daemon[] entries torn down by lloadd_daemon_destroy() */
int lload_daemon_threads = 1;
/* Bit mask DAEMON_ID() applies to a socket descriptor */
int lload_daemon_mask;

/* Event base created by lload_listener_activate() and dispatched by
 * lload_listener_thread() */
struct event_base *listener_base = NULL;
/* NULL-terminated array of the configured listeners */
LloadListener **lload_listeners = NULL;
/*
 * listener_tid: thread started by lload_listener_activate() and joined in
 * destroy_listeners().
 * daemon_tid: array base that lloadd_io_task() subtracts from its argument
 * to obtain its index into lload_daemon[].
 */
static ldap_pvt_thread_t listener_tid, *daemon_tid;

/* Freed and reset to NULL by lloadd_daemon_destroy() */
struct event_base *daemon_base = NULL;
/* Asynchronous DNS resolver base, created in lloadd_daemon() */
struct evdns_base *dnsbase;

struct event *lload_timeout_event;
104
105 /*
106 * global lload statistics. Not mutex protected to preserve performance -
107 * increment is atomic, at most we risk a bit of inconsistency
108 */
109 lload_global_stats_t lload_stats = {};
110
111 #ifndef SLAPD_LISTEN_BACKLOG
112 #define SLAPD_LISTEN_BACKLOG 1024
113 #endif /* ! SLAPD_LISTEN_BACKLOG */
114
/*
 * Map a socket descriptor to the index of the daemon thread serving it.
 * The argument is parenthesized so that expressions with operators of lower
 * precedence than '&' (e.g. "a | b") are masked as a whole.
 */
#define DAEMON_ID(fd) ( (fd) & lload_daemon_mask )
116
117 #ifdef HAVE_WINSOCK
118 ldap_pvt_thread_mutex_t slapd_ws_mutex;
119 SOCKET *slapd_ws_sockets;
120 #define SD_READ 1
121 #define SD_WRITE 2
122 #define SD_ACTIVE 4
123 #define SD_LISTENER 8
124 #endif
125
126 #ifdef HAVE_TCPD
127 static ldap_pvt_thread_mutex_t sd_tcpd_mutex;
128 #endif /* TCP Wrappers */
129
/* Pairs a libevent connection listener with a socket descriptor. */
typedef struct listener_item {
    struct evconnlistener *listener;
    ber_socket_t fd;
} listener_item;
134
/*
 * State kept for each I/O daemon thread (one lload_daemon[] slot per thread).
 */
typedef struct lload_daemon_st {
    /* Entry 0's mutex also guards the emfile counter and listener muting */
    ldap_pvt_thread_mutex_t sd_mutex;

    /* Event base dispatched by lloadd_io_task(); new client connections are
     * attached to the base of the thread selected by DAEMON_ID() */
    struct event_base *base;
    /* Created by lloadd_io_task() with daemon_wakeup_cb as its callback */
    struct event *wakeup_event;
} lload_daemon_st;
141
142 static lload_daemon_st lload_daemon[SLAPD_MAX_DAEMON_THREADS];
143
144 static void daemon_wakeup_cb( evutil_socket_t sig, short what, void *arg );
145
146 static void
lloadd_close(ber_socket_t s)147 lloadd_close( ber_socket_t s )
148 {
149 Debug( LDAP_DEBUG_CONNS, "lloadd_close: "
150 "closing fd=%ld\n",
151 (long)s );
152 tcp_close( s );
153 }
154
/*
 * Release a NULL-terminated array of socket addresses: every element first,
 * then the array itself. A NULL array is accepted and ignored.
 */
static void
lload_free_listener_addresses( struct sockaddr **sal )
{
    int i;

    if ( !sal ) {
        return;
    }

    for ( i = 0; sal[i] != NULL; i++ ) {
        ch_free( sal[i] );
    }
    ch_free( sal );
}
164
#if defined(LDAP_PF_LOCAL) || defined(SLAP_X_LISTENER_MOD)
/*
 * Parse the permission extension ("x-mod=<perms>") out of a URL extension
 * list.
 *
 * The first extension whose name matches (case-insensitively, with an
 * optional leading '!' marking it critical) decides the outcome. Accepted
 * values are three octal digits ("755"), four digits with a leading '0'
 * ("0755"), or a ten character symbolic string ("-rwxr-xr-x") whose first
 * character is not inspected.
 *
 * On success *perms receives the mode bits, *crit the criticality flag and
 * LDAP_SUCCESS is returned. LDAP_OTHER is returned when the value is
 * malformed or no matching extension exists; *crit is then 0 and *perms is
 * left untouched.
 */
static int
get_url_perms( char **exts, mode_t *perms, int *crit )
{
    static const char prefix[] = LDAPI_MOD_URLEXT "=";
    const size_t prefix_len = sizeof(prefix) - 1;
    int i;

    assert( exts != NULL );
    assert( perms != NULL );
    assert( crit != NULL );

    *crit = 0;
    for ( i = 0; exts[i]; i++ ) {
        char *ext = exts[i];
        char *value;
        int critical = 0;
        mode_t mode = 0;
        size_t len;
        int j;

        if ( ext[0] == '!' ) {
            critical = 1;
            ext++;
        }

        if ( strncasecmp( ext, prefix, prefix_len ) != 0 ) {
            continue;
        }

        value = ext + prefix_len;
        len = strlen( value );

        if ( len == 4 ) {
            /* four digits are only valid as '0' followed by three digits */
            if ( value[0] != '0' ) return LDAP_OTHER;
            value++;
            len = 3;
        }

        if ( len == 3 ) {
            /* octal form: one digit each for user, group, other */
            for ( j = 0; j < 3; j++ ) {
                int digit = value[j] - '0';

                if ( digit < 0 || digit > 7 ) return LDAP_OTHER;

                mode |= digit << ( 3 * ( 2 - j ) );
            }

        } else if ( len == 10 ) {
            /* symbolic form: position j must hold its letter or '-' */
            static const mode_t bits[] = { 0, S_IRUSR, S_IWUSR, S_IXUSR,
                    S_IRGRP, S_IWGRP, S_IXGRP, S_IROTH, S_IWOTH,
                    S_IXOTH };
            static const char symbols[] = "-rwxrwxrwx";

            for ( j = 1; j < 10; j++ ) {
                if ( value[j] == symbols[j] ) {
                    mode |= bits[j];
                } else if ( value[j] != '-' ) {
                    return LDAP_OTHER;
                }
            }

        } else {
            return LDAP_OTHER;
        }

        *crit = critical;
        *perms = mode;

        return LDAP_SUCCESS;
    }

    return LDAP_OTHER;
}
#endif /* LDAP_PF_LOCAL || SLAP_X_LISTENER_MOD */
239
240 /* port = 0 indicates AF_LOCAL */
241 static int
lload_get_listener_addresses(const char * host,unsigned short port,struct sockaddr *** sal)242 lload_get_listener_addresses(
243 const char *host,
244 unsigned short port,
245 struct sockaddr ***sal )
246 {
247 struct sockaddr **sap;
248
249 #ifdef LDAP_PF_LOCAL
250 if ( port == 0 ) {
251 sap = *sal = ch_malloc( 2 * sizeof(void *) );
252
253 *sap = ch_calloc( 1, sizeof(struct sockaddr_un) );
254 sap[1] = NULL;
255
256 if ( strlen( host ) >
257 ( sizeof( ((struct sockaddr_un *)*sap)->sun_path ) - 1 ) ) {
258 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
259 "domain socket path (%s) too long in URL\n",
260 host );
261 goto errexit;
262 }
263
264 (*sap)->sa_family = AF_LOCAL;
265 strcpy( ((struct sockaddr_un *)*sap)->sun_path, host );
266 } else
267 #endif /* LDAP_PF_LOCAL */
268 {
269 #ifdef HAVE_GETADDRINFO
270 struct addrinfo hints, *res, *sai;
271 int n, err;
272 char serv[7];
273
274 memset( &hints, '\0', sizeof(hints) );
275 hints.ai_flags = AI_PASSIVE;
276 hints.ai_socktype = SOCK_STREAM;
277 hints.ai_family = slap_inet4or6;
278 snprintf( serv, sizeof(serv), "%d", port );
279
280 if ( (err = getaddrinfo( host, serv, &hints, &res )) ) {
281 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
282 "getaddrinfo() failed: %s\n",
283 AC_GAI_STRERROR(err) );
284 return -1;
285 }
286
287 sai = res;
288 for ( n = 2; ( sai = sai->ai_next ) != NULL; n++ ) {
289 /* EMPTY */;
290 }
291 sap = *sal = ch_calloc( n, sizeof(void *) );
292
293 *sap = NULL;
294
295 for ( sai = res; sai; sai = sai->ai_next ) {
296 if ( sai->ai_addr == NULL ) {
297 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
298 "getaddrinfo ai_addr is NULL?\n" );
299 freeaddrinfo( res );
300 goto errexit;
301 }
302
303 switch ( sai->ai_family ) {
304 #ifdef LDAP_PF_INET6
305 case AF_INET6:
306 *sap = ch_malloc( sizeof(struct sockaddr_in6) );
307 *(struct sockaddr_in6 *)*sap =
308 *((struct sockaddr_in6 *)sai->ai_addr);
309 break;
310 #endif /* LDAP_PF_INET6 */
311 case AF_INET:
312 *sap = ch_malloc( sizeof(struct sockaddr_in) );
313 *(struct sockaddr_in *)*sap =
314 *((struct sockaddr_in *)sai->ai_addr);
315 break;
316 default:
317 *sap = NULL;
318 break;
319 }
320
321 if ( *sap != NULL ) {
322 (*sap)->sa_family = sai->ai_family;
323 sap++;
324 *sap = NULL;
325 }
326 }
327
328 freeaddrinfo( res );
329
330 #else /* ! HAVE_GETADDRINFO */
331 int i, n = 1;
332 struct in_addr in;
333 struct hostent *he = NULL;
334
335 if ( host == NULL ) {
336 in.s_addr = htonl( INADDR_ANY );
337
338 } else if ( !inet_aton( host, &in ) ) {
339 he = gethostbyname( host );
340 if ( he == NULL ) {
341 Debug( LDAP_DEBUG_ANY, "lload_get_listener_addresses: "
342 "invalid host %s\n",
343 host );
344 return -1;
345 }
346 for ( n = 0; he->h_addr_list[n]; n++ ) /* empty */;
347 }
348
349 sap = *sal = ch_malloc( ( n + 1 ) * sizeof(void *) );
350
351 for ( i = 0; i < n; i++ ) {
352 sap[i] = ch_calloc( 1, sizeof(struct sockaddr_in) );
353 sap[i]->sa_family = AF_INET;
354 ((struct sockaddr_in *)sap[i])->sin_port = htons( port );
355 AC_MEMCPY( &((struct sockaddr_in *)sap[i])->sin_addr,
356 he ? (struct in_addr *)he->h_addr_list[i] : &in,
357 sizeof(struct in_addr) );
358 }
359 sap[i] = NULL;
360 #endif /* ! HAVE_GETADDRINFO */
361 }
362
363 return 0;
364
365 errexit:
366 lload_free_listener_addresses(*sal);
367 return -1;
368 }
369
370 static int
lload_open_listener(const char * url,LDAPURLDesc * lud,int * listeners,int * cur)371 lload_open_listener(
372 const char *url,
373 LDAPURLDesc *lud,
374 int *listeners,
375 int *cur )
376 {
377 int num, tmp, rc;
378 LloadListener l;
379 LloadListener *li;
380 unsigned short port;
381 int err, addrlen = 0;
382 struct sockaddr **sal = NULL, **psal;
383 int socktype = SOCK_STREAM; /* default to COTS */
384 ber_socket_t s;
385 char ebuf[128];
386
387 #if defined(LDAP_PF_LOCAL) || defined(SLAP_X_LISTENER_MOD)
388 /*
389 * use safe defaults
390 */
391 int crit = 1;
392 #endif /* LDAP_PF_LOCAL || SLAP_X_LISTENER_MOD */
393
394 assert( url );
395 assert( lud );
396
397 l.sl_url.bv_val = NULL;
398 l.sl_mute = 0;
399 l.sl_busy = 0;
400
401 #ifndef HAVE_TLS
402 if ( ldap_pvt_url_scheme2tls( lud->lud_scheme ) ) {
403 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
404 "TLS not supported (%s)\n",
405 url );
406 ldap_free_urldesc( lud );
407 return -1;
408 }
409
410 if ( !lud->lud_port ) lud->lud_port = LDAP_PORT;
411
412 #else /* HAVE_TLS */
413 l.sl_is_tls = ldap_pvt_url_scheme2tls( lud->lud_scheme );
414 #endif /* HAVE_TLS */
415
416 l.sl_is_proxied = ldap_pvt_url_scheme2proxied( lud->lud_scheme );
417
418 #ifdef LDAP_TCP_BUFFER
419 l.sl_tcp_rmem = 0;
420 l.sl_tcp_wmem = 0;
421 #endif /* LDAP_TCP_BUFFER */
422
423 port = (unsigned short)lud->lud_port;
424
425 tmp = ldap_pvt_url_scheme2proto( lud->lud_scheme );
426 if ( tmp == LDAP_PROTO_IPC ) {
427 #ifdef LDAP_PF_LOCAL
428 if ( lud->lud_host == NULL || lud->lud_host[0] == '\0' ) {
429 err = lload_get_listener_addresses( LDAPI_SOCK, 0, &sal );
430 } else {
431 err = lload_get_listener_addresses( lud->lud_host, 0, &sal );
432 }
433 #else /* ! LDAP_PF_LOCAL */
434
435 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
436 "URL scheme not supported: %s\n",
437 url );
438 ldap_free_urldesc( lud );
439 return -1;
440 #endif /* ! LDAP_PF_LOCAL */
441 } else {
442 if ( lud->lud_host == NULL || lud->lud_host[0] == '\0' ||
443 strcmp( lud->lud_host, "*" ) == 0 ) {
444 err = lload_get_listener_addresses( NULL, port, &sal );
445 } else {
446 err = lload_get_listener_addresses( lud->lud_host, port, &sal );
447 }
448 }
449
450 #if defined(LDAP_PF_LOCAL) || defined(SLAP_X_LISTENER_MOD)
451 if ( lud->lud_exts ) {
452 err = get_url_perms( lud->lud_exts, &l.sl_perms, &crit );
453 } else {
454 l.sl_perms = S_IRWXU | S_IRWXO;
455 }
456 #endif /* LDAP_PF_LOCAL || SLAP_X_LISTENER_MOD */
457
458 ldap_free_urldesc( lud );
459 if ( err ) {
460 lload_free_listener_addresses( sal );
461 return -1;
462 }
463
464 /* If we got more than one address returned, we need to make space
465 * for it in the lload_listeners array.
466 */
467 for ( num = 0; sal[num]; num++ ) /* empty */;
468 if ( num > 1 ) {
469 *listeners += num - 1;
470 lload_listeners = ch_realloc( lload_listeners,
471 ( *listeners + 1 ) * sizeof(LloadListener *) );
472 }
473
474 psal = sal;
475 while ( *sal != NULL ) {
476 char *af;
477 switch ( (*sal)->sa_family ) {
478 case AF_INET:
479 af = "IPv4";
480 break;
481 #ifdef LDAP_PF_INET6
482 case AF_INET6:
483 af = "IPv6";
484 break;
485 #endif /* LDAP_PF_INET6 */
486 #ifdef LDAP_PF_LOCAL
487 case AF_LOCAL:
488 af = "Local";
489 break;
490 #endif /* LDAP_PF_LOCAL */
491 default:
492 sal++;
493 continue;
494 }
495
496 s = socket( (*sal)->sa_family, socktype, 0 );
497 if ( s == AC_SOCKET_INVALID ) {
498 int err = sock_errno();
499 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
500 "%s socket() failed errno=%d (%s)\n",
501 af, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
502 sal++;
503 continue;
504 }
505 ber_pvt_socket_set_nonblock( s, 1 );
506 l.sl_sd = s;
507
508 #ifdef LDAP_PF_LOCAL
509 if ( (*sal)->sa_family == AF_LOCAL ) {
510 unlink( ((struct sockaddr_un *)*sal)->sun_path );
511 } else
512 #endif /* LDAP_PF_LOCAL */
513 {
514 #ifdef SO_REUSEADDR
515 /* enable address reuse */
516 tmp = 1;
517 rc = setsockopt(
518 s, SOL_SOCKET, SO_REUSEADDR, (char *)&tmp, sizeof(tmp) );
519 if ( rc == AC_SOCKET_ERROR ) {
520 int err = sock_errno();
521 Debug( LDAP_DEBUG_ANY, "lload_open_listener(%ld): "
522 "setsockopt(SO_REUSEADDR) failed errno=%d (%s)\n",
523 (long)l.sl_sd, err,
524 sock_errstr( err, ebuf, sizeof(ebuf) ) );
525 }
526 #endif /* SO_REUSEADDR */
527 }
528
529 switch ( (*sal)->sa_family ) {
530 case AF_INET:
531 addrlen = sizeof(struct sockaddr_in);
532 break;
533 #ifdef LDAP_PF_INET6
534 case AF_INET6:
535 #ifdef IPV6_V6ONLY
536 /* Try to use IPv6 sockets for IPv6 only */
537 tmp = 1;
538 rc = setsockopt( s, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&tmp,
539 sizeof(tmp) );
540 if ( rc == AC_SOCKET_ERROR ) {
541 int err = sock_errno();
542 Debug( LDAP_DEBUG_ANY, "lload_open_listener(%ld): "
543 "setsockopt(IPV6_V6ONLY) failed errno=%d (%s)\n",
544 (long)l.sl_sd, err,
545 sock_errstr( err, ebuf, sizeof(ebuf) ) );
546 }
547 #endif /* IPV6_V6ONLY */
548 addrlen = sizeof(struct sockaddr_in6);
549 break;
550 #endif /* LDAP_PF_INET6 */
551
552 #ifdef LDAP_PF_LOCAL
553 case AF_LOCAL:
554 #ifdef LOCAL_CREDS
555 {
556 int one = 1;
557 setsockopt( s, 0, LOCAL_CREDS, &one, sizeof(one) );
558 }
559 #endif /* LOCAL_CREDS */
560
561 addrlen = sizeof(struct sockaddr_un);
562 break;
563 #endif /* LDAP_PF_LOCAL */
564 }
565
566 #ifdef LDAP_PF_LOCAL
567 /* create socket with all permissions set for those systems
568 * that honor permissions on sockets (e.g. Linux); typically,
569 * only write is required. To exploit filesystem permissions,
570 * place the socket in a directory and use directory's
571 * permissions. Need write perms to the directory to
572 * create/unlink the socket; likely need exec perms to access
573 * the socket (ITS#4709) */
574 {
575 mode_t old_umask = 0;
576
577 if ( (*sal)->sa_family == AF_LOCAL ) {
578 old_umask = umask( 0 );
579 }
580 #endif /* LDAP_PF_LOCAL */
581 rc = bind( s, *sal, addrlen );
582 #ifdef LDAP_PF_LOCAL
583 if ( old_umask != 0 ) {
584 umask( old_umask );
585 }
586 }
587 #endif /* LDAP_PF_LOCAL */
588 if ( rc ) {
589 err = sock_errno();
590 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
591 "bind(%ld) failed errno=%d (%s)\n",
592 (long)l.sl_sd, err,
593 sock_errstr( err, ebuf, sizeof(ebuf) ) );
594 tcp_close( s );
595 sal++;
596 continue;
597 }
598
599 switch ( (*sal)->sa_family ) {
600 #ifdef LDAP_PF_LOCAL
601 case AF_LOCAL: {
602 char *path = ((struct sockaddr_un *)*sal)->sun_path;
603 l.sl_name.bv_len = strlen( path ) + STRLENOF("PATH=");
604 l.sl_name.bv_val = ch_malloc( l.sl_name.bv_len + 1 );
605 snprintf( l.sl_name.bv_val, l.sl_name.bv_len + 1, "PATH=%s",
606 path );
607 } break;
608 #endif /* LDAP_PF_LOCAL */
609
610 case AF_INET: {
611 char addr[INET_ADDRSTRLEN];
612 const char *s;
613 #if defined(HAVE_GETADDRINFO) && defined(HAVE_INET_NTOP)
614 s = inet_ntop( AF_INET,
615 &((struct sockaddr_in *)*sal)->sin_addr, addr,
616 sizeof(addr) );
617 #else /* ! HAVE_GETADDRINFO || ! HAVE_INET_NTOP */
618 s = inet_ntoa( ((struct sockaddr_in *)*sal)->sin_addr );
619 #endif /* ! HAVE_GETADDRINFO || ! HAVE_INET_NTOP */
620 if ( !s ) s = SLAP_STRING_UNKNOWN;
621 port = ntohs( ((struct sockaddr_in *)*sal)->sin_port );
622 l.sl_name.bv_val =
623 ch_malloc( sizeof("IP=255.255.255.255:65535") );
624 snprintf( l.sl_name.bv_val,
625 sizeof("IP=255.255.255.255:65535"), "IP=%s:%d", s,
626 port );
627 l.sl_name.bv_len = strlen( l.sl_name.bv_val );
628 } break;
629
630 #ifdef LDAP_PF_INET6
631 case AF_INET6: {
632 char addr[INET6_ADDRSTRLEN];
633 const char *s;
634 s = inet_ntop( AF_INET6,
635 &((struct sockaddr_in6 *)*sal)->sin6_addr, addr,
636 sizeof(addr) );
637 if ( !s ) s = SLAP_STRING_UNKNOWN;
638 port = ntohs( ((struct sockaddr_in6 *)*sal)->sin6_port );
639 l.sl_name.bv_len = strlen( s ) + sizeof("IP=[]:65535");
640 l.sl_name.bv_val = ch_malloc( l.sl_name.bv_len );
641 snprintf( l.sl_name.bv_val, l.sl_name.bv_len, "IP=[%s]:%d", s,
642 port );
643 l.sl_name.bv_len = strlen( l.sl_name.bv_val );
644 } break;
645 #endif /* LDAP_PF_INET6 */
646
647 default:
648 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
649 "unsupported address family (%d)\n",
650 (int)(*sal)->sa_family );
651 break;
652 }
653
654 AC_MEMCPY( &l.sl_sa, *sal, addrlen );
655 ber_str2bv( url, 0, 1, &l.sl_url );
656 li = ch_malloc( sizeof(LloadListener) );
657 *li = l;
658 lload_listeners[*cur] = li;
659 (*cur)++;
660 sal++;
661 }
662
663 lload_free_listener_addresses( psal );
664
665 if ( l.sl_url.bv_val == NULL ) {
666 Debug( LDAP_DEBUG_ANY, "lload_open_listener: "
667 "failed on %s\n",
668 url );
669 return -1;
670 }
671
672 Debug( LDAP_DEBUG_TRACE, "lload_open_listener: "
673 "listener initialized %s\n",
674 l.sl_url.bv_val );
675
676 return 0;
677 }
678
679 int
lload_open_new_listener(const char * url,LDAPURLDesc * lud)680 lload_open_new_listener( const char *url, LDAPURLDesc *lud )
681 {
682 int rc, i, j = 0;
683
684 for ( i = 0; lload_listeners && lload_listeners[i] != NULL;
685 i++ ) /* count */
686 ;
687 j = i;
688
689 i++;
690 lload_listeners = ch_realloc(
691 lload_listeners, ( i + 1 ) * sizeof(LloadListener *) );
692
693 rc = lload_open_listener( url, lud, &i, &j );
694 lload_listeners[j] = NULL;
695 return rc;
696 }
697
698 int lloadd_inited = 0;
699
700 int
lloadd_listeners_init(const char * urls)701 lloadd_listeners_init( const char *urls )
702 {
703 int i, j, n;
704 char **u;
705 LDAPURLDesc *lud;
706
707 Debug( LDAP_DEBUG_ARGS, "lloadd_listeners_init: %s\n",
708 urls ? urls : "<null>" );
709
710 #ifdef HAVE_TCPD
711 ldap_pvt_thread_mutex_init( &sd_tcpd_mutex );
712 #endif /* TCP Wrappers */
713
714 if ( urls == NULL ) urls = "ldap:///";
715
716 u = ldap_str2charray( urls, " " );
717
718 if ( u == NULL || u[0] == NULL ) {
719 Debug( LDAP_DEBUG_ANY, "lloadd_listeners_init: "
720 "no urls (%s) provided\n",
721 urls );
722 if ( u ) ldap_charray_free( u );
723 return -1;
724 }
725
726 for ( i = 0; u[i] != NULL; i++ ) {
727 Debug( LDAP_DEBUG_TRACE, "lloadd_listeners_init: "
728 "listen on %s\n",
729 u[i] );
730 }
731
732 if ( i == 0 ) {
733 Debug( LDAP_DEBUG_ANY, "lloadd_listeners_init: "
734 "no listeners to open (%s)\n",
735 urls );
736 ldap_charray_free( u );
737 return -1;
738 }
739
740 Debug( LDAP_DEBUG_TRACE, "lloadd_listeners_init: "
741 "%d listeners to open...\n",
742 i );
743 lload_listeners = ch_malloc( ( i + 1 ) * sizeof(LloadListener *) );
744
745 for ( n = 0, j = 0; u[n]; n++ ) {
746 if ( ldap_url_parse_ext( u[n], &lud, LDAP_PVT_URL_PARSE_DEF_PORT ) ) {
747 Debug( LDAP_DEBUG_ANY, "lloadd_listeners_init: "
748 "could not parse url %s\n",
749 u[n] );
750 ldap_charray_free( u );
751 return -1;
752 }
753
754 if ( lload_open_listener( u[n], lud, &i, &j ) ) {
755 ldap_charray_free( u );
756 return -1;
757 }
758 }
759 lload_listeners[j] = NULL;
760
761 Debug( LDAP_DEBUG_TRACE, "lloadd_listeners_init: "
762 "%d listeners opened\n",
763 i );
764
765 ldap_charray_free( u );
766
767 return !i;
768 }
769
770 int
lloadd_daemon_destroy(void)771 lloadd_daemon_destroy( void )
772 {
773 epoch_shutdown();
774 if ( lloadd_inited ) {
775 int i;
776
777 for ( i = 0; i < lload_daemon_threads; i++ ) {
778 ldap_pvt_thread_mutex_destroy( &lload_daemon[i].sd_mutex );
779 if ( lload_daemon[i].wakeup_event ) {
780 event_free( lload_daemon[i].wakeup_event );
781 }
782 if ( lload_daemon[i].base ) {
783 event_base_free( lload_daemon[i].base );
784 }
785 }
786
787 event_base_free( daemon_base );
788 daemon_base = NULL;
789
790 lloadd_inited = 0;
791 #ifdef HAVE_TCPD
792 ldap_pvt_thread_mutex_destroy( &sd_tcpd_mutex );
793 #endif /* TCP Wrappers */
794 }
795
796 return 0;
797 }
798
799 static void
destroy_listeners(void)800 destroy_listeners( void )
801 {
802 LloadListener *lr, **ll = lload_listeners;
803
804 if ( ll == NULL ) return;
805
806 ldap_pvt_thread_join( listener_tid, (void *)NULL );
807
808 while ( (lr = *ll++) != NULL ) {
809 if ( lr->sl_url.bv_val ) {
810 ber_memfree( lr->sl_url.bv_val );
811 }
812
813 if ( lr->sl_name.bv_val ) {
814 ber_memfree( lr->sl_name.bv_val );
815 }
816
817 #ifdef LDAP_PF_LOCAL
818 if ( lr->sl_sa.sa_addr.sa_family == AF_LOCAL ) {
819 unlink( lr->sl_sa.sa_un_addr.sun_path );
820 }
821 #endif /* LDAP_PF_LOCAL */
822
823 evconnlistener_free( lr->listener );
824
825 free( lr );
826 }
827
828 free( lload_listeners );
829 lload_listeners = NULL;
830
831 if ( listener_base ) {
832 event_base_free( listener_base );
833 }
834 }
835
836 static void
lload_listener(struct evconnlistener * listener,ber_socket_t s,struct sockaddr * a,int len,void * arg)837 lload_listener(
838 struct evconnlistener *listener,
839 ber_socket_t s,
840 struct sockaddr *a,
841 int len,
842 void *arg )
843 {
844 LloadListener *sl = arg;
845 LloadConnection *c;
846 Sockaddr *from = (Sockaddr *)a;
847 char peername[LDAP_IPADDRLEN];
848 struct berval peerbv = BER_BVC(peername);
849 int cflag;
850 int tid;
851 char ebuf[128];
852
853 Debug( LDAP_DEBUG_TRACE, ">>> lload_listener(%s)\n", sl->sl_url.bv_val );
854
855 peername[0] = '\0';
856
857 /* Resume the listener FD to allow concurrent-processing of
858 * additional incoming connections.
859 */
860 sl->sl_busy = 0;
861
862 tid = DAEMON_ID(s);
863
864 Debug( LDAP_DEBUG_CONNS, "lload_listener: "
865 "listen=%ld, new connection fd=%ld\n",
866 (long)sl->sl_sd, (long)s );
867
868 #if defined(SO_KEEPALIVE) || defined(TCP_NODELAY)
869 #ifdef LDAP_PF_LOCAL
870 /* for IPv4 and IPv6 sockets only */
871 if ( from->sa_addr.sa_family != AF_LOCAL )
872 #endif /* LDAP_PF_LOCAL */
873 {
874 int rc;
875 int tmp;
876 #ifdef SO_KEEPALIVE
877 /* enable keep alives */
878 tmp = 1;
879 rc = setsockopt(
880 s, SOL_SOCKET, SO_KEEPALIVE, (char *)&tmp, sizeof(tmp) );
881 if ( rc == AC_SOCKET_ERROR ) {
882 int err = sock_errno();
883 Debug( LDAP_DEBUG_ANY, "lload_listener(%ld): "
884 "setsockopt(SO_KEEPALIVE) failed errno=%d (%s)\n",
885 (long)s, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
886 }
887 #endif /* SO_KEEPALIVE */
888 #ifdef TCP_NODELAY
889 /* enable no delay */
890 tmp = 1;
891 rc = setsockopt(
892 s, IPPROTO_TCP, TCP_NODELAY, (char *)&tmp, sizeof(tmp) );
893 if ( rc == AC_SOCKET_ERROR ) {
894 int err = sock_errno();
895 Debug( LDAP_DEBUG_ANY, "lload_listener(%ld): "
896 "setsockopt(TCP_NODELAY) failed errno=%d (%s)\n",
897 (long)s, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
898 }
899 #endif /* TCP_NODELAY */
900 }
901 #endif /* SO_KEEPALIVE || TCP_NODELAY */
902
903 if ( sl->sl_is_proxied ) {
904 if ( !proxyp( s, from ) ) {
905 Debug( LDAP_DEBUG_ANY, "lload_listener: "
906 "proxyp(%ld) failed\n",
907 (long)s );
908 lloadd_close( s );
909 return;
910 }
911 }
912
913 cflag = 0;
914 switch ( from->sa_addr.sa_family ) {
915 #ifdef LDAP_PF_LOCAL
916 case AF_LOCAL:
917 cflag |= CONN_IS_IPC;
918
919 /* FIXME: apparently accept doesn't fill the sun_path member */
920 sprintf( peername, "PATH=%s", sl->sl_sa.sa_un_addr.sun_path );
921 break;
922 #endif /* LDAP_PF_LOCAL */
923
924 #ifdef LDAP_PF_INET6
925 case AF_INET6:
926 #endif /* LDAP_PF_INET6 */
927 case AF_INET:
928 ldap_pvt_sockaddrstr( from, &peerbv );
929 break;
930
931 default:
932 lloadd_close( s );
933 return;
934 }
935
936 #ifdef HAVE_TLS
937 if ( sl->sl_is_tls ) cflag |= CONN_IS_TLS;
938 #endif
939 c = client_init( s, peername, lload_daemon[tid].base, cflag );
940
941 if ( !c ) {
942 Debug( LDAP_DEBUG_ANY, "lload_listener: "
943 "client_init(%ld, %s, %s) failed\n",
944 (long)s, peername, sl->sl_name.bv_val );
945 lloadd_close( s );
946 }
947
948 return;
949 }
950
951 static void *
lload_listener_thread(void * ctx)952 lload_listener_thread( void *ctx )
953 {
954 int rc = event_base_dispatch( listener_base );
955 Debug( LDAP_DEBUG_ANY, "lload_listener_thread: "
956 "event loop finished: rc=%d\n",
957 rc );
958
959 return (void *)NULL;
960 }
961
962 static void
listener_error_cb(struct evconnlistener * lev,void * arg)963 listener_error_cb( struct evconnlistener *lev, void *arg )
964 {
965 LloadListener *l = arg;
966 int err = EVUTIL_SOCKET_ERROR();
967
968 assert( l->listener == lev );
969 if (
970 #ifdef EMFILE
971 err == EMFILE ||
972 #endif /* EMFILE */
973 #ifdef ENFILE
974 err == ENFILE ||
975 #endif /* ENFILE */
976 0 ) {
977 ldap_pvt_thread_mutex_lock( &lload_daemon[0].sd_mutex );
978 emfile++;
979 /* Stop listening until an existing session closes */
980 l->sl_mute = 1;
981 evconnlistener_disable( lev );
982 ldap_pvt_thread_mutex_unlock( &lload_daemon[0].sd_mutex );
983 Debug( LDAP_DEBUG_ANY, "listener_error_cb: "
984 "too many open files, cannot accept new connections on "
985 "url=%s\n",
986 l->sl_url.bv_val );
987 } else {
988 char ebuf[128];
989 Debug( LDAP_DEBUG_ANY, "listener_error_cb: "
990 "received an error on a listener, shutting down: '%s'\n",
991 sock_errstr( err, ebuf, sizeof(ebuf) ) );
992 event_base_loopexit( l->base, NULL );
993 }
994 }
995
996 void
listeners_reactivate(void)997 listeners_reactivate( void )
998 {
999 int i;
1000
1001 ldap_pvt_thread_mutex_lock( &lload_daemon[0].sd_mutex );
1002 for ( i = 0; emfile && lload_listeners[i] != NULL; i++ ) {
1003 LloadListener *lr = lload_listeners[i];
1004
1005 if ( lr->sl_sd == AC_SOCKET_INVALID ) continue;
1006 if ( lr->sl_mute ) {
1007 emfile--;
1008 evconnlistener_enable( lr->listener );
1009 lr->sl_mute = 0;
1010 Debug( LDAP_DEBUG_CONNS, "listeners_reactivate: "
1011 "reactivated listener url=%s\n",
1012 lr->sl_url.bv_val );
1013 }
1014 }
1015 if ( emfile && lload_listeners[i] == NULL ) {
1016 /* Walked the entire list without enabling anything; emfile
1017 * counter is stale. Reset it. */
1018 emfile = 0;
1019 }
1020 ldap_pvt_thread_mutex_unlock( &lload_daemon[0].sd_mutex );
1021 }
1022
1023 static int
lload_listener_activate(void)1024 lload_listener_activate( void )
1025 {
1026 struct evconnlistener *listener;
1027 int l, rc;
1028 char ebuf[128];
1029
1030 listener_base = event_base_new();
1031 if ( !listener_base ) return -1;
1032
1033 for ( l = 0; lload_listeners[l] != NULL; l++ ) {
1034 if ( lload_listeners[l]->sl_sd == AC_SOCKET_INVALID ) continue;
1035
1036 /* FIXME: TCP-only! */
1037 #ifdef LDAP_TCP_BUFFER
1038 if ( 1 ) {
1039 int origsize, size, realsize, rc;
1040 socklen_t optlen;
1041
1042 size = 0;
1043 if ( lload_listeners[l]->sl_tcp_rmem > 0 ) {
1044 size = lload_listeners[l]->sl_tcp_rmem;
1045 } else if ( slapd_tcp_rmem > 0 ) {
1046 size = slapd_tcp_rmem;
1047 }
1048
1049 if ( size > 0 ) {
1050 optlen = sizeof(origsize);
1051 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1052 SO_RCVBUF, (void *)&origsize, &optlen );
1053
1054 if ( rc ) {
1055 int err = sock_errno();
1056 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1057 "getsockopt(SO_RCVBUF) failed errno=%d (%s)\n",
1058 err, AC_STRERROR_R( err, ebuf, sizeof(ebuf) ) );
1059 }
1060
1061 optlen = sizeof(size);
1062 rc = setsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1063 SO_RCVBUF, (const void *)&size, optlen );
1064
1065 if ( rc ) {
1066 int err = sock_errno();
1067 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1068 "setsockopt(SO_RCVBUF) failed errno=%d (%s)\n",
1069 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1070 }
1071
1072 optlen = sizeof(realsize);
1073 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1074 SO_RCVBUF, (void *)&realsize, &optlen );
1075
1076 if ( rc ) {
1077 int err = sock_errno();
1078 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1079 "getsockopt(SO_RCVBUF) failed errno=%d (%s)\n",
1080 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1081 }
1082
1083 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1084 "url=%s (#%d) RCVBUF original size=%d requested "
1085 "size=%d real size=%d\n",
1086 lload_listeners[l]->sl_url.bv_val, l, origsize, size,
1087 realsize );
1088 }
1089
1090 size = 0;
1091 if ( lload_listeners[l]->sl_tcp_wmem > 0 ) {
1092 size = lload_listeners[l]->sl_tcp_wmem;
1093 } else if ( slapd_tcp_wmem > 0 ) {
1094 size = slapd_tcp_wmem;
1095 }
1096
1097 if ( size > 0 ) {
1098 optlen = sizeof(origsize);
1099 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1100 SO_SNDBUF, (void *)&origsize, &optlen );
1101
1102 if ( rc ) {
1103 int err = sock_errno();
1104 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1105 "getsockopt(SO_SNDBUF) failed errno=%d (%s)\n",
1106 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1107 }
1108
1109 optlen = sizeof(size);
1110 rc = setsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1111 SO_SNDBUF, (const void *)&size, optlen );
1112
1113 if ( rc ) {
1114 int err = sock_errno();
1115 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1116 "setsockopt(SO_SNDBUF) failed errno=%d (%s)\n",
1117 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1118 }
1119
1120 optlen = sizeof(realsize);
1121 rc = getsockopt( lload_listeners[l]->sl_sd, SOL_SOCKET,
1122 SO_SNDBUF, (void *)&realsize, &optlen );
1123
1124 if ( rc ) {
1125 int err = sock_errno();
1126 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1127 "getsockopt(SO_SNDBUF) failed errno=%d (%s)\n",
1128 err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
1129 }
1130
1131 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1132 "url=%s (#%d) SNDBUF original size=%d requested "
1133 "size=%d real size=%d\n",
1134 lload_listeners[l]->sl_url.bv_val, l, origsize, size,
1135 realsize );
1136 }
1137 }
1138 #endif /* LDAP_TCP_BUFFER */
1139
1140 lload_listeners[l]->sl_busy = 1;
1141 listener = evconnlistener_new( listener_base, lload_listener,
1142 lload_listeners[l],
1143 LEV_OPT_THREADSAFE|LEV_OPT_DEFERRED_ACCEPT,
1144 SLAPD_LISTEN_BACKLOG, lload_listeners[l]->sl_sd );
1145 if ( !listener ) {
1146 int err = sock_errno();
1147
1148 #ifdef LDAP_PF_INET6
1149 /* If error is EADDRINUSE, we are trying to listen to INADDR_ANY and
1150 * we are already listening to in6addr_any, then we want to ignore
1151 * this and continue.
1152 */
1153 if ( err == EADDRINUSE ) {
1154 int i;
1155 struct sockaddr_in sa = lload_listeners[l]->sl_sa.sa_in_addr;
1156 struct sockaddr_in6 sa6;
1157
1158 if ( sa.sin_family == AF_INET &&
1159 sa.sin_addr.s_addr == htonl( INADDR_ANY ) ) {
1160 for ( i = 0; i < l; i++ ) {
1161 sa6 = lload_listeners[i]->sl_sa.sa_in6_addr;
1162 if ( sa6.sin6_family == AF_INET6 &&
1163 !memcmp( &sa6.sin6_addr, &in6addr_any,
1164 sizeof(struct in6_addr) ) ) {
1165 break;
1166 }
1167 }
1168
1169 if ( i < l ) {
1170 /* We are already listening to in6addr_any */
1171 Debug( LDAP_DEBUG_CONNS, "lload_listener_activate: "
1172 "Attempt to listen to 0.0.0.0 failed, "
1173 "already listening on ::, assuming IPv4 "
1174 "included\n" );
1175 lloadd_close( lload_listeners[l]->sl_sd );
1176 lload_listeners[l]->sl_sd = AC_SOCKET_INVALID;
1177 continue;
1178 }
1179 }
1180 }
1181 #endif /* LDAP_PF_INET6 */
1182 Debug( LDAP_DEBUG_ANY, "lload_listener_activate: "
1183 "listen(%s, 5) failed errno=%d (%s)\n",
1184 lload_listeners[l]->sl_url.bv_val, err,
1185 sock_errstr( err, ebuf, sizeof(ebuf) ) );
1186 return -1;
1187 }
1188
1189 lload_listeners[l]->base = listener_base;
1190 lload_listeners[l]->listener = listener;
1191 evconnlistener_set_error_cb( listener, listener_error_cb );
1192 }
1193
1194 rc = ldap_pvt_thread_create(
1195 &listener_tid, 0, lload_listener_thread, lload_listeners[l] );
1196
1197 if ( rc != 0 ) {
1198 Debug( LDAP_DEBUG_ANY, "lload_listener_activate(%d): "
1199 "submit failed (%d)\n",
1200 lload_listeners[l]->sl_sd, rc );
1201 }
1202 return rc;
1203 }
1204
1205 static void *
lloadd_io_task(void * ptr)1206 lloadd_io_task( void *ptr )
1207 {
1208 int rc;
1209 int tid = (ldap_pvt_thread_t *)ptr - daemon_tid;
1210 struct event_base *base = lload_daemon[tid].base;
1211 struct event *event;
1212
1213 event = event_new( base, -1, EV_WRITE, daemon_wakeup_cb, ptr );
1214 if ( !event ) {
1215 Debug( LDAP_DEBUG_ANY, "lloadd_io_task: "
1216 "failed to set up the wakeup event\n" );
1217 return (void *)-1;
1218 }
1219 event_add( event, NULL );
1220 lload_daemon[tid].wakeup_event = event;
1221
1222 /* run */
1223 rc = event_base_dispatch( base );
1224 Debug( LDAP_DEBUG_ANY, "lloadd_io_task: "
1225 "Daemon %d, event loop finished: rc=%d\n",
1226 tid, rc );
1227
1228 if ( !slapd_gentle_shutdown ) {
1229 slapd_abrupt_shutdown = 1;
1230 }
1231
1232 return NULL;
1233 }
1234
1235 int
lloadd_daemon(struct event_base * daemon_base)1236 lloadd_daemon( struct event_base *daemon_base )
1237 {
1238 int i, rc;
1239 LloadBackend *b;
1240 struct event_base *base;
1241 struct event *event;
1242
1243 assert( daemon_base != NULL );
1244
1245 dnsbase = evdns_base_new( daemon_base, EVDNS_BASE_INITIALIZE_NAMESERVERS );
1246 if ( !dnsbase ) {
1247 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1248 "failed to set up for async name resolution\n" );
1249 return -1;
1250 }
1251
1252 if ( lload_daemon_threads > SLAPD_MAX_DAEMON_THREADS )
1253 lload_daemon_threads = SLAPD_MAX_DAEMON_THREADS;
1254
1255 daemon_tid =
1256 ch_malloc( lload_daemon_threads * sizeof(ldap_pvt_thread_t) );
1257
1258 for ( i = 0; i < lload_daemon_threads; i++ ) {
1259 base = event_base_new();
1260 if ( !base ) {
1261 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1262 "failed to acquire event base for an I/O thread\n" );
1263 return -1;
1264 }
1265 lload_daemon[i].base = base;
1266
1267 ldap_pvt_thread_mutex_init( &lload_daemon[i].sd_mutex );
1268 /* threads that handle client and upstream sockets */
1269 rc = ldap_pvt_thread_create(
1270 &daemon_tid[i], 0, lloadd_io_task, &daemon_tid[i] );
1271
1272 if ( rc != 0 ) {
1273 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1274 "listener ldap_pvt_thread_create failed (%d)\n",
1275 rc );
1276 return rc;
1277 }
1278 }
1279
1280 if ( (rc = lload_listener_activate()) != 0 ) {
1281 return rc;
1282 }
1283
1284 if ( !LDAP_CIRCLEQ_EMPTY( &backend ) ) {
1285 current_backend = LDAP_CIRCLEQ_FIRST( &backend );
1286 LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
1287 event = evtimer_new( daemon_base, backend_connect, b );
1288 if ( !event ) {
1289 Debug( LDAP_DEBUG_ANY, "lloadd: "
1290 "failed to allocate retry event\n" );
1291 return -1;
1292 }
1293
1294 checked_lock( &b->b_mutex );
1295 b->b_retry_event = event;
1296 backend_retry( b );
1297 checked_unlock( &b->b_mutex );
1298 }
1299 }
1300
1301 event = evtimer_new( daemon_base, operations_timeout, event_self_cbarg() );
1302 if ( !event ) {
1303 Debug( LDAP_DEBUG_ANY, "lloadd: "
1304 "failed to allocate timeout event\n" );
1305 return -1;
1306 }
1307 lload_timeout_event = event;
1308
1309 /* TODO: should we just add it with any timeout and re-add when the timeout
1310 * changes? */
1311 if ( lload_timeout_api ) {
1312 event_add( event, lload_timeout_api );
1313 }
1314
1315 checked_lock( &lload_wait_mutex );
1316 lloadd_inited = 1;
1317 ldap_pvt_thread_cond_signal( &lload_wait_cond );
1318 checked_unlock( &lload_wait_mutex );
1319 #if !defined(BALANCER_MODULE) && defined(HAVE_SYSTEMD)
1320 rc = sd_notify( 1, "READY=1" );
1321 if ( rc < 0 ) {
1322 Debug( LDAP_DEBUG_ANY, "lloadd startup: "
1323 "systemd sd_notify failed (%d)\n", rc );
1324 }
1325 #endif /* !BALANCER_MODULE && HAVE_SYSTEMD */
1326
1327 rc = event_base_dispatch( daemon_base );
1328 Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
1329 "Main event loop finished: rc=%d\n",
1330 rc );
1331
1332 /* shutdown */
1333 event_base_loopexit( listener_base, 0 );
1334
1335 /* wait for the listener threads to complete */
1336 destroy_listeners();
1337
1338 /* Mark upstream connections closing and prevent from opening new ones */
1339 LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
1340 epoch_t epoch = epoch_join();
1341
1342 checked_lock( &b->b_mutex );
1343 b->b_numconns = b->b_numbindconns = 0;
1344 backend_reset( b, 1 );
1345 checked_unlock( &b->b_mutex );
1346
1347 epoch_leave( epoch );
1348 }
1349
1350 /* Do the same for clients */
1351 clients_destroy( 1 );
1352
1353 for ( i = 0; i < lload_daemon_threads; i++ ) {
1354 /*
1355 * https://github.com/libevent/libevent/issues/623
1356 * deleting the event doesn't notify the base, just activate it and
1357 * let it delete itself
1358 */
1359 event_active( lload_daemon[i].wakeup_event, EV_READ, 0 );
1360 }
1361
1362 for ( i = 0; i < lload_daemon_threads; i++ ) {
1363 ldap_pvt_thread_join( daemon_tid[i], (void *)NULL );
1364 }
1365
1366 #ifndef BALANCER_MODULE
1367 if ( LogTest( LDAP_DEBUG_ANY ) ) {
1368 int t = ldap_pvt_thread_pool_backload( &connection_pool );
1369 Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
1370 "waiting for %d operations/tasks to finish\n",
1371 t );
1372 }
1373 ldap_pvt_thread_pool_close( &connection_pool, 1 );
1374 #endif
1375
1376 lload_backends_destroy();
1377 clients_destroy( 0 );
1378 lload_bindconf_free( &bindconf );
1379 evdns_base_free( dnsbase, 0 );
1380
1381 ch_free( daemon_tid );
1382 daemon_tid = NULL;
1383
1384 lloadd_daemon_destroy();
1385
1386 /* If we're a slapd module, let the thread that initiated the shut down
1387 * know we've finished */
1388 checked_lock( &lload_wait_mutex );
1389 ldap_pvt_thread_cond_signal( &lload_wait_cond );
1390 checked_unlock( &lload_wait_mutex );
1391
1392 return 0;
1393 }
1394
1395 static void
daemon_wakeup_cb(evutil_socket_t sig,short what,void * arg)1396 daemon_wakeup_cb( evutil_socket_t sig, short what, void *arg )
1397 {
1398 int tid = (ldap_pvt_thread_t *)arg - daemon_tid;
1399
1400 Debug( LDAP_DEBUG_TRACE, "daemon_wakeup_cb: "
1401 "Daemon thread %d woken up\n",
1402 tid );
1403 event_del( lload_daemon[tid].wakeup_event );
1404 }
1405
/* Configuration change waiting to be applied. In BALANCER_MODULE builds,
 * lload_pause_server() resets it to LLOAD_CHANGE_UNDEFINED and
 * lload_unpause_server() hands it to lload_handle_invalidation() if its type
 * has been set to anything else in between. */
LloadChange lload_change = { .type = LLOAD_CHANGE_UNDEFINED };
1407
1408 #ifdef BALANCER_MODULE
1409 int
backend_conn_cb(ldap_pvt_thread_start_t * start,void * startarg,void * arg)1410 backend_conn_cb( ldap_pvt_thread_start_t *start, void *startarg, void *arg )
1411 {
1412 LloadConnection *c = startarg;
1413 LloadBackend *b = arg;
1414
1415 if ( b == NULL || c->c_backend == b ) {
1416 CONNECTION_LOCK_DESTROY(c);
1417 return 1;
1418 }
1419 return 0;
1420 }
1421
1422 #ifdef HAVE_TLS
1423 int
client_tls_cb(ldap_pvt_thread_start_t * start,void * startarg,void * arg)1424 client_tls_cb( ldap_pvt_thread_start_t *start, void *startarg, void *arg )
1425 {
1426 LloadConnection *c = startarg;
1427
1428 if ( c->c_destroy == client_destroy &&
1429 c->c_is_tls == LLOAD_TLS_ESTABLISHED ) {
1430 CONNECTION_LOCK_DESTROY(c);
1431 return 1;
1432 }
1433 return 0;
1434 }
1435 #endif /* HAVE_TLS */
1436
1437 void
lload_handle_backend_invalidation(LloadChange * change)1438 lload_handle_backend_invalidation( LloadChange *change )
1439 {
1440 LloadBackend *b = change->target;
1441
1442 assert( change->object == LLOAD_BACKEND );
1443
1444 if ( change->type == LLOAD_CHANGE_ADD ) {
1445 BackendInfo *mi = backend_info( "monitor" );
1446
1447 if ( mi ) {
1448 monitor_extra_t *mbe = mi->bi_extra;
1449 if ( mbe->is_configured() ) {
1450 lload_monitor_backend_init( mi, b );
1451 }
1452 }
1453
1454 if ( !current_backend ) {
1455 current_backend = b;
1456 }
1457 checked_lock( &b->b_mutex );
1458 backend_retry( b );
1459 checked_unlock( &b->b_mutex );
1460 return;
1461 } else if ( change->type == LLOAD_CHANGE_DEL ) {
1462 ldap_pvt_thread_pool_walk(
1463 &connection_pool, handle_pdus, backend_conn_cb, b );
1464 ldap_pvt_thread_pool_walk(
1465 &connection_pool, upstream_bind, backend_conn_cb, b );
1466 lload_backend_destroy( b );
1467 return;
1468 }
1469 assert( change->type == LLOAD_CHANGE_MODIFY );
1470
1471 /*
1472 * A change that can't be handled gracefully, terminate all connections and
1473 * start over.
1474 */
1475 if ( change->flags.backend & LLOAD_BACKEND_MOD_OTHER ) {
1476 ldap_pvt_thread_pool_walk(
1477 &connection_pool, handle_pdus, backend_conn_cb, b );
1478 ldap_pvt_thread_pool_walk(
1479 &connection_pool, upstream_bind, backend_conn_cb, b );
1480 checked_lock( &b->b_mutex );
1481 backend_reset( b, 0 );
1482 backend_retry( b );
1483 checked_unlock( &b->b_mutex );
1484 return;
1485 }
1486
1487 /*
1488 * Handle changes to number of connections:
1489 * - a change might get the connection limit above the pool size:
1490 * - consider closing (in order of priority?):
1491 * - connections awaiting connect() completion
1492 * - connections currently preparing
1493 * - bind connections over limit (which is 0 if 'feature vc' is on
1494 * - regular connections over limit
1495 * - below pool size
1496 * - call backend_retry if there are no opening connections
1497 * - one pool size above and one below the configured size
1498 * - still close the ones above limit, it should sort itself out
1499 * the only issue is if a closing connection isn't guaranteed to do
1500 * that at some point
1501 */
1502 if ( change->flags.backend & LLOAD_BACKEND_MOD_CONNS ) {
1503 int bind_requested = 0, need_close = 0, need_open = 0;
1504 LloadConnection *c;
1505
1506 bind_requested =
1507 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
1508 (lload_features & LLOAD_FEATURE_VC) ? 0 :
1509 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
1510 b->b_numbindconns;
1511
1512 if ( b->b_bindavail > bind_requested ) {
1513 need_close += b->b_bindavail - bind_requested;
1514 } else if ( b->b_bindavail < bind_requested ) {
1515 need_open = 1;
1516 }
1517
1518 if ( b->b_active > b->b_numconns ) {
1519 need_close += b->b_active - b->b_numconns;
1520 } else if ( b->b_active < b->b_numconns ) {
1521 need_open = 1;
1522 }
1523
1524 if ( !need_open ) {
1525 need_close += b->b_opening;
1526
1527 while ( !LDAP_LIST_EMPTY( &b->b_connecting ) ) {
1528 LloadPendingConnection *p = LDAP_LIST_FIRST( &b->b_connecting );
1529
1530 LDAP_LIST_REMOVE( p, next );
1531 event_free( p->event );
1532 evutil_closesocket( p->fd );
1533 ch_free( p );
1534 b->b_opening--;
1535 need_close--;
1536 }
1537 }
1538
1539 if ( need_close || !need_open ) {
1540 /* It might be too late to repurpose a preparing connection, just
1541 * close them all */
1542 while ( !LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ) {
1543 c = LDAP_CIRCLEQ_FIRST( &b->b_preparing );
1544
1545 event_del( c->c_read_event );
1546 CONNECTION_LOCK_DESTROY(c);
1547 assert( c == NULL );
1548 b->b_opening--;
1549 need_close--;
1550 }
1551 if ( event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
1552 event_del( b->b_retry_event );
1553 b->b_opening--;
1554 }
1555 assert( b->b_opening == 0 );
1556 }
1557
1558 if ( b->b_bindavail > bind_requested ) {
1559 int diff = b->b_bindavail - bind_requested;
1560
1561 assert( need_close >= diff );
1562
1563 LDAP_CIRCLEQ_FOREACH ( c, &b->b_bindconns, c_next ) {
1564 int gentle = 1;
1565
1566 lload_connection_close( c, &gentle );
1567 need_close--;
1568 diff--;
1569 if ( !diff ) {
1570 break;
1571 }
1572 }
1573 assert( diff == 0 );
1574 }
1575
1576 if ( b->b_active > b->b_numconns ) {
1577 int diff = b->b_active - b->b_numconns;
1578
1579 assert( need_close >= diff );
1580
1581 LDAP_CIRCLEQ_FOREACH ( c, &b->b_conns, c_next ) {
1582 int gentle = 1;
1583
1584 lload_connection_close( c, &gentle );
1585 need_close--;
1586 diff--;
1587 if ( !diff ) {
1588 break;
1589 }
1590 }
1591 assert( diff == 0 );
1592 }
1593 assert( need_close == 0 );
1594
1595 if ( need_open ) {
1596 checked_lock( &b->b_mutex );
1597 backend_retry( b );
1598 checked_unlock( &b->b_mutex );
1599 }
1600 }
1601 }
1602
1603 void
lload_handle_global_invalidation(LloadChange * change)1604 lload_handle_global_invalidation( LloadChange *change )
1605 {
1606 assert( change->type == LLOAD_CHANGE_MODIFY );
1607 assert( change->object == LLOAD_DAEMON );
1608
1609 if ( change->flags.daemon & LLOAD_DAEMON_MOD_THREADS ) {
1610 /* walk the task queue to remove any tasks belonging to us. */
1611 /* TODO: initiate a full module restart, everything will fall into
1612 * place at that point */
1613 ldap_pvt_thread_pool_walk(
1614 &connection_pool, handle_pdus, backend_conn_cb, NULL );
1615 ldap_pvt_thread_pool_walk(
1616 &connection_pool, upstream_bind, backend_conn_cb, NULL );
1617 assert(0);
1618 return;
1619 }
1620
1621 if ( change->flags.daemon & LLOAD_DAEMON_MOD_FEATURES ) {
1622 lload_features_t feature_diff =
1623 lload_features ^ ( ~(uintptr_t)change->target );
1624 /* Feature change handling:
1625 * - VC (TODO):
1626 * - on: terminate all bind connections
1627 * - off: cancel all bind operations in progress, reopen bind connections
1628 * - ProxyAuthz:
1629 * - on: nothing needed
1630 * - off: clear c_auth/privileged on each client
1631 * - read pause (WIP):
1632 * - nothing needed?
1633 */
1634
1635 assert( change->target );
1636 if ( feature_diff & LLOAD_FEATURE_VC ) {
1637 assert(0);
1638 feature_diff &= ~LLOAD_FEATURE_VC;
1639 }
1640 if ( feature_diff & LLOAD_FEATURE_PAUSE ) {
1641 feature_diff &= ~LLOAD_FEATURE_PAUSE;
1642 }
1643 if ( feature_diff & LLOAD_FEATURE_PROXYAUTHZ ) {
1644 if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) {
1645 LloadConnection *c;
1646 /* We switched proxyauthz off */
1647 LDAP_CIRCLEQ_FOREACH ( c, &clients, c_next ) {
1648 if ( !BER_BVISNULL( &c->c_auth ) ) {
1649 ber_memfree( c->c_auth.bv_val );
1650 BER_BVZERO( &c->c_auth );
1651 }
1652 if ( c->c_type == LLOAD_C_PRIVILEGED ) {
1653 c->c_type = LLOAD_C_OPEN;
1654 }
1655 }
1656 }
1657 feature_diff &= ~LLOAD_FEATURE_PROXYAUTHZ;
1658 }
1659 assert( !feature_diff );
1660 }
1661
1662 #ifdef HAVE_TLS
1663 if ( change->flags.daemon & LLOAD_DAEMON_MOD_TLS ) {
1664 /* terminate all clients with TLS set up */
1665 ldap_pvt_thread_pool_walk(
1666 &connection_pool, handle_pdus, client_tls_cb, NULL );
1667 if ( !LDAP_CIRCLEQ_EMPTY( &clients ) ) {
1668 LloadConnection *c = LDAP_CIRCLEQ_FIRST( &clients );
1669 unsigned long first_connid = c->c_connid;
1670
1671 while ( c ) {
1672 LloadConnection *next =
1673 LDAP_CIRCLEQ_LOOP_NEXT( &clients, c, c_next );
1674 if ( c->c_is_tls ) {
1675 CONNECTION_LOCK_DESTROY(c);
1676 assert( c == NULL );
1677 }
1678 c = next;
1679 if ( c->c_connid <= first_connid ) {
1680 c = NULL;
1681 }
1682 }
1683 }
1684 }
1685 #endif /* HAVE_TLS */
1686
1687 if ( change->flags.daemon & LLOAD_DAEMON_MOD_BINDCONF ) {
1688 LloadBackend *b;
1689 LloadConnection *c;
1690
1691 /*
1692 * Only timeout changes can be handled gracefully, terminate all
1693 * connections and start over.
1694 */
1695 ldap_pvt_thread_pool_walk(
1696 &connection_pool, handle_pdus, backend_conn_cb, NULL );
1697 ldap_pvt_thread_pool_walk(
1698 &connection_pool, upstream_bind, backend_conn_cb, NULL );
1699
1700 LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
1701 checked_lock( &b->b_mutex );
1702 backend_reset( b, 0 );
1703 backend_retry( b );
1704 checked_unlock( &b->b_mutex );
1705 }
1706
1707 /* Reconsider the PRIVILEGED flag on all clients */
1708 LDAP_CIRCLEQ_FOREACH ( c, &clients, c_next ) {
1709 int privileged = ber_bvstrcasecmp( &c->c_auth, &lloadd_identity );
1710
1711 /* We have just terminated all pending operations (even pins), there
1712 * should be no connections still binding/closing */
1713 assert( c->c_state == LLOAD_C_READY );
1714
1715 c->c_type = privileged ? LLOAD_C_PRIVILEGED : LLOAD_C_OPEN;
1716 }
1717 }
1718 }
1719
1720 int
lload_handle_invalidation(LloadChange * change)1721 lload_handle_invalidation( LloadChange *change )
1722 {
1723 if ( (change->type == LLOAD_CHANGE_MODIFY) &&
1724 change->flags.generic == 0 ) {
1725 Debug( LDAP_DEBUG_ANY, "lload_handle_invalidation: "
1726 "a modify where apparently nothing changed\n" );
1727 }
1728
1729 switch ( change->object ) {
1730 case LLOAD_BACKEND:
1731 lload_handle_backend_invalidation( change );
1732 break;
1733 case LLOAD_DAEMON:
1734 lload_handle_global_invalidation( change );
1735 break;
1736 default:
1737 Debug( LDAP_DEBUG_ANY, "lload_handle_invalidation: "
1738 "unrecognised change\n" );
1739 assert(0);
1740 }
1741
1742 return LDAP_SUCCESS;
1743 }
1744
1745 static void
lload_pause_event_cb(evutil_socket_t s,short what,void * arg)1746 lload_pause_event_cb( evutil_socket_t s, short what, void *arg )
1747 {
1748 /*
1749 * We are pausing, signal the pausing thread we've finished and
1750 * wait until the thread pool resumes operation.
1751 *
1752 * Do this in lockstep with the pausing thread.
1753 */
1754 checked_lock( &lload_wait_mutex );
1755 ldap_pvt_thread_cond_signal( &lload_wait_cond );
1756
1757 /* Now wait until we unpause, then we can resume operation */
1758 ldap_pvt_thread_cond_wait( &lload_pause_cond, &lload_wait_mutex );
1759 checked_unlock( &lload_wait_mutex );
1760 }
1761
1762 /*
1763 * Signal the event base to terminate processing as soon as it can and wait for
1764 * lload_pause_event_cb to notify us this has happened.
1765 */
1766 static int
lload_pause_base(struct event_base * base)1767 lload_pause_base( struct event_base *base )
1768 {
1769 int rc;
1770
1771 checked_lock( &lload_wait_mutex );
1772 event_base_once( base, -1, EV_TIMEOUT, lload_pause_event_cb, base, NULL );
1773 rc = ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex );
1774 checked_unlock( &lload_wait_mutex );
1775
1776 return rc;
1777 }
1778
1779 void
lload_pause_server(void)1780 lload_pause_server( void )
1781 {
1782 LloadChange ch = { .type = LLOAD_CHANGE_UNDEFINED };
1783 int i;
1784
1785 lload_pause_base( listener_base );
1786 lload_pause_base( daemon_base );
1787
1788 for ( i = 0; i < lload_daemon_threads; i++ ) {
1789 lload_pause_base( lload_daemon[i].base );
1790 }
1791
1792 lload_change = ch;
1793 }
1794
1795 void
lload_unpause_server(void)1796 lload_unpause_server( void )
1797 {
1798 if ( lload_change.type != LLOAD_CHANGE_UNDEFINED ) {
1799 lload_handle_invalidation( &lload_change );
1800 }
1801
1802 /*
1803 * Make sure lloadd is completely ready to unpause by now:
1804 *
1805 * After the broadcast, we handle I/O and begin filling the thread pool, in
1806 * high load conditions, we might hit the pool limits and start processing
1807 * operations in the I/O threads (one PDU per socket at a time for fairness
1808 * sake) even before a pause has finished from slapd's point of view!
1809 *
1810 * When (max_pdus_per_cycle == 0) we don't use the pool for these at all and
1811 * most lload processing starts immediately making this even more prominent.
1812 */
1813 ldap_pvt_thread_cond_broadcast( &lload_pause_cond );
1814 }
1815 #endif /* BALANCER_MODULE */
1816
1817 void
lload_sig_shutdown(evutil_socket_t sig,short what,void * arg)1818 lload_sig_shutdown( evutil_socket_t sig, short what, void *arg )
1819 {
1820 struct event_base *daemon_base = arg;
1821 int save_errno = errno;
1822 int i;
1823
1824 /*
1825 * If the NT Service Manager is controlling the server, we don't
1826 * want SIGBREAK to kill the server. For some strange reason,
1827 * SIGBREAK is generated when a user logs out.
1828 */
1829
1830 #if defined(HAVE_NT_SERVICE_MANAGER) && defined(SIGBREAK)
1831 if ( is_NT_Service && sig == SIGBREAK ) {
1832 /* empty */;
1833 } else
1834 #endif /* HAVE_NT_SERVICE_MANAGER && SIGBREAK */
1835 #ifdef SIGHUP
1836 if ( sig == SIGHUP && global_gentlehup && slapd_gentle_shutdown == 0 ) {
1837 slapd_gentle_shutdown = 1;
1838 } else
1839 #endif /* SIGHUP */
1840 {
1841 slapd_shutdown = 1;
1842 }
1843
1844 for ( i = 0; i < lload_daemon_threads; i++ ) {
1845 event_base_loopexit( lload_daemon[i].base, NULL );
1846 }
1847 event_base_loopexit( daemon_base, NULL );
1848
1849 errno = save_errno;
1850 }
1851
1852 struct event_base *
lload_get_base(ber_socket_t s)1853 lload_get_base( ber_socket_t s )
1854 {
1855 int tid = DAEMON_ID(s);
1856 return lload_daemon[tid].base;
1857 }
1858
1859 LloadListener **
lloadd_get_listeners(void)1860 lloadd_get_listeners( void )
1861 {
1862 /* Could return array with no listeners if !listening, but current
1863 * callers mostly look at the URLs. E.g. syncrepl uses this to
1864 * identify the server, which means it wants the startup arguments.
1865 */
1866 return lload_listeners;
1867 }
1868
1869 /* Reject all incoming requests */
1870 void
lload_suspend_listeners(void)1871 lload_suspend_listeners( void )
1872 {
1873 int i;
1874 for ( i = 0; lload_listeners[i]; i++ ) {
1875 lload_listeners[i]->sl_mute = 1;
1876 evconnlistener_disable( lload_listeners[i]->listener );
1877 listen( lload_listeners[i]->sl_sd, 0 );
1878 }
1879 }
1880
1881 /* Resume after a suspend */
1882 void
lload_resume_listeners(void)1883 lload_resume_listeners( void )
1884 {
1885 int i;
1886 for ( i = 0; lload_listeners[i]; i++ ) {
1887 lload_listeners[i]->sl_mute = 0;
1888 listen( lload_listeners[i]->sl_sd, SLAPD_LISTEN_BACKLOG );
1889 evconnlistener_enable( lload_listeners[i]->listener );
1890 }
1891 }
1892