/* This file contains the multithreaded driver interface.
 *
 * Changes:
 *   Aug 27, 2011   created (A. Welzel)
 *
 * The entry points into this file are:
 *   blockdriver_mt_task:         the main message loop of the driver
 *   blockdriver_mt_terminate:    break out of the main message loop
 *   blockdriver_mt_sleep:        put the current thread to sleep
 *   blockdriver_mt_wakeup:       wake up a sleeping thread
 *   blockdriver_mt_set_workers:  set the number of worker threads
 */

#include <minix/blockdriver_mt.h>
#include <minix/mthread.h>
#include <assert.h>

#include "const.h"
#include "driver.h"
#include "mq.h"

/* A thread ID is composed of a device ID and a per-device worker thread ID.
 * All thread IDs must be in the range 0..(MAX_THREADS-1) inclusive.
 */
#define MAKE_TID(did, wid)  ((did) * MAX_WORKERS + (wid))
#define TID_DEVICE(tid)     ((tid) / MAX_WORKERS)
#define TID_WORKER(tid)     ((tid) % MAX_WORKERS)

typedef unsigned int worker_id_t;

typedef enum {
  STATE_DEAD,
  STATE_RUNNING,
  STATE_BUSY,
  STATE_EXITED
} worker_state;

/* Structure with information about a worker thread. */
typedef struct {
  device_id_t device_id;
  worker_id_t worker_id;
  worker_state state;
  mthread_thread_t mthread;
  mthread_event_t sleep_event;
} worker_t;

/* Structure with information about a device. */
typedef struct {
  device_id_t id;
  unsigned int workers;
  worker_t worker[MAX_WORKERS];
  mthread_event_t queue_event;
  mthread_rwlock_t barrier;
} device_t;

static struct blockdriver *bdtab;
static int running = FALSE;

static mthread_key_t worker_key;

static device_t device[MAX_DEVICES];

static worker_t *exited[MAX_THREADS];
static int num_exited = 0;

/*===========================================================================*
 *                              enqueue                                      *
 *===========================================================================*/
static void enqueue(device_t *dp, const message *m_src, int ipc_status)
{
  /* Enqueue a message into the device's queue, and signal the event.
   * Must be called from the master thread.
   */

  if (!mq_enqueue(dp->id, m_src, ipc_status))
    panic("blockdriver_mt: enqueue failed (message queue full)");

  mthread_event_fire(&dp->queue_event);
}

/*===========================================================================*
 *                              try_dequeue                                  *
 *===========================================================================*/
static int try_dequeue(device_t *dp, message *m_dst, int *ipc_status)
{
  /* See if a message can be dequeued from the current worker thread's device
   * queue. If so, dequeue the message and return TRUE. If not, return FALSE.
   * Must be called from a worker thread. Does not block.
   */

  return mq_dequeue(dp->id, m_dst, ipc_status);
}

/*===========================================================================*
 *                              dequeue                                      *
 *===========================================================================*/
static int dequeue(device_t *dp, worker_t *wp, message *m_dst,
  int *ipc_status)
{
  /* Dequeue a message from the current worker thread's device queue. Block the
   * current thread if necessary. Must be called from a worker thread. Either
   * succeeds with a message (TRUE) or indicates that the thread should be
   * terminated (FALSE).
   */

  do {
    mthread_event_wait(&dp->queue_event);

    /* If we were woken up as a result of terminate or set_workers, break
     * out of the loop and terminate the thread.
     */
    if (!running || wp->worker_id >= dp->workers)
      return FALSE;
  } while (!try_dequeue(dp, m_dst, ipc_status));

  return TRUE;
}

/*===========================================================================*
 *                              is_transfer_req                              *
 *===========================================================================*/
static int is_transfer_req(int type)
{
  /* Return whether the given block device request is a transfer request.
   */

  switch (type) {
  case BDEV_READ:
  case BDEV_WRITE:
  case BDEV_GATHER:
  case BDEV_SCATTER:
    return TRUE;

  default:
    return FALSE;
  }
}

/*===========================================================================*
 *                              worker_thread                                *
 *===========================================================================*/
