/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};

/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
 * EPROTOTYPE can be returned while trying to write to a socket that is
 * shutting down. If we retry the write, we should get the expected EPIPE
 * instead.
 */
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
     (errno == EMSGSIZE && send_handle != NULL))
#else
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */
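
/* Both macros are consumed by retry loops of the shape below (a sketch of
 * the pattern used in uv__write() further down, not additional code):
 *
 *   do
 *     n = write(fd, buf, len);
 *   while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
 *
 *   if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
 *     err = UV__ERR(errno);
 *     goto error;
 *   }
 *
 * Transient errors leave the request queued for a later POLLOUT event.
 */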

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/" instead. */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt select() loop
   * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
   * emit read event on other side
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}


#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  while (1) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      while (1) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}
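

/* Hand-off protocol between the select() thread above and the event loop
 * (a summary of existing behavior, not new code): the select() thread
 * publishes the observed events in s->events, wakes the loop with
 * uv_async_send() and then blocks on s->async_sem. The loop thread consumes
 * the events in uv__stream_osx_select_cb() below and posts s->async_sem only
 * after uv__stream_io() ran, so the next select() cannot fire on a fd whose
 * data has not been read yet.
 */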
static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to do it here, otherwise `select()` might be called
   * before the actual `uv__read()`, leading to the blocking syscall
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on OS X;
   * run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

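  /* Sizing arithmetic (worked example): the fd_sets need one bit per fd in
   * 0..max_fd. With NBBY == 8, ROUND_UP(max_fd + 1, 32) rounds the bit count
   * up to whole 32-bit words and the division converts bits to bytes, e.g.
   * max_fd == 40 gives ROUND_UP(41, 32) / 8 == 64 / 8 == 8 bytes per set.
   */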
  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO: use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;
  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);

  if (stream->shutdown_req) {
    /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
     * fait accompli at this point. Maybe we should revisit this in v0.11.
     * A possible reason for leaving it unchanged is that it informs the
     * callee that the handle has been destroyed.
     */
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
    stream->shutdown_req = NULL;
  }

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}


#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(); stop reading until they do. */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}
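

/* Typical call site, from a user's connection callback (a hypothetical
 * sketch; on_alloc, on_read and on_close are caller-provided callbacks):
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client;
 *
 *     if (status < 0)
 *       return;
 *
 *     client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *
 *     if (uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, on_alloc, on_read);
 *     else
 *       uv_close((uv_handle_t*) client, on_close);
 *   }
 */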


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  switch (stream->type) {
    case UV_TCP:
      err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;

    case UV_NAMED_PIPE:
      err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;

    default:
      err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}
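

/* Hypothetical call site (a sketch; loop, addr and on_connection are
 * caller-provided):
 *
 *   uv_tcp_t server;
 *   uv_tcp_init(loop, &server);
 *   uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
 *   uv_listen((uv_stream_t*) &server, SOMAXCONN, on_connection);
 */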


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  /* Shutdown? */
  if ((stream->flags & UV_HANDLE_SHUTTING) &&
      !(stream->flags & UV_HANDLE_CLOSING) &&
      !(stream->flags & UV_HANDLE_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);

    if (err == 0)
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}
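

/* Worked example for uv__write_req_update(): with req->bufs holding two
 * buffers of 10 and 20 bytes and n == 15, the loop consumes the whole first
 * buffer, then advances the second buffer's base by 5 and shrinks its len
 * to 15. write_index becomes 1 and the function returns 0 because the
 * request still has data left to write.
 */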


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:

  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))
    return;

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */

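  /* When there is a handle to send, the payload goes out via sendmsg() with
   * the handle's fd attached as SCM_RIGHTS ancillary data; the receiving end
   * recovers it in uv__stream_recv_cmsg(). Plain writes take the uv__writev()
   * branch below.
   */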
  if (req->send_handle) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;

    if (uv__is_closing(req->send_handle)) {
      err = UV_EBADF;
      goto error;
    }

    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0)
      req->send_handle = NULL;
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
  }

  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
    err = UV__ERR(errno);
    goto error;
  }

  if (n >= 0 && uv__write_req_update(stream, req, n)) {
    uv__write_req_finish(req);
    return;  /* TODO(bnoordhuis) Start trying to write the next request. */
  }

  /* If this is a blocking stream, try again. */
  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    goto start;

  /* We're not done. */
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

  /* Notify select() thread about state change */
  uv__stream_osx_interrupt_select(stream);

  return;

error:
  req->error = err;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage ss;
  socklen_t sslen;
  socklen_t len;
  int type;

  memset(&ss, 0, sizeof(ss));
  sslen = sizeof(ss);

  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
    return UV_UNKNOWN_HANDLE;

  len = sizeof type;

  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
    return UV_UNKNOWN_HANDLE;

  if (type == SOCK_STREAM) {
#if defined(_AIX) || defined(__DragonFly__)
    /* on AIX/DragonFly the getsockname call returns an empty sa structure
     * for sockets of type AF_UNIX. For all other types it will
     * return a properly filled in structure.
     */
    if (sslen == 0)
      return UV_NAMED_PIPE;
#endif
    switch (ss.ss_family) {
      case AF_UNIX:
        return UV_NAMED_PIPE;
      case AF_INET:
      case AF_INET6:
        return UV_TCP;
    }
  }

  if (type == SOCK_DGRAM &&
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
    return UV_UDP;

  return UV_UNKNOWN_HANDLE;
}
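

/* Illustrative outcomes for uv__handle_type() (informational): an fd from
 * socketpair(AF_UNIX, SOCK_STREAM, 0, fds) reports SOCK_STREAM/AF_UNIX and
 * maps to UV_NAMED_PIPE; a TCP socket reports SOCK_STREAM with AF_INET or
 * AF_INET6 and maps to UV_TCP; everything else falls through to
 * UV_UNKNOWN_HANDLE.
 */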


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                              sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}
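

/* A note on the allocation arithmetic above (assuming the one-element fds[]
 * array at the tail of uv__stream_queued_fds_t in internal.h): the struct
 * itself already has room for one fd, so queue_size fds need
 * (queue_size - 1) * sizeof(int) extra bytes. E.g. the initial
 * queue_size == 8 allocates sizeof(*queued_fds) + 7 * sizeof(int) bytes.
 */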


#if defined(__PASE__)
/* on IBMi PASE the control message length cannot exceed 256. */
# define UV__CMSG_FD_COUNT 60
#else
# define UV__CMSG_FD_COUNT 64
#endif
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
          cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}
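

/* The counting loop above effectively inverts CMSG_LEN(): for a control
 * message carrying two descriptors, cmsg_len == CMSG_LEN(2 * sizeof(int)),
 * so the loop stops at count == 2 and pi[0] / pi[1] hold the received fds.
 */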


#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;

  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}
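

/* Hypothetical usage (a sketch; on_shutdown is a caller-provided callback):
 *
 *   uv_shutdown_t* sreq = malloc(sizeof(*sreq));
 *   int err = uv_shutdown(sreq, (uv_stream_t*) &tcp, on_shutdown);
 *
 * uv_shutdown() returns UV_ENOTCONN when the stream is not writable or is
 * already shutting down; otherwise on_shutdown fires from uv__drain() once
 * the write queue empties.
 */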


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we errored out or succeeded, we must
 * call getsockopt().
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;

  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
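

/* Hypothetical usage (a sketch; on_write is a caller-provided callback).
 * Per the comment above, the bytes behind buf must stay valid until
 * on_write runs, but the uv_buf_t array itself may live on the stack:
 *
 *   static char data[] = "hello";
 *   uv_buf_t buf = uv_buf_init(data, sizeof(data) - 1);
 *   uv_write_t* wreq = malloc(sizeof(*wreq));
 *   uv_write(wreq, stream, &buf, 1, on_write);
 */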


void uv_try_write_cb(uv_write_t* req, int status) {
  /* Should not be called */
  abort();
}


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;

  /* Remove not written bytes from write queue size */
  written = uv__count_bufs(bufs, nbufs);
  if (req.bufs != NULL)
    req_size = uv__write_req_size(&req);
  else
    req_size = 0;
  written -= req_size;
  stream->write_queue_size -= req_size;

  /* Unqueue request, regardless of immediateness */
  QUEUE_REMOVE(&req.queue);
  uv__req_unregister(stream->loop, &req);
  if (req.bufs != req.bufsml)
    uv__free(req.bufs);
  req.bufs = NULL;

  /* Do not poll for writable if we weren't polling before this call */
  if (!has_pollout) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (written == 0 && req_size != 0)
    return req.error < 0 ? req.error : UV_EAGAIN;
  else
    return written;
}
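

/* uv_try_write() returns the number of bytes written synchronously, or
 * UV_EAGAIN when nothing could be written without blocking. A caller might
 * fall back to a queued write for the remainder (a sketch; wreq and
 * on_write are caller-provided):
 *
 *   int n = uv_try_write(stream, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     n = 0;
 *   if (n >= 0 && (size_t) n < buf.len) {
 *     uv_buf_t rest = uv_buf_init(buf.base + n, buf.len - n);
 *     uv_write(wreq, stream, &rest, 1, on_write);
 *   }
 */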


int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  if (stream->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;

  if (!(stream->flags & UV_HANDLE_READABLE))
    return UV_ENOTCONN;

  /* The UV_HANDLE_READING flag is independent of the state of the stream -
   * it just expresses the desired state of the user.
   */
  stream->flags |= UV_HANDLE_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten an EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}
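

/* Hypothetical callback pair for uv_read_start() (a sketch; consume() is a
 * caller-provided function):
 *
 *   static void on_alloc(uv_handle_t* h, size_t suggested, uv_buf_t* buf) {
 *     buf->base = malloc(suggested);
 *     buf->len = buf->base == NULL ? 0 : suggested;  // len 0 => UV_ENOBUFS
 *   }
 *
 *   static void on_read(uv_stream_t* s, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread > 0)
 *       consume(buf->base, nread);
 *     else if (nread < 0)  // UV_EOF or a read error
 *       uv_close((uv_handle_t*) s, NULL);
 *     free(buf->base);     // nread == 0 is not an error, just no data
 *   }
 */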


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* We don't need to check the file descriptor; uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}