"""
Test lldb Python event APIs.
"""

import re
import threading

import lldb
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil


@skipIfLinux  # llvm.org/pr25924, sometimes generating SIGSEGV
class EventAPITestCase(TestBase):
    """Exercise SBEvent, SBListener and SBBroadcaster through the Python API.

    Every test builds the inferior, puts a breakpoint on function 'c', launches
    the process in asynchronous mode and then consumes process events from a
    listener (either on a helper thread or on the test thread itself).
    """

    NO_DEBUG_INFO_TESTCASE = True

    def setUp(self):
        # Call super's setUp().
        TestBase.setUp(self)
        # Find the line number of function 'c'.
        self.line = line_number(
            "main.c", '// Find the line number of function "c" here.'
        )

    def _launch_with_listener(self, target, listener):
        """Launch `target` with `listener` receiving the process events.

        The launch flags are taken from the target's current launch info; argv,
        envp, the stdio paths and the working directory are all left as None,
        and the process is not stopped at its entry point.

        Returns a `(process, error)` tuple: the SBProcess returned by
        SBTarget.Launch and the SBError that the call filled in.
        """
        error = lldb.SBError()
        flags = target.GetLaunchInfo().GetLaunchFlags()
        process = target.Launch(
            listener,
            None,  # argv
            None,  # envp
            None,  # stdin_path
            None,  # stdout_path
            None,  # stderr_path
            None,  # working directory
            flags,  # launch flags
            False,  # Stop at entry
            error,  # error
        )
        return process, error

    @expectedFailureAll(
        oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases"
    )
    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
    @skipIfNetBSD
    def test_listen_for_and_print_event(self):
        """Exercise SBEvent API."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        self.dbg.SetAsync(True)

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c', and make sure it
        # actually resolved (same check as the other tests in this class).
        bkpt = target.BreakpointCreateByName("c", "a.out")
        self.assertTrue(bkpt and bkpt.GetNumLocations() == 1, VALID_BREAKPOINT)

        listener = lldb.SBListener("my listener")

        # Now launch the process, and do not stop at the entry point.
        process, error = self._launch_with_listener(target, listener)
        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)

        self.assertEqual(process.GetState(), lldb.eStateStopped, PROCESS_STOPPED)

        # Create an empty event object.
        event = lldb.SBEvent()

        traceOn = self.TraceOn()
        if traceOn:
            lldbutil.print_stacktraces(process)

        # The listening thread polls exactly this many times, whether or not
        # an event arrived, and then exits.
        max_attempts = 4

        # Create MyListeningThread class to wait for any kind of event.
        class MyListeningThread(threading.Thread):
            def run(self):
                for _ in range(max_attempts):
                    if traceOn:
                        print("Try wait for event...")
                    if listener.WaitForEvent(5, event):
                        if traceOn:
                            desc = lldbutil.get_description(event)
                            print("Event description:", desc)
                            print("Event data flavor:", event.GetDataFlavor())
                            print(
                                "Process state:",
                                lldbutil.state_type_to_str(process.GetState()),
                            )
                            print()
                    else:
                        if traceOn:
                            print("timeout occurred waiting for event...")
                listener.Clear()

        # Let's start the listening thread to retrieve the events.
        my_thread = MyListeningThread()
        my_thread.start()

        # Use Python API to continue the process.  The listening thread should be
        # able to receive the state changed events.
        process.Continue()

        # Use Python API to kill the process.  The listening thread should be
        # able to receive the state changed event, too.
        process.Kill()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        # Shouldn't we be testing against some kind of expectation here?

    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
    @skipIfNetBSD
    def test_wait_for_event(self):
        """Exercise SBListener.WaitForEvent() API."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        self.dbg.SetAsync(True)

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c'.
        bkpt = target.BreakpointCreateByName("c", "a.out")
        self.trace("breakpoint:", bkpt)
        self.assertTrue(bkpt and bkpt.GetNumLocations() == 1, VALID_BREAKPOINT)

        # Get the debugger listener.
        listener = self.dbg.GetListener()

        # Now launch the process, and do not stop at entry point.
        process, error = self._launch_with_listener(target, listener)
        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)

        # Create an empty event object.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        # The listening thread gives up after this many consecutive timeouts.
        max_attempts = 4

        # Create MyListeningThread to wait for any kind of event.
        class MyListeningThread(threading.Thread):
            def run(self):
                for _ in range(max_attempts):
                    if listener.WaitForEvent(5, event):
                        self.context.trace("Got a valid event:", event)
                        self.context.trace("Event data flavor:", event.GetDataFlavor())
                        # SBEvent.GetType() is the event-type bit mask, not an
                        # lldb.StateType, so decode the process state carried
                        # by the event before turning it into a string.
                        self.context.trace(
                            "Event process state:",
                            lldbutil.state_type_to_str(
                                lldb.SBProcess.GetStateFromEvent(event)
                            ),
                        )
                        listener.Clear()
                        return
                    print("Timeout: listener.WaitForEvent")
                listener.Clear()

        # Use Python API to kill the process.  The listening thread should be
        # able to receive a state changed event.
        process.Kill()

        # Let's start the listening thread to retrieve the event.
        my_thread = MyListeningThread()
        # Supply the enclosing test case so the thread can call self.trace().
        my_thread.context = self
        my_thread.start()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        self.assertTrue(event, "My listening thread successfully received an event")

    @expectedFailureAll(
        oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases"
    )
    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
    @expectedFailureNetBSD
    def test_add_listener_to_broadcaster(self):
        """Exercise some SBBroadcaster APIs."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        self.dbg.SetAsync(True)

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c'.
        bkpt = target.BreakpointCreateByName("c", "a.out")
        self.trace("breakpoint:", bkpt)
        self.assertTrue(bkpt and bkpt.GetNumLocations() == 1, VALID_BREAKPOINT)

        listener = lldb.SBListener("my listener")

        # Now launch the process, and do not stop at the entry point.
        process, error = self._launch_with_listener(target, listener)
        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)

        # Create an empty event object.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        # The finite state machine for our custom listening thread, with an
        # initial state of None, which means no event has been received.
        # It changes to 'connected' after 'connected' event is received (for remote platforms)
        # It changes to 'running' after 'running' event is received (should happen only if the
        # current state is either 'None' or 'connected')
        # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
        # current state is 'running'.)
        self.state = None

        # The listening thread keeps waiting until more than this many
        # WaitForEvent() calls have timed out.
        max_timeouts = 6

        # Create MyListeningThread to wait for state changed events.
        # By design, a "running" event is expected, followed by a "stopped"
        # event.
        class MyListeningThread(threading.Thread):
            def run(self):
                self.context.trace("Running MyListeningThread:", self)

                # Regular expression pattern for the event description.
                pattern = re.compile("data = {.*, state = (.*)}$")

                timeouts = 0
                while True:
                    if not listener.WaitForEvent(5, event):
                        print("Timeout: listener.WaitForEvent")
                        timeouts += 1
                        if timeouts > max_timeouts:
                            break
                        continue

                    desc = lldbutil.get_description(event)
                    self.context.trace("Event description:", desc)
                    match = pattern.search(desc)
                    if not match:
                        break
                    state_name = match.group(1)
                    if state_name == "connected":
                        # When debugging remote targets with lldb-server, we
                        # first get the 'connected' event.
                        self.context.assertTrue(self.context.state is None)
                        self.context.state = "connected"
                    elif state_name == "running":
                        self.context.assertTrue(
                            self.context.state is None
                            or self.context.state == "connected"
                        )
                        self.context.state = "running"
                    elif state_name == "stopped":
                        self.context.assertTrue(self.context.state == "running")
                        # Whoopee, both events have been received!
                        self.context.state = "stopped"
                        break
                    else:
                        # Any other state ends the state machine early; the
                        # final assertion in the test body reports the failure.
                        break
                listener.Clear()

        # Use Python API to continue the process.  The listening thread should be
        # able to receive the state changed events.
        process.Continue()

        # Start the listening thread to receive the "running" followed by the
        # "stopped" events.
        my_thread = MyListeningThread()
        # Supply the enclosing context so that our listening thread can access
        # the 'state' variable.
        my_thread.context = self
        my_thread.start()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        # The final judgement.  :-)
        self.assertEqual(
            self.state, "stopped", "Both expected state changed events received"
        )

    def wait_for_next_event(self, expected_state, test_shadow=False):
        """Wait for an event from self.primary_listener & self.shadow_listener.

        If test_shadow is true, we also check that the shadow listener only
        receives events AFTER the primary listener does.

        If expected_state is not None, the state carried by the primary
        listener's event must equal it.

        Returns a `(state, restarted)` tuple decoded from the primary
        listener's event; `restarted` is only queried for stopped events and
        is False otherwise.
        """
        event = lldb.SBEvent()

        if test_shadow:
            # Waiting on the shadow listener shouldn't have events yet because
            # we haven't fetched them for the primary listener yet:
            success = self.shadow_listener.WaitForEvent(1, event)
            self.assertFalse(success, "Shadow listener doesn't pull events")

        # But there should be an event for the primary listener:
        success = self.primary_listener.WaitForEvent(5, event)
        self.assertTrue(success, "Primary listener got the event")

        state = lldb.SBProcess.GetStateFromEvent(event)
        restart = False
        if state == lldb.eStateStopped:
            restart = lldb.SBProcess.GetRestartedFromEvent(event)

        if expected_state is not None:
            self.assertEqual(
                state, expected_state, "Primary thread got the correct event"
            )

        # And after pulling that one there should be an equivalent event for the shadow
        # listener:
        success = self.shadow_listener.WaitForEvent(5, event)
        self.assertTrue(success, "Shadow listener got event too")
        self.assertEqual(
            state, lldb.SBProcess.GetStateFromEvent(event), "It was the same event"
        )
        self.assertEqual(
            restart,
            lldb.SBProcess.GetRestartedFromEvent(event),
            "It was the same restarted",
        )

        return state, restart

    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
    @skipIfNetBSD
    def test_shadow_listener(self):
        """Check that a shadow listener sees the same events as the primary one."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c'.
        bkpt1 = target.BreakpointCreateByName("c", "a.out")
        self.trace("breakpoint:", bkpt1)
        self.assertEqual(bkpt1.GetNumLocations(), 1, VALID_BREAKPOINT)

        self.primary_listener = lldb.SBListener("my listener")
        self.shadow_listener = lldb.SBListener("shadow listener")

        self.cur_thread = None

        error = lldb.SBError()
        launch_info = target.GetLaunchInfo()
        launch_info.SetListener(self.primary_listener)
        launch_info.SetShadowListener(self.shadow_listener)

        self.runCmd(
            "settings set target.process.extra-startup-command QSetLogging:bitmask=LOG_PROCESS|LOG_EXCEPTIONS|LOG_RNB_PACKETS|LOG_STEP;"
        )
        self.dbg.SetAsync(True)

        self.process = target.Launch(launch_info, error)
        self.assertSuccess(error, "Process launched successfully")

        # Keep fetching events from the primary to trigger the do on removal and
        # then from the shadow listener, and make sure they match:

        # Events in the launch sequence might be platform dependent, so don't
        # expect any particular event till we get the stopped:
        state = lldb.eStateInvalid
        while state != lldb.eStateStopped:
            state, restart = self.wait_for_next_event(None, False)

        # Okay, we're now at a good stop, so try a next:
        self.cur_thread = self.process.threads[0]

        # Make sure we're at our expected breakpoint:
        self.assertTrue(self.cur_thread.IsValid(), "Got a zeroth thread")
        self.assertEqual(self.cur_thread.stop_reason, lldb.eStopReasonBreakpoint)
        self.assertEqual(
            self.cur_thread.GetStopReasonDataCount(), 2, "Only one breakpoint/loc here"
        )
        self.assertEqual(
            bkpt1.GetID(),
            self.cur_thread.GetStopReasonDataAtIndex(0),
            "Hit the right breakpoint",
        )
        # Disable the first breakpoint so it doesn't get in the way...
        bkpt1.SetEnabled(False)

        self.cur_thread.StepOver()
        # We'll run the test for "shadow listener blocked by primary listener"
        # for the first couple rounds, then we'll skip the 1 second pause...
        self.wait_for_next_event(lldb.eStateRunning, True)
        self.wait_for_next_event(lldb.eStateStopped, True)

        # Next try an auto-continue breakpoint and make sure the shadow listener got
        # the resumed info as well.  Note that I'm not explicitly counting
        # running events here.  At the point when I wrote this lldb sometimes
        # emits two running events in a row.  Apparently the code to coalesce running
        # events isn't working.  But that's not what this test is testing, we're really
        # testing that the primary & shadow listeners hear the same thing and in the
        # right order.

        main_spec = lldb.SBFileSpec("main.c")
        bkpt2 = target.BreakpointCreateBySourceRegex("b.2. returns %d", main_spec)
        self.assertGreater(bkpt2.GetNumLocations(), 0, "BP2 worked")
        bkpt2.SetAutoContinue(True)

        bkpt3 = target.BreakpointCreateBySourceRegex("a.3. returns %d", main_spec)
        self.assertGreater(bkpt3.GetNumLocations(), 0, "BP3 worked")

        state = lldb.eStateStopped
        restarted = False

        # Put in a counter to make sure we don't spin forever if there is some
        # error in the logic.
        counter = 0
        while state != lldb.eStateExited:
            counter += 1
            self.assertLess(
                counter, 50, "Took more than 50 events to hit two breakpoints."
            )
            # Only resume from a real stop; an auto-continued (restarted) stop
            # is already running again.
            if state == lldb.eStateStopped and not restarted:
                self.process.Continue()
            state, restarted = self.wait_for_next_event(None, False)