1 /* $NetBSD: rf_engine.c,v 1.9 2000/01/08 22:57:31 oster Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. * 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occured before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include "rf_threadstuff.h" 58 59 #include <sys/errno.h> 60 61 #include "rf_dag.h" 62 #include "rf_engine.h" 63 #include "rf_etimer.h" 64 #include "rf_general.h" 65 #include "rf_dagutils.h" 66 #include "rf_shutdown.h" 67 #include "rf_raid.h" 68 69 static void DAGExecutionThread(RF_ThreadArg_t arg); 70 71 #define DO_INIT(_l_,_r_) { \ 72 int _rc; \ 73 _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ 74 if (_rc) { \ 75 return(_rc); \ 76 } \ 77 _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ 78 if (_rc) { \ 79 return(_rc); \ 80 } \ 81 } 82 83 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 84 85 /* 86 * XXX Is this spl-ing really necessary? 87 */ 88 #define DO_LOCK(_r_) { ks = splbio(); RF_LOCK_MUTEX((_r_)->node_queue_mutex); } 89 #define DO_UNLOCK(_r_) { RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); splx(ks); } 90 #define DO_WAIT(_r_) tsleep(&(_r_)->node_queue, PRIBIO, "raidframe nq",0) 91 #define DO_SIGNAL(_r_) wakeup(&(_r_)->node_queue) 92 93 static void rf_ShutdownEngine(void *); 94 95 static void 96 rf_ShutdownEngine(arg) 97 void *arg; 98 { 99 RF_Raid_t *raidPtr; 100 101 raidPtr = (RF_Raid_t *) arg; 102 raidPtr->shutdown_engine = 1; 103 DO_SIGNAL(raidPtr); 104 } 105 106 int 107 rf_ConfigureEngine( 108 RF_ShutdownList_t ** listp, 109 RF_Raid_t * raidPtr, 110 RF_Config_t * cfgPtr) 111 { 112 int rc; 113 114 DO_INIT(listp, raidPtr); 115 116 raidPtr->node_queue = NULL; 117 raidPtr->dags_in_flight = 0; 118 119 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); 120 if (rc) 121 return (rc); 122 123 /* we create the execution thread only once per system boot. no need 124 * to check return code b/c the kernel panics if it can't create the 125 * thread. */ 126 if (rf_engineDebug) { 127 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 128 } 129 if (RF_CREATE_THREAD(raidPtr->engine_thread, DAGExecutionThread, raidPtr,"raid")) { 130 RF_ERRORMSG("RAIDFRAME: Unable to create engine thread\n"); 131 return (ENOMEM); 132 } 133 if (rf_engineDebug) { 134 printf("raid%d: Created engine thread\n", raidPtr->raidid); 135 } 136 RF_THREADGROUP_STARTED(&raidPtr->engine_tg); 137 /* XXX something is missing here... */ 138 #ifdef debug 139 printf("Skipping the WAIT_START!!\n"); 140 #endif 141 #if 0 142 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); 143 #endif 144 /* engine thread is now running and waiting for work */ 145 if (rf_engineDebug) { 146 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 147 } 148 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 149 if (rc) { 150 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d rc=%d\n", __FILE__, 151 __LINE__, rc); 152 rf_ShutdownEngine(NULL); 153 } 154 return (rc); 155 } 156 157 static int 158 BranchDone(RF_DagNode_t * node) 159 { 160 int i; 161 162 /* return true if forward execution is completed for a node and it's 163 * succedents */ 164 switch (node->status) { 165 case rf_wait: 166 /* should never be called in this state */ 167 RF_PANIC(); 168 break; 169 case rf_fired: 170 /* node is currently executing, so we're not done */ 171 return (RF_FALSE); 172 case rf_good: 173 for (i = 0; i < node->numSuccedents; i++) /* for each succedent */ 174 if (!BranchDone(node->succedents[i])) /* recursively check 175 * branch */ 176 return RF_FALSE; 177 return RF_TRUE; /* node and all succedent branches aren't in 178 * fired state */ 179 break; 180 case rf_bad: 181 /* succedents can't fire */ 182 return (RF_TRUE); 183 case rf_recover: 184 /* should never be called in this state */ 185 RF_PANIC(); 186 break; 187 case rf_undone: 188 case rf_panic: 189 /* XXX need to fix this case */ 190 /* for now, assume that we're done */ 191 return (RF_TRUE); 192 break; 193 default: 194 /* illegal node status */ 195 RF_PANIC(); 196 break; 197 } 198 } 199 200 static int 201 NodeReady(RF_DagNode_t * node) 202 { 203 int ready; 204 205 switch (node->dagHdr->status) { 206 case rf_enable: 207 case rf_rollForward: 208 if ((node->status == rf_wait) && (node->numAntecedents == node->numAntDone)) 209 ready = RF_TRUE; 210 else 211 ready = RF_FALSE; 212 break; 213 case rf_rollBackward: 214 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 215 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 216 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 217 if ((node->status == rf_good) && (node->numSuccDone == node->numSuccedents)) 218 ready = RF_TRUE; 219 else 220 ready = RF_FALSE; 221 break; 222 default: 223 printf("Execution engine found illegal DAG status in NodeReady\n"); 224 RF_PANIC(); 225 break; 226 } 227 228 return (ready); 229 } 230 231 232 233 /* user context and dag-exec-thread context: 234 * Fire a node. The node's status field determines which function, do or undo, 235 * to be fired. 236 * This routine assumes that the node's status field has alread been set to 237 * "fired" or "recover" to indicate the direction of execution. 238 */ 239 static void 240 FireNode(RF_DagNode_t * node) 241 { 242 switch (node->status) { 243 case rf_fired: 244 /* fire the do function of a node */ 245 if (rf_engineDebug) { 246 printf("raid%d: Firing node 0x%lx (%s)\n", 247 node->dagHdr->raidPtr->raidid, 248 (unsigned long) node, node->name); 249 } 250 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 251 #if defined(__NetBSD__) && defined(_KERNEL) 252 /* thread_block(); */ 253 /* printf("Need to block the thread here...\n"); */ 254 /* XXX thread_block is actually mentioned in 255 * /usr/include/vm/vm_extern.h */ 256 #else 257 thread_block(); 258 #endif 259 } 260 (*(node->doFunc)) (node); 261 break; 262 case rf_recover: 263 /* fire the undo function of a node */ 264 if (rf_engineDebug) { 265 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 266 node->dagHdr->raidPtr->raidid, 267 (unsigned long) node, node->name); 268 } 269 if (node->flags & RF_DAGNODE_FLAG_YIELD) 270 #if defined(__NetBSD__) && defined(_KERNEL) 271 /* thread_block(); */ 272 /* printf("Need to block the thread here...\n"); */ 273 /* XXX thread_block is actually mentioned in 274 * /usr/include/vm/vm_extern.h */ 275 #else 276 thread_block(); 277 #endif 278 (*(node->undoFunc)) (node); 279 break; 280 default: 281 RF_PANIC(); 282 break; 283 } 284 } 285 286 287 288 /* user context: 289 * Attempt to fire each node in a linear array. 290 * The entire list is fired atomically. 291 */ 292 static void 293 FireNodeArray( 294 int numNodes, 295 RF_DagNode_t ** nodeList) 296 { 297 RF_DagStatus_t dstat; 298 RF_DagNode_t *node; 299 int i, j; 300 301 /* first, mark all nodes which are ready to be fired */ 302 for (i = 0; i < numNodes; i++) { 303 node = nodeList[i]; 304 dstat = node->dagHdr->status; 305 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 306 if (NodeReady(node)) { 307 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 308 RF_ASSERT(node->status == rf_wait); 309 if (node->commitNode) 310 node->dagHdr->numCommits++; 311 node->status = rf_fired; 312 for (j = 0; j < node->numAntecedents; j++) 313 node->antecedents[j]->numSuccFired++; 314 } else { 315 RF_ASSERT(dstat == rf_rollBackward); 316 RF_ASSERT(node->status == rf_good); 317 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 318 * per graph */ 319 node->status = rf_recover; 320 } 321 } 322 } 323 /* now, fire the nodes */ 324 for (i = 0; i < numNodes; i++) { 325 if ((nodeList[i]->status == rf_fired) || (nodeList[i]->status == rf_recover)) 326 FireNode(nodeList[i]); 327 } 328 } 329 330 331 /* user context: 332 * Attempt to fire each node in a linked list. 333 * The entire list is fired atomically. 334 */ 335 static void 336 FireNodeList(RF_DagNode_t * nodeList) 337 { 338 RF_DagNode_t *node, *next; 339 RF_DagStatus_t dstat; 340 int j; 341 342 if (nodeList) { 343 /* first, mark all nodes which are ready to be fired */ 344 for (node = nodeList; node; node = next) { 345 next = node->next; 346 dstat = node->dagHdr->status; 347 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 348 if (NodeReady(node)) { 349 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 350 RF_ASSERT(node->status == rf_wait); 351 if (node->commitNode) 352 node->dagHdr->numCommits++; 353 node->status = rf_fired; 354 for (j = 0; j < node->numAntecedents; j++) 355 node->antecedents[j]->numSuccFired++; 356 } else { 357 RF_ASSERT(dstat == rf_rollBackward); 358 RF_ASSERT(node->status == rf_good); 359 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 360 * per graph */ 361 node->status = rf_recover; 362 } 363 } 364 } 365 /* now, fire the nodes */ 366 for (node = nodeList; node; node = next) { 367 next = node->next; 368 if ((node->status == rf_fired) || (node->status == rf_recover)) 369 FireNode(node); 370 } 371 } 372 } 373 /* interrupt context: 374 * for each succedent 375 * propagate required results from node to succedent 376 * increment succedent's numAntDone 377 * place newly-enable nodes on node queue for firing 378 * 379 * To save context switches, we don't place NIL nodes on the node queue, 380 * but rather just process them as if they had fired. Note that NIL nodes 381 * that are the direct successors of the header will actually get fired by 382 * DispatchDAG, which is fine because no context switches are involved. 383 * 384 * Important: when running at user level, this can be called by any 385 * disk thread, and so the increment and check of the antecedent count 386 * must be locked. I used the node queue mutex and locked down the 387 * entire function, but this is certainly overkill. 388 */ 389 static void 390 PropagateResults( 391 RF_DagNode_t * node, 392 int context) 393 { 394 RF_DagNode_t *s, *a; 395 RF_Raid_t *raidPtr; 396 int i, ks; 397 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 398 * finished */ 399 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 400 * antecedents */ 401 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 402 RF_DagNode_t *q = NULL, *qh = NULL, *next; 403 int j, skipNode; 404 405 raidPtr = node->dagHdr->raidPtr; 406 407 DO_LOCK(raidPtr); 408 409 /* debug - validate fire counts */ 410 for (i = 0; i < node->numAntecedents; i++) { 411 a = *(node->antecedents + i); 412 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 413 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 414 a->numSuccDone++; 415 } 416 417 switch (node->dagHdr->status) { 418 case rf_enable: 419 case rf_rollForward: 420 for (i = 0; i < node->numSuccedents; i++) { 421 s = *(node->succedents + i); 422 RF_ASSERT(s->status == rf_wait); 423 (s->numAntDone)++; 424 if (s->numAntDone == s->numAntecedents) { 425 /* look for NIL nodes */ 426 if (s->doFunc == rf_NullNodeFunc) { 427 /* don't fire NIL nodes, just process 428 * them */ 429 s->next = finishlist; 430 finishlist = s; 431 } else { 432 /* look to see if the node is to be 433 * skipped */ 434 skipNode = RF_FALSE; 435 for (j = 0; j < s->numAntecedents; j++) 436 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 437 skipNode = RF_TRUE; 438 if (skipNode) { 439 /* this node has one or more 440 * failed true data 441 * dependencies, so skip it */ 442 s->next = skiplist; 443 skiplist = s; 444 } else 445 /* add s to list of nodes (q) 446 * to execute */ 447 if (context != RF_INTR_CONTEXT) { 448 /* we only have to 449 * enqueue if we're at 450 * intr context */ 451 s->next = firelist; /* put node on a list to 452 * be fired after we 453 * unlock */ 454 firelist = s; 455 } else { /* enqueue the node for 456 * the dag exec thread 457 * to fire */ 458 RF_ASSERT(NodeReady(s)); 459 if (q) { 460 q->next = s; 461 q = s; 462 } else { 463 qh = q = s; 464 qh->next = NULL; 465 } 466 } 467 } 468 } 469 } 470 471 if (q) { 472 /* xfer our local list of nodes to the node queue */ 473 q->next = raidPtr->node_queue; 474 raidPtr->node_queue = qh; 475 DO_SIGNAL(raidPtr); 476 } 477 DO_UNLOCK(raidPtr); 478 479 for (; skiplist; skiplist = next) { 480 next = skiplist->next; 481 skiplist->status = rf_skipped; 482 for (i = 0; i < skiplist->numAntecedents; i++) { 483 skiplist->antecedents[i]->numSuccFired++; 484 } 485 if (skiplist->commitNode) { 486 skiplist->dagHdr->numCommits++; 487 } 488 rf_FinishNode(skiplist, context); 489 } 490 for (; finishlist; finishlist = next) { 491 /* NIL nodes: no need to fire them */ 492 next = finishlist->next; 493 finishlist->status = rf_good; 494 for (i = 0; i < finishlist->numAntecedents; i++) { 495 finishlist->antecedents[i]->numSuccFired++; 496 } 497 if (finishlist->commitNode) 498 finishlist->dagHdr->numCommits++; 499 /* 500 * Okay, here we're calling rf_FinishNode() on nodes that 501 * have the null function as their work proc. Such a node 502 * could be the terminal node in a DAG. If so, it will 503 * cause the DAG to complete, which will in turn free 504 * memory used by the DAG, which includes the node in 505 * question. Thus, we must avoid referencing the node 506 * at all after calling rf_FinishNode() on it. 507 */ 508 rf_FinishNode(finishlist, context); /* recursive call */ 509 } 510 /* fire all nodes in firelist */ 511 FireNodeList(firelist); 512 break; 513 514 case rf_rollBackward: 515 for (i = 0; i < node->numAntecedents; i++) { 516 a = *(node->antecedents + i); 517 RF_ASSERT(a->status == rf_good); 518 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 519 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 520 521 if (a->numSuccDone == a->numSuccFired) { 522 if (a->undoFunc == rf_NullNodeFunc) { 523 /* don't fire NIL nodes, just process 524 * them */ 525 a->next = finishlist; 526 finishlist = a; 527 } else { 528 if (context != RF_INTR_CONTEXT) { 529 /* we only have to enqueue if 530 * we're at intr context */ 531 a->next = firelist; /* put node on a list to 532 * be fired after we 533 * unlock */ 534 firelist = a; 535 } else { /* enqueue the node for 536 * the dag exec thread 537 * to fire */ 538 RF_ASSERT(NodeReady(a)); 539 if (q) { 540 q->next = a; 541 q = a; 542 } else { 543 qh = q = a; 544 qh->next = NULL; 545 } 546 } 547 } 548 } 549 } 550 if (q) { 551 /* xfer our local list of nodes to the node queue */ 552 q->next = raidPtr->node_queue; 553 raidPtr->node_queue = qh; 554 DO_SIGNAL(raidPtr); 555 } 556 DO_UNLOCK(raidPtr); 557 for (; finishlist; finishlist = next) { /* NIL nodes: no need to 558 * fire them */ 559 next = finishlist->next; 560 finishlist->status = rf_good; 561 /* 562 * Okay, here we're calling rf_FinishNode() on nodes that 563 * have the null function as their work proc. Such a node 564 * could be the first node in a DAG. If so, it will 565 * cause the DAG to complete, which will in turn free 566 * memory used by the DAG, which includes the node in 567 * question. Thus, we must avoid referencing the node 568 * at all after calling rf_FinishNode() on it. 569 */ 570 rf_FinishNode(finishlist, context); /* recursive call */ 571 } 572 /* fire all nodes in firelist */ 573 FireNodeList(firelist); 574 575 break; 576 default: 577 printf("Engine found illegal DAG status in PropagateResults()\n"); 578 RF_PANIC(); 579 break; 580 } 581 } 582 583 584 585 /* 586 * Process a fired node which has completed 587 */ 588 static void 589 ProcessNode( 590 RF_DagNode_t * node, 591 int context) 592 { 593 RF_Raid_t *raidPtr; 594 595 raidPtr = node->dagHdr->raidPtr; 596 597 switch (node->status) { 598 case rf_good: 599 /* normal case, don't need to do anything */ 600 break; 601 case rf_bad: 602 if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { 603 node->dagHdr->status = rf_rollForward; /* crossed commit 604 * barrier */ 605 if (rf_engineDebug || 1) { 606 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 607 } 608 } else { 609 node->dagHdr->status = rf_rollBackward; /* never reached commit 610 * barrier */ 611 if (rf_engineDebug || 1) { 612 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 613 } 614 } 615 break; 616 case rf_undone: 617 /* normal rollBackward case, don't need to do anything */ 618 break; 619 case rf_panic: 620 /* an undo node failed!!! */ 621 printf("UNDO of a node failed!!!/n"); 622 break; 623 default: 624 printf("node finished execution with an illegal status!!!\n"); 625 RF_PANIC(); 626 break; 627 } 628 629 /* enqueue node's succedents (antecedents if rollBackward) for 630 * execution */ 631 PropagateResults(node, context); 632 } 633 634 635 636 /* user context or dag-exec-thread context: 637 * This is the first step in post-processing a newly-completed node. 638 * This routine is called by each node execution function to mark the node 639 * as complete and fire off any successors that have been enabled. 640 */ 641 int 642 rf_FinishNode( 643 RF_DagNode_t * node, 644 int context) 645 { 646 /* as far as I can tell, retcode is not used -wvcii */ 647 int retcode = RF_FALSE; 648 node->dagHdr->numNodesCompleted++; 649 ProcessNode(node, context); 650 651 return (retcode); 652 } 653 654 655 /* user context: 656 * submit dag for execution, return non-zero if we have to wait for completion. 657 * if and only if we return non-zero, we'll cause cbFunc to get invoked with 658 * cbArg when the DAG has completed. 659 * 660 * for now we always return 1. If the DAG does not cause any I/O, then the callback 661 * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess 662 * to handle this. 663 * 664 * All we do here is fire the direct successors of the header node. The 665 * DAG execution thread does the rest of the dag processing. 666 */ 667 int 668 rf_DispatchDAG( 669 RF_DagHeader_t * dag, 670 void (*cbFunc) (void *), 671 void *cbArg) 672 { 673 RF_Raid_t *raidPtr; 674 675 raidPtr = dag->raidPtr; 676 if (dag->tracerec) { 677 RF_ETIMER_START(dag->tracerec->timer); 678 } 679 if (rf_engineDebug || rf_validateDAGDebug) { 680 if (rf_ValidateDAG(dag)) 681 RF_PANIC(); 682 } 683 if (rf_engineDebug) { 684 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 685 } 686 raidPtr->dags_in_flight++; /* debug only: blow off proper 687 * locking */ 688 dag->cbFunc = cbFunc; 689 dag->cbArg = cbArg; 690 dag->numNodesCompleted = 0; 691 dag->status = rf_enable; 692 FireNodeArray(dag->numSuccedents, dag->succedents); 693 return (1); 694 } 695 /* dedicated kernel thread: 696 * the thread that handles all DAG node firing. 697 * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the 698 * node queue to NULL before doing any firing of nodes. This way we only have to release the 699 * lock once. Of course, it's probably rare that there's more than one node in the queue at 700 * any one time, but it sometimes happens. 701 * 702 * In the kernel, this thread runs at spl0 and is not swappable. I copied these 703 * characteristics from the aio_completion_thread. 704 */ 705 706 static void 707 DAGExecutionThread(RF_ThreadArg_t arg) 708 { 709 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 710 RF_Raid_t *raidPtr; 711 int ks; 712 int s; 713 714 raidPtr = (RF_Raid_t *) arg; 715 716 if (rf_engineDebug) { 717 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 718 } 719 720 s = splbio(); 721 722 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); 723 724 DO_LOCK(raidPtr); 725 while (!raidPtr->shutdown_engine) { 726 727 while (raidPtr->node_queue != NULL) { 728 local_nq = raidPtr->node_queue; 729 fire_nq = NULL; 730 term_nq = NULL; 731 raidPtr->node_queue = NULL; 732 DO_UNLOCK(raidPtr); 733 734 /* first, strip out the terminal nodes */ 735 while (local_nq) { 736 nd = local_nq; 737 local_nq = local_nq->next; 738 switch (nd->dagHdr->status) { 739 case rf_enable: 740 case rf_rollForward: 741 if (nd->numSuccedents == 0) { 742 /* end of the dag, add to 743 * callback list */ 744 nd->next = term_nq; 745 term_nq = nd; 746 } else { 747 /* not the end, add to the 748 * fire queue */ 749 nd->next = fire_nq; 750 fire_nq = nd; 751 } 752 break; 753 case rf_rollBackward: 754 if (nd->numAntecedents == 0) { 755 /* end of the dag, add to the 756 * callback list */ 757 nd->next = term_nq; 758 term_nq = nd; 759 } else { 760 /* not the end, add to the 761 * fire queue */ 762 nd->next = fire_nq; 763 fire_nq = nd; 764 } 765 break; 766 default: 767 RF_PANIC(); 768 break; 769 } 770 } 771 772 /* execute callback of dags which have reached the 773 * terminal node */ 774 while (term_nq) { 775 nd = term_nq; 776 term_nq = term_nq->next; 777 nd->next = NULL; 778 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 779 raidPtr->dags_in_flight--; /* debug only */ 780 } 781 782 /* fire remaining nodes */ 783 FireNodeList(fire_nq); 784 785 DO_LOCK(raidPtr); 786 } 787 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL) 788 DO_WAIT(raidPtr); 789 } 790 DO_UNLOCK(raidPtr); 791 792 RF_THREADGROUP_DONE(&raidPtr->engine_tg); 793 794 splx(s); 795 kthread_exit(0); 796 } 797