1 /* $NetBSD: rf_engine.c,v 1.31 2004/01/02 21:41:08 oster Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. 
*
 *                                                                          *
 * Terminator nodes are never fired, but rather cause the callback          *
 * associated with the DAG to be invoked.                                   *
 *                                                                          *
 * If a node fails, the dag either rolls forward to the completion or       *
 * rolls back, undoing previously-completed nodes and fails atomically.     *
 * The direction of recovery is determined by the location of the failed    *
 * node in the graph.  If the failure occurred before the commit node in    *
 * the graph, backward recovery is used.  Otherwise, forward recovery is    *
 * used.                                                                    *
 *                                                                          *
 ***************************************************************************/

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.31 2004/01/02 21:41:08 oster Exp $");

#include <sys/errno.h>

#include "rf_threadstuff.h"
#include "rf_dag.h"
#include "rf_engine.h"
#include "rf_etimer.h"
#include "rf_general.h"
#include "rf_dagutils.h"
#include "rf_shutdown.h"
#include "rf_raid.h"

/* forward declarations of the file-local shutdown hook and thread bodies */
static void rf_ShutdownEngine(void *);
static void DAGExecutionThread(RF_ThreadArg_t arg);
static void rf_RaidIOThread(RF_ThreadArg_t arg);

/* synchronization primitives for this file.  DO_WAIT should be enclosed in a while loop. */

/*
 * DO_LOCK: raise the interrupt priority with splbio(), saving the previous
 * level in a variable named `ks', then take the node queue mutex of the
 * given RAID set.  The macro does not declare `ks'; every function that
 * uses DO_LOCK/DO_UNLOCK must declare a local `int ks' itself.
 */
#define	DO_LOCK(_r_) \
do { \
	ks = splbio(); \
	RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
} while (0)

/*
 * DO_UNLOCK: inverse of DO_LOCK -- release the node queue mutex and then
 * restore the priority level that DO_LOCK saved in `ks'.
 */
#define	DO_UNLOCK(_r_) \
do { \
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
	splx(ks); \
} while (0)

/*
 * DO_WAIT: wait on the node queue condition, passing the node queue mutex
 * to RF_WAIT_COND.  Callers re-test their predicate in a loop around it.
 */
#define	DO_WAIT(_r_) \
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

