xref: /netbsd-src/external/bsd/ntp/dist/libntp/work_thread.c (revision 8c5fe5c1e4abd9106f7eec3f829c22d3c5cb31fe)
1 /*	$NetBSD: work_thread.c,v 1.8 2020/05/25 20:47:24 christos Exp $	*/
2 
3 /*
4  * work_thread.c - threads implementation for blocking worker child.
5  */
6 #include <config.h>
7 #include "ntp_workimpl.h"
8 
9 #ifdef WORK_THREAD
10 
11 #include <stdio.h>
12 #include <ctype.h>
13 #include <signal.h>
14 #ifndef SYS_WINNT
15 #include <pthread.h>
16 #endif
17 
18 #include "ntp_stdlib.h"
19 #include "ntp_malloc.h"
20 #include "ntp_syslog.h"
21 #include "ntpd.h"
22 #include "ntp_io.h"
23 #include "ntp_assert.h"
24 #include "ntp_unixtime.h"
25 #include "timespecops.h"
26 #include "ntp_worker.h"
27 
/* Sentinel pointer value passed through the queues in place of a real
 * blocking_pipe_header: req_child_exit() queues CHILD_EXIT_REQ to ask
 * the worker to quit, and the worker acknowledges it by pushing
 * CHILD_GONE_RESP (the very same value) into the response queue.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4
38 
/* Fiddle with min/max stack sizes for the worker thread. A 64kB minimum
 * seems to work, so we set the maximum to 256kB. If the minimum goes
 * below the system-defined minimum stack size (PTHREAD_STACK_MIN), we
 * have to adjust accordingly. Both limits can be overridden by
 * predefining them.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
/* NOTE(review): the PTHREAD_STACK_MIN adjustment is skipped on Solaris
 * (__sun); the reason is not visible here -- presumably the macro is not
 * usable in a preprocessor expression there. Confirm. */
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
/* never let the maximum drop below the (possibly raised) minimum */
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif

/* need a good integer to store a pointer... thread_exit() uses it to
 * pass the integer exit code through pthread_exit()'s void* argument.
 * NOTE(review): UINT_PTR is normally a Windows typedef rather than a
 * macro, so the defined(UINT_PTR) test presumably only matches if some
 * header #defines it -- confirm.
 */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif
71 
72 
/* Platform glue: how a worker thread terminates (thread_exit), how one
 * token is posted to a semaphore (tickle_sem), and the matching thread
 * entry point signature.
 */
#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif
88 
/* Hook used to (un)register the response notification channel with the
 * daemon's I/O loop: the read end of the response pipe (WORK_PIPE) or
 * the response semaphore handle (otherwise). This file calls it with
 * FALSE when a worker is started and with TRUE in cleanup_after_child().
 */
#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

/* Process-global lock used by worker_global_lock(); created lazily by
 * prepare_child_sems() when the first worker is started. */
