1 /* $NetBSD: rf_engine.c,v 1.52 2016/12/11 05:27:00 nat Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. 
* 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include <sys/cdefs.h> 58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.52 2016/12/11 05:27:00 nat Exp $"); 59 60 #include <sys/errno.h> 61 62 #include "rf_threadstuff.h" 63 #include "rf_dag.h" 64 #include "rf_engine.h" 65 #include "rf_etimer.h" 66 #include "rf_general.h" 67 #include "rf_dagutils.h" 68 #include "rf_shutdown.h" 69 #include "rf_raid.h" 70 #include "rf_kintf.h" 71 #include "rf_paritymap.h" 72 73 static void rf_ShutdownEngine(void *); 74 static void DAGExecutionThread(RF_ThreadArg_t arg); 75 static void rf_RaidIOThread(RF_ThreadArg_t arg); 76 77 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 78 79 #define DO_LOCK(_r_) \ 80 rf_lock_mutex2((_r_)->node_queue_mutex) 81 82 #define DO_UNLOCK(_r_) \ 83 rf_unlock_mutex2((_r_)->node_queue_mutex) 84 85 #define DO_WAIT(_r_) \ 86 rf_wait_cond2((_r_)->node_queue_cv, (_r_)->node_queue_mutex) 87 88 #define DO_SIGNAL(_r_) \ 89 rf_broadcast_cond2((_r_)->node_queue_cv) /* XXX rf_signal_cond2? 
 */

/*
 * Shutdown hook registered by rf_ConfigureEngine().
 *
 * Stops the two per-array kernel threads and then tears down the
 * synchronization primitives they used.  Each thread is stopped with the
 * same handshake: set its shutdown flag, wake it, then wait until the
 * thread clears the flag again to acknowledge that it has exited its
 * main loop.
 */
static void
rf_ShutdownEngine(void *arg)
{
	RF_Raid_t *raidPtr;

	raidPtr = (RF_Raid_t *) arg;

	/* Tell the rf_RaidIOThread to shutdown */
	rf_lock_mutex2(raidPtr->iodone_lock);

	raidPtr->shutdown_raidio = 1;
	rf_signal_cond2(raidPtr->iodone_cv);

	/* ...and wait for it to tell us it has finished */
	while (raidPtr->shutdown_raidio)
		rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock);

	rf_unlock_mutex2(raidPtr->iodone_lock);

	/* Now shut down the DAG execution engine. */
	DO_LOCK(raidPtr);
	raidPtr->shutdown_engine = 1;
	DO_SIGNAL(raidPtr);

	/* ...and wait for it to tell us it has finished */
	while (raidPtr->shutdown_engine)
		DO_WAIT(raidPtr);

	DO_UNLOCK(raidPtr);

	/* Both threads are gone; safe to destroy their sync primitives. */
	rf_destroy_mutex2(raidPtr->node_queue_mutex);
	rf_destroy_cond2(raidPtr->node_queue_cv);

	rf_destroy_mutex2(raidPtr->iodone_lock);
	rf_destroy_cond2(raidPtr->iodone_cv);
}

/*
 * Configure the DAG execution engine for one RAID set: initialize the
 * iodone queue and node queue plus their locks/condvars, then create the
 * DAG-execution thread and the raidio helper thread, and register
 * rf_ShutdownEngine() on the shutdown list.
 *
 * Returns 0 on success, ENOMEM if either thread cannot be created.
 * Note: cfgPtr is currently unused here.
 */
int
rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
		   RF_Config_t *cfgPtr)
{

	/*
	 * Initialise iodone for the IO thread.
	 */
	TAILQ_INIT(&(raidPtr->iodone));
	rf_init_mutex2(raidPtr->iodone_lock, IPL_VM);
	rf_init_cond2(raidPtr->iodone_cv, "raidiow");

	rf_init_mutex2(raidPtr->node_queue_mutex, IPL_VM);
	rf_init_cond2(raidPtr->node_queue_cv, "rfnodeq");
	raidPtr->node_queue = NULL;
	raidPtr->dags_in_flight = 0;

	/* we create the execution thread only once per system boot. no need
	 * to check return code b/c the kernel panics if it can't create the
	 * thread. */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
	}
