1"""
2Test the functionality of interactive scripted processes
3"""
4
5import lldb
6import lldbsuite.test.lldbutil as lldbutil
7from lldbsuite.test.decorators import *
8from lldbsuite.test.lldbtest import *
9import json, os
10
11
12class TestInteractiveScriptedProcess(TestBase):
13
14    NO_DEBUG_INFO_TESTCASE = True
15
16    def setUp(self):
17        # Call super's setUp().
18        TestBase.setUp(self)
19        # Build and load test program
20        self.build()
21        self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
22        self.main_source_file = lldb.SBFileSpec("main.cpp")
23        self.script_module = "interactive_scripted_process"
24        self.script_file = self.script_module + ".py"
25
26    @skipUnlessDarwin
27    def test_passthrough_launch(self):
28        """Test a simple pass-through process launch"""
29        self.passthrough_launch()
30
31        lldbutil.run_break_set_by_source_regexp(self, "also break here")
32        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
33        error = self.mux_process.Continue()
34        self.assertSuccess(error, "Resuming multiplexer scripted process")
35        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
36
37        event = lldbutil.fetch_next_event(
38            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
39        )
40        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
41        event = lldbutil.fetch_next_event(
42            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
43        )
44        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
45
46    @skipUnlessDarwin
47    def test_multiplexed_launch(self):
48        """Test a multiple interactive scripted process debugging"""
49        self.passthrough_launch()
50        self.assertEqual(self.dbg.GetNumTargets(), 2)
51
52        driving_target = self.mux_process.GetScriptedImplementation().driving_target
53        self.assertTrue(driving_target.IsValid(), "Driving target is invalid")
54
55        # Create a target for the multiplexed even scripted process
56        even_target = self.duplicate_target(driving_target)
57        self.assertTrue(
58            even_target.IsValid(),
59            "Couldn't duplicate driving target to launch multiplexed even scripted process",
60        )
61
62        class_name = f"{self.script_module}.MultiplexedScriptedProcess"
63        dictionary = {"driving_target_idx": self.dbg.GetIndexOfTarget(self.mux_target)}
64
65        dictionary["parity"] = 0
66        muxed_launch_info = self.get_launch_info(class_name, dictionary)
67
68        # Launch Even Child Scripted Process
69        error = lldb.SBError()
70        even_process = even_target.Launch(muxed_launch_info, error)
71        self.assertTrue(
72            even_process, "Couldn't launch multiplexed even scripted process"
73        )
74        self.multiplex(even_process)
75
76        # Check that the even process started running
77        event = lldbutil.fetch_next_event(
78            self, self.dbg.GetListener(), even_process.GetBroadcaster()
79        )
80        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
81        # Check that the even process stopped
82        event = lldbutil.fetch_next_event(
83            self, self.dbg.GetListener(), even_process.GetBroadcaster()
84        )
85        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
86
87        self.assertTrue(even_process.IsValid(), "Got a valid process")
88        self.assertState(
89            even_process.GetState(), lldb.eStateStopped, "Process is stopped"
90        )
91
92        # Create a target for the multiplexed odd scripted process
93        odd_target = self.duplicate_target(driving_target)
94        self.assertTrue(
95            odd_target.IsValid(),
96            "Couldn't duplicate driving target to launch multiplexed odd scripted process",
97        )
98
99        dictionary["parity"] = 1
100        muxed_launch_info = self.get_launch_info(class_name, dictionary)
101
102        # Launch Odd Child Scripted Process
103        error = lldb.SBError()
104        odd_process = odd_target.Launch(muxed_launch_info, error)
105        self.assertTrue(odd_process, "Couldn't launch multiplexed odd scripted process")
106        self.multiplex(odd_process)
107
108        # Check that the odd process started running
109        event = lldbutil.fetch_next_event(
110            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
111        )
112        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
113        # Check that the odd process stopped
114        event = lldbutil.fetch_next_event(
115            self, self.dbg.GetListener(), odd_process.GetBroadcaster()
116        )
117        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
118
119        self.assertTrue(odd_process.IsValid(), "Got a valid process")
120        self.assertState(
121            odd_process.GetState(), lldb.eStateStopped, "Process is stopped"
122        )
123
124        # Set a breakpoint on the odd child process
125        bkpt = odd_target.BreakpointCreateBySourceRegex(
126            "also break here", self.main_source_file
127        )
128        self.assertEqual(odd_target.GetNumBreakpoints(), 1)
129        self.assertTrue(bkpt, "Second breakpoint set on child scripted process")
130        self.assertEqual(bkpt.GetNumLocations(), 1, "Second breakpoint has 1 location")
131
132        # Verify that the breakpoint was also set on the multiplexer & real target
133        self.assertEqual(self.mux_target.GetNumBreakpoints(), 2)
134        bkpt = self.mux_target.GetBreakpointAtIndex(1)
135        self.assertEqual(
136            bkpt.GetNumLocations(), 1, "Second breakpoint set on mux scripted process"
137        )
138        self.assertTrue(bkpt.MatchesName("multiplexed_scripted_process_421"))
139
140        self.assertGreater(driving_target.GetNumBreakpoints(), 1)
141
142        # Resume execution on child process
143        error = odd_process.Continue()
144        self.assertSuccess(error, "Resuming odd child scripted process")
145        self.assertTrue(odd_process.IsValid(), "Got a valid process")
146
147        # Since all the execution is asynchronous, the order in which events
148        # arrive is non-deterministic, so we need a data structure to make sure
149        # we received both the running and stopped event for each target.
150
151        # Initialize the execution event "bingo book", that maps a process index
152        # to a dictionary that contains flags that are not set for the process
153        # events that we care about (running & stopped)
154
155        execution_events = {
156            1: {lldb.eStateRunning: False, lldb.eStateStopped: False},
157            2: {lldb.eStateRunning: False, lldb.eStateStopped: False},
158            3: {lldb.eStateRunning: False, lldb.eStateStopped: False},
159        }
160
161        def fetch_process_event(self, execution_events):
162            event = lldbutil.fetch_next_event(
163                self,
164                self.dbg.GetListener(),
165                lldb.SBProcess.GetBroadcasterClass(),
166                match_class=True,
167            )
168            state = lldb.SBProcess.GetStateFromEvent(event)
169            self.assertIn(state, [lldb.eStateRunning, lldb.eStateStopped])
170            event_process = lldb.SBProcess.GetProcessFromEvent(event)
171            self.assertTrue(event_process.IsValid())
172            event_target = event_process.GetTarget()
173            event_target_idx = self.dbg.GetIndexOfTarget(event_target)
174            self.assertFalse(
175                execution_events[event_target_idx][state],
176                "Event already received for this process",
177            )
178            execution_events[event_target_idx][state] = True
179
180        event = lldbutil.fetch_next_event(
181            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
182        )
183        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
184
185        event = lldbutil.fetch_next_event(
186            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
187        )
188        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
189
190        for _ in range((self.dbg.GetNumTargets() - 1) * 2):
191            fetch_process_event(self, execution_events)
192
193        for target_index, event_states in execution_events.items():
194            for state, is_set in event_states.items():
195                self.assertTrue(is_set, f"Target {target_index} has state {state} set")
196
197    def duplicate_target(self, driving_target):
198        exe = driving_target.executable.fullpath
199        triple = driving_target.triple
200        return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)
201
202    def get_launch_info(self, class_name, script_dict):
203        structured_data = lldb.SBStructuredData()
204        structured_data.SetFromJSON(json.dumps(script_dict))
205
206        launch_info = lldb.SBLaunchInfo(None)
207        launch_info.SetProcessPluginName("ScriptedProcess")
208        launch_info.SetScriptedProcessClassName(class_name)
209        launch_info.SetScriptedProcessDictionary(structured_data)
210        return launch_info
211
212    def multiplex(self, muxed_process):
213        muxed_process.GetScriptedImplementation().multiplexer = (
214            self.mux_process.GetScriptedImplementation()
215        )
216        self.mux_process.GetScriptedImplementation().multiplexed_processes[
217            muxed_process.GetProcessID()
218        ] = muxed_process
219
220    def passthrough_launch(self):
221        """Test that a simple passthrough wrapper functions correctly"""
222        # First build the real target:
223        self.assertEqual(self.dbg.GetNumTargets(), 1)
224        real_target_id = 0
225        real_target = self.dbg.GetTargetAtIndex(real_target_id)
226        lldbutil.run_break_set_by_source_regexp(self, "Break here")
227        self.assertEqual(real_target.GetNumBreakpoints(), 1)
228
229        # Now source in the scripted module:
230        script_path = os.path.join(self.getSourceDir(), self.script_file)
231        self.runCmd(f"command script import '{script_path}'")
232
233        self.mux_target = self.duplicate_target(real_target)
234        self.assertTrue(self.mux_target.IsValid(), "duplicate target succeeded")
235
236        mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
237        script_dict = {"driving_target_idx": real_target_id}
238        mux_launch_info = self.get_launch_info(mux_class, script_dict)
239        self.mux_process_listener = lldb.SBListener(
240            "lldb.test.interactive-scripted-process.listener"
241        )
242        mux_launch_info.SetShadowListener(self.mux_process_listener)
243
244        self.dbg.SetAsync(True)
245        error = lldb.SBError()
246        self.mux_process = self.mux_target.Launch(mux_launch_info, error)
247        self.assertSuccess(error, "Launched multiplexer scripted process")
248        self.assertTrue(self.mux_process.IsValid(), "Got a valid process")
249
250        # Check that the mux process started running
251        event = lldbutil.fetch_next_event(
252            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
253        )
254        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
255        # Check that the real process started running
256        event = lldbutil.fetch_next_event(
257            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
258        )
259        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
260
261        # Check that the real process stopped
262        event = lldbutil.fetch_next_event(
263            self, self.dbg.GetListener(), self.mux_process.GetBroadcaster()
264        )
265        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
266        # Check that the mux process stopped
267        event = lldbutil.fetch_next_event(
268            self, self.mux_process_listener, self.mux_process.GetBroadcaster()
269        )
270        self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
271
272        real_process = real_target.GetProcess()
273        self.assertTrue(real_process.IsValid(), "Got a valid process")
274        self.assertState(
275            real_process.GetState(), lldb.eStateStopped, "Process is stopped"
276        )
277
278        # This is a passthrough, so the two processes should have the same state:
279        # Check that we got the right threads:
280        self.assertEqual(
281            len(real_process.threads),
282            len(self.mux_process.threads),
283            "Same number of threads",
284        )
285        for id in range(len(real_process.threads)):
286            real_pc = real_process.threads[id].frame[0].pc
287            mux_pc = self.mux_process.threads[id].frame[0].pc
288            self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}")
289