1 /* 2 * Copyright (c) 2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 /* 35 * This module allows disk devices to be created and associated with a 36 * communications pipe or socket. You open the device and issue an 37 * ioctl() to install a new disk along with its communications descriptor. 
 *
 * All further communication occurs via the descriptor using the DMSG
 * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
 * cluster controller, to the local cluster controller, etc.
 *
 * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
 * devices.  These devices look like raw disks to the system.
 *
 * TODO:
 *	Handle circuit disconnects, leave bio's pending
 *	Restart bio's on circuit reconnect.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/buf.h>
#include <sys/conf.h>
#include <sys/device.h>
#include <sys/devicestat.h>
#include <sys/disk.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/udev.h>
#include <sys/uuid.h>
#include <sys/kern_syscall.h>

#include <sys/dmsg.h>
#include <sys/xdiskioctl.h>

#include <sys/buf2.h>
#include <sys/thread2.h>

struct xa_softc;

/*
 * Per-I/O transaction tag.  A fixed pool of MAXTAGS tags is allocated
 * for each xa_softc (see xdisk_attach()); each tag tracks one DMSG
 * BLOCK transaction and the bio, if any, it is servicing.
 */
struct xa_tag {
	TAILQ_ENTRY(xa_tag) entry;	/* on tag_freeq or tag_pendq */
	struct xa_softc	*xa;		/* owning device */
	dmsg_blk_error_t status;	/* last reply status recorded */
	kdmsg_state_t	*state;		/* transaction state, NULL if idle */
	kdmsg_circuit_t	*circ;		/* circuit held for this command */
	struct bio	*bio;		/* bio being serviced, may be NULL */
	int		running;	/* transaction running */
	int		waitseq;	/* streaming reply */
	int		done;		/* final (transaction closed) */
};

typedef struct xa_tag	xa_tag_t;

/*
 * Per-device (/dev/xa%d) software state.
 */
struct xa_softc {
	TAILQ_ENTRY(xa_softc) entry;	/* on global xa_queue */
	cdev_t		dev;		/* backing disk device */
	kdmsg_iocom_t	iocom;		/* DMSG communications state */
	struct xdisk_attach_ioctl xaioc; /* copy of attach parameters */
	struct disk_info info;
	struct disk	disk;
	uuid_t		pfs_fsid;	/* unique fsid for LNK_CONN */
	int		unit;		/* /dev/xa%d unit number */
	int		serializing;	/* interlock for attach/detach/open */
	int		attached;	/* 1 attached, -1 tearing down */
	int		opencnt;
	uint64_t	keyid;		/* key returned by BLK_OPEN */
	xa_tag_t	*opentag;	/* long-lived BLK_OPEN transaction */
	TAILQ_HEAD(, bio) bioq;		/* deferred bios (no circuit/tag) */
	TAILQ_HEAD(, xa_tag) tag_freeq;
	TAILQ_HEAD(, xa_tag) tag_pendq;
	TAILQ_HEAD(, kdmsg_circuit) circq; /* circuits sorted by weight */
	struct lwkt_token tok;		/* per-device serializing token */
};

typedef struct xa_softc xa_softc_t;

#define MAXTAGS		64	/* no real limit */

static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
static void xa_exit(kdmsg_iocom_t *iocom);
static void xa_terminate_check(struct xa_softc *xa);
static int xa_rcvdmsg(kdmsg_msg_t *msg);
static void xa_autodmsg(kdmsg_msg_t *msg);

static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio);
static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg);
static uint32_t xa_wait(xa_tag_t *tag, int seq);
static void xa_done(xa_tag_t *tag, int wasbio);
static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static void xa_restart_deferred(xa_softc_t *xa);

MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");

/*
 * Control device, issue ioctls to create xa devices.
 */
static d_open_t xdisk_open;
static d_close_t xdisk_close;
static d_ioctl_t xdisk_ioctl;

