xref: /netbsd-src/sys/dev/raidframe/rf_engine.c (revision daf6c4152fcddc27c445489775ed1f66ab4ea9a9)
1 /*	$NetBSD: rf_engine.c,v 1.41 2010/09/13 08:43:06 drochner Exp $	*/
2 /*
3  * Copyright (c) 1995 Carnegie-Mellon University.
4  * All rights reserved.
5  *
6  * Author: William V. Courtright II, Mark Holland, Rachad Youssef
7  *
8  * Permission to use, copy, modify and distribute this software and
9  * its documentation is hereby granted, provided that both the copyright
10  * notice and this permission notice appear in all copies of the
11  * software, derivative works or modified versions, and any portions
12  * thereof, and that both notices appear in supporting documentation.
13  *
14  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17  *
18  * Carnegie Mellon requests users of this software to return to
19  *
20  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
21  *  School of Computer Science
22  *  Carnegie Mellon University
23  *  Pittsburgh PA 15213-3890
24  *
25  * any improvements or extensions that they make and grant Carnegie the
26  * rights to redistribute these changes.
27  */
28 
29 /****************************************************************************
30  *                                                                          *
31  * engine.c -- code for DAG execution engine                                *
32  *                                                                          *
33  * Modified to work as follows (holland):                                   *
34  *   A user-thread calls into DispatchDAG, which fires off the nodes that   *
35  *   are direct successors to the header node.  DispatchDAG then returns,   *
36  *   and the rest of the I/O continues asynchronously.  As each node        *
37  *   completes, the node execution function calls FinishNode().  FinishNode *
38  *   scans the list of successors to the node and increments the antecedent *
39  *   counts.  Each node that becomes enabled is placed on a central node    *
40  *   queue.  A dedicated dag-execution thread grabs nodes off of this       *
41  *   queue and fires them.                                                  *
42  *                                                                          *
43  *   NULL nodes are never fired.                                            *
44  *                                                                          *
45  *   Terminator nodes are never fired, but rather cause the callback        *
46  *   associated with the DAG to be invoked.                                 *
47  *                                                                          *
48  *   If a node fails, the dag either rolls forward to the completion or     *
49  *   rolls back, undoing previously-completed nodes and fails atomically.   *
50  *   The direction of recovery is determined by the location of the failed  *
51  *   node in the graph.  If the failure occurred before the commit node in   *
52  *   the graph, backward recovery is used.  Otherwise, forward recovery is  *
53  *   used.                                                                  *
54  *                                                                          *
55  ****************************************************************************/
56 
57 #include <sys/cdefs.h>
58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.41 2010/09/13 08:43:06 drochner Exp $");
59 
60 #include <sys/errno.h>
61 
62 #include "rf_threadstuff.h"
63 #include "rf_dag.h"
64 #include "rf_engine.h"
65 #include "rf_etimer.h"
66 #include "rf_general.h"
67 #include "rf_dagutils.h"
68 #include "rf_shutdown.h"
69 #include "rf_raid.h"
70 #include "rf_kintf.h"
71 #include "rf_paritymap.h"
72 
73 static void rf_ShutdownEngine(void *);
74 static void DAGExecutionThread(RF_ThreadArg_t arg);
75 static void rf_RaidIOThread(RF_ThreadArg_t arg);
76 
77 /* synchronization primitives for this file.  DO_WAIT should be enclosed in a while loop. */
78 
79 #define DO_LOCK(_r_) \
80 do { \
81 	ks = splbio(); \
82 	RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
83 } while (0)
84 
85 #define DO_UNLOCK(_r_) \
86 do { \
87 	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
88 	splx(ks); \
89 } while (0)
90 
91 #define	DO_WAIT(_r_) \
92 	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)
93 
94 #define	DO_SIGNAL(_r_) \
95 	RF_BROADCAST_COND((_r_)->node_queue)	/* XXX RF_SIGNAL_COND? */
96 
97 static void
98 rf_ShutdownEngine(void *arg)
99 {
100 	RF_Raid_t *raidPtr;
101 	int ks;
102 
103 	raidPtr = (RF_Raid_t *) arg;
104 
105 	/* Tell the rf_RaidIOThread to shutdown */
106 	simple_lock(&(raidPtr->iodone_lock));
107 
108 	raidPtr->shutdown_raidio = 1;
109 	wakeup(&(raidPtr->iodone));
110 
111 	/* ...and wait for it to tell us it has finished */
112 	while (raidPtr->shutdown_raidio)
113  		ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0,
114 			&(raidPtr->iodone_lock));
115 
116 	simple_unlock(&(raidPtr->iodone_lock));
117 
118  	/* Now shut down the DAG execution engine. */
119  	DO_LOCK(raidPtr);
120   	raidPtr->shutdown_engine = 1;
121   	DO_SIGNAL(raidPtr);
122  	DO_UNLOCK(raidPtr);
123 
124 }
125 
126 int
127 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
128 		   RF_Config_t *cfgPtr)
129 {
130 
131 	rf_mutex_init(&raidPtr->node_queue_mutex);
132 	raidPtr->node_queue = NULL;
133 	raidPtr->dags_in_flight = 0;
134 
135 	/* we create the execution thread only once per system boot. no need
136 	 * to check return code b/c the kernel panics if it can't create the
137 	 * thread. */
138 #if RF_DEBUG_ENGINE
139 	if (rf_engineDebug) {
140 		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
141 	}
142 #endif
143 	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
144 				    DAGExecutionThread, raidPtr,
145 				    "raid%d", raidPtr->raidid)) {
146 		printf("raid%d: Unable to create engine thread\n",
147 		       raidPtr->raidid);
148 		return (ENOMEM);
149 	}
150 	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
151 				    rf_RaidIOThread, raidPtr,
152 				    "raidio%d", raidPtr->raidid)) {
153 		printf("raid%d: Unable to create raidio thread\n",
154 		       raidPtr->raidid);
155 		return (ENOMEM);
156 	}
157 #if RF_DEBUG_ENGINE
158 	if (rf_engineDebug) {
159 		printf("raid%d: Created engine thread\n", raidPtr->raidid);
160 	}
161 #endif
162 
163 	/* engine thread is now running and waiting for work */
164 #if RF_DEBUG_ENGINE
165 	if (rf_engineDebug) {
166 		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
167 	}
168 #endif
169 	rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
170 
171 	return (0);
172 }
173 
174 #if 0
175 static int
176 BranchDone(RF_DagNode_t *node)
177 {
178 	int     i;
179 
180 	/* return true if forward execution is completed for a node and it's
181 	 * succedents */
182 	switch (node->status) {
183 	case rf_wait:
184 		/* should never be called in this state */
185 		RF_PANIC();
186 		break;
187 	case rf_fired:
188 		/* node is currently executing, so we're not done */
189 		return (RF_FALSE);
190 	case rf_good:
191 		/* for each succedent recursively check branch */
192 		for (i = 0; i < node->numSuccedents; i++)
193 			if (!BranchDone(node->succedents[i]))
194 				return RF_FALSE;
195 		return RF_TRUE;	/* node and all succedent branches aren't in
196 				 * fired state */
197 	case rf_bad:
198 		/* succedents can't fire */
199 		return (RF_TRUE);
200 	case rf_recover:
201 		/* should never be called in this state */
202 		RF_PANIC();
203 		break;
204 	case rf_undone:
205 	case rf_panic:
206 		/* XXX need to fix this case */
207 		/* for now, assume that we're done */
208 		return (RF_TRUE);
209 	default:
210 		/* illegal node status */
211 		RF_PANIC();
212 		break;
213 	}
214 }
215 #endif
216 
217 static int
218 NodeReady(RF_DagNode_t *node)
219 {
220 	int     ready;
221 
222 	switch (node->dagHdr->status) {
223 	case rf_enable:
224 	case rf_rollForward:
225 		if ((node->status == rf_wait) &&
226 		    (node->numAntecedents == node->numAntDone))
227 			ready = RF_TRUE;
228 		else
229 			ready = RF_FALSE;
230 		break;
231 	case rf_rollBackward:
232 		RF_ASSERT(node->numSuccDone <= node->numSuccedents);
233 		RF_ASSERT(node->numSuccFired <= node->numSuccedents);
234 		RF_ASSERT(node->numSuccFired <= node->numSuccDone);
235 		if ((node->status == rf_good) &&
236 		    (node->numSuccDone == node->numSuccedents))
237 			ready = RF_TRUE;
238 		else
239 			ready = RF_FALSE;
240 		break;
241 	default:
242 		printf("Execution engine found illegal DAG status in NodeReady\n");
243 		RF_PANIC();
244 		break;
245 	}
246 
247 	return (ready);
248 }
249 
250 
251 
252 /* user context and dag-exec-thread context: Fire a node.  The node's
253  * status field determines which function, do or undo, to be fired.
254  * This routine assumes that the node's status field has alread been
255  * set to "fired" or "recover" to indicate the direction of execution.
256  */
257 static void
258 FireNode(RF_DagNode_t *node)
259 {
260 	switch (node->status) {
261 	case rf_fired:
262 		/* fire the do function of a node */
263 #if RF_DEBUG_ENGINE
264 		if (rf_engineDebug) {
265 			printf("raid%d: Firing node 0x%lx (%s)\n",
266 			       node->dagHdr->raidPtr->raidid,
267 			       (unsigned long) node, node->name);
268 		}
269 #endif
270 		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
271 #if defined(__NetBSD__) && defined(_KERNEL)
272 			/* thread_block(); */
273 			/* printf("Need to block the thread here...\n");  */
274 			/* XXX thread_block is actually mentioned in
275 			 * /usr/include/vm/vm_extern.h */
276 #else
277 			thread_block();
278 #endif
279 		}
280 		(*(node->doFunc)) (node);
281 		break;
282 	case rf_recover:
283 		/* fire the undo function of a node */
284 #if RF_DEBUG_ENGINE
285 		if (rf_engineDebug) {
286 			printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
287 			       node->dagHdr->raidPtr->raidid,
288 			       (unsigned long) node, node->name);
289 		}
290 #endif
291 		if (node->flags & RF_DAGNODE_FLAG_YIELD)
292 #if defined(__NetBSD__) && defined(_KERNEL)
293 			/* thread_block(); */
294 			/* printf("Need to block the thread here...\n"); */
295 			/* XXX thread_block is actually mentioned in
296 			 * /usr/include/vm/vm_extern.h */
297 #else
298 			thread_block();
299 #endif
300 		(*(node->undoFunc)) (node);
301 		break;
302 	default:
303 		RF_PANIC();
304 		break;
305 	}
306 }
307 
308 
309 
310 /* user context:
311  * Attempt to fire each node in a linear array.
312  * The entire list is fired atomically.
313  */
314 static void
315 FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
316 {
317 	RF_DagStatus_t dstat;
318 	RF_DagNode_t *node;
319 	int     i, j;
320 
321 	/* first, mark all nodes which are ready to be fired */
322 	for (i = 0; i < numNodes; i++) {
323 		node = nodeList[i];
324 		dstat = node->dagHdr->status;
325 		RF_ASSERT((node->status == rf_wait) ||
326 			  (node->status == rf_good));
327 		if (NodeReady(node)) {
328 			if ((dstat == rf_enable) ||
329 			    (dstat == rf_rollForward)) {
330 				RF_ASSERT(node->status == rf_wait);
331 				if (node->commitNode)
332 					node->dagHdr->numCommits++;
333 				node->status = rf_fired;
334 				for (j = 0; j < node->numAntecedents; j++)
335 					node->antecedents[j]->numSuccFired++;
336 			} else {
337 				RF_ASSERT(dstat == rf_rollBackward);
338 				RF_ASSERT(node->status == rf_good);
339 				/* only one commit node per graph */
340 				RF_ASSERT(node->commitNode == RF_FALSE);
341 				node->status = rf_recover;
342 			}
343 		}
344 	}
345 	/* now, fire the nodes */
346 	for (i = 0; i < numNodes; i++) {
347 		if ((nodeList[i]->status == rf_fired) ||
348 		    (nodeList[i]->status == rf_recover))
349 			FireNode(nodeList[i]);
350 	}
351 }
352 
353 
354 /* user context:
355  * Attempt to fire each node in a linked list.
356  * The entire list is fired atomically.
357  */
358 static void
359 FireNodeList(RF_DagNode_t *nodeList)
360 {
361 	RF_DagNode_t *node, *next;
362 	RF_DagStatus_t dstat;
363 	int     j;
364 
365 	if (nodeList) {
366 		/* first, mark all nodes which are ready to be fired */
367 		for (node = nodeList; node; node = next) {
368 			next = node->next;
369 			dstat = node->dagHdr->status;
370 			RF_ASSERT((node->status == rf_wait) ||
371 				  (node->status == rf_good));
372 			if (NodeReady(node)) {
373 				if ((dstat == rf_enable) ||
374 				    (dstat == rf_rollForward)) {
375 					RF_ASSERT(node->status == rf_wait);
376 					if (node->commitNode)
377 						node->dagHdr->numCommits++;
378 					node->status = rf_fired;
379 					for (j = 0; j < node->numAntecedents; j++)
380 						node->antecedents[j]->numSuccFired++;
381 				} else {
382 					RF_ASSERT(dstat == rf_rollBackward);
383 					RF_ASSERT(node->status == rf_good);
384 					/* only one commit node per graph */
385 					RF_ASSERT(node->commitNode == RF_FALSE);
386 					node->status = rf_recover;
387 				}
388 			}
389 		}
390 		/* now, fire the nodes */
391 		for (node = nodeList; node; node = next) {
392 			next = node->next;
393 			if ((node->status == rf_fired) ||
394 			    (node->status == rf_recover))
395 				FireNode(node);
396 		}
397 	}
398 }
399 /* interrupt context:
400  * for each succedent
401  *    propagate required results from node to succedent
402  *    increment succedent's numAntDone
403  *    place newly-enable nodes on node queue for firing
404  *
405  * To save context switches, we don't place NIL nodes on the node queue,
406  * but rather just process them as if they had fired.  Note that NIL nodes
407  * that are the direct successors of the header will actually get fired by
408  * DispatchDAG, which is fine because no context switches are involved.
409  *
410  * Important:  when running at user level, this can be called by any
411  * disk thread, and so the increment and check of the antecedent count
412  * must be locked.  I used the node queue mutex and locked down the
413  * entire function, but this is certainly overkill.
414  */
415 static void
416 PropagateResults(RF_DagNode_t *node, int context)
417 {
418 	RF_DagNode_t *s, *a;
419 	RF_Raid_t *raidPtr;
420 	int     i, ks;
421 	RF_DagNode_t *finishlist = NULL;	/* a list of NIL nodes to be
422 						 * finished */
423 	RF_DagNode_t *skiplist = NULL;	/* list of nodes with failed truedata
424 					 * antecedents */
425 	RF_DagNode_t *firelist = NULL;	/* a list of nodes to be fired */
426 	RF_DagNode_t *q = NULL, *qh = NULL, *next;
427 	int     j, skipNode;
428 
429 	raidPtr = node->dagHdr->raidPtr;
430 
431 	DO_LOCK(raidPtr);
432 
433 	/* debug - validate fire counts */
434 	for (i = 0; i < node->numAntecedents; i++) {
435 		a = *(node->antecedents + i);
436 		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
437 		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
438 		a->numSuccDone++;
439 	}
440 
441 	switch (node->dagHdr->status) {
442 	case rf_enable:
443 	case rf_rollForward:
444 		for (i = 0; i < node->numSuccedents; i++) {
445 			s = *(node->succedents + i);
446 			RF_ASSERT(s->status == rf_wait);
447 			(s->numAntDone)++;
448 			if (s->numAntDone == s->numAntecedents) {
449 				/* look for NIL nodes */
450 				if (s->doFunc == rf_NullNodeFunc) {
451 					/* don't fire NIL nodes, just process
452 					 * them */
453 					s->next = finishlist;
454 					finishlist = s;
455 				} else {
456 					/* look to see if the node is to be
457 					 * skipped */
458 					skipNode = RF_FALSE;
459 					for (j = 0; j < s->numAntecedents; j++)
460 						if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad))
461 							skipNode = RF_TRUE;
462 					if (skipNode) {
463 						/* this node has one or more
464 						 * failed true data
465 						 * dependencies, so skip it */
466 						s->next = skiplist;
467 						skiplist = s;
468 					} else
469 						/* add s to list of nodes (q)
470 						 * to execute */
471 						if (context != RF_INTR_CONTEXT) {
472 							/* we only have to
473 							 * enqueue if we're at
474 							 * intr context */
475 							/* put node on
476                                                            a list to
477                                                            be fired
478                                                            after we
479                                                            unlock */
480 							s->next = firelist;
481 							firelist = s;
482 						} else {
483 							/* enqueue the
484 							   node for
485 							   the dag
486 							   exec thread
487 							   to fire */
488 							RF_ASSERT(NodeReady(s));
489 							if (q) {
490 								q->next = s;
491 								q = s;
492 							} else {
493 								qh = q = s;
494 								qh->next = NULL;
495 							}
496 						}
497 				}
498 			}
499 		}
500 
501 		if (q) {
502 			/* xfer our local list of nodes to the node queue */
503 			q->next = raidPtr->node_queue;
504 			raidPtr->node_queue = qh;
505 			DO_SIGNAL(raidPtr);
506 		}
507 		DO_UNLOCK(raidPtr);
508 
509 		for (; skiplist; skiplist = next) {
510 			next = skiplist->next;
511 			skiplist->status = rf_skipped;
512 			for (i = 0; i < skiplist->numAntecedents; i++) {
513 				skiplist->antecedents[i]->numSuccFired++;
514 			}
515 			if (skiplist->commitNode) {
516 				skiplist->dagHdr->numCommits++;
517 			}
518 			rf_FinishNode(skiplist, context);
519 		}
520 		for (; finishlist; finishlist = next) {
521 			/* NIL nodes: no need to fire them */
522 			next = finishlist->next;
523 			finishlist->status = rf_good;
524 			for (i = 0; i < finishlist->numAntecedents; i++) {
525 				finishlist->antecedents[i]->numSuccFired++;
526 			}
527 			if (finishlist->commitNode)
528 				finishlist->dagHdr->numCommits++;
529 			/*
530 		         * Okay, here we're calling rf_FinishNode() on
531 		         * nodes that have the null function as their
532 		         * work proc. Such a node could be the
533 		         * terminal node in a DAG. If so, it will
534 		         * cause the DAG to complete, which will in
535 		         * turn free memory used by the DAG, which
536 		         * includes the node in question. Thus, we
537 		         * must avoid referencing the node at all
538 		         * after calling rf_FinishNode() on it.  */
539 			rf_FinishNode(finishlist, context);	/* recursive call */
540 		}
541 		/* fire all nodes in firelist */
542 		FireNodeList(firelist);
543 		break;
544 
545 	case rf_rollBackward:
546 		for (i = 0; i < node->numAntecedents; i++) {
547 			a = *(node->antecedents + i);
548 			RF_ASSERT(a->status == rf_good);
549 			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
550 			RF_ASSERT(a->numSuccDone <= a->numSuccFired);
551 
552 			if (a->numSuccDone == a->numSuccFired) {
553 				if (a->undoFunc == rf_NullNodeFunc) {
554 					/* don't fire NIL nodes, just process
555 					 * them */
556 					a->next = finishlist;
557 					finishlist = a;
558 				} else {
559 					if (context != RF_INTR_CONTEXT) {
560 						/* we only have to enqueue if
561 						 * we're at intr context */
562 						/* put node on a list to be
563 						   fired after we unlock */
564 						a->next = firelist;
565 
566 						firelist = a;
567 					} else {
568 						/* enqueue the node for the
569 						   dag exec thread to fire */
570 						RF_ASSERT(NodeReady(a));
571 						if (q) {
572 							q->next = a;
573 							q = a;
574 						} else {
575 							qh = q = a;
576 							qh->next = NULL;
577 						}
578 					}
579 				}
580 			}
581 		}
582 		if (q) {
583 			/* xfer our local list of nodes to the node queue */
584 			q->next = raidPtr->node_queue;
585 			raidPtr->node_queue = qh;
586 			DO_SIGNAL(raidPtr);
587 		}
588 		DO_UNLOCK(raidPtr);
589 		for (; finishlist; finishlist = next) {
590 			/* NIL nodes: no need to fire them */
591 			next = finishlist->next;
592 			finishlist->status = rf_good;
593 			/*
594 		         * Okay, here we're calling rf_FinishNode() on
595 		         * nodes that have the null function as their
596 		         * work proc. Such a node could be the first
597 		         * node in a DAG. If so, it will cause the DAG
598 		         * to complete, which will in turn free memory
599 		         * used by the DAG, which includes the node in
600 		         * question. Thus, we must avoid referencing
601 		         * the node at all after calling
602 		         * rf_FinishNode() on it.  */
603 			rf_FinishNode(finishlist, context);	/* recursive call */
604 		}
605 		/* fire all nodes in firelist */
606 		FireNodeList(firelist);
607 
608 		break;
609 	default:
610 		printf("Engine found illegal DAG status in PropagateResults()\n");
611 		RF_PANIC();
612 		break;
613 	}
614 }
615 
616 
617 
618 /*
619  * Process a fired node which has completed
620  */
621 static void
622 ProcessNode(RF_DagNode_t *node, int context)
623 {
624 	RF_Raid_t *raidPtr;
625 
626 	raidPtr = node->dagHdr->raidPtr;
627 
628 	switch (node->status) {
629 	case rf_good:
630 		/* normal case, don't need to do anything */
631 		break;
632 	case rf_bad:
633 		if ((node->dagHdr->numCommits > 0) ||
634 		    (node->dagHdr->numCommitNodes == 0)) {
635 			/* crossed commit barrier */
636 			node->dagHdr->status = rf_rollForward;
637 #if RF_DEBUG_ENGINE
638 			if (rf_engineDebug) {
639 				printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
640 			}
641 #endif
642 		} else {
643 			/* never reached commit barrier */
644 			node->dagHdr->status = rf_rollBackward;
645 #if RF_DEBUG_ENGINE
646 			if (rf_engineDebug) {
647 				printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
648 			}
649 #endif
650 		}
651 		break;
652 	case rf_undone:
653 		/* normal rollBackward case, don't need to do anything */
654 		break;
655 	case rf_panic:
656 		/* an undo node failed!!! */
657 		printf("UNDO of a node failed!!!/n");
658 		break;
659 	default:
660 		printf("node finished execution with an illegal status!!!\n");
661 		RF_PANIC();
662 		break;
663 	}
664 
665 	/* enqueue node's succedents (antecedents if rollBackward) for
666 	 * execution */
667 	PropagateResults(node, context);
668 }
669 
670 
671 
672 /* user context or dag-exec-thread context:
673  * This is the first step in post-processing a newly-completed node.
674  * This routine is called by each node execution function to mark the node
675  * as complete and fire off any successors that have been enabled.
676  */
677 int
678 rf_FinishNode(RF_DagNode_t *node, int context)
679 {
680 	int     retcode = RF_FALSE;
681 	node->dagHdr->numNodesCompleted++;
682 	ProcessNode(node, context);
683 
684 	return (retcode);
685 }
686 
687 
688 /* user context: submit dag for execution, return non-zero if we have
689  * to wait for completion.  if and only if we return non-zero, we'll
690  * cause cbFunc to get invoked with cbArg when the DAG has completed.
691  *
692  * for now we always return 1.  If the DAG does not cause any I/O,
693  * then the callback may get invoked before DispatchDAG returns.
694  * There's code in state 5 of ContinueRaidAccess to handle this.
695  *
696  * All we do here is fire the direct successors of the header node.
697  * The DAG execution thread does the rest of the dag processing.  */
698 int
699 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *),
700 	       void *cbArg)
701 {
702 	RF_Raid_t *raidPtr;
703 
704 	raidPtr = dag->raidPtr;
705 #if RF_ACC_TRACE > 0
706 	if (dag->tracerec) {
707 		RF_ETIMER_START(dag->tracerec->timer);
708 	}
709 #endif
710 #if DEBUG
711 #if RF_DEBUG_VALIDATE_DAG
712 	if (rf_engineDebug || rf_validateDAGDebug) {
713 		if (rf_ValidateDAG(dag))
714 			RF_PANIC();
715 	}
716 #endif
717 #endif
718 #if RF_DEBUG_ENGINE
719 	if (rf_engineDebug) {
720 		printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
721 	}
722 #endif
723 	raidPtr->dags_in_flight++;	/* debug only:  blow off proper
724 					 * locking */
725 	dag->cbFunc = cbFunc;
726 	dag->cbArg = cbArg;
727 	dag->numNodesCompleted = 0;
728 	dag->status = rf_enable;
729 	FireNodeArray(dag->numSuccedents, dag->succedents);
730 	return (1);
731 }
732 /* dedicated kernel thread: the thread that handles all DAG node
733  * firing.  To minimize locking and unlocking, we grab a copy of the
734  * entire node queue and then set the node queue to NULL before doing
735  * any firing of nodes.  This way we only have to release the lock
736  * once.  Of course, it's probably rare that there's more than one
737  * node in the queue at any one time, but it sometimes happens.
738  */
739 
740 static void
741 DAGExecutionThread(RF_ThreadArg_t arg)
742 {
743 	RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
744 	RF_Raid_t *raidPtr;
745 	int     ks;
746 	int     s;
747 
748 	raidPtr = (RF_Raid_t *) arg;
749 
750 #if RF_DEBUG_ENGINE
751 	if (rf_engineDebug) {
752 		printf("raid%d: Engine thread is running\n", raidPtr->raidid);
753 	}
754 #endif
755 	s = splbio();
756 
757 	DO_LOCK(raidPtr);
758 	while (!raidPtr->shutdown_engine) {
759 
760 		while (raidPtr->node_queue != NULL) {
761 			local_nq = raidPtr->node_queue;
762 			fire_nq = NULL;
763 			term_nq = NULL;
764 			raidPtr->node_queue = NULL;
765 			DO_UNLOCK(raidPtr);
766 
767 			/* first, strip out the terminal nodes */
768 			while (local_nq) {
769 				nd = local_nq;
770 				local_nq = local_nq->next;
771 				switch (nd->dagHdr->status) {
772 				case rf_enable:
773 				case rf_rollForward:
774 					if (nd->numSuccedents == 0) {
775 						/* end of the dag, add to
776 						 * callback list */
777 						nd->next = term_nq;
778 						term_nq = nd;
779 					} else {
780 						/* not the end, add to the
781 						 * fire queue */
782 						nd->next = fire_nq;
783 						fire_nq = nd;
784 					}
785 					break;
786 				case rf_rollBackward:
787 					if (nd->numAntecedents == 0) {
788 						/* end of the dag, add to the
789 						 * callback list */
790 						nd->next = term_nq;
791 						term_nq = nd;
792 					} else {
793 						/* not the end, add to the
794 						 * fire queue */
795 						nd->next = fire_nq;
796 						fire_nq = nd;
797 					}
798 					break;
799 				default:
800 					RF_PANIC();
801 					break;
802 				}
803 			}
804 
805 			/* execute callback of dags which have reached the
806 			 * terminal node */
807 			while (term_nq) {
808 				nd = term_nq;
809 				term_nq = term_nq->next;
810 				nd->next = NULL;
811 				(nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
812 				raidPtr->dags_in_flight--;	/* debug only */
813 			}
814 
815 			/* fire remaining nodes */
816 			FireNodeList(fire_nq);
817 
818 			DO_LOCK(raidPtr);
819 		}
820 		while (!raidPtr->shutdown_engine &&
821 		       raidPtr->node_queue == NULL) {
822 			DO_WAIT(raidPtr);
823 		}
824 	}
825 	DO_UNLOCK(raidPtr);
826 
827 	splx(s);
828 	kthread_exit(0);
829 }
830 
831 /*
832  * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy()
833  * puts the I/O on a buf_queue, and then signals raidPtr->iodone.  If
834  * necessary, this function calls raidstart() to initiate the I/O.
835  * When I/O to a component completes, KernelWakeupFunc() puts the
836  * completed request onto raidPtr->iodone TAILQ.  This function looks
837  * after requests on that queue by calling rf_DiskIOComplete() for the
838  * request, and by calling any required CompleteFunc for the request.
839  */
840 
841 static void
842 rf_RaidIOThread(RF_ThreadArg_t arg)
843 {
844 	RF_Raid_t *raidPtr;
845 	RF_DiskQueueData_t *req;
846 	int s;
847 
848 	raidPtr = (RF_Raid_t *) arg;
849 
850 	s = splbio();
851 	simple_lock(&(raidPtr->iodone_lock));
852 
853 	while (!raidPtr->shutdown_raidio) {
854 		/* if there is nothing to do, then snooze. */
855 		if (TAILQ_EMPTY(&(raidPtr->iodone)) &&
856 		    rf_buf_queue_check(raidPtr->raidid)) {
857 			ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0,
858 				&(raidPtr->iodone_lock));
859 		}
860 
861 		/* Check for deferred parity-map-related work. */
862 		if (raidPtr->parity_map != NULL) {
863 			simple_unlock(&(raidPtr->iodone_lock));
864 			rf_paritymap_checkwork(raidPtr->parity_map);
865 			simple_lock(&(raidPtr->iodone_lock));
866 		}
867 
868 		/* See what I/Os, if any, have arrived */
869 		while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
870 			TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
871 			simple_unlock(&(raidPtr->iodone_lock));
872 			rf_DiskIOComplete(req->queue, req, req->error);
873 			(req->CompleteFunc) (req->argument, req->error);
874 			simple_lock(&(raidPtr->iodone_lock));
875 		}
876 
877 		/* process any pending outgoing IO */
878 		simple_unlock(&(raidPtr->iodone_lock));
879 		raidstart(raidPtr);
880 		simple_lock(&(raidPtr->iodone_lock));
881 
882 	}
883 
884 	/* Let rf_ShutdownEngine know that we're done... */
885 	raidPtr->shutdown_raidio = 0;
886 	wakeup(&(raidPtr->shutdown_raidio));
887 
888 	simple_unlock(&(raidPtr->iodone_lock));
889 	splx(s);
890 
891 	kthread_exit(0);
892 }
893