xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision 10664813fca8d5ccbfd90bae9e791b7062dabd7c)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6
7
8class DAPTestCaseBase(TestBase):
9    NO_DEBUG_INFO_TESTCASE = True
10
11    def create_debug_adaptor(self, lldbDAPEnv=None):
12        """Create the Visual Studio Code debug adaptor"""
13        self.assertTrue(
14            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
15        )
16        log_file_path = self.getBuildArtifact("dap.txt")
17        self.dap_server = dap_server.DebugAdaptorServer(
18            executable=self.lldbDAPExec,
19            init_commands=self.setUpCommands(),
20            log_file=log_file_path,
21            env=lldbDAPEnv,
22        )
23
24    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
25        self.build()
26        self.create_debug_adaptor(lldbDAPEnv)
27
28    def set_source_breakpoints(self, source_path, lines, data=None):
29        """Sets source breakpoints and returns an array of strings containing
30        the breakpoint IDs ("1", "2") for each breakpoint that was set.
31        Parameter data is array of data objects for breakpoints.
32        Each object in data is 1:1 mapping with the entry in lines.
33        It contains optional location/hitCondition/logMessage parameters.
34        """
35        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
36        if response is None:
37            return []
38        breakpoints = response["body"]["breakpoints"]
39        breakpoint_ids = []
40        for breakpoint in breakpoints:
41            breakpoint_ids.append("%i" % (breakpoint["id"]))
42        return breakpoint_ids
43
44    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
45        """Sets breakpoints by function name given an array of function names
46        and returns an array of strings containing the breakpoint IDs
47        ("1", "2") for each breakpoint that was set.
48        """
49        response = self.dap_server.request_setFunctionBreakpoints(
50            functions, condition=condition, hitCondition=hitCondition
51        )
52        if response is None:
53            return []
54        breakpoints = response["body"]["breakpoints"]
55        breakpoint_ids = []
56        for breakpoint in breakpoints:
57            breakpoint_ids.append("%i" % (breakpoint["id"]))
58        return breakpoint_ids
59
60    def waitUntil(self, condition_callback):
61        for _ in range(20):
62            if condition_callback():
63                return True
64            time.sleep(0.5)
65        return False
66
67    def verify_breakpoint_hit(self, breakpoint_ids):
68        """Wait for the process we are debugging to stop, and verify we hit
69        any breakpoint location in the "breakpoint_ids" array.
70        "breakpoint_ids" should be a list of breakpoint ID strings
71        (["1", "2"]). The return value from self.set_source_breakpoints()
72        or self.set_function_breakpoints() can be passed to this function"""
73        stopped_events = self.dap_server.wait_for_stopped()
74        for stopped_event in stopped_events:
75            if "body" in stopped_event:
76                body = stopped_event["body"]
77                if "reason" not in body:
78                    continue
79                if body["reason"] != "breakpoint":
80                    continue
81                if "description" not in body:
82                    continue
83                # Descriptions for breakpoints will be in the form
84                # "breakpoint 1.1", so look for any description that matches
85                # ("breakpoint 1.") in the description field as verification
86                # that one of the breakpoint locations was hit. DAP doesn't
87                # allow breakpoints to have multiple locations, but LLDB does.
88                # So when looking at the description we just want to make sure
89                # the right breakpoint matches and not worry about the actual
90                # location.
91                description = body["description"]
92                for breakpoint_id in breakpoint_ids:
93                    match_desc = "breakpoint %s." % (breakpoint_id)
94                    if match_desc in description:
95                        return
96        self.assertTrue(False, "breakpoint not hit")
97
98    def verify_stop_exception_info(self, expected_description):
99        """Wait for the process we are debugging to stop, and verify the stop
100        reason is 'exception' and that the description matches
101        'expected_description'
102        """
103        stopped_events = self.dap_server.wait_for_stopped()
104        for stopped_event in stopped_events:
105            if "body" in stopped_event:
106                body = stopped_event["body"]
107                if "reason" not in body:
108                    continue
109                if body["reason"] != "exception":
110                    continue
111                if "description" not in body:
112                    continue
113                description = body["description"]
114                if expected_description == description:
115                    return True
116        return False
117
118    def verify_commands(self, flavor, output, commands):
119        self.assertTrue(output and len(output) > 0, "expect console output")
120        lines = output.splitlines()
121        prefix = "(lldb) "
122        for cmd in commands:
123            found = False
124            for line in lines:
125                if line.startswith(prefix) and cmd in line:
126                    found = True
127                    break
128            self.assertTrue(
129                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
130            )
131
132    def get_dict_value(self, d, key_path):
133        """Verify each key in the key_path array is in contained in each
134        dictionary within "d". Assert if any key isn't in the
135        corresponding dictionary. This is handy for grabbing values from VS
136        Code response dictionary like getting
137        response['body']['stackFrames']
138        """
139        value = d
140        for key in key_path:
141            if key in value:
142                value = value[key]
143            else:
144                self.assertTrue(
145                    key in value,
146                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
147                )
148        return value
149
150    def get_stackFrames_and_totalFramesCount(
151        self, threadId=None, startFrame=None, levels=None, dump=False
152    ):
153        response = self.dap_server.request_stackTrace(
154            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
155        )
156        if response:
157            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
158            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
159            self.assertTrue(
160                totalFrames > 0,
161                "verify totalFrames count is provided by extension that supports "
162                "async frames loading",
163            )
164            return (stackFrames, totalFrames)
165        return (None, 0)
166
167    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
168        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
169            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
170        )
171        return stackFrames
172
173    def get_source_and_line(self, threadId=None, frameIndex=0):
174        stackFrames = self.get_stackFrames(
175            threadId=threadId, startFrame=frameIndex, levels=1
176        )
177        if stackFrames is not None:
178            stackFrame = stackFrames[0]
179            ["source", "path"]
180            if "source" in stackFrame:
181                source = stackFrame["source"]
182                if "path" in source:
183                    if "line" in stackFrame:
184                        return (source["path"], stackFrame["line"])
185        return ("", 0)
186
187    def get_stdout(self, timeout=0.0):
188        return self.dap_server.get_output("stdout", timeout=timeout)
189
190    def get_console(self, timeout=0.0):
191        return self.dap_server.get_output("console", timeout=timeout)
192
193    def collect_console(self, duration):
194        return self.dap_server.collect_output("console", duration=duration)
195
196    def get_local_as_int(self, name, threadId=None):
197        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
198        if value.startswith("0x"):
199            return int(value, 16)
200        elif value.startswith("0"):
201            return int(value, 8)
202        else:
203            return int(value)
204
205    def set_local(self, name, value, id=None):
206        """Set a top level local variable only."""
207        return self.dap_server.request_setVariable(1, name, str(value), id=id)
208
209    def set_global(self, name, value, id=None):
210        """Set a top level global variable only."""
211        return self.dap_server.request_setVariable(2, name, str(value), id=id)
212
213    def stepIn(self, threadId=None, waitForStop=True):
214        self.dap_server.request_stepIn(threadId=threadId)
215        if waitForStop:
216            return self.dap_server.wait_for_stopped()
217        return None
218
219    def stepOver(self, threadId=None, waitForStop=True):
220        self.dap_server.request_next(threadId=threadId)
221        if waitForStop:
222            return self.dap_server.wait_for_stopped()
223        return None
224
225    def stepOut(self, threadId=None, waitForStop=True):
226        self.dap_server.request_stepOut(threadId=threadId)
227        if waitForStop:
228            return self.dap_server.wait_for_stopped()
229        return None
230
231    def continue_to_next_stop(self):
232        self.dap_server.request_continue()
233        return self.dap_server.wait_for_stopped()
234
235    def continue_to_breakpoints(self, breakpoint_ids):
236        self.dap_server.request_continue()
237        self.verify_breakpoint_hit(breakpoint_ids)
238
239    def continue_to_exception_breakpoint(self, filter_label):
240        self.dap_server.request_continue()
241        self.assertTrue(
242            self.verify_stop_exception_info(filter_label),
243            'verify we got "%s"' % (filter_label),
244        )
245
246    def continue_to_exit(self, exitCode=0):
247        self.dap_server.request_continue()
248        stopped_events = self.dap_server.wait_for_stopped()
249        self.assertEquals(
250            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
251        )
252        self.assertEquals(
253            stopped_events[0]["event"], "exited", "make sure program ran to completion"
254        )
255        self.assertEquals(
256            stopped_events[0]["body"]["exitCode"],
257            exitCode,
258            "exitCode == %i" % (exitCode),
259        )
260
261    def disassemble(self, threadId=None, frameIndex=None):
262        stackFrames = self.get_stackFrames(
263            threadId=threadId, startFrame=frameIndex, levels=1
264        )
265        self.assertIsNotNone(stackFrames)
266        memoryReference = stackFrames[0]["instructionPointerReference"]
267        self.assertIsNotNone(memoryReference)
268
269        if memoryReference not in self.dap_server.disassembled_instructions:
270            self.dap_server.request_disassemble(memoryReference=memoryReference)
271
272        return self.dap_server.disassembled_instructions[memoryReference]
273
274    def attach(
275        self,
276        program=None,
277        pid=None,
278        waitFor=None,
279        trace=None,
280        initCommands=None,
281        preRunCommands=None,
282        stopCommands=None,
283        exitCommands=None,
284        attachCommands=None,
285        coreFile=None,
286        disconnectAutomatically=True,
287        terminateCommands=None,
288        postRunCommands=None,
289        sourceMap=None,
290        sourceInitFile=False,
291    ):
292        """Build the default Makefile target, create the DAP debug adaptor,
293        and attach to the process.
294        """
295
296        # Make sure we disconnect and terminate the DAP debug adaptor even
297        # if we throw an exception during the test case.
298        def cleanup():
299            if disconnectAutomatically:
300                self.dap_server.request_disconnect(terminateDebuggee=True)
301            self.dap_server.terminate()
302
303        # Execute the cleanup function during test case tear down.
304        self.addTearDownHook(cleanup)
305        # Initialize and launch the program
306        self.dap_server.request_initialize(sourceInitFile)
307        response = self.dap_server.request_attach(
308            program=program,
309            pid=pid,
310            waitFor=waitFor,
311            trace=trace,
312            initCommands=initCommands,
313            preRunCommands=preRunCommands,
314            stopCommands=stopCommands,
315            exitCommands=exitCommands,
316            attachCommands=attachCommands,
317            terminateCommands=terminateCommands,
318            coreFile=coreFile,
319            postRunCommands=postRunCommands,
320            sourceMap=sourceMap,
321        )
322        if not (response and response["success"]):
323            self.assertTrue(
324                response["success"], "attach failed (%s)" % (response["message"])
325            )
326
327    def launch(
328        self,
329        program=None,
330        args=None,
331        cwd=None,
332        env=None,
333        stopOnEntry=False,
334        disableASLR=True,
335        disableSTDIO=False,
336        shellExpandArguments=False,
337        trace=False,
338        initCommands=None,
339        preRunCommands=None,
340        stopCommands=None,
341        exitCommands=None,
342        terminateCommands=None,
343        sourcePath=None,
344        debuggerRoot=None,
345        sourceInitFile=False,
346        launchCommands=None,
347        sourceMap=None,
348        disconnectAutomatically=True,
349        runInTerminal=False,
350        expectFailure=False,
351        postRunCommands=None,
352        enableAutoVariableSummaries=False,
353        enableSyntheticChildDebugging=False,
354        commandEscapePrefix="`",
355    ):
356        """Sending launch request to dap"""
357
358        # Make sure we disconnect and terminate the DAP debug adapter,
359        # if we throw an exception during the test case
360        def cleanup():
361            if disconnectAutomatically:
362                self.dap_server.request_disconnect(terminateDebuggee=True)
363            self.dap_server.terminate()
364
365        # Execute the cleanup function during test case tear down.
366        self.addTearDownHook(cleanup)
367
368        # Initialize and launch the program
369        self.dap_server.request_initialize(sourceInitFile)
370        response = self.dap_server.request_launch(
371            program,
372            args=args,
373            cwd=cwd,
374            env=env,
375            stopOnEntry=stopOnEntry,
376            disableASLR=disableASLR,
377            disableSTDIO=disableSTDIO,
378            shellExpandArguments=shellExpandArguments,
379            trace=trace,
380            initCommands=initCommands,
381            preRunCommands=preRunCommands,
382            stopCommands=stopCommands,
383            exitCommands=exitCommands,
384            terminateCommands=terminateCommands,
385            sourcePath=sourcePath,
386            debuggerRoot=debuggerRoot,
387            launchCommands=launchCommands,
388            sourceMap=sourceMap,
389            runInTerminal=runInTerminal,
390            postRunCommands=postRunCommands,
391            enableAutoVariableSummaries=enableAutoVariableSummaries,
392            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
393            commandEscapePrefix=commandEscapePrefix,
394        )
395
396        if expectFailure:
397            return response
398
399        if not (response and response["success"]):
400            self.assertTrue(
401                response["success"], "launch failed (%s)" % (response["message"])
402            )
403        return response
404
405    def build_and_launch(
406        self,
407        program,
408        args=None,
409        cwd=None,
410        env=None,
411        stopOnEntry=False,
412        disableASLR=True,
413        disableSTDIO=False,
414        shellExpandArguments=False,
415        trace=False,
416        initCommands=None,
417        preRunCommands=None,
418        stopCommands=None,
419        exitCommands=None,
420        terminateCommands=None,
421        sourcePath=None,
422        debuggerRoot=None,
423        sourceInitFile=False,
424        runInTerminal=False,
425        disconnectAutomatically=True,
426        postRunCommands=None,
427        lldbDAPEnv=None,
428        enableAutoVariableSummaries=False,
429        enableSyntheticChildDebugging=False,
430        commandEscapePrefix="`",
431    ):
432        """Build the default Makefile target, create the DAP debug adaptor,
433        and launch the process.
434        """
435        self.build_and_create_debug_adaptor(lldbDAPEnv)
436        self.assertTrue(os.path.exists(program), "executable must exist")
437
438        return self.launch(
439            program,
440            args,
441            cwd,
442            env,
443            stopOnEntry,
444            disableASLR,
445            disableSTDIO,
446            shellExpandArguments,
447            trace,
448            initCommands,
449            preRunCommands,
450            stopCommands,
451            exitCommands,
452            terminateCommands,
453            sourcePath,
454            debuggerRoot,
455            sourceInitFile,
456            runInTerminal=runInTerminal,
457            disconnectAutomatically=disconnectAutomatically,
458            postRunCommands=postRunCommands,
459            enableAutoVariableSummaries=enableAutoVariableSummaries,
460            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
461            commandEscapePrefix=commandEscapePrefix,
462        )
463