xref: /netbsd-src/external/mit/libuv/dist/src/unix/udp.c (revision 181254a7b1bdde6873432bffef2d2decc4b5c22f)
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "internal.h"
24 
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #if defined(__MVS__)
31 #include <xti.h>
32 #endif
33 #include <sys/un.h>
34 
35 #define UV__UDP_DGRAM_MAXSIZE (64 * 1024)
36 
37 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
38 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
39 #endif
40 
41 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
42 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
43 #endif
44 
45 union uv__sockaddr {
46   struct sockaddr_in6 in6;
47   struct sockaddr_in in;
48   struct sockaddr addr;
49 };
50 
51 static void uv__udp_run_completed(uv_udp_t* handle);
52 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
53 static void uv__udp_recvmsg(uv_udp_t* handle);
54 static void uv__udp_sendmsg(uv_udp_t* handle);
55 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
56                                        int domain,
57                                        unsigned int flags);
58 
59 #if HAVE_MMSG
60 
61 #define UV__MMSG_MAXWIDTH 20
62 
63 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
64 static void uv__udp_sendmmsg(uv_udp_t* handle);
65 
66 static int uv__recvmmsg_avail;
67 static int uv__sendmmsg_avail;
68 static uv_once_t once = UV_ONCE_INIT;
69 
70 static void uv__udp_mmsg_init(void) {
71   int ret;
72   int s;
73   s = uv__socket(AF_INET, SOCK_DGRAM, 0);
74   if (s < 0)
75     return;
76   ret = uv__sendmmsg(s, NULL, 0, 0);
77   if (ret == 0 || errno != ENOSYS) {
78     uv__sendmmsg_avail = 1;
79     uv__recvmmsg_avail = 1;
80   } else {
81     ret = uv__recvmmsg(s, NULL, 0, 0, NULL);
82     if (ret == 0 || errno != ENOSYS)
83       uv__recvmmsg_avail = 1;
84   }
85   uv__close(s);
86 }
87 
88 #endif
89 
90 void uv__udp_close(uv_udp_t* handle) {
91   uv__io_close(handle->loop, &handle->io_watcher);
92   uv__handle_stop(handle);
93 
94   if (handle->io_watcher.fd != -1) {
95     uv__close(handle->io_watcher.fd);
96     handle->io_watcher.fd = -1;
97   }
98 }
99 
100 
/* Final stage of closing, run after uv__udp_close() has stopped the
 * watcher and closed the fd: cancel every still-pending send request
 * (their send_cb fires with UV_ECANCELED), then detach the user
 * callbacks.  close_cb is deliberately left alone — the loop still
 * needs it to notify the application.
 */
void uv__udp_finish_close(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  /* Move every unsent request onto the completed queue with ECANCELED
   * so each send_cb still runs exactly once.
   */
  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    req->status = UV_ECANCELED;
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
  }

  uv__udp_run_completed(handle);

  /* All queued bytes/requests must have been accounted for above. */
  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Now tear down the handle. */
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
  /* but _do not_ touch close_cb */
}
127 
128 
/* Drain write_completed_queue: release each finished request's buffer
 * vector, update the queued-bytes accounting, and invoke its send_cb.
 * When both the pending and completed queues end up empty, the POLLOUT
 * watcher (and possibly the whole handle) is stopped.  The
 * UV_HANDLE_UDP_PROCESSING flag guards against reentrancy from a
 * send_cb that immediately queues more data.
 */
static void uv__udp_run_completed(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
    q = QUEUE_HEAD(&handle->write_completed_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop, req);

    /* Undo the accounting done in uv__udp_send(). */
    handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
    handle->send_queue_count--;

    /* Free the heap buffer vector, if one was allocated (bufsml is the
     * small inline array used for short buffer lists).
     */
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;

    if (req->send_cb == NULL)
      continue;

    /* req->status >= 0 == bytes written
     * req->status <  0 == errno
     */
    if (req->status >= 0)
      req->send_cb(req, 0);
    else
      req->send_cb(req, req->status);
  }

  if (QUEUE_EMPTY(&handle->write_queue)) {
    /* Pending queue and completion queue empty, stop watcher. */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}
