xref: /llvm-project/lldb/test/API/python_api/event/TestEvents.py (revision 586114510c5fa71d1377c7f53e68a3b12c472aa2)
1"""
2Test lldb Python event APIs.
3"""
4
5import re
6import lldb
7from lldbsuite.test.decorators import *
8from lldbsuite.test.lldbtest import *
9from lldbsuite.test import lldbutil
10
11
12@skipIfLinux  # llvm.org/pr25924, sometimes generating SIGSEGV
13class EventAPITestCase(TestBase):
14    NO_DEBUG_INFO_TESTCASE = True
15
16    def setUp(self):
17        # Call super's setUp().
18        TestBase.setUp(self)
19        # Find the line number to of function 'c'.
20        self.line = line_number(
21            "main.c", '// Find the line number of function "c" here.'
22        )
23
24    @expectedFailureAll(
25        oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases"
26    )
27    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
28    @skipIfNetBSD
29    def test_listen_for_and_print_event(self):
30        """Exercise SBEvent API."""
31        self.build()
32        exe = self.getBuildArtifact("a.out")
33
34        self.dbg.SetAsync(True)
35
36        # Create a target by the debugger.
37        target = self.dbg.CreateTarget(exe)
38        self.assertTrue(target, VALID_TARGET)
39
40        # Now create a breakpoint on main.c by name 'c'.
41        breakpoint = target.BreakpointCreateByName("c", "a.out")
42
43        listener = lldb.SBListener("my listener")
44
45        # Now launch the process, and do not stop at the entry point.
46        error = lldb.SBError()
47        flags = target.GetLaunchInfo().GetLaunchFlags()
48        process = target.Launch(
49            listener,
50            None,  # argv
51            None,  # envp
52            None,  # stdin_path
53            None,  # stdout_path
54            None,  # stderr_path
55            None,  # working directory
56            flags,  # launch flags
57            False,  # Stop at entry
58            error,
59        )  # error
60
61        self.assertEqual(process.GetState(), lldb.eStateStopped, PROCESS_STOPPED)
62
63        # Create an empty event object.
64        event = lldb.SBEvent()
65
66        traceOn = self.TraceOn()
67        if traceOn:
68            lldbutil.print_stacktraces(process)
69
70        # Create MyListeningThread class to wait for any kind of event.
71        import threading
72
73        class MyListeningThread(threading.Thread):
74            def run(self):
75                count = 0
76                # Let's only try at most 4 times to retrieve any kind of event.
77                # After that, the thread exits.
78                while not count > 3:
79                    if traceOn:
80                        print("Try wait for event...")
81                    if listener.WaitForEvent(5, event):
82                        if traceOn:
83                            desc = lldbutil.get_description(event)
84                            print("Event description:", desc)
85                            print("Event data flavor:", event.GetDataFlavor())
86                            print(
87                                "Process state:",
88                                lldbutil.state_type_to_str(process.GetState()),
89                            )
90                            print()
91                    else:
92                        if traceOn:
93                            print("timeout occurred waiting for event...")
94                    count = count + 1
95                listener.Clear()
96                return
97
98        # Let's start the listening thread to retrieve the events.
99        my_thread = MyListeningThread()
100        my_thread.start()
101
102        # Use Python API to continue the process.  The listening thread should be
103        # able to receive the state changed events.
104        process.Continue()
105
106        # Use Python API to kill the process.  The listening thread should be
107        # able to receive the state changed event, too.
108        process.Kill()
109
110        # Wait until the 'MyListeningThread' terminates.
111        my_thread.join()
112
113        # Shouldn't we be testing against some kind of expectation here?
114
115    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
116    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
117    @skipIfNetBSD
118    def test_wait_for_event(self):
119        """Exercise SBListener.WaitForEvent() API."""
120        self.build()
121        exe = self.getBuildArtifact("a.out")
122
123        self.dbg.SetAsync(True)
124
125        # Create a target by the debugger.
126        target = self.dbg.CreateTarget(exe)
127        self.assertTrue(target, VALID_TARGET)
128
129        # Now create a breakpoint on main.c by name 'c'.
130        breakpoint = target.BreakpointCreateByName("c", "a.out")
131        self.trace("breakpoint:", breakpoint)
132        self.assertTrue(
133            breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT
134        )
135
136        # Get the debugger listener.
137        listener = self.dbg.GetListener()
138
139        # Now launch the process, and do not stop at entry point.
140        error = lldb.SBError()
141        flags = target.GetLaunchInfo().GetLaunchFlags()
142        process = target.Launch(
143            listener,
144            None,  # argv
145            None,  # envp
146            None,  # stdin_path
147            None,  # stdout_path
148            None,  # stderr_path
149            None,  # working directory
150            flags,  # launch flags
151            False,  # Stop at entry
152            error,
153        )  # error
154        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
155
156        # Create an empty event object.
157        event = lldb.SBEvent()
158        self.assertFalse(event, "Event should not be valid initially")
159
160        # Create MyListeningThread to wait for any kind of event.
161        import threading
162
163        class MyListeningThread(threading.Thread):
164            def run(self):
165                count = 0
166                # Let's only try at most 3 times to retrieve any kind of event.
167                while not count > 3:
168                    if listener.WaitForEvent(5, event):
169                        self.context.trace("Got a valid event:", event)
170                        self.context.trace("Event data flavor:", event.GetDataFlavor())
171                        self.context.trace(
172                            "Event type:", lldbutil.state_type_to_str(event.GetType())
173                        )
174                        listener.Clear()
175                        return
176                    count = count + 1
177                    print("Timeout: listener.WaitForEvent")
178                listener.Clear()
179                return
180
181        # Use Python API to kill the process.  The listening thread should be
182        # able to receive a state changed event.
183        process.Kill()
184
185        # Let's start the listening thread to retrieve the event.
186        my_thread = MyListeningThread()
187        my_thread.context = self
188        my_thread.start()
189
190        # Wait until the 'MyListeningThread' terminates.
191        my_thread.join()
192
193        self.assertTrue(event, "My listening thread successfully received an event")
194
195    @expectedFailureAll(
196        oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases"
197    )
198    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
199    @expectedFailureNetBSD
200    def test_add_listener_to_broadcaster(self):
201        """Exercise some SBBroadcaster APIs."""
202        self.build()
203        exe = self.getBuildArtifact("a.out")
204
205        self.dbg.SetAsync(True)
206
207        # Create a target by the debugger.
208        target = self.dbg.CreateTarget(exe)
209        self.assertTrue(target, VALID_TARGET)
210
211        # Now create a breakpoint on main.c by name 'c'.
212        breakpoint = target.BreakpointCreateByName("c", "a.out")
213        self.trace("breakpoint:", breakpoint)
214        self.assertTrue(
215            breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT
216        )
217
218        listener = lldb.SBListener("my listener")
219
220        # Now launch the process, and do not stop at the entry point.
221        error = lldb.SBError()
222        flags = target.GetLaunchInfo().GetLaunchFlags()
223        process = target.Launch(
224            listener,
225            None,  # argv
226            None,  # envp
227            None,  # stdin_path
228            None,  # stdout_path
229            None,  # stderr_path
230            None,  # working directory
231            flags,  # launch flags
232            False,  # Stop at entry
233            error,
234        )  # error
235
236        # Create an empty event object.
237        event = lldb.SBEvent()
238        self.assertFalse(event, "Event should not be valid initially")
239
240        # The finite state machine for our custom listening thread, with an
241        # initial state of None, which means no event has been received.
242        # It changes to 'connected' after 'connected' event is received (for remote platforms)
243        # It changes to 'running' after 'running' event is received (should happen only if the
244        # currentstate is either 'None' or 'connected')
245        # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
246        # current state is 'running'.)
247        self.state = None
248
249        # Create MyListeningThread to wait for state changed events.
250        # By design, a "running" event is expected following by a "stopped"
251        # event.
252        import threading
253
254        class MyListeningThread(threading.Thread):
255            def run(self):
256                self.context.trace("Running MyListeningThread:", self)
257
258                # Regular expression pattern for the event description.
259                pattern = re.compile("data = {.*, state = (.*)}$")
260
261                # Let's only try at most 6 times to retrieve our events.
262                count = 0
263                while True:
264                    if listener.WaitForEvent(5, event):
265                        desc = lldbutil.get_description(event)
266                        self.context.trace("Event description:", desc)
267                        match = pattern.search(desc)
268                        if not match:
269                            break
270                        if match.group(1) == "connected":
271                            # When debugging remote targets with lldb-server, we
272                            # first get the 'connected' event.
273                            self.context.assertTrue(self.context.state is None)
274                            self.context.state = "connected"
275                            continue
276                        elif match.group(1) == "running":
277                            self.context.assertTrue(
278                                self.context.state is None
279                                or self.context.state == "connected"
280                            )
281                            self.context.state = "running"
282                            continue
283                        elif match.group(1) == "stopped":
284                            self.context.assertTrue(self.context.state == "running")
285                            # Whoopee, both events have been received!
286                            self.context.state = "stopped"
287                            break
288                        else:
289                            break
290                    print("Timeout: listener.WaitForEvent")
291                    count = count + 1
292                    if count > 6:
293                        break
294                listener.Clear()
295                return
296
297        # Use Python API to continue the process.  The listening thread should be
298        # able to receive the state changed events.
299        process.Continue()
300
301        # Start the listening thread to receive the "running" followed by the
302        # "stopped" events.
303        my_thread = MyListeningThread()
304        # Supply the enclosing context so that our listening thread can access
305        # the 'state' variable.
306        my_thread.context = self
307        my_thread.start()
308
309        # Wait until the 'MyListeningThread' terminates.
310        my_thread.join()
311
312        # The final judgement. :-)
313        self.assertEqual(
314            self.state, "stopped", "Both expected state changed events received"
315        )
316
317    def wait_for_next_event(self, expected_state, test_shadow=False):
318        """Wait for an event from self.primary & self.shadow listener.
319        If test_shadow is true, we also check that the shadow listener only
320        receives events AFTER the primary listener does."""
321        # Waiting on the shadow listener shouldn't have events yet because
322        # we haven't fetched them for the primary listener yet:
323        event = lldb.SBEvent()
324
325        if test_shadow:
326            success = self.shadow_listener.WaitForEvent(1, event)
327            self.assertFalse(success, "Shadow listener doesn't pull events")
328
329        # But there should be an event for the primary listener:
330        success = self.primary_listener.WaitForEvent(5, event)
331        self.assertTrue(success, "Primary listener got the event")
332
333        state = lldb.SBProcess.GetStateFromEvent(event)
334        restart = False
335        if state == lldb.eStateStopped:
336            restart = lldb.SBProcess.GetRestartedFromEvent(event)
337
338        if expected_state is not None:
339            self.assertEqual(
340                state, expected_state, "Primary thread got the correct event"
341            )
342
343        # And after pulling that one there should be an equivalent event for the shadow
344        # listener:
345        success = self.shadow_listener.WaitForEvent(5, event)
346        self.assertTrue(success, "Shadow listener got event too")
347        self.assertEqual(
348            state, lldb.SBProcess.GetStateFromEvent(event), "It was the same event"
349        )
350        self.assertEqual(
351            restart,
352            lldb.SBProcess.GetRestartedFromEvent(event),
353            "It was the same restarted",
354        )
355
356        return state, restart
357
358    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
359    @skipIfWindows  # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
360    @skipIfNetBSD
361    def test_shadow_listener(self):
362        self.build()
363        exe = self.getBuildArtifact("a.out")
364
365        # Create a target by the debugger.
366        target = self.dbg.CreateTarget(exe)
367        self.assertTrue(target, VALID_TARGET)
368
369        # Now create a breakpoint on main.c by name 'c'.
370        bkpt1 = target.BreakpointCreateByName("c", "a.out")
371        self.trace("breakpoint:", bkpt1)
372        self.assertEqual(bkpt1.GetNumLocations(), 1, VALID_BREAKPOINT)
373
374        self.primary_listener = lldb.SBListener("my listener")
375        self.shadow_listener = lldb.SBListener("shadow listener")
376
377        self.cur_thread = None
378
379        error = lldb.SBError()
380        launch_info = target.GetLaunchInfo()
381        launch_info.SetListener(self.primary_listener)
382        launch_info.SetShadowListener(self.shadow_listener)
383
384        self.runCmd(
385            "settings set target.process.extra-startup-command QSetLogging:bitmask=LOG_PROCESS|LOG_EXCEPTIONS|LOG_RNB_PACKETS|LOG_STEP;"
386        )
387        self.dbg.SetAsync(True)
388
389        self.process = target.Launch(launch_info, error)
390        self.assertSuccess(error, "Process launched successfully")
391
392        # Keep fetching events from the primary to trigger the do on removal and
393        # then from the shadow listener, and make sure they match:
394
395        # Events in the launch sequence might be platform dependent, so don't
396        # expect any particular event till we get the stopped:
397        state = lldb.eStateInvalid
398        while state != lldb.eStateStopped:
399            state, restart = self.wait_for_next_event(None, False)
400
401        # Okay, we're now at a good stop, so try a next:
402        self.cur_thread = self.process.threads[0]
403
404        # Make sure we're at our expected breakpoint:
405        self.assertTrue(self.cur_thread.IsValid(), "Got a zeroth thread")
406        self.assertEqual(self.cur_thread.stop_reason, lldb.eStopReasonBreakpoint)
407        self.assertEqual(
408            self.cur_thread.GetStopReasonDataCount(), 2, "Only one breakpoint/loc here"
409        )
410        self.assertEqual(
411            bkpt1.GetID(),
412            self.cur_thread.GetStopReasonDataAtIndex(0),
413            "Hit the right breakpoint",
414        )
415        # Disable the first breakpoint so it doesn't get in the way...
416        bkpt1.SetEnabled(False)
417
418        self.cur_thread.StepOver()
419        # We'll run the test for "shadow listener blocked by primary listener
420        # for the first couple rounds, then we'll skip the 1 second pause...
421        self.wait_for_next_event(lldb.eStateRunning, True)
422        self.wait_for_next_event(lldb.eStateStopped, True)
423
424        # Next try an auto-continue breakpoint and make sure the shadow listener got
425        # the resumed info as well.  Note that I'm not explicitly counting
426        # running events here.  At the point when I wrote this lldb sometimes
427        # emits two running events in a row.  Apparently the code to coalesce running
428        # events isn't working.  But that's not what this test is testing, we're really
429        # testing that the primary & shadow listeners hear the same thing and in the
430        # right order.
431
432        main_spec = lldb.SBFileSpec("main.c")
433        bkpt2 = target.BreakpointCreateBySourceRegex("b.2. returns %d", main_spec)
434        self.assertGreater(bkpt2.GetNumLocations(), 0, "BP2 worked")
435        bkpt2.SetAutoContinue(True)
436
437        bkpt3 = target.BreakpointCreateBySourceRegex("a.3. returns %d", main_spec)
438        self.assertGreater(bkpt3.GetNumLocations(), 0, "BP3 worked")
439
440        state = lldb.eStateStopped
441        restarted = False
442
443        # Put in a counter to make sure we don't spin forever if there is some
444        # error in the logic.
445        counter = 0
446        while state != lldb.eStateExited:
447            counter += 1
448            self.assertLess(
449                counter, 50, "Took more than 50 events to hit two breakpoints."
450            )
451            if state == lldb.eStateStopped and not restarted:
452                self.process.Continue()
453            state, restarted = self.wait_for_next_event(None, False)
454