xref: /dflybsd-src/sys/dev/disk/xdisk/xdisk.c (revision 9e1c08804a46f1c1a9cd11e190ddba7d2bc4abed)
1 /*
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45  * devices.  These devices look like raw disks to the system.
46  *
47  * TODO:
48  *	Handle circuit disconnects, leave bio's pending
49  *	Restart bio's on circuit reconnect.
50  */
51 #include <sys/param.h>
52 #include <sys/systm.h>
53 #include <sys/buf.h>
54 #include <sys/conf.h>
55 #include <sys/device.h>
56 #include <sys/devicestat.h>
57 #include <sys/disk.h>
58 #include <sys/kernel.h>
59 #include <sys/malloc.h>
60 #include <sys/sysctl.h>
61 #include <sys/proc.h>
62 #include <sys/queue.h>
63 #include <sys/udev.h>
64 #include <sys/uuid.h>
65 #include <sys/kern_syscall.h>
66 
67 #include <sys/dmsg.h>
68 #include <sys/xdiskioctl.h>
69 
70 #include <sys/buf2.h>
71 #include <sys/thread2.h>
72 
struct xa_softc;

/*
 * Per-command tracking structure.  Each tag represents one in-flight
 * DMSG transaction (BLK_OPEN, BLK_READ, BLK_WRITE, etc.).  A fixed
 * pool of MAXTAGS tags is allocated per xa_softc.
 */
struct xa_tag {
	TAILQ_ENTRY(xa_tag) entry;	/* on tag_freeq or tag_pendq */
	struct xa_softc	*xa;		/* owning device softc */
	dmsg_blk_error_t status;	/* last reply status received */
	kdmsg_state_t	*state;		/* open transaction state */
	kdmsg_circuit_t	*circ;		/* circuit the command was sent on */
	struct bio	*bio;		/* bio being serviced, NULL for sync cmds */
	int		running;	/* transaction running */
	int		waitseq;	/* streaming reply */
	int		done;		/* final (transaction closed) */
};

typedef struct xa_tag	xa_tag_t;

/*
 * Per-device (xa%d) softc.  Created by xdisk_attach() and destroyed
 * by xa_terminate_check() once the device is closed, detached, and no
 * longer serializing.
 */
struct xa_softc {
	TAILQ_ENTRY(xa_softc) entry;	/* on global xa_queue */
	cdev_t		dev;		/* disk device, NULL once destroyed */
	kdmsg_iocom_t	iocom;		/* DMSG messaging interface */
	struct xdisk_attach_ioctl xaioc; /* copy of attach parameters */
	struct disk_info info;
	struct disk	disk;
	uuid_t		pfs_fsid;	/* unique fsid advertised in LNK_CONN */
	int		unit;		/* xa%d unit number */
	int		serializing;	/* interlocks attach/detach/open races */
	int		attached;	/* -1 while exiting, else 0 or 1 */
	int		opencnt;
	uint64_t	keyid;		/* key returned by BLK_OPEN reply */
	xa_tag_t	*opentag;	/* long-lived BLK_OPEN transaction */
	TAILQ_HEAD(, bio) bioq;		/* bios deferred pending a circuit */
	TAILQ_HEAD(, xa_tag) tag_freeq;
	TAILQ_HEAD(, xa_tag) tag_pendq;
	TAILQ_HEAD(, kdmsg_circuit) circq; /* live circuits, sorted by weight */
	struct lwkt_token tok;
};

typedef struct xa_softc	xa_softc_t;

#define MAXTAGS		64	/* no real limit */
113 
114 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
115 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
116 static void xa_exit(kdmsg_iocom_t *iocom);
117 static void xa_terminate_check(struct xa_softc *xa);
118 static int xa_rcvdmsg(kdmsg_msg_t *msg);
119 static void xa_autodmsg(kdmsg_msg_t *msg);
120 
121 static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio);
122 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg);
123 static uint32_t xa_wait(xa_tag_t *tag, int seq);
124 static void xa_done(xa_tag_t *tag, int wasbio);
125 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
126 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
127 static void xa_restart_deferred(xa_softc_t *xa);
128 
129 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
130 
/*
 * Control device, issue ioctls to create xa devices.
 */
static d_open_t xdisk_open;
static d_close_t xdisk_close;
static d_ioctl_t xdisk_ioctl;

