xref: /llvm-project/lldb/source/Core/ThreadedCommunication.cpp (revision a0dd90eb7dc318c9b3fccb9ba02e1e22fb073094)
19823d425SMichał Górny //===-- ThreadedCommunication.cpp -----------------------------------------===//
29823d425SMichał Górny //
39823d425SMichał Górny // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
49823d425SMichał Górny // See https://llvm.org/LICENSE.txt for license information.
59823d425SMichał Górny // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
69823d425SMichał Górny //
79823d425SMichał Górny //===----------------------------------------------------------------------===//
89823d425SMichał Górny 
99823d425SMichał Górny #include "lldb/Core/ThreadedCommunication.h"
109823d425SMichał Górny 
119823d425SMichał Górny #include "lldb/Host/ThreadLauncher.h"
129823d425SMichał Górny #include "lldb/Utility/Connection.h"
139823d425SMichał Górny #include "lldb/Utility/ConstString.h"
149823d425SMichał Górny #include "lldb/Utility/Event.h"
159823d425SMichał Górny #include "lldb/Utility/LLDBLog.h"
169823d425SMichał Górny #include "lldb/Utility/Listener.h"
179823d425SMichał Górny #include "lldb/Utility/Log.h"
189823d425SMichał Górny #include "lldb/Utility/Status.h"
199823d425SMichał Górny 
209823d425SMichał Górny #include "llvm/Support/Compiler.h"
219823d425SMichał Górny 
229823d425SMichał Górny #include <algorithm>
239823d425SMichał Górny #include <chrono>
249823d425SMichał Górny #include <cstring>
259823d425SMichał Górny #include <memory>
261a8d9a76SJonas Devlieghere #include <shared_mutex>
279823d425SMichał Górny 
289823d425SMichał Górny #include <cerrno>
299823d425SMichał Górny #include <cinttypes>
309823d425SMichał Górny #include <cstdio>
319823d425SMichał Górny 
329823d425SMichał Górny using namespace lldb;
339823d425SMichał Górny using namespace lldb_private;
349823d425SMichał Górny 
3557794835SAlex Langford llvm::StringRef ThreadedCommunication::GetStaticBroadcasterClass() {
3657794835SAlex Langford   static constexpr llvm::StringLiteral class_name("lldb.communication");
379823d425SMichał Górny   return class_name;
389823d425SMichał Górny }
399823d425SMichał Górny 
409823d425SMichał Górny ThreadedCommunication::ThreadedCommunication(const char *name)
419823d425SMichał Górny     : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false),
429823d425SMichał Górny       m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(),
439823d425SMichał Górny       m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) {
449823d425SMichał Górny   LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
459823d425SMichał Górny            "{0} ThreadedCommunication::ThreadedCommunication (name = {1})",
469823d425SMichał Górny            this, name);
479823d425SMichał Górny 
489823d425SMichał Górny   SetEventName(eBroadcastBitDisconnected, "disconnected");
499823d425SMichał Górny   SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
509823d425SMichał Górny   SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
519823d425SMichał Górny   SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
529823d425SMichał Górny   SetEventName(eBroadcastBitPacketAvailable, "packet available");
539823d425SMichał Górny   SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
549823d425SMichał Górny 
559823d425SMichał Górny   CheckInWithManager();
569823d425SMichał Górny }
579823d425SMichał Górny 
589823d425SMichał Górny ThreadedCommunication::~ThreadedCommunication() {
599823d425SMichał Górny   LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
609823d425SMichał Górny            "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})",
61867ee3b8SAlex Langford            this, GetBroadcasterName());
629823d425SMichał Górny }
639823d425SMichał Górny 
643b919570SAugusto Noronha void ThreadedCommunication::Clear() {
653b919570SAugusto Noronha   SetReadThreadBytesReceivedCallback(nullptr, nullptr);
663b919570SAugusto Noronha   StopReadThread(nullptr);
673b919570SAugusto Noronha   Communication::Clear();
683b919570SAugusto Noronha }
699823d425SMichał Górny 
709823d425SMichał Górny ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) {
719823d425SMichał Górny   assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
729823d425SMichał Górny          "Disconnecting while the read thread is running is racy!");
739823d425SMichał Górny   return Communication::Disconnect(error_ptr);
749823d425SMichał Górny }
759823d425SMichał Górny 
769823d425SMichał Górny size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
779823d425SMichał Górny                                    const Timeout<std::micro> &timeout,
789823d425SMichał Górny                                    ConnectionStatus &status,
799823d425SMichał Górny                                    Status *error_ptr) {
809823d425SMichał Górny   Log *log = GetLog(LLDBLog::Communication);
819823d425SMichał Górny   LLDB_LOG(
829823d425SMichał Górny       log,
839823d425SMichał Górny       "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
849823d425SMichał Górny       this, dst, dst_len, timeout, m_connection_sp.get());
859823d425SMichał Górny 
869823d425SMichał Górny   if (m_read_thread_enabled) {
879823d425SMichał Górny     // We have a dedicated read thread that is getting data for us
889823d425SMichał Górny     size_t cached_bytes = GetCachedBytes(dst, dst_len);
899823d425SMichał Górny     if (cached_bytes > 0) {
909823d425SMichał Górny       status = eConnectionStatusSuccess;
919823d425SMichał Górny       return cached_bytes;
929823d425SMichał Górny     }
939823d425SMichał Górny     if (timeout && timeout->count() == 0) {
949823d425SMichał Górny       if (error_ptr)
950642cd76SAdrian Prantl         *error_ptr = Status::FromErrorString("Timed out.");
969823d425SMichał Górny       status = eConnectionStatusTimedOut;
979823d425SMichał Górny       return 0;
989823d425SMichał Górny     }
999823d425SMichał Górny 
1009823d425SMichał Górny     if (!m_connection_sp) {
1019823d425SMichał Górny       if (error_ptr)
1020642cd76SAdrian Prantl         *error_ptr = Status::FromErrorString("Invalid connection.");
1039823d425SMichał Górny       status = eConnectionStatusNoConnection;
1049823d425SMichał Górny       return 0;
1059823d425SMichał Górny     }
1069823d425SMichał Górny 
10789a3691bSPavel Labath     // No data yet, we have to start listening.
1089823d425SMichał Górny     ListenerSP listener_sp(
1099823d425SMichał Górny         Listener::MakeListener("ThreadedCommunication::Read"));
1109823d425SMichał Górny     listener_sp->StartListeningForEvents(
1119823d425SMichał Górny         this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
11289a3691bSPavel Labath 
11389a3691bSPavel Labath     // Re-check for data, as it might have arrived while we were setting up our
11489a3691bSPavel Labath     // listener.
11589a3691bSPavel Labath     cached_bytes = GetCachedBytes(dst, dst_len);
11689a3691bSPavel Labath     if (cached_bytes > 0) {
11789a3691bSPavel Labath       status = eConnectionStatusSuccess;
11889a3691bSPavel Labath       return cached_bytes;
11989a3691bSPavel Labath     }
12089a3691bSPavel Labath 
1219823d425SMichał Górny     EventSP event_sp;
12289a3691bSPavel Labath     // Explicitly check for the thread exit, for the same reason.
12389a3691bSPavel Labath     if (m_read_thread_did_exit) {
12489a3691bSPavel Labath       // We've missed the event, lets just conjure one up.
12589a3691bSPavel Labath       event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit);
12689a3691bSPavel Labath     } else {
12789a3691bSPavel Labath       if (!listener_sp->GetEvent(event_sp, timeout)) {
12889a3691bSPavel Labath         if (error_ptr)
1290642cd76SAdrian Prantl           *error_ptr = Status::FromErrorString("Timed out.");
13089a3691bSPavel Labath         status = eConnectionStatusTimedOut;
13189a3691bSPavel Labath         return 0;
13289a3691bSPavel Labath       }
13389a3691bSPavel Labath     }
1349823d425SMichał Górny     const uint32_t event_type = event_sp->GetType();
1359823d425SMichał Górny     if (event_type & eBroadcastBitReadThreadGotBytes) {
1369823d425SMichał Górny       return GetCachedBytes(dst, dst_len);
1379823d425SMichał Górny     }
1389823d425SMichał Górny 
1399823d425SMichał Górny     if (event_type & eBroadcastBitReadThreadDidExit) {
1409823d425SMichał Górny       // If the thread exited of its own accord, it either means it
1419823d425SMichał Górny       // hit an end-of-file condition or an error.
1429823d425SMichał Górny       status = m_pass_status;
1439823d425SMichał Górny       if (error_ptr)
1449823d425SMichał Górny         *error_ptr = std::move(m_pass_error);
1459823d425SMichał Górny 
1469823d425SMichał Górny       if (GetCloseOnEOF())
1479823d425SMichał Górny         Disconnect(nullptr);
1489823d425SMichał Górny       return 0;
1499823d425SMichał Górny     }
15089a3691bSPavel Labath     llvm_unreachable("Got unexpected event type!");
1519823d425SMichał Górny   }
1529823d425SMichał Górny 
1539823d425SMichał Górny   // We aren't using a read thread, just read the data synchronously in this
1549823d425SMichał Górny   // thread.
1553b919570SAugusto Noronha   return Communication::Read(dst, dst_len, timeout, status, error_ptr);
1569823d425SMichał Górny }
1579823d425SMichał Górny 
1589823d425SMichał Górny bool ThreadedCommunication::StartReadThread(Status *error_ptr) {
1591a8d9a76SJonas Devlieghere   std::lock_guard<std::mutex> lock(m_read_thread_mutex);
1601a8d9a76SJonas Devlieghere 
1619823d425SMichał Górny   if (error_ptr)
1629823d425SMichał Górny     error_ptr->Clear();
1639823d425SMichał Górny 
1649823d425SMichał Górny   if (m_read_thread.IsJoinable())
1659823d425SMichał Górny     return true;
1669823d425SMichał Górny 
1679823d425SMichał Górny   LLDB_LOG(GetLog(LLDBLog::Communication),
1689823d425SMichał Górny            "{0} ThreadedCommunication::StartReadThread ()", this);
1699823d425SMichał Górny 
1709823d425SMichał Górny   const std::string thread_name =
1719823d425SMichał Górny       llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
1729823d425SMichał Górny 
1739823d425SMichał Górny   m_read_thread_enabled = true;
1749823d425SMichał Górny   m_read_thread_did_exit = false;
1759823d425SMichał Górny   auto maybe_thread = ThreadLauncher::LaunchThread(
1769823d425SMichał Górny       thread_name, [this] { return ReadThread(); });
1779823d425SMichał Górny   if (maybe_thread) {
1789823d425SMichał Górny     m_read_thread = *maybe_thread;
1799823d425SMichał Górny   } else {
1809823d425SMichał Górny     if (error_ptr)
181*a0dd90ebSAdrian Prantl       *error_ptr = Status::FromError(maybe_thread.takeError());
1829823d425SMichał Górny     else {
183520681e5SJonas Devlieghere       LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(),
184520681e5SJonas Devlieghere                      "failed to launch host thread: {0}");
1859823d425SMichał Górny     }
1869823d425SMichał Górny   }
1879823d425SMichał Górny 
1889823d425SMichał Górny   if (!m_read_thread.IsJoinable())
1899823d425SMichał Górny     m_read_thread_enabled = false;
1909823d425SMichał Górny 
1919823d425SMichał Górny   return m_read_thread_enabled;
1929823d425SMichał Górny }
1939823d425SMichał Górny 
1949823d425SMichał Górny bool ThreadedCommunication::StopReadThread(Status *error_ptr) {
1951a8d9a76SJonas Devlieghere   std::lock_guard<std::mutex> lock(m_read_thread_mutex);
1961a8d9a76SJonas Devlieghere 
1979823d425SMichał Górny   if (!m_read_thread.IsJoinable())
1989823d425SMichał Górny     return true;
1999823d425SMichał Górny 
2009823d425SMichał Górny   LLDB_LOG(GetLog(LLDBLog::Communication),
2019823d425SMichał Górny            "{0} ThreadedCommunication::StopReadThread ()", this);
2029823d425SMichał Górny 
2039823d425SMichał Górny   m_read_thread_enabled = false;
2049823d425SMichał Górny 
2059823d425SMichał Górny   BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
2069823d425SMichał Górny 
2079823d425SMichał Górny   Status error = m_read_thread.Join(nullptr);
2089823d425SMichał Górny   return error.Success();
2099823d425SMichał Górny }
2109823d425SMichał Górny 
2119823d425SMichał Górny bool ThreadedCommunication::JoinReadThread(Status *error_ptr) {
2121a8d9a76SJonas Devlieghere   std::lock_guard<std::mutex> lock(m_read_thread_mutex);
2131a8d9a76SJonas Devlieghere 
2149823d425SMichał Górny   if (!m_read_thread.IsJoinable())
2159823d425SMichał Górny     return true;
2169823d425SMichał Górny 
2179823d425SMichał Górny   Status error = m_read_thread.Join(nullptr);
2189823d425SMichał Górny   return error.Success();
2199823d425SMichał Górny }
2209823d425SMichał Górny 
2219823d425SMichał Górny size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) {
2229823d425SMichał Górny   std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
2239823d425SMichał Górny   if (!m_bytes.empty()) {
2249823d425SMichał Górny     // If DST is nullptr and we have a thread, then return the number of bytes
2259823d425SMichał Górny     // that are available so the caller can call again
2269823d425SMichał Górny     if (dst == nullptr)
2279823d425SMichał Górny       return m_bytes.size();
2289823d425SMichał Górny 
2299823d425SMichał Górny     const size_t len = std::min<size_t>(dst_len, m_bytes.size());
2309823d425SMichał Górny 
2319823d425SMichał Górny     ::memcpy(dst, m_bytes.c_str(), len);
2329823d425SMichał Górny     m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
2339823d425SMichał Górny 
2349823d425SMichał Górny     return len;
2359823d425SMichał Górny   }
2369823d425SMichał Górny   return 0;
2379823d425SMichał Górny }
2389823d425SMichał Górny 
2399823d425SMichał Górny void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len,
2409823d425SMichał Górny                                                bool broadcast,
2419823d425SMichał Górny                                                ConnectionStatus status) {
2429823d425SMichał Górny   LLDB_LOG(GetLog(LLDBLog::Communication),
2439823d425SMichał Górny            "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len "
2449823d425SMichał Górny            "= {2}, "
2459823d425SMichał Górny            "broadcast = {3})",
2469823d425SMichał Górny            this, bytes, (uint64_t)len, broadcast);
2479823d425SMichał Górny   if ((bytes == nullptr || len == 0) &&
2489823d425SMichał Górny       (status != lldb::eConnectionStatusEndOfFile))
2499823d425SMichał Górny     return;
2509823d425SMichał Górny   if (m_callback) {
2519823d425SMichał Górny     // If the user registered a callback, then call it and do not broadcast
2529823d425SMichał Górny     m_callback(m_callback_baton, bytes, len);
2539823d425SMichał Górny   } else if (bytes != nullptr && len > 0) {
2549823d425SMichał Górny     std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
2559823d425SMichał Górny     m_bytes.append((const char *)bytes, len);
2569823d425SMichał Górny     if (broadcast)
2579823d425SMichał Górny       BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
2589823d425SMichał Górny   }
2599823d425SMichał Górny }
2609823d425SMichał Górny 
2619823d425SMichał Górny bool ThreadedCommunication::ReadThreadIsRunning() {
2629823d425SMichał Górny   return m_read_thread_enabled;
2639823d425SMichał Górny }
2649823d425SMichał Górny 
2659823d425SMichał Górny lldb::thread_result_t ThreadedCommunication::ReadThread() {
2669823d425SMichał Górny   Log *log = GetLog(LLDBLog::Communication);
2679823d425SMichał Górny 
2689823d425SMichał Górny   LLDB_LOG(log, "Communication({0}) thread starting...", this);
2699823d425SMichał Górny 
2709823d425SMichał Górny   uint8_t buf[1024];
2719823d425SMichał Górny 
2729823d425SMichał Górny   Status error;
2739823d425SMichał Górny   ConnectionStatus status = eConnectionStatusSuccess;
2749823d425SMichał Górny   bool done = false;
2759823d425SMichał Górny   bool disconnect = false;
2769823d425SMichał Górny   while (!done && m_read_thread_enabled) {
2773b919570SAugusto Noronha     size_t bytes_read = ReadFromConnection(
2783b919570SAugusto Noronha         buf, sizeof(buf), std::chrono::seconds(5), status, &error);
2799823d425SMichał Górny     if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
2809823d425SMichał Górny       AppendBytesToCache(buf, bytes_read, true, status);
2819823d425SMichał Górny 
2829823d425SMichał Górny     switch (status) {
2839823d425SMichał Górny     case eConnectionStatusSuccess:
2849823d425SMichał Górny       break;
2859823d425SMichał Górny 
2869823d425SMichał Górny     case eConnectionStatusEndOfFile:
2879823d425SMichał Górny       done = true;
2889823d425SMichał Górny       disconnect = GetCloseOnEOF();
2899823d425SMichał Górny       break;
2909823d425SMichał Górny     case eConnectionStatusError: // Check GetError() for details
2919823d425SMichał Górny       if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
2929823d425SMichał Górny         // EIO on a pipe is usually caused by remote shutdown
2939823d425SMichał Górny         disconnect = GetCloseOnEOF();
2949823d425SMichał Górny         done = true;
2959823d425SMichał Górny       }
2969823d425SMichał Górny       if (error.Fail())
2979823d425SMichał Górny         LLDB_LOG(log, "error: {0}, status = {1}", error,
2989823d425SMichał Górny                  ThreadedCommunication::ConnectionStatusAsString(status));
2999823d425SMichał Górny       break;
3009823d425SMichał Górny     case eConnectionStatusInterrupted: // Synchronization signal from
3019823d425SMichał Górny                                        // SynchronizeWithReadThread()
3023b919570SAugusto Noronha       // The connection returns eConnectionStatusInterrupted only when there is
3033b919570SAugusto Noronha       // no input pending to be read, so we can signal that.
3049823d425SMichał Górny       BroadcastEvent(eBroadcastBitNoMorePendingInput);
3059823d425SMichał Górny       break;
3069823d425SMichał Górny     case eConnectionStatusNoConnection:   // No connection
3073b919570SAugusto Noronha     case eConnectionStatusLostConnection: // Lost connection while connected to
3083b919570SAugusto Noronha                                           // a valid connection
3099823d425SMichał Górny       done = true;
3109823d425SMichał Górny       [[fallthrough]];
3119823d425SMichał Górny     case eConnectionStatusTimedOut: // Request timed out
3129823d425SMichał Górny       if (error.Fail())
3139823d425SMichał Górny         LLDB_LOG(log, "error: {0}, status = {1}", error,
3149823d425SMichał Górny                  ThreadedCommunication::ConnectionStatusAsString(status));
3159823d425SMichał Górny       break;
3169823d425SMichał Górny     }
3179823d425SMichał Górny   }
3189823d425SMichał Górny   m_pass_status = status;
3199823d425SMichał Górny   m_pass_error = std::move(error);
3209823d425SMichał Górny   LLDB_LOG(log, "Communication({0}) thread exiting...", this);
3219823d425SMichał Górny 
32289a3691bSPavel Labath   // Start shutting down. We need to do this in a very specific order to ensure
32389a3691bSPavel Labath   // we don't race with threads wanting to read/synchronize with us.
32489a3691bSPavel Labath 
32589a3691bSPavel Labath   // First, we signal our intent to exit. This ensures no new thread start
32689a3691bSPavel Labath   // waiting on events from us.
3279823d425SMichał Górny   m_read_thread_did_exit = true;
3289823d425SMichał Górny 
3299823d425SMichał Górny   // Unblock any existing thread waiting for the synchronization signal.
3309823d425SMichał Górny   BroadcastEvent(eBroadcastBitNoMorePendingInput);
3319823d425SMichał Górny 
33289a3691bSPavel Labath   {
33389a3691bSPavel Labath     // Wait for the synchronization thread to finish...
3349823d425SMichał Górny     std::lock_guard<std::mutex> guard(m_synchronize_mutex);
3359823d425SMichał Górny     // ... and disconnect.
3369823d425SMichał Górny     if (disconnect)
3379823d425SMichał Górny       Disconnect();
3389823d425SMichał Górny   }
3399823d425SMichał Górny 
34089a3691bSPavel Labath   // Finally, unblock any readers waiting for us to exit.
3419823d425SMichał Górny   BroadcastEvent(eBroadcastBitReadThreadDidExit);
3429823d425SMichał Górny   return {};
3439823d425SMichał Górny }
3449823d425SMichał Górny 
3459823d425SMichał Górny void ThreadedCommunication::SetReadThreadBytesReceivedCallback(
3469823d425SMichał Górny     ReadThreadBytesReceived callback, void *callback_baton) {
3479823d425SMichał Górny   m_callback = callback;
3489823d425SMichał Górny   m_callback_baton = callback_baton;
3499823d425SMichał Górny }
3509823d425SMichał Górny 
3519823d425SMichał Górny void ThreadedCommunication::SynchronizeWithReadThread() {
3529823d425SMichał Górny   // Only one thread can do the synchronization dance at a time.
3539823d425SMichał Górny   std::lock_guard<std::mutex> guard(m_synchronize_mutex);
3549823d425SMichał Górny 
3559823d425SMichał Górny   // First start listening for the synchronization event.
3569823d425SMichał Górny   ListenerSP listener_sp(Listener::MakeListener(
3579823d425SMichał Górny       "ThreadedCommunication::SyncronizeWithReadThread"));
3589823d425SMichał Górny   listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
3599823d425SMichał Górny 
3609823d425SMichał Górny   // If the thread is not running, there is no point in synchronizing.
3619823d425SMichał Górny   if (!m_read_thread_enabled || m_read_thread_did_exit)
3629823d425SMichał Górny     return;
3639823d425SMichał Górny 
3649823d425SMichał Górny   // Notify the read thread.
3659823d425SMichał Górny   m_connection_sp->InterruptRead();
3669823d425SMichał Górny 
3679823d425SMichał Górny   // Wait for the synchronization event.
3689823d425SMichał Górny   EventSP event_sp;
369343523d0SKazu Hirata   listener_sp->GetEvent(event_sp, std::nullopt);
3709823d425SMichał Górny }
3719823d425SMichał Górny 
3729823d425SMichał Górny void ThreadedCommunication::SetConnection(
3739823d425SMichał Górny     std::unique_ptr<Connection> connection) {
3749823d425SMichał Górny   StopReadThread(nullptr);
3759823d425SMichał Górny   Communication::SetConnection(std::move(connection));
3769823d425SMichał Górny }
377