import os
import time

import dap_server
from lldbsuite.test.lldbtest import *


class DAPTestCaseBase(TestBase):
    """Shared helpers for tests that drive lldb-dap through ``dap_server``.

    Most helpers operate on ``self.dap_server``, which is created by
    ``create_debug_adaptor()`` (or one of the ``build_and_*`` wrappers).
    """

    # NOTE(review): presumably tells the lldb test harness not to replicate
    # these tests per debug-info format -- confirm against lldbtest.
    NO_DEBUG_INFO_TESTCASE = True

    def create_debug_adaptor(self, lldbDAPEnv=None):
        """Create the Visual Studio Code debug adaptor.

        Asserts that ``self.lldbDAPExec`` is an executable, then stores a
        ``dap_server.DebugAdaptorServer`` in ``self.dap_server``. The adaptor
        log file is the build artifact "dap.txt"; ``lldbDAPEnv`` is passed
        through as the adaptor's ``env`` argument.
        """
        self.assertTrue(
            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
        )
        log_file_path = self.getBuildArtifact("dap.txt")
        self.dap_server = dap_server.DebugAdaptorServer(
            executable=self.lldbDAPExec,
            init_commands=self.setUpCommands(),
            log_file=log_file_path,
            env=lldbDAPEnv,
        )

    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
        """Build the test target, then create the debug adaptor."""
        self.build()
        self.create_debug_adaptor(lldbDAPEnv)

    def set_source_breakpoints(self, source_path, lines, data=None):
        """Sets source breakpoints and returns an array of strings containing
        the breakpoint IDs ("1", "2") for each breakpoint that was set.
        Parameter data is array of data objects for breakpoints.
        Each object in data is 1:1 mapping with the entry in lines.
        It contains optional location/hitCondition/logMessage parameters.

        Returns an empty list when the request produces no response.
        """
        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
        if response is None:
            return []
        return ["%i" % (bp["id"]) for bp in response["body"]["breakpoints"]]

    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
        """Sets breakpoints by function name given an array of function names
        and returns an array of strings containing the breakpoint IDs
        ("1", "2") for each breakpoint that was set.

        Returns an empty list when the request produces no response.
        """
        response = self.dap_server.request_setFunctionBreakpoints(
            functions, condition=condition, hitCondition=hitCondition
        )
        if response is None:
            return []
        return ["%i" % (bp["id"]) for bp in response["body"]["breakpoints"]]

    def waitUntil(self, condition_callback):
        """Poll ``condition_callback`` until it returns a truthy value.

        The callback is tried up to 20 times with a 0.5 second sleep after
        each failed attempt. Returns True as soon as the callback succeeds
        and False if every attempt fails.
        """
        for _ in range(20):
            if condition_callback():
                return True
            time.sleep(0.5)
        return False

    def verify_breakpoint_hit(self, breakpoint_ids):
        """Wait for the process we are debugging to stop, and verify we hit
        any breakpoint location in the "breakpoint_ids" array.
        "breakpoint_ids" should be a list of breakpoint ID strings
        (["1", "2"]). The return value from self.set_source_breakpoints()
        or self.set_function_breakpoints() can be passed to this function.

        Fails the test with "breakpoint not hit" if no stopped event matches.
        """
        stopped_events = self.dap_server.wait_for_stopped()
        for stopped_event in stopped_events:
            body = stopped_event.get("body")
            if not body or body.get("reason") != "breakpoint":
                continue
            description = body.get("description")
            if description is None:
                continue
            # Descriptions for breakpoints will be in the form
            # "breakpoint 1.1", so look for any description that matches
            # ("breakpoint 1.") in the description field as verification
            # that one of the breakpoint locations was hit. DAP doesn't
            # allow breakpoints to have multiple locations, but LLDB does.
            # So when looking at the description we just want to make sure
            # the right breakpoint matches and not worry about the actual
            # location.
            for breakpoint_id in breakpoint_ids:
                if "breakpoint %s." % (breakpoint_id) in description:
                    return
        self.fail("breakpoint not hit")

    def verify_stop_exception_info(self, expected_description):
        """Wait for the process we are debugging to stop, and verify the stop
        reason is 'exception' and that the description matches
        'expected_description'.

        Returns True when such a stopped event is found, False otherwise.
        Unlike verify_breakpoint_hit(), this does not assert by itself.
        """
        stopped_events = self.dap_server.wait_for_stopped()
        for stopped_event in stopped_events:
            body = stopped_event.get("body")
            if not body or body.get("reason") != "exception":
                continue
            if "description" not in body:
                continue
            if body["description"] == expected_description:
                return True
        return False

    def verify_commands(self, flavor, output, commands):
        """Assert that every command in ``commands`` was echoed in ``output``.

        A command counts as found when some line of ``output`` starts with
        the "(lldb) " prompt and contains the command text. ``flavor`` is
        only used to label the assertion message.
        """
        self.assertTrue(output, "expect console output")
        prefix = "(lldb) "
        prompt_lines = [
            line for line in output.splitlines() if line.startswith(prefix)
        ]
        for cmd in commands:
            found = any(cmd in line for line in prompt_lines)
            self.assertTrue(
                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
            )

    def get_dict_value(self, d, key_path):
        """Verify each key in the key_path array is in contained in each
        dictionary within "d". Assert if any key isn't in the
        corresponding dictionary. This is handy for grabbing values from VS
        Code response dictionary like getting
        response['body']['stackFrames']
        """
        value = d
        for key in key_path:
            self.assertIn(
                key,
                value,
                'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
            )
            value = value[key]
        return value

    def get_stackFrames_and_totalFramesCount(
        self, threadId=None, startFrame=None, levels=None, dump=False
    ):
        """Request a stack trace and return ``(stackFrames, totalFrames)``.

        Asserts that the response reports a positive ``totalFrames``.
        Returns ``(None, 0)`` when the request yields no response.
        """
        response = self.dap_server.request_stackTrace(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        if not response:
            return (None, 0)
        stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
        totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
        self.assertTrue(
            totalFrames > 0,
            "verify totalFrames count is provided by extension that supports "
            "async frames loading",
        )
        return (stackFrames, totalFrames)

    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
        """Return only the stack frames of a stack trace request (or None)."""
        stackFrames, _ = self.get_stackFrames_and_totalFramesCount(
            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
        )
        return stackFrames

    def get_source_and_line(self, threadId=None, frameIndex=0):
        """Return ``(source_path, line)`` for the frame at ``frameIndex``.

        Returns ``("", 0)`` when no frame is available or when the frame
        lacks a source path or a line entry.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        # Guard against both "no response" (None) and an empty frame list.
        if not stackFrames:
            return ("", 0)
        stackFrame = stackFrames[0]
        source = stackFrame.get("source")
        if source is not None and "path" in source and "line" in stackFrame:
            return (source["path"], stackFrame["line"])
        return ("", 0)

    def get_stdout(self, timeout=0.0):
        """Return the adaptor's collected "stdout" output."""
        return self.dap_server.get_output("stdout", timeout=timeout)

    def get_console(self, timeout=0.0):
        """Return the adaptor's collected "console" output."""
        return self.dap_server.get_output("console", timeout=timeout)

    def collect_console(self, duration):
        """Collect "console" output via ``collect_output`` with ``duration``."""
        return self.dap_server.collect_output("console", duration=duration)

    def get_local_as_int(self, name, threadId=None):
        """Return the value of local variable ``name`` parsed as an integer.

        A "0x" prefix is parsed as hexadecimal, any other leading "0" as
        octal, and everything else as decimal.
        """
        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
        # 'value' may have the variable value and summary.
        # Extract the variable value since summary can have nonnumeric characters.
        value = value.split(" ")[0]
        if value.startswith("0x"):
            return int(value, 16)
        if value.startswith("0"):
            return int(value, 8)
        return int(value)

    def set_local(self, name, value, id=None):
        """Set a top level local variable only."""
        # NOTE(review): 1 is presumably the reference of the locals scope --
        # confirm against dap_server.request_setVariable.
        return self.dap_server.request_setVariable(1, name, str(value), id=id)

    def set_global(self, name, value, id=None):
        """Set a top level global variable only."""
        # NOTE(review): 2 is presumably the reference of the globals scope --
        # confirm against dap_server.request_setVariable.
        return self.dap_server.request_setVariable(2, name, str(value), id=id)

    def stepIn(self, threadId=None, waitForStop=True):
        """Send a stepIn request; return the stopped events if waiting."""
        self.dap_server.request_stepIn(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOver(self, threadId=None, waitForStop=True):
        """Send a next request; return the stopped events if waiting."""
        self.dap_server.request_next(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def stepOut(self, threadId=None, waitForStop=True):
        """Send a stepOut request; return the stopped events if waiting."""
        self.dap_server.request_stepOut(threadId=threadId)
        if waitForStop:
            return self.dap_server.wait_for_stopped()
        return None

    def continue_to_next_stop(self):
        """Continue the debuggee and return the next stopped events."""
        self.dap_server.request_continue()
        return self.dap_server.wait_for_stopped()

    def continue_to_breakpoints(self, breakpoint_ids):
        """Continue and assert that one of ``breakpoint_ids`` was hit."""
        self.dap_server.request_continue()
        self.verify_breakpoint_hit(breakpoint_ids)

    def continue_to_exception_breakpoint(self, filter_label):
        """Continue and assert we stop on an exception described by
        ``filter_label``."""
        self.dap_server.request_continue()
        self.assertTrue(
            self.verify_stop_exception_info(filter_label),
            'verify we got "%s"' % (filter_label),
        )

    def continue_to_exit(self, exitCode=0):
        """Continue and assert the program exits with ``exitCode``.

        Exactly one event is expected, it must be an "exited" event, and its
        body must carry the expected exit code.
        """
        self.dap_server.request_continue()
        stopped_events = self.dap_server.wait_for_stopped()
        self.assertEqual(
            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
        )
        self.assertEqual(
            stopped_events[0]["event"], "exited", "make sure program ran to completion"
        )
        self.assertEqual(
            stopped_events[0]["body"]["exitCode"],
            exitCode,
            "exitCode == %i" % (exitCode),
        )

    def disassemble(self, threadId=None, frameIndex=None):
        """Return the disassembled instructions for the selected frame.

        The frame's ``instructionPointerReference`` is used as the key into
        ``self.dap_server.disassembled_instructions``; a disassemble request
        is only sent when that key is not present yet.
        """
        stackFrames = self.get_stackFrames(
            threadId=threadId, startFrame=frameIndex, levels=1
        )
        # Covers both a missing response (None) and an empty frame list.
        self.assertTrue(stackFrames, "expected at least one stack frame")
        memoryReference = stackFrames[0]["instructionPointerReference"]
        self.assertIsNotNone(memoryReference)

        if memoryReference not in self.dap_server.disassembled_instructions:
            self.dap_server.request_disassemble(memoryReference=memoryReference)

        return self.dap_server.disassembled_instructions[memoryReference]

    def _add_dap_teardown_hook(self, disconnectAutomatically):
        """Register a tear down hook that shuts the debug adaptor down.

        The hook always terminates the adaptor; it sends a disconnect request
        (terminating the debuggee) first when ``disconnectAutomatically`` is
        true. Registering it as a tear down hook ensures it runs even if the
        test case throws an exception.
        """

        def cleanup():
            if disconnectAutomatically:
                self.dap_server.request_disconnect(terminateDebuggee=True)
            self.dap_server.terminate()

        self.addTearDownHook(cleanup)

    def _assert_request_succeeded(self, response, action):
        """Fail the test unless ``response`` reports success.

        A missing response, or one without "success"/"message" entries,
        produces an assertion failure instead of a TypeError/KeyError.
        """
        if not response:
            self.fail("%s failed (no response)" % (action))
        self.assertTrue(
            response.get("success"),
            "%s failed (%s)" % (action, response.get("message")),
        )

    def attach(
        self,
        program=None,
        pid=None,
        waitFor=None,
        trace=None,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        attachCommands=None,
        coreFile=None,
        disconnectAutomatically=True,
        terminateCommands=None,
        postRunCommands=None,
        sourceMap=None,
        sourceInitFile=False,
        expectFailure=False,
    ):
        """Initialize the DAP session and attach to the process.

        ``self.dap_server`` must already exist (see create_debug_adaptor());
        this method does not build the target or create the adaptor.

        Returns the attach response. When ``expectFailure`` is true the
        response is returned unchecked; otherwise the test fails unless the
        response reports success.
        """
        # Make sure we disconnect and terminate the DAP debug adaptor even
        # if we throw an exception during the test case.
        self._add_dap_teardown_hook(disconnectAutomatically)

        # Initialize and attach to the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_attach(
            program=program,
            pid=pid,
            waitFor=waitFor,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            attachCommands=attachCommands,
            terminateCommands=terminateCommands,
            coreFile=coreFile,
            postRunCommands=postRunCommands,
            sourceMap=sourceMap,
        )
        if expectFailure:
            return response
        self._assert_request_succeeded(response, "attach")
        return response

    def launch(
        self,
        program=None,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=True,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        launchCommands=None,
        sourceMap=None,
        disconnectAutomatically=True,
        runInTerminal=False,
        expectFailure=False,
        postRunCommands=None,
        enableAutoVariableSummaries=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix=None,
        customFrameFormat=None,
        customThreadFormat=None,
    ):
        """Sending launch request to dap.

        ``self.dap_server`` must already exist (see create_debug_adaptor()).

        Returns the launch response. When ``expectFailure`` is true the
        response is returned unchecked; otherwise the test fails unless the
        response reports success.
        """
        # Make sure we disconnect and terminate the DAP debug adapter,
        # if we throw an exception during the test case
        self._add_dap_teardown_hook(disconnectAutomatically)

        # Initialize and launch the program
        self.dap_server.request_initialize(sourceInitFile)
        response = self.dap_server.request_launch(
            program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            launchCommands=launchCommands,
            sourceMap=sourceMap,
            runInTerminal=runInTerminal,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
        )

        if expectFailure:
            return response
        self._assert_request_succeeded(response, "launch")
        return response

    def build_and_launch(
        self,
        program,
        args=None,
        cwd=None,
        env=None,
        stopOnEntry=False,
        disableASLR=True,
        disableSTDIO=False,
        shellExpandArguments=False,
        trace=False,
        initCommands=None,
        preRunCommands=None,
        stopCommands=None,
        exitCommands=None,
        terminateCommands=None,
        sourcePath=None,
        debuggerRoot=None,
        sourceInitFile=False,
        runInTerminal=False,
        disconnectAutomatically=True,
        postRunCommands=None,
        lldbDAPEnv=None,
        enableAutoVariableSummaries=False,
        enableSyntheticChildDebugging=False,
        commandEscapePrefix=None,
        customFrameFormat=None,
        customThreadFormat=None,
        launchCommands=None,
        expectFailure=False,
    ):
        """Build the default Makefile target, create the DAP debug adaptor,
        and launch the process.

        Asserts that ``program`` exists on disk after the build, then
        returns whatever launch() returns.
        """
        self.build_and_create_debug_adaptor(lldbDAPEnv)
        self.assertTrue(os.path.exists(program), "executable must exist")

        # Forward by keyword so a change in launch()'s parameter order
        # cannot silently mis-route arguments.
        return self.launch(
            program=program,
            args=args,
            cwd=cwd,
            env=env,
            stopOnEntry=stopOnEntry,
            disableASLR=disableASLR,
            disableSTDIO=disableSTDIO,
            shellExpandArguments=shellExpandArguments,
            trace=trace,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            sourcePath=sourcePath,
            debuggerRoot=debuggerRoot,
            sourceInitFile=sourceInitFile,
            runInTerminal=runInTerminal,
            disconnectAutomatically=disconnectAutomatically,
            postRunCommands=postRunCommands,
            enableAutoVariableSummaries=enableAutoVariableSummaries,
            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
            commandEscapePrefix=commandEscapePrefix,
            customFrameFormat=customFrameFormat,
            customThreadFormat=customThreadFormat,
            launchCommands=launchCommands,
            expectFailure=expectFailure,
        )