1 /* $NetBSD: clvmd.c,v 1.1.1.2 2009/02/18 11:16:38 haad Exp $ */ 2 3 /* 4 * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved. 5 * Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. 6 * 7 * This file is part of LVM2. 8 * 9 * This copyrighted material is made available to anyone wishing to use, 10 * modify, copy, or redistribute it subject to the terms and conditions 11 * of the GNU General Public License v.2. 12 * 13 * You should have received a copy of the GNU General Public License 14 * along with this program; if not, write to the Free Software Foundation, 15 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 16 */ 17 18 /* 19 * CLVMD: Cluster LVM daemon 20 */ 21 22 #define _GNU_SOURCE 23 #define _FILE_OFFSET_BITS 64 24 25 #include <configure.h> 26 #include <libdevmapper.h> 27 28 #include <pthread.h> 29 #include <sys/types.h> 30 #include <sys/stat.h> 31 #include <sys/socket.h> 32 #include <sys/uio.h> 33 #include <sys/un.h> 34 #include <sys/time.h> 35 #include <sys/ioctl.h> 36 #include <sys/utsname.h> 37 #include <netinet/in.h> 38 #include <stdio.h> 39 #include <stdlib.h> 40 #include <stddef.h> 41 #include <stdarg.h> 42 #include <signal.h> 43 #include <unistd.h> 44 #include <fcntl.h> 45 #include <getopt.h> 46 #include <syslog.h> 47 #include <errno.h> 48 #include <limits.h> 49 #include <libdlm.h> 50 51 #include "clvmd-comms.h" 52 #include "lvm-functions.h" 53 #include "clvm.h" 54 #include "version.h" 55 #include "clvmd.h" 56 #include "refresh_clvmd.h" 57 #include "lvm-logging.h" 58 59 #ifndef TRUE 60 #define TRUE 1 61 #endif 62 #ifndef FALSE 63 #define FALSE 0 64 #endif 65 66 #define MAX_RETRIES 4 67 68 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0) 69 70 /* Head of the fd list. Also contains 71 the cluster_socket details */ 72 static struct local_client local_client_head; 73 74 static unsigned short global_xid = 0; /* Last transaction ID issued */ 75 76 struct cluster_ops *clops = NULL; 77 78 static char our_csid[MAX_CSID_LEN]; 79 static unsigned max_csid_len; 80 static unsigned max_cluster_message; 81 static unsigned max_cluster_member_name_len; 82 83 /* Structure of items on the LVM thread list */ 84 struct lvm_thread_cmd { 85 struct dm_list list; 86 87 struct local_client *client; 88 struct clvm_header *msg; 89 char csid[MAX_CSID_LEN]; 90 int remote; /* Flag */ 91 int msglen; 92 unsigned short xid; 93 }; 94 95 debug_t debug; 96 static pthread_t lvm_thread; 97 static pthread_mutex_t lvm_thread_mutex; 98 static pthread_cond_t lvm_thread_cond; 99 static pthread_mutex_t lvm_start_mutex; 100 static struct dm_list lvm_cmd_head; 101 static volatile sig_atomic_t quit = 0; 102 static volatile sig_atomic_t reread_config = 0; 103 static int child_pipe[2]; 104 105 /* Reasons the daemon failed initialisation */ 106 #define DFAIL_INIT 1 107 #define DFAIL_LOCAL_SOCK 2 108 #define DFAIL_CLUSTER_IF 3 109 #define DFAIL_MALLOC 4 110 #define DFAIL_TIMEOUT 5 111 #define SUCCESS 0 112 113 /* Prototypes for code further down */ 114 static void sigusr2_handler(int sig); 115 static void sighup_handler(int sig); 116 static void sigterm_handler(int sig); 117 static void send_local_reply(struct local_client *client, int status, 118 int clientid); 119 static void free_reply(struct local_client *client); 120 static void send_version_message(void); 121 static void *pre_and_post_thread(void *arg); 122 static int send_message(void *buf, int msglen, const char *csid, int fd, 123 const char *errtext); 124 static int read_from_local_sock(struct local_client *thisfd); 125 static int process_local_command(struct clvm_header *msg, int msglen, 126 struct local_client *client, 127 unsigned short xid); 128 static void process_remote_command(struct clvm_header *msg, int msglen, int fd, 129 const char *csid); 130 static int process_reply(const struct clvm_header *msg, int msglen, 131 const char *csid); 132 static int open_local_sock(void); 133 static int check_local_clvmd(void); 134 static struct local_client *find_client(int clientid); 135 static void main_loop(int local_sock, int cmd_timeout); 136 static void be_daemon(int start_timeout); 137 static int check_all_clvmds_running(struct local_client *client); 138 static int local_rendezvous_callback(struct local_client *thisfd, char *buf, 139 int len, const char *csid, 140 struct local_client **new_client); 141 static void *lvm_thread_fn(void *); 142 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg, 143 int msglen, const char *csid); 144 static int distribute_command(struct local_client *thisfd); 145 static void hton_clvm(struct clvm_header *hdr); 146 static void ntoh_clvm(struct clvm_header *hdr); 147 static void add_reply_to_list(struct local_client *client, int status, 148 const char *csid, const char *buf, int len); 149 150 static void usage(char *prog, FILE *file) 151 { 152 fprintf(file, "Usage:\n"); 153 fprintf(file, "%s [Vhd]\n", prog); 154 fprintf(file, "\n"); 155 fprintf(file, " -V Show version of clvmd\n"); 156 fprintf(file, " -h Show this help information\n"); 157 fprintf(file, " -d Set debug level\n"); 158 fprintf(file, " If starting clvmd then don't fork, run in the foreground\n"); 159 fprintf(file, " -R Tell all running clvmds in the cluster to reload their device cache\n"); 160 fprintf(file, " -C Sets debug level (from -d) on all clvmd instances clusterwide\n"); 161 fprintf(file, " -t<secs> Command timeout (default 60 seconds)\n"); 162 fprintf(file, " -T<secs> Startup timeout (default none)\n"); 163 fprintf(file, "\n"); 164 } 165 166 /* Called to signal the parent how well we got on during initialisation */ 167 static void child_init_signal(int status) 168 { 169 if (child_pipe[1]) { 170 write(child_pipe[1], &status, sizeof(status)); 171 close(child_pipe[1]); 172 } 173 if (status) 174 exit(status); 175 } 176 177 178 void debuglog(const char *fmt, ...) 179 { 180 time_t P; 181 va_list ap; 182 static int syslog_init = 0; 183 184 if (debug == DEBUG_STDERR) { 185 va_start(ap,fmt); 186 time(&P); 187 fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 ); 188 vfprintf(stderr, fmt, ap); 189 va_end(ap); 190 } 191 if (debug == DEBUG_SYSLOG) { 192 if (!syslog_init) { 193 openlog("clvmd", LOG_PID, LOG_DAEMON); 194 syslog_init = 1; 195 } 196 197 va_start(ap,fmt); 198 vsyslog(LOG_DEBUG, fmt, ap); 199 va_end(ap); 200 } 201 } 202 203 static const char *decode_cmd(unsigned char cmdl) 204 { 205 static char buf[128]; 206 const char *command; 207 208 switch (cmdl) { 209 case CLVMD_CMD_TEST: 210 command = "TEST"; 211 break; 212 case CLVMD_CMD_LOCK_VG: 213 command = "LOCK_VG"; 214 break; 215 case CLVMD_CMD_LOCK_LV: 216 command = "LOCK_LV"; 217 break; 218 case CLVMD_CMD_REFRESH: 219 command = "REFRESH"; 220 break; 221 case CLVMD_CMD_SET_DEBUG: 222 command = "SET_DEBUG"; 223 break; 224 case CLVMD_CMD_GET_CLUSTERNAME: 225 command = "GET_CLUSTERNAME"; 226 break; 227 case CLVMD_CMD_VG_BACKUP: 228 command = "VG_BACKUP"; 229 break; 230 case CLVMD_CMD_REPLY: 231 command = "REPLY"; 232 break; 233 case CLVMD_CMD_VERSION: 234 command = "VERSION"; 235 break; 236 case CLVMD_CMD_GOAWAY: 237 command = "GOAWAY"; 238 break; 239 case CLVMD_CMD_LOCK: 240 command = "LOCK"; 241 break; 242 case CLVMD_CMD_UNLOCK: 243 command = "UNLOCK"; 244 break; 245 default: 246 command = "unknown"; 247 break; 248 } 249 250 sprintf(buf, "%s (0x%x)", command, cmdl); 251 252 return buf; 253 } 254 255 int main(int argc, char *argv[]) 256 { 257 int local_sock; 258 struct local_client *newfd; 259 struct utsname nodeinfo; 260 signed char opt; 261 int cmd_timeout = DEFAULT_CMD_TIMEOUT; 262 int start_timeout = 0; 263 sigset_t ss; 264 int using_gulm = 0; 265 int debug_opt = 0; 266 int clusterwide_opt = 0; 267 268 /* Deal with command-line arguments */ 269 opterr = 0; 270 optind = 0; 271 while ((opt = getopt(argc, argv, "?vVhd::t:RT:C")) != EOF) { 272 switch (opt) { 273 case 'h': 274 usage(argv[0], stdout); 275 exit(0); 276 277 case '?': 278 usage(argv[0], stderr); 279 exit(0); 280 281 case 'R': 282 return refresh_clvmd(); 283 284 case 'C': 285 clusterwide_opt = 1; 286 break; 287 288 case 'd': 289 debug_opt = 1; 290 if (optarg) 291 debug = atoi(optarg); 292 else 293 debug = DEBUG_STDERR; 294 break; 295 296 case 't': 297 cmd_timeout = atoi(optarg); 298 if (!cmd_timeout) { 299 fprintf(stderr, "command timeout is invalid\n"); 300 usage(argv[0], stderr); 301 exit(1); 302 } 303 break; 304 case 'T': 305 start_timeout = atoi(optarg); 306 if (start_timeout <= 0) { 307 fprintf(stderr, "startup timeout is invalid\n"); 308 usage(argv[0], stderr); 309 exit(1); 310 } 311 break; 312 313 case 'V': 314 printf("Cluster LVM daemon version: %s\n", LVM_VERSION); 315 printf("Protocol version: %d.%d.%d\n", 316 CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION, 317 CLVMD_PATCH_VERSION); 318 exit(1); 319 break; 320 321 } 322 } 323 324 /* Setting debug options on an existing clvmd */ 325 if (debug_opt && !check_local_clvmd()) { 326 327 /* Sending to stderr makes no sense for a detached daemon */ 328 if (debug == DEBUG_STDERR) 329 debug = DEBUG_SYSLOG; 330 return debug_clvmd(debug, clusterwide_opt); 331 } 332 333 /* Fork into the background (unless requested not to) */ 334 if (debug != DEBUG_STDERR) { 335 be_daemon(start_timeout); 336 } 337 338 DEBUGLOG("CLVMD started\n"); 339 340 /* Open the Unix socket we listen for commands on. 341 We do this before opening the cluster socket so that 342 potential clients will block rather than error if we are running 343 but the cluster is not ready yet */ 344 local_sock = open_local_sock(); 345 if (local_sock < 0) 346 child_init_signal(DFAIL_LOCAL_SOCK); 347 348 /* Set up signal handlers, USR1 is for cluster change notifications (in cman) 349 USR2 causes child threads to exit. 350 HUP causes gulm version to re-read nodes list from CCS. 351 PIPE should be ignored */ 352 signal(SIGUSR2, sigusr2_handler); 353 signal(SIGHUP, sighup_handler); 354 signal(SIGPIPE, SIG_IGN); 355 356 /* Block SIGUSR2 in the main process */ 357 sigemptyset(&ss); 358 sigaddset(&ss, SIGUSR2); 359 sigprocmask(SIG_BLOCK, &ss, NULL); 360 361 /* Initialise the LVM thread variables */ 362 dm_list_init(&lvm_cmd_head); 363 pthread_mutex_init(&lvm_thread_mutex, NULL); 364 pthread_cond_init(&lvm_thread_cond, NULL); 365 pthread_mutex_init(&lvm_start_mutex, NULL); 366 init_lvhash(); 367 368 /* Start the cluster interface */ 369 #ifdef USE_CMAN 370 if ((clops = init_cman_cluster())) { 371 max_csid_len = CMAN_MAX_CSID_LEN; 372 max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE; 373 max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN; 374 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN"); 375 } 376 #endif 377 #ifdef USE_GULM 378 if (!clops) 379 if ((clops = init_gulm_cluster())) { 380 max_csid_len = GULM_MAX_CSID_LEN; 381 max_cluster_message = GULM_MAX_CLUSTER_MESSAGE; 382 max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN; 383 using_gulm = 1; 384 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM"); 385 } 386 #endif 387 #ifdef USE_OPENAIS 388 if (!clops) 389 if ((clops = init_openais_cluster())) { 390 max_csid_len = OPENAIS_CSID_LEN; 391 max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE; 392 max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN; 393 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS"); 394 } 395 #endif 396 #ifdef USE_COROSYNC 397 if (!clops) 398 if ((clops = init_corosync_cluster())) { 399 max_csid_len = COROSYNC_CSID_LEN; 400 max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE; 401 max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN; 402 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync"); 403 } 404 #endif 405 406 if (!clops) { 407 DEBUGLOG("Can't initialise cluster interface\n"); 408 log_error("Can't initialise cluster interface\n"); 409 child_init_signal(DFAIL_CLUSTER_IF); 410 } 411 DEBUGLOG("Cluster ready, doing some more initialisation\n"); 412 413 /* Save our CSID */ 414 uname(&nodeinfo); 415 clops->get_our_csid(our_csid); 416 417 /* Initialise the FD list head */ 418 local_client_head.fd = clops->get_main_cluster_fd(); 419 local_client_head.type = CLUSTER_MAIN_SOCK; 420 local_client_head.callback = clops->cluster_fd_callback; 421 422 /* Add the local socket to the list */ 423 newfd = malloc(sizeof(struct local_client)); 424 if (!newfd) 425 child_init_signal(DFAIL_MALLOC); 426 427 newfd->fd = local_sock; 428 newfd->removeme = 0; 429 newfd->type = LOCAL_RENDEZVOUS; 430 newfd->callback = local_rendezvous_callback; 431 newfd->next = local_client_head.next; 432 local_client_head.next = newfd; 433 434 /* This needs to be started after cluster initialisation 435 as it may need to take out locks */ 436 DEBUGLOG("starting LVM thread\n"); 437 438 /* Don't let anyone else to do work until we are started */ 439 pthread_mutex_lock(&lvm_start_mutex); 440 pthread_create(&lvm_thread, NULL, lvm_thread_fn, 441 (void *)(long)using_gulm); 442 443 /* Tell the rest of the cluster our version number */ 444 /* CMAN can do this immediately, gulm needs to wait until 445 the core initialisation has finished and the node list 446 has been gathered */ 447 if (clops->cluster_init_completed) 448 clops->cluster_init_completed(); 449 450 DEBUGLOG("clvmd ready for work\n"); 451 child_init_signal(SUCCESS); 452 453 /* Try to shutdown neatly */ 454 signal(SIGTERM, sigterm_handler); 455 signal(SIGINT, sigterm_handler); 456 457 /* Do some work */ 458 main_loop(local_sock, cmd_timeout); 459 460 return 0; 461 } 462 463 /* Called when the GuLM cluster layer has completed initialisation. 464 We send the version message */ 465 void clvmd_cluster_init_completed() 466 { 467 send_version_message(); 468 } 469 470 /* Data on a connected socket */ 471 static int local_sock_callback(struct local_client *thisfd, char *buf, int len, 472 const char *csid, 473 struct local_client **new_client) 474 { 475 *new_client = NULL; 476 return read_from_local_sock(thisfd); 477 } 478 479 /* Data on a connected socket */ 480 static int local_rendezvous_callback(struct local_client *thisfd, char *buf, 481 int len, const char *csid, 482 struct local_client **new_client) 483 { 484 /* Someone connected to our local socket, accept it. */ 485 486 struct sockaddr_un socka; 487 struct local_client *newfd; 488 socklen_t sl = sizeof(socka); 489 int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl); 490 491 if (client_fd == -1 && errno == EINTR) 492 return 1; 493 494 if (client_fd >= 0) { 495 newfd = malloc(sizeof(struct local_client)); 496 if (!newfd) { 497 close(client_fd); 498 return 1; 499 } 500 newfd->fd = client_fd; 501 newfd->type = LOCAL_SOCK; 502 newfd->xid = 0; 503 newfd->removeme = 0; 504 newfd->callback = local_sock_callback; 505 newfd->bits.localsock.replies = NULL; 506 newfd->bits.localsock.expected_replies = 0; 507 newfd->bits.localsock.cmd = NULL; 508 newfd->bits.localsock.in_progress = FALSE; 509 newfd->bits.localsock.sent_out = FALSE; 510 newfd->bits.localsock.threadid = 0; 511 newfd->bits.localsock.finished = 0; 512 newfd->bits.localsock.pipe_client = NULL; 513 newfd->bits.localsock.private = NULL; 514 newfd->bits.localsock.all_success = 1; 515 DEBUGLOG("Got new connection on fd %d\n", newfd->fd); 516 *new_client = newfd; 517 } 518 return 1; 519 } 520 521 static int local_pipe_callback(struct local_client *thisfd, char *buf, 522 int maxlen, const char *csid, 523 struct local_client **new_client) 524 { 525 int len; 526 char buffer[PIPE_BUF]; 527 struct local_client *sock_client = thisfd->bits.pipe.client; 528 int status = -1; /* in error by default */ 529 530 len = read(thisfd->fd, buffer, sizeof(int)); 531 if (len == -1 && errno == EINTR) 532 return 1; 533 534 if (len == sizeof(int)) { 535 memcpy(&status, buffer, sizeof(int)); 536 } 537 538 DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n", 539 thisfd->fd, len, status); 540 541 /* EOF on pipe or an error, close it */ 542 if (len <= 0) { 543 int jstat; 544 void *ret = &status; 545 close(thisfd->fd); 546 547 /* Clear out the cross-link */ 548 if (thisfd->bits.pipe.client != NULL) 549 thisfd->bits.pipe.client->bits.localsock.pipe_client = 550 NULL; 551 552 /* Reap child thread */ 553 if (thisfd->bits.pipe.threadid) { 554 jstat = pthread_join(thisfd->bits.pipe.threadid, &ret); 555 thisfd->bits.pipe.threadid = 0; 556 if (thisfd->bits.pipe.client != NULL) 557 thisfd->bits.pipe.client->bits.localsock. 558 threadid = 0; 559 } 560 return -1; 561 } else { 562 DEBUGLOG("background routine status was %d, sock_client=%p\n", 563 status, sock_client); 564 /* But has the client gone away ?? */ 565 if (sock_client == NULL) { 566 DEBUGLOG 567 ("Got PIPE response for dead client, ignoring it\n"); 568 } else { 569 /* If error then just return that code */ 570 if (status) 571 send_local_reply(sock_client, status, 572 sock_client->fd); 573 else { 574 if (sock_client->bits.localsock.state == 575 POST_COMMAND) { 576 send_local_reply(sock_client, 0, 577 sock_client->fd); 578 } else // PRE_COMMAND finished. 579 { 580 if ( 581 (status = 582 distribute_command(sock_client)) != 583 0) send_local_reply(sock_client, 584 EFBIG, 585 sock_client-> 586 fd); 587 } 588 } 589 } 590 } 591 return len; 592 } 593 594 /* If a noed is up, look for it in the reply array, if it's not there then 595 add one with "ETIMEDOUT". 596 NOTE: This won't race with real replies because they happen in the same thread. 597 */ 598 static void timedout_callback(struct local_client *client, const char *csid, 599 int node_up) 600 { 601 if (node_up) { 602 struct node_reply *reply; 603 char nodename[max_cluster_member_name_len]; 604 605 clops->name_from_csid(csid, nodename); 606 DEBUGLOG("Checking for a reply from %s\n", nodename); 607 pthread_mutex_lock(&client->bits.localsock.reply_mutex); 608 609 reply = client->bits.localsock.replies; 610 while (reply && strcmp(reply->node, nodename) != 0) { 611 reply = reply->next; 612 } 613 614 pthread_mutex_unlock(&client->bits.localsock.reply_mutex); 615 616 if (!reply) { 617 DEBUGLOG("Node %s timed-out\n", nodename); 618 add_reply_to_list(client, ETIMEDOUT, csid, 619 "Command timed out", 18); 620 } 621 } 622 } 623 624 /* Called when the request has timed out on at least one node. We fill in 625 the remaining node entries with ETIMEDOUT and return. 626 627 By the time we get here the node that caused 628 the timeout could have gone down, in which case we will never get the expected 629 number of replies that triggers the post command so we need to do it here 630 */ 631 static void request_timed_out(struct local_client *client) 632 { 633 DEBUGLOG("Request timed-out. padding\n"); 634 clops->cluster_do_node_callback(client, timedout_callback); 635 636 if (client->bits.localsock.num_replies != 637 client->bits.localsock.expected_replies) { 638 /* Post-process the command */ 639 if (client->bits.localsock.threadid) { 640 pthread_mutex_lock(&client->bits.localsock.mutex); 641 client->bits.localsock.state = POST_COMMAND; 642 pthread_cond_signal(&client->bits.localsock.cond); 643 pthread_mutex_unlock(&client->bits.localsock.mutex); 644 } 645 } 646 } 647 648 /* This is where the real work happens */ 649 static void main_loop(int local_sock, int cmd_timeout) 650 { 651 DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout); 652 653 /* Main loop */ 654 while (!quit) { 655 fd_set in; 656 int select_status; 657 struct local_client *thisfd; 658 struct timeval tv = { cmd_timeout, 0 }; 659 int quorate = clops->is_quorate(); 660 661 /* Wait on the cluster FD and all local sockets/pipes */ 662 local_client_head.fd = clops->get_main_cluster_fd(); 663 FD_ZERO(&in); 664 for (thisfd = &local_client_head; thisfd != NULL; 665 thisfd = thisfd->next) { 666 667 if (thisfd->removeme) 668 continue; 669 670 /* if the cluster is not quorate then don't listen for new requests */ 671 if ((thisfd->type != LOCAL_RENDEZVOUS && 672 thisfd->type != LOCAL_SOCK) || quorate) 673 FD_SET(thisfd->fd, &in); 674 } 675 676 select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv); 677 678 if (reread_config) { 679 int saved_errno = errno; 680 681 reread_config = 0; 682 if (clops->reread_config) 683 clops->reread_config(); 684 errno = saved_errno; 685 } 686 687 if (select_status > 0) { 688 struct local_client *lastfd = NULL; 689 char csid[MAX_CSID_LEN]; 690 char buf[max_cluster_message]; 691 692 for (thisfd = &local_client_head; thisfd != NULL; 693 thisfd = thisfd->next) { 694 695 if (thisfd->removeme) { 696 struct local_client *free_fd; 697 lastfd->next = thisfd->next; 698 free_fd = thisfd; 699 thisfd = lastfd; 700 701 DEBUGLOG("removeme set for fd %d\n", free_fd->fd); 702 703 /* Queue cleanup, this also frees the client struct */ 704 add_to_lvmqueue(free_fd, NULL, 0, NULL); 705 break; 706 } 707 708 if (FD_ISSET(thisfd->fd, &in)) { 709 struct local_client *newfd = NULL; 710 int ret; 711 712 /* Do callback */ 713 ret = 714 thisfd->callback(thisfd, buf, 715 sizeof(buf), csid, 716 &newfd); 717 /* Ignore EAGAIN */ 718 if (ret < 0 && (errno == EAGAIN || 719 errno == EINTR)) continue; 720 721 /* Got error or EOF: Remove it from the list safely */ 722 if (ret <= 0) { 723 struct local_client *free_fd; 724 int type = thisfd->type; 725 726 /* If the cluster socket shuts down, so do we */ 727 if (type == CLUSTER_MAIN_SOCK || 728 type == CLUSTER_INTERNAL) 729 goto closedown; 730 731 DEBUGLOG("ret == %d, errno = %d. removing client\n", 732 ret, errno); 733 lastfd->next = thisfd->next; 734 free_fd = thisfd; 735 thisfd = lastfd; 736 close(free_fd->fd); 737 738 /* Queue cleanup, this also frees the client struct */ 739 add_to_lvmqueue(free_fd, NULL, 0, NULL); 740 break; 741 } 742 743 /* New client...simply add it to the list */ 744 if (newfd) { 745 newfd->next = thisfd->next; 746 thisfd->next = newfd; 747 break; 748 } 749 } 750 lastfd = thisfd; 751 } 752 } 753 754 /* Select timed out. Check for clients that have been waiting too long for a response */ 755 if (select_status == 0) { 756 time_t the_time = time(NULL); 757 758 for (thisfd = &local_client_head; thisfd != NULL; 759 thisfd = thisfd->next) { 760 if (thisfd->type == LOCAL_SOCK 761 && thisfd->bits.localsock.sent_out 762 && thisfd->bits.localsock.sent_time + 763 cmd_timeout < the_time 764 && thisfd->bits.localsock. 765 expected_replies != 766 thisfd->bits.localsock.num_replies) { 767 /* Send timed out message + replies we already have */ 768 DEBUGLOG 769 ("Request timed-out (send: %ld, now: %ld)\n", 770 thisfd->bits.localsock.sent_time, 771 the_time); 772 773 thisfd->bits.localsock.all_success = 0; 774 775 request_timed_out(thisfd); 776 } 777 } 778 } 779 if (select_status < 0) { 780 if (errno == EINTR) 781 continue; 782 783 #ifdef DEBUG 784 perror("select error"); 785 exit(-1); 786 #endif 787 } 788 } 789 790 closedown: 791 clops->cluster_closedown(); 792 close(local_sock); 793 } 794 795 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout) 796 { 797 int child_status; 798 int sstat; 799 fd_set fds; 800 struct timeval tv = {timeout, 0}; 801 802 FD_ZERO(&fds); 803 FD_SET(c_pipe, &fds); 804 805 sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL); 806 if (sstat == 0) { 807 fprintf(stderr, "clvmd startup timed out\n"); 808 exit(DFAIL_TIMEOUT); 809 } 810 if (sstat == 1) { 811 if (read(c_pipe, &child_status, sizeof(child_status)) != 812 sizeof(child_status)) { 813 814 fprintf(stderr, "clvmd failed in initialisation\n"); 815 exit(DFAIL_INIT); 816 } 817 else { 818 switch (child_status) { 819 case SUCCESS: 820 break; 821 case DFAIL_INIT: 822 fprintf(stderr, "clvmd failed in initialisation\n"); 823 break; 824 case DFAIL_LOCAL_SOCK: 825 fprintf(stderr, "clvmd could not create local socket\n"); 826 fprintf(stderr, "Another clvmd is probably already running\n"); 827 break; 828 case DFAIL_CLUSTER_IF: 829 fprintf(stderr, "clvmd could not connect to cluster manager\n"); 830 fprintf(stderr, "Consult syslog for more information\n"); 831 break; 832 case DFAIL_MALLOC: 833 fprintf(stderr, "clvmd failed, not enough memory\n"); 834 break; 835 default: 836 fprintf(stderr, "clvmd failed, error was %d\n", child_status); 837 break; 838 } 839 exit(child_status); 840 } 841 } 842 fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno)); 843 exit(DFAIL_INIT); 844 } 845 846 /* 847 * Fork into the background and detach from our parent process. 848 * In the interests of user-friendliness we wait for the daemon 849 * to complete initialisation before returning its status 850 * the the user. 851 */ 852 static void be_daemon(int timeout) 853 { 854 pid_t pid; 855 int devnull = open("/dev/null", O_RDWR); 856 if (devnull == -1) { 857 perror("Can't open /dev/null"); 858 exit(3); 859 } 860 861 pipe(child_pipe); 862 863 switch (pid = fork()) { 864 case -1: 865 perror("clvmd: can't fork"); 866 exit(2); 867 868 case 0: /* Child */ 869 close(child_pipe[0]); 870 break; 871 872 default: /* Parent */ 873 close(child_pipe[1]); 874 wait_for_child(child_pipe[0], timeout); 875 } 876 877 /* Detach ourself from the calling environment */ 878 if (close(0) || close(1) || close(2)) { 879 perror("Error closing terminal FDs"); 880 exit(4); 881 } 882 setsid(); 883 884 if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0 885 || dup2(devnull, 2) < 0) { 886 perror("Error setting terminal FDs to /dev/null"); 887 log_error("Error setting terminal FDs to /dev/null: %m"); 888 exit(5); 889 } 890 if (chdir("/")) { 891 log_error("Error setting current directory to /: %m"); 892 exit(6); 893 } 894 895 } 896 897 /* Called when we have a read from the local socket. 898 was in the main loop but it's grown up and is a big girl now */ 899 static int read_from_local_sock(struct local_client *thisfd) 900 { 901 int len; 902 int argslen; 903 int missing_len; 904 char buffer[PIPE_BUF]; 905 906 len = read(thisfd->fd, buffer, sizeof(buffer)); 907 if (len == -1 && errno == EINTR) 908 return 1; 909 910 DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len); 911 912 /* EOF or error on socket */ 913 if (len <= 0) { 914 int *status; 915 int jstat; 916 917 DEBUGLOG("EOF on local socket: inprogress=%d\n", 918 thisfd->bits.localsock.in_progress); 919 920 thisfd->bits.localsock.finished = 1; 921 922 /* If the client went away in mid command then tidy up */ 923 if (thisfd->bits.localsock.in_progress) { 924 pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2); 925 pthread_mutex_lock(&thisfd->bits.localsock.mutex); 926 thisfd->bits.localsock.state = POST_COMMAND; 927 pthread_cond_signal(&thisfd->bits.localsock.cond); 928 pthread_mutex_unlock(&thisfd->bits.localsock.mutex); 929 930 /* Free any unsent buffers */ 931 free_reply(thisfd); 932 } 933 934 /* Kill the subthread & free resources */ 935 if (thisfd->bits.localsock.threadid) { 936 DEBUGLOG("Waiting for child thread\n"); 937 pthread_mutex_lock(&thisfd->bits.localsock.mutex); 938 thisfd->bits.localsock.state = PRE_COMMAND; 939 pthread_cond_signal(&thisfd->bits.localsock.cond); 940 pthread_mutex_unlock(&thisfd->bits.localsock.mutex); 941 942 jstat = 943 pthread_join(thisfd->bits.localsock.threadid, 944 (void **) &status); 945 DEBUGLOG("Joined child thread\n"); 946 947 thisfd->bits.localsock.threadid = 0; 948 pthread_cond_destroy(&thisfd->bits.localsock.cond); 949 pthread_mutex_destroy(&thisfd->bits.localsock.mutex); 950 951 /* Remove the pipe client */ 952 if (thisfd->bits.localsock.pipe_client != NULL) { 953 struct local_client *newfd; 954 struct local_client *lastfd = NULL; 955 struct local_client *free_fd = NULL; 956 957 close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */ 958 close(thisfd->bits.localsock.pipe); 959 960 /* Remove pipe client */ 961 for (newfd = &local_client_head; newfd != NULL; 962 newfd = newfd->next) { 963 if (thisfd->bits.localsock. 964 pipe_client == newfd) { 965 thisfd->bits.localsock. 966 pipe_client = NULL; 967 968 lastfd->next = newfd->next; 969 free_fd = newfd; 970 newfd->next = lastfd; 971 free(free_fd); 972 break; 973 } 974 lastfd = newfd; 975 } 976 } 977 } 978 979 /* Free the command buffer */ 980 free(thisfd->bits.localsock.cmd); 981 982 /* Clear out the cross-link */ 983 if (thisfd->bits.localsock.pipe_client != NULL) 984 thisfd->bits.localsock.pipe_client->bits.pipe.client = 985 NULL; 986 987 close(thisfd->fd); 988 return 0; 989 } else { 990 int comms_pipe[2]; 991 struct local_client *newfd; 992 char csid[MAX_CSID_LEN]; 993 struct clvm_header *inheader; 994 int status; 995 996 inheader = (struct clvm_header *) buffer; 997 998 /* Fill in the client ID */ 999 inheader->clientid = htonl(thisfd->fd); 1000 1001 /* If we are already busy then return an error */ 1002 if (thisfd->bits.localsock.in_progress) { 1003 struct clvm_header reply; 1004 reply.cmd = CLVMD_CMD_REPLY; 1005 reply.status = EBUSY; 1006 reply.arglen = 0; 1007 reply.flags = 0; 1008 send_message(&reply, sizeof(reply), our_csid, 1009 thisfd->fd, 1010 "Error sending EBUSY reply to local user"); 1011 return len; 1012 } 1013 1014 /* Free any old buffer space */ 1015 free(thisfd->bits.localsock.cmd); 1016 1017 /* See if we have the whole message */ 1018 argslen = 1019 len - strlen(inheader->node) - sizeof(struct clvm_header); 1020 missing_len = inheader->arglen - argslen; 1021 1022 if (missing_len < 0) 1023 missing_len = 0; 1024 1025 /* Save the message */ 1026 thisfd->bits.localsock.cmd = malloc(len + missing_len); 1027 1028 if (!thisfd->bits.localsock.cmd) { 1029 struct clvm_header reply; 1030 reply.cmd = CLVMD_CMD_REPLY; 1031 reply.status = ENOMEM; 1032 reply.arglen = 0; 1033 reply.flags = 0; 1034 send_message(&reply, sizeof(reply), our_csid, 1035 thisfd->fd, 1036 "Error sending ENOMEM reply to local user"); 1037 return 0; 1038 } 1039 memcpy(thisfd->bits.localsock.cmd, buffer, len); 1040 thisfd->bits.localsock.cmd_len = len + missing_len; 1041 inheader = (struct clvm_header *) thisfd->bits.localsock.cmd; 1042 1043 /* If we don't have the full message then read the rest now */ 1044 if (missing_len) { 1045 char *argptr = 1046 inheader->node + strlen(inheader->node) + 1; 1047 1048 while (missing_len > 0 && len >= 0) { 1049 DEBUGLOG 1050 ("got %d bytes, need another %d (total %d)\n", 1051 argslen, missing_len, inheader->arglen); 1052 len = read(thisfd->fd, argptr + argslen, 1053 missing_len); 1054 if (len >= 0) { 1055 missing_len -= len; 1056 argslen += len; 1057 } 1058 } 1059 } 1060 1061 /* Initialise and lock the mutex so the subthread will wait after 1062 finishing the PRE routine */ 1063 if (!thisfd->bits.localsock.threadid) { 1064 pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL); 1065 pthread_cond_init(&thisfd->bits.localsock.cond, NULL); 1066 pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL); 1067 } 1068 1069 /* Only run the command if all the cluster nodes are running CLVMD */ 1070 if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) && 1071 (check_all_clvmds_running(thisfd) == -1)) { 1072 thisfd->bits.localsock.expected_replies = 0; 1073 thisfd->bits.localsock.num_replies = 0; 1074 send_local_reply(thisfd, EHOSTDOWN, thisfd->fd); 1075 return len; 1076 } 1077 1078 /* Check the node name for validity */ 1079 if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) { 1080 /* Error, node is not in the cluster */ 1081 struct clvm_header reply; 1082 DEBUGLOG("Unknown node: '%s'\n", inheader->node); 1083 1084 reply.cmd = CLVMD_CMD_REPLY; 1085 reply.status = ENOENT; 1086 reply.flags = 0; 1087 reply.arglen = 0; 1088 send_message(&reply, sizeof(reply), our_csid, 1089 thisfd->fd, 1090 "Error sending ENOENT reply to local user"); 1091 thisfd->bits.localsock.expected_replies = 0; 1092 thisfd->bits.localsock.num_replies = 0; 1093 thisfd->bits.localsock.in_progress = FALSE; 1094 thisfd->bits.localsock.sent_out = FALSE; 1095 return len; 1096 } 1097 1098 /* If we already have a subthread then just signal it to start */ 1099 if (thisfd->bits.localsock.threadid) { 1100 pthread_mutex_lock(&thisfd->bits.localsock.mutex); 1101 thisfd->bits.localsock.state = PRE_COMMAND; 1102 pthread_cond_signal(&thisfd->bits.localsock.cond); 1103 pthread_mutex_unlock(&thisfd->bits.localsock.mutex); 1104 return len; 1105 } 1106 1107 /* Create a pipe and add the reading end to our FD list */ 1108 pipe(comms_pipe); 1109 newfd = malloc(sizeof(struct local_client)); 1110 if (!newfd) { 1111 struct clvm_header reply; 1112 close(comms_pipe[0]); 1113 close(comms_pipe[1]); 1114 1115 reply.cmd = CLVMD_CMD_REPLY; 1116 reply.status = ENOMEM; 1117 reply.arglen = 0; 1118 reply.flags = 0; 1119 send_message(&reply, sizeof(reply), our_csid, 1120 thisfd->fd, 1121 "Error sending ENOMEM reply to local user"); 1122 return len; 1123 } 1124 DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0], 1125 comms_pipe[1]); 1126 newfd->fd = comms_pipe[0]; 1127 newfd->removeme = 0; 1128 newfd->type = THREAD_PIPE; 1129 newfd->callback = local_pipe_callback; 1130 newfd->next = thisfd->next; 1131 newfd->bits.pipe.client = thisfd; 1132 newfd->bits.pipe.threadid = 0; 1133 thisfd->next = newfd; 1134 1135 /* Store a cross link to the pipe */ 1136 thisfd->bits.localsock.pipe_client = newfd; 1137 1138 thisfd->bits.localsock.pipe = comms_pipe[1]; 1139 1140 /* Make sure the thread has a copy of it's own ID */ 1141 newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid; 1142 1143 /* Run the pre routine */ 1144 thisfd->bits.localsock.in_progress = TRUE; 1145 thisfd->bits.localsock.state = PRE_COMMAND; 1146 DEBUGLOG("Creating pre&post thread\n"); 1147 status = pthread_create(&thisfd->bits.localsock.threadid, NULL, 1148 pre_and_post_thread, thisfd); 1149 DEBUGLOG("Created pre&post thread, state = %d\n", status); 1150 } 1151 return len; 1152 } 1153 1154 /* Add a file descriptor from the cluster or comms interface to 1155 our list of FDs for select 1156 */ 1157 int add_client(struct local_client *new_client) 1158 { 1159 new_client->next = local_client_head.next; 1160 local_client_head.next = new_client; 1161 1162 return 0; 1163 } 1164 1165 /* Called when the pre-command has completed successfully - we 1166 now execute the real command on all the requested nodes */ 1167 static int distribute_command(struct local_client *thisfd) 1168 { 1169 struct clvm_header *inheader = 1170 (struct clvm_header *) thisfd->bits.localsock.cmd; 1171 int len = thisfd->bits.localsock.cmd_len; 1172 1173 thisfd->xid = global_xid++; 1174 DEBUGLOG("distribute command: XID = %d\n", thisfd->xid); 1175 1176 /* Forward it to other nodes in the cluster if needed */ 1177 if (!(inheader->flags & CLVMD_FLAG_LOCAL)) { 1178 /* if node is empty then do it on the whole cluster */ 1179 if (inheader->node[0] == '\0') { 1180 thisfd->bits.localsock.expected_replies = 1181 clops->get_num_nodes(); 1182 thisfd->bits.localsock.num_replies = 0; 1183 thisfd->bits.localsock.sent_time = time(NULL); 1184 thisfd->bits.localsock.in_progress = TRUE; 1185 thisfd->bits.localsock.sent_out = TRUE; 1186 1187 /* Do it here first */ 1188 add_to_lvmqueue(thisfd, inheader, len, NULL); 1189 1190 DEBUGLOG("Sending message to all cluster nodes\n"); 1191 inheader->xid = thisfd->xid; 1192 send_message(inheader, len, NULL, -1, 1193 "Error forwarding message to cluster"); 1194 } else { 1195 /* Do it on a single node */ 1196 char csid[MAX_CSID_LEN]; 1197 1198 if (clops->csid_from_name(csid, inheader->node)) { 1199 /* This has already been checked so should not happen */ 1200 return 0; 1201 } else { 1202 /* OK, found a node... */ 1203 thisfd->bits.localsock.expected_replies = 1; 1204 thisfd->bits.localsock.num_replies = 0; 1205 thisfd->bits.localsock.in_progress = TRUE; 1206 1207 /* Are we the requested node ?? */ 1208 if (memcmp(csid, our_csid, max_csid_len) == 0) { 1209 DEBUGLOG("Doing command on local node only\n"); 1210 add_to_lvmqueue(thisfd, inheader, len, NULL); 1211 } else { 1212 DEBUGLOG("Sending message to single node: %s\n", 1213 inheader->node); 1214 inheader->xid = thisfd->xid; 1215 send_message(inheader, len, 1216 csid, -1, 1217 "Error forwarding message to cluster node"); 1218 } 1219 } 1220 } 1221 } else { 1222 /* Local explicitly requested, ignore nodes */ 1223 thisfd->bits.localsock.in_progress = TRUE; 1224 thisfd->bits.localsock.expected_replies = 1; 1225 thisfd->bits.localsock.num_replies = 0; 1226 add_to_lvmqueue(thisfd, inheader, len, NULL); 1227 } 1228 return 0; 1229 } 1230 1231 /* Process a command from a remote node and return the result */ 1232 static void process_remote_command(struct clvm_header *msg, int msglen, int fd, 1233 const char *csid) 1234 { 1235 char *replyargs; 1236 char nodename[max_cluster_member_name_len]; 1237 int replylen = 0; 1238 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1; 1239 int status; 1240 int msg_malloced = 0; 1241 1242 /* Get the node name as we /may/ need it later */ 1243 clops->name_from_csid(csid, nodename); 1244 1245 DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n", 1246 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename); 1247 1248 /* Check for GOAWAY and sulk */ 1249 if (msg->cmd == CLVMD_CMD_GOAWAY) { 1250 1251 DEBUGLOG("Told to go away by %s\n", nodename); 1252 log_error("Told to go away by %s\n", nodename); 1253 exit(99); 1254 } 1255 1256 /* Version check is internal - don't bother exposing it in 1257 clvmd-command.c */ 1258 if (msg->cmd == CLVMD_CMD_VERSION) { 1259 int version_nums[3]; 1260 char node[256]; 1261 1262 memcpy(version_nums, msg->args, sizeof(version_nums)); 1263 1264 clops->name_from_csid(csid, node); 1265 DEBUGLOG("Remote node %s is version %d.%d.%d\n", 1266 node, 1267 ntohl(version_nums[0]), 1268 ntohl(version_nums[1]), ntohl(version_nums[2])); 1269 1270 if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) { 1271 struct clvm_header byebyemsg; 1272 DEBUGLOG 1273 ("Telling node %s to go away because of incompatible version number\n", 1274 node); 1275 log_notice 1276 ("Telling node %s to go away because of incompatible version number %d.%d.%d\n", 1277 node, ntohl(version_nums[0]), 1278 ntohl(version_nums[1]), ntohl(version_nums[2])); 1279 1280 byebyemsg.cmd = CLVMD_CMD_GOAWAY; 1281 byebyemsg.status = 0; 1282 byebyemsg.flags = 0; 1283 byebyemsg.arglen = 0; 1284 byebyemsg.clientid = 0; 1285 clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg), 1286 our_csid, 1287 "Error Sending GOAWAY message"); 1288 } else { 1289 clops->add_up_node(csid); 1290 } 1291 return; 1292 } 1293 1294 /* Allocate a default reply buffer */ 1295 replyargs = malloc(max_cluster_message - sizeof(struct clvm_header)); 1296 1297 if (replyargs != NULL) { 1298 /* Run the command */ 1299 status = 1300 do_command(NULL, msg, msglen, &replyargs, buflen, 1301 &replylen); 1302 } else { 1303 status = ENOMEM; 1304 } 1305 1306 /* If it wasn't a reply, then reply */ 1307 if (msg->cmd != CLVMD_CMD_REPLY) { 1308 char *aggreply; 1309 1310 aggreply = 1311 realloc(replyargs, replylen + sizeof(struct clvm_header)); 1312 if (aggreply) { 1313 struct clvm_header *agghead = 1314 (struct clvm_header *) aggreply; 1315 1316 replyargs = aggreply; 1317 /* Move it up so there's room for a header in front of the data */ 1318 memmove(aggreply + offsetof(struct clvm_header, args), 1319 replyargs, replylen); 1320 1321 agghead->xid = msg->xid; 1322 agghead->cmd = CLVMD_CMD_REPLY; 1323 agghead->status = status; 1324 agghead->flags = 0; 1325 agghead->clientid = msg->clientid; 1326 agghead->arglen = replylen; 1327 agghead->node[0] = '\0'; 1328 send_message(aggreply, 1329 sizeof(struct clvm_header) + 1330 replylen, csid, fd, 1331 "Error sending command reply"); 1332 } else { 1333 struct clvm_header head; 1334 1335 DEBUGLOG("Error attempting to realloc return buffer\n"); 1336 /* Return a failure response */ 1337 head.cmd = CLVMD_CMD_REPLY; 1338 head.status = ENOMEM; 1339 head.flags = 0; 1340 head.clientid = msg->clientid; 1341 head.arglen = 0; 1342 head.node[0] = '\0'; 1343 send_message(&head, sizeof(struct clvm_header), csid, 1344 fd, "Error sending ENOMEM command reply"); 1345 return; 1346 } 1347 } 1348 1349 /* Free buffer if it was malloced */ 1350 if (msg_malloced) { 1351 free(msg); 1352 } 1353 free(replyargs); 1354 } 1355 1356 /* Add a reply to a command to the list of replies for this client. 1357 If we have got a full set then send them to the waiting client down the local 1358 socket */ 1359 static void add_reply_to_list(struct local_client *client, int status, 1360 const char *csid, const char *buf, int len) 1361 { 1362 struct node_reply *reply; 1363 1364 pthread_mutex_lock(&client->bits.localsock.reply_mutex); 1365 1366 /* Add it to the list of replies */ 1367 reply = malloc(sizeof(struct node_reply)); 1368 if (reply) { 1369 reply->status = status; 1370 clops->name_from_csid(csid, reply->node); 1371 DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len); 1372 1373 if (len > 0) { 1374 reply->replymsg = malloc(len); 1375 if (!reply->replymsg) { 1376 reply->status = ENOMEM; 1377 } else { 1378 memcpy(reply->replymsg, buf, len); 1379 } 1380 } else { 1381 reply->replymsg = NULL; 1382 } 1383 /* Hook it onto the reply chain */ 1384 reply->next = client->bits.localsock.replies; 1385 client->bits.localsock.replies = reply; 1386 } else { 1387 /* It's all gone horribly wrong... */ 1388 pthread_mutex_unlock(&client->bits.localsock.reply_mutex); 1389 send_local_reply(client, ENOMEM, client->fd); 1390 return; 1391 } 1392 DEBUGLOG("Got %d replies, expecting: %d\n", 1393 client->bits.localsock.num_replies + 1, 1394 client->bits.localsock.expected_replies); 1395 1396 /* If we have the whole lot then do the post-process */ 1397 if (++client->bits.localsock.num_replies == 1398 client->bits.localsock.expected_replies) { 1399 /* Post-process the command */ 1400 if (client->bits.localsock.threadid) { 1401 pthread_mutex_lock(&client->bits.localsock.mutex); 1402 client->bits.localsock.state = POST_COMMAND; 1403 pthread_cond_signal(&client->bits.localsock.cond); 1404 pthread_mutex_unlock(&client->bits.localsock.mutex); 1405 } 1406 } 1407 pthread_mutex_unlock(&client->bits.localsock.reply_mutex); 1408 } 1409 1410 /* This is the thread that runs the PRE and post commands for a particular connection */ 1411 static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg) 1412 { 1413 struct local_client *client = (struct local_client *) arg; 1414 int status; 1415 int write_status; 1416 sigset_t ss; 1417 int pipe_fd = client->bits.localsock.pipe; 1418 1419 DEBUGLOG("in sub thread: client = %p\n", client); 1420 1421 /* Don't start until the LVM thread is ready */ 1422 pthread_mutex_lock(&lvm_start_mutex); 1423 pthread_mutex_unlock(&lvm_start_mutex); 1424 DEBUGLOG("Sub thread ready for work.\n"); 1425 1426 /* Ignore SIGUSR1 (handled by master process) but enable 1427 SIGUSR2 (kills subthreads) */ 1428 sigemptyset(&ss); 1429 sigaddset(&ss, SIGUSR1); 1430 pthread_sigmask(SIG_BLOCK, &ss, NULL); 1431 1432 sigdelset(&ss, SIGUSR1); 1433 sigaddset(&ss, SIGUSR2); 1434 pthread_sigmask(SIG_UNBLOCK, &ss, NULL); 1435 1436 /* Loop around doing PRE and POST functions until the client goes away */ 1437 while (!client->bits.localsock.finished) { 1438 /* Execute the code */ 1439 status = do_pre_command(client); 1440 1441 if (status) 1442 client->bits.localsock.all_success = 0; 1443 1444 DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd); 1445 1446 /* Tell the parent process we have finished this bit */ 1447 do { 1448 write_status = write(pipe_fd, &status, sizeof(int)); 1449 if (write_status == sizeof(int)) 1450 break; 1451 if (write_status < 0 && 1452 (errno == EINTR || errno == EAGAIN)) 1453 continue; 1454 log_error("Error sending to pipe: %m\n"); 1455 break; 1456 } while(1); 1457 1458 if (status) { 1459 client->bits.localsock.state = POST_COMMAND; 1460 goto next_pre; 1461 } 1462 1463 /* We may need to wait for the condition variable before running the post command */ 1464 pthread_mutex_lock(&client->bits.localsock.mutex); 1465 DEBUGLOG("Waiting to do post command - state = %d\n", 1466 client->bits.localsock.state); 1467 1468 if (client->bits.localsock.state != POST_COMMAND) { 1469 pthread_cond_wait(&client->bits.localsock.cond, 1470 &client->bits.localsock.mutex); 1471 } 1472 pthread_mutex_unlock(&client->bits.localsock.mutex); 1473 1474 DEBUGLOG("Got post command condition...\n"); 1475 1476 /* POST function must always run, even if the client aborts */ 1477 status = 0; 1478 do_post_command(client); 1479 1480 do { 1481 write_status = write(pipe_fd, &status, sizeof(int)); 1482 if (write_status == sizeof(int)) 1483 break; 1484 if (write_status < 0 && 1485 (errno == EINTR || errno == EAGAIN)) 1486 continue; 1487 log_error("Error sending to pipe: %m\n"); 1488 break; 1489 } while(1); 1490 next_pre: 1491 DEBUGLOG("Waiting for next pre command\n"); 1492 1493 pthread_mutex_lock(&client->bits.localsock.mutex); 1494 if (client->bits.localsock.state != PRE_COMMAND && 1495 !client->bits.localsock.finished) { 1496 pthread_cond_wait(&client->bits.localsock.cond, 1497 &client->bits.localsock.mutex); 1498 } 1499 pthread_mutex_unlock(&client->bits.localsock.mutex); 1500 1501 DEBUGLOG("Got pre command condition...\n"); 1502 } 1503 DEBUGLOG("Subthread finished\n"); 1504 pthread_exit((void *) 0); 1505 } 1506 1507 /* Process a command on the local node and store the result */ 1508 static int process_local_command(struct clvm_header *msg, int msglen, 1509 struct local_client *client, 1510 unsigned short xid) 1511 { 1512 char *replybuf = malloc(max_cluster_message); 1513 int buflen = max_cluster_message - sizeof(struct clvm_header) - 1; 1514 int replylen = 0; 1515 int status; 1516 1517 DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n", 1518 decode_cmd(msg->cmd), msg, msglen, client); 1519 1520 if (replybuf == NULL) 1521 return -1; 1522 1523 status = do_command(client, msg, msglen, &replybuf, buflen, &replylen); 1524 1525 if (status) 1526 client->bits.localsock.all_success = 0; 1527 1528 /* If we took too long then discard the reply */ 1529 if (xid == client->xid) { 1530 add_reply_to_list(client, status, our_csid, replybuf, replylen); 1531 } else { 1532 DEBUGLOG 1533 ("Local command took too long, discarding xid %d, current is %d\n", 1534 xid, client->xid); 1535 } 1536 1537 free(replybuf); 1538 return status; 1539 } 1540 1541 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid) 1542 { 1543 struct local_client *client = NULL; 1544 1545 client = find_client(msg->clientid); 1546 if (!client) { 1547 DEBUGLOG("Got message for unknown client 0x%x\n", 1548 msg->clientid); 1549 log_error("Got message for unknown client 0x%x\n", 1550 msg->clientid); 1551 return -1; 1552 } 1553 1554 if (msg->status) 1555 client->bits.localsock.all_success = 0; 1556 1557 /* Gather replies together for this client id */ 1558 if (msg->xid == client->xid) { 1559 add_reply_to_list(client, msg->status, csid, msg->args, 1560 msg->arglen); 1561 } else { 1562 DEBUGLOG("Discarding reply with old XID %d, current = %d\n", 1563 msg->xid, client->xid); 1564 } 1565 return 0; 1566 } 1567 1568 /* Send an aggregated reply back to the client */ 1569 static void send_local_reply(struct local_client *client, int status, int fd) 1570 { 1571 struct clvm_header *clientreply; 1572 struct node_reply *thisreply = client->bits.localsock.replies; 1573 char *replybuf; 1574 char *ptr; 1575 int message_len = 0; 1576 1577 DEBUGLOG("Send local reply\n"); 1578 1579 /* Work out the total size of the reply */ 1580 while (thisreply) { 1581 if (thisreply->replymsg) 1582 message_len += strlen(thisreply->replymsg) + 1; 1583 else 1584 message_len++; 1585 1586 message_len += strlen(thisreply->node) + 1 + sizeof(int); 1587 1588 thisreply = thisreply->next; 1589 } 1590 1591 /* Add in the size of our header */ 1592 message_len = message_len + sizeof(struct clvm_header) + 1; 1593 replybuf = malloc(message_len); 1594 1595 clientreply = (struct clvm_header *) replybuf; 1596 clientreply->status = status; 1597 clientreply->cmd = CLVMD_CMD_REPLY; 1598 clientreply->node[0] = '\0'; 1599 clientreply->flags = 0; 1600 1601 ptr = clientreply->args; 1602 1603 /* Add in all the replies, and free them as we go */ 1604 thisreply = client->bits.localsock.replies; 1605 while (thisreply) { 1606 struct node_reply *tempreply = thisreply; 1607 1608 strcpy(ptr, thisreply->node); 1609 ptr += strlen(thisreply->node) + 1; 1610 1611 if (thisreply->status) 1612 clientreply->flags |= CLVMD_FLAG_NODEERRS; 1613 1614 memcpy(ptr, &thisreply->status, sizeof(int)); 1615 ptr += sizeof(int); 1616 1617 if (thisreply->replymsg) { 1618 strcpy(ptr, thisreply->replymsg); 1619 ptr += strlen(thisreply->replymsg) + 1; 1620 } else { 1621 ptr[0] = '\0'; 1622 ptr++; 1623 } 1624 thisreply = thisreply->next; 1625 1626 free(tempreply->replymsg); 1627 free(tempreply); 1628 } 1629 1630 /* Terminate with an empty node name */ 1631 *ptr = '\0'; 1632 1633 clientreply->arglen = ptr - clientreply->args + 1; 1634 1635 /* And send it */ 1636 send_message(replybuf, message_len, our_csid, fd, 1637 "Error sending REPLY to client"); 1638 free(replybuf); 1639 1640 /* Reset comms variables */ 1641 client->bits.localsock.replies = NULL; 1642 client->bits.localsock.expected_replies = 0; 1643 client->bits.localsock.in_progress = FALSE; 1644 client->bits.localsock.sent_out = FALSE; 1645 } 1646 1647 /* Just free a reply chain baceuse it wasn't used. */ 1648 static void free_reply(struct local_client *client) 1649 { 1650 /* Add in all the replies, and free them as we go */ 1651 struct node_reply *thisreply = client->bits.localsock.replies; 1652 while (thisreply) { 1653 struct node_reply *tempreply = thisreply; 1654 1655 thisreply = thisreply->next; 1656 1657 free(tempreply->replymsg); 1658 free(tempreply); 1659 } 1660 client->bits.localsock.replies = NULL; 1661 } 1662 1663 /* Send our version number to the cluster */ 1664 static void send_version_message() 1665 { 1666 char message[sizeof(struct clvm_header) + sizeof(int) * 3]; 1667 struct clvm_header *msg = (struct clvm_header *) message; 1668 int version_nums[3]; 1669 1670 msg->cmd = CLVMD_CMD_VERSION; 1671 msg->status = 0; 1672 msg->flags = 0; 1673 msg->clientid = 0; 1674 msg->arglen = sizeof(version_nums); 1675 1676 version_nums[0] = htonl(CLVMD_MAJOR_VERSION); 1677 version_nums[1] = htonl(CLVMD_MINOR_VERSION); 1678 version_nums[2] = htonl(CLVMD_PATCH_VERSION); 1679 1680 memcpy(&msg->args, version_nums, sizeof(version_nums)); 1681 1682 hton_clvm(msg); 1683 1684 clops->cluster_send_message(message, sizeof(message), NULL, 1685 "Error Sending version number"); 1686 } 1687 1688 /* Send a message to either a local client or another server */ 1689 static int send_message(void *buf, int msglen, const char *csid, int fd, 1690 const char *errtext) 1691 { 1692 int len = 0; 1693 int saved_errno = 0; 1694 struct timespec delay; 1695 struct timespec remtime; 1696 1697 int retry_cnt = 0; 1698 1699 /* Send remote messages down the cluster socket */ 1700 if (csid == NULL || !ISLOCAL_CSID(csid)) { 1701 hton_clvm((struct clvm_header *) buf); 1702 return clops->cluster_send_message(buf, msglen, csid, errtext); 1703 } else { 1704 int ptr = 0; 1705 1706 /* Make sure it all goes */ 1707 do { 1708 if (retry_cnt > MAX_RETRIES) 1709 { 1710 errno = saved_errno; 1711 log_error("%s", errtext); 1712 errno = saved_errno; 1713 break; 1714 } 1715 1716 len = write(fd, buf + ptr, msglen - ptr); 1717 1718 if (len <= 0) { 1719 if (errno == EINTR) 1720 continue; 1721 if (errno == EAGAIN || 1722 errno == EIO || 1723 errno == ENOSPC) { 1724 saved_errno = errno; 1725 retry_cnt++; 1726 1727 delay.tv_sec = 0; 1728 delay.tv_nsec = 100000; 1729 remtime.tv_sec = 0; 1730 remtime.tv_nsec = 0; 1731 (void) nanosleep (&delay, &remtime); 1732 1733 continue; 1734 } 1735 log_error("%s", errtext); 1736 break; 1737 } 1738 ptr += len; 1739 } while (ptr < msglen); 1740 } 1741 return len; 1742 } 1743 1744 static int process_work_item(struct lvm_thread_cmd *cmd) 1745 { 1746 /* If msg is NULL then this is a cleanup request */ 1747 if (cmd->msg == NULL) { 1748 DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd); 1749 cmd_client_cleanup(cmd->client); 1750 free(cmd->client); 1751 return 0; 1752 } 1753 1754 if (!cmd->remote) { 1755 DEBUGLOG("process_work_item: local\n"); 1756 process_local_command(cmd->msg, cmd->msglen, cmd->client, 1757 cmd->xid); 1758 } else { 1759 DEBUGLOG("process_work_item: remote\n"); 1760 process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd, 1761 cmd->csid); 1762 } 1763 return 0; 1764 } 1765 1766 /* 1767 * Routine that runs in the "LVM thread". 1768 */ 1769 static __attribute__ ((noreturn)) void *lvm_thread_fn(void *arg) 1770 { 1771 struct dm_list *cmdl, *tmp; 1772 sigset_t ss; 1773 int using_gulm = (int)(long)arg; 1774 1775 DEBUGLOG("LVM thread function started\n"); 1776 1777 /* Ignore SIGUSR1 & 2 */ 1778 sigemptyset(&ss); 1779 sigaddset(&ss, SIGUSR1); 1780 sigaddset(&ss, SIGUSR2); 1781 pthread_sigmask(SIG_BLOCK, &ss, NULL); 1782 1783 /* Initialise the interface to liblvm */ 1784 init_lvm(using_gulm); 1785 1786 /* Allow others to get moving */ 1787 pthread_mutex_unlock(&lvm_start_mutex); 1788 1789 /* Now wait for some actual work */ 1790 for (;;) { 1791 DEBUGLOG("LVM thread waiting for work\n"); 1792 1793 pthread_mutex_lock(&lvm_thread_mutex); 1794 if (dm_list_empty(&lvm_cmd_head)) 1795 pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex); 1796 1797 dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) { 1798 struct lvm_thread_cmd *cmd; 1799 1800 cmd = 1801 dm_list_struct_base(cmdl, struct lvm_thread_cmd, list); 1802 dm_list_del(&cmd->list); 1803 pthread_mutex_unlock(&lvm_thread_mutex); 1804 1805 process_work_item(cmd); 1806 free(cmd->msg); 1807 free(cmd); 1808 1809 pthread_mutex_lock(&lvm_thread_mutex); 1810 } 1811 pthread_mutex_unlock(&lvm_thread_mutex); 1812 } 1813 } 1814 1815 /* Pass down some work to the LVM thread */ 1816 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg, 1817 int msglen, const char *csid) 1818 { 1819 struct lvm_thread_cmd *cmd; 1820 1821 cmd = malloc(sizeof(struct lvm_thread_cmd)); 1822 if (!cmd) 1823 return ENOMEM; 1824 1825 if (msglen) { 1826 cmd->msg = malloc(msglen); 1827 if (!cmd->msg) { 1828 log_error("Unable to allocate buffer space\n"); 1829 free(cmd); 1830 return -1; 1831 } 1832 memcpy(cmd->msg, msg, msglen); 1833 } 1834 else { 1835 cmd->msg = NULL; 1836 } 1837 cmd->client = client; 1838 cmd->msglen = msglen; 1839 cmd->xid = client->xid; 1840 1841 if (csid) { 1842 memcpy(cmd->csid, csid, max_csid_len); 1843 cmd->remote = 1; 1844 } else { 1845 cmd->remote = 0; 1846 } 1847 1848 DEBUGLOG 1849 ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n", 1850 cmd, client, msg, msglen, csid, cmd->xid); 1851 pthread_mutex_lock(&lvm_thread_mutex); 1852 dm_list_add(&lvm_cmd_head, &cmd->list); 1853 pthread_cond_signal(&lvm_thread_cond); 1854 pthread_mutex_unlock(&lvm_thread_mutex); 1855 1856 return 0; 1857 } 1858 1859 /* Return 0 if we can talk to an existing clvmd */ 1860 static int check_local_clvmd(void) 1861 { 1862 int local_socket; 1863 struct sockaddr_un sockaddr; 1864 int ret = 0; 1865 1866 /* Open local socket */ 1867 if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { 1868 return -1; 1869 } 1870 1871 memset(&sockaddr, 0, sizeof(sockaddr)); 1872 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME)); 1873 sockaddr.sun_family = AF_UNIX; 1874 1875 if (connect(local_socket,(struct sockaddr *) &sockaddr, 1876 sizeof(sockaddr))) { 1877 ret = -1; 1878 } 1879 1880 close(local_socket); 1881 return ret; 1882 } 1883 1884 1885 /* Open the local socket, that's the one we talk to libclvm down */ 1886 static int open_local_sock() 1887 { 1888 int local_socket; 1889 struct sockaddr_un sockaddr; 1890 1891 /* Open local socket */ 1892 if (CLVMD_SOCKNAME[0] != '\0') 1893 unlink(CLVMD_SOCKNAME); 1894 local_socket = socket(PF_UNIX, SOCK_STREAM, 0); 1895 if (local_socket < 0) { 1896 log_error("Can't create local socket: %m"); 1897 return -1; 1898 } 1899 /* Set Close-on-exec & non-blocking */ 1900 fcntl(local_socket, F_SETFD, 1); 1901 fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK); 1902 1903 memset(&sockaddr, 0, sizeof(sockaddr)); 1904 memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME)); 1905 sockaddr.sun_family = AF_UNIX; 1906 if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) { 1907 log_error("can't bind local socket: %m"); 1908 close(local_socket); 1909 return -1; 1910 } 1911 if (listen(local_socket, 1) != 0) { 1912 log_error("listen local: %m"); 1913 close(local_socket); 1914 return -1; 1915 } 1916 if (CLVMD_SOCKNAME[0] != '\0') 1917 chmod(CLVMD_SOCKNAME, 0600); 1918 1919 return local_socket; 1920 } 1921 1922 void process_message(struct local_client *client, const char *buf, int len, 1923 const char *csid) 1924 { 1925 struct clvm_header *inheader; 1926 1927 inheader = (struct clvm_header *) buf; 1928 ntoh_clvm(inheader); /* Byteswap fields */ 1929 if (inheader->cmd == CLVMD_CMD_REPLY) 1930 process_reply(inheader, len, csid); 1931 else 1932 add_to_lvmqueue(client, inheader, len, csid); 1933 } 1934 1935 1936 static void check_all_callback(struct local_client *client, const char *csid, 1937 int node_up) 1938 { 1939 if (!node_up) 1940 add_reply_to_list(client, EHOSTDOWN, csid, "CLVMD not running", 1941 18); 1942 } 1943 1944 /* Check to see if all CLVMDs are running (ie one on 1945 every node in the cluster). 1946 If not, returns -1 and prints out a list of errant nodes */ 1947 static int check_all_clvmds_running(struct local_client *client) 1948 { 1949 DEBUGLOG("check_all_clvmds_running\n"); 1950 return clops->cluster_do_node_callback(client, check_all_callback); 1951 } 1952 1953 /* Return a local_client struct given a client ID. 1954 client IDs are in network byte order */ 1955 static struct local_client *find_client(int clientid) 1956 { 1957 struct local_client *thisfd; 1958 for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) { 1959 if (thisfd->fd == ntohl(clientid)) 1960 return thisfd; 1961 } 1962 return NULL; 1963 } 1964 1965 /* Byte-swapping routines for the header so we 1966 work in a heterogeneous environment */ 1967 static void hton_clvm(struct clvm_header *hdr) 1968 { 1969 hdr->status = htonl(hdr->status); 1970 hdr->arglen = htonl(hdr->arglen); 1971 hdr->xid = htons(hdr->xid); 1972 /* Don't swap clientid as it's only a token as far as 1973 remote nodes are concerned */ 1974 } 1975 1976 static void ntoh_clvm(struct clvm_header *hdr) 1977 { 1978 hdr->status = ntohl(hdr->status); 1979 hdr->arglen = ntohl(hdr->arglen); 1980 hdr->xid = ntohs(hdr->xid); 1981 } 1982 1983 /* Handler for SIGUSR2 - sent to kill subthreads */ 1984 static void sigusr2_handler(int sig) 1985 { 1986 DEBUGLOG("SIGUSR2 received\n"); 1987 return; 1988 } 1989 1990 static void sigterm_handler(int sig) 1991 { 1992 DEBUGLOG("SIGTERM received\n"); 1993 quit = 1; 1994 return; 1995 } 1996 1997 static void sighup_handler(int sig) 1998 { 1999 DEBUGLOG("got SIGHUP\n"); 2000 reread_config = 1; 2001 } 2002 2003 int sync_lock(const char *resource, int mode, int flags, int *lockid) 2004 { 2005 return clops->sync_lock(resource, mode, flags, lockid); 2006 } 2007 2008 int sync_unlock(const char *resource, int lockid) 2009 { 2010 return clops->sync_unlock(resource, lockid); 2011 } 2012 2013