xref: /dflybsd-src/sbin/hammer2/cmd_service.c (revision fda7d3889b1114d34ad3a52a7257a2b80fe24e4c)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "hammer2.h"
37 
/*
 * Tracks one disk that currently has a connection to the kernel disk
 * driver.  Entries are linked onto diskconq by disk_reconnect() and
 * unlinked and freed by disk_disconnect().
 */
struct diskcon {
	TAILQ_ENTRY(diskcon) entry;	/* linkage on diskconq */
	char	*disk;			/* heap copy of the disk name */
};

/*
 * Delimiters used to split the kern.disks sysctl string into
 * individual disk names.
 */
#define WS " \r\n"

/*
 * List of connected disks and the mutex serializing access to it.
 *
 * The mutex is statically initialized so it is valid before the first
 * lock operation; nothing in this file calls pthread_mutex_init() on it.
 */
TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
pthread_mutex_t diskmtx = PTHREAD_MUTEX_INITIALIZER;
47 
/*
 * Forward declarations for the file-local routines defined below.
 */

/* pthread entry points */
static void *service_thread(void *data);
static void *udev_thread(void *data);

/* block device scanning */
static void udev_check_disks(void);

/* connection setup and teardown */
static void master_reconnect(const char *mntpt);
static void disk_reconnect(const char *disk);
static void disk_disconnect(void *handle);
54 
55 /*
56  * Start-up the master listener daemon for the machine.
57  *
58  * The master listener serves as a rendezvous point in the cluster, accepting
59  * connections, performing registrations and authentications, maintaining
60  * the spanning tree, and keeping track of message state so disconnects can
61  * be handled properly.
62  *
63  * Once authenticated only low-level messaging protocols (which includes
64  * tracking persistent messages) are handled by this daemon.  This daemon
65  * does not run the higher level quorum or locking protocols.
66  *
67  * This daemon can also be told to maintain connections to other nodes,
68  * forming a messaging backbone, which in turn allows PFS's (if desired) to
69  * simply connect to the master daemon via localhost if desired.
70  * Backbones are specified via /etc/hammer2.conf.
71  */
72 int
73 cmd_service(void)
74 {
75 	struct sockaddr_in lsin;
76 	int on;
77 	int lfd;
78 
79 	/*
80 	 * Acquire socket and set options
81 	 */
82 	if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
83 		fprintf(stderr, "master_listen: socket(): %s\n",
84 			strerror(errno));
85 		return 1;
86 	}
87 	on = 1;
88 	setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
89 
90 	/*
91 	 * Setup listen port and try to bind.  If the bind fails we assume
92 	 * that a master listener process is already running and silently
93 	 * fail.
94 	 */
95 	bzero(&lsin, sizeof(lsin));
96 	lsin.sin_family = AF_INET;
97 	lsin.sin_addr.s_addr = INADDR_ANY;
98 	lsin.sin_port = htons(DMSG_LISTEN_PORT);
99 	if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
100 		close(lfd);
101 		if (QuietOpt == 0) {
102 			fprintf(stderr,
103 				"master listen: daemon already running\n");
104 		}
105 		return 0;
106 	}
107 	if (QuietOpt == 0)
108 		fprintf(stderr, "master listen: startup\n");
109 	listen(lfd, 50);
110 
111 	/*
112 	 * Fork and disconnect the controlling terminal and parent process,
113 	 * executing the specified function as a pthread.
114 	 *
115 	 * Returns to the original process which can then continue running.
116 	 * In debug mode this call will create the pthread without forking
117 	 * and set NormalExit to 0, instead of fork.
118 	 */
119 	hammer2_demon(service_thread, (void *)(intptr_t)lfd);
120 	if (NormalExit)
121 		close(lfd);
122 	return 0;
123 }
124 
125 /*
126  * Master listen/accept thread.  Accept connections on the master socket,
127  * starting a pthread for each one.
128  */
129 static
130 void *
131 service_thread(void *data)
132 {
133 	struct sockaddr_in asin;
134 	socklen_t alen;
135 	pthread_t thread;
136 	dmsg_master_service_info_t *info;
137 	int lfd = (int)(intptr_t)data;
138 	int fd;
139 	int i;
140 	int count;
141 	struct statfs *mntbuf = NULL;
142 	struct statvfs *mntvbuf = NULL;
143 
144 	/*
145 	 * Nobody waits for us
146 	 */
147 	setproctitle("hammer2 master listen");
148 	pthread_detach(pthread_self());
149 
150 	/*
151 	 * Start up a thread to handle block device monitoring
152 	 */
153 	thread = NULL;
154 	pthread_create(&thread, NULL, udev_thread, NULL);
155 
156 	/*
157 	 * Scan existing hammer2 mounts and reconnect to them using
158 	 * HAMMER2IOC_RECLUSTER.
159 	 */
160 	count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
161 	for (i = 0; i < count; ++i) {
162 		if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
163 			master_reconnect(mntbuf[i].f_mntonname);
164 	}
165 
166 	/*
167 	 * Accept connections and create pthreads to handle them after
168 	 * validating the IP.
169 	 */
170 	for (;;) {
171 		alen = sizeof(asin);
172 		fd = accept(lfd, (struct sockaddr *)&asin, &alen);
173 		if (fd < 0) {
174 			if (errno == EINTR)
175 				continue;
176 			break;
177 		}
178 		thread = NULL;
179 		fprintf(stderr, "service_thread: accept fd %d\n", fd);
180 		info = malloc(sizeof(*info));
181 		bzero(info, sizeof(*info));
182 		info->fd = fd;
183 		info->detachme = 1;
184 		info->dbgmsg_callback = hammer2_shell_parse;
185 		pthread_create(&thread, NULL, dmsg_master_service, info);
186 	}
187 	return (NULL);
188 }
189 
190 /*
191  * Monitor block devices.  Currently polls every ~10 seconds or so.
192  */
193 static
194 void *
195 udev_thread(void *data __unused)
196 {
197 	int	fd;
198 	int	seq = 0;
199 
200 	pthread_detach(pthread_self());
201 
202 	if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
203 		fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
204 			UDEV_DEVICE_PATH);
205 		pthread_exit(NULL);
206 	}
207 	udev_check_disks();
208 	while (ioctl(fd, UDEVWAIT, &seq) == 0) {
209 		udev_check_disks();
210 		sleep(1);
211 	}
212 	return (NULL);
213 }
214 
215 /*
216  * Retrieve the list of disk attachments and attempt to export
217  * them.
218  */
219 static
220 void
221 udev_check_disks(void)
222 {
223 	char tmpbuf[1024];
224 	char *buf = NULL;
225 	char *disk;
226 	int error;
227 	size_t n;
228 
229 	for (;;) {
230 		n = 0;
231 		error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
232 		if (error < 0 || n == 0)
233 			break;
234 		if (n >= sizeof(tmpbuf))
235 			buf = malloc(n + 1);
236 		else
237 			buf = tmpbuf;
238 		error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
239 		if (error == 0) {
240 			buf[n] = 0;
241 			break;
242 		}
243 		if (buf != tmpbuf) {
244 			free(buf);
245 			buf = NULL;
246 		}
247 		if (errno != ENOMEM)
248 			break;
249 	}
250 	if (buf) {
251 		fprintf(stderr, "DISKS: %s\n", buf);
252 		for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
253 			disk_reconnect(disk);
254 		}
255 		if (buf != tmpbuf)
256 			free(buf);
257 	}
258 }
259 
260 /*
261  * Normally the mount program supplies a cluster communications
262  * descriptor to the hammer2 vfs on mount, but if you kill the service
263  * daemon and restart it that link will be lost.
264  *
265  * This procedure attempts to [re]connect to existing mounts when
266  * the service daemon is started up before going into its accept
267  * loop.
268  *
269  * NOTE: A hammer2 mount point can only accomodate one connection at a time
270  *	 so this will disconnect any existing connection during the
271  *	 reconnect.
272  */
273 static
274 void
275 master_reconnect(const char *mntpt)
276 {
277 	struct hammer2_ioc_recluster recls;
278 	dmsg_master_service_info_t *info;
279 	pthread_t thread;
280 	int fd;
281 	int pipefds[2];
282 
283 	fd = open(mntpt, O_RDONLY);
284 	if (fd < 0) {
285 		fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
286 		return;
287 	}
288 	if (pipe(pipefds) < 0) {
289 		fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
290 		close(fd);
291 		return;
292 	}
293 	bzero(&recls, sizeof(recls));
294 	recls.fd = pipefds[0];
295 	if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
296 		fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
297 		close(pipefds[0]);
298 		close(pipefds[1]);
299 		close(fd);
300 		return;
301 	}
302 	close(pipefds[0]);
303 	close(fd);
304 
305 	info = malloc(sizeof(*info));
306 	bzero(info, sizeof(*info));
307 	info->fd = pipefds[1];
308 	info->detachme = 1;
309 	info->dbgmsg_callback = hammer2_shell_parse;
310 	pthread_create(&thread, NULL, dmsg_master_service, info);
311 }
312 
313 /*
314  * Reconnect a physical disk to the mesh.
315  */
316 static
317 void
318 disk_reconnect(const char *disk)
319 {
320 	struct disk_ioc_recluster recls;
321 	struct diskcon *dc;
322 	dmsg_master_service_info_t *info;
323 	pthread_t thread;
324 	int fd;
325 	int pipefds[2];
326 	char *path;
327 
328 	/*
329 	 * Urm, this will auto-create mdX+1, just ignore for now.
330 	 * This mechanic needs to be fixed.  It might actually be nice
331 	 * to be able to export md disks.
332 	 */
333 	if (strncmp(disk, "md", 2) == 0)
334 		return;
335 
336 	/*
337 	 * Check if already connected
338 	 */
339 	pthread_mutex_lock(&diskmtx);
340 	TAILQ_FOREACH(dc, &diskconq, entry) {
341 		if (strcmp(dc->disk, disk) == 0)
342 			break;
343 	}
344 	pthread_mutex_unlock(&diskmtx);
345 	if (dc)
346 		return;
347 
348 	/*
349 	 * Not already connected, create a connection to the kernel
350 	 * disk driver.
351 	 */
352 	asprintf(&path, "/dev/%s", disk);
353 	fd = open(path, O_RDONLY);
354 	if (fd < 0) {
355 		fprintf(stderr, "reconnect %s: no access to disk\n", disk);
356 		free(path);
357 		return;
358 	}
359 	free(path);
360 	if (pipe(pipefds) < 0) {
361 		fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
362 		close(fd);
363 		return;
364 	}
365 	bzero(&recls, sizeof(recls));
366 	recls.fd = pipefds[0];
367 	if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
368 		fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
369 		close(pipefds[0]);
370 		close(pipefds[1]);
371 		close(fd);
372 		return;
373 	}
374 	close(pipefds[0]);
375 	close(fd);
376 
377 	dc = malloc(sizeof(*dc));
378 	dc->disk = strdup(disk);
379 	pthread_mutex_lock(&diskmtx);
380 	TAILQ_INSERT_TAIL(&diskconq, dc, entry);
381 	pthread_mutex_unlock(&diskmtx);
382 
383 	info = malloc(sizeof(*info));
384 	bzero(info, sizeof(*info));
385 	info->fd = pipefds[1];
386 	info->detachme = 1;
387 	info->dbgmsg_callback = hammer2_shell_parse;
388 	info->exit_callback = disk_disconnect;
389 	info->handle = dc;
390 	pthread_create(&thread, NULL, dmsg_master_service, info);
391 }
392 
393 static
394 void
395 disk_disconnect(void *handle)
396 {
397 	struct diskcon *dc = handle;
398 
399 	fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
400 
401 	pthread_mutex_lock(&diskmtx);
402 	TAILQ_REMOVE(&diskconq, dc, entry);
403 	pthread_mutex_unlock(&diskmtx);
404 	free(dc->disk);
405 	free(dc);
406 }
407