1""" 2Test lldb Python event APIs. 3""" 4 5import re 6import lldb 7from lldbsuite.test.decorators import * 8from lldbsuite.test.lldbtest import * 9from lldbsuite.test import lldbutil 10 11 12@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV 13class EventAPITestCase(TestBase): 14 NO_DEBUG_INFO_TESTCASE = True 15 16 def setUp(self): 17 # Call super's setUp(). 18 TestBase.setUp(self) 19 # Find the line number to of function 'c'. 20 self.line = line_number( 21 'main.c', '// Find the line number of function "c" here.') 22 23 @expectedFailureAll( 24 oslist=["linux"], 25 bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases") 26 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 27 @skipIfNetBSD 28 def test_listen_for_and_print_event(self): 29 """Exercise SBEvent API.""" 30 self.build() 31 exe = self.getBuildArtifact("a.out") 32 33 self.dbg.SetAsync(True) 34 35 # Create a target by the debugger. 36 target = self.dbg.CreateTarget(exe) 37 self.assertTrue(target, VALID_TARGET) 38 39 # Now create a breakpoint on main.c by name 'c'. 40 breakpoint = target.BreakpointCreateByName('c', 'a.out') 41 42 listener = lldb.SBListener("my listener") 43 44 # Now launch the process, and do not stop at the entry point. 45 error = lldb.SBError() 46 flags = target.GetLaunchInfo().GetLaunchFlags() 47 process = target.Launch(listener, 48 None, # argv 49 None, # envp 50 None, # stdin_path 51 None, # stdout_path 52 None, # stderr_path 53 None, # working directory 54 flags, # launch flags 55 False, # Stop at entry 56 error) # error 57 58 self.assertEqual( 59 process.GetState(), lldb.eStateStopped, 60 PROCESS_STOPPED) 61 62 # Create an empty event object. 63 event = lldb.SBEvent() 64 65 traceOn = self.TraceOn() 66 if traceOn: 67 lldbutil.print_stacktraces(process) 68 69 # Create MyListeningThread class to wait for any kind of event. 70 import threading 71 72 class MyListeningThread(threading.Thread): 73 74 def run(self): 75 count = 0 76 # Let's only try at most 4 times to retrieve any kind of event. 77 # After that, the thread exits. 78 while not count > 3: 79 if traceOn: 80 print("Try wait for event...") 81 if listener.WaitForEvent(5, event): 82 if traceOn: 83 desc = lldbutil.get_description(event) 84 print("Event description:", desc) 85 print("Event data flavor:", event.GetDataFlavor()) 86 print( 87 "Process state:", 88 lldbutil.state_type_to_str( 89 process.GetState())) 90 print() 91 else: 92 if traceOn: 93 print("timeout occurred waiting for event...") 94 count = count + 1 95 listener.Clear() 96 return 97 98 # Let's start the listening thread to retrieve the events. 99 my_thread = MyListeningThread() 100 my_thread.start() 101 102 # Use Python API to continue the process. The listening thread should be 103 # able to receive the state changed events. 104 process.Continue() 105 106 # Use Python API to kill the process. The listening thread should be 107 # able to receive the state changed event, too. 108 process.Kill() 109 110 # Wait until the 'MyListeningThread' terminates. 111 my_thread.join() 112 113 # Shouldn't we be testing against some kind of expectation here? 114 115 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases 116 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 117 @skipIfNetBSD 118 def test_wait_for_event(self): 119 """Exercise SBListener.WaitForEvent() API.""" 120 self.build() 121 exe = self.getBuildArtifact("a.out") 122 123 self.dbg.SetAsync(True) 124 125 # Create a target by the debugger. 126 target = self.dbg.CreateTarget(exe) 127 self.assertTrue(target, VALID_TARGET) 128 129 # Now create a breakpoint on main.c by name 'c'. 130 breakpoint = target.BreakpointCreateByName('c', 'a.out') 131 self.trace("breakpoint:", breakpoint) 132 self.assertTrue(breakpoint and 133 breakpoint.GetNumLocations() == 1, 134 VALID_BREAKPOINT) 135 136 # Get the debugger listener. 137 listener = self.dbg.GetListener() 138 139 # Now launch the process, and do not stop at entry point. 140 error = lldb.SBError() 141 flags = target.GetLaunchInfo().GetLaunchFlags() 142 process = target.Launch(listener, 143 None, # argv 144 None, # envp 145 None, # stdin_path 146 None, # stdout_path 147 None, # stderr_path 148 None, # working directory 149 flags, # launch flags 150 False, # Stop at entry 151 error) # error 152 self.assertTrue(error.Success() and process, PROCESS_IS_VALID) 153 154 # Create an empty event object. 155 event = lldb.SBEvent() 156 self.assertFalse(event, "Event should not be valid initially") 157 158 # Create MyListeningThread to wait for any kind of event. 159 import threading 160 161 class MyListeningThread(threading.Thread): 162 163 def run(self): 164 count = 0 165 # Let's only try at most 3 times to retrieve any kind of event. 166 while not count > 3: 167 if listener.WaitForEvent(5, event): 168 self.context.trace("Got a valid event:", event) 169 self.context.trace("Event data flavor:", event.GetDataFlavor()) 170 self.context.trace("Event type:", lldbutil.state_type_to_str(event.GetType())) 171 listener.Clear() 172 return 173 count = count + 1 174 print("Timeout: listener.WaitForEvent") 175 listener.Clear() 176 return 177 178 # Use Python API to kill the process. The listening thread should be 179 # able to receive a state changed event. 180 process.Kill() 181 182 # Let's start the listening thread to retrieve the event. 183 my_thread = MyListeningThread() 184 my_thread.context = self 185 my_thread.start() 186 187 # Wait until the 'MyListeningThread' terminates. 188 my_thread.join() 189 190 self.assertTrue(event, 191 "My listening thread successfully received an event") 192 193 @expectedFailureAll( 194 oslist=["linux"], 195 bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases") 196 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 197 @expectedFailureNetBSD 198 def test_add_listener_to_broadcaster(self): 199 """Exercise some SBBroadcaster APIs.""" 200 self.build() 201 exe = self.getBuildArtifact("a.out") 202 203 self.dbg.SetAsync(True) 204 205 # Create a target by the debugger. 206 target = self.dbg.CreateTarget(exe) 207 self.assertTrue(target, VALID_TARGET) 208 209 # Now create a breakpoint on main.c by name 'c'. 210 breakpoint = target.BreakpointCreateByName('c', 'a.out') 211 self.trace("breakpoint:", breakpoint) 212 self.assertTrue(breakpoint and 213 breakpoint.GetNumLocations() == 1, 214 VALID_BREAKPOINT) 215 216 listener = lldb.SBListener("my listener") 217 218 # Now launch the process, and do not stop at the entry point. 219 error = lldb.SBError() 220 flags = target.GetLaunchInfo().GetLaunchFlags() 221 process = target.Launch(listener, 222 None, # argv 223 None, # envp 224 None, # stdin_path 225 None, # stdout_path 226 None, # stderr_path 227 None, # working directory 228 flags, # launch flags 229 False, # Stop at entry 230 error) # error 231 232 # Create an empty event object. 233 event = lldb.SBEvent() 234 self.assertFalse(event, "Event should not be valid initially") 235 236 # The finite state machine for our custom listening thread, with an 237 # initial state of None, which means no event has been received. 238 # It changes to 'connected' after 'connected' event is received (for remote platforms) 239 # It changes to 'running' after 'running' event is received (should happen only if the 240 # currentstate is either 'None' or 'connected') 241 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the 242 # current state is 'running'.) 243 self.state = None 244 245 # Create MyListeningThread to wait for state changed events. 246 # By design, a "running" event is expected following by a "stopped" 247 # event. 248 import threading 249 250 class MyListeningThread(threading.Thread): 251 252 def run(self): 253 self.context.trace("Running MyListeningThread:", self) 254 255 # Regular expression pattern for the event description. 256 pattern = re.compile("data = {.*, state = (.*)}$") 257 258 # Let's only try at most 6 times to retrieve our events. 259 count = 0 260 while True: 261 if listener.WaitForEvent(5, event): 262 desc = lldbutil.get_description(event) 263 self.context.trace("Event description:", desc) 264 match = pattern.search(desc) 265 if not match: 266 break 267 if match.group(1) == 'connected': 268 # When debugging remote targets with lldb-server, we 269 # first get the 'connected' event. 270 self.context.assertTrue(self.context.state is None) 271 self.context.state = 'connected' 272 continue 273 elif match.group(1) == 'running': 274 self.context.assertTrue( 275 self.context.state is None or self.context.state == 'connected') 276 self.context.state = 'running' 277 continue 278 elif match.group(1) == 'stopped': 279 self.context.assertTrue( 280 self.context.state == 'running') 281 # Whoopee, both events have been received! 282 self.context.state = 'stopped' 283 break 284 else: 285 break 286 print("Timeout: listener.WaitForEvent") 287 count = count + 1 288 if count > 6: 289 break 290 listener.Clear() 291 return 292 293 # Use Python API to continue the process. The listening thread should be 294 # able to receive the state changed events. 295 process.Continue() 296 297 # Start the listening thread to receive the "running" followed by the 298 # "stopped" events. 299 my_thread = MyListeningThread() 300 # Supply the enclosing context so that our listening thread can access 301 # the 'state' variable. 302 my_thread.context = self 303 my_thread.start() 304 305 # Wait until the 'MyListeningThread' terminates. 306 my_thread.join() 307 308 # The final judgement. :-) 309 self.assertEqual(self.state, 'stopped', 310 "Both expected state changed events received") 311