static struct dev_ops xdisk_ops = {
	{ "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
        .d_open =	xdisk_open,
        .d_close =	xdisk_close,
        .d_ioctl =	xdisk_ioctl
};

/*
 * XA disk devices
 */
static d_open_t xa_open;
static d_close_t xa_close;
static d_ioctl_t xa_ioctl;
static d_strategy_t xa_strategy;
static d_psize_t xa_size;

static struct dev_ops xa_ops = {
	{ "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
        .d_open =	xa_open,
        .d_close =	xa_close,
        .d_ioctl =	xa_ioctl,
        .d_read =	physread,
        .d_write =	physwrite,
        .d_strategy =	xa_strategy,
	.d_psize =	xa_size
};

/*
 * Global state.  xdisk_token protects xdisk_opencount, xa_queue, and
 * the serializing/attached/opencnt fields of each xa_softc.
 */
static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
static int xdisk_opencount;		/* opens of the control device */
static cdev_t xdisk_dev;		/* /dev/xdisk control device */
static TAILQ_HEAD(, xa_softc) xa_queue;	/* all attached xa devices */
169 
170 /*
171  * Module initialization
172  */
173 static int
174 xdisk_modevent(module_t mod, int type, void *data)
175 {
176 	switch (type) {
177 	case MOD_LOAD:
178 		TAILQ_INIT(&xa_queue);
179 		xdisk_dev = make_dev(&xdisk_ops, 0,
180 				     UID_ROOT, GID_WHEEL, 0600, "xdisk");
181 		break;
182 	case MOD_UNLOAD:
183 	case MOD_SHUTDOWN:
184 		if (xdisk_opencount || TAILQ_FIRST(&xa_queue))
185 			return (EBUSY);
186 		if (xdisk_dev) {
187 			destroy_dev(xdisk_dev);
188 			xdisk_dev = NULL;
189 		}
190 		dev_ops_remove_all(&xdisk_ops);
191 		dev_ops_remove_all(&xa_ops);
192 		break;
193 	default:
194 		break;
195 	}
196 	return 0;
197 }
198 
199 DEV_MODULE(xdisk, xdisk_modevent, 0);
200 
201 /*
202  * Control device
203  */
/*
 * Control device open.  Only tracks the open count, which prevents
 * module unload while /dev/xdisk is held open.
 */
static int
xdisk_open(struct dev_open_args *ap)
{
	lwkt_gettoken(&xdisk_token);
	++xdisk_opencount;
	lwkt_reltoken(&xdisk_token);
	return(0);
}
212 
/*
 * Control device close.  Balances xdisk_open(); D_TRACKCLOSE in
 * xdisk_ops gives us one close call per open.
 */
static int
xdisk_close(struct dev_close_args *ap)
{
	lwkt_gettoken(&xdisk_token);
	--xdisk_opencount;
	lwkt_reltoken(&xdisk_token);
	return(0);
}
221 
222 static int
223 xdisk_ioctl(struct dev_ioctl_args *ap)
224 {
225 	int error;
226 
227 	switch(ap->a_cmd) {
228 	case XDISKIOCATTACH:
229 		error = xdisk_attach((void *)ap->a_data);
230 		break;
231 	case XDISKIOCDETACH:
232 		error = xdisk_detach((void *)ap->a_data);
233 		break;
234 	default:
235 		error = ENOTTY;
236 		break;
237 	}
238 	return error;
239 }
240 
241 /************************************************************************
242  *				DMSG INTERFACE				*
243  ************************************************************************/
244 
245 static int
246 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
247 {
248 	xa_softc_t *xa;
249 	xa_tag_t *tag;
250 	struct file *fp;
251 	int unit;
252 	int n;
253 	char devname[64];
254 	cdev_t dev;
255 
256 	/*
257 	 * Normalize ioctl params
258 	 */
259 	fp = holdfp(curproc->p_fd, xaioc->fd, -1);
260 	if (fp == NULL)
261 		return EINVAL;
262 	if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0)
263 		return EINVAL;
264 	if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0)
265 		return EINVAL;
266 	if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE)
267 		return EINVAL;
268 
269 	/*
270 	 * See if the serial number is already present.  If we are
271 	 * racing a termination the disk subsystem may still have
272 	 * duplicate entries not yet removed so we wait a bit and
273 	 * retry.
274 	 */
275 	lwkt_gettoken(&xdisk_token);
276 again:
277 	TAILQ_FOREACH(xa, &xa_queue, entry) {
278 		if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
279 			   xaioc->fs_label) == 0) {
280 			if (xa->serializing) {
281 				tsleep(xa, 0, "xadelay", hz / 10);
282 				goto again;
283 			}
284 			xa->serializing = 1;
285 			kdmsg_iocom_uninit(&xa->iocom);
286 			break;
287 		}
288 	}
289 
290 	/*
291 	 * Create a new xa if not already present
292 	 */
293 	if (xa == NULL) {
294 		unit = 0;
295 		for (;;) {
296 			TAILQ_FOREACH(xa, &xa_queue, entry) {
297 				if (xa->unit == unit)
298 					break;
299 			}
300 			if (xa == NULL)
301 				break;
302 			++unit;
303 		}
304 		xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
305 		xa->unit = unit;
306 		xa->serializing = 1;
307 		lwkt_token_init(&xa->tok, "xa");
308 		TAILQ_INIT(&xa->circq);
309 		TAILQ_INIT(&xa->bioq);
310 		TAILQ_INIT(&xa->tag_freeq);
311 		TAILQ_INIT(&xa->tag_pendq);
312 		for (n = 0; n < MAXTAGS; ++n) {
313 			tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO);
314 			tag->xa = xa;
315 			TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
316 		}
317 		TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
318 	} else {
319 		unit = xa->unit;
320 	}
321 
322 	/*
323 	 * (xa) is now serializing.
324 	 */
325 	xa->xaioc = *xaioc;
326 	xa->attached = 1;
327 	lwkt_reltoken(&xdisk_token);
328 
329 	/*
330 	 * Create device
331 	 */
332 	if (xa->dev == NULL) {
333 		dev = disk_create(unit, &xa->disk, &xa_ops);
334 		dev->si_drv1 = xa;
335 		xa->dev = dev;
336 	}
337 
338 	xa->info.d_media_blksize = xaioc->blksize;
339 	xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize;
340 	xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
341 	xa->info.d_secpertrack = 32;
342 	xa->info.d_nheads = 64;
343 	xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads;
344 	xa->info.d_ncylinders = 0;
345 	if (xa->xaioc.fs_label[0])
346 		xa->info.d_serialno = xa->xaioc.fs_label;
347 
348 	/*
349 	 * Set up messaging connection
350 	 */
351 	ksnprintf(devname, sizeof(devname), "xa%d", unit);
352 	kdmsg_iocom_init(&xa->iocom, xa,
353 			 KDMSG_IOCOMF_AUTOCONN |
354 			 KDMSG_IOCOMF_AUTORXSPAN |
355 			 KDMSG_IOCOMF_AUTOTXSPAN |
356 			 KDMSG_IOCOMF_AUTORXCIRC |
357 			 KDMSG_IOCOMF_AUTOTXCIRC,
358 			 M_XDISK, xa_rcvdmsg);
359 	xa->iocom.exit_func = xa_exit;
360 
361 	kdmsg_iocom_reconnect(&xa->iocom, fp, devname);
362 
363 	/*
364 	 * Setup our LNK_CONN advertisement for autoinitiate.
365 	 *
366 	 * Our filter is setup to only accept PEER_BLOCK/SERVER
367 	 * advertisements.
368 	 */
369 	xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
370 	xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
371 	xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
372 	xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
373 	xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
374 	ksnprintf(xa->iocom.auto_lnk_conn.cl_label,
375 		  sizeof(xa->iocom.auto_lnk_conn.cl_label),
376 		  "%s", xaioc->cl_label);
377 
378 	/*
379 	 * We need a unique pfs_fsid to avoid confusion.
380 	 * We supply a rendezvous fs_label using the serial number.
381 	 */
382 	kern_uuidgen(&xa->pfs_fsid, 1);
383 	xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid;
384 	ksnprintf(xa->iocom.auto_lnk_conn.fs_label,
385 		  sizeof(xa->iocom.auto_lnk_conn.fs_label),
386 		  "%s", xaioc->fs_label);
387 
388 	/*
389 	 * Setup our LNK_SPAN advertisement for autoinitiate
390 	 */
391 	xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT;
392 	xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1;
393 	xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK;
394 	ksnprintf(xa->iocom.auto_lnk_span.cl_label,
395 		  sizeof(xa->iocom.auto_lnk_span.cl_label),
396 		  "%s", xa->xaioc.cl_label);
397 
398 	kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg);
399 	disk_setdiskinfo_sync(&xa->disk, &xa->info);
400 
401 	lwkt_gettoken(&xdisk_token);
402 	xa->serializing = 0;
403 	xa_terminate_check(xa);
404 	lwkt_reltoken(&xdisk_token);
405 
406 	return(0);
407 }
408 
409 static int
410 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
411 {
412 	struct xa_softc *xa;
413 
414 	lwkt_gettoken(&xdisk_token);
415 	for (;;) {
416 		TAILQ_FOREACH(xa, &xa_queue, entry) {
417 			if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
418 				   xaioc->fs_label) == 0) {
419 				break;
420 			}
421 		}
422 		if (xa == NULL || xa->serializing == 0) {
423 			xa->serializing = 1;
424 			break;
425 		}
426 		tsleep(xa, 0, "xadet", hz / 10);
427 	}
428 	if (xa) {
429 		kdmsg_iocom_uninit(&xa->iocom);
430 		xa->serializing = 0;
431 	}
432 	lwkt_reltoken(&xdisk_token);
433 	return(0);
434 }
435 
/*
 * Called from iocom core transmit thread upon disconnect.  Drains all
 * pending I/O, uninitializes the iocom, and destroys the softc if it
 * is no longer open or attached.
 *
 * NOTE(review): acquires xa->tok before xdisk_token — confirm this
 * matches the lock order used elsewhere.
 */
