1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include "uv.h" 23 #include "task.h" 24 25 #include <stdio.h> 26 #include <string.h> 27 28 static uv_pipe_t channel; 29 static uv_tcp_t tcp_server; 30 static uv_tcp_t tcp_server2; 31 static uv_tcp_t tcp_connection; 32 33 static int exit_cb_called; 34 static int read_cb_called; 35 static int tcp_write_cb_called; 36 static int tcp_read_cb_called; 37 static int on_pipe_read_called; 38 static int local_conn_accepted; 39 static int remote_conn_accepted; 40 static int tcp_server_listening; 41 static uv_write_t write_req; 42 static uv_write_t write_req2; 43 static uv_write_t conn_notify_req; 44 static int close_cb_called; 45 static int connection_accepted; 46 static int tcp_conn_read_cb_called; 47 static int tcp_conn_write_cb_called; 48 static int closed_handle_data_read; 49 static int closed_handle_write; 50 static int send_zero_write; 51 52 typedef struct { 53 uv_connect_t conn_req; 54 uv_write_t tcp_write_req; 55 uv_tcp_t conn; 56 } tcp_conn; 57 58 #define CONN_COUNT 100 59 #define BACKLOG 128 60 #define LARGE_SIZE 100000 61 62 static uv_buf_t large_buf; 63 static char buffer[LARGE_SIZE]; 64 static uv_write_t write_reqs[300]; 65 static int write_reqs_completed; 66 67 static unsigned int write_until_data_queued(void); 68 static void send_handle_and_close(void); 69 70 71 static void close_server_conn_cb(uv_handle_t* handle) { 72 free(handle); 73 } 74 75 76 static void on_connection(uv_stream_t* server, int status) { 77 uv_tcp_t* conn; 78 int r; 79 80 if (!local_conn_accepted) { 81 /* Accept the connection and close it. Also and close the server. */ 82 ASSERT_EQ(status, 0); 83 ASSERT_PTR_EQ(&tcp_server, server); 84 85 conn = malloc(sizeof(*conn)); 86 ASSERT_NOT_NULL(conn); 87 r = uv_tcp_init(server->loop, conn); 88 ASSERT_EQ(r, 0); 89 90 r = uv_accept(server, (uv_stream_t*)conn); 91 ASSERT_EQ(r, 0); 92 93 uv_close((uv_handle_t*)conn, close_server_conn_cb); 94 uv_close((uv_handle_t*)server, NULL); 95 local_conn_accepted = 1; 96 } 97 } 98 99 100 static void exit_cb(uv_process_t* process, 101 int64_t exit_status, 102 int term_signal) { 103 printf("exit_cb\n"); 104 exit_cb_called++; 105 ASSERT_EQ(exit_status, 0); 106 ASSERT_EQ(term_signal, 0); 107 uv_close((uv_handle_t*)process, NULL); 108 } 109 110 111 static void on_alloc(uv_handle_t* handle, 112 size_t suggested_size, 113 uv_buf_t* buf) { 114 buf->base = malloc(suggested_size); 115 buf->len = suggested_size; 116 } 117 118 119 static void close_client_conn_cb(uv_handle_t* handle) { 120 tcp_conn* p = (tcp_conn*)handle->data; 121 free(p); 122 } 123 124 125 static void connect_cb(uv_connect_t* req, int status) { 126 uv_close((uv_handle_t*)req->handle, close_client_conn_cb); 127 } 128 129 130 static void make_many_connections(void) { 131 tcp_conn* conn; 132 struct sockaddr_in addr; 133 int r, i; 134 135 for (i = 0; i < CONN_COUNT; i++) { 136 conn = malloc(sizeof(*conn)); 137 ASSERT_NOT_NULL(conn); 138 139 r = uv_tcp_init(uv_default_loop(), &conn->conn); 140 ASSERT_EQ(r, 0); 141 ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); 142 143 r = uv_tcp_connect(&conn->conn_req, 144 (uv_tcp_t*) &conn->conn, 145 (const struct sockaddr*) &addr, 146 connect_cb); 147 ASSERT_EQ(r, 0); 148 149 conn->conn.data = conn; 150 } 151 } 152 153 154 static void on_read(uv_stream_t* handle, 155 ssize_t nread, 156 const uv_buf_t* buf) { 157 int r; 158 uv_pipe_t* pipe; 159 uv_handle_type pending; 160 uv_buf_t outbuf; 161 162 pipe = (uv_pipe_t*) handle; 163 164 if (nread == 0) { 165 /* Everything OK, but nothing read. */ 166 free(buf->base); 167 return; 168 } 169 170 if (nread < 0) { 171 if (nread == UV_EOF) { 172 free(buf->base); 173 return; 174 } 175 176 printf("error recving on channel: %s\n", uv_strerror(nread)); 177 abort(); 178 } 179 180 fprintf(stderr, "got %d bytes\n", (int)nread); 181 182 pending = uv_pipe_pending_type(pipe); 183 if (!tcp_server_listening) { 184 ASSERT_EQ(1, uv_pipe_pending_count(pipe)); 185 ASSERT_GT(nread, 0); 186 ASSERT_NOT_NULL(buf->base); 187 ASSERT_NE(pending, UV_UNKNOWN_HANDLE); 188 read_cb_called++; 189 190 /* Accept the pending TCP server, and start listening on it. */ 191 ASSERT_EQ(pending, UV_TCP); 192 r = uv_tcp_init(uv_default_loop(), &tcp_server); 193 ASSERT_EQ(r, 0); 194 195 r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); 196 ASSERT_EQ(r, 0); 197 198 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection); 199 ASSERT_EQ(r, 0); 200 201 tcp_server_listening = 1; 202 203 /* Make sure that the expected data is correctly multiplexed. */ 204 ASSERT_MEM_EQ("hello\n", buf->base, nread); 205 206 outbuf = uv_buf_init("world\n", 6); 207 r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); 208 ASSERT_EQ(r, 0); 209 210 /* Create a bunch of connections to get both servers to accept. */ 211 make_many_connections(); 212 } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) { 213 /* Remote server has accepted a connection. Close the channel. */ 214 ASSERT_EQ(0, uv_pipe_pending_count(pipe)); 215 ASSERT_EQ(pending, UV_UNKNOWN_HANDLE); 216 remote_conn_accepted = 1; 217 uv_close((uv_handle_t*)&channel, NULL); 218 } 219 220 free(buf->base); 221 } 222 223 #ifdef _WIN32 224 static void on_read_listen_after_bound_twice(uv_stream_t* handle, 225 ssize_t nread, 226 const uv_buf_t* buf) { 227 int r; 228 uv_pipe_t* pipe; 229 uv_handle_type pending; 230 231 pipe = (uv_pipe_t*) handle; 232 233 if (nread == 0) { 234 /* Everything OK, but nothing read. */ 235 free(buf->base); 236 return; 237 } 238 239 if (nread < 0) { 240 if (nread == UV_EOF) { 241 free(buf->base); 242 return; 243 } 244 245 printf("error recving on channel: %s\n", uv_strerror(nread)); 246 abort(); 247 } 248 249 fprintf(stderr, "got %d bytes\n", (int)nread); 250 251 ASSERT_GT(uv_pipe_pending_count(pipe), 0); 252 pending = uv_pipe_pending_type(pipe); 253 ASSERT_GT(nread, 0); 254 ASSERT_NOT_NULL(buf->base); 255 ASSERT_NE(pending, UV_UNKNOWN_HANDLE); 256 read_cb_called++; 257 258 if (read_cb_called == 1) { 259 /* Accept the first TCP server, and start listening on it. */ 260 ASSERT_EQ(pending, UV_TCP); 261 r = uv_tcp_init(uv_default_loop(), &tcp_server); 262 ASSERT_EQ(r, 0); 263 264 r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); 265 ASSERT_EQ(r, 0); 266 267 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection); 268 ASSERT_EQ(r, 0); 269 } else if (read_cb_called == 2) { 270 /* Accept the second TCP server, and start listening on it. */ 271 ASSERT_EQ(pending, UV_TCP); 272 r = uv_tcp_init(uv_default_loop(), &tcp_server2); 273 ASSERT_EQ(r, 0); 274 275 r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2); 276 ASSERT_EQ(r, 0); 277 278 r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection); 279 ASSERT_EQ(r, UV_EADDRINUSE); 280 281 uv_close((uv_handle_t*)&tcp_server, NULL); 282 uv_close((uv_handle_t*)&tcp_server2, NULL); 283 ASSERT_EQ(0, uv_pipe_pending_count(pipe)); 284 uv_close((uv_handle_t*)&channel, NULL); 285 } 286 287 free(buf->base); 288 } 289 #endif 290 291 void spawn_helper(uv_pipe_t* channel, 292 uv_process_t* process, 293 const char* helper) { 294 uv_process_options_t options; 295 size_t exepath_size; 296 char exepath[1024]; 297 char* args[3]; 298 int r; 299 uv_stdio_container_t stdio[3]; 300 301 r = uv_pipe_init(uv_default_loop(), channel, 1); 302 ASSERT_EQ(r, 0); 303 ASSERT_NE(channel->ipc, 0); 304 305 exepath_size = sizeof(exepath); 306 r = uv_exepath(exepath, &exepath_size); 307 ASSERT_EQ(r, 0); 308 309 exepath[exepath_size] = '\0'; 310 args[0] = exepath; 311 args[1] = (char*)helper; 312 args[2] = NULL; 313 314 memset(&options, 0, sizeof(options)); 315 options.file = exepath; 316 options.args = args; 317 options.exit_cb = exit_cb; 318 options.stdio = stdio; 319 options.stdio_count = ARRAY_SIZE(stdio); 320 321 stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE; 322 stdio[0].data.stream = (uv_stream_t*) channel; 323 stdio[1].flags = UV_INHERIT_FD; 324 stdio[1].data.fd = 1; 325 stdio[2].flags = UV_INHERIT_FD; 326 stdio[2].data.fd = 2; 327 328 r = uv_spawn(uv_default_loop(), process, &options); 329 ASSERT_EQ(r, 0); 330 } 331 332 333 static void on_tcp_write(uv_write_t* req, int status) { 334 ASSERT_EQ(status, 0); 335 ASSERT_PTR_EQ(req->handle, &tcp_connection); 336 tcp_write_cb_called++; 337 } 338 339 340 static void on_read_alloc(uv_handle_t* handle, 341 size_t suggested_size, 342 uv_buf_t* buf) { 343 buf->base = malloc(suggested_size); 344 buf->len = suggested_size; 345 } 346 347 348 static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) { 349 ASSERT_GT(nread, 0); 350 ASSERT_MEM_EQ("hello again\n", buf->base, nread); 351 ASSERT_PTR_EQ(tcp, &tcp_connection); 352 free(buf->base); 353 354 tcp_read_cb_called++; 355 356 uv_close((uv_handle_t*)tcp, NULL); 357 uv_close((uv_handle_t*)&channel, NULL); 358 } 359 360 361 static void on_read_connection(uv_stream_t* handle, 362 ssize_t nread, 363 const uv_buf_t* buf) { 364 int r; 365 uv_buf_t outbuf; 366 uv_pipe_t* pipe; 367 uv_handle_type pending; 368 369 pipe = (uv_pipe_t*) handle; 370 if (nread == 0) { 371 /* Everything OK, but nothing read. */ 372 free(buf->base); 373 return; 374 } 375 376 if (nread < 0) { 377 if (nread == UV_EOF) { 378 free(buf->base); 379 return; 380 } 381 382 printf("error recving on channel: %s\n", uv_strerror(nread)); 383 abort(); 384 } 385 386 fprintf(stderr, "got %d bytes\n", (int)nread); 387 388 ASSERT_EQ(1, uv_pipe_pending_count(pipe)); 389 pending = uv_pipe_pending_type(pipe); 390 391 ASSERT_GT(nread, 0); 392 ASSERT_NOT_NULL(buf->base); 393 ASSERT_NE(pending, UV_UNKNOWN_HANDLE); 394 read_cb_called++; 395 396 /* Accept the pending TCP connection */ 397 ASSERT_EQ(pending, UV_TCP); 398 r = uv_tcp_init(uv_default_loop(), &tcp_connection); 399 ASSERT_EQ(r, 0); 400 401 r = uv_accept(handle, (uv_stream_t*)&tcp_connection); 402 ASSERT_EQ(r, 0); 403 404 /* Make sure that the expected data is correctly multiplexed. */ 405 ASSERT_MEM_EQ("hello\n", buf->base, nread); 406 407 /* Write/read to/from the connection */ 408 outbuf = uv_buf_init("world\n", 6); 409 r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1, 410 on_tcp_write); 411 ASSERT_EQ(r, 0); 412 413 r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read); 414 ASSERT_EQ(r, 0); 415 416 free(buf->base); 417 } 418 419 420 #ifndef _WIN32 421 static void on_read_closed_handle(uv_stream_t* handle, 422 ssize_t nread, 423 const uv_buf_t* buf) { 424 if (nread == 0 || nread == UV_EOF) { 425 free(buf->base); 426 return; 427 } 428 429 if (nread < 0) { 430 printf("error recving on channel: %s\n", uv_strerror(nread)); 431 abort(); 432 } 433 434 closed_handle_data_read += nread; 435 free(buf->base); 436 } 437 #endif 438 439 440 static void on_read_send_zero(uv_stream_t* handle, 441 ssize_t nread, 442 const uv_buf_t* buf) { 443 ASSERT(nread == 0 || nread == UV_EOF); 444 free(buf->base); 445 } 446 447 448 static int run_ipc_test(const char* helper, uv_read_cb read_cb) { 449 uv_process_t process; 450 int r; 451 452 spawn_helper(&channel, &process, helper); 453 uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb); 454 455 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); 456 ASSERT_EQ(r, 0); 457 458 MAKE_VALGRIND_HAPPY(); 459 return 0; 460 } 461 462 463 TEST_IMPL(ipc_listen_before_write) { 464 #if defined(NO_SEND_HANDLE_ON_PIPE) 465 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); 466 #endif 467 int r = run_ipc_test("ipc_helper_listen_before_write", on_read); 468 ASSERT_EQ(local_conn_accepted, 1); 469 ASSERT_EQ(remote_conn_accepted, 1); 470 ASSERT_EQ(read_cb_called, 1); 471 ASSERT_EQ(exit_cb_called, 1); 472 return r; 473 } 474 475 476 TEST_IMPL(ipc_listen_after_write) { 477 #if defined(NO_SEND_HANDLE_ON_PIPE) 478 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); 479 #endif 480 int r = run_ipc_test("ipc_helper_listen_after_write", on_read); 481 ASSERT_EQ(local_conn_accepted, 1); 482 ASSERT_EQ(remote_conn_accepted, 1); 483 ASSERT_EQ(read_cb_called, 1); 484 ASSERT_EQ(exit_cb_called, 1); 485 return r; 486 } 487 488 489 TEST_IMPL(ipc_tcp_connection) { 490 #if defined(NO_SEND_HANDLE_ON_PIPE) 491 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); 492 #endif 493 int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection); 494 ASSERT_EQ(read_cb_called, 1); 495 ASSERT_EQ(tcp_write_cb_called, 1); 496 ASSERT_EQ(tcp_read_cb_called, 1); 497 ASSERT_EQ(exit_cb_called, 1); 498 return r; 499 } 500 501 #ifndef _WIN32 502 TEST_IMPL(ipc_closed_handle) { 503 int r; 504 r = run_ipc_test("ipc_helper_closed_handle", on_read_closed_handle); 505 ASSERT_EQ(r, 0); 506 return 0; 507 } 508 #endif 509 510 511 #ifdef _WIN32 512 TEST_IMPL(listen_with_simultaneous_accepts) { 513 uv_tcp_t server; 514 int r; 515 struct sockaddr_in addr; 516 517 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); 518 519 r = uv_tcp_init(uv_default_loop(), &server); 520 ASSERT_EQ(r, 0); 521 522 r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0); 523 ASSERT_EQ(r, 0); 524 525 r = uv_tcp_simultaneous_accepts(&server, 1); 526 ASSERT_EQ(r, 0); 527 528 r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); 529 ASSERT_EQ(r, 0); 530 ASSERT_EQ(server.reqs_pending, 32); 531 532 MAKE_VALGRIND_HAPPY(); 533 return 0; 534 } 535 536 537 TEST_IMPL(listen_no_simultaneous_accepts) { 538 uv_tcp_t server; 539 int r; 540 struct sockaddr_in addr; 541 542 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); 543 544 r = uv_tcp_init(uv_default_loop(), &server); 545 ASSERT_EQ(r, 0); 546 547 r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0); 548 ASSERT_EQ(r, 0); 549 550 r = uv_tcp_simultaneous_accepts(&server, 0); 551 ASSERT_EQ(r, 0); 552 553 r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); 554 ASSERT_EQ(r, 0); 555 ASSERT_EQ(server.reqs_pending, 1); 556 557 MAKE_VALGRIND_HAPPY(); 558 return 0; 559 } 560 561 TEST_IMPL(ipc_listen_after_bind_twice) { 562 #if defined(NO_SEND_HANDLE_ON_PIPE) 563 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE); 564 #endif 565 int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice); 566 ASSERT_EQ(read_cb_called, 2); 567 ASSERT_EQ(exit_cb_called, 1); 568 return r; 569 } 570 #endif 571 572 TEST_IMPL(ipc_send_zero) { 573 int r; 574 r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero); 575 ASSERT_EQ(r, 0); 576 return 0; 577 } 578 579 580 /* Everything here runs in a child process. */ 581 582 static tcp_conn conn; 583 584 585 static void close_cb(uv_handle_t* handle) { 586 close_cb_called++; 587 } 588 589 590 static void conn_notify_write_cb(uv_write_t* req, int status) { 591 uv_close((uv_handle_t*)&tcp_server, close_cb); 592 uv_close((uv_handle_t*)&channel, close_cb); 593 } 594 595 596 static void tcp_connection_write_cb(uv_write_t* req, int status) { 597 ASSERT_PTR_EQ(&conn.conn, req->handle); 598 uv_close((uv_handle_t*)req->handle, close_cb); 599 uv_close((uv_handle_t*)&channel, close_cb); 600 uv_close((uv_handle_t*)&tcp_server, close_cb); 601 tcp_conn_write_cb_called++; 602 } 603 604 605 static void closed_handle_large_write_cb(uv_write_t* req, int status) { 606 ASSERT_EQ(status, 0); 607 ASSERT(closed_handle_data_read = LARGE_SIZE); 608 if (++write_reqs_completed == ARRAY_SIZE(write_reqs)) { 609 write_reqs_completed = 0; 610 if (write_until_data_queued() > 0) 611 send_handle_and_close(); 612 } 613 } 614 615 616 static void closed_handle_write_cb(uv_write_t* req, int status) { 617 ASSERT_EQ(status, UV_EBADF); 618 closed_handle_write = 1; 619 } 620 621 622 static void send_zero_write_cb(uv_write_t* req, int status) { 623 ASSERT_EQ(status, 0); 624 send_zero_write++; 625 } 626 627 static void on_tcp_child_process_read(uv_stream_t* tcp, 628 ssize_t nread, 629 const uv_buf_t* buf) { 630 uv_buf_t outbuf; 631 int r; 632 633 if (nread < 0) { 634 if (nread == UV_EOF) { 635 free(buf->base); 636 return; 637 } 638 639 printf("error recving on tcp connection: %s\n", uv_strerror(nread)); 640 abort(); 641 } 642 643 ASSERT_GT(nread, 0); 644 ASSERT_MEM_EQ("world\n", buf->base, nread); 645 on_pipe_read_called++; 646 free(buf->base); 647 648 /* Write to the socket */ 649 outbuf = uv_buf_init("hello again\n", 12); 650 r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb); 651 ASSERT_EQ(r, 0); 652 653 tcp_conn_read_cb_called++; 654 } 655 656 657 static void connect_child_process_cb(uv_connect_t* req, int status) { 658 int r; 659 660 ASSERT_EQ(status, 0); 661 r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read); 662 ASSERT_EQ(r, 0); 663 } 664 665 666 static void ipc_on_connection(uv_stream_t* server, int status) { 667 int r; 668 uv_buf_t buf; 669 670 if (!connection_accepted) { 671 /* 672 * Accept the connection and close it. Also let the other 673 * side know. 674 */ 675 ASSERT_EQ(status, 0); 676 ASSERT_PTR_EQ(&tcp_server, server); 677 678 r = uv_tcp_init(server->loop, &conn.conn); 679 ASSERT_EQ(r, 0); 680 681 r = uv_accept(server, (uv_stream_t*)&conn.conn); 682 ASSERT_EQ(r, 0); 683 684 uv_close((uv_handle_t*)&conn.conn, close_cb); 685 686 buf = uv_buf_init("accepted_connection\n", 20); 687 r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, 688 NULL, conn_notify_write_cb); 689 ASSERT_EQ(r, 0); 690 691 connection_accepted = 1; 692 } 693 } 694 695 696 static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) { 697 int r; 698 uv_buf_t buf; 699 uv_tcp_t* conn; 700 701 ASSERT_EQ(status, 0); 702 ASSERT_PTR_EQ(&tcp_server, server); 703 704 conn = malloc(sizeof(*conn)); 705 ASSERT_NOT_NULL(conn); 706 707 r = uv_tcp_init(server->loop, conn); 708 ASSERT_EQ(r, 0); 709 710 r = uv_accept(server, (uv_stream_t*)conn); 711 ASSERT_EQ(r, 0); 712 713 /* Send the accepted connection to the other process */ 714 buf = uv_buf_init("hello\n", 6); 715 r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, 716 (uv_stream_t*)conn, NULL); 717 ASSERT_EQ(r, 0); 718 719 r = uv_read_start((uv_stream_t*) conn, 720 on_read_alloc, 721 on_tcp_child_process_read); 722 ASSERT_EQ(r, 0); 723 724 uv_close((uv_handle_t*)conn, close_cb); 725 } 726 727 728 int ipc_helper(int listen_after_write) { 729 /* 730 * This is launched from test-ipc.c. stdin is a duplex channel that we 731 * over which a handle will be transmitted. 732 */ 733 struct sockaddr_in addr; 734 int r; 735 uv_buf_t buf; 736 737 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); 738 739 r = uv_pipe_init(uv_default_loop(), &channel, 1); 740 ASSERT_EQ(r, 0); 741 742 uv_pipe_open(&channel, 0); 743 744 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel)); 745 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel)); 746 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel)); 747 748 r = uv_tcp_init(uv_default_loop(), &tcp_server); 749 ASSERT_EQ(r, 0); 750 751 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); 752 ASSERT_EQ(r, 0); 753 754 if (!listen_after_write) { 755 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection); 756 ASSERT_EQ(r, 0); 757 } 758 759 buf = uv_buf_init("hello\n", 6); 760 r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, 761 (uv_stream_t*)&tcp_server, NULL); 762 ASSERT_EQ(r, 0); 763 764 if (listen_after_write) { 765 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection); 766 ASSERT_EQ(r, 0); 767 } 768 769 notify_parent_process(); 770 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); 771 ASSERT_EQ(r, 0); 772 773 ASSERT_EQ(connection_accepted, 1); 774 ASSERT_EQ(close_cb_called, 3); 775 776 MAKE_VALGRIND_HAPPY(); 777 return 0; 778 } 779 780 781 int ipc_helper_tcp_connection(void) { 782 /* 783 * This is launched from test-ipc.c. stdin is a duplex channel 784 * over which a handle will be transmitted. 785 */ 786 787 int r; 788 struct sockaddr_in addr; 789 790 r = uv_pipe_init(uv_default_loop(), &channel, 1); 791 ASSERT_EQ(r, 0); 792 793 uv_pipe_open(&channel, 0); 794 795 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel)); 796 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel)); 797 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel)); 798 799 r = uv_tcp_init(uv_default_loop(), &tcp_server); 800 ASSERT_EQ(r, 0); 801 802 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); 803 804 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); 805 ASSERT_EQ(r, 0); 806 807 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn); 808 ASSERT_EQ(r, 0); 809 810 /* Make a connection to the server */ 811 r = uv_tcp_init(uv_default_loop(), &conn.conn); 812 ASSERT_EQ(r, 0); 813 814 ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); 815 816 r = uv_tcp_connect(&conn.conn_req, 817 (uv_tcp_t*) &conn.conn, 818 (const struct sockaddr*) &addr, 819 connect_child_process_cb); 820 ASSERT_EQ(r, 0); 821 822 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); 823 ASSERT_EQ(r, 0); 824 825 ASSERT_EQ(tcp_conn_read_cb_called, 1); 826 ASSERT_EQ(tcp_conn_write_cb_called, 1); 827 ASSERT_EQ(close_cb_called, 4); 828 829 MAKE_VALGRIND_HAPPY(); 830 return 0; 831 } 832 833 static unsigned int write_until_data_queued() { 834 unsigned int i; 835 int r; 836 837 i = 0; 838 do { 839 r = uv_write(&write_reqs[i], 840 (uv_stream_t*)&channel, 841 &large_buf, 842 1, 843 closed_handle_large_write_cb); 844 ASSERT_EQ(r, 0); 845 i++; 846 } while (channel.write_queue_size == 0 && 847 i < ARRAY_SIZE(write_reqs)); 848 849 return channel.write_queue_size; 850 } 851 852 static void send_handle_and_close() { 853 int r; 854 struct sockaddr_in addr; 855 856 r = uv_tcp_init(uv_default_loop(), &tcp_server); 857 ASSERT_EQ(r, 0); 858 859 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); 860 861 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); 862 ASSERT_EQ(r, 0); 863 864 r = uv_write2(&write_req, 865 (uv_stream_t*)&channel, 866 &large_buf, 867 1, 868 (uv_stream_t*)&tcp_server, 869 closed_handle_write_cb); 870 ASSERT_EQ(r, 0); 871 872 uv_close((uv_handle_t*)&tcp_server, NULL); 873 } 874 875 int ipc_helper_closed_handle(void) { 876 int r; 877 878 memset(buffer, '.', LARGE_SIZE); 879 large_buf = uv_buf_init(buffer, LARGE_SIZE); 880 881 r = uv_pipe_init(uv_default_loop(), &channel, 1); 882 ASSERT_EQ(r, 0); 883 884 uv_pipe_open(&channel, 0); 885 886 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel)); 887 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel)); 888 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel)); 889 890 if (write_until_data_queued() > 0) 891 send_handle_and_close(); 892 893 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); 894 ASSERT_EQ(r, 0); 895 896 ASSERT_EQ(closed_handle_write, 1); 897 898 MAKE_VALGRIND_HAPPY(); 899 return 0; 900 } 901 902 903 int ipc_helper_bind_twice(void) { 904 /* 905 * This is launched from test-ipc.c. stdin is a duplex channel 906 * over which two handles will be transmitted. 907 */ 908 struct sockaddr_in addr; 909 int r; 910 uv_buf_t buf; 911 912 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); 913 914 r = uv_pipe_init(uv_default_loop(), &channel, 1); 915 ASSERT_EQ(r, 0); 916 917 uv_pipe_open(&channel, 0); 918 919 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel)); 920 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel)); 921 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel)); 922 923 buf = uv_buf_init("hello\n", 6); 924 925 r = uv_tcp_init(uv_default_loop(), &tcp_server); 926 ASSERT_EQ(r, 0); 927 r = uv_tcp_init(uv_default_loop(), &tcp_server2); 928 ASSERT_EQ(r, 0); 929 930 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0); 931 ASSERT_EQ(r, 0); 932 r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0); 933 ASSERT_EQ(r, 0); 934 935 r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, 936 (uv_stream_t*)&tcp_server, NULL); 937 ASSERT_EQ(r, 0); 938 r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1, 939 (uv_stream_t*)&tcp_server2, NULL); 940 ASSERT_EQ(r, 0); 941 942 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); 943 ASSERT_EQ(r, 0); 944 945 MAKE_VALGRIND_HAPPY(); 946 return 0; 947 } 948 949 int ipc_helper_send_zero(void) { 950 int r; 951 uv_buf_t zero_buf; 952 953 zero_buf = uv_buf_init(0, 0); 954 955 r = uv_pipe_init(uv_default_loop(), &channel, 0); 956 ASSERT_EQ(r, 0); 957 958 uv_pipe_open(&channel, 0); 959 960 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel)); 961 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel)); 962 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel)); 963 964 r = uv_write(&write_req, 965 (uv_stream_t*)&channel, 966 &zero_buf, 967 1, 968 send_zero_write_cb); 969 970 ASSERT_EQ(r, 0); 971 972 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); 973 ASSERT_EQ(r, 0); 974 975 ASSERT_EQ(send_zero_write, 1); 976 977 MAKE_VALGRIND_HAPPY(); 978 return 0; 979 } 980