/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;  /* Stream this select() helper serves. */
  uv_thread_t thread;   /* Thread running uv__stream_osx_select(). */
  uv_sem_t close_sem;   /* Posted to tell the thread to terminate. */
  uv_sem_t async_sem;   /* Thread waits on this until the loop has
                         * processed the reported events. */
  uv_async_t async;     /* Wakes the event loop from the thread. */
  int events;           /* Pending POLLIN/POLLOUT flags. */
  int fake_fd;          /* Loop-side end of the signaling socketpair;
                         * also substituted for the watched fd. */
  int int_fd;           /* Thread-side end, monitored by select(). */
  int fd;               /* The real fd that kqueue cannot handle. */
  fd_set* sread;        /* Read set, sized for max_fd (sread_sz bytes). */
  size_t sread_sz;
  fd_set* swrite;       /* Write set, same sizing. */
  size_t swrite_sz;
};
#endif /* defined(__APPLE__) */
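
/* How the pieces above fit together (illustrative summary, derived from the
 * functions below):
 *
 *   select() thread                         event loop
 *   ---------------                         ----------
 *   select(fd, int_fd) reports events  -->  uv_async_send(&s->async)
 *   uv_sem_wait(&s->async_sem)         <--  uv__stream_osx_select_cb() runs
 *                                           the I/O callbacks, then posts
 *                                           async_sem
 *
 * The loop interrupts a pending select() by writing one byte to fake_fd
 * (see uv__stream_osx_interrupt_select()), which makes int_fd readable;
 * the thread drains it and re-evaluates its fd sets.
 */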

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
static void uv__drain(uv_stream_t* stream);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are the two ends of a socketpair(), so writing
   * to one end raises a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}


#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  for (;;) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      for (;;) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to do it here, otherwise `select()` might be called
   * before the actual `uv__read()`, leading to the blocking syscall
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on OS X,
   * so run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;

  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);
  uv__drain(stream);

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}


#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(); stop polling until then. */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}
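
/* Example (illustrative sketch, not part of libuv): a typical
 * uv_connection_cb that accepts the pending connection.  The names
 * `on_connection`, `on_close`, `alloc_cb` and `on_read` are hypothetical.
 *
 *   static void on_close(uv_handle_t* handle) { free(handle); }
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client;
 *
 *     if (status < 0)
 *       return;  // error on the listen socket, nothing to accept
 *
 *     client = malloc(sizeof(*client));
 *     if (client == NULL)
 *       return;
 *     uv_tcp_init(server->loop, client);
 *
 *     if (uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, alloc_cb, on_read);
 *     else
 *       uv_close((uv_handle_t*) client, on_close);
 *   }
 */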


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  if (uv__is_closing(stream)) {
    return UV_EINVAL;
  }

  switch (stream->type) {
  case UV_TCP:
    err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}
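
/* Example (illustrative sketch): binding and listening on a TCP socket,
 * reusing the hypothetical on_connection callback sketched above.
 *
 *   uv_tcp_t server;
 *   struct sockaddr_in addr;
 *   int err;
 *
 *   uv_tcp_init(uv_default_loop(), &server);
 *   uv_ip4_addr("0.0.0.0", 7000, &addr);
 *   uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
 *
 *   err = uv_listen((uv_stream_t*) &server, SOMAXCONN, on_connection);
 *   if (err)
 *     fprintf(stderr, "listen error: %s\n", uv_strerror(err));
 */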


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  if (!(stream->flags & UV_HANDLE_CLOSING)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (!(stream->flags & UV_HANDLE_SHUTTING))
    return;

  req = stream->shutdown_req;
  assert(req);

  if ((stream->flags & UV_HANDLE_CLOSING) ||
      !(stream->flags & UV_HANDLE_SHUT)) {
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (stream->flags & UV_HANDLE_CLOSING)
      /* The user destroyed the stream before we got to do the shutdown. */
      err = UV_ECANCELED;
    else if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);
    else /* Success. */
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}
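
/* Worked example (illustrative): suppose a request holds two buffers of
 * 10 and 20 bytes and write_index is 0.  A partial write of n == 15
 * consumes all of bufs[0] (write_index advances past it) and 5 bytes of
 * bufs[1], leaving bufs[1] with base advanced by 5 and len == 15.  The
 * function returns 0 because write_index (1) != nbufs (2); a later write
 * of n == 15 drains bufs[1] and returns 1.
 */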


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static int uv__try_write(uv_stream_t* stream,
                         const uv_buf_t bufs[],
                         unsigned int nbufs,
                         uv_stream_t* send_handle) {
  struct iovec* iov;
  int iovmax;
  int iovcnt;
  ssize_t n;

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  iov = (struct iovec*) bufs;
  iovcnt = nbufs;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (send_handle != NULL) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(send_handle))
      return UV_EBADF;

    fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && errno == EINTR);
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && errno == EINTR);
  }

  if (n >= 0)
    return n;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;

#ifdef __APPLE__
  /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
   * have a bug where a race condition causes the kernel to return EPROTOTYPE
   * because the socket isn't fully constructed. It's probably the result of
   * the peer closing the connection and that is why libuv translates it to
   * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
   * away but some VPN software causes the same behavior except the error is
   * permanent, not transient, turning the retry mechanism into an infinite
   * loop. See https://github.com/libuv/libuv/pull/482.
   */
  if (errno == EPROTOTYPE)
    return UV_ECONNRESET;
#endif /* __APPLE__ */

  return UV__ERR(errno);
}

static void uv__write(uv_stream_t* stream) {
  QUEUE* q;
  uv_write_t* req;
  ssize_t n;

  assert(uv__stream_fd(stream) >= 0);

  for (;;) {
    if (QUEUE_EMPTY(&stream->write_queue))
      return;

    q = QUEUE_HEAD(&stream->write_queue);
    req = QUEUE_DATA(q, uv_write_t, queue);
    assert(req->handle == stream);

    n = uv__try_write(stream,
                      &(req->bufs[req->write_index]),
                      req->nbufs - req->write_index,
                      req->send_handle);

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0) {
      req->send_handle = NULL;
      if (uv__write_req_update(stream, req, n)) {
        uv__write_req_finish(req);
        return;  /* TODO(bnoordhuis) Start trying to write the next request. */
      }
    } else if (n != UV_EAGAIN)
      break;

    /* If this is a blocking stream, try again. */
    if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
      continue;

    /* We're not done. */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

    /* Notify select() thread about state change */
    uv__stream_osx_interrupt_select(stream);

    return;
  }

  req->error = n;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    /* The struct declaration already reserves one fd slot, hence the
     * `queue_size - 1` in the allocation size.
     */
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                             sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal, the sockets will be closed in uv__stream_close().
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}


#if defined(__PASE__)
/* On IBMi PASE the control message length cannot exceed 256. */
# define UV__CMSG_FD_COUNT 60
#else
# define UV__CMSG_FD_COUNT 64
#endif
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
              cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}


#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer; there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request. The `shutdown(2)` call will always be deferred until
   * `uv__drain`, just before the callback is run. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;
  stream->flags &= ~UV_HANDLE_WRITABLE;

  if (QUEUE_EMPTY(&stream->write_queue))
    uv__io_feed(stream->loop, &stream->io_watcher);

  return 0;
}
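
/* Example (illustrative sketch): shutting down the write side once all
 * queued writes have drained, then closing.  `on_shutdown` is a
 * hypothetical callback name.
 *
 *   static void on_shutdown(uv_shutdown_t* req, int status) {
 *     uv_close((uv_handle_t*) req->handle, NULL);
 *     free(req);
 *   }
 *
 *   uv_shutdown_t* req = malloc(sizeof(*req));
 *   int err = uv_shutdown(req, stream, on_shutdown);
 *   if (err)
 *     free(req);  // e.g. UV_ENOTCONN; the callback will never run
 */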


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded, we must
 * call getsockopt(2).
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over differences between Unixes, errors that were reported
     * synchronously on the first connect can be delayed until the next
     * tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


static int uv__check_before_write(uv_stream_t* stream,
                                  unsigned int nbufs,
                                  uv_stream_t* send_handle) {
  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle != NULL) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  return 0;
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;
  int err;

  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}
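
/* Example (illustrative sketch): sending a TCP handle to another process
 * over an IPC pipe, i.e. a uv_pipe_t created with
 * uv_pipe_init(loop, &ipc_pipe, 1).  `ipc_pipe`, `tcp_handle` and
 * `on_write` are hypothetical names; note that at least one byte of
 * normal data must accompany the handle (nbufs > 0 is asserted).
 *
 *   uv_write_t* req = malloc(sizeof(*req));
 *   uv_buf_t buf = uv_buf_init(".", 1);
 *
 *   int err = uv_write2(req, (uv_stream_t*) &ipc_pipe, &buf, 1,
 *                       (uv_stream_t*) &tcp_handle, on_write);
 *   if (err)
 *     free(req);
 */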


/* The memory the buffers point to must remain valid until the callback is
 * called. This is not required for the uv_buf_t array itself, which is
 * copied.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
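
/* Example (illustrative sketch): one common way to honor the lifetime rule
 * above is to embed the data in the request so both are freed together.
 * `write_ctx_t` and `on_write` are hypothetical names.
 *
 *   typedef struct {
 *     uv_write_t req;   // must be the first member, see on_write()
 *     char data[64];
 *   } write_ctx_t;
 *
 *   static void on_write(uv_write_t* req, int status) {
 *     free(req);  // frees the whole write_ctx_t
 *   }
 *
 *   write_ctx_t* ctx = malloc(sizeof(*ctx));
 *   int n = snprintf(ctx->data, sizeof(ctx->data), "hello\n");
 *   uv_buf_t buf = uv_buf_init(ctx->data, n);
 *   uv_write(&ctx->req, stream, &buf, 1, on_write);
 */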


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  return uv_try_write2(stream, bufs, nbufs, NULL);
}


int uv_try_write2(uv_stream_t* stream,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_stream_t* send_handle) {
  int err;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  err = uv__check_before_write(stream, nbufs, NULL);
  if (err < 0)
    return err;

  return uv__try_write(stream, bufs, nbufs, send_handle);
}
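
/* Example (illustrative sketch): opportunistic write with a queued
 * fallback.  uv_try_write() returns the number of bytes written or
 * UV_EAGAIN when the call would block.  `req` and `on_write` are
 * hypothetical names.
 *
 *   int n = uv_try_write(stream, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     n = 0;                  // nothing written; queue everything
 *   else if (n < 0)
 *     return n;               // hard error
 *
 *   if ((size_t) n < buf.len) {
 *     uv_buf_t rest = uv_buf_init(buf.base + n, buf.len - n);
 *     uv_write(req, stream, &rest, 1, on_write);
 *   }
 */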


int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the state of the stream -
   * it just expresses the desired state of the user. */
  stream->flags |= UV_HANDLE_READING;
  stream->flags &= ~UV_HANDLE_READ_EOF;

  /* TODO: try to do the read inline? */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}
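
/* Example (illustrative sketch): typical callbacks for the public
 * uv_read_start() wrapper, which funnels into uv__read_start().  The
 * names `alloc_cb` and `on_read` are hypothetical.
 *
 *   static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
 *     buf->base = malloc(size);
 *     buf->len = buf->base == NULL ? 0 : size;  // len 0 reports UV_ENOBUFS
 *   }
 *
 *   static void on_read(uv_stream_t* s, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread > 0)
 *       ;  // consume nread bytes starting at buf->base
 *     else if (nread < 0)
 *       uv_close((uv_handle_t*) s, NULL);  // UV_EOF or a real error
 *     free(buf->base);
 *   }
 *
 *   uv_read_start(stream, alloc_cb, on_read);
 */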


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}