xref: /minix3/minix/lib/libblockdriver/driver_mt.c (revision eda6f5931d42c77e1480347b1fc3eef2f8d33806)
1 /* This file contains the multithreaded driver interface.
2  *
3  * Changes:
4  *   Aug 27, 2011   created (A. Welzel)
5  *
6  * The entry points into this file are:
7  *   blockdriver_mt_task:	the main message loop of the driver
8  *   blockdriver_mt_terminate:	break out of the main message loop
9  *   blockdriver_mt_sleep:	put the current thread to sleep
10  *   blockdriver_mt_wakeup:	wake up a sleeping thread
11  *   blockdriver_mt_set_workers:set the number of worker threads
12  */
13 
14 #include <minix/blockdriver_mt.h>
15 #include <minix/mthread.h>
16 #include <assert.h>
17 
18 #include "const.h"
19 #include "driver.h"
20 #include "mq.h"
21 
22 /* A thread ID is composed of a device ID and a per-device worker thread ID.
23  * All thread IDs must be in the range 0..(MAX_THREADS-1) inclusive.
24  */
25 #define MAKE_TID(did, wid)	((did) * MAX_WORKERS + (wid))
26 #define TID_DEVICE(tid)		((tid) / MAX_WORKERS)
27 #define TID_WORKER(tid)		((tid) % MAX_WORKERS)
28 
29 typedef unsigned int worker_id_t;
30 
31 typedef enum {
32   STATE_DEAD,
33   STATE_RUNNING,
34   STATE_BUSY,
35   STATE_EXITED
36 } worker_state;
37 
38 /* Structure with information about a worker thread. */
39 typedef struct {
40   device_id_t device_id;
41   worker_id_t worker_id;
42   worker_state state;
43   mthread_thread_t mthread;
44   mthread_event_t sleep_event;
45 } worker_t;
46 
47 /* Structure with information about a device. */
48 typedef struct {
49   device_id_t id;
50   unsigned int workers;
51   worker_t worker[MAX_WORKERS];
52   mthread_event_t queue_event;
53   mthread_rwlock_t barrier;
54 } device_t;
55 
56 static struct blockdriver *bdtab;
57 static int running = FALSE;
58 
59 static mthread_key_t worker_key;
60 
61 static device_t device[MAX_DEVICES];
62 
63 static worker_t *exited[MAX_THREADS];
64 static int num_exited = 0;
65 
66 /*===========================================================================*
67  *				enqueue					     *
68  *===========================================================================*/
69 static void enqueue(device_t *dp, const message *m_src, int ipc_status)
70 {
71 /* Enqueue a message into the device's queue, and signal the event.
72  * Must be called from the master thread.
73  */
74 
75   if (!mq_enqueue(dp->id, m_src, ipc_status))
76 	panic("blockdriver_mt: enqueue failed (message queue full)");
77 
78   mthread_event_fire(&dp->queue_event);
79 }
80 
81 /*===========================================================================*
82  *				try_dequeue				     *
83  *===========================================================================*/
84 static int try_dequeue(device_t *dp, message *m_dst, int *ipc_status)
85 {
86 /* See if a message can be dequeued from the current worker thread's device
87  * queue. If so, dequeue the message and return TRUE. If not, return FALSE.
88  * Must be called from a worker thread. Does not block.
89  */
90 
91   return mq_dequeue(dp->id, m_dst, ipc_status);
92 }
93 
94 /*===========================================================================*
95  *				dequeue					     *
96  *===========================================================================*/
97 static int dequeue(device_t *dp, worker_t *wp, message *m_dst,
98   int *ipc_status)
99 {
100 /* Dequeue a message from the current worker thread's device queue. Block the
101  * current thread if necessary. Must be called from a worker thread. Either
102  * succeeds with a message (TRUE) or indicates that the thread should be
103  * terminated (FALSE).
104  */
105 
106   do {
107 	mthread_event_wait(&dp->queue_event);
108 
109 	/* If we were woken up as a result of terminate or set_workers, break
110 	 * out of the loop and terminate the thread.
111 	 */
112 	if (!running || wp->worker_id >= dp->workers)
113 		return FALSE;
114   } while (!try_dequeue(dp, m_dst, ipc_status));
115 
116   return TRUE;
117 }
118 
119 /*===========================================================================*
120  *				is_transfer_req				     *
121  *===========================================================================*/
122 static int is_transfer_req(int type)
123 {
124 /* Return whether the given block device request is a transfer request.
125  */
126 
127   switch (type) {
128   case BDEV_READ:
129   case BDEV_WRITE:
130   case BDEV_GATHER:
131   case BDEV_SCATTER:
132 	return TRUE;
133 
134   default:
135 	return FALSE;
136   }
137 }
138 
139 /*===========================================================================*
140  *				worker_thread				     *
141  *===========================================================================*/
142 static void *worker_thread(void *param)
143 {
144 /* The worker thread loop. Set up the thread-specific reference to itself and
145  * start looping. The loop consists of blocking dequeing and handling messages.
146  * After handling a message, the thread might have been stopped, so we check
147  * for this condition and exit if so.
148  */
149   worker_t *wp;
150   device_t *dp;
151   thread_id_t tid;
152   message m;
153   int ipc_status;
154 
155   wp = (worker_t *) param;
156   assert(wp != NULL);
157   dp = &device[wp->device_id];
158   tid = MAKE_TID(wp->device_id, wp->worker_id);
159 
160   if (mthread_setspecific(worker_key, wp))
161 	panic("blockdriver_mt: could not save local thread pointer");
162 
163   while (running && wp->worker_id < dp->workers) {
164 
165 	/* See if a new message is available right away. */
166 	if (!try_dequeue(dp, &m, &ipc_status)) {
167 
168 		/* If not, block waiting for a new message or a thread
169 		 * termination event.
170 		 */
171 		if (!dequeue(dp, wp, &m, &ipc_status))
172 			break;
173 	}
174 
175 	/* Even if the thread was stopped before, a new message resumes it. */
176 	wp->state = STATE_BUSY;
177 
178 	/* If the request is a transfer request, we acquire the read barrier
179 	 * lock. Otherwise, we acquire the write lock.
180 	 */
181 	if (is_transfer_req(m.m_type))
182 		mthread_rwlock_rdlock(&dp->barrier);
183 	else
184 		mthread_rwlock_wrlock(&dp->barrier);
185 
186 	/* Handle the request and send a reply. */
187 	blockdriver_process_on_thread(bdtab, &m, ipc_status, tid);
188 
189 	/* Switch the thread back to running state, and unlock the barrier. */
190 	wp->state = STATE_RUNNING;
191 	mthread_rwlock_unlock(&dp->barrier);
192   }
193 
194   /* Clean up and terminate this thread. */
195   if (mthread_setspecific(worker_key, NULL))
196 	panic("blockdriver_mt: could not delete local thread pointer");
197 
198   wp->state = STATE_EXITED;
199 
200   exited[num_exited++] = wp;
201 
202   return NULL;
203 }
204 
205 /*===========================================================================*
206  *				master_create_worker			     *
207  *===========================================================================*/
208 static void master_create_worker(worker_t *wp, worker_id_t worker_id,
209   device_id_t device_id)
210 {
211 /* Start a new worker thread.
212  */
213   mthread_attr_t attr;
214   int r;
215 
216   wp->device_id = device_id;
217   wp->worker_id = worker_id;
218   wp->state = STATE_RUNNING;
219 
220   /* Initialize synchronization primitives. */
221   mthread_event_init(&wp->sleep_event);
222 
223   r = mthread_attr_init(&attr);
224   if (r != 0)
225 	panic("blockdriver_mt: could not initialize attributes (%d)", r);
226 
227   r = mthread_attr_setstacksize(&attr, STACK_SIZE);
228   if (r != 0)
229 	panic("blockdriver_mt: could not set stack size (%d)", r);
230 
231   r = mthread_create(&wp->mthread, &attr, worker_thread, (void *) wp);
232   if (r != 0)
233 	panic("blockdriver_mt: could not start thread %d (%d)", worker_id, r);
234 
235   mthread_attr_destroy(&attr);
236 }
237 
238 /*===========================================================================*
239  *				master_destroy_worker			     *
240  *===========================================================================*/
241 static void master_destroy_worker(worker_t *wp)
242 {
243 /* Clean up resources used by an exited worker thread.
244  */
245 
246   assert(wp != NULL);
247   assert(wp->state == STATE_EXITED);
248 
249   /* Join the thread. */
250   if (mthread_join(wp->mthread, NULL))
251 	panic("blockdriver_mt: could not join thread %d", wp->worker_id);
252 
253   /* Destroy resources. */
254   mthread_event_destroy(&wp->sleep_event);
255 
256   wp->state = STATE_DEAD;
257 }
258 
259 /*===========================================================================*
260  *				master_handle_exits			     *
261  *===========================================================================*/
262 static void master_handle_exits(void)
263 {
264 /* Destroy the remains of all exited threads.
265  */
266   int i;
267 
268   for (i = 0; i < num_exited; i++)
269 	master_destroy_worker(exited[i]);
270 
271   num_exited = 0;
272 }
273 
274 /*===========================================================================*
275  *				master_handle_message			     *
276  *===========================================================================*/
277 static void master_handle_message(message *m_ptr, int ipc_status)
278 {
279 /* For real request messages, query the device ID, start a thread if none is
280  * free and the maximum number of threads for that device has not yet been
281  * reached, and enqueue the message in the devices's message queue. All other
282  * messages are handled immediately from the main thread.
283  */
284   device_id_t id;
285   worker_t *wp;
286   device_t *dp;
287   unsigned int wid;
288   int r;
289 
290   /* If this is not a block driver request, we cannot get the minor device
291    * associated with it, and thus we can not tell which thread should process
292    * it either. In that case, the master thread has to handle it instead.
293    */
294   if (is_ipc_notify(ipc_status) || !IS_BDEV_RQ(m_ptr->m_type)) {
295 	/* Process as 'other' message. */
296 	blockdriver_process_on_thread(bdtab, m_ptr, ipc_status, MAIN_THREAD);
297 
298 	return;
299   }
300 
301   /* Query the device ID. Upon failure, send the error code to the caller. */
302   r = (*bdtab->bdr_device)(m_ptr->m_lbdev_lblockdriver_msg.minor, &id);
303   if (r != OK) {
304 	blockdriver_reply(m_ptr, ipc_status, r);
305 
306 	return;
307   }
308 
309   /* Look up the device control block. */
310   assert(id >= 0 && id < MAX_DEVICES);
311   dp = &device[id];
312 
313   /* Find the first non-busy worker thread. */
314   for (wid = 0; wid < dp->workers; wid++)
315 	if (dp->worker[wid].state != STATE_BUSY)
316 		break;
317 
318   /* If the worker thread is dead, start a thread now, unless we have already
319    * reached the maximum number of threads.
320    */
321   if (wid < dp->workers) {
322 	wp = &dp->worker[wid];
323 
324 	assert(wp->state != STATE_EXITED);
325 
326 	/* If the non-busy thread has not yet been created, create one now. */
327 	if (wp->state == STATE_DEAD)
328 		master_create_worker(wp, wid, dp->id);
329   }
330 
331   /* Enqueue the message at the device queue. */
332   enqueue(dp, m_ptr, ipc_status);
333 }
334 
335 /*===========================================================================*
336  *				master_init				     *
337  *===========================================================================*/
338 static void master_init(struct blockdriver *bdp)
339 {
340 /* Initialize the state of the master thread.
341  */
342   int i, j;
343 
344   assert(bdp != NULL);
345   assert(bdp->bdr_device != NULL);
346 
347   bdtab = bdp;
348 
349   /* Initialize device-specific data structures. */
350   for (i = 0; i < MAX_DEVICES; i++) {
351 	device[i].id = i;
352 	device[i].workers = 1;
353 	mthread_event_init(&device[i].queue_event);
354 	mthread_rwlock_init(&device[i].barrier);
355 
356 	for (j = 0; j < MAX_WORKERS; j++)
357 		device[i].worker[j].state = STATE_DEAD;
358   }
359 
360   /* Initialize a per-thread key, where each worker thread stores its own
361    * reference to the worker structure.
362    */
363   if (mthread_key_create(&worker_key, NULL))
364 	panic("blockdriver_mt: error initializing worker key");
365 }
366 
367 /*===========================================================================*
368  *				blockdriver_mt_get_tid			     *
369  *===========================================================================*/
370 thread_id_t blockdriver_mt_get_tid(void)
371 {
372 /* Return back the ID of this thread.
373  */
374   worker_t *wp;
375 
376   wp = (worker_t *) mthread_getspecific(worker_key);
377 
378   if (wp == NULL)
379 	panic("blockdriver_mt: master thread cannot query thread ID\n");
380 
381   return MAKE_TID(wp->device_id, wp->worker_id);
382 }
383 
384 /*===========================================================================*
385  *				blockdriver_mt_receive			     *
386  *===========================================================================*/
387 static void blockdriver_mt_receive(message *m_ptr, int *ipc_status)
388 {
389 /* Receive a message.
390  */
391   int r;
392 
393   r = sef_receive_status(ANY, m_ptr, ipc_status);
394 
395   if (r != OK)
396 	panic("blockdriver_mt: sef_receive_status() returned %d", r);
397 }
398 
399 /*===========================================================================*
400  *				blockdriver_mt_task			     *
401  *===========================================================================*/
402 void blockdriver_mt_task(struct blockdriver *driver_tab)
403 {
404 /* The multithreaded driver task.
405  */
406   int ipc_status, i;
407   message mess;
408 
409   /* Initialize first if necessary. */
410   if (!running) {
411 	master_init(driver_tab);
412 
413 	running = TRUE;
414   }
415 
416   /* The main message loop. */
417   while (running) {
418 	/* Receive a message. */
419 	blockdriver_mt_receive(&mess, &ipc_status);
420 
421 	/* Dispatch the message. */
422 	master_handle_message(&mess, ipc_status);
423 
424 	/* Let other threads run. */
425 	mthread_yield_all();
426 
427 	/* Clean up any exited threads. */
428 	if (num_exited > 0)
429 		master_handle_exits();
430   }
431 
432   /* Free up resources. */
433   for (i = 0; i < MAX_DEVICES; i++)
434 	mthread_event_destroy(&device[i].queue_event);
435 }
436 
437 /*===========================================================================*
438  *				blockdriver_mt_terminate		     *
439  *===========================================================================*/
440 void blockdriver_mt_terminate(void)
441 {
442 /* Instruct libblockdriver to shut down.
443  */
444 
445   running = FALSE;
446 }
447 
448 /*===========================================================================*
449  *				blockdriver_mt_sleep			     *
450  *===========================================================================*/
451 void blockdriver_mt_sleep(void)
452 {
453 /* Let the current thread sleep until it gets woken up by the master thread.
454  */
455   worker_t *wp;
456 
457   wp = (worker_t *) mthread_getspecific(worker_key);
458 
459   if (wp == NULL)
460 	panic("blockdriver_mt: master thread cannot sleep");
461 
462   mthread_event_wait(&wp->sleep_event);
463 }
464 
465 /*===========================================================================*
466  *				blockdriver_mt_wakeup			     *
467  *===========================================================================*/
468 void blockdriver_mt_wakeup(thread_id_t id)
469 {
470 /* Wake up a sleeping worker thread from the master thread.
471  */
472   worker_t *wp;
473   device_id_t device_id;
474   worker_id_t worker_id;
475 
476   device_id = TID_DEVICE(id);
477   worker_id = TID_WORKER(id);
478 
479   assert(device_id >= 0 && device_id < MAX_DEVICES);
480   assert(worker_id >= 0 && worker_id < MAX_WORKERS);
481 
482   wp = &device[device_id].worker[worker_id];
483 
484   assert(wp->state == STATE_RUNNING || wp->state == STATE_BUSY);
485 
486   mthread_event_fire(&wp->sleep_event);
487 }
488 
489 /*===========================================================================*
490  *				blockdriver_mt_set_workers		     *
491  *===========================================================================*/
492 void blockdriver_mt_set_workers(device_id_t id, unsigned int workers)
493 {
494 /* Set the number of worker threads for the given device.
495  */
496   device_t *dp;
497 
498   assert(id >= 0 && id < MAX_DEVICES);
499 
500   if (workers > MAX_WORKERS)
501 	workers = MAX_WORKERS;
502 
503   dp = &device[id];
504 
505   /* If we are cleaning up, wake up all threads waiting on a queue event. */
506   if (workers == 1 && dp->workers > workers)
507 	mthread_event_fire_all(&dp->queue_event);
508 
509   dp->workers = workers;
510 }
511