xref: /llvm-project/lldb/test/API/python_api/event/TestEvents.py (revision 193259cbcec77add8e189c4dedeefb15fef50d5e)
1"""
2Test lldb Python event APIs.
3"""
4
5import re
6import lldb
7from lldbsuite.test.decorators import *
8from lldbsuite.test.lldbtest import *
9from lldbsuite.test import lldbutil
10
11
12@skipIfLinux   # llvm.org/pr25924, sometimes generating SIGSEGV
13class EventAPITestCase(TestBase):
14    NO_DEBUG_INFO_TESTCASE = True
15
16    def setUp(self):
17        # Call super's setUp().
18        TestBase.setUp(self)
19        # Find the line number to of function 'c'.
20        self.line = line_number(
21            'main.c', '// Find the line number of function "c" here.')
22
23    @expectedFailureAll(
24        oslist=["linux"],
25        bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
26    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
27    @skipIfNetBSD
28    def test_listen_for_and_print_event(self):
29        """Exercise SBEvent API."""
30        self.build()
31        exe = self.getBuildArtifact("a.out")
32
33        self.dbg.SetAsync(True)
34
35        # Create a target by the debugger.
36        target = self.dbg.CreateTarget(exe)
37        self.assertTrue(target, VALID_TARGET)
38
39        # Now create a breakpoint on main.c by name 'c'.
40        breakpoint = target.BreakpointCreateByName('c', 'a.out')
41
42        listener = lldb.SBListener("my listener")
43
44        # Now launch the process, and do not stop at the entry point.
45        error = lldb.SBError()
46        flags = target.GetLaunchInfo().GetLaunchFlags()
47        process = target.Launch(listener,
48                                None,      # argv
49                                None,      # envp
50                                None,      # stdin_path
51                                None,      # stdout_path
52                                None,      # stderr_path
53                                None,      # working directory
54                                flags,     # launch flags
55                                False,     # Stop at entry
56                                error)     # error
57
58        self.assertEqual(
59            process.GetState(), lldb.eStateStopped,
60            PROCESS_STOPPED)
61
62        # Create an empty event object.
63        event = lldb.SBEvent()
64
65        traceOn = self.TraceOn()
66        if traceOn:
67            lldbutil.print_stacktraces(process)
68
69        # Create MyListeningThread class to wait for any kind of event.
70        import threading
71
72        class MyListeningThread(threading.Thread):
73
74            def run(self):
75                count = 0
76                # Let's only try at most 4 times to retrieve any kind of event.
77                # After that, the thread exits.
78                while not count > 3:
79                    if traceOn:
80                        print("Try wait for event...")
81                    if listener.WaitForEvent(5, event):
82                        if traceOn:
83                            desc = lldbutil.get_description(event)
84                            print("Event description:", desc)
85                            print("Event data flavor:", event.GetDataFlavor())
86                            print(
87                                "Process state:",
88                                lldbutil.state_type_to_str(
89                                    process.GetState()))
90                            print()
91                    else:
92                        if traceOn:
93                            print("timeout occurred waiting for event...")
94                    count = count + 1
95                listener.Clear()
96                return
97
98        # Let's start the listening thread to retrieve the events.
99        my_thread = MyListeningThread()
100        my_thread.start()
101
102        # Use Python API to continue the process.  The listening thread should be
103        # able to receive the state changed events.
104        process.Continue()
105
106        # Use Python API to kill the process.  The listening thread should be
107        # able to receive the state changed event, too.
108        process.Kill()
109
110        # Wait until the 'MyListeningThread' terminates.
111        my_thread.join()
112
113        # Shouldn't we be testing against some kind of expectation here?
114
115    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
116    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
117    @skipIfNetBSD
118    def test_wait_for_event(self):
119        """Exercise SBListener.WaitForEvent() API."""
120        self.build()
121        exe = self.getBuildArtifact("a.out")
122
123        self.dbg.SetAsync(True)
124
125        # Create a target by the debugger.
126        target = self.dbg.CreateTarget(exe)
127        self.assertTrue(target, VALID_TARGET)
128
129        # Now create a breakpoint on main.c by name 'c'.
130        breakpoint = target.BreakpointCreateByName('c', 'a.out')
131        self.trace("breakpoint:", breakpoint)
132        self.assertTrue(breakpoint and
133                        breakpoint.GetNumLocations() == 1,
134                        VALID_BREAKPOINT)
135
136        # Get the debugger listener.
137        listener = self.dbg.GetListener()
138
139        # Now launch the process, and do not stop at entry point.
140        error = lldb.SBError()
141        flags = target.GetLaunchInfo().GetLaunchFlags()
142        process = target.Launch(listener,
143                                None,      # argv
144                                None,      # envp
145                                None,      # stdin_path
146                                None,      # stdout_path
147                                None,      # stderr_path
148                                None,      # working directory
149                                flags,     # launch flags
150                                False,     # Stop at entry
151                                error)     # error
152        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
153
154        # Create an empty event object.
155        event = lldb.SBEvent()
156        self.assertFalse(event, "Event should not be valid initially")
157
158        # Create MyListeningThread to wait for any kind of event.
159        import threading
160
161        class MyListeningThread(threading.Thread):
162
163            def run(self):
164                count = 0
165                # Let's only try at most 3 times to retrieve any kind of event.
166                while not count > 3:
167                    if listener.WaitForEvent(5, event):
168                        self.context.trace("Got a valid event:", event)
169                        self.context.trace("Event data flavor:", event.GetDataFlavor())
170                        self.context.trace("Event type:", lldbutil.state_type_to_str(event.GetType()))
171                        listener.Clear()
172                        return
173                    count = count + 1
174                    print("Timeout: listener.WaitForEvent")
175                listener.Clear()
176                return
177
178        # Use Python API to kill the process.  The listening thread should be
179        # able to receive a state changed event.
180        process.Kill()
181
182        # Let's start the listening thread to retrieve the event.
183        my_thread = MyListeningThread()
184        my_thread.context = self
185        my_thread.start()
186
187        # Wait until the 'MyListeningThread' terminates.
188        my_thread.join()
189
190        self.assertTrue(event,
191                        "My listening thread successfully received an event")
192
193    @expectedFailureAll(
194        oslist=["linux"],
195        bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
196    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
197    @expectedFailureNetBSD
198    def test_add_listener_to_broadcaster(self):
199        """Exercise some SBBroadcaster APIs."""
200        self.build()
201        exe = self.getBuildArtifact("a.out")
202
203        self.dbg.SetAsync(True)
204
205        # Create a target by the debugger.
206        target = self.dbg.CreateTarget(exe)
207        self.assertTrue(target, VALID_TARGET)
208
209        # Now create a breakpoint on main.c by name 'c'.
210        breakpoint = target.BreakpointCreateByName('c', 'a.out')
211        self.trace("breakpoint:", breakpoint)
212        self.assertTrue(breakpoint and
213                        breakpoint.GetNumLocations() == 1,
214                        VALID_BREAKPOINT)
215
216        listener = lldb.SBListener("my listener")
217
218        # Now launch the process, and do not stop at the entry point.
219        error = lldb.SBError()
220        flags = target.GetLaunchInfo().GetLaunchFlags()
221        process = target.Launch(listener,
222                                None,      # argv
223                                None,      # envp
224                                None,      # stdin_path
225                                None,      # stdout_path
226                                None,      # stderr_path
227                                None,      # working directory
228                                flags,     # launch flags
229                                False,     # Stop at entry
230                                error)     # error
231
232        # Create an empty event object.
233        event = lldb.SBEvent()
234        self.assertFalse(event, "Event should not be valid initially")
235
236        # The finite state machine for our custom listening thread, with an
237        # initial state of None, which means no event has been received.
238        # It changes to 'connected' after 'connected' event is received (for remote platforms)
239        # It changes to 'running' after 'running' event is received (should happen only if the
240        # currentstate is either 'None' or 'connected')
241        # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
242        # current state is 'running'.)
243        self.state = None
244
245        # Create MyListeningThread to wait for state changed events.
246        # By design, a "running" event is expected following by a "stopped"
247        # event.
248        import threading
249
250        class MyListeningThread(threading.Thread):
251
252            def run(self):
253                self.context.trace("Running MyListeningThread:", self)
254
255                # Regular expression pattern for the event description.
256                pattern = re.compile("data = {.*, state = (.*)}$")
257
258                # Let's only try at most 6 times to retrieve our events.
259                count = 0
260                while True:
261                    if listener.WaitForEvent(5, event):
262                        desc = lldbutil.get_description(event)
263                        self.context.trace("Event description:", desc)
264                        match = pattern.search(desc)
265                        if not match:
266                            break
267                        if match.group(1) == 'connected':
268                            # When debugging remote targets with lldb-server, we
269                            # first get the 'connected' event.
270                            self.context.assertTrue(self.context.state is None)
271                            self.context.state = 'connected'
272                            continue
273                        elif match.group(1) == 'running':
274                            self.context.assertTrue(
275                                self.context.state is None or self.context.state == 'connected')
276                            self.context.state = 'running'
277                            continue
278                        elif match.group(1) == 'stopped':
279                            self.context.assertTrue(
280                                self.context.state == 'running')
281                            # Whoopee, both events have been received!
282                            self.context.state = 'stopped'
283                            break
284                        else:
285                            break
286                    print("Timeout: listener.WaitForEvent")
287                    count = count + 1
288                    if count > 6:
289                        break
290                listener.Clear()
291                return
292
293        # Use Python API to continue the process.  The listening thread should be
294        # able to receive the state changed events.
295        process.Continue()
296
297        # Start the listening thread to receive the "running" followed by the
298        # "stopped" events.
299        my_thread = MyListeningThread()
300        # Supply the enclosing context so that our listening thread can access
301        # the 'state' variable.
302        my_thread.context = self
303        my_thread.start()
304
305        # Wait until the 'MyListeningThread' terminates.
306        my_thread.join()
307
308        # The final judgement. :-)
309        self.assertEqual(self.state, 'stopped',
310                        "Both expected state changed events received")
311