1 #include "fs.h" 2 #include <string.h> 3 #include <assert.h> 4 5 static void *worker_main(void *arg); 6 static void worker_sleep(void); 7 static void worker_wake(struct worker_thread *worker); 8 9 static mthread_attr_t tattr; 10 static unsigned int pending; 11 static unsigned int busy; 12 static int block_all; 13 14 #ifdef MKCOVERAGE 15 # define TH_STACKSIZE (40 * 1024) 16 #else 17 # define TH_STACKSIZE (28 * 1024) 18 #endif 19 20 #define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]) 21 22 /*===========================================================================* 23 * worker_init * 24 *===========================================================================*/ 25 void worker_init(void) 26 { 27 /* Initialize worker threads */ 28 struct worker_thread *wp; 29 int i; 30 31 if (mthread_attr_init(&tattr) != 0) 32 panic("failed to initialize attribute"); 33 if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0) 34 panic("couldn't set default thread stack size"); 35 36 pending = 0; 37 busy = 0; 38 block_all = FALSE; 39 40 for (i = 0; i < NR_WTHREADS; i++) { 41 wp = &workers[i]; 42 43 wp->w_fp = NULL; /* Mark not in use */ 44 wp->w_next = NULL; 45 wp->w_task = NONE; 46 if (mutex_init(&wp->w_event_mutex, NULL) != 0) 47 panic("failed to initialize mutex"); 48 if (cond_init(&wp->w_event, NULL) != 0) 49 panic("failed to initialize condition variable"); 50 if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0) 51 panic("unable to start thread"); 52 } 53 54 /* Let all threads get ready to accept work. */ 55 worker_yield(); 56 } 57 58 /*===========================================================================* 59 * worker_cleanup * 60 *===========================================================================*/ 61 void worker_cleanup(void) 62 { 63 /* Clean up worker threads, reversing the actions of worker_init() such that 64 * we can safely call worker_init() again later. All worker threads are 65 * expected to be idle already. Used for live updates, because transferring 66 * the thread stacks from one version to another is currently not feasible. 67 */ 68 struct worker_thread *wp; 69 int i; 70 71 assert(worker_idle()); 72 73 /* First terminate all threads. */ 74 for (i = 0; i < NR_WTHREADS; i++) { 75 wp = &workers[i]; 76 77 assert(wp->w_fp == NULL); 78 79 /* Waking up the thread with no w_fp will cause it to exit. */ 80 worker_wake(wp); 81 } 82 83 worker_yield(); 84 85 /* Then clean up their resources. */ 86 for (i = 0; i < NR_WTHREADS; i++) { 87 wp = &workers[i]; 88 89 if (mthread_join(wp->w_tid, NULL) != 0) 90 panic("worker_cleanup: could not join thread %d", i); 91 if (cond_destroy(&wp->w_event) != 0) 92 panic("failed to destroy condition variable"); 93 if (mutex_destroy(&wp->w_event_mutex) != 0) 94 panic("failed to destroy mutex"); 95 } 96 97 /* Finally, clean up global resources. */ 98 if (mthread_attr_destroy(&tattr) != 0) 99 panic("failed to destroy attribute"); 100 101 memset(workers, 0, sizeof(workers)); 102 } 103 104 /*===========================================================================* 105 * worker_idle * 106 *===========================================================================*/ 107 int worker_idle(void) 108 { 109 /* Return whether all worker threads are idle. */ 110 111 return (pending == 0 && busy == 0); 112 } 113 114 /*===========================================================================* 115 * worker_assign * 116 *===========================================================================*/ 117 static void worker_assign(struct fproc *rfp) 118 { 119 /* Assign the work for the given process to a free thread. The caller must 120 * ensure that there is in fact at least one free thread. 121 */ 122 struct worker_thread *worker; 123 int i; 124 125 /* Find a free worker thread. */ 126 for (i = 0; i < NR_WTHREADS; i++) { 127 worker = &workers[i]; 128 129 if (worker->w_fp == NULL) 130 break; 131 } 132 assert(worker != NULL); 133 134 /* Assign work to it. */ 135 rfp->fp_worker = worker; 136 worker->w_fp = rfp; 137 busy++; 138 139 worker_wake(worker); 140 } 141 142 /*===========================================================================* 143 * worker_may_do_pending * 144 *===========================================================================*/ 145 static int worker_may_do_pending(void) 146 { 147 /* Return whether there is a free thread that may do pending work. This is true 148 * only if there is pending work at all, and there is a free non-spare thread 149 * (the spare thread is never used for pending work), and VFS is currently 150 * processing new requests at all (this may not be true during initialization). 151 */ 152 153 /* Ordered by likelihood to be false. */ 154 return (pending > 0 && worker_available() > 1 && !block_all); 155 } 156 157 /*===========================================================================* 158 * worker_allow * 159 *===========================================================================*/ 160 void worker_allow(int allow) 161 { 162 /* Allow or disallow workers to process new work. If disallowed, any new work 163 * will be stored as pending, even when there are free worker threads. There is 164 * no facility to stop active workers. To be used only during initialization! 165 */ 166 struct fproc *rfp; 167 168 block_all = !allow; 169 170 if (!worker_may_do_pending()) 171 return; 172 173 /* Assign any pending work to workers. */ 174 for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) { 175 if (rfp->fp_flags & FP_PENDING) { 176 rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ 177 assert(pending > 0); 178 pending--; 179 worker_assign(rfp); 180 181 if (!worker_may_do_pending()) 182 return; 183 } 184 } 185 } 186 187 /*===========================================================================* 188 * worker_get_work * 189 *===========================================================================*/ 190 static int worker_get_work(void) 191 { 192 /* Find new work to do. Work can be 'queued', 'pending', or absent. In the 193 * latter case wait for new work to come in. Return TRUE if there is work to 194 * do, or FALSE if the current thread is requested to shut down. 195 */ 196 struct fproc *rfp; 197 198 assert(self->w_fp == NULL); 199 200 /* Is there pending work, and should we do it? */ 201 if (worker_may_do_pending()) { 202 /* Find pending work */ 203 for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) { 204 if (rfp->fp_flags & FP_PENDING) { 205 self->w_fp = rfp; 206 rfp->fp_worker = self; 207 busy++; 208 rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ 209 assert(pending > 0); 210 pending--; 211 return TRUE; 212 } 213 } 214 panic("Pending work inconsistency"); 215 } 216 217 /* Wait for work to come to us */ 218 worker_sleep(); 219 220 return (self->w_fp != NULL); 221 } 222 223 /*===========================================================================* 224 * worker_available * 225 *===========================================================================*/ 226 int worker_available(void) 227 { 228 /* Return the number of threads that are available, including the spare thread. 229 */ 230 231 return(NR_WTHREADS - busy); 232 } 233 234 /*===========================================================================* 235 * worker_main * 236 *===========================================================================*/ 237 static void *worker_main(void *arg) 238 { 239 /* Worker thread main loop */ 240 241 self = (struct worker_thread *) arg; 242 ASSERTW(self); 243 244 while (worker_get_work()) { 245 246 fp = self->w_fp; 247 assert(fp->fp_worker == self); 248 249 /* Lock the process. */ 250 lock_proc(fp); 251 252 /* The following two blocks could be run in a loop until both the 253 * conditions are no longer met, but it is currently impossible that 254 * more normal work is present after postponed PM work has been done. 255 */ 256 257 /* Perform normal work, if any. */ 258 if (fp->fp_func != NULL) { 259 self->w_m_in = fp->fp_msg; 260 err_code = OK; 261 262 fp->fp_func(); 263 264 fp->fp_func = NULL; /* deliberately unset AFTER the call */ 265 } 266 267 /* Perform postponed PM work, if any. */ 268 if (fp->fp_flags & FP_PM_WORK) { 269 self->w_m_in = fp->fp_pm_msg; 270 271 service_pm_postponed(); 272 273 fp->fp_flags &= ~FP_PM_WORK; 274 } 275 276 /* Perform cleanup actions. */ 277 thread_cleanup(); 278 279 unlock_proc(fp); 280 281 fp->fp_worker = NULL; 282 self->w_fp = NULL; 283 assert(busy > 0); 284 busy--; 285 } 286 287 return(NULL); 288 } 289 290 /*===========================================================================* 291 * worker_can_start * 292 *===========================================================================*/ 293 int worker_can_start(struct fproc *rfp) 294 { 295 /* Return whether normal (non-PM) work can be started for the given process. 296 * This function is used to serialize invocation of "special" procedures, and 297 * not entirely safe for other cases, as explained in the comments below. 298 */ 299 int is_pending, is_active, has_normal_work, has_pm_work; 300 301 is_pending = (rfp->fp_flags & FP_PENDING); 302 is_active = (rfp->fp_worker != NULL); 303 has_normal_work = (rfp->fp_func != NULL); 304 has_pm_work = (rfp->fp_flags & FP_PM_WORK); 305 306 /* If there is no work scheduled for the process, we can start work. */ 307 if (!is_pending && !is_active) return TRUE; 308 309 /* If there is already normal work scheduled for the process, we cannot add 310 * more, since we support only one normal job per process. 311 */ 312 if (has_normal_work) return FALSE; 313 314 /* If this process has pending PM work but no normal work, we can add the 315 * normal work for execution before the worker will start. 316 */ 317 if (is_pending) return TRUE; 318 319 /* However, if a worker is active for PM work, we cannot add normal work 320 * either, because the work will not be considered. For this reason, we can 321 * not use this function for processes that can possibly get postponed PM 322 * work. It is still safe for core system processes, though. 323 */ 324 return FALSE; 325 } 326 327 /*===========================================================================* 328 * worker_try_activate * 329 *===========================================================================*/ 330 static void worker_try_activate(struct fproc *rfp, int use_spare) 331 { 332 /* See if we can wake up a thread to do the work scheduled for the given 333 * process. If not, mark the process as having pending work for later. 334 */ 335 int needed; 336 337 /* Use the last available thread only if requested. Otherwise, leave at least 338 * one spare thread for deadlock resolution. 339 */ 340 needed = use_spare ? 1 : 2; 341 342 /* Also make sure that doing new work is allowed at all right now, which may 343 * not be the case during VFS initialization. We do always allow callback 344 * calls, i.e., calls that may use the spare thread. The reason is that we do 345 * not support callback calls being marked as pending, so the (entirely 346 * theoretical) exception here may (entirely theoretically) avoid deadlocks. 347 */ 348 if (needed <= worker_available() && (!block_all || use_spare)) { 349 worker_assign(rfp); 350 } else { 351 rfp->fp_flags |= FP_PENDING; 352 pending++; 353 } 354 } 355 356 /*===========================================================================* 357 * worker_start * 358 *===========================================================================*/ 359 void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr, 360 int use_spare) 361 { 362 /* Schedule work to be done by a worker thread. The work is bound to the given 363 * process. If a function pointer is given, the work is considered normal work, 364 * and the function will be called to handle it. If the function pointer is 365 * NULL, the work is considered postponed PM work, and service_pm_postponed 366 * will be called to handle it. The input message will be a copy of the given 367 * message. Optionally, the last spare (deadlock-resolving) thread may be used 368 * to execute the work immediately. 369 */ 370 int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work; 371 372 assert(rfp != NULL); 373 374 is_pm_work = (func == NULL); 375 is_pending = (rfp->fp_flags & FP_PENDING); 376 is_active = (rfp->fp_worker != NULL); 377 has_normal_work = (rfp->fp_func != NULL); 378 has_pm_work = (rfp->fp_flags & FP_PM_WORK); 379 380 /* Sanity checks. If any of these trigger, someone messed up badly! */ 381 if (is_pending || is_active) { 382 if (is_pending && is_active) 383 panic("work cannot be both pending and active"); 384 385 /* The process cannot make more than one call at once. */ 386 if (!is_pm_work && has_normal_work) 387 panic("process has two calls (%x, %x)", 388 rfp->fp_msg.m_type, m_ptr->m_type); 389 390 /* PM will not send more than one job per process to us at once. */ 391 if (is_pm_work && has_pm_work) 392 panic("got two calls from PM (%x, %x)", 393 rfp->fp_pm_msg.m_type, m_ptr->m_type); 394 395 /* Despite PM's sys_delay_stop() system, it is possible that normal 396 * work (in particular, do_pending_pipe) arrives after postponed PM 397 * work has been scheduled for execution, so we don't check for that. 398 */ 399 #if 0 400 printf("VFS: adding %s work to %s thread\n", 401 is_pm_work ? "PM" : "normal", 402 is_pending ? "pending" : "active"); 403 #endif 404 } else { 405 /* Some cleanup step forgotten somewhere? */ 406 if (has_normal_work || has_pm_work) 407 panic("worker administration error"); 408 } 409 410 /* Save the work to be performed. */ 411 if (!is_pm_work) { 412 rfp->fp_msg = *m_ptr; 413 rfp->fp_func = func; 414 } else { 415 rfp->fp_pm_msg = *m_ptr; 416 rfp->fp_flags |= FP_PM_WORK; 417 } 418 419 /* If we have not only added to existing work, go look for a free thread. 420 * Note that we won't be using the spare thread for normal work if there is 421 * already PM work pending, but that situation will never occur in practice. 422 */ 423 if (!is_pending && !is_active) 424 worker_try_activate(rfp, use_spare); 425 } 426 427 /*===========================================================================* 428 * worker_yield * 429 *===========================================================================*/ 430 void worker_yield(void) 431 { 432 /* Yield to all worker threads. To be called from the main thread only. */ 433 434 mthread_yield_all(); 435 436 self = NULL; 437 } 438 439 /*===========================================================================* 440 * worker_sleep * 441 *===========================================================================*/ 442 static void worker_sleep(void) 443 { 444 struct worker_thread *worker = self; 445 ASSERTW(worker); 446 if (mutex_lock(&worker->w_event_mutex) != 0) 447 panic("unable to lock event mutex"); 448 if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0) 449 panic("could not wait on conditional variable"); 450 if (mutex_unlock(&worker->w_event_mutex) != 0) 451 panic("unable to unlock event mutex"); 452 self = worker; 453 } 454 455 /*===========================================================================* 456 * worker_wake * 457 *===========================================================================*/ 458 static void worker_wake(struct worker_thread *worker) 459 { 460 /* Signal a worker to wake up */ 461 ASSERTW(worker); 462 if (mutex_lock(&worker->w_event_mutex) != 0) 463 panic("unable to lock event mutex"); 464 if (cond_signal(&worker->w_event) != 0) 465 panic("unable to signal conditional variable"); 466 if (mutex_unlock(&worker->w_event_mutex) != 0) 467 panic("unable to unlock event mutex"); 468 } 469 470 /*===========================================================================* 471 * worker_suspend * 472 *===========================================================================*/ 473 struct worker_thread *worker_suspend(void) 474 { 475 /* Suspend the current thread, saving certain thread variables. Return a 476 * pointer to the thread's worker structure for later resumption. 477 */ 478 479 ASSERTW(self); 480 assert(fp != NULL); 481 assert(self->w_fp == fp); 482 assert(fp->fp_worker == self); 483 484 self->w_err_code = err_code; 485 486 return self; 487 } 488 489 /*===========================================================================* 490 * worker_resume * 491 *===========================================================================*/ 492 void worker_resume(struct worker_thread *org_self) 493 { 494 /* Resume the current thread after suspension, restoring thread variables. */ 495 496 ASSERTW(org_self); 497 498 self = org_self; 499 500 fp = self->w_fp; 501 assert(fp != NULL); 502 503 err_code = self->w_err_code; 504 } 505 506 /*===========================================================================* 507 * worker_wait * 508 *===========================================================================*/ 509 void worker_wait(void) 510 { 511 /* Put the current thread to sleep until woken up by the main thread. */ 512 513 (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */ 514 515 worker_sleep(); 516 517 /* We continue here after waking up */ 518 worker_resume(self); 519 assert(self->w_next == NULL); 520 } 521 522 /*===========================================================================* 523 * worker_signal * 524 *===========================================================================*/ 525 void worker_signal(struct worker_thread *worker) 526 { 527 ASSERTW(worker); /* Make sure we have a valid thread */ 528 worker_wake(worker); 529 } 530 531 /*===========================================================================* 532 * worker_stop * 533 *===========================================================================*/ 534 void worker_stop(struct worker_thread *worker) 535 { 536 ASSERTW(worker); /* Make sure we have a valid thread */ 537 if (worker->w_task != NONE) { 538 /* This thread is communicating with a driver or file server */ 539 if (worker->w_drv_sendrec != NULL) { /* Driver */ 540 worker->w_drv_sendrec->m_type = EIO; 541 worker->w_drv_sendrec = NULL; 542 } else if (worker->w_sendrec != NULL) { /* FS */ 543 worker->w_sendrec->m_type = EIO; 544 worker->w_sendrec = NULL; 545 } else { 546 panic("reply storage consistency error"); /* Oh dear */ 547 } 548 } else { 549 /* This shouldn't happen at all... */ 550 printf("VFS: stopping worker not blocked on any task?\n"); 551 util_stacktrace(); 552 } 553 worker_wake(worker); 554 } 555 556 /*===========================================================================* 557 * worker_stop_by_endpt * 558 *===========================================================================*/ 559 void worker_stop_by_endpt(endpoint_t proc_e) 560 { 561 struct worker_thread *worker; 562 int i; 563 564 if (proc_e == NONE) return; 565 566 for (i = 0; i < NR_WTHREADS; i++) { 567 worker = &workers[i]; 568 if (worker->w_fp != NULL && worker->w_task == proc_e) 569 worker_stop(worker); 570 } 571 } 572 573 /*===========================================================================* 574 * worker_get * 575 *===========================================================================*/ 576 struct worker_thread *worker_get(thread_t worker_tid) 577 { 578 int i; 579 580 for (i = 0; i < NR_WTHREADS; i++) 581 if (workers[i].w_tid == worker_tid) 582 return(&workers[i]); 583 584 return(NULL); 585 } 586 587 /*===========================================================================* 588 * worker_set_proc * 589 *===========================================================================*/ 590 void worker_set_proc(struct fproc *rfp) 591 { 592 /* Perform an incredibly ugly action that completely violates the threading 593 * model: change the current working thread's process context to another 594 * process. The caller is expected to hold the lock to both the calling and the 595 * target process, and neither process is expected to continue regular 596 * operation when done. This code is here *only* and *strictly* for the reboot 597 * code, and *must not* be used for anything else. 598 */ 599 600 if (fp == rfp) return; 601 602 if (rfp->fp_worker != NULL) 603 panic("worker_set_proc: target process not idle"); 604 605 fp->fp_worker = NULL; 606 607 fp = rfp; 608 609 self->w_fp = rfp; 610 fp->fp_worker = self; 611 } 612