1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <assert.h>
23 #include <io.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 
28 #include "handle-inl.h"
29 #include "internal.h"
30 #include "req-inl.h"
31 #include "stream-inl.h"
32 #include "uv-common.h"
33 #include "uv.h"
34 
35 #include <aclapi.h>
36 #include <accctrl.h>
37 
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data when
 * the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* Number of pending instances uv_pipe_bind() uses when the handle has not
 * been flagged as a server by uv_pipe_pending_instances(). */
static const int default_pending_pipe_instances = 4;

/* Pipe prefix */
static char pipe_prefix[] = "\\\\?\\pipe";
/* Length of pipe_prefix, not counting the terminating NUL. */
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
53 
/* IPC incoming xfer queue item. Items are linked through `member` into a
 * pipe's pipe.conn.ipc_xfer_queue. uv__pipe_endgame() materializes a socket
 * from xfer_info.socket_info for every item still queued and closes it. */
typedef struct {
  uv__ipc_socket_xfer_type_t xfer_type;
  uv__ipc_socket_xfer_info_t xfer_info;
  QUEUE member;
} uv__ipc_xfer_queue_item_t;
60 
/* IPC frame header flags. Stored in uv__ipc_frame_header_t.flags. */
/* clang-format off */
enum {
  UV__IPC_FRAME_HAS_DATA                = 0x01,
  UV__IPC_FRAME_HAS_SOCKET_XFER         = 0x02,
  UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 0x04,
  /* These are combinations of the flags above. */
  /* 0x06 == HAS_SOCKET_XFER | XFER_IS_TCP_CONNECTION. */
  UV__IPC_FRAME_XFER_FLAGS              = 0x06,
  /* 0x07 == all three individual flags. */
  UV__IPC_FRAME_VALID_FLAGS             = 0x07
};
/* clang-format on */
72 
/* IPC frame header. Four 32-bit fields, 16 bytes in total (enforced by the
 * static assertion below). */
typedef struct {
  uint32_t flags;
  uint32_t reserved1;   /* Ignored. */
  uint32_t data_length; /* Must be zero if there is no data. */
  uint32_t reserved2;   /* Must be zero. */
} uv__ipc_frame_header_t;

/* To implement the IPC protocol correctly, these structures must have exactly
 * the right size. */
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
85 
/* Coalesced write request: pairs an internal write request with the
 * user-supplied one it stands in for. */
typedef struct {
  uv_write_t req;       /* Internal heap-allocated write request. */
  uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
} uv__coalesced_write_t;
91 
92 
/* Forward declarations of the file-local EOF timer helpers. They are static,
 * so their definitions live elsewhere in this translation unit. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
99 
100 
uv__unique_pipe_name(char * ptr,char * name,size_t size)101 static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
102   snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
103 }
104 
105 
/* Initializes `handle` as a named-pipe stream on `loop`.
 *
 * `ipc` is stored verbatim in handle->ipc. The handle starts without an OS
 * pipe (INVALID_HANDLE_VALUE), without a name, with no pending requests and
 * with an empty IPC transfer queue. Always returns 0. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*) handle, UV_NAMED_PIPE);

  /* Generic pipe state. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->ipc = ipc;
  handle->reqs_pending = 0;

  /* Connection-side state. */
  handle->pipe.conn.non_overlapped_writes_tail = NULL;
  handle->pipe.conn.ipc_remote_pid = 0;
  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
  handle->pipe.conn.ipc_xfer_queue_length = 0;
  QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);

  return 0;
}
121 
122 
/* Puts `handle` into connection mode. Must not be used on a handle that is
 * flagged as a pipe server. */
static void uv__pipe_connection_init(uv_pipe_t* handle) {
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));

  uv__connection_init((uv_stream_t*) handle);

  /* No EOF timer exists yet; the read request points back at its pipe. */
  handle->pipe.conn.eof_timer = NULL;
  handle->read_req.data = handle;
}
129 
130 
open_named_pipe(const WCHAR * name,DWORD * duplex_flags)131 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
132   HANDLE pipeHandle;
133 
134   /*
135    * Assume that we have a duplex pipe first, so attempt to
136    * connect with GENERIC_READ | GENERIC_WRITE.
137    */
138   pipeHandle = CreateFileW(name,
139                            GENERIC_READ | GENERIC_WRITE,
140                            0,
141                            NULL,
142                            OPEN_EXISTING,
143                            FILE_FLAG_OVERLAPPED,
144                            NULL);
145   if (pipeHandle != INVALID_HANDLE_VALUE) {
146     *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
147     return pipeHandle;
148   }
149 
150   /*
151    * If the pipe is not duplex CreateFileW fails with
152    * ERROR_ACCESS_DENIED.  In that case try to connect
153    * as a read-only or write-only.
154    */
155   if (GetLastError() == ERROR_ACCESS_DENIED) {
156     pipeHandle = CreateFileW(name,
157                              GENERIC_READ | FILE_WRITE_ATTRIBUTES,
158                              0,
159                              NULL,
160                              OPEN_EXISTING,
161                              FILE_FLAG_OVERLAPPED,
162                              NULL);
163 
164     if (pipeHandle != INVALID_HANDLE_VALUE) {
165       *duplex_flags = UV_HANDLE_READABLE;
166       return pipeHandle;
167     }
168   }
169 
170   if (GetLastError() == ERROR_ACCESS_DENIED) {
171     pipeHandle = CreateFileW(name,
172                              GENERIC_WRITE | FILE_READ_ATTRIBUTES,
173                              0,
174                              NULL,
175                              OPEN_EXISTING,
176                              FILE_FLAG_OVERLAPPED,
177                              NULL);
178 
179     if (pipeHandle != INVALID_HANDLE_VALUE) {
180       *duplex_flags = UV_HANDLE_WRITABLE;
181       return pipeHandle;
182     }
183   }
184 
185   return INVALID_HANDLE_VALUE;
186 }
187 
188 
/* Releases the OS resource behind `pipe` and marks the pipe as closed.
 *
 * If the pipe carries a CRT file descriptor (u.fd != -1) the descriptor is
 * closed with close(); otherwise the raw handle is closed with CloseHandle().
 * Descriptors 0..2 are rejected by the assertion. */
static void close_pipe(uv_pipe_t* pipe) {
  int fd;

  fd = pipe->u.fd;
  assert(fd == -1 || fd > 2);

  if (fd != -1)
    close(fd);
  else
    CloseHandle(pipe->handle);

  pipe->handle = INVALID_HANDLE_VALUE;
  pipe->u.fd = -1;
}
199 
200 
/* Creates the first (and only) instance of a uniquely named byte-mode pipe.
 *
 * The name is derived from `random` via uv__unique_pipe_name() and written
 * into `name` (capacity `nameSize`). When creation fails with ERROR_PIPE_BUSY
 * or ERROR_ACCESS_DENIED the name is treated as taken: `random` is advanced
 * by one and the attempt is repeated.
 *
 * Returns 0 and stores the handle in `*pipeHandle_ptr` on success; otherwise
 * returns the Windows error code from GetLastError(). */
static int uv__pipe_server(
    HANDLE* pipeHandle_ptr, DWORD access,
    char* name, size_t nameSize, char* random) {
  HANDLE pipe;
  int err;

  for (;; random++) {
    uv__unique_pipe_name(random, name, nameSize);

    pipe = CreateNamedPipeA(name,
                            access | FILE_FLAG_FIRST_PIPE_INSTANCE,
                            PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
                            1,
                            65536,
                            65536,
                            0,
                            NULL);
    if (pipe != INVALID_HANDLE_VALUE) {
      /* No name collision, we are done. */
      *pipeHandle_ptr = pipe;
      return 0;
    }

    err = GetLastError();
    if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
      /* Not a collision; there is no handle to release at this point. */
      return err;
    }

    /* Pipe name collision: the loop increment picks the next candidate. */
  }
}
239 
240 
/* Creates a connected named-pipe pair.
 *
 * server_flags / client_flags: any combination of UV_READABLE_PIPE,
 *   UV_WRITABLE_PIPE and UV_NONBLOCK_PIPE, describing each end. A direction
 *   the client does not request is still opened with the corresponding
 *   *_ATTRIBUTES right. Both ends additionally get WRITE_DAC.
 * inherit_client: stored as bInheritHandle for the client handle.
 * random: seed forwarded to uv__pipe_server() for name generation.
 *
 * Returns 0 and fills `*server_pipe_ptr` / `*client_pipe_ptr` on success.
 * On failure returns a Windows error code and closes whatever was opened. */
