import gdbremote_testcase class GdbRemoteForkTestBase(gdbremote_testcase.GdbRemoteTestCaseBase): fork_regex = ( "[$]T[0-9a-fA-F]{{2}}thread:p([0-9a-f]+)[.]([0-9a-f]+);.*" "{}:p([0-9a-f]+)[.]([0-9a-f]+).*" ) fork_regex_nonstop = ( "%Stop:T[0-9a-fA-F]{{2}}" "thread:p([0-9a-f]+)[.]([0-9a-f]+);.*" "{}:p([0-9a-f]+)[.]([0-9a-f]+).*" ) fork_capture = {1: "parent_pid", 2: "parent_tid", 3: "child_pid", 4: "child_tid"} stop_regex_base = "T[0-9a-fA-F]{{2}}thread:p{}.{};.*reason:signal.*" stop_regex = "^[$]" + stop_regex_base def start_fork_test(self, args, variant="fork", nonstop=False): self.build() self.prep_debug_monitor_and_inferior(inferior_args=args) self.add_qSupported_packets(["multiprocess+", "{}-events+".format(variant)]) ret = self.expect_gdbremote_sequence() self.assertIn("{}-events+".format(variant), ret["qSupported_response"]) self.reset_test_sequence() # continue and expect fork if nonstop: self.test_sequence.add_log_lines( [ "read packet: $QNonStop:1#00", "send packet: $OK#00", "read packet: $c#00", "send packet: $OK#00", { "direction": "send", "regex": self.fork_regex_nonstop.format(variant), "capture": self.fork_capture, }, "read packet: $vStopped#00", "send packet: $OK#00", ], True, ) else: self.test_sequence.add_log_lines( [ "read packet: $c#00", { "direction": "send", "regex": self.fork_regex.format(variant), "capture": self.fork_capture, }, ], True, ) ret = self.expect_gdbremote_sequence() self.reset_test_sequence() return tuple( ret[x] for x in ("parent_pid", "parent_tid", "child_pid", "child_tid") ) def fork_and_detach_test(self, variant, nonstop=False): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( [variant], variant, nonstop=nonstop ) # detach the forked child self.test_sequence.add_log_lines( [ "read packet: $D;{}#00".format(child_pid), "send packet: $OK#00", # verify that the current process is correct "read packet: $qC#00", "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid), # verify that the correct processes are detached/available "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), "send packet: $Eff#00", "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), "send packet: $OK#00", ], True, ) self.expect_gdbremote_sequence() self.reset_test_sequence() return parent_pid, parent_tid def fork_and_follow_test(self, variant, nonstop=False): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( [variant], variant, nonstop=nonstop ) # switch to the forked child self.test_sequence.add_log_lines( [ "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), "send packet: $OK#00", "read packet: $Hcp{}.{}#00".format(child_pid, child_tid), "send packet: $OK#00", # detach the parent "read packet: $D;{}#00".format(parent_pid), "send packet: $OK#00", # verify that the correct processes are detached/available "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), "send packet: $Eff#00", "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), "send packet: $OK#00", # then resume the child "read packet: $c#00", ], True, ) if nonstop: self.test_sequence.add_log_lines( [ "send packet: $OK#00", "send packet: %Stop:W00;process:{}#00".format(child_pid), "read packet: $vStopped#00", "send packet: $OK#00", ], True, ) else: self.test_sequence.add_log_lines( [ "send packet: $W00;process:{}#00".format(child_pid), ], True, ) self.expect_gdbremote_sequence() def detach_all_test(self, nonstop=False): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork"], nonstop=nonstop ) self.test_sequence.add_log_lines( [ # double-check our PIDs "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), "send packet: $OK#00", "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), "send packet: $OK#00", # detach all processes "read packet: $D#00", "send packet: $OK#00", # verify that both PIDs are invalid now "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), "send packet: $Eff#00", "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), "send packet: $Eff#00", ], True, ) self.expect_gdbremote_sequence() def vkill_test(self, kill_parent=False, kill_child=False, nonstop=False): assert kill_parent or kill_child parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork"], nonstop=nonstop ) if kill_parent: self.test_sequence.add_log_lines( [ # kill the process "read packet: $vKill;{}#00".format(parent_pid), "send packet: $OK#00", ], True, ) if kill_child: self.test_sequence.add_log_lines( [ # kill the process "read packet: $vKill;{}#00".format(child_pid), "send packet: $OK#00", ], True, ) self.test_sequence.add_log_lines( [ # check child PID/TID "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), "send packet: ${}#00".format("Eff" if kill_child else "OK"), # check parent PID/TID "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), "send packet: ${}#00".format("Eff" if kill_parent else "OK"), ], True, ) self.expect_gdbremote_sequence() def resume_one_test(self, run_order, use_vCont=False, nonstop=False): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork", "stop"], nonstop=nonstop ) parent_expect = [ self.stop_regex_base.format(parent_pid, parent_tid), "W00;process:{}#.*".format(parent_pid), ] child_expect = [ self.stop_regex_base.format(child_pid, child_tid), "W00;process:{}#.*".format(child_pid), ] for x in run_order: if x == "parent": pidtid = (parent_pid, parent_tid) expect = parent_expect.pop(0) elif x == "child": pidtid = (child_pid, child_tid) expect = child_expect.pop(0) else: assert False, "unexpected x={}".format(x) if use_vCont: self.test_sequence.add_log_lines( [ # continue the selected process "read packet: $vCont;c:p{}.{}#00".format(*pidtid), ], True, ) else: self.test_sequence.add_log_lines( [ # continue the selected process "read packet: $Hcp{}.{}#00".format(*pidtid), "send packet: $OK#00", "read packet: $c#00", ], True, ) if nonstop: self.test_sequence.add_log_lines( [ "send packet: $OK#00", {"direction": "send", "regex": "%Stop:" + expect}, "read packet: $vStopped#00", "send packet: $OK#00", ], True, ) else: self.test_sequence.add_log_lines( [ {"direction": "send", "regex": "[$]" + expect}, ], True, ) # if at least one process remained, check both PIDs if parent_expect or child_expect: self.test_sequence.add_log_lines( [ "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid), "send packet: ${}#00".format("OK" if parent_expect else "Eff"), "read packet: $Hgp{}.{}#00".format(child_pid, child_tid), "send packet: ${}#00".format("OK" if child_expect else "Eff"), ], True, ) self.expect_gdbremote_sequence()