static struct dev_ops xdisk_ops = {
	{ "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xdisk_open,
	.d_close =	xdisk_close,
	.d_ioctl =	xdisk_ioctl
};

/*
 * XA disk devices
 */
static d_open_t xa_open;
static d_close_t xa_close;
static d_ioctl_t xa_ioctl;
static d_strategy_t xa_strategy;
static d_psize_t xa_size;

static struct dev_ops xa_ops = {
	{ "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xa_open,
	.d_close =	xa_close,
	.d_ioctl =	xa_ioctl,
	.d_read =	physread,
	.d_write =	physwrite,
	.d_strategy =	xa_strategy,
	.d_psize =	xa_size
};

/* Global token guarding xa_queue, xdisk_opencount, and serializing flags */
static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
static int xdisk_opencount;
static cdev_t xdisk_dev;
static TAILQ_HEAD(, xa_softc) xa_queue;

/*
 * Module initialization
 */
static int
xdisk_modevent(module_t mod, int type, void *data)
{
switch (type) { 177 case MOD_LOAD: 178 TAILQ_INIT(&xa_queue); 179 xdisk_dev = make_dev(&xdisk_ops, 0, 180 UID_ROOT, GID_WHEEL, 0600, "xdisk"); 181 break; 182 case MOD_UNLOAD: 183 case MOD_SHUTDOWN: 184 if (xdisk_opencount || TAILQ_FIRST(&xa_queue)) 185 return (EBUSY); 186 if (xdisk_dev) { 187 destroy_dev(xdisk_dev); 188 xdisk_dev = NULL; 189 } 190 dev_ops_remove_all(&xdisk_ops); 191 dev_ops_remove_all(&xa_ops); 192 break; 193 default: 194 break; 195 } 196 return 0; 197 } 198 199 DEV_MODULE(xdisk, xdisk_modevent, 0); 200 201 /* 202 * Control device 203 */ 204 static int 205 xdisk_open(struct dev_open_args *ap) 206 { 207 lwkt_gettoken(&xdisk_token); 208 ++xdisk_opencount; 209 lwkt_reltoken(&xdisk_token); 210 return(0); 211 } 212 213 static int 214 xdisk_close(struct dev_close_args *ap) 215 { 216 lwkt_gettoken(&xdisk_token); 217 --xdisk_opencount; 218 lwkt_reltoken(&xdisk_token); 219 return(0); 220 } 221 222 static int 223 xdisk_ioctl(struct dev_ioctl_args *ap) 224 { 225 int error; 226 227 switch(ap->a_cmd) { 228 case XDISKIOCATTACH: 229 error = xdisk_attach((void *)ap->a_data); 230 break; 231 case XDISKIOCDETACH: 232 error = xdisk_detach((void *)ap->a_data); 233 break; 234 default: 235 error = ENOTTY; 236 break; 237 } 238 return error; 239 } 240 241 /************************************************************************ 242 * DMSG INTERFACE * 243 ************************************************************************/ 244 245 static int 246 xdisk_attach(struct xdisk_attach_ioctl *xaioc) 247 { 248 xa_softc_t *xa; 249 xa_tag_t *tag; 250 struct file *fp; 251 int unit; 252 int n; 253 char devname[64]; 254 cdev_t dev; 255 256 /* 257 * Normalize ioctl params 258 */ 259 fp = holdfp(curproc->p_fd, xaioc->fd, -1); 260 if (fp == NULL) 261 return EINVAL; 262 if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0) 263 return EINVAL; 264 if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0) 265 return EINVAL; 266 if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE) 267 
return EINVAL; 268 269 /* 270 * See if the serial number is already present. If we are 271 * racing a termination the disk subsystem may still have 272 * duplicate entries not yet removed so we wait a bit and 273 * retry. 274 */ 275 lwkt_gettoken(&xdisk_token); 276 again: 277 TAILQ_FOREACH(xa, &xa_queue, entry) { 278 if (strcmp(xa->iocom.auto_lnk_conn.fs_label, 279 xaioc->fs_label) == 0) { 280 if (xa->serializing) { 281 tsleep(xa, 0, "xadelay", hz / 10); 282 goto again; 283 } 284 xa->serializing = 1; 285 kdmsg_iocom_uninit(&xa->iocom); 286 break; 287 } 288 } 289 290 /* 291 * Create a new xa if not already present 292 */ 293 if (xa == NULL) { 294 unit = 0; 295 for (;;) { 296 TAILQ_FOREACH(xa, &xa_queue, entry) { 297 if (xa->unit == unit) 298 break; 299 } 300 if (xa == NULL) 301 break; 302 ++unit; 303 } 304 xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO); 305 xa->unit = unit; 306 xa->serializing = 1; 307 lwkt_token_init(&xa->tok, "xa"); 308 TAILQ_INIT(&xa->circq); 309 TAILQ_INIT(&xa->bioq); 310 TAILQ_INIT(&xa->tag_freeq); 311 TAILQ_INIT(&xa->tag_pendq); 312 for (n = 0; n < MAXTAGS; ++n) { 313 tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO); 314 tag->xa = xa; 315 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry); 316 } 317 TAILQ_INSERT_TAIL(&xa_queue, xa, entry); 318 } else { 319 unit = xa->unit; 320 } 321 322 /* 323 * (xa) is now serializing. 
324 */ 325 xa->xaioc = *xaioc; 326 xa->attached = 1; 327 lwkt_reltoken(&xdisk_token); 328 329 /* 330 * Create device 331 */ 332 if (xa->dev == NULL) { 333 dev = disk_create(unit, &xa->disk, &xa_ops); 334 dev->si_drv1 = xa; 335 xa->dev = dev; 336 } 337 338 xa->info.d_media_blksize = xaioc->blksize; 339 xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize; 340 xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE; 341 xa->info.d_secpertrack = 32; 342 xa->info.d_nheads = 64; 343 xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads; 344 xa->info.d_ncylinders = 0; 345 if (xa->xaioc.fs_label[0]) 346 xa->info.d_serialno = xa->xaioc.fs_label; 347 348 /* 349 * Set up messaging connection 350 */ 351 ksnprintf(devname, sizeof(devname), "xa%d", unit); 352 kdmsg_iocom_init(&xa->iocom, xa, 353 KDMSG_IOCOMF_AUTOCONN | 354 KDMSG_IOCOMF_AUTORXSPAN | 355 KDMSG_IOCOMF_AUTOTXSPAN | 356 KDMSG_IOCOMF_AUTORXCIRC | 357 KDMSG_IOCOMF_AUTOTXCIRC, 358 M_XDISK, xa_rcvdmsg); 359 xa->iocom.exit_func = xa_exit; 360 361 kdmsg_iocom_reconnect(&xa->iocom, fp, devname); 362 363 /* 364 * Setup our LNK_CONN advertisement for autoinitiate. 365 * 366 * Our filter is setup to only accept PEER_BLOCK/SERVER 367 * advertisements. 368 */ 369 xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT; 370 xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1; 371 xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK; 372 xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK; 373 xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER; 374 ksnprintf(xa->iocom.auto_lnk_conn.cl_label, 375 sizeof(xa->iocom.auto_lnk_conn.cl_label), 376 "%s", xaioc->cl_label); 377 378 /* 379 * We need a unique pfs_fsid to avoid confusion. 380 * We supply a rendezvous fs_label using the serial number. 
381 */ 382 kern_uuidgen(&xa->pfs_fsid, 1); 383 xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid; 384 ksnprintf(xa->iocom.auto_lnk_conn.fs_label, 385 sizeof(xa->iocom.auto_lnk_conn.fs_label), 386 "%s", xaioc->fs_label); 387 388 /* 389 * Setup our LNK_SPAN advertisement for autoinitiate 390 */ 391 xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT; 392 xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1; 393 xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK; 394 ksnprintf(xa->iocom.auto_lnk_span.cl_label, 395 sizeof(xa->iocom.auto_lnk_span.cl_label), 396 "%s", xa->xaioc.cl_label); 397 398 kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg); 399 disk_setdiskinfo_sync(&xa->disk, &xa->info); 400 401 lwkt_gettoken(&xdisk_token); 402 xa->serializing = 0; 403 xa_terminate_check(xa); 404 lwkt_reltoken(&xdisk_token); 405 406 return(0); 407 } 408 409 static int 410 xdisk_detach(struct xdisk_attach_ioctl *xaioc) 411 { 412 struct xa_softc *xa; 413 414 lwkt_gettoken(&xdisk_token); 415 for (;;) { 416 TAILQ_FOREACH(xa, &xa_queue, entry) { 417 if (strcmp(xa->iocom.auto_lnk_conn.fs_label, 418 xaioc->fs_label) == 0) { 419 break; 420 } 421 } 422 if (xa == NULL || xa->serializing == 0) { 423 xa->serializing = 1; 424 break; 425 } 426 tsleep(xa, 0, "xadet", hz / 10); 427 } 428 if (xa) { 429 kdmsg_iocom_uninit(&xa->iocom); 430 xa->serializing = 0; 431 } 432 lwkt_reltoken(&xdisk_token); 433 return(0); 434 } 435 436 /* 437 * Called from iocom core transmit thread upon disconnect. 438 */ 439 static 440 void 441 xa_exit(kdmsg_iocom_t *iocom) 442 { 443 struct xa_softc *xa = iocom->handle; 444 445 lwkt_gettoken(&xa->tok); 446 lwkt_gettoken(&xdisk_token); 447 448 /* 449 * We must wait for any I/O's to complete to ensure that all 450 * state structure references are cleaned up before returning. 
451 */ 452 xa->attached = -1; /* force deferral or failure */ 453 while (TAILQ_FIRST(&xa->tag_pendq)) { 454 tsleep(xa, 0, "xabiow", hz / 10); 455 } 456 457 /* 458 * All serializing code checks for de-initialization so only 459 * do it if we aren't already serializing. 460 */ 461 if (xa->serializing == 0) { 462 xa->serializing = 1; 463 kdmsg_iocom_uninit(iocom); 464 xa->serializing = 0; 465 } 466 467 /* 468 * If the drive is not in use and no longer attach it can be 469 * destroyed. 470 */ 471 xa->attached = 0; 472 xa_terminate_check(xa); 473 lwkt_reltoken(&xdisk_token); 474 lwkt_reltoken(&xa->tok); 475 } 476 477 /* 478 * Determine if we can destroy the xa_softc. 479 * 480 * Called with xdisk_token held. 481 */ 482 static 483 void 484 xa_terminate_check(struct xa_softc *xa) 485 { 486 xa_tag_t *tag; 487 struct bio *bio; 488 489 if (xa->opencnt || xa->attached || xa->serializing) 490 return; 491 xa->serializing = 1; 492 kdmsg_iocom_uninit(&xa->iocom); 493 494 /* 495 * When destroying an xa make sure all pending I/O (typically 496 * from the disk probe) is done. 497 * 498 * XXX what about new I/O initiated prior to disk_destroy(). 499 */ 500 while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) { 501 TAILQ_REMOVE(&xa->tag_pendq, tag, entry); 502 if ((bio = tag->bio) != NULL) { 503 tag->bio = NULL; 504 bio->bio_buf->b_error = ENXIO; 505 bio->bio_buf->b_flags |= B_ERROR; 506 biodone(bio); 507 } 508 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry); 509 } 510 if (xa->dev) { 511 disk_destroy(&xa->disk); 512 xa->dev->si_drv1 = NULL; 513 xa->dev = NULL; 514 } 515 KKASSERT(xa->opencnt == 0 && xa->attached == 0); 516 while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) { 517 TAILQ_REMOVE(&xa->tag_freeq, tag, entry); 518 tag->xa = NULL; 519 kfree(tag, M_XDISK); 520 } 521 KKASSERT(TAILQ_EMPTY(&xa->tag_pendq)); 522 TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */ 523 kfree(xa, M_XDISK); 524 } 525 526 /* 527 * Shim to catch and record virtual circuit events. 
 * events.
 */
