xref: /llvm-project/lldb/test/API/python_api/event/TestEvents.py (revision 4cc8f2a017c76af25234afc7c380550e9c93135c)
1"""
2Test lldb Python event APIs.
3"""
4
5from __future__ import print_function
6
7
8import re
9import lldb
10from lldbsuite.test.decorators import *
11from lldbsuite.test.lldbtest import *
12from lldbsuite.test import lldbutil
13
14
15@skipIfLinux   # llvm.org/pr25924, sometimes generating SIGSEGV
16class EventAPITestCase(TestBase):
17    NO_DEBUG_INFO_TESTCASE = True
18
19    def setUp(self):
20        # Call super's setUp().
21        TestBase.setUp(self)
22        # Find the line number to of function 'c'.
23        self.line = line_number(
24            'main.c', '// Find the line number of function "c" here.')
25
26    @expectedFailureAll(
27        oslist=["linux"],
28        bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
29    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
30    @skipIfNetBSD
31    def test_listen_for_and_print_event(self):
32        """Exercise SBEvent API."""
33        self.build()
34        exe = self.getBuildArtifact("a.out")
35
36        self.dbg.SetAsync(True)
37
38        # Create a target by the debugger.
39        target = self.dbg.CreateTarget(exe)
40        self.assertTrue(target, VALID_TARGET)
41
42        # Now create a breakpoint on main.c by name 'c'.
43        breakpoint = target.BreakpointCreateByName('c', 'a.out')
44
45        listener = lldb.SBListener("my listener")
46
47        # Now launch the process, and do not stop at the entry point.
48        error = lldb.SBError()
49        flags = target.GetLaunchInfo().GetLaunchFlags()
50        process = target.Launch(listener,
51                                None,      # argv
52                                None,      # envp
53                                None,      # stdin_path
54                                None,      # stdout_path
55                                None,      # stderr_path
56                                None,      # working directory
57                                flags,     # launch flags
58                                False,     # Stop at entry
59                                error)     # error
60
61        self.assertEqual(
62            process.GetState(), lldb.eStateStopped,
63            PROCESS_STOPPED)
64
65        # Create an empty event object.
66        event = lldb.SBEvent()
67
68        traceOn = self.TraceOn()
69        if traceOn:
70            lldbutil.print_stacktraces(process)
71
72        # Create MyListeningThread class to wait for any kind of event.
73        import threading
74
75        class MyListeningThread(threading.Thread):
76
77            def run(self):
78                count = 0
79                # Let's only try at most 4 times to retrieve any kind of event.
80                # After that, the thread exits.
81                while not count > 3:
82                    if traceOn:
83                        print("Try wait for event...")
84                    if listener.WaitForEvent(5, event):
85                        if traceOn:
86                            desc = lldbutil.get_description(event)
87                            print("Event description:", desc)
88                            print("Event data flavor:", event.GetDataFlavor())
89                            print(
90                                "Process state:",
91                                lldbutil.state_type_to_str(
92                                    process.GetState()))
93                            print()
94                    else:
95                        if traceOn:
96                            print("timeout occurred waiting for event...")
97                    count = count + 1
98                listener.Clear()
99                return
100
101        # Let's start the listening thread to retrieve the events.
102        my_thread = MyListeningThread()
103        my_thread.start()
104
105        # Use Python API to continue the process.  The listening thread should be
106        # able to receive the state changed events.
107        process.Continue()
108
109        # Use Python API to kill the process.  The listening thread should be
110        # able to receive the state changed event, too.
111        process.Kill()
112
113        # Wait until the 'MyListeningThread' terminates.
114        my_thread.join()
115
116        # Shouldn't we be testing against some kind of expectation here?
117
118    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
119    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
120    @skipIfNetBSD
121    def test_wait_for_event(self):
122        """Exercise SBListener.WaitForEvent() API."""
123        self.build()
124        exe = self.getBuildArtifact("a.out")
125
126        self.dbg.SetAsync(True)
127
128        # Create a target by the debugger.
129        target = self.dbg.CreateTarget(exe)
130        self.assertTrue(target, VALID_TARGET)
131
132        # Now create a breakpoint on main.c by name 'c'.
133        breakpoint = target.BreakpointCreateByName('c', 'a.out')
134        self.trace("breakpoint:", breakpoint)
135        self.assertTrue(breakpoint and
136                        breakpoint.GetNumLocations() == 1,
137                        VALID_BREAKPOINT)
138
139        # Get the debugger listener.
140        listener = self.dbg.GetListener()
141
142        # Now launch the process, and do not stop at entry point.
143        error = lldb.SBError()
144        flags = target.GetLaunchInfo().GetLaunchFlags()
145        process = target.Launch(listener,
146                                None,      # argv
147                                None,      # envp
148                                None,      # stdin_path
149                                None,      # stdout_path
150                                None,      # stderr_path
151                                None,      # working directory
152                                flags,     # launch flags
153                                False,     # Stop at entry
154                                error)     # error
155        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
156
157        # Create an empty event object.
158        event = lldb.SBEvent()
159        self.assertFalse(event, "Event should not be valid initially")
160
161        # Create MyListeningThread to wait for any kind of event.
162        import threading
163
164        class MyListeningThread(threading.Thread):
165
166            def run(self):
167                count = 0
168                # Let's only try at most 3 times to retrieve any kind of event.
169                while not count > 3:
170                    if listener.WaitForEvent(5, event):
171                        self.context.trace("Got a valid event:", event)
172                        self.context.trace("Event data flavor:", event.GetDataFlavor())
173                        self.context.trace("Event type:", lldbutil.state_type_to_str(event.GetType()))
174                        listener.Clear()
175                        return
176                    count = count + 1
177                    print("Timeout: listener.WaitForEvent")
178                listener.Clear()
179                return
180
181        # Use Python API to kill the process.  The listening thread should be
182        # able to receive a state changed event.
183        process.Kill()
184
185        # Let's start the listening thread to retrieve the event.
186        my_thread = MyListeningThread()
187        my_thread.context = self
188        my_thread.start()
189
190        # Wait until the 'MyListeningThread' terminates.
191        my_thread.join()
192
193        self.assertTrue(event,
194                        "My listening thread successfully received an event")
195
196    @expectedFailureAll(
197        oslist=["linux"],
198        bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
199    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
200    @expectedFailureNetBSD
201    def test_add_listener_to_broadcaster(self):
202        """Exercise some SBBroadcaster APIs."""
203        self.build()
204        exe = self.getBuildArtifact("a.out")
205
206        self.dbg.SetAsync(True)
207
208        # Create a target by the debugger.
209        target = self.dbg.CreateTarget(exe)
210        self.assertTrue(target, VALID_TARGET)
211
212        # Now create a breakpoint on main.c by name 'c'.
213        breakpoint = target.BreakpointCreateByName('c', 'a.out')
214        self.trace("breakpoint:", breakpoint)
215        self.assertTrue(breakpoint and
216                        breakpoint.GetNumLocations() == 1,
217                        VALID_BREAKPOINT)
218
219        listener = lldb.SBListener("my listener")
220
221        # Now launch the process, and do not stop at the entry point.
222        error = lldb.SBError()
223        flags = target.GetLaunchInfo().GetLaunchFlags()
224        process = target.Launch(listener,
225                                None,      # argv
226                                None,      # envp
227                                None,      # stdin_path
228                                None,      # stdout_path
229                                None,      # stderr_path
230                                None,      # working directory
231                                flags,     # launch flags
232                                False,     # Stop at entry
233                                error)     # error
234
235        # Create an empty event object.
236        event = lldb.SBEvent()
237        self.assertFalse(event, "Event should not be valid initially")
238
239        # The finite state machine for our custom listening thread, with an
240        # initial state of None, which means no event has been received.
241        # It changes to 'connected' after 'connected' event is received (for remote platforms)
242        # It changes to 'running' after 'running' event is received (should happen only if the
243        # currentstate is either 'None' or 'connected')
244        # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
245        # current state is 'running'.)
246        self.state = None
247
248        # Create MyListeningThread to wait for state changed events.
249        # By design, a "running" event is expected following by a "stopped"
250        # event.
251        import threading
252
253        class MyListeningThread(threading.Thread):
254
255            def run(self):
256                self.context.trace("Running MyListeningThread:", self)
257
258                # Regular expression pattern for the event description.
259                pattern = re.compile("data = {.*, state = (.*)}$")
260
261                # Let's only try at most 6 times to retrieve our events.
262                count = 0
263                while True:
264                    if listener.WaitForEvent(5, event):
265                        desc = lldbutil.get_description(event)
266                        self.context.trace("Event description:", desc)
267                        match = pattern.search(desc)
268                        if not match:
269                            break
270                        if match.group(1) == 'connected':
271                            # When debugging remote targets with lldb-server, we
272                            # first get the 'connected' event.
273                            self.context.assertTrue(self.context.state is None)
274                            self.context.state = 'connected'
275                            continue
276                        elif match.group(1) == 'running':
277                            self.context.assertTrue(
278                                self.context.state is None or self.context.state == 'connected')
279                            self.context.state = 'running'
280                            continue
281                        elif match.group(1) == 'stopped':
282                            self.context.assertTrue(
283                                self.context.state == 'running')
284                            # Whoopee, both events have been received!
285                            self.context.state = 'stopped'
286                            break
287                        else:
288                            break
289                    print("Timeout: listener.WaitForEvent")
290                    count = count + 1
291                    if count > 6:
292                        break
293                listener.Clear()
294                return
295
296        # Use Python API to continue the process.  The listening thread should be
297        # able to receive the state changed events.
298        process.Continue()
299
300        # Start the listening thread to receive the "running" followed by the
301        # "stopped" events.
302        my_thread = MyListeningThread()
303        # Supply the enclosing context so that our listening thread can access
304        # the 'state' variable.
305        my_thread.context = self
306        my_thread.start()
307
308        # Wait until the 'MyListeningThread' terminates.
309        my_thread.join()
310
311        # The final judgement. :-)
312        self.assertEqual(self.state, 'stopped',
313                        "Both expected state changed events received")
314