static
void
xa_exit(kdmsg_iocom_t *iocom)
{
	struct xa_softc *xa = iocom->handle;

	lwkt_gettoken(&xa->tok);
	lwkt_gettoken(&xdisk_token);

	/*
	 * We must wait for any I/O's to complete to ensure that all
	 * state structure references are cleaned up before returning.
	 */
	xa->attached = -1;	/* force deferral or failure */
	while (TAILQ_FIRST(&xa->tag_pendq)) {
		tsleep(xa, 0, "xabiow", hz / 10);
	}

	/*
	 * All serializing code checks for de-initialization so only
	 * do it if we aren't already serializing.
	 */
	if (xa->serializing == 0) {
		xa->serializing = 1;
		kdmsg_iocom_uninit(iocom);
		xa->serializing = 0;
	}

	/*
	 * If the drive is not open and no longer attached it can be
	 * destroyed.
	 */
	xa->attached = 0;
	xa_terminate_check(xa);
	lwkt_reltoken(&xdisk_token);
	lwkt_reltoken(&xa->tok);
}
476 
/*
 * Determine if we can destroy the xa_softc and do so if all three
 * holds (opencnt, attached, serializing) are clear.  Fails pending
 * bios with ENXIO, destroys the disk device, and frees all tags and
 * the softc itself.
 *
 * Called with xdisk_token held.
 */
