1""" 2Test lldb Python event APIs. 3""" 4 5import re 6import lldb 7from lldbsuite.test.decorators import * 8from lldbsuite.test.lldbtest import * 9from lldbsuite.test import lldbutil 10 11 12@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV 13class EventAPITestCase(TestBase): 14 NO_DEBUG_INFO_TESTCASE = True 15 16 def setUp(self): 17 # Call super's setUp(). 18 TestBase.setUp(self) 19 # Find the line number to of function 'c'. 20 self.line = line_number( 21 "main.c", '// Find the line number of function "c" here.' 22 ) 23 24 @expectedFailureAll( 25 oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases" 26 ) 27 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 28 @skipIfNetBSD 29 def test_listen_for_and_print_event(self): 30 """Exercise SBEvent API.""" 31 self.build() 32 exe = self.getBuildArtifact("a.out") 33 34 self.dbg.SetAsync(True) 35 36 # Create a target by the debugger. 37 target = self.dbg.CreateTarget(exe) 38 self.assertTrue(target, VALID_TARGET) 39 40 # Now create a breakpoint on main.c by name 'c'. 41 breakpoint = target.BreakpointCreateByName("c", "a.out") 42 43 listener = lldb.SBListener("my listener") 44 45 # Now launch the process, and do not stop at the entry point. 46 error = lldb.SBError() 47 flags = target.GetLaunchInfo().GetLaunchFlags() 48 process = target.Launch( 49 listener, 50 None, # argv 51 None, # envp 52 None, # stdin_path 53 None, # stdout_path 54 None, # stderr_path 55 None, # working directory 56 flags, # launch flags 57 False, # Stop at entry 58 error, 59 ) # error 60 61 self.assertEqual(process.GetState(), lldb.eStateStopped, PROCESS_STOPPED) 62 63 # Create an empty event object. 64 event = lldb.SBEvent() 65 66 traceOn = self.TraceOn() 67 if traceOn: 68 lldbutil.print_stacktraces(process) 69 70 # Create MyListeningThread class to wait for any kind of event. 71 import threading 72 73 class MyListeningThread(threading.Thread): 74 def run(self): 75 count = 0 76 # Let's only try at most 4 times to retrieve any kind of event. 77 # After that, the thread exits. 78 while not count > 3: 79 if traceOn: 80 print("Try wait for event...") 81 if listener.WaitForEvent(5, event): 82 if traceOn: 83 desc = lldbutil.get_description(event) 84 print("Event description:", desc) 85 print("Event data flavor:", event.GetDataFlavor()) 86 print( 87 "Process state:", 88 lldbutil.state_type_to_str(process.GetState()), 89 ) 90 print() 91 else: 92 if traceOn: 93 print("timeout occurred waiting for event...") 94 count = count + 1 95 listener.Clear() 96 return 97 98 # Let's start the listening thread to retrieve the events. 99 my_thread = MyListeningThread() 100 my_thread.start() 101 102 # Use Python API to continue the process. The listening thread should be 103 # able to receive the state changed events. 104 process.Continue() 105 106 # Use Python API to kill the process. The listening thread should be 107 # able to receive the state changed event, too. 108 process.Kill() 109 110 # Wait until the 'MyListeningThread' terminates. 111 my_thread.join() 112 113 # Shouldn't we be testing against some kind of expectation here? 114 115 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases 116 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 117 @skipIfNetBSD 118 def test_wait_for_event(self): 119 """Exercise SBListener.WaitForEvent() API.""" 120 self.build() 121 exe = self.getBuildArtifact("a.out") 122 123 self.dbg.SetAsync(True) 124 125 # Create a target by the debugger. 126 target = self.dbg.CreateTarget(exe) 127 self.assertTrue(target, VALID_TARGET) 128 129 # Now create a breakpoint on main.c by name 'c'. 130 breakpoint = target.BreakpointCreateByName("c", "a.out") 131 self.trace("breakpoint:", breakpoint) 132 self.assertTrue( 133 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT 134 ) 135 136 # Get the debugger listener. 137 listener = self.dbg.GetListener() 138 139 # Now launch the process, and do not stop at entry point. 140 error = lldb.SBError() 141 flags = target.GetLaunchInfo().GetLaunchFlags() 142 process = target.Launch( 143 listener, 144 None, # argv 145 None, # envp 146 None, # stdin_path 147 None, # stdout_path 148 None, # stderr_path 149 None, # working directory 150 flags, # launch flags 151 False, # Stop at entry 152 error, 153 ) # error 154 self.assertTrue(error.Success() and process, PROCESS_IS_VALID) 155 156 # Create an empty event object. 157 event = lldb.SBEvent() 158 self.assertFalse(event, "Event should not be valid initially") 159 160 # Create MyListeningThread to wait for any kind of event. 161 import threading 162 163 class MyListeningThread(threading.Thread): 164 def run(self): 165 count = 0 166 # Let's only try at most 3 times to retrieve any kind of event. 167 while not count > 3: 168 if listener.WaitForEvent(5, event): 169 self.context.trace("Got a valid event:", event) 170 self.context.trace("Event data flavor:", event.GetDataFlavor()) 171 self.context.trace( 172 "Event type:", lldbutil.state_type_to_str(event.GetType()) 173 ) 174 listener.Clear() 175 return 176 count = count + 1 177 print("Timeout: listener.WaitForEvent") 178 listener.Clear() 179 return 180 181 # Use Python API to kill the process. The listening thread should be 182 # able to receive a state changed event. 183 process.Kill() 184 185 # Let's start the listening thread to retrieve the event. 186 my_thread = MyListeningThread() 187 my_thread.context = self 188 my_thread.start() 189 190 # Wait until the 'MyListeningThread' terminates. 191 my_thread.join() 192 193 self.assertTrue(event, "My listening thread successfully received an event") 194 195 @expectedFailureAll( 196 oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases" 197 ) 198 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 199 @expectedFailureNetBSD 200 def test_add_listener_to_broadcaster(self): 201 """Exercise some SBBroadcaster APIs.""" 202 self.build() 203 exe = self.getBuildArtifact("a.out") 204 205 self.dbg.SetAsync(True) 206 207 # Create a target by the debugger. 208 target = self.dbg.CreateTarget(exe) 209 self.assertTrue(target, VALID_TARGET) 210 211 # Now create a breakpoint on main.c by name 'c'. 212 breakpoint = target.BreakpointCreateByName("c", "a.out") 213 self.trace("breakpoint:", breakpoint) 214 self.assertTrue( 215 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT 216 ) 217 218 listener = lldb.SBListener("my listener") 219 220 # Now launch the process, and do not stop at the entry point. 221 error = lldb.SBError() 222 flags = target.GetLaunchInfo().GetLaunchFlags() 223 process = target.Launch( 224 listener, 225 None, # argv 226 None, # envp 227 None, # stdin_path 228 None, # stdout_path 229 None, # stderr_path 230 None, # working directory 231 flags, # launch flags 232 False, # Stop at entry 233 error, 234 ) # error 235 236 # Create an empty event object. 237 event = lldb.SBEvent() 238 self.assertFalse(event, "Event should not be valid initially") 239 240 # The finite state machine for our custom listening thread, with an 241 # initial state of None, which means no event has been received. 242 # It changes to 'connected' after 'connected' event is received (for remote platforms) 243 # It changes to 'running' after 'running' event is received (should happen only if the 244 # currentstate is either 'None' or 'connected') 245 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the 246 # current state is 'running'.) 247 self.state = None 248 249 # Create MyListeningThread to wait for state changed events. 250 # By design, a "running" event is expected following by a "stopped" 251 # event. 252 import threading 253 254 class MyListeningThread(threading.Thread): 255 def run(self): 256 self.context.trace("Running MyListeningThread:", self) 257 258 # Regular expression pattern for the event description. 259 pattern = re.compile("data = {.*, state = (.*)}$") 260 261 # Let's only try at most 6 times to retrieve our events. 262 count = 0 263 while True: 264 if listener.WaitForEvent(5, event): 265 desc = lldbutil.get_description(event) 266 self.context.trace("Event description:", desc) 267 match = pattern.search(desc) 268 if not match: 269 break 270 if match.group(1) == "connected": 271 # When debugging remote targets with lldb-server, we 272 # first get the 'connected' event. 273 self.context.assertTrue(self.context.state is None) 274 self.context.state = "connected" 275 continue 276 elif match.group(1) == "running": 277 self.context.assertTrue( 278 self.context.state is None 279 or self.context.state == "connected" 280 ) 281 self.context.state = "running" 282 continue 283 elif match.group(1) == "stopped": 284 self.context.assertTrue(self.context.state == "running") 285 # Whoopee, both events have been received! 286 self.context.state = "stopped" 287 break 288 else: 289 break 290 print("Timeout: listener.WaitForEvent") 291 count = count + 1 292 if count > 6: 293 break 294 listener.Clear() 295 return 296 297 # Use Python API to continue the process. The listening thread should be 298 # able to receive the state changed events. 299 process.Continue() 300 301 # Start the listening thread to receive the "running" followed by the 302 # "stopped" events. 303 my_thread = MyListeningThread() 304 # Supply the enclosing context so that our listening thread can access 305 # the 'state' variable. 306 my_thread.context = self 307 my_thread.start() 308 309 # Wait until the 'MyListeningThread' terminates. 310 my_thread.join() 311 312 # The final judgement. :-) 313 self.assertEqual( 314 self.state, "stopped", "Both expected state changed events received" 315 ) 316 317 def wait_for_next_event(self, expected_state, test_shadow = False): 318 """Wait for an event from self.primary & self.shadow listener. 319 If test_shadow is true, we also check that the shadow listener only 320 receives events AFTER the primary listener does.""" 321 # Waiting on the shadow listener shouldn't have events yet because 322 # we haven't fetched them for the primary listener yet: 323 event = lldb.SBEvent() 324 325 if test_shadow: 326 success = self.shadow_listener.WaitForEvent(1, event) 327 self.assertFalse(success, "Shadow listener doesn't pull events") 328 329 # But there should be an event for the primary listener: 330 success = self.primary_listener.WaitForEvent(5, event) 331 self.assertTrue(success, "Primary listener got the event") 332 333 state = lldb.SBProcess.GetStateFromEvent(event) 334 restart = False 335 if state == lldb.eStateStopped: 336 restart = lldb.SBProcess.GetRestartedFromEvent(event) 337 338 if expected_state != None: 339 self.assertEqual(state, expected_state, "Primary thread got the correct event") 340 341 # And after pulling that one there should be an equivalent event for the shadow 342 # listener: 343 success = self.shadow_listener.WaitForEvent(5, event) 344 self.assertTrue(success, "Shadow listener got event too") 345 self.assertEqual(state, lldb.SBProcess.GetStateFromEvent(event), "It was the same event") 346 self.assertEqual(restart, lldb.SBProcess.GetRestartedFromEvent(event), "It was the same restarted") 347 348 return state, restart 349 350 def test_shadow_listener(self): 351 self.build() 352 exe = self.getBuildArtifact("a.out") 353 354 # Create a target by the debugger. 355 target = self.dbg.CreateTarget(exe) 356 self.assertTrue(target, VALID_TARGET) 357 358 # Now create a breakpoint on main.c by name 'c'. 359 bkpt1 = target.BreakpointCreateByName("c", "a.out") 360 self.trace("breakpoint:", bkpt1) 361 self.assertTrue(bkpt1.GetNumLocations() == 1, VALID_BREAKPOINT) 362 363 self.primary_listener = lldb.SBListener("my listener") 364 self.shadow_listener = lldb.SBListener("shadow listener") 365 366 self.cur_thread = None 367 368 error = lldb.SBError() 369 launch_info = target.GetLaunchInfo() 370 launch_info.SetListener(self.primary_listener) 371 launch_info.SetShadowListener(self.shadow_listener) 372 373 self.runCmd("settings set target.process.extra-startup-command QSetLogging:bitmask=LOG_PROCESS|LOG_EXCEPTIONS|LOG_RNB_PACKETS|LOG_STEP;") 374 self.dbg.SetAsync(True) 375 376 self.process = target.Launch(launch_info, error) 377 self.assertSuccess(error, "Process launched successfully") 378 379 # Keep fetching events from the primary to trigger the do on removal and 380 # then from the shadow listener, and make sure they match: 381 382 # Events in the launch sequence might be platform dependent, so don't 383 # expect any particular event till we get the stopped: 384 state = lldb.eStateInvalid 385 while state != lldb.eStateStopped: 386 state, restart = self.wait_for_next_event(None, False) 387 388 # Okay, we're now at a good stop, so try a next: 389 self.cur_thread = self.process.threads[0] 390 391 # Make sure we're at our expected breakpoint: 392 self.assertTrue(self.cur_thread.IsValid(), "Got a zeroth thread") 393 self.assertEqual(self.cur_thread.stop_reason, lldb.eStopReasonBreakpoint) 394 self.assertEqual(self.cur_thread.GetStopReasonDataCount(), 2, "Only one breakpoint/loc here") 395 self.assertEqual(bkpt1.GetID(), self.cur_thread.GetStopReasonDataAtIndex(0), "Hit the right breakpoint") 396 # Disable the first breakpoint so it doesn't get in the way... 397 bkpt1.SetEnabled(False) 398 399 self.cur_thread.StepOver() 400 # We'll run the test for "shadow listener blocked by primary listener 401 # for the first couple rounds, then we'll skip the 1 second pause... 402 self.wait_for_next_event(lldb.eStateRunning, True) 403 self.wait_for_next_event(lldb.eStateStopped, True) 404 405 # Next try an auto-continue breakpoint and make sure the shadow listener got 406 # the resumed info as well. Note that I'm not explicitly counting 407 # running events here. At the point when I wrote this lldb sometimes 408 # emits two running events in a row. Apparently the code to coalesce running 409 # events isn't working. But that's not what this test is testing, we're really 410 # testing that the primary & shadow listeners hear the same thing and in the 411 # right order. 412 413 main_spec = lldb.SBFileSpec("main.c") 414 bkpt2 = target.BreakpointCreateBySourceRegex("b.2. returns %d", main_spec) 415 self.assertTrue(bkpt2.GetNumLocations() > 0, "BP2 worked") 416 bkpt2.SetAutoContinue(True) 417 418 bkpt3 = target.BreakpointCreateBySourceRegex("a.3. returns %d", main_spec) 419 self.assertTrue(bkpt3.GetNumLocations() > 0, "BP3 worked") 420 421 state = lldb.eStateStopped 422 restarted = False 423 424 # Put in a counter to make sure we don't spin forever if there is some 425 # error in the logic. 426 counter = 0 427 while state != lldb.eStateExited: 428 counter += 1 429 self.assertLess(counter, 50, "Took more than 50 events to hit two breakpoints.") 430 if state == lldb.eStateStopped and not restarted: 431 self.process.Continue() 432 state, restarted = self.wait_for_next_event(None, False) 433 434