xref: /llvm-project/lldb/test/API/python_api/was_interrupted/TestDebuggerInterruption.py (revision fe61b38258bf4c5f34c32de26f4ed11ef5c32ebc)
1"""
2Test SBDebugger.InterruptRequested and SBCommandInterpreter.WasInterrupted.
3"""
4
5import lldb
6import lldbsuite.test.lldbutil as lldbutil
7from lldbsuite.test.lldbtest import *
8import threading
9import os
10
11class TestDebuggerInterruption(TestBase):
12    """This test runs a command that starts up, rendevous with the test thread
13       using threading barriers, then checks whether it has been interrupted.
14
    The command's first argument is either 'interp' or 'debugger', to test
    WasInterrupted and InterruptRequested respectively.
17
    The command has two modes, interrupt and check; the former is the one that
    waits for an interrupt.  The latter just returns whether an interrupt was
    requested.  We use the latter to make sure we took down the flag correctly."""
21
22    NO_DEBUG_INFO_TESTCASE = True
23
24    class CommandRunner(threading.Thread):
25        """This class is for running a command, and for making a thread to run the command on.
26           It gets passed the test it is working on behalf of, and most of the important
27           objects come from the test. """
28        def __init__(self, test):
29            super().__init__()
30            self.test = test
31
32        def rendevous(self):
33            # We smuggle out barriers and event to the runner thread using thread local data:
34            import interruptible
35            interruptible.local_data = interruptible.BarrierContainer(self.test.before_interrupt_barrier,
36                                                                   self.test.after_interrupt_barrier,
37                                                                   self.test.event)
38
39    class DirectCommandRunner(CommandRunner):
40        """"This version runs a single command using HandleCommand."""
41        def __init__(self, test, command):
42            super().__init__(test)
43            self.command = command
44
45        def run(self):
46            self.rendevous()
47            result = self.test.dbg.GetCommandInterpreter().HandleCommand(self.command, self.test.result)
48            if self.test.result_barrier:
49                self.test.result_barrier.wait()
50
51    class CommandInterpreterRunner(CommandRunner):
52        """This version runs the CommandInterpreter and feeds the command to it."""
53        def __init__(self, test):
54            super().__init__(test)
55
56        def run(self):
57            self.rendevous()
58
59            test = self.test
60
61            # We will use files for debugger input and output:
62
63            # First write down the command:
64            with open(test.getBuildArtifact(test.in_filename), "w") as f:
65                f.write(f"{test.command}\n")
66
67            # Now set the debugger's stdout & stdin to our files, and run
68            # the CommandInterpreter:
69            with open(test.out_filename, "w") as outf, open(test.in_filename, "r") as inf:
70                outsbf = lldb.SBFile(outf.fileno(), "w", False)
71                orig_outf = test.dbg.GetOutputFile()
72                error = test.dbg.SetOutputFile(outsbf)
73                test.assertSuccess(error, "Could not set outfile")
74
75                insbf = lldb.SBFile(inf.fileno(), "r", False)
76                orig_inf = test.dbg.GetOutputFile()
77                error = test.dbg.SetInputFile(insbf)
78                test.assertSuccess(error, "Could not set infile")
79
80                options = lldb.SBCommandInterpreterRunOptions()
81                options.SetPrintResults(True)
82                options.SetEchoCommands(False)
83
84                test.dbg.RunCommandInterpreter(True, False, options, 0, False, False)
85                test.dbg.GetOutputFile().Flush()
86
87                error = test.dbg.SetOutputFile(orig_outf)
88                test.assertSuccess(error, "Restored outfile")
89                test.dbg.SetInputFile(orig_inf)
90                test.assertSuccess(error, "Restored infile")
91
92    def command_setup(self, args):
93        """Insert our command, if needed.  Then set up event and barriers if needed.
94           Then return the command to run."""
95
96        self.interp = self.dbg.GetCommandInterpreter()
97        self.command_name = "interruptible_command"
98        self.cmd_result = lldb.SBCommandReturnObject()
99
100        if not "check" in args:
101            self.event = threading.Event()
102            self.result_barrier = threading.Barrier(2, timeout=10)
103            self.before_interrupt_barrier = threading.Barrier(2, timeout=10)
104            self.after_interrupt_barrier = threading.Barrier(2, timeout=10)
105        else:
106            self.event = None
107            self.result_barrier = None
108            self.before_interrupt_barrier = None
109            self.after_interrupt_barrier = None
110
111        if not self.interp.UserCommandExists(self.command_name):
112            # Make the command we're going to use - it spins calling WasInterrupted:
113            cmd_filename = "interruptible"
114            cmd_filename = os.path.join(self.getSourceDir(), "interruptible.py")
115            self.runCmd(f"command script import {cmd_filename}")
116            cmd_string = f"command script add {self.command_name} --class interruptible.WelcomeCommand"
117            self.runCmd(cmd_string)
118
119        if len(args) == 0:
120            command = self.command_name
121        else:
122            command = self.command_name + " " + args
123        return command
124
125    def run_single_command(self, command):
126        # Now start up a thread to run the command:
127        self.result.Clear()
128        self.runner = TestDebuggerInterruption.DirectCommandRunner(self, command)
129        self.runner.start()
130
131    def start_command_interp(self):
132        self.runner = TestDebuggerInterruption.CommandInterpreterRunner(self)
133        self.runner.start()
134
135    def check_text(self, result_text, interrupted):
136        if interrupted:
137            self.assertIn("Command was interrupted", result_text,
138                          "Got the interrupted message")
139        else:
140            self.assertIn("Command was not interrupted", result_text,
141                          "Got the not interrupted message")
142
143    def gather_output(self):
144        # Now wait for the interrupt to interrupt the command:
145        self.runner.join(10.0)
146        finished = not self.runner.is_alive()
147        # Don't leave the runner thread stranded if the interrupt didn't work.
148        if not finished:
149            self.event.set()
150            self.runner.join(10.0)
151
152        self.assertTrue(finished, "We did finish the command")
153
154    def check_result(self, interrupted = True):
155        self.gather_output()
156        self.check_text(self.result.GetOutput(), interrupted)
157
158    def check_result_output(self, interrupted = True):
159        self.gather_output()
160        buffer = ""
161        # Okay, now open the file for reading, and read.
162        with open(self.out_filename, "r") as f:
163            buffer = f.read()
164
165        self.assertNotEqual(len(buffer), 0, "No command data")
166        self.check_text(buffer, interrupted)
167
168    def debugger_interrupt_test(self, use_interrupt_requested):
169        """Test that debugger interruption interrupts a command
170           running directly through HandleCommand.
171           If use_interrupt_requested is true, we'll check that API,
172           otherwise we'll check WasInterrupted.  They should both do
173           the same thing."""
174
175        if use_interrupt_requested:
176            command = self.command_setup("debugger")
177        else:
178            command = self.command_setup("interp")
179
180        self.result = lldb.SBCommandReturnObject()
181        self.run_single_command(command)
182
183        # Okay now wait till the command has gotten started to issue the interrupt:
184        self.before_interrupt_barrier.wait()
185        # I'm going to do it twice here to test that it works as a counter:
186        self.dbg.RequestInterrupt()
187        self.dbg.RequestInterrupt()
188
189        def cleanup():
190            self.dbg.CancelInterruptRequest()
191        self.addTearDownHook(cleanup)
192        # Okay, now set both sides going:
193        self.after_interrupt_barrier.wait()
194
195        # Check that the command was indeed interrupted.  First rendevous
196        # after the runner thread had a chance to execute the command:
197        self.result_barrier.wait()
198        self.assertTrue(self.result.Succeeded(), "Our command succeeded")
199        result_output = self.result.GetOutput()
200        self.check_result(True)
201
202        # Do it again to make sure that the counter is counting:
203        self.dbg.CancelInterruptRequest()
204        command = self.command_setup("debugger")
205        self.run_single_command(command)
206
207        # This time we won't even get to run the command, since HandleCommand
208        # checks for the interrupt state on entry, so we don't wait on the command
209        # barriers.
210        self.result_barrier.wait()
211
212        # Again check that we were
213        self.assertFalse(self.result.Succeeded(), "Our command was not allowed to run")
214        error_output = self.result.GetError()
215        self.assertIn("... Interrupted", error_output, "Command was cut short by interrupt")
216
217        # Now take down the flag, and make sure that we aren't interrupted:
218        self.dbg.CancelInterruptRequest()
219
220        # Now make sure that we really did take down the flag:
221        command = self.command_setup("debugger check")
222        self.run_single_command(command)
223        result_output = self.result.GetOutput()
224        self.check_result(False)
225
226    def test_debugger_interrupt_use_dbg(self):
227        self.debugger_interrupt_test(True)
228
229    def test_debugger_interrupt_use_interp(self):
230        self.debugger_interrupt_test(False)
231
232    def test_interp_doesnt_interrupt_debugger(self):
233        """Test that interpreter interruption does not interrupt a command
234           running directly through HandleCommand.
235           If use_interrupt_requested is true, we'll check that API,
236           otherwise we'll check WasInterrupted.  They should both do
237           the same thing."""
238
239        command = self.command_setup("debugger poll")
240
241        self.result = lldb.SBCommandReturnObject()
242        self.run_single_command(command)
243
244        # Now raise the debugger interrupt flag.  It will also interrupt the command:
245        self.before_interrupt_barrier.wait()
246        self.dbg.GetCommandInterpreter().InterruptCommand()
247        self.after_interrupt_barrier.wait()
248
249        # Check that the command was indeed interrupted:
250        self.result_barrier.wait()
251        self.assertTrue(self.result.Succeeded(), "Our command succeeded")
252        result_output = self.result.GetOutput()
253        self.check_result(False)
254
255
256    def interruptible_command_test(self, use_interrupt_requested):
257        """Test that interpreter interruption interrupts a command
258           running in the RunCommandInterpreter loop.
259           If use_interrupt_requested is true, we'll check that API,
260           otherwise we'll check WasInterrupted.  They should both do
261           the same thing."""
262
263        self.out_filename = self.getBuildArtifact("output")
264        self.in_filename = self.getBuildArtifact("input")
265        # We're going to overwrite the input file, but we
266        # don't want data accumulating in the output file.
267
268        if os.path.exists(self.out_filename):
269            os.unlink(self.out_filename)
270
271        # You should be able to use either check method interchangeably:
272        if use_interrupt_requested:
273            self.command = self.command_setup("debugger") + "\n"
274        else:
275            self.command = self.command_setup("interp") + "\n"
276
277        self.start_command_interp()
278
279        # Now give the interpreter a chance to run this command up
280        # to the first barrier
281        self.before_interrupt_barrier.wait()
282        # Then issue the interrupt:
283        sent_interrupt = self.dbg.GetCommandInterpreter().InterruptCommand()
284        self.assertTrue(sent_interrupt, "Did send command interrupt.")
285        # Now give the command a chance to finish:
286        self.after_interrupt_barrier.wait()
287
288        self.check_result_output(True)
289
290        os.unlink(self.out_filename)
291
292        # Now send the check command, and make sure the flag is now down.
293        self.command = self.command_setup("interp check") + "\n"
294        self.start_command_interp()
295
296        self.check_result_output(False)
297
298    def test_interruptible_command_check_dbg(self):
299        self.interruptible_command_test(True)
300
301    def test_interruptible_command_check_interp(self):
302        self.interruptible_command_test(False)
303
304    def test_debugger_doesnt_interrupt_command(self):
305        """Test that debugger interruption doesn't interrupt a command
306           running in the RunCommandInterpreter loop."""
307
308        self.out_filename = self.getBuildArtifact("output")
309        self.in_filename = self.getBuildArtifact("input")
310        # We're going to overwrite the input file, but we
311        # don't want data accumulating in the output file.
312
313        if os.path.exists(self.out_filename):
314            os.unlink(self.out_filename)
315
316        self.command = self.command_setup("interp poll") + "\n"
317
318        self.start_command_interp()
319
320        self.before_interrupt_barrier.wait()
321        self.dbg.RequestInterrupt()
322        def cleanup():
323            self.dbg.CancelInterruptRequest()
324        self.addTearDownHook(cleanup)
325        self.after_interrupt_barrier.wait()
326
327        self.check_result_output(False)
328
329        os.unlink(self.out_filename)
330
331