xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision 11a4d43f4a8ed02694de4d39b5591ff2b1c31803)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6
7
8class DAPTestCaseBase(TestBase):
9    # set timeout based on whether ASAN was enabled or not. Increase
10    # timeout by a factor of 10 if ASAN is enabled.
11    timeoutval = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
12    NO_DEBUG_INFO_TESTCASE = True
13
14    def create_debug_adaptor(self, lldbDAPEnv=None):
15        """Create the Visual Studio Code debug adaptor"""
16        self.assertTrue(
17            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
18        )
19        log_file_path = self.getBuildArtifact("dap.txt")
20        self.dap_server = dap_server.DebugAdaptorServer(
21            executable=self.lldbDAPExec,
22            init_commands=self.setUpCommands(),
23            log_file=log_file_path,
24            env=lldbDAPEnv,
25        )
26
27    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
28        self.build()
29        self.create_debug_adaptor(lldbDAPEnv)
30
31    def set_source_breakpoints(self, source_path, lines, data=None):
32        """Sets source breakpoints and returns an array of strings containing
33        the breakpoint IDs ("1", "2") for each breakpoint that was set.
34        Parameter data is array of data objects for breakpoints.
35        Each object in data is 1:1 mapping with the entry in lines.
36        It contains optional location/hitCondition/logMessage parameters.
37        """
38        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
39        if response is None:
40            return []
41        breakpoints = response["body"]["breakpoints"]
42        breakpoint_ids = []
43        for breakpoint in breakpoints:
44            breakpoint_ids.append("%i" % (breakpoint["id"]))
45        return breakpoint_ids
46
47    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
48        """Sets breakpoints by function name given an array of function names
49        and returns an array of strings containing the breakpoint IDs
50        ("1", "2") for each breakpoint that was set.
51        """
52        response = self.dap_server.request_setFunctionBreakpoints(
53            functions, condition=condition, hitCondition=hitCondition
54        )
55        if response is None:
56            return []
57        breakpoints = response["body"]["breakpoints"]
58        breakpoint_ids = []
59        for breakpoint in breakpoints:
60            breakpoint_ids.append("%i" % (breakpoint["id"]))
61        return breakpoint_ids
62
63    def waitUntil(self, condition_callback):
64        for _ in range(20):
65            if condition_callback():
66                return True
67            time.sleep(0.5)
68        return False
69
70    def verify_breakpoint_hit(self, breakpoint_ids):
71        """Wait for the process we are debugging to stop, and verify we hit
72        any breakpoint location in the "breakpoint_ids" array.
73        "breakpoint_ids" should be a list of breakpoint ID strings
74        (["1", "2"]). The return value from self.set_source_breakpoints()
75        or self.set_function_breakpoints() can be passed to this function"""
76        stopped_events = self.dap_server.wait_for_stopped()
77        for stopped_event in stopped_events:
78            if "body" in stopped_event:
79                body = stopped_event["body"]
80                if "reason" not in body:
81                    continue
82                if body["reason"] != "breakpoint":
83                    continue
84                if "description" not in body:
85                    continue
86                # Descriptions for breakpoints will be in the form
87                # "breakpoint 1.1", so look for any description that matches
88                # ("breakpoint 1.") in the description field as verification
89                # that one of the breakpoint locations was hit. DAP doesn't
90                # allow breakpoints to have multiple locations, but LLDB does.
91                # So when looking at the description we just want to make sure
92                # the right breakpoint matches and not worry about the actual
93                # location.
94                description = body["description"]
95                for breakpoint_id in breakpoint_ids:
96                    match_desc = "breakpoint %s." % (breakpoint_id)
97                    if match_desc in description:
98                        return
99        self.assertTrue(False, "breakpoint not hit")
100
101    def verify_stop_exception_info(self, expected_description):
102        """Wait for the process we are debugging to stop, and verify the stop
103        reason is 'exception' and that the description matches
104        'expected_description'
105        """
106        stopped_events = self.dap_server.wait_for_stopped()
107        for stopped_event in stopped_events:
108            if "body" in stopped_event:
109                body = stopped_event["body"]
110                if "reason" not in body:
111                    continue
112                if body["reason"] != "exception":
113                    continue
114                if "description" not in body:
115                    continue
116                description = body["description"]
117                if expected_description == description:
118                    return True
119        return False
120
121    def verify_commands(self, flavor, output, commands):
122        self.assertTrue(output and len(output) > 0, "expect console output")
123        lines = output.splitlines()
124        prefix = "(lldb) "
125        for cmd in commands:
126            found = False
127            for line in lines:
128                if len(cmd) > 0 and (cmd[0] == "!" or cmd[0] == "?"):
129                    cmd = cmd[1:]
130                if line.startswith(prefix) and cmd in line:
131                    found = True
132                    break
133            self.assertTrue(
134                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
135            )
136
137    def get_dict_value(self, d, key_path):
138        """Verify each key in the key_path array is in contained in each
139        dictionary within "d". Assert if any key isn't in the
140        corresponding dictionary. This is handy for grabbing values from VS
141        Code response dictionary like getting
142        response['body']['stackFrames']
143        """
144        value = d
145        for key in key_path:
146            if key in value:
147                value = value[key]
148            else:
149                self.assertTrue(
150                    key in value,
151                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
152                )
153        return value
154
155    def get_stackFrames_and_totalFramesCount(
156        self, threadId=None, startFrame=None, levels=None, dump=False
157    ):
158        response = self.dap_server.request_stackTrace(
159            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
160        )
161        if response:
162            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
163            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
164            self.assertTrue(
165                totalFrames > 0,
166                "verify totalFrames count is provided by extension that supports "
167                "async frames loading",
168            )
169            return (stackFrames, totalFrames)
170        return (None, 0)
171
172    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
173        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
174            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
175        )
176        return stackFrames
177
178    def get_source_and_line(self, threadId=None, frameIndex=0):
179        stackFrames = self.get_stackFrames(
180            threadId=threadId, startFrame=frameIndex, levels=1
181        )
182        if stackFrames is not None:
183            stackFrame = stackFrames[0]
184            ["source", "path"]
185            if "source" in stackFrame:
186                source = stackFrame["source"]
187                if "path" in source:
188                    if "line" in stackFrame:
189                        return (source["path"], stackFrame["line"])
190        return ("", 0)
191
192    def get_stdout(self, timeout=0.0):
193        return self.dap_server.get_output("stdout", timeout=timeout)
194
195    def get_console(self, timeout=0.0):
196        return self.dap_server.get_output("console", timeout=timeout)
197
198    def collect_console(self, timeout_secs, pattern=None):
199        return self.dap_server.collect_output(
200            "console", timeout_secs=timeout_secs, pattern=pattern
201        )
202
203    def get_local_as_int(self, name, threadId=None):
204        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
205        # 'value' may have the variable value and summary.
206        # Extract the variable value since summary can have nonnumeric characters.
207        value = value.split(" ")[0]
208        if value.startswith("0x"):
209            return int(value, 16)
210        elif value.startswith("0"):
211            return int(value, 8)
212        else:
213            return int(value)
214
215    def set_local(self, name, value, id=None):
216        """Set a top level local variable only."""
217        return self.dap_server.request_setVariable(1, name, str(value), id=id)
218
219    def set_global(self, name, value, id=None):
220        """Set a top level global variable only."""
221        return self.dap_server.request_setVariable(2, name, str(value), id=id)
222
223    def stepIn(self, threadId=None, targetId=None, waitForStop=True):
224        self.dap_server.request_stepIn(threadId=threadId, targetId=targetId)
225        if waitForStop:
226            return self.dap_server.wait_for_stopped()
227        return None
228
229    def stepOver(self, threadId=None, waitForStop=True):
230        self.dap_server.request_next(threadId=threadId)
231        if waitForStop:
232            return self.dap_server.wait_for_stopped()
233        return None
234
235    def stepOut(self, threadId=None, waitForStop=True):
236        self.dap_server.request_stepOut(threadId=threadId)
237        if waitForStop:
238            return self.dap_server.wait_for_stopped()
239        return None
240
241    def continue_to_next_stop(self):
242        self.dap_server.request_continue()
243        return self.dap_server.wait_for_stopped()
244
245    def continue_to_breakpoints(self, breakpoint_ids):
246        self.dap_server.request_continue()
247        self.verify_breakpoint_hit(breakpoint_ids)
248
249    def continue_to_exception_breakpoint(self, filter_label):
250        self.dap_server.request_continue()
251        self.assertTrue(
252            self.verify_stop_exception_info(filter_label),
253            'verify we got "%s"' % (filter_label),
254        )
255
256    def continue_to_exit(self, exitCode=0):
257        self.dap_server.request_continue()
258        stopped_events = self.dap_server.wait_for_stopped()
259        self.assertEqual(
260            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
261        )
262        self.assertEqual(
263            stopped_events[0]["event"], "exited", "make sure program ran to completion"
264        )
265        self.assertEqual(
266            stopped_events[0]["body"]["exitCode"],
267            exitCode,
268            "exitCode == %i" % (exitCode),
269        )
270
271    def disassemble(self, threadId=None, frameIndex=None):
272        stackFrames = self.get_stackFrames(
273            threadId=threadId, startFrame=frameIndex, levels=1
274        )
275        self.assertIsNotNone(stackFrames)
276        memoryReference = stackFrames[0]["instructionPointerReference"]
277        self.assertIsNotNone(memoryReference)
278
279        if memoryReference not in self.dap_server.disassembled_instructions:
280            self.dap_server.request_disassemble(memoryReference=memoryReference)
281
282        return self.dap_server.disassembled_instructions[memoryReference]
283
284    def attach(
285        self,
286        program=None,
287        pid=None,
288        waitFor=None,
289        trace=None,
290        initCommands=None,
291        preRunCommands=None,
292        stopCommands=None,
293        exitCommands=None,
294        attachCommands=None,
295        coreFile=None,
296        disconnectAutomatically=True,
297        terminateCommands=None,
298        postRunCommands=None,
299        sourceMap=None,
300        sourceInitFile=False,
301        expectFailure=False,
302    ):
303        """Build the default Makefile target, create the DAP debug adaptor,
304        and attach to the process.
305        """
306
307        # Make sure we disconnect and terminate the DAP debug adaptor even
308        # if we throw an exception during the test case.
309        def cleanup():
310            if disconnectAutomatically:
311                self.dap_server.request_disconnect(terminateDebuggee=True)
312            self.dap_server.terminate()
313
314        # Execute the cleanup function during test case tear down.
315        self.addTearDownHook(cleanup)
316        # Initialize and launch the program
317        self.dap_server.request_initialize(sourceInitFile)
318        response = self.dap_server.request_attach(
319            program=program,
320            pid=pid,
321            waitFor=waitFor,
322            trace=trace,
323            initCommands=initCommands,
324            preRunCommands=preRunCommands,
325            stopCommands=stopCommands,
326            exitCommands=exitCommands,
327            attachCommands=attachCommands,
328            terminateCommands=terminateCommands,
329            coreFile=coreFile,
330            postRunCommands=postRunCommands,
331            sourceMap=sourceMap,
332        )
333        if expectFailure:
334            return response
335        if not (response and response["success"]):
336            self.assertTrue(
337                response["success"], "attach failed (%s)" % (response["message"])
338            )
339
340    def launch(
341        self,
342        program=None,
343        args=None,
344        cwd=None,
345        env=None,
346        stopOnEntry=False,
347        disableASLR=True,
348        disableSTDIO=False,
349        shellExpandArguments=False,
350        trace=False,
351        initCommands=None,
352        preRunCommands=None,
353        stopCommands=None,
354        exitCommands=None,
355        terminateCommands=None,
356        sourcePath=None,
357        debuggerRoot=None,
358        sourceInitFile=False,
359        launchCommands=None,
360        sourceMap=None,
361        disconnectAutomatically=True,
362        runInTerminal=False,
363        expectFailure=False,
364        postRunCommands=None,
365        enableAutoVariableSummaries=False,
366        enableSyntheticChildDebugging=False,
367        commandEscapePrefix=None,
368        customFrameFormat=None,
369        customThreadFormat=None,
370    ):
371        """Sending launch request to dap"""
372
373        # Make sure we disconnect and terminate the DAP debug adapter,
374        # if we throw an exception during the test case
375        def cleanup():
376            if disconnectAutomatically:
377                self.dap_server.request_disconnect(terminateDebuggee=True)
378            self.dap_server.terminate()
379
380        # Execute the cleanup function during test case tear down.
381        self.addTearDownHook(cleanup)
382
383        # Initialize and launch the program
384        self.dap_server.request_initialize(sourceInitFile)
385        response = self.dap_server.request_launch(
386            program,
387            args=args,
388            cwd=cwd,
389            env=env,
390            stopOnEntry=stopOnEntry,
391            disableASLR=disableASLR,
392            disableSTDIO=disableSTDIO,
393            shellExpandArguments=shellExpandArguments,
394            trace=trace,
395            initCommands=initCommands,
396            preRunCommands=preRunCommands,
397            stopCommands=stopCommands,
398            exitCommands=exitCommands,
399            terminateCommands=terminateCommands,
400            sourcePath=sourcePath,
401            debuggerRoot=debuggerRoot,
402            launchCommands=launchCommands,
403            sourceMap=sourceMap,
404            runInTerminal=runInTerminal,
405            postRunCommands=postRunCommands,
406            enableAutoVariableSummaries=enableAutoVariableSummaries,
407            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
408            commandEscapePrefix=commandEscapePrefix,
409            customFrameFormat=customFrameFormat,
410            customThreadFormat=customThreadFormat,
411        )
412
413        if expectFailure:
414            return response
415
416        if not (response and response["success"]):
417            self.assertTrue(
418                response["success"], "launch failed (%s)" % (response["message"])
419            )
420        return response
421
422    def build_and_launch(
423        self,
424        program,
425        args=None,
426        cwd=None,
427        env=None,
428        stopOnEntry=False,
429        disableASLR=True,
430        disableSTDIO=False,
431        shellExpandArguments=False,
432        trace=False,
433        initCommands=None,
434        preRunCommands=None,
435        stopCommands=None,
436        exitCommands=None,
437        terminateCommands=None,
438        sourcePath=None,
439        debuggerRoot=None,
440        sourceInitFile=False,
441        runInTerminal=False,
442        disconnectAutomatically=True,
443        postRunCommands=None,
444        lldbDAPEnv=None,
445        enableAutoVariableSummaries=False,
446        enableSyntheticChildDebugging=False,
447        commandEscapePrefix=None,
448        customFrameFormat=None,
449        customThreadFormat=None,
450        launchCommands=None,
451        expectFailure=False,
452    ):
453        """Build the default Makefile target, create the DAP debug adaptor,
454        and launch the process.
455        """
456        self.build_and_create_debug_adaptor(lldbDAPEnv)
457        self.assertTrue(os.path.exists(program), "executable must exist")
458
459        return self.launch(
460            program,
461            args,
462            cwd,
463            env,
464            stopOnEntry,
465            disableASLR,
466            disableSTDIO,
467            shellExpandArguments,
468            trace,
469            initCommands,
470            preRunCommands,
471            stopCommands,
472            exitCommands,
473            terminateCommands,
474            sourcePath,
475            debuggerRoot,
476            sourceInitFile,
477            runInTerminal=runInTerminal,
478            disconnectAutomatically=disconnectAutomatically,
479            postRunCommands=postRunCommands,
480            enableAutoVariableSummaries=enableAutoVariableSummaries,
481            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
482            commandEscapePrefix=commandEscapePrefix,
483            customFrameFormat=customFrameFormat,
484            customThreadFormat=customThreadFormat,
485            launchCommands=launchCommands,
486            expectFailure=expectFailure,
487        )
488