xref: /netbsd-src/external/gpl2/lvm2/dist/daemons/clvmd/clvmd.c (revision 2d48ac808c43ea6701ba8f33cfc3645685301f79)
1 /*	$NetBSD: clvmd.c,v 1.1.1.2 2009/02/18 11:16:38 haad Exp $	*/
2 
3 /*
4  * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
5  * Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6  *
7  * This file is part of LVM2.
8  *
9  * This copyrighted material is made available to anyone wishing to use,
10  * modify, copy, or redistribute it subject to the terms and conditions
11  * of the GNU General Public License v.2.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16  */
17 
18 /*
19  * CLVMD: Cluster LVM daemon
20  */
21 
22 #define _GNU_SOURCE
23 #define _FILE_OFFSET_BITS 64
24 
25 #include <configure.h>
26 #include <libdevmapper.h>
27 
28 #include <pthread.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33 #include <sys/un.h>
34 #include <sys/time.h>
35 #include <sys/ioctl.h>
36 #include <sys/utsname.h>
37 #include <netinet/in.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stddef.h>
41 #include <stdarg.h>
42 #include <signal.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <getopt.h>
46 #include <syslog.h>
47 #include <errno.h>
48 #include <limits.h>
49 #include <libdlm.h>
50 
51 #include "clvmd-comms.h"
52 #include "lvm-functions.h"
53 #include "clvm.h"
54 #include "version.h"
55 #include "clvmd.h"
56 #include "refresh_clvmd.h"
57 #include "lvm-logging.h"
58 
59 #ifndef TRUE
60 #define TRUE 1
61 #endif
62 #ifndef FALSE
63 #define FALSE 0
64 #endif
65 
66 #define MAX_RETRIES 4
67 
/* True when the first max_csid_len bytes of csid 'c' match our own csid
   (our_csid), i.e. the csid refers to this node */
68 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
69 
70 /* Head of the fd list. Also contains
71    the cluster_socket details */
72 static struct local_client local_client_head;
73 
74 static unsigned short global_xid = 0;	/* Last transaction ID issued */
75 
/* Operations table of the cluster backend in use. main() sets it to the
   first backend whose init function succeeds; it stays NULL (and startup
   fails) if none does */
76 struct cluster_ops *clops = NULL;
77 
/* Our own csid, filled in by clops->get_our_csid() during startup */
78 static char our_csid[MAX_CSID_LEN];
/* Backend-specific limits, set in main() alongside clops.
   max_cluster_message sizes the receive buffer in main_loop();
   max_cluster_member_name_len sizes the node name buffer used in
   timedout_callback() */
79 static unsigned max_csid_len;
80 static unsigned max_cluster_message;
81 static unsigned max_cluster_member_name_len;
82 
83 /* Structure of items on the LVM thread list */
/* NOTE(review): the fields appear to mirror the arguments of
   add_to_lvmqueue(client, msg, msglen, csid); main_loop() queues an entry
   with a NULL msg to request cleanup of a client - confirm against
   add_to_lvmqueue() and lvm_thread_fn() */
84 struct lvm_thread_cmd {
	/* List linkage (presumably onto lvm_cmd_head - TODO confirm) */
85 	struct dm_list list;
86 
87 	struct local_client *client;
88 	struct clvm_header *msg;
89 	char csid[MAX_CSID_LEN];
90 	int remote;		/* Flag */
91 	int msglen;
92 	unsigned short xid;
93 };
94 
/* Current debug destination: debuglog() writes to stderr when this is
   DEBUG_STDERR and to syslog when it is DEBUG_SYSLOG. Set from -d in main() */
95 debug_t debug;
/* Thread created in main() to run lvm_thread_fn() */
96 static pthread_t lvm_thread;
/* Synchronisation objects for the LVM thread, all initialised in main().
   lvm_start_mutex is locked by main() just before the thread is created */
97 static pthread_mutex_t lvm_thread_mutex;
98 static pthread_cond_t lvm_thread_cond;
99 static pthread_mutex_t lvm_start_mutex;
/* List head initialised with dm_list_init() in main() */
100 static struct dm_list lvm_cmd_head;
/* main_loop() keeps running while this is zero */
101 static volatile sig_atomic_t quit = 0;
/* When non-zero, main_loop() clears it after select() returns and calls
   clops->reread_config() if the backend provides one */
102 static volatile sig_atomic_t reread_config = 0;
/* Pipe created by be_daemon(): the daemon child reports its startup status
   on child_pipe[1] (see child_init_signal()), the waiting parent reads it
   from child_pipe[0]. Left as {0, 0} when the daemon does not fork */
103 static int child_pipe[2];
104 
105 /* Reasons the daemon failed initialisation */
106 #define DFAIL_INIT       1
107 #define DFAIL_LOCAL_SOCK 2
108 #define DFAIL_CLUSTER_IF 3
109 #define DFAIL_MALLOC     4
110 #define DFAIL_TIMEOUT    5
111 #define SUCCESS          0
112 
113 /* Prototypes for code further down */
114 static void sigusr2_handler(int sig);
115 static void sighup_handler(int sig);
116 static void sigterm_handler(int sig);
117 static void send_local_reply(struct local_client *client, int status,
118 			     int clientid);
119 static void free_reply(struct local_client *client);
120 static void send_version_message(void);
121 static void *pre_and_post_thread(void *arg);
122 static int send_message(void *buf, int msglen, const char *csid, int fd,
123 			const char *errtext);
124 static int read_from_local_sock(struct local_client *thisfd);
125 static int process_local_command(struct clvm_header *msg, int msglen,
126 				 struct local_client *client,
127 				 unsigned short xid);
128 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
129 				   const char *csid);
130 static int process_reply(const struct clvm_header *msg, int msglen,
131 			 const char *csid);
132 static int open_local_sock(void);
133 static int check_local_clvmd(void);
134 static struct local_client *find_client(int clientid);
135 static void main_loop(int local_sock, int cmd_timeout);
136 static void be_daemon(int start_timeout);
137 static int check_all_clvmds_running(struct local_client *client);
138 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
139 				     int len, const char *csid,
140 				     struct local_client **new_client);
141 static void *lvm_thread_fn(void *);
142 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
143 			   int msglen, const char *csid);
144 static int distribute_command(struct local_client *thisfd);
145 static void hton_clvm(struct clvm_header *hdr);
146 static void ntoh_clvm(struct clvm_header *hdr);
147 static void add_reply_to_list(struct local_client *client, int status,
148 			      const char *csid, const char *buf, int len);
149 
/*
 * Print the command synopsis and the option summary.
 *
 * prog: program name to show in the synopsis line (callers pass argv[0])
 * file: stream the text is written to
 */
