1""" 2Test the functionality of interactive scripted processes 3""" 4 5import lldb 6import lldbsuite.test.lldbutil as lldbutil 7from lldbsuite.test.decorators import * 8from lldbsuite.test.lldbtest import * 9import json, os 10 11 12class TestInteractiveScriptedProcess(TestBase): 13 14 NO_DEBUG_INFO_TESTCASE = True 15 16 def setUp(self): 17 # Call super's setUp(). 18 TestBase.setUp(self) 19 # Build and load test program 20 self.build() 21 self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET) 22 self.main_source_file = lldb.SBFileSpec("main.cpp") 23 self.script_module = "interactive_scripted_process" 24 self.script_file = self.script_module + ".py" 25 26 @skipUnlessDarwin 27 def test_passthrough_launch(self): 28 """Test a simple pass-through process launch""" 29 self.passthrough_launch() 30 31 lldbutil.run_break_set_by_source_regexp(self, "also break here") 32 self.assertEqual(self.mux_target.GetNumBreakpoints(), 2) 33 error = self.mux_process.Continue() 34 self.assertSuccess(error, "Resuming multiplexer scripted process") 35 self.assertTrue(self.mux_process.IsValid(), "Got a valid process") 36 37 event = lldbutil.fetch_next_event( 38 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 39 ) 40 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 41 event = lldbutil.fetch_next_event( 42 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 43 ) 44 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 45 46 @skipUnlessDarwin 47 def test_multiplexed_launch(self): 48 """Test a multiple interactive scripted process debugging""" 49 self.passthrough_launch() 50 self.assertEqual(self.dbg.GetNumTargets(), 2) 51 52 driving_target = self.mux_process.GetScriptedImplementation().driving_target 53 self.assertTrue(driving_target.IsValid(), "Driving target is invalid") 54 55 # Create a target for the multiplexed even scripted process 56 even_target = self.duplicate_target(driving_target) 57 self.assertTrue( 58 even_target.IsValid(), 59 "Couldn't duplicate driving target to launch multiplexed even scripted process", 60 ) 61 62 class_name = f"{self.script_module}.MultiplexedScriptedProcess" 63 dictionary = {"driving_target_idx": self.dbg.GetIndexOfTarget(self.mux_target)} 64 65 dictionary["parity"] = 0 66 muxed_launch_info = self.get_launch_info(class_name, dictionary) 67 68 # Launch Even Child Scripted Process 69 error = lldb.SBError() 70 even_process = even_target.Launch(muxed_launch_info, error) 71 self.assertTrue( 72 even_process, "Couldn't launch multiplexed even scripted process" 73 ) 74 self.multiplex(even_process) 75 76 # Check that the even process started running 77 event = lldbutil.fetch_next_event( 78 self, self.dbg.GetListener(), even_process.GetBroadcaster() 79 ) 80 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 81 # Check that the even process stopped 82 event = lldbutil.fetch_next_event( 83 self, self.dbg.GetListener(), even_process.GetBroadcaster() 84 ) 85 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 86 87 self.assertTrue(even_process.IsValid(), "Got a valid process") 88 self.assertState( 89 even_process.GetState(), lldb.eStateStopped, "Process is stopped" 90 ) 91 92 # Create a target for the multiplexed odd scripted process 93 odd_target = self.duplicate_target(driving_target) 94 self.assertTrue( 95 odd_target.IsValid(), 96 "Couldn't duplicate driving target to launch multiplexed odd scripted process", 97 ) 98 99 dictionary["parity"] = 1 100 muxed_launch_info = self.get_launch_info(class_name, dictionary) 101 102 # Launch Odd Child Scripted Process 103 error = lldb.SBError() 104 odd_process = odd_target.Launch(muxed_launch_info, error) 105 self.assertTrue(odd_process, "Couldn't launch multiplexed odd scripted process") 106 self.multiplex(odd_process) 107 108 # Check that the odd process started running 109 event = lldbutil.fetch_next_event( 110 self, self.dbg.GetListener(), odd_process.GetBroadcaster() 111 ) 112 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 113 # Check that the odd process stopped 114 event = lldbutil.fetch_next_event( 115 self, self.dbg.GetListener(), odd_process.GetBroadcaster() 116 ) 117 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 118 119 self.assertTrue(odd_process.IsValid(), "Got a valid process") 120 self.assertState( 121 odd_process.GetState(), lldb.eStateStopped, "Process is stopped" 122 ) 123 124 # Set a breakpoint on the odd child process 125 bkpt = odd_target.BreakpointCreateBySourceRegex( 126 "also break here", self.main_source_file 127 ) 128 self.assertEqual(odd_target.GetNumBreakpoints(), 1) 129 self.assertTrue(bkpt, "Second breakpoint set on child scripted process") 130 self.assertEqual(bkpt.GetNumLocations(), 1, "Second breakpoint has 1 location") 131 132 # Verify that the breakpoint was also set on the multiplexer & real target 133 self.assertEqual(self.mux_target.GetNumBreakpoints(), 2) 134 bkpt = self.mux_target.GetBreakpointAtIndex(1) 135 self.assertEqual( 136 bkpt.GetNumLocations(), 1, "Second breakpoint set on mux scripted process" 137 ) 138 self.assertTrue(bkpt.MatchesName("multiplexed_scripted_process_421")) 139 140 self.assertGreater(driving_target.GetNumBreakpoints(), 1) 141 142 # Resume execution on child process 143 error = odd_process.Continue() 144 self.assertSuccess(error, "Resuming odd child scripted process") 145 self.assertTrue(odd_process.IsValid(), "Got a valid process") 146 147 # Since all the execution is asynchronous, the order in which events 148 # arrive is non-deterministic, so we need a data structure to make sure 149 # we received both the running and stopped event for each target. 150 151 # Initialize the execution event "bingo book", that maps a process index 152 # to a dictionary that contains flags that are not set for the process 153 # events that we care about (running & stopped) 154 155 execution_events = { 156 1: {lldb.eStateRunning: False, lldb.eStateStopped: False}, 157 2: {lldb.eStateRunning: False, lldb.eStateStopped: False}, 158 3: {lldb.eStateRunning: False, lldb.eStateStopped: False}, 159 } 160 161 def fetch_process_event(self, execution_events): 162 event = lldbutil.fetch_next_event( 163 self, 164 self.dbg.GetListener(), 165 lldb.SBProcess.GetBroadcasterClass(), 166 match_class=True, 167 ) 168 state = lldb.SBProcess.GetStateFromEvent(event) 169 self.assertIn(state, [lldb.eStateRunning, lldb.eStateStopped]) 170 event_process = lldb.SBProcess.GetProcessFromEvent(event) 171 self.assertTrue(event_process.IsValid()) 172 event_target = event_process.GetTarget() 173 event_target_idx = self.dbg.GetIndexOfTarget(event_target) 174 self.assertFalse( 175 execution_events[event_target_idx][state], 176 "Event already received for this process", 177 ) 178 execution_events[event_target_idx][state] = True 179 180 event = lldbutil.fetch_next_event( 181 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 182 ) 183 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 184 185 event = lldbutil.fetch_next_event( 186 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 187 ) 188 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 189 190 for _ in range((self.dbg.GetNumTargets() - 1) * 2): 191 fetch_process_event(self, execution_events) 192 193 for target_index, event_states in execution_events.items(): 194 for state, is_set in event_states.items(): 195 self.assertTrue(is_set, f"Target {target_index} has state {state} set") 196 197 def duplicate_target(self, driving_target): 198 exe = driving_target.executable.fullpath 199 triple = driving_target.triple 200 return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple) 201 202 def get_launch_info(self, class_name, script_dict): 203 structured_data = lldb.SBStructuredData() 204 structured_data.SetFromJSON(json.dumps(script_dict)) 205 206 launch_info = lldb.SBLaunchInfo(None) 207 launch_info.SetProcessPluginName("ScriptedProcess") 208 launch_info.SetScriptedProcessClassName(class_name) 209 launch_info.SetScriptedProcessDictionary(structured_data) 210 return launch_info 211 212 def multiplex(self, muxed_process): 213 muxed_process.GetScriptedImplementation().multiplexer = ( 214 self.mux_process.GetScriptedImplementation() 215 ) 216 self.mux_process.GetScriptedImplementation().multiplexed_processes[ 217 muxed_process.GetProcessID() 218 ] = muxed_process 219 220 def passthrough_launch(self): 221 """Test that a simple passthrough wrapper functions correctly""" 222 # First build the real target: 223 self.assertEqual(self.dbg.GetNumTargets(), 1) 224 real_target_id = 0 225 real_target = self.dbg.GetTargetAtIndex(real_target_id) 226 lldbutil.run_break_set_by_source_regexp(self, "Break here") 227 self.assertEqual(real_target.GetNumBreakpoints(), 1) 228 229 # Now source in the scripted module: 230 script_path = os.path.join(self.getSourceDir(), self.script_file) 231 self.runCmd(f"command script import '{script_path}'") 232 233 self.mux_target = self.duplicate_target(real_target) 234 self.assertTrue(self.mux_target.IsValid(), "duplicate target succeeded") 235 236 mux_class = f"{self.script_module}.MultiplexerScriptedProcess" 237 script_dict = {"driving_target_idx": real_target_id} 238 mux_launch_info = self.get_launch_info(mux_class, script_dict) 239 self.mux_process_listener = lldb.SBListener( 240 "lldb.test.interactive-scripted-process.listener" 241 ) 242 mux_launch_info.SetShadowListener(self.mux_process_listener) 243 244 self.dbg.SetAsync(True) 245 error = lldb.SBError() 246 self.mux_process = self.mux_target.Launch(mux_launch_info, error) 247 self.assertSuccess(error, "Launched multiplexer scripted process") 248 self.assertTrue(self.mux_process.IsValid(), "Got a valid process") 249 250 # Check that the mux process started running 251 event = lldbutil.fetch_next_event( 252 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 253 ) 254 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 255 # Check that the real process started running 256 event = lldbutil.fetch_next_event( 257 self, self.dbg.GetListener(), self.mux_process.GetBroadcaster() 258 ) 259 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 260 261 # Check that the real process stopped 262 event = lldbutil.fetch_next_event( 263 self, self.dbg.GetListener(), self.mux_process.GetBroadcaster() 264 ) 265 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 266 # Check that the mux process stopped 267 event = lldbutil.fetch_next_event( 268 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 269 ) 270 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 271 272 real_process = real_target.GetProcess() 273 self.assertTrue(real_process.IsValid(), "Got a valid process") 274 self.assertState( 275 real_process.GetState(), lldb.eStateStopped, "Process is stopped" 276 ) 277 278 # This is a passthrough, so the two processes should have the same state: 279 # Check that we got the right threads: 280 self.assertEqual( 281 len(real_process.threads), 282 len(self.mux_process.threads), 283 "Same number of threads", 284 ) 285 for id in range(len(real_process.threads)): 286 real_pc = real_process.threads[id].frame[0].pc 287 mux_pc = self.mux_process.threads[id].frame[0].pc 288 self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}") 289