1""" 2Test lldb Python event APIs. 3""" 4 5from __future__ import print_function 6 7 8import re 9import lldb 10from lldbsuite.test.decorators import * 11from lldbsuite.test.lldbtest import * 12from lldbsuite.test import lldbutil 13 14 15@skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV 16class EventAPITestCase(TestBase): 17 NO_DEBUG_INFO_TESTCASE = True 18 19 def setUp(self): 20 # Call super's setUp(). 21 TestBase.setUp(self) 22 # Find the line number to of function 'c'. 23 self.line = line_number( 24 'main.c', '// Find the line number of function "c" here.') 25 26 @expectedFailureAll( 27 oslist=["linux"], 28 bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases") 29 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 30 @skipIfNetBSD 31 def test_listen_for_and_print_event(self): 32 """Exercise SBEvent API.""" 33 self.build() 34 exe = self.getBuildArtifact("a.out") 35 36 self.dbg.SetAsync(True) 37 38 # Create a target by the debugger. 39 target = self.dbg.CreateTarget(exe) 40 self.assertTrue(target, VALID_TARGET) 41 42 # Now create a breakpoint on main.c by name 'c'. 43 breakpoint = target.BreakpointCreateByName('c', 'a.out') 44 45 listener = lldb.SBListener("my listener") 46 47 # Now launch the process, and do not stop at the entry point. 48 error = lldb.SBError() 49 flags = target.GetLaunchInfo().GetLaunchFlags() 50 process = target.Launch(listener, 51 None, # argv 52 None, # envp 53 None, # stdin_path 54 None, # stdout_path 55 None, # stderr_path 56 None, # working directory 57 flags, # launch flags 58 False, # Stop at entry 59 error) # error 60 61 self.assertEqual( 62 process.GetState(), lldb.eStateStopped, 63 PROCESS_STOPPED) 64 65 # Create an empty event object. 66 event = lldb.SBEvent() 67 68 traceOn = self.TraceOn() 69 if traceOn: 70 lldbutil.print_stacktraces(process) 71 72 # Create MyListeningThread class to wait for any kind of event. 73 import threading 74 75 class MyListeningThread(threading.Thread): 76 77 def run(self): 78 count = 0 79 # Let's only try at most 4 times to retrieve any kind of event. 80 # After that, the thread exits. 81 while not count > 3: 82 if traceOn: 83 print("Try wait for event...") 84 if listener.WaitForEvent(5, event): 85 if traceOn: 86 desc = lldbutil.get_description(event) 87 print("Event description:", desc) 88 print("Event data flavor:", event.GetDataFlavor()) 89 print( 90 "Process state:", 91 lldbutil.state_type_to_str( 92 process.GetState())) 93 print() 94 else: 95 if traceOn: 96 print("timeout occurred waiting for event...") 97 count = count + 1 98 listener.Clear() 99 return 100 101 # Let's start the listening thread to retrieve the events. 102 my_thread = MyListeningThread() 103 my_thread.start() 104 105 # Use Python API to continue the process. The listening thread should be 106 # able to receive the state changed events. 107 process.Continue() 108 109 # Use Python API to kill the process. The listening thread should be 110 # able to receive the state changed event, too. 111 process.Kill() 112 113 # Wait until the 'MyListeningThread' terminates. 114 my_thread.join() 115 116 # Shouldn't we be testing against some kind of expectation here? 117 118 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases 119 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 120 @skipIfNetBSD 121 def test_wait_for_event(self): 122 """Exercise SBListener.WaitForEvent() API.""" 123 self.build() 124 exe = self.getBuildArtifact("a.out") 125 126 self.dbg.SetAsync(True) 127 128 # Create a target by the debugger. 129 target = self.dbg.CreateTarget(exe) 130 self.assertTrue(target, VALID_TARGET) 131 132 # Now create a breakpoint on main.c by name 'c'. 133 breakpoint = target.BreakpointCreateByName('c', 'a.out') 134 self.trace("breakpoint:", breakpoint) 135 self.assertTrue(breakpoint and 136 breakpoint.GetNumLocations() == 1, 137 VALID_BREAKPOINT) 138 139 # Get the debugger listener. 140 listener = self.dbg.GetListener() 141 142 # Now launch the process, and do not stop at entry point. 143 error = lldb.SBError() 144 flags = target.GetLaunchInfo().GetLaunchFlags() 145 process = target.Launch(listener, 146 None, # argv 147 None, # envp 148 None, # stdin_path 149 None, # stdout_path 150 None, # stderr_path 151 None, # working directory 152 flags, # launch flags 153 False, # Stop at entry 154 error) # error 155 self.assertTrue(error.Success() and process, PROCESS_IS_VALID) 156 157 # Create an empty event object. 158 event = lldb.SBEvent() 159 self.assertFalse(event, "Event should not be valid initially") 160 161 # Create MyListeningThread to wait for any kind of event. 162 import threading 163 164 class MyListeningThread(threading.Thread): 165 166 def run(self): 167 count = 0 168 # Let's only try at most 3 times to retrieve any kind of event. 169 while not count > 3: 170 if listener.WaitForEvent(5, event): 171 self.context.trace("Got a valid event:", event) 172 self.context.trace("Event data flavor:", event.GetDataFlavor()) 173 self.context.trace("Event type:", lldbutil.state_type_to_str(event.GetType())) 174 listener.Clear() 175 return 176 count = count + 1 177 print("Timeout: listener.WaitForEvent") 178 listener.Clear() 179 return 180 181 # Use Python API to kill the process. The listening thread should be 182 # able to receive a state changed event. 183 process.Kill() 184 185 # Let's start the listening thread to retrieve the event. 186 my_thread = MyListeningThread() 187 my_thread.context = self 188 my_thread.start() 189 190 # Wait until the 'MyListeningThread' terminates. 191 my_thread.join() 192 193 self.assertTrue(event, 194 "My listening thread successfully received an event") 195 196 @expectedFailureAll( 197 oslist=["linux"], 198 bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases") 199 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 200 @expectedFailureNetBSD 201 def test_add_listener_to_broadcaster(self): 202 """Exercise some SBBroadcaster APIs.""" 203 self.build() 204 exe = self.getBuildArtifact("a.out") 205 206 self.dbg.SetAsync(True) 207 208 # Create a target by the debugger. 209 target = self.dbg.CreateTarget(exe) 210 self.assertTrue(target, VALID_TARGET) 211 212 # Now create a breakpoint on main.c by name 'c'. 213 breakpoint = target.BreakpointCreateByName('c', 'a.out') 214 self.trace("breakpoint:", breakpoint) 215 self.assertTrue(breakpoint and 216 breakpoint.GetNumLocations() == 1, 217 VALID_BREAKPOINT) 218 219 listener = lldb.SBListener("my listener") 220 221 # Now launch the process, and do not stop at the entry point. 222 error = lldb.SBError() 223 flags = target.GetLaunchInfo().GetLaunchFlags() 224 process = target.Launch(listener, 225 None, # argv 226 None, # envp 227 None, # stdin_path 228 None, # stdout_path 229 None, # stderr_path 230 None, # working directory 231 flags, # launch flags 232 False, # Stop at entry 233 error) # error 234 235 # Create an empty event object. 236 event = lldb.SBEvent() 237 self.assertFalse(event, "Event should not be valid initially") 238 239 # The finite state machine for our custom listening thread, with an 240 # initial state of None, which means no event has been received. 241 # It changes to 'connected' after 'connected' event is received (for remote platforms) 242 # It changes to 'running' after 'running' event is received (should happen only if the 243 # currentstate is either 'None' or 'connected') 244 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the 245 # current state is 'running'.) 246 self.state = None 247 248 # Create MyListeningThread to wait for state changed events. 249 # By design, a "running" event is expected following by a "stopped" 250 # event. 251 import threading 252 253 class MyListeningThread(threading.Thread): 254 255 def run(self): 256 self.context.trace("Running MyListeningThread:", self) 257 258 # Regular expression pattern for the event description. 259 pattern = re.compile("data = {.*, state = (.*)}$") 260 261 # Let's only try at most 6 times to retrieve our events. 262 count = 0 263 while True: 264 if listener.WaitForEvent(5, event): 265 desc = lldbutil.get_description(event) 266 self.context.trace("Event description:", desc) 267 match = pattern.search(desc) 268 if not match: 269 break 270 if match.group(1) == 'connected': 271 # When debugging remote targets with lldb-server, we 272 # first get the 'connected' event. 273 self.context.assertTrue(self.context.state is None) 274 self.context.state = 'connected' 275 continue 276 elif match.group(1) == 'running': 277 self.context.assertTrue( 278 self.context.state is None or self.context.state == 'connected') 279 self.context.state = 'running' 280 continue 281 elif match.group(1) == 'stopped': 282 self.context.assertTrue( 283 self.context.state == 'running') 284 # Whoopee, both events have been received! 285 self.context.state = 'stopped' 286 break 287 else: 288 break 289 print("Timeout: listener.WaitForEvent") 290 count = count + 1 291 if count > 6: 292 break 293 listener.Clear() 294 return 295 296 # Use Python API to continue the process. The listening thread should be 297 # able to receive the state changed events. 298 process.Continue() 299 300 # Start the listening thread to receive the "running" followed by the 301 # "stopped" events. 302 my_thread = MyListeningThread() 303 # Supply the enclosing context so that our listening thread can access 304 # the 'state' variable. 305 my_thread.context = self 306 my_thread.start() 307 308 # Wait until the 'MyListeningThread' terminates. 309 my_thread.join() 310 311 # The final judgement. :-) 312 self.assertEqual(self.state, 'stopped', 313 "Both expected state changed events received") 314