static void usage(char *prog, FILE *file)
{
	/* Everything after the synopsis line is constant text */
	static const char option_summary[] =
		"\n"
		"   -V       Show version of clvmd\n"
		"   -h       Show this help information\n"
		"   -d       Set debug level\n"
		"            If starting clvmd then don't fork, run in the foreground\n"
		"   -R       Tell all running clvmds in the cluster to reload their device cache\n"
		"   -C       Sets debug level (from -d) on all clvmd instances clusterwide\n"
		"   -t<secs> Command timeout (default 60 seconds)\n"
		"   -T<secs> Startup timeout (default none)\n"
		"\n";

	fputs("Usage:\n", file);
	fprintf(file, "%s [Vhd]\n", prog);
	fputs(option_summary, file);
}
165 
166 /* Called to signal the parent how well we got on during initialisation */
167 static void child_init_signal(int status)
168 {
169         if (child_pipe[1]) {
170 	        write(child_pipe[1], &status, sizeof(status));
171 		close(child_pipe[1]);
172 	}
173 	if (status)
174 	        exit(status);
175 }
176 
177 
178 void debuglog(const char *fmt, ...)
179 {
180 	time_t P;
181 	va_list ap;
182 	static int syslog_init = 0;
183 
184 	if (debug == DEBUG_STDERR) {
185 		va_start(ap,fmt);
186 		time(&P);
187 		fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
188 		vfprintf(stderr, fmt, ap);
189 		va_end(ap);
190 	}
191 	if (debug == DEBUG_SYSLOG) {
192 		if (!syslog_init) {
193 			openlog("clvmd", LOG_PID, LOG_DAEMON);
194 			syslog_init = 1;
195 		}
196 
197 		va_start(ap,fmt);
198 		vsyslog(LOG_DEBUG, fmt, ap);
199 		va_end(ap);
200 	}
201 }
202 
203 static const char *decode_cmd(unsigned char cmdl)
204 {
205 	static char buf[128];
206 	const char *command;
207 
208 	switch (cmdl) {
209 	case CLVMD_CMD_TEST:
210 		command = "TEST";
211 		break;
212 	case CLVMD_CMD_LOCK_VG:
213 		command = "LOCK_VG";
214 		break;
215 	case CLVMD_CMD_LOCK_LV:
216 		command = "LOCK_LV";
217 		break;
218 	case CLVMD_CMD_REFRESH:
219 		command = "REFRESH";
220 		break;
221 	case CLVMD_CMD_SET_DEBUG:
222 		command = "SET_DEBUG";
223 		break;
224 	case CLVMD_CMD_GET_CLUSTERNAME:
225 		command = "GET_CLUSTERNAME";
226 		break;
227 	case CLVMD_CMD_VG_BACKUP:
228 		command = "VG_BACKUP";
229 		break;
230 	case CLVMD_CMD_REPLY:
231 		command = "REPLY";
232 		break;
233 	case CLVMD_CMD_VERSION:
234 		command = "VERSION";
235 		break;
236 	case CLVMD_CMD_GOAWAY:
237 		command = "GOAWAY";
238 		break;
239 	case CLVMD_CMD_LOCK:
240 		command = "LOCK";
241 		break;
242 	case CLVMD_CMD_UNLOCK:
243 		command = "UNLOCK";
244 		break;
245 	default:
246 		command = "unknown";
247 		break;
248 	}
249 
250 	sprintf(buf, "%s (0x%x)", command, cmdl);
251 
252 	return buf;
253 }
254 
255 int main(int argc, char *argv[])
256 {
257 	int local_sock;
258 	struct local_client *newfd;
259 	struct utsname nodeinfo;
260 	signed char opt;
261 	int cmd_timeout = DEFAULT_CMD_TIMEOUT;
262 	int start_timeout = 0;
263 	sigset_t ss;
264 	int using_gulm = 0;
265 	int debug_opt = 0;
266 	int clusterwide_opt = 0;
267 
268 	/* Deal with command-line arguments */
269 	opterr = 0;
270 	optind = 0;
271 	while ((opt = getopt(argc, argv, "?vVhd::t:RT:C")) != EOF) {
272 		switch (opt) {
273 		case 'h':
274 			usage(argv[0], stdout);
275 			exit(0);
276 
277 		case '?':
278 			usage(argv[0], stderr);
279 			exit(0);
280 
281 		case 'R':
282 			return refresh_clvmd();
283 
284 		case 'C':
285 			clusterwide_opt = 1;
286 			break;
287 
288 		case 'd':
289 			debug_opt = 1;
290 			if (optarg)
291 				debug = atoi(optarg);
292 			else
293 				debug = DEBUG_STDERR;
294 			break;
295 
296 		case 't':
297 			cmd_timeout = atoi(optarg);
298 			if (!cmd_timeout) {
299 				fprintf(stderr, "command timeout is invalid\n");
300 				usage(argv[0], stderr);
301 				exit(1);
302 			}
303 			break;
304 		case 'T':
305 			start_timeout = atoi(optarg);
306 			if (start_timeout <= 0) {
307 				fprintf(stderr, "startup timeout is invalid\n");
308 				usage(argv[0], stderr);
309 				exit(1);
310 			}
311 			break;
312 
313 		case 'V':
314 		        printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
315 			printf("Protocol version:           %d.%d.%d\n",
316 			       CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
317 			       CLVMD_PATCH_VERSION);
318 			exit(1);
319 			break;
320 
321 		}
322 	}
323 
324 	/* Setting debug options on an existing clvmd */
325 	if (debug_opt && !check_local_clvmd()) {
326 
327 		/* Sending to stderr makes no sense for a detached daemon */
328 		if (debug == DEBUG_STDERR)
329 			debug = DEBUG_SYSLOG;
330 		return debug_clvmd(debug, clusterwide_opt);
331 	}
332 
333 	/* Fork into the background (unless requested not to) */
334 	if (debug != DEBUG_STDERR) {
335 		be_daemon(start_timeout);
336 	}
337 
338 	DEBUGLOG("CLVMD started\n");
339 
340 	/* Open the Unix socket we listen for commands on.
341 	   We do this before opening the cluster socket so that
342 	   potential clients will block rather than error if we are running
343 	   but the cluster is not ready yet */
344 	local_sock = open_local_sock();
345 	if (local_sock < 0)
346 		child_init_signal(DFAIL_LOCAL_SOCK);
347 
348 	/* Set up signal handlers, USR1 is for cluster change notifications (in cman)
349 	   USR2 causes child threads to exit.
350 	   HUP causes gulm version to re-read nodes list from CCS.
351 	   PIPE should be ignored */
352 	signal(SIGUSR2, sigusr2_handler);
353 	signal(SIGHUP,  sighup_handler);
354 	signal(SIGPIPE, SIG_IGN);
355 
356 	/* Block SIGUSR2 in the main process */
357 	sigemptyset(&ss);
358 	sigaddset(&ss, SIGUSR2);
359 	sigprocmask(SIG_BLOCK, &ss, NULL);
360 
361 	/* Initialise the LVM thread variables */
362 	dm_list_init(&lvm_cmd_head);
363 	pthread_mutex_init(&lvm_thread_mutex, NULL);
364 	pthread_cond_init(&lvm_thread_cond, NULL);
365 	pthread_mutex_init(&lvm_start_mutex, NULL);
366 	init_lvhash();
367 
368 	/* Start the cluster interface */
369 #ifdef USE_CMAN
370 	if ((clops = init_cman_cluster())) {
371 		max_csid_len = CMAN_MAX_CSID_LEN;
372 		max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
373 		max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
374 		syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
375 	}
376 #endif
377 #ifdef USE_GULM
378 	if (!clops)
379 		if ((clops = init_gulm_cluster())) {
380 			max_csid_len = GULM_MAX_CSID_LEN;
381 			max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
382 			max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
383 			using_gulm = 1;
384 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
385 		}
386 #endif
387 #ifdef USE_OPENAIS
388 	if (!clops)
389 		if ((clops = init_openais_cluster())) {
390 			max_csid_len = OPENAIS_CSID_LEN;
391 			max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
392 			max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
393 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
394 		}
395 #endif
396 #ifdef USE_COROSYNC
397 	if (!clops)
398 		if ((clops = init_corosync_cluster())) {
399 			max_csid_len = COROSYNC_CSID_LEN;
400 			max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
401 			max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
402 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
403 		}
404 #endif
405 
406 	if (!clops) {
407 		DEBUGLOG("Can't initialise cluster interface\n");
408 		log_error("Can't initialise cluster interface\n");
409 		child_init_signal(DFAIL_CLUSTER_IF);
410 	}
411 	DEBUGLOG("Cluster ready, doing some more initialisation\n");
412 
413 	/* Save our CSID */
414 	uname(&nodeinfo);
415 	clops->get_our_csid(our_csid);
416 
417 	/* Initialise the FD list head */
418 	local_client_head.fd = clops->get_main_cluster_fd();
419 	local_client_head.type = CLUSTER_MAIN_SOCK;
420 	local_client_head.callback = clops->cluster_fd_callback;
421 
422 	/* Add the local socket to the list */
423 	newfd = malloc(sizeof(struct local_client));
424 	if (!newfd)
425 	        child_init_signal(DFAIL_MALLOC);
426 
427 	newfd->fd = local_sock;
428 	newfd->removeme = 0;
429 	newfd->type = LOCAL_RENDEZVOUS;
430 	newfd->callback = local_rendezvous_callback;
431 	newfd->next = local_client_head.next;
432 	local_client_head.next = newfd;
433 
434 	/* This needs to be started after cluster initialisation
435 	   as it may need to take out locks */
436 	DEBUGLOG("starting LVM thread\n");
437 
438 	/* Don't let anyone else to do work until we are started */
439 	pthread_mutex_lock(&lvm_start_mutex);
440 	pthread_create(&lvm_thread, NULL, lvm_thread_fn,
441 			(void *)(long)using_gulm);
442 
443 	/* Tell the rest of the cluster our version number */
444 	/* CMAN can do this immediately, gulm needs to wait until
445 	   the core initialisation has finished and the node list
446 	   has been gathered */
447 	if (clops->cluster_init_completed)
448 		clops->cluster_init_completed();
449 
450 	DEBUGLOG("clvmd ready for work\n");
451 	child_init_signal(SUCCESS);
452 
453 	/* Try to shutdown neatly */
454 	signal(SIGTERM, sigterm_handler);
455 	signal(SIGINT, sigterm_handler);
456 
457 	/* Do some work */
458 	main_loop(local_sock, cmd_timeout);
459 
460 	return 0;
461 }
462 
/* Hook for the GuLM cluster layer to call once its own initialisation
   has completed: all that is left to do at that point is to send out
   the version message */
void clvmd_cluster_init_completed()
{
	send_version_message();
}
469 
/* fd-list callback for a connected local client socket: there is data
   (or EOF) waiting, so hand it to read_from_local_sock() and pass its
   result back. This never creates a new client; buf, len and csid are
   not used */
static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
			       const char *csid,
			       struct local_client **new_client)
{
	int result;

	*new_client = NULL;
	result = read_from_local_sock(thisfd);

