"""
Test lldb Python event APIs.
"""

from __future__ import print_function


import re
import threading

import lldb
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil


@skipIfLinux  # llvm.org/pr25924, sometimes generating SIGSEGV
@skipIfDarwin
class EventAPITestCase(TestBase):
    """Exercise the SBEvent, SBListener and SBBroadcaster Python APIs.

    Each test launches 'a.out' asynchronously with a breakpoint on function
    'c' and uses a helper thread to wait on a listener for events.
    """

    mydir = TestBase.compute_mydir(__file__)
    NO_DEBUG_INFO_TESTCASE = True

    def setUp(self):
        """Run the base-class setup and record the line of function 'c'."""
        # Call super's setUp().
        TestBase.setUp(self)
        # Find the line number of function 'c'.
        self.line = line_number(
            'main.c', '// Find the line number of function "c" here.')

    @add_test_categories(['pyapi'])
    @expectedFailureAll(
        oslist=["linux"],
        bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
    @skipIfNetBSD
    def test_listen_for_and_print_event(self):
        """Exercise SBEvent API."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        self.dbg.SetAsync(True)

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c'.
        breakpoint = target.BreakpointCreateByName('c', 'a.out')

        listener = lldb.SBListener("my listener")

        # Now launch the process, and do not stop at the entry point.
        error = lldb.SBError()
        flags = target.GetLaunchInfo().GetLaunchFlags()
        process = target.Launch(listener,
                                None,      # argv
                                None,      # envp
                                None,      # stdin_path
                                None,      # stdout_path
                                None,      # stderr_path
                                None,      # working directory
                                flags,     # launch flags
                                False,     # Stop at entry
                                error)     # error

        self.assertTrue(
            process.GetState() == lldb.eStateStopped,
            PROCESS_STOPPED)

        # Create an empty event object.
        event = lldb.SBEvent()

        traceOn = self.TraceOn()
        if traceOn:
            lldbutil.print_stacktraces(process)

        # Create MyListeningThread class to wait for any kind of event.
        class MyListeningThread(threading.Thread):

            def run(self):
                count = 0
                # Let's only try at most 4 times to retrieve any kind of event.
                # After that, the thread exits.
                while not count > 3:
                    if traceOn:
                        print("Try wait for event...")
                    if listener.WaitForEvent(5, event):
                        if traceOn:
                            desc = lldbutil.get_description(event)
                            print("Event description:", desc)
                            print("Event data flavor:", event.GetDataFlavor())
                            print(
                                "Process state:",
                                lldbutil.state_type_to_str(
                                    process.GetState()))
                            print()
                    else:
                        if traceOn:
                            print("timeout occurred waiting for event...")
                    count = count + 1
                listener.Clear()
                return

        # Let's start the listening thread to retrieve the events.
        my_thread = MyListeningThread()
        my_thread.start()

        # Use Python API to continue the process.  The listening thread should be
        # able to receive the state changed events.
        process.Continue()

        # Use Python API to kill the process.  The listening thread should be
        # able to receive the state changed event, too.
        process.Kill()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        # Shouldn't we be testing against some kind of expectation here?

    @add_test_categories(['pyapi'])
    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
    @skipIfNetBSD
    def test_wait_for_event(self):
        """Exercise SBListener.WaitForEvent() API."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        self.dbg.SetAsync(True)

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c'.
        breakpoint = target.BreakpointCreateByName('c', 'a.out')
        self.trace("breakpoint:", breakpoint)
        self.assertTrue(breakpoint and
                        breakpoint.GetNumLocations() == 1,
                        VALID_BREAKPOINT)

        # Get the debugger listener.
        listener = self.dbg.GetListener()

        # Now launch the process, and do not stop at entry point.
        error = lldb.SBError()
        flags = target.GetLaunchInfo().GetLaunchFlags()
        process = target.Launch(listener,
                                None,      # argv
                                None,      # envp
                                None,      # stdin_path
                                None,      # stdout_path
                                None,      # stderr_path
                                None,      # working directory
                                flags,     # launch flags
                                False,     # Stop at entry
                                error)     # error
        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)

        # Create an empty event object.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        # Create MyListeningThread to wait for any kind of event.
        class MyListeningThread(threading.Thread):

            def run(self):
                count = 0
                # Let's only try at most 4 times to retrieve any kind of event.
                while not count > 3:
                    if listener.WaitForEvent(5, event):
                        # 'self' is the thread here, not the test case, so
                        # tracing has to go through the supplied context.
                        self.context.trace("Got a valid event:", event)
                        self.context.trace(
                            "Event data flavor:", event.GetDataFlavor())
                        self.context.trace(
                            "Event type:",
                            lldbutil.state_type_to_str(event.GetType()))
                        listener.Clear()
                        return
                    count = count + 1
                    print("Timeout: listener.WaitForEvent")
                listener.Clear()
                return

        # Use Python API to kill the process.  The listening thread should be
        # able to receive a state changed event.
        process.Kill()

        # Let's start the listening thread to retrieve the event.
        my_thread = MyListeningThread()
        # Supply the enclosing context so that our listening thread can use
        # the test case's trace() method.
        my_thread.context = self
        my_thread.start()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        self.assertTrue(event,
                        "My listening thread successfully received an event")

    @add_test_categories(['pyapi'])
    @expectedFailureAll(
        oslist=["linux"],
        bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48417")
    @expectedFailureNetBSD
    def test_add_listener_to_broadcaster(self):
        """Exercise some SBBroadcaster APIs."""
        self.build()
        exe = self.getBuildArtifact("a.out")

        self.dbg.SetAsync(True)

        # Create a target by the debugger.
        target = self.dbg.CreateTarget(exe)
        self.assertTrue(target, VALID_TARGET)

        # Now create a breakpoint on main.c by name 'c'.
        breakpoint = target.BreakpointCreateByName('c', 'a.out')
        self.trace("breakpoint:", breakpoint)
        self.assertTrue(breakpoint and
                        breakpoint.GetNumLocations() == 1,
                        VALID_BREAKPOINT)

        listener = lldb.SBListener("my listener")

        # Now launch the process, and do not stop at the entry point.
        error = lldb.SBError()
        flags = target.GetLaunchInfo().GetLaunchFlags()
        process = target.Launch(listener,
                                None,      # argv
                                None,      # envp
                                None,      # stdin_path
                                None,      # stdout_path
                                None,      # stderr_path
                                None,      # working directory
                                flags,     # launch flags
                                False,     # Stop at entry
                                error)     # error

        # Create an empty event object.
        event = lldb.SBEvent()
        self.assertFalse(event, "Event should not be valid initially")

        # The finite state machine for our custom listening thread, with an
        # initial state of None, which means no event has been received.
        # It changes to 'connected' after 'connected' event is received (for remote platforms)
        # It changes to 'running' after 'running' event is received (should happen only if the
        # current state is either 'None' or 'connected')
        # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
        # current state is 'running'.)
        self.state = None

        # Create MyListeningThread to wait for state changed events.
        # By design, a "running" event is expected followed by a "stopped"
        # event.
        class MyListeningThread(threading.Thread):

            def run(self):
                # 'self' is the thread here, not the test case, so tracing
                # has to go through the supplied context.
                self.context.trace("Running MyListeningThread:", self)

                # Regular expression pattern for the event description.
                pattern = re.compile("data = {.*, state = (.*)}$")

                # Only timed-out waits are counted; give up after the 7th
                # timeout.
                count = 0
                while True:
                    if listener.WaitForEvent(5, event):
                        desc = lldbutil.get_description(event)
                        self.context.trace("Event description:", desc)
                        match = pattern.search(desc)
                        if not match:
                            break
                        if match.group(1) == 'connected':
                            # When debugging remote targets with lldb-server, we
                            # first get the 'connected' event.
                            self.context.assertTrue(self.context.state is None)
                            self.context.state = 'connected'
                            continue
                        elif match.group(1) == 'running':
                            self.context.assertTrue(
                                self.context.state is None or self.context.state == 'connected')
                            self.context.state = 'running'
                            continue
                        elif match.group(1) == 'stopped':
                            self.context.assertTrue(
                                self.context.state == 'running')
                            # Whoopee, both events have been received!
                            self.context.state = 'stopped'
                            break
                        else:
                            break
                    print("Timeout: listener.WaitForEvent")
                    count = count + 1
                    if count > 6:
                        break
                listener.Clear()
                return

        # Use Python API to continue the process.  The listening thread should be
        # able to receive the state changed events.
        process.Continue()

        # Start the listening thread to receive the "running" followed by the
        # "stopped" events.
        my_thread = MyListeningThread()
        # Supply the enclosing context so that our listening thread can access
        # the 'state' variable.
        my_thread.context = self
        my_thread.start()

        # Wait until the 'MyListeningThread' terminates.
        my_thread.join()

        # The final judgement. :-)
        self.assertEqual(self.state, 'stopped',
                         "Both expected state changed events received")