xref: /netbsd-src/external/mit/libuv/dist/test/test-ipc.c (revision b5c47949a45ac972130c38cf13dfd8afb1f09285)
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "task.h"
24 
25 #include <stdio.h>
26 #include <string.h>
27 
28 static uv_pipe_t channel;
29 static uv_tcp_t tcp_server;
30 static uv_tcp_t tcp_server2;
31 static uv_tcp_t tcp_connection;
32 
33 static int exit_cb_called;
34 static int read_cb_called;
35 static int tcp_write_cb_called;
36 static int tcp_read_cb_called;
37 static int on_pipe_read_called;
38 static int local_conn_accepted;
39 static int remote_conn_accepted;
40 static int tcp_server_listening;
41 static uv_write_t write_req;
42 static uv_write_t write_req2;
43 static uv_write_t conn_notify_req;
44 static int close_cb_called;
45 static int connection_accepted;
46 static int tcp_conn_read_cb_called;
47 static int tcp_conn_write_cb_called;
48 static int closed_handle_data_read;
49 static int closed_handle_write;
50 static int send_zero_write;
51 
52 typedef struct {
53   uv_connect_t conn_req;
54   uv_write_t tcp_write_req;
55   uv_tcp_t conn;
56 } tcp_conn;
57 
58 #define CONN_COUNT 100
59 #define BACKLOG 128
60 #define LARGE_SIZE 100000
61 
62 static uv_buf_t large_buf;
63 static char buffer[LARGE_SIZE];
64 static uv_write_t write_reqs[300];
65 static int write_reqs_completed;
66 
67 static unsigned int write_until_data_queued(void);
68 static void send_handle_and_close(void);
69 
70 
71 static void close_server_conn_cb(uv_handle_t* handle) {
72   free(handle);
73 }
74 
75 
76 static void on_connection(uv_stream_t* server, int status) {
77   uv_tcp_t* conn;
78   int r;
79 
80   if (!local_conn_accepted) {
81     /* Accept the connection and close it.  Also and close the server. */
82     ASSERT_EQ(status, 0);
83     ASSERT_PTR_EQ(&tcp_server, server);
84 
85     conn = malloc(sizeof(*conn));
86     ASSERT_NOT_NULL(conn);
87     r = uv_tcp_init(server->loop, conn);
88     ASSERT_EQ(r, 0);
89 
90     r = uv_accept(server, (uv_stream_t*)conn);
91     ASSERT_EQ(r, 0);
92 
93     uv_close((uv_handle_t*)conn, close_server_conn_cb);
94     uv_close((uv_handle_t*)server, NULL);
95     local_conn_accepted = 1;
96   }
97 }
98 
99 
100 static void exit_cb(uv_process_t* process,
101                     int64_t exit_status,
102                     int term_signal) {
103   printf("exit_cb\n");
104   exit_cb_called++;
105   ASSERT_EQ(exit_status, 0);
106   ASSERT_EQ(term_signal, 0);
107   uv_close((uv_handle_t*)process, NULL);
108 }
109 
110 
111 static void on_alloc(uv_handle_t* handle,
112                      size_t suggested_size,
113                      uv_buf_t* buf) {
114   buf->base = malloc(suggested_size);
115   buf->len = suggested_size;
116 }
117 
118 
119 static void close_client_conn_cb(uv_handle_t* handle) {
120   tcp_conn* p = (tcp_conn*)handle->data;
121   free(p);
122 }
123 
124 
125 static void connect_cb(uv_connect_t* req, int status) {
126   uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
127 }
128 
129 
130 static void make_many_connections(void) {
131   tcp_conn* conn;
132   struct sockaddr_in addr;
133   int r, i;
134 
135   for (i = 0; i < CONN_COUNT; i++) {
136     conn = malloc(sizeof(*conn));
137     ASSERT_NOT_NULL(conn);
138 
139     r = uv_tcp_init(uv_default_loop(), &conn->conn);
140     ASSERT_EQ(r, 0);
141     ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
142 
143     r = uv_tcp_connect(&conn->conn_req,
144                        (uv_tcp_t*) &conn->conn,
145                        (const struct sockaddr*) &addr,
146                        connect_cb);
147     ASSERT_EQ(r, 0);
148 
149     conn->conn.data = conn;
150   }
151 }
152 
153 
154 static void on_read(uv_stream_t* handle,
155                     ssize_t nread,
156                     const uv_buf_t* buf) {
157   int r;
158   uv_pipe_t* pipe;
159   uv_handle_type pending;
160   uv_buf_t outbuf;
161 
162   pipe = (uv_pipe_t*) handle;
163 
164   if (nread == 0) {
165     /* Everything OK, but nothing read. */
166     free(buf->base);
167     return;
168   }
169 
170   if (nread < 0) {
171     if (nread == UV_EOF) {
172       free(buf->base);
173       return;
174     }
175 
176     printf("error recving on channel: %s\n", uv_strerror(nread));
177     abort();
178   }
179 
180   fprintf(stderr, "got %d bytes\n", (int)nread);
181 
182   pending = uv_pipe_pending_type(pipe);
183   if (!tcp_server_listening) {
184     ASSERT_EQ(1, uv_pipe_pending_count(pipe));
185     ASSERT_GT(nread, 0);
186     ASSERT_NOT_NULL(buf->base);
187     ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
188     read_cb_called++;
189 
190     /* Accept the pending TCP server, and start listening on it. */
191     ASSERT_EQ(pending, UV_TCP);
192     r = uv_tcp_init(uv_default_loop(), &tcp_server);
193     ASSERT_EQ(r, 0);
194 
195     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
196     ASSERT_EQ(r, 0);
197 
198     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
199     ASSERT_EQ(r, 0);
200 
201     tcp_server_listening = 1;
202 
203     /* Make sure that the expected data is correctly multiplexed. */
204     ASSERT_MEM_EQ("hello\n", buf->base, nread);
205 
206     outbuf = uv_buf_init("world\n", 6);
207     r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
208     ASSERT_EQ(r, 0);
209 
210     /* Create a bunch of connections to get both servers to accept. */
211     make_many_connections();
212   } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
213     /* Remote server has accepted a connection.  Close the channel. */
214     ASSERT_EQ(0, uv_pipe_pending_count(pipe));
215     ASSERT_EQ(pending, UV_UNKNOWN_HANDLE);
216     remote_conn_accepted = 1;
217     uv_close((uv_handle_t*)&channel, NULL);
218   }
219 
220   free(buf->base);
221 }
222 
223 #ifdef _WIN32
224 static void on_read_listen_after_bound_twice(uv_stream_t* handle,
225                                              ssize_t nread,
226                                              const uv_buf_t* buf) {
227   int r;
228   uv_pipe_t* pipe;
229   uv_handle_type pending;
230 
231   pipe = (uv_pipe_t*) handle;
232 
233   if (nread == 0) {
234     /* Everything OK, but nothing read. */
235     free(buf->base);
236     return;
237   }
238 
239   if (nread < 0) {
240     if (nread == UV_EOF) {
241       free(buf->base);
242       return;
243     }
244 
245     printf("error recving on channel: %s\n", uv_strerror(nread));
246     abort();
247   }
248 
249   fprintf(stderr, "got %d bytes\n", (int)nread);
250 
251   ASSERT_GT(uv_pipe_pending_count(pipe), 0);
252   pending = uv_pipe_pending_type(pipe);
253   ASSERT_GT(nread, 0);
254   ASSERT_NOT_NULL(buf->base);
255   ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
256   read_cb_called++;
257 
258   if (read_cb_called == 1) {
259     /* Accept the first TCP server, and start listening on it. */
260     ASSERT_EQ(pending, UV_TCP);
261     r = uv_tcp_init(uv_default_loop(), &tcp_server);
262     ASSERT_EQ(r, 0);
263 
264     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
265     ASSERT_EQ(r, 0);
266 
267     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
268     ASSERT_EQ(r, 0);
269   } else if (read_cb_called == 2) {
270     /* Accept the second TCP server, and start listening on it. */
271     ASSERT_EQ(pending, UV_TCP);
272     r = uv_tcp_init(uv_default_loop(), &tcp_server2);
273     ASSERT_EQ(r, 0);
274 
275     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
276     ASSERT_EQ(r, 0);
277 
278     r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection);
279     ASSERT_EQ(r, UV_EADDRINUSE);
280 
281     uv_close((uv_handle_t*)&tcp_server, NULL);
282     uv_close((uv_handle_t*)&tcp_server2, NULL);
283     ASSERT_EQ(0, uv_pipe_pending_count(pipe));
284     uv_close((uv_handle_t*)&channel, NULL);
285   }
286 
287   free(buf->base);
288 }
289 #endif
290 
291 void spawn_helper(uv_pipe_t* channel,
292                   uv_process_t* process,
293                   const char* helper) {
294   uv_process_options_t options;
295   size_t exepath_size;
296   char exepath[1024];
297   char* args[3];
298   int r;
299   uv_stdio_container_t stdio[3];
300 
301   r = uv_pipe_init(uv_default_loop(), channel, 1);
302   ASSERT_EQ(r, 0);
303   ASSERT_NE(channel->ipc, 0);
304 
305   exepath_size = sizeof(exepath);
306   r = uv_exepath(exepath, &exepath_size);
307   ASSERT_EQ(r, 0);
308 
309   exepath[exepath_size] = '\0';
310   args[0] = exepath;
311   args[1] = (char*)helper;
312   args[2] = NULL;
313 
314   memset(&options, 0, sizeof(options));
315   options.file = exepath;
316   options.args = args;
317   options.exit_cb = exit_cb;
318   options.stdio = stdio;
319   options.stdio_count = ARRAY_SIZE(stdio);
320 
321   stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
322   stdio[0].data.stream = (uv_stream_t*) channel;
323   stdio[1].flags = UV_INHERIT_FD;
324   stdio[1].data.fd = 1;
325   stdio[2].flags = UV_INHERIT_FD;
326   stdio[2].data.fd = 2;
327 
328   r = uv_spawn(uv_default_loop(), process, &options);
329   ASSERT_EQ(r, 0);
330 }
331 
332 
333 static void on_tcp_write(uv_write_t* req, int status) {
334   ASSERT_EQ(status, 0);
335   ASSERT_PTR_EQ(req->handle, &tcp_connection);
336   tcp_write_cb_called++;
337 }
338 
339 
340 static void on_read_alloc(uv_handle_t* handle,
341                           size_t suggested_size,
342                           uv_buf_t* buf) {
343   buf->base = malloc(suggested_size);
344   buf->len = suggested_size;
345 }
346 
347 
348 static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
349   ASSERT_GT(nread, 0);
350   ASSERT_MEM_EQ("hello again\n", buf->base, nread);
351   ASSERT_PTR_EQ(tcp, &tcp_connection);
352   free(buf->base);
353 
354   tcp_read_cb_called++;
355 
356   uv_close((uv_handle_t*)tcp, NULL);
357   uv_close((uv_handle_t*)&channel, NULL);
358 }
359 
360 
361 static void on_read_connection(uv_stream_t* handle,
362                                ssize_t nread,
363                                const uv_buf_t* buf) {
364   int r;
365   uv_buf_t outbuf;
366   uv_pipe_t* pipe;
367   uv_handle_type pending;
368 
369   pipe = (uv_pipe_t*) handle;
370   if (nread == 0) {
371     /* Everything OK, but nothing read. */
372     free(buf->base);
373     return;
374   }
375 
376   if (nread < 0) {
377     if (nread == UV_EOF) {
378       free(buf->base);
379       return;
380     }
381 
382     printf("error recving on channel: %s\n", uv_strerror(nread));
383     abort();
384   }
385 
386   fprintf(stderr, "got %d bytes\n", (int)nread);
387 
388   ASSERT_EQ(1, uv_pipe_pending_count(pipe));
389   pending = uv_pipe_pending_type(pipe);
390 
391   ASSERT_GT(nread, 0);
392   ASSERT_NOT_NULL(buf->base);
393   ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
394   read_cb_called++;
395 
396   /* Accept the pending TCP connection */
397   ASSERT_EQ(pending, UV_TCP);
398   r = uv_tcp_init(uv_default_loop(), &tcp_connection);
399   ASSERT_EQ(r, 0);
400 
401   r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
402   ASSERT_EQ(r, 0);
403 
404   /* Make sure that the expected data is correctly multiplexed. */
405   ASSERT_MEM_EQ("hello\n", buf->base, nread);
406 
407   /* Write/read to/from the connection */
408   outbuf = uv_buf_init("world\n", 6);
409   r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
410     on_tcp_write);
411   ASSERT_EQ(r, 0);
412 
413   r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
414   ASSERT_EQ(r, 0);
415 
416   free(buf->base);
417 }
418 
419 
420 #ifndef _WIN32
421 static void on_read_closed_handle(uv_stream_t* handle,
422                                   ssize_t nread,
423                                   const uv_buf_t* buf) {
424   if (nread == 0 || nread == UV_EOF) {
425     free(buf->base);
426     return;
427   }
428 
429   if (nread < 0) {
430     printf("error recving on channel: %s\n", uv_strerror(nread));
431     abort();
432   }
433 
434   closed_handle_data_read += nread;
435   free(buf->base);
436 }
437 #endif
438 
439 
440 static void on_read_send_zero(uv_stream_t* handle,
441                               ssize_t nread,
442                               const uv_buf_t* buf) {
443   ASSERT(nread == 0 || nread == UV_EOF);
444   free(buf->base);
445 }
446 
447 
448 static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
449   uv_process_t process;
450   int r;
451 
452   spawn_helper(&channel, &process, helper);
453   uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
454 
455   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
456   ASSERT_EQ(r, 0);
457 
458   MAKE_VALGRIND_HAPPY();
459   return 0;
460 }
461 
462 
463 TEST_IMPL(ipc_listen_before_write) {
464 #if defined(NO_SEND_HANDLE_ON_PIPE)
465   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
466 #endif
467   int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
468   ASSERT_EQ(local_conn_accepted, 1);
469   ASSERT_EQ(remote_conn_accepted, 1);
470   ASSERT_EQ(read_cb_called, 1);
471   ASSERT_EQ(exit_cb_called, 1);
472   return r;
473 }
474 
475 
476 TEST_IMPL(ipc_listen_after_write) {
477 #if defined(NO_SEND_HANDLE_ON_PIPE)
478   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
479 #endif
480   int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
481   ASSERT_EQ(local_conn_accepted, 1);
482   ASSERT_EQ(remote_conn_accepted, 1);
483   ASSERT_EQ(read_cb_called, 1);
484   ASSERT_EQ(exit_cb_called, 1);
485   return r;
486 }
487 
488 
489 TEST_IMPL(ipc_tcp_connection) {
490 #if defined(NO_SEND_HANDLE_ON_PIPE)
491   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
492 #endif
493   int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
494   ASSERT_EQ(read_cb_called, 1);
495   ASSERT_EQ(tcp_write_cb_called, 1);
496   ASSERT_EQ(tcp_read_cb_called, 1);
497   ASSERT_EQ(exit_cb_called, 1);
498   return r;
499 }
500 
501 #ifndef _WIN32
502 TEST_IMPL(ipc_closed_handle) {
503   int r;
504   r = run_ipc_test("ipc_helper_closed_handle", on_read_closed_handle);
505   ASSERT_EQ(r, 0);
506   return 0;
507 }
508 #endif
509 
510 
511 #ifdef _WIN32
512 TEST_IMPL(listen_with_simultaneous_accepts) {
513   uv_tcp_t server;
514   int r;
515   struct sockaddr_in addr;
516 
517   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
518 
519   r = uv_tcp_init(uv_default_loop(), &server);
520   ASSERT_EQ(r, 0);
521 
522   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
523   ASSERT_EQ(r, 0);
524 
525   r = uv_tcp_simultaneous_accepts(&server, 1);
526   ASSERT_EQ(r, 0);
527 
528   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
529   ASSERT_EQ(r, 0);
530   ASSERT_EQ(server.reqs_pending, 32);
531 
532   MAKE_VALGRIND_HAPPY();
533   return 0;
534 }
535 
536 
537 TEST_IMPL(listen_no_simultaneous_accepts) {
538   uv_tcp_t server;
539   int r;
540   struct sockaddr_in addr;
541 
542   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
543 
544   r = uv_tcp_init(uv_default_loop(), &server);
545   ASSERT_EQ(r, 0);
546 
547   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
548   ASSERT_EQ(r, 0);
549 
550   r = uv_tcp_simultaneous_accepts(&server, 0);
551   ASSERT_EQ(r, 0);
552 
553   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
554   ASSERT_EQ(r, 0);
555   ASSERT_EQ(server.reqs_pending, 1);
556 
557   MAKE_VALGRIND_HAPPY();
558   return 0;
559 }
560 
561 TEST_IMPL(ipc_listen_after_bind_twice) {
562 #if defined(NO_SEND_HANDLE_ON_PIPE)
563   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
564 #endif
565   int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
566   ASSERT_EQ(read_cb_called, 2);
567   ASSERT_EQ(exit_cb_called, 1);
568   return r;
569 }
570 #endif
571 
572 TEST_IMPL(ipc_send_zero) {
573   int r;
574   r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
575   ASSERT_EQ(r, 0);
576   return 0;
577 }
578 
579 
580 /* Everything here runs in a child process. */
581 
582 static tcp_conn conn;
583 
584 
585 static void close_cb(uv_handle_t* handle) {
586   close_cb_called++;
587 }
588 
589 
590 static void conn_notify_write_cb(uv_write_t* req, int status) {
591   uv_close((uv_handle_t*)&tcp_server, close_cb);
592   uv_close((uv_handle_t*)&channel, close_cb);
593 }
594 
595 
596 static void tcp_connection_write_cb(uv_write_t* req, int status) {
597   ASSERT_PTR_EQ(&conn.conn, req->handle);
598   uv_close((uv_handle_t*)req->handle, close_cb);
599   uv_close((uv_handle_t*)&channel, close_cb);
600   uv_close((uv_handle_t*)&tcp_server, close_cb);
601   tcp_conn_write_cb_called++;
602 }
603 
604 
605 static void closed_handle_large_write_cb(uv_write_t* req, int status) {
606   ASSERT_EQ(status, 0);
607   ASSERT(closed_handle_data_read = LARGE_SIZE);
608   if (++write_reqs_completed == ARRAY_SIZE(write_reqs)) {
609     write_reqs_completed = 0;
610     if (write_until_data_queued() > 0)
611       send_handle_and_close();
612   }
613 }
614 
615 
616 static void closed_handle_write_cb(uv_write_t* req, int status) {
617   ASSERT_EQ(status, UV_EBADF);
618   closed_handle_write = 1;
619 }
620 
621 
622 static void send_zero_write_cb(uv_write_t* req, int status) {
623   ASSERT_EQ(status, 0);
624   send_zero_write++;
625 }
626 
627 static void on_tcp_child_process_read(uv_stream_t* tcp,
628                                       ssize_t nread,
629                                       const uv_buf_t* buf) {
630   uv_buf_t outbuf;
631   int r;
632 
633   if (nread < 0) {
634     if (nread == UV_EOF) {
635       free(buf->base);
636       return;
637     }
638 
639     printf("error recving on tcp connection: %s\n", uv_strerror(nread));
640     abort();
641   }
642 
643   ASSERT_GT(nread, 0);
644   ASSERT_MEM_EQ("world\n", buf->base, nread);
645   on_pipe_read_called++;
646   free(buf->base);
647 
648   /* Write to the socket */
649   outbuf = uv_buf_init("hello again\n", 12);
650   r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
651   ASSERT_EQ(r, 0);
652 
653   tcp_conn_read_cb_called++;
654 }
655 
656 
657 static void connect_child_process_cb(uv_connect_t* req, int status) {
658   int r;
659 
660   ASSERT_EQ(status, 0);
661   r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
662   ASSERT_EQ(r, 0);
663 }
664 
665 
666 static void ipc_on_connection(uv_stream_t* server, int status) {
667   int r;
668   uv_buf_t buf;
669 
670   if (!connection_accepted) {
671     /*
672      * Accept the connection and close it.  Also let the other
673      * side know.
674      */
675     ASSERT_EQ(status, 0);
676     ASSERT_PTR_EQ(&tcp_server, server);
677 
678     r = uv_tcp_init(server->loop, &conn.conn);
679     ASSERT_EQ(r, 0);
680 
681     r = uv_accept(server, (uv_stream_t*)&conn.conn);
682     ASSERT_EQ(r, 0);
683 
684     uv_close((uv_handle_t*)&conn.conn, close_cb);
685 
686     buf = uv_buf_init("accepted_connection\n", 20);
687     r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
688       NULL, conn_notify_write_cb);
689     ASSERT_EQ(r, 0);
690 
691     connection_accepted = 1;
692   }
693 }
694 
695 
696 static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
697   int r;
698   uv_buf_t buf;
699   uv_tcp_t* conn;
700 
701   ASSERT_EQ(status, 0);
702   ASSERT_PTR_EQ(&tcp_server, server);
703 
704   conn = malloc(sizeof(*conn));
705   ASSERT_NOT_NULL(conn);
706 
707   r = uv_tcp_init(server->loop, conn);
708   ASSERT_EQ(r, 0);
709 
710   r = uv_accept(server, (uv_stream_t*)conn);
711   ASSERT_EQ(r, 0);
712 
713   /* Send the accepted connection to the other process */
714   buf = uv_buf_init("hello\n", 6);
715   r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
716     (uv_stream_t*)conn, NULL);
717   ASSERT_EQ(r, 0);
718 
719   r = uv_read_start((uv_stream_t*) conn,
720                     on_read_alloc,
721                     on_tcp_child_process_read);
722   ASSERT_EQ(r, 0);
723 
724   uv_close((uv_handle_t*)conn, close_cb);
725 }
726 
727 
728 int ipc_helper(int listen_after_write) {
729   /*
730    * This is launched from test-ipc.c. stdin is a duplex channel that we
731    * over which a handle will be transmitted.
732    */
733   struct sockaddr_in addr;
734   int r;
735   uv_buf_t buf;
736 
737   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
738 
739   r = uv_pipe_init(uv_default_loop(), &channel, 1);
740   ASSERT_EQ(r, 0);
741 
742   uv_pipe_open(&channel, 0);
743 
744   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
745   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
746   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
747 
748   r = uv_tcp_init(uv_default_loop(), &tcp_server);
749   ASSERT_EQ(r, 0);
750 
751   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
752   ASSERT_EQ(r, 0);
753 
754   if (!listen_after_write) {
755     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
756     ASSERT_EQ(r, 0);
757   }
758 
759   buf = uv_buf_init("hello\n", 6);
760   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
761       (uv_stream_t*)&tcp_server, NULL);
762   ASSERT_EQ(r, 0);
763 
764   if (listen_after_write) {
765     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
766     ASSERT_EQ(r, 0);
767   }
768 
769   notify_parent_process();
770   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
771   ASSERT_EQ(r, 0);
772 
773   ASSERT_EQ(connection_accepted, 1);
774   ASSERT_EQ(close_cb_called, 3);
775 
776   MAKE_VALGRIND_HAPPY();
777   return 0;
778 }
779 
780 
781 int ipc_helper_tcp_connection(void) {
782   /*
783    * This is launched from test-ipc.c. stdin is a duplex channel
784    * over which a handle will be transmitted.
785    */
786 
787   int r;
788   struct sockaddr_in addr;
789 
790   r = uv_pipe_init(uv_default_loop(), &channel, 1);
791   ASSERT_EQ(r, 0);
792 
793   uv_pipe_open(&channel, 0);
794 
795   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
796   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
797   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
798 
799   r = uv_tcp_init(uv_default_loop(), &tcp_server);
800   ASSERT_EQ(r, 0);
801 
802   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
803 
804   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
805   ASSERT_EQ(r, 0);
806 
807   r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
808   ASSERT_EQ(r, 0);
809 
810   /* Make a connection to the server */
811   r = uv_tcp_init(uv_default_loop(), &conn.conn);
812   ASSERT_EQ(r, 0);
813 
814   ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
815 
816   r = uv_tcp_connect(&conn.conn_req,
817                      (uv_tcp_t*) &conn.conn,
818                      (const struct sockaddr*) &addr,
819                      connect_child_process_cb);
820   ASSERT_EQ(r, 0);
821 
822   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
823   ASSERT_EQ(r, 0);
824 
825   ASSERT_EQ(tcp_conn_read_cb_called, 1);
826   ASSERT_EQ(tcp_conn_write_cb_called, 1);
827   ASSERT_EQ(close_cb_called, 4);
828 
829   MAKE_VALGRIND_HAPPY();
830   return 0;
831 }
832 
833 static unsigned int write_until_data_queued() {
834   unsigned int i;
835   int r;
836 
837   i = 0;
838   do {
839     r = uv_write(&write_reqs[i],
840                  (uv_stream_t*)&channel,
841                  &large_buf,
842                  1,
843                  closed_handle_large_write_cb);
844     ASSERT_EQ(r, 0);
845     i++;
846   } while (channel.write_queue_size == 0 &&
847            i < ARRAY_SIZE(write_reqs));
848 
849   return channel.write_queue_size;
850 }
851 
852 static void send_handle_and_close() {
853   int r;
854   struct sockaddr_in addr;
855 
856   r = uv_tcp_init(uv_default_loop(), &tcp_server);
857   ASSERT_EQ(r, 0);
858 
859   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
860 
861   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
862   ASSERT_EQ(r, 0);
863 
864   r = uv_write2(&write_req,
865                 (uv_stream_t*)&channel,
866                 &large_buf,
867                 1,
868                 (uv_stream_t*)&tcp_server,
869                 closed_handle_write_cb);
870   ASSERT_EQ(r, 0);
871 
872   uv_close((uv_handle_t*)&tcp_server, NULL);
873 }
874 
875 int ipc_helper_closed_handle(void) {
876   int r;
877 
878   memset(buffer, '.', LARGE_SIZE);
879   large_buf = uv_buf_init(buffer, LARGE_SIZE);
880 
881   r = uv_pipe_init(uv_default_loop(), &channel, 1);
882   ASSERT_EQ(r, 0);
883 
884   uv_pipe_open(&channel, 0);
885 
886   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
887   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
888   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
889 
890   if (write_until_data_queued() > 0)
891     send_handle_and_close();
892 
893   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
894   ASSERT_EQ(r, 0);
895 
896   ASSERT_EQ(closed_handle_write, 1);
897 
898   MAKE_VALGRIND_HAPPY();
899   return 0;
900 }
901 
902 
903 int ipc_helper_bind_twice(void) {
904   /*
905    * This is launched from test-ipc.c. stdin is a duplex channel
906    * over which two handles will be transmitted.
907    */
908   struct sockaddr_in addr;
909   int r;
910   uv_buf_t buf;
911 
912   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
913 
914   r = uv_pipe_init(uv_default_loop(), &channel, 1);
915   ASSERT_EQ(r, 0);
916 
917   uv_pipe_open(&channel, 0);
918 
919   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
920   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
921   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
922 
923   buf = uv_buf_init("hello\n", 6);
924 
925   r = uv_tcp_init(uv_default_loop(), &tcp_server);
926   ASSERT_EQ(r, 0);
927   r = uv_tcp_init(uv_default_loop(), &tcp_server2);
928   ASSERT_EQ(r, 0);
929 
930   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
931   ASSERT_EQ(r, 0);
932   r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
933   ASSERT_EQ(r, 0);
934 
935   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
936                 (uv_stream_t*)&tcp_server, NULL);
937   ASSERT_EQ(r, 0);
938   r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
939                 (uv_stream_t*)&tcp_server2, NULL);
940   ASSERT_EQ(r, 0);
941 
942   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
943   ASSERT_EQ(r, 0);
944 
945   MAKE_VALGRIND_HAPPY();
946   return 0;
947 }
948 
949 int ipc_helper_send_zero(void) {
950   int r;
951   uv_buf_t zero_buf;
952 
953   zero_buf = uv_buf_init(0, 0);
954 
955   r = uv_pipe_init(uv_default_loop(), &channel, 0);
956   ASSERT_EQ(r, 0);
957 
958   uv_pipe_open(&channel, 0);
959 
960   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
961   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
962   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
963 
964   r = uv_write(&write_req,
965                (uv_stream_t*)&channel,
966                &zero_buf,
967                1,
968                send_zero_write_cb);
969 
970   ASSERT_EQ(r, 0);
971 
972   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
973   ASSERT_EQ(r, 0);
974 
975   ASSERT_EQ(send_zero_write, 1);
976 
977   MAKE_VALGRIND_HAPPY();
978   return 0;
979 }
980