xref: /netbsd-src/external/bsd/openldap/dist/libraries/libldap/tpool.c (revision 549b59ed3ccf0d36d3097190a0db27b770f3a839)
1 /*	$NetBSD: tpool.c,v 1.2 2021/08/14 16:14:56 christos Exp $	*/
2 
3 /* $OpenLDAP$ */
4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5  *
6  * Copyright 1998-2021 The OpenLDAP Foundation.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted only as authorized by the OpenLDAP
11  * Public License.
12  *
13  * A copy of this license is available in file LICENSE in the
14  * top-level directory of the distribution or, alternatively, at
15  * <http://www.OpenLDAP.org/license.html>.
16  */
17 
18 #include <sys/cdefs.h>
19 __RCSID("$NetBSD: tpool.c,v 1.2 2021/08/14 16:14:56 christos Exp $");
20 
21 #include "portable.h"
22 
23 #include <stdio.h>
24 
25 #include <ac/signal.h>
26 #include <ac/stdarg.h>
27 #include <ac/stdlib.h>
28 #include <ac/string.h>
29 #include <ac/time.h>
30 #include <ac/errno.h>
31 
32 #include "ldap-int.h"
33 
34 #ifdef LDAP_R_COMPILE
35 
36 #include "ldap_pvt_thread.h" /* Get the thread interface */
37 #include "ldap_queue.h"
38 #define LDAP_THREAD_POOL_IMPLEMENTATION
39 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
40 
41 #ifndef LDAP_THREAD_HAVE_TPOOL
42 
43 #ifndef CACHELINE
44 #define CACHELINE	64
45 #endif
46 
47 /* Thread-specific key with data and optional free function */
48 typedef struct ldap_int_tpool_key_s {
49 	void *ltk_key;
50 	void *ltk_data;
51 	ldap_pvt_thread_pool_keyfree_t *ltk_free;
52 } ldap_int_tpool_key_t;
53 
/* Upper bound on the thread-specific keys stored per thread.
 * Only a few are expected to be in use.
 */
#define	MAXKEYS		32

/* Upper bound on the number of threads; must be a power of 2 */
#define	LDAP_MAXTHR	1024

/* (Theoretical) upper bound on pending requests:
 * INT_MAX less some room to avoid overflow.
 */
#define MAX_PENDING	(INT_MAX/2)

/* Values stored in pool->ltp_pause */
enum {
	NOT_PAUSED = 0,
	WANT_PAUSE = 1,
	PAUSED     = 2
};
67 
68 /* Context: thread ID and thread-specific key/data pairs */
69 typedef struct ldap_int_thread_userctx_s {
70 	struct ldap_int_thread_poolq_s *ltu_pq;
71 	ldap_pvt_thread_t ltu_id;
72 	ldap_int_tpool_key_t ltu_key[MAXKEYS];
73 } ldap_int_thread_userctx_t;
74 
75 
76 /* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
77  * Protected by ldap_pvt_thread_pool_mutex.
78  */
79 static struct {
80 	ldap_int_thread_userctx_t *ctx;
81 	/* ctx is valid when not NULL or DELETED_THREAD_CTX */
82 #	define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
83 } thread_keys[LDAP_MAXTHR];
84 
/* Hash the raw bytes of (tid) into (hash): seed with the first byte,
 * then for every following byte do hash += (hash << 5) ^ byte.
 */
