xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py (revision 5056a4b556077da79afe34f54b5447c19a77d97d)
1"""
2Base class for gdb-remote test cases.
3"""
4
5import errno
6import os
7import os.path
8import random
9import re
10import select
11import socket
12import subprocess
13import sys
14import tempfile
15import time
16from lldbsuite.test import configuration
17from lldbsuite.test.lldbtest import *
18from lldbsuite.support import seven
19from lldbgdbserverutils import *
20import logging
21
22
class _ConnectionRefused(IOError):
    """Raised when a connection to the debug monitor stub was refused or
    immediately dropped, signalling callers to retry the connection."""

    pass
25
26
class GdbRemoteTestCaseFactory(type):
    """Metaclass that expands each test into per-debug-server variants.

    Every attribute whose name starts with "test" is cloned once per debug
    server category ("debugserver"/"llgs"); each clone is tagged with its
    category, given a matching name suffix, and stamped with a
    ``debug_server`` attribute. Methods that explicitly declare debug server
    categories only get variants for those categories.
    """

    def __new__(cls, name, bases, attrs):
        new_attrs = {}
        for attr_name, attr_value in attrs.items():
            # Non-test attributes pass through untouched.
            if not attr_name.startswith("test"):
                new_attrs[attr_name] = attr_value
                continue

            # If any debug server categories were explicitly tagged, treat
            # that list as authoritative; otherwise expand to all of them.
            all_categories = {"debugserver", "llgs"}
            categories = set(getattr(attr_value, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:

                @decorators.add_test_categories([cat])
                @wraps(attr_value)
                def test_method(self, attrvalue=attr_value):
                    # The default argument binds the original method now,
                    # avoiding the late-binding closure pitfall in this loop.
                    return attrvalue(self)

                variant_name = attr_name + "_" + cat
                test_method.__name__ = variant_name
                test_method.debug_server = cat
                new_attrs[variant_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(cls, name, bases, new_attrs)
56
57
class GdbRemoteTestCaseBase(Base, metaclass=GdbRemoteTestCaseFactory):
    """Base class for gdb-remote protocol tests run against either the
    debugserver or lldb-server (llgs) stub."""

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT = 20 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP = 5 * (2 if ("ASAN_OPTIONS" in os.environ) else 1)

    # Raw 'k' (kill) packet, checksum included; sent when tearing down the
    # stub socket so the server can shut down gracefully.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose FileHandler; installed by setUp() when verbose
    # logging is requested, removed again in tearDown().
    _verbose_log_handler = None
    # Formatter shared by the stderr handler and the verbose file handler.
    _log_formatter = logging.Formatter(fmt="%(asctime)-15s %(levelname)-8s %(message)s")
87
88    def setUpBaseLogging(self):
89        self.logger = logging.getLogger(__name__)
90
91        if len(self.logger.handlers) > 0:
92            return  # We have set up this handler already
93
94        self.logger.propagate = False
95        self.logger.setLevel(logging.DEBUG)
96
97        # log all warnings to stderr
98        handler = logging.StreamHandler()
99        handler.setLevel(logging.WARNING)
100        handler.setFormatter(self._log_formatter)
101        self.logger.addHandler(handler)
102
103    def isVerboseLoggingRequested(self):
104        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
105        # logged.
106        return any(("gdb-remote" in channel) for channel in lldbtest_config.channels)
107
108    def getDebugServer(self):
109        method = getattr(self, self.testMethodName)
110        return getattr(method, "debug_server", None)
111
    def setUp(self):
        """Per-test setup: logging, packet sequence, port selection, platform
        URL parsing, and debug-server-specific initialization."""
        super(GdbRemoteTestCaseBase, self).setUp()

        self.setUpBaseLogging()
        self.debug_monitor_extra_args = []

        if self.isVerboseLoggingRequested():
            # If requested, full logs go to a log file
            self._verbose_log_handler = logging.FileHandler(
                self.getLogBasenameForCurrentTest() + "-host.log"
            )
            self._verbose_log_handler.setFormatter(self._log_formatter)
            self._verbose_log_handler.setLevel(logging.DEBUG)
            self.logger.addHandler(self._verbose_log_handler)

        self.test_sequence = GdbRemoteTestSequence(self.logger)
        # Launch mode is the default; tests may switch to attach modes.
        self.set_inferior_startup_launch()
        self.port = self.get_next_port()
        self.stub_sends_two_stop_notifications_on_kill = False
        if configuration.lldb_platform_url:
            # Extract scheme and host from the platform URL; unix-* schemes
            # carry a path component rather than host:port.
            if configuration.lldb_platform_url.startswith("unix-"):
                url_pattern = r"(.+)://\[?(.+?)\]?/.*"
            else:
                url_pattern = r"(.+)://(.+):\d+"
            scheme, host = re.match(
                url_pattern, configuration.lldb_platform_url
            ).groups()
            if (
                configuration.lldb_platform_name == "remote-android"
                and host != "localhost"
            ):
                # adb port forwarding makes the device stub reachable via
                # localhost; remember the device id for the forward.
                self.stub_device = host
                self.stub_hostname = "localhost"
            else:
                self.stub_device = None
                self.stub_hostname = host
        else:
            self.stub_hostname = "localhost"

        # Dispatch to the init routine matching the category the factory
        # metaclass stamped onto this test method (llgs is the fallback).
        debug_server = self.getDebugServer()
        if debug_server == "debugserver":
            self._init_debugserver_test()
        else:
            self._init_llgs_test()
156
    def tearDown(self):
        """Remove the per-test verbose log handler, then run base teardown."""
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        # NOTE(review): setUp() chains through super(), but this calls
        # TestBase.tearDown directly — confirm the asymmetry is intended.
        TestBase.tearDown(self)
161
162    def getLocalServerLogFile(self):
163        return self.getLogBasenameForCurrentTest() + "-server.log"
164
165    def setUpServerLogging(self, is_llgs):
166        if len(lldbtest_config.channels) == 0:
167            return  # No logging requested
168
169        if lldb.remote_platform:
170            log_file = lldbutil.join_remote_paths(
171                lldb.remote_platform.GetWorkingDirectory(), "server.log"
172            )
173        else:
174            log_file = self.getLocalServerLogFile()
175
176        if is_llgs:
177            self.debug_monitor_extra_args.append("--log-file=" + log_file)
178            self.debug_monitor_extra_args.append(
179                "--log-channels={}".format(":".join(lldbtest_config.channels))
180            )
181        else:
182            self.debug_monitor_extra_args = [
183                "--log-file=" + log_file,
184                "--log-flags=0x800000",
185            ]
186
187    def get_next_port(self):
188        return 12000 + random.randint(0, 7999)
189
190    def reset_test_sequence(self):
191        self.test_sequence = GdbRemoteTestSequence(self.logger)
192
    def _init_llgs_test(self):
        """Configure this test to run against lldb-server gdbserver.

        Locally the lldb-server binary is located on the host and a reverse
        connection is used. On a remote platform the server binary is
        discovered through /proc (linux-only) and a forward connection is
        used instead.
        """
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            # FIXME: This is extremely linux-oriented

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d"
                % (err.GetCString(), retcode),
            )

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            # Resolve the parent process's executable via its /proc exe
            # symlink — presumably the platform's lldb-server binary, which
            # is then reused as the debug stub. TODO confirm.
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid
            )
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d"
                % (pid, err.GetCString(), retcode),
            )
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r" \(deleted\)$", "", exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()

        # lldb-server runs in gdbserver mode for these tests.
        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect
231
232    def _init_debugserver_test(self):
233        self.debug_monitor_exe = get_debugserver_exe()
234        self.setUpServerLogging(is_llgs=False)
235        self.reverse_connect = True
236
237        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
238        # when the process truly dies.
239        self.stub_sends_two_stop_notifications_on_kill = True
240
241    def forward_adb_port(self, source, target, direction, device):
242        adb = ["adb"] + (["-s", device] if device else []) + [direction]
243
244        def remove_port_forward():
245            subprocess.call(adb + ["--remove", "tcp:%d" % source])
246
247        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
248        self.addTearDownHook(remove_port_forward)
249
250    def _verify_socket(self, sock):
251        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
252        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
253        # the connect() will always be successful, but the connection will be immediately dropped
254        # if ADB could not connect on the remote side. This function tries to detect this
255        # situation, and report it as "connection refused" so that the upper layers attempt the
256        # connection again.
257        triple = self.dbg.GetSelectedPlatform().GetTriple()
258        if not re.match(".*-.*-.*-android", triple):
259            return  # Not android.
260        can_read, _, _ = select.select([sock], [], [], 0.1)
261        if sock not in can_read:
262            return  # Data is not available, but the connection is alive.
263        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
264            raise _ConnectionRefused()  # Got EOF, connection dropped.
265
    def create_socket(self):
        """Open a TCP connection to the debug monitor stub and return it.

        Prefers IPv4 with a fall-back to IPv6 when AF_INET is unsupported.
        On android targets the stub port is first forwarded through adb.
        Raises _ConnectionRefused when the stub is not (yet) accepting
        connections so callers can retry. Registers a teardown hook that
        sends the 'k' packet and closes the socket.
        """
        try:
            sock = socket.socket(family=socket.AF_INET)
        except OSError as e:
            if e.errno != errno.EAFNOSUPPORT:
                raise
            # IPv4 unavailable on this host; retry with IPv6.
            sock = socket.socket(family=socket.AF_INET6)

        logger = self.logger

        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            # Make the device-side stub port reachable from the host.
            self.forward_adb_port(self.port, self.port, "forward", self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d", self.stub_hostname, self.port
        )
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            if serr.errno == errno.ECONNREFUSED:
                # Normalize to our own exception type so callers can retry.
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            # Best-effort shutdown: failures are logged, never raised.
            if sock:
                try:
                    # send the kill packet so lldb-server shuts down gracefully
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]
                        )
                    )

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]
                        )
                    )

        self.addTearDownHook(shutdown_socket)

        # Catch the android "accepted then dropped" case before handing the
        # socket back (raises _ConnectionRefused on EOF).
        self._verify_socket(sock)

        return sock
317
318    def set_inferior_startup_launch(self):
319        self._inferior_startup = self._STARTUP_LAUNCH
320
321    def set_inferior_startup_attach(self):
322        self._inferior_startup = self._STARTUP_ATTACH
323
324    def set_inferior_startup_attach_manually(self):
325        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
326
327    def get_debug_monitor_command_line_args(self, attach_pid=None):
328        commandline_args = self.debug_monitor_extra_args
329        if attach_pid:
330            commandline_args += ["--attach=%d" % attach_pid]
331        if self.reverse_connect:
332            commandline_args += ["--reverse-connect", self.connect_address]
333        else:
334            if lldb.remote_platform:
335                commandline_args += ["*:{}".format(self.port)]
336            else:
337                commandline_args += ["localhost:{}".format(self.port)]
338
339        return commandline_args
340
341    def get_target_byte_order(self):
342        inferior_exe_path = self.getBuildArtifact("a.out")
343        target = self.dbg.CreateTarget(inferior_exe_path)
344        return target.GetByteOrder()
345
    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        """Spawn the debug monitor stub process and return it.

        In reverse-connect mode a listening socket is bound first so its
        address can be placed on the stub's command line; the accepted
        connection is stored in self.sock. The logfile parameter is
        currently unused.
        """
        if self.reverse_connect:
            # Bind and listen before spawning the stub so there is an
            # address for the stub to connect back to.
            family, type, proto, _, addr = socket.getaddrinfo(
                "localhost", 0, proto=socket.IPPROTO_TCP
            )[0]
            sock = socket.socket(family, type, proto)
            sock.settimeout(self.DEFAULT_TIMEOUT)

            sock.bind(addr)
            sock.listen(1)
            addr = sock.getsockname()
            # Bracketed host form covers both IPv4 and IPv6 addresses.
            self.connect_address = "[{}]:{}".format(*addr)

        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(
            attach_pid=attach_pid
        )

        # Start the server.
        server = self.spawnSubprocess(
            self.debug_monitor_exe, commandline_args, install_remote=False
        )
        self.assertIsNotNone(server)

        if self.reverse_connect:
            # Block until the stub dials back in.
            self.sock = sock.accept()[0]
            self.sock.settimeout(self.DEFAULT_TIMEOUT)

        return server
