xref: /llvm-project/lldb/test/API/tools/lldb-dap/attach/TestDAP_attach.py (revision e054712a85f924e0afe7f180fd960be7a8214d64)
1"""
Test lldb-dap attach request
3"""
4
5
6import dap_server
7from lldbsuite.test.decorators import *
8from lldbsuite.test.lldbtest import *
9from lldbsuite.test import lldbutil
10import lldbdap_testcase
11import os
12import shutil
13import subprocess
14import tempfile
15import threading
16import time
17
18
def spawn_and_wait(program, delay):
    """Launch ``program`` after an optional delay and block until it exits.

    Args:
        program: Path of the executable to run. It is started without any
            arguments.
        delay: Number of seconds to sleep before spawning the process. A
            falsy value (``0`` or ``None``) skips the sleep.
    """
    if delay:
        time.sleep(delay)
    # All three standard streams are pipes that nobody reads from or writes
    # to. A bare Popen.wait() can deadlock in that setup once the child fills
    # a pipe buffer, so use communicate(): it closes stdin, drains
    # stdout/stderr and then waits. The "with" block closes the pipe file
    # objects afterwards so no descriptors are leaked.
    with subprocess.Popen(
        [program], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    ) as process:
        process.communicate()
26
27
class TestDAP_attach(lldbdap_testcase.DAPTestCaseBase):
    """Tests for the lldb-dap "attach" request.

    Covers attaching by process ID, by process name, by name while waiting
    for the next launch, and through user supplied "attachCommands".
    """

    def set_and_hit_breakpoint(self, continueToExit=True):
        """Set the "// breakpoint 1" source breakpoint and run until it is hit.

        Args:
            continueToExit: When true, keep running after the breakpoint is
                hit until the process exits.
        """
        source = "main.c"
        breakpoint1_line = line_number(source, "// breakpoint 1")
        lines = [breakpoint1_line]
        # Set a source breakpoint on the line tagged "// breakpoint 1".
        breakpoint_ids = self.set_source_breakpoints(source, lines)
        self.assertEqual(
            len(breakpoint_ids), len(lines), "expect correct number of breakpoints"
        )
        self.continue_to_breakpoints(breakpoint_ids)
        if continueToExit:
            self.continue_to_exit()

    @skipIfNetBSD  # Hangs on NetBSD as well
    def test_by_pid(self):
        """
        Tests attaching to a process by process ID.
        """
        self.build_and_create_debug_adaptor()
        program = self.getBuildArtifact("a.out")
        self.process = subprocess.Popen(
            [program],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        self.attach(pid=self.process.pid)
        self.set_and_hit_breakpoint(continueToExit=True)

    @skipIfNetBSD  # Hangs on NetBSD as well
    def test_by_name(self):
        """
        Tests attaching to a process by process name.
        """
        self.build_and_create_debug_adaptor()
        orig_program = self.getBuildArtifact("a.out")
        # Since we are going to attach by process name, we need a unique
        # process name that has minimal chance to match a process that is
        # already running. To do this we copy our executable to a uniquely
        # named temporary file and run that copy, which avoids the error
        # "more than one process matches 'a.out'".
        #
        # tempfile.mkstemp() atomically creates the file, unlike the
        # deprecated tempfile.mktemp() which only returns a name and is open
        # to races. The descriptor is closed right away because only the
        # path is needed; the copy below overwrites the empty file.
        fd, program = tempfile.mkstemp()
        os.close(fd)

        # Use a file as a synchronization point between test and inferior.
        pid_file_path = lldbutil.append_to_process_working_directory(
            self, "pid_file_%d" % (int(time.time()))
        )

        def cleanup():
            if os.path.exists(program):
                os.unlink(program)
            self.run_platform_command("rm %s" % (pid_file_path))

        # Execute the cleanup function during test case tear down. This is
        # registered before copying so the temporary file is removed even if
        # the copy fails.
        self.addTearDownHook(cleanup)

        shutil.copyfile(orig_program, program)
        shutil.copymode(orig_program, program)

        self.spawnSubprocess(program, [pid_file_path])

        # Block until the pid file shows up on the target before attaching.
        lldbutil.wait_for_file_on_target(self, pid_file_path)

        self.attach(program=program)
        self.set_and_hit_breakpoint(continueToExit=True)

    # NOTE(review): @skipUnlessDarwin combined with @skipIfDarwin appears to
    # skip this test on every platform, i.e. it is effectively disabled.
    # Confirm that this is intentional before touching either decorator.
    @skipUnlessDarwin
    @skipIfDarwin
    @skipIfNetBSD  # Hangs on NetBSD as well
    def test_by_name_waitFor(self):
        """
        Tests attaching to a process by process name and waiting for the
        next instance of a process to be launched, ignoring all current
        ones.
        """
        self.build_and_create_debug_adaptor()
        program = self.getBuildArtifact("a.out")
        # Launch the inferior from a helper thread after a one second delay
        # while this thread issues the attach request with waitFor=True.
        self.spawn_thread = threading.Thread(
            target=spawn_and_wait,
            args=(
                program,
                1.0,
            ),
        )
        self.spawn_thread.start()
        self.attach(program=program, waitFor=True)
        self.set_and_hit_breakpoint(continueToExit=True)

    @skipIfDarwin
    @skipIfNetBSD  # Hangs on NetBSD as well
    def test_commands(self):
        """
        Tests the "initCommands", "preRunCommands", "stopCommands",
        "exitCommands", "terminateCommands" and "attachCommands"
        that can be passed during attach.

        "initCommands" are a list of LLDB commands that get executed
        before the target is created.
        "preRunCommands" are a list of LLDB commands that get executed
        after the target has been created and before the launch.
        "stopCommands" are a list of LLDB commands that get executed each
        time the program stops.
        "exitCommands" are a list of LLDB commands that get executed when
        the process exits
        "attachCommands" are a list of LLDB commands that get executed and
        must have a valid process in the selected target in LLDB after
        they are done executing. This allows custom commands to create any
        kind of debug session.
        "terminateCommands" are a list of LLDB commands that get executed when
        the debugger session terminates.
        """
        self.build_and_create_debug_adaptor()
        program = self.getBuildArtifact("a.out")
        # Here we just create a target and launch the process as a way to test
        # if we are able to use attach commands to create any kind of a target
        # and use it for debugging
        attachCommands = [
            'target create -d "%s"' % (program),
            "process launch --stop-at-entry",
        ]
        initCommands = ["target list", "platform list"]
        preRunCommands = ["image list a.out", "image dump sections a.out"]
        postRunCommands = ["help trace", "help process trace"]
        stopCommands = ["frame variable", "thread backtrace"]
        exitCommands = ["expr 2+3", "expr 3+4"]
        terminateCommands = ["expr 4+2"]
        self.attach(
            program=program,
            attachCommands=attachCommands,
            initCommands=initCommands,
            preRunCommands=preRunCommands,
            stopCommands=stopCommands,
            exitCommands=exitCommands,
            terminateCommands=terminateCommands,
            postRunCommands=postRunCommands,
        )
        # Get output from the console. This should contain the
        # "initCommands", the "preRunCommands" and the "postRunCommands".
        output = self.get_console()
        # Verify all "initCommands" were found in console output
        self.verify_commands("initCommands", output, initCommands)
        # Verify all "preRunCommands" were found in console output
        self.verify_commands("preRunCommands", output, preRunCommands)
        # Verify all "postRunCommands" were found in console output
        self.verify_commands("postRunCommands", output, postRunCommands)

        functions = ["main"]
        breakpoint_ids = self.set_function_breakpoints(functions)
        self.assertEqual(len(breakpoint_ids), len(functions), "expect one breakpoint")
        self.continue_to_breakpoints(breakpoint_ids)
        # Collect console output up to the last stop command and verify the
        # "stopCommands" ran for the breakpoint stop.
        output = self.collect_console(timeout_secs=10, pattern=stopCommands[-1])
        self.verify_commands("stopCommands", output, stopCommands)

        # Continue, give the process a moment to run, then pause it so that
        # we get a second stop. The console output should again contain the
        # "stopCommands" that were run for that stop.
        self.dap_server.request_continue()
        time.sleep(0.5)
        self.dap_server.request_pause()
        self.dap_server.wait_for_stopped()
        output = self.collect_console(timeout_secs=10, pattern=stopCommands[-1])
        self.verify_commands("stopCommands", output, stopCommands)

        # Continue until the program exits
        self.continue_to_exit()
        # Get output from the console. This should contain both the
        # "exitCommands" and the "terminateCommands" that run as the
        # process exits and the debugging session ends.
        output = self.collect_console(
            timeout_secs=10.0,
            pattern=terminateCommands[0],
        )
        self.verify_commands("exitCommands", output, exitCommands)
        self.verify_commands("terminateCommands", output, terminateCommands)

    @skipIfDarwin
    @skipIfNetBSD  # Hangs on NetBSD as well
    @skipIf(
        archs=["arm", "aarch64"]
    )  # Example of a flaky run http://lab.llvm.org:8011/builders/lldb-aarch64-ubuntu/builds/5517/steps/test/logs/stdio
    def test_terminate_commands(self):
        """
        Tests that the "terminateCommands", that can be passed during
        attach, are run when the debugger is disconnected.
        """
        self.build_and_create_debug_adaptor()
        program = self.getBuildArtifact("a.out")
        # Here we just create a target and launch the process as a way to test
        # if we are able to use attach commands to create any kind of a target
        # and use it for debugging
        attachCommands = [
            'target create -d "%s"' % (program),
            "process launch --stop-at-entry",
        ]
        terminateCommands = ["expr 4+2"]
        self.attach(
            program=program,
            attachCommands=attachCommands,
            terminateCommands=terminateCommands,
            disconnectAutomatically=False,
        )
        # Fetch the console output produced so far; the result is not
        # inspected here.
        self.get_console()
        # Once it's disconnected the console should contain the
        # "terminateCommands"
        self.dap_server.request_disconnect(terminateDebuggee=True)
        output = self.collect_console(
            timeout_secs=1.0,
            pattern=terminateCommands[0],
        )
        self.verify_commands("terminateCommands", output, terminateCommands)
239