#define	TID_HASH(tid, hash) do { \
	const unsigned char *bytes_ = (const unsigned char *)&(tid); \
	unsigned pos_; \
	(hash) = bytes_[0]; \
	for (pos_ = 1; pos_ < sizeof(tid); pos_++) \
		(hash) += ((hash) << 5) ^ bytes_[pos_]; \
} while(0)
91 
92 
93 /* Task for a thread to perform */
94 typedef struct ldap_int_thread_task_s {
95 	union {
96 		LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
97 		LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
98 	} ltt_next;
99 	ldap_pvt_thread_start_t *ltt_start_routine;
100 	void *ltt_arg;
101 	struct ldap_int_thread_poolq_s *ltt_queue;
102 } ldap_int_thread_task_t;
103 
104 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
105 
106 struct ldap_int_thread_poolq_s {
107 	void *ltp_free;
108 
109 	struct ldap_int_thread_pool_s *ltp_pool;
110 
111 	/* protect members below */
112 	ldap_pvt_thread_mutex_t ltp_mutex;
113 
114 	/* not paused and something to do for pool_<wrapper/pause/destroy>()
115 	 * Used for normal pool operation, to synch between submitter and
116 	 * worker threads. Not used for pauses. In normal operation multiple
117 	 * queues can rendezvous without acquiring the main pool lock.
118 	 */
119 	ldap_pvt_thread_cond_t ltp_cond;
120 
121 	/* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
122 	 * maintained to reduce work for pool_wrapper()
123 	 */
124 	ldap_int_tpool_plist_t *ltp_work_list;
125 
126 	/* pending tasks, and unused task objects */
127 	ldap_int_tpool_plist_t ltp_pending_list;
128 	LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
129 
130 	/* Max number of threads in this queue */
131 	int ltp_max_count;
132 
133 	/* Max pending + paused + idle tasks, negated when ltp_finishing */
134 	int ltp_max_pending;
135 
136 	int ltp_pending_count;		/* Pending + paused + idle tasks */
137 	int ltp_active_count;		/* Active, not paused/idle tasks */
138 	int ltp_open_count;			/* Number of threads */
139 	int ltp_starting;			/* Currently starting threads */
140 };
141 
142 struct ldap_int_thread_pool_s {
143 	LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
144 
145 	struct ldap_int_thread_poolq_s **ltp_wqs;
146 
147 	/* number of poolqs */
148 	int ltp_numqs;
149 
150 	/* protect members below */
151 	ldap_pvt_thread_mutex_t ltp_mutex;
152 
153 	/* paused and waiting for resume
154 	 * When a pause is in effect all workers switch to waiting on
155 	 * this cond instead of their per-queue cond.
156 	 */
157 	ldap_pvt_thread_cond_t ltp_cond;
158 
159 	/* ltp_active_queues < 1 && ltp_pause */
160 	ldap_pvt_thread_cond_t ltp_pcond;
161 
162 	/* number of active queues */
163 	int ltp_active_queues;
164 
165 	/* The pool is finishing, waiting for its threads to close.
166 	 * They close when ltp_pending_list is done.  pool_submit()
167 	 * rejects new tasks.  ltp_max_pending = -(its old value).
168 	 */
169 	int ltp_finishing;
170 
171 	/* Some active task needs to be the sole active task.
172 	 * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
173 	 */
174 	volatile sig_atomic_t ltp_pause;
175 
176 	/* Max number of threads in pool */
177 	int ltp_max_count;
178 
179 	/* Configured max number of threads in pool, 0 for default (LDAP_MAXTHR) */
180 	int ltp_conf_max_count;
181 
182 	/* Max pending + paused + idle tasks, negated when ltp_finishing */
183 	int ltp_max_pending;
184 };
185 
186 static ldap_int_tpool_plist_t empty_pending_list =
187 	LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
188 
189 static int ldap_int_has_thread_pool = 0;
190 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
191 	ldap_int_thread_pool_list =
192 	LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
193 
194 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
195 
196 static void *ldap_int_thread_pool_wrapper( void *pool );
197 
198 static ldap_pvt_thread_key_t	ldap_tpool_key;
199 
200 /* Context of the main thread */
201 static ldap_int_thread_userctx_t ldap_int_main_thrctx;
202 
203 int
ldap_int_thread_pool_startup(void)204 ldap_int_thread_pool_startup ( void )
205 {
206 	ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
207 	ldap_pvt_thread_key_create( &ldap_tpool_key );
208 	return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
209 }
210 
211 int
ldap_int_thread_pool_shutdown(void)212 ldap_int_thread_pool_shutdown ( void )
213 {
214 	struct ldap_int_thread_pool_s *pool;
215 
216 	while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
217 		(ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
218 	}
219 	ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
220 	ldap_pvt_thread_key_destroy( ldap_tpool_key );
221 	return(0);
222 }
223 
224 
225 /* Create a thread pool */
226 int
ldap_pvt_thread_pool_init_q(ldap_pvt_thread_pool_t * tpool,int max_threads,int max_pending,int numqs)227 ldap_pvt_thread_pool_init_q (
228 	ldap_pvt_thread_pool_t *tpool,
229 	int max_threads,
230 	int max_pending,
231 	int numqs )
232 {
233 	ldap_pvt_thread_pool_t pool;
234 	struct ldap_int_thread_poolq_s *pq;
235 	int i, rc, rem_thr, rem_pend;
236 
237 	/* multiple pools are currently not supported (ITS#4943) */
238 	assert(!ldap_int_has_thread_pool);
239 
240 	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
241 		max_threads = 0;
242 	if (! (1 <= max_pending && max_pending <= MAX_PENDING))
243 		max_pending = MAX_PENDING;
244 
245 	*tpool = NULL;
246 	pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
247 		sizeof(struct ldap_int_thread_pool_s));
248 
249 	if (pool == NULL) return(-1);
250 
251 	pool->ltp_wqs = LDAP_MALLOC(numqs * sizeof(struct ldap_int_thread_poolq_s *));
252 	if (pool->ltp_wqs == NULL) {
253 		LDAP_FREE(pool);
254 		return(-1);
255 	}
256 
257 	for (i=0; i<numqs; i++) {
258 		char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
259 		if (ptr == NULL) {
260 			for (--i; i>=0; i--)
261 				LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
262 			LDAP_FREE(pool->ltp_wqs);
263 			LDAP_FREE(pool);
264 			return(-1);
265 		}
266 		pool->ltp_wqs[i] = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
267 		pool->ltp_wqs[i]->ltp_free = ptr;
268 	}
269 
270 	pool->ltp_numqs = numqs;
271 	pool->ltp_conf_max_count = max_threads;
272 	if ( !max_threads )
273 		max_threads = LDAP_MAXTHR;
274 
275 	rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
276 	if (rc != 0) {
277 fail:
278 		for (i=0; i<numqs; i++)
279 			LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
280 		LDAP_FREE(pool->ltp_wqs);
281 		LDAP_FREE(pool);
282 		return(rc);
283 	}
284 
285 	rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
286 	if (rc != 0)
287 		goto fail;
288 
289 	rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
290 	if (rc != 0)
291 		goto fail;
292 
293 	rem_thr = max_threads % numqs;
294 	rem_pend = max_pending % numqs;
295 	for ( i=0; i<numqs; i++ ) {
296 		pq = pool->ltp_wqs[i];
297 		pq->ltp_pool = pool;
298 		rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
299 		if (rc != 0)
300 			return(rc);
301 		rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
302 		if (rc != 0)
303 			return(rc);
304 		LDAP_STAILQ_INIT(&pq->ltp_pending_list);
305 		pq->ltp_work_list = &pq->ltp_pending_list;
306 		LDAP_SLIST_INIT(&pq->ltp_free_list);
307 
308 		pq->ltp_max_count = max_threads / numqs;
309 		if ( rem_thr ) {
310 			pq->ltp_max_count++;
311 			rem_thr--;
312 		}
313 		pq->ltp_max_pending = max_pending / numqs;
314 		if ( rem_pend ) {
315 			pq->ltp_max_pending++;
316 			rem_pend--;
317 		}
318 	}
319 
320 	ldap_int_has_thread_pool = 1;
321 
322 	pool->ltp_max_count = max_threads;
323 	pool->ltp_max_pending = max_pending;
324 
325 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
326 	LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
327 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
328 
329 	/* Start no threads just yet.  That can break if the process forks
330 	 * later, as slapd does in order to daemonize.  On at least POSIX,
331 	 * only the forking thread would survive in the child.  Yet fork()
332 	 * can't unlock/clean up other threads' locks and data structures,
333 	 * unless pthread_atfork() handlers have been set up to do so.
334 	 */
335 
336 	*tpool = pool;
337 	return(0);
338 }
339 
340 int
ldap_pvt_thread_pool_init(ldap_pvt_thread_pool_t * tpool,int max_threads,int max_pending)341 ldap_pvt_thread_pool_init (
342 	ldap_pvt_thread_pool_t *tpool,
343 	int max_threads,
344 	int max_pending )
345 {
346 	return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
347 }
348 
349 /* Submit a task to be performed by the thread pool */
350 int
ldap_pvt_thread_pool_submit(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start_routine,void * arg)351 ldap_pvt_thread_pool_submit (
352 	ldap_pvt_thread_pool_t *tpool,
353 	ldap_pvt_thread_start_t *start_routine, void *arg )
354 {
355 	return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL );
356 }
357 
358 /* Submit a task to be performed by the thread pool */
359 int
ldap_pvt_thread_pool_submit2(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start_routine,void * arg,void ** cookie)360 ldap_pvt_thread_pool_submit2 (
361 	ldap_pvt_thread_pool_t *tpool,
362 	ldap_pvt_thread_start_t *start_routine, void *arg,
363 	void **cookie )
364 {
365 	struct ldap_int_thread_pool_s *pool;
366 	struct ldap_int_thread_poolq_s *pq;
367 	ldap_int_thread_task_t *task;
368 	ldap_pvt_thread_t thr;
369 	int i, j;
370 
371 	if (tpool == NULL)
372 		return(-1);
373 
374 	pool = *tpool;
375 
376 	if (pool == NULL)
377 		return(-1);
378 
379 	if ( pool->ltp_numqs > 1 ) {
380 		int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count;
381 		int min_x = 0, cnt;
382 		for ( i = 0; i < pool->ltp_numqs; i++ ) {
383 			/* take first queue that has nothing active */
384 			if ( !pool->ltp_wqs[i]->ltp_active_count ) {
385 				min_x = i;
386 				break;
387 			}
388 			cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count;
389 			if ( cnt < min ) {
390 				min = cnt;
391 				min_x = i;
392 			}
393 		}
394 		i = min_x;
395 	} else
396 		i = 0;
397 
398 	j = i;
399 	while(1) {
400 		ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i]->ltp_mutex);
401 		if (pool->ltp_wqs[i]->ltp_pending_count < pool->ltp_wqs[i]->ltp_max_pending) {
402 			break;
403 		}
404 		ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i]->ltp_mutex);
405 		i++;
406 		i %= pool->ltp_numqs;
407 		if ( i == j )
408 			return -1;
409 	}
410 
411 	pq = pool->ltp_wqs[i];
412 	task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
413 	if (task) {
414 		LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
415 	} else {
416 		task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
417 		if (task == NULL)
418 			goto failed;
419 	}
420 
421 	task->ltt_start_routine = start_routine;
422 	task->ltt_arg = arg;
423 	task->ltt_queue = pq;
424 	if ( cookie )
425 		*cookie = task;
426 
427 	pq->ltp_pending_count++;
428 	LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
429 
430 	if (pool->ltp_pause)
431 		goto done;
432 
433 	/* should we open (create) a thread? */
434 	if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
435 		pq->ltp_open_count < pq->ltp_max_count)
436 	{
437 		pq->ltp_starting++;
438 		pq->ltp_open_count++;
439 
440 		if (0 != ldap_pvt_thread_create(
441 			&thr, 1, ldap_int_thread_pool_wrapper, pq))
442 		{
443 			/* couldn't create thread.  back out of
444 			 * ltp_open_count and check for even worse things.
445 			 */
446 			pq->ltp_starting--;
447 			pq->ltp_open_count--;
448 
449 			if (pq->ltp_open_count == 0) {
450 				/* no open threads at all?!?
451 				 */
452 				ldap_int_thread_task_t *ptr;
453 
454 				/* let pool_close know there are no more threads */
455 				ldap_pvt_thread_cond_signal(&pq->ltp_cond);
456 
457 				LDAP_STAILQ_FOREACH(ptr, &pq->ltp_pending_list, ltt_next.q)
458 					if (ptr == task) break;
459 				if (ptr == task) {
460 					/* no open threads, task not handled, so
461 					 * back out of ltp_pending_count, free the task,
462 					 * report the error.
463 					 */
464 					pq->ltp_pending_count--;
465 					LDAP_STAILQ_REMOVE(&pq->ltp_pending_list, task,
466 						ldap_int_thread_task_s, ltt_next.q);
467 					LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task,
468 						ltt_next.l);
469 					goto failed;
470 				}
471 			}
472 			/* there is another open thread, so this
473 			 * task will be handled eventually.
474 			 */
475 		}
476 	}
477 	ldap_pvt_thread_cond_signal(&pq->ltp_cond);
478 
479  done:
480 	ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
481 	return(0);
482 
483  failed:
484 	ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
485 	return(-1);
486 }
487 
/* Do-nothing task routine; installed in place of the start routine of
 * a task that has been retracted.  Both arguments are ignored.
 */
