1"""
2Test the functionality of interactive scripted processes
3"""
4
5import lldb
6import lldbsuite.test.lldbutil as lldbutil
7from lldbsuite.test.lldbtest import *
8import json, os
9
10
class TestInteractiveScriptedProcess(TestBase):
    """Launch a scripted process on top of a real process and compare them.

    The test imports the ``interactive_scripted_process`` script module that
    sits next to this file and launches its ``MultiplexerScriptedProcess``
    class in a second target, passing the index of the real target as
    ``driving_target_idx``.
    """

    NO_DEBUG_INFO_TESTCASE = True

    def test_passthrough_launch(self):
        """Test a simple pass-through process launch"""
        self.build()
        self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
        self.main_source_file = lldb.SBFileSpec("main.cpp")
        # passthrough_launch() reads script_module / script_file to locate and
        # name the scripted process implementation.
        self.script_module = "interactive_scripted_process"
        self.script_file = self.script_module + ".py"
        self.passthrough_launch()

    def duplicate_target(self, driving_target):
        """Create a new target mirroring ``driving_target``.

        Args:
            driving_target: The target whose executable path and triple are
                reused for the new target.

        Returns:
            The target created by the debugger for the same executable and
            triple. Callers are expected to check its validity.
        """
        exe = driving_target.executable.fullpath
        triple = driving_target.triple
        return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)

    def get_launch_info(self, class_name, script_dict):
        """Build launch info that selects a scripted process implementation.

        Args:
            class_name: Name of the scripted process class to instantiate.
            script_dict: JSON-serializable dictionary handed to the scripted
                process as its structured-data dictionary.

        Returns:
            An ``lldb.SBLaunchInfo`` using the "ScriptedProcess" plugin with
            the given class name and dictionary.
        """
        # SBStructuredData is populated from JSON text, so round-trip the
        # Python dictionary through json.dumps().
        structured_data = lldb.SBStructuredData()
        structured_data.SetFromJSON(json.dumps(script_dict))

        launch_info = lldb.SBLaunchInfo(None)
        launch_info.SetProcessPluginName("ScriptedProcess")
        launch_info.SetScriptedProcessClassName(class_name)
        launch_info.SetScriptedProcessDictionary(structured_data)
        return launch_info

    def passthrough_launch(self):
        """Test that a simple passthrough wrapper functions correctly"""
        # First build the real target:
        self.assertEqual(self.dbg.GetNumTargets(), 1)
        real_target_id = 0
        real_target = self.dbg.GetTargetAtIndex(real_target_id)
        lldbutil.run_break_set_by_source_regexp(self, "Break here")
        self.assertEqual(real_target.GetNumBreakpoints(), 1)

        # Now source in the scripted module:
        script_path = os.path.join(self.getSourceDir(), self.script_file)
        self.runCmd(f"command script import '{script_path}'")

        mux_target = self.duplicate_target(real_target)
        self.assertTrue(mux_target.IsValid(), "duplicate target succeeded")

        # The scripted process is told which target to drive through the
        # "driving_target_idx" entry of its dictionary.
        mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
        script_dict = {"driving_target_idx": real_target_id}
        mux_launch_info = self.get_launch_info(mux_class, script_dict)
        mux_process_listener = lldb.SBListener(
            "lldb.test.interactive-scripted-process.listener"
        )
        mux_launch_info.SetPassthroughListener(mux_process_listener)

        # Put the debugger in async mode before launching; the checks below
        # observe state changes by fetching events from the listeners.
        self.dbg.SetAsync(True)
        error = lldb.SBError()
        mux_process = mux_target.Launch(mux_launch_info, error)
        self.assertSuccess(error, "Launched multiplexer scripted process")
        self.assertTrue(mux_process.IsValid(), "Got a valid process")

        # Check that the mux process started running
        event = lldbutil.fetch_next_event(
            self, mux_process_listener, mux_process.GetBroadcaster(), timeout=60 * 5
        )
        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
        # Check that the real process started running
        event = lldbutil.fetch_next_event(
            self, self.dbg.GetListener(), mux_process.GetBroadcaster()
        )
        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)

        # Check that the real process stopped
        event = lldbutil.fetch_next_event(
            self, self.dbg.GetListener(), mux_process.GetBroadcaster(), timeout=60 * 5
        )
        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
        # Check that the mux process stopped
        event = lldbutil.fetch_next_event(
            self, mux_process_listener, mux_process.GetBroadcaster(), timeout=60 * 5
        )
        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)

        real_process = real_target.GetProcess()
        self.assertTrue(real_process.IsValid(), "Got a valid process")
        self.assertState(
            real_process.GetState(), lldb.eStateStopped, "Process is stopped"
        )

        # This is a passthrough, so the two processes should have the same state:
        # Check that we got the right threads:
        self.assertEqual(
            len(real_process.threads),
            len(mux_process.threads),
            "Same number of threads",
        )
        # Walk both thread lists in lockstep (their lengths were just asserted
        # equal) instead of indexing with a loop variable that shadows the
        # builtin `id`.
        thread_pairs = zip(real_process.threads, mux_process.threads)
        for idx, (real_thread, mux_thread) in enumerate(thread_pairs):
            real_pc = real_thread.frame[0].pc
            mux_pc = mux_thread.frame[0].pc
            self.assertEqual(real_pc, mux_pc, f"PC's equal for {idx}")

        # Add a second breakpoint and resume through the mux process; the mux
        # target is expected to hold two breakpoints afterwards.
        lldbutil.run_break_set_by_source_regexp(self, "also break here")
        self.assertEqual(mux_target.GetNumBreakpoints(), 2)
        error = mux_process.Continue()
        self.assertSuccess(error, "Resuming multiplexer scripted process")
        self.assertTrue(mux_process.IsValid(), "Got a valid process")

        # The resume should be reported on the mux listener as a running event
        # followed by a stopped event.
        event = lldbutil.fetch_next_event(
            self, mux_process_listener, mux_process.GetBroadcaster()
        )
        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
        event = lldbutil.fetch_next_event(
            self, mux_process_listener, mux_process.GetBroadcaster()
        )
        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
123