	return result;
}
478 
479 /* Data on a connected socket */
480 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
481 				     int len, const char *csid,
482 				     struct local_client **new_client)
483 {
484 	/* Someone connected to our local socket, accept it. */
485 
486 	struct sockaddr_un socka;
487 	struct local_client *newfd;
488 	socklen_t sl = sizeof(socka);
489 	int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
490 
491 	if (client_fd == -1 && errno == EINTR)
492 		return 1;
493 
494 	if (client_fd >= 0) {
495 		newfd = malloc(sizeof(struct local_client));
496 		if (!newfd) {
497 			close(client_fd);
498 			return 1;
499 		}
500 		newfd->fd = client_fd;
501 		newfd->type = LOCAL_SOCK;
502 		newfd->xid = 0;
503 		newfd->removeme = 0;
504 		newfd->callback = local_sock_callback;
505 		newfd->bits.localsock.replies = NULL;
506 		newfd->bits.localsock.expected_replies = 0;
507 		newfd->bits.localsock.cmd = NULL;
508 		newfd->bits.localsock.in_progress = FALSE;
509 		newfd->bits.localsock.sent_out = FALSE;
510 		newfd->bits.localsock.threadid = 0;
511 		newfd->bits.localsock.finished = 0;
512 		newfd->bits.localsock.pipe_client = NULL;
513 		newfd->bits.localsock.private = NULL;
514 		newfd->bits.localsock.all_success = 1;
515 		DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
516 		*new_client = newfd;
517 	}
518 	return 1;
519 }
520 
521 static int local_pipe_callback(struct local_client *thisfd, char *buf,
522 			       int maxlen, const char *csid,
523 			       struct local_client **new_client)
524 {
525 	int len;
526 	char buffer[PIPE_BUF];
527 	struct local_client *sock_client = thisfd->bits.pipe.client;
528 	int status = -1;	/* in error by default */
529 
530 	len = read(thisfd->fd, buffer, sizeof(int));
531 	if (len == -1 && errno == EINTR)
532 		return 1;
533 
534 	if (len == sizeof(int)) {
535 		memcpy(&status, buffer, sizeof(int));
536 	}
537 
538 	DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
539 		 thisfd->fd, len, status);
540 
541 	/* EOF on pipe or an error, close it */
542 	if (len <= 0) {
543 		int jstat;
544 		void *ret = &status;
545 		close(thisfd->fd);
546 
547 		/* Clear out the cross-link */
548 		if (thisfd->bits.pipe.client != NULL)
549 			thisfd->bits.pipe.client->bits.localsock.pipe_client =
550 			    NULL;
551 
552 		/* Reap child thread */
553 		if (thisfd->bits.pipe.threadid) {
554 			jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
555 			thisfd->bits.pipe.threadid = 0;
556 			if (thisfd->bits.pipe.client != NULL)
557 				thisfd->bits.pipe.client->bits.localsock.
558 				    threadid = 0;
559 		}
560 		return -1;
561 	} else {
562 		DEBUGLOG("background routine status was %d, sock_client=%p\n",
563 			 status, sock_client);
564 		/* But has the client gone away ?? */
565 		if (sock_client == NULL) {
566 			DEBUGLOG
567 			    ("Got PIPE response for dead client, ignoring it\n");
568 		} else {
569 			/* If error then just return that code */
570 			if (status)
571 				send_local_reply(sock_client, status,
572 						 sock_client->fd);
573 			else {
574 				if (sock_client->bits.localsock.state ==
575 				    POST_COMMAND) {
576 					send_local_reply(sock_client, 0,
577 							 sock_client->fd);
578 				} else	// PRE_COMMAND finished.
579 				{
580 					if (
581 					    (status =
582 					     distribute_command(sock_client)) !=
583 					    0) send_local_reply(sock_client,
584 								EFBIG,
585 								sock_client->
586 								fd);
587 				}
588 			}
589 		}
590 	}
591 	return len;
592 }
593 
594 /* If a noed is up, look for it in the reply array, if it's not there then
595    add one with "ETIMEDOUT".
596    NOTE: This won't race with real replies because they happen in the same thread.
597 */
598 static void timedout_callback(struct local_client *client, const char *csid,
599 			      int node_up)
600 {
601 	if (node_up) {
602 		struct node_reply *reply;
603 		char nodename[max_cluster_member_name_len];
604 
605 		clops->name_from_csid(csid, nodename);
606 		DEBUGLOG("Checking for a reply from %s\n", nodename);
607 		pthread_mutex_lock(&client->bits.localsock.reply_mutex);
608 
609 		reply = client->bits.localsock.replies;
610 		while (reply && strcmp(reply->node, nodename) != 0) {
611 			reply = reply->next;
612 		}
613 
614 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
615 
616 		if (!reply) {
617 			DEBUGLOG("Node %s timed-out\n", nodename);
618 			add_reply_to_list(client, ETIMEDOUT, csid,
619 					  "Command timed out", 18);
620 		}
621 	}
622 }
623 
624 /* Called when the request has timed out on at least one node. We fill in
625    the remaining node entries with ETIMEDOUT and return.
626 
627    By the time we get here the node that caused
628    the timeout could have gone down, in which case we will never get the expected
629    number of replies that triggers the post command so we need to do it here
630 */
631 static void request_timed_out(struct local_client *client)
632 {
633 	DEBUGLOG("Request timed-out. padding\n");
634 	clops->cluster_do_node_callback(client, timedout_callback);
635 
636 	if (client->bits.localsock.num_replies !=
637 	    client->bits.localsock.expected_replies) {
638 		/* Post-process the command */
639 		if (client->bits.localsock.threadid) {
640 			pthread_mutex_lock(&client->bits.localsock.mutex);
641 			client->bits.localsock.state = POST_COMMAND;
642 			pthread_cond_signal(&client->bits.localsock.cond);
643 			pthread_mutex_unlock(&client->bits.localsock.mutex);
644 		}
645 	}
646 }
647 
648 /* This is where the real work happens */
649 static void main_loop(int local_sock, int cmd_timeout)
650 {
651 	DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
652 
653 	/* Main loop */
654 	while (!quit) {
655 		fd_set in;
656 		int select_status;
657 		struct local_client *thisfd;
658 		struct timeval tv = { cmd_timeout, 0 };
659 		int quorate = clops->is_quorate();
660 
661 		/* Wait on the cluster FD and all local sockets/pipes */
662 		local_client_head.fd = clops->get_main_cluster_fd();
663 		FD_ZERO(&in);
664 		for (thisfd = &local_client_head; thisfd != NULL;
665 		     thisfd = thisfd->next) {
666 
667 			if (thisfd->removeme)
668 				continue;
669 
670 			/* if the cluster is not quorate then don't listen for new requests */
671 			if ((thisfd->type != LOCAL_RENDEZVOUS &&
672 			     thisfd->type != LOCAL_SOCK) || quorate)
673 				FD_SET(thisfd->fd, &in);
674 		}
675 
676 		select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
677 
678 		if (reread_config) {
679 			int saved_errno = errno;
680 
681 			reread_config = 0;
682 			if (clops->reread_config)
683 				clops->reread_config();
684 			errno = saved_errno;
685 		}
686 
687 		if (select_status > 0) {
688 			struct local_client *lastfd = NULL;
689 			char csid[MAX_CSID_LEN];
690 			char buf[max_cluster_message];
691 
692 			for (thisfd = &local_client_head; thisfd != NULL;
693 			     thisfd = thisfd->next) {
694 
695 				if (thisfd->removeme) {
696 					struct local_client *free_fd;
697 					lastfd->next = thisfd->next;
698 					free_fd = thisfd;
699 					thisfd = lastfd;
700 
701 					DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
702 
703 					/* Queue cleanup, this also frees the client struct */
704 					add_to_lvmqueue(free_fd, NULL, 0, NULL);
705 					break;
706 				}
707 
708 				if (FD_ISSET(thisfd->fd, &in)) {
709 					struct local_client *newfd = NULL;
710 					int ret;
711 
712 					/* Do callback */
713 					ret =
714 					    thisfd->callback(thisfd, buf,
715 							     sizeof(buf), csid,
716 							     &newfd);
717 					/* Ignore EAGAIN */
718 					if (ret < 0 && (errno == EAGAIN ||
719 							errno == EINTR)) continue;
720 
721 					/* Got error or EOF: Remove it from the list safely */
722 					if (ret <= 0) {
723 						struct local_client *free_fd;
724 						int type = thisfd->type;
725 
726 						/* If the cluster socket shuts down, so do we */
727 						if (type == CLUSTER_MAIN_SOCK ||
728 						    type == CLUSTER_INTERNAL)
729 							goto closedown;
730 
731 						DEBUGLOG("ret == %d, errno = %d. removing client\n",
732 							 ret, errno);
733 						lastfd->next = thisfd->next;
734 						free_fd = thisfd;
735 						thisfd = lastfd;
736 						close(free_fd->fd);
737 
738 						/* Queue cleanup, this also frees the client struct */
739 						add_to_lvmqueue(free_fd, NULL, 0, NULL);
740 						break;
741 					}
742 
743 					/* New client...simply add it to the list */
744 					if (newfd) {
745 						newfd->next = thisfd->next;
746 						thisfd->next = newfd;
747 						break;
748 					}
749 				}
750 				lastfd = thisfd;
751 			}
752 		}
753 
754 		/* Select timed out. Check for clients that have been waiting too long for a response */
755 		if (select_status == 0) {
756 			time_t the_time = time(NULL);
757 
758 			for (thisfd = &local_client_head; thisfd != NULL;
759 			     thisfd = thisfd->next) {
760 				if (thisfd->type == LOCAL_SOCK
761 				    && thisfd->bits.localsock.sent_out
762 				    && thisfd->bits.localsock.sent_time +
763 				    cmd_timeout < the_time
764 				    && thisfd->bits.localsock.
765 				    expected_replies !=
766 				    thisfd->bits.localsock.num_replies) {
767 					/* Send timed out message + replies we already have */
768 					DEBUGLOG
769 					    ("Request timed-out (send: %ld, now: %ld)\n",
770 					     thisfd->bits.localsock.sent_time,
771 					     the_time);
772 
773 					thisfd->bits.localsock.all_success = 0;
774 
775 					request_timed_out(thisfd);
776 				}
777 			}
778 		}
779 		if (select_status < 0) {
780 			if (errno == EINTR)
781 				continue;
782 
783 #ifdef DEBUG
784 			perror("select error");
785 			exit(-1);
786 #endif
787 		}
788 	}
789 
790       closedown:
791 	clops->cluster_closedown();
792 	close(local_sock);
793 }
794 
795 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
796 {
797 	int child_status;
798 	int sstat;
799 	fd_set fds;
800 	struct timeval tv = {timeout, 0};
801 
802 	FD_ZERO(&fds);
803 	FD_SET(c_pipe, &fds);
804 
805 	sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
806 	if (sstat == 0) {
807 		fprintf(stderr, "clvmd startup timed out\n");
808 		exit(DFAIL_TIMEOUT);
809 	}
810 	if (sstat == 1) {
811 		if (read(c_pipe, &child_status, sizeof(child_status)) !=
812 		    sizeof(child_status)) {
813 
814 			fprintf(stderr, "clvmd failed in initialisation\n");
815 			exit(DFAIL_INIT);
816 		}
817 		else {
818 			switch (child_status) {
819 			case SUCCESS:
820 				break;
821 			case DFAIL_INIT:
822 				fprintf(stderr, "clvmd failed in initialisation\n");
823 				break;
824 			case DFAIL_LOCAL_SOCK:
825 				fprintf(stderr, "clvmd could not create local socket\n");
826 				fprintf(stderr, "Another clvmd is probably already running\n");
827 				break;
828 			case DFAIL_CLUSTER_IF:
829 				fprintf(stderr, "clvmd could not connect to cluster manager\n");
830 				fprintf(stderr, "Consult syslog for more information\n");
831 				break;
832 			case DFAIL_MALLOC:
833 				fprintf(stderr, "clvmd failed, not enough memory\n");
834 				break;
835 			default:
836 				fprintf(stderr, "clvmd failed, error was %d\n", child_status);
837 				break;
838 			}
839 			exit(child_status);
840 		}
841 	}
842 	fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
843 	exit(DFAIL_INIT);
844 }
845 
846 /*
847  * Fork into the background and detach from our parent process.
848  * In the interests of user-friendliness we wait for the daemon
849  * to complete initialisation before returning its status
850  * the the user.
851  */
852 static void be_daemon(int timeout)
853 {
854         pid_t pid;
855 	int devnull = open("/dev/null", O_RDWR);
856 	if (devnull == -1) {
857 		perror("Can't open /dev/null");
858 		exit(3);
859 	}
860 
861 	pipe(child_pipe);
862 
863 	switch (pid = fork()) {
864 	case -1:
865 		perror("clvmd: can't fork");
866 		exit(2);
867 
868 	case 0:		/* Child */
869 	        close(child_pipe[0]);
870 		break;
871 
872 	default:       /* Parent */
873 		close(child_pipe[1]);
874 		wait_for_child(child_pipe[0], timeout);
875 	}
876 
877 	/* Detach ourself from the calling environment */
878 	if (close(0) || close(1) || close(2)) {
879 		perror("Error closing terminal FDs");
880 		exit(4);
881 	}
882 	setsid();
883 
884 	if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
885 	    || dup2(devnull, 2) < 0) {
886 		perror("Error setting terminal FDs to /dev/null");
887 		log_error("Error setting terminal FDs to /dev/null: %m");
888 		exit(5);
889 	}
890 	if (chdir("/")) {
891 		log_error("Error setting current directory to /: %m");
892 		exit(6);
893 	}
894 
895 }
896 
897 /* Called when we have a read from the local socket.
898    was in the main loop but it's grown up and is a big girl now */
899 static int read_from_local_sock(struct local_client *thisfd)
900 {
901 	int len;
902 	int argslen;
903 	int missing_len;
904 	char buffer[PIPE_BUF];
905 
906 	len = read(thisfd->fd, buffer, sizeof(buffer));
907 	if (len == -1 && errno == EINTR)
908 		return 1;
909 
910 	DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
911 
912 	/* EOF or error on socket */
913 	if (len <= 0) {
914 		int *status;
915 		int jstat;
916 
917 		DEBUGLOG("EOF on local socket: inprogress=%d\n",
918 			 thisfd->bits.localsock.in_progress);
919 
920 		thisfd->bits.localsock.finished = 1;
921 
922 		/* If the client went away in mid command then tidy up */
923 		if (thisfd->bits.localsock.in_progress) {
924 			pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
925 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
926 			thisfd->bits.localsock.state = POST_COMMAND;
927 			pthread_cond_signal(&thisfd->bits.localsock.cond);
928 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
929 
930 			/* Free any unsent buffers */
931 			free_reply(thisfd);
932 		}
933 
934 		/* Kill the subthread & free resources */
935 		if (thisfd->bits.localsock.threadid) {
936 			DEBUGLOG("Waiting for child thread\n");
937 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
938 			thisfd->bits.localsock.state = PRE_COMMAND;
939 			pthread_cond_signal(&thisfd->bits.localsock.cond);
940 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
941 
942 			jstat =
943 			    pthread_join(thisfd->bits.localsock.threadid,
944 					 (void **) &status);
945 			DEBUGLOG("Joined child thread\n");
946 
947 			thisfd->bits.localsock.threadid = 0;
948 			pthread_cond_destroy(&thisfd->bits.localsock.cond);
949 			pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
950 
951 			/* Remove the pipe client */
952 			if (thisfd->bits.localsock.pipe_client != NULL) {
953 				struct local_client *newfd;
954 				struct local_client *lastfd = NULL;
955 				struct local_client *free_fd = NULL;
956 
957 				close(thisfd->bits.localsock.pipe_client->fd);	/* Close pipe */
958 				close(thisfd->bits.localsock.pipe);
959 
960 				/* Remove pipe client */
961 				for (newfd = &local_client_head; newfd != NULL;
962 				     newfd = newfd->next) {
963 					if (thisfd->bits.localsock.
964 					    pipe_client == newfd) {
965 						thisfd->bits.localsock.
966 						    pipe_client = NULL;
967 
968 						lastfd->next = newfd->next;
969 						free_fd = newfd;
970 						newfd->next = lastfd;
971 						free(free_fd);
972 						break;
973 					}
974 					lastfd = newfd;
975 				}
976 			}
977 		}
978 
979 		/* Free the command buffer */
980 		free(thisfd->bits.localsock.cmd);
981 
982 		/* Clear out the cross-link */
983 		if (thisfd->bits.localsock.pipe_client != NULL)
984 			thisfd->bits.localsock.pipe_client->bits.pipe.client =
985 			    NULL;
986 
987 		close(thisfd->fd);
988 		return 0;
989 	} else {
990 		int comms_pipe[2];
991 		struct local_client *newfd;
992 		char csid[MAX_CSID_LEN];
993 		struct clvm_header *inheader;
994 		int status;
995 
996 		inheader = (struct clvm_header *) buffer;
997 
998 		/* Fill in the client ID */
999 		inheader->clientid = htonl(thisfd->fd);
1000 
1001 		/* If we are already busy then return an error */
1002 		if (thisfd->bits.localsock.in_progress) {
1003 			struct clvm_header reply;
1004 			reply.cmd = CLVMD_CMD_REPLY;
1005 			reply.status = EBUSY;
1006 			reply.arglen = 0;
1007 			reply.flags = 0;
1008 			send_message(&reply, sizeof(reply), our_csid,
1009 				     thisfd->fd,
1010 				     "Error sending EBUSY reply to local user");
1011 			return len;
1012 		}
1013 
1014 		/* Free any old buffer space */
1015 		free(thisfd->bits.localsock.cmd);
1016 
1017 		/* See if we have the whole message */
1018 		argslen =
1019 		    len - strlen(inheader->node) - sizeof(struct clvm_header);
1020 		missing_len = inheader->arglen - argslen;
1021 
1022 		if (missing_len < 0)
1023 			missing_len = 0;
1024 
1025 		/* Save the message */
1026 		thisfd->bits.localsock.cmd = malloc(len + missing_len);
1027 
1028 		if (!thisfd->bits.localsock.cmd) {
1029 			struct clvm_header reply;
1030 			reply.cmd = CLVMD_CMD_REPLY;
1031 			reply.status = ENOMEM;
1032 			reply.arglen = 0;
1033 			reply.flags = 0;
1034 			send_message(&reply, sizeof(reply), our_csid,
1035 				     thisfd->fd,
1036 				     "Error sending ENOMEM reply to local user");
1037 			return 0;
1038 		}
1039 		memcpy(thisfd->bits.localsock.cmd, buffer, len);
1040 		thisfd->bits.localsock.cmd_len = len + missing_len;
1041 		inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1042 
1043 		/* If we don't have the full message then read the rest now */
1044 		if (missing_len) {
1045 			char *argptr =
1046 			    inheader->node + strlen(inheader->node) + 1;
1047 
1048 			while (missing_len > 0 && len >= 0) {
1049 				DEBUGLOG
1050 				    ("got %d bytes, need another %d (total %d)\n",
1051 				     argslen, missing_len, inheader->arglen);
1052 				len = read(thisfd->fd, argptr + argslen,
1053 					   missing_len);
1054 				if (len >= 0) {
1055 					missing_len -= len;
1056 					argslen += len;
1057 				}
1058 			}
1059 		}
1060 
1061 		/* Initialise and lock the mutex so the subthread will wait after
1062 		   finishing the PRE routine */
1063 		if (!thisfd->bits.localsock.threadid) {
1064 			pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1065 			pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1066 			pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1067 		}
1068 
1069 		/* Only run the command if all the cluster nodes are running CLVMD */
1070 		if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1071 		    (check_all_clvmds_running(thisfd) == -1)) {
1072 			thisfd->bits.localsock.expected_replies = 0;
1073 			thisfd->bits.localsock.num_replies = 0;
1074 			send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1075 			return len;
1076 		}
1077 
1078 		/* Check the node name for validity */
1079 		if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1080 			/* Error, node is not in the cluster */
1081 			struct clvm_header reply;
1082 			DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1083 
1084 			reply.cmd = CLVMD_CMD_REPLY;
1085 			reply.status = ENOENT;
1086 			reply.flags = 0;
1087 			reply.arglen = 0;
1088 			send_message(&reply, sizeof(reply), our_csid,
1089 				     thisfd->fd,
1090 				     "Error sending ENOENT reply to local user");
1091 			thisfd->bits.localsock.expected_replies = 0;
1092 			thisfd->bits.localsock.num_replies = 0;
1093 			thisfd->bits.localsock.in_progress = FALSE;
1094 			thisfd->bits.localsock.sent_out = FALSE;
1095 			return len;
1096 		}
1097 
1098 		/* If we already have a subthread then just signal it to start */
1099 		if (thisfd->bits.localsock.threadid) {
1100 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1101 			thisfd->bits.localsock.state = PRE_COMMAND;
1102 			pthread_cond_signal(&thisfd->bits.localsock.cond);
1103 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1104 			return len;
1105 		}
1106 
1107 		/* Create a pipe and add the reading end to our FD list */
1108 		pipe(comms_pipe);
1109 		newfd = malloc(sizeof(struct local_client));
1110 		if (!newfd) {
1111 			struct clvm_header reply;
1112 			close(comms_pipe[0]);
1113 			close(comms_pipe[1]);
1114 
1115 			reply.cmd = CLVMD_CMD_REPLY;
1116 			reply.status = ENOMEM;
1117 			reply.arglen = 0;
1118 			reply.flags = 0;
1119 			send_message(&reply, sizeof(reply), our_csid,
1120 				     thisfd->fd,
1121 				     "Error sending ENOMEM reply to local user");
1122 			return len;
1123 		}
1124 		DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1125 			 comms_pipe[1]);
1126 		newfd->fd = comms_pipe[0];
1127 		newfd->removeme = 0;
1128 		newfd->type = THREAD_PIPE;
1129 		newfd->callback = local_pipe_callback;
1130 		newfd->next = thisfd->next;
1131 		newfd->bits.pipe.client = thisfd;
1132 		newfd->bits.pipe.threadid = 0;
1133 		thisfd->next = newfd;
1134 
1135 		/* Store a cross link to the pipe */
1136 		thisfd->bits.localsock.pipe_client = newfd;
1137 
1138 		thisfd->bits.localsock.pipe = comms_pipe[1];
1139 
1140 		/* Make sure the thread has a copy of it's own ID */
1141 		newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1142 
1143 		/* Run the pre routine */
1144 		thisfd->bits.localsock.in_progress = TRUE;
1145 		thisfd->bits.localsock.state = PRE_COMMAND;
1146 		DEBUGLOG("Creating pre&post thread\n");
1147 		status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1148 			       pre_and_post_thread, thisfd);
1149 		DEBUGLOG("Created pre&post thread, state = %d\n", status);
1150 	}
1151 	return len;
1152 }
1153 
1154 /* Add a file descriptor from the cluster or comms interface to
1155    our list of FDs for select
1156 */
1157 int add_client(struct local_client *new_client)
1158 {
1159 	new_client->next = local_client_head.next;
1160 	local_client_head.next = new_client;
1161 
1162 	return 0;
1163 }
1164 
1165 /* Called when the pre-command has completed successfully - we
1166    now execute the real command on all the requested nodes */
1167 static int distribute_command(struct local_client *thisfd)
1168 {
1169 	struct clvm_header *inheader =
1170 	    (struct clvm_header *) thisfd->bits.localsock.cmd;
1171 	int len = thisfd->bits.localsock.cmd_len;
1172 
1173 	thisfd->xid = global_xid++;
1174 	DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1175 
1176 	/* Forward it to other nodes in the cluster if needed */
1177 	if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1178 		/* if node is empty then do it on the whole cluster */
1179 		if (inheader->node[0] == '\0') {
1180 			thisfd->bits.localsock.expected_replies =
1181 			    clops->get_num_nodes();
1182 			thisfd->bits.localsock.num_replies = 0;
1183 			thisfd->bits.localsock.sent_time = time(NULL);
1184 			thisfd->bits.localsock.in_progress = TRUE;
1185 			thisfd->bits.localsock.sent_out = TRUE;
1186 
1187 			/* Do it here first */
1188 			add_to_lvmqueue(thisfd, inheader, len, NULL);
1189 
1190 			DEBUGLOG("Sending message to all cluster nodes\n");
1191 			inheader->xid = thisfd->xid;
1192 			send_message(inheader, len, NULL, -1,
1193 				     "Error forwarding message to cluster");
1194 		} else {
1195                         /* Do it on a single node */
1196 			char csid[MAX_CSID_LEN];
1197 
1198 			if (clops->csid_from_name(csid, inheader->node)) {
1199 				/* This has already been checked so should not happen */
1200 				return 0;
1201 			} else {
1202 			        /* OK, found a node... */
1203 				thisfd->bits.localsock.expected_replies = 1;
1204 				thisfd->bits.localsock.num_replies = 0;
1205 				thisfd->bits.localsock.in_progress = TRUE;
1206 
1207 				/* Are we the requested node ?? */
1208 				if (memcmp(csid, our_csid, max_csid_len) == 0) {
1209 					DEBUGLOG("Doing command on local node only\n");
1210 					add_to_lvmqueue(thisfd, inheader, len, NULL);
1211 				} else {
1212 					DEBUGLOG("Sending message to single node: %s\n",
1213 						 inheader->node);
1214 					inheader->xid = thisfd->xid;
1215 					send_message(inheader, len,
1216 						     csid, -1,
1217 						     "Error forwarding message to cluster node");
1218 				}
1219 			}
1220 		}
1221 	} else {
1222 		/* Local explicitly requested, ignore nodes */
1223 		thisfd->bits.localsock.in_progress = TRUE;
1224 		thisfd->bits.localsock.expected_replies = 1;
1225 		thisfd->bits.localsock.num_replies = 0;
1226 		add_to_lvmqueue(thisfd, inheader, len, NULL);
1227 	}
1228 	return 0;
1229 }
1230 
1231 /* Process a command from a remote node and return the result */
1232 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1233 			    	   const char *csid)
1234 {
1235 	char *replyargs;
1236 	char nodename[max_cluster_member_name_len];
1237 	int replylen = 0;
1238 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1239 	int status;
1240 	int msg_malloced = 0;
1241 
1242 	/* Get the node name as we /may/ need it later */
1243 	clops->name_from_csid(csid, nodename);
1244 
1245 	DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1246 		 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1247 
1248 	/* Check for GOAWAY and sulk */
1249 	if (msg->cmd == CLVMD_CMD_GOAWAY) {
1250 
1251 		DEBUGLOG("Told to go away by %s\n", nodename);
1252 		log_error("Told to go away by %s\n", nodename);
1253 		exit(99);
1254 	}
1255 
1256 	/* Version check is internal - don't bother exposing it in
1257 	   clvmd-command.c */
1258 	if (msg->cmd == CLVMD_CMD_VERSION) {
1259 		int version_nums[3];
1260 		char node[256];
1261 
1262 		memcpy(version_nums, msg->args, sizeof(version_nums));
1263 
1264 		clops->name_from_csid(csid, node);
1265 		DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1266 			 node,
1267 			 ntohl(version_nums[0]),
1268 			 ntohl(version_nums[1]), ntohl(version_nums[2]));
1269 
1270 		if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1271 			struct clvm_header byebyemsg;
1272 			DEBUGLOG
1273 			    ("Telling node %s to go away because of incompatible version number\n",
1274 			     node);
1275 			log_notice
1276 			    ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1277 			     node, ntohl(version_nums[0]),
1278 			     ntohl(version_nums[1]), ntohl(version_nums[2]));
1279 
1280 			byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1281 			byebyemsg.status = 0;
1282 			byebyemsg.flags = 0;
1283 			byebyemsg.arglen = 0;
1284 			byebyemsg.clientid = 0;
1285 			clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1286 					     our_csid,
1287 					     "Error Sending GOAWAY message");
1288 		} else {
1289 			clops->add_up_node(csid);
1290 		}
1291 		return;
1292 	}
1293 
1294 	/* Allocate a default reply buffer */
1295 	replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1296 
1297 	if (replyargs != NULL) {
1298 		/* Run the command */
1299 		status =
1300 		    do_command(NULL, msg, msglen, &replyargs, buflen,
1301 			       &replylen);
1302 	} else {
1303 		status = ENOMEM;
1304 	}
1305 
1306 	/* If it wasn't a reply, then reply */
1307 	if (msg->cmd != CLVMD_CMD_REPLY) {
1308 		char *aggreply;
1309 
1310 		aggreply =
1311 		    realloc(replyargs, replylen + sizeof(struct clvm_header));
1312 		if (aggreply) {
1313 			struct clvm_header *agghead =
1314 			    (struct clvm_header *) aggreply;
1315 
1316 			replyargs = aggreply;
1317 			/* Move it up so there's room for a header in front of the data */
1318 			memmove(aggreply + offsetof(struct clvm_header, args),
1319 				replyargs, replylen);
1320 
1321 			agghead->xid = msg->xid;
1322 			agghead->cmd = CLVMD_CMD_REPLY;
1323 			agghead->status = status;
1324 			agghead->flags = 0;
1325 			agghead->clientid = msg->clientid;
1326 			agghead->arglen = replylen;
1327 			agghead->node[0] = '\0';
1328 			send_message(aggreply,
1329 				     sizeof(struct clvm_header) +
1330 				     replylen, csid, fd,
1331 				     "Error sending command reply");
1332 		} else {
1333 			struct clvm_header head;
1334 
1335 			DEBUGLOG("Error attempting to realloc return buffer\n");
1336 			/* Return a failure response */
1337 			head.cmd = CLVMD_CMD_REPLY;
1338 			head.status = ENOMEM;
1339 			head.flags = 0;
1340 			head.clientid = msg->clientid;
1341 			head.arglen = 0;
1342 			head.node[0] = '\0';
1343 			send_message(&head, sizeof(struct clvm_header), csid,
1344 				     fd, "Error sending ENOMEM command reply");
1345 			return;
1346 		}
1347 	}
1348 
1349 	/* Free buffer if it was malloced */
1350 	if (msg_malloced) {
1351 		free(msg);
1352 	}
1353 	free(replyargs);
1354 }
1355 
1356 /* Add a reply to a command to the list of replies for this client.
1357    If we have got a full set then send them to the waiting client down the local
1358    socket */
1359 static void add_reply_to_list(struct local_client *client, int status,
1360 			      const char *csid, const char *buf, int len)
1361 {
1362 	struct node_reply *reply;
1363 
1364 	pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1365 
1366 	/* Add it to the list of replies */
1367 	reply = malloc(sizeof(struct node_reply));
1368 	if (reply) {
1369 		reply->status = status;
1370 		clops->name_from_csid(csid, reply->node);
1371 		DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1372 
1373 		if (len > 0) {
1374 			reply->replymsg = malloc(len);
1375 			if (!reply->replymsg) {
1376 				reply->status = ENOMEM;
1377 			} else {
1378 				memcpy(reply->replymsg, buf, len);
1379 			}
1380 		} else {
1381 			reply->replymsg = NULL;
1382 		}
1383 		/* Hook it onto the reply chain */
1384 		reply->next = client->bits.localsock.replies;
1385 		client->bits.localsock.replies = reply;
1386 	} else {
1387 		/* It's all gone horribly wrong... */
1388 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1389 		send_local_reply(client, ENOMEM, client->fd);
1390 		return;
1391 	}
1392 	DEBUGLOG("Got %d replies, expecting: %d\n",
1393 		 client->bits.localsock.num_replies + 1,
1394 		 client->bits.localsock.expected_replies);
1395 
1396 	/* If we have the whole lot then do the post-process */
1397 	if (++client->bits.localsock.num_replies ==
1398 	    client->bits.localsock.expected_replies) {
1399 		/* Post-process the command */
1400 		if (client->bits.localsock.threadid) {
1401 			pthread_mutex_lock(&client->bits.localsock.mutex);
1402 			client->bits.localsock.state = POST_COMMAND;
1403 			pthread_cond_signal(&client->bits.localsock.cond);
1404 			pthread_mutex_unlock(&client->bits.localsock.mutex);
1405 		}
1406 	}
1407 	pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1408 }
1409 
/*
 * Thread that runs the PRE and POST commands for a particular connection.
 *
 * arg is the struct local_client for the connection.  For each command the
 * thread runs do_pre_command(), reports its status down the client's pipe,
 * waits to be told to run the post command, runs do_post_command(), reports
 * a zero status down the pipe, and then waits for the next pre command.
 * The loop ends when the client's 'finished' flag is set; the thread never
 * returns, it leaves via pthread_exit().
 */
