import random

from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *

from fork_testbase import GdbRemoteForkTestBase


class TestGdbRemoteFork(GdbRemoteForkTestBase):
    """gdb-remote protocol tests for debugging a process across fork/vfork.

    Each test scripts an exchange of packets ("read packet" entries are sent
    to the debug server, "send packet" entries are the replies expected from
    it) and replays it with expect_gdbremote_sequence().  PIDs/TIDs returned
    by the fork_testbase helpers are formatted with "{}" (already-formatted
    strings), while those returned by parse_threadinfo_packets() are
    formatted with "{:x}" (integers).
    """

    def setUp(self):
        """Run the base setup and skip on Arm/AArch64 Linux."""
        super().setUp()
        unsupported_archs = ["arm", "aarch64"]
        if (
            self.getPlatform() == "linux"
            and self.getArchitecture() in unsupported_archs
        ):
            self.skipTest("Unsupported for Arm/AArch64 Linux")

    @add_test_categories(["fork"])
    def test_fork_multithreaded(self):
        """Fork after spawning two extra threads, then detach the child."""
        _, _, child_pid, _ = self.start_fork_test(["thread:new"] * 2 + ["fork"])

        # detach the forked child
        self.test_sequence.add_log_lines(
            [
                "read packet: $D;{}#00".format(child_pid),
                "send packet: $OK#00",
                "read packet: $k#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork(self):
        """After the fork-and-detach helper, the parent runs to a W00 exit."""
        parent_pid, _ = self.fork_and_detach_test("fork")

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $W00;process:{}#00".format(parent_pid),
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vfork(self):
        """Like test_fork, but a vforkdone stop precedes the parent's exit."""
        parent_pid, parent_tid = self.fork_and_detach_test("vfork")

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                {
                    "direction": "send",
                    "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format(
                        parent_pid, parent_tid
                    ),
                },
                "read packet: $c#00",
                "send packet: $W00;process:{}#00".format(parent_pid),
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow(self):
        """Run the shared fork-and-follow scenario for fork."""
        self.fork_and_follow_test("fork")

    @add_test_categories(["fork"])
    def test_vfork_follow(self):
        """Run the shared fork-and-follow scenario for vfork."""
        self.fork_and_follow_test("vfork")

    @add_test_categories(["fork"])
    def test_select_wrong_pid(self):
        """Hg/Hc accept the real pid.tid and reject a wrong TID or PID."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines(
            [
                "read packet: $qC#00",
                {
                    "direction": "send",
                    # "[.]" rather than "." so only a literal dot separates
                    # the pid from the tid
                    "regex": "[$]QCp([0-9a-f]+)[.]([0-9a-f]+)#.*",
                    "capture": {1: "pid", 2: "tid"},
                },
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
        self.reset_test_sequence()

        self.test_sequence.add_log_lines(
            [
                # try switching to correct pid
                "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
                "send packet: $OK#00",
                "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
                "send packet: $OK#00",
                # try switching to invalid tid
                "read packet: $Hgp{:x}.{:x}#00".format(pid, tid + 1),
                "send packet: $E15#00",
                "read packet: $Hcp{:x}.{:x}#00".format(pid, tid + 1),
                "send packet: $E15#00",
                # try switching to invalid pid
                "read packet: $Hgp{:x}.{:x}#00".format(pid + 1, tid),
                "send packet: $Eff#00",
                "read packet: $Hcp{:x}.{:x}#00".format(pid + 1, tid),
                "send packet: $Eff#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_current(self):
        """Detach the only process; a following qC must return E44."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines(
            [
                "read packet: $qC#00",
                {
                    "direction": "send",
                    # "[.]" rather than "." so only a literal dot separates
                    # the pid from the tid
                    "regex": "[$]QCp([0-9a-f]+)[.][0-9a-f]+#.*",
                    "capture": {1: "pid"},
                },
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        # the captured pid is kept as the hex string the server reported
        pid = ret["pid"]
        self.reset_test_sequence()

        # detach the process
        self.test_sequence.add_log_lines(
            [
                "read packet: $D;{}#00".format(pid),
                "send packet: $OK#00",
                "read packet: $qC#00",
                "send packet: $E44#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_all(self):
        """Run the shared detach-all scenario."""
        self.detach_all_test()

    @add_test_categories(["fork"])
    def test_kill_all(self):
        """A plain k packet yields an X09 reply for both parent and child."""
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])

        exit_regex = "[$]X09;process:([0-9a-f]+)#.*"
        self.test_sequence.add_log_lines(
            [
                # kill all processes
                "read packet: $k#00",
                {"direction": "send", "regex": exit_regex, "capture": {1: "pid1"}},
                {"direction": "send", "regex": exit_regex, "capture": {1: "pid2"}},
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        # the two exit notifications may arrive in either order
        self.assertEqual({ret["pid1"], ret["pid2"]}, {parent_pid, child_pid})

    @add_test_categories(["fork"])
    def test_vkill_child(self):
        """Run the shared vKill scenario, killing only the child."""
        self.vkill_test(kill_child=True)

    @add_test_categories(["fork"])
    def test_vkill_parent(self):
        """Run the shared vKill scenario, killing only the parent."""
        self.vkill_test(kill_parent=True)

    @add_test_categories(["fork"])
    def test_vkill_both(self):
        """Run the shared vKill scenario, killing parent and child."""
        self.vkill_test(kill_parent=True, kill_child=True)

    @add_test_categories(["fork"])
    def test_c_parent(self):
        """Resume only the parent, twice."""
        self.resume_one_test(run_order=["parent", "parent"])

    @add_test_categories(["fork"])
    def test_c_child(self):
        """Resume only the child, twice."""
        self.resume_one_test(run_order=["child", "child"])

    @add_test_categories(["fork"])
    def test_c_parent_then_child(self):
        """Resume the parent twice, then the child twice."""
        self.resume_one_test(run_order=["parent", "parent", "child", "child"])

    @add_test_categories(["fork"])
    def test_c_child_then_parent(self):
        """Resume the child twice, then the parent twice."""
        self.resume_one_test(run_order=["child", "child", "parent", "parent"])

    @add_test_categories(["fork"])
    def test_c_interspersed(self):
        """Alternate resuming the parent and the child."""
        self.resume_one_test(run_order=["parent", "child", "parent", "child"])

    @add_test_categories(["fork"])
    def test_vCont_parent(self):
        """Same as test_c_parent, with use_vCont=True."""
        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_child(self):
        """Same as test_c_child, with use_vCont=True."""
        self.resume_one_test(run_order=["child", "child"], use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_parent_then_child(self):
        """Same as test_c_parent_then_child, with use_vCont=True."""
        self.resume_one_test(
            run_order=["parent", "parent", "child", "child"], use_vCont=True
        )

    @add_test_categories(["fork"])
    def test_vCont_child_then_parent(self):
        """Same as test_c_child_then_parent, with use_vCont=True."""
        self.resume_one_test(
            run_order=["child", "child", "parent", "parent"], use_vCont=True
        )

    @add_test_categories(["fork"])
    def test_vCont_interspersed(self):
        """Same as test_c_interspersed, with use_vCont=True."""
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"], use_vCont=True
        )

    @add_test_categories(["fork"])
    def test_vCont_two_processes(self):
        """A vCont naming threads of both processes is rejected with E03."""
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork", "stop"]
        )

        self.test_sequence.add_log_lines(
            [
                # try to resume both processes
                "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                    parent_pid, parent_tid, child_pid, child_tid
                ),
                "send packet: $E03#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_explicit(self):
        """A vCont with the explicit p-1.-1 wildcard is rejected with E03."""
        self.start_fork_test(["fork", "stop"])

        self.test_sequence.add_log_lines(
            [
                # try to resume all processes explicitly
                "read packet: $vCont;c:p-1.-1#00",
                "send packet: $E03#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_implicit(self):
        """A vCont without any thread-id is rejected with E03."""
        self.start_fork_test(["fork", "stop"])

        self.test_sequence.add_log_lines(
            [
                # try to resume all processes implicitly
                "read packet: $vCont;c#00",
                "send packet: $E03#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_threadinfo(self):
        """Thread info lists both processes and tracks newly created threads."""
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork", "thread:new", "stop"]
        )
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        prev_pidtids = set(self.parse_threadinfo_packets(ret))
        # right after the fork only the two initial threads are expected
        self.assertEqual(
            prev_pidtids,
            frozenset((int(pid, 16), int(tid, 16)) for pid, tid in pidtids),
        )
        self.reset_test_sequence()

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hcp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $c#00",
                    {
                        "direction": "send",
                        "regex": self.stop_regex.format(*pidtid),
                    },
                ],
                True,
            )
            self.add_threadinfo_collection_packets()
            ret = self.expect_gdbremote_sequence()
            self.reset_test_sequence()
            new_pidtids = set(self.parse_threadinfo_packets(ret))
            added_pidtid = new_pidtids - prev_pidtids
            prev_pidtids = new_pidtids

            # verify that we've got exactly one new thread, and that
            # the PID matches
            self.assertEqual(len(added_pidtid), 1)
            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))

        # every thread reported by the last query must be selectable
        for pidtid in new_pidtids:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                ],
                True,
            )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_memory_read_write(self):
        """Memory written to one process must not show up in the other."""
        self.build()
        INITIAL_DATA = "Initial message"
        self.prep_debug_monitor_and_inferior(
            inferior_args=[
                "set-message:{}".format(INITIAL_DATA),
                "get-data-address-hex:g_message",
                "fork",
                "print-message:",
                "stop",
            ]
        )
        self.add_qSupported_packets(["multiprocess+", "fork-events+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("fork-events+", ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                {
                    "type": "output_match",
                    "regex": self.maybe_strict_output_regex(
                        r"data address: 0x([0-9a-fA-F]+)\r\n"
                    ),
                    "capture": {1: "addr"},
                },
                {
                    "direction": "send",
                    "regex": self.fork_regex.format("fork"),
                    "capture": self.fork_capture,
                },
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        pidtids = {
            "parent": (ret["parent_pid"], ret["parent_tid"]),
            "child": (ret["child_pid"], ret["child_tid"]),
        }
        addr = ret["addr"]
        self.reset_test_sequence()

        # first round: each process still holds INITIAL_DATA; overwrite it
        # with the process' own name and let the inferior print it
        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hgp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    # read the current memory contents
                    "read packet: $m{},{:x}#00".format(addr, len(INITIAL_DATA) + 1),
                    {
                        "direction": "send",
                        "regex": r"^[$](.+)#.*$",
                        "capture": {1: "data"},
                    },
                    # write a new value
                    "read packet: $M{},{:x}:{}#00".format(
                        addr, len(name) + 1, seven.hexlify(name + "\0")
                    ),
                    "send packet: $OK#00",
                    # resume the process and wait for the trap
                    "read packet: $Hcp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $c#00",
                    {
                        "type": "output_match",
                        "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
                        "capture": {1: "printed_message"},
                    },
                    {
                        "direction": "send",
                        "regex": self.stop_regex.format(*pidtid),
                    },
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
            data = seven.unhexlify(ret["data"])
            self.assertEqual(data, INITIAL_DATA + "\0")
            self.assertEqual(ret["printed_message"], name)
            self.reset_test_sequence()

        # we do the second round separately to make sure that initial data
        # is correctly preserved while writing into the first process

        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hgp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    # read the current memory contents
                    "read packet: $m{},{:x}#00".format(addr, len(name) + 1),
                    {
                        "direction": "send",
                        "regex": r"^[$](.+)#.*$",
                        "capture": {1: "data"},
                    },
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
            raw_data = ret.get("data")
            self.assertIsNotNone(raw_data)
            data = seven.unhexlify(raw_data)
            self.assertEqual(data, name + "\0")
            self.reset_test_sequence()

    @add_test_categories(["fork"])
    def test_register_read_write(self):
        """Register writes to one thread must not leak into other threads.

        Picks, for each of the four threads (two per process), the first
        register whose hex dump is at least 8 digits long, writes a unique
        random value to it and verifies that every thread reads back its own
        value.  Skips the test when a thread has no such register.
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork", "thread:new", "stop"]
        )
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        # resume each process once (inferior args: "thread:new", "stop")
        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hcp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $c#00",
                    {
                        "direction": "send",
                        "regex": self.stop_regex.format(*pidtid),
                    },
                ],
                True,
            )

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        # first, save register values from all the threads
        thread_regs = {}
        for pidtid in pidtids:
            for regno in range(256):
                # Reset before queueing so every exchange contains only this
                # iteration's packets, as the other tests in this file do
                # between consecutive expect_gdbremote_sequence() calls.
                self.reset_test_sequence()
                self.test_sequence.add_log_lines(
                    [
                        "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                        "send packet: $OK#00",
                        "read packet: $p{:x}#00".format(regno),
                        {
                            "direction": "send",
                            "regex": r"^[$](.+)#.*$",
                            "capture": {1: "data"},
                        },
                    ],
                    True,
                )
                ret = self.expect_gdbremote_sequence()
                data = ret.get("data")
                self.assertIsNotNone(data)
                # ignore registers shorter than 32 bits (this also catches
                # "Exx" errors)
                if len(data) >= 8:
                    break
            else:
                self.skipTest("no usable register found")
            thread_regs[pidtid] = (regno, data)

        vals = {reg_data for _, reg_data in thread_regs.values()}
        # NB: cheap hack to make the loop below easier
        new_val = next(iter(vals))

        # then, start altering them and verify that we don't unexpectedly
        # change the value from another thread
        for pidtid in pidtids:
            regno, old_data = thread_regs[pidtid]
            old_val_length = len(old_data)
            # generate a unique new_val
            while new_val in vals:
                new_val = "{{:0{}x}}".format(old_val_length).format(
                    random.getrandbits(old_val_length * 4)
                )
            vals.add(new_val)

            # see the note in the first loop: one exchange per iteration
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $p{:x}#00".format(regno),
                    {
                        "direction": "send",
                        "regex": r"^[$](.+)#.*$",
                        "capture": {1: "data"},
                    },
                    "read packet: $P{:x}={}#00".format(regno, new_val),
                    "send packet: $OK#00",
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            # the value read just before the write must still be the one
            # saved in the first pass
            self.assertEqual(data, old_data)
            thread_regs[pidtid] = (regno, new_val)

        # finally, verify that new values took effect
        for pidtid in pidtids:
            regno, expected_data = thread_regs[pidtid]
            # see the note in the first loop: one exchange per iteration
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $p{:x}#00".format(regno),
                    {
                        "direction": "send",
                        "regex": r"^[$](.+)#.*$",
                        "capture": {1: "data"},
                    },
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            self.assertEqual(data, expected_data)

    @add_test_categories(["fork"])
    def test_qC(self):
        """qC reports whichever pid.tid was last selected with Hg."""
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork", "thread:new", "stop"]
        )
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        # resume each process once (inferior args: "thread:new", "stop")
        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hcp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $c#00",
                    {
                        "direction": "send",
                        "regex": self.stop_regex.format(*pidtid),
                    },
                ],
                True,
            )

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $qC#00",
                    "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
                ],
                True,
            )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_T(self):
        """T accepts every known pid.tid and rejects unknown PIDs/TIDs."""
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork", "thread:new", "stop"]
        )
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        # resume each process once (inferior args: "thread:new", "stop")
        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hcp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $c#00",
                    {
                        "direction": "send",
                        "regex": self.stop_regex.format(*pidtid),
                    },
                ],
                True,
            )

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        # max + 1 is guaranteed not to collide with any reported PID/TID
        max_pid = max(pid for pid, tid in pidtids)
        max_tid = max(tid for pid, tid in pidtids)
        bad_pidtids = (
            (max_pid, max_tid + 1, "E02"),
            (max_pid + 1, max_tid, "E01"),
            (max_pid + 1, max_tid + 1, "E01"),
        )

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                    # test explicit PID+TID
                    "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                    # test implicit PID via Hg
                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $T{:x}#00".format(max_tid + 1),
                    "send packet: $E02#00",
                    "read packet: $T{:x}#00".format(pidtid[1]),
                    "send packet: $OK#00",
                ],
                True,
            )
        for pid, tid, expected in bad_pidtids:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Tp{:x}.{:x}#00".format(pid, tid),
                    "send packet: ${}#00".format(expected),
                ],
                True,
            )
        self.expect_gdbremote_sequence()