static void *worker_thread(void *param)
{
  /* The worker thread loop. Set up the thread-specific reference to itself and
   * start looping. The loop consists of blocking dequeuing and handling
   * messages. After handling a message, the thread might have been stopped, so
   * we check for this condition and exit if so.
   */
  worker_t *wp;
  device_t *dp;
  thread_id_t tid;
  message m;
  int ipc_status;

  wp = (worker_t *) param;
  assert(wp != NULL);
  dp = &device[wp->device_id];
  tid = MAKE_TID(wp->device_id, wp->worker_id);

  if (mthread_setspecific(worker_key, wp))
    panic("blockdriver_mt: could not save local thread pointer");

  while (running && wp->worker_id < dp->workers) {

    /* See if a new message is available right away. */
    if (!try_dequeue(dp, &m, &ipc_status)) {

      /* If not, block waiting for a new message or a thread
       * termination event.
       */
      if (!dequeue(dp, wp, &m, &ipc_status))
        break;
    }

    /* Even if the thread was stopped before, a new message resumes it. */
    wp->state = STATE_BUSY;

    /* If the request is a transfer request, we acquire the read barrier
     * lock, so that multiple transfers may be processed in parallel.
     * Otherwise, we acquire the write lock, giving the request exclusive
     * access to the device.
     */
    if (is_transfer_req(m.m_type))
      mthread_rwlock_rdlock(&dp->barrier);
    else
      mthread_rwlock_wrlock(&dp->barrier);

    /* Handle the request and send a reply. */
    blockdriver_process_on_thread(bdtab, &m, ipc_status, tid);

    /* Switch the thread back to running state, and unlock the barrier. */
    wp->state = STATE_RUNNING;
    mthread_rwlock_unlock(&dp->barrier);
  }

  /* Clean up and terminate this thread. */
  if (mthread_setspecific(worker_key, NULL))
    panic("blockdriver_mt: could not delete local thread pointer");

  wp->state = STATE_EXITED;

  exited[num_exited++] = wp;

  return NULL;
}

/*===========================================================================*
 *                              master_create_worker                         *
 *===========================================================================*/
static void master_create_worker(worker_t *wp, worker_id_t worker_id,
  device_id_t device_id)
{
  /* Start a new worker thread.
   */
  mthread_attr_t attr;
  int r;

  wp->device_id = device_id;
  wp->worker_id = worker_id;
  wp->state = STATE_RUNNING;

  /* Initialize synchronization primitives.
   */
  mthread_event_init(&wp->sleep_event);

  r = mthread_attr_init(&attr);
  if (r != 0)
    panic("blockdriver_mt: could not initialize attributes (%d)", r);

  r = mthread_attr_setstacksize(&attr, STACK_SIZE);
  if (r != 0)
    panic("blockdriver_mt: could not set stack size (%d)", r);

  r = mthread_create(&wp->mthread, &attr, worker_thread, (void *) wp);
  if (r != 0)
    panic("blockdriver_mt: could not start thread %d (%d)", worker_id, r);

  mthread_attr_destroy(&attr);
}

/*===========================================================================*
 *                              master_destroy_worker                        *
 *===========================================================================*/
static void master_destroy_worker(worker_t *wp)
{
  /* Clean up resources used by an exited worker thread.
   */

  assert(wp != NULL);
  assert(wp->state == STATE_EXITED);

  /* Join the thread. */
  if (mthread_join(wp->mthread, NULL))
    panic("blockdriver_mt: could not join thread %d", wp->worker_id);

  /* Destroy resources. */
  mthread_event_destroy(&wp->sleep_event);

  wp->state = STATE_DEAD;
}

/*===========================================================================*
 *                              master_handle_exits                          *
 *===========================================================================*/
static void master_handle_exits(void)
{
  /* Destroy the remains of all exited threads.
   */
  int i;

  for (i = 0; i < num_exited; i++)
    master_destroy_worker(exited[i]);

  num_exited = 0;
}

/*===========================================================================*
 *                              master_handle_message                        *
 *===========================================================================*/
static void master_handle_message(message *m_ptr, int ipc_status)
{
  /* For real request messages, query the device ID, start a thread if none is
   * free and the maximum number of threads for that device has not yet been
   * reached, and enqueue the message in the device's message queue. All other
   * messages are handled immediately from the main thread.
   */
  device_id_t id;
  worker_t *wp;
  device_t *dp;
  unsigned int wid;
  int r;

  /* If this is not a block driver request, we cannot get the minor device
   * associated with it, and thus we cannot tell which thread should process
   * it either. In that case, the master thread has to handle it instead.
   */
  if (is_ipc_notify(ipc_status) || !IS_BDEV_RQ(m_ptr->m_type)) {
    /* Process as 'other' message. */
    blockdriver_process_on_thread(bdtab, m_ptr, ipc_status, MAIN_THREAD);

    return;
  }

  /* Query the device ID. Upon failure, send the error code to the caller. */
  r = (*bdtab->bdr_device)(m_ptr->m_lbdev_lblockdriver_msg.minor, &id);
  if (r != OK) {
    blockdriver_reply(m_ptr, ipc_status, r);

    return;
  }

  /* Look up the device control block. */
  assert(id >= 0 && id < MAX_DEVICES);
  dp = &device[id];

  /* Find the first non-busy worker thread. */
  for (wid = 0; wid < dp->workers; wid++)
    if (dp->worker[wid].state != STATE_BUSY)
      break;

  /* If the worker thread is dead, start a thread now, unless we have already
   * reached the maximum number of threads.
   */
  if (wid < dp->workers) {
    wp = &dp->worker[wid];

    assert(wp->state != STATE_EXITED);

    /* If the non-busy thread has not yet been created, create one now.
     */
    if (wp->state == STATE_DEAD)
      master_create_worker(wp, wid, dp->id);
  }

  /* Enqueue the message at the device queue. */
  enqueue(dp, m_ptr, ipc_status);
}