static int uv__create_pipe_pair(
    HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
    unsigned int server_flags, unsigned int client_flags,
    int inherit_client, char* random) {
  /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
  char pipe_name[64];
  SECURITY_ATTRIBUTES sa;
  DWORD server_access;
  DWORD client_access;
  DWORD client_attrs;
  HANDLE server_pipe;
  HANDLE client_pipe;
  int err;

  server_pipe = INVALID_HANDLE_VALUE;
  client_pipe = INVALID_HANDLE_VALUE;

  /* Translate the libuv flags of the server end. */
  server_access = WRITE_DAC;
  if (server_flags & UV_READABLE_PIPE)
    server_access |= PIPE_ACCESS_INBOUND;
  if (server_flags & UV_WRITABLE_PIPE)
    server_access |= PIPE_ACCESS_OUTBOUND;
  if (server_flags & UV_NONBLOCK_PIPE)
    server_access |= FILE_FLAG_OVERLAPPED;

  /* Translate the libuv flags of the client end. */
  client_access = WRITE_DAC;
  client_access |= (client_flags & UV_READABLE_PIPE) ? GENERIC_READ
                                                     : FILE_READ_ATTRIBUTES;
  client_access |= (client_flags & UV_WRITABLE_PIPE) ? GENERIC_WRITE
                                                     : FILE_WRITE_ATTRIBUTES;
  client_attrs = (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0;

  /* Create server pipe handle. */
  err = uv__pipe_server(&server_pipe,
                        server_access,
                        pipe_name,
                        sizeof(pipe_name),
                        random);
  if (err)
    goto error;

  /* Create client pipe handle. */
  sa.nLength = sizeof sa;
  sa.bInheritHandle = inherit_client;
  sa.lpSecurityDescriptor = NULL;

  client_pipe = CreateFileA(pipe_name,
                            client_access,
                            0,
                            &sa,
                            OPEN_EXISTING,
                            client_attrs,
                            NULL);
  if (client_pipe == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    goto error;
  }

#ifndef NDEBUG
  /* Validate that the pipe was opened in the right mode. */
  {
    DWORD mode;
    BOOL r;
    r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
    if (r == TRUE) {
      assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
    } else {
      fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
    }
  }
#endif

  /* Do a blocking ConnectNamedPipe.  This should not block because we have
   * both ends of the pipe created. ERROR_PIPE_CONNECTED only means the client
   * connected before this call and is not treated as a failure. */
  if (!ConnectNamedPipe(server_pipe, NULL)) {
    err = GetLastError();
    if (err != ERROR_PIPE_CONNECTED)
      goto error;
  }

  *server_pipe_ptr = server_pipe;
  *client_pipe_ptr = client_pipe;
  return 0;

 error:
  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  return err;
}
339 
340 
/* Creates a connected pipe pair and returns it as two CRT file descriptors:
 * fds[0] is the read end, fds[1] the write end.
 *
 * read_flags / write_flags are forwarded to uv__create_pipe_pair() for the
 * respective end (UV_READABLE_PIPE / UV_WRITABLE_PIPE are added here).
 *
 * Returns 0 on success. On failure returns a libuv error code and leaves
 * `fds` unmodified:
 *   - a failure to create the pipe pair is reported as the translated
 *     Windows error,
 *   - running out of CRT descriptors is reported as UV_EMFILE,
 *   - any other _open_osfhandle() failure is reported as UV_UNKNOWN. */
int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
  uv_file temp[2];
  int err;
  HANDLE readh;
  HANDLE writeh;

  /* Make the server side the inbound (read) end, */
  /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
  /* TODO: better source of local randomness than &fds? */
  read_flags |= UV_READABLE_PIPE;
  write_flags |= UV_WRITABLE_PIPE;
  err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
  if (err != 0) {
    /* uv__create_pipe_pair() reports Windows error codes; convert so that
     * every error leaving this function is a libuv error code. */
    return uv_translate_sys_error(err);
  }

  temp[0] = _open_osfhandle((intptr_t) readh, 0);
  if (temp[0] == -1) {
    /* errno holds a CRT value, so it must be compared against EMFILE and not
     * against the (negative) libuv constant UV_EMFILE. */
    err = (errno == EMFILE) ? UV_EMFILE : UV_UNKNOWN;
    CloseHandle(readh);
    CloseHandle(writeh);
    return err;
  }

  temp[1] = _open_osfhandle((intptr_t) writeh, 0);
  if (temp[1] == -1) {
    err = (errno == EMFILE) ? UV_EMFILE : UV_UNKNOWN;
    /* temp[0] wraps readh at this point, so it is closed via _close(). */
    _close(temp[0]);
    CloseHandle(writeh);
    return err;
  }

  fds[0] = temp[0];
  fds[1] = temp[1];
  return 0;
}
379 
380 
/* Creates a pipe pair for a child's stdio stream.
 *
 * The parent_pipe is always the server end and stays with libuv; the client
 * end is returned through `child_pipe_ptr` and is created inheritable.
 * `flags` (UV_READABLE_PIPE / UV_WRITABLE_PIPE / UV_NONBLOCK_PIPE) describe
 * how the *child* uses its end, so the parent gets the opposite direction.
 * The server end is always overlapped and is associated with loop->iocp.
 *
 * Returns 0 on success, otherwise a Windows error code; on failure both
 * pipe handles are closed. */
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
    uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
  HANDLE server_pipe = INVALID_HANDLE_VALUE;
  HANDLE client_pipe = INVALID_HANDLE_VALUE;
  unsigned int server_flags = UV_NONBLOCK_PIPE;
  unsigned int client_flags = 0;
  const int child_reads = (flags & UV_READABLE_PIPE) != 0;
  const int child_writes = (flags & UV_WRITABLE_PIPE) != 0;
  int err;

  uv__pipe_connection_init(parent_pipe);

  if (child_reads) {
    /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
     * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
     * the state of the write buffer when we're trying to shutdown the pipe. */
    client_flags |= UV_READABLE_PIPE;
    server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
  }
  if (child_writes) {
    client_flags |= UV_WRITABLE_PIPE;
    server_flags |= UV_READABLE_PIPE;
  }
  if ((flags & UV_NONBLOCK_PIPE) || parent_pipe->ipc)
    client_flags |= UV_NONBLOCK_PIPE;

  /* server_pipe still holds INVALID_HANDLE_VALUE here; its value is only used
   * as the name seed. */
  err = uv__create_pipe_pair(&server_pipe,
                             &client_pipe,
                             server_flags,
                             client_flags,
                             1,
                             (char*) server_pipe);
  if (err)
    goto error;

  if (CreateIoCompletionPort(server_pipe,
                             loop->iocp,
                             (ULONG_PTR) parent_pipe,
                             0) == NULL) {
    err = GetLastError();
    goto error;
  }

  *child_pipe_ptr = client_pipe;
  parent_pipe->handle = server_pipe;

  /* The server end is now readable and/or writable. */
  if (child_writes)
    parent_pipe->flags |= UV_HANDLE_READABLE;
  if (child_reads)
    parent_pipe->flags |= UV_HANDLE_WRITABLE;

  return 0;

 error:
  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  return err;
}
448 
449 
/* Attaches the OS pipe `pipeHandle` to the connection `handle`.
 *
 * The pipe is switched to byte-read / blocking-wait mode, its open mode is
 * queried to find out whether it is overlapped, and it is then either
 * associated with the loop's completion port (overlapped) or prepared for
 * thread-based reads (non-overlapped). `fd` is stored in handle->u.fd and
 * `duplex_flags` are OR-ed into handle->flags.
 *
 * Returns 0 on success or a libuv error code:
 *   UV_EINVAL   - the handle is closing,
 *   UV_EBUSY    - the handle already owns a pipe,
 *   UV_EACCES   - the mode cannot be set and the pipe is in PIPE_NOWAIT mode,
 *   UV_ENOTSOCK - SetNamedPipeHandleState rejected the handle as invalid,
 *   otherwise the translated Windows / NT error.
 * On failure `handle` is not attached to `pipeHandle`; the caller keeps
 * ownership of `pipeHandle`. */
static int uv__set_pipe_handle(uv_loop_t* loop,
                               uv_pipe_t* handle,
                               HANDLE pipeHandle,
                               int fd,
                               DWORD duplex_flags) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_MODE_INFORMATION mode_info;
  DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
  DWORD current_mode = 0;
  DWORD err = 0;

  assert(handle->flags & UV_HANDLE_CONNECTION);
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
  if (handle->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;
  if (handle->handle != INVALID_HANDLE_VALUE)
    return UV_EBUSY;

  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      /*
       * SetNamedPipeHandleState can fail if the handle doesn't have either
       * GENERIC_WRITE  or FILE_WRITE_ATTRIBUTES.
       * But if the handle already has the desired wait and blocking modes
       * we can continue.
       */
      if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
                                   NULL, NULL, 0)) {
        return uv_translate_sys_error(GetLastError());
      } else if (current_mode & PIPE_NOWAIT) {
        return UV_EACCES;
      }
    } else {
      /* If this returns ERROR_INVALID_PARAMETER we probably opened
       * something that is not a pipe. */
      if (err == ERROR_INVALID_PARAMETER) {
        return UV_ENOTSOCK;
      }
      return uv_translate_sys_error(err);
    }
  }

  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
  nt_status = pNtQueryInformationFile(pipeHandle,
                                      &io_status,
                                      &mode_info,
                                      sizeof(mode_info),
                                      FileModeInformation);
  if (nt_status != STATUS_SUCCESS) {
    /* Report the failure of this query. `err` must not be used here: it is
     * either 0 or a tolerated ERROR_ACCESS_DENIED from the call above, and
     * translating 0 would signal success without attaching the handle. */
    return uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
  }

  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
    /* Non-overlapped pipe. */
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
    handle->pipe.conn.readfile_thread_handle = NULL;
    InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  } else {
    /* Overlapped pipe.  Try to associate with IOCP. If that is not possible
     * the handle is flagged so that IOCP gets emulated instead. */
    if (CreateIoCompletionPort(pipeHandle,
                               loop->iocp,
                               (ULONG_PTR) handle,
                               0) == NULL) {
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    }
  }

  handle->handle = pipeHandle;
  handle->u.fd = fd;
  handle->flags |= duplex_flags;

  return 0;
}
526 
527 
/* Creates a new instance of the server pipe handle->name for the accept
 * request `req` and associates it with the loop's completion port.
 *
 * firstInstance: when true the instance is created with
 *   FILE_FLAG_FIRST_PIPE_INSTANCE, so creation fails if the name is in use.
 *
 * Returns 1 on success. Returns 0 when the pipe instance could not be
 * created; the reason is then available through GetLastError(). A failure
 * to associate with the completion port is fatal. */
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
                             uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD open_mode;
  HANDLE instance;

  assert(req->pipeHandle == INVALID_HANDLE_VALUE);

  open_mode = PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC;
  if (firstInstance)
    open_mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;

  instance = CreateNamedPipeW(handle->name,
                              open_mode,
                              PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
                              PIPE_UNLIMITED_INSTANCES,
                              65536,
                              65536,
                              0,
                              NULL);
  req->pipeHandle = instance;
  if (instance == INVALID_HANDLE_VALUE)
    return 0;

  /* Associate it with IOCP so we can get events. */
  if (CreateIoCompletionPort(instance,
                             loop->iocp,
                             (ULONG_PTR) handle,
                             0) == NULL) {
    uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
  }

  /* Stash a handle in the server object for use from places such as
   * getsockname and chmod. As we transfer ownership of these to client
   * objects, we'll allocate new ones here. */
  handle->handle = instance;

  return 1;
}
558 
559 
pipe_shutdown_thread_proc(void * parameter)560 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
561   uv_loop_t* loop;
562   uv_pipe_t* handle;
563   uv_shutdown_t* req;
564 
565   req = (uv_shutdown_t*) parameter;
566   assert(req);
567   handle = (uv_pipe_t*) req->handle;
568   assert(handle);
569   loop = handle->loop;
570   assert(loop);
571 
572   FlushFileBuffers(handle->handle);
573 
574   /* Post completed */
575   POST_COMPLETION_FOR_REQ(loop, req);
576 
577   return 0;
578 }
579 
580 
/* Processes the shutdown request `req` for the connection `handle`.
 *
 * The request is completed immediately (inserted as pending) when the handle
 * is closing, when the pipe's outbound buffer is already drained, or when an
 * error occurs. Otherwise FlushFileBuffers is run on a thread-pool thread,
 * which posts the completion once it is done.
 *
 * Must only be called when no writes are pending on the handle. */
void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;

  assert(handle->flags & UV_HANDLE_CONNECTION);
  assert(req != NULL);
  assert(handle->stream.conn.write_reqs_pending == 0);
  SET_REQ_SUCCESS(req);

  if (handle->flags & UV_HANDLE_CLOSING)
    goto complete;

  /* Try to avoid flushing the pipe buffer in the thread pool. */
  nt_status = pNtQueryInformationFile(handle->handle,
                                      &io_status,
                                      &pipe_info,
                                      sizeof pipe_info,
                                      FilePipeLocalInformation);
  if (nt_status != STATUS_SUCCESS) {
    SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
    goto failed;
  }

  /* Short-circuit, no need to call FlushFileBuffers:
   * all writes have been read. */
  if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable)
    goto complete;

  /* Run FlushFileBuffers in the thread pool. On success the worker posts the
   * completion, so there is nothing left to do here. */
  if (QueueUserWorkItem(pipe_shutdown_thread_proc,
                        req,
                        WT_EXECUTELONGFUNCTION)) {
    return;
  }

  SET_REQ_ERROR(req, GetLastError());

 failed:
  handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */

 complete:
  uv__insert_pending_req(loop, (uv_req_t*) req);
}
629 
630 
/* Final teardown of a closing pipe once no requests are pending.
 *
 * For connections: drains the IPC transfer queue (each queued socket is
 * materialized and closed), releases the emulated-IOCP wait/event handles and
 * deletes the read-thread lock of non-overlapped pipes.
 * For servers: frees the accept request array.
 * Finally the handle is closed via uv__handle_close(). */
