xref: /netbsd-src/external/bsd/ntp/dist/libntp/work_thread.c (revision 946379e7b37692fc43f68eb0d1c10daa0a7f3b6c)
1 /*	$NetBSD: work_thread.c,v 1.4 2016/01/08 21:35:39 christos Exp $	*/
2 
3 /*
4  * work_thread.c - threads implementation for blocking worker child.
5  */
6 #include <config.h>
7 #include "ntp_workimpl.h"
8 
9 #ifdef WORK_THREAD
10 
11 #include <stdio.h>
12 #include <ctype.h>
13 #include <signal.h>
14 #ifndef SYS_WINNT
15 #include <pthread.h>
16 #endif
17 
18 #include "ntp_stdlib.h"
19 #include "ntp_malloc.h"
20 #include "ntp_syslog.h"
21 #include "ntpd.h"
22 #include "ntp_io.h"
23 #include "ntp_assert.h"
24 #include "ntp_unixtime.h"
25 #include "timespecops.h"
26 #include "ntp_worker.h"
27 
28 #define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
29 #define CHILD_GONE_RESP	CHILD_EXIT_REQ
30 #define WORKITEMS_ALLOC_INC	16
31 #define RESPONSES_ALLOC_INC	4
32 
33 #ifndef THREAD_MINSTACKSIZE
34 #define THREAD_MINSTACKSIZE	(64U * 1024)
35 #endif
36 
37 #ifdef SYS_WINNT
38 
39 # define thread_exit(c)	_endthreadex(c)
40 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
41 u_int	WINAPI	blocking_thread(void *);
42 static BOOL	same_os_sema(const sem_ref obj, void * osobj);
43 
44 #else
45 
46 # define thread_exit(c)	pthread_exit((void*)(size_t)(c))
47 # define tickle_sem	sem_post
48 void *		blocking_thread(void *);
49 static	void	block_thread_signals(sigset_t *);
50 
51 #endif
52 
53 #ifdef WORK_PIPE
54 addremove_io_fd_func		addremove_io_fd;
55 #else
56 addremove_io_semaphore_func	addremove_io_semaphore;
57 #endif
58 
59 static	void	start_blocking_thread(blocking_child *);
60 static	void	start_blocking_thread_internal(blocking_child *);
61 static	void	prepare_child_sems(blocking_child *);
62 static	int	wait_for_sem(sem_ref, struct timespec *);
63 static	int	ensure_workitems_empty_slot(blocking_child *);
64 static	int	ensure_workresp_empty_slot(blocking_child *);
65 static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
66 static	void	cleanup_after_child(blocking_child *);
67 
68 
69 void
70 exit_worker(
71 	int	exitcode
72 	)
73 {
74 	thread_exit(exitcode);	/* see #define thread_exit */
75 }
76 
77 /* --------------------------------------------------------------------
78  * sleep for a given time or until the wakup semaphore is tickled.
79  */
80 int
81 worker_sleep(
82 	blocking_child *	c,
83 	time_t			seconds
84 	)
85 {
86 	struct timespec	until;
87 	int		rc;
88 
89 # ifdef HAVE_CLOCK_GETTIME
90 	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
91 		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
92 		return -1;
93 	}
94 # else
95 	if (0 != getclock(TIMEOFDAY, &until)) {
96 		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
97 		return -1;
98 	}
99 # endif
100 	until.tv_sec += seconds;
101 	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
102 	if (0 == rc)
103 		return -1;
104 	if (-1 == rc && ETIMEDOUT == errno)
105 		return 0;
106 	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
107 	return -1;
108 }
109 
110 
111 /* --------------------------------------------------------------------
112  * Wake up a worker that takes a nap.
113  */
114 void
115 interrupt_worker_sleep(void)
116 {
117 	u_int			idx;
118 	blocking_child *	c;
119 
120 	for (idx = 0; idx < blocking_children_alloc; idx++) {
121 		c = blocking_children[idx];
122 		if (NULL == c || NULL == c->wake_scheduled_sleep)
123 			continue;
124 		tickle_sem(c->wake_scheduled_sleep);
125 	}
126 }
127 
128 /* --------------------------------------------------------------------
129  * Make sure there is an empty slot at the head of the request
130  * queue. Tell if the queue is currently empty.
131  */
132 static int
133 ensure_workitems_empty_slot(
134 	blocking_child *c
135 	)
136 {
137 	/*
138 	** !!! PRECONDITION: caller holds access lock!
139 	**
140 	** This simply tries to increase the size of the buffer if it
141 	** becomes full. The resize operation does *not* maintain the
142 	** order of requests, but that should be irrelevant since the
143 	** processing is considered asynchronous anyway.
144 	**
145 	** Return if the buffer is currently empty.
146 	*/
147 
148 	static const size_t each =
149 	    sizeof(blocking_children[0]->workitems[0]);
150 
151 	size_t	new_alloc;
152 	size_t  slots_used;
153 
154 	slots_used = c->head_workitem - c->tail_workitem;
155 	if (slots_used >= c->workitems_alloc) {
156 		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
157 		c->workitems = erealloc(c->workitems, new_alloc * each);
158 		c->tail_workitem   = 0;
159 		c->head_workitem   = c->workitems_alloc;
160 		c->workitems_alloc = new_alloc;
161 	}
162 	return (0 == slots_used);
163 }
164 
165 /* --------------------------------------------------------------------
166  * Make sure there is an empty slot at the head of the response
167  * queue. Tell if the queue is currently empty.
168  */
169 static int
170 ensure_workresp_empty_slot(
171 	blocking_child *c
172 	)
173 {
174 	/*
175 	** !!! PRECONDITION: caller holds access lock!
176 	**
177 	** Works like the companion function above.
178 	*/
179 
180 	static const size_t each =
181 	    sizeof(blocking_children[0]->responses[0]);
182 
183 	size_t	new_alloc;
184 	size_t  slots_used;
185 
186 	slots_used = c->head_response - c->tail_response;
187 	if (slots_used >= c->responses_alloc) {
188 		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
189 		c->responses = erealloc(c->responses, new_alloc * each);
190 		c->tail_response   = 0;
191 		c->head_response   = c->responses_alloc;
192 		c->responses_alloc = new_alloc;
193 	}
194 	return (0 == slots_used);
195 }
196 
197 
198 /* --------------------------------------------------------------------
199  * queue_req_pointer() - append a work item or idle exit request to
200  *			 blocking_workitems[]. Employ proper locking.
201  */
202 static int
203 queue_req_pointer(
204 	blocking_child	*	c,
205 	blocking_pipe_header *	hdr
206 	)
207 {
208 	size_t qhead;
209 
210 	/* >>>> ACCESS LOCKING STARTS >>>> */
211 	wait_for_sem(c->accesslock, NULL);
212 	ensure_workitems_empty_slot(c);
213 	qhead = c->head_workitem;
214 	c->workitems[qhead % c->workitems_alloc] = hdr;
215 	c->head_workitem = 1 + qhead;
216 	tickle_sem(c->accesslock);
217 	/* <<<< ACCESS LOCKING ENDS <<<< */
218 
219 	/* queue consumer wake-up notification */
220 	tickle_sem(c->workitems_pending);
221 
222 	return 0;
223 }
224 
225 /* --------------------------------------------------------------------
226  * API function to make sure a worker is running, a proper private copy
227  * of the data is made, the data eneterd into the queue and the worker
228  * is signalled.
229  */
230 int
231 send_blocking_req_internal(
232 	blocking_child *	c,
233 	blocking_pipe_header *	hdr,
234 	void *			data
235 	)
236 {
237 	blocking_pipe_header *	threadcopy;
238 	size_t			payload_octets;
239 
240 	REQUIRE(hdr != NULL);
241 	REQUIRE(data != NULL);
242 	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
243 
244 	if (hdr->octets <= sizeof(*hdr))
245 		return 1;	/* failure */
246 	payload_octets = hdr->octets - sizeof(*hdr);
247 
248 	if (NULL == c->thread_ref)
249 		start_blocking_thread(c);
250 	threadcopy = emalloc(hdr->octets);
251 	memcpy(threadcopy, hdr, sizeof(*hdr));
252 	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
253 
254 	return queue_req_pointer(c, threadcopy);
255 }
256 
257 /* --------------------------------------------------------------------
258  * Wait for the 'incoming queue no longer empty' signal, lock the shared
259  * structure and dequeue an item.
260  */
261 blocking_pipe_header *
262 receive_blocking_req_internal(
263 	blocking_child *	c
264 	)
265 {
266 	blocking_pipe_header *	req;
267 	size_t			qhead, qtail;
268 
269 	req = NULL;
270 	do {
271 		/* wait for tickle from the producer side */
272 		wait_for_sem(c->workitems_pending, NULL);
273 
274 		/* >>>> ACCESS LOCKING STARTS >>>> */
275 		wait_for_sem(c->accesslock, NULL);
276 		qhead = c->head_workitem;
277 		do {
278 			qtail = c->tail_workitem;
279 			if (qhead == qtail)
280 				break;
281 			c->tail_workitem = qtail + 1;
282 			qtail %= c->workitems_alloc;
283 			req = c->workitems[qtail];
284 			c->workitems[qtail] = NULL;
285 		} while (NULL == req);
286 		tickle_sem(c->accesslock);
287 		/* <<<< ACCESS LOCKING ENDS <<<< */
288 
289 	} while (NULL == req);
290 
291 	INSIST(NULL != req);
292 	if (CHILD_EXIT_REQ == req) {	/* idled out */
293 		send_blocking_resp_internal(c, CHILD_GONE_RESP);
294 		req = NULL;
295 	}
296 
297 	return req;
298 }
299 
300 /* --------------------------------------------------------------------
301  * Push a response into the return queue and eventually tickle the
302  * receiver.
303  */
304 int
305 send_blocking_resp_internal(
306 	blocking_child *	c,
307 	blocking_pipe_header *	resp
308 	)
309 {
310 	size_t	qhead;
311 	int	empty;
312 
313 	/* >>>> ACCESS LOCKING STARTS >>>> */
314 	wait_for_sem(c->accesslock, NULL);
315 	empty = ensure_workresp_empty_slot(c);
316 	qhead = c->head_response;
317 	c->responses[qhead % c->responses_alloc] = resp;
318 	c->head_response = 1 + qhead;
319 	tickle_sem(c->accesslock);
320 	/* <<<< ACCESS LOCKING ENDS <<<< */
321 
322 	/* queue consumer wake-up notification */
323 	if (empty)
324 	{
325 #	    ifdef WORK_PIPE
326 		write(c->resp_write_pipe, "", 1);
327 #	    else
328 		tickle_sem(c->responses_pending);
329 #	    endif
330 	}
331 	return 0;
332 }
333 
334 
335 #ifndef WORK_PIPE
336 
337 /* --------------------------------------------------------------------
338  * Check if a (Windows-)hanndle to a semaphore is actually the same we
339  * are using inside the sema wrapper.
340  */
341 static BOOL
342 same_os_sema(
343 	const sem_ref	obj,
344 	void*		osh
345 	)
346 {
347 	return obj && osh && (obj->shnd == (HANDLE)osh);
348 }
349 
350 /* --------------------------------------------------------------------
351  * Find the shared context that associates to an OS handle and make sure
352  * the data is dequeued and processed.
353  */
354 void
355 handle_blocking_resp_sem(
356 	void *	context
357 	)
358 {
359 	blocking_child *	c;
360 	u_int			idx;
361 
362 	c = NULL;
363 	for (idx = 0; idx < blocking_children_alloc; idx++) {
364 		c = blocking_children[idx];
365 		if (c != NULL &&
366 			c->thread_ref != NULL &&
367 			same_os_sema(c->responses_pending, context))
368 			break;
369 	}
370 	if (idx < blocking_children_alloc)
371 		process_blocking_resp(c);
372 }
373 #endif	/* !WORK_PIPE */
374 
375 /* --------------------------------------------------------------------
376  * Fetch the next response from the return queue. In case of signalling
377  * via pipe, make sure the pipe is flushed, too.
378  */
379 blocking_pipe_header *
380 receive_blocking_resp_internal(
381 	blocking_child *	c
382 	)
383 {
384 	blocking_pipe_header *	removed;
385 	size_t			qhead, qtail, slot;
386 
387 #ifdef WORK_PIPE
388 	int			rc;
389 	char			scratch[32];
390 
391 	do
392 		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
393 	while (-1 == rc && EINTR == errno);
394 #endif
395 
396 	/* >>>> ACCESS LOCKING STARTS >>>> */
397 	wait_for_sem(c->accesslock, NULL);
398 	qhead = c->head_response;
399 	qtail = c->tail_response;
400 	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
401 		slot = qtail % c->responses_alloc;
402 		removed = c->responses[slot];
403 		c->responses[slot] = NULL;
404 	}
405 	c->tail_response = qtail;
406 	tickle_sem(c->accesslock);
407 	/* <<<< ACCESS LOCKING ENDS <<<< */
408 
409 	if (NULL != removed) {
410 		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
411 			     BLOCKING_RESP_MAGIC == removed->magic_sig);
412 	}
413 	if (CHILD_GONE_RESP == removed) {
414 		cleanup_after_child(c);
415 		removed = NULL;
416 	}
417 
418 	return removed;
419 }
420 
421 /* --------------------------------------------------------------------
422  * Light up a new worker.
423  */
424 static void
425 start_blocking_thread(
426 	blocking_child *	c
427 	)
428 {
429 
430 	DEBUG_INSIST(!c->reusable);
431 
432 	prepare_child_sems(c);
433 	start_blocking_thread_internal(c);
434 }
435 
436 /* --------------------------------------------------------------------
437  * Create a worker thread. There are several differences between POSIX
438  * and Windows, of course -- most notably the Windows thread is no
439  * detached thread, and we keep the handle around until we want to get
440  * rid of the thread. The notification scheme also differs: Windows
441  * makes use of semaphores in both directions, POSIX uses a pipe for
442  * integration with 'select()' or alike.
443  */
444 static void
445 start_blocking_thread_internal(
446 	blocking_child *	c
447 	)
448 #ifdef SYS_WINNT
449 {
450 	BOOL	resumed;
451 
452 	c->thread_ref = NULL;
453 	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
454 	c->thr_table[0].thnd =
455 		(HANDLE)_beginthreadex(
456 			NULL,
457 			0,
458 			&blocking_thread,
459 			c,
460 			CREATE_SUSPENDED,
461 			NULL);
462 
463 	if (NULL == c->thr_table[0].thnd) {
464 		msyslog(LOG_ERR, "start blocking thread failed: %m");
465 		exit(-1);
466 	}
467 	/* remember the thread priority is only within the process class */
468 	if (!SetThreadPriority(c->thr_table[0].thnd,
469 			       THREAD_PRIORITY_BELOW_NORMAL))
470 		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
471 
472 	resumed = ResumeThread(c->thr_table[0].thnd);
473 	DEBUG_INSIST(resumed);
474 	c->thread_ref = &c->thr_table[0];
475 }
476 #else	/* pthreads start_blocking_thread_internal() follows */
477 {
478 # ifdef NEED_PTHREAD_INIT
479 	static int	pthread_init_called;
480 # endif
481 	pthread_attr_t	thr_attr;
482 	int		rc;
483 	int		saved_errno;
484 	int		pipe_ends[2];	/* read then write */
485 	int		is_pipe;
486 	int		flags;
487 	size_t		stacksize;
488 	sigset_t	saved_sig_mask;
489 
490 	c->thread_ref = NULL;
491 
492 # ifdef NEED_PTHREAD_INIT
493 	/*
494 	 * from lib/isc/unix/app.c:
495 	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
496 	 */
497 	if (!pthread_init_called) {
498 		pthread_init();
499 		pthread_init_called = TRUE;
500 	}
501 # endif
502 
503 	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
504 	if (0 != rc) {
505 		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
506 		exit(1);
507 	}
508 	c->resp_read_pipe = move_fd(pipe_ends[0]);
509 	c->resp_write_pipe = move_fd(pipe_ends[1]);
510 	c->ispipe = is_pipe;
511 	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
512 	if (-1 == flags) {
513 		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
514 		exit(1);
515 	}
516 	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
517 	if (-1 == rc) {
518 		msyslog(LOG_ERR,
519 			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
520 		exit(1);
521 	}
522 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
523 	pthread_attr_init(&thr_attr);
524 	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
525 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
526     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
527 	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
528 	if (-1 == rc) {
529 		msyslog(LOG_ERR,
530 			"start_blocking_thread: pthread_attr_getstacksize %m");
531 	} else if (stacksize < THREAD_MINSTACKSIZE) {
532 		rc = pthread_attr_setstacksize(&thr_attr,
533 					       THREAD_MINSTACKSIZE);
534 		if (-1 == rc)
535 			msyslog(LOG_ERR,
536 				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
537 				(u_long)stacksize,
538 				(u_long)THREAD_MINSTACKSIZE);
539 	}
540 #else
541 	UNUSED_ARG(stacksize);
542 #endif
543 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
544 	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
545 #endif
546 	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
547 	block_thread_signals(&saved_sig_mask);
548 	rc = pthread_create(&c->thr_table[0], &thr_attr,
549 			    &blocking_thread, c);
550 	saved_errno = errno;
551 	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
552 	pthread_attr_destroy(&thr_attr);
553 	if (0 != rc) {
554 		errno = saved_errno;
555 		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
556 		exit(1);
557 	}
558 	c->thread_ref = &c->thr_table[0];
559 }
560 #endif
561 
562 /* --------------------------------------------------------------------
563  * block_thread_signals()
564  *
565  * Temporarily block signals used by ntpd main thread, so that signal
566  * mask inherited by child threads leaves them blocked.  Returns prior
567  * active signal mask via pmask, to be restored by the main thread
568  * after pthread_create().
569  */
570 #ifndef SYS_WINNT
571 void
572 block_thread_signals(
573 	sigset_t *	pmask
574 	)
575 {
576 	sigset_t	block;
577 
578 	sigemptyset(&block);
579 # ifdef HAVE_SIGNALED_IO
580 #  ifdef SIGIO
581 	sigaddset(&block, SIGIO);
582 #  endif
583 #  ifdef SIGPOLL
584 	sigaddset(&block, SIGPOLL);
585 #  endif
586 # endif	/* HAVE_SIGNALED_IO */
587 	sigaddset(&block, SIGALRM);
588 	sigaddset(&block, MOREDEBUGSIG);
589 	sigaddset(&block, LESSDEBUGSIG);
590 # ifdef SIGDIE1
591 	sigaddset(&block, SIGDIE1);
592 # endif
593 # ifdef SIGDIE2
594 	sigaddset(&block, SIGDIE2);
595 # endif
596 # ifdef SIGDIE3
597 	sigaddset(&block, SIGDIE3);
598 # endif
599 # ifdef SIGDIE4
600 	sigaddset(&block, SIGDIE4);
601 # endif
602 # ifdef SIGBUS
603 	sigaddset(&block, SIGBUS);
604 # endif
605 	sigemptyset(pmask);
606 	pthread_sigmask(SIG_BLOCK, &block, pmask);
607 }
608 #endif	/* !SYS_WINNT */
609 
610 
611 /* --------------------------------------------------------------------
612  * Create & destroy semaphores. This is sufficiently different between
613  * POSIX and Windows to warrant wrapper functions and close enough to
614  * use the concept of synchronization via semaphore for all platforms.
615  */
616 static sem_ref
617 create_sema(
618 	sema_type*	semptr,
619 	u_int		inival,
620 	u_int		maxval)
621 {
622 #ifdef SYS_WINNT
623 
624 	long svini, svmax;
625 	if (NULL != semptr) {
626 		svini = (inival < LONG_MAX)
627 		    ? (long)inival : LONG_MAX;
628 		svmax = (maxval < LONG_MAX && maxval > 0)
629 		    ? (long)maxval : LONG_MAX;
630 		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
631 		if (NULL == semptr->shnd)
632 			semptr = NULL;
633 	}
634 
635 #else
636 
637 	(void)maxval;
638 	if (semptr && sem_init(semptr, FALSE, inival))
639 		semptr = NULL;
640 
641 #endif
642 
643 	return semptr;
644 }
645 
646 /* ------------------------------------------------------------------ */
647 static sem_ref
648 delete_sema(
649 	sem_ref obj)
650 {
651 
652 #   ifdef SYS_WINNT
653 
654 	if (obj) {
655 		if (obj->shnd)
656 			CloseHandle(obj->shnd);
657 		obj->shnd = NULL;
658 	}
659 
660 #   else
661 
662 	if (obj)
663 		sem_destroy(obj);
664 
665 #   endif
666 
667 	return NULL;
668 }
669 
670 /* --------------------------------------------------------------------
671  * prepare_child_sems()
672  *
673  * create sync & access semaphores
674  *
675  * All semaphores are cleared, only the access semaphore has 1 unit.
676  * Childs wait on 'workitems_pending', then grabs 'sema_access'
677  * and dequeues jobs. When done, 'sema_access' is given one unit back.
678  *
679  * The producer grabs 'sema_access', manages the queue, restores
680  * 'sema_access' and puts one unit into 'workitems_pending'.
681  *
682  * The story goes the same for the response queue.
683  */
684 static void
685 prepare_child_sems(
686 	blocking_child *c
687 	)
688 {
689 	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
690 	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
691 	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
692 #   ifndef WORK_PIPE
693 	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
694 #   endif
695 }
696 
697 /* --------------------------------------------------------------------
698  * wait for semaphore. Where the wait can be interrupted, it will
699  * internally resume -- When this function returns, there is either no
700  * semaphore at all, a timeout occurred, or the caller could
701  * successfully take a token from the semaphore.
702  *
703  * For untimed wait, not checking the result of this function at all is
704  * definitely an option.
705  */
706 static int
707 wait_for_sem(
708 	sem_ref			sem,
709 	struct timespec *	timeout		/* wall-clock */
710 	)
711 #ifdef SYS_WINNT
712 {
713 	struct timespec now;
714 	struct timespec delta;
715 	DWORD		msec;
716 	DWORD		rc;
717 
718 	if (!(sem && sem->shnd)) {
719 		errno = EINVAL;
720 		return -1;
721 	}
722 
723 	if (NULL == timeout) {
724 		msec = INFINITE;
725 	} else {
726 		getclock(TIMEOFDAY, &now);
727 		delta = sub_tspec(*timeout, now);
728 		if (delta.tv_sec < 0) {
729 			msec = 0;
730 		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
731 			msec = INFINITE;
732 		} else {
733 			msec = 1000 * (DWORD)delta.tv_sec;
734 			msec += delta.tv_nsec / (1000 * 1000);
735 		}
736 	}
737 	rc = WaitForSingleObject(sem->shnd, msec);
738 	if (WAIT_OBJECT_0 == rc)
739 		return 0;
740 	if (WAIT_TIMEOUT == rc) {
741 		errno = ETIMEDOUT;
742 		return -1;
743 	}
744 	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
745 	errno = EFAULT;
746 	return -1;
747 }
748 #else	/* pthreads wait_for_sem() follows */
749 {
750 	int rc = -1;
751 
752 	if (sem) do {
753 			if (NULL == timeout)
754 				rc = sem_wait(sem);
755 			else
756 				rc = sem_timedwait(sem, timeout);
757 		} while (rc == -1 && errno == EINTR);
758 	else
759 		errno = EINVAL;
760 
761 	return rc;
762 }
763 #endif
764 
765 /* --------------------------------------------------------------------
766  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
767  * calling conventions under Windows and POSIX-defined signature
768  * otherwise.
769  */
770 #ifdef SYS_WINNT
771 u_int WINAPI
772 #else
773 void *
774 #endif
775 blocking_thread(
776 	void *	ThreadArg
777 	)
778 {
779 	blocking_child *c;
780 
781 	c = ThreadArg;
782 	exit_worker(blocking_child_common(c));
783 
784 	/* NOTREACHED */
785 	return 0;
786 }
787 
788 /* --------------------------------------------------------------------
789  * req_child_exit() runs in the parent.
790  *
791  * This function is called from from the idle timer, too, and possibly
792  * without a thread being there any longer. Since we have folded up our
793  * tent in that case and all the semaphores are already gone, we simply
794  * ignore this request in this case.
795  *
796  * Since the existence of the semaphores is controlled exclusively by
797  * the parent, there's no risk of data race here.
798  */
799 int
800 req_child_exit(
801 	blocking_child *c
802 	)
803 {
804 	return (c->accesslock)
805 	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
806 	    : 0;
807 }
808 
809 /* --------------------------------------------------------------------
810  * cleanup_after_child() runs in parent.
811  */
812 static void
813 cleanup_after_child(
814 	blocking_child *	c
815 	)
816 {
817 	DEBUG_INSIST(!c->reusable);
818 
819 #   ifdef SYS_WINNT
820 	/* The thread was not created in detached state, so we better
821 	 * clean up.
822 	 */
823 	if (c->thread_ref && c->thread_ref->thnd) {
824 		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
825 		INSIST(CloseHandle(c->thread_ref->thnd));
826 		c->thread_ref->thnd = NULL;
827 	}
828 #   endif
829 	c->thread_ref = NULL;
830 
831 	/* remove semaphores and (if signalling vi IO) pipes */
832 
833 	c->accesslock           = delete_sema(c->accesslock);
834 	c->workitems_pending    = delete_sema(c->workitems_pending);
835 	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
836 
837 #   ifdef WORK_PIPE
838 	DEBUG_INSIST(-1 != c->resp_read_pipe);
839 	DEBUG_INSIST(-1 != c->resp_write_pipe);
840 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
841 	close(c->resp_write_pipe);
842 	close(c->resp_read_pipe);
843 	c->resp_write_pipe = -1;
844 	c->resp_read_pipe = -1;
845 #   else
846 	DEBUG_INSIST(NULL != c->responses_pending);
847 	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
848 	c->responses_pending = delete_sema(c->responses_pending);
849 #   endif
850 
851 	/* Is it necessary to check if there are pending requests and
852 	 * responses? If so, and if there are, what to do with them?
853 	 */
854 
855 	/* re-init buffer index sequencers */
856 	c->head_workitem = 0;
857 	c->tail_workitem = 0;
858 	c->head_response = 0;
859 	c->tail_response = 0;
860 
861 	c->reusable = TRUE;
862 }
863 
864 
865 #else	/* !WORK_THREAD follows */
866 char work_thread_nonempty_compilation_unit;
867 #endif
868