1 /* $NetBSD: rf_engine.c,v 1.12 2001/11/13 07:11:14 lukem Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. * 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include <sys/cdefs.h> 58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.12 2001/11/13 07:11:14 lukem Exp $"); 59 60 #include "rf_threadstuff.h" 61 62 #include <sys/errno.h> 63 64 #include "rf_dag.h" 65 #include "rf_engine.h" 66 #include "rf_etimer.h" 67 #include "rf_general.h" 68 #include "rf_dagutils.h" 69 #include "rf_shutdown.h" 70 #include "rf_raid.h" 71 72 static void DAGExecutionThread(RF_ThreadArg_t arg); 73 74 #define DO_INIT(_l_,_r_) { \ 75 int _rc; \ 76 _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ 77 if (_rc) { \ 78 return(_rc); \ 79 } \ 80 _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ 81 if (_rc) { \ 82 return(_rc); \ 83 } \ 84 } 85 86 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 87 88 /* 89 * XXX Is this spl-ing really necessary? 90 */ 91 #define DO_LOCK(_r_) \ 92 do { \ 93 ks = splbio(); \ 94 RF_LOCK_MUTEX((_r_)->node_queue_mutex); \ 95 } while (0) 96 97 #define DO_UNLOCK(_r_) \ 98 do { \ 99 RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \ 100 splx(ks); \ 101 } while (0) 102 103 #define DO_WAIT(_r_) \ 104 RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex) 105 106 #define DO_SIGNAL(_r_) \ 107 RF_BROADCAST_COND((_r_)->node_queue) /* XXX RF_SIGNAL_COND? */ 108 109 static void rf_ShutdownEngine(void *); 110 111 static void 112 rf_ShutdownEngine(arg) 113 void *arg; 114 { 115 RF_Raid_t *raidPtr; 116 117 raidPtr = (RF_Raid_t *) arg; 118 raidPtr->shutdown_engine = 1; 119 DO_SIGNAL(raidPtr); 120 } 121 122 int 123 rf_ConfigureEngine( 124 RF_ShutdownList_t ** listp, 125 RF_Raid_t * raidPtr, 126 RF_Config_t * cfgPtr) 127 { 128 int rc; 129 130 DO_INIT(listp, raidPtr); 131 132 raidPtr->node_queue = NULL; 133 raidPtr->dags_in_flight = 0; 134 135 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); 136 if (rc) 137 return (rc); 138 139 /* we create the execution thread only once per system boot. no need 140 * to check return code b/c the kernel panics if it can't create the 141 * thread. */ 142 if (rf_engineDebug) { 143 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 144 } 145 if (RF_CREATE_THREAD(raidPtr->engine_thread, DAGExecutionThread, raidPtr,"raid")) { 146 RF_ERRORMSG("RAIDFRAME: Unable to create engine thread\n"); 147 return (ENOMEM); 148 } 149 if (rf_engineDebug) { 150 printf("raid%d: Created engine thread\n", raidPtr->raidid); 151 } 152 RF_THREADGROUP_STARTED(&raidPtr->engine_tg); 153 /* XXX something is missing here... */ 154 #ifdef debug 155 printf("Skipping the WAIT_START!!\n"); 156 #endif 157 #if 0 158 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); 159 #endif 160 /* engine thread is now running and waiting for work */ 161 if (rf_engineDebug) { 162 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 163 } 164 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 165 if (rc) { 166 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d rc=%d\n", __FILE__, 167 __LINE__, rc); 168 rf_ShutdownEngine(NULL); 169 } 170 return (rc); 171 } 172 173 static int 174 BranchDone(RF_DagNode_t * node) 175 { 176 int i; 177 178 /* return true if forward execution is completed for a node and it's 179 * succedents */ 180 switch (node->status) { 181 case rf_wait: 182 /* should never be called in this state */ 183 RF_PANIC(); 184 break; 185 case rf_fired: 186 /* node is currently executing, so we're not done */ 187 return (RF_FALSE); 188 case rf_good: 189 for (i = 0; i < node->numSuccedents; i++) /* for each succedent */ 190 if (!BranchDone(node->succedents[i])) /* recursively check 191 * branch */ 192 return RF_FALSE; 193 return RF_TRUE; /* node and all succedent branches aren't in 194 * fired state */ 195 break; 196 case rf_bad: 197 /* succedents can't fire */ 198 return (RF_TRUE); 199 case rf_recover: 200 /* should never be called in this state */ 201 RF_PANIC(); 202 break; 203 case rf_undone: 204 case rf_panic: 205 /* XXX need to fix this case */ 206 /* for now, assume that we're done */ 207 return (RF_TRUE); 208 break; 209 default: 210 /* illegal node status */ 211 RF_PANIC(); 212 break; 213 } 214 } 215 216 static int 217 NodeReady(RF_DagNode_t * node) 218 { 219 int ready; 220 221 switch (node->dagHdr->status) { 222 case rf_enable: 223 case rf_rollForward: 224 if ((node->status == rf_wait) && (node->numAntecedents == node->numAntDone)) 225 ready = RF_TRUE; 226 else 227 ready = RF_FALSE; 228 break; 229 case rf_rollBackward: 230 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 231 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 232 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 233 if ((node->status == rf_good) && (node->numSuccDone == node->numSuccedents)) 234 ready = RF_TRUE; 235 else 236 ready = RF_FALSE; 237 break; 238 default: 239 printf("Execution engine found illegal DAG status in NodeReady\n"); 240 RF_PANIC(); 241 break; 242 } 243 244 return (ready); 245 } 246 247 248 249 /* user context and dag-exec-thread context: 250 * Fire a node. The node's status field determines which function, do or undo, 251 * to be fired. 252 * This routine assumes that the node's status field has alread been set to 253 * "fired" or "recover" to indicate the direction of execution. 254 */ 255 static void 256 FireNode(RF_DagNode_t * node) 257 { 258 switch (node->status) { 259 case rf_fired: 260 /* fire the do function of a node */ 261 if (rf_engineDebug) { 262 printf("raid%d: Firing node 0x%lx (%s)\n", 263 node->dagHdr->raidPtr->raidid, 264 (unsigned long) node, node->name); 265 } 266 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 267 #if defined(__NetBSD__) && defined(_KERNEL) 268 /* thread_block(); */ 269 /* printf("Need to block the thread here...\n"); */ 270 /* XXX thread_block is actually mentioned in 271 * /usr/include/vm/vm_extern.h */ 272 #else 273 thread_block(); 274 #endif 275 } 276 (*(node->doFunc)) (node); 277 break; 278 case rf_recover: 279 /* fire the undo function of a node */ 280 if (rf_engineDebug) { 281 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 282 node->dagHdr->raidPtr->raidid, 283 (unsigned long) node, node->name); 284 } 285 if (node->flags & RF_DAGNODE_FLAG_YIELD) 286 #if defined(__NetBSD__) && defined(_KERNEL) 287 /* thread_block(); */ 288 /* printf("Need to block the thread here...\n"); */ 289 /* XXX thread_block is actually mentioned in 290 * /usr/include/vm/vm_extern.h */ 291 #else 292 thread_block(); 293 #endif 294 (*(node->undoFunc)) (node); 295 break; 296 default: 297 RF_PANIC(); 298 break; 299 } 300 } 301 302 303 304 /* user context: 305 * Attempt to fire each node in a linear array. 306 * The entire list is fired atomically. 307 */ 308 static void 309 FireNodeArray( 310 int numNodes, 311 RF_DagNode_t ** nodeList) 312 { 313 RF_DagStatus_t dstat; 314 RF_DagNode_t *node; 315 int i, j; 316 317 /* first, mark all nodes which are ready to be fired */ 318 for (i = 0; i < numNodes; i++) { 319 node = nodeList[i]; 320 dstat = node->dagHdr->status; 321 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 322 if (NodeReady(node)) { 323 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 324 RF_ASSERT(node->status == rf_wait); 325 if (node->commitNode) 326 node->dagHdr->numCommits++; 327 node->status = rf_fired; 328 for (j = 0; j < node->numAntecedents; j++) 329 node->antecedents[j]->numSuccFired++; 330 } else { 331 RF_ASSERT(dstat == rf_rollBackward); 332 RF_ASSERT(node->status == rf_good); 333 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 334 * per graph */ 335 node->status = rf_recover; 336 } 337 } 338 } 339 /* now, fire the nodes */ 340 for (i = 0; i < numNodes; i++) { 341 if ((nodeList[i]->status == rf_fired) || (nodeList[i]->status == rf_recover)) 342 FireNode(nodeList[i]); 343 } 344 } 345 346 347 /* user context: 348 * Attempt to fire each node in a linked list. 349 * The entire list is fired atomically. 350 */ 351 static void 352 FireNodeList(RF_DagNode_t * nodeList) 353 { 354 RF_DagNode_t *node, *next; 355 RF_DagStatus_t dstat; 356 int j; 357 358 if (nodeList) { 359 /* first, mark all nodes which are ready to be fired */ 360 for (node = nodeList; node; node = next) { 361 next = node->next; 362 dstat = node->dagHdr->status; 363 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 364 if (NodeReady(node)) { 365 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 366 RF_ASSERT(node->status == rf_wait); 367 if (node->commitNode) 368 node->dagHdr->numCommits++; 369 node->status = rf_fired; 370 for (j = 0; j < node->numAntecedents; j++) 371 node->antecedents[j]->numSuccFired++; 372 } else { 373 RF_ASSERT(dstat == rf_rollBackward); 374 RF_ASSERT(node->status == rf_good); 375 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node 376 * per graph */ 377 node->status = rf_recover; 378 } 379 } 380 } 381 /* now, fire the nodes */ 382 for (node = nodeList; node; node = next) { 383 next = node->next; 384 if ((node->status == rf_fired) || (node->status == rf_recover)) 385 FireNode(node); 386 } 387 } 388 } 389 /* interrupt context: 390 * for each succedent 391 * propagate required results from node to succedent 392 * increment succedent's numAntDone 393 * place newly-enable nodes on node queue for firing 394 * 395 * To save context switches, we don't place NIL nodes on the node queue, 396 * but rather just process them as if they had fired. Note that NIL nodes 397 * that are the direct successors of the header will actually get fired by 398 * DispatchDAG, which is fine because no context switches are involved. 399 * 400 * Important: when running at user level, this can be called by any 401 * disk thread, and so the increment and check of the antecedent count 402 * must be locked. I used the node queue mutex and locked down the 403 * entire function, but this is certainly overkill. 404 */ 405 static void 406 PropagateResults( 407 RF_DagNode_t * node, 408 int context) 409 { 410 RF_DagNode_t *s, *a; 411 RF_Raid_t *raidPtr; 412 int i, ks; 413 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 414 * finished */ 415 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 416 * antecedents */ 417 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 418 RF_DagNode_t *q = NULL, *qh = NULL, *next; 419 int j, skipNode; 420 421 raidPtr = node->dagHdr->raidPtr; 422 423 DO_LOCK(raidPtr); 424 425 /* debug - validate fire counts */ 426 for (i = 0; i < node->numAntecedents; i++) { 427 a = *(node->antecedents + i); 428 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 429 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 430 a->numSuccDone++; 431 } 432 433 switch (node->dagHdr->status) { 434 case rf_enable: 435 case rf_rollForward: 436 for (i = 0; i < node->numSuccedents; i++) { 437 s = *(node->succedents + i); 438 RF_ASSERT(s->status == rf_wait); 439 (s->numAntDone)++; 440 if (s->numAntDone == s->numAntecedents) { 441 /* look for NIL nodes */ 442 if (s->doFunc == rf_NullNodeFunc) { 443 /* don't fire NIL nodes, just process 444 * them */ 445 s->next = finishlist; 446 finishlist = s; 447 } else { 448 /* look to see if the node is to be 449 * skipped */ 450 skipNode = RF_FALSE; 451 for (j = 0; j < s->numAntecedents; j++) 452 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 453 skipNode = RF_TRUE; 454 if (skipNode) { 455 /* this node has one or more 456 * failed true data 457 * dependencies, so skip it */ 458 s->next = skiplist; 459 skiplist = s; 460 } else 461 /* add s to list of nodes (q) 462 * to execute */ 463 if (context != RF_INTR_CONTEXT) { 464 /* we only have to 465 * enqueue if we're at 466 * intr context */ 467 s->next = firelist; /* put node on a list to 468 * be fired after we 469 * unlock */ 470 firelist = s; 471 } else { /* enqueue the node for 472 * the dag exec thread 473 * to fire */ 474 RF_ASSERT(NodeReady(s)); 475 if (q) { 476 q->next = s; 477 q = s; 478 } else { 479 qh = q = s; 480 qh->next = NULL; 481 } 482 } 483 } 484 } 485 } 486 487 if (q) { 488 /* xfer our local list of nodes to the node queue */ 489 q->next = raidPtr->node_queue; 490 raidPtr->node_queue = qh; 491 DO_SIGNAL(raidPtr); 492 } 493 DO_UNLOCK(raidPtr); 494 495 for (; skiplist; skiplist = next) { 496 next = skiplist->next; 497 skiplist->status = rf_skipped; 498 for (i = 0; i < skiplist->numAntecedents; i++) { 499 skiplist->antecedents[i]->numSuccFired++; 500 } 501 if (skiplist->commitNode) { 502 skiplist->dagHdr->numCommits++; 503 } 504 rf_FinishNode(skiplist, context); 505 } 506 for (; finishlist; finishlist = next) { 507 /* NIL nodes: no need to fire them */ 508 next = finishlist->next; 509 finishlist->status = rf_good; 510 for (i = 0; i < finishlist->numAntecedents; i++) { 511 finishlist->antecedents[i]->numSuccFired++; 512 } 513 if (finishlist->commitNode) 514 finishlist->dagHdr->numCommits++; 515 /* 516 * Okay, here we're calling rf_FinishNode() on nodes that 517 * have the null function as their work proc. Such a node 518 * could be the terminal node in a DAG. If so, it will 519 * cause the DAG to complete, which will in turn free 520 * memory used by the DAG, which includes the node in 521 * question. Thus, we must avoid referencing the node 522 * at all after calling rf_FinishNode() on it. 523 */ 524 rf_FinishNode(finishlist, context); /* recursive call */ 525 } 526 /* fire all nodes in firelist */ 527 FireNodeList(firelist); 528 break; 529 530 case rf_rollBackward: 531 for (i = 0; i < node->numAntecedents; i++) { 532 a = *(node->antecedents + i); 533 RF_ASSERT(a->status == rf_good); 534 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 535 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 536 537 if (a->numSuccDone == a->numSuccFired) { 538 if (a->undoFunc == rf_NullNodeFunc) { 539 /* don't fire NIL nodes, just process 540 * them */ 541 a->next = finishlist; 542 finishlist = a; 543 } else { 544 if (context != RF_INTR_CONTEXT) { 545 /* we only have to enqueue if 546 * we're at intr context */ 547 a->next = firelist; /* put node on a list to 548 * be fired after we 549 * unlock */ 550 firelist = a; 551 } else { /* enqueue the node for 552 * the dag exec thread 553 * to fire */ 554 RF_ASSERT(NodeReady(a)); 555 if (q) { 556 q->next = a; 557 q = a; 558 } else { 559 qh = q = a; 560 qh->next = NULL; 561 } 562 } 563 } 564 } 565 } 566 if (q) { 567 /* xfer our local list of nodes to the node queue */ 568 q->next = raidPtr->node_queue; 569 raidPtr->node_queue = qh; 570 DO_SIGNAL(raidPtr); 571 } 572 DO_UNLOCK(raidPtr); 573 for (; finishlist; finishlist = next) { /* NIL nodes: no need to 574 * fire them */ 575 next = finishlist->next; 576 finishlist->status = rf_good; 577 /* 578 * Okay, here we're calling rf_FinishNode() on nodes that 579 * have the null function as their work proc. Such a node 580 * could be the first node in a DAG. If so, it will 581 * cause the DAG to complete, which will in turn free 582 * memory used by the DAG, which includes the node in 583 * question. Thus, we must avoid referencing the node 584 * at all after calling rf_FinishNode() on it. 585 */ 586 rf_FinishNode(finishlist, context); /* recursive call */ 587 } 588 /* fire all nodes in firelist */ 589 FireNodeList(firelist); 590 591 break; 592 default: 593 printf("Engine found illegal DAG status in PropagateResults()\n"); 594 RF_PANIC(); 595 break; 596 } 597 } 598 599 600 601 /* 602 * Process a fired node which has completed 603 */ 604 static void 605 ProcessNode( 606 RF_DagNode_t * node, 607 int context) 608 { 609 RF_Raid_t *raidPtr; 610 611 raidPtr = node->dagHdr->raidPtr; 612 613 switch (node->status) { 614 case rf_good: 615 /* normal case, don't need to do anything */ 616 break; 617 case rf_bad: 618 if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { 619 node->dagHdr->status = rf_rollForward; /* crossed commit 620 * barrier */ 621 if (rf_engineDebug || 1) { 622 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 623 } 624 } else { 625 node->dagHdr->status = rf_rollBackward; /* never reached commit 626 * barrier */ 627 if (rf_engineDebug || 1) { 628 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 629 } 630 } 631 break; 632 case rf_undone: 633 /* normal rollBackward case, don't need to do anything */ 634 break; 635 case rf_panic: 636 /* an undo node failed!!! */ 637 printf("UNDO of a node failed!!!/n"); 638 break; 639 default: 640 printf("node finished execution with an illegal status!!!\n"); 641 RF_PANIC(); 642 break; 643 } 644 645 /* enqueue node's succedents (antecedents if rollBackward) for 646 * execution */ 647 PropagateResults(node, context); 648 } 649 650 651 652 /* user context or dag-exec-thread context: 653 * This is the first step in post-processing a newly-completed node. 654 * This routine is called by each node execution function to mark the node 655 * as complete and fire off any successors that have been enabled. 656 */ 657 int 658 rf_FinishNode( 659 RF_DagNode_t * node, 660 int context) 661 { 662 /* as far as I can tell, retcode is not used -wvcii */ 663 int retcode = RF_FALSE; 664 node->dagHdr->numNodesCompleted++; 665 ProcessNode(node, context); 666 667 return (retcode); 668 } 669 670 671 /* user context: 672 * submit dag for execution, return non-zero if we have to wait for completion. 673 * if and only if we return non-zero, we'll cause cbFunc to get invoked with 674 * cbArg when the DAG has completed. 675 * 676 * for now we always return 1. If the DAG does not cause any I/O, then the callback 677 * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess 678 * to handle this. 679 * 680 * All we do here is fire the direct successors of the header node. The 681 * DAG execution thread does the rest of the dag processing. 682 */ 683 int 684 rf_DispatchDAG( 685 RF_DagHeader_t * dag, 686 void (*cbFunc) (void *), 687 void *cbArg) 688 { 689 RF_Raid_t *raidPtr; 690 691 raidPtr = dag->raidPtr; 692 if (dag->tracerec) { 693 RF_ETIMER_START(dag->tracerec->timer); 694 } 695 if (rf_engineDebug || rf_validateDAGDebug) { 696 if (rf_ValidateDAG(dag)) 697 RF_PANIC(); 698 } 699 if (rf_engineDebug) { 700 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 701 } 702 raidPtr->dags_in_flight++; /* debug only: blow off proper 703 * locking */ 704 dag->cbFunc = cbFunc; 705 dag->cbArg = cbArg; 706 dag->numNodesCompleted = 0; 707 dag->status = rf_enable; 708 FireNodeArray(dag->numSuccedents, dag->succedents); 709 return (1); 710 } 711 /* dedicated kernel thread: 712 * the thread that handles all DAG node firing. 713 * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the 714 * node queue to NULL before doing any firing of nodes. This way we only have to release the 715 * lock once. Of course, it's probably rare that there's more than one node in the queue at 716 * any one time, but it sometimes happens. 717 * 718 * In the kernel, this thread runs at spl0 and is not swappable. I copied these 719 * characteristics from the aio_completion_thread. 720 */ 721 722 static void 723 DAGExecutionThread(RF_ThreadArg_t arg) 724 { 725 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 726 RF_Raid_t *raidPtr; 727 int ks; 728 int s; 729 730 raidPtr = (RF_Raid_t *) arg; 731 732 if (rf_engineDebug) { 733 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 734 } 735 736 s = splbio(); 737 738 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); 739 740 DO_LOCK(raidPtr); 741 while (!raidPtr->shutdown_engine) { 742 743 while (raidPtr->node_queue != NULL) { 744 local_nq = raidPtr->node_queue; 745 fire_nq = NULL; 746 term_nq = NULL; 747 raidPtr->node_queue = NULL; 748 DO_UNLOCK(raidPtr); 749 750 /* first, strip out the terminal nodes */ 751 while (local_nq) { 752 nd = local_nq; 753 local_nq = local_nq->next; 754 switch (nd->dagHdr->status) { 755 case rf_enable: 756 case rf_rollForward: 757 if (nd->numSuccedents == 0) { 758 /* end of the dag, add to 759 * callback list */ 760 nd->next = term_nq; 761 term_nq = nd; 762 } else { 763 /* not the end, add to the 764 * fire queue */ 765 nd->next = fire_nq; 766 fire_nq = nd; 767 } 768 break; 769 case rf_rollBackward: 770 if (nd->numAntecedents == 0) { 771 /* end of the dag, add to the 772 * callback list */ 773 nd->next = term_nq; 774 term_nq = nd; 775 } else { 776 /* not the end, add to the 777 * fire queue */ 778 nd->next = fire_nq; 779 fire_nq = nd; 780 } 781 break; 782 default: 783 RF_PANIC(); 784 break; 785 } 786 } 787 788 /* execute callback of dags which have reached the 789 * terminal node */ 790 while (term_nq) { 791 nd = term_nq; 792 term_nq = term_nq->next; 793 nd->next = NULL; 794 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 795 raidPtr->dags_in_flight--; /* debug only */ 796 } 797 798 /* fire remaining nodes */ 799 FireNodeList(fire_nq); 800 801 DO_LOCK(raidPtr); 802 } 803 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL) 804 DO_WAIT(raidPtr); 805 } 806 DO_UNLOCK(raidPtr); 807 808 RF_THREADGROUP_DONE(&raidPtr->engine_tg); 809 810 splx(s); 811 kthread_exit(0); 812 } 813