static void
xa_autodmsg(kdmsg_msg_t *msg)
{
	xa_softc_t *xa = msg->iocom->handle;

	kdmsg_circuit_t *circ;
	kdmsg_circuit_t *cscan;
	uint32_t xcmd;

	/*
	 * Because this is just a shim we don't have a state callback for
	 * the transactions we are sniffing, so make things easier by
	 * calculating the original command along with the current message's
	 * flags.  This is because transactions are made up of numerous
	 * messages and only the first typically specifies the actual command.
	 */
	if (msg->state) {
		xcmd = msg->state->icmd |
		       (msg->any.head.cmd & (DMSGF_CREATE |
					     DMSGF_DELETE |
					     DMSGF_REPLY));
	} else {
		xcmd = msg->any.head.cmd;
	}

	/*
	 * Add or remove a circuit, sorted by weight (lower numbers are
	 * better).
	 */
	switch(xcmd) {
	case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
		/*
		 * Track established circuits
		 */
		circ = msg->state->any.circ;
		lwkt_gettoken(&xa->tok);
		if (circ->recorded == 0) {
			TAILQ_FOREACH(cscan, &xa->circq, entry) {
				if (circ->weight < cscan->weight)
					break;
			}
			if (cscan)
				TAILQ_INSERT_BEFORE(cscan, circ, entry);
			else
				TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
			circ->recorded = 1;
		}

		/*
		 * Restart any deferred I/O.
		 */
		xa_restart_deferred(xa);
		lwkt_reltoken(&xa->tok);
		break;
	case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
		/*
		 * Losing virtual circuit.  Remove the circ from contention.
		 */
		circ = msg->state->any.circ;
		lwkt_gettoken(&xa->tok);
		if (circ->recorded) {
			TAILQ_REMOVE(&xa->circq, circ, entry);
			circ->recorded = 0;
		}
		xa_restart_deferred(xa);
		lwkt_reltoken(&xa->tok);
		break;
	default:
		break;
	}
}

