1 /* $NetBSD: rf_engine.c,v 1.21 2002/10/02 21:48:00 oster Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. 
* 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include <sys/cdefs.h> 58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.21 2002/10/02 21:48:00 oster Exp $"); 59 60 #include "rf_threadstuff.h" 61 62 #include <sys/errno.h> 63 64 #include "rf_dag.h" 65 #include "rf_engine.h" 66 #include "rf_etimer.h" 67 #include "rf_general.h" 68 #include "rf_dagutils.h" 69 #include "rf_shutdown.h" 70 #include "rf_raid.h" 71 72 static void DAGExecutionThread(RF_ThreadArg_t arg); 73 74 #define DO_INIT(_l_,_r_) { \ 75 int _rc; \ 76 _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ 77 if (_rc) { \ 78 return(_rc); \ 79 } \ 80 _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ 81 if (_rc) { \ 82 return(_rc); \ 83 } \ 84 } 85 86 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 87 88 #define DO_LOCK(_r_) \ 89 do { \ 90 ks = splbio(); \ 91 RF_LOCK_MUTEX((_r_)->node_queue_mutex); \ 92 } while (0) 93 94 #define DO_UNLOCK(_r_) \ 95 do { \ 96 RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \ 97 splx(ks); \ 98 } while (0) 99 100 #define DO_WAIT(_r_) \ 101 RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex) 102 103 #define DO_SIGNAL(_r_) \ 104 RF_BROADCAST_COND((_r_)->node_queue) /* XXX RF_SIGNAL_COND? 
*/ 105 106 static void rf_ShutdownEngine(void *); 107 108 static void 109 rf_ShutdownEngine(arg) 110 void *arg; 111 { 112 RF_Raid_t *raidPtr; 113 114 raidPtr = (RF_Raid_t *) arg; 115 raidPtr->shutdown_engine = 1; 116 DO_SIGNAL(raidPtr); 117 } 118 119 int 120 rf_ConfigureEngine( 121 RF_ShutdownList_t ** listp, 122 RF_Raid_t * raidPtr, 123 RF_Config_t * cfgPtr) 124 { 125 int rc; 126 127 DO_INIT(listp, raidPtr); 128 129 raidPtr->node_queue = NULL; 130 raidPtr->dags_in_flight = 0; 131 132 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); 133 if (rc) 134 return (rc); 135 136 /* we create the execution thread only once per system boot. no need 137 * to check return code b/c the kernel panics if it can't create the 138 * thread. */ 139 if (rf_engineDebug) { 140 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 141 } 142 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread, DAGExecutionThread, raidPtr,"raid%d",raidPtr->raidid)) { 143 RF_ERRORMSG("RAIDFRAME: Unable to create engine thread\n"); 144 return (ENOMEM); 145 } 146 if (rf_engineDebug) { 147 printf("raid%d: Created engine thread\n", raidPtr->raidid); 148 } 149 RF_THREADGROUP_STARTED(&raidPtr->engine_tg); 150 /* XXX something is missing here... 
*/ 151 #ifdef debug 152 printf("Skipping the WAIT_START!!\n"); 153 #endif 154 #if 0 155 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); 156 #endif 157 /* engine thread is now running and waiting for work */ 158 if (rf_engineDebug) { 159 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 160 } 161 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 162 if (rc) { 163 rf_print_unable_to_add_shutdown(__FILE__, __LINE__, rc); 164 rf_ShutdownEngine(NULL); 165 } 166 return (rc); 167 } 168 169 static int 170 BranchDone(RF_DagNode_t * node) 171 { 172 int i; 173 174 /* return true if forward execution is completed for a node and it's 175 * succedents */ 176 switch (node->status) { 177 case rf_wait: 178 /* should never be called in this state */ 179 RF_PANIC(); 180 break; 181 case rf_fired: 182 /* node is currently executing, so we're not done */ 183 return (RF_FALSE); 184 case rf_good: 185 for (i = 0; i < node->numSuccedents; i++) /* for each succedent */ 186 if (!BranchDone(node->succedents[i])) /* recursively check 187 * branch */ 188 return RF_FALSE; 189 return RF_TRUE; /* node and all succedent branches aren't in 190 * fired state */ 191 case rf_bad: 192 /* succedents can't fire */ 193 return (RF_TRUE); 194 case rf_recover: 195 /* should never be called in this state */ 196 RF_PANIC(); 197 break; 198 case rf_undone: 199 case rf_panic: 200 /* XXX need to fix this case */ 201 /* for now, assume that we're done */ 202 return (RF_TRUE); 203 default: 204 /* illegal node status */ 205 RF_PANIC(); 206 break; 207 } 208 } 209 210 static int 211 NodeReady(RF_DagNode_t * node) 212 { 213 int ready; 214 215 switch (node->dagHdr->status) { 216 case rf_enable: 217 case rf_rollForward: 218 if ((node->status == rf_wait) && (node->numAntecedents == node->numAntDone)) 219 ready = RF_TRUE; 220 else 221 ready = RF_FALSE; 222 break; 223 case rf_rollBackward: 224 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 225 RF_ASSERT(node->numSuccFired <= 
node->numSuccedents); 226 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 227 if ((node->status == rf_good) && (node->numSuccDone == node->numSuccedents)) 228 ready = RF_TRUE; 229 else 230 ready = RF_FALSE; 231 break; 232 default: 233 printf("Execution engine found illegal DAG status in NodeReady\n"); 234 RF_PANIC(); 235 break; 236 } 237 238 return (ready); 239 } 240 241 242 243 /* user context and dag-exec-thread context: 244 * Fire a node. The node's status field determines which function, do or undo, 245 * to be fired. 246 * This routine assumes that the node's status field has alread been set to 247 * "fired" or "recover" to indicate the direction of execution. 248 */ 249 static void 250 FireNode(RF_DagNode_t * node) 251 { 252 switch (node->status) { 253 case rf_fired: 254 /* fire the do function of a node */ 255 if (rf_engineDebug) { 256 printf("raid%d: Firing node 0x%lx (%s)\n", 257 node->dagHdr->raidPtr->raidid, 258 (unsigned long) node, node->name); 259 } 260 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 261 #if defined(__NetBSD__) && defined(_KERNEL) 262 /* thread_block(); */ 263 /* printf("Need to block the thread here...\n"); */ 264 /* XXX thread_block is actually mentioned in 265 * /usr/include/vm/vm_extern.h */ 266 #else 267 thread_block(); 268 #endif 269 } 270 (*(node->doFunc)) (node); 271 break; 272 case rf_recover: 273 /* fire the undo function of a node */ 274 if (rf_engineDebug) { 275 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 276 node->dagHdr->raidPtr->raidid, 277 (unsigned long) node, node->name); 278 } 279 if (node->flags & RF_DAGNODE_FLAG_YIELD) 280 #if defined(__NetBSD__) && defined(_KERNEL) 281 /* thread_block(); */ 282 /* printf("Need to block the thread here...\n"); */ 283 /* XXX thread_block is actually mentioned in 284 * /usr/include/vm/vm_extern.h */ 285 #else 286 thread_block(); 287 #endif 288 (*(node->undoFunc)) (node); 289 break; 290 default: 291 RF_PANIC(); 292 break; 293 } 294 } 295 296 297 298 /* user context: 299 * 
Attempt to fire each node in a linear array.
 * The entire list is fired atomically.
 */
static void
FireNodeArray(
    int numNodes,
    RF_DagNode_t ** nodeList)
{
	RF_DagNode_t *nd;
	RF_DagStatus_t hdrStatus;
	int     idx, ant;

	/*
	 * Pass 1: stamp every ready node with its direction of execution
	 * (rf_fired for do, rf_recover for undo).  Nothing runs yet.
	 */
	for (idx = 0; idx < numNodes; idx++) {
		nd = nodeList[idx];
		hdrStatus = nd->dagHdr->status;
		RF_ASSERT((nd->status == rf_wait) || (nd->status == rf_good));

		if (!NodeReady(nd))
			continue;

		if ((hdrStatus != rf_enable) && (hdrStatus != rf_rollForward)) {
			/* rolling backward: the node will be undone */
			RF_ASSERT(hdrStatus == rf_rollBackward);
			RF_ASSERT(nd->status == rf_good);
			/* only one commit node per graph */
			RF_ASSERT(nd->commitNode == RF_FALSE);
			nd->status = rf_recover;
			continue;
		}

		/* forward execution */
		RF_ASSERT(nd->status == rf_wait);
		if (nd->commitNode)
			nd->dagHdr->numCommits++;
		nd->status = rf_fired;
		/* tell each antecedent that one more succedent has fired */
		for (ant = 0; ant < nd->numAntecedents; ant++)
			nd->antecedents[ant]->numSuccFired++;
	}

	/* Pass 2: actually run whatever was stamped above */
	for (idx = 0; idx < numNodes; idx++) {
		nd = nodeList[idx];
		if ((nd->status == rf_fired) || (nd->status == rf_recover))
			FireNode(nd);
	}
}


/* user context:
 * Attempt to fire each node in a linked list.
 * The entire list is fired atomically.
344 */ 345 static void 346 FireNodeList(RF_DagNode_t * nodeList) 347 { 348 RF_DagNode_t *node, *next; 349 RF_DagStatus_t dstat; 350 int j; 351 352 if (nodeList) { 353 /* first, mark all nodes which are ready to be fired */ 354 for (node = nodeList; node; node = next) { 355 next = node->next; 356 dstat = node->dagHdr->status; 357 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 358 if (NodeReady(node)) { 359 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 360 RF_ASSERT(node->status == rf_wait); 361 if (node->commitNode) 362 node->dagHdr->numCommits++; 363 node->status = rf_fired; 364 for (j = 0; j < node->numAntecedents; j++) 365 node->antecedents[j]->numSuccFired++; 366 } else { 367 RF_ASSERT(dstat == rf_rollBackward); 368 RF_ASSERT(node->status == rf_good); 369 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 370 * per graph */ 371 node->status = rf_recover; 372 } 373 } 374 } 375 /* now, fire the nodes */ 376 for (node = nodeList; node; node = next) { 377 next = node->next; 378 if ((node->status == rf_fired) || (node->status == rf_recover)) 379 FireNode(node); 380 } 381 } 382 } 383 /* interrupt context: 384 * for each succedent 385 * propagate required results from node to succedent 386 * increment succedent's numAntDone 387 * place newly-enable nodes on node queue for firing 388 * 389 * To save context switches, we don't place NIL nodes on the node queue, 390 * but rather just process them as if they had fired. Note that NIL nodes 391 * that are the direct successors of the header will actually get fired by 392 * DispatchDAG, which is fine because no context switches are involved. 393 * 394 * Important: when running at user level, this can be called by any 395 * disk thread, and so the increment and check of the antecedent count 396 * must be locked. I used the node queue mutex and locked down the 397 * entire function, but this is certainly overkill. 
398 */ 399 static void 400 PropagateResults( 401 RF_DagNode_t * node, 402 int context) 403 { 404 RF_DagNode_t *s, *a; 405 RF_Raid_t *raidPtr; 406 int i, ks; 407 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 408 * finished */ 409 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 410 * antecedents */ 411 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 412 RF_DagNode_t *q = NULL, *qh = NULL, *next; 413 int j, skipNode; 414 415 raidPtr = node->dagHdr->raidPtr; 416 417 DO_LOCK(raidPtr); 418 419 /* debug - validate fire counts */ 420 for (i = 0; i < node->numAntecedents; i++) { 421 a = *(node->antecedents + i); 422 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 423 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 424 a->numSuccDone++; 425 } 426 427 switch (node->dagHdr->status) { 428 case rf_enable: 429 case rf_rollForward: 430 for (i = 0; i < node->numSuccedents; i++) { 431 s = *(node->succedents + i); 432 RF_ASSERT(s->status == rf_wait); 433 (s->numAntDone)++; 434 if (s->numAntDone == s->numAntecedents) { 435 /* look for NIL nodes */ 436 if (s->doFunc == rf_NullNodeFunc) { 437 /* don't fire NIL nodes, just process 438 * them */ 439 s->next = finishlist; 440 finishlist = s; 441 } else { 442 /* look to see if the node is to be 443 * skipped */ 444 skipNode = RF_FALSE; 445 for (j = 0; j < s->numAntecedents; j++) 446 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 447 skipNode = RF_TRUE; 448 if (skipNode) { 449 /* this node has one or more 450 * failed true data 451 * dependencies, so skip it */ 452 s->next = skiplist; 453 skiplist = s; 454 } else 455 /* add s to list of nodes (q) 456 * to execute */ 457 if (context != RF_INTR_CONTEXT) { 458 /* we only have to 459 * enqueue if we're at 460 * intr context */ 461 s->next = firelist; /* put node on a list to 462 * be fired after we 463 * unlock */ 464 firelist = s; 465 } else { /* enqueue the node for 466 * the dag exec thread 467 * to fire */ 468 
RF_ASSERT(NodeReady(s)); 469 if (q) { 470 q->next = s; 471 q = s; 472 } else { 473 qh = q = s; 474 qh->next = NULL; 475 } 476 } 477 } 478 } 479 } 480 481 if (q) { 482 /* xfer our local list of nodes to the node queue */ 483 q->next = raidPtr->node_queue; 484 raidPtr->node_queue = qh; 485 DO_SIGNAL(raidPtr); 486 } 487 DO_UNLOCK(raidPtr); 488 489 for (; skiplist; skiplist = next) { 490 next = skiplist->next; 491 skiplist->status = rf_skipped; 492 for (i = 0; i < skiplist->numAntecedents; i++) { 493 skiplist->antecedents[i]->numSuccFired++; 494 } 495 if (skiplist->commitNode) { 496 skiplist->dagHdr->numCommits++; 497 } 498 rf_FinishNode(skiplist, context); 499 } 500 for (; finishlist; finishlist = next) { 501 /* NIL nodes: no need to fire them */ 502 next = finishlist->next; 503 finishlist->status = rf_good; 504 for (i = 0; i < finishlist->numAntecedents; i++) { 505 finishlist->antecedents[i]->numSuccFired++; 506 } 507 if (finishlist->commitNode) 508 finishlist->dagHdr->numCommits++; 509 /* 510 * Okay, here we're calling rf_FinishNode() on nodes that 511 * have the null function as their work proc. Such a node 512 * could be the terminal node in a DAG. If so, it will 513 * cause the DAG to complete, which will in turn free 514 * memory used by the DAG, which includes the node in 515 * question. Thus, we must avoid referencing the node 516 * at all after calling rf_FinishNode() on it. 
517 */ 518 rf_FinishNode(finishlist, context); /* recursive call */ 519 } 520 /* fire all nodes in firelist */ 521 FireNodeList(firelist); 522 break; 523 524 case rf_rollBackward: 525 for (i = 0; i < node->numAntecedents; i++) { 526 a = *(node->antecedents + i); 527 RF_ASSERT(a->status == rf_good); 528 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 529 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 530 531 if (a->numSuccDone == a->numSuccFired) { 532 if (a->undoFunc == rf_NullNodeFunc) { 533 /* don't fire NIL nodes, just process 534 * them */ 535 a->next = finishlist; 536 finishlist = a; 537 } else { 538 if (context != RF_INTR_CONTEXT) { 539 /* we only have to enqueue if 540 * we're at intr context */ 541 a->next = firelist; /* put node on a list to 542 * be fired after we 543 * unlock */ 544 firelist = a; 545 } else { /* enqueue the node for 546 * the dag exec thread 547 * to fire */ 548 RF_ASSERT(NodeReady(a)); 549 if (q) { 550 q->next = a; 551 q = a; 552 } else { 553 qh = q = a; 554 qh->next = NULL; 555 } 556 } 557 } 558 } 559 } 560 if (q) { 561 /* xfer our local list of nodes to the node queue */ 562 q->next = raidPtr->node_queue; 563 raidPtr->node_queue = qh; 564 DO_SIGNAL(raidPtr); 565 } 566 DO_UNLOCK(raidPtr); 567 for (; finishlist; finishlist = next) { /* NIL nodes: no need to 568 * fire them */ 569 next = finishlist->next; 570 finishlist->status = rf_good; 571 /* 572 * Okay, here we're calling rf_FinishNode() on nodes that 573 * have the null function as their work proc. Such a node 574 * could be the first node in a DAG. If so, it will 575 * cause the DAG to complete, which will in turn free 576 * memory used by the DAG, which includes the node in 577 * question. Thus, we must avoid referencing the node 578 * at all after calling rf_FinishNode() on it. 
579 */ 580 rf_FinishNode(finishlist, context); /* recursive call */ 581 } 582 /* fire all nodes in firelist */ 583 FireNodeList(firelist); 584 585 break; 586 default: 587 printf("Engine found illegal DAG status in PropagateResults()\n"); 588 RF_PANIC(); 589 break; 590 } 591 } 592 593 594 595 /* 596 * Process a fired node which has completed 597 */ 598 static void 599 ProcessNode( 600 RF_DagNode_t * node, 601 int context) 602 { 603 RF_Raid_t *raidPtr; 604 605 raidPtr = node->dagHdr->raidPtr; 606 607 switch (node->status) { 608 case rf_good: 609 /* normal case, don't need to do anything */ 610 break; 611 case rf_bad: 612 if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { 613 node->dagHdr->status = rf_rollForward; /* crossed commit 614 * barrier */ 615 if (rf_engineDebug || 1) { 616 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 617 } 618 } else { 619 node->dagHdr->status = rf_rollBackward; /* never reached commit 620 * barrier */ 621 if (rf_engineDebug || 1) { 622 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 623 } 624 } 625 break; 626 case rf_undone: 627 /* normal rollBackward case, don't need to do anything */ 628 break; 629 case rf_panic: 630 /* an undo node failed!!! */ 631 printf("UNDO of a node failed!!!/n"); 632 break; 633 default: 634 printf("node finished execution with an illegal status!!!\n"); 635 RF_PANIC(); 636 break; 637 } 638 639 /* enqueue node's succedents (antecedents if rollBackward) for 640 * execution */ 641 PropagateResults(node, context); 642 } 643 644 645 646 /* user context or dag-exec-thread context: 647 * This is the first step in post-processing a newly-completed node. 648 * This routine is called by each node execution function to mark the node 649 * as complete and fire off any successors that have been enabled. 
650 */ 651 int 652 rf_FinishNode( 653 RF_DagNode_t * node, 654 int context) 655 { 656 int retcode = RF_FALSE; 657 node->dagHdr->numNodesCompleted++; 658 ProcessNode(node, context); 659 660 return (retcode); 661 } 662 663 664 /* user context: 665 * submit dag for execution, return non-zero if we have to wait for completion. 666 * if and only if we return non-zero, we'll cause cbFunc to get invoked with 667 * cbArg when the DAG has completed. 668 * 669 * for now we always return 1. If the DAG does not cause any I/O, then the callback 670 * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess 671 * to handle this. 672 * 673 * All we do here is fire the direct successors of the header node. The 674 * DAG execution thread does the rest of the dag processing. 675 */ 676 int 677 rf_DispatchDAG( 678 RF_DagHeader_t * dag, 679 void (*cbFunc) (void *), 680 void *cbArg) 681 { 682 RF_Raid_t *raidPtr; 683 684 raidPtr = dag->raidPtr; 685 if (dag->tracerec) { 686 RF_ETIMER_START(dag->tracerec->timer); 687 } 688 #if DEBUG 689 #if RF_DEBUG_VALIDATE_DAG 690 if (rf_engineDebug || rf_validateDAGDebug) { 691 if (rf_ValidateDAG(dag)) 692 RF_PANIC(); 693 } 694 #endif 695 #endif 696 if (rf_engineDebug) { 697 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 698 } 699 raidPtr->dags_in_flight++; /* debug only: blow off proper 700 * locking */ 701 dag->cbFunc = cbFunc; 702 dag->cbArg = cbArg; 703 dag->numNodesCompleted = 0; 704 dag->status = rf_enable; 705 FireNodeArray(dag->numSuccedents, dag->succedents); 706 return (1); 707 } 708 /* dedicated kernel thread: 709 * the thread that handles all DAG node firing. 710 * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the 711 * node queue to NULL before doing any firing of nodes. This way we only have to release the 712 * lock once. Of course, it's probably rare that there's more than one node in the queue at 713 * any one time, but it sometimes happens. 
714 * 715 * In the kernel, this thread runs at spl0 and is not swappable. I copied these 716 * characteristics from the aio_completion_thread. 717 */ 718 719 static void 720 DAGExecutionThread(RF_ThreadArg_t arg) 721 { 722 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 723 RF_Raid_t *raidPtr; 724 int ks; 725 int s; 726 727 raidPtr = (RF_Raid_t *) arg; 728 729 if (rf_engineDebug) { 730 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 731 } 732 733 s = splbio(); 734 735 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); 736 737 DO_LOCK(raidPtr); 738 while (!raidPtr->shutdown_engine) { 739 740 while (raidPtr->node_queue != NULL) { 741 local_nq = raidPtr->node_queue; 742 fire_nq = NULL; 743 term_nq = NULL; 744 raidPtr->node_queue = NULL; 745 DO_UNLOCK(raidPtr); 746 747 /* first, strip out the terminal nodes */ 748 while (local_nq) { 749 nd = local_nq; 750 local_nq = local_nq->next; 751 switch (nd->dagHdr->status) { 752 case rf_enable: 753 case rf_rollForward: 754 if (nd->numSuccedents == 0) { 755 /* end of the dag, add to 756 * callback list */ 757 nd->next = term_nq; 758 term_nq = nd; 759 } else { 760 /* not the end, add to the 761 * fire queue */ 762 nd->next = fire_nq; 763 fire_nq = nd; 764 } 765 break; 766 case rf_rollBackward: 767 if (nd->numAntecedents == 0) { 768 /* end of the dag, add to the 769 * callback list */ 770 nd->next = term_nq; 771 term_nq = nd; 772 } else { 773 /* not the end, add to the 774 * fire queue */ 775 nd->next = fire_nq; 776 fire_nq = nd; 777 } 778 break; 779 default: 780 RF_PANIC(); 781 break; 782 } 783 } 784 785 /* execute callback of dags which have reached the 786 * terminal node */ 787 while (term_nq) { 788 nd = term_nq; 789 term_nq = term_nq->next; 790 nd->next = NULL; 791 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 792 raidPtr->dags_in_flight--; /* debug only */ 793 } 794 795 /* fire remaining nodes */ 796 FireNodeList(fire_nq); 797 798 DO_LOCK(raidPtr); 799 } 800 while (!raidPtr->shutdown_engine && 801 raidPtr->node_queue == 
NULL) { 802 DO_UNLOCK(raidPtr); 803 tsleep(&(raidPtr->node_queue), PRIBIO, "rfwcond", 0); 804 DO_LOCK(raidPtr); 805 } 806 } 807 DO_UNLOCK(raidPtr); 808 809 RF_THREADGROUP_DONE(&raidPtr->engine_tg); 810 811 splx(s); 812 kthread_exit(0); 813 } 814