# Usage: # ./bin/lldb $LLVM/lldb/test/API/functionalities/interactive_scripted_process/main \ # -o "br set -p 'Break here'" \ # -o "command script import $LLVM/lldb/test/API/functionalities/interactive_scripted_process/interactive_scripted_process.py" \ # -o "create_mux" \ # -o "create_sub" \ # -o "br set -p 'also break here'" -o 'continue' import os, json, struct, signal, tempfile from threading import Thread from typing import Any, Dict import lldb from lldb.plugins.scripted_process import PassthroughScriptedProcess from lldb.plugins.scripted_process import PassthroughScriptedThread class MultiplexedScriptedProcess(PassthroughScriptedProcess): def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData): super().__init__(exe_ctx, args) self.multiplexer = None if isinstance(self.driving_process, lldb.SBProcess) and self.driving_process: parity = args.GetValueForKey("parity") # TODO: Change to Walrus operator (:=) with oneline if assignment # Requires python 3.8 val = parity.GetUnsignedIntegerValue() if val is not None: self.parity = val # Turn PassthroughScriptedThread into MultiplexedScriptedThread for thread in self.threads.values(): thread.__class__ = MultiplexedScriptedThread def get_process_id(self) -> int: return self.parity + 420 def launch(self, should_stop: bool = True) -> lldb.SBError: self.first_launch = True return lldb.SBError() def resume(self, should_stop: bool) -> lldb.SBError: if self.first_launch: self.first_launch = False return super().resume() else: if not self.multiplexer: error = lldb.SBError("Multiplexer is not set.") return error return self.multiplexer.resume(should_stop) def get_threads_info(self) -> Dict[int, Any]: if not self.multiplexer: return super().get_threads_info() filtered_threads = self.multiplexer.get_threads_info(pid=self.get_process_id()) # Update the filtered thread class from PassthroughScriptedThread to MultiplexedScriptedThread return dict( map( lambda pair: (pair[0], MultiplexedScriptedThread(pair[1])), filtered_threads.items(), ) ) def create_breakpoint(self, addr, error, pid=None): if not self.multiplexer: error.SetErrorString("Multiplexer is not set.") return self.multiplexer.create_breakpoint(addr, error, self.get_process_id()) def get_scripted_thread_plugin(self) -> str: return f"{MultiplexedScriptedThread.__module__}.{MultiplexedScriptedThread.__name__}" class MultiplexedScriptedThread(PassthroughScriptedThread): def get_name(self) -> str: parity = "Odd" if self.scripted_process.parity % 2 else "Even" return f"{parity}{MultiplexedScriptedThread.__name__}.thread-{self.idx}" class MultiplexerScriptedProcess(PassthroughScriptedProcess): listener = None multiplexed_processes = None def wait_for_driving_process_to_stop(self): def handle_process_state_event(): # Update multiplexer process log("Updating interactive scripted process threads") dbg = self.driving_target.GetDebugger() new_driving_thread_ids = [] for driving_thread in self.driving_process: new_driving_thread_ids.append(driving_thread.id) log(f"{len(self.threads)} New thread {hex(driving_thread.id)}") structured_data = lldb.SBStructuredData() structured_data.SetFromJSON( json.dumps( { "driving_target_idx": dbg.GetIndexOfTarget( self.driving_target ), "thread_idx": driving_thread.GetIndexID(), } ) ) self.threads[driving_thread.id] = PassthroughScriptedThread( self, structured_data ) for thread_id in self.threads: if thread_id not in new_driving_thread_ids: log(f"Removing old thread {hex(thread_id)}") del self.threads[thread_id] print(f"New thread count: {len(self.threads)}") mux_process = self.target.GetProcess() mux_process.ForceScriptedState(lldb.eStateRunning) mux_process.ForceScriptedState(lldb.eStateStopped) for child_process in self.multiplexed_processes.values(): child_process.ForceScriptedState(lldb.eStateRunning) child_process.ForceScriptedState(lldb.eStateStopped) event = lldb.SBEvent() while True: if not self.driving_process: continue if self.listener.WaitForEvent(1, event): event_mask = event.GetType() if event_mask & lldb.SBProcess.eBroadcastBitStateChanged: state = lldb.SBProcess.GetStateFromEvent(event) log(f"Received public process state event: {state}") if state == lldb.eStateStopped: # If it's a stop event, iterate over the driving process # thread, looking for a breakpoint stop reason, if internal # continue. handle_process_state_event() else: continue def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData): super().__init__(exe_ctx, args, launched_driving_process=False) if isinstance(self.driving_target, lldb.SBTarget) and self.driving_target: self.listener = lldb.SBListener( "lldb.listener.multiplexer-scripted-process" ) self.multiplexed_processes = {} # Copy breakpoints from real target to passthrough with tempfile.NamedTemporaryFile() as tf: bkpt_file = lldb.SBFileSpec(tf.name) error = self.driving_target.BreakpointsWriteToFile(bkpt_file) if error.Fail(): log( "Failed to save breakpoints from driving target (%s)" % error.GetCString() ) bkpts_list = lldb.SBBreakpointList(self.target) error = self.target.BreakpointsCreateFromFile(bkpt_file, bkpts_list) if error.Fail(): log( "Failed create breakpoints from driving target \ (bkpt file: %s)" % tf.name ) # Copy breakpoint from passthrough to real target if error.Success(): self.driving_target.DeleteAllBreakpoints() for bkpt in self.target.breakpoints: if bkpt.IsValid(): for bl in bkpt: real_bpkt = self.driving_target.BreakpointCreateBySBAddress( bl.GetAddress() ) if not real_bpkt.IsValid(): log( "Failed to set breakpoint at address %s in \ driving target" % hex(bl.GetLoadAddress()) ) self.listener_thread = Thread( target=self.wait_for_driving_process_to_stop, daemon=True ) self.listener_thread.start() def launch(self, should_stop: bool = True) -> lldb.SBError: if not self.driving_target: return lldb.SBError( f"{self.__class__.__name__}.resume: Invalid driving target." ) if self.driving_process: return lldb.SBError( f"{self.__class__.__name__}.resume: Invalid driving process." ) error = lldb.SBError() launch_info = lldb.SBLaunchInfo(None) launch_info.SetListener(self.listener) driving_process = self.driving_target.Launch(launch_info, error) if not driving_process or error.Fail(): return error self.driving_process = driving_process for module in self.driving_target.modules: path = module.file.fullpath load_addr = module.GetObjectFileHeaderAddress().GetLoadAddress( self.driving_target ) self.loaded_images.append({"path": path, "load_addr": load_addr}) self.first_resume = True return error def resume(self, should_stop: bool = True) -> lldb.SBError: if self.first_resume: # When we resume the multiplexer process for the first time, # we shouldn't do anything because lldb's execution machinery # will resume the driving process by itself. # Also, no need to update the multiplexer scripted process state # here because since it's listening for the real process stop events. # Once it receives the stop event from the driving process, # `wait_for_driving_process_to_stop` will update the multiplexer # state for us. self.first_resume = False return lldb.SBError() if not self.driving_process: return lldb.SBError( f"{self.__class__.__name__}.resume: Invalid driving process." ) return self.driving_process.Continue() def get_threads_info(self, pid: int = None) -> Dict[int, Any]: if not pid: return super().get_threads_info() parity = pid % 2 return dict(filter(lambda pair: pair[0] % 2 == parity, self.threads.items())) def create_breakpoint(self, addr, error, pid=None): if not self.driving_target: error.SetErrorString("%s has no driving target." % self.__class__.__name__) return False def create_breakpoint_with_name(target, load_addr, name, error): addr = lldb.SBAddress(load_addr, target) if not addr.IsValid(): error.SetErrorString("Invalid breakpoint address %s" % hex(load_addr)) return False bkpt = target.BreakpointCreateBySBAddress(addr) if not bkpt.IsValid(): error.SetErrorString( "Failed to create breakpoint at address %s" % hex(addr.GetLoadAddress()) ) return False error = bkpt.AddNameWithErrorHandling(name) return error.Success() name = ( "multiplexer_scripted_process" if not pid else f"multiplexed_scripted_process_{pid}" ) if pid is not None: # This means that this method has been called from one of the # multiplexed scripted process. That also means that the multiplexer # target doesn't have this breakpoint created. mux_error = lldb.SBError() bkpt = create_breakpoint_with_name(self.target, addr, name, mux_error) if mux_error.Fail(): error.SetError( "Failed to create breakpoint in multiplexer \ target: %s" % mux_error.GetCString() ) return False return create_breakpoint_with_name(self.driving_target, addr, name, error) def multiplex(mux_process, muxed_process): muxed_process.GetScriptedImplementation().multiplexer = ( mux_process.GetScriptedImplementation() ) mux_process.GetScriptedImplementation().multiplexed_processes[ muxed_process.GetProcessID() ] = muxed_process def launch_scripted_process(target, class_name, dictionary): structured_data = lldb.SBStructuredData() structured_data.SetFromJSON(json.dumps(dictionary)) launch_info = lldb.SBLaunchInfo(None) launch_info.SetProcessPluginName("ScriptedProcess") launch_info.SetScriptedProcessClassName(class_name) launch_info.SetScriptedProcessDictionary(structured_data) error = lldb.SBError() return target.Launch(launch_info, error) def duplicate_target(driving_target): error = lldb.SBError() exe = driving_target.executable.fullpath triple = driving_target.triple debugger = driving_target.GetDebugger() return debugger.CreateTargetWithFileAndTargetTriple(exe, triple) def create_mux_process(debugger, command, exe_ctx, result, dict): if not debugger.GetNumTargets() > 0: return result.SetError( "Interactive scripted processes requires one non scripted process." ) debugger.SetAsync(True) driving_target = debugger.GetSelectedTarget() if not driving_target: return result.SetError("Driving target is invalid") # Create a seconde target for the multiplexer scripted process mux_target = duplicate_target(driving_target) if not mux_target: return result.SetError( "Couldn't duplicate driving target to launch multiplexer scripted process" ) class_name = f"{__name__}.{MultiplexerScriptedProcess.__name__}" dictionary = {"driving_target_idx": debugger.GetIndexOfTarget(driving_target)} mux_process = launch_scripted_process(mux_target, class_name, dictionary) if not mux_process: return result.SetError("Couldn't launch multiplexer scripted process") def create_child_processes(debugger, command, exe_ctx, result, dict): if not debugger.GetNumTargets() >= 2: return result.SetError("Scripted Multiplexer process not setup") debugger.SetAsync(True) # Create a seconde target for the multiplexer scripted process mux_target = debugger.GetSelectedTarget() if not mux_target: return result.SetError("Couldn't get multiplexer scripted process target") mux_process = mux_target.GetProcess() if not mux_process: return result.SetError("Couldn't get multiplexer scripted process") driving_target = mux_process.GetScriptedImplementation().driving_target if not driving_target: return result.SetError("Driving target is invalid") # Create a target for the multiplexed even scripted process even_target = duplicate_target(driving_target) if not even_target: return result.SetError( "Couldn't duplicate driving target to launch multiplexed even scripted process" ) class_name = f"{__name__}.{MultiplexedScriptedProcess.__name__}" dictionary = {"driving_target_idx": debugger.GetIndexOfTarget(mux_target)} dictionary["parity"] = 0 even_process = launch_scripted_process(even_target, class_name, dictionary) if not even_process: return result.SetError("Couldn't launch multiplexed even scripted process") multiplex(mux_process, even_process) # Create a target for the multiplexed odd scripted process odd_target = duplicate_target(driving_target) if not odd_target: return result.SetError( "Couldn't duplicate driving target to launch multiplexed odd scripted process" ) dictionary["parity"] = 1 odd_process = launch_scripted_process(odd_target, class_name, dictionary) if not odd_process: return result.SetError("Couldn't launch multiplexed odd scripted process") multiplex(mux_process, odd_process) def log(message): # FIXME: For now, we discard the log message until we can pass it to an lldb # logging channel. should_log = False if should_log: print(message) def __lldb_init_module(dbg, dict): dbg.HandleCommand( "command script add -o -f interactive_scripted_process.create_mux_process create_mux" ) dbg.HandleCommand( "command script add -o -f interactive_scripted_process.create_child_processes create_sub" )