/*
 * Receive unsolicited DMSG messages not handled by the automatic
 * LNK_CONN/LNK_SPAN/LNK_CIRC machinery.
 */
static int
xa_rcvdmsg(kdmsg_msg_t *msg)
{
	switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
	case DMSG_DBG_SHELL:
		/*
		 * Execute shell command (not supported atm).
		 *
		 * This is a one-way packet but if not (e.g. if part of
		 * a streaming transaction), we will have already closed
		 * our end.
		 */
		kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	case DMSG_DBG_SHELL | DMSGF_REPLY:
		/*
		 * Receive one or more replies to a shell command that we
		 * sent.
		 *
		 * This is a one-way packet but if not (e.g. if part of
		 * a streaming transaction), we will have already closed
		 * our end.
		 */
		if (msg->aux_data) {
			msg->aux_data[msg->aux_size - 1] = 0;
			kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
		}
		break;
	default:
		/*
		 * Unsupported LNK message received.  We only need to
		 * reply if it's a transaction in order to close our end.
		 * Ignore any one-way messages or any further messages
		 * associated with the transaction.
		 *
		 * NOTE: This case also includes DMSG_LNK_ERROR messages
		 *	 which might be one-way, replying to those would
		 *	 cause an infinite ping-pong.
		 */
		if (msg->any.head.cmd & DMSGF_CREATE)
			kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
		break;
	}
	return(0);
}