static sema_type worker_mmutex;
static sem_ref   worker_memlock;
106 
107 /* --------------------------------------------------------------------
108  * locking the global worker state table (and other global stuff)
109  */
110 void
111 worker_global_lock(
112 	int inOrOut)
113 {
114 	if (worker_memlock) {
115 		if (inOrOut)
116 			wait_for_sem(worker_memlock, NULL);
117 		else
118 			tickle_sem(worker_memlock);
119 	}
120 }
121 
/* --------------------------------------------------------------------
 * exit_worker() - terminate the calling worker thread, handing
 * 'exitcode' to the thread API. This keeps pthread_exit() and
 * _endthreadex() out of the platform-independent code (see the
 * thread_exit macro).
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);
	/* NOTREACHED */
}
132 
133 /* --------------------------------------------------------------------
134  * sleep for a given time or until the wakup semaphore is tickled.
135  */
136 int
137 worker_sleep(
138 	blocking_child *	c,
139 	time_t			seconds
140 	)
141 {
142 	struct timespec	until;
143 	int		rc;
144 
145 # ifdef HAVE_CLOCK_GETTIME
146 	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
147 		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
148 		return -1;
149 	}
150 # else
151 	if (0 != getclock(TIMEOFDAY, &until)) {
152 		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
153 		return -1;
154 	}
155 # endif
156 	until.tv_sec += seconds;
157 	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
158 	if (0 == rc)
159 		return -1;
160 	if (-1 == rc && ETIMEDOUT == errno)
161 		return 0;
162 	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
163 	return -1;
164 }
165 
166 
167 /* --------------------------------------------------------------------
168  * Wake up a worker that takes a nap.
169  */
170 void
171 interrupt_worker_sleep(void)
172 {
173 	u_int			idx;
174 	blocking_child *	c;
175 
176 	for (idx = 0; idx < blocking_children_alloc; idx++) {
177 		c = blocking_children[idx];
178 		if (NULL == c || NULL == c->wake_scheduled_sleep)
179 			continue;
180 		tickle_sem(c->wake_scheduled_sleep);
181 	}
182 }
183 
184 /* --------------------------------------------------------------------
185  * Make sure there is an empty slot at the head of the request
186  * queue. Tell if the queue is currently empty.
187  */
188 static int
189 ensure_workitems_empty_slot(
190 	blocking_child *c
191 	)
192 {
193 	/*
194 	** !!! PRECONDITION: caller holds access lock!
195 	**
196 	** This simply tries to increase the size of the buffer if it
197 	** becomes full. The resize operation does *not* maintain the
198 	** order of requests, but that should be irrelevant since the
199 	** processing is considered asynchronous anyway.
200 	**
201 	** Return if the buffer is currently empty.
202 	*/
203 
204 	static const size_t each =
205 	    sizeof(blocking_children[0]->workitems[0]);
206 
207 	size_t	new_alloc;
208 	size_t  slots_used;
209 	size_t	sidx;
210 
211 	slots_used = c->head_workitem - c->tail_workitem;
212 	if (slots_used >= c->workitems_alloc) {
213 		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
214 		c->workitems = erealloc(c->workitems, new_alloc * each);
215 		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
216 		    c->workitems[sidx] = NULL;
217 		c->tail_workitem   = 0;
218 		c->head_workitem   = c->workitems_alloc;
219 		c->workitems_alloc = new_alloc;
220 	}
221 	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
222 	return (0 == slots_used);
223 }
224 
225 /* --------------------------------------------------------------------
226  * Make sure there is an empty slot at the head of the response
227  * queue. Tell if the queue is currently empty.
228  */
229 static int
230 ensure_workresp_empty_slot(
231 	blocking_child *c
232 	)
233 {
234 	/*
235 	** !!! PRECONDITION: caller holds access lock!
236 	**
237 	** Works like the companion function above.
238 	*/
239 
240 	static const size_t each =
241 	    sizeof(blocking_children[0]->responses[0]);
242 
243 	size_t	new_alloc;
244 	size_t  slots_used;
245 	size_t	sidx;
246 
247 	slots_used = c->head_response - c->tail_response;
248 	if (slots_used >= c->responses_alloc) {
249 		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
250 		c->responses = erealloc(c->responses, new_alloc * each);
251 		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
252 		    c->responses[sidx] = NULL;
253 		c->tail_response   = 0;
254 		c->head_response   = c->responses_alloc;
255 		c->responses_alloc = new_alloc;
256 	}
257 	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
258 	return (0 == slots_used);
259 }
260 
261 
262 /* --------------------------------------------------------------------
263  * queue_req_pointer() - append a work item or idle exit request to
264  *			 blocking_workitems[]. Employ proper locking.
265  */
266 static int
267 queue_req_pointer(
268 	blocking_child	*	c,
269 	blocking_pipe_header *	hdr
270 	)
271 {
272 	size_t qhead;
273 
274 	/* >>>> ACCESS LOCKING STARTS >>>> */
275 	wait_for_sem(c->accesslock, NULL);
276 	ensure_workitems_empty_slot(c);
277 	qhead = c->head_workitem;
278 	c->workitems[qhead % c->workitems_alloc] = hdr;
279 	c->head_workitem = 1 + qhead;
280 	tickle_sem(c->accesslock);
281 	/* <<<< ACCESS LOCKING ENDS <<<< */
282 
283 	/* queue consumer wake-up notification */
284 	tickle_sem(c->workitems_pending);
285 
286 	return 0;
287 }
288 
289 /* --------------------------------------------------------------------
290  * API function to make sure a worker is running, a proper private copy
291  * of the data is made, the data eneterd into the queue and the worker
292  * is signalled.
293  */
294 int
295 send_blocking_req_internal(
296 	blocking_child *	c,
297 	blocking_pipe_header *	hdr,
298 	void *			data
299 	)
300 {
301 	blocking_pipe_header *	threadcopy;
302 	size_t			payload_octets;
303 
304 	REQUIRE(hdr != NULL);
305 	REQUIRE(data != NULL);
306 	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
307 
308 	if (hdr->octets <= sizeof(*hdr))
309 		return 1;	/* failure */
310 	payload_octets = hdr->octets - sizeof(*hdr);
311 
312 	if (NULL == c->thread_ref)
313 		start_blocking_thread(c);
314 	threadcopy = emalloc(hdr->octets);
315 	memcpy(threadcopy, hdr, sizeof(*hdr));
316 	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
317 
318 	return queue_req_pointer(c, threadcopy);
319 }
320 
321 /* --------------------------------------------------------------------
322  * Wait for the 'incoming queue no longer empty' signal, lock the shared
323  * structure and dequeue an item.
324  */
325 blocking_pipe_header *
326 receive_blocking_req_internal(
327 	blocking_child *	c
328 	)
329 {
330 	blocking_pipe_header *	req;
331 	size_t			qhead, qtail;
332 
333 	req = NULL;
334 	do {
335 		/* wait for tickle from the producer side */
336 		wait_for_sem(c->workitems_pending, NULL);
337 
338 		/* >>>> ACCESS LOCKING STARTS >>>> */
339 		wait_for_sem(c->accesslock, NULL);
340 		qhead = c->head_workitem;
341 		do {
342 			qtail = c->tail_workitem;
343 			if (qhead == qtail)
344 				break;
345 			c->tail_workitem = qtail + 1;
346 			qtail %= c->workitems_alloc;
347 			req = c->workitems[qtail];
348 			c->workitems[qtail] = NULL;
349 		} while (NULL == req);
350 		tickle_sem(c->accesslock);
351 		/* <<<< ACCESS LOCKING ENDS <<<< */
352 
353 	} while (NULL == req);
354 
355 	INSIST(NULL != req);
356 	if (CHILD_EXIT_REQ == req) {	/* idled out */
357 		send_blocking_resp_internal(c, CHILD_GONE_RESP);
358 		req = NULL;
359 	}
360 
361 	return req;
362 }
363 
364 /* --------------------------------------------------------------------
365  * Push a response into the return queue and eventually tickle the
366  * receiver.
367  */
368 int
369 send_blocking_resp_internal(
370 	blocking_child *	c,
371 	blocking_pipe_header *	resp
372 	)
373 {
374 	size_t	qhead;
375 	int	empty;
376 
377 	/* >>>> ACCESS LOCKING STARTS >>>> */
378 	wait_for_sem(c->accesslock, NULL);
379 	empty = ensure_workresp_empty_slot(c);
380 	qhead = c->head_response;
381 	c->responses[qhead % c->responses_alloc] = resp;
382 	c->head_response = 1 + qhead;
383 	tickle_sem(c->accesslock);
384 	/* <<<< ACCESS LOCKING ENDS <<<< */
385 
386 	/* queue consumer wake-up notification */
387 	if (empty)
388 	{
389 #	    ifdef WORK_PIPE
390 		if (1 != write(c->resp_write_pipe, "", 1))
391 			msyslog(LOG_WARNING, "async resolver: %s",
392 				"failed to notify main thread!");
393 #	    else
394 		tickle_sem(c->responses_pending);
395 #	    endif
396 	}
397 	return 0;
398 }
399 
400 
401 #ifndef WORK_PIPE
402 
403 /* --------------------------------------------------------------------
404  * Check if a (Windows-)hanndle to a semaphore is actually the same we
405  * are using inside the sema wrapper.
406  */
407 static BOOL
408 same_os_sema(
409 	const sem_ref	obj,
410 	void*		osh
411 	)
412 {
413 	return obj && osh && (obj->shnd == (HANDLE)osh);
414 }
415 
416 /* --------------------------------------------------------------------
417  * Find the shared context that associates to an OS handle and make sure
418  * the data is dequeued and processed.
419  */
420 void
421 handle_blocking_resp_sem(
422 	void *	context
423 	)
424 {
425 	blocking_child *	c;
426 	u_int			idx;
427 
428 	c = NULL;
429 	for (idx = 0; idx < blocking_children_alloc; idx++) {
430 		c = blocking_children[idx];
431 		if (c != NULL &&
432 			c->thread_ref != NULL &&
433 			same_os_sema(c->responses_pending, context))
434 			break;
435 	}
436 	if (idx < blocking_children_alloc)
437 		process_blocking_resp(c);
438 }
439 #endif	/* !WORK_PIPE */
440 
441 /* --------------------------------------------------------------------
442  * Fetch the next response from the return queue. In case of signalling
443  * via pipe, make sure the pipe is flushed, too.
444  */
445 blocking_pipe_header *
446 receive_blocking_resp_internal(
447 	blocking_child *	c
448 	)
449 {
450 	blocking_pipe_header *	removed;
451 	size_t			qhead, qtail, slot;
452 
453 #ifdef WORK_PIPE
454 	int			rc;
455 	char			scratch[32];
456 
457 	do
458 		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
459 	while (-1 == rc && EINTR == errno);
460 #endif
461 
462 	/* >>>> ACCESS LOCKING STARTS >>>> */
463 	wait_for_sem(c->accesslock, NULL);
464 	qhead = c->head_response;
465 	qtail = c->tail_response;
466 	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
467 		slot = qtail % c->responses_alloc;
468 		removed = c->responses[slot];
469 		c->responses[slot] = NULL;
470 	}
471 	c->tail_response = qtail;
472 	tickle_sem(c->accesslock);
473 	/* <<<< ACCESS LOCKING ENDS <<<< */
474 
475 	if (NULL != removed) {
476 		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
477 			     BLOCKING_RESP_MAGIC == removed->magic_sig);
478 	}
479 	if (CHILD_GONE_RESP == removed) {
480 		cleanup_after_child(c);
481 		removed = NULL;
482 	}
483 
484 	return removed;
485 }
486 
487 /* --------------------------------------------------------------------
488  * Light up a new worker.
489  */
490 static void
491 start_blocking_thread(
492 	blocking_child *	c
493 	)
494 {
495 
496 	DEBUG_INSIST(!c->reusable);
497 
498 	prepare_child_sems(c);
499 	start_blocking_thread_internal(c);
500 }
501 
502 /* --------------------------------------------------------------------
503  * Create a worker thread. There are several differences between POSIX
504  * and Windows, of course -- most notably the Windows thread is no
505  * detached thread, and we keep the handle around until we want to get
506  * rid of the thread. The notification scheme also differs: Windows
507  * makes use of semaphores in both directions, POSIX uses a pipe for
508  * integration with 'select()' or alike.
509  */
510 static void
511 start_blocking_thread_internal(
512 	blocking_child *	c
513 	)
514 #ifdef SYS_WINNT
515 {
516 	BOOL	resumed;
517 
518 	c->thread_ref = NULL;
519 	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
520 	c->thr_table[0].thnd =
521 		(HANDLE)_beginthreadex(
522 			NULL,
523 			0,
524 			&blocking_thread,
525 			c,
526 			CREATE_SUSPENDED,
527 			NULL);
528 
529 	if (NULL == c->thr_table[0].thnd) {
530 		msyslog(LOG_ERR, "start blocking thread failed: %m");
531 		exit(-1);
532 	}
533 	/* remember the thread priority is only within the process class */
534 	if (!SetThreadPriority(c->thr_table[0].thnd,
535 			       THREAD_PRIORITY_BELOW_NORMAL))
536 		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
537 
538 	resumed = ResumeThread(c->thr_table[0].thnd);
539 	DEBUG_INSIST(resumed);
540 	c->thread_ref = &c->thr_table[0];
541 }
542 #else	/* pthreads start_blocking_thread_internal() follows */
543 {
544 # ifdef NEED_PTHREAD_INIT
545 	static int	pthread_init_called;
546 # endif
547 	pthread_attr_t	thr_attr;
548 	int		rc;
549 	int		pipe_ends[2];	/* read then write */
550 	int		is_pipe;
551 	int		flags;
552 	size_t		ostacksize;
553 	size_t		nstacksize;
554 	sigset_t	saved_sig_mask;
555 
556 	c->thread_ref = NULL;
557 
558 # ifdef NEED_PTHREAD_INIT
559 	/*
560 	 * from lib/isc/unix/app.c:
561 	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
562 	 */
563 	if (!pthread_init_called) {
564 		pthread_init();
565 		pthread_init_called = TRUE;
566 	}
567 # endif
568 
569 	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
570 	if (0 != rc) {
571 		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
572 		exit(1);
573 	}
574 	c->resp_read_pipe = move_fd(pipe_ends[0]);
575 	c->resp_write_pipe = move_fd(pipe_ends[1]);
576 	c->ispipe = is_pipe;
577 	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
578 	if (-1 == flags) {
579 		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
580 		exit(1);
581 	}
582 	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
583 	if (-1 == rc) {
584 		msyslog(LOG_ERR,
585 			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
586 		exit(1);
587 	}
588 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
589 	pthread_attr_init(&thr_attr);
590 	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
591 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
592     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
593 	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
594 	if (0 != rc) {
595 		msyslog(LOG_ERR,
596 			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
597 			strerror(rc));
598 	} else {
599 		if (ostacksize < THREAD_MINSTACKSIZE)
600 			nstacksize = THREAD_MINSTACKSIZE;
601 		else if (ostacksize > THREAD_MAXSTACKSIZE)
602 			nstacksize = THREAD_MAXSTACKSIZE;
603 		else
604 			nstacksize = ostacksize;
605 		if (nstacksize != ostacksize)
606 			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
607 		if (0 != rc)
608 			msyslog(LOG_ERR,
609 				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
610 				(u_long)ostacksize, (u_long)nstacksize,
611 				strerror(rc));
612 	}
613 #else
614 	UNUSED_ARG(nstacksize);
615 	UNUSED_ARG(ostacksize);
616 #endif
617 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
618 	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
619 #endif
620 	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
621 	block_thread_signals(&saved_sig_mask);
622 	rc = pthread_create(&c->thr_table[0], &thr_attr,
623 			    &blocking_thread, c);
624 	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
625 	pthread_attr_destroy(&thr_attr);
626 	if (0 != rc) {
627 		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
628 			strerror(rc));
629 		exit(1);
630 	}
631 	c->thread_ref = &c->thr_table[0];
632 }
633 #endif
634 
635 /* --------------------------------------------------------------------
636  * block_thread_signals()
637  *
638  * Temporarily block signals used by ntpd main thread, so that signal
639  * mask inherited by child threads leaves them blocked.  Returns prior
640  * active signal mask via pmask, to be restored by the main thread
641  * after pthread_create().
642  */
643 #ifndef SYS_WINNT
644 void
645 block_thread_signals(
646 	sigset_t *	pmask
647 	)
648 {
649 	sigset_t	block;
650 
651 	sigemptyset(&block);
652 # ifdef HAVE_SIGNALED_IO
653 #  ifdef SIGIO
654 	sigaddset(&block, SIGIO);
655 #  endif
656 #  ifdef SIGPOLL
657 	sigaddset(&block, SIGPOLL);
658 #  endif
659 # endif	/* HAVE_SIGNALED_IO */
660 	sigaddset(&block, SIGALRM);
661 	sigaddset(&block, MOREDEBUGSIG);
662 	sigaddset(&block, LESSDEBUGSIG);
663 # ifdef SIGDIE1
664 	sigaddset(&block, SIGDIE1);
665 # endif
666 # ifdef SIGDIE2
667 	sigaddset(&block, SIGDIE2);
668 # endif
669 # ifdef SIGDIE3
670 	sigaddset(&block, SIGDIE3);
671 # endif
672 # ifdef SIGDIE4
673 	sigaddset(&block, SIGDIE4);
674 # endif
675 # ifdef SIGBUS
676 	sigaddset(&block, SIGBUS);
677 # endif
678 	sigemptyset(pmask);
679 	pthread_sigmask(SIG_BLOCK, &block, pmask);
680 }
681 #endif	/* !SYS_WINNT */
682 
683 
684 /* --------------------------------------------------------------------
685  * Create & destroy semaphores. This is sufficiently different between
686  * POSIX and Windows to warrant wrapper functions and close enough to
687  * use the concept of synchronization via semaphore for all platforms.
688  */
689 static sem_ref
690 create_sema(
691 	sema_type*	semptr,
692 	u_int		inival,
693 	u_int		maxval)
694 {
695 #ifdef SYS_WINNT
696 
697 	long svini, svmax;
698 	if (NULL != semptr) {
699 		svini = (inival < LONG_MAX)
700 		    ? (long)inival : LONG_MAX;
701 		svmax = (maxval < LONG_MAX && maxval > 0)
702 		    ? (long)maxval : LONG_MAX;
703 		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
704 		if (NULL == semptr->shnd)
705 			semptr = NULL;
706 	}
707 
708 #else
709 
710 	(void)maxval;
711 	if (semptr && sem_init(semptr, FALSE, inival))
712 		semptr = NULL;
713 
714 #endif
715 
716 	return semptr;
717 }
718 
719 /* ------------------------------------------------------------------ */
720 static sem_ref
721 delete_sema(
722 	sem_ref obj)
723 {
724 
725 #   ifdef SYS_WINNT
726 
727 	if (obj) {
728 		if (obj->shnd)
729 			CloseHandle(obj->shnd);
730 		obj->shnd = NULL;
731 	}
732 
733 #   else
734 
735 	if (obj)
736 		sem_destroy(obj);
737 
738 #   endif
739 
740 	return NULL;
741 }
742 
743 /* --------------------------------------------------------------------
744  * prepare_child_sems()
745  *
746  * create sync & access semaphores
747  *
748  * All semaphores are cleared, only the access semaphore has 1 unit.
749  * Childs wait on 'workitems_pending', then grabs 'sema_access'
750  * and dequeues jobs. When done, 'sema_access' is given one unit back.
751  *
752  * The producer grabs 'sema_access', manages the queue, restores
753  * 'sema_access' and puts one unit into 'workitems_pending'.
754  *
755  * The story goes the same for the response queue.
756  */
757 static void
758 prepare_child_sems(
759 	blocking_child *c
760 	)
761 {
762 	if (NULL == worker_memlock)
763 		worker_memlock = create_sema(&worker_mmutex, 1, 1);
764 
765 	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
766 	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
767 	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
768 #   ifndef WORK_PIPE
769 	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
770 #   endif
771 }
772 
773 /* --------------------------------------------------------------------
774  * wait for semaphore. Where the wait can be interrupted, it will
775  * internally resume -- When this function returns, there is either no
776  * semaphore at all, a timeout occurred, or the caller could
777  * successfully take a token from the semaphore.
778  *
779  * For untimed wait, not checking the result of this function at all is
780  * definitely an option.
781  */
782 static int
783 wait_for_sem(
784 	sem_ref			sem,
785 	struct timespec *	timeout		/* wall-clock */
786 	)
787 #ifdef SYS_WINNT
788 {
789 	struct timespec now;
790 	struct timespec delta;
791 	DWORD		msec;
792 	DWORD		rc;
793 
794 	if (!(sem && sem->shnd)) {
795 		errno = EINVAL;
796 		return -1;
797 	}
798 
799 	if (NULL == timeout) {
800 		msec = INFINITE;
801 	} else {
802 		getclock(TIMEOFDAY, &now);
803 		delta = sub_tspec(*timeout, now);
804 		if (delta.tv_sec < 0) {
805 			msec = 0;
806 		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
807 			msec = INFINITE;
808 		} else {
809 			msec = 1000 * (DWORD)delta.tv_sec;
810 			msec += delta.tv_nsec / (1000 * 1000);
811 		}
812 	}
813 	rc = WaitForSingleObject(sem->shnd, msec);
814 	if (WAIT_OBJECT_0 == rc)
815 		return 0;
816 	if (WAIT_TIMEOUT == rc) {
817 		errno = ETIMEDOUT;
818 		return -1;
819 	}
820 	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
821 	errno = EFAULT;
822 	return -1;
823 }
824 #else	/* pthreads wait_for_sem() follows */
825 {
826 	int rc = -1;
827 
828 	if (sem) do {
829 			if (NULL == timeout)
830 				rc = sem_wait(sem);
831 			else
832 				rc = sem_timedwait(sem, timeout);
833 		} while (rc == -1 && errno == EINTR);
834 	else
835 		errno = EINVAL;
836 
837 	return rc;
838 }
839 #endif
840 
841 /* --------------------------------------------------------------------
842  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
843  * calling conventions under Windows and POSIX-defined signature
844  * otherwise.
845  */
846 #ifdef SYS_WINNT
847 u_int WINAPI
848 #else
849 void *
850 #endif
851 blocking_thread(
852 	void *	ThreadArg
853 	)
854 {
855 	blocking_child *c;
856 
857 	c = ThreadArg;
858 	exit_worker(blocking_child_common(c));
859 
860 	/* NOTREACHED */
861 	return 0;
862 }
863 
864 /* --------------------------------------------------------------------
865  * req_child_exit() runs in the parent.
866  *
867  * This function is called from from the idle timer, too, and possibly
868  * without a thread being there any longer. Since we have folded up our
869  * tent in that case and all the semaphores are already gone, we simply
870  * ignore this request in this case.
871  *
872  * Since the existence of the semaphores is controlled exclusively by
873  * the parent, there's no risk of data race here.
874  */
875 int
876 req_child_exit(
877 	blocking_child *c
878 	)
879 {
880 	return (c->accesslock)
881 	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
882 	    : 0;
883 }
884 
885 /* --------------------------------------------------------------------
886  * cleanup_after_child() runs in parent.
887  */
888 static void
889 cleanup_after_child(
890 	blocking_child *	c
891 	)
892 {
893 	DEBUG_INSIST(!c->reusable);
894 
895 #   ifdef SYS_WINNT
896 	/* The thread was not created in detached state, so we better
897 	 * clean up.
898 	 */
899 	if (c->thread_ref && c->thread_ref->thnd) {
900 		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
901 		INSIST(CloseHandle(c->thread_ref->thnd));
902 		c->thread_ref->thnd = NULL;
903 	}
904 #   endif
905 	c->thread_ref = NULL;
906 
907 	/* remove semaphores and (if signalling vi IO) pipes */
908 
909 	c->accesslock           = delete_sema(c->accesslock);
910 	c->workitems_pending    = delete_sema(c->workitems_pending);
911 	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
912 
913 #   ifdef WORK_PIPE
914 	DEBUG_INSIST(-1 != c->resp_read_pipe);
915 	DEBUG_INSIST(-1 != c->resp_write_pipe);
916 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
917 	close(c->resp_write_pipe);
918 	close(c->resp_read_pipe);
919 	c->resp_write_pipe = -1;
920 	c->resp_read_pipe = -1;
921 #   else
922 	DEBUG_INSIST(NULL != c->responses_pending);
923 	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
924 	c->responses_pending = delete_sema(c->responses_pending);
925 #   endif
926 
927 	/* Is it necessary to check if there are pending requests and
928 	 * responses? If so, and if there are, what to do with them?
929 	 */
930 
931 	/* re-init buffer index sequencers */
932 	c->head_workitem = 0;
933 	c->tail_workitem = 0;
934 	c->head_response = 0;
935 	c->tail_response = 0;
936 
937 	c->reusable = TRUE;
938 }
939 
940 
#else	/* !WORK_THREAD follows */
/* ISO C requires at least one external declaration per translation
 * unit; this dummy object keeps the file non-empty when the thread-based
 * worker is not configured. */
char work_thread_nonempty_compilation_unit;
#endif
944