//===---- SimpleRemoteEPCServer.cpp - EPC over simple abstract channel ----===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
8*d415bd75Srobert
9*d415bd75Srobert #include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h"
10*d415bd75Srobert
11*d415bd75Srobert #include "llvm/ExecutionEngine/Orc/Shared/TargetProcessControlTypes.h"
12*d415bd75Srobert #include "llvm/Support/FormatVariadic.h"
13*d415bd75Srobert #include "llvm/Support/Host.h"
14*d415bd75Srobert #include "llvm/Support/Process.h"
15*d415bd75Srobert
16*d415bd75Srobert #include "OrcRTBootstrap.h"
17*d415bd75Srobert
18*d415bd75Srobert #define DEBUG_TYPE "orc"
19*d415bd75Srobert
20*d415bd75Srobert using namespace llvm::orc::shared;
21*d415bd75Srobert
22*d415bd75Srobert namespace llvm {
23*d415bd75Srobert namespace orc {
24*d415bd75Srobert
// Out-of-line definition of the defaulted ExecutorBootstrapService destructor.
25*d415bd75Srobert ExecutorBootstrapService::~ExecutorBootstrapService() = default;
26*d415bd75Srobert
// Out-of-line definition of the defaulted Dispatcher destructor.
27*d415bd75Srobert SimpleRemoteEPCServer::Dispatcher::~Dispatcher() = default;
28*d415bd75Srobert
29*d415bd75Srobert #if LLVM_ENABLE_THREADS
dispatch(unique_function<void ()> Work)30*d415bd75Srobert void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
31*d415bd75Srobert unique_function<void()> Work) {
32*d415bd75Srobert {
33*d415bd75Srobert std::lock_guard<std::mutex> Lock(DispatchMutex);
34*d415bd75Srobert if (!Running)
35*d415bd75Srobert return;
36*d415bd75Srobert ++Outstanding;
37*d415bd75Srobert }
38*d415bd75Srobert
39*d415bd75Srobert std::thread([this, Work = std::move(Work)]() mutable {
40*d415bd75Srobert Work();
41*d415bd75Srobert std::lock_guard<std::mutex> Lock(DispatchMutex);
42*d415bd75Srobert --Outstanding;
43*d415bd75Srobert OutstandingCV.notify_all();
44*d415bd75Srobert }).detach();
45*d415bd75Srobert }
46*d415bd75Srobert
shutdown()47*d415bd75Srobert void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
48*d415bd75Srobert std::unique_lock<std::mutex> Lock(DispatchMutex);
49*d415bd75Srobert Running = false;
50*d415bd75Srobert OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; });
51*d415bd75Srobert }
52*d415bd75Srobert #endif
53*d415bd75Srobert
defaultBootstrapSymbols()54*d415bd75Srobert StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
55*d415bd75Srobert StringMap<ExecutorAddr> DBS;
56*d415bd75Srobert rt_bootstrap::addTo(DBS);
57*d415bd75Srobert return DBS;
58*d415bd75Srobert }
59*d415bd75Srobert
60*d415bd75Srobert Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
handleMessage(SimpleRemoteEPCOpcode OpC,uint64_t SeqNo,ExecutorAddr TagAddr,SimpleRemoteEPCArgBytesVector ArgBytes)61*d415bd75Srobert SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
62*d415bd75Srobert ExecutorAddr TagAddr,
63*d415bd75Srobert SimpleRemoteEPCArgBytesVector ArgBytes) {
64*d415bd75Srobert
65*d415bd75Srobert LLVM_DEBUG({
66*d415bd75Srobert dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = ";
67*d415bd75Srobert switch (OpC) {
68*d415bd75Srobert case SimpleRemoteEPCOpcode::Setup:
69*d415bd75Srobert dbgs() << "Setup";
70*d415bd75Srobert assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
71*d415bd75Srobert assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Setup?");
72*d415bd75Srobert break;
73*d415bd75Srobert case SimpleRemoteEPCOpcode::Hangup:
74*d415bd75Srobert dbgs() << "Hangup";
75*d415bd75Srobert assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
76*d415bd75Srobert assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Hangup?");
77*d415bd75Srobert break;
78*d415bd75Srobert case SimpleRemoteEPCOpcode::Result:
79*d415bd75Srobert dbgs() << "Result";
80*d415bd75Srobert assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Result?");
81*d415bd75Srobert break;
82*d415bd75Srobert case SimpleRemoteEPCOpcode::CallWrapper:
83*d415bd75Srobert dbgs() << "CallWrapper";
84*d415bd75Srobert break;
85*d415bd75Srobert }
86*d415bd75Srobert dbgs() << ", seqno = " << SeqNo
87*d415bd75Srobert << ", tag-addr = " << formatv("{0:x}", TagAddr.getValue())
88*d415bd75Srobert << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
89*d415bd75Srobert << " bytes\n";
90*d415bd75Srobert });
91*d415bd75Srobert
92*d415bd75Srobert using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
93*d415bd75Srobert if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
94*d415bd75Srobert return make_error<StringError>("Unexpected opcode",
95*d415bd75Srobert inconvertibleErrorCode());
96*d415bd75Srobert
97*d415bd75Srobert // TODO: Clean detach message?
98*d415bd75Srobert switch (OpC) {
99*d415bd75Srobert case SimpleRemoteEPCOpcode::Setup:
100*d415bd75Srobert return make_error<StringError>("Unexpected Setup opcode",
101*d415bd75Srobert inconvertibleErrorCode());
102*d415bd75Srobert case SimpleRemoteEPCOpcode::Hangup:
103*d415bd75Srobert return SimpleRemoteEPCTransportClient::EndSession;
104*d415bd75Srobert case SimpleRemoteEPCOpcode::Result:
105*d415bd75Srobert if (auto Err = handleResult(SeqNo, TagAddr, std::move(ArgBytes)))
106*d415bd75Srobert return std::move(Err);
107*d415bd75Srobert break;
108*d415bd75Srobert case SimpleRemoteEPCOpcode::CallWrapper:
109*d415bd75Srobert handleCallWrapper(SeqNo, TagAddr, std::move(ArgBytes));
110*d415bd75Srobert break;
111*d415bd75Srobert }
112*d415bd75Srobert return ContinueSession;
113*d415bd75Srobert }
114*d415bd75Srobert
waitForDisconnect()115*d415bd75Srobert Error SimpleRemoteEPCServer::waitForDisconnect() {
116*d415bd75Srobert std::unique_lock<std::mutex> Lock(ServerStateMutex);
117*d415bd75Srobert ShutdownCV.wait(Lock, [this]() { return RunState == ServerShutDown; });
118*d415bd75Srobert return std::move(ShutdownErr);
119*d415bd75Srobert }
120*d415bd75Srobert
handleDisconnect(Error Err)121*d415bd75Srobert void SimpleRemoteEPCServer::handleDisconnect(Error Err) {
122*d415bd75Srobert PendingJITDispatchResultsMap TmpPending;
123*d415bd75Srobert
124*d415bd75Srobert {
125*d415bd75Srobert std::lock_guard<std::mutex> Lock(ServerStateMutex);
126*d415bd75Srobert std::swap(TmpPending, PendingJITDispatchResults);
127*d415bd75Srobert RunState = ServerShuttingDown;
128*d415bd75Srobert }
129*d415bd75Srobert
130*d415bd75Srobert // Send out-of-band errors to any waiting threads.
131*d415bd75Srobert for (auto &KV : TmpPending)
132*d415bd75Srobert KV.second->set_value(
133*d415bd75Srobert shared::WrapperFunctionResult::createOutOfBandError("disconnecting"));
134*d415bd75Srobert
135*d415bd75Srobert // Wait for dispatcher to clear.
136*d415bd75Srobert D->shutdown();
137*d415bd75Srobert
138*d415bd75Srobert // Shut down services.
139*d415bd75Srobert while (!Services.empty()) {
140*d415bd75Srobert ShutdownErr =
141*d415bd75Srobert joinErrors(std::move(ShutdownErr), Services.back()->shutdown());
142*d415bd75Srobert Services.pop_back();
143*d415bd75Srobert }
144*d415bd75Srobert
145*d415bd75Srobert std::lock_guard<std::mutex> Lock(ServerStateMutex);
146*d415bd75Srobert ShutdownErr = joinErrors(std::move(ShutdownErr), std::move(Err));
147*d415bd75Srobert RunState = ServerShutDown;
148*d415bd75Srobert ShutdownCV.notify_all();
149*d415bd75Srobert }
150*d415bd75Srobert
sendMessage(SimpleRemoteEPCOpcode OpC,uint64_t SeqNo,ExecutorAddr TagAddr,ArrayRef<char> ArgBytes)151*d415bd75Srobert Error SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC,
152*d415bd75Srobert uint64_t SeqNo, ExecutorAddr TagAddr,
153*d415bd75Srobert ArrayRef<char> ArgBytes) {
154*d415bd75Srobert
155*d415bd75Srobert LLVM_DEBUG({
156*d415bd75Srobert dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = ";
157*d415bd75Srobert switch (OpC) {
158*d415bd75Srobert case SimpleRemoteEPCOpcode::Setup:
159*d415bd75Srobert dbgs() << "Setup";
160*d415bd75Srobert assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
161*d415bd75Srobert assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Setup?");
162*d415bd75Srobert break;
163*d415bd75Srobert case SimpleRemoteEPCOpcode::Hangup:
164*d415bd75Srobert dbgs() << "Hangup";
165*d415bd75Srobert assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
166*d415bd75Srobert assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Hangup?");
167*d415bd75Srobert break;
168*d415bd75Srobert case SimpleRemoteEPCOpcode::Result:
169*d415bd75Srobert dbgs() << "Result";
170*d415bd75Srobert assert(TagAddr.getValue() == 0 && "Non-zero TagAddr for Result?");
171*d415bd75Srobert break;
172*d415bd75Srobert case SimpleRemoteEPCOpcode::CallWrapper:
173*d415bd75Srobert dbgs() << "CallWrapper";
174*d415bd75Srobert break;
175*d415bd75Srobert }
176*d415bd75Srobert dbgs() << ", seqno = " << SeqNo
177*d415bd75Srobert << ", tag-addr = " << formatv("{0:x}", TagAddr.getValue())
178*d415bd75Srobert << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
179*d415bd75Srobert << " bytes\n";
180*d415bd75Srobert });
181*d415bd75Srobert auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
182*d415bd75Srobert LLVM_DEBUG({
183*d415bd75Srobert if (Err)
184*d415bd75Srobert dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n";
185*d415bd75Srobert });
186*d415bd75Srobert return Err;
187*d415bd75Srobert }
188*d415bd75Srobert
sendSetupMessage(StringMap<ExecutorAddr> BootstrapSymbols)189*d415bd75Srobert Error SimpleRemoteEPCServer::sendSetupMessage(
190*d415bd75Srobert StringMap<ExecutorAddr> BootstrapSymbols) {
191*d415bd75Srobert
192*d415bd75Srobert using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
193*d415bd75Srobert
194*d415bd75Srobert std::vector<char> SetupPacket;
195*d415bd75Srobert SimpleRemoteEPCExecutorInfo EI;
196*d415bd75Srobert EI.TargetTriple = sys::getProcessTriple();
197*d415bd75Srobert if (auto PageSize = sys::Process::getPageSize())
198*d415bd75Srobert EI.PageSize = *PageSize;
199*d415bd75Srobert else
200*d415bd75Srobert return PageSize.takeError();
201*d415bd75Srobert EI.BootstrapSymbols = std::move(BootstrapSymbols);
202*d415bd75Srobert
203*d415bd75Srobert assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) &&
204*d415bd75Srobert "Dispatch context name should not be set");
205*d415bd75Srobert assert(!EI.BootstrapSymbols.count(DispatchFnName) &&
206*d415bd75Srobert "Dispatch function name should not be set");
207*d415bd75Srobert EI.BootstrapSymbols[ExecutorSessionObjectName] = ExecutorAddr::fromPtr(this);
208*d415bd75Srobert EI.BootstrapSymbols[DispatchFnName] = ExecutorAddr::fromPtr(jitDispatchEntry);
209*d415bd75Srobert
210*d415bd75Srobert using SPSSerialize =
211*d415bd75Srobert shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
212*d415bd75Srobert auto SetupPacketBytes =
213*d415bd75Srobert shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI));
214*d415bd75Srobert shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size());
215*d415bd75Srobert if (!SPSSerialize::serialize(OB, EI))
216*d415bd75Srobert return make_error<StringError>("Could not send setup packet",
217*d415bd75Srobert inconvertibleErrorCode());
218*d415bd75Srobert
219*d415bd75Srobert return sendMessage(SimpleRemoteEPCOpcode::Setup, 0, ExecutorAddr(),
220*d415bd75Srobert {SetupPacketBytes.data(), SetupPacketBytes.size()});
221*d415bd75Srobert }
222*d415bd75Srobert
handleResult(uint64_t SeqNo,ExecutorAddr TagAddr,SimpleRemoteEPCArgBytesVector ArgBytes)223*d415bd75Srobert Error SimpleRemoteEPCServer::handleResult(
224*d415bd75Srobert uint64_t SeqNo, ExecutorAddr TagAddr,
225*d415bd75Srobert SimpleRemoteEPCArgBytesVector ArgBytes) {
226*d415bd75Srobert std::promise<shared::WrapperFunctionResult> *P = nullptr;
227*d415bd75Srobert {
228*d415bd75Srobert std::lock_guard<std::mutex> Lock(ServerStateMutex);
229*d415bd75Srobert auto I = PendingJITDispatchResults.find(SeqNo);
230*d415bd75Srobert if (I == PendingJITDispatchResults.end())
231*d415bd75Srobert return make_error<StringError>("No call for sequence number " +
232*d415bd75Srobert Twine(SeqNo),
233*d415bd75Srobert inconvertibleErrorCode());
234*d415bd75Srobert P = I->second;
235*d415bd75Srobert PendingJITDispatchResults.erase(I);
236*d415bd75Srobert releaseSeqNo(SeqNo);
237*d415bd75Srobert }
238*d415bd75Srobert auto R = shared::WrapperFunctionResult::allocate(ArgBytes.size());
239*d415bd75Srobert memcpy(R.data(), ArgBytes.data(), ArgBytes.size());
240*d415bd75Srobert P->set_value(std::move(R));
241*d415bd75Srobert return Error::success();
242*d415bd75Srobert }
243*d415bd75Srobert
handleCallWrapper(uint64_t RemoteSeqNo,ExecutorAddr TagAddr,SimpleRemoteEPCArgBytesVector ArgBytes)244*d415bd75Srobert void SimpleRemoteEPCServer::handleCallWrapper(
245*d415bd75Srobert uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
246*d415bd75Srobert SimpleRemoteEPCArgBytesVector ArgBytes) {
247*d415bd75Srobert D->dispatch([this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() {
248*d415bd75Srobert using WrapperFnTy =
249*d415bd75Srobert shared::CWrapperFunctionResult (*)(const char *, size_t);
250*d415bd75Srobert auto *Fn = TagAddr.toPtr<WrapperFnTy>();
251*d415bd75Srobert shared::WrapperFunctionResult ResultBytes(
252*d415bd75Srobert Fn(ArgBytes.data(), ArgBytes.size()));
253*d415bd75Srobert if (auto Err = sendMessage(SimpleRemoteEPCOpcode::Result, RemoteSeqNo,
254*d415bd75Srobert ExecutorAddr(),
255*d415bd75Srobert {ResultBytes.data(), ResultBytes.size()}))
256*d415bd75Srobert ReportError(std::move(Err));
257*d415bd75Srobert });
258*d415bd75Srobert }
259*d415bd75Srobert
260*d415bd75Srobert shared::WrapperFunctionResult
doJITDispatch(const void * FnTag,const char * ArgData,size_t ArgSize)261*d415bd75Srobert SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData,
262*d415bd75Srobert size_t ArgSize) {
263*d415bd75Srobert uint64_t SeqNo;
264*d415bd75Srobert std::promise<shared::WrapperFunctionResult> ResultP;
265*d415bd75Srobert auto ResultF = ResultP.get_future();
266*d415bd75Srobert {
267*d415bd75Srobert std::lock_guard<std::mutex> Lock(ServerStateMutex);
268*d415bd75Srobert if (RunState != ServerRunning)
269*d415bd75Srobert return shared::WrapperFunctionResult::createOutOfBandError(
270*d415bd75Srobert "jit_dispatch not available (EPC server shut down)");
271*d415bd75Srobert
272*d415bd75Srobert SeqNo = getNextSeqNo();
273*d415bd75Srobert assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use");
274*d415bd75Srobert PendingJITDispatchResults[SeqNo] = &ResultP;
275*d415bd75Srobert }
276*d415bd75Srobert
277*d415bd75Srobert if (auto Err = sendMessage(SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
278*d415bd75Srobert ExecutorAddr::fromPtr(FnTag), {ArgData, ArgSize}))
279*d415bd75Srobert ReportError(std::move(Err));
280*d415bd75Srobert
281*d415bd75Srobert return ResultF.get();
282*d415bd75Srobert }
283*d415bd75Srobert
284*d415bd75Srobert shared::CWrapperFunctionResult
jitDispatchEntry(void * DispatchCtx,const void * FnTag,const char * ArgData,size_t ArgSize)285*d415bd75Srobert SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag,
286*d415bd75Srobert const char *ArgData, size_t ArgSize) {
287*d415bd75Srobert return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx)
288*d415bd75Srobert ->doJITDispatch(FnTag, ArgData, ArgSize)
289*d415bd75Srobert .release();
290*d415bd75Srobert }
291*d415bd75Srobert
292*d415bd75Srobert } // end namespace orc
293*d415bd75Srobert } // end namespace llvm
294