import os
import time

import dap_server
from lldbsuite.test.lldbtest import *


class DAPTestCaseBase(TestBase):
    """Shared helpers for tests that drive lldb-dap through ``dap_server``.

    Most helpers are thin wrappers around ``self.dap_server`` requests that
    add test assertions. ``self.dap_server`` is created by
    ``create_debug_adaptor`` and must exist before any request helper is used.
    """

    NO_DEBUG_INFO_TESTCASE = True

    def create_debug_adaptor(self, lldbDAPEnv=None):
        """Create the Visual Studio Code debug adaptor.

        Asserts that ``self.lldbDAPExec`` is an executable, then stores a new
        ``dap_server.DebugAdaptorServer`` in ``self.dap_server``. The adaptor
        log is written to the "dap.txt" build artifact.

        lldbDAPEnv is forwarded as the ``env`` argument of the server.
        """
        self.assertTrue(
            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
        )
        log_file_path = self.getBuildArtifact("dap.txt")
        self.dap_server = dap_server.DebugAdaptorServer(
            executable=self.lldbDAPExec,
            init_commands=self.setUpCommands(),
            log_file=log_file_path,
            env=lldbDAPEnv,
        )

    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
        """Build the test target, then create the debug adaptor."""
        self.build()
        self.create_debug_adaptor(lldbDAPEnv)

    @staticmethod
    def _breakpoint_ids_from_response(response):
        """Return the breakpoint IDs in a setBreakpoints-style response as
        a list of strings; an absent (None) response yields an empty list.
        """
        if response is None:
            return []
        return ["%i" % (bp["id"]) for bp in response["body"]["breakpoints"]]

    def set_source_breakpoints(self, source_path, lines, data=None):
        """Sets source breakpoints and returns an array of strings containing
        the breakpoint IDs ("1", "2") for each breakpoint that was set.
        Parameter data is array of data objects for breakpoints.
        Each object in data is 1:1 mapping with the entry in lines.
        It contains optional location/hitCondition/logMessage parameters.
        """
        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
        return self._breakpoint_ids_from_response(response)

    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
        """Sets breakpoints by function name given an array of function names
        and returns an array of strings containing the breakpoint IDs
        ("1", "2") for each breakpoint that was set.
        """
        response = self.dap_server.request_setFunctionBreakpoints(
            functions, condition=condition, hitCondition=hitCondition
        )
        return self._breakpoint_ids_from_response(response)

    def waitUntil(self, condition_callback):
        """Poll condition_callback up to 20 times, sleeping 0.5 seconds after
        each unsuccessful attempt.

        Returns True as soon as the callback returns a truthy value, or False
        if it never does.
        """
        for _ in range(20):
            if condition_callback():
                return True
            time.sleep(0.5)
        return False

    def verify_breakpoint_hit(self, breakpoint_ids):
        """Wait for the process we are debugging to stop, and verify we hit
        any breakpoint location in the "breakpoint_ids" array.
        "breakpoint_ids" should be a list of breakpoint ID strings
        (["1", "2"]). The return value from self.set_source_breakpoints()
        or self.set_function_breakpoints() can be passed to this function.

        Fails the test if no stopped event matches.
        """
        stopped_events = self.dap_server.wait_for_stopped()
        for stopped_event in stopped_events:
            body = stopped_event.get("body")
            if not body:
                continue
            if body.get("reason") != "breakpoint":
                continue
            if "description" not in body:
                continue
            # Descriptions for breakpoints will be in the form
            # "breakpoint 1.1", so look for any description that matches
            # ("breakpoint 1.") in the description field as verification
            # that one of the breakpoint locations was hit. DAP doesn't
            # allow breakpoints to have multiple locations, but LLDB does.
            # So when looking at the description we just want to make sure
            # the right breakpoint matches and not worry about the actual
            # location.
            description = body["description"]
            if any(
                "breakpoint %s." % (breakpoint_id) in description
                for breakpoint_id in breakpoint_ids
            ):
                return
        self.fail("breakpoint not hit")

    def verify_stop_exception_info(self, expected_description):
        """Wait for the process we are debugging to stop, and verify the stop
        reason is 'exception' and that the description matches
        'expected_description'.

        Returns True when a matching stopped event is found, False otherwise.
        This method does not assert; callers decide how to handle the result.
        """
        stopped_events = self.dap_server.wait_for_stopped()
        for stopped_event in stopped_events:
            body = stopped_event.get("body")
            if not body:
                continue
            if body.get("reason") != "exception":
                continue
            if "description" in body and body["description"] == expected_description:
                return True
        return False

    def verify_commands(self, flavor, output, commands):
        """Assert that every command in "commands" was echoed in "output".

        A command counts as found when some line of the output starts with
        the "(lldb) " prompt and contains the command text. "flavor" is only
        used to label the assertion message.
        """
        self.assertTrue(output, "expect console output")
        lines = output.splitlines()
        prefix = "(lldb) "
        for cmd in commands:
            found = any(line.startswith(prefix) and cmd in line for line in lines)
            self.assertTrue(
                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
            )

    def get_dict_value(self, d, key_path):
        """Verify each key in the key_path array is contained in each
        dictionary within "d". Assert if any key isn't in the
        corresponding dictionary. This is handy for grabbing values from VS
        Code response dictionary like getting
        response['body']['stackFrames']
        """
        value = d
        for key in key_path:
            self.assertIn(
                key,
                value,
                'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
            )
            value = value[key]
        return value

    def get_stackFrames_and_totalFramesCount(
        self, threadId=None, startFrame=None, levels=None, dump=False
    ):
        """Request a stack trace and return (stackFrames, totalFrames).

        Asserts that the response reports a positive totalFrames count.
        Returns (None, 0) when the request yields no response.
        """
        response = self.dap_server.request_stackTrace(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        if response:
            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
            self.assertGreater(
                totalFrames,
                0,
                "verify totalFrames count is provided by extension that supports "
                "async frames loading",
            )
            return (stackFrames, totalFrames)
        return (None, 0)

    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
        """Return only the stack frames of a stack trace request (or None
        when the request yields no response).
        """
        stackFrames, _ = self.get_stackFrames_and_totalFramesCount(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        return stackFrames

    def get_source_and_line(self, threadId=None, frameIndex=0):
        """Return (source path, line) for the frame at frameIndex.

        Returns ("", 0) when no frame is available or the frame carries no
        source path or line information.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        # An empty frame list is treated like a missing one instead of
        # raising IndexError.
        if stackFrames:
            stackFrame = stackFrames[0]
            source = stackFrame.get("source")
            if source and "path" in source and "line" in stackFrame:
                return (source["path"], stackFrame["line"])
        return ("", 0)

    def get_stdout(self, timeout=0.0):
        """Return output of the "stdout" category from the debug adaptor."""
        return self.dap_server.get_output("stdout", timeout=timeout)

    def get_console(self, timeout=0.0):
        """Return output of the "console" category from the debug adaptor."""
        return self.dap_server.get_output("console", timeout=timeout)

    def collect_console(self, duration):
        """Collect "console" output from the debug adaptor; "duration" is
        forwarded unchanged to dap_server.collect_output.
        """
        return self.dap_server.collect_output("console", duration=duration)

    def get_local_as_int(self, name, threadId=None):
        """Return the value of local variable "name" as an int.

        A "0x" prefix is parsed as hexadecimal, any other leading "0" as
        octal, and everything else as decimal.
        """
        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
        # Fail with a readable message rather than an AttributeError when no
        # value came back for the variable.
        self.assertIsNotNone(value, "no value for local variable '%s'" % (name))
        # 'value' may have the variable value and summary.
        # Extract the variable value since summary can have nonnumeric characters.
        value = value.split(" ")[0]
        if value.startswith("0x"):
            return int(value, 16)
        if value.startswith("0"):
            return int(value, 8)
        return int(value)

    def set_local(self, name, value, id=None):
        """Set a top level local variable only."""
        # NOTE(review): 1 is presumably the variablesReference of the locals
        # scope -- confirm against dap_server.
        return self.dap_server.request_setVariable(1, name, str(value), id=id)

    def set_global(self, name, value, id=None):
        """Set a top level global variable only."""
        # NOTE(review): 2 is presumably the variablesReference of the globals
        # scope -- confirm against dap_server.
        return self.dap_server.request_setVariable(2, name, str(value), id=id)

    def stepIn(self, threadId=None, waitForStop=True):
        """Send a stepIn request; when waitForStop is true, return the
        stopped events, otherwise return None.
        """
        self.dap_server.request_stepIn(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOver(self, threadId=None, waitForStop=True):
        """Send a next (step over) request; when waitForStop is true, return
        the stopped events, otherwise return None.
        """
        self.dap_server.request_next(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOut(self, threadId=None, waitForStop=True):
        """Send a stepOut request; when waitForStop is true, return the
        stopped events, otherwise return None.
        """
        self.dap_server.request_stepOut(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def continue_to_next_stop(self):
        """Continue the process and return the next stopped events."""
        self.dap_server.request_continue()
        return self.dap_server.wait_for_stopped()

    def continue_to_breakpoints(self, breakpoint_ids):
        """Continue the process and verify that it stops at one of the
        breakpoints in "breakpoint_ids".
        """
        self.dap_server.request_continue()
        self.verify_breakpoint_hit(breakpoint_ids)

    def continue_to_exception_breakpoint(self, filter_label):
        """Continue the process and assert that it stops with reason
        'exception' and a description equal to "filter_label".
        """
        self.dap_server.request_continue()
        self.assertTrue(
            self.verify_stop_exception_info(filter_label),
            'verify we got "%s"' % (filter_label),
        )

    def continue_to_exit(self, exitCode=0):
        """Continue the process and assert that it exits with "exitCode".

        Exactly one event is expected, and it must be an "exited" event.
        """
        self.dap_server.request_continue()
        stopped_events = self.dap_server.wait_for_stopped()
        self.assertEqual(
            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
        )
        self.assertEqual(
            stopped_events[0]["event"], "exited", "make sure program ran to completion"
        )
        self.assertEqual(
            stopped_events[0]["body"]["exitCode"],
            exitCode,
            "exitCode == %i" % (exitCode),
        )

    def disassemble(self, threadId=None, frameIndex=None):
        """Return the disassembled instructions for the frame at frameIndex.

        The frame's instructionPointerReference is used as the memory
        reference; a disassemble request is only sent when that reference is
        not already in dap_server.disassembled_instructions.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        self.assertIsNotNone(stackFrames)
        memoryReference = stackFrames[0]["instructionPointerReference"]
        self.assertIsNotNone(memoryReference)

        if memoryReference not in self.dap_server.disassembled_instructions:
            self.dap_server.request_disassemble(memoryReference=memoryReference)

        return self.dap_server.disassembled_instructions[memoryReference]

    def _register_adaptor_cleanup(self, disconnectAutomatically):
        """Add a tear down hook that optionally disconnects from, and then
        terminates, the debug adaptor.
        """

        # Make sure we disconnect and terminate the DAP debug adaptor even
        # if we throw an exception during the test case.
        def cleanup():
            if disconnectAutomatically:
                self.dap_server.request_disconnect(terminateDebuggee=True)
            self.dap_server.terminate()

        # Execute the cleanup function during test case tear down.
        self.addTearDownHook(cleanup)

    def _assert_request_succeeded(self, response, request_name):
        """Fail the test unless "response" exists and reports success."""
        # A missing response must be reported as a test failure, not as a
        # TypeError from subscripting None.
        if not response:
            self.fail("%s failed (no response)" % (request_name))
        if not response.get("success"):
            self.fail("%s failed (%s)" % (request_name, response.get("message")))

    def attach(
        self,
        program=None,
        pid=None,
        waitFor=None,
        trace=None,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        attachCommands=None,
        coreFile=None,
        disconnectAutomatically=True,
        terminateCommands=None,
        postRunCommands=None,
        sourceMap=None,
        sourceInitFile=False,
    ):
        """Send the initialize and attach requests to the debug adaptor.

        The adaptor must already exist (see create_debug_adaptor); this
        method neither builds the target nor creates the adaptor. A tear
        down hook is registered that terminates the adaptor, disconnecting
        first when disconnectAutomatically is true. Fails the test when the
        attach request does not succeed.
        """
        self._register_adaptor_cleanup(disconnectAutomatically)

        # Initialize and attach to the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_attach(
            program=program,
            pid=pid,
            waitFor=waitFor,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            attachCommands=attachCommands,
            terminateCommands=terminateCommands,
            coreFile=coreFile,
            postRunCommands=postRunCommands,
            sourceMap=sourceMap,
        )
        self._assert_request_succeeded(response, "attach")

    def launch(
        self,
        program=None,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=True,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        launchCommands=None,
        sourceMap=None,
        disconnectAutomatically=True,
        runInTerminal=False,
        expectFailure=False,
        postRunCommands=None,
        enableAutoVariableSummaries=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix="`",
        customFrameFormat=None,
        customThreadFormat=None,
    ):
        """Send the initialize and launch requests to the debug adaptor.

        The adaptor must already exist (see create_debug_adaptor). A tear
        down hook is registered that terminates the adaptor, disconnecting
        first when disconnectAutomatically is true.

        Returns the launch response. When expectFailure is true the response
        is returned unchecked; otherwise the test fails unless the launch
        request succeeded.
        """
        self._register_adaptor_cleanup(disconnectAutomatically)

        # Initialize and launch the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_launch(
            program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            launchCommands=launchCommands,
            sourceMap=sourceMap,
            runInTerminal=runInTerminal,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
        )

        if expectFailure:
            return response

        self._assert_request_succeeded(response, "launch")
        return response

    def build_and_launch(
        self,
        program,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=True,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        runInTerminal=False,
        disconnectAutomatically=True,
        postRunCommands=None,
        lldbDAPEnv=None,
        enableAutoVariableSummaries=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix="`",
        customFrameFormat=None,
        customThreadFormat=None,
    ):
        """Build the default Makefile target, create the DAP debug adaptor,
        and launch the process.

        Asserts that "program" exists after the build. Returns the launch
        response.
        """
        self.build_and_create_debug_adaptor(lldbDAPEnv)
        self.assertTrue(os.path.exists(program), "executable must exist")

        # Forward everything by keyword so a change in launch()'s parameter
        # order can never silently mis-bind an argument.
        return self.launch(
            program=program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            sourceInitFile=sourceInitFile,
            runInTerminal=runInTerminal,
            disconnectAutomatically=disconnectAutomatically,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
        )