static
void
xa_terminate_check(struct xa_softc *xa)
{
	xa_tag_t *tag;
	struct bio *bio;

	/* Still referenced in some way; cannot destroy yet. */
	if (xa->opencnt || xa->attached || xa->serializing)
		return;
	xa->serializing = 1;
	kdmsg_iocom_uninit(&xa->iocom);

	/*
	 * When destroying an xa make sure all pending I/O (typically
	 * from the disk probe) is done.
	 *
	 * XXX what about new I/O initiated prior to disk_destroy().
	 */
	while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) {
		TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
		if ((bio = tag->bio) != NULL) {
			tag->bio = NULL;
			bio->bio_buf->b_error = ENXIO;
			bio->bio_buf->b_flags |= B_ERROR;
			biodone(bio);
		}
		TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
	}
	if (xa->dev) {
		disk_destroy(&xa->disk);
		xa->dev->si_drv1 = NULL;
		xa->dev = NULL;
	}
	KKASSERT(xa->opencnt == 0 && xa->attached == 0);
	/* Free the tag pool; all tags must be on the free list by now. */
	while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
		tag->xa = NULL;
		kfree(tag, M_XDISK);
	}
	KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
	TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
	kfree(xa, M_XDISK);
}
525 
/*
 * Shim to catch and record virtual circuit events.  Maintains
 * xa->circq (sorted by ascending weight) as circuits come and go, and
 * kicks deferred I/O when circuit availability changes.
 */
