import random

import gdbremote_testcase
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil

# NOTE(review): `seven` (used for hexlify/unhexlify below) is not imported
# explicitly here; presumably it is provided by one of the star imports
# above -- confirm before tightening those imports.


class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Packet-level tests for fork/vfork handling in multiprocess mode.

    Most tests start from start_fork_test(), which continues the inferior
    until a fork (or vfork) stop reply is seen, and then exchange further
    packets to check how the parent and the child processes are handled.

    PIDs and TIDs returned by start_fork_test() are the hex strings captured
    from the stop reply; values returned by parse_threadinfo_packets() are
    compared against int(..., 16) and formatted with "{:x}".
    """

    # Stop reply announcing a fork; "{}" is replaced by the variant name
    # ("fork" or "vfork").  Groups 1-2 are the reporting (parent) thread,
    # groups 3-4 the new (child) process.
    fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
                  "{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
    fork_capture = {1: "parent_pid", 2: "parent_tid",
                    3: "child_pid", 4: "child_tid"}

    def start_fork_test(self, args, variant="fork"):
        """Build and start the inferior, then continue until it forks.

        args: inferior arguments passed to
            prep_debug_monitor_and_inferior().
        variant: "fork" or "vfork"; selects the "<variant>-events+"
            qSupported feature and the key expected in the stop reply.

        Returns a (parent_pid, parent_tid, child_pid, child_tid) tuple of
        hex strings captured from the stop reply.  The test sequence is
        reset before returning.
        """
        self.build()
        self.prep_debug_monitor_and_inferior(inferior_args=args)
        self.add_qSupported_packets(["multiprocess+",
                                     "{}-events+".format(variant)])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("{}-events+".format(variant), ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"direction": "send", "regex": self.fork_regex.format(variant),
             "capture": self.fork_capture},
        ], True)
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        return tuple(ret[x] for x in ("parent_pid", "parent_tid",
                                      "child_pid", "child_tid"))

    @add_test_categories(["fork"])
    def test_fork_multithreaded(self):
        """Fork after starting two extra threads, then detach the child."""
        _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"])

        # detach the forked child
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(child_pid),
            "send packet: $OK#00",
            "read packet: $k#00",
        ], True)
        self.expect_gdbremote_sequence()

    def fork_and_detach_test(self, variant):
        """Fork (or vfork), detach the child and verify the parent remains.

        variant: "fork" or "vfork".

        Returns (parent_pid, parent_tid) as hex strings.  The test sequence
        is reset before returning.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test([variant], variant))

        # detach the forked child
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(child_pid),
            "send packet: $OK#00",
            # verify that the current process is correct
            "read packet: $qC#00",
            "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
            # verify that the correct processes are detached/available
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $OK#00",
        ], True)
        self.expect_gdbremote_sequence()
        self.reset_test_sequence()
        return parent_pid, parent_tid

    @add_test_categories(["fork"])
    def test_fork(self):
        """Detach the fork child, then resume the parent until it exits."""
        parent_pid, _ = self.fork_and_detach_test("fork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vfork(self):
        """Detach the vfork child, then resume the parent until it exits.

        Unlike test_fork, the first resume is expected to stop with a
        "vforkdone" stop reply before the process exits.
        """
        parent_pid, parent_tid = self.fork_and_detach_test("vfork")

        # resume the parent
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"direction": "send",
             "regex": r"[$]T05thread:p{}[.]{}.*vforkdone.*".format(parent_pid,
                                                                   parent_tid),
             },
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(parent_pid),
        ], True)
        self.expect_gdbremote_sequence()

    def fork_and_follow_test(self, variant):
        """Fork (or vfork), switch to the child, detach the parent and
        resume the child until it exits.

        variant: "fork" or "vfork".
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test([variant], variant))

        # switch to the forked child
        self.test_sequence.add_log_lines([
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # detach the parent
            "read packet: $D;{}#00".format(parent_pid),
            "send packet: $OK#00",
            # verify that the correct processes are detached/available
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # then resume the child
            "read packet: $c#00",
            "send packet: $W00;process:{}#00".format(child_pid),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow(self):
        """Follow the child of a fork."""
        self.fork_and_follow_test("fork")

    @add_test_categories(["fork"])
    def test_vfork_follow(self):
        """Follow the child of a vfork."""
        self.fork_and_follow_test("vfork")

    @add_test_categories(["fork"])
    def test_select_wrong_pid(self):
        """Check Hg/Hc replies for a valid thread, a bad TID and a bad PID.

        The expected replies are OK for the valid pair, E15 for pid/tid+1
        and Eff for pid+1/tid.
        """
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qC#00",
            {"direction": "send",
             "regex": "[$]QCp([0-9a-f]+)[.]([0-9a-f]+)#.*",
             "capture": {1: "pid", 2: "tid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        # converted to int so that pid+1 / tid+1 can be computed below
        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
        self.reset_test_sequence()

        self.test_sequence.add_log_lines([
            # try switching to correct pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
            "send packet: $OK#00",
            # try switching to invalid tid
            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1),
            "send packet: $E15#00",
            # try switching to invalid pid
            "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
            "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid),
            "send packet: $Eff#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_current(self):
        """Detach the only process; a following qC is expected to get E44."""
        self.build()
        self.prep_debug_monitor_and_inferior()
        self.add_qSupported_packets(["multiprocess+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("multiprocess+", ret["qSupported_response"])
        self.reset_test_sequence()

        # get process pid
        self.test_sequence.add_log_lines([
            "read packet: $qC#00",
            {"direction": "send", "regex": "[$]QCp([0-9a-f]+)[.][0-9a-f]+#.*",
             "capture": {1: "pid"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        pid = ret["pid"]
        self.reset_test_sequence()

        # detach the process
        self.test_sequence.add_log_lines([
            "read packet: $D;{}#00".format(pid),
            "send packet: $OK#00",
            "read packet: $qC#00",
            "send packet: $E44#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_detach_all(self):
        """Detach all processes with a bare D packet after a fork."""
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork"]))

        self.test_sequence.add_log_lines([
            # double-check our PIDs
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $OK#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $OK#00",
            # detach all processes
            "read packet: $D#00",
            "send packet: $OK#00",
            # verify that both PIDs are invalid now
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: $Eff#00",
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: $Eff#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_kill_all(self):
        """Kill all processes with k; expect one X09 reply per process.

        The two replies may arrive in either order, so the captured PIDs
        are compared as a set.
        """
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])

        exit_regex = "[$]X09;process:([0-9a-f]+)#.*"
        self.test_sequence.add_log_lines([
            # kill all processes
            "read packet: $k#00",
            {"direction": "send", "regex": exit_regex,
             "capture": {1: "pid1"}},
            {"direction": "send", "regex": exit_regex,
             "capture": {1: "pid2"}},
        ], True)
        ret = self.expect_gdbremote_sequence()
        self.assertEqual({ret["pid1"], ret["pid2"]},
                         {parent_pid, child_pid})

    def vkill_test(self, kill_parent=False, kill_child=False):
        """Kill the parent and/or the child with vKill after a fork.

        kill_parent: send vKill for the parent process.
        kill_child: send vKill for the child process.

        At least one of the two must be true.  Afterwards, Hg is expected
        to reply Eff for each killed process and OK for each surviving one.
        """
        # a real test assertion rather than a bare assert, so that the
        # check is not stripped when running under -O
        self.assertTrue(kill_parent or kill_child,
                        "vkill_test needs kill_parent and/or kill_child")
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork"]))

        if kill_parent:
            self.test_sequence.add_log_lines([
                # kill the process
                "read packet: $vKill;{}#00".format(parent_pid),
                "send packet: $OK#00",
            ], True)
        if kill_child:
            self.test_sequence.add_log_lines([
                # kill the process
                "read packet: $vKill;{}#00".format(child_pid),
                "send packet: $OK#00",
            ], True)
        self.test_sequence.add_log_lines([
            # check child PID/TID
            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
            "send packet: ${}#00".format("Eff" if kill_child else "OK"),
            # check parent PID/TID
            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
            "send packet: ${}#00".format("Eff" if kill_parent else "OK"),
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vkill_child(self):
        """vKill only the child."""
        self.vkill_test(kill_child=True)

    @add_test_categories(["fork"])
    def test_vkill_parent(self):
        """vKill only the parent."""
        self.vkill_test(kill_parent=True)

    @add_test_categories(["fork"])
    def test_vkill_both(self):
        """vKill both the parent and the child."""
        self.vkill_test(kill_parent=True, kill_child=True)

    def resume_one_test(self, run_order, use_vCont=False):
        """Resume the parent and the child one at a time, in a given order.

        run_order: sequence of "parent" / "child" strings.  Each entry
            resumes that process once; the first resume of a process is
            expected to end in a T05 stop reply and the second in a W00
            exit reply, so each name may appear at most twice.
        use_vCont: if true, resume with "vCont;c:p<pid>.<tid>"; otherwise
            select the thread with Hc and resume with "c".

        If at least one process has not exited by the end of run_order, Hg
        is used to check that exited processes reply Eff and the remaining
        ones reply OK.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "trap"]))

        # replies expected for consecutive resumes of each process
        parent_expect = [
            "[$]T05thread:p{}[.]{};.*".format(parent_pid, parent_tid),
            "[$]W00;process:{}#.*".format(parent_pid),
        ]
        child_expect = [
            "[$]T05thread:p{}[.]{};.*".format(child_pid, child_tid),
            "[$]W00;process:{}#.*".format(child_pid),
        ]

        for x in run_order:
            if x == "parent":
                pidtid = (parent_pid, parent_tid)
                expect = parent_expect.pop(0)
            elif x == "child":
                pidtid = (child_pid, child_tid)
                expect = child_expect.pop(0)
            else:
                # self.fail() instead of "assert False" so that an invalid
                # entry is reported even when asserts are disabled
                self.fail("unexpected x={}".format(x))

            if use_vCont:
                self.test_sequence.add_log_lines([
                    # continue the selected process
                    "read packet: $vCont;c:p{}.{}#00".format(*pidtid),
                ], True)
            else:
                self.test_sequence.add_log_lines([
                    # continue the selected process
                    "read packet: $Hcp{}.{}#00".format(*pidtid),
                    "send packet: $OK#00",
                    "read packet: $c#00",
                ], True)
            self.test_sequence.add_log_lines([
                {"direction": "send", "regex": expect},
            ], True)
        # if at least one process remained, check both PIDs
        if parent_expect or child_expect:
            self.test_sequence.add_log_lines([
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: ${}#00".format("OK" if parent_expect else "Eff"),
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: ${}#00".format("OK" if child_expect else "Eff"),
            ], True)
        self.expect_gdbremote_sequence()

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_c_parent(self):
        """Resume only the parent (Hc + c) until it exits."""
        self.resume_one_test(run_order=["parent", "parent"])

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_c_child(self):
        """Resume only the child (Hc + c) until it exits."""
        self.resume_one_test(run_order=["child", "child"])

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_c_parent_then_child(self):
        """Run the parent to exit, then the child to exit (Hc + c)."""
        self.resume_one_test(run_order=["parent", "parent", "child", "child"])

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_c_child_then_parent(self):
        """Run the child to exit, then the parent to exit (Hc + c)."""
        self.resume_one_test(run_order=["child", "child", "parent", "parent"])

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_c_interspersed(self):
        """Alternate between resuming the parent and the child (Hc + c)."""
        self.resume_one_test(run_order=["parent", "child", "parent", "child"])

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_vCont_parent(self):
        """Resume only the parent (vCont) until it exits."""
        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_vCont_child(self):
        """Resume only the child (vCont) until it exits."""
        self.resume_one_test(run_order=["child", "child"], use_vCont=True)

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_vCont_parent_then_child(self):
        """Run the parent to exit, then the child to exit (vCont)."""
        self.resume_one_test(run_order=["parent", "parent", "child", "child"],
                             use_vCont=True)

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_vCont_child_then_parent(self):
        """Run the child to exit, then the parent to exit (vCont)."""
        self.resume_one_test(run_order=["child", "child", "parent", "parent"],
                             use_vCont=True)

    @expectedFailureAll(archs=["aarch64"],
                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
    @add_test_categories(["fork"])
    def test_vCont_interspersed(self):
        """Alternate between resuming the parent and the child (vCont)."""
        self.resume_one_test(run_order=["parent", "child", "parent", "child"],
                             use_vCont=True)

    @add_test_categories(["fork"])
    def test_vCont_two_processes(self):
        """A vCont naming threads of both processes is expected to get E03."""
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "trap"]))

        self.test_sequence.add_log_lines([
            # try to resume both processes
            "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                parent_pid, parent_tid, child_pid, child_tid),
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_explicit(self):
        """A vCont with an explicit p-1.-1 is expected to get E03."""
        self.start_fork_test(["fork", "trap"])

        self.test_sequence.add_log_lines([
            # try to resume all processes explicitly
            "read packet: $vCont;c:p-1.-1#00",
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_vCont_all_processes_implicit(self):
        """A vCont with no thread-id is expected to get E03."""
        self.start_fork_test(["fork", "trap"])

        self.test_sequence.add_log_lines([
            # try to resume all processes implicitly
            "read packet: $vCont;c#00",
            "send packet: $E03#00",
        ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_threadinfo(self):
        """Check the collected thread info as each process gains a thread.

        Initially only the two forked threads are expected.  Each process
        is then resumed in turn and is expected to add exactly one thread
        belonging to that process.  Finally every reported thread must be
        selectable with Hg.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "trap"]))
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        prev_pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(prev_pidtids,
                         frozenset((int(pid, 16), int(tid, 16))
                                   for pid, tid in pidtids))
        self.reset_test_sequence()

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": "^[$]T05thread:p{}[.]{}.*".format(*pidtid),
                  },
                 ], True)
            self.add_threadinfo_collection_packets()
            ret = self.expect_gdbremote_sequence()
            self.reset_test_sequence()
            new_pidtids = set(self.parse_threadinfo_packets(ret))
            added_pidtid = new_pidtids - prev_pidtids
            prev_pidtids = new_pidtids

            # verify that we've got exactly one new thread, and that
            # the PID matches
            self.assertEqual(len(added_pidtid), 1)
            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))

        for pidtid in new_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_memory_read_write(self):
        """Check that memory writes to one process do not leak to the other.

        For each process: read the message buffer (expected to still hold
        the initial data), overwrite it with the process name ("parent" or
        "child"), resume and check that the inferior prints that name.  A
        second pass re-reads both buffers and expects each to hold its own
        name.
        """
        self.build()
        INITIAL_DATA = "Initial message"
        self.prep_debug_monitor_and_inferior(
            inferior_args=["set-message:{}".format(INITIAL_DATA),
                           "get-data-address-hex:g_message",
                           "fork",
                           "print-message:",
                           "trap",
                           ])
        self.add_qSupported_packets(["multiprocess+",
                                     "fork-events+"])
        ret = self.expect_gdbremote_sequence()
        self.assertIn("fork-events+", ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        self.test_sequence.add_log_lines([
            "read packet: $c#00",
            {"type": "output_match",
             "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
             "capture": {1: "addr"}},
            {"direction": "send", "regex": self.fork_regex.format("fork"),
             "capture": self.fork_capture},
        ], True)
        ret = self.expect_gdbremote_sequence()
        pidtids = {
            "parent": (ret["parent_pid"], ret["parent_tid"]),
            "child": (ret["child_pid"], ret["child_tid"]),
        }
        addr = ret["addr"]
        self.reset_test_sequence()

        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # read the current memory contents
                 "read packet: $m{},{:x}#00".format(addr,
                                                    len(INITIAL_DATA) + 1),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 # write a new value
                 "read packet: $M{},{:x}:{}#00".format(addr,
                                                       len(name) + 1,
                                                       seven.hexlify(
                                                           name + "\0")),
                 "send packet: $OK#00",
                 # resume the process and wait for the trap
                 "read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"type": "output_match",
                  "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
                  "capture": {1: "printed_message"}},
                 {"direction": "send",
                  "regex": "^[$]T05thread:p{}[.]{}.*".format(*pidtid),
                  },
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data = seven.unhexlify(ret["data"])
            self.assertEqual(data, INITIAL_DATA + "\0")
            self.assertEqual(ret["printed_message"], name)
            self.reset_test_sequence()

        # we do the second round separately to make sure that initial data
        # is correctly preserved while writing into the first process

        for name, pidtid in pidtids.items():
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # read the current memory contents
                 "read packet: $m{},{:x}#00".format(addr,
                                                    len(name) + 1),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 ], True)
            ret = self.expect_gdbremote_sequence()
            self.assertIsNotNone(ret.get("data"))
            data = seven.unhexlify(ret.get("data"))
            self.assertEqual(data, name + "\0")
            self.reset_test_sequence()

    @add_test_categories(["fork"])
    def test_register_read_write(self):
        """Check that register writes affect only the selected thread.

        After both processes have been resumed to their trap (four threads
        expected in total), one register of at least 32 bits is picked per
        thread.  Each thread then gets a distinct new value written, with
        the previous value verified first, and a final pass verifies that
        every thread reads back its own new value.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "trap"]))
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": "^[$]T05thread:p{}[.]{}.*".format(*pidtid),
                  },
                 ], True)

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        # first, save register values from all the threads
        thread_regs = {}
        for pidtid in pidtids:
            for regno in range(256):
                self.test_sequence.add_log_lines(
                    ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                     "send packet: $OK#00",
                     "read packet: $p{:x}#00".format(regno),
                     {"direction": "send",
                      "regex": r"^[$](.+)#.*$",
                      "capture": {1: "data"}},
                     ], True)
                ret = self.expect_gdbremote_sequence()
                data = ret.get("data")
                self.assertIsNotNone(data)
                # ignore registers shorter than 32 bits (this also catches
                # "Exx" errors)
                if len(data) >= 8:
                    break
            else:
                self.skipTest("no usable register found")
            thread_regs[pidtid] = (regno, data)

        vals = set(x[1] for x in thread_regs.values())
        # NB: cheap hack to make the loop below easier
        new_val = next(iter(vals))

        # then, start altering them and verify that we don't unexpectedly
        # change the value from another thread
        for pidtid in pidtids:
            old_val = thread_regs[pidtid]
            regno = old_val[0]
            old_val_length = len(old_val[1])
            # generate a unique new_val
            while new_val in vals:
                new_val = ('{{:0{}x}}'.format(old_val_length)
                           .format(random.getrandbits(old_val_length*4)))
            vals.add(new_val)

            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $p{:x}#00".format(regno),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 "read packet: $P{:x}={}#00".format(regno, new_val),
                 "send packet: $OK#00",
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            self.assertEqual(data, old_val[1])
            thread_regs[pidtid] = (regno, new_val)

        # finally, verify that new values took effect
        for pidtid in pidtids:
            old_val = thread_regs[pidtid]
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $p{:x}#00".format(old_val[0]),
                 {"direction": "send",
                  "regex": r"^[$](.+)#.*$",
                  "capture": {1: "data"}},
                 ], True)
            ret = self.expect_gdbremote_sequence()
            data = ret.get("data")
            self.assertIsNotNone(data)
            self.assertEqual(data, old_val[1])

    @add_test_categories(["fork"])
    def test_qC(self):
        """Check that qC reports the thread last selected with Hg.

        Both processes are first resumed to their trap (four threads
        expected in total); then each thread is selected and queried.
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "trap"]))
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": "^[$]T05thread:p{}[.]{}.*".format(*pidtid),
                  },
                 ], True)

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $qC#00",
                 "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
                 ], True)
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_T(self):
        """Check T packet replies for valid and invalid thread-ids.

        Both processes are first resumed to their trap (four threads
        expected in total).  Every known thread must reply OK, both with an
        explicit p<pid>.<tid> and with a bare TID after selecting the
        thread with Hg.  IDs just past the largest known PID/TID are
        expected to reply E02 (bad TID) or E01 (bad PID).
        """
        parent_pid, parent_tid, child_pid, child_tid = (
            self.start_fork_test(["fork", "thread:new", "trap"]))
        pidtids = [
            (parent_pid, parent_tid),
            (child_pid, child_tid),
        ]

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Hcp{}.{}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $c#00",
                 {"direction": "send",
                  "regex": "^[$]T05thread:p{}[.]{}.*".format(*pidtid),
                  },
                 ], True)

        self.add_threadinfo_collection_packets()
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        pidtids = set(self.parse_threadinfo_packets(ret))
        self.assertEqual(len(pidtids), 4)
        max_pid = max(pid for pid, tid in pidtids)
        max_tid = max(tid for pid, tid in pidtids)
        # (pid, tid, expected reply) for IDs that do not name a known thread
        bad_pidtids = (
            (max_pid, max_tid + 1, "E02"),
            (max_pid + 1, max_tid, "E01"),
            (max_pid + 1, max_tid + 1, "E01"),
        )

        for pidtid in pidtids:
            self.test_sequence.add_log_lines(
                [
                 # test explicit PID+TID
                 "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 # test implicit PID via Hg
                 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
                 "send packet: $OK#00",
                 "read packet: $T{:x}#00".format(max_tid + 1),
                 "send packet: $E02#00",
                 "read packet: $T{:x}#00".format(pidtid[1]),
                 "send packet: $OK#00",
                 ], True)
        for pid, tid, expected in bad_pidtids:
            self.test_sequence.add_log_lines(
                ["read packet: $Tp{:x}.{:x}#00".format(pid, tid),
                 "send packet: ${}#00".format(expected),
                 ], True)
        self.expect_gdbremote_sequence()