171 
172 
173 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
174   uv_udp_t* handle;
175 
176   handle = container_of(w, uv_udp_t, io_watcher);
177   assert(handle->type == UV_UDP);
178 
179   if (revents & POLLIN)
180     uv__udp_recvmsg(handle);
181 
182   if (revents & POLLOUT) {
183     uv__udp_sendmsg(handle);
184     uv__udp_run_completed(handle);
185   }
186 }
187 
188 #if HAVE_MMSG
/* Receive up to UV__MMSG_MAXWIDTH datagrams with one recvmmsg(2) call.
 * The caller-supplied buffer is carved into UV__UDP_DGRAM_MAXSIZE-sized
 * chunks, one per datagram.  Each datagram is delivered to recv_cb with
 * the UV_UDP_MMSG_CHUNK flag set; a final zero-length callback without
 * that flag tells the application it may free the original buffer.
 * Returns the number of datagrams read, or the recvmmsg() result (<1)
 * on error/EOF-like conditions.
 */
static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
  struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
  struct iovec iov[UV__MMSG_MAXWIDTH];
  struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
  ssize_t nread;
  uv_buf_t chunk_buf;
  size_t chunks;
  int flags;
  size_t k;

  /* prepare structures for recvmmsg */
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);
  for (k = 0; k < chunks; ++k) {
    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    msgs[k].msg_hdr.msg_iov = iov + k;
    msgs[k].msg_hdr.msg_iovlen = 1;
    msgs[k].msg_hdr.msg_name = peers + k;
    msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
    msgs[k].msg_hdr.msg_control = NULL;
    msgs[k].msg_hdr.msg_controllen = 0;
    msgs[k].msg_hdr.msg_flags = 0;
  }

  do
    nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
  while (nread == -1 && errno == EINTR);

  if (nread < 1) {
    /* 0 datagrams or would-block: report "nothing read" so the caller
     * can release the buffer; anything else is a real error.
     */
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      handle->recv_cb(handle, 0, buf, NULL, 0);
    else
      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
  } else {
    /* pass each chunk to the application */
    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
      flags = UV_UDP_MMSG_CHUNK;
      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
      handle->recv_cb(handle,
                      msgs[k].msg_len,
                      &chunk_buf,
                      msgs[k].msg_hdr.msg_name,
                      flags);
    }

    /* one last callback so the original buffer is freed */
    if (handle->recv_cb != NULL)
      handle->recv_cb(handle, 0, buf, NULL, 0);
  }
  return nread;
}
245 #endif
246 
/* Read datagrams from the socket and deliver them via recv_cb.  Loops
 * until the socket would block, the iteration budget is exhausted, or a
 * callback stops/closes the handle.  Each iteration asks alloc_cb for a
 * fresh buffer; ENOBUFS is reported if the application cannot supply one.
 */
static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage peer;
  struct msghdr h;
  ssize_t nread;
  uv_buf_t buf;
  int flags;
  int count;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  do {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* Application couldn't (or wouldn't) allocate — report and stop. */
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

#if HAVE_MMSG
    if (handle->flags & UV_HANDLE_UDP_RECVMMSG) {
      /* Batched receive path; falls back to recvmsg() below when the
       * kernel lacks recvmmsg(2).
       */
      uv_once(&once, uv__udp_mmsg_init);
      if (uv__recvmmsg_avail) {
        nread = uv__udp_recvmmsg(handle, &buf);
        if (nread > 0)
          count -= nread;
        continue;
      }
    }