void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  uv__ipc_xfer_queue_item_t* item;
  QUEUE* xfer_queue;
  QUEUE* node;
  SOCKET sock;

  assert(handle->reqs_pending == 0);
  assert(handle->flags & UV_HANDLE_CLOSING);
  assert(!(handle->flags & UV_HANDLE_CLOSED));

  if (handle->flags & UV_HANDLE_CONNECTION) {
    /* Free pending sockets */
    xfer_queue = &handle->pipe.conn.ipc_xfer_queue;
    while (!QUEUE_EMPTY(xfer_queue)) {
      node = QUEUE_HEAD(xfer_queue);
      QUEUE_REMOVE(node);
      item = QUEUE_DATA(node, uv__ipc_xfer_queue_item_t, member);

      /* Materialize socket and close it */
      sock = WSASocketW(FROM_PROTOCOL_INFO,
                        FROM_PROTOCOL_INFO,
                        FROM_PROTOCOL_INFO,
                        &item->xfer_info.socket_info,
                        0,
                        WSA_FLAG_OVERLAPPED);
      uv__free(item);

      if (sock != INVALID_SOCKET)
        closesocket(sock);
    }
    handle->pipe.conn.ipc_xfer_queue_length = 0;

    /* Resources that only exist when IOCP is emulated for this pipe. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
        UnregisterWait(handle->read_req.wait_handle);
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
      }
      if (handle->read_req.event_handle != NULL) {
        CloseHandle(handle->read_req.event_handle);
        handle->read_req.event_handle = NULL;
      }
    }

    if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
      DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  }

  if (handle->flags & UV_HANDLE_PIPESERVER) {
    assert(handle->pipe.serv.accept_reqs);
    uv__free(handle->pipe.serv.accept_reqs);
    handle->pipe.serv.accept_reqs = NULL;
  }

  uv__handle_close(handle);
}
685 
686 
/* Sets the number of pending pipe instances for a server that has not been
 * bound yet and flags the handle as a pipe server. Has no effect once the
 * handle is bound. */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  if (!(handle->flags & UV_HANDLE_BOUND)) {
    handle->flags |= UV_HANDLE_PIPESERVER;
    handle->pipe.serv.pending_instances = count;
  }
}
693 
694 
695 /* Creates a pipe server. */
uv_pipe_bind(uv_pipe_t * handle,const char * name)696 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
697   uv_loop_t* loop = handle->loop;
698   int i, err, nameSize;
699   uv_pipe_accept_t* req;
700 
701   if (handle->flags & UV_HANDLE_BOUND) {
702     return UV_EINVAL;
703   }
704 
705   if (!name) {
706     return UV_EINVAL;
707   }
708   if (uv__is_closing(handle)) {
709     return UV_EINVAL;
710   }
711   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
712     handle->pipe.serv.pending_instances = default_pending_pipe_instances;
713   }
714 
715   handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
716     uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
717   if (!handle->pipe.serv.accept_reqs) {
718     uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
719   }
720 
721   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
722     req = &handle->pipe.serv.accept_reqs[i];
723     UV_REQ_INIT(req, UV_ACCEPT);
724     req->data = handle;
725     req->pipeHandle = INVALID_HANDLE_VALUE;
726     req->next_pending = NULL;
727   }
728 
729   /* Convert name to UTF16. */
730   nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
731   handle->name = uv__malloc(nameSize);
732   if (!handle->name) {
733     uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
734   }
735 
736   if (!MultiByteToWideChar(CP_UTF8,
737                            0,
738                            name,
739                            -1,
740                            handle->name,
741                            nameSize / sizeof(WCHAR))) {
742     err = GetLastError();
743     goto error;
744   }
745 
746   /*
747    * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
748    * If this fails then there's already a pipe server for the given pipe name.
749    */
750   if (!pipe_alloc_accept(loop,
751                          handle,
752                          &handle->pipe.serv.accept_reqs[0],
753                          TRUE)) {
754     err = GetLastError();
755     if (err == ERROR_ACCESS_DENIED) {
756       err = WSAEADDRINUSE;  /* Translates to UV_EADDRINUSE. */
757     } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
758       err = WSAEACCES;  /* Translates to UV_EACCES. */
759     }
760     goto error;
761   }
762 
763   handle->pipe.serv.pending_accepts = NULL;
764   handle->flags |= UV_HANDLE_PIPESERVER;
765   handle->flags |= UV_HANDLE_BOUND;
766 
767   return 0;
768 
769 error:
770   if (handle->name) {
771     uv__free(handle->name);
772     handle->name = NULL;
773   }
774 
775   return uv_translate_sys_error(err);
776 }
777 
778 
pipe_connect_thread_proc(void * parameter)779 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
780   uv_loop_t* loop;
781   uv_pipe_t* handle;
782   uv_connect_t* req;
783   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
784   DWORD duplex_flags;
785 
786   req = (uv_connect_t*) parameter;
787   assert(req);
788   handle = (uv_pipe_t*) req->handle;
789   assert(handle);
790   loop = handle->loop;
791   assert(loop);
792 
793   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
794    * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
795   while (WaitNamedPipeW(handle->name, 30000)) {
796     /* The pipe is now available, try to connect. */
797     pipeHandle = open_named_pipe(handle->name, &duplex_flags);
798     if (pipeHandle != INVALID_HANDLE_VALUE)
799       break;
800 
801     SwitchToThread();
802   }
803 
804   if (pipeHandle != INVALID_HANDLE_VALUE) {
805     SET_REQ_SUCCESS(req);
806     req->u.connect.pipeHandle = pipeHandle;
807     req->u.connect.duplex_flags = duplex_flags;
808   } else {
809     SET_REQ_ERROR(req, GetLastError());
810   }
811 
812   /* Post completed */
813   POST_COMPLETION_FOR_REQ(loop, req);
814 
815   return 0;
816 }
817 
818 
/* Starts connecting `handle` to the named pipe `name` (UTF-8).
 *
 * The result is always delivered asynchronously through `req` / `cb`:
 *   - if the pipe can be opened right away the request is queued as
 *     successful,
 *   - if the pipe is busy a thread-pool work item waits for an instance and
 *     posts the completion later,
 *   - any other failure queues the request with the error set.
 *
 * A handle that is a pipe server is rejected with ERROR_INVALID_PARAMETER and
 * a handle that is already in connection mode with ERROR_PIPE_BUSY. Those two
 * rejections leave the handle's existing state, including handle->name,
 * untouched. */
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
    const char* name, uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  int err, nameSize;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;

  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
  req->u.connect.duplex_flags = 0;

  /* Validation failures must not run the name cleanup below: handle->name
   * then belongs to the existing server or connection (and may be in use by
   * a queued pipe_connect_thread_proc), not to this call. */
  if (handle->flags & UV_HANDLE_PIPESERVER) {
    err = ERROR_INVALID_PARAMETER;
    goto report_error;
  }
  if (handle->flags & UV_HANDLE_CONNECTION) {
    err = ERROR_PIPE_BUSY;
    goto report_error;
  }
  uv__pipe_connection_init(handle);

  /* Convert name to UTF16. */
  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
  handle->name = uv__malloc(nameSize);
  if (!handle->name) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  if (!MultiByteToWideChar(CP_UTF8,
                           0,
                           name,
                           -1,
                           handle->name,
                           nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    if (GetLastError() == ERROR_PIPE_BUSY) {
      /* Wait for the server to make a pipe instance available. */
      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                             req,
                             WT_EXECUTELONGFUNCTION)) {
        err = GetLastError();
        goto error;
      }

      REGISTER_HANDLE_REQ(loop, handle, req);
      handle->reqs_pending++;

      return;
    }

    err = GetLastError();
    goto error;
  }

  req->u.connect.pipeHandle = pipeHandle;
  req->u.connect.duplex_flags = duplex_flags;
  SET_REQ_SUCCESS(req);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;

error:
  /* Undo what this call set up. */
  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  if (pipeHandle != INVALID_HANDLE_VALUE)
    CloseHandle(pipeHandle);

report_error:
  /* Make this req pending reporting an error. */
  SET_REQ_ERROR(req, err);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;
}
904 
905 
/* Cancels the read that is currently pending on `handle`, if any.
 *
 * Does nothing when no read is pending, when a cancellation was already
 * requested, or when the pipe handle is gone. Overlapped pipes have their
 * read cancelled with CancelIoEx(); for non-overlapped pipes the blocking
 * read running on a thread-pool thread is cancelled through the
 * readfile_thread_handle handshake below. On return the handle is flagged
 * UV_HANDLE_CANCELLATION_PENDING. */
void uv__pipe_interrupt_read(uv_pipe_t* handle) {
  BOOL r;

  if (!(handle->flags & UV_HANDLE_READ_PENDING))
    return; /* No pending reads. */
  if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
    return; /* Already cancelled. */
  if (handle->handle == INVALID_HANDLE_VALUE)
    return; /* Pipe handle closed. */

  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Cancel asynchronous read. ERROR_NOT_FOUND is tolerated: it means there
     * was no matching request left to cancel. */
    r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
    assert(r || GetLastError() == ERROR_NOT_FOUND);
    (void) r;
  } else {
    /* Cancel synchronous read (which is happening in the thread pool). */
    HANDLE thread;
    /* Accessed through a volatile pointer so that every iteration of the spin
     * loop below re-reads the value from memory. */
    volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;

    EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);

    thread = *thread_ptr;
    if (thread == NULL) {
      /* The thread pool thread has not yet reached the point of blocking, we
       * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
      *thread_ptr = INVALID_HANDLE_VALUE;

    } else {
      /* Spin until the thread has acknowledged (by setting the thread to
       * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
      while (thread != INVALID_HANDLE_VALUE) {
        r = CancelSynchronousIo(thread);
        assert(r || GetLastError() == ERROR_NOT_FOUND);
        SwitchToThread(); /* Yield thread. */
        thread = *thread_ptr;
      }
    }

    LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  }

  /* Set flag to indicate that read has been cancelled. */
  handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
}
951 
952 
/* Stops reading from `handle`: clears UV_HANDLE_READING, releases the active
 * count that was taken for reading and cancels the read that may still be
 * pending. */
