/*
 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
34 */ 35 36 #include "hammer2.h" 37 38 struct diskcon { 39 TAILQ_ENTRY(diskcon) entry; 40 char *disk; 41 }; 42 43 #define WS " \r\n" 44 45 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq); 46 pthread_mutex_t diskmtx; 47 48 static void *service_thread(void *data); 49 static void *udev_thread(void *data); 50 static void master_reconnect(const char *mntpt); 51 static void disk_reconnect(const char *disk); 52 static void disk_disconnect(void *handle); 53 static void udev_check_disks(void); 54 55 /* 56 * Start-up the master listener daemon for the machine. 57 * 58 * The master listener serves as a rendezvous point in the cluster, accepting 59 * connections, performing registrations and authentications, maintaining 60 * the spanning tree, and keeping track of message state so disconnects can 61 * be handled properly. 62 * 63 * Once authenticated only low-level messaging protocols (which includes 64 * tracking persistent messages) are handled by this daemon. This daemon 65 * does not run the higher level quorum or locking protocols. 66 * 67 * This daemon can also be told to maintain connections to other nodes, 68 * forming a messaging backbone, which in turn allows PFS's (if desired) to 69 * simply connect to the master daemon via localhost if desired. 70 * Backbones are specified via /etc/hammer2.conf. 71 */ 72 int 73 cmd_service(void) 74 { 75 struct sockaddr_in lsin; 76 int on; 77 int lfd; 78 79 /* 80 * Acquire socket and set options 81 */ 82 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 83 fprintf(stderr, "master_listen: socket(): %s\n", 84 strerror(errno)); 85 return 1; 86 } 87 on = 1; 88 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); 89 90 /* 91 * Setup listen port and try to bind. If the bind fails we assume 92 * that a master listener process is already running and silently 93 * fail. 
94 */ 95 bzero(&lsin, sizeof(lsin)); 96 lsin.sin_family = AF_INET; 97 lsin.sin_addr.s_addr = INADDR_ANY; 98 lsin.sin_port = htons(DMSG_LISTEN_PORT); 99 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { 100 close(lfd); 101 if (QuietOpt == 0) { 102 fprintf(stderr, 103 "master listen: daemon already running\n"); 104 } 105 return 0; 106 } 107 if (QuietOpt == 0) 108 fprintf(stderr, "master listen: startup\n"); 109 listen(lfd, 50); 110 111 /* 112 * Fork and disconnect the controlling terminal and parent process, 113 * executing the specified function as a pthread. 114 * 115 * Returns to the original process which can then continue running. 116 * In debug mode this call will create the pthread without forking 117 * and set NormalExit to 0, instead of fork. 118 */ 119 hammer2_demon(service_thread, (void *)(intptr_t)lfd); 120 if (NormalExit) 121 close(lfd); 122 return 0; 123 } 124 125 /* 126 * Master listen/accept thread. Accept connections on the master socket, 127 * starting a pthread for each one. 128 */ 129 static 130 void * 131 service_thread(void *data) 132 { 133 struct sockaddr_in asin; 134 socklen_t alen; 135 pthread_t thread; 136 dmsg_master_service_info_t *info; 137 int lfd = (int)(intptr_t)data; 138 int fd; 139 int i; 140 int count; 141 struct statfs *mntbuf = NULL; 142 struct statvfs *mntvbuf = NULL; 143 144 /* 145 * Nobody waits for us 146 */ 147 setproctitle("hammer2 master listen"); 148 pthread_detach(pthread_self()); 149 150 /* 151 * Start up a thread to handle block device monitoring 152 */ 153 thread = NULL; 154 pthread_create(&thread, NULL, udev_thread, NULL); 155 156 /* 157 * Scan existing hammer2 mounts and reconnect to them using 158 * HAMMER2IOC_RECLUSTER. 
159 */ 160 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT); 161 for (i = 0; i < count; ++i) { 162 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0) 163 master_reconnect(mntbuf[i].f_mntonname); 164 } 165 166 /* 167 * Accept connections and create pthreads to handle them after 168 * validating the IP. 169 */ 170 for (;;) { 171 alen = sizeof(asin); 172 fd = accept(lfd, (struct sockaddr *)&asin, &alen); 173 if (fd < 0) { 174 if (errno == EINTR) 175 continue; 176 break; 177 } 178 thread = NULL; 179 fprintf(stderr, "service_thread: accept fd %d\n", fd); 180 info = malloc(sizeof(*info)); 181 bzero(info, sizeof(*info)); 182 info->fd = fd; 183 info->detachme = 1; 184 info->dbgmsg_callback = hammer2_shell_parse; 185 pthread_create(&thread, NULL, dmsg_master_service, info); 186 } 187 return (NULL); 188 } 189 190 /* 191 * Monitor block devices. Currently polls every ~10 seconds or so. 192 */ 193 static 194 void * 195 udev_thread(void *data __unused) 196 { 197 int fd; 198 int seq = 0; 199 200 pthread_detach(pthread_self()); 201 202 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) { 203 fprintf(stderr, "udev_thread: unable to open \"%s\"\n", 204 UDEV_DEVICE_PATH); 205 pthread_exit(NULL); 206 } 207 udev_check_disks(); 208 while (ioctl(fd, UDEVWAIT, &seq) == 0) { 209 udev_check_disks(); 210 sleep(1); 211 } 212 return (NULL); 213 } 214 215 /* 216 * Retrieve the list of disk attachments and attempt to export 217 * them. 
218 */ 219 static 220 void 221 udev_check_disks(void) 222 { 223 char tmpbuf[1024]; 224 char *buf = NULL; 225 char *disk; 226 int error; 227 size_t n; 228 229 for (;;) { 230 n = 0; 231 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0); 232 if (error < 0 || n == 0) 233 break; 234 if (n >= sizeof(tmpbuf)) 235 buf = malloc(n + 1); 236 else 237 buf = tmpbuf; 238 error = sysctlbyname("kern.disks", buf, &n, NULL, 0); 239 if (error == 0) { 240 buf[n] = 0; 241 break; 242 } 243 if (buf != tmpbuf) { 244 free(buf); 245 buf = NULL; 246 } 247 if (errno != ENOMEM) 248 break; 249 } 250 if (buf) { 251 fprintf(stderr, "DISKS: %s\n", buf); 252 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) { 253 disk_reconnect(disk); 254 } 255 if (buf != tmpbuf) 256 free(buf); 257 } 258 } 259 260 /* 261 * Normally the mount program supplies a cluster communications 262 * descriptor to the hammer2 vfs on mount, but if you kill the service 263 * daemon and restart it that link will be lost. 264 * 265 * This procedure attempts to [re]connect to existing mounts when 266 * the service daemon is started up before going into its accept 267 * loop. 268 * 269 * NOTE: A hammer2 mount point can only accomodate one connection at a time 270 * so this will disconnect any existing connection during the 271 * reconnect. 
272 */ 273 static 274 void 275 master_reconnect(const char *mntpt) 276 { 277 struct hammer2_ioc_recluster recls; 278 dmsg_master_service_info_t *info; 279 pthread_t thread; 280 int fd; 281 int pipefds[2]; 282 283 fd = open(mntpt, O_RDONLY); 284 if (fd < 0) { 285 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt); 286 return; 287 } 288 if (pipe(pipefds) < 0) { 289 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt); 290 close(fd); 291 return; 292 } 293 bzero(&recls, sizeof(recls)); 294 recls.fd = pipefds[0]; 295 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) { 296 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt); 297 close(pipefds[0]); 298 close(pipefds[1]); 299 close(fd); 300 return; 301 } 302 close(pipefds[0]); 303 close(fd); 304 305 info = malloc(sizeof(*info)); 306 bzero(info, sizeof(*info)); 307 info->fd = pipefds[1]; 308 info->detachme = 1; 309 info->dbgmsg_callback = hammer2_shell_parse; 310 pthread_create(&thread, NULL, dmsg_master_service, info); 311 } 312 313 /* 314 * Reconnect a physical disk to the mesh. 315 */ 316 static 317 void 318 disk_reconnect(const char *disk) 319 { 320 struct disk_ioc_recluster recls; 321 struct diskcon *dc; 322 dmsg_master_service_info_t *info; 323 pthread_t thread; 324 int fd; 325 int pipefds[2]; 326 char *path; 327 328 /* 329 * Urm, this will auto-create mdX+1, just ignore for now. 330 * This mechanic needs to be fixed. It might actually be nice 331 * to be able to export md disks. 332 */ 333 if (strncmp(disk, "md", 2) == 0) 334 return; 335 336 /* 337 * Check if already connected 338 */ 339 pthread_mutex_lock(&diskmtx); 340 TAILQ_FOREACH(dc, &diskconq, entry) { 341 if (strcmp(dc->disk, disk) == 0) 342 break; 343 } 344 pthread_mutex_unlock(&diskmtx); 345 if (dc) 346 return; 347 348 /* 349 * Not already connected, create a connection to the kernel 350 * disk driver. 
351 */ 352 asprintf(&path, "/dev/%s", disk); 353 fd = open(path, O_RDONLY); 354 if (fd < 0) { 355 fprintf(stderr, "reconnect %s: no access to disk\n", disk); 356 free(path); 357 return; 358 } 359 free(path); 360 if (pipe(pipefds) < 0) { 361 fprintf(stderr, "reconnect %s: pipe() failed\n", disk); 362 close(fd); 363 return; 364 } 365 bzero(&recls, sizeof(recls)); 366 recls.fd = pipefds[0]; 367 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) { 368 fprintf(stderr, "reconnect %s: ioctl failed\n", disk); 369 close(pipefds[0]); 370 close(pipefds[1]); 371 close(fd); 372 return; 373 } 374 close(pipefds[0]); 375 close(fd); 376 377 dc = malloc(sizeof(*dc)); 378 dc->disk = strdup(disk); 379 pthread_mutex_lock(&diskmtx); 380 TAILQ_INSERT_TAIL(&diskconq, dc, entry); 381 pthread_mutex_unlock(&diskmtx); 382 383 info = malloc(sizeof(*info)); 384 bzero(info, sizeof(*info)); 385 info->fd = pipefds[1]; 386 info->detachme = 1; 387 info->dbgmsg_callback = hammer2_shell_parse; 388 info->exit_callback = disk_disconnect; 389 info->handle = dc; 390 pthread_create(&thread, NULL, dmsg_master_service, info); 391 } 392 393 static 394 void 395 disk_disconnect(void *handle) 396 { 397 struct diskcon *dc = handle; 398 399 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk); 400 401 pthread_mutex_lock(&diskmtx); 402 TAILQ_REMOVE(&diskconq, dc, entry); 403 pthread_mutex_unlock(&diskmtx); 404 free(dc->disk); 405 free(dc); 406 } 407