1 /* $NetBSD: rf_engine.c,v 1.2 1998/11/13 11:48:26 simonb Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. 
* 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 /* 58 * : 59 * 60 * Log: rf_engine.c,v 61 * Revision 1.56 1996/07/28 20:31:39 jimz 62 * i386netbsd port 63 * true/false fixup 64 * 65 * Revision 1.55 1996/07/22 19:52:16 jimz 66 * switched node params to RF_DagParam_t, a union of 67 * a 64-bit int and a void *, for better portability 68 * attempted hpux port, but failed partway through for 69 * lack of a single C compiler capable of compiling all 70 * source files 71 * 72 * Revision 1.54 1996/07/17 21:00:58 jimz 73 * clean up timer interface, tracing 74 * 75 * Revision 1.53 1996/07/15 17:22:18 jimz 76 * nit-pick code cleanup 77 * resolve stdlib problems on DEC OSF 78 * 79 * Revision 1.52 1996/06/17 03:17:08 jimz 80 * correctly shut down engine thread in kernel 81 * 82 * Revision 1.51 1996/06/14 15:02:10 jimz 83 * make new engine code happy in simulator 84 * 85 * Revision 1.50 1996/06/14 14:19:48 jimz 86 * use diskgroup to control engine thread, make all engine-thread-related 87 * stuff per-array 88 * 89 * Revision 1.49 1996/06/10 11:55:47 jimz 90 * Straightened out some per-array/not-per-array distinctions, fixed 91 * a couple bugs related to confusion. Added shutdown lists. Removed 92 * layout shutdown function (now subsumed by shutdown lists). 
93 * 94 * Revision 1.48 1996/06/09 02:36:46 jimz 95 * lots of little crufty cleanup- fixup whitespace 96 * issues, comment #ifdefs, improve typing in some 97 * places (esp size-related) 98 * 99 * Revision 1.47 1996/06/06 01:23:23 jimz 100 * fix bug in node traversal when firing multiple nodes simultaneously 101 * 102 * Revision 1.46 1996/06/05 18:06:02 jimz 103 * Major code cleanup. The Great Renaming is now done. 104 * Better modularity. Better typing. Fixed a bunch of 105 * synchronization bugs. Made a lot of global stuff 106 * per-desc or per-array. Removed dead code. 107 * 108 * Revision 1.45 1996/05/30 12:59:18 jimz 109 * make etimer happier, more portable 110 * 111 * Revision 1.44 1996/05/30 11:29:41 jimz 112 * Numerous bug fixes. Stripe lock release code disagreed with the taking code 113 * about when stripes should be locked (I made it consistent: no parity, no lock) 114 * There was a lot of extra serialization of I/Os which I've removed- a lot of 115 * it was to calculate values for the cache code, which is no longer with us. 116 * More types, function, macro cleanup. Added code to properly quiesce the array 117 * on shutdown. Made a lot of stuff array-specific which was (bogusly) general 118 * before. Fixed memory allocation, freeing bugs. 
119 * 120 * Revision 1.43 1996/05/27 18:56:37 jimz 121 * more code cleanup 122 * better typing 123 * compiles in all 3 environments 124 * 125 * Revision 1.42 1996/05/24 22:17:04 jimz 126 * continue code + namespace cleanup 127 * typed a bunch of flags 128 * 129 * Revision 1.41 1996/05/24 04:28:55 jimz 130 * release cleanup ckpt 131 * 132 * Revision 1.40 1996/05/23 00:33:23 jimz 133 * code cleanup: move all debug decls to rf_options.c, all extern 134 * debug decls to rf_options.h, all debug vars preceded by rf_ 135 * 136 * Revision 1.39 1996/05/20 16:15:17 jimz 137 * switch to rf_{mutex,cond}_{init,destroy} 138 * 139 * Revision 1.38 1996/05/18 20:09:54 jimz 140 * bit of cleanup to compile cleanly in kernel, once again 141 * 142 * Revision 1.37 1996/05/18 19:51:34 jimz 143 * major code cleanup- fix syntax, make some types consistent, 144 * add prototypes, clean out dead code, et cetera 145 * 146 * Revision 1.36 1996/05/15 20:24:19 wvcii 147 * fixed syntax bug in SIMULATE clause above ProcessNode 148 * 149 * Revision 1.35 1996/05/08 21:01:24 jimz 150 * fixed up enum type names that were conflicting with other 151 * enums and function names (ie, "panic") 152 * future naming trends will be towards RF_ and rf_ for 153 * everything raidframe-related 154 * 155 * Revision 1.34 1996/05/08 15:25:28 wvcii 156 * eliminated dead code 157 * merged common cases (sim/user/kernel) 158 * entire node lists (arrays) now fired atomically 159 * reordered source code for readability 160 * beefed-up & corrected comments 161 * 162 * Revision 1.33 1996/05/07 19:39:40 jimz 163 * 1. fixed problems in PropogateResults() with nodes being referenced 164 * after they were no longer valid 165 * 2. 
fixed problems in PropogateResults() with the node list being 166 * incorrectly threaded 167 * 168 * Revision 1.32 1996/05/07 19:03:56 wvcii 169 * in PropagateResults, fixed a bug in the rollBackward case: 170 * node data is copied before the call to FinishNode which 171 * frees the node and destroys its data. 172 * 173 * Revision 1.31 1996/05/07 17:45:17 jimz 174 * remove old #if 0 code from PropogateResults() (was kept in 175 * previous version for archival purposes (rcsdiff)) 176 * 177 * Revision 1.30 1996/05/07 17:44:19 jimz 178 * fix threading of nodes to be fired in PropagateResults() 179 * fix iteration through skiplist in PropagateResults() 180 * fix incorrect accesses to freed memory (dereferencing a 181 * node that was freed by the action of calling FinishNode() 182 * on it, which in turn completed its DAG) in PropagateResults() 183 * 184 * Revision 1.29 1996/05/02 15:04:15 wvcii 185 * fixed bad array index in PropagateResults 186 * 187 * Revision 1.28 1995/12/12 18:10:06 jimz 188 * MIN -> RF_MIN, MAX -> RF_MAX, ASSERT -> RF_ASSERT 189 * fix 80-column brain damage in comments 190 * 191 * Revision 1.27 1995/12/08 15:07:03 arw 192 * cache code cleanup 193 * 194 * Revision 1.26 1995/11/07 16:18:01 wvcii 195 * numerous changes associated with roll-away error recovery 196 * when a node fails, dag enters rollForward or rollBackward state 197 * 198 * Revision 1.25 1995/09/06 19:27:17 wvcii 199 * added debug vars enableRollAway and debugRecovery 200 * 201 */ 202 203 #ifdef _KERNEL 204 #define KERNEL 205 #endif 206 207 #include "rf_threadstuff.h" 208 209 #ifndef KERNEL 210 #include <stdio.h> 211 #include <stdlib.h> 212 #endif /* !KERNEL */ 213 214 #include <sys/errno.h> 215 216 #include "rf_dag.h" 217 #include "rf_engine.h" 218 #include "rf_threadid.h" 219 #include "rf_etimer.h" 220 #include "rf_general.h" 221 #include "rf_dagutils.h" 222 #include "rf_shutdown.h" 223 #include "rf_raid.h" 224 225 #ifndef SIMULATE 226 static void DAGExecutionThread(RF_ThreadArg_t 
arg); 227 #endif /* !SIMULATE */ 228 229 #define DO_INIT(_l_,_r_) { \ 230 int _rc; \ 231 _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ 232 if (_rc) { \ 233 return(_rc); \ 234 } \ 235 _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ 236 if (_rc) { \ 237 return(_rc); \ 238 } \ 239 } 240 241 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 242 #ifndef KERNEL 243 244 #define DO_LOCK(_r_) RF_LOCK_MUTEX((_r_)->node_queue_mutex) 245 #define DO_UNLOCK(_r_) RF_UNLOCK_MUTEX((_r_)->node_queue_mutex) 246 #define DO_WAIT(_r_) RF_WAIT_COND((_r_)->node_queue_cond, (_r_)->node_queue_mutex) 247 #define DO_SIGNAL(_r_) RF_SIGNAL_COND((_r_)->node_queue_cond) 248 249 #else /* !KERNEL */ 250 251 /* 252 * XXX Is this spl-ing really necessary? 253 */ 254 #define DO_LOCK(_r_) { ks = splbio(); RF_LOCK_MUTEX((_r_)->node_queue_mutex); } 255 #define DO_UNLOCK(_r_) { RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); splx(ks); } 256 #ifndef __NetBSD__ 257 #define DO_WAIT(_r_) mpsleep(&(_r_)->node_queue, PZERO, "raidframe nq", 0, (void *) simple_lock_addr((_r_)->node_queue_mutex), MS_LOCK_SIMPLE) 258 #else 259 #define DO_WAIT(_r_) tsleep(&(_r_)->node_queue, PRIBIO | PCATCH, "raidframe nq",0) 260 #endif 261 #define DO_SIGNAL(_r_) wakeup(&(_r_)->node_queue) 262 263 #endif /* !KERNEL */ 264 265 static void rf_ShutdownEngine(void *); 266 267 static void rf_ShutdownEngine(arg) 268 void *arg; 269 { 270 RF_Raid_t *raidPtr; 271 272 raidPtr = (RF_Raid_t *)arg; 273 #ifndef SIMULATE 274 raidPtr->shutdown_engine = 1; 275 DO_SIGNAL(raidPtr); 276 /* XXX something is missing here... 
*/ 277 #ifdef DEBUG 278 printf("IGNORING WAIT_STOP\n"); 279 #endif 280 #if 0 281 RF_THREADGROUP_WAIT_STOP(&raidPtr->engine_tg); 282 #endif 283 #endif /* !SIMULATE */ 284 } 285 286 int rf_ConfigureEngine( 287 RF_ShutdownList_t **listp, 288 RF_Raid_t *raidPtr, 289 RF_Config_t *cfgPtr) 290 { 291 int rc, tid=0; 292 293 if (rf_engineDebug) { 294 rf_get_threadid(tid); 295 } 296 297 DO_INIT(listp,raidPtr); 298 299 raidPtr->node_queue = NULL; 300 raidPtr->dags_in_flight = 0; 301 302 #ifndef SIMULATE 303 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); 304 if (rc) 305 return(rc); 306 307 /* we create the execution thread only once per system boot. 308 * no need to check return code b/c the kernel panics if it can't create the thread. 309 */ 310 if (rf_engineDebug) { 311 printf("[%d] Creating engine thread\n", tid); 312 } 313 314 if (RF_CREATE_THREAD(raidPtr->engine_thread, DAGExecutionThread, raidPtr)) { 315 RF_ERRORMSG("RAIDFRAME: Unable to create engine thread\n"); 316 return(ENOMEM); 317 } 318 if (rf_engineDebug) { 319 printf("[%d] Created engine thread\n", tid); 320 } 321 RF_THREADGROUP_STARTED(&raidPtr->engine_tg); 322 /* XXX something is missing here... 
*/ 323 #ifdef debug 324 printf("Skipping the WAIT_START!!\n"); 325 #endif 326 #if 0 327 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); 328 #endif 329 /* engine thread is now running and waiting for work */ 330 if (rf_engineDebug) { 331 printf("[%d] Engine thread running and waiting for events\n", tid); 332 } 333 #endif /* !SIMULATE */ 334 335 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 336 if (rc) { 337 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d rc=%d\n", __FILE__, 338 __LINE__, rc); 339 rf_ShutdownEngine(NULL); 340 } 341 342 return(rc); 343 } 344 345 static int BranchDone(RF_DagNode_t *node) 346 { 347 int i; 348 349 /* return true if forward execution is completed for a node and it's succedents */ 350 switch (node->status) { 351 case rf_wait : 352 /* should never be called in this state */ 353 RF_PANIC(); 354 break; 355 case rf_fired : 356 /* node is currently executing, so we're not done */ 357 return(RF_FALSE); 358 case rf_good : 359 for (i = 0; i < node->numSuccedents; i++) /* for each succedent */ 360 if (!BranchDone(node->succedents[i])) /* recursively check branch */ 361 return RF_FALSE; 362 return RF_TRUE; /* node and all succedent branches aren't in fired state */ 363 break; 364 case rf_bad : 365 /* succedents can't fire */ 366 return(RF_TRUE); 367 case rf_recover : 368 /* should never be called in this state */ 369 RF_PANIC(); 370 break; 371 case rf_undone : 372 case rf_panic : 373 /* XXX need to fix this case */ 374 /* for now, assume that we're done */ 375 return(RF_TRUE); 376 break; 377 default : 378 /* illegal node status */ 379 RF_PANIC(); 380 break; 381 } 382 } 383 384 #ifdef SIMULATE 385 /* this is only ifdef SIMULATE because nothing else needs it */ 386 /* recursively determine if a DAG has completed execution */ 387 static int DAGDone(RF_DagHeader_t *dag) 388 { 389 int i; 390 391 for (i = 0; i < dag->numSuccedents; i++) 392 if (!BranchDone(dag->succedents[i])) 393 return RF_FALSE; 394 return RF_TRUE; 395 } 396 
#endif /* SIMULATE */ 397 398 static int NodeReady(RF_DagNode_t *node) 399 { 400 int ready; 401 402 switch (node->dagHdr->status) { 403 case rf_enable : 404 case rf_rollForward : 405 if ((node->status == rf_wait) && (node->numAntecedents == node->numAntDone)) 406 ready = RF_TRUE; 407 else 408 ready = RF_FALSE; 409 break; 410 case rf_rollBackward : 411 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 412 RF_ASSERT(node->numSuccFired <= node->numSuccedents); 413 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 414 if ((node->status == rf_good) && (node->numSuccDone == node->numSuccedents)) 415 ready = RF_TRUE; 416 else 417 ready = RF_FALSE; 418 break; 419 default : 420 printf("Execution engine found illegal DAG status in NodeReady\n"); 421 RF_PANIC(); 422 break; 423 } 424 425 return(ready); 426 } 427 428 429 430 /* user context and dag-exec-thread context: 431 * Fire a node. The node's status field determines which function, do or undo, 432 * to be fired. 433 * This routine assumes that the node's status field has alread been set to 434 * "fired" or "recover" to indicate the direction of execution. 
435 */ 436 static void FireNode(RF_DagNode_t *node) 437 { 438 int tid; 439 440 switch (node->status) { 441 case rf_fired : 442 /* fire the do function of a node */ 443 if (rf_engineDebug) { 444 rf_get_threadid(tid); 445 printf("[%d] Firing node 0x%lx (%s)\n",tid,(unsigned long) node, node->name); 446 } 447 #ifdef KERNEL 448 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 449 #if defined(__NetBSD__) && defined(_KERNEL) 450 /* thread_block(); */ 451 /* printf("Need to block the thread here...\n"); */ 452 /* XXX thread_block is actually mentioned in 453 /usr/include/vm/vm_extern.h */ 454 #else 455 thread_block(); 456 #endif 457 } 458 #endif /* KERNEL */ 459 (*(node->doFunc)) (node); 460 break; 461 case rf_recover : 462 /* fire the undo function of a node */ 463 if (rf_engineDebug || 1) { 464 rf_get_threadid(tid); 465 printf("[%d] Firing (undo) node 0x%lx (%s)\n",tid,(unsigned long) node, node->name); 466 } 467 #ifdef KERNEL 468 if (node->flags & RF_DAGNODE_FLAG_YIELD) 469 #if defined(__NetBSD__) && defined(_KERNEL) 470 /* thread_block(); */ 471 /* printf("Need to block the thread here...\n"); */ 472 /* XXX thread_block is actually mentioned in 473 /usr/include/vm/vm_extern.h */ 474 #else 475 thread_block(); 476 #endif 477 #endif /* KERNEL */ 478 (*(node->undoFunc)) (node); 479 break; 480 default : 481 RF_PANIC(); 482 break; 483 } 484 } 485 486 487 488 /* user context: 489 * Attempt to fire each node in a linear array. 490 * The entire list is fired atomically. 
491 */ 492 static void FireNodeArray( 493 int numNodes, 494 RF_DagNode_t **nodeList) 495 { 496 RF_DagStatus_t dstat; 497 RF_DagNode_t *node; 498 int i, j; 499 500 /* first, mark all nodes which are ready to be fired */ 501 for (i = 0; i < numNodes; i++) { 502 node = nodeList[i]; 503 dstat = node->dagHdr->status; 504 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 505 if (NodeReady(node)) { 506 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 507 RF_ASSERT(node->status == rf_wait); 508 if (node->commitNode) 509 node->dagHdr->numCommits++; 510 node->status = rf_fired; 511 for (j = 0; j < node->numAntecedents; j++) 512 node->antecedents[j]->numSuccFired++; 513 } 514 else { 515 RF_ASSERT(dstat == rf_rollBackward); 516 RF_ASSERT(node->status == rf_good); 517 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node per graph */ 518 node->status = rf_recover; 519 } 520 } 521 } 522 /* now, fire the nodes */ 523 for (i = 0; i < numNodes; i++) { 524 if ((nodeList[i]->status == rf_fired) || (nodeList[i]->status == rf_recover)) 525 FireNode(nodeList[i]); 526 } 527 } 528 529 530 #ifndef SIMULATE 531 /* user context: 532 * Attempt to fire each node in a linked list. 533 * The entire list is fired atomically. 
534 */ 535 static void FireNodeList(RF_DagNode_t *nodeList) 536 { 537 RF_DagNode_t *node, *next; 538 RF_DagStatus_t dstat; 539 int j; 540 541 if (nodeList) { 542 /* first, mark all nodes which are ready to be fired */ 543 for (node = nodeList; node; node = next) { 544 next = node->next; 545 dstat = node->dagHdr->status; 546 RF_ASSERT((node->status == rf_wait) || (node->status == rf_good)); 547 if (NodeReady(node)) { 548 if ((dstat == rf_enable) || (dstat == rf_rollForward)) { 549 RF_ASSERT(node->status == rf_wait); 550 if (node->commitNode) 551 node->dagHdr->numCommits++; 552 node->status = rf_fired; 553 for (j = 0; j < node->numAntecedents; j++) 554 node->antecedents[j]->numSuccFired++; 555 } 556 else { 557 RF_ASSERT(dstat == rf_rollBackward); 558 RF_ASSERT(node->status == rf_good); 559 RF_ASSERT(node->commitNode == RF_FALSE); /* only one commit node per graph */ 560 node->status = rf_recover; 561 } 562 } 563 } 564 /* now, fire the nodes */ 565 for (node = nodeList; node; node = next) { 566 next = node->next; 567 if ((node->status == rf_fired) || (node->status == rf_recover)) 568 FireNode(node); 569 } 570 } 571 } 572 #endif /* !SIMULATE */ 573 574 575 576 /* interrupt context: 577 * for each succedent 578 * propagate required results from node to succedent 579 * increment succedent's numAntDone 580 * place newly-enable nodes on node queue for firing 581 * 582 * To save context switches, we don't place NIL nodes on the node queue, 583 * but rather just process them as if they had fired. Note that NIL nodes 584 * that are the direct successors of the header will actually get fired by 585 * DispatchDAG, which is fine because no context switches are involved. 586 * 587 * Important: when running at user level, this can be called by any 588 * disk thread, and so the increment and check of the antecedent count 589 * must be locked. I used the node queue mutex and locked down the 590 * entire function, but this is certainly overkill. 
591 */ 592 static void PropagateResults( 593 RF_DagNode_t *node, 594 int context) 595 { 596 RF_DagNode_t *s, *a; 597 RF_Raid_t *raidPtr; 598 int tid, i, ks; 599 #ifdef SIMULATE 600 RF_PropHeader_t *p; /* prop list for succedent i */ 601 #else /* SIMULATE */ 602 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be finished */ 603 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata antecedents */ 604 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 605 RF_DagNode_t *q = NULL, *qh = NULL, *next; 606 int j, skipNode; 607 #endif /* SIMULATE */ 608 609 rf_get_threadid(tid); 610 611 raidPtr = node->dagHdr->raidPtr; 612 613 DO_LOCK(raidPtr); 614 615 /* debug - validate fire counts */ 616 for (i = 0; i < node->numAntecedents; i++) { 617 a = *(node->antecedents + i); 618 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 619 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 620 a->numSuccDone++; 621 } 622 623 switch (node->dagHdr->status) { 624 case rf_enable : 625 case rf_rollForward : 626 #ifdef SIMULATE 627 /* currently we never propagate results unless in simulation */ 628 for (i = 0; i < node->numSuccedents; i++) { 629 s = *(node->succedents + i); 630 RF_ASSERT(s->status == rf_wait); 631 (s->numAntDone)++; 632 if (node->propList == NULL) 633 /* null propList implies no results to be propagated */ 634 p = NULL; 635 else 636 /* p=head of prop list for succedent i */ 637 p = *(node->propList + i); 638 while (p != NULL) { 639 /* bind node results to succedent's parameters */ 640 #if 0 641 *(s->params + p->paramNum) = *(node->results + p->resultNum); 642 #else 643 s->params[p->paramNum].p = node->results[p->resultNum]; 644 #endif 645 p = p->next; 646 } 647 } 648 #else /* SIMULATE */ 649 for (i = 0; i < node->numSuccedents; i++) { 650 s = *(node->succedents + i); 651 RF_ASSERT(s->status == rf_wait); 652 (s->numAntDone)++; 653 if (s->numAntDone == s->numAntecedents) { 654 /* look for NIL nodes */ 655 if (s->doFunc == rf_NullNodeFunc) { 656 /* 
don't fire NIL nodes, just process them */ 657 s->next = finishlist; 658 finishlist = s; 659 } 660 else { 661 /* look to see if the node is to be skipped */ 662 skipNode = RF_FALSE; 663 for (j = 0; j < s->numAntecedents; j++) 664 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 665 skipNode = RF_TRUE; 666 if (skipNode) { 667 /* this node has one or more failed true data dependencies, so skip it */ 668 s->next = skiplist; 669 skiplist = s; 670 } 671 else 672 /* add s to list of nodes (q) to execute */ 673 if (context != RF_INTR_CONTEXT) { 674 /* we only have to enqueue if we're at intr context */ 675 s->next = firelist; /* put node on a list to be fired after we unlock */ 676 firelist = s; 677 } else { /* enqueue the node for the dag exec thread to fire */ 678 RF_ASSERT(NodeReady(s)); 679 if (q) { 680 q->next = s; 681 q = s; 682 } 683 else { 684 qh = q = s; 685 qh->next = NULL; 686 } 687 } 688 } 689 } 690 } 691 692 if (q) { 693 /* xfer our local list of nodes to the node queue */ 694 q->next = raidPtr->node_queue; raidPtr->node_queue = qh; 695 DO_SIGNAL(raidPtr); 696 } 697 DO_UNLOCK(raidPtr); 698 699 for (; skiplist; skiplist = next) { 700 next = skiplist->next; 701 skiplist->status = rf_skipped; 702 for (i = 0; i < skiplist->numAntecedents; i++) { 703 skiplist->antecedents[i]->numSuccFired++; 704 } 705 if (skiplist->commitNode) { 706 skiplist->dagHdr->numCommits++; 707 } 708 rf_FinishNode(skiplist, context); 709 } 710 for (; finishlist; finishlist = next) { 711 /* NIL nodes: no need to fire them */ 712 next = finishlist->next; 713 finishlist->status = rf_good; 714 for (i = 0; i < finishlist->numAntecedents; i++) { 715 finishlist->antecedents[i]->numSuccFired++; 716 } 717 if (finishlist->commitNode) 718 finishlist->dagHdr->numCommits++; 719 /* 720 * Okay, here we're calling rf_FinishNode() on nodes that 721 * have the null function as their work proc. Such a node 722 * could be the terminal node in a DAG. 
If so, it will 723 * cause the DAG to complete, which will in turn free 724 * memory used by the DAG, which includes the node in 725 * question. Thus, we must avoid referencing the node 726 * at all after calling rf_FinishNode() on it. 727 */ 728 rf_FinishNode(finishlist, context); /* recursive call */ 729 } 730 /* fire all nodes in firelist */ 731 FireNodeList(firelist); 732 #endif /* SIMULATE */ 733 break; 734 735 case rf_rollBackward : 736 #ifdef SIMULATE 737 #else /* SIMULATE */ 738 for (i = 0; i < node->numAntecedents; i++) { 739 a = *(node->antecedents + i); 740 RF_ASSERT(a->status == rf_good); 741 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 742 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 743 744 if (a->numSuccDone == a->numSuccFired) { 745 if (a->undoFunc == rf_NullNodeFunc) { 746 /* don't fire NIL nodes, just process them */ 747 a->next = finishlist; 748 finishlist = a; 749 } else { 750 if (context != RF_INTR_CONTEXT) { 751 /* we only have to enqueue if we're at intr context */ 752 a->next = firelist; /* put node on a list to be fired after we unlock */ 753 firelist = a; 754 } else { /* enqueue the node for the dag exec thread to fire */ 755 RF_ASSERT(NodeReady(a)); 756 if (q) { 757 q->next = a; 758 q = a; 759 } 760 else { 761 qh = q = a; 762 qh->next = NULL; 763 } 764 } 765 } 766 } 767 768 } 769 if (q) { 770 /* xfer our local list of nodes to the node queue */ 771 q->next = raidPtr->node_queue; raidPtr->node_queue = qh; 772 DO_SIGNAL(raidPtr); 773 } 774 DO_UNLOCK(raidPtr); 775 for (; finishlist; finishlist = next) { /* NIL nodes: no need to fire them */ 776 next = finishlist->next; 777 finishlist->status = rf_good; 778 /* 779 * Okay, here we're calling rf_FinishNode() on nodes that 780 * have the null function as their work proc. Such a node 781 * could be the first node in a DAG. If so, it will 782 * cause the DAG to complete, which will in turn free 783 * memory used by the DAG, which includes the node in 784 * question. 
Thus, we must avoid referencing the node 785 * at all after calling rf_FinishNode() on it. 786 */ 787 rf_FinishNode(finishlist, context); /* recursive call */ 788 } 789 /* fire all nodes in firelist */ 790 FireNodeList(firelist); 791 #endif /* SIMULATE */ 792 793 break; 794 default : 795 printf("Engine found illegal DAG status in PropagateResults()\n"); 796 RF_PANIC(); 797 break; 798 } 799 } 800 801 802 803 /* 804 * Process a fired node which has completed 805 */ 806 static void ProcessNode( 807 RF_DagNode_t *node, 808 int context) 809 { 810 RF_Raid_t *raidPtr; 811 int tid; 812 813 raidPtr = node->dagHdr->raidPtr; 814 815 switch (node->status) { 816 case rf_good : 817 /* normal case, don't need to do anything */ 818 break; 819 case rf_bad : 820 if ((node->dagHdr->numCommits > 0) || (node->dagHdr->numCommitNodes == 0)) { 821 node->dagHdr->status = rf_rollForward; /* crossed commit barrier */ 822 if (rf_engineDebug || 1) { 823 rf_get_threadid(tid); 824 printf("[%d] node (%s) returned fail, rolling forward\n", tid, node->name); 825 } 826 } 827 else { 828 node->dagHdr->status = rf_rollBackward; /* never reached commit barrier */ 829 if (rf_engineDebug || 1) { 830 rf_get_threadid(tid); 831 printf("[%d] node (%s) returned fail, rolling backward\n", tid, node->name); 832 } 833 } 834 break; 835 case rf_undone : 836 /* normal rollBackward case, don't need to do anything */ 837 break; 838 case rf_panic : 839 /* an undo node failed!!! */ 840 printf("UNDO of a node failed!!!/n"); 841 break; 842 default : 843 printf("node finished execution with an illegal status!!!\n"); 844 RF_PANIC(); 845 break; 846 } 847 848 #ifdef SIMULATE 849 /* simulator fires nodes here. 850 * user/kernel rely upon PropagateResults to do this. 851 * XXX seems like this code should be merged so that the same thing happens for 852 * both sim, user, and kernel. 
-wvcii 853 */ 854 switch (node->dagHdr->status) { 855 case rf_enable : 856 case rf_rollForward : 857 if (node->numSuccedents == 0) { 858 /* process terminal node */ 859 if (rf_engineDebug) if (!DAGDone(node->dagHdr)) { 860 rf_get_threadid(tid); 861 printf("[%d] ProcessNode: !!!done but dag still in flight\n",tid); 862 RF_PANIC(); 863 } 864 if (rf_engineDebug) printf("[%d] ProcessNode: !!!done will return true\n",tid); 865 /* Mark dag as done */ 866 (node->dagHdr)->done=RF_TRUE; 867 raidPtr->dags_in_flight--; 868 } 869 else { 870 PropagateResults(node, context); 871 FireNodeArray(node->numSuccedents, node->succedents); 872 } 873 break; 874 case rf_rollBackward : 875 if (node->numAntecedents == 0) { 876 /* reached head of dag, we're done */ 877 if (rf_engineDebug) if (!DAGDone(node->dagHdr)) { 878 rf_get_threadid(tid); 879 printf("[%d] ProcessNode: !!!done but dag still in flight\n",tid); 880 RF_PANIC(); 881 } 882 if (rf_engineDebug) printf("[%d] ProcessNode: !!!done will return true\n",tid); 883 /* Mark dag as done */ 884 (node->dagHdr)->done=RF_TRUE; 885 raidPtr->dags_in_flight--; 886 } 887 else { 888 PropagateResults(node, context); 889 FireNodeArray(node->numAntecedents, node->antecedents); 890 } 891 break; 892 default : 893 RF_PANIC(); 894 break; 895 } 896 897 898 #else /* SIMULATE */ 899 /* enqueue node's succedents (antecedents if rollBackward) for execution */ 900 PropagateResults(node, context); 901 #endif /* SIMULATE */ 902 } 903 904 905 906 /* user context or dag-exec-thread context: 907 * This is the first step in post-processing a newly-completed node. 908 * This routine is called by each node execution function to mark the node 909 * as complete and fire off any successors that have been enabled. 
910 */ 911 int rf_FinishNode( 912 RF_DagNode_t *node, 913 int context) 914 { 915 /* as far as I can tell, retcode is not used -wvcii */ 916 int retcode = RF_FALSE; 917 node->dagHdr->numNodesCompleted++; 918 ProcessNode(node, context); 919 920 #ifdef SIMULATE 921 if ((node->dagHdr)->done == RF_TRUE) 922 retcode = RF_TRUE; 923 #endif /* SIMULATE */ 924 925 return(retcode); 926 } 927 928 929 /* user context: 930 * submit dag for execution, return non-zero if we have to wait for completion. 931 * if and only if we return non-zero, we'll cause cbFunc to get invoked with 932 * cbArg when the DAG has completed. 933 * 934 * for now we always return 1. If the DAG does not cause any I/O, then the callback 935 * may get invoked before DispatchDAG returns. There's code in state 5 of ContinueRaidAccess 936 * to handle this. 937 * 938 * All we do here is fire the direct successors of the header node. The 939 * DAG execution thread does the rest of the dag processing. 940 */ 941 int rf_DispatchDAG( 942 RF_DagHeader_t *dag, 943 void (*cbFunc)(void *), 944 void *cbArg) 945 { 946 RF_Raid_t *raidPtr; 947 int tid; 948 949 raidPtr = dag->raidPtr; 950 if (dag->tracerec) { 951 RF_ETIMER_START(dag->tracerec->timer); 952 } 953 954 if (rf_engineDebug || rf_validateDAGDebug) { 955 if (rf_ValidateDAG(dag)) 956 RF_PANIC(); 957 } 958 if (rf_engineDebug) { 959 rf_get_threadid(tid); 960 printf("[%d] Entering DispatchDAG\n",tid); 961 } 962 963 raidPtr->dags_in_flight++; /* debug only: blow off proper locking */ 964 dag->cbFunc = cbFunc; 965 dag->cbArg = cbArg; 966 dag->numNodesCompleted = 0; 967 dag->status = rf_enable; 968 FireNodeArray(dag->numSuccedents, dag->succedents); 969 return(1); 970 } 971 972 /* dedicated kernel thread: 973 * the thread that handles all DAG node firing. 974 * To minimize locking and unlocking, we grab a copy of the entire node queue and then set the 975 * node queue to NULL before doing any firing of nodes. This way we only have to release the 976 * lock once. 
Of course, it's probably rare that there's more than one node in the queue at 977 * any one time, but it sometimes happens. 978 * 979 * In the kernel, this thread runs at spl0 and is not swappable. I copied these 980 * characteristics from the aio_completion_thread. 981 */ 982 983 #ifndef SIMULATE 984 static void DAGExecutionThread(RF_ThreadArg_t arg) 985 { 986 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 987 RF_Raid_t *raidPtr; 988 int ks, tid; 989 int s; 990 #ifndef __NetBSD__ 991 RF_Thread_t thread; 992 #endif 993 994 raidPtr = (RF_Raid_t *)arg; 995 996 rf_assign_threadid(); 997 if (rf_engineDebug) { 998 rf_get_threadid(tid); 999 printf("[%d] Engine thread is running\n", tid); 1000 } 1001 1002 #ifdef KERNEL 1003 #ifndef __NetBSD__ 1004 thread = current_thread(); 1005 thread_swappable(thread, RF_FALSE); 1006 thread->priority = thread->sched_pri = BASEPRI_SYSTEM; 1007 s = spl0(); 1008 #endif 1009 /* XXX what to put here XXX */ 1010 1011 s=splbio(); 1012 1013 #endif /* KERNEL */ 1014 1015 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); 1016 1017 DO_LOCK(raidPtr); 1018 while (!raidPtr->shutdown_engine) { 1019 1020 while (raidPtr->node_queue != NULL) { 1021 local_nq = raidPtr->node_queue; 1022 fire_nq = NULL; 1023 term_nq = NULL; 1024 raidPtr->node_queue = NULL; 1025 DO_UNLOCK(raidPtr); 1026 1027 /* first, strip out the terminal nodes */ 1028 while (local_nq) { 1029 nd = local_nq; 1030 local_nq = local_nq->next; 1031 switch(nd->dagHdr->status) { 1032 case rf_enable : 1033 case rf_rollForward : 1034 if (nd->numSuccedents == 0) { 1035 /* end of the dag, add to callback list */ 1036 nd->next = term_nq; 1037 term_nq = nd; 1038 } 1039 else { 1040 /* not the end, add to the fire queue */ 1041 nd->next = fire_nq; 1042 fire_nq = nd; 1043 } 1044 break; 1045 case rf_rollBackward : 1046 if (nd->numAntecedents == 0) { 1047 /* end of the dag, add to the callback list */ 1048 nd->next = term_nq; 1049 term_nq = nd; 1050 } 1051 else { 1052 /* not the end, add to the fire queue */ 
1053 nd->next = fire_nq; 1054 fire_nq = nd; 1055 } 1056 break; 1057 default : 1058 RF_PANIC(); 1059 break; 1060 } 1061 } 1062 1063 /* execute callback of dags which have reached the terminal node */ 1064 while (term_nq) { 1065 nd = term_nq; 1066 term_nq = term_nq->next; 1067 nd->next = NULL; 1068 (nd->dagHdr->cbFunc)(nd->dagHdr->cbArg); 1069 raidPtr->dags_in_flight--; /* debug only */ 1070 } 1071 1072 /* fire remaining nodes */ 1073 FireNodeList(fire_nq); 1074 1075 DO_LOCK(raidPtr); 1076 } 1077 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL) 1078 DO_WAIT(raidPtr); 1079 } 1080 DO_UNLOCK(raidPtr); 1081 1082 RF_THREADGROUP_DONE(&raidPtr->engine_tg); 1083 #ifdef KERNEL 1084 #ifdef __NetBSD__ 1085 splx(s); 1086 kthread_exit(0); 1087 #else 1088 splx(s); 1089 thread_terminate(thread); 1090 thread_halt_self(); 1091 #endif 1092 #endif /* KERNEL */ 1093 } 1094 1095 #endif /* !SIMULATE */ 1096