1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include <assert.h> 23 #include <io.h> 24 #include <stdio.h> 25 #include <stdlib.h> 26 #include <string.h> 27 28 #include "handle-inl.h" 29 #include "internal.h" 30 #include "req-inl.h" 31 #include "stream-inl.h" 32 #include "uv-common.h" 33 #include "uv.h" 34 35 #include <aclapi.h> 36 #include <accctrl.h> 37 38 /* A zero-size buffer for use by uv_pipe_read */ 39 static char uv_zero_[] = ""; 40 41 /* Null uv_buf_t */ 42 static const uv_buf_t uv_null_buf_ = { 0, NULL }; 43 44 /* The timeout that the pipe will wait for the remote end to write data when 45 * the local ends wants to shut it down. */ 46 static const int64_t eof_timeout = 50; /* ms */ 47 48 static const int default_pending_pipe_instances = 4; 49 50 /* Pipe prefix */ 51 static char pipe_prefix[] = "\\\\?\\pipe"; 52 static const int pipe_prefix_len = sizeof(pipe_prefix) - 1; 53 54 /* IPC incoming xfer queue item. */ 55 typedef struct { 56 uv__ipc_socket_xfer_type_t xfer_type; 57 uv__ipc_socket_xfer_info_t xfer_info; 58 QUEUE member; 59 } uv__ipc_xfer_queue_item_t; 60 61 /* IPC frame header flags. */ 62 /* clang-format off */ 63 enum { 64 UV__IPC_FRAME_HAS_DATA = 0x01, 65 UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02, 66 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04, 67 /* These are combinations of the flags above. */ 68 UV__IPC_FRAME_XFER_FLAGS = 0x06, 69 UV__IPC_FRAME_VALID_FLAGS = 0x07 70 }; 71 /* clang-format on */ 72 73 /* IPC frame header. */ 74 typedef struct { 75 uint32_t flags; 76 uint32_t reserved1; /* Ignored. */ 77 uint32_t data_length; /* Must be zero if there is no data. */ 78 uint32_t reserved2; /* Must be zero. */ 79 } uv__ipc_frame_header_t; 80 81 /* To implement the IPC protocol correctly, these structures must have exactly 82 * the right size. */ 83 STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16); 84 STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632); 85 86 /* Coalesced write request. */ 87 typedef struct { 88 uv_write_t req; /* Internal heap-allocated write request. */ 89 uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */ 90 } uv__coalesced_write_t; 91 92 93 static void eof_timer_init(uv_pipe_t* pipe); 94 static void eof_timer_start(uv_pipe_t* pipe); 95 static void eof_timer_stop(uv_pipe_t* pipe); 96 static void eof_timer_cb(uv_timer_t* timer); 97 static void eof_timer_destroy(uv_pipe_t* pipe); 98 static void eof_timer_close_cb(uv_handle_t* handle); 99 100 101 static void uv_unique_pipe_name(char* ptr, char* name, size_t size) { 102 snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId()); 103 } 104 105 106 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { 107 uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); 108 109 handle->reqs_pending = 0; 110 handle->handle = INVALID_HANDLE_VALUE; 111 handle->name = NULL; 112 handle->pipe.conn.ipc_remote_pid = 0; 113 handle->pipe.conn.ipc_data_frame.payload_remaining = 0; 114 QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue); 115 handle->pipe.conn.ipc_xfer_queue_length = 0; 116 handle->ipc = ipc; 117 handle->pipe.conn.non_overlapped_writes_tail = NULL; 118 119 return 0; 120 } 121 122 123 static void uv_pipe_connection_init(uv_pipe_t* handle) { 124 uv_connection_init((uv_stream_t*) handle); 125 handle->read_req.data = handle; 126 handle->pipe.conn.eof_timer = NULL; 127 assert(!(handle->flags & UV_HANDLE_PIPESERVER)); 128 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 129 handle->pipe.conn.readfile_thread_handle = NULL; 130 InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock); 131 } 132 } 133 134 135 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) { 136 HANDLE pipeHandle; 137 138 /* 139 * Assume that we have a duplex pipe first, so attempt to 140 * connect with GENERIC_READ | GENERIC_WRITE. 141 */ 142 pipeHandle = CreateFileW(name, 143 GENERIC_READ | GENERIC_WRITE, 144 0, 145 NULL, 146 OPEN_EXISTING, 147 FILE_FLAG_OVERLAPPED, 148 NULL); 149 if (pipeHandle != INVALID_HANDLE_VALUE) { 150 *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; 151 return pipeHandle; 152 } 153 154 /* 155 * If the pipe is not duplex CreateFileW fails with 156 * ERROR_ACCESS_DENIED. In that case try to connect 157 * as a read-only or write-only. 158 */ 159 if (GetLastError() == ERROR_ACCESS_DENIED) { 160 pipeHandle = CreateFileW(name, 161 GENERIC_READ | FILE_WRITE_ATTRIBUTES, 162 0, 163 NULL, 164 OPEN_EXISTING, 165 FILE_FLAG_OVERLAPPED, 166 NULL); 167 168 if (pipeHandle != INVALID_HANDLE_VALUE) { 169 *duplex_flags = UV_HANDLE_READABLE; 170 return pipeHandle; 171 } 172 } 173 174 if (GetLastError() == ERROR_ACCESS_DENIED) { 175 pipeHandle = CreateFileW(name, 176 GENERIC_WRITE | FILE_READ_ATTRIBUTES, 177 0, 178 NULL, 179 OPEN_EXISTING, 180 FILE_FLAG_OVERLAPPED, 181 NULL); 182 183 if (pipeHandle != INVALID_HANDLE_VALUE) { 184 *duplex_flags = UV_HANDLE_WRITABLE; 185 return pipeHandle; 186 } 187 } 188 189 return INVALID_HANDLE_VALUE; 190 } 191 192 193 static void close_pipe(uv_pipe_t* pipe) { 194 assert(pipe->u.fd == -1 || pipe->u.fd > 2); 195 if (pipe->u.fd == -1) 196 CloseHandle(pipe->handle); 197 else 198 close(pipe->u.fd); 199 200 pipe->u.fd = -1; 201 pipe->handle = INVALID_HANDLE_VALUE; 202 } 203 204 205 int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, 206 char* name, size_t nameSize) { 207 HANDLE pipeHandle; 208 int err; 209 char* ptr = (char*)handle; 210 211 for (;;) { 212 uv_unique_pipe_name(ptr, name, nameSize); 213 214 pipeHandle = CreateNamedPipeA(name, 215 access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC, 216 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0, 217 NULL); 218 219 if (pipeHandle != INVALID_HANDLE_VALUE) { 220 /* No name collisions. We're done. */ 221 break; 222 } 223 224 err = GetLastError(); 225 if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) { 226 goto error; 227 } 228 229 /* Pipe name collision. Increment the pointer and try again. */ 230 ptr++; 231 } 232 233 if (CreateIoCompletionPort(pipeHandle, 234 loop->iocp, 235 (ULONG_PTR)handle, 236 0) == NULL) { 237 err = GetLastError(); 238 goto error; 239 } 240 241 uv_pipe_connection_init(handle); 242 handle->handle = pipeHandle; 243 244 return 0; 245 246 error: 247 if (pipeHandle != INVALID_HANDLE_VALUE) { 248 CloseHandle(pipeHandle); 249 } 250 251 return err; 252 } 253 254 255 static int uv_set_pipe_handle(uv_loop_t* loop, 256 uv_pipe_t* handle, 257 HANDLE pipeHandle, 258 int fd, 259 DWORD duplex_flags) { 260 NTSTATUS nt_status; 261 IO_STATUS_BLOCK io_status; 262 FILE_MODE_INFORMATION mode_info; 263 DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT; 264 DWORD current_mode = 0; 265 DWORD err = 0; 266 267 if (handle->flags & UV_HANDLE_PIPESERVER) 268 return UV_EINVAL; 269 if (handle->handle != INVALID_HANDLE_VALUE) 270 return UV_EBUSY; 271 272 if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { 273 err = GetLastError(); 274 if (err == ERROR_ACCESS_DENIED) { 275 /* 276 * SetNamedPipeHandleState can fail if the handle doesn't have either 277 * GENERIC_WRITE or FILE_WRITE_ATTRIBUTES. 278 * But if the handle already has the desired wait and blocking modes 279 * we can continue. 280 */ 281 if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL, 282 NULL, NULL, 0)) { 283 return -1; 284 } else if (current_mode & PIPE_NOWAIT) { 285 SetLastError(ERROR_ACCESS_DENIED); 286 return -1; 287 } 288 } else { 289 /* If this returns ERROR_INVALID_PARAMETER we probably opened 290 * something that is not a pipe. */ 291 if (err == ERROR_INVALID_PARAMETER) { 292 SetLastError(WSAENOTSOCK); 293 } 294 return -1; 295 } 296 } 297 298 /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */ 299 nt_status = pNtQueryInformationFile(pipeHandle, 300 &io_status, 301 &mode_info, 302 sizeof(mode_info), 303 FileModeInformation); 304 if (nt_status != STATUS_SUCCESS) { 305 return -1; 306 } 307 308 if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT || 309 mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) { 310 /* Non-overlapped pipe. */ 311 handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE; 312 } else { 313 /* Overlapped pipe. Try to associate with IOCP. */ 314 if (CreateIoCompletionPort(pipeHandle, 315 loop->iocp, 316 (ULONG_PTR) handle, 317 0) == NULL) { 318 handle->flags |= UV_HANDLE_EMULATE_IOCP; 319 } 320 } 321 322 handle->handle = pipeHandle; 323 handle->u.fd = fd; 324 handle->flags |= duplex_flags; 325 326 return 0; 327 } 328 329 330 static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle, 331 uv_pipe_accept_t* req, BOOL firstInstance) { 332 assert(req->pipeHandle == INVALID_HANDLE_VALUE); 333 334 req->pipeHandle = 335 CreateNamedPipeW(handle->name, 336 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC | 337 (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0), 338 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 339 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL); 340 341 if (req->pipeHandle == INVALID_HANDLE_VALUE) { 342 return 0; 343 } 344 345 /* Associate it with IOCP so we can get events. */ 346 if (CreateIoCompletionPort(req->pipeHandle, 347 loop->iocp, 348 (ULONG_PTR) handle, 349 0) == NULL) { 350 uv_fatal_error(GetLastError(), "CreateIoCompletionPort"); 351 } 352 353 /* Stash a handle in the server object for use from places such as 354 * getsockname and chmod. As we transfer ownership of these to client 355 * objects, we'll allocate new ones here. */ 356 handle->handle = req->pipeHandle; 357 358 return 1; 359 } 360 361 362 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) { 363 uv_loop_t* loop; 364 uv_pipe_t* handle; 365 uv_shutdown_t* req; 366 367 req = (uv_shutdown_t*) parameter; 368 assert(req); 369 handle = (uv_pipe_t*) req->handle; 370 assert(handle); 371 loop = handle->loop; 372 assert(loop); 373 374 FlushFileBuffers(handle->handle); 375 376 /* Post completed */ 377 POST_COMPLETION_FOR_REQ(loop, req); 378 379 return 0; 380 } 381 382 383 void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { 384 int err; 385 DWORD result; 386 uv_shutdown_t* req; 387 NTSTATUS nt_status; 388 IO_STATUS_BLOCK io_status; 389 FILE_PIPE_LOCAL_INFORMATION pipe_info; 390 uv__ipc_xfer_queue_item_t* xfer_queue_item; 391 392 if ((handle->flags & UV_HANDLE_CONNECTION) && 393 handle->stream.conn.shutdown_req != NULL && 394 handle->stream.conn.write_reqs_pending == 0) { 395 req = handle->stream.conn.shutdown_req; 396 397 /* Clear the shutdown_req field so we don't go here again. */ 398 handle->stream.conn.shutdown_req = NULL; 399 400 if (handle->flags & UV_HANDLE_CLOSING) { 401 UNREGISTER_HANDLE_REQ(loop, handle, req); 402 403 /* Already closing. Cancel the shutdown. */ 404 if (req->cb) { 405 req->cb(req, UV_ECANCELED); 406 } 407 408 DECREASE_PENDING_REQ_COUNT(handle); 409 return; 410 } 411 412 /* Try to avoid flushing the pipe buffer in the thread pool. */ 413 nt_status = pNtQueryInformationFile(handle->handle, 414 &io_status, 415 &pipe_info, 416 sizeof pipe_info, 417 FilePipeLocalInformation); 418 419 if (nt_status != STATUS_SUCCESS) { 420 /* Failure */ 421 UNREGISTER_HANDLE_REQ(loop, handle, req); 422 423 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ 424 if (req->cb) { 425 err = pRtlNtStatusToDosError(nt_status); 426 req->cb(req, uv_translate_sys_error(err)); 427 } 428 429 DECREASE_PENDING_REQ_COUNT(handle); 430 return; 431 } 432 433 if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) { 434 /* Short-circuit, no need to call FlushFileBuffers. */ 435 uv_insert_pending_req(loop, (uv_req_t*) req); 436 return; 437 } 438 439 /* Run FlushFileBuffers in the thread pool. */ 440 result = QueueUserWorkItem(pipe_shutdown_thread_proc, 441 req, 442 WT_EXECUTELONGFUNCTION); 443 if (result) { 444 return; 445 446 } else { 447 /* Failure. */ 448 UNREGISTER_HANDLE_REQ(loop, handle, req); 449 450 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ 451 if (req->cb) { 452 err = GetLastError(); 453 req->cb(req, uv_translate_sys_error(err)); 454 } 455 456 DECREASE_PENDING_REQ_COUNT(handle); 457 return; 458 } 459 } 460 461 if (handle->flags & UV_HANDLE_CLOSING && 462 handle->reqs_pending == 0) { 463 assert(!(handle->flags & UV_HANDLE_CLOSED)); 464 465 if (handle->flags & UV_HANDLE_CONNECTION) { 466 /* Free pending sockets */ 467 while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) { 468 QUEUE* q; 469 SOCKET socket; 470 471 q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue); 472 QUEUE_REMOVE(q); 473 xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); 474 475 /* Materialize socket and close it */ 476 socket = WSASocketW(FROM_PROTOCOL_INFO, 477 FROM_PROTOCOL_INFO, 478 FROM_PROTOCOL_INFO, 479 &xfer_queue_item->xfer_info.socket_info, 480 0, 481 WSA_FLAG_OVERLAPPED); 482 uv__free(xfer_queue_item); 483 484 if (socket != INVALID_SOCKET) 485 closesocket(socket); 486 } 487 handle->pipe.conn.ipc_xfer_queue_length = 0; 488 489 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 490 if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { 491 UnregisterWait(handle->read_req.wait_handle); 492 handle->read_req.wait_handle = INVALID_HANDLE_VALUE; 493 } 494 if (handle->read_req.event_handle != NULL) { 495 CloseHandle(handle->read_req.event_handle); 496 handle->read_req.event_handle = NULL; 497 } 498 } 499 500 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) 501 DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock); 502 } 503 504 if (handle->flags & UV_HANDLE_PIPESERVER) { 505 assert(handle->pipe.serv.accept_reqs); 506 uv__free(handle->pipe.serv.accept_reqs); 507 handle->pipe.serv.accept_reqs = NULL; 508 } 509 510 uv__handle_close(handle); 511 } 512 } 513 514 515 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) { 516 if (handle->flags & UV_HANDLE_BOUND) 517 return; 518 handle->pipe.serv.pending_instances = count; 519 handle->flags |= UV_HANDLE_PIPESERVER; 520 } 521 522 523 /* Creates a pipe server. */ 524 int uv_pipe_bind(uv_pipe_t* handle, const char* name) { 525 uv_loop_t* loop = handle->loop; 526 int i, err, nameSize; 527 uv_pipe_accept_t* req; 528 529 if (handle->flags & UV_HANDLE_BOUND) { 530 return UV_EINVAL; 531 } 532 533 if (!name) { 534 return UV_EINVAL; 535 } 536 537 if (!(handle->flags & UV_HANDLE_PIPESERVER)) { 538 handle->pipe.serv.pending_instances = default_pending_pipe_instances; 539 } 540 541 handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*) 542 uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances); 543 if (!handle->pipe.serv.accept_reqs) { 544 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); 545 } 546 547 for (i = 0; i < handle->pipe.serv.pending_instances; i++) { 548 req = &handle->pipe.serv.accept_reqs[i]; 549 UV_REQ_INIT(req, UV_ACCEPT); 550 req->data = handle; 551 req->pipeHandle = INVALID_HANDLE_VALUE; 552 req->next_pending = NULL; 553 } 554 555 /* Convert name to UTF16. */ 556 nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR); 557 handle->name = (WCHAR*)uv__malloc(nameSize); 558 if (!handle->name) { 559 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); 560 } 561 562 if (!MultiByteToWideChar(CP_UTF8, 563 0, 564 name, 565 -1, 566 handle->name, 567 nameSize / sizeof(WCHAR))) { 568 err = GetLastError(); 569 goto error; 570 } 571 572 /* 573 * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE. 574 * If this fails then there's already a pipe server for the given pipe name. 575 */ 576 if (!pipe_alloc_accept(loop, 577 handle, 578 &handle->pipe.serv.accept_reqs[0], 579 TRUE)) { 580 err = GetLastError(); 581 if (err == ERROR_ACCESS_DENIED) { 582 err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */ 583 } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) { 584 err = WSAEACCES; /* Translates to UV_EACCES. */ 585 } 586 goto error; 587 } 588 589 handle->pipe.serv.pending_accepts = NULL; 590 handle->flags |= UV_HANDLE_PIPESERVER; 591 handle->flags |= UV_HANDLE_BOUND; 592 593 return 0; 594 595 error: 596 if (handle->name) { 597 uv__free(handle->name); 598 handle->name = NULL; 599 } 600 601 return uv_translate_sys_error(err); 602 } 603 604 605 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { 606 uv_loop_t* loop; 607 uv_pipe_t* handle; 608 uv_connect_t* req; 609 HANDLE pipeHandle = INVALID_HANDLE_VALUE; 610 DWORD duplex_flags; 611 612 req = (uv_connect_t*) parameter; 613 assert(req); 614 handle = (uv_pipe_t*) req->handle; 615 assert(handle); 616 loop = handle->loop; 617 assert(loop); 618 619 /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait 620 * for the pipe to become available with WaitNamedPipe. */ 621 while (WaitNamedPipeW(handle->name, 30000)) { 622 /* The pipe is now available, try to connect. */ 623 pipeHandle = open_named_pipe(handle->name, &duplex_flags); 624 if (pipeHandle != INVALID_HANDLE_VALUE) { 625 break; 626 } 627 628 SwitchToThread(); 629 } 630 631 if (pipeHandle != INVALID_HANDLE_VALUE && 632 !uv_set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) { 633 SET_REQ_SUCCESS(req); 634 } else { 635 SET_REQ_ERROR(req, GetLastError()); 636 } 637 638 /* Post completed */ 639 POST_COMPLETION_FOR_REQ(loop, req); 640 641 return 0; 642 } 643 644 645 void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, 646 const char* name, uv_connect_cb cb) { 647 uv_loop_t* loop = handle->loop; 648 int err, nameSize; 649 HANDLE pipeHandle = INVALID_HANDLE_VALUE; 650 DWORD duplex_flags; 651 652 UV_REQ_INIT(req, UV_CONNECT); 653 req->handle = (uv_stream_t*) handle; 654 req->cb = cb; 655 656 /* Convert name to UTF16. */ 657 nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR); 658 handle->name = (WCHAR*)uv__malloc(nameSize); 659 if (!handle->name) { 660 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); 661 } 662 663 if (!MultiByteToWideChar(CP_UTF8, 664 0, 665 name, 666 -1, 667 handle->name, 668 nameSize / sizeof(WCHAR))) { 669 err = GetLastError(); 670 goto error; 671 } 672 673 pipeHandle = open_named_pipe(handle->name, &duplex_flags); 674 if (pipeHandle == INVALID_HANDLE_VALUE) { 675 if (GetLastError() == ERROR_PIPE_BUSY) { 676 /* Wait for the server to make a pipe instance available. */ 677 if (!QueueUserWorkItem(&pipe_connect_thread_proc, 678 req, 679 WT_EXECUTELONGFUNCTION)) { 680 err = GetLastError(); 681 goto error; 682 } 683 684 REGISTER_HANDLE_REQ(loop, handle, req); 685 handle->reqs_pending++; 686 687 return; 688 } 689 690 err = GetLastError(); 691 goto error; 692 } 693 694 assert(pipeHandle != INVALID_HANDLE_VALUE); 695 696 if (uv_set_pipe_handle(loop, 697 (uv_pipe_t*) req->handle, 698 pipeHandle, 699 -1, 700 duplex_flags)) { 701 err = GetLastError(); 702 goto error; 703 } 704 705 SET_REQ_SUCCESS(req); 706 uv_insert_pending_req(loop, (uv_req_t*) req); 707 handle->reqs_pending++; 708 REGISTER_HANDLE_REQ(loop, handle, req); 709 return; 710 711 error: 712 if (handle->name) { 713 uv__free(handle->name); 714 handle->name = NULL; 715 } 716 717 if (pipeHandle != INVALID_HANDLE_VALUE) { 718 CloseHandle(pipeHandle); 719 } 720 721 /* Make this req pending reporting an error. */ 722 SET_REQ_ERROR(req, err); 723 uv_insert_pending_req(loop, (uv_req_t*) req); 724 handle->reqs_pending++; 725 REGISTER_HANDLE_REQ(loop, handle, req); 726 return; 727 } 728 729 730 void uv__pipe_interrupt_read(uv_pipe_t* handle) { 731 BOOL r; 732 733 if (!(handle->flags & UV_HANDLE_READ_PENDING)) 734 return; /* No pending reads. */ 735 if (handle->flags & UV_HANDLE_CANCELLATION_PENDING) 736 return; /* Already cancelled. */ 737 if (handle->handle == INVALID_HANDLE_VALUE) 738 return; /* Pipe handle closed. */ 739 740 if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) { 741 /* Cancel asynchronous read. */ 742 r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped); 743 assert(r || GetLastError() == ERROR_NOT_FOUND); 744 745 } else { 746 /* Cancel synchronous read (which is happening in the thread pool). */ 747 HANDLE thread; 748 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle; 749 750 EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock); 751 752 thread = *thread_ptr; 753 if (thread == NULL) { 754 /* The thread pool thread has not yet reached the point of blocking, we 755 * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */ 756 *thread_ptr = INVALID_HANDLE_VALUE; 757 758 } else { 759 /* Spin until the thread has acknowledged (by setting the thread to 760 * INVALID_HANDLE_VALUE) that it is past the point of blocking. */ 761 while (thread != INVALID_HANDLE_VALUE) { 762 r = CancelSynchronousIo(thread); 763 assert(r || GetLastError() == ERROR_NOT_FOUND); 764 SwitchToThread(); /* Yield thread. */ 765 thread = *thread_ptr; 766 } 767 } 768 769 LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock); 770 } 771 772 /* Set flag to indicate that read has been cancelled. */ 773 handle->flags |= UV_HANDLE_CANCELLATION_PENDING; 774 } 775 776 777 void uv__pipe_read_stop(uv_pipe_t* handle) { 778 handle->flags &= ~UV_HANDLE_READING; 779 DECREASE_ACTIVE_COUNT(handle->loop, handle); 780 781 uv__pipe_interrupt_read(handle); 782 } 783 784 785 /* Cleans up uv_pipe_t (server or connection) and all resources associated with 786 * it. */ 787 void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { 788 int i; 789 HANDLE pipeHandle; 790 791 uv__pipe_interrupt_read(handle); 792 793 if (handle->name) { 794 uv__free(handle->name); 795 handle->name = NULL; 796 } 797 798 if (handle->flags & UV_HANDLE_PIPESERVER) { 799 for (i = 0; i < handle->pipe.serv.pending_instances; i++) { 800 pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle; 801 if (pipeHandle != INVALID_HANDLE_VALUE) { 802 CloseHandle(pipeHandle); 803 handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE; 804 } 805 } 806 handle->handle = INVALID_HANDLE_VALUE; 807 } 808 809 if (handle->flags & UV_HANDLE_CONNECTION) { 810 handle->flags &= ~UV_HANDLE_WRITABLE; 811 eof_timer_destroy(handle); 812 } 813 814 if ((handle->flags & UV_HANDLE_CONNECTION) 815 && handle->handle != INVALID_HANDLE_VALUE) 816 close_pipe(handle); 817 } 818 819 820 void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) { 821 if (handle->flags & UV_HANDLE_READING) { 822 handle->flags &= ~UV_HANDLE_READING; 823 DECREASE_ACTIVE_COUNT(loop, handle); 824 } 825 826 if (handle->flags & UV_HANDLE_LISTENING) { 827 handle->flags &= ~UV_HANDLE_LISTENING; 828 DECREASE_ACTIVE_COUNT(loop, handle); 829 } 830 831 uv_pipe_cleanup(loop, handle); 832 833 if (handle->reqs_pending == 0) { 834 uv_want_endgame(loop, (uv_handle_t*) handle); 835 } 836 837 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); 838 uv__handle_closing(handle); 839 } 840 841 842 static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, 843 uv_pipe_accept_t* req, BOOL firstInstance) { 844 assert(handle->flags & UV_HANDLE_LISTENING); 845 846 if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) { 847 SET_REQ_ERROR(req, GetLastError()); 848 uv_insert_pending_req(loop, (uv_req_t*) req); 849 handle->reqs_pending++; 850 return; 851 } 852 853 assert(req->pipeHandle != INVALID_HANDLE_VALUE); 854 855 /* Prepare the overlapped structure. */ 856 memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped)); 857 858 if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) && 859 GetLastError() != ERROR_IO_PENDING) { 860 if (GetLastError() == ERROR_PIPE_CONNECTED) { 861 SET_REQ_SUCCESS(req); 862 } else { 863 CloseHandle(req->pipeHandle); 864 req->pipeHandle = INVALID_HANDLE_VALUE; 865 /* Make this req pending reporting an error. */ 866 SET_REQ_ERROR(req, GetLastError()); 867 } 868 uv_insert_pending_req(loop, (uv_req_t*) req); 869 handle->reqs_pending++; 870 return; 871 } 872 873 /* Wait for completion via IOCP */ 874 handle->reqs_pending++; 875 } 876 877 878 int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { 879 uv_loop_t* loop = server->loop; 880 uv_pipe_t* pipe_client; 881 uv_pipe_accept_t* req; 882 QUEUE* q; 883 uv__ipc_xfer_queue_item_t* item; 884 int err; 885 886 if (server->ipc) { 887 if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) { 888 /* No valid pending sockets. */ 889 return WSAEWOULDBLOCK; 890 } 891 892 q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue); 893 QUEUE_REMOVE(q); 894 server->pipe.conn.ipc_xfer_queue_length--; 895 item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); 896 897 err = uv__tcp_xfer_import( 898 (uv_tcp_t*) client, item->xfer_type, &item->xfer_info); 899 if (err != 0) 900 return err; 901 902 uv__free(item); 903 904 } else { 905 pipe_client = (uv_pipe_t*) client; 906 907 /* Find a connection instance that has been connected, but not yet 908 * accepted. */ 909 req = server->pipe.serv.pending_accepts; 910 911 if (!req) { 912 /* No valid connections found, so we error out. */ 913 return WSAEWOULDBLOCK; 914 } 915 916 /* Initialize the client handle and copy the pipeHandle to the client */ 917 uv_pipe_connection_init(pipe_client); 918 pipe_client->handle = req->pipeHandle; 919 pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; 920 921 /* Prepare the req to pick up a new connection */ 922 server->pipe.serv.pending_accepts = req->next_pending; 923 req->next_pending = NULL; 924 req->pipeHandle = INVALID_HANDLE_VALUE; 925 926 server->handle = INVALID_HANDLE_VALUE; 927 if (!(server->flags & UV_HANDLE_CLOSING)) { 928 uv_pipe_queue_accept(loop, server, req, FALSE); 929 } 930 } 931 932 return 0; 933 } 934 935 936 /* Starts listening for connections for the given pipe. */ 937 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { 938 uv_loop_t* loop = handle->loop; 939 int i; 940 941 if (handle->flags & UV_HANDLE_LISTENING) { 942 handle->stream.serv.connection_cb = cb; 943 } 944 945 if (!(handle->flags & UV_HANDLE_BOUND)) { 946 return WSAEINVAL; 947 } 948 949 if (handle->flags & UV_HANDLE_READING) { 950 return WSAEISCONN; 951 } 952 953 if (!(handle->flags & UV_HANDLE_PIPESERVER)) { 954 return ERROR_NOT_SUPPORTED; 955 } 956 957 if (handle->ipc) { 958 return WSAEINVAL; 959 } 960 961 handle->flags |= UV_HANDLE_LISTENING; 962 INCREASE_ACTIVE_COUNT(loop, handle); 963 handle->stream.serv.connection_cb = cb; 964 965 /* First pipe handle should have already been created in uv_pipe_bind */ 966 assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); 967 968 for (i = 0; i < handle->pipe.serv.pending_instances; i++) { 969 uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0); 970 } 971 972 return 0; 973 } 974 975 976 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) { 977 uv_read_t* req = (uv_read_t*) arg; 978 uv_pipe_t* handle = (uv_pipe_t*) req->data; 979 uv_loop_t* loop = handle->loop; 980 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle; 981 CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock; 982 HANDLE thread; 983 DWORD bytes; 984 DWORD err; 985 986 assert(req->type == UV_READ); 987 assert(handle->type == UV_NAMED_PIPE); 988 989 err = 0; 990 991 /* Create a handle to the current thread. */ 992 if (!DuplicateHandle(GetCurrentProcess(), 993 GetCurrentThread(), 994 GetCurrentProcess(), 995 &thread, 996 0, 997 FALSE, 998 DUPLICATE_SAME_ACCESS)) { 999 err = GetLastError(); 1000 goto out1; 1001 } 1002 1003 /* The lock needs to be held when thread handle is modified. */ 1004 EnterCriticalSection(lock); 1005 if (*thread_ptr == INVALID_HANDLE_VALUE) { 1006 /* uv__pipe_interrupt_read() cancelled reading before we got here. */ 1007 err = ERROR_OPERATION_ABORTED; 1008 } else { 1009 /* Let main thread know which worker thread is doing the blocking read. */ 1010 assert(*thread_ptr == NULL); 1011 *thread_ptr = thread; 1012 } 1013 LeaveCriticalSection(lock); 1014 1015 if (err) 1016 goto out2; 1017 1018 /* Block the thread until data is available on the pipe, or the read is 1019 * cancelled. */ 1020 if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL)) 1021 err = GetLastError(); 1022 1023 /* Let the main thread know the worker is past the point of blocking. */ 1024 assert(thread == *thread_ptr); 1025 *thread_ptr = INVALID_HANDLE_VALUE; 1026 1027 /* Briefly acquire the mutex. Since the main thread holds the lock while it 1028 * is spinning trying to cancel this thread's I/O, we will block here until 1029 * it stops doing that. */ 1030 EnterCriticalSection(lock); 1031 LeaveCriticalSection(lock); 1032 1033 out2: 1034 /* Close the handle to the current thread. */ 1035 CloseHandle(thread); 1036 1037 out1: 1038 /* Set request status and post a completion record to the IOCP. */ 1039 if (err) 1040 SET_REQ_ERROR(req, err); 1041 else 1042 SET_REQ_SUCCESS(req); 1043 POST_COMPLETION_FOR_REQ(loop, req); 1044 1045 return 0; 1046 } 1047 1048 1049 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) { 1050 int result; 1051 DWORD bytes; 1052 uv_write_t* req = (uv_write_t*) parameter; 1053 uv_pipe_t* handle = (uv_pipe_t*) req->handle; 1054 uv_loop_t* loop = handle->loop; 1055 1056 assert(req != NULL); 1057 assert(req->type == UV_WRITE); 1058 assert(handle->type == UV_NAMED_PIPE); 1059 assert(req->write_buffer.base); 1060 1061 result = WriteFile(handle->handle, 1062 req->write_buffer.base, 1063 req->write_buffer.len, 1064 &bytes, 1065 NULL); 1066 1067 if (!result) { 1068 SET_REQ_ERROR(req, GetLastError()); 1069 } 1070 1071 POST_COMPLETION_FOR_REQ(loop, req); 1072 return 0; 1073 } 1074 1075 1076 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) { 1077 uv_read_t* req; 1078 uv_tcp_t* handle; 1079 1080 req = (uv_read_t*) context; 1081 assert(req != NULL); 1082 handle = (uv_tcp_t*)req->data; 1083 assert(handle != NULL); 1084 assert(!timed_out); 1085 1086 if (!PostQueuedCompletionStatus(handle->loop->iocp, 1087 req->u.io.overlapped.InternalHigh, 1088 0, 1089 &req->u.io.overlapped)) { 1090 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); 1091 } 1092 } 1093 1094 1095 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) { 1096 uv_write_t* req; 1097 uv_tcp_t* handle; 1098 1099 req = (uv_write_t*) context; 1100 assert(req != NULL); 1101 handle = (uv_tcp_t*)req->handle; 1102 assert(handle != NULL); 1103 assert(!timed_out); 1104 1105 if (!PostQueuedCompletionStatus(handle->loop->iocp, 1106 req->u.io.overlapped.InternalHigh, 1107 0, 1108 &req->u.io.overlapped)) { 1109 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); 1110 } 1111 } 1112 1113 1114 static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { 1115 uv_read_t* req; 1116 int result; 1117 1118 assert(handle->flags & UV_HANDLE_READING); 1119 assert(!(handle->flags & UV_HANDLE_READ_PENDING)); 1120 1121 assert(handle->handle != INVALID_HANDLE_VALUE); 1122 1123 req = &handle->read_req; 1124 1125 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 1126 handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */ 1127 if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc, 1128 req, 1129 WT_EXECUTELONGFUNCTION)) { 1130 /* Make this req pending reporting an error. */ 1131 SET_REQ_ERROR(req, GetLastError()); 1132 goto error; 1133 } 1134 } else { 1135 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); 1136 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 1137 assert(req->event_handle != NULL); 1138 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); 1139 } 1140 1141 /* Do 0-read */ 1142 result = ReadFile(handle->handle, 1143 &uv_zero_, 1144 0, 1145 NULL, 1146 &req->u.io.overlapped); 1147 1148 if (!result && GetLastError() != ERROR_IO_PENDING) { 1149 /* Make this req pending reporting an error. */ 1150 SET_REQ_ERROR(req, GetLastError()); 1151 goto error; 1152 } 1153 1154 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 1155 if (req->wait_handle == INVALID_HANDLE_VALUE) { 1156 if (!RegisterWaitForSingleObject(&req->wait_handle, 1157 req->event_handle, post_completion_read_wait, (void*) req, 1158 INFINITE, WT_EXECUTEINWAITTHREAD)) { 1159 SET_REQ_ERROR(req, GetLastError()); 1160 goto error; 1161 } 1162 } 1163 } 1164 } 1165 1166 /* Start the eof timer if there is one */ 1167 eof_timer_start(handle); 1168 handle->flags |= UV_HANDLE_READ_PENDING; 1169 handle->reqs_pending++; 1170 return; 1171 1172 error: 1173 uv_insert_pending_req(loop, (uv_req_t*)req); 1174 handle->flags |= UV_HANDLE_READ_PENDING; 1175 handle->reqs_pending++; 1176 } 1177 1178 1179 int uv_pipe_read_start(uv_pipe_t* handle, 1180 uv_alloc_cb alloc_cb, 1181 uv_read_cb read_cb) { 1182 uv_loop_t* loop = handle->loop; 1183 1184 handle->flags |= UV_HANDLE_READING; 1185 INCREASE_ACTIVE_COUNT(loop, handle); 1186 handle->read_cb = read_cb; 1187 handle->alloc_cb = alloc_cb; 1188 1189 /* If reading was stopped and then started again, there could still be a read 1190 * request pending. */ 1191 if (!(handle->flags & UV_HANDLE_READ_PENDING)) { 1192 if (handle->flags & UV_HANDLE_EMULATE_IOCP && 1193 handle->read_req.event_handle == NULL) { 1194 handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL); 1195 if (handle->read_req.event_handle == NULL) { 1196 uv_fatal_error(GetLastError(), "CreateEvent"); 1197 } 1198 } 1199 uv_pipe_queue_read(loop, handle); 1200 } 1201 1202 return 0; 1203 } 1204 1205 1206 static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle, 1207 uv_write_t* req) { 1208 req->next_req = NULL; 1209 if (handle->pipe.conn.non_overlapped_writes_tail) { 1210 req->next_req = 1211 handle->pipe.conn.non_overlapped_writes_tail->next_req; 1212 handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req; 1213 handle->pipe.conn.non_overlapped_writes_tail = req; 1214 } else { 1215 req->next_req = (uv_req_t*)req; 1216 handle->pipe.conn.non_overlapped_writes_tail = req; 1217 } 1218 } 1219 1220 1221 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) { 1222 uv_write_t* req; 1223 1224 if (handle->pipe.conn.non_overlapped_writes_tail) { 1225 req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req; 1226 1227 if (req == handle->pipe.conn.non_overlapped_writes_tail) { 1228 handle->pipe.conn.non_overlapped_writes_tail = NULL; 1229 } else { 1230 handle->pipe.conn.non_overlapped_writes_tail->next_req = 1231 req->next_req; 1232 } 1233 1234 return req; 1235 } else { 1236 /* queue empty */ 1237 return NULL; 1238 } 1239 } 1240 1241 1242 static void uv_queue_non_overlapped_write(uv_pipe_t* handle) { 1243 uv_write_t* req = uv_remove_non_overlapped_write_req(handle); 1244 if (req) { 1245 if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc, 1246 req, 1247 WT_EXECUTELONGFUNCTION)) { 1248 uv_fatal_error(GetLastError(), "QueueUserWorkItem"); 1249 } 1250 } 1251 } 1252 1253 1254 static int uv__build_coalesced_write_req(uv_write_t* user_req, 1255 const uv_buf_t bufs[], 1256 size_t nbufs, 1257 uv_write_t** req_out, 1258 uv_buf_t* write_buf_out) { 1259 /* Pack into a single heap-allocated buffer: 1260 * (a) a uv_write_t structure where libuv stores the actual state. 1261 * (b) a pointer to the original uv_write_t. 1262 * (c) data from all `bufs` entries. 1263 */ 1264 char* heap_buffer; 1265 size_t heap_buffer_length, heap_buffer_offset; 1266 uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */ 1267 char* data_start; /* (c) */ 1268 size_t data_length; 1269 unsigned int i; 1270 1271 /* Compute combined size of all combined buffers from `bufs`. */ 1272 data_length = 0; 1273 for (i = 0; i < nbufs; i++) 1274 data_length += bufs[i].len; 1275 1276 /* The total combined size of data buffers should not exceed UINT32_MAX, 1277 * because WriteFile() won't accept buffers larger than that. */ 1278 if (data_length > UINT32_MAX) 1279 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */ 1280 1281 /* Compute heap buffer size. */ 1282 heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */ 1283 data_length; /* (c) */ 1284 1285 /* Allocate buffer. */ 1286 heap_buffer = uv__malloc(heap_buffer_length); 1287 if (heap_buffer == NULL) 1288 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */ 1289 1290 /* Copy uv_write_t information to the buffer. */ 1291 coalesced_write_req = (uv__coalesced_write_t*) heap_buffer; 1292 coalesced_write_req->req = *user_req; /* copy (a) */ 1293 coalesced_write_req->req.coalesced = 1; 1294 coalesced_write_req->user_req = user_req; /* copy (b) */ 1295 heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */ 1296 1297 /* Copy data buffers to the heap buffer. */ 1298 data_start = &heap_buffer[heap_buffer_offset]; 1299 for (i = 0; i < nbufs; i++) { 1300 memcpy(&heap_buffer[heap_buffer_offset], 1301 bufs[i].base, 1302 bufs[i].len); /* copy (c) */ 1303 heap_buffer_offset += bufs[i].len; /* offset (c) */ 1304 } 1305 assert(heap_buffer_offset == heap_buffer_length); 1306 1307 /* Set out arguments and return. */ 1308 *req_out = &coalesced_write_req->req; 1309 *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length); 1310 return 0; 1311 } 1312 1313 1314 static int uv__pipe_write_data(uv_loop_t* loop, 1315 uv_write_t* req, 1316 uv_pipe_t* handle, 1317 const uv_buf_t bufs[], 1318 size_t nbufs, 1319 uv_write_cb cb, 1320 int copy_always) { 1321 int err; 1322 int result; 1323 uv_buf_t write_buf; 1324 1325 assert(handle->handle != INVALID_HANDLE_VALUE); 1326 1327 UV_REQ_INIT(req, UV_WRITE); 1328 req->handle = (uv_stream_t*) handle; 1329 req->send_handle = NULL; 1330 req->cb = cb; 1331 /* Private fields. */ 1332 req->coalesced = 0; 1333 req->event_handle = NULL; 1334 req->wait_handle = INVALID_HANDLE_VALUE; 1335 1336 /* Prepare the overlapped structure. */ 1337 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); 1338 if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) { 1339 req->event_handle = CreateEvent(NULL, 0, 0, NULL); 1340 if (req->event_handle == NULL) { 1341 uv_fatal_error(GetLastError(), "CreateEvent"); 1342 } 1343 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); 1344 } 1345 req->write_buffer = uv_null_buf_; 1346 1347 if (nbufs == 0) { 1348 /* Write empty buffer. */ 1349 write_buf = uv_null_buf_; 1350 } else if (nbufs == 1 && !copy_always) { 1351 /* Write directly from bufs[0]. */ 1352 write_buf = bufs[0]; 1353 } else { 1354 /* Coalesce all `bufs` into one big buffer. This also creates a new 1355 * write-request structure that replaces the old one. */ 1356 err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf); 1357 if (err != 0) 1358 return err; 1359 } 1360 1361 if ((handle->flags & 1362 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) == 1363 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) { 1364 DWORD bytes; 1365 result = 1366 WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL); 1367 1368 if (!result) { 1369 err = GetLastError(); 1370 return err; 1371 } else { 1372 /* Request completed immediately. */ 1373 req->u.io.queued_bytes = 0; 1374 } 1375 1376 REGISTER_HANDLE_REQ(loop, handle, req); 1377 handle->reqs_pending++; 1378 handle->stream.conn.write_reqs_pending++; 1379 POST_COMPLETION_FOR_REQ(loop, req); 1380 return 0; 1381 } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 1382 req->write_buffer = write_buf; 1383 uv_insert_non_overlapped_write_req(handle, req); 1384 if (handle->stream.conn.write_reqs_pending == 0) { 1385 uv_queue_non_overlapped_write(handle); 1386 } 1387 1388 /* Request queued by the kernel. */ 1389 req->u.io.queued_bytes = write_buf.len; 1390 handle->write_queue_size += req->u.io.queued_bytes; 1391 } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) { 1392 /* Using overlapped IO, but wait for completion before returning */ 1393 result = WriteFile(handle->handle, 1394 write_buf.base, 1395 write_buf.len, 1396 NULL, 1397 &req->u.io.overlapped); 1398 1399 if (!result && GetLastError() != ERROR_IO_PENDING) { 1400 err = GetLastError(); 1401 CloseHandle(req->event_handle); 1402 req->event_handle = NULL; 1403 return err; 1404 } 1405 1406 if (result) { 1407 /* Request completed immediately. */ 1408 req->u.io.queued_bytes = 0; 1409 } else { 1410 /* Request queued by the kernel. */ 1411 req->u.io.queued_bytes = write_buf.len; 1412 handle->write_queue_size += req->u.io.queued_bytes; 1413 if (WaitForSingleObject(req->event_handle, INFINITE) != 1414 WAIT_OBJECT_0) { 1415 err = GetLastError(); 1416 CloseHandle(req->event_handle); 1417 req->event_handle = NULL; 1418 return err; 1419 } 1420 } 1421 CloseHandle(req->event_handle); 1422 req->event_handle = NULL; 1423 1424 REGISTER_HANDLE_REQ(loop, handle, req); 1425 handle->reqs_pending++; 1426 handle->stream.conn.write_reqs_pending++; 1427 return 0; 1428 } else { 1429 result = WriteFile(handle->handle, 1430 write_buf.base, 1431 write_buf.len, 1432 NULL, 1433 &req->u.io.overlapped); 1434 1435 if (!result && GetLastError() != ERROR_IO_PENDING) { 1436 return GetLastError(); 1437 } 1438 1439 if (result) { 1440 /* Request completed immediately. */ 1441 req->u.io.queued_bytes = 0; 1442 } else { 1443 /* Request queued by the kernel. */ 1444 req->u.io.queued_bytes = write_buf.len; 1445 handle->write_queue_size += req->u.io.queued_bytes; 1446 } 1447 1448 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 1449 if (!RegisterWaitForSingleObject(&req->wait_handle, 1450 req->event_handle, post_completion_write_wait, (void*) req, 1451 INFINITE, WT_EXECUTEINWAITTHREAD)) { 1452 return GetLastError(); 1453 } 1454 } 1455 } 1456 1457 REGISTER_HANDLE_REQ(loop, handle, req); 1458 handle->reqs_pending++; 1459 handle->stream.conn.write_reqs_pending++; 1460 1461 return 0; 1462 } 1463 1464 1465 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) { 1466 DWORD* pid = &handle->pipe.conn.ipc_remote_pid; 1467 1468 /* If the both ends of the IPC pipe are owned by the same process, 1469 * the remote end pid may not yet be set. If so, do it here. 1470 * TODO: this is weird; it'd probably better to use a handshake. */ 1471 if (*pid == 0) 1472 *pid = GetCurrentProcessId(); 1473 1474 return *pid; 1475 } 1476 1477 1478 int uv__pipe_write_ipc(uv_loop_t* loop, 1479 uv_write_t* req, 1480 uv_pipe_t* handle, 1481 const uv_buf_t data_bufs[], 1482 size_t data_buf_count, 1483 uv_stream_t* send_handle, 1484 uv_write_cb cb) { 1485 uv_buf_t stack_bufs[6]; 1486 uv_buf_t* bufs; 1487 size_t buf_count, buf_index; 1488 uv__ipc_frame_header_t frame_header; 1489 uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE; 1490 uv__ipc_socket_xfer_info_t xfer_info; 1491 uint64_t data_length; 1492 size_t i; 1493 int err; 1494 1495 /* Compute the combined size of data buffers. */ 1496 data_length = 0; 1497 for (i = 0; i < data_buf_count; i++) 1498 data_length += data_bufs[i].len; 1499 if (data_length > UINT32_MAX) 1500 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */ 1501 1502 /* Prepare the frame's socket xfer payload. */ 1503 if (send_handle != NULL) { 1504 uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle; 1505 1506 /* Verify that `send_handle` it is indeed a tcp handle. */ 1507 if (send_tcp_handle->type != UV_TCP) 1508 return ERROR_NOT_SUPPORTED; 1509 1510 /* Export the tcp handle. */ 1511 err = uv__tcp_xfer_export(send_tcp_handle, 1512 uv__pipe_get_ipc_remote_pid(handle), 1513 &xfer_type, 1514 &xfer_info); 1515 if (err != 0) 1516 return err; 1517 } 1518 1519 /* Compute the number of uv_buf_t's required. */ 1520 buf_count = 1 + data_buf_count; /* Frame header and data buffers. */ 1521 if (send_handle != NULL) 1522 buf_count += 1; /* One extra for the socket xfer information. */ 1523 1524 /* Use the on-stack buffer array if it is big enough; otherwise allocate 1525 * space for it on the heap. */ 1526 if (buf_count < ARRAY_SIZE(stack_bufs)) { 1527 /* Use on-stack buffer array. */ 1528 bufs = stack_bufs; 1529 } else { 1530 /* Use heap-allocated buffer array. */ 1531 bufs = uv__calloc(buf_count, sizeof(uv_buf_t)); 1532 if (bufs == NULL) 1533 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */ 1534 } 1535 buf_index = 0; 1536 1537 /* Initialize frame header and add it to the buffers list. */ 1538 memset(&frame_header, 0, sizeof frame_header); 1539 bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header); 1540 1541 if (send_handle != NULL) { 1542 /* Add frame header flags. */ 1543 switch (xfer_type) { 1544 case UV__IPC_SOCKET_XFER_TCP_CONNECTION: 1545 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER | 1546 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION; 1547 break; 1548 case UV__IPC_SOCKET_XFER_TCP_SERVER: 1549 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER; 1550 break; 1551 default: 1552 assert(0); /* Unreachable. */ 1553 } 1554 /* Add xfer info buffer. */ 1555 bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info); 1556 } 1557 1558 if (data_length > 0) { 1559 /* Update frame header. */ 1560 frame_header.flags |= UV__IPC_FRAME_HAS_DATA; 1561 frame_header.data_length = (uint32_t) data_length; 1562 /* Add data buffers to buffers list. */ 1563 for (i = 0; i < data_buf_count; i++) 1564 bufs[buf_index++] = data_bufs[i]; 1565 } 1566 1567 /* Write buffers. We set the `always_copy` flag, so it is not a problem that 1568 * some of the written data lives on the stack. */ 1569 err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1); 1570 1571 /* If we had to heap-allocate the bufs array, free it now. */ 1572 if (bufs != stack_bufs) { 1573 uv__free(bufs); 1574 } 1575 1576 return err; 1577 } 1578 1579 1580 int uv__pipe_write(uv_loop_t* loop, 1581 uv_write_t* req, 1582 uv_pipe_t* handle, 1583 const uv_buf_t bufs[], 1584 size_t nbufs, 1585 uv_stream_t* send_handle, 1586 uv_write_cb cb) { 1587 if (handle->ipc) { 1588 /* IPC pipe write: use framing protocol. */ 1589 return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb); 1590 } else { 1591 /* Non-IPC pipe write: put data on the wire directly. */ 1592 assert(send_handle == NULL); 1593 return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0); 1594 } 1595 } 1596 1597 1598 static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, 1599 uv_buf_t buf) { 1600 /* If there is an eof timer running, we don't need it any more, so discard 1601 * it. */ 1602 eof_timer_destroy(handle); 1603 1604 handle->flags &= ~UV_HANDLE_READABLE; 1605 uv_read_stop((uv_stream_t*) handle); 1606 1607 handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf); 1608 } 1609 1610 1611 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, 1612 uv_buf_t buf) { 1613 /* If there is an eof timer running, we don't need it any more, so discard 1614 * it. */ 1615 eof_timer_destroy(handle); 1616 1617 uv_read_stop((uv_stream_t*) handle); 1618 1619 handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf); 1620 } 1621 1622 1623 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, 1624 int error, uv_buf_t buf) { 1625 if (error == ERROR_BROKEN_PIPE) { 1626 uv_pipe_read_eof(loop, handle, buf); 1627 } else { 1628 uv_pipe_read_error(loop, handle, error, buf); 1629 } 1630 } 1631 1632 1633 static void uv__pipe_queue_ipc_xfer_info( 1634 uv_pipe_t* handle, 1635 uv__ipc_socket_xfer_type_t xfer_type, 1636 uv__ipc_socket_xfer_info_t* xfer_info) { 1637 uv__ipc_xfer_queue_item_t* item; 1638 1639 item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item)); 1640 if (item == NULL) 1641 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); 1642 1643 item->xfer_type = xfer_type; 1644 item->xfer_info = *xfer_info; 1645 1646 QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member); 1647 handle->pipe.conn.ipc_xfer_queue_length++; 1648 } 1649 1650 1651 /* Read an exact number of bytes from a pipe. If an error or end-of-file is 1652 * encountered before the requested number of bytes are read, an error is 1653 * returned. */ 1654 static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) { 1655 DWORD bytes_read, bytes_read_now; 1656 1657 bytes_read = 0; 1658 while (bytes_read < count) { 1659 if (!ReadFile(h, 1660 (char*) buffer + bytes_read, 1661 count - bytes_read, 1662 &bytes_read_now, 1663 NULL)) { 1664 return GetLastError(); 1665 } 1666 1667 bytes_read += bytes_read_now; 1668 } 1669 1670 assert(bytes_read == count); 1671 return 0; 1672 } 1673 1674 1675 static DWORD uv__pipe_read_data(uv_loop_t* loop, 1676 uv_pipe_t* handle, 1677 DWORD suggested_bytes, 1678 DWORD max_bytes) { 1679 DWORD bytes_read; 1680 uv_buf_t buf; 1681 1682 /* Ask the user for a buffer to read data into. */ 1683 buf = uv_buf_init(NULL, 0); 1684 handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf); 1685 if (buf.base == NULL || buf.len == 0) { 1686 handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf); 1687 return 0; /* Break out of read loop. */ 1688 } 1689 1690 /* Ensure we read at most the smaller of: 1691 * (a) the length of the user-allocated buffer. 1692 * (b) the maximum data length as specified by the `max_bytes` argument. 1693 */ 1694 if (max_bytes > buf.len) 1695 max_bytes = buf.len; 1696 1697 /* Read into the user buffer. */ 1698 if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) { 1699 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf); 1700 return 0; /* Break out of read loop. */ 1701 } 1702 1703 /* Call the read callback. */ 1704 handle->read_cb((uv_stream_t*) handle, bytes_read, &buf); 1705 1706 return bytes_read; 1707 } 1708 1709 1710 static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { 1711 uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining; 1712 int err; 1713 1714 if (*data_remaining > 0) { 1715 /* Read frame data payload. */ 1716 DWORD bytes_read = 1717 uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining); 1718 *data_remaining -= bytes_read; 1719 return bytes_read; 1720 1721 } else { 1722 /* Start of a new IPC frame. */ 1723 uv__ipc_frame_header_t frame_header; 1724 uint32_t xfer_flags; 1725 uv__ipc_socket_xfer_type_t xfer_type; 1726 uv__ipc_socket_xfer_info_t xfer_info; 1727 1728 /* Read the IPC frame header. */ 1729 err = uv__pipe_read_exactly( 1730 handle->handle, &frame_header, sizeof frame_header); 1731 if (err) 1732 goto error; 1733 1734 /* Validate that flags are valid. */ 1735 if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0) 1736 goto invalid; 1737 /* Validate that reserved2 is zero. */ 1738 if (frame_header.reserved2 != 0) 1739 goto invalid; 1740 1741 /* Parse xfer flags. */ 1742 xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS; 1743 if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) { 1744 /* Socket coming -- determine the type. */ 1745 xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION 1746 ? UV__IPC_SOCKET_XFER_TCP_CONNECTION 1747 : UV__IPC_SOCKET_XFER_TCP_SERVER; 1748 } else if (xfer_flags == 0) { 1749 /* No socket. */ 1750 xfer_type = UV__IPC_SOCKET_XFER_NONE; 1751 } else { 1752 /* Invalid flags. */ 1753 goto invalid; 1754 } 1755 1756 /* Parse data frame information. */ 1757 if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) { 1758 *data_remaining = frame_header.data_length; 1759 } else if (frame_header.data_length != 0) { 1760 /* Data length greater than zero but data flag not set -- invalid. */ 1761 goto invalid; 1762 } 1763 1764 /* If no socket xfer info follows, return here. Data will be read in a 1765 * subsequent invocation of uv__pipe_read_ipc(). */ 1766 if (xfer_type == UV__IPC_SOCKET_XFER_NONE) 1767 return sizeof frame_header; /* Number of bytes read. */ 1768 1769 /* Read transferred socket information. */ 1770 err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info); 1771 if (err) 1772 goto error; 1773 1774 /* Store the pending socket info. */ 1775 uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info); 1776 1777 /* Return number of bytes read. */ 1778 return sizeof frame_header + sizeof xfer_info; 1779 } 1780 1781 invalid: 1782 /* Invalid frame. */ 1783 err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */ 1784 1785 error: 1786 uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); 1787 return 0; /* Break out of read loop. */ 1788 } 1789 1790 1791 void uv_process_pipe_read_req(uv_loop_t* loop, 1792 uv_pipe_t* handle, 1793 uv_req_t* req) { 1794 assert(handle->type == UV_NAMED_PIPE); 1795 1796 handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING); 1797 DECREASE_PENDING_REQ_COUNT(handle); 1798 eof_timer_stop(handle); 1799 1800 /* At this point, we're done with bookkeeping. If the user has stopped 1801 * reading the pipe in the meantime, there is nothing left to do, since there 1802 * is no callback that we can call. */ 1803 if (!(handle->flags & UV_HANDLE_READING)) 1804 return; 1805 1806 if (!REQ_SUCCESS(req)) { 1807 /* An error occurred doing the zero-read. */ 1808 DWORD err = GET_REQ_ERROR(req); 1809 1810 /* If the read was cancelled by uv__pipe_interrupt_read(), the request may 1811 * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to 1812 * the user; we'll start a new zero-read at the end of this function. */ 1813 if (err != ERROR_OPERATION_ABORTED) 1814 uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); 1815 1816 } else { 1817 /* The zero-read completed without error, indicating there is data 1818 * available in the kernel buffer. */ 1819 DWORD avail; 1820 1821 /* Get the number of bytes available. */ 1822 avail = 0; 1823 if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL)) 1824 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); 1825 1826 /* Read until we've either read all the bytes available, or the 'reading' 1827 * flag is cleared. */ 1828 while (avail > 0 && handle->flags & UV_HANDLE_READING) { 1829 /* Depending on the type of pipe, read either IPC frames or raw data. */ 1830 DWORD bytes_read = 1831 handle->ipc ? uv__pipe_read_ipc(loop, handle) 1832 : uv__pipe_read_data(loop, handle, avail, (DWORD) -1); 1833 1834 /* If no bytes were read, treat this as an indication that an error 1835 * occurred, and break out of the read loop. */ 1836 if (bytes_read == 0) 1837 break; 1838 1839 /* It is possible that more bytes were read than we thought were 1840 * available. To prevent `avail` from underflowing, break out of the loop 1841 * if this is the case. */ 1842 if (bytes_read > avail) 1843 break; 1844 1845 /* Recompute the number of bytes available. */ 1846 avail -= bytes_read; 1847 } 1848 } 1849 1850 /* Start another zero-read request if necessary. */ 1851 if ((handle->flags & UV_HANDLE_READING) && 1852 !(handle->flags & UV_HANDLE_READ_PENDING)) { 1853 uv_pipe_queue_read(loop, handle); 1854 } 1855 } 1856 1857 1858 void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, 1859 uv_write_t* req) { 1860 int err; 1861 1862 assert(handle->type == UV_NAMED_PIPE); 1863 1864 assert(handle->write_queue_size >= req->u.io.queued_bytes); 1865 handle->write_queue_size -= req->u.io.queued_bytes; 1866 1867 UNREGISTER_HANDLE_REQ(loop, handle, req); 1868 1869 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 1870 if (req->wait_handle != INVALID_HANDLE_VALUE) { 1871 UnregisterWait(req->wait_handle); 1872 req->wait_handle = INVALID_HANDLE_VALUE; 1873 } 1874 if (req->event_handle) { 1875 CloseHandle(req->event_handle); 1876 req->event_handle = NULL; 1877 } 1878 } 1879 1880 err = GET_REQ_ERROR(req); 1881 1882 /* If this was a coalesced write, extract pointer to the user_provided 1883 * uv_write_t structure so we can pass the expected pointer to the callback, 1884 * then free the heap-allocated write req. */ 1885 if (req->coalesced) { 1886 uv__coalesced_write_t* coalesced_write = 1887 container_of(req, uv__coalesced_write_t, req); 1888 req = coalesced_write->user_req; 1889 uv__free(coalesced_write); 1890 } 1891 if (req->cb) { 1892 req->cb(req, uv_translate_sys_error(err)); 1893 } 1894 1895 handle->stream.conn.write_reqs_pending--; 1896 1897 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && 1898 handle->pipe.conn.non_overlapped_writes_tail) { 1899 assert(handle->stream.conn.write_reqs_pending > 0); 1900 uv_queue_non_overlapped_write(handle); 1901 } 1902 1903 if (handle->stream.conn.shutdown_req != NULL && 1904 handle->stream.conn.write_reqs_pending == 0) { 1905 uv_want_endgame(loop, (uv_handle_t*)handle); 1906 } 1907 1908 DECREASE_PENDING_REQ_COUNT(handle); 1909 } 1910 1911 1912 void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, 1913 uv_req_t* raw_req) { 1914 uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req; 1915 1916 assert(handle->type == UV_NAMED_PIPE); 1917 1918 if (handle->flags & UV_HANDLE_CLOSING) { 1919 /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */ 1920 assert(req->pipeHandle == INVALID_HANDLE_VALUE); 1921 DECREASE_PENDING_REQ_COUNT(handle); 1922 return; 1923 } 1924 1925 if (REQ_SUCCESS(req)) { 1926 assert(req->pipeHandle != INVALID_HANDLE_VALUE); 1927 req->next_pending = handle->pipe.serv.pending_accepts; 1928 handle->pipe.serv.pending_accepts = req; 1929 1930 if (handle->stream.serv.connection_cb) { 1931 handle->stream.serv.connection_cb((uv_stream_t*)handle, 0); 1932 } 1933 } else { 1934 if (req->pipeHandle != INVALID_HANDLE_VALUE) { 1935 CloseHandle(req->pipeHandle); 1936 req->pipeHandle = INVALID_HANDLE_VALUE; 1937 } 1938 if (!(handle->flags & UV_HANDLE_CLOSING)) { 1939 uv_pipe_queue_accept(loop, handle, req, FALSE); 1940 } 1941 } 1942 1943 DECREASE_PENDING_REQ_COUNT(handle); 1944 } 1945 1946 1947 void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, 1948 uv_connect_t* req) { 1949 int err; 1950 1951 assert(handle->type == UV_NAMED_PIPE); 1952 1953 UNREGISTER_HANDLE_REQ(loop, handle, req); 1954 1955 if (req->cb) { 1956 err = 0; 1957 if (REQ_SUCCESS(req)) { 1958 uv_pipe_connection_init(handle); 1959 } else { 1960 err = GET_REQ_ERROR(req); 1961 } 1962 req->cb(req, uv_translate_sys_error(err)); 1963 } 1964 1965 DECREASE_PENDING_REQ_COUNT(handle); 1966 } 1967 1968 1969 void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle, 1970 uv_shutdown_t* req) { 1971 assert(handle->type == UV_NAMED_PIPE); 1972 1973 UNREGISTER_HANDLE_REQ(loop, handle, req); 1974 1975 if (handle->flags & UV_HANDLE_READABLE) { 1976 /* Initialize and optionally start the eof timer. Only do this if the pipe 1977 * is readable and we haven't seen EOF come in ourselves. */ 1978 eof_timer_init(handle); 1979 1980 /* If reading start the timer right now. Otherwise uv_pipe_queue_read will 1981 * start it. */ 1982 if (handle->flags & UV_HANDLE_READ_PENDING) { 1983 eof_timer_start(handle); 1984 } 1985 1986 } else { 1987 /* This pipe is not readable. We can just close it to let the other end 1988 * know that we're done writing. */ 1989 close_pipe(handle); 1990 } 1991 1992 if (req->cb) { 1993 req->cb(req, 0); 1994 } 1995 1996 DECREASE_PENDING_REQ_COUNT(handle); 1997 } 1998 1999 2000 static void eof_timer_init(uv_pipe_t* pipe) { 2001 int r; 2002 2003 assert(pipe->pipe.conn.eof_timer == NULL); 2004 assert(pipe->flags & UV_HANDLE_CONNECTION); 2005 2006 pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer); 2007 2008 r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer); 2009 assert(r == 0); /* timers can't fail */ 2010 pipe->pipe.conn.eof_timer->data = pipe; 2011 uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer); 2012 } 2013 2014 2015 static void eof_timer_start(uv_pipe_t* pipe) { 2016 assert(pipe->flags & UV_HANDLE_CONNECTION); 2017 2018 if (pipe->pipe.conn.eof_timer != NULL) { 2019 uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0); 2020 } 2021 } 2022 2023 2024 static void eof_timer_stop(uv_pipe_t* pipe) { 2025 assert(pipe->flags & UV_HANDLE_CONNECTION); 2026 2027 if (pipe->pipe.conn.eof_timer != NULL) { 2028 uv_timer_stop(pipe->pipe.conn.eof_timer); 2029 } 2030 } 2031 2032 2033 static void eof_timer_cb(uv_timer_t* timer) { 2034 uv_pipe_t* pipe = (uv_pipe_t*) timer->data; 2035 uv_loop_t* loop = timer->loop; 2036 2037 assert(pipe->type == UV_NAMED_PIPE); 2038 2039 /* This should always be true, since we start the timer only in 2040 * uv_pipe_queue_read after successfully calling ReadFile, or in 2041 * uv_process_pipe_shutdown_req if a read is pending, and we always 2042 * immediately stop the timer in uv_process_pipe_read_req. */ 2043 assert(pipe->flags & UV_HANDLE_READ_PENDING); 2044 2045 /* If there are many packets coming off the iocp then the timer callback may 2046 * be called before the read request is coming off the queue. Therefore we 2047 * check here if the read request has completed but will be processed later. 2048 */ 2049 if ((pipe->flags & UV_HANDLE_READ_PENDING) && 2050 HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) { 2051 return; 2052 } 2053 2054 /* Force both ends off the pipe. */ 2055 close_pipe(pipe); 2056 2057 /* Stop reading, so the pending read that is going to fail will not be 2058 * reported to the user. */ 2059 uv_read_stop((uv_stream_t*) pipe); 2060 2061 /* Report the eof and update flags. This will get reported even if the user 2062 * stopped reading in the meantime. TODO: is that okay? */ 2063 uv_pipe_read_eof(loop, pipe, uv_null_buf_); 2064 } 2065 2066 2067 static void eof_timer_destroy(uv_pipe_t* pipe) { 2068 assert(pipe->flags & UV_HANDLE_CONNECTION); 2069 2070 if (pipe->pipe.conn.eof_timer) { 2071 uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb); 2072 pipe->pipe.conn.eof_timer = NULL; 2073 } 2074 } 2075 2076 2077 static void eof_timer_close_cb(uv_handle_t* handle) { 2078 assert(handle->type == UV_TIMER); 2079 uv__free(handle); 2080 } 2081 2082 2083 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { 2084 HANDLE os_handle = uv__get_osfhandle(file); 2085 NTSTATUS nt_status; 2086 IO_STATUS_BLOCK io_status; 2087 FILE_ACCESS_INFORMATION access; 2088 DWORD duplex_flags = 0; 2089 2090 if (os_handle == INVALID_HANDLE_VALUE) 2091 return UV_EBADF; 2092 2093 uv__once_init(); 2094 /* In order to avoid closing a stdio file descriptor 0-2, duplicate the 2095 * underlying OS handle and forget about the original fd. 2096 * We could also opt to use the original OS handle and just never close it, 2097 * but then there would be no reliable way to cancel pending read operations 2098 * upon close. 2099 */ 2100 if (file <= 2) { 2101 if (!DuplicateHandle(INVALID_HANDLE_VALUE, 2102 os_handle, 2103 INVALID_HANDLE_VALUE, 2104 &os_handle, 2105 0, 2106 FALSE, 2107 DUPLICATE_SAME_ACCESS)) 2108 return uv_translate_sys_error(GetLastError()); 2109 file = -1; 2110 } 2111 2112 /* Determine what kind of permissions we have on this handle. 2113 * Cygwin opens the pipe in message mode, but we can support it, 2114 * just query the access flags and set the stream flags accordingly. 2115 */ 2116 nt_status = pNtQueryInformationFile(os_handle, 2117 &io_status, 2118 &access, 2119 sizeof(access), 2120 FileAccessInformation); 2121 if (nt_status != STATUS_SUCCESS) 2122 return UV_EINVAL; 2123 2124 if (pipe->ipc) { 2125 if (!(access.AccessFlags & FILE_WRITE_DATA) || 2126 !(access.AccessFlags & FILE_READ_DATA)) { 2127 return UV_EINVAL; 2128 } 2129 } 2130 2131 if (access.AccessFlags & FILE_WRITE_DATA) 2132 duplex_flags |= UV_HANDLE_WRITABLE; 2133 if (access.AccessFlags & FILE_READ_DATA) 2134 duplex_flags |= UV_HANDLE_READABLE; 2135 2136 if (os_handle == INVALID_HANDLE_VALUE || 2137 uv_set_pipe_handle(pipe->loop, 2138 pipe, 2139 os_handle, 2140 file, 2141 duplex_flags) == -1) { 2142 return UV_EINVAL; 2143 } 2144 2145 uv_pipe_connection_init(pipe); 2146 2147 if (pipe->ipc) { 2148 assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); 2149 pipe->pipe.conn.ipc_remote_pid = uv_os_getppid(); 2150 assert(pipe->pipe.conn.ipc_remote_pid != (DWORD) -1); 2151 } 2152 return 0; 2153 } 2154 2155 2156 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) { 2157 NTSTATUS nt_status; 2158 IO_STATUS_BLOCK io_status; 2159 FILE_NAME_INFORMATION tmp_name_info; 2160 FILE_NAME_INFORMATION* name_info; 2161 WCHAR* name_buf; 2162 unsigned int addrlen; 2163 unsigned int name_size; 2164 unsigned int name_len; 2165 int err; 2166 2167 uv__once_init(); 2168 name_info = NULL; 2169 2170 if (handle->handle == INVALID_HANDLE_VALUE) { 2171 *size = 0; 2172 return UV_EINVAL; 2173 } 2174 2175 /* NtQueryInformationFile will block if another thread is performing a 2176 * blocking operation on the queried handle. If the pipe handle is 2177 * synchronous, there may be a worker thread currently calling ReadFile() on 2178 * the pipe handle, which could cause a deadlock. To avoid this, interrupt 2179 * the read. */ 2180 if (handle->flags & UV_HANDLE_CONNECTION && 2181 handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 2182 uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */ 2183 } 2184 2185 nt_status = pNtQueryInformationFile(handle->handle, 2186 &io_status, 2187 &tmp_name_info, 2188 sizeof tmp_name_info, 2189 FileNameInformation); 2190 if (nt_status == STATUS_BUFFER_OVERFLOW) { 2191 name_size = sizeof(*name_info) + tmp_name_info.FileNameLength; 2192 name_info = uv__malloc(name_size); 2193 if (!name_info) { 2194 *size = 0; 2195 err = UV_ENOMEM; 2196 goto cleanup; 2197 } 2198 2199 nt_status = pNtQueryInformationFile(handle->handle, 2200 &io_status, 2201 name_info, 2202 name_size, 2203 FileNameInformation); 2204 } 2205 2206 if (nt_status != STATUS_SUCCESS) { 2207 *size = 0; 2208 err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status)); 2209 goto error; 2210 } 2211 2212 if (!name_info) { 2213 /* the struct on stack was used */ 2214 name_buf = tmp_name_info.FileName; 2215 name_len = tmp_name_info.FileNameLength; 2216 } else { 2217 name_buf = name_info->FileName; 2218 name_len = name_info->FileNameLength; 2219 } 2220 2221 if (name_len == 0) { 2222 *size = 0; 2223 err = 0; 2224 goto error; 2225 } 2226 2227 name_len /= sizeof(WCHAR); 2228 2229 /* check how much space we need */ 2230 addrlen = WideCharToMultiByte(CP_UTF8, 2231 0, 2232 name_buf, 2233 name_len, 2234 NULL, 2235 0, 2236 NULL, 2237 NULL); 2238 if (!addrlen) { 2239 *size = 0; 2240 err = uv_translate_sys_error(GetLastError()); 2241 goto error; 2242 } else if (pipe_prefix_len + addrlen >= *size) { 2243 /* "\\\\.\\pipe" + name */ 2244 *size = pipe_prefix_len + addrlen + 1; 2245 err = UV_ENOBUFS; 2246 goto error; 2247 } 2248 2249 memcpy(buffer, pipe_prefix, pipe_prefix_len); 2250 addrlen = WideCharToMultiByte(CP_UTF8, 2251 0, 2252 name_buf, 2253 name_len, 2254 buffer+pipe_prefix_len, 2255 *size-pipe_prefix_len, 2256 NULL, 2257 NULL); 2258 if (!addrlen) { 2259 *size = 0; 2260 err = uv_translate_sys_error(GetLastError()); 2261 goto error; 2262 } 2263 2264 addrlen += pipe_prefix_len; 2265 *size = addrlen; 2266 buffer[addrlen] = '\0'; 2267 2268 err = 0; 2269 2270 error: 2271 uv__free(name_info); 2272 2273 cleanup: 2274 return err; 2275 } 2276 2277 2278 int uv_pipe_pending_count(uv_pipe_t* handle) { 2279 if (!handle->ipc) 2280 return 0; 2281 return handle->pipe.conn.ipc_xfer_queue_length; 2282 } 2283 2284 2285 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) { 2286 if (handle->flags & UV_HANDLE_BOUND) 2287 return uv__pipe_getname(handle, buffer, size); 2288 2289 if (handle->flags & UV_HANDLE_CONNECTION || 2290 handle->handle != INVALID_HANDLE_VALUE) { 2291 *size = 0; 2292 return 0; 2293 } 2294 2295 return UV_EBADF; 2296 } 2297 2298 2299 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) { 2300 /* emulate unix behaviour */ 2301 if (handle->flags & UV_HANDLE_BOUND) 2302 return UV_ENOTCONN; 2303 2304 if (handle->handle != INVALID_HANDLE_VALUE) 2305 return uv__pipe_getname(handle, buffer, size); 2306 2307 return UV_EBADF; 2308 } 2309 2310 2311 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) { 2312 if (!handle->ipc) 2313 return UV_UNKNOWN_HANDLE; 2314 if (handle->pipe.conn.ipc_xfer_queue_length == 0) 2315 return UV_UNKNOWN_HANDLE; 2316 else 2317 return UV_TCP; 2318 } 2319 2320 int uv_pipe_chmod(uv_pipe_t* handle, int mode) { 2321 SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY }; 2322 PACL old_dacl, new_dacl; 2323 PSECURITY_DESCRIPTOR sd; 2324 EXPLICIT_ACCESS ea; 2325 PSID everyone; 2326 int error; 2327 2328 if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE) 2329 return UV_EBADF; 2330 2331 if (mode != UV_READABLE && 2332 mode != UV_WRITABLE && 2333 mode != (UV_WRITABLE | UV_READABLE)) 2334 return UV_EINVAL; 2335 2336 if (!AllocateAndInitializeSid(&sid_world, 2337 1, 2338 SECURITY_WORLD_RID, 2339 0, 0, 0, 0, 0, 0, 0, 2340 &everyone)) { 2341 error = GetLastError(); 2342 goto done; 2343 } 2344 2345 if (GetSecurityInfo(handle->handle, 2346 SE_KERNEL_OBJECT, 2347 DACL_SECURITY_INFORMATION, 2348 NULL, 2349 NULL, 2350 &old_dacl, 2351 NULL, 2352 &sd)) { 2353 error = GetLastError(); 2354 goto clean_sid; 2355 } 2356 2357 memset(&ea, 0, sizeof(EXPLICIT_ACCESS)); 2358 if (mode & UV_READABLE) 2359 ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES; 2360 if (mode & UV_WRITABLE) 2361 ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES; 2362 ea.grfAccessPermissions |= SYNCHRONIZE; 2363 ea.grfAccessMode = SET_ACCESS; 2364 ea.grfInheritance = NO_INHERITANCE; 2365 ea.Trustee.TrusteeForm = TRUSTEE_IS_SID; 2366 ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP; 2367 ea.Trustee.ptstrName = (LPTSTR)everyone; 2368 2369 if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) { 2370 error = GetLastError(); 2371 goto clean_sd; 2372 } 2373 2374 if (SetSecurityInfo(handle->handle, 2375 SE_KERNEL_OBJECT, 2376 DACL_SECURITY_INFORMATION, 2377 NULL, 2378 NULL, 2379 new_dacl, 2380 NULL)) { 2381 error = GetLastError(); 2382 goto clean_dacl; 2383 } 2384 2385 error = 0; 2386 2387 clean_dacl: 2388 LocalFree((HLOCAL) new_dacl); 2389 clean_sd: 2390 LocalFree((HLOCAL) sd); 2391 clean_sid: 2392 FreeSid(everyone); 2393 done: 2394 return uv_translate_sys_error(error); 2395 } 2396