xref: /llvm-project/llvm/lib/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.cpp (revision 1f4d91ecb8529678a3d3919d7523743bd21942ca)
1 //===- SimpleRemoteEPCServer.cpp - EPC over simple abstract channel -------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h"
10 
11 #include "llvm/ExecutionEngine/Orc/Shared/OrcRTBridge.h"
12 #include "llvm/ExecutionEngine/Orc/TargetProcess/RegisterEHFrames.h"
13 #include "llvm/Support/FormatVariadic.h"
14 #include "llvm/Support/Process.h"
15 #include "llvm/TargetParser/Host.h"
16 
17 #include "OrcRTBootstrap.h"
18 
19 #define DEBUG_TYPE "orc"
20 
21 using namespace llvm::orc::shared;
22 
23 namespace llvm {
24 namespace orc {
25 
26 ExecutorBootstrapService::~ExecutorBootstrapService() = default;
27 
28 SimpleRemoteEPCServer::Dispatcher::~Dispatcher() = default;
29 
30 #if LLVM_ENABLE_THREADS
31 void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
32     unique_function<void()> Work) {
33   {
34     std::lock_guard<std::mutex> Lock(DispatchMutex);
35     if (!Running)
36       return;
37     ++Outstanding;
38   }
39 
40   std::thread([this, Work = std::move(Work)]() mutable {
41     Work();
42     std::lock_guard<std::mutex> Lock(DispatchMutex);
43     --Outstanding;
44     OutstandingCV.notify_all();
45   }).detach();
46 }
47 
48 void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
49   std::unique_lock<std::mutex> Lock(DispatchMutex);
50   Running = false;
51   OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; });
52 }
53 #endif
54 
55 StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
56   StringMap<ExecutorAddr> DBS;
57   rt_bootstrap::addTo(DBS);
58   return DBS;
59 }
60 
61 Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
62 SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
63                                      ExecutorAddr TagAddr,
64                                      SimpleRemoteEPCArgBytesVector ArgBytes) {
65 
66   LLVM_DEBUG({
67     dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = ";
68     switch (OpC) {
69     case SimpleRemoteEPCOpcode::Setup:
70       dbgs() << "Setup";
71       assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
72       assert(!TagAddr && "Non-zero TagAddr for Setup?");
73       break;
74     case SimpleRemoteEPCOpcode::Hangup:
75       dbgs() << "Hangup";
76       assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
77       assert(!TagAddr && "Non-zero TagAddr for Hangup?");
78       break;
79     case SimpleRemoteEPCOpcode::Result:
80       dbgs() << "Result";
81       assert(!TagAddr && "Non-zero TagAddr for Result?");
82       break;
83     case SimpleRemoteEPCOpcode::CallWrapper:
84       dbgs() << "CallWrapper";
85       break;
86     }
87     dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
88            << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
89            << " bytes\n";
90   });
91 
92   using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
93   if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
94     return make_error<StringError>("Unexpected opcode",
95                                    inconvertibleErrorCode());
96 
97   // TODO: Clean detach message?
98   switch (OpC) {
99   case SimpleRemoteEPCOpcode::Setup:
100     return make_error<StringError>("Unexpected Setup opcode",
101                                    inconvertibleErrorCode());
102   case SimpleRemoteEPCOpcode::Hangup:
103     return SimpleRemoteEPCTransportClient::EndSession;
104   case SimpleRemoteEPCOpcode::Result:
105     if (auto Err = handleResult(SeqNo, TagAddr, std::move(ArgBytes)))
106       return std::move(Err);
107     break;
108   case SimpleRemoteEPCOpcode::CallWrapper:
109     handleCallWrapper(SeqNo, TagAddr, std::move(ArgBytes));
110     break;
111   }
112   return ContinueSession;
113 }
114 
115 Error SimpleRemoteEPCServer::waitForDisconnect() {
116   std::unique_lock<std::mutex> Lock(ServerStateMutex);
117   ShutdownCV.wait(Lock, [this]() { return RunState == ServerShutDown; });
118   return std::move(ShutdownErr);
119 }
120 
121 void SimpleRemoteEPCServer::handleDisconnect(Error Err) {
122   PendingJITDispatchResultsMap TmpPending;
123 
124   {
125     std::lock_guard<std::mutex> Lock(ServerStateMutex);
126     std::swap(TmpPending, PendingJITDispatchResults);
127     RunState = ServerShuttingDown;
128   }
129 
130   // Send out-of-band errors to any waiting threads.
131   for (auto &KV : TmpPending)
132     KV.second->set_value(
133         shared::WrapperFunctionResult::createOutOfBandError("disconnecting"));
134 
135   // Wait for dispatcher to clear.
136   D->shutdown();
137 
138   // Shut down services.
139   while (!Services.empty()) {
140     ShutdownErr =
141       joinErrors(std::move(ShutdownErr), Services.back()->shutdown());
142     Services.pop_back();
143   }
144 
145   std::lock_guard<std::mutex> Lock(ServerStateMutex);
146   ShutdownErr = joinErrors(std::move(ShutdownErr), std::move(Err));
147   RunState = ServerShutDown;
148   ShutdownCV.notify_all();
149 }
150 
151 Error SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC,
152                                          uint64_t SeqNo, ExecutorAddr TagAddr,
153                                          ArrayRef<char> ArgBytes) {
154 
155   LLVM_DEBUG({
156     dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = ";
157     switch (OpC) {
158     case SimpleRemoteEPCOpcode::Setup:
159       dbgs() << "Setup";
160       assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
161       assert(!TagAddr && "Non-zero TagAddr for Setup?");
162       break;
163     case SimpleRemoteEPCOpcode::Hangup:
164       dbgs() << "Hangup";
165       assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
166       assert(!TagAddr && "Non-zero TagAddr for Hangup?");
167       break;
168     case SimpleRemoteEPCOpcode::Result:
169       dbgs() << "Result";
170       assert(!TagAddr && "Non-zero TagAddr for Result?");
171       break;
172     case SimpleRemoteEPCOpcode::CallWrapper:
173       dbgs() << "CallWrapper";
174       break;
175     }
176     dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
177            << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
178            << " bytes\n";
179   });
180   auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
181   LLVM_DEBUG({
182     if (Err)
183       dbgs() << "  \\--> SimpleRemoteEPC::sendMessage failed\n";
184   });
185   return Err;
186 }
187 
188 Error SimpleRemoteEPCServer::sendSetupMessage(
189     StringMap<std::vector<char>> BootstrapMap,
190     StringMap<ExecutorAddr> BootstrapSymbols) {
191 
192   using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
193 
194   std::vector<char> SetupPacket;
195   SimpleRemoteEPCExecutorInfo EI;
196   EI.TargetTriple = sys::getProcessTriple();
197   if (auto PageSize = sys::Process::getPageSize())
198     EI.PageSize = *PageSize;
199   else
200     return PageSize.takeError();
201   EI.BootstrapMap = std::move(BootstrapMap);
202   EI.BootstrapSymbols = std::move(BootstrapSymbols);
203 
204   assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) &&
205          "Dispatch context name should not be set");
206   assert(!EI.BootstrapSymbols.count(DispatchFnName) &&
207          "Dispatch function name should not be set");
208   EI.BootstrapSymbols[ExecutorSessionObjectName] = ExecutorAddr::fromPtr(this);
209   EI.BootstrapSymbols[DispatchFnName] = ExecutorAddr::fromPtr(jitDispatchEntry);
210   EI.BootstrapSymbols[rt::RegisterEHFrameSectionWrapperName] =
211       ExecutorAddr::fromPtr(&llvm_orc_registerEHFrameSectionWrapper);
212   EI.BootstrapSymbols[rt::DeregisterEHFrameSectionWrapperName] =
213       ExecutorAddr::fromPtr(&llvm_orc_deregisterEHFrameSectionWrapper);
214 
215   using SPSSerialize =
216       shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
217   auto SetupPacketBytes =
218       shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI));
219   shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size());
220   if (!SPSSerialize::serialize(OB, EI))
221     return make_error<StringError>("Could not send setup packet",
222                                    inconvertibleErrorCode());
223 
224   return sendMessage(SimpleRemoteEPCOpcode::Setup, 0, ExecutorAddr(),
225                      {SetupPacketBytes.data(), SetupPacketBytes.size()});
226 }
227 
228 Error SimpleRemoteEPCServer::handleResult(
229     uint64_t SeqNo, ExecutorAddr TagAddr,
230     SimpleRemoteEPCArgBytesVector ArgBytes) {
231   std::promise<shared::WrapperFunctionResult> *P = nullptr;
232   {
233     std::lock_guard<std::mutex> Lock(ServerStateMutex);
234     auto I = PendingJITDispatchResults.find(SeqNo);
235     if (I == PendingJITDispatchResults.end())
236       return make_error<StringError>("No call for sequence number " +
237                                          Twine(SeqNo),
238                                      inconvertibleErrorCode());
239     P = I->second;
240     PendingJITDispatchResults.erase(I);
241     releaseSeqNo(SeqNo);
242   }
243   auto R = shared::WrapperFunctionResult::allocate(ArgBytes.size());
244   memcpy(R.data(), ArgBytes.data(), ArgBytes.size());
245   P->set_value(std::move(R));
246   return Error::success();
247 }
248 
249 void SimpleRemoteEPCServer::handleCallWrapper(
250     uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
251     SimpleRemoteEPCArgBytesVector ArgBytes) {
252   D->dispatch([this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() {
253     using WrapperFnTy =
254         shared::CWrapperFunctionResult (*)(const char *, size_t);
255     auto *Fn = TagAddr.toPtr<WrapperFnTy>();
256     shared::WrapperFunctionResult ResultBytes(
257         Fn(ArgBytes.data(), ArgBytes.size()));
258     if (auto Err = sendMessage(SimpleRemoteEPCOpcode::Result, RemoteSeqNo,
259                                ExecutorAddr(),
260                                {ResultBytes.data(), ResultBytes.size()}))
261       ReportError(std::move(Err));
262   });
263 }
264 
265 shared::WrapperFunctionResult
266 SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData,
267                                      size_t ArgSize) {
268   uint64_t SeqNo;
269   std::promise<shared::WrapperFunctionResult> ResultP;
270   auto ResultF = ResultP.get_future();
271   {
272     std::lock_guard<std::mutex> Lock(ServerStateMutex);
273     if (RunState != ServerRunning)
274       return shared::WrapperFunctionResult::createOutOfBandError(
275           "jit_dispatch not available (EPC server shut down)");
276 
277     SeqNo = getNextSeqNo();
278     assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use");
279     PendingJITDispatchResults[SeqNo] = &ResultP;
280   }
281 
282   if (auto Err = sendMessage(SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
283                              ExecutorAddr::fromPtr(FnTag), {ArgData, ArgSize}))
284     ReportError(std::move(Err));
285 
286   return ResultF.get();
287 }
288 
289 shared::CWrapperFunctionResult
290 SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag,
291                                         const char *ArgData, size_t ArgSize) {
292   return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx)
293       ->doJITDispatch(FnTag, ArgData, ArgSize)
294       .release();
295 }
296 
297 } // end namespace orc
298 } // end namespace llvm
299