xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision 6257a98b258a3f17b78af31bf43009a559c5dd1d)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6from lldbsuite.test import lldbplatformutil
7import lldbgdbserverutils
8
9
10class DAPTestCaseBase(TestBase):
11    # set timeout based on whether ASAN was enabled or not. Increase
12    # timeout by a factor of 10 if ASAN is enabled.
13    timeoutval = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
14    NO_DEBUG_INFO_TESTCASE = True
15
16    def create_debug_adaptor(self, lldbDAPEnv=None):
17        """Create the Visual Studio Code debug adaptor"""
18        self.assertTrue(
19            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
20        )
21        log_file_path = self.getBuildArtifact("dap.txt")
22        self.dap_server = dap_server.DebugAdaptorServer(
23            executable=self.lldbDAPExec,
24            init_commands=self.setUpCommands(),
25            log_file=log_file_path,
26            env=lldbDAPEnv,
27        )
28
29    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
30        self.build()
31        self.create_debug_adaptor(lldbDAPEnv)
32
33    def set_source_breakpoints(self, source_path, lines, data=None):
34        """Sets source breakpoints and returns an array of strings containing
35        the breakpoint IDs ("1", "2") for each breakpoint that was set.
36        Parameter data is array of data objects for breakpoints.
37        Each object in data is 1:1 mapping with the entry in lines.
38        It contains optional location/hitCondition/logMessage parameters.
39        """
40        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
41        if response is None:
42            return []
43        breakpoints = response["body"]["breakpoints"]
44        breakpoint_ids = []
45        for breakpoint in breakpoints:
46            breakpoint_ids.append("%i" % (breakpoint["id"]))
47        return breakpoint_ids
48
49    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
50        """Sets breakpoints by function name given an array of function names
51        and returns an array of strings containing the breakpoint IDs
52        ("1", "2") for each breakpoint that was set.
53        """
54        response = self.dap_server.request_setFunctionBreakpoints(
55            functions, condition=condition, hitCondition=hitCondition
56        )
57        if response is None:
58            return []
59        breakpoints = response["body"]["breakpoints"]
60        breakpoint_ids = []
61        for breakpoint in breakpoints:
62            breakpoint_ids.append("%i" % (breakpoint["id"]))
63        return breakpoint_ids
64
65    def waitUntil(self, condition_callback):
66        for _ in range(20):
67            if condition_callback():
68                return True
69            time.sleep(0.5)
70        return False
71
72    def verify_breakpoint_hit(self, breakpoint_ids):
73        """Wait for the process we are debugging to stop, and verify we hit
74        any breakpoint location in the "breakpoint_ids" array.
75        "breakpoint_ids" should be a list of breakpoint ID strings
76        (["1", "2"]). The return value from self.set_source_breakpoints()
77        or self.set_function_breakpoints() can be passed to this function"""
78        stopped_events = self.dap_server.wait_for_stopped()
79        for stopped_event in stopped_events:
80            if "body" in stopped_event:
81                body = stopped_event["body"]
82                if "reason" not in body:
83                    continue
84                if body["reason"] != "breakpoint":
85                    continue
86                if "description" not in body:
87                    continue
88                # Descriptions for breakpoints will be in the form
89                # "breakpoint 1.1", so look for any description that matches
90                # ("breakpoint 1.") in the description field as verification
91                # that one of the breakpoint locations was hit. DAP doesn't
92                # allow breakpoints to have multiple locations, but LLDB does.
93                # So when looking at the description we just want to make sure
94                # the right breakpoint matches and not worry about the actual
95                # location.
96                description = body["description"]
97                for breakpoint_id in breakpoint_ids:
98                    match_desc = "breakpoint %s." % (breakpoint_id)
99                    if match_desc in description:
100                        return
101        self.assertTrue(False, "breakpoint not hit")
102
103    def verify_stop_exception_info(self, expected_description):
104        """Wait for the process we are debugging to stop, and verify the stop
105        reason is 'exception' and that the description matches
106        'expected_description'
107        """
108        stopped_events = self.dap_server.wait_for_stopped()
109        for stopped_event in stopped_events:
110            if "body" in stopped_event:
111                body = stopped_event["body"]
112                if "reason" not in body:
113                    continue
114                if body["reason"] != "exception":
115                    continue
116                if "description" not in body:
117                    continue
118                description = body["description"]
119                if expected_description == description:
120                    return True
121        return False
122
123    def verify_commands(self, flavor, output, commands):
124        self.assertTrue(output and len(output) > 0, "expect console output")
125        lines = output.splitlines()
126        prefix = "(lldb) "
127        for cmd in commands:
128            found = False
129            for line in lines:
130                if len(cmd) > 0 and (cmd[0] == "!" or cmd[0] == "?"):
131                    cmd = cmd[1:]
132                if line.startswith(prefix) and cmd in line:
133                    found = True
134                    break
135            self.assertTrue(
136                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
137            )
138
139    def get_dict_value(self, d, key_path):
140        """Verify each key in the key_path array is in contained in each
141        dictionary within "d". Assert if any key isn't in the
142        corresponding dictionary. This is handy for grabbing values from VS
143        Code response dictionary like getting
144        response['body']['stackFrames']
145        """
146        value = d
147        for key in key_path:
148            if key in value:
149                value = value[key]
150            else:
151                self.assertTrue(
152                    key in value,
153                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
154                )
155        return value
156
157    def get_stackFrames_and_totalFramesCount(
158        self, threadId=None, startFrame=None, levels=None, dump=False
159    ):
160        response = self.dap_server.request_stackTrace(
161            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
162        )
163        if response:
164            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
165            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
166            self.assertTrue(
167                totalFrames > 0,
168                "verify totalFrames count is provided by extension that supports "
169                "async frames loading",
170            )
171            return (stackFrames, totalFrames)
172        return (None, 0)
173
174    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
175        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
176            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
177        )
178        return stackFrames
179
180    def get_source_and_line(self, threadId=None, frameIndex=0):
181        stackFrames = self.get_stackFrames(
182            threadId=threadId, startFrame=frameIndex, levels=1
183        )
184        if stackFrames is not None:
185            stackFrame = stackFrames[0]
186            ["source", "path"]
187            if "source" in stackFrame:
188                source = stackFrame["source"]
189                if "path" in source:
190                    if "line" in stackFrame:
191                        return (source["path"], stackFrame["line"])
192        return ("", 0)
193
194    def get_stdout(self, timeout=0.0):
195        return self.dap_server.get_output("stdout", timeout=timeout)
196
197    def get_console(self, timeout=0.0):
198        return self.dap_server.get_output("console", timeout=timeout)
199
200    def collect_console(self, timeout_secs, pattern=None):
201        return self.dap_server.collect_output(
202            "console", timeout_secs=timeout_secs, pattern=pattern
203        )
204
205    def get_local_as_int(self, name, threadId=None):
206        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
207        # 'value' may have the variable value and summary.
208        # Extract the variable value since summary can have nonnumeric characters.
209        value = value.split(" ")[0]
210        if value.startswith("0x"):
211            return int(value, 16)
212        elif value.startswith("0"):
213            return int(value, 8)
214        else:
215            return int(value)
216
217    def set_local(self, name, value, id=None):
218        """Set a top level local variable only."""
219        return self.dap_server.request_setVariable(1, name, str(value), id=id)
220
221    def set_global(self, name, value, id=None):
222        """Set a top level global variable only."""
223        return self.dap_server.request_setVariable(2, name, str(value), id=id)
224
225    def stepIn(
226        self, threadId=None, targetId=None, waitForStop=True, granularity="statement"
227    ):
228        self.dap_server.request_stepIn(
229            threadId=threadId, targetId=targetId, granularity=granularity
230        )
231        if waitForStop:
232            return self.dap_server.wait_for_stopped()
233        return None
234
235    def stepOver(self, threadId=None, waitForStop=True, granularity="statement"):
236        self.dap_server.request_next(threadId=threadId, granularity=granularity)
237        if waitForStop:
238            return self.dap_server.wait_for_stopped()
239        return None
240
241    def stepOut(self, threadId=None, waitForStop=True):
242        self.dap_server.request_stepOut(threadId=threadId)
243        if waitForStop:
244            return self.dap_server.wait_for_stopped()
245        return None
246
247    def continue_to_next_stop(self):
248        self.dap_server.request_continue()
249        return self.dap_server.wait_for_stopped()
250
251    def continue_to_breakpoints(self, breakpoint_ids):
252        self.dap_server.request_continue()
253        self.verify_breakpoint_hit(breakpoint_ids)
254
255    def continue_to_exception_breakpoint(self, filter_label):
256        self.dap_server.request_continue()
257        self.assertTrue(
258            self.verify_stop_exception_info(filter_label),
259            'verify we got "%s"' % (filter_label),
260        )
261
262    def continue_to_exit(self, exitCode=0):
263        self.dap_server.request_continue()
264        stopped_events = self.dap_server.wait_for_stopped()
265        self.assertEqual(
266            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
267        )
268        self.assertEqual(
269            stopped_events[0]["event"], "exited", "make sure program ran to completion"
270        )
271        self.assertEqual(
272            stopped_events[0]["body"]["exitCode"],
273            exitCode,
274            "exitCode == %i" % (exitCode),
275        )
276
277    def disassemble(self, threadId=None, frameIndex=None):
278        stackFrames = self.get_stackFrames(
279            threadId=threadId, startFrame=frameIndex, levels=1
280        )
281        self.assertIsNotNone(stackFrames)
282        memoryReference = stackFrames[0]["instructionPointerReference"]
283        self.assertIsNotNone(memoryReference)
284
285        if memoryReference not in self.dap_server.disassembled_instructions:
286            self.dap_server.request_disassemble(memoryReference=memoryReference)
287
288        return self.dap_server.disassembled_instructions[memoryReference]
289
290    def attach(
291        self,
292        program=None,
293        pid=None,
294        waitFor=None,
295        trace=None,
296        initCommands=None,
297        preRunCommands=None,
298        stopCommands=None,
299        exitCommands=None,
300        attachCommands=None,
301        coreFile=None,
302        disconnectAutomatically=True,
303        terminateCommands=None,
304        postRunCommands=None,
305        sourceMap=None,
306        sourceInitFile=False,
307        expectFailure=False,
308        gdbRemotePort=None,
309        gdbRemoteHostname=None,
310    ):
311        """Build the default Makefile target, create the DAP debug adaptor,
312        and attach to the process.
313        """
314
315        # Make sure we disconnect and terminate the DAP debug adaptor even
316        # if we throw an exception during the test case.
317        def cleanup():
318            if disconnectAutomatically:
319                self.dap_server.request_disconnect(terminateDebuggee=True)
320            self.dap_server.terminate()
321
322        # Execute the cleanup function during test case tear down.
323        self.addTearDownHook(cleanup)
324        # Initialize and launch the program
325        self.dap_server.request_initialize(sourceInitFile)
326        response = self.dap_server.request_attach(
327            program=program,
328            pid=pid,
329            waitFor=waitFor,
330            trace=trace,
331            initCommands=initCommands,
332            preRunCommands=preRunCommands,
333            stopCommands=stopCommands,
334            exitCommands=exitCommands,
335            attachCommands=attachCommands,
336            terminateCommands=terminateCommands,
337            coreFile=coreFile,
338            postRunCommands=postRunCommands,
339            sourceMap=sourceMap,
340            gdbRemotePort=gdbRemotePort,
341            gdbRemoteHostname=gdbRemoteHostname,
342        )
343        if expectFailure:
344            return response
345        if not (response and response["success"]):
346            self.assertTrue(
347                response["success"], "attach failed (%s)" % (response["message"])
348            )
349
350    def launch(
351        self,
352        program=None,
353        args=None,
354        cwd=None,
355        env=None,
356        stopOnEntry=False,
357        disableASLR=True,
358        disableSTDIO=False,
359        shellExpandArguments=False,
360        trace=False,
361        initCommands=None,
362        preRunCommands=None,
363        stopCommands=None,
364        exitCommands=None,
365        terminateCommands=None,
366        sourcePath=None,
367        debuggerRoot=None,
368        sourceInitFile=False,
369        launchCommands=None,
370        sourceMap=None,
371        disconnectAutomatically=True,
372        runInTerminal=False,
373        expectFailure=False,
374        postRunCommands=None,
375        enableAutoVariableSummaries=False,
376        enableSyntheticChildDebugging=False,
377        commandEscapePrefix=None,
378        customFrameFormat=None,
379        customThreadFormat=None,
380    ):
381        """Sending launch request to dap"""
382
383        # Make sure we disconnect and terminate the DAP debug adapter,
384        # if we throw an exception during the test case
385        def cleanup():
386            if disconnectAutomatically:
387                self.dap_server.request_disconnect(terminateDebuggee=True)
388            self.dap_server.terminate()
389
390        # Execute the cleanup function during test case tear down.
391        self.addTearDownHook(cleanup)
392
393        # Initialize and launch the program
394        self.dap_server.request_initialize(sourceInitFile)
395        response = self.dap_server.request_launch(
396            program,
397            args=args,
398            cwd=cwd,
399            env=env,
400            stopOnEntry=stopOnEntry,
401            disableASLR=disableASLR,
402            disableSTDIO=disableSTDIO,
403            shellExpandArguments=shellExpandArguments,
404            trace=trace,
405            initCommands=initCommands,
406            preRunCommands=preRunCommands,
407            stopCommands=stopCommands,
408            exitCommands=exitCommands,
409            terminateCommands=terminateCommands,
410            sourcePath=sourcePath,
411            debuggerRoot=debuggerRoot,
412            launchCommands=launchCommands,
413            sourceMap=sourceMap,
414            runInTerminal=runInTerminal,
415            postRunCommands=postRunCommands,
416            enableAutoVariableSummaries=enableAutoVariableSummaries,
417            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
418            commandEscapePrefix=commandEscapePrefix,
419            customFrameFormat=customFrameFormat,
420            customThreadFormat=customThreadFormat,
421        )
422
423        if expectFailure:
424            return response
425
426        if not (response and response["success"]):
427            self.assertTrue(
428                response["success"], "launch failed (%s)" % (response["message"])
429            )
430        return response
431
432    def build_and_launch(
433        self,
434        program,
435        args=None,
436        cwd=None,
437        env=None,
438        stopOnEntry=False,
439        disableASLR=True,
440        disableSTDIO=False,
441        shellExpandArguments=False,
442        trace=False,
443        initCommands=None,
444        preRunCommands=None,
445        stopCommands=None,
446        exitCommands=None,
447        terminateCommands=None,
448        sourcePath=None,
449        debuggerRoot=None,
450        sourceInitFile=False,
451        runInTerminal=False,
452        disconnectAutomatically=True,
453        postRunCommands=None,
454        lldbDAPEnv=None,
455        enableAutoVariableSummaries=False,
456        enableSyntheticChildDebugging=False,
457        commandEscapePrefix=None,
458        customFrameFormat=None,
459        customThreadFormat=None,
460        launchCommands=None,
461        expectFailure=False,
462    ):
463        """Build the default Makefile target, create the DAP debug adaptor,
464        and launch the process.
465        """
466        self.build_and_create_debug_adaptor(lldbDAPEnv)
467        self.assertTrue(os.path.exists(program), "executable must exist")
468
469        return self.launch(
470            program,
471            args,
472            cwd,
473            env,
474            stopOnEntry,
475            disableASLR,
476            disableSTDIO,
477            shellExpandArguments,
478            trace,
479            initCommands,
480            preRunCommands,
481            stopCommands,
482            exitCommands,
483            terminateCommands,
484            sourcePath,
485            debuggerRoot,
486            sourceInitFile,
487            runInTerminal=runInTerminal,
488            disconnectAutomatically=disconnectAutomatically,
489            postRunCommands=postRunCommands,
490            enableAutoVariableSummaries=enableAutoVariableSummaries,
491            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
492            commandEscapePrefix=commandEscapePrefix,
493            customFrameFormat=customFrameFormat,
494            customThreadFormat=customThreadFormat,
495            launchCommands=launchCommands,
496            expectFailure=expectFailure,
497        )
498
499    def getBuiltinDebugServerTool(self):
500        # Tries to find simulation/lldb-server/gdbserver tool path.
501        server_tool = None
502        if lldbplatformutil.getPlatform() == "linux":
503            server_tool = lldbgdbserverutils.get_lldb_server_exe()
504            if server_tool is None:
505                self.dap_server.request_disconnect(terminateDebuggee=True)
506                self.assertIsNotNone(server_tool, "lldb-server not found.")
507        elif lldbplatformutil.getPlatform() == "macosx":
508            server_tool = lldbgdbserverutils.get_debugserver_exe()
509            if server_tool is None:
510                self.dap_server.request_disconnect(terminateDebuggee=True)
511                self.assertIsNotNone(server_tool, "debugserver not found.")
512        return server_tool
513