#include "fs.h"
#include <string.h>
#include <assert.h>

static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);

static mthread_attr_t tattr;	/* thread creation attributes (stack size) */
static unsigned int pending;	/* # of processes with work queued as FP_PENDING */
static unsigned int busy;	/* # of worker threads currently assigned a process */
static int block_all;		/* TRUE while new work may not be started */

#if defined(_MINIX_MAGIC)
# define TH_STACKSIZE (64 * 1024)
#elif defined(MKCOVERAGE)
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif

#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])

/*===========================================================================*
 *				worker_init				     *
 *===========================================================================*/
void worker_init(void)
{
/* Initialize worker threads */
  struct worker_thread *wp;
  int i;

  if (mthread_attr_init(&tattr) != 0)
	panic("failed to initialize attribute");
  if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
	panic("couldn't set default thread stack size");

  pending = 0;
  busy = 0;
  block_all = FALSE;

  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	wp->w_fp = NULL;	/* Mark not in use */
	wp->w_next = NULL;
	wp->w_task = NONE;
	if (mutex_init(&wp->w_event_mutex, NULL) != 0)
		panic("failed to initialize mutex");
	if (cond_init(&wp->w_event, NULL) != 0)
		panic("failed to initialize condition variable");
	if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
		panic("unable to start thread");
  }

  /* Let all threads get ready to accept work. */
  worker_yield();
}

/*===========================================================================*
 *				worker_cleanup				     *
 *===========================================================================*/
void worker_cleanup(void)
{
/* Clean up worker threads, reversing the actions of worker_init() such that
 * we can safely call worker_init() again later. All worker threads are
 * expected to be idle already. Used for live updates, because transferring
 * the thread stacks from one version to another is currently not feasible.
 */
  struct worker_thread *wp;
  int i;

  assert(worker_idle());

  /* First terminate all threads. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	assert(wp->w_fp == NULL);

	/* Waking up the thread with no w_fp will cause it to exit. */
	worker_wake(wp);
  }

  worker_yield();

  /* Then clean up their resources. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	if (mthread_join(wp->w_tid, NULL) != 0)
		panic("worker_cleanup: could not join thread %d", i);
	if (cond_destroy(&wp->w_event) != 0)
		panic("failed to destroy condition variable");
	if (mutex_destroy(&wp->w_event_mutex) != 0)
		panic("failed to destroy mutex");
  }

  /* Finally, clean up global resources. */
  if (mthread_attr_destroy(&tattr) != 0)
	panic("failed to destroy attribute");

  memset(workers, 0, sizeof(workers));
}

/*===========================================================================*
 *				worker_idle				     *
 *===========================================================================*/
int worker_idle(void)
{
/* Return whether all worker threads are idle. */

  return (pending == 0 && busy == 0);
}
/*===========================================================================*
 *				worker_assign				     *
 *===========================================================================*/
static void worker_assign(struct fproc *rfp)
{
/* Assign the work for the given process to a free thread. The caller must
 * ensure that there is in fact at least one free thread.
 */
  struct worker_thread *worker;
  int i;

  /* Find a free worker thread. The caller guarantees that one exists; the
   * assertion below verifies that we actually found it.
   */
  worker = NULL;
  for (i = 0; i < NR_WTHREADS; i++) {
	if (workers[i].w_fp == NULL) {
		worker = &workers[i];
		break;
	}
  }
  assert(worker != NULL);

  /* Assign work to it. */
  rfp->fp_worker = worker;
  worker->w_fp = rfp;
  busy++;

  worker_wake(worker);
}

/*===========================================================================*
 *			worker_may_do_pending				     *
 *===========================================================================*/
static int worker_may_do_pending(void)
{
/* Return whether there is a free thread that may do pending work. This is true
 * only if there is pending work at all, and there is a free non-spare thread
 * (the spare thread is never used for pending work), and VFS is currently
 * processing new requests at all (this may not be true during initialization).
 */

  /* Ordered by likelihood to be false. */
  return (pending > 0 && worker_available() > 1 && !block_all);
}

/*===========================================================================*
 *				worker_allow				     *
 *===========================================================================*/
void worker_allow(int allow)
{
/* Allow or disallow workers to process new work. If disallowed, any new work
 * will be stored as pending, even when there are free worker threads. There is
 * no facility to stop active workers. To be used only during initialization!
 */
  struct fproc *rfp;

  block_all = !allow;

  if (!worker_may_do_pending())
	return;

  /* Assign any pending work to workers. */
  for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
	if (rfp->fp_flags & FP_PENDING) {
		rfp->fp_flags &= ~FP_PENDING;	/* No longer pending */
		assert(pending > 0);
		pending--;
		worker_assign(rfp);

		if (!worker_may_do_pending())
			return;
	}
  }
}
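/* Usage note (illustrative, not taken from a specific call site): the expected
 * sequence is worker_allow(FALSE) before VFS starts accepting requests and
 * worker_allow(TRUE) once initialization has completed. While block_all is
 * set, worker_try_activate() below queues everything as FP_PENDING (except
 * spare-thread callbacks), and the loop above then drains that backlog as
 * soon as new work is allowed again.
 */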
/*===========================================================================*
 *				worker_get_work				     *
 *===========================================================================*/
static int worker_get_work(void)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
 * latter case wait for new work to come in. Return TRUE if there is work to
 * do, or FALSE if the current thread is requested to shut down.
 */
  struct fproc *rfp;

  assert(self->w_fp == NULL);

  /* Is there pending work, and should we do it? */
  if (worker_may_do_pending()) {
	/* Find pending work */
	for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
		if (rfp->fp_flags & FP_PENDING) {
			self->w_fp = rfp;
			rfp->fp_worker = self;
			busy++;
			rfp->fp_flags &= ~FP_PENDING;	/* No longer pending */
			assert(pending > 0);
			pending--;
			return TRUE;
		}
	}
	panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep();

  return (self->w_fp != NULL);
}

/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
int worker_available(void)
{
/* Return the number of threads that are available, including the spare thread.
 */

  return(NR_WTHREADS - busy);
}

/*===========================================================================*
 *				worker_main				     *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */

  self = (struct worker_thread *) arg;
  ASSERTW(self);

  while (worker_get_work()) {

	fp = self->w_fp;
	assert(fp->fp_worker == self);

	/* Lock the process. */
	lock_proc(fp);

	/* The following two blocks could be run in a loop until both the
	 * conditions are no longer met, but it is currently impossible that
	 * more normal work is present after postponed PM work has been done.
	 */

	/* Perform normal work, if any. */
	if (fp->fp_func != NULL) {
		self->w_m_in = fp->fp_msg;
		err_code = OK;

		fp->fp_func();

		fp->fp_func = NULL;	/* deliberately unset AFTER the call */
	}

	/* Perform postponed PM work, if any. */
	if (fp->fp_flags & FP_PM_WORK) {
		self->w_m_in = fp->fp_pm_msg;

		service_pm_postponed();

		fp->fp_flags &= ~FP_PM_WORK;
	}

	/* Perform cleanup actions. */
	thread_cleanup();

	unlock_proc(fp);

	fp->fp_worker = NULL;
	self->w_fp = NULL;
	assert(busy > 0);
	busy--;
  }

  return(NULL);
}
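/* Big-picture summary (added for orientation; derived from the code in this
 * file): worker_start() records the work in the process slot;
 * worker_try_activate()/worker_assign() either hand it to a free thread or
 * mark it FP_PENDING; each thread loops in worker_main(), picking up its
 * assigned process or pulling pending work in worker_get_work(), running the
 * handler or postponed PM work under the process lock, and then clearing the
 * bookkeeping again.
 */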
/*===========================================================================*
 *				worker_can_start			     *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * not entirely safe for other cases, as explained in the comments below.
 */
  int is_pending, is_active, has_normal_work, has_pm_work;

  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* If there is no work scheduled for the process, we can start work. */
  if (!is_pending && !is_active) return TRUE;

  /* If there is already normal work scheduled for the process, we cannot add
   * more, since we support only one normal job per process.
   */
  if (has_normal_work) return FALSE;

  /* If this process has pending PM work but no normal work, we can add the
   * normal work for execution before the worker will start.
   */
  if (is_pending) return TRUE;

  /* However, if a worker is active for PM work, we cannot add normal work
   * either, because the work will not be considered. For this reason, we can
   * not use this function for processes that can possibly get postponed PM
   * work. It is still safe for core system processes, though.
   */
  return FALSE;
}

/*===========================================================================*
 *			worker_try_activate				     *
 *===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
 * process. If not, mark the process as having pending work for later.
 */
  int needed;

  /* Use the last available thread only if requested. Otherwise, leave at least
   * one spare thread for deadlock resolution.
   */
  needed = use_spare ? 1 : 2;

  /* Also make sure that doing new work is allowed at all right now, which may
   * not be the case during VFS initialization. We do always allow callback
   * calls, i.e., calls that may use the spare thread. The reason is that we do
   * not support callback calls being marked as pending, so the (entirely
   * theoretical) exception here may (entirely theoretically) avoid deadlocks.
   */
  if (needed <= worker_available() && (!block_all || use_spare)) {
	worker_assign(rfp);
  } else {
	rfp->fp_flags |= FP_PENDING;
	pending++;
  }
}
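/* Worked example of the accounting above (illustrative): if all but one of
 * the NR_WTHREADS threads are busy, worker_available() returns 1, so only a
 * use_spare caller (needed == 1) may start immediately; a normal caller
 * (needed == 2) is queued as FP_PENDING instead, keeping the last thread free
 * for deadlock resolution.
 */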
/*===========================================================================*
 *				worker_start				     *
 *===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
	int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
 * process. If a function pointer is given, the work is considered normal work,
 * and the function will be called to handle it. If the function pointer is
 * NULL, the work is considered postponed PM work, and service_pm_postponed
 * will be called to handle it. The input message will be a copy of the given
 * message. Optionally, the last spare (deadlock-resolving) thread may be used
 * to execute the work immediately.
 */
  int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;

  assert(rfp != NULL);

  is_pm_work = (func == NULL);
  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* Sanity checks. If any of these trigger, someone messed up badly! */
  if (is_pending || is_active) {
	if (is_pending && is_active)
		panic("work cannot be both pending and active");

	/* The process cannot make more than one call at once. */
	if (!is_pm_work && has_normal_work)
		panic("process has two calls (%x, %x)",
			rfp->fp_msg.m_type, m_ptr->m_type);

	/* PM will not send more than one job per process to us at once. */
	if (is_pm_work && has_pm_work)
		panic("got two calls from PM (%x, %x)",
			rfp->fp_pm_msg.m_type, m_ptr->m_type);

	/* Despite PM's sys_delay_stop() system, it is possible that normal
	 * work (in particular, do_pending_pipe) arrives after postponed PM
	 * work has been scheduled for execution, so we don't check for that.
	 */
#if 0
	printf("VFS: adding %s work to %s thread\n",
		is_pm_work ? "PM" : "normal",
		is_pending ? "pending" : "active");
#endif
  } else {
	/* Some cleanup step forgotten somewhere? */
	if (has_normal_work || has_pm_work)
		panic("worker administration error");
  }

  /* Save the work to be performed. */
  if (!is_pm_work) {
	rfp->fp_msg = *m_ptr;
	rfp->fp_func = func;
  } else {
	rfp->fp_pm_msg = *m_ptr;
	rfp->fp_flags |= FP_PM_WORK;
  }

  /* If we have not only added to existing work, go look for a free thread.
   * Note that we won't be using the spare thread for normal work if there is
   * already PM work pending, but that situation will never occur in practice.
   */
  if (!is_pending && !is_active)
	worker_try_activate(rfp, use_spare);
}
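#if 0
/* Illustrative sketch only, never compiled: the ways callers queue work with
 * worker_start(). "rfp", "example_handler" and "m" are placeholders rather
 * than references to real call sites, which live in the main request loop
 * elsewhere in VFS.
 */
	worker_start(rfp, example_handler, &m, FALSE);	/* normal work */
	worker_start(rfp, NULL, &m, FALSE);		/* postponed PM work */
	worker_start(rfp, example_handler, &m, TRUE);	/* may use the spare
							 * (deadlock) thread */
#endif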
/*===========================================================================*
 *				worker_yield				     *
 *===========================================================================*/
void worker_yield(void)
{
/* Yield to all worker threads. To be called from the main thread only. */

  mthread_yield_all();

  self = NULL;
}

/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
static void worker_sleep(void)
{
  struct worker_thread *worker = self;
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
	panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
  self = worker;
}

/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
	panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
}

/*===========================================================================*
 *				worker_suspend				     *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Suspend the current thread, saving certain thread variables. Return a
 * pointer to the thread's worker structure for later resumption.
 */

  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  self->w_err_code = err_code;

  return self;
}

/*===========================================================================*
 *				worker_resume				     *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension, restoring thread variables. */

  ASSERTW(org_self);

  self = org_self;

  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}

/*===========================================================================*
 *				worker_wait				     *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until woken up by the main thread. */

  (void) worker_suspend();	/* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* We continue here after waking up */
  worker_resume(self);
  assert(self->w_next == NULL);
}

/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
  ASSERTW(worker);		/* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop				     *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
  ASSERTW(worker);		/* Make sure we have a valid thread */
  if (worker->w_task != NONE) {
	/* This thread is communicating with a driver or file server */
	if (worker->w_drv_sendrec != NULL) {		/* Driver */
		worker->w_drv_sendrec->m_type = EIO;
		worker->w_drv_sendrec = NULL;
	} else if (worker->w_sendrec != NULL) {		/* FS */
		worker->w_sendrec->m_type = EIO;
		worker->w_sendrec = NULL;
	} else {
		panic("reply storage consistency error");	/* Oh dear */
	}
  } else {
	/* This shouldn't happen at all... */
	printf("VFS: stopping worker not blocked on any task?\n");
	util_stacktrace();
  }
  worker_wake(worker);
}

/*===========================================================================*
 *			worker_stop_by_endpt				     *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];
	if (worker->w_fp != NULL && worker->w_task == proc_e)
		worker_stop(worker);
  }
}

/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
  int i;

  for (i = 0; i < NR_WTHREADS; i++)
	if (workers[i].w_tid == worker_tid)
		return(&workers[i]);

  return(NULL);
}
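#if 0
/* Illustrative sketch only, never compiled: the wait/signal/stop functions
 * above support a blocking-request pattern roughly like the one below. The
 * real call sites live in the driver and file-server communication code
 * elsewhere in VFS; "dst" and "reqmsg" are placeholders.
 */
	self->w_task = dst;		/* endpoint we are about to wait for */
	self->w_sendrec = &reqmsg;	/* where worker_stop() would post EIO */
	/* ...send the request... */
	worker_wait();			/* resumed by worker_signal() on reply,
					 * or by worker_stop()/
					 * worker_stop_by_endpt() on failure */
	self->w_task = NONE;
	self->w_sendrec = NULL;
#endif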
/*===========================================================================*
 *				worker_set_proc				     *
 *===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
 * model: change the current working thread's process context to another
 * process. The caller is expected to hold the lock to both the calling and the
 * target process, and neither process is expected to continue regular
 * operation when done. This code is here *only* and *strictly* for the reboot
 * code, and *must not* be used for anything else.
 */

  if (fp == rfp) return;

  if (rfp->fp_worker != NULL)
	panic("worker_set_proc: target process not idle");

  fp->fp_worker = NULL;

  fp = rfp;

  self->w_fp = rfp;
  fp->fp_worker = self;
}