xref: /freebsd-src/contrib/ntp/libntp/work_thread.c (revision f5f40dd63bc7acbb5312b26ac1ea1103c12352a6)
12b15cb3dSCy Schubert /*
22b15cb3dSCy Schubert  * work_thread.c - threads implementation for blocking worker child.
32b15cb3dSCy Schubert  */
42b15cb3dSCy Schubert #include <config.h>
52b15cb3dSCy Schubert #include "ntp_workimpl.h"
62b15cb3dSCy Schubert 
72b15cb3dSCy Schubert #ifdef WORK_THREAD
82b15cb3dSCy Schubert 
92b15cb3dSCy Schubert #include <stdio.h>
102b15cb3dSCy Schubert #include <ctype.h>
112b15cb3dSCy Schubert #include <signal.h>
122b15cb3dSCy Schubert #ifndef SYS_WINNT
132b15cb3dSCy Schubert #include <pthread.h>
142b15cb3dSCy Schubert #endif
152b15cb3dSCy Schubert 
162b15cb3dSCy Schubert #include "ntp_stdlib.h"
172b15cb3dSCy Schubert #include "ntp_malloc.h"
182b15cb3dSCy Schubert #include "ntp_syslog.h"
192b15cb3dSCy Schubert #include "ntpd.h"
202b15cb3dSCy Schubert #include "ntp_io.h"
212b15cb3dSCy Schubert #include "ntp_assert.h"
222b15cb3dSCy Schubert #include "ntp_unixtime.h"
232b15cb3dSCy Schubert #include "timespecops.h"
242b15cb3dSCy Schubert #include "ntp_worker.h"
252b15cb3dSCy Schubert 
/* Sentinel queue entries. They are not real headers and must never be
 * dereferenced: a worker that dequeues CHILD_EXIT_REQ answers with
 * CHILD_GONE_RESP (both share the value (blocking_pipe_header *)-1).
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 * (The clamping itself happens in start_blocking_thread_internal().)
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif

/* need a good integer to store a pointer... (used by thread_exit() below
 * to smuggle the integer exit code through pthread_exit()'s void *)
 */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif


/* Platform glue: how a worker thread terminates, how a semaphore is
 * posted ("tickled"), and the thread entry point signature.
 */
#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
/* NOTE(review): 'sh' is not parenthesized inside the expansion; only
 * pass simple expressions as the argument.
 */
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

/* Hook used to register the response notification channel with the
 * main I/O loop: a pipe fd on POSIX, a semaphore handle otherwise.
 * It is called through a pointer in start_blocking_thread_internal();
 * presumably assigned by the daemon elsewhere -- confirm.
 */
#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

/* Global worker lock used by worker_global_lock(); the lock is a no-op
 * while worker_memlock is NULL. Presumably worker_memlock is set to
 * refer to worker_mmutex once it is created -- confirm.
 */
