xref: /llvm-project/lldb/test/API/tools/lldb-server/TestGdbRemoteFork.py (revision a1df636a8b51a660d58708d82a5bc060ee5f57d3)
1import random
2
3import gdbremote_testcase
4from lldbsuite.test.decorators import *
5from lldbsuite.test.lldbtest import *
6from lldbsuite.test import lldbutil
7
8class TestGdbRemoteFork(gdbremote_testcase.GdbRemoteTestCaseBase):
9
10    fork_regex = ("[$]T05thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
11                  "{}:p([0-9a-f]+)[.]([0-9a-f]+).*")
12    fork_capture = {1: "parent_pid", 2: "parent_tid",
13                    3: "child_pid", 4: "child_tid"}
14
15    def start_fork_test(self, args, variant="fork"):
16        self.build()
17        self.prep_debug_monitor_and_inferior(inferior_args=args)
18        self.add_qSupported_packets(["multiprocess+",
19                                     "{}-events+".format(variant)])
20        ret = self.expect_gdbremote_sequence()
21        self.assertIn("{}-events+".format(variant), ret["qSupported_response"])
22        self.reset_test_sequence()
23
24        # continue and expect fork
25        self.test_sequence.add_log_lines([
26            "read packet: $c#00",
27            {"direction": "send", "regex": self.fork_regex.format(variant),
28             "capture": self.fork_capture},
29        ], True)
30        ret = self.expect_gdbremote_sequence()
31        self.reset_test_sequence()
32
33        return tuple(ret[x] for x in ("parent_pid", "parent_tid",
34                                      "child_pid", "child_tid"))
35
36    @add_test_categories(["fork"])
37    def test_fork_multithreaded(self):
38        _, _, child_pid, _ = self.start_fork_test(["thread:new"]*2 + ["fork"])
39
40        # detach the forked child
41        self.test_sequence.add_log_lines([
42            "read packet: $D;{}#00".format(child_pid),
43            "send packet: $OK#00",
44            "read packet: $k#00",
45        ], True)
46        self.expect_gdbremote_sequence()
47
48    def fork_and_detach_test(self, variant):
49        parent_pid, parent_tid, child_pid, child_tid = (
50            self.start_fork_test([variant], variant))
51
52        # detach the forked child
53        self.test_sequence.add_log_lines([
54            "read packet: $D;{}#00".format(child_pid),
55            "send packet: $OK#00",
56            # verify that the current process is correct
57            "read packet: $qC#00",
58            "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
59            # verify that the correct processes are detached/available
60            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
61            "send packet: $Eff#00",
62            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
63            "send packet: $OK#00",
64        ], True)
65        self.expect_gdbremote_sequence()
66        self.reset_test_sequence()
67        return parent_pid, parent_tid
68
69    @add_test_categories(["fork"])
70    def test_fork(self):
71        parent_pid, _ = self.fork_and_detach_test("fork")
72
73        # resume the parent
74        self.test_sequence.add_log_lines([
75            "read packet: $c#00",
76            "send packet: $W00;process:{}#00".format(parent_pid),
77        ], True)
78        self.expect_gdbremote_sequence()
79
80    @add_test_categories(["fork"])
81    def test_vfork(self):
82        parent_pid, parent_tid = self.fork_and_detach_test("vfork")
83
84        # resume the parent
85        self.test_sequence.add_log_lines([
86            "read packet: $c#00",
87            {"direction": "send",
88             "regex": r"[$]T05thread:p{}[.]{}.*vforkdone.*".format(parent_pid,
89                                                                   parent_tid),
90             },
91            "read packet: $c#00",
92            "send packet: $W00;process:{}#00".format(parent_pid),
93        ], True)
94        self.expect_gdbremote_sequence()
95
96    def fork_and_follow_test(self, variant):
97        parent_pid, parent_tid, child_pid, child_tid = (
98            self.start_fork_test([variant], variant))
99
100        # switch to the forked child
101        self.test_sequence.add_log_lines([
102            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
103            "send packet: $OK#00",
104            "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
105            "send packet: $OK#00",
106            # detach the parent
107            "read packet: $D;{}#00".format(parent_pid),
108            "send packet: $OK#00",
109            # verify that the correct processes are detached/available
110            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
111            "send packet: $Eff#00",
112            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
113            "send packet: $OK#00",
114            # then resume the child
115            "read packet: $c#00",
116            "send packet: $W00;process:{}#00".format(child_pid),
117        ], True)
118        self.expect_gdbremote_sequence()
119
120    @add_test_categories(["fork"])
121    def test_fork_follow(self):
122        self.fork_and_follow_test("fork")
123
124    @add_test_categories(["fork"])
125    def test_vfork_follow(self):
126        self.fork_and_follow_test("vfork")
127
128    @add_test_categories(["fork"])
129    def test_select_wrong_pid(self):
130        self.build()
131        self.prep_debug_monitor_and_inferior()
132        self.add_qSupported_packets(["multiprocess+"])
133        ret = self.expect_gdbremote_sequence()
134        self.assertIn("multiprocess+", ret["qSupported_response"])
135        self.reset_test_sequence()
136
137        # get process pid
138        self.test_sequence.add_log_lines([
139            "read packet: $qC#00",
140            {"direction": "send", "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*",
141             "capture": {1: "pid", 2: "tid"}},
142        ], True)
143        ret = self.expect_gdbremote_sequence()
144        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
145        self.reset_test_sequence()
146
147        self.test_sequence.add_log_lines([
148            # try switching to correct pid
149            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
150            "send packet: $OK#00",
151            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
152            "send packet: $OK#00",
153            # try switching to invalid tid
154            "read packet: $Hgp{:x}.{:x}#00".format(pid, tid+1),
155            "send packet: $E15#00",
156            "read packet: $Hcp{:x}.{:x}#00".format(pid, tid+1),
157            "send packet: $E15#00",
158            # try switching to invalid pid
159            "read packet: $Hgp{:x}.{:x}#00".format(pid+1, tid),
160            "send packet: $Eff#00",
161            "read packet: $Hcp{:x}.{:x}#00".format(pid+1, tid),
162            "send packet: $Eff#00",
163        ], True)
164        self.expect_gdbremote_sequence()
165
166    @add_test_categories(["fork"])
167    def test_detach_current(self):
168        self.build()
169        self.prep_debug_monitor_and_inferior()
170        self.add_qSupported_packets(["multiprocess+"])
171        ret = self.expect_gdbremote_sequence()
172        self.assertIn("multiprocess+", ret["qSupported_response"])
173        self.reset_test_sequence()
174
175        # get process pid
176        self.test_sequence.add_log_lines([
177            "read packet: $qC#00",
178            {"direction": "send", "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*",
179             "capture": {1: "pid"}},
180        ], True)
181        ret = self.expect_gdbremote_sequence()
182        pid = ret["pid"]
183        self.reset_test_sequence()
184
185        # detach the process
186        self.test_sequence.add_log_lines([
187            "read packet: $D;{}#00".format(pid),
188            "send packet: $OK#00",
189            "read packet: $qC#00",
190            "send packet: $E44#00",
191        ], True)
192        self.expect_gdbremote_sequence()
193
194    @add_test_categories(["fork"])
195    def test_detach_all(self):
196        parent_pid, parent_tid, child_pid, child_tid = (
197            self.start_fork_test(["fork"]))
198
199        self.test_sequence.add_log_lines([
200            # double-check our PIDs
201            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
202            "send packet: $OK#00",
203            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
204            "send packet: $OK#00",
205            # detach all processes
206            "read packet: $D#00",
207            "send packet: $OK#00",
208            # verify that both PIDs are invalid now
209            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
210            "send packet: $Eff#00",
211            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
212            "send packet: $Eff#00",
213        ], True)
214        self.expect_gdbremote_sequence()
215
216    @add_test_categories(["fork"])
217    def test_kill_all(self):
218        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])
219
220        exit_regex = "[$]X09;process:([0-9a-f]+)#.*"
221        self.test_sequence.add_log_lines([
222            # kill all processes
223            "read packet: $k#00",
224            {"direction": "send", "regex": exit_regex,
225             "capture": {1: "pid1"}},
226            {"direction": "send", "regex": exit_regex,
227             "capture": {1: "pid2"}},
228        ], True)
229        ret = self.expect_gdbremote_sequence()
230        self.assertEqual(set([ret["pid1"], ret["pid2"]]),
231                         set([parent_pid, child_pid]))
232
233    def vkill_test(self, kill_parent=False, kill_child=False):
234        assert kill_parent or kill_child
235        parent_pid, parent_tid, child_pid, child_tid = (
236            self.start_fork_test(["fork"]))
237
238        if kill_parent:
239            self.test_sequence.add_log_lines([
240                # kill the process
241                "read packet: $vKill;{}#00".format(parent_pid),
242                "send packet: $OK#00",
243            ], True)
244        if kill_child:
245            self.test_sequence.add_log_lines([
246                # kill the process
247                "read packet: $vKill;{}#00".format(child_pid),
248                "send packet: $OK#00",
249            ], True)
250        self.test_sequence.add_log_lines([
251            # check child PID/TID
252            "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
253            "send packet: ${}#00".format("Eff" if kill_child else "OK"),
254            # check parent PID/TID
255            "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
256            "send packet: ${}#00".format("Eff" if kill_parent else "OK"),
257        ], True)
258        self.expect_gdbremote_sequence()
259
260    @add_test_categories(["fork"])
261    def test_vkill_child(self):
262        self.vkill_test(kill_child=True)
263
264    @add_test_categories(["fork"])
265    def test_vkill_parent(self):
266        self.vkill_test(kill_parent=True)
267
268    @add_test_categories(["fork"])
269    def test_vkill_both(self):
270        self.vkill_test(kill_parent=True, kill_child=True)
271
272    def resume_one_test(self, run_order, use_vCont=False):
273        parent_pid, parent_tid, child_pid, child_tid = (
274            self.start_fork_test(["fork", "trap"]))
275
276        parent_expect = [
277            "[$]T05thread:p{}.{};.*".format(parent_pid, parent_tid),
278            "[$]W00;process:{}#.*".format(parent_pid),
279        ]
280        child_expect = [
281            "[$]T05thread:p{}.{};.*".format(child_pid, child_tid),
282            "[$]W00;process:{}#.*".format(child_pid),
283        ]
284
285        for x in run_order:
286            if x == "parent":
287                pidtid = (parent_pid, parent_tid)
288                expect = parent_expect.pop(0)
289            elif x == "child":
290                pidtid = (child_pid, child_tid)
291                expect = child_expect.pop(0)
292            else:
293                assert False, "unexpected x={}".format(x)
294
295            if use_vCont:
296                self.test_sequence.add_log_lines([
297                    # continue the selected process
298                    "read packet: $vCont;c:p{}.{}#00".format(*pidtid),
299                ], True)
300            else:
301                self.test_sequence.add_log_lines([
302                    # continue the selected process
303                    "read packet: $Hcp{}.{}#00".format(*pidtid),
304                    "send packet: $OK#00",
305                    "read packet: $c#00",
306                ], True)
307            self.test_sequence.add_log_lines([
308                {"direction": "send", "regex": expect},
309            ], True)
310            # if at least one process remained, check both PIDs
311            if parent_expect or child_expect:
312                self.test_sequence.add_log_lines([
313                    "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
314                    "send packet: ${}#00".format("OK" if parent_expect else "Eff"),
315                    "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
316                    "send packet: ${}#00".format("OK" if child_expect else "Eff"),
317                ], True)
318        self.expect_gdbremote_sequence()
319
320    @expectedFailureAll(archs=["aarch64"],
321                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
322    @add_test_categories(["fork"])
323    def test_c_parent(self):
324        self.resume_one_test(run_order=["parent", "parent"])
325
326    @expectedFailureAll(archs=["aarch64"],
327                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
328    @add_test_categories(["fork"])
329    def test_c_child(self):
330        self.resume_one_test(run_order=["child", "child"])
331
332    @expectedFailureAll(archs=["aarch64"],
333                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
334    @add_test_categories(["fork"])
335    def test_c_parent_then_child(self):
336        self.resume_one_test(run_order=["parent", "parent", "child", "child"])
337
338    @expectedFailureAll(archs=["aarch64"],
339                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
340    @add_test_categories(["fork"])
341    def test_c_child_then_parent(self):
342        self.resume_one_test(run_order=["child", "child", "parent", "parent"])
343
344    @expectedFailureAll(archs=["aarch64"],
345                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
346    @add_test_categories(["fork"])
347    def test_c_interspersed(self):
348        self.resume_one_test(run_order=["parent", "child", "parent", "child"])
349
350    @expectedFailureAll(archs=["aarch64"],
351                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
352    @add_test_categories(["fork"])
353    def test_vCont_parent(self):
354        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)
355
356    @expectedFailureAll(archs=["aarch64"],
357                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
358    @add_test_categories(["fork"])
359    def test_vCont_child(self):
360        self.resume_one_test(run_order=["child", "child"], use_vCont=True)
361
362    @expectedFailureAll(archs=["aarch64"],
363                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
364    @add_test_categories(["fork"])
365    def test_vCont_parent_then_child(self):
366        self.resume_one_test(run_order=["parent", "parent", "child", "child"],
367                             use_vCont=True)
368
369    @expectedFailureAll(archs=["aarch64"],
370                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
371    @add_test_categories(["fork"])
372    def test_vCont_child_then_parent(self):
373        self.resume_one_test(run_order=["child", "child", "parent", "parent"],
374                             use_vCont=True)
375
376    @expectedFailureAll(archs=["aarch64"],
377                        bugnumber="https://github.com/llvm/llvm-project/issues/56268")
378    @add_test_categories(["fork"])
379    def test_vCont_interspersed(self):
380        self.resume_one_test(run_order=["parent", "child", "parent", "child"],
381                             use_vCont=True)
382
383    @add_test_categories(["fork"])
384    def test_vCont_two_processes(self):
385        parent_pid, parent_tid, child_pid, child_tid = (
386            self.start_fork_test(["fork", "trap"]))
387
388        self.test_sequence.add_log_lines([
389            # try to resume both processes
390            "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
391                parent_pid, parent_tid, child_pid, child_tid),
392            "send packet: $E03#00",
393        ], True)
394        self.expect_gdbremote_sequence()
395
396    @add_test_categories(["fork"])
397    def test_vCont_all_processes_explicit(self):
398        self.start_fork_test(["fork", "trap"])
399
400        self.test_sequence.add_log_lines([
401            # try to resume all processes implicitly
402            "read packet: $vCont;c:p-1.-1#00",
403            "send packet: $E03#00",
404        ], True)
405        self.expect_gdbremote_sequence()
406
407    @add_test_categories(["fork"])
408    def test_vCont_all_processes_implicit(self):
409        self.start_fork_test(["fork", "trap"])
410
411        self.test_sequence.add_log_lines([
412            # try to resume all processes implicitly
413            "read packet: $vCont;c#00",
414            "send packet: $E03#00",
415        ], True)
416        self.expect_gdbremote_sequence()
417
418    @add_test_categories(["fork"])
419    def test_threadinfo(self):
420        parent_pid, parent_tid, child_pid, child_tid = (
421            self.start_fork_test(["fork", "thread:new", "trap"]))
422        pidtids = [
423            (parent_pid, parent_tid),
424            (child_pid, child_tid),
425        ]
426
427        self.add_threadinfo_collection_packets()
428        ret = self.expect_gdbremote_sequence()
429        prev_pidtids = set(self.parse_threadinfo_packets(ret))
430        self.assertEqual(prev_pidtids,
431                         frozenset((int(pid, 16), int(tid, 16))
432                                   for pid, tid in pidtids))
433        self.reset_test_sequence()
434
435        for pidtid in pidtids:
436            self.test_sequence.add_log_lines(
437                ["read packet: $Hcp{}.{}#00".format(*pidtid),
438                 "send packet: $OK#00",
439                 "read packet: $c#00",
440                 {"direction": "send",
441                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
442                  },
443                 ], True)
444            self.add_threadinfo_collection_packets()
445            ret = self.expect_gdbremote_sequence()
446            self.reset_test_sequence()
447            new_pidtids = set(self.parse_threadinfo_packets(ret))
448            added_pidtid = new_pidtids - prev_pidtids
449            prev_pidtids = new_pidtids
450
451            # verify that we've got exactly one new thread, and that
452            # the PID matches
453            self.assertEqual(len(added_pidtid), 1)
454            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))
455
456        for pidtid in new_pidtids:
457            self.test_sequence.add_log_lines(
458                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
459                 "send packet: $OK#00",
460                 ], True)
461        self.expect_gdbremote_sequence()
462
463    @add_test_categories(["fork"])
464    def test_memory_read_write(self):
465        self.build()
466        INITIAL_DATA = "Initial message"
467        self.prep_debug_monitor_and_inferior(
468            inferior_args=["set-message:{}".format(INITIAL_DATA),
469                           "get-data-address-hex:g_message",
470                           "fork",
471                           "print-message:",
472                           "trap",
473                           ])
474        self.add_qSupported_packets(["multiprocess+",
475                                     "fork-events+"])
476        ret = self.expect_gdbremote_sequence()
477        self.assertIn("fork-events+", ret["qSupported_response"])
478        self.reset_test_sequence()
479
480        # continue and expect fork
481        self.test_sequence.add_log_lines([
482            "read packet: $c#00",
483            {"type": "output_match",
484             "regex": self.maybe_strict_output_regex(r"data address: 0x([0-9a-fA-F]+)\r\n"),
485             "capture": {1: "addr"}},
486            {"direction": "send", "regex": self.fork_regex.format("fork"),
487             "capture": self.fork_capture},
488        ], True)
489        ret = self.expect_gdbremote_sequence()
490        pidtids = {
491            "parent": (ret["parent_pid"], ret["parent_tid"]),
492            "child": (ret["child_pid"], ret["child_tid"]),
493        }
494        addr = ret["addr"]
495        self.reset_test_sequence()
496
497        for name, pidtid in pidtids.items():
498            self.test_sequence.add_log_lines(
499                ["read packet: $Hgp{}.{}#00".format(*pidtid),
500                 "send packet: $OK#00",
501                 # read the current memory contents
502                 "read packet: $m{},{:x}#00".format(addr,
503                                                    len(INITIAL_DATA) + 1),
504                 {"direction": "send",
505                  "regex": r"^[$](.+)#.*$",
506                  "capture": {1: "data"}},
507                 # write a new value
508                 "read packet: $M{},{:x}:{}#00".format(addr,
509                                                       len(name) + 1,
510                                                       seven.hexlify(
511                                                           name + "\0")),
512                 "send packet: $OK#00",
513                 # resume the process and wait for the trap
514                 "read packet: $Hcp{}.{}#00".format(*pidtid),
515                 "send packet: $OK#00",
516                 "read packet: $c#00",
517                 {"type": "output_match",
518                  "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
519                  "capture": {1: "printed_message"}},
520                 {"direction": "send",
521                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
522                  },
523                 ], True)
524            ret = self.expect_gdbremote_sequence()
525            data = seven.unhexlify(ret["data"])
526            self.assertEqual(data, INITIAL_DATA + "\0")
527            self.assertEqual(ret["printed_message"], name);
528            self.reset_test_sequence()
529
530        # we do the second round separately to make sure that initial data
531        # is correctly preserved while writing into the first process
532
533        for name, pidtid in pidtids.items():
534            self.test_sequence.add_log_lines(
535                ["read packet: $Hgp{}.{}#00".format(*pidtid),
536                 "send packet: $OK#00",
537                 # read the current memory contents
538                 "read packet: $m{},{:x}#00".format(addr,
539                                                    len(name) + 1),
540                 {"direction": "send",
541                  "regex": r"^[$](.+)#.*$",
542                  "capture": {1: "data"}},
543                 ], True)
544            ret = self.expect_gdbremote_sequence()
545            self.assertIsNotNone(ret.get("data"))
546            data = seven.unhexlify(ret.get("data"))
547            self.assertEqual(data, name + "\0")
548            self.reset_test_sequence()
549
550    @add_test_categories(["fork"])
551    def test_register_read_write(self):
552        parent_pid, parent_tid, child_pid, child_tid = (
553            self.start_fork_test(["fork", "thread:new", "trap"]))
554        pidtids = [
555            (parent_pid, parent_tid),
556            (child_pid, child_tid),
557        ]
558
559        for pidtid in pidtids:
560            self.test_sequence.add_log_lines(
561                ["read packet: $Hcp{}.{}#00".format(*pidtid),
562                 "send packet: $OK#00",
563                 "read packet: $c#00",
564                 {"direction": "send",
565                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
566                  },
567                 ], True)
568
569        self.add_threadinfo_collection_packets()
570        ret = self.expect_gdbremote_sequence()
571        self.reset_test_sequence()
572
573        pidtids = set(self.parse_threadinfo_packets(ret))
574        self.assertEqual(len(pidtids), 4)
575        # first, save register values from all the threads
576        thread_regs = {}
577        for pidtid in pidtids:
578            for regno in range(256):
579                self.test_sequence.add_log_lines(
580                    ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
581                     "send packet: $OK#00",
582                     "read packet: $p{:x}#00".format(regno),
583                     {"direction": "send",
584                      "regex": r"^[$](.+)#.*$",
585                      "capture": {1: "data"}},
586                     ], True)
587                ret = self.expect_gdbremote_sequence()
588                data = ret.get("data")
589                self.assertIsNotNone(data)
590                # ignore registers shorter than 32 bits (this also catches
591                # "Exx" errors)
592                if len(data) >= 8:
593                    break
594            else:
595                self.skipTest("no usable register found")
596            thread_regs[pidtid] = (regno, data)
597
598        vals = set(x[1] for x in thread_regs.values())
599        # NB: cheap hack to make the loop below easier
600        new_val = next(iter(vals))
601
602        # then, start altering them and verify that we don't unexpectedly
603        # change the value from another thread
604        for pidtid in pidtids:
605            old_val = thread_regs[pidtid]
606            regno = old_val[0]
607            old_val_length = len(old_val[1])
608            # generate a unique new_val
609            while new_val in vals:
610                new_val = ('{{:0{}x}}'.format(old_val_length)
611                           .format(random.getrandbits(old_val_length*4)))
612            vals.add(new_val)
613
614            self.test_sequence.add_log_lines(
615                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
616                 "send packet: $OK#00",
617                 "read packet: $p{:x}#00".format(regno),
618                 {"direction": "send",
619                  "regex": r"^[$](.+)#.*$",
620                  "capture": {1: "data"}},
621                 "read packet: $P{:x}={}#00".format(regno, new_val),
622                 "send packet: $OK#00",
623                 ], True)
624            ret = self.expect_gdbremote_sequence()
625            data = ret.get("data")
626            self.assertIsNotNone(data)
627            self.assertEqual(data, old_val[1])
628            thread_regs[pidtid] = (regno, new_val)
629
630        # finally, verify that new values took effect
631        for pidtid in pidtids:
632            old_val = thread_regs[pidtid]
633            self.test_sequence.add_log_lines(
634                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
635                 "send packet: $OK#00",
636                 "read packet: $p{:x}#00".format(old_val[0]),
637                 {"direction": "send",
638                  "regex": r"^[$](.+)#.*$",
639                  "capture": {1: "data"}},
640                 ], True)
641            ret = self.expect_gdbremote_sequence()
642            data = ret.get("data")
643            self.assertIsNotNone(data)
644            self.assertEqual(data, old_val[1])
645
646    @add_test_categories(["fork"])
647    def test_qC(self):
648        parent_pid, parent_tid, child_pid, child_tid = (
649            self.start_fork_test(["fork", "thread:new", "trap"]))
650        pidtids = [
651            (parent_pid, parent_tid),
652            (child_pid, child_tid),
653        ]
654
655        for pidtid in pidtids:
656            self.test_sequence.add_log_lines(
657                ["read packet: $Hcp{}.{}#00".format(*pidtid),
658                 "send packet: $OK#00",
659                 "read packet: $c#00",
660                 {"direction": "send",
661                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
662                  },
663                 ], True)
664
665        self.add_threadinfo_collection_packets()
666        ret = self.expect_gdbremote_sequence()
667        self.reset_test_sequence()
668
669        pidtids = set(self.parse_threadinfo_packets(ret))
670        self.assertEqual(len(pidtids), 4)
671        for pidtid in pidtids:
672            self.test_sequence.add_log_lines(
673                ["read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
674                 "send packet: $OK#00",
675                 "read packet: $qC#00",
676                 "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
677                 ], True)
678        self.expect_gdbremote_sequence()
679
680    @add_test_categories(["fork"])
681    def test_T(self):
682        parent_pid, parent_tid, child_pid, child_tid = (
683            self.start_fork_test(["fork", "thread:new", "trap"]))
684        pidtids = [
685            (parent_pid, parent_tid),
686            (child_pid, child_tid),
687        ]
688
689        for pidtid in pidtids:
690            self.test_sequence.add_log_lines(
691                ["read packet: $Hcp{}.{}#00".format(*pidtid),
692                 "send packet: $OK#00",
693                 "read packet: $c#00",
694                 {"direction": "send",
695                  "regex": "^[$]T05thread:p{}.{}.*".format(*pidtid),
696                  },
697                 ], True)
698
699        self.add_threadinfo_collection_packets()
700        ret = self.expect_gdbremote_sequence()
701        self.reset_test_sequence()
702
703        pidtids = set(self.parse_threadinfo_packets(ret))
704        self.assertEqual(len(pidtids), 4)
705        max_pid = max(pid for pid, tid in pidtids)
706        max_tid = max(tid for pid, tid in pidtids)
707        bad_pidtids = (
708            (max_pid, max_tid + 1, "E02"),
709            (max_pid + 1, max_tid, "E01"),
710            (max_pid + 1, max_tid + 1, "E01"),
711        )
712
713        for pidtid in pidtids:
714            self.test_sequence.add_log_lines(
715                [
716                 # test explicit PID+TID
717                 "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
718                 "send packet: $OK#00",
719                 # test implicit PID via Hg
720                 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
721                 "send packet: $OK#00",
722                 "read packet: $T{:x}#00".format(max_tid + 1),
723                 "send packet: $E02#00",
724                 "read packet: $T{:x}#00".format(pidtid[1]),
725                 "send packet: $OK#00",
726                 ], True)
727        for pid, tid, expected in bad_pidtids:
728            self.test_sequence.add_log_lines(
729                ["read packet: $Tp{:x}.{:x}#00".format(pid, tid),
730                 "send packet: ${}#00".format(expected),
731                 ], True)
732        self.expect_gdbremote_sequence()
733