/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};
#endif /* defined(__APPLE__) */

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
static void uv__drain(uv_stream_t* stream);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt select() loop
   * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
   * raises a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else  /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif  /* !defined(__APPLE__) */
}
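

/* The interrupt mechanism above is the classic "self-pipe" idiom built on a
 * socketpair. Below is a minimal stand-alone sketch of the same idiom; it is
 * not compiled, and all names are illustrative rather than part of libuv:
 */
#if 0
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

static int wakeup_fds[2];  /* from socketpair(AF_UNIX, SOCK_STREAM, 0, ...) */

static void wait_for_wakeup(void) {
  fd_set rfds;
  char buf[32];

  FD_ZERO(&rfds);
  FD_SET(wakeup_fds[1], &rfds);
  select(wakeup_fds[1] + 1, &rfds, NULL, NULL, NULL);  /* blocks */
  read(wakeup_fds[1], buf, sizeof(buf));  /* consume the wakeup byte(s) */
}

static void wake(void) {
  write(wakeup_fds[0], "x", 1);  /* makes wakeup_fds[1] readable */
}
#endif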


#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  for (;;) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      for (;;) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to do it here, otherwise `select()` might be called
   * before the actual `uv__read()`, leading to the blocking syscall
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from /dev mount on osx.
   * select(2) in separate thread for those fds
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */
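

/* A sketch of how a caller is expected to use the probe above (the TTY and
 * pipe initializers on macOS do something along these lines; this is
 * schematic, not a verbatim call site). On success the caller's fd may be
 * swapped for s->fake_fd, so the event loop polls the socketpair end while
 * the dedicated thread select()s on the real device fd:
 */
#if 0
int fd;   /* e.g. a /dev tty fd that kqueue rejects with EINVAL */
int err;

err = uv__stream_try_select((uv_stream_t*) handle, &fd);
if (err != 0)
  return err;
/* from here on, fd may refer to the fake (socketpair) fd */
#endif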


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;
  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);
  uv__drain(stream);

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}
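

/* The reserve-fd pattern used by uv__emfile_trick(), reduced to its essence
 * as a stand-alone sketch (not compiled; error handling trimmed, names
 * illustrative):
 */
#if 0
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

static int reserve_fd = -1;  /* stashed at startup: open("/dev/null", ...) */

static void shed_connections_on_emfile(int listen_fd) {
  int fd;

  close(reserve_fd);  /* frees one slot below RLIMIT_NOFILE */
  reserve_fd = -1;

  /* accept() now succeeds again; close each connection immediately to tell
   * clients we are overloaded */
  while ((fd = accept(listen_fd, NULL, NULL)) >= 0)
    close(fd);

  reserve_fd = open("/", O_RDONLY | O_CLOEXEC);  /* re-arm the trick */
}
#endif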


#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(). */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;
  if (uv__is_closing(stream)) {
    return UV_EINVAL;
  }
  switch (stream->type) {
    case UV_TCP:
      err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;

    case UV_NAMED_PIPE:
      err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;

    default:
      err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  if (!(stream->flags & UV_HANDLE_CLOSING)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (!(stream->flags & UV_HANDLE_SHUTTING))
    return;

  req = stream->shutdown_req;
  assert(req);

  if ((stream->flags & UV_HANDLE_CLOSING) ||
      !(stream->flags & UV_HANDLE_SHUT)) {
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (stream->flags & UV_HANDLE_CLOSING)
      /* The user destroyed the stream before we got to do the shutdown. */
      err = UV_ECANCELED;
    else if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);
    else /* Success. */
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}
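

/* For orientation, this is how uv__server_io()/uv_accept() surface in the
 * public API: uv_listen() registers connection_cb, and the callback adopts
 * the pending fd with uv_accept(). A minimal sketch (error handling
 * trimmed):
 */
#if 0
#include <stdlib.h>
#include "uv.h"

static void on_connection(uv_stream_t* server, int status) {
  uv_tcp_t* client;

  if (status < 0)
    return;  /* e.g. the error propagated from uv__emfile_trick() */

  client = malloc(sizeof(*client));
  uv_tcp_init(server->loop, client);

  if (uv_accept(server, (uv_stream_t*) client) == 0)
    ;  /* start reading with uv_read_start(), etc. */
  else
    uv_close((uv_handle_t*) client, NULL);
}

/* ... uv_listen((uv_stream_t*) &server, SOMAXCONN, on_connection); ... */
#endif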


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}
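
/* Worked example of the buffer-advance logic in uv__write_req_update():
 * with req->bufs = { {len 4}, {len 6} }, write_index = 0 and a partial
 * write of n = 7 bytes,
 *
 *   pass 1: len = min(7, 4) = 4 -> buf[0] empty, advance to buf[1], n = 3
 *   pass 2: len = min(3, 6) = 3 -> buf[1] keeps 3 bytes pending,    n = 0
 *
 * write_index becomes 1 and 1 != nbufs (2), so the function returns 0: the
 * request stays at the head of the queue and POLLOUT remains armed.
 */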

static int uv__try_write(uv_stream_t* stream,
                         const uv_buf_t bufs[],
                         unsigned int nbufs,
                         uv_stream_t* send_handle) {
  struct iovec* iov;
  int iovmax;
  int iovcnt;
  ssize_t n;

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  iov = (struct iovec*) bufs;
  iovcnt = nbufs;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */
  if (send_handle != NULL) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(send_handle))
      return UV_EBADF;

    fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && errno == EINTR);
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && errno == EINTR);
  }

  if (n >= 0)
    return n;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;

#ifdef __APPLE__
  /* macOS versions 10.10 and 10.15 - and presumably 10.11 to 10.14, too -
   * have a bug where a race condition causes the kernel to return EPROTOTYPE
   * because the socket isn't fully constructed. It's probably the result of
   * the peer closing the connection and that is why libuv translates it to
   * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
   * away but some VPN software causes the same behavior except the error is
   * permanent, not transient, turning the retry mechanism into an infinite
   * loop. See https://github.com/libuv/libuv/pull/482.
   */
  if (errno == EPROTOTYPE)
    return UV_ECONNRESET;
#endif /* __APPLE__ */

  return UV__ERR(errno);
}
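
/* The SCM_RIGHTS branch above is what uv_write2() with a non-NULL
 * send_handle boils down to. From the public API, passing a TCP handle to
 * another process over an ipc pipe looks roughly like this (sketch only;
 * on_write_cb is a hypothetical callback, error handling trimmed):
 */
#if 0
#include <stdlib.h>
#include "uv.h"

static void send_client(uv_pipe_t* ipc_pipe, uv_tcp_t* client) {
  uv_write_t* req;
  static char dummy = 'x';
  uv_buf_t buf;

  buf = uv_buf_init(&dummy, 1);  /* the fd rides along with regular data */
  req = malloc(sizeof(*req));
  uv_write2(req, (uv_stream_t*) ipc_pipe, &buf, 1,
            (uv_stream_t*) client, on_write_cb);
}
#endif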

static void uv__write(uv_stream_t* stream) {
  QUEUE* q;
  uv_write_t* req;
  ssize_t n;

  assert(uv__stream_fd(stream) >= 0);

  for (;;) {
    if (QUEUE_EMPTY(&stream->write_queue))
      return;

    q = QUEUE_HEAD(&stream->write_queue);
    req = QUEUE_DATA(q, uv_write_t, queue);
    assert(req->handle == stream);

    n = uv__try_write(stream,
                      &(req->bufs[req->write_index]),
                      req->nbufs - req->write_index,
                      req->send_handle);

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0) {
      req->send_handle = NULL;
      if (uv__write_req_update(stream, req, n)) {
        uv__write_req_finish(req);
        return;  /* TODO(bnoordhuis) Start trying to write the next request. */
      }
    } else if (n != UV_EAGAIN)
      break;

    /* If this is a blocking stream, try again. */
    if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
      continue;

    /* We're not done. */
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

    /* Notify select() thread about state change */
    uv__stream_osx_interrupt_select(stream);

    return;
  }

  req->error = n;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                              sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}
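
/* The allocation size in uv__stream_queue_fd() is
 * "(queue_size - 1) * sizeof(fd) + sizeof(*queued_fds)" because
 * uv__stream_queued_fds_t ends in a one-element array, so the struct size
 * already pays for fds[0]. For queue_size = 8 and 4-byte ints that is the
 * struct itself plus 7 * 4 extra bytes, giving room for fds[0]..fds[7].
 */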

#if defined(__PASE__)
/* On IBM i PASE the control message length cannot exceed 256. */
# define UV__CMSG_FD_COUNT 60
#else
# define UV__CMSG_FD_COUNT 64
#endif
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
              cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}
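
/* The fds unpacked by uv__stream_recv_cmsg() surface in the public API as
 * "pending" handles on an ipc pipe. A minimal sketch of the receiving read
 * callback (error handling trimmed):
 */
#if 0
#include <stdlib.h>
#include "uv.h"

static void on_ipc_read(uv_stream_t* handle, ssize_t nread,
                        const uv_buf_t* buf) {
  uv_pipe_t* pipe = (uv_pipe_t*) handle;
  uv_tcp_t* client;

  while (uv_pipe_pending_count(pipe) > 0) {
    if (uv_pipe_pending_type(pipe) != UV_TCP)
      break;
    client = malloc(sizeof(*client));
    uv_tcp_init(handle->loop, client);
    uv_accept(handle, (uv_stream_t*) client);  /* adopts a received fd */
  }

  free(buf->base);
}
#endif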

#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request. The `shutdown(2)` call will always be deferred until
   * `uv__drain`, just before the callback is run. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;
  stream->flags &= ~UV_HANDLE_WRITABLE;

  if (QUEUE_EMPTY(&stream->write_queue))
    uv__io_feed(stream->loop, &stream->io_watcher);

  return 0;
}


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded we must
 * call getsockopt().
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}
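

/* The SO_ERROR check in uv__stream_connect() is the standard completion test
 * for a non-blocking connect(2): once the socket polls writable, SO_ERROR
 * holds 0 on success or the pending errno on failure. In isolation
 * (stand-alone sketch, not compiled):
 */
#if 0
#include <sys/socket.h>

static int connect_result(int fd) {
  int error = 0;
  socklen_t len = sizeof(error);

  getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
  return error;  /* 0 on success, otherwise the errno of the failed connect */
}
#endif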


static int uv__check_before_write(uv_stream_t* stream,
                                  unsigned int nbufs,
                                  uv_stream_t* send_handle) {
  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle != NULL) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  return 0;
}

int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;
  int err;

  err = uv__check_before_write(stream, nbufs, send_handle);
  if (err < 0)
    return err;

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}
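

/* Typical usage of the queueing logic above, from the caller's side.
 * uv_write2() copies the uv_buf_t descriptors into req->bufs, but not the
 * bytes they point to, so the payload must stay alive until the write
 * callback runs. A minimal sketch (error handling trimmed):
 */
#if 0
#include <stdlib.h>
#include "uv.h"

static void on_write(uv_write_t* req, int status) {
  free(req);  /* payload and req may be released here, not before */
}

static void start_write(uv_stream_t* stream, char* data, unsigned int len) {
  uv_write_t* req;
  uv_buf_t buf;

  req = malloc(sizeof(*req));
  buf = uv_buf_init(data, len);  /* data must outlive the request */
  uv_write(req, stream, &buf, 1, on_write);
}
#endif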


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  return uv_try_write2(stream, bufs, nbufs, NULL);
}


int uv_try_write2(uv_stream_t* stream,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_stream_t* send_handle) {
  int err;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  err = uv__check_before_write(stream, nbufs, NULL);
  if (err < 0)
    return err;

  return uv__try_write(stream, bufs, nbufs, send_handle);
}


int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the state of the stream -
   * it just expresses the desired state of the user. */
  stream->flags |= UV_HANDLE_READING;
  stream->flags &= ~UV_HANDLE_READ_EOF;

  /* TODO: try to do the read inline? */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}
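

/* A minimal read loop against uv__read_start()/uv_read_stop(). The alloc_cb
 * owns the buffer strategy; handing back a NULL or zero-length buffer makes
 * uv__read() report UV_ENOBUFS instead of reading. Sketch only, error
 * handling trimmed:
 */
#if 0
#include <stdlib.h>
#include "uv.h"

static void on_alloc(uv_handle_t* handle, size_t suggested, uv_buf_t* buf) {
  buf->base = malloc(suggested);  /* uv__read() suggests 64 kB */
  buf->len = buf->base == NULL ? 0 : suggested;
}

static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
  if (nread < 0)       /* UV_EOF or an error: stop reading, close later */
    uv_read_stop(stream);
  else if (nread > 0)
    ;  /* consume nread bytes starting at buf->base */

  free(buf->base);     /* nread == 0 just means "no data this round" */
}

/* uv_read_start(stream, on_alloc, on_read); */
#endif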


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}