/*===========================================================================*
 *                              master_init                                  *
 *===========================================================================*/
static void master_init(struct blockdriver *bdp)
{
  /* Initialize the state of the master thread.
   */
  int i, j;

  assert(bdp != NULL);
  assert(bdp->bdr_device != NULL);

  bdtab = bdp;

  /* Initialize device-specific data structures. */
  for (i = 0; i < MAX_DEVICES; i++) {
    device[i].id = i;
    device[i].workers = 1;
    mthread_event_init(&device[i].queue_event);
    mthread_rwlock_init(&device[i].barrier);

    for (j = 0; j < MAX_WORKERS; j++)
      device[i].worker[j].state = STATE_DEAD;
  }

  /* Initialize a per-thread key, where each worker thread stores its own
   * reference to the worker structure.
   */
  if (mthread_key_create(&worker_key, NULL))
    panic("blockdriver_mt: error initializing worker key");
}

/*===========================================================================*
 *                              blockdriver_mt_get_tid                       *
 *===========================================================================*/
thread_id_t blockdriver_mt_get_tid(void)
{
  /* Return the ID of the current thread.
   */
  worker_t *wp;

  wp = (worker_t *) mthread_getspecific(worker_key);

  if (wp == NULL)
    panic("blockdriver_mt: master thread cannot query thread ID\n");

  return MAKE_TID(wp->device_id, wp->worker_id);
}

/*===========================================================================*
 *                              blockdriver_mt_receive                       *
 *===========================================================================*/
static void blockdriver_mt_receive(message *m_ptr, int *ipc_status)
{
  /* Receive a message.
   */
  int r;

  r = sef_receive_status(ANY, m_ptr, ipc_status);

  if (r != OK)
    panic("blockdriver_mt: sef_receive_status() returned %d", r);
}

/*===========================================================================*
 *                              blockdriver_mt_task                          *
 *===========================================================================*/
void blockdriver_mt_task(struct blockdriver *driver_tab)
{
  /* The multithreaded driver task.
   */
  int ipc_status, i;
  message mess;

  /* Initialize first if necessary. */
  if (!running) {
    master_init(driver_tab);

    running = TRUE;
  }

  /* The main message loop. */
  while (running) {
    /* Receive a message. */
    blockdriver_mt_receive(&mess, &ipc_status);

    /* Dispatch the message. */
    master_handle_message(&mess, ipc_status);

    /* Let other threads run. */
    mthread_yield_all();

    /* Clean up any exited threads. */
    if (num_exited > 0)
      master_handle_exits();
  }

  /* Free up resources. */
  for (i = 0; i < MAX_DEVICES; i++)
    mthread_event_destroy(&device[i].queue_event);
}

/*===========================================================================*
 *                              blockdriver_mt_terminate                     *
 *===========================================================================*/
void blockdriver_mt_terminate(void)
{
  /* Instruct libblockdriver to shut down.
   */

  running = FALSE;
}

/*===========================================================================*
 *                              blockdriver_mt_sleep                         *
 *===========================================================================*/
void blockdriver_mt_sleep(void)
{
  /* Let the current thread sleep until it gets woken up by the master thread.
   */
  worker_t *wp;

  wp = (worker_t *) mthread_getspecific(worker_key);

  if (wp == NULL)
    panic("blockdriver_mt: master thread cannot sleep");

  mthread_event_wait(&wp->sleep_event);
}

/*===========================================================================*
 *                              blockdriver_mt_wakeup                        *
 *===========================================================================*/
void blockdriver_mt_wakeup(thread_id_t id)
{
  /* Wake up a sleeping worker thread from the master thread.
   */
  worker_t *wp;
  device_id_t device_id;
  worker_id_t worker_id;

  device_id = TID_DEVICE(id);
  worker_id = TID_WORKER(id);

  assert(device_id >= 0 && device_id < MAX_DEVICES);
  assert(worker_id >= 0 && worker_id < MAX_WORKERS);

  wp = &device[device_id].worker[worker_id];

  assert(wp->state == STATE_RUNNING || wp->state == STATE_BUSY);

  mthread_event_fire(&wp->sleep_event);
}

/*===========================================================================*
 *                              blockdriver_mt_set_workers                   *
 *===========================================================================*/
void blockdriver_mt_set_workers(device_id_t id, unsigned int workers)
{
  /* Set the number of worker threads for the given device.
   */
  device_t *dp;

  assert(id >= 0 && id < MAX_DEVICES);

  if (workers > MAX_WORKERS)
    workers = MAX_WORKERS;

  dp = &device[id];

  /* If we are cleaning up, wake up all threads waiting on a queue event. */
  if (workers == 1 && dp->workers > workers)
    mthread_event_fire_all(&dp->queue_event);

  dp->workers = workers;
}