375
376    def connect_to_debug_monitor(self, attach_pid=None):
377        if self.reverse_connect:
378            # Create the stub.
379            server = self.launch_debug_monitor(attach_pid=attach_pid)
380            self.assertIsNotNone(server)
381
382            # Schedule debug monitor to be shut down during teardown.
383            logger = self.logger
384
385            self._server = Server(self.sock, server)
386            return server
387
388        # We're using a random port algorithm to try not to collide with other ports,
389        # and retry a max # times.
390        attempts = 0
391        MAX_ATTEMPTS = 10
392        attempt_wait = 3
393
394        while attempts < MAX_ATTEMPTS:
395            server = self.launch_debug_monitor(attach_pid=attach_pid)
396
397            # Schedule debug monitor to be shut down during teardown.
398            logger = self.logger
399
400            connect_attemps = 0
401            MAX_CONNECT_ATTEMPTS = 10
402
403            while connect_attemps < MAX_CONNECT_ATTEMPTS:
404                # Create a socket to talk to the server
405                try:
406                    logger.info("Connect attempt %d", connect_attemps + 1)
407                    self.sock = self.create_socket()
408                    self._server = Server(self.sock, server)
409                    return server
410                except _ConnectionRefused as serr:
411                    # Ignore, and try again.
412                    pass
413                time.sleep(0.5)
414                connect_attemps += 1
415
416            # We should close the server here to be safe.
417            server.terminate()
418
419            # Increment attempts.
420            print(
421                "connect to debug monitor on port %d failed, attempt #%d of %d"
422                % (self.port, attempts + 1, MAX_ATTEMPTS)
423            )
424            attempts += 1
425
426            # And wait a random length of time before next attempt, to avoid
427            # collisions.
428            time.sleep(attempt_wait)
429            attempt_wait *= 1.2
430
431            # Now grab a new port number.
432            self.port = self.get_next_port()
433
434        raise Exception(
435            "failed to create a socket to the launched debug monitor after %d tries"
436            % attempts
437        )
438
439    def launch_process_for_attach(
440        self, inferior_args=None, sleep_seconds=3, exe_path=None
441    ):
442        # We're going to start a child process that the debug monitor stub can later attach to.
443        # This process needs to be started so that it just hangs around for a while.  We'll
444        # have it sleep.
445        if not exe_path:
446            exe_path = self.getBuildArtifact("a.out")
447
448        args = []
449        if inferior_args:
450            args.extend(inferior_args)
451        if sleep_seconds:
452            args.append("sleep:%d" % sleep_seconds)
453
454        return self.spawnSubprocess(exe_path, args)
455
    def prep_debug_monitor_and_inferior(
        self,
        inferior_args=None,
        inferior_sleep_seconds=3,
        inferior_exe_path=None,
        inferior_env=None,
    ):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if (
            self._inferior_startup == self._STARTUP_ATTACH
            or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY
        ):
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path,
            )
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                # Copy the binary into the platform working directory and
                # launch the remote copy instead of the local path.
                remote_path = lldbutil.append_to_process_working_directory(
                    self, os.path.basename(inferior_exe_path)
                )
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(
                    lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec
                )
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s"
                        % (inferior_exe_path, remote_path, err)
                    )
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Enter no-ack mode before queueing further expectations.
        self.do_handshake()

        # Build the expected protocol stream
        if inferior_env:
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
540
    def do_handshake(self):
        """Perform the initial handshake and switch the link to no-ack mode."""
        server = self._server
        # Ack the stub's initial packet, then request no-ack mode.
        server.send_ack()
        server.send_packet(b"QStartNoAckMode")
        # Expect the '+' ack for our request, then the OK reply, and send the
        # final ack of the ack-mode era.
        self.assertEqual(server.get_normal_packet(), b"+")
        self.assertEqual(server.get_normal_packet(), b"OK")
        server.send_ack()
548
549    def add_verified_launch_packets(self, launch_args):
550        self.test_sequence.add_log_lines(
551            [
552                "read packet: %s" % build_gdbremote_A_packet(launch_args),
553                "send packet: $OK#00",
554                "read packet: $qLaunchSuccess#a5",
555                "send packet: $OK#00",
556            ],
557            True,
558        )
559
560    def add_thread_suffix_request_packets(self):
561        self.test_sequence.add_log_lines(
562            [
563                "read packet: $QThreadSuffixSupported#e4",
564                "send packet: $OK#00",
565            ],
566            True,
567        )
568
569    def add_process_info_collection_packets(self):
570        self.test_sequence.add_log_lines(
571            [
572                "read packet: $qProcessInfo#dc",
573                {
574                    "direction": "send",
575                    "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
576                    "capture": {1: "process_info_raw"},
577                },
578            ],
579            True,
580        )
581
582    def add_set_environment_packets(self, name, value):
583        self.test_sequence.add_log_lines(
584            [
585                "read packet: $QEnvironment:" + name + "=" + value + "#00",
586                "send packet: $OK#00",
587            ],
588            True,
589        )
590
    # Keys a qProcessInfo reply may contain; parse_process_info_response
    # fails the test when it sees anything else.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize",
    ]
607
608    def parse_process_info_response(self, context):
609        # Ensure we have a process info response.
610        self.assertIsNotNone(context)
611        process_info_raw = context.get("process_info_raw")
612        self.assertIsNotNone(process_info_raw)
613
614        # Pull out key:value; pairs.
615        process_info_dict = {
616            match.group(1): match.group(2)
617            for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw)
618        }
619
620        # Validate keys are known.
621        for key, val in list(process_info_dict.items()):
622            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
623            self.assertIsNotNone(val)
624
625        return process_info_dict
626
627    def add_register_info_collection_packets(self):
628        self.test_sequence.add_log_lines(
629            [
630                {
631                    "type": "multi_response",
632                    "query": "qRegisterInfo",
633                    "append_iteration_suffix": True,
634                    "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
635                    "save_key": "reg_info_responses",
636                }
637            ],
638            True,
639        )
640
641    def parse_register_info_packets(self, context):
642        """Return an array of register info dictionaries, one per register info."""
643        reg_info_responses = context.get("reg_info_responses")
644        self.assertIsNotNone(reg_info_responses)
645
646        # Parse register infos.
647        return [
648            parse_reg_info_response(reg_info_response)
649            for reg_info_response in reg_info_responses
650        ]
651
652    def expect_gdbremote_sequence(self):
653        return expect_lldb_gdbserver_replay(
654            self,
655            self._server,
656            self.test_sequence,
657            self.DEFAULT_TIMEOUT * len(self.test_sequence),
658            self.logger,
659        )
660
    # Keys a qRegisterInfo reply may contain; assert_valid_reg_info fails
    # the test on anything else.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len",
    ]