void uv__pipe_read_stop(uv_pipe_t* handle) {
  /* The handle no longer wants read callbacks. */
  handle->flags &= ~UV_HANDLE_READING;
  DECREASE_ACTIVE_COUNT(handle->loop, handle);

  /* Abort whatever read is still in flight. */
  uv__pipe_interrupt_read(handle);
}
958 
959 
960 /* Cleans up uv_pipe_t (server or connection) and all resources associated with
961  * it. */
uv__pipe_close(uv_loop_t * loop,uv_pipe_t * handle)962 void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
963   int i;
964   HANDLE pipeHandle;
965 
966   if (handle->flags & UV_HANDLE_READING) {
967     handle->flags &= ~UV_HANDLE_READING;
968     DECREASE_ACTIVE_COUNT(loop, handle);
969   }
970 
971   if (handle->flags & UV_HANDLE_LISTENING) {
972     handle->flags &= ~UV_HANDLE_LISTENING;
973     DECREASE_ACTIVE_COUNT(loop, handle);
974   }
975 
976   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
977 
978   uv__handle_closing(handle);
979 
980   uv__pipe_interrupt_read(handle);
981 
982   if (handle->name) {
983     uv__free(handle->name);
984     handle->name = NULL;
985   }
986 
987   if (handle->flags & UV_HANDLE_PIPESERVER) {
988     for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
989       pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
990       if (pipeHandle != INVALID_HANDLE_VALUE) {
991         CloseHandle(pipeHandle);
992         handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
993       }
994     }
995     handle->handle = INVALID_HANDLE_VALUE;
996   }
997 
998   if (handle->flags & UV_HANDLE_CONNECTION) {
999     eof_timer_destroy(handle);
1000   }
1001 
1002   if ((handle->flags & UV_HANDLE_CONNECTION)
1003       && handle->handle != INVALID_HANDLE_VALUE) {
1004     /* This will eventually destroy the write queue for us too. */
1005     close_pipe(handle);
1006   }
1007 
1008   if (handle->reqs_pending == 0)
1009     uv__want_endgame(loop, (uv_handle_t*) handle);
1010 }
1011 
1012 
/* Arms the accept request `req` of the listening server `handle` so that it
 * picks up the next client connection.
 *
 * For every request except the first one (`firstInstance` false) a pipe
 * instance is obtained through pipe_alloc_accept() first. ConnectNamedPipe()
 * is then started in overlapped mode. When the outcome is known right away
 * (a client is already connected, or the call failed) the request is put on
 * the loop's pending queue with the matching status; otherwise completion is
 * reported through the IOCP. In every case `handle->reqs_pending` is
 * incremented exactly once. */
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD err;

  assert(handle->flags & UV_HANDLE_LISTENING);

  if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
    SET_REQ_ERROR(req, GetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  assert(req->pipeHandle != INVALID_HANDLE_VALUE);

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));

  if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped)) {
    /* Read the error code once, before any other API call can replace it. */
    err = GetLastError();

    if (err != ERROR_IO_PENDING) {
      if (err == ERROR_PIPE_CONNECTED) {
        /* A client connected before ConnectNamedPipe() was called. */
        SET_REQ_SUCCESS(req);
      } else {
        CloseHandle(req->pipeHandle);
        req->pipeHandle = INVALID_HANDLE_VALUE;
        /* Make this req pending reporting the ConnectNamedPipe() error, not
         * whatever CloseHandle() may have left in the last-error slot. */
        SET_REQ_ERROR(req, err);
      }
      uv__insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  /* Wait for completion via IOCP */
  handle->reqs_pending++;
}
1047 
1048 
/* Accepts one pending connection from `server` into `client`.
 *
 * For an IPC pipe the oldest queued socket transfer is removed from the xfer
 * queue and imported into `client`, which is treated as a uv_tcp_t. For a
 * regular pipe server `client` is initialized as a pipe connection and takes
 * over the pipe handle of the first pending accept request; that request is
 * re-armed unless the server is closing.
 *
 * Returns 0 on success, WSAEWOULDBLOCK when nothing is waiting to be
 * accepted, or the error returned by uv__tcp_xfer_import(). */
int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_t* pipe_client;
  uv_pipe_accept_t* req;
  QUEUE* q;
  uv__ipc_xfer_queue_item_t* item;
  int err;

  if (server->ipc) {
    if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
      /* No valid pending sockets. */
      return WSAEWOULDBLOCK;
    }

    q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
    QUEUE_REMOVE(q);
    server->pipe.conn.ipc_xfer_queue_length--;
    item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);

    err = uv__tcp_xfer_import(
        (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);

    /* The item has already been unlinked from the queue, so it has to be
     * released here whether or not the import succeeded. */
    uv__free(item);

    if (err != 0)
      return err;

  } else {
    pipe_client = (uv_pipe_t*) client;
    uv__pipe_connection_init(pipe_client);

    /* Find a connection instance that has been connected, but not yet
     * accepted. */
    req = server->pipe.serv.pending_accepts;

    if (!req) {
      /* No valid connections found, so we error out. */
      return WSAEWOULDBLOCK;
    }

    /* Initialize the client handle and copy the pipeHandle to the client */
    pipe_client->handle = req->pipeHandle;
    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

    /* Prepare the req to pick up a new connection */
    server->pipe.serv.pending_accepts = req->next_pending;
    req->next_pending = NULL;
    req->pipeHandle = INVALID_HANDLE_VALUE;

    server->handle = INVALID_HANDLE_VALUE;
    if (!(server->flags & UV_HANDLE_CLOSING)) {
      uv__pipe_queue_accept(loop, server, req, FALSE);
    }
  }

  return 0;
}
1105 
1106 
1107 /* Starts listening for connections for the given pipe. */
uv__pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)1108 int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1109   uv_loop_t* loop = handle->loop;
1110   int i;
1111 
1112   if (handle->flags & UV_HANDLE_LISTENING) {
1113     handle->stream.serv.connection_cb = cb;
1114   }
1115 
1116   if (!(handle->flags & UV_HANDLE_BOUND)) {
1117     return WSAEINVAL;
1118   }
1119 
1120   if (handle->flags & UV_HANDLE_READING) {
1121     return WSAEISCONN;
1122   }
1123 
1124   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1125     return ERROR_NOT_SUPPORTED;
1126   }
1127 
1128   if (handle->ipc) {
1129     return WSAEINVAL;
1130   }
1131 
1132   handle->flags |= UV_HANDLE_LISTENING;
1133   INCREASE_ACTIVE_COUNT(loop, handle);
1134   handle->stream.serv.connection_cb = cb;
1135 
1136   /* First pipe handle should have already been created in uv_pipe_bind */
1137   assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1138 
1139   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1140     uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
1141   }
1142 
1143   return 0;
1144 }
1145 
1146 
uv_pipe_zero_readfile_thread_proc(void * arg)1147 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
1148   uv_read_t* req = (uv_read_t*) arg;
1149   uv_pipe_t* handle = (uv_pipe_t*) req->data;
1150   uv_loop_t* loop = handle->loop;
1151   volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1152   CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
1153   HANDLE thread;
1154   DWORD bytes;
1155   DWORD err;
1156 
1157   assert(req->type == UV_READ);
1158   assert(handle->type == UV_NAMED_PIPE);
1159 
1160   err = 0;
1161 
1162   /* Create a handle to the current thread. */
1163   if (!DuplicateHandle(GetCurrentProcess(),
1164                        GetCurrentThread(),
1165                        GetCurrentProcess(),
1166                        &thread,
1167                        0,
1168                        FALSE,
1169                        DUPLICATE_SAME_ACCESS)) {
1170     err = GetLastError();
1171     goto out1;
1172   }
1173 
1174   /* The lock needs to be held when thread handle is modified. */
1175   EnterCriticalSection(lock);
1176   if (*thread_ptr == INVALID_HANDLE_VALUE) {
1177     /* uv__pipe_interrupt_read() cancelled reading before we got here. */
1178     err = ERROR_OPERATION_ABORTED;
1179   } else {
1180     /* Let main thread know which worker thread is doing the blocking read. */
1181     assert(*thread_ptr == NULL);
1182     *thread_ptr = thread;
1183   }
1184   LeaveCriticalSection(lock);
1185 
1186   if (err)
1187     goto out2;
1188 
1189   /* Block the thread until data is available on the pipe, or the read is
1190    * cancelled. */
1191   if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
1192     err = GetLastError();
1193 
1194   /* Let the main thread know the worker is past the point of blocking. */
1195   assert(thread == *thread_ptr);
1196   *thread_ptr = INVALID_HANDLE_VALUE;
1197 
1198   /* Briefly acquire the mutex. Since the main thread holds the lock while it
1199    * is spinning trying to cancel this thread's I/O, we will block here until
1200    * it stops doing that. */
1201   EnterCriticalSection(lock);
1202   LeaveCriticalSection(lock);
1203 
1204 out2:
1205   /* Close the handle to the current thread. */
1206   CloseHandle(thread);
1207 
1208 out1:
1209   /* Set request status and post a completion record to the IOCP. */
1210   if (err)
1211     SET_REQ_ERROR(req, err);
1212   else
1213     SET_REQ_SUCCESS(req);
1214   POST_COMPLETION_FOR_REQ(loop, req);
1215 
1216   return 0;
1217 }
1218 
1219 
uv_pipe_writefile_thread_proc(void * parameter)1220 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1221   int result;
1222   DWORD bytes;
1223   uv_write_t* req = (uv_write_t*) parameter;
1224   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1225   uv_loop_t* loop = handle->loop;
1226 
1227   assert(req != NULL);
1228   assert(req->type == UV_WRITE);
1229   assert(handle->type == UV_NAMED_PIPE);
1230 
1231   result = WriteFile(handle->handle,
1232                      req->write_buffer.base,
1233                      req->write_buffer.len,
1234                      &bytes,
1235                      NULL);
1236 
1237   if (!result) {
1238     SET_REQ_ERROR(req, GetLastError());
1239   }
1240 
1241   POST_COMPLETION_FOR_REQ(loop, req);
1242   return 0;
1243 }
1244 
1245 
post_completion_read_wait(void * context,BOOLEAN timed_out)1246 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1247   uv_read_t* req;
1248   uv_tcp_t* handle;
1249 
1250   req = (uv_read_t*) context;
1251   assert(req != NULL);
1252   handle = (uv_tcp_t*)req->data;
1253   assert(handle != NULL);
1254   assert(!timed_out);
1255 
1256   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1257                                   req->u.io.overlapped.InternalHigh,
1258                                   0,
1259                                   &req->u.io.overlapped)) {
1260     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1261   }
1262 }
1263 
1264 
post_completion_write_wait(void * context,BOOLEAN timed_out)1265 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1266   uv_write_t* req;
1267   uv_tcp_t* handle;
1268 
1269   req = (uv_write_t*) context;
1270   assert(req != NULL);
1271   handle = (uv_tcp_t*)req->handle;
1272   assert(handle != NULL);
1273   assert(!timed_out);
1274 
1275   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1276                                   req->u.io.overlapped.InternalHigh,
1277                                   0,
1278                                   &req->u.io.overlapped)) {
1279     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1280   }
1281 }
1282 
1283 
/* Starts a zero-byte read on `handle` so that the loop is notified when the
 * pipe becomes readable.
 *
 * Non-overlapped pipes get a thread pool work item that blocks in
 * ReadFile(); all other pipes get an overlapped ReadFile(), plus a registered
 * wait when the IOCP is emulated. If the read cannot be started, the request
 * is queued as pending with the error status instead. Either way the handle
 * ends up with UV_HANDLE_READ_PENDING set and one more pending request. */