/* DO_SIGNAL: broadcast on the node queue condition. */
#define	DO_SIGNAL(_r_) \
	RF_BROADCAST_COND((_r_)->node_queue)	/* XXX RF_SIGNAL_COND?
*/ 94 95 static void 96 rf_ShutdownEngine(void *arg) 97 { 98 RF_Raid_t *raidPtr; 99 int ks; 100 101 raidPtr = (RF_Raid_t *) arg; 102 103 /* Tell the rf_RaidIOThread to shutdown */ 104 simple_lock(&(raidPtr->iodone_lock)); 105 106 raidPtr->shutdown_raidio = 1; 107 wakeup(&(raidPtr->iodone)); 108 109 /* ...and wait for it to tell us it has finished */ 110 while (raidPtr->shutdown_raidio) 111 ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0, 112 &(raidPtr->iodone_lock)); 113 114 simple_unlock(&(raidPtr->iodone_lock)); 115 116 /* Now shut down the DAG execution engine. */ 117 DO_LOCK(raidPtr); 118 raidPtr->shutdown_engine = 1; 119 DO_SIGNAL(raidPtr); 120 DO_UNLOCK(raidPtr); 121 122 } 123 124 int 125 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr, 126 RF_Config_t *cfgPtr) 127 { 128 int rc; 129 130 rf_mutex_init(&raidPtr->node_queue_mutex); 131 raidPtr->node_queue = NULL; 132 raidPtr->dags_in_flight = 0; 133 134 /* we create the execution thread only once per system boot. no need 135 * to check return code b/c the kernel panics if it can't create the 136 * thread. 
*/ 137 if (rf_engineDebug) { 138 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 139 } 140 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread, 141 DAGExecutionThread, raidPtr, 142 "raid%d", raidPtr->raidid)) { 143 printf("raid%d: Unable to create engine thread\n", 144 raidPtr->raidid); 145 return (ENOMEM); 146 } 147 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread, 148 rf_RaidIOThread, raidPtr, 149 "raidio%d", raidPtr->raidid)) { 150 printf("raid%d: Unable to create raidio thread\n", 151 raidPtr->raidid); 152 return (ENOMEM); 153 } 154 if (rf_engineDebug) { 155 printf("raid%d: Created engine thread\n", raidPtr->raidid); 156 } 157 158 /* engine thread is now running and waiting for work */ 159 if (rf_engineDebug) { 160 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 161 } 162 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 163 if (rc) { 164 rf_print_unable_to_add_shutdown(__FILE__, __LINE__, rc); 165 rf_ShutdownEngine(NULL); 166 } 167 return (rc); 168 } 169 170 static int 171 BranchDone(RF_DagNode_t *node) 172 { 173 int i; 174 175 /* return true if forward execution is completed for a node and it's 176 * succedents */ 177 switch (node->status) { 178 case rf_wait: 179 /* should never be called in this state */ 180 RF_PANIC(); 181 break; 182 case rf_fired: 183 /* node is currently executing, so we're not done */ 184 return (RF_FALSE); 185 case rf_good: 186 /* for each succedent recursively check branch */ 187 for (i = 0; i < node->numSuccedents; i++) 188 if (!BranchDone(node->succedents[i])) 189 return RF_FALSE; 190 return RF_TRUE; /* node and all succedent branches aren't in 191 * fired state */ 192 case rf_bad: 193 /* succedents can't fire */ 194 return (RF_TRUE); 195 case rf_recover: 196 /* should never be called in this state */ 197 RF_PANIC(); 198 break; 199 case rf_undone: 200 case rf_panic: 201 /* XXX need to fix this case */ 202 /* for now, assume that we're done */ 203 return (RF_TRUE); 204 
default: 205 /* illegal node status */ 206 RF_PANIC(); 207 break; 208 } 209 } 210 211 static int 212 NodeReady(RF_DagNode_t *node) 213 { 214 int ready; 215 216 switch (node->dagHdr->status) { 217 case rf_enable: 218 case rf_rollForward: 219 if ((node->status == rf_wait) && 220 (node->numAntecedents == node->numAntDone)) 221 ready = RF_TRUE; 222 else 223 ready = RF_FALSE; 224 break; 225 case rf_rollBackward: 226 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 227 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 228 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 229 if ((node->status == rf_good) && 230 (node->numSuccDone == node->numSuccedents)) 231 ready = RF_TRUE; 232 else 233 ready = RF_FALSE; 234 break; 235 default: 236 printf("Execution engine found illegal DAG status in NodeReady\n"); 237 RF_PANIC(); 238 break; 239 } 240 241 return (ready); 242 } 243 244 245 246 /* user context and dag-exec-thread context: Fire a node. The node's 247 * status field determines which function, do or undo, to be fired. 248 * This routine assumes that the node's status field has alread been 249 * set to "fired" or "recover" to indicate the direction of execution. 
250 */ 251 static void 252 FireNode(RF_DagNode_t *node) 253 { 254 switch (node->status) { 255 case rf_fired: 256 /* fire the do function of a node */ 257 if (rf_engineDebug) { 258 printf("raid%d: Firing node 0x%lx (%s)\n", 259 node->dagHdr->raidPtr->raidid, 260 (unsigned long) node, node->name); 261 } 262 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 263 #if defined(__NetBSD__) && defined(_KERNEL) 264 /* thread_block(); */ 265 /* printf("Need to block the thread here...\n"); */ 266 /* XXX thread_block is actually mentioned in 267 * /usr/include/vm/vm_extern.h */ 268 #else 269 thread_block(); 270 #endif 271 } 272 (*(node->doFunc)) (node); 273 break; 274 case rf_recover: 275 /* fire the undo function of a node */ 276 if (rf_engineDebug) { 277 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 278 node->dagHdr->raidPtr->raidid, 279 (unsigned long) node, node->name); 280 } 281 if (node->flags & RF_DAGNODE_FLAG_YIELD) 282 #if defined(__NetBSD__) && defined(_KERNEL) 283 /* thread_block(); */ 284 /* printf("Need to block the thread here...\n"); */ 285 /* XXX thread_block is actually mentioned in 286 * /usr/include/vm/vm_extern.h */ 287 #else 288 thread_block(); 289 #endif 290 (*(node->undoFunc)) (node); 291 break; 292 default: 293 RF_PANIC(); 294 break; 295 } 296 } 297 298 299 300 /* user context: 301 * Attempt to fire each node in a linear array. 302 * The entire list is fired atomically. 
303 */ 304 static void 305 FireNodeArray(int numNodes, RF_DagNode_t **nodeList) 306 { 307 RF_DagStatus_t dstat; 308 RF_DagNode_t *node; 309 int i, j; 310 311 /* first, mark all nodes which are ready to be fired */ 312 for (i = 0; i < numNodes; i++) { 313 node = nodeList[i]; 314 dstat = node->dagHdr->status; 315 RF_ASSERT((node->status == rf_wait) || 316 (node->status == rf_good)); 317 if (NodeReady(node)) { 318 if ((dstat == rf_enable) || 319 (dstat == rf_rollForward)) { 320 RF_ASSERT(node->status == rf_wait); 321 if (node->commitNode) 322 node->dagHdr->numCommits++; 323 node->status = rf_fired; 324 for (j = 0; j < node->numAntecedents; j++) 325 node->antecedents[j]->numSuccFired++; 326 } else { 327 RF_ASSERT(dstat == rf_rollBackward); 328 RF_ASSERT(node->status == rf_good); 329 /* only one commit node per graph */ 330 RF_ASSERT(node->commitNode == RF_FALSE); 331 node->status = rf_recover; 332 } 333 } 334 } 335 /* now, fire the nodes */ 336 for (i = 0; i < numNodes; i++) { 337 if ((nodeList[i]->status == rf_fired) || 338 (nodeList[i]->status == rf_recover)) 339 FireNode(nodeList[i]); 340 } 341 } 342 343 344 /* user context: 345 * Attempt to fire each node in a linked list. 346 * The entire list is fired atomically. 
347 */ 348 static void 349 FireNodeList(RF_DagNode_t *nodeList) 350 { 351 RF_DagNode_t *node, *next; 352 RF_DagStatus_t dstat; 353 int j; 354 355 if (nodeList) { 356 /* first, mark all nodes which are ready to be fired */ 357 for (node = nodeList; node; node = next) { 358 next = node->next; 359 dstat = node->dagHdr->status; 360 RF_ASSERT((node->status == rf_wait) || 361 (node->status == rf_good)); 362 if (NodeReady(node)) { 363 if ((dstat == rf_enable) || 364 (dstat == rf_rollForward)) { 365 RF_ASSERT(node->status == rf_wait); 366 if (node->commitNode) 367 node->dagHdr->numCommits++; 368 node->status = rf_fired; 369 for (j = 0; j < node->numAntecedents; j++) 370 node->antecedents[j]->numSuccFired++; 371 } else { 372 RF_ASSERT(dstat == rf_rollBackward); 373 RF_ASSERT(node->status == rf_good); 374 /* only one commit node per graph */ 375 RF_ASSERT(node->commitNode == RF_FALSE); 376 node->status = rf_recover; 377 } 378 } 379 } 380 /* now, fire the nodes */ 381 for (node = nodeList; node; node = next) { 382 next = node->next; 383 if ((node->status == rf_fired) || 384 (node->status == rf_recover)) 385 FireNode(node); 386 } 387 } 388 } 389 /* interrupt context: 390 * for each succedent 391 * propagate required results from node to succedent 392 * increment succedent's numAntDone 393 * place newly-enable nodes on node queue for firing 394 * 395 * To save context switches, we don't place NIL nodes on the node queue, 396 * but rather just process them as if they had fired. Note that NIL nodes 397 * that are the direct successors of the header will actually get fired by 398 * DispatchDAG, which is fine because no context switches are involved. 399 * 400 * Important: when running at user level, this can be called by any 401 * disk thread, and so the increment and check of the antecedent count 402 * must be locked. I used the node queue mutex and locked down the 403 * entire function, but this is certainly overkill. 
404 */ 405 static void 406 PropagateResults(RF_DagNode_t *node, int context) 407 { 408 RF_DagNode_t *s, *a; 409 RF_Raid_t *raidPtr; 410 int i, ks; 411 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 412 * finished */ 413 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 414 * antecedents */ 415 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 416 RF_DagNode_t *q = NULL, *qh = NULL, *next; 417 int j, skipNode; 418 419 raidPtr = node->dagHdr->raidPtr; 420 421 DO_LOCK(raidPtr); 422 423 /* debug - validate fire counts */ 424 for (i = 0; i < node->numAntecedents; i++) { 425 a = *(node->antecedents + i); 426 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 427 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 428 a->numSuccDone++; 429 } 430 431 switch (node->dagHdr->status) { 432 case rf_enable: 433 case rf_rollForward: 434 for (i = 0; i < node->numSuccedents; i++) { 435 s = *(node->succedents + i); 436 RF_ASSERT(s->status == rf_wait); 437 (s->numAntDone)++; 438 if (s->numAntDone == s->numAntecedents) { 439 /* look for NIL nodes */ 440 if (s->doFunc == rf_NullNodeFunc) { 441 /* don't fire NIL nodes, just process 442 * them */ 443 s->next = finishlist; 444 finishlist = s; 445 } else { 446 /* look to see if the node is to be 447 * skipped */ 448 skipNode = RF_FALSE; 449 for (j = 0; j < s->numAntecedents; j++) 450 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 451 skipNode = RF_TRUE; 452 if (skipNode) { 453 /* this node has one or more 454 * failed true data 455 * dependencies, so skip it */ 456 s->next = skiplist; 457 skiplist = s; 458 } else 459 /* add s to list of nodes (q) 460 * to execute */ 461 if (context != RF_INTR_CONTEXT) { 462 /* we only have to 463 * enqueue if we're at 464 * intr context */ 465 /* put node on 466 a list to 467 be fired 468 after we 469 unlock */ 470 s->next = firelist; 471 firelist = s; 472 } else { 473 /* enqueue the 474 node for 475 the dag 476 exec thread 477 to fire 
*/ 478 RF_ASSERT(NodeReady(s)); 479 if (q) { 480 q->next = s; 481 q = s; 482 } else { 483 qh = q = s; 484 qh->next = NULL; 485 } 486 } 487 } 488 } 489 } 490 491 if (q) { 492 /* xfer our local list of nodes to the node queue */ 493 q->next = raidPtr->node_queue; 494 raidPtr->node_queue = qh; 495 DO_SIGNAL(raidPtr); 496 } 497 DO_UNLOCK(raidPtr); 498 499 for (; skiplist; skiplist = next) { 500 next = skiplist->next; 501 skiplist->status = rf_skipped; 502 for (i = 0; i < skiplist->numAntecedents; i++) { 503 skiplist->antecedents[i]->numSuccFired++; 504 } 505 if (skiplist->commitNode) { 506 skiplist->dagHdr->numCommits++; 507 } 508 rf_FinishNode(skiplist, context); 509 } 510 for (; finishlist; finishlist = next) { 511 /* NIL nodes: no need to fire them */ 512 next = finishlist->next; 513 finishlist->status = rf_good; 514 for (i = 0; i < finishlist->numAntecedents; i++) { 515 finishlist->antecedents[i]->numSuccFired++; 516 } 517 if (finishlist->commitNode) 518 finishlist->dagHdr->numCommits++; 519 /* 520 * Okay, here we're calling rf_FinishNode() on 521 * nodes that have the null function as their 522 * work proc. Such a node could be the 523 * terminal node in a DAG. If so, it will 524 * cause the DAG to complete, which will in 525 * turn free memory used by the DAG, which 526 * includes the node in question. Thus, we 527 * must avoid referencing the node at all 528 * after calling rf_FinishNode() on it. 
*/ 529 rf_FinishNode(finishlist, context); /* recursive call */ 530 } 531 /* fire all nodes in firelist */ 532 FireNodeList(firelist); 533 break; 534 535 case rf_rollBackward: 536 for (i = 0; i < node->numAntecedents; i++) { 537 a = *(node->antecedents + i); 538 RF_ASSERT(a->status == rf_good); 539 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 540 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 541 542 if (a->numSuccDone == a->numSuccFired) { 543 if (a->undoFunc == rf_NullNodeFunc) { 544 /* don't fire NIL nodes, just process 545 * them */ 546 a->next = finishlist; 547 finishlist = a; 548 } else { 549 if (context != RF_INTR_CONTEXT) { 550 /* we only have to enqueue if 551 * we're at intr context */ 552 /* put node on a list to be 553 fired after we unlock */ 554 a->next = firelist; 555 556 firelist = a; 557 } else { 558 /* enqueue the node for the 559 dag exec thread to fire */ 560 RF_ASSERT(NodeReady(a)); 561 if (q) { 562 q->next = a; 563 q = a; 564 } else { 565 qh = q = a; 566 qh->next = NULL; 567 } 568 } 569 } 570 } 571 } 572 if (q) { 573 /* xfer our local list of nodes to the node queue */ 574 q->next = raidPtr->node_queue; 575 raidPtr->node_queue = qh; 576 DO_SIGNAL(raidPtr); 577 } 578 DO_UNLOCK(raidPtr); 579 for (; finishlist; finishlist = next) { 580 /* NIL nodes: no need to fire them */ 581 next = finishlist->next; 582 finishlist->status = rf_good; 583 /* 584 * Okay, here we're calling rf_FinishNode() on 585 * nodes that have the null function as their 586 * work proc. Such a node could be the first 587 * node in a DAG. If so, it will cause the DAG 588 * to complete, which will in turn free memory 589 * used by the DAG, which includes the node in 590 * question. Thus, we must avoid referencing 591 * the node at all after calling 592 * rf_FinishNode() on it. 
*/ 593 rf_FinishNode(finishlist, context); /* recursive call */ 594 } 595 /* fire all nodes in firelist */ 596 FireNodeList(firelist); 597 598 break; 599 default: 600 printf("Engine found illegal DAG status in PropagateResults()\n"); 601 RF_PANIC(); 602 break; 603 } 604 } 605 606 607 608 /* 609 * Process a fired node which has completed 610 */ 611 static void 612 ProcessNode(RF_DagNode_t *node, int context) 613 { 614 RF_Raid_t *raidPtr; 615 616 raidPtr = node->dagHdr->raidPtr; 617 618 switch (node->status) { 619 case rf_good: 620 /* normal case, don't need to do anything */ 621 break; 622 case rf_bad: 623 if ((node->dagHdr->numCommits > 0) || 624 (node->dagHdr->numCommitNodes == 0)) { 625 /* crossed commit barrier */ 626 node->dagHdr->status = rf_rollForward; 627 if (rf_engineDebug) { 628 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 629 } 630 } else { 631 /* never reached commit barrier */ 632 node->dagHdr->status = rf_rollBackward; 633 if (rf_engineDebug) { 634 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 635 } 636 } 637 break; 638 case rf_undone: 639 /* normal rollBackward case, don't need to do anything */ 640 break; 641 case rf_panic: 642 /* an undo node failed!!! */ 643 printf("UNDO of a node failed!!!/n"); 644 break; 645 default: 646 printf("node finished execution with an illegal status!!!\n"); 647 RF_PANIC(); 648 break; 649 } 650 651 /* enqueue node's succedents (antecedents if rollBackward) for 652 * execution */ 653 PropagateResults(node, context); 654 } 655 656 657 658 /* user context or dag-exec-thread context: 659 * This is the first step in post-processing a newly-completed node. 660 * This routine is called by each node execution function to mark the node 661 * as complete and fire off any successors that have been enabled. 
662 */ 663 int 664 rf_FinishNode(RF_DagNode_t *node, int context) 665 { 666 int retcode = RF_FALSE; 667 node->dagHdr->numNodesCompleted++; 668 ProcessNode(node, context); 669 670 return (retcode); 671 } 672 673 674 /* user context: submit dag for execution, return non-zero if we have 675 * to wait for completion. if and only if we return non-zero, we'll 676 * cause cbFunc to get invoked with cbArg when the DAG has completed. 677 * 678 * for now we always return 1. If the DAG does not cause any I/O, 679 * then the callback may get invoked before DispatchDAG returns. 680 * There's code in state 5 of ContinueRaidAccess to handle this. 681 * 682 * All we do here is fire the direct successors of the header node. 683 * The DAG execution thread does the rest of the dag processing. */ 684 int 685 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), 686 void *cbArg) 687 { 688 RF_Raid_t *raidPtr; 689 690 raidPtr = dag->raidPtr; 691 if (dag->tracerec) { 692 RF_ETIMER_START(dag->tracerec->timer); 693 } 694 #if DEBUG 695 #if RF_DEBUG_VALIDATE_DAG 696 if (rf_engineDebug || rf_validateDAGDebug) { 697 if (rf_ValidateDAG(dag)) 698 RF_PANIC(); 699 } 700 #endif 701 #endif 702 if (rf_engineDebug) { 703 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 704 } 705 raidPtr->dags_in_flight++; /* debug only: blow off proper 706 * locking */ 707 dag->cbFunc = cbFunc; 708 dag->cbArg = cbArg; 709 dag->numNodesCompleted = 0; 710 dag->status = rf_enable; 711 FireNodeArray(dag->numSuccedents, dag->succedents); 712 return (1); 713 } 714 /* dedicated kernel thread: the thread that handles all DAG node 715 * firing. To minimize locking and unlocking, we grab a copy of the 716 * entire node queue and then set the node queue to NULL before doing 717 * any firing of nodes. This way we only have to release the lock 718 * once. Of course, it's probably rare that there's more than one 719 * node in the queue at any one time, but it sometimes happens. 
720 */ 721 722 static void 723 DAGExecutionThread(RF_ThreadArg_t arg) 724 { 725 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 726 RF_Raid_t *raidPtr; 727 int ks; 728 int s; 729 730 raidPtr = (RF_Raid_t *) arg; 731 732 if (rf_engineDebug) { 733 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 734 } 735 736 s = splbio(); 737 738 DO_LOCK(raidPtr); 739 while (!raidPtr->shutdown_engine) { 740 741 while (raidPtr->node_queue != NULL) { 742 local_nq = raidPtr->node_queue; 743 fire_nq = NULL; 744 term_nq = NULL; 745 raidPtr->node_queue = NULL; 746 DO_UNLOCK(raidPtr); 747 748 /* first, strip out the terminal nodes */ 749 while (local_nq) { 750 nd = local_nq; 751 local_nq = local_nq->next; 752 switch (nd->dagHdr->status) { 753 case rf_enable: 754 case rf_rollForward: 755 if (nd->numSuccedents == 0) { 756 /* end of the dag, add to 757 * callback list */ 758 nd->next = term_nq; 759 term_nq = nd; 760 } else { 761 /* not the end, add to the 762 * fire queue */ 763 nd->next = fire_nq; 764 fire_nq = nd; 765 } 766 break; 767 case rf_rollBackward: 768 if (nd->numAntecedents == 0) { 769 /* end of the dag, add to the 770 * callback list */ 771 nd->next = term_nq; 772 term_nq = nd; 773 } else { 774 /* not the end, add to the 775 * fire queue */ 776 nd->next = fire_nq; 777 fire_nq = nd; 778 } 779 break; 780 default: 781 RF_PANIC(); 782 break; 783 } 784 } 785 786 /* execute callback of dags which have reached the 787 * terminal node */ 788 while (term_nq) { 789 nd = term_nq; 790 term_nq = term_nq->next; 791 nd->next = NULL; 792 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 793 raidPtr->dags_in_flight--; /* debug only */ 794 } 795 796 /* fire remaining nodes */ 797 FireNodeList(fire_nq); 798 799 DO_LOCK(raidPtr); 800 } 801 while (!raidPtr->shutdown_engine && 802 raidPtr->node_queue == NULL) { 803 DO_WAIT(raidPtr); 804 } 805 } 806 DO_UNLOCK(raidPtr); 807 808 splx(s); 809 kthread_exit(0); 810 } 811 812 /* 813 * rf_RaidIOThread() -- When I/O to a component completes, 814 * 
KernelWakeupFunc() puts the completed request onto raidPtr->iodone 815 * TAILQ. This function looks after requests on that queue by calling 816 * rf_DiskIOComplete() for the request, and by calling any required 817 * CompleteFunc for the request. 818 */ 819 820 static void 821 rf_RaidIOThread(RF_ThreadArg_t arg) 822 { 823 RF_Raid_t *raidPtr; 824 RF_DiskQueueData_t *req; 825 int s; 826 827 raidPtr = (RF_Raid_t *) arg; 828 829 s = splbio(); 830 simple_lock(&(raidPtr->iodone_lock)); 831 832 while (!raidPtr->shutdown_raidio) { 833 /* if there is nothing to do, then snooze. */ 834 if (TAILQ_EMPTY(&(raidPtr->iodone))) { 835 ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0, 836 &(raidPtr->iodone_lock)); 837 } 838 839 /* See what I/Os, if any, have arrived */ 840 while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) { 841 TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries); 842 simple_unlock(&(raidPtr->iodone_lock)); 843 rf_DiskIOComplete(req->queue, req, req->error); 844 (req->CompleteFunc) (req->argument, req->error); 845 simple_lock(&(raidPtr->iodone_lock)); 846 } 847 } 848 849 /* Let rf_ShutdownEngine know that we're done... */ 850 raidPtr->shutdown_raidio = 0; 851 wakeup(&(raidPtr->shutdown_raidio)); 852 853 simple_unlock(&(raidPtr->iodone_lock)); 854 splx(s); 855 856 kthread_exit(0); 857 } 858