#endif

    memset(&h, 0, sizeof(h));
    memset(&peer, 0, sizeof(peer));
    h.msg_name = &peer;
    h.msg_namelen = sizeof(peer);
    h.msg_iov = (void*) &buf;
    h.msg_iovlen = 1;

    do {
      nread = recvmsg(handle->io_watcher.fd, &h, 0);
    }
    while (nread == -1 && errno == EINTR);

    if (nread == -1) {
      /* Would-block is reported as a zero-length read (not an error). */
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        handle->recv_cb(handle, 0, &buf, NULL, 0);
      else
        handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    }
    else {
      flags = 0;
      /* Datagram didn't fit in the buffer; the tail was discarded. */
      if (h.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
    }
    count--;
  }
  /* recv_cb callback may decide to pause or close the handle */
  while (nread != -1
      && count > 0
      && handle->io_watcher.fd != -1
      && handle->recv_cb != NULL);
}
317 
318 #if HAVE_MMSG
319 static void uv__udp_sendmmsg(uv_udp_t* handle) {
320   uv_udp_send_t* req;
321   struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
322   struct uv__mmsghdr *p;
323   QUEUE* q;
324   ssize_t npkts;
325   size_t pkts;
326   size_t i;
327 
328   if (QUEUE_EMPTY(&handle->write_queue))
329     return;
330 
331 write_queue_drain:
332   for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
333        pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
334        ++pkts, q = QUEUE_HEAD(q)) {
335     assert(q != NULL);
336     req = QUEUE_DATA(q, uv_udp_send_t, queue);
337     assert(req != NULL);
338 
339     p = &h[pkts];
340     memset(p, 0, sizeof(*p));
341     if (req->addr.ss_family == AF_UNSPEC) {
342       p->msg_hdr.msg_name = NULL;
343       p->msg_hdr.msg_namelen = 0;
344     } else {
345       p->msg_hdr.msg_name = &req->addr;
346       if (req->addr.ss_family == AF_INET6)
347         p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
348       else if (req->addr.ss_family == AF_INET)
349         p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
350       else if (req->addr.ss_family == AF_UNIX)
351         p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
352       else {
353         assert(0 && "unsupported address family");
354         abort();
355       }
356     }
357     h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
358     h[pkts].msg_hdr.msg_iovlen = req->nbufs;
359   }
360 
361   do
362     npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts, 0);
363   while (npkts == -1 && errno == EINTR);
364 
365   if (npkts < 1) {
366     if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
367       return;
368     for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
369          i < pkts && q != &handle->write_queue;
370          ++i, q = QUEUE_HEAD(q)) {
371       assert(q != NULL);
372       req = QUEUE_DATA(q, uv_udp_send_t, queue);
373       assert(req != NULL);
374 
375       req->status = UV__ERR(errno);
376       QUEUE_REMOVE(&req->queue);
377       QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
378     }
379     uv__io_feed(handle->loop, &handle->io_watcher);
380     return;
381   }
382 
383   for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
384        i < pkts && q != &handle->write_queue;
385        ++i, q = QUEUE_HEAD(&handle->write_queue)) {
386     assert(q != NULL);
387     req = QUEUE_DATA(q, uv_udp_send_t, queue);
388     assert(req != NULL);
389 
390     req->status = req->bufs[0].len;
391 
392     /* Sending a datagram is an atomic operation: either all data
393      * is written or nothing is (and EMSGSIZE is raised). That is
394      * why we don't handle partial writes. Just pop the request
395      * off the write queue and onto the completed queue, done.
396      */
397     QUEUE_REMOVE(&req->queue);
398     QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
399   }
400 
401   /* couldn't batch everything, continue sending (jump to avoid stack growth) */
402   if (!QUEUE_EMPTY(&handle->write_queue))
403     goto write_queue_drain;
404   uv__io_feed(handle->loop, &handle->io_watcher);
405   return;
406 }
407 #endif
408 
/* Flush the write queue one datagram at a time with sendmsg(2).
 * Prefers the batched uv__udp_sendmmsg() path when the kernel supports
 * it.  Stops early on EAGAIN/EWOULDBLOCK/ENOBUFS (the POLLOUT watcher
 * will retry); other errors are recorded in req->status and the request
 * is completed anyway.
 */
static void uv__udp_sendmsg(uv_udp_t* handle) {
  uv_udp_send_t* req;
  struct msghdr h;
  QUEUE* q;
  ssize_t size;

#if HAVE_MMSG
  uv_once(&once, uv__udp_mmsg_init);
  if (uv__sendmmsg_avail) {
    uv__udp_sendmmsg(handle);
    return;
  }
#endif

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    assert(q != NULL);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    memset(&h, 0, sizeof h);
    if (req->addr.ss_family == AF_UNSPEC) {
      /* Connected socket: the kernel already knows the destination. */
      h.msg_name = NULL;
      h.msg_namelen = 0;
    } else {
      h.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        h.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        h.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        h.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h.msg_iov = (struct iovec*) req->bufs;
    h.msg_iovlen = req->nbufs;

    do {
      size = sendmsg(handle->io_watcher.fd, &h, 0);
    } while (size == -1 && errno == EINTR);

    if (size == -1) {
      /* Socket (or kernel buffers) full: leave the request queued and
       * let the POLLOUT watcher retry later.
       */
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
        break;
    }

    req->status = (size == -1 ? UV__ERR(errno) : size);

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
    uv__io_feed(handle->loop, &handle->io_watcher);
  }
}
471 
472 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
473  * refinements for programs that use multicast.
474  *
475  * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
476  * are different from the BSDs: it _shares_ the port rather than steal it
477  * from the current listener.  While useful, it's not something we can emulate
478  * on other platforms so we don't enable it.
479  *
480  * zOS does not support getsockname with SO_REUSEPORT option when using
481  * AF_UNIX.
482  */
/* Enable address/port reuse on fd, picking the platform-appropriate
 * option: SO_REUSEPORT on the BSDs (where it implies SO_REUSEADDR),
 * SO_REUSEADDR on Linux (whose SO_REUSEPORT has different, port-sharing
 * semantics) and everywhere else.  On z/OS, AF_UNIX sockets must use
 * SO_REUSEADDR because getsockname()+SO_REUSEPORT is unsupported there.
 * Returns 0 or a negative UV error code.
 */