static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  uv_read_t* read_req = &handle->read_req;
  BOOL ok;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));
  assert(handle->handle != INVALID_HANDLE_VALUE);

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* NULL means "no worker is blocking yet"; it also clears the marker left
     * behind by a previous read. */
    handle->pipe.conn.readfile_thread_handle = NULL;

    ok = QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           read_req,
                           WT_EXECUTELONGFUNCTION);
    if (!ok) {
      /* Report the failure through the request. */
      SET_REQ_ERROR(read_req, GetLastError());
      goto report_failure;
    }
  } else {
    memset(&read_req->u.io.overlapped, 0, sizeof(read_req->u.io.overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      assert(read_req->event_handle != NULL);
      /* The event handle is stored with its low-order bit set. */
      read_req->u.io.overlapped.hEvent =
          (HANDLE) ((uintptr_t) read_req->event_handle | 1);
    }

    /* Zero-length overlapped read. */
    ok = ReadFile(handle->handle,
                  &uv_zero_,
                  0,
                  NULL,
                  &read_req->u.io.overlapped);
    if (!ok && GetLastError() != ERROR_IO_PENDING) {
      /* Report the failure through the request. */
      SET_REQ_ERROR(read_req, GetLastError());
      goto report_failure;
    }

    /* When emulating the IOCP, register the wait only once per request. */
    if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
        read_req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&read_req->wait_handle,
                                     read_req->event_handle,
                                     post_completion_read_wait,
                                     (void*) read_req,
                                     INFINITE,
                                     WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(read_req, GetLastError());
      goto report_failure;
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

report_failure:
  uv__insert_pending_req(loop, (uv_req_t*) read_req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
1347 
1348 
/* Begins reading from `handle`: marks it as reading, takes an active count,
 * stores the allocation and read callbacks and, unless a read request is
 * still pending from an earlier start, queues a new one. Always returns 0. */
int uv__pipe_read_start(uv_pipe_t* handle,
                        uv_alloc_cb alloc_cb,
                        uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;

  handle->flags |= UV_HANDLE_READING;
  INCREASE_ACTIVE_COUNT(loop, handle);
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;

  /* A stop followed by a start can leave the previous read request in
   * flight; it will serve the new callbacks, so nothing more is needed. */
  if (handle->flags & UV_HANDLE_READ_PENDING)
    return 0;

  /* IOCP emulation needs an event for the read request; create it the first
   * time it is required. */
  if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
      handle->read_req.event_handle == NULL) {
    handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (handle->read_req.event_handle == NULL)
      uv_fatal_error(GetLastError(), "CreateEvent");
  }

  uv__pipe_queue_read(loop, handle);
  return 0;
}
1374 
1375 
/* Appends `req` to the handle's queue of non-overlapped writes. The queue is
 * a circular singly linked list addressed through its tail: the tail's
 * `next_req` is the oldest entry. */
static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
    uv_write_t* req) {
  uv_write_t* tail = handle->pipe.conn.non_overlapped_writes_tail;

  if (tail == NULL) {
    /* First entry: it is its own successor. */
    req->next_req = (uv_req_t*) req;
  } else {
    /* Link in behind the current tail, in front of the oldest entry. */
    req->next_req = tail->next_req;
    tail->next_req = (uv_req_t*) req;
  }

  handle->pipe.conn.non_overlapped_writes_tail = req;
}
1389 
1390 
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1391 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1392   uv_write_t* req;
1393 
1394   if (handle->pipe.conn.non_overlapped_writes_tail) {
1395     req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1396 
1397     if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1398       handle->pipe.conn.non_overlapped_writes_tail = NULL;
1399     } else {
1400       handle->pipe.conn.non_overlapped_writes_tail->next_req =
1401         req->next_req;
1402     }
1403 
1404     return req;
1405   } else {
1406     /* queue empty */
1407     return NULL;
1408   }
1409 }
1410 
1411 
/* Takes the oldest queued non-overlapped write, if there is one, and submits
 * it to the thread pool, where uv_pipe_writefile_thread_proc() performs it.
 * Failure to queue the work item is fatal. */
static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
  uv_write_t* next_write = uv_remove_non_overlapped_write_req(handle);

  if (next_write == NULL)
    return;

  if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                         next_write,
                         WT_EXECUTELONGFUNCTION)) {
    uv_fatal_error(GetLastError(), "QueueUserWorkItem");
  }
}
1422 
1423 
/* Copies `user_req` and the contents of all `nbufs` buffers in `bufs` into
 * one heap allocation, so that the data can be written with a single call
 * and no longer depends on the caller's buffers.
 *
 * On success returns 0, stores a pointer to the heap copy of the request
 * (marked `coalesced`) in `*req_out` and a buffer describing the copied data
 * in `*write_buf_out`. Returns WSAENOBUFS when the combined data exceeds
 * UINT32_MAX bytes and ERROR_NOT_ENOUGH_MEMORY when the allocation is not
 * possible; the out arguments are left untouched in that case. */
static int uv__build_coalesced_write_req(uv_write_t* user_req,
                                         const uv_buf_t bufs[],
                                         size_t nbufs,
                                         uv_write_t** req_out,
                                         uv_buf_t* write_buf_out) {
  /* Pack into a single heap-allocated buffer:
   *   (a) a uv_write_t structure where libuv stores the actual state.
   *   (b) a pointer to the original uv_write_t.
   *   (c) data from all `bufs` entries.
   */
  char* heap_buffer;
  size_t heap_buffer_length, heap_buffer_offset;
  uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
  char* data_start;                           /* (c) */
  uint64_t data_length;
  size_t i;

  /* Compute the combined size of all buffers from `bufs`. The sum is kept in
   * 64 bits and checked after every addition, so it cannot wrap around even
   * where size_t is only 32 bits wide. The total must not exceed UINT32_MAX,
   * because WriteFile() won't accept buffers larger than that. */
  data_length = 0;
  for (i = 0; i < nbufs; i++) {
    data_length += bufs[i].len;
    if (data_length > UINT32_MAX)
      return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
  }

  /* Header plus data must be representable as a size_t. */
  if (data_length > SIZE_MAX - sizeof *coalesced_write_req)
    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */

  /* Compute heap buffer size. */
  heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
                       (size_t) data_length;         /* (c) */

  /* Allocate buffer. */
  heap_buffer = uv__malloc(heap_buffer_length);
  if (heap_buffer == NULL)
    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */

  /* Copy uv_write_t information to the buffer. */
  coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
  coalesced_write_req->req = *user_req; /* copy (a) */
  coalesced_write_req->req.coalesced = 1;
  coalesced_write_req->user_req = user_req;         /* copy (b) */
  heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */

  /* Copy data buffers to the heap buffer. */
  data_start = &heap_buffer[heap_buffer_offset];
  for (i = 0; i < nbufs; i++) {
    /* Skip empty buffers; their `base` is not required to be valid. */
    if (bufs[i].len == 0)
      continue;
    memcpy(&heap_buffer[heap_buffer_offset],
           bufs[i].base,
           bufs[i].len);               /* copy (c) */
    heap_buffer_offset += bufs[i].len; /* offset (c) */
  }
  assert(heap_buffer_offset == heap_buffer_length);

  /* Set out arguments and return. */
  *req_out = &coalesced_write_req->req;
  *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
  return 0;
}
1482 
1483 
/* Releases what uv__pipe_write_data() set up for `req` when the write failed
 * before any I/O was started: the event handle, and the heap copy of the
 * request if the buffers had been coalesced. Must not be used once a write
 * may be in flight. */
static void uv__pipe_write_data_abort(uv_write_t* req) {
  uv__coalesced_write_t* coalesced_write_req;

  if (req->event_handle != NULL) {
    CloseHandle(req->event_handle);
    req->event_handle = NULL;
  }

  if (req->coalesced) {
    coalesced_write_req = CONTAINING_RECORD(req, uv__coalesced_write_t, req);
    /* The caller's request holds a copy of the handle that was just
     * closed. */
    coalesced_write_req->user_req->event_handle = NULL;
    uv__free(coalesced_write_req);
  }
}


/* Initializes `req` as a write request for `handle` and starts writing the
 * `nbufs` buffers in `bufs`.
 *
 * With no buffers an empty buffer is written. A single buffer is written in
 * place unless `copy_always` is set. In every other case the buffers are
 * copied into a heap-allocated request that replaces `req` from then on.
 *
 * How the write is issued depends on the handle flags:
 *   - blocking writes on a non-overlapped pipe: synchronous WriteFile(), then
 *     a completion is posted for the request;
 *   - non-overlapped pipe: the request is queued for the thread pool;
 *   - blocking writes: overlapped WriteFile() followed by a wait on the
 *     request's event;
 *   - otherwise: overlapped WriteFile(), plus a registered wait when the
 *     IOCP is emulated.
 *
 * Returns 0 once the request is registered with the handle, or a system
 * error code. When the write fails before any I/O is in flight, the event
 * handle and the coalesced copy are released again. */
static int uv__pipe_write_data(uv_loop_t* loop,
                               uv_write_t* req,
                               uv_pipe_t* handle,
                               const uv_buf_t bufs[],
                               size_t nbufs,
                               uv_write_cb cb,
                               int copy_always) {
  int err;
  int result;
  uv_buf_t write_buf;

  assert(handle->handle != INVALID_HANDLE_VALUE);

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->send_handle = NULL;
  req->cb = cb;
  /* Private fields. */
  req->coalesced = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;

  /* Prepare the overlapped structure. */
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
  if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    /* The event handle is stored with its low-order bit set. */
    req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
  }
  req->write_buffer = uv_null_buf_;

  if (nbufs == 0) {
    /* Write empty buffer. */
    write_buf = uv_null_buf_;
  } else if (nbufs == 1 && !copy_always) {
    /* Write directly from bufs[0]. */
    write_buf = bufs[0];
  } else {
    /* Coalesce all `bufs` into one big buffer. This also creates a new
     * write-request structure that replaces the old one. */
    err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
    if (err != 0) {
      /* `req` is still the caller's request; release its event handle. */
      uv__pipe_write_data_abort(req);
      return err;
    }
  }

  if ((handle->flags &
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    DWORD bytes;
    result =
        WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);

    if (!result) {
      /* Save the error first: the cleanup calls may change it. */
      err = GetLastError();
      uv__pipe_write_data_abort(req);
      return err;
    } else {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    req->write_buffer = write_buf;
    uv__insert_non_overlapped_write_req(handle, req);
    /* Submit to the thread pool only when no other write is outstanding. */
    if (handle->stream.conn.write_reqs_pending == 0) {
      uv__queue_non_overlapped_write(handle);
    }

    /* Request queued by the kernel. */
    req->u.io.queued_bytes = write_buf.len;
    handle->write_queue_size += req->u.io.queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Using overlapped IO, but wait for completion before returning */
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Nothing is in flight, so everything can be released. */
      err = GetLastError();
      uv__pipe_write_data_abort(req);
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
      if (WaitForSingleObject(req->event_handle, INFINITE) !=
          WAIT_OBJECT_0) {
        /* The write may still be in flight, so only the event is closed;
         * the request memory is deliberately left alone. */
        err = GetLastError();
        CloseHandle(req->event_handle);
        req->event_handle = NULL;
        return err;
      }
    }
    CloseHandle(req->event_handle);
    req->event_handle = NULL;

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    return 0;
  } else {
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Nothing is in flight, so everything can be released. */
      err = GetLastError();
      uv__pipe_write_data_abort(req);
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        /* NOTE(review): the write has already been issued at this point, so
         * nothing is released here; the request is not registered and
         * write_queue_size is not rolled back - confirm this is intended. */
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->stream.conn.write_reqs_pending++;

  return 0;
}
1633 
1634 
uv__pipe_get_ipc_remote_pid(uv_pipe_t * handle)1635 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1636   DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1637 
1638   /* If the both ends of the IPC pipe are owned by the same process,
1639    * the remote end pid may not yet be set. If so, do it here.
1640    * TODO: this is weird; it'd probably better to use a handshake. */
1641   if (*pid == 0)
1642     *pid = GetCurrentProcessId();
1643 
1644   return *pid;
1645 }
1646 
1647 
/* Writes one IPC frame to `handle`: a frame header, optionally the transfer
 * information of the TCP handle `send_handle`, and optionally the data from
 * the `data_buf_count` buffers in `data_bufs`.
 *
 * Returns 0 on success, WSAENOBUFS when the data exceeds UINT32_MAX bytes,
 * ERROR_NOT_SUPPORTED when `send_handle` is not a TCP handle,
 * ERROR_NOT_ENOUGH_MEMORY when the buffer list cannot be allocated, or the
 * error reported by uv__tcp_xfer_export() or uv__pipe_write_data(). */