#endif
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
				    DAGExecutionThread, raidPtr,
				    "raid%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create engine thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
				    rf_RaidIOThread, raidPtr,
				    "raidio%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create raidio thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Created engine thread\n", raidPtr->raidid);
	}
#endif

	/* engine thread is now running and waiting for work */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
	}
#endif
	rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);

	return (0);
}

#if 0
/*
 * Dead code (compiled out): recursively decide whether forward execution
 * is complete for a node and all of its succedent branches.
 */
static int
BranchDone(RF_DagNode_t *node)
{
	int i;

	/* return true if forward execution is completed for a node and its
	 * succedents */
	switch (node->status) {
	case rf_wait:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_fired:
		/* node is currently executing, so we're not done */
		return (RF_FALSE);
	case rf_good:
		/* for each succedent recursively check branch */
		for (i = 0; i < node->numSuccedents; i++)
			if (!BranchDone(node->succedents[i]))
				return RF_FALSE;
		return RF_TRUE;	/* node and all succedent branches aren't in
				 * fired state */
	case rf_bad:
		/* succedents can't fire */
		return (RF_TRUE);
	case rf_recover:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_undone:
	case rf_panic:
		/* XXX need to fix this case */
		/* for now, assume that we're done */
		return (RF_TRUE);
	default:
		/* illegal node status */
		RF_PANIC();
		break;
	}
}
#endif

/*
 * Decide whether a node is ready to fire, based on the direction the DAG
 * is executing: forward (all antecedents done, node still waiting) or
 * backward (all succedents done, node completed successfully).
 * Returns RF_TRUE/RF_FALSE.
 */
static int
NodeReady(RF_DagNode_t *node)
{
	int ready;

	ready = RF_FALSE;

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		if ((node->status == rf_wait) &&
		    (node->numAntecedents == node->numAntDone))
			ready = RF_TRUE;
		break;
	case rf_rollBackward:
		RF_ASSERT(node->numSuccDone <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccedents);
		RF_ASSERT(node->numSuccFired <= node->numSuccDone);
		if ((node->status == rf_good) &&
		    (node->numSuccDone == node->numSuccedents))
			ready = RF_TRUE;
		break;
	default:
		printf("Execution engine found illegal DAG status in NodeReady\n");
		RF_PANIC();
		break;
	}

	return (ready);
}



