xref: /llvm-project/lldb/test/API/functionalities/gdb_remote_client/TestGDBRemoteClient.py (revision b1099120ff963d0a0f1de12e3315b1ee4e4ed7e7)
1import lldb
2import binascii
3import os.path
4from lldbsuite.test.lldbtest import *
5from lldbsuite.test.decorators import *
6from gdbclientutils import *
7
8
9class TestGDBRemoteClient(GDBRemoteTestBase):
10
11    class gPacketResponder(MockGDBServerResponder):
12        def readRegisters(self):
13            return '0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
14
15    @skipIfReproducer # Packet log is not populated during replay.
16    def test_connect(self):
17        """Test connecting to a remote gdb server"""
18        target = self.createTarget("a.yaml")
19        process = self.connect(target)
20        self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"])
21
22    @skipIfReproducer # FIXME: Unexpected packet during (active) replay
23    def test_attach_fail(self):
24        error_msg = "mock-error-msg"
25
26        class MyResponder(MockGDBServerResponder):
27            # Pretend we don't have any process during the initial queries.
28            def qC(self):
29                return "E42"
30
31            def qfThreadInfo(self):
32                return "OK" # No threads.
33
34            # Then, when we are asked to attach, error out.
35            def vAttach(self, pid):
36                return "E42;" + binascii.hexlify(error_msg.encode()).decode()
37
38        self.server.responder = MyResponder()
39
40        target = self.dbg.CreateTarget("")
41        process = self.connect(target)
42        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])
43
44        error = lldb.SBError()
45        target.AttachToProcessWithID(lldb.SBListener(), 47, error)
46        self.assertEquals(error_msg, error.GetCString())
47
48    def test_launch_fail(self):
49        class MyResponder(MockGDBServerResponder):
50            # Pretend we don't have any process during the initial queries.
51            def qC(self):
52                return "E42"
53
54            def qfThreadInfo(self):
55                return "OK" # No threads.
56
57            # Then, when we are asked to attach, error out.
58            def A(self, packet):
59                return "E47"
60
61        self.server.responder = MyResponder()
62
63        target = self.createTarget("a.yaml")
64        process = self.connect(target)
65        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected])
66
67        error = lldb.SBError()
68        target.Launch(lldb.SBListener(), None, None, None, None, None,
69                None, 0, True, error)
70        self.assertEquals("'A' packet returned an error: 71", error.GetCString())
71
72    @skipIfReproducer # Packet log is not populated during replay.
73    def test_read_registers_using_g_packets(self):
74        """Test reading registers using 'g' packets (default behavior)"""
75        self.dbg.HandleCommand(
76                "settings set plugin.process.gdb-remote.use-g-packet-for-reading true")
77        self.addTearDownHook(lambda:
78                self.runCmd("settings set plugin.process.gdb-remote.use-g-packet-for-reading false"))
79        self.server.responder = self.gPacketResponder()
80        target = self.createTarget("a.yaml")
81        process = self.connect(target)
82
83        self.assertEquals(1, self.server.responder.packetLog.count("g"))
84        self.server.responder.packetLog = []
85        self.read_registers(process)
86        # Reading registers should not cause any 'p' packets to be exchanged.
87        self.assertEquals(
88                0, len([p for p in self.server.responder.packetLog if p.startswith("p")]))
89
90    @skipIfReproducer # Packet log is not populated during replay.
91    def test_read_registers_using_p_packets(self):
92        """Test reading registers using 'p' packets"""
93        self.dbg.HandleCommand(
94                "settings set plugin.process.gdb-remote.use-g-packet-for-reading false")
95        target = self.createTarget("a.yaml")
96        process = self.connect(target)
97
98        self.read_registers(process)
99        self.assertNotIn("g", self.server.responder.packetLog)
100        self.assertGreater(
101                len([p for p in self.server.responder.packetLog if p.startswith("p")]), 0)
102
103    @skipIfReproducer # Packet log is not populated during replay.
104    def test_write_registers_using_P_packets(self):
105        """Test writing registers using 'P' packets (default behavior)"""
106        self.server.responder = self.gPacketResponder()
107        target = self.createTarget("a.yaml")
108        process = self.connect(target)
109
110        self.write_registers(process)
111        self.assertEquals(0, len(
112                [p for p in self.server.responder.packetLog if p.startswith("G")]))
113        self.assertGreater(
114                len([p for p in self.server.responder.packetLog if p.startswith("P")]), 0)
115
116    @skipIfReproducer # Packet log is not populated during replay.
117    def test_write_registers_using_G_packets(self):
118        """Test writing registers using 'G' packets"""
119
120        class MyResponder(self.gPacketResponder):
121            def readRegister(self, register):
122                # empty string means unsupported
123                return ""
124
125        self.server.responder = MyResponder()
126        target = self.createTarget("a.yaml")
127        process = self.connect(target)
128
129        self.write_registers(process)
130        self.assertEquals(0, len(
131                [p for p in self.server.responder.packetLog if p.startswith("P")]))
132        self.assertGreater(len(
133                [p for p in self.server.responder.packetLog if p.startswith("G")]), 0)
134
135    def read_registers(self, process):
136        self.for_each_gpr(
137                process, lambda r: self.assertEquals("0x00000000", r.GetValue()))
138
139    def write_registers(self, process):
140        self.for_each_gpr(
141                process, lambda r: r.SetValueFromCString("0x00000000"))
142
143    def for_each_gpr(self, process, operation):
144        registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters()
145        self.assertGreater(registers.GetSize(), 0)
146        regSet = registers[0]
147        numChildren = regSet.GetNumChildren()
148        self.assertGreater(numChildren, 0)
149        for i in range(numChildren):
150            operation(regSet.GetChildAtIndex(i))
151
152    def test_launch_A(self):
153        class MyResponder(MockGDBServerResponder):
154            def __init__(self, *args, **kwargs):
155                self.started = False
156                return super().__init__(*args, **kwargs)
157
158            def qC(self):
159                if self.started:
160                    return "QCp10.10"
161                else:
162                    return "E42"
163
164            def qfThreadInfo(self):
165                if self.started:
166                    return "mp10.10"
167                else:
168                   return "E42"
169
170            def qsThreadInfo(self):
171                return "l"
172
173            def A(self, packet):
174                self.started = True
175                return "OK"
176
177            def qLaunchSuccess(self):
178                if self.started:
179                    return "OK"
180                return "E42"
181
182        self.server.responder = MyResponder()
183
184        target = self.createTarget("a.yaml")
185        # NB: apparently GDB packets are using "/" on Windows too
186        exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/')
187        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
188        process = self.connect(target)
189        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
190                                      [lldb.eStateConnected])
191
192        target.Launch(lldb.SBListener(),
193                      ["arg1", "arg2", "arg3"],  # argv
194                      [],  # envp
195                      None,  # stdin_path
196                      None,  # stdout_path
197                      None,  # stderr_path
198                      None,  # working_directory
199                      0,  # launch_flags
200                      True,  # stop_at_entry
201                      lldb.SBError())  # error
202        self.assertTrue(process, PROCESS_IS_VALID)
203        self.assertEqual(process.GetProcessID(), 16)
204
205        self.assertPacketLogContains([
206          "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" % (
207              len(exe_hex), exe_hex),
208        ])
209
210    def test_launch_vRun(self):
211        class MyResponder(MockGDBServerResponder):
212            def __init__(self, *args, **kwargs):
213                self.started = False
214                return super().__init__(*args, **kwargs)
215
216            def qC(self):
217                if self.started:
218                    return "QCp10.10"
219                else:
220                    return "E42"
221
222            def qfThreadInfo(self):
223                if self.started:
224                    return "mp10.10"
225                else:
226                   return "E42"
227
228            def qsThreadInfo(self):
229                return "l"
230
231            def vRun(self, packet):
232                self.started = True
233                return "T13"
234
235            def A(self, packet):
236                return "E28"
237
238        self.server.responder = MyResponder()
239
240        target = self.createTarget("a.yaml")
241        # NB: apparently GDB packets are using "/" on Windows too
242        exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/')
243        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
244        process = self.connect(target)
245        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
246                                      [lldb.eStateConnected])
247
248        process = target.Launch(lldb.SBListener(),
249                                ["arg1", "arg2", "arg3"],  # argv
250                                [],  # envp
251                                None,  # stdin_path
252                                None,  # stdout_path
253                                None,  # stderr_path
254                                None,  # working_directory
255                                0,  # launch_flags
256                                True,  # stop_at_entry
257                                lldb.SBError())  # error
258        self.assertTrue(process, PROCESS_IS_VALID)
259        self.assertEqual(process.GetProcessID(), 16)
260
261        self.assertPacketLogContains([
262          "vRun;%s;61726731;61726732;61726733" % (exe_hex,)
263        ])
264
265    def test_launch_QEnvironment(self):
266        class MyResponder(MockGDBServerResponder):
267            def qC(self):
268                return "E42"
269
270            def qfThreadInfo(self):
271               return "E42"
272
273            def vRun(self, packet):
274                self.started = True
275                return "E28"
276
277        self.server.responder = MyResponder()
278
279        target = self.createTarget("a.yaml")
280        process = self.connect(target)
281        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
282                                      [lldb.eStateConnected])
283
284        target.Launch(lldb.SBListener(),
285                      [],  # argv
286                      ["PLAIN=foo",
287                       "NEEDSENC=frob$",
288                       "NEEDSENC2=fr*ob",
289                       "NEEDSENC3=fro}b",
290                       "NEEDSENC4=f#rob",
291                       "EQUALS=foo=bar",
292                       ],  # envp
293                      None,  # stdin_path
294                      None,  # stdout_path
295                      None,  # stderr_path
296                      None,  # working_directory
297                      0,  # launch_flags
298                      True,  # stop_at_entry
299                      lldb.SBError())  # error
300
301        self.assertPacketLogContains([
302          "QEnvironment:PLAIN=foo",
303          "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
304          "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
305          "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
306          "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
307          "QEnvironment:EQUALS=foo=bar",
308        ])
309
310    def test_launch_QEnvironmentHexEncoded_only(self):
311        class MyResponder(MockGDBServerResponder):
312            def qC(self):
313                return "E42"
314
315            def qfThreadInfo(self):
316               return "E42"
317
318            def vRun(self, packet):
319                self.started = True
320                return "E28"
321
322            def QEnvironment(self, packet):
323                return ""
324
325        self.server.responder = MyResponder()
326
327        target = self.createTarget("a.yaml")
328        process = self.connect(target)
329        lldbutil.expect_state_changes(self, self.dbg.GetListener(), process,
330                                      [lldb.eStateConnected])
331
332        target.Launch(lldb.SBListener(),
333                      [],  # argv
334                      ["PLAIN=foo",
335                       "NEEDSENC=frob$",
336                       "NEEDSENC2=fr*ob",
337                       "NEEDSENC3=fro}b",
338                       "NEEDSENC4=f#rob",
339                       "EQUALS=foo=bar",
340                       ],  # envp
341                      None,  # stdin_path
342                      None,  # stdout_path
343                      None,  # stderr_path
344                      None,  # working_directory
345                      0,  # launch_flags
346                      True,  # stop_at_entry
347                      lldb.SBError())  # error
348
349        self.assertPacketLogContains([
350          "QEnvironmentHexEncoded:504c41494e3d666f6f",
351          "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
352          "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
353          "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
354          "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
355          "QEnvironmentHexEncoded:455155414c533d666f6f3d626172",
356        ])
357
358    def test_detach_no_multiprocess(self):
359        class MyResponder(MockGDBServerResponder):
360            def __init__(self):
361                super().__init__()
362                self.detached = None
363
364            def qfThreadInfo(self):
365                return "10200"
366
367            def D(self, packet):
368                self.detached = packet
369                return "OK"
370
371        self.server.responder = MyResponder()
372        target = self.dbg.CreateTarget('')
373        process = self.connect(target)
374        process.Detach()
375        self.assertEqual(self.server.responder.detached, "D")
376
377    def test_detach_pid(self):
378        class MyResponder(MockGDBServerResponder):
379            def __init__(self, test_case):
380                super().__init__()
381                self.test_case = test_case
382                self.detached = None
383
384            def qSupported(self, client_supported):
385                self.test_case.assertIn("multiprocess+", client_supported)
386                return "multiprocess+;" + super().qSupported(client_supported)
387
388            def qfThreadInfo(self):
389                return "mp400.10200"
390
391            def D(self, packet):
392                self.detached = packet
393                return "OK"
394
395        self.server.responder = MyResponder(self)
396        target = self.dbg.CreateTarget('')
397        process = self.connect(target)
398        process.Detach()
399        self.assertRegex(self.server.responder.detached, r"D;0*400")
400