"""
Test SBDebugger.InterruptRequested and SBCommandInterpreter.WasInterrupted.
"""

import lldb
import lldbsuite.test.lldbutil as lldbutil
from lldbsuite.test.lldbtest import *
import threading
import os


class TestDebuggerInterruption(TestBase):
    """This test runs a command that starts up, rendezvouses with the test
    thread using threading barriers, then checks whether it has been
    interrupted.

    The command's first argument is either 'interp' or 'debugger', to test
    WasInterrupted and InterruptRequested respectively.

    The command has two modes, interrupt and check; the former is the one that
    waits for an interrupt. The latter just returns whether an interrupt was
    requested. We use the latter to make sure we took down the flag correctly."""

    NO_DEBUG_INFO_TESTCASE = True

    class CommandRunner(threading.Thread):
        """This class is for running a command, and for making a thread to run
        the command on. It gets passed the test it is working on behalf of, and
        most of the important objects come from the test."""

        def __init__(self, test):
            super().__init__()
            self.test = test

        def rendevous(self):
            """Publish the test's barriers and event for the command to find.

            Must be called on the runner thread (i.e. from run()), before the
            command is executed."""
            # We smuggle out barriers and event to the runner thread using
            # thread local data:
            import interruptible

            interruptible.local_data = interruptible.BarrierContainer(
                self.test.before_interrupt_barrier,
                self.test.after_interrupt_barrier,
                self.test.event,
            )

    class DirectCommandRunner(CommandRunner):
        """This version runs a single command using HandleCommand."""

        def __init__(self, test, command):
            super().__init__(test)
            self.command = command

        def run(self):
            self.rendevous()
            # The outcome is reported through self.test.result; the value
            # returned by HandleCommand is not needed here.
            self.test.dbg.GetCommandInterpreter().HandleCommand(
                self.command, self.test.result
            )
            # In "check" mode there is no result barrier to meet at.
            if self.test.result_barrier:
                self.test.result_barrier.wait()

    class CommandInterpreterRunner(CommandRunner):
        """This version runs the CommandInterpreter and feeds the command to
        it."""

        def run(self):
            self.rendevous()

            test = self.test

            # We will use files for debugger input and output:

            # First write down the command:
            # NOTE(review): in_filename is already produced by
            # getBuildArtifact() in the tests below, while the read side uses
            # it directly - confirm the two paths are meant to be the same.
            with open(test.getBuildArtifact(test.in_filename), "w") as f:
                f.write(f"{test.command}\n")

            # Now set the debugger's stdout & stdin to our files, and run
            # the CommandInterpreter:
            with open(test.out_filename, "w") as outf, open(
                test.in_filename, "r"
            ) as inf:
                outsbf = lldb.SBFile(outf.fileno(), "w", False)
                orig_outf = test.dbg.GetOutputFile()
                error = test.dbg.SetOutputFile(outsbf)
                test.assertSuccess(error, "Could not set outfile")

                insbf = lldb.SBFile(inf.fileno(), "r", False)
                # Remember the original *input* file so that it, and not the
                # output file, is what gets put back as stdin afterwards.
                orig_inf = test.dbg.GetInputFile()
                error = test.dbg.SetInputFile(insbf)
                test.assertSuccess(error, "Could not set infile")

                options = lldb.SBCommandInterpreterRunOptions()
                options.SetPrintResults(True)
                options.SetEchoCommands(False)

                test.dbg.RunCommandInterpreter(True, False, options, 0, False, False)
                test.dbg.GetOutputFile().Flush()

                error = test.dbg.SetOutputFile(orig_outf)
                test.assertSuccess(error, "Restored outfile")
                # Check the error from restoring the input file, not the stale
                # one left over from restoring the output file.
                error = test.dbg.SetInputFile(orig_inf)
                test.assertSuccess(error, "Restored infile")

    def command_setup(self, args):
        """Insert our command, if needed. Then set up event and barriers if needed.
        Then return the command to run.

        args is the argument string for the command; when it contains "check"
        no event or barriers are created (they are set to None)."""

        self.interp = self.dbg.GetCommandInterpreter()
        self.command_name = "interruptible_command"
        self.cmd_result = lldb.SBCommandReturnObject()

        if "check" not in args:
            self.event = threading.Event()
            self.result_barrier = threading.Barrier(2, timeout=10)
            self.before_interrupt_barrier = threading.Barrier(2, timeout=10)
            self.after_interrupt_barrier = threading.Barrier(2, timeout=10)
        else:
            self.event = None
            self.result_barrier = None
            self.before_interrupt_barrier = None
            self.after_interrupt_barrier = None

        if not self.interp.UserCommandExists(self.command_name):
            # Make the command we're going to use - it spins calling WasInterrupted:
            cmd_filename = os.path.join(self.getSourceDir(), "interruptible.py")
            self.runCmd(f"command script import {cmd_filename}")
            cmd_string = f"command script add {self.command_name} --class interruptible.WelcomeCommand"
            self.runCmd(cmd_string)

        if not args:
            command = self.command_name
        else:
            command = self.command_name + " " + args
        return command

    def run_single_command(self, command):
        """Start a thread that runs command through HandleCommand, storing the
        outcome in self.result."""
        # Now start up a thread to run the command:
        self.result.Clear()
        self.runner = TestDebuggerInterruption.DirectCommandRunner(self, command)
        self.runner.start()

    def start_command_interp(self):
        """Start a thread that feeds self.command to RunCommandInterpreter."""
        self.runner = TestDebuggerInterruption.CommandInterpreterRunner(self)
        self.runner.start()

    def check_text(self, result_text, interrupted):
        """Assert that result_text carries the message matching interrupted."""
        if interrupted:
            self.assertIn(
                "Command was interrupted", result_text, "Got the interrupted message"
            )
        else:
            self.assertIn(
                "Command was not interrupted",
                result_text,
                "Got the not interrupted message",
            )

    def gather_output(self):
        """Wait for the runner thread to finish and assert that it did."""
        # Now wait for the interrupt to interrupt the command:
        self.runner.join(10.0)
        finished = not self.runner.is_alive()
        # Don't leave the runner thread stranded if the interrupt didn't work.
        if not finished:
            # In "check" mode there is no event; don't let an AttributeError
            # hide the real failure reported by the assertion below.
            if self.event is not None:
                self.event.set()
            self.runner.join(10.0)

        self.assertTrue(finished, "We did finish the command")

    def check_result(self, interrupted=True):
        """Wait for the command, then check the text in self.result."""
        self.gather_output()
        self.check_text(self.result.GetOutput(), interrupted)

    def check_result_output(self, interrupted=True):
        """Wait for the command, then check the text written to the output file."""
        self.gather_output()
        buffer = ""
        # Okay, now open the file for reading, and read.
        with open(self.out_filename, "r") as f:
            buffer = f.read()

        self.assertNotEqual(len(buffer), 0, "No command data")
        self.check_text(buffer, interrupted)

    def debugger_interrupt_test(self, use_interrupt_requested):
        """Test that debugger interruption interrupts a command
        running directly through HandleCommand.
        If use_interrupt_requested is true, we'll check that API,
        otherwise we'll check WasInterrupted.  They should both do
        the same thing."""

        if use_interrupt_requested:
            command = self.command_setup("debugger")
        else:
            command = self.command_setup("interp")

        self.result = lldb.SBCommandReturnObject()
        self.run_single_command(command)

        # Okay now wait till the command has gotten started to issue the interrupt:
        self.before_interrupt_barrier.wait()
        # I'm going to do it twice here to test that it works as a counter:
        self.dbg.RequestInterrupt()
        self.dbg.RequestInterrupt()

        def cleanup():
            self.dbg.CancelInterruptRequest()

        self.addTearDownHook(cleanup)
        # Okay, now set both sides going:
        self.after_interrupt_barrier.wait()

        # Check that the command was indeed interrupted.  First rendezvous
        # after the runner thread had a chance to execute the command:
        self.result_barrier.wait()
        self.assertTrue(self.result.Succeeded(), "Our command succeeded")
        self.check_result(True)

        # Do it again to make sure that the counter is counting:
        self.dbg.CancelInterruptRequest()
        command = self.command_setup("debugger")
        self.run_single_command(command)

        # This time we won't even get to run the command, since HandleCommand
        # checks for the interrupt state on entry, so we don't wait on the command
        # barriers.
        self.result_barrier.wait()

        # Again check that we were interrupted:
        self.assertFalse(self.result.Succeeded(), "Our command was not allowed to run")
        error_output = self.result.GetError()
        self.assertIn(
            "... Interrupted", error_output, "Command was cut short by interrupt"
        )

        # Now take down the flag, and make sure that we aren't interrupted:
        self.dbg.CancelInterruptRequest()

        # Now make sure that we really did take down the flag:
        command = self.command_setup("debugger check")
        self.run_single_command(command)
        self.check_result(False)

    def test_debugger_interrupt_use_dbg(self):
        self.debugger_interrupt_test(True)

    def test_debugger_interrupt_use_interp(self):
        self.debugger_interrupt_test(False)

    def test_interp_doesnt_interrupt_debugger(self):
        """Test that interpreter interruption does not interrupt a command
        running directly through HandleCommand."""

        command = self.command_setup("debugger poll")

        self.result = lldb.SBCommandReturnObject()
        self.run_single_command(command)

        # Now raise the interpreter interrupt flag.  It should not interrupt
        # the command:
        self.before_interrupt_barrier.wait()
        self.dbg.GetCommandInterpreter().InterruptCommand()
        self.after_interrupt_barrier.wait()

        # Check that the command was not interrupted:
        self.result_barrier.wait()
        self.assertTrue(self.result.Succeeded(), "Our command succeeded")
        self.check_result(False)

    def interruptible_command_test(self, use_interrupt_requested):
        """Test that interpreter interruption interrupts a command
        running in the RunCommandInterpreter loop.
        If use_interrupt_requested is true, we'll check that API,
        otherwise we'll check WasInterrupted.  They should both do
        the same thing."""

        self.out_filename = self.getBuildArtifact("output")
        self.in_filename = self.getBuildArtifact("input")
        # We're going to overwrite the input file, but we
        # don't want data accumulating in the output file.

        if os.path.exists(self.out_filename):
            os.unlink(self.out_filename)

        # You should be able to use either check method interchangeably:
        if use_interrupt_requested:
            self.command = self.command_setup("debugger") + "\n"
        else:
            self.command = self.command_setup("interp") + "\n"

        self.start_command_interp()

        # Now give the interpreter a chance to run this command up
        # to the first barrier
        self.before_interrupt_barrier.wait()
        # Then issue the interrupt:
        sent_interrupt = self.dbg.GetCommandInterpreter().InterruptCommand()
        self.assertTrue(sent_interrupt, "Did send command interrupt.")
        # Now give the command a chance to finish:
        self.after_interrupt_barrier.wait()

        self.check_result_output(True)

        os.unlink(self.out_filename)

        # Now send the check command, and make sure the flag is now down.
        self.command = self.command_setup("interp check") + "\n"
        self.start_command_interp()

        self.check_result_output(False)

    def test_interruptible_command_check_dbg(self):
        self.interruptible_command_test(True)

    def test_interruptible_command_check_interp(self):
        self.interruptible_command_test(False)

    def test_debugger_doesnt_interrupt_command(self):
        """Test that debugger interruption doesn't interrupt a command
        running in the RunCommandInterpreter loop."""

        self.out_filename = self.getBuildArtifact("output")
        self.in_filename = self.getBuildArtifact("input")
        # We're going to overwrite the input file, but we
        # don't want data accumulating in the output file.

        if os.path.exists(self.out_filename):
            os.unlink(self.out_filename)

        self.command = self.command_setup("interp poll") + "\n"

        self.start_command_interp()

        self.before_interrupt_barrier.wait()
        self.dbg.RequestInterrupt()

        def cleanup():
            self.dbg.CancelInterruptRequest()

        self.addTearDownHook(cleanup)
        self.after_interrupt_barrier.wait()

        self.check_result_output(False)

        os.unlink(self.out_filename)