xref: /netbsd-src/sys/dev/raidframe/rf_engine.c (revision d20841bb642898112fe68f0ad3f7b26dddf56f07)
1 /*	$NetBSD: rf_engine.c,v 1.31 2004/01/02 21:41:08 oster Exp $	*/
2 /*
3  * Copyright (c) 1995 Carnegie-Mellon University.
4  * All rights reserved.
5  *
6  * Author: William V. Courtright II, Mark Holland, Rachad Youssef
7  *
8  * Permission to use, copy, modify and distribute this software and
9  * its documentation is hereby granted, provided that both the copyright
10  * notice and this permission notice appear in all copies of the
11  * software, derivative works or modified versions, and any portions
12  * thereof, and that both notices appear in supporting documentation.
13  *
14  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17  *
18  * Carnegie Mellon requests users of this software to return to
19  *
20  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
21  *  School of Computer Science
22  *  Carnegie Mellon University
23  *  Pittsburgh PA 15213-3890
24  *
25  * any improvements or extensions that they make and grant Carnegie the
26  * rights to redistribute these changes.
27  */
28 
29 /****************************************************************************
30  *                                                                          *
31  * engine.c -- code for DAG execution engine                                *
32  *                                                                          *
33  * Modified to work as follows (holland):                                   *
34  *   A user-thread calls into DispatchDAG, which fires off the nodes that   *
35  *   are direct successors to the header node.  DispatchDAG then returns,   *
36  *   and the rest of the I/O continues asynchronously.  As each node        *
37  *   completes, the node execution function calls FinishNode().  FinishNode *
38  *   scans the list of successors to the node and increments the antecedent *
39  *   counts.  Each node that becomes enabled is placed on a central node    *
40  *   queue.  A dedicated dag-execution thread grabs nodes off of this       *
41  *   queue and fires them.                                                  *
42  *                                                                          *
43  *   NULL nodes are never fired.                                            *
44  *                                                                          *
45  *   Terminator nodes are never fired, but rather cause the callback        *
46  *   associated with the DAG to be invoked.                                 *
47  *                                                                          *
48  *   If a node fails, the dag either rolls forward to the completion or     *
49  *   rolls back, undoing previously-completed nodes and fails atomically.   *
50  *   The direction of recovery is determined by the location of the failed  *
51  *   node in the graph.  If the failure occurred before the commit node in   *
52  *   the graph, backward recovery is used.  Otherwise, forward recovery is  *
53  *   used.                                                                  *
54  *                                                                          *
55  ****************************************************************************/
56 
57 #include <sys/cdefs.h>
58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.31 2004/01/02 21:41:08 oster Exp $");
59 
60 #include <sys/errno.h>
61 
62 #include "rf_threadstuff.h"
63 #include "rf_dag.h"
64 #include "rf_engine.h"
65 #include "rf_etimer.h"
66 #include "rf_general.h"
67 #include "rf_dagutils.h"
68 #include "rf_shutdown.h"
69 #include "rf_raid.h"
70 
71 static void rf_ShutdownEngine(void *);
72 static void DAGExecutionThread(RF_ThreadArg_t arg);
73 static void rf_RaidIOThread(RF_ThreadArg_t arg);
74 
/*
 * Synchronization primitives for this file.
 *
 * DO_LOCK stores the value returned by splbio() in a variable named `ks'
 * and then takes the node queue mutex; DO_UNLOCK releases the mutex and
 * hands `ks' back to splx().  The macros do not declare `ks' themselves,
 * so every function that uses them must have a local `int ks' in scope.
 *
 * DO_WAIT should be enclosed in a while loop that re-checks the condition
 * being waited for.  DO_SIGNAL expands to RF_BROADCAST_COND on the node
 * queue.
 */

#define DO_LOCK(_r_) \
do { \
	ks = splbio(); \
	RF_LOCK_MUTEX((_r_)->node_queue_mutex); \
} while (0)

#define DO_UNLOCK(_r_) \
do { \
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \
	splx(ks); \
} while (0)

#define	DO_WAIT(_r_) \
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