static int uv__set_reuse(int fd) {
  int yes;
  yes = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in sockfd;
  unsigned int sockfd_len = sizeof(sockfd);
  /* Inspect the socket's family to decide which option is legal. */
  if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
      return UV__ERR(errno);
  if (sockfd.sin_family == AF_UNIX) {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
      return UV__ERR(errno);
  } else {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
       return UV__ERR(errno);
  }
#elif defined(SO_REUSEPORT) && !defined(__linux__)
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
    return UV__ERR(errno);
#else
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
    return UV__ERR(errno);
#endif

  return 0;
}
509 
510 
/* Bind the handle to addr, creating the socket lazily if needed.
 * flags may contain UV_UDP_IPV6ONLY (IPv6 sockets only) and
 * UV_UDP_REUSEADDR.  On success, sets UV_HANDLE_BOUND (and
 * UV_HANDLE_IPV6 for AF_INET6).  Returns 0 or a negative UV error.
 * A socket created here stays owned by the handle even on later
 * failure; it is released by uv__udp_close().
 */
int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int yes;
  int fd;

  /* Check for bad flags. */
  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR))
    return UV_EINVAL;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  fd = handle->io_watcher.fd;
  if (fd == -1) {
    /* Lazy socket creation (uv__socket returns fd or negative error). */
    err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (err < 0)
      return err;
    fd = err;
    handle->io_watcher.fd = fd;
  }

  if (flags & UV_UDP_REUSEADDR) {
    err = uv__set_reuse(fd);
    if (err)
      return err;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    yes = 1;
    if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
      err = UV__ERR(errno);
      return err;
    }
#else
    err = UV_ENOTSUP;
    return err;
#endif
  }

  if (bind(fd, addr, addrlen)) {
    err = UV__ERR(errno);
    if (errno == EAFNOSUPPORT)
      /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
       * socket created with AF_INET to an AF_INET6 address or vice versa. */
      err = UV_EINVAL;
    return err;
  }

  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  handle->flags |= UV_HANDLE_BOUND;
  return 0;
}
570 
571 
572 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
573                                        int domain,
574                                        unsigned int flags) {
575   union uv__sockaddr taddr;
576   socklen_t addrlen;
577 
578   if (handle->io_watcher.fd != -1)
579     return 0;
580 
581   switch (domain) {
582   case AF_INET:
583   {
584     struct sockaddr_in* addr = &taddr.in;
585     memset(addr, 0, sizeof *addr);
586     addr->sin_family = AF_INET;
587     addr->sin_addr.s_addr = INADDR_ANY;
588     addrlen = sizeof *addr;
589     break;
590   }
591   case AF_INET6:
592   {
593     struct sockaddr_in6* addr = &taddr.in6;
594     memset(addr, 0, sizeof *addr);
595     addr->sin6_family = AF_INET6;
596     addr->sin6_addr = in6addr_any;
597     addrlen = sizeof *addr;
598     break;
599   }
600   default:
601     assert(0 && "unsupported address family");
602     abort();
603   }
604 
605   return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
606 }
607 
608 
609 int uv__udp_connect(uv_udp_t* handle,
610                     const struct sockaddr* addr,
611                     unsigned int addrlen) {
612   int err;
613 
614   err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
615   if (err)
616     return err;
617 
618   do {
619     errno = 0;
620     err = connect(handle->io_watcher.fd, addr, addrlen);
621   } while (err == -1 && errno == EINTR);
622 
623   if (err)
624     return UV__ERR(errno);
625 
626   handle->flags |= UV_HANDLE_UDP_CONNECTED;
627 
628   return 0;
629 }
630 
631 
632 int uv__udp_disconnect(uv_udp_t* handle) {
633     int r;
634     struct sockaddr addr;
635 
636     memset(&addr, 0, sizeof(addr));
637 
638     addr.sa_family = AF_UNSPEC;
639 
640     do {
641       errno = 0;
642       r = connect(handle->io_watcher.fd, &addr, sizeof(addr));
643     } while (r == -1 && errno == EINTR);
644 
645     if (r == -1 && errno != EAFNOSUPPORT)
646       return UV__ERR(errno);
647 
648     handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
649     return 0;
650 }
651 
652 
/* Queue a send request.  The buffer descriptors (not the data they
 * point at) are copied into the request; send_cb fires once the
 * datagram is sent, fails, or the handle is closed (UV_ECANCELED).
 * If the queue was empty, an immediate non-blocking send is attempted;
 * otherwise (or if it would block) the POLLOUT watcher takes over.
 * Returns 0 or a negative UV error code.
 */
