xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision a52be0cc114cc58a35bee65c517adaeb66ee6d89)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6from lldbsuite.test import lldbplatformutil
7import lldbgdbserverutils
8
9
10class DAPTestCaseBase(TestBase):
11    # set timeout based on whether ASAN was enabled or not. Increase
12    # timeout by a factor of 10 if ASAN is enabled.
13    timeoutval = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
14    NO_DEBUG_INFO_TESTCASE = True
15
16    def create_debug_adaptor(self, lldbDAPEnv=None):
17        """Create the Visual Studio Code debug adaptor"""
18        self.assertTrue(
19            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
20        )
21        log_file_path = self.getBuildArtifact("dap.txt")
22        self.dap_server = dap_server.DebugAdaptorServer(
23            executable=self.lldbDAPExec,
24            init_commands=self.setUpCommands(),
25            log_file=log_file_path,
26            env=lldbDAPEnv,
27        )
28
29    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
30        self.build()
31        self.create_debug_adaptor(lldbDAPEnv)
32
33    def set_source_breakpoints(self, source_path, lines, data=None):
34        """Sets source breakpoints and returns an array of strings containing
35        the breakpoint IDs ("1", "2") for each breakpoint that was set.
36        Parameter data is array of data objects for breakpoints.
37        Each object in data is 1:1 mapping with the entry in lines.
38        It contains optional location/hitCondition/logMessage parameters.
39        """
40        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
41        if response is None:
42            return []
43        breakpoints = response["body"]["breakpoints"]
44        breakpoint_ids = []
45        for breakpoint in breakpoints:
46            breakpoint_ids.append("%i" % (breakpoint["id"]))
47        return breakpoint_ids
48
49    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
50        """Sets breakpoints by function name given an array of function names
51        and returns an array of strings containing the breakpoint IDs
52        ("1", "2") for each breakpoint that was set.
53        """
54        response = self.dap_server.request_setFunctionBreakpoints(
55            functions, condition=condition, hitCondition=hitCondition
56        )
57        if response is None:
58            return []
59        breakpoints = response["body"]["breakpoints"]
60        breakpoint_ids = []
61        for breakpoint in breakpoints:
62            breakpoint_ids.append("%i" % (breakpoint["id"]))
63        return breakpoint_ids
64
65    def waitUntil(self, condition_callback):
66        for _ in range(20):
67            if condition_callback():
68                return True
69            time.sleep(0.5)
70        return False
71
72    def verify_breakpoint_hit(self, breakpoint_ids):
73        """Wait for the process we are debugging to stop, and verify we hit
74        any breakpoint location in the "breakpoint_ids" array.
75        "breakpoint_ids" should be a list of breakpoint ID strings
76        (["1", "2"]). The return value from self.set_source_breakpoints()
77        or self.set_function_breakpoints() can be passed to this function"""
78        stopped_events = self.dap_server.wait_for_stopped()
79        for stopped_event in stopped_events:
80            if "body" in stopped_event:
81                body = stopped_event["body"]
82                if "reason" not in body:
83                    continue
84                if body["reason"] != "breakpoint":
85                    continue
86                if "description" not in body:
87                    continue
88                # Descriptions for breakpoints will be in the form
89                # "breakpoint 1.1", so look for any description that matches
90                # ("breakpoint 1.") in the description field as verification
91                # that one of the breakpoint locations was hit. DAP doesn't
92                # allow breakpoints to have multiple locations, but LLDB does.
93                # So when looking at the description we just want to make sure
94                # the right breakpoint matches and not worry about the actual
95                # location.
96                description = body["description"]
97                for breakpoint_id in breakpoint_ids:
98                    match_desc = "breakpoint %s." % (breakpoint_id)
99                    if match_desc in description:
100                        return
101        self.assertTrue(False, "breakpoint not hit")
102
103    def verify_stop_exception_info(self, expected_description):
104        """Wait for the process we are debugging to stop, and verify the stop
105        reason is 'exception' and that the description matches
106        'expected_description'
107        """
108        stopped_events = self.dap_server.wait_for_stopped()
109        for stopped_event in stopped_events:
110            if "body" in stopped_event:
111                body = stopped_event["body"]
112                if "reason" not in body:
113                    continue
114                if body["reason"] != "exception":
115                    continue
116                if "description" not in body:
117                    continue
118                description = body["description"]
119                if expected_description == description:
120                    return True
121        return False
122
123    def verify_commands(self, flavor, output, commands):
124        self.assertTrue(output and len(output) > 0, "expect console output")
125        lines = output.splitlines()
126        prefix = "(lldb) "
127        for cmd in commands:
128            found = False
129            for line in lines:
130                if len(cmd) > 0 and (cmd[0] == "!" or cmd[0] == "?"):
131                    cmd = cmd[1:]
132                if line.startswith(prefix) and cmd in line:
133                    found = True
134                    break
135            self.assertTrue(
136                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
137            )
138
139    def get_dict_value(self, d, key_path):
140        """Verify each key in the key_path array is in contained in each
141        dictionary within "d". Assert if any key isn't in the
142        corresponding dictionary. This is handy for grabbing values from VS
143        Code response dictionary like getting
144        response['body']['stackFrames']
145        """
146        value = d
147        for key in key_path:
148            if key in value:
149                value = value[key]
150            else:
151                self.assertTrue(
152                    key in value,
153                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
154                )
155        return value
156
157    def get_stackFrames_and_totalFramesCount(
158        self, threadId=None, startFrame=None, levels=None, dump=False
159    ):
160        response = self.dap_server.request_stackTrace(
161            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
162        )
163        if response:
164            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
165            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
166            self.assertTrue(
167                totalFrames > 0,
168                "verify totalFrames count is provided by extension that supports "
169                "async frames loading",
170            )
171            return (stackFrames, totalFrames)
172        return (None, 0)
173
174    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
175        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
176            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
177        )
178        return stackFrames
179
180    def get_source_and_line(self, threadId=None, frameIndex=0):
181        stackFrames = self.get_stackFrames(
182            threadId=threadId, startFrame=frameIndex, levels=1
183        )
184        if stackFrames is not None:
185            stackFrame = stackFrames[0]
186            ["source", "path"]
187            if "source" in stackFrame:
188                source = stackFrame["source"]
189                if "path" in source:
190                    if "line" in stackFrame:
191                        return (source["path"], stackFrame["line"])
192        return ("", 0)
193
194    def get_stdout(self, timeout=0.0):
195        return self.dap_server.get_output("stdout", timeout=timeout)
196
197    def get_console(self, timeout=0.0):
198        return self.dap_server.get_output("console", timeout=timeout)
199
200    def collect_console(self, timeout_secs, pattern=None):
201        return self.dap_server.collect_output(
202            "console", timeout_secs=timeout_secs, pattern=pattern
203        )
204
205    def get_local_as_int(self, name, threadId=None):
206        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
207        # 'value' may have the variable value and summary.
208        # Extract the variable value since summary can have nonnumeric characters.
209        value = value.split(" ")[0]
210        if value.startswith("0x"):
211            return int(value, 16)
212        elif value.startswith("0"):
213            return int(value, 8)
214        else:
215            return int(value)
216
217    def set_local(self, name, value, id=None):
218        """Set a top level local variable only."""
219        return self.dap_server.request_setVariable(1, name, str(value), id=id)
220
221    def set_global(self, name, value, id=None):
222        """Set a top level global variable only."""
223        return self.dap_server.request_setVariable(2, name, str(value), id=id)
224
225    def stepIn(self, threadId=None, targetId=None, waitForStop=True):
226        self.dap_server.request_stepIn(threadId=threadId, targetId=targetId)
227        if waitForStop:
228            return self.dap_server.wait_for_stopped()
229        return None
230
231    def stepOver(self, threadId=None, waitForStop=True):
232        self.dap_server.request_next(threadId=threadId)
233        if waitForStop:
234            return self.dap_server.wait_for_stopped()
235        return None
236
237    def stepOut(self, threadId=None, waitForStop=True):
238        self.dap_server.request_stepOut(threadId=threadId)
239        if waitForStop:
240            return self.dap_server.wait_for_stopped()
241        return None
242
243    def continue_to_next_stop(self):
244        self.dap_server.request_continue()
245        return self.dap_server.wait_for_stopped()
246
247    def continue_to_breakpoints(self, breakpoint_ids):
248        self.dap_server.request_continue()
249        self.verify_breakpoint_hit(breakpoint_ids)
250
251    def continue_to_exception_breakpoint(self, filter_label):
252        self.dap_server.request_continue()
253        self.assertTrue(
254            self.verify_stop_exception_info(filter_label),
255            'verify we got "%s"' % (filter_label),
256        )
257
258    def continue_to_exit(self, exitCode=0):
259        self.dap_server.request_continue()
260        stopped_events = self.dap_server.wait_for_stopped()
261        self.assertEqual(
262            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
263        )
264        self.assertEqual(
265            stopped_events[0]["event"], "exited", "make sure program ran to completion"
266        )
267        self.assertEqual(
268            stopped_events[0]["body"]["exitCode"],
269            exitCode,
270            "exitCode == %i" % (exitCode),
271        )
272
273    def disassemble(self, threadId=None, frameIndex=None):
274        stackFrames = self.get_stackFrames(
275            threadId=threadId, startFrame=frameIndex, levels=1
276        )
277        self.assertIsNotNone(stackFrames)
278        memoryReference = stackFrames[0]["instructionPointerReference"]
279        self.assertIsNotNone(memoryReference)
280
281        if memoryReference not in self.dap_server.disassembled_instructions:
282            self.dap_server.request_disassemble(memoryReference=memoryReference)
283
284        return self.dap_server.disassembled_instructions[memoryReference]
285
286    def attach(
287        self,
288        program=None,
289        pid=None,
290        waitFor=None,
291        trace=None,
292        initCommands=None,
293        preRunCommands=None,
294        stopCommands=None,
295        exitCommands=None,
296        attachCommands=None,
297        coreFile=None,
298        disconnectAutomatically=True,
299        terminateCommands=None,
300        postRunCommands=None,
301        sourceMap=None,
302        sourceInitFile=False,
303        expectFailure=False,
304        gdbRemotePort=None,
305        gdbRemoteHostname=None,
306    ):
307        """Build the default Makefile target, create the DAP debug adaptor,
308        and attach to the process.
309        """
310
311        # Make sure we disconnect and terminate the DAP debug adaptor even
312        # if we throw an exception during the test case.
313        def cleanup():
314            if disconnectAutomatically:
315                self.dap_server.request_disconnect(terminateDebuggee=True)
316            self.dap_server.terminate()
317
318        # Execute the cleanup function during test case tear down.
319        self.addTearDownHook(cleanup)
320        # Initialize and launch the program
321        self.dap_server.request_initialize(sourceInitFile)
322        response = self.dap_server.request_attach(
323            program=program,
324            pid=pid,
325            waitFor=waitFor,
326            trace=trace,
327            initCommands=initCommands,
328            preRunCommands=preRunCommands,
329            stopCommands=stopCommands,
330            exitCommands=exitCommands,
331            attachCommands=attachCommands,
332            terminateCommands=terminateCommands,
333            coreFile=coreFile,
334            postRunCommands=postRunCommands,
335            sourceMap=sourceMap,
336            gdbRemotePort=gdbRemotePort,
337            gdbRemoteHostname=gdbRemoteHostname,
338        )
339        if expectFailure:
340            return response
341        if not (response and response["success"]):
342            self.assertTrue(
343                response["success"], "attach failed (%s)" % (response["message"])
344            )
345
346    def launch(
347        self,
348        program=None,
349        args=None,
350        cwd=None,
351        env=None,
352        stopOnEntry=False,
353        disableASLR=True,
354        disableSTDIO=False,
355        shellExpandArguments=False,
356        trace=False,
357        initCommands=None,
358        preRunCommands=None,
359        stopCommands=None,
360        exitCommands=None,
361        terminateCommands=None,
362        sourcePath=None,
363        debuggerRoot=None,
364        sourceInitFile=False,
365        launchCommands=None,
366        sourceMap=None,
367        disconnectAutomatically=True,
368        runInTerminal=False,
369        expectFailure=False,
370        postRunCommands=None,
371        enableAutoVariableSummaries=False,
372        enableSyntheticChildDebugging=False,
373        commandEscapePrefix=None,
374        customFrameFormat=None,
375        customThreadFormat=None,
376    ):
377        """Sending launch request to dap"""
378
379        # Make sure we disconnect and terminate the DAP debug adapter,
380        # if we throw an exception during the test case
381        def cleanup():
382            if disconnectAutomatically:
383                self.dap_server.request_disconnect(terminateDebuggee=True)
384            self.dap_server.terminate()
385
386        # Execute the cleanup function during test case tear down.
387        self.addTearDownHook(cleanup)
388
389        # Initialize and launch the program
390        self.dap_server.request_initialize(sourceInitFile)
391        response = self.dap_server.request_launch(
392            program,
393            args=args,
394            cwd=cwd,
395            env=env,
396            stopOnEntry=stopOnEntry,
397            disableASLR=disableASLR,
398            disableSTDIO=disableSTDIO,
399            shellExpandArguments=shellExpandArguments,
400            trace=trace,
401            initCommands=initCommands,
402            preRunCommands=preRunCommands,
403            stopCommands=stopCommands,
404            exitCommands=exitCommands,
405            terminateCommands=terminateCommands,
406            sourcePath=sourcePath,
407            debuggerRoot=debuggerRoot,
408            launchCommands=launchCommands,
409            sourceMap=sourceMap,
410            runInTerminal=runInTerminal,
411            postRunCommands=postRunCommands,
412            enableAutoVariableSummaries=enableAutoVariableSummaries,
413            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
414            commandEscapePrefix=commandEscapePrefix,
415            customFrameFormat=customFrameFormat,
416            customThreadFormat=customThreadFormat,
417        )
418
419        if expectFailure:
420            return response
421
422        if not (response and response["success"]):
423            self.assertTrue(
424                response["success"], "launch failed (%s)" % (response["message"])
425            )
426        return response
427
428    def build_and_launch(
429        self,
430        program,
431        args=None,
432        cwd=None,
433        env=None,
434        stopOnEntry=False,
435        disableASLR=True,
436        disableSTDIO=False,
437        shellExpandArguments=False,
438        trace=False,
439        initCommands=None,
440        preRunCommands=None,
441        stopCommands=None,
442        exitCommands=None,
443        terminateCommands=None,
444        sourcePath=None,
445        debuggerRoot=None,
446        sourceInitFile=False,
447        runInTerminal=False,
448        disconnectAutomatically=True,
449        postRunCommands=None,
450        lldbDAPEnv=None,
451        enableAutoVariableSummaries=False,
452        enableSyntheticChildDebugging=False,
453        commandEscapePrefix=None,
454        customFrameFormat=None,
455        customThreadFormat=None,
456        launchCommands=None,
457        expectFailure=False,
458    ):
459        """Build the default Makefile target, create the DAP debug adaptor,
460        and launch the process.
461        """
462        self.build_and_create_debug_adaptor(lldbDAPEnv)
463        self.assertTrue(os.path.exists(program), "executable must exist")
464
465        return self.launch(
466            program,
467            args,
468            cwd,
469            env,
470            stopOnEntry,
471            disableASLR,
472            disableSTDIO,
473            shellExpandArguments,
474            trace,
475            initCommands,
476            preRunCommands,
477            stopCommands,
478            exitCommands,
479            terminateCommands,
480            sourcePath,
481            debuggerRoot,
482            sourceInitFile,
483            runInTerminal=runInTerminal,
484            disconnectAutomatically=disconnectAutomatically,
485            postRunCommands=postRunCommands,
486            enableAutoVariableSummaries=enableAutoVariableSummaries,
487            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
488            commandEscapePrefix=commandEscapePrefix,
489            customFrameFormat=customFrameFormat,
490            customThreadFormat=customThreadFormat,
491            launchCommands=launchCommands,
492            expectFailure=expectFailure,
493        )
494
495    def getBuiltinDebugServerTool(self):
496        # Tries to find simulation/lldb-server/gdbserver tool path.
497        server_tool = None
498        if lldbplatformutil.getPlatform() == "linux":
499            server_tool = lldbgdbserverutils.get_lldb_server_exe()
500            if server_tool is None:
501                self.dap_server.request_disconnect(terminateDebuggee=True)
502                self.assertIsNotNone(server_tool, "lldb-server not found.")
503        elif lldbplatformutil.getPlatform() == "macosx":
504            server_tool = lldbgdbserverutils.get_debugserver_exe()
505            if server_tool is None:
506                self.dap_server.request_disconnect(terminateDebuggee=True)
507                self.assertIsNotNone(server_tool, "debugserver not found.")
508        return server_tool
509