/************************************************************************
 *			   XA DEVICE INTERFACE				*
 ************************************************************************/

/*
 * Open an xa device.  The first open issues a BLK_OPEN transaction to
 * the server and leaves it open for the duration (xa->opentag); later
 * opens just bump the refcount.
 */
static int
xa_open(struct dev_open_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *xa;
	xa_tag_t *tag;
	kdmsg_msg_t *msg;
	int error;

	dev->si_bsize_phys = 512;
	dev->si_bsize_best = 32768;

	/*
	 * Interlock open with opencnt, wait for attachment operations
	 * to finish.
	 */
	lwkt_gettoken(&xdisk_token);
again:
	xa = dev->si_drv1;
	if (xa == NULL) {
		lwkt_reltoken(&xdisk_token);
		return ENXIO;	/* raced destruction */
	}
	if (xa->serializing) {
		tsleep(xa, 0, "xarace", hz / 10);
		goto again;
	}
	if (xa->attached == 0) {
		lwkt_reltoken(&xdisk_token);
		return ENXIO;	/* raced destruction */
	}

	/*
	 * Serialize initial open
	 */
	if (xa->opencnt++ > 0) {
		lwkt_reltoken(&xdisk_token);
		return(0);
	}
	xa->serializing = 1;
	lwkt_reltoken(&xdisk_token);

	tag = xa_setup_cmd(xa, NULL);
	if (tag == NULL) {
		lwkt_gettoken(&xdisk_token);
		KKASSERT(xa->opencnt > 0);
		--xa->opencnt;
		xa->serializing = 0;
		xa_terminate_check(xa);
		lwkt_reltoken(&xdisk_token);
		return(ENXIO);
	}
	msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
			      DMSG_BLK_OPEN | DMSGF_CREATE,
			      xa_sync_completion, tag);
	msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
	xa_start(tag, msg);
	if (xa_wait(tag, 0) == 0) {
		xa->keyid = tag->status.keyid;
		xa->opentag = tag;	/* leave tag open */
		xa->serializing = 0;
		error = 0;
	} else {
		xa_done(tag, 0);
		lwkt_gettoken(&xdisk_token);
		KKASSERT(xa->opencnt > 0);
		--xa->opencnt;
		xa->serializing = 0;
		xa_terminate_check(xa);
		lwkt_reltoken(&xdisk_token);
		error = ENXIO;
	}
	return (error);
}