int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int err;
  int empty_queue;

  assert(nbufs > 0);

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  }

  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up send_queue_size/count later.
   */
  empty_queue = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->addr));
  /* AF_UNSPEC marks "no destination" (connected socket) for the send path. */
  if (addr == NULL)
    req->addr.ss_family = AF_UNSPEC;
  else
    memcpy(&req->addr, addr, addrlen);
  req->send_cb = send_cb;
  req->handle = handle;
  req->nbufs = nbufs;

  /* Use the small inline array when possible; fall back to the heap. */
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL) {
    uv__req_unregister(handle->loop, req);
    return UV_ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    uv__udp_sendmsg(handle);

    /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
     * away. In such cases the `io_watcher` has to be queued for asynchronous
     * write.
     */
    if (!QUEUE_EMPTY(&handle->write_queue))
      uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  } else {
    uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  }

  return 0;
}
717 
718 
/* Synchronous, best-effort send.  Refuses (UV_EAGAIN) when queued
 * requests exist, to preserve datagram ordering.  With addr == NULL the
 * handle must already be connected.  Returns the number of bytes sent,
 * UV_EAGAIN when the socket would block, or another negative UV error.
 */
int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  int err;
  struct msghdr h;
  ssize_t size;

  assert(nbufs > 0);

  /* already sending a message */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  } else {
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  }

  memset(&h, 0, sizeof h);
  h.msg_name = (struct sockaddr*) addr;
  h.msg_namelen = addrlen;
  h.msg_iov = (struct iovec*) bufs;
  h.msg_iovlen = nbufs;

  do {
    size = sendmsg(handle->io_watcher.fd, &h, 0);
  } while (size == -1 && errno == EINTR);

  if (size == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return UV_EAGAIN;
    else
      return UV__ERR(errno);
  }

  return size;
}
761 
762 
763 static int uv__udp_set_membership4(uv_udp_t* handle,
764                                    const struct sockaddr_in* multicast_addr,
765                                    const char* interface_addr,
766                                    uv_membership membership) {
767   struct ip_mreq mreq;
768   int optname;
769   int err;
770 
771   memset(&mreq, 0, sizeof mreq);
772 
773   if (interface_addr) {
774     err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
775     if (err)
776       return err;
777   } else {
778     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
779   }
780 
781   mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
782 
783   switch (membership) {
784   case UV_JOIN_GROUP:
785     optname = IP_ADD_MEMBERSHIP;
786     break;
787   case UV_LEAVE_GROUP:
788     optname = IP_DROP_MEMBERSHIP;
789     break;
790   default:
791     return UV_EINVAL;
792   }
793 
794   if (setsockopt(handle->io_watcher.fd,
795                  IPPROTO_IP,
796                  optname,
797                  &mreq,
798                  sizeof(mreq))) {
799 #if defined(__MVS__)
800   if (errno == ENXIO)
801     return UV_ENODEV;
802 #endif
803     return UV__ERR(errno);
804   }
805 
806   return 0;
807 }
808 
809 
810 static int uv__udp_set_membership6(uv_udp_t* handle,
811                                    const struct sockaddr_in6* multicast_addr,
812                                    const char* interface_addr,
813                                    uv_membership membership) {
814   int optname;
815   struct ipv6_mreq mreq;
816   struct sockaddr_in6 addr6;
817 
818   memset(&mreq, 0, sizeof mreq);
819 
820   if (interface_addr) {
821     if (uv_ip6_addr(interface_addr, 0, &addr6))
822       return UV_EINVAL;
823     mreq.ipv6mr_interface = addr6.sin6_scope_id;
824   } else {
825     mreq.ipv6mr_interface = 0;
826   }
827 
828   mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;
829 
830   switch (membership) {
831   case UV_JOIN_GROUP:
832     optname = IPV6_ADD_MEMBERSHIP;
833     break;
834   case UV_LEAVE_GROUP:
835     optname = IPV6_DROP_MEMBERSHIP;
836     break;
837   default:
838     return UV_EINVAL;
839   }
840 
841   if (setsockopt(handle->io_watcher.fd,
842                  IPPROTO_IPV6,
843                  optname,
844                  &mreq,
845                  sizeof(mreq))) {
846 #if defined(__MVS__)
847   if (errno == ENXIO)
848     return UV_ENODEV;
849 #endif
850     return UV__ERR(errno);
851   }
852 
853   return 0;
854 }
855 
856 
857 #if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
/* Join or leave an IPv4 source-specific multicast group (SSM): receive
 * traffic for multicast_addr only when it originates from source_addr.
 * Binds to the IPv4 wildcard first if the handle has no socket yet.
 * Returns 0 or a negative UV error code.
 */
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source mreq;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  /* Local interface selection; NULL means "any" (kernel's choice). */
  if (interface_addr != NULL) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_SOURCE_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_SOURCE_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
