/* xref: /minix3/minix/servers/vfs/worker.c (revision 7c48de6cc4c6d56f2277d378dba01dbac8a8c3b9) */
#include "fs.h"
#include <string.h>
#include <assert.h>

static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);

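/* Bookkeeping for the worker pool: 'pending' counts processes whose work has
 * been queued as pending, 'busy' counts worker threads currently assigned
 * work, and 'block_all' (see worker_allow) causes new work to be queued as
 * pending even when free threads are available.
 */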
static mthread_attr_t tattr;
static unsigned int pending;
static unsigned int busy;
static int block_all;

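/* Per-thread stack size. Instrumented builds (live-update magic, coverage
 * measurement) presumably need more stack space than regular builds.
 */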
#if defined(_MINIX_MAGIC)
# define TH_STACKSIZE (64 * 1024)
#elif defined(MKCOVERAGE)
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif

#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])

/*===========================================================================*
 *				worker_init				     *
 *===========================================================================*/
void worker_init(void)
{
/* Initialize worker threads */
  struct worker_thread *wp;
  int i;

  if (mthread_attr_init(&tattr) != 0)
	panic("failed to initialize attribute");
  if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
	panic("couldn't set default thread stack size");

  pending = 0;
  busy = 0;
  block_all = FALSE;

  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	wp->w_fp = NULL;		/* Mark not in use */
	wp->w_next = NULL;
	wp->w_task = NONE;
	if (mutex_init(&wp->w_event_mutex, NULL) != 0)
		panic("failed to initialize mutex");
	if (cond_init(&wp->w_event, NULL) != 0)
		panic("failed to initialize condition variable");
	if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
		panic("unable to start thread");
  }

  /* Let all threads get ready to accept work. */
  worker_yield();
}

/*===========================================================================*
 *				worker_cleanup				     *
 *===========================================================================*/
void worker_cleanup(void)
{
/* Clean up worker threads, reversing the actions of worker_init() such that
 * we can safely call worker_init() again later. All worker threads are
 * expected to be idle already. Used for live updates, because transferring
 * the thread stacks from one version to another is currently not feasible.
 */
  struct worker_thread *wp;
  int i;

  assert(worker_idle());

  /* First terminate all threads. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	assert(wp->w_fp == NULL);

	/* Waking up the thread with no w_fp will cause it to exit. */
	worker_wake(wp);
  }

  worker_yield();

  /* Then clean up their resources. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	if (mthread_join(wp->w_tid, NULL) != 0)
		panic("worker_cleanup: could not join thread %d", i);
	if (cond_destroy(&wp->w_event) != 0)
		panic("failed to destroy condition variable");
	if (mutex_destroy(&wp->w_event_mutex) != 0)
		panic("failed to destroy mutex");
  }

  /* Finally, clean up global resources. */
  if (mthread_attr_destroy(&tattr) != 0)
	panic("failed to destroy attribute");

  memset(workers, 0, sizeof(workers));
}

/*===========================================================================*
 *				worker_idle				     *
 *===========================================================================*/
int worker_idle(void)
{
/* Return whether all worker threads are idle. */

  return (pending == 0 && busy == 0);
}

/*===========================================================================*
 *				worker_assign				     *
 *===========================================================================*/
static void worker_assign(struct fproc *rfp)
{
/* Assign the work for the given process to a free thread. The caller must
 * ensure that there is in fact at least one free thread.
 */
  struct worker_thread *worker;
  int i;

  /* Find a free worker thread. */
  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];

	if (worker->w_fp == NULL)
		break;
  }
  assert(worker->w_fp == NULL);	/* the caller guarantees a free thread */

  /* Assign work to it. */
  rfp->fp_worker = worker;
  worker->w_fp = rfp;
  busy++;

  worker_wake(worker);
}

/*===========================================================================*
 *				worker_may_do_pending			     *
 *===========================================================================*/
static int worker_may_do_pending(void)
{
/* Return whether there is a free thread that may do pending work. This is true
 * only if there is pending work at all, and there is a free non-spare thread
 * (the spare thread is never used for pending work), and VFS is currently
 * processing new requests at all (this may not be true during initialization).
 */

  /* Ordered by likelihood to be false. */
  return (pending > 0 && worker_available() > 1 && !block_all);
}

/*===========================================================================*
 *				worker_allow				     *
 *===========================================================================*/
void worker_allow(int allow)
{
/* Allow or disallow workers to process new work. If disallowed, any new work
 * will be stored as pending, even when there are free worker threads. There is
 * no facility to stop active workers. To be used only during initialization!
 */
  struct fproc *rfp;

  block_all = !allow;

  if (!worker_may_do_pending())
	return;

  /* Assign any pending work to workers. */
  for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
	if (rfp->fp_flags & FP_PENDING) {
		rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
		assert(pending > 0);
		pending--;
		worker_assign(rfp);

		if (!worker_may_do_pending())
			return;
	}
  }
}

/*===========================================================================*
 *				worker_get_work				     *
 *===========================================================================*/
static int worker_get_work(void)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
 * latter case wait for new work to come in. Return TRUE if there is work to
 * do, or FALSE if the current thread is requested to shut down.
 */
  struct fproc *rfp;

  assert(self->w_fp == NULL);

  /* Is there pending work, and should we do it? */
  if (worker_may_do_pending()) {
	/* Find pending work */
	for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
		if (rfp->fp_flags & FP_PENDING) {
			self->w_fp = rfp;
			rfp->fp_worker = self;
			busy++;
			rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
			assert(pending > 0);
			pending--;
			return TRUE;
		}
	}
	panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep();

  return (self->w_fp != NULL);
}

/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
int worker_available(void)
{
/* Return the number of threads that are available, including the spare thread.
 */

  return(NR_WTHREADS - busy);
}

/*===========================================================================*
 *				worker_main				     *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */

  self = (struct worker_thread *) arg;
  ASSERTW(self);

  while (worker_get_work()) {

	fp = self->w_fp;
	assert(fp->fp_worker == self);

	/* Lock the process. */
	lock_proc(fp);

	/* The following two blocks could be run in a loop until neither
	 * condition holds anymore, but it is currently impossible for more
	 * normal work to be present after postponed PM work has been done.
	 */

	/* Perform normal work, if any. */
	if (fp->fp_func != NULL) {
		self->w_m_in = fp->fp_msg;
		err_code = OK;

		fp->fp_func();

		fp->fp_func = NULL;	/* deliberately unset AFTER the call */
	}

	/* Perform postponed PM work, if any. */
	if (fp->fp_flags & FP_PM_WORK) {
		self->w_m_in = fp->fp_pm_msg;

		service_pm_postponed();

		fp->fp_flags &= ~FP_PM_WORK;
	}

	/* Perform cleanup actions. */
	thread_cleanup();

	unlock_proc(fp);

	fp->fp_worker = NULL;
	self->w_fp = NULL;
	assert(busy > 0);
	busy--;
  }

  return(NULL);
}

/*===========================================================================*
 *				worker_can_start			     *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * is not entirely safe for other cases, as explained in the comments below.
 */
  int is_pending, is_active, has_normal_work, has_pm_work;

  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* If there is no work scheduled for the process, we can start work. */
  if (!is_pending && !is_active) return TRUE;

  /* If there is already normal work scheduled for the process, we cannot add
   * more, since we support only one normal job per process.
   */
  if (has_normal_work) return FALSE;

  /* If this process has pending PM work but no normal work, we can add the
   * normal work for execution before the worker will start.
   */
  if (is_pending) return TRUE;

  /* However, if a worker is active for PM work, we cannot add normal work
   * either, because the work will not be considered. For this reason, we
   * cannot use this function for processes that can possibly get postponed PM
   * work. It is still safe for core system processes, though.
   */
  return FALSE;
}

/*===========================================================================*
 *				worker_try_activate			     *
 *===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
 * process. If not, mark the process as having pending work for later.
 */
  int needed;

  /* Use the last available thread only if requested. Otherwise, leave at least
   * one spare thread for deadlock resolution.
   */
  needed = use_spare ? 1 : 2;

  /* Also make sure that doing new work is allowed at all right now, which may
   * not be the case during VFS initialization. We do always allow callback
   * calls, i.e., calls that may use the spare thread. The reason is that we do
   * not support callback calls being marked as pending, so the (entirely
   * theoretical) exception here may (entirely theoretically) avoid deadlocks.
   */
  if (needed <= worker_available() && (!block_all || use_spare)) {
	worker_assign(rfp);
  } else {
	rfp->fp_flags |= FP_PENDING;
	pending++;
  }
}

/*===========================================================================*
 *				worker_start				     *
 *===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
	int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
 * process. If a function pointer is given, the work is considered normal work,
 * and the function will be called to handle it. If the function pointer is
 * NULL, the work is considered postponed PM work, and service_pm_postponed
 * will be called to handle it. The input message will be a copy of the given
 * message. Optionally, the last spare (deadlock-resolving) thread may be used
 * to execute the work immediately.
 */
  int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;

  assert(rfp != NULL);

  is_pm_work = (func == NULL);
  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* Sanity checks. If any of these trigger, someone messed up badly! */
  if (is_pending || is_active) {
	if (is_pending && is_active)
		panic("work cannot be both pending and active");

	/* The process cannot make more than one call at once. */
	if (!is_pm_work && has_normal_work)
		panic("process has two calls (%x, %x)",
			rfp->fp_msg.m_type, m_ptr->m_type);

	/* PM will not send more than one job per process to us at once. */
	if (is_pm_work && has_pm_work)
		panic("got two calls from PM (%x, %x)",
			rfp->fp_pm_msg.m_type, m_ptr->m_type);

	/* Despite PM's sys_delay_stop() system, it is possible that normal
	 * work (in particular, do_pending_pipe) arrives after postponed PM
	 * work has been scheduled for execution, so we don't check for that.
	 */
#if 0
	printf("VFS: adding %s work to %s thread\n",
		is_pm_work ? "PM" : "normal",
		is_pending ? "pending" : "active");
#endif
  } else {
	/* Some cleanup step forgotten somewhere? */
	if (has_normal_work || has_pm_work)
		panic("worker administration error");
  }

  /* Save the work to be performed. */
  if (!is_pm_work) {
	rfp->fp_msg = *m_ptr;
	rfp->fp_func = func;
  } else {
	rfp->fp_pm_msg = *m_ptr;
	rfp->fp_flags |= FP_PM_WORK;
  }

  /* If we have not only added to existing work, go look for a free thread.
   * Note that we won't be using the spare thread for normal work if there is
   * already PM work pending, but that situation will never occur in practice.
   */
  if (!is_pending && !is_active)
	worker_try_activate(rfp, use_spare);
}

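/* A minimal usage sketch (hypothetical, not taken from the actual dispatch
 * code): a handler of type void (*)(void) can be handed to a worker roughly
 * like this, where 'rfp' is the calling process and 'm_in' its request
 * message:
 *
 *	if (worker_can_start(rfp))
 *		worker_start(rfp, do_something, &m_in, FALSE);
 *
 * Passing NULL instead of a handler schedules postponed PM work, and passing
 * TRUE as the last argument allows use of the spare (deadlock-resolving)
 * thread.
 */
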
/*===========================================================================*
 *				worker_yield				     *
 *===========================================================================*/
void worker_yield(void)
{
/* Yield to all worker threads. To be called from the main thread only. */

  mthread_yield_all();

  self = NULL;
}

/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
static void worker_sleep(void)
{
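/* Block the calling worker thread on its condition variable until another
 * thread wakes it up through worker_wake().
 */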
  struct worker_thread *worker = self;
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
	panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
  self = worker;
}

/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
	panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
}

/*===========================================================================*
 *				worker_suspend				     *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Suspend the current thread, saving certain thread variables. Return a
 * pointer to the thread's worker structure for later resumption.
 */

  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  self->w_err_code = err_code;

  return self;
}

/*===========================================================================*
 *				worker_resume				     *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension, restoring thread variables. */

  ASSERTW(org_self);

  self = org_self;

  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}

/*===========================================================================*
 *				worker_wait				     *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until woken up by the main thread. */

  (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* We continue here after waking up */
  worker_resume(self);
  assert(self->w_next == NULL);
}

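/* A minimal sketch of the intended wait/signal pattern (hypothetical code,
 * not taken from the actual callers):
 *
 *	worker thread, about to wait for a reply from some endpoint:
 *		self->w_task = task_e;
 *		self->w_sendrec = &msg;
 *		worker_wait();
 *
 *	main thread, once the reply for that worker has come in:
 *		*wp->w_sendrec = reply_msg;
 *		wp->w_sendrec = NULL;
 *		wp->w_task = NONE;
 *		worker_signal(wp);
 */
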
/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
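/* Wake up a worker thread that suspended itself through worker_wait(),
 * typically because the reply it was waiting for has arrived.
 */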
  ASSERTW(worker);		/* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop				     *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
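/* Abort the driver or file server communication that the given worker thread
 * is currently blocked on, by faking an EIO reply and waking up the thread.
 */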
  ASSERTW(worker);		/* Make sure we have a valid thread */
  if (worker->w_task != NONE) {
	/* This thread is communicating with a driver or file server */
	if (worker->w_drv_sendrec != NULL) {			/* Driver */
		worker->w_drv_sendrec->m_type = EIO;
		worker->w_drv_sendrec = NULL;
	} else if (worker->w_sendrec != NULL) {		/* FS */
		worker->w_sendrec->m_type = EIO;
		worker->w_sendrec = NULL;
	} else {
		panic("reply storage consistency error");	/* Oh dear */
	}
  } else {
	/* This shouldn't happen at all... */
	printf("VFS: stopping worker not blocked on any task?\n");
	util_stacktrace();
  }
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop_by_endpt			     *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
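/* Stop all worker threads that are waiting for a reply from the given
 * endpoint, for instance because the corresponding driver or file server has
 * died.
 */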
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];
	if (worker->w_fp != NULL && worker->w_task == proc_e)
		worker_stop(worker);
  }
}

/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
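/* Return the worker thread structure for the given thread ID, or NULL if no
 * worker thread has that ID.
 */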
  int i;

  for (i = 0; i < NR_WTHREADS; i++)
	if (workers[i].w_tid == worker_tid)
		return(&workers[i]);

  return(NULL);
}

/*===========================================================================*
 *				worker_set_proc				     *
 *===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
 * model: change the current working thread's process context to another
 * process. The caller is expected to hold the lock to both the calling and the
 * target process, and neither process is expected to continue regular
 * operation when done. This code is here *only* and *strictly* for the reboot
 * code, and *must not* be used for anything else.
 */

  if (fp == rfp) return;

  if (rfp->fp_worker != NULL)
	panic("worker_set_proc: target process not idle");

  fp->fp_worker = NULL;

  fp = rfp;

  self->w_fp = rfp;
  fp->fp_worker = self;
}
614