xref: /llvm-project/lldb/test/API/python_api/event/TestEvents.py (revision 2a29c3f72e8d93385be83bd24a993c3bb57ac181)
1"""
2Test lldb Python event APIs.
3"""
4
5from __future__ import print_function
6
7
8import re
9import lldb
10from lldbsuite.test.decorators import *
11from lldbsuite.test.lldbtest import *
12from lldbsuite.test import lldbutil
13
14
15@skipIfLinux   # llvm.org/pr25924, sometimes generating SIGSEGV
16class EventAPITestCase(TestBase):
17
18    mydir = TestBase.compute_mydir(__file__)
19    NO_DEBUG_INFO_TESTCASE = True
20
21    def setUp(self):
22        # Call super's setUp().
23        TestBase.setUp(self)
24        # Find the line number to of function 'c'.
25        self.line = line_number(
26            'main.c', '// Find the line number of function "c" here.')
27
28    @expectedFailureAll(
29        oslist=["linux"],
30        bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
31    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
32    @skipIfNetBSD
33    def test_listen_for_and_print_event(self):
34        """Exercise SBEvent API."""
35        self.build()
36        exe = self.getBuildArtifact("a.out")
37
38        self.dbg.SetAsync(True)
39
40        # Create a target by the debugger.
41        target = self.dbg.CreateTarget(exe)
42        self.assertTrue(target, VALID_TARGET)
43
44        # Now create a breakpoint on main.c by name 'c'.
45        breakpoint = target.BreakpointCreateByName('c', 'a.out')
46
47        listener = lldb.SBListener("my listener")
48
49        # Now launch the process, and do not stop at the entry point.
50        error = lldb.SBError()
51        flags = target.GetLaunchInfo().GetLaunchFlags()
52        process = target.Launch(listener,
53                                None,      # argv
54                                None,      # envp
55                                None,      # stdin_path
56                                None,      # stdout_path
57                                None,      # stderr_path
58                                None,      # working directory
59                                flags,     # launch flags
60                                False,     # Stop at entry
61                                error)     # error
62
63        self.assertEqual(
64            process.GetState(), lldb.eStateStopped,
65            PROCESS_STOPPED)
66
67        # Create an empty event object.
68        event = lldb.SBEvent()
69
70        traceOn = self.TraceOn()
71        if traceOn:
72            lldbutil.print_stacktraces(process)
73
74        # Create MyListeningThread class to wait for any kind of event.
75        import threading
76
77        class MyListeningThread(threading.Thread):
78
79            def run(self):
80                count = 0
81                # Let's only try at most 4 times to retrieve any kind of event.
82                # After that, the thread exits.
83                while not count > 3:
84                    if traceOn:
85                        print("Try wait for event...")
86                    if listener.WaitForEvent(5, event):
87                        if traceOn:
88                            desc = lldbutil.get_description(event)
89                            print("Event description:", desc)
90                            print("Event data flavor:", event.GetDataFlavor())
91                            print(
92                                "Process state:",
93                                lldbutil.state_type_to_str(
94                                    process.GetState()))
95                            print()
96                    else:
97                        if traceOn:
98                            print("timeout occurred waiting for event...")
99                    count = count + 1
100                listener.Clear()
101                return
102
103        # Let's start the listening thread to retrieve the events.
104        my_thread = MyListeningThread()
105        my_thread.start()
106
107        # Use Python API to continue the process.  The listening thread should be
108        # able to receive the state changed events.
109        process.Continue()
110
111        # Use Python API to kill the process.  The listening thread should be
112        # able to receive the state changed event, too.
113        process.Kill()
114
115        # Wait until the 'MyListeningThread' terminates.
116        my_thread.join()
117
118        # Shouldn't we be testing against some kind of expectation here?
119
120    @expectedFlakeyLinux("llvm.org/pr23730")  # Flaky, fails ~1/100 cases
121    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
122    @skipIfNetBSD
123    def test_wait_for_event(self):
124        """Exercise SBListener.WaitForEvent() API."""
125        self.build()
126        exe = self.getBuildArtifact("a.out")
127
128        self.dbg.SetAsync(True)
129
130        # Create a target by the debugger.
131        target = self.dbg.CreateTarget(exe)
132        self.assertTrue(target, VALID_TARGET)
133
134        # Now create a breakpoint on main.c by name 'c'.
135        breakpoint = target.BreakpointCreateByName('c', 'a.out')
136        self.trace("breakpoint:", breakpoint)
137        self.assertTrue(breakpoint and
138                        breakpoint.GetNumLocations() == 1,
139                        VALID_BREAKPOINT)
140
141        # Get the debugger listener.
142        listener = self.dbg.GetListener()
143
144        # Now launch the process, and do not stop at entry point.
145        error = lldb.SBError()
146        flags = target.GetLaunchInfo().GetLaunchFlags()
147        process = target.Launch(listener,
148                                None,      # argv
149                                None,      # envp
150                                None,      # stdin_path
151                                None,      # stdout_path
152                                None,      # stderr_path
153                                None,      # working directory
154                                flags,     # launch flags
155                                False,     # Stop at entry
156                                error)     # error
157        self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
158
159        # Create an empty event object.
160        event = lldb.SBEvent()
161        self.assertFalse(event, "Event should not be valid initially")
162
163        # Create MyListeningThread to wait for any kind of event.
164        import threading
165
166        class MyListeningThread(threading.Thread):
167
168            def run(self):
169                count = 0
170                # Let's only try at most 3 times to retrieve any kind of event.
171                while not count > 3:
172                    if listener.WaitForEvent(5, event):
173                        self.context.trace("Got a valid event:", event)
174                        self.context.trace("Event data flavor:", event.GetDataFlavor())
175                        self.context.trace("Event type:", lldbutil.state_type_to_str(event.GetType()))
176                        listener.Clear()
177                        return
178                    count = count + 1
179                    print("Timeout: listener.WaitForEvent")
180                listener.Clear()
181                return
182
183        # Use Python API to kill the process.  The listening thread should be
184        # able to receive a state changed event.
185        process.Kill()
186
187        # Let's start the listening thread to retrieve the event.
188        my_thread = MyListeningThread()
189        my_thread.context = self
190        my_thread.start()
191
192        # Wait until the 'MyListeningThread' terminates.
193        my_thread.join()
194
195        self.assertTrue(event,
196                        "My listening thread successfully received an event")
197
198    @expectedFailureAll(
199        oslist=["linux"],
200        bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
201    @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
202    @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48417")
203    @expectedFailureNetBSD
204    def test_add_listener_to_broadcaster(self):
205        """Exercise some SBBroadcaster APIs."""
206        self.build()
207        exe = self.getBuildArtifact("a.out")
208
209        self.dbg.SetAsync(True)
210
211        # Create a target by the debugger.
212        target = self.dbg.CreateTarget(exe)
213        self.assertTrue(target, VALID_TARGET)
214
215        # Now create a breakpoint on main.c by name 'c'.
216        breakpoint = target.BreakpointCreateByName('c', 'a.out')
217        self.trace("breakpoint:", breakpoint)
218        self.assertTrue(breakpoint and
219                        breakpoint.GetNumLocations() == 1,
220                        VALID_BREAKPOINT)
221
222        listener = lldb.SBListener("my listener")
223
224        # Now launch the process, and do not stop at the entry point.
225        error = lldb.SBError()
226        flags = target.GetLaunchInfo().GetLaunchFlags()
227        process = target.Launch(listener,
228                                None,      # argv
229                                None,      # envp
230                                None,      # stdin_path
231                                None,      # stdout_path
232                                None,      # stderr_path
233                                None,      # working directory
234                                flags,     # launch flags
235                                False,     # Stop at entry
236                                error)     # error
237
238        # Create an empty event object.
239        event = lldb.SBEvent()
240        self.assertFalse(event, "Event should not be valid initially")
241
242        # The finite state machine for our custom listening thread, with an
243        # initial state of None, which means no event has been received.
244        # It changes to 'connected' after 'connected' event is received (for remote platforms)
245        # It changes to 'running' after 'running' event is received (should happen only if the
246        # currentstate is either 'None' or 'connected')
247        # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
248        # current state is 'running'.)
249        self.state = None
250
251        # Create MyListeningThread to wait for state changed events.
252        # By design, a "running" event is expected following by a "stopped"
253        # event.
254        import threading
255
256        class MyListeningThread(threading.Thread):
257
258            def run(self):
259                self.context.trace("Running MyListeningThread:", self)
260
261                # Regular expression pattern for the event description.
262                pattern = re.compile("data = {.*, state = (.*)}$")
263
264                # Let's only try at most 6 times to retrieve our events.
265                count = 0
266                while True:
267                    if listener.WaitForEvent(5, event):
268                        desc = lldbutil.get_description(event)
269                        self.context.trace("Event description:", desc)
270                        match = pattern.search(desc)
271                        if not match:
272                            break
273                        if match.group(1) == 'connected':
274                            # When debugging remote targets with lldb-server, we
275                            # first get the 'connected' event.
276                            self.context.assertTrue(self.context.state is None)
277                            self.context.state = 'connected'
278                            continue
279                        elif match.group(1) == 'running':
280                            self.context.assertTrue(
281                                self.context.state is None or self.context.state == 'connected')
282                            self.context.state = 'running'
283                            continue
284                        elif match.group(1) == 'stopped':
285                            self.context.assertTrue(
286                                self.context.state == 'running')
287                            # Whoopee, both events have been received!
288                            self.context.state = 'stopped'
289                            break
290                        else:
291                            break
292                    print("Timeout: listener.WaitForEvent")
293                    count = count + 1
294                    if count > 6:
295                        break
296                listener.Clear()
297                return
298
299        # Use Python API to continue the process.  The listening thread should be
300        # able to receive the state changed events.
301        process.Continue()
302
303        # Start the listening thread to receive the "running" followed by the
304        # "stopped" events.
305        my_thread = MyListeningThread()
306        # Supply the enclosing context so that our listening thread can access
307        # the 'state' variable.
308        my_thread.context = self
309        my_thread.start()
310
311        # Wait until the 'MyListeningThread' terminates.
312        my_thread.join()
313
314        # The final judgement. :-)
315        self.assertEqual(self.state, 'stopped',
316                        "Both expected state changed events received")
317