/* user context and dag-exec-thread context: Fire a node. The node's
 * status field determines which function, do or undo, to be fired.
 * This routine assumes that the node's status field has already been
 * set to "fired" or "recover" to indicate the direction of execution.
264 */ 265 static void 266 FireNode(RF_DagNode_t *node) 267 { 268 switch (node->status) { 269 case rf_fired: 270 /* fire the do function of a node */ 271 #if RF_DEBUG_ENGINE 272 if (rf_engineDebug) { 273 printf("raid%d: Firing node 0x%lx (%s)\n", 274 node->dagHdr->raidPtr->raidid, 275 (unsigned long) node, node->name); 276 } 277 #endif 278 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 279 #if defined(__NetBSD__) && defined(_KERNEL) 280 /* thread_block(); */ 281 /* printf("Need to block the thread here...\n"); */ 282 /* XXX thread_block is actually mentioned in 283 * /usr/include/vm/vm_extern.h */ 284 #else 285 thread_block(); 286 #endif 287 } 288 (*(node->doFunc)) (node); 289 break; 290 case rf_recover: 291 /* fire the undo function of a node */ 292 #if RF_DEBUG_ENGINE 293 if (rf_engineDebug) { 294 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 295 node->dagHdr->raidPtr->raidid, 296 (unsigned long) node, node->name); 297 } 298 #endif 299 if (node->flags & RF_DAGNODE_FLAG_YIELD) 300 #if defined(__NetBSD__) && defined(_KERNEL) 301 /* thread_block(); */ 302 /* printf("Need to block the thread here...\n"); */ 303 /* XXX thread_block is actually mentioned in 304 * /usr/include/vm/vm_extern.h */ 305 #else 306 thread_block(); 307 #endif 308 (*(node->undoFunc)) (node); 309 break; 310 default: 311 RF_PANIC(); 312 break; 313 } 314 } 315 316 317 318 /* user context: 319 * Attempt to fire each node in a linear array. 320 * The entire list is fired atomically. 
321 */ 322 static void 323 FireNodeArray(int numNodes, RF_DagNode_t **nodeList) 324 { 325 RF_DagStatus_t dstat; 326 RF_DagNode_t *node; 327 int i, j; 328 329 /* first, mark all nodes which are ready to be fired */ 330 for (i = 0; i < numNodes; i++) { 331 node = nodeList[i]; 332 dstat = node->dagHdr->status; 333 RF_ASSERT((node->status == rf_wait) || 334 (node->status == rf_good)); 335 if (NodeReady(node)) { 336 if ((dstat == rf_enable) || 337 (dstat == rf_rollForward)) { 338 RF_ASSERT(node->status == rf_wait); 339 if (node->commitNode) 340 node->dagHdr->numCommits++; 341 node->status = rf_fired; 342 for (j = 0; j < node->numAntecedents; j++) 343 node->antecedents[j]->numSuccFired++; 344 } else { 345 RF_ASSERT(dstat == rf_rollBackward); 346 RF_ASSERT(node->status == rf_good); 347 /* only one commit node per graph */ 348 RF_ASSERT(node->commitNode == RF_FALSE); 349 node->status = rf_recover; 350 } 351 } 352 } 353 /* now, fire the nodes */ 354 for (i = 0; i < numNodes; i++) { 355 if ((nodeList[i]->status == rf_fired) || 356 (nodeList[i]->status == rf_recover)) 357 FireNode(nodeList[i]); 358 } 359 } 360 361 362 /* user context: 363 * Attempt to fire each node in a linked list. 364 * The entire list is fired atomically. 
365 */ 366 static void 367 FireNodeList(RF_DagNode_t *nodeList) 368 { 369 RF_DagNode_t *node, *next; 370 RF_DagStatus_t dstat; 371 int j; 372 373 if (nodeList) { 374 /* first, mark all nodes which are ready to be fired */ 375 for (node = nodeList; node; node = next) { 376 next = node->next; 377 dstat = node->dagHdr->status; 378 RF_ASSERT((node->status == rf_wait) || 379 (node->status == rf_good)); 380 if (NodeReady(node)) { 381 if ((dstat == rf_enable) || 382 (dstat == rf_rollForward)) { 383 RF_ASSERT(node->status == rf_wait); 384 if (node->commitNode) 385 node->dagHdr->numCommits++; 386 node->status = rf_fired; 387 for (j = 0; j < node->numAntecedents; j++) 388 node->antecedents[j]->numSuccFired++; 389 } else { 390 RF_ASSERT(dstat == rf_rollBackward); 391 RF_ASSERT(node->status == rf_good); 392 /* only one commit node per graph */ 393 RF_ASSERT(node->commitNode == RF_FALSE); 394 node->status = rf_recover; 395 } 396 } 397 } 398 /* now, fire the nodes */ 399 for (node = nodeList; node; node = next) { 400 next = node->next; 401 if ((node->status == rf_fired) || 402 (node->status == rf_recover)) 403 FireNode(node); 404 } 405 } 406 } 407 /* interrupt context: 408 * for each succedent 409 * propagate required results from node to succedent 410 * increment succedent's numAntDone 411 * place newly-enable nodes on node queue for firing 412 * 413 * To save context switches, we don't place NIL nodes on the node queue, 414 * but rather just process them as if they had fired. Note that NIL nodes 415 * that are the direct successors of the header will actually get fired by 416 * DispatchDAG, which is fine because no context switches are involved. 417 * 418 * Important: when running at user level, this can be called by any 419 * disk thread, and so the increment and check of the antecedent count 420 * must be locked. I used the node queue mutex and locked down the 421 * entire function, but this is certainly overkill. 
422 */ 423 static void 424 PropagateResults(RF_DagNode_t *node, int context) 425 { 426 RF_DagNode_t *s, *a; 427 RF_Raid_t *raidPtr; 428 int i; 429 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 430 * finished */ 431 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 432 * antecedents */ 433 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 434 RF_DagNode_t *q = NULL, *qh = NULL, *next; 435 int j, skipNode; 436 437 raidPtr = node->dagHdr->raidPtr; 438 439 DO_LOCK(raidPtr); 440 441 /* debug - validate fire counts */ 442 for (i = 0; i < node->numAntecedents; i++) { 443 a = *(node->antecedents + i); 444 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 445 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 446 a->numSuccDone++; 447 } 448 449 switch (node->dagHdr->status) { 450 case rf_enable: 451 case rf_rollForward: 452 for (i = 0; i < node->numSuccedents; i++) { 453 s = *(node->succedents + i); 454 RF_ASSERT(s->status == rf_wait); 455 (s->numAntDone)++; 456 if (s->numAntDone == s->numAntecedents) { 457 /* look for NIL nodes */ 458 if (s->doFunc == rf_NullNodeFunc) { 459 /* don't fire NIL nodes, just process 460 * them */ 461 s->next = finishlist; 462 finishlist = s; 463 } else { 464 /* look to see if the node is to be 465 * skipped */ 466 skipNode = RF_FALSE; 467 for (j = 0; j < s->numAntecedents; j++) 468 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 469 skipNode = RF_TRUE; 470 if (skipNode) { 471 /* this node has one or more 472 * failed true data 473 * dependencies, so skip it */ 474 s->next = skiplist; 475 skiplist = s; 476 } else 477 /* add s to list of nodes (q) 478 * to execute */ 479 if (context != RF_INTR_CONTEXT) { 480 /* we only have to 481 * enqueue if we're at 482 * intr context */ 483 /* put node on 484 a list to 485 be fired 486 after we 487 unlock */ 488 s->next = firelist; 489 firelist = s; 490 } else { 491 /* enqueue the 492 node for 493 the dag 494 exec thread 495 to fire */ 
496 RF_ASSERT(NodeReady(s)); 497 if (q) { 498 q->next = s; 499 q = s; 500 } else { 501 qh = q = s; 502 qh->next = NULL; 503 } 504 } 505 } 506 } 507 } 508 509 if (q) { 510 /* xfer our local list of nodes to the node queue */ 511 q->next = raidPtr->node_queue; 512 raidPtr->node_queue = qh; 513 DO_SIGNAL(raidPtr); 514 } 515 DO_UNLOCK(raidPtr); 516 517 for (; skiplist; skiplist = next) { 518 next = skiplist->next; 519 skiplist->status = rf_skipped; 520 for (i = 0; i < skiplist->numAntecedents; i++) { 521 skiplist->antecedents[i]->numSuccFired++; 522 } 523 if (skiplist->commitNode) { 524 skiplist->dagHdr->numCommits++; 525 } 526 rf_FinishNode(skiplist, context); 527 } 528 for (; finishlist; finishlist = next) { 529 /* NIL nodes: no need to fire them */ 530 next = finishlist->next; 531 finishlist->status = rf_good; 532 for (i = 0; i < finishlist->numAntecedents; i++) { 533 finishlist->antecedents[i]->numSuccFired++; 534 } 535 if (finishlist->commitNode) 536 finishlist->dagHdr->numCommits++; 537 /* 538 * Okay, here we're calling rf_FinishNode() on 539 * nodes that have the null function as their 540 * work proc. Such a node could be the 541 * terminal node in a DAG. If so, it will 542 * cause the DAG to complete, which will in 543 * turn free memory used by the DAG, which 544 * includes the node in question. Thus, we 545 * must avoid referencing the node at all 546 * after calling rf_FinishNode() on it. 
*/ 547 rf_FinishNode(finishlist, context); /* recursive call */ 548 } 549 /* fire all nodes in firelist */ 550 FireNodeList(firelist); 551 break; 552 553 case rf_rollBackward: 554 for (i = 0; i < node->numAntecedents; i++) { 555 a = *(node->antecedents + i); 556 RF_ASSERT(a->status == rf_good); 557 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 558 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 559 560 if (a->numSuccDone == a->numSuccFired) { 561 if (a->undoFunc == rf_NullNodeFunc) { 562 /* don't fire NIL nodes, just process 563 * them */ 564 a->next = finishlist; 565 finishlist = a; 566 } else { 567 if (context != RF_INTR_CONTEXT) { 568 /* we only have to enqueue if 569 * we're at intr context */ 570 /* put node on a list to be 571 fired after we unlock */ 572 a->next = firelist; 573 574 firelist = a; 575 } else { 576 /* enqueue the node for the 577 dag exec thread to fire */ 578 RF_ASSERT(NodeReady(a)); 579 if (q) { 580 q->next = a; 581 q = a; 582 } else { 583 qh = q = a; 584 qh->next = NULL; 585 } 586 } 587 } 588 } 589 } 590 if (q) { 591 /* xfer our local list of nodes to the node queue */ 592 q->next = raidPtr->node_queue; 593 raidPtr->node_queue = qh; 594 DO_SIGNAL(raidPtr); 595 } 596 DO_UNLOCK(raidPtr); 597 for (; finishlist; finishlist = next) { 598 /* NIL nodes: no need to fire them */ 599 next = finishlist->next; 600 finishlist->status = rf_good; 601 /* 602 * Okay, here we're calling rf_FinishNode() on 603 * nodes that have the null function as their 604 * work proc. Such a node could be the first 605 * node in a DAG. If so, it will cause the DAG 606 * to complete, which will in turn free memory 607 * used by the DAG, which includes the node in 608 * question. Thus, we must avoid referencing 609 * the node at all after calling 610 * rf_FinishNode() on it. 
*/ 611 rf_FinishNode(finishlist, context); /* recursive call */ 612 } 613 /* fire all nodes in firelist */ 614 FireNodeList(firelist); 615 616 break; 617 default: 618 printf("Engine found illegal DAG status in PropagateResults()\n"); 619 RF_PANIC(); 620 break; 621 } 622 } 623 624 625 626 /* 627 * Process a fired node which has completed 628 */ 629 static void 630 ProcessNode(RF_DagNode_t *node, int context) 631 { 632 #if RF_DEBUG_ENGINE 633 RF_Raid_t *raidPtr; 634 635 raidPtr = node->dagHdr->raidPtr; 636 #endif 637 638 switch (node->status) { 639 case rf_good: 640 /* normal case, don't need to do anything */ 641 break; 642 case rf_bad: 643 if ((node->dagHdr->numCommits > 0) || 644 (node->dagHdr->numCommitNodes == 0)) { 645 /* crossed commit barrier */ 646 node->dagHdr->status = rf_rollForward; 647 #if RF_DEBUG_ENGINE 648 if (rf_engineDebug) { 649 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 650 } 651 #endif 652 } else { 653 /* never reached commit barrier */ 654 node->dagHdr->status = rf_rollBackward; 655 #if RF_DEBUG_ENGINE 656 if (rf_engineDebug) { 657 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 658 } 659 #endif 660 } 661 break; 662 case rf_undone: 663 /* normal rollBackward case, don't need to do anything */ 664 break; 665 case rf_panic: 666 /* an undo node failed!!! */ 667 printf("UNDO of a node failed!!!\n"); 668 break; 669 default: 670 printf("node finished execution with an illegal status!!!\n"); 671 RF_PANIC(); 672 break; 673 } 674 675 /* enqueue node's succedents (antecedents if rollBackward) for 676 * execution */ 677 PropagateResults(node, context); 678 } 679 680 681 682 /* user context or dag-exec-thread context: 683 * This is the first step in post-processing a newly-completed node. 684 * This routine is called by each node execution function to mark the node 685 * as complete and fire off any successors that have been enabled. 
686 */ 687 int 688 rf_FinishNode(RF_DagNode_t *node, int context) 689 { 690 int retcode = RF_FALSE; 691 node->dagHdr->numNodesCompleted++; 692 ProcessNode(node, context); 693 694 return (retcode); 695 } 696 697 698 /* user context: submit dag for execution, return non-zero if we have 699 * to wait for completion. if and only if we return non-zero, we'll 700 * cause cbFunc to get invoked with cbArg when the DAG has completed. 701 * 702 * for now we always return 1. If the DAG does not cause any I/O, 703 * then the callback may get invoked before DispatchDAG returns. 704 * There's code in state 5 of ContinueRaidAccess to handle this. 705 * 706 * All we do here is fire the direct successors of the header node. 707 * The DAG execution thread does the rest of the dag processing. */ 708 int 709 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), 710 void *cbArg) 711 { 712 RF_Raid_t *raidPtr; 713 714 raidPtr = dag->raidPtr; 715 #if RF_ACC_TRACE > 0 716 if (dag->tracerec) { 717 RF_ETIMER_START(dag->tracerec->timer); 718 } 719 #endif 720 #if DEBUG 721 #if RF_DEBUG_VALIDATE_DAG 722 if (rf_engineDebug || rf_validateDAGDebug) { 723 if (rf_ValidateDAG(dag)) 724 RF_PANIC(); 725 } 726 #endif 727 #endif 728 #if RF_DEBUG_ENGINE 729 if (rf_engineDebug) { 730 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 731 } 732 #endif 733 raidPtr->dags_in_flight++; /* debug only: blow off proper 734 * locking */ 735 dag->cbFunc = cbFunc; 736 dag->cbArg = cbArg; 737 dag->numNodesCompleted = 0; 738 dag->status = rf_enable; 739 FireNodeArray(dag->numSuccedents, dag->succedents); 740 return (1); 741 } 742 /* dedicated kernel thread: the thread that handles all DAG node 743 * firing. To minimize locking and unlocking, we grab a copy of the 744 * entire node queue and then set the node queue to NULL before doing 745 * any firing of nodes. This way we only have to release the lock 746 * once. 
Of course, it's probably rare that there's more than one 747 * node in the queue at any one time, but it sometimes happens. 748 */ 749 750 static void 751 DAGExecutionThread(RF_ThreadArg_t arg) 752 { 753 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 754 RF_Raid_t *raidPtr; 755 756 raidPtr = (RF_Raid_t *) arg; 757 758 #if RF_DEBUG_ENGINE 759 if (rf_engineDebug) { 760 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 761 } 762 #endif 763 764 DO_LOCK(raidPtr); 765 while (!raidPtr->shutdown_engine) { 766 767 while (raidPtr->node_queue != NULL) { 768 local_nq = raidPtr->node_queue; 769 fire_nq = NULL; 770 term_nq = NULL; 771 raidPtr->node_queue = NULL; 772 DO_UNLOCK(raidPtr); 773 774 /* first, strip out the terminal nodes */ 775 while (local_nq) { 776 nd = local_nq; 777 local_nq = local_nq->next; 778 switch (nd->dagHdr->status) { 779 case rf_enable: 780 case rf_rollForward: 781 if (nd->numSuccedents == 0) { 782 /* end of the dag, add to 783 * callback list */ 784 nd->next = term_nq; 785 term_nq = nd; 786 } else { 787 /* not the end, add to the 788 * fire queue */ 789 nd->next = fire_nq; 790 fire_nq = nd; 791 } 792 break; 793 case rf_rollBackward: 794 if (nd->numAntecedents == 0) { 795 /* end of the dag, add to the 796 * callback list */ 797 nd->next = term_nq; 798 term_nq = nd; 799 } else { 800 /* not the end, add to the 801 * fire queue */ 802 nd->next = fire_nq; 803 fire_nq = nd; 804 } 805 break; 806 default: 807 RF_PANIC(); 808 break; 809 } 810 } 811 812 /* execute callback of dags which have reached the 813 * terminal node */ 814 while (term_nq) { 815 nd = term_nq; 816 term_nq = term_nq->next; 817 nd->next = NULL; 818 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 819 raidPtr->dags_in_flight--; /* debug only */ 820 } 821 822 /* fire remaining nodes */ 823 FireNodeList(fire_nq); 824 825 DO_LOCK(raidPtr); 826 } 827 while (!raidPtr->shutdown_engine && 828 raidPtr->node_queue == NULL) { 829 DO_WAIT(raidPtr); 830 } 831 } 832 833 /* Let rf_ShutdownEngine know 
that we're done... */ 834 raidPtr->shutdown_engine = 0; 835 DO_SIGNAL(raidPtr); 836 837 DO_UNLOCK(raidPtr); 838 839 kthread_exit(0); 840 } 841 842 /* 843 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy() 844 * puts the I/O on a buffer queue, and then signals raidPtr->iodone. If 845 * necessary, this function calls raidstart() to initiate the I/O. 846 * When I/O to a component completes, KernelWakeupFunc() puts the 847 * completed request onto raidPtr->iodone TAILQ. This function looks 848 * after requests on that queue by calling rf_DiskIOComplete() for the 849 * request, and by calling any required CompleteFunc for the request. 850 */ 851 852 static void 853 rf_RaidIOThread(RF_ThreadArg_t arg) 854 { 855 RF_Raid_t *raidPtr; 856 RF_DiskQueueData_t *req; 857 858 raidPtr = (RF_Raid_t *) arg; 859 860 rf_lock_mutex2(raidPtr->iodone_lock); 861 862 while (!raidPtr->shutdown_raidio) { 863 /* if there is nothing to do, then snooze. */ 864 if (TAILQ_EMPTY(&(raidPtr->iodone)) && 865 rf_buf_queue_check(raidPtr)) { 866 rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock); 867 } 868 869 /* Check for deferred parity-map-related work. */ 870 if (raidPtr->parity_map != NULL) { 871 rf_unlock_mutex2(raidPtr->iodone_lock); 872 rf_paritymap_checkwork(raidPtr->parity_map); 873 rf_lock_mutex2(raidPtr->iodone_lock); 874 } 875 876 /* See what I/Os, if any, have arrived */ 877 while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) { 878 TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries); 879 rf_unlock_mutex2(raidPtr->iodone_lock); 880 rf_DiskIOComplete(req->queue, req, req->error); 881 (req->CompleteFunc) (req->argument, req->error); 882 rf_lock_mutex2(raidPtr->iodone_lock); 883 } 884 885 /* process any pending outgoing IO */ 886 rf_unlock_mutex2(raidPtr->iodone_lock); 887 raidstart(raidPtr); 888 rf_lock_mutex2(raidPtr->iodone_lock); 889 890 } 891 892 /* Let rf_ShutdownEngine know that we're done... 
*/ 893 raidPtr->shutdown_raidio = 0; 894 rf_signal_cond2(raidPtr->iodone_cv); 895 896 rf_unlock_mutex2(raidPtr->iodone_lock); 897 898 kthread_exit(0); 899 } 900