int uv__pipe_write_ipc(uv_loop_t* loop,
                       uv_write_t* req,
                       uv_pipe_t* handle,
                       const uv_buf_t data_bufs[],
                       size_t data_buf_count,
                       uv_stream_t* send_handle,
                       uv_write_cb cb) {
  uv_buf_t stack_bufs[6];
  uv_buf_t* bufs;
  size_t buf_count, buf_index;
  uv__ipc_frame_header_t frame_header;
  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
  uv__ipc_socket_xfer_info_t xfer_info;
  uint64_t data_length;
  size_t i;
  int err;

  /* Compute the combined size of data buffers. */
  data_length = 0;
  for (i = 0; i < data_buf_count; i++)
    data_length += data_bufs[i].len;
  if (data_length > UINT32_MAX)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Prepare the frame's socket xfer payload. */
  if (send_handle != NULL) {
    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;

    /* Verify that `send_handle` is indeed a tcp handle. */
    if (send_tcp_handle->type != UV_TCP)
      return ERROR_NOT_SUPPORTED;

    /* Export the tcp handle. */
    err = uv__tcp_xfer_export(send_tcp_handle,
                              uv__pipe_get_ipc_remote_pid(handle),
                              &xfer_type,
                              &xfer_info);
    if (err != 0)
      return err;
  }

  /* Compute the largest number of uv_buf_t's that may be required. */
  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
  if (send_handle != NULL)
    buf_count += 1; /* One extra for the socket xfer information. */

  /* Use the on-stack buffer array if it is big enough; otherwise allocate
   * space for it on the heap. */
  if (buf_count < ARRAY_SIZE(stack_bufs)) {
    /* Use on-stack buffer array. */
    bufs = stack_bufs;
  } else {
    /* Use heap-allocated buffer array. */
    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
    if (bufs == NULL)
      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
  }
  buf_index = 0;

  /* Initialize frame header and add it to the buffers list. */
  memset(&frame_header, 0, sizeof frame_header);
  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);

  if (send_handle != NULL) {
    /* Add frame header flags. */
    switch (xfer_type) {
      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
        break;
      case UV__IPC_SOCKET_XFER_TCP_SERVER:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
        break;
      default:
        assert(0);  /* Unreachable. */
    }
    /* Add xfer info buffer. */
    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
  }

  if (data_length > 0) {
    /* Update frame header. */
    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
    frame_header.data_length = (uint32_t) data_length;
    /* Add data buffers to buffers list. */
    for (i = 0; i < data_buf_count; i++)
      bufs[buf_index++] = data_bufs[i];
  }

  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
   * some of the written data lives on the stack. Only the `buf_index` entries
   * filled in above are passed on: when all data buffers are empty they are
   * not added, and the rest of `bufs` has not been initialized. */
  assert(buf_index <= buf_count);
  err = uv__pipe_write_data(loop, req, handle, bufs, buf_index, cb, 1);

  /* If we had to heap-allocate the bufs array, free it now. */
  if (bufs != stack_bufs) {
    uv__free(bufs);
  }

  return err;
}
1748 
1749 
/* Entry point for writing to a pipe. IPC pipes go through the framing
 * protocol, which may also carry `send_handle`; on any other pipe the data is
 * written as is and `send_handle` is expected to be NULL. Returns whatever
 * the selected writer returns. */
int uv__pipe_write(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   size_t nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  if (!handle->ipc) {
    /* Plain pipe: the bytes go on the wire unframed, without forcing a
     * copy. */
    assert(send_handle == NULL);
    return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
  }

  /* IPC pipe: wrap the data, and the handle if any, in a frame. */
  return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
}
1766 
1767 
/* Reports end-of-file on `handle`: discards the eof timer, stops reading and
 * invokes the read callback with UV_EOF and `buf`. `loop` is not used. */
static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  uv_stream_t* stream = (uv_stream_t*) handle;

  /* An eof timer that may still be running has no purpose any more. */
  eof_timer_destroy(handle);

  uv_read_stop(stream);

  handle->read_cb(stream, UV_EOF, &buf);
}
1778 
1779 
uv__pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1780 static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1781     uv_buf_t buf) {
1782   /* If there is an eof timer running, we don't need it any more, so discard
1783    * it. */
1784   eof_timer_destroy(handle);
1785 
1786   uv_read_stop((uv_stream_t*) handle);
1787 
1788   handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1789 }
1790 
1791 
/* Dispatch a failed read: ERROR_BROKEN_PIPE is reported to the user as
 * end-of-file, every other system error code as a read error. */
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
    int error, uv_buf_t buf) {
  if (error != ERROR_BROKEN_PIPE) {
    uv__pipe_read_error(loop, handle, error, buf);
    return;
  }

  uv__pipe_read_eof(loop, handle, buf);
}
1800 
1801 
/* Append a copy of `*xfer_info`, tagged with `xfer_type`, to the tail of the
 * handle's IPC transfer queue and bump the queue length counter.
 *
 * The queue item is heap-allocated; allocation failure is reported through
 * uv_fatal_error() (presumably this does not return -- the result is used
 * unconditionally afterwards). */
static void uv__pipe_queue_ipc_xfer_info(
    uv_pipe_t* handle,
    uv__ipc_socket_xfer_type_t xfer_type,
    uv__ipc_socket_xfer_info_t* xfer_info) {
  uv__ipc_xfer_queue_item_t* entry;

  entry = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*entry));
  if (entry == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  /* The info is stored by value, so the caller's struct need not outlive
   * this call. */
  entry->xfer_info = *xfer_info;
  entry->xfer_type = xfer_type;

  handle->pipe.conn.ipc_xfer_queue_length++;
  QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &entry->member);
}
1818 
1819 
1820 /* Read an exact number of bytes from a pipe. If an error or end-of-file is
1821  * encountered before the requested number of bytes are read, an error is
1822  * returned. */
uv__pipe_read_exactly(HANDLE h,void * buffer,DWORD count)1823 static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1824   DWORD bytes_read, bytes_read_now;
1825 
1826   bytes_read = 0;
1827   while (bytes_read < count) {
1828     if (!ReadFile(h,
1829                   (char*) buffer + bytes_read,
1830                   count - bytes_read,
1831                   &bytes_read_now,
1832                   NULL)) {
1833       return GetLastError();
1834     }
1835 
1836     bytes_read += bytes_read_now;
1837   }
1838 
1839   assert(bytes_read == count);
1840   return 0;
1841 }
1842 
1843 
/* Read one chunk of raw data from the pipe into a user-supplied buffer and
 * deliver it through the read callback.
 *
 * `suggested_bytes` is the size hint passed to the alloc callback;
 * `max_bytes` is an upper bound on how much may be read, further clamped to
 * the length of the buffer the user hands back.
 *
 * Returns the number of bytes read. A return value of 0 tells the caller to
 * leave its read loop; in the failure cases the user has already been
 * notified (UV_ENOBUFS, EOF or a translated error). */
static DWORD uv__pipe_read_data(uv_loop_t* loop,
                                uv_pipe_t* handle,
                                DWORD suggested_bytes,
                                DWORD max_bytes) {
  uv_stream_t* stream = (uv_stream_t*) handle;
  uv_buf_t buf = uv_buf_init(NULL, 0);
  DWORD limit;
  DWORD nread;

  /* Let the user provide storage for the incoming data. */
  handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
  if (buf.base == NULL || buf.len == 0) {
    handle->read_cb(stream, UV_ENOBUFS, &buf);
    return 0;
  }

  /* Never read more than fits in the user's buffer, nor more than the caller
   * allows. */
  limit = max_bytes > buf.len ? buf.len : max_bytes;

  if (!ReadFile(handle->handle, buf.base, limit, &nread, NULL)) {
    uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
    return 0;
  }

  handle->read_cb(stream, nread, &buf);
  return nread;
}
1877 
1878 
uv__pipe_read_ipc(uv_loop_t * loop,uv_pipe_t * handle)1879 static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
1880   uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
1881   int err;
1882 
1883   if (*data_remaining > 0) {
1884     /* Read frame data payload. */
1885     DWORD bytes_read =
1886         uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
1887     *data_remaining -= bytes_read;
1888     return bytes_read;
1889 
1890   } else {
1891     /* Start of a new IPC frame. */
1892     uv__ipc_frame_header_t frame_header;
1893     uint32_t xfer_flags;
1894     uv__ipc_socket_xfer_type_t xfer_type;
1895     uv__ipc_socket_xfer_info_t xfer_info;
1896 
1897     /* Read the IPC frame header. */
1898     err = uv__pipe_read_exactly(
1899         handle->handle, &frame_header, sizeof frame_header);
1900     if (err)
1901       goto error;
1902 
1903     /* Validate that flags are valid. */
1904     if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
1905       goto invalid;
1906     /* Validate that reserved2 is zero. */
1907     if (frame_header.reserved2 != 0)
1908       goto invalid;
1909 
1910     /* Parse xfer flags. */
1911     xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
1912     if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
1913       /* Socket coming -- determine the type. */
1914       xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
1915                       ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
1916                       : UV__IPC_SOCKET_XFER_TCP_SERVER;
1917     } else if (xfer_flags == 0) {
1918       /* No socket. */
1919       xfer_type = UV__IPC_SOCKET_XFER_NONE;
1920     } else {
1921       /* Invalid flags. */
1922       goto invalid;
1923     }
1924 
1925     /* Parse data frame information. */
1926     if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
1927       *data_remaining = frame_header.data_length;
1928     } else if (frame_header.data_length != 0) {
1929       /* Data length greater than zero but data flag not set -- invalid. */
1930       goto invalid;
1931     }
1932 
1933     /* If no socket xfer info follows, return here. Data will be read in a
1934      * subsequent invocation of uv__pipe_read_ipc(). */
1935     if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
1936       return sizeof frame_header; /* Number of bytes read. */
1937 
1938     /* Read transferred socket information. */
1939     err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
1940     if (err)
1941       goto error;
1942 
1943     /* Store the pending socket info. */
1944     uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
1945 
1946     /* Return number of bytes read. */
1947     return sizeof frame_header + sizeof xfer_info;
1948   }
1949 
1950 invalid:
1951   /* Invalid frame. */
1952   err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
1953 
1954 error:
1955   uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1956   return 0; /* Break out of read loop. */
1957 }
1958 
1959 
/* Handle completion of the pending zero-read request on a pipe.
 *
 * Clears the read-pending and cancellation-pending flags, drops the pending
 * request count and stops the eof timer. If the user is still reading, a
 * failed request is reported (unless it was merely aborted), while a
 * successful one triggers reading of the data that PeekNamedPipe() reports
 * as available. Finally a new zero-read is queued if the user is still
 * reading and none is pending. */
