xref: /minix3/minix/servers/vfs/worker.c (revision 0b98e8aad89f2bd4ba80b523d73cf29e9dd82ce1)
#include "fs.h"
#include <string.h>
#include <assert.h>

static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);

static mthread_attr_t tattr;	/* thread attributes shared by all workers */
static unsigned int pending;	/* number of processes with FP_PENDING work */
static unsigned int busy;	/* number of worker threads assigned to a process */
static int block_all;		/* TRUE while new work may not be started */

#ifdef MKCOVERAGE
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif

#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])

/*===========================================================================*
 *				worker_init				     *
 *===========================================================================*/
void worker_init(void)
{
/* Initialize worker threads */
  struct worker_thread *wp;
  int i;

  if (mthread_attr_init(&tattr) != 0)
	panic("failed to initialize attribute");
  if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
	panic("couldn't set default thread stack size");

  pending = 0;
  busy = 0;
  block_all = FALSE;

  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	wp->w_fp = NULL;		/* Mark not in use */
	wp->w_next = NULL;
	wp->w_task = NONE;
	if (mutex_init(&wp->w_event_mutex, NULL) != 0)
		panic("failed to initialize mutex");
	if (cond_init(&wp->w_event, NULL) != 0)
		panic("failed to initialize condition variable");
	if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
		panic("unable to start thread");
  }

  /* Let all threads get ready to accept work. */
  worker_yield();
}
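
/* Note on the worker_yield() call above: the newly created mthread threads
 * have not run yet at this point. Yielding lets every worker enter
 * worker_main() and block in worker_sleep() on its condition variable; only
 * then can a later worker_wake() actually be observed, since signalling a
 * condition variable that nobody is waiting on has no effect.
 */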

/*===========================================================================*
 *				worker_cleanup				     *
 *===========================================================================*/
void worker_cleanup(void)
{
/* Clean up worker threads, reversing the actions of worker_init() such that
 * we can safely call worker_init() again later. All worker threads are
 * expected to be idle already. Used for live updates, because transferring
 * the thread stacks from one version to another is currently not feasible.
 */
  struct worker_thread *wp;
  int i;

  assert(worker_idle());

  /* First terminate all threads. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	assert(wp->w_fp == NULL);

	/* Waking up the thread with no w_fp will cause it to exit. */
	worker_wake(wp);
  }

  worker_yield();

  /* Then clean up their resources. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	if (mthread_join(wp->w_tid, NULL) != 0)
		panic("worker_cleanup: could not join thread %d", i);
	if (cond_destroy(&wp->w_event) != 0)
		panic("failed to destroy condition variable");
	if (mutex_destroy(&wp->w_event_mutex) != 0)
		panic("failed to destroy mutex");
  }

  /* Finally, clean up global resources. */
  if (mthread_attr_destroy(&tattr) != 0)
	panic("failed to destroy attribute");

  memset(workers, 0, sizeof(workers));
}

/*===========================================================================*
 *				worker_idle				     *
 *===========================================================================*/
int worker_idle(void)
{
/* Return whether all worker threads are idle. */

  return (pending == 0 && busy == 0);
}

/*===========================================================================*
 *				worker_assign				     *
 *===========================================================================*/
static void worker_assign(struct fproc *rfp)
{
/* Assign the work for the given process to a free thread. The caller must
 * ensure that there is in fact at least one free thread.
 */
  struct worker_thread *worker;
  int i;

  /* Find a free worker thread. */
  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];

	if (worker->w_fp == NULL)
		break;
  }
  assert(i < NR_WTHREADS && worker->w_fp == NULL);	/* a free thread must exist */

  /* Assign work to it. */
  rfp->fp_worker = worker;
  worker->w_fp = rfp;
  busy++;

  worker_wake(worker);
}

/*===========================================================================*
 *				worker_may_do_pending			     *
 *===========================================================================*/
static int worker_may_do_pending(void)
{
/* Return whether there is a free thread that may do pending work. This is true
 * only if there is pending work at all, there is a free non-spare thread (the
 * spare thread is never used for pending work), and VFS is currently allowed
 * to process new requests (which may not be the case during initialization).
 */

  /* Ordered by likelihood to be false. */
  return (pending > 0 && worker_available() > 1 && !block_all);
}
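
/* The "worker_available() > 1" test above is what reserves the spare thread:
 * worker_available() returns NR_WTHREADS - busy, so pending work is picked up
 * only while at least two threads are free. For example, if all threads but
 * one are busy, the single remaining thread is kept as the spare for deadlock
 * resolution and pending work stays queued until another thread finishes.
 */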

/*===========================================================================*
 *				worker_allow				     *
 *===========================================================================*/
void worker_allow(int allow)
{
/* Allow or disallow workers to process new work. If disallowed, any new work
 * will be stored as pending, even when there are free worker threads. There is
 * no facility to stop active workers. To be used only during initialization!
 */
  struct fproc *rfp;

  block_all = !allow;

  if (!worker_may_do_pending())
	return;

  /* Assign any pending work to workers. */
  for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
	if (rfp->fp_flags & FP_PENDING) {
		rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
		assert(pending > 0);
		pending--;
		worker_assign(rfp);

		if (!worker_may_do_pending())
			return;
	}
  }
}
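
/* Illustrative sketch (not part of the original file): the initialization code
 * in VFS's main program is presumably expected to use the calls above roughly
 * as follows; the exact call sites are not shown in this file.
 *
 *	worker_init();		   create the thread pool
 *	worker_allow(FALSE);	   queue all incoming work while initializing
 *	...initialize, e.g. mount the root file system...
 *	worker_allow(TRUE);	   flush queued work and accept new work
 */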

/*===========================================================================*
 *				worker_get_work				     *
 *===========================================================================*/
static int worker_get_work(void)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
 * latter case wait for new work to come in. Return TRUE if there is work to
 * do, or FALSE if the current thread is requested to shut down.
 */
  struct fproc *rfp;

  assert(self->w_fp == NULL);

  /* Is there pending work, and should we do it? */
  if (worker_may_do_pending()) {
	/* Find pending work */
	for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
		if (rfp->fp_flags & FP_PENDING) {
			self->w_fp = rfp;
			rfp->fp_worker = self;
			busy++;
			rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
			assert(pending > 0);
			pending--;
			return TRUE;
		}
	}
	panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep();

  return (self->w_fp != NULL);
}

/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
int worker_available(void)
{
/* Return the number of threads that are available, including the spare thread.
 */

  return(NR_WTHREADS - busy);
}

/*===========================================================================*
 *				worker_main				     *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */

  self = (struct worker_thread *) arg;
  ASSERTW(self);

  while (worker_get_work()) {

	fp = self->w_fp;
	assert(fp->fp_worker == self);

	/* Lock the process. */
	lock_proc(fp);

	/* The following two blocks could be run in a loop until both the
	 * conditions are no longer met, but it is currently impossible that
	 * more normal work is present after postponed PM work has been done.
	 */

	/* Perform normal work, if any. */
	if (fp->fp_func != NULL) {
		self->w_m_in = fp->fp_msg;
		err_code = OK;

		fp->fp_func();

		fp->fp_func = NULL;	/* deliberately unset AFTER the call */
	}

	/* Perform postponed PM work, if any. */
	if (fp->fp_flags & FP_PM_WORK) {
		self->w_m_in = fp->fp_pm_msg;

		service_pm_postponed();

		fp->fp_flags &= ~FP_PM_WORK;
	}

	/* Perform cleanup actions. */
	thread_cleanup();

	unlock_proc(fp);

	fp->fp_worker = NULL;
	self->w_fp = NULL;
	assert(busy > 0);
	busy--;
  }

  return(NULL);
}

/*===========================================================================*
 *				worker_can_start			     *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * is not entirely safe for other cases, as explained in the comments below.
 */
  int is_pending, is_active, has_normal_work, has_pm_work;

  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* If there is no work scheduled for the process, we can start work. */
  if (!is_pending && !is_active) return TRUE;

  /* If there is already normal work scheduled for the process, we cannot add
   * more, since we support only one normal job per process.
   */
  if (has_normal_work) return FALSE;

  /* If this process has pending PM work but no normal work, we can add the
   * normal work for execution before the worker will start.
   */
  if (is_pending) return TRUE;

  /* However, if a worker is active for PM work, we cannot add normal work
   * either, because the work will not be considered. For this reason, we
   * cannot use this function for processes that can possibly get postponed
   * PM work. It is still safe for core system processes, though.
   */
  return FALSE;
}
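
/* Illustrative sketch (not in the original file): a typical caller pattern,
 * with do_special() standing in for a hypothetical handler function:
 *
 *	if (worker_can_start(rfp))
 *		worker_start(rfp, do_special, &m_in, FALSE);
 *
 * The check and the subsequent worker_start() call together ensure that at
 * most one instance of the special procedure is in flight per process.
 */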

/*===========================================================================*
 *				worker_try_activate			     *
 *===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
 * process. If not, mark the process as having pending work for later.
 */
  int needed;

  /* Use the last available thread only if requested. Otherwise, leave at least
   * one spare thread for deadlock resolution.
   */
  needed = use_spare ? 1 : 2;

  /* Also make sure that doing new work is allowed at all right now, which may
   * not be the case during VFS initialization. We do always allow callback
   * calls, i.e., calls that may use the spare thread. The reason is that we do
   * not support callback calls being marked as pending, so the (entirely
   * theoretical) exception here may (entirely theoretically) avoid deadlocks.
   */
  if (needed <= worker_available() && (!block_all || use_spare)) {
	worker_assign(rfp);
  } else {
	rfp->fp_flags |= FP_PENDING;
	pending++;
  }
}

/*===========================================================================*
 *				worker_start				     *
 *===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
	int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
 * process. If a function pointer is given, the work is considered normal work,
 * and the function will be called to handle it. If the function pointer is
 * NULL, the work is considered postponed PM work, and service_pm_postponed
 * will be called to handle it. The input message will be a copy of the given
 * message. Optionally, the last spare (deadlock-resolving) thread may be used
 * to execute the work immediately.
 */
  int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;

  assert(rfp != NULL);

  is_pm_work = (func == NULL);
  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* Sanity checks. If any of these trigger, someone messed up badly! */
  if (is_pending || is_active) {
	if (is_pending && is_active)
		panic("work cannot be both pending and active");

	/* The process cannot make more than one call at once. */
	if (!is_pm_work && has_normal_work)
		panic("process has two calls (%x, %x)",
			rfp->fp_msg.m_type, m_ptr->m_type);

	/* PM will not send more than one job per process to us at once. */
	if (is_pm_work && has_pm_work)
		panic("got two calls from PM (%x, %x)",
			rfp->fp_pm_msg.m_type, m_ptr->m_type);

	/* Despite PM's sys_delay_stop() system, it is possible that normal
	 * work (in particular, do_pending_pipe) arrives after postponed PM
	 * work has been scheduled for execution, so we don't check for that.
	 */
#if 0
	printf("VFS: adding %s work to %s thread\n",
		is_pm_work ? "PM" : "normal",
		is_pending ? "pending" : "active");
#endif
  } else {
	/* Some cleanup step forgotten somewhere? */
	if (has_normal_work || has_pm_work)
		panic("worker administration error");
  }

  /* Save the work to be performed. */
  if (!is_pm_work) {
	rfp->fp_msg = *m_ptr;
	rfp->fp_func = func;
  } else {
	rfp->fp_pm_msg = *m_ptr;
	rfp->fp_flags |= FP_PM_WORK;
  }

  /* If we have not merely added to work that was already scheduled, go look
   * for a free thread. Note that we won't be using the spare thread for normal
   * work if there is already PM work pending, but that situation will never
   * occur in practice.
   */
  if (!is_pending && !is_active)
	worker_try_activate(rfp, use_spare);
}
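
/* Illustrative sketch (not part of the original file): how the main message
 * loop is assumed to hand incoming requests to this module. The dispatch
 * helper and the do_work handler below are hypothetical; only the
 * worker_start() interface above is taken from this file.
 */
#if 0
static void example_dispatch(struct fproc *rfp, message *m_ptr, int from_pm)
{
  if (from_pm)
	worker_start(rfp, NULL, m_ptr, FALSE);		/* postponed PM work */
  else
	worker_start(rfp, do_work, m_ptr, FALSE);	/* normal work */
}
#endif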

/*===========================================================================*
 *				worker_yield				     *
 *===========================================================================*/
void worker_yield(void)
{
/* Yield to all worker threads. To be called from the main thread only. */

  mthread_yield_all();

  self = NULL;
}

/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
static void worker_sleep(void)
{
  struct worker_thread *worker = self;
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
	panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
  self = worker;
}
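
/* Note: the single cond_wait() above is not wrapped in a predicate loop. This
 * relies on the assumption that libmthread's cooperative scheduler does not
 * produce spurious wakeups, so a worker only returns from worker_sleep()
 * after an explicit worker_wake() call.
 */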

/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
	panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
}

/*===========================================================================*
 *				worker_suspend				     *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Suspend the current thread, saving certain thread variables. Return a
 * pointer to the thread's worker structure for later resumption.
 */

  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  self->w_err_code = err_code;

  return self;
}

/*===========================================================================*
 *				worker_resume				     *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension, restoring thread variables. */

  ASSERTW(org_self);

  self = org_self;

  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}

/*===========================================================================*
 *				worker_wait				     *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until woken up by the main thread. */

  (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* We continue here after waking up */
  worker_resume(self);
  assert(self->w_next == NULL);
}
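
/* Illustrative sketch (not part of the original file): worker_wait() and
 * worker_signal() are assumed to pair up roughly as follows when a worker
 * sends a request to a driver or file server and the reply is delivered
 * through VFS's main message loop; send_request() is a hypothetical helper.
 *
 *	worker thread:			main thread, on receiving the reply:
 *	  self->w_task = endpoint;
 *	  send_request(endpoint, &msg);
 *	  worker_wait();		  worker->w_task = NONE;
 *	  ...resumes here...	    <--	  worker_signal(worker);
 */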

/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
  ASSERTW(worker);		/* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop				     *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
  ASSERTW(worker);		/* Make sure we have a valid thread */
  if (worker->w_task != NONE) {
	/* This thread is communicating with a driver or file server */
	if (worker->w_drv_sendrec != NULL) {			/* Driver */
		worker->w_drv_sendrec->m_type = EIO;
		worker->w_drv_sendrec = NULL;
	} else if (worker->w_sendrec != NULL) {		/* FS */
		worker->w_sendrec->m_type = EIO;
		worker->w_sendrec = NULL;
	} else {
		panic("reply storage consistency error");	/* Oh dear */
	}
  } else {
	/* This shouldn't happen at all... */
	printf("VFS: stopping worker not blocked on any task?\n");
	util_stacktrace();
  }
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop_by_endpt			     *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];
	if (worker->w_fp != NULL && worker->w_task == proc_e)
		worker_stop(worker);
  }
}
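
/* Note: worker_stop_by_endpt() is presumably invoked when the endpoint of a
 * driver or file server goes away (for example, when that process crashes),
 * so that every worker blocked on that endpoint receives a fake EIO reply via
 * worker_stop() and can unwind instead of waiting forever.
 */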

/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
  int i;

  for (i = 0; i < NR_WTHREADS; i++)
	if (workers[i].w_tid == worker_tid)
		return(&workers[i]);

  return(NULL);
}

/*===========================================================================*
 *				worker_set_proc				     *
 *===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
 * model: change the current working thread's process context to another
 * process. The caller is expected to hold the lock to both the calling and the
 * target process, and neither process is expected to continue regular
 * operation when done. This code is here *only* and *strictly* for the reboot
 * code, and *must not* be used for anything else.
 */

  if (fp == rfp) return;

  if (rfp->fp_worker != NULL)
	panic("worker_set_proc: target process not idle");

  fp->fp_worker = NULL;

  fp = rfp;

  self->w_fp = rfp;
  fp->fp_worker = self;
}