1cb14a3feSDimitry Andric //===-- llvm/Support/raw_socket_stream.cpp - Socket streams --*- C++ -*-===// 2cb14a3feSDimitry Andric // 3cb14a3feSDimitry Andric // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4cb14a3feSDimitry Andric // See https://llvm.org/LICENSE.txt for license information. 5cb14a3feSDimitry Andric // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6cb14a3feSDimitry Andric // 7cb14a3feSDimitry Andric //===----------------------------------------------------------------------===// 8cb14a3feSDimitry Andric // 9cb14a3feSDimitry Andric // This file contains raw_ostream implementations for streams to communicate 10cb14a3feSDimitry Andric // via UNIX sockets 11cb14a3feSDimitry Andric // 12cb14a3feSDimitry Andric //===----------------------------------------------------------------------===// 13cb14a3feSDimitry Andric 14cb14a3feSDimitry Andric #include "llvm/Support/raw_socket_stream.h" 15cb14a3feSDimitry Andric #include "llvm/Config/config.h" 16cb14a3feSDimitry Andric #include "llvm/Support/Error.h" 17*0fca6ea1SDimitry Andric #include "llvm/Support/FileSystem.h" 18*0fca6ea1SDimitry Andric 19*0fca6ea1SDimitry Andric #include <atomic> 20*0fca6ea1SDimitry Andric #include <fcntl.h> 21*0fca6ea1SDimitry Andric #include <functional> 22*0fca6ea1SDimitry Andric #include <thread> 23cb14a3feSDimitry Andric 24cb14a3feSDimitry Andric #ifndef _WIN32 25*0fca6ea1SDimitry Andric #include <poll.h> 26cb14a3feSDimitry Andric #include <sys/socket.h> 27cb14a3feSDimitry Andric #include <sys/un.h> 28cb14a3feSDimitry Andric #else 29cb14a3feSDimitry Andric #include "llvm/Support/Windows/WindowsSupport.h" 30cb14a3feSDimitry Andric // winsock2.h must be included before afunix.h. Briefly turn off clang-format to 31cb14a3feSDimitry Andric // avoid error. 32cb14a3feSDimitry Andric // clang-format off 33cb14a3feSDimitry Andric #include <winsock2.h> 34cb14a3feSDimitry Andric #include <afunix.h> 35cb14a3feSDimitry Andric // clang-format on 36cb14a3feSDimitry Andric #include <io.h> 37cb14a3feSDimitry Andric #endif // _WIN32 38cb14a3feSDimitry Andric 39cb14a3feSDimitry Andric #if defined(HAVE_UNISTD_H) 40cb14a3feSDimitry Andric #include <unistd.h> 41cb14a3feSDimitry Andric #endif 42cb14a3feSDimitry Andric 43cb14a3feSDimitry Andric using namespace llvm; 44cb14a3feSDimitry Andric 45cb14a3feSDimitry Andric #ifdef _WIN32 46cb14a3feSDimitry Andric WSABalancer::WSABalancer() { 47cb14a3feSDimitry Andric WSADATA WsaData; 48cb14a3feSDimitry Andric ::memset(&WsaData, 0, sizeof(WsaData)); 49cb14a3feSDimitry Andric if (WSAStartup(MAKEWORD(2, 2), &WsaData) != 0) { 50cb14a3feSDimitry Andric llvm::report_fatal_error("WSAStartup failed"); 51cb14a3feSDimitry Andric } 52cb14a3feSDimitry Andric } 53cb14a3feSDimitry Andric 54cb14a3feSDimitry Andric WSABalancer::~WSABalancer() { WSACleanup(); } 55cb14a3feSDimitry Andric #endif // _WIN32 56cb14a3feSDimitry Andric 57cb14a3feSDimitry Andric static std::error_code getLastSocketErrorCode() { 58cb14a3feSDimitry Andric #ifdef _WIN32 59cb14a3feSDimitry Andric return std::error_code(::WSAGetLastError(), std::system_category()); 60cb14a3feSDimitry Andric #else 61*0fca6ea1SDimitry Andric return errnoAsErrorCode(); 62cb14a3feSDimitry Andric #endif 63cb14a3feSDimitry Andric } 64cb14a3feSDimitry Andric 65*0fca6ea1SDimitry Andric static sockaddr_un setSocketAddr(StringRef SocketPath) { 66cb14a3feSDimitry Andric struct sockaddr_un Addr; 67cb14a3feSDimitry Andric memset(&Addr, 0, sizeof(Addr)); 68cb14a3feSDimitry Andric Addr.sun_family = AF_UNIX; 69cb14a3feSDimitry Andric strncpy(Addr.sun_path, SocketPath.str().c_str(), sizeof(Addr.sun_path) - 1); 70*0fca6ea1SDimitry Andric return Addr; 71*0fca6ea1SDimitry Andric } 72cb14a3feSDimitry Andric 73*0fca6ea1SDimitry Andric static Expected<int> getSocketFD(StringRef SocketPath) { 74cb14a3feSDimitry Andric #ifdef _WIN32 75*0fca6ea1SDimitry Andric SOCKET Socket = socket(AF_UNIX, SOCK_STREAM, 0); 76*0fca6ea1SDimitry Andric if (Socket == INVALID_SOCKET) { 77cb14a3feSDimitry Andric #else 78*0fca6ea1SDimitry Andric int Socket = socket(AF_UNIX, SOCK_STREAM, 0); 79*0fca6ea1SDimitry Andric if (Socket == -1) { 80cb14a3feSDimitry Andric #endif // _WIN32 81cb14a3feSDimitry Andric return llvm::make_error<StringError>(getLastSocketErrorCode(), 82cb14a3feSDimitry Andric "Create socket failed"); 83cb14a3feSDimitry Andric } 84cb14a3feSDimitry Andric 85*0fca6ea1SDimitry Andric struct sockaddr_un Addr = setSocketAddr(SocketPath); 86*0fca6ea1SDimitry Andric if (::connect(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) 87cb14a3feSDimitry Andric return llvm::make_error<StringError>(getLastSocketErrorCode(), 88cb14a3feSDimitry Andric "Connect socket failed"); 89*0fca6ea1SDimitry Andric 90cb14a3feSDimitry Andric #ifdef _WIN32 91*0fca6ea1SDimitry Andric return _open_osfhandle(Socket, 0); 92cb14a3feSDimitry Andric #else 93*0fca6ea1SDimitry Andric return Socket; 94cb14a3feSDimitry Andric #endif // _WIN32 95cb14a3feSDimitry Andric } 96cb14a3feSDimitry Andric 97*0fca6ea1SDimitry Andric ListeningSocket::ListeningSocket(int SocketFD, StringRef SocketPath, 98*0fca6ea1SDimitry Andric int PipeFD[2]) 99*0fca6ea1SDimitry Andric : FD(SocketFD), SocketPath(SocketPath), PipeFD{PipeFD[0], PipeFD[1]} {} 100*0fca6ea1SDimitry Andric 101*0fca6ea1SDimitry Andric ListeningSocket::ListeningSocket(ListeningSocket &&LS) 102*0fca6ea1SDimitry Andric : FD(LS.FD.load()), SocketPath(LS.SocketPath), 103*0fca6ea1SDimitry Andric PipeFD{LS.PipeFD[0], LS.PipeFD[1]} { 104*0fca6ea1SDimitry Andric 105*0fca6ea1SDimitry Andric LS.FD = -1; 106*0fca6ea1SDimitry Andric LS.SocketPath.clear(); 107*0fca6ea1SDimitry Andric LS.PipeFD[0] = -1; 108*0fca6ea1SDimitry Andric LS.PipeFD[1] = -1; 109*0fca6ea1SDimitry Andric } 110*0fca6ea1SDimitry Andric 111*0fca6ea1SDimitry Andric Expected<ListeningSocket> ListeningSocket::createUnix(StringRef SocketPath, 112*0fca6ea1SDimitry Andric int MaxBacklog) { 113*0fca6ea1SDimitry Andric 114*0fca6ea1SDimitry Andric // Handle instances where the target socket address already exists and 115*0fca6ea1SDimitry Andric // differentiate between a preexisting file with and without a bound socket 116*0fca6ea1SDimitry Andric // 117*0fca6ea1SDimitry Andric // ::bind will return std::errc:address_in_use if a file at the socket address 118*0fca6ea1SDimitry Andric // already exists (e.g., the file was not properly unlinked due to a crash) 119*0fca6ea1SDimitry Andric // even if another socket has not yet binded to that address 120*0fca6ea1SDimitry Andric if (llvm::sys::fs::exists(SocketPath)) { 121*0fca6ea1SDimitry Andric Expected<int> MaybeFD = getSocketFD(SocketPath); 122*0fca6ea1SDimitry Andric if (!MaybeFD) { 123*0fca6ea1SDimitry Andric 124*0fca6ea1SDimitry Andric // Regardless of the error, notify the caller that a file already exists 125*0fca6ea1SDimitry Andric // at the desired socket address and that there is no bound socket at that 126*0fca6ea1SDimitry Andric // address. The file must be removed before ::bind can use the address 127*0fca6ea1SDimitry Andric consumeError(MaybeFD.takeError()); 128*0fca6ea1SDimitry Andric return llvm::make_error<StringError>( 129*0fca6ea1SDimitry Andric std::make_error_code(std::errc::file_exists), 130*0fca6ea1SDimitry Andric "Socket address unavailable"); 131*0fca6ea1SDimitry Andric } 132*0fca6ea1SDimitry Andric ::close(std::move(*MaybeFD)); 133*0fca6ea1SDimitry Andric 134*0fca6ea1SDimitry Andric // Notify caller that the provided socket address already has a bound socket 135*0fca6ea1SDimitry Andric return llvm::make_error<StringError>( 136*0fca6ea1SDimitry Andric std::make_error_code(std::errc::address_in_use), 137*0fca6ea1SDimitry Andric "Socket address unavailable"); 138*0fca6ea1SDimitry Andric } 139*0fca6ea1SDimitry Andric 140*0fca6ea1SDimitry Andric #ifdef _WIN32 141*0fca6ea1SDimitry Andric WSABalancer _; 142*0fca6ea1SDimitry Andric SOCKET Socket = socket(AF_UNIX, SOCK_STREAM, 0); 143*0fca6ea1SDimitry Andric if (Socket == INVALID_SOCKET) 144*0fca6ea1SDimitry Andric #else 145*0fca6ea1SDimitry Andric int Socket = socket(AF_UNIX, SOCK_STREAM, 0); 146*0fca6ea1SDimitry Andric if (Socket == -1) 147*0fca6ea1SDimitry Andric #endif 148*0fca6ea1SDimitry Andric return llvm::make_error<StringError>(getLastSocketErrorCode(), 149*0fca6ea1SDimitry Andric "socket create failed"); 150*0fca6ea1SDimitry Andric 151*0fca6ea1SDimitry Andric struct sockaddr_un Addr = setSocketAddr(SocketPath); 152*0fca6ea1SDimitry Andric if (::bind(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) { 153*0fca6ea1SDimitry Andric // Grab error code from call to ::bind before calling ::close 154*0fca6ea1SDimitry Andric std::error_code EC = getLastSocketErrorCode(); 155*0fca6ea1SDimitry Andric ::close(Socket); 156*0fca6ea1SDimitry Andric return llvm::make_error<StringError>(EC, "Bind error"); 157*0fca6ea1SDimitry Andric } 158*0fca6ea1SDimitry Andric 159*0fca6ea1SDimitry Andric // Mark socket as passive so incoming connections can be accepted 160*0fca6ea1SDimitry Andric if (::listen(Socket, MaxBacklog) == -1) 161*0fca6ea1SDimitry Andric return llvm::make_error<StringError>(getLastSocketErrorCode(), 162*0fca6ea1SDimitry Andric "Listen error"); 163*0fca6ea1SDimitry Andric 164*0fca6ea1SDimitry Andric int PipeFD[2]; 165*0fca6ea1SDimitry Andric #ifdef _WIN32 166*0fca6ea1SDimitry Andric // Reserve 1 byte for the pipe and use default textmode 167*0fca6ea1SDimitry Andric if (::_pipe(PipeFD, 1, 0) == -1) 168*0fca6ea1SDimitry Andric #else 169*0fca6ea1SDimitry Andric if (::pipe(PipeFD) == -1) 170*0fca6ea1SDimitry Andric #endif // _WIN32 171*0fca6ea1SDimitry Andric return llvm::make_error<StringError>(getLastSocketErrorCode(), 172*0fca6ea1SDimitry Andric "pipe failed"); 173*0fca6ea1SDimitry Andric 174*0fca6ea1SDimitry Andric #ifdef _WIN32 175*0fca6ea1SDimitry Andric return ListeningSocket{_open_osfhandle(Socket, 0), SocketPath, PipeFD}; 176*0fca6ea1SDimitry Andric #else 177*0fca6ea1SDimitry Andric return ListeningSocket{Socket, SocketPath, PipeFD}; 178*0fca6ea1SDimitry Andric #endif // _WIN32 179*0fca6ea1SDimitry Andric } 180*0fca6ea1SDimitry Andric 181*0fca6ea1SDimitry Andric // If a file descriptor being monitored by ::poll is closed by another thread, 182*0fca6ea1SDimitry Andric // the result is unspecified. In the case ::poll does not unblock and return, 183*0fca6ea1SDimitry Andric // when ActiveFD is closed, you can provide another file descriptor via CancelFD 184*0fca6ea1SDimitry Andric // that when written to will cause poll to return. Typically CancelFD is the 185*0fca6ea1SDimitry Andric // read end of a unidirectional pipe. 186*0fca6ea1SDimitry Andric // 187*0fca6ea1SDimitry Andric // Timeout should be -1 to block indefinitly 188*0fca6ea1SDimitry Andric // 189*0fca6ea1SDimitry Andric // getActiveFD is a callback to handle ActiveFD's of std::atomic<int> and int 190*0fca6ea1SDimitry Andric static std::error_code 191*0fca6ea1SDimitry Andric manageTimeout(const std::chrono::milliseconds &Timeout, 192*0fca6ea1SDimitry Andric const std::function<int()> &getActiveFD, 193*0fca6ea1SDimitry Andric const std::optional<int> &CancelFD = std::nullopt) { 194*0fca6ea1SDimitry Andric struct pollfd FD[2]; 195*0fca6ea1SDimitry Andric FD[0].events = POLLIN; 196*0fca6ea1SDimitry Andric #ifdef _WIN32 197*0fca6ea1SDimitry Andric SOCKET WinServerSock = _get_osfhandle(getActiveFD()); 198*0fca6ea1SDimitry Andric FD[0].fd = WinServerSock; 199*0fca6ea1SDimitry Andric #else 200*0fca6ea1SDimitry Andric FD[0].fd = getActiveFD(); 201*0fca6ea1SDimitry Andric #endif 202*0fca6ea1SDimitry Andric uint8_t FDCount = 1; 203*0fca6ea1SDimitry Andric if (CancelFD.has_value()) { 204*0fca6ea1SDimitry Andric FD[1].events = POLLIN; 205*0fca6ea1SDimitry Andric FD[1].fd = CancelFD.value(); 206*0fca6ea1SDimitry Andric FDCount++; 207*0fca6ea1SDimitry Andric } 208*0fca6ea1SDimitry Andric 209*0fca6ea1SDimitry Andric // Keep track of how much time has passed in case ::poll or WSAPoll are 210*0fca6ea1SDimitry Andric // interupted by a signal and need to be recalled 211*0fca6ea1SDimitry Andric auto Start = std::chrono::steady_clock::now(); 212*0fca6ea1SDimitry Andric auto RemainingTimeout = Timeout; 213*0fca6ea1SDimitry Andric int PollStatus = 0; 214*0fca6ea1SDimitry Andric do { 215*0fca6ea1SDimitry Andric // If Timeout is -1 then poll should block and RemainingTimeout does not 216*0fca6ea1SDimitry Andric // need to be recalculated 217*0fca6ea1SDimitry Andric if (PollStatus != 0 && Timeout != std::chrono::milliseconds(-1)) { 218*0fca6ea1SDimitry Andric auto TotalElapsedTime = 219*0fca6ea1SDimitry Andric std::chrono::duration_cast<std::chrono::milliseconds>( 220*0fca6ea1SDimitry Andric std::chrono::steady_clock::now() - Start); 221*0fca6ea1SDimitry Andric 222*0fca6ea1SDimitry Andric if (TotalElapsedTime >= Timeout) 223*0fca6ea1SDimitry Andric return std::make_error_code(std::errc::operation_would_block); 224*0fca6ea1SDimitry Andric 225*0fca6ea1SDimitry Andric RemainingTimeout = Timeout - TotalElapsedTime; 226*0fca6ea1SDimitry Andric } 227*0fca6ea1SDimitry Andric #ifdef _WIN32 228*0fca6ea1SDimitry Andric PollStatus = WSAPoll(FD, FDCount, RemainingTimeout.count()); 229*0fca6ea1SDimitry Andric } while (PollStatus == SOCKET_ERROR && 230*0fca6ea1SDimitry Andric getLastSocketErrorCode() == std::errc::interrupted); 231*0fca6ea1SDimitry Andric #else 232*0fca6ea1SDimitry Andric PollStatus = ::poll(FD, FDCount, RemainingTimeout.count()); 233*0fca6ea1SDimitry Andric } while (PollStatus == -1 && 234*0fca6ea1SDimitry Andric getLastSocketErrorCode() == std::errc::interrupted); 235*0fca6ea1SDimitry Andric #endif 236*0fca6ea1SDimitry Andric 237*0fca6ea1SDimitry Andric // If ActiveFD equals -1 or CancelFD has data to be read then the operation 238*0fca6ea1SDimitry Andric // has been canceled by another thread 239*0fca6ea1SDimitry Andric if (getActiveFD() == -1 || (CancelFD.has_value() && FD[1].revents & POLLIN)) 240*0fca6ea1SDimitry Andric return std::make_error_code(std::errc::operation_canceled); 241*0fca6ea1SDimitry Andric #if _WIN32 242*0fca6ea1SDimitry Andric if (PollStatus == SOCKET_ERROR) 243*0fca6ea1SDimitry Andric #else 244*0fca6ea1SDimitry Andric if (PollStatus == -1) 245*0fca6ea1SDimitry Andric #endif 246*0fca6ea1SDimitry Andric return getLastSocketErrorCode(); 247*0fca6ea1SDimitry Andric if (PollStatus == 0) 248*0fca6ea1SDimitry Andric return std::make_error_code(std::errc::timed_out); 249*0fca6ea1SDimitry Andric if (FD[0].revents & POLLNVAL) 250*0fca6ea1SDimitry Andric return std::make_error_code(std::errc::bad_file_descriptor); 251*0fca6ea1SDimitry Andric return std::error_code(); 252*0fca6ea1SDimitry Andric } 253*0fca6ea1SDimitry Andric 254*0fca6ea1SDimitry Andric Expected<std::unique_ptr<raw_socket_stream>> 255*0fca6ea1SDimitry Andric ListeningSocket::accept(const std::chrono::milliseconds &Timeout) { 256*0fca6ea1SDimitry Andric auto getActiveFD = [this]() -> int { return FD; }; 257*0fca6ea1SDimitry Andric std::error_code TimeoutErr = manageTimeout(Timeout, getActiveFD, PipeFD[0]); 258*0fca6ea1SDimitry Andric if (TimeoutErr) 259*0fca6ea1SDimitry Andric return llvm::make_error<StringError>(TimeoutErr, "Timeout error"); 260*0fca6ea1SDimitry Andric 261*0fca6ea1SDimitry Andric int AcceptFD; 262*0fca6ea1SDimitry Andric #ifdef _WIN32 263*0fca6ea1SDimitry Andric SOCKET WinAcceptSock = ::accept(_get_osfhandle(FD), NULL, NULL); 264*0fca6ea1SDimitry Andric AcceptFD = _open_osfhandle(WinAcceptSock, 0); 265*0fca6ea1SDimitry Andric #else 266*0fca6ea1SDimitry Andric AcceptFD = ::accept(FD, NULL, NULL); 267*0fca6ea1SDimitry Andric #endif 268*0fca6ea1SDimitry Andric 269*0fca6ea1SDimitry Andric if (AcceptFD == -1) 270*0fca6ea1SDimitry Andric return llvm::make_error<StringError>(getLastSocketErrorCode(), 271*0fca6ea1SDimitry Andric "Socket accept failed"); 272*0fca6ea1SDimitry Andric return std::make_unique<raw_socket_stream>(AcceptFD); 273*0fca6ea1SDimitry Andric } 274*0fca6ea1SDimitry Andric 275*0fca6ea1SDimitry Andric void ListeningSocket::shutdown() { 276*0fca6ea1SDimitry Andric int ObservedFD = FD.load(); 277*0fca6ea1SDimitry Andric 278*0fca6ea1SDimitry Andric if (ObservedFD == -1) 279*0fca6ea1SDimitry Andric return; 280*0fca6ea1SDimitry Andric 281*0fca6ea1SDimitry Andric // If FD equals ObservedFD set FD to -1; If FD doesn't equal ObservedFD then 282*0fca6ea1SDimitry Andric // another thread is responsible for shutdown so return 283*0fca6ea1SDimitry Andric if (!FD.compare_exchange_strong(ObservedFD, -1)) 284*0fca6ea1SDimitry Andric return; 285*0fca6ea1SDimitry Andric 286*0fca6ea1SDimitry Andric ::close(ObservedFD); 287*0fca6ea1SDimitry Andric ::unlink(SocketPath.c_str()); 288*0fca6ea1SDimitry Andric 289*0fca6ea1SDimitry Andric // Ensure ::poll returns if shutdown is called by a separate thread 290*0fca6ea1SDimitry Andric char Byte = 'A'; 291*0fca6ea1SDimitry Andric ssize_t written = ::write(PipeFD[1], &Byte, 1); 292*0fca6ea1SDimitry Andric 293*0fca6ea1SDimitry Andric // Ignore any write() error 294*0fca6ea1SDimitry Andric (void)written; 295*0fca6ea1SDimitry Andric } 296*0fca6ea1SDimitry Andric 297*0fca6ea1SDimitry Andric ListeningSocket::~ListeningSocket() { 298*0fca6ea1SDimitry Andric shutdown(); 299*0fca6ea1SDimitry Andric 300*0fca6ea1SDimitry Andric // Close the pipe's FDs in the destructor instead of within 301*0fca6ea1SDimitry Andric // ListeningSocket::shutdown to avoid unnecessary synchronization issues that 302*0fca6ea1SDimitry Andric // would occur as PipeFD's values would have to be changed to -1 303*0fca6ea1SDimitry Andric // 304*0fca6ea1SDimitry Andric // The move constructor sets PipeFD to -1 305*0fca6ea1SDimitry Andric if (PipeFD[0] != -1) 306*0fca6ea1SDimitry Andric ::close(PipeFD[0]); 307*0fca6ea1SDimitry Andric if (PipeFD[1] != -1) 308*0fca6ea1SDimitry Andric ::close(PipeFD[1]); 309*0fca6ea1SDimitry Andric } 310*0fca6ea1SDimitry Andric 311*0fca6ea1SDimitry Andric //===----------------------------------------------------------------------===// 312*0fca6ea1SDimitry Andric // raw_socket_stream 313*0fca6ea1SDimitry Andric //===----------------------------------------------------------------------===// 314*0fca6ea1SDimitry Andric 315cb14a3feSDimitry Andric raw_socket_stream::raw_socket_stream(int SocketFD) 316cb14a3feSDimitry Andric : raw_fd_stream(SocketFD, true) {} 317cb14a3feSDimitry Andric 318*0fca6ea1SDimitry Andric raw_socket_stream::~raw_socket_stream() {} 319*0fca6ea1SDimitry Andric 320cb14a3feSDimitry Andric Expected<std::unique_ptr<raw_socket_stream>> 321cb14a3feSDimitry Andric raw_socket_stream::createConnectedUnix(StringRef SocketPath) { 322cb14a3feSDimitry Andric #ifdef _WIN32 323cb14a3feSDimitry Andric WSABalancer _; 324cb14a3feSDimitry Andric #endif // _WIN32 325*0fca6ea1SDimitry Andric Expected<int> FD = getSocketFD(SocketPath); 326cb14a3feSDimitry Andric if (!FD) 327cb14a3feSDimitry Andric return FD.takeError(); 328cb14a3feSDimitry Andric return std::make_unique<raw_socket_stream>(*FD); 329cb14a3feSDimitry Andric } 330cb14a3feSDimitry Andric 331*0fca6ea1SDimitry Andric ssize_t raw_socket_stream::read(char *Ptr, size_t Size, 332*0fca6ea1SDimitry Andric const std::chrono::milliseconds &Timeout) { 333*0fca6ea1SDimitry Andric auto getActiveFD = [this]() -> int { return this->get_fd(); }; 334*0fca6ea1SDimitry Andric std::error_code Err = manageTimeout(Timeout, getActiveFD); 335*0fca6ea1SDimitry Andric // Mimic raw_fd_stream::read error handling behavior 336*0fca6ea1SDimitry Andric if (Err) { 337*0fca6ea1SDimitry Andric raw_fd_stream::error_detected(Err); 338*0fca6ea1SDimitry Andric return -1; 339*0fca6ea1SDimitry Andric } 340*0fca6ea1SDimitry Andric return raw_fd_stream::read(Ptr, Size); 341cb14a3feSDimitry Andric } 342