1import gdbremote_testcase 2from lldbsuite.test.decorators import * 3from lldbsuite.test.lldbtest import * 4from lldbsuite.test import lldbutil 5 6 7class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase): 8 THREAD_COUNT = 5 9 10 def gather_stop_replies_via_qThreadStopInfo(self, threads): 11 # Grab stop reply for each thread via qThreadStopInfo{tid:hex}. 12 stop_replies = {} 13 thread_dicts = {} 14 for thread in threads: 15 # Run the qThreadStopInfo command. 16 self.reset_test_sequence() 17 self.test_sequence.add_log_lines( 18 [ 19 "read packet: $qThreadStopInfo{:x}#00".format(thread), 20 { 21 "direction": "send", 22 "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", 23 "capture": {1: "stop_result", 2: "key_vals_text"}, 24 }, 25 ], 26 True, 27 ) 28 context = self.expect_gdbremote_sequence() 29 self.assertIsNotNone(context) 30 31 # Parse stop reply contents. 32 key_vals_text = context.get("key_vals_text") 33 self.assertIsNotNone(key_vals_text) 34 kv_dict = self.parse_key_val_dict(key_vals_text) 35 self.assertIsNotNone(kv_dict) 36 37 # Verify there is a thread and that it matches the expected thread 38 # id. 39 kv_thread = kv_dict.get("thread") 40 self.assertIsNotNone(kv_thread) 41 kv_thread_id = int(kv_thread, 16) 42 self.assertEqual(kv_thread_id, thread) 43 44 # Grab the stop id reported. 45 stop_result_text = context.get("stop_result") 46 self.assertIsNotNone(stop_result_text) 47 stop_replies[kv_thread_id] = int(stop_result_text, 16) 48 49 # Hang on to the key-val dictionary for the thread. 50 thread_dicts[kv_thread_id] = kv_dict 51 52 return stop_replies 53 54 @skipIfNetBSD 55 def test_qThreadStopInfo_works_for_multiple_threads(self): 56 self.build() 57 self.set_inferior_startup_launch() 58 _, threads = self.launch_with_threads(self.THREAD_COUNT) 59 stop_replies = self.gather_stop_replies_via_qThreadStopInfo(threads) 60 triple = self.dbg.GetSelectedPlatform().GetTriple() 61 # Consider one more thread created by calling DebugBreakProcess. 62 if re.match(".*-.*-windows", triple): 63 self.assertGreaterEqual(len(stop_replies), self.THREAD_COUNT) 64 else: 65 self.assertEqual(len(stop_replies), self.THREAD_COUNT) 66 67 @expectedFailureAll(oslist=["freebsd"], bugnumber="llvm.org/pr48418") 68 @expectedFailureNetBSD 69 @expectedFailureAll(oslist=["windows"]) # Output forwarding not implemented 70 def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(self): 71 self.build() 72 self.set_inferior_startup_launch() 73 procs = self.prep_debug_monitor_and_inferior( 74 inferior_args=["thread:new"] * 4 + ["stop-me-now", "sleep:60"] 75 ) 76 77 self.test_sequence.add_log_lines( 78 [ 79 "read packet: $c#00", 80 { 81 "type": "output_match", 82 "regex": self.maybe_strict_output_regex(r"stop-me-now\r\n"), 83 }, 84 "read packet: \x03", 85 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$"}, 86 ], 87 True, 88 ) 89 self.add_threadinfo_collection_packets() 90 context = self.expect_gdbremote_sequence() 91 threads = self.parse_threadinfo_packets(context) 92 93 stop_replies = self.gather_stop_replies_via_qThreadStopInfo(threads) 94 self.assertIsNotNone(stop_replies) 95 96 no_stop_reason_count = sum( 97 1 for stop_reason in list(stop_replies.values()) if stop_reason == 0 98 ) 99 with_stop_reason_count = sum( 100 1 for stop_reason in list(stop_replies.values()) if stop_reason != 0 101 ) 102 103 # All but one thread should report no stop reason. 104 triple = self.dbg.GetSelectedPlatform().GetTriple() 105 106 # Consider one more thread created by calling DebugBreakProcess. 107 if re.match(".*-.*-windows", triple): 108 self.assertGreaterEqual(no_stop_reason_count, self.THREAD_COUNT - 1) 109 else: 110 self.assertEqual(no_stop_reason_count, self.THREAD_COUNT - 1) 111 112 # Only one thread should should indicate a stop reason. 113 self.assertEqual(with_stop_reason_count, 1) 114