/*
 * Close an xa device.  The last close terminates the long-lived
 * BLK_OPEN transaction and may destroy the softc via
 * xa_terminate_check().
 */
static int
xa_close(struct dev_close_args *ap)
{
	cdev_t dev = ap->a_head.a_dev;
	xa_softc_t *xa;
	xa_tag_t *tag;

	xa = dev->si_drv1;
	if (xa == NULL)
		return ENXIO;	/* raced destruction */

	lwkt_gettoken(&xa->tok);
	if ((tag = xa->opentag) != NULL) {
		xa->opentag = NULL;
		kdmsg_state_reply(tag->state, 0);
		while (tag->done == 0)
			xa_wait(tag, tag->waitseq);
		xa_done(tag, 0);
	}
	lwkt_reltoken(&xa->tok);

	lwkt_gettoken(&xdisk_token);
	KKASSERT(xa->opencnt > 0);
	--xa->opencnt;
	xa_terminate_check(xa);
	lwkt_reltoken(&xdisk_token);

	return(0);
}

static int
xa_strategy(struct dev_strategy_args *ap)
{
	xa_softc_t *xa = ap->a_head.a_dev->si_drv1;
	xa_tag_t *tag;
	struct bio *bio = ap->a_bio;

	/*
	 * Allow potentially temporary link failures to fail the I/Os
	 * only if the device is not open.  That is, we allow the disk
	 * probe code prior to mount to fail.
	 */
	if (xa->attached == 0 && xa->opencnt == 0) {
		bio->bio_buf->b_error = ENXIO;
		bio->bio_buf->b_flags |= B_ERROR;
		biodone(bio);
		return(0);
	}

	/* If no tag is available xa_setup_cmd() queued the bio for later */
	tag = xa_setup_cmd(xa, bio);
	if (tag)
		xa_start(tag, NULL);
	return(0);
}

static int
xa_ioctl(struct dev_ioctl_args *ap)
{
	return(ENOTTY);
}

static int
xa_size(struct dev_psize_args *ap)
{
	struct xa_softc *xa;

	if ((xa = ap->a_head.a_dev->si_drv1) == NULL)
		return (ENXIO);
	ap->a_result = xa->info.d_media_blocks;
	return (0);
}

/************************************************************************
 *		    XA BLOCK PROTOCOL STATE MACHINE			*
 ************************************************************************
 *
 * Implement tag/msg setup and related functions.
 */
static xa_tag_t *
xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
{
	kdmsg_circuit_t *circ;
	xa_tag_t *tag;

	/*
	 * Only get a tag if we have a valid virtual circuit to the server.
	 */
	lwkt_gettoken(&xa->tok);
	TAILQ_FOREACH(circ, &xa->circq, entry) {
		if (circ->lost == 0)
			break;
	}
	if (circ == NULL || xa->attached <= 0) {
		tag = NULL;
	} else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
		TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
		tag->bio = bio;
		tag->circ = circ;
		kdmsg_circ_hold(circ);
		TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
	}

	/*
	 * If we can't dispatch now and this is a bio, queue it for later.
	 */
	if (tag == NULL && bio) {
		TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act);
	}
	lwkt_reltoken(&xa->tok);

	return (tag);
}

/*
 * Start a command on (tag).  If (msg) is NULL a BLK message is built
 * from the tag's bio; unsupported buffer commands fail the bio with EIO.
 */
