/*	$NetBSD: rf_engine.c,v 1.47 2011/09/07 07:46:45 mbalmer Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/****************************************************************************
 *
 * engine.c -- code for DAG execution engine
 *
 * Modified to work as follows (holland):
 *   A user thread calls into DispatchDAG, which fires off the nodes that
 *   are direct successors to the header node.  DispatchDAG then returns,
 *   and the rest of the I/O continues asynchronously.  As each node
 *   completes, the node execution function calls FinishNode().  FinishNode
 *   scans the list of successors to the node and increments the antecedent
 *   counts.  Each node that becomes enabled is placed on a central node
 *   queue.  A dedicated dag-execution thread grabs nodes off of this
 *   queue and fires them.
 *
 *   NULL nodes are never fired.
 *
 *   Terminator nodes are never fired, but rather cause the callback
 *   associated with the DAG to be invoked.
 *
 *   If a node fails, the DAG either rolls forward to completion or rolls
 *   backward, undoing previously-completed nodes, so that the access
 *   fails atomically.  The direction of recovery is determined by the
 *   location of the failed node in the graph: if the failure occurred
 *   before the commit node in the graph, backward recovery is used;
 *   otherwise, forward recovery is used.
 *
 ****************************************************************************/

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.47 2011/09/07 07:46:45 mbalmer Exp $");

#include <sys/errno.h>

#include "rf_threadstuff.h"
#include "rf_dag.h"
#include "rf_engine.h"
#include "rf_etimer.h"
#include "rf_general.h"
#include "rf_dagutils.h"
#include "rf_shutdown.h"
#include "rf_raid.h"
#include "rf_kintf.h"
#include "rf_paritymap.h"

static void rf_ShutdownEngine(void *);
static void DAGExecutionThread(RF_ThreadArg_t arg);
static void rf_RaidIOThread(RF_ThreadArg_t arg);

/*
 * Synchronization primitives for this file.  DO_WAIT should be enclosed
 * in a while loop.
 */
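/*
 * Illustrative sketch of the pattern (mirroring rf_ShutdownEngine() and
 * DAGExecutionThread() below; not additional API):
 *
 *	DO_LOCK(raidPtr);
 *	while (!condition)
 *		DO_WAIT(raidPtr);
 *	DO_UNLOCK(raidPtr);
 *
 * where the waking side updates the condition under the same mutex and
 * then calls DO_SIGNAL().
 */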

#define DO_LOCK(_r_) \
	rf_lock_mutex2((_r_)->node_queue_mutex)

#define DO_UNLOCK(_r_) \
	rf_unlock_mutex2((_r_)->node_queue_mutex)

#define DO_WAIT(_r_) \
	rf_wait_cond2((_r_)->node_queue_cv, (_r_)->node_queue_mutex)

#define DO_SIGNAL(_r_) \
	rf_broadcast_cond2((_r_)->node_queue_cv)	/* XXX rf_signal_cond2? */

static void
rf_ShutdownEngine(void *arg)
{
	RF_Raid_t *raidPtr;

	raidPtr = (RF_Raid_t *) arg;

	/* Tell the rf_RaidIOThread to shut down */
	rf_lock_mutex2(raidPtr->iodone_lock);

	raidPtr->shutdown_raidio = 1;
	rf_signal_cond2(raidPtr->iodone_cv);

	/* ...and wait for it to tell us it has finished */
	while (raidPtr->shutdown_raidio)
		rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock);

	rf_unlock_mutex2(raidPtr->iodone_lock);

	/* Now shut down the DAG execution engine. */
	DO_LOCK(raidPtr);
	raidPtr->shutdown_engine = 1;
	DO_SIGNAL(raidPtr);

	/* ...and wait for it to tell us it has finished */
	while (raidPtr->shutdown_engine)
		DO_WAIT(raidPtr);

	DO_UNLOCK(raidPtr);

	rf_destroy_mutex2(raidPtr->node_queue_mutex);
	rf_destroy_cond2(raidPtr->node_queue_cv);

	rf_destroy_mutex2(raidPtr->iodone_lock);
	rf_destroy_cond2(raidPtr->iodone_cv);
}

int
rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
		   RF_Config_t *cfgPtr)
{

	/*
	 * Initialise iodone for the IO thread.
	 */
	TAILQ_INIT(&(raidPtr->iodone));
	rf_init_mutex2(raidPtr->iodone_lock, IPL_VM);
	rf_init_cond2(raidPtr->iodone_cv, "raidiow");

	rf_init_mutex2(raidPtr->node_queue_mutex, IPL_VM);
	rf_init_cond2(raidPtr->node_queue_cv, "rfnodeq");
	raidPtr->node_queue = NULL;
	raidPtr->dags_in_flight = 0;

	/*
	 * We create the execution thread only once per system boot.  No
	 * need to check the return code because the kernel panics if it
	 * can't create the thread.
	 */
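	/*
	 * Two threads are started here: the DAG execution engine
	 * (DAGExecutionThread) and the per-set I/O thread (rf_RaidIOThread).
	 * rf_ShutdownEngine(), registered via rf_ShutdownCreate() at the
	 * end of this function, stops both and tears down the locks
	 * created above.
	 */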
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
	}
#endif
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
				    DAGExecutionThread, raidPtr,
				    "raid%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create engine thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
				    rf_RaidIOThread, raidPtr,
				    "raidio%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create raidio thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Created engine thread\n", raidPtr->raidid);
	}
#endif

	/* engine thread is now running and waiting for work */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
	}
#endif
	rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);

	return (0);
}

#if 0
static int
BranchDone(RF_DagNode_t *node)
{
	int i;

	/* return true if forward execution is completed for a node and its
	 * succedents */
	switch (node->status) {
	case rf_wait:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_fired:
		/* node is currently executing, so we're not done */
		return (RF_FALSE);
	case rf_good:
		/* for each succedent recursively check branch */
		for (i = 0; i < node->numSuccedents; i++)
			if (!BranchDone(node->succedents[i]))
				return RF_FALSE;
		return RF_TRUE;	/* node and all succedent branches aren't in
				 * fired state */
	case rf_bad:
		/* succedents can't fire */
		return (RF_TRUE);
	case rf_recover:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_undone:
	case rf_panic:
		/* XXX need to fix this case */
		/* for now, assume that we're done */
		return (RF_TRUE);
	default:
		/* illegal node status */
		RF_PANIC();
		break;
	}
}
#endif

static int
NodeReady(RF_DagNode_t *node)
{
	int ready;

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		if ((node->status == rf_wait) &&
		    (node->numAntecedents == node->numAntDone))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	case rf_rollBackward:
		RF_ASSERT(node->numSuccDone <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccDone);
		if ((node->status == rf_good) &&
		    (node->numSuccDone == node->numSuccedents))
			ready = RF_TRUE;
		else
			ready = RF_FALSE;
		break;
	default:
		printf("Execution engine found illegal DAG status in NodeReady\n");
		RF_PANIC();
		break;
	}

	return (ready);
}


/* user context and dag-exec-thread context: Fire a node.  The node's
 * status field determines which function, do or undo, is fired.
 * This routine assumes that the node's status field has already been
 * set to "fired" or "recover" to indicate the direction of execution.
 */
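/*
 * FireNodeArray() and FireNodeList() below are the callers that set
 * node->status to rf_fired or rf_recover before invoking FireNode();
 * the do/undo function fired here is in turn expected to call
 * rf_FinishNode() once the node's work has completed.
 */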
266 */ 267 static void 268 FireNode(RF_DagNode_t *node) 269 { 270 switch (node->status) { 271 case rf_fired: 272 /* fire the do function of a node */ 273 #if RF_DEBUG_ENGINE 274 if (rf_engineDebug) { 275 printf("raid%d: Firing node 0x%lx (%s)\n", 276 node->dagHdr->raidPtr->raidid, 277 (unsigned long) node, node->name); 278 } 279 #endif 280 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 281 #if defined(__NetBSD__) && defined(_KERNEL) 282 /* thread_block(); */ 283 /* printf("Need to block the thread here...\n"); */ 284 /* XXX thread_block is actually mentioned in 285 * /usr/include/vm/vm_extern.h */ 286 #else 287 thread_block(); 288 #endif 289 } 290 (*(node->doFunc)) (node); 291 break; 292 case rf_recover: 293 /* fire the undo function of a node */ 294 #if RF_DEBUG_ENGINE 295 if (rf_engineDebug) { 296 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 297 node->dagHdr->raidPtr->raidid, 298 (unsigned long) node, node->name); 299 } 300 #endif 301 if (node->flags & RF_DAGNODE_FLAG_YIELD) 302 #if defined(__NetBSD__) && defined(_KERNEL) 303 /* thread_block(); */ 304 /* printf("Need to block the thread here...\n"); */ 305 /* XXX thread_block is actually mentioned in 306 * /usr/include/vm/vm_extern.h */ 307 #else 308 thread_block(); 309 #endif 310 (*(node->undoFunc)) (node); 311 break; 312 default: 313 RF_PANIC(); 314 break; 315 } 316 } 317 318 319 320 /* user context: 321 * Attempt to fire each node in a linear array. 322 * The entire list is fired atomically. 323 */ 324 static void 325 FireNodeArray(int numNodes, RF_DagNode_t **nodeList) 326 { 327 RF_DagStatus_t dstat; 328 RF_DagNode_t *node; 329 int i, j; 330 331 /* first, mark all nodes which are ready to be fired */ 332 for (i = 0; i < numNodes; i++) { 333 node = nodeList[i]; 334 dstat = node->dagHdr->status; 335 RF_ASSERT((node->status == rf_wait) || 336 (node->status == rf_good)); 337 if (NodeReady(node)) { 338 if ((dstat == rf_enable) || 339 (dstat == rf_rollForward)) { 340 RF_ASSERT(node->status == rf_wait); 341 if (node->commitNode) 342 node->dagHdr->numCommits++; 343 node->status = rf_fired; 344 for (j = 0; j < node->numAntecedents; j++) 345 node->antecedents[j]->numSuccFired++; 346 } else { 347 RF_ASSERT(dstat == rf_rollBackward); 348 RF_ASSERT(node->status == rf_good); 349 /* only one commit node per graph */ 350 RF_ASSERT(node->commitNode == RF_FALSE); 351 node->status = rf_recover; 352 } 353 } 354 } 355 /* now, fire the nodes */ 356 for (i = 0; i < numNodes; i++) { 357 if ((nodeList[i]->status == rf_fired) || 358 (nodeList[i]->status == rf_recover)) 359 FireNode(nodeList[i]); 360 } 361 } 362 363 364 /* user context: 365 * Attempt to fire each node in a linked list. 366 * The entire list is fired atomically. 
367 */ 368 static void 369 FireNodeList(RF_DagNode_t *nodeList) 370 { 371 RF_DagNode_t *node, *next; 372 RF_DagStatus_t dstat; 373 int j; 374 375 if (nodeList) { 376 /* first, mark all nodes which are ready to be fired */ 377 for (node = nodeList; node; node = next) { 378 next = node->next; 379 dstat = node->dagHdr->status; 380 RF_ASSERT((node->status == rf_wait) || 381 (node->status == rf_good)); 382 if (NodeReady(node)) { 383 if ((dstat == rf_enable) || 384 (dstat == rf_rollForward)) { 385 RF_ASSERT(node->status == rf_wait); 386 if (node->commitNode) 387 node->dagHdr->numCommits++; 388 node->status = rf_fired; 389 for (j = 0; j < node->numAntecedents; j++) 390 node->antecedents[j]->numSuccFired++; 391 } else { 392 RF_ASSERT(dstat == rf_rollBackward); 393 RF_ASSERT(node->status == rf_good); 394 /* only one commit node per graph */ 395 RF_ASSERT(node->commitNode == RF_FALSE); 396 node->status = rf_recover; 397 } 398 } 399 } 400 /* now, fire the nodes */ 401 for (node = nodeList; node; node = next) { 402 next = node->next; 403 if ((node->status == rf_fired) || 404 (node->status == rf_recover)) 405 FireNode(node); 406 } 407 } 408 } 409 /* interrupt context: 410 * for each succedent 411 * propagate required results from node to succedent 412 * increment succedent's numAntDone 413 * place newly-enable nodes on node queue for firing 414 * 415 * To save context switches, we don't place NIL nodes on the node queue, 416 * but rather just process them as if they had fired. Note that NIL nodes 417 * that are the direct successors of the header will actually get fired by 418 * DispatchDAG, which is fine because no context switches are involved. 419 * 420 * Important: when running at user level, this can be called by any 421 * disk thread, and so the increment and check of the antecedent count 422 * must be locked. I used the node queue mutex and locked down the 423 * entire function, but this is certainly overkill. 
424 */ 425 static void 426 PropagateResults(RF_DagNode_t *node, int context) 427 { 428 RF_DagNode_t *s, *a; 429 RF_Raid_t *raidPtr; 430 int i; 431 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 432 * finished */ 433 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 434 * antecedents */ 435 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 436 RF_DagNode_t *q = NULL, *qh = NULL, *next; 437 int j, skipNode; 438 439 raidPtr = node->dagHdr->raidPtr; 440 441 DO_LOCK(raidPtr); 442 443 /* debug - validate fire counts */ 444 for (i = 0; i < node->numAntecedents; i++) { 445 a = *(node->antecedents + i); 446 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 447 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 448 a->numSuccDone++; 449 } 450 451 switch (node->dagHdr->status) { 452 case rf_enable: 453 case rf_rollForward: 454 for (i = 0; i < node->numSuccedents; i++) { 455 s = *(node->succedents + i); 456 RF_ASSERT(s->status == rf_wait); 457 (s->numAntDone)++; 458 if (s->numAntDone == s->numAntecedents) { 459 /* look for NIL nodes */ 460 if (s->doFunc == rf_NullNodeFunc) { 461 /* don't fire NIL nodes, just process 462 * them */ 463 s->next = finishlist; 464 finishlist = s; 465 } else { 466 /* look to see if the node is to be 467 * skipped */ 468 skipNode = RF_FALSE; 469 for (j = 0; j < s->numAntecedents; j++) 470 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 471 skipNode = RF_TRUE; 472 if (skipNode) { 473 /* this node has one or more 474 * failed true data 475 * dependencies, so skip it */ 476 s->next = skiplist; 477 skiplist = s; 478 } else 479 /* add s to list of nodes (q) 480 * to execute */ 481 if (context != RF_INTR_CONTEXT) { 482 /* we only have to 483 * enqueue if we're at 484 * intr context */ 485 /* put node on 486 a list to 487 be fired 488 after we 489 unlock */ 490 s->next = firelist; 491 firelist = s; 492 } else { 493 /* enqueue the 494 node for 495 the dag 496 exec thread 497 to fire */ 498 RF_ASSERT(NodeReady(s)); 499 if (q) { 500 q->next = s; 501 q = s; 502 } else { 503 qh = q = s; 504 qh->next = NULL; 505 } 506 } 507 } 508 } 509 } 510 511 if (q) { 512 /* xfer our local list of nodes to the node queue */ 513 q->next = raidPtr->node_queue; 514 raidPtr->node_queue = qh; 515 DO_SIGNAL(raidPtr); 516 } 517 DO_UNLOCK(raidPtr); 518 519 for (; skiplist; skiplist = next) { 520 next = skiplist->next; 521 skiplist->status = rf_skipped; 522 for (i = 0; i < skiplist->numAntecedents; i++) { 523 skiplist->antecedents[i]->numSuccFired++; 524 } 525 if (skiplist->commitNode) { 526 skiplist->dagHdr->numCommits++; 527 } 528 rf_FinishNode(skiplist, context); 529 } 530 for (; finishlist; finishlist = next) { 531 /* NIL nodes: no need to fire them */ 532 next = finishlist->next; 533 finishlist->status = rf_good; 534 for (i = 0; i < finishlist->numAntecedents; i++) { 535 finishlist->antecedents[i]->numSuccFired++; 536 } 537 if (finishlist->commitNode) 538 finishlist->dagHdr->numCommits++; 539 /* 540 * Okay, here we're calling rf_FinishNode() on 541 * nodes that have the null function as their 542 * work proc. Such a node could be the 543 * terminal node in a DAG. If so, it will 544 * cause the DAG to complete, which will in 545 * turn free memory used by the DAG, which 546 * includes the node in question. Thus, we 547 * must avoid referencing the node at all 548 * after calling rf_FinishNode() on it. 
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);
		break;

	case rf_rollBackward:
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just
					 * process them */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/* not at interrupt context:
						 * put the node on a list to be
						 * fired directly once we
						 * unlock */
						a->next = firelist;
						firelist = a;
					} else {
						/* at interrupt context:
						 * enqueue the node for the
						 * dag-exec thread to fire */
						RF_ASSERT(NodeReady(a));
						if (q) {
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the first
			 * node in a DAG.  If so, it will cause the DAG
			 * to complete, which will in turn free memory
			 * used by the DAG, which includes the node in
			 * question.  Thus, we must avoid referencing
			 * the node at all after calling
			 * rf_FinishNode() on it.
			 */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);

		break;
	default:
		printf("Engine found illegal DAG status in PropagateResults()\n");
		RF_PANIC();
		break;
	}
}


/*
 * Process a fired node which has completed
 */
static void
ProcessNode(RF_DagNode_t *node, int context)
{
	RF_Raid_t *raidPtr;

	raidPtr = node->dagHdr->raidPtr;

	switch (node->status) {
	case rf_good:
		/* normal case, don't need to do anything */
		break;
	case rf_bad:
		if ((node->dagHdr->numCommits > 0) ||
		    (node->dagHdr->numCommitNodes == 0)) {
			/* crossed commit barrier */
			node->dagHdr->status = rf_rollForward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
			}
#endif
		} else {
			/* never reached commit barrier */
			node->dagHdr->status = rf_rollBackward;
#if RF_DEBUG_ENGINE
			if (rf_engineDebug) {
				printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
			}
#endif
		}
		break;
	case rf_undone:
		/* normal rollBackward case, don't need to do anything */
		break;
	case rf_panic:
		/* an undo node failed!!! */
		printf("UNDO of a node failed!!!\n");
		break;
	default:
		printf("node finished execution with an illegal status!!!\n");
		RF_PANIC();
		break;
	}

	/* enqueue node's succedents (antecedents if rollBackward) for
	 * execution */
	PropagateResults(node, context);
}


/* user context or dag-exec-thread context:
 * This is the first step in post-processing a newly-completed node.
 * This routine is called by each node execution function to mark the node
 * as complete and fire off any successors that have been enabled.
 */
int
rf_FinishNode(RF_DagNode_t *node, int context)
{
	int retcode = RF_FALSE;

	node->dagHdr->numNodesCompleted++;
	ProcessNode(node, context);

	return (retcode);
}


/* user context: submit dag for execution, return non-zero if we have
 * to wait for completion.  if and only if we return non-zero, we'll
 * cause cbFunc to get invoked with cbArg when the DAG has completed.
 *
 * for now we always return 1.  If the DAG does not cause any I/O,
 * then the callback may get invoked before DispatchDAG returns.
 * There's code in state 5 of ContinueRaidAccess to handle this.
 *
 * All we do here is fire the direct successors of the header node.
 * The DAG execution thread does the rest of the dag processing.  */
int
rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *),
	       void *cbArg)
{
	RF_Raid_t *raidPtr;

	raidPtr = dag->raidPtr;
#if RF_ACC_TRACE > 0
	if (dag->tracerec) {
		RF_ETIMER_START(dag->tracerec->timer);
	}
#endif
#if DEBUG
#if RF_DEBUG_VALIDATE_DAG
	if (rf_engineDebug || rf_validateDAGDebug) {
		if (rf_ValidateDAG(dag))
			RF_PANIC();
	}
#endif
#endif
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
	}
#endif
	raidPtr->dags_in_flight++;	/* debug only: blow off proper
					 * locking */
	dag->cbFunc = cbFunc;
	dag->cbArg = cbArg;
	dag->numNodesCompleted = 0;
	dag->status = rf_enable;
	FireNodeArray(dag->numSuccedents, dag->succedents);
	return (1);
}
/* dedicated kernel thread: the thread that handles all DAG node
 * firing.  To minimize locking and unlocking, we grab a copy of the
 * entire node queue and then set the node queue to NULL before doing
 * any firing of nodes.  This way we only have to release the lock
 * once.  Of course, it's probably rare that there's more than one
 * node in the queue at any one time, but it sometimes happens.
 */
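/*
 * Each pass of the loop below detaches the entire node_queue while the
 * queue lock is held and then works on that private list with the lock
 * dropped: nodes that mark the end of their DAG (no succedents going
 * forward, no antecedents rolling backward) collect on term_nq and have
 * their DAG's callback invoked; the rest collect on fire_nq and are
 * fired via FireNodeList().
 */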
748 */ 749 750 static void 751 DAGExecutionThread(RF_ThreadArg_t arg) 752 { 753 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 754 RF_Raid_t *raidPtr; 755 756 raidPtr = (RF_Raid_t *) arg; 757 758 #if RF_DEBUG_ENGINE 759 if (rf_engineDebug) { 760 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 761 } 762 #endif 763 764 DO_LOCK(raidPtr); 765 while (!raidPtr->shutdown_engine) { 766 767 while (raidPtr->node_queue != NULL) { 768 local_nq = raidPtr->node_queue; 769 fire_nq = NULL; 770 term_nq = NULL; 771 raidPtr->node_queue = NULL; 772 DO_UNLOCK(raidPtr); 773 774 /* first, strip out the terminal nodes */ 775 while (local_nq) { 776 nd = local_nq; 777 local_nq = local_nq->next; 778 switch (nd->dagHdr->status) { 779 case rf_enable: 780 case rf_rollForward: 781 if (nd->numSuccedents == 0) { 782 /* end of the dag, add to 783 * callback list */ 784 nd->next = term_nq; 785 term_nq = nd; 786 } else { 787 /* not the end, add to the 788 * fire queue */ 789 nd->next = fire_nq; 790 fire_nq = nd; 791 } 792 break; 793 case rf_rollBackward: 794 if (nd->numAntecedents == 0) { 795 /* end of the dag, add to the 796 * callback list */ 797 nd->next = term_nq; 798 term_nq = nd; 799 } else { 800 /* not the end, add to the 801 * fire queue */ 802 nd->next = fire_nq; 803 fire_nq = nd; 804 } 805 break; 806 default: 807 RF_PANIC(); 808 break; 809 } 810 } 811 812 /* execute callback of dags which have reached the 813 * terminal node */ 814 while (term_nq) { 815 nd = term_nq; 816 term_nq = term_nq->next; 817 nd->next = NULL; 818 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 819 raidPtr->dags_in_flight--; /* debug only */ 820 } 821 822 /* fire remaining nodes */ 823 FireNodeList(fire_nq); 824 825 DO_LOCK(raidPtr); 826 } 827 while (!raidPtr->shutdown_engine && 828 raidPtr->node_queue == NULL) { 829 DO_WAIT(raidPtr); 830 } 831 } 832 833 /* Let rf_ShutdownEngine know that we're done... */ 834 raidPtr->shutdown_engine = 0; 835 DO_SIGNAL(raidPtr); 836 837 DO_UNLOCK(raidPtr); 838 839 kthread_exit(0); 840 } 841 842 /* 843 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy() 844 * puts the I/O on a buf_queue, and then signals raidPtr->iodone. If 845 * necessary, this function calls raidstart() to initiate the I/O. 846 * When I/O to a component completes, KernelWakeupFunc() puts the 847 * completed request onto raidPtr->iodone TAILQ. This function looks 848 * after requests on that queue by calling rf_DiskIOComplete() for the 849 * request, and by calling any required CompleteFunc for the request. 850 */ 851 852 static void 853 rf_RaidIOThread(RF_ThreadArg_t arg) 854 { 855 RF_Raid_t *raidPtr; 856 RF_DiskQueueData_t *req; 857 858 raidPtr = (RF_Raid_t *) arg; 859 860 rf_lock_mutex2(raidPtr->iodone_lock); 861 862 while (!raidPtr->shutdown_raidio) { 863 /* if there is nothing to do, then snooze. */ 864 if (TAILQ_EMPTY(&(raidPtr->iodone)) && 865 rf_buf_queue_check(raidPtr->raidid)) { 866 rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock); 867 } 868 869 /* Check for deferred parity-map-related work. 
		if (raidPtr->parity_map != NULL) {
			rf_unlock_mutex2(raidPtr->iodone_lock);
			rf_paritymap_checkwork(raidPtr->parity_map);
			rf_lock_mutex2(raidPtr->iodone_lock);
		}

		/* See what I/Os, if any, have arrived */
		while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
			TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
			rf_unlock_mutex2(raidPtr->iodone_lock);
			rf_DiskIOComplete(req->queue, req, req->error);
			(req->CompleteFunc) (req->argument, req->error);
			rf_lock_mutex2(raidPtr->iodone_lock);
		}

		/* process any pending outgoing IO */
		rf_unlock_mutex2(raidPtr->iodone_lock);
		raidstart(raidPtr);
		rf_lock_mutex2(raidPtr->iodone_lock);
	}

	/* Let rf_ShutdownEngine know that we're done... */
	raidPtr->shutdown_raidio = 0;
	rf_signal_cond2(raidPtr->iodone_cv);

	rf_unlock_mutex2(raidPtr->iodone_lock);

	kthread_exit(0);
}