xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision 01263c6c6fb495a94fe0ccbb1420bb1ec8460748)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6
7
8class DAPTestCaseBase(TestBase):
9    NO_DEBUG_INFO_TESTCASE = True
10
11    def create_debug_adaptor(self, lldbDAPEnv=None):
12        """Create the Visual Studio Code debug adaptor"""
13        self.assertTrue(
14            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
15        )
16        log_file_path = self.getBuildArtifact("dap.txt")
17        self.dap_server = dap_server.DebugAdaptorServer(
18            executable=self.lldbDAPExec,
19            init_commands=self.setUpCommands(),
20            log_file=log_file_path,
21            env=lldbDAPEnv,
22        )
23
24    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
25        self.build()
26        self.create_debug_adaptor(lldbDAPEnv)
27
28    def set_source_breakpoints(self, source_path, lines, data=None):
29        """Sets source breakpoints and returns an array of strings containing
30        the breakpoint IDs ("1", "2") for each breakpoint that was set.
31        Parameter data is array of data objects for breakpoints.
32        Each object in data is 1:1 mapping with the entry in lines.
33        It contains optional location/hitCondition/logMessage parameters.
34        """
35        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
36        if response is None:
37            return []
38        breakpoints = response["body"]["breakpoints"]
39        breakpoint_ids = []
40        for breakpoint in breakpoints:
41            breakpoint_ids.append("%i" % (breakpoint["id"]))
42        return breakpoint_ids
43
44    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
45        """Sets breakpoints by function name given an array of function names
46        and returns an array of strings containing the breakpoint IDs
47        ("1", "2") for each breakpoint that was set.
48        """
49        response = self.dap_server.request_setFunctionBreakpoints(
50            functions, condition=condition, hitCondition=hitCondition
51        )
52        if response is None:
53            return []
54        breakpoints = response["body"]["breakpoints"]
55        breakpoint_ids = []
56        for breakpoint in breakpoints:
57            breakpoint_ids.append("%i" % (breakpoint["id"]))
58        return breakpoint_ids
59
60    def waitUntil(self, condition_callback):
61        for _ in range(20):
62            if condition_callback():
63                return True
64            time.sleep(0.5)
65        return False
66
67    def verify_breakpoint_hit(self, breakpoint_ids):
68        """Wait for the process we are debugging to stop, and verify we hit
69        any breakpoint location in the "breakpoint_ids" array.
70        "breakpoint_ids" should be a list of breakpoint ID strings
71        (["1", "2"]). The return value from self.set_source_breakpoints()
72        or self.set_function_breakpoints() can be passed to this function"""
73        stopped_events = self.dap_server.wait_for_stopped()
74        for stopped_event in stopped_events:
75            if "body" in stopped_event:
76                body = stopped_event["body"]
77                if "reason" not in body:
78                    continue
79                if body["reason"] != "breakpoint":
80                    continue
81                if "description" not in body:
82                    continue
83                # Descriptions for breakpoints will be in the form
84                # "breakpoint 1.1", so look for any description that matches
85                # ("breakpoint 1.") in the description field as verification
86                # that one of the breakpoint locations was hit. DAP doesn't
87                # allow breakpoints to have multiple locations, but LLDB does.
88                # So when looking at the description we just want to make sure
89                # the right breakpoint matches and not worry about the actual
90                # location.
91                description = body["description"]
92                for breakpoint_id in breakpoint_ids:
93                    match_desc = "breakpoint %s." % (breakpoint_id)
94                    if match_desc in description:
95                        return
96        self.assertTrue(False, "breakpoint not hit")
97
98    def verify_stop_exception_info(self, expected_description):
99        """Wait for the process we are debugging to stop, and verify the stop
100        reason is 'exception' and that the description matches
101        'expected_description'
102        """
103        stopped_events = self.dap_server.wait_for_stopped()
104        for stopped_event in stopped_events:
105            if "body" in stopped_event:
106                body = stopped_event["body"]
107                if "reason" not in body:
108                    continue
109                if body["reason"] != "exception":
110                    continue
111                if "description" not in body:
112                    continue
113                description = body["description"]
114                if expected_description == description:
115                    return True
116        return False
117
118    def verify_commands(self, flavor, output, commands):
119        self.assertTrue(output and len(output) > 0, "expect console output")
120        lines = output.splitlines()
121        prefix = "(lldb) "
122        for cmd in commands:
123            found = False
124            for line in lines:
125                if line.startswith(prefix) and cmd in line:
126                    found = True
127                    break
128            self.assertTrue(
129                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
130            )
131
132    def get_dict_value(self, d, key_path):
133        """Verify each key in the key_path array is in contained in each
134        dictionary within "d". Assert if any key isn't in the
135        corresponding dictionary. This is handy for grabbing values from VS
136        Code response dictionary like getting
137        response['body']['stackFrames']
138        """
139        value = d
140        for key in key_path:
141            if key in value:
142                value = value[key]
143            else:
144                self.assertTrue(
145                    key in value,
146                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
147                )
148        return value
149
150    def get_stackFrames_and_totalFramesCount(
151        self, threadId=None, startFrame=None, levels=None, dump=False
152    ):
153        response = self.dap_server.request_stackTrace(
154            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
155        )
156        if response:
157            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
158            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
159            self.assertTrue(
160                totalFrames > 0,
161                "verify totalFrames count is provided by extension that supports "
162                "async frames loading",
163            )
164            return (stackFrames, totalFrames)
165        return (None, 0)
166
167    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
168        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
169            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
170        )
171        return stackFrames
172
173    def get_source_and_line(self, threadId=None, frameIndex=0):
174        stackFrames = self.get_stackFrames(
175            threadId=threadId, startFrame=frameIndex, levels=1
176        )
177        if stackFrames is not None:
178            stackFrame = stackFrames[0]
179            ["source", "path"]
180            if "source" in stackFrame:
181                source = stackFrame["source"]
182                if "path" in source:
183                    if "line" in stackFrame:
184                        return (source["path"], stackFrame["line"])
185        return ("", 0)
186
187    def get_stdout(self, timeout=0.0):
188        return self.dap_server.get_output("stdout", timeout=timeout)
189
190    def get_console(self, timeout=0.0):
191        return self.dap_server.get_output("console", timeout=timeout)
192
193    def collect_console(self, duration):
194        return self.dap_server.collect_output("console", duration=duration)
195
196    def get_local_as_int(self, name, threadId=None):
197        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
198        if value.startswith("0x"):
199            return int(value, 16)
200        elif value.startswith("0"):
201            return int(value, 8)
202        else:
203            return int(value)
204
205    def set_local(self, name, value, id=None):
206        """Set a top level local variable only."""
207        return self.dap_server.request_setVariable(1, name, str(value), id=id)
208
209    def set_global(self, name, value, id=None):
210        """Set a top level global variable only."""
211        return self.dap_server.request_setVariable(2, name, str(value), id=id)
212
213    def stepIn(self, threadId=None, waitForStop=True):
214        self.dap_server.request_stepIn(threadId=threadId)
215        if waitForStop:
216            return self.dap_server.wait_for_stopped()
217        return None
218
219    def stepOver(self, threadId=None, waitForStop=True):
220        self.dap_server.request_next(threadId=threadId)
221        if waitForStop:
222            return self.dap_server.wait_for_stopped()
223        return None
224
225    def stepOut(self, threadId=None, waitForStop=True):
226        self.dap_server.request_stepOut(threadId=threadId)
227        if waitForStop:
228            return self.dap_server.wait_for_stopped()
229        return None
230
231    def continue_to_next_stop(self):
232        self.dap_server.request_continue()
233        return self.dap_server.wait_for_stopped()
234
235    def continue_to_breakpoints(self, breakpoint_ids):
236        self.dap_server.request_continue()
237        self.verify_breakpoint_hit(breakpoint_ids)
238
239    def continue_to_exception_breakpoint(self, filter_label):
240        self.dap_server.request_continue()
241        self.assertTrue(
242            self.verify_stop_exception_info(filter_label),
243            'verify we got "%s"' % (filter_label),
244        )
245
246    def continue_to_exit(self, exitCode=0):
247        self.dap_server.request_continue()
248        stopped_events = self.dap_server.wait_for_stopped()
249        self.assertEquals(
250            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
251        )
252        self.assertEquals(
253            stopped_events[0]["event"], "exited", "make sure program ran to completion"
254        )
255        self.assertEquals(
256            stopped_events[0]["body"]["exitCode"],
257            exitCode,
258            "exitCode == %i" % (exitCode),
259        )
260
261    def disassemble(self, threadId=None, frameIndex=None):
262        stackFrames = self.get_stackFrames(
263            threadId=threadId, startFrame=frameIndex, levels=1
264        )
265        self.assertIsNotNone(stackFrames)
266        memoryReference = stackFrames[0]["instructionPointerReference"]
267        self.assertIsNotNone(memoryReference)
268
269        if memoryReference not in self.dap_server.disassembled_instructions:
270            self.dap_server.request_disassemble(memoryReference=memoryReference)
271
272        return self.dap_server.disassembled_instructions[memoryReference]
273
274    def attach(
275        self,
276        program=None,
277        pid=None,
278        waitFor=None,
279        trace=None,
280        initCommands=None,
281        preRunCommands=None,
282        stopCommands=None,
283        exitCommands=None,
284        attachCommands=None,
285        coreFile=None,
286        disconnectAutomatically=True,
287        terminateCommands=None,
288        postRunCommands=None,
289        sourceMap=None,
290        sourceInitFile=False,
291    ):
292        """Build the default Makefile target, create the DAP debug adaptor,
293        and attach to the process.
294        """
295
296        # Make sure we disconnect and terminate the DAP debug adaptor even
297        # if we throw an exception during the test case.
298        def cleanup():
299            if disconnectAutomatically:
300                self.dap_server.request_disconnect(terminateDebuggee=True)
301            self.dap_server.terminate()
302
303        # Execute the cleanup function during test case tear down.
304        self.addTearDownHook(cleanup)
305        # Initialize and launch the program
306        self.dap_server.request_initialize(sourceInitFile)
307        response = self.dap_server.request_attach(
308            program=program,
309            pid=pid,
310            waitFor=waitFor,
311            trace=trace,
312            initCommands=initCommands,
313            preRunCommands=preRunCommands,
314            stopCommands=stopCommands,
315            exitCommands=exitCommands,
316            attachCommands=attachCommands,
317            terminateCommands=terminateCommands,
318            coreFile=coreFile,
319            postRunCommands=postRunCommands,
320            sourceMap=sourceMap,
321        )
322        if not (response and response["success"]):
323            self.assertTrue(
324                response["success"], "attach failed (%s)" % (response["message"])
325            )
326
327    def launch(
328        self,
329        program=None,
330        args=None,
331        cwd=None,
332        env=None,
333        stopOnEntry=False,
334        disableASLR=True,
335        disableSTDIO=False,
336        shellExpandArguments=False,
337        trace=False,
338        initCommands=None,
339        preRunCommands=None,
340        stopCommands=None,
341        exitCommands=None,
342        terminateCommands=None,
343        sourcePath=None,
344        debuggerRoot=None,
345        sourceInitFile=False,
346        launchCommands=None,
347        sourceMap=None,
348        disconnectAutomatically=True,
349        runInTerminal=False,
350        expectFailure=False,
351        postRunCommands=None,
352        enableAutoVariableSummaries=False,
353        enableSyntheticChildDebugging=False,
354    ):
355        """Sending launch request to dap"""
356
357        # Make sure we disconnect and terminate the DAP debug adapter,
358        # if we throw an exception during the test case
359        def cleanup():
360            if disconnectAutomatically:
361                self.dap_server.request_disconnect(terminateDebuggee=True)
362            self.dap_server.terminate()
363
364        # Execute the cleanup function during test case tear down.
365        self.addTearDownHook(cleanup)
366
367        # Initialize and launch the program
368        self.dap_server.request_initialize(sourceInitFile)
369        response = self.dap_server.request_launch(
370            program,
371            args=args,
372            cwd=cwd,
373            env=env,
374            stopOnEntry=stopOnEntry,
375            disableASLR=disableASLR,
376            disableSTDIO=disableSTDIO,
377            shellExpandArguments=shellExpandArguments,
378            trace=trace,
379            initCommands=initCommands,
380            preRunCommands=preRunCommands,
381            stopCommands=stopCommands,
382            exitCommands=exitCommands,
383            terminateCommands=terminateCommands,
384            sourcePath=sourcePath,
385            debuggerRoot=debuggerRoot,
386            launchCommands=launchCommands,
387            sourceMap=sourceMap,
388            runInTerminal=runInTerminal,
389            postRunCommands=postRunCommands,
390            enableAutoVariableSummaries=enableAutoVariableSummaries,
391            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
392        )
393
394        if expectFailure:
395            return response
396
397        if not (response and response["success"]):
398            self.assertTrue(
399                response["success"], "launch failed (%s)" % (response["message"])
400            )
401        return response
402
403    def build_and_launch(
404        self,
405        program,
406        args=None,
407        cwd=None,
408        env=None,
409        stopOnEntry=False,
410        disableASLR=True,
411        disableSTDIO=False,
412        shellExpandArguments=False,
413        trace=False,
414        initCommands=None,
415        preRunCommands=None,
416        stopCommands=None,
417        exitCommands=None,
418        terminateCommands=None,
419        sourcePath=None,
420        debuggerRoot=None,
421        sourceInitFile=False,
422        runInTerminal=False,
423        disconnectAutomatically=True,
424        postRunCommands=None,
425        lldbDAPEnv=None,
426        enableAutoVariableSummaries=False,
427        enableSyntheticChildDebugging=False,
428    ):
429        """Build the default Makefile target, create the DAP debug adaptor,
430        and launch the process.
431        """
432        self.build_and_create_debug_adaptor(lldbDAPEnv)
433        self.assertTrue(os.path.exists(program), "executable must exist")
434
435        return self.launch(
436            program,
437            args,
438            cwd,
439            env,
440            stopOnEntry,
441            disableASLR,
442            disableSTDIO,
443            shellExpandArguments,
444            trace,
445            initCommands,
446            preRunCommands,
447            stopCommands,
448            exitCommands,
449            terminateCommands,
450            sourcePath,
451            debuggerRoot,
452            sourceInitFile,
453            runInTerminal=runInTerminal,
454            disconnectAutomatically=disconnectAutomatically,
455            postRunCommands=postRunCommands,
456            enableAutoVariableSummaries=enableAutoVariableSummaries,
457            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
458        )
459