/*	$NetBSD: rf_dagffwr.c,v 1.5 2000/01/07 03:40:58 oster Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free write DAGs
 *
 */

#include "rf_types.h"
#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_memchunk.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node is undoable and atomic - or -
 * makes no changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll
 * forward through the graph, blindly executing nodes until it reaches
 * the end.
 * If a graph reaches the end, it is assumed to have completed successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */

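/*
 * Illustrative sketch (not part of the driver): how the creators below mark
 * the single commit point.  The third argument to rf_InitNode() flags the
 * commit node (RF_TRUE); every other node is created with RF_FALSE and must
 * therefore be undoable or side-effect free.  The function, node, and count
 * names used here are hypothetical.
 */
#if 0
static void
rf_ExampleMarkCommitPoint(RF_DagHeader_t * dag_h, RF_DagNode_t * workNode,
    RF_DagNode_t * commitNode, int nWrites, RF_AllocListElem_t * allocList)
{
	/* an ordinary, pre-commit node: an undoable disk read */
	rf_InitNode(workNode, rf_wait, RF_FALSE, rf_DiskReadFunc,
	    rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
	    "Rod", allocList);
	/* the single commit node: once it fires, the engine only rolls forward */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, nWrites, 1, 0, 0, dag_h,
	    "Cmt", allocList);
	dag_h->numCommitNodes = 1;
}
#endif
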
/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */


void 
rf_CreateNonRedundantWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    RF_IO_TYPE_WRITE);
}

void 
rf_CreateRAID0WriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    RF_IO_TYPE_WRITE);
}

void 
rf_CreateSmallWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    &rf_xorFuncs, NULL);
}

void 
rf_CreateLargeWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    1, rf_RegularXorFunc, RF_TRUE);
}


/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *           / Rod \           / Wnd \
 * H -- block- Rod - Xor - Cmt - Wnd --- T
 *           \ Rod /           \ Wnp /
 *                              \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes before the commit node (Cmt) are assumed to be atomic and
 * undoable - or - they make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g.
disk locking) 162 * allocList - list of memory allocated in DAG creation 163 * nfaults - number of faults array can tolerate 164 * (equal to # redundancy units in stripe) 165 * redfuncs - list of redundancy generating functions 166 * 167 *****************************************************************************/ 168 169 void 170 rf_CommonCreateLargeWriteDAG( 171 RF_Raid_t * raidPtr, 172 RF_AccessStripeMap_t * asmap, 173 RF_DagHeader_t * dag_h, 174 void *bp, 175 RF_RaidAccessFlags_t flags, 176 RF_AllocListElem_t * allocList, 177 int nfaults, 178 int (*redFunc) (RF_DagNode_t *), 179 int allowBufferRecycle) 180 { 181 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode; 182 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode; 183 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 184 RF_AccessStripeMapHeader_t *new_asm_h[2]; 185 RF_StripeNum_t parityStripeID; 186 char *sosBuffer, *eosBuffer; 187 RF_ReconUnitNum_t which_ru; 188 RF_RaidLayout_t *layoutPtr; 189 RF_PhysDiskAddr_t *pda; 190 191 layoutPtr = &(raidPtr->Layout); 192 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress, 193 &which_ru); 194 195 if (rf_dagDebug) { 196 printf("[Creating large-write DAG]\n"); 197 } 198 dag_h->creator = "LargeWriteDAG"; 199 200 dag_h->numCommitNodes = 1; 201 dag_h->numCommits = 0; 202 dag_h->numSuccedents = 1; 203 204 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 205 nWndNodes = asmap->numStripeUnitsAccessed; 206 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), 207 (RF_DagNode_t *), allocList); 208 i = 0; 209 wndNodes = &nodes[i]; 210 i += nWndNodes; 211 xorNode = &nodes[i]; 212 i += 1; 213 wnpNode = &nodes[i]; 214 i += 1; 215 blockNode = &nodes[i]; 216 i += 1; 217 commitNode = &nodes[i]; 218 i += 1; 219 termNode = &nodes[i]; 220 i += 1; 221 if (nfaults == 2) { 222 wnqNode = &nodes[i]; 223 i += 1; 224 } else { 225 wnqNode = NULL; 226 } 227 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, 228 &nRodNodes, &sosBuffer, &eosBuffer, allocList); 229 if (nRodNodes > 0) { 230 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), 231 (RF_DagNode_t *), allocList); 232 } else { 233 rodNodes = NULL; 234 } 235 236 /* begin node initialization */ 237 if (nRodNodes > 0) { 238 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 239 NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList); 240 } else { 241 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 242 NULL, 1, 0, 0, 0, dag_h, "Nil", allocList); 243 } 244 245 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 246 nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList); 247 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 248 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList); 249 250 /* initialize the Rod nodes */ 251 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 252 if (new_asm_h[asmNum]) { 253 pda = new_asm_h[asmNum]->stripeMap->physInfo; 254 while (pda) { 255 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, 256 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 257 "Rod", allocList); 258 rodNodes[nodeNum].params[0].p = pda; 259 rodNodes[nodeNum].params[1].p = pda->bufPtr; 260 rodNodes[nodeNum].params[2].v = parityStripeID; 261 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 262 0, 0, which_ru); 263 nodeNum++; 264 pda = pda->next; 265 } 266 } 267 } 268 RF_ASSERT(nodeNum == 
nRodNodes); 269 270 /* initialize the wnd nodes */ 271 pda = asmap->physInfo; 272 for (i = 0; i < nWndNodes; i++) { 273 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 274 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList); 275 RF_ASSERT(pda != NULL); 276 wndNodes[i].params[0].p = pda; 277 wndNodes[i].params[1].p = pda->bufPtr; 278 wndNodes[i].params[2].v = parityStripeID; 279 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 280 pda = pda->next; 281 } 282 283 /* initialize the redundancy node */ 284 if (nRodNodes > 0) { 285 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 286 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, 287 "Xr ", allocList); 288 } else { 289 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 290 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList); 291 } 292 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 293 for (i = 0; i < nWndNodes; i++) { 294 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */ 295 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */ 296 } 297 for (i = 0; i < nRodNodes; i++) { 298 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */ 299 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */ 300 } 301 /* xor node needs to get at RAID information */ 302 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; 303 304 /* 305 * Look for an Rod node that reads a complete SU. If none, alloc a buffer 306 * to receive the parity info. Note that we can't use a new data buffer 307 * because it will not have gotten written when the xor occurs. 308 */ 309 if (allowBufferRecycle) { 310 for (i = 0; i < nRodNodes; i++) { 311 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 312 break; 313 } 314 } 315 if ((!allowBufferRecycle) || (i == nRodNodes)) { 316 RF_CallocAndAdd(xorNode->results[0], 1, 317 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 318 (void *), allocList); 319 } else { 320 xorNode->results[0] = rodNodes[i].params[1].p; 321 } 322 323 /* initialize the Wnp node */ 324 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 325 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList); 326 wnpNode->params[0].p = asmap->parityInfo; 327 wnpNode->params[1].p = xorNode->results[0]; 328 wnpNode->params[2].v = parityStripeID; 329 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 330 /* parityInfo must describe entire parity unit */ 331 RF_ASSERT(asmap->parityInfo->next == NULL); 332 333 if (nfaults == 2) { 334 /* 335 * We never try to recycle a buffer for the Q calcuation 336 * in addition to the parity. This would cause two buffers 337 * to get smashed during the P and Q calculation, guaranteeing 338 * one would be wrong. 
339 */ 340 RF_CallocAndAdd(xorNode->results[1], 1, 341 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 342 (void *), allocList); 343 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 344 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList); 345 wnqNode->params[0].p = asmap->qInfo; 346 wnqNode->params[1].p = xorNode->results[1]; 347 wnqNode->params[2].v = parityStripeID; 348 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 349 /* parityInfo must describe entire parity unit */ 350 RF_ASSERT(asmap->parityInfo->next == NULL); 351 } 352 /* 353 * Connect nodes to form graph. 354 */ 355 356 /* connect dag header to block node */ 357 RF_ASSERT(blockNode->numAntecedents == 0); 358 dag_h->succedents[0] = blockNode; 359 360 if (nRodNodes > 0) { 361 /* connect the block node to the Rod nodes */ 362 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 363 RF_ASSERT(xorNode->numAntecedents == nRodNodes); 364 for (i = 0; i < nRodNodes; i++) { 365 RF_ASSERT(rodNodes[i].numAntecedents == 1); 366 blockNode->succedents[i] = &rodNodes[i]; 367 rodNodes[i].antecedents[0] = blockNode; 368 rodNodes[i].antType[0] = rf_control; 369 370 /* connect the Rod nodes to the Xor node */ 371 RF_ASSERT(rodNodes[i].numSuccedents == 1); 372 rodNodes[i].succedents[0] = xorNode; 373 xorNode->antecedents[i] = &rodNodes[i]; 374 xorNode->antType[i] = rf_trueData; 375 } 376 } else { 377 /* connect the block node to the Xor node */ 378 RF_ASSERT(blockNode->numSuccedents == 1); 379 RF_ASSERT(xorNode->numAntecedents == 1); 380 blockNode->succedents[0] = xorNode; 381 xorNode->antecedents[0] = blockNode; 382 xorNode->antType[0] = rf_control; 383 } 384 385 /* connect the xor node to the commit node */ 386 RF_ASSERT(xorNode->numSuccedents == 1); 387 RF_ASSERT(commitNode->numAntecedents == 1); 388 xorNode->succedents[0] = commitNode; 389 commitNode->antecedents[0] = xorNode; 390 commitNode->antType[0] = rf_control; 391 392 /* connect the commit node to the write nodes */ 393 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults); 394 for (i = 0; i < nWndNodes; i++) { 395 RF_ASSERT(wndNodes->numAntecedents == 1); 396 commitNode->succedents[i] = &wndNodes[i]; 397 wndNodes[i].antecedents[0] = commitNode; 398 wndNodes[i].antType[0] = rf_control; 399 } 400 RF_ASSERT(wnpNode->numAntecedents == 1); 401 commitNode->succedents[nWndNodes] = wnpNode; 402 wnpNode->antecedents[0] = commitNode; 403 wnpNode->antType[0] = rf_trueData; 404 if (nfaults == 2) { 405 RF_ASSERT(wnqNode->numAntecedents == 1); 406 commitNode->succedents[nWndNodes + 1] = wnqNode; 407 wnqNode->antecedents[0] = commitNode; 408 wnqNode->antType[0] = rf_trueData; 409 } 410 /* connect the write nodes to the term node */ 411 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 412 RF_ASSERT(termNode->numSuccedents == 0); 413 for (i = 0; i < nWndNodes; i++) { 414 RF_ASSERT(wndNodes->numSuccedents == 1); 415 wndNodes[i].succedents[0] = termNode; 416 termNode->antecedents[i] = &wndNodes[i]; 417 termNode->antType[i] = rf_control; 418 } 419 RF_ASSERT(wnpNode->numSuccedents == 1); 420 wnpNode->succedents[0] = termNode; 421 termNode->antecedents[nWndNodes] = wnpNode; 422 termNode->antType[nWndNodes] = rf_control; 423 if (nfaults == 2) { 424 RF_ASSERT(wnqNode->numSuccedents == 1); 425 wnqNode->succedents[0] = termNode; 426 termNode->antecedents[nWndNodes + 1] = wnqNode; 427 termNode->antType[nWndNodes + 1] = rf_control; 428 } 429 } 430 
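/*
 * Illustrative sketch (not part of the driver): the edge-wiring idiom used
 * by every creator in this file.  An arc is always recorded twice - in the
 * predecessor's succedents[] array and in the successor's antecedents[]
 * array - and antType[] records whether the arc carries data or is pure
 * control.  The function and parameter names here are hypothetical.
 */
#if 0
static void
rf_ExampleAddArc(RF_DagNode_t * from, int fromSlot,
    RF_DagNode_t * to, int toSlot)
{
	RF_ASSERT(fromSlot < from->numSuccedents);
	RF_ASSERT(toSlot < to->numAntecedents);
	from->succedents[fromSlot] = to;
	to->antecedents[toSlot] = from;
	to->antType[toSlot] = rf_control;	/* or rf_trueData for a data arc */
}
#endif
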
/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *            \- Rod X      /     \----> Wnd [Und]-/
 *           [\- Rod X     /       \---> Wnd [Und]-/]
 *           [\- Roq -> Q /         \--> Wnq [Unq]-/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A NULL qfuncs indicates a single-fault-tolerant array.
 *****************************************************************************/

void 
rf_CommonCreateSmallWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_RedFuncs_t * pfuncs,
    RF_RedFuncs_t * qfuncs)
{
	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
	RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
	int     i, j, nNodes, totalNumNodes, lu_flag;
	RF_ReconUnitNum_t which_ru;
	int     (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
	int     (*qfunc) (RF_DagNode_t *);
	int     numDataNodes, numParityNodes;
	RF_StripeNum_t parityStripeID;
	RF_PhysDiskAddr_t *pda;
	char   *name, *qname;
	long    nfaults;

	nfaults = qfuncs ? 2 : 1;
	lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* lock/unlock flag */

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	pda = asmap->physInfo;
	numDataNodes = asmap->numStripeUnitsAccessed;
	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;

	if (rf_dagDebug) {
		printf("[Creating small-write DAG]\n");
	}
	RF_ASSERT(numDataNodes > 0);
	dag_h->creator = "SmallWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/*
	 * DAG creation occurs in four steps:
	 * 1. count the number of nodes in the DAG
	 * 2. create the nodes
	 * 3. initialize the nodes
	 * 4. connect the nodes
	 */

	/*
	 * Step 1. compute number of nodes in the graph
	 */

	/*
	 * Number of nodes:
	 *   a read and a write for each data unit,
	 *   a redundancy computation node for each parity node
	 *     (nfaults * nparity),
	 *   a read and a write for each parity unit,
	 *   a block and a commit node (2),
	 *   a terminate node, and,
	 *   if atomic RMW, an unlock node for each data unit and each
	 *   redundancy unit.
	 */
	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
	    + (nfaults * 2 * numParityNodes) + 3;
	if (lu_flag) {
		totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
	}
	/*
	 * Step 2.
create the nodes 531 */ 532 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), 533 (RF_DagNode_t *), allocList); 534 i = 0; 535 blockNode = &nodes[i]; 536 i += 1; 537 commitNode = &nodes[i]; 538 i += 1; 539 readDataNodes = &nodes[i]; 540 i += numDataNodes; 541 readParityNodes = &nodes[i]; 542 i += numParityNodes; 543 writeDataNodes = &nodes[i]; 544 i += numDataNodes; 545 writeParityNodes = &nodes[i]; 546 i += numParityNodes; 547 xorNodes = &nodes[i]; 548 i += numParityNodes; 549 termNode = &nodes[i]; 550 i += 1; 551 if (lu_flag) { 552 unlockDataNodes = &nodes[i]; 553 i += numDataNodes; 554 unlockParityNodes = &nodes[i]; 555 i += numParityNodes; 556 } else { 557 unlockDataNodes = unlockParityNodes = NULL; 558 } 559 if (nfaults == 2) { 560 readQNodes = &nodes[i]; 561 i += numParityNodes; 562 writeQNodes = &nodes[i]; 563 i += numParityNodes; 564 qNodes = &nodes[i]; 565 i += numParityNodes; 566 if (lu_flag) { 567 unlockQNodes = &nodes[i]; 568 i += numParityNodes; 569 } else { 570 unlockQNodes = NULL; 571 } 572 } else { 573 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL; 574 } 575 RF_ASSERT(i == totalNumNodes); 576 577 /* 578 * Step 3. initialize the nodes 579 */ 580 /* initialize block node (Nil) */ 581 nNodes = numDataNodes + (nfaults * numParityNodes); 582 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 583 NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList); 584 585 /* initialize commit node (Cmt) */ 586 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 587 NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList); 588 589 /* initialize terminate node (Trm) */ 590 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, 591 NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList); 592 593 /* initialize nodes which read old data (Rod) */ 594 for (i = 0; i < numDataNodes; i++) { 595 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, 596 rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h, 597 "Rod", allocList); 598 RF_ASSERT(pda != NULL); 599 /* physical disk addr desc */ 600 readDataNodes[i].params[0].p = pda; 601 /* buffer to hold old data */ 602 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 603 dag_h, pda, allocList); 604 readDataNodes[i].params[2].v = parityStripeID; 605 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 606 lu_flag, 0, which_ru); 607 pda = pda->next; 608 for (j = 0; j < readDataNodes[i].numSuccedents; j++) { 609 readDataNodes[i].propList[j] = NULL; 610 } 611 } 612 613 /* initialize nodes which read old parity (Rop) */ 614 pda = asmap->parityInfo; 615 i = 0; 616 for (i = 0; i < numParityNodes; i++) { 617 RF_ASSERT(pda != NULL); 618 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, 619 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 620 0, dag_h, "Rop", allocList); 621 readParityNodes[i].params[0].p = pda; 622 /* buffer to hold old parity */ 623 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 624 dag_h, pda, allocList); 625 readParityNodes[i].params[2].v = parityStripeID; 626 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 627 lu_flag, 0, which_ru); 628 pda = pda->next; 629 for (j = 0; j < readParityNodes[i].numSuccedents; j++) { 630 readParityNodes[i].propList[0] = NULL; 631 } 632 } 633 634 /* initialize nodes which read old Q (Roq) */ 635 if (nfaults == 2) { 636 pda = asmap->qInfo; 637 for (i = 0; i < numParityNodes; 
i++) { 638 RF_ASSERT(pda != NULL); 639 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, 640 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList); 641 readQNodes[i].params[0].p = pda; 642 /* buffer to hold old Q */ 643 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, 644 allocList); 645 readQNodes[i].params[2].v = parityStripeID; 646 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 647 lu_flag, 0, which_ru); 648 pda = pda->next; 649 for (j = 0; j < readQNodes[i].numSuccedents; j++) { 650 readQNodes[i].propList[0] = NULL; 651 } 652 } 653 } 654 /* initialize nodes which write new data (Wnd) */ 655 pda = asmap->physInfo; 656 for (i = 0; i < numDataNodes; i++) { 657 RF_ASSERT(pda != NULL); 658 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 659 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 660 "Wnd", allocList); 661 /* physical disk addr desc */ 662 writeDataNodes[i].params[0].p = pda; 663 /* buffer holding new data to be written */ 664 writeDataNodes[i].params[1].p = pda->bufPtr; 665 writeDataNodes[i].params[2].v = parityStripeID; 666 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 667 0, 0, which_ru); 668 if (lu_flag) { 669 /* initialize node to unlock the disk queue */ 670 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 671 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 672 "Und", allocList); 673 /* physical disk addr desc */ 674 unlockDataNodes[i].params[0].p = pda; 675 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 676 0, lu_flag, which_ru); 677 } 678 pda = pda->next; 679 } 680 681 /* 682 * Initialize nodes which compute new parity and Q. 683 */ 684 /* 685 * We use the simple XOR func in the double-XOR case, and when 686 * we're accessing only a portion of one stripe unit. The distinction 687 * between the two is that the regular XOR func assumes that the targbuf 688 * is a full SU in size, and examines the pda associated with the buffer 689 * to decide where within the buffer to XOR the data, whereas 690 * the simple XOR func just XORs the data into the start of the buffer. 
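 *
 * A rough sketch of the distinction (illustrative only; the names below are
 * hypothetical and are not the actual XOR function internals):
 *
 *     simple XOR:   dst = targbuf;                        start of target buf
 *     regular XOR:  dst = targbuf + byte offset that      position within the
 *                         the pda maps to within the SU   full-SU target buf
 *
 *     for (i = 0; i < nbytes; i++)
 *             dst[i] ^= src[i];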
691 */ 692 if ((numParityNodes == 2) || ((numDataNodes == 1) 693 && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) { 694 func = pfuncs->simple; 695 undoFunc = rf_NullNodeUndoFunc; 696 name = pfuncs->SimpleName; 697 if (qfuncs) { 698 qfunc = qfuncs->simple; 699 qname = qfuncs->SimpleName; 700 } else { 701 qfunc = NULL; 702 qname = NULL; 703 } 704 } else { 705 func = pfuncs->regular; 706 undoFunc = rf_NullNodeUndoFunc; 707 name = pfuncs->RegularName; 708 if (qfuncs) { 709 qfunc = qfuncs->regular; 710 qname = qfuncs->RegularName; 711 } else { 712 qfunc = NULL; 713 qname = NULL; 714 } 715 } 716 /* 717 * Initialize the xor nodes: params are {pda,buf} 718 * from {Rod,Wnd,Rop} nodes, and raidPtr 719 */ 720 if (numParityNodes == 2) { 721 /* double-xor case */ 722 for (i = 0; i < numParityNodes; i++) { 723 /* note: no wakeup func for xor */ 724 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, 725 1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList); 726 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD; 727 xorNodes[i].params[0] = readDataNodes[i].params[0]; 728 xorNodes[i].params[1] = readDataNodes[i].params[1]; 729 xorNodes[i].params[2] = readParityNodes[i].params[0]; 730 xorNodes[i].params[3] = readParityNodes[i].params[1]; 731 xorNodes[i].params[4] = writeDataNodes[i].params[0]; 732 xorNodes[i].params[5] = writeDataNodes[i].params[1]; 733 xorNodes[i].params[6].p = raidPtr; 734 /* use old parity buf as target buf */ 735 xorNodes[i].results[0] = readParityNodes[i].params[1].p; 736 if (nfaults == 2) { 737 /* note: no wakeup func for qor */ 738 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1, 739 (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList); 740 qNodes[i].params[0] = readDataNodes[i].params[0]; 741 qNodes[i].params[1] = readDataNodes[i].params[1]; 742 qNodes[i].params[2] = readQNodes[i].params[0]; 743 qNodes[i].params[3] = readQNodes[i].params[1]; 744 qNodes[i].params[4] = writeDataNodes[i].params[0]; 745 qNodes[i].params[5] = writeDataNodes[i].params[1]; 746 qNodes[i].params[6].p = raidPtr; 747 /* use old Q buf as target buf */ 748 qNodes[i].results[0] = readQNodes[i].params[1].p; 749 } 750 } 751 } else { 752 /* there is only one xor node in this case */ 753 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1, 754 (numDataNodes + numParityNodes), 755 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList); 756 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD; 757 for (i = 0; i < numDataNodes + 1; i++) { 758 /* set up params related to Rod and Rop nodes */ 759 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 760 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 761 } 762 for (i = 0; i < numDataNodes; i++) { 763 /* set up params related to Wnd and Wnp nodes */ 764 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 765 writeDataNodes[i].params[0]; 766 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 767 writeDataNodes[i].params[1]; 768 } 769 /* xor node needs to get at RAID information */ 770 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 771 xorNodes[0].results[0] = readParityNodes[0].params[1].p; 772 if (nfaults == 2) { 773 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1, 774 (numDataNodes + numParityNodes), 775 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, 776 qname, allocList); 777 for (i = 0; i < numDataNodes; i++) { 778 /* set up params related to Rod 
*/ 779 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 780 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 781 } 782 /* and read old q */ 783 qNodes[0].params[2 * numDataNodes + 0] = /* pda */ 784 readQNodes[0].params[0]; 785 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */ 786 readQNodes[0].params[1]; 787 for (i = 0; i < numDataNodes; i++) { 788 /* set up params related to Wnd nodes */ 789 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 790 writeDataNodes[i].params[0]; 791 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 792 writeDataNodes[i].params[1]; 793 } 794 /* xor node needs to get at RAID information */ 795 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 796 qNodes[0].results[0] = readQNodes[0].params[1].p; 797 } 798 } 799 800 /* initialize nodes which write new parity (Wnp) */ 801 pda = asmap->parityInfo; 802 for (i = 0; i < numParityNodes; i++) { 803 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 804 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 805 "Wnp", allocList); 806 RF_ASSERT(pda != NULL); 807 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) 808 * filled in by xor node */ 809 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for 810 * parity write 811 * operation */ 812 writeParityNodes[i].params[2].v = parityStripeID; 813 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 814 0, 0, which_ru); 815 if (lu_flag) { 816 /* initialize node to unlock the disk queue */ 817 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 818 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 819 "Unp", allocList); 820 unlockParityNodes[i].params[0].p = pda; /* physical disk addr 821 * desc */ 822 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 823 0, lu_flag, which_ru); 824 } 825 pda = pda->next; 826 } 827 828 /* initialize nodes which write new Q (Wnq) */ 829 if (nfaults == 2) { 830 pda = asmap->qInfo; 831 for (i = 0; i < numParityNodes; i++) { 832 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 833 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 834 "Wnq", allocList); 835 RF_ASSERT(pda != NULL); 836 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) 837 * filled in by xor node */ 838 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for 839 * parity write 840 * operation */ 841 writeQNodes[i].params[2].v = parityStripeID; 842 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 843 0, 0, which_ru); 844 if (lu_flag) { 845 /* initialize node to unlock the disk queue */ 846 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 847 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 848 "Unq", allocList); 849 unlockQNodes[i].params[0].p = pda; /* physical disk addr 850 * desc */ 851 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 852 0, lu_flag, which_ru); 853 } 854 pda = pda->next; 855 } 856 } 857 /* 858 * Step 4. connect the nodes. 
859 */ 860 861 /* connect header to block node */ 862 dag_h->succedents[0] = blockNode; 863 864 /* connect block node to read old data nodes */ 865 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 866 for (i = 0; i < numDataNodes; i++) { 867 blockNode->succedents[i] = &readDataNodes[i]; 868 RF_ASSERT(readDataNodes[i].numAntecedents == 1); 869 readDataNodes[i].antecedents[0] = blockNode; 870 readDataNodes[i].antType[0] = rf_control; 871 } 872 873 /* connect block node to read old parity nodes */ 874 for (i = 0; i < numParityNodes; i++) { 875 blockNode->succedents[numDataNodes + i] = &readParityNodes[i]; 876 RF_ASSERT(readParityNodes[i].numAntecedents == 1); 877 readParityNodes[i].antecedents[0] = blockNode; 878 readParityNodes[i].antType[0] = rf_control; 879 } 880 881 /* connect block node to read old Q nodes */ 882 if (nfaults == 2) { 883 for (i = 0; i < numParityNodes; i++) { 884 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i]; 885 RF_ASSERT(readQNodes[i].numAntecedents == 1); 886 readQNodes[i].antecedents[0] = blockNode; 887 readQNodes[i].antType[0] = rf_control; 888 } 889 } 890 /* connect read old data nodes to xor nodes */ 891 for (i = 0; i < numDataNodes; i++) { 892 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes)); 893 for (j = 0; j < numParityNodes; j++) { 894 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes); 895 readDataNodes[i].succedents[j] = &xorNodes[j]; 896 xorNodes[j].antecedents[i] = &readDataNodes[i]; 897 xorNodes[j].antType[i] = rf_trueData; 898 } 899 } 900 901 /* connect read old data nodes to q nodes */ 902 if (nfaults == 2) { 903 for (i = 0; i < numDataNodes; i++) { 904 for (j = 0; j < numParityNodes; j++) { 905 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes); 906 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j]; 907 qNodes[j].antecedents[i] = &readDataNodes[i]; 908 qNodes[j].antType[i] = rf_trueData; 909 } 910 } 911 } 912 /* connect read old parity nodes to xor nodes */ 913 for (i = 0; i < numParityNodes; i++) { 914 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 915 for (j = 0; j < numParityNodes; j++) { 916 readParityNodes[i].succedents[j] = &xorNodes[j]; 917 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i]; 918 xorNodes[j].antType[numDataNodes + i] = rf_trueData; 919 } 920 } 921 922 /* connect read old q nodes to q nodes */ 923 if (nfaults == 2) { 924 for (i = 0; i < numParityNodes; i++) { 925 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 926 for (j = 0; j < numParityNodes; j++) { 927 readQNodes[i].succedents[j] = &qNodes[j]; 928 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i]; 929 qNodes[j].antType[numDataNodes + i] = rf_trueData; 930 } 931 } 932 } 933 /* connect xor nodes to commit node */ 934 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes)); 935 for (i = 0; i < numParityNodes; i++) { 936 RF_ASSERT(xorNodes[i].numSuccedents == 1); 937 xorNodes[i].succedents[0] = commitNode; 938 commitNode->antecedents[i] = &xorNodes[i]; 939 commitNode->antType[i] = rf_control; 940 } 941 942 /* connect q nodes to commit node */ 943 if (nfaults == 2) { 944 for (i = 0; i < numParityNodes; i++) { 945 RF_ASSERT(qNodes[i].numSuccedents == 1); 946 qNodes[i].succedents[0] = commitNode; 947 commitNode->antecedents[i + numParityNodes] = &qNodes[i]; 948 commitNode->antType[i + numParityNodes] = rf_control; 949 } 950 } 951 /* connect commit node to write nodes */ 952 
RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes))); 953 for (i = 0; i < numDataNodes; i++) { 954 RF_ASSERT(writeDataNodes[i].numAntecedents == 1); 955 commitNode->succedents[i] = &writeDataNodes[i]; 956 writeDataNodes[i].antecedents[0] = commitNode; 957 writeDataNodes[i].antType[0] = rf_trueData; 958 } 959 for (i = 0; i < numParityNodes; i++) { 960 RF_ASSERT(writeParityNodes[i].numAntecedents == 1); 961 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i]; 962 writeParityNodes[i].antecedents[0] = commitNode; 963 writeParityNodes[i].antType[0] = rf_trueData; 964 } 965 if (nfaults == 2) { 966 for (i = 0; i < numParityNodes; i++) { 967 RF_ASSERT(writeQNodes[i].numAntecedents == 1); 968 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i]; 969 writeQNodes[i].antecedents[0] = commitNode; 970 writeQNodes[i].antType[0] = rf_trueData; 971 } 972 } 973 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 974 RF_ASSERT(termNode->numSuccedents == 0); 975 for (i = 0; i < numDataNodes; i++) { 976 if (lu_flag) { 977 /* connect write new data nodes to unlock nodes */ 978 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 979 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1); 980 writeDataNodes[i].succedents[0] = &unlockDataNodes[i]; 981 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i]; 982 unlockDataNodes[i].antType[0] = rf_control; 983 984 /* connect unlock nodes to term node */ 985 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1); 986 unlockDataNodes[i].succedents[0] = termNode; 987 termNode->antecedents[i] = &unlockDataNodes[i]; 988 termNode->antType[i] = rf_control; 989 } else { 990 /* connect write new data nodes to term node */ 991 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 992 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 993 writeDataNodes[i].succedents[0] = termNode; 994 termNode->antecedents[i] = &writeDataNodes[i]; 995 termNode->antType[i] = rf_control; 996 } 997 } 998 999 for (i = 0; i < numParityNodes; i++) { 1000 if (lu_flag) { 1001 /* connect write new parity nodes to unlock nodes */ 1002 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1003 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1); 1004 writeParityNodes[i].succedents[0] = &unlockParityNodes[i]; 1005 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i]; 1006 unlockParityNodes[i].antType[0] = rf_control; 1007 1008 /* connect unlock nodes to term node */ 1009 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1); 1010 unlockParityNodes[i].succedents[0] = termNode; 1011 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i]; 1012 termNode->antType[numDataNodes + i] = rf_control; 1013 } else { 1014 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1015 writeParityNodes[i].succedents[0] = termNode; 1016 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i]; 1017 termNode->antType[numDataNodes + i] = rf_control; 1018 } 1019 } 1020 1021 if (nfaults == 2) { 1022 for (i = 0; i < numParityNodes; i++) { 1023 if (lu_flag) { 1024 /* connect write new Q nodes to unlock nodes */ 1025 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1026 RF_ASSERT(unlockQNodes[i].numAntecedents == 1); 1027 writeQNodes[i].succedents[0] = &unlockQNodes[i]; 1028 unlockQNodes[i].antecedents[0] = &writeQNodes[i]; 1029 unlockQNodes[i].antType[0] = rf_control; 1030 1031 /* connect unlock nodes to unblock node */ 1032 RF_ASSERT(unlockQNodes[i].numSuccedents == 1); 1033 unlockQNodes[i].succedents[0] = 
termNode; 1034 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i]; 1035 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1036 } else { 1037 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1038 writeQNodes[i].succedents[0] = termNode; 1039 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i]; 1040 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1041 } 1042 } 1043 } 1044 } 1045 1046 1047 /****************************************************************************** 1048 * create a write graph (fault-free or degraded) for RAID level 1 1049 * 1050 * Hdr -> Commit -> Wpd -> Nil -> Trm 1051 * -> Wsd -> 1052 * 1053 * The "Wpd" node writes data to the primary copy in the mirror pair 1054 * The "Wsd" node writes data to the secondary copy in the mirror pair 1055 * 1056 * Parameters: raidPtr - description of the physical array 1057 * asmap - logical & physical addresses for this access 1058 * bp - buffer ptr (holds write data) 1059 * flags - general flags (e.g. disk locking) 1060 * allocList - list of memory allocated in DAG creation 1061 *****************************************************************************/ 1062 1063 void 1064 rf_CreateRaidOneWriteDAG( 1065 RF_Raid_t * raidPtr, 1066 RF_AccessStripeMap_t * asmap, 1067 RF_DagHeader_t * dag_h, 1068 void *bp, 1069 RF_RaidAccessFlags_t flags, 1070 RF_AllocListElem_t * allocList) 1071 { 1072 RF_DagNode_t *unblockNode, *termNode, *commitNode; 1073 RF_DagNode_t *nodes, *wndNode, *wmirNode; 1074 int nWndNodes, nWmirNodes, i; 1075 RF_ReconUnitNum_t which_ru; 1076 RF_PhysDiskAddr_t *pda, *pdaP; 1077 RF_StripeNum_t parityStripeID; 1078 1079 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 1080 asmap->raidAddress, &which_ru); 1081 if (rf_dagDebug) { 1082 printf("[Creating RAID level 1 write DAG]\n"); 1083 } 1084 dag_h->creator = "RaidOneWriteDAG"; 1085 1086 /* 2 implies access not SU aligned */ 1087 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; 1088 nWndNodes = (asmap->physInfo->next) ? 
2 : 1; 1089 1090 /* alloc the Wnd nodes and the Wmir node */ 1091 if (asmap->numDataFailed == 1) 1092 nWndNodes--; 1093 if (asmap->numParityFailed == 1) 1094 nWmirNodes--; 1095 1096 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock 1097 * + terminator) */ 1098 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), 1099 (RF_DagNode_t *), allocList); 1100 i = 0; 1101 wndNode = &nodes[i]; 1102 i += nWndNodes; 1103 wmirNode = &nodes[i]; 1104 i += nWmirNodes; 1105 commitNode = &nodes[i]; 1106 i += 1; 1107 unblockNode = &nodes[i]; 1108 i += 1; 1109 termNode = &nodes[i]; 1110 i += 1; 1111 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3)); 1112 1113 /* this dag can commit immediately */ 1114 dag_h->numCommitNodes = 1; 1115 dag_h->numCommits = 0; 1116 dag_h->numSuccedents = 1; 1117 1118 /* initialize the commit, unblock, and term nodes */ 1119 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 1120 NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList); 1121 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 1122 NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList); 1123 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, 1124 NULL, 0, 1, 0, 0, dag_h, "Trm", allocList); 1125 1126 /* initialize the wnd nodes */ 1127 if (nWndNodes > 0) { 1128 pda = asmap->physInfo; 1129 for (i = 0; i < nWndNodes; i++) { 1130 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1131 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList); 1132 RF_ASSERT(pda != NULL); 1133 wndNode[i].params[0].p = pda; 1134 wndNode[i].params[1].p = pda->bufPtr; 1135 wndNode[i].params[2].v = parityStripeID; 1136 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1137 pda = pda->next; 1138 } 1139 RF_ASSERT(pda == NULL); 1140 } 1141 /* initialize the mirror nodes */ 1142 if (nWmirNodes > 0) { 1143 pda = asmap->physInfo; 1144 pdaP = asmap->parityInfo; 1145 for (i = 0; i < nWmirNodes; i++) { 1146 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1147 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList); 1148 RF_ASSERT(pda != NULL); 1149 wmirNode[i].params[0].p = pdaP; 1150 wmirNode[i].params[1].p = pda->bufPtr; 1151 wmirNode[i].params[2].v = parityStripeID; 1152 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1153 pda = pda->next; 1154 pdaP = pdaP->next; 1155 } 1156 RF_ASSERT(pda == NULL); 1157 RF_ASSERT(pdaP == NULL); 1158 } 1159 /* link the header node to the commit node */ 1160 RF_ASSERT(dag_h->numSuccedents == 1); 1161 RF_ASSERT(commitNode->numAntecedents == 0); 1162 dag_h->succedents[0] = commitNode; 1163 1164 /* link the commit node to the write nodes */ 1165 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes)); 1166 for (i = 0; i < nWndNodes; i++) { 1167 RF_ASSERT(wndNode[i].numAntecedents == 1); 1168 commitNode->succedents[i] = &wndNode[i]; 1169 wndNode[i].antecedents[0] = commitNode; 1170 wndNode[i].antType[0] = rf_control; 1171 } 1172 for (i = 0; i < nWmirNodes; i++) { 1173 RF_ASSERT(wmirNode[i].numAntecedents == 1); 1174 commitNode->succedents[i + nWndNodes] = &wmirNode[i]; 1175 wmirNode[i].antecedents[0] = commitNode; 1176 wmirNode[i].antType[0] = rf_control; 1177 } 1178 1179 /* link the write nodes to the unblock node */ 1180 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes)); 1181 for (i = 0; i < nWndNodes; i++) { 
		RF_ASSERT(wndNode[i].numSuccedents == 1);
		wndNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNode[i];
		unblockNode->antType[i] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numSuccedents == 1);
		wmirNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
		unblockNode->antType[i + nWndNodes] = rf_control;
	}

	/* link the unblock node to the term node */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}



/* DAGs which have no commit points.
 *
 * The following DAGs are used in forward and backward error recovery
 * experiments.  They are identical to the DAGs above this comment except
 * that the commit points have been removed.
 */



void 
rf_CommonCreateLargeWriteDAGFwd(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    int nfaults,
    int (*redFunc) (RF_DagNode_t *),
    int allowBufferRecycle)
{
	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
	RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
	int     nWndNodes, nRodNodes, i, nodeNum, asmNum;
	RF_AccessStripeMapHeader_t *new_asm_h[2];
	RF_StripeNum_t parityStripeID;
	char   *sosBuffer, *eosBuffer;
	RF_ReconUnitNum_t which_ru;
	RF_RaidLayout_t *layoutPtr;
	RF_PhysDiskAddr_t *pda;

	layoutPtr = &(raidPtr->Layout);
	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru);

	if (rf_dagDebug)
		printf("[Creating large-write DAG]\n");
	dag_h->creator = "LargeWriteDAGFwd";

	dag_h->numCommitNodes = 0;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* alloc the nodes: Wnd, xor, sync, block, term, and Wnp */
	nWndNodes = asmap->numStripeUnitsAccessed;
	RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
	i = 0;
	wndNodes = &nodes[i];
	i += nWndNodes;
	xorNode = &nodes[i];
	i += 1;
	wnpNode = &nodes[i];
	i += 1;
	blockNode = &nodes[i];
	i += 1;
	syncNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	if (nfaults == 2) {
		wnqNode = &nodes[i];
		i += 1;
	} else {
		wnqNode = NULL;
	}
	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
	if (nRodNodes > 0) {
		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
	} else {
		rodNodes = NULL;
	}

	/* begin node initialization */
	if (nRodNodes > 0) {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList);
		rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList);
	} else {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList);
		rf_InitNode(syncNode,
rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList); 1283 } 1284 1285 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList); 1286 1287 /* initialize the Rod nodes */ 1288 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 1289 if (new_asm_h[asmNum]) { 1290 pda = new_asm_h[asmNum]->stripeMap->physInfo; 1291 while (pda) { 1292 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList); 1293 rodNodes[nodeNum].params[0].p = pda; 1294 rodNodes[nodeNum].params[1].p = pda->bufPtr; 1295 rodNodes[nodeNum].params[2].v = parityStripeID; 1296 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1297 nodeNum++; 1298 pda = pda->next; 1299 } 1300 } 1301 } 1302 RF_ASSERT(nodeNum == nRodNodes); 1303 1304 /* initialize the wnd nodes */ 1305 pda = asmap->physInfo; 1306 for (i = 0; i < nWndNodes; i++) { 1307 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList); 1308 RF_ASSERT(pda != NULL); 1309 wndNodes[i].params[0].p = pda; 1310 wndNodes[i].params[1].p = pda->bufPtr; 1311 wndNodes[i].params[2].v = parityStripeID; 1312 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1313 pda = pda->next; 1314 } 1315 1316 /* initialize the redundancy node */ 1317 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList); 1318 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 1319 for (i = 0; i < nWndNodes; i++) { 1320 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */ 1321 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */ 1322 } 1323 for (i = 0; i < nRodNodes; i++) { 1324 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */ 1325 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */ 1326 } 1327 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; /* xor node needs to get 1328 * at RAID information */ 1329 1330 /* look for an Rod node that reads a complete SU. If none, alloc a 1331 * buffer to receive the parity info. Note that we can't use a new 1332 * data buffer because it will not have gotten written when the xor 1333 * occurs. 
*/ 1334 if (allowBufferRecycle) { 1335 for (i = 0; i < nRodNodes; i++) 1336 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 1337 break; 1338 } 1339 if ((!allowBufferRecycle) || (i == nRodNodes)) { 1340 RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList); 1341 } else 1342 xorNode->results[0] = rodNodes[i].params[1].p; 1343 1344 /* initialize the Wnp node */ 1345 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList); 1346 wnpNode->params[0].p = asmap->parityInfo; 1347 wnpNode->params[1].p = xorNode->results[0]; 1348 wnpNode->params[2].v = parityStripeID; 1349 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1350 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must 1351 * describe entire 1352 * parity unit */ 1353 1354 if (nfaults == 2) { 1355 /* we never try to recycle a buffer for the Q calcuation in 1356 * addition to the parity. This would cause two buffers to get 1357 * smashed during the P and Q calculation, guaranteeing one 1358 * would be wrong. */ 1359 RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList); 1360 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList); 1361 wnqNode->params[0].p = asmap->qInfo; 1362 wnqNode->params[1].p = xorNode->results[1]; 1363 wnqNode->params[2].v = parityStripeID; 1364 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1365 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must 1366 * describe entire 1367 * parity unit */ 1368 } 1369 /* connect nodes to form graph */ 1370 1371 /* connect dag header to block node */ 1372 RF_ASSERT(blockNode->numAntecedents == 0); 1373 dag_h->succedents[0] = blockNode; 1374 1375 if (nRodNodes > 0) { 1376 /* connect the block node to the Rod nodes */ 1377 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 1378 RF_ASSERT(syncNode->numAntecedents == nRodNodes); 1379 for (i = 0; i < nRodNodes; i++) { 1380 RF_ASSERT(rodNodes[i].numAntecedents == 1); 1381 blockNode->succedents[i] = &rodNodes[i]; 1382 rodNodes[i].antecedents[0] = blockNode; 1383 rodNodes[i].antType[0] = rf_control; 1384 1385 /* connect the Rod nodes to the Nil node */ 1386 RF_ASSERT(rodNodes[i].numSuccedents == 1); 1387 rodNodes[i].succedents[0] = syncNode; 1388 syncNode->antecedents[i] = &rodNodes[i]; 1389 syncNode->antType[i] = rf_trueData; 1390 } 1391 } else { 1392 /* connect the block node to the Nil node */ 1393 RF_ASSERT(blockNode->numSuccedents == 1); 1394 RF_ASSERT(syncNode->numAntecedents == 1); 1395 blockNode->succedents[0] = syncNode; 1396 syncNode->antecedents[0] = blockNode; 1397 syncNode->antType[0] = rf_control; 1398 } 1399 1400 /* connect the sync node to the Wnd nodes */ 1401 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes)); 1402 for (i = 0; i < nWndNodes; i++) { 1403 RF_ASSERT(wndNodes->numAntecedents == 1); 1404 syncNode->succedents[i] = &wndNodes[i]; 1405 wndNodes[i].antecedents[0] = syncNode; 1406 wndNodes[i].antType[0] = rf_control; 1407 } 1408 1409 /* connect the sync node to the Xor node */ 1410 RF_ASSERT(xorNode->numAntecedents == 1); 1411 syncNode->succedents[nWndNodes] = xorNode; 1412 xorNode->antecedents[0] = syncNode; 1413 xorNode->antType[0] = rf_control; 1414 1415 /* connect 
the xor node to the write parity node */ 1416 RF_ASSERT(xorNode->numSuccedents == nfaults); 1417 RF_ASSERT(wnpNode->numAntecedents == 1); 1418 xorNode->succedents[0] = wnpNode; 1419 wnpNode->antecedents[0] = xorNode; 1420 wnpNode->antType[0] = rf_trueData; 1421 if (nfaults == 2) { 1422 RF_ASSERT(wnqNode->numAntecedents == 1); 1423 xorNode->succedents[1] = wnqNode; 1424 wnqNode->antecedents[0] = xorNode; 1425 wnqNode->antType[0] = rf_trueData; 1426 } 1427 /* connect the write nodes to the term node */ 1428 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 1429 RF_ASSERT(termNode->numSuccedents == 0); 1430 for (i = 0; i < nWndNodes; i++) { 1431 RF_ASSERT(wndNodes->numSuccedents == 1); 1432 wndNodes[i].succedents[0] = termNode; 1433 termNode->antecedents[i] = &wndNodes[i]; 1434 termNode->antType[i] = rf_control; 1435 } 1436 RF_ASSERT(wnpNode->numSuccedents == 1); 1437 wnpNode->succedents[0] = termNode; 1438 termNode->antecedents[nWndNodes] = wnpNode; 1439 termNode->antType[nWndNodes] = rf_control; 1440 if (nfaults == 2) { 1441 RF_ASSERT(wnqNode->numSuccedents == 1); 1442 wnqNode->succedents[0] = termNode; 1443 termNode->antecedents[nWndNodes + 1] = wnqNode; 1444 termNode->antType[nWndNodes + 1] = rf_control; 1445 } 1446 } 1447 1448 1449 /****************************************************************************** 1450 * 1451 * creates a DAG to perform a small-write operation (either raid 5 or pq), 1452 * which is as follows: 1453 * 1454 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm 1455 * \- Rod X- Wnd [Und] -------/ 1456 * [\- Rod X- Wnd [Und] ------/] 1457 * [\- Roq - Q --> Wnq [Unq]-/] 1458 * 1459 * Rop = read old parity 1460 * Rod = read old data 1461 * Roq = read old "q" 1462 * Cmt = commit node 1463 * Und = unlock data disk 1464 * Unp = unlock parity disk 1465 * Unq = unlock q disk 1466 * Wnp = write new parity 1467 * Wnd = write new data 1468 * Wnq = write new "q" 1469 * [ ] denotes optional segments in the graph 1470 * 1471 * Parameters: raidPtr - description of the physical array 1472 * asmap - logical & physical addresses for this access 1473 * bp - buffer ptr (holds write data) 1474 * flags - general flags (e.g. disk locking) 1475 * allocList - list of memory allocated in DAG creation 1476 * pfuncs - list of parity generating functions 1477 * qfuncs - list of q generating functions 1478 * 1479 * A null qfuncs indicates single fault tolerant 1480 *****************************************************************************/ 1481 1482 void 1483 rf_CommonCreateSmallWriteDAGFwd( 1484 RF_Raid_t * raidPtr, 1485 RF_AccessStripeMap_t * asmap, 1486 RF_DagHeader_t * dag_h, 1487 void *bp, 1488 RF_RaidAccessFlags_t flags, 1489 RF_AllocListElem_t * allocList, 1490 RF_RedFuncs_t * pfuncs, 1491 RF_RedFuncs_t * qfuncs) 1492 { 1493 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode; 1494 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes; 1495 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes; 1496 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes; 1497 int i, j, nNodes, totalNumNodes, lu_flag; 1498 RF_ReconUnitNum_t which_ru; 1499 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *); 1500 int (*qfunc) (RF_DagNode_t *); 1501 int numDataNodes, numParityNodes; 1502 RF_StripeNum_t parityStripeID; 1503 RF_PhysDiskAddr_t *pda; 1504 char *name, *qname; 1505 long nfaults; 1506 1507 nfaults = qfuncs ? 2 : 1; 1508 lu_flag = (rf_enableAtomicRMW) ? 
1 : 0; /* lock/unlock flag */ 1509 1510 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru); 1511 pda = asmap->physInfo; 1512 numDataNodes = asmap->numStripeUnitsAccessed; 1513 numParityNodes = (asmap->parityInfo->next) ? 2 : 1; 1514 1515 if (rf_dagDebug) 1516 printf("[Creating small-write DAG]\n"); 1517 RF_ASSERT(numDataNodes > 0); 1518 dag_h->creator = "SmallWriteDAGFwd"; 1519 1520 dag_h->numCommitNodes = 0; 1521 dag_h->numCommits = 0; 1522 dag_h->numSuccedents = 1; 1523 1524 qfunc = NULL; 1525 qname = NULL; 1526 1527 /* DAG creation occurs in four steps: 1. count the number of nodes in 1528 * the DAG 2. create the nodes 3. initialize the nodes 4. connect the 1529 * nodes */ 1530 1531 /* Step 1. compute number of nodes in the graph */ 1532 1533 /* number of nodes: a read and write for each data unit a redundancy 1534 * computation node for each parity node (nfaults * nparity) a read 1535 * and write for each parity unit a block node a terminate node if 1536 * atomic RMW an unlock node for each data unit, redundancy unit */ 1537 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2; 1538 if (lu_flag) 1539 totalNumNodes += (numDataNodes + (nfaults * numParityNodes)); 1540 1541 1542 /* Step 2. create the nodes */ 1543 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); 1544 i = 0; 1545 blockNode = &nodes[i]; 1546 i += 1; 1547 readDataNodes = &nodes[i]; 1548 i += numDataNodes; 1549 readParityNodes = &nodes[i]; 1550 i += numParityNodes; 1551 writeDataNodes = &nodes[i]; 1552 i += numDataNodes; 1553 writeParityNodes = &nodes[i]; 1554 i += numParityNodes; 1555 xorNodes = &nodes[i]; 1556 i += numParityNodes; 1557 termNode = &nodes[i]; 1558 i += 1; 1559 if (lu_flag) { 1560 unlockDataNodes = &nodes[i]; 1561 i += numDataNodes; 1562 unlockParityNodes = &nodes[i]; 1563 i += numParityNodes; 1564 } else { 1565 unlockDataNodes = unlockParityNodes = NULL; 1566 } 1567 if (nfaults == 2) { 1568 readQNodes = &nodes[i]; 1569 i += numParityNodes; 1570 writeQNodes = &nodes[i]; 1571 i += numParityNodes; 1572 qNodes = &nodes[i]; 1573 i += numParityNodes; 1574 if (lu_flag) { 1575 unlockQNodes = &nodes[i]; 1576 i += numParityNodes; 1577 } else { 1578 unlockQNodes = NULL; 1579 } 1580 } else { 1581 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL; 1582 } 1583 RF_ASSERT(i == totalNumNodes); 1584 1585 /* Step 3. 
initialize the nodes */
1586 	/* initialize block node (Nil) */
1587 	nNodes = numDataNodes + (nfaults * numParityNodes);
1588 	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList);
1589 
1590 	/* initialize terminate node (Trm) */
1591 	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList);
1592 
1593 	/* initialize nodes which read old data (Rod) */
1594 	for (i = 0; i < numDataNodes; i++) {
1595 		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList);
1596 		RF_ASSERT(pda != NULL);
1597 		readDataNodes[i].params[0].p = pda;	/* physical disk addr
1598 							 * desc */
1599 		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);	/* buffer to hold old
1600 												 * data */
1601 		readDataNodes[i].params[2].v = parityStripeID;
1602 		readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1603 		pda = pda->next;
1604 		for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1605 			readDataNodes[i].propList[j] = NULL;
1606 	}
1607 
1608 	/* initialize nodes which read old parity (Rop) */
1609 	pda = asmap->parityInfo;
1610 	i = 0;
1611 	for (i = 0; i < numParityNodes; i++) {
1612 		RF_ASSERT(pda != NULL);
1613 		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1614 		readParityNodes[i].params[0].p = pda;
1615 		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);	/* buffer to hold old
1616 												 * parity */
1617 		readParityNodes[i].params[2].v = parityStripeID;
1618 		readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1619 		for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1620 			readParityNodes[i].propList[j] = NULL;
1621 		pda = pda->next;
1622 	}
1623 
1624 	/* initialize nodes which read old Q (Roq) */
1625 	if (nfaults == 2) {
1626 		pda = asmap->qInfo;
1627 		for (i = 0; i < numParityNodes; i++) {
1628 			RF_ASSERT(pda != NULL);
1629 			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList);
1630 			readQNodes[i].params[0].p = pda;
1631 			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList);	/* buffer to hold old Q */
1632 			readQNodes[i].params[2].v = parityStripeID;
1633 			readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
1634 			for (j = 0; j < readQNodes[i].numSuccedents; j++)
1635 				readQNodes[i].propList[j] = NULL;
1636 			pda = pda->next;
1637 		}
1638 	}
1639 	/* initialize nodes which write new data (Wnd) */
1640 	pda = asmap->physInfo;
1641 	for (i = 0; i < numDataNodes; i++) {
1642 		RF_ASSERT(pda != NULL);
1643 		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList);
1644 		writeDataNodes[i].params[0].p = pda;	/* physical disk addr
1645 							 * desc */
1646 		writeDataNodes[i].params[1].p = pda->bufPtr;	/* buffer holding new
1647 								 * data to be written */
1648 		writeDataNodes[i].params[2].v = parityStripeID;
1649 		writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1650 
1651 		if (lu_flag) {
1652 			/* initialize node to unlock the disk queue */
1653 			rf_InitNode(&unlockDataNodes[i], rf_wait,
RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList); 1654 unlockDataNodes[i].params[0].p = pda; /* physical disk addr 1655 * desc */ 1656 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru); 1657 } 1658 pda = pda->next; 1659 } 1660 1661 1662 /* initialize nodes which compute new parity and Q */ 1663 /* we use the simple XOR func in the double-XOR case, and when we're 1664 * accessing only a portion of one stripe unit. the distinction 1665 * between the two is that the regular XOR func assumes that the 1666 * targbuf is a full SU in size, and examines the pda associated with 1667 * the buffer to decide where within the buffer to XOR the data, 1668 * whereas the simple XOR func just XORs the data into the start of 1669 * the buffer. */ 1670 if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) { 1671 func = pfuncs->simple; 1672 undoFunc = rf_NullNodeUndoFunc; 1673 name = pfuncs->SimpleName; 1674 if (qfuncs) { 1675 qfunc = qfuncs->simple; 1676 qname = qfuncs->SimpleName; 1677 } 1678 } else { 1679 func = pfuncs->regular; 1680 undoFunc = rf_NullNodeUndoFunc; 1681 name = pfuncs->RegularName; 1682 if (qfuncs) { 1683 qfunc = qfuncs->regular; 1684 qname = qfuncs->RegularName; 1685 } 1686 } 1687 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} 1688 * nodes, and raidPtr */ 1689 if (numParityNodes == 2) { /* double-xor case */ 1690 for (i = 0; i < numParityNodes; i++) { 1691 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for 1692 * xor */ 1693 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD; 1694 xorNodes[i].params[0] = readDataNodes[i].params[0]; 1695 xorNodes[i].params[1] = readDataNodes[i].params[1]; 1696 xorNodes[i].params[2] = readParityNodes[i].params[0]; 1697 xorNodes[i].params[3] = readParityNodes[i].params[1]; 1698 xorNodes[i].params[4] = writeDataNodes[i].params[0]; 1699 xorNodes[i].params[5] = writeDataNodes[i].params[1]; 1700 xorNodes[i].params[6].p = raidPtr; 1701 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as 1702 * target buf */ 1703 if (nfaults == 2) { 1704 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for 1705 * xor */ 1706 qNodes[i].params[0] = readDataNodes[i].params[0]; 1707 qNodes[i].params[1] = readDataNodes[i].params[1]; 1708 qNodes[i].params[2] = readQNodes[i].params[0]; 1709 qNodes[i].params[3] = readQNodes[i].params[1]; 1710 qNodes[i].params[4] = writeDataNodes[i].params[0]; 1711 qNodes[i].params[5] = writeDataNodes[i].params[1]; 1712 qNodes[i].params[6].p = raidPtr; 1713 qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as 1714 * target buf */ 1715 } 1716 } 1717 } else { 1718 /* there is only one xor node in this case */ 1719 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList); 1720 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD; 1721 for (i = 0; i < numDataNodes + 1; i++) { 1722 /* set up params related to Rod and Rop nodes */ 1723 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 1724 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer 
*/ 1725 } 1726 for (i = 0; i < numDataNodes; i++) { 1727 /* set up params related to Wnd and Wnp nodes */ 1728 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */ 1729 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */ 1730 } 1731 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get 1732 * at RAID information */ 1733 xorNodes[0].results[0] = readParityNodes[0].params[1].p; 1734 if (nfaults == 2) { 1735 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList); 1736 for (i = 0; i < numDataNodes; i++) { 1737 /* set up params related to Rod */ 1738 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 1739 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */ 1740 } 1741 /* and read old q */ 1742 qNodes[0].params[2 * numDataNodes + 0] = readQNodes[0].params[0]; /* pda */ 1743 qNodes[0].params[2 * numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */ 1744 for (i = 0; i < numDataNodes; i++) { 1745 /* set up params related to Wnd nodes */ 1746 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */ 1747 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */ 1748 } 1749 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get 1750 * at RAID information */ 1751 qNodes[0].results[0] = readQNodes[0].params[1].p; 1752 } 1753 } 1754 1755 /* initialize nodes which write new parity (Wnp) */ 1756 pda = asmap->parityInfo; 1757 for (i = 0; i < numParityNodes; i++) { 1758 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList); 1759 RF_ASSERT(pda != NULL); 1760 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) 1761 * filled in by xor node */ 1762 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for 1763 * parity write 1764 * operation */ 1765 writeParityNodes[i].params[2].v = parityStripeID; 1766 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1767 1768 if (lu_flag) { 1769 /* initialize node to unlock the disk queue */ 1770 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList); 1771 unlockParityNodes[i].params[0].p = pda; /* physical disk addr 1772 * desc */ 1773 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru); 1774 } 1775 pda = pda->next; 1776 } 1777 1778 /* initialize nodes which write new Q (Wnq) */ 1779 if (nfaults == 2) { 1780 pda = asmap->qInfo; 1781 for (i = 0; i < numParityNodes; i++) { 1782 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList); 1783 RF_ASSERT(pda != NULL); 1784 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) 1785 * filled in by xor node */ 1786 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for 1787 * parity write 1788 * operation */ 1789 writeQNodes[i].params[2].v = parityStripeID; 1790 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1791 1792 if (lu_flag) { 
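			/*
			 * Lock/unlock pairing: the old-value reads (Rod/Rop/Roq)
			 * pass lu_flag in the second argument of RF_CREATE_PARAM3,
			 * the writes (Wnd/Wnp/Wnq) pass zero for both the lock and
			 * unlock positions, and the Un* nodes created below pass
			 * lu_flag in the third argument, presumably releasing the
			 * per-disk queue lock acquired by the corresponding read
			 * when atomic RMW is enabled.
			 */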
1793 /* initialize node to unlock the disk queue */ 1794 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList); 1795 unlockQNodes[i].params[0].p = pda; /* physical disk addr 1796 * desc */ 1797 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru); 1798 } 1799 pda = pda->next; 1800 } 1801 } 1802 /* Step 4. connect the nodes */ 1803 1804 /* connect header to block node */ 1805 dag_h->succedents[0] = blockNode; 1806 1807 /* connect block node to read old data nodes */ 1808 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 1809 for (i = 0; i < numDataNodes; i++) { 1810 blockNode->succedents[i] = &readDataNodes[i]; 1811 RF_ASSERT(readDataNodes[i].numAntecedents == 1); 1812 readDataNodes[i].antecedents[0] = blockNode; 1813 readDataNodes[i].antType[0] = rf_control; 1814 } 1815 1816 /* connect block node to read old parity nodes */ 1817 for (i = 0; i < numParityNodes; i++) { 1818 blockNode->succedents[numDataNodes + i] = &readParityNodes[i]; 1819 RF_ASSERT(readParityNodes[i].numAntecedents == 1); 1820 readParityNodes[i].antecedents[0] = blockNode; 1821 readParityNodes[i].antType[0] = rf_control; 1822 } 1823 1824 /* connect block node to read old Q nodes */ 1825 if (nfaults == 2) 1826 for (i = 0; i < numParityNodes; i++) { 1827 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i]; 1828 RF_ASSERT(readQNodes[i].numAntecedents == 1); 1829 readQNodes[i].antecedents[0] = blockNode; 1830 readQNodes[i].antType[0] = rf_control; 1831 } 1832 1833 /* connect read old data nodes to write new data nodes */ 1834 for (i = 0; i < numDataNodes; i++) { 1835 RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1)); 1836 RF_ASSERT(writeDataNodes[i].numAntecedents == 1); 1837 readDataNodes[i].succedents[0] = &writeDataNodes[i]; 1838 writeDataNodes[i].antecedents[0] = &readDataNodes[i]; 1839 writeDataNodes[i].antType[0] = rf_antiData; 1840 } 1841 1842 /* connect read old data nodes to xor nodes */ 1843 for (i = 0; i < numDataNodes; i++) { 1844 for (j = 0; j < numParityNodes; j++) { 1845 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes); 1846 readDataNodes[i].succedents[1 + j] = &xorNodes[j]; 1847 xorNodes[j].antecedents[i] = &readDataNodes[i]; 1848 xorNodes[j].antType[i] = rf_trueData; 1849 } 1850 } 1851 1852 /* connect read old data nodes to q nodes */ 1853 if (nfaults == 2) 1854 for (i = 0; i < numDataNodes; i++) 1855 for (j = 0; j < numParityNodes; j++) { 1856 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes); 1857 readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j]; 1858 qNodes[j].antecedents[i] = &readDataNodes[i]; 1859 qNodes[j].antType[i] = rf_trueData; 1860 } 1861 1862 /* connect read old parity nodes to xor nodes */ 1863 for (i = 0; i < numParityNodes; i++) { 1864 for (j = 0; j < numParityNodes; j++) { 1865 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 1866 readParityNodes[i].succedents[j] = &xorNodes[j]; 1867 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i]; 1868 xorNodes[j].antType[numDataNodes + i] = rf_trueData; 1869 } 1870 } 1871 1872 /* connect read old q nodes to q nodes */ 1873 if (nfaults == 2) 1874 for (i = 0; i < numParityNodes; i++) { 1875 for (j = 0; j < numParityNodes; j++) { 1876 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes); 1877 readQNodes[i].succedents[j] = &qNodes[j]; 1878 
qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i]; 1879 qNodes[j].antType[numDataNodes + i] = rf_trueData; 1880 } 1881 } 1882 1883 /* connect xor nodes to the write new parity nodes */ 1884 for (i = 0; i < numParityNodes; i++) { 1885 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes); 1886 for (j = 0; j < numParityNodes; j++) { 1887 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes); 1888 xorNodes[i].succedents[j] = &writeParityNodes[j]; 1889 writeParityNodes[j].antecedents[i] = &xorNodes[i]; 1890 writeParityNodes[j].antType[i] = rf_trueData; 1891 } 1892 } 1893 1894 /* connect q nodes to the write new q nodes */ 1895 if (nfaults == 2) 1896 for (i = 0; i < numParityNodes; i++) { 1897 RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes); 1898 for (j = 0; j < numParityNodes; j++) { 1899 RF_ASSERT(qNodes[j].numSuccedents == 1); 1900 qNodes[i].succedents[j] = &writeQNodes[j]; 1901 writeQNodes[j].antecedents[i] = &qNodes[i]; 1902 writeQNodes[j].antType[i] = rf_trueData; 1903 } 1904 } 1905 1906 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1907 RF_ASSERT(termNode->numSuccedents == 0); 1908 for (i = 0; i < numDataNodes; i++) { 1909 if (lu_flag) { 1910 /* connect write new data nodes to unlock nodes */ 1911 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 1912 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1); 1913 writeDataNodes[i].succedents[0] = &unlockDataNodes[i]; 1914 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i]; 1915 unlockDataNodes[i].antType[0] = rf_control; 1916 1917 /* connect unlock nodes to term node */ 1918 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1); 1919 unlockDataNodes[i].succedents[0] = termNode; 1920 termNode->antecedents[i] = &unlockDataNodes[i]; 1921 termNode->antType[i] = rf_control; 1922 } else { 1923 /* connect write new data nodes to term node */ 1924 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 1925 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1926 writeDataNodes[i].succedents[0] = termNode; 1927 termNode->antecedents[i] = &writeDataNodes[i]; 1928 termNode->antType[i] = rf_control; 1929 } 1930 } 1931 1932 for (i = 0; i < numParityNodes; i++) { 1933 if (lu_flag) { 1934 /* connect write new parity nodes to unlock nodes */ 1935 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1936 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1); 1937 writeParityNodes[i].succedents[0] = &unlockParityNodes[i]; 1938 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i]; 1939 unlockParityNodes[i].antType[0] = rf_control; 1940 1941 /* connect unlock nodes to term node */ 1942 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1); 1943 unlockParityNodes[i].succedents[0] = termNode; 1944 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i]; 1945 termNode->antType[numDataNodes + i] = rf_control; 1946 } else { 1947 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1948 writeParityNodes[i].succedents[0] = termNode; 1949 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i]; 1950 termNode->antType[numDataNodes + i] = rf_control; 1951 } 1952 } 1953 1954 if (nfaults == 2) 1955 for (i = 0; i < numParityNodes; i++) { 1956 if (lu_flag) { 1957 /* connect write new Q nodes to unlock nodes */ 1958 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1959 RF_ASSERT(unlockQNodes[i].numAntecedents == 1); 1960 writeQNodes[i].succedents[0] = &unlockQNodes[i]; 1961 unlockQNodes[i].antecedents[0] = &writeQNodes[i]; 1962 unlockQNodes[i].antType[0] = rf_control; 1963 1964 /* 
connect unlock nodes to term node */
1965 				RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1966 				unlockQNodes[i].succedents[0] = termNode;
1967 				termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
1968 				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1969 			} else {
1970 				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1971 				writeQNodes[i].succedents[0] = termNode;
1972 				termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
1973 				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
1974 			}
1975 		}
1976 }
1977 
1978 
1979 
1980 /******************************************************************************
1981  * create a write graph (fault-free or degraded) for RAID level 1
1982  *
1983  * Hdr -> Nil -> Wpd -> Nil -> Trm
1984  *        Nil -> Wsd ->
1985  *
1986  * The "Wpd" node writes data to the primary copy in the mirror pair
1987  * The "Wsd" node writes data to the secondary copy in the mirror pair
1988  *
1989  * Parameters:  raidPtr   - description of the physical array
1990  *              asmap     - logical & physical addresses for this access
1991  *              bp        - buffer ptr (holds write data)
1992  *              flags     - general flags (e.g. disk locking)
1993  *              allocList - list of memory allocated in DAG creation
1994  *****************************************************************************/
1995 
1996 void
1997 rf_CreateRaidOneWriteDAGFwd(
1998     RF_Raid_t * raidPtr,
1999     RF_AccessStripeMap_t * asmap,
2000     RF_DagHeader_t * dag_h,
2001     void *bp,
2002     RF_RaidAccessFlags_t flags,
2003     RF_AllocListElem_t * allocList)
2004 {
2005 	RF_DagNode_t *blockNode, *unblockNode, *termNode;
2006 	RF_DagNode_t *nodes, *wndNode, *wmirNode;
2007 	int nWndNodes, nWmirNodes, i;
2008 	RF_ReconUnitNum_t which_ru;
2009 	RF_PhysDiskAddr_t *pda, *pdaP;
2010 	RF_StripeNum_t parityStripeID;
2011 
2012 	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2013 	    asmap->raidAddress, &which_ru);
2014 	if (rf_dagDebug) {
2015 		printf("[Creating RAID level 1 write DAG]\n");
2016 	}
2017 	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;	/* 2 implies access not
2018 							 * SU aligned */
2019 	nWndNodes = (asmap->physInfo->next) ?
2 : 1; 2020 2021 /* alloc the Wnd nodes and the Wmir node */ 2022 if (asmap->numDataFailed == 1) 2023 nWndNodes--; 2024 if (asmap->numParityFailed == 1) 2025 nWmirNodes--; 2026 2027 /* total number of nodes = nWndNodes + nWmirNodes + (block + unblock + 2028 * terminator) */ 2029 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); 2030 i = 0; 2031 wndNode = &nodes[i]; 2032 i += nWndNodes; 2033 wmirNode = &nodes[i]; 2034 i += nWmirNodes; 2035 blockNode = &nodes[i]; 2036 i += 1; 2037 unblockNode = &nodes[i]; 2038 i += 1; 2039 termNode = &nodes[i]; 2040 i += 1; 2041 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3)); 2042 2043 /* this dag can commit immediately */ 2044 dag_h->numCommitNodes = 0; 2045 dag_h->numCommits = 0; 2046 dag_h->numSuccedents = 1; 2047 2048 /* initialize the unblock and term nodes */ 2049 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Nil", allocList); 2050 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList); 2051 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList); 2052 2053 /* initialize the wnd nodes */ 2054 if (nWndNodes > 0) { 2055 pda = asmap->physInfo; 2056 for (i = 0; i < nWndNodes; i++) { 2057 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList); 2058 RF_ASSERT(pda != NULL); 2059 wndNode[i].params[0].p = pda; 2060 wndNode[i].params[1].p = pda->bufPtr; 2061 wndNode[i].params[2].v = parityStripeID; 2062 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 2063 pda = pda->next; 2064 } 2065 RF_ASSERT(pda == NULL); 2066 } 2067 /* initialize the mirror nodes */ 2068 if (nWmirNodes > 0) { 2069 pda = asmap->physInfo; 2070 pdaP = asmap->parityInfo; 2071 for (i = 0; i < nWmirNodes; i++) { 2072 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList); 2073 RF_ASSERT(pda != NULL); 2074 wmirNode[i].params[0].p = pdaP; 2075 wmirNode[i].params[1].p = pda->bufPtr; 2076 wmirNode[i].params[2].v = parityStripeID; 2077 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 2078 pda = pda->next; 2079 pdaP = pdaP->next; 2080 } 2081 RF_ASSERT(pda == NULL); 2082 RF_ASSERT(pdaP == NULL); 2083 } 2084 /* link the header node to the block node */ 2085 RF_ASSERT(dag_h->numSuccedents == 1); 2086 RF_ASSERT(blockNode->numAntecedents == 0); 2087 dag_h->succedents[0] = blockNode; 2088 2089 /* link the block node to the write nodes */ 2090 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes)); 2091 for (i = 0; i < nWndNodes; i++) { 2092 RF_ASSERT(wndNode[i].numAntecedents == 1); 2093 blockNode->succedents[i] = &wndNode[i]; 2094 wndNode[i].antecedents[0] = blockNode; 2095 wndNode[i].antType[0] = rf_control; 2096 } 2097 for (i = 0; i < nWmirNodes; i++) { 2098 RF_ASSERT(wmirNode[i].numAntecedents == 1); 2099 blockNode->succedents[i + nWndNodes] = &wmirNode[i]; 2100 wmirNode[i].antecedents[0] = blockNode; 2101 wmirNode[i].antType[0] = rf_control; 2102 } 2103 2104 /* link the write nodes to the unblock node */ 2105 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes)); 2106 for (i = 0; i < nWndNodes; i++) { 2107 RF_ASSERT(wndNode[i].numSuccedents == 1); 2108 
wndNode[i].succedents[0] = unblockNode; 2109 unblockNode->antecedents[i] = &wndNode[i]; 2110 unblockNode->antType[i] = rf_control; 2111 } 2112 for (i = 0; i < nWmirNodes; i++) { 2113 RF_ASSERT(wmirNode[i].numSuccedents == 1); 2114 wmirNode[i].succedents[0] = unblockNode; 2115 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i]; 2116 unblockNode->antType[i + nWndNodes] = rf_control; 2117 } 2118 2119 /* link the unblock node to the term node */ 2120 RF_ASSERT(unblockNode->numSuccedents == 1); 2121 RF_ASSERT(termNode->numAntecedents == 1); 2122 RF_ASSERT(termNode->numSuccedents == 0); 2123 unblockNode->succedents[0] = termNode; 2124 termNode->antecedents[0] = unblockNode; 2125 termNode->antType[0] = rf_control; 2126 2127 return; 2128 } 2129
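
/*
 * The node-count bookkeeping in Step 1 of rf_CommonCreateSmallWriteDAGFwd
 * is easy to break when the graph is modified, so the disabled sketch below
 * restates that arithmetic as a stand-alone helper with a worked example.
 * The helper name is made up for illustration and the block is compiled
 * out; it is a sketch of the existing computation, not part of the driver.
 */
#if 0
static int
rf_SmallWriteFwdNodeCount(int numDataNodes, int numParityNodes, int nfaults,
    int lu_flag)
{
	/* a read (Rod) and a write (Wnd) for each data unit, an XOR (and Q)
	 * node per parity unit, a read and a write for each parity (and Q)
	 * unit, plus the block (Nil) and terminate (Trm) nodes */
	int n = (2 * numDataNodes) + (nfaults * numParityNodes) +
	    (nfaults * 2 * numParityNodes) + 2;

	/* with atomic RMW, an unlock node for each data and redundancy unit */
	if (lu_flag)
		n += numDataNodes + (nfaults * numParityNodes);
	return (n);
}
/*
 * Example: a RAID-5 small write touching two data units of a stripe whose
 * parity is covered by a single pda (numDataNodes == 2, numParityNodes == 1,
 * nfaults == 1, lu_flag == 0) gives 4 + 1 + 2 + 2 = 9 nodes:
 * Nil, two Rod, one Rop, one Xor, two Wnd, one Wnp, and Trm.
 */
#endif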