static void *
no_task( void *ctx, void *arg )
{
	(void)ctx;
	(void)arg;
	return NULL;
}
493 
494 /* Cancel a pending task that was previously submitted.
495  * Return 1 if the task was successfully cancelled, 0 if
496  * not found, -1 for invalid parameters
497  */
498 int
ldap_pvt_thread_pool_retract(void * cookie)499 ldap_pvt_thread_pool_retract (
500 	void *cookie )
501 {
502 	ldap_int_thread_task_t *task, *ttmp;
503 	struct ldap_int_thread_poolq_s *pq;
504 
505 	if (cookie == NULL)
506 		return(-1);
507 
508 	ttmp = cookie;
509 	pq = ttmp->ltt_queue;
510 	if (pq == NULL)
511 		return(-1);
512 
513 	ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
514 	LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
515 		if (task == ttmp) {
516 			/* Could LDAP_STAILQ_REMOVE the task, but that
517 			 * walks ltp_pending_list again to find it.
518 			 */
519 			task->ltt_start_routine = no_task;
520 			task->ltt_arg = NULL;
521 			break;
522 		}
523 	ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
524 	return task != NULL;
525 }
526 
527 /* Walk the pool and allow tasks to be retracted, only to be called while the
528  * pool is paused */
529 int
ldap_pvt_thread_pool_walk(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start,ldap_pvt_thread_walk_t * cb,void * arg)530 ldap_pvt_thread_pool_walk(
531 	ldap_pvt_thread_pool_t *tpool,
532 	ldap_pvt_thread_start_t *start,
533 	ldap_pvt_thread_walk_t *cb, void *arg )
534 {
535 	struct ldap_int_thread_pool_s *pool;
536 	struct ldap_int_thread_poolq_s *pq;
537 	ldap_int_thread_task_t *task;
538 	int i;
539 
540 	if (tpool == NULL)
541 		return(-1);
542 
543 	pool = *tpool;
544 
545 	if (pool == NULL)
546 		return(-1);
547 
548 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
549 	assert(pool->ltp_pause == PAUSED);
550 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
551 
552 	for (i=0; i<pool->ltp_numqs; i++) {
553 		pq = pool->ltp_wqs[i];
554 		LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q) {
555 			if ( task->ltt_start_routine == start ) {
556 				if ( cb( task->ltt_start_routine, task->ltt_arg, arg ) ) {
557 					/* retract */
558 					task->ltt_start_routine = no_task;
559 					task->ltt_arg = NULL;
560 				}
561 			}
562 		}
563 	}
564 	return 0;
565 }
566 
567 /* Set number of work queues in this pool. Should not be
568  * more than the number of CPUs. */
569 int
ldap_pvt_thread_pool_queues(ldap_pvt_thread_pool_t * tpool,int numqs)570 ldap_pvt_thread_pool_queues(
571 	ldap_pvt_thread_pool_t *tpool,
572 	int numqs )
573 {
574 	struct ldap_int_thread_pool_s *pool;
575 	struct ldap_int_thread_poolq_s *pq;
576 	int i, rc, rem_thr, rem_pend;
577 
578 	if (numqs < 1 || tpool == NULL)
579 		return(-1);
580 
581 	pool = *tpool;
582 
583 	if (pool == NULL)
584 		return(-1);
585 
586 	if (numqs < pool->ltp_numqs) {
587 		for (i=numqs; i<pool->ltp_numqs; i++)
588 			pool->ltp_wqs[i]->ltp_max_count = 0;
589 	} else if (numqs > pool->ltp_numqs) {
590 		struct ldap_int_thread_poolq_s **wqs;
591 		wqs = LDAP_REALLOC(pool->ltp_wqs, numqs * sizeof(struct ldap_int_thread_poolq_s *));
592 		if (wqs == NULL)
593 			return(-1);
594 		pool->ltp_wqs = wqs;
595 		for (i=pool->ltp_numqs; i<numqs; i++) {
596 			char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
597 			if (ptr == NULL) {
598 				for (; i<numqs; i++)
599 					pool->ltp_wqs[i] = NULL;
600 				return(-1);
601 			}
602 			pq = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
603 			pq->ltp_free = ptr;
604 			pool->ltp_wqs[i] = pq;
605 			pq->ltp_pool = pool;
606 			rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
607 			if (rc != 0)
608 				return(rc);
609 			rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
610 			if (rc != 0)
611 				return(rc);
612 			LDAP_STAILQ_INIT(&pq->ltp_pending_list);
613 			pq->ltp_work_list = &pq->ltp_pending_list;
614 			LDAP_SLIST_INIT(&pq->ltp_free_list);
615 		}
616 	}
617 	rem_thr = pool->ltp_max_count % numqs;
618 	rem_pend = pool->ltp_max_pending % numqs;
619 	for ( i=0; i<numqs; i++ ) {
620 		pq = pool->ltp_wqs[i];
621 		pq->ltp_max_count = pool->ltp_max_count / numqs;
622 		if ( rem_thr ) {
623 			pq->ltp_max_count++;
624 			rem_thr--;
625 		}
626 		pq->ltp_max_pending = pool->ltp_max_pending / numqs;
627 		if ( rem_pend ) {
628 			pq->ltp_max_pending++;
629 			rem_pend--;
630 		}
631 	}
632 	pool->ltp_numqs = numqs;
633 	return 0;
634 }
635 
636 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
637 int
ldap_pvt_thread_pool_maxthreads(ldap_pvt_thread_pool_t * tpool,int max_threads)638 ldap_pvt_thread_pool_maxthreads(
639 	ldap_pvt_thread_pool_t *tpool,
640 	int max_threads )
641 {
642 	struct ldap_int_thread_pool_s *pool;
643 	struct ldap_int_thread_poolq_s *pq;
644 	int remthr, i;
645 
646 	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
647 		max_threads = 0;
648 
649 	if (tpool == NULL)
650 		return(-1);
651 
652 	pool = *tpool;
653 
654 	if (pool == NULL)
655 		return(-1);
656 
657 	pool->ltp_conf_max_count = max_threads;
658 	if ( !max_threads )
659 		max_threads = LDAP_MAXTHR;
660 	pool->ltp_max_count = max_threads;
661 
662 	remthr = max_threads % pool->ltp_numqs;
663 	max_threads /= pool->ltp_numqs;
664 
665 	for (i=0; i<pool->ltp_numqs; i++) {
666 		pq = pool->ltp_wqs[i];
667 		pq->ltp_max_count = max_threads;
668 		if (remthr) {
669 			pq->ltp_max_count++;
670 			remthr--;
671 		}
672 	}
673 	return(0);
674 }
675 
676 /* Inspect the pool */
677 int
ldap_pvt_thread_pool_query(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_pool_param_t param,void * value)678 ldap_pvt_thread_pool_query(
679 	ldap_pvt_thread_pool_t *tpool,
680 	ldap_pvt_thread_pool_param_t param,
681 	void *value )
682 {
683 	struct ldap_int_thread_pool_s	*pool;
684 	int				count = -1;
685 
686 	if ( tpool == NULL || value == NULL ) {
687 		return -1;
688 	}
689 
690 	pool = *tpool;
691 
692 	if ( pool == NULL ) {
693 		return 0;
694 	}
695 
696 	switch ( param ) {
697 	case LDAP_PVT_THREAD_POOL_PARAM_MAX:
698 		count = pool->ltp_conf_max_count;
699 		break;
700 
701 	case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
702 		count = pool->ltp_max_pending;
703 		if (count < 0)
704 			count = -count;
705 		if (count == MAX_PENDING)
706 			count = 0;
707 		break;
708 
709 	case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
710 		ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
711 		count = (pool->ltp_pause != 0);
712 		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
713 		break;
714 
715 	case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
716 	case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
717 	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
718 	case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
719 	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
720 		{
721 			int i;
722 			count = 0;
723 			for (i=0; i<pool->ltp_numqs; i++) {
724 				struct ldap_int_thread_poolq_s *pq = pool->ltp_wqs[i];
725 				ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
726 				switch(param) {
727 					case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
728 						count += pq->ltp_open_count;
729 						break;
730 					case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
731 						count += pq->ltp_starting;
732 						break;
733 					case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
734 						count += pq->ltp_active_count;
735 						break;
736 					case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
737 						count += pq->ltp_pending_count;
738 						break;
739 					case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
740 						count += pq->ltp_pending_count + pq->ltp_active_count;
741 						break;
742 					default:
743 						break;
744 				}
745 				ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
746 			}
747 			if (count < 0)
748 				count = -count;
749 		}
750 		break;
751 
752 	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
753 		break;
754 
755 	case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
756 		break;
757 
758 	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
759 		break;
760 
761 	case LDAP_PVT_THREAD_POOL_PARAM_STATE:
762 		if (pool->ltp_pause)
763 			*((char **)value) = "pausing";
764 		else if (!pool->ltp_finishing)
765 			*((char **)value) = "running";
766 		else {
767 			int i;
768 			for (i=0; i<pool->ltp_numqs; i++)
769 				if (pool->ltp_wqs[i]->ltp_pending_count) break;
770 			if (i<pool->ltp_numqs)
771 				*((char **)value) = "finishing";
772 			else
773 				*((char **)value) = "stopping";
774 		}
775 		break;
776 
777 	case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
778 		break;
779 	}
780 
781 	if ( count > -1 ) {
782 		*((int *)value) = count;
783 	}
784 
785 	return ( count == -1 ? -1 : 0 );
786 }
787 
788 /*
789  * true if pool is pausing; does not lock any mutex to check.
790  * 0 if not pause, 1 if pause, -1 if error or no pool.
791  */
792 int
ldap_pvt_thread_pool_pausing(ldap_pvt_thread_pool_t * tpool)793 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
794 {
795 	int rc = -1;
796 	struct ldap_int_thread_pool_s *pool;
797 
798 	if ( tpool != NULL && (pool = *tpool) != NULL ) {
799 		rc = (pool->ltp_pause != 0);
800 	}
801 
802 	return rc;
803 }
804 
805 /*
806  * wrapper for ldap_pvt_thread_pool_query(), left around
807  * for backwards compatibility
808  */
809 int
ldap_pvt_thread_pool_backload(ldap_pvt_thread_pool_t * tpool)810 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
811 {
812 	int	rc, count;
813 
814 	rc = ldap_pvt_thread_pool_query( tpool,
815 		LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
816 
817 	if ( rc == 0 ) {
818 		return count;
819 	}
820 
821 	return rc;
822 }
823 
824 
825 /*
826  * wrapper for ldap_pvt_thread_pool_close+free(), left around
827  * for backwards compatibility
828  */
829 int
ldap_pvt_thread_pool_destroy(ldap_pvt_thread_pool_t * tpool,int run_pending)830 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
831 {
832 	int rc;
833 
834 	if ( (rc = ldap_pvt_thread_pool_close( tpool, run_pending )) ) {
835 		return rc;
836 	}
837 
838 	return ldap_pvt_thread_pool_free( tpool );
839 }
840 
841 /* Shut down the pool making its threads finish */
842 int
ldap_pvt_thread_pool_close(ldap_pvt_thread_pool_t * tpool,int run_pending)843 ldap_pvt_thread_pool_close ( ldap_pvt_thread_pool_t *tpool, int run_pending )
844 {
845 	struct ldap_int_thread_pool_s *pool, *pptr;
846 	struct ldap_int_thread_poolq_s *pq;
847 	ldap_int_thread_task_t *task;
848 	int i;
849 
850 	if (tpool == NULL)
851 		return(-1);
852 
853 	pool = *tpool;
854 
855 	if (pool == NULL) return(-1);
856 
857 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
858 	LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
859 		if (pptr == pool) break;
860 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
861 
862 	if (pool != pptr) return(-1);
863 
864 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
865 
866 	pool->ltp_finishing = 1;
867 	if (pool->ltp_max_pending > 0)
868 		pool->ltp_max_pending = -pool->ltp_max_pending;
869 
870 	ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
871 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
872 
873 	for (i=0; i<pool->ltp_numqs; i++) {
874 		pq = pool->ltp_wqs[i];
875 		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
876 		if (pq->ltp_max_pending > 0)
877 			pq->ltp_max_pending = -pq->ltp_max_pending;
878 		if (!run_pending) {
879 			while ((task = LDAP_STAILQ_FIRST(&pq->ltp_pending_list)) != NULL) {
880 				LDAP_STAILQ_REMOVE_HEAD(&pq->ltp_pending_list, ltt_next.q);
881 				LDAP_FREE(task);
882 			}
883 			pq->ltp_pending_count = 0;
884 		}
885 
886 		while (pq->ltp_open_count) {
887 			ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
888 			ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
889 		}
890 
891 		while ((task = LDAP_SLIST_FIRST(&pq->ltp_free_list)) != NULL)
892 		{
893 			LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
894 			LDAP_FREE(task);
895 		}
896 		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
897 	}
898 
899 	return(0);
900 }
901 
902 /* Destroy the pool, everything must have already shut down */
903 int
ldap_pvt_thread_pool_free(ldap_pvt_thread_pool_t * tpool)904 ldap_pvt_thread_pool_free ( ldap_pvt_thread_pool_t *tpool )
905 {
906 	struct ldap_int_thread_pool_s *pool, *pptr;
907 	struct ldap_int_thread_poolq_s *pq;
908 	int i;
909 
910 	if (tpool == NULL)
911 		return(-1);
912 
913 	pool = *tpool;
914 
915 	if (pool == NULL) return(-1);
916 
917 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
918 	LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
919 		if (pptr == pool) break;
920 	if (pptr == pool)
921 		LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
922 			ldap_int_thread_pool_s, ltp_next);
923 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
924 
925 	if (pool != pptr) return(-1);
926 
927 	ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
928 	ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
929 	ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
930 	for (i=0; i<pool->ltp_numqs; i++) {
931 		pq = pool->ltp_wqs[i];
932 
933 		assert( !pq->ltp_open_count );
934 		assert( LDAP_SLIST_EMPTY(&pq->ltp_free_list) );
935 		ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
936 		ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
937 		if (pq->ltp_free) {
938 			LDAP_FREE(pq->ltp_free);
939 		}
940 	}
941 	LDAP_FREE(pool->ltp_wqs);
942 	LDAP_FREE(pool);
943 	*tpool = NULL;
944 	ldap_int_has_thread_pool = 0;
945 	return(0);
946 }
947 
948 /* Thread loop.  Accept and handle submitted tasks. */
949 static void *
ldap_int_thread_pool_wrapper(void * xpool)950 ldap_int_thread_pool_wrapper (
951 	void *xpool )
952 {
953 	struct ldap_int_thread_poolq_s *pq = xpool;
954 	struct ldap_int_thread_pool_s *pool = pq->ltp_pool;
955 	ldap_int_thread_task_t *task;
956 	ldap_int_tpool_plist_t *work_list;
957 	ldap_int_thread_userctx_t ctx, *kctx;
958 	unsigned i, keyslot, hash;
959 	int pool_lock = 0, freeme = 0;
960 
961 	assert(pool != NULL);
962 
963 	for ( i=0; i<MAXKEYS; i++ ) {
964 		ctx.ltu_key[i].ltk_key = NULL;
965 	}
966 
967 	ctx.ltu_pq = pq;
968 	ctx.ltu_id = ldap_pvt_thread_self();
969 	TID_HASH(ctx.ltu_id, hash);
970 
971 	ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
972 
973 	if (pool->ltp_pause) {
974 		ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
975 		/* thread_keys[] is read-only when paused */
976 		while (pool->ltp_pause)
977 			ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
978 		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
979 	}
980 
981 	/* find a key slot to give this thread ID and store a
982 	 * pointer to our keys there; start at the thread ID
983 	 * itself (mod LDAP_MAXTHR) and look for an empty slot.
984 	 */
985 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
986 	for (keyslot = hash & (LDAP_MAXTHR-1);
987 		(kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
988 		keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
989 	thread_keys[keyslot].ctx = &ctx;
990 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
991 
992 	ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
993 	pq->ltp_starting--;
994 	pq->ltp_active_count++;
995 
996 	for (;;) {
997 		work_list = pq->ltp_work_list; /* help the compiler a bit */
998 		task = LDAP_STAILQ_FIRST(work_list);
999 		if (task == NULL) {	/* paused or no pending tasks */
1000 			if (--(pq->ltp_active_count) < 1) {
1001 				if (pool->ltp_pause) {
1002 					ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1003 					ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1004 					pool_lock = 1;
1005 					if (--(pool->ltp_active_queues) < 1) {
1006 						/* Notify pool_pause it is the sole active thread. */
1007 						ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
1008 					}
1009 				}
1010 			}
1011 
1012 			do {
1013 				if (pool->ltp_finishing || pq->ltp_open_count > pq->ltp_max_count) {
1014 					/* Not paused, and either finishing or too many
1015 					 * threads running (can happen if ltp_max_count
1016 					 * was reduced).  Let this thread die.
1017 					 */
1018 					goto done;
1019 				}
1020 
1021 				/* We could check an idle timer here, and let the
1022 				 * thread die if it has been inactive for a while.
1023 				 * Only die if there are other open threads (i.e.,
1024 				 * always have at least one thread open).
1025 				 * The check should be like this:
1026 				 *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
1027 				 *       check timer, wait if ltp_pause, leave thread;
1028 				 *
1029 				 * Just use pthread_cond_timedwait() if we want to
1030 				 * check idle time.
1031 				 */
1032 				if (pool_lock) {
1033 					ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1034 					if (!pool->ltp_pause) {
1035 						ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1036 						ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1037 						pool_lock = 0;
1038 					}
1039 				} else
1040 					ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
1041 
1042 				work_list = pq->ltp_work_list;
1043 				task = LDAP_STAILQ_FIRST(work_list);
1044 			} while (task == NULL);
1045 
1046 			if (pool_lock) {
1047 				ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1048 				ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1049 				pool_lock = 0;
1050 			}
1051 			pq->ltp_active_count++;
1052 		}
1053 
1054 		LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
1055 		pq->ltp_pending_count--;
1056 		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1057 
1058 		task->ltt_start_routine(&ctx, task->ltt_arg);
1059 
1060 		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1061 		LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task, ltt_next.l);
1062 	}
1063  done:
1064 
1065 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1066 
1067 	/* The pool_mutex lock protects ctx->ltu_key from pool_purgekey()
1068 	 * during this call, since it prevents new pauses. */
1069 	ldap_pvt_thread_pool_context_reset(&ctx);
1070 
1071 	thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
1072 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1073 
1074 	pq->ltp_open_count--;
1075 	if (pq->ltp_open_count == 0) {
1076 		if (pool->ltp_finishing)
1077 			/* let pool_destroy know we're all done */
1078 			ldap_pvt_thread_cond_signal(&pq->ltp_cond);
1079 		else
1080 			freeme = 1;
1081 	}
1082 
1083 	if (pool_lock)
1084 		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1085 	else
1086 		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1087 
1088 	if (freeme) {
1089 		ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
1090 		ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
1091 		LDAP_FREE(pq->ltp_free);
1092 		pq->ltp_free = NULL;
1093 	}
1094 	ldap_pvt_thread_exit(NULL);
1095 	return(NULL);
1096 }
1097 
/*
 * Request codes for handle_pause(), always passed as PAUSE_ARG(code).
 *
 * handle_pause() subtracts the current pool->ltp_pause value from the
 * encoded argument and then tests the GO_IDLE, GO_UNIDLE and DO_PAUSE
 * bits of the difference.  Every code is therefore larger than any
 * ltp_pause value (assumed here to stay below GO_IDLE), and PAUSE_ARG()
 * pads the code so that:
 *   - GO_IDLE and GO_UNIDLE get all lower bits set, so the subtraction
 *     cannot clear the requested bit;
 *   - CHECK_PAUSE and DO_PAUSE get the CHECK_PAUSE bit set, so a nonzero
 *     ltp_pause borrows from it and turns on both GO_IDLE and GO_UNIDLE.
 */
#define GO_IDLE		8
#define GO_UNIDLE	16
#define CHECK_PAUSE	32	/* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
#define DO_PAUSE	64	/* CHECK_PAUSE; pause the pool */
#define PAUSE_ARG(a) \
	((a) | (((a) & (GO_IDLE | GO_UNIDLE)) ? (GO_IDLE - 1) : CHECK_PAUSE))
1107 
1108 static int
handle_pause(ldap_pvt_thread_pool_t * tpool,int pause_type)1109 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
1110 {
1111 	struct ldap_int_thread_pool_s *pool;
1112 	struct ldap_int_thread_poolq_s *pq;
1113 	int ret = 0, pause, max_ltp_pause;
1114 
1115 	if (tpool == NULL)
1116 		return(-1);
1117 
1118 	pool = *tpool;
1119 
1120 	if (pool == NULL)
1121 		return(0);
1122 
1123 	if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
1124 		return(0);
1125 
1126 	{
1127 		ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
1128 		pq = ctx->ltu_pq;
1129 		if ( !pq )
1130 			return(-1);
1131 	}
1132 
1133 	/* Let pool_unidle() ignore requests for new pauses */
1134 	max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
1135 
1136 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1137 
1138 	pause = pool->ltp_pause;	/* NOT_PAUSED, WANT_PAUSE or PAUSED */
1139 
1140 	/* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
1141 	pause_type -= pause;
1142 
1143 	if (pause_type & GO_IDLE) {
1144 		int do_pool = 0;
1145 		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1146 		pq->ltp_pending_count++;
1147 		pq->ltp_active_count--;
1148 		if (pause && pq->ltp_active_count < 1) {
1149 			do_pool = 1;
1150 		}
1151 		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1152 		if (do_pool) {
1153 			pool->ltp_active_queues--;
1154 			if (pool->ltp_active_queues < 1)
1155 			/* Tell the task waiting to DO_PAUSE it can proceed */
1156 				ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
1157 		}
1158 	}
1159 
1160 	if (pause_type & GO_UNIDLE) {
1161 		/* Wait out pause if any, then cancel GO_IDLE */
1162 		if (pause > max_ltp_pause) {
1163 			ret = 1;
1164 			do {
1165 				ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1166 			} while (pool->ltp_pause > max_ltp_pause);
1167 		}
1168 		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1169 		pq->ltp_pending_count--;
1170 		pq->ltp_active_count++;
1171 		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1172 	}
1173 
1174 	if (pause_type & DO_PAUSE) {
1175 		int i, j;
1176 		/* Tell everyone else to pause or finish, then await that */
1177 		ret = 0;
1178 		assert(!pool->ltp_pause);
1179 		pool->ltp_pause = WANT_PAUSE;
1180 		pool->ltp_active_queues = 0;
1181 
1182 		for (i=0; i<pool->ltp_numqs; i++)
1183 			if (pool->ltp_wqs[i] == pq) break;
1184 
1185 		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1186 		/* temporarily remove ourself from active count */
1187 		pq->ltp_active_count--;
1188 
1189 		j=i;
1190 		do {
1191 			pq = pool->ltp_wqs[j];
1192 			if (j != i)
1193 				ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1194 
1195 			/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
1196 			pq->ltp_work_list = &empty_pending_list;
1197 
1198 			if (pq->ltp_active_count > 0)
1199 				pool->ltp_active_queues++;
1200 
1201 			ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1202 			if (pool->ltp_numqs > 1) {
1203 				j++;
1204 				j %= pool->ltp_numqs;
1205 			}
1206 		} while (j != i);
1207 
1208 		/* Wait for this task to become the sole active task */
1209 		while (pool->ltp_active_queues > 0)
1210 			ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
1211 
1212 		/* restore us to active count */
1213 		pool->ltp_wqs[i]->ltp_active_count++;
1214 
1215 		assert(pool->ltp_pause == WANT_PAUSE);
1216 		pool->ltp_pause = PAUSED;
1217 	}
1218 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1219 
1220 	return(ret);
1221 }
1222 
1223 /* Consider this task idle: It will not block pool_pause() in other tasks. */
1224 void
ldap_pvt_thread_pool_idle(ldap_pvt_thread_pool_t * tpool)1225 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
1226 {
1227 	handle_pause(tpool, PAUSE_ARG(GO_IDLE));
1228 }
1229 
1230 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
1231 void
ldap_pvt_thread_pool_unidle(ldap_pvt_thread_pool_t * tpool)1232 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
1233 {
1234 	handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
1235 }
1236 
1237 /*
1238  * If a pause was requested, wait for it.  If several threads
1239  * are waiting to pause, let through one or more pauses.
1240  * The calling task must be active, not idle.
1241  * Return 1 if we waited, 0 if not, -1 at parameter error.
1242  */
1243 int
ldap_pvt_thread_pool_pausecheck(ldap_pvt_thread_pool_t * tpool)1244 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
1245 {
1246 	return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
1247 }
1248 
1249 /*
1250  * Wait for a pause, from a non-pooled thread.
1251  */
1252 int
ldap_pvt_thread_pool_pausecheck_native(ldap_pvt_thread_pool_t * tpool)1253 ldap_pvt_thread_pool_pausecheck_native( ldap_pvt_thread_pool_t *tpool )
1254 {
1255 	struct ldap_int_thread_pool_s *pool;
1256 
1257 	if (tpool == NULL)
1258 		return(-1);
1259 
1260 	pool = *tpool;
1261 
1262 	if (pool == NULL)
1263 		return(0);
1264 
1265 	if (!pool->ltp_pause)
1266 		return(0);
1267 
1268 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1269 	while (pool->ltp_pause)
1270 			ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1271 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1272 	return 1;
1273 }
1274 
1275 /*
1276  * Pause the pool.  The calling task must be active, not idle.
1277  * Return when all other tasks are paused or idle.
1278  */
1279 int
ldap_pvt_thread_pool_pause(ldap_pvt_thread_pool_t * tpool)1280 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
1281 {
1282 	return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
1283 }
1284 
1285 /* End a pause */
1286 int
ldap_pvt_thread_pool_resume(ldap_pvt_thread_pool_t * tpool)1287 ldap_pvt_thread_pool_resume (
1288 	ldap_pvt_thread_pool_t *tpool )
1289 {
1290 	struct ldap_int_thread_pool_s *pool;
1291 	struct ldap_int_thread_poolq_s *pq;
1292 	int i;
1293 
1294 	if (tpool == NULL)
1295 		return(-1);
1296 
1297 	pool = *tpool;
1298 
1299 	if (pool == NULL)
1300 		return(0);
1301 
1302 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1303 	assert(pool->ltp_pause == PAUSED);
1304 	pool->ltp_pause = 0;
1305 	for (i=0; i<pool->ltp_numqs; i++) {
1306 		pq = pool->ltp_wqs[i];
1307 		pq->ltp_work_list = &pq->ltp_pending_list;
1308 		ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
1309 	}
1310 	ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
1311 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1312 	return(0);
1313 }
1314 
1315 /*
1316  * Get the key's data and optionally free function in the given context.
1317  */
ldap_pvt_thread_pool_getkey(void * xctx,void * key,void ** data,ldap_pvt_thread_pool_keyfree_t ** kfree)1318 int ldap_pvt_thread_pool_getkey(
1319 	void *xctx,
1320 	void *key,
1321 	void **data,
1322 	ldap_pvt_thread_pool_keyfree_t **kfree )
1323 {
1324 	ldap_int_thread_userctx_t *ctx = xctx;
1325 	int i;
1326 
1327 	if ( !ctx || !key || !data ) return EINVAL;
1328 
1329 	for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
1330 		if ( ctx->ltu_key[i].ltk_key == key ) {
1331 			*data = ctx->ltu_key[i].ltk_data;
1332 			if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
1333 			return 0;
1334 		}
1335 	}
1336 	return ENOENT;
1337 }
1338 
1339 static void
clear_key_idx(ldap_int_thread_userctx_t * ctx,int i)1340 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
1341 {
1342 	for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
1343 		ctx->ltu_key[i] = ctx->ltu_key[i+1];
1344 	ctx->ltu_key[i].ltk_key = NULL;
1345 }
1346 
1347 /*
1348  * Set or remove data for the key in the given context.
1349  * key can be any unique pointer.
1350  * kfree() is an optional function to free the data (but not the key):
1351  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
1352  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
1353  *   responsibility to free any existing data with the same key.
1354  *   kfree() must not call functions taking a tpool argument.
1355  */
ldap_pvt_thread_pool_setkey(void * xctx,void * key,void * data,ldap_pvt_thread_pool_keyfree_t * kfree,void ** olddatap,ldap_pvt_thread_pool_keyfree_t ** oldkfreep)1356 int ldap_pvt_thread_pool_setkey(
1357 	void *xctx,
1358 	void *key,
1359 	void *data,
1360 	ldap_pvt_thread_pool_keyfree_t *kfree,
1361 	void **olddatap,
1362 	ldap_pvt_thread_pool_keyfree_t **oldkfreep )
1363 {
1364 	ldap_int_thread_userctx_t *ctx = xctx;
1365 	int i, found;
1366 
1367 	if ( !ctx || !key ) return EINVAL;
1368 
1369 	for ( i=found=0; i<MAXKEYS; i++ ) {
1370 		if ( ctx->ltu_key[i].ltk_key == key ) {
1371 			found = 1;
1372 			break;
1373 		} else if ( !ctx->ltu_key[i].ltk_key ) {
1374 			break;
1375 		}
1376 	}
1377 
1378 	if ( olddatap ) {
1379 		if ( found ) {
1380 			*olddatap = ctx->ltu_key[i].ltk_data;
1381 		} else {
1382 			*olddatap = NULL;
1383 		}
1384 	}
1385 
1386 	if ( oldkfreep ) {
1387 		if ( found ) {
1388 			*oldkfreep = ctx->ltu_key[i].ltk_free;
1389 		} else {
1390 			*oldkfreep = 0;
1391 		}
1392 	}
1393 
1394 	if ( data || kfree ) {
1395 		if ( i>=MAXKEYS )
1396 			return ENOMEM;
1397 		ctx->ltu_key[i].ltk_key = key;
1398 		ctx->ltu_key[i].ltk_data = data;
1399 		ctx->ltu_key[i].ltk_free = kfree;
1400 	} else if ( found ) {
1401 		clear_key_idx( ctx, i );
1402 	}
1403 
1404 	return 0;
1405 }
1406 
1407 /* Free all elements with this key, no matter which thread they're in.
1408  * May only be called while the pool is paused.
1409  */
ldap_pvt_thread_pool_purgekey(void * key)1410 void ldap_pvt_thread_pool_purgekey( void *key )
1411 {
1412 	int i, j;
1413 	ldap_int_thread_userctx_t *ctx;
1414 
1415 	assert ( key != NULL );
1416 
1417 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1418 	for ( i=0; i<LDAP_MAXTHR; i++ ) {
1419 		ctx = thread_keys[i].ctx;
1420 		if ( ctx && ctx != DELETED_THREAD_CTX ) {
1421 			for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
1422 				if ( ctx->ltu_key[j].ltk_key == key ) {
1423 					if (ctx->ltu_key[j].ltk_free)
1424 						ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
1425 						ctx->ltu_key[j].ltk_data );
1426 					clear_key_idx( ctx, j );
1427 					break;
1428 				}
1429 			}
1430 		}
1431 	}
1432 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1433 }
1434 
1435 /*
1436  * Find the context of the current thread.
1437  * This is necessary if the caller does not have access to the
1438  * thread context handle (for example, a slapd plugin calling
1439  * slapi_search_internal()). No doubt it is more efficient
1440  * for the application to keep track of the thread context
1441  * handles itself.
1442  */
ldap_pvt_thread_pool_context()1443 void *ldap_pvt_thread_pool_context( )
1444 {
1445 	void *ctx = NULL;
1446 
1447 	ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
1448 	return ctx ? ctx : (void *) &ldap_int_main_thrctx;
1449 }
1450 
1451 /*
1452  * Free the context's keys.
1453  * Must not call functions taking a tpool argument (because this
1454  * thread already holds ltp_mutex when called from pool_wrapper()).
1455  */
ldap_pvt_thread_pool_context_reset(void * vctx)1456 void ldap_pvt_thread_pool_context_reset( void *vctx )
1457 {
1458 	ldap_int_thread_userctx_t *ctx = vctx;
1459 	int i;
1460 
1461 	for ( i=MAXKEYS-1; i>=0; i--) {
1462 		if ( !ctx->ltu_key[i].ltk_key )
1463 			continue;
1464 		if ( ctx->ltu_key[i].ltk_free )
1465 			ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1466 			ctx->ltu_key[i].ltk_data );
1467 		ctx->ltu_key[i].ltk_key = NULL;
1468 	}
1469 }
1470 
ldap_pvt_thread_pool_tid(void * vctx)1471 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1472 {
1473 	ldap_int_thread_userctx_t *ctx = vctx;
1474 
1475 	return ctx->ltu_id;
1476 }
1477 #endif /* LDAP_THREAD_HAVE_TPOOL */
1478 
1479 #endif /* LDAP_R_COMPILE */
1480