/*
 * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module allows disk devices to be created and associated with a
 * communications pipe or socket.  You open the device and issue an
 * ioctl() to install a new disk along with its communications descriptor.
 *
 * All further communication occurs via the descriptor using the DMSG
 * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
 * cluster controller, to the local cluster controller, etc.
 *
 * /dev/xdisk is the control device; issue ioctl()s against it to create
 * the /dev/xa%d devices.  These devices look like raw disks to the
 * system.
 */
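
/*
 * Illustrative sketch only: a userland attach might look roughly like
 * the below.  Only the 'fd' field of struct xdisk_attach_ioctl is
 * assumed here (it is the descriptor held by xdisk_attach()); see
 * <sys/xdiskioctl.h> for the authoritative structure layout.
 *
 *	struct xdisk_attach_ioctl xaioc;
 *	int ctlfd;
 *
 *	ctlfd = open("/dev/xdisk", O_RDWR);
 *	bzero(&xaioc, sizeof(xaioc));
 *	xaioc.fd = fd;				// connected pipe/socket
 *	ioctl(ctlfd, XDISKIOCATTACH, &xaioc);	// creates /dev/xa%d
 */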
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/buf.h>
#include <sys/conf.h>
#include <sys/device.h>
#include <sys/devicestat.h>
#include <sys/disk.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/udev.h>
#include <sys/uuid.h>
#include <sys/kern_syscall.h>

#include <sys/dmsg.h>
#include <sys/xdiskioctl.h>

#include <sys/buf2.h>

struct xa_softc;
struct xa_softc_tree;
RB_HEAD(xa_softc_tree, xa_softc);
RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);

static int xa_active;
SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
	   "Number of active xdisk IOs");
static uint64_t xa_last;
SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
	   "Offset of last xdisk IO");
static int xa_debug = 1;
SYSCTL_INT(_debug, OID_AUTO, xa_debug, CTLFLAG_RW, &xa_debug, 0,
	   "xdisk debugging");

/*
 * Track a BIO tag
 */
struct xa_tag {
	TAILQ_ENTRY(xa_tag) entry;
	struct xa_softc	*sc;
	dmsg_blk_error_t status;
	kdmsg_state_t	*state;
	struct bio	*bio;
	int		waiting;
	int		async;
	int		done;
};

typedef struct xa_tag	xa_tag_t;

/*
 * Track devices.
 */
struct xa_softc {
	struct kdmsg_state_list spanq;
	RB_ENTRY(xa_softc) rbnode;
	cdev_t		dev;
	struct devstat	stats;
	struct disk_info info;
	struct disk	disk;
	uuid_t		peer_id;
	int		unit;
	int		opencnt;
	int		spancnt;
	uint64_t	keyid;
	int		serializing;
	int		last_error;
	int		terminating;
	char		peer_label[64];	/* from LNK_SPAN host/dev */
	char		pfs_label[64];	/* from LNK_SPAN serno */
	xa_tag_t	*open_tag;
	TAILQ_HEAD(, bio) bioq;		/* pending BIOs */
	TAILQ_HEAD(, xa_tag) tag_freeq;	/* available I/O tags */
	TAILQ_HEAD(, xa_tag) tag_pendq;	/* running I/O tags */
	struct lock	lk;
};

typedef struct xa_softc	xa_softc_t;

struct xa_iocom {
	TAILQ_ENTRY(xa_iocom) entry;
	kdmsg_iocom_t	iocom;
	xa_softc_t	dummysc;
};

typedef struct xa_iocom xa_iocom_t;

static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
static struct xa_softc_tree xa_device_tree;

#define MAXTAGS		64	/* no real limit */

static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
static void xaio_exit(kdmsg_iocom_t *iocom);
static int xaio_rcvdmsg(kdmsg_msg_t *msg);

static void xa_terminate_check(struct xa_softc *sc);

static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
static void xa_done(xa_tag_t *tag, int wasbio);
static void xa_release(xa_tag_t *tag, int wasbio);
static uint32_t xa_wait(xa_tag_t *tag);
static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static void xa_restart_deferred(xa_softc_t *sc);

#define xa_printf(level, ctl, ...)			\
	do {						\
		if (xa_debug >= (level))		\
			kprintf("xdisk: " ctl, __VA_ARGS__); \
	} while(0)

MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");

/*
 * Control device, issue ioctls to create xa devices.
 */
static d_open_t xdisk_open;
static d_close_t xdisk_close;
static d_ioctl_t xdisk_ioctl;

static struct dev_ops xdisk_ops = {
	{ "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xdisk_open,
	.d_close =	xdisk_close,
	.d_ioctl =	xdisk_ioctl
};

/*
 * XA disk devices
 */
static d_open_t xa_open;
static d_close_t xa_close;
static d_ioctl_t xa_ioctl;
static d_strategy_t xa_strategy;
static d_psize_t xa_size;

static struct dev_ops xa_ops = {
	{ "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xa_open,
	.d_close =	xa_close,
	.d_ioctl =	xa_ioctl,
	.d_read =	physread,
	.d_write =	physwrite,
	.d_strategy =	xa_strategy,
	.d_psize =	xa_size
};
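
/*
 * NOTE: D_CANFREE advertises that the device accepts BUF_CMD_FREEBLKS
 *	 requests, which xa_start() forwards to the server as
 *	 DMSG_BLK_FREEBLKS.
 */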

static int xdisk_opencount;
static cdev_t xdisk_dev;
struct lock xdisk_lk;
static TAILQ_HEAD(, xa_iocom) xaiocomq;

/*
 * Module initialization
 */
static int
xdisk_modevent(module_t mod, int type, void *data)
{
	switch (type) {
	case MOD_LOAD:
		TAILQ_INIT(&xaiocomq);
		RB_INIT(&xa_device_tree);
		lockinit(&xdisk_lk, "xdisk", 0, 0);
		xdisk_dev = make_dev(&xdisk_ops, 0,
				     UID_ROOT, GID_WHEEL, 0600, "xdisk");
		break;
	case MOD_UNLOAD:
	case MOD_SHUTDOWN:
		if (!RB_EMPTY(&xa_device_tree))
			return (EBUSY);
		if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
			return (EBUSY);
		if (xdisk_dev) {
			destroy_dev(xdisk_dev);
			xdisk_dev = NULL;
		}
		dev_ops_remove_all(&xdisk_ops);
		dev_ops_remove_all(&xa_ops);
		break;
	default:
		break;
	}
	return 0;
}

DEV_MODULE(xdisk, xdisk_modevent, 0);

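/*
 * Softc lookup is keyed on pfs_label (the serial number advertised by
 * the remote LNK_SPAN), so multiple spans advertising the same serial
 * number attach to the same xa device.
 */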
static int
xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
{
	return(strcmp(sc1->pfs_label, sc2->pfs_label));
}

/*
 * Control device
 */
static int
xdisk_open(struct dev_open_args *ap)
{
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	++xdisk_opencount;
	lockmgr(&xdisk_lk, LK_RELEASE);
	return(0);
}

static int
xdisk_close(struct dev_close_args *ap)
{
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	--xdisk_opencount;
	lockmgr(&xdisk_lk, LK_RELEASE);
	return(0);
}

static int
xdisk_ioctl(struct dev_ioctl_args *ap)
{
	int error;

	switch(ap->a_cmd) {
	case XDISKIOCATTACH:
		error = xdisk_attach((void *)ap->a_data);
		break;
	case XDISKIOCDETACH:
		error = xdisk_detach((void *)ap->a_data);
		break;
	default:
		error = ENOTTY;
		break;
	}
	return error;
}

/************************************************************************
 *				DMSG INTERFACE				*
 ************************************************************************/

static int
xdisk_attach(struct xdisk_attach_ioctl *xaioc)
{
	xa_iocom_t *xaio;
	struct file *fp;

	/*
	 * Normalize ioctl params
	 */
	fp = holdfp(curthread, xaioc->fd, -1);
	if (fp == NULL)
		return EINVAL;
	xa_printf(1, "xdisk_attach fp=%p\n", fp);

	/*
	 * Allocate the iocom and install the communications descriptor.
	 * If we are racing a termination the disk subsystem may still
	 * have duplicate entries not yet removed; duplicate serial
	 * numbers are folded into the existing softc when the new
	 * LNK_SPANs arrive (see xaio_rcvdmsg()).
	 */
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);

	xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
	kdmsg_iocom_init(&xaio->iocom, xaio,
			 KDMSG_IOCOMF_AUTOCONN,
			 M_XDISK, xaio_rcvdmsg);
	xaio->iocom.exit_func = xaio_exit;

	kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");

	/*
	 * Setup our LNK_CONN advertisement for autoinitiate.
	 *
	 * Our filter is set up to accept only PEER_BLOCK advertisements.
	 * XXX no peer_id filter.
	 *
	 * We need a unique pfs_fsid to avoid confusion.
	 */
	xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_CLIENT;
	xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
	xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
	ksnprintf(xaio->iocom.auto_lnk_conn.peer_label,
		  sizeof(xaio->iocom.auto_lnk_conn.peer_label),
		  "%s/xdisk",
		  hostname);
	/* kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1); */

	/*
	 * Setup our LNK_SPAN advertisement for autoinitiate
	 */
	TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
	kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);

	lockmgr(&xdisk_lk, LK_RELEASE);

	return 0;
}

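/*
 * Detach via ioctl is not currently implemented (EINVAL); teardown
 * normally occurs when the communications descriptor disconnects and
 * the iocom core calls xaio_exit().
 */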
static int
xdisk_detach(struct xdisk_attach_ioctl *xaioc)
{
	return EINVAL;
}

/*
 * Called from iocom core transmit thread upon disconnect.
 */
static
void
xaio_exit(kdmsg_iocom_t *iocom)
{
	xa_iocom_t *xaio = iocom->handle;

	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	xa_printf(1, "%s", "xdisk_detach [xaio_exit()]\n");
	TAILQ_REMOVE(&xaiocomq, xaio, entry);
	lockmgr(&xdisk_lk, LK_RELEASE);

	kdmsg_iocom_uninit(&xaio->iocom);

	kfree(xaio, M_XDISK);
}

/*
 * Called from iocom core to handle messages that the iocom core does not
 * handle itself and for which a state function callback has not yet been
 * established.
 *
 * We primarily care about LNK_SPAN transactions here.
 */
static int
xaio_rcvdmsg(kdmsg_msg_t *msg)
{
	kdmsg_state_t	*state = msg->state;
	xa_iocom_t	*xaio = state->iocom->handle;
	xa_softc_t	*sc;

	/*
	 * NOTE: state was already dereferenced above to obtain xaio,
	 *	 so a NULL check here would be dead code; the iocom
	 *	 core always supplies a message state.
	 */
	xa_printf(4,
		"xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
		state, state->rxcmd, state->txcmd,
		msg->any.head.cmd);
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);

	switch(msg->tcmd) {
	case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * A LNK_SPAN transaction which is opened and closed
		 * degenerately is not useful to us, just ignore it.
		 */
		kdmsg_msg_reply(msg, 0);
		break;
	case DMSG_LNK_SPAN | DMSGF_CREATE:
		/*
		 * Manage the tracking node for the remote LNK_SPAN.
		 *
		 * Return a streaming result, leaving the transaction open
		 * in both directions to allow sub-transactions.
		 */
		bcopy(msg->any.lnk_span.peer_label, xaio->dummysc.peer_label,
		      sizeof(xaio->dummysc.peer_label));
		xaio->dummysc.peer_label[
			sizeof(xaio->dummysc.peer_label) - 1] = 0;

		bcopy(msg->any.lnk_span.pfs_label, xaio->dummysc.pfs_label,
		      sizeof(xaio->dummysc.pfs_label));
		xaio->dummysc.pfs_label[
			sizeof(xaio->dummysc.pfs_label) - 1] = 0;

		xa_printf(3, "LINK_SPAN state %p create for %s\n",
			  msg->state, msg->any.lnk_span.pfs_label);

		sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
		if (sc == NULL) {
			xa_softc_t *sctmp;
			xa_tag_t *tag;
			cdev_t dev;
			int unit;
			int n;

			sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
			bcopy(msg->any.lnk_span.peer_label, sc->peer_label,
			      sizeof(sc->peer_label));
			sc->peer_label[sizeof(sc->peer_label) - 1] = 0;
			bcopy(msg->any.lnk_span.pfs_label, sc->pfs_label,
			      sizeof(sc->pfs_label));
			sc->pfs_label[sizeof(sc->pfs_label) - 1] = 0;

			/* XXX FIXME O(N^2) */
			unit = -1;
			do {
				++unit;
				RB_FOREACH(sctmp, xa_softc_tree,
					   &xa_device_tree) {
					if (sctmp->unit == unit)
						break;
				}
			} while (sctmp);

			sc->unit = unit;
			sc->serializing = 1;
			sc->spancnt = 1;
			lockinit(&sc->lk, "xalk", 0, 0);
			TAILQ_INIT(&sc->spanq);
			TAILQ_INIT(&sc->bioq);
			TAILQ_INIT(&sc->tag_freeq);
			TAILQ_INIT(&sc->tag_pendq);

			lockmgr(&sc->lk, LK_EXCLUSIVE);
			RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
			TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
			msg->state->any.xa_sc = sc;

			/*
			 * Setup block device
			 */
			for (n = 0; n < MAXTAGS; ++n) {
				tag = kmalloc(sizeof(*tag),
					      M_XDISK, M_WAITOK|M_ZERO);
				tag->sc = sc;
				TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
			}

			if (sc->dev == NULL) {
				dev = disk_create(unit, &sc->disk, &xa_ops);
				dev->si_drv1 = sc;
				sc->dev = dev;
				devstat_add_entry(&sc->stats, "xa", unit,
						  DEV_BSIZE,
						  DEVSTAT_NO_ORDERED_TAGS,
						  DEVSTAT_TYPE_DIRECT |
						  DEVSTAT_TYPE_IF_OTHER,
						  DEVSTAT_PRIORITY_OTHER);
			}

			sc->info.d_media_blksize =
				msg->any.lnk_span.media.block.blksize;
			if (sc->info.d_media_blksize <= 0)
				sc->info.d_media_blksize = 1;
			sc->info.d_media_blocks =
				msg->any.lnk_span.media.block.bytes /
				sc->info.d_media_blksize;
			sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
			sc->info.d_secpertrack = 32;
			sc->info.d_nheads = 64;
			sc->info.d_secpercyl = sc->info.d_secpertrack *
					       sc->info.d_nheads;
			sc->info.d_ncylinders = 0;
			if (sc->pfs_label[0])
				sc->info.d_serialno = sc->pfs_label;
			/*
			 * WARNING! disk_setdiskinfo() must be asynchronous
			 *	    because we are in the rxmsg thread.  If
			 *	    it is synchronous and issues more disk
			 *	    I/Os, we will deadlock.
			 */
			disk_setdiskinfo(&sc->disk, &sc->info);
			xa_restart_deferred(sc);	/* eats serializing */
			lockmgr(&sc->lk, LK_RELEASE);
		} else {
			lockmgr(&sc->lk, LK_EXCLUSIVE);
			++sc->spancnt;
			TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
			msg->state->any.xa_sc = sc;
			if (sc->serializing == 0 && sc->open_tag == NULL) {
				sc->serializing = 1;
				xa_restart_deferred(sc); /* eats serializing */
			}
			lockmgr(&sc->lk, LK_RELEASE);
			if (sc->dev && sc->dev->si_disk) {
				xa_printf(1, "reprobe disk: %s\n",
					  sc->pfs_label);
				disk_msg_send(DISK_DISK_REPROBE,
					      sc->dev->si_disk,
					      NULL);
			}
		}
		xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);
		kdmsg_msg_result(msg, 0);
		break;
	case DMSG_LNK_SPAN | DMSGF_DELETE:
		/*
		 * Manage the tracking node for the remote LNK_SPAN.
		 *
		 * Return a final result, closing our end of the transaction.
		 */
		sc = msg->state->any.xa_sc;
		xa_printf(3, "LINK_SPAN state %p delete for %s (sc=%p)\n",
			  msg->state, (sc ? sc->pfs_label : "(null)"), sc);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
		msg->state->any.xa_sc = NULL;
		TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
		--sc->spancnt;

		xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);

		/*
		 * Spans can come and go as the graph stabilizes, so if
		 * we lose a span along with sc->open_tag we may be able
		 * to restart the I/Os on a different span.
		 */
		if (sc->spancnt &&
		    sc->serializing == 0 && sc->open_tag == NULL) {
			sc->serializing = 1;
			xa_restart_deferred(sc);
		}
		lockmgr(&sc->lk, LK_RELEASE);
		kdmsg_msg_reply(msg, 0);

#if 0
		/*
		 * Termination
		 */
		if (sc->spancnt == 0)
			xa_terminate_check(sc);
#endif
		break;
	case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
		/*
		 * Ignore unimplemented streaming replies on our LNK_SPAN
		 * transaction.
		 */
		xa_printf(3, "LINK_SPAN state %p delete+reply\n",
			  msg->state);
		break;
	case DMSG_LNK_SPAN | DMSGF_REPLY:
		/*
		 * Ignore unimplemented streaming replies on our LNK_SPAN
		 * transaction.
		 */
		xa_printf(3, "LINK_SPAN state %p reply\n",
			  msg->state);
		break;
	case DMSG_DBG_SHELL:
		/*
		 * Execute shell command (not supported atm).
		 *
		 * This is a one-way packet but if not (e.g. if part of
		 * a streaming transaction), we will have already closed
		 * our end.
		 */
		kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	case DMSG_DBG_SHELL | DMSGF_REPLY:
		/*
		 * Receive one or more replies to a shell command
		 * that we sent.  Just dump it to the console.
		 *
		 * This is a one-way packet but if not (e.g. if
		 * part of a streaming transaction), we will have
		 * already closed our end.
		 */
		if (msg->aux_data && msg->aux_size) {
			msg->aux_data[msg->aux_size - 1] = 0;
			xa_printf(0, "DEBUGMSG: %s\n", msg->aux_data);
		}
		break;
	default:
		/*
		 * Unsupported one-way message, streaming message, or
		 * transaction.
		 *
		 * Terminate any unsupported transactions with an error
		 * and ignore any unsupported streaming messages.
		 *
		 * NOTE: This case also includes DMSG_LNK_ERROR messages
		 *	 which might be one-way, replying to those would
		 *	 cause an infinite ping-pong.
		 */
		if (msg->any.head.cmd & DMSGF_CREATE)
			kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	}
	lockmgr(&xdisk_lk, LK_RELEASE);

	return 0;
}

/*
 * Determine if we can destroy the xa_softc.
 *
 * Called with xdisk_lk held.
 */
static
void
xa_terminate_check(struct xa_softc *sc)
{
	xa_tag_t *tag;

	/*
	 * Determine if we can destroy the softc.
	 */
	xa_printf(1, "Terminate check xa%d (%d,%d,%d) sc=%p ",
		sc->unit,
		sc->opencnt, sc->serializing, sc->spancnt,
		sc);

	if (sc->opencnt || sc->serializing || sc->spancnt ||
	    TAILQ_FIRST(&sc->bioq) || TAILQ_FIRST(&sc->tag_pendq)) {
		xa_printf(1, "%s", "(leave intact)\n");
		return;
	}

	/*
	 * Remove from device tree, a race with a new incoming span
	 * will create a new softc and disk.
	 */
	RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
	sc->terminating = 1;

	/*
	 * Device has to go first to prevent device ops races.
	 */
	if (sc->dev) {
		disk_destroy(&sc->disk);
		devstat_remove_entry(&sc->stats);
		sc->dev->si_drv1 = NULL;
		sc->dev = NULL;
	}

	xa_printf(1, "%s", "(remove from tree)\n");
	sc->serializing = 1;
	KKASSERT(sc->opencnt == 0);
	KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

	while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
		tag->sc = NULL;
		kfree(tag, M_XDISK);
	}

	kfree(sc, M_XDISK);
}

/************************************************************************
 *			   XA DEVICE INTERFACE				*
 ************************************************************************/

static int
xa_open(struct dev_open_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *sc;
	int error;

	dev->si_bsize_phys = 512;
	dev->si_bsize_best = 32768;

	/*
	 * Interlock open with opencnt, wait for attachment operations
	 * to finish.
	 */
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
again:
	sc = dev->si_drv1;
	if (sc == NULL) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	if (sc->serializing) {
		tsleep(sc, 0, "xarace", hz / 10);
		goto again;
	}
	if (sc->terminating) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	sc->serializing = 1;

	/*
	 * Serialize initial open
	 */
	if (sc->opencnt++ > 0) {
		sc->serializing = 0;
		wakeup(sc);
		lockmgr(&xdisk_lk, LK_RELEASE);
		return(0);
	}

	/*
	 * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
	 */
	if (sc->open_tag == NULL) {
		lockmgr(&sc->lk, LK_EXCLUSIVE);
		xa_restart_deferred(sc); /* eats serializing */
		lockmgr(&sc->lk, LK_RELEASE);
	} else {
		sc->serializing = 0;
		wakeup(sc);
	}
	lockmgr(&xdisk_lk, LK_RELEASE);

	/*
	 * Wait for completion of the BLK_OPEN
	 */
	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	while (sc->serializing)
		lksleep(sc, &xdisk_lk, 0, "xaopen", hz);

	error = sc->last_error;
	if (error) {
		KKASSERT(sc->opencnt > 0);
		--sc->opencnt;
		xa_terminate_check(sc);
		sc = NULL;	/* sc may be invalid now */
	}
	lockmgr(&xdisk_lk, LK_RELEASE);

	return (error);
}

static int
xa_close(struct dev_close_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *sc;
	xa_tag_t *tag;

	lockmgr(&xdisk_lk, LK_EXCLUSIVE);
	sc = dev->si_drv1;
	if (sc == NULL) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	if (sc->terminating) {
		lockmgr(&xdisk_lk, LK_RELEASE);
		return ENXIO;	/* raced destruction */
	}
	lockmgr(&sc->lk, LK_EXCLUSIVE);

	/*
	 * NOTE: Clearing open_tag allows a concurrent open to re-open
	 *	 the device and prevents autonomous completion of the tag.
	 */
	if (sc->opencnt == 1 && sc->open_tag) {
		tag = sc->open_tag;
		sc->open_tag = NULL;
		lockmgr(&sc->lk, LK_RELEASE);
		kdmsg_state_reply(tag->state, 0);	/* close our side */
		xa_wait(tag);				/* wait on remote */
	} else {
		lockmgr(&sc->lk, LK_RELEASE);
	}
	KKASSERT(sc->opencnt > 0);
	--sc->opencnt;
	xa_terminate_check(sc);
	lockmgr(&xdisk_lk, LK_RELEASE);

	return(0);
}

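/*
 * Per-BIO device entry point.  Grab an I/O tag and fire the command
 * off asynchronously.  If no tag is available the BIO is queued on
 * sc->bioq and restarted by xa_release() as tags free up.
 */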
static int
xa_strategy(struct dev_strategy_args *ap)
{
	xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
	xa_tag_t *tag;
	struct bio *bio = ap->a_bio;

	devstat_start_transaction(&sc->stats);
	atomic_add_int(&xa_active, 1);
	xa_last = bio->bio_offset;

	/*
	 * If no tags are available NULL is returned and the bio is
	 * placed on sc->bioq.
	 */
	lockmgr(&sc->lk, LK_EXCLUSIVE);
	tag = xa_setup_cmd(sc, bio);
	if (tag)
		xa_start(tag, NULL, 1);
	lockmgr(&sc->lk, LK_RELEASE);

	return(0);
}

static int
xa_ioctl(struct dev_ioctl_args *ap)
{
	return(ENOTTY);
}

static int
xa_size(struct dev_psize_args *ap)
{
	struct xa_softc *sc;

	if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
		return (ENXIO);
	ap->a_result = sc->info.d_media_blocks;
	return (0);
}

/************************************************************************
 *		    XA BLOCK PROTOCOL STATE MACHINE			*
 ************************************************************************
 *
 * Implement tag/msg setup and related functions.
 * Called with sc->lk held.
 */
static xa_tag_t *
xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
{
	xa_tag_t *tag;

	/*
	 * Grab a free tag if one is available; span/circuit validity is
	 * checked later, in xa_start().
	 */
	if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
		tag->bio = bio;
		TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
	}

	/*
	 * If we can't dispatch now and this is a bio, queue it for later.
	 */
	if (tag == NULL && bio) {
		TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
	}

	return (tag);
}

/*
 * Called with sc->lk held
 */
static void
xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
{
	xa_softc_t *sc = tag->sc;

	tag->done = 0;
	tag->async = async;
	tag->status.head.error = DMSG_ERR_IO;	/* fallback error */

	if (msg == NULL) {
		struct bio *bio;
		struct buf *bp;
		kdmsg_state_t *trans;

		if (sc->opencnt == 0 || sc->open_tag == NULL) {
			TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
				if ((trans->rxcmd & DMSGF_DELETE) == 0)
					break;
			}
		} else {
			trans = sc->open_tag->state;
		}
		if (trans == NULL)
			goto skip;

		KKASSERT(tag->bio);
		bio = tag->bio;
		bp = bio->bio_buf;

		switch(bp->b_cmd) {
		case BUF_CMD_READ:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_READ |
					      DMSGF_CREATE |
					      DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_read.keyid = sc->keyid;
			msg->any.blk_read.offset = bio->bio_offset;
			msg->any.blk_read.bytes = bp->b_bcount;
			break;
		case BUF_CMD_WRITE:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_WRITE |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_write.keyid = sc->keyid;
			msg->any.blk_write.offset = bio->bio_offset;
			msg->any.blk_write.bytes = bp->b_bcount;
			msg->aux_data = bp->b_data;
			msg->aux_size = bp->b_bcount;
			break;
		case BUF_CMD_FLUSH:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_FLUSH |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_flush.keyid = sc->keyid;
			msg->any.blk_flush.offset = bio->bio_offset;
			msg->any.blk_flush.bytes = bp->b_bcount;
			break;
		case BUF_CMD_FREEBLKS:
			msg = kdmsg_msg_alloc(trans,
					      DMSG_BLK_FREEBLKS |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_freeblks.keyid = sc->keyid;
			msg->any.blk_freeblks.offset = bio->bio_offset;
			msg->any.blk_freeblks.bytes = bp->b_bcount;
			break;
		default:
			bp->b_flags |= B_ERROR;
			bp->b_error = EIO;
			devstat_end_transaction_buf(&sc->stats, bp);
			atomic_add_int(&xa_active, -1);
			biodone(bio);
			tag->bio = NULL;
			break;
		}
	}

	/*
	 * If no msg was allocated we likely could not find a good span.
	 */
skip:
	if (msg) {
		/*
		 * Message was passed in or constructed.
		 */
		tag->state = msg->state;
		lockmgr(&sc->lk, LK_RELEASE);
		kdmsg_msg_write(msg);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
	} else if (tag->bio &&
		   (tag->bio->bio_buf->b_flags & B_FAILONDIS) == 0) {
		/*
		 * No spans available but BIO is not allowed to fail
		 * on connectivity problems.  Requeue the BIO.
		 */
		TAILQ_INSERT_TAIL(&sc->bioq, tag->bio, bio_act);
		tag->bio = NULL;
		lockmgr(&sc->lk, LK_RELEASE);
		xa_done(tag, 1);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
	} else {
		/*
		 * No spans available, bio is allowed to fail.
		 */
		lockmgr(&sc->lk, LK_RELEASE);
		tag->status.head.error = DMSG_ERR_IO;
		xa_done(tag, 1);
		lockmgr(&sc->lk, LK_EXCLUSIVE);
	}
}

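/*
 * Synchronously wait for a tag to complete and return the DMSG error
 * code from its status.  The tag is returned to the free list via
 * xa_release() with wasbio=0, so queued BIOs are not restarted here.
 */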
static uint32_t
xa_wait(xa_tag_t *tag)
{
	xa_softc_t *sc = tag->sc;
	uint32_t error;

	lockmgr(&sc->lk, LK_EXCLUSIVE);
	tag->waiting = 1;
	while (tag->done == 0)
		lksleep(tag, &sc->lk, 0, "xawait", 0);
	lockmgr(&sc->lk, LK_RELEASE);

	error = tag->status.head.error;
	tag->waiting = 0;
	xa_release(tag, 0);

	return error;
}

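/*
 * Mark a tag as completed and wake up any synchronous waiter.  Tags
 * issued asynchronously release themselves, which may restart a
 * deferred BIO via xa_release().
 */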
static void
xa_done(xa_tag_t *tag, int wasbio)
{
	KKASSERT(tag->bio == NULL);

	tag->state = NULL;
	tag->done = 1;
	if (tag->waiting)
		wakeup(tag);
	if (tag->async)
		xa_release(tag, wasbio);
}

/*
 * Release a tag.  If everything looks ok and there are pending BIOs
 * (due to all tags in-use), we can use the tag to start the next BIO.
 * Do not try to restart if the connection is currently failed.
 */
static
void
xa_release(xa_tag_t *tag, int wasbio)
{
	xa_softc_t *sc = tag->sc;
	struct bio *bio;

	if ((bio = tag->bio) != NULL) {
		struct buf *bp = bio->bio_buf;

		bp->b_error = EIO;
		bp->b_flags |= B_ERROR;
		devstat_end_transaction_buf(&sc->stats, bp);
		atomic_add_int(&xa_active, -1);
		biodone(bio);
		tag->bio = NULL;
	}

	lockmgr(&sc->lk, LK_EXCLUSIVE);

	if (wasbio && sc->open_tag &&
	    (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
		TAILQ_REMOVE(&sc->bioq, bio, bio_act);
		tag->bio = bio;
		xa_start(tag, NULL, 1);
	} else {
		TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
		TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
	}
	lockmgr(&sc->lk, LK_RELEASE);
}

/*
 * Handle messages under the BLKOPEN transaction.
 */
static int
xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *sc;
	struct bio *bio;

	/*
	 * If the tag has been cleaned out we already closed our side
	 * of the transaction and we are waiting for the other side to
	 * close.
	 */
	xa_printf(1, "xa_sync_completion: tag %p msg %08x state %p\n",
		  tag, msg->any.head.cmd, msg->state);

	if (tag == NULL) {
		if (msg->any.head.cmd & DMSGF_CREATE)
			kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
		return 0;
	}
	sc = tag->sc;

	/*
	 * Validate the tag
	 */
	lockmgr(&sc->lk, LK_EXCLUSIVE);

	/*
	 * Handle initial response to our open and restart any deferred
	 * BIOs on success.
	 *
	 * NOTE: DELETE may also be set.
	 */
	if (msg->any.head.cmd & DMSGF_CREATE) {
		switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
		case DMSG_LNK_ERROR | DMSGF_REPLY:
			bzero(&tag->status, sizeof(tag->status));
			tag->status.head = msg->any.head;
			break;
		case DMSG_BLK_ERROR | DMSGF_REPLY:
			tag->status = msg->any.blk_error;
			break;
		}
		sc->last_error = tag->status.head.error;
		xa_printf(1, "blk_open completion status %d\n",
			  sc->last_error);
		if (sc->last_error == 0) {
			while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
				tag = xa_setup_cmd(sc, NULL);
				if (tag == NULL)
					break;
				TAILQ_REMOVE(&sc->bioq, bio, bio_act);
				tag->bio = bio;
				xa_start(tag, NULL, 1);
			}
		}
		sc->serializing = 0;
		wakeup(sc);
	}

	/*
	 * Handle unexpected termination (or lost comm channel) from other
	 * side.  Autonomous completion only if open_tag matches,
	 * otherwise another thread is probably waiting on the tag.
	 *
	 * (see xa_close() for other interactions)
	 */
	if (msg->any.head.cmd & DMSGF_DELETE) {
		kdmsg_state_reply(tag->state, 0);
		if (sc->open_tag == tag) {
			sc->open_tag = NULL;
			xa_done(tag, 0);
		} else {
			tag->async = 0;
			xa_done(tag, 0);
		}
	}
	lockmgr(&sc->lk, LK_RELEASE);

	return (0);
}

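/*
 * Handle messages under a BLK_READ/WRITE/FLUSH/FREEBLKS transaction
 * initiated by xa_start().  Completes the associated BIO, or requeues
 * it when the failure was a connectivity problem rather than a genuine
 * remote I/O error.
 */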
static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *sc = tag->sc;
	struct bio *bio;
	struct buf *bp;

	/*
	 * Get the bio from the tag.  If no bio is present we just do
	 * 'done' handling.
	 */
	if ((bio = tag->bio) == NULL)
		goto handle_done;
	bp = bio->bio_buf;

	/*
	 * Process return status
	 */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		if (tag->status.head.error)
			tag->status.resid = bp->b_bcount;
		else
			tag->status.resid = 0;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}

	/*
	 * If the device is open, stall the bio on DMSG errors.  If an
	 * actual I/O error occurred on the remote device, DMSG_ERR_IO
	 * will be returned.
	 */
	if (tag->status.head.error &&
	    (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
		if (tag->status.head.error != DMSG_ERR_IO)
			goto handle_repend;
	}

	/*
	 * Process bio completion
	 *
	 * For reads any returned data is zero-extended if necessary, so
	 * the server can short-cut any all-zeros reads if it desires.
	 */
	switch(bp->b_cmd) {
	case BUF_CMD_READ:
		if (msg->aux_data && msg->aux_size) {
			if (msg->aux_size < bp->b_bcount) {
				bcopy(msg->aux_data, bp->b_data, msg->aux_size);
				bzero(bp->b_data + msg->aux_size,
				      bp->b_bcount - msg->aux_size);
			} else {
				bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
			}
		} else {
			bzero(bp->b_data, bp->b_bcount);
		}
		/* fall through */
	case BUF_CMD_WRITE:
	case BUF_CMD_FLUSH:
	case BUF_CMD_FREEBLKS:
	default:
		if (tag->status.resid > bp->b_bcount)
			tag->status.resid = bp->b_bcount;
		bp->b_resid = tag->status.resid;
		if (tag->status.head.error != 0) {
			bp->b_error = EIO;
			bp->b_flags |= B_ERROR;
		} else {
			bp->b_resid = 0;
		}
		devstat_end_transaction_buf(&sc->stats, bp);
		atomic_add_int(&xa_active, -1);
		biodone(bio);
		tag->bio = NULL;
		break;
	}

	/*
	 * Handle completion of the transaction.  If the bioq is not empty
	 * we can initiate another bio on the same tag.
	 *
	 * NOTE: Most of our transactions will be single-message
	 *	 CREATE+DELETEs, so we won't have to terminate the
	 *	 transaction separately here.  But just in case they
	 *	 aren't, be sure to terminate the transaction.
	 */
handle_done:
	if (msg->any.head.cmd & DMSGF_DELETE) {
		xa_done(tag, 1);
		if ((state->txcmd & DMSGF_DELETE) == 0)
			kdmsg_msg_reply(msg, 0);
	}
	return (0);

	/*
	 * Handle the case where the transaction failed due to a
	 * connectivity issue.  The tag is put away with wasbio=0
	 * and we put the BIO back onto the bioq for a later restart.
	 *
	 * Probe I/Os (where the device is not open) are failed
	 * instead of requeued.
	 */
handle_repend:
	tag->bio = NULL;
	if (bio->bio_buf->b_flags & B_FAILONDIS) {
		xa_printf(1, "xa_strategy: lost link, fail probe bp %p\n",
			  bio->bio_buf);
		bio->bio_buf->b_error = ENXIO;
		bio->bio_buf->b_flags |= B_ERROR;
		biodone(bio);
		bio = NULL;
	} else {
		xa_printf(1, "xa_strategy: lost link, requeue bp %p\n",
			  bio->bio_buf);
	}
	xa_done(tag, 0);
	if ((state->txcmd & DMSGF_DELETE) == 0)
		kdmsg_msg_reply(msg, 0);

	/*
	 * Requeue the bio
	 */
	if (bio) {
		lockmgr(&sc->lk, LK_EXCLUSIVE);
		TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
		lockmgr(&sc->lk, LK_RELEASE);
	}
	return (0);
}

/*
 * Restart as much deferred I/O as we can.  The serializer is set and we
 * eat it (clear it) when done.
 *
 * Called with sc->lk held
 */
static
void
xa_restart_deferred(xa_softc_t *sc)
{
	kdmsg_state_t *span;
	kdmsg_msg_t *msg;
	xa_tag_t *tag;
	int error;

	KKASSERT(sc->serializing);

	/*
	 * Determine if a restart is needed.
	 */
	if (sc->opencnt == 0) {
		/*
		 * Device is not open, nothing to do, eat serializing.
		 */
		sc->serializing = 0;
		wakeup(sc);
	} else if (sc->open_tag == NULL) {
		/*
		 * BLK_OPEN required before we can restart any BIOs.
		 * Select the best LNK_SPAN to issue the BLK_OPEN under.
		 *
		 * serializing interlocks waiting open()s.
		 */
		error = 0;
		TAILQ_FOREACH(span, &sc->spanq, user_entry) {
			if ((span->rxcmd & DMSGF_DELETE) == 0)
				break;
		}
		if (span == NULL)
			error = ENXIO;

		if (error == 0) {
			tag = xa_setup_cmd(sc, NULL);
			if (tag == NULL)
				error = ENXIO;
		}
		if (error == 0) {
			sc->open_tag = tag;
			msg = kdmsg_msg_alloc(span,
					      DMSG_BLK_OPEN |
					      DMSGF_CREATE,
					      xa_sync_completion, tag);
			msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
			xa_printf(1,
				  "BLK_OPEN tag %p state %p "
				  "span-state %p\n",
				  tag, msg->state, span);
			xa_start(tag, msg, 0);
		}
		if (error) {
			sc->serializing = 0;
			wakeup(sc);
		}
		/* else leave serializing set until BLK_OPEN response */
	} else {
		/* nothing to do */
		sc->serializing = 0;
		wakeup(sc);
	}
}