/*	$NetBSD: rf_dagffwr.c,v 1.10 2003/02/09 10:04:33 jdolecek Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free write DAGs
 *
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.10 2003/02/09 10:04:33 jdolecek Exp $");

#include <dev/raidframe/raidframevar.h>

#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."  If an error occurs before the Cmt node
 * is reached, the execution engine will halt forward execution and work
 * backward through the graph, executing the undo functions.  Assuming that
 * each node in the graph prior to the Cmt node is atomic and undoable - or -
 * makes no changes to permanent state, the graph will fail atomically.
 * If an error occurs after the Cmt node executes, the engine will roll
 * forward through the graph, blindly executing nodes until it reaches the
 * end.  If a graph reaches the end, it is assumed to have completed
 * successfully.
 *
 * A graph has only 1 Cmt node.
 *
 */
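
/*
 * Illustrative sketch (not part of the driver): one way an execution
 * engine could implement the roll-away policy described above.  The
 * node type and the linear schedule here are hypothetical
 * simplifications; the real engine operates on the DAG itself (see
 * rf_engine.c).
 */
#if 0
struct ex_node {
	int	(*doFunc)(struct ex_node *);	/* returns 0 on success */
	void	(*undoFunc)(struct ex_node *);	/* reverses doFunc */
	int	isCommit;			/* nonzero for the Cmt node */
};

static int
ex_execute(struct ex_node *nodes, int nNodes)
{
	int	i, committed = 0;

	for (i = 0; i < nNodes; i++) {
		if (nodes[i].doFunc(&nodes[i]) != 0) {
			if (committed)
				continue;	/* past Cmt: roll forward */
			while (--i >= 0)	/* before Cmt: roll backward */
				nodes[i].undoFunc(&nodes[i]);
			return (-1);		/* graph failed atomically */
		}
		if (nodes[i].isCommit)
			committed = 1;
	}
	return (0);	/* reached the end: success */
}
#endif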

/******************************************************************************
 *
 * The following wrappers map the standard DAG creation interface to the
 * DAG creation routines.  Additionally, these wrappers enable experimentation
 * with new DAG structures by providing an extra level of indirection, allowing
 * the DAG creation routines to be replaced at this single point.
 */


void
rf_CreateNonRedundantWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    RF_IO_TYPE_WRITE);
}

void
rf_CreateRAID0WriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_IoType_t type)
{
	rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
	    RF_IO_TYPE_WRITE);
}

void
rf_CreateSmallWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
	    allocList, &rf_xorFuncs, NULL);
}

void
rf_CreateLargeWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	/* "normal" rollaway */
	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
	    allocList, 1, rf_RegularXorFunc, RF_TRUE);
}
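
/*
 * Illustrative sketch (not compiled): how a two-fault-tolerant (P+Q)
 * architecture might layer onto the common routines below.  The entry
 * points and the redundancy-function names (rf_RegularPQFunc, rf_pFuncs,
 * rf_qFuncs) are stand-ins here; the real PQ wrappers live in the
 * PQ-specific DAG modules.
 */
#if 0
void
rf_CreatePQLargeWriteDAG_example(RF_Raid_t *raidPtr,
    RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp,
    RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList)
{
	/* nfaults == 2 adds the Wnq node and a second result buffer */
	rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
	    allocList, 2, rf_RegularPQFunc, RF_TRUE);
}

void
rf_CreatePQSmallWriteDAG_example(RF_Raid_t *raidPtr,
    RF_AccessStripeMap_t *asmap, RF_DagHeader_t *dag_h, void *bp,
    RF_RaidAccessFlags_t flags, RF_AllocListElem_t *allocList)
{
	/* a non-NULL qfuncs argument selects the two-fault-tolerant graph */
	rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
	    allocList, &rf_pFuncs, &rf_qFuncs);
}
#endif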

/******************************************************************************
 *
 * DAG creation code begins here
 */


/******************************************************************************
 *
 * creates a DAG to perform a large-write operation:
 *
 *            / Rod \            / Wnd \
 * H -- block- Rod - Xor - Cmt -- Wnd --- T
 *            \ Rod /            \ Wnp /
 *                               \[Wnq]/
 *
 * The XOR node also does the Q calculation in the P+Q architecture.
 * All nodes before the commit node (Cmt) are assumed to be atomic and
 * undoable - or - they make no changes to permanent state.
 *
 * Rod = read old data
 * Cmt = commit node
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              nfaults   - number of faults array can tolerate
 *                          (equal to # redundancy units in stripe)
 *              redfuncs  - list of redundancy generating functions
 *
 *****************************************************************************/

void
rf_CommonCreateLargeWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    int nfaults,
    int (*redFunc) (RF_DagNode_t *),
    int allowBufferRecycle)
{
	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
	RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
	int     nWndNodes, nRodNodes, i, nodeNum, asmNum;
	RF_AccessStripeMapHeader_t *new_asm_h[2];
	RF_StripeNum_t parityStripeID;
	char   *sosBuffer, *eosBuffer;
	RF_ReconUnitNum_t which_ru;
	RF_RaidLayout_t *layoutPtr;
	RF_PhysDiskAddr_t *pda;

	layoutPtr = &(raidPtr->Layout);
	parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
	    asmap->raidAddress, &which_ru);

	if (rf_dagDebug) {
		printf("[Creating large-write DAG]\n");
	}
	dag_h->creator = "LargeWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */
	nWndNodes = asmap->numStripeUnitsAccessed;
	RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	wndNodes = &nodes[i];
	i += nWndNodes;
	xorNode = &nodes[i];
	i += 1;
	wnpNode = &nodes[i];
	i += 1;
	blockNode = &nodes[i];
	i += 1;
	commitNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	if (nfaults == 2) {
		wnqNode = &nodes[i];
		i += 1;
	} else {
		wnqNode = NULL;
	}
	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
	    new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
	if (nRodNodes > 0) {
		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
		    (RF_DagNode_t *), allocList);
	} else {
		rodNodes = NULL;
	}

	/* begin node initialization */
	if (nRodNodes > 0) {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
		    "Nil", allocList);
	} else {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h,
		    "Nil", allocList);
	}

	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0, dag_h,
	    "Cmt", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
	    rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h,
	    "Trm", allocList);

	/* initialize the Rod nodes */
	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
		if (new_asm_h[asmNum]) {
			pda = new_asm_h[asmNum]->stripeMap->physInfo;
			while (pda) {
				rf_InitNode(&rodNodes[nodeNum], rf_wait,
				    RF_FALSE, rf_DiskReadFunc,
				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
				    1, 1, 4, 0, dag_h, "Rod", allocList);
				rodNodes[nodeNum].params[0].p = pda;
				rodNodes[nodeNum].params[1].p = pda->bufPtr;
				rodNodes[nodeNum].params[2].v = parityStripeID;
				rodNodes[nodeNum].params[3].v =
				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
				    0, 0, which_ru);
				nodeNum++;
				pda = pda->next;
			}
		}
	}
	RF_ASSERT(nodeNum == nRodNodes);
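
	/*
	 * Note (added for clarity): the params[3] word filled in above is
	 * packed by RF_CREATE_PARAM3 as (priority, lock, unlock, which_ru),
	 * as can be seen from its uses in this file.  The fault-free
	 * large-write path never sets the lock/unlock bits; the small-write
	 * path below sets them when atomic RMW is enabled.
	 */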

	/* initialize the wnd nodes */
	pda = asmap->physInfo;
	for (i = 0; i < nWndNodes; i++) {
		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
		    dag_h, "Wnd", allocList);
		RF_ASSERT(pda != NULL);
		wndNodes[i].params[0].p = pda;
		wndNodes[i].params[1].p = pda->bufPtr;
		wndNodes[i].params[2].v = parityStripeID;
		wndNodes[i].params[3].v =
		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		pda = pda->next;
	}

	/* initialize the redundancy node */
	if (nRodNodes > 0) {
		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
		    rf_NullNodeUndoFunc, NULL, 1, nRodNodes,
		    2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
		    "Xr ", allocList);
	} else {
		rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
		    rf_NullNodeUndoFunc, NULL, 1, 1,
		    2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
		    "Xr ", allocList);
	}
	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
	for (i = 0; i < nWndNodes; i++) {
		xorNode->params[2 * i + 0] = wndNodes[i].params[0];	/* pda */
		xorNode->params[2 * i + 1] = wndNodes[i].params[1];	/* buf ptr */
	}
	for (i = 0; i < nRodNodes; i++) {
		xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];	/* pda */
		xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];	/* buf ptr */
	}
	/* xor node needs to get at RAID information */
	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
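
	/*
	 * Illustrative sketch (not compiled): the parameter layout a
	 * redundancy function receives from the setup above - {pda, buf}
	 * pairs for the Wnd and Rod nodes followed by the raidPtr.
	 * ex_xor_range() is hypothetical.
	 */
#if 0
static int
ex_redFunc(RF_DagNode_t *node)
{
	RF_Raid_t *raidPtr = (RF_Raid_t *) node->params[node->numParams - 1].p;
	int i, nPairs = (node->numParams - 1) / 2;

	for (i = 0; i < nPairs; i++) {
		RF_PhysDiskAddr_t *pda =
		    (RF_PhysDiskAddr_t *) node->params[2 * i + 0].p;
		char *buf = (char *) node->params[2 * i + 1].p;

		/* XOR buf into results[0] at the offset the pda names */
		ex_xor_range(raidPtr, node->results[0], pda, buf);
	}
	return (rf_GenericWakeupFunc(node, 0));
}
#endif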

	/*
	 * Look for an Rod node that reads a complete SU.  If none, alloc a
	 * buffer to receive the parity info.  Note that we can't use a new
	 * data buffer because it will not have gotten written when the xor
	 * occurs.
	 */
	if (allowBufferRecycle) {
		for (i = 0; i < nRodNodes; i++) {
			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
				break;
		}
	}
	if ((!allowBufferRecycle) || (i == nRodNodes)) {
		RF_CallocAndAdd(xorNode->results[0], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
	} else {
		xorNode->results[0] = rodNodes[i].params[1].p;
	}

	/* initialize the Wnp node */
	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
	    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
	    "Wnp", allocList);
	wnpNode->params[0].p = asmap->parityInfo;
	wnpNode->params[1].p = xorNode->results[0];
	wnpNode->params[2].v = parityStripeID;
	wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
	/* parityInfo must describe entire parity unit */
	RF_ASSERT(asmap->parityInfo->next == NULL);

	if (nfaults == 2) {
		/*
		 * We never try to recycle a buffer for the Q calculation
		 * in addition to the parity.  This would cause two buffers
		 * to get smashed during the P and Q calculation, guaranteeing
		 * one would be wrong.
		 */
		RF_CallocAndAdd(xorNode->results[1], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
		    dag_h, "Wnq", allocList);
		wnqNode->params[0].p = asmap->qInfo;
		wnqNode->params[1].p = xorNode->results[1];
		wnqNode->params[2].v = parityStripeID;
		wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		/* qInfo must describe entire q unit */
		RF_ASSERT(asmap->qInfo->next == NULL);
	}
	/*
	 * Connect nodes to form graph.
	 */

	/* connect dag header to block node */
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	if (nRodNodes > 0) {
		/* connect the block node to the Rod nodes */
		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
		RF_ASSERT(xorNode->numAntecedents == nRodNodes);
		for (i = 0; i < nRodNodes; i++) {
			RF_ASSERT(rodNodes[i].numAntecedents == 1);
			blockNode->succedents[i] = &rodNodes[i];
			rodNodes[i].antecedents[0] = blockNode;
			rodNodes[i].antType[0] = rf_control;

			/* connect the Rod nodes to the Xor node */
			RF_ASSERT(rodNodes[i].numSuccedents == 1);
			rodNodes[i].succedents[0] = xorNode;
			xorNode->antecedents[i] = &rodNodes[i];
			xorNode->antType[i] = rf_trueData;
		}
	} else {
		/* connect the block node to the Xor node */
		RF_ASSERT(blockNode->numSuccedents == 1);
		RF_ASSERT(xorNode->numAntecedents == 1);
		blockNode->succedents[0] = xorNode;
		xorNode->antecedents[0] = blockNode;
		xorNode->antType[0] = rf_control;
	}

	/* connect the xor node to the commit node */
	RF_ASSERT(xorNode->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 1);
	xorNode->succedents[0] = commitNode;
	commitNode->antecedents[0] = xorNode;
	commitNode->antType[0] = rf_control;

	/* connect the commit node to the write nodes */
	RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNodes[i];
		wndNodes[i].antecedents[0] = commitNode;
		wndNodes[i].antType[0] = rf_control;
	}
	RF_ASSERT(wnpNode->numAntecedents == 1);
	commitNode->succedents[nWndNodes] = wnpNode;
	wnpNode->antecedents[0] = commitNode;
	wnpNode->antType[0] = rf_trueData;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numAntecedents == 1);
		commitNode->succedents[nWndNodes + 1] = wnqNode;
		wnqNode->antecedents[0] = commitNode;
		wnqNode->antType[0] = rf_trueData;
	}
	/* connect the write nodes to the term node */
	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numSuccedents == 1);
		wndNodes[i].succedents[0] = termNode;
		termNode->antecedents[i] = &wndNodes[i];
		termNode->antType[i] = rf_control;
	}
	RF_ASSERT(wnpNode->numSuccedents == 1);
	wnpNode->succedents[0] = termNode;
	termNode->antecedents[nWndNodes] = wnpNode;
	termNode->antType[nWndNodes] = rf_control;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numSuccedents == 1);
		wnqNode->succedents[0] = termNode;
		termNode->antecedents[nWndNodes + 1] = wnqNode;
		termNode->antType[nWndNodes + 1] = rf_control;
	}
}

/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm
 *            \- Rod X      /     \----> Wnd [Und] -/
 *           [\- Rod X     /       \---> Wnd [Und] -/]
 *           [\- Roq -> Q /         \--> Wnq [Unq] -/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Cmt = commit node
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates a single-fault-tolerant array.
 *****************************************************************************/

void
rf_CommonCreateSmallWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    const RF_RedFuncs_t * pfuncs,
    const RF_RedFuncs_t * qfuncs)
{
	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
	RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
	int     i, j, nNodes, totalNumNodes, lu_flag;
	RF_ReconUnitNum_t which_ru;
	int     (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
	int     (*qfunc) (RF_DagNode_t *);
	int     numDataNodes, numParityNodes;
	RF_StripeNum_t parityStripeID;
	RF_PhysDiskAddr_t *pda;
	char   *name, *qname;
	long    nfaults;

	nfaults = qfuncs ? 2 : 1;
	lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* lock/unlock flag */

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	pda = asmap->physInfo;
	numDataNodes = asmap->numStripeUnitsAccessed;
	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;

	if (rf_dagDebug) {
		printf("[Creating small-write DAG]\n");
	}
	RF_ASSERT(numDataNodes > 0);
	dag_h->creator = "SmallWriteDAG";

	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/*
	 * DAG creation occurs in four steps:
	 * 1. count the number of nodes in the DAG
	 * 2. create the nodes
	 * 3. initialize the nodes
	 * 4. connect the nodes
	 */

	/*
	 * Step 1. compute number of nodes in the graph
	 */

	/*
	 * Number of nodes:
	 *   a read and a write for each data unit,
	 *   a redundancy computation node for each parity node
	 *     (nfaults * nparity),
	 *   a read and a write for each parity unit,
	 *   a block and a commit node (2),
	 *   a terminate node,
	 *   and, if atomic RMW, an unlock node for each data unit and
	 *     each redundancy unit.
	 */
	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
	    + (nfaults * 2 * numParityNodes) + 3;
	if (lu_flag) {
		totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
	}
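
	/*
	 * Worked example (added for clarity): a small write touching three
	 * data units of a RAID-5 stripe (numDataNodes == 3,
	 * numParityNodes == 1, nfaults == 1) needs
	 * (2 * 3) + 1 + 2 + 3 = 12 nodes, plus 3 + 1 = 4 unlock nodes when
	 * atomic RMW is enabled, for 16 in all.
	 */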
	/*
	 * Step 2. create the nodes
	 */
	RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	blockNode = &nodes[i];
	i += 1;
	commitNode = &nodes[i];
	i += 1;
	readDataNodes = &nodes[i];
	i += numDataNodes;
	readParityNodes = &nodes[i];
	i += numParityNodes;
	writeDataNodes = &nodes[i];
	i += numDataNodes;
	writeParityNodes = &nodes[i];
	i += numParityNodes;
	xorNodes = &nodes[i];
	i += numParityNodes;
	termNode = &nodes[i];
	i += 1;
	if (lu_flag) {
		unlockDataNodes = &nodes[i];
		i += numDataNodes;
		unlockParityNodes = &nodes[i];
		i += numParityNodes;
	} else {
		unlockDataNodes = unlockParityNodes = NULL;
	}
	if (nfaults == 2) {
		readQNodes = &nodes[i];
		i += numParityNodes;
		writeQNodes = &nodes[i];
		i += numParityNodes;
		qNodes = &nodes[i];
		i += numParityNodes;
		if (lu_flag) {
			unlockQNodes = &nodes[i];
			i += numParityNodes;
		} else {
			unlockQNodes = NULL;
		}
	} else {
		readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
	}
	RF_ASSERT(i == totalNumNodes);

	/*
	 * Step 3. initialize the nodes
	 */
	/* initialize block node (Nil) */
	nNodes = numDataNodes + (nfaults * numParityNodes);
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil",
	    allocList);

	/* initialize commit node (Cmt) */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, nNodes, (nfaults * numParityNodes),
	    0, 0, dag_h, "Cmt", allocList);

	/* initialize terminate node (Trm) */
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
	    rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm",
	    allocList);

	/* initialize nodes which read old data (Rod) */
	for (i = 0; i < numDataNodes; i++) {
		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
		    rf_DiskReadFunc, rf_DiskReadUndoFunc,
		    rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4,
		    0, dag_h, "Rod", allocList);
		RF_ASSERT(pda != NULL);
		/* physical disk addr desc */
		readDataNodes[i].params[0].p = pda;
		/* buffer to hold old data */
		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
		    dag_h, pda, allocList);
		readDataNodes[i].params[2].v = parityStripeID;
		readDataNodes[i].params[3].v =
		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0,
		    which_ru);
		pda = pda->next;
		for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
			readDataNodes[i].propList[j] = NULL;
		}
	}

	/* initialize nodes which read old parity (Rop) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
		    rf_DiskReadFunc, rf_DiskReadUndoFunc,
		    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h,
		    "Rop", allocList);
		readParityNodes[i].params[0].p = pda;
		/* buffer to hold old parity */
		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
		    dag_h, pda, allocList);
		readParityNodes[i].params[2].v = parityStripeID;
		readParityNodes[i].params[3].v =
		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0,
		    which_ru);
		pda = pda->next;
		for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
			readParityNodes[i].propList[j] = NULL;
		}
	}

	/* initialize nodes which read old Q (Roq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(pda != NULL);
			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
			    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
			    dag_h, "Roq", allocList);
			readQNodes[i].params[0].p = pda;
			/* buffer to hold old Q */
			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
			    dag_h, pda, allocList);
			readQNodes[i].params[2].v = parityStripeID;
			readQNodes[i].params[3].v =
			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag,
			    0, which_ru);
			pda = pda->next;
			for (j = 0; j < readQNodes[i].numSuccedents; j++) {
				readQNodes[i].propList[j] = NULL;
			}
		}
	}
	/* initialize nodes which write new data (Wnd) */
	pda = asmap->physInfo;
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd",
		    allocList);
		/* physical disk addr desc */
		writeDataNodes[i].params[0].p = pda;
		/* buffer holding new data to be written */
		writeDataNodes[i].params[1].p = pda->bufPtr;
		writeDataNodes[i].params[2].v = parityStripeID;
		writeDataNodes[i].params[3].v =
		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		if (lu_flag) {
			/* initialize node to unlock the disk queue */
			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und",
			    allocList);
			/* physical disk addr desc */
			unlockDataNodes[i].params[0].p = pda;
			unlockDataNodes[i].params[1].v =
			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0,
			    lu_flag, which_ru);
		}
		pda = pda->next;
	}

	/*
	 * Initialize nodes which compute new parity and Q.
	 */
	/*
	 * We use the simple XOR func in the double-XOR case, and when
	 * we're accessing only a portion of one stripe unit.  The
	 * distinction between the two is that the regular XOR func assumes
	 * that the targbuf is a full SU in size, and examines the pda
	 * associated with the buffer to decide where within the buffer to
	 * XOR the data, whereas the simple XOR func just XORs the data into
	 * the start of the buffer.
	 */
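	/*
	 * Illustrative sketch (not compiled) of the distinction described
	 * above; the ex_* functions are hypothetical, and the offset
	 * computation in the "regular" case is only an approximation of
	 * what the real XOR functions do.
	 */
#if 0
static void
ex_simple_xor(char *tbuf, char *src, int len)
{
	int i;

	/* simple func: XOR into the start of the target buffer */
	for (i = 0; i < len; i++)
		tbuf[i] ^= src[i];
}

static void
ex_regular_xor(RF_Raid_t *raidPtr, char *tbuf, RF_PhysDiskAddr_t *pda,
    char *src, int len)
{
	/* regular func: target is a full SU; offset by the pda's position */
	char *dst = tbuf + rf_RaidAddressToByte(raidPtr,
	    pda->startSector % raidPtr->Layout.sectorsPerStripeUnit);
	int i;

	for (i = 0; i < len; i++)
		dst[i] ^= src[i];
}
#endif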
	if ((numParityNodes == 2) || ((numDataNodes == 1)
	    && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
		func = pfuncs->simple;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->SimpleName;
		if (qfuncs) {
			qfunc = qfuncs->simple;
			qname = qfuncs->SimpleName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	} else {
		func = pfuncs->regular;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->RegularName;
		if (qfuncs) {
			qfunc = qfuncs->regular;
			qname = qfuncs->RegularName;
		} else {
			qfunc = NULL;
			qname = NULL;
		}
	}
	/*
	 * Initialize the xor nodes: params are {pda,buf}
	 * from {Rod,Wnd,Rop} nodes, and raidPtr
	 */
	if (numParityNodes == 2) {
		/* double-xor case */
		for (i = 0; i < numParityNodes; i++) {
			/* note: no wakeup func for xor */
			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
			    undoFunc, NULL, 1,
			    (numDataNodes + numParityNodes), 7, 1, dag_h,
			    name, allocList);
			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
			xorNodes[i].params[0] = readDataNodes[i].params[0];
			xorNodes[i].params[1] = readDataNodes[i].params[1];
			xorNodes[i].params[2] = readParityNodes[i].params[0];
			xorNodes[i].params[3] = readParityNodes[i].params[1];
			xorNodes[i].params[4] = writeDataNodes[i].params[0];
			xorNodes[i].params[5] = writeDataNodes[i].params[1];
			xorNodes[i].params[6].p = raidPtr;
			/* use old parity buf as target buf */
			xorNodes[i].results[0] = readParityNodes[i].params[1].p;
			if (nfaults == 2) {
				/* note: no wakeup func for qor */
				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
				    qfunc, undoFunc, NULL, 1,
				    (numDataNodes + numParityNodes), 7, 1,
				    dag_h, qname, allocList);
				qNodes[i].params[0] = readDataNodes[i].params[0];
				qNodes[i].params[1] = readDataNodes[i].params[1];
				qNodes[i].params[2] = readQNodes[i].params[0];
				qNodes[i].params[3] = readQNodes[i].params[1];
				qNodes[i].params[4] = writeDataNodes[i].params[0];
				qNodes[i].params[5] = writeDataNodes[i].params[1];
				qNodes[i].params[6].p = raidPtr;
				/* use old Q buf as target buf */
				qNodes[i].results[0] = readQNodes[i].params[1].p;
			}
		}
	} else {
		/* there is only one xor node in this case */
		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
		    NULL, 1, (numDataNodes + numParityNodes),
		    (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
		    name, allocList);
		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
		for (i = 0; i < numDataNodes + 1; i++) {
			/*
			 * set up params related to Rod and Rop nodes;
			 * readDataNodes[numDataNodes] overlays
			 * readParityNodes[0] because the two arrays are
			 * adjacent in nodes[]
			 */
			xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
		}
		for (i = 0; i < numDataNodes; i++) {
			/* set up params related to Wnd and Wnp nodes */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
			    writeDataNodes[i].params[0];
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
			    writeDataNodes[i].params[1];
		}
		/* xor node needs to get at RAID information */
		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
		if (nfaults == 2) {
			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
			    undoFunc, NULL, 1,
			    (numDataNodes + numParityNodes),
			    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
			    dag_h, qname, allocList);
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Rod nodes */
				qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
				qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer ptr */
			}
			/* and read old q */
			qNodes[0].params[2 * numDataNodes + 0] =	/* pda */
			    readQNodes[0].params[0];
			qNodes[0].params[2 * numDataNodes + 1] =	/* buffer ptr */
			    readQNodes[0].params[1];
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Wnd nodes */
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =	/* pda */
				    writeDataNodes[i].params[0];
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =	/* buffer ptr */
				    writeDataNodes[i].params[1];
			}
			/* q node needs to get at RAID information */
			qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
			qNodes[0].results[0] = readQNodes[0].params[1].p;
		}
	}

	/* initialize nodes which write new parity (Wnp) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp",
		    allocList);
		RF_ASSERT(pda != NULL);
		/* physical disk addr desc */
		writeParityNodes[i].params[0].p = pda;
		/* buffer produced by the xor node; used for the parity write */
		writeParityNodes[i].params[1].p = xorNodes[i].results[0];
		writeParityNodes[i].params[2].v = parityStripeID;
		writeParityNodes[i].params[3].v =
		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		if (lu_flag) {
			/* initialize node to unlock the disk queue */
			rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp",
			    allocList);
			/* physical disk addr desc */
			unlockParityNodes[i].params[0].p = pda;
			unlockParityNodes[i].params[1].v =
			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0,
			    lu_flag, which_ru);
		}
		pda = pda->next;
	}

	/* initialize nodes which write new Q (Wnq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq",
			    allocList);
			RF_ASSERT(pda != NULL);
			/* physical disk addr desc */
			writeQNodes[i].params[0].p = pda;
			/* buffer produced by the q node; used for the Q write */
			writeQNodes[i].params[1].p = qNodes[i].results[0];
			writeQNodes[i].params[2].v = parityStripeID;
			writeQNodes[i].params[3].v =
			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0,
			    which_ru);
			if (lu_flag) {
				/* initialize node to unlock the disk queue */
				rf_InitNode(&unlockQNodes[i], rf_wait,
				    RF_FALSE, rf_DiskUnlockFunc,
				    rf_DiskUnlockUndoFunc,
				    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
				    "Unq", allocList);
				/* physical disk addr desc */
				unlockQNodes[i].params[0].p = pda;
				unlockQNodes[i].params[1].v =
				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
				    0, lu_flag, which_ru);
			}
			pda = pda->next;
		}
	}
	/*
	 * Step 4. connect the nodes.
	 */
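
	/*
	 * Illustrative sketch (not compiled): every edge below is recorded
	 * symmetrically - a succedent slot in the source node and an
	 * antecedent slot (plus its type) in the sink.  The helper and the
	 * type of its antType argument are assumed here for illustration.
	 */
#if 0
static void
ex_link(RF_DagNode_t *from, int si, RF_DagNode_t *to, int ai,
    RF_AntecedentType_t antType)
{
	from->succedents[si] = to;
	to->antecedents[ai] = from;
	to->antType[ai] = antType;
}
#endif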

	/* connect header to block node */
	dag_h->succedents[0] = blockNode;

	/* connect block node to read old data nodes */
	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
	for (i = 0; i < numDataNodes; i++) {
		blockNode->succedents[i] = &readDataNodes[i];
		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
		readDataNodes[i].antecedents[0] = blockNode;
		readDataNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old parity nodes */
	for (i = 0; i < numParityNodes; i++) {
		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
		readParityNodes[i].antecedents[0] = blockNode;
		readParityNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old Q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
			RF_ASSERT(readQNodes[i].numAntecedents == 1);
			readQNodes[i].antecedents[0] = blockNode;
			readQNodes[i].antType[0] = rf_control;
		}
	}
	/* connect read old data nodes to xor nodes */
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes));
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
			readDataNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[i] = &readDataNodes[i];
			xorNodes[j].antType[i] = rf_trueData;
		}
	}

	/* connect read old data nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numDataNodes; i++) {
			for (j = 0; j < numParityNodes; j++) {
				RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
				readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j];
				qNodes[j].antecedents[i] = &readDataNodes[i];
				qNodes[j].antType[i] = rf_trueData;
			}
		}
	}
	/* connect read old parity nodes to xor nodes */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
		for (j = 0; j < numParityNodes; j++) {
			readParityNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
		}
	}

	/* connect read old q nodes to q nodes */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
			for (j = 0; j < numParityNodes; j++) {
				readQNodes[i].succedents[j] = &qNodes[j];
				qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
				qNodes[j].antType[numDataNodes + i] = rf_trueData;
			}
		}
	}
	/* connect xor nodes to commit node */
	RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(xorNodes[i].numSuccedents == 1);
		xorNodes[i].succedents[0] = commitNode;
		commitNode->antecedents[i] = &xorNodes[i];
		commitNode->antType[i] = rf_control;
	}

	/* connect q nodes to commit node */
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(qNodes[i].numSuccedents == 1);
			qNodes[i].succedents[0] = commitNode;
			commitNode->antecedents[i + numParityNodes] = &qNodes[i];
			commitNode->antType[i + numParityNodes] = rf_control;
		}
	}
	/* connect commit node to write nodes */
	RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes)));
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
		commitNode->succedents[i] = &writeDataNodes[i];
		writeDataNodes[i].antecedents[0] = commitNode;
		writeDataNodes[i].antType[0] = rf_trueData;
	}
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
		commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
		writeParityNodes[i].antecedents[0] = commitNode;
		writeParityNodes[i].antType[0] = rf_trueData;
	}
	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(writeQNodes[i].numAntecedents == 1);
			commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i];
			writeQNodes[i].antecedents[0] = commitNode;
			writeQNodes[i].antType[0] = rf_trueData;
		}
	}
	RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < numDataNodes; i++) {
		if (lu_flag) {
			/* connect write new data nodes to unlock nodes */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
			unlockDataNodes[i].antType[0] = rf_control;

			/* connect unlock nodes to term node */
			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
			unlockDataNodes[i].succedents[0] = termNode;
			termNode->antecedents[i] = &unlockDataNodes[i];
			termNode->antType[i] = rf_control;
		} else {
			/* connect write new data nodes to term node */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
			writeDataNodes[i].succedents[0] = termNode;
			termNode->antecedents[i] = &writeDataNodes[i];
			termNode->antType[i] = rf_control;
		}
	}

	for (i = 0; i < numParityNodes; i++) {
		if (lu_flag) {
			/* connect write new parity nodes to unlock nodes */
			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
			RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
			writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
			unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
			unlockParityNodes[i].antType[0] = rf_control;

			/* connect unlock nodes to term node */
			RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
			unlockParityNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
			termNode->antType[numDataNodes + i] = rf_control;
		} else {
			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
			writeParityNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
			termNode->antType[numDataNodes + i] = rf_control;
		}
	}

	if (nfaults == 2) {
		for (i = 0; i < numParityNodes; i++) {
			if (lu_flag) {
				/* connect write new Q nodes to unlock nodes */
				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
				RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
				writeQNodes[i].succedents[0] = &unlockQNodes[i];
				unlockQNodes[i].antecedents[0] = &writeQNodes[i];
				unlockQNodes[i].antType[0] = rf_control;

				/* connect unlock nodes to term node */
				RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
				unlockQNodes[i].succedents[0] = termNode;
				termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
			} else {
				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
				writeQNodes[i].succedents[0] = termNode;
				termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
			}
		}
	}
}
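
/*
 * Recap (added for clarity): for each parity unit, the small-write graph
 * above implements the usual read-modify-write identity
 *
 *	new parity = old parity XOR old data XOR new data
 *
 * which is why every Rod/Rop buffer and every Wnd buffer appears in the
 * Xor node's parameter list, and likewise for Q.
 */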

/******************************************************************************
 * create a write graph (fault-free or degraded) for RAID level 1
 *
 * Hdr -> Commit -> Wpd -> Nil -> Trm
 *                -> Wsd ->
 *
 * The "Wpd" node writes data to the primary copy in the mirror pair
 * The "Wsd" node writes data to the secondary copy in the mirror pair
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *****************************************************************************/

void
rf_CreateRaidOneWriteDAG(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	RF_DagNode_t *unblockNode, *termNode, *commitNode;
	RF_DagNode_t *nodes, *wndNode, *wmirNode;
	int     nWndNodes, nWmirNodes, i;
	RF_ReconUnitNum_t which_ru;
	RF_PhysDiskAddr_t *pda, *pdaP;
	RF_StripeNum_t parityStripeID;

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	if (rf_dagDebug) {
		printf("[Creating RAID level 1 write DAG]\n");
	}
	dag_h->creator = "RaidOneWriteDAG";

	/* 2 implies access not SU aligned */
	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
	nWndNodes = (asmap->physInfo->next) ? 2 : 1;

	/* alloc the Wnd nodes and the Wmir node */
	if (asmap->numDataFailed == 1)
		nWndNodes--;
	if (asmap->numParityFailed == 1)
		nWmirNodes--;

	/*
	 * total number of nodes = nWndNodes + nWmirNodes +
	 * (commit + unblock + terminator)
	 */
	RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3,
	    sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
	i = 0;
	wndNode = &nodes[i];
	i += nWndNodes;
	wmirNode = &nodes[i];
	i += nWmirNodes;
	commitNode = &nodes[i];
	i += 1;
	unblockNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

	/* this dag can commit immediately */
	dag_h->numCommitNodes = 1;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* initialize the commit, unblock, and term nodes */
	rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0,
	    dag_h, "Cmt", allocList);
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0,
	    dag_h, "Nil", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
	    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

	/* initialize the wnd nodes */
	if (nWndNodes > 0) {
		pda = asmap->physInfo;
		for (i = 0; i < nWndNodes; i++) {
			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd",
			    allocList);
			RF_ASSERT(pda != NULL);
			wndNode[i].params[0].p = pda;
			wndNode[i].params[1].p = pda->bufPtr;
			wndNode[i].params[2].v = parityStripeID;
			wndNode[i].params[3].v =
			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0,
			    which_ru);
			pda = pda->next;
		}
		RF_ASSERT(pda == NULL);
	}
	/* initialize the mirror nodes */
	if (nWmirNodes > 0) {
		pda = asmap->physInfo;
		pdaP = asmap->parityInfo;
		for (i = 0; i < nWmirNodes; i++) {
			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd",
			    allocList);
			RF_ASSERT(pda != NULL);
			wmirNode[i].params[0].p = pdaP;
			wmirNode[i].params[1].p = pda->bufPtr;
			wmirNode[i].params[2].v = parityStripeID;
			wmirNode[i].params[3].v =
			    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0,
			    which_ru);
			pda = pda->next;
			pdaP = pdaP->next;
		}
		RF_ASSERT(pda == NULL);
		RF_ASSERT(pdaP == NULL);
	}
	/* link the header node to the commit node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(commitNode->numAntecedents == 0);
	dag_h->succedents[0] = commitNode;

	/* link the commit node to the write nodes */
	RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numAntecedents == 1);
		commitNode->succedents[i] = &wndNode[i];
		wndNode[i].antecedents[0] = commitNode;
		wndNode[i].antType[0] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numAntecedents == 1);
		commitNode->succedents[i + nWndNodes] = &wmirNode[i];
		wmirNode[i].antecedents[0] = commitNode;
		wmirNode[i].antType[0] = rf_control;
	}

	/* link the write nodes to the unblock node */
	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numSuccedents == 1);
		wndNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNode[i];
		unblockNode->antType[i] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numSuccedents == 1);
		wmirNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
		unblockNode->antType[i + nWndNodes] = rf_control;
	}

	/* link the unblock node to the term node */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;
}


#if 0
/*
 * DAGs which have no commit points.
 *
 * The following DAGs are used in forward and backward error recovery
 * experiments.  They are identical to the DAGs above this comment with
 * the exception that the commit points have been removed.
 */



void
rf_CommonCreateLargeWriteDAGFwd(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    int nfaults,
    int (*redFunc) (RF_DagNode_t *),
    int allowBufferRecycle)
{
	RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
	RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
	int     nWndNodes, nRodNodes, i, nodeNum, asmNum;
	RF_AccessStripeMapHeader_t *new_asm_h[2];
	RF_StripeNum_t parityStripeID;
	char   *sosBuffer, *eosBuffer;
	RF_ReconUnitNum_t which_ru;
	RF_RaidLayout_t *layoutPtr;
	RF_PhysDiskAddr_t *pda;

	layoutPtr = &(raidPtr->Layout);
	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);

	if (rf_dagDebug)
		printf("[Creating large-write DAG]\n");
	dag_h->creator = "LargeWriteDAGFwd";

	dag_h->numCommitNodes = 0;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* alloc the nodes: Wnd, xor, sync, block, term, and Wnp */
	nWndNodes = asmap->numStripeUnitsAccessed;
	RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	wndNodes = &nodes[i];
	i += nWndNodes;
	xorNode = &nodes[i];
	i += 1;
	wnpNode = &nodes[i];
	i += 1;
	blockNode = &nodes[i];
	i += 1;
	syncNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	if (nfaults == 2) {
		wnqNode = &nodes[i];
		i += 1;
	} else {
		wnqNode = NULL;
	}
	rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
	    new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
	if (nRodNodes > 0) {
		RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
		    (RF_DagNode_t *), allocList);
	} else {
		rodNodes = NULL;
	}

	/* begin node initialization */
	if (nRodNodes > 0) {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
		    "Nil", allocList);
		rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0,
		    0, dag_h, "Nil", allocList);
	} else {
		rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
		    allocList);
		rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
		    rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h,
		    "Nil", allocList);
	}

	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
	    rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h,
	    "Trm", allocList);

	/* initialize the Rod nodes */
	for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
		if (new_asm_h[asmNum]) {
			pda = new_asm_h[asmNum]->stripeMap->physInfo;
			while (pda) {
				rf_InitNode(&rodNodes[nodeNum], rf_wait,
				    RF_FALSE, rf_DiskReadFunc,
				    rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
				    1, 1, 4, 0, dag_h, "Rod", allocList);
				rodNodes[nodeNum].params[0].p = pda;
				rodNodes[nodeNum].params[1].p = pda->bufPtr;
				rodNodes[nodeNum].params[2].v = parityStripeID;
				rodNodes[nodeNum].params[3].v =
				    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
				    0, 0, which_ru);
				nodeNum++;
				pda = pda->next;
			}
		}
	}
	RF_ASSERT(nodeNum == nRodNodes);

	/* initialize the wnd nodes */
	pda = asmap->physInfo;
	for (i = 0; i < nWndNodes; i++) {
		rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
		    dag_h, "Wnd", allocList);
		RF_ASSERT(pda != NULL);
		wndNodes[i].params[0].p = pda;
		wndNodes[i].params[1].p = pda->bufPtr;
		wndNodes[i].params[2].v = parityStripeID;
		wndNodes[i].params[3].v =
		    RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		pda = pda->next;
	}

	/*
	 * initialize the redundancy node (nfaults succedents to Wnp/Wnq,
	 * one antecedent from the sync node, matching the asserts below)
	 */
	rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc,
	    NULL, nfaults, 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults,
	    dag_h, "Xr ", allocList);
	xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
	for (i = 0; i < nWndNodes; i++) {
		xorNode->params[2 * i + 0] = wndNodes[i].params[0];	/* pda */
		xorNode->params[2 * i + 1] = wndNodes[i].params[1];	/* buf ptr */
	}
	for (i = 0; i < nRodNodes; i++) {
		xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0];	/* pda */
		xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1];	/* buf ptr */
	}
	/* xor node needs to get at RAID information */
	xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;

	/*
	 * Look for an Rod node that reads a complete SU.  If none, alloc a
	 * buffer to receive the parity info.  Note that we can't use a new
	 * data buffer because it will not have gotten written when the xor
	 * occurs.
	 */
	if (allowBufferRecycle) {
		for (i = 0; i < nRodNodes; i++)
			if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit)
				break;
	}
	if ((!allowBufferRecycle) || (i == nRodNodes)) {
		RF_CallocAndAdd(xorNode->results[0], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
	} else
		xorNode->results[0] = rodNodes[i].params[1].p;

	/* initialize the Wnp node */
	rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
	    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
	    "Wnp", allocList);
	wnpNode->params[0].p = asmap->parityInfo;
	wnpNode->params[1].p = xorNode->results[0];
	wnpNode->params[2].v = parityStripeID;
	wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
	/* parityInfo must describe entire parity unit */
	RF_ASSERT(asmap->parityInfo->next == NULL);

	if (nfaults == 2) {
		/*
		 * We never try to recycle a buffer for the Q calculation
		 * in addition to the parity.  This would cause two buffers
		 * to get smashed during the P and Q calculation, guaranteeing
		 * one would be wrong.
		 */
		RF_CallocAndAdd(xorNode->results[1], 1,
		    rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit),
		    (void *), allocList);
		rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
		    rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
		    dag_h, "Wnq", allocList);
		wnqNode->params[0].p = asmap->qInfo;
		wnqNode->params[1].p = xorNode->results[1];
		wnqNode->params[2].v = parityStripeID;
		wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
		/* qInfo must describe entire q unit */
		RF_ASSERT(asmap->qInfo->next == NULL);
	}
	/* connect nodes to form graph */

	/* connect dag header to block node */
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	if (nRodNodes > 0) {
		/* connect the block node to the Rod nodes */
		RF_ASSERT(blockNode->numSuccedents == nRodNodes);
		RF_ASSERT(syncNode->numAntecedents == nRodNodes);
		for (i = 0; i < nRodNodes; i++) {
			RF_ASSERT(rodNodes[i].numAntecedents == 1);
			blockNode->succedents[i] = &rodNodes[i];
			rodNodes[i].antecedents[0] = blockNode;
			rodNodes[i].antType[0] = rf_control;

			/* connect the Rod nodes to the Nil node */
			RF_ASSERT(rodNodes[i].numSuccedents == 1);
			rodNodes[i].succedents[0] = syncNode;
			syncNode->antecedents[i] = &rodNodes[i];
			syncNode->antType[i] = rf_trueData;
		}
	} else {
		/* connect the block node to the Nil node */
		RF_ASSERT(blockNode->numSuccedents == 1);
		RF_ASSERT(syncNode->numAntecedents == 1);
		blockNode->succedents[0] = syncNode;
		syncNode->antecedents[0] = blockNode;
		syncNode->antType[0] = rf_control;
	}

	/* connect the sync node to the Wnd nodes */
	RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numAntecedents == 1);
		syncNode->succedents[i] = &wndNodes[i];
		wndNodes[i].antecedents[0] = syncNode;
		wndNodes[i].antType[0] = rf_control;
	}

	/* connect the sync node to the Xor node */
	RF_ASSERT(xorNode->numAntecedents == 1);
	syncNode->succedents[nWndNodes] = xorNode;
	xorNode->antecedents[0] = syncNode;
	xorNode->antType[0] = rf_control;

	/* connect the xor node to the write parity node */
	RF_ASSERT(xorNode->numSuccedents == nfaults);
	RF_ASSERT(wnpNode->numAntecedents == 1);
	xorNode->succedents[0] = wnpNode;
	wnpNode->antecedents[0] = xorNode;
	wnpNode->antType[0] = rf_trueData;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numAntecedents == 1);
		xorNode->succedents[1] = wnqNode;
		wnqNode->antecedents[0] = xorNode;
		wnqNode->antType[0] = rf_trueData;
	}
	/* connect the write nodes to the term node */
	RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNodes[i].numSuccedents == 1);
		wndNodes[i].succedents[0] = termNode;
		termNode->antecedents[i] = &wndNodes[i];
		termNode->antType[i] = rf_control;
	}
	RF_ASSERT(wnpNode->numSuccedents == 1);
	wnpNode->succedents[0] = termNode;
	termNode->antecedents[nWndNodes] = wnpNode;
	termNode->antType[nWndNodes] = rf_control;
	if (nfaults == 2) {
		RF_ASSERT(wnqNode->numSuccedents == 1);
		wnqNode->succedents[0] = termNode;
		termNode->antecedents[nWndNodes + 1] = wnqNode;
		termNode->antType[nWndNodes + 1] = rf_control;
	}
}


/******************************************************************************
 *
 * creates a DAG to perform a small-write operation (either raid 5 or pq),
 * which is as follows:
 *
 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm
 *            \- Rod X- Wnd [Und] ------/
 *           [\- Rod X- Wnd [Und] -----/]
 *           [\- Roq - Q -> Wnq [Unq]-/]
 *
 * Rop = read old parity
 * Rod = read old data
 * Roq = read old "q"
 * Und = unlock data disk
 * Unp = unlock parity disk
 * Unq = unlock q disk
 * Wnp = write new parity
 * Wnd = write new data
 * Wnq = write new "q"
 * [ ] denotes optional segments in the graph
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *              pfuncs    - list of parity generating functions
 *              qfuncs    - list of q generating functions
 *
 * A null qfuncs indicates a single-fault-tolerant array.
 *****************************************************************************/

void
rf_CommonCreateSmallWriteDAGFwd(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList,
    RF_RedFuncs_t * pfuncs,
    RF_RedFuncs_t * qfuncs)
{
	RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
	RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
	RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
	RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
	int     i, j, nNodes, totalNumNodes, lu_flag;
	RF_ReconUnitNum_t which_ru;
	int     (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *);
	int     (*qfunc) (RF_DagNode_t *);
	int     numDataNodes, numParityNodes;
	RF_StripeNum_t parityStripeID;
	RF_PhysDiskAddr_t *pda;
	char   *name, *qname;
	long    nfaults;

	nfaults = qfuncs ? 2 : 1;
	lu_flag = (rf_enableAtomicRMW) ? 1 : 0;	/* lock/unlock flag */

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	pda = asmap->physInfo;
	numDataNodes = asmap->numStripeUnitsAccessed;
	numParityNodes = (asmap->parityInfo->next) ? 2 : 1;

	if (rf_dagDebug)
		printf("[Creating small-write DAG]\n");
	RF_ASSERT(numDataNodes > 0);
	dag_h->creator = "SmallWriteDAGFwd";

	dag_h->numCommitNodes = 0;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	qfunc = NULL;
	qname = NULL;

	/*
	 * DAG creation occurs in four steps:
	 *   1. count the number of nodes in the DAG
	 *   2. create the nodes
	 *   3. initialize the nodes
	 *   4. connect the nodes
	 */

	/* Step 1. compute number of nodes in the graph */

	/*
	 * Number of nodes:
	 *   a read and a write for each data unit
	 *   a redundancy computation node for each parity unit
	 *     (nfaults * numParityNodes)
	 *   a read and a write for each parity unit
	 *   a block node and a terminate node
	 *   if atomic RMW: an unlock node for each data unit and each
	 *     redundancy unit
	 */
	totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
	    + (nfaults * 2 * numParityNodes) + 2;
	if (lu_flag)
		totalNumNodes += (numDataNodes + (nfaults * numParityNodes));


	/* Step 2. create the nodes */
	RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
	    (RF_DagNode_t *), allocList);
	i = 0;
	blockNode = &nodes[i];
	i += 1;
	readDataNodes = &nodes[i];
	i += numDataNodes;
	readParityNodes = &nodes[i];
	i += numParityNodes;
	writeDataNodes = &nodes[i];
	i += numDataNodes;
	writeParityNodes = &nodes[i];
	i += numParityNodes;
	xorNodes = &nodes[i];
	i += numParityNodes;
	termNode = &nodes[i];
	i += 1;
	if (lu_flag) {
		unlockDataNodes = &nodes[i];
		i += numDataNodes;
		unlockParityNodes = &nodes[i];
		i += numParityNodes;
	} else {
		unlockDataNodes = unlockParityNodes = NULL;
	}
	if (nfaults == 2) {
		readQNodes = &nodes[i];
		i += numParityNodes;
		writeQNodes = &nodes[i];
		i += numParityNodes;
		qNodes = &nodes[i];
		i += numParityNodes;
		if (lu_flag) {
			unlockQNodes = &nodes[i];
			i += numParityNodes;
		} else {
			unlockQNodes = NULL;
		}
	} else {
		readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
	}
	RF_ASSERT(i == totalNumNodes);
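	/*
	 * Worked example of the count above (no atomic RMW): a RAID 5
	 * small write touching two data units of one stripe has
	 * numDataNodes == 2, numParityNodes == 1, nfaults == 1, and
	 * lu_flag == 0, so totalNumNodes == (2 * 2) + 1 + 2 + 2 == 9:
	 * one block node, two Rod, one Rop, two Wnd, one Wnp, one xor,
	 * and one Trm node, exactly the nodes carved out above.
	 */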
	/* Step 3. initialize the nodes */

	/* initialize block node (Nil) */
	nNodes = numDataNodes + (nfaults * numParityNodes);
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil",
	    allocList);

	/* initialize terminate node (Trm) */
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
	    rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm",
	    allocList);

	/* initialize nodes which read old data (Rod) */
	for (i = 0; i < numDataNodes; i++) {
		rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
		    rf_DiskReadFunc, rf_DiskReadUndoFunc,
		    rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1,
		    1, 4, 0, dag_h, "Rod", allocList);
		RF_ASSERT(pda != NULL);
		readDataNodes[i].params[0].p = pda;	/* physical disk addr
							 * desc */
		readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
		    pda, allocList);	/* buffer to hold old data */
		readDataNodes[i].params[2].v = parityStripeID;
		readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
		pda = pda->next;
		for (j = 0; j < readDataNodes[i].numSuccedents; j++)
			readDataNodes[i].propList[j] = NULL;
	}

	/* initialize nodes which read old parity (Rop) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
		    rf_DiskReadFunc, rf_DiskReadUndoFunc,
		    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
		    dag_h, "Rop", allocList);
		readParityNodes[i].params[0].p = pda;
		readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
		    pda, allocList);	/* buffer to hold old parity */
		readParityNodes[i].params[2].v = parityStripeID;
		readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
		for (j = 0; j < readParityNodes[i].numSuccedents; j++)
			readParityNodes[i].propList[j] = NULL;
		pda = pda->next;
	}

	/* initialize nodes which read old Q (Roq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(pda != NULL);
			rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
			    rf_DiskReadFunc, rf_DiskReadUndoFunc,
			    rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
			    dag_h, "Roq", allocList);
			readQNodes[i].params[0].p = pda;
			readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
			    dag_h, pda, allocList);	/* buffer to hold old Q */
			readQNodes[i].params[2].v = parityStripeID;
			readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru);
			for (j = 0; j < readQNodes[i].numSuccedents; j++)
				readQNodes[i].propList[j] = NULL;
			pda = pda->next;
		}
	}
	/* initialize nodes which write new data (Wnd) */
	pda = asmap->physInfo;
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(pda != NULL);
		rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd",
		    allocList);
		writeDataNodes[i].params[0].p = pda;	/* physical disk addr
							 * desc */
		writeDataNodes[i].params[1].p = pda->bufPtr;	/* buffer holding new
								 * data to be written */
		writeDataNodes[i].params[2].v = parityStripeID;
		writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);

		if (lu_flag) {
			/* initialize node to unlock the disk queue */
			rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und",
			    allocList);
			unlockDataNodes[i].params[0].p = pda;	/* physical disk addr
								 * desc */
			unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
		}
		pda = pda->next;
	}


	/* initialize nodes which compute new parity and Q */
	/*
	 * We use the simple XOR func in the double-XOR case, and when we're
	 * accessing only a portion of one stripe unit.  The distinction
	 * between the two is that the regular XOR func assumes that the
	 * target buffer is a full SU in size, and examines the pda
	 * associated with the buffer to decide where within the buffer to
	 * XOR the data, whereas the simple XOR func just XORs the data into
	 * the start of the buffer.
	 */
	if ((numParityNodes == 2) || ((numDataNodes == 1)
		&& (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) {
		func = pfuncs->simple;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->SimpleName;
		if (qfuncs) {
			qfunc = qfuncs->simple;
			qname = qfuncs->SimpleName;
		}
	} else {
		func = pfuncs->regular;
		undoFunc = rf_NullNodeUndoFunc;
		name = pfuncs->RegularName;
		if (qfuncs) {
			qfunc = qfuncs->regular;
			qname = qfuncs->RegularName;
		}
	}
	/*
	 * initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop}
	 * nodes, and raidPtr
	 */
	if (numParityNodes == 2) {	/* double-xor case */
		for (i = 0; i < numParityNodes; i++) {
			/* no wakeup func for xor */
			rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
			    undoFunc, NULL, numParityNodes,
			    numParityNodes + numDataNodes, 7, 1, dag_h,
			    name, allocList);
			xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
			xorNodes[i].params[0] = readDataNodes[i].params[0];
			xorNodes[i].params[1] = readDataNodes[i].params[1];
			xorNodes[i].params[2] = readParityNodes[i].params[0];
			xorNodes[i].params[3] = readParityNodes[i].params[1];
			xorNodes[i].params[4] = writeDataNodes[i].params[0];
			xorNodes[i].params[5] = writeDataNodes[i].params[1];
			xorNodes[i].params[6].p = raidPtr;
			xorNodes[i].results[0] = readParityNodes[i].params[1].p;	/* use old parity buf
											 * as target buf */
			if (nfaults == 2) {
				/* no wakeup func for q */
				rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
				    qfunc, undoFunc, NULL, numParityNodes,
				    numParityNodes + numDataNodes, 7, 1,
				    dag_h, qname, allocList);
				qNodes[i].params[0] = readDataNodes[i].params[0];
				qNodes[i].params[1] = readDataNodes[i].params[1];
				qNodes[i].params[2] = readQNodes[i].params[0];
				qNodes[i].params[3] = readQNodes[i].params[1];
				qNodes[i].params[4] = writeDataNodes[i].params[0];
				qNodes[i].params[5] = writeDataNodes[i].params[1];
				qNodes[i].params[6].p = raidPtr;
				qNodes[i].results[0] = readQNodes[i].params[1].p;	/* use old Q buf as
											 * target buf */
			}
		}
	} else {
		/* there is only one xor node in this case */
		rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
		    NULL, numParityNodes, numParityNodes + numDataNodes,
		    (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
		    name, allocList);
		xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
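		/*
		 * Parameter layout sketch for the single xor node, with
		 * n == numDataNodes (2n+1 {pda,buf} pairs plus raidPtr,
		 * matching the 2*(n+n+1)+1 params requested above):
		 *   params[0 .. 2n+1]     {pda,buf} for each Rod, then Rop
		 *   params[2n+2 .. 4n+1]  {pda,buf} for each Wnd
		 *   params[4n+2]          raidPtr
		 * The first loop below deliberately runs to numDataNodes + 1:
		 * readDataNodes and readParityNodes are carved out of one
		 * contiguous allocation, so readDataNodes[numDataNodes] is
		 * readParityNodes[0].
		 */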
		for (i = 0; i < numDataNodes + 1; i++) {
			/* set up params related to Rod and Rop nodes */
			xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer pointer */
		}
		for (i = 0; i < numDataNodes; i++) {
			/* set up params related to Wnd and Wnp nodes */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0];	/* pda */
			xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1];	/* buffer pointer */
		}
		xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;	/* xor node needs to get
											 * at RAID information */
		xorNodes[0].results[0] = readParityNodes[0].params[1].p;
		if (nfaults == 2) {
			rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
			    undoFunc, NULL, numParityNodes,
			    numParityNodes + numDataNodes,
			    (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
			    dag_h, qname, allocList);
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Rod nodes */
				qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0];	/* pda */
				qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1];	/* buffer pointer */
			}
			/* and read old q */
			qNodes[0].params[2 * numDataNodes + 0] = readQNodes[0].params[0];	/* pda */
			qNodes[0].params[2 * numDataNodes + 1] = readQNodes[0].params[1];	/* buffer pointer */
			for (i = 0; i < numDataNodes; i++) {
				/* set up params related to Wnd nodes */
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0];	/* pda */
				qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1];	/* buffer pointer */
			}
			qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;	/* q node needs to get
												 * at RAID information */
			qNodes[0].results[0] = readQNodes[0].params[1].p;
		}
	}

	/* initialize nodes which write new parity (Wnp) */
	pda = asmap->parityInfo;
	for (i = 0; i < numParityNodes; i++) {
		rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
		    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
		    rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h,
		    "Wnp", allocList);
		RF_ASSERT(pda != NULL);
		writeParityNodes[i].params[0].p = pda;	/* physical disk addr
							 * desc */
		writeParityNodes[i].params[1].p = xorNodes[i].results[0];	/* buffer pointer for
										 * parity write
										 * operation */
		writeParityNodes[i].params[2].v = parityStripeID;
		writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);

		if (lu_flag) {
			/* initialize node to unlock the disk queue */
			rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
			    rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp",
			    allocList);
			unlockParityNodes[i].params[0].p = pda;	/* physical disk addr
								 * desc */
			unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
		}
		pda = pda->next;
	}

	/* initialize nodes which write new Q (Wnq) */
	if (nfaults == 2) {
		pda = asmap->qInfo;
		for (i = 0; i < numParityNodes; i++) {
			rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, numParityNodes, 4, 0,
			    dag_h, "Wnq", allocList);
			RF_ASSERT(pda != NULL);
			writeQNodes[i].params[0].p = pda;	/* physical disk addr
								 * desc */
			writeQNodes[i].params[1].p = qNodes[i].results[0];	/* buffer pointer for
										 * Q write operation */
			writeQNodes[i].params[2].v = parityStripeID;
			writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);

			if (lu_flag) {
				/* initialize node to unlock the disk queue */
				rf_InitNode(&unlockQNodes[i], rf_wait,
				    RF_FALSE, rf_DiskUnlockFunc,
				    rf_DiskUnlockUndoFunc,
				    rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
				    "Unq", allocList);
				unlockQNodes[i].params[0].p = pda;	/* physical disk addr
									 * desc */
				unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru);
			}
			pda = pda->next;
		}
	}
	/* Step 4. connect the nodes */

	/* connect header to block node */
	dag_h->succedents[0] = blockNode;

	/* connect block node to read old data nodes */
	RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults)));
	for (i = 0; i < numDataNodes; i++) {
		blockNode->succedents[i] = &readDataNodes[i];
		RF_ASSERT(readDataNodes[i].numAntecedents == 1);
		readDataNodes[i].antecedents[0] = blockNode;
		readDataNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old parity nodes */
	for (i = 0; i < numParityNodes; i++) {
		blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
		RF_ASSERT(readParityNodes[i].numAntecedents == 1);
		readParityNodes[i].antecedents[0] = blockNode;
		readParityNodes[i].antType[0] = rf_control;
	}

	/* connect block node to read old Q nodes */
	if (nfaults == 2)
		for (i = 0; i < numParityNodes; i++) {
			blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i];
			RF_ASSERT(readQNodes[i].numAntecedents == 1);
			readQNodes[i].antecedents[0] = blockNode;
			readQNodes[i].antType[0] = rf_control;
		}

	/* connect read old data nodes to write new data nodes */
	for (i = 0; i < numDataNodes; i++) {
		RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1));
		RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
		readDataNodes[i].succedents[0] = &writeDataNodes[i];
		writeDataNodes[i].antecedents[0] = &readDataNodes[i];
		writeDataNodes[i].antType[0] = rf_antiData;
	}

	/* connect read old data nodes to xor nodes */
	for (i = 0; i < numDataNodes; i++) {
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes);
			readDataNodes[i].succedents[1 + j] = &xorNodes[j];
			xorNodes[j].antecedents[i] = &readDataNodes[i];
			xorNodes[j].antType[i] = rf_trueData;
		}
	}

	/* connect read old data nodes to q nodes */
	if (nfaults == 2)
		for (i = 0; i < numDataNodes; i++)
			for (j = 0; j < numParityNodes; j++) {
				RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes);
				readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j];
				qNodes[j].antecedents[i] = &readDataNodes[i];
				qNodes[j].antType[i] = rf_trueData;
			}

	/* connect read old parity nodes to xor nodes */
	for (i = 0; i < numParityNodes; i++) {
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
			readParityNodes[i].succedents[j] = &xorNodes[j];
			xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i];
			xorNodes[j].antType[numDataNodes + i] = rf_trueData;
		}
	}
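	/*
	 * Antecedent layout recap for the redundancy nodes: in both the xor
	 * and q nodes, antecedents[0 .. numDataNodes-1] are the Rod nodes
	 * and antecedents[numDataNodes .. numDataNodes+numParityNodes-1]
	 * are the Rop (for xor) or Roq (for q) nodes, matching the
	 * numDataNodes + numParityNodes antecedent counts asserted above.
	 */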

	/* connect read old q nodes to q nodes */
	if (nfaults == 2)
		for (i = 0; i < numParityNodes; i++) {
			for (j = 0; j < numParityNodes; j++) {
				RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes);
				readQNodes[i].succedents[j] = &qNodes[j];
				qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i];
				qNodes[j].antType[numDataNodes + i] = rf_trueData;
			}
		}

	/* connect xor nodes to the write new parity nodes */
	for (i = 0; i < numParityNodes; i++) {
		RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
		for (j = 0; j < numParityNodes; j++) {
			RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
			xorNodes[i].succedents[j] = &writeParityNodes[j];
			writeParityNodes[j].antecedents[i] = &xorNodes[i];
			writeParityNodes[j].antType[i] = rf_trueData;
		}
	}

	/* connect q nodes to the write new q nodes */
	if (nfaults == 2)
		for (i = 0; i < numParityNodes; i++) {
			RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes);
			for (j = 0; j < numParityNodes; j++) {
				RF_ASSERT(qNodes[j].numSuccedents == numParityNodes);
				qNodes[i].succedents[j] = &writeQNodes[j];
				writeQNodes[j].antecedents[i] = &qNodes[i];
				writeQNodes[j].antType[i] = rf_trueData;
			}
		}

	RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes)));
	RF_ASSERT(termNode->numSuccedents == 0);
	for (i = 0; i < numDataNodes; i++) {
		if (lu_flag) {
			/* connect write new data nodes to unlock nodes */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
			writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
			unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
			unlockDataNodes[i].antType[0] = rf_control;

			/* connect unlock nodes to term node */
			RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
			unlockDataNodes[i].succedents[0] = termNode;
			termNode->antecedents[i] = &unlockDataNodes[i];
			termNode->antType[i] = rf_control;
		} else {
			/* connect write new data nodes to term node */
			RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
			writeDataNodes[i].succedents[0] = termNode;
			termNode->antecedents[i] = &writeDataNodes[i];
			termNode->antType[i] = rf_control;
		}
	}

	for (i = 0; i < numParityNodes; i++) {
		if (lu_flag) {
			/* connect write new parity nodes to unlock nodes */
			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
			RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
			writeParityNodes[i].succedents[0] = &unlockParityNodes[i];
			unlockParityNodes[i].antecedents[0] = &writeParityNodes[i];
			unlockParityNodes[i].antType[0] = rf_control;

			/* connect unlock nodes to term node */
			RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
			unlockParityNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i];
			termNode->antType[numDataNodes + i] = rf_control;
		} else {
			RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
			writeParityNodes[i].succedents[0] = termNode;
			termNode->antecedents[numDataNodes + i] = &writeParityNodes[i];
			termNode->antType[numDataNodes + i] = rf_control;
		}
	}

	if (nfaults == 2)
		for (i = 0; i < numParityNodes; i++) {
			if (lu_flag) {
				/* connect write new Q nodes to unlock nodes */
				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
				RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
				writeQNodes[i].succedents[0] = &unlockQNodes[i];
				unlockQNodes[i].antecedents[0] = &writeQNodes[i];
				unlockQNodes[i].antType[0] = rf_control;

				/* connect unlock nodes to term node */
				RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
				unlockQNodes[i].succedents[0] = termNode;
				termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i];
				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
			} else {
				RF_ASSERT(writeQNodes[i].numSuccedents == 1);
				writeQNodes[i].succedents[0] = termNode;
				termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i];
				termNode->antType[numDataNodes + numParityNodes + i] = rf_control;
			}
		}
}



/******************************************************************************
 * create a write graph (fault-free or degraded) for RAID level 1
 *
 *   Hdr -> Nil -> Wpd -> Nil -> Trm
 *            \-> Wsd ---/
 *
 * The "Wpd" node writes data to the primary copy in the mirror pair.
 * The "Wsd" node writes data to the secondary copy in the mirror pair.
 *
 * Parameters:  raidPtr   - description of the physical array
 *              asmap     - logical & physical addresses for this access
 *              bp        - buffer ptr (holds write data)
 *              flags     - general flags (e.g. disk locking)
 *              allocList - list of memory allocated in DAG creation
 *****************************************************************************/

void
rf_CreateRaidOneWriteDAGFwd(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	RF_DagNode_t *blockNode, *unblockNode, *termNode;
	RF_DagNode_t *nodes, *wndNode, *wmirNode;
	int     nWndNodes, nWmirNodes, i;
	RF_ReconUnitNum_t which_ru;
	RF_PhysDiskAddr_t *pda, *pdaP;
	RF_StripeNum_t parityStripeID;

	parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	if (rf_dagDebug) {
		printf("[Creating RAID level 1 write DAG]\n");
	}
	nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;	/* 2 implies access not
							 * SU aligned */
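	/*
	 * A non-NULL ->next on a PDA chain means the access was split into
	 * two pieces (i.e. it was not stripe-unit aligned), so that chain
	 * needs two write nodes; otherwise one suffices.  The same test is
	 * applied to the data chain just below.
	 */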
	nWndNodes = (asmap->physInfo->next) ? 2 : 1;

	/* alloc the Wnd nodes and the Wmir node */
	if (asmap->numDataFailed == 1)
		nWndNodes--;
	if (asmap->numParityFailed == 1)
		nWmirNodes--;

	/*
	 * total number of nodes = nWndNodes + nWmirNodes + (block + unblock
	 * + terminator)
	 */
	RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3,
	    sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
	i = 0;
	wndNode = &nodes[i];
	i += nWndNodes;
	wmirNode = &nodes[i];
	i += nWmirNodes;
	blockNode = &nodes[i];
	i += 1;
	unblockNode = &nodes[i];
	i += 1;
	termNode = &nodes[i];
	i += 1;
	RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));

	/* this dag can commit immediately */
	dag_h->numCommitNodes = 0;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* initialize the block, unblock, and term nodes */
	rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0,
	    dag_h, "Nil", allocList);
	rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
	    rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0,
	    dag_h, "Nil", allocList);
	rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
	    rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

	/* initialize the wnd nodes */
	if (nWndNodes > 0) {
		pda = asmap->physInfo;
		for (i = 0; i < nWndNodes; i++) {
			rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd",
			    allocList);
			RF_ASSERT(pda != NULL);
			wndNode[i].params[0].p = pda;
			wndNode[i].params[1].p = pda->bufPtr;
			wndNode[i].params[2].v = parityStripeID;
			wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			pda = pda->next;
		}
		RF_ASSERT(pda == NULL);
	}
	/* initialize the mirror nodes */
	if (nWmirNodes > 0) {
		pda = asmap->physInfo;
		pdaP = asmap->parityInfo;
		for (i = 0; i < nWmirNodes; i++) {
			rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
			    rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
			    rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd",
			    allocList);
			RF_ASSERT(pda != NULL);
			wmirNode[i].params[0].p = pdaP;	/* address in the
							 * mirror copy */
			wmirNode[i].params[1].p = pda->bufPtr;	/* same new data
								 * as Wpd */
			wmirNode[i].params[2].v = parityStripeID;
			wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			pda = pda->next;
			pdaP = pdaP->next;
		}
		RF_ASSERT(pda == NULL);
		RF_ASSERT(pdaP == NULL);
	}
	/* link the header node to the block node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(blockNode->numAntecedents == 0);
	dag_h->succedents[0] = blockNode;

	/* link the block node to the write nodes */
	RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numAntecedents == 1);
		blockNode->succedents[i] = &wndNode[i];
		wndNode[i].antecedents[0] = blockNode;
		wndNode[i].antType[0] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numAntecedents == 1);
		blockNode->succedents[i + nWndNodes] = &wmirNode[i];
		wmirNode[i].antecedents[0] = blockNode;
		wmirNode[i].antType[0] = rf_control;
	}

	/* link the write nodes to the unblock node */
	RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
	for (i = 0; i < nWndNodes; i++) {
		RF_ASSERT(wndNode[i].numSuccedents == 1);
		wndNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i] = &wndNode[i];
		unblockNode->antType[i] = rf_control;
	}
	for (i = 0; i < nWmirNodes; i++) {
		RF_ASSERT(wmirNode[i].numSuccedents == 1);
		wmirNode[i].succedents[0] = unblockNode;
		unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
		unblockNode->antType[i + nWndNodes] = rf_control;
	}

	/* link the unblock node to the term node */
	RF_ASSERT(unblockNode->numSuccedents == 1);
	RF_ASSERT(termNode->numAntecedents == 1);
	RF_ASSERT(termNode->numSuccedents == 0);
	unblockNode->succedents[0] = termNode;
	termNode->antecedents[0] = unblockNode;
	termNode->antType[0] = rf_control;

	return;
}
#endif