static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
{
	struct local_client *client = (struct local_client *) arg;
	int status;
	int write_status;
	sigset_t ss;
	/* Pipe descriptor is read once here and reused for every status write */
	int pipe_fd = client->bits.localsock.pipe;

	DEBUGLOG("in sub thread: client = %p\n", client);

	/* Don't start until the LVM thread is ready: taking and immediately
	   dropping lvm_start_mutex blocks until the mutex is available
	   (lvm_thread_fn() unlocks it once init_lvm() has returned) */
	pthread_mutex_lock(&lvm_start_mutex);
	pthread_mutex_unlock(&lvm_start_mutex);
	DEBUGLOG("Sub thread ready for work.\n");

	/* Ignore SIGUSR1 (handled by master process) but enable
	   SIGUSR2 (kills subthreads) */
	sigemptyset(&ss);
	sigaddset(&ss, SIGUSR1);
	pthread_sigmask(SIG_BLOCK, &ss, NULL);

	/* Reuse the same set: drop SIGUSR1, add SIGUSR2 and unblock it */
	sigdelset(&ss, SIGUSR1);
	sigaddset(&ss, SIGUSR2);
	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);

	/* Loop around doing PRE and POST functions until the client goes away */
	while (!client->bits.localsock.finished) {
		/* Execute the code */
		status = do_pre_command(client);

		/* A non-zero status is treated as failure of the pre command */
		if (status)
			client->bits.localsock.all_success = 0;

		DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);

		/* Tell the parent process we have finished this bit.
		   Retry while write() fails with EINTR or EAGAIN; any other
		   failure or short write is logged and then abandoned. */
		do {
			write_status = write(pipe_fd, &status, sizeof(int));
			if (write_status == sizeof(int))
				break;
			if (write_status < 0 &&
			    (errno == EINTR || errno == EAGAIN))
				continue;
			log_error("Error sending to pipe: %m\n");
			break;
		} while(1);

		/* The pre command failed: skip the post command entirely and
		   go straight to waiting for the next pre command */
		if (status) {
			client->bits.localsock.state = POST_COMMAND;
			goto next_pre;
		}

		/* We may need to wait for the condition variable before running the post command */
		pthread_mutex_lock(&client->bits.localsock.mutex);
		DEBUGLOG("Waiting to do post command - state = %d\n",
			 client->bits.localsock.state);

		/* NOTE(review): the state is tested once, not re-checked after
		   pthread_cond_wait() returns, so any wakeup lets the post
		   command run.  Whether the client-teardown path relies on
		   that is not visible from here - confirm before changing. */
		if (client->bits.localsock.state != POST_COMMAND) {
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}
		pthread_mutex_unlock(&client->bits.localsock.mutex);

		DEBUGLOG("Got post command condition...\n");

		/* POST function must always run, even if the client aborts */
		status = 0;
		do_post_command(client);

		/* Report completion of the post command (status is always 0
		   here), with the same retry rules as the write above */
		do {
			write_status = write(pipe_fd, &status, sizeof(int));
			if (write_status == sizeof(int))
				break;
			if (write_status < 0 &&
			    (errno == EINTR || errno == EAGAIN))
				continue;
			log_error("Error sending to pipe: %m\n");
			break;
		} while(1);
