/*	$NetBSD: rf_engine.c,v 1.40 2009/11/17 18:54:26 jld Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/****************************************************************************
 *
 * engine.c -- code for the DAG execution engine
 *
 * Modified to work as follows (holland):
 *   A user thread calls into DispatchDAG, which fires off the nodes that
 *   are direct successors of the header node.  DispatchDAG then returns,
 *   and the rest of the I/O continues asynchronously.  As each node
 *   completes, the node execution function calls FinishNode().  FinishNode
 *   scans the list of successors of the node and increments the antecedent
 *   counts.  Each node that becomes enabled is placed on a central node
 *   queue.  A dedicated dag-execution thread grabs nodes off of this
 *   queue and fires them.
 *
 *   NULL nodes are never fired.
 *
 *   Terminator nodes are never fired, but rather cause the callback
 *   associated with the DAG to be invoked.
 *
 *   If a node fails, the DAG either rolls forward to completion, or rolls
 *   back, undoing previously-completed nodes so that the access fails
 *   atomically.  The direction of recovery is determined by the location
 *   of the failed node in the graph.  If the failure occurred before the
 *   commit node in the graph, backward recovery is used.  Otherwise,
 *   forward recovery is used.
 *
 ****************************************************************************/

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.40 2009/11/17 18:54:26 jld Exp $");

#include <sys/errno.h>

#include "rf_threadstuff.h"
#include "rf_dag.h"
#include "rf_engine.h"
#include "rf_etimer.h"
#include "rf_general.h"
#include "rf_dagutils.h"
#include "rf_shutdown.h"
#include "rf_raid.h"
#include "rf_kintf.h"
#include "rf_paritymap.h"

static void rf_ShutdownEngine(void *);
static void DAGExecutionThread(RF_ThreadArg_t arg);
static void rf_RaidIOThread(RF_ThreadArg_t arg);

/*
 * Synchronization primitives for this file.  DO_WAIT must be enclosed in
 * a while loop that re-checks the condition being waited on.
 */

#define	DO_LOCK(_r_) \
do { \
	ks = splbio(); \
	RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
} while (0)

#define	DO_UNLOCK(_r_) \
do { \
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
	splx(ks); \
} while (0)

#define	DO_WAIT(_r_) \
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

#define	DO_SIGNAL(_r_) \
	RF_BROADCAST_COND((_r_)->node_queue)	/* XXX RF_SIGNAL_COND? */
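
/*
 * Sketch of the intended usage of the above (cf. DAGExecutionThread()
 * below): take the lock, wait in a loop until there is work or a
 * shutdown request, consume the queue, then drop the lock.
 *
 *	DO_LOCK(raidPtr);
 *	while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
 *		DO_WAIT(raidPtr);
 *	...pull nodes off raidPtr->node_queue...
 *	DO_UNLOCK(raidPtr);
 */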

static void
rf_ShutdownEngine(void *arg)
{
	RF_Raid_t *raidPtr;
	int ks;

	raidPtr = (RF_Raid_t *) arg;

	/* Tell the rf_RaidIOThread to shutdown */
	simple_lock(&(raidPtr->iodone_lock));

	raidPtr->shutdown_raidio = 1;
	wakeup(&(raidPtr->iodone));

	/* ...and wait for it to tell us it has finished */
	while (raidPtr->shutdown_raidio)
		ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0,
			&(raidPtr->iodone_lock));

	simple_unlock(&(raidPtr->iodone_lock));

	/* Now shut down the DAG execution engine. */
	DO_LOCK(raidPtr);
	raidPtr->shutdown_engine = 1;
	DO_SIGNAL(raidPtr);
	DO_UNLOCK(raidPtr);

}

int
rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
		   RF_Config_t *cfgPtr)
{

	rf_mutex_init(&raidPtr->node_queue_mutex);
	raidPtr->node_queue = NULL;
	raidPtr->dags_in_flight = 0;

	/* Create the DAG execution thread and the I/O helper thread for
	 * this RAID set. */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
	}
#endif
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
				    DAGExecutionThread, raidPtr,
				    "raid%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create engine thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
				    rf_RaidIOThread, raidPtr,
				    "raidio%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create raidio thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Created engine thread\n", raidPtr->raidid);
	}
#endif

	/* engine thread is now running and waiting for work */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
	}
#endif
	rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);

	return (0);
}

static int
BranchDone(RF_DagNode_t *node)
{
	int i;

	/* return true if forward execution is completed for a node and its
	 * succedents */
	switch (node->status) {
	case rf_wait:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_fired:
		/* node is currently executing, so we're not done */
		return (RF_FALSE);
	case rf_good:
		/* for each succedent, recursively check the branch */
		for (i = 0; i < node->numSuccedents; i++)
			if (!BranchDone(node->succedents[i]))
				return RF_FALSE;
		return RF_TRUE;	/* neither the node nor any succedent
				 * branch is still firing */
	case rf_bad:
		/* succedents can't fire */
		return (RF_TRUE);
	case rf_recover:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_undone:
	case rf_panic:
		/* XXX need to fix this case */
		/* for now, assume that we're done */
		return (RF_TRUE);
	default:
		/* illegal node status */
		RF_PANIC();
		break;
	}
}

static int
NodeReady(RF_DagNode_t *node)
{
	int ready;

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		if ((node->status == rf_wait) &&
		    (node->numAntecedents == node->numAntDone))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	case rf_rollBackward:
		RF_ASSERT(node->numSuccDone <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccDone);
		if ((node->status == rf_good) &&
		    (node->numSuccDone == node->numSuccedents))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	default:
		printf("Execution engine found illegal DAG status in NodeReady\n");
		RF_PANIC();
		break;
	}

	return (ready);
}


/* user context and dag-exec-thread context: Fire a node.  The node's
 * status field determines which function, do or undo, is fired.  This
 * routine assumes that the node's status field has already been set to
 * "fired" or "recover" to indicate the direction of execution.
 */
static void
FireNode(RF_DagNode_t *node)
{
	switch (node->status) {
	case rf_fired:
		/* fire the do function of a node */
#if RF_DEBUG_ENGINE
		if (rf_engineDebug) {
			printf("raid%d: Firing node 0x%lx (%s)\n",
			       node->dagHdr->raidPtr->raidid,
			       (unsigned long) node, node->name);
		}
#endif
		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
			/* thread_block(); */
			/* printf("Need to block the thread here...\n"); */
			/* XXX thread_block is actually mentioned in
			 * /usr/include/vm/vm_extern.h */
#else
			thread_block();
#endif
		}
		(*(node->doFunc)) (node);
		break;
	case rf_recover:
		/* fire the undo function of a node */
#if RF_DEBUG_ENGINE
		if (rf_engineDebug) {
			printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
			       node->dagHdr->raidPtr->raidid,
			       (unsigned long) node, node->name);
		}
#endif
		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
#if defined(__NetBSD__) && defined(_KERNEL)
			/* thread_block(); */
			/* printf("Need to block the thread here...\n"); */
			/* XXX thread_block is actually mentioned in
			 * /usr/include/vm/vm_extern.h */
#else
			thread_block();
#endif
		}
		(*(node->undoFunc)) (node);
		break;
	default:
		RF_PANIC();
		break;
	}
}


/* user context:
 * Attempt to fire each node in a linear array.
 * The entire list is fired atomically.
 */
static void
FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
{
	RF_DagStatus_t dstat;
	RF_DagNode_t *node;
	int i, j;

	/* first, mark all nodes which are ready to be fired */
	for (i = 0; i < numNodes; i++) {
		node = nodeList[i];
		dstat = node->dagHdr->status;
		RF_ASSERT((node->status == rf_wait) ||
			  (node->status == rf_good));
		if (NodeReady(node)) {
			if ((dstat == rf_enable) ||
			    (dstat == rf_rollForward)) {
				RF_ASSERT(node->status == rf_wait);
				if (node->commitNode)
					node->dagHdr->numCommits++;
				node->status = rf_fired;
				for (j = 0; j < node->numAntecedents; j++)
					node->antecedents[j]->numSuccFired++;
			} else {
				RF_ASSERT(dstat == rf_rollBackward);
				RF_ASSERT(node->status == rf_good);
				/* only one commit node per graph */
				RF_ASSERT(node->commitNode == RF_FALSE);
				node->status = rf_recover;
			}
		}
	}
	/* now, fire the nodes */
	for (i = 0; i < numNodes; i++) {
		if ((nodeList[i]->status == rf_fired) ||
		    (nodeList[i]->status == rf_recover))
			FireNode(nodeList[i]);
	}
}


/* user context:
 * Attempt to fire each node in a linked list.
 * The entire list is fired atomically.
 */
static void
FireNodeList(RF_DagNode_t *nodeList)
{
	RF_DagNode_t *node, *next;
	RF_DagStatus_t dstat;
	int j;

	if (nodeList) {
		/* first, mark all nodes which are ready to be fired */
		for (node = nodeList; node; node = next) {
			next = node->next;
			dstat = node->dagHdr->status;
			RF_ASSERT((node->status == rf_wait) ||
				  (node->status == rf_good));
			if (NodeReady(node)) {
				if ((dstat == rf_enable) ||
				    (dstat == rf_rollForward)) {
					RF_ASSERT(node->status == rf_wait);
					if (node->commitNode)
						node->dagHdr->numCommits++;
					node->status = rf_fired;
					for (j = 0; j < node->numAntecedents; j++)
						node->antecedents[j]->numSuccFired++;
				} else {
					RF_ASSERT(dstat == rf_rollBackward);
					RF_ASSERT(node->status == rf_good);
					/* only one commit node per graph */
					RF_ASSERT(node->commitNode == RF_FALSE);
					node->status = rf_recover;
				}
			}
		}
		/* now, fire the nodes */
		for (node = nodeList; node; node = next) {
			next = node->next;
			if ((node->status == rf_fired) ||
			    (node->status == rf_recover))
				FireNode(node);
		}
	}
}

/* interrupt context:
 * for each succedent
 *	propagate required results from node to succedent
 *	increment succedent's numAntDone
 *	place newly-enabled nodes on the node queue for firing
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired.  Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important: when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked.  I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 */
static void
PropagateResults(RF_DagNode_t *node, int context)
{
	RF_DagNode_t *s, *a;
	RF_Raid_t *raidPtr;
	int i, ks;
	RF_DagNode_t *finishlist = NULL;	/* a list of NIL nodes to be
						 * finished */
	RF_DagNode_t *skiplist = NULL;	/* list of nodes with failed truedata
					 * antecedents */
	RF_DagNode_t *firelist = NULL;	/* a list of nodes to be fired */
	RF_DagNode_t *q = NULL, *qh = NULL, *next;
	int j, skipNode;

	raidPtr = node->dagHdr->raidPtr;

	DO_LOCK(raidPtr);

	/* debug - validate fire counts */
	for (i = 0; i < node->numAntecedents; i++) {
		a = *(node->antecedents + i);
		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
		a->numSuccDone++;
	}

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		for (i = 0; i < node->numSuccedents; i++) {
			s = *(node->succedents + i);
			RF_ASSERT(s->status == rf_wait);
			(s->numAntDone)++;
			if (s->numAntDone == s->numAntecedents) {
				/* look for NIL nodes */
				if (s->doFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just
					 * process them */
					s->next = finishlist;
					finishlist = s;
				} else {
					/* look to see if the node is to be
					 * skipped */
					skipNode = RF_FALSE;
					for (j = 0; j < s->numAntecedents; j++)
						if ((s->antType[j] == rf_trueData) &&
						    (s->antecedents[j]->status == rf_bad))
							skipNode = RF_TRUE;
					if (skipNode) {
						/* this node has one or more
						 * failed true data
						 * dependencies, so skip it */
						s->next = skiplist;
						skiplist = s;
					} else {
						/* add s to the list of nodes
						 * (q) to execute */
						if (context != RF_INTR_CONTEXT) {
							/* we only have to
							 * enqueue if we're at
							 * intr context */
							/* put the node on a
							 * list to be fired
							 * after we unlock */
							s->next = firelist;
							firelist = s;
						} else {
							/* enqueue the node for
							 * the dag exec thread
							 * to fire */
							RF_ASSERT(NodeReady(s));
							if (q) {
								q->next = s;
								q = s;
							} else {
								qh = q = s;
								qh->next = NULL;
							}
						}
					}
				}
			}
		}

		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);

		for (; skiplist; skiplist = next) {
			next = skiplist->next;
			skiplist->status = rf_skipped;
			for (i = 0; i < skiplist->numAntecedents; i++) {
				skiplist->antecedents[i]->numSuccFired++;
			}
			if (skiplist->commitNode) {
				skiplist->dagHdr->numCommits++;
			}
			rf_FinishNode(skiplist, context);
		}
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			for (i = 0; i < finishlist->numAntecedents; i++) {
				finishlist->antecedents[i]->numSuccFired++;
			}
			if (finishlist->commitNode)
				finishlist->dagHdr->numCommits++;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the
			 * terminal node in a DAG.  If so, it will
			 * cause the DAG to complete, which will in
			 * turn free memory used by the DAG, which
			 * includes the node in question.  Thus, we
			 * must avoid referencing the node at all
			 * after calling rf_FinishNode() on it.
			 */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);
		break;

	case rf_rollBackward:
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just
					 * process them */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/* we only have to enqueue if
						 * we're at intr context */
						/* put the node on a list to
						 * be fired after we unlock */
						a->next = firelist;
						firelist = a;
					} else {
						/* enqueue the node for the
						 * dag exec thread to fire */
						RF_ASSERT(NodeReady(a));
						if (q) {
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the first
			 * node in a DAG.  If so, it will cause the DAG
			 * to complete, which will in turn free memory
			 * used by the DAG, which includes the node in
			 * question.  Thus, we must avoid referencing
			 * the node at all after calling
			 * rf_FinishNode() on it.
			 */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);

		break;
	default:
		printf("Engine found illegal DAG status in PropagateResults()\n");
		RF_PANIC();
		break;
	}
}


/*
 * Process a fired node which has completed
 */
static void
ProcessNode(RF_DagNode_t *node, int context)
{
	RF_Raid_t *raidPtr;

	raidPtr = node->dagHdr->raidPtr;

	switch (node->status) {
	case rf_good:
		/* normal case, don't need to do anything */
		break;
	case rf_bad:
		if ((node->dagHdr->numCommits > 0) ||
		    (node->dagHdr->numCommitNodes == 0)) {
			/* crossed commit barrier */
			node->dagHdr->status = rf_rollForward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
			}
#endif
		} else {
			/* never reached commit barrier */
			node->dagHdr->status = rf_rollBackward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
			}
#endif
		}
		break;
	case rf_undone:
		/* normal rollBackward case, don't need to do anything */
		break;
	case rf_panic:
		/* an undo node failed!!! */
		printf("UNDO of a node failed!!!\n");
		break;
	default:
		printf("node finished execution with an illegal status!!!\n");
		RF_PANIC();
		break;
	}

	/* enqueue node's succedents (antecedents if rollBackward) for
	 * execution */
	PropagateResults(node, context);
}


/* user context or dag-exec-thread context:
 * This is the first step in post-processing a newly-completed node.
 * This routine is called by each node execution function to mark the node
 * as complete and fire off any successors that have been enabled.
 */
int
rf_FinishNode(RF_DagNode_t *node, int context)
{
	int retcode = RF_FALSE;

	node->dagHdr->numNodesCompleted++;
	ProcessNode(node, context);

	return (retcode);
}


/* user context: submit dag for execution, return non-zero if we have
 * to wait for completion.  if and only if we return non-zero, we'll
 * cause cbFunc to get invoked with cbArg when the DAG has completed.
 *
 * for now we always return 1.  If the DAG does not cause any I/O,
 * then the callback may get invoked before DispatchDAG returns.
 * There's code in state 5 of ContinueRaidAccess to handle this.
 *
 * All we do here is fire the direct successors of the header node.
 * The DAG execution thread does the rest of the dag processing.  */
int
rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *),
	       void *cbArg)
{
	RF_Raid_t *raidPtr;

	raidPtr = dag->raidPtr;
#if RF_ACC_TRACE > 0
	if (dag->tracerec) {
		RF_ETIMER_START(dag->tracerec->timer);
	}
#endif
#if DEBUG
#if RF_DEBUG_VALIDATE_DAG
	if (rf_engineDebug || rf_validateDAGDebug) {
		if (rf_ValidateDAG(dag))
			RF_PANIC();
	}
#endif
#endif
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
	}
#endif
	raidPtr->dags_in_flight++;	/* debug only: blow off proper
					 * locking */
	dag->cbFunc = cbFunc;
	dag->cbArg = cbArg;
	dag->numNodesCompleted = 0;
	dag->status = rf_enable;
	FireNodeArray(dag->numSuccedents, dag->succedents);
	return (1);
}
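
/*
 * Illustrative dispatch sketch (caller names are hypothetical): a caller
 * that submits a DAG as
 *
 *	rf_DispatchDAG(dag, myCallback, myArg);
 *
 * must be prepared for myCallback(myArg) to run at any point after the
 * call -- from the engine thread once the terminal node is reached, or
 * even before rf_DispatchDAG() returns if the DAG performs no I/O -- and
 * must not reference the DAG afterwards, since completion may free it.
 */
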
/* dedicated kernel thread: the thread that handles all DAG node
 * firing.  To minimize locking and unlocking, we grab a copy of the
 * entire node queue and then set the node queue to NULL before doing
 * any firing of nodes.  This way we only have to release the lock
 * once.  Of course, it's probably rare that there's more than one
 * node in the queue at any one time, but it sometimes happens.
 */

static void
DAGExecutionThread(RF_ThreadArg_t arg)
{
	RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
	RF_Raid_t *raidPtr;
	int ks;
	int s;

	raidPtr = (RF_Raid_t *) arg;

#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread is running\n", raidPtr->raidid);
	}
#endif
	s = splbio();

	DO_LOCK(raidPtr);
	while (!raidPtr->shutdown_engine) {

		while (raidPtr->node_queue != NULL) {
			local_nq = raidPtr->node_queue;
			fire_nq = NULL;
			term_nq = NULL;
			raidPtr->node_queue = NULL;
			DO_UNLOCK(raidPtr);

			/* first, strip out the terminal nodes */
			while (local_nq) {
				nd = local_nq;
				local_nq = local_nq->next;
				switch (nd->dagHdr->status) {
				case rf_enable:
				case rf_rollForward:
					if (nd->numSuccedents == 0) {
						/* end of the dag, add to
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				case rf_rollBackward:
					if (nd->numAntecedents == 0) {
						/* end of the dag, add to the
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				default:
					RF_PANIC();
					break;
				}
			}

			/* execute callback of dags which have reached the
			 * terminal node */
			while (term_nq) {
				nd = term_nq;
				term_nq = term_nq->next;
				nd->next = NULL;
				(nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
				raidPtr->dags_in_flight--;	/* debug only */
			}

			/* fire remaining nodes */
			FireNodeList(fire_nq);

			DO_LOCK(raidPtr);
		}
		while (!raidPtr->shutdown_engine &&
		       raidPtr->node_queue == NULL) {
			DO_WAIT(raidPtr);
		}
	}
	DO_UNLOCK(raidPtr);

	splx(s);
	kthread_exit(0);
}

/*
 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy()
 * puts the I/O on a buf_queue, and then signals raidPtr->iodone.  If
 * necessary, this function calls raidstart() to initiate the I/O.
 * When I/O to a component completes, KernelWakeupFunc() puts the
 * completed request onto the raidPtr->iodone TAILQ.  This function looks
 * after requests on that queue by calling rf_DiskIOComplete() for the
 * request, and by calling any required CompleteFunc for the request.
 */

static void
rf_RaidIOThread(RF_ThreadArg_t arg)
{
	RF_Raid_t *raidPtr;
	RF_DiskQueueData_t *req;
	int s;

	raidPtr = (RF_Raid_t *) arg;

	s = splbio();
	simple_lock(&(raidPtr->iodone_lock));

	while (!raidPtr->shutdown_raidio) {
		/* if there is nothing to do, then snooze. */
		if (TAILQ_EMPTY(&(raidPtr->iodone)) &&
		    rf_buf_queue_check(raidPtr->raidid)) {
			ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0,
				&(raidPtr->iodone_lock));
		}

		/* Check for deferred parity-map-related work. */
		if (raidPtr->parity_map != NULL) {
			simple_unlock(&(raidPtr->iodone_lock));
			rf_paritymap_checkwork(raidPtr->parity_map);
			simple_lock(&(raidPtr->iodone_lock));
		}

		/* See what I/Os, if any, have arrived */
		while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
			TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
			simple_unlock(&(raidPtr->iodone_lock));
			rf_DiskIOComplete(req->queue, req, req->error);
			(req->CompleteFunc) (req->argument, req->error);
			simple_lock(&(raidPtr->iodone_lock));
		}

		/* process any pending outgoing IO */
		simple_unlock(&(raidPtr->iodone_lock));
		raidstart(raidPtr);
		simple_lock(&(raidPtr->iodone_lock));

	}

	/* Let rf_ShutdownEngine know that we're done... */
	raidPtr->shutdown_raidio = 0;
	wakeup(&(raidPtr->shutdown_raidio));

	simple_unlock(&(raidPtr->iodone_lock));
	splx(s);

	kthread_exit(0);
}