1 /* $NetBSD: rf_engine.c,v 1.5 1999/03/14 21:53:31 oster Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. * 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occured before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include "rf_threadstuff.h" 58 59 #include <sys/errno.h> 60 61 #include "rf_dag.h" 62 #include "rf_engine.h" 63 #include "rf_threadid.h" 64 #include "rf_etimer.h" 65 #include "rf_general.h" 66 #include "rf_dagutils.h" 67 #include "rf_shutdown.h" 68 #include "rf_raid.h" 69 70 static void DAGExecutionThread(RF_ThreadArg_t arg); 71 72 #define DO_INIT(_l_,_r_) { \ 73 int _rc; \ 74 _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ 75 if (_rc) { \ 76 return(_rc); \ 77 } \ 78 _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ 79 if (_rc) { \ 80 return(_rc); \ 81 } \ 82 } 83 84 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 85 86 /* 87 * XXX Is this spl-ing really necessary? 88 */ 89 #define DO_LOCK(_r_) { ks = splbio(); RF_LOCK_MUTEX((_r_)->node_queue_mutex); } 90 #define DO_UNLOCK(_r_) { RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); splx(ks); } 91 #define DO_WAIT(_r_) tsleep(&(_r_)->node_queue, PRIBIO, "raidframe nq",0) 92 #define DO_SIGNAL(_r_) wakeup(&(_r_)->node_queue) 93 94 static void rf_ShutdownEngine(void *); 95 96 static void 97 rf_ShutdownEngine(arg) 98 void *arg; 99 { 100 RF_Raid_t *raidPtr; 101 102 raidPtr = (RF_Raid_t *) arg; 103 raidPtr->shutdown_engine = 1; 104 DO_SIGNAL(raidPtr); 105 } 106 107 int 108 rf_ConfigureEngine( 109 RF_ShutdownList_t ** listp, 110 RF_Raid_t * raidPtr, 111 RF_Config_t * cfgPtr) 112 { 113 int rc, tid = 0; 114 115 if (rf_engineDebug) { 116 rf_get_threadid(tid); 117 } 118 DO_INIT(listp, raidPtr); 119 120 raidPtr->node_queue = NULL; 121 raidPtr->dags_in_flight = 0; 122 123 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); 124 if (rc) 125 return (rc); 126 127 /* we create the execution thread only once per system boot. no need 128 * to check return code b/c the kernel panics if it can't create the 129 * thread. */ 130 if (rf_engineDebug) { 131 printf("[%d] Creating engine thread\n", tid); 132 } 133 if (RF_CREATE_THREAD(raidPtr->engine_thread, DAGExecutionThread, raidPtr)) { 134 RF_ERRORMSG("RAIDFRAME: Unable to create engine thread\n"); 135 return (ENOMEM); 136 } 137 if (rf_engineDebug) { 138 printf("[%d] Created engine thread\n", tid); 139 } 140 RF_THREADGROUP_STARTED(&raidPtr->engine_tg); 141 /* XXX something is missing here... */ 142 #ifdef debug 143 printf("Skipping the WAIT_START!!\n"); 144 #endif 145 #if 0 146 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); 147 #endif 148 /* engine thread is now running and waiting for work */ 149 if (rf_engineDebug) { 150 printf("[%d] Engine thread running and waiting for events\n", tid); 151 } 152 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 153 if (rc) { 154 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d rc=%d\n", __FILE__, 155 __LINE__, rc); 156 rf_ShutdownEngine(NULL); 157 } 158 return (rc); 159 } 160 161 static int 162 BranchDone(RF_DagNode_t * node) 163 { 164 int i; 165 166 /* return true if forward execution is completed for a node and it's 167 * succedents */ 168 switch (node->status) { 169 case rf_wait: 170 /* should never be called in this state */ 171 RF_PANIC(); 172 break; 173 case rf_fired: 174 /* node is currently executing, so we're not done */ 175 return (RF_FALSE); 176 case rf_good: 177 for (i = 0; i < node->numSuccedents; i++) /* for each succedent */ 178 if (!BranchDone(node->succedents[i])) /* recursively check 179 * branch */ 180 return RF_FALSE; 181 return RF_TRUE; /* node and all succedent branches aren't in 182 * fired state */ 183 break; 184 case rf_bad: 185 /* succedents can't fire */ 186 return (RF_TRUE); 187 case rf_recover: 188 /* should never be called in this state */ 189 RF_PANIC(); 190 break; 191 case rf_undone: 192 case rf_panic: 193 /* XXX need to fix this case */ 194 /* for now, assume that we're done */ 195 return (RF_TRUE); 196 break; 197 default: 198 /* illegal node status */ 199 RF_PANIC(); 200 break; 201 } 202 } 203 204 static int 205 NodeReady(RF_DagNode_t * node) 206 { 207 int ready; 208 209 switch (node->dagHdr->status) { 210 case rf_enable: 211 case rf_rollForward: 212 if ((node->status == rf_wait) && (node->numAntecedents == node->numAntDone)) 213 ready = RF_TRUE; 214 else 215 ready = RF_FALSE; 216 break; 217 case rf_rollBackward: 218 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 219 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 220 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 221 if ((node->status == rf_good) && (node->numSuccDone == node->numSuccedents)) 222 ready = RF_TRUE; 223 else 224 ready = RF_FALSE; 225 break; 226 default: 227 printf("Execution engine found illegal DAG status in NodeReady\n"); 228 RF_PANIC(); 229 break; 230 } 231 232 return (ready); 233 } 234 235 236 237 /* user context and dag-exec-thread context: 238 * Fire a node. The node's status field determines which function, do or undo, 239 * to be fired. 240 * This routine assumes that the node's status field has alread been set to 241 * "fired" or "recover" to indicate the direction of execution. 242 */ 243 static void 244 FireNode(RF_DagNode_t * node) 245 { 246 int tid; 247 248 switch (node->status) { 249 case rf_fired: 250 /* fire the do function of a node */ 251 if (rf_engineDebug) { 252 rf_get_threadid(tid); 253 printf("[%d] Firing node 0x%lx (%s)\n", tid, (unsigned long) node, node->name); 254 } 255 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 256 #if defined(__NetBSD__) && defined(_KERNEL) 257 /* thread_block(); */ 258 /* printf("Need to block the thread here...\n"); */ 259 /* XXX thread_block is actually mentioned in 260 * /usr/include/vm/vm_extern.h */ 261 #else 262 thread_block(); 263 #endif 264 } 265 (*(node->doFunc)) (node); 266 break; 267 case rf_recover: 268 /* fire the undo function of a node */ 269 if (rf_engineDebug || 1) { 270 rf_get_threadid(tid); 271 printf("[%d] Firing (undo) node 0x%lx (%s)\n", tid, (unsigned long) node, node->name); 272 } 273 if (node->flags & RF_DAGNODE_FLAG_YIELD) 274 #if defined(__NetBSD__) && defined(_KERNEL) 275 /* thread_block(); */ 276 /* printf("Need to block the thread here...\n"); */ 277 /* XXX thread_block is actually mentioned in 278 * /usr/include/vm/vm_extern.h */ 279 #else 280 thread_block(); 281 #endif 282 (*(node->undoFunc)) (node); 283 break; 284 default: 285 RF_PANIC(); 286 break; 287 } 288 } 289 290 291 292 /* user context: 293 * Attempt to fire each node in a linear array. 294 * The entire list is fired atomically. 295 */ 296 static void 297 FireNodeArray( 298 int numNodes, 299 RF_DagNode_t ** nodeList) 300 { 301 RF_DagStatus_t dstat; 302 RF_DagNode_t *node; 303 int i, j; 304 305 /* first, mark all nodes which are ready to be fired */ 306 for (i = 0; i < numNodes; i++) { 307 node = nodeList[i]; 308 dstat = node->dagHdr->status; 309 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 310 if (NodeReady(node)) { 311 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 312 RF_ASSERT(node->status == rf_wait); 313 if (node->commitNode) 314 node->dagHdr->numCommits++; 315 node->status = rf_fired; 316 for (j = 0; j < node->numAntecedents; j++) 317 node->antecedents[j]->numSuccFired++; 318 } else { 319 RF_ASSERT(dstat == rf_rollBackward); 320 RF_ASSERT(node->status == rf_good); 321 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 322 * per graph */ 323 node->status = rf_recover; 324 } 325 } 326 } 327 /* now, fire the nodes */ 328 for (i = 0; i < numNodes; i++) { 329 if ((nodeList[i]->status == rf_fired) || (nodeList[i]->status == rf_recover)) 330 FireNode(nodeList[i]); 331 } 332 } 333 334 335 /* user context: 336 * Attempt to fire each node in a linked list. 337 * The entire list is fired atomically. 338 */ 339 static void 340 FireNodeList(RF_DagNode_t * nodeList) 341 { 342 RF_DagNode_t *node, *next; 343 RF_DagStatus_t dstat; 344 int j; 345 346 if (nodeList) { 347 /* first, mark all nodes which are ready to be fired */ 348 for (node = nodeList; node; node = next) { 349 next = node->next; 350 dstat = node->dagHdr->status; 351 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 352 if (NodeReady(node)) { 353 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 354 RF_ASSERT(node->status == rf_wait); 355 if (node->commitNode) 356 node->dagHdr->numCommits++; 357 node->status = rf_fired; 358 for (j = 0; j < node->numAntecedents; j++) 359 node->antecedents[j]->numSuccFired++; 360 } else { 361 RF_ASSERT(dstat == rf_rollBackward); 362 RF_ASSERT(node->status == rf_good); 363 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 364 * per graph */ 365 node->status = rf_recover; 366 } 367 } 368 } 369 /* now, fire the nodes */ 370 for (node = nodeList; node; node = next) { 371 next = node->next; 372 if ((node->status == rf_fired) || (node->status == rf_recover)) 373 FireNode(node); 374 } 375 } 376 } 377 /* interrupt context: 378 * for each succedent 379 * propagate required results from node to succedent 380 * increment succedent's numAntDone 381 * place newly-enable nodes on node queue for firing 382 * 383 * To save context switches, we don't place NIL nodes on the node queue, 384 * but rather just process them as if they had fired. Note that NIL nodes 385 * that are the direct successors of the header will actually get fired by 386 * DispatchDAG, which is fine because no context switches are involved. 387 * 388 * Important: when running at user level, this can be called by any 389 * disk thread, and so the increment and check of the antecedent count 390 * must be locked. I used the node queue mutex and locked down the 391 * entire function, but this is certainly overkill. 392 */ 393 static void 394 PropagateResults( 395 RF_DagNode_t * node, 396 int context) 397 { 398 RF_DagNode_t *s, *a; 399 RF_Raid_t *raidPtr; 400 int tid, i, ks; 401 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 402 * finished */ 403 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 404 * antecedents */ 405 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 406 RF_DagNode_t *q = NULL, *qh = NULL, *next; 407 int j, skipNode; 408 409 rf_get_threadid(tid); 410 411 raidPtr = node->dagHdr->raidPtr; 412 413 DO_LOCK(raidPtr); 414 415 /* debug - validate fire counts */ 416 for (i = 0; i < node->numAntecedents; i++) { 417 a = *(node->antecedents + i); 418 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 419 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 420 a->numSuccDone++; 421 } 422 423 switch (node->dagHdr->status) { 424 case rf_enable: 425 case rf_rollForward: 426 for (i = 0; i < node->numSuccedents; i++) { 427 s = *(node->succedents + i); 428 RF_ASSERT(s->status == rf_wait); 429 (s->numAntDone)++; 430 if (s->numAntDone == s->numAntecedents) { 431 /* look for NIL nodes */ 432 if (s->doFunc == rf_NullNodeFunc) { 433 /* don't fire NIL nodes, just process 434 * them */ 435 s->next = finishlist; 436 finishlist = s; 437 } else { 438 /* look to see if the node is to be 439 * skipped */ 440 skipNode = RF_FALSE; 441 for (j = 0; j < s->numAntecedents; j++) 442 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 443 skipNode = RF_TRUE; 444 if (skipNode) { 445 /* this node has one or more 446 * failed true data 447 * dependencies, so skip it */ 448 s->next = skiplist; 449 skiplist = s; 450 } else 451 /* add s to list of nodes (q) 452 * to execute */ 453 if (context != RF_INTR_CONTEXT) { 454 /* we only have to 455 * enqueue if we're at 456 * intr context */ 457 s->next = firelist; /* put node on a list to 458 * be fired after we 459 * unlock */ 460 firelist = s; 461 } else { /* enqueue the node for 462 * the dag exec thread 463 * to fire */ 464 RF_ASSERT(NodeReady(s)); 465 if (q) { 466 q->next = s; 467 q = s; 468 } else { 469 qh = q = s; 470 qh->next = NULL; 471 } 472 } 473 } 474 } 475 } 476 477 if (q) { 478 /* xfer our local list of nodes to the node queue */ 479 q->next = raidPtr->node_queue; 480 raidPtr->node_queue = qh; 481 DO_SIGNAL(raidPtr); 482 } 483 DO_UNLOCK(raidPtr); 484 485 for (; skiplist; skiplist = next) { 486 next = skiplist->next; 487 skiplist->status = rf_skipped; 488 for (i = 0; i < skiplist->numAntecedents; i++) { 489 skiplist->antecedents[i]->numSuccFired++; 490 } 491 if (skiplist->commitNode) { 492 skiplist->dagHdr->numCommits++; 493 } 494 rf_FinishNode(skiplist, context); 495 } 496 for (; finishlist; finishlist = next) { 497 /* NIL nodes: no need to fire them */ 498 next = finishlist->next; 499 finishlist->status = rf_good; 500 for (i = 0; i < finishlist->numAntecedents; i++) { 501 finishlist->antecedents[i]->numSuccFired++; 502 } 503 if (finishlist->commitNode) 504 finishlist->dagHdr->numCommits++; 505 /* 506 * Okay, here we're calling rf_FinishNode() on nodes that 507 * have the null function as their work proc. Such a node 508 * could be the terminal node in a DAG. If so, it will 509 * cause the DAG to complete, which will in turn free 510 * memory used by the DAG, which includes the node in 511 * question. Thus, we must avoid referencing the node 512 * at all after calling rf_FinishNode() on it. 513 */ 514 rf_FinishNode(finishlist, context); /* recursive call */ 515 } 516 /* fire all nodes in firelist */ 517 FireNodeList(firelist); 518 break; 519 520 case rf_rollBackward: 521 for (i = 0; i < node->numAntecedents; i++) { 522 a = *(node->antecedents + i); 523 RF_ASSERT(a->status == rf_good); 524 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 525 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 526 527 if (a->numSuccDone == a->numSuccFired) { 528 if (a->undoFunc == rf_NullNodeFunc) { 529 /* don't fire NIL nodes, just process 530 * them */ 531 a->next = finishlist; 532 finishlist = a; 533 } else { 534 if (context != RF_INTR_CONTEXT) { 535 /* we only have to enqueue if 536 * we're at intr context */ 537 a->next = firelist; /* put node on a list to 538 * be fired after we 539 * unlock */ 540 firelist = a; 541 } else { /* enqueue the node for 542 * the dag exec thread 543 * to fire */ 544 RF_ASSERT(NodeReady(a)); 545 if (q) { 546 q->next = a; 547 q = a; 548 } else { 549 qh = q = a; 550 qh->next = NULL; 551 } 552 } 553 } 554 } 555 } 556 if (q) { 557 /* xfer our local list of nodes to the node queue */ 558 q->next = raidPtr->node_queue; 559 raidPtr->node_queue = qh; 560 DO_SIGNAL(raidPtr); 561 } 562 DO_UNLOCK(raidPtr); 563 for (; finishlist; finishlist = next) { /* NIL nodes: no need to 564 * fire them */ 565 next = finishlist->next; 566 finishlist->status = rf_good; 567 /* 568 * Okay, here we're calling rf_FinishNode() on nodes that 569 * have the null function as their work proc. Such a node 570 * could be the first node in a DAG. If so, it will 571 * cause the DAG to complete, which will in turn free 572 * memory used by the DAG, which includes the node in 573 * question. Thus, we must avoid referencing the node 574 * at all after calling rf_FinishNode() on it. 575 */ 576 rf_FinishNode(finishlist, context); /* recursive call */ 577 } 578 /* fire all nodes in firelist */ 579 FireNodeList(firelist); 580 581 break; 582 default: 583 printf("Engine found illegal DAG status in PropagateResults()\n"); 584 RF_PANIC(); 585 break; 586 } 587 } 588 589 590 591 /* 592 * Process a fired node which has completed 593 */ 594 static void 595 ProcessNode( 596 RF_DagNode_t * node, 597 int context) 598 { 599 RF_Raid_t *raidPtr; 600 int tid; 601 602 raidPtr = node->dagHdr->raidPtr; 603 604 switch (node->status) { 605 case rf_good: 606 /* normal case, don't need to do anything */ 607 break; 608 case rf_bad: 609 if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { 610 node->dagHdr->status = rf_rollForward; /* crossed commit 611 * barrier */ 612 if (rf_engineDebug || 1) { 613 rf_get_threadid(tid); 614 printf("[%d] node (%s) returned fail, rolling forward\n", tid, node->name); 615 } 616 } else { 617 node->dagHdr->status = rf_rollBackward; /* never reached commit 618 * barrier */ 619 if (rf_engineDebug || 1) { 620 rf_get_threadid(tid); 621 printf("[%d] node (%s) returned fail, rolling backward\n", tid, node->name); 622 } 623 } 624 break; 625 case rf_undone: 626 /* normal rollBackward case, don't need to do anything */ 627 break; 628 case rf_panic: 629 /* an undo node failed!!! */ 630 printf("UNDO of a node failed!!!/n"); 631 break; 632 default: 633 printf("node finished execution with an illegal status!!!\n"); 634 RF_PANIC(); 635 break; 636 } 637 638 /* enqueue node's succedents (antecedents if rollBackward) for 639 * execution */ 640 PropagateResults(node, context); 641 } 642 643 644 645 /* user context or dag-exec-thread context: 646 * This is the first step in post-processing a newly-completed node. 647 * This routine is called by each node execution function to mark the node 648 * as complete and fire off any successors that have been enabled. 649 */ 650 int 651 rf_FinishNode( 652 RF_DagNode_t * node, 653 int context) 654 { 655 /* as far as I can tell, retcode is not used -wvcii */ 656 int retcode = RF_FALSE; 657 node->dagHdr->numNodesCompleted++; 658 ProcessNode(node, context); 659 660 return (retcode); 661 } 662 663 664 /* user context: 665 * submit dag for execution, return non-zero if we have to wait for completion. 666 * if and only if we return non-zero, we'll cause cbFunc to get invoked with 667 * cbArg when the DAG has completed. 668 * 669 * for now we always return 1. If the DAG does not cause any I/O, then the callback 670 * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess 671 * to handle this. 672 * 673 * All we do here is fire the direct successors of the header node. The 674 * DAG execution thread does the rest of the dag processing. 675 */ 676 int 677 rf_DispatchDAG( 678 RF_DagHeader_t * dag, 679 void (*cbFunc) (void *), 680 void *cbArg) 681 { 682 RF_Raid_t *raidPtr; 683 int tid; 684 685 raidPtr = dag->raidPtr; 686 if (dag->tracerec) { 687 RF_ETIMER_START(dag->tracerec->timer); 688 } 689 if (rf_engineDebug || rf_validateDAGDebug) { 690 if (rf_ValidateDAG(dag)) 691 RF_PANIC(); 692 } 693 if (rf_engineDebug) { 694 rf_get_threadid(tid); 695 printf("[%d] Entering DispatchDAG\n", tid); 696 } 697 raidPtr->dags_in_flight++; /* debug only: blow off proper 698 * locking */ 699 dag->cbFunc = cbFunc; 700 dag->cbArg = cbArg; 701 dag->numNodesCompleted = 0; 702 dag->status = rf_enable; 703 FireNodeArray(dag->numSuccedents, dag->succedents); 704 return (1); 705 } 706 /* dedicated kernel thread: 707 * the thread that handles all DAG node firing. 708 * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the 709 * node queue to NULL before doing any firing of nodes. This way we only have to release the 710 * lock once. Of course, it's probably rare that there's more than one node in the queue at 711 * any one time, but it sometimes happens. 712 * 713 * In the kernel, this thread runs at spl0 and is not swappable. I copied these 714 * characteristics from the aio_completion_thread. 715 */ 716 717 static void 718 DAGExecutionThread(RF_ThreadArg_t arg) 719 { 720 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 721 RF_Raid_t *raidPtr; 722 int ks, tid; 723 int s; 724 725 raidPtr = (RF_Raid_t *) arg; 726 727 rf_assign_threadid(); 728 if (rf_engineDebug) { 729 rf_get_threadid(tid); 730 printf("[%d] Engine thread is running\n", tid); 731 } 732 #ifndef __NetBSD__ 733 thread = current_thread(); 734 thread_swappable(thread, RF_FALSE); 735 thread->priority = thread->sched_pri = BASEPRI_SYSTEM; 736 s = spl0(); 737 #endif 738 /* XXX what to put here XXX */ 739 740 s = splbio(); 741 742 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); 743 744 DO_LOCK(raidPtr); 745 while (!raidPtr->shutdown_engine) { 746 747 while (raidPtr->node_queue != NULL) { 748 local_nq = raidPtr->node_queue; 749 fire_nq = NULL; 750 term_nq = NULL; 751 raidPtr->node_queue = NULL; 752 DO_UNLOCK(raidPtr); 753 754 /* first, strip out the terminal nodes */ 755 while (local_nq) { 756 nd = local_nq; 757 local_nq = local_nq->next; 758 switch (nd->dagHdr->status) { 759 case rf_enable: 760 case rf_rollForward: 761 if (nd->numSuccedents == 0) { 762 /* end of the dag, add to 763 * callback list */ 764 nd->next = term_nq; 765 term_nq = nd; 766 } else { 767 /* not the end, add to the 768 * fire queue */ 769 nd->next = fire_nq; 770 fire_nq = nd; 771 } 772 break; 773 case rf_rollBackward: 774 if (nd->numAntecedents == 0) { 775 /* end of the dag, add to the 776 * callback list */ 777 nd->next = term_nq; 778 term_nq = nd; 779 } else { 780 /* not the end, add to the 781 * fire queue */ 782 nd->next = fire_nq; 783 fire_nq = nd; 784 } 785 break; 786 default: 787 RF_PANIC(); 788 break; 789 } 790 } 791 792 /* execute callback of dags which have reached the 793 * terminal node */ 794 while (term_nq) { 795 nd = term_nq; 796 term_nq = term_nq->next; 797 nd->next = NULL; 798 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 799 raidPtr->dags_in_flight--; /* debug only */ 800 } 801 802 /* fire remaining nodes */ 803 FireNodeList(fire_nq); 804 805 DO_LOCK(raidPtr); 806 } 807 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL) 808 DO_WAIT(raidPtr); 809 } 810 DO_UNLOCK(raidPtr); 811 812 RF_THREADGROUP_DONE(&raidPtr->engine_tg); 813 #ifdef __NetBSD__ 814 splx(s); 815 kthread_exit(0); 816 #else 817 splx(s); 818 thread_terminate(thread); 819 thread_halt_self(); 820 #endif 821 } 822