1import lldb 2import binascii 3import os.path 4from lldbsuite.test.lldbtest import * 5from lldbsuite.test.decorators import * 6from lldbsuite.test.gdbclientutils import * 7from lldbsuite.test.lldbgdbclient import GDBRemoteTestBase 8 9 10class TestGDBRemoteClient(GDBRemoteTestBase): 11 class gPacketResponder(MockGDBServerResponder): 12 registers = [ 13 "name:rax;bitsize:64;offset:0;encoding:uint;format:hex;set:General Purpose Registers;ehframe:0;dwarf:0;", 14 "name:rbx;bitsize:64;offset:8;encoding:uint;format:hex;set:General Purpose Registers;ehframe:3;dwarf:3;", 15 "name:rcx;bitsize:64;offset:16;encoding:uint;format:hex;set:General Purpose Registers;ehframe:2;dwarf:2;generic:arg4;", 16 "name:rdx;bitsize:64;offset:24;encoding:uint;format:hex;set:General Purpose Registers;ehframe:1;dwarf:1;generic:arg3;", 17 "name:rdi;bitsize:64;offset:32;encoding:uint;format:hex;set:General Purpose Registers;ehframe:5;dwarf:5;generic:arg1;", 18 "name:rsi;bitsize:64;offset:40;encoding:uint;format:hex;set:General Purpose Registers;ehframe:4;dwarf:4;generic:arg2;", 19 "name:rbp;bitsize:64;offset:48;encoding:uint;format:hex;set:General Purpose Registers;ehframe:6;dwarf:6;generic:fp;", 20 "name:rsp;bitsize:64;offset:56;encoding:uint;format:hex;set:General Purpose Registers;ehframe:7;dwarf:7;generic:sp;", 21 ] 22 23 def qRegisterInfo(self, num): 24 try: 25 return self.registers[num] 26 except IndexError: 27 return "E45" 28 29 def readRegisters(self): 30 return len(self.registers) * 16 * "0" 31 32 def readRegister(self, register): 33 return "0000000000000000" 34 35 def test_connect(self): 36 """Test connecting to a remote gdb server""" 37 target = self.createTarget("a.yaml") 38 process = self.connect(target) 39 self.assertPacketLogContains(["qProcessInfo", "qfThreadInfo"]) 40 41 def test_attach_fail(self): 42 error_msg = "mock-error-msg" 43 44 class MyResponder(MockGDBServerResponder): 45 # Pretend we don't have any process during the initial queries. 46 def qC(self): 47 return "E42" 48 49 def qfThreadInfo(self): 50 return "OK" # No threads. 51 52 # Then, when we are asked to attach, error out. 53 def vAttach(self, pid): 54 return "E42;" + binascii.hexlify(error_msg.encode()).decode() 55 56 self.server.responder = MyResponder() 57 58 target = self.dbg.CreateTarget("") 59 process = self.connect(target) 60 lldbutil.expect_state_changes( 61 self, self.dbg.GetListener(), process, [lldb.eStateConnected] 62 ) 63 64 error = lldb.SBError() 65 target.AttachToProcessWithID(lldb.SBListener(), 47, error) 66 self.assertEqual(error_msg, error.GetCString()) 67 68 def test_launch_fail(self): 69 class MyResponder(MockGDBServerResponder): 70 # Pretend we don't have any process during the initial queries. 71 def qC(self): 72 return "E42" 73 74 def qfThreadInfo(self): 75 return "OK" # No threads. 76 77 # Then, when we are asked to attach, error out. 78 def A(self, packet): 79 return "E47" 80 81 self.server.responder = MyResponder() 82 83 target = self.createTarget("a.yaml") 84 process = self.connect(target) 85 lldbutil.expect_state_changes( 86 self, self.dbg.GetListener(), process, [lldb.eStateConnected] 87 ) 88 89 error = lldb.SBError() 90 target.Launch( 91 lldb.SBListener(), None, None, None, None, None, None, 0, True, error 92 ) 93 self.assertRegex(error.GetCString(), "Cannot launch '.*a': Error 71") 94 95 def test_launch_rich_error(self): 96 class MyResponder(MockGDBServerResponder): 97 def qC(self): 98 return "E42" 99 100 def qfThreadInfo(self): 101 return "OK" # No threads. 102 103 # Then, when we are asked to attach, error out. 104 def vRun(self, packet): 105 return "Eff;" + seven.hexlify("I'm a teapot") 106 107 self.server.responder = MyResponder() 108 109 target = self.createTarget("a.yaml") 110 process = self.connect(target) 111 lldbutil.expect_state_changes( 112 self, self.dbg.GetListener(), process, [lldb.eStateConnected] 113 ) 114 115 error = lldb.SBError() 116 target.Launch( 117 lldb.SBListener(), None, None, None, None, None, None, 0, True, error 118 ) 119 self.assertRegex(error.GetCString(), "Cannot launch '.*a': I'm a teapot") 120 121 def test_read_registers_using_g_packets(self): 122 """Test reading registers using 'g' packets (default behavior)""" 123 self.dbg.HandleCommand( 124 "settings set plugin.process.gdb-remote.use-g-packet-for-reading true" 125 ) 126 self.addTearDownHook( 127 lambda: self.runCmd( 128 "settings set plugin.process.gdb-remote.use-g-packet-for-reading false" 129 ) 130 ) 131 self.server.responder = self.gPacketResponder() 132 target = self.createTarget("a.yaml") 133 process = self.connect(target) 134 135 # We want to make sure that the process is using the g packet, but it's 136 # not required the "connect" should read all registers. However, it might 137 # have... So we need to wait till we explicitly 'read_registers' to do 138 # test. 139 # Also, even with the use-g-packet-for-reading lldb will sometimes send p0 140 # early on to see if the packet is supported. So we can't say that there 141 # will be NO p packets. 142 # But there certainly should be no p packets after the g packet. 143 144 self.read_registers(process) 145 print(f"\nPACKET LOG:\n{self.server.responder.packetLog}\n") 146 g_pos = 0 147 try: 148 g_pos = self.server.responder.packetLog.index("g") 149 except err: 150 self.fail("'g' packet not found after fetching registers") 151 152 try: 153 second_g = self.server.responder.packetLog.index("g", g_pos) 154 self.fail("Found more than one 'g' packet") 155 except: 156 pass 157 158 # Make sure there aren't any `p` packets after the `g` packet: 159 self.assertEqual( 160 0, 161 len( 162 [ 163 p 164 for p in self.server.responder.packetLog[g_pos:] 165 if p.startswith("p") 166 ] 167 ), 168 ) 169 170 def test_read_registers_using_p_packets(self): 171 """Test reading registers using 'p' packets""" 172 self.dbg.HandleCommand( 173 "settings set plugin.process.gdb-remote.use-g-packet-for-reading false" 174 ) 175 self.server.responder = self.gPacketResponder() 176 target = self.createTarget("a.yaml") 177 process = self.connect(target) 178 179 self.read_registers(process) 180 self.assertNotIn("g", self.server.responder.packetLog) 181 self.assertGreater( 182 len([p for p in self.server.responder.packetLog if p.startswith("p")]), 0 183 ) 184 185 def test_write_registers_using_P_packets(self): 186 """Test writing registers using 'P' packets (default behavior)""" 187 self.server.responder = self.gPacketResponder() 188 target = self.createTarget("a.yaml") 189 process = self.connect(target) 190 191 self.write_registers(process) 192 self.assertEqual( 193 0, len([p for p in self.server.responder.packetLog if p.startswith("G")]) 194 ) 195 self.assertGreater( 196 len([p for p in self.server.responder.packetLog if p.startswith("P")]), 0 197 ) 198 199 def test_write_registers_using_G_packets(self): 200 """Test writing registers using 'G' packets""" 201 202 class MyResponder(self.gPacketResponder): 203 def readRegister(self, register): 204 # empty string means unsupported 205 return "" 206 207 self.server.responder = MyResponder() 208 target = self.createTarget("a.yaml") 209 process = self.connect(target) 210 211 self.write_registers(process) 212 self.assertEqual( 213 0, len([p for p in self.server.responder.packetLog if p.startswith("P")]) 214 ) 215 self.assertGreater( 216 len([p for p in self.server.responder.packetLog if p.startswith("G")]), 0 217 ) 218 219 def read_registers(self, process): 220 self.for_each_gpr( 221 process, lambda r: self.assertEqual("0x0000000000000000", r.GetValue()) 222 ) 223 224 def write_registers(self, process): 225 self.for_each_gpr( 226 process, lambda r: r.SetValueFromCString("0x0000000000000000") 227 ) 228 229 def for_each_gpr(self, process, operation): 230 registers = process.GetThreadAtIndex(0).GetFrameAtIndex(0).GetRegisters() 231 self.assertGreater(registers.GetSize(), 0) 232 regSet = registers[0] 233 numChildren = regSet.GetNumChildren() 234 self.assertGreater(numChildren, 0) 235 for i in range(numChildren): 236 operation(regSet.GetChildAtIndex(i)) 237 238 def test_launch_A(self): 239 class MyResponder(MockGDBServerResponder): 240 def __init__(self, *args, **kwargs): 241 self.started = False 242 return super().__init__(*args, **kwargs) 243 244 def qC(self): 245 if self.started: 246 return "QCp10.10" 247 else: 248 return "E42" 249 250 def qfThreadInfo(self): 251 if self.started: 252 return "mp10.10" 253 else: 254 return "E42" 255 256 def qsThreadInfo(self): 257 return "l" 258 259 def A(self, packet): 260 self.started = True 261 return "OK" 262 263 def qLaunchSuccess(self): 264 if self.started: 265 return "OK" 266 return "E42" 267 268 self.server.responder = MyResponder() 269 270 target = self.createTarget("a.yaml") 271 # NB: apparently GDB packets are using "/" on Windows too 272 exe_path = self.getBuildArtifact("a").replace(os.path.sep, "/") 273 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 274 process = self.connect(target) 275 lldbutil.expect_state_changes( 276 self, self.dbg.GetListener(), process, [lldb.eStateConnected] 277 ) 278 279 target.Launch( 280 lldb.SBListener(), 281 ["arg1", "arg2", "arg3"], # argv 282 [], # envp 283 None, # stdin_path 284 None, # stdout_path 285 None, # stderr_path 286 None, # working_directory 287 0, # launch_flags 288 True, # stop_at_entry 289 lldb.SBError(), 290 ) # error 291 self.assertTrue(process, PROCESS_IS_VALID) 292 self.assertEqual(process.GetProcessID(), 16) 293 294 self.assertPacketLogContains( 295 [ 296 "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" 297 % (len(exe_hex), exe_hex), 298 ] 299 ) 300 301 def test_launch_vRun(self): 302 class MyResponder(MockGDBServerResponder): 303 def __init__(self, *args, **kwargs): 304 self.started = False 305 return super().__init__(*args, **kwargs) 306 307 def qC(self): 308 if self.started: 309 return "QCp10.10" 310 else: 311 return "E42" 312 313 def qfThreadInfo(self): 314 if self.started: 315 return "mp10.10" 316 else: 317 return "E42" 318 319 def qsThreadInfo(self): 320 return "l" 321 322 def vRun(self, packet): 323 self.started = True 324 return "T13" 325 326 def A(self, packet): 327 return "E28" 328 329 self.server.responder = MyResponder() 330 331 target = self.createTarget("a.yaml") 332 # NB: apparently GDB packets are using "/" on Windows too 333 exe_path = self.getBuildArtifact("a").replace(os.path.sep, "/") 334 exe_hex = binascii.b2a_hex(exe_path.encode()).decode() 335 process = self.connect(target) 336 lldbutil.expect_state_changes( 337 self, self.dbg.GetListener(), process, [lldb.eStateConnected] 338 ) 339 340 process = target.Launch( 341 lldb.SBListener(), 342 ["arg1", "arg2", "arg3"], # argv 343 [], # envp 344 None, # stdin_path 345 None, # stdout_path 346 None, # stderr_path 347 None, # working_directory 348 0, # launch_flags 349 True, # stop_at_entry 350 lldb.SBError(), 351 ) # error 352 self.assertTrue(process, PROCESS_IS_VALID) 353 self.assertEqual(process.GetProcessID(), 16) 354 355 self.assertPacketLogContains( 356 ["vRun;%s;61726731;61726732;61726733" % (exe_hex,)] 357 ) 358 359 def test_launch_QEnvironment(self): 360 class MyResponder(MockGDBServerResponder): 361 def qC(self): 362 return "E42" 363 364 def qfThreadInfo(self): 365 return "E42" 366 367 def vRun(self, packet): 368 self.started = True 369 return "E28" 370 371 self.server.responder = MyResponder() 372 373 target = self.createTarget("a.yaml") 374 process = self.connect(target) 375 lldbutil.expect_state_changes( 376 self, self.dbg.GetListener(), process, [lldb.eStateConnected] 377 ) 378 379 target.Launch( 380 lldb.SBListener(), 381 [], # argv 382 [ 383 "PLAIN=foo", 384 "NEEDSENC=frob$", 385 "NEEDSENC2=fr*ob", 386 "NEEDSENC3=fro}b", 387 "NEEDSENC4=f#rob", 388 "EQUALS=foo=bar", 389 ], # envp 390 None, # stdin_path 391 None, # stdout_path 392 None, # stderr_path 393 None, # working_directory 394 0, # launch_flags 395 True, # stop_at_entry 396 lldb.SBError(), 397 ) # error 398 399 self.assertPacketLogContains( 400 [ 401 "QEnvironment:EQUALS=foo=bar", 402 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 403 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 404 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 405 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 406 "QEnvironment:PLAIN=foo", 407 ] 408 ) 409 410 def test_launch_QEnvironmentHexEncoded_only(self): 411 class MyResponder(MockGDBServerResponder): 412 def qC(self): 413 return "E42" 414 415 def qfThreadInfo(self): 416 return "E42" 417 418 def vRun(self, packet): 419 self.started = True 420 return "E28" 421 422 def QEnvironment(self, packet): 423 return "" 424 425 self.server.responder = MyResponder() 426 427 target = self.createTarget("a.yaml") 428 process = self.connect(target) 429 lldbutil.expect_state_changes( 430 self, self.dbg.GetListener(), process, [lldb.eStateConnected] 431 ) 432 433 target.Launch( 434 lldb.SBListener(), 435 [], # argv 436 [ 437 "PLAIN=foo", 438 "NEEDSENC=frob$", 439 "NEEDSENC2=fr*ob", 440 "NEEDSENC3=fro}b", 441 "NEEDSENC4=f#rob", 442 "EQUALS=foo=bar", 443 ], # envp 444 None, # stdin_path 445 None, # stdout_path 446 None, # stderr_path 447 None, # working_directory 448 0, # launch_flags 449 True, # stop_at_entry 450 lldb.SBError(), 451 ) # error 452 453 self.assertPacketLogContains( 454 [ 455 "QEnvironmentHexEncoded:455155414c533d666f6f3d626172", 456 "QEnvironmentHexEncoded:4e45454453454e433d66726f6224", 457 "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62", 458 "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62", 459 "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62", 460 "QEnvironmentHexEncoded:504c41494e3d666f6f", 461 ] 462 ) 463 464 def test_detach_no_multiprocess(self): 465 class MyResponder(MockGDBServerResponder): 466 def __init__(self): 467 super().__init__() 468 self.detached = None 469 470 def qfThreadInfo(self): 471 return "10200" 472 473 def D(self, packet): 474 self.detached = packet 475 return "OK" 476 477 self.server.responder = MyResponder() 478 target = self.dbg.CreateTarget("") 479 process = self.connect(target) 480 process.Detach() 481 self.assertEqual(self.server.responder.detached, "D") 482 483 def test_detach_pid(self): 484 class MyResponder(MockGDBServerResponder): 485 def __init__(self, test_case): 486 super().__init__() 487 self.test_case = test_case 488 self.detached = None 489 490 def qSupported(self, client_supported): 491 self.test_case.assertIn("multiprocess+", client_supported) 492 return "multiprocess+;" + super().qSupported(client_supported) 493 494 def qfThreadInfo(self): 495 return "mp400.10200" 496 497 def D(self, packet): 498 self.detached = packet 499 return "OK" 500 501 self.server.responder = MyResponder(self) 502 target = self.dbg.CreateTarget("") 503 process = self.connect(target) 504 process.Detach() 505 self.assertRegex(self.server.responder.detached, r"D;0*400") 506 507 def test_signal_gdb(self): 508 class MyResponder(MockGDBServerResponder): 509 def qSupported(self, client_supported): 510 return "PacketSize=3fff;QStartNoAckMode+" 511 512 def haltReason(self): 513 return "S0a" 514 515 def cont(self): 516 return self.haltReason() 517 518 self.server.responder = MyResponder() 519 520 self.runCmd("platform select remote-linux") 521 target = self.createTarget("a.yaml") 522 process = self.connect(target) 523 524 self.assertEqual(process.threads[0].GetStopReason(), lldb.eStopReasonSignal) 525 self.assertEqual(process.threads[0].GetStopDescription(100), "signal SIGBUS") 526 527 def test_signal_lldb_old(self): 528 class MyResponder(MockGDBServerResponder): 529 def qSupported(self, client_supported): 530 return "PacketSize=3fff;QStartNoAckMode+" 531 532 def qHostInfo(self): 533 return "triple:61726d76372d756e6b6e6f776e2d6c696e75782d676e75;" 534 535 def QThreadSuffixSupported(self): 536 return "OK" 537 538 def haltReason(self): 539 return "S0a" 540 541 def cont(self): 542 return self.haltReason() 543 544 self.server.responder = MyResponder() 545 546 self.runCmd("platform select remote-linux") 547 target = self.createTarget("a.yaml") 548 process = self.connect(target) 549 550 self.assertEqual(process.threads[0].GetStopReason(), lldb.eStopReasonSignal) 551 self.assertEqual(process.threads[0].GetStopDescription(100), "signal SIGUSR1") 552 553 def test_signal_lldb(self): 554 class MyResponder(MockGDBServerResponder): 555 def qSupported(self, client_supported): 556 return "PacketSize=3fff;QStartNoAckMode+;native-signals+" 557 558 def qHostInfo(self): 559 return "triple:61726d76372d756e6b6e6f776e2d6c696e75782d676e75;" 560 561 def haltReason(self): 562 return "S0a" 563 564 def cont(self): 565 return self.haltReason() 566 567 self.server.responder = MyResponder() 568 569 self.runCmd("platform select remote-linux") 570 target = self.createTarget("a.yaml") 571 process = self.connect(target) 572 573 self.assertEqual(process.threads[0].GetStopReason(), lldb.eStopReasonSignal) 574 self.assertEqual(process.threads[0].GetStopDescription(100), "signal SIGUSR1") 575 576 def do_siginfo_test(self, platform, target_yaml, raw_data, expected): 577 class MyResponder(MockGDBServerResponder): 578 def qSupported(self, client_supported): 579 return "PacketSize=3fff;QStartNoAckMode+;qXfer:siginfo:read+" 580 581 def qXferRead(self, obj, annex, offset, length): 582 if obj == "siginfo": 583 return raw_data, False 584 else: 585 return None, False 586 587 def haltReason(self): 588 return "T02" 589 590 def cont(self): 591 return self.haltReason() 592 593 self.server.responder = MyResponder() 594 595 self.runCmd("platform select " + platform) 596 target = self.createTarget(target_yaml) 597 process = self.connect(target) 598 599 siginfo = process.threads[0].GetSiginfo() 600 self.assertSuccess(siginfo.GetError()) 601 602 for key, value in expected.items(): 603 self.assertEqual( 604 siginfo.GetValueForExpressionPath("." + key).GetValueAsUnsigned(), value 605 ) 606 607 def test_siginfo_linux_amd64(self): 608 data = ( 609 # si_signo si_errno si_code 610 "\x11\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00" 611 # __pad0 si_pid si_uid 612 "\x00\x00\x00\x00\xbf\xf7\x0b\x00\xe8\x03\x00\x00" 613 # si_status 614 "\x0c\x00\x00\x00" 615 + "\x00" * 100 616 ) 617 expected = { 618 "si_signo": 17, # SIGCHLD 619 "si_errno": 0, 620 "si_code": 1, # CLD_EXITED 621 "_sifields._sigchld.si_pid": 784319, 622 "_sifields._sigchld.si_uid": 1000, 623 "_sifields._sigchld.si_status": 12, 624 "_sifields._sigchld.si_utime": 0, 625 "_sifields._sigchld.si_stime": 0, 626 } 627 self.do_siginfo_test("remote-linux", "basic_eh_frame.yaml", data, expected) 628 629 def test_siginfo_linux_i386(self): 630 data = ( 631 # si_signo si_errno si_code 632 "\x11\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00" 633 # si_pid si_uid si_status 634 "\x49\x43\x07\x00\xe8\x03\x00\x00\x0c\x00\x00\x00" 635 + "\x00" * 104 636 ) 637 expected = { 638 "si_signo": 17, # SIGCHLD 639 "si_errno": 0, 640 "si_code": 1, # CLD_EXITED 641 "_sifields._sigchld.si_pid": 475977, 642 "_sifields._sigchld.si_uid": 1000, 643 "_sifields._sigchld.si_status": 12, 644 "_sifields._sigchld.si_utime": 0, 645 "_sifields._sigchld.si_stime": 0, 646 } 647 self.do_siginfo_test("remote-linux", "basic_eh_frame-i386.yaml", data, expected) 648 649 def test_siginfo_freebsd_amd64(self): 650 data = ( 651 # si_signo si_errno si_code 652 "\x0b\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00" 653 # si_pid si_uid si_status 654 "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" 655 # si_addr 656 "\x76\x98\xba\xdc\xfe\x00\x00\x00" 657 # si_status si_trapno 658 "\x00\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x00\x00" 659 + "\x00" * 36 660 ) 661 662 expected = { 663 "si_signo": 11, # SIGSEGV 664 "si_errno": 0, 665 "si_code": 1, # SEGV_MAPERR 666 "si_addr": 0xFEDCBA9876, 667 "_reason._fault._trapno": 12, 668 } 669 self.do_siginfo_test("remote-freebsd", "basic_eh_frame.yaml", data, expected) 670