static void
xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
{
	xa_softc_t *xa = tag->xa;

	if (msg == NULL) {
		struct bio *bio;
		struct buf *bp;

		KKASSERT(tag->bio);
		bio = tag->bio;
		bp = bio->bio_buf;

		switch(bp->b_cmd) {
		case BUF_CMD_READ:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_READ |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_read.keyid = xa->keyid;
			msg->any.blk_read.offset = bio->bio_offset;
			msg->any.blk_read.bytes = bp->b_bcount;
			break;
		case BUF_CMD_WRITE:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_WRITE |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_write.keyid = xa->keyid;
			msg->any.blk_write.offset = bio->bio_offset;
			msg->any.blk_write.bytes = bp->b_bcount;
			msg->aux_data = bp->b_data;
			msg->aux_size = bp->b_bcount;
			break;
		case BUF_CMD_FLUSH:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_FLUSH |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_flush.keyid = xa->keyid;
			msg->any.blk_flush.offset = bio->bio_offset;
			msg->any.blk_flush.bytes = bp->b_bcount;
			break;
		case BUF_CMD_FREEBLKS:
			msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
					      DMSG_BLK_FREEBLKS |
					      DMSGF_CREATE | DMSGF_DELETE,
					      xa_bio_completion, tag);
			msg->any.blk_freeblks.keyid = xa->keyid;
			msg->any.blk_freeblks.offset = bio->bio_offset;
			msg->any.blk_freeblks.bytes = bp->b_bcount;
			break;
		default:
			bp->b_flags |= B_ERROR;
			bp->b_error = EIO;
			biodone(bio);
			tag->bio = NULL;
			break;
		}
	}

	tag->done = 0;
	tag->waitseq = 0;
	if (msg) {
		tag->state = msg->state;
		kdmsg_msg_write(msg);
	} else {
		xa_done(tag, 1);
	}
}

/*
 * Sleep until the tag's waitseq advances past (seq), i.e. until another
 * streaming reply arrives, then return the current reply error code.
 */
static uint32_t
xa_wait(xa_tag_t *tag, int seq)
{
	xa_softc_t *xa = tag->xa;

	lwkt_gettoken(&xa->tok);
	while (tag->waitseq == seq)
		tsleep(tag, 0, "xawait", 0);
	lwkt_reltoken(&xa->tok);
	return (tag->status.head.error);
}

/*
 * Release a tag.  If (wasbio) and another bio is queued, the tag is
 * immediately reused to start that bio; otherwise the circuit reference
 * is dropped and the tag returns to the free list.
 */
static void
xa_done(xa_tag_t *tag, int wasbio)
{
	xa_softc_t *xa = tag->xa;
	struct bio *bio;

	KKASSERT(tag->bio == NULL);
	tag->done = 1;
	tag->state = NULL;

	lwkt_gettoken(&xa->tok);
	if (wasbio && (bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
		TAILQ_REMOVE(&xa->bioq, bio, bio_act);
		tag->bio = bio;
		lwkt_reltoken(&xa->tok);
		xa_start(tag, NULL);
	} else {
		if (tag->circ) {
			kdmsg_circ_drop(tag->circ);
			tag->circ = NULL;
		}
		TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
		TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
		lwkt_reltoken(&xa->tok);
	}
}

/*
 * Completion callback for the synchronous BLK_OPEN transaction issued
 * by xa_open().  Records the reply status and wakes up the waiter in
 * xa_wait().
 */
static int
xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *xa = tag->xa;

	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}
	lwkt_gettoken(&xa->tok);
	if (msg->any.head.cmd & DMSGF_DELETE) {	/* receive termination */
		if (xa->opentag == tag) {
			xa->opentag = NULL;	/* XXX */
			kdmsg_state_reply(tag->state, 0);
			xa_done(tag, 0);
			lwkt_reltoken(&xa->tok);
			return(0);
		} else {
			tag->done = 1;
		}
	}
	++tag->waitseq;
	lwkt_reltoken(&xa->tok);

	wakeup(tag);

	return (0);
}

/*
 * Completion callback for the asynchronous BLK_READ/WRITE/FLUSH/FREEBLKS
 * transactions issued by xa_start().
 */
