/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#if defined(__MVS__)
#include <xti.h>
#endif
#include <sys/un.h>

#define UV__UDP_DGRAM_MAXSIZE (64 * 1024)

#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif

#if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
#endif

union uv__sockaddr {
  struct sockaddr_in6 in6;
  struct sockaddr_in in;
  struct sockaddr addr;
};

static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags);

#if HAVE_MMSG

#define UV__MMSG_MAXWIDTH 20

static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
static void uv__udp_sendmmsg(uv_udp_t* handle);

static int uv__recvmmsg_avail;
static int uv__sendmmsg_avail;
static uv_once_t once = UV_ONCE_INIT;

static void uv__udp_mmsg_init(void) {
  int ret;
  int s;
  s = uv__socket(AF_INET, SOCK_DGRAM, 0);
  if (s < 0)
    return;
  ret = uv__sendmmsg(s, NULL, 0, 0);
  if (ret == 0 || errno != ENOSYS) {
    uv__sendmmsg_avail = 1;
    uv__recvmmsg_avail = 1;
  } else {
    ret = uv__recvmmsg(s, NULL, 0, 0, NULL);
    if (ret == 0 || errno != ENOSYS)
      uv__recvmmsg_avail = 1;
  }
  uv__close(s);
}

#endif

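/* uv__udp_mmsg_init() above probes once, on first use, whether the kernel
 * actually implements sendmmsg(2)/recvmmsg(2): libc may ship the wrappers
 * while the running kernel still returns ENOSYS. A zero-packet call is
 * enough to tell the two apart without sending anything.
 *
 * Illustrative sketch (not part of this file): in libuv versions that expose
 * the UV_UDP_RECVMMSG flag, an application opts in to the batched read path
 * through the public uv_udp_init_ex(), e.g.
 *
 *   uv_udp_t h;
 *   uv_udp_init_ex(loop, &h, AF_INET | UV_UDP_RECVMMSG);
 *
 * The recvmmsg path is then used automatically whenever the probe succeeds
 * and the alloc_cb hands out buffers large enough for several datagrams.
 */
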
void uv__udp_close(uv_udp_t* handle) {
  uv__io_close(handle->loop, &handle->io_watcher);
  uv__handle_stop(handle);

  if (handle->io_watcher.fd != -1) {
    uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }
}


void uv__udp_finish_close(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    req->status = UV_ECANCELED;
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
  }

  uv__udp_run_completed(handle);

  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Now tear down the handle. */
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
  /* but _do not_ touch close_cb */
}


static void uv__udp_run_completed(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
    q = QUEUE_HEAD(&handle->write_completed_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop, req);

    handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
    handle->send_queue_count--;

    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;

    if (req->send_cb == NULL)
      continue;

    /* req->status >= 0 == bytes written
     * req->status < 0 == errno
     */
    if (req->status >= 0)
      req->send_cb(req, 0);
    else
      req->send_cb(req, req->status);
  }

  if (QUEUE_EMPTY(&handle->write_queue)) {
    /* Pending queue and completion queue empty, stop watcher. */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}


static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* handle;

  handle = container_of(w, uv_udp_t, io_watcher);
  assert(handle->type == UV_UDP);

  if (revents & POLLIN)
    uv__udp_recvmsg(handle);

  if (revents & POLLOUT) {
    uv__udp_sendmsg(handle);
    uv__udp_run_completed(handle);
  }
}

#if HAVE_MMSG
static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
  struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
  struct iovec iov[UV__MMSG_MAXWIDTH];
  struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
  ssize_t nread;
  uv_buf_t chunk_buf;
  size_t chunks;
  int flags;
  size_t k;

  /* prepare structures for recvmmsg */
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);
  for (k = 0; k < chunks; ++k) {
    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    msgs[k].msg_hdr.msg_iov = iov + k;
    msgs[k].msg_hdr.msg_iovlen = 1;
    msgs[k].msg_hdr.msg_name = peers + k;
    msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
    msgs[k].msg_hdr.msg_control = NULL;
    msgs[k].msg_hdr.msg_controllen = 0;
    msgs[k].msg_hdr.msg_flags = 0;
  }

  do
    nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
  while (nread == -1 && errno == EINTR);

  if (nread < 1) {
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      handle->recv_cb(handle, 0, buf, NULL, 0);
    else
      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
  } else {
    /* pass each chunk to the application */
    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
      flags = UV_UDP_MMSG_CHUNK;
      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
      handle->recv_cb(handle,
                      msgs[k].msg_len,
                      &chunk_buf,
                      msgs[k].msg_hdr.msg_name,
                      flags);
    }

    /* one last callback so the original buffer is freed */
    if (handle->recv_cb != NULL)
      handle->recv_cb(handle, 0, buf, NULL, 0);
  }
  return nread;
}
#endif

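/* With recvmmsg, a single allocation is sliced into UV__UDP_DGRAM_MAXSIZE
 * chunks and recv_cb fires once per datagram with UV_UDP_MMSG_CHUNK set,
 * followed by one final nread == 0 callback without that flag so the
 * application can release the original buffer. A minimal recv_cb that is
 * safe under both the recvmsg and recvmmsg paths might look like this
 * (sketch only, assuming the buffer came from malloc; process() is a
 * hypothetical helper):
 *
 *   static void recv_cb(uv_udp_t* h, ssize_t nread, const uv_buf_t* buf,
 *                       const struct sockaddr* from, unsigned flags) {
 *     if (nread > 0)
 *       process(buf->base, nread, from);   // hypothetical helper
 *     if (!(flags & UV_UDP_MMSG_CHUNK))
 *       free(buf->base);                   // only free the real buffer
 *   }
 */
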
static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage peer;
  struct msghdr h;
  ssize_t nread;
  uv_buf_t buf;
  int flags;
  int count;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  do {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
    if (buf.base == NULL || buf.len == 0) {
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

#if HAVE_MMSG
    if (handle->flags & UV_HANDLE_UDP_RECVMMSG) {
      uv_once(&once, uv__udp_mmsg_init);
      if (uv__recvmmsg_avail) {
        nread = uv__udp_recvmmsg(handle, &buf);
        if (nread > 0)
          count -= nread;
        continue;
      }
    }
#endif

    memset(&h, 0, sizeof(h));
    memset(&peer, 0, sizeof(peer));
    h.msg_name = &peer;
    h.msg_namelen = sizeof(peer);
    h.msg_iov = (void*) &buf;
    h.msg_iovlen = 1;

    do {
      nread = recvmsg(handle->io_watcher.fd, &h, 0);
    }
    while (nread == -1 && errno == EINTR);

    if (nread == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        handle->recv_cb(handle, 0, &buf, NULL, 0);
      else
        handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    }
    else {
      flags = 0;
      if (h.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
    }
    count--;
  }
  /* recv_cb callback may decide to pause or close the handle */
  while (nread != -1
      && count > 0
      && handle->io_watcher.fd != -1
      && handle->recv_cb != NULL);
}

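/* The alloc_cb contract above: returning a NULL base or a zero length makes
 * uv__udp_recvmsg() report UV_ENOBUFS and stop reading. A minimal sketch of
 * a conforming allocator (heap-based; a real application may pool buffers):
 *
 *   static void alloc_cb(uv_handle_t* h, size_t suggested, uv_buf_t* buf) {
 *     buf->base = malloc(suggested);   // suggested is 64 KB here
 *     buf->len = (buf->base == NULL) ? 0 : suggested;
 *   }
 */
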
#if HAVE_MMSG
static void uv__udp_sendmmsg(uv_udp_t* handle) {
  uv_udp_send_t* req;
  struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
  struct uv__mmsghdr* p;
  QUEUE* q;
  ssize_t npkts;
  size_t pkts;
  size_t i;

  if (QUEUE_EMPTY(&handle->write_queue))
    return;

write_queue_drain:
  for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
       pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
       ++pkts, q = QUEUE_HEAD(q)) {
    assert(q != NULL);
    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    p = &h[pkts];
    memset(p, 0, sizeof(*p));
    if (req->addr.ss_family == AF_UNSPEC) {
      p->msg_hdr.msg_name = NULL;
      p->msg_hdr.msg_namelen = 0;
    } else {
      p->msg_hdr.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
    h[pkts].msg_hdr.msg_iovlen = req->nbufs;
  }

  do
    npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts, 0);
  while (npkts == -1 && errno == EINTR);

  if (npkts < 1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return;
    for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
         i < pkts && q != &handle->write_queue;
         ++i, q = QUEUE_HEAD(q)) {
      assert(q != NULL);
      req = QUEUE_DATA(q, uv_udp_send_t, queue);
      assert(req != NULL);

      req->status = UV__ERR(errno);
      QUEUE_REMOVE(&req->queue);
      QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
    }
    uv__io_feed(handle->loop, &handle->io_watcher);
    return;
  }

  for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
       i < pkts && q != &handle->write_queue;
       ++i, q = QUEUE_HEAD(&handle->write_queue)) {
    assert(q != NULL);
    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    req->status = req->bufs[0].len;

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
  }

  /* couldn't batch everything, continue sending (jump to avoid stack growth) */
  if (!QUEUE_EMPTY(&handle->write_queue))
    goto write_queue_drain;
  uv__io_feed(handle->loop, &handle->io_watcher);
  return;
}
#endif

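/* Fallback send path: drain the write queue with one sendmsg(2) call per
 * queued request. Used when sendmmsg(2) is unavailable; otherwise
 * uv__udp_sendmmsg() above batches up to UV__MMSG_MAXWIDTH datagrams per
 * syscall and, on error, fails the whole remaining batch with the same
 * errno. */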
static void uv__udp_sendmsg(uv_udp_t* handle) {
  uv_udp_send_t* req;
  struct msghdr h;
  QUEUE* q;
  ssize_t size;

#if HAVE_MMSG
  uv_once(&once, uv__udp_mmsg_init);
  if (uv__sendmmsg_avail) {
    uv__udp_sendmmsg(handle);
    return;
  }
#endif

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    assert(q != NULL);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    memset(&h, 0, sizeof h);
    if (req->addr.ss_family == AF_UNSPEC) {
      h.msg_name = NULL;
      h.msg_namelen = 0;
    } else {
      h.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        h.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        h.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        h.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h.msg_iov = (struct iovec*) req->bufs;
    h.msg_iovlen = req->nbufs;

    do {
      size = sendmsg(handle->io_watcher.fd, &h, 0);
    } while (size == -1 && errno == EINTR);

    if (size == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
        break;
    }

    req->status = (size == -1 ? UV__ERR(errno) : size);

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
    uv__io_feed(handle->loop, &handle->io_watcher);
  }
}


/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
 * refinements for programs that use multicast.
 *
 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
 * are different from the BSDs: it _shares_ the port rather than stealing it
 * from the current listener. While useful, it's not something we can emulate
 * on other platforms so we don't enable it.
 *
 * zOS does not support getsockname with SO_REUSEPORT option when using
 * AF_UNIX.
 */
static int uv__set_reuse(int fd) {
  int yes;
  yes = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in sockfd;
  unsigned int sockfd_len = sizeof(sockfd);
  if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
    return UV__ERR(errno);
  if (sockfd.sin_family == AF_UNIX) {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
      return UV__ERR(errno);
  } else {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
      return UV__ERR(errno);
  }
#elif defined(SO_REUSEPORT) && !defined(__linux__)
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
    return UV__ERR(errno);
#else
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
    return UV__ERR(errno);
#endif

  return 0;
}


int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int yes;
  int fd;

  /* Check for bad flags. */
  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR))
    return UV_EINVAL;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  fd = handle->io_watcher.fd;
  if (fd == -1) {
    err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (err < 0)
      return err;
    fd = err;
    handle->io_watcher.fd = fd;
  }

  if (flags & UV_UDP_REUSEADDR) {
    err = uv__set_reuse(fd);
    if (err)
      return err;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    yes = 1;
    if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
      err = UV__ERR(errno);
      return err;
    }
#else
    err = UV_ENOTSUP;
    return err;
#endif
  }

  if (bind(fd, addr, addrlen)) {
    err = UV__ERR(errno);
    if (errno == EAFNOSUPPORT)
      /* OSX, other BSDs and SunOS fail with EAFNOSUPPORT when binding a
       * socket created with AF_INET to an AF_INET6 address or vice versa. */
      err = UV_EINVAL;
    return err;
  }

  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  handle->flags |= UV_HANDLE_BOUND;
  return 0;
}

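/* Callers reach uv__udp_bind() through the public uv_udp_bind(). A minimal
 * usage sketch (not part of this file; the port number is illustrative):
 *
 *   struct sockaddr_in addr;
 *   uv_ip4_addr("0.0.0.0", 41234, &addr);
 *   uv_udp_bind(&h, (const struct sockaddr*) &addr, UV_UDP_REUSEADDR);
 *
 * When no explicit bind happens, uv__udp_maybe_deferred_bind() below binds
 * to the wildcard address on the first send, recv or membership call.
 */
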
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags) {
  union uv__sockaddr taddr;
  socklen_t addrlen;

  if (handle->io_watcher.fd != -1)
    return 0;

  switch (domain) {
  case AF_INET:
  {
    struct sockaddr_in* addr = &taddr.in;
    memset(addr, 0, sizeof *addr);
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = INADDR_ANY;
    addrlen = sizeof *addr;
    break;
  }
  case AF_INET6:
  {
    struct sockaddr_in6* addr = &taddr.in6;
    memset(addr, 0, sizeof *addr);
    addr->sin6_family = AF_INET6;
    addr->sin6_addr = in6addr_any;
    addrlen = sizeof *addr;
    break;
  }
  default:
    assert(0 && "unsupported address family");
    abort();
  }

  return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
}


int uv__udp_connect(uv_udp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen) {
  int err;

  err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (err)
    return err;

  do {
    errno = 0;
    err = connect(handle->io_watcher.fd, addr, addrlen);
  } while (err == -1 && errno == EINTR);

  if (err)
    return UV__ERR(errno);

  handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}


int uv__udp_disconnect(uv_udp_t* handle) {
  int r;
  struct sockaddr addr;

  memset(&addr, 0, sizeof(addr));

  addr.sa_family = AF_UNSPEC;

  do {
    errno = 0;
    r = connect(handle->io_watcher.fd, &addr, sizeof(addr));
  } while (r == -1 && errno == EINTR);

  if (r == -1 && errno != EAFNOSUPPORT)
    return UV__ERR(errno);

  handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
  return 0;
}

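/* Connect/disconnect follow the BSD sockets idiom: connecting a datagram
 * socket fixes the peer, and connecting again with AF_UNSPEC (as
 * uv__udp_disconnect() does) dissolves the association. Public-API sketch
 * (illustrative only):
 *
 *   uv_udp_connect(&h, (const struct sockaddr*) &peer);  // fix the peer
 *   uv_udp_send(&req, &h, &buf, 1, NULL, send_cb);       // addr must be NULL
 *   uv_udp_connect(&h, NULL);                            // disconnect
 */
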
int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int err;
  int empty_queue;

  assert(nbufs > 0);

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  }

  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up send_queue_size/count later.
   */
  empty_queue = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->addr));
  if (addr == NULL)
    req->addr.ss_family = AF_UNSPEC;
  else
    memcpy(&req->addr, addr, addrlen);
  req->send_cb = send_cb;
  req->handle = handle;
  req->nbufs = nbufs;

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL) {
    uv__req_unregister(handle->loop, req);
    return UV_ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    uv__udp_sendmsg(handle);

    /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
     * away. In such cases the `io_watcher` has to be queued for asynchronous
     * write.
     */
    if (!QUEUE_EMPTY(&handle->write_queue))
      uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  } else {
    uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  }

  return 0;
}


int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  int err;
  struct msghdr h;
  ssize_t size;

  assert(nbufs > 0);

  /* already sending a message */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  } else {
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  }

  memset(&h, 0, sizeof h);
  h.msg_name = (struct sockaddr*) addr;
  h.msg_namelen = addrlen;
  h.msg_iov = (struct iovec*) bufs;
  h.msg_iovlen = nbufs;

  do {
    size = sendmsg(handle->io_watcher.fd, &h, 0);
  } while (size == -1 && errno == EINTR);

  if (size == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return UV_EAGAIN;
    else
      return UV__ERR(errno);
  }

  return size;
}

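/* uv__udp_try_send() backs the public uv_udp_try_send(): a synchronous,
 * best-effort send that never queues. A common pattern (sketch, not part of
 * this file) is to try the fast path first and fall back to a queued send:
 *
 *   int r = uv_udp_try_send(&h, &buf, 1, (const struct sockaddr*) &peer);
 *   if (r == UV_EAGAIN)
 *     r = uv_udp_send(&req, &h, &buf, 1,
 *                     (const struct sockaddr*) &peer, send_cb);
 */
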
static int uv__udp_set_membership4(uv_udp_t* handle,
                                   const struct sockaddr_in* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct ip_mreq mreq;
  int optname;
  int err;

  memset(&mreq, 0, sizeof mreq);

  if (interface_addr) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IP_ADD_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IP_DROP_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
    if (errno == ENXIO)
      return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}


static int uv__udp_set_membership6(uv_udp_t* handle,
                                   const struct sockaddr_in6* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  int optname;
  struct ipv6_mreq mreq;
  struct sockaddr_in6 addr6;

  memset(&mreq, 0, sizeof mreq);

  if (interface_addr) {
    if (uv_ip6_addr(interface_addr, 0, &addr6))
      return UV_EINVAL;
    mreq.ipv6mr_interface = addr6.sin6_scope_id;
  } else {
    mreq.ipv6mr_interface = 0;
  }

  mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IPV6_ADD_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IPV6_DROP_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
    if (errno == ENXIO)
      return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}


#if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source mreq;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_SOURCE_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_SOURCE_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}


static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req mreq;
  struct sockaddr_in6 addr6;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_ip6_addr(interface_addr, 0, &addr6);
    if (err)
      return err;
    mreq.gsr_interface = addr6.sin6_scope_id;
  } else {
    mreq.gsr_interface = 0;
  }

  STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
  STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
  memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
  memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));

  if (membership == UV_JOIN_GROUP)
    optname = MCAST_JOIN_SOURCE_GROUP;
  else if (membership == UV_LEAVE_GROUP)
    optname = MCAST_LEAVE_SOURCE_GROUP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
#endif

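/* The membership helpers above are reached through uv_udp_set_membership()
 * and uv_udp_set_source_membership(); both accept textual addresses and pick
 * the v4/v6 variant themselves. Sketch of joining an IPv4 group on the
 * default interface (the group address is illustrative):
 *
 *   uv_udp_set_membership(&h, "239.255.0.1", NULL, UV_JOIN_GROUP);
 */
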
int uv__udp_init_ex(uv_loop_t* loop,
                    uv_udp_t* handle,
                    unsigned flags,
                    int domain) {
  int fd;

  fd = -1;
  if (domain != AF_UNSPEC) {
    fd = uv__socket(domain, SOCK_DGRAM, 0);
    if (fd < 0)
      return fd;
  }

  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;
  handle->send_queue_size = 0;
  handle->send_queue_count = 0;
  uv__io_init(&handle->io_watcher, uv__udp_io, fd);
  QUEUE_INIT(&handle->write_queue);
  QUEUE_INIT(&handle->write_completed_queue);

  return 0;
}


int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
  int err;

  /* Check for already active socket. */
  if (handle->io_watcher.fd != -1)
    return UV_EBUSY;

  if (uv__fd_exists(handle->loop, sock))
    return UV_EEXIST;

  err = uv__nonblock(sock, 1);
  if (err)
    return err;

  err = uv__set_reuse(sock);
  if (err)
    return err;

  handle->io_watcher.fd = sock;
  if (uv__udp_is_connected(handle))
    handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}

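/* uv_udp_open() adopts a socket created outside libuv; the descriptor is
 * switched to non-blocking mode and gets the same reuse semantics as sockets
 * created internally. Sketch (error handling omitted):
 *
 *   uv_os_sock_t s = socket(AF_INET, SOCK_DGRAM, 0);
 *   uv_udp_init(loop, &h);
 *   uv_udp_open(&h, s);
 */
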
int uv_udp_set_membership(uv_udp_t* handle,
                          const char* multicast_addr,
                          const char* interface_addr,
                          uv_membership membership) {
  int err;
  struct sockaddr_in addr4;
  struct sockaddr_in6 addr6;

  if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
    err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    if (err)
      return err;
    return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
  } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
    err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
    if (err)
      return err;
    return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
  } else {
    return UV_EINVAL;
  }
}


int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
  int err;
  union uv__sockaddr mcast_addr;
  union uv__sockaddr src_addr;

  err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
  if (err) {
    err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
    if (err)
      return err;
    err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
    if (err)
      return err;
    return uv__udp_set_source_membership6(handle,
                                          &mcast_addr.in6,
                                          interface_addr,
                                          &src_addr.in6,
                                          membership);
  }

  err = uv_ip4_addr(source_addr, 0, &src_addr.in);
  if (err)
    return err;
  return uv__udp_set_source_membership4(handle,
                                        &mcast_addr.in,
                                        interface_addr,
                                        &src_addr.in,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}


static int uv__setsockopt(uv_udp_t* handle,
                          int option4,
                          int option6,
                          const void* val,
                          socklen_t size) {
  int r;

  if (handle->flags & UV_HANDLE_IPV6)
    r = setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   option6,
                   val,
                   size);
  else
    r = setsockopt(handle->io_watcher.fd,
                   IPPROTO_IP,
                   option4,
                   val,
                   size);
  if (r)
    return UV__ERR(errno);

  return 0;
}

static int uv__setsockopt_maybe_char(uv_udp_t* handle,
                                     int option4,
                                     int option6,
                                     int val) {
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
  char arg = val;
#elif defined(__OpenBSD__)
  unsigned char arg = val;
#else
  int arg = val;
#endif

  if (val < 0 || val > 255)
    return UV_EINVAL;

  return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
}


int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
  if (setsockopt(handle->io_watcher.fd,
                 SOL_SOCKET,
                 SO_BROADCAST,
                 &on,
                 sizeof(on))) {
    return UV__ERR(errno);
  }

  return 0;
}


int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (ttl < 1 || ttl > 255)
    return UV_EINVAL;

#if defined(__MVS__)
  if (!(handle->flags & UV_HANDLE_IPV6))
    return UV_ENOTSUP;  /* zOS does not support setting ttl for IPv4 */
#endif

  /*
   * On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
   * so hardcode the size of these options on this platform,
   * and use the general uv__setsockopt_maybe_char call on other platforms.
   */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)

  return uv__setsockopt(handle,
                        IP_TTL,
                        IPV6_UNICAST_HOPS,
                        &ttl,
                        sizeof(ttl));

#else /* !(defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
       defined(__MVS__)) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_TTL,
                                   IPV6_UNICAST_HOPS,
                                   ttl);

#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
        defined(__MVS__) */
}

int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
  /*
   * On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
   * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
   * and use the general uv__setsockopt_maybe_char call otherwise.
   */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_TTL,
                          IPV6_MULTICAST_HOPS,
                          &ttl,
                          sizeof(ttl));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
        defined(__MVS__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}


int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
  /*
   * On Solaris and derivatives such as SmartOS, the length of socket options
   * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
   * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
   * and use the general uv__setsockopt_maybe_char call otherwise.
   */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
        defined(__MVS__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}

int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
  struct sockaddr_storage addr_st;
  struct sockaddr_in* addr4;
  struct sockaddr_in6* addr6;

  addr4 = (struct sockaddr_in*) &addr_st;
  addr6 = (struct sockaddr_in6*) &addr_st;

  if (!interface_addr) {
    memset(&addr_st, 0, sizeof addr_st);
    if (handle->flags & UV_HANDLE_IPV6) {
      addr_st.ss_family = AF_INET6;
      addr6->sin6_scope_id = 0;
    } else {
      addr_st.ss_family = AF_INET;
      addr4->sin_addr.s_addr = htonl(INADDR_ANY);
    }
  } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
    /* nothing, address was parsed */
  } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
    /* nothing, address was parsed */
  } else {
    return UV_EINVAL;
  }

  if (addr_st.ss_family == AF_INET) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IP,
                   IP_MULTICAST_IF,
                   (void*) &addr4->sin_addr,
                   sizeof(addr4->sin_addr)) == -1) {
      return UV__ERR(errno);
    }
  } else if (addr_st.ss_family == AF_INET6) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   IPV6_MULTICAST_IF,
                   &addr6->sin6_scope_id,
                   sizeof(addr6->sin6_scope_id)) == -1) {
      return UV__ERR(errno);
    }
  } else {
    assert(0 && "unexpected address family");
    abort();
  }

  return 0;
}

int uv_udp_getpeername(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getpeername,
                             name,
                             namelen);
}

int uv_udp_getsockname(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getsockname,
                             name,
                             namelen);
}


int uv__udp_recv_start(uv_udp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_udp_recv_cb recv_cb) {
  int err;

  if (alloc_cb == NULL || recv_cb == NULL)
    return UV_EINVAL;

  if (uv__io_active(&handle->io_watcher, POLLIN))
    return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
  if (err)
    return err;

  handle->alloc_cb = alloc_cb;
  handle->recv_cb = recv_cb;

  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  uv__handle_start(handle);

  return 0;
}


int uv__udp_recv_stop(uv_udp_t* handle) {
  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);

  if (!uv__io_active(&handle->io_watcher, POLLOUT))
    uv__handle_stop(handle);

  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;

  return 0;
}

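/* uv__udp_recv_start()/uv__udp_recv_stop() back the public
 * uv_udp_recv_start()/uv_udp_recv_stop(). Putting the pieces together
 * (sketch only; alloc_cb and recv_cb as outlined earlier in this file):
 *
 *   uv_udp_recv_start(&h, alloc_cb, recv_cb);
 *   ...
 *   uv_udp_recv_stop(&h);   // the handle stays usable for later restarts
 */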