xref: /netbsd-src/external/bsd/ntp/dist/libntp/work_thread.c (revision c9496f6b604074a9451a67df576a5b423068e71e)
1 /*	$NetBSD: work_thread.c,v 1.5 2016/05/01 23:32:00 christos Exp $	*/
2 
3 /*
4  * work_thread.c - threads implementation for blocking worker child.
5  */
6 #include <config.h>
7 #include "ntp_workimpl.h"
8 
9 #ifdef WORK_THREAD
10 
11 #include <stdio.h>
12 #include <ctype.h>
13 #include <signal.h>
14 #ifndef SYS_WINNT
15 #include <pthread.h>
16 #endif
17 
18 #include "ntp_stdlib.h"
19 #include "ntp_malloc.h"
20 #include "ntp_syslog.h"
21 #include "ntpd.h"
22 #include "ntp_io.h"
23 #include "ntp_assert.h"
24 #include "ntp_unixtime.h"
25 #include "timespecops.h"
26 #include "ntp_worker.h"
27 
28 #define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
29 #define CHILD_GONE_RESP	CHILD_EXIT_REQ
30 /* Queue size increments:
31  * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
33  * worker can process requests and push results...  If this really pays
34  * off is debatable.
35  */
36 #define WORKITEMS_ALLOC_INC	16
37 #define RESPONSES_ALLOC_INC	4
38 
39 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
40  * set the maximum to 256kB. If the minimum goes below the
41  * system-defined minimum stack size, we have to adjust accordingly.
42  */
43 #ifndef THREAD_MINSTACKSIZE
44 # define THREAD_MINSTACKSIZE	(64U * 1024)
45 #endif
46 #ifndef __sun
47 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
48 # undef THREAD_MINSTACKSIZE
49 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
50 #endif
51 #endif
52 
53 #ifndef THREAD_MAXSTACKSIZE
54 # define THREAD_MAXSTACKSIZE	(256U * 1024)
55 #endif
56 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
57 # undef  THREAD_MAXSTACKSIZE
58 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
59 #endif
60 
61 
62 #ifdef SYS_WINNT
63 
64 # define thread_exit(c)	_endthreadex(c)
65 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
66 u_int	WINAPI	blocking_thread(void *);
67 static BOOL	same_os_sema(const sem_ref obj, void * osobj);
68 
69 #else
70 
71 # define thread_exit(c)	pthread_exit((void*)(size_t)(c))
72 # define tickle_sem	sem_post
73 void *		blocking_thread(void *);
74 static	void	block_thread_signals(sigset_t *);
75 
76 #endif
77 
78 #ifdef WORK_PIPE
79 addremove_io_fd_func		addremove_io_fd;
80 #else
81 addremove_io_semaphore_func	addremove_io_semaphore;
82 #endif
83 
84 static	void	start_blocking_thread(blocking_child *);
85 static	void	start_blocking_thread_internal(blocking_child *);
86 static	void	prepare_child_sems(blocking_child *);
87 static	int	wait_for_sem(sem_ref, struct timespec *);
88 static	int	ensure_workitems_empty_slot(blocking_child *);
89 static	int	ensure_workresp_empty_slot(blocking_child *);
90 static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
91 static	void	cleanup_after_child(blocking_child *);
92 
93 static sema_type worker_mmutex;
94 static sem_ref   worker_memlock;
95 
96 /* --------------------------------------------------------------------
97  * locking the global worker state table (and other global stuff)
98  */
99 void
100 worker_global_lock(
101 	int inOrOut)
102 {
103 	if (worker_memlock) {
104 		if (inOrOut)
105 			wait_for_sem(worker_memlock, NULL);
106 		else
107 			tickle_sem(worker_memlock);
108 	}
109 }
110 
111 /* --------------------------------------------------------------------
112  * implementation isolation wrapper
113  */
/*
 * exit_worker() - terminate the calling worker thread with the given
 * exit code.  Maps onto _endthreadex() on Windows and pthread_exit()
 * elsewhere; never returns to the caller.
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
121 
122 /* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
124  */
/*
 * worker_sleep() - put the worker to sleep for 'seconds', or until
 * interrupt_worker_sleep() posts the per-child wakeup semaphore.
 *
 * Returns:
 *    0  the full interval elapsed (semaphore wait timed out)
 *   -1  the sleep was interrupted via the semaphore, or an error
 *       occurred (logged via msyslog)
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	/* wait_for_sem() takes an absolute wall-clock deadline */
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)		/* got a token -> woken early */
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)	/* slept the whole time */
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
154 
155 
156 /* --------------------------------------------------------------------
157  * Wake up a worker that takes a nap.
158  */
159 void
160 interrupt_worker_sleep(void)
161 {
162 	u_int			idx;
163 	blocking_child *	c;
164 
165 	for (idx = 0; idx < blocking_children_alloc; idx++) {
166 		c = blocking_children[idx];
167 		if (NULL == c || NULL == c->wake_scheduled_sleep)
168 			continue;
169 		tickle_sem(c->wake_scheduled_sleep);
170 	}
171 }
172 
173 /* --------------------------------------------------------------------
174  * Make sure there is an empty slot at the head of the request
175  * queue. Tell if the queue is currently empty.
176  */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	/* head/tail are free-running counters; the in-use count is
	 * their difference and the slot index is counter % alloc.
	 */
	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		/* NULL-fill only the newly added slots */
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		/* Restart the counters: the live entries (the buffer was
		 * full, so all of slots [0, old_alloc) are occupied) are
		 * now consumed from index 0 -- possibly rotated relative
		 * to insertion order, as announced above.
		 */
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
213 
214 /* --------------------------------------------------------------------
215  * Make sure there is an empty slot at the head of the response
216  * queue. Tell if the queue is currently empty.
217  */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above: grow the response
	** ring when full, restarting the free-running head/tail
	** counters.  Returns whether the queue was empty on entry.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		/* NULL-fill only the newly added slots */
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		/* restart counters; consumption order may be rotated */
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}
249 
250 
251 /* --------------------------------------------------------------------
252  * queue_req_pointer() - append a work item or idle exit request to
253  *			 blocking_workitems[]. Employ proper locking.
254  */
255 static int
256 queue_req_pointer(
257 	blocking_child	*	c,
258 	blocking_pipe_header *	hdr
259 	)
260 {
261 	size_t qhead;
262 
263 	/* >>>> ACCESS LOCKING STARTS >>>> */
264 	wait_for_sem(c->accesslock, NULL);
265 	ensure_workitems_empty_slot(c);
266 	qhead = c->head_workitem;
267 	c->workitems[qhead % c->workitems_alloc] = hdr;
268 	c->head_workitem = 1 + qhead;
269 	tickle_sem(c->accesslock);
270 	/* <<<< ACCESS LOCKING ENDS <<<< */
271 
272 	/* queue consumer wake-up notification */
273 	tickle_sem(c->workitems_pending);
274 
275 	return 0;
276 }
277 
278 /* --------------------------------------------------------------------
279  * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data entered into the queue and the worker
281  * is signalled.
282  */
283 int
284 send_blocking_req_internal(
285 	blocking_child *	c,
286 	blocking_pipe_header *	hdr,
287 	void *			data
288 	)
289 {
290 	blocking_pipe_header *	threadcopy;
291 	size_t			payload_octets;
292 
293 	REQUIRE(hdr != NULL);
294 	REQUIRE(data != NULL);
295 	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
296 
297 	if (hdr->octets <= sizeof(*hdr))
298 		return 1;	/* failure */
299 	payload_octets = hdr->octets - sizeof(*hdr);
300 
301 	if (NULL == c->thread_ref)
302 		start_blocking_thread(c);
303 	threadcopy = emalloc(hdr->octets);
304 	memcpy(threadcopy, hdr, sizeof(*hdr));
305 	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
306 
307 	return queue_req_pointer(c, threadcopy);
308 }
309 
310 /* --------------------------------------------------------------------
311  * Wait for the 'incoming queue no longer empty' signal, lock the shared
312  * structure and dequeue an item.
313  */
/*
 * receive_blocking_req_internal() - worker-side dequeue.  Blocks on
 * 'workitems_pending' until the producer queues something, then takes
 * the oldest entry under the access lock.
 *
 * Returns the dequeued request, or NULL when the parent asked this
 * child to idle out (CHILD_EXIT_REQ); in that case a CHILD_GONE_RESP
 * marker is pushed onto the response queue first.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			/* defensively skip empty slots until a real
			 * entry is found or the queue runs dry */
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);	/* re-wait if the queue raced empty */

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
352 
353 /* --------------------------------------------------------------------
354  * Push a response into the return queue and eventually tickle the
355  * receiver.
356  */
357 int
358 send_blocking_resp_internal(
359 	blocking_child *	c,
360 	blocking_pipe_header *	resp
361 	)
362 {
363 	size_t	qhead;
364 	int	empty;
365 
366 	/* >>>> ACCESS LOCKING STARTS >>>> */
367 	wait_for_sem(c->accesslock, NULL);
368 	empty = ensure_workresp_empty_slot(c);
369 	qhead = c->head_response;
370 	c->responses[qhead % c->responses_alloc] = resp;
371 	c->head_response = 1 + qhead;
372 	tickle_sem(c->accesslock);
373 	/* <<<< ACCESS LOCKING ENDS <<<< */
374 
375 	/* queue consumer wake-up notification */
376 	if (empty)
377 	{
378 #	    ifdef WORK_PIPE
379 		write(c->resp_write_pipe, "", 1);
380 #	    else
381 		tickle_sem(c->responses_pending);
382 #	    endif
383 	}
384 	return 0;
385 }
386 
387 
388 #ifndef WORK_PIPE
389 
390 /* --------------------------------------------------------------------
 * Check if a (Windows-)handle to a semaphore is actually the same we
392  * are using inside the sema wrapper.
393  */
394 static BOOL
395 same_os_sema(
396 	const sem_ref	obj,
397 	void*		osh
398 	)
399 {
400 	return obj && osh && (obj->shnd == (HANDLE)osh);
401 }
402 
403 /* --------------------------------------------------------------------
404  * Find the shared context that associates to an OS handle and make sure
405  * the data is dequeued and processed.
406  */
407 void
408 handle_blocking_resp_sem(
409 	void *	context
410 	)
411 {
412 	blocking_child *	c;
413 	u_int			idx;
414 
415 	c = NULL;
416 	for (idx = 0; idx < blocking_children_alloc; idx++) {
417 		c = blocking_children[idx];
418 		if (c != NULL &&
419 			c->thread_ref != NULL &&
420 			same_os_sema(c->responses_pending, context))
421 			break;
422 	}
423 	if (idx < blocking_children_alloc)
424 		process_blocking_resp(c);
425 }
426 #endif	/* !WORK_PIPE */
427 
428 /* --------------------------------------------------------------------
429  * Fetch the next response from the return queue. In case of signalling
430  * via pipe, make sure the pipe is flushed, too.
431  */
/*
 * receive_blocking_resp_internal() - parent-side dequeue of one
 * response.  With pipe signalling the wakeup byte(s) are drained
 * first; then the oldest non-NULL ring entry is removed under the
 * access lock.
 *
 * Returns the response, or NULL if the queue was empty or the entry
 * was the CHILD_GONE_RESP marker (which triggers child cleanup).
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	/* Drain the wakeup pipe; it is set O_NONBLOCK at thread start,
	 * so only EINTR warrants a retry -- an empty pipe just returns.
	 */
	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	/* advance tail over empty slots until one entry is taken */
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {	/* child folded up */
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
473 
474 /* --------------------------------------------------------------------
475  * Light up a new worker.
476  */
/*
 * start_blocking_thread() - bring up a worker: create the per-child
 * semaphores first so the thread finds them ready, then spawn the
 * platform-specific thread.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
488 
489 /* --------------------------------------------------------------------
490  * Create a worker thread. There are several differences between POSIX
491  * and Windows, of course -- most notably the Windows thread is no
492  * detached thread, and we keep the handle around until we want to get
493  * rid of the thread. The notification scheme also differs: Windows
494  * makes use of semaphores in both directions, POSIX uses a pipe for
495  * integration with 'select()' or alike.
496  */
497 static void
498 start_blocking_thread_internal(
499 	blocking_child *	c
500 	)
501 #ifdef SYS_WINNT
502 {
503 	BOOL	resumed;
504 
505 	c->thread_ref = NULL;
506 	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
507 	c->thr_table[0].thnd =
508 		(HANDLE)_beginthreadex(
509 			NULL,
510 			0,
511 			&blocking_thread,
512 			c,
513 			CREATE_SUSPENDED,
514 			NULL);
515 
516 	if (NULL == c->thr_table[0].thnd) {
517 		msyslog(LOG_ERR, "start blocking thread failed: %m");
518 		exit(-1);
519 	}
520 	/* remember the thread priority is only within the process class */
521 	if (!SetThreadPriority(c->thr_table[0].thnd,
522 			       THREAD_PRIORITY_BELOW_NORMAL))
523 		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
524 
525 	resumed = ResumeThread(c->thr_table[0].thnd);
526 	DEBUG_INSIST(resumed);
527 	c->thread_ref = &c->thr_table[0];
528 }
529 #else	/* pthreads start_blocking_thread_internal() follows */
530 {
531 # ifdef NEED_PTHREAD_INIT
532 	static int	pthread_init_called;
533 # endif
534 	pthread_attr_t	thr_attr;
535 	int		rc;
536 	int		pipe_ends[2];	/* read then write */
537 	int		is_pipe;
538 	int		flags;
539 	size_t		ostacksize;
540 	size_t		nstacksize;
541 	sigset_t	saved_sig_mask;
542 
543 	c->thread_ref = NULL;
544 
545 # ifdef NEED_PTHREAD_INIT
546 	/*
547 	 * from lib/isc/unix/app.c:
548 	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
549 	 */
550 	if (!pthread_init_called) {
551 		pthread_init();
552 		pthread_init_called = TRUE;
553 	}
554 # endif
555 
556 	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
557 	if (0 != rc) {
558 		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
559 		exit(1);
560 	}
561 	c->resp_read_pipe = move_fd(pipe_ends[0]);
562 	c->resp_write_pipe = move_fd(pipe_ends[1]);
563 	c->ispipe = is_pipe;
564 	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
565 	if (-1 == flags) {
566 		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
567 		exit(1);
568 	}
569 	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
570 	if (-1 == rc) {
571 		msyslog(LOG_ERR,
572 			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
573 		exit(1);
574 	}
575 	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
576 	pthread_attr_init(&thr_attr);
577 	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
578 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
579     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
580 	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
581 	if (0 != rc) {
582 		msyslog(LOG_ERR,
583 			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
584 			strerror(rc));
585 	} else {
586 		if (ostacksize < THREAD_MINSTACKSIZE)
587 			nstacksize = THREAD_MINSTACKSIZE;
588 		else if (ostacksize > THREAD_MAXSTACKSIZE)
589 			nstacksize = THREAD_MAXSTACKSIZE;
590 		else
591 			nstacksize = ostacksize;
592 		if (nstacksize != ostacksize)
593 			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
594 		if (0 != rc)
595 			msyslog(LOG_ERR,
596 				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
597 				(u_long)ostacksize, (u_long)nstacksize,
598 				strerror(rc));
599 	}
600 #else
601 	UNUSED_ARG(nstacksize);
602 	UNUSED_ARG(ostacksize);
603 #endif
604 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
605 	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
606 #endif
607 	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
608 	block_thread_signals(&saved_sig_mask);
609 	rc = pthread_create(&c->thr_table[0], &thr_attr,
610 			    &blocking_thread, c);
611 	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
612 	pthread_attr_destroy(&thr_attr);
613 	if (0 != rc) {
614 		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
615 			strerror(rc));
616 		exit(1);
617 	}
618 	c->thread_ref = &c->thr_table[0];
619 }
620 #endif
621 
622 /* --------------------------------------------------------------------
623  * block_thread_signals()
624  *
625  * Temporarily block signals used by ntpd main thread, so that signal
626  * mask inherited by child threads leaves them blocked.  Returns prior
627  * active signal mask via pmask, to be restored by the main thread
628  * after pthread_create().
629  */
630 #ifndef SYS_WINNT
631 void
632 block_thread_signals(
633 	sigset_t *	pmask
634 	)
635 {
636 	sigset_t	block;
637 
638 	sigemptyset(&block);
639 # ifdef HAVE_SIGNALED_IO
640 #  ifdef SIGIO
641 	sigaddset(&block, SIGIO);
642 #  endif
643 #  ifdef SIGPOLL
644 	sigaddset(&block, SIGPOLL);
645 #  endif
646 # endif	/* HAVE_SIGNALED_IO */
647 	sigaddset(&block, SIGALRM);
648 	sigaddset(&block, MOREDEBUGSIG);
649 	sigaddset(&block, LESSDEBUGSIG);
650 # ifdef SIGDIE1
651 	sigaddset(&block, SIGDIE1);
652 # endif
653 # ifdef SIGDIE2
654 	sigaddset(&block, SIGDIE2);
655 # endif
656 # ifdef SIGDIE3
657 	sigaddset(&block, SIGDIE3);
658 # endif
659 # ifdef SIGDIE4
660 	sigaddset(&block, SIGDIE4);
661 # endif
662 # ifdef SIGBUS
663 	sigaddset(&block, SIGBUS);
664 # endif
665 	sigemptyset(pmask);
666 	pthread_sigmask(SIG_BLOCK, &block, pmask);
667 }
668 #endif	/* !SYS_WINNT */
669 
670 
671 /* --------------------------------------------------------------------
672  * Create & destroy semaphores. This is sufficiently different between
673  * POSIX and Windows to warrant wrapper functions and close enough to
674  * use the concept of synchronization via semaphore for all platforms.
675  */
676 static sem_ref
677 create_sema(
678 	sema_type*	semptr,
679 	u_int		inival,
680 	u_int		maxval)
681 {
682 #ifdef SYS_WINNT
683 
684 	long svini, svmax;
685 	if (NULL != semptr) {
686 		svini = (inival < LONG_MAX)
687 		    ? (long)inival : LONG_MAX;
688 		svmax = (maxval < LONG_MAX && maxval > 0)
689 		    ? (long)maxval : LONG_MAX;
690 		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
691 		if (NULL == semptr->shnd)
692 			semptr = NULL;
693 	}
694 
695 #else
696 
697 	(void)maxval;
698 	if (semptr && sem_init(semptr, FALSE, inival))
699 		semptr = NULL;
700 
701 #endif
702 
703 	return semptr;
704 }
705 
706 /* ------------------------------------------------------------------ */
707 static sem_ref
708 delete_sema(
709 	sem_ref obj)
710 {
711 
712 #   ifdef SYS_WINNT
713 
714 	if (obj) {
715 		if (obj->shnd)
716 			CloseHandle(obj->shnd);
717 		obj->shnd = NULL;
718 	}
719 
720 #   else
721 
722 	if (obj)
723 		sem_destroy(obj);
724 
725 #   endif
726 
727 	return NULL;
728 }
729 
730 /* --------------------------------------------------------------------
731  * prepare_child_sems()
732  *
733  * create sync & access semaphores
734  *
735  * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
737  * and dequeues jobs. When done, 'sema_access' is given one unit back.
738  *
739  * The producer grabs 'sema_access', manages the queue, restores
740  * 'sema_access' and puts one unit into 'workitems_pending'.
741  *
742  * The story goes the same for the response queue.
743  */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	/* lazily create the single global memory lock */
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	/* accesslock starts with one unit -> binary mutex semantics;
	 * the pending/wakeup semaphores start empty */
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	/* Windows signals responses via a semaphore, not a pipe */
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
759 
760 /* --------------------------------------------------------------------
761  * wait for semaphore. Where the wait can be interrupted, it will
762  * internally resume -- When this function returns, there is either no
763  * semaphore at all, a timeout occurred, or the caller could
764  * successfully take a token from the semaphore.
765  *
766  * For untimed wait, not checking the result of this function at all is
767  * definitely an option.
768  */
/*
 * wait_for_sem() - take one token from 'sem', waiting at most until
 * the absolute wall-clock time '*timeout' (forever when NULL).
 *
 * Returns 0 on success; -1 with errno set to EINVAL (no semaphore),
 * ETIMEDOUT (deadline reached) or another error code otherwise.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		/* convert the absolute deadline into the relative
		 * millisecond count WaitForSingleObject() expects */
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;	/* deadline already passed */
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;	/* would overflow DWORD */
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	/* resume waiting when a signal interrupts the wait */
	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
827 
828 /* --------------------------------------------------------------------
829  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
830  * calling conventions under Windows and POSIX-defined signature
831  * otherwise.
832  */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	/* run the shared worker loop, then terminate this thread with
	 * its result code via exit_worker() */
	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}
850 
851 /* --------------------------------------------------------------------
852  * req_child_exit() runs in the parent.
853  *
854  * This function is called from from the idle timer, too, and possibly
855  * without a thread being there any longer. Since we have folded up our
856  * tent in that case and all the semaphores are already gone, we simply
857  * ignore this request in this case.
858  *
859  * Since the existence of the semaphores is controlled exclusively by
860  * the parent, there's no risk of data race here.
861  */
862 int
863 req_child_exit(
864 	blocking_child *c
865 	)
866 {
867 	return (c->accesslock)
868 	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
869 	    : 0;
870 }
871 
872 /* --------------------------------------------------------------------
873  * cleanup_after_child() runs in parent.
874  */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via IO) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	/* detach the read side from the I/O loop before closing */
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	/* mark the slot ready for reuse by a fresh thread */
	c->reusable = TRUE;
}
926 
927 
928 #else	/* !WORK_THREAD follows */
929 char work_thread_nonempty_compilation_unit;
930 #endif
931