1""" 2Test lldb Python event APIs. 3""" 4 5import re 6import lldb 7from lldbsuite.test.decorators import * 8from lldbsuite.test.lldbtest import * 9from lldbsuite.test import lldbutil 10 11 12@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV 13class EventAPITestCase(TestBase): 14 NO_DEBUG_INFO_TESTCASE = True 15 16 def setUp(self): 17 # Call super's setUp(). 18 TestBase.setUp(self) 19 # Find the line number to of function 'c'. 20 self.line = line_number( 21 "main.c", '// Find the line number of function "c" here.' 22 ) 23 24 @expectedFailureAll( 25 oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases" 26 ) 27 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 28 @skipIfNetBSD 29 def test_listen_for_and_print_event(self): 30 """Exercise SBEvent API.""" 31 self.build() 32 exe = self.getBuildArtifact("a.out") 33 34 self.dbg.SetAsync(True) 35 36 # Create a target by the debugger. 37 target = self.dbg.CreateTarget(exe) 38 self.assertTrue(target, VALID_TARGET) 39 40 # Now create a breakpoint on main.c by name 'c'. 41 breakpoint = target.BreakpointCreateByName("c", "a.out") 42 43 listener = lldb.SBListener("my listener") 44 45 # Now launch the process, and do not stop at the entry point. 46 error = lldb.SBError() 47 flags = target.GetLaunchInfo().GetLaunchFlags() 48 process = target.Launch( 49 listener, 50 None, # argv 51 None, # envp 52 None, # stdin_path 53 None, # stdout_path 54 None, # stderr_path 55 None, # working directory 56 flags, # launch flags 57 False, # Stop at entry 58 error, 59 ) # error 60 61 self.assertEqual(process.GetState(), lldb.eStateStopped, PROCESS_STOPPED) 62 63 # Create an empty event object. 64 event = lldb.SBEvent() 65 66 traceOn = self.TraceOn() 67 if traceOn: 68 lldbutil.print_stacktraces(process) 69 70 # Create MyListeningThread class to wait for any kind of event. 71 import threading 72 73 class MyListeningThread(threading.Thread): 74 def run(self): 75 count = 0 76 # Let's only try at most 4 times to retrieve any kind of event. 77 # After that, the thread exits. 78 while not count > 3: 79 if traceOn: 80 print("Try wait for event...") 81 if listener.WaitForEvent(5, event): 82 if traceOn: 83 desc = lldbutil.get_description(event) 84 print("Event description:", desc) 85 print("Event data flavor:", event.GetDataFlavor()) 86 print( 87 "Process state:", 88 lldbutil.state_type_to_str(process.GetState()), 89 ) 90 print() 91 else: 92 if traceOn: 93 print("timeout occurred waiting for event...") 94 count = count + 1 95 listener.Clear() 96 return 97 98 # Let's start the listening thread to retrieve the events. 99 my_thread = MyListeningThread() 100 my_thread.start() 101 102 # Use Python API to continue the process. The listening thread should be 103 # able to receive the state changed events. 104 process.Continue() 105 106 # Use Python API to kill the process. The listening thread should be 107 # able to receive the state changed event, too. 108 process.Kill() 109 110 # Wait until the 'MyListeningThread' terminates. 111 my_thread.join() 112 113 # Shouldn't we be testing against some kind of expectation here? 114 115 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases 116 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 117 @skipIfNetBSD 118 def test_wait_for_event(self): 119 """Exercise SBListener.WaitForEvent() API.""" 120 self.build() 121 exe = self.getBuildArtifact("a.out") 122 123 self.dbg.SetAsync(True) 124 125 # Create a target by the debugger. 126 target = self.dbg.CreateTarget(exe) 127 self.assertTrue(target, VALID_TARGET) 128 129 # Now create a breakpoint on main.c by name 'c'. 130 breakpoint = target.BreakpointCreateByName("c", "a.out") 131 self.trace("breakpoint:", breakpoint) 132 self.assertTrue( 133 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT 134 ) 135 136 # Get the debugger listener. 137 listener = self.dbg.GetListener() 138 139 # Now launch the process, and do not stop at entry point. 140 error = lldb.SBError() 141 flags = target.GetLaunchInfo().GetLaunchFlags() 142 process = target.Launch( 143 listener, 144 None, # argv 145 None, # envp 146 None, # stdin_path 147 None, # stdout_path 148 None, # stderr_path 149 None, # working directory 150 flags, # launch flags 151 False, # Stop at entry 152 error, 153 ) # error 154 self.assertTrue(error.Success() and process, PROCESS_IS_VALID) 155 156 # Create an empty event object. 157 event = lldb.SBEvent() 158 self.assertFalse(event, "Event should not be valid initially") 159 160 # Create MyListeningThread to wait for any kind of event. 161 import threading 162 163 class MyListeningThread(threading.Thread): 164 def run(self): 165 count = 0 166 # Let's only try at most 3 times to retrieve any kind of event. 167 while not count > 3: 168 if listener.WaitForEvent(5, event): 169 self.context.trace("Got a valid event:", event) 170 self.context.trace("Event data flavor:", event.GetDataFlavor()) 171 self.context.trace( 172 "Event type:", lldbutil.state_type_to_str(event.GetType()) 173 ) 174 listener.Clear() 175 return 176 count = count + 1 177 print("Timeout: listener.WaitForEvent") 178 listener.Clear() 179 return 180 181 # Use Python API to kill the process. The listening thread should be 182 # able to receive a state changed event. 183 process.Kill() 184 185 # Let's start the listening thread to retrieve the event. 186 my_thread = MyListeningThread() 187 my_thread.context = self 188 my_thread.start() 189 190 # Wait until the 'MyListeningThread' terminates. 191 my_thread.join() 192 193 self.assertTrue(event, "My listening thread successfully received an event") 194 195 @expectedFailureAll( 196 oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases" 197 ) 198 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 199 @expectedFailureNetBSD 200 def test_add_listener_to_broadcaster(self): 201 """Exercise some SBBroadcaster APIs.""" 202 self.build() 203 exe = self.getBuildArtifact("a.out") 204 205 self.dbg.SetAsync(True) 206 207 # Create a target by the debugger. 208 target = self.dbg.CreateTarget(exe) 209 self.assertTrue(target, VALID_TARGET) 210 211 # Now create a breakpoint on main.c by name 'c'. 212 breakpoint = target.BreakpointCreateByName("c", "a.out") 213 self.trace("breakpoint:", breakpoint) 214 self.assertTrue( 215 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT 216 ) 217 218 listener = lldb.SBListener("my listener") 219 220 # Now launch the process, and do not stop at the entry point. 221 error = lldb.SBError() 222 flags = target.GetLaunchInfo().GetLaunchFlags() 223 process = target.Launch( 224 listener, 225 None, # argv 226 None, # envp 227 None, # stdin_path 228 None, # stdout_path 229 None, # stderr_path 230 None, # working directory 231 flags, # launch flags 232 False, # Stop at entry 233 error, 234 ) # error 235 236 # Create an empty event object. 237 event = lldb.SBEvent() 238 self.assertFalse(event, "Event should not be valid initially") 239 240 # The finite state machine for our custom listening thread, with an 241 # initial state of None, which means no event has been received. 242 # It changes to 'connected' after 'connected' event is received (for remote platforms) 243 # It changes to 'running' after 'running' event is received (should happen only if the 244 # currentstate is either 'None' or 'connected') 245 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the 246 # current state is 'running'.) 247 self.state = None 248 249 # Create MyListeningThread to wait for state changed events. 250 # By design, a "running" event is expected following by a "stopped" 251 # event. 252 import threading 253 254 class MyListeningThread(threading.Thread): 255 def run(self): 256 self.context.trace("Running MyListeningThread:", self) 257 258 # Regular expression pattern for the event description. 259 pattern = re.compile("data = {.*, state = (.*)}$") 260 261 # Let's only try at most 6 times to retrieve our events. 262 count = 0 263 while True: 264 if listener.WaitForEvent(5, event): 265 desc = lldbutil.get_description(event) 266 self.context.trace("Event description:", desc) 267 match = pattern.search(desc) 268 if not match: 269 break 270 if match.group(1) == "connected": 271 # When debugging remote targets with lldb-server, we 272 # first get the 'connected' event. 273 self.context.assertTrue(self.context.state is None) 274 self.context.state = "connected" 275 continue 276 elif match.group(1) == "running": 277 self.context.assertTrue( 278 self.context.state is None 279 or self.context.state == "connected" 280 ) 281 self.context.state = "running" 282 continue 283 elif match.group(1) == "stopped": 284 self.context.assertTrue(self.context.state == "running") 285 # Whoopee, both events have been received! 286 self.context.state = "stopped" 287 break 288 else: 289 break 290 print("Timeout: listener.WaitForEvent") 291 count = count + 1 292 if count > 6: 293 break 294 listener.Clear() 295 return 296 297 # Use Python API to continue the process. The listening thread should be 298 # able to receive the state changed events. 299 process.Continue() 300 301 # Start the listening thread to receive the "running" followed by the 302 # "stopped" events. 303 my_thread = MyListeningThread() 304 # Supply the enclosing context so that our listening thread can access 305 # the 'state' variable. 306 my_thread.context = self 307 my_thread.start() 308 309 # Wait until the 'MyListeningThread' terminates. 310 my_thread.join() 311 312 # The final judgement. :-) 313 self.assertEqual( 314 self.state, "stopped", "Both expected state changed events received" 315 ) 316