xref: /llvm-project/llvm/lib/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.cpp (revision a2c1cf09dfaaa6d2161fee00f8317005bf955d64)
1 //===- SimpleRemoteEPCServer.cpp - EPC over simple abstract channel -------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h"
10 
11 #include "llvm/ExecutionEngine/Orc/Shared/TargetProcessControlTypes.h"
12 #include "llvm/Support/FormatVariadic.h"
13 #include "llvm/Support/Host.h"
14 #include "llvm/Support/Process.h"
15 
16 #include "OrcRTBootstrap.h"
17 
18 #define DEBUG_TYPE "orc"
19 
20 using namespace llvm::orc::shared;
21 
22 namespace llvm {
23 namespace orc {
24 
25 ExecutorBootstrapService::~ExecutorBootstrapService() {}
26 
27 SimpleRemoteEPCServer::Dispatcher::~Dispatcher() {}
28 
29 #if LLVM_ENABLE_THREADS
30 void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
31     unique_function<void()> Work) {
32   {
33     std::lock_guard<std::mutex> Lock(DispatchMutex);
34     if (!Running)
35       return;
36     ++Outstanding;
37   }
38 
39   std::thread([this, Work = std::move(Work)]() mutable {
40     Work();
41     std::lock_guard<std::mutex> Lock(DispatchMutex);
42     --Outstanding;
43     OutstandingCV.notify_all();
44   }).detach();
45 }
46 
47 void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
48   std::unique_lock<std::mutex> Lock(DispatchMutex);
49   Running = false;
50   OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; });
51 }
52 #endif
53 
54 StringMap<ExecutorAddress> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
55   StringMap<ExecutorAddress> DBS;
56   rt_bootstrap::addTo(DBS);
57   return DBS;
58 }
59 
60 Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
61 SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
62                                      ExecutorAddress TagAddr,
63                                      SimpleRemoteEPCArgBytesVector ArgBytes) {
64   using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
65   if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
66     return make_error<StringError>("Unexpected opcode",
67                                    inconvertibleErrorCode());
68 
69   // TODO: Clean detach message?
70   switch (OpC) {
71   case SimpleRemoteEPCOpcode::Setup:
72     return make_error<StringError>("Unexpected Setup opcode",
73                                    inconvertibleErrorCode());
74   case SimpleRemoteEPCOpcode::Hangup:
75     return SimpleRemoteEPCTransportClient::EndSession;
76   case SimpleRemoteEPCOpcode::Result:
77     if (auto Err = handleResult(SeqNo, TagAddr, std::move(ArgBytes)))
78       return std::move(Err);
79     break;
80   case SimpleRemoteEPCOpcode::CallWrapper:
81     handleCallWrapper(SeqNo, TagAddr, std::move(ArgBytes));
82     break;
83   }
84   return ContinueSession;
85 }
86 
87 Error SimpleRemoteEPCServer::waitForDisconnect() {
88   std::unique_lock<std::mutex> Lock(ServerStateMutex);
89   ShutdownCV.wait(Lock, [this]() { return RunState == ServerShutDown; });
90   return std::move(ShutdownErr);
91 }
92 
93 void SimpleRemoteEPCServer::handleDisconnect(Error Err) {
94   PendingJITDispatchResultsMap TmpPending;
95 
96   {
97     std::lock_guard<std::mutex> Lock(ServerStateMutex);
98     std::swap(TmpPending, PendingJITDispatchResults);
99     RunState = ServerShuttingDown;
100   }
101 
102   // Send out-of-band errors to any waiting threads.
103   for (auto &KV : TmpPending)
104     KV.second->set_value(
105         shared::WrapperFunctionResult::createOutOfBandError("disconnecting"));
106 
107   // TODO: Free attached resources.
108   // 1. Close libraries in DylibHandles.
109 
110   // Wait for dispatcher to clear.
111   D->shutdown();
112 
113   // Shut down services.
114   while (!Services.empty()) {
115     ShutdownErr =
116       joinErrors(std::move(ShutdownErr), Services.back()->shutdown());
117     Services.pop_back();
118   }
119 
120   std::lock_guard<std::mutex> Lock(ServerStateMutex);
121   ShutdownErr = joinErrors(std::move(ShutdownErr), std::move(Err));
122   RunState = ServerShutDown;
123   ShutdownCV.notify_all();
124 }
125 
126 Error SimpleRemoteEPCServer::sendSetupMessage(
127     StringMap<ExecutorAddress> BootstrapSymbols) {
128 
129   using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
130 
131   std::vector<char> SetupPacket;
132   SimpleRemoteEPCExecutorInfo EI;
133   EI.TargetTriple = sys::getProcessTriple();
134   if (auto PageSize = sys::Process::getPageSize())
135     EI.PageSize = *PageSize;
136   else
137     return PageSize.takeError();
138   EI.BootstrapSymbols = std::move(BootstrapSymbols);
139 
140   assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) &&
141          "Dispatch context name should not be set");
142   assert(!EI.BootstrapSymbols.count(DispatchFnName) &&
143          "Dispatch function name should not be set");
144   EI.BootstrapSymbols[ExecutorSessionObjectName] =
145       ExecutorAddress::fromPtr(this);
146   EI.BootstrapSymbols[DispatchFnName] =
147       ExecutorAddress::fromPtr(jitDispatchEntry);
148 
149   using SPSSerialize =
150       shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
151   auto SetupPacketBytes =
152       shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI));
153   shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size());
154   if (!SPSSerialize::serialize(OB, EI))
155     return make_error<StringError>("Could not send setup packet",
156                                    inconvertibleErrorCode());
157 
158   return T->sendMessage(SimpleRemoteEPCOpcode::Setup, 0, ExecutorAddress(),
159                         {SetupPacketBytes.data(), SetupPacketBytes.size()});
160 }
161 
162 Error SimpleRemoteEPCServer::handleResult(
163     uint64_t SeqNo, ExecutorAddress TagAddr,
164     SimpleRemoteEPCArgBytesVector ArgBytes) {
165   std::promise<shared::WrapperFunctionResult> *P = nullptr;
166   {
167     std::lock_guard<std::mutex> Lock(ServerStateMutex);
168     auto I = PendingJITDispatchResults.find(SeqNo);
169     if (I == PendingJITDispatchResults.end())
170       return make_error<StringError>("No call for sequence number " +
171                                          Twine(SeqNo),
172                                      inconvertibleErrorCode());
173     P = I->second;
174     PendingJITDispatchResults.erase(I);
175     releaseSeqNo(SeqNo);
176   }
177   auto R = shared::WrapperFunctionResult::allocate(ArgBytes.size());
178   memcpy(R.data(), ArgBytes.data(), ArgBytes.size());
179   P->set_value(std::move(R));
180   return Error::success();
181 }
182 
183 void SimpleRemoteEPCServer::handleCallWrapper(
184     uint64_t RemoteSeqNo, ExecutorAddress TagAddr,
185     SimpleRemoteEPCArgBytesVector ArgBytes) {
186   D->dispatch([this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() {
187     using WrapperFnTy =
188         shared::detail::CWrapperFunctionResult (*)(const char *, size_t);
189     auto *Fn = TagAddr.toPtr<WrapperFnTy>();
190     shared::WrapperFunctionResult ResultBytes(
191         Fn(ArgBytes.data(), ArgBytes.size()));
192     if (auto Err = T->sendMessage(SimpleRemoteEPCOpcode::Result, RemoteSeqNo,
193                                   ExecutorAddress(),
194                                   {ResultBytes.data(), ResultBytes.size()}))
195       ReportError(std::move(Err));
196   });
197 }
198 
199 shared::WrapperFunctionResult
200 SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData,
201                                      size_t ArgSize) {
202   uint64_t SeqNo;
203   std::promise<shared::WrapperFunctionResult> ResultP;
204   auto ResultF = ResultP.get_future();
205   {
206     std::lock_guard<std::mutex> Lock(ServerStateMutex);
207     if (RunState != ServerRunning)
208       return shared::WrapperFunctionResult::createOutOfBandError(
209           "jit_dispatch not available (EPC server shut down)");
210 
211     SeqNo = getNextSeqNo();
212     assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use");
213     PendingJITDispatchResults[SeqNo] = &ResultP;
214   }
215 
216   if (auto Err =
217           T->sendMessage(SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
218                          ExecutorAddress::fromPtr(FnTag), {ArgData, ArgSize}))
219     ReportError(std::move(Err));
220 
221   return ResultF.get();
222 }
223 
224 shared::detail::CWrapperFunctionResult
225 SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag,
226                                         const char *ArgData, size_t ArgSize) {
227   return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx)
228       ->doJITDispatch(FnTag, ArgData, ArgSize)
229       .release();
230 }
231 
232 } // end namespace orc
233 } // end namespace llvm
234