1import os 2import time 3 4import dap_server 5from lldbsuite.test.lldbtest import * 6from lldbsuite.test import lldbplatformutil 7import lldbgdbserverutils 8 9 10class DAPTestCaseBase(TestBase): 11 # set timeout based on whether ASAN was enabled or not. Increase 12 # timeout by a factor of 10 if ASAN is enabled. 13 timeoutval = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 14 NO_DEBUG_INFO_TESTCASE = True 15 16 def create_debug_adaptor(self, lldbDAPEnv=None): 17 """Create the Visual Studio Code debug adaptor""" 18 self.assertTrue( 19 is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable" 20 ) 21 log_file_path = self.getBuildArtifact("dap.txt") 22 self.dap_server = dap_server.DebugAdaptorServer( 23 executable=self.lldbDAPExec, 24 init_commands=self.setUpCommands(), 25 log_file=log_file_path, 26 env=lldbDAPEnv, 27 ) 28 29 def build_and_create_debug_adaptor(self, lldbDAPEnv=None): 30 self.build() 31 self.create_debug_adaptor(lldbDAPEnv) 32 33 def set_source_breakpoints(self, source_path, lines, data=None): 34 """Sets source breakpoints and returns an array of strings containing 35 the breakpoint IDs ("1", "2") for each breakpoint that was set. 36 Parameter data is array of data objects for breakpoints. 37 Each object in data is 1:1 mapping with the entry in lines. 38 It contains optional location/hitCondition/logMessage parameters. 39 """ 40 response = self.dap_server.request_setBreakpoints(source_path, lines, data) 41 if response is None: 42 return [] 43 breakpoints = response["body"]["breakpoints"] 44 breakpoint_ids = [] 45 for breakpoint in breakpoints: 46 breakpoint_ids.append("%i" % (breakpoint["id"])) 47 return breakpoint_ids 48 49 def set_function_breakpoints(self, functions, condition=None, hitCondition=None): 50 """Sets breakpoints by function name given an array of function names 51 and returns an array of strings containing the breakpoint IDs 52 ("1", "2") for each breakpoint that was set. 53 """ 54 response = self.dap_server.request_setFunctionBreakpoints( 55 functions, condition=condition, hitCondition=hitCondition 56 ) 57 if response is None: 58 return [] 59 breakpoints = response["body"]["breakpoints"] 60 breakpoint_ids = [] 61 for breakpoint in breakpoints: 62 breakpoint_ids.append("%i" % (breakpoint["id"])) 63 return breakpoint_ids 64 65 def waitUntil(self, condition_callback): 66 for _ in range(20): 67 if condition_callback(): 68 return True 69 time.sleep(0.5) 70 return False 71 72 def verify_breakpoint_hit(self, breakpoint_ids): 73 """Wait for the process we are debugging to stop, and verify we hit 74 any breakpoint location in the "breakpoint_ids" array. 75 "breakpoint_ids" should be a list of breakpoint ID strings 76 (["1", "2"]). The return value from self.set_source_breakpoints() 77 or self.set_function_breakpoints() can be passed to this function""" 78 stopped_events = self.dap_server.wait_for_stopped() 79 for stopped_event in stopped_events: 80 if "body" in stopped_event: 81 body = stopped_event["body"] 82 if "reason" not in body: 83 continue 84 if body["reason"] != "breakpoint": 85 continue 86 if "description" not in body: 87 continue 88 # Descriptions for breakpoints will be in the form 89 # "breakpoint 1.1", so look for any description that matches 90 # ("breakpoint 1.") in the description field as verification 91 # that one of the breakpoint locations was hit. DAP doesn't 92 # allow breakpoints to have multiple locations, but LLDB does. 93 # So when looking at the description we just want to make sure 94 # the right breakpoint matches and not worry about the actual 95 # location. 96 description = body["description"] 97 for breakpoint_id in breakpoint_ids: 98 match_desc = "breakpoint %s." % (breakpoint_id) 99 if match_desc in description: 100 return 101 self.assertTrue(False, "breakpoint not hit") 102 103 def verify_stop_exception_info(self, expected_description): 104 """Wait for the process we are debugging to stop, and verify the stop 105 reason is 'exception' and that the description matches 106 'expected_description' 107 """ 108 stopped_events = self.dap_server.wait_for_stopped() 109 for stopped_event in stopped_events: 110 if "body" in stopped_event: 111 body = stopped_event["body"] 112 if "reason" not in body: 113 continue 114 if body["reason"] != "exception": 115 continue 116 if "description" not in body: 117 continue 118 description = body["description"] 119 if expected_description == description: 120 return True 121 return False 122 123 def verify_commands(self, flavor, output, commands): 124 self.assertTrue(output and len(output) > 0, "expect console output") 125 lines = output.splitlines() 126 prefix = "(lldb) " 127 for cmd in commands: 128 found = False 129 for line in lines: 130 if len(cmd) > 0 and (cmd[0] == "!" or cmd[0] == "?"): 131 cmd = cmd[1:] 132 if line.startswith(prefix) and cmd in line: 133 found = True 134 break 135 self.assertTrue( 136 found, "verify '%s' found in console output for '%s'" % (cmd, flavor) 137 ) 138 139 def get_dict_value(self, d, key_path): 140 """Verify each key in the key_path array is in contained in each 141 dictionary within "d". Assert if any key isn't in the 142 corresponding dictionary. This is handy for grabbing values from VS 143 Code response dictionary like getting 144 response['body']['stackFrames'] 145 """ 146 value = d 147 for key in key_path: 148 if key in value: 149 value = value[key] 150 else: 151 self.assertTrue( 152 key in value, 153 'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d), 154 ) 155 return value 156 157 def get_stackFrames_and_totalFramesCount( 158 self, threadId=None, startFrame=None, levels=None, dump=False 159 ): 160 response = self.dap_server.request_stackTrace( 161 threadId=threadId, startFrame=startFrame, levels=levels, dump=dump 162 ) 163 if response: 164 stackFrames = self.get_dict_value(response, ["body", "stackFrames"]) 165 totalFrames = self.get_dict_value(response, ["body", "totalFrames"]) 166 self.assertTrue( 167 totalFrames > 0, 168 "verify totalFrames count is provided by extension that supports " 169 "async frames loading", 170 ) 171 return (stackFrames, totalFrames) 172 return (None, 0) 173 174 def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False): 175 (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount( 176 threadId=threadId, startFrame=startFrame, levels=levels, dump=dump 177 ) 178 return stackFrames 179 180 def get_source_and_line(self, threadId=None, frameIndex=0): 181 stackFrames = self.get_stackFrames( 182 threadId=threadId, startFrame=frameIndex, levels=1 183 ) 184 if stackFrames is not None: 185 stackFrame = stackFrames[0] 186 ["source", "path"] 187 if "source" in stackFrame: 188 source = stackFrame["source"] 189 if "path" in source: 190 if "line" in stackFrame: 191 return (source["path"], stackFrame["line"]) 192 return ("", 0) 193 194 def get_stdout(self, timeout=0.0): 195 return self.dap_server.get_output("stdout", timeout=timeout) 196 197 def get_console(self, timeout=0.0): 198 return self.dap_server.get_output("console", timeout=timeout) 199 200 def collect_console(self, timeout_secs, pattern=None): 201 return self.dap_server.collect_output( 202 "console", timeout_secs=timeout_secs, pattern=pattern 203 ) 204 205 def get_local_as_int(self, name, threadId=None): 206 value = self.dap_server.get_local_variable_value(name, threadId=threadId) 207 # 'value' may have the variable value and summary. 208 # Extract the variable value since summary can have nonnumeric characters. 209 value = value.split(" ")[0] 210 if value.startswith("0x"): 211 return int(value, 16) 212 elif value.startswith("0"): 213 return int(value, 8) 214 else: 215 return int(value) 216 217 def set_local(self, name, value, id=None): 218 """Set a top level local variable only.""" 219 return self.dap_server.request_setVariable(1, name, str(value), id=id) 220 221 def set_global(self, name, value, id=None): 222 """Set a top level global variable only.""" 223 return self.dap_server.request_setVariable(2, name, str(value), id=id) 224 225 def stepIn(self, threadId=None, targetId=None, waitForStop=True): 226 self.dap_server.request_stepIn(threadId=threadId, targetId=targetId) 227 if waitForStop: 228 return self.dap_server.wait_for_stopped() 229 return None 230 231 def stepOver(self, threadId=None, waitForStop=True): 232 self.dap_server.request_next(threadId=threadId) 233 if waitForStop: 234 return self.dap_server.wait_for_stopped() 235 return None 236 237 def stepOut(self, threadId=None, waitForStop=True): 238 self.dap_server.request_stepOut(threadId=threadId) 239 if waitForStop: 240 return self.dap_server.wait_for_stopped() 241 return None 242 243 def continue_to_next_stop(self): 244 self.dap_server.request_continue() 245 return self.dap_server.wait_for_stopped() 246 247 def continue_to_breakpoints(self, breakpoint_ids): 248 self.dap_server.request_continue() 249 self.verify_breakpoint_hit(breakpoint_ids) 250 251 def continue_to_exception_breakpoint(self, filter_label): 252 self.dap_server.request_continue() 253 self.assertTrue( 254 self.verify_stop_exception_info(filter_label), 255 'verify we got "%s"' % (filter_label), 256 ) 257 258 def continue_to_exit(self, exitCode=0): 259 self.dap_server.request_continue() 260 stopped_events = self.dap_server.wait_for_stopped() 261 self.assertEqual( 262 len(stopped_events), 1, "stopped_events = {}".format(stopped_events) 263 ) 264 self.assertEqual( 265 stopped_events[0]["event"], "exited", "make sure program ran to completion" 266 ) 267 self.assertEqual( 268 stopped_events[0]["body"]["exitCode"], 269 exitCode, 270 "exitCode == %i" % (exitCode), 271 ) 272 273 def disassemble(self, threadId=None, frameIndex=None): 274 stackFrames = self.get_stackFrames( 275 threadId=threadId, startFrame=frameIndex, levels=1 276 ) 277 self.assertIsNotNone(stackFrames) 278 memoryReference = stackFrames[0]["instructionPointerReference"] 279 self.assertIsNotNone(memoryReference) 280 281 if memoryReference not in self.dap_server.disassembled_instructions: 282 self.dap_server.request_disassemble(memoryReference=memoryReference) 283 284 return self.dap_server.disassembled_instructions[memoryReference] 285 286 def attach( 287 self, 288 program=None, 289 pid=None, 290 waitFor=None, 291 trace=None, 292 initCommands=None, 293 preRunCommands=None, 294 stopCommands=None, 295 exitCommands=None, 296 attachCommands=None, 297 coreFile=None, 298 disconnectAutomatically=True, 299 terminateCommands=None, 300 postRunCommands=None, 301 sourceMap=None, 302 sourceInitFile=False, 303 expectFailure=False, 304 gdbRemotePort=None, 305 gdbRemoteHostname=None, 306 ): 307 """Build the default Makefile target, create the DAP debug adaptor, 308 and attach to the process. 309 """ 310 311 # Make sure we disconnect and terminate the DAP debug adaptor even 312 # if we throw an exception during the test case. 313 def cleanup(): 314 if disconnectAutomatically: 315 self.dap_server.request_disconnect(terminateDebuggee=True) 316 self.dap_server.terminate() 317 318 # Execute the cleanup function during test case tear down. 319 self.addTearDownHook(cleanup) 320 # Initialize and launch the program 321 self.dap_server.request_initialize(sourceInitFile) 322 response = self.dap_server.request_attach( 323 program=program, 324 pid=pid, 325 waitFor=waitFor, 326 trace=trace, 327 initCommands=initCommands, 328 preRunCommands=preRunCommands, 329 stopCommands=stopCommands, 330 exitCommands=exitCommands, 331 attachCommands=attachCommands, 332 terminateCommands=terminateCommands, 333 coreFile=coreFile, 334 postRunCommands=postRunCommands, 335 sourceMap=sourceMap, 336 gdbRemotePort=gdbRemotePort, 337 gdbRemoteHostname=gdbRemoteHostname, 338 ) 339 if expectFailure: 340 return response 341 if not (response and response["success"]): 342 self.assertTrue( 343 response["success"], "attach failed (%s)" % (response["message"]) 344 ) 345 346 def launch( 347 self, 348 program=None, 349 args=None, 350 cwd=None, 351 env=None, 352 stopOnEntry=False, 353 disableASLR=True, 354 disableSTDIO=False, 355 shellExpandArguments=False, 356 trace=False, 357 initCommands=None, 358 preRunCommands=None, 359 stopCommands=None, 360 exitCommands=None, 361 terminateCommands=None, 362 sourcePath=None, 363 debuggerRoot=None, 364 sourceInitFile=False, 365 launchCommands=None, 366 sourceMap=None, 367 disconnectAutomatically=True, 368 runInTerminal=False, 369 expectFailure=False, 370 postRunCommands=None, 371 enableAutoVariableSummaries=False, 372 enableSyntheticChildDebugging=False, 373 commandEscapePrefix=None, 374 customFrameFormat=None, 375 customThreadFormat=None, 376 ): 377 """Sending launch request to dap""" 378 379 # Make sure we disconnect and terminate the DAP debug adapter, 380 # if we throw an exception during the test case 381 def cleanup(): 382 if disconnectAutomatically: 383 self.dap_server.request_disconnect(terminateDebuggee=True) 384 self.dap_server.terminate() 385 386 # Execute the cleanup function during test case tear down. 387 self.addTearDownHook(cleanup) 388 389 # Initialize and launch the program 390 self.dap_server.request_initialize(sourceInitFile) 391 response = self.dap_server.request_launch( 392 program, 393 args=args, 394 cwd=cwd, 395 env=env, 396 stopOnEntry=stopOnEntry, 397 disableASLR=disableASLR, 398 disableSTDIO=disableSTDIO, 399 shellExpandArguments=shellExpandArguments, 400 trace=trace, 401 initCommands=initCommands, 402 preRunCommands=preRunCommands, 403 stopCommands=stopCommands, 404 exitCommands=exitCommands, 405 terminateCommands=terminateCommands, 406 sourcePath=sourcePath, 407 debuggerRoot=debuggerRoot, 408 launchCommands=launchCommands, 409 sourceMap=sourceMap, 410 runInTerminal=runInTerminal, 411 postRunCommands=postRunCommands, 412 enableAutoVariableSummaries=enableAutoVariableSummaries, 413 enableSyntheticChildDebugging=enableSyntheticChildDebugging, 414 commandEscapePrefix=commandEscapePrefix, 415 customFrameFormat=customFrameFormat, 416 customThreadFormat=customThreadFormat, 417 ) 418 419 if expectFailure: 420 return response 421 422 if not (response and response["success"]): 423 self.assertTrue( 424 response["success"], "launch failed (%s)" % (response["message"]) 425 ) 426 return response 427 428 def build_and_launch( 429 self, 430 program, 431 args=None, 432 cwd=None, 433 env=None, 434 stopOnEntry=False, 435 disableASLR=True, 436 disableSTDIO=False, 437 shellExpandArguments=False, 438 trace=False, 439 initCommands=None, 440 preRunCommands=None, 441 stopCommands=None, 442 exitCommands=None, 443 terminateCommands=None, 444 sourcePath=None, 445 debuggerRoot=None, 446 sourceInitFile=False, 447 runInTerminal=False, 448 disconnectAutomatically=True, 449 postRunCommands=None, 450 lldbDAPEnv=None, 451 enableAutoVariableSummaries=False, 452 enableSyntheticChildDebugging=False, 453 commandEscapePrefix=None, 454 customFrameFormat=None, 455 customThreadFormat=None, 456 launchCommands=None, 457 expectFailure=False, 458 ): 459 """Build the default Makefile target, create the DAP debug adaptor, 460 and launch the process. 461 """ 462 self.build_and_create_debug_adaptor(lldbDAPEnv) 463 self.assertTrue(os.path.exists(program), "executable must exist") 464 465 return self.launch( 466 program, 467 args, 468 cwd, 469 env, 470 stopOnEntry, 471 disableASLR, 472 disableSTDIO, 473 shellExpandArguments, 474 trace, 475 initCommands, 476 preRunCommands, 477 stopCommands, 478 exitCommands, 479 terminateCommands, 480 sourcePath, 481 debuggerRoot, 482 sourceInitFile, 483 runInTerminal=runInTerminal, 484 disconnectAutomatically=disconnectAutomatically, 485 postRunCommands=postRunCommands, 486 enableAutoVariableSummaries=enableAutoVariableSummaries, 487 enableSyntheticChildDebugging=enableSyntheticChildDebugging, 488 commandEscapePrefix=commandEscapePrefix, 489 customFrameFormat=customFrameFormat, 490 customThreadFormat=customThreadFormat, 491 launchCommands=launchCommands, 492 expectFailure=expectFailure, 493 ) 494 495 def getBuiltinDebugServerTool(self): 496 # Tries to find simulation/lldb-server/gdbserver tool path. 497 server_tool = None 498 if lldbplatformutil.getPlatform() == "linux": 499 server_tool = lldbgdbserverutils.get_lldb_server_exe() 500 if server_tool is None: 501 self.dap_server.request_disconnect(terminateDebuggee=True) 502 self.assertIsNotNone(server_tool, "lldb-server not found.") 503 elif lldbplatformutil.getPlatform() == "macosx": 504 server_tool = lldbgdbserverutils.get_debugserver_exe() 505 if server_tool is None: 506 self.dap_server.request_disconnect(terminateDebuggee=True) 507 self.assertIsNotNone(server_tool, "debugserver not found.") 508 return server_tool 509