1 /* 2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org> 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * 3. Neither the name of The DragonFly Project nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific, prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 * SUCH DAMAGE. 34 */ 35 36 #include "hammer2.h" 37 38 #include <sys/xdiskioctl.h> 39 #include <machine/atomic.h> 40 41 struct hammer2_media_config { 42 hammer2_volconf_t copy_run; 43 hammer2_volconf_t copy_pend; 44 pthread_t thread; 45 pthread_cond_t cond; 46 int ctl; 47 int fd; 48 int pipefd[2]; /* signal stop */ 49 dmsg_iocom_t iocom; 50 pthread_t iocom_thread; 51 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state; 52 }; 53 54 typedef struct hammer2_media_config hammer2_media_config_t; 55 56 #define H2CONFCTL_STOP 0x00000001 57 #define H2CONFCTL_UPDATE 0x00000002 58 59 struct diskcon { 60 TAILQ_ENTRY(diskcon) entry; 61 char *disk; 62 }; 63 64 struct service_node_opaque { 65 char cl_label[64]; 66 char fs_label[64]; 67 dmsg_media_block_t block; 68 int attached; 69 int servicing; 70 int servicefd; 71 }; 72 73 struct autoconn { 74 TAILQ_ENTRY(autoconn) entry; 75 char *host; 76 int stage; 77 int stopme; 78 int pipefd[2]; /* {read,write} */ 79 enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state; 80 pthread_t thread; 81 }; 82 83 #define WS " \r\n" 84 85 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq); 86 static pthread_mutex_t diskmtx; 87 static pthread_mutex_t confmtx; 88 89 static void *service_thread(void *data); 90 static void *udev_thread(void *data); 91 static void *autoconn_thread(void *data); 92 static void master_reconnect(const char *mntpt); 93 static void disk_reconnect(const char *disk); 94 static void disk_disconnect(void *handle); 95 static void udev_check_disks(void); 96 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged); 97 static void hammer2_node_handler(void **opaque, struct dmsg_msg *msg, int op); 98 static void *hammer2_volconf_thread(void *info); 99 static void hammer2_volconf_signal(dmsg_iocom_t *iocom); 100 static void hammer2_volconf_start(hammer2_media_config_t *conf, 101 const char *hostname); 102 static void hammer2_volconf_stop(hammer2_media_config_t *conf); 103 104 105 static void xdisk_reconnect(struct service_node_opaque *info); 106 static void xdisk_disconnect(void *handle); 107 static void *xdisk_attach_tmpthread(void *data); 108 109 /* 110 * Start-up the master listener daemon for the machine. This daemon runs 111 * a UDP discovery protocol, a TCP rendezvous, and scans certain files 112 * and directories for work. 113 * 114 * -- 115 * 116 * The only purpose for the UDP discovery protocol is to determine what 117 * other IPs on the LAN are running the hammer2 service daemon. DNS is not 118 * required to operate, but hostnames (if assigned) must be unique. If 119 * no hostname is assigned the host's IP is used as the name. This name 120 * is broadcast along with the mtime of the originator's private key. 121 * 122 * Receiving hammer2 service daemons which are able to match the label against 123 * /etc/hammer2/remote/<label>.pub will initiate a persistent connection 124 * to the target. Removal of the file will cause a disconnection. A failed 125 * public key negotiation stops further connection attempts until either the 126 * file is updated or the remote mtime is updated. 127 * 128 * Generally speaking this results in a web of connections, typically a 129 * combination of point-to-point for the more important links and relayed 130 * (spanning tree) for less important or filtered links. 131 * 132 * -- 133 * 134 * The TCP listener serves as a rendezvous point in the cluster, accepting 135 * connections, performing registrations and authentications, maintaining 136 * the spanning tree, and keeping track of message state so disconnects can 137 * be handled properly. 138 * 139 * Once authenticated only low-level messaging protocols (which includes 140 * tracking persistent messages) are handled by this daemon. This daemon 141 * does not run the higher level quorum or locking protocols. 142 * 143 * -- 144 * 145 * The file /etc/hammer2/autoconn, if it exists, contains a list of targets 146 * to connect to (which do not have to be on the local lan). This list will 147 * be retried until a connection can be established. The file is not usually 148 * needed for linkages local to the LAN. 149 */ 150 int 151 cmd_service(void) 152 { 153 struct sockaddr_in lsin; 154 int on; 155 int lfd; 156 157 /* 158 * Acquire socket and set options 159 */ 160 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 161 fprintf(stderr, "master_listen: socket(): %s\n", 162 strerror(errno)); 163 return 1; 164 } 165 on = 1; 166 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); 167 168 /* 169 * Setup listen port and try to bind. If the bind fails we assume 170 * that a master listener process is already running and silently 171 * fail. 172 */ 173 bzero(&lsin, sizeof(lsin)); 174 lsin.sin_family = AF_INET; 175 lsin.sin_addr.s_addr = INADDR_ANY; 176 lsin.sin_port = htons(DMSG_LISTEN_PORT); 177 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { 178 close(lfd); 179 if (QuietOpt == 0) { 180 fprintf(stderr, 181 "master listen: daemon already running\n"); 182 } 183 return 0; 184 } 185 if (QuietOpt == 0) 186 fprintf(stderr, "master listen: startup\n"); 187 listen(lfd, 50); 188 189 /* 190 * Fork and disconnect the controlling terminal and parent process, 191 * executing the specified function as a pthread. 192 * 193 * Returns to the original process which can then continue running. 194 * In debug mode this call will create the pthread without forking 195 * and set NormalExit to 0, instead of fork. 196 */ 197 hammer2_demon(service_thread, (void *)(intptr_t)lfd); 198 if (NormalExit) 199 close(lfd); 200 return 0; 201 } 202 203 /* 204 * Master listen/accept thread. Accept connections on the master socket, 205 * starting a pthread for each one. 206 */ 207 static 208 void * 209 service_thread(void *data) 210 { 211 struct sockaddr_in asin; 212 socklen_t alen; 213 pthread_t thread; 214 dmsg_master_service_info_t *info; 215 int lfd = (int)(intptr_t)data; 216 int fd; 217 int i; 218 int count; 219 struct statfs *mntbuf = NULL; 220 struct statvfs *mntvbuf = NULL; 221 222 /* 223 * Nobody waits for us 224 */ 225 setproctitle("hammer2 master listen"); 226 pthread_detach(pthread_self()); 227 228 /* 229 * Start up a thread to handle block device monitoring 230 */ 231 thread = NULL; 232 pthread_create(&thread, NULL, udev_thread, NULL); 233 234 /* 235 * Start thread to manage /etc/hammer2/autoconn 236 */ 237 thread = NULL; 238 pthread_create(&thread, NULL, autoconn_thread, NULL); 239 240 /* 241 * Scan existing hammer2 mounts and reconnect to them using 242 * HAMMER2IOC_RECLUSTER. 243 */ 244 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT); 245 for (i = 0; i < count; ++i) { 246 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0) 247 master_reconnect(mntbuf[i].f_mntonname); 248 } 249 250 /* 251 * Accept connections and create pthreads to handle them after 252 * validating the IP. 253 */ 254 for (;;) { 255 alen = sizeof(asin); 256 fd = accept(lfd, (struct sockaddr *)&asin, &alen); 257 if (fd < 0) { 258 if (errno == EINTR) 259 continue; 260 break; 261 } 262 thread = NULL; 263 fprintf(stderr, "service_thread: accept fd %d\n", fd); 264 info = malloc(sizeof(*info)); 265 bzero(info, sizeof(*info)); 266 info->fd = fd; 267 info->detachme = 1; 268 info->usrmsg_callback = hammer2_usrmsg_handler; 269 info->node_handler = hammer2_node_handler; 270 info->label = strdup("client"); 271 pthread_create(&thread, NULL, dmsg_master_service, info); 272 } 273 return (NULL); 274 } 275 276 /* 277 * Handle/Monitor the dmsg stream. If unmanaged is set we are responsible 278 * for responding for the message, otherwise if it is not set libdmsg has 279 * already done some preprocessing and will respond to the message for us 280 * when we return. 281 * 282 * We primarily monitor for VOLCONFs 283 */ 284 static 285 void 286 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged) 287 { 288 dmsg_state_t *state; 289 hammer2_media_config_t *conf; 290 dmsg_lnk_hammer2_volconf_t *msgconf; 291 int i; 292 293 /* 294 * Only process messages which are part of a LNK_CONN stream 295 */ 296 state = msg->state; 297 if (state == NULL || 298 (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) { 299 hammer2_shell_parse(msg, unmanaged); 300 return; 301 } 302 303 switch(msg->tcmd) { 304 case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE: 305 case DMSG_LNK_CONN | DMSGF_DELETE: 306 case DMSG_LNK_ERROR | DMSGF_DELETE: 307 /* 308 * Deleting connection, clean out all volume configs 309 */ 310 if (state->media == NULL || state->media->usrhandle == NULL) 311 break; 312 conf = state->media->usrhandle; 313 fprintf(stderr, "Shutting down media spans\n"); 314 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { 315 if (conf[i].thread) { 316 conf[i].ctl = H2CONFCTL_STOP; 317 pthread_cond_signal(&conf[i].cond); 318 } 319 } 320 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { 321 if (conf[i].thread) { 322 pthread_join(conf[i].thread, NULL); 323 conf->thread = NULL; 324 pthread_cond_destroy(&conf[i].cond); 325 } 326 } 327 state->media->usrhandle = NULL; 328 free(conf); 329 break; 330 case DMSG_LNK_HAMMER2_VOLCONF: 331 /* 332 * One-way volume-configuration message is transmitted 333 * over the open LNK_CONN transaction. 334 */ 335 fprintf(stderr, "RECEIVED VOLCONF\n"); 336 337 if ((conf = state->media->usrhandle) == NULL) { 338 conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT); 339 bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT); 340 state->media->usrhandle = conf; 341 } 342 msgconf = H2_LNK_VOLCONF(msg); 343 344 if (msgconf->index < 0 || 345 msgconf->index >= HAMMER2_COPYID_COUNT) { 346 fprintf(stderr, 347 "VOLCONF: ILLEGAL INDEX %d\n", 348 msgconf->index); 349 break; 350 } 351 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 || 352 msgconf->copy.path[0] == 0) { 353 fprintf(stderr, 354 "VOLCONF: ILLEGAL PATH %d\n", 355 msgconf->index); 356 break; 357 } 358 conf += msgconf->index; 359 pthread_mutex_lock(&confmtx); 360 conf->copy_pend = msgconf->copy; 361 conf->ctl |= H2CONFCTL_UPDATE; 362 pthread_mutex_unlock(&confmtx); 363 if (conf->thread == NULL) { 364 fprintf(stderr, "VOLCONF THREAD STARTED\n"); 365 pthread_cond_init(&conf->cond, NULL); 366 pthread_create(&conf->thread, NULL, 367 hammer2_volconf_thread, (void *)conf); 368 } 369 pthread_cond_signal(&conf->cond); 370 break; 371 default: 372 if (unmanaged) 373 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP); 374 break; 375 } 376 } 377 378 static void * 379 hammer2_volconf_thread(void *info) 380 { 381 hammer2_media_config_t *conf = info; 382 383 pthread_mutex_lock(&confmtx); 384 while ((conf->ctl & H2CONFCTL_STOP) == 0) { 385 if (conf->ctl & H2CONFCTL_UPDATE) { 386 fprintf(stderr, "VOLCONF UPDATE\n"); 387 conf->ctl &= ~H2CONFCTL_UPDATE; 388 if (bcmp(&conf->copy_run, &conf->copy_pend, 389 sizeof(conf->copy_run)) == 0) { 390 fprintf(stderr, "VOLCONF: no changes\n"); 391 continue; 392 } 393 /* 394 * XXX TODO - auto reconnect on lookup failure or 395 * connect failure or stream failure. 396 */ 397 398 pthread_mutex_unlock(&confmtx); 399 hammer2_volconf_stop(conf); 400 conf->copy_run = conf->copy_pend; 401 if (conf->copy_run.copyid != 0 && 402 strncmp(conf->copy_run.path, "span:", 5) == 0) { 403 hammer2_volconf_start(conf, 404 conf->copy_run.path + 5); 405 } 406 pthread_mutex_lock(&confmtx); 407 fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state); 408 } 409 if (conf->state == H2MC_CONNECT) { 410 hammer2_volconf_start(conf, conf->copy_run.path + 5); 411 pthread_mutex_unlock(&confmtx); 412 sleep(5); 413 pthread_mutex_lock(&confmtx); 414 } else { 415 pthread_cond_wait(&conf->cond, &confmtx); 416 } 417 } 418 pthread_mutex_unlock(&confmtx); 419 hammer2_volconf_stop(conf); 420 return(NULL); 421 } 422 423 static 424 void 425 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname) 426 { 427 dmsg_master_service_info_t *info; 428 429 switch(conf->state) { 430 case H2MC_STOPPED: 431 case H2MC_CONNECT: 432 conf->fd = dmsg_connect(hostname); 433 if (conf->fd < 0) { 434 fprintf(stderr, "Unable to connect to %s\n", hostname); 435 conf->state = H2MC_CONNECT; 436 } else if (pipe(conf->pipefd) < 0) { 437 close(conf->fd); 438 fprintf(stderr, "pipe() failed during volconf\n"); 439 conf->state = H2MC_CONNECT; 440 } else { 441 fprintf(stderr, "VOLCONF CONNECT\n"); 442 info = malloc(sizeof(*info)); 443 bzero(info, sizeof(*info)); 444 info->fd = conf->fd; 445 info->altfd = conf->pipefd[0]; 446 info->altmsg_callback = hammer2_volconf_signal; 447 info->usrmsg_callback = hammer2_usrmsg_handler; 448 info->detachme = 0; 449 conf->state = H2MC_RUNNING; 450 pthread_create(&conf->iocom_thread, NULL, 451 dmsg_master_service, info); 452 } 453 break; 454 case H2MC_RUNNING: 455 break; 456 } 457 } 458 459 static 460 void 461 hammer2_volconf_stop(hammer2_media_config_t *conf) 462 { 463 switch(conf->state) { 464 case H2MC_STOPPED: 465 break; 466 case H2MC_CONNECT: 467 conf->state = H2MC_STOPPED; 468 break; 469 case H2MC_RUNNING: 470 close(conf->pipefd[1]); 471 conf->pipefd[1] = -1; 472 pthread_join(conf->iocom_thread, NULL); 473 conf->iocom_thread = NULL; 474 conf->state = H2MC_STOPPED; 475 break; 476 } 477 } 478 479 static 480 void 481 hammer2_volconf_signal(dmsg_iocom_t *iocom) 482 { 483 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 484 } 485 486 487 /* 488 * Node discovery code on received SPANs (or loss of SPANs). This code 489 * is used to track the availability of remote block devices and install 490 * or deinstall them using the xdisk driver (/dev/xdisk). 491 * 492 * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on 493 * the data handed to it. When opened, a virtual circuit is forged and 494 * maintained to the block device server via DMSG. Temporary failures 495 * stall the device until successfully reconnected or explicitly destroyed. 496 */ 497 static 498 void 499 hammer2_node_handler(void **opaquep, struct dmsg_msg *msg, int op) 500 { 501 struct service_node_opaque *info = *opaquep; 502 503 switch(op) { 504 case DMSG_NODEOP_ADD: 505 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK) 506 break; 507 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER) 508 break; 509 if (info == NULL) { 510 info = malloc(sizeof(*info)); 511 bzero(info, sizeof(*info)); 512 *opaquep = info; 513 } 514 snprintf(info->cl_label, sizeof(info->cl_label), 515 "%s", msg->any.lnk_span.cl_label); 516 snprintf(info->fs_label, sizeof(info->fs_label), 517 "%s", msg->any.lnk_span.fs_label); 518 info->block = msg->any.lnk_span.media.block; 519 fprintf(stderr, "NODE ADD %s serno %s\n", 520 info->cl_label, info->fs_label); 521 info->attached = 1; 522 xdisk_reconnect(info); 523 break; 524 case DMSG_NODEOP_DEL: 525 if (info) { 526 fprintf(stderr, "NODE DEL %s serno %s\n", 527 info->cl_label, info->fs_label); 528 pthread_mutex_lock(&diskmtx); 529 *opaquep = NULL; 530 info->attached = 0; 531 if (info->servicing == 0) 532 free(info); 533 else 534 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/ 535 pthread_mutex_unlock(&diskmtx); 536 } 537 break; 538 default: 539 break; 540 } 541 } 542 543 /* 544 * Monitor block devices. Currently polls every ~10 seconds or so. 545 */ 546 static 547 void * 548 udev_thread(void *data __unused) 549 { 550 int fd; 551 int seq = 0; 552 553 pthread_detach(pthread_self()); 554 555 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) { 556 fprintf(stderr, "udev_thread: unable to open \"%s\"\n", 557 UDEV_DEVICE_PATH); 558 pthread_exit(NULL); 559 } 560 udev_check_disks(); 561 while (ioctl(fd, UDEVWAIT, &seq) == 0) { 562 udev_check_disks(); 563 sleep(1); 564 } 565 return (NULL); 566 } 567 568 static void *autoconn_connect_thread(void *data); 569 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom); 570 571 static 572 void * 573 autoconn_thread(void *data __unused) 574 { 575 TAILQ_HEAD(, autoconn) autolist; 576 struct autoconn *ac; 577 struct autoconn *next; 578 pthread_t thread; 579 struct stat st; 580 time_t t; 581 time_t lmod; 582 int found_last; 583 FILE *fp; 584 char buf[256]; 585 586 TAILQ_INIT(&autolist); 587 found_last = 0; 588 lmod = 0; 589 590 pthread_detach(pthread_self()); 591 for (;;) { 592 /* 593 * Polling interval 594 */ 595 sleep(5); 596 597 /* 598 * Poll the file. Loop up if the synchronized state (lmod) 599 * has not changed. 600 */ 601 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) { 602 if (lmod == st.st_mtime) 603 continue; 604 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r"); 605 if (fp == NULL) 606 continue; 607 } else { 608 if (lmod == 0) 609 continue; 610 fp = NULL; 611 } 612 613 /* 614 * Wait at least 5 seconds after the file is created or 615 * removed. 616 * 617 * Do not update the synchronized state. 618 */ 619 if (fp == NULL && found_last) { 620 found_last = 0; 621 continue; 622 } else if (fp && found_last == 0) { 623 fclose(fp); 624 found_last = 1; 625 continue; 626 } 627 628 /* 629 * Don't scan the file until the time progresses past the 630 * file's mtime, so we can validate that the file was not 631 * further modified during our scan. 632 * 633 * Do not update the synchronized state. 634 */ 635 time(&t); 636 if (fp) { 637 if (t == st.st_mtime) { 638 fclose(fp); 639 continue; 640 } 641 t = st.st_mtime; 642 } else { 643 t = 0; 644 } 645 646 /* 647 * Set staging to disconnect, then scan the file. 648 */ 649 TAILQ_FOREACH(ac, &autolist, entry) 650 ac->stage = 0; 651 while (fp && fgets(buf, sizeof(buf), fp) != NULL) { 652 char *host; 653 654 if ((host = strtok(buf, " \t\r\n")) == NULL || 655 host[0] == '#') { 656 continue; 657 } 658 TAILQ_FOREACH(ac, &autolist, entry) { 659 if (strcmp(host, ac->host) == 0) 660 break; 661 } 662 if (ac == NULL) { 663 ac = malloc(sizeof(*ac)); 664 bzero(ac, sizeof(*ac)); 665 ac->host = strdup(host); 666 ac->state = AUTOCONN_INACTIVE; 667 TAILQ_INSERT_TAIL(&autolist, ac, entry); 668 } 669 ac->stage = 1; 670 } 671 672 /* 673 * Ignore the scan (and retry again) if the file was 674 * modified during the scan. 675 * 676 * Do not update the synchronized state. 677 */ 678 if (fp) { 679 if (fstat(fileno(fp), &st) < 0) { 680 fclose(fp); 681 continue; 682 } 683 fclose(fp); 684 if (t != st.st_mtime) 685 continue; 686 } 687 688 /* 689 * Update the synchronized state and reconfigure the 690 * connect list as needed. 691 */ 692 lmod = t; 693 next = TAILQ_FIRST(&autolist); 694 while ((ac = next) != NULL) { 695 next = TAILQ_NEXT(ac, entry); 696 697 /* 698 * Staging, initiate 699 */ 700 if (ac->stage && ac->state == AUTOCONN_INACTIVE) { 701 if (pipe(ac->pipefd) == 0) { 702 ac->stopme = 0; 703 ac->state = AUTOCONN_ACTIVE; 704 thread = NULL; 705 pthread_create(&thread, NULL, 706 autoconn_connect_thread, 707 ac); 708 } 709 } 710 711 /* 712 * Unstaging, stop active connection. 713 * 714 * We write to the pipe which causes the iocom_core 715 * to call autoconn_disconnect_signal(). 716 */ 717 if (ac->stage == 0 && 718 ac->state == AUTOCONN_ACTIVE) { 719 if (ac->stopme == 0) { 720 char dummy = 0; 721 ac->stopme = 1; 722 write(ac->pipefd[1], &dummy, 1); 723 } 724 } 725 726 /* 727 * Unstaging, delete inactive connection. 728 */ 729 if (ac->stage == 0 && 730 ac->state == AUTOCONN_INACTIVE) { 731 TAILQ_REMOVE(&autolist, ac, entry); 732 free(ac->host); 733 free(ac); 734 continue; 735 } 736 } 737 sleep(5); 738 } 739 return(NULL); 740 } 741 742 static 743 void * 744 autoconn_connect_thread(void *data) 745 { 746 dmsg_master_service_info_t *info; 747 struct autoconn *ac; 748 void *res; 749 int fd; 750 751 ac = data; 752 pthread_detach(pthread_self()); 753 754 while (ac->stopme == 0) { 755 fd = dmsg_connect(ac->host); 756 if (fd < 0) { 757 fprintf(stderr, "autoconn: Connect failure: %s\n", 758 ac->host); 759 sleep(5); 760 continue; 761 } 762 fprintf(stderr, "autoconn: Connect %s\n", ac->host); 763 764 info = malloc(sizeof(*info)); 765 bzero(info, sizeof(*info)); 766 info->fd = fd; 767 info->altfd = ac->pipefd[0]; 768 info->altmsg_callback = autoconn_disconnect_signal; 769 info->usrmsg_callback = hammer2_usrmsg_handler; 770 info->detachme = 0; 771 info->noclosealt = 1; 772 pthread_create(&ac->thread, NULL, dmsg_master_service, info); 773 pthread_join(ac->thread, &res); 774 } 775 close(ac->pipefd[0]); 776 ac->state = AUTOCONN_INACTIVE; 777 /* auto structure can be ripped out here */ 778 return(NULL); 779 } 780 781 static 782 void 783 autoconn_disconnect_signal(dmsg_iocom_t *iocom) 784 { 785 fprintf(stderr, "autoconn: Shutting down socket\n"); 786 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); 787 } 788 789 /* 790 * Retrieve the list of disk attachments and attempt to export 791 * them. 792 */ 793 static 794 void 795 udev_check_disks(void) 796 { 797 char tmpbuf[1024]; 798 char *buf = NULL; 799 char *disk; 800 int error; 801 size_t n; 802 803 for (;;) { 804 n = 0; 805 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0); 806 if (error < 0 || n == 0) 807 break; 808 if (n >= sizeof(tmpbuf)) 809 buf = malloc(n + 1); 810 else 811 buf = tmpbuf; 812 error = sysctlbyname("kern.disks", buf, &n, NULL, 0); 813 if (error == 0) { 814 buf[n] = 0; 815 break; 816 } 817 if (buf != tmpbuf) { 818 free(buf); 819 buf = NULL; 820 } 821 if (errno != ENOMEM) 822 break; 823 } 824 if (buf) { 825 fprintf(stderr, "DISKS: %s\n", buf); 826 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) { 827 disk_reconnect(disk); 828 } 829 if (buf != tmpbuf) 830 free(buf); 831 } 832 } 833 834 /* 835 * Normally the mount program supplies a cluster communications 836 * descriptor to the hammer2 vfs on mount, but if you kill the service 837 * daemon and restart it that link will be lost. 838 * 839 * This procedure attempts to [re]connect to existing mounts when 840 * the service daemon is started up before going into its accept 841 * loop. 842 * 843 * NOTE: A hammer2 mount point can only accomodate one connection at a time 844 * so this will disconnect any existing connection during the 845 * reconnect. 846 */ 847 static 848 void 849 master_reconnect(const char *mntpt) 850 { 851 struct hammer2_ioc_recluster recls; 852 dmsg_master_service_info_t *info; 853 pthread_t thread; 854 int fd; 855 int pipefds[2]; 856 857 fd = open(mntpt, O_RDONLY); 858 if (fd < 0) { 859 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt); 860 return; 861 } 862 if (pipe(pipefds) < 0) { 863 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt); 864 close(fd); 865 return; 866 } 867 bzero(&recls, sizeof(recls)); 868 recls.fd = pipefds[0]; 869 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) { 870 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt); 871 close(pipefds[0]); 872 close(pipefds[1]); 873 close(fd); 874 return; 875 } 876 close(pipefds[0]); 877 close(fd); 878 879 info = malloc(sizeof(*info)); 880 bzero(info, sizeof(*info)); 881 info->fd = pipefds[1]; 882 info->detachme = 1; 883 info->usrmsg_callback = hammer2_usrmsg_handler; 884 info->node_handler = hammer2_node_handler; 885 info->label = strdup("hammer2"); 886 pthread_create(&thread, NULL, dmsg_master_service, info); 887 } 888 889 /* 890 * Reconnect a physical disk service to the mesh. 891 */ 892 static 893 void 894 disk_reconnect(const char *disk) 895 { 896 struct disk_ioc_recluster recls; 897 struct diskcon *dc; 898 dmsg_master_service_info_t *info; 899 pthread_t thread; 900 int fd; 901 int pipefds[2]; 902 char *path; 903 904 /* 905 * Urm, this will auto-create mdX+1, just ignore for now. 906 * This mechanic needs to be fixed. It might actually be nice 907 * to be able to export md disks. 908 */ 909 if (strncmp(disk, "md", 2) == 0) 910 return; 911 if (strncmp(disk, "xa", 2) == 0) 912 return; 913 914 /* 915 * Check if already connected 916 */ 917 pthread_mutex_lock(&diskmtx); 918 TAILQ_FOREACH(dc, &diskconq, entry) { 919 if (strcmp(dc->disk, disk) == 0) 920 break; 921 } 922 pthread_mutex_unlock(&diskmtx); 923 if (dc) 924 return; 925 926 /* 927 * Not already connected, create a connection to the kernel 928 * disk driver. 929 */ 930 asprintf(&path, "/dev/%s", disk); 931 fd = open(path, O_RDONLY); 932 if (fd < 0) { 933 fprintf(stderr, "reconnect %s: no access to disk\n", disk); 934 free(path); 935 return; 936 } 937 free(path); 938 if (pipe(pipefds) < 0) { 939 fprintf(stderr, "reconnect %s: pipe() failed\n", disk); 940 close(fd); 941 return; 942 } 943 bzero(&recls, sizeof(recls)); 944 recls.fd = pipefds[0]; 945 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) { 946 fprintf(stderr, "reconnect %s: ioctl failed\n", disk); 947 close(pipefds[0]); 948 close(pipefds[1]); 949 close(fd); 950 return; 951 } 952 close(pipefds[0]); 953 close(fd); 954 955 dc = malloc(sizeof(*dc)); 956 dc->disk = strdup(disk); 957 pthread_mutex_lock(&diskmtx); 958 TAILQ_INSERT_TAIL(&diskconq, dc, entry); 959 pthread_mutex_unlock(&diskmtx); 960 961 info = malloc(sizeof(*info)); 962 bzero(info, sizeof(*info)); 963 info->fd = pipefds[1]; 964 info->detachme = 1; 965 info->usrmsg_callback = hammer2_usrmsg_handler; 966 info->node_handler = hammer2_node_handler; 967 info->exit_callback = disk_disconnect; 968 info->handle = dc; 969 info->label = strdup(dc->disk); 970 pthread_create(&thread, NULL, dmsg_master_service, info); 971 } 972 973 static 974 void 975 disk_disconnect(void *handle) 976 { 977 struct diskcon *dc = handle; 978 979 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk); 980 981 pthread_mutex_lock(&diskmtx); 982 TAILQ_REMOVE(&diskconq, dc, entry); 983 pthread_mutex_unlock(&diskmtx); 984 free(dc->disk); 985 free(dc); 986 } 987 988 /* 989 * [re]connect a remote disk service to the local system via /dev/xdisk. 990 */ 991 static 992 void 993 xdisk_reconnect(struct service_node_opaque *xdisk) 994 { 995 struct xdisk_attach_ioctl *xaioc; 996 dmsg_master_service_info_t *info; 997 pthread_t thread; 998 int pipefds[2]; 999 1000 if (pipe(pipefds) < 0) { 1001 fprintf(stderr, "reconnect %s: pipe() failed\n", 1002 xdisk->cl_label); 1003 return; 1004 } 1005 1006 info = malloc(sizeof(*info)); 1007 bzero(info, sizeof(*info)); 1008 info->fd = pipefds[1]; 1009 info->detachme = 1; 1010 info->usrmsg_callback = hammer2_usrmsg_handler; 1011 info->node_handler = hammer2_node_handler; 1012 info->exit_callback = xdisk_disconnect; 1013 info->handle = xdisk; 1014 xdisk->servicing = 1; 1015 xdisk->servicefd = info->fd; 1016 info->label = strdup(xdisk->cl_label); 1017 pthread_create(&thread, NULL, dmsg_master_service, info); 1018 1019 /* 1020 * We have to run the attach in its own pthread because it will 1021 * synchronously interact with the messaging subsystem over the 1022 * pipe. If we do it here we will deadlock. 1023 */ 1024 xaioc = malloc(sizeof(*xaioc)); 1025 bzero(xaioc, sizeof(*xaioc)); 1026 snprintf(xaioc->cl_label, sizeof(xaioc->cl_label), 1027 "%s", xdisk->cl_label); 1028 snprintf(xaioc->fs_label, sizeof(xaioc->fs_label), 1029 "X-%s", xdisk->fs_label); 1030 xaioc->bytes = xdisk->block.bytes; 1031 xaioc->blksize = xdisk->block.blksize; 1032 xaioc->fd = pipefds[0]; 1033 1034 pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc); 1035 } 1036 1037 static 1038 void * 1039 xdisk_attach_tmpthread(void *data) 1040 { 1041 struct xdisk_attach_ioctl *xaioc = data; 1042 int fd; 1043 1044 pthread_detach(pthread_self()); 1045 1046 fd = open("/dev/xdisk", O_RDWR, 0600); 1047 if (fd < 0) { 1048 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n"); 1049 } 1050 if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) { 1051 fprintf(stderr, "reconnect %s: xdisk attach failed\n", 1052 xaioc->cl_label); 1053 } 1054 close(xaioc->fd); 1055 close(fd); 1056 return (NULL); 1057 } 1058 1059 static 1060 void 1061 xdisk_disconnect(void *handle) 1062 { 1063 struct service_node_opaque *info = handle; 1064 1065 assert(info->servicing == 1); 1066 1067 pthread_mutex_lock(&diskmtx); 1068 info->servicing = 0; 1069 if (info->attached == 0) 1070 free(info); 1071 pthread_mutex_unlock(&diskmtx); 1072 } 1073