#define	DO_SIGNAL(_r_) \
	RF_BROADCAST_COND((_r_)->node_queue)	/* XXX RF_SIGNAL_COND? */
94 
95 static void
96 rf_ShutdownEngine(void *arg)
97 {
98 	RF_Raid_t *raidPtr;
99 	int ks;
100 
101 	raidPtr = (RF_Raid_t *) arg;
102 
103 	/* Tell the rf_RaidIOThread to shutdown */
104 	simple_lock(&(raidPtr->iodone_lock));
105 
106 	raidPtr->shutdown_raidio = 1;
107 	wakeup(&(raidPtr->iodone));
108 
109 	/* ...and wait for it to tell us it has finished */
110 	while (raidPtr->shutdown_raidio)
111  		ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0,
112 			&(raidPtr->iodone_lock));
113 
114 	simple_unlock(&(raidPtr->iodone_lock));
115 
116  	/* Now shut down the DAG execution engine. */
117  	DO_LOCK(raidPtr);
118   	raidPtr->shutdown_engine = 1;
119   	DO_SIGNAL(raidPtr);
120  	DO_UNLOCK(raidPtr);
121 
122 }
123 
124 int
125 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
126 		   RF_Config_t *cfgPtr)
127 {
128 	int     rc;
129 
130 	rf_mutex_init(&raidPtr->node_queue_mutex);
131 	raidPtr->node_queue = NULL;
132 	raidPtr->dags_in_flight = 0;
133 
134 	/* we create the execution thread only once per system boot. no need
135 	 * to check return code b/c the kernel panics if it can't create the
136 	 * thread. */
137 	if (rf_engineDebug) {
138 		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
139 	}
140 	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
141 				    DAGExecutionThread, raidPtr,
142 				    "raid%d", raidPtr->raidid)) {
143 		printf("raid%d: Unable to create engine thread\n",
144 		       raidPtr->raidid);
145 		return (ENOMEM);
146 	}
147 	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
148 				    rf_RaidIOThread, raidPtr,
149 				    "raidio%d", raidPtr->raidid)) {
150 		printf("raid%d: Unable to create raidio thread\n",
151 		       raidPtr->raidid);
152 		return (ENOMEM);
153 	}
154 	if (rf_engineDebug) {
155 		printf("raid%d: Created engine thread\n", raidPtr->raidid);
156 	}
157 
158 	/* engine thread is now running and waiting for work */
159 	if (rf_engineDebug) {
160 		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
161 	}
162 	rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
163 	if (rc) {
164 		rf_print_unable_to_add_shutdown(__FILE__, __LINE__, rc);
165 		rf_ShutdownEngine(NULL);
166 	}
167 	return (rc);
168 }
169 
170 static int
171 BranchDone(RF_DagNode_t *node)
172 {
173 	int     i;
174 
175 	/* return true if forward execution is completed for a node and it's
176 	 * succedents */
177 	switch (node->status) {
178 	case rf_wait:
179 		/* should never be called in this state */
180 		RF_PANIC();
181 		break;
182 	case rf_fired:
183 		/* node is currently executing, so we're not done */
184 		return (RF_FALSE);
185 	case rf_good:
186 		/* for each succedent recursively check branch */
187 		for (i = 0; i < node->numSuccedents; i++)
188 			if (!BranchDone(node->succedents[i]))
189 				return RF_FALSE;
190 		return RF_TRUE;	/* node and all succedent branches aren't in
191 				 * fired state */
192 	case rf_bad:
193 		/* succedents can't fire */
194 		return (RF_TRUE);
195 	case rf_recover:
196 		/* should never be called in this state */
197 		RF_PANIC();
198 		break;
199 	case rf_undone:
200 	case rf_panic:
201 		/* XXX need to fix this case */
202 		/* for now, assume that we're done */
203 		return (RF_TRUE);
204 	default:
205 		/* illegal node status */
206 		RF_PANIC();
207 		break;
208 	}
209 }
210 
211 static int
212 NodeReady(RF_DagNode_t *node)
213 {
214 	int     ready;
215 
216 	switch (node->dagHdr->status) {
217 	case rf_enable:
218 	case rf_rollForward:
219 		if ((node->status == rf_wait) &&
220 		    (node->numAntecedents == node->numAntDone))
221 			ready = RF_TRUE;
222 		else
223 			ready = RF_FALSE;
224 		break;
225 	case rf_rollBackward:
226 		RF_ASSERT(node->numSuccDone <= node->numSuccedents);
227 		RF_ASSERT(node->numSuccFired <= node->numSuccedents);
228 		RF_ASSERT(node->numSuccFired <= node->numSuccDone);
229 		if ((node->status == rf_good) &&
230 		    (node->numSuccDone == node->numSuccedents))
231 			ready = RF_TRUE;
232 		else
233 			ready = RF_FALSE;
234 		break;
235 	default:
236 		printf("Execution engine found illegal DAG status in NodeReady\n");
237 		RF_PANIC();
238 		break;
239 	}
240 
241 	return (ready);
242 }
243 
244 
245 
246 /* user context and dag-exec-thread context: Fire a node.  The node's
247  * status field determines which function, do or undo, to be fired.
248  * This routine assumes that the node's status field has alread been
249  * set to "fired" or "recover" to indicate the direction of execution.
250  */
251 static void
252 FireNode(RF_DagNode_t *node)
253 {
254 	switch (node->status) {
255 	case rf_fired:
256 		/* fire the do function of a node */
257 		if (rf_engineDebug) {
258 			printf("raid%d: Firing node 0x%lx (%s)\n",
259 			       node->dagHdr->raidPtr->raidid,
260 			       (unsigned long) node, node->name);
261 		}
262 		if (node->flags & RF_DAGNODE_FLAG_YIELD) {
263 #if defined(__NetBSD__) && defined(_KERNEL)
264 			/* thread_block(); */
265 			/* printf("Need to block the thread here...\n");  */
266 			/* XXX thread_block is actually mentioned in
267 			 * /usr/include/vm/vm_extern.h */
268 #else
269 			thread_block();
270 #endif
271 		}
272 		(*(node->doFunc)) (node);
273 		break;
274 	case rf_recover:
275 		/* fire the undo function of a node */
276 		if (rf_engineDebug) {
277 			printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
278 			       node->dagHdr->raidPtr->raidid,
279 			       (unsigned long) node, node->name);
280 		}
281 		if (node->flags & RF_DAGNODE_FLAG_YIELD)
282 #if defined(__NetBSD__) && defined(_KERNEL)
283 			/* thread_block(); */
284 			/* printf("Need to block the thread here...\n"); */
285 			/* XXX thread_block is actually mentioned in
286 			 * /usr/include/vm/vm_extern.h */
287 #else
288 			thread_block();
289 #endif
290 		(*(node->undoFunc)) (node);
291 		break;
292 	default:
293 		RF_PANIC();
294 		break;
295 	}
296 }
297 
298 
299 
300 /* user context:
301  * Attempt to fire each node in a linear array.
302  * The entire list is fired atomically.
303  */
304 static void
305 FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
306 {
307 	RF_DagStatus_t dstat;
308 	RF_DagNode_t *node;
309 	int     i, j;
310 
311 	/* first, mark all nodes which are ready to be fired */
312 	for (i = 0; i < numNodes; i++) {
313 		node = nodeList[i];
314 		dstat = node->dagHdr->status;
315 		RF_ASSERT((node->status == rf_wait) ||
316 			  (node->status == rf_good));
317 		if (NodeReady(node)) {
318 			if ((dstat == rf_enable) ||
319 			    (dstat == rf_rollForward)) {
320 				RF_ASSERT(node->status == rf_wait);
321 				if (node->commitNode)
322 					node->dagHdr->numCommits++;
323 				node->status = rf_fired;
324 				for (j = 0; j < node->numAntecedents; j++)
325 					node->antecedents[j]->numSuccFired++;
326 			} else {
327 				RF_ASSERT(dstat == rf_rollBackward);
328 				RF_ASSERT(node->status == rf_good);
329 				/* only one commit node per graph */
330 				RF_ASSERT(node->commitNode == RF_FALSE);
331 				node->status = rf_recover;
332 			}
333 		}
334 	}
335 	/* now, fire the nodes */
336 	for (i = 0; i < numNodes; i++) {
337 		if ((nodeList[i]->status == rf_fired) ||
338 		    (nodeList[i]->status == rf_recover))
339 			FireNode(nodeList[i]);
340 	}
341 }
342 
343 
344 /* user context:
345  * Attempt to fire each node in a linked list.
346  * The entire list is fired atomically.
347  */
348 static void
349 FireNodeList(RF_DagNode_t *nodeList)
350 {
351 	RF_DagNode_t *node, *next;
352 	RF_DagStatus_t dstat;
353 	int     j;
354 
355 	if (nodeList) {
356 		/* first, mark all nodes which are ready to be fired */
357 		for (node = nodeList; node; node = next) {
358 			next = node->next;
359 			dstat = node->dagHdr->status;
360 			RF_ASSERT((node->status == rf_wait) ||
361 				  (node->status == rf_good));
362 			if (NodeReady(node)) {
363 				if ((dstat == rf_enable) ||
364 				    (dstat == rf_rollForward)) {
365 					RF_ASSERT(node->status == rf_wait);
366 					if (node->commitNode)
367 						node->dagHdr->numCommits++;
368 					node->status = rf_fired;
369 					for (j = 0; j < node->numAntecedents; j++)
370 						node->antecedents[j]->numSuccFired++;
371 				} else {
372 					RF_ASSERT(dstat == rf_rollBackward);
373 					RF_ASSERT(node->status == rf_good);
374 					/* only one commit node per graph */
375 					RF_ASSERT(node->commitNode == RF_FALSE);
376 					node->status = rf_recover;
377 				}
378 			}
379 		}
380 		/* now, fire the nodes */
381 		for (node = nodeList; node; node = next) {
382 			next = node->next;
383 			if ((node->status == rf_fired) ||
384 			    (node->status == rf_recover))
385 				FireNode(node);
386 		}
387 	}
388 }
/* interrupt context:
 * for each succedent
 *    propagate required results from node to succedent
 *    increment succedent's numAntDone
 *    place newly-enable nodes on node queue for firing
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired.  Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important:  when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked.  I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 *
 * Parameters:
 *   node    - the node that has just completed
 *   context - compared against RF_INTR_CONTEXT: if equal, newly enabled
 *             nodes are put on raidPtr->node_queue for the dag execution
 *             thread; otherwise they are fired directly from here once
 *             the node queue lock has been dropped.
 *
 * When the DAG is rolling backward the same processing is applied to the
 * node's antecedents instead of its succedents.
 *
 * The node queue lock is taken on entry and released inside each of the
 * two legal switch arms, before any rf_FinishNode() or FireNodeList()
 * call is made.
 */
