/* $NetBSD: udp.c,v 1.11 2023/01/25 21:43:31 christos Exp $ */

/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "uv-compat.h"

static isc_result_t
udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		isc_sockaddr_t *peer);

static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
	    const struct sockaddr *addr, unsigned flags);

static void
udp_send_cb(uv_udp_send_t *req, int status);

static void
udp_close_cb(uv_handle_t *handle);

static void
read_timer_close_cb(uv_handle_t *handle);

static void
udp_close_direct(isc_nmsocket_t *sock);

static void
stop_udp_parent(isc_nmsocket_t *sock);
static void
stop_udp_child(isc_nmsocket_t *sock);

static uv_os_sock_t
isc__nm_udp_lb_socket(isc_nm_t *mgr, sa_family_t sa_family) {
	isc_result_t result;
	uv_os_sock_t sock;

	result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	(void)isc__nm_socket_incoming_cpu(sock);
	(void)isc__nm_socket_disable_pmtud(sock, sa_family);

	result = isc__nm_socket_reuse(sock);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

#ifndef _WIN32
	if (mgr->load_balance_sockets) {
		result = isc__nm_socket_reuse_lb(sock);
		RUNTIME_CHECK(result == ISC_R_SUCCESS);
	}
#endif

	return (sock);
}

static void
start_udp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock,
		uv_os_sock_t fd, int tid) {
	isc_nmsocket_t *csock;
	isc__netievent_udplisten_t *ievent = NULL;

	csock = &sock->children[tid];

	isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface);
	csock->parent = sock;
	csock->iface = sock->iface;
	csock->reading = true;
	csock->recv_cb = sock->recv_cb;
	csock->recv_cbarg = sock->recv_cbarg;
	csock->extrahandlesize = sock->extrahandlesize;
	csock->tid = tid;

#ifdef _WIN32
	UNUSED(fd);
	csock->fd = isc__nm_udp_lb_socket(mgr, iface->type.sa.sa_family);
#else
	if (mgr->load_balance_sockets) {
		UNUSED(fd);
		csock->fd = isc__nm_udp_lb_socket(mgr,
						  iface->type.sa.sa_family);
	} else {
		csock->fd = dup(fd);
	}
#endif
	REQUIRE(csock->fd >= 0);

	ievent = isc__nm_get_netievent_udplisten(mgr, csock);
	isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
				     (isc__netievent_t *)ievent);
}

static void
enqueue_stoplistening(isc_nmsocket_t *sock) {
	isc__netievent_udpstop_t *ievent =
		isc__nm_get_netievent_udpstop(sock->mgr, sock);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}
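/*
 * A quick sketch of the kernel feature the load-balanced path above is
 * assumed to rely on (illustrative only; Linux spells it SO_REUSEPORT,
 * FreeBSD SO_REUSEPORT_LB): several sockets bind the same address/port
 * and the kernel spreads incoming datagrams across them, one socket per
 * worker thread:
 *
 *	int fd = socket(AF_INET, SOCK_DGRAM, 0);
 *	int on = 1;
 *	(void)setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
 *	(void)bind(fd, (struct sockaddr *)&addr, sizeof(addr));
 */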
isc_result_t
isc_nm_listenudp(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nm_recv_cb_t cb,
		 void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	size_t children_size = 0;
	uv_os_sock_t fd = -1;

	REQUIRE(VALID_NM(mgr));

	/*
	 * We are creating mgr->nworkers duplicated sockets, one
	 * socket for each worker thread.
	 */
	sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface);

	atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
	sock->nchildren = 1;
#else
	sock->nchildren = mgr->nworkers;
#endif

	children_size = sock->nchildren * sizeof(sock->children[0]);
	sock->children = isc_mem_get(mgr->mctx, children_size);
	memset(sock->children, 0, children_size);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->extrahandlesize = extrahandlesize;
	sock->result = ISC_R_UNSET;

	sock->tid = 0;
	sock->fd = -1;

#ifndef _WIN32
	if (!mgr->load_balance_sockets) {
		fd = isc__nm_udp_lb_socket(mgr, iface->type.sa.sa_family);
	}
#endif

	isc_barrier_init(&sock->startlistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		if ((int)i == isc_nm_tid()) {
			continue;
		}
		start_udp_child(mgr, iface, sock, fd, i);
	}

	if (isc__nm_in_netthread()) {
		start_udp_child(mgr, iface, sock, fd, isc_nm_tid());
	}

#ifndef _WIN32
	if (!mgr->load_balance_sockets) {
		isc__nm_closesocket(fd);
	}
#endif

	LOCK(&sock->lock);
	while (atomic_load(&sock->rchildren) != sock->nchildren) {
		WAIT(&sock->cond, &sock->lock);
	}
	result = sock->result;
	atomic_store(&sock->active, true);
	UNLOCK(&sock->lock);

	INSIST(result != ISC_R_UNSET);

	if (result == ISC_R_SUCCESS) {
		REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
		*sockp = sock;
	} else {
		atomic_store(&sock->active, false);
		enqueue_stoplistening(sock);
		isc_nmsocket_close(&sock);
	}

	return (result);
}
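/*
 * Usage sketch (not part of this file): a minimal UDP echo server on
 * top of isc_nm_listenudp().  The names "echo_cb", "send_done", "addr"
 * and "mgr" are hypothetical; the callback signature is
 * isc_nm_recv_cb_t.
 *
 *	static void
 *	send_done(isc_nmhandle_t *handle, isc_result_t eresult, void *arg) {
 *		UNUSED(handle); UNUSED(eresult); UNUSED(arg);
 *	}
 *
 *	static void
 *	echo_cb(isc_nmhandle_t *handle, isc_result_t eresult,
 *		isc_region_t *region, void *cbarg) {
 *		UNUSED(cbarg);
 *		if (eresult == ISC_R_SUCCESS) {
 *			isc_nm_send(handle, region, send_done, NULL);
 *		}
 *	}
 *
 *	isc_nmsocket_t *listener = NULL;
 *	result = isc_nm_listenudp(mgr, &addr, echo_cb, NULL, 0, &listener);
 */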
/*
 * Asynchronous 'udplisten' call handler: start listening on a UDP socket.
 */
void
isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udplisten_t *ievent = (isc__netievent_udplisten_t *)ev0;
	isc_nmsocket_t *sock = NULL;
	int r, uv_bind_flags = 0;
	int uv_init_flags = 0;
	sa_family_t sa_family;
	isc_result_t result = ISC_R_UNSET;
	isc_nm_t *mgr = NULL;

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(ievent->sock->tid == isc_nm_tid());
	REQUIRE(VALID_NMSOCK(ievent->sock->parent));

	sock = ievent->sock;
	sa_family = sock->iface.type.sa.sa_family;
	mgr = sock->mgr;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->parent != NULL);
	REQUIRE(sock->tid == isc_nm_tid());

#if HAVE_DECL_UV_UDP_RECVMMSG
	uv_init_flags |= UV_UDP_RECVMMSG;
#endif
	r = uv_udp_init_ex(&worker->loop, &sock->uv_handle.udp, uv_init_flags);
	UV_RUNTIME_CHECK(uv_udp_init_ex, r);
	uv_handle_set_data(&sock->uv_handle.handle, sock);
	/* This keeps the socket alive after everything else is gone */
	isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });

	r = uv_timer_init(&worker->loop, &sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	LOCK(&sock->parent->lock);

	r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
	if (r < 0) {
		isc__nm_closesocket(sock->fd);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sa_family == AF_INET6) {
		uv_bind_flags |= UV_UDP_IPV6ONLY;
	}

#ifdef _WIN32
	r = isc_uv_udp_freebind(&sock->uv_handle.udp,
				&sock->parent->iface.type.sa, uv_bind_flags);
	if (r < 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}
#else
	if (mgr->load_balance_sockets) {
		r = isc_uv_udp_freebind(&sock->uv_handle.udp,
					&sock->parent->iface.type.sa,
					uv_bind_flags);
		if (r < 0) {
			isc__nm_incstats(sock->mgr,
					 sock->statsindex[STATID_BINDFAIL]);
			goto done;
		}
	} else {
		if (sock->parent->fd == -1) {
			/* This thread is first, bind the socket */
			r = isc_uv_udp_freebind(&sock->uv_handle.udp,
						&sock->parent->iface.type.sa,
						uv_bind_flags);
			if (r < 0) {
				isc__nm_incstats(
					sock->mgr,
					sock->statsindex[STATID_BINDFAIL]);
				goto done;
			}
			sock->parent->uv_handle.udp.flags =
				sock->uv_handle.udp.flags;
			sock->parent->fd = sock->fd;
		} else {
			/* The socket is already bound, just copy the flags */
			sock->uv_handle.udp.flags =
				sock->parent->uv_handle.udp.flags;
		}
	}
#endif

#ifdef ISC_RECV_BUFFER_SIZE
	uv_recv_buffer_size(&sock->uv_handle.handle,
			    &(int){ ISC_RECV_BUFFER_SIZE });
#endif
#ifdef ISC_SEND_BUFFER_SIZE
	uv_send_buffer_size(&sock->uv_handle.handle,
			    &(int){ ISC_SEND_BUFFER_SIZE });
#endif
	r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
			      udp_recv_cb);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

	atomic_store(&sock->listening, true);

done:
	result = isc__nm_uverr2result(r);
	atomic_fetch_add(&sock->parent->rchildren, 1);
	if (sock->parent->result == ISC_R_UNSET) {
		sock->parent->result = result;
	}
	SIGNAL(&sock->parent->cond);
	UNLOCK(&sock->parent->lock);

	isc_barrier_wait(&sock->parent->startlistening);
}
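/*
 * Note: the rchildren/SIGNAL() bookkeeping in the handler above pairs
 * with the LOCK/WAIT loop in isc_nm_listenudp(), which blocks until
 * every child has reported a result; the startlistening barrier then
 * releases all children together, so none of them proceeds before the
 * whole set is bound.
 */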
void
isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udplistener);

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true))
	{
		UNREACHABLE();
	}

	if (!isc__nm_in_netthread()) {
		enqueue_stoplistening(sock);
	} else {
		stop_udp_parent(sock);
	}
}

/*
 * Asynchronous 'udpstop' call handler: stop listening on a UDP socket.
 */
void
isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpstop_t *ievent = (isc__netievent_udpstop_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (sock->parent != NULL) {
		stop_udp_child(sock);
		return;
	}

	stop_udp_parent(sock);
}
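/*
 * Usage sketch (not part of this file): tearing a listener down from
 * user code; "listener" is the socket returned by isc_nm_listenudp():
 *
 *	isc_nm_stoplistening(listener);
 *	isc_nmsocket_close(&listener);
 */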
/*
 * udp_recv_cb handles an incoming UDP packet from libuv.  The buffer
 * here is reused for a series of packets, so we need to allocate a new
 * one; the new one can then be reused to send the response.
 */
static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
	    const struct sockaddr *addr, unsigned flags) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
	isc__nm_uvreq_t *req = NULL;
	uint32_t maxudp;
	isc_sockaddr_t sockaddr;
	isc_result_t result;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->reading);

	/*
	 * When using recvmmsg(2), if no errors occur, there will be a final
	 * callback with nrecv set to 0, addr set to NULL and the buffer
	 * pointing at the initially allocated data with the UV_UDP_MMSG_CHUNK
	 * flag cleared and the UV_UDP_MMSG_FREE flag set.
	 */
#if HAVE_DECL_UV_UDP_MMSG_FREE
	if ((flags & UV_UDP_MMSG_FREE) == UV_UDP_MMSG_FREE) {
		INSIST(nrecv == 0);
		INSIST(addr == NULL);
		goto free;
	}
#else
	UNUSED(flags);
#endif

	/*
	 * - If we're simulating a firewall blocking UDP packets
	 *   bigger than 'maxudp' bytes for testing purposes.
	 */
	maxudp = atomic_load(&sock->mgr->maxudp);
	if (maxudp != 0 && (uint32_t)nrecv > maxudp) {
		/*
		 * We need to keep the read_cb intact so that
		 * readtimeout_cb can still trigger without crashing on
		 * a missing read_req.
		 */
		goto free;
	}

	/*
	 * - If addr == NULL, it's the end of stream; we can free the
	 *   buffer and bail.
	 */
	if (addr == NULL) {
		isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
		goto free;
	}

	/*
	 * - If the socket is no longer active.
	 */
	if (!isc__nmsocket_active(sock)) {
		isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
		goto free;
	}

	if (nrecv < 0) {
		isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nrecv),
				       false);
		goto free;
	}

	result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);

	req = isc__nm_get_read_req(sock, &sockaddr);

	/*
	 * The callback will be called synchronously because the result
	 * is ISC_R_SUCCESS, so it is safe to pass buf directly.
	 */
	req->uvbuf.base = buf->base;
	req->uvbuf.len = nrecv;

	sock->recv_read = false;

	REQUIRE(!sock->processing);
	sock->processing = true;
	isc__nm_readcb(sock, req, ISC_R_SUCCESS);
	sock->processing = false;

free:
#if HAVE_DECL_UV_UDP_MMSG_CHUNK
	/*
	 * When using recvmmsg(2), chunks will have the UV_UDP_MMSG_CHUNK
	 * flag set; those must not be freed.
	 */
	if ((flags & UV_UDP_MMSG_CHUNK) == UV_UDP_MMSG_CHUNK) {
		return;
	}
#endif

	/*
	 * When using recvmmsg(2), if a UDP socket error occurs, nrecv
	 * will be < 0.  In either scenario, the callee can now safely
	 * free the provided buffer.
	 */
	if (nrecv < 0) {
		/*
		 * The buffer may be a null buffer on error.
		 */
		if (buf->base == NULL && buf->len == 0) {
			return;
		}
	}

	isc__nm_free_uvbuf(sock, buf);
}

/*
 * Send the data in 'region' to a peer via a UDP socket.  We try to find
 * a proper sibling/child socket so that we won't have to jump to
 * another thread.
 */
void
isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region,
		 isc_nm_cb_t cb, void *cbarg) {
	isc_nmsocket_t *sock = handle->sock;
	isc_nmsocket_t *rsock = NULL;
	isc_sockaddr_t *peer = &handle->peer;
	isc__nm_uvreq_t *uvreq = NULL;
	uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
	int ntid;

	INSIST(sock->type == isc_nm_udpsocket);

	/*
	 * We're simulating a firewall blocking UDP packets bigger than
	 * 'maxudp' bytes, for testing purposes.
	 *
	 * The client would ordinarily have unreferenced the handle
	 * in the callback, but that won't happen in this case, so
	 * we need to do so here.
	 */
	if (maxudp != 0 && region->length > maxudp) {
		isc_nmhandle_detach(&handle);
		return;
	}

	if (atomic_load(&sock->client)) {
		/*
		 * When we are sending from the client socket, we directly use
		 * the socket provided.
		 */
		rsock = sock;
		goto send;
	} else {
		/*
		 * When we are sending from the server socket, we either use
		 * the socket associated with the network thread we are in,
		 * or we use the thread from the socket associated with the
		 * handle.
		 */
		INSIST(sock->parent != NULL);

#if defined(WIN32)
		/* On Windows, we have only a single listener socket */
		rsock = sock;
#else
		if (isc__nm_in_netthread()) {
			ntid = isc_nm_tid();
		} else {
			ntid = sock->tid;
		}
		rsock = &sock->parent->children[ntid];
#endif
	}

send:
	uvreq = isc__nm_uvreq_get(rsock->mgr, rsock);
	uvreq->uvbuf.base = (char *)region->base;
	uvreq->uvbuf.len = region->length;

	isc_nmhandle_attach(handle, &uvreq->handle);

	uvreq->cb.send = cb;
	uvreq->cbarg = cbarg;

	if (isc_nm_tid() == rsock->tid) {
		isc__netievent_udpsend_t ievent = { .sock = rsock,
						    .req = uvreq,
						    .peer = *peer };

		isc__nm_async_udpsend(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_udpsend_t *ievent =
			isc__nm_get_netievent_udpsend(sock->mgr, rsock);
		ievent->peer = *peer;
		ievent->req = uvreq;

		isc__nm_enqueue_ievent(&sock->mgr->workers[rsock->tid],
				       (isc__netievent_t *)ievent);
	}
}
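/*
 * Usage sketch (not part of this file): sending from a server-side recv
 * callback; "response" is a hypothetical isc_region_t and "send_done" a
 * hypothetical isc_nm_cb_t.  isc_nm_send() routes UDP handles to
 * isc__nm_udp_send() above:
 *
 *	isc_nm_send(handle, &response, send_done, NULL);
 */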
/*
 * Asynchronous 'udpsend' event handler: send a packet on a UDP socket.
 */
void
isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc_result_t result;
	isc__netievent_udpsend_t *ievent = (isc__netievent_udpsend_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	if (isc__nmsocket_closing(sock)) {
		isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED);
		return;
	}

	result = udp_send_direct(sock, uvreq, &ievent->peer);
	if (result != ISC_R_SUCCESS) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
		isc__nm_failed_send_cb(sock, uvreq, result);
	}
}
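/*
 * Both send paths end up in the handler above on the socket's own
 * thread: the fast path in isc__nm_udp_send() calls it directly when it
 * is already running on rsock->tid, and the slow path enqueues the
 * event to that worker's loop, so udp_send_direct() below can safely
 * REQUIRE that it runs on sock->tid.
 */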
static void
udp_send_cb(uv_udp_send_t *req, int status) {
	isc_result_t result = ISC_R_SUCCESS;
	isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req);
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	sock = uvreq->sock;

	REQUIRE(sock->tid == isc_nm_tid());

	if (status < 0) {
		result = isc__nm_uverr2result(status);
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
	}

	isc__nm_sendcb(sock, uvreq, result, false);
}

/*
 * udp_send_direct sends buf to a peer on a socket.  The sock has to be
 * in the same thread as the caller.
 */
static isc_result_t
udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		isc_sockaddr_t *peer) {
	const struct sockaddr *sa = &peer->type.sa;
	int r;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udpsocket);

	if (isc__nmsocket_closing(sock)) {
		return (ISC_R_CANCELED);
	}

#if UV_VERSION_HEX >= UV_VERSION(1, 27, 0)
	/*
	 * If we used uv_udp_connect() (and not the shim version for
	 * older versions of libuv), then the peer address has to be
	 * set to NULL or else uv_udp_send() could fail or assert,
	 * depending on the libuv version.
	 */
	if (atomic_load(&sock->connected)) {
		sa = NULL;
	}
#endif

	r = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
			&req->uvbuf, 1, sa, udp_send_cb);
	if (r < 0) {
		return (isc__nm_uverr2result(r));
	}

	return (ISC_R_SUCCESS);
}

static isc_result_t
udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
	isc__networker_t *worker = NULL;
	int uv_bind_flags = UV_UDP_REUSEADDR;
	isc_result_t result = ISC_R_UNSET;
	int tries = 3;
	int r;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(sock->tid == isc_nm_tid());

	worker = &sock->mgr->workers[isc_nm_tid()];

	atomic_store(&sock->connecting, true);

	r = uv_udp_init(&worker->loop, &sock->uv_handle.udp);
	UV_RUNTIME_CHECK(uv_udp_init, r);
	uv_handle_set_data(&sock->uv_handle.handle, sock);

	r = uv_timer_init(&worker->loop, &sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

	if (sock->iface.type.sa.sa_family == AF_INET6) {
		uv_bind_flags |= UV_UDP_IPV6ONLY;
	}

	r = uv_udp_bind(&sock->uv_handle.udp, &sock->iface.type.sa,
			uv_bind_flags);
	if (r != 0) {
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
		goto done;
	}

#ifdef ISC_RECV_BUFFER_SIZE
	uv_recv_buffer_size(&sock->uv_handle.handle,
			    &(int){ ISC_RECV_BUFFER_SIZE });
#endif
#ifdef ISC_SEND_BUFFER_SIZE
	uv_send_buffer_size(&sock->uv_handle.handle,
			    &(int){ ISC_SEND_BUFFER_SIZE });
#endif

	/*
	 * On FreeBSD the UDP connect() call sometimes results in a
	 * spurious transient EADDRINUSE.  Try a few more times before
	 * giving up.
	 */
	do {
		r = isc_uv_udp_connect(&sock->uv_handle.udp,
				       &req->peer.type.sa);
	} while (r == UV_EADDRINUSE && --tries > 0);
	if (r != 0) {
		isc__nm_incstats(sock->mgr,
				 sock->statsindex[STATID_CONNECTFAIL]);
		goto done;
	}
	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);

	atomic_store(&sock->connecting, false);
	atomic_store(&sock->connected, true);

done:
	result = isc__nm_uverr2result(r);

	LOCK(&sock->lock);
	sock->result = result;
	SIGNAL(&sock->cond);
	if (!atomic_load(&sock->active)) {
		WAIT(&sock->scond, &sock->lock);
	}
	INSIST(atomic_load(&sock->active));
	UNLOCK(&sock->lock);

	return (result);
}
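/*
 * Note: the result handshake at the end of udp_connect_direct() pairs
 * with isc_nm_udpconnect() below: sock->result is published under
 * sock->lock and signalled via sock->cond, and the WAIT() on
 * sock->scond keeps this worker parked until the caller has marked the
 * socket active.
 */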
/*
 * Asynchronous 'udpconnect' call handler: open a new UDP socket and
 * call the 'open' callback with a handle.
 */
void
isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpconnect_t *ievent =
		(isc__netievent_udpconnect_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *req = ievent->req;
	isc_result_t result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->parent == NULL);
	REQUIRE(sock->tid == isc_nm_tid());

	result = udp_connect_direct(sock, req);
	if (result != ISC_R_SUCCESS) {
		atomic_store(&sock->active, false);
		isc__nm_udp_close(sock);
		isc__nm_connectcb(sock, req, result, true);
	} else {
		/*
		 * The callback has to be called after the socket has
		 * been initialized.
		 */
		isc__nm_connectcb(sock, req, ISC_R_SUCCESS, true);
	}

	/*
	 * The sock is now attached to the handle.
	 */
	isc__nmsocket_detach(&sock);
}

void
isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
		  isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
		  size_t extrahandlesize) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_nmsocket_t *sock = NULL;
	isc__netievent_udpconnect_t *event = NULL;
	isc__nm_uvreq_t *req = NULL;
	sa_family_t sa_family;

	REQUIRE(VALID_NM(mgr));
	REQUIRE(local != NULL);
	REQUIRE(peer != NULL);

	sa_family = peer->type.sa.sa_family;

	sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
	isc__nmsocket_init(sock, mgr, isc_nm_udpsocket, local);

	sock->connect_cb = cb;
	sock->connect_cbarg = cbarg;
	sock->read_timeout = timeout;
	sock->extrahandlesize = extrahandlesize;
	sock->peer = *peer;
	sock->result = ISC_R_UNSET;
	atomic_init(&sock->client, true);

	req = isc__nm_uvreq_get(mgr, sock);
	req->cb.connect = cb;
	req->cbarg = cbarg;
	req->peer = *peer;
	req->local = *local;
	req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface);

	result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock->fd);
	if (result != ISC_R_SUCCESS) {
		if (isc__nm_in_netthread()) {
			sock->tid = isc_nm_tid();
		}
		isc__nmsocket_clearcb(sock);
		isc__nm_connectcb(sock, req, result, true);
		atomic_store(&sock->closed, true);
		isc__nmsocket_detach(&sock);
		return;
	}

	result = isc__nm_socket_reuse(sock->fd);
	RUNTIME_CHECK(result == ISC_R_SUCCESS ||
		      result == ISC_R_NOTIMPLEMENTED);

	result = isc__nm_socket_reuse_lb(sock->fd);
	RUNTIME_CHECK(result == ISC_R_SUCCESS ||
		      result == ISC_R_NOTIMPLEMENTED);

	(void)isc__nm_socket_incoming_cpu(sock->fd);

	(void)isc__nm_socket_disable_pmtud(sock->fd, sa_family);

	event = isc__nm_get_netievent_udpconnect(mgr, sock, req);

	if (isc__nm_in_netthread()) {
		atomic_store(&sock->active, true);
		sock->tid = isc_nm_tid();
		isc__nm_async_udpconnect(&mgr->workers[sock->tid],
					 (isc__netievent_t *)event);
		isc__nm_put_netievent_udpconnect(mgr, event);
	} else {
		atomic_init(&sock->active, false);
		sock->tid = isc_random_uniform(mgr->nworkers);
		isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
				       (isc__netievent_t *)event);
	}
	LOCK(&sock->lock);
	while (sock->result == ISC_R_UNSET) {
		WAIT(&sock->cond, &sock->lock);
	}
	atomic_store(&sock->active, true);
	BROADCAST(&sock->scond);
	UNLOCK(&sock->lock);
}
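/*
 * Usage sketch (not part of this file): a client connect; "connect_cb"
 * is a hypothetical isc_nm_cb_t that receives the handle for the new
 * socket (or a failure result), and "local"/"peer" are hypothetical
 * isc_sockaddr_t values:
 *
 *	static void
 *	connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
 *		   void *cbarg) {
 *		UNUSED(cbarg);
 *		if (eresult != ISC_R_SUCCESS) {
 *			return;
 *		}
 *		// start a read, send a query, etc.
 *	}
 *
 *	isc_nm_udpconnect(mgr, &local, &peer, connect_cb, NULL, 1000, 0);
 */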
void
isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
		    const struct sockaddr *addr, unsigned flags) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
	REQUIRE(VALID_NMSOCK(sock));

	udp_recv_cb(handle, nrecv, buf, addr, flags);
	/*
	 * If a caller calls isc_nm_read() on a listening socket, we can
	 * get here, but we MUST NOT stop reading from the listener
	 * socket.  The only difference between listener and connected
	 * sockets is that the former has sock->parent set and the
	 * latter does not.
	 */
	if (!sock->parent) {
		isc__nm_stop_reading(sock);
	}
}

void
isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(result != ISC_R_SUCCESS);

	if (atomic_load(&sock->client)) {
		isc__nmsocket_timer_stop(sock);
		isc__nm_stop_reading(sock);

		if (!sock->recv_read) {
			goto destroy;
		}
		sock->recv_read = false;

		if (sock->recv_cb != NULL) {
			isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
			isc__nmsocket_clearcb(sock);
			isc__nm_readcb(sock, req, result);
		}

	destroy:
		isc__nmsocket_prep_destroy(sock);
		return;
	}

	/*
	 * For a UDP server socket, there is no child socket created by
	 * an "accept", so we:
	 * - continue to read;
	 * - don't clear the callbacks;
	 * - don't destroy the socket (only stoplistening can do that).
	 */
	if (!sock->recv_read) {
		return;
	}
	sock->recv_read = false;

	if (sock->recv_cb != NULL) {
		isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
		isc__nm_readcb(sock, req, result);
	}
}

/*
 * Asynchronous 'udpread' call handler: start or resume reading on a
 * socket; pause reading and call the 'recv' callback after each
 * datagram.
 */
void
isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc_result_t result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	if (isc__nmsocket_closing(sock)) {
		result = ISC_R_CANCELED;
	} else {
		result = isc__nm_start_reading(sock);
	}

	if (result != ISC_R_SUCCESS) {
		sock->reading = true;
		isc__nm_failed_read_cb(sock, result, false);
		return;
	}

	isc__nmsocket_timer_start(sock);
}

void
isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	isc_nmsocket_t *sock = handle->sock;

	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->statichandle == handle);
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(!sock->recv_read);

	sock->recv_cb = cb;
	sock->recv_cbarg = cbarg;
	sock->recv_read = true;

	if (!sock->reading && sock->tid == isc_nm_tid()) {
		isc__netievent_udpread_t ievent = { .sock = sock };
		isc__nm_async_udpread(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_udpread_t *ievent =
			isc__nm_get_netievent_udpread(sock->mgr, sock);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}
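/*
 * Usage sketch (not part of this file): reading one datagram on a
 * connected client handle, e.g. from the connect callback shown
 * earlier; "read_cb" is a hypothetical isc_nm_recv_cb_t.  As
 * isc__nm_udp_read_cb above suggests, a client socket stops reading
 * after each datagram, so each call yields at most one callback:
 *
 *	isc_nm_read(handle, read_cb, NULL);
 */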
static void
udp_stop_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true))
	{
		UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	atomic_store(&sock->listening, false);

	isc__nmsocket_detach(&sock);
}

static void
udp_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->closing));

	if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false },
					    true))
	{
		UNREACHABLE();
	}

	isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);

	if (sock->server != NULL) {
		isc__nmsocket_detach(&sock->server);
	}

	atomic_store(&sock->connected, false);
	atomic_store(&sock->listening, false);

	isc__nmsocket_prep_destroy(sock);
}

static void
read_timer_close_cb(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	uv_handle_set_data(handle, NULL);

	if (sock->parent) {
		uv_close(&sock->uv_handle.handle, udp_stop_cb);
	} else {
		uv_close(&sock->uv_handle.handle, udp_close_cb);
	}
}

static void
stop_udp_child(isc_nmsocket_t *sock) {
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(sock->tid == isc_nm_tid());

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true))
	{
		return;
	}

	udp_close_direct(sock);

	atomic_fetch_sub(&sock->parent->rchildren, 1);

	isc_barrier_wait(&sock->parent->stoplistening);
}

static void
stop_udp_parent(isc_nmsocket_t *sock) {
	isc_nmsocket_t *csock = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udplistener);

	isc_barrier_init(&sock->stoplistening, sock->nchildren);

	for (size_t i = 0; i < sock->nchildren; i++) {
		csock = &sock->children[i];
		REQUIRE(VALID_NMSOCK(csock));

		if ((int)i == isc_nm_tid()) {
			/*
			 * We need to schedule closing the other sockets first
			 */
			continue;
		}

		atomic_store(&csock->active, false);
		enqueue_stoplistening(csock);
	}

	csock = &sock->children[isc_nm_tid()];
	atomic_store(&csock->active, false);
	stop_udp_child(csock);

	atomic_store(&sock->closed, true);
	isc__nmsocket_prep_destroy(sock);
}

static void
udp_close_direct(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());

	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);
	uv_close((uv_handle_t *)&sock->read_timer, read_timer_close_cb);
}

void
isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpclose_t *ievent = (isc__netievent_udpclose_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	UNUSED(worker);

	udp_close_direct(sock);
}
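/*
 * Note on ordering: udp_close_direct() closes the read timer first;
 * only in read_timer_close_cb() is the UDP handle itself closed, with
 * udp_stop_cb for listener children (sock->parent != NULL) and
 * udp_close_cb for everything else.
 */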
void
isc__nm_udp_close(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);
	REQUIRE(!isc__nmsocket_active(sock));

	if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
					    true))
	{
		return;
	}

	if (sock->tid == isc_nm_tid()) {
		udp_close_direct(sock);
	} else {
		isc__netievent_udpclose_t *ievent =
			isc__nm_get_netievent_udpclose(sock->mgr, sock);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

void
isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->type == isc_nm_udpsocket);

	/*
	 * If the socket is active, mark it inactive and
	 * continue.  If it isn't active, stop now.
	 */
	if (!isc__nmsocket_deactivate(sock)) {
		return;
	}

	/*
	 * If the socket is connecting, the cancel will happen in
	 * async_udpconnect() because the socket is now inactive.
	 */
	if (atomic_load(&sock->connecting)) {
		return;
	}

	/*
	 * When the client detaches the last handle, sock->statichandle
	 * is NULL; in that case, nobody is interested in the callback.
	 */
	if (sock->statichandle != NULL) {
		isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
		return;
	}

	/*
	 * Otherwise, we just send the socket to the abyss...
	 */
	if (sock->parent == NULL) {
		isc__nmsocket_prep_destroy(sock);
	}
}

void
isc__nm_udp_cancelread(isc_nmhandle_t *handle) {
	isc_nmsocket_t *sock = NULL;
	isc__netievent_udpcancel_t *ievent = NULL;

	REQUIRE(VALID_NMHANDLE(handle));

	sock = handle->sock;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->type == isc_nm_udpsocket);

	ievent = isc__nm_get_netievent_udpcancel(sock->mgr, sock, handle);

	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

void
isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_udpcancel_t *ievent = (isc__netievent_udpcancel_t *)ev0;
	isc_nmsocket_t *sock = NULL;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(ievent->sock));

	sock = ievent->sock;

	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->client));

	isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
}
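/*
 * Usage sketch (not part of this file): a client that gives up on a
 * pending read calls isc_nm_cancelread(), which for UDP sockets lands
 * in the udpcancel handler above and fails the read with ISC_R_EOF:
 *
 *	isc_nm_cancelread(handle);
 */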