xref: /llvm-project/lldb/test/API/tools/lldb-server/TestGdbRemoteFork.py (revision 2238dcc39358353cac21df75c3c3286ab20b8f53)
1import random
2
3from lldbsuite.test.decorators import *
4from lldbsuite.test.lldbtest import *
5
6from fork_testbase import GdbRemoteForkTestBase
7
8
9class TestGdbRemoteFork(GdbRemoteForkTestBase):
10    def setUp(self):
11        GdbRemoteForkTestBase.setUp(self)
12        if self.getPlatform() == "linux" and self.getArchitecture() in [
13            "arm",
14            "aarch64",
15        ]:
16            self.skipTest("Unsupported for Arm/AArch64 Linux")
17
18    @add_test_categories(["fork"])
19    def test_fork_multithreaded(self):
20        _, _, child_pid, _ = self.start_fork_test(["thread:new"] * 2 + ["fork"])
21
22        # detach the forked child
23        self.test_sequence.add_log_lines(
24            [
25                "read packet: $D;{}#00".format(child_pid),
26                "send packet: $OK#00",
27                "read packet: $k#00",
28            ],
29            True,
30        )
31        self.expect_gdbremote_sequence()
32
33    @add_test_categories(["fork"])
34    def test_fork(self):
35        parent_pid, _ = self.fork_and_detach_test("fork")
36
37        # resume the parent
38        self.test_sequence.add_log_lines(
39            [
40                "read packet: $c#00",
41                "send packet: $W00;process:{}#00".format(parent_pid),
42            ],
43            True,
44        )
45        self.expect_gdbremote_sequence()
46
47    @add_test_categories(["fork"])
48    def test_vfork(self):
49        parent_pid, parent_tid = self.fork_and_detach_test("vfork")
50
51        # resume the parent
52        self.test_sequence.add_log_lines(
53            [
54                "read packet: $c#00",
55                {
56                    "direction": "send",
57                    "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format(
58                        parent_pid, parent_tid
59                    ),
60                },
61                "read packet: $c#00",
62                "send packet: $W00;process:{}#00".format(parent_pid),
63            ],
64            True,
65        )
66        self.expect_gdbremote_sequence()
67
68    @add_test_categories(["fork"])
69    def test_fork_follow(self):
70        self.fork_and_follow_test("fork")
71
72    @add_test_categories(["fork"])
73    def test_vfork_follow(self):
74        self.fork_and_follow_test("vfork")
75
76    @add_test_categories(["fork"])
77    def test_select_wrong_pid(self):
78        self.build()
79        self.prep_debug_monitor_and_inferior()
80        self.add_qSupported_packets(["multiprocess+"])
81        ret = self.expect_gdbremote_sequence()
82        self.assertIn("multiprocess+", ret["qSupported_response"])
83        self.reset_test_sequence()
84
85        # get process pid
86        self.test_sequence.add_log_lines(
87            [
88                "read packet: $qC#00",
89                {
90                    "direction": "send",
91                    "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*",
92                    "capture": {1: "pid", 2: "tid"},
93                },
94            ],
95            True,
96        )
97        ret = self.expect_gdbremote_sequence()
98        pid, tid = (int(ret[x], 16) for x in ("pid", "tid"))
99        self.reset_test_sequence()
100
101        self.test_sequence.add_log_lines(
102            [
103                # try switching to correct pid
104                "read packet: $Hgp{:x}.{:x}#00".format(pid, tid),
105                "send packet: $OK#00",
106                "read packet: $Hcp{:x}.{:x}#00".format(pid, tid),
107                "send packet: $OK#00",
108                # try switching to invalid tid
109                "read packet: $Hgp{:x}.{:x}#00".format(pid, tid + 1),
110                "send packet: $E15#00",
111                "read packet: $Hcp{:x}.{:x}#00".format(pid, tid + 1),
112                "send packet: $E15#00",
113                # try switching to invalid pid
114                "read packet: $Hgp{:x}.{:x}#00".format(pid + 1, tid),
115                "send packet: $Eff#00",
116                "read packet: $Hcp{:x}.{:x}#00".format(pid + 1, tid),
117                "send packet: $Eff#00",
118            ],
119            True,
120        )
121        self.expect_gdbremote_sequence()
122
123    @add_test_categories(["fork"])
124    def test_detach_current(self):
125        self.build()
126        self.prep_debug_monitor_and_inferior()
127        self.add_qSupported_packets(["multiprocess+"])
128        ret = self.expect_gdbremote_sequence()
129        self.assertIn("multiprocess+", ret["qSupported_response"])
130        self.reset_test_sequence()
131
132        # get process pid
133        self.test_sequence.add_log_lines(
134            [
135                "read packet: $qC#00",
136                {
137                    "direction": "send",
138                    "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*",
139                    "capture": {1: "pid"},
140                },
141            ],
142            True,
143        )
144        ret = self.expect_gdbremote_sequence()
145        pid = ret["pid"]
146        self.reset_test_sequence()
147
148        # detach the process
149        self.test_sequence.add_log_lines(
150            [
151                "read packet: $D;{}#00".format(pid),
152                "send packet: $OK#00",
153                "read packet: $qC#00",
154                "send packet: $E44#00",
155            ],
156            True,
157        )
158        self.expect_gdbremote_sequence()
159
160    @add_test_categories(["fork"])
    def test_detach_all(self):
        # The whole scenario lives in the base-class helper; this method only
        # registers it as a test in the "fork" category.
        self.detach_all_test()
