xref: /llvm-project/lldb/test/API/tools/lldb-server/TestGdbRemoteForkNonStop.py (revision 2238dcc39358353cac21df75c3c3286ab20b8f53)
1from lldbsuite.test.decorators import *
2from lldbsuite.test.lldbtest import *
3
4from fork_testbase import GdbRemoteForkTestBase
5
6
class TestGdbRemoteForkNonStop(GdbRemoteForkTestBase):
    """Non-stop mode variants of the lldb-server fork/vfork packet tests.

    Every test passes ``nonstop=True`` to the ``GdbRemoteForkTestBase``
    helpers.  Where a test drives the server directly, stop events are
    expected as asynchronous ``%Stop:`` notifications, each of which is
    followed by a ``vStopped`` request.
    """

    def setUp(self):
        """Run the base-class set-up, then skip on Arm/AArch64 Linux."""
        GdbRemoteForkTestBase.setUp(self)
        if self.getPlatform() == "linux" and self.getArchitecture() in [
            "arm",
            "aarch64",
        ]:
            self.skipTest("Unsupported for Arm/AArch64 Linux")

    @add_test_categories(["fork"])
    def test_vfork_nonstop(self):
        """After the vfork detach test, resume the parent until it exits.

        The first continue is expected to report a stop for the parent whose
        stop reply mentions ``vforkdone``; the second continue is expected to
        report the parent exiting with status 0 (``W00``).
        """
        parent_pid, parent_tid = self.fork_and_detach_test("vfork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    "regex": r"%Stop:T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format(
                        parent_pid, parent_tid
                    ),
                },
                "read packet: $vStopped#00",
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_nonstop(self):
        """After the fork detach test, resume the parent until it exits."""
        parent_pid, _ = self.fork_and_detach_test("fork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow_nonstop(self):
        """Run the shared fork-and-follow test for "fork" in non-stop mode."""
        self.fork_and_follow_test("fork", nonstop=True)

    @add_test_categories(["fork"])
    def test_vfork_follow_nonstop(self):
        """Run the shared fork-and-follow test for "vfork" in non-stop mode."""
        self.fork_and_follow_test("vfork", nonstop=True)

    @add_test_categories(["fork"])
    def test_detach_all_nonstop(self):
        """Run the shared detach-all test in non-stop mode."""
        self.detach_all_test(nonstop=True)

    @add_test_categories(["fork"])
    def test_kill_all_nonstop(self):
        """Kill both processes with ``k`` and collect both exit notifications.

        The two ``X09`` exit reports may arrive in either order, so the test
        only checks that, taken together, they name the parent and the child.
        """
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"], nonstop=True)

        exit_regex = "X09;process:([0-9a-f]+)"
        # Depending on a potential race, the second kill may make it into
        # the async queue before we issue vStopped or after.  In the former
        # case, we should expect the exit status in reply to vStopped.
        # In the latter, we should expect an OK response (queue empty),
        # followed by another async notification.
        vstop_regex = "[$](OK|{})#.*".format(exit_regex)
        self.test_sequence.add_log_lines(
            [
                # kill all processes
                "read packet: $k#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    "regex": "%Stop:{}#.*".format(exit_regex),
                    "capture": {1: "pid1"},
                },
                "read packet: $vStopped#00",
                {
                    "direction": "send",
                    "regex": vstop_regex,
                    # Group 1 is the whole reply ("OK" or the exit status);
                    # group 2 is the pid nested inside exit_regex.
                    "capture": {1: "vstop_reply", 2: "pid2"},
                },
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        pid1 = ret["pid1"]
        if ret["vstop_reply"] == "OK":
            # The queue was empty: the second exit has to show up as a new
            # asynchronous notification instead.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    {
                        "direction": "send",
                        "regex": "%Stop:{}#.*".format(exit_regex),
                        "capture": {1: "pid2"},
                    },
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
        pid2 = ret["pid2"]
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()
        self.assertEqual({pid1, pid2}, {parent_pid, child_pid})

    @add_test_categories(["fork"])
    def test_vkill_both_nonstop(self):
        """Run the shared vKill test for parent and child in non-stop mode."""
        self.vkill_test(kill_parent=True, kill_child=True, nonstop=True)

    @add_test_categories(["fork"])
    def test_c_interspersed_nonstop(self):
        """Run the shared resume-one test, alternating parent and child."""
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"], nonstop=True
        )

    @add_test_categories(["fork"])
    def test_vCont_interspersed_nonstop(self):
        """Same as test_c_interspersed_nonstop, but with use_vCont=True."""
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"],
            use_vCont=True,
            nonstop=True,
        )

    def get_all_output_via_vStdio(self, output_test):
        """Collect raw inferior output until output_test accepts it.

        output_test is called with the bytes accumulated so far and returns a
        true value once enough output has been gathered.  Returns the
        accumulated bytes.
        """
        # The output may be split into an arbitrary number of messages.
        # Loop until we have everything. The first message is waiting for us
        # in the packet queue.
        output = self._server.get_raw_output_packet()
        while not output_test(output):
            self._server.send_packet(b"vStdio")
            output += self._server.get_raw_output_packet()
        return output

    def _resume_both_and_check_pids(self, make_resume_lines):
        """Start a forking inferior, resume both processes, check stdout.

        make_resume_lines is called as
        make_resume_lines(parent_pid, parent_tid, child_pid, child_tid) with
        the values returned by start_fork_test() and must return the list of
        log lines that resume both processes.  A "%Stop:T" notification is
        expected after those lines, and the collected output must then contain
        exactly two "PID: " markers: one followed by the parent's pid and one
        by the child's, both in decimal.
        """
        lock1 = self.getBuildArtifact("lock1")
        lock2 = self.getBuildArtifact("lock2")
        # NOTE(review): the "process:sync:" actions presumably make parent
        # and child rendezvous around "print-pid" -- confirm against the
        # test inferior.
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [
                "fork",
                "process:sync:" + lock1,
                "print-pid",
                "process:sync:" + lock2,
                "stop",
            ],
            nonstop=True,
        )

        self.test_sequence.add_log_lines(
            make_resume_lines(parent_pid, parent_tid, child_pid, child_tid)
            + [{"direction": "send", "regex": "%Stop:T.*"}],
            True,
        )
        self.expect_gdbremote_sequence()

        output = self.get_all_output_via_vStdio(
            lambda output: output.count(b"PID: ") >= 2
        )
        self.assertEqual(output.count(b"PID: "), 2)
        # The pids are hex strings, while the assertions look for decimal.
        self.assertIn("PID: {}".format(int(parent_pid, 16)).encode(), output)
        self.assertIn("PID: {}".format(int(child_pid, 16)).encode(), output)

    @add_test_categories(["fork"])
    def test_c_both_nonstop(self):
        """Resume parent and child with separate Hc + c packet pairs."""

        def resume_lines(parent_pid, parent_tid, child_pid, child_tid):
            return [
                "read packet: $Hcp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
            ]

        self._resume_both_and_check_pids(resume_lines)

    @add_test_categories(["fork"])
    def test_vCont_both_nonstop(self):
        """Resume parent and child with one vCont naming both threads."""

        def resume_lines(parent_pid, parent_tid, child_pid, child_tid):
            return [
                "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                    parent_pid, parent_tid, child_pid, child_tid
                ),
                "send packet: $OK#00",
            ]

        self._resume_both_and_check_pids(resume_lines)

    def vCont_both_nonstop_test(self, vCont_packet):
        """Resume both processes with the literal packet vCont_packet.

        vCont_packet is the packet payload without the leading "$" and the
        trailing "#00" checksum placeholder, e.g. "vCont;c".
        """
        self._resume_both_and_check_pids(
            # The packet is fixed, so the pid/tid arguments are not needed.
            lambda *_ids: [
                "read packet: ${}#00".format(vCont_packet),
                "send packet: $OK#00",
            ]
        )

    @add_test_categories(["fork"])
    def test_vCont_both_implicit_nonstop(self):
        """Resume both processes with a vCont that names no thread."""
        self.vCont_both_nonstop_test("vCont;c")

    @add_test_categories(["fork"])
    def test_vCont_both_minus_one_nonstop(self):
        """Resume both processes with a vCont using the p-1.-1 thread-id."""
        self.vCont_both_nonstop_test("vCont;c:p-1.-1")
267