"""
Test the functionality of interactive scripted processes
"""

import lldb
import lldbsuite.test.lldbutil as lldbutil
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
import json
import os


class TestInteractiveScriptedProcess(TestBase):
    """Exercise the multiplexer / multiplexed scripted processes end to end.

    The scripted process classes are loaded from the
    ``interactive_scripted_process.py`` module that lives next to this test.
    """

    NO_DEBUG_INFO_TESTCASE = True

    def setUp(self):
        """Build the inferior, select it as the current target and record the
        names of the source file and scripted process module used below."""
        # Call super's setUp().
        TestBase.setUp(self)
        # Build and load test program
        self.build()
        self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
        self.main_source_file = lldb.SBFileSpec("main.cpp")
        self.script_module = "interactive_scripted_process"
        self.script_file = self.script_module + ".py"

    # These tests are flaky and sometimes time out. They work most of the time,
    # so the basic event flow is right, but somehow the handling is off.
    # NOTE(review): stacking @skipUnlessDarwin with @skipIfDarwin appears to be
    # a deliberate way of disabling the test on every platform - confirm
    # before removing either decorator.
    @skipUnlessDarwin
    @skipIfDarwin
    def test_passthrough_launch(self):
        """Test a simple pass-through process launch"""
        self.passthrough_launch()

        lldbutil.run_break_set_by_source_regexp(self, "also break here")
        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
        error = self.mux_process.Continue()
        self.assertSuccess(error, "Resuming multiplexer scripted process")
        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")

        debugger_listener = self.dbg.GetListener()
        mux_broadcaster = self.mux_process.GetBroadcaster()

        # The debugger's listener sees the resume first (with a longer
        # timeout for the first event), then the stop.
        self._expect_state(
            debugger_listener, mux_broadcaster, lldb.eStateRunning, timeout=20
        )
        self._expect_state(debugger_listener, mux_broadcaster, lldb.eStateStopped)

        # The shadow listener must observe the same running/stopped sequence.
        self._expect_state(
            self.mux_process_listener, mux_broadcaster, lldb.eStateRunning
        )
        self._expect_state(
            self.mux_process_listener, mux_broadcaster, lldb.eStateStopped
        )

    @skipUnlessDarwin
    @skipIfDarwin
    def test_multiplexed_launch(self):
        """Test debugging multiple interactive scripted processes"""
        self.passthrough_launch()
        self.assertEqual(self.dbg.GetNumTargets(), 2)

        driving_target = self.mux_process.GetScriptedImplementation().driving_target
        self.assertTrue(driving_target.IsValid(), "Driving target is invalid")

        class_name = f"{self.script_module}.MultiplexedScriptedProcess"
        mux_target_idx = self.dbg.GetIndexOfTarget(self.mux_target)

        # Launch the even child first, then the odd one; each gets its own
        # duplicate of the driving target.
        self._launch_multiplexed_child(
            driving_target, class_name, mux_target_idx, parity=0, label="even"
        )
        odd_target, odd_process = self._launch_multiplexed_child(
            driving_target, class_name, mux_target_idx, parity=1, label="odd"
        )

        # Set a breakpoint on the odd child process
        bkpt = odd_target.BreakpointCreateBySourceRegex(
            "also break here", self.main_source_file
        )
        self.assertEqual(odd_target.GetNumBreakpoints(), 1)
        self.assertTrue(bkpt, "Second breakpoint set on child scripted process")
        self.assertEqual(bkpt.GetNumLocations(), 1, "Second breakpoint has 1 location")

        # Verify that the breakpoint was also set on the multiplexer & real target
        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
        bkpt = self.mux_target.GetBreakpointAtIndex(1)
        self.assertEqual(
            bkpt.GetNumLocations(), 1, "Second breakpoint set on mux scripted process"
        )
        self.assertTrue(bkpt.MatchesName("multiplexed_scripted_process_421"))

        self.assertGreater(driving_target.GetNumBreakpoints(), 1)

        # Resume execution on child process
        error = odd_process.Continue()
        self.assertSuccess(error, "Resuming odd child scripted process")
        self.assertTrue(odd_process.IsValid(), "Got a valid process")

        # Since all the execution is asynchronous, the order in which events
        # arrive is non-deterministic, so we need a data structure to make sure
        # we received both the running and stopped event for each target.

        # Initialize the execution event "bingo book", that maps a target index
        # to a dictionary of flags, initially unset, for the process events
        # that we care about (running & stopped). Target 0 is the real
        # (driving) target, so only the remaining targets are tracked.
        num_scripted_targets = self.dbg.GetNumTargets() - 1
        execution_events = {
            target_idx: {lldb.eStateRunning: False, lldb.eStateStopped: False}
            for target_idx in range(1, num_scripted_targets + 1)
        }

        def record_next_process_event():
            # Pull the next process-class event from the debugger's listener
            # and tick the matching (target, state) box exactly once.
            event = lldbutil.fetch_next_event(
                self,
                self.dbg.GetListener(),
                lldb.SBProcess.GetBroadcasterClass(),
                match_class=True,
            )
            state = lldb.SBProcess.GetStateFromEvent(event)
            self.assertIn(state, [lldb.eStateRunning, lldb.eStateStopped])
            event_process = lldb.SBProcess.GetProcessFromEvent(event)
            self.assertTrue(event_process.IsValid())
            event_target = event_process.GetTarget()
            event_target_idx = self.dbg.GetIndexOfTarget(event_target)
            self.assertFalse(
                execution_events[event_target_idx][state],
                "Event already received for this process",
            )
            execution_events[event_target_idx][state] = True

        # Two events (running + stopped) are expected per tracked target.
        for _ in range(num_scripted_targets * 2):
            record_next_process_event()

        for target_index, event_states in execution_events.items():
            for state, is_set in event_states.items():
                self.assertTrue(is_set, f"Target {target_index} has state {state} set")

        mux_broadcaster = self.mux_process.GetBroadcaster()
        self._expect_state(
            self.mux_process_listener, mux_broadcaster, lldb.eStateRunning
        )
        self._expect_state(
            self.mux_process_listener, mux_broadcaster, lldb.eStateStopped
        )

    def _expect_state(self, listener, broadcaster, expected_state, **fetch_kwargs):
        """Fetch the next event for `broadcaster` from `listener` and assert
        that the process state it carries is `expected_state`.

        Extra keyword arguments (e.g. ``timeout``) are forwarded unchanged to
        ``lldbutil.fetch_next_event``.
        """
        event = lldbutil.fetch_next_event(self, listener, broadcaster, **fetch_kwargs)
        self.assertState(lldb.SBProcess.GetStateFromEvent(event), expected_state)

    def _launch_multiplexed_child(
        self, driving_target, class_name, driving_target_idx, parity, label
    ):
        """Launch one multiplexed child scripted process and wait for it to stop.

        A duplicate of `driving_target` is created, a `class_name` scripted
        process is launched in it with the given `parity`, and the process is
        registered with the multiplexer. `label` ("even"/"odd") is only used
        in assertion messages.

        Returns the ``(target, process)`` pair of the new child.
        """
        child_target = self.duplicate_target(driving_target)
        self.assertTrue(
            child_target.IsValid(),
            f"Couldn't duplicate driving target to launch multiplexed {label} scripted process",
        )

        launch_info = self.get_launch_info(
            class_name, {"driving_target_idx": driving_target_idx, "parity": parity}
        )

        error = lldb.SBError()
        child_process = child_target.Launch(launch_info, error)
        # Check the launch error explicitly so that a failure reports the
        # underlying reason instead of only an invalid process.
        self.assertSuccess(error, f"Launched multiplexed {label} scripted process")
        self.assertTrue(
            child_process, f"Couldn't launch multiplexed {label} scripted process"
        )
        self.multiplex(child_process)

        # Check that the child process started running and then stopped.
        debugger_listener = self.dbg.GetListener()
        child_broadcaster = child_process.GetBroadcaster()
        self._expect_state(debugger_listener, child_broadcaster, lldb.eStateRunning)
        self._expect_state(debugger_listener, child_broadcaster, lldb.eStateStopped)

        self.assertTrue(child_process.IsValid(), "Got a valid process")
        self.assertState(
            child_process.GetState(), lldb.eStateStopped, "Process is stopped"
        )
        return child_target, child_process

    def duplicate_target(self, driving_target):
        """Create a new target using the executable path and triple of
        `driving_target` and return it."""
        exe = driving_target.executable.fullpath
        triple = driving_target.triple
        return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)

    def get_launch_info(self, class_name, script_dict):
        """Build an SBLaunchInfo that launches the scripted process class
        `class_name`, passing `script_dict` (JSON-serialised) as its
        scripted process dictionary."""
        structured_data = lldb.SBStructuredData()
        structured_data.SetFromJSON(json.dumps(script_dict))

        launch_info = lldb.SBLaunchInfo(None)
        launch_info.SetProcessPluginName("ScriptedProcess")
        launch_info.SetScriptedProcessClassName(class_name)
        launch_info.SetScriptedProcessDictionary(structured_data)
        return launch_info

    def multiplex(self, muxed_process):
        """Wire `muxed_process` and the multiplexer together: the child's
        scripted implementation gets a reference to the multiplexer, and the
        multiplexer records the child under its process ID."""
        mux_impl = self.mux_process.GetScriptedImplementation()
        muxed_process.GetScriptedImplementation().multiplexer = mux_impl
        mux_impl.multiplexed_processes[muxed_process.GetProcessID()] = muxed_process

    def passthrough_launch(self):
        """Test that a simple passthrough wrapper functions correctly.

        On return, `self.mux_target`, `self.mux_process` and
        `self.mux_process_listener` are set, and both the real process and the
        multiplexer process have been observed to run and then stop.
        """
        # First build the real target:
        self.assertEqual(self.dbg.GetNumTargets(), 1)
        real_target_id = 0
        real_target = self.dbg.GetTargetAtIndex(real_target_id)
        lldbutil.run_break_set_by_source_regexp(self, "Break here")
        self.assertEqual(real_target.GetNumBreakpoints(), 1)

        # Now source in the scripted module:
        script_path = os.path.join(self.getSourceDir(), self.script_file)
        self.runCmd(f"command script import '{script_path}'")

        self.mux_target = self.duplicate_target(real_target)
        self.assertTrue(self.mux_target.IsValid(), "duplicate target succeeded")

        mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
        script_dict = {"driving_target_idx": real_target_id}
        mux_launch_info = self.get_launch_info(mux_class, script_dict)
        self.mux_process_listener = lldb.SBListener(
            "lldb.test.interactive-scripted-process.listener"
        )
        mux_launch_info.SetShadowListener(self.mux_process_listener)

        self.dbg.SetAsync(True)
        error = lldb.SBError()
        self.mux_process = self.mux_target.Launch(mux_launch_info, error)
        self.assertSuccess(error, "Launched multiplexer scripted process")
        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")

        debugger_listener = self.dbg.GetListener()
        mux_broadcaster = self.mux_process.GetBroadcaster()

        # Check that the real process started running
        self._expect_state(debugger_listener, mux_broadcaster, lldb.eStateRunning)
        # Check that the mux process started running
        self._expect_state(
            self.mux_process_listener, mux_broadcaster, lldb.eStateRunning
        )

        # Check that the real process stopped
        self._expect_state(debugger_listener, mux_broadcaster, lldb.eStateStopped)
        # Check that the mux process stopped
        self._expect_state(
            self.mux_process_listener, mux_broadcaster, lldb.eStateStopped
        )

        real_process = real_target.GetProcess()
        self.assertTrue(real_process.IsValid(), "Got a valid process")
        self.assertState(
            real_process.GetState(), lldb.eStateStopped, "Process is stopped"
        )

        # This is a passthrough, so the two processes should have the same state:
        # Check that we got the right threads:
        real_threads = real_process.threads
        mux_threads = self.mux_process.threads
        self.assertEqual(
            len(real_threads),
            len(mux_threads),
            "Same number of threads",
        )
        for idx, (real_thread, mux_thread) in enumerate(
            zip(real_threads, mux_threads)
        ):
            real_pc = real_thread.frame[0].pc
            mux_pc = mux_thread.frame[0].pc
            self.assertEqual(real_pc, mux_pc, f"PC's equal for {idx}")