901 
902 
/* Join or leave an IPv6 source-specific multicast group using the
 * protocol-independent MCAST_{JOIN,LEAVE}_SOURCE_GROUP options.
 * Binds to the IPv6 wildcard first if the handle has no socket yet.
 * Returns 0 or a negative UV error code.
 */
static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req mreq;
  struct sockaddr_in6 addr6;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  /* interface_addr is parsed as a scoped IPv6 address; only its scope
   * id (interface index) is used.  0 lets the kernel choose.
   */
  if (interface_addr != NULL) {
    err = uv_ip6_addr(interface_addr, 0, &addr6);
    if (err)
      return err;
    mreq.gsr_interface = addr6.sin6_scope_id;
  } else {
    mreq.gsr_interface = 0;
  }

  /* gsr_group/gsr_source are sockaddr_storage; verify they can hold a
   * sockaddr_in6 before copying.
   */
  STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
  STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
  memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
  memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));

  if (membership == UV_JOIN_GROUP)
    optname = MCAST_JOIN_SOURCE_GROUP;
  else if (membership == UV_LEAVE_GROUP)
    optname = MCAST_LEAVE_SOURCE_GROUP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
950 #endif
951 
952 
953 int uv__udp_init_ex(uv_loop_t* loop,
954                     uv_udp_t* handle,
955                     unsigned flags,
956                     int domain) {
957   int fd;
958 
959   fd = -1;
960   if (domain != AF_UNSPEC) {
961     fd = uv__socket(domain, SOCK_DGRAM, 0);
962     if (fd < 0)
963       return fd;
964   }
965 
966   uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
967   handle->alloc_cb = NULL;
968   handle->recv_cb = NULL;
969   handle->send_queue_size = 0;
970   handle->send_queue_count = 0;
971   uv__io_init(&handle->io_watcher, uv__udp_io, fd);
972   QUEUE_INIT(&handle->write_queue);
973   QUEUE_INIT(&handle->write_completed_queue);
974 
975   return 0;
976 }
977 
978 
979 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
980   int err;
981 
982   /* Check for already active socket. */
983   if (handle->io_watcher.fd != -1)
984     return UV_EBUSY;
985 
986   if (uv__fd_exists(handle->loop, sock))
987     return UV_EEXIST;
988 
989   err = uv__nonblock(sock, 1);
990   if (err)
991     return err;
992 
993   err = uv__set_reuse(sock);
994   if (err)
995     return err;
996 
997   handle->io_watcher.fd = sock;
998   if (uv__udp_is_connected(handle))
999     handle->flags |= UV_HANDLE_UDP_CONNECTED;
1000 
1001   return 0;
1002 }
1003 
1004 
1005 int uv_udp_set_membership(uv_udp_t* handle,
1006                           const char* multicast_addr,
1007                           const char* interface_addr,
1008                           uv_membership membership) {
1009   int err;
1010   struct sockaddr_in addr4;
1011   struct sockaddr_in6 addr6;
1012 
1013   if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
1014     err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
1015     if (err)
1016       return err;
1017     return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
1018   } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
1019     err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
1020     if (err)
1021       return err;
1022     return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
1023   } else {
1024     return UV_EINVAL;
1025   }
1026 }
1027 
1028 
/* Public entry point for source-specific multicast: parse the group
 * address to choose the IPv4 or IPv6 path, require the source address
 * to be of the same family, and delegate.  Not supported on OpenBSD,
 * NetBSD or Android (UV_ENOSYS).
 */
