xref: /dflybsd-src/sbin/hammer2/cmd_service.c (revision 9e1c08804a46f1c1a9cd11e190ddba7d2bc4abed)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "hammer2.h"
37 
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40 
41 struct hammer2_media_config {
42 	hammer2_volconf_t	copy_run;
43 	hammer2_volconf_t	copy_pend;
44 	pthread_t		thread;
45 	pthread_cond_t		cond;
46 	int			ctl;
47 	int			fd;
48 	int			pipefd[2];      /* signal stop */
49 	dmsg_iocom_t		iocom;
50 	pthread_t		iocom_thread;
51 	enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
52 };
53 
54 typedef struct hammer2_media_config hammer2_media_config_t;
55 
56 #define H2CONFCTL_STOP          0x00000001
57 #define H2CONFCTL_UPDATE        0x00000002
58 
59 struct diskcon {
60 	TAILQ_ENTRY(diskcon) entry;
61 	char	*disk;
62 };
63 
64 struct service_node_opaque {
65 	char	cl_label[64];
66 	char	fs_label[64];
67 	dmsg_media_block_t block;
68 	int	attached;
69 	int	servicing;
70 	int	servicefd;
71 };
72 
73 struct autoconn {
74 	TAILQ_ENTRY(autoconn) entry;
75 	char	*host;
76 	int	stage;
77 	int	stopme;
78 	int	pipefd[2];	/* {read,write} */
79 	enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
80 	pthread_t thread;
81 };
82 
83 #define WS " \r\n"
84 
85 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
86 static pthread_mutex_t diskmtx;
87 static pthread_mutex_t confmtx;
88 
89 static void *service_thread(void *data);
90 static void *udev_thread(void *data);
91 static void *autoconn_thread(void *data);
92 static void master_reconnect(const char *mntpt);
93 static void disk_reconnect(const char *disk);
94 static void disk_disconnect(void *handle);
95 static void udev_check_disks(void);
96 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
97 static void hammer2_node_handler(void **opaque, struct dmsg_msg *msg, int op);
98 static void *hammer2_volconf_thread(void *info);
99 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
100 static void hammer2_volconf_start(hammer2_media_config_t *conf,
101 			const char *hostname);
102 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
103 
104 
105 static void xdisk_reconnect(struct service_node_opaque *info);
106 static void xdisk_disconnect(void *handle);
107 static void *xdisk_attach_tmpthread(void *data);
108 
109 /*
110  * Start-up the master listener daemon for the machine.  This daemon runs
111  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
112  * and directories for work.
113  *
114  * --
115  *
116  * The only purpose for the UDP discovery protocol is to determine what
117  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
118  * required to operate, but hostnames (if assigned) must be unique.  If
119  * no hostname is assigned the host's IP is used as the name.  This name
120  * is broadcast along with the mtime of the originator's private key.
121  *
122  * Receiving hammer2 service daemons which are able to match the label against
123  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
124  * to the target.  Removal of the file will cause a disconnection.  A failed
125  * public key negotiation stops further connection attempts until either the
126  * file is updated or the remote mtime is updated.
127  *
128  * Generally speaking this results in a web of connections, typically a
129  * combination of point-to-point for the more important links and relayed
130  * (spanning tree) for less important or filtered links.
131  *
132  * --
133  *
134  * The TCP listener serves as a rendezvous point in the cluster, accepting
135  * connections, performing registrations and authentications, maintaining
136  * the spanning tree, and keeping track of message state so disconnects can
137  * be handled properly.
138  *
139  * Once authenticated only low-level messaging protocols (which includes
140  * tracking persistent messages) are handled by this daemon.  This daemon
141  * does not run the higher level quorum or locking protocols.
142  *
143  * --
144  *
145  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
146  * to connect to (which do not have to be on the local lan).  This list will
147  * be retried until a connection can be established.  The file is not usually
148  * needed for linkages local to the LAN.
149  */
150 int
151 cmd_service(void)
152 {
153 	struct sockaddr_in lsin;
154 	int on;
155 	int lfd;
156 
157 	/*
158 	 * Acquire socket and set options
159 	 */
160 	if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
161 		fprintf(stderr, "master_listen: socket(): %s\n",
162 			strerror(errno));
163 		return 1;
164 	}
165 	on = 1;
166 	setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
167 
168 	/*
169 	 * Setup listen port and try to bind.  If the bind fails we assume
170 	 * that a master listener process is already running and silently
171 	 * fail.
172 	 */
173 	bzero(&lsin, sizeof(lsin));
174 	lsin.sin_family = AF_INET;
175 	lsin.sin_addr.s_addr = INADDR_ANY;
176 	lsin.sin_port = htons(DMSG_LISTEN_PORT);
177 	if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
178 		close(lfd);
179 		if (QuietOpt == 0) {
180 			fprintf(stderr,
181 				"master listen: daemon already running\n");
182 		}
183 		return 0;
184 	}
185 	if (QuietOpt == 0)
186 		fprintf(stderr, "master listen: startup\n");
187 	listen(lfd, 50);
188 
189 	/*
190 	 * Fork and disconnect the controlling terminal and parent process,
191 	 * executing the specified function as a pthread.
192 	 *
193 	 * Returns to the original process which can then continue running.
194 	 * In debug mode this call will create the pthread without forking
195 	 * and set NormalExit to 0, instead of fork.
196 	 */
197 	hammer2_demon(service_thread, (void *)(intptr_t)lfd);
198 	if (NormalExit)
199 		close(lfd);
200 	return 0;
201 }
202 
203 /*
204  * Master listen/accept thread.  Accept connections on the master socket,
205  * starting a pthread for each one.
206  */
207 static
208 void *
209 service_thread(void *data)
210 {
211 	struct sockaddr_in asin;
212 	socklen_t alen;
213 	pthread_t thread;
214 	dmsg_master_service_info_t *info;
215 	int lfd = (int)(intptr_t)data;
216 	int fd;
217 	int i;
218 	int count;
219 	struct statfs *mntbuf = NULL;
220 	struct statvfs *mntvbuf = NULL;
221 
222 	/*
223 	 * Nobody waits for us
224 	 */
225 	setproctitle("hammer2 master listen");
226 	pthread_detach(pthread_self());
227 
228 	/*
229 	 * Start up a thread to handle block device monitoring
230 	 */
231 	thread = NULL;
232 	pthread_create(&thread, NULL, udev_thread, NULL);
233 
234 	/*
235 	 * Start thread to manage /etc/hammer2/autoconn
236 	 */
237 	thread = NULL;
238 	pthread_create(&thread, NULL, autoconn_thread, NULL);
239 
240 	/*
241 	 * Scan existing hammer2 mounts and reconnect to them using
242 	 * HAMMER2IOC_RECLUSTER.
243 	 */
244 	count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
245 	for (i = 0; i < count; ++i) {
246 		if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
247 			master_reconnect(mntbuf[i].f_mntonname);
248 	}
249 
250 	/*
251 	 * Accept connections and create pthreads to handle them after
252 	 * validating the IP.
253 	 */
254 	for (;;) {
255 		alen = sizeof(asin);
256 		fd = accept(lfd, (struct sockaddr *)&asin, &alen);
257 		if (fd < 0) {
258 			if (errno == EINTR)
259 				continue;
260 			break;
261 		}
262 		thread = NULL;
263 		fprintf(stderr, "service_thread: accept fd %d\n", fd);
264 		info = malloc(sizeof(*info));
265 		bzero(info, sizeof(*info));
266 		info->fd = fd;
267 		info->detachme = 1;
268 		info->usrmsg_callback = hammer2_usrmsg_handler;
269 		info->node_handler = hammer2_node_handler;
270 		info->label = strdup("client");
271 		pthread_create(&thread, NULL, dmsg_master_service, info);
272 	}
273 	return (NULL);
274 }
275 
276 /*
277  * Handle/Monitor the dmsg stream.  If unmanaged is set we are responsible
278  * for responding for the message, otherwise if it is not set libdmsg has
279  * already done some preprocessing and will respond to the message for us
280  * when we return.
281  *
282  * We primarily monitor for VOLCONFs
283  */
284 static
285 void
286 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
287 {
288 	dmsg_state_t *state;
289 	hammer2_media_config_t *conf;
290 	dmsg_lnk_hammer2_volconf_t *msgconf;
291 	int i;
292 
293 	/*
294 	 * Only process messages which are part of a LNK_CONN stream
295 	 */
296 	state = msg->state;
297 	if (state == NULL ||
298 	    (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
299 		hammer2_shell_parse(msg, unmanaged);
300 		return;
301 	}
302 
303 	switch(msg->tcmd) {
304 	case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
305 	case DMSG_LNK_CONN | DMSGF_DELETE:
306 	case DMSG_LNK_ERROR | DMSGF_DELETE:
307 		/*
308 		 * Deleting connection, clean out all volume configs
309 		 */
310 		if (state->media == NULL || state->media->usrhandle == NULL)
311 			break;
312 		conf = state->media->usrhandle;
313 		fprintf(stderr, "Shutting down media spans\n");
314 		for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
315 			if (conf[i].thread) {
316 				conf[i].ctl = H2CONFCTL_STOP;
317 				pthread_cond_signal(&conf[i].cond);
318 			}
319 		}
320 		for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
321 			if (conf[i].thread) {
322 				pthread_join(conf[i].thread, NULL);
323 				conf->thread = NULL;
324 				pthread_cond_destroy(&conf[i].cond);
325 			}
326 		}
327 		state->media->usrhandle = NULL;
328 		free(conf);
329 		break;
330 	case DMSG_LNK_HAMMER2_VOLCONF:
331 		/*
332 		 * One-way volume-configuration message is transmitted
333 		 * over the open LNK_CONN transaction.
334 		 */
335 		fprintf(stderr, "RECEIVED VOLCONF\n");
336 
337 		if ((conf = state->media->usrhandle) == NULL) {
338 			conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
339 			bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
340 			state->media->usrhandle = conf;
341 		}
342 		msgconf = H2_LNK_VOLCONF(msg);
343 
344 		if (msgconf->index < 0 ||
345 		    msgconf->index >= HAMMER2_COPYID_COUNT) {
346 			fprintf(stderr,
347 				"VOLCONF: ILLEGAL INDEX %d\n",
348 				msgconf->index);
349 			break;
350 		}
351 		if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
352 		    msgconf->copy.path[0] == 0) {
353 			fprintf(stderr,
354 				"VOLCONF: ILLEGAL PATH %d\n",
355 				msgconf->index);
356 			break;
357 		}
358 		conf += msgconf->index;
359 		pthread_mutex_lock(&confmtx);
360 		conf->copy_pend = msgconf->copy;
361 		conf->ctl |= H2CONFCTL_UPDATE;
362 		pthread_mutex_unlock(&confmtx);
363 		if (conf->thread == NULL) {
364 			fprintf(stderr, "VOLCONF THREAD STARTED\n");
365 			pthread_cond_init(&conf->cond, NULL);
366 			pthread_create(&conf->thread, NULL,
367 				       hammer2_volconf_thread, (void *)conf);
368 		}
369 		pthread_cond_signal(&conf->cond);
370 		break;
371 	default:
372 		if (unmanaged)
373 			dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
374 		break;
375 	}
376 }
377 
378 static void *
379 hammer2_volconf_thread(void *info)
380 {
381 	hammer2_media_config_t *conf = info;
382 
383 	pthread_mutex_lock(&confmtx);
384 	while ((conf->ctl & H2CONFCTL_STOP) == 0) {
385 		if (conf->ctl & H2CONFCTL_UPDATE) {
386 			fprintf(stderr, "VOLCONF UPDATE\n");
387 			conf->ctl &= ~H2CONFCTL_UPDATE;
388 			if (bcmp(&conf->copy_run, &conf->copy_pend,
389 				 sizeof(conf->copy_run)) == 0) {
390 				fprintf(stderr, "VOLCONF: no changes\n");
391 				continue;
392 			}
393 			/*
394 			 * XXX TODO - auto reconnect on lookup failure or
395 			 *		connect failure or stream failure.
396 			 */
397 
398 			pthread_mutex_unlock(&confmtx);
399 			hammer2_volconf_stop(conf);
400 			conf->copy_run = conf->copy_pend;
401 			if (conf->copy_run.copyid != 0 &&
402 			    strncmp(conf->copy_run.path, "span:", 5) == 0) {
403 				hammer2_volconf_start(conf,
404 						      conf->copy_run.path + 5);
405 			}
406 			pthread_mutex_lock(&confmtx);
407 			fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
408 		}
409 		if (conf->state == H2MC_CONNECT) {
410 			hammer2_volconf_start(conf, conf->copy_run.path + 5);
411 			pthread_mutex_unlock(&confmtx);
412 			sleep(5);
413 			pthread_mutex_lock(&confmtx);
414 		} else {
415 			pthread_cond_wait(&conf->cond, &confmtx);
416 		}
417 	}
418 	pthread_mutex_unlock(&confmtx);
419 	hammer2_volconf_stop(conf);
420 	return(NULL);
421 }
422 
423 static
424 void
425 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
426 {
427 	dmsg_master_service_info_t *info;
428 
429 	switch(conf->state) {
430 	case H2MC_STOPPED:
431 	case H2MC_CONNECT:
432 		conf->fd = dmsg_connect(hostname);
433 		if (conf->fd < 0) {
434 			fprintf(stderr, "Unable to connect to %s\n", hostname);
435 			conf->state = H2MC_CONNECT;
436 		} else if (pipe(conf->pipefd) < 0) {
437 			close(conf->fd);
438 			fprintf(stderr, "pipe() failed during volconf\n");
439 			conf->state = H2MC_CONNECT;
440 		} else {
441 			fprintf(stderr, "VOLCONF CONNECT\n");
442 			info = malloc(sizeof(*info));
443 			bzero(info, sizeof(*info));
444 			info->fd = conf->fd;
445 			info->altfd = conf->pipefd[0];
446 			info->altmsg_callback = hammer2_volconf_signal;
447 			info->usrmsg_callback = hammer2_usrmsg_handler;
448 			info->detachme = 0;
449 			conf->state = H2MC_RUNNING;
450 			pthread_create(&conf->iocom_thread, NULL,
451 				       dmsg_master_service, info);
452 		}
453 		break;
454 	case H2MC_RUNNING:
455 		break;
456 	}
457 }
458 
459 static
460 void
461 hammer2_volconf_stop(hammer2_media_config_t *conf)
462 {
463 	switch(conf->state) {
464 	case H2MC_STOPPED:
465 		break;
466 	case H2MC_CONNECT:
467 		conf->state = H2MC_STOPPED;
468 		break;
469 	case H2MC_RUNNING:
470 		close(conf->pipefd[1]);
471 		conf->pipefd[1] = -1;
472 		pthread_join(conf->iocom_thread, NULL);
473 		conf->iocom_thread = NULL;
474 		conf->state = H2MC_STOPPED;
475 		break;
476 	}
477 }
478 
479 static
480 void
481 hammer2_volconf_signal(dmsg_iocom_t *iocom)
482 {
483 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
484 }
485 
486 
487 /*
488  * Node discovery code on received SPANs (or loss of SPANs).  This code
489  * is used to track the availability of remote block devices and install
490  * or deinstall them using the xdisk driver (/dev/xdisk).
491  *
492  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
493  * the data handed to it.  When opened, a virtual circuit is forged and
494  * maintained to the block device server via DMSG.  Temporary failures
495  * stall the device until successfully reconnected or explicitly destroyed.
496  */
497 static
498 void
499 hammer2_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
500 {
501 	struct service_node_opaque *info = *opaquep;
502 
503 	switch(op) {
504 	case DMSG_NODEOP_ADD:
505 		if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
506 			break;
507 		if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
508 			break;
509 		if (info == NULL) {
510 			info = malloc(sizeof(*info));
511 			bzero(info, sizeof(*info));
512 			*opaquep = info;
513 		}
514 		snprintf(info->cl_label, sizeof(info->cl_label),
515 			 "%s", msg->any.lnk_span.cl_label);
516 		snprintf(info->fs_label, sizeof(info->fs_label),
517 			 "%s", msg->any.lnk_span.fs_label);
518 		info->block = msg->any.lnk_span.media.block;
519 		fprintf(stderr, "NODE ADD %s serno %s\n",
520 			info->cl_label, info->fs_label);
521 		info->attached = 1;
522 		xdisk_reconnect(info);
523 		break;
524 	case DMSG_NODEOP_DEL:
525 		if (info) {
526 			fprintf(stderr, "NODE DEL %s serno %s\n",
527 				info->cl_label, info->fs_label);
528 			pthread_mutex_lock(&diskmtx);
529 			*opaquep = NULL;
530 			info->attached = 0;
531 			if (info->servicing == 0)
532 				free(info);
533 			else
534 				shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
535 			pthread_mutex_unlock(&diskmtx);
536 		}
537 		break;
538 	default:
539 		break;
540 	}
541 }
542 
543 /*
544  * Monitor block devices.  Currently polls every ~10 seconds or so.
545  */
546 static
547 void *
548 udev_thread(void *data __unused)
549 {
550 	int	fd;
551 	int	seq = 0;
552 
553 	pthread_detach(pthread_self());
554 
555 	if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
556 		fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
557 			UDEV_DEVICE_PATH);
558 		pthread_exit(NULL);
559 	}
560 	udev_check_disks();
561 	while (ioctl(fd, UDEVWAIT, &seq) == 0) {
562 		udev_check_disks();
563 		sleep(1);
564 	}
565 	return (NULL);
566 }
567 
568 static void *autoconn_connect_thread(void *data);
569 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
570 
571 static
572 void *
573 autoconn_thread(void *data __unused)
574 {
575 	TAILQ_HEAD(, autoconn) autolist;
576 	struct autoconn *ac;
577 	struct autoconn *next;
578 	pthread_t thread;
579 	struct stat st;
580 	time_t	t;
581 	time_t	lmod;
582 	int	found_last;
583 	FILE	*fp;
584 	char	buf[256];
585 
586 	TAILQ_INIT(&autolist);
587 	found_last = 0;
588 	lmod = 0;
589 
590 	pthread_detach(pthread_self());
591 	for (;;) {
592 		/*
593 		 * Polling interval
594 		 */
595 		sleep(5);
596 
597 		/*
598 		 * Poll the file.  Loop up if the synchronized state (lmod)
599 		 * has not changed.
600 		 */
601 		if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
602 			if (lmod == st.st_mtime)
603 				continue;
604 			fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
605 			if (fp == NULL)
606 				continue;
607 		} else {
608 			if (lmod == 0)
609 				continue;
610 			fp = NULL;
611 		}
612 
613 		/*
614 		 * Wait at least 5 seconds after the file is created or
615 		 * removed.
616 		 *
617 		 * Do not update the synchronized state.
618 		 */
619 		if (fp == NULL && found_last) {
620 			found_last = 0;
621 			continue;
622 		} else if (fp && found_last == 0) {
623 			fclose(fp);
624 			found_last = 1;
625 			continue;
626 		}
627 
628 		/*
629 		 * Don't scan the file until the time progresses past the
630 		 * file's mtime, so we can validate that the file was not
631 		 * further modified during our scan.
632 		 *
633 		 * Do not update the synchronized state.
634 		 */
635 		time(&t);
636 		if (fp) {
637 			if (t == st.st_mtime) {
638 				fclose(fp);
639 				continue;
640 			}
641 			t = st.st_mtime;
642 		} else {
643 			t = 0;
644 		}
645 
646 		/*
647 		 * Set staging to disconnect, then scan the file.
648 		 */
649 		TAILQ_FOREACH(ac, &autolist, entry)
650 			ac->stage = 0;
651 		while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
652 			char *host;
653 
654 			if ((host = strtok(buf, " \t\r\n")) == NULL ||
655 			    host[0] == '#') {
656 				continue;
657 			}
658 			TAILQ_FOREACH(ac, &autolist, entry) {
659 				if (strcmp(host, ac->host) == 0)
660 					break;
661 			}
662 			if (ac == NULL) {
663 				ac = malloc(sizeof(*ac));
664 				bzero(ac, sizeof(*ac));
665 				ac->host = strdup(host);
666 				ac->state = AUTOCONN_INACTIVE;
667 				TAILQ_INSERT_TAIL(&autolist, ac, entry);
668 			}
669 			ac->stage = 1;
670 		}
671 
672 		/*
673 		 * Ignore the scan (and retry again) if the file was
674 		 * modified during the scan.
675 		 *
676 		 * Do not update the synchronized state.
677 		 */
678 		if (fp) {
679 			if (fstat(fileno(fp), &st) < 0) {
680 				fclose(fp);
681 				continue;
682 			}
683 			fclose(fp);
684 			if (t != st.st_mtime)
685 				continue;
686 		}
687 
688 		/*
689 		 * Update the synchronized state and reconfigure the
690 		 * connect list as needed.
691 		 */
692 		lmod = t;
693 		next = TAILQ_FIRST(&autolist);
694 		while ((ac = next) != NULL) {
695 			next = TAILQ_NEXT(ac, entry);
696 
697 			/*
698 			 * Staging, initiate
699 			 */
700 			if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
701 				if (pipe(ac->pipefd) == 0) {
702 					ac->stopme = 0;
703 					ac->state = AUTOCONN_ACTIVE;
704 					thread = NULL;
705 					pthread_create(&thread, NULL,
706 						       autoconn_connect_thread,
707 						       ac);
708 				}
709 			}
710 
711 			/*
712 			 * Unstaging, stop active connection.
713 			 *
714 			 * We write to the pipe which causes the iocom_core
715 			 * to call autoconn_disconnect_signal().
716 			 */
717 			if (ac->stage == 0 &&
718 			    ac->state == AUTOCONN_ACTIVE) {
719 				if (ac->stopme == 0) {
720 					char dummy = 0;
721 					ac->stopme = 1;
722 					write(ac->pipefd[1], &dummy, 1);
723 				}
724 			}
725 
726 			/*
727 			 * Unstaging, delete inactive connection.
728 			 */
729 			if (ac->stage == 0 &&
730 			    ac->state == AUTOCONN_INACTIVE) {
731 				TAILQ_REMOVE(&autolist, ac, entry);
732 				free(ac->host);
733 				free(ac);
734 				continue;
735 			}
736 		}
737 		sleep(5);
738 	}
739 	return(NULL);
740 }
741 
742 static
743 void *
744 autoconn_connect_thread(void *data)
745 {
746 	dmsg_master_service_info_t *info;
747 	struct autoconn *ac;
748 	void *res;
749 	int fd;
750 
751 	ac = data;
752 	pthread_detach(pthread_self());
753 
754 	while (ac->stopme == 0) {
755 		fd = dmsg_connect(ac->host);
756 		if (fd < 0) {
757 			fprintf(stderr, "autoconn: Connect failure: %s\n",
758 				ac->host);
759 			sleep(5);
760 			continue;
761 		}
762 		fprintf(stderr, "autoconn: Connect %s\n", ac->host);
763 
764 		info = malloc(sizeof(*info));
765 		bzero(info, sizeof(*info));
766 		info->fd = fd;
767 		info->altfd = ac->pipefd[0];
768 		info->altmsg_callback = autoconn_disconnect_signal;
769 		info->usrmsg_callback = hammer2_usrmsg_handler;
770 		info->detachme = 0;
771 		info->noclosealt = 1;
772 		pthread_create(&ac->thread, NULL, dmsg_master_service, info);
773 		pthread_join(ac->thread, &res);
774 	}
775 	close(ac->pipefd[0]);
776 	ac->state = AUTOCONN_INACTIVE;
777 	/* auto structure can be ripped out here */
778 	return(NULL);
779 }
780 
781 static
782 void
783 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
784 {
785 	fprintf(stderr, "autoconn: Shutting down socket\n");
786 	atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
787 }
788 
789 /*
790  * Retrieve the list of disk attachments and attempt to export
791  * them.
792  */
793 static
794 void
795 udev_check_disks(void)
796 {
797 	char tmpbuf[1024];
798 	char *buf = NULL;
799 	char *disk;
800 	int error;
801 	size_t n;
802 
803 	for (;;) {
804 		n = 0;
805 		error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
806 		if (error < 0 || n == 0)
807 			break;
808 		if (n >= sizeof(tmpbuf))
809 			buf = malloc(n + 1);
810 		else
811 			buf = tmpbuf;
812 		error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
813 		if (error == 0) {
814 			buf[n] = 0;
815 			break;
816 		}
817 		if (buf != tmpbuf) {
818 			free(buf);
819 			buf = NULL;
820 		}
821 		if (errno != ENOMEM)
822 			break;
823 	}
824 	if (buf) {
825 		fprintf(stderr, "DISKS: %s\n", buf);
826 		for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
827 			disk_reconnect(disk);
828 		}
829 		if (buf != tmpbuf)
830 			free(buf);
831 	}
832 }
833 
834 /*
835  * Normally the mount program supplies a cluster communications
836  * descriptor to the hammer2 vfs on mount, but if you kill the service
837  * daemon and restart it that link will be lost.
838  *
839  * This procedure attempts to [re]connect to existing mounts when
840  * the service daemon is started up before going into its accept
841  * loop.
842  *
843  * NOTE: A hammer2 mount point can only accomodate one connection at a time
844  *	 so this will disconnect any existing connection during the
845  *	 reconnect.
846  */
847 static
848 void
849 master_reconnect(const char *mntpt)
850 {
851 	struct hammer2_ioc_recluster recls;
852 	dmsg_master_service_info_t *info;
853 	pthread_t thread;
854 	int fd;
855 	int pipefds[2];
856 
857 	fd = open(mntpt, O_RDONLY);
858 	if (fd < 0) {
859 		fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
860 		return;
861 	}
862 	if (pipe(pipefds) < 0) {
863 		fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
864 		close(fd);
865 		return;
866 	}
867 	bzero(&recls, sizeof(recls));
868 	recls.fd = pipefds[0];
869 	if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
870 		fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
871 		close(pipefds[0]);
872 		close(pipefds[1]);
873 		close(fd);
874 		return;
875 	}
876 	close(pipefds[0]);
877 	close(fd);
878 
879 	info = malloc(sizeof(*info));
880 	bzero(info, sizeof(*info));
881 	info->fd = pipefds[1];
882 	info->detachme = 1;
883 	info->usrmsg_callback = hammer2_usrmsg_handler;
884 	info->node_handler = hammer2_node_handler;
885 	info->label = strdup("hammer2");
886 	pthread_create(&thread, NULL, dmsg_master_service, info);
887 }
888 
889 /*
890  * Reconnect a physical disk service to the mesh.
891  */
892 static
893 void
894 disk_reconnect(const char *disk)
895 {
896 	struct disk_ioc_recluster recls;
897 	struct diskcon *dc;
898 	dmsg_master_service_info_t *info;
899 	pthread_t thread;
900 	int fd;
901 	int pipefds[2];
902 	char *path;
903 
904 	/*
905 	 * Urm, this will auto-create mdX+1, just ignore for now.
906 	 * This mechanic needs to be fixed.  It might actually be nice
907 	 * to be able to export md disks.
908 	 */
909 	if (strncmp(disk, "md", 2) == 0)
910 		return;
911 	if (strncmp(disk, "xa", 2) == 0)
912 		return;
913 
914 	/*
915 	 * Check if already connected
916 	 */
917 	pthread_mutex_lock(&diskmtx);
918 	TAILQ_FOREACH(dc, &diskconq, entry) {
919 		if (strcmp(dc->disk, disk) == 0)
920 			break;
921 	}
922 	pthread_mutex_unlock(&diskmtx);
923 	if (dc)
924 		return;
925 
926 	/*
927 	 * Not already connected, create a connection to the kernel
928 	 * disk driver.
929 	 */
930 	asprintf(&path, "/dev/%s", disk);
931 	fd = open(path, O_RDONLY);
932 	if (fd < 0) {
933 		fprintf(stderr, "reconnect %s: no access to disk\n", disk);
934 		free(path);
935 		return;
936 	}
937 	free(path);
938 	if (pipe(pipefds) < 0) {
939 		fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
940 		close(fd);
941 		return;
942 	}
943 	bzero(&recls, sizeof(recls));
944 	recls.fd = pipefds[0];
945 	if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
946 		fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
947 		close(pipefds[0]);
948 		close(pipefds[1]);
949 		close(fd);
950 		return;
951 	}
952 	close(pipefds[0]);
953 	close(fd);
954 
955 	dc = malloc(sizeof(*dc));
956 	dc->disk = strdup(disk);
957 	pthread_mutex_lock(&diskmtx);
958 	TAILQ_INSERT_TAIL(&diskconq, dc, entry);
959 	pthread_mutex_unlock(&diskmtx);
960 
961 	info = malloc(sizeof(*info));
962 	bzero(info, sizeof(*info));
963 	info->fd = pipefds[1];
964 	info->detachme = 1;
965 	info->usrmsg_callback = hammer2_usrmsg_handler;
966 	info->node_handler = hammer2_node_handler;
967 	info->exit_callback = disk_disconnect;
968 	info->handle = dc;
969 	info->label = strdup(dc->disk);
970 	pthread_create(&thread, NULL, dmsg_master_service, info);
971 }
972 
973 static
974 void
975 disk_disconnect(void *handle)
976 {
977 	struct diskcon *dc = handle;
978 
979 	fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
980 
981 	pthread_mutex_lock(&diskmtx);
982 	TAILQ_REMOVE(&diskconq, dc, entry);
983 	pthread_mutex_unlock(&diskmtx);
984 	free(dc->disk);
985 	free(dc);
986 }
987 
988 /*
989  * [re]connect a remote disk service to the local system via /dev/xdisk.
990  */
991 static
992 void
993 xdisk_reconnect(struct service_node_opaque *xdisk)
994 {
995 	struct xdisk_attach_ioctl *xaioc;
996 	dmsg_master_service_info_t *info;
997 	pthread_t thread;
998 	int pipefds[2];
999 
1000 	if (pipe(pipefds) < 0) {
1001 		fprintf(stderr, "reconnect %s: pipe() failed\n",
1002 			xdisk->cl_label);
1003 		return;
1004 	}
1005 
1006 	info = malloc(sizeof(*info));
1007 	bzero(info, sizeof(*info));
1008 	info->fd = pipefds[1];
1009 	info->detachme = 1;
1010 	info->usrmsg_callback = hammer2_usrmsg_handler;
1011 	info->node_handler = hammer2_node_handler;
1012 	info->exit_callback = xdisk_disconnect;
1013 	info->handle = xdisk;
1014 	xdisk->servicing = 1;
1015 	xdisk->servicefd = info->fd;
1016 	info->label = strdup(xdisk->cl_label);
1017 	pthread_create(&thread, NULL, dmsg_master_service, info);
1018 
1019 	/*
1020 	 * We have to run the attach in its own pthread because it will
1021 	 * synchronously interact with the messaging subsystem over the
1022 	 * pipe.  If we do it here we will deadlock.
1023 	 */
1024 	xaioc = malloc(sizeof(*xaioc));
1025 	bzero(xaioc, sizeof(*xaioc));
1026 	snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
1027 		 "%s", xdisk->cl_label);
1028 	snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
1029 		 "X-%s", xdisk->fs_label);
1030 	xaioc->bytes = xdisk->block.bytes;
1031 	xaioc->blksize = xdisk->block.blksize;
1032 	xaioc->fd = pipefds[0];
1033 
1034 	pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
1035 }
1036 
1037 static
1038 void *
1039 xdisk_attach_tmpthread(void *data)
1040 {
1041 	struct xdisk_attach_ioctl *xaioc = data;
1042 	int fd;
1043 
1044 	pthread_detach(pthread_self());
1045 
1046 	fd = open("/dev/xdisk", O_RDWR, 0600);
1047 	if (fd < 0) {
1048 		fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
1049 	}
1050 	if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
1051 		fprintf(stderr, "reconnect %s: xdisk attach failed\n",
1052 			xaioc->cl_label);
1053 	}
1054 	close(xaioc->fd);
1055 	close(fd);
1056 	return (NULL);
1057 }
1058 
1059 static
1060 void
1061 xdisk_disconnect(void *handle)
1062 {
1063 	struct service_node_opaque *info = handle;
1064 
1065 	assert(info->servicing == 1);
1066 
1067 	pthread_mutex_lock(&diskmtx);
1068 	info->servicing = 0;
1069 	if (info->attached == 0)
1070 		free(info);
1071 	pthread_mutex_unlock(&diskmtx);
1072 }
1073