static void
xa_autodmsg(kdmsg_msg_t *msg)
{
	xa_softc_t *xa = msg->iocom->handle;

	kdmsg_circuit_t *circ;
	kdmsg_circuit_t *cscan;
	uint32_t xcmd;

	/*
	 * Because this is just a shim we don't have a state callback for
	 * the transactions we are sniffing, so make things easier by
	 * calculating the original command along with the current message's
	 * flags.  This is because transactions are made up of numerous
	 * messages and only the first typically specifies the actual command.
	 */
	if (msg->state) {
		xcmd = msg->state->icmd |
		       (msg->any.head.cmd & (DMSGF_CREATE |
					     DMSGF_DELETE |
					     DMSGF_REPLY));
	} else {
		xcmd = msg->any.head.cmd;
	}

	/*
	 * Add or remove a circuit, sorted by weight (lower numbers are
	 * better).
	 */
	switch(xcmd) {
	case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
		/*
		 * Track established circuits.  Insert before the first
		 * existing circuit with a higher weight, keeping the
		 * list sorted best-first.
		 */
		circ = msg->state->any.circ;
		lwkt_gettoken(&xa->tok);
		if (circ->recorded == 0) {
			TAILQ_FOREACH(cscan, &xa->circq, entry) {
				if (circ->weight < cscan->weight)
					break;
			}
			if (cscan)
				TAILQ_INSERT_BEFORE(cscan, circ, entry);
			else
				TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
			circ->recorded = 1;
		}

		/*
		 * Restart any deferred I/O.
		 */
		xa_restart_deferred(xa);
		lwkt_reltoken(&xa->tok);
		break;
	case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
		/*
		 * Losing virtual circuit.  Remove the circ from contention.
		 */
		circ = msg->state->any.circ;
		lwkt_gettoken(&xa->tok);
		if (circ->recorded) {
			TAILQ_REMOVE(&xa->circq, circ, entry);
			circ->recorded = 0;
		}
		xa_restart_deferred(xa);
		lwkt_reltoken(&xa->tok);
		break;
	default:
		break;
	}
}
600 
601 static int
602 xa_rcvdmsg(kdmsg_msg_t *msg)
603 {
604 	switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
605 	case DMSG_DBG_SHELL:
606 		/*
607 		 * Execute shell command (not supported atm).
608 		 *
609 		 * This is a one-way packet but if not (e.g. if part of
610 		 * a streaming transaction), we will have already closed
611 		 * our end.
612 		 */
613 		kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
614 		break;
615 	case DMSG_DBG_SHELL | DMSGF_REPLY:
616 		/*
617 		 * Receive one or more replies to a shell command that we
618 		 * sent.
619 		 *
620 		 * This is a one-way packet but if not (e.g. if part of
621 		 * a streaming transaction), we will have already closed
622 		 * our end.
623 		 */
624 		if (msg->aux_data) {
625 			msg->aux_data[msg->aux_size - 1] = 0;
626 			kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
627 		}
628 		break;
629 	default:
630 		/*
631 		 * Unsupported LNK message received.  We only need to
632 		 * reply if it's a transaction in order to close our end.
633 		 * Ignore any one-way messages are any further messages
634 		 * associated with the transaction.
635 		 *
636 		 * NOTE: This case also includes DMSG_LNK_ERROR messages
637 		 *	 which might be one-way, replying to those would
638 		 *	 cause an infinite ping-pong.
639 		 */
640 		if (msg->any.head.cmd & DMSGF_CREATE)
641 			kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
642 		break;
643 	}
644 	return(0);
645 }
646 
647 
648 /************************************************************************
649  *			   XA DEVICE INTERFACE				*
650  ************************************************************************/
651 
/*
 * xa device open.  The first open issues a BLK_OPEN transaction to the
 * server and leaves it open for the life of the device open (the
 * returned keyid authenticates subsequent block commands).  Later
 * opens just bump opencnt.
 */
static int
xa_open(struct dev_open_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *xa;
	xa_tag_t *tag;
	kdmsg_msg_t *msg;
	int error;

	dev->si_bsize_phys = 512;
	dev->si_bsize_best = 32768;

	/*
	 * Interlock open with opencnt, wait for attachment operations
	 * to finish.
	 */
	lwkt_gettoken(&xdisk_token);
again:
	xa = dev->si_drv1;
	if (xa == NULL) {
		lwkt_reltoken(&xdisk_token);
		return ENXIO;	/* raced destruction */
	}
	if (xa->serializing) {
		tsleep(xa, 0, "xarace", hz / 10);
		goto again;
	}
	if (xa->attached == 0) {
		lwkt_reltoken(&xdisk_token);
		return ENXIO;	/* raced destruction */
	}

	/*
	 * Serialize initial open.  Secondary opens piggy-back on the
	 * already-established BLK_OPEN transaction.
	 */
	if (xa->opencnt++ > 0) {
		lwkt_reltoken(&xdisk_token);
		return(0);
	}
	xa->serializing = 1;
	lwkt_reltoken(&xdisk_token);

	/* No tag means no live circuit to the server; fail the open. */
	tag = xa_setup_cmd(xa, NULL);
	if (tag == NULL) {
		lwkt_gettoken(&xdisk_token);
		KKASSERT(xa->opencnt > 0);
		--xa->opencnt;
		xa->serializing = 0;
		xa_terminate_check(xa);
		lwkt_reltoken(&xdisk_token);
		return(ENXIO);
	}
	/*
	 * Issue BLK_OPEN (CREATE only, no DELETE) so the transaction
	 * stays open; it is closed again in xa_close().
	 */
	msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
			      DMSG_BLK_OPEN | DMSGF_CREATE,
			      xa_sync_completion, tag);
	msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
	xa_start(tag, msg);
	if (xa_wait(tag, 0) == 0) {
		xa->keyid = tag->status.keyid;
		xa->opentag = tag;	/* leave tag open */
		xa->serializing = 0;
		error = 0;
	} else {
		xa_done(tag, 0);
		lwkt_gettoken(&xdisk_token);
		KKASSERT(xa->opencnt > 0);
		--xa->opencnt;
		xa->serializing = 0;
		xa_terminate_check(xa);
		lwkt_reltoken(&xdisk_token);
		error = ENXIO;
	}
	return (error);
}
726 
/*
 * xa device close.  On last close the long-lived BLK_OPEN transaction
 * (xa->opentag) is terminated, then the softc is destroyed if it is
 * also detached.  D_TRACKCLOSE gives one close per open.
 */
