xref: /llvm-project/lldb/test/API/python_api/event/TestEvents.py (revision 619e2e095fb1cd1e60b745cf1a10af9f67a4cd41)
1"""
2Test lldb Python event APIs.
3"""
4
5from __future__ import print_function
6
7
8import re
9import lldb
10from lldbsuite.test.decorators import *
11from lldbsuite.test.lldbtest import *
12from lldbsuite.test import lldbutil
13
14
15@skipIfLinux   # llvm.org/pr25924, sometimes generating SIGSEGV
16@skipIfDarwin
17class EventAPITestCase(TestBase):
18
19    mydir = TestBase.compute_mydir(__file__)
20    NO_DEBUG_INFO_TESTCASE = True
21
22    def setUp(self):
23        # Call super's setUp().
24        TestBase.setUp(self)
25        # Find the line number to of function 'c'.
26        self.line = line_number(
27            'main.c', '// Find the line number of function "c" here.')
28
29    @add_test_categories(['pyapi'])
30    @expectedFailureAll(
31        oslist=["linux"],
32        bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
33    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
34    @skipIfNetBSD
35    def test_listen_for_and_print_event(self):
36        """Exercise SBEvent API."""
37        self.build()
38        exe = self.getBuildArtifact("a.out")
39
40        self.dbg.SetAsync(True)
41
42        # Create a target by the debugger.
43        target = self.dbg.CreateTarget(exe)
44        self.assertTrue(target, VALID_TARGET)
45
46        # Now create a breakpoint on main.c by name 'c'.
47        breakpoint = target.BreakpointCreateByName('c', 'a.out')
48
49        listener = lldb.SBListener("my listener")
50
51        # Now launch the process, and do not stop at the entry point.
52        error = lldb.SBError()
53        flags = target.GetLaunchInfo().GetLaunchFlags()
54        process = target.Launch(listener,
55                                None,      # argv
56                                None,      # envp
57                                None,      # stdin_path
58                                None,      # stdout_path
59                                None,      # stderr_path
60                                None,      # working directory
61                                flags,     # launch flags
62                                False,     # Stop at entry
63                                error)     # error
64
65        self.assertTrue(
66            process.GetState() == lldb.eStateStopped,
67            PROCESS_STOPPED)
68
69        # Create an empty event object.
70        event = lldb.SBEvent()
71
72        traceOn = self.TraceOn()
73        if traceOn:
74            lldbutil.print_stacktraces(process)
75
76        # Create MyListeningThread class to wait for any kind of event.
77        import threading
78
79        class MyListeningThread(threading.Thread):
80
81            def run(self):
82                count = 0
83                # Let's only try at most 4 times to retrieve any kind of event.
84                # After that, the thread exits.
85                while not count > 3:
86                    if traceOn:
87                        print("Try wait for event...")
88                    if listener.WaitForEvent(5, event):
89                        if traceOn:
90                            desc = lldbutil.get_description(event)
91                            print("Event description:", desc)
92                            print("Event data flavor:", event.GetDataFlavor())
93                            print(
94                                "Process state:",
95                                lldbutil.state_type_to_str(
96                                    process.GetState()))
97                            print()
98                    else:
99                        if traceOn:
100                            print("timeout occurred waiting for event...")
101                    count = count + 1
102                listener.Clear()
103                return
104
105        # Let's start the listening thread to retrieve the events.
106        my_thread = MyListeningThread()
107        my_thread.start()
108
109        # Use Python API to continue the process.  The listening thread should be
110        # able to receive the state changed events.
111        process.Continue()
112
113        # Use Python API to kill the process.  The listening thread should be
114        # able to receive the state changed event, too.
115        process.Kill()
116
117        # Wait until the 'MyListeningThread' terminates.
118        my_thread.join()
119
120        # Shouldn't we be testing against some kind of expectation here?
121
122    @add_test_categories(['pyapi'])
123    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
124    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
125    @skipIfNetBSD
126    def test_wait_for_event(self):
127        """Exercise SBListener.WaitForEvent() API."""
128        self.build()
129        exe = self.getBuildArtifact("a.out")
130
131        self.dbg.SetAsync(True)
132
133        # Create a target by the debugger.
134        target = self.dbg.CreateTarget(exe)
135        self.assertTrue(target, VALID_TARGET)
136
137        # Now create a breakpoint on main.c by name 'c'.
138        breakpoint = target.BreakpointCreateByName('c', 'a.out')
139        self.trace("breakpoint:", breakpoint)
140        self.assertTrue(breakpoint and
141                        breakpoint.GetNumLocations() == 1,
142                        VALID_BREAKPOINT)
143
144        # Get the debugger listener.
145        listener = self.dbg.GetListener()
146
147        # Now launch the process, and do not stop at entry point.
148        error = lldb.SBError()
149        flags = target.GetLaunchInfo().GetLaunchFlags()
150        process = target.Launch(listener,
151                                None,      # argv
152                                None,      # envp
153                                None,      # stdin_path
154                                None,      # stdout_path
155                                None,      # stderr_path
156                                None,      # working directory
157                                flags,     # launch flags
158                                False,     # Stop at entry
159                                error)     # error
160        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
161
162        # Create an empty event object.
163        event = lldb.SBEvent()
164        self.assertFalse(event, "Event should not be valid initially")
165
166        # Create MyListeningThread to wait for any kind of event.
167        import threading
168
169        class MyListeningThread(threading.Thread):
170
171            def run(self):
172                count = 0
173                # Let's only try at most 3 times to retrieve any kind of event.
174                while not count > 3:
175                    if listener.WaitForEvent(5, event):
176                        self.trace("Got a valid event:", event)
177                        self.trace("Event data flavor:", event.GetDataFlavor())
178                        self.trace("Event type:", lldbutil.state_type_to_str(event.GetType()))
179                        listener.Clear()
180                        return
181                    count = count + 1
182                    print("Timeout: listener.WaitForEvent")
183                listener.Clear()
184                return
185
186        # Use Python API to kill the process.  The listening thread should be
187        # able to receive a state changed event.
188        process.Kill()
189
190        # Let's start the listening thread to retrieve the event.
191        my_thread = MyListeningThread()
192        my_thread.start()
193
194        # Wait until the 'MyListeningThread' terminates.
195        my_thread.join()
196
197        self.assertTrue(event,
198                        "My listening thread successfully received an event")
199
200    @add_test_categories(['pyapi'])
201    @expectedFailureAll(
202        oslist=["linux"],
203        bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
204    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
205    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48417")
206    @expectedFailureNetBSD
207    def test_add_listener_to_broadcaster(self):
208        """Exercise some SBBroadcaster APIs."""
209        self.build()
210        exe = self.getBuildArtifact("a.out")
211
212        self.dbg.SetAsync(True)
213
214        # Create a target by the debugger.
215        target = self.dbg.CreateTarget(exe)
216        self.assertTrue(target, VALID_TARGET)
217
218        # Now create a breakpoint on main.c by name 'c'.
219        breakpoint = target.BreakpointCreateByName('c', 'a.out')
220        self.trace("breakpoint:", breakpoint)
221        self.assertTrue(breakpoint and
222                        breakpoint.GetNumLocations() == 1,
223                        VALID_BREAKPOINT)
224
225        listener = lldb.SBListener("my listener")
226
227        # Now launch the process, and do not stop at the entry point.
228        error = lldb.SBError()
229        flags = target.GetLaunchInfo().GetLaunchFlags()
230        process = target.Launch(listener,
231                                None,      # argv
232                                None,      # envp
233                                None,      # stdin_path
234                                None,      # stdout_path
235                                None,      # stderr_path
236                                None,      # working directory
237                                flags,     # launch flags
238                                False,     # Stop at entry
239                                error)     # error
240
241        # Create an empty event object.
242        event = lldb.SBEvent()
243        self.assertFalse(event, "Event should not be valid initially")
244
245        # The finite state machine for our custom listening thread, with an
246        # initial state of None, which means no event has been received.
247        # It changes to 'connected' after 'connected' event is received (for remote platforms)
248        # It changes to 'running' after 'running' event is received (should happen only if the
249        # currentstate is either 'None' or 'connected')
250        # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
251        # current state is 'running'.)
252        self.state = None
253
254        # Create MyListeningThread to wait for state changed events.
255        # By design, a "running" event is expected following by a "stopped"
256        # event.
257        import threading
258
259        class MyListeningThread(threading.Thread):
260
261            def run(self):
262                self.trace("Running MyListeningThread:", self)
263
264                # Regular expression pattern for the event description.
265                pattern = re.compile("data = {.*, state = (.*)}$")
266
267                # Let's only try at most 6 times to retrieve our events.
268                count = 0
269                while True:
270                    if listener.WaitForEvent(5, event):
271                        desc = lldbutil.get_description(event)
272                        self.trace("Event description:", desc)
273                        match = pattern.search(desc)
274                        if not match:
275                            break
276                        if match.group(1) == 'connected':
277                            # When debugging remote targets with lldb-server, we
278                            # first get the 'connected' event.
279                            self.context.assertTrue(self.context.state is None)
280                            self.context.state = 'connected'
281                            continue
282                        elif match.group(1) == 'running':
283                            self.context.assertTrue(
284                                self.context.state is None or self.context.state == 'connected')
285                            self.context.state = 'running'
286                            continue
287                        elif match.group(1) == 'stopped':
288                            self.context.assertTrue(
289                                self.context.state == 'running')
290                            # Whoopee, both events have been received!
291                            self.context.state = 'stopped'
292                            break
293                        else:
294                            break
295                    print("Timeout: listener.WaitForEvent")
296                    count = count + 1
297                    if count > 6:
298                        break
299                listener.Clear()
300                return
301
302        # Use Python API to continue the process.  The listening thread should be
303        # able to receive the state changed events.
304        process.Continue()
305
306        # Start the listening thread to receive the "running" followed by the
307        # "stopped" events.
308        my_thread = MyListeningThread()
309        # Supply the enclosing context so that our listening thread can access
310        # the 'state' variable.
311        my_thread.context = self
312        my_thread.start()
313
314        # Wait until the 'MyListeningThread' terminates.
315        my_thread.join()
316
317        # The final judgement. :-)
318        self.assertEqual(self.state, 'stopped',
319                        "Both expected state changed events received")
320