static sema_type worker_mmutex;
static sem_ref   worker_memlock;
942b15cb3dSCy Schubert 
954990d495SXin LI /* --------------------------------------------------------------------
964990d495SXin LI  * locking the global worker state table (and other global stuff)
974990d495SXin LI  */
984990d495SXin LI void
worker_global_lock(int inOrOut)994990d495SXin LI worker_global_lock(
1004990d495SXin LI 	int inOrOut)
1014990d495SXin LI {
1024990d495SXin LI 	if (worker_memlock) {
1034990d495SXin LI 		if (inOrOut)
1044990d495SXin LI 			wait_for_sem(worker_memlock, NULL);
1054990d495SXin LI 		else
1064990d495SXin LI 			tickle_sem(worker_memlock);
1074990d495SXin LI 	}
1084990d495SXin LI }
1094990d495SXin LI 
/* --------------------------------------------------------------------
 * Terminate the calling worker thread with 'exitcode'. The platform
 * specific call is hidden behind the thread_exit() macro
 * (_endthreadex() on Windows, pthread_exit() elsewhere).
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);
}
1202b15cb3dSCy Schubert 
1213311ff84SXin LI /* --------------------------------------------------------------------
1223311ff84SXin LI  * sleep for a given time or until the wakup semaphore is tickled.
1233311ff84SXin LI  */
1242b15cb3dSCy Schubert int
worker_sleep(blocking_child * c,time_t seconds)1252b15cb3dSCy Schubert worker_sleep(
1262b15cb3dSCy Schubert 	blocking_child *	c,
1272b15cb3dSCy Schubert 	time_t			seconds
1282b15cb3dSCy Schubert 	)
1292b15cb3dSCy Schubert {
1302b15cb3dSCy Schubert 	struct timespec	until;
1312b15cb3dSCy Schubert 	int		rc;
1322b15cb3dSCy Schubert 
1332b15cb3dSCy Schubert # ifdef HAVE_CLOCK_GETTIME
1342b15cb3dSCy Schubert 	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
1352b15cb3dSCy Schubert 		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
1362b15cb3dSCy Schubert 		return -1;
1372b15cb3dSCy Schubert 	}
1382b15cb3dSCy Schubert # else
1392b15cb3dSCy Schubert 	if (0 != getclock(TIMEOFDAY, &until)) {
1402b15cb3dSCy Schubert 		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
1412b15cb3dSCy Schubert 		return -1;
1422b15cb3dSCy Schubert 	}
1432b15cb3dSCy Schubert # endif
1442b15cb3dSCy Schubert 	until.tv_sec += seconds;
1452b15cb3dSCy Schubert 	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
1462b15cb3dSCy Schubert 	if (0 == rc)
1472b15cb3dSCy Schubert 		return -1;
1482b15cb3dSCy Schubert 	if (-1 == rc && ETIMEDOUT == errno)
1492b15cb3dSCy Schubert 		return 0;
1502b15cb3dSCy Schubert 	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
1512b15cb3dSCy Schubert 	return -1;
1522b15cb3dSCy Schubert }
1532b15cb3dSCy Schubert 
1542b15cb3dSCy Schubert 
1553311ff84SXin LI /* --------------------------------------------------------------------
1563311ff84SXin LI  * Wake up a worker that takes a nap.
1573311ff84SXin LI  */
1582b15cb3dSCy Schubert void
interrupt_worker_sleep(void)1592b15cb3dSCy Schubert interrupt_worker_sleep(void)
1602b15cb3dSCy Schubert {
1612b15cb3dSCy Schubert 	u_int			idx;
1622b15cb3dSCy Schubert 	blocking_child *	c;
1632b15cb3dSCy Schubert 
1642b15cb3dSCy Schubert 	for (idx = 0; idx < blocking_children_alloc; idx++) {
1652b15cb3dSCy Schubert 		c = blocking_children[idx];
1662b15cb3dSCy Schubert 		if (NULL == c || NULL == c->wake_scheduled_sleep)
1672b15cb3dSCy Schubert 			continue;
1682b15cb3dSCy Schubert 		tickle_sem(c->wake_scheduled_sleep);
1692b15cb3dSCy Schubert 	}
1702b15cb3dSCy Schubert }
1712b15cb3dSCy Schubert 
1723311ff84SXin LI /* --------------------------------------------------------------------
1733311ff84SXin LI  * Make sure there is an empty slot at the head of the request
1743311ff84SXin LI  * queue. Tell if the queue is currently empty.
1753311ff84SXin LI  */
1763311ff84SXin LI static int
ensure_workitems_empty_slot(blocking_child * c)1772b15cb3dSCy Schubert ensure_workitems_empty_slot(
1782b15cb3dSCy Schubert 	blocking_child *c
1792b15cb3dSCy Schubert 	)
1802b15cb3dSCy Schubert {
1813311ff84SXin LI 	/*
1823311ff84SXin LI 	** !!! PRECONDITION: caller holds access lock!
1833311ff84SXin LI 	**
1843311ff84SXin LI 	** This simply tries to increase the size of the buffer if it
1853311ff84SXin LI 	** becomes full. The resize operation does *not* maintain the
1863311ff84SXin LI 	** order of requests, but that should be irrelevant since the
1873311ff84SXin LI 	** processing is considered asynchronous anyway.
1883311ff84SXin LI 	**
1893311ff84SXin LI 	** Return if the buffer is currently empty.
1903311ff84SXin LI 	*/
1913311ff84SXin LI 
1923311ff84SXin LI 	static const size_t each =
1933311ff84SXin LI 	    sizeof(blocking_children[0]->workitems[0]);
1943311ff84SXin LI 
1952b15cb3dSCy Schubert 	size_t	new_alloc;
1963311ff84SXin LI 	size_t  slots_used;
19768ba7e87SXin LI 	size_t	sidx;
1982b15cb3dSCy Schubert 
1993311ff84SXin LI 	slots_used = c->head_workitem - c->tail_workitem;
2003311ff84SXin LI 	if (slots_used >= c->workitems_alloc) {
2012b15cb3dSCy Schubert 		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
2023311ff84SXin LI 		c->workitems = erealloc(c->workitems, new_alloc * each);
20368ba7e87SXin LI 		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
20468ba7e87SXin LI 		    c->workitems[sidx] = NULL;
2053311ff84SXin LI 		c->tail_workitem   = 0;
2063311ff84SXin LI 		c->head_workitem   = c->workitems_alloc;
2072b15cb3dSCy Schubert 		c->workitems_alloc = new_alloc;
2082b15cb3dSCy Schubert 	}
20968ba7e87SXin LI 	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
2103311ff84SXin LI 	return (0 == slots_used);
2113311ff84SXin LI }
2122b15cb3dSCy Schubert 
2133311ff84SXin LI /* --------------------------------------------------------------------
2143311ff84SXin LI  * Make sure there is an empty slot at the head of the response
2153311ff84SXin LI  * queue. Tell if the queue is currently empty.
2163311ff84SXin LI  */
2173311ff84SXin LI static int
ensure_workresp_empty_slot(blocking_child * c)2182b15cb3dSCy Schubert ensure_workresp_empty_slot(
2192b15cb3dSCy Schubert 	blocking_child *c
2202b15cb3dSCy Schubert 	)
2212b15cb3dSCy Schubert {
2223311ff84SXin LI 	/*
2233311ff84SXin LI 	** !!! PRECONDITION: caller holds access lock!
2243311ff84SXin LI 	**
2253311ff84SXin LI 	** Works like the companion function above.
2263311ff84SXin LI 	*/
2273311ff84SXin LI 
2283311ff84SXin LI 	static const size_t each =
2293311ff84SXin LI 	    sizeof(blocking_children[0]->responses[0]);
2303311ff84SXin LI 
2312b15cb3dSCy Schubert 	size_t	new_alloc;
2323311ff84SXin LI 	size_t  slots_used;
23368ba7e87SXin LI 	size_t	sidx;
2342b15cb3dSCy Schubert 
2353311ff84SXin LI 	slots_used = c->head_response - c->tail_response;
2363311ff84SXin LI 	if (slots_used >= c->responses_alloc) {
2372b15cb3dSCy Schubert 		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
2383311ff84SXin LI 		c->responses = erealloc(c->responses, new_alloc * each);
23968ba7e87SXin LI 		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
24068ba7e87SXin LI 		    c->responses[sidx] = NULL;
2413311ff84SXin LI 		c->tail_response   = 0;
2423311ff84SXin LI 		c->head_response   = c->responses_alloc;
2432b15cb3dSCy Schubert 		c->responses_alloc = new_alloc;
2442b15cb3dSCy Schubert 	}
24568ba7e87SXin LI 	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
2463311ff84SXin LI 	return (0 == slots_used);
2473311ff84SXin LI }
2482b15cb3dSCy Schubert 
2492b15cb3dSCy Schubert 
2503311ff84SXin LI /* --------------------------------------------------------------------
2512b15cb3dSCy Schubert  * queue_req_pointer() - append a work item or idle exit request to
2523311ff84SXin LI  *			 blocking_workitems[]. Employ proper locking.
2532b15cb3dSCy Schubert  */
2542b15cb3dSCy Schubert static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)2552b15cb3dSCy Schubert queue_req_pointer(
2562b15cb3dSCy Schubert 	blocking_child	*	c,
2572b15cb3dSCy Schubert 	blocking_pipe_header *	hdr
2582b15cb3dSCy Schubert 	)
2592b15cb3dSCy Schubert {
2603311ff84SXin LI 	size_t qhead;
2612b15cb3dSCy Schubert 
2623311ff84SXin LI 	/* >>>> ACCESS LOCKING STARTS >>>> */
2633311ff84SXin LI 	wait_for_sem(c->accesslock, NULL);
2643311ff84SXin LI 	ensure_workitems_empty_slot(c);
2653311ff84SXin LI 	qhead = c->head_workitem;
2663311ff84SXin LI 	c->workitems[qhead % c->workitems_alloc] = hdr;
2673311ff84SXin LI 	c->head_workitem = 1 + qhead;
2683311ff84SXin LI 	tickle_sem(c->accesslock);
2693311ff84SXin LI 	/* <<<< ACCESS LOCKING ENDS <<<< */
2703311ff84SXin LI 
2713311ff84SXin LI 	/* queue consumer wake-up notification */
2723311ff84SXin LI 	tickle_sem(c->workitems_pending);
2732b15cb3dSCy Schubert 
2742b15cb3dSCy Schubert 	return 0;
2752b15cb3dSCy Schubert }
2762b15cb3dSCy Schubert 
2773311ff84SXin LI /* --------------------------------------------------------------------
2783311ff84SXin LI  * API function to make sure a worker is running, a proper private copy
2793311ff84SXin LI  * of the data is made, the data eneterd into the queue and the worker
2803311ff84SXin LI  * is signalled.
2813311ff84SXin LI  */
2822b15cb3dSCy Schubert int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)2832b15cb3dSCy Schubert send_blocking_req_internal(
2842b15cb3dSCy Schubert 	blocking_child *	c,
2852b15cb3dSCy Schubert 	blocking_pipe_header *	hdr,
2862b15cb3dSCy Schubert 	void *			data
2872b15cb3dSCy Schubert 	)
2882b15cb3dSCy Schubert {
2892b15cb3dSCy Schubert 	blocking_pipe_header *	threadcopy;
2902b15cb3dSCy Schubert 	size_t			payload_octets;
2912b15cb3dSCy Schubert 
2922b15cb3dSCy Schubert 	REQUIRE(hdr != NULL);
2932b15cb3dSCy Schubert 	REQUIRE(data != NULL);
2942b15cb3dSCy Schubert 	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
2952b15cb3dSCy Schubert 
2962b15cb3dSCy Schubert 	if (hdr->octets <= sizeof(*hdr))
2972b15cb3dSCy Schubert 		return 1;	/* failure */
2982b15cb3dSCy Schubert 	payload_octets = hdr->octets - sizeof(*hdr);
2992b15cb3dSCy Schubert 
3003311ff84SXin LI 	if (NULL == c->thread_ref)
3012b15cb3dSCy Schubert 		start_blocking_thread(c);
3022b15cb3dSCy Schubert 	threadcopy = emalloc(hdr->octets);
3032b15cb3dSCy Schubert 	memcpy(threadcopy, hdr, sizeof(*hdr));
3042b15cb3dSCy Schubert 	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
3052b15cb3dSCy Schubert 
3062b15cb3dSCy Schubert 	return queue_req_pointer(c, threadcopy);
3072b15cb3dSCy Schubert }
3082b15cb3dSCy Schubert 
3093311ff84SXin LI /* --------------------------------------------------------------------
3103311ff84SXin LI  * Wait for the 'incoming queue no longer empty' signal, lock the shared
3113311ff84SXin LI  * structure and dequeue an item.
3123311ff84SXin LI  */
3132b15cb3dSCy Schubert blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)3142b15cb3dSCy Schubert receive_blocking_req_internal(
3152b15cb3dSCy Schubert 	blocking_child *	c
3162b15cb3dSCy Schubert 	)
3172b15cb3dSCy Schubert {
3182b15cb3dSCy Schubert 	blocking_pipe_header *	req;
3193311ff84SXin LI 	size_t			qhead, qtail;
3202b15cb3dSCy Schubert 
3213311ff84SXin LI 	req = NULL;
3222b15cb3dSCy Schubert 	do {
3233311ff84SXin LI 		/* wait for tickle from the producer side */
3243311ff84SXin LI 		wait_for_sem(c->workitems_pending, NULL);
3252b15cb3dSCy Schubert 
3263311ff84SXin LI 		/* >>>> ACCESS LOCKING STARTS >>>> */
3273311ff84SXin LI 		wait_for_sem(c->accesslock, NULL);
3283311ff84SXin LI 		qhead = c->head_workitem;
3293311ff84SXin LI 		do {
3303311ff84SXin LI 			qtail = c->tail_workitem;
3313311ff84SXin LI 			if (qhead == qtail)
3323311ff84SXin LI 				break;
3333311ff84SXin LI 			c->tail_workitem = qtail + 1;
3343311ff84SXin LI 			qtail %= c->workitems_alloc;
3353311ff84SXin LI 			req = c->workitems[qtail];
3363311ff84SXin LI 			c->workitems[qtail] = NULL;
3373311ff84SXin LI 		} while (NULL == req);
3383311ff84SXin LI 		tickle_sem(c->accesslock);
3393311ff84SXin LI 		/* <<<< ACCESS LOCKING ENDS <<<< */
3403311ff84SXin LI 
3413311ff84SXin LI 	} while (NULL == req);
3423311ff84SXin LI 
3432b15cb3dSCy Schubert 	INSIST(NULL != req);
3442b15cb3dSCy Schubert 	if (CHILD_EXIT_REQ == req) {	/* idled out */
3452b15cb3dSCy Schubert 		send_blocking_resp_internal(c, CHILD_GONE_RESP);
3462b15cb3dSCy Schubert 		req = NULL;
3472b15cb3dSCy Schubert 	}
3482b15cb3dSCy Schubert 
3492b15cb3dSCy Schubert 	return req;
3502b15cb3dSCy Schubert }
3512b15cb3dSCy Schubert 
3523311ff84SXin LI /* --------------------------------------------------------------------
3533311ff84SXin LI  * Push a response into the return queue and eventually tickle the
3543311ff84SXin LI  * receiver.
3553311ff84SXin LI  */
3562b15cb3dSCy Schubert int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)3572b15cb3dSCy Schubert send_blocking_resp_internal(
3582b15cb3dSCy Schubert 	blocking_child *	c,
3592b15cb3dSCy Schubert 	blocking_pipe_header *	resp
3602b15cb3dSCy Schubert 	)
3612b15cb3dSCy Schubert {
3623311ff84SXin LI 	size_t	qhead;
3633311ff84SXin LI 	int	empty;
3642b15cb3dSCy Schubert 
3653311ff84SXin LI 	/* >>>> ACCESS LOCKING STARTS >>>> */
3663311ff84SXin LI 	wait_for_sem(c->accesslock, NULL);
3673311ff84SXin LI 	empty = ensure_workresp_empty_slot(c);
3683311ff84SXin LI 	qhead = c->head_response;
3693311ff84SXin LI 	c->responses[qhead % c->responses_alloc] = resp;
3703311ff84SXin LI 	c->head_response = 1 + qhead;
3713311ff84SXin LI 	tickle_sem(c->accesslock);
3723311ff84SXin LI 	/* <<<< ACCESS LOCKING ENDS <<<< */
3732b15cb3dSCy Schubert 
3743311ff84SXin LI 	/* queue consumer wake-up notification */
3753311ff84SXin LI 	if (empty)
3763311ff84SXin LI 	{
3772b15cb3dSCy Schubert #	    ifdef WORK_PIPE
3784e1ef62aSXin LI 		if (1 != write(c->resp_write_pipe, "", 1))
379*f5f40dd6SCy Schubert 			msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
380*f5f40dd6SCy Schubert 				" failed to notify main thread!",
381*f5f40dd6SCy Schubert 				(BLOCKING_GETNAMEINFO == resp->rtype)
382*f5f40dd6SCy Schubert 				    ? "name"
383*f5f40dd6SCy Schubert 				    : "addr"
384*f5f40dd6SCy Schubert 				);
3852b15cb3dSCy Schubert #	    else
3863311ff84SXin LI 		tickle_sem(c->responses_pending);
3872b15cb3dSCy Schubert #	    endif
3883311ff84SXin LI 	}
3892b15cb3dSCy Schubert 	return 0;
3902b15cb3dSCy Schubert }
3912b15cb3dSCy Schubert 
3922b15cb3dSCy Schubert 
3932b15cb3dSCy Schubert #ifndef WORK_PIPE
3943311ff84SXin LI 
3953311ff84SXin LI /* --------------------------------------------------------------------
396a466cc55SCy Schubert  * Check if a (Windows-)handle to a semaphore is actually the same we
3973311ff84SXin LI  * are using inside the sema wrapper.
3983311ff84SXin LI  */
3993311ff84SXin LI static BOOL
same_os_sema(const sem_ref obj,void * osh)4003311ff84SXin LI same_os_sema(
4013311ff84SXin LI 	const sem_ref	obj,
4023311ff84SXin LI 	void*		osh
4033311ff84SXin LI 	)
4043311ff84SXin LI {
4053311ff84SXin LI 	return obj && osh && (obj->shnd == (HANDLE)osh);
4063311ff84SXin LI }
4073311ff84SXin LI 
4083311ff84SXin LI /* --------------------------------------------------------------------
4093311ff84SXin LI  * Find the shared context that associates to an OS handle and make sure
4103311ff84SXin LI  * the data is dequeued and processed.
4113311ff84SXin LI  */
4122b15cb3dSCy Schubert void
handle_blocking_resp_sem(void * context)4132b15cb3dSCy Schubert handle_blocking_resp_sem(
4142b15cb3dSCy Schubert 	void *	context
4152b15cb3dSCy Schubert 	)
4162b15cb3dSCy Schubert {
4172b15cb3dSCy Schubert 	blocking_child *	c;
4182b15cb3dSCy Schubert 	u_int			idx;
4192b15cb3dSCy Schubert 
4202b15cb3dSCy Schubert 	c = NULL;
4212b15cb3dSCy Schubert 	for (idx = 0; idx < blocking_children_alloc; idx++) {
4222b15cb3dSCy Schubert 		c = blocking_children[idx];
4233311ff84SXin LI 		if (c != NULL &&
4243311ff84SXin LI 			c->thread_ref != NULL &&
4253311ff84SXin LI 			same_os_sema(c->responses_pending, context))
4262b15cb3dSCy Schubert 			break;
4272b15cb3dSCy Schubert 	}
4282b15cb3dSCy Schubert 	if (idx < blocking_children_alloc)
4292b15cb3dSCy Schubert 		process_blocking_resp(c);
4302b15cb3dSCy Schubert }
4312b15cb3dSCy Schubert #endif	/* !WORK_PIPE */
4322b15cb3dSCy Schubert 
4333311ff84SXin LI /* --------------------------------------------------------------------
4343311ff84SXin LI  * Fetch the next response from the return queue. In case of signalling
4353311ff84SXin LI  * via pipe, make sure the pipe is flushed, too.
4363311ff84SXin LI  */
4372b15cb3dSCy Schubert blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)4382b15cb3dSCy Schubert receive_blocking_resp_internal(
4392b15cb3dSCy Schubert 	blocking_child *	c
4402b15cb3dSCy Schubert 	)
4412b15cb3dSCy Schubert {
4422b15cb3dSCy Schubert 	blocking_pipe_header *	removed;
4433311ff84SXin LI 	size_t			qhead, qtail, slot;
4443311ff84SXin LI 
4452b15cb3dSCy Schubert #ifdef WORK_PIPE
4462b15cb3dSCy Schubert 	int			rc;
4472b15cb3dSCy Schubert 	char			scratch[32];
4482b15cb3dSCy Schubert 
4493311ff84SXin LI 	do
4502b15cb3dSCy Schubert 		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
4513311ff84SXin LI 	while (-1 == rc && EINTR == errno);
4522b15cb3dSCy Schubert #endif
4533311ff84SXin LI 
4543311ff84SXin LI 	/* >>>> ACCESS LOCKING STARTS >>>> */
4553311ff84SXin LI 	wait_for_sem(c->accesslock, NULL);
4563311ff84SXin LI 	qhead = c->head_response;
4573311ff84SXin LI 	qtail = c->tail_response;
4583311ff84SXin LI 	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
4593311ff84SXin LI 		slot = qtail % c->responses_alloc;
4603311ff84SXin LI 		removed = c->responses[slot];
4613311ff84SXin LI 		c->responses[slot] = NULL;
4623311ff84SXin LI 	}
4633311ff84SXin LI 	c->tail_response = qtail;
4643311ff84SXin LI 	tickle_sem(c->accesslock);
4653311ff84SXin LI 	/* <<<< ACCESS LOCKING ENDS <<<< */
4663311ff84SXin LI 
4672b15cb3dSCy Schubert 	if (NULL != removed) {
4682b15cb3dSCy Schubert 		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
4692b15cb3dSCy Schubert 			     BLOCKING_RESP_MAGIC == removed->magic_sig);
4702b15cb3dSCy Schubert 	}
4712b15cb3dSCy Schubert 	if (CHILD_GONE_RESP == removed) {
4722b15cb3dSCy Schubert 		cleanup_after_child(c);
4732b15cb3dSCy Schubert 		removed = NULL;
4742b15cb3dSCy Schubert 	}
4752b15cb3dSCy Schubert 
4762b15cb3dSCy Schubert 	return removed;
4772b15cb3dSCy Schubert }
4782b15cb3dSCy Schubert 
4793311ff84SXin LI /* --------------------------------------------------------------------
4803311ff84SXin LI  * Light up a new worker.
4813311ff84SXin LI  */
4822b15cb3dSCy Schubert static void
start_blocking_thread(blocking_child * c)4832b15cb3dSCy Schubert start_blocking_thread(
4842b15cb3dSCy Schubert 	blocking_child *	c
4852b15cb3dSCy Schubert 	)
4862b15cb3dSCy Schubert {
4872b15cb3dSCy Schubert 
4882b15cb3dSCy Schubert 	DEBUG_INSIST(!c->reusable);
4892b15cb3dSCy Schubert 
4902b15cb3dSCy Schubert 	prepare_child_sems(c);
4912b15cb3dSCy Schubert 	start_blocking_thread_internal(c);
4922b15cb3dSCy Schubert }
4932b15cb3dSCy Schubert 
4943311ff84SXin LI /* --------------------------------------------------------------------
4953311ff84SXin LI  * Create a worker thread. There are several differences between POSIX
496*f5f40dd6SCy Schubert  * and Windows, of course -- most notably the Windows thread is a
4973311ff84SXin LI  * detached thread, and we keep the handle around until we want to get
4983311ff84SXin LI  * rid of the thread. The notification scheme also differs: Windows
4993311ff84SXin LI  * makes use of semaphores in both directions, POSIX uses a pipe for
5003311ff84SXin LI  * integration with 'select()' or alike.
5013311ff84SXin LI  */
5022b15cb3dSCy Schubert static void
start_blocking_thread_internal(blocking_child * c)5032b15cb3dSCy Schubert start_blocking_thread_internal(
5042b15cb3dSCy Schubert 	blocking_child *	c
5052b15cb3dSCy Schubert 	)
5062b15cb3dSCy Schubert #ifdef SYS_WINNT
5072b15cb3dSCy Schubert {
5082b15cb3dSCy Schubert 	BOOL	resumed;
5092b15cb3dSCy Schubert 
5103311ff84SXin LI 	c->thread_ref = NULL;
5113311ff84SXin LI 	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
5123311ff84SXin LI 	c->thr_table[0].thnd =
5132b15cb3dSCy Schubert 		(HANDLE)_beginthreadex(
5142b15cb3dSCy Schubert 			NULL,
5152b15cb3dSCy Schubert 			0,
5162b15cb3dSCy Schubert 			&blocking_thread,
5172b15cb3dSCy Schubert 			c,
5182b15cb3dSCy Schubert 			CREATE_SUSPENDED,
5193311ff84SXin LI 			NULL);
5202b15cb3dSCy Schubert 
5213311ff84SXin LI 	if (NULL == c->thr_table[0].thnd) {
5222b15cb3dSCy Schubert 		msyslog(LOG_ERR, "start blocking thread failed: %m");
5232b15cb3dSCy Schubert 		exit(-1);
5242b15cb3dSCy Schubert 	}
5252b15cb3dSCy Schubert 	/* remember the thread priority is only within the process class */
5263311ff84SXin LI 	if (!SetThreadPriority(c->thr_table[0].thnd,
527*f5f40dd6SCy Schubert 			       THREAD_PRIORITY_BELOW_NORMAL)) {
5282b15cb3dSCy Schubert 		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
529*f5f40dd6SCy Schubert 	}
530*f5f40dd6SCy Schubert 	if (NULL != pSetThreadDescription) {
531*f5f40dd6SCy Schubert 		(*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
532*f5f40dd6SCy Schubert 	}
5333311ff84SXin LI 	resumed = ResumeThread(c->thr_table[0].thnd);
5342b15cb3dSCy Schubert 	DEBUG_INSIST(resumed);
5353311ff84SXin LI 	c->thread_ref = &c->thr_table[0];
5362b15cb3dSCy Schubert }
5372b15cb3dSCy Schubert #else	/* pthreads start_blocking_thread_internal() follows */
5382b15cb3dSCy Schubert {
5392b15cb3dSCy Schubert # ifdef NEED_PTHREAD_INIT
5402b15cb3dSCy Schubert 	static int	pthread_init_called;
5412b15cb3dSCy Schubert # endif
5422b15cb3dSCy Schubert 	pthread_attr_t	thr_attr;
5432b15cb3dSCy Schubert 	int		rc;
5442b15cb3dSCy Schubert 	int		pipe_ends[2];	/* read then write */
5452b15cb3dSCy Schubert 	int		is_pipe;
5462b15cb3dSCy Schubert 	int		flags;
54768ba7e87SXin LI 	size_t		ostacksize;
54868ba7e87SXin LI 	size_t		nstacksize;
5492b15cb3dSCy Schubert 	sigset_t	saved_sig_mask;
5502b15cb3dSCy Schubert 
5513311ff84SXin LI 	c->thread_ref = NULL;
5523311ff84SXin LI 
5532b15cb3dSCy Schubert # ifdef NEED_PTHREAD_INIT
5542b15cb3dSCy Schubert 	/*
5552b15cb3dSCy Schubert 	 * from lib/isc/unix/app.c:
5562b15cb3dSCy Schubert 	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
5572b15cb3dSCy Schubert 	 */
5582b15cb3dSCy Schubert 	if (!pthread_init_called) {
5592b15cb3dSCy Schubert 		pthread_init();
5602b15cb3dSCy Schubert 		pthread_init_called = TRUE;
5612b15cb3dSCy Schubert 	}
5622b15cb3dSCy Schubert # endif
5632b15cb3dSCy Schubert 
5642b15cb3dSCy Schubert 	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
5652b15cb3dSCy Schubert 	if (0 != rc) {
5662b15cb3dSCy Schubert 		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
5672b15cb3dSCy Schubert 		exit(1);
5682b15cb3dSCy Schubert 	}
5692b15cb3dSCy Schubert 	c->resp_read_pipe = move_fd(pipe_ends[0]);
5702b15cb3dSCy Schubert 	c->resp_write_pipe = move_fd(pipe_ends[1]);
5712b15cb3dSCy Schubert 	c->ispipe = is_pipe;
5722b15cb3dSCy Schubert 	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
5732b15cb3dSCy Schubert 	if (-1 == flags) {
5742b15cb3dSCy Schubert 		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
5752b15cb3dSCy Schubert 		exit(1);
5762b15cb3dSCy Schubert 	}
5772b15cb3dSCy Schubert 	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
5782b15cb3dSCy Schubert 	if (-1 == rc) {
5792b15cb3dSCy Schubert 		msyslog(LOG_ERR,
5802b15cb3dSCy Schubert 			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
5812b15cb3dSCy Schubert 		exit(1);
5822b15cb3dSCy Schubert 	}
5832b15cb3dSCy Schubert 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
5842b15cb3dSCy Schubert 	pthread_attr_init(&thr_attr);
5852b15cb3dSCy Schubert 	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
5862b15cb3dSCy Schubert #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
5872b15cb3dSCy Schubert     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
58868ba7e87SXin LI 	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
58968ba7e87SXin LI 	if (0 != rc) {
5902b15cb3dSCy Schubert 		msyslog(LOG_ERR,
59168ba7e87SXin LI 			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
59268ba7e87SXin LI 			strerror(rc));
59368ba7e87SXin LI 	} else {
59468ba7e87SXin LI 		nstacksize = ostacksize;
595a466cc55SCy Schubert 		/* order is important here: first clamp on upper limit,
596a466cc55SCy Schubert 		 * and the PTHREAD min stack size is ultimate override!
597a466cc55SCy Schubert 		 */
598a466cc55SCy Schubert 		if (nstacksize > THREAD_MAXSTACKSIZE)
599a466cc55SCy Schubert 			nstacksize = THREAD_MAXSTACKSIZE;
600a466cc55SCy Schubert #            ifdef PTHREAD_STACK_MAX
601a466cc55SCy Schubert 		if (nstacksize > PTHREAD_STACK_MAX)
602a466cc55SCy Schubert 			nstacksize = PTHREAD_STACK_MAX;
603a466cc55SCy Schubert #            endif
604a466cc55SCy Schubert 
605a466cc55SCy Schubert 		/* now clamp on lower stack limit. */
606a466cc55SCy Schubert 		if (nstacksize < THREAD_MINSTACKSIZE)
607a466cc55SCy Schubert 			nstacksize = THREAD_MINSTACKSIZE;
608a466cc55SCy Schubert #            ifdef PTHREAD_STACK_MIN
609a466cc55SCy Schubert 		if (nstacksize < PTHREAD_STACK_MIN)
610a466cc55SCy Schubert 			nstacksize = PTHREAD_STACK_MIN;
611a466cc55SCy Schubert #            endif
612a466cc55SCy Schubert 
61368ba7e87SXin LI 		if (nstacksize != ostacksize)
61468ba7e87SXin LI 			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
61568ba7e87SXin LI 		if (0 != rc)
6162b15cb3dSCy Schubert 			msyslog(LOG_ERR,
61768ba7e87SXin LI 				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
61868ba7e87SXin LI 				(u_long)ostacksize, (u_long)nstacksize,
61968ba7e87SXin LI 				strerror(rc));
6202b15cb3dSCy Schubert 	}
6212b15cb3dSCy Schubert #else
62268ba7e87SXin LI 	UNUSED_ARG(nstacksize);
62368ba7e87SXin LI 	UNUSED_ARG(ostacksize);
6242b15cb3dSCy Schubert #endif
6252b15cb3dSCy Schubert #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
6262b15cb3dSCy Schubert 	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
6272b15cb3dSCy Schubert #endif
6282b15cb3dSCy Schubert 	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
6292b15cb3dSCy Schubert 	block_thread_signals(&saved_sig_mask);
6303311ff84SXin LI 	rc = pthread_create(&c->thr_table[0], &thr_attr,
6312b15cb3dSCy Schubert 			    &blocking_thread, c);
6322b15cb3dSCy Schubert 	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
6332b15cb3dSCy Schubert 	pthread_attr_destroy(&thr_attr);
6342b15cb3dSCy Schubert 	if (0 != rc) {
63568ba7e87SXin LI 		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
63668ba7e87SXin LI 			strerror(rc));
6372b15cb3dSCy Schubert 		exit(1);
6382b15cb3dSCy Schubert 	}
6393311ff84SXin LI 	c->thread_ref = &c->thr_table[0];
6402b15cb3dSCy Schubert }
6412b15cb3dSCy Schubert #endif
6422b15cb3dSCy Schubert 
6433311ff84SXin LI /* --------------------------------------------------------------------
6442b15cb3dSCy Schubert  * block_thread_signals()
6452b15cb3dSCy Schubert  *
6462b15cb3dSCy Schubert  * Temporarily block signals used by ntpd main thread, so that signal
6472b15cb3dSCy Schubert  * mask inherited by child threads leaves them blocked.  Returns prior
6482b15cb3dSCy Schubert  * active signal mask via pmask, to be restored by the main thread
6492b15cb3dSCy Schubert  * after pthread_create().
6502b15cb3dSCy Schubert  */
6512b15cb3dSCy Schubert #ifndef SYS_WINNT
6522b15cb3dSCy Schubert void
block_thread_signals(sigset_t * pmask)6532b15cb3dSCy Schubert block_thread_signals(
6542b15cb3dSCy Schubert 	sigset_t *	pmask
6552b15cb3dSCy Schubert 	)
6562b15cb3dSCy Schubert {
6572b15cb3dSCy Schubert 	sigset_t	block;
6582b15cb3dSCy Schubert 
6592b15cb3dSCy Schubert 	sigemptyset(&block);
6602b15cb3dSCy Schubert # ifdef HAVE_SIGNALED_IO
6612b15cb3dSCy Schubert #  ifdef SIGIO
6622b15cb3dSCy Schubert 	sigaddset(&block, SIGIO);
6632b15cb3dSCy Schubert #  endif
6642b15cb3dSCy Schubert #  ifdef SIGPOLL
6652b15cb3dSCy Schubert 	sigaddset(&block, SIGPOLL);
6662b15cb3dSCy Schubert #  endif
6672b15cb3dSCy Schubert # endif	/* HAVE_SIGNALED_IO */
6682b15cb3dSCy Schubert 	sigaddset(&block, SIGALRM);
6692b15cb3dSCy Schubert 	sigaddset(&block, MOREDEBUGSIG);
6702b15cb3dSCy Schubert 	sigaddset(&block, LESSDEBUGSIG);
6712b15cb3dSCy Schubert # ifdef SIGDIE1
6722b15cb3dSCy Schubert 	sigaddset(&block, SIGDIE1);
6732b15cb3dSCy Schubert # endif
6742b15cb3dSCy Schubert # ifdef SIGDIE2
6752b15cb3dSCy Schubert 	sigaddset(&block, SIGDIE2);
6762b15cb3dSCy Schubert # endif
6772b15cb3dSCy Schubert # ifdef SIGDIE3
6782b15cb3dSCy Schubert 	sigaddset(&block, SIGDIE3);
6792b15cb3dSCy Schubert # endif
6802b15cb3dSCy Schubert # ifdef SIGDIE4
6812b15cb3dSCy Schubert 	sigaddset(&block, SIGDIE4);
6822b15cb3dSCy Schubert # endif
6832b15cb3dSCy Schubert # ifdef SIGBUS
6842b15cb3dSCy Schubert 	sigaddset(&block, SIGBUS);
6852b15cb3dSCy Schubert # endif
6862b15cb3dSCy Schubert 	sigemptyset(pmask);
6872b15cb3dSCy Schubert 	pthread_sigmask(SIG_BLOCK, &block, pmask);
6882b15cb3dSCy Schubert }
6892b15cb3dSCy Schubert #endif	/* !SYS_WINNT */
6902b15cb3dSCy Schubert 
6912b15cb3dSCy Schubert 
6923311ff84SXin LI /* --------------------------------------------------------------------
6933311ff84SXin LI  * Create & destroy semaphores. This is sufficiently different between
6943311ff84SXin LI  * POSIX and Windows to warrant wrapper functions and close enough to
6953311ff84SXin LI  * use the concept of synchronization via semaphore for all platforms.
6963311ff84SXin LI  */
6973311ff84SXin LI static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)6983311ff84SXin LI create_sema(
6993311ff84SXin LI 	sema_type*	semptr,
7003311ff84SXin LI 	u_int		inival,
7013311ff84SXin LI 	u_int		maxval)
7023311ff84SXin LI {
7033311ff84SXin LI #ifdef SYS_WINNT
7043311ff84SXin LI 
7053311ff84SXin LI 	long svini, svmax;
7063311ff84SXin LI 	if (NULL != semptr) {
7073311ff84SXin LI 		svini = (inival < LONG_MAX)
7083311ff84SXin LI 		    ? (long)inival : LONG_MAX;
7093311ff84SXin LI 		svmax = (maxval < LONG_MAX && maxval > 0)
7103311ff84SXin LI 		    ? (long)maxval : LONG_MAX;
7113311ff84SXin LI 		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
7123311ff84SXin LI 		if (NULL == semptr->shnd)
7133311ff84SXin LI 			semptr = NULL;
7143311ff84SXin LI 	}
7153311ff84SXin LI 
7163311ff84SXin LI #else
7173311ff84SXin LI 
7183311ff84SXin LI 	(void)maxval;
7193311ff84SXin LI 	if (semptr && sem_init(semptr, FALSE, inival))
7203311ff84SXin LI 		semptr = NULL;
7213311ff84SXin LI 
7223311ff84SXin LI #endif
7233311ff84SXin LI 
7243311ff84SXin LI 	return semptr;
7253311ff84SXin LI }
7263311ff84SXin LI 
7273311ff84SXin LI /* ------------------------------------------------------------------ */
7283311ff84SXin LI static sem_ref
delete_sema(sem_ref obj)7293311ff84SXin LI delete_sema(
7303311ff84SXin LI 	sem_ref obj)
7313311ff84SXin LI {
7323311ff84SXin LI 
7333311ff84SXin LI #   ifdef SYS_WINNT
7343311ff84SXin LI 
7353311ff84SXin LI 	if (obj) {
7363311ff84SXin LI 		if (obj->shnd)
7373311ff84SXin LI 			CloseHandle(obj->shnd);
7383311ff84SXin LI 		obj->shnd = NULL;
7393311ff84SXin LI 	}
7403311ff84SXin LI 
7413311ff84SXin LI #   else
7423311ff84SXin LI 
7433311ff84SXin LI 	if (obj)
7443311ff84SXin LI 		sem_destroy(obj);
7453311ff84SXin LI 
7463311ff84SXin LI #   endif
7473311ff84SXin LI 
7483311ff84SXin LI 	return NULL;
7493311ff84SXin LI }
7503311ff84SXin LI 
7513311ff84SXin LI /* --------------------------------------------------------------------
7522b15cb3dSCy Schubert  * prepare_child_sems()
7532b15cb3dSCy Schubert  *
7543311ff84SXin LI  * create sync & access semaphores
7552b15cb3dSCy Schubert  *
7563311ff84SXin LI  * All semaphores are cleared, only the access semaphore has 1 unit.
7573311ff84SXin LI  * Childs wait on 'workitems_pending', then grabs 'sema_access'
7583311ff84SXin LI  * and dequeues jobs. When done, 'sema_access' is given one unit back.
7593311ff84SXin LI  *
7603311ff84SXin LI  * The producer grabs 'sema_access', manages the queue, restores
7613311ff84SXin LI  * 'sema_access' and puts one unit into 'workitems_pending'.
7623311ff84SXin LI  *
7633311ff84SXin LI  * The story goes the same for the response queue.
7642b15cb3dSCy Schubert  */
7652b15cb3dSCy Schubert static void
prepare_child_sems(blocking_child * c)7662b15cb3dSCy Schubert prepare_child_sems(
7672b15cb3dSCy Schubert 	blocking_child *c
7682b15cb3dSCy Schubert 	)
7692b15cb3dSCy Schubert {
7704990d495SXin LI 	if (NULL == worker_memlock)
7714990d495SXin LI 		worker_memlock = create_sema(&worker_mmutex, 1, 1);
7724990d495SXin LI 
7733311ff84SXin LI 	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
7743311ff84SXin LI 	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
7753311ff84SXin LI 	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
7763311ff84SXin LI #   ifndef WORK_PIPE
7773311ff84SXin LI 	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
7782b15cb3dSCy Schubert #   endif
7793311ff84SXin LI }
7802b15cb3dSCy Schubert 
7813311ff84SXin LI /* --------------------------------------------------------------------
7823311ff84SXin LI  * wait for semaphore. Where the wait can be interrupted, it will
7833311ff84SXin LI  * internally resume -- When this function returns, there is either no
7843311ff84SXin LI  * semaphore at all, a timeout occurred, or the caller could
7853311ff84SXin LI  * successfully take a token from the semaphore.
7863311ff84SXin LI  *
7873311ff84SXin LI  * For untimed wait, not checking the result of this function at all is
7883311ff84SXin LI  * definitely an option.
7893311ff84SXin LI  */
7902b15cb3dSCy Schubert static int
wait_for_sem(sem_ref sem,struct timespec * timeout)7912b15cb3dSCy Schubert wait_for_sem(
7922b15cb3dSCy Schubert 	sem_ref			sem,
7932b15cb3dSCy Schubert 	struct timespec *	timeout		/* wall-clock */
7942b15cb3dSCy Schubert 	)
7952b15cb3dSCy Schubert #ifdef SYS_WINNT
7962b15cb3dSCy Schubert {
7972b15cb3dSCy Schubert 	struct timespec now;
7982b15cb3dSCy Schubert 	struct timespec delta;
7992b15cb3dSCy Schubert 	DWORD		msec;
8002b15cb3dSCy Schubert 	DWORD		rc;
8012b15cb3dSCy Schubert 
8023311ff84SXin LI 	if (!(sem && sem->shnd)) {
8033311ff84SXin LI 		errno = EINVAL;
8043311ff84SXin LI 		return -1;
8053311ff84SXin LI 	}
8063311ff84SXin LI 
8072b15cb3dSCy Schubert 	if (NULL == timeout) {
8082b15cb3dSCy Schubert 		msec = INFINITE;
8092b15cb3dSCy Schubert 	} else {
8102b15cb3dSCy Schubert 		getclock(TIMEOFDAY, &now);
8112b15cb3dSCy Schubert 		delta = sub_tspec(*timeout, now);
8122b15cb3dSCy Schubert 		if (delta.tv_sec < 0) {
8132b15cb3dSCy Schubert 			msec = 0;
8142b15cb3dSCy Schubert 		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
8152b15cb3dSCy Schubert 			msec = INFINITE;
8162b15cb3dSCy Schubert 		} else {
8172b15cb3dSCy Schubert 			msec = 1000 * (DWORD)delta.tv_sec;
8182b15cb3dSCy Schubert 			msec += delta.tv_nsec / (1000 * 1000);
8192b15cb3dSCy Schubert 		}
8202b15cb3dSCy Schubert 	}
8213311ff84SXin LI 	rc = WaitForSingleObject(sem->shnd, msec);
8222b15cb3dSCy Schubert 	if (WAIT_OBJECT_0 == rc)
8232b15cb3dSCy Schubert 		return 0;
8242b15cb3dSCy Schubert 	if (WAIT_TIMEOUT == rc) {
8252b15cb3dSCy Schubert 		errno = ETIMEDOUT;
8262b15cb3dSCy Schubert 		return -1;
8272b15cb3dSCy Schubert 	}
8282b15cb3dSCy Schubert 	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
8292b15cb3dSCy Schubert 	errno = EFAULT;
8302b15cb3dSCy Schubert 	return -1;
8312b15cb3dSCy Schubert }
8322b15cb3dSCy Schubert #else	/* pthreads wait_for_sem() follows */
8332b15cb3dSCy Schubert {
8343311ff84SXin LI 	int rc = -1;
8352b15cb3dSCy Schubert 
8363311ff84SXin LI 	if (sem) do {
8372b15cb3dSCy Schubert 			if (NULL == timeout)
8382b15cb3dSCy Schubert 				rc = sem_wait(sem);
8392b15cb3dSCy Schubert 			else
8402b15cb3dSCy Schubert 				rc = sem_timedwait(sem, timeout);
8413311ff84SXin LI 		} while (rc == -1 && errno == EINTR);
8423311ff84SXin LI 	else
8433311ff84SXin LI 		errno = EINVAL;
8442b15cb3dSCy Schubert 
8452b15cb3dSCy Schubert 	return rc;
8462b15cb3dSCy Schubert }
8472b15cb3dSCy Schubert #endif
8482b15cb3dSCy Schubert 
8493311ff84SXin LI /* --------------------------------------------------------------------
8503311ff84SXin LI  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
8513311ff84SXin LI  * calling conventions under Windows and POSIX-defined signature
8523311ff84SXin LI  * otherwise.
8532b15cb3dSCy Schubert  */
8542b15cb3dSCy Schubert #ifdef SYS_WINNT
8553311ff84SXin LI u_int WINAPI
8562b15cb3dSCy Schubert #else
8572b15cb3dSCy Schubert void *
8582b15cb3dSCy Schubert #endif
blocking_thread(void * ThreadArg)8592b15cb3dSCy Schubert blocking_thread(
8602b15cb3dSCy Schubert 	void *	ThreadArg
8612b15cb3dSCy Schubert 	)
8622b15cb3dSCy Schubert {
8632b15cb3dSCy Schubert 	blocking_child *c;
8642b15cb3dSCy Schubert 
8652b15cb3dSCy Schubert 	c = ThreadArg;
8662b15cb3dSCy Schubert 	exit_worker(blocking_child_common(c));
8672b15cb3dSCy Schubert 
8682b15cb3dSCy Schubert 	/* NOTREACHED */
8692b15cb3dSCy Schubert 	return 0;
8702b15cb3dSCy Schubert }
8712b15cb3dSCy Schubert 
8723311ff84SXin LI /* --------------------------------------------------------------------
8732b15cb3dSCy Schubert  * req_child_exit() runs in the parent.
8743311ff84SXin LI  *
8753311ff84SXin LI  * This function is called from from the idle timer, too, and possibly
8763311ff84SXin LI  * without a thread being there any longer. Since we have folded up our
8773311ff84SXin LI  * tent in that case and all the semaphores are already gone, we simply
8783311ff84SXin LI  * ignore this request in this case.
8793311ff84SXin LI  *
8803311ff84SXin LI  * Since the existence of the semaphores is controlled exclusively by
8813311ff84SXin LI  * the parent, there's no risk of data race here.
8822b15cb3dSCy Schubert  */
8832b15cb3dSCy Schubert int
req_child_exit(blocking_child * c)8842b15cb3dSCy Schubert req_child_exit(
8852b15cb3dSCy Schubert 	blocking_child *c
8862b15cb3dSCy Schubert 	)
8872b15cb3dSCy Schubert {
8883311ff84SXin LI 	return (c->accesslock)
8893311ff84SXin LI 	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
8903311ff84SXin LI 	    : 0;
8912b15cb3dSCy Schubert }
8922b15cb3dSCy Schubert 
8933311ff84SXin LI /* --------------------------------------------------------------------
8942b15cb3dSCy Schubert  * cleanup_after_child() runs in parent.
8952b15cb3dSCy Schubert  */
8962b15cb3dSCy Schubert static void
cleanup_after_child(blocking_child * c)8972b15cb3dSCy Schubert cleanup_after_child(
8982b15cb3dSCy Schubert 	blocking_child *	c
8992b15cb3dSCy Schubert 	)
9002b15cb3dSCy Schubert {
9012b15cb3dSCy Schubert 	DEBUG_INSIST(!c->reusable);
9023311ff84SXin LI 
9032b15cb3dSCy Schubert #   ifdef SYS_WINNT
9043311ff84SXin LI 	/* The thread was not created in detached state, so we better
9053311ff84SXin LI 	 * clean up.
9063311ff84SXin LI 	 */
9073311ff84SXin LI 	if (c->thread_ref && c->thread_ref->thnd) {
9083311ff84SXin LI 		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
9093311ff84SXin LI 		INSIST(CloseHandle(c->thread_ref->thnd));
9103311ff84SXin LI 		c->thread_ref->thnd = NULL;
9113311ff84SXin LI 	}
9122b15cb3dSCy Schubert #   endif
9132b15cb3dSCy Schubert 	c->thread_ref = NULL;
9143311ff84SXin LI 
9153311ff84SXin LI 	/* remove semaphores and (if signalling vi IO) pipes */
9163311ff84SXin LI 
9173311ff84SXin LI 	c->accesslock           = delete_sema(c->accesslock);
9183311ff84SXin LI 	c->workitems_pending    = delete_sema(c->workitems_pending);
9193311ff84SXin LI 	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
9203311ff84SXin LI 
9212b15cb3dSCy Schubert #   ifdef WORK_PIPE
9222b15cb3dSCy Schubert 	DEBUG_INSIST(-1 != c->resp_read_pipe);
9232b15cb3dSCy Schubert 	DEBUG_INSIST(-1 != c->resp_write_pipe);
9242b15cb3dSCy Schubert 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
9252b15cb3dSCy Schubert 	close(c->resp_write_pipe);
9262b15cb3dSCy Schubert 	close(c->resp_read_pipe);
9272b15cb3dSCy Schubert 	c->resp_write_pipe = -1;
9282b15cb3dSCy Schubert 	c->resp_read_pipe = -1;
9292b15cb3dSCy Schubert #   else
9303311ff84SXin LI 	DEBUG_INSIST(NULL != c->responses_pending);
9313311ff84SXin LI 	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
9323311ff84SXin LI 	c->responses_pending = delete_sema(c->responses_pending);
9332b15cb3dSCy Schubert #   endif
9343311ff84SXin LI 
9353311ff84SXin LI 	/* Is it necessary to check if there are pending requests and
9363311ff84SXin LI 	 * responses? If so, and if there are, what to do with them?
9373311ff84SXin LI 	 */
9383311ff84SXin LI 
9393311ff84SXin LI 	/* re-init buffer index sequencers */
9403311ff84SXin LI 	c->head_workitem = 0;
9413311ff84SXin LI 	c->tail_workitem = 0;
9423311ff84SXin LI 	c->head_response = 0;
9433311ff84SXin LI 	c->tail_response = 0;
9443311ff84SXin LI 
9452b15cb3dSCy Schubert 	c->reusable = TRUE;
9462b15cb3dSCy Schubert }
9472b15cb3dSCy Schubert 
9482b15cb3dSCy Schubert 
9492b15cb3dSCy Schubert #else	/* !WORK_THREAD follows */
9502b15cb3dSCy Schubert char work_thread_nonempty_compilation_unit;
9512b15cb3dSCy Schubert #endif
952