xref: /llvm-project/lldb/test/API/tools/lldb-server/TestGdbRemoteThreadsInStopReply.py (revision 80fcecb13c388ff087a27a4b0e7ca3dd8c98eaa4)
1import json
2import re
3
4import gdbremote_testcase
5from lldbsuite.test.decorators import *
6from lldbsuite.test.lldbtest import *
7from lldbsuite.test import lldbutil
8
9
10class TestGdbRemoteThreadsInStopReply(gdbremote_testcase.GdbRemoteTestCaseBase):
11    ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
12        "read packet: $QListThreadsInStopReply#21",
13        "send packet: $OK#00",
14    ]
15
16    def gather_stop_reply_fields(self, thread_count, field_names):
17        context, threads = self.launch_with_threads(thread_count)
18        key_vals_text = context.get("stop_reply_kv")
19        self.assertIsNotNone(key_vals_text)
20
21        self.reset_test_sequence()
22        self.add_register_info_collection_packets()
23        self.add_process_info_collection_packets()
24
25        context = self.expect_gdbremote_sequence()
26        self.assertIsNotNone(context)
27        hw_info = self.parse_hw_info(context)
28
29        # Parse the stop reply contents.
30        kv_dict = self.parse_key_val_dict(key_vals_text)
31
32        result = dict()
33        result["pc_register"] = hw_info["pc_register"]
34        result["little_endian"] = hw_info["little_endian"]
35        for key_field in field_names:
36            result[key_field] = kv_dict.get(key_field)
37
38        return result
39
40    def gather_stop_reply_threads(self, thread_count):
41        # Pull out threads from stop response.
42        stop_reply_threads_text = self.gather_stop_reply_fields(
43            thread_count, ["threads"]
44        )["threads"]
45        if stop_reply_threads_text:
46            return [
47                int(thread_id, 16) for thread_id in stop_reply_threads_text.split(",")
48            ]
49        else:
50            return []
51
52    def gather_stop_reply_pcs(self, thread_count):
53        results = self.gather_stop_reply_fields(thread_count, ["threads", "thread-pcs"])
54        if not results:
55            return []
56
57        threads_text = results["threads"]
58        pcs_text = results["thread-pcs"]
59        thread_ids = threads_text.split(",")
60        pcs = pcs_text.split(",")
61        self.assertEqual(len(thread_ids), len(pcs))
62
63        thread_pcs = dict()
64        for i in range(0, len(pcs)):
65            thread_pcs[int(thread_ids[i], 16)] = pcs[i]
66
67        result = dict()
68        result["thread_pcs"] = thread_pcs
69        result["pc_register"] = results["pc_register"]
70        result["little_endian"] = results["little_endian"]
71        return result
72
73    def switch_endian(self, egg):
74        return "".join(reversed(re.findall("..", egg)))
75
76    def parse_hw_info(self, context):
77        self.assertIsNotNone(context)
78        process_info = self.parse_process_info_response(context)
79        endian = process_info.get("endian")
80        reg_info = self.parse_register_info_packets(context)
81        (pc_lldb_reg_index, pc_reg_info) = self.find_pc_reg_info(reg_info)
82
83        hw_info = dict()
84        hw_info["pc_register"] = pc_lldb_reg_index
85        hw_info["little_endian"] = endian == "little"
86        return hw_info
87
88    def gather_threads_info_pcs(self, pc_register, little_endian):
89        self.reset_test_sequence()
90        self.test_sequence.add_log_lines(
91            [
92                "read packet: $jThreadsInfo#c1",
93                {
94                    "direction": "send",
95                    "regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
96                    "capture": {1: "threads_info"},
97                },
98            ],
99            True,
100        )
101
102        context = self.expect_gdbremote_sequence()
103        self.assertIsNotNone(context)
104        threads_info = context.get("threads_info")
105        register = str(pc_register)
106        # The jThreadsInfo response is not valid JSON data, so we have to
107        # clean it up first.
108        jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))
109        thread_pcs = dict()
110        for thread_info in jthreads_info:
111            tid = thread_info["tid"]
112            pc = thread_info["registers"][register]
113            thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
114
115        return thread_pcs
116
117    def test_QListThreadsInStopReply_supported(self):
118        self.build()
119        self.set_inferior_startup_launch()
120        procs = self.prep_debug_monitor_and_inferior()
121        self.test_sequence.add_log_lines(
122            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
123        )
124
125        context = self.expect_gdbremote_sequence()
126        self.assertIsNotNone(context)
127
128    @skipIfNetBSD
129    @expectedFailureAll(oslist=["windows"])  # Extra threads present
130    def test_stop_reply_reports_multiple_threads(self):
131        self.build()
132        self.set_inferior_startup_launch()
133        # Gather threads from stop notification when QThreadsInStopReply is
134        # enabled.
135        self.test_sequence.add_log_lines(
136            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
137        )
138        stop_reply_threads = self.gather_stop_reply_threads(5)
139        self.assertEqual(len(stop_reply_threads), 5)
140
141    @skipIfNetBSD
142    def test_no_QListThreadsInStopReply_supplies_no_threads(self):
143        self.build()
144        self.set_inferior_startup_launch()
145        # Gather threads from stop notification when QThreadsInStopReply is not
146        # enabled.
147        stop_reply_threads = self.gather_stop_reply_threads(5)
148        self.assertEqual(len(stop_reply_threads), 0)
149
150    @skipIfNetBSD
151    def test_stop_reply_reports_correct_threads(self):
152        self.build()
153        self.set_inferior_startup_launch()
154        # Gather threads from stop notification when QThreadsInStopReply is
155        # enabled.
156        thread_count = 5
157        self.test_sequence.add_log_lines(
158            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
159        )
160        stop_reply_threads = self.gather_stop_reply_threads(thread_count)
161
162        # Gather threads from q{f,s}ThreadInfo.
163        self.reset_test_sequence()
164        self.add_threadinfo_collection_packets()
165
166        context = self.expect_gdbremote_sequence()
167        self.assertIsNotNone(context)
168
169        threads = self.parse_threadinfo_packets(context)
170        self.assertIsNotNone(threads)
171        self.assertGreaterEqual(len(threads), thread_count)
172
173        # Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
174        for tid in threads:
175            self.assertIn(tid, stop_reply_threads)
176
177    @skipIfNetBSD
178    @skipIfWindows  # Flaky on Windows
179    def test_stop_reply_contains_thread_pcs(self):
180        self.build()
181        self.set_inferior_startup_launch()
182        thread_count = 5
183        self.test_sequence.add_log_lines(
184            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
185        )
186        results = self.gather_stop_reply_pcs(thread_count)
187        stop_reply_pcs = results["thread_pcs"]
188        pc_register = results["pc_register"]
189        little_endian = results["little_endian"]
190        self.assertGreaterEqual(len(stop_reply_pcs), thread_count)
191
192        threads_info_pcs = self.gather_threads_info_pcs(pc_register, little_endian)
193
194        self.assertEqual(len(threads_info_pcs), len(stop_reply_pcs))
195        for thread_id in stop_reply_pcs:
196            self.assertIn(thread_id, threads_info_pcs)
197            self.assertEqual(
198                int(stop_reply_pcs[thread_id], 16), int(threads_info_pcs[thread_id], 16)
199            )
200