static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *xa = tag->xa;
	struct bio *bio;
	struct buf *bp;

	/*
	 * Get the bio from the tag.  If no bio is present we just do
	 * 'done' handling.
	 */
	if ((bio = tag->bio) == NULL)
		goto handle_done;
	bp = bio->bio_buf;

	/*
	 * Process return status
	 */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		if (tag->status.head.error)
			tag->status.resid = bp->b_bcount;
		else
			tag->status.resid = 0;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}

	/*
	 * Potentially move the bio back onto the pending queue if the
	 * device is open and the error is related to losing the virtual
	 * circuit.
	 */
	if (tag->status.head.error &&
	    (msg->any.head.cmd & DMSGF_DELETE) && xa->opencnt) {
		if (tag->status.head.error == DMSG_ERR_LOSTLINK ||
		    tag->status.head.error == DMSG_ERR_CANTCIRC) {
			goto handle_repend;
		}
	}

	/*
	 * Process bio completion
	 *
	 * For reads any returned data is zero-extended if necessary, so
	 * the server can short-cut any all-zeros reads if it desires.
	 */
	switch(bp->b_cmd) {
	case BUF_CMD_READ:
		if (msg->aux_data && msg->aux_size) {
			if (msg->aux_size < bp->b_bcount) {
				bcopy(msg->aux_data, bp->b_data, msg->aux_size);
				bzero(bp->b_data + msg->aux_size,
				      bp->b_bcount - msg->aux_size);
			} else {
				bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
			}
		} else {
			bzero(bp->b_data, bp->b_bcount);
		}
		/* fall through */
	case BUF_CMD_WRITE:
	case BUF_CMD_FLUSH:
	case BUF_CMD_FREEBLKS:
	default:
		if (tag->status.resid > bp->b_bcount)
			tag->status.resid = bp->b_bcount;
		bp->b_resid = tag->status.resid;
		if ((bp->b_error = tag->status.head.error) != 0) {
			bp->b_flags |= B_ERROR;
		} else {
			bp->b_resid = 0;
		}
		biodone(bio);
		tag->bio = NULL;
		break;
	}

	/*
	 * Handle completion of the transaction.  If the bioq is not empty
	 * we can initiate another bio on the same tag.
	 *
	 * NOTE: Most of our transactions will be single-message
	 *	 CREATE+DELETEs, so we won't have to terminate the
	 *	 transaction separately, here.  But just in case they
	 *	 aren't be sure to terminate the transaction.
	 */
handle_done:
	if (msg->any.head.cmd & DMSGF_DELETE) {
		xa_done(tag, 1);
		if ((state->txcmd & DMSGF_DELETE) == 0)
			kdmsg_msg_reply(msg, 0);
	}
	return (0);

	/*
	 * Handle the case where the transaction failed due to a
	 * connectivity issue.  The tag is put away with wasbio=0
	 * and we restart the bio.
	 *
	 * Setting circ->lost causes xa_setup_cmd() to skip the circuit.
	 * Other circuits might still be live.  Once a circuit gets messed
	 * up it will (eventually) be deleted so we can simply leave (lost)
	 * set forever after.
	 */
handle_repend:
	lwkt_gettoken(&xa->tok);
	kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
	tag->circ->lost = 1;	/* must be set before xa_done() drops circ */
	tag->bio = NULL;
	xa_done(tag, 0);
	if ((state->txcmd & DMSGF_DELETE) == 0)
		kdmsg_msg_reply(msg, 0);

	/*
	 * Restart or requeue the bio
	 */
	tag = xa_setup_cmd(xa, bio);
	if (tag)
		xa_start(tag, NULL);
	lwkt_reltoken(&xa->tok);
	return (0);
}

/*
 * Restart as much deferred I/O as we can.
 *
 * Called with xa->tok held
 */
static
void
xa_restart_deferred(xa_softc_t *xa)
{
	struct bio *bio;
	xa_tag_t *tag;

	while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
		tag = xa_setup_cmd(xa, NULL);
		if (tag == NULL)
			break;
		TAILQ_REMOVE(&xa->bioq, bio, bio_act);
		tag->bio = bio;
		xa_start(tag, NULL);
	}
}