/*	$NetBSD: work_thread.c,v 1.9 2024/08/18 20:47:13 christos Exp $	*/

/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif

/* need a good integer to store a pointer... */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

static sema_type worker_mmutex;
static sem_ref   worker_memlock;

/* --------------------------------------------------------------------
 * locking the global worker state table (and other global stuff)
 */
void
worker_global_lock(
	int inOrOut)
{
	if (worker_memlock) {
		if (inOrOut)
			wait_for_sem(worker_memlock, NULL);
		else
			tickle_sem(worker_memlock);
	}
}
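
/* Illustrative usage (added comment, not from the original source):
 * callers bracket access to shared worker state like this:
 *
 *	worker_global_lock(TRUE);	// acquire the global memlock
 *	... touch blocking_children[] et al. ...
 *	worker_global_lock(FALSE);	// release it again
 */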

/* --------------------------------------------------------------------
 * implementation isolation wrapper
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * Sleep for a given time or until the wakeup semaphore is tickled.
 * Returns 0 when the sleep ran to its full length and -1 when it was
 * interrupted early or an error occurred.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}


/* --------------------------------------------------------------------
 * Wake up a worker that is taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Returns whether the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
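
/* Note (added comment): head_workitem/tail_workitem are free-running
 * counters and the slot index is the counter modulo the allocation
 * size; e.g. with workitems_alloc == 16, head_workitem == 18 denotes
 * slot 2. Growth only happens when the buffer is completely full, so
 * rebasing tail to 0 and head to the old allocation size covers
 * exactly the live entries, albeit possibly rotated against insertion
 * order.
 */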

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}
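
/* Note (added comment): this is the producer half of the queue
 * handshake -- 'accesslock' serializes ring-buffer access, while
 * 'workitems_pending' counts queued items, one posted unit per
 * enqueued pointer, so the consumer can block until work arrives.
 */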

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, make a proper private
 * copy of the data, enter it into the queue, and signal the worker.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
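
/* Note (added comment): CHILD_EXIT_REQ is a sentinel pointer value,
 * not a real request. The worker acknowledges it with the matching
 * CHILD_GONE_RESP sentinel and returns NULL, telling its caller to
 * wind down the thread.
 */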

/* --------------------------------------------------------------------
 * Push a response into the return queue and, if the queue was empty,
 * tickle the receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		if (1 != write(c->resp_write_pipe, "", 1))
			msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
				" failed to notify main thread!",
				(BLOCKING_GETNAMEINFO == resp->rtype)
				    ? "name"
				    : "addr"
				);
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}
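
/* Note (added comment): under WORK_PIPE the byte written above carries
 * no payload; it merely makes the read end of the pipe readable so the
 * main thread's I/O loop notices that responses are waiting. The
 * receiver drains the pipe before emptying the queue.
 */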


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows-)handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context that is associated with an OS handle and
 * make sure the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the POSIX thread is created
 * detached, while on Windows we keep the thread handle around until we
 * want to get rid of the thread. The notification scheme also differs:
 * Windows uses semaphores in both directions, while POSIX uses a pipe
 * for integration with 'select()' or the like.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL)) {
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
	}
	if (NULL != pSetThreadDescription) {
		(*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
	}
	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		nstacksize = ostacksize;
		/* order is important here: first clamp on upper limit,
		 * and the PTHREAD min stack size is ultimate override!
		 */
		if (nstacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
#            ifdef PTHREAD_STACK_MAX
		if (nstacksize > PTHREAD_STACK_MAX)
			nstacksize = PTHREAD_STACK_MAX;
#            endif

		/* now clamp on lower stack limit. */
		if (nstacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
#            ifdef PTHREAD_STACK_MIN
		if (nstacksize < PTHREAD_STACK_MIN)
			nstacksize = PTHREAD_STACK_MIN;
#            endif

		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif
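
/* Note (added comment): the clamping above means a platform default of,
 * say, 8MB is cut down to THREAD_MAXSTACKSIZE (256kB), while a default
 * below 64kB is raised to THREAD_MINSTACKSIZE; PTHREAD_STACK_MIN and
 * PTHREAD_STACK_MAX, where defined, override both limits.
 */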

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by the ntpd main thread, so that the
 * signal mask inherited by child threads leaves them blocked.  Returns
 * the prior active signal mask via pmask, to be restored by the main
 * thread after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}
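
/* Illustrative usage (added comment; 'storage' is hypothetical): a
 * binary lock-style semaphore starts with one unit available,
 *
 *	sem_ref lock = create_sema(&storage, 1, 1);
 *
 * while a counting 'work pending' semaphore starts empty; see
 * prepare_child_sems() below for the actual instances.
 */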

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
 * and dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
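
/* Note (added comment): sem_table[] layout as created above --
 * [0] accesslock (binary, starts available), [1] workitems_pending
 * (counting, starts empty), [2] wake_scheduled_sleep (binary, starts
 * empty) and, without WORK_PIPE, [3] responses_pending (counting,
 * starts empty). A maxval of 0 maps to LONG_MAX on Windows and is
 * ignored by sem_init() on POSIX.
 */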

/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it will
 * internally resume -- when this function returns, there is either no
 * semaphore at all, a timeout occurred, or the caller could
 * successfully take a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
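
/* Note (added comment): the timeout is an absolute wall-clock timespec,
 * matching sem_timedwait() semantics; the Windows branch converts it to
 * a relative millisecond count first. worker_sleep() above shows the
 * caller-side pattern: read CLOCK_REALTIME, add the delay in seconds,
 * and pass the result here.
 */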

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and a POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore such a request.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in the parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we had
	 * better clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) the pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif