1""" 2Test lldb Python event APIs. 3""" 4 5from __future__ import print_function 6 7 8import re 9import lldb 10from lldbsuite.test.decorators import * 11from lldbsuite.test.lldbtest import * 12from lldbsuite.test import lldbutil 13 14 15@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV 16@skipIfDarwin 17class EventAPITestCase(TestBase): 18 19 mydir = TestBase.compute_mydir(__file__) 20 NO_DEBUG_INFO_TESTCASE = True 21 22 def setUp(self): 23 # Call super's setUp(). 24 TestBase.setUp(self) 25 # Find the line number to of function 'c'. 26 self.line = line_number( 27 'main.c', '// Find the line number of function "c" here.') 28 29 @expectedFailureAll( 30 oslist=["linux"], 31 bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases") 32 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 33 @skipIfNetBSD 34 def test_listen_for_and_print_event(self): 35 """Exercise SBEvent API.""" 36 self.build() 37 exe = self.getBuildArtifact("a.out") 38 39 self.dbg.SetAsync(True) 40 41 # Create a target by the debugger. 42 target = self.dbg.CreateTarget(exe) 43 self.assertTrue(target, VALID_TARGET) 44 45 # Now create a breakpoint on main.c by name 'c'. 46 breakpoint = target.BreakpointCreateByName('c', 'a.out') 47 48 listener = lldb.SBListener("my listener") 49 50 # Now launch the process, and do not stop at the entry point. 51 error = lldb.SBError() 52 flags = target.GetLaunchInfo().GetLaunchFlags() 53 process = target.Launch(listener, 54 None, # argv 55 None, # envp 56 None, # stdin_path 57 None, # stdout_path 58 None, # stderr_path 59 None, # working directory 60 flags, # launch flags 61 False, # Stop at entry 62 error) # error 63 64 self.assertEqual( 65 process.GetState(), lldb.eStateStopped, 66 PROCESS_STOPPED) 67 68 # Create an empty event object. 69 event = lldb.SBEvent() 70 71 traceOn = self.TraceOn() 72 if traceOn: 73 lldbutil.print_stacktraces(process) 74 75 # Create MyListeningThread class to wait for any kind of event. 76 import threading 77 78 class MyListeningThread(threading.Thread): 79 80 def run(self): 81 count = 0 82 # Let's only try at most 4 times to retrieve any kind of event. 83 # After that, the thread exits. 84 while not count > 3: 85 if traceOn: 86 print("Try wait for event...") 87 if listener.WaitForEvent(5, event): 88 if traceOn: 89 desc = lldbutil.get_description(event) 90 print("Event description:", desc) 91 print("Event data flavor:", event.GetDataFlavor()) 92 print( 93 "Process state:", 94 lldbutil.state_type_to_str( 95 process.GetState())) 96 print() 97 else: 98 if traceOn: 99 print("timeout occurred waiting for event...") 100 count = count + 1 101 listener.Clear() 102 return 103 104 # Let's start the listening thread to retrieve the events. 105 my_thread = MyListeningThread() 106 my_thread.start() 107 108 # Use Python API to continue the process. The listening thread should be 109 # able to receive the state changed events. 110 process.Continue() 111 112 # Use Python API to kill the process. The listening thread should be 113 # able to receive the state changed event, too. 114 process.Kill() 115 116 # Wait until the 'MyListeningThread' terminates. 117 my_thread.join() 118 119 # Shouldn't we be testing against some kind of expectation here? 120 121 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases 122 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 123 @skipIfNetBSD 124 def test_wait_for_event(self): 125 """Exercise SBListener.WaitForEvent() API.""" 126 self.build() 127 exe = self.getBuildArtifact("a.out") 128 129 self.dbg.SetAsync(True) 130 131 # Create a target by the debugger. 132 target = self.dbg.CreateTarget(exe) 133 self.assertTrue(target, VALID_TARGET) 134 135 # Now create a breakpoint on main.c by name 'c'. 136 breakpoint = target.BreakpointCreateByName('c', 'a.out') 137 self.trace("breakpoint:", breakpoint) 138 self.assertTrue(breakpoint and 139 breakpoint.GetNumLocations() == 1, 140 VALID_BREAKPOINT) 141 142 # Get the debugger listener. 143 listener = self.dbg.GetListener() 144 145 # Now launch the process, and do not stop at entry point. 146 error = lldb.SBError() 147 flags = target.GetLaunchInfo().GetLaunchFlags() 148 process = target.Launch(listener, 149 None, # argv 150 None, # envp 151 None, # stdin_path 152 None, # stdout_path 153 None, # stderr_path 154 None, # working directory 155 flags, # launch flags 156 False, # Stop at entry 157 error) # error 158 self.assertTrue(error.Success() and process, PROCESS_IS_VALID) 159 160 # Create an empty event object. 161 event = lldb.SBEvent() 162 self.assertFalse(event, "Event should not be valid initially") 163 164 # Create MyListeningThread to wait for any kind of event. 165 import threading 166 167 class MyListeningThread(threading.Thread): 168 169 def run(self): 170 count = 0 171 # Let's only try at most 3 times to retrieve any kind of event. 172 while not count > 3: 173 if listener.WaitForEvent(5, event): 174 self.trace("Got a valid event:", event) 175 self.trace("Event data flavor:", event.GetDataFlavor()) 176 self.trace("Event type:", lldbutil.state_type_to_str(event.GetType())) 177 listener.Clear() 178 return 179 count = count + 1 180 print("Timeout: listener.WaitForEvent") 181 listener.Clear() 182 return 183 184 # Use Python API to kill the process. The listening thread should be 185 # able to receive a state changed event. 186 process.Kill() 187 188 # Let's start the listening thread to retrieve the event. 189 my_thread = MyListeningThread() 190 my_thread.start() 191 192 # Wait until the 'MyListeningThread' terminates. 193 my_thread.join() 194 195 self.assertTrue(event, 196 "My listening thread successfully received an event") 197 198 @expectedFailureAll( 199 oslist=["linux"], 200 bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases") 201 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 202 @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48417") 203 @expectedFailureNetBSD 204 def test_add_listener_to_broadcaster(self): 205 """Exercise some SBBroadcaster APIs.""" 206 self.build() 207 exe = self.getBuildArtifact("a.out") 208 209 self.dbg.SetAsync(True) 210 211 # Create a target by the debugger. 212 target = self.dbg.CreateTarget(exe) 213 self.assertTrue(target, VALID_TARGET) 214 215 # Now create a breakpoint on main.c by name 'c'. 216 breakpoint = target.BreakpointCreateByName('c', 'a.out') 217 self.trace("breakpoint:", breakpoint) 218 self.assertTrue(breakpoint and 219 breakpoint.GetNumLocations() == 1, 220 VALID_BREAKPOINT) 221 222 listener = lldb.SBListener("my listener") 223 224 # Now launch the process, and do not stop at the entry point. 225 error = lldb.SBError() 226 flags = target.GetLaunchInfo().GetLaunchFlags() 227 process = target.Launch(listener, 228 None, # argv 229 None, # envp 230 None, # stdin_path 231 None, # stdout_path 232 None, # stderr_path 233 None, # working directory 234 flags, # launch flags 235 False, # Stop at entry 236 error) # error 237 238 # Create an empty event object. 239 event = lldb.SBEvent() 240 self.assertFalse(event, "Event should not be valid initially") 241 242 # The finite state machine for our custom listening thread, with an 243 # initial state of None, which means no event has been received. 244 # It changes to 'connected' after 'connected' event is received (for remote platforms) 245 # It changes to 'running' after 'running' event is received (should happen only if the 246 # currentstate is either 'None' or 'connected') 247 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the 248 # current state is 'running'.) 249 self.state = None 250 251 # Create MyListeningThread to wait for state changed events. 252 # By design, a "running" event is expected following by a "stopped" 253 # event. 254 import threading 255 256 class MyListeningThread(threading.Thread): 257 258 def run(self): 259 self.trace("Running MyListeningThread:", self) 260 261 # Regular expression pattern for the event description. 262 pattern = re.compile("data = {.*, state = (.*)}$") 263 264 # Let's only try at most 6 times to retrieve our events. 265 count = 0 266 while True: 267 if listener.WaitForEvent(5, event): 268 desc = lldbutil.get_description(event) 269 self.trace("Event description:", desc) 270 match = pattern.search(desc) 271 if not match: 272 break 273 if match.group(1) == 'connected': 274 # When debugging remote targets with lldb-server, we 275 # first get the 'connected' event. 276 self.context.assertTrue(self.context.state is None) 277 self.context.state = 'connected' 278 continue 279 elif match.group(1) == 'running': 280 self.context.assertTrue( 281 self.context.state is None or self.context.state == 'connected') 282 self.context.state = 'running' 283 continue 284 elif match.group(1) == 'stopped': 285 self.context.assertTrue( 286 self.context.state == 'running') 287 # Whoopee, both events have been received! 288 self.context.state = 'stopped' 289 break 290 else: 291 break 292 print("Timeout: listener.WaitForEvent") 293 count = count + 1 294 if count > 6: 295 break 296 listener.Clear() 297 return 298 299 # Use Python API to continue the process. The listening thread should be 300 # able to receive the state changed events. 301 process.Continue() 302 303 # Start the listening thread to receive the "running" followed by the 304 # "stopped" events. 305 my_thread = MyListeningThread() 306 # Supply the enclosing context so that our listening thread can access 307 # the 'state' variable. 308 my_thread.context = self 309 my_thread.start() 310 311 # Wait until the 'MyListeningThread' terminates. 312 my_thread.join() 313 314 # The final judgement. :-) 315 self.assertEqual(self.state, 'stopped', 316 "Both expected state changed events received") 317