xref: /llvm-project/llvm/lib/ExecutionEngine/Orc/Shared/SimpleRemoteEPCUtils.cpp (revision 89e6a288674c9fae33aeb5448c7b1fe782b2bf53)
1 //===------ SimpleRemoteEPCUtils.cpp - Utils for Simple Remote EPC --------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 //
9 // Message definitions and other utilities for SimpleRemoteEPC and
10 // SimpleRemoteEPCServer.
11 //
12 //===----------------------------------------------------------------------===//
13 
14 #include "llvm/ExecutionEngine/Orc/Shared/SimpleRemoteEPCUtils.h"
15 #include "llvm/Config/llvm-config.h" // for LLVM_ENABLE_THREADS
16 #include "llvm/Support/Endian.h"
17 #include "llvm/Support/FormatVariadic.h"
18 
19 #if !defined(_MSC_VER) && !defined(__MINGW32__)
20 #include <unistd.h>
21 #else
22 #include <io.h>
23 #endif
24 
25 namespace {
26 
/// Byte layout of the fixed-size header that precedes every message on the
/// file-descriptor transport. The header consists of four consecutive 64-bit
/// fields; the constants below give the byte offset of each field and the
/// total header size.
struct FDMsgHeader {
  /// Width in bytes of a single header field.
  static constexpr unsigned FieldSize = sizeof(uint64_t);

  static constexpr unsigned MsgSizeOffset = 0 * FieldSize;
  static constexpr unsigned OpCOffset = 1 * FieldSize;
  static constexpr unsigned SeqNoOffset = 2 * FieldSize;
  static constexpr unsigned TagAddrOffset = 3 * FieldSize;

  /// Total size of the header in bytes.
  static constexpr unsigned Size = 4 * FieldSize;
};
34 
35 } // namespace
36 
37 namespace llvm {
38 namespace orc {
39 namespace SimpleRemoteEPCDefaultBootstrapSymbolNames {
40 
/// Default bootstrap symbol name for the executor session object.
const char *ExecutorSessionObjectName = "__llvm_orc_SimpleRemoteEPC_dispatch_ctx";

/// Default bootstrap symbol name for the dispatch function.
const char *DispatchFnName =
    "__llvm_orc_SimpleRemoteEPC_dispatch_fn";
44 
45 } // end namespace SimpleRemoteEPCDefaultBootstrapSymbolNames
46 
47 SimpleRemoteEPCTransportClient::~SimpleRemoteEPCTransportClient() = default;
48 SimpleRemoteEPCTransport::~SimpleRemoteEPCTransport() = default;
49 
50 Expected<std::unique_ptr<FDSimpleRemoteEPCTransport>>
51 FDSimpleRemoteEPCTransport::Create(SimpleRemoteEPCTransportClient &C, int InFD,
52                                    int OutFD) {
53 #if LLVM_ENABLE_THREADS
54   if (InFD == -1)
55     return make_error<StringError>("Invalid input file descriptor " +
56                                        Twine(InFD),
57                                    inconvertibleErrorCode());
58   if (OutFD == -1)
59     return make_error<StringError>("Invalid output file descriptor " +
60                                        Twine(OutFD),
61                                    inconvertibleErrorCode());
62   std::unique_ptr<FDSimpleRemoteEPCTransport> FDT(
63       new FDSimpleRemoteEPCTransport(C, InFD, OutFD));
64   return std::move(FDT);
65 #else
66   return make_error<StringError>("FD-based SimpleRemoteEPC transport requires "
67                                  "thread support, but llvm was built with "
68                                  "LLVM_ENABLE_THREADS=Off",
69                                  inconvertibleErrorCode());
70 #endif
71 }
72 
73 FDSimpleRemoteEPCTransport::~FDSimpleRemoteEPCTransport() {
74 #if LLVM_ENABLE_THREADS
75   ListenerThread.join();
76 #endif
77 }
78 
79 Error FDSimpleRemoteEPCTransport::start() {
80 #if LLVM_ENABLE_THREADS
81   ListenerThread = std::thread([this]() { listenLoop(); });
82   return Error::success();
83 #endif
84   llvm_unreachable("Should not be called with LLVM_ENABLE_THREADS=Off");
85 }
86 
87 Error FDSimpleRemoteEPCTransport::sendMessage(SimpleRemoteEPCOpcode OpC,
88                                               uint64_t SeqNo,
89                                               ExecutorAddr TagAddr,
90                                               ArrayRef<char> ArgBytes) {
91   char HeaderBuffer[FDMsgHeader::Size];
92 
93   *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::MsgSizeOffset)) =
94       FDMsgHeader::Size + ArgBytes.size();
95   *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::OpCOffset)) =
96       static_cast<uint64_t>(OpC);
97   *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::SeqNoOffset)) = SeqNo;
98   *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::TagAddrOffset)) =
99       TagAddr.getValue();
100 
101   std::lock_guard<std::mutex> Lock(M);
102   if (Disconnected)
103     return make_error<StringError>("FD-transport disconnected",
104                                    inconvertibleErrorCode());
105   if (int ErrNo = writeBytes(HeaderBuffer, FDMsgHeader::Size))
106     return errorCodeToError(std::error_code(ErrNo, std::generic_category()));
107   if (int ErrNo = writeBytes(ArgBytes.data(), ArgBytes.size()))
108     return errorCodeToError(std::error_code(ErrNo, std::generic_category()));
109   return Error::success();
110 }
111 
112 void FDSimpleRemoteEPCTransport::disconnect() {
113   if (Disconnected)
114     return; // Return if already disconnected.
115 
116   Disconnected = true;
117   bool CloseOutFD = InFD != OutFD;
118 
119   // Close InFD.
120   while (close(InFD) == -1) {
121     if (errno == EBADF)
122       break;
123   }
124 
125   // Close OutFD.
126   if (CloseOutFD) {
127     while (close(OutFD) == -1) {
128       if (errno == EBADF)
129         break;
130     }
131   }
132 }
133 
134 static Error makeUnexpectedEOFError() {
135   return make_error<StringError>("Unexpected end-of-file",
136                                  inconvertibleErrorCode());
137 }
138 
139 Error FDSimpleRemoteEPCTransport::readBytes(char *Dst, size_t Size,
140                                             bool *IsEOF) {
141   assert((Size == 0 || Dst) && "Attempt to read into null.");
142   ssize_t Completed = 0;
143   while (Completed < static_cast<ssize_t>(Size)) {
144     ssize_t Read = ::read(InFD, Dst + Completed, Size - Completed);
145     if (Read <= 0) {
146       auto ErrNo = errno;
147       if (Read == 0) {
148         if (Completed == 0 && IsEOF) {
149           *IsEOF = true;
150           return Error::success();
151         } else
152           return makeUnexpectedEOFError();
153       } else if (ErrNo == EAGAIN || ErrNo == EINTR)
154         continue;
155       else {
156         std::lock_guard<std::mutex> Lock(M);
157         if (Disconnected && IsEOF) { // disconnect called,  pretend this is EOF.
158           *IsEOF = true;
159           return Error::success();
160         }
161         return errorCodeToError(
162             std::error_code(ErrNo, std::generic_category()));
163       }
164     }
165     Completed += Read;
166   }
167   return Error::success();
168 }
169 
170 int FDSimpleRemoteEPCTransport::writeBytes(const char *Src, size_t Size) {
171   assert((Size == 0 || Src) && "Attempt to append from null.");
172   ssize_t Completed = 0;
173   while (Completed < static_cast<ssize_t>(Size)) {
174     ssize_t Written = ::write(OutFD, Src + Completed, Size - Completed);
175     if (Written < 0) {
176       auto ErrNo = errno;
177       if (ErrNo == EAGAIN || ErrNo == EINTR)
178         continue;
179       else
180         return ErrNo;
181     }
182     Completed += Written;
183   }
184   return 0;
185 }
186 
187 void FDSimpleRemoteEPCTransport::listenLoop() {
188   Error Err = Error::success();
189   do {
190 
191     char HeaderBuffer[FDMsgHeader::Size];
192     // Read the header buffer.
193     {
194       bool IsEOF = false;
195       if (auto Err2 = readBytes(HeaderBuffer, FDMsgHeader::Size, &IsEOF)) {
196         Err = joinErrors(std::move(Err), std::move(Err2));
197         break;
198       }
199       if (IsEOF)
200         break;
201     }
202 
203     // Decode header buffer.
204     uint64_t MsgSize;
205     SimpleRemoteEPCOpcode OpC;
206     uint64_t SeqNo;
207     ExecutorAddr TagAddr;
208 
209     MsgSize =
210         *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::MsgSizeOffset));
211     OpC = static_cast<SimpleRemoteEPCOpcode>(static_cast<uint64_t>(
212         *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::OpCOffset))));
213     SeqNo =
214         *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::SeqNoOffset));
215     TagAddr.setValue(
216         *((support::ulittle64_t *)(HeaderBuffer + FDMsgHeader::TagAddrOffset)));
217 
218     if (MsgSize < FDMsgHeader::Size) {
219       Err = joinErrors(std::move(Err),
220                        make_error<StringError>("Message size too small",
221                                                inconvertibleErrorCode()));
222       break;
223     }
224 
225     // Read the argument bytes.
226     SimpleRemoteEPCArgBytesVector ArgBytes;
227     ArgBytes.resize(MsgSize - FDMsgHeader::Size);
228     if (auto Err2 = readBytes(ArgBytes.data(), ArgBytes.size())) {
229       Err = joinErrors(std::move(Err), std::move(Err2));
230       break;
231     }
232 
233     if (auto Action = C.handleMessage(OpC, SeqNo, TagAddr, ArgBytes)) {
234       if (*Action == SimpleRemoteEPCTransportClient::EndSession)
235         break;
236     } else {
237       Err = joinErrors(std::move(Err), Action.takeError());
238       break;
239     }
240   } while (true);
241 
242   // Attempt to close FDs, set Disconnected to true so that subsequent
243   // sendMessage calls fail.
244   disconnect();
245 
246   // Call up to the client to handle the disconnection.
247   C.handleDisconnect(std::move(Err));
248 }
249 
250 } // end namespace orc
251 } // end namespace llvm
252