1 /* $NetBSD: work_thread.c,v 1.9 2024/08/18 20:47:13 christos Exp $ */ 2 3 /* 4 * work_thread.c - threads implementation for blocking worker child. 5 */ 6 #include <config.h> 7 #include "ntp_workimpl.h" 8 9 #ifdef WORK_THREAD 10 11 #include <stdio.h> 12 #include <ctype.h> 13 #include <signal.h> 14 #ifndef SYS_WINNT 15 #include <pthread.h> 16 #endif 17 18 #include "ntp_stdlib.h" 19 #include "ntp_malloc.h" 20 #include "ntp_syslog.h" 21 #include "ntpd.h" 22 #include "ntp_io.h" 23 #include "ntp_assert.h" 24 #include "ntp_unixtime.h" 25 #include "timespecops.h" 26 #include "ntp_worker.h" 27 28 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 29 #define CHILD_GONE_RESP CHILD_EXIT_REQ 30 /* Queue size increments: 31 * The request queue grows a bit faster than the response queue -- the 32 * daemon can push requests and pull results faster on average than the 33 * worker can process requests and push results... If this really pays 34 * off is debatable. 35 */ 36 #define WORKITEMS_ALLOC_INC 16 37 #define RESPONSES_ALLOC_INC 4 38 39 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 40 * set the maximum to 256kB. If the minimum goes below the 41 * system-defined minimum stack size, we have to adjust accordingly. 42 */ 43 #ifndef THREAD_MINSTACKSIZE 44 # define THREAD_MINSTACKSIZE (64U * 1024) 45 #endif 46 47 #ifndef THREAD_MAXSTACKSIZE 48 # define THREAD_MAXSTACKSIZE (256U * 1024) 49 #endif 50 51 /* need a good integer to store a pointer...
*/ 52 #ifndef UINTPTR_T 53 # if defined(UINTPTR_MAX) 54 # define UINTPTR_T uintptr_t 55 # elif defined(UINT_PTR) 56 # define UINTPTR_T UINT_PTR 57 # else 58 # define UINTPTR_T size_t 59 # endif 60 #endif 61 62 63 #ifdef SYS_WINNT 64 65 # define thread_exit(c) _endthreadex(c) 66 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 67 u_int WINAPI blocking_thread(void *); 68 static BOOL same_os_sema(const sem_ref obj, void * osobj); 69 70 #else 71 72 # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c)) 73 # define tickle_sem sem_post 74 void * blocking_thread(void *); 75 static void block_thread_signals(sigset_t *); 76 77 #endif 78 79 #ifdef WORK_PIPE 80 addremove_io_fd_func addremove_io_fd; 81 #else 82 addremove_io_semaphore_func addremove_io_semaphore; 83 #endif 84 85 static void start_blocking_thread(blocking_child *); 86 static void start_blocking_thread_internal(blocking_child *); 87 static void prepare_child_sems(blocking_child *); 88 static int wait_for_sem(sem_ref, struct timespec *); 89 static int ensure_workitems_empty_slot(blocking_child *); 90 static int ensure_workresp_empty_slot(blocking_child *); 91 static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 92 static void cleanup_after_child(blocking_child *); 93 94 static sema_type worker_mmutex; 95 static sem_ref worker_memlock; 96 97 /* -------------------------------------------------------------------- 98 * locking the global worker state table (and other global stuff) 99 */ 100 void 101 worker_global_lock( 102 int inOrOut) 103 { 104 if (worker_memlock) { 105 if (inOrOut) 106 wait_for_sem(worker_memlock, NULL); 107 else 108 tickle_sem(worker_memlock); 109 } 110 } 111 112 /* -------------------------------------------------------------------- 113 * implementation isolation wrapper 114 */ 115 void 116 exit_worker( 117 int exitcode 118 ) 119 { 120 thread_exit(exitcode); /* see #define thread_exit */ 121 } 122 123 /* 
-------------------------------------------------------------------- 124 * sleep for a given time or until the wakup semaphore is tickled. 125 */ 126 int 127 worker_sleep( 128 blocking_child * c, 129 time_t seconds 130 ) 131 { 132 struct timespec until; 133 int rc; 134 135 # ifdef HAVE_CLOCK_GETTIME 136 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 137 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 138 return -1; 139 } 140 # else 141 if (0 != getclock(TIMEOFDAY, &until)) { 142 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 143 return -1; 144 } 145 # endif 146 until.tv_sec += seconds; 147 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 148 if (0 == rc) 149 return -1; 150 if (-1 == rc && ETIMEDOUT == errno) 151 return 0; 152 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 153 return -1; 154 } 155 156 157 /* -------------------------------------------------------------------- 158 * Wake up a worker that takes a nap. 159 */ 160 void 161 interrupt_worker_sleep(void) 162 { 163 u_int idx; 164 blocking_child * c; 165 166 for (idx = 0; idx < blocking_children_alloc; idx++) { 167 c = blocking_children[idx]; 168 if (NULL == c || NULL == c->wake_scheduled_sleep) 169 continue; 170 tickle_sem(c->wake_scheduled_sleep); 171 } 172 } 173 174 /* -------------------------------------------------------------------- 175 * Make sure there is an empty slot at the head of the request 176 * queue. Tell if the queue is currently empty. 177 */ 178 static int 179 ensure_workitems_empty_slot( 180 blocking_child *c 181 ) 182 { 183 /* 184 ** !!! PRECONDITION: caller holds access lock! 185 ** 186 ** This simply tries to increase the size of the buffer if it 187 ** becomes full. The resize operation does *not* maintain the 188 ** order of requests, but that should be irrelevant since the 189 ** processing is considered asynchronous anyway. 190 ** 191 ** Return if the buffer is currently empty. 
192 */ 193 194 static const size_t each = 195 sizeof(blocking_children[0]->workitems[0]); 196 197 size_t new_alloc; 198 size_t slots_used; 199 size_t sidx; 200 201 slots_used = c->head_workitem - c->tail_workitem; 202 if (slots_used >= c->workitems_alloc) { 203 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 204 c->workitems = erealloc(c->workitems, new_alloc * each); 205 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 206 c->workitems[sidx] = NULL; 207 c->tail_workitem = 0; 208 c->head_workitem = c->workitems_alloc; 209 c->workitems_alloc = new_alloc; 210 } 211 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 212 return (0 == slots_used); 213 } 214 215 /* -------------------------------------------------------------------- 216 * Make sure there is an empty slot at the head of the response 217 * queue. Tell if the queue is currently empty. 218 */ 219 static int 220 ensure_workresp_empty_slot( 221 blocking_child *c 222 ) 223 { 224 /* 225 ** !!! PRECONDITION: caller holds access lock! 226 ** 227 ** Works like the companion function above. 228 */ 229 230 static const size_t each = 231 sizeof(blocking_children[0]->responses[0]); 232 233 size_t new_alloc; 234 size_t slots_used; 235 size_t sidx; 236 237 slots_used = c->head_response - c->tail_response; 238 if (slots_used >= c->responses_alloc) { 239 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 240 c->responses = erealloc(c->responses, new_alloc * each); 241 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 242 c->responses[sidx] = NULL; 243 c->tail_response = 0; 244 c->head_response = c->responses_alloc; 245 c->responses_alloc = new_alloc; 246 } 247 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 248 return (0 == slots_used); 249 } 250 251 252 /* -------------------------------------------------------------------- 253 * queue_req_pointer() - append a work item or idle exit request to 254 * blocking_workitems[]. Employ proper locking. 
255 */ 256 static int 257 queue_req_pointer( 258 blocking_child * c, 259 blocking_pipe_header * hdr 260 ) 261 { 262 size_t qhead; 263 264 /* >>>> ACCESS LOCKING STARTS >>>> */ 265 wait_for_sem(c->accesslock, NULL); 266 ensure_workitems_empty_slot(c); 267 qhead = c->head_workitem; 268 c->workitems[qhead % c->workitems_alloc] = hdr; 269 c->head_workitem = 1 + qhead; 270 tickle_sem(c->accesslock); 271 /* <<<< ACCESS LOCKING ENDS <<<< */ 272 273 /* queue consumer wake-up notification */ 274 tickle_sem(c->workitems_pending); 275 276 return 0; 277 } 278 279 /* -------------------------------------------------------------------- 280 * API function to make sure a worker is running, a proper private copy 281 * of the data is made, the data eneterd into the queue and the worker 282 * is signalled. 283 */ 284 int 285 send_blocking_req_internal( 286 blocking_child * c, 287 blocking_pipe_header * hdr, 288 void * data 289 ) 290 { 291 blocking_pipe_header * threadcopy; 292 size_t payload_octets; 293 294 REQUIRE(hdr != NULL); 295 REQUIRE(data != NULL); 296 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 297 298 if (hdr->octets <= sizeof(*hdr)) 299 return 1; /* failure */ 300 payload_octets = hdr->octets - sizeof(*hdr); 301 302 if (NULL == c->thread_ref) 303 start_blocking_thread(c); 304 threadcopy = emalloc(hdr->octets); 305 memcpy(threadcopy, hdr, sizeof(*hdr)); 306 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 307 308 return queue_req_pointer(c, threadcopy); 309 } 310 311 /* -------------------------------------------------------------------- 312 * Wait for the 'incoming queue no longer empty' signal, lock the shared 313 * structure and dequeue an item. 
314 */ 315 blocking_pipe_header * 316 receive_blocking_req_internal( 317 blocking_child * c 318 ) 319 { 320 blocking_pipe_header * req; 321 size_t qhead, qtail; 322 323 req = NULL; 324 do { 325 /* wait for tickle from the producer side */ 326 wait_for_sem(c->workitems_pending, NULL); 327 328 /* >>>> ACCESS LOCKING STARTS >>>> */ 329 wait_for_sem(c->accesslock, NULL); 330 qhead = c->head_workitem; 331 do { 332 qtail = c->tail_workitem; 333 if (qhead == qtail) 334 break; 335 c->tail_workitem = qtail + 1; 336 qtail %= c->workitems_alloc; 337 req = c->workitems[qtail]; 338 c->workitems[qtail] = NULL; 339 } while (NULL == req); 340 tickle_sem(c->accesslock); 341 /* <<<< ACCESS LOCKING ENDS <<<< */ 342 343 } while (NULL == req); 344 345 INSIST(NULL != req); 346 if (CHILD_EXIT_REQ == req) { /* idled out */ 347 send_blocking_resp_internal(c, CHILD_GONE_RESP); 348 req = NULL; 349 } 350 351 return req; 352 } 353 354 /* -------------------------------------------------------------------- 355 * Push a response into the return queue and eventually tickle the 356 * receiver. 357 */ 358 int 359 send_blocking_resp_internal( 360 blocking_child * c, 361 blocking_pipe_header * resp 362 ) 363 { 364 size_t qhead; 365 int empty; 366 367 /* >>>> ACCESS LOCKING STARTS >>>> */ 368 wait_for_sem(c->accesslock, NULL); 369 empty = ensure_workresp_empty_slot(c); 370 qhead = c->head_response; 371 c->responses[qhead % c->responses_alloc] = resp; 372 c->head_response = 1 + qhead; 373 tickle_sem(c->accesslock); 374 /* <<<< ACCESS LOCKING ENDS <<<< */ 375 376 /* queue consumer wake-up notification */ 377 if (empty) 378 { 379 # ifdef WORK_PIPE 380 if (1 != write(c->resp_write_pipe, "", 1)) 381 msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo" 382 " failed to notify main thread!", 383 (BLOCKING_GETNAMEINFO == resp->rtype) 384 ? 
"name" 385 : "addr" 386 ); 387 # else 388 tickle_sem(c->responses_pending); 389 # endif 390 } 391 return 0; 392 } 393 394 395 #ifndef WORK_PIPE 396 397 /* -------------------------------------------------------------------- 398 * Check if a (Windows-)handle to a semaphore is actually the same we 399 * are using inside the sema wrapper. 400 */ 401 static BOOL 402 same_os_sema( 403 const sem_ref obj, 404 void* osh 405 ) 406 { 407 return obj && osh && (obj->shnd == (HANDLE)osh); 408 } 409 410 /* -------------------------------------------------------------------- 411 * Find the shared context that associates to an OS handle and make sure 412 * the data is dequeued and processed. 413 */ 414 void 415 handle_blocking_resp_sem( 416 void * context 417 ) 418 { 419 blocking_child * c; 420 u_int idx; 421 422 c = NULL; 423 for (idx = 0; idx < blocking_children_alloc; idx++) { 424 c = blocking_children[idx]; 425 if (c != NULL && 426 c->thread_ref != NULL && 427 same_os_sema(c->responses_pending, context)) 428 break; 429 } 430 if (idx < blocking_children_alloc) 431 process_blocking_resp(c); 432 } 433 #endif /* !WORK_PIPE */ 434 435 /* -------------------------------------------------------------------- 436 * Fetch the next response from the return queue. In case of signalling 437 * via pipe, make sure the pipe is flushed, too. 
438 */ 439 blocking_pipe_header * 440 receive_blocking_resp_internal( 441 blocking_child * c 442 ) 443 { 444 blocking_pipe_header * removed; 445 size_t qhead, qtail, slot; 446 447 #ifdef WORK_PIPE 448 int rc; 449 char scratch[32]; 450 451 do 452 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 453 while (-1 == rc && EINTR == errno); 454 #endif 455 456 /* >>>> ACCESS LOCKING STARTS >>>> */ 457 wait_for_sem(c->accesslock, NULL); 458 qhead = c->head_response; 459 qtail = c->tail_response; 460 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 461 slot = qtail % c->responses_alloc; 462 removed = c->responses[slot]; 463 c->responses[slot] = NULL; 464 } 465 c->tail_response = qtail; 466 tickle_sem(c->accesslock); 467 /* <<<< ACCESS LOCKING ENDS <<<< */ 468 469 if (NULL != removed) { 470 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 471 BLOCKING_RESP_MAGIC == removed->magic_sig); 472 } 473 if (CHILD_GONE_RESP == removed) { 474 cleanup_after_child(c); 475 removed = NULL; 476 } 477 478 return removed; 479 } 480 481 /* -------------------------------------------------------------------- 482 * Light up a new worker. 483 */ 484 static void 485 start_blocking_thread( 486 blocking_child * c 487 ) 488 { 489 490 DEBUG_INSIST(!c->reusable); 491 492 prepare_child_sems(c); 493 start_blocking_thread_internal(c); 494 } 495 496 /* -------------------------------------------------------------------- 497 * Create a worker thread. There are several differences between POSIX 498 * and Windows, of course -- most notably the Windows thread is a 499 * detached thread, and we keep the handle around until we want to get 500 * rid of the thread. The notification scheme also differs: Windows 501 * makes use of semaphores in both directions, POSIX uses a pipe for 502 * integration with 'select()' or alike. 
503 */ 504 static void 505 start_blocking_thread_internal( 506 blocking_child * c 507 ) 508 #ifdef SYS_WINNT 509 { 510 BOOL resumed; 511 512 c->thread_ref = NULL; 513 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 514 c->thr_table[0].thnd = 515 (HANDLE)_beginthreadex( 516 NULL, 517 0, 518 &blocking_thread, 519 c, 520 CREATE_SUSPENDED, 521 NULL); 522 523 if (NULL == c->thr_table[0].thnd) { 524 msyslog(LOG_ERR, "start blocking thread failed: %m"); 525 exit(-1); 526 } 527 /* remember the thread priority is only within the process class */ 528 if (!SetThreadPriority(c->thr_table[0].thnd, 529 THREAD_PRIORITY_BELOW_NORMAL)) { 530 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 531 } 532 if (NULL != pSetThreadDescription) { 533 (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker"); 534 } 535 resumed = ResumeThread(c->thr_table[0].thnd); 536 DEBUG_INSIST(resumed); 537 c->thread_ref = &c->thr_table[0]; 538 } 539 #else /* pthreads start_blocking_thread_internal() follows */ 540 { 541 # ifdef NEED_PTHREAD_INIT 542 static int pthread_init_called; 543 # endif 544 pthread_attr_t thr_attr; 545 int rc; 546 int pipe_ends[2]; /* read then write */ 547 int is_pipe; 548 int flags; 549 size_t ostacksize; 550 size_t nstacksize; 551 sigset_t saved_sig_mask; 552 553 c->thread_ref = NULL; 554 555 # ifdef NEED_PTHREAD_INIT 556 /* 557 * from lib/isc/unix/app.c: 558 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 
559 */ 560 if (!pthread_init_called) { 561 pthread_init(); 562 pthread_init_called = TRUE; 563 } 564 # endif 565 566 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 567 if (0 != rc) { 568 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 569 exit(1); 570 } 571 c->resp_read_pipe = move_fd(pipe_ends[0]); 572 c->resp_write_pipe = move_fd(pipe_ends[1]); 573 c->ispipe = is_pipe; 574 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 575 if (-1 == flags) { 576 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 577 exit(1); 578 } 579 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 580 if (-1 == rc) { 581 msyslog(LOG_ERR, 582 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 583 exit(1); 584 } 585 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 586 pthread_attr_init(&thr_attr); 587 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 588 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 589 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 590 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 591 if (0 != rc) { 592 msyslog(LOG_ERR, 593 "start_blocking_thread: pthread_attr_getstacksize() -> %s", 594 strerror(rc)); 595 } else { 596 nstacksize = ostacksize; 597 /* order is important here: first clamp on upper limit, 598 * and the PTHREAD min stack size is ultimate override! 599 */ 600 if (nstacksize > THREAD_MAXSTACKSIZE) 601 nstacksize = THREAD_MAXSTACKSIZE; 602 # ifdef PTHREAD_STACK_MAX 603 if (nstacksize > PTHREAD_STACK_MAX) 604 nstacksize = PTHREAD_STACK_MAX; 605 # endif 606 607 /* now clamp on lower stack limit. 
*/ 608 if (nstacksize < THREAD_MINSTACKSIZE) 609 nstacksize = THREAD_MINSTACKSIZE; 610 # ifdef PTHREAD_STACK_MIN 611 if (nstacksize < PTHREAD_STACK_MIN) 612 nstacksize = PTHREAD_STACK_MIN; 613 # endif 614 615 if (nstacksize != ostacksize) 616 rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 617 if (0 != rc) 618 msyslog(LOG_ERR, 619 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 620 (u_long)ostacksize, (u_long)nstacksize, 621 strerror(rc)); 622 } 623 #else 624 UNUSED_ARG(nstacksize); 625 UNUSED_ARG(ostacksize); 626 #endif 627 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 628 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 629 #endif 630 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 631 block_thread_signals(&saved_sig_mask); 632 rc = pthread_create(&c->thr_table[0], &thr_attr, 633 &blocking_thread, c); 634 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 635 pthread_attr_destroy(&thr_attr); 636 if (0 != rc) { 637 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 638 strerror(rc)); 639 exit(1); 640 } 641 c->thread_ref = &c->thr_table[0]; 642 } 643 #endif 644 645 /* -------------------------------------------------------------------- 646 * block_thread_signals() 647 * 648 * Temporarily block signals used by ntpd main thread, so that signal 649 * mask inherited by child threads leaves them blocked. Returns prior 650 * active signal mask via pmask, to be restored by the main thread 651 * after pthread_create(). 
652 */ 653 #ifndef SYS_WINNT 654 void 655 block_thread_signals( 656 sigset_t * pmask 657 ) 658 { 659 sigset_t block; 660 661 sigemptyset(&block); 662 # ifdef HAVE_SIGNALED_IO 663 # ifdef SIGIO 664 sigaddset(&block, SIGIO); 665 # endif 666 # ifdef SIGPOLL 667 sigaddset(&block, SIGPOLL); 668 # endif 669 # endif /* HAVE_SIGNALED_IO */ 670 sigaddset(&block, SIGALRM); 671 sigaddset(&block, MOREDEBUGSIG); 672 sigaddset(&block, LESSDEBUGSIG); 673 # ifdef SIGDIE1 674 sigaddset(&block, SIGDIE1); 675 # endif 676 # ifdef SIGDIE2 677 sigaddset(&block, SIGDIE2); 678 # endif 679 # ifdef SIGDIE3 680 sigaddset(&block, SIGDIE3); 681 # endif 682 # ifdef SIGDIE4 683 sigaddset(&block, SIGDIE4); 684 # endif 685 # ifdef SIGBUS 686 sigaddset(&block, SIGBUS); 687 # endif 688 sigemptyset(pmask); 689 pthread_sigmask(SIG_BLOCK, &block, pmask); 690 } 691 #endif /* !SYS_WINNT */ 692 693 694 /* -------------------------------------------------------------------- 695 * Create & destroy semaphores. This is sufficiently different between 696 * POSIX and Windows to warrant wrapper functions and close enough to 697 * use the concept of synchronization via semaphore for all platforms. 698 */ 699 static sem_ref 700 create_sema( 701 sema_type* semptr, 702 u_int inival, 703 u_int maxval) 704 { 705 #ifdef SYS_WINNT 706 707 long svini, svmax; 708 if (NULL != semptr) { 709 svini = (inival < LONG_MAX) 710 ? (long)inival : LONG_MAX; 711 svmax = (maxval < LONG_MAX && maxval > 0) 712 ? 
(long)maxval : LONG_MAX; 713 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 714 if (NULL == semptr->shnd) 715 semptr = NULL; 716 } 717 718 #else 719 720 (void)maxval; 721 if (semptr && sem_init(semptr, FALSE, inival)) 722 semptr = NULL; 723 724 #endif 725 726 return semptr; 727 } 728 729 /* ------------------------------------------------------------------ */ 730 static sem_ref 731 delete_sema( 732 sem_ref obj) 733 { 734 735 # ifdef SYS_WINNT 736 737 if (obj) { 738 if (obj->shnd) 739 CloseHandle(obj->shnd); 740 obj->shnd = NULL; 741 } 742 743 # else 744 745 if (obj) 746 sem_destroy(obj); 747 748 # endif 749 750 return NULL; 751 } 752 753 /* -------------------------------------------------------------------- 754 * prepare_child_sems() 755 * 756 * create sync & access semaphores 757 * 758 * All semaphores are cleared, only the access semaphore has 1 unit. 759 * Childs wait on 'workitems_pending', then grabs 'sema_access' 760 * and dequeues jobs. When done, 'sema_access' is given one unit back. 761 * 762 * The producer grabs 'sema_access', manages the queue, restores 763 * 'sema_access' and puts one unit into 'workitems_pending'. 764 * 765 * The story goes the same for the response queue. 766 */ 767 static void 768 prepare_child_sems( 769 blocking_child *c 770 ) 771 { 772 if (NULL == worker_memlock) 773 worker_memlock = create_sema(&worker_mmutex, 1, 1); 774 775 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 776 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 777 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 778 # ifndef WORK_PIPE 779 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 780 # endif 781 } 782 783 /* -------------------------------------------------------------------- 784 * wait for semaphore. 
Where the wait can be interrupted, it will 785 * internally resume -- When this function returns, there is either no 786 * semaphore at all, a timeout occurred, or the caller could 787 * successfully take a token from the semaphore. 788 * 789 * For untimed wait, not checking the result of this function at all is 790 * definitely an option. 791 */ 792 static int 793 wait_for_sem( 794 sem_ref sem, 795 struct timespec * timeout /* wall-clock */ 796 ) 797 #ifdef SYS_WINNT 798 { 799 struct timespec now; 800 struct timespec delta; 801 DWORD msec; 802 DWORD rc; 803 804 if (!(sem && sem->shnd)) { 805 errno = EINVAL; 806 return -1; 807 } 808 809 if (NULL == timeout) { 810 msec = INFINITE; 811 } else { 812 getclock(TIMEOFDAY, &now); 813 delta = sub_tspec(*timeout, now); 814 if (delta.tv_sec < 0) { 815 msec = 0; 816 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 817 msec = INFINITE; 818 } else { 819 msec = 1000 * (DWORD)delta.tv_sec; 820 msec += delta.tv_nsec / (1000 * 1000); 821 } 822 } 823 rc = WaitForSingleObject(sem->shnd, msec); 824 if (WAIT_OBJECT_0 == rc) 825 return 0; 826 if (WAIT_TIMEOUT == rc) { 827 errno = ETIMEDOUT; 828 return -1; 829 } 830 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 831 errno = EFAULT; 832 return -1; 833 } 834 #else /* pthreads wait_for_sem() follows */ 835 { 836 int rc = -1; 837 838 if (sem) do { 839 if (NULL == timeout) 840 rc = sem_wait(sem); 841 else 842 rc = sem_timedwait(sem, timeout); 843 } while (rc == -1 && errno == EINTR); 844 else 845 errno = EINVAL; 846 847 return rc; 848 } 849 #endif 850 851 /* -------------------------------------------------------------------- 852 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 853 * calling conventions under Windows and POSIX-defined signature 854 * otherwise. 
855 */ 856 #ifdef SYS_WINNT 857 u_int WINAPI 858 #else 859 void * 860 #endif 861 blocking_thread( 862 void * ThreadArg 863 ) 864 { 865 blocking_child *c; 866 867 c = ThreadArg; 868 exit_worker(blocking_child_common(c)); 869 870 /* NOTREACHED */ 871 return 0; 872 } 873 874 /* -------------------------------------------------------------------- 875 * req_child_exit() runs in the parent. 876 * 877 * This function is called from from the idle timer, too, and possibly 878 * without a thread being there any longer. Since we have folded up our 879 * tent in that case and all the semaphores are already gone, we simply 880 * ignore this request in this case. 881 * 882 * Since the existence of the semaphores is controlled exclusively by 883 * the parent, there's no risk of data race here. 884 */ 885 int 886 req_child_exit( 887 blocking_child *c 888 ) 889 { 890 return (c->accesslock) 891 ? queue_req_pointer(c, CHILD_EXIT_REQ) 892 : 0; 893 } 894 895 /* -------------------------------------------------------------------- 896 * cleanup_after_child() runs in parent. 897 */ 898 static void 899 cleanup_after_child( 900 blocking_child * c 901 ) 902 { 903 DEBUG_INSIST(!c->reusable); 904 905 # ifdef SYS_WINNT 906 /* The thread was not created in detached state, so we better 907 * clean up. 
908 */ 909 if (c->thread_ref && c->thread_ref->thnd) { 910 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 911 INSIST(CloseHandle(c->thread_ref->thnd)); 912 c->thread_ref->thnd = NULL; 913 } 914 # endif 915 c->thread_ref = NULL; 916 917 /* remove semaphores and (if signalling vi IO) pipes */ 918 919 c->accesslock = delete_sema(c->accesslock); 920 c->workitems_pending = delete_sema(c->workitems_pending); 921 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 922 923 # ifdef WORK_PIPE 924 DEBUG_INSIST(-1 != c->resp_read_pipe); 925 DEBUG_INSIST(-1 != c->resp_write_pipe); 926 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 927 close(c->resp_write_pipe); 928 close(c->resp_read_pipe); 929 c->resp_write_pipe = -1; 930 c->resp_read_pipe = -1; 931 # else 932 DEBUG_INSIST(NULL != c->responses_pending); 933 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 934 c->responses_pending = delete_sema(c->responses_pending); 935 # endif 936 937 /* Is it necessary to check if there are pending requests and 938 * responses? If so, and if there are, what to do with them? 939 */ 940 941 /* re-init buffer index sequencers */ 942 c->head_workitem = 0; 943 c->tail_workitem = 0; 944 c->head_response = 0; 945 c->tail_response = 0; 946 947 c->reusable = TRUE; 948 } 949 950 951 #else /* !WORK_THREAD follows */ 952 char work_thread_nonempty_compilation_unit; 953 #endif 954