678
679    def assert_valid_reg_info(self, reg_info):
680        # Assert we know about all the reginfo keys parsed.
681        for key in reg_info:
682            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
683
684        # Check the bare-minimum expected set of register info keys.
685        self.assertTrue("name" in reg_info)
686        self.assertTrue("bitsize" in reg_info)
687
688        if not self.getArchitecture() == "aarch64":
689            self.assertTrue("offset" in reg_info)
690
691        self.assertTrue("encoding" in reg_info)
692        self.assertTrue("format" in reg_info)
693
694    def find_pc_reg_info(self, reg_infos):
695        lldb_reg_index = 0
696        for reg_info in reg_infos:
697            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
698                return (lldb_reg_index, reg_info)
699            lldb_reg_index += 1
700
701        return (None, None)
702
703    def add_lldb_register_index(self, reg_infos):
704        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
705
706        We'll use this when we want to call packets like P/p with a register index but do so
707        on only a subset of the full register info set.
708        """
709        self.assertIsNotNone(reg_infos)
710
711        reg_index = 0
712        for reg_info in reg_infos:
713            reg_info["lldb_register_index"] = reg_index
714            reg_index += 1
715
716    def add_query_memory_region_packets(self, address):
717        self.test_sequence.add_log_lines(
718            [
719                "read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
720                {
721                    "direction": "send",
722                    "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
723                    "capture": {1: "memory_region_response"},
724                },
725            ],
726            True,
727        )
728
729    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
730        self.assertIsNotNone(key_val_text)
731        kv_dict = {}
732        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
733            key = match.group(1)
734            val = match.group(2)
735            if key in kv_dict:
736                if allow_dupes:
737                    if isinstance(kv_dict[key], list):
738                        kv_dict[key].append(val)
739                    else:
740                        # Promote to list
741                        kv_dict[key] = [kv_dict[key], val]
742                else:
743                    self.fail(
744                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
745                            key, val, key_val_text, kv_dict
746                        )
747                    )
748            else:
749                kv_dict[key] = val
750        return kv_dict
751
752    def parse_memory_region_packet(self, context):
753        # Ensure we have a context.
754        self.assertIsNotNone(context.get("memory_region_response"))
755
756        # Pull out key:value; pairs.
757        mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))
758
759        # Validate keys are known.
760        for key, val in list(mem_region_dict.items()):
761            self.assertIn(
762                key,
763                [
764                    "start",
765                    "size",
766                    "permissions",
767                    "flags",
768                    "name",
769                    "error",
770                    "dirty-pages",
771                    "type",
772                ],
773            )
774            self.assertIsNotNone(val)
775
776        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
777        # Return the dictionary of key-value pairs for the memory region.
778        return mem_region_dict
779
780    def assert_address_within_memory_region(self, test_address, mem_region_dict):
781        self.assertIsNotNone(mem_region_dict)
782        self.assertTrue("start" in mem_region_dict)
783        self.assertTrue("size" in mem_region_dict)
784
785        range_start = int(mem_region_dict["start"], 16)
786        range_size = int(mem_region_dict["size"], 16)
787        range_end = range_start + range_size
788
789        if test_address < range_start:
790            self.fail(
791                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
792                    test_address, range_start, range_end, range_size
793                )
794            )
795        elif test_address >= range_end:
796            self.fail(
797                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
798                    test_address, range_start, range_end, range_size
799                )
800            )
801
802    def add_threadinfo_collection_packets(self):
803        self.test_sequence.add_log_lines(
804            [
805                {
806                    "type": "multi_response",
807                    "first_query": "qfThreadInfo",
808                    "next_query": "qsThreadInfo",
809                    "append_iteration_suffix": False,
810                    "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
811                    "save_key": "threadinfo_responses",
812                }
813            ],
814            True,
815        )
816
817    def parse_threadinfo_packets(self, context):
818        """Return an array of thread ids (decimal ints), one per thread."""
819        threadinfo_responses = context.get("threadinfo_responses")
820        self.assertIsNotNone(threadinfo_responses)
821
822        thread_ids = []
823        for threadinfo_response in threadinfo_responses:
824            new_thread_infos = parse_threadinfo_response(threadinfo_response)
825            thread_ids.extend(new_thread_infos)
826        return thread_ids
827
828    def launch_with_threads(self, thread_count):
829        procs = self.prep_debug_monitor_and_inferior(
830            inferior_args=["thread:new"] * (thread_count - 1) + ["trap"]
831        )
832
833        self.test_sequence.add_log_lines(
834            [
835                "read packet: $c#00",
836                {
837                    "direction": "send",
838                    "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$",
839                    "capture": {1: "stop_signo", 2: "stop_reply_kv"},
840                },
841            ],
842            True,
843        )
844        self.add_threadinfo_collection_packets()
845        context = self.expect_gdbremote_sequence()
846        threads = self.parse_threadinfo_packets(context)
847        self.assertGreaterEqual(len(threads), thread_count)
848        return context, threads
849
850    def add_set_breakpoint_packets(
851        self, address, z_packet_type=0, do_continue=True, breakpoint_kind=1
852    ):
853        self.test_sequence.add_log_lines(
854            [  # Set the breakpoint.
855                "read packet: $Z{2},{0:x},{1}#00".format(
856                    address, breakpoint_kind, z_packet_type
857                ),
858                # Verify the stub could set it.
859                "send packet: $OK#00",
860            ],
861            True,
862        )
863
864        if do_continue:
865            self.test_sequence.add_log_lines(
866                [  # Continue the inferior.
867                    "read packet: $c#63",
868                    # Expect a breakpoint stop report.
869                    {
870                        "direction": "send",
871                        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
872                        "capture": {1: "stop_signo", 2: "stop_thread_id"},
873                    },
874                ],
875                True,
876            )
877
878    def add_remove_breakpoint_packets(
879        self, address, z_packet_type=0, breakpoint_kind=1
880    ):
881        self.test_sequence.add_log_lines(
882            [  # Remove the breakpoint.
883                "read packet: $z{2},{0:x},{1}#00".format(
884                    address, breakpoint_kind, z_packet_type
885                ),
886                # Verify the stub could unset it.
887                "send packet: $OK#00",
888            ],
889            True,
890        )
891
892    def add_qSupported_packets(self, client_features=[]):
893        features = "".join(";" + x for x in client_features)
894        self.test_sequence.add_log_lines(
895            [
896                "read packet: $qSupported{}#00".format(features),
897                {
898                    "direction": "send",
899                    "regex": r"^\$(.*)#[0-9a-fA-F]{2}",
900                    "capture": {1: "qSupported_response"},
901                },
902            ],
903            True,
904        )
905
    # Feature names a stub may legitimately report in its qSupported reply.
    # parse_qSupported_response() raises on any feature not listed here, so
    # newly introduced stub features must be added to this list.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qXfer:siginfo:read",
        "qEcho",
        "QPassSignals",
        "multiprocess",
        "fork-events",
        "vfork-events",
        "memory-tagging",
        "qSaveCore",
        "native-signals",
        "QNonStop",
        "SupportedWatchpointTypes",
        "SupportedCompressions",
    ]
929
930    def parse_qSupported_response(self, context):
931        self.assertIsNotNone(context)
932
933        raw_response = context.get("qSupported_response")
934        self.assertIsNotNone(raw_response)
935
936        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
937        # +,-,? is stripped from the key and set as the value.
938        supported_dict = {}
939        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
940            key = match.group(1)
941            val = match.group(3)
942
943            # key=val: store as is
944            if val and len(val) > 0:
945                supported_dict[key] = val
946            else:
947                if len(key) < 2:
948                    raise Exception(
949                        "singular stub feature is too short: must be stub_feature{+,-,?}"
950                    )
951                supported_type = key[-1]
952                key = key[:-1]
953                if not supported_type in ["+", "-", "?"]:
954                    raise Exception(
955                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(
956                            supported_type
957                        )
958                    )
959                supported_dict[key] = supported_type
960            # Ensure we know the supported element
961            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
962                raise Exception("unknown qSupported stub feature reported: %s" % key)
963
964        return supported_dict
965
966    def continue_process_and_wait_for_stop(self):
967        self.test_sequence.add_log_lines(
968            [
969                "read packet: $vCont;c#a8",
970                {
971                    "direction": "send",
972                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
973                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
974                },
975            ],
976            True,
977        )
978        context = self.expect_gdbremote_sequence()
979        self.assertIsNotNone(context)
980        return self.parse_interrupt_packets(context)
981
982    def select_modifiable_register(self, reg_infos):
983        """Find a register that can be read/written freely."""
984        PREFERRED_REGISTER_NAMES = set(
985            [
986                "rax",
987            ]
988        )
989
990        # First check for the first register from the preferred register name
991        # set.
992        alternative_register_index = None
993
994        self.assertIsNotNone(reg_infos)
995        for reg_info in reg_infos:
996            if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
997                # We found a preferred register.  Use it.
998                return reg_info["lldb_register_index"]
999            if ("generic" in reg_info) and (
1000                reg_info["generic"] == "fp" or reg_info["generic"] == "arg1"
1001            ):
1002                # A frame pointer or first arg register will do as a
1003                # register to modify temporarily.
1004                alternative_register_index = reg_info["lldb_register_index"]
1005
1006        # We didn't find a preferred register.  Return whatever alternative register
1007        # we found, if any.
1008        return alternative_register_index
1009
1010    def extract_registers_from_stop_notification(self, stop_key_vals_text):
1011        self.assertIsNotNone(stop_key_vals_text)
1012        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
1013
1014        registers = {}
1015        for key, val in list(kv_dict.items()):
1016            if re.match(r"^[0-9a-fA-F]+$", key):
1017                registers[int(key, 16)] = val
1018        return registers
1019
1020    def gather_register_infos(self):
1021        self.reset_test_sequence()
1022        self.add_register_info_collection_packets()
1023
1024        context = self.expect_gdbremote_sequence()
1025        self.assertIsNotNone(context)
1026
1027        reg_infos = self.parse_register_info_packets(context)
1028        self.assertIsNotNone(reg_infos)
1029        self.add_lldb_register_index(reg_infos)
1030
1031        return reg_infos
1032
1033    def find_generic_register_with_name(self, reg_infos, generic_name):
1034        self.assertIsNotNone(reg_infos)
1035        for reg_info in reg_infos:
1036            if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
1037                return reg_info
1038        return None
1039
1040    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
1041        self.assertIsNotNone(reg_infos)
1042        for reg_info in reg_infos:
1043            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
1044                return reg_info
1045        return None
1046
1047    def decode_gdbremote_binary(self, encoded_bytes):
1048        decoded_bytes = ""
1049        i = 0
1050        while i < len(encoded_bytes):
1051            if encoded_bytes[i] == "}":
1052                # Handle escaped char.
1053                self.assertTrue(i + 1 < len(encoded_bytes))
1054                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1055                i += 2
1056            elif encoded_bytes[i] == "*":
1057                # Handle run length encoding.
1058                self.assertTrue(len(decoded_bytes) > 0)
1059                self.assertTrue(i + 1 < len(encoded_bytes))
1060                repeat_count = ord(encoded_bytes[i + 1]) - 29
1061                decoded_bytes += decoded_bytes[-1] * repeat_count
1062                i += 2
1063            else:
1064                decoded_bytes += encoded_bytes[i]
1065                i += 1
1066        return decoded_bytes
1067
    def build_auxv_dict(self, endian, word_size, auxv_data):
        """Parse raw auxv bytes into a {key: value} dict.

        Walks (key, value) machine-word pairs until the terminator entry
        whose key and value are both zero, converting each word from the
        target's endianness.  Fails the test if the terminator is missing or
        a (non-ignored) key repeats.

        Args:
            endian: endianness code understood by unpack_endian_binary_string
                (from lldbgdbserverutils).
            word_size: byte width of one auxv word on the target.
            auxv_data: the raw auxv content read from the stub.

        Returns:
            Dict mapping integer auxv keys to integer values; arch-specific
            ignorable keys are skipped.
        """
        self.assertIsNotNone(endian)
        self.assertIsNotNone(word_size)
        self.assertIsNotNone(auxv_data)

        auxv_dict = {}

        # PowerPC64le's auxvec has a special key that must be ignored.
        # This special key may be used multiple times, resulting in
        # multiple key/value pairs with the same key, which would otherwise
        # break this test check for repeated keys.
        #
        # AT_IGNOREPPC = 22
        ignored_keys_for_arch = {"powerpc64le": [22]}
        arch = self.getArchitecture()
        ignore_keys = None
        if arch in ignored_keys_for_arch:
            ignore_keys = ignored_keys_for_arch[arch]

        while len(auxv_data) > 0:
            # Chop off key.
            raw_key = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Chop off value.
            raw_value = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Convert raw text from target endian.
            key = unpack_endian_binary_string(endian, raw_key)
            value = unpack_endian_binary_string(endian, raw_value)

            # Skip arch-specific keys that may legitimately repeat.
            if ignore_keys and key in ignore_keys:
                continue

            # Handle ending entry.
            if key == 0:
                self.assertEqual(value, 0)
                return auxv_dict

            # The key should not already be present.
            self.assertFalse(key in auxv_dict)
            auxv_dict[key] = value

        self.fail(
            "should not reach here - implies required double zero entry not found"
        )
        return auxv_dict
1116
    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Collect binary data with repeated chunked queries.

        Issues "{command_prefix}{offset:x},{chunk_length:x}" requests,
        advancing the offset by chunk_length each round, until the stub
        answers with response type 'l' (final chunk); 'm' means more data
        remains.

        Returns the accumulated, binary-decoded payload as a string.
        """
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: ${}{:x},{:x}:#00".format(
                        command_prefix, offset, chunk_length
                    ),
                    # Match any non-error reply; group 1 is the response type
                    # ('l' or 'm'), group 2 the encoded payload.
                    {
                        "direction": "send",
                        "regex": re.compile(
                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE | re.DOTALL
                        ),
                        "capture": {1: "response_type", 2: "content_raw"},
                    },
                ],
                True,
            )

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move offset along.
            offset += chunk_length

            # Figure out if we're done.  We're done if the response type is l.
            done = response_type == "l"

            # Decode binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data
