19823d425SMichał Górny //===-- ThreadedCommunication.cpp -----------------------------------------===// 29823d425SMichał Górny // 39823d425SMichał Górny // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 49823d425SMichał Górny // See https://llvm.org/LICENSE.txt for license information. 59823d425SMichał Górny // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 69823d425SMichał Górny // 79823d425SMichał Górny //===----------------------------------------------------------------------===// 89823d425SMichał Górny 99823d425SMichał Górny #include "lldb/Core/ThreadedCommunication.h" 109823d425SMichał Górny 119823d425SMichał Górny #include "lldb/Host/ThreadLauncher.h" 129823d425SMichał Górny #include "lldb/Utility/Connection.h" 139823d425SMichał Górny #include "lldb/Utility/ConstString.h" 149823d425SMichał Górny #include "lldb/Utility/Event.h" 159823d425SMichał Górny #include "lldb/Utility/LLDBLog.h" 169823d425SMichał Górny #include "lldb/Utility/Listener.h" 179823d425SMichał Górny #include "lldb/Utility/Log.h" 189823d425SMichał Górny #include "lldb/Utility/Status.h" 199823d425SMichał Górny 209823d425SMichał Górny #include "llvm/Support/Compiler.h" 219823d425SMichał Górny 229823d425SMichał Górny #include <algorithm> 239823d425SMichał Górny #include <chrono> 249823d425SMichał Górny #include <cstring> 259823d425SMichał Górny #include <memory> 261a8d9a76SJonas Devlieghere #include <shared_mutex> 279823d425SMichał Górny 289823d425SMichał Górny #include <cerrno> 299823d425SMichał Górny #include <cinttypes> 309823d425SMichał Górny #include <cstdio> 319823d425SMichał Górny 329823d425SMichał Górny using namespace lldb; 339823d425SMichał Górny using namespace lldb_private; 349823d425SMichał Górny 3557794835SAlex Langford llvm::StringRef ThreadedCommunication::GetStaticBroadcasterClass() { 3657794835SAlex Langford static constexpr llvm::StringLiteral class_name("lldb.communication"); 379823d425SMichał Górny return class_name; 389823d425SMichał Górny } 399823d425SMichał Górny 409823d425SMichał Górny ThreadedCommunication::ThreadedCommunication(const char *name) 419823d425SMichał Górny : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false), 429823d425SMichał Górny m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(), 439823d425SMichał Górny m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) { 449823d425SMichał Górny LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 459823d425SMichał Górny "{0} ThreadedCommunication::ThreadedCommunication (name = {1})", 469823d425SMichał Górny this, name); 479823d425SMichał Górny 489823d425SMichał Górny SetEventName(eBroadcastBitDisconnected, "disconnected"); 499823d425SMichał Górny SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); 509823d425SMichał Górny SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); 519823d425SMichał Górny SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); 529823d425SMichał Górny SetEventName(eBroadcastBitPacketAvailable, "packet available"); 539823d425SMichał Górny SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); 549823d425SMichał Górny 559823d425SMichał Górny CheckInWithManager(); 569823d425SMichał Górny } 579823d425SMichał Górny 589823d425SMichał Górny ThreadedCommunication::~ThreadedCommunication() { 599823d425SMichał Górny LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 609823d425SMichał Górny "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})", 61867ee3b8SAlex Langford this, GetBroadcasterName()); 629823d425SMichał Górny } 639823d425SMichał Górny 643b919570SAugusto Noronha void ThreadedCommunication::Clear() { 653b919570SAugusto Noronha SetReadThreadBytesReceivedCallback(nullptr, nullptr); 663b919570SAugusto Noronha StopReadThread(nullptr); 673b919570SAugusto Noronha Communication::Clear(); 683b919570SAugusto Noronha } 699823d425SMichał Górny 709823d425SMichał Górny ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) { 719823d425SMichał Górny assert((!m_read_thread_enabled || m_read_thread_did_exit) && 729823d425SMichał Górny "Disconnecting while the read thread is running is racy!"); 739823d425SMichał Górny return Communication::Disconnect(error_ptr); 749823d425SMichał Górny } 759823d425SMichał Górny 769823d425SMichał Górny size_t ThreadedCommunication::Read(void *dst, size_t dst_len, 779823d425SMichał Górny const Timeout<std::micro> &timeout, 789823d425SMichał Górny ConnectionStatus &status, 799823d425SMichał Górny Status *error_ptr) { 809823d425SMichał Górny Log *log = GetLog(LLDBLog::Communication); 819823d425SMichał Górny LLDB_LOG( 829823d425SMichał Górny log, 839823d425SMichał Górny "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}", 849823d425SMichał Górny this, dst, dst_len, timeout, m_connection_sp.get()); 859823d425SMichał Górny 869823d425SMichał Górny if (m_read_thread_enabled) { 879823d425SMichał Górny // We have a dedicated read thread that is getting data for us 889823d425SMichał Górny size_t cached_bytes = GetCachedBytes(dst, dst_len); 899823d425SMichał Górny if (cached_bytes > 0) { 909823d425SMichał Górny status = eConnectionStatusSuccess; 919823d425SMichał Górny return cached_bytes; 929823d425SMichał Górny } 939823d425SMichał Górny if (timeout && timeout->count() == 0) { 949823d425SMichał Górny if (error_ptr) 950642cd76SAdrian Prantl *error_ptr = Status::FromErrorString("Timed out."); 969823d425SMichał Górny status = eConnectionStatusTimedOut; 979823d425SMichał Górny return 0; 989823d425SMichał Górny } 999823d425SMichał Górny 1009823d425SMichał Górny if (!m_connection_sp) { 1019823d425SMichał Górny if (error_ptr) 1020642cd76SAdrian Prantl *error_ptr = Status::FromErrorString("Invalid connection."); 1039823d425SMichał Górny status = eConnectionStatusNoConnection; 1049823d425SMichał Górny return 0; 1059823d425SMichał Górny } 1069823d425SMichał Górny 10789a3691bSPavel Labath // No data yet, we have to start listening. 1089823d425SMichał Górny ListenerSP listener_sp( 1099823d425SMichał Górny Listener::MakeListener("ThreadedCommunication::Read")); 1109823d425SMichał Górny listener_sp->StartListeningForEvents( 1119823d425SMichał Górny this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); 11289a3691bSPavel Labath 11389a3691bSPavel Labath // Re-check for data, as it might have arrived while we were setting up our 11489a3691bSPavel Labath // listener. 11589a3691bSPavel Labath cached_bytes = GetCachedBytes(dst, dst_len); 11689a3691bSPavel Labath if (cached_bytes > 0) { 11789a3691bSPavel Labath status = eConnectionStatusSuccess; 11889a3691bSPavel Labath return cached_bytes; 11989a3691bSPavel Labath } 12089a3691bSPavel Labath 1219823d425SMichał Górny EventSP event_sp; 12289a3691bSPavel Labath // Explicitly check for the thread exit, for the same reason. 12389a3691bSPavel Labath if (m_read_thread_did_exit) { 12489a3691bSPavel Labath // We've missed the event, lets just conjure one up. 12589a3691bSPavel Labath event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit); 12689a3691bSPavel Labath } else { 12789a3691bSPavel Labath if (!listener_sp->GetEvent(event_sp, timeout)) { 12889a3691bSPavel Labath if (error_ptr) 1290642cd76SAdrian Prantl *error_ptr = Status::FromErrorString("Timed out."); 13089a3691bSPavel Labath status = eConnectionStatusTimedOut; 13189a3691bSPavel Labath return 0; 13289a3691bSPavel Labath } 13389a3691bSPavel Labath } 1349823d425SMichał Górny const uint32_t event_type = event_sp->GetType(); 1359823d425SMichał Górny if (event_type & eBroadcastBitReadThreadGotBytes) { 1369823d425SMichał Górny return GetCachedBytes(dst, dst_len); 1379823d425SMichał Górny } 1389823d425SMichał Górny 1399823d425SMichał Górny if (event_type & eBroadcastBitReadThreadDidExit) { 1409823d425SMichał Górny // If the thread exited of its own accord, it either means it 1419823d425SMichał Górny // hit an end-of-file condition or an error. 1429823d425SMichał Górny status = m_pass_status; 1439823d425SMichał Górny if (error_ptr) 1449823d425SMichał Górny *error_ptr = std::move(m_pass_error); 1459823d425SMichał Górny 1469823d425SMichał Górny if (GetCloseOnEOF()) 1479823d425SMichał Górny Disconnect(nullptr); 1489823d425SMichał Górny return 0; 1499823d425SMichał Górny } 15089a3691bSPavel Labath llvm_unreachable("Got unexpected event type!"); 1519823d425SMichał Górny } 1529823d425SMichał Górny 1539823d425SMichał Górny // We aren't using a read thread, just read the data synchronously in this 1549823d425SMichał Górny // thread. 1553b919570SAugusto Noronha return Communication::Read(dst, dst_len, timeout, status, error_ptr); 1569823d425SMichał Górny } 1579823d425SMichał Górny 1589823d425SMichał Górny bool ThreadedCommunication::StartReadThread(Status *error_ptr) { 1591a8d9a76SJonas Devlieghere std::lock_guard<std::mutex> lock(m_read_thread_mutex); 1601a8d9a76SJonas Devlieghere 1619823d425SMichał Górny if (error_ptr) 1629823d425SMichał Górny error_ptr->Clear(); 1639823d425SMichał Górny 1649823d425SMichał Górny if (m_read_thread.IsJoinable()) 1659823d425SMichał Górny return true; 1669823d425SMichał Górny 1679823d425SMichał Górny LLDB_LOG(GetLog(LLDBLog::Communication), 1689823d425SMichał Górny "{0} ThreadedCommunication::StartReadThread ()", this); 1699823d425SMichał Górny 1709823d425SMichał Górny const std::string thread_name = 1719823d425SMichał Górny llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName()); 1729823d425SMichał Górny 1739823d425SMichał Górny m_read_thread_enabled = true; 1749823d425SMichał Górny m_read_thread_did_exit = false; 1759823d425SMichał Górny auto maybe_thread = ThreadLauncher::LaunchThread( 1769823d425SMichał Górny thread_name, [this] { return ReadThread(); }); 1779823d425SMichał Górny if (maybe_thread) { 1789823d425SMichał Górny m_read_thread = *maybe_thread; 1799823d425SMichał Górny } else { 1809823d425SMichał Górny if (error_ptr) 181*a0dd90ebSAdrian Prantl *error_ptr = Status::FromError(maybe_thread.takeError()); 1829823d425SMichał Górny else { 183520681e5SJonas Devlieghere LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(), 184520681e5SJonas Devlieghere "failed to launch host thread: {0}"); 1859823d425SMichał Górny } 1869823d425SMichał Górny } 1879823d425SMichał Górny 1889823d425SMichał Górny if (!m_read_thread.IsJoinable()) 1899823d425SMichał Górny m_read_thread_enabled = false; 1909823d425SMichał Górny 1919823d425SMichał Górny return m_read_thread_enabled; 1929823d425SMichał Górny } 1939823d425SMichał Górny 1949823d425SMichał Górny bool ThreadedCommunication::StopReadThread(Status *error_ptr) { 1951a8d9a76SJonas Devlieghere std::lock_guard<std::mutex> lock(m_read_thread_mutex); 1961a8d9a76SJonas Devlieghere 1979823d425SMichał Górny if (!m_read_thread.IsJoinable()) 1989823d425SMichał Górny return true; 1999823d425SMichał Górny 2009823d425SMichał Górny LLDB_LOG(GetLog(LLDBLog::Communication), 2019823d425SMichał Górny "{0} ThreadedCommunication::StopReadThread ()", this); 2029823d425SMichał Górny 2039823d425SMichał Górny m_read_thread_enabled = false; 2049823d425SMichał Górny 2059823d425SMichał Górny BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); 2069823d425SMichał Górny 2079823d425SMichał Górny Status error = m_read_thread.Join(nullptr); 2089823d425SMichał Górny return error.Success(); 2099823d425SMichał Górny } 2109823d425SMichał Górny 2119823d425SMichał Górny bool ThreadedCommunication::JoinReadThread(Status *error_ptr) { 2121a8d9a76SJonas Devlieghere std::lock_guard<std::mutex> lock(m_read_thread_mutex); 2131a8d9a76SJonas Devlieghere 2149823d425SMichał Górny if (!m_read_thread.IsJoinable()) 2159823d425SMichał Górny return true; 2169823d425SMichał Górny 2179823d425SMichał Górny Status error = m_read_thread.Join(nullptr); 2189823d425SMichał Górny return error.Success(); 2199823d425SMichał Górny } 2209823d425SMichał Górny 2219823d425SMichał Górny size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) { 2229823d425SMichał Górny std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 2239823d425SMichał Górny if (!m_bytes.empty()) { 2249823d425SMichał Górny // If DST is nullptr and we have a thread, then return the number of bytes 2259823d425SMichał Górny // that are available so the caller can call again 2269823d425SMichał Górny if (dst == nullptr) 2279823d425SMichał Górny return m_bytes.size(); 2289823d425SMichał Górny 2299823d425SMichał Górny const size_t len = std::min<size_t>(dst_len, m_bytes.size()); 2309823d425SMichał Górny 2319823d425SMichał Górny ::memcpy(dst, m_bytes.c_str(), len); 2329823d425SMichał Górny m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); 2339823d425SMichał Górny 2349823d425SMichał Górny return len; 2359823d425SMichał Górny } 2369823d425SMichał Górny return 0; 2379823d425SMichał Górny } 2389823d425SMichał Górny 2399823d425SMichał Górny void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len, 2409823d425SMichał Górny bool broadcast, 2419823d425SMichał Górny ConnectionStatus status) { 2429823d425SMichał Górny LLDB_LOG(GetLog(LLDBLog::Communication), 2439823d425SMichał Górny "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len " 2449823d425SMichał Górny "= {2}, " 2459823d425SMichał Górny "broadcast = {3})", 2469823d425SMichał Górny this, bytes, (uint64_t)len, broadcast); 2479823d425SMichał Górny if ((bytes == nullptr || len == 0) && 2489823d425SMichał Górny (status != lldb::eConnectionStatusEndOfFile)) 2499823d425SMichał Górny return; 2509823d425SMichał Górny if (m_callback) { 2519823d425SMichał Górny // If the user registered a callback, then call it and do not broadcast 2529823d425SMichał Górny m_callback(m_callback_baton, bytes, len); 2539823d425SMichał Górny } else if (bytes != nullptr && len > 0) { 2549823d425SMichał Górny std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 2559823d425SMichał Górny m_bytes.append((const char *)bytes, len); 2569823d425SMichał Górny if (broadcast) 2579823d425SMichał Górny BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes); 2589823d425SMichał Górny } 2599823d425SMichał Górny } 2609823d425SMichał Górny 2619823d425SMichał Górny bool ThreadedCommunication::ReadThreadIsRunning() { 2629823d425SMichał Górny return m_read_thread_enabled; 2639823d425SMichał Górny } 2649823d425SMichał Górny 2659823d425SMichał Górny lldb::thread_result_t ThreadedCommunication::ReadThread() { 2669823d425SMichał Górny Log *log = GetLog(LLDBLog::Communication); 2679823d425SMichał Górny 2689823d425SMichał Górny LLDB_LOG(log, "Communication({0}) thread starting...", this); 2699823d425SMichał Górny 2709823d425SMichał Górny uint8_t buf[1024]; 2719823d425SMichał Górny 2729823d425SMichał Górny Status error; 2739823d425SMichał Górny ConnectionStatus status = eConnectionStatusSuccess; 2749823d425SMichał Górny bool done = false; 2759823d425SMichał Górny bool disconnect = false; 2769823d425SMichał Górny while (!done && m_read_thread_enabled) { 2773b919570SAugusto Noronha size_t bytes_read = ReadFromConnection( 2783b919570SAugusto Noronha buf, sizeof(buf), std::chrono::seconds(5), status, &error); 2799823d425SMichał Górny if (bytes_read > 0 || status == eConnectionStatusEndOfFile) 2809823d425SMichał Górny AppendBytesToCache(buf, bytes_read, true, status); 2819823d425SMichał Górny 2829823d425SMichał Górny switch (status) { 2839823d425SMichał Górny case eConnectionStatusSuccess: 2849823d425SMichał Górny break; 2859823d425SMichał Górny 2869823d425SMichał Górny case eConnectionStatusEndOfFile: 2879823d425SMichał Górny done = true; 2889823d425SMichał Górny disconnect = GetCloseOnEOF(); 2899823d425SMichał Górny break; 2909823d425SMichał Górny case eConnectionStatusError: // Check GetError() for details 2919823d425SMichał Górny if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { 2929823d425SMichał Górny // EIO on a pipe is usually caused by remote shutdown 2939823d425SMichał Górny disconnect = GetCloseOnEOF(); 2949823d425SMichał Górny done = true; 2959823d425SMichał Górny } 2969823d425SMichał Górny if (error.Fail()) 2979823d425SMichał Górny LLDB_LOG(log, "error: {0}, status = {1}", error, 2989823d425SMichał Górny ThreadedCommunication::ConnectionStatusAsString(status)); 2999823d425SMichał Górny break; 3009823d425SMichał Górny case eConnectionStatusInterrupted: // Synchronization signal from 3019823d425SMichał Górny // SynchronizeWithReadThread() 3023b919570SAugusto Noronha // The connection returns eConnectionStatusInterrupted only when there is 3033b919570SAugusto Noronha // no input pending to be read, so we can signal that. 3049823d425SMichał Górny BroadcastEvent(eBroadcastBitNoMorePendingInput); 3059823d425SMichał Górny break; 3069823d425SMichał Górny case eConnectionStatusNoConnection: // No connection 3073b919570SAugusto Noronha case eConnectionStatusLostConnection: // Lost connection while connected to 3083b919570SAugusto Noronha // a valid connection 3099823d425SMichał Górny done = true; 3109823d425SMichał Górny [[fallthrough]]; 3119823d425SMichał Górny case eConnectionStatusTimedOut: // Request timed out 3129823d425SMichał Górny if (error.Fail()) 3139823d425SMichał Górny LLDB_LOG(log, "error: {0}, status = {1}", error, 3149823d425SMichał Górny ThreadedCommunication::ConnectionStatusAsString(status)); 3159823d425SMichał Górny break; 3169823d425SMichał Górny } 3179823d425SMichał Górny } 3189823d425SMichał Górny m_pass_status = status; 3199823d425SMichał Górny m_pass_error = std::move(error); 3209823d425SMichał Górny LLDB_LOG(log, "Communication({0}) thread exiting...", this); 3219823d425SMichał Górny 32289a3691bSPavel Labath // Start shutting down. We need to do this in a very specific order to ensure 32389a3691bSPavel Labath // we don't race with threads wanting to read/synchronize with us. 32489a3691bSPavel Labath 32589a3691bSPavel Labath // First, we signal our intent to exit. This ensures no new thread start 32689a3691bSPavel Labath // waiting on events from us. 3279823d425SMichał Górny m_read_thread_did_exit = true; 3289823d425SMichał Górny 3299823d425SMichał Górny // Unblock any existing thread waiting for the synchronization signal. 3309823d425SMichał Górny BroadcastEvent(eBroadcastBitNoMorePendingInput); 3319823d425SMichał Górny 33289a3691bSPavel Labath { 33389a3691bSPavel Labath // Wait for the synchronization thread to finish... 3349823d425SMichał Górny std::lock_guard<std::mutex> guard(m_synchronize_mutex); 3359823d425SMichał Górny // ... and disconnect. 3369823d425SMichał Górny if (disconnect) 3379823d425SMichał Górny Disconnect(); 3389823d425SMichał Górny } 3399823d425SMichał Górny 34089a3691bSPavel Labath // Finally, unblock any readers waiting for us to exit. 3419823d425SMichał Górny BroadcastEvent(eBroadcastBitReadThreadDidExit); 3429823d425SMichał Górny return {}; 3439823d425SMichał Górny } 3449823d425SMichał Górny 3459823d425SMichał Górny void ThreadedCommunication::SetReadThreadBytesReceivedCallback( 3469823d425SMichał Górny ReadThreadBytesReceived callback, void *callback_baton) { 3479823d425SMichał Górny m_callback = callback; 3489823d425SMichał Górny m_callback_baton = callback_baton; 3499823d425SMichał Górny } 3509823d425SMichał Górny 3519823d425SMichał Górny void ThreadedCommunication::SynchronizeWithReadThread() { 3529823d425SMichał Górny // Only one thread can do the synchronization dance at a time. 3539823d425SMichał Górny std::lock_guard<std::mutex> guard(m_synchronize_mutex); 3549823d425SMichał Górny 3559823d425SMichał Górny // First start listening for the synchronization event. 3569823d425SMichał Górny ListenerSP listener_sp(Listener::MakeListener( 3579823d425SMichał Górny "ThreadedCommunication::SyncronizeWithReadThread")); 3589823d425SMichał Górny listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); 3599823d425SMichał Górny 3609823d425SMichał Górny // If the thread is not running, there is no point in synchronizing. 3619823d425SMichał Górny if (!m_read_thread_enabled || m_read_thread_did_exit) 3629823d425SMichał Górny return; 3639823d425SMichał Górny 3649823d425SMichał Górny // Notify the read thread. 3659823d425SMichał Górny m_connection_sp->InterruptRead(); 3669823d425SMichał Górny 3679823d425SMichał Górny // Wait for the synchronization event. 3689823d425SMichał Górny EventSP event_sp; 369343523d0SKazu Hirata listener_sp->GetEvent(event_sp, std::nullopt); 3709823d425SMichał Górny } 3719823d425SMichał Górny 3729823d425SMichał Górny void ThreadedCommunication::SetConnection( 3739823d425SMichał Górny std::unique_ptr<Connection> connection) { 3749823d425SMichał Górny StopReadThread(nullptr); 3759823d425SMichał Górny Communication::SetConnection(std::move(connection)); 3769823d425SMichał Górny } 377