1"""
2Test the functionality of interactive scripted processes
3"""
4
5import lldb
6import lldbsuite.test.lldbutil as lldbutil
7from lldbsuite.test.decorators import *
8from lldbsuite.test.lldbtest import *
9import json, os
10
11
12class TestInteractiveScriptedProcess(TestBase):
13    NO_DEBUG_INFO_TESTCASE = True
14
15    def setUp(self):
16        # Call super's setUp().
17        TestBase.setUp(self)
18        # Build and load test program
19        self.build()
20        self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
21        self.main_source_file = lldb.SBFileSpec("main.cpp")
22        self.script_module = "interactive_scripted_process"
23        self.script_file = self.script_module + ".py"
24
25    @skipUnlessDarwin
26    @expectedFailureDarwin
27    def test_passthrough_launch(self):
28        """Test a simple pass-through process launch"""
29        self.passthrough_launch()
30
31        lldbutil.run_break_set_by_source_regexp(self, "also break here")
32        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
33        error = self.mux_process.Continue()
34        self.assertSuccess(error, "Resuming multiplexer scripted process")
35        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
36
37        event = lldbutil.fetch_next_event(
38            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
39        )
40        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
41        event = lldbutil.fetch_next_event(
42            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
43        )
44        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
45
46    @skipUnlessDarwin
47    @expectedFailureDarwin
48    def test_multiplexed_launch(self):
49        """Test a multiple interactive scripted process debugging"""
50        self.passthrough_launch()
51        self.assertEqual(self.dbg.GetNumTargets(), 2)
52
53        driving_target = self.mux_process.GetScriptedImplementation().driving_target
54        self.assertTrue(driving_target.IsValid(), "Driving target is invalid")
55
56        # Create a target for the multiplexed even scripted process
57        even_target = self.duplicate_target(driving_target)
58        self.assertTrue(
59            even_target.IsValid(),
60            "Couldn't duplicate driving target to launch multiplexed even scripted process",
61        )
62
63        class_name = f"{self.script_module}.MultiplexedScriptedProcess"
64        dictionary = {"driving_target_idx": self.dbg.GetIndexOfTarget(self.mux_target)}
65
66        dictionary["parity"] = 0
67        muxed_launch_info = self.get_launch_info(class_name, dictionary)
68
69        # Launch Even Child Scripted Process
70        error = lldb.SBError()
71        even_process = even_target.Launch(muxed_launch_info, error)
72        self.assertTrue(
73            even_process, "Couldn't launch multiplexed even scripted process"
74        )
75        self.multiplex(even_process)
76
77        # Check that the even process started running
78        event = lldbutil.fetch_next_event(
79            self, self.dbg.GetListener(), even_process.GetBroadcaster()
80        )
81        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
82        # Check that the even process stopped
83        event = lldbutil.fetch_next_event(
84            self, self.dbg.GetListener(), even_process.GetBroadcaster()
85        )
86        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
87
88        self.assertTrue(even_process.IsValid(), "Got a valid process")
89        self.assertState(
90            even_process.GetState(), lldb.eStateStopped, "Process is stopped"
91        )
92
93        # Create a target for the multiplexed odd scripted process
94        odd_target = self.duplicate_target(driving_target)
95        self.assertTrue(
96            odd_target.IsValid(),
97            "Couldn't duplicate driving target to launch multiplexed odd scripted process",
98        )
99
100        dictionary["parity"] = 1
101        muxed_launch_info = self.get_launch_info(class_name, dictionary)
102
103        # Launch Odd Child Scripted Process
104        error = lldb.SBError()
105        odd_process = odd_target.Launch(muxed_launch_info, error)
106        self.assertTrue(odd_process, "Couldn't launch multiplexed odd scripted process")
107        self.multiplex(odd_process)
108
109        # Check that the odd process started running
110        event = lldbutil.fetch_next_event(
111            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
112        )
113        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
114        # Check that the odd process stopped
115        event = lldbutil.fetch_next_event(
116            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
117        )
118        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
119
120        self.assertTrue(odd_process.IsValid(), "Got a valid process")
121        self.assertState(
122            odd_process.GetState(), lldb.eStateStopped, "Process is stopped"
123        )
124
125        # Set a breakpoint on the odd child process
126        bkpt = odd_target.BreakpointCreateBySourceRegex(
127            "also break here", self.main_source_file
128        )
129        self.assertEqual(odd_target.GetNumBreakpoints(), 1)
130        self.assertTrue(bkpt, "Second breakpoint set on child scripted process")
131        self.assertEqual(bkpt.GetNumLocations(), 1, "Second breakpoint has 1 location")
132
133        # Verify that the breakpoint was also set on the multiplexer & real target
134        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
135        bkpt = self.mux_target.GetBreakpointAtIndex(1)
136        self.assertEqual(
137            bkpt.GetNumLocations(), 1, "Second breakpoint set on mux scripted process"
138        )
139        self.assertTrue(bkpt.MatchesName("multiplexed_scripted_process_421"))
140
141        self.assertGreater(driving_target.GetNumBreakpoints(), 1)
142
143        # Resume execution on child process
144        error = odd_process.Continue()
145        self.assertSuccess(error, "Resuming odd child scripted process")
146        self.assertTrue(odd_process.IsValid(), "Got a valid process")
147
148        # Since all the execution is asynchronous, the order in which events
149        # arrive is non-deterministic, so we need a data structure to make sure
150        # we received both the running and stopped event for each target.
151
152        # Initialize the execution event "bingo book", that maps a process index
153        # to a dictionary that contains flags that are not set for the process
154        # events that we care about (running & stopped)
155
156        execution_events = {
157            1: {lldb.eStateRunning: False, lldb.eStateStopped: False},
158            2: {lldb.eStateRunning: False, lldb.eStateStopped: False},
159            3: {lldb.eStateRunning: False, lldb.eStateStopped: False},
160        }
161
162        def fetch_process_event(self, execution_events):
163            event = lldbutil.fetch_next_event(
164                self,
165                self.dbg.GetListener(),
166                lldb.SBProcess.GetBroadcasterClass(),
167                match_class=True,
168            )
169            state = lldb.SBProcess.GetStateFromEvent(event)
170            self.assertIn(state, [lldb.eStateRunning, lldb.eStateStopped])
171            event_process = lldb.SBProcess.GetProcessFromEvent(event)
172            self.assertTrue(event_process.IsValid())
173            event_target = event_process.GetTarget()
174            event_target_idx = self.dbg.GetIndexOfTarget(event_target)
175            self.assertFalse(
176                execution_events[event_target_idx][state],
177                "Event already received for this process",
178            )
179            execution_events[event_target_idx][state] = True
180
181        event = lldbutil.fetch_next_event(
182            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
183        )
184        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
185
186        event = lldbutil.fetch_next_event(
187            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
188        )
189        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
190
191        for _ in range((self.dbg.GetNumTargets() - 1) * 2):
192            fetch_process_event(self, execution_events)
193
194        for target_index, event_states in execution_events.items():
195            for state, is_set in event_states.items():
196                self.assertTrue(is_set, f"Target {target_index} has state {state} set")
197
198    def duplicate_target(self, driving_target):
199        exe = driving_target.executable.fullpath
200        triple = driving_target.triple
201        return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)
202
203    def get_launch_info(self, class_name, script_dict):
204        structured_data = lldb.SBStructuredData()
205        structured_data.SetFromJSON(json.dumps(script_dict))
206
207        launch_info = lldb.SBLaunchInfo(None)
208        launch_info.SetProcessPluginName("ScriptedProcess")
209        launch_info.SetScriptedProcessClassName(class_name)
210        launch_info.SetScriptedProcessDictionary(structured_data)
211        return launch_info
212
213    def multiplex(self, muxed_process):
214        muxed_process.GetScriptedImplementation().multiplexer = (
215            self.mux_process.GetScriptedImplementation()
216        )
217        self.mux_process.GetScriptedImplementation().multiplexed_processes[
218            muxed_process.GetProcessID()
219        ] = muxed_process
220
221    def passthrough_launch(self):
222        """Test that a simple passthrough wrapper functions correctly"""
223        # First build the real target:
224        self.assertEqual(self.dbg.GetNumTargets(), 1)
225        real_target_id = 0
226        real_target = self.dbg.GetTargetAtIndex(real_target_id)
227        lldbutil.run_break_set_by_source_regexp(self, "Break here")
228        self.assertEqual(real_target.GetNumBreakpoints(), 1)
229
230        # Now source in the scripted module:
231        script_path = os.path.join(self.getSourceDir(), self.script_file)
232        self.runCmd(f"command script import '{script_path}'")
233
234        self.mux_target = self.duplicate_target(real_target)
235        self.assertTrue(self.mux_target.IsValid(), "duplicate target succeeded")
236
237        mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
238        script_dict = {"driving_target_idx": real_target_id}
239        mux_launch_info = self.get_launch_info(mux_class, script_dict)
240        self.mux_process_listener = lldb.SBListener(
241            "lldb.test.interactive-scripted-process.listener"
242        )
243        mux_launch_info.SetShadowListener(self.mux_process_listener)
244
245        self.dbg.SetAsync(True)
246        error = lldb.SBError()
247        self.mux_process = self.mux_target.Launch(mux_launch_info, error)
248        self.assertSuccess(error, "Launched multiplexer scripted process")
249        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
250
251        # Check that the mux process started running
252        event = lldbutil.fetch_next_event(
253            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
254        )
255        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
256        # Check that the real process started running
257        event = lldbutil.fetch_next_event(
258            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
259        )
260        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
261
262        # Check that the real process stopped
263        event = lldbutil.fetch_next_event(
264            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
265        )
266        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
267        # Check that the mux process stopped
268        event = lldbutil.fetch_next_event(
269            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
270        )
271        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
272
273        real_process = real_target.GetProcess()
274        self.assertTrue(real_process.IsValid(), "Got a valid process")
275        self.assertState(
276            real_process.GetState(), lldb.eStateStopped, "Process is stopped"
277        )
278
279        # This is a passthrough, so the two processes should have the same state:
280        # Check that we got the right threads:
281        self.assertEqual(
282            len(real_process.threads),
283            len(self.mux_process.threads),
284            "Same number of threads",
285        )
286        for id in range(len(real_process.threads)):
287            real_pc = real_process.threads[id].frame[0].pc
288            mux_pc = self.mux_process.threads[id].frame[0].pc
289            self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}")
290