1 /* $NetBSD: rf_engine.c,v 1.39 2006/11/16 01:33:23 christos Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. 
* 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include <sys/cdefs.h> 58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.39 2006/11/16 01:33:23 christos Exp $"); 59 60 #include <sys/errno.h> 61 62 #include "rf_threadstuff.h" 63 #include "rf_dag.h" 64 #include "rf_engine.h" 65 #include "rf_etimer.h" 66 #include "rf_general.h" 67 #include "rf_dagutils.h" 68 #include "rf_shutdown.h" 69 #include "rf_raid.h" 70 #include "rf_kintf.h" 71 72 static void rf_ShutdownEngine(void *); 73 static void DAGExecutionThread(RF_ThreadArg_t arg); 74 static void rf_RaidIOThread(RF_ThreadArg_t arg); 75 76 /* synchronization primitives for this file. Each macro takes the RF_Raid_t pointer whose node_queue / node_queue_mutex it operates on. DO_LOCK stores the value returned by splbio() in a variable named ks and DO_UNLOCK passes ks to splx(), so any function using them must declare ks itself. DO_WAIT should be enclosed in a while loop that re-tests the awaited condition. */ 77 78 #define DO_LOCK(_r_) \ 79 do { \ 80 ks = splbio(); \ 81 RF_LOCK_MUTEX((_r_)->node_queue_mutex); \ 82 } while (0) 83 84 #define DO_UNLOCK(_r_) \ 85 do { \ 86 RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \ 87 splx(ks); \ 88 } while (0) 89 90 #define DO_WAIT(_r_) \ 91 RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex) 92 93 #define DO_SIGNAL(_r_) \ 94 RF_BROADCAST_COND((_r_)->node_queue) /* XXX RF_SIGNAL_COND? 
*/ 95 96 static void 97 rf_ShutdownEngine(void *arg) 98 { 99 RF_Raid_t *raidPtr; 100 int ks; 101 102 raidPtr = (RF_Raid_t *) arg; 103 104 /* Tell the rf_RaidIOThread to shutdown */ 105 simple_lock(&(raidPtr->iodone_lock)); 106 107 raidPtr->shutdown_raidio = 1; 108 wakeup(&(raidPtr->iodone)); 109 110 /* ...and wait for it to tell us it has finished */ 111 while (raidPtr->shutdown_raidio) 112 ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0, 113 &(raidPtr->iodone_lock)); 114 115 simple_unlock(&(raidPtr->iodone_lock)); 116 117 /* Now shut down the DAG execution engine. */ 118 DO_LOCK(raidPtr); 119 raidPtr->shutdown_engine = 1; 120 DO_SIGNAL(raidPtr); 121 DO_UNLOCK(raidPtr); 122 123 } 124 125 int 126 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr, 127 RF_Config_t *cfgPtr) 128 { 129 130 rf_mutex_init(&raidPtr->node_queue_mutex); 131 raidPtr->node_queue = NULL; 132 raidPtr->dags_in_flight = 0; 133 134 /* we create the execution thread only once per system boot. no need 135 * to check return code b/c the kernel panics if it can't create the 136 * thread. 
*/ 137 #if RF_DEBUG_ENGINE 138 if (rf_engineDebug) { 139 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 140 } 141 #endif 142 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread, 143 DAGExecutionThread, raidPtr, 144 "raid%d", raidPtr->raidid)) { 145 printf("raid%d: Unable to create engine thread\n", 146 raidPtr->raidid); 147 return (ENOMEM); 148 } 149 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread, 150 rf_RaidIOThread, raidPtr, 151 "raidio%d", raidPtr->raidid)) { 152 printf("raid%d: Unable to create raidio thread\n", 153 raidPtr->raidid); 154 return (ENOMEM); 155 } 156 #if RF_DEBUG_ENGINE 157 if (rf_engineDebug) { 158 printf("raid%d: Created engine thread\n", raidPtr->raidid); 159 } 160 #endif 161 162 /* engine thread is now running and waiting for work */ 163 #if RF_DEBUG_ENGINE 164 if (rf_engineDebug) { 165 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 166 } 167 #endif 168 rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 169 170 return (0); 171 } 172 173 static int 174 BranchDone(RF_DagNode_t *node) 175 { 176 int i; 177 178 /* return true if forward execution is completed for a node and it's 179 * succedents */ 180 switch (node->status) { 181 case rf_wait: 182 /* should never be called in this state */ 183 RF_PANIC(); 184 break; 185 case rf_fired: 186 /* node is currently executing, so we're not done */ 187 return (RF_FALSE); 188 case rf_good: 189 /* for each succedent recursively check branch */ 190 for (i = 0; i < node->numSuccedents; i++) 191 if (!BranchDone(node->succedents[i])) 192 return RF_FALSE; 193 return RF_TRUE; /* node and all succedent branches aren't in 194 * fired state */ 195 case rf_bad: 196 /* succedents can't fire */ 197 return (RF_TRUE); 198 case rf_recover: 199 /* should never be called in this state */ 200 RF_PANIC(); 201 break; 202 case rf_undone: 203 case rf_panic: 204 /* XXX need to fix this case */ 205 /* for now, assume that we're done */ 206 return (RF_TRUE); 207 default: 
208 /* illegal node status */ 209 RF_PANIC(); 210 break; 211 } 212 } 213 214 static int 215 NodeReady(RF_DagNode_t *node) 216 { 217 int ready; 218 219 switch (node->dagHdr->status) { 220 case rf_enable: 221 case rf_rollForward: 222 if ((node->status == rf_wait) && 223 (node->numAntecedents == node->numAntDone)) 224 ready = RF_TRUE; 225 else 226 ready = RF_FALSE; 227 break; 228 case rf_rollBackward: 229 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 230 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 231 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 232 if ((node->status == rf_good) && 233 (node->numSuccDone == node->numSuccedents)) 234 ready = RF_TRUE; 235 else 236 ready = RF_FALSE; 237 break; 238 default: 239 printf("Execution engine found illegal DAG status in NodeReady\n"); 240 RF_PANIC(); 241 break; 242 } 243 244 return (ready); 245 } 246 247 248 249 /* user context and dag-exec-thread context: Fire a node. The node's 250 * status field determines which function, do or undo, to be fired. 251 * This routine assumes that the node's status field has alread been 252 * set to "fired" or "recover" to indicate the direction of execution. 
253 */ 254 static void 255 FireNode(RF_DagNode_t *node) 256 { 257 switch (node->status) { 258 case rf_fired: 259 /* fire the do function of a node */ 260 #if RF_DEBUG_ENGINE 261 if (rf_engineDebug) { 262 printf("raid%d: Firing node 0x%lx (%s)\n", 263 node->dagHdr->raidPtr->raidid, 264 (unsigned long) node, node->name); 265 } 266 #endif 267 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 268 #if defined(__NetBSD__) && defined(_KERNEL) 269 /* thread_block(); */ 270 /* printf("Need to block the thread here...\n"); */ 271 /* XXX thread_block is actually mentioned in 272 * /usr/include/vm/vm_extern.h */ 273 #else 274 thread_block(); 275 #endif 276 } 277 (*(node->doFunc)) (node); 278 break; 279 case rf_recover: 280 /* fire the undo function of a node */ 281 #if RF_DEBUG_ENGINE 282 if (rf_engineDebug) { 283 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 284 node->dagHdr->raidPtr->raidid, 285 (unsigned long) node, node->name); 286 } 287 #endif 288 if (node->flags & RF_DAGNODE_FLAG_YIELD) 289 #if defined(__NetBSD__) && defined(_KERNEL) 290 /* thread_block(); */ 291 /* printf("Need to block the thread here...\n"); */ 292 /* XXX thread_block is actually mentioned in 293 * /usr/include/vm/vm_extern.h */ 294 #else 295 thread_block(); 296 #endif 297 (*(node->undoFunc)) (node); 298 break; 299 default: 300 RF_PANIC(); 301 break; 302 } 303 } 304 305 306 307 /* user context: 308 * Attempt to fire each node in a linear array. 309 * The entire list is fired atomically. 
310 */ 311 static void 312 FireNodeArray(int numNodes, RF_DagNode_t **nodeList) 313 { 314 RF_DagStatus_t dstat; 315 RF_DagNode_t *node; 316 int i, j; 317 318 /* first, mark all nodes which are ready to be fired */ 319 for (i = 0; i < numNodes; i++) { 320 node = nodeList[i]; 321 dstat = node->dagHdr->status; 322 RF_ASSERT((node->status == rf_wait) || 323 (node->status == rf_good)); 324 if (NodeReady(node)) { 325 if ((dstat == rf_enable) || 326 (dstat == rf_rollForward)) { 327 RF_ASSERT(node->status == rf_wait); 328 if (node->commitNode) 329 node->dagHdr->numCommits++; 330 node->status = rf_fired; 331 for (j = 0; j < node->numAntecedents; j++) 332 node->antecedents[j]->numSuccFired++; 333 } else { 334 RF_ASSERT(dstat == rf_rollBackward); 335 RF_ASSERT(node->status == rf_good); 336 /* only one commit node per graph */ 337 RF_ASSERT(node->commitNode == RF_FALSE); 338 node->status = rf_recover; 339 } 340 } 341 } 342 /* now, fire the nodes */ 343 for (i = 0; i < numNodes; i++) { 344 if ((nodeList[i]->status == rf_fired) || 345 (nodeList[i]->status == rf_recover)) 346 FireNode(nodeList[i]); 347 } 348 } 349 350 351 /* user context: 352 * Attempt to fire each node in a linked list. 353 * The entire list is fired atomically. 
354 */ 355 static void 356 FireNodeList(RF_DagNode_t *nodeList) 357 { 358 RF_DagNode_t *node, *next; 359 RF_DagStatus_t dstat; 360 int j; 361 362 if (nodeList) { 363 /* first, mark all nodes which are ready to be fired */ 364 for (node = nodeList; node; node = next) { 365 next = node->next; 366 dstat = node->dagHdr->status; 367 RF_ASSERT((node->status == rf_wait) || 368 (node->status == rf_good)); 369 if (NodeReady(node)) { 370 if ((dstat == rf_enable) || 371 (dstat == rf_rollForward)) { 372 RF_ASSERT(node->status == rf_wait); 373 if (node->commitNode) 374 node->dagHdr->numCommits++; 375 node->status = rf_fired; 376 for (j = 0; j < node->numAntecedents; j++) 377 node->antecedents[j]->numSuccFired++; 378 } else { 379 RF_ASSERT(dstat == rf_rollBackward); 380 RF_ASSERT(node->status == rf_good); 381 /* only one commit node per graph */ 382 RF_ASSERT(node->commitNode == RF_FALSE); 383 node->status = rf_recover; 384 } 385 } 386 } 387 /* now, fire the nodes */ 388 for (node = nodeList; node; node = next) { 389 next = node->next; 390 if ((node->status == rf_fired) || 391 (node->status == rf_recover)) 392 FireNode(node); 393 } 394 } 395 } 396 /* interrupt context: 397 * for each succedent 398 * propagate required results from node to succedent 399 * increment succedent's numAntDone 400 * place newly-enable nodes on node queue for firing 401 * 402 * To save context switches, we don't place NIL nodes on the node queue, 403 * but rather just process them as if they had fired. Note that NIL nodes 404 * that are the direct successors of the header will actually get fired by 405 * DispatchDAG, which is fine because no context switches are involved. 406 * 407 * Important: when running at user level, this can be called by any 408 * disk thread, and so the increment and check of the antecedent count 409 * must be locked. I used the node queue mutex and locked down the 410 * entire function, but this is certainly overkill. 
411 */ 412 static void 413 PropagateResults(RF_DagNode_t *node, int context) 414 { 415 RF_DagNode_t *s, *a; 416 RF_Raid_t *raidPtr; 417 int i, ks; 418 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 419 * finished */ 420 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 421 * antecedents */ 422 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 423 RF_DagNode_t *q = NULL, *qh = NULL, *next; 424 int j, skipNode; 425 426 raidPtr = node->dagHdr->raidPtr; 427 428 DO_LOCK(raidPtr); 429 430 /* debug - validate fire counts */ 431 for (i = 0; i < node->numAntecedents; i++) { 432 a = *(node->antecedents + i); 433 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 434 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 435 a->numSuccDone++; 436 } 437 438 switch (node->dagHdr->status) { 439 case rf_enable: 440 case rf_rollForward: 441 for (i = 0; i < node->numSuccedents; i++) { 442 s = *(node->succedents + i); 443 RF_ASSERT(s->status == rf_wait); 444 (s->numAntDone)++; 445 if (s->numAntDone == s->numAntecedents) { 446 /* look for NIL nodes */ 447 if (s->doFunc == rf_NullNodeFunc) { 448 /* don't fire NIL nodes, just process 449 * them */ 450 s->next = finishlist; 451 finishlist = s; 452 } else { 453 /* look to see if the node is to be 454 * skipped */ 455 skipNode = RF_FALSE; 456 for (j = 0; j < s->numAntecedents; j++) 457 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 458 skipNode = RF_TRUE; 459 if (skipNode) { 460 /* this node has one or more 461 * failed true data 462 * dependencies, so skip it */ 463 s->next = skiplist; 464 skiplist = s; 465 } else 466 /* add s to list of nodes (q) 467 * to execute */ 468 if (context != RF_INTR_CONTEXT) { 469 /* we only have to 470 * enqueue if we're at 471 * intr context */ 472 /* put node on 473 a list to 474 be fired 475 after we 476 unlock */ 477 s->next = firelist; 478 firelist = s; 479 } else { 480 /* enqueue the 481 node for 482 the dag 483 exec thread 484 to fire 
*/ 485 RF_ASSERT(NodeReady(s)); 486 if (q) { 487 q->next = s; 488 q = s; 489 } else { 490 qh = q = s; 491 qh->next = NULL; 492 } 493 } 494 } 495 } 496 } 497 498 if (q) { 499 /* xfer our local list of nodes to the node queue */ 500 q->next = raidPtr->node_queue; 501 raidPtr->node_queue = qh; 502 DO_SIGNAL(raidPtr); 503 } 504 DO_UNLOCK(raidPtr); 505 506 for (; skiplist; skiplist = next) { 507 next = skiplist->next; 508 skiplist->status = rf_skipped; 509 for (i = 0; i < skiplist->numAntecedents; i++) { 510 skiplist->antecedents[i]->numSuccFired++; 511 } 512 if (skiplist->commitNode) { 513 skiplist->dagHdr->numCommits++; 514 } 515 rf_FinishNode(skiplist, context); 516 } 517 for (; finishlist; finishlist = next) { 518 /* NIL nodes: no need to fire them */ 519 next = finishlist->next; 520 finishlist->status = rf_good; 521 for (i = 0; i < finishlist->numAntecedents; i++) { 522 finishlist->antecedents[i]->numSuccFired++; 523 } 524 if (finishlist->commitNode) 525 finishlist->dagHdr->numCommits++; 526 /* 527 * Okay, here we're calling rf_FinishNode() on 528 * nodes that have the null function as their 529 * work proc. Such a node could be the 530 * terminal node in a DAG. If so, it will 531 * cause the DAG to complete, which will in 532 * turn free memory used by the DAG, which 533 * includes the node in question. Thus, we 534 * must avoid referencing the node at all 535 * after calling rf_FinishNode() on it. 
*/ 536 rf_FinishNode(finishlist, context); /* recursive call */ 537 } 538 /* fire all nodes in firelist */ 539 FireNodeList(firelist); 540 break; 541 542 case rf_rollBackward: 543 for (i = 0; i < node->numAntecedents; i++) { 544 a = *(node->antecedents + i); 545 RF_ASSERT(a->status == rf_good); 546 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 547 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 548 549 if (a->numSuccDone == a->numSuccFired) { 550 if (a->undoFunc == rf_NullNodeFunc) { 551 /* don't fire NIL nodes, just process 552 * them */ 553 a->next = finishlist; 554 finishlist = a; 555 } else { 556 if (context != RF_INTR_CONTEXT) { 557 /* we only have to enqueue if 558 * we're at intr context */ 559 /* put node on a list to be 560 fired after we unlock */ 561 a->next = firelist; 562 563 firelist = a; 564 } else { 565 /* enqueue the node for the 566 dag exec thread to fire */ 567 RF_ASSERT(NodeReady(a)); 568 if (q) { 569 q->next = a; 570 q = a; 571 } else { 572 qh = q = a; 573 qh->next = NULL; 574 } 575 } 576 } 577 } 578 } 579 if (q) { 580 /* xfer our local list of nodes to the node queue */ 581 q->next = raidPtr->node_queue; 582 raidPtr->node_queue = qh; 583 DO_SIGNAL(raidPtr); 584 } 585 DO_UNLOCK(raidPtr); 586 for (; finishlist; finishlist = next) { 587 /* NIL nodes: no need to fire them */ 588 next = finishlist->next; 589 finishlist->status = rf_good; 590 /* 591 * Okay, here we're calling rf_FinishNode() on 592 * nodes that have the null function as their 593 * work proc. Such a node could be the first 594 * node in a DAG. If so, it will cause the DAG 595 * to complete, which will in turn free memory 596 * used by the DAG, which includes the node in 597 * question. Thus, we must avoid referencing 598 * the node at all after calling 599 * rf_FinishNode() on it. 
*/ 600 rf_FinishNode(finishlist, context); /* recursive call */ 601 } 602 /* fire all nodes in firelist */ 603 FireNodeList(firelist); 604 605 break; 606 default: 607 printf("Engine found illegal DAG status in PropagateResults()\n"); 608 RF_PANIC(); 609 break; 610 } 611 } 612 613 614 615 /* 616 * Process a fired node which has completed 617 */ 618 static void 619 ProcessNode(RF_DagNode_t *node, int context) 620 { 621 RF_Raid_t *raidPtr; 622 623 raidPtr = node->dagHdr->raidPtr; 624 625 switch (node->status) { 626 case rf_good: 627 /* normal case, don't need to do anything */ 628 break; 629 case rf_bad: 630 if ((node->dagHdr->numCommits > 0) || 631 (node->dagHdr->numCommitNodes == 0)) { 632 /* crossed commit barrier */ 633 node->dagHdr->status = rf_rollForward; 634 #if RF_DEBUG_ENGINE 635 if (rf_engineDebug) { 636 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 637 } 638 #endif 639 } else { 640 /* never reached commit barrier */ 641 node->dagHdr->status = rf_rollBackward; 642 #if RF_DEBUG_ENGINE 643 if (rf_engineDebug) { 644 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 645 } 646 #endif 647 } 648 break; 649 case rf_undone: 650 /* normal rollBackward case, don't need to do anything */ 651 break; 652 case rf_panic: 653 /* an undo node failed!!! */ 654 printf("UNDO of a node failed!!!/n"); 655 break; 656 default: 657 printf("node finished execution with an illegal status!!!\n"); 658 RF_PANIC(); 659 break; 660 } 661 662 /* enqueue node's succedents (antecedents if rollBackward) for 663 * execution */ 664 PropagateResults(node, context); 665 } 666 667 668 669 /* user context or dag-exec-thread context: 670 * This is the first step in post-processing a newly-completed node. 671 * This routine is called by each node execution function to mark the node 672 * as complete and fire off any successors that have been enabled. 
673 */ 674 int 675 rf_FinishNode(RF_DagNode_t *node, int context) 676 { 677 int retcode = RF_FALSE; 678 node->dagHdr->numNodesCompleted++; 679 ProcessNode(node, context); 680 681 return (retcode); 682 } 683 684 685 /* user context: submit dag for execution, return non-zero if we have 686 * to wait for completion. if and only if we return non-zero, we'll 687 * cause cbFunc to get invoked with cbArg when the DAG has completed. 688 * 689 * for now we always return 1. If the DAG does not cause any I/O, 690 * then the callback may get invoked before DispatchDAG returns. 691 * There's code in state 5 of ContinueRaidAccess to handle this. 692 * 693 * All we do here is fire the direct successors of the header node. 694 * The DAG execution thread does the rest of the dag processing. */ 695 int 696 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), 697 void *cbArg) 698 { 699 RF_Raid_t *raidPtr; 700 701 raidPtr = dag->raidPtr; 702 #if RF_ACC_TRACE > 0 703 if (dag->tracerec) { 704 RF_ETIMER_START(dag->tracerec->timer); 705 } 706 #endif 707 #if DEBUG 708 #if RF_DEBUG_VALIDATE_DAG 709 if (rf_engineDebug || rf_validateDAGDebug) { 710 if (rf_ValidateDAG(dag)) 711 RF_PANIC(); 712 } 713 #endif 714 #endif 715 #if RF_DEBUG_ENGINE 716 if (rf_engineDebug) { 717 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 718 } 719 #endif 720 raidPtr->dags_in_flight++; /* debug only: blow off proper 721 * locking */ 722 dag->cbFunc = cbFunc; 723 dag->cbArg = cbArg; 724 dag->numNodesCompleted = 0; 725 dag->status = rf_enable; 726 FireNodeArray(dag->numSuccedents, dag->succedents); 727 return (1); 728 } 729 /* dedicated kernel thread: the thread that handles all DAG node 730 * firing. To minimize locking and unlocking, we grab a copy of the 731 * entire node queue and then set the node queue to NULL before doing 732 * any firing of nodes. This way we only have to release the lock 733 * once. 
Of course, it's probably rare that there's more than one 734 * node in the queue at any one time, but it sometimes happens. 735 */ 736 737 static void 738 DAGExecutionThread(RF_ThreadArg_t arg) 739 { 740 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 741 RF_Raid_t *raidPtr; 742 int ks; 743 int s; 744 745 raidPtr = (RF_Raid_t *) arg; 746 747 #if RF_DEBUG_ENGINE 748 if (rf_engineDebug) { 749 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 750 } 751 #endif 752 s = splbio(); 753 754 DO_LOCK(raidPtr); 755 while (!raidPtr->shutdown_engine) { 756 757 while (raidPtr->node_queue != NULL) { 758 local_nq = raidPtr->node_queue; 759 fire_nq = NULL; 760 term_nq = NULL; 761 raidPtr->node_queue = NULL; 762 DO_UNLOCK(raidPtr); 763 764 /* first, strip out the terminal nodes */ 765 while (local_nq) { 766 nd = local_nq; 767 local_nq = local_nq->next; 768 switch (nd->dagHdr->status) { 769 case rf_enable: 770 case rf_rollForward: 771 if (nd->numSuccedents == 0) { 772 /* end of the dag, add to 773 * callback list */ 774 nd->next = term_nq; 775 term_nq = nd; 776 } else { 777 /* not the end, add to the 778 * fire queue */ 779 nd->next = fire_nq; 780 fire_nq = nd; 781 } 782 break; 783 case rf_rollBackward: 784 if (nd->numAntecedents == 0) { 785 /* end of the dag, add to the 786 * callback list */ 787 nd->next = term_nq; 788 term_nq = nd; 789 } else { 790 /* not the end, add to the 791 * fire queue */ 792 nd->next = fire_nq; 793 fire_nq = nd; 794 } 795 break; 796 default: 797 RF_PANIC(); 798 break; 799 } 800 } 801 802 /* execute callback of dags which have reached the 803 * terminal node */ 804 while (term_nq) { 805 nd = term_nq; 806 term_nq = term_nq->next; 807 nd->next = NULL; 808 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 809 raidPtr->dags_in_flight--; /* debug only */ 810 } 811 812 /* fire remaining nodes */ 813 FireNodeList(fire_nq); 814 815 DO_LOCK(raidPtr); 816 } 817 while (!raidPtr->shutdown_engine && 818 raidPtr->node_queue == NULL) { 819 DO_WAIT(raidPtr); 820 } 821 
} 822 DO_UNLOCK(raidPtr); 823 824 splx(s); 825 kthread_exit(0); 826 } 827 828 /* 829 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy() 830 * puts the I/O on a buf_queue, and then signals raidPtr->iodone. If 831 * necessary, this function calls raidstart() to initiate the I/O. 832 * When I/O to a component completes, KernelWakeupFunc() puts the 833 * completed request onto raidPtr->iodone TAILQ. This function looks 834 * after requests on that queue by calling rf_DiskIOComplete() for the 835 * request, and by calling any required CompleteFunc for the request. 836 */ 837 838 static void 839 rf_RaidIOThread(RF_ThreadArg_t arg) 840 { 841 RF_Raid_t *raidPtr; 842 RF_DiskQueueData_t *req; 843 int s; 844 845 raidPtr = (RF_Raid_t *) arg; 846 847 s = splbio(); 848 simple_lock(&(raidPtr->iodone_lock)); 849 850 while (!raidPtr->shutdown_raidio) { 851 /* if there is nothing to do, then snooze. */ 852 if (TAILQ_EMPTY(&(raidPtr->iodone)) && 853 rf_buf_queue_check(raidPtr->raidid)) { 854 ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0, 855 &(raidPtr->iodone_lock)); 856 } 857 858 /* See what I/Os, if any, have arrived */ 859 while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) { 860 TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries); 861 simple_unlock(&(raidPtr->iodone_lock)); 862 rf_DiskIOComplete(req->queue, req, req->error); 863 (req->CompleteFunc) (req->argument, req->error); 864 simple_lock(&(raidPtr->iodone_lock)); 865 } 866 867 /* process any pending outgoing IO */ 868 simple_unlock(&(raidPtr->iodone_lock)); 869 raidstart(raidPtr); 870 simple_lock(&(raidPtr->iodone_lock)); 871 872 } 873 874 /* Let rf_ShutdownEngine know that we're done... */ 875 raidPtr->shutdown_raidio = 0; 876 wakeup(&(raidPtr->shutdown_raidio)); 877 878 simple_unlock(&(raidPtr->iodone_lock)); 879 splx(s); 880 881 kthread_exit(0); 882 } 883