/* xref: /minix3/minix/servers/vfs/worker.c (revision b5e2faaaaf60a8b9a02f8d72f64caa56a87eb312) */
#include "fs.h"
#include <assert.h>

static void worker_get_work(void);
static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);

static mthread_attr_t tattr;
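/* Bookkeeping for distributing work: 'pending' is the number of processes
 * that have work queued as FP_PENDING, 'busy' is the number of worker threads
 * currently assigned to a process, and 'block_all' disallows starting any new
 * work (see worker_allow()).
 */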
static unsigned int pending;
static unsigned int busy;
static int block_all;

#ifdef MKCOVERAGE
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif

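/* Check that 'w' points to a valid entry in the global workers[] array. */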
#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])

/*===========================================================================*
 *				worker_init				     *
 *===========================================================================*/
void worker_init(void)
{
/* Initialize worker threads */
  struct worker_thread *wp;
  int i;

  if (mthread_attr_init(&tattr) != 0)
	panic("failed to initialize attribute");
  if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
	panic("couldn't set default thread stack size");
  if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0)
	panic("couldn't set default thread detach state");
  pending = 0;
  busy = 0;
  block_all = FALSE;

  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	wp->w_fp = NULL;		/* Mark not in use */
	wp->w_next = NULL;
	wp->w_task = NONE;
	if (mutex_init(&wp->w_event_mutex, NULL) != 0)
		panic("failed to initialize mutex");
	if (cond_init(&wp->w_event, NULL) != 0)
		panic("failed to initialize condition variable");
	if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
		panic("unable to start thread");
  }

  /* Let all threads get ready to accept work. */
  yield_all();
}

/*===========================================================================*
 *				worker_assign				     *
 *===========================================================================*/
static void worker_assign(struct fproc *rfp)
{
/* Assign the work for the given process to a free thread. The caller must
 * ensure that there is in fact at least one free thread.
 */
  struct worker_thread *worker;
  int i;

  /* Find a free worker thread. */
  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];

	if (worker->w_fp == NULL)
		break;
  }
  assert(worker->w_fp == NULL);

  /* Assign work to it. */
  rfp->fp_worker = worker;
  worker->w_fp = rfp;
  busy++;

  worker_wake(worker);
}

/*===========================================================================*
 *				worker_may_do_pending			     *
 *===========================================================================*/
static int worker_may_do_pending(void)
{
/* Return whether there is a free thread that may do pending work. This is true
 * only if there is pending work at all, there is a free non-spare thread (the
 * spare thread is never used for pending work), and VFS is currently allowed
 * to process new requests (which may not be the case during initialization).
 */

  /* Ordered by likelihood to be false. */
  return (pending > 0 && worker_available() > 1 && !block_all);
}

/*===========================================================================*
 *				worker_allow				     *
 *===========================================================================*/
void worker_allow(int allow)
{
/* Allow or disallow workers to process new work. If disallowed, any new work
 * will be stored as pending, even when there are free worker threads. There is
 * no facility to stop active workers. To be used only during initialization!
 */
  struct fproc *rfp;

  block_all = !allow;

  if (!worker_may_do_pending())
	return;

  /* Assign any pending work to workers. */
  for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
	if (rfp->fp_flags & FP_PENDING) {
		rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
		assert(pending > 0);
		pending--;
		worker_assign(rfp);

		if (!worker_may_do_pending())
			return;
	}
  }
}

/*===========================================================================*
 *				worker_get_work				     *
 *===========================================================================*/
static void worker_get_work(void)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
 * last case, wait for new work to come in.
 */
  struct fproc *rfp;

  assert(self->w_fp == NULL);

  /* Is there pending work, and should we do it? */
  if (worker_may_do_pending()) {
	/* Find pending work */
	for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
		if (rfp->fp_flags & FP_PENDING) {
			self->w_fp = rfp;
			rfp->fp_worker = self;
			busy++;
			rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
			assert(pending > 0);
			pending--;
			return;
		}
	}
	panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep();
}

/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
int worker_available(void)
{
/* Return the number of threads that are available, including the spare thread.
 */

  return(NR_WTHREADS - busy);
}

/*===========================================================================*
 *				worker_main				     *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */

  self = (struct worker_thread *) arg;
  ASSERTW(self);

  while(TRUE) {
	worker_get_work();

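	/* Make the process we got work for the current process ('fp') of this
	 * thread.
	 */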
	fp = self->w_fp;
	assert(fp->fp_worker == self);

	/* Lock the process. */
	lock_proc(fp);

	/* The following two blocks could be run in a loop until neither of
	 * the conditions holds anymore, but it is currently impossible that
	 * more normal work is present after postponed PM work has been done.
	 */

	/* Perform normal work, if any. */
	if (fp->fp_func != NULL) {
		self->w_m_in = fp->fp_msg;
		err_code = OK;

		fp->fp_func();

		fp->fp_func = NULL;	/* deliberately unset AFTER the call */
	}

	/* Perform postponed PM work, if any. */
	if (fp->fp_flags & FP_PM_WORK) {
		self->w_m_in = fp->fp_pm_msg;

		service_pm_postponed();

		fp->fp_flags &= ~FP_PM_WORK;
	}

	/* Perform cleanup actions. */
	thread_cleanup();

	unlock_proc(fp);

	fp->fp_worker = NULL;
	self->w_fp = NULL;
	assert(busy > 0);
	busy--;
  }

  return(NULL);	/* Unreachable */
}

/*===========================================================================*
 *				worker_can_start			     *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * is not entirely safe for other cases, as explained in the comments below.
 */
  int is_pending, is_active, has_normal_work, has_pm_work;

  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* If there is no work scheduled for the process, we can start work. */
  if (!is_pending && !is_active) return TRUE;

  /* If there is already normal work scheduled for the process, we cannot add
   * more, since we support only one normal job per process.
   */
  if (has_normal_work) return FALSE;

  /* If this process has pending PM work but no normal work, we can add the
   * normal work for execution before the worker starts.
   */
  if (is_pending) return TRUE;

  /* However, if a worker is active for PM work, we cannot add normal work
   * either, because the work will not be considered. For this reason, we
   * cannot use this function for processes that can possibly get postponed
   * PM work. It is still safe for core system processes, though.
   */
  return FALSE;
}

/*===========================================================================*
 *				worker_try_activate			     *
 *===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
 * process. If not, mark the process as having pending work for later.
 */
  int needed;

  /* Use the last available thread only if requested. Otherwise, leave at least
   * one spare thread for deadlock resolution.
   */
  needed = use_spare ? 1 : 2;

  /* Also make sure that doing new work is allowed at all right now, which may
   * not be the case during VFS initialization. We do always allow callback
   * calls, i.e., calls that may use the spare thread. The reason is that we do
   * not support callback calls being marked as pending, so the (entirely
   * theoretical) exception here may (entirely theoretically) avoid deadlocks.
   */
  if (needed <= worker_available() && (!block_all || use_spare)) {
	worker_assign(rfp);
  } else {
	rfp->fp_flags |= FP_PENDING;
	pending++;
  }
}

/*===========================================================================*
 *				worker_start				     *
 *===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
	int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
 * process. If a function pointer is given, the work is considered normal work,
 * and the function will be called to handle it. If the function pointer is
 * NULL, the work is considered postponed PM work, and service_pm_postponed
 * will be called to handle it. The input message will be a copy of the given
 * message. Optionally, the last spare (deadlock-resolving) thread may be used
 * to execute the work immediately.
 */
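/* Illustrative (hypothetical) example: to schedule normal work for process
 * 'rfp' without claiming the spare thread, a caller could do:
 *
 *	worker_start(rfp, do_pending_pipe, &msg, FALSE);
 *
 * where do_pending_pipe is a handler of type void (*)(void) and msg is the
 * request message to be copied into rfp->fp_msg.
 */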
  int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;

  assert(rfp != NULL);

  is_pm_work = (func == NULL);
  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* Sanity checks. If any of these trigger, someone messed up badly! */
  if (is_pending || is_active) {
	if (is_pending && is_active)
		panic("work cannot be both pending and active");

	/* The process cannot make more than one call at once. */
	if (!is_pm_work && has_normal_work)
		panic("process has two calls (%x, %x)",
			rfp->fp_msg.m_type, m_ptr->m_type);

	/* PM will not send more than one job per process to us at once. */
	if (is_pm_work && has_pm_work)
		panic("got two calls from PM (%x, %x)",
			rfp->fp_pm_msg.m_type, m_ptr->m_type);

	/* Despite PM's sys_delay_stop() system, it is possible that normal
	 * work (in particular, do_pending_pipe) arrives after postponed PM
	 * work has been scheduled for execution, so we don't check for that.
	 */
#if 0
	printf("VFS: adding %s work to %s thread\n",
		is_pm_work ? "PM" : "normal",
		is_pending ? "pending" : "active");
#endif
  } else {
	/* Some cleanup step forgotten somewhere? */
	if (has_normal_work || has_pm_work)
		panic("worker administration error");
  }

  /* Save the work to be performed. */
  if (!is_pm_work) {
	rfp->fp_msg = *m_ptr;
	rfp->fp_func = func;
  } else {
	rfp->fp_pm_msg = *m_ptr;
	rfp->fp_flags |= FP_PM_WORK;
  }

  /* If we did not merely add to already scheduled work, go look for a free
   * thread. Note that we won't be using the spare thread for normal work if
   * there is already PM work pending, but that situation will never occur in
   * practice.
   */
  if (!is_pending && !is_active)
	worker_try_activate(rfp, use_spare);
}

/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
static void worker_sleep(void)
{
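/* Put the calling worker thread to sleep until another thread wakes it up
 * through worker_wake().
 */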
  struct worker_thread *worker = self;
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
	panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
  self = worker;
}

/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
	panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
}

/*===========================================================================*
 *				worker_suspend				     *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Suspend the current thread, saving certain thread variables. Return a
 * pointer to the thread's worker structure for later resumption.
 */

  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  self->w_err_code = err_code;

  return self;
}

/*===========================================================================*
 *				worker_resume				     *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension, restoring thread variables. */

  ASSERTW(org_self);

  self = org_self;

  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}

/*===========================================================================*
 *				worker_wait				     *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until woken up by the main thread. */

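/* Typically the thread has just sent a request to a driver or file server;
 * the main thread will call worker_signal() on this worker once the reply
 * arrives, or worker_stop() if the other party has died.
 */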
  (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* We continue here after waking up */
  worker_resume(self);
  assert(self->w_next == NULL);
}

/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
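/* Wake up a worker thread, typically one that has suspended itself with
 * worker_wait() and is waiting for a reply.
 */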
  ASSERTW(worker);		/* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop				     *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
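/* Abort the ongoing communication of the given worker thread: store EIO as
 * the reply to the driver or file server request it is blocked on, and wake
 * the thread up so that it can process the failure.
 */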
  ASSERTW(worker);		/* Make sure we have a valid thread */
  if (worker->w_task != NONE) {
	/* This thread is communicating with a driver or file server */
	if (worker->w_drv_sendrec != NULL) {			/* Driver */
		worker->w_drv_sendrec->m_type = EIO;
		worker->w_drv_sendrec = NULL;
	} else if (worker->w_sendrec != NULL) {		/* FS */
		worker->w_sendrec->m_type = EIO;
		worker->w_sendrec = NULL;
	} else {
		panic("reply storage consistency error");	/* Oh dear */
	}
  } else {
	/* This shouldn't happen at all... */
	printf("VFS: stopping worker not blocked on any task?\n");
	util_stacktrace();
  }
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop_by_endpt			     *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
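/* Abort the communication of all worker threads that are waiting for a reply
 * from the given process, typically because that driver or file server has
 * died.
 */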
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];
	if (worker->w_fp != NULL && worker->w_task == proc_e)
		worker_stop(worker);
  }
}

/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
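/* Return the worker thread structure for the given thread ID, or NULL if no
 * worker thread has that ID.
 */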
  int i;

  for (i = 0; i < NR_WTHREADS; i++)
	if (workers[i].w_tid == worker_tid)
		return(&workers[i]);

  return(NULL);
}

/*===========================================================================*
 *				worker_set_proc				     *
 *===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
 * model: change the current working thread's process context to another
 * process. The caller is expected to hold the lock to both the calling and the
 * target process, and neither process is expected to continue regular
 * operation when done. This code is here *only* and *strictly* for the reboot
 * code, and *must not* be used for anything else.
 */

  if (fp == rfp) return;

  if (rfp->fp_worker != NULL)
	panic("worker_set_proc: target process not idle");

  fp->fp_worker = NULL;

  fp = rfp;

  self->w_fp = rfp;
  fp->fp_worker = self;
}
543