1 /* $NetBSD: rf_engine.c,v 1.10 2000/08/20 16:51:03 thorpej Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. * 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occured before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include "rf_threadstuff.h" 58 59 #include <sys/errno.h> 60 61 #include "rf_dag.h" 62 #include "rf_engine.h" 63 #include "rf_etimer.h" 64 #include "rf_general.h" 65 #include "rf_dagutils.h" 66 #include "rf_shutdown.h" 67 #include "rf_raid.h" 68 69 static void DAGExecutionThread(RF_ThreadArg_t arg); 70 71 #define DO_INIT(_l_,_r_) { \ 72 int _rc; \ 73 _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ 74 if (_rc) { \ 75 return(_rc); \ 76 } \ 77 _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ 78 if (_rc) { \ 79 return(_rc); \ 80 } \ 81 } 82 83 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 84 85 /* 86 * XXX Is this spl-ing really necessary? 87 */ 88 #define DO_LOCK(_r_) \ 89 do { \ 90 ks = splbio(); \ 91 RF_LOCK_MUTEX((_r_)->node_queue_mutex); \ 92 } while (0) 93 94 #define DO_UNLOCK(_r_) \ 95 do { \ 96 RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \ 97 splx(ks); \ 98 } while (0) 99 100 #define DO_WAIT(_r_) \ 101 RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex) 102 103 #define DO_SIGNAL(_r_) \ 104 RF_BROADCAST_COND((_r_)->node_queue) /* XXX RF_SIGNAL_COND? */ 105 106 static void rf_ShutdownEngine(void *); 107 108 static void 109 rf_ShutdownEngine(arg) 110 void *arg; 111 { 112 RF_Raid_t *raidPtr; 113 114 raidPtr = (RF_Raid_t *) arg; 115 raidPtr->shutdown_engine = 1; 116 DO_SIGNAL(raidPtr); 117 } 118 119 int 120 rf_ConfigureEngine( 121 RF_ShutdownList_t ** listp, 122 RF_Raid_t * raidPtr, 123 RF_Config_t * cfgPtr) 124 { 125 int rc; 126 127 DO_INIT(listp, raidPtr); 128 129 raidPtr->node_queue = NULL; 130 raidPtr->dags_in_flight = 0; 131 132 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); 133 if (rc) 134 return (rc); 135 136 /* we create the execution thread only once per system boot. no need 137 * to check return code b/c the kernel panics if it can't create the 138 * thread. */ 139 if (rf_engineDebug) { 140 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 141 } 142 if (RF_CREATE_THREAD(raidPtr->engine_thread, DAGExecutionThread, raidPtr,"raid")) { 143 RF_ERRORMSG("RAIDFRAME: Unable to create engine thread\n"); 144 return (ENOMEM); 145 } 146 if (rf_engineDebug) { 147 printf("raid%d: Created engine thread\n", raidPtr->raidid); 148 } 149 RF_THREADGROUP_STARTED(&raidPtr->engine_tg); 150 /* XXX something is missing here... */ 151 #ifdef debug 152 printf("Skipping the WAIT_START!!\n"); 153 #endif 154 #if 0 155 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); 156 #endif 157 /* engine thread is now running and waiting for work */ 158 if (rf_engineDebug) { 159 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 160 } 161 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 162 if (rc) { 163 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d rc=%d\n", __FILE__, 164 __LINE__, rc); 165 rf_ShutdownEngine(NULL); 166 } 167 return (rc); 168 } 169 170 static int 171 BranchDone(RF_DagNode_t * node) 172 { 173 int i; 174 175 /* return true if forward execution is completed for a node and it's 176 * succedents */ 177 switch (node->status) { 178 case rf_wait: 179 /* should never be called in this state */ 180 RF_PANIC(); 181 break; 182 case rf_fired: 183 /* node is currently executing, so we're not done */ 184 return (RF_FALSE); 185 case rf_good: 186 for (i = 0; i < node->numSuccedents; i++) /* for each succedent */ 187 if (!BranchDone(node->succedents[i])) /* recursively check 188 * branch */ 189 return RF_FALSE; 190 return RF_TRUE; /* node and all succedent branches aren't in 191 * fired state */ 192 break; 193 case rf_bad: 194 /* succedents can't fire */ 195 return (RF_TRUE); 196 case rf_recover: 197 /* should never be called in this state */ 198 RF_PANIC(); 199 break; 200 case rf_undone: 201 case rf_panic: 202 /* XXX need to fix this case */ 203 /* for now, assume that we're done */ 204 return (RF_TRUE); 205 break; 206 default: 207 /* illegal node status */ 208 RF_PANIC(); 209 break; 210 } 211 } 212 213 static int 214 NodeReady(RF_DagNode_t * node) 215 { 216 int ready; 217 218 switch (node->dagHdr->status) { 219 case rf_enable: 220 case rf_rollForward: 221 if ((node->status == rf_wait) && (node->numAntecedents == node->numAntDone)) 222 ready = RF_TRUE; 223 else 224 ready = RF_FALSE; 225 break; 226 case rf_rollBackward: 227 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 228 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 229 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 230 if ((node->status == rf_good) && (node->numSuccDone == node->numSuccedents)) 231 ready = RF_TRUE; 232 else 233 ready = RF_FALSE; 234 break; 235 default: 236 printf("Execution engine found illegal DAG status in NodeReady\n"); 237 RF_PANIC(); 238 break; 239 } 240 241 return (ready); 242 } 243 244 245 246 /* user context and dag-exec-thread context: 247 * Fire a node. The node's status field determines which function, do or undo, 248 * to be fired. 249 * This routine assumes that the node's status field has alread been set to 250 * "fired" or "recover" to indicate the direction of execution. 251 */ 252 static void 253 FireNode(RF_DagNode_t * node) 254 { 255 switch (node->status) { 256 case rf_fired: 257 /* fire the do function of a node */ 258 if (rf_engineDebug) { 259 printf("raid%d: Firing node 0x%lx (%s)\n", 260 node->dagHdr->raidPtr->raidid, 261 (unsigned long) node, node->name); 262 } 263 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 264 #if defined(__NetBSD__) && defined(_KERNEL) 265 /* thread_block(); */ 266 /* printf("Need to block the thread here...\n"); */ 267 /* XXX thread_block is actually mentioned in 268 * /usr/include/vm/vm_extern.h */ 269 #else 270 thread_block(); 271 #endif 272 } 273 (*(node->doFunc)) (node); 274 break; 275 case rf_recover: 276 /* fire the undo function of a node */ 277 if (rf_engineDebug) { 278 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 279 node->dagHdr->raidPtr->raidid, 280 (unsigned long) node, node->name); 281 } 282 if (node->flags & RF_DAGNODE_FLAG_YIELD) 283 #if defined(__NetBSD__) && defined(_KERNEL) 284 /* thread_block(); */ 285 /* printf("Need to block the thread here...\n"); */ 286 /* XXX thread_block is actually mentioned in 287 * /usr/include/vm/vm_extern.h */ 288 #else 289 thread_block(); 290 #endif 291 (*(node->undoFunc)) (node); 292 break; 293 default: 294 RF_PANIC(); 295 break; 296 } 297 } 298 299 300 301 /* user context: 302 * Attempt to fire each node in a linear array. 303 * The entire list is fired atomically. 304 */ 305 static void 306 FireNodeArray( 307 int numNodes, 308 RF_DagNode_t ** nodeList) 309 { 310 RF_DagStatus_t dstat; 311 RF_DagNode_t *node; 312 int i, j; 313 314 /* first, mark all nodes which are ready to be fired */ 315 for (i = 0; i < numNodes; i++) { 316 node = nodeList[i]; 317 dstat = node->dagHdr->status; 318 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 319 if (NodeReady(node)) { 320 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 321 RF_ASSERT(node->status == rf_wait); 322 if (node->commitNode) 323 node->dagHdr->numCommits++; 324 node->status = rf_fired; 325 for (j = 0; j < node->numAntecedents; j++) 326 node->antecedents[j]->numSuccFired++; 327 } else { 328 RF_ASSERT(dstat == rf_rollBackward); 329 RF_ASSERT(node->status == rf_good); 330 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 331 * per graph */ 332 node->status = rf_recover; 333 } 334 } 335 } 336 /* now, fire the nodes */ 337 for (i = 0; i < numNodes; i++) { 338 if ((nodeList[i]->status == rf_fired) || (nodeList[i]->status == rf_recover)) 339 FireNode(nodeList[i]); 340 } 341 } 342 343 344 /* user context: 345 * Attempt to fire each node in a linked list. 346 * The entire list is fired atomically. 347 */ 348 static void 349 FireNodeList(RF_DagNode_t * nodeList) 350 { 351 RF_DagNode_t *node, *next; 352 RF_DagStatus_t dstat; 353 int j; 354 355 if (nodeList) { 356 /* first, mark all nodes which are ready to be fired */ 357 for (node = nodeList; node; node = next) { 358 next = node->next; 359 dstat = node->dagHdr->status; 360 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 361 if (NodeReady(node)) { 362 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 363 RF_ASSERT(node->status == rf_wait); 364 if (node->commitNode) 365 node->dagHdr->numCommits++; 366 node->status = rf_fired; 367 for (j = 0; j < node->numAntecedents; j++) 368 node->antecedents[j]->numSuccFired++; 369 } else { 370 RF_ASSERT(dstat == rf_rollBackward); 371 RF_ASSERT(node->status == rf_good); 372 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 373 * per graph */ 374 node->status = rf_recover; 375 } 376 } 377 } 378 /* now, fire the nodes */ 379 for (node = nodeList; node; node = next) { 380 next = node->next; 381 if ((node->status == rf_fired) || (node->status == rf_recover)) 382 FireNode(node); 383 } 384 } 385 } 386 /* interrupt context: 387 * for each succedent 388 * propagate required results from node to succedent 389 * increment succedent's numAntDone 390 * place newly-enable nodes on node queue for firing 391 * 392 * To save context switches, we don't place NIL nodes on the node queue, 393 * but rather just process them as if they had fired. Note that NIL nodes 394 * that are the direct successors of the header will actually get fired by 395 * DispatchDAG, which is fine because no context switches are involved. 396 * 397 * Important: when running at user level, this can be called by any 398 * disk thread, and so the increment and check of the antecedent count 399 * must be locked. I used the node queue mutex and locked down the 400 * entire function, but this is certainly overkill. 401 */ 402 static void 403 PropagateResults( 404 RF_DagNode_t * node, 405 int context) 406 { 407 RF_DagNode_t *s, *a; 408 RF_Raid_t *raidPtr; 409 int i, ks; 410 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 411 * finished */ 412 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 413 * antecedents */ 414 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 415 RF_DagNode_t *q = NULL, *qh = NULL, *next; 416 int j, skipNode; 417 418 raidPtr = node->dagHdr->raidPtr; 419 420 DO_LOCK(raidPtr); 421 422 /* debug - validate fire counts */ 423 for (i = 0; i < node->numAntecedents; i++) { 424 a = *(node->antecedents + i); 425 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 426 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 427 a->numSuccDone++; 428 } 429 430 switch (node->dagHdr->status) { 431 case rf_enable: 432 case rf_rollForward: 433 for (i = 0; i < node->numSuccedents; i++) { 434 s = *(node->succedents + i); 435 RF_ASSERT(s->status == rf_wait); 436 (s->numAntDone)++; 437 if (s->numAntDone == s->numAntecedents) { 438 /* look for NIL nodes */ 439 if (s->doFunc == rf_NullNodeFunc) { 440 /* don't fire NIL nodes, just process 441 * them */ 442 s->next = finishlist; 443 finishlist = s; 444 } else { 445 /* look to see if the node is to be 446 * skipped */ 447 skipNode = RF_FALSE; 448 for (j = 0; j < s->numAntecedents; j++) 449 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 450 skipNode = RF_TRUE; 451 if (skipNode) { 452 /* this node has one or more 453 * failed true data 454 * dependencies, so skip it */ 455 s->next = skiplist; 456 skiplist = s; 457 } else 458 /* add s to list of nodes (q) 459 * to execute */ 460 if (context != RF_INTR_CONTEXT) { 461 /* we only have to 462 * enqueue if we're at 463 * intr context */ 464 s->next = firelist; /* put node on a list to 465 * be fired after we 466 * unlock */ 467 firelist = s; 468 } else { /* enqueue the node for 469 * the dag exec thread 470 * to fire */ 471 RF_ASSERT(NodeReady(s)); 472 if (q) { 473 q->next = s; 474 q = s; 475 } else { 476 qh = q = s; 477 qh->next = NULL; 478 } 479 } 480 } 481 } 482 } 483 484 if (q) { 485 /* xfer our local list of nodes to the node queue */ 486 q->next = raidPtr->node_queue; 487 raidPtr->node_queue = qh; 488 DO_SIGNAL(raidPtr); 489 } 490 DO_UNLOCK(raidPtr); 491 492 for (; skiplist; skiplist = next) { 493 next = skiplist->next; 494 skiplist->status = rf_skipped; 495 for (i = 0; i < skiplist->numAntecedents; i++) { 496 skiplist->antecedents[i]->numSuccFired++; 497 } 498 if (skiplist->commitNode) { 499 skiplist->dagHdr->numCommits++; 500 } 501 rf_FinishNode(skiplist, context); 502 } 503 for (; finishlist; finishlist = next) { 504 /* NIL nodes: no need to fire them */ 505 next = finishlist->next; 506 finishlist->status = rf_good; 507 for (i = 0; i < finishlist->numAntecedents; i++) { 508 finishlist->antecedents[i]->numSuccFired++; 509 } 510 if (finishlist->commitNode) 511 finishlist->dagHdr->numCommits++; 512 /* 513 * Okay, here we're calling rf_FinishNode() on nodes that 514 * have the null function as their work proc. Such a node 515 * could be the terminal node in a DAG. If so, it will 516 * cause the DAG to complete, which will in turn free 517 * memory used by the DAG, which includes the node in 518 * question. Thus, we must avoid referencing the node 519 * at all after calling rf_FinishNode() on it. 520 */ 521 rf_FinishNode(finishlist, context); /* recursive call */ 522 } 523 /* fire all nodes in firelist */ 524 FireNodeList(firelist); 525 break; 526 527 case rf_rollBackward: 528 for (i = 0; i < node->numAntecedents; i++) { 529 a = *(node->antecedents + i); 530 RF_ASSERT(a->status == rf_good); 531 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 532 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 533 534 if (a->numSuccDone == a->numSuccFired) { 535 if (a->undoFunc == rf_NullNodeFunc) { 536 /* don't fire NIL nodes, just process 537 * them */ 538 a->next = finishlist; 539 finishlist = a; 540 } else { 541 if (context != RF_INTR_CONTEXT) { 542 /* we only have to enqueue if 543 * we're at intr context */ 544 a->next = firelist; /* put node on a list to 545 * be fired after we 546 * unlock */ 547 firelist = a; 548 } else { /* enqueue the node for 549 * the dag exec thread 550 * to fire */ 551 RF_ASSERT(NodeReady(a)); 552 if (q) { 553 q->next = a; 554 q = a; 555 } else { 556 qh = q = a; 557 qh->next = NULL; 558 } 559 } 560 } 561 } 562 } 563 if (q) { 564 /* xfer our local list of nodes to the node queue */ 565 q->next = raidPtr->node_queue; 566 raidPtr->node_queue = qh; 567 DO_SIGNAL(raidPtr); 568 } 569 DO_UNLOCK(raidPtr); 570 for (; finishlist; finishlist = next) { /* NIL nodes: no need to 571 * fire them */ 572 next = finishlist->next; 573 finishlist->status = rf_good; 574 /* 575 * Okay, here we're calling rf_FinishNode() on nodes that 576 * have the null function as their work proc. Such a node 577 * could be the first node in a DAG. If so, it will 578 * cause the DAG to complete, which will in turn free 579 * memory used by the DAG, which includes the node in 580 * question. Thus, we must avoid referencing the node 581 * at all after calling rf_FinishNode() on it. 582 */ 583 rf_FinishNode(finishlist, context); /* recursive call */ 584 } 585 /* fire all nodes in firelist */ 586 FireNodeList(firelist); 587 588 break; 589 default: 590 printf("Engine found illegal DAG status in PropagateResults()\n"); 591 RF_PANIC(); 592 break; 593 } 594 } 595 596 597 598 /* 599 * Process a fired node which has completed 600 */ 601 static void 602 ProcessNode( 603 RF_DagNode_t * node, 604 int context) 605 { 606 RF_Raid_t *raidPtr; 607 608 raidPtr = node->dagHdr->raidPtr; 609 610 switch (node->status) { 611 case rf_good: 612 /* normal case, don't need to do anything */ 613 break; 614 case rf_bad: 615 if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { 616 node->dagHdr->status = rf_rollForward; /* crossed commit 617 * barrier */ 618 if (rf_engineDebug || 1) { 619 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 620 } 621 } else { 622 node->dagHdr->status = rf_rollBackward; /* never reached commit 623 * barrier */ 624 if (rf_engineDebug || 1) { 625 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 626 } 627 } 628 break; 629 case rf_undone: 630 /* normal rollBackward case, don't need to do anything */ 631 break; 632 case rf_panic: 633 /* an undo node failed!!! */ 634 printf("UNDO of a node failed!!!/n"); 635 break; 636 default: 637 printf("node finished execution with an illegal status!!!\n"); 638 RF_PANIC(); 639 break; 640 } 641 642 /* enqueue node's succedents (antecedents if rollBackward) for 643 * execution */ 644 PropagateResults(node, context); 645 } 646 647 648 649 /* user context or dag-exec-thread context: 650 * This is the first step in post-processing a newly-completed node. 651 * This routine is called by each node execution function to mark the node 652 * as complete and fire off any successors that have been enabled. 653 */ 654 int 655 rf_FinishNode( 656 RF_DagNode_t * node, 657 int context) 658 { 659 /* as far as I can tell, retcode is not used -wvcii */ 660 int retcode = RF_FALSE; 661 node->dagHdr->numNodesCompleted++; 662 ProcessNode(node, context); 663 664 return (retcode); 665 } 666 667 668 /* user context: 669 * submit dag for execution, return non-zero if we have to wait for completion. 670 * if and only if we return non-zero, we'll cause cbFunc to get invoked with 671 * cbArg when the DAG has completed. 672 * 673 * for now we always return 1. If the DAG does not cause any I/O, then the callback 674 * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess 675 * to handle this. 676 * 677 * All we do here is fire the direct successors of the header node. The 678 * DAG execution thread does the rest of the dag processing. 679 */ 680 int 681 rf_DispatchDAG( 682 RF_DagHeader_t * dag, 683 void (*cbFunc) (void *), 684 void *cbArg) 685 { 686 RF_Raid_t *raidPtr; 687 688 raidPtr = dag->raidPtr; 689 if (dag->tracerec) { 690 RF_ETIMER_START(dag->tracerec->timer); 691 } 692 if (rf_engineDebug || rf_validateDAGDebug) { 693 if (rf_ValidateDAG(dag)) 694 RF_PANIC(); 695 } 696 if (rf_engineDebug) { 697 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 698 } 699 raidPtr->dags_in_flight++; /* debug only: blow off proper 700 * locking */ 701 dag->cbFunc = cbFunc; 702 dag->cbArg = cbArg; 703 dag->numNodesCompleted = 0; 704 dag->status = rf_enable; 705 FireNodeArray(dag->numSuccedents, dag->succedents); 706 return (1); 707 } 708 /* dedicated kernel thread: 709 * the thread that handles all DAG node firing. 710 * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the 711 * node queue to NULL before doing any firing of nodes. This way we only have to release the 712 * lock once. Of course, it's probably rare that there's more than one node in the queue at 713 * any one time, but it sometimes happens. 714 * 715 * In the kernel, this thread runs at spl0 and is not swappable. I copied these 716 * characteristics from the aio_completion_thread. 717 */ 718 719 static void 720 DAGExecutionThread(RF_ThreadArg_t arg) 721 { 722 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 723 RF_Raid_t *raidPtr; 724 int ks; 725 int s; 726 727 raidPtr = (RF_Raid_t *) arg; 728 729 if (rf_engineDebug) { 730 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 731 } 732 733 s = splbio(); 734 735 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); 736 737 DO_LOCK(raidPtr); 738 while (!raidPtr->shutdown_engine) { 739 740 while (raidPtr->node_queue != NULL) { 741 local_nq = raidPtr->node_queue; 742 fire_nq = NULL; 743 term_nq = NULL; 744 raidPtr->node_queue = NULL; 745 DO_UNLOCK(raidPtr); 746 747 /* first, strip out the terminal nodes */ 748 while (local_nq) { 749 nd = local_nq; 750 local_nq = local_nq->next; 751 switch (nd->dagHdr->status) { 752 case rf_enable: 753 case rf_rollForward: 754 if (nd->numSuccedents == 0) { 755 /* end of the dag, add to 756 * callback list */ 757 nd->next = term_nq; 758 term_nq = nd; 759 } else { 760 /* not the end, add to the 761 * fire queue */ 762 nd->next = fire_nq; 763 fire_nq = nd; 764 } 765 break; 766 case rf_rollBackward: 767 if (nd->numAntecedents == 0) { 768 /* end of the dag, add to the 769 * callback list */ 770 nd->next = term_nq; 771 term_nq = nd; 772 } else { 773 /* not the end, add to the 774 * fire queue */ 775 nd->next = fire_nq; 776 fire_nq = nd; 777 } 778 break; 779 default: 780 RF_PANIC(); 781 break; 782 } 783 } 784 785 /* execute callback of dags which have reached the 786 * terminal node */ 787 while (term_nq) { 788 nd = term_nq; 789 term_nq = term_nq->next; 790 nd->next = NULL; 791 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 792 raidPtr->dags_in_flight--; /* debug only */ 793 } 794 795 /* fire remaining nodes */ 796 FireNodeList(fire_nq); 797 798 DO_LOCK(raidPtr); 799 } 800 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL) 801 DO_WAIT(raidPtr); 802 } 803 DO_UNLOCK(raidPtr); 804 805 RF_THREADGROUP_DONE(&raidPtr->engine_tg); 806 807 splx(s); 808 kthread_exit(0); 809 } 810