163
164    @add_test_categories(["fork"])
165    def test_kill_all(self):
166        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"])
167
168        exit_regex = "[$]X09;process:([0-9a-f]+)#.*"
169        self.test_sequence.add_log_lines(
170            [
171                # kill all processes
172                "read packet: $k#00",
173                {"direction": "send", "regex": exit_regex, "capture": {1: "pid1"}},
174                {"direction": "send", "regex": exit_regex, "capture": {1: "pid2"}},
175            ],
176            True,
177        )
178        ret = self.expect_gdbremote_sequence()
179        self.assertEqual(set([ret["pid1"], ret["pid2"]]), set([parent_pid, child_pid]))
180
181    @add_test_categories(["fork"])
182    def test_vkill_child(self):
183        self.vkill_test(kill_child=True)
184
185    @add_test_categories(["fork"])
186    def test_vkill_parent(self):
187        self.vkill_test(kill_parent=True)
188
189    @add_test_categories(["fork"])
190    def test_vkill_both(self):
191        self.vkill_test(kill_parent=True, kill_child=True)
192
193    @add_test_categories(["fork"])
194    def test_c_parent(self):
195        self.resume_one_test(run_order=["parent", "parent"])
196
197    @add_test_categories(["fork"])
198    def test_c_child(self):
199        self.resume_one_test(run_order=["child", "child"])
200
201    @add_test_categories(["fork"])
202    def test_c_parent_then_child(self):
203        self.resume_one_test(run_order=["parent", "parent", "child", "child"])
204
205    @add_test_categories(["fork"])
206    def test_c_child_then_parent(self):
207        self.resume_one_test(run_order=["child", "child", "parent", "parent"])
208
209    @add_test_categories(["fork"])
210    def test_c_interspersed(self):
211        self.resume_one_test(run_order=["parent", "child", "parent", "child"])
212
213    @add_test_categories(["fork"])
214    def test_vCont_parent(self):
215        self.resume_one_test(run_order=["parent", "parent"], use_vCont=True)
216
217    @add_test_categories(["fork"])
218    def test_vCont_child(self):
219        self.resume_one_test(run_order=["child", "child"], use_vCont=True)
220
221    @add_test_categories(["fork"])
222    def test_vCont_parent_then_child(self):
223        self.resume_one_test(
224            run_order=["parent", "parent", "child", "child"], use_vCont=True
225        )
226
227    @add_test_categories(["fork"])
228    def test_vCont_child_then_parent(self):
229        self.resume_one_test(
230            run_order=["child", "child", "parent", "parent"], use_vCont=True
231        )
232
233    @add_test_categories(["fork"])
234    def test_vCont_interspersed(self):
235        self.resume_one_test(
236            run_order=["parent", "child", "parent", "child"], use_vCont=True
237        )
238
239    @add_test_categories(["fork"])
240    def test_vCont_two_processes(self):
241        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
242            ["fork", "stop"]
243        )
244
245        self.test_sequence.add_log_lines(
246            [
247                # try to resume both processes
248                "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
249                    parent_pid, parent_tid, child_pid, child_tid
250                ),
251                "send packet: $E03#00",
252            ],
253            True,
254        )
255        self.expect_gdbremote_sequence()
256
257    @add_test_categories(["fork"])
258    def test_vCont_all_processes_explicit(self):
259        self.start_fork_test(["fork", "stop"])
260
261        self.test_sequence.add_log_lines(
262            [
263                # try to resume all processes implicitly
264                "read packet: $vCont;c:p-1.-1#00",
265                "send packet: $E03#00",
266            ],
267            True,
268        )
269        self.expect_gdbremote_sequence()
270
271    @add_test_categories(["fork"])
272    def test_vCont_all_processes_implicit(self):
273        self.start_fork_test(["fork", "stop"])
274
275        self.test_sequence.add_log_lines(
276            [
277                # try to resume all processes implicitly
278                "read packet: $vCont;c#00",
279                "send packet: $E03#00",
280            ],
281            True,
282        )
283        self.expect_gdbremote_sequence()
284
285    @add_test_categories(["fork"])
286    def test_threadinfo(self):
287        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
288            ["fork", "thread:new", "stop"]
289        )
290        pidtids = [
291            (parent_pid, parent_tid),
292            (child_pid, child_tid),
293        ]
294
295        self.add_threadinfo_collection_packets()
296        ret = self.expect_gdbremote_sequence()
297        prev_pidtids = set(self.parse_threadinfo_packets(ret))
298        self.assertEqual(
299            prev_pidtids,
300            frozenset((int(pid, 16), int(tid, 16)) for pid, tid in pidtids),
301        )
302        self.reset_test_sequence()
303
304        for pidtid in pidtids:
305            self.test_sequence.add_log_lines(
306                [
307                    "read packet: $Hcp{}.{}#00".format(*pidtid),
308                    "send packet: $OK#00",
309                    "read packet: $c#00",
310                    {
311                        "direction": "send",
312                        "regex": self.stop_regex.format(*pidtid),
313                    },
314                ],
315                True,
316            )
317            self.add_threadinfo_collection_packets()
318            ret = self.expect_gdbremote_sequence()
319            self.reset_test_sequence()
320            new_pidtids = set(self.parse_threadinfo_packets(ret))
321            added_pidtid = new_pidtids - prev_pidtids
322            prev_pidtids = new_pidtids
323
324            # verify that we've got exactly one new thread, and that
325            # the PID matches
326            self.assertEqual(len(added_pidtid), 1)
327            self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16))
328
329        for pidtid in new_pidtids:
330            self.test_sequence.add_log_lines(
331                [
332                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
333                    "send packet: $OK#00",
334                ],
335                True,
336            )
337        self.expect_gdbremote_sequence()
338
339    @add_test_categories(["fork"])
340    def test_memory_read_write(self):
341        self.build()
342        INITIAL_DATA = "Initial message"
343        self.prep_debug_monitor_and_inferior(
344            inferior_args=[
345                "set-message:{}".format(INITIAL_DATA),
346                "get-data-address-hex:g_message",
347                "fork",
348                "print-message:",
349                "stop",
350            ]
351        )
352        self.add_qSupported_packets(["multiprocess+", "fork-events+"])
353        ret = self.expect_gdbremote_sequence()
354        self.assertIn("fork-events+", ret["qSupported_response"])
355        self.reset_test_sequence()
356
357        # continue and expect fork
358        self.test_sequence.add_log_lines(
359            [
360                "read packet: $c#00",
361                {
362                    "type": "output_match",
363                    "regex": self.maybe_strict_output_regex(
364                        r"data address: 0x([0-9a-fA-F]+)\r\n"
365                    ),
366                    "capture": {1: "addr"},
367                },
368                {
369                    "direction": "send",
370                    "regex": self.fork_regex.format("fork"),
371                    "capture": self.fork_capture,
372                },
373            ],
374            True,
375        )
376        ret = self.expect_gdbremote_sequence()
377        pidtids = {
378            "parent": (ret["parent_pid"], ret["parent_tid"]),
379            "child": (ret["child_pid"], ret["child_tid"]),
380        }
381        addr = ret["addr"]
382        self.reset_test_sequence()
383
384        for name, pidtid in pidtids.items():
385            self.test_sequence.add_log_lines(
386                [
387                    "read packet: $Hgp{}.{}#00".format(*pidtid),
388                    "send packet: $OK#00",
389                    # read the current memory contents
390                    "read packet: $m{},{:x}#00".format(addr, len(INITIAL_DATA) + 1),
391                    {
392                        "direction": "send",
393                        "regex": r"^[$](.+)#.*$",
394                        "capture": {1: "data"},
395                    },
396                    # write a new value
397                    "read packet: $M{},{:x}:{}#00".format(
398                        addr, len(name) + 1, seven.hexlify(name + "\0")
399                    ),
400                    "send packet: $OK#00",
401                    # resume the process and wait for the trap
402                    "read packet: $Hcp{}.{}#00".format(*pidtid),
403                    "send packet: $OK#00",
404                    "read packet: $c#00",
405                    {
406                        "type": "output_match",
407                        "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"),
408                        "capture": {1: "printed_message"},
409                    },
410                    {
411                        "direction": "send",
412                        "regex": self.stop_regex.format(*pidtid),
413                    },
414                ],
415                True,
416            )
417            ret = self.expect_gdbremote_sequence()
418            data = seven.unhexlify(ret["data"])
419            self.assertEqual(data, INITIAL_DATA + "\0")
420            self.assertEqual(ret["printed_message"], name)
421            self.reset_test_sequence()
422
423        # we do the second round separately to make sure that initial data
424        # is correctly preserved while writing into the first process
425
426        for name, pidtid in pidtids.items():
427            self.test_sequence.add_log_lines(
428                [
429                    "read packet: $Hgp{}.{}#00".format(*pidtid),
430                    "send packet: $OK#00",
431                    # read the current memory contents
432                    "read packet: $m{},{:x}#00".format(addr, len(name) + 1),
433                    {
434                        "direction": "send",
435                        "regex": r"^[$](.+)#.*$",
436                        "capture": {1: "data"},
437                    },
438                ],
439                True,
440            )
441            ret = self.expect_gdbremote_sequence()
442            self.assertIsNotNone(ret.get("data"))
443            data = seven.unhexlify(ret.get("data"))
444            self.assertEqual(data, name + "\0")
445            self.reset_test_sequence()
446
447    @add_test_categories(["fork"])
448    def test_register_read_write(self):
449        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
450            ["fork", "thread:new", "stop"]
451        )
452        pidtids = [
453            (parent_pid, parent_tid),
454            (child_pid, child_tid),
455        ]
456
457        for pidtid in pidtids:
458            self.test_sequence.add_log_lines(
459                [
460                    "read packet: $Hcp{}.{}#00".format(*pidtid),
461                    "send packet: $OK#00",
462                    "read packet: $c#00",
463                    {
464                        "direction": "send",
465                        "regex": self.stop_regex.format(*pidtid),
466                    },
467                ],
468                True,
469            )
470
471        self.add_threadinfo_collection_packets()
472        ret = self.expect_gdbremote_sequence()
473        self.reset_test_sequence()
474
475        pidtids = set(self.parse_threadinfo_packets(ret))
476        self.assertEqual(len(pidtids), 4)
477        # first, save register values from all the threads
478        thread_regs = {}
479        for pidtid in pidtids:
480            for regno in range(256):
481                self.test_sequence.add_log_lines(
482                    [
483                        "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
484                        "send packet: $OK#00",
485                        "read packet: $p{:x}#00".format(regno),
486                        {
487                            "direction": "send",
488                            "regex": r"^[$](.+)#.*$",
489                            "capture": {1: "data"},
490                        },
491                    ],
492                    True,
493                )
494                ret = self.expect_gdbremote_sequence()
495                data = ret.get("data")
496                self.assertIsNotNone(data)
497                # ignore registers shorter than 32 bits (this also catches
498                # "Exx" errors)
499                if len(data) >= 8:
500                    break
501            else:
502                self.skipTest("no usable register found")
503            thread_regs[pidtid] = (regno, data)
504
505        vals = set(x[1] for x in thread_regs.values())
506        # NB: cheap hack to make the loop below easier
507        new_val = next(iter(vals))
508
509        # then, start altering them and verify that we don't unexpectedly
510        # change the value from another thread
511        for pidtid in pidtids:
512            old_val = thread_regs[pidtid]
513            regno = old_val[0]
514            old_val_length = len(old_val[1])
515            # generate a unique new_val
516            while new_val in vals:
517                new_val = "{{:0{}x}}".format(old_val_length).format(
518                    random.getrandbits(old_val_length * 4)
519                )
520            vals.add(new_val)
521
522            self.test_sequence.add_log_lines(
523                [
524                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
525                    "send packet: $OK#00",
526                    "read packet: $p{:x}#00".format(regno),
527                    {
528                        "direction": "send",
529                        "regex": r"^[$](.+)#.*$",
530                        "capture": {1: "data"},
531                    },
532                    "read packet: $P{:x}={}#00".format(regno, new_val),
533                    "send packet: $OK#00",
534                ],
535                True,
536            )
537            ret = self.expect_gdbremote_sequence()
538            data = ret.get("data")
539            self.assertIsNotNone(data)
540            self.assertEqual(data, old_val[1])
541            thread_regs[pidtid] = (regno, new_val)
542
543        # finally, verify that new values took effect
544        for pidtid in pidtids:
545            old_val = thread_regs[pidtid]
546            self.test_sequence.add_log_lines(
547                [
548                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
549                    "send packet: $OK#00",
550                    "read packet: $p{:x}#00".format(old_val[0]),
551                    {
552                        "direction": "send",
553                        "regex": r"^[$](.+)#.*$",
554                        "capture": {1: "data"},
555                    },
556                ],
557                True,
558            )
559            ret = self.expect_gdbremote_sequence()
560            data = ret.get("data")
561            self.assertIsNotNone(data)
562            self.assertEqual(data, old_val[1])
563
564    @add_test_categories(["fork"])
565    def test_qC(self):
566        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
567            ["fork", "thread:new", "stop"]
568        )
569        pidtids = [
570            (parent_pid, parent_tid),
571            (child_pid, child_tid),
572        ]
573
574        for pidtid in pidtids:
575            self.test_sequence.add_log_lines(
576                [
577                    "read packet: $Hcp{}.{}#00".format(*pidtid),
578                    "send packet: $OK#00",
579                    "read packet: $c#00",
580                    {
581                        "direction": "send",
582                        "regex": self.stop_regex.format(*pidtid),
583                    },
584                ],
585                True,
586            )
587
588        self.add_threadinfo_collection_packets()
589        ret = self.expect_gdbremote_sequence()
590        self.reset_test_sequence()
591
592        pidtids = set(self.parse_threadinfo_packets(ret))
593        self.assertEqual(len(pidtids), 4)
594        for pidtid in pidtids:
595            self.test_sequence.add_log_lines(
596                [
597                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
598                    "send packet: $OK#00",
599                    "read packet: $qC#00",
600                    "send packet: $QCp{:x}.{:x}#00".format(*pidtid),
601                ],
602                True,
603            )
604        self.expect_gdbremote_sequence()
605
606    @add_test_categories(["fork"])
607    def test_T(self):
608        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
609            ["fork", "thread:new", "stop"]
610        )
611        pidtids = [
612            (parent_pid, parent_tid),
613            (child_pid, child_tid),
614        ]
615
616        for pidtid in pidtids:
617            self.test_sequence.add_log_lines(
618                [
619                    "read packet: $Hcp{}.{}#00".format(*pidtid),
620                    "send packet: $OK#00",
621                    "read packet: $c#00",
622                    {
623                        "direction": "send",
624                        "regex": self.stop_regex.format(*pidtid),
625                    },
626                ],
627                True,
628            )
629
630        self.add_threadinfo_collection_packets()
631        ret = self.expect_gdbremote_sequence()
632        self.reset_test_sequence()
633
634        pidtids = set(self.parse_threadinfo_packets(ret))
635        self.assertEqual(len(pidtids), 4)
636        max_pid = max(pid for pid, tid in pidtids)
637        max_tid = max(tid for pid, tid in pidtids)
638        bad_pidtids = (
639            (max_pid, max_tid + 1, "E02"),
640            (max_pid + 1, max_tid, "E01"),
641            (max_pid + 1, max_tid + 1, "E01"),
642        )
643
644        for pidtid in pidtids:
645            self.test_sequence.add_log_lines(
646                [
647                    # test explicit PID+TID
648                    "read packet: $Tp{:x}.{:x}#00".format(*pidtid),
649                    "send packet: $OK#00",
650                    # test implicit PID via Hg
651                    "read packet: $Hgp{:x}.{:x}#00".format(*pidtid),
652                    "send packet: $OK#00",
653                    "read packet: $T{:x}#00".format(max_tid + 1),
654                    "send packet: $E02#00",
655                    "read packet: $T{:x}#00".format(pidtid[1]),
656                    "send packet: $OK#00",
657                ],
658                True,
659            )
660        for pid, tid, expected in bad_pidtids:
661            self.test_sequence.add_log_lines(
662                [
663                    "read packet: $Tp{:x}.{:x}#00".format(pid, tid),
664                    "send packet: ${}#00".format(expected),
665                ],
666                True,
667            )
668        self.expect_gdbremote_sequence()
669