1import lldb 2import binascii 3import os.path 4from lldbsuite.test.lldbtest import * 5from lldbsuite.test.decorators import * 6from gdbclientutils import * 7 8 9class TestGDBRemoteClient(GDBRemoteTestBase): 10 11 class gPacketResponder(MockGDBServerResponder): 12 def readRegisters(self): 13 return '0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000' 14 15 @skipIfReproducer # Packet log is not populated during replay. 16 def test_connect(self): 17 """Test connecting to a remote gdb server""" 18 target = self.createTarget("a.yaml") 19 process = self.connect(target) 20 self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"]) 21 22 @skipIfReproducer # FIXME: Unexpected packet during (active) replay 23 def test_attach_fail(self): 24 error_msg = "mock-error-msg" 25 26 class MyResponder(MockGDBServerResponder): 27 # Pretend we don't have any process during the initial queries. 28 def qC(self): 29 return "E42" 30 31 def qfThreadInfo(self): 32 return "OK" # No threads. 33 34 # Then, when we are asked to attach, error out. 35 def vAttach(self, pid): 36 return "E42;" + binascii.hexlify(error_msg.encode()).decode() 37 38 self.server.responder = MyResponder() 39 40 target = self.dbg.CreateTarget("") 41 process = self.connect(target) 42 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected]) 43 44 error = lldb.SBError() 45 target.AttachToProcessWithID(lldb.SBListener(), 47, error) 46 self.assertEquals(error_msg, error.GetCString()) 47 48 def test_launch_fail(self): 49 class MyResponder(MockGDBServerResponder): 50 # Pretend we don't have any process during the initial queries. 51 def qC(self): 52 return "E42" 53 54 def qfThreadInfo(self): 55 return "OK" # No threads. 56 57 # Then, when we are asked to attach, error out. 58 def A(self, packet): 59 return "E47" 60 61 self.server.responder = MyResponder() 62 63 target = self.createTarget("a.yaml") 64 process = self.connect(target) 65 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected]) 66 67 error = lldb.SBError() 68 target.Launch(lldb.SBListener(), None, None, None, None, None, 69 None, 0, True, error) 70 self.assertEquals("'A' packet returned an error: 71", error.GetCString()) 71 72 @skipIfReproducer # Packet log is not populated during replay. 73 def test_read_registers_using_g_packets(self): 74 """Test reading registers using 'g' packets (default behavior)""" 75 self.dbg.HandleCommand( 76 "settings set plugin.process.gdb-remote.use-g-packet-for-reading true") 77 self.addTearDownHook(lambda: 78 self.runCmd("settings set plugin.process.gdb-remote.use-g-packet-for-reading false")) 79 self.server.responder = self.gPacketResponder() 80 target = self.createTarget("a.yaml") 81 process = self.connect(target) 82 83 self.assertEquals(1, self.server.responder.packetLog.count("g")) 84 self.server.responder.packetLog = [] 85 self.read_registers(process) 86 # Reading registers should not cause any 'p' packets to be exchanged. 87 self.assertEquals( 88 0, len([p for p in self.server.responder.packetLog if p.startswith("p")])) 89 90 @skipIfReproducer # Packet log is not populated during replay. 91 def test_read_registers_using_p_packets(self): 92 """Test reading registers using 'p' packets""" 93 self.dbg.HandleCommand( 94 "settings set plugin.process.gdb-remote.use-g-packet-for-reading false") 95 target = self.createTarget("a.yaml") 96 process = self.connect(target) 97 98 self.read_registers(process) 99 self.assertNotIn("g", self.server.responder.packetLog) 100 self.assertGreater( 101 len([p for p in self.server.responder.packetLog if p.startswith("p")]), 0) 102 103 @skipIfReproducer # Packet log is not populated during replay. 104 def test_write_registers_using_P_packets(self): 105 """Test writing registers using 'P' packets (default behavior)""" 106 self.server.responder = self.gPacketResponder() 107 target = self.createTarget("a.yaml") 108 process = self.connect(target) 109 110 self.write_registers(process) 111 self.assertEquals(0, len( 112 [p for p in self.server.responder.packetLog if p.startswith("G")])) 113 self.assertGreater( 114 len([p for p in self.server.responder.packetLog if p.startswith("P")]), 0) 115 116 @skipIfReproducer # Packet log is not populated during replay. 117 def test_write_registers_using_G_packets(self): 118 """Test writing registers using 'G' packets""" 119 120 class MyResponder(self.gPacketResponder): 121 def readRegister(self, register): 122 # empty string means unsupported 123 return "" 124 125 self.server.responder = MyResponder() 126 target = self.createTarget("a.yaml") 127 process = self.connect(target) 128 129 self.write_registers(process) 130 self.assertEquals(0, len( 131 [p for p in self.server.responder.packetLog if p.startswith("P")])) 132 self.assertGreater(len( 133 [p for p in self.server.responder.packetLog if p.startswith("G")]), 0) 134 135 def read_registers(self, process): 136 self.for_each_gpr( 137 process, lambda r: self.assertEquals("0x00000000", r.GetValue())) 138 139 def write_registers(self, process): 140 self.for_each_gpr( 141 process, lambda r: r.SetValueFromCString("0x00000000")) 142 143 def for_each_gpr(self, process, operation): 144 registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters() 145 self.assertGreater(registers.GetSize(), 0) 146 regSet = registers[0] 147 numChildren = regSet.GetNumChildren() 148 self.assertGreater(numChildren, 0) 149 for i in range(numChildren): 150 operation(regSet.GetChildAtIndex(i)) 151 152 def test_launch_A(self): 153 class MyResponder(MockGDBServerResponder): 154 def __init__(self, *args, **kwargs): 155 self.started = False 156 return super().__init__(*args, **kwargs) 157 158 def qC(self): 159 if self.started: 160 return "QCp10.10" 161 else: 162 return "E42" 163 164 def qfThreadInfo(self): 165 if self.started: 166 return "mp10.10" 167 else: 168 return "E42" 169 170 def qsThreadInfo(self): 171 return "l" 172 173 def A(self, packet): 174 self.started = True 175 return "OK" 176 177 def qLaunchSuccess(self): 178 if self.started: 179 return "OK" 180 return "E42" 181 182 self.server.responder = MyResponder() 183 184 target = self.createTarget("a.yaml") 185 # NB: apparently GDB packets are using "/" on Windows too 186 exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/') 187 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 188 process = self.connect(target) 189 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 190 [lldb.eStateConnected]) 191 192 target.Launch(lldb.SBListener(), 193 ["arg1", "arg2", "arg3"], # argv 194 [], # envp 195 None, # stdin_path 196 None, # stdout_path 197 None, # stderr_path 198 None, # working_directory 199 0, # launch_flags 200 True, # stop_at_entry 201 lldb.SBError()) # error 202 self.assertTrue(process, PROCESS_IS_VALID) 203 self.assertEqual(process.GetProcessID(), 16) 204 205 self.assertPacketLogContains([ 206 "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" % ( 207 len(exe_hex), exe_hex), 208 ]) 209 210 def test_launch_vRun(self): 211 class MyResponder(MockGDBServerResponder): 212 def __init__(self, *args, **kwargs): 213 self.started = False 214 return super().__init__(*args, **kwargs) 215 216 def qC(self): 217 if self.started: 218 return "QCp10.10" 219 else: 220 return "E42" 221 222 def qfThreadInfo(self): 223 if self.started: 224 return "mp10.10" 225 else: 226 return "E42" 227 228 def qsThreadInfo(self): 229 return "l" 230 231 def vRun(self, packet): 232 self.started = True 233 return "T13" 234 235 def A(self, packet): 236 return "E28" 237 238 self.server.responder = MyResponder() 239 240 target = self.createTarget("a.yaml") 241 # NB: apparently GDB packets are using "/" on Windows too 242 exe_path = self.getBuildArtifact("a").replace(os.path.sep, '/') 243 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 244 process = self.connect(target) 245 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 246 [lldb.eStateConnected]) 247 248 process = target.Launch(lldb.SBListener(), 249 ["arg1", "arg2", "arg3"], # argv 250 [], # envp 251 None, # stdin_path 252 None, # stdout_path 253 None, # stderr_path 254 None, # working_directory 255 0, # launch_flags 256 True, # stop_at_entry 257 lldb.SBError()) # error 258 self.assertTrue(process, PROCESS_IS_VALID) 259 self.assertEqual(process.GetProcessID(), 16) 260 261 self.assertPacketLogContains([ 262 "vRun;%s;61726731;61726732;61726733" % (exe_hex,) 263 ]) 264 265 def test_launch_QEnvironment(self): 266 class MyResponder(MockGDBServerResponder): 267 def qC(self): 268 return "E42" 269 270 def qfThreadInfo(self): 271 return "E42" 272 273 def vRun(self, packet): 274 self.started = True 275 return "E28" 276 277 self.server.responder = MyResponder() 278 279 target = self.createTarget("a.yaml") 280 process = self.connect(target) 281 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 282 [lldb.eStateConnected]) 283 284 target.Launch(lldb.SBListener(), 285 [], # argv 286 ["PLAIN=foo", 287 "NEEDSENC=frob$", 288 "NEEDSENC2=fr*ob", 289 "NEEDSENC3=fro}b", 290 "NEEDSENC4=f#rob", 291 "EQUALS=foo=bar", 292 ], # envp 293 None, # stdin_path 294 None, # stdout_path 295 None, # stderr_path 296 None, # working_directory 297 0, # launch_flags 298 True, # stop_at_entry 299 lldb.SBError()) # error 300 301 self.assertPacketLogContains([ 302 "QEnvironment:PLAIN=foo", 303 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 304 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 305 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 306 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 307 "QEnvironment:EQUALS=foo=bar", 308 ]) 309 310 def test_launch_QEnvironmentHexEncoded_only(self): 311 class MyResponder(MockGDBServerResponder): 312 def qC(self): 313 return "E42" 314 315 def qfThreadInfo(self): 316 return "E42" 317 318 def vRun(self, packet): 319 self.started = True 320 return "E28" 321 322 def QEnvironment(self, packet): 323 return "" 324 325 self.server.responder = MyResponder() 326 327 target = self.createTarget("a.yaml") 328 process = self.connect(target) 329 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 330 [lldb.eStateConnected]) 331 332 target.Launch(lldb.SBListener(), 333 [], # argv 334 ["PLAIN=foo", 335 "NEEDSENC=frob$", 336 "NEEDSENC2=fr*ob", 337 "NEEDSENC3=fro}b", 338 "NEEDSENC4=f#rob", 339 "EQUALS=foo=bar", 340 ], # envp 341 None, # stdin_path 342 None, # stdout_path 343 None, # stderr_path 344 None, # working_directory 345 0, # launch_flags 346 True, # stop_at_entry 347 lldb.SBError()) # error 348 349 self.assertPacketLogContains([ 350 "QEnvironmentHexEncoded:504c41494e3d666f6f", 351 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 352 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 353 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 354 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 355 "QEnvironmentHexEncoded:455155414c533d666f6f3d626172", 356 ]) 357 358 def test_detach_no_multiprocess(self): 359 class MyResponder(MockGDBServerResponder): 360 def __init__(self): 361 super().__init__() 362 self.detached = None 363 364 def qfThreadInfo(self): 365 return "10200" 366 367 def D(self, packet): 368 self.detached = packet 369 return "OK" 370 371 self.server.responder = MyResponder() 372 target = self.dbg.CreateTarget('') 373 process = self.connect(target) 374 process.Detach() 375 self.assertEqual(self.server.responder.detached, "D") 376 377 def test_detach_pid(self): 378 class MyResponder(MockGDBServerResponder): 379 def __init__(self, test_case): 380 super().__init__() 381 self.test_case = test_case 382 self.detached = None 383 384 def qSupported(self, client_supported): 385 self.test_case.assertIn("multiprocess+", client_supported) 386 return "multiprocess+;" + super().qSupported(client_supported) 387 388 def qfThreadInfo(self): 389 return "mp400.10200" 390 391 def D(self, packet): 392 self.detached = packet 393 return "OK" 394 395 self.server.responder = MyResponder(self) 396 target = self.dbg.CreateTarget('') 397 process = self.connect(target) 398 process.Detach() 399 self.assertRegex(self.server.responder.detached, r"D;0*400") 400