void uv__process_pipe_read_req(uv_loop_t* loop,
                               uv_pipe_t* handle,
                               uv_req_t* req) {
  DWORD avail;
  DWORD nread;
  DWORD err;

  assert(handle->type == UV_NAMED_PIPE);

  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
  DECREASE_PENDING_REQ_COUNT(handle);
  eof_timer_stop(handle);

  /* Bookkeeping is done. If the user stopped reading in the meantime there
   * is no callback to deliver anything to, so we are finished. */
  if (!(handle->flags & UV_HANDLE_READING))
    return;

  if (REQ_SUCCESS(req)) {
    /* A successful zero-read means the kernel buffer holds data. Find out
     * how much. */
    avail = 0;
    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
      uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);

    /* Keep reading until everything reported as available is consumed or
     * the user stops reading. */
    while (avail > 0 && (handle->flags & UV_HANDLE_READING)) {
      /* IPC pipes are read frame by frame, other pipes as raw data. */
      if (handle->ipc)
        nread = uv__pipe_read_ipc(loop, handle);
      else
        nread = uv__pipe_read_data(loop, handle, avail, (DWORD) -1);

      /* Zero bytes signals that an error was reported; reading more than
       * `avail` would make the subtraction below underflow. Stop in both
       * cases. */
      if (nread == 0 || nread > avail)
        break;

      avail -= nread;
    }

  } else {
    /* The zero-read itself failed. An ERROR_OPERATION_ABORTED result (which
     * a read cancelled by uv__pipe_interrupt_read() may produce) is of no
     * interest to the user; a fresh zero-read is queued below. */
    err = GET_REQ_ERROR(req);
    if (err != ERROR_OPERATION_ABORTED)
      uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
  }

  /* Queue the next zero-read if one is needed. */
  if ((handle->flags & UV_HANDLE_READING) &&
      !(handle->flags & UV_HANDLE_READ_PENDING)) {
    uv__pipe_queue_read(loop, handle);
  }
}
2025 
2026 
/* Handle completion of a pipe write request.
 *
 * Updates the write queue size, unregisters the request, releases the wait
 * and event handles used when IOCP is emulated, and invokes the user's write
 * callback with the translated request error. Afterwards the next queued
 * non-overlapped write is started if there is one, and a pending shutdown is
 * carried out once no writes remain. */
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  uv__coalesced_write_t* wrapper;
  int sys_err;

  assert(handle->type == UV_NAMED_PIPE);

  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle != NULL) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  /* Grab the error now: for a coalesced write, `req` is about to be freed. */
  sys_err = GET_REQ_ERROR(req);

  /* A coalesced write uses a heap-allocated request that wraps the user's
   * uv_write_t. Switch to the user's request, so the callback receives the
   * pointer it expects, and free the wrapper. */
  if (req->coalesced) {
    wrapper = container_of(req, uv__coalesced_write_t, req);
    req = wrapper->user_req;
    uv__free(wrapper);
  }

  if (req->cb != NULL)
    req->cb(req, uv_translate_sys_error(sys_err));

  handle->stream.conn.write_reqs_pending--;

  /* Start the next queued non-overlapped write, if any. */
  if ((handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) &&
      handle->pipe.conn.non_overlapped_writes_tail) {
    assert(handle->stream.conn.write_reqs_pending > 0);
    uv__queue_non_overlapped_write(handle);
  }

  /* Once the last write is done, proceed with a requested shutdown. */
  if (handle->stream.conn.write_reqs_pending == 0 &&
      (handle->flags & UV_HANDLE_SHUTTING)) {
    uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2078 
2079 
/* Handle completion of an accept request on a pipe server.
 *
 * On success the request is pushed onto the server's list of pending accepts
 * and the connection callback (if set) is invoked with status 0. On failure
 * the request's pipe handle is closed and the accept is queued again. If the
 * server is closing, only the pending request count is updated. */
void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* raw_req) {
  uv_pipe_accept_t* accept_req = (uv_pipe_accept_t*) raw_req;

  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV_HANDLE_CLOSING) {
    /* The req->pipeHandle should be freed already in uv__pipe_close(). */
    assert(accept_req->pipeHandle == INVALID_HANDLE_VALUE);
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (!REQ_SUCCESS(accept_req)) {
    /* The accept failed: release the pipe instance and try again. */
    if (accept_req->pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(accept_req->pipeHandle);
      accept_req->pipeHandle = INVALID_HANDLE_VALUE;
    }
    if (!(handle->flags & UV_HANDLE_CLOSING))
      uv__pipe_queue_accept(loop, handle, accept_req, FALSE);
  } else {
    /* A client connected: make the request the new head of the pending
     * accepts list and tell the user. */
    assert(accept_req->pipeHandle != INVALID_HANDLE_VALUE);
    accept_req->next_pending = handle->pipe.serv.pending_accepts;
    handle->pipe.serv.pending_accepts = accept_req;

    if (handle->stream.serv.connection_cb != NULL)
      handle->stream.serv.connection_cb((uv_stream_t*) handle, 0);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2113 
2114 
/* Handle completion of a pipe connect request.
 *
 * On success the pipe handle carried by the request is attached to `handle`
 * via uv__set_pipe_handle(); if that fails the pipe handle is closed. On
 * failure the request's error is translated. Either way the connect callback
 * (if set) receives the resulting status. */
void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_connect_t* req) {
  HANDLE connected_pipe;
  int status = 0;

  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (!REQ_SUCCESS(req)) {
    status = uv_translate_sys_error(GET_REQ_ERROR(req));
  } else {
    connected_pipe = req->u.connect.pipeHandle;
    status = uv__set_pipe_handle(loop,
                                 handle,
                                 connected_pipe,
                                 -1,
                                 req->u.connect.duplex_flags);
    /* The handle could not be attached, so nothing else will close it. */
    if (status)
      CloseHandle(connected_pipe);
  }

  if (req->cb != NULL)
    req->cb(req, status);

  DECREASE_PENDING_REQ_COUNT(handle);
}
2141 
2142 
2143 
/* Handle completion of a pipe shutdown request.
 *
 * Clears the handle's shutdown state and unregisters the request, then
 * determines the status to report: UV_ECANCELED when the handle is closing,
 * the translated request error when the request failed, and 0 otherwise. In
 * the successful case a readable pipe gets an eof timer, while a
 * non-readable pipe is closed right away. The shutdown callback (if set) is
 * invoked with the status. */
void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_shutdown_t* req) {
  int status = 0;

  assert(handle->type == UV_NAMED_PIPE);

  /* Reset the shutdown state so this path is not taken a second time. */
  handle->stream.conn.shutdown_req = NULL;
  handle->flags &= ~UV_HANDLE_SHUTTING;
  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_CLOSING) {
    /* The handle is already being closed; the shutdown is cancelled. */
    status = UV_ECANCELED;
  } else if (!REQ_SUCCESS(req)) {
    /* The graceful shutdown attempt failed. */
    status = uv_translate_sys_error(GET_REQ_ERROR(req));
  } else if (!(handle->flags & UV_HANDLE_READABLE)) {
    /* Nothing can be read from this pipe, so closing it is enough to let the
     * other end know that we are done writing. */
    close_pipe(handle);
  } else {
    /* Readable pipe: set up the eof timer. Start it immediately when a read
     * is pending; otherwise uv__pipe_queue_read will start it. */
    eof_timer_init(handle);
    if (handle->flags & UV_HANDLE_READ_PENDING)
      eof_timer_start(handle);
  }

  if (req->cb != NULL)
    req->cb(req, status);

  DECREASE_PENDING_REQ_COUNT(handle);
}
2186 
2187 
/* Allocate and initialize the eof timer of a pipe connection.
 *
 * The timer is heap-allocated, initialized on the pipe's loop, given a back
 * pointer to the pipe in its `data` field, and unref'd. It is not started
 * here. The pipe must be a connection and must not already have an eof timer.
 *
 * Allocation failure is reported through uv_fatal_error(), the same way
 * uv__pipe_queue_ipc_xfer_info() handles it, instead of handing a NULL
 * pointer to uv_timer_init(). */
static void eof_timer_init(uv_pipe_t* pipe) {
  uv_timer_t* timer;
  int r;

  assert(pipe->pipe.conn.eof_timer == NULL);
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = (uv_timer_t*) uv__malloc(sizeof *timer);
  if (timer == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  pipe->pipe.conn.eof_timer = timer;

  r = uv_timer_init(pipe->loop, timer);
  assert(r == 0);  /* timers can't fail */
  (void) r;  /* Silences the unused-variable warning when asserts are off. */

  /* eof_timer_cb() finds its pipe through this pointer. */
  timer->data = pipe;
  uv_unref((uv_handle_t*) timer);
}
2202 
2203 
/* Start the pipe's eof timer as a one-shot timer of `eof_timeout`
 * milliseconds that fires eof_timer_cb(). Does nothing when the pipe has no
 * eof timer. */
static void eof_timer_start(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_start(timer, eof_timer_cb, eof_timeout, 0);
}
2211 
2212 
/* Stop the pipe's eof timer. Does nothing when the pipe has no eof timer. */
static void eof_timer_stop(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_timer_stop(timer);
}
2220 
2221 
/* Timer callback of the eof timer. Unless the pending read has already
 * completed (and is merely waiting to be processed), the pipe is closed,
 * reading is stopped and end-of-file is reported to the user. */
static void eof_timer_cb(uv_timer_t* timer) {
  uv_pipe_t* handle = (uv_pipe_t*) timer->data;
  uv_loop_t* timer_loop = timer->loop;

  assert(handle->type == UV_NAMED_PIPE);

  /* The timer is only started in uv__pipe_queue_read after a successful
   * ReadFile call, or in uv__process_pipe_shutdown_req when a read is
   * pending, and uv__process_pipe_read_req stops it right away. Hence a read
   * must be pending here. */
  assert(handle->flags & UV_HANDLE_READ_PENDING);

  /* With many packets coming off the iocp, this callback may run before the
   * completed read request has been dequeued. In that case leave everything
   * alone; the read request will be processed later. */
  if ((handle->flags & UV_HANDLE_READ_PENDING) &&
      HasOverlappedIoCompleted(&handle->read_req.u.io.overlapped)) {
    return;
  }

  /* Force both ends off the pipe. */
  close_pipe(handle);

  /* Stop reading first, so that the pending read -- which is going to fail --
   * is not reported to the user. */
  uv_read_stop((uv_stream_t*) handle);

  /* Report the eof and update flags. This gets reported even if the user
   * stopped reading in the meantime. TODO: is that okay? */
  uv__pipe_read_eof(timer_loop, handle, uv_null_buf_);
}
2254 
2255 
/* Close the pipe's eof timer, if it has one, and clear the pipe's reference
 * to it. The timer's memory is released by eof_timer_close_cb(). */
static void eof_timer_destroy(uv_pipe_t* pipe) {
  uv_timer_t* timer;

  assert(pipe->flags & UV_HANDLE_CONNECTION);

  timer = pipe->pipe.conn.eof_timer;
  if (timer == NULL)
    return;

  uv_close((uv_handle_t*) timer, eof_timer_close_cb);
  pipe->pipe.conn.eof_timer = NULL;
}
2264 
2265 
/* Close callback of the eof timer: frees the timer's memory, which
 * eof_timer_init() obtained from uv__malloc(). */
static void eof_timer_close_cb(uv_handle_t* handle) {
  assert(handle->type == UV_TIMER);

  uv__free(handle);
}
2270 
2271 
/* Attach the OS handle behind file descriptor `file` to `pipe`.
 *
 * Returns 0 on success, or:
 *   UV_EBADF   if `file` has no valid OS handle,
 *   UV_EINVAL  if `pipe` is a pipe server, the handle's access flags cannot
 *              be queried, or `pipe` is an IPC pipe and the handle is not
 *              both readable and writable,
 *   UV_EBUSY   if `pipe` is already a connection,
 *   or the error from DuplicateHandle() / uv__set_pipe_handle().
 *
 * For file descriptors <= 2 the OS handle is duplicated and the duplicate is
 * used instead. From that point on `file` is -1, which marks the handle as
 * owned by this function until uv__set_pipe_handle() succeeds; every failure
 * path after the duplication closes it so that it does not leak. */
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  HANDLE os_handle = uv__get_osfhandle(file);
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_ACCESS_INFORMATION access;
  DWORD duplex_flags = 0;
  int err;

  if (os_handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;
  if (pipe->flags & UV_HANDLE_PIPESERVER)
    return UV_EINVAL;
  if (pipe->flags & UV_HANDLE_CONNECTION)
    return UV_EBUSY;

  uv__pipe_connection_init(pipe);
  uv__once_init();
  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
   * underlying OS handle and forget about the original fd.
   * We could also opt to use the original OS handle and just never close it,
   * but then there would be no reliable way to cancel pending read operations
   * upon close.
   */
  if (file <= 2) {
    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
                         os_handle,
                         INVALID_HANDLE_VALUE,
                         &os_handle,
                         0,
                         FALSE,
                         DUPLICATE_SAME_ACCESS))
      return uv_translate_sys_error(GetLastError());
    assert(os_handle != INVALID_HANDLE_VALUE);
    file = -1;
  }

  /* Determine what kind of permissions we have on this handle.
   * Cygwin opens the pipe in message mode, but we can support it,
   * just query the access flags and set the stream flags accordingly.
   */
  nt_status = pNtQueryInformationFile(os_handle,
                                      &io_status,
                                      &access,
                                      sizeof(access),
                                      FileAccessInformation);
  if (nt_status != STATUS_SUCCESS) {
    err = UV_EINVAL;
    goto error;
  }

  /* An IPC pipe has to be usable in both directions. */
  if (pipe->ipc) {
    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
        !(access.AccessFlags & FILE_READ_DATA)) {
      err = UV_EINVAL;
      goto error;
    }
  }

  if (access.AccessFlags & FILE_WRITE_DATA)
    duplex_flags |= UV_HANDLE_WRITABLE;
  if (access.AccessFlags & FILE_READ_DATA)
    duplex_flags |= UV_HANDLE_READABLE;

  err = uv__set_pipe_handle(pipe->loop,
                            pipe,
                            os_handle,
                            file,
                            duplex_flags);
  if (err)
    goto error;

  if (pipe->ipc) {
    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
    assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
  }
  return 0;

