1""" 2Base class for gdb-remote test cases. 3""" 4 5import errno 6import os 7import os.path 8import random 9import re 10import select 11import socket 12import subprocess 13import sys 14import tempfile 15import time 16from lldbsuite.test import configuration 17from lldbsuite.test.lldbtest import * 18from lldbsuite.support import seven 19from lldbgdbserverutils import * 20import logging 21 22 23class _ConnectionRefused(IOError): 24 pass 25 26 27class GdbRemoteTestCaseFactory(type): 28 def __new__(cls, name, bases, attrs): 29 newattrs = {} 30 for attrname, attrvalue in attrs.items(): 31 if not attrname.startswith("test"): 32 newattrs[attrname] = attrvalue 33 continue 34 35 # If any debug server categories were explicitly tagged, assume 36 # that list to be authoritative. If none were specified, try 37 # all of them. 38 all_categories = set(["debugserver", "llgs"]) 39 categories = set(getattr(attrvalue, "categories", [])) & all_categories 40 if not categories: 41 categories = all_categories 42 43 for cat in categories: 44 45 @decorators.add_test_categories([cat]) 46 @wraps(attrvalue) 47 def test_method(self, attrvalue=attrvalue): 48 return attrvalue(self) 49 50 method_name = attrname + "_" + cat 51 test_method.__name__ = method_name 52 test_method.debug_server = cat 53 newattrs[method_name] = test_method 54 55 return super(GdbRemoteTestCaseFactory, cls).__new__(cls, name, bases, newattrs) 56 57 58class GdbRemoteTestCaseBase(Base, metaclass=GdbRemoteTestCaseFactory): 59 # Default time out in seconds. The timeout is increased tenfold under Asan. 60 DEFAULT_TIMEOUT = 20 * (10 if ("ASAN_OPTIONS" in os.environ) else 1) 61 # Default sleep time in seconds. The sleep time is doubled under Asan. 62 DEFAULT_SLEEP = 5 * (2 if ("ASAN_OPTIONS" in os.environ) else 1) 63 64 _GDBREMOTE_KILL_PACKET = b"$k#6b" 65 66 # Start the inferior separately, attach to the inferior on the stub 67 # command line. 68 _STARTUP_ATTACH = "attach" 69 # Start the inferior separately, start the stub without attaching, allow 70 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 71 _STARTUP_ATTACH_MANUALLY = "attach_manually" 72 # Start the stub, and launch the inferior with an $A packet via the 73 # initial packet stream. 74 _STARTUP_LAUNCH = "launch" 75 76 # GDB Signal numbers that are not target-specific used for common 77 # exceptions 78 TARGET_EXC_BAD_ACCESS = 0x91 79 TARGET_EXC_BAD_INSTRUCTION = 0x92 80 TARGET_EXC_ARITHMETIC = 0x93 81 TARGET_EXC_EMULATION = 0x94 82 TARGET_EXC_SOFTWARE = 0x95 83 TARGET_EXC_BREAKPOINT = 0x96 84 85 _verbose_log_handler = None 86 _log_formatter = logging.Formatter(fmt="%(asctime)-15s %(levelname)-8s %(message)s") 87 88 def setUpBaseLogging(self): 89 self.logger = logging.getLogger(__name__) 90 91 if len(self.logger.handlers) > 0: 92 return # We have set up this handler already 93 94 self.logger.propagate = False 95 self.logger.setLevel(logging.DEBUG) 96 97 # log all warnings to stderr 98 handler = logging.StreamHandler() 99 handler.setLevel(logging.WARNING) 100 handler.setFormatter(self._log_formatter) 101 self.logger.addHandler(handler) 102 103 def isVerboseLoggingRequested(self): 104 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 105 # logged. 106 return any(("gdb-remote" in channel) for channel in lldbtest_config.channels) 107 108 def getDebugServer(self): 109 method = getattr(self, self.testMethodName) 110 return getattr(method, "debug_server", None) 111 112 def setUp(self): 113 super(GdbRemoteTestCaseBase, self).setUp() 114 115 self.setUpBaseLogging() 116 self.debug_monitor_extra_args = [] 117 118 if self.isVerboseLoggingRequested(): 119 # If requested, full logs go to a log file 120 self._verbose_log_handler = logging.FileHandler( 121 self.getLogBasenameForCurrentTest() + "-host.log" 122 ) 123 self._verbose_log_handler.setFormatter(self._log_formatter) 124 self._verbose_log_handler.setLevel(logging.DEBUG) 125 self.logger.addHandler(self._verbose_log_handler) 126 127 self.test_sequence = GdbRemoteTestSequence(self.logger) 128 self.set_inferior_startup_launch() 129 self.port = self.get_next_port() 130 self.stub_sends_two_stop_notifications_on_kill = False 131 if configuration.lldb_platform_url: 132 if configuration.lldb_platform_url.startswith("unix-"): 133 url_pattern = r"(.+)://\[?(.+?)\]?/.*" 134 else: 135 url_pattern = r"(.+)://(.+):\d+" 136 scheme, host = re.match( 137 url_pattern, configuration.lldb_platform_url 138 ).groups() 139 if ( 140 configuration.lldb_platform_name == "remote-android" 141 and host != "localhost" 142 ): 143 self.stub_device = host 144 self.stub_hostname = "localhost" 145 else: 146 self.stub_device = None 147 self.stub_hostname = host 148 else: 149 self.stub_hostname = "localhost" 150 151 debug_server = self.getDebugServer() 152 if debug_server == "debugserver": 153 self._init_debugserver_test() 154 else: 155 self._init_llgs_test() 156 157 def tearDown(self): 158 self.logger.removeHandler(self._verbose_log_handler) 159 self._verbose_log_handler = None 160 TestBase.tearDown(self) 161 162 def getLocalServerLogFile(self): 163 return self.getLogBasenameForCurrentTest() + "-server.log" 164 165 def setUpServerLogging(self, is_llgs): 166 if len(lldbtest_config.channels) == 0: 167 return # No logging requested 168 169 if lldb.remote_platform: 170 log_file = lldbutil.join_remote_paths( 171 lldb.remote_platform.GetWorkingDirectory(), "server.log" 172 ) 173 else: 174 log_file = self.getLocalServerLogFile() 175 176 if is_llgs: 177 self.debug_monitor_extra_args.append("--log-file=" + log_file) 178 self.debug_monitor_extra_args.append( 179 "--log-channels={}".format(":".join(lldbtest_config.channels)) 180 ) 181 else: 182 self.debug_monitor_extra_args = [ 183 "--log-file=" + log_file, 184 "--log-flags=0x800000", 185 ] 186 187 def get_next_port(self): 188 return 12000 + random.randint(0, 7999) 189 190 def reset_test_sequence(self): 191 self.test_sequence = GdbRemoteTestSequence(self.logger) 192 193 def _init_llgs_test(self): 194 reverse_connect = True 195 if lldb.remote_platform: 196 # Reverse connections may be tricky due to firewalls/NATs. 197 reverse_connect = False 198 199 # FIXME: This is extremely linux-oriented 200 201 # Grab the ppid from /proc/[shell pid]/stat 202 err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat") 203 self.assertTrue( 204 err.Success() and retcode == 0, 205 "Failed to read file /proc/$$/stat: %s, retcode: %d" 206 % (err.GetCString(), retcode), 207 ) 208 209 # [pid] ([executable]) [state] [*ppid*] 210 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 211 err, retcode, ls_output = self.run_platform_command( 212 "ls -l /proc/%s/exe" % pid 213 ) 214 self.assertTrue( 215 err.Success() and retcode == 0, 216 "Failed to read file /proc/%s/exe: %s, retcode: %d" 217 % (pid, err.GetCString(), retcode), 218 ) 219 exe = ls_output.split()[-1] 220 221 # If the binary has been deleted, the link name has " (deleted)" appended. 222 # Remove if it's there. 223 self.debug_monitor_exe = re.sub(r" \(deleted\)$", "", exe) 224 else: 225 self.debug_monitor_exe = get_lldb_server_exe() 226 227 self.debug_monitor_extra_args = ["gdbserver"] 228 self.setUpServerLogging(is_llgs=True) 229 230 self.reverse_connect = reverse_connect 231 232 def _init_debugserver_test(self): 233 self.debug_monitor_exe = get_debugserver_exe() 234 self.setUpServerLogging(is_llgs=False) 235 self.reverse_connect = True 236 237 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 238 # when the process truly dies. 239 self.stub_sends_two_stop_notifications_on_kill = True 240 241 def forward_adb_port(self, source, target, direction, device): 242 adb = ["adb"] + (["-s", device] if device else []) + [direction] 243 244 def remove_port_forward(): 245 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 246 247 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 248 self.addTearDownHook(remove_port_forward) 249 250 def _verify_socket(self, sock): 251 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 252 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 253 # the connect() will always be successful, but the connection will be immediately dropped 254 # if ADB could not connect on the remote side. This function tries to detect this 255 # situation, and report it as "connection refused" so that the upper layers attempt the 256 # connection again. 257 triple = self.dbg.GetSelectedPlatform().GetTriple() 258 if not re.match(".*-.*-.*-android", triple): 259 return # Not android. 260 can_read, _, _ = select.select([sock], [], [], 0.1) 261 if sock not in can_read: 262 return # Data is not available, but the connection is alive. 263 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 264 raise _ConnectionRefused() # Got EOF, connection dropped. 265 266 def create_socket(self): 267 try: 268 sock = socket.socket(family=socket.AF_INET) 269 except OSError as e: 270 if e.errno != errno.EAFNOSUPPORT: 271 raise 272 sock = socket.socket(family=socket.AF_INET6) 273 274 logger = self.logger 275 276 triple = self.dbg.GetSelectedPlatform().GetTriple() 277 if re.match(".*-.*-.*-android", triple): 278 self.forward_adb_port(self.port, self.port, "forward", self.stub_device) 279 280 logger.info( 281 "Connecting to debug monitor on %s:%d", self.stub_hostname, self.port 282 ) 283 connect_info = (self.stub_hostname, self.port) 284 try: 285 sock.connect(connect_info) 286 except socket.error as serr: 287 if serr.errno == errno.ECONNREFUSED: 288 raise _ConnectionRefused() 289 raise serr 290 291 def shutdown_socket(): 292 if sock: 293 try: 294 # send the kill packet so lldb-server shuts down gracefully 295 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 296 except: 297 logger.warning( 298 "failed to send kill packet to debug monitor: {}; ignoring".format( 299 sys.exc_info()[0] 300 ) 301 ) 302 303 try: 304 sock.close() 305 except: 306 logger.warning( 307 "failed to close socket to debug monitor: {}; ignoring".format( 308 sys.exc_info()[0] 309 ) 310 ) 311 312 self.addTearDownHook(shutdown_socket) 313 314 self._verify_socket(sock) 315 316 return sock 317 318 def set_inferior_startup_launch(self): 319 self._inferior_startup = self._STARTUP_LAUNCH 320 321 def set_inferior_startup_attach(self): 322 self._inferior_startup = self._STARTUP_ATTACH 323 324 def set_inferior_startup_attach_manually(self): 325 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 326 327 def get_debug_monitor_command_line_args(self, attach_pid=None): 328 commandline_args = self.debug_monitor_extra_args 329 if attach_pid: 330 commandline_args += ["--attach=%d" % attach_pid] 331 if self.reverse_connect: 332 commandline_args += ["--reverse-connect", self.connect_address] 333 else: 334 if lldb.remote_platform: 335 commandline_args += ["*:{}".format(self.port)] 336 else: 337 commandline_args += ["localhost:{}".format(self.port)] 338 339 return commandline_args 340 341 def get_target_byte_order(self): 342 inferior_exe_path = self.getBuildArtifact("a.out") 343 target = self.dbg.CreateTarget(inferior_exe_path) 344 return target.GetByteOrder() 345 346 def launch_debug_monitor(self, attach_pid=None, logfile=None): 347 if self.reverse_connect: 348 family, type, proto, _, addr = socket.getaddrinfo( 349 "localhost", 0, proto=socket.IPPROTO_TCP 350 )[0] 351 sock = socket.socket(family, type, proto) 352 sock.settimeout(self.DEFAULT_TIMEOUT) 353 354 sock.bind(addr) 355 sock.listen(1) 356 addr = sock.getsockname() 357 self.connect_address = "[{}]:{}".format(*addr) 358 359 # Create the command line. 360 commandline_args = self.get_debug_monitor_command_line_args( 361 attach_pid=attach_pid 362 ) 363 364 # Start the server. 365 server = self.spawnSubprocess( 366 self.debug_monitor_exe, commandline_args, install_remote=False 367 ) 368 self.assertIsNotNone(server) 369 370 if self.reverse_connect: 371 self.sock = sock.accept()[0] 372 self.sock.settimeout(self.DEFAULT_TIMEOUT) 373 374 return server 375 376 def connect_to_debug_monitor(self, attach_pid=None): 377 if self.reverse_connect: 378 # Create the stub. 379 server = self.launch_debug_monitor(attach_pid=attach_pid) 380 self.assertIsNotNone(server) 381 382 # Schedule debug monitor to be shut down during teardown. 383 logger = self.logger 384 385 self._server = Server(self.sock, server) 386 return server 387 388 # We're using a random port algorithm to try not to collide with other ports, 389 # and retry a max # times. 390 attempts = 0 391 MAX_ATTEMPTS = 10 392 attempt_wait = 3 393 394 while attempts < MAX_ATTEMPTS: 395 server = self.launch_debug_monitor(attach_pid=attach_pid) 396 397 # Schedule debug monitor to be shut down during teardown. 398 logger = self.logger 399 400 connect_attemps = 0 401 MAX_CONNECT_ATTEMPTS = 10 402 403 while connect_attemps < MAX_CONNECT_ATTEMPTS: 404 # Create a socket to talk to the server 405 try: 406 logger.info("Connect attempt %d", connect_attemps + 1) 407 self.sock = self.create_socket() 408 self._server = Server(self.sock, server) 409 return server 410 except _ConnectionRefused as serr: 411 # Ignore, and try again. 412 pass 413 time.sleep(0.5) 414 connect_attemps += 1 415 416 # We should close the server here to be safe. 417 server.terminate() 418 419 # Increment attempts. 420 print( 421 "connect to debug monitor on port %d failed, attempt #%d of %d" 422 % (self.port, attempts + 1, MAX_ATTEMPTS) 423 ) 424 attempts += 1 425 426 # And wait a random length of time before next attempt, to avoid 427 # collisions. 428 time.sleep(attempt_wait) 429 attempt_wait *= 1.2 430 431 # Now grab a new port number. 432 self.port = self.get_next_port() 433 434 raise Exception( 435 "failed to create a socket to the launched debug monitor after %d tries" 436 % attempts 437 ) 438 439 def launch_process_for_attach( 440 self, inferior_args=None, sleep_seconds=3, exe_path=None 441 ): 442 # We're going to start a child process that the debug monitor stub can later attach to. 443 # This process needs to be started so that it just hangs around for a while. We'll 444 # have it sleep. 445 if not exe_path: 446 exe_path = self.getBuildArtifact("a.out") 447 448 args = [] 449 if inferior_args: 450 args.extend(inferior_args) 451 if sleep_seconds: 452 args.append("sleep:%d" % sleep_seconds) 453 454 return self.spawnSubprocess(exe_path, args) 455 456 def prep_debug_monitor_and_inferior( 457 self, 458 inferior_args=None, 459 inferior_sleep_seconds=3, 460 inferior_exe_path=None, 461 inferior_env=None, 462 ): 463 """Prep the debug monitor, the inferior, and the expected packet stream. 464 465 Handle the separate cases of using the debug monitor in attach-to-inferior mode 466 and in launch-inferior mode. 467 468 For attach-to-inferior mode, the inferior process is first started, then 469 the debug monitor is started in attach to pid mode (using --attach on the 470 stub command line), and the no-ack-mode setup is appended to the packet 471 stream. The packet stream is not yet executed, ready to have more expected 472 packet entries added to it. 473 474 For launch-inferior mode, the stub is first started, then no ack mode is 475 setup on the expected packet stream, then the verified launch packets are added 476 to the expected socket stream. The packet stream is not yet executed, ready 477 to have more expected packet entries added to it. 478 479 The return value is: 480 {inferior:<inferior>, server:<server>} 481 """ 482 inferior = None 483 attach_pid = None 484 485 if ( 486 self._inferior_startup == self._STARTUP_ATTACH 487 or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY 488 ): 489 # Launch the process that we'll use as the inferior. 490 inferior = self.launch_process_for_attach( 491 inferior_args=inferior_args, 492 sleep_seconds=inferior_sleep_seconds, 493 exe_path=inferior_exe_path, 494 ) 495 self.assertIsNotNone(inferior) 496 self.assertTrue(inferior.pid > 0) 497 if self._inferior_startup == self._STARTUP_ATTACH: 498 # In this case, we want the stub to attach via the command 499 # line, so set the command line attach pid here. 500 attach_pid = inferior.pid 501 502 if self._inferior_startup == self._STARTUP_LAUNCH: 503 # Build launch args 504 if not inferior_exe_path: 505 inferior_exe_path = self.getBuildArtifact("a.out") 506 507 if lldb.remote_platform: 508 remote_path = lldbutil.append_to_process_working_directory( 509 self, os.path.basename(inferior_exe_path) 510 ) 511 remote_file_spec = lldb.SBFileSpec(remote_path, False) 512 err = lldb.remote_platform.Install( 513 lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec 514 ) 515 if err.Fail(): 516 raise Exception( 517 "remote_platform.Install('%s', '%s') failed: %s" 518 % (inferior_exe_path, remote_path, err) 519 ) 520 inferior_exe_path = remote_path 521 522 launch_args = [inferior_exe_path] 523 if inferior_args: 524 launch_args.extend(inferior_args) 525 526 # Launch the debug monitor stub, attaching to the inferior. 527 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 528 self.assertIsNotNone(server) 529 530 self.do_handshake() 531 532 # Build the expected protocol stream 533 if inferior_env: 534 for name, value in inferior_env.items(): 535 self.add_set_environment_packets(name, value) 536 if self._inferior_startup == self._STARTUP_LAUNCH: 537 self.add_verified_launch_packets(launch_args) 538 539 return {"inferior": inferior, "server": server} 540 541 def do_handshake(self): 542 server = self._server 543 server.send_ack() 544 server.send_packet(b"QStartNoAckMode") 545 self.assertEqual(server.get_normal_packet(), b"+") 546 self.assertEqual(server.get_normal_packet(), b"OK") 547 server.send_ack() 548 549 def add_verified_launch_packets(self, launch_args): 550 self.test_sequence.add_log_lines( 551 [ 552 "read packet: %s" % build_gdbremote_A_packet(launch_args), 553 "send packet: $OK#00", 554 "read packet: $qLaunchSuccess#a5", 555 "send packet: $OK#00", 556 ], 557 True, 558 ) 559 560 def add_thread_suffix_request_packets(self): 561 self.test_sequence.add_log_lines( 562 [ 563 "read packet: $QThreadSuffixSupported#e4", 564 "send packet: $OK#00", 565 ], 566 True, 567 ) 568 569 def add_process_info_collection_packets(self): 570 self.test_sequence.add_log_lines( 571 [ 572 "read packet: $qProcessInfo#dc", 573 { 574 "direction": "send", 575 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", 576 "capture": {1: "process_info_raw"}, 577 }, 578 ], 579 True, 580 ) 581 582 def add_set_environment_packets(self, name, value): 583 self.test_sequence.add_log_lines( 584 [ 585 "read packet: $QEnvironment:" + name + "=" + value + "#00", 586 "send packet: $OK#00", 587 ], 588 True, 589 ) 590 591 _KNOWN_PROCESS_INFO_KEYS = [ 592 "pid", 593 "parent-pid", 594 "real-uid", 595 "real-gid", 596 "effective-uid", 597 "effective-gid", 598 "cputype", 599 "cpusubtype", 600 "ostype", 601 "triple", 602 "vendor", 603 "endian", 604 "elf_abi", 605 "ptrsize", 606 ] 607 608 def parse_process_info_response(self, context): 609 # Ensure we have a process info response. 610 self.assertIsNotNone(context) 611 process_info_raw = context.get("process_info_raw") 612 self.assertIsNotNone(process_info_raw) 613 614 # Pull out key:value; pairs. 615 process_info_dict = { 616 match.group(1): match.group(2) 617 for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) 618 } 619 620 # Validate keys are known. 621 for key, val in list(process_info_dict.items()): 622 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 623 self.assertIsNotNone(val) 624 625 return process_info_dict 626 627 def add_register_info_collection_packets(self): 628 self.test_sequence.add_log_lines( 629 [ 630 { 631 "type": "multi_response", 632 "query": "qRegisterInfo", 633 "append_iteration_suffix": True, 634 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 635 "save_key": "reg_info_responses", 636 } 637 ], 638 True, 639 ) 640 641 def parse_register_info_packets(self, context): 642 """Return an array of register info dictionaries, one per register info.""" 643 reg_info_responses = context.get("reg_info_responses") 644 self.assertIsNotNone(reg_info_responses) 645 646 # Parse register infos. 647 return [ 648 parse_reg_info_response(reg_info_response) 649 for reg_info_response in reg_info_responses 650 ] 651 652 def expect_gdbremote_sequence(self): 653 return expect_lldb_gdbserver_replay( 654 self, 655 self._server, 656 self.test_sequence, 657 self.DEFAULT_TIMEOUT * len(self.test_sequence), 658 self.logger, 659 ) 660 661 _KNOWN_REGINFO_KEYS = [ 662 "name", 663 "alt-name", 664 "bitsize", 665 "offset", 666 "encoding", 667 "format", 668 "set", 669 "gcc", 670 "ehframe", 671 "dwarf", 672 "generic", 673 "container-regs", 674 "invalidate-regs", 675 "dynamic_size_dwarf_expr_bytes", 676 "dynamic_size_dwarf_len", 677 ] 678 679 def assert_valid_reg_info(self, reg_info): 680 # Assert we know about all the reginfo keys parsed. 681 for key in reg_info: 682 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 683 684 # Check the bare-minimum expected set of register info keys. 685 self.assertTrue("name" in reg_info) 686 self.assertTrue("bitsize" in reg_info) 687 688 if not self.getArchitecture() == "aarch64": 689 self.assertTrue("offset" in reg_info) 690 691 self.assertTrue("encoding" in reg_info) 692 self.assertTrue("format" in reg_info) 693 694 def find_pc_reg_info(self, reg_infos): 695 lldb_reg_index = 0 696 for reg_info in reg_infos: 697 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 698 return (lldb_reg_index, reg_info) 699 lldb_reg_index += 1 700 701 return (None, None) 702 703 def add_lldb_register_index(self, reg_infos): 704 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 705 706 We'll use this when we want to call packets like P/p with a register index but do so 707 on only a subset of the full register info set. 708 """ 709 self.assertIsNotNone(reg_infos) 710 711 reg_index = 0 712 for reg_info in reg_infos: 713 reg_info["lldb_register_index"] = reg_index 714 reg_index += 1 715 716 def add_query_memory_region_packets(self, address): 717 self.test_sequence.add_log_lines( 718 [ 719 "read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 720 { 721 "direction": "send", 722 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", 723 "capture": {1: "memory_region_response"}, 724 }, 725 ], 726 True, 727 ) 728 729 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 730 self.assertIsNotNone(key_val_text) 731 kv_dict = {} 732 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 733 key = match.group(1) 734 val = match.group(2) 735 if key in kv_dict: 736 if allow_dupes: 737 if isinstance(kv_dict[key], list): 738 kv_dict[key].append(val) 739 else: 740 # Promote to list 741 kv_dict[key] = [kv_dict[key], val] 742 else: 743 self.fail( 744 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 745 key, val, key_val_text, kv_dict 746 ) 747 ) 748 else: 749 kv_dict[key] = val 750 return kv_dict 751 752 def parse_memory_region_packet(self, context): 753 # Ensure we have a context. 754 self.assertIsNotNone(context.get("memory_region_response")) 755 756 # Pull out key:value; pairs. 757 mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response")) 758 759 # Validate keys are known. 760 for key, val in list(mem_region_dict.items()): 761 self.assertIn( 762 key, 763 [ 764 "start", 765 "size", 766 "permissions", 767 "flags", 768 "name", 769 "error", 770 "dirty-pages", 771 "type", 772 ], 773 ) 774 self.assertIsNotNone(val) 775 776 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 777 # Return the dictionary of key-value pairs for the memory region. 778 return mem_region_dict 779 780 def assert_address_within_memory_region(self, test_address, mem_region_dict): 781 self.assertIsNotNone(mem_region_dict) 782 self.assertTrue("start" in mem_region_dict) 783 self.assertTrue("size" in mem_region_dict) 784 785 range_start = int(mem_region_dict["start"], 16) 786 range_size = int(mem_region_dict["size"], 16) 787 range_end = range_start + range_size 788 789 if test_address < range_start: 790 self.fail( 791 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 792 test_address, range_start, range_end, range_size 793 ) 794 ) 795 elif test_address >= range_end: 796 self.fail( 797 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 798 test_address, range_start, range_end, range_size 799 ) 800 ) 801 802 def add_threadinfo_collection_packets(self): 803 self.test_sequence.add_log_lines( 804 [ 805 { 806 "type": "multi_response", 807 "first_query": "qfThreadInfo", 808 "next_query": "qsThreadInfo", 809 "append_iteration_suffix": False, 810 "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 811 "save_key": "threadinfo_responses", 812 } 813 ], 814 True, 815 ) 816 817 def parse_threadinfo_packets(self, context): 818 """Return an array of thread ids (decimal ints), one per thread.""" 819 threadinfo_responses = context.get("threadinfo_responses") 820 self.assertIsNotNone(threadinfo_responses) 821 822 thread_ids = [] 823 for threadinfo_response in threadinfo_responses: 824 new_thread_infos = parse_threadinfo_response(threadinfo_response) 825 thread_ids.extend(new_thread_infos) 826 return thread_ids 827 828 def launch_with_threads(self, thread_count): 829 procs = self.prep_debug_monitor_and_inferior( 830 inferior_args=["thread:new"] * (thread_count - 1) + ["trap"] 831 ) 832 833 self.test_sequence.add_log_lines( 834 [ 835 "read packet: $c#00", 836 { 837 "direction": "send", 838 "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$", 839 "capture": {1: "stop_signo", 2: "stop_reply_kv"}, 840 }, 841 ], 842 True, 843 ) 844 self.add_threadinfo_collection_packets() 845 context = self.expect_gdbremote_sequence() 846 threads = self.parse_threadinfo_packets(context) 847 self.assertGreaterEqual(len(threads), thread_count) 848 return context, threads 849 850 def add_set_breakpoint_packets( 851 self, address, z_packet_type=0, do_continue=True, breakpoint_kind=1 852 ): 853 self.test_sequence.add_log_lines( 854 [ # Set the breakpoint. 855 "read packet: $Z{2},{0:x},{1}#00".format( 856 address, breakpoint_kind, z_packet_type 857 ), 858 # Verify the stub could set it. 859 "send packet: $OK#00", 860 ], 861 True, 862 ) 863 864 if do_continue: 865 self.test_sequence.add_log_lines( 866 [ # Continue the inferior. 867 "read packet: $c#63", 868 # Expect a breakpoint stop report. 869 { 870 "direction": "send", 871 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 872 "capture": {1: "stop_signo", 2: "stop_thread_id"}, 873 }, 874 ], 875 True, 876 ) 877 878 def add_remove_breakpoint_packets( 879 self, address, z_packet_type=0, breakpoint_kind=1 880 ): 881 self.test_sequence.add_log_lines( 882 [ # Remove the breakpoint. 883 "read packet: $z{2},{0:x},{1}#00".format( 884 address, breakpoint_kind, z_packet_type 885 ), 886 # Verify the stub could unset it. 887 "send packet: $OK#00", 888 ], 889 True, 890 ) 891 892 def add_qSupported_packets(self, client_features=[]): 893 features = "".join(";" + x for x in client_features) 894 self.test_sequence.add_log_lines( 895 [ 896 "read packet: $qSupported{}#00".format(features), 897 { 898 "direction": "send", 899 "regex": r"^\$(.*)#[0-9a-fA-F]{2}", 900 "capture": {1: "qSupported_response"}, 901 }, 902 ], 903 True, 904 ) 905 906 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 907 "augmented-libraries-svr4-read", 908 "PacketSize", 909 "QStartNoAckMode", 910 "QThreadSuffixSupported", 911 "QListThreadsInStopReply", 912 "qXfer:auxv:read", 913 "qXfer:libraries:read", 914 "qXfer:libraries-svr4:read", 915 "qXfer:features:read", 916 "qXfer:siginfo:read", 917 "qEcho", 918 "QPassSignals", 919 "multiprocess", 920 "fork-events", 921 "vfork-events", 922 "memory-tagging", 923 "qSaveCore", 924 "native-signals", 925 "QNonStop", 926 "SupportedWatchpointTypes", 927 "SupportedCompressions", 928 ] 929 930 def parse_qSupported_response(self, context): 931 self.assertIsNotNone(context) 932 933 raw_response = context.get("qSupported_response") 934 self.assertIsNotNone(raw_response) 935 936 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 937 # +,-,? is stripped from the key and set as the value. 938 supported_dict = {} 939 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 940 key = match.group(1) 941 val = match.group(3) 942 943 # key=val: store as is 944 if val and len(val) > 0: 945 supported_dict[key] = val 946 else: 947 if len(key) < 2: 948 raise Exception( 949 "singular stub feature is too short: must be stub_feature{+,-,?}" 950 ) 951 supported_type = key[-1] 952 key = key[:-1] 953 if not supported_type in ["+", "-", "?"]: 954 raise Exception( 955 "malformed stub feature: final character {} not in expected set (+,-,?)".format( 956 supported_type 957 ) 958 ) 959 supported_dict[key] = supported_type 960 # Ensure we know the supported element 961 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 962 raise Exception("unknown qSupported stub feature reported: %s" % key) 963 964 return supported_dict 965 966 def continue_process_and_wait_for_stop(self): 967 self.test_sequence.add_log_lines( 968 [ 969 "read packet: $vCont;c#a8", 970 { 971 "direction": "send", 972 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 973 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 974 }, 975 ], 976 True, 977 ) 978 context = self.expect_gdbremote_sequence() 979 self.assertIsNotNone(context) 980 return self.parse_interrupt_packets(context) 981 982 def select_modifiable_register(self, reg_infos): 983 """Find a register that can be read/written freely.""" 984 PREFERRED_REGISTER_NAMES = set( 985 [ 986 "rax", 987 ] 988 ) 989 990 # First check for the first register from the preferred register name 991 # set. 992 alternative_register_index = None 993 994 self.assertIsNotNone(reg_infos) 995 for reg_info in reg_infos: 996 if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES): 997 # We found a preferred register. Use it. 998 return reg_info["lldb_register_index"] 999 if ("generic" in reg_info) and ( 1000 reg_info["generic"] == "fp" or reg_info["generic"] == "arg1" 1001 ): 1002 # A frame pointer or first arg register will do as a 1003 # register to modify temporarily. 1004 alternative_register_index = reg_info["lldb_register_index"] 1005 1006 # We didn't find a preferred register. Return whatever alternative register 1007 # we found, if any. 1008 return alternative_register_index 1009 1010 def extract_registers_from_stop_notification(self, stop_key_vals_text): 1011 self.assertIsNotNone(stop_key_vals_text) 1012 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 1013 1014 registers = {} 1015 for key, val in list(kv_dict.items()): 1016 if re.match(r"^[0-9a-fA-F]+$", key): 1017 registers[int(key, 16)] = val 1018 return registers 1019 1020 def gather_register_infos(self): 1021 self.reset_test_sequence() 1022 self.add_register_info_collection_packets() 1023 1024 context = self.expect_gdbremote_sequence() 1025 self.assertIsNotNone(context) 1026 1027 reg_infos = self.parse_register_info_packets(context) 1028 self.assertIsNotNone(reg_infos) 1029 self.add_lldb_register_index(reg_infos) 1030 1031 return reg_infos 1032 1033 def find_generic_register_with_name(self, reg_infos, generic_name): 1034 self.assertIsNotNone(reg_infos) 1035 for reg_info in reg_infos: 1036 if ("generic" in reg_info) and (reg_info["generic"] == generic_name): 1037 return reg_info 1038 return None 1039 1040 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 1041 self.assertIsNotNone(reg_infos) 1042 for reg_info in reg_infos: 1043 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 1044 return reg_info 1045 return None 1046 1047 def decode_gdbremote_binary(self, encoded_bytes): 1048 decoded_bytes = "" 1049 i = 0 1050 while i < len(encoded_bytes): 1051 if encoded_bytes[i] == "}": 1052 # Handle escaped char. 1053 self.assertTrue(i + 1 < len(encoded_bytes)) 1054 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 1055 i += 2 1056 elif encoded_bytes[i] == "*": 1057 # Handle run length encoding. 1058 self.assertTrue(len(decoded_bytes) > 0) 1059 self.assertTrue(i + 1 < len(encoded_bytes)) 1060 repeat_count = ord(encoded_bytes[i + 1]) - 29 1061 decoded_bytes += decoded_bytes[-1] * repeat_count 1062 i += 2 1063 else: 1064 decoded_bytes += encoded_bytes[i] 1065 i += 1 1066 return decoded_bytes 1067 1068 def build_auxv_dict(self, endian, word_size, auxv_data): 1069 self.assertIsNotNone(endian) 1070 self.assertIsNotNone(word_size) 1071 self.assertIsNotNone(auxv_data) 1072 1073 auxv_dict = {} 1074 1075 # PowerPC64le's auxvec has a special key that must be ignored. 1076 # This special key may be used multiple times, resulting in 1077 # multiple key/value pairs with the same key, which would otherwise 1078 # break this test check for repeated keys. 1079 # 1080 # AT_IGNOREPPC = 22 1081 ignored_keys_for_arch = {"powerpc64le": [22]} 1082 arch = self.getArchitecture() 1083 ignore_keys = None 1084 if arch in ignored_keys_for_arch: 1085 ignore_keys = ignored_keys_for_arch[arch] 1086 1087 while len(auxv_data) > 0: 1088 # Chop off key. 1089 raw_key = auxv_data[:word_size] 1090 auxv_data = auxv_data[word_size:] 1091 1092 # Chop of value. 1093 raw_value = auxv_data[:word_size] 1094 auxv_data = auxv_data[word_size:] 1095 1096 # Convert raw text from target endian. 1097 key = unpack_endian_binary_string(endian, raw_key) 1098 value = unpack_endian_binary_string(endian, raw_value) 1099 1100 if ignore_keys and key in ignore_keys: 1101 continue 1102 1103 # Handle ending entry. 1104 if key == 0: 1105 self.assertEqual(value, 0) 1106 return auxv_dict 1107 1108 # The key should not already be present. 1109 self.assertFalse(key in auxv_dict) 1110 auxv_dict[key] = value 1111 1112 self.fail( 1113 "should not reach here - implies required double zero entry not found" 1114 ) 1115 return auxv_dict 1116 1117 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1118 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1119 offset = 0 1120 done = False 1121 decoded_data = "" 1122 1123 while not done: 1124 # Grab the next iteration of data. 1125 self.reset_test_sequence() 1126 self.test_sequence.add_log_lines( 1127 [ 1128 "read packet: ${}{:x},{:x}:#00".format( 1129 command_prefix, offset, chunk_length 1130 ), 1131 { 1132 "direction": "send", 1133 "regex": re.compile( 1134 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE | re.DOTALL 1135 ), 1136 "capture": {1: "response_type", 2: "content_raw"}, 1137 }, 1138 ], 1139 True, 1140 ) 1141 1142 context = self.expect_gdbremote_sequence() 1143 self.assertIsNotNone(context) 1144 1145 response_type = context.get("response_type") 1146 self.assertIsNotNone(response_type) 1147 self.assertTrue(response_type in ["l", "m"]) 1148 1149 # Move offset along. 1150 offset += chunk_length 1151 1152 # Figure out if we're done. We're done if the response type is l. 1153 done = response_type == "l" 1154 1155 # Decode binary data. 1156 content_raw = context.get("content_raw") 1157 if content_raw and len(content_raw) > 0: 1158 self.assertIsNotNone(content_raw) 1159 decoded_data += self.decode_gdbremote_binary(content_raw) 1160 return decoded_data 1161 1162 def add_interrupt_packets(self): 1163 self.test_sequence.add_log_lines( 1164 [ 1165 # Send the intterupt. 1166 "read packet: {}".format(chr(3)), 1167 # And wait for the stop notification. 1168 { 1169 "direction": "send", 1170 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1171 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 1172 }, 1173 ], 1174 True, 1175 ) 1176 1177 def parse_interrupt_packets(self, context): 1178 self.assertIsNotNone(context.get("stop_signo")) 1179 self.assertIsNotNone(context.get("stop_key_val_text")) 1180 return ( 1181 int(context["stop_signo"], 16), 1182 self.parse_key_val_dict(context["stop_key_val_text"]), 1183 ) 1184 1185 def add_QSaveRegisterState_packets(self, thread_id): 1186 if thread_id: 1187 # Use the thread suffix form. 1188 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1189 thread_id 1190 ) 1191 else: 1192 request = "read packet: $QSaveRegisterState#00" 1193 1194 self.test_sequence.add_log_lines( 1195 [ 1196 request, 1197 { 1198 "direction": "send", 1199 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1200 "capture": {1: "save_response"}, 1201 }, 1202 ], 1203 True, 1204 ) 1205 1206 def parse_QSaveRegisterState_response(self, context): 1207 self.assertIsNotNone(context) 1208 1209 save_response = context.get("save_response") 1210 self.assertIsNotNone(save_response) 1211 1212 if len(save_response) < 1 or save_response[0] == "E": 1213 # error received 1214 return (False, None) 1215 else: 1216 return (True, int(save_response)) 1217 1218 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1219 if thread_id: 1220 # Use the thread suffix form. 1221 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1222 save_id, thread_id 1223 ) 1224 else: 1225 request = "read packet: $QRestoreRegisterState:{}#00".format(save_id) 1226 1227 self.test_sequence.add_log_lines([request, "send packet: $OK#00"], True) 1228 1229 def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None): 1230 self.assertIsNotNone(reg_infos) 1231 1232 successful_writes = 0 1233 failed_writes = 0 1234 1235 for reg_info in reg_infos: 1236 # Use the lldb register index added to the reg info. We're not necessarily 1237 # working off a full set of register infos, so an inferred register 1238 # index could be wrong. 1239 reg_index = reg_info["lldb_register_index"] 1240 self.assertIsNotNone(reg_index) 1241 1242 reg_byte_size = int(reg_info["bitsize"]) // 8 1243 self.assertTrue(reg_byte_size > 0) 1244 1245 # Handle thread suffix. 1246 if thread_id: 1247 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1248 reg_index, thread_id 1249 ) 1250 else: 1251 p_request = "read packet: $p{:x}#00".format(reg_index) 1252 1253 # Read the existing value. 1254 self.reset_test_sequence() 1255 self.test_sequence.add_log_lines( 1256 [ 1257 p_request, 1258 { 1259 "direction": "send", 1260 "regex": r"^\$([0-9a-fA-F]+)#", 1261 "capture": {1: "p_response"}, 1262 }, 1263 ], 1264 True, 1265 ) 1266 context = self.expect_gdbremote_sequence() 1267 self.assertIsNotNone(context) 1268 1269 # Verify the response length. 1270 p_response = context.get("p_response") 1271 self.assertIsNotNone(p_response) 1272 initial_reg_value = unpack_register_hex_unsigned(endian, p_response) 1273 1274 # Flip the value by xoring with all 1s 1275 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1276 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1277 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1278 1279 # Handle thread suffix for P. 1280 if thread_id: 1281 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1282 reg_index, 1283 pack_register_hex( 1284 endian, flipped_bits_int, byte_size=reg_byte_size 1285 ), 1286 thread_id, 1287 ) 1288 else: 1289 P_request = "read packet: $P{:x}={}#00".format( 1290 reg_index, 1291 pack_register_hex( 1292 endian, flipped_bits_int, byte_size=reg_byte_size 1293 ), 1294 ) 1295 1296 # Write the flipped value to the register. 1297 self.reset_test_sequence() 1298 self.test_sequence.add_log_lines( 1299 [ 1300 P_request, 1301 { 1302 "direction": "send", 1303 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1304 "capture": {1: "P_response"}, 1305 }, 1306 ], 1307 True, 1308 ) 1309 context = self.expect_gdbremote_sequence() 1310 self.assertIsNotNone(context) 1311 1312 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1313 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1314 # all flipping perfectly. 1315 P_response = context.get("P_response") 1316 self.assertIsNotNone(P_response) 1317 if P_response == "OK": 1318 successful_writes += 1 1319 else: 1320 failed_writes += 1 1321 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1322 1323 # Read back the register value, ensure it matches the flipped 1324 # value. 1325 if P_response == "OK": 1326 self.reset_test_sequence() 1327 self.test_sequence.add_log_lines( 1328 [ 1329 p_request, 1330 { 1331 "direction": "send", 1332 "regex": r"^\$([0-9a-fA-F]+)#", 1333 "capture": {1: "p_response"}, 1334 }, 1335 ], 1336 True, 1337 ) 1338 context = self.expect_gdbremote_sequence() 1339 self.assertIsNotNone(context) 1340 1341 verify_p_response_raw = context.get("p_response") 1342 self.assertIsNotNone(verify_p_response_raw) 1343 verify_bits = unpack_register_hex_unsigned( 1344 endian, verify_p_response_raw 1345 ) 1346 1347 if verify_bits != flipped_bits_int: 1348 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1349 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1350 successful_writes -= 1 1351 failed_writes += 1 1352 1353 return (successful_writes, failed_writes) 1354 1355 def is_bit_flippable_register(self, reg_info): 1356 if not reg_info: 1357 return False 1358 if not "set" in reg_info: 1359 return False 1360 if reg_info["set"] != "General Purpose Registers": 1361 return False 1362 if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0): 1363 # Don't try to bit flip registers contained in another register. 1364 return False 1365 if re.match("^.s$", reg_info["name"]): 1366 # This is a 2-letter register name that ends in "s", like a segment register. 1367 # Don't try to bit flip these. 1368 return False 1369 if re.match("^(c|)psr$", reg_info["name"]): 1370 # This is an ARM program status register; don't flip it. 1371 return False 1372 # Okay, this looks fine-enough. 1373 return True 1374 1375 def read_register_values(self, reg_infos, endian, thread_id=None): 1376 self.assertIsNotNone(reg_infos) 1377 values = {} 1378 1379 for reg_info in reg_infos: 1380 # We append a register index when load reg infos so we can work 1381 # with subsets. 1382 reg_index = reg_info.get("lldb_register_index") 1383 self.assertIsNotNone(reg_index) 1384 1385 # Handle thread suffix. 1386 if thread_id: 1387 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1388 reg_index, thread_id 1389 ) 1390 else: 1391 p_request = "read packet: $p{:x}#00".format(reg_index) 1392 1393 # Read it with p. 1394 self.reset_test_sequence() 1395 self.test_sequence.add_log_lines( 1396 [ 1397 p_request, 1398 { 1399 "direction": "send", 1400 "regex": r"^\$([0-9a-fA-F]+)#", 1401 "capture": {1: "p_response"}, 1402 }, 1403 ], 1404 True, 1405 ) 1406 context = self.expect_gdbremote_sequence() 1407 self.assertIsNotNone(context) 1408 1409 # Convert value from target endian to integral. 1410 p_response = context.get("p_response") 1411 self.assertIsNotNone(p_response) 1412 self.assertTrue(len(p_response) > 0) 1413 1414 # on x86 Darwin, 4 GPR registers are often 1415 # unavailable, this is expected and correct. 1416 if ( 1417 self.getArchitecture() == "x86_64" 1418 and self.platformIsDarwin() 1419 and p_response[0] == "E" 1420 ): 1421 values[reg_index] = 0 1422 else: 1423 self.assertFalse(p_response[0] == "E") 1424 1425 values[reg_index] = unpack_register_hex_unsigned(endian, p_response) 1426 1427 return values 1428 1429 def add_vCont_query_packets(self): 1430 self.test_sequence.add_log_lines( 1431 [ 1432 "read packet: $vCont?#49", 1433 { 1434 "direction": "send", 1435 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1436 "capture": {2: "vCont_query_response"}, 1437 }, 1438 ], 1439 True, 1440 ) 1441 1442 def parse_vCont_query_response(self, context): 1443 self.assertIsNotNone(context) 1444 vCont_query_response = context.get("vCont_query_response") 1445 1446 # Handle case of no vCont support at all - in which case the capture 1447 # group will be none or zero length. 1448 if not vCont_query_response or len(vCont_query_response) == 0: 1449 return {} 1450 1451 return { 1452 key: 1 for key in vCont_query_response.split(";") if key and len(key) > 0 1453 } 1454 1455 def count_single_steps_until_true( 1456 self, 1457 thread_id, 1458 predicate, 1459 args, 1460 max_step_count=100, 1461 use_Hc_packet=True, 1462 step_instruction="s", 1463 ): 1464 """Used by single step test that appears in a few different contexts.""" 1465 single_step_count = 0 1466 1467 while single_step_count < max_step_count: 1468 self.assertIsNotNone(thread_id) 1469 1470 # Build the packet for the single step instruction. We replace 1471 # {thread}, if present, with the thread_id. 1472 step_packet = "read packet: ${}#00".format( 1473 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction) 1474 ) 1475 # print("\nstep_packet created: {}\n".format(step_packet)) 1476 1477 # Single step. 1478 self.reset_test_sequence() 1479 if use_Hc_packet: 1480 self.test_sequence.add_log_lines( 1481 [ # Set the continue thread. 1482 "read packet: $Hc{0:x}#00".format(thread_id), 1483 "send packet: $OK#00", 1484 ], 1485 True, 1486 ) 1487 self.test_sequence.add_log_lines( 1488 [ 1489 # Single step. 1490 step_packet, 1491 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1492 # Expect a breakpoint stop report. 1493 { 1494 "direction": "send", 1495 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1496 "capture": {1: "stop_signo", 2: "stop_thread_id"}, 1497 }, 1498 ], 1499 True, 1500 ) 1501 context = self.expect_gdbremote_sequence() 1502 self.assertIsNotNone(context) 1503 self.assertIsNotNone(context.get("stop_signo")) 1504 self.assertEqual( 1505 int(context.get("stop_signo"), 16), 1506 lldbutil.get_signal_number("SIGTRAP"), 1507 ) 1508 1509 single_step_count += 1 1510 1511 # See if the predicate is true. If so, we're done. 1512 if predicate(args): 1513 return (True, single_step_count) 1514 1515 # The predicate didn't return true within the runaway step count. 1516 return (False, single_step_count) 1517 1518 def g_c1_c2_contents_are(self, args): 1519 """Used by single step test that appears in a few different contexts.""" 1520 g_c1_address = args["g_c1_address"] 1521 g_c2_address = args["g_c2_address"] 1522 expected_g_c1 = args["expected_g_c1"] 1523 expected_g_c2 = args["expected_g_c2"] 1524 1525 # Read g_c1 and g_c2 contents. 1526 self.reset_test_sequence() 1527 self.test_sequence.add_log_lines( 1528 [ 1529 "read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1530 { 1531 "direction": "send", 1532 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", 1533 "capture": {1: "g_c1_contents"}, 1534 }, 1535 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1536 { 1537 "direction": "send", 1538 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", 1539 "capture": {1: "g_c2_contents"}, 1540 }, 1541 ], 1542 True, 1543 ) 1544 1545 # Run the packet stream. 1546 context = self.expect_gdbremote_sequence() 1547 self.assertIsNotNone(context) 1548 1549 # Check if what we read from inferior memory is what we are expecting. 1550 self.assertIsNotNone(context.get("g_c1_contents")) 1551 self.assertIsNotNone(context.get("g_c2_contents")) 1552 1553 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1554 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2 1555 ) 1556 1557 def single_step_only_steps_one_instruction( 1558 self, use_Hc_packet=True, step_instruction="s" 1559 ): 1560 """Used by single step test that appears in a few different contexts.""" 1561 # Start up the inferior. 1562 procs = self.prep_debug_monitor_and_inferior( 1563 inferior_args=[ 1564 "get-code-address-hex:swap_chars", 1565 "get-data-address-hex:g_c1", 1566 "get-data-address-hex:g_c2", 1567 "sleep:1", 1568 "call-function:swap_chars", 1569 "sleep:5", 1570 ] 1571 ) 1572 1573 # Run the process 1574 self.test_sequence.add_log_lines( 1575 [ # Start running after initial stop. 1576 "read packet: $c#63", 1577 # Match output line that prints the memory address of the function call entry point. 1578 # Note we require launch-only testing so we can get inferior otuput. 1579 { 1580 "type": "output_match", 1581 "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1582 "capture": { 1583 1: "function_address", 1584 2: "g_c1_address", 1585 3: "g_c2_address", 1586 }, 1587 }, 1588 # Now stop the inferior. 1589 "read packet: {}".format(chr(3)), 1590 # And wait for the stop notification. 1591 { 1592 "direction": "send", 1593 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1594 "capture": {1: "stop_signo", 2: "stop_thread_id"}, 1595 }, 1596 ], 1597 True, 1598 ) 1599 1600 # Run the packet stream. 1601 context = self.expect_gdbremote_sequence() 1602 self.assertIsNotNone(context) 1603 1604 # Grab the main thread id. 1605 self.assertIsNotNone(context.get("stop_thread_id")) 1606 main_thread_id = int(context.get("stop_thread_id"), 16) 1607 1608 # Grab the function address. 1609 self.assertIsNotNone(context.get("function_address")) 1610 function_address = int(context.get("function_address"), 16) 1611 1612 # Grab the data addresses. 1613 self.assertIsNotNone(context.get("g_c1_address")) 1614 g_c1_address = int(context.get("g_c1_address"), 16) 1615 1616 self.assertIsNotNone(context.get("g_c2_address")) 1617 g_c2_address = int(context.get("g_c2_address"), 16) 1618 1619 # Set a breakpoint at the given address. 1620 if self.getArchitecture().startswith("arm"): 1621 # TODO: Handle case when setting breakpoint in thumb code 1622 BREAKPOINT_KIND = 4 1623 else: 1624 BREAKPOINT_KIND = 1 1625 self.reset_test_sequence() 1626 self.add_set_breakpoint_packets( 1627 function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND 1628 ) 1629 context = self.expect_gdbremote_sequence() 1630 self.assertIsNotNone(context) 1631 1632 # Remove the breakpoint. 1633 self.reset_test_sequence() 1634 self.add_remove_breakpoint_packets( 1635 function_address, breakpoint_kind=BREAKPOINT_KIND 1636 ) 1637 context = self.expect_gdbremote_sequence() 1638 self.assertIsNotNone(context) 1639 1640 # Verify g_c1 and g_c2 match expected initial state. 1641 args = {} 1642 args["g_c1_address"] = g_c1_address 1643 args["g_c2_address"] = g_c2_address 1644 args["expected_g_c1"] = "0" 1645 args["expected_g_c2"] = "1" 1646 1647 self.assertTrue(self.g_c1_c2_contents_are(args)) 1648 1649 # Verify we take only a small number of steps to hit the first state. 1650 # Might need to work through function entry prologue code. 1651 args["expected_g_c1"] = "1" 1652 args["expected_g_c2"] = "1" 1653 (state_reached, step_count) = self.count_single_steps_until_true( 1654 main_thread_id, 1655 self.g_c1_c2_contents_are, 1656 args, 1657 max_step_count=25, 1658 use_Hc_packet=use_Hc_packet, 1659 step_instruction=step_instruction, 1660 ) 1661 self.assertTrue(state_reached) 1662 1663 # Verify we hit the next state. 1664 args["expected_g_c1"] = "1" 1665 args["expected_g_c2"] = "0" 1666 (state_reached, step_count) = self.count_single_steps_until_true( 1667 main_thread_id, 1668 self.g_c1_c2_contents_are, 1669 args, 1670 max_step_count=5, 1671 use_Hc_packet=use_Hc_packet, 1672 step_instruction=step_instruction, 1673 ) 1674 self.assertTrue(state_reached) 1675 expected_step_count = 1 1676 arch = self.getArchitecture() 1677 1678 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1679 # of variable value 1680 if re.match("mips", arch): 1681 expected_step_count = 3 1682 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1683 # variable value 1684 if re.match("s390x", arch): 1685 expected_step_count = 2 1686 # ARM64 requires "4" instructions: 2 to compute the address (adrp, 1687 # add), one to materialize the constant (mov) and the store. Once 1688 # addresses and constants are materialized, only one instruction is 1689 # needed. 1690 if re.match("arm64", arch): 1691 before_materialization_step_count = 4 1692 after_matrialization_step_count = 1 1693 self.assertIn( 1694 step_count, 1695 [before_materialization_step_count, after_matrialization_step_count], 1696 ) 1697 expected_step_count = after_matrialization_step_count 1698 else: 1699 self.assertEqual(step_count, expected_step_count) 1700 1701 # Verify we hit the next state. 1702 args["expected_g_c1"] = "0" 1703 args["expected_g_c2"] = "0" 1704 (state_reached, step_count) = self.count_single_steps_until_true( 1705 main_thread_id, 1706 self.g_c1_c2_contents_are, 1707 args, 1708 max_step_count=5, 1709 use_Hc_packet=use_Hc_packet, 1710 step_instruction=step_instruction, 1711 ) 1712 self.assertTrue(state_reached) 1713 self.assertEqual(step_count, expected_step_count) 1714 1715 # Verify we hit the next state. 1716 args["expected_g_c1"] = "0" 1717 args["expected_g_c2"] = "1" 1718 (state_reached, step_count) = self.count_single_steps_until_true( 1719 main_thread_id, 1720 self.g_c1_c2_contents_are, 1721 args, 1722 max_step_count=5, 1723 use_Hc_packet=use_Hc_packet, 1724 step_instruction=step_instruction, 1725 ) 1726 self.assertTrue(state_reached) 1727 self.assertEqual(step_count, expected_step_count) 1728 1729 def maybe_strict_output_regex(self, regex): 1730 return ( 1731 ".*" + regex + ".*" 1732 if lldbplatformutil.hasChattyStderr(self) 1733 else "^" + regex + "$" 1734 ) 1735 1736 def install_and_create_launch_args(self): 1737 exe_path = self.getBuildArtifact("a.out") 1738 if not lldb.remote_platform: 1739 return [exe_path] 1740 remote_path = lldbutil.append_to_process_working_directory( 1741 self, os.path.basename(exe_path) 1742 ) 1743 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1744 err = lldb.remote_platform.Install( 1745 lldb.SBFileSpec(exe_path, True), remote_file_spec 1746 ) 1747 if err.Fail(): 1748 raise Exception( 1749 "remote_platform.Install('%s', '%s') failed: %s" 1750 % (exe_path, remote_path, err) 1751 ) 1752 return [remote_path] 1753