1import lldb 2import binascii 3from lldbsuite.test.lldbtest import * 4from lldbsuite.test.decorators import * 5from gdbclientutils import * 6 7 8class TestGDBRemoteClient(GDBRemoteTestBase): 9 10 class gPacketResponder(MockGDBServerResponder): 11 def readRegisters(self): 12 return '0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000' 13 14 @skipIfReproducer # Packet log is not populated during replay. 15 def test_connect(self): 16 """Test connecting to a remote gdb server""" 17 target = self.createTarget("a.yaml") 18 process = self.connect(target) 19 self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"]) 20 21 @skipIfReproducer # FIXME: Unexpected packet during (active) replay 22 def test_attach_fail(self): 23 error_msg = "mock-error-msg" 24 25 class MyResponder(MockGDBServerResponder): 26 # Pretend we don't have any process during the initial queries. 27 def qC(self): 28 return "E42" 29 30 def qfThreadInfo(self): 31 return "OK" # No threads. 32 33 # Then, when we are asked to attach, error out. 34 def vAttach(self, pid): 35 return "E42;" + binascii.hexlify(error_msg.encode()).decode() 36 37 self.server.responder = MyResponder() 38 39 target = self.dbg.CreateTarget("") 40 process = self.connect(target) 41 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected]) 42 43 error = lldb.SBError() 44 target.AttachToProcessWithID(lldb.SBListener(), 47, error) 45 self.assertEquals(error_msg, error.GetCString()) 46 47 def test_launch_fail(self): 48 class MyResponder(MockGDBServerResponder): 49 # Pretend we don't have any process during the initial queries. 50 def qC(self): 51 return "E42" 52 53 def qfThreadInfo(self): 54 return "OK" # No threads. 55 56 # Then, when we are asked to attach, error out. 57 def A(self, packet): 58 return "E47" 59 60 self.server.responder = MyResponder() 61 62 target = self.createTarget("a.yaml") 63 process = self.connect(target) 64 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, [lldb.eStateConnected]) 65 66 error = lldb.SBError() 67 target.Launch(lldb.SBListener(), None, None, None, None, None, 68 None, 0, True, error) 69 self.assertEquals("'A' packet returned an error: 71", error.GetCString()) 70 71 @skipIfReproducer # Packet log is not populated during replay. 72 def test_read_registers_using_g_packets(self): 73 """Test reading registers using 'g' packets (default behavior)""" 74 self.dbg.HandleCommand( 75 "settings set plugin.process.gdb-remote.use-g-packet-for-reading true") 76 self.addTearDownHook(lambda: 77 self.runCmd("settings set plugin.process.gdb-remote.use-g-packet-for-reading false")) 78 self.server.responder = self.gPacketResponder() 79 target = self.createTarget("a.yaml") 80 process = self.connect(target) 81 82 self.assertEquals(1, self.server.responder.packetLog.count("g")) 83 self.server.responder.packetLog = [] 84 self.read_registers(process) 85 # Reading registers should not cause any 'p' packets to be exchanged. 86 self.assertEquals( 87 0, len([p for p in self.server.responder.packetLog if p.startswith("p")])) 88 89 @skipIfReproducer # Packet log is not populated during replay. 90 def test_read_registers_using_p_packets(self): 91 """Test reading registers using 'p' packets""" 92 self.dbg.HandleCommand( 93 "settings set plugin.process.gdb-remote.use-g-packet-for-reading false") 94 target = self.createTarget("a.yaml") 95 process = self.connect(target) 96 97 self.read_registers(process) 98 self.assertNotIn("g", self.server.responder.packetLog) 99 self.assertGreater( 100 len([p for p in self.server.responder.packetLog if p.startswith("p")]), 0) 101 102 @skipIfReproducer # Packet log is not populated during replay. 103 def test_write_registers_using_P_packets(self): 104 """Test writing registers using 'P' packets (default behavior)""" 105 self.server.responder = self.gPacketResponder() 106 target = self.createTarget("a.yaml") 107 process = self.connect(target) 108 109 self.write_registers(process) 110 self.assertEquals(0, len( 111 [p for p in self.server.responder.packetLog if p.startswith("G")])) 112 self.assertGreater( 113 len([p for p in self.server.responder.packetLog if p.startswith("P")]), 0) 114 115 @skipIfReproducer # Packet log is not populated during replay. 116 def test_write_registers_using_G_packets(self): 117 """Test writing registers using 'G' packets""" 118 119 class MyResponder(self.gPacketResponder): 120 def readRegister(self, register): 121 # empty string means unsupported 122 return "" 123 124 self.server.responder = MyResponder() 125 target = self.createTarget("a.yaml") 126 process = self.connect(target) 127 128 self.write_registers(process) 129 self.assertEquals(0, len( 130 [p for p in self.server.responder.packetLog if p.startswith("P")])) 131 self.assertGreater(len( 132 [p for p in self.server.responder.packetLog if p.startswith("G")]), 0) 133 134 def read_registers(self, process): 135 self.for_each_gpr( 136 process, lambda r: self.assertEquals("0x00000000", r.GetValue())) 137 138 def write_registers(self, process): 139 self.for_each_gpr( 140 process, lambda r: r.SetValueFromCString("0x00000000")) 141 142 def for_each_gpr(self, process, operation): 143 registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters() 144 self.assertGreater(registers.GetSize(), 0) 145 regSet = registers[0] 146 numChildren = regSet.GetNumChildren() 147 self.assertGreater(numChildren, 0) 148 for i in range(numChildren): 149 operation(regSet.GetChildAtIndex(i)) 150 151 def test_launch_A(self): 152 class MyResponder(MockGDBServerResponder): 153 def __init__(self, *args, **kwargs): 154 self.started = False 155 return super().__init__(*args, **kwargs) 156 157 def qC(self): 158 if self.started: 159 return "QCp10.10" 160 else: 161 return "E42" 162 163 def qfThreadInfo(self): 164 if self.started: 165 return "mp10.10" 166 else: 167 return "E42" 168 169 def qsThreadInfo(self): 170 return "l" 171 172 def A(self, packet): 173 self.started = True 174 return "OK" 175 176 def qLaunchSuccess(self): 177 if self.started: 178 return "OK" 179 return "E42" 180 181 self.server.responder = MyResponder() 182 183 target = self.createTarget("a.yaml") 184 exe_path = self.getBuildArtifact("a") 185 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 186 process = self.connect(target) 187 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 188 [lldb.eStateConnected]) 189 190 target.Launch(lldb.SBListener(), 191 ["arg1", "arg2", "arg3"], # argv 192 [], # envp 193 None, # stdin_path 194 None, # stdout_path 195 None, # stderr_path 196 None, # working_directory 197 0, # launch_flags 198 True, # stop_at_entry 199 lldb.SBError()) # error 200 self.assertTrue(process, PROCESS_IS_VALID) 201 self.assertEqual(process.GetProcessID(), 16) 202 203 self.assertPacketLogContains([ 204 "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" % ( 205 len(exe_hex), exe_hex), 206 ]) 207 208 def test_launch_vRun(self): 209 class MyResponder(MockGDBServerResponder): 210 def __init__(self, *args, **kwargs): 211 self.started = False 212 return super().__init__(*args, **kwargs) 213 214 def qC(self): 215 if self.started: 216 return "QCp10.10" 217 else: 218 return "E42" 219 220 def qfThreadInfo(self): 221 if self.started: 222 return "mp10.10" 223 else: 224 return "E42" 225 226 def qsThreadInfo(self): 227 return "l" 228 229 def vRun(self, packet): 230 self.started = True 231 return "T13" 232 233 def A(self, packet): 234 return "E28" 235 236 self.server.responder = MyResponder() 237 238 target = self.createTarget("a.yaml") 239 exe_path = self.getBuildArtifact("a") 240 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 241 process = self.connect(target) 242 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 243 [lldb.eStateConnected]) 244 245 process = target.Launch(lldb.SBListener(), 246 ["arg1", "arg2", "arg3"], # argv 247 [], # envp 248 None, # stdin_path 249 None, # stdout_path 250 None, # stderr_path 251 None, # working_directory 252 0, # launch_flags 253 True, # stop_at_entry 254 lldb.SBError()) # error 255 self.assertTrue(process, PROCESS_IS_VALID) 256 self.assertEqual(process.GetProcessID(), 16) 257 258 self.assertPacketLogContains([ 259 "vRun;%s;61726731;61726732;61726733" % (exe_hex,) 260 ]) 261 262 def test_launch_QEnvironment(self): 263 class MyResponder(MockGDBServerResponder): 264 def qC(self): 265 return "E42" 266 267 def qfThreadInfo(self): 268 return "E42" 269 270 def vRun(self, packet): 271 self.started = True 272 return "E28" 273 274 self.server.responder = MyResponder() 275 276 target = self.createTarget("a.yaml") 277 process = self.connect(target) 278 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 279 [lldb.eStateConnected]) 280 281 target.Launch(lldb.SBListener(), 282 [], # argv 283 ["PLAIN=foo", 284 "NEEDSENC=frob$", 285 "NEEDSENC2=fr*ob", 286 "NEEDSENC3=fro}b", 287 "NEEDSENC4=f#rob", 288 "EQUALS=foo=bar", 289 ], # envp 290 None, # stdin_path 291 None, # stdout_path 292 None, # stderr_path 293 None, # working_directory 294 0, # launch_flags 295 True, # stop_at_entry 296 lldb.SBError()) # error 297 298 self.assertPacketLogContains([ 299 "QEnvironment:PLAIN=foo", 300 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 301 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 302 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 303 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 304 "QEnvironment:EQUALS=foo=bar", 305 ]) 306 307 def test_launch_QEnvironmentHexEncoded_only(self): 308 class MyResponder(MockGDBServerResponder): 309 def qC(self): 310 return "E42" 311 312 def qfThreadInfo(self): 313 return "E42" 314 315 def vRun(self, packet): 316 self.started = True 317 return "E28" 318 319 def QEnvironment(self, packet): 320 return "" 321 322 self.server.responder = MyResponder() 323 324 target = self.createTarget("a.yaml") 325 process = self.connect(target) 326 lldbutil.expect_state_changes(self, self.dbg.GetListener(), process, 327 [lldb.eStateConnected]) 328 329 target.Launch(lldb.SBListener(), 330 [], # argv 331 ["PLAIN=foo", 332 "NEEDSENC=frob$", 333 "NEEDSENC2=fr*ob", 334 "NEEDSENC3=fro}b", 335 "NEEDSENC4=f#rob", 336 "EQUALS=foo=bar", 337 ], # envp 338 None, # stdin_path 339 None, # stdout_path 340 None, # stderr_path 341 None, # working_directory 342 0, # launch_flags 343 True, # stop_at_entry 344 lldb.SBError()) # error 345 346 self.assertPacketLogContains([ 347 "QEnvironmentHexEncoded:504c41494e3d666f6f", 348 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 349 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 350 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 351 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 352 "QEnvironmentHexEncoded:455155414c533d666f6f3d626172", 353 ]) 354