1161
1162    def add_interrupt_packets(self):
1163        self.test_sequence.add_log_lines(
1164            [
1165                # Send the intterupt.
1166                "read packet: {}".format(chr(3)),
1167                # And wait for the stop notification.
1168                {
1169                    "direction": "send",
1170                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1171                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
1172                },
1173            ],
1174            True,
1175        )
1176
1177    def parse_interrupt_packets(self, context):
1178        self.assertIsNotNone(context.get("stop_signo"))
1179        self.assertIsNotNone(context.get("stop_key_val_text"))
1180        return (
1181            int(context["stop_signo"], 16),
1182            self.parse_key_val_dict(context["stop_key_val_text"]),
1183        )
1184
1185    def add_QSaveRegisterState_packets(self, thread_id):
1186        if thread_id:
1187            # Use the thread suffix form.
1188            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1189                thread_id
1190            )
1191        else:
1192            request = "read packet: $QSaveRegisterState#00"
1193
1194        self.test_sequence.add_log_lines(
1195            [
1196                request,
1197                {
1198                    "direction": "send",
1199                    "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1200                    "capture": {1: "save_response"},
1201                },
1202            ],
1203            True,
1204        )
1205
1206    def parse_QSaveRegisterState_response(self, context):
1207        self.assertIsNotNone(context)
1208
1209        save_response = context.get("save_response")
1210        self.assertIsNotNone(save_response)
1211
1212        if len(save_response) < 1 or save_response[0] == "E":
1213            # error received
1214            return (False, None)
1215        else:
1216            return (True, int(save_response))
1217
1218    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1219        if thread_id:
1220            # Use the thread suffix form.
1221            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1222                save_id, thread_id
1223            )
1224        else:
1225            request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
1226
1227        self.test_sequence.add_log_lines([request, "send packet: $OK#00"], True)
1228
    def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
        """Read, bit-flip (XOR with all ones) and write back each register.

        For every register info: read the current value with p, write the
        complemented value with P, and on a successful write read it back to
        verify.  Writes that fail, or whose read-back does not match (some
        registers such as flags or segment selectors restrict or permute
        written values), count as failed writes.

        Args:
            reg_infos: register info dicts annotated with
                "lldb_register_index" and carrying "bitsize".
            endian: endianness code for the pack/unpack register helpers.
            thread_id: optional thread to target via the thread suffix.

        Returns:
            (successful_writes, failed_writes) counts.
        """
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id
                )
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    p_request,
                    {
                        "direction": "send",
                        "regex": r"^\$([0-9a-fA-F]+)#",
                        "capture": {1: "p_response"},
                    },
                ],
                True,
            )
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(endian, p_response)

            # Flip the value by xoring with all 1s
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index,
                    pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size
                    ),
                    thread_id,
                )
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index,
                    pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size
                    ),
                )

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    P_request,
                    {
                        "direction": "send",
                        "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                        "capture": {1: "P_response"},
                    },
                ],
                True,
            )
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines(
                    [
                        p_request,
                        {
                            "direction": "send",
                            "regex": r"^\$([0-9a-fA-F]+)#",
                            "capture": {1: "p_response"},
                        },
                    ],
                    True,
                )
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw
                )

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)
1354
1355    def is_bit_flippable_register(self, reg_info):
1356        if not reg_info:
1357            return False
1358        if not "set" in reg_info:
1359            return False
1360        if reg_info["set"] != "General Purpose Registers":
1361            return False
1362        if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
1363            # Don't try to bit flip registers contained in another register.
1364            return False
1365        if re.match("^.s$", reg_info["name"]):
1366            # This is a 2-letter register name that ends in "s", like a segment register.
1367            # Don't try to bit flip these.
1368            return False
1369        if re.match("^(c|)psr$", reg_info["name"]):
1370            # This is an ARM program status register; don't flip it.
1371            return False
1372        # Okay, this looks fine-enough.
1373        return True
1374
1375    def read_register_values(self, reg_infos, endian, thread_id=None):
1376        self.assertIsNotNone(reg_infos)
1377        values = {}
1378
1379        for reg_info in reg_infos:
1380            # We append a register index when load reg infos so we can work
1381            # with subsets.
1382            reg_index = reg_info.get("lldb_register_index")
1383            self.assertIsNotNone(reg_index)
1384
1385            # Handle thread suffix.
1386            if thread_id:
1387                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1388                    reg_index, thread_id
1389                )
1390            else:
1391                p_request = "read packet: $p{:x}#00".format(reg_index)
1392
1393            # Read it with p.
1394            self.reset_test_sequence()
1395            self.test_sequence.add_log_lines(
1396                [
1397                    p_request,
1398                    {
1399                        "direction": "send",
1400                        "regex": r"^\$([0-9a-fA-F]+)#",
1401                        "capture": {1: "p_response"},
1402                    },
1403                ],
1404                True,
1405            )
1406            context = self.expect_gdbremote_sequence()
1407            self.assertIsNotNone(context)
1408
1409            # Convert value from target endian to integral.
1410            p_response = context.get("p_response")
1411            self.assertIsNotNone(p_response)
1412            self.assertTrue(len(p_response) > 0)
1413
1414            # on x86 Darwin, 4 GPR registers are often
1415            # unavailable, this is expected and correct.
1416            if (
1417                self.getArchitecture() == "x86_64"
1418                and self.platformIsDarwin()
1419                and p_response[0] == "E"
1420            ):
1421                values[reg_index] = 0
1422            else:
1423                self.assertFalse(p_response[0] == "E")
1424
1425            values[reg_index] = unpack_register_hex_unsigned(endian, p_response)
1426
1427        return values
1428
1429    def add_vCont_query_packets(self):
1430        self.test_sequence.add_log_lines(
1431            [
1432                "read packet: $vCont?#49",
1433                {
1434                    "direction": "send",
1435                    "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1436                    "capture": {2: "vCont_query_response"},
1437                },
1438            ],
1439            True,
1440        )
1441
1442    def parse_vCont_query_response(self, context):
1443        self.assertIsNotNone(context)
1444        vCont_query_response = context.get("vCont_query_response")
1445
1446        # Handle case of no vCont support at all - in which case the capture
1447        # group will be none or zero length.
1448        if not vCont_query_response or len(vCont_query_response) == 0:
1449            return {}
1450
1451        return {
1452            key: 1 for key in vCont_query_response.split(";") if key and len(key) > 0
1453        }
1454
    def count_single_steps_until_true(
        self,
        thread_id,
        predicate,
        args,
        max_step_count=100,
        use_Hc_packet=True,
        step_instruction="s",
    ):
        """Single-step thread_id until predicate(args) returns True.

        Used by single step tests that appear in a few different contexts.

        Args:
            thread_id: the thread to single step.
            predicate: callable evaluated after every step; stepping stops
                once it returns True.
            args: argument object passed through to predicate.
            max_step_count: give up after this many steps.
            use_Hc_packet: if True, select the continue thread with Hc before
                each step.
            step_instruction: step packet body; any "{thread}" placeholder in
                it is replaced with the hex thread id.

        Returns:
            (predicate_became_true, steps_taken) tuple.
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)
            )
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ],
                    True,
                )
            self.test_sequence.add_log_lines(
                [
                    # Single step.
                    step_packet,
                    # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                    # Expect a breakpoint stop report.
                    {
                        "direction": "send",
                        "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                        "capture": {1: "stop_signo", 2: "stop_thread_id"},
                    },
                ],
                True,
            )
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Every step is expected to stop with SIGTRAP.
            self.assertEqual(
                int(context.get("stop_signo"), 16),
                lldbutil.get_signal_number("SIGTRAP"),
            )

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1517
    def g_c1_c2_contents_are(self, args):
        """Check whether inferior globals g_c1/g_c2 hold the expected bytes.

        Used by single step tests that appear in a few different contexts.
        args must carry "g_c1_address", "g_c2_address", "expected_g_c1" and
        "expected_g_c2".  Reads one byte from each address via the m packet
        and compares against the expected single-character strings.

        Returns:
            True only when both bytes match their expected values.
        """
        g_c1_address = args["g_c1_address"]
        g_c2_address = args["g_c2_address"]
        expected_g_c1 = args["expected_g_c1"]
        expected_g_c2 = args["expected_g_c2"]

        # Read g_c1 and g_c2 contents.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
                {
                    "direction": "send",
                    "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
                    "capture": {1: "g_c1_contents"},
                },
                "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
                {
                    "direction": "send",
                    "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
                    "capture": {1: "g_c2_contents"},
                },
            ],
            True,
        )

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Check if what we read from inferior memory is what we are expecting.
        self.assertIsNotNone(context.get("g_c1_contents"))
        self.assertIsNotNone(context.get("g_c2_contents"))

        # The m responses are hex-encoded; unhexlify before comparing.
        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2
        )
1556
1557    def single_step_only_steps_one_instruction(
1558        self, use_Hc_packet=True, step_instruction="s"
1559    ):
1560        """Used by single step test that appears in a few different contexts."""
1561        # Start up the inferior.
1562        procs = self.prep_debug_monitor_and_inferior(
1563            inferior_args=[
1564                "get-code-address-hex:swap_chars",
1565                "get-data-address-hex:g_c1",
1566                "get-data-address-hex:g_c2",
1567                "sleep:1",
1568                "call-function:swap_chars",
1569                "sleep:5",
1570            ]
1571        )
1572
1573        # Run the process
1574        self.test_sequence.add_log_lines(
1575            [  # Start running after initial stop.
1576                "read packet: $c#63",
1577                # Match output line that prints the memory address of the function call entry point.
1578                # Note we require launch-only testing so we can get inferior otuput.
1579                {
1580                    "type": "output_match",
1581                    "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1582                    "capture": {
1583                        1: "function_address",
1584                        2: "g_c1_address",
1585                        3: "g_c2_address",
1586                    },
1587                },
1588                # Now stop the inferior.
1589                "read packet: {}".format(chr(3)),
1590                # And wait for the stop notification.
1591                {
1592                    "direction": "send",
1593                    "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1594                    "capture": {1: "stop_signo", 2: "stop_thread_id"},
1595                },
1596            ],
1597            True,
1598        )
1599
1600        # Run the packet stream.
1601        context = self.expect_gdbremote_sequence()
1602        self.assertIsNotNone(context)
1603
1604        # Grab the main thread id.
1605        self.assertIsNotNone(context.get("stop_thread_id"))
1606        main_thread_id = int(context.get("stop_thread_id"), 16)
1607
1608        # Grab the function address.
1609        self.assertIsNotNone(context.get("function_address"))
1610        function_address = int(context.get("function_address"), 16)
1611
1612        # Grab the data addresses.
1613        self.assertIsNotNone(context.get("g_c1_address"))
1614        g_c1_address = int(context.get("g_c1_address"), 16)
1615
1616        self.assertIsNotNone(context.get("g_c2_address"))
1617        g_c2_address = int(context.get("g_c2_address"), 16)
1618
1619        # Set a breakpoint at the given address.
1620        if self.getArchitecture().startswith("arm"):
1621            # TODO: Handle case when setting breakpoint in thumb code
1622            BREAKPOINT_KIND = 4
1623        else:
1624            BREAKPOINT_KIND = 1
1625        self.reset_test_sequence()
1626        self.add_set_breakpoint_packets(
1627            function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND
1628        )
1629        context = self.expect_gdbremote_sequence()
1630        self.assertIsNotNone(context)
1631
1632        # Remove the breakpoint.
1633        self.reset_test_sequence()
1634        self.add_remove_breakpoint_packets(
1635            function_address, breakpoint_kind=BREAKPOINT_KIND
1636        )
1637        context = self.expect_gdbremote_sequence()
1638        self.assertIsNotNone(context)
1639
1640        # Verify g_c1 and g_c2 match expected initial state.
1641        args = {}
1642        args["g_c1_address"] = g_c1_address
1643        args["g_c2_address"] = g_c2_address
1644        args["expected_g_c1"] = "0"
1645        args["expected_g_c2"] = "1"
1646
1647        self.assertTrue(self.g_c1_c2_contents_are(args))
1648
1649        # Verify we take only a small number of steps to hit the first state.
1650        # Might need to work through function entry prologue code.
1651        args["expected_g_c1"] = "1"
1652        args["expected_g_c2"] = "1"
1653        (state_reached, step_count) = self.count_single_steps_until_true(
1654            main_thread_id,
1655            self.g_c1_c2_contents_are,
1656            args,
1657            max_step_count=25,
1658            use_Hc_packet=use_Hc_packet,
1659            step_instruction=step_instruction,
1660        )
1661        self.assertTrue(state_reached)
1662
1663        # Verify we hit the next state.
1664        args["expected_g_c1"] = "1"
1665        args["expected_g_c2"] = "0"
1666        (state_reached, step_count) = self.count_single_steps_until_true(
1667            main_thread_id,
1668            self.g_c1_c2_contents_are,
1669            args,
1670            max_step_count=5,
1671            use_Hc_packet=use_Hc_packet,
1672            step_instruction=step_instruction,
1673        )
1674        self.assertTrue(state_reached)
1675        expected_step_count = 1
1676        arch = self.getArchitecture()
1677
1678        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1679        # of variable value
1680        if re.match("mips", arch):
1681            expected_step_count = 3
1682        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1683        # variable value
1684        if re.match("s390x", arch):
1685            expected_step_count = 2
1686        # ARM64 requires "4" instructions: 2 to compute the address (adrp,
1687        # add), one to materialize the constant (mov) and the store. Once
1688        # addresses and constants are materialized, only one instruction is
1689        # needed.
1690        if re.match("arm64", arch):
1691            before_materialization_step_count = 4
1692            after_matrialization_step_count = 1
1693            self.assertIn(
1694                step_count,
1695                [before_materialization_step_count, after_matrialization_step_count],
1696            )
1697            expected_step_count = after_matrialization_step_count
1698        else:
1699            self.assertEqual(step_count, expected_step_count)
1700
1701        # Verify we hit the next state.
1702        args["expected_g_c1"] = "0"
1703        args["expected_g_c2"] = "0"
1704        (state_reached, step_count) = self.count_single_steps_until_true(
1705            main_thread_id,
1706            self.g_c1_c2_contents_are,
1707            args,
1708            max_step_count=5,
1709            use_Hc_packet=use_Hc_packet,
1710            step_instruction=step_instruction,
1711        )
1712        self.assertTrue(state_reached)
1713        self.assertEqual(step_count, expected_step_count)
1714
1715        # Verify we hit the next state.
1716        args["expected_g_c1"] = "0"
1717        args["expected_g_c2"] = "1"
1718        (state_reached, step_count) = self.count_single_steps_until_true(
1719            main_thread_id,
1720            self.g_c1_c2_contents_are,
1721            args,
1722            max_step_count=5,
1723            use_Hc_packet=use_Hc_packet,
1724            step_instruction=step_instruction,
1725        )
1726        self.assertTrue(state_reached)
1727        self.assertEqual(step_count, expected_step_count)
1728
1729    def maybe_strict_output_regex(self, regex):
1730        return (
1731            ".*" + regex + ".*"
1732            if lldbplatformutil.hasChattyStderr(self)
1733            else "^" + regex + "$"
1734        )
1735
1736    def install_and_create_launch_args(self):
1737        exe_path = self.getBuildArtifact("a.out")
1738        if not lldb.remote_platform:
1739            return [exe_path]
1740        remote_path = lldbutil.append_to_process_working_directory(
1741            self, os.path.basename(exe_path)
1742        )
1743        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1744        err = lldb.remote_platform.Install(
1745            lldb.SBFileSpec(exe_path, True), remote_file_spec
1746        )
1747        if err.Fail():
1748            raise Exception(
1749                "remote_platform.Install('%s', '%s') failed: %s"
1750                % (exe_path, remote_path, err)
1751            )
1752        return [remote_path]
1753