1 /* $NetBSD: rf_engine.c,v 1.41 2010/09/13 08:43:06 drochner Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. * 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include <sys/cdefs.h> 58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.41 2010/09/13 08:43:06 drochner Exp $"); 59 60 #include <sys/errno.h> 61 62 #include "rf_threadstuff.h" 63 #include "rf_dag.h" 64 #include "rf_engine.h" 65 #include "rf_etimer.h" 66 #include "rf_general.h" 67 #include "rf_dagutils.h" 68 #include "rf_shutdown.h" 69 #include "rf_raid.h" 70 #include "rf_kintf.h" 71 #include "rf_paritymap.h" 72 73 static void rf_ShutdownEngine(void *); 74 static void DAGExecutionThread(RF_ThreadArg_t arg); 75 static void rf_RaidIOThread(RF_ThreadArg_t arg); 76 77 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 78 79 #define DO_LOCK(_r_) \ 80 do { \ 81 ks = splbio(); \ 82 RF_LOCK_MUTEX((_r_)->node_queue_mutex); \ 83 } while (0) 84 85 #define DO_UNLOCK(_r_) \ 86 do { \ 87 RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \ 88 splx(ks); \ 89 } while (0) 90 91 #define DO_WAIT(_r_) \ 92 RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex) 93 94 #define DO_SIGNAL(_r_) \ 95 RF_BROADCAST_COND((_r_)->node_queue) /* XXX RF_SIGNAL_COND? */ 96 97 static void 98 rf_ShutdownEngine(void *arg) 99 { 100 RF_Raid_t *raidPtr; 101 int ks; 102 103 raidPtr = (RF_Raid_t *) arg; 104 105 /* Tell the rf_RaidIOThread to shutdown */ 106 simple_lock(&(raidPtr->iodone_lock)); 107 108 raidPtr->shutdown_raidio = 1; 109 wakeup(&(raidPtr->iodone)); 110 111 /* ...and wait for it to tell us it has finished */ 112 while (raidPtr->shutdown_raidio) 113 ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0, 114 &(raidPtr->iodone_lock)); 115 116 simple_unlock(&(raidPtr->iodone_lock)); 117 118 /* Now shut down the DAG execution engine. */ 119 DO_LOCK(raidPtr); 120 raidPtr->shutdown_engine = 1; 121 DO_SIGNAL(raidPtr); 122 DO_UNLOCK(raidPtr); 123 124 } 125 126 int 127 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr, 128 RF_Config_t *cfgPtr) 129 { 130 131 rf_mutex_init(&raidPtr->node_queue_mutex); 132 raidPtr->node_queue = NULL; 133 raidPtr->dags_in_flight = 0; 134 135 /* we create the execution thread only once per system boot. no need 136 * to check return code b/c the kernel panics if it can't create the 137 * thread. */ 138 #if RF_DEBUG_ENGINE 139 if (rf_engineDebug) { 140 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 141 } 142 #endif 143 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread, 144 DAGExecutionThread, raidPtr, 145 "raid%d", raidPtr->raidid)) { 146 printf("raid%d: Unable to create engine thread\n", 147 raidPtr->raidid); 148 return (ENOMEM); 149 } 150 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread, 151 rf_RaidIOThread, raidPtr, 152 "raidio%d", raidPtr->raidid)) { 153 printf("raid%d: Unable to create raidio thread\n", 154 raidPtr->raidid); 155 return (ENOMEM); 156 } 157 #if RF_DEBUG_ENGINE 158 if (rf_engineDebug) { 159 printf("raid%d: Created engine thread\n", raidPtr->raidid); 160 } 161 #endif 162 163 /* engine thread is now running and waiting for work */ 164 #if RF_DEBUG_ENGINE 165 if (rf_engineDebug) { 166 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 167 } 168 #endif 169 rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 170 171 return (0); 172 } 173 174 #if 0 175 static int 176 BranchDone(RF_DagNode_t *node) 177 { 178 int i; 179 180 /* return true if forward execution is completed for a node and it's 181 * succedents */ 182 switch (node->status) { 183 case rf_wait: 184 /* should never be called in this state */ 185 RF_PANIC(); 186 break; 187 case rf_fired: 188 /* node is currently executing, so we're not done */ 189 return (RF_FALSE); 190 case rf_good: 191 /* for each succedent recursively check branch */ 192 for (i = 0; i < node->numSuccedents; i++) 193 if (!BranchDone(node->succedents[i])) 194 return RF_FALSE; 195 return RF_TRUE; /* node and all succedent branches aren't in 196 * fired state */ 197 case rf_bad: 198 /* succedents can't fire */ 199 return (RF_TRUE); 200 case rf_recover: 201 /* should never be called in this state */ 202 RF_PANIC(); 203 break; 204 case rf_undone: 205 case rf_panic: 206 /* XXX need to fix this case */ 207 /* for now, assume that we're done */ 208 return (RF_TRUE); 209 default: 210 /* illegal node status */ 211 RF_PANIC(); 212 break; 213 } 214 } 215 #endif 216 217 static int 218 NodeReady(RF_DagNode_t *node) 219 { 220 int ready; 221 222 switch (node->dagHdr->status) { 223 case rf_enable: 224 case rf_rollForward: 225 if ((node->status == rf_wait) && 226 (node->numAntecedents == node->numAntDone)) 227 ready = RF_TRUE; 228 else 229 ready = RF_FALSE; 230 break; 231 case rf_rollBackward: 232 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 233 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 234 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 235 if ((node->status == rf_good) && 236 (node->numSuccDone == node->numSuccedents)) 237 ready = RF_TRUE; 238 else 239 ready = RF_FALSE; 240 break; 241 default: 242 printf("Execution engine found illegal DAG status in NodeReady\n"); 243 RF_PANIC(); 244 break; 245 } 246 247 return (ready); 248 } 249 250 251 252 /* user context and dag-exec-thread context: Fire a node. The node's 253 * status field determines which function, do or undo, to be fired. 254 * This routine assumes that the node's status field has alread been 255 * set to "fired" or "recover" to indicate the direction of execution. 256 */ 257 static void 258 FireNode(RF_DagNode_t *node) 259 { 260 switch (node->status) { 261 case rf_fired: 262 /* fire the do function of a node */ 263 #if RF_DEBUG_ENGINE 264 if (rf_engineDebug) { 265 printf("raid%d: Firing node 0x%lx (%s)\n", 266 node->dagHdr->raidPtr->raidid, 267 (unsigned long) node, node->name); 268 } 269 #endif 270 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 271 #if defined(__NetBSD__) && defined(_KERNEL) 272 /* thread_block(); */ 273 /* printf("Need to block the thread here...\n"); */ 274 /* XXX thread_block is actually mentioned in 275 * /usr/include/vm/vm_extern.h */ 276 #else 277 thread_block(); 278 #endif 279 } 280 (*(node->doFunc)) (node); 281 break; 282 case rf_recover: 283 /* fire the undo function of a node */ 284 #if RF_DEBUG_ENGINE 285 if (rf_engineDebug) { 286 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 287 node->dagHdr->raidPtr->raidid, 288 (unsigned long) node, node->name); 289 } 290 #endif 291 if (node->flags & RF_DAGNODE_FLAG_YIELD) 292 #if defined(__NetBSD__) && defined(_KERNEL) 293 /* thread_block(); */ 294 /* printf("Need to block the thread here...\n"); */ 295 /* XXX thread_block is actually mentioned in 296 * /usr/include/vm/vm_extern.h */ 297 #else 298 thread_block(); 299 #endif 300 (*(node->undoFunc)) (node); 301 break; 302 default: 303 RF_PANIC(); 304 break; 305 } 306 } 307 308 309 310 /* user context: 311 * Attempt to fire each node in a linear array. 312 * The entire list is fired atomically. 313 */ 314 static void 315 FireNodeArray(int numNodes, RF_DagNode_t **nodeList) 316 { 317 RF_DagStatus_t dstat; 318 RF_DagNode_t *node; 319 int i, j; 320 321 /* first, mark all nodes which are ready to be fired */ 322 for (i = 0; i < numNodes; i++) { 323 node = nodeList[i]; 324 dstat = node->dagHdr->status; 325 RF_ASSERT((node->status == rf_wait) || 326 (node->status == rf_good)); 327 if (NodeReady(node)) { 328 if ((dstat == rf_enable) || 329 (dstat == rf_rollForward)) { 330 RF_ASSERT(node->status == rf_wait); 331 if (node->commitNode) 332 node->dagHdr->numCommits++; 333 node->status = rf_fired; 334 for (j = 0; j < node->numAntecedents; j++) 335 node->antecedents[j]->numSuccFired++; 336 } else { 337 RF_ASSERT(dstat == rf_rollBackward); 338 RF_ASSERT(node->status == rf_good); 339 /* only one commit node per graph */ 340 RF_ASSERT(node->commitNode == RF_FALSE); 341 node->status = rf_recover; 342 } 343 } 344 } 345 /* now, fire the nodes */ 346 for (i = 0; i < numNodes; i++) { 347 if ((nodeList[i]->status == rf_fired) || 348 (nodeList[i]->status == rf_recover)) 349 FireNode(nodeList[i]); 350 } 351 } 352 353 354 /* user context: 355 * Attempt to fire each node in a linked list. 356 * The entire list is fired atomically. 357 */ 358 static void 359 FireNodeList(RF_DagNode_t *nodeList) 360 { 361 RF_DagNode_t *node, *next; 362 RF_DagStatus_t dstat; 363 int j; 364 365 if (nodeList) { 366 /* first, mark all nodes which are ready to be fired */ 367 for (node = nodeList; node; node = next) { 368 next = node->next; 369 dstat = node->dagHdr->status; 370 RF_ASSERT((node->status == rf_wait) || 371 (node->status == rf_good)); 372 if (NodeReady(node)) { 373 if ((dstat == rf_enable) || 374 (dstat == rf_rollForward)) { 375 RF_ASSERT(node->status == rf_wait); 376 if (node->commitNode) 377 node->dagHdr->numCommits++; 378 node->status = rf_fired; 379 for (j = 0; j < node->numAntecedents; j++) 380 node->antecedents[j]->numSuccFired++; 381 } else { 382 RF_ASSERT(dstat == rf_rollBackward); 383 RF_ASSERT(node->status == rf_good); 384 /* only one commit node per graph */ 385 RF_ASSERT(node->commitNode == RF_FALSE); 386 node->status = rf_recover; 387 } 388 } 389 } 390 /* now, fire the nodes */ 391 for (node = nodeList; node; node = next) { 392 next = node->next; 393 if ((node->status == rf_fired) || 394 (node->status == rf_recover)) 395 FireNode(node); 396 } 397 } 398 } 399 /* interrupt context: 400 * for each succedent 401 * propagate required results from node to succedent 402 * increment succedent's numAntDone 403 * place newly-enable nodes on node queue for firing 404 * 405 * To save context switches, we don't place NIL nodes on the node queue, 406 * but rather just process them as if they had fired. Note that NIL nodes 407 * that are the direct successors of the header will actually get fired by 408 * DispatchDAG, which is fine because no context switches are involved. 409 * 410 * Important: when running at user level, this can be called by any 411 * disk thread, and so the increment and check of the antecedent count 412 * must be locked. I used the node queue mutex and locked down the 413 * entire function, but this is certainly overkill. 414 */ 415 static void 416 PropagateResults(RF_DagNode_t *node, int context) 417 { 418 RF_DagNode_t *s, *a; 419 RF_Raid_t *raidPtr; 420 int i, ks; 421 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 422 * finished */ 423 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 424 * antecedents */ 425 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 426 RF_DagNode_t *q = NULL, *qh = NULL, *next; 427 int j, skipNode; 428 429 raidPtr = node->dagHdr->raidPtr; 430 431 DO_LOCK(raidPtr); 432 433 /* debug - validate fire counts */ 434 for (i = 0; i < node->numAntecedents; i++) { 435 a = *(node->antecedents + i); 436 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 437 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 438 a->numSuccDone++; 439 } 440 441 switch (node->dagHdr->status) { 442 case rf_enable: 443 case rf_rollForward: 444 for (i = 0; i < node->numSuccedents; i++) { 445 s = *(node->succedents + i); 446 RF_ASSERT(s->status == rf_wait); 447 (s->numAntDone)++; 448 if (s->numAntDone == s->numAntecedents) { 449 /* look for NIL nodes */ 450 if (s->doFunc == rf_NullNodeFunc) { 451 /* don't fire NIL nodes, just process 452 * them */ 453 s->next = finishlist; 454 finishlist = s; 455 } else { 456 /* look to see if the node is to be 457 * skipped */ 458 skipNode = RF_FALSE; 459 for (j = 0; j < s->numAntecedents; j++) 460 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 461 skipNode = RF_TRUE; 462 if (skipNode) { 463 /* this node has one or more 464 * failed true data 465 * dependencies, so skip it */ 466 s->next = skiplist; 467 skiplist = s; 468 } else 469 /* add s to list of nodes (q) 470 * to execute */ 471 if (context != RF_INTR_CONTEXT) { 472 /* we only have to 473 * enqueue if we're at 474 * intr context */ 475 /* put node on 476 a list to 477 be fired 478 after we 479 unlock */ 480 s->next = firelist; 481 firelist = s; 482 } else { 483 /* enqueue the 484 node for 485 the dag 486 exec thread 487 to fire */ 488 RF_ASSERT(NodeReady(s)); 489 if (q) { 490 q->next = s; 491 q = s; 492 } else { 493 qh = q = s; 494 qh->next = NULL; 495 } 496 } 497 } 498 } 499 } 500 501 if (q) { 502 /* xfer our local list of nodes to the node queue */ 503 q->next = raidPtr->node_queue; 504 raidPtr->node_queue = qh; 505 DO_SIGNAL(raidPtr); 506 } 507 DO_UNLOCK(raidPtr); 508 509 for (; skiplist; skiplist = next) { 510 next = skiplist->next; 511 skiplist->status = rf_skipped; 512 for (i = 0; i < skiplist->numAntecedents; i++) { 513 skiplist->antecedents[i]->numSuccFired++; 514 } 515 if (skiplist->commitNode) { 516 skiplist->dagHdr->numCommits++; 517 } 518 rf_FinishNode(skiplist, context); 519 } 520 for (; finishlist; finishlist = next) { 521 /* NIL nodes: no need to fire them */ 522 next = finishlist->next; 523 finishlist->status = rf_good; 524 for (i = 0; i < finishlist->numAntecedents; i++) { 525 finishlist->antecedents[i]->numSuccFired++; 526 } 527 if (finishlist->commitNode) 528 finishlist->dagHdr->numCommits++; 529 /* 530 * Okay, here we're calling rf_FinishNode() on 531 * nodes that have the null function as their 532 * work proc. Such a node could be the 533 * terminal node in a DAG. If so, it will 534 * cause the DAG to complete, which will in 535 * turn free memory used by the DAG, which 536 * includes the node in question. Thus, we 537 * must avoid referencing the node at all 538 * after calling rf_FinishNode() on it. */ 539 rf_FinishNode(finishlist, context); /* recursive call */ 540 } 541 /* fire all nodes in firelist */ 542 FireNodeList(firelist); 543 break; 544 545 case rf_rollBackward: 546 for (i = 0; i < node->numAntecedents; i++) { 547 a = *(node->antecedents + i); 548 RF_ASSERT(a->status == rf_good); 549 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 550 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 551 552 if (a->numSuccDone == a->numSuccFired) { 553 if (a->undoFunc == rf_NullNodeFunc) { 554 /* don't fire NIL nodes, just process 555 * them */ 556 a->next = finishlist; 557 finishlist = a; 558 } else { 559 if (context != RF_INTR_CONTEXT) { 560 /* we only have to enqueue if 561 * we're at intr context */ 562 /* put node on a list to be 563 fired after we unlock */ 564 a->next = firelist; 565 566 firelist = a; 567 } else { 568 /* enqueue the node for the 569 dag exec thread to fire */ 570 RF_ASSERT(NodeReady(a)); 571 if (q) { 572 q->next = a; 573 q = a; 574 } else { 575 qh = q = a; 576 qh->next = NULL; 577 } 578 } 579 } 580 } 581 } 582 if (q) { 583 /* xfer our local list of nodes to the node queue */ 584 q->next = raidPtr->node_queue; 585 raidPtr->node_queue = qh; 586 DO_SIGNAL(raidPtr); 587 } 588 DO_UNLOCK(raidPtr); 589 for (; finishlist; finishlist = next) { 590 /* NIL nodes: no need to fire them */ 591 next = finishlist->next; 592 finishlist->status = rf_good; 593 /* 594 * Okay, here we're calling rf_FinishNode() on 595 * nodes that have the null function as their 596 * work proc. Such a node could be the first 597 * node in a DAG. If so, it will cause the DAG 598 * to complete, which will in turn free memory 599 * used by the DAG, which includes the node in 600 * question. Thus, we must avoid referencing 601 * the node at all after calling 602 * rf_FinishNode() on it. */ 603 rf_FinishNode(finishlist, context); /* recursive call */ 604 } 605 /* fire all nodes in firelist */ 606 FireNodeList(firelist); 607 608 break; 609 default: 610 printf("Engine found illegal DAG status in PropagateResults()\n"); 611 RF_PANIC(); 612 break; 613 } 614 } 615 616 617 618 /* 619 * Process a fired node which has completed 620 */ 621 static void 622 ProcessNode(RF_DagNode_t *node, int context) 623 { 624 RF_Raid_t *raidPtr; 625 626 raidPtr = node->dagHdr->raidPtr; 627 628 switch (node->status) { 629 case rf_good: 630 /* normal case, don't need to do anything */ 631 break; 632 case rf_bad: 633 if ((node->dagHdr->numCommits > 0) || 634 (node->dagHdr->numCommitNodes == 0)) { 635 /* crossed commit barrier */ 636 node->dagHdr->status = rf_rollForward; 637 #if RF_DEBUG_ENGINE 638 if (rf_engineDebug) { 639 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 640 } 641 #endif 642 } else { 643 /* never reached commit barrier */ 644 node->dagHdr->status = rf_rollBackward; 645 #if RF_DEBUG_ENGINE 646 if (rf_engineDebug) { 647 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 648 } 649 #endif 650 } 651 break; 652 case rf_undone: 653 /* normal rollBackward case, don't need to do anything */ 654 break; 655 case rf_panic: 656 /* an undo node failed!!! */ 657 printf("UNDO of a node failed!!!/n"); 658 break; 659 default: 660 printf("node finished execution with an illegal status!!!\n"); 661 RF_PANIC(); 662 break; 663 } 664 665 /* enqueue node's succedents (antecedents if rollBackward) for 666 * execution */ 667 PropagateResults(node, context); 668 } 669 670 671 672 /* user context or dag-exec-thread context: 673 * This is the first step in post-processing a newly-completed node. 674 * This routine is called by each node execution function to mark the node 675 * as complete and fire off any successors that have been enabled. 676 */ 677 int 678 rf_FinishNode(RF_DagNode_t *node, int context) 679 { 680 int retcode = RF_FALSE; 681 node->dagHdr->numNodesCompleted++; 682 ProcessNode(node, context); 683 684 return (retcode); 685 } 686 687 688 /* user context: submit dag for execution, return non-zero if we have 689 * to wait for completion. if and only if we return non-zero, we'll 690 * cause cbFunc to get invoked with cbArg when the DAG has completed. 691 * 692 * for now we always return 1. If the DAG does not cause any I/O, 693 * then the callback may get invoked before DispatchDAG returns. 694 * There's code in state 5 of ContinueRaidAccess to handle this. 695 * 696 * All we do here is fire the direct successors of the header node. 697 * The DAG execution thread does the rest of the dag processing. */ 698 int 699 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), 700 void *cbArg) 701 { 702 RF_Raid_t *raidPtr; 703 704 raidPtr = dag->raidPtr; 705 #if RF_ACC_TRACE > 0 706 if (dag->tracerec) { 707 RF_ETIMER_START(dag->tracerec->timer); 708 } 709 #endif 710 #if DEBUG 711 #if RF_DEBUG_VALIDATE_DAG 712 if (rf_engineDebug || rf_validateDAGDebug) { 713 if (rf_ValidateDAG(dag)) 714 RF_PANIC(); 715 } 716 #endif 717 #endif 718 #if RF_DEBUG_ENGINE 719 if (rf_engineDebug) { 720 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 721 } 722 #endif 723 raidPtr->dags_in_flight++; /* debug only: blow off proper 724 * locking */ 725 dag->cbFunc = cbFunc; 726 dag->cbArg = cbArg; 727 dag->numNodesCompleted = 0; 728 dag->status = rf_enable; 729 FireNodeArray(dag->numSuccedents, dag->succedents); 730 return (1); 731 } 732 /* dedicated kernel thread: the thread that handles all DAG node 733 * firing. To minimize locking and unlocking, we grab a copy of the 734 * entire node queue and then set the node queue to NULL before doing 735 * any firing of nodes. This way we only have to release the lock 736 * once. Of course, it's probably rare that there's more than one 737 * node in the queue at any one time, but it sometimes happens. 738 */ 739 740 static void 741 DAGExecutionThread(RF_ThreadArg_t arg) 742 { 743 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 744 RF_Raid_t *raidPtr; 745 int ks; 746 int s; 747 748 raidPtr = (RF_Raid_t *) arg; 749 750 #if RF_DEBUG_ENGINE 751 if (rf_engineDebug) { 752 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 753 } 754 #endif 755 s = splbio(); 756 757 DO_LOCK(raidPtr); 758 while (!raidPtr->shutdown_engine) { 759 760 while (raidPtr->node_queue != NULL) { 761 local_nq = raidPtr->node_queue; 762 fire_nq = NULL; 763 term_nq = NULL; 764 raidPtr->node_queue = NULL; 765 DO_UNLOCK(raidPtr); 766 767 /* first, strip out the terminal nodes */ 768 while (local_nq) { 769 nd = local_nq; 770 local_nq = local_nq->next; 771 switch (nd->dagHdr->status) { 772 case rf_enable: 773 case rf_rollForward: 774 if (nd->numSuccedents == 0) { 775 /* end of the dag, add to 776 * callback list */ 777 nd->next = term_nq; 778 term_nq = nd; 779 } else { 780 /* not the end, add to the 781 * fire queue */ 782 nd->next = fire_nq; 783 fire_nq = nd; 784 } 785 break; 786 case rf_rollBackward: 787 if (nd->numAntecedents == 0) { 788 /* end of the dag, add to the 789 * callback list */ 790 nd->next = term_nq; 791 term_nq = nd; 792 } else { 793 /* not the end, add to the 794 * fire queue */ 795 nd->next = fire_nq; 796 fire_nq = nd; 797 } 798 break; 799 default: 800 RF_PANIC(); 801 break; 802 } 803 } 804 805 /* execute callback of dags which have reached the 806 * terminal node */ 807 while (term_nq) { 808 nd = term_nq; 809 term_nq = term_nq->next; 810 nd->next = NULL; 811 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 812 raidPtr->dags_in_flight--; /* debug only */ 813 } 814 815 /* fire remaining nodes */ 816 FireNodeList(fire_nq); 817 818 DO_LOCK(raidPtr); 819 } 820 while (!raidPtr->shutdown_engine && 821 raidPtr->node_queue == NULL) { 822 DO_WAIT(raidPtr); 823 } 824 } 825 DO_UNLOCK(raidPtr); 826 827 splx(s); 828 kthread_exit(0); 829 } 830 831 /* 832 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy() 833 * puts the I/O on a buf_queue, and then signals raidPtr->iodone. If 834 * necessary, this function calls raidstart() to initiate the I/O. 835 * When I/O to a component completes, KernelWakeupFunc() puts the 836 * completed request onto raidPtr->iodone TAILQ. This function looks 837 * after requests on that queue by calling rf_DiskIOComplete() for the 838 * request, and by calling any required CompleteFunc for the request. 839 */ 840 841 static void 842 rf_RaidIOThread(RF_ThreadArg_t arg) 843 { 844 RF_Raid_t *raidPtr; 845 RF_DiskQueueData_t *req; 846 int s; 847 848 raidPtr = (RF_Raid_t *) arg; 849 850 s = splbio(); 851 simple_lock(&(raidPtr->iodone_lock)); 852 853 while (!raidPtr->shutdown_raidio) { 854 /* if there is nothing to do, then snooze. */ 855 if (TAILQ_EMPTY(&(raidPtr->iodone)) && 856 rf_buf_queue_check(raidPtr->raidid)) { 857 ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0, 858 &(raidPtr->iodone_lock)); 859 } 860 861 /* Check for deferred parity-map-related work. */ 862 if (raidPtr->parity_map != NULL) { 863 simple_unlock(&(raidPtr->iodone_lock)); 864 rf_paritymap_checkwork(raidPtr->parity_map); 865 simple_lock(&(raidPtr->iodone_lock)); 866 } 867 868 /* See what I/Os, if any, have arrived */ 869 while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) { 870 TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries); 871 simple_unlock(&(raidPtr->iodone_lock)); 872 rf_DiskIOComplete(req->queue, req, req->error); 873 (req->CompleteFunc) (req->argument, req->error); 874 simple_lock(&(raidPtr->iodone_lock)); 875 } 876 877 /* process any pending outgoing IO */ 878 simple_unlock(&(raidPtr->iodone_lock)); 879 raidstart(raidPtr); 880 simple_lock(&(raidPtr->iodone_lock)); 881 882 } 883 884 /* Let rf_ShutdownEngine know that we're done... */ 885 raidPtr->shutdown_raidio = 0; 886 wakeup(&(raidPtr->shutdown_raidio)); 887 888 simple_unlock(&(raidPtr->iodone_lock)); 889 splx(s); 890 891 kthread_exit(0); 892 } 893