/*	$NetBSD: rf_engine.c,v 1.34 2004/03/09 03:10:26 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/****************************************************************************
 *
 * engine.c -- code for DAG execution engine
 *
 * Modified to work as follows (holland):
 *   A user-thread calls into DispatchDAG, which fires off the nodes that
 *   are direct successors to the header node.  DispatchDAG then returns,
 *   and the rest of the I/O continues asynchronously.  As each node
 *   completes, the node execution function calls FinishNode().  FinishNode
 *   scans the list of successors to the node and increments the antecedent
 *   counts.  Each node that becomes enabled is placed on a central node
 *   queue.  A dedicated dag-execution thread grabs nodes off of this
 *   queue and fires them.
 *
 *   NULL nodes are never fired.
 *
 *   Terminator nodes are never fired, but rather cause the callback
 *   associated with the DAG to be invoked.
 *
 *   If a node fails, the DAG either rolls forward to completion or
 *   rolls back, undoing previously-completed nodes and failing atomically.
 *   The direction of recovery is determined by the location of the failed
 *   node in the graph.  If the failure occurred before the commit node in
 *   the graph, backward recovery is used.  Otherwise, forward recovery is
 *   used.
 *
 ****************************************************************************/
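/*
 * To make the recovery rule above concrete, consider a hypothetical
 * small-write style graph (the node names below are illustrative only,
 * not a real graph taken from the DAG construction code):
 *
 *	Hdr -> Rod -> Xor -> Cmt -> Wnd -> Trm
 *
 * If Rod or Xor fails, no commit node has fired yet (numCommits == 0),
 * so ProcessNode() below sets the DAG status to rf_rollBackward and the
 * already-completed nodes are undone.  If Wnd fails, the commit barrier
 * has already been crossed (numCommits > 0), so the status becomes
 * rf_rollForward and the remaining nodes are driven to completion.
 */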
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.34 2004/03/09 03:10:26 oster Exp $");

#include <sys/errno.h>

#include "rf_threadstuff.h"
#include "rf_dag.h"
#include "rf_engine.h"
#include "rf_etimer.h"
#include "rf_general.h"
#include "rf_dagutils.h"
#include "rf_shutdown.h"
#include "rf_raid.h"

static void rf_ShutdownEngine(void *);
static void DAGExecutionThread(RF_ThreadArg_t arg);
static void rf_RaidIOThread(RF_ThreadArg_t arg);

/*
 * Synchronization primitives for this file.  DO_WAIT should be enclosed
 * in a while loop.
 */

#define DO_LOCK(_r_) \
do { \
	ks = splbio(); \
	RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
} while (0)

#define DO_UNLOCK(_r_) \
do { \
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
	splx(ks); \
} while (0)

#define DO_WAIT(_r_) \
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

#define DO_SIGNAL(_r_) \
	RF_BROADCAST_COND((_r_)->node_queue)	/* XXX RF_SIGNAL_COND? */
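/*
 * The canonical use of these macros (a sketch of the pattern only,
 * assuming a local "int ks;" for the saved spl level, as used by
 * DAGExecutionThread() below) is lock / wait-in-a-loop / unlock:
 *
 *	DO_LOCK(raidPtr);
 *	while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
 *		DO_WAIT(raidPtr);	// condition re-checked on every wakeup
 *	// ... consume raidPtr->node_queue ...
 *	DO_UNLOCK(raidPtr);
 *
 * Producers append to the node queue under DO_LOCK() and then call
 * DO_SIGNAL() to wake the engine thread.
 */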
static void
rf_ShutdownEngine(void *arg)
{
	RF_Raid_t *raidPtr;
	int ks;

	raidPtr = (RF_Raid_t *) arg;

	/* Tell the rf_RaidIOThread to shut down */
	simple_lock(&(raidPtr->iodone_lock));

	raidPtr->shutdown_raidio = 1;
	wakeup(&(raidPtr->iodone));

	/* ...and wait for it to tell us it has finished */
	while (raidPtr->shutdown_raidio)
		ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0,
			&(raidPtr->iodone_lock));

	simple_unlock(&(raidPtr->iodone_lock));

	/* Now shut down the DAG execution engine. */
	DO_LOCK(raidPtr);
	raidPtr->shutdown_engine = 1;
	DO_SIGNAL(raidPtr);
	DO_UNLOCK(raidPtr);
}

int
rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
		   RF_Config_t *cfgPtr)
{

	rf_mutex_init(&raidPtr->node_queue_mutex);
	raidPtr->node_queue = NULL;
	raidPtr->dags_in_flight = 0;

	/* We create the execution thread only once per system boot.  No need
	 * to check the return code b/c the kernel panics if it can't create
	 * the thread. */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
	}
#endif
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
				    DAGExecutionThread, raidPtr,
				    "raid%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create engine thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
				    rf_RaidIOThread, raidPtr,
				    "raidio%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create raidio thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Created engine thread\n", raidPtr->raidid);
	}
#endif

	/* engine thread is now running and waiting for work */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
	}
#endif
	rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);

	return (0);
}

static int
BranchDone(RF_DagNode_t *node)
{
	int i;

	/* Return true if forward execution is completed for a node and its
	 * succedents. */
	switch (node->status) {
	case rf_wait:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_fired:
		/* node is currently executing, so we're not done */
		return (RF_FALSE);
	case rf_good:
		/* for each succedent, recursively check the branch */
		for (i = 0; i < node->numSuccedents; i++)
			if (!BranchDone(node->succedents[i]))
				return RF_FALSE;
		return RF_TRUE;	/* node and all succedent branches aren't in
				 * fired state */
	case rf_bad:
		/* succedents can't fire */
		return (RF_TRUE);
	case rf_recover:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_undone:
	case rf_panic:
		/* XXX need to fix this case */
		/* for now, assume that we're done */
		return (RF_TRUE);
	default:
		/* illegal node status */
		RF_PANIC();
		break;
	}
}

static int
NodeReady(RF_DagNode_t *node)
{
	int ready;

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		if ((node->status == rf_wait) &&
		    (node->numAntecedents == node->numAntDone))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	case rf_rollBackward:
		RF_ASSERT(node->numSuccDone <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccDone);
		if ((node->status == rf_good) &&
		    (node->numSuccDone == node->numSuccedents))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	default:
		printf("Execution engine found illegal DAG status in NodeReady\n");
		RF_PANIC();
		break;
	}

	return (ready);
}


/*
 * User context and dag-exec-thread context: fire a node.  The node's
 * status field determines which function, do or undo, is fired.  This
 * routine assumes that the node's status field has already been set to
 * "fired" or "recover" to indicate the direction of execution.
 */
static void
FireNode(RF_DagNode_t *node)
{
	switch (node->status) {
	case rf_fired:
		/* fire the do function of a node */
#if RF_DEBUG_ENGINE
		if (rf_engineDebug) {
			printf("raid%d: Firing node 0x%lx (%s)\n",
			       node->dagHdr->raidPtr->raidid,
			       (unsigned long) node, node->name);
		}
#endif
		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
			/* thread_block(); */
			/* printf("Need to block the thread here...\n"); */
			/* XXX thread_block is actually mentioned in
			 * /usr/include/vm/vm_extern.h */
#else
			thread_block();
#endif
		}
		(*(node->doFunc)) (node);
		break;
	case rf_recover:
		/* fire the undo function of a node */
#if RF_DEBUG_ENGINE
		if (rf_engineDebug) {
			printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
			       node->dagHdr->raidPtr->raidid,
			       (unsigned long) node, node->name);
		}
#endif
		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
			/* thread_block(); */
			/* printf("Need to block the thread here...\n"); */
			/* XXX thread_block is actually mentioned in
			 * /usr/include/vm/vm_extern.h */
#else
			thread_block();
#endif
		}
		(*(node->undoFunc)) (node);
		break;
	default:
		RF_PANIC();
		break;
	}
}


/*
 * User context:
 * Attempt to fire each node in a linear array.
 * The entire list is fired atomically.
 */
static void
FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
{
	RF_DagStatus_t dstat;
	RF_DagNode_t *node;
	int i, j;

	/* first, mark all nodes which are ready to be fired */
	for (i = 0; i < numNodes; i++) {
		node = nodeList[i];
		dstat = node->dagHdr->status;
		RF_ASSERT((node->status == rf_wait) ||
			  (node->status == rf_good));
		if (NodeReady(node)) {
			if ((dstat == rf_enable) ||
			    (dstat == rf_rollForward)) {
				RF_ASSERT(node->status == rf_wait);
				if (node->commitNode)
					node->dagHdr->numCommits++;
				node->status = rf_fired;
				for (j = 0; j < node->numAntecedents; j++)
					node->antecedents[j]->numSuccFired++;
			} else {
				RF_ASSERT(dstat == rf_rollBackward);
				RF_ASSERT(node->status == rf_good);
				/* only one commit node per graph */
				RF_ASSERT(node->commitNode == RF_FALSE);
				node->status = rf_recover;
			}
		}
	}
	/* now, fire the nodes */
	for (i = 0; i < numNodes; i++) {
		if ((nodeList[i]->status == rf_fired) ||
		    (nodeList[i]->status == rf_recover))
			FireNode(nodeList[i]);
	}
}


/*
 * User context:
 * Attempt to fire each node in a linked list.
 * The entire list is fired atomically.
 */
static void
FireNodeList(RF_DagNode_t *nodeList)
{
	RF_DagNode_t *node, *next;
	RF_DagStatus_t dstat;
	int j;

	if (nodeList) {
		/* first, mark all nodes which are ready to be fired */
		for (node = nodeList; node; node = next) {
			next = node->next;
			dstat = node->dagHdr->status;
			RF_ASSERT((node->status == rf_wait) ||
				  (node->status == rf_good));
			if (NodeReady(node)) {
				if ((dstat == rf_enable) ||
				    (dstat == rf_rollForward)) {
					RF_ASSERT(node->status == rf_wait);
					if (node->commitNode)
						node->dagHdr->numCommits++;
					node->status = rf_fired;
					for (j = 0; j < node->numAntecedents; j++)
						node->antecedents[j]->numSuccFired++;
				} else {
					RF_ASSERT(dstat == rf_rollBackward);
					RF_ASSERT(node->status == rf_good);
					/* only one commit node per graph */
					RF_ASSERT(node->commitNode == RF_FALSE);
					node->status = rf_recover;
				}
			}
		}
		/* now, fire the nodes */
		for (node = nodeList; node; node = next) {
			next = node->next;
			if ((node->status == rf_fired) ||
			    (node->status == rf_recover))
				FireNode(node);
		}
	}
}

/*
 * Interrupt context:
 * for each succedent
 *	propagate required results from node to succedent
 *	increment succedent's numAntDone
 *	place newly-enabled nodes on the node queue for firing
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired.  Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important: when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked.  I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 */
static void
PropagateResults(RF_DagNode_t *node, int context)
{
	RF_DagNode_t *s, *a;
	RF_Raid_t *raidPtr;
	int i, ks;
	RF_DagNode_t *finishlist = NULL;	/* a list of NIL nodes to be
						 * finished */
	RF_DagNode_t *skiplist = NULL;	/* list of nodes with failed truedata
					 * antecedents */
	RF_DagNode_t *firelist = NULL;	/* a list of nodes to be fired */
	RF_DagNode_t *q = NULL, *qh = NULL, *next;
	int j, skipNode;

	raidPtr = node->dagHdr->raidPtr;

	DO_LOCK(raidPtr);

	/* debug - validate fire counts */
	for (i = 0; i < node->numAntecedents; i++) {
		a = *(node->antecedents + i);
		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
		a->numSuccDone++;
	}

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		for (i = 0; i < node->numSuccedents; i++) {
			s = *(node->succedents + i);
			RF_ASSERT(s->status == rf_wait);
			(s->numAntDone)++;
			if (s->numAntDone == s->numAntecedents) {
				/* look for NIL nodes */
				if (s->doFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					s->next = finishlist;
					finishlist = s;
				} else {
					/* look to see if the node is to be
					 * skipped */
					skipNode = RF_FALSE;
					for (j = 0; j < s->numAntecedents; j++)
						if ((s->antType[j] == rf_trueData) &&
						    (s->antecedents[j]->status == rf_bad))
							skipNode = RF_TRUE;
					if (skipNode) {
						/* this node has one or more
						 * failed true-data
						 * dependencies, so skip it */
						s->next = skiplist;
						skiplist = s;
					} else {
						/* add s to the list of nodes
						 * (q) to execute */
						if (context != RF_INTR_CONTEXT) {
							/* we only have to
							 * enqueue if we're at
							 * intr context */
							/* put the node on a
							 * list to be fired
							 * after we unlock */
							s->next = firelist;
							firelist = s;
						} else {
							/* enqueue the node for
							 * the dag exec thread
							 * to fire */
							RF_ASSERT(NodeReady(s));
							if (q) {
								q->next = s;
								q = s;
							} else {
								qh = q = s;
								qh->next = NULL;
							}
						}
					}
				}
			}
		}

		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);

		for (; skiplist; skiplist = next) {
			next = skiplist->next;
			skiplist->status = rf_skipped;
			for (i = 0; i < skiplist->numAntecedents; i++) {
				skiplist->antecedents[i]->numSuccFired++;
			}
			if (skiplist->commitNode) {
				skiplist->dagHdr->numCommits++;
			}
			rf_FinishNode(skiplist, context);
		}
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			for (i = 0; i < finishlist->numAntecedents; i++) {
				finishlist->antecedents[i]->numSuccFired++;
			}
			if (finishlist->commitNode)
				finishlist->dagHdr->numCommits++;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the
			 * terminal node in a DAG.  If so, it will
			 * cause the DAG to complete, which will in
			 * turn free memory used by the DAG, which
			 * includes the node in question.  Thus, we
			 * must avoid referencing the node at all
			 * after calling rf_FinishNode() on it.
			 */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);
		break;

	case rf_rollBackward:
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/* we only have to enqueue if
						 * we're at intr context */
						/* put the node on a list to
						 * be fired after we unlock */
						a->next = firelist;
						firelist = a;
					} else {
						/* enqueue the node for the
						 * dag exec thread to fire */
						RF_ASSERT(NodeReady(a));
						if (q) {
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the first
			 * node in a DAG.  If so, it will cause the DAG
			 * to complete, which will in turn free memory
			 * used by the DAG, which includes the node in
			 * question.  Thus, we must avoid referencing
			 * the node at all after calling
			 * rf_FinishNode() on it.
			 */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);

		break;
	default:
		printf("Engine found illegal DAG status in PropagateResults()\n");
		RF_PANIC();
		break;
	}
}
/*
 * Process a fired node which has completed.
 */
static void
ProcessNode(RF_DagNode_t *node, int context)
{
	RF_Raid_t *raidPtr;

	raidPtr = node->dagHdr->raidPtr;

	switch (node->status) {
	case rf_good:
		/* normal case, don't need to do anything */
		break;
	case rf_bad:
		if ((node->dagHdr->numCommits > 0) ||
		    (node->dagHdr->numCommitNodes == 0)) {
			/* crossed commit barrier */
			node->dagHdr->status = rf_rollForward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
			}
#endif
		} else {
			/* never reached commit barrier */
			node->dagHdr->status = rf_rollBackward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
			}
#endif
		}
		break;
	case rf_undone:
		/* normal rollBackward case, don't need to do anything */
		break;
	case rf_panic:
		/* an undo node failed!!! */
		printf("UNDO of a node failed!!!\n");
		break;
	default:
		printf("node finished execution with an illegal status!!!\n");
		RF_PANIC();
		break;
	}

	/* enqueue node's succedents (antecedents if rollBackward) for
	 * execution */
	PropagateResults(node, context);
}


/*
 * User context or dag-exec-thread context:
 * This is the first step in post-processing a newly-completed node.
 * This routine is called by each node execution function to mark the node
 * as complete and fire off any successors that have been enabled.
 */
int
rf_FinishNode(RF_DagNode_t *node, int context)
{
	int retcode = RF_FALSE;

	node->dagHdr->numNodesCompleted++;
	ProcessNode(node, context);

	return (retcode);
}
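/*
 * Sketch (illustrative only, not part of this driver) of the contract
 * between the engine and a node execution function: a doFunc performs or
 * starts the node's work, sets node->status to rf_good or rf_bad, and then
 * hands the node back to the engine via rf_FinishNode(), which propagates
 * results to the node's succedents.  "MyNodeFunc", "worked" and "context"
 * below are hypothetical names; real node functions live elsewhere in
 * RAIDframe.
 *
 *	int
 *	MyNodeFunc(RF_DagNode_t *node)
 *	{
 *		int worked, context;
 *
 *		context = ...;	// caller's context, e.g. RF_INTR_CONTEXT
 *				// when completing from a disk interrupt
 *		worked = ...;	// do (or finish) the node's work
 *		node->status = worked ? rf_good : rf_bad;
 *		return (rf_FinishNode(node, context));
 *	}
 */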
/*
 * User context: submit a DAG for execution; return non-zero if we have
 * to wait for completion.  If and only if we return non-zero, we'll
 * cause cbFunc to get invoked with cbArg when the DAG has completed.
 *
 * For now we always return 1.  If the DAG does not cause any I/O,
 * then the callback may get invoked before DispatchDAG returns.
 * There's code in state 5 of ContinueRaidAccess to handle this.
 *
 * All we do here is fire the direct successors of the header node.
 * The DAG execution thread does the rest of the dag processing.
 */
int
rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *),
	       void *cbArg)
{
	RF_Raid_t *raidPtr;

	raidPtr = dag->raidPtr;
#if RF_ACC_TRACE > 0
	if (dag->tracerec) {
		RF_ETIMER_START(dag->tracerec->timer);
	}
#endif
#if DEBUG
#if RF_DEBUG_VALIDATE_DAG
	if (rf_engineDebug || rf_validateDAGDebug) {
		if (rf_ValidateDAG(dag))
			RF_PANIC();
	}
#endif
#endif
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
	}
#endif
	raidPtr->dags_in_flight++;	/* debug only: blow off proper
					 * locking */
	dag->cbFunc = cbFunc;
	dag->cbArg = cbArg;
	dag->numNodesCompleted = 0;
	dag->status = rf_enable;
	FireNodeArray(dag->numSuccedents, dag->succedents);
	return (1);
}
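/*
 * Typical use (a sketch only; the real submission path is the RAIDframe
 * access state machine mentioned above, ContinueRaidAccess): the caller
 * builds a DAG for an access, supplies a completion callback, and then
 * waits for that callback.  "MyDagDone" and "desc" are illustrative names,
 * not identifiers from this driver.
 *
 *	static void
 *	MyDagDone(void *arg)
 *	{
 *		// examine the DAG's status to decide whether the access
 *		// succeeded, must be retried, or has failed, then wake
 *		// whoever is waiting on it
 *	}
 *
 *	(void) rf_DispatchDAG(dag, MyDagDone, desc);
 *
 * If the DAG performs no real I/O, MyDagDone can run before
 * rf_DispatchDAG() returns, so the caller must be prepared for that.
 */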
/*
 * Dedicated kernel thread: the thread that handles all DAG node
 * firing.  To minimize locking and unlocking, we grab a copy of the
 * entire node queue and then set the node queue to NULL before doing
 * any firing of nodes.  This way we only have to release the lock
 * once.  Of course, it's probably rare that there's more than one
 * node in the queue at any one time, but it sometimes happens.
 */

static void
DAGExecutionThread(RF_ThreadArg_t arg)
{
	RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
	RF_Raid_t *raidPtr;
	int ks;
	int s;

	raidPtr = (RF_Raid_t *) arg;

#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread is running\n", raidPtr->raidid);
	}
#endif
	s = splbio();

	DO_LOCK(raidPtr);
	while (!raidPtr->shutdown_engine) {

		while (raidPtr->node_queue != NULL) {
			local_nq = raidPtr->node_queue;
			fire_nq = NULL;
			term_nq = NULL;
			raidPtr->node_queue = NULL;
			DO_UNLOCK(raidPtr);

			/* first, strip out the terminal nodes */
			while (local_nq) {
				nd = local_nq;
				local_nq = local_nq->next;
				switch (nd->dagHdr->status) {
				case rf_enable:
				case rf_rollForward:
					if (nd->numSuccedents == 0) {
						/* end of the dag, add to
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				case rf_rollBackward:
					if (nd->numAntecedents == 0) {
						/* end of the dag, add to the
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				default:
					RF_PANIC();
					break;
				}
			}

			/* execute callback of dags which have reached the
			 * terminal node */
			while (term_nq) {
				nd = term_nq;
				term_nq = term_nq->next;
				nd->next = NULL;
				(nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
				raidPtr->dags_in_flight--;	/* debug only */
			}

			/* fire remaining nodes */
			FireNodeList(fire_nq);

			DO_LOCK(raidPtr);
		}
		while (!raidPtr->shutdown_engine &&
		       raidPtr->node_queue == NULL) {
			DO_WAIT(raidPtr);
		}
	}
	DO_UNLOCK(raidPtr);

	splx(s);
	kthread_exit(0);
}

/*
 * rf_RaidIOThread() -- When I/O to a component completes,
 * KernelWakeupFunc() puts the completed request onto raidPtr->iodone
 * TAILQ.  This function looks after requests on that queue by calling
 * rf_DiskIOComplete() for the request, and by calling any required
 * CompleteFunc for the request.
 */

static void
rf_RaidIOThread(RF_ThreadArg_t arg)
{
	RF_Raid_t *raidPtr;
	RF_DiskQueueData_t *req;
	int s;

	raidPtr = (RF_Raid_t *) arg;

	s = splbio();
	simple_lock(&(raidPtr->iodone_lock));

	while (!raidPtr->shutdown_raidio) {
		/* if there is nothing to do, then snooze. */
		if (TAILQ_EMPTY(&(raidPtr->iodone))) {
			ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0,
				&(raidPtr->iodone_lock));
		}

		/* See what I/Os, if any, have arrived */
		while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
			TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
			simple_unlock(&(raidPtr->iodone_lock));
			rf_DiskIOComplete(req->queue, req, req->error);
			(req->CompleteFunc) (req->argument, req->error);
			simple_lock(&(raidPtr->iodone_lock));
		}
	}

	/* Let rf_ShutdownEngine know that we're done... */
	raidPtr->shutdown_raidio = 0;
	wakeup(&(raidPtr->shutdown_raidio));

	simple_unlock(&(raidPtr->iodone_lock));
	splx(s);

	kthread_exit(0);
}