xref: /llvm-project/lldb/test/API/tools/lldb-server/attach-wait/TestGdbRemoteAttachWait.py (revision d38ea8c4c84be9496249098053599c24b87f1376)
1import os
2from time import sleep
3
4import gdbremote_testcase
5import lldbgdbserverutils
6from lldbsuite.test.decorators import *
7from lldbsuite.test.lldbtest import *
8from lldbsuite.test import lldbutil
9
10
11class TestGdbRemoteAttachWait(gdbremote_testcase.GdbRemoteTestCaseBase):
12    def _set_up_inferior(self):
13        self._exe_to_attach = "%s_%d" % (self.testMethodName, os.getpid())
14        self.build(dictionary={"EXE": self._exe_to_attach, "CXX_SOURCES": "main.cpp"})
15
16        if self.getPlatform() != "windows":
17            # Use a shim to ensure that the process is ready to be attached from
18            # the get-go.
19            self._exe_to_run = "shim"
20            self._exe_to_attach = lldbutil.install_to_target(
21                self, self.getBuildArtifact(self._exe_to_attach)
22            )
23            self._run_args = [self._exe_to_attach]
24            self.build(dictionary={"EXE": self._exe_to_run, "CXX_SOURCES": "shim.cpp"})
25        else:
26            self._exe_to_run = self._exe_to_attach
27            self._run_args = []
28
29    def _launch_inferior(self, args):
30        inferior = self.spawnSubprocess(self.getBuildArtifact(self._exe_to_run), args)
31        self.assertIsNotNone(inferior)
32        self.assertGreater(inferior.pid, 0)
33        self.assertTrue(lldbgdbserverutils.process_is_running(inferior.pid, True))
34        return inferior
35
36    def _launch_and_wait_for_init(self):
37        sync_file_path = lldbutil.append_to_process_working_directory(
38            self, "process_ready"
39        )
40        inferior = self._launch_inferior(self._run_args + [sync_file_path])
41        lldbutil.wait_for_file_on_target(self, sync_file_path)
42        return inferior
43
44    def _attach_packet(self, packet_type):
45        return "read packet: ${};{}#00".format(
46            packet_type,
47            lldbgdbserverutils.gdbremote_hex_encode_string(self._exe_to_attach),
48        )
49
50    @skipIfWindows  # This test is flaky on Windows
51    def test_attach_with_vAttachWait(self):
52        self._set_up_inferior()
53
54        self.set_inferior_startup_attach_manually()
55        server = self.connect_to_debug_monitor()
56        self.do_handshake()
57
58        # Launch the first inferior (we shouldn't attach to this one).
59        self._launch_and_wait_for_init()
60
61        self.test_sequence.add_log_lines([self._attach_packet("vAttachWait")], True)
62        # Run the stream until attachWait.
63        context = self.expect_gdbremote_sequence()
64        self.assertIsNotNone(context)
65
66        # Sleep so we're sure that the inferior is launched after we ask for the attach.
67        sleep(1)
68
69        # Launch the second inferior (we SHOULD attach to this one).
70        inferior_to_attach = self._launch_inferior(self._run_args)
71
72        # Make sure the attach succeeded.
73        self.test_sequence.add_log_lines(
74            [
75                {
76                    "direction": "send",
77                    "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
78                    "capture": {1: "stop_signal_hex"},
79                },
80            ],
81            True,
82        )
83        self.add_process_info_collection_packets()
84
85        # Run the stream sending the response..
86        context = self.expect_gdbremote_sequence()
87        self.assertIsNotNone(context)
88
89        # Gather process info response.
90        process_info = self.parse_process_info_response(context)
91        self.assertIsNotNone(process_info)
92
93        # Ensure the process id matches what we expected.
94        pid_text = process_info.get("pid", None)
95        self.assertIsNotNone(pid_text)
96        reported_pid = int(pid_text, base=16)
97        self.assertEqual(reported_pid, inferior_to_attach.pid)
98
99    @skipIfWindows  # This test is flaky on Windows
100    def test_launch_before_attach_with_vAttachOrWait(self):
101        self._set_up_inferior()
102
103        self.set_inferior_startup_attach_manually()
104        server = self.connect_to_debug_monitor()
105        self.do_handshake()
106
107        inferior = self._launch_and_wait_for_init()
108
109        # Add attach packets.
110        self.test_sequence.add_log_lines(
111            [
112                # Do the attach.
113                self._attach_packet("vAttachOrWait"),
114                # Expect a stop notification from the attach.
115                {
116                    "direction": "send",
117                    "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
118                    "capture": {1: "stop_signal_hex"},
119                },
120            ],
121            True,
122        )
123        self.add_process_info_collection_packets()
124
125        # Run the stream
126        context = self.expect_gdbremote_sequence()
127        self.assertIsNotNone(context)
128
129        # Gather process info response
130        process_info = self.parse_process_info_response(context)
131        self.assertIsNotNone(process_info)
132
133        # Ensure the process id matches what we expected.
134        pid_text = process_info.get("pid", None)
135        self.assertIsNotNone(pid_text)
136        reported_pid = int(pid_text, base=16)
137        self.assertEqual(reported_pid, inferior.pid)
138
139    @skipIfWindows  # This test is flaky on Windows
140    def test_launch_after_attach_with_vAttachOrWait(self):
141        self._set_up_inferior()
142
143        self.set_inferior_startup_attach_manually()
144        server = self.connect_to_debug_monitor()
145        self.do_handshake()
146
147        self.test_sequence.add_log_lines([self._attach_packet("vAttachOrWait")], True)
148        # Run the stream until attachWait.
149        context = self.expect_gdbremote_sequence()
150        self.assertIsNotNone(context)
151
152        # Sleep so we're sure that the inferior is launched after we ask for the attach.
153        sleep(1)
154
155        # Launch the inferior.
156        inferior = self._launch_inferior(self._run_args)
157
158        # Make sure the attach succeeded.
159        self.test_sequence.add_log_lines(
160            [
161                {
162                    "direction": "send",
163                    "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
164                    "capture": {1: "stop_signal_hex"},
165                },
166            ],
167            True,
168        )
169        self.add_process_info_collection_packets()
170
171        # Run the stream sending the response..
172        context = self.expect_gdbremote_sequence()
173        self.assertIsNotNone(context)
174
175        # Gather process info response.
176        process_info = self.parse_process_info_response(context)
177        self.assertIsNotNone(process_info)
178
179        # Ensure the process id matches what we expected.
180        pid_text = process_info.get("pid", None)
181        self.assertIsNotNone(pid_text)
182        reported_pid = int(pid_text, base=16)
183        self.assertEqual(reported_pid, inferior.pid)
184