#include "fs.h"
#include <string.h>
#include <assert.h>

static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);

static mthread_attr_t tattr;
static unsigned int pending;
static unsigned int busy;
static int block_all;

#if defined(_MINIX_MAGIC)
# define TH_STACKSIZE (64 * 1024)
#elif defined(MKCOVERAGE)
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif

#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])

/*===========================================================================*
 *                              worker_init                                  *
 *===========================================================================*/
void worker_init(void)
{
/* Initialize worker threads */
  struct worker_thread *wp;
  int i;

  if (mthread_attr_init(&tattr) != 0)
        panic("failed to initialize attribute");
  if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
        panic("couldn't set default thread stack size");

  pending = 0;
  busy = 0;
  block_all = FALSE;

  for (i = 0; i < NR_WTHREADS; i++) {
        wp = &workers[i];

        wp->w_fp = NULL;        /* Mark not in use */
        wp->w_next = NULL;
        wp->w_task = NONE;
        if (mutex_init(&wp->w_event_mutex, NULL) != 0)
                panic("failed to initialize mutex");
        if (cond_init(&wp->w_event, NULL) != 0)
                panic("failed to initialize condition variable");
        if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
                panic("unable to start thread");
  }

  /* Let all threads get ready to accept work. */
  worker_yield();
}

/*===========================================================================*
 *                              worker_cleanup                               *
 *===========================================================================*/
void worker_cleanup(void)
{
/* Clean up worker threads, reversing the actions of worker_init() such that
 * we can safely call worker_init() again later. All worker threads are
 * expected to be idle already. Used for live updates, because transferring
 * the thread stacks from one version to another is currently not feasible.
 */
  struct worker_thread *wp;
  int i;

  assert(worker_idle());

  /* First terminate all threads. */
  for (i = 0; i < NR_WTHREADS; i++) {
        wp = &workers[i];

        assert(wp->w_fp == NULL);

        /* Waking up the thread with no w_fp will cause it to exit. */
        worker_wake(wp);
  }

  worker_yield();

  /* Then clean up their resources. */
  for (i = 0; i < NR_WTHREADS; i++) {
        wp = &workers[i];

        if (mthread_join(wp->w_tid, NULL) != 0)
                panic("worker_cleanup: could not join thread %d", i);
        if (cond_destroy(&wp->w_event) != 0)
                panic("failed to destroy condition variable");
        if (mutex_destroy(&wp->w_event_mutex) != 0)
                panic("failed to destroy mutex");
  }

  /* Finally, clean up global resources. */
  if (mthread_attr_destroy(&tattr) != 0)
        panic("failed to destroy attribute");

  memset(workers, 0, sizeof(workers));
}
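
/* Illustration only (not part of the original source): a minimal sketch of
 * how worker_cleanup() and worker_init() are meant to pair up around a live
 * update, assuming a hypothetical state-transfer step. All worker threads
 * must already be idle when worker_cleanup() is called.
 */
#if 0
static void example_live_update(void)
{
  assert(worker_idle());        /* no thread may hold work across the update */
  worker_cleanup();             /* join all threads, free their sync objects */
  /* ... hand over state to the new VFS instance (hypothetical step) ... */
  worker_init();                /* recreate the thread pool afterwards */
}
#endif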

/*===========================================================================*
 *                              worker_idle                                  *
 *===========================================================================*/
int worker_idle(void)
{
/* Return whether all worker threads are idle. */

  return (pending == 0 && busy == 0);
}

/*===========================================================================*
 *                              worker_assign                                *
 *===========================================================================*/
static void worker_assign(struct fproc *rfp)
{
/* Assign the work for the given process to a free thread. The caller must
 * ensure that there is in fact at least one free thread.
 */
  struct worker_thread *worker;
  int i;

  /* Find a free worker thread. */
  for (i = 0; i < NR_WTHREADS; i++) {
        worker = &workers[i];

        if (worker->w_fp == NULL)
                break;
  }
  assert(worker != NULL);

  /* Assign work to it. */
  rfp->fp_worker = worker;
  worker->w_fp = rfp;
  busy++;

  worker_wake(worker);
}

/*===========================================================================*
 *                              worker_may_do_pending                        *
 *===========================================================================*/
static int worker_may_do_pending(void)
{
/* Return whether there is a free thread that may do pending work. This is true
 * only if there is pending work at all, and there is a free non-spare thread
 * (the spare thread is never used for pending work), and VFS is currently
 * processing new requests at all (this may not be true during initialization).
 */

  /* Ordered by likelihood to be false. */
  return (pending > 0 && worker_available() > 1 && !block_all);
}

/*===========================================================================*
 *                              worker_allow                                 *
 *===========================================================================*/
void worker_allow(int allow)
{
/* Allow or disallow workers to process new work. If disallowed, any new work
 * will be stored as pending, even when there are free worker threads. There is
 * no facility to stop active workers. To be used only during initialization!
 */
  struct fproc *rfp;

  block_all = !allow;

  if (!worker_may_do_pending())
        return;

  /* Assign any pending work to workers. */
  for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
        if (rfp->fp_flags & FP_PENDING) {
                rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
                assert(pending > 0);
                pending--;
                worker_assign(rfp);

                if (!worker_may_do_pending())
                        return;
        }
  }
}
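
/* Illustration only (not part of the original source): a hedged sketch of how
 * a caller might gate work intake during startup, assuming a hypothetical
 * init_vfs_state() helper. While work is disallowed, new requests are marked
 * FP_PENDING by worker_try_activate() and are handed to free threads as soon
 * as worker_allow(TRUE) runs.
 */
#if 0
static void example_startup(void)
{
  worker_init();                /* create the thread pool */
  worker_allow(FALSE);          /* queue new work as pending for now */
  init_vfs_state();             /* hypothetical: set up fproc[], mount root */
  worker_allow(TRUE);           /* release any queued pending work */
}
#endif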

/*===========================================================================*
 *                              worker_get_work                              *
 *===========================================================================*/
static int worker_get_work(void)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
 * latter case wait for new work to come in. Return TRUE if there is work to
 * do, or FALSE if the current thread is requested to shut down.
 */
  struct fproc *rfp;

  assert(self->w_fp == NULL);

  /* Is there pending work, and should we do it? */
  if (worker_may_do_pending()) {
        /* Find pending work */
        for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
                if (rfp->fp_flags & FP_PENDING) {
                        self->w_fp = rfp;
                        rfp->fp_worker = self;
                        busy++;
                        rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
                        assert(pending > 0);
                        pending--;
                        return TRUE;
                }
        }
        panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep();

  return (self->w_fp != NULL);
}

/*===========================================================================*
 *                              worker_available                             *
 *===========================================================================*/
int worker_available(void)
{
/* Return the number of threads that are available, including the spare thread.
 */

  return(NR_WTHREADS - busy);
}

/*===========================================================================*
 *                              worker_main                                  *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */

  self = (struct worker_thread *) arg;
  ASSERTW(self);

  while (worker_get_work()) {

        fp = self->w_fp;
        assert(fp->fp_worker == self);

        /* Lock the process. */
        lock_proc(fp);

        /* The following two blocks could be run in a loop until both the
         * conditions are no longer met, but it is currently impossible that
         * more normal work is present after postponed PM work has been done.
         */

        /* Perform normal work, if any. */
        if (fp->fp_func != NULL) {
                self->w_m_in = fp->fp_msg;
                err_code = OK;

                fp->fp_func();

                fp->fp_func = NULL;     /* deliberately unset AFTER the call */
        }

        /* Perform postponed PM work, if any. */
        if (fp->fp_flags & FP_PM_WORK) {
                self->w_m_in = fp->fp_pm_msg;

                service_pm_postponed();

                fp->fp_flags &= ~FP_PM_WORK;
        }

        /* Perform cleanup actions. */
        thread_cleanup();

        unlock_proc(fp);

        fp->fp_worker = NULL;
        self->w_fp = NULL;
        assert(busy > 0);
        busy--;
  }

  return(NULL);
}

/*===========================================================================*
 *                              worker_can_start                             *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * is not entirely safe for other cases, as explained in the comments below.
 */
  int is_pending, is_active, has_normal_work;

  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);

  /* If there is no work scheduled for the process, we can start work. */
  if (!is_pending && !is_active) return TRUE;

  /* If there is already normal work scheduled for the process, we cannot add
   * more, since we support only one normal job per process.
   */
  if (has_normal_work) return FALSE;

  /* If this process has pending PM work but no normal work, we can add the
   * normal work for execution before the worker will start.
   */
  if (is_pending) return TRUE;

  /* However, if a worker is active for PM work, we cannot add normal work
   * either, because the work will not be considered. For this reason, we can
   * not use this function for processes that can possibly get postponed PM
   * work. It is still safe for core system processes, though.
   */
  return FALSE;
}
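
/* Illustration only (not part of the original source): a minimal sketch of
 * the serialization pattern that worker_can_start() supports, assuming a
 * hypothetical handler do_special_call() and an incoming message 'm'. The
 * check prevents scheduling a second normal job for the same process.
 */
#if 0
static void example_serialized_start(struct fproc *rfp, message *m)
{
  if (!worker_can_start(rfp))
        return;                 /* the process already has work scheduled */

  worker_start(rfp, do_special_call, m, FALSE /*use_spare*/);
}
#endif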

/*===========================================================================*
 *                              worker_try_activate                          *
 *===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
 * process. If not, mark the process as having pending work for later.
 */
  int needed;

  /* Use the last available thread only if requested. Otherwise, leave at least
   * one spare thread for deadlock resolution.
   */
  needed = use_spare ? 1 : 2;

  /* Also make sure that doing new work is allowed at all right now, which may
   * not be the case during VFS initialization. We do always allow callback
   * calls, i.e., calls that may use the spare thread. The reason is that we do
   * not support callback calls being marked as pending, so the (entirely
   * theoretical) exception here may (entirely theoretically) avoid deadlocks.
   */
  if (needed <= worker_available() && (!block_all || use_spare)) {
        worker_assign(rfp);
  } else {
        rfp->fp_flags |= FP_PENDING;
        pending++;
  }
}

/*===========================================================================*
 *                              worker_start                                 *
 *===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
        int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
 * process. If a function pointer is given, the work is considered normal work,
 * and the function will be called to handle it. If the function pointer is
 * NULL, the work is considered postponed PM work, and service_pm_postponed
 * will be called to handle it. The input message will be a copy of the given
 * message. Optionally, the last spare (deadlock-resolving) thread may be used
 * to execute the work immediately.
 */
  int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;

  assert(rfp != NULL);

  is_pm_work = (func == NULL);
  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* Sanity checks. If any of these trigger, someone messed up badly! */
  if (is_pending || is_active) {
        if (is_pending && is_active)
                panic("work cannot be both pending and active");

        /* The process cannot make more than one call at once. */
        if (!is_pm_work && has_normal_work)
                panic("process has two calls (%x, %x)",
                        rfp->fp_msg.m_type, m_ptr->m_type);

        /* PM will not send more than one job per process to us at once. */
        if (is_pm_work && has_pm_work)
                panic("got two calls from PM (%x, %x)",
                        rfp->fp_pm_msg.m_type, m_ptr->m_type);

        /* Despite PM's sys_delay_stop() system, it is possible that normal
         * work (in particular, do_pending_pipe) arrives after postponed PM
         * work has been scheduled for execution, so we don't check for that.
         */
#if 0
        printf("VFS: adding %s work to %s thread\n",
                is_pm_work ? "PM" : "normal",
                is_pending ? "pending" : "active");
#endif
  } else {
        /* Some cleanup step forgotten somewhere? */
        if (has_normal_work || has_pm_work)
                panic("worker administration error");
  }

  /* Save the work to be performed. */
  if (!is_pm_work) {
        rfp->fp_msg = *m_ptr;
        rfp->fp_func = func;
  } else {
        rfp->fp_pm_msg = *m_ptr;
        rfp->fp_flags |= FP_PM_WORK;
  }

  /* If we have not only added to existing work, go look for a free thread.
   * Note that we won't be using the spare thread for normal work if there is
   * already PM work pending, but that situation will never occur in practice.
   */
  if (!is_pending && !is_active)
        worker_try_activate(rfp, use_spare);
}
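
/* Illustration only (not part of the original source): a hedged sketch of the
 * two forms of worker_start(). The handler do_example_request() and the
 * message 'm' are hypothetical; service_pm_postponed() and the use_spare
 * semantics are as described above.
 */
#if 0
static void example_start_normal(struct fproc *rfp, message *m)
{
  /* Normal work: run do_example_request() on a worker thread bound to rfp,
   * leaving the spare thread free for deadlock resolution.
   */
  worker_start(rfp, do_example_request, m, FALSE /*use_spare*/);
}

static void example_start_pm(struct fproc *rfp, message *m)
{
  /* Postponed PM work: no handler function; the worker thread will call
   * service_pm_postponed() for this process instead.
   */
  worker_start(rfp, NULL, m, FALSE /*use_spare*/);
}
#endif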

/*===========================================================================*
 *                              worker_yield                                 *
 *===========================================================================*/
void worker_yield(void)
{
/* Yield to all worker threads. To be called from the main thread only. */

  mthread_yield_all();

  self = NULL;
}

/*===========================================================================*
 *                              worker_sleep                                 *
 *===========================================================================*/
static void worker_sleep(void)
{
  struct worker_thread *worker = self;
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
        panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
        panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
        panic("unable to unlock event mutex");
  self = worker;
}

/*===========================================================================*
 *                              worker_wake                                  *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
        panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
        panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
        panic("unable to unlock event mutex");
}

/*===========================================================================*
 *                              worker_suspend                               *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Suspend the current thread, saving certain thread variables. Return a
 * pointer to the thread's worker structure for later resumption.
 */

  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  self->w_err_code = err_code;

  return self;
}

/*===========================================================================*
 *                              worker_resume                                *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension, restoring thread variables. */

  ASSERTW(org_self);

  self = org_self;

  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}
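
/* Illustration only (not part of the original source): a minimal sketch of
 * the save/restore pattern around worker_suspend() and worker_resume(). The
 * nested job run_nested_job() is hypothetical; the sketch must run on a
 * worker thread that currently has a process assigned.
 */
#if 0
static void example_suspend_resume(void)
{
  struct worker_thread *wp;

  wp = worker_suspend();        /* save err_code, remember our worker */
  run_nested_job();             /* hypothetical: may change self, fp, err_code */
  worker_resume(wp);            /* restore self, fp and err_code */
}
#endif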

/*===========================================================================*
 *                              worker_wait                                  *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until woken up by the main thread. */

  (void) worker_suspend();      /* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* We continue here after waking up */
  worker_resume(self);
  assert(self->w_next == NULL);
}
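
/* Illustration only (not part of the original source): a hedged sketch of the
 * blocking pattern built on worker_wait() and worker_signal(). A worker
 * thread issues a request and suspends itself; once the main message loop has
 * delivered the reply, it signals the waiting worker. The request/reply
 * helpers shown here are hypothetical.
 */
#if 0
/* Worker-thread side (hypothetical request path): */
static void example_block_on_reply(void)
{
  send_example_request();       /* hypothetical: fire off an asynchronous request */
  worker_wait();                /* suspend until the main thread signals us */
  /* On return, the main thread has delivered the reply for this worker. */
}

/* Main-thread side (hypothetical reply path): */
static void example_deliver_reply(struct fproc *rfp)
{
  if (rfp->fp_worker != NULL)
        worker_signal(rfp->fp_worker);  /* wake the worker blocked above */
}
#endif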

/*===========================================================================*
 *                              worker_signal                                *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
  ASSERTW(worker);              /* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *                              worker_stop                                  *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
  ASSERTW(worker);              /* Make sure we have a valid thread */
  /* This thread is communicating with a driver or file server */
  if (worker->w_drv_sendrec != NULL) {          /* Driver */
        assert(worker->w_task != NONE);
        worker->w_drv_sendrec->m_type = EIO;
        worker->w_drv_sendrec = NULL;
  } else if (worker->w_sendrec != NULL) {       /* FS */
        /* worker->w_task may be NONE if the FS message was still queued */
        worker->w_sendrec->m_type = EIO;
        worker->w_sendrec = NULL;
  } else
        panic("reply storage consistency error");       /* Oh dear */
  worker_wake(worker);
}

/*===========================================================================*
 *                              worker_stop_by_endpt                         *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  for (i = 0; i < NR_WTHREADS; i++) {
        worker = &workers[i];
        if (worker->w_fp != NULL && worker->w_task == proc_e)
                worker_stop(worker);
  }
}
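
/* Illustration only (not part of the original source): a hedged sketch of the
 * crash-recovery use of worker_stop_by_endpt(). When a driver or file server
 * endpoint is found dead, every worker blocked on that endpoint has EIO
 * injected into its reply storage and is woken up. The handler name is
 * hypothetical.
 */
#if 0
static void example_endpoint_died(endpoint_t dead_endpt)
{
  /* Unblock all workers that were waiting for a reply from dead_endpt. */
  worker_stop_by_endpt(dead_endpt);
}
#endif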

/*===========================================================================*
 *                              worker_get                                   *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
  int i;

  for (i = 0; i < NR_WTHREADS; i++)
        if (workers[i].w_tid == worker_tid)
                return(&workers[i]);

  return(NULL);
}

/*===========================================================================*
 *                              worker_set_proc                              *
 *===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
 * model: change the current working thread's process context to another
 * process. The caller is expected to hold the lock to both the calling and the
 * target process, and neither process is expected to continue regular
 * operation when done. This code is here *only* and *strictly* for the reboot
 * code, and *must not* be used for anything else.
 */

  if (fp == rfp) return;

  if (rfp->fp_worker != NULL)
        panic("worker_set_proc: target process not idle");

  fp->fp_worker = NULL;

  fp = rfp;

  self->w_fp = rfp;
  fp->fp_worker = self;
}