import os
import time

import dap_server
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbplatformutil
import lldbgdbserverutils


class DAPTestCaseBase(TestBase):
    """Shared helpers for tests that drive lldb-dap through `dap_server`.

    The helpers wrap the request/wait calls of `self.dap_server` (created by
    `create_debug_adaptor`) and turn unexpected responses into test
    assertion failures.
    """

    # set timeout based on whether ASAN was enabled or not. Increase
    # timeout by a factor of 10 if ASAN is enabled.
    timeoutval = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
    NO_DEBUG_INFO_TESTCASE = True

    def create_debug_adaptor(self, lldbDAPEnv=None):
        """Create the Visual Studio Code debug adaptor.

        Asserts that `self.lldbDAPExec` is an executable, then stores a
        `dap_server.DebugAdaptorServer` in `self.dap_server`. The server is
        given the "dap.txt" build artifact as its log file and `lldbDAPEnv`
        as its environment.
        """
        self.assertTrue(
            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
        )
        log_file_path = self.getBuildArtifact("dap.txt")
        self.dap_server = dap_server.DebugAdaptorServer(
            executable=self.lldbDAPExec,
            init_commands=self.setUpCommands(),
            log_file=log_file_path,
            env=lldbDAPEnv,
        )

    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
        """Build the default target, then create the debug adaptor."""
        self.build()
        self.create_debug_adaptor(lldbDAPEnv)

    @staticmethod
    def _breakpoint_ids_from_response(response):
        """Return the breakpoint IDs in 'response' as a list of strings.

        A missing response (None) yields an empty list.
        """
        if response is None:
            return []
        return ["%i" % (bp["id"]) for bp in response["body"]["breakpoints"]]

    def _assert_request_succeeded(self, response, request_name):
        """Fail the test unless 'response' reports success.

        A missing response or a response without "success"/"message" keys
        produces a regular assertion failure rather than a TypeError or
        KeyError raised while building the failure message.
        """
        self.assertIsNotNone(response, "%s failed (no response)" % (request_name))
        self.assertTrue(
            response.get("success"),
            "%s failed (%s)" % (request_name, response.get("message")),
        )

    def set_source_breakpoints(self, source_path, lines, data=None):
        """Sets source breakpoints and returns an array of strings containing
        the breakpoint IDs ("1", "2") for each breakpoint that was set.
        Parameter data is array of data objects for breakpoints.
        Each object in data is 1:1 mapping with the entry in lines.
        It contains optional location/hitCondition/logMessage parameters.
        """
        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
        return self._breakpoint_ids_from_response(response)

    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
        """Sets breakpoints by function name given an array of function names
        and returns an array of strings containing the breakpoint IDs
        ("1", "2") for each breakpoint that was set.
        """
        response = self.dap_server.request_setFunctionBreakpoints(
            functions, condition=condition, hitCondition=hitCondition
        )
        return self._breakpoint_ids_from_response(response)

    def waitUntil(self, condition_callback):
        """Poll 'condition_callback' up to 20 times, sleeping 0.5 seconds
        after each unsuccessful check.

        Returns True as soon as the callback returns a true value, or False
        if it never does.
        """
        for _ in range(20):
            if condition_callback():
                return True
            time.sleep(0.5)
        return False

    def verify_breakpoint_hit(self, breakpoint_ids):
        """Wait for the process we are debugging to stop, and verify we hit
        any breakpoint location in the "breakpoint_ids" array.
        "breakpoint_ids" should be a list of breakpoint ID strings
        (["1", "2"]). The return value from self.set_source_breakpoints()
        or self.set_function_breakpoints() can be passed to this function.

        Fails the test if no stopped event matches.
        """
        stopped_events = self.dap_server.wait_for_stopped()
        for stopped_event in stopped_events:
            if "body" not in stopped_event:
                continue
            body = stopped_event["body"]
            if body.get("reason") not in ("breakpoint", "instruction breakpoint"):
                continue
            if "description" not in body:
                continue
            # Descriptions for breakpoints will be in the form
            # "breakpoint 1.1", so look for any description that matches
            # ("breakpoint 1.") in the description field as verification
            # that one of the breakpoint locations was hit. DAP doesn't
            # allow breakpoints to have multiple locations, but LLDB does.
            # So when looking at the description we just want to make sure
            # the right breakpoint matches and not worry about the actual
            # location.
            description = body["description"]
            for breakpoint_id in breakpoint_ids:
                match_desc = "breakpoint %s." % (breakpoint_id)
                if match_desc in description:
                    return
        self.fail("breakpoint not hit")

    def verify_stop_exception_info(self, expected_description, timeout=timeoutval):
        """Wait for the process we are debugging to stop, and verify the stop
        reason is 'exception' and that the description matches
        'expected_description'.

        Returns True when a matching stopped event is found, else False.
        """
        stopped_events = self.dap_server.wait_for_stopped(timeout=timeout)
        for stopped_event in stopped_events:
            print("stopped_event", stopped_event)
            if "body" not in stopped_event:
                continue
            body = stopped_event["body"]
            if body.get("reason") != "exception":
                continue
            if "description" not in body:
                continue
            if expected_description == body["description"]:
                return True
        return False

    def verify_commands(self, flavor, output, commands):
        """Assert that every command in 'commands' appears in the console
        'output' on a line starting with the "(lldb) " prompt.

        Leading "!" and "?" characters are removed from each command before
        it is searched for. 'flavor' is only used in the failure message.
        """
        self.assertTrue(output and len(output) > 0, "expect console output")
        lines = output.splitlines()
        prefix = "(lldb) "
        for cmd in commands:
            # Strip the prefix characters once, up front, so the result does
            # not depend on how many console lines there are to scan.
            cmd = cmd.lstrip("!?")
            found = any(line.startswith(prefix) and cmd in line for line in lines)
            self.assertTrue(
                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
            )

    def get_dict_value(self, d, key_path):
        """Verify each key in the key_path array is in contained in each
        dictionary within "d". Assert if any key isn't in the
        corresponding dictionary. This is handy for grabbing values from VS
        Code response dictionary like getting
        response['body']['stackFrames']
        """
        value = d
        for key in key_path:
            self.assertIn(
                key,
                value,
                'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
            )
            value = value[key]
        return value

    def get_stackFrames_and_totalFramesCount(
        self, threadId=None, startFrame=None, levels=None, dump=False
    ):
        """Request a stack trace and return (stackFrames, totalFrames).

        Returns (None, 0) when the request yields no response. Asserts that
        the reported totalFrames value is greater than zero.
        """
        response = self.dap_server.request_stackTrace(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        if response:
            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
            self.assertTrue(
                totalFrames > 0,
                "verify totalFrames count is provided by extension that supports "
                "async frames loading",
            )
            return (stackFrames, totalFrames)
        return (None, 0)

    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
        """Return only the stack frames from a stack trace request (None when
        the request yields no response)."""
        (stackFrames, _) = self.get_stackFrames_and_totalFramesCount(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        return stackFrames

    def get_exceptionInfo(self, threadId=None):
        """Return the "body" of an exceptionInfo response."""
        response = self.dap_server.request_exceptionInfo(threadId=threadId)
        return self.get_dict_value(response, ["body"])

    def get_source_and_line(self, threadId=None, frameIndex=0):
        """Return (source path, line) for the frame at 'frameIndex'.

        Returns ("", 0) when no frame is available or when the frame lacks
        a source path or a line.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        # Covers both "no response" (None) and an empty frame list.
        if stackFrames:
            stackFrame = stackFrames[0]
            if "source" in stackFrame:
                source = stackFrame["source"]
                if "path" in source and "line" in stackFrame:
                    return (source["path"], stackFrame["line"])
        return ("", 0)

    def get_stdout(self, timeout=0.0):
        """Return the "stdout" output reported by the debug adaptor."""
        return self.dap_server.get_output("stdout", timeout=timeout)

    def get_console(self, timeout=0.0):
        """Return the "console" output reported by the debug adaptor."""
        return self.dap_server.get_output("console", timeout=timeout)

    def collect_console(self, timeout_secs, pattern=None):
        """Collect "console" output via dap_server.collect_output."""
        return self.dap_server.collect_output(
            "console", timeout_secs=timeout_secs, pattern=pattern
        )

    def collect_stdout(self, timeout_secs, pattern=None):
        """Collect "stdout" output via dap_server.collect_output."""
        return self.dap_server.collect_output(
            "stdout", timeout_secs=timeout_secs, pattern=pattern
        )

    def get_local_as_int(self, name, threadId=None):
        """Return the value of local variable 'name' as an int.

        A "0x" prefix is parsed as hexadecimal, any other leading "0" as
        octal, and everything else as decimal.
        """
        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
        # 'value' may have the variable value and summary.
        # Extract the variable value since summary can have nonnumeric characters.
        value = value.split(" ")[0]
        if value.startswith("0x"):
            return int(value, 16)
        elif value.startswith("0"):
            return int(value, 8)
        else:
            return int(value)

    def set_local(self, name, value, id=None):
        """Set a top level local variable only."""
        return self.dap_server.request_setVariable(1, name, str(value), id=id)

    def set_global(self, name, value, id=None):
        """Set a top level global variable only."""
        return self.dap_server.request_setVariable(2, name, str(value), id=id)

    def stepIn(
        self, threadId=None, targetId=None, waitForStop=True, granularity="statement"
    ):
        """Send a stepIn request and assert that it succeeded.

        Returns the stopped events when 'waitForStop' is true, else None.
        """
        response = self.dap_server.request_stepIn(
            threadId=threadId, targetId=targetId, granularity=granularity
        )
        self.assertTrue(response["success"])
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOver(self, threadId=None, waitForStop=True, granularity="statement"):
        """Send a next request.

        Returns the stopped events when 'waitForStop' is true, else None.
        """
        self.dap_server.request_next(threadId=threadId, granularity=granularity)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOut(self, threadId=None, waitForStop=True):
        """Send a stepOut request.

        Returns the stopped events when 'waitForStop' is true, else None.
        """
        self.dap_server.request_stepOut(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def continue_to_next_stop(self):
        """Send a continue request and return the next stopped events."""
        self.dap_server.request_continue()
        return self.dap_server.wait_for_stopped()

    def continue_to_breakpoints(self, breakpoint_ids):
        """Continue, then verify that one of 'breakpoint_ids' was hit."""
        self.dap_server.request_continue()
        self.verify_breakpoint_hit(breakpoint_ids)

    def continue_to_exception_breakpoint(self, filter_label):
        """Continue, then assert that the process stopped with an exception
        whose description equals 'filter_label'."""
        self.dap_server.request_continue()
        self.assertTrue(
            self.verify_stop_exception_info(filter_label),
            'verify we got "%s"' % (filter_label),
        )

    def continue_to_exit(self, exitCode=0):
        """Continue, then assert that the only event received is an "exited"
        event carrying 'exitCode'."""
        self.dap_server.request_continue()
        stopped_events = self.dap_server.wait_for_stopped()
        self.assertEqual(
            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
        )
        self.assertEqual(
            stopped_events[0]["event"], "exited", "make sure program ran to completion"
        )
        self.assertEqual(
            stopped_events[0]["body"]["exitCode"],
            exitCode,
            "exitCode == %i" % (exitCode),
        )

    def disassemble(self, threadId=None, frameIndex=None):
        """Return the disassembled instructions recorded for the frame's
        "instructionPointerReference".

        A disassemble request is sent only when `self.dap_server` has no
        entry for that memory reference yet.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        self.assertIsNotNone(stackFrames)
        memoryReference = stackFrames[0]["instructionPointerReference"]
        self.assertIsNotNone(memoryReference)

        if memoryReference not in self.dap_server.disassembled_instructions:
            self.dap_server.request_disassemble(memoryReference=memoryReference)

        return self.dap_server.disassembled_instructions[memoryReference]

    def attach(
        self,
        program=None,
        pid=None,
        waitFor=None,
        trace=None,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        attachCommands=None,
        coreFile=None,
        disconnectAutomatically=True,
        terminateCommands=None,
        postRunCommands=None,
        sourceMap=None,
        sourceInitFile=False,
        expectFailure=False,
        gdbRemotePort=None,
        gdbRemoteHostname=None,
    ):
        """Send the initialize and attach requests to the debug adaptor.

        Registers a tear down hook that terminates the adaptor (and first
        disconnects from it when 'disconnectAutomatically' is true).

        Returns the attach response. When 'expectFailure' is true the
        response is returned without being checked; otherwise the test
        fails unless the response reports success.
        """

        # Make sure we disconnect and terminate the DAP debug adaptor even
        # if we throw an exception during the test case.
        def cleanup():
            if disconnectAutomatically:
                self.dap_server.request_disconnect(terminateDebuggee=True)
            self.dap_server.terminate()

        # Execute the cleanup function during test case tear down.
        self.addTearDownHook(cleanup)
        # Initialize and attach to the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_attach(
            program=program,
            pid=pid,
            waitFor=waitFor,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            attachCommands=attachCommands,
            terminateCommands=terminateCommands,
            coreFile=coreFile,
            postRunCommands=postRunCommands,
            sourceMap=sourceMap,
            gdbRemotePort=gdbRemotePort,
            gdbRemoteHostname=gdbRemoteHostname,
        )
        if expectFailure:
            return response
        self._assert_request_succeeded(response, "attach")
        return response

    def launch(
        self,
        program=None,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=False,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        launchCommands=None,
        sourceMap=None,
        disconnectAutomatically=True,
        runInTerminal=False,
        expectFailure=False,
        postRunCommands=None,
        enableAutoVariableSummaries=False,
        displayExtendedBacktrace=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix=None,
        customFrameFormat=None,
        customThreadFormat=None,
    ):
        """Sending launch request to dap.

        Registers a tear down hook that terminates the adaptor (and first
        disconnects from it when 'disconnectAutomatically' is true).

        Returns the launch response. When 'expectFailure' is true the
        response is returned without being checked; otherwise the test
        fails unless the response reports success.
        """

        # Make sure we disconnect and terminate the DAP debug adapter,
        # if we throw an exception during the test case
        def cleanup():
            if disconnectAutomatically:
                self.dap_server.request_disconnect(terminateDebuggee=True)
            self.dap_server.terminate()

        # Execute the cleanup function during test case tear down.
        self.addTearDownHook(cleanup)

        # Initialize and launch the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_launch(
            program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            launchCommands=launchCommands,
            sourceMap=sourceMap,
            runInTerminal=runInTerminal,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            displayExtendedBacktrace=displayExtendedBacktrace,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
        )

        if expectFailure:
            return response

        self._assert_request_succeeded(response, "launch")
        return response

    def build_and_launch(
        self,
        program,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=False,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        runInTerminal=False,
        disconnectAutomatically=True,
        postRunCommands=None,
        lldbDAPEnv=None,
        enableAutoVariableSummaries=False,
        displayExtendedBacktrace=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix=None,
        customFrameFormat=None,
        customThreadFormat=None,
        launchCommands=None,
        expectFailure=False,
    ):
        """Build the default Makefile target, create the DAP debug adaptor,
        and launch the process.

        Asserts that 'program' exists on disk and returns whatever
        self.launch() returns.
        """
        self.build_and_create_debug_adaptor(lldbDAPEnv)
        self.assertTrue(os.path.exists(program), "executable must exist")

        # Forward everything by keyword so this call cannot silently break
        # if the parameter order of launch() changes.
        return self.launch(
            program=program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            sourceInitFile=sourceInitFile,
            runInTerminal=runInTerminal,
            disconnectAutomatically=disconnectAutomatically,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            displayExtendedBacktrace=displayExtendedBacktrace,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
            launchCommands=launchCommands,
            expectFailure=expectFailure,
        )

    def getBuiltinDebugServerTool(self):
        """Return the path of the platform's debug server tool.

        On "linux" this is the lldb-server executable and on "macosx" the
        debugserver executable; on either platform a missing tool
        disconnects from the debug adaptor and fails the test. On any other
        platform None is returned.
        """
        # Tries to find simulation/lldb-server/gdbserver tool path.
        platform = lldbplatformutil.getPlatform()
        if platform == "linux":
            server_tool = lldbgdbserverutils.get_lldb_server_exe()
            tool_name = "lldb-server"
        elif platform == "macosx":
            server_tool = lldbgdbserverutils.get_debugserver_exe()
            tool_name = "debugserver"
        else:
            return None

        if server_tool is None:
            self.dap_server.request_disconnect(terminateDebuggee=True)
            self.assertIsNotNone(server_tool, "%s not found." % (tool_name))
        return server_tool