error:
  /* Only a handle duplicated above is ours to close; a handle that still
   * belongs to the caller's file descriptor is left alone. */
  if (file == -1)
    CloseHandle(os_handle);
  return err;
}
2350 
2351 
uv__pipe_getname(const uv_pipe_t * handle,char * buffer,size_t * size)2352 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2353   NTSTATUS nt_status;
2354   IO_STATUS_BLOCK io_status;
2355   FILE_NAME_INFORMATION tmp_name_info;
2356   FILE_NAME_INFORMATION* name_info;
2357   WCHAR* name_buf;
2358   unsigned int addrlen;
2359   unsigned int name_size;
2360   unsigned int name_len;
2361   int err;
2362 
2363   uv__once_init();
2364   name_info = NULL;
2365 
2366   if (handle->name != NULL) {
2367     /* The user might try to query the name before we are connected,
2368      * and this is just easier to return the cached value if we have it. */
2369     name_buf = handle->name;
2370     name_len = wcslen(name_buf);
2371 
2372     /* check how much space we need */
2373     addrlen = WideCharToMultiByte(CP_UTF8,
2374                                   0,
2375                                   name_buf,
2376                                   name_len,
2377                                   NULL,
2378                                   0,
2379                                   NULL,
2380                                   NULL);
2381     if (!addrlen) {
2382       *size = 0;
2383       err = uv_translate_sys_error(GetLastError());
2384       return err;
2385     } else if (addrlen >= *size) {
2386       *size = addrlen + 1;
2387       err = UV_ENOBUFS;
2388       goto error;
2389     }
2390 
2391     addrlen = WideCharToMultiByte(CP_UTF8,
2392                                   0,
2393                                   name_buf,
2394                                   name_len,
2395                                   buffer,
2396                                   addrlen,
2397                                   NULL,
2398                                   NULL);
2399     if (!addrlen) {
2400       *size = 0;
2401       err = uv_translate_sys_error(GetLastError());
2402       return err;
2403     }
2404 
2405     *size = addrlen;
2406     buffer[addrlen] = '\0';
2407 
2408     return 0;
2409   }
2410 
2411   if (handle->handle == INVALID_HANDLE_VALUE) {
2412     *size = 0;
2413     return UV_EINVAL;
2414   }
2415 
2416   /* NtQueryInformationFile will block if another thread is performing a
2417    * blocking operation on the queried handle. If the pipe handle is
2418    * synchronous, there may be a worker thread currently calling ReadFile() on
2419    * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2420    * the read. */
2421   if (handle->flags & UV_HANDLE_CONNECTION &&
2422       handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2423     uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
2424   }
2425 
2426   nt_status = pNtQueryInformationFile(handle->handle,
2427                                       &io_status,
2428                                       &tmp_name_info,
2429                                       sizeof tmp_name_info,
2430                                       FileNameInformation);
2431   if (nt_status == STATUS_BUFFER_OVERFLOW) {
2432     name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2433     name_info = uv__malloc(name_size);
2434     if (!name_info) {
2435       *size = 0;
2436       err = UV_ENOMEM;
2437       goto cleanup;
2438     }
2439 
2440     nt_status = pNtQueryInformationFile(handle->handle,
2441                                         &io_status,
2442                                         name_info,
2443                                         name_size,
2444                                         FileNameInformation);
2445   }
2446 
2447   if (nt_status != STATUS_SUCCESS) {
2448     *size = 0;
2449     err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2450     goto error;
2451   }
2452 
2453   if (!name_info) {
2454     /* the struct on stack was used */
2455     name_buf = tmp_name_info.FileName;
2456     name_len = tmp_name_info.FileNameLength;
2457   } else {
2458     name_buf = name_info->FileName;
2459     name_len = name_info->FileNameLength;
2460   }
2461 
2462   if (name_len == 0) {
2463     *size = 0;
2464     err = 0;
2465     goto error;
2466   }
2467 
2468   name_len /= sizeof(WCHAR);
2469 
2470   /* check how much space we need */
2471   addrlen = WideCharToMultiByte(CP_UTF8,
2472                                 0,
2473                                 name_buf,
2474                                 name_len,
2475                                 NULL,
2476                                 0,
2477                                 NULL,
2478                                 NULL);
2479   if (!addrlen) {
2480     *size = 0;
2481     err = uv_translate_sys_error(GetLastError());
2482     goto error;
2483   } else if (pipe_prefix_len + addrlen >= *size) {
2484     /* "\\\\.\\pipe" + name */
2485     *size = pipe_prefix_len + addrlen + 1;
2486     err = UV_ENOBUFS;
2487     goto error;
2488   }
2489 
2490   memcpy(buffer, pipe_prefix, pipe_prefix_len);
2491   addrlen = WideCharToMultiByte(CP_UTF8,
2492                                 0,
2493                                 name_buf,
2494                                 name_len,
2495                                 buffer+pipe_prefix_len,
2496                                 *size-pipe_prefix_len,
2497                                 NULL,
2498                                 NULL);
2499   if (!addrlen) {
2500     *size = 0;
2501     err = uv_translate_sys_error(GetLastError());
2502     goto error;
2503   }
2504 
2505   addrlen += pipe_prefix_len;
2506   *size = addrlen;
2507   buffer[addrlen] = '\0';
2508 
2509   err = 0;
2510 
2511 error:
2512   uv__free(name_info);
2513 
2514 cleanup:
2515   return err;
2516 }
2517 
2518 
/* Return the length of the handle's IPC transfer queue. Always 0 for a pipe
 * that is not an IPC pipe. */
int uv_pipe_pending_count(uv_pipe_t* handle) {
  if (handle->ipc)
    return handle->pipe.conn.ipc_xfer_queue_length;

  return 0;
}
2524 
2525 
uv_pipe_getsockname(const uv_pipe_t * handle,char * buffer,size_t * size)2526 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2527   if (handle->flags & UV_HANDLE_BOUND)
2528     return uv__pipe_getname(handle, buffer, size);
2529 
2530   if (handle->flags & UV_HANDLE_CONNECTION ||
2531       handle->handle != INVALID_HANDLE_VALUE) {
2532     *size = 0;
2533     return 0;
2534   }
2535 
2536   return UV_EBADF;
2537 }
2538 
2539 
uv_pipe_getpeername(const uv_pipe_t * handle,char * buffer,size_t * size)2540 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2541   /* emulate unix behaviour */
2542   if (handle->flags & UV_HANDLE_BOUND)
2543     return UV_ENOTCONN;
2544 
2545   if (handle->handle != INVALID_HANDLE_VALUE)
2546     return uv__pipe_getname(handle, buffer, size);
2547 
2548   if (handle->flags & UV_HANDLE_CONNECTION) {
2549     if (handle->name != NULL)
2550       return uv__pipe_getname(handle, buffer, size);
2551   }
2552 
2553   return UV_EBADF;
2554 }
2555 
2556 
uv_pipe_pending_type(uv_pipe_t * handle)2557 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2558   if (!handle->ipc)
2559     return UV_UNKNOWN_HANDLE;
2560   if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2561     return UV_UNKNOWN_HANDLE;
2562   else
2563     return UV_TCP;
2564 }
2565 
/* Grant the "Everyone" group read and/or write access to the pipe.
 *
 * `mode` must be UV_READABLE, UV_WRITABLE or both. An access entry for the
 * world SID is merged into the pipe's current DACL and the result is applied
 * to the pipe handle.
 *
 * Returns 0 on success, UV_EBADF if `handle` is NULL or has no valid OS
 * handle, UV_EINVAL for an unsupported `mode`, or the translated Win32 error
 * of the call that failed.
 *
 * GetSecurityInfo(), SetEntriesInAcl() and SetSecurityInfo() return their
 * error code directly (ERROR_SUCCESS on success), so that return value is
 * what gets reported; GetLastError() is only consulted for
 * AllocateAndInitializeSid(), which returns a BOOL. */
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
  SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
  PACL old_dacl, new_dacl;
  PSECURITY_DESCRIPTOR sd;
  EXPLICIT_ACCESS ea;
  PSID everyone;
  int error;

  if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  if (mode != UV_READABLE &&
      mode != UV_WRITABLE &&
      mode != (UV_WRITABLE | UV_READABLE))
    return UV_EINVAL;

  if (!AllocateAndInitializeSid(&sid_world,
                                1,
                                SECURITY_WORLD_RID,
                                0, 0, 0, 0, 0, 0, 0,
                                &everyone)) {
    error = GetLastError();
    goto done;
  }

  /* Fetch the current DACL. `old_dacl` points into `sd`, which is released
   * with LocalFree() below. */
  error = GetSecurityInfo(handle->handle,
                          SE_KERNEL_OBJECT,
                          DACL_SECURITY_INFORMATION,
                          NULL,
                          NULL,
                          &old_dacl,
                          NULL,
                          &sd);
  if (error != ERROR_SUCCESS)
    goto clean_sid;

  /* Describe the access to grant to Everyone. */
  memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
  if (mode & UV_READABLE)
    ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
  if (mode & UV_WRITABLE)
    ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
  ea.grfAccessPermissions |= SYNCHRONIZE;
  ea.grfAccessMode = SET_ACCESS;
  ea.grfInheritance = NO_INHERITANCE;
  ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
  ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
  ea.Trustee.ptstrName = (LPTSTR)everyone;

  /* Merge the new entry into the existing DACL. */
  error = SetEntriesInAcl(1, &ea, old_dacl, &new_dacl);
  if (error != ERROR_SUCCESS)
    goto clean_sd;

  /* Apply the merged DACL to the pipe. `error` is ERROR_SUCCESS (0) on
   * success; either way the cleanup chain below is run. */
  error = SetSecurityInfo(handle->handle,
                          SE_KERNEL_OBJECT,
                          DACL_SECURITY_INFORMATION,
                          NULL,
                          NULL,
                          new_dacl,
                          NULL);

  LocalFree((HLOCAL) new_dacl);
clean_sd:
  LocalFree((HLOCAL) sd);
clean_sid:
  FreeSid(everyone);
done:
  return uv_translate_sys_error(error);
}
2642