xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision ceeb08b9e0a51a4d2e0804baeb579fe8a6485885)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6from lldbsuite.test import lldbplatformutil
7import lldbgdbserverutils
8
9
10class DAPTestCaseBase(TestBase):
11    # set timeout based on whether ASAN was enabled or not. Increase
12    # timeout by a factor of 10 if ASAN is enabled.
13    timeoutval = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
14    NO_DEBUG_INFO_TESTCASE = True
15
16    def create_debug_adaptor(self, lldbDAPEnv=None):
17        """Create the Visual Studio Code debug adaptor"""
18        self.assertTrue(
19            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
20        )
21        log_file_path = self.getBuildArtifact("dap.txt")
22        self.dap_server = dap_server.DebugAdaptorServer(
23            executable=self.lldbDAPExec,
24            init_commands=self.setUpCommands(),
25            log_file=log_file_path,
26            env=lldbDAPEnv,
27        )
28
29    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
30        self.build()
31        self.create_debug_adaptor(lldbDAPEnv)
32
33    def set_source_breakpoints(self, source_path, lines, data=None):
34        """Sets source breakpoints and returns an array of strings containing
35        the breakpoint IDs ("1", "2") for each breakpoint that was set.
36        Parameter data is array of data objects for breakpoints.
37        Each object in data is 1:1 mapping with the entry in lines.
38        It contains optional location/hitCondition/logMessage parameters.
39        """
40        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
41        if response is None:
42            return []
43        breakpoints = response["body"]["breakpoints"]
44        breakpoint_ids = []
45        for breakpoint in breakpoints:
46            breakpoint_ids.append("%i" % (breakpoint["id"]))
47        return breakpoint_ids
48
49    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
50        """Sets breakpoints by function name given an array of function names
51        and returns an array of strings containing the breakpoint IDs
52        ("1", "2") for each breakpoint that was set.
53        """
54        response = self.dap_server.request_setFunctionBreakpoints(
55            functions, condition=condition, hitCondition=hitCondition
56        )
57        if response is None:
58            return []
59        breakpoints = response["body"]["breakpoints"]
60        breakpoint_ids = []
61        for breakpoint in breakpoints:
62            breakpoint_ids.append("%i" % (breakpoint["id"]))
63        return breakpoint_ids
64
65    def waitUntil(self, condition_callback):
66        for _ in range(20):
67            if condition_callback():
68                return True
69            time.sleep(0.5)
70        return False
71
72    def verify_breakpoint_hit(self, breakpoint_ids):
73        """Wait for the process we are debugging to stop, and verify we hit
74        any breakpoint location in the "breakpoint_ids" array.
75        "breakpoint_ids" should be a list of breakpoint ID strings
76        (["1", "2"]). The return value from self.set_source_breakpoints()
77        or self.set_function_breakpoints() can be passed to this function"""
78        stopped_events = self.dap_server.wait_for_stopped()
79        for stopped_event in stopped_events:
80            if "body" in stopped_event:
81                body = stopped_event["body"]
82                if "reason" not in body:
83                    continue
84                if (
85                    body["reason"] != "breakpoint"
86                    and body["reason"] != "instruction breakpoint"
87                ):
88                    continue
89                if "description" not in body:
90                    continue
91                # Descriptions for breakpoints will be in the form
92                # "breakpoint 1.1", so look for any description that matches
93                # ("breakpoint 1.") in the description field as verification
94                # that one of the breakpoint locations was hit. DAP doesn't
95                # allow breakpoints to have multiple locations, but LLDB does.
96                # So when looking at the description we just want to make sure
97                # the right breakpoint matches and not worry about the actual
98                # location.
99                description = body["description"]
100                for breakpoint_id in breakpoint_ids:
101                    match_desc = "breakpoint %s." % (breakpoint_id)
102                    if match_desc in description:
103                        return
104        self.assertTrue(False, "breakpoint not hit")
105
106    def verify_stop_exception_info(self, expected_description, timeout=timeoutval):
107        """Wait for the process we are debugging to stop, and verify the stop
108        reason is 'exception' and that the description matches
109        'expected_description'
110        """
111        stopped_events = self.dap_server.wait_for_stopped(timeout=timeout)
112        for stopped_event in stopped_events:
113            print("stopped_event", stopped_event)
114            if "body" in stopped_event:
115                body = stopped_event["body"]
116                if "reason" not in body:
117                    continue
118                if body["reason"] != "exception":
119                    continue
120                if "description" not in body:
121                    continue
122                description = body["description"]
123                if expected_description == description:
124                    return True
125        return False
126
127    def verify_commands(self, flavor, output, commands):
128        self.assertTrue(output and len(output) > 0, "expect console output")
129        lines = output.splitlines()
130        prefix = "(lldb) "
131        for cmd in commands:
132            found = False
133            for line in lines:
134                if len(cmd) > 0 and (cmd[0] == "!" or cmd[0] == "?"):
135                    cmd = cmd[1:]
136                if line.startswith(prefix) and cmd in line:
137                    found = True
138                    break
139            self.assertTrue(
140                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
141            )
142
143    def get_dict_value(self, d, key_path):
144        """Verify each key in the key_path array is in contained in each
145        dictionary within "d". Assert if any key isn't in the
146        corresponding dictionary. This is handy for grabbing values from VS
147        Code response dictionary like getting
148        response['body']['stackFrames']
149        """
150        value = d
151        for key in key_path:
152            if key in value:
153                value = value[key]
154            else:
155                self.assertTrue(
156                    key in value,
157                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
158                )
159        return value
160
161    def get_stackFrames_and_totalFramesCount(
162        self, threadId=None, startFrame=None, levels=None, dump=False
163    ):
164        response = self.dap_server.request_stackTrace(
165            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
166        )
167        if response:
168            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
169            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
170            self.assertTrue(
171                totalFrames > 0,
172                "verify totalFrames count is provided by extension that supports "
173                "async frames loading",
174            )
175            return (stackFrames, totalFrames)
176        return (None, 0)
177
178    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
179        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
180            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
181        )
182        return stackFrames
183
184    def get_exceptionInfo(self, threadId=None):
185        response = self.dap_server.request_exceptionInfo(threadId=threadId)
186        return self.get_dict_value(response, ["body"])
187
188    def get_source_and_line(self, threadId=None, frameIndex=0):
189        stackFrames = self.get_stackFrames(
190            threadId=threadId, startFrame=frameIndex, levels=1
191        )
192        if stackFrames is not None:
193            stackFrame = stackFrames[0]
194            ["source", "path"]
195            if "source" in stackFrame:
196                source = stackFrame["source"]
197                if "path" in source:
198                    if "line" in stackFrame:
199                        return (source["path"], stackFrame["line"])
200        return ("", 0)
201
202    def get_stdout(self, timeout=0.0):
203        return self.dap_server.get_output("stdout", timeout=timeout)
204
205    def get_console(self, timeout=0.0):
206        return self.dap_server.get_output("console", timeout=timeout)
207
208    def collect_console(self, timeout_secs, pattern=None):
209        return self.dap_server.collect_output(
210            "console", timeout_secs=timeout_secs, pattern=pattern
211        )
212
213    def collect_stdout(self, timeout_secs, pattern=None):
214        return self.dap_server.collect_output(
215            "stdout", timeout_secs=timeout_secs, pattern=pattern
216        )
217
218    def get_local_as_int(self, name, threadId=None):
219        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
220        # 'value' may have the variable value and summary.
221        # Extract the variable value since summary can have nonnumeric characters.
222        value = value.split(" ")[0]
223        if value.startswith("0x"):
224            return int(value, 16)
225        elif value.startswith("0"):
226            return int(value, 8)
227        else:
228            return int(value)
229
230    def set_local(self, name, value, id=None):
231        """Set a top level local variable only."""
232        return self.dap_server.request_setVariable(1, name, str(value), id=id)
233
234    def set_global(self, name, value, id=None):
235        """Set a top level global variable only."""
236        return self.dap_server.request_setVariable(2, name, str(value), id=id)
237
238    def stepIn(
239        self, threadId=None, targetId=None, waitForStop=True, granularity="statement"
240    ):
241        self.dap_server.request_stepIn(
242            threadId=threadId, targetId=targetId, granularity=granularity
243        )
244        if waitForStop:
245            return self.dap_server.wait_for_stopped()
246        return None
247
248    def stepOver(self, threadId=None, waitForStop=True, granularity="statement"):
249        self.dap_server.request_next(threadId=threadId, granularity=granularity)
250        if waitForStop:
251            return self.dap_server.wait_for_stopped()
252        return None
253
254    def stepOut(self, threadId=None, waitForStop=True):
255        self.dap_server.request_stepOut(threadId=threadId)
256        if waitForStop:
257            return self.dap_server.wait_for_stopped()
258        return None
259
260    def continue_to_next_stop(self):
261        self.dap_server.request_continue()
262        return self.dap_server.wait_for_stopped()
263
264    def continue_to_breakpoints(self, breakpoint_ids):
265        self.dap_server.request_continue()
266        self.verify_breakpoint_hit(breakpoint_ids)
267
268    def continue_to_exception_breakpoint(self, filter_label):
269        self.dap_server.request_continue()
270        self.assertTrue(
271            self.verify_stop_exception_info(filter_label),
272            'verify we got "%s"' % (filter_label),
273        )
274
275    def continue_to_exit(self, exitCode=0):
276        self.dap_server.request_continue()
277        stopped_events = self.dap_server.wait_for_stopped()
278        self.assertEqual(
279            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
280        )
281        self.assertEqual(
282            stopped_events[0]["event"], "exited", "make sure program ran to completion"
283        )
284        self.assertEqual(
285            stopped_events[0]["body"]["exitCode"],
286            exitCode,
287            "exitCode == %i" % (exitCode),
288        )
289
290    def disassemble(self, threadId=None, frameIndex=None):
291        stackFrames = self.get_stackFrames(
292            threadId=threadId, startFrame=frameIndex, levels=1
293        )
294        self.assertIsNotNone(stackFrames)
295        memoryReference = stackFrames[0]["instructionPointerReference"]
296        self.assertIsNotNone(memoryReference)
297
298        if memoryReference not in self.dap_server.disassembled_instructions:
299            self.dap_server.request_disassemble(memoryReference=memoryReference)
300
301        return self.dap_server.disassembled_instructions[memoryReference]
302
303    def attach(
304        self,
305        program=None,
306        pid=None,
307        waitFor=None,
308        trace=None,
309        initCommands=None,
310        preRunCommands=None,
311        stopCommands=None,
312        exitCommands=None,
313        attachCommands=None,
314        coreFile=None,
315        disconnectAutomatically=True,
316        terminateCommands=None,
317        postRunCommands=None,
318        sourceMap=None,
319        sourceInitFile=False,
320        expectFailure=False,
321        gdbRemotePort=None,
322        gdbRemoteHostname=None,
323    ):
324        """Build the default Makefile target, create the DAP debug adaptor,
325        and attach to the process.
326        """
327
328        # Make sure we disconnect and terminate the DAP debug adaptor even
329        # if we throw an exception during the test case.
330        def cleanup():
331            if disconnectAutomatically:
332                self.dap_server.request_disconnect(terminateDebuggee=True)
333            self.dap_server.terminate()
334
335        # Execute the cleanup function during test case tear down.
336        self.addTearDownHook(cleanup)
337        # Initialize and launch the program
338        self.dap_server.request_initialize(sourceInitFile)
339        response = self.dap_server.request_attach(
340            program=program,
341            pid=pid,
342            waitFor=waitFor,
343            trace=trace,
344            initCommands=initCommands,
345            preRunCommands=preRunCommands,
346            stopCommands=stopCommands,
347            exitCommands=exitCommands,
348            attachCommands=attachCommands,
349            terminateCommands=terminateCommands,
350            coreFile=coreFile,
351            postRunCommands=postRunCommands,
352            sourceMap=sourceMap,
353            gdbRemotePort=gdbRemotePort,
354            gdbRemoteHostname=gdbRemoteHostname,
355        )
356        if expectFailure:
357            return response
358        if not (response and response["success"]):
359            self.assertTrue(
360                response["success"], "attach failed (%s)" % (response["message"])
361            )
362
363    def launch(
364        self,
365        program=None,
366        args=None,
367        cwd=None,
368        env=None,
369        stopOnEntry=False,
370        disableASLR=False,
371        disableSTDIO=False,
372        shellExpandArguments=False,
373        trace=False,
374        initCommands=None,
375        preRunCommands=None,
376        stopCommands=None,
377        exitCommands=None,
378        terminateCommands=None,
379        sourcePath=None,
380        debuggerRoot=None,
381        sourceInitFile=False,
382        launchCommands=None,
383        sourceMap=None,
384        disconnectAutomatically=True,
385        runInTerminal=False,
386        expectFailure=False,
387        postRunCommands=None,
388        enableAutoVariableSummaries=False,
389        displayExtendedBacktrace=False,
390        enableSyntheticChildDebugging=False,
391        commandEscapePrefix=None,
392        customFrameFormat=None,
393        customThreadFormat=None,
394    ):
395        """Sending launch request to dap"""
396
397        # Make sure we disconnect and terminate the DAP debug adapter,
398        # if we throw an exception during the test case
399        def cleanup():
400            if disconnectAutomatically:
401                self.dap_server.request_disconnect(terminateDebuggee=True)
402            self.dap_server.terminate()
403
404        # Execute the cleanup function during test case tear down.
405        self.addTearDownHook(cleanup)
406
407        # Initialize and launch the program
408        self.dap_server.request_initialize(sourceInitFile)
409        response = self.dap_server.request_launch(
410            program,
411            args=args,
412            cwd=cwd,
413            env=env,
414            stopOnEntry=stopOnEntry,
415            disableASLR=disableASLR,
416            disableSTDIO=disableSTDIO,
417            shellExpandArguments=shellExpandArguments,
418            trace=trace,
419            initCommands=initCommands,
420            preRunCommands=preRunCommands,
421            stopCommands=stopCommands,
422            exitCommands=exitCommands,
423            terminateCommands=terminateCommands,
424            sourcePath=sourcePath,
425            debuggerRoot=debuggerRoot,
426            launchCommands=launchCommands,
427            sourceMap=sourceMap,
428            runInTerminal=runInTerminal,
429            postRunCommands=postRunCommands,
430            enableAutoVariableSummaries=enableAutoVariableSummaries,
431            displayExtendedBacktrace=displayExtendedBacktrace,
432            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
433            commandEscapePrefix=commandEscapePrefix,
434            customFrameFormat=customFrameFormat,
435            customThreadFormat=customThreadFormat,
436        )
437
438        if expectFailure:
439            return response
440
441        if not (response and response["success"]):
442            self.assertTrue(
443                response["success"], "launch failed (%s)" % (response["message"])
444            )
445        return response
446
447    def build_and_launch(
448        self,
449        program,
450        args=None,
451        cwd=None,
452        env=None,
453        stopOnEntry=False,
454        disableASLR=False,
455        disableSTDIO=False,
456        shellExpandArguments=False,
457        trace=False,
458        initCommands=None,
459        preRunCommands=None,
460        stopCommands=None,
461        exitCommands=None,
462        terminateCommands=None,
463        sourcePath=None,
464        debuggerRoot=None,
465        sourceInitFile=False,
466        runInTerminal=False,
467        disconnectAutomatically=True,
468        postRunCommands=None,
469        lldbDAPEnv=None,
470        enableAutoVariableSummaries=False,
471        displayExtendedBacktrace=False,
472        enableSyntheticChildDebugging=False,
473        commandEscapePrefix=None,
474        customFrameFormat=None,
475        customThreadFormat=None,
476        launchCommands=None,
477        expectFailure=False,
478    ):
479        """Build the default Makefile target, create the DAP debug adaptor,
480        and launch the process.
481        """
482        self.build_and_create_debug_adaptor(lldbDAPEnv)
483        self.assertTrue(os.path.exists(program), "executable must exist")
484
485        return self.launch(
486            program,
487            args,
488            cwd,
489            env,
490            stopOnEntry,
491            disableASLR,
492            disableSTDIO,
493            shellExpandArguments,
494            trace,
495            initCommands,
496            preRunCommands,
497            stopCommands,
498            exitCommands,
499            terminateCommands,
500            sourcePath,
501            debuggerRoot,
502            sourceInitFile,
503            runInTerminal=runInTerminal,
504            disconnectAutomatically=disconnectAutomatically,
505            postRunCommands=postRunCommands,
506            enableAutoVariableSummaries=enableAutoVariableSummaries,
507            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
508            displayExtendedBacktrace=displayExtendedBacktrace,
509            commandEscapePrefix=commandEscapePrefix,
510            customFrameFormat=customFrameFormat,
511            customThreadFormat=customThreadFormat,
512            launchCommands=launchCommands,
513            expectFailure=expectFailure,
514        )
515
516    def getBuiltinDebugServerTool(self):
517        # Tries to find simulation/lldb-server/gdbserver tool path.
518        server_tool = None
519        if lldbplatformutil.getPlatform() == "linux":
520            server_tool = lldbgdbserverutils.get_lldb_server_exe()
521            if server_tool is None:
522                self.dap_server.request_disconnect(terminateDebuggee=True)
523                self.assertIsNotNone(server_tool, "lldb-server not found.")
524        elif lldbplatformutil.getPlatform() == "macosx":
525            server_tool = lldbgdbserverutils.get_debugserver_exe()
526            if server_tool is None:
527                self.dap_server.request_disconnect(terminateDebuggee=True)
528                self.assertIsNotNone(server_tool, "debugserver not found.")
529        return server_tool
530