static int
xa_close(struct dev_close_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *xa;
	xa_tag_t *tag;

	xa = dev->si_drv1;
	if (xa == NULL)
		return ENXIO;	/* raced destruction */

	lwkt_gettoken(&xa->tok);
	if ((tag = xa->opentag) != NULL) {
		/*
		 * Close our side of the BLK_OPEN transaction and wait
		 * for the remote side to terminate it as well.
		 */
		xa->opentag = NULL;
		kdmsg_state_reply(tag->state, 0);
		while (tag->done == 0)
			xa_wait(tag, tag->waitseq);
		xa_done(tag, 0);
	}
	lwkt_reltoken(&xa->tok);

	lwkt_gettoken(&xdisk_token);
	KKASSERT(xa->opencnt > 0);
	--xa->opencnt;
	xa_terminate_check(xa);
	lwkt_reltoken(&xdisk_token);

	return(0);
}
756 
757 static int
758 xa_strategy(struct dev_strategy_args *ap)
759 {
760 	xa_softc_t *xa = ap->a_head.a_dev->si_drv1;
761 	xa_tag_t *tag;
762 	struct bio *bio = ap->a_bio;
763 
764 	/*
765 	 * Allow potentially temporary link failures to fail the I/Os
766 	 * only if the device is not open.  That is, we allow the disk
767 	 * probe code prior to mount to fail.
768 	 */
769 	if (xa->attached == 0 && xa->opencnt == 0) {
770 		bio->bio_buf->b_error = ENXIO;
771 		bio->bio_buf->b_flags |= B_ERROR;
772 		biodone(bio);
773 		return(0);
774 	}
775 
776 	tag = xa_setup_cmd(xa, bio);
777 	if (tag)
778 		xa_start(tag, NULL);
779 	return(0);
780 }
781 
/*
 * No device-specific ioctls are supported on xa disk devices.
 */
static int
xa_ioctl(struct dev_ioctl_args *ap)
{
	return(ENOTTY);
}
787 
788 static int
789 xa_size(struct dev_psize_args *ap)
790 {
791 	struct xa_softc *xa;
792 
793 	if ((xa = ap->a_head.a_dev->si_drv1) == NULL)
794 		return (ENXIO);
795 	ap->a_result = xa->info.d_media_blocks;
796 	return (0);
797 }
798 
799 /************************************************************************
800  *		    XA BLOCK PROTOCOL STATE MACHINE			*
801  ************************************************************************
802  *
803  * Implement tag/msg setup and related functions.
804  */
/*
 * Allocate a command tag for (bio), or for a synchronous command when
 * bio is NULL.  Returns NULL when no live circuit or free tag is
 * available; in that case a bio is queued to xa->bioq for restart by
 * xa_restart_deferred().
 */
static xa_tag_t *
xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
{
	kdmsg_circuit_t *circ;
	xa_tag_t *tag;

	/*
	 * Only get a tag if we have a valid virtual circuit to the server.
	 * circq is sorted best-first, so the first non-lost entry wins.
	 */
	lwkt_gettoken(&xa->tok);
	TAILQ_FOREACH(circ, &xa->circq, entry) {
		if (circ->lost == 0)
			break;
	}
	if (circ == NULL || xa->attached <= 0) {
		tag = NULL;
	} else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
		tag->bio = bio;
		tag->circ = circ;
		kdmsg_circ_hold(circ);	/* ref dropped in xa_done() */
		TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
	}

	/*
	 * If we can't dispatch now and this is a bio, queue it for later.
	 */
	if (tag == NULL && bio) {
		TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act);
	}
	lwkt_reltoken(&xa->tok);

	return (tag);
}
839 
/*
 * Start a command on a tag.  If msg is NULL the message is constructed
 * from the tag's bio (read/write/flush/freeblks); unsupported buffer
 * commands fail the bio with EIO and release the tag.
 */
