1"""
2Test the functionality of interactive scripted processes
3"""
4
5import lldb
6import lldbsuite.test.lldbutil as lldbutil
7from lldbsuite.test.decorators import *
8from lldbsuite.test.lldbtest import *
9import json, os
10
11
12class TestInteractiveScriptedProcess(TestBase):
13    NO_DEBUG_INFO_TESTCASE = True
14
15    def setUp(self):
16        # Call super's setUp().
17        TestBase.setUp(self)
18        # Build and load test program
19        self.build()
20        self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
21        self.main_source_file = lldb.SBFileSpec("main.cpp")
22        self.script_module = "interactive_scripted_process"
23        self.script_file = self.script_module + ".py"
24
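    # These tests are flaky and sometimes time out.  They work most of the time,
    # so the basic event flow is right, but somehow the handling is off.
    # Stacking @skipUnlessDarwin and @skipIfDarwin below skips them on every
    # platform until that is sorted out.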
27    @skipUnlessDarwin
28    @skipIfDarwin
29    def test_passthrough_launch(self):
30        """Test a simple pass-through process launch"""
31        self.passthrough_launch()
32
33        lldbutil.run_break_set_by_source_regexp(self, "also break here")
34        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
35        error = self.mux_process.Continue()
36        self.assertSuccess(error, "Resuming multiplexer scripted process")
37        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
38
39        event = lldbutil.fetch_next_event(
40            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster(), timeout=20
41        )
42        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
43        event = lldbutil.fetch_next_event(
44            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
45        )
46        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
47
48        event = lldbutil.fetch_next_event(
49            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
50        )
51        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
52        event = lldbutil.fetch_next_event(
53            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
54        )
55        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
56
57    @skipUnlessDarwin
58    @skipIfDarwin
59    def test_multiplexed_launch(self):
60        """Test a multiple interactive scripted process debugging"""
61        self.passthrough_launch()
62        self.assertEqual(self.dbg.GetNumTargets(), 2)
63
64        driving_target = self.mux_process.GetScriptedImplementation().driving_target
65        self.assertTrue(driving_target.IsValid(), "Driving target is invalid")
66
67        # Create a target for the multiplexed even scripted process
68        even_target = self.duplicate_target(driving_target)
69        self.assertTrue(
70            even_target.IsValid(),
71            "Couldn't duplicate driving target to launch multiplexed even scripted process",
72        )
73
74        class_name = f"{self.script_module}.MultiplexedScriptedProcess"
75        dictionary = {"driving_target_idx": self.dbg.GetIndexOfTarget(self.mux_target)}
76
77        dictionary["parity"] = 0
78        muxed_launch_info = self.get_launch_info(class_name, dictionary)
79
80        # Launch Even Child Scripted Process
81        error = lldb.SBError()
82        even_process = even_target.Launch(muxed_launch_info, error)
83        self.assertTrue(
84            even_process, "Couldn't launch multiplexed even scripted process"
85        )
86        self.multiplex(even_process)
87
88        # Check that the even process started running
89        event = lldbutil.fetch_next_event(
90            self, self.dbg.GetListener(), even_process.GetBroadcaster()
91        )
92        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
93        # Check that the even process stopped
94        event = lldbutil.fetch_next_event(
95            self, self.dbg.GetListener(), even_process.GetBroadcaster()
96        )
97        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
98
99        self.assertTrue(even_process.IsValid(), "Got a valid process")
100        self.assertState(
101            even_process.GetState(), lldb.eStateStopped, "Process is stopped"
102        )
103
104        # Create a target for the multiplexed odd scripted process
105        odd_target = self.duplicate_target(driving_target)
106        self.assertTrue(
107            odd_target.IsValid(),
108            "Couldn't duplicate driving target to launch multiplexed odd scripted process",
109        )
110
111        dictionary["parity"] = 1
112        muxed_launch_info = self.get_launch_info(class_name, dictionary)
113
114        # Launch Odd Child Scripted Process
115        error = lldb.SBError()
116        odd_process = odd_target.Launch(muxed_launch_info, error)
117        self.assertTrue(odd_process, "Couldn't launch multiplexed odd scripted process")
118        self.multiplex(odd_process)
119
120        # Check that the odd process started running
121        event = lldbutil.fetch_next_event(
122            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
123        )
124        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
125        # Check that the odd process stopped
126        event = lldbutil.fetch_next_event(
127            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
128        )
129        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
130
131        self.assertTrue(odd_process.IsValid(), "Got a valid process")
132        self.assertState(
133            odd_process.GetState(), lldb.eStateStopped, "Process is stopped"
134        )
135
136        # Set a breakpoint on the odd child process
137        bkpt = odd_target.BreakpointCreateBySourceRegex(
138            "also break here", self.main_source_file
139        )
140        self.assertEqual(odd_target.GetNumBreakpoints(), 1)
141        self.assertTrue(bkpt, "Second breakpoint set on child scripted process")
142        self.assertEqual(bkpt.GetNumLocations(), 1, "Second breakpoint has 1 location")
143
144        # Verify that the breakpoint was also set on the multiplexer & real target
145        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
146        bkpt = self.mux_target.GetBreakpointAtIndex(1)
147        self.assertEqual(
148            bkpt.GetNumLocations(), 1, "Second breakpoint set on mux scripted process"
149        )
150        self.assertTrue(bkpt.MatchesName("multiplexed_scripted_process_421"))
151
152        self.assertGreater(driving_target.GetNumBreakpoints(), 1)
153
154        # Resume execution on child process
155        error = odd_process.Continue()
156        self.assertSuccess(error, "Resuming odd child scripted process")
157        self.assertTrue(odd_process.IsValid(), "Got a valid process")
158
159        # Since all the execution is asynchronous, the order in which events
160        # arrive is non-deterministic, so we need a data structure to make sure
161        # we received both the running and stopped event for each target.
162
163        # Initialize the execution event "bingo book", that maps a process index
164        # to a dictionary that contains flags that are not set for the process
165        # events that we care about (running & stopped)
166
167        execution_events = {
168            1: {lldb.eStateRunning: False, lldb.eStateStopped: False},
169            2: {lldb.eStateRunning: False, lldb.eStateStopped: False},
170            3: {lldb.eStateRunning: False, lldb.eStateStopped: False},
171        }
172
173        def fetch_process_event(self, execution_events):
174            event = lldbutil.fetch_next_event(
175                self,
176                self.dbg.GetListener(),
177                lldb.SBProcess.GetBroadcasterClass(),
178                match_class=True,
179            )
180            state = lldb.SBProcess.GetStateFromEvent(event)
181            self.assertIn(state, [lldb.eStateRunning, lldb.eStateStopped])
182            event_process = lldb.SBProcess.GetProcessFromEvent(event)
183            self.assertTrue(event_process.IsValid())
184            event_target = event_process.GetTarget()
185            event_target_idx = self.dbg.GetIndexOfTarget(event_target)
186            self.assertFalse(
187                execution_events[event_target_idx][state],
188                "Event already received for this process",
189            )
190            execution_events[event_target_idx][state] = True
191
192        for _ in range((self.dbg.GetNumTargets() - 1) * 2):
193            fetch_process_event(self, execution_events)
194
195        for target_index, event_states in execution_events.items():
196            for state, is_set in event_states.items():
197                self.assertTrue(is_set, f"Target {target_index} has state {state} set")
198
199        event = lldbutil.fetch_next_event(
200            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
201        )
202        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
203
204        event = lldbutil.fetch_next_event(
205            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
206        )
207        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
208
209    def duplicate_target(self, driving_target):
210        exe = driving_target.executable.fullpath
211        triple = driving_target.triple
212        return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)
213
214    def get_launch_info(self, class_name, script_dict):
215        structured_data = lldb.SBStructuredData()
216        structured_data.SetFromJSON(json.dumps(script_dict))
217
218        launch_info = lldb.SBLaunchInfo(None)
219        launch_info.SetProcessPluginName("ScriptedProcess")
220        launch_info.SetScriptedProcessClassName(class_name)
221        launch_info.SetScriptedProcessDictionary(structured_data)
222        return launch_info
223
224    def multiplex(self, muxed_process):
225        muxed_process.GetScriptedImplementation().multiplexer = (
226            self.mux_process.GetScriptedImplementation()
227        )
228        self.mux_process.GetScriptedImplementation().multiplexed_processes[
229            muxed_process.GetProcessID()
230        ] = muxed_process
231
232    def passthrough_launch(self):
233        """Test that a simple passthrough wrapper functions correctly"""
234        # First build the real target:
235        self.assertEqual(self.dbg.GetNumTargets(), 1)
236        real_target_id = 0
237        real_target = self.dbg.GetTargetAtIndex(real_target_id)
238        lldbutil.run_break_set_by_source_regexp(self, "Break here")
239        self.assertEqual(real_target.GetNumBreakpoints(), 1)
240
241        # Now source in the scripted module:
242        script_path = os.path.join(self.getSourceDir(), self.script_file)
243        self.runCmd(f"command script import '{script_path}'")
244
245        self.mux_target = self.duplicate_target(real_target)
246        self.assertTrue(self.mux_target.IsValid(), "duplicate target succeeded")
247
248        mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
249        script_dict = {"driving_target_idx": real_target_id}
250        mux_launch_info = self.get_launch_info(mux_class, script_dict)
251        self.mux_process_listener = lldb.SBListener(
252            "lldb.test.interactive-scripted-process.listener"
253        )
254        mux_launch_info.SetShadowListener(self.mux_process_listener)
255
256        self.dbg.SetAsync(True)
257        error = lldb.SBError()
258        self.mux_process = self.mux_target.Launch(mux_launch_info, error)
259        self.assertSuccess(error, "Launched multiplexer scripted process")
260        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
261
262        # Check that the real process started running
263        event = lldbutil.fetch_next_event(
264            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
265        )
266        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
267        # Check that the mux process started running
268        event = lldbutil.fetch_next_event(
269            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
270        )
271        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
272
273        # Check that the real process stopped
274        event = lldbutil.fetch_next_event(
275            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
276        )
277        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
278        # Check that the mux process stopped
279        event = lldbutil.fetch_next_event(
280            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
281        )
282        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
283
284        real_process = real_target.GetProcess()
285        self.assertTrue(real_process.IsValid(), "Got a valid process")
286        self.assertState(
287            real_process.GetState(), lldb.eStateStopped, "Process is stopped"
288        )
289
290        # This is a passthrough, so the two processes should have the same state:
291        # Check that we got the right threads:
292        self.assertEqual(
293            len(real_process.threads),
294            len(self.mux_process.threads),
295            "Same number of threads",
296        )
297        for id in range(len(real_process.threads)):
298            real_pc = real_process.threads[id].frame[0].pc
299            mux_pc = self.mux_process.threads[id].frame[0].pc
300            self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}")
301