1bdd1243dSDimitry Andric //===-- ThreadedCommunication.cpp -----------------------------------------===// 2bdd1243dSDimitry Andric // 3bdd1243dSDimitry Andric // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4bdd1243dSDimitry Andric // See https://llvm.org/LICENSE.txt for license information. 5bdd1243dSDimitry Andric // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6bdd1243dSDimitry Andric // 7bdd1243dSDimitry Andric //===----------------------------------------------------------------------===// 8bdd1243dSDimitry Andric 9bdd1243dSDimitry Andric #include "lldb/Core/ThreadedCommunication.h" 10bdd1243dSDimitry Andric 11bdd1243dSDimitry Andric #include "lldb/Host/ThreadLauncher.h" 12bdd1243dSDimitry Andric #include "lldb/Utility/Connection.h" 13bdd1243dSDimitry Andric #include "lldb/Utility/ConstString.h" 14bdd1243dSDimitry Andric #include "lldb/Utility/Event.h" 15bdd1243dSDimitry Andric #include "lldb/Utility/LLDBLog.h" 16bdd1243dSDimitry Andric #include "lldb/Utility/Listener.h" 17bdd1243dSDimitry Andric #include "lldb/Utility/Log.h" 18bdd1243dSDimitry Andric #include "lldb/Utility/Status.h" 19bdd1243dSDimitry Andric 20bdd1243dSDimitry Andric #include "llvm/Support/Compiler.h" 21bdd1243dSDimitry Andric 22bdd1243dSDimitry Andric #include <algorithm> 23bdd1243dSDimitry Andric #include <chrono> 24bdd1243dSDimitry Andric #include <cstring> 25bdd1243dSDimitry Andric #include <memory> 265f757f3fSDimitry Andric #include <shared_mutex> 27bdd1243dSDimitry Andric 28bdd1243dSDimitry Andric #include <cerrno> 29bdd1243dSDimitry Andric #include <cinttypes> 30bdd1243dSDimitry Andric #include <cstdio> 31bdd1243dSDimitry Andric 32bdd1243dSDimitry Andric using namespace lldb; 33bdd1243dSDimitry Andric using namespace lldb_private; 34bdd1243dSDimitry Andric 35*0fca6ea1SDimitry Andric llvm::StringRef ThreadedCommunication::GetStaticBroadcasterClass() { 36*0fca6ea1SDimitry Andric static constexpr llvm::StringLiteral class_name("lldb.communication"); 37bdd1243dSDimitry Andric return class_name; 38bdd1243dSDimitry Andric } 39bdd1243dSDimitry Andric 40bdd1243dSDimitry Andric ThreadedCommunication::ThreadedCommunication(const char *name) 41bdd1243dSDimitry Andric : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false), 42bdd1243dSDimitry Andric m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(), 43bdd1243dSDimitry Andric m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) { 44bdd1243dSDimitry Andric LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 45bdd1243dSDimitry Andric "{0} ThreadedCommunication::ThreadedCommunication (name = {1})", 46bdd1243dSDimitry Andric this, name); 47bdd1243dSDimitry Andric 48bdd1243dSDimitry Andric SetEventName(eBroadcastBitDisconnected, "disconnected"); 49bdd1243dSDimitry Andric SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes"); 50bdd1243dSDimitry Andric SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit"); 51bdd1243dSDimitry Andric SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit"); 52bdd1243dSDimitry Andric SetEventName(eBroadcastBitPacketAvailable, "packet available"); 53bdd1243dSDimitry Andric SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input"); 54bdd1243dSDimitry Andric 55bdd1243dSDimitry Andric CheckInWithManager(); 56bdd1243dSDimitry Andric } 57bdd1243dSDimitry Andric 58bdd1243dSDimitry Andric ThreadedCommunication::~ThreadedCommunication() { 59bdd1243dSDimitry Andric LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication), 60bdd1243dSDimitry Andric "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})", 6106c3fb27SDimitry Andric this, GetBroadcasterName()); 62bdd1243dSDimitry Andric } 63bdd1243dSDimitry Andric 64bdd1243dSDimitry Andric void ThreadedCommunication::Clear() { 65bdd1243dSDimitry Andric SetReadThreadBytesReceivedCallback(nullptr, nullptr); 66bdd1243dSDimitry Andric StopReadThread(nullptr); 67bdd1243dSDimitry Andric Communication::Clear(); 68bdd1243dSDimitry Andric } 69bdd1243dSDimitry Andric 70bdd1243dSDimitry Andric ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) { 71bdd1243dSDimitry Andric assert((!m_read_thread_enabled || m_read_thread_did_exit) && 72bdd1243dSDimitry Andric "Disconnecting while the read thread is running is racy!"); 73bdd1243dSDimitry Andric return Communication::Disconnect(error_ptr); 74bdd1243dSDimitry Andric } 75bdd1243dSDimitry Andric 76bdd1243dSDimitry Andric size_t ThreadedCommunication::Read(void *dst, size_t dst_len, 77bdd1243dSDimitry Andric const Timeout<std::micro> &timeout, 78bdd1243dSDimitry Andric ConnectionStatus &status, 79bdd1243dSDimitry Andric Status *error_ptr) { 80bdd1243dSDimitry Andric Log *log = GetLog(LLDBLog::Communication); 81bdd1243dSDimitry Andric LLDB_LOG( 82bdd1243dSDimitry Andric log, 83bdd1243dSDimitry Andric "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}", 84bdd1243dSDimitry Andric this, dst, dst_len, timeout, m_connection_sp.get()); 85bdd1243dSDimitry Andric 86bdd1243dSDimitry Andric if (m_read_thread_enabled) { 87bdd1243dSDimitry Andric // We have a dedicated read thread that is getting data for us 88bdd1243dSDimitry Andric size_t cached_bytes = GetCachedBytes(dst, dst_len); 89bdd1243dSDimitry Andric if (cached_bytes > 0) { 90bdd1243dSDimitry Andric status = eConnectionStatusSuccess; 91bdd1243dSDimitry Andric return cached_bytes; 92bdd1243dSDimitry Andric } 93bdd1243dSDimitry Andric if (timeout && timeout->count() == 0) { 94bdd1243dSDimitry Andric if (error_ptr) 95bdd1243dSDimitry Andric error_ptr->SetErrorString("Timed out."); 96bdd1243dSDimitry Andric status = eConnectionStatusTimedOut; 97bdd1243dSDimitry Andric return 0; 98bdd1243dSDimitry Andric } 99bdd1243dSDimitry Andric 100bdd1243dSDimitry Andric if (!m_connection_sp) { 101bdd1243dSDimitry Andric if (error_ptr) 102bdd1243dSDimitry Andric error_ptr->SetErrorString("Invalid connection."); 103bdd1243dSDimitry Andric status = eConnectionStatusNoConnection; 104bdd1243dSDimitry Andric return 0; 105bdd1243dSDimitry Andric } 106bdd1243dSDimitry Andric 107bdd1243dSDimitry Andric // No data yet, we have to start listening. 108bdd1243dSDimitry Andric ListenerSP listener_sp( 109bdd1243dSDimitry Andric Listener::MakeListener("ThreadedCommunication::Read")); 110bdd1243dSDimitry Andric listener_sp->StartListeningForEvents( 111bdd1243dSDimitry Andric this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit); 112bdd1243dSDimitry Andric 113bdd1243dSDimitry Andric // Re-check for data, as it might have arrived while we were setting up our 114bdd1243dSDimitry Andric // listener. 115bdd1243dSDimitry Andric cached_bytes = GetCachedBytes(dst, dst_len); 116bdd1243dSDimitry Andric if (cached_bytes > 0) { 117bdd1243dSDimitry Andric status = eConnectionStatusSuccess; 118bdd1243dSDimitry Andric return cached_bytes; 119bdd1243dSDimitry Andric } 120bdd1243dSDimitry Andric 121bdd1243dSDimitry Andric EventSP event_sp; 122bdd1243dSDimitry Andric // Explicitly check for the thread exit, for the same reason. 123bdd1243dSDimitry Andric if (m_read_thread_did_exit) { 124bdd1243dSDimitry Andric // We've missed the event, lets just conjure one up. 125bdd1243dSDimitry Andric event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit); 126bdd1243dSDimitry Andric } else { 127bdd1243dSDimitry Andric if (!listener_sp->GetEvent(event_sp, timeout)) { 128bdd1243dSDimitry Andric if (error_ptr) 129bdd1243dSDimitry Andric error_ptr->SetErrorString("Timed out."); 130bdd1243dSDimitry Andric status = eConnectionStatusTimedOut; 131bdd1243dSDimitry Andric return 0; 132bdd1243dSDimitry Andric } 133bdd1243dSDimitry Andric } 134bdd1243dSDimitry Andric const uint32_t event_type = event_sp->GetType(); 135bdd1243dSDimitry Andric if (event_type & eBroadcastBitReadThreadGotBytes) { 136bdd1243dSDimitry Andric return GetCachedBytes(dst, dst_len); 137bdd1243dSDimitry Andric } 138bdd1243dSDimitry Andric 139bdd1243dSDimitry Andric if (event_type & eBroadcastBitReadThreadDidExit) { 140bdd1243dSDimitry Andric // If the thread exited of its own accord, it either means it 141bdd1243dSDimitry Andric // hit an end-of-file condition or an error. 142bdd1243dSDimitry Andric status = m_pass_status; 143bdd1243dSDimitry Andric if (error_ptr) 144bdd1243dSDimitry Andric *error_ptr = std::move(m_pass_error); 145bdd1243dSDimitry Andric 146bdd1243dSDimitry Andric if (GetCloseOnEOF()) 147bdd1243dSDimitry Andric Disconnect(nullptr); 148bdd1243dSDimitry Andric return 0; 149bdd1243dSDimitry Andric } 150bdd1243dSDimitry Andric llvm_unreachable("Got unexpected event type!"); 151bdd1243dSDimitry Andric } 152bdd1243dSDimitry Andric 153bdd1243dSDimitry Andric // We aren't using a read thread, just read the data synchronously in this 154bdd1243dSDimitry Andric // thread. 155bdd1243dSDimitry Andric return Communication::Read(dst, dst_len, timeout, status, error_ptr); 156bdd1243dSDimitry Andric } 157bdd1243dSDimitry Andric 158bdd1243dSDimitry Andric bool ThreadedCommunication::StartReadThread(Status *error_ptr) { 1595f757f3fSDimitry Andric std::lock_guard<std::mutex> lock(m_read_thread_mutex); 1605f757f3fSDimitry Andric 161bdd1243dSDimitry Andric if (error_ptr) 162bdd1243dSDimitry Andric error_ptr->Clear(); 163bdd1243dSDimitry Andric 164bdd1243dSDimitry Andric if (m_read_thread.IsJoinable()) 165bdd1243dSDimitry Andric return true; 166bdd1243dSDimitry Andric 167bdd1243dSDimitry Andric LLDB_LOG(GetLog(LLDBLog::Communication), 168bdd1243dSDimitry Andric "{0} ThreadedCommunication::StartReadThread ()", this); 169bdd1243dSDimitry Andric 170bdd1243dSDimitry Andric const std::string thread_name = 171bdd1243dSDimitry Andric llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName()); 172bdd1243dSDimitry Andric 173bdd1243dSDimitry Andric m_read_thread_enabled = true; 174bdd1243dSDimitry Andric m_read_thread_did_exit = false; 175bdd1243dSDimitry Andric auto maybe_thread = ThreadLauncher::LaunchThread( 176bdd1243dSDimitry Andric thread_name, [this] { return ReadThread(); }); 177bdd1243dSDimitry Andric if (maybe_thread) { 178bdd1243dSDimitry Andric m_read_thread = *maybe_thread; 179bdd1243dSDimitry Andric } else { 180bdd1243dSDimitry Andric if (error_ptr) 181bdd1243dSDimitry Andric *error_ptr = Status(maybe_thread.takeError()); 182bdd1243dSDimitry Andric else { 18306c3fb27SDimitry Andric LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(), 18406c3fb27SDimitry Andric "failed to launch host thread: {0}"); 185bdd1243dSDimitry Andric } 186bdd1243dSDimitry Andric } 187bdd1243dSDimitry Andric 188bdd1243dSDimitry Andric if (!m_read_thread.IsJoinable()) 189bdd1243dSDimitry Andric m_read_thread_enabled = false; 190bdd1243dSDimitry Andric 191bdd1243dSDimitry Andric return m_read_thread_enabled; 192bdd1243dSDimitry Andric } 193bdd1243dSDimitry Andric 194bdd1243dSDimitry Andric bool ThreadedCommunication::StopReadThread(Status *error_ptr) { 1955f757f3fSDimitry Andric std::lock_guard<std::mutex> lock(m_read_thread_mutex); 1965f757f3fSDimitry Andric 197bdd1243dSDimitry Andric if (!m_read_thread.IsJoinable()) 198bdd1243dSDimitry Andric return true; 199bdd1243dSDimitry Andric 200bdd1243dSDimitry Andric LLDB_LOG(GetLog(LLDBLog::Communication), 201bdd1243dSDimitry Andric "{0} ThreadedCommunication::StopReadThread ()", this); 202bdd1243dSDimitry Andric 203bdd1243dSDimitry Andric m_read_thread_enabled = false; 204bdd1243dSDimitry Andric 205bdd1243dSDimitry Andric BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr); 206bdd1243dSDimitry Andric 207bdd1243dSDimitry Andric Status error = m_read_thread.Join(nullptr); 208bdd1243dSDimitry Andric return error.Success(); 209bdd1243dSDimitry Andric } 210bdd1243dSDimitry Andric 211bdd1243dSDimitry Andric bool ThreadedCommunication::JoinReadThread(Status *error_ptr) { 2125f757f3fSDimitry Andric std::lock_guard<std::mutex> lock(m_read_thread_mutex); 2135f757f3fSDimitry Andric 214bdd1243dSDimitry Andric if (!m_read_thread.IsJoinable()) 215bdd1243dSDimitry Andric return true; 216bdd1243dSDimitry Andric 217bdd1243dSDimitry Andric Status error = m_read_thread.Join(nullptr); 218bdd1243dSDimitry Andric return error.Success(); 219bdd1243dSDimitry Andric } 220bdd1243dSDimitry Andric 221bdd1243dSDimitry Andric size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) { 222bdd1243dSDimitry Andric std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 223bdd1243dSDimitry Andric if (!m_bytes.empty()) { 224bdd1243dSDimitry Andric // If DST is nullptr and we have a thread, then return the number of bytes 225bdd1243dSDimitry Andric // that are available so the caller can call again 226bdd1243dSDimitry Andric if (dst == nullptr) 227bdd1243dSDimitry Andric return m_bytes.size(); 228bdd1243dSDimitry Andric 229bdd1243dSDimitry Andric const size_t len = std::min<size_t>(dst_len, m_bytes.size()); 230bdd1243dSDimitry Andric 231bdd1243dSDimitry Andric ::memcpy(dst, m_bytes.c_str(), len); 232bdd1243dSDimitry Andric m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len); 233bdd1243dSDimitry Andric 234bdd1243dSDimitry Andric return len; 235bdd1243dSDimitry Andric } 236bdd1243dSDimitry Andric return 0; 237bdd1243dSDimitry Andric } 238bdd1243dSDimitry Andric 239bdd1243dSDimitry Andric void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len, 240bdd1243dSDimitry Andric bool broadcast, 241bdd1243dSDimitry Andric ConnectionStatus status) { 242bdd1243dSDimitry Andric LLDB_LOG(GetLog(LLDBLog::Communication), 243bdd1243dSDimitry Andric "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len " 244bdd1243dSDimitry Andric "= {2}, " 245bdd1243dSDimitry Andric "broadcast = {3})", 246bdd1243dSDimitry Andric this, bytes, (uint64_t)len, broadcast); 247bdd1243dSDimitry Andric if ((bytes == nullptr || len == 0) && 248bdd1243dSDimitry Andric (status != lldb::eConnectionStatusEndOfFile)) 249bdd1243dSDimitry Andric return; 250bdd1243dSDimitry Andric if (m_callback) { 251bdd1243dSDimitry Andric // If the user registered a callback, then call it and do not broadcast 252bdd1243dSDimitry Andric m_callback(m_callback_baton, bytes, len); 253bdd1243dSDimitry Andric } else if (bytes != nullptr && len > 0) { 254bdd1243dSDimitry Andric std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex); 255bdd1243dSDimitry Andric m_bytes.append((const char *)bytes, len); 256bdd1243dSDimitry Andric if (broadcast) 257bdd1243dSDimitry Andric BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes); 258bdd1243dSDimitry Andric } 259bdd1243dSDimitry Andric } 260bdd1243dSDimitry Andric 261bdd1243dSDimitry Andric bool ThreadedCommunication::ReadThreadIsRunning() { 262bdd1243dSDimitry Andric return m_read_thread_enabled; 263bdd1243dSDimitry Andric } 264bdd1243dSDimitry Andric 265bdd1243dSDimitry Andric lldb::thread_result_t ThreadedCommunication::ReadThread() { 266bdd1243dSDimitry Andric Log *log = GetLog(LLDBLog::Communication); 267bdd1243dSDimitry Andric 268bdd1243dSDimitry Andric LLDB_LOG(log, "Communication({0}) thread starting...", this); 269bdd1243dSDimitry Andric 270bdd1243dSDimitry Andric uint8_t buf[1024]; 271bdd1243dSDimitry Andric 272bdd1243dSDimitry Andric Status error; 273bdd1243dSDimitry Andric ConnectionStatus status = eConnectionStatusSuccess; 274bdd1243dSDimitry Andric bool done = false; 275bdd1243dSDimitry Andric bool disconnect = false; 276bdd1243dSDimitry Andric while (!done && m_read_thread_enabled) { 277bdd1243dSDimitry Andric size_t bytes_read = ReadFromConnection( 278bdd1243dSDimitry Andric buf, sizeof(buf), std::chrono::seconds(5), status, &error); 279bdd1243dSDimitry Andric if (bytes_read > 0 || status == eConnectionStatusEndOfFile) 280bdd1243dSDimitry Andric AppendBytesToCache(buf, bytes_read, true, status); 281bdd1243dSDimitry Andric 282bdd1243dSDimitry Andric switch (status) { 283bdd1243dSDimitry Andric case eConnectionStatusSuccess: 284bdd1243dSDimitry Andric break; 285bdd1243dSDimitry Andric 286bdd1243dSDimitry Andric case eConnectionStatusEndOfFile: 287bdd1243dSDimitry Andric done = true; 288bdd1243dSDimitry Andric disconnect = GetCloseOnEOF(); 289bdd1243dSDimitry Andric break; 290bdd1243dSDimitry Andric case eConnectionStatusError: // Check GetError() for details 291bdd1243dSDimitry Andric if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) { 292bdd1243dSDimitry Andric // EIO on a pipe is usually caused by remote shutdown 293bdd1243dSDimitry Andric disconnect = GetCloseOnEOF(); 294bdd1243dSDimitry Andric done = true; 295bdd1243dSDimitry Andric } 296bdd1243dSDimitry Andric if (error.Fail()) 297bdd1243dSDimitry Andric LLDB_LOG(log, "error: {0}, status = {1}", error, 298bdd1243dSDimitry Andric ThreadedCommunication::ConnectionStatusAsString(status)); 299bdd1243dSDimitry Andric break; 300bdd1243dSDimitry Andric case eConnectionStatusInterrupted: // Synchronization signal from 301bdd1243dSDimitry Andric // SynchronizeWithReadThread() 302bdd1243dSDimitry Andric // The connection returns eConnectionStatusInterrupted only when there is 303bdd1243dSDimitry Andric // no input pending to be read, so we can signal that. 304bdd1243dSDimitry Andric BroadcastEvent(eBroadcastBitNoMorePendingInput); 305bdd1243dSDimitry Andric break; 306bdd1243dSDimitry Andric case eConnectionStatusNoConnection: // No connection 307bdd1243dSDimitry Andric case eConnectionStatusLostConnection: // Lost connection while connected to 308bdd1243dSDimitry Andric // a valid connection 309bdd1243dSDimitry Andric done = true; 310bdd1243dSDimitry Andric [[fallthrough]]; 311bdd1243dSDimitry Andric case eConnectionStatusTimedOut: // Request timed out 312bdd1243dSDimitry Andric if (error.Fail()) 313bdd1243dSDimitry Andric LLDB_LOG(log, "error: {0}, status = {1}", error, 314bdd1243dSDimitry Andric ThreadedCommunication::ConnectionStatusAsString(status)); 315bdd1243dSDimitry Andric break; 316bdd1243dSDimitry Andric } 317bdd1243dSDimitry Andric } 318bdd1243dSDimitry Andric m_pass_status = status; 319bdd1243dSDimitry Andric m_pass_error = std::move(error); 320bdd1243dSDimitry Andric LLDB_LOG(log, "Communication({0}) thread exiting...", this); 321bdd1243dSDimitry Andric 322bdd1243dSDimitry Andric // Start shutting down. We need to do this in a very specific order to ensure 323bdd1243dSDimitry Andric // we don't race with threads wanting to read/synchronize with us. 324bdd1243dSDimitry Andric 325bdd1243dSDimitry Andric // First, we signal our intent to exit. This ensures no new thread start 326bdd1243dSDimitry Andric // waiting on events from us. 327bdd1243dSDimitry Andric m_read_thread_did_exit = true; 328bdd1243dSDimitry Andric 329bdd1243dSDimitry Andric // Unblock any existing thread waiting for the synchronization signal. 330bdd1243dSDimitry Andric BroadcastEvent(eBroadcastBitNoMorePendingInput); 331bdd1243dSDimitry Andric 332bdd1243dSDimitry Andric { 333bdd1243dSDimitry Andric // Wait for the synchronization thread to finish... 334bdd1243dSDimitry Andric std::lock_guard<std::mutex> guard(m_synchronize_mutex); 335bdd1243dSDimitry Andric // ... and disconnect. 336bdd1243dSDimitry Andric if (disconnect) 337bdd1243dSDimitry Andric Disconnect(); 338bdd1243dSDimitry Andric } 339bdd1243dSDimitry Andric 340bdd1243dSDimitry Andric // Finally, unblock any readers waiting for us to exit. 341bdd1243dSDimitry Andric BroadcastEvent(eBroadcastBitReadThreadDidExit); 342bdd1243dSDimitry Andric return {}; 343bdd1243dSDimitry Andric } 344bdd1243dSDimitry Andric 345bdd1243dSDimitry Andric void ThreadedCommunication::SetReadThreadBytesReceivedCallback( 346bdd1243dSDimitry Andric ReadThreadBytesReceived callback, void *callback_baton) { 347bdd1243dSDimitry Andric m_callback = callback; 348bdd1243dSDimitry Andric m_callback_baton = callback_baton; 349bdd1243dSDimitry Andric } 350bdd1243dSDimitry Andric 351bdd1243dSDimitry Andric void ThreadedCommunication::SynchronizeWithReadThread() { 352bdd1243dSDimitry Andric // Only one thread can do the synchronization dance at a time. 353bdd1243dSDimitry Andric std::lock_guard<std::mutex> guard(m_synchronize_mutex); 354bdd1243dSDimitry Andric 355bdd1243dSDimitry Andric // First start listening for the synchronization event. 356bdd1243dSDimitry Andric ListenerSP listener_sp(Listener::MakeListener( 357bdd1243dSDimitry Andric "ThreadedCommunication::SyncronizeWithReadThread")); 358bdd1243dSDimitry Andric listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput); 359bdd1243dSDimitry Andric 360bdd1243dSDimitry Andric // If the thread is not running, there is no point in synchronizing. 361bdd1243dSDimitry Andric if (!m_read_thread_enabled || m_read_thread_did_exit) 362bdd1243dSDimitry Andric return; 363bdd1243dSDimitry Andric 364bdd1243dSDimitry Andric // Notify the read thread. 365bdd1243dSDimitry Andric m_connection_sp->InterruptRead(); 366bdd1243dSDimitry Andric 367bdd1243dSDimitry Andric // Wait for the synchronization event. 368bdd1243dSDimitry Andric EventSP event_sp; 369bdd1243dSDimitry Andric listener_sp->GetEvent(event_sp, std::nullopt); 370bdd1243dSDimitry Andric } 371bdd1243dSDimitry Andric 372bdd1243dSDimitry Andric void ThreadedCommunication::SetConnection( 373bdd1243dSDimitry Andric std::unique_ptr<Connection> connection) { 374bdd1243dSDimitry Andric StopReadThread(nullptr); 375bdd1243dSDimitry Andric Communication::SetConnection(std::move(connection)); 376bdd1243dSDimitry Andric } 377