static void
xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
{
	xa_softc_t *xa = tag->xa;

	if (msg == NULL) {
		struct bio *bio;
		struct buf *bp;

		KKASSERT(tag->bio);
		bio = tag->bio;
		bp = bio->bio_buf;

		switch(bp->b_cmd) {
		case BUF_CMD_READ:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_READ |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_read.keyid = xa->keyid;
			msg->any.blk_read.offset = bio->bio_offset;
			msg->any.blk_read.bytes = bp->b_bcount;
			break;
		case BUF_CMD_WRITE:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_WRITE |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_write.keyid = xa->keyid;
			msg->any.blk_write.offset = bio->bio_offset;
			msg->any.blk_write.bytes = bp->b_bcount;
			/* write payload rides along as aux data */
			msg->aux_data = bp->b_data;
			msg->aux_size = bp->b_bcount;
			break;
		case BUF_CMD_FLUSH:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_FLUSH |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_flush.keyid = xa->keyid;
			msg->any.blk_flush.offset = bio->bio_offset;
			msg->any.blk_flush.bytes = bp->b_bcount;
			break;
		case BUF_CMD_FREEBLKS:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_FREEBLKS |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_freeblks.keyid = xa->keyid;
			msg->any.blk_freeblks.offset = bio->bio_offset;
			msg->any.blk_freeblks.bytes = bp->b_bcount;
			break;
		default:
			/* unsupported buffer command, fail the bio */
			bp->b_flags |= B_ERROR;
			bp->b_error = EIO;
			biodone(bio);
			tag->bio = NULL;
			break;
		}
	}

	tag->done = 0;
	tag->waitseq = 0;
	if (msg) {
		tag->state = msg->state;
		kdmsg_msg_write(msg);
	} else {
		/* nothing to send; recycle the tag (may start next bio) */
		xa_done(tag, 1);
	}
}
910 
/*
 * Block until the tag's streaming reply sequence advances past (seq),
 * then return the error code from the most recent reply.  Wakeups are
 * issued by the completion callbacks.
 */
static uint32_t
xa_wait(xa_tag_t *tag, int seq)
{
	xa_softc_t *xa = tag->xa;

	lwkt_gettoken(&xa->tok);
	while (tag->waitseq == seq)
		tsleep(tag, 0, "xawait", 0);
	lwkt_reltoken(&xa->tok);
	return (tag->status.head.error);
}
922 
/*
 * Finish up a tag.  If (wasbio) and a deferred bio is queued, the tag
 * is immediately reused to start that bio; otherwise the circuit ref
 * is dropped and the tag returned to the free list.
 *
 * The caller must have detached any bio from the tag already.
 */
static void
xa_done(xa_tag_t *tag, int wasbio)
{
	xa_softc_t *xa = tag->xa;
	struct bio *bio;

	KKASSERT(tag->bio == NULL);
	tag->done = 1;
	tag->state = NULL;

	lwkt_gettoken(&xa->tok);
	if (wasbio && (bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
		/* reuse the tag (and its circuit ref) for the next bio */
		TAILQ_REMOVE(&xa->bioq, bio, bio_act);
		tag->bio = bio;
		lwkt_reltoken(&xa->tok);
		xa_start(tag, NULL);
	} else {
		if (tag->circ) {
			kdmsg_circ_drop(tag->circ);
			tag->circ = NULL;
		}
		TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
		TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
		lwkt_reltoken(&xa->tok);
	}
}
949 
/*
 * Completion callback for synchronous (non-bio) transactions such as
 * BLK_OPEN.  Records the reply status, handles remote termination of
 * the long-lived open transaction, and wakes up any xa_wait()er.
 */
static int
xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *xa = tag->xa;

	/* Capture reply status for the waiter. */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}
	lwkt_gettoken(&xa->tok);
	if (msg->any.head.cmd & DMSGF_DELETE) {	/* receive termination */
		if (xa->opentag == tag) {
			/* remote closed our open transaction; close our side */
			xa->opentag = NULL;	/* XXX */
			kdmsg_state_reply(tag->state, 0);
			xa_done(tag, 0);
			lwkt_reltoken(&xa->tok);
			return(0);
		} else {
			tag->done = 1;
		}
	}
	++tag->waitseq;
	lwkt_reltoken(&xa->tok);

	wakeup(tag);

	return (0);
}
984 
/*
 * Completion callback for bio-backed block transactions (read, write,
 * flush, freeblks).  Copies out read data, finishes the bio, and
 * handles circuit-failure requeueing for open devices.
 */
