1""" 2Test the functionality of interactive scripted processes 3""" 4 5import lldb 6import lldbsuite.test.lldbutil as lldbutil 7from lldbsuite.test.decorators import * 8from lldbsuite.test.lldbtest import * 9import json, os 10 11 12class TestInteractiveScriptedProcess(TestBase): 13 NO_DEBUG_INFO_TESTCASE = True 14 15 def setUp(self): 16 # Call super's setUp(). 17 TestBase.setUp(self) 18 # Build and load test program 19 self.build() 20 self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET) 21 self.main_source_file = lldb.SBFileSpec("main.cpp") 22 self.script_module = "interactive_scripted_process" 23 self.script_file = self.script_module + ".py" 24 25 @skipUnlessDarwin 26 @expectedFailureDarwin 27 def test_passthrough_launch(self): 28 """Test a simple pass-through process launch""" 29 self.passthrough_launch() 30 31 lldbutil.run_break_set_by_source_regexp(self, "also break here") 32 self.assertEqual(self.mux_target.GetNumBreakpoints(), 2) 33 error = self.mux_process.Continue() 34 self.assertSuccess(error, "Resuming multiplexer scripted process") 35 self.assertTrue(self.mux_process.IsValid(), "Got a valid process") 36 37 event = lldbutil.fetch_next_event( 38 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 39 ) 40 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 41 event = lldbutil.fetch_next_event( 42 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 43 ) 44 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 45 46 @skipUnlessDarwin 47 @expectedFailureDarwin 48 def test_multiplexed_launch(self): 49 """Test a multiple interactive scripted process debugging""" 50 self.passthrough_launch() 51 self.assertEqual(self.dbg.GetNumTargets(), 2) 52 53 driving_target = self.mux_process.GetScriptedImplementation().driving_target 54 self.assertTrue(driving_target.IsValid(), "Driving target is invalid") 55 56 # Create a target for the multiplexed even scripted process 57 even_target = self.duplicate_target(driving_target) 58 self.assertTrue( 59 even_target.IsValid(), 60 "Couldn't duplicate driving target to launch multiplexed even scripted process", 61 ) 62 63 class_name = f"{self.script_module}.MultiplexedScriptedProcess" 64 dictionary = {"driving_target_idx": self.dbg.GetIndexOfTarget(self.mux_target)} 65 66 dictionary["parity"] = 0 67 muxed_launch_info = self.get_launch_info(class_name, dictionary) 68 69 # Launch Even Child Scripted Process 70 error = lldb.SBError() 71 even_process = even_target.Launch(muxed_launch_info, error) 72 self.assertTrue( 73 even_process, "Couldn't launch multiplexed even scripted process" 74 ) 75 self.multiplex(even_process) 76 77 # Check that the even process started running 78 event = lldbutil.fetch_next_event( 79 self, self.dbg.GetListener(), even_process.GetBroadcaster() 80 ) 81 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 82 # Check that the even process stopped 83 event = lldbutil.fetch_next_event( 84 self, self.dbg.GetListener(), even_process.GetBroadcaster() 85 ) 86 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 87 88 self.assertTrue(even_process.IsValid(), "Got a valid process") 89 self.assertState( 90 even_process.GetState(), lldb.eStateStopped, "Process is stopped" 91 ) 92 93 # Create a target for the multiplexed odd scripted process 94 odd_target = self.duplicate_target(driving_target) 95 self.assertTrue( 96 odd_target.IsValid(), 97 "Couldn't duplicate driving target to launch multiplexed odd scripted process", 98 ) 99 100 dictionary["parity"] = 1 101 muxed_launch_info = self.get_launch_info(class_name, dictionary) 102 103 # Launch Odd Child Scripted Process 104 error = lldb.SBError() 105 odd_process = odd_target.Launch(muxed_launch_info, error) 106 self.assertTrue(odd_process, "Couldn't launch multiplexed odd scripted process") 107 self.multiplex(odd_process) 108 109 # Check that the odd process started running 110 event = lldbutil.fetch_next_event( 111 self, self.dbg.GetListener(), odd_process.GetBroadcaster() 112 ) 113 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 114 # Check that the odd process stopped 115 event = lldbutil.fetch_next_event( 116 self, self.dbg.GetListener(), odd_process.GetBroadcaster() 117 ) 118 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 119 120 self.assertTrue(odd_process.IsValid(), "Got a valid process") 121 self.assertState( 122 odd_process.GetState(), lldb.eStateStopped, "Process is stopped" 123 ) 124 125 # Set a breakpoint on the odd child process 126 bkpt = odd_target.BreakpointCreateBySourceRegex( 127 "also break here", self.main_source_file 128 ) 129 self.assertEqual(odd_target.GetNumBreakpoints(), 1) 130 self.assertTrue(bkpt, "Second breakpoint set on child scripted process") 131 self.assertEqual(bkpt.GetNumLocations(), 1, "Second breakpoint has 1 location") 132 133 # Verify that the breakpoint was also set on the multiplexer & real target 134 self.assertEqual(self.mux_target.GetNumBreakpoints(), 2) 135 bkpt = self.mux_target.GetBreakpointAtIndex(1) 136 self.assertEqual( 137 bkpt.GetNumLocations(), 1, "Second breakpoint set on mux scripted process" 138 ) 139 self.assertTrue(bkpt.MatchesName("multiplexed_scripted_process_421")) 140 141 self.assertGreater(driving_target.GetNumBreakpoints(), 1) 142 143 # Resume execution on child process 144 error = odd_process.Continue() 145 self.assertSuccess(error, "Resuming odd child scripted process") 146 self.assertTrue(odd_process.IsValid(), "Got a valid process") 147 148 # Since all the execution is asynchronous, the order in which events 149 # arrive is non-deterministic, so we need a data structure to make sure 150 # we received both the running and stopped event for each target. 151 152 # Initialize the execution event "bingo book", that maps a process index 153 # to a dictionary that contains flags that are not set for the process 154 # events that we care about (running & stopped) 155 156 execution_events = { 157 1: {lldb.eStateRunning: False, lldb.eStateStopped: False}, 158 2: {lldb.eStateRunning: False, lldb.eStateStopped: False}, 159 3: {lldb.eStateRunning: False, lldb.eStateStopped: False}, 160 } 161 162 def fetch_process_event(self, execution_events): 163 event = lldbutil.fetch_next_event( 164 self, 165 self.dbg.GetListener(), 166 lldb.SBProcess.GetBroadcasterClass(), 167 match_class=True, 168 ) 169 state = lldb.SBProcess.GetStateFromEvent(event) 170 self.assertIn(state, [lldb.eStateRunning, lldb.eStateStopped]) 171 event_process = lldb.SBProcess.GetProcessFromEvent(event) 172 self.assertTrue(event_process.IsValid()) 173 event_target = event_process.GetTarget() 174 event_target_idx = self.dbg.GetIndexOfTarget(event_target) 175 self.assertFalse( 176 execution_events[event_target_idx][state], 177 "Event already received for this process", 178 ) 179 execution_events[event_target_idx][state] = True 180 181 event = lldbutil.fetch_next_event( 182 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 183 ) 184 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 185 186 event = lldbutil.fetch_next_event( 187 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 188 ) 189 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 190 191 for _ in range((self.dbg.GetNumTargets() - 1) * 2): 192 fetch_process_event(self, execution_events) 193 194 for target_index, event_states in execution_events.items(): 195 for state, is_set in event_states.items(): 196 self.assertTrue(is_set, f"Target {target_index} has state {state} set") 197 198 def duplicate_target(self, driving_target): 199 exe = driving_target.executable.fullpath 200 triple = driving_target.triple 201 return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple) 202 203 def get_launch_info(self, class_name, script_dict): 204 structured_data = lldb.SBStructuredData() 205 structured_data.SetFromJSON(json.dumps(script_dict)) 206 207 launch_info = lldb.SBLaunchInfo(None) 208 launch_info.SetProcessPluginName("ScriptedProcess") 209 launch_info.SetScriptedProcessClassName(class_name) 210 launch_info.SetScriptedProcessDictionary(structured_data) 211 return launch_info 212 213 def multiplex(self, muxed_process): 214 muxed_process.GetScriptedImplementation().multiplexer = ( 215 self.mux_process.GetScriptedImplementation() 216 ) 217 self.mux_process.GetScriptedImplementation().multiplexed_processes[ 218 muxed_process.GetProcessID() 219 ] = muxed_process 220 221 def passthrough_launch(self): 222 """Test that a simple passthrough wrapper functions correctly""" 223 # First build the real target: 224 self.assertEqual(self.dbg.GetNumTargets(), 1) 225 real_target_id = 0 226 real_target = self.dbg.GetTargetAtIndex(real_target_id) 227 lldbutil.run_break_set_by_source_regexp(self, "Break here") 228 self.assertEqual(real_target.GetNumBreakpoints(), 1) 229 230 # Now source in the scripted module: 231 script_path = os.path.join(self.getSourceDir(), self.script_file) 232 self.runCmd(f"command script import '{script_path}'") 233 234 self.mux_target = self.duplicate_target(real_target) 235 self.assertTrue(self.mux_target.IsValid(), "duplicate target succeeded") 236 237 mux_class = f"{self.script_module}.MultiplexerScriptedProcess" 238 script_dict = {"driving_target_idx": real_target_id} 239 mux_launch_info = self.get_launch_info(mux_class, script_dict) 240 self.mux_process_listener = lldb.SBListener( 241 "lldb.test.interactive-scripted-process.listener" 242 ) 243 mux_launch_info.SetShadowListener(self.mux_process_listener) 244 245 self.dbg.SetAsync(True) 246 error = lldb.SBError() 247 self.mux_process = self.mux_target.Launch(mux_launch_info, error) 248 self.assertSuccess(error, "Launched multiplexer scripted process") 249 self.assertTrue(self.mux_process.IsValid(), "Got a valid process") 250 251 # Check that the mux process started running 252 event = lldbutil.fetch_next_event( 253 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 254 ) 255 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 256 # Check that the real process started running 257 event = lldbutil.fetch_next_event( 258 self, self.dbg.GetListener(), self.mux_process.GetBroadcaster() 259 ) 260 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning) 261 262 # Check that the real process stopped 263 event = lldbutil.fetch_next_event( 264 self, self.dbg.GetListener(), self.mux_process.GetBroadcaster() 265 ) 266 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 267 # Check that the mux process stopped 268 event = lldbutil.fetch_next_event( 269 self, self.mux_process_listener, self.mux_process.GetBroadcaster() 270 ) 271 self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped) 272 273 real_process = real_target.GetProcess() 274 self.assertTrue(real_process.IsValid(), "Got a valid process") 275 self.assertState( 276 real_process.GetState(), lldb.eStateStopped, "Process is stopped" 277 ) 278 279 # This is a passthrough, so the two processes should have the same state: 280 # Check that we got the right threads: 281 self.assertEqual( 282 len(real_process.threads), 283 len(self.mux_process.threads), 284 "Same number of threads", 285 ) 286 for id in range(len(real_process.threads)): 287 real_pc = real_process.threads[id].frame[0].pc 288 mux_pc = self.mux_process.threads[id].frame[0].pc 289 self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}") 290