1 /* $NetBSD: rf_engine.c,v 1.50 2014/10/18 08:33:28 snj Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. * 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include <sys/cdefs.h> 58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.50 2014/10/18 08:33:28 snj Exp $"); 59 60 #include <sys/errno.h> 61 62 #include "rf_threadstuff.h" 63 #include "rf_dag.h" 64 #include "rf_engine.h" 65 #include "rf_etimer.h" 66 #include "rf_general.h" 67 #include "rf_dagutils.h" 68 #include "rf_shutdown.h" 69 #include "rf_raid.h" 70 #include "rf_kintf.h" 71 #include "rf_paritymap.h" 72 73 static void rf_ShutdownEngine(void *); 74 static void DAGExecutionThread(RF_ThreadArg_t arg); 75 static void rf_RaidIOThread(RF_ThreadArg_t arg); 76 77 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 78 79 #define DO_LOCK(_r_) \ 80 rf_lock_mutex2((_r_)->node_queue_mutex) 81 82 #define DO_UNLOCK(_r_) \ 83 rf_unlock_mutex2((_r_)->node_queue_mutex) 84 85 #define DO_WAIT(_r_) \ 86 rf_wait_cond2((_r_)->node_queue_cv, (_r_)->node_queue_mutex) 87 88 #define DO_SIGNAL(_r_) \ 89 rf_broadcast_cond2((_r_)->node_queue_cv) /* XXX rf_signal_cond2? */ 90 91 static void 92 rf_ShutdownEngine(void *arg) 93 { 94 RF_Raid_t *raidPtr; 95 96 raidPtr = (RF_Raid_t *) arg; 97 98 /* Tell the rf_RaidIOThread to shutdown */ 99 rf_lock_mutex2(raidPtr->iodone_lock); 100 101 raidPtr->shutdown_raidio = 1; 102 rf_signal_cond2(raidPtr->iodone_cv); 103 104 /* ...and wait for it to tell us it has finished */ 105 while (raidPtr->shutdown_raidio) 106 rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock); 107 108 rf_unlock_mutex2(raidPtr->iodone_lock); 109 110 /* Now shut down the DAG execution engine. */ 111 DO_LOCK(raidPtr); 112 raidPtr->shutdown_engine = 1; 113 DO_SIGNAL(raidPtr); 114 115 /* ...and wait for it to tell us it has finished */ 116 while (raidPtr->shutdown_engine) 117 DO_WAIT(raidPtr); 118 119 DO_UNLOCK(raidPtr); 120 121 rf_destroy_mutex2(raidPtr->node_queue_mutex); 122 rf_destroy_cond2(raidPtr->node_queue_cv); 123 124 rf_destroy_mutex2(raidPtr->iodone_lock); 125 rf_destroy_cond2(raidPtr->iodone_cv); 126 } 127 128 int 129 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr, 130 RF_Config_t *cfgPtr) 131 { 132 133 /* 134 * Initialise iodone for the IO thread. 135 */ 136 TAILQ_INIT(&(raidPtr->iodone)); 137 rf_init_mutex2(raidPtr->iodone_lock, IPL_VM); 138 rf_init_cond2(raidPtr->iodone_cv, "raidiow"); 139 140 rf_init_mutex2(raidPtr->node_queue_mutex, IPL_VM); 141 rf_init_cond2(raidPtr->node_queue_cv, "rfnodeq"); 142 raidPtr->node_queue = NULL; 143 raidPtr->dags_in_flight = 0; 144 145 /* we create the execution thread only once per system boot. no need 146 * to check return code b/c the kernel panics if it can't create the 147 * thread. */ 148 #if RF_DEBUG_ENGINE 149 if (rf_engineDebug) { 150 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 151 } 152 #endif 153 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread, 154 DAGExecutionThread, raidPtr, 155 "raid%d", raidPtr->raidid)) { 156 printf("raid%d: Unable to create engine thread\n", 157 raidPtr->raidid); 158 return (ENOMEM); 159 } 160 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread, 161 rf_RaidIOThread, raidPtr, 162 "raidio%d", raidPtr->raidid)) { 163 printf("raid%d: Unable to create raidio thread\n", 164 raidPtr->raidid); 165 return (ENOMEM); 166 } 167 #if RF_DEBUG_ENGINE 168 if (rf_engineDebug) { 169 printf("raid%d: Created engine thread\n", raidPtr->raidid); 170 } 171 #endif 172 173 /* engine thread is now running and waiting for work */ 174 #if RF_DEBUG_ENGINE 175 if (rf_engineDebug) { 176 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 177 } 178 #endif 179 rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 180 181 return (0); 182 } 183 184 #if 0 185 static int 186 BranchDone(RF_DagNode_t *node) 187 { 188 int i; 189 190 /* return true if forward execution is completed for a node and its 191 * succedents */ 192 switch (node->status) { 193 case rf_wait: 194 /* should never be called in this state */ 195 RF_PANIC(); 196 break; 197 case rf_fired: 198 /* node is currently executing, so we're not done */ 199 return (RF_FALSE); 200 case rf_good: 201 /* for each succedent recursively check branch */ 202 for (i = 0; i < node->numSuccedents; i++) 203 if (!BranchDone(node->succedents[i])) 204 return RF_FALSE; 205 return RF_TRUE; /* node and all succedent branches aren't in 206 * fired state */ 207 case rf_bad: 208 /* succedents can't fire */ 209 return (RF_TRUE); 210 case rf_recover: 211 /* should never be called in this state */ 212 RF_PANIC(); 213 break; 214 case rf_undone: 215 case rf_panic: 216 /* XXX need to fix this case */ 217 /* for now, assume that we're done */ 218 return (RF_TRUE); 219 default: 220 /* illegal node status */ 221 RF_PANIC(); 222 break; 223 } 224 } 225 #endif 226 227 static int 228 NodeReady(RF_DagNode_t *node) 229 { 230 int ready; 231 232 switch (node->dagHdr->status) { 233 case rf_enable: 234 case rf_rollForward: 235 if ((node->status == rf_wait) && 236 (node->numAntecedents == node->numAntDone)) 237 ready = RF_TRUE; 238 else 239 ready = RF_FALSE; 240 break; 241 case rf_rollBackward: 242 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 243 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 244 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 245 if ((node->status == rf_good) && 246 (node->numSuccDone == node->numSuccedents)) 247 ready = RF_TRUE; 248 else 249 ready = RF_FALSE; 250 break; 251 default: 252 printf("Execution engine found illegal DAG status in NodeReady\n"); 253 RF_PANIC(); 254 break; 255 } 256 257 return (ready); 258 } 259 260 261 262 /* user context and dag-exec-thread context: Fire a node. The node's 263 * status field determines which function, do or undo, to be fired. 264 * This routine assumes that the node's status field has alread been 265 * set to "fired" or "recover" to indicate the direction of execution. 266 */ 267 static void 268 FireNode(RF_DagNode_t *node) 269 { 270 switch (node->status) { 271 case rf_fired: 272 /* fire the do function of a node */ 273 #if RF_DEBUG_ENGINE 274 if (rf_engineDebug) { 275 printf("raid%d: Firing node 0x%lx (%s)\n", 276 node->dagHdr->raidPtr->raidid, 277 (unsigned long) node, node->name); 278 } 279 #endif 280 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 281 #if defined(__NetBSD__) && defined(_KERNEL) 282 /* thread_block(); */ 283 /* printf("Need to block the thread here...\n"); */ 284 /* XXX thread_block is actually mentioned in 285 * /usr/include/vm/vm_extern.h */ 286 #else 287 thread_block(); 288 #endif 289 } 290 (*(node->doFunc)) (node); 291 break; 292 case rf_recover: 293 /* fire the undo function of a node */ 294 #if RF_DEBUG_ENGINE 295 if (rf_engineDebug) { 296 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 297 node->dagHdr->raidPtr->raidid, 298 (unsigned long) node, node->name); 299 } 300 #endif 301 if (node->flags & RF_DAGNODE_FLAG_YIELD) 302 #if defined(__NetBSD__) && defined(_KERNEL) 303 /* thread_block(); */ 304 /* printf("Need to block the thread here...\n"); */ 305 /* XXX thread_block is actually mentioned in 306 * /usr/include/vm/vm_extern.h */ 307 #else 308 thread_block(); 309 #endif 310 (*(node->undoFunc)) (node); 311 break; 312 default: 313 RF_PANIC(); 314 break; 315 } 316 } 317 318 319 320 /* user context: 321 * Attempt to fire each node in a linear array. 322 * The entire list is fired atomically. 323 */ 324 static void 325 FireNodeArray(int numNodes, RF_DagNode_t **nodeList) 326 { 327 RF_DagStatus_t dstat; 328 RF_DagNode_t *node; 329 int i, j; 330 331 /* first, mark all nodes which are ready to be fired */ 332 for (i = 0; i < numNodes; i++) { 333 node = nodeList[i]; 334 dstat = node->dagHdr->status; 335 RF_ASSERT((node->status == rf_wait) || 336 (node->status == rf_good)); 337 if (NodeReady(node)) { 338 if ((dstat == rf_enable) || 339 (dstat == rf_rollForward)) { 340 RF_ASSERT(node->status == rf_wait); 341 if (node->commitNode) 342 node->dagHdr->numCommits++; 343 node->status = rf_fired; 344 for (j = 0; j < node->numAntecedents; j++) 345 node->antecedents[j]->numSuccFired++; 346 } else { 347 RF_ASSERT(dstat == rf_rollBackward); 348 RF_ASSERT(node->status == rf_good); 349 /* only one commit node per graph */ 350 RF_ASSERT(node->commitNode == RF_FALSE); 351 node->status = rf_recover; 352 } 353 } 354 } 355 /* now, fire the nodes */ 356 for (i = 0; i < numNodes; i++) { 357 if ((nodeList[i]->status == rf_fired) || 358 (nodeList[i]->status == rf_recover)) 359 FireNode(nodeList[i]); 360 } 361 } 362 363 364 /* user context: 365 * Attempt to fire each node in a linked list. 366 * The entire list is fired atomically. 367 */ 368 static void 369 FireNodeList(RF_DagNode_t *nodeList) 370 { 371 RF_DagNode_t *node, *next; 372 RF_DagStatus_t dstat; 373 int j; 374 375 if (nodeList) { 376 /* first, mark all nodes which are ready to be fired */ 377 for (node = nodeList; node; node = next) { 378 next = node->next; 379 dstat = node->dagHdr->status; 380 RF_ASSERT((node->status == rf_wait) || 381 (node->status == rf_good)); 382 if (NodeReady(node)) { 383 if ((dstat == rf_enable) || 384 (dstat == rf_rollForward)) { 385 RF_ASSERT(node->status == rf_wait); 386 if (node->commitNode) 387 node->dagHdr->numCommits++; 388 node->status = rf_fired; 389 for (j = 0; j < node->numAntecedents; j++) 390 node->antecedents[j]->numSuccFired++; 391 } else { 392 RF_ASSERT(dstat == rf_rollBackward); 393 RF_ASSERT(node->status == rf_good); 394 /* only one commit node per graph */ 395 RF_ASSERT(node->commitNode == RF_FALSE); 396 node->status = rf_recover; 397 } 398 } 399 } 400 /* now, fire the nodes */ 401 for (node = nodeList; node; node = next) { 402 next = node->next; 403 if ((node->status == rf_fired) || 404 (node->status == rf_recover)) 405 FireNode(node); 406 } 407 } 408 } 409 /* interrupt context: 410 * for each succedent 411 * propagate required results from node to succedent 412 * increment succedent's numAntDone 413 * place newly-enable nodes on node queue for firing 414 * 415 * To save context switches, we don't place NIL nodes on the node queue, 416 * but rather just process them as if they had fired. Note that NIL nodes 417 * that are the direct successors of the header will actually get fired by 418 * DispatchDAG, which is fine because no context switches are involved. 419 * 420 * Important: when running at user level, this can be called by any 421 * disk thread, and so the increment and check of the antecedent count 422 * must be locked. I used the node queue mutex and locked down the 423 * entire function, but this is certainly overkill. 424 */ 425 static void 426 PropagateResults(RF_DagNode_t *node, int context) 427 { 428 RF_DagNode_t *s, *a; 429 RF_Raid_t *raidPtr; 430 int i; 431 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 432 * finished */ 433 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 434 * antecedents */ 435 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 436 RF_DagNode_t *q = NULL, *qh = NULL, *next; 437 int j, skipNode; 438 439 raidPtr = node->dagHdr->raidPtr; 440 441 DO_LOCK(raidPtr); 442 443 /* debug - validate fire counts */ 444 for (i = 0; i < node->numAntecedents; i++) { 445 a = *(node->antecedents + i); 446 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 447 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 448 a->numSuccDone++; 449 } 450 451 switch (node->dagHdr->status) { 452 case rf_enable: 453 case rf_rollForward: 454 for (i = 0; i < node->numSuccedents; i++) { 455 s = *(node->succedents + i); 456 RF_ASSERT(s->status == rf_wait); 457 (s->numAntDone)++; 458 if (s->numAntDone == s->numAntecedents) { 459 /* look for NIL nodes */ 460 if (s->doFunc == rf_NullNodeFunc) { 461 /* don't fire NIL nodes, just process 462 * them */ 463 s->next = finishlist; 464 finishlist = s; 465 } else { 466 /* look to see if the node is to be 467 * skipped */ 468 skipNode = RF_FALSE; 469 for (j = 0; j < s->numAntecedents; j++) 470 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 471 skipNode = RF_TRUE; 472 if (skipNode) { 473 /* this node has one or more 474 * failed true data 475 * dependencies, so skip it */ 476 s->next = skiplist; 477 skiplist = s; 478 } else 479 /* add s to list of nodes (q) 480 * to execute */ 481 if (context != RF_INTR_CONTEXT) { 482 /* we only have to 483 * enqueue if we're at 484 * intr context */ 485 /* put node on 486 a list to 487 be fired 488 after we 489 unlock */ 490 s->next = firelist; 491 firelist = s; 492 } else { 493 /* enqueue the 494 node for 495 the dag 496 exec thread 497 to fire */ 498 RF_ASSERT(NodeReady(s)); 499 if (q) { 500 q->next = s; 501 q = s; 502 } else { 503 qh = q = s; 504 qh->next = NULL; 505 } 506 } 507 } 508 } 509 } 510 511 if (q) { 512 /* xfer our local list of nodes to the node queue */ 513 q->next = raidPtr->node_queue; 514 raidPtr->node_queue = qh; 515 DO_SIGNAL(raidPtr); 516 } 517 DO_UNLOCK(raidPtr); 518 519 for (; skiplist; skiplist = next) { 520 next = skiplist->next; 521 skiplist->status = rf_skipped; 522 for (i = 0; i < skiplist->numAntecedents; i++) { 523 skiplist->antecedents[i]->numSuccFired++; 524 } 525 if (skiplist->commitNode) { 526 skiplist->dagHdr->numCommits++; 527 } 528 rf_FinishNode(skiplist, context); 529 } 530 for (; finishlist; finishlist = next) { 531 /* NIL nodes: no need to fire them */ 532 next = finishlist->next; 533 finishlist->status = rf_good; 534 for (i = 0; i < finishlist->numAntecedents; i++) { 535 finishlist->antecedents[i]->numSuccFired++; 536 } 537 if (finishlist->commitNode) 538 finishlist->dagHdr->numCommits++; 539 /* 540 * Okay, here we're calling rf_FinishNode() on 541 * nodes that have the null function as their 542 * work proc. Such a node could be the 543 * terminal node in a DAG. If so, it will 544 * cause the DAG to complete, which will in 545 * turn free memory used by the DAG, which 546 * includes the node in question. Thus, we 547 * must avoid referencing the node at all 548 * after calling rf_FinishNode() on it. */ 549 rf_FinishNode(finishlist, context); /* recursive call */ 550 } 551 /* fire all nodes in firelist */ 552 FireNodeList(firelist); 553 break; 554 555 case rf_rollBackward: 556 for (i = 0; i < node->numAntecedents; i++) { 557 a = *(node->antecedents + i); 558 RF_ASSERT(a->status == rf_good); 559 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 560 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 561 562 if (a->numSuccDone == a->numSuccFired) { 563 if (a->undoFunc == rf_NullNodeFunc) { 564 /* don't fire NIL nodes, just process 565 * them */ 566 a->next = finishlist; 567 finishlist = a; 568 } else { 569 if (context != RF_INTR_CONTEXT) { 570 /* we only have to enqueue if 571 * we're at intr context */ 572 /* put node on a list to be 573 fired after we unlock */ 574 a->next = firelist; 575 576 firelist = a; 577 } else { 578 /* enqueue the node for the 579 dag exec thread to fire */ 580 RF_ASSERT(NodeReady(a)); 581 if (q) { 582 q->next = a; 583 q = a; 584 } else { 585 qh = q = a; 586 qh->next = NULL; 587 } 588 } 589 } 590 } 591 } 592 if (q) { 593 /* xfer our local list of nodes to the node queue */ 594 q->next = raidPtr->node_queue; 595 raidPtr->node_queue = qh; 596 DO_SIGNAL(raidPtr); 597 } 598 DO_UNLOCK(raidPtr); 599 for (; finishlist; finishlist = next) { 600 /* NIL nodes: no need to fire them */ 601 next = finishlist->next; 602 finishlist->status = rf_good; 603 /* 604 * Okay, here we're calling rf_FinishNode() on 605 * nodes that have the null function as their 606 * work proc. Such a node could be the first 607 * node in a DAG. If so, it will cause the DAG 608 * to complete, which will in turn free memory 609 * used by the DAG, which includes the node in 610 * question. Thus, we must avoid referencing 611 * the node at all after calling 612 * rf_FinishNode() on it. */ 613 rf_FinishNode(finishlist, context); /* recursive call */ 614 } 615 /* fire all nodes in firelist */ 616 FireNodeList(firelist); 617 618 break; 619 default: 620 printf("Engine found illegal DAG status in PropagateResults()\n"); 621 RF_PANIC(); 622 break; 623 } 624 } 625 626 627 628 /* 629 * Process a fired node which has completed 630 */ 631 static void 632 ProcessNode(RF_DagNode_t *node, int context) 633 { 634 #if RF_DEBUG_ENGINE 635 RF_Raid_t *raidPtr; 636 637 raidPtr = node->dagHdr->raidPtr; 638 #endif 639 640 switch (node->status) { 641 case rf_good: 642 /* normal case, don't need to do anything */ 643 break; 644 case rf_bad: 645 if ((node->dagHdr->numCommits > 0) || 646 (node->dagHdr->numCommitNodes == 0)) { 647 /* crossed commit barrier */ 648 node->dagHdr->status = rf_rollForward; 649 #if RF_DEBUG_ENGINE 650 if (rf_engineDebug) { 651 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 652 } 653 #endif 654 } else { 655 /* never reached commit barrier */ 656 node->dagHdr->status = rf_rollBackward; 657 #if RF_DEBUG_ENGINE 658 if (rf_engineDebug) { 659 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 660 } 661 #endif 662 } 663 break; 664 case rf_undone: 665 /* normal rollBackward case, don't need to do anything */ 666 break; 667 case rf_panic: 668 /* an undo node failed!!! */ 669 printf("UNDO of a node failed!!!\n"); 670 break; 671 default: 672 printf("node finished execution with an illegal status!!!\n"); 673 RF_PANIC(); 674 break; 675 } 676 677 /* enqueue node's succedents (antecedents if rollBackward) for 678 * execution */ 679 PropagateResults(node, context); 680 } 681 682 683 684 /* user context or dag-exec-thread context: 685 * This is the first step in post-processing a newly-completed node. 686 * This routine is called by each node execution function to mark the node 687 * as complete and fire off any successors that have been enabled. 688 */ 689 int 690 rf_FinishNode(RF_DagNode_t *node, int context) 691 { 692 int retcode = RF_FALSE; 693 node->dagHdr->numNodesCompleted++; 694 ProcessNode(node, context); 695 696 return (retcode); 697 } 698 699 700 /* user context: submit dag for execution, return non-zero if we have 701 * to wait for completion. if and only if we return non-zero, we'll 702 * cause cbFunc to get invoked with cbArg when the DAG has completed. 703 * 704 * for now we always return 1. If the DAG does not cause any I/O, 705 * then the callback may get invoked before DispatchDAG returns. 706 * There's code in state 5 of ContinueRaidAccess to handle this. 707 * 708 * All we do here is fire the direct successors of the header node. 709 * The DAG execution thread does the rest of the dag processing. */ 710 int 711 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), 712 void *cbArg) 713 { 714 RF_Raid_t *raidPtr; 715 716 raidPtr = dag->raidPtr; 717 #if RF_ACC_TRACE > 0 718 if (dag->tracerec) { 719 RF_ETIMER_START(dag->tracerec->timer); 720 } 721 #endif 722 #if DEBUG 723 #if RF_DEBUG_VALIDATE_DAG 724 if (rf_engineDebug || rf_validateDAGDebug) { 725 if (rf_ValidateDAG(dag)) 726 RF_PANIC(); 727 } 728 #endif 729 #endif 730 #if RF_DEBUG_ENGINE 731 if (rf_engineDebug) { 732 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 733 } 734 #endif 735 raidPtr->dags_in_flight++; /* debug only: blow off proper 736 * locking */ 737 dag->cbFunc = cbFunc; 738 dag->cbArg = cbArg; 739 dag->numNodesCompleted = 0; 740 dag->status = rf_enable; 741 FireNodeArray(dag->numSuccedents, dag->succedents); 742 return (1); 743 } 744 /* dedicated kernel thread: the thread that handles all DAG node 745 * firing. To minimize locking and unlocking, we grab a copy of the 746 * entire node queue and then set the node queue to NULL before doing 747 * any firing of nodes. This way we only have to release the lock 748 * once. Of course, it's probably rare that there's more than one 749 * node in the queue at any one time, but it sometimes happens. 750 */ 751 752 static void 753 DAGExecutionThread(RF_ThreadArg_t arg) 754 { 755 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 756 RF_Raid_t *raidPtr; 757 758 raidPtr = (RF_Raid_t *) arg; 759 760 #if RF_DEBUG_ENGINE 761 if (rf_engineDebug) { 762 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 763 } 764 #endif 765 766 DO_LOCK(raidPtr); 767 while (!raidPtr->shutdown_engine) { 768 769 while (raidPtr->node_queue != NULL) { 770 local_nq = raidPtr->node_queue; 771 fire_nq = NULL; 772 term_nq = NULL; 773 raidPtr->node_queue = NULL; 774 DO_UNLOCK(raidPtr); 775 776 /* first, strip out the terminal nodes */ 777 while (local_nq) { 778 nd = local_nq; 779 local_nq = local_nq->next; 780 switch (nd->dagHdr->status) { 781 case rf_enable: 782 case rf_rollForward: 783 if (nd->numSuccedents == 0) { 784 /* end of the dag, add to 785 * callback list */ 786 nd->next = term_nq; 787 term_nq = nd; 788 } else { 789 /* not the end, add to the 790 * fire queue */ 791 nd->next = fire_nq; 792 fire_nq = nd; 793 } 794 break; 795 case rf_rollBackward: 796 if (nd->numAntecedents == 0) { 797 /* end of the dag, add to the 798 * callback list */ 799 nd->next = term_nq; 800 term_nq = nd; 801 } else { 802 /* not the end, add to the 803 * fire queue */ 804 nd->next = fire_nq; 805 fire_nq = nd; 806 } 807 break; 808 default: 809 RF_PANIC(); 810 break; 811 } 812 } 813 814 /* execute callback of dags which have reached the 815 * terminal node */ 816 while (term_nq) { 817 nd = term_nq; 818 term_nq = term_nq->next; 819 nd->next = NULL; 820 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 821 raidPtr->dags_in_flight--; /* debug only */ 822 } 823 824 /* fire remaining nodes */ 825 FireNodeList(fire_nq); 826 827 DO_LOCK(raidPtr); 828 } 829 while (!raidPtr->shutdown_engine && 830 raidPtr->node_queue == NULL) { 831 DO_WAIT(raidPtr); 832 } 833 } 834 835 /* Let rf_ShutdownEngine know that we're done... */ 836 raidPtr->shutdown_engine = 0; 837 DO_SIGNAL(raidPtr); 838 839 DO_UNLOCK(raidPtr); 840 841 kthread_exit(0); 842 } 843 844 /* 845 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy() 846 * puts the I/O on a buf_queue, and then signals raidPtr->iodone. If 847 * necessary, this function calls raidstart() to initiate the I/O. 848 * When I/O to a component completes, KernelWakeupFunc() puts the 849 * completed request onto raidPtr->iodone TAILQ. This function looks 850 * after requests on that queue by calling rf_DiskIOComplete() for the 851 * request, and by calling any required CompleteFunc for the request. 852 */ 853 854 static void 855 rf_RaidIOThread(RF_ThreadArg_t arg) 856 { 857 RF_Raid_t *raidPtr; 858 RF_DiskQueueData_t *req; 859 860 raidPtr = (RF_Raid_t *) arg; 861 862 rf_lock_mutex2(raidPtr->iodone_lock); 863 864 while (!raidPtr->shutdown_raidio) { 865 /* if there is nothing to do, then snooze. */ 866 if (TAILQ_EMPTY(&(raidPtr->iodone)) && 867 rf_buf_queue_check(raidPtr)) { 868 rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock); 869 } 870 871 /* Check for deferred parity-map-related work. */ 872 if (raidPtr->parity_map != NULL) { 873 rf_unlock_mutex2(raidPtr->iodone_lock); 874 rf_paritymap_checkwork(raidPtr->parity_map); 875 rf_lock_mutex2(raidPtr->iodone_lock); 876 } 877 878 /* See what I/Os, if any, have arrived */ 879 while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) { 880 TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries); 881 rf_unlock_mutex2(raidPtr->iodone_lock); 882 rf_DiskIOComplete(req->queue, req, req->error); 883 (req->CompleteFunc) (req->argument, req->error); 884 rf_lock_mutex2(raidPtr->iodone_lock); 885 } 886 887 /* process any pending outgoing IO */ 888 rf_unlock_mutex2(raidPtr->iodone_lock); 889 raidstart(raidPtr); 890 rf_lock_mutex2(raidPtr->iodone_lock); 891 892 } 893 894 /* Let rf_ShutdownEngine know that we're done... */ 895 raidPtr->shutdown_raidio = 0; 896 rf_signal_cond2(raidPtr->iodone_cv); 897 898 rf_unlock_mutex2(raidPtr->iodone_lock); 899 900 kthread_exit(0); 901 } 902