next_pre:
		DEBUGLOG("Waiting for next pre command\n");

		/* Sleep until signalled, unless a new pre command has already
		   been requested or the client has been marked finished */
		pthread_mutex_lock(&client->bits.localsock.mutex);
		if (client->bits.localsock.state != PRE_COMMAND &&
		    !client->bits.localsock.finished) {
			pthread_cond_wait(&client->bits.localsock.cond,
					  &client->bits.localsock.mutex);
		}
		pthread_mutex_unlock(&client->bits.localsock.mutex);

		DEBUGLOG("Got pre command condition...\n");
	}
	DEBUGLOG("Subthread finished\n");
	pthread_exit((void *) 0);
}
1506 
1507 /* Process a command on the local node and store the result */
1508 static int process_local_command(struct clvm_header *msg, int msglen,
1509 				 struct local_client *client,
1510 				 unsigned short xid)
1511 {
1512 	char *replybuf = malloc(max_cluster_message);
1513 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1514 	int replylen = 0;
1515 	int status;
1516 
1517 	DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1518 		 decode_cmd(msg->cmd), msg, msglen, client);
1519 
1520 	if (replybuf == NULL)
1521 		return -1;
1522 
1523 	status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1524 
1525 	if (status)
1526 		client->bits.localsock.all_success = 0;
1527 
1528 	/* If we took too long then discard the reply */
1529 	if (xid == client->xid) {
1530 		add_reply_to_list(client, status, our_csid, replybuf, replylen);
1531 	} else {
1532 		DEBUGLOG
1533 		    ("Local command took too long, discarding xid %d, current is %d\n",
1534 		     xid, client->xid);
1535 	}
1536 
1537 	free(replybuf);
1538 	return status;
1539 }
1540 
1541 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1542 {
1543 	struct local_client *client = NULL;
1544 
1545 	client = find_client(msg->clientid);
1546 	if (!client) {
1547 		DEBUGLOG("Got message for unknown client 0x%x\n",
1548 			 msg->clientid);
1549 		log_error("Got message for unknown client 0x%x\n",
1550 			  msg->clientid);
1551 		return -1;
1552 	}
1553 
1554 	if (msg->status)
1555 		client->bits.localsock.all_success = 0;
1556 
1557 	/* Gather replies together for this client id */
1558 	if (msg->xid == client->xid) {
1559 		add_reply_to_list(client, msg->status, csid, msg->args,
1560 				  msg->arglen);
1561 	} else {
1562 		DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1563 			 msg->xid, client->xid);
1564 	}
1565 	return 0;
1566 }
1567 
1568 /* Send an aggregated reply back to the client */
1569 static void send_local_reply(struct local_client *client, int status, int fd)
1570 {
1571 	struct clvm_header *clientreply;
1572 	struct node_reply *thisreply = client->bits.localsock.replies;
1573 	char *replybuf;
1574 	char *ptr;
1575 	int message_len = 0;
1576 
1577 	DEBUGLOG("Send local reply\n");
1578 
1579 	/* Work out the total size of the reply */
1580 	while (thisreply) {
1581 		if (thisreply->replymsg)
1582 			message_len += strlen(thisreply->replymsg) + 1;
1583 		else
1584 			message_len++;
1585 
1586 		message_len += strlen(thisreply->node) + 1 + sizeof(int);
1587 
1588 		thisreply = thisreply->next;
1589 	}
1590 
1591 	/* Add in the size of our header */
1592 	message_len = message_len + sizeof(struct clvm_header) + 1;
1593 	replybuf = malloc(message_len);
1594 
1595 	clientreply = (struct clvm_header *) replybuf;
1596 	clientreply->status = status;
1597 	clientreply->cmd = CLVMD_CMD_REPLY;
1598 	clientreply->node[0] = '\0';
1599 	clientreply->flags = 0;
1600 
1601 	ptr = clientreply->args;
1602 
1603 	/* Add in all the replies, and free them as we go */
1604 	thisreply = client->bits.localsock.replies;
1605 	while (thisreply) {
1606 		struct node_reply *tempreply = thisreply;
1607 
1608 		strcpy(ptr, thisreply->node);
1609 		ptr += strlen(thisreply->node) + 1;
1610 
1611 		if (thisreply->status)
1612 			clientreply->flags |= CLVMD_FLAG_NODEERRS;
1613 
1614 		memcpy(ptr, &thisreply->status, sizeof(int));
1615 		ptr += sizeof(int);
1616 
1617 		if (thisreply->replymsg) {
1618 			strcpy(ptr, thisreply->replymsg);
1619 			ptr += strlen(thisreply->replymsg) + 1;
1620 		} else {
1621 			ptr[0] = '\0';
1622 			ptr++;
1623 		}
1624 		thisreply = thisreply->next;
1625 
1626 		free(tempreply->replymsg);
1627 		free(tempreply);
1628 	}
1629 
1630 	/* Terminate with an empty node name */
1631 	*ptr = '\0';
1632 
1633 	clientreply->arglen = ptr - clientreply->args + 1;
1634 
1635 	/* And send it */
1636 	send_message(replybuf, message_len, our_csid, fd,
1637 		     "Error sending REPLY to client");
1638 	free(replybuf);
1639 
1640 	/* Reset comms variables */
1641 	client->bits.localsock.replies = NULL;
1642 	client->bits.localsock.expected_replies = 0;
1643 	client->bits.localsock.in_progress = FALSE;
1644 	client->bits.localsock.sent_out = FALSE;
1645 }
1646 
1647 /* Just free a reply chain baceuse it wasn't used. */
1648 static void free_reply(struct local_client *client)
1649 {
1650 	/* Add in all the replies, and free them as we go */
1651 	struct node_reply *thisreply = client->bits.localsock.replies;
1652 	while (thisreply) {
1653 		struct node_reply *tempreply = thisreply;
1654 
1655 		thisreply = thisreply->next;
1656 
1657 		free(tempreply->replymsg);
1658 		free(tempreply);
1659 	}
1660 	client->bits.localsock.replies = NULL;
1661 }
1662 
1663 /* Send our version number to the cluster */
1664 static void send_version_message()
1665 {
1666 	char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1667 	struct clvm_header *msg = (struct clvm_header *) message;
1668 	int version_nums[3];
1669 
1670 	msg->cmd = CLVMD_CMD_VERSION;
1671 	msg->status = 0;
1672 	msg->flags = 0;
1673 	msg->clientid = 0;
1674 	msg->arglen = sizeof(version_nums);
1675 
1676 	version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1677 	version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1678 	version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1679 
1680 	memcpy(&msg->args, version_nums, sizeof(version_nums));
1681 
1682 	hton_clvm(msg);
1683 
1684 	clops->cluster_send_message(message, sizeof(message), NULL,
1685 			     "Error Sending version number");
1686 }
1687 
1688 /* Send a message to either a local client or another server */
1689 static int send_message(void *buf, int msglen, const char *csid, int fd,
1690 			const char *errtext)
1691 {
1692 	int len = 0;
1693 	int saved_errno = 0;
1694 	struct timespec delay;
1695 	struct timespec remtime;
1696 
1697 	int retry_cnt = 0;
1698 
1699 	/* Send remote messages down the cluster socket */
1700 	if (csid == NULL || !ISLOCAL_CSID(csid)) {
1701 		hton_clvm((struct clvm_header *) buf);
1702 		return clops->cluster_send_message(buf, msglen, csid, errtext);
1703 	} else {
1704 		int ptr = 0;
1705 
1706 		/* Make sure it all goes */
1707 		do {
1708 			if (retry_cnt > MAX_RETRIES)
1709 			{
1710 				errno = saved_errno;
1711 				log_error("%s", errtext);
1712 				errno = saved_errno;
1713 				break;
1714 			}
1715 
1716 			len = write(fd, buf + ptr, msglen - ptr);
1717 
1718 			if (len <= 0) {
1719 				if (errno == EINTR)
1720 					continue;
1721 				if (errno == EAGAIN ||
1722 				    errno == EIO ||
1723 				    errno == ENOSPC) {
1724 					saved_errno = errno;
1725 					retry_cnt++;
1726 
1727 					delay.tv_sec = 0;
1728 					delay.tv_nsec = 100000;
1729 					remtime.tv_sec = 0;
1730 					remtime.tv_nsec = 0;
1731 					(void) nanosleep (&delay, &remtime);
1732 
1733 					continue;
1734 				}
1735 				log_error("%s", errtext);
1736 				break;
1737 			}
1738 			ptr += len;
1739 		} while (ptr < msglen);
1740 	}
1741 	return len;
1742 }
1743 
1744 static int process_work_item(struct lvm_thread_cmd *cmd)
1745 {
1746 	/* If msg is NULL then this is a cleanup request */
1747 	if (cmd->msg == NULL) {
1748 		DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1749 		cmd_client_cleanup(cmd->client);
1750 		free(cmd->client);
1751 		return 0;
1752 	}
1753 
1754 	if (!cmd->remote) {
1755 		DEBUGLOG("process_work_item: local\n");
1756 		process_local_command(cmd->msg, cmd->msglen, cmd->client,
1757 				      cmd->xid);
1758 	} else {
1759 		DEBUGLOG("process_work_item: remote\n");
1760 		process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1761 				       cmd->csid);
1762 	}
1763 	return 0;
1764 }
1765 
/*
 * Routine that runs in the "LVM thread".
 *
 * arg carries the using_gulm flag as an integer stored in the pointer
 * value; it is handed to init_lvm().  After initialisation the thread loops
 * forever, taking work items off lvm_cmd_head and running each one through
 * process_work_item().  It never returns.
 */
