1 /* $NetBSD: work_thread.c,v 1.5 2016/05/01 23:32:00 christos Exp $ */ 2 3 /* 4 * work_thread.c - threads implementation for blocking worker child. 5 */ 6 #include <config.h> 7 #include "ntp_workimpl.h" 8 9 #ifdef WORK_THREAD 10 11 #include <stdio.h> 12 #include <ctype.h> 13 #include <signal.h> 14 #ifndef SYS_WINNT 15 #include <pthread.h> 16 #endif 17 18 #include "ntp_stdlib.h" 19 #include "ntp_malloc.h" 20 #include "ntp_syslog.h" 21 #include "ntpd.h" 22 #include "ntp_io.h" 23 #include "ntp_assert.h" 24 #include "ntp_unixtime.h" 25 #include "timespecops.h" 26 #include "ntp_worker.h" 27 28 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 29 #define CHILD_GONE_RESP CHILD_EXIT_REQ 30 /* Queue size increments: 31 * The request queue grows a bit faster than the response queue -- the 32 * deamon can push requests and pull results faster on avarage than the 33 * worker can process requests and push results... If this really pays 34 * off is debatable. 35 */ 36 #define WORKITEMS_ALLOC_INC 16 37 #define RESPONSES_ALLOC_INC 4 38 39 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 40 * set the maximum to 256kB. If the minimum goes below the 41 * system-defined minimum stack size, we have to adjust accordingly. 42 */ 43 #ifndef THREAD_MINSTACKSIZE 44 # define THREAD_MINSTACKSIZE (64U * 1024) 45 #endif 46 #ifndef __sun 47 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 48 # undef THREAD_MINSTACKSIZE 49 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 50 #endif 51 #endif 52 53 #ifndef THREAD_MAXSTACKSIZE 54 # define THREAD_MAXSTACKSIZE (256U * 1024) 55 #endif 56 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 57 # undef THREAD_MAXSTACKSIZE 58 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 59 #endif 60 61 62 #ifdef SYS_WINNT 63 64 # define thread_exit(c) _endthreadex(c) 65 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 66 u_int WINAPI blocking_thread(void *); 67 static BOOL same_os_sema(const sem_ref obj, void * osobj); 68 69 #else 70 71 # define thread_exit(c) pthread_exit((void*)(size_t)(c)) 72 # define tickle_sem sem_post 73 void * blocking_thread(void *); 74 static void block_thread_signals(sigset_t *); 75 76 #endif 77 78 #ifdef WORK_PIPE 79 addremove_io_fd_func addremove_io_fd; 80 #else 81 addremove_io_semaphore_func addremove_io_semaphore; 82 #endif 83 84 static void start_blocking_thread(blocking_child *); 85 static void start_blocking_thread_internal(blocking_child *); 86 static void prepare_child_sems(blocking_child *); 87 static int wait_for_sem(sem_ref, struct timespec *); 88 static int ensure_workitems_empty_slot(blocking_child *); 89 static int ensure_workresp_empty_slot(blocking_child *); 90 static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 91 static void cleanup_after_child(blocking_child *); 92 93 static sema_type worker_mmutex; 94 static sem_ref worker_memlock; 95 96 /* -------------------------------------------------------------------- 97 * locking the global worker state table (and other global stuff) 98 */ 99 void 100 worker_global_lock( 101 int inOrOut) 102 { 103 if (worker_memlock) { 104 if (inOrOut) 105 wait_for_sem(worker_memlock, NULL); 106 else 107 tickle_sem(worker_memlock); 108 } 109 } 110 111 /* -------------------------------------------------------------------- 112 * implementation isolation wrapper 113 */ 114 void 115 exit_worker( 116 int exitcode 117 ) 118 { 119 thread_exit(exitcode); /* see #define thread_exit */ 120 } 121 122 /* -------------------------------------------------------------------- 123 * sleep for a given time or until the wakup semaphore is tickled. 124 */ 125 int 126 worker_sleep( 127 blocking_child * c, 128 time_t seconds 129 ) 130 { 131 struct timespec until; 132 int rc; 133 134 # ifdef HAVE_CLOCK_GETTIME 135 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 136 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 137 return -1; 138 } 139 # else 140 if (0 != getclock(TIMEOFDAY, &until)) { 141 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 142 return -1; 143 } 144 # endif 145 until.tv_sec += seconds; 146 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 147 if (0 == rc) 148 return -1; 149 if (-1 == rc && ETIMEDOUT == errno) 150 return 0; 151 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 152 return -1; 153 } 154 155 156 /* -------------------------------------------------------------------- 157 * Wake up a worker that takes a nap. 158 */ 159 void 160 interrupt_worker_sleep(void) 161 { 162 u_int idx; 163 blocking_child * c; 164 165 for (idx = 0; idx < blocking_children_alloc; idx++) { 166 c = blocking_children[idx]; 167 if (NULL == c || NULL == c->wake_scheduled_sleep) 168 continue; 169 tickle_sem(c->wake_scheduled_sleep); 170 } 171 } 172 173 /* -------------------------------------------------------------------- 174 * Make sure there is an empty slot at the head of the request 175 * queue. Tell if the queue is currently empty. 176 */ 177 static int 178 ensure_workitems_empty_slot( 179 blocking_child *c 180 ) 181 { 182 /* 183 ** !!! PRECONDITION: caller holds access lock! 184 ** 185 ** This simply tries to increase the size of the buffer if it 186 ** becomes full. The resize operation does *not* maintain the 187 ** order of requests, but that should be irrelevant since the 188 ** processing is considered asynchronous anyway. 189 ** 190 ** Return if the buffer is currently empty. 191 */ 192 193 static const size_t each = 194 sizeof(blocking_children[0]->workitems[0]); 195 196 size_t new_alloc; 197 size_t slots_used; 198 size_t sidx; 199 200 slots_used = c->head_workitem - c->tail_workitem; 201 if (slots_used >= c->workitems_alloc) { 202 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 203 c->workitems = erealloc(c->workitems, new_alloc * each); 204 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 205 c->workitems[sidx] = NULL; 206 c->tail_workitem = 0; 207 c->head_workitem = c->workitems_alloc; 208 c->workitems_alloc = new_alloc; 209 } 210 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 211 return (0 == slots_used); 212 } 213 214 /* -------------------------------------------------------------------- 215 * Make sure there is an empty slot at the head of the response 216 * queue. Tell if the queue is currently empty. 217 */ 218 static int 219 ensure_workresp_empty_slot( 220 blocking_child *c 221 ) 222 { 223 /* 224 ** !!! PRECONDITION: caller holds access lock! 225 ** 226 ** Works like the companion function above. 227 */ 228 229 static const size_t each = 230 sizeof(blocking_children[0]->responses[0]); 231 232 size_t new_alloc; 233 size_t slots_used; 234 size_t sidx; 235 236 slots_used = c->head_response - c->tail_response; 237 if (slots_used >= c->responses_alloc) { 238 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 239 c->responses = erealloc(c->responses, new_alloc * each); 240 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 241 c->responses[sidx] = NULL; 242 c->tail_response = 0; 243 c->head_response = c->responses_alloc; 244 c->responses_alloc = new_alloc; 245 } 246 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 247 return (0 == slots_used); 248 } 249 250 251 /* -------------------------------------------------------------------- 252 * queue_req_pointer() - append a work item or idle exit request to 253 * blocking_workitems[]. Employ proper locking. 254 */ 255 static int 256 queue_req_pointer( 257 blocking_child * c, 258 blocking_pipe_header * hdr 259 ) 260 { 261 size_t qhead; 262 263 /* >>>> ACCESS LOCKING STARTS >>>> */ 264 wait_for_sem(c->accesslock, NULL); 265 ensure_workitems_empty_slot(c); 266 qhead = c->head_workitem; 267 c->workitems[qhead % c->workitems_alloc] = hdr; 268 c->head_workitem = 1 + qhead; 269 tickle_sem(c->accesslock); 270 /* <<<< ACCESS LOCKING ENDS <<<< */ 271 272 /* queue consumer wake-up notification */ 273 tickle_sem(c->workitems_pending); 274 275 return 0; 276 } 277 278 /* -------------------------------------------------------------------- 279 * API function to make sure a worker is running, a proper private copy 280 * of the data is made, the data eneterd into the queue and the worker 281 * is signalled. 282 */ 283 int 284 send_blocking_req_internal( 285 blocking_child * c, 286 blocking_pipe_header * hdr, 287 void * data 288 ) 289 { 290 blocking_pipe_header * threadcopy; 291 size_t payload_octets; 292 293 REQUIRE(hdr != NULL); 294 REQUIRE(data != NULL); 295 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 296 297 if (hdr->octets <= sizeof(*hdr)) 298 return 1; /* failure */ 299 payload_octets = hdr->octets - sizeof(*hdr); 300 301 if (NULL == c->thread_ref) 302 start_blocking_thread(c); 303 threadcopy = emalloc(hdr->octets); 304 memcpy(threadcopy, hdr, sizeof(*hdr)); 305 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 306 307 return queue_req_pointer(c, threadcopy); 308 } 309 310 /* -------------------------------------------------------------------- 311 * Wait for the 'incoming queue no longer empty' signal, lock the shared 312 * structure and dequeue an item. 313 */ 314 blocking_pipe_header * 315 receive_blocking_req_internal( 316 blocking_child * c 317 ) 318 { 319 blocking_pipe_header * req; 320 size_t qhead, qtail; 321 322 req = NULL; 323 do { 324 /* wait for tickle from the producer side */ 325 wait_for_sem(c->workitems_pending, NULL); 326 327 /* >>>> ACCESS LOCKING STARTS >>>> */ 328 wait_for_sem(c->accesslock, NULL); 329 qhead = c->head_workitem; 330 do { 331 qtail = c->tail_workitem; 332 if (qhead == qtail) 333 break; 334 c->tail_workitem = qtail + 1; 335 qtail %= c->workitems_alloc; 336 req = c->workitems[qtail]; 337 c->workitems[qtail] = NULL; 338 } while (NULL == req); 339 tickle_sem(c->accesslock); 340 /* <<<< ACCESS LOCKING ENDS <<<< */ 341 342 } while (NULL == req); 343 344 INSIST(NULL != req); 345 if (CHILD_EXIT_REQ == req) { /* idled out */ 346 send_blocking_resp_internal(c, CHILD_GONE_RESP); 347 req = NULL; 348 } 349 350 return req; 351 } 352 353 /* -------------------------------------------------------------------- 354 * Push a response into the return queue and eventually tickle the 355 * receiver. 356 */ 357 int 358 send_blocking_resp_internal( 359 blocking_child * c, 360 blocking_pipe_header * resp 361 ) 362 { 363 size_t qhead; 364 int empty; 365 366 /* >>>> ACCESS LOCKING STARTS >>>> */ 367 wait_for_sem(c->accesslock, NULL); 368 empty = ensure_workresp_empty_slot(c); 369 qhead = c->head_response; 370 c->responses[qhead % c->responses_alloc] = resp; 371 c->head_response = 1 + qhead; 372 tickle_sem(c->accesslock); 373 /* <<<< ACCESS LOCKING ENDS <<<< */ 374 375 /* queue consumer wake-up notification */ 376 if (empty) 377 { 378 # ifdef WORK_PIPE 379 write(c->resp_write_pipe, "", 1); 380 # else 381 tickle_sem(c->responses_pending); 382 # endif 383 } 384 return 0; 385 } 386 387 388 #ifndef WORK_PIPE 389 390 /* -------------------------------------------------------------------- 391 * Check if a (Windows-)hanndle to a semaphore is actually the same we 392 * are using inside the sema wrapper. 393 */ 394 static BOOL 395 same_os_sema( 396 const sem_ref obj, 397 void* osh 398 ) 399 { 400 return obj && osh && (obj->shnd == (HANDLE)osh); 401 } 402 403 /* -------------------------------------------------------------------- 404 * Find the shared context that associates to an OS handle and make sure 405 * the data is dequeued and processed. 406 */ 407 void 408 handle_blocking_resp_sem( 409 void * context 410 ) 411 { 412 blocking_child * c; 413 u_int idx; 414 415 c = NULL; 416 for (idx = 0; idx < blocking_children_alloc; idx++) { 417 c = blocking_children[idx]; 418 if (c != NULL && 419 c->thread_ref != NULL && 420 same_os_sema(c->responses_pending, context)) 421 break; 422 } 423 if (idx < blocking_children_alloc) 424 process_blocking_resp(c); 425 } 426 #endif /* !WORK_PIPE */ 427 428 /* -------------------------------------------------------------------- 429 * Fetch the next response from the return queue. In case of signalling 430 * via pipe, make sure the pipe is flushed, too. 431 */ 432 blocking_pipe_header * 433 receive_blocking_resp_internal( 434 blocking_child * c 435 ) 436 { 437 blocking_pipe_header * removed; 438 size_t qhead, qtail, slot; 439 440 #ifdef WORK_PIPE 441 int rc; 442 char scratch[32]; 443 444 do 445 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 446 while (-1 == rc && EINTR == errno); 447 #endif 448 449 /* >>>> ACCESS LOCKING STARTS >>>> */ 450 wait_for_sem(c->accesslock, NULL); 451 qhead = c->head_response; 452 qtail = c->tail_response; 453 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 454 slot = qtail % c->responses_alloc; 455 removed = c->responses[slot]; 456 c->responses[slot] = NULL; 457 } 458 c->tail_response = qtail; 459 tickle_sem(c->accesslock); 460 /* <<<< ACCESS LOCKING ENDS <<<< */ 461 462 if (NULL != removed) { 463 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 464 BLOCKING_RESP_MAGIC == removed->magic_sig); 465 } 466 if (CHILD_GONE_RESP == removed) { 467 cleanup_after_child(c); 468 removed = NULL; 469 } 470 471 return removed; 472 } 473 474 /* -------------------------------------------------------------------- 475 * Light up a new worker. 476 */ 477 static void 478 start_blocking_thread( 479 blocking_child * c 480 ) 481 { 482 483 DEBUG_INSIST(!c->reusable); 484 485 prepare_child_sems(c); 486 start_blocking_thread_internal(c); 487 } 488 489 /* -------------------------------------------------------------------- 490 * Create a worker thread. There are several differences between POSIX 491 * and Windows, of course -- most notably the Windows thread is no 492 * detached thread, and we keep the handle around until we want to get 493 * rid of the thread. The notification scheme also differs: Windows 494 * makes use of semaphores in both directions, POSIX uses a pipe for 495 * integration with 'select()' or alike. 496 */ 497 static void 498 start_blocking_thread_internal( 499 blocking_child * c 500 ) 501 #ifdef SYS_WINNT 502 { 503 BOOL resumed; 504 505 c->thread_ref = NULL; 506 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 507 c->thr_table[0].thnd = 508 (HANDLE)_beginthreadex( 509 NULL, 510 0, 511 &blocking_thread, 512 c, 513 CREATE_SUSPENDED, 514 NULL); 515 516 if (NULL == c->thr_table[0].thnd) { 517 msyslog(LOG_ERR, "start blocking thread failed: %m"); 518 exit(-1); 519 } 520 /* remember the thread priority is only within the process class */ 521 if (!SetThreadPriority(c->thr_table[0].thnd, 522 THREAD_PRIORITY_BELOW_NORMAL)) 523 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 524 525 resumed = ResumeThread(c->thr_table[0].thnd); 526 DEBUG_INSIST(resumed); 527 c->thread_ref = &c->thr_table[0]; 528 } 529 #else /* pthreads start_blocking_thread_internal() follows */ 530 { 531 # ifdef NEED_PTHREAD_INIT 532 static int pthread_init_called; 533 # endif 534 pthread_attr_t thr_attr; 535 int rc; 536 int pipe_ends[2]; /* read then write */ 537 int is_pipe; 538 int flags; 539 size_t ostacksize; 540 size_t nstacksize; 541 sigset_t saved_sig_mask; 542 543 c->thread_ref = NULL; 544 545 # ifdef NEED_PTHREAD_INIT 546 /* 547 * from lib/isc/unix/app.c: 548 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 549 */ 550 if (!pthread_init_called) { 551 pthread_init(); 552 pthread_init_called = TRUE; 553 } 554 # endif 555 556 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 557 if (0 != rc) { 558 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 559 exit(1); 560 } 561 c->resp_read_pipe = move_fd(pipe_ends[0]); 562 c->resp_write_pipe = move_fd(pipe_ends[1]); 563 c->ispipe = is_pipe; 564 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 565 if (-1 == flags) { 566 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 567 exit(1); 568 } 569 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 570 if (-1 == rc) { 571 msyslog(LOG_ERR, 572 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 573 exit(1); 574 } 575 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 576 pthread_attr_init(&thr_attr); 577 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 578 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 579 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 580 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 581 if (0 != rc) { 582 msyslog(LOG_ERR, 583 "start_blocking_thread: pthread_attr_getstacksize() -> %s", 584 strerror(rc)); 585 } else { 586 if (ostacksize < THREAD_MINSTACKSIZE) 587 nstacksize = THREAD_MINSTACKSIZE; 588 else if (ostacksize > THREAD_MAXSTACKSIZE) 589 nstacksize = THREAD_MAXSTACKSIZE; 590 else 591 nstacksize = ostacksize; 592 if (nstacksize != ostacksize) 593 rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 594 if (0 != rc) 595 msyslog(LOG_ERR, 596 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 597 (u_long)ostacksize, (u_long)nstacksize, 598 strerror(rc)); 599 } 600 #else 601 UNUSED_ARG(nstacksize); 602 UNUSED_ARG(ostacksize); 603 #endif 604 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 605 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 606 #endif 607 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 608 block_thread_signals(&saved_sig_mask); 609 rc = pthread_create(&c->thr_table[0], &thr_attr, 610 &blocking_thread, c); 611 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 612 pthread_attr_destroy(&thr_attr); 613 if (0 != rc) { 614 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 615 strerror(rc)); 616 exit(1); 617 } 618 c->thread_ref = &c->thr_table[0]; 619 } 620 #endif 621 622 /* -------------------------------------------------------------------- 623 * block_thread_signals() 624 * 625 * Temporarily block signals used by ntpd main thread, so that signal 626 * mask inherited by child threads leaves them blocked. Returns prior 627 * active signal mask via pmask, to be restored by the main thread 628 * after pthread_create(). 629 */ 630 #ifndef SYS_WINNT 631 void 632 block_thread_signals( 633 sigset_t * pmask 634 ) 635 { 636 sigset_t block; 637 638 sigemptyset(&block); 639 # ifdef HAVE_SIGNALED_IO 640 # ifdef SIGIO 641 sigaddset(&block, SIGIO); 642 # endif 643 # ifdef SIGPOLL 644 sigaddset(&block, SIGPOLL); 645 # endif 646 # endif /* HAVE_SIGNALED_IO */ 647 sigaddset(&block, SIGALRM); 648 sigaddset(&block, MOREDEBUGSIG); 649 sigaddset(&block, LESSDEBUGSIG); 650 # ifdef SIGDIE1 651 sigaddset(&block, SIGDIE1); 652 # endif 653 # ifdef SIGDIE2 654 sigaddset(&block, SIGDIE2); 655 # endif 656 # ifdef SIGDIE3 657 sigaddset(&block, SIGDIE3); 658 # endif 659 # ifdef SIGDIE4 660 sigaddset(&block, SIGDIE4); 661 # endif 662 # ifdef SIGBUS 663 sigaddset(&block, SIGBUS); 664 # endif 665 sigemptyset(pmask); 666 pthread_sigmask(SIG_BLOCK, &block, pmask); 667 } 668 #endif /* !SYS_WINNT */ 669 670 671 /* -------------------------------------------------------------------- 672 * Create & destroy semaphores. This is sufficiently different between 673 * POSIX and Windows to warrant wrapper functions and close enough to 674 * use the concept of synchronization via semaphore for all platforms. 675 */ 676 static sem_ref 677 create_sema( 678 sema_type* semptr, 679 u_int inival, 680 u_int maxval) 681 { 682 #ifdef SYS_WINNT 683 684 long svini, svmax; 685 if (NULL != semptr) { 686 svini = (inival < LONG_MAX) 687 ? (long)inival : LONG_MAX; 688 svmax = (maxval < LONG_MAX && maxval > 0) 689 ? (long)maxval : LONG_MAX; 690 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 691 if (NULL == semptr->shnd) 692 semptr = NULL; 693 } 694 695 #else 696 697 (void)maxval; 698 if (semptr && sem_init(semptr, FALSE, inival)) 699 semptr = NULL; 700 701 #endif 702 703 return semptr; 704 } 705 706 /* ------------------------------------------------------------------ */ 707 static sem_ref 708 delete_sema( 709 sem_ref obj) 710 { 711 712 # ifdef SYS_WINNT 713 714 if (obj) { 715 if (obj->shnd) 716 CloseHandle(obj->shnd); 717 obj->shnd = NULL; 718 } 719 720 # else 721 722 if (obj) 723 sem_destroy(obj); 724 725 # endif 726 727 return NULL; 728 } 729 730 /* -------------------------------------------------------------------- 731 * prepare_child_sems() 732 * 733 * create sync & access semaphores 734 * 735 * All semaphores are cleared, only the access semaphore has 1 unit. 736 * Childs wait on 'workitems_pending', then grabs 'sema_access' 737 * and dequeues jobs. When done, 'sema_access' is given one unit back. 738 * 739 * The producer grabs 'sema_access', manages the queue, restores 740 * 'sema_access' and puts one unit into 'workitems_pending'. 741 * 742 * The story goes the same for the response queue. 743 */ 744 static void 745 prepare_child_sems( 746 blocking_child *c 747 ) 748 { 749 if (NULL == worker_memlock) 750 worker_memlock = create_sema(&worker_mmutex, 1, 1); 751 752 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 753 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 754 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 755 # ifndef WORK_PIPE 756 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 757 # endif 758 } 759 760 /* -------------------------------------------------------------------- 761 * wait for semaphore. Where the wait can be interrupted, it will 762 * internally resume -- When this function returns, there is either no 763 * semaphore at all, a timeout occurred, or the caller could 764 * successfully take a token from the semaphore. 765 * 766 * For untimed wait, not checking the result of this function at all is 767 * definitely an option. 768 */ 769 static int 770 wait_for_sem( 771 sem_ref sem, 772 struct timespec * timeout /* wall-clock */ 773 ) 774 #ifdef SYS_WINNT 775 { 776 struct timespec now; 777 struct timespec delta; 778 DWORD msec; 779 DWORD rc; 780 781 if (!(sem && sem->shnd)) { 782 errno = EINVAL; 783 return -1; 784 } 785 786 if (NULL == timeout) { 787 msec = INFINITE; 788 } else { 789 getclock(TIMEOFDAY, &now); 790 delta = sub_tspec(*timeout, now); 791 if (delta.tv_sec < 0) { 792 msec = 0; 793 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 794 msec = INFINITE; 795 } else { 796 msec = 1000 * (DWORD)delta.tv_sec; 797 msec += delta.tv_nsec / (1000 * 1000); 798 } 799 } 800 rc = WaitForSingleObject(sem->shnd, msec); 801 if (WAIT_OBJECT_0 == rc) 802 return 0; 803 if (WAIT_TIMEOUT == rc) { 804 errno = ETIMEDOUT; 805 return -1; 806 } 807 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 808 errno = EFAULT; 809 return -1; 810 } 811 #else /* pthreads wait_for_sem() follows */ 812 { 813 int rc = -1; 814 815 if (sem) do { 816 if (NULL == timeout) 817 rc = sem_wait(sem); 818 else 819 rc = sem_timedwait(sem, timeout); 820 } while (rc == -1 && errno == EINTR); 821 else 822 errno = EINVAL; 823 824 return rc; 825 } 826 #endif 827 828 /* -------------------------------------------------------------------- 829 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 830 * calling conventions under Windows and POSIX-defined signature 831 * otherwise. 832 */ 833 #ifdef SYS_WINNT 834 u_int WINAPI 835 #else 836 void * 837 #endif 838 blocking_thread( 839 void * ThreadArg 840 ) 841 { 842 blocking_child *c; 843 844 c = ThreadArg; 845 exit_worker(blocking_child_common(c)); 846 847 /* NOTREACHED */ 848 return 0; 849 } 850 851 /* -------------------------------------------------------------------- 852 * req_child_exit() runs in the parent. 853 * 854 * This function is called from from the idle timer, too, and possibly 855 * without a thread being there any longer. Since we have folded up our 856 * tent in that case and all the semaphores are already gone, we simply 857 * ignore this request in this case. 858 * 859 * Since the existence of the semaphores is controlled exclusively by 860 * the parent, there's no risk of data race here. 861 */ 862 int 863 req_child_exit( 864 blocking_child *c 865 ) 866 { 867 return (c->accesslock) 868 ? queue_req_pointer(c, CHILD_EXIT_REQ) 869 : 0; 870 } 871 872 /* -------------------------------------------------------------------- 873 * cleanup_after_child() runs in parent. 874 */ 875 static void 876 cleanup_after_child( 877 blocking_child * c 878 ) 879 { 880 DEBUG_INSIST(!c->reusable); 881 882 # ifdef SYS_WINNT 883 /* The thread was not created in detached state, so we better 884 * clean up. 885 */ 886 if (c->thread_ref && c->thread_ref->thnd) { 887 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 888 INSIST(CloseHandle(c->thread_ref->thnd)); 889 c->thread_ref->thnd = NULL; 890 } 891 # endif 892 c->thread_ref = NULL; 893 894 /* remove semaphores and (if signalling vi IO) pipes */ 895 896 c->accesslock = delete_sema(c->accesslock); 897 c->workitems_pending = delete_sema(c->workitems_pending); 898 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 899 900 # ifdef WORK_PIPE 901 DEBUG_INSIST(-1 != c->resp_read_pipe); 902 DEBUG_INSIST(-1 != c->resp_write_pipe); 903 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 904 close(c->resp_write_pipe); 905 close(c->resp_read_pipe); 906 c->resp_write_pipe = -1; 907 c->resp_read_pipe = -1; 908 # else 909 DEBUG_INSIST(NULL != c->responses_pending); 910 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 911 c->responses_pending = delete_sema(c->responses_pending); 912 # endif 913 914 /* Is it necessary to check if there are pending requests and 915 * responses? If so, and if there are, what to do with them? 916 */ 917 918 /* re-init buffer index sequencers */ 919 c->head_workitem = 0; 920 c->tail_workitem = 0; 921 c->head_response = 0; 922 c->tail_response = 0; 923 924 c->reusable = TRUE; 925 } 926 927 928 #else /* !WORK_THREAD follows */ 929 char work_thread_nonempty_compilation_unit; 930 #endif 931