int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) && !defined(__NetBSD__) && !defined(__ANDROID__)
  int err;
  union uv__sockaddr mcast_addr;
  union uv__sockaddr src_addr;

  err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
  if (err) {
    /* Not IPv4 — try the IPv6 path; both addresses must parse as IPv6. */
    err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
    if (err)
      return err;
    err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
    if (err)
      return err;
    return uv__udp_set_source_membership6(handle,
                                          &mcast_addr.in6,
                                          interface_addr,
                                          &src_addr.in6,
                                          membership);
  }

  /* IPv4 group: the source must also be IPv4. */
  err = uv_ip4_addr(source_addr, 0, &src_addr.in);
  if (err)
    return err;
  return uv__udp_set_source_membership4(handle,
                                        &mcast_addr.in,
                                        interface_addr,
                                        &src_addr.in,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}
1066 
1067 
1068 static int uv__setsockopt(uv_udp_t* handle,
1069                          int option4,
1070                          int option6,
1071                          const void* val,
1072                          socklen_t size) {
1073   int r;
1074 
1075   if (handle->flags & UV_HANDLE_IPV6)
1076     r = setsockopt(handle->io_watcher.fd,
1077                    IPPROTO_IPV6,
1078                    option6,
1079                    val,
1080                    size);
1081   else
1082     r = setsockopt(handle->io_watcher.fd,
1083                    IPPROTO_IP,
1084                    option4,
1085                    val,
1086                    size);
1087   if (r)
1088     return UV__ERR(errno);
1089 
1090   return 0;
1091 }
1092 
1093 static int uv__setsockopt_maybe_char(uv_udp_t* handle,
1094                                      int option4,
1095                                      int option6,
1096                                      int val) {
1097 #if defined(__sun) || defined(_AIX) || defined(__MVS__)
1098   char arg = val;
1099 #elif defined(__OpenBSD__)
1100   unsigned char arg = val;
1101 #else
1102   int arg = val;
1103 #endif
1104 
1105   if (val < 0 || val > 255)
1106     return UV_EINVAL;
1107 
1108   return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
1109 }
1110 
1111 
1112 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
1113   if (setsockopt(handle->io_watcher.fd,
1114                  SOL_SOCKET,
1115                  SO_BROADCAST,
1116                  &on,
1117                  sizeof(on))) {
1118     return UV__ERR(errno);
1119   }
1120 
1121   return 0;
1122 }
1123 
1124 
/* Set the unicast TTL (IPv4: IP_TTL) or hop limit (IPv6: IPV6_UNICAST_HOPS)
 * for packets sent on this handle.  `ttl` must be in [1, 255].
 * Returns 0 or a negative UV_* error code.
 */
int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (ttl < 1 || ttl > 255)
    return UV_EINVAL;

#if defined(__MVS__)
  if (!(handle->flags & UV_HANDLE_IPV6))
    return UV_ENOTSUP;  /* zOS does not support setting ttl for IPv4 */
#endif

/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
 * so hardcode the size of these options on this platform,
 * and use the general uv__setsockopt_maybe_char call on other platforms.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)

  /* int-sized argument on these platforms. */
  return uv__setsockopt(handle,
                        IP_TTL,
                        IPV6_UNICAST_HOPS,
                        &ttl,
                        sizeof(ttl));

#else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
           defined(__MVS__)) */

  /* char- or int-sized argument depending on platform; the helper also
   * re-checks the 0-255 range. */
  return uv__setsockopt_maybe_char(handle,
                                   IP_TTL,
                                   IPV6_UNICAST_HOPS,
                                   ttl);

#endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
          defined(__MVS__) */
}
1160 
1161 
/* Set the TTL / hop limit for outgoing multicast packets
 * (IP_MULTICAST_TTL for IPv4, IPV6_MULTICAST_HOPS for IPv6).
 * Out-of-range values are rejected with UV_EINVAL by
 * uv__setsockopt_maybe_char on the fallback path.
 */
int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  /* This branch is only taken for IPv6 handles, so the IPv4 option value
   * passed here is never used by uv__setsockopt. */
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_TTL,
                          IPV6_MULTICAST_HOPS,
                          &ttl,
                          sizeof(ttl));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}
1185 
1186 
/* Enable or disable local loopback of outgoing multicast packets
 * (IP_MULTICAST_LOOP for IPv4, IPV6_MULTICAST_LOOP for IPv6).
 */
int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  /* This branch is only taken for IPv6 handles, so the IPv4 option value
   * passed here is never used by uv__setsockopt. */
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
#endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
    defined(__MVS__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}
1210 
1211 int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
1212   struct sockaddr_storage addr_st;
1213   struct sockaddr_in* addr4;
1214   struct sockaddr_in6* addr6;
1215 
1216   addr4 = (struct sockaddr_in*) &addr_st;
1217   addr6 = (struct sockaddr_in6*) &addr_st;
1218 
1219   if (!interface_addr) {
1220     memset(&addr_st, 0, sizeof addr_st);
1221     if (handle->flags & UV_HANDLE_IPV6) {
1222       addr_st.ss_family = AF_INET6;
1223       addr6->sin6_scope_id = 0;
1224     } else {
1225       addr_st.ss_family = AF_INET;
1226       addr4->sin_addr.s_addr = htonl(INADDR_ANY);
1227     }
1228   } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
1229     /* nothing, address was parsed */
1230   } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
1231     /* nothing, address was parsed */
1232   } else {
1233     return UV_EINVAL;
1234   }
1235 
1236   if (addr_st.ss_family == AF_INET) {
1237     if (setsockopt(handle->io_watcher.fd,
1238                    IPPROTO_IP,
1239                    IP_MULTICAST_IF,
1240                    (void*) &addr4->sin_addr,
1241                    sizeof(addr4->sin_addr)) == -1) {
1242       return UV__ERR(errno);
1243     }
1244   } else if (addr_st.ss_family == AF_INET6) {
1245     if (setsockopt(handle->io_watcher.fd,
1246                    IPPROTO_IPV6,
1247                    IPV6_MULTICAST_IF,
1248                    &addr6->sin6_scope_id,
1249                    sizeof(addr6->sin6_scope_id)) == -1) {
1250       return UV__ERR(errno);
1251     }
1252   } else {
1253     assert(0 && "unexpected address family");
1254     abort();
1255   }
1256 
1257   return 0;
1258 }
1259 
1260 int uv_udp_getpeername(const uv_udp_t* handle,
1261                        struct sockaddr* name,
1262                        int* namelen) {
1263 
1264   return uv__getsockpeername((const uv_handle_t*) handle,
1265                              getpeername,
1266                              name,
1267                              namelen);
1268 }
1269 
1270 int uv_udp_getsockname(const uv_udp_t* handle,
1271                        struct sockaddr* name,
1272                        int* namelen) {
1273 
1274   return uv__getsockpeername((const uv_handle_t*) handle,
1275                              getsockname,
1276                              name,
1277                              namelen);
1278 }
1279 
1280 
1281 int uv__udp_recv_start(uv_udp_t* handle,
1282                        uv_alloc_cb alloc_cb,
1283                        uv_udp_recv_cb recv_cb) {
1284   int err;
1285 
1286   if (alloc_cb == NULL || recv_cb == NULL)
1287     return UV_EINVAL;
1288 
1289   if (uv__io_active(&handle->io_watcher, POLLIN))
1290     return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */
1291 
1292   err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
1293   if (err)
1294     return err;
1295 
1296   handle->alloc_cb = alloc_cb;
1297   handle->recv_cb = recv_cb;
1298 
1299   uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
1300   uv__handle_start(handle);
1301 
1302   return 0;
1303 }
1304 
1305 
1306 int uv__udp_recv_stop(uv_udp_t* handle) {
1307   uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);
1308 
1309   if (!uv__io_active(&handle->io_watcher, POLLOUT))
1310     uv__handle_stop(handle);
1311 
1312   handle->alloc_cb = NULL;
1313   handle->recv_cb = NULL;
1314 
1315   return 0;
1316 }
1317