static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *xa = tag->xa;
	struct bio *bio;
	struct buf *bp;

	/*
	 * Get the bio from the tag.  If no bio is present we just do
	 * 'done' handling.
	 */
	if ((bio = tag->bio) == NULL)
		goto handle_done;
	bp = bio->bio_buf;

	/*
	 * Process return status
	 */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		if (tag->status.head.error)
			tag->status.resid = bp->b_bcount;
		else
			tag->status.resid = 0;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}

	/*
	 * Potentially move the bio back onto the pending queue if the
	 * device is open and the error is related to losing the virtual
	 * circuit.
	 */
	if (tag->status.head.error &&
	    (msg->any.head.cmd & DMSGF_DELETE) && xa->opencnt) {
		if (tag->status.head.error == DMSG_ERR_LOSTLINK ||
		    tag->status.head.error == DMSG_ERR_CANTCIRC) {
			goto handle_repend;
		}
	}

	/*
	 * Process bio completion
	 *
	 * For reads any returned data is zero-extended if necessary, so
	 * the server can short-cut any all-zeros reads if it desires.
	 */
	switch(bp->b_cmd) {
	case BUF_CMD_READ:
		if (msg->aux_data && msg->aux_size) {
			if (msg->aux_size < bp->b_bcount) {
				/* short read: zero-fill the remainder */
				bcopy(msg->aux_data, bp->b_data, msg->aux_size);
				bzero(bp->b_data + msg->aux_size,
				      bp->b_bcount - msg->aux_size);
			} else {
				bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
			}
		} else {
			/* no payload at all: all-zeros read */
			bzero(bp->b_data, bp->b_bcount);
		}
		/* fall through */
	case BUF_CMD_WRITE:
	case BUF_CMD_FLUSH:
	case BUF_CMD_FREEBLKS:
	default:
		/* clamp server-reported residual to the buffer size */
		if (tag->status.resid > bp->b_bcount)
			tag->status.resid = bp->b_bcount;
		bp->b_resid = tag->status.resid;
		if ((bp->b_error = tag->status.head.error) != 0) {
			bp->b_flags |= B_ERROR;
		} else {
			bp->b_resid = 0;
		}
		biodone(bio);
		tag->bio = NULL;
		break;
	}

	/*
	 * Handle completion of the transaction.  If the bioq is not empty
	 * we can initiate another bio on the same tag.
	 *
	 * NOTE: Most of our transactions will be single-message
	 *	 CREATE+DELETEs, so we won't have to terminate the
	 *	 transaction separately, here.  But just in case they
	 *	 aren't be sure to terminate the transaction.
	 */
handle_done:
	if (msg->any.head.cmd & DMSGF_DELETE) {
		xa_done(tag, 1);
		if ((state->txcmd & DMSGF_DELETE) == 0)
			kdmsg_msg_reply(msg, 0);
	}
	return (0);

	/*
	 * Handle the case where the transaction failed due to a
	 * connectivity issue.  The tag is put away with wasbio=0
	 * and we restart the bio.
	 *
	 * Setting circ->lost causes xa_setup_cmd() to skip the circuit.
	 * Other circuits might still be live.  Once a circuit gets messed
	 * up it will (eventually) be deleted so we can simply leave (lost)
	 * set forever after.
	 */
handle_repend:
	lwkt_gettoken(&xa->tok);
	kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
	tag->circ->lost = 1;
	tag->bio = NULL;
	xa_done(tag, 0);
	if ((state->txcmd & DMSGF_DELETE) == 0)
		kdmsg_msg_reply(msg, 0);

	/*
	 * Restart or requeue the bio
	 */
	tag = xa_setup_cmd(xa, bio);
	if (tag)
		xa_start(tag, NULL);
	lwkt_reltoken(&xa->tok);
	return (0);
}
1113 
1114 /*
1115  * Restart as much deferred I/O as we can.
1116  *
1117  * Called with xa->tok held
1118  */
1119 static
1120 void
1121 xa_restart_deferred(xa_softc_t *xa)
1122 {
1123 	struct bio *bio;
1124 	xa_tag_t *tag;
1125 
1126 	while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
1127 		tag = xa_setup_cmd(xa, NULL);
1128 		if (tag == NULL)
1129 			break;
1130 		TAILQ_REMOVE(&xa->bioq, bio, bio_act);
1131 		tag->bio = bio;
1132 		xa_start(tag, NULL);
1133 	}
1134 }
1135