1 #include "fs.h" 2 #include <assert.h> 3 4 static void worker_get_work(void); 5 static void *worker_main(void *arg); 6 static void worker_sleep(void); 7 static void worker_wake(struct worker_thread *worker); 8 9 static mthread_attr_t tattr; 10 static unsigned int pending; 11 static unsigned int busy; 12 static int block_all; 13 14 #ifdef MKCOVERAGE 15 # define TH_STACKSIZE (40 * 1024) 16 #else 17 # define TH_STACKSIZE (28 * 1024) 18 #endif 19 20 #define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]) 21 22 /*===========================================================================* 23 * worker_init * 24 *===========================================================================*/ 25 void worker_init(void) 26 { 27 /* Initialize worker thread */ 28 struct worker_thread *wp; 29 int i; 30 31 if (mthread_attr_init(&tattr) != 0) 32 panic("failed to initialize attribute"); 33 if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0) 34 panic("couldn't set default thread stack size"); 35 if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0) 36 panic("couldn't set default thread detach state"); 37 pending = 0; 38 busy = 0; 39 block_all = FALSE; 40 41 for (i = 0; i < NR_WTHREADS; i++) { 42 wp = &workers[i]; 43 44 wp->w_fp = NULL; /* Mark not in use */ 45 wp->w_next = NULL; 46 wp->w_task = NONE; 47 if (mutex_init(&wp->w_event_mutex, NULL) != 0) 48 panic("failed to initialize mutex"); 49 if (cond_init(&wp->w_event, NULL) != 0) 50 panic("failed to initialize conditional variable"); 51 if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0) 52 panic("unable to start thread"); 53 } 54 55 /* Let all threads get ready to accept work. */ 56 yield_all(); 57 } 58 59 /*===========================================================================* 60 * worker_assign * 61 *===========================================================================*/ 62 static void worker_assign(struct fproc *rfp) 63 { 64 /* Assign the work for the given process to a free thread. The caller must 65 * ensure that there is in fact at least one free thread. 66 */ 67 struct worker_thread *worker; 68 int i; 69 70 /* Find a free worker thread. */ 71 for (i = 0; i < NR_WTHREADS; i++) { 72 worker = &workers[i]; 73 74 if (worker->w_fp == NULL) 75 break; 76 } 77 assert(worker != NULL); 78 79 /* Assign work to it. */ 80 rfp->fp_worker = worker; 81 worker->w_fp = rfp; 82 busy++; 83 84 worker_wake(worker); 85 } 86 87 /*===========================================================================* 88 * worker_may_do_pending * 89 *===========================================================================*/ 90 static int worker_may_do_pending(void) 91 { 92 /* Return whether there is a free thread that may do pending work. This is true 93 * only if there is pending work at all, and there is a free non-spare thread 94 * (the spare thread is never used for pending work), and VFS is currently 95 * processing new requests at all (this may not be true during initialization). 96 */ 97 98 /* Ordered by likelihood to be false. */ 99 return (pending > 0 && worker_available() > 1 && !block_all); 100 } 101 102 /*===========================================================================* 103 * worker_allow * 104 *===========================================================================*/ 105 void worker_allow(int allow) 106 { 107 /* Allow or disallow workers to process new work. If disallowed, any new work 108 * will be stored as pending, even when there are free worker threads. There is 109 * no facility to stop active workers. To be used only during initialization! 110 */ 111 struct fproc *rfp; 112 113 block_all = !allow; 114 115 if (!worker_may_do_pending()) 116 return; 117 118 /* Assign any pending work to workers. */ 119 for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) { 120 if (rfp->fp_flags & FP_PENDING) { 121 rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ 122 assert(pending > 0); 123 pending--; 124 worker_assign(rfp); 125 126 if (!worker_may_do_pending()) 127 return; 128 } 129 } 130 } 131 132 /*===========================================================================* 133 * worker_get_work * 134 *===========================================================================*/ 135 static void worker_get_work(void) 136 { 137 /* Find new work to do. Work can be 'queued', 'pending', or absent. In the 138 * latter case wait for new work to come in. 139 */ 140 struct fproc *rfp; 141 142 assert(self->w_fp == NULL); 143 144 /* Is there pending work, and should we do it? */ 145 if (worker_may_do_pending()) { 146 /* Find pending work */ 147 for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) { 148 if (rfp->fp_flags & FP_PENDING) { 149 self->w_fp = rfp; 150 rfp->fp_worker = self; 151 busy++; 152 rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ 153 assert(pending > 0); 154 pending--; 155 return; 156 } 157 } 158 panic("Pending work inconsistency"); 159 } 160 161 /* Wait for work to come to us */ 162 worker_sleep(); 163 } 164 165 /*===========================================================================* 166 * worker_available * 167 *===========================================================================*/ 168 int worker_available(void) 169 { 170 /* Return the number of threads that are available, including the spare thread. 171 */ 172 173 return(NR_WTHREADS - busy); 174 } 175 176 /*===========================================================================* 177 * worker_main * 178 *===========================================================================*/ 179 static void *worker_main(void *arg) 180 { 181 /* Worker thread main loop */ 182 183 self = (struct worker_thread *) arg; 184 ASSERTW(self); 185 186 while(TRUE) { 187 worker_get_work(); 188 189 fp = self->w_fp; 190 assert(fp->fp_worker == self); 191 192 /* Lock the process. */ 193 lock_proc(fp); 194 195 /* The following two blocks could be run in a loop until both the 196 * conditions are no longer met, but it is currently impossible that 197 * more normal work is present after postponed PM work has been done. 198 */ 199 200 /* Perform normal work, if any. */ 201 if (fp->fp_func != NULL) { 202 self->w_m_in = fp->fp_msg; 203 err_code = OK; 204 205 fp->fp_func(); 206 207 fp->fp_func = NULL; /* deliberately unset AFTER the call */ 208 } 209 210 /* Perform postponed PM work, if any. */ 211 if (fp->fp_flags & FP_PM_WORK) { 212 self->w_m_in = fp->fp_pm_msg; 213 214 service_pm_postponed(); 215 216 fp->fp_flags &= ~FP_PM_WORK; 217 } 218 219 /* Perform cleanup actions. */ 220 thread_cleanup(); 221 222 unlock_proc(fp); 223 224 fp->fp_worker = NULL; 225 self->w_fp = NULL; 226 assert(busy > 0); 227 busy--; 228 } 229 230 return(NULL); /* Unreachable */ 231 } 232 233 /*===========================================================================* 234 * worker_can_start * 235 *===========================================================================*/ 236 int worker_can_start(struct fproc *rfp) 237 { 238 /* Return whether normal (non-PM) work can be started for the given process. 239 * This function is used to serialize invocation of "special" procedures, and 240 * not entirely safe for other cases, as explained in the comments below. 241 */ 242 int is_pending, is_active, has_normal_work, has_pm_work; 243 244 is_pending = (rfp->fp_flags & FP_PENDING); 245 is_active = (rfp->fp_worker != NULL); 246 has_normal_work = (rfp->fp_func != NULL); 247 has_pm_work = (rfp->fp_flags & FP_PM_WORK); 248 249 /* If there is no work scheduled for the process, we can start work. */ 250 if (!is_pending && !is_active) return TRUE; 251 252 /* If there is already normal work scheduled for the process, we cannot add 253 * more, since we support only one normal job per process. 254 */ 255 if (has_normal_work) return FALSE; 256 257 /* If this process has pending PM work but no normal work, we can add the 258 * normal work for execution before the worker will start. 259 */ 260 if (is_pending) return TRUE; 261 262 /* However, if a worker is active for PM work, we cannot add normal work 263 * either, because the work will not be considered. For this reason, we can 264 * not use this function for processes that can possibly get postponed PM 265 * work. It is still safe for core system processes, though. 266 */ 267 return FALSE; 268 } 269 270 /*===========================================================================* 271 * worker_try_activate * 272 *===========================================================================*/ 273 static void worker_try_activate(struct fproc *rfp, int use_spare) 274 { 275 /* See if we can wake up a thread to do the work scheduled for the given 276 * process. If not, mark the process as having pending work for later. 277 */ 278 int needed; 279 280 /* Use the last available thread only if requested. Otherwise, leave at least 281 * one spare thread for deadlock resolution. 282 */ 283 needed = use_spare ? 1 : 2; 284 285 /* Also make sure that doing new work is allowed at all right now, which may 286 * not be the case during VFS initialization. We do always allow callback 287 * calls, i.e., calls that may use the spare thread. The reason is that we do 288 * not support callback calls being marked as pending, so the (entirely 289 * theoretical) exception here may (entirely theoretically) avoid deadlocks. 290 */ 291 if (needed <= worker_available() && (!block_all || use_spare)) { 292 worker_assign(rfp); 293 } else { 294 rfp->fp_flags |= FP_PENDING; 295 pending++; 296 } 297 } 298 299 /*===========================================================================* 300 * worker_start * 301 *===========================================================================*/ 302 void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr, 303 int use_spare) 304 { 305 /* Schedule work to be done by a worker thread. The work is bound to the given 306 * process. If a function pointer is given, the work is considered normal work, 307 * and the function will be called to handle it. If the function pointer is 308 * NULL, the work is considered postponed PM work, and service_pm_postponed 309 * will be called to handle it. The input message will be a copy of the given 310 * message. Optionally, the last spare (deadlock-resolving) thread may be used 311 * to execute the work immediately. 312 */ 313 int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work; 314 315 assert(rfp != NULL); 316 317 is_pm_work = (func == NULL); 318 is_pending = (rfp->fp_flags & FP_PENDING); 319 is_active = (rfp->fp_worker != NULL); 320 has_normal_work = (rfp->fp_func != NULL); 321 has_pm_work = (rfp->fp_flags & FP_PM_WORK); 322 323 /* Sanity checks. If any of these trigger, someone messed up badly! */ 324 if (is_pending || is_active) { 325 if (is_pending && is_active) 326 panic("work cannot be both pending and active"); 327 328 /* The process cannot make more than one call at once. */ 329 if (!is_pm_work && has_normal_work) 330 panic("process has two calls (%x, %x)", 331 rfp->fp_msg.m_type, m_ptr->m_type); 332 333 /* PM will not send more than one job per process to us at once. */ 334 if (is_pm_work && has_pm_work) 335 panic("got two calls from PM (%x, %x)", 336 rfp->fp_pm_msg.m_type, m_ptr->m_type); 337 338 /* Despite PM's sys_delay_stop() system, it is possible that normal 339 * work (in particular, do_pending_pipe) arrives after postponed PM 340 * work has been scheduled for execution, so we don't check for that. 341 */ 342 #if 0 343 printf("VFS: adding %s work to %s thread\n", 344 is_pm_work ? "PM" : "normal", 345 is_pending ? "pending" : "active"); 346 #endif 347 } else { 348 /* Some cleanup step forgotten somewhere? */ 349 if (has_normal_work || has_pm_work) 350 panic("worker administration error"); 351 } 352 353 /* Save the work to be performed. */ 354 if (!is_pm_work) { 355 rfp->fp_msg = *m_ptr; 356 rfp->fp_func = func; 357 } else { 358 rfp->fp_pm_msg = *m_ptr; 359 rfp->fp_flags |= FP_PM_WORK; 360 } 361 362 /* If we have not only added to existing work, go look for a free thread. 363 * Note that we won't be using the spare thread for normal work if there is 364 * already PM work pending, but that situation will never occur in practice. 365 */ 366 if (!is_pending && !is_active) 367 worker_try_activate(rfp, use_spare); 368 } 369 370 /*===========================================================================* 371 * worker_sleep * 372 *===========================================================================*/ 373 static void worker_sleep(void) 374 { 375 struct worker_thread *worker = self; 376 ASSERTW(worker); 377 if (mutex_lock(&worker->w_event_mutex) != 0) 378 panic("unable to lock event mutex"); 379 if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0) 380 panic("could not wait on conditional variable"); 381 if (mutex_unlock(&worker->w_event_mutex) != 0) 382 panic("unable to unlock event mutex"); 383 self = worker; 384 } 385 386 /*===========================================================================* 387 * worker_wake * 388 *===========================================================================*/ 389 static void worker_wake(struct worker_thread *worker) 390 { 391 /* Signal a worker to wake up */ 392 ASSERTW(worker); 393 if (mutex_lock(&worker->w_event_mutex) != 0) 394 panic("unable to lock event mutex"); 395 if (cond_signal(&worker->w_event) != 0) 396 panic("unable to signal conditional variable"); 397 if (mutex_unlock(&worker->w_event_mutex) != 0) 398 panic("unable to unlock event mutex"); 399 } 400 401 /*===========================================================================* 402 * worker_suspend * 403 *===========================================================================*/ 404 struct worker_thread *worker_suspend(void) 405 { 406 /* Suspend the current thread, saving certain thread variables. Return a 407 * pointer to the thread's worker structure for later resumption. 408 */ 409 410 ASSERTW(self); 411 assert(fp != NULL); 412 assert(self->w_fp == fp); 413 assert(fp->fp_worker == self); 414 415 self->w_err_code = err_code; 416 417 return self; 418 } 419 420 /*===========================================================================* 421 * worker_resume * 422 *===========================================================================*/ 423 void worker_resume(struct worker_thread *org_self) 424 { 425 /* Resume the current thread after suspension, restoring thread variables. */ 426 427 ASSERTW(org_self); 428 429 self = org_self; 430 431 fp = self->w_fp; 432 assert(fp != NULL); 433 434 err_code = self->w_err_code; 435 } 436 437 /*===========================================================================* 438 * worker_wait * 439 *===========================================================================*/ 440 void worker_wait(void) 441 { 442 /* Put the current thread to sleep until woken up by the main thread. */ 443 444 (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */ 445 446 worker_sleep(); 447 448 /* We continue here after waking up */ 449 worker_resume(self); 450 assert(self->w_next == NULL); 451 } 452 453 /*===========================================================================* 454 * worker_signal * 455 *===========================================================================*/ 456 void worker_signal(struct worker_thread *worker) 457 { 458 ASSERTW(worker); /* Make sure we have a valid thread */ 459 worker_wake(worker); 460 } 461 462 /*===========================================================================* 463 * worker_stop * 464 *===========================================================================*/ 465 void worker_stop(struct worker_thread *worker) 466 { 467 ASSERTW(worker); /* Make sure we have a valid thread */ 468 if (worker->w_task != NONE) { 469 /* This thread is communicating with a driver or file server */ 470 if (worker->w_drv_sendrec != NULL) { /* Driver */ 471 worker->w_drv_sendrec->m_type = EIO; 472 worker->w_drv_sendrec = NULL; 473 } else if (worker->w_sendrec != NULL) { /* FS */ 474 worker->w_sendrec->m_type = EIO; 475 worker->w_sendrec = NULL; 476 } else { 477 panic("reply storage consistency error"); /* Oh dear */ 478 } 479 } else { 480 /* This shouldn't happen at all... */ 481 printf("VFS: stopping worker not blocked on any task?\n"); 482 util_stacktrace(); 483 } 484 worker_wake(worker); 485 } 486 487 /*===========================================================================* 488 * worker_stop_by_endpt * 489 *===========================================================================*/ 490 void worker_stop_by_endpt(endpoint_t proc_e) 491 { 492 struct worker_thread *worker; 493 int i; 494 495 if (proc_e == NONE) return; 496 497 for (i = 0; i < NR_WTHREADS; i++) { 498 worker = &workers[i]; 499 if (worker->w_fp != NULL && worker->w_task == proc_e) 500 worker_stop(worker); 501 } 502 } 503 504 /*===========================================================================* 505 * worker_get * 506 *===========================================================================*/ 507 struct worker_thread *worker_get(thread_t worker_tid) 508 { 509 int i; 510 511 for (i = 0; i < NR_WTHREADS; i++) 512 if (workers[i].w_tid == worker_tid) 513 return(&workers[i]); 514 515 return(NULL); 516 } 517 518 /*===========================================================================* 519 * worker_set_proc * 520 *===========================================================================*/ 521 void worker_set_proc(struct fproc *rfp) 522 { 523 /* Perform an incredibly ugly action that completely violates the threading 524 * model: change the current working thread's process context to another 525 * process. The caller is expected to hold the lock to both the calling and the 526 * target process, and neither process is expected to continue regular 527 * operation when done. This code is here *only* and *strictly* for the reboot 528 * code, and *must not* be used for anything else. 529 */ 530 531 if (fp == rfp) return; 532 533 if (rfp->fp_worker != NULL) 534 panic("worker_set_proc: target process not idle"); 535 536 fp->fp_worker = NULL; 537 538 fp = rfp; 539 540 self->w_fp = rfp; 541 fp->fp_worker = self; 542 } 543