1 /*- 2 * Copyright (c) 2018 The FreeBSD Foundation 3 * 4 * This software was developed by Mark Johnston under sponsorship from 5 * the FreeBSD Foundation. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are 9 * met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in 14 * the documentation and/or other materials provided with the 15 * distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 28 */ 29 30 #include <sys/param.h> 31 #include <sys/event.h> 32 #include <sys/socket.h> 33 34 #include <netinet/in.h> 35 #include <netinet/tcp.h> 36 37 #include <err.h> 38 #include <errno.h> 39 #include <pthread.h> 40 #include <stdatomic.h> 41 #include <stdlib.h> 42 #include <unistd.h> 43 44 #include <atf-c.h> 45 46 /* 47 * Given an array of non-blocking listening sockets configured in a LB group 48 * for "addr", try connecting to "addr" in a loop and verify that connections 49 * are roughly balanced across the sockets. 50 */ 51 static void 52 lb_simple_accept_loop(int domain, const struct sockaddr *addr, int sds[], 53 size_t nsds, int nconns) 54 { 55 size_t i; 56 int *acceptcnt; 57 int csd, error, excnt, sd; 58 const struct linger lopt = { 1, 0 }; 59 60 /* 61 * We expect each listening socket to accept roughly nconns/nsds 62 * connections, but allow for some error. 63 */ 64 excnt = nconns / nsds / 8; 65 acceptcnt = calloc(nsds, sizeof(*acceptcnt)); 66 ATF_REQUIRE_MSG(acceptcnt != NULL, "calloc() failed: %s", 67 strerror(errno)); 68 69 while (nconns-- > 0) { 70 sd = socket(domain, SOCK_STREAM, 0); 71 ATF_REQUIRE_MSG(sd >= 0, "socket() failed: %s", 72 strerror(errno)); 73 74 error = connect(sd, addr, addr->sa_len); 75 ATF_REQUIRE_MSG(error == 0, "connect() failed: %s", 76 strerror(errno)); 77 78 error = setsockopt(sd, SOL_SOCKET, SO_LINGER, &lopt, sizeof(lopt)); 79 ATF_REQUIRE_MSG(error == 0, "Setting linger failed: %s", 80 strerror(errno)); 81 82 /* 83 * Poll the listening sockets. 84 */ 85 do { 86 for (i = 0; i < nsds; i++) { 87 csd = accept(sds[i], NULL, NULL); 88 if (csd < 0) { 89 ATF_REQUIRE_MSG(errno == EWOULDBLOCK || 90 errno == EAGAIN, 91 "accept() failed: %s", 92 strerror(errno)); 93 continue; 94 } 95 96 error = close(csd); 97 ATF_REQUIRE_MSG(error == 0, 98 "close() failed: %s", strerror(errno)); 99 100 acceptcnt[i]++; 101 break; 102 } 103 } while (i == nsds); 104 105 error = close(sd); 106 ATF_REQUIRE_MSG(error == 0, "close() failed: %s", 107 strerror(errno)); 108 } 109 110 for (i = 0; i < nsds; i++) 111 ATF_REQUIRE_MSG(acceptcnt[i] > excnt, "uneven balancing"); 112 } 113 114 static int 115 lb_listen_socket(int domain, int flags) 116 { 117 int one; 118 int error, sd; 119 120 sd = socket(domain, SOCK_STREAM | flags, 0); 121 ATF_REQUIRE_MSG(sd >= 0, "socket() failed: %s", strerror(errno)); 122 123 one = 1; 124 error = setsockopt(sd, SOL_SOCKET, SO_REUSEPORT_LB, &one, sizeof(one)); 125 ATF_REQUIRE_MSG(error == 0, "setsockopt(SO_REUSEPORT_LB) failed: %s", 126 strerror(errno)); 127 128 return (sd); 129 } 130 131 ATF_TC_WITHOUT_HEAD(basic_ipv4); 132 ATF_TC_BODY(basic_ipv4, tc) 133 { 134 struct sockaddr_in addr; 135 socklen_t slen; 136 size_t i; 137 const int nconns = 16384; 138 int error, sds[16]; 139 uint16_t port; 140 141 sds[0] = lb_listen_socket(PF_INET, SOCK_NONBLOCK); 142 143 memset(&addr, 0, sizeof(addr)); 144 addr.sin_len = sizeof(addr); 145 addr.sin_family = AF_INET; 146 addr.sin_port = htons(0); 147 addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); 148 error = bind(sds[0], (const struct sockaddr *)&addr, sizeof(addr)); 149 ATF_REQUIRE_MSG(error == 0, "bind() failed: %s", strerror(errno)); 150 error = listen(sds[0], 1); 151 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", strerror(errno)); 152 153 slen = sizeof(addr); 154 error = getsockname(sds[0], (struct sockaddr *)&addr, &slen); 155 ATF_REQUIRE_MSG(error == 0, "getsockname() failed: %s", 156 strerror(errno)); 157 ATF_REQUIRE_MSG(slen == sizeof(addr), "sockaddr size changed"); 158 port = addr.sin_port; 159 160 memset(&addr, 0, sizeof(addr)); 161 addr.sin_len = sizeof(addr); 162 addr.sin_family = AF_INET; 163 addr.sin_port = port; 164 addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); 165 for (i = 1; i < nitems(sds); i++) { 166 sds[i] = lb_listen_socket(PF_INET, SOCK_NONBLOCK); 167 168 error = bind(sds[i], (const struct sockaddr *)&addr, 169 sizeof(addr)); 170 ATF_REQUIRE_MSG(error == 0, "bind() failed: %s", 171 strerror(errno)); 172 error = listen(sds[i], 1); 173 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", 174 strerror(errno)); 175 } 176 177 lb_simple_accept_loop(PF_INET, (struct sockaddr *)&addr, sds, 178 nitems(sds), nconns); 179 for (i = 0; i < nitems(sds); i++) { 180 error = close(sds[i]); 181 ATF_REQUIRE_MSG(error == 0, "close() failed: %s", 182 strerror(errno)); 183 } 184 } 185 186 ATF_TC_WITHOUT_HEAD(basic_ipv6); 187 ATF_TC_BODY(basic_ipv6, tc) 188 { 189 const struct in6_addr loopback6 = IN6ADDR_LOOPBACK_INIT; 190 struct sockaddr_in6 addr; 191 socklen_t slen; 192 size_t i; 193 const int nconns = 16384; 194 int error, sds[16]; 195 uint16_t port; 196 197 sds[0] = lb_listen_socket(PF_INET6, SOCK_NONBLOCK); 198 199 memset(&addr, 0, sizeof(addr)); 200 addr.sin6_len = sizeof(addr); 201 addr.sin6_family = AF_INET6; 202 addr.sin6_port = htons(0); 203 addr.sin6_addr = loopback6; 204 error = bind(sds[0], (const struct sockaddr *)&addr, sizeof(addr)); 205 ATF_REQUIRE_MSG(error == 0, "bind() failed: %s", strerror(errno)); 206 error = listen(sds[0], 1); 207 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", strerror(errno)); 208 209 slen = sizeof(addr); 210 error = getsockname(sds[0], (struct sockaddr *)&addr, &slen); 211 ATF_REQUIRE_MSG(error == 0, "getsockname() failed: %s", 212 strerror(errno)); 213 ATF_REQUIRE_MSG(slen == sizeof(addr), "sockaddr size changed"); 214 port = addr.sin6_port; 215 216 memset(&addr, 0, sizeof(addr)); 217 addr.sin6_len = sizeof(addr); 218 addr.sin6_family = AF_INET6; 219 addr.sin6_port = port; 220 addr.sin6_addr = loopback6; 221 for (i = 1; i < nitems(sds); i++) { 222 sds[i] = lb_listen_socket(PF_INET6, SOCK_NONBLOCK); 223 224 error = bind(sds[i], (const struct sockaddr *)&addr, 225 sizeof(addr)); 226 ATF_REQUIRE_MSG(error == 0, "bind() failed: %s", 227 strerror(errno)); 228 error = listen(sds[i], 1); 229 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", 230 strerror(errno)); 231 } 232 233 lb_simple_accept_loop(PF_INET6, (struct sockaddr *)&addr, sds, 234 nitems(sds), nconns); 235 for (i = 0; i < nitems(sds); i++) { 236 error = close(sds[i]); 237 ATF_REQUIRE_MSG(error == 0, "close() failed: %s", 238 strerror(errno)); 239 } 240 } 241 242 struct concurrent_add_softc { 243 struct sockaddr_storage ss; 244 int socks[128]; 245 int kq; 246 }; 247 248 static void * 249 listener(void *arg) 250 { 251 for (struct concurrent_add_softc *sc = arg;;) { 252 struct kevent kev; 253 ssize_t n; 254 int error, count, cs, s; 255 uint8_t b; 256 257 count = kevent(sc->kq, NULL, 0, &kev, 1, NULL); 258 ATF_REQUIRE_MSG(count == 1, 259 "kevent() failed: %s", strerror(errno)); 260 261 s = (int)kev.ident; 262 cs = accept(s, NULL, NULL); 263 ATF_REQUIRE_MSG(cs >= 0, 264 "accept() failed: %s", strerror(errno)); 265 266 b = 'M'; 267 n = write(cs, &b, sizeof(b)); 268 ATF_REQUIRE_MSG(n >= 0, "write() failed: %s", strerror(errno)); 269 ATF_REQUIRE(n == 1); 270 271 error = close(cs); 272 ATF_REQUIRE_MSG(error == 0 || errno == ECONNRESET, 273 "close() failed: %s", strerror(errno)); 274 } 275 } 276 277 static void * 278 connector(void *arg) 279 { 280 for (struct concurrent_add_softc *sc = arg;;) { 281 ssize_t n; 282 int error, s; 283 uint8_t b; 284 285 s = socket(sc->ss.ss_family, SOCK_STREAM, 0); 286 ATF_REQUIRE_MSG(s >= 0, "socket() failed: %s", strerror(errno)); 287 288 error = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (int[]){1}, 289 sizeof(int)); 290 291 error = connect(s, (struct sockaddr *)&sc->ss, sc->ss.ss_len); 292 ATF_REQUIRE_MSG(error == 0, "connect() failed: %s", 293 strerror(errno)); 294 295 n = read(s, &b, sizeof(b)); 296 ATF_REQUIRE_MSG(n >= 0, "read() failed: %s", 297 strerror(errno)); 298 ATF_REQUIRE(n == 1); 299 ATF_REQUIRE(b == 'M'); 300 error = close(s); 301 ATF_REQUIRE_MSG(error == 0, 302 "close() failed: %s", strerror(errno)); 303 } 304 } 305 306 /* 307 * Run three threads. One accepts connections from listening sockets on a 308 * kqueue, while the other makes connections. The third thread slowly adds 309 * sockets to the LB group. This is meant to help flush out race conditions. 310 */ 311 ATF_TC_WITHOUT_HEAD(concurrent_add); 312 ATF_TC_BODY(concurrent_add, tc) 313 { 314 struct concurrent_add_softc sc; 315 struct sockaddr_in *sin; 316 pthread_t threads[4]; 317 int error; 318 319 sc.kq = kqueue(); 320 ATF_REQUIRE_MSG(sc.kq >= 0, "kqueue() failed: %s", strerror(errno)); 321 322 error = pthread_create(&threads[0], NULL, listener, &sc); 323 ATF_REQUIRE_MSG(error == 0, "pthread_create() failed: %s", 324 strerror(error)); 325 326 sin = (struct sockaddr_in *)&sc.ss; 327 memset(sin, 0, sizeof(*sin)); 328 sin->sin_len = sizeof(*sin); 329 sin->sin_family = AF_INET; 330 sin->sin_port = htons(0); 331 sin->sin_addr.s_addr = htonl(INADDR_LOOPBACK); 332 333 for (size_t i = 0; i < nitems(sc.socks); i++) { 334 struct kevent kev; 335 int s; 336 337 sc.socks[i] = s = socket(AF_INET, SOCK_STREAM, 0); 338 ATF_REQUIRE_MSG(s >= 0, "socket() failed: %s", strerror(errno)); 339 340 error = setsockopt(s, SOL_SOCKET, SO_REUSEPORT_LB, (int[]){1}, 341 sizeof(int)); 342 ATF_REQUIRE_MSG(error == 0, 343 "setsockopt(SO_REUSEPORT_LB) failed: %s", strerror(errno)); 344 345 error = bind(s, (struct sockaddr *)sin, sizeof(*sin)); 346 ATF_REQUIRE_MSG(error == 0, "bind() failed: %s", 347 strerror(errno)); 348 349 error = listen(s, 5); 350 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", 351 strerror(errno)); 352 353 EV_SET(&kev, s, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); 354 error = kevent(sc.kq, &kev, 1, NULL, 0, NULL); 355 ATF_REQUIRE_MSG(error == 0, "kevent() failed: %s", 356 strerror(errno)); 357 358 if (i == 0) { 359 socklen_t slen = sizeof(sc.ss); 360 361 error = getsockname(sc.socks[i], 362 (struct sockaddr *)&sc.ss, &slen); 363 ATF_REQUIRE_MSG(error == 0, "getsockname() failed: %s", 364 strerror(errno)); 365 ATF_REQUIRE(sc.ss.ss_family == AF_INET); 366 367 for (size_t j = 1; j < nitems(threads); j++) { 368 error = pthread_create(&threads[j], NULL, 369 connector, &sc); 370 ATF_REQUIRE_MSG(error == 0, 371 "pthread_create() failed: %s", 372 strerror(error)); 373 } 374 } 375 376 usleep(20000); 377 } 378 } 379 380 /* 381 * Try calling listen(2) twice on a socket with SO_REUSEPORT_LB set. 382 */ 383 ATF_TC_WITHOUT_HEAD(double_listen_ipv4); 384 ATF_TC_BODY(double_listen_ipv4, tc) 385 { 386 struct sockaddr_in sin; 387 int error, s; 388 389 s = lb_listen_socket(PF_INET, 0); 390 391 memset(&sin, 0, sizeof(sin)); 392 sin.sin_len = sizeof(sin); 393 sin.sin_family = AF_INET; 394 sin.sin_port = htons(0); 395 sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); 396 error = bind(s, (struct sockaddr *)&sin, sizeof(sin)); 397 ATF_REQUIRE_MSG(error == 0, "bind() failed: %s", strerror(errno)); 398 399 error = listen(s, 1); 400 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", strerror(errno)); 401 error = listen(s, 2); 402 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", strerror(errno)); 403 404 error = close(s); 405 ATF_REQUIRE_MSG(error == 0, "close() failed: %s", strerror(errno)); 406 } 407 408 /* 409 * Try calling listen(2) twice on a socket with SO_REUSEPORT_LB set. 410 */ 411 ATF_TC_WITHOUT_HEAD(double_listen_ipv6); 412 ATF_TC_BODY(double_listen_ipv6, tc) 413 { 414 struct sockaddr_in6 sin6; 415 int error, s; 416 417 s = lb_listen_socket(PF_INET6, 0); 418 419 memset(&sin6, 0, sizeof(sin6)); 420 sin6.sin6_len = sizeof(sin6); 421 sin6.sin6_family = AF_INET6; 422 sin6.sin6_port = htons(0); 423 sin6.sin6_addr = in6addr_loopback; 424 error = bind(s, (struct sockaddr *)&sin6, sizeof(sin6)); 425 ATF_REQUIRE_MSG(error == 0, "bind() failed: %s", strerror(errno)); 426 427 error = listen(s, 1); 428 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", strerror(errno)); 429 error = listen(s, 2); 430 ATF_REQUIRE_MSG(error == 0, "listen() failed: %s", strerror(errno)); 431 432 error = close(s); 433 ATF_REQUIRE_MSG(error == 0, "close() failed: %s", strerror(errno)); 434 } 435 436 ATF_TP_ADD_TCS(tp) 437 { 438 ATF_TP_ADD_TC(tp, basic_ipv4); 439 ATF_TP_ADD_TC(tp, basic_ipv6); 440 ATF_TP_ADD_TC(tp, concurrent_add); 441 ATF_TP_ADD_TC(tp, double_listen_ipv4); 442 ATF_TP_ADD_TC(tp, double_listen_ipv6); 443 444 return (atf_no_error()); 445 } 446