static void
PropagateResults(RF_DagNode_t *node, int context)
{
	RF_DagNode_t *s, *a;
	RF_Raid_t *raidPtr;
	int     i, ks;
	RF_DagNode_t *finishlist = NULL;	/* a list of NIL nodes to be
						 * finished */
	RF_DagNode_t *skiplist = NULL;	/* list of nodes with failed truedata
					 * antecedents */
	RF_DagNode_t *firelist = NULL;	/* a list of nodes to be fired */
	/* qh/q: head and tail of the local list of nodes to be handed to
	 * the dag execution thread; next: saved link used while walking the
	 * skip and finish lists */
	RF_DagNode_t *q = NULL, *qh = NULL, *next;
	int     j, skipNode;

	raidPtr = node->dagHdr->raidPtr;

	DO_LOCK(raidPtr);

	/* validate fire counts of the antecedents (debug) and, not just
	 * for debugging, record on each antecedent that one more of its
	 * successors is done */
	for (i = 0; i < node->numAntecedents; i++) {
		a = *(node->antecedents + i);
		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
		a->numSuccDone++;
	}

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		for (i = 0; i < node->numSuccedents; i++) {
			s = *(node->succedents + i);
			RF_ASSERT(s->status == rf_wait);
			(s->numAntDone)++;
			/* s becomes enabled once all of its antecedents
			 * have completed */
			if (s->numAntDone == s->numAntecedents) {
				/* look for NIL nodes */
				if (s->doFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					s->next = finishlist;
					finishlist = s;
				} else {
					/* look to see if the node is to be
					 * skipped */
					skipNode = RF_FALSE;
					for (j = 0; j < s->numAntecedents; j++)
						if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad))
							skipNode = RF_TRUE;
					if (skipNode) {
						/* this node has one or more
						 * failed true data
						 * dependencies, so skip it */
						s->next = skiplist;
						skiplist = s;
					} else
						/* add s to list of nodes (q)
						 * to execute */
						if (context != RF_INTR_CONTEXT) {
							/* we only have to
							 * enqueue if we're at
							 * intr context */
							/* put node on a list
							 * to be fired after
							 * we unlock */
							s->next = firelist;
							firelist = s;
						} else {
							/* enqueue the node
							 * for the dag exec
							 * thread to fire;
							 * append to the tail
							 * of the local list */
							RF_ASSERT(NodeReady(s));
							if (q) {
								q->next = s;
								q = s;
							} else {
								qh = q = s;
								qh->next = NULL;
							}
						}
				}
			}
		}

		if (q) {
			/* xfer our local list of nodes to the node queue:
			 * the local list is placed in front of whatever is
			 * already queued, and waiters are woken */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);

		/* skipped nodes are accounted for as if they had fired and
		 * are then finished with status rf_skipped; the link is
		 * saved before rf_FinishNode() is called */
		for (; skiplist; skiplist = next) {
			next = skiplist->next;
			skiplist->status = rf_skipped;
			for (i = 0; i < skiplist->numAntecedents; i++) {
				skiplist->antecedents[i]->numSuccFired++;
			}
			if (skiplist->commitNode) {
				skiplist->dagHdr->numCommits++;
			}
			rf_FinishNode(skiplist, context);
		}
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			for (i = 0; i < finishlist->numAntecedents; i++) {
				finishlist->antecedents[i]->numSuccFired++;
			}
			if (finishlist->commitNode)
				finishlist->dagHdr->numCommits++;
			/*
		         * Okay, here we're calling rf_FinishNode() on
		         * nodes that have the null function as their
		         * work proc. Such a node could be the
		         * terminal node in a DAG. If so, it will
		         * cause the DAG to complete, which will in
		         * turn free memory used by the DAG, which
		         * includes the node in question. Thus, we
		         * must avoid referencing the node at all
		         * after calling rf_FinishNode() on it.  */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);
		break;

	case rf_rollBackward:
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			/* a can be undone once every successor of it that
			 * was fired is done */
			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/* we only have to enqueue if
						 * we're at intr context */
						/* put node on a list to be
						   fired after we unlock */
						a->next = firelist;

						firelist = a;
					} else {
						/* enqueue the node for the
						   dag exec thread to fire */
						RF_ASSERT(NodeReady(a));
						if (q) {
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
		         * Okay, here we're calling rf_FinishNode() on
		         * nodes that have the null function as their
		         * work proc. Such a node could be the first
		         * node in a DAG. If so, it will cause the DAG
		         * to complete, which will in turn free memory
		         * used by the DAG, which includes the node in
		         * question. Thus, we must avoid referencing
		         * the node at all after calling
		         * rf_FinishNode() on it.  */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);

		break;
	default:
		/* NOTE(review): DO_UNLOCK is not called on this path; this
		 * only matters if RF_PANIC() can return -- confirm */
		printf("Engine found illegal DAG status in PropagateResults()\n");
		RF_PANIC();
		break;
	}
}
605 
606 
607 
608 /*
609  * Process a fired node which has completed
610  */
611 static void
612 ProcessNode(RF_DagNode_t *node, int context)
613 {
614 	RF_Raid_t *raidPtr;
615 
616 	raidPtr = node->dagHdr->raidPtr;
617 
618 	switch (node->status) {
619 	case rf_good:
620 		/* normal case, don't need to do anything */
621 		break;
622 	case rf_bad:
623 		if ((node->dagHdr->numCommits > 0) ||
624 		    (node->dagHdr->numCommitNodes == 0)) {
625 			/* crossed commit barrier */
626 			node->dagHdr->status = rf_rollForward;
627 			if (rf_engineDebug) {
628 				printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
629 			}
630 		} else {
631 			/* never reached commit barrier */
632 			node->dagHdr->status = rf_rollBackward;
633 			if (rf_engineDebug) {
634 				printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
635 			}
636 		}
637 		break;
638 	case rf_undone:
639 		/* normal rollBackward case, don't need to do anything */
640 		break;
641 	case rf_panic:
642 		/* an undo node failed!!! */
643 		printf("UNDO of a node failed!!!/n");
644 		break;
645 	default:
646 		printf("node finished execution with an illegal status!!!\n");
647 		RF_PANIC();
648 		break;
649 	}
650 
651 	/* enqueue node's succedents (antecedents if rollBackward) for
652 	 * execution */
653 	PropagateResults(node, context);
654 }
655 
656 
657 
658 /* user context or dag-exec-thread context:
659  * This is the first step in post-processing a newly-completed node.
660  * This routine is called by each node execution function to mark the node
661  * as complete and fire off any successors that have been enabled.
662  */
663 int
664 rf_FinishNode(RF_DagNode_t *node, int context)
665 {
666 	int     retcode = RF_FALSE;
667 	node->dagHdr->numNodesCompleted++;
668 	ProcessNode(node, context);
669 
670 	return (retcode);
671 }
672 
673 
674 /* user context: submit dag for execution, return non-zero if we have
675  * to wait for completion.  if and only if we return non-zero, we'll
676  * cause cbFunc to get invoked with cbArg when the DAG has completed.
677  *
678  * for now we always return 1.  If the DAG does not cause any I/O,
679  * then the callback may get invoked before DispatchDAG returns.
680  * There's code in state 5 of ContinueRaidAccess to handle this.
681  *
682  * All we do here is fire the direct successors of the header node.
683  * The DAG execution thread does the rest of the dag processing.  */
684 int
685 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *),
686 	       void *cbArg)
687 {
688 	RF_Raid_t *raidPtr;
689 
690 	raidPtr = dag->raidPtr;
691 	if (dag->tracerec) {
692 		RF_ETIMER_START(dag->tracerec->timer);
693 	}
694 #if DEBUG
695 #if RF_DEBUG_VALIDATE_DAG
696 	if (rf_engineDebug || rf_validateDAGDebug) {
697 		if (rf_ValidateDAG(dag))
698 			RF_PANIC();
699 	}
700 #endif
701 #endif
702 	if (rf_engineDebug) {
703 		printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
704 	}
705 	raidPtr->dags_in_flight++;	/* debug only:  blow off proper
706 					 * locking */
707 	dag->cbFunc = cbFunc;
708 	dag->cbArg = cbArg;
709 	dag->numNodesCompleted = 0;
710 	dag->status = rf_enable;
711 	FireNodeArray(dag->numSuccedents, dag->succedents);
712 	return (1);
713 }
714 /* dedicated kernel thread: the thread that handles all DAG node
715  * firing.  To minimize locking and unlocking, we grab a copy of the
716  * entire node queue and then set the node queue to NULL before doing
717  * any firing of nodes.  This way we only have to release the lock
718  * once.  Of course, it's probably rare that there's more than one
719  * node in the queue at any one time, but it sometimes happens.
720  */
721 
722 static void
723 DAGExecutionThread(RF_ThreadArg_t arg)
724 {
725 	RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
726 	RF_Raid_t *raidPtr;
727 	int     ks;
728 	int     s;
729 
730 	raidPtr = (RF_Raid_t *) arg;
731 
732 	if (rf_engineDebug) {
733 		printf("raid%d: Engine thread is running\n", raidPtr->raidid);
734 	}
735 
736 	s = splbio();
737 
738 	DO_LOCK(raidPtr);
739 	while (!raidPtr->shutdown_engine) {
740 
741 		while (raidPtr->node_queue != NULL) {
742 			local_nq = raidPtr->node_queue;
743 			fire_nq = NULL;
744 			term_nq = NULL;
745 			raidPtr->node_queue = NULL;
746 			DO_UNLOCK(raidPtr);
747 
748 			/* first, strip out the terminal nodes */
749 			while (local_nq) {
750 				nd = local_nq;
751 				local_nq = local_nq->next;
752 				switch (nd->dagHdr->status) {
753 				case rf_enable:
754 				case rf_rollForward:
755 					if (nd->numSuccedents == 0) {
756 						/* end of the dag, add to
757 						 * callback list */
758 						nd->next = term_nq;
759 						term_nq = nd;
760 					} else {
761 						/* not the end, add to the
762 						 * fire queue */
763 						nd->next = fire_nq;
764 						fire_nq = nd;
765 					}
766 					break;
767 				case rf_rollBackward:
768 					if (nd->numAntecedents == 0) {
769 						/* end of the dag, add to the
770 						 * callback list */
771 						nd->next = term_nq;
772 						term_nq = nd;
773 					} else {
774 						/* not the end, add to the
775 						 * fire queue */
776 						nd->next = fire_nq;
777 						fire_nq = nd;
778 					}
779 					break;
780 				default:
781 					RF_PANIC();
782 					break;
783 				}
784 			}
785 
786 			/* execute callback of dags which have reached the
787 			 * terminal node */
788 			while (term_nq) {
789 				nd = term_nq;
790 				term_nq = term_nq->next;
791 				nd->next = NULL;
792 				(nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
793 				raidPtr->dags_in_flight--;	/* debug only */
794 			}
795 
796 			/* fire remaining nodes */
797 			FireNodeList(fire_nq);
798 
799 			DO_LOCK(raidPtr);
800 		}
801 		while (!raidPtr->shutdown_engine &&
802 		       raidPtr->node_queue == NULL) {
803 			DO_WAIT(raidPtr);
804 		}
805 	}
806 	DO_UNLOCK(raidPtr);
807 
808 	splx(s);
809 	kthread_exit(0);
810 }
811 
812 /*
813  * rf_RaidIOThread() -- When I/O to a component completes,
814  * KernelWakeupFunc() puts the completed request onto raidPtr->iodone
815  * TAILQ.  This function looks after requests on that queue by calling
816  * rf_DiskIOComplete() for the request, and by calling any required
817  * CompleteFunc for the request.
818  */
819 
820 static void
821 rf_RaidIOThread(RF_ThreadArg_t arg)
822 {
823 	RF_Raid_t *raidPtr;
824 	RF_DiskQueueData_t *req;
825 	int s;
826 
827 	raidPtr = (RF_Raid_t *) arg;
828 
829 	s = splbio();
830 	simple_lock(&(raidPtr->iodone_lock));
831 
832 	while (!raidPtr->shutdown_raidio) {
833 		/* if there is nothing to do, then snooze. */
834 		if (TAILQ_EMPTY(&(raidPtr->iodone))) {
835 			ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0,
836 				&(raidPtr->iodone_lock));
837 		}
838 
839 		/* See what I/Os, if any, have arrived */
840 		while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
841 			TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
842 			simple_unlock(&(raidPtr->iodone_lock));
843 			rf_DiskIOComplete(req->queue, req, req->error);
844 			(req->CompleteFunc) (req->argument, req->error);
845 			simple_lock(&(raidPtr->iodone_lock));
846 		}
847 	}
848 
849 	/* Let rf_ShutdownEngine know that we're done... */
850 	raidPtr->shutdown_raidio = 0;
851 	wakeup(&(raidPtr->shutdown_raidio));
852 
853 	simple_unlock(&(raidPtr->iodone_lock));
854 	splx(s);
855 
856 	kthread_exit(0);
857 }
858