1"""
2Test the functionality of interactive scripted processes
3"""
4
5import lldb
6import lldbsuite.test.lldbutil as lldbutil
7from lldbsuite.test.lldbtest import *
8import json, os
9
10
11class TestInteractiveScriptedProcess(TestBase):
12
13    NO_DEBUG_INFO_TESTCASE = True
14
15    def setUp(self):
16        # Call super's setUp().
17        TestBase.setUp(self)
18        # Build and load test program
19        self.build()
20        self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
21        self.main_source_file = lldb.SBFileSpec("main.cpp")
22        self.script_module = "interactive_scripted_process"
23        self.script_file = self.script_module + ".py"
24
25    @skipUnlessDarwin
26    def test_passthrough_launch(self):
27        """Test a simple pass-through process launch"""
28        self.passthrough_launch()
29
30        lldbutil.run_break_set_by_source_regexp(self, "also break here")
31        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
32        error = self.mux_process.Continue()
33        self.assertSuccess(error, "Resuming multiplexer scripted process")
34        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
35
36        event = lldbutil.fetch_next_event(
37            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
38        )
39        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
40        event = lldbutil.fetch_next_event(
41            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
42        )
43        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
44
45    @skipUnlessDarwin
46    def test_multiplexed_launch(self):
47        """Test a multiple interactive scripted process debugging"""
48        self.passthrough_launch()
49        self.assertEqual(self.dbg.GetNumTargets(), 2)
50
51        driving_target = self.mux_process.GetScriptedImplementation().driving_target
52        self.assertTrue(driving_target.IsValid(), "Driving target is invalid")
53
54        # Create a target for the multiplexed even scripted process
55        even_target = self.duplicate_target(driving_target)
56        self.assertTrue(
57            even_target.IsValid(),
58            "Couldn't duplicate driving target to launch multiplexed even scripted process",
59        )
60
61        class_name = f"{self.script_module}.MultiplexedScriptedProcess"
62        dictionary = {"driving_target_idx": self.dbg.GetIndexOfTarget(self.mux_target)}
63
64        dictionary["parity"] = 0
65        muxed_launch_info = self.get_launch_info(class_name, dictionary)
66
67        # Launch Even Child Scripted Process
68        error = lldb.SBError()
69        even_process = even_target.Launch(muxed_launch_info, error)
70        self.assertTrue(
71            even_process, "Couldn't launch multiplexed even scripted process"
72        )
73        self.multiplex(even_process)
74
75        # Check that the even process started running
76        event = lldbutil.fetch_next_event(
77            self, self.dbg.GetListener(), even_process.GetBroadcaster()
78        )
79        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
80        # Check that the even process stopped
81        event = lldbutil.fetch_next_event(
82            self, self.dbg.GetListener(), even_process.GetBroadcaster()
83        )
84        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
85
86        self.assertTrue(even_process.IsValid(), "Got a valid process")
87        self.assertState(
88            even_process.GetState(), lldb.eStateStopped, "Process is stopped"
89        )
90
91        # Create a target for the multiplexed odd scripted process
92        odd_target = self.duplicate_target(driving_target)
93        self.assertTrue(
94            odd_target.IsValid(),
95            "Couldn't duplicate driving target to launch multiplexed odd scripted process",
96        )
97
98        dictionary["parity"] = 1
99        muxed_launch_info = self.get_launch_info(class_name, dictionary)
100
101        # Launch Odd Child Scripted Process
102        error = lldb.SBError()
103        odd_process = odd_target.Launch(muxed_launch_info, error)
104        self.assertTrue(odd_process, "Couldn't launch multiplexed odd scripted process")
105        self.multiplex(odd_process)
106
107        # Check that the odd process started running
108        event = lldbutil.fetch_next_event(
109            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
110        )
111        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
112        # Check that the odd process stopped
113        event = lldbutil.fetch_next_event(
114            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
115        )
116        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
117
118        self.assertTrue(odd_process.IsValid(), "Got a valid process")
119        self.assertState(
120            odd_process.GetState(), lldb.eStateStopped, "Process is stopped"
121        )
122
123        # Set a breakpoint on the odd child process
124        bkpt = odd_target.BreakpointCreateBySourceRegex(
125            "also break here", self.main_source_file
126        )
127        self.assertEqual(odd_target.GetNumBreakpoints(), 1)
128        self.assertTrue(bkpt, "Second breakpoint set on child scripted process")
129        self.assertEqual(bkpt.GetNumLocations(), 1, "Second breakpoint has 1 location")
130
131        # Verify that the breakpoint was also set on the multiplexer & real target
132        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
133        bkpt = self.mux_target.GetBreakpointAtIndex(1)
134        self.assertEqual(
135            bkpt.GetNumLocations(), 1, "Second breakpoint set on mux scripted process"
136        )
137        self.assertTrue(bkpt.MatchesName("multiplexed_scripted_process_421"))
138
139        self.assertGreater(driving_target.GetNumBreakpoints(), 1)
140
141        # Resume execution on child process
142        error = odd_process.Continue()
143        self.assertSuccess(error, "Resuming odd child scripted process")
144        self.assertTrue(odd_process.IsValid(), "Got a valid process")
145
146        # Since all the execution is asynchronous, the order in which events
147        # arrive is non-deterministic, so we need a data structure to make sure
148        # we received both the running and stopped event for each target.
149
150        # Initialize the execution event "bingo book", that maps a process index
151        # to a dictionary that contains flags that are not set for the process
152        # events that we care about (running & stopped)
153
154        execution_events = {
155            1: {lldb.eStateRunning: False, lldb.eStateStopped: False},
156            2: {lldb.eStateRunning: False, lldb.eStateStopped: False},
157            3: {lldb.eStateRunning: False, lldb.eStateStopped: False},
158        }
159
160        def fetch_process_event(self, execution_events):
161            event = lldbutil.fetch_next_event(
162                self,
163                self.dbg.GetListener(),
164                lldb.SBProcess.GetBroadcasterClass(),
165                match_class=True,
166            )
167            state = lldb.SBProcess.GetStateFromEvent(event)
168            self.assertIn(state, [lldb.eStateRunning, lldb.eStateStopped])
169            event_process = lldb.SBProcess.GetProcessFromEvent(event)
170            self.assertTrue(event_process.IsValid())
171            event_target = event_process.GetTarget()
172            event_target_idx = self.dbg.GetIndexOfTarget(event_target)
173            self.assertFalse(
174                execution_events[event_target_idx][state],
175                "Event already received for this process",
176            )
177            execution_events[event_target_idx][state] = True
178
179        event = lldbutil.fetch_next_event(
180            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
181        )
182        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
183
184        event = lldbutil.fetch_next_event(
185            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
186        )
187        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
188
189        for _ in range((self.dbg.GetNumTargets() - 1) * 2):
190            fetch_process_event(self, execution_events)
191
192        for target_index, event_states in execution_events.items():
193            for state, is_set in event_states.items():
194                self.assertTrue(is_set, f"Target {target_index} has state {state} set")
195
196    def duplicate_target(self, driving_target):
197        exe = driving_target.executable.fullpath
198        triple = driving_target.triple
199        return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)
200
201    def get_launch_info(self, class_name, script_dict):
202        structured_data = lldb.SBStructuredData()
203        structured_data.SetFromJSON(json.dumps(script_dict))
204
205        launch_info = lldb.SBLaunchInfo(None)
206        launch_info.SetProcessPluginName("ScriptedProcess")
207        launch_info.SetScriptedProcessClassName(class_name)
208        launch_info.SetScriptedProcessDictionary(structured_data)
209        return launch_info
210
211    def multiplex(self, muxed_process):
212        muxed_process.GetScriptedImplementation().multiplexer = (
213            self.mux_process.GetScriptedImplementation()
214        )
215        self.mux_process.GetScriptedImplementation().multiplexed_processes[
216            muxed_process.GetProcessID()
217        ] = muxed_process
218
219    def passthrough_launch(self):
220        """Test that a simple passthrough wrapper functions correctly"""
221        # First build the real target:
222        self.assertEqual(self.dbg.GetNumTargets(), 1)
223        real_target_id = 0
224        real_target = self.dbg.GetTargetAtIndex(real_target_id)
225        lldbutil.run_break_set_by_source_regexp(self, "Break here")
226        self.assertEqual(real_target.GetNumBreakpoints(), 1)
227
228        # Now source in the scripted module:
229        script_path = os.path.join(self.getSourceDir(), self.script_file)
230        self.runCmd(f"command script import '{script_path}'")
231
232        self.mux_target = self.duplicate_target(real_target)
233        self.assertTrue(self.mux_target.IsValid(), "duplicate target succeeded")
234
235        mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
236        script_dict = {"driving_target_idx": real_target_id}
237        mux_launch_info = self.get_launch_info(mux_class, script_dict)
238        self.mux_process_listener = lldb.SBListener(
239            "lldb.test.interactive-scripted-process.listener"
240        )
241        mux_launch_info.SetShadowListener(self.mux_process_listener)
242
243        self.dbg.SetAsync(True)
244        error = lldb.SBError()
245        self.mux_process = self.mux_target.Launch(mux_launch_info, error)
246        self.assertSuccess(error, "Launched multiplexer scripted process")
247        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
248
249        # Check that the mux process started running
250        event = lldbutil.fetch_next_event(
251            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
252        )
253        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
254        # Check that the real process started running
255        event = lldbutil.fetch_next_event(
256            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
257        )
258        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
259
260        # Check that the real process stopped
261        event = lldbutil.fetch_next_event(
262            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
263        )
264        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
265        # Check that the mux process stopped
266        event = lldbutil.fetch_next_event(
267            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
268        )
269        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
270
271        real_process = real_target.GetProcess()
272        self.assertTrue(real_process.IsValid(), "Got a valid process")
273        self.assertState(
274            real_process.GetState(), lldb.eStateStopped, "Process is stopped"
275        )
276
277        # This is a passthrough, so the two processes should have the same state:
278        # Check that we got the right threads:
279        self.assertEqual(
280            len(real_process.threads),
281            len(self.mux_process.threads),
282            "Same number of threads",
283        )
284        for id in range(len(real_process.threads)):
285            real_pc = real_process.threads[id].frame[0].pc
286            mux_pc = self.mux_process.threads[id].frame[0].pc
287            self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}")
288