import os
import time

import dap_server
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbplatformutil
import lldbgdbserverutils


class DAPTestCaseBase(TestBase):
    """Shared helpers for tests that drive lldb-dap through ``dap_server``.

    The helpers wrap requests on ``self.dap_server`` (created by
    ``create_debug_adaptor``) and turn unexpected responses into test
    failures.
    """

    # set timeout based on whether ASAN was enabled or not. Increase
    # timeout by a factor of 10 if ASAN is enabled.
    timeoutval = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
    NO_DEBUG_INFO_TESTCASE = True

    def create_debug_adaptor(self, lldbDAPEnv=None):
        """Create the Visual Studio Code debug adaptor.

        Asserts that ``self.lldbDAPExec`` is an executable, then stores a
        ``dap_server.DebugAdaptorServer`` in ``self.dap_server``. The adaptor
        log is written to the "dap.txt" build artifact and ``lldbDAPEnv`` is
        passed through as the adaptor's ``env`` argument.
        """
        self.assertTrue(
            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
        )
        log_file_path = self.getBuildArtifact("dap.txt")
        self.dap_server = dap_server.DebugAdaptorServer(
            executable=self.lldbDAPExec,
            init_commands=self.setUpCommands(),
            log_file=log_file_path,
            env=lldbDAPEnv,
        )

    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
        """Run ``self.build()`` and then create the debug adaptor."""
        self.build()
        self.create_debug_adaptor(lldbDAPEnv)

    def _breakpoint_ids_from_response(self, response):
        """Return the breakpoint IDs in a set*Breakpoints response as strings.

        Returns an empty list when ``response`` is None.
        """
        if response is None:
            return []
        return ["%i" % (bp["id"]) for bp in response["body"]["breakpoints"]]

    def set_source_breakpoints(self, source_path, lines, data=None):
        """Sets source breakpoints and returns an array of strings containing
        the breakpoint IDs ("1", "2") for each breakpoint that was set.
        Parameter data is array of data objects for breakpoints.
        Each object in data is 1:1 mapping with the entry in lines.
        It contains optional location/hitCondition/logMessage parameters.
        """
        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
        return self._breakpoint_ids_from_response(response)

    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
        """Sets breakpoints by function name given an array of function names
        and returns an array of strings containing the breakpoint IDs
        ("1", "2") for each breakpoint that was set.
        """
        response = self.dap_server.request_setFunctionBreakpoints(
            functions, condition=condition, hitCondition=hitCondition
        )
        return self._breakpoint_ids_from_response(response)

    def waitUntil(self, condition_callback):
        """Poll ``condition_callback`` up to 20 times, 0.5s apart.

        Returns True as soon as the callback returns a truthy value, or
        False if it never does.
        """
        for _ in range(20):
            if condition_callback():
                return True
            time.sleep(0.5)
        return False

    def verify_breakpoint_hit(self, breakpoint_ids):
        """Wait for the process we are debugging to stop, and verify we hit
        any breakpoint location in the "breakpoint_ids" array.
        "breakpoint_ids" should be a list of breakpoint ID strings
        (["1", "2"]). The return value from self.set_source_breakpoints()
        or self.set_function_breakpoints() can be passed to this function.

        Fails the test if no stopped event matches.
        """
        stopped_events = self.dap_server.wait_for_stopped()
        for stopped_event in stopped_events:
            if "body" not in stopped_event:
                continue
            body = stopped_event["body"]
            if "reason" not in body or body["reason"] != "breakpoint":
                continue
            if "description" not in body:
                continue
            # Descriptions for breakpoints will be in the form
            # "breakpoint 1.1", so look for any description that matches
            # ("breakpoint 1.") in the description field as verification
            # that one of the breakpoint locations was hit. DAP doesn't
            # allow breakpoints to have multiple locations, but LLDB does.
            # So when looking at the description we just want to make sure
            # the right breakpoint matches and not worry about the actual
            # location.
            description = body["description"]
            for breakpoint_id in breakpoint_ids:
                match_desc = "breakpoint %s." % (breakpoint_id)
                if match_desc in description:
                    return
        self.fail("breakpoint not hit")

    def verify_stop_exception_info(self, expected_description):
        """Wait for the process we are debugging to stop, and verify the stop
        reason is 'exception' and that the description matches
        'expected_description'.

        Returns True when a matching stopped event is found, else False.
        """
        stopped_events = self.dap_server.wait_for_stopped()
        for stopped_event in stopped_events:
            if "body" not in stopped_event:
                continue
            body = stopped_event["body"]
            if "reason" not in body or body["reason"] != "exception":
                continue
            if "description" not in body:
                continue
            if expected_description == body["description"]:
                return True
        return False

    def verify_commands(self, flavor, output, commands):
        """Assert that every command in ``commands`` was echoed in ``output``.

        A command counts as found when some line of ``output`` starts with
        "(lldb) " and contains the command text. ``flavor`` is only used in
        the failure message.
        """
        self.assertTrue(output and len(output) > 0, "expect console output")
        lines = output.splitlines()
        prefix = "(lldb) "
        for cmd in commands:
            # A single leading "!" or "?" is not expected in the echoed line
            # (presumably an lldb-dap command modifier -- confirm). Strip it
            # exactly once per command; doing it per scanned line would eat
            # further characters from commands such as "?!foo".
            if cmd[:1] in ("!", "?"):
                cmd = cmd[1:]
            found = any(line.startswith(prefix) and cmd in line for line in lines)
            self.assertTrue(
                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
            )

    def get_dict_value(self, d, key_path):
        """Verify each key in the key_path array is contained in each
        dictionary within "d". Assert if any key isn't in the
        corresponding dictionary. This is handy for grabbing values from VS
        Code response dictionary like getting
        response['body']['stackFrames']
        """
        value = d
        for key in key_path:
            self.assertIn(
                key,
                value,
                'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
            )
            value = value[key]
        return value

    def get_stackFrames_and_totalFramesCount(
        self, threadId=None, startFrame=None, levels=None, dump=False
    ):
        """Request a stack trace and return ``(stackFrames, totalFrames)``.

        Returns ``(None, 0)`` when the request yields no response. Asserts
        that the response's ``totalFrames`` is greater than zero.
        """
        response = self.dap_server.request_stackTrace(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        if not response:
            return (None, 0)
        stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
        totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
        self.assertTrue(
            totalFrames > 0,
            "verify totalFrames count is provided by extension that supports "
            "async frames loading",
        )
        return (stackFrames, totalFrames)

    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
        """Return only the stack frames of a stack trace request (or None)."""
        stackFrames, _ = self.get_stackFrames_and_totalFramesCount(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        return stackFrames

    def get_source_and_line(self, threadId=None, frameIndex=0):
        """Return ``(source path, line)`` for the frame at ``frameIndex``.

        Returns ``("", 0)`` when no frame is available or the frame lacks a
        source path or a line.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        # Covers both "no response" (None) and an empty frame list.
        if not stackFrames:
            return ("", 0)
        stackFrame = stackFrames[0]
        source = stackFrame.get("source")
        if source and "path" in source and "line" in stackFrame:
            return (source["path"], stackFrame["line"])
        return ("", 0)

    def get_stdout(self, timeout=0.0):
        """Return the adaptor's collected "stdout" output."""
        return self.dap_server.get_output("stdout", timeout=timeout)

    def get_console(self, timeout=0.0):
        """Return the adaptor's collected "console" output."""
        return self.dap_server.get_output("console", timeout=timeout)

    def collect_console(self, timeout_secs, pattern=None):
        """Collect "console" output via ``dap_server.collect_output``."""
        return self.dap_server.collect_output(
            "console", timeout_secs=timeout_secs, pattern=pattern
        )

    def get_local_as_int(self, name, threadId=None):
        """Return the local variable ``name`` parsed as an integer.

        A "0x" prefix is parsed as hexadecimal, any other leading "0" as
        octal, and everything else as decimal.
        """
        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
        # 'value' may have the variable value and summary.
        # Extract the variable value since summary can have nonnumeric characters.
        value = value.split(" ")[0]
        if value.startswith("0x"):
            return int(value, 16)
        elif value.startswith("0"):
            return int(value, 8)
        else:
            return int(value)

    def set_local(self, name, value, id=None):
        """Set a top level local variable only."""
        # NOTE(review): 1 is presumably the locals scope reference -- confirm.
        return self.dap_server.request_setVariable(1, name, str(value), id=id)

    def set_global(self, name, value, id=None):
        """Set a top level global variable only."""
        # NOTE(review): 2 is presumably the globals scope reference -- confirm.
        return self.dap_server.request_setVariable(2, name, str(value), id=id)

    def stepIn(
        self, threadId=None, targetId=None, waitForStop=True, granularity="statement"
    ):
        """Send a stepIn request; return the stopped events if ``waitForStop``."""
        self.dap_server.request_stepIn(
            threadId=threadId, targetId=targetId, granularity=granularity
        )
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOver(self, threadId=None, waitForStop=True, granularity="statement"):
        """Send a next request; return the stopped events if ``waitForStop``."""
        self.dap_server.request_next(threadId=threadId, granularity=granularity)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOut(self, threadId=None, waitForStop=True):
        """Send a stepOut request; return the stopped events if ``waitForStop``."""
        self.dap_server.request_stepOut(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def continue_to_next_stop(self):
        """Continue the debuggee and return the next stopped events."""
        self.dap_server.request_continue()
        return self.dap_server.wait_for_stopped()

    def continue_to_breakpoints(self, breakpoint_ids):
        """Continue and verify one of ``breakpoint_ids`` was hit."""
        self.dap_server.request_continue()
        self.verify_breakpoint_hit(breakpoint_ids)

    def continue_to_exception_breakpoint(self, filter_label):
        """Continue and assert we stop on an exception described by
        ``filter_label``."""
        self.dap_server.request_continue()
        self.assertTrue(
            self.verify_stop_exception_info(filter_label),
            'verify we got "%s"' % (filter_label),
        )

    def continue_to_exit(self, exitCode=0):
        """Continue and assert the single resulting event is an "exited"
        event carrying ``exitCode``."""
        self.dap_server.request_continue()
        stopped_events = self.dap_server.wait_for_stopped()
        self.assertEqual(
            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
        )
        self.assertEqual(
            stopped_events[0]["event"], "exited", "make sure program ran to completion"
        )
        self.assertEqual(
            stopped_events[0]["body"]["exitCode"],
            exitCode,
            "exitCode == %i" % (exitCode),
        )

    def disassemble(self, threadId=None, frameIndex=None):
        """Return the disassembled instruction recorded for the frame's
        instruction pointer, requesting disassembly first if it is not
        already in ``dap_server.disassembled_instructions``.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        self.assertIsNotNone(stackFrames)
        memoryReference = stackFrames[0]["instructionPointerReference"]
        self.assertIsNotNone(memoryReference)

        if memoryReference not in self.dap_server.disassembled_instructions:
            self.dap_server.request_disassemble(memoryReference=memoryReference)

        return self.dap_server.disassembled_instructions[memoryReference]

    def _add_dap_teardown_hook(self, disconnectAutomatically):
        """Register a tear down hook that terminates the debug adaptor,
        disconnecting first when ``disconnectAutomatically`` is true."""

        def cleanup():
            if disconnectAutomatically:
                self.dap_server.request_disconnect(terminateDebuggee=True)
            self.dap_server.terminate()

        # Execute the cleanup function during test case tear down.
        self.addTearDownHook(cleanup)

    def _assert_request_succeeded(self, response, request_name):
        """Fail the test unless ``response`` exists and reports success."""
        if response and response["success"]:
            return
        # A missing response must produce a test failure, not a TypeError
        # from subscripting None.
        message = response.get("message") if response else "no response received"
        self.fail("%s failed (%s)" % (request_name, message))

    def attach(
        self,
        program=None,
        pid=None,
        waitFor=None,
        trace=None,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        attachCommands=None,
        coreFile=None,
        disconnectAutomatically=True,
        terminateCommands=None,
        postRunCommands=None,
        sourceMap=None,
        sourceInitFile=False,
        expectFailure=False,
        gdbRemotePort=None,
        gdbRemoteHostname=None,
    ):
        """Initialize the already created DAP debug adaptor and send it an
        attach request.

        Returns the attach response when ``expectFailure`` is true.
        Otherwise fails the test unless the response reports success, and
        returns None.
        """
        # Make sure we disconnect and terminate the DAP debug adaptor even
        # if we throw an exception during the test case.
        self._add_dap_teardown_hook(disconnectAutomatically)

        # Initialize and attach to the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_attach(
            program=program,
            pid=pid,
            waitFor=waitFor,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            attachCommands=attachCommands,
            terminateCommands=terminateCommands,
            coreFile=coreFile,
            postRunCommands=postRunCommands,
            sourceMap=sourceMap,
            gdbRemotePort=gdbRemotePort,
            gdbRemoteHostname=gdbRemoteHostname,
        )
        if expectFailure:
            return response
        self._assert_request_succeeded(response, "attach")

    def launch(
        self,
        program=None,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=True,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        launchCommands=None,
        sourceMap=None,
        disconnectAutomatically=True,
        runInTerminal=False,
        expectFailure=False,
        postRunCommands=None,
        enableAutoVariableSummaries=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix=None,
        customFrameFormat=None,
        customThreadFormat=None,
    ):
        """Sending launch request to dap.

        Returns the launch response. Unless ``expectFailure`` is true, the
        test fails when the response is missing or reports failure.
        """
        # Make sure we disconnect and terminate the DAP debug adapter,
        # if we throw an exception during the test case
        self._add_dap_teardown_hook(disconnectAutomatically)

        # Initialize and launch the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_launch(
            program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            launchCommands=launchCommands,
            sourceMap=sourceMap,
            runInTerminal=runInTerminal,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
        )

        if expectFailure:
            return response

        self._assert_request_succeeded(response, "launch")
        return response

    def build_and_launch(
        self,
        program,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=True,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        runInTerminal=False,
        disconnectAutomatically=True,
        postRunCommands=None,
        lldbDAPEnv=None,
        enableAutoVariableSummaries=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix=None,
        customFrameFormat=None,
        customThreadFormat=None,
        launchCommands=None,
        expectFailure=False,
    ):
        """Build the default Makefile target, create the DAP debug adaptor,
        and launch the process.

        Returns whatever ``launch`` returns.
        """
        self.build_and_create_debug_adaptor(lldbDAPEnv)
        self.assertTrue(os.path.exists(program), "executable must exist")

        # Forward by keyword so the call cannot silently break if the
        # parameter order of launch() ever changes.
        return self.launch(
            program=program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            sourceInitFile=sourceInitFile,
            runInTerminal=runInTerminal,
            disconnectAutomatically=disconnectAutomatically,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
            launchCommands=launchCommands,
            expectFailure=expectFailure,
        )

    def getBuiltinDebugServerTool(self):
        """Return the path of the platform's debug server tool.

        Uses lldb-server on "linux" and debugserver on "macosx"; returns
        None on any other platform. When the tool cannot be found on a
        supported platform, the debuggee is disconnected and the test fails.
        """
        # Tries to find simulation/lldb-server/gdbserver tool path.
        server_tool = None
        platform = lldbplatformutil.getPlatform()
        if platform == "linux":
            server_tool = lldbgdbserverutils.get_lldb_server_exe()
            if server_tool is None:
                self.dap_server.request_disconnect(terminateDebuggee=True)
                self.assertIsNotNone(server_tool, "lldb-server not found.")
        elif platform == "macosx":
            server_tool = lldbgdbserverutils.get_debugserver_exe()
            if server_tool is None:
                self.dap_server.request_disconnect(terminateDebuggee=True)
                self.assertIsNotNone(server_tool, "debugserver not found.")
        return server_tool