static __attribute__ ((noreturn)) void *lvm_thread_fn(void *arg)
{
	struct dm_list *cmdl, *tmp;
	sigset_t ss;
	int using_gulm = (int)(long)arg;

	DEBUGLOG("LVM thread function started\n");

	/* Ignore SIGUSR1 & 2 */
	sigemptyset(&ss);
	sigaddset(&ss, SIGUSR1);
	sigaddset(&ss, SIGUSR2);
	pthread_sigmask(SIG_BLOCK, &ss, NULL);

	/* Initialise the interface to liblvm */
	init_lvm(using_gulm);

	/* Allow others to get moving: pre_and_post_thread() blocks on this
	   mutex before doing any work.
	   NOTE(review): the matching lock is not in this function -
	   presumably taken by whoever starts this thread; confirm. */
	pthread_mutex_unlock(&lvm_start_mutex);

	/* Now wait for some actual work */
	for (;;) {
		DEBUGLOG("LVM thread waiting for work\n");

		pthread_mutex_lock(&lvm_thread_mutex);
		/* Sleep only if the queue is empty.  If the wait returns with
		   nothing queued, the iteration below does nothing and the
		   outer loop comes back round to wait again. */
		if (dm_list_empty(&lvm_cmd_head))
			pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);

		dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
			struct lvm_thread_cmd *cmd;

			cmd =
			    dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
			/* Unlink the item while the queue lock is held, then
			   drop the lock so that add_to_lvmqueue() is not
			   blocked while the command runs */
			dm_list_del(&cmd->list);
			pthread_mutex_unlock(&lvm_thread_mutex);

			process_work_item(cmd);
			/* The queue entry and its copy of the message were
			   allocated by add_to_lvmqueue(); release them here */
			free(cmd->msg);
			free(cmd);

			/* Retake the lock before touching the list again */
			pthread_mutex_lock(&lvm_thread_mutex);
		}
		pthread_mutex_unlock(&lvm_thread_mutex);
	}
}
1814 
1815 /* Pass down some work to the LVM thread */
1816 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1817 			   int msglen, const char *csid)
1818 {
1819 	struct lvm_thread_cmd *cmd;
1820 
1821 	cmd = malloc(sizeof(struct lvm_thread_cmd));
1822 	if (!cmd)
1823 		return ENOMEM;
1824 
1825 	if (msglen) {
1826 		cmd->msg = malloc(msglen);
1827 		if (!cmd->msg) {
1828 			log_error("Unable to allocate buffer space\n");
1829 			free(cmd);
1830 			return -1;
1831 		}
1832 		memcpy(cmd->msg, msg, msglen);
1833 	}
1834 	else {
1835 		cmd->msg = NULL;
1836 	}
1837 	cmd->client = client;
1838 	cmd->msglen = msglen;
1839 	cmd->xid = client->xid;
1840 
1841 	if (csid) {
1842 		memcpy(cmd->csid, csid, max_csid_len);
1843 		cmd->remote = 1;
1844 	} else {
1845 		cmd->remote = 0;
1846 	}
1847 
1848 	DEBUGLOG
1849 	    ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1850 	     cmd, client, msg, msglen, csid, cmd->xid);
1851 	pthread_mutex_lock(&lvm_thread_mutex);
1852 	dm_list_add(&lvm_cmd_head, &cmd->list);
1853 	pthread_cond_signal(&lvm_thread_cond);
1854 	pthread_mutex_unlock(&lvm_thread_mutex);
1855 
1856 	return 0;
1857 }
1858 
1859 /* Return 0 if we can talk to an existing clvmd */
1860 static int check_local_clvmd(void)
1861 {
1862 	int local_socket;
1863 	struct sockaddr_un sockaddr;
1864 	int ret = 0;
1865 
1866 	/* Open local socket */
1867 	if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
1868 		return -1;
1869 	}
1870 
1871 	memset(&sockaddr, 0, sizeof(sockaddr));
1872 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1873 	sockaddr.sun_family = AF_UNIX;
1874 
1875 	if (connect(local_socket,(struct sockaddr *) &sockaddr,
1876 		    sizeof(sockaddr))) {
1877 		ret = -1;
1878 	}
1879 
1880 	close(local_socket);
1881 	return ret;
1882 }
1883 
1884 
1885 /* Open the local socket, that's the one we talk to libclvm down */
1886 static int open_local_sock()
1887 {
1888 	int local_socket;
1889 	struct sockaddr_un sockaddr;
1890 
1891 	/* Open local socket */
1892 	if (CLVMD_SOCKNAME[0] != '\0')
1893 		unlink(CLVMD_SOCKNAME);
1894 	local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
1895 	if (local_socket < 0) {
1896 		log_error("Can't create local socket: %m");
1897 		return -1;
1898 	}
1899 	/* Set Close-on-exec & non-blocking */
1900 	fcntl(local_socket, F_SETFD, 1);
1901 	fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
1902 
1903 	memset(&sockaddr, 0, sizeof(sockaddr));
1904 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1905 	sockaddr.sun_family = AF_UNIX;
1906 	if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
1907 		log_error("can't bind local socket: %m");
1908 		close(local_socket);
1909 		return -1;
1910 	}
1911 	if (listen(local_socket, 1) != 0) {
1912 		log_error("listen local: %m");
1913 		close(local_socket);
1914 		return -1;
1915 	}
1916 	if (CLVMD_SOCKNAME[0] != '\0')
1917 		chmod(CLVMD_SOCKNAME, 0600);
1918 
1919 	return local_socket;
1920 }
1921 
1922 void process_message(struct local_client *client, const char *buf, int len,
1923 		     const char *csid)
1924 {
1925 	struct clvm_header *inheader;
1926 
1927 	inheader = (struct clvm_header *) buf;
1928 	ntoh_clvm(inheader);	/* Byteswap fields */
1929 	if (inheader->cmd == CLVMD_CMD_REPLY)
1930 		process_reply(inheader, len, csid);
1931 	else
1932 		add_to_lvmqueue(client, inheader, len, csid);
1933 }
1934 
1935 
/* Per-node callback used by check_all_clvmds_running(): for a node that is
   not up, record an EHOSTDOWN reply against the client. */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	static const char not_running[] = "CLVMD not running";

	if (node_up)
		return;

	/* The length passed includes the terminating NUL */
	add_reply_to_list(client, EHOSTDOWN, csid, not_running,
			  sizeof(not_running));
}
1943 
1944 /* Check to see if all CLVMDs are running (ie one on
1945    every node in the cluster).
1946    If not, returns -1 and prints out a list of errant nodes */
1947 static int check_all_clvmds_running(struct local_client *client)
1948 {
1949 	DEBUGLOG("check_all_clvmds_running\n");
1950 	return clops->cluster_do_node_callback(client, check_all_callback);
1951 }
1952 
1953 /* Return a local_client struct given a client ID.
1954    client IDs are in network byte order */
1955 static struct local_client *find_client(int clientid)
1956 {
1957 	struct local_client *thisfd;
1958 	for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
1959 		if (thisfd->fd == ntohl(clientid))
1960 			return thisfd;
1961 	}
1962 	return NULL;
1963 }
1964 
1965 /* Byte-swapping routines for the header so we
1966    work in a heterogeneous environment */
1967 static void hton_clvm(struct clvm_header *hdr)
1968 {
1969 	hdr->status = htonl(hdr->status);
1970 	hdr->arglen = htonl(hdr->arglen);
1971 	hdr->xid = htons(hdr->xid);
1972 	/* Don't swap clientid as it's only a token as far as
1973 	   remote nodes are concerned */
1974 }
1975 
1976 static void ntoh_clvm(struct clvm_header *hdr)
1977 {
1978 	hdr->status = ntohl(hdr->status);
1979 	hdr->arglen = ntohl(hdr->arglen);
1980 	hdr->xid = ntohs(hdr->xid);
1981 }
1982 
/* Handler for SIGUSR2 - sent to kill subthreads.
   Nothing is done here beyond logging the signal. */
static void sigusr2_handler(int sig)
{
	(void) sig;

	DEBUGLOG("SIGUSR2 received\n");
}
1989 
1990 static void sigterm_handler(int sig)
1991 {
1992 	DEBUGLOG("SIGTERM received\n");
1993 	quit = 1;
1994 	return;
1995 }
1996 
1997 static void sighup_handler(int sig)
1998 {
1999         DEBUGLOG("got SIGHUP\n");
2000 	reread_config = 1;
2001 }
2002 
2003 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2004 {
2005 	return clops->sync_lock(resource, mode, flags, lockid);
2006 }
2007 
2008 int sync_unlock(const char *resource, int lockid)
2009 {
2010 	return clops->sync_unlock(resource, lockid);
2011 }
2012 
2013