xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision 4f48a81a620bc9280be4780f3554cdc9bda55bd3)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6from lldbsuite.test import lldbplatformutil
7import lldbgdbserverutils
8
9
10class DAPTestCaseBase(TestBase):
11    # set timeout based on whether ASAN was enabled or not. Increase
12    # timeout by a factor of 10 if ASAN is enabled.
13    timeoutval = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
14    NO_DEBUG_INFO_TESTCASE = True
15
16    def create_debug_adaptor(self, lldbDAPEnv=None):
17        """Create the Visual Studio Code debug adaptor"""
18        self.assertTrue(
19            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
20        )
21        log_file_path = self.getBuildArtifact("dap.txt")
22        self.dap_server = dap_server.DebugAdaptorServer(
23            executable=self.lldbDAPExec,
24            init_commands=self.setUpCommands(),
25            log_file=log_file_path,
26            env=lldbDAPEnv,
27        )
28
29    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
30        self.build()
31        self.create_debug_adaptor(lldbDAPEnv)
32
33    def set_source_breakpoints(self, source_path, lines, data=None):
34        """Sets source breakpoints and returns an array of strings containing
35        the breakpoint IDs ("1", "2") for each breakpoint that was set.
36        Parameter data is array of data objects for breakpoints.
37        Each object in data is 1:1 mapping with the entry in lines.
38        It contains optional location/hitCondition/logMessage parameters.
39        """
40        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
41        if response is None:
42            return []
43        breakpoints = response["body"]["breakpoints"]
44        breakpoint_ids = []
45        for breakpoint in breakpoints:
46            breakpoint_ids.append("%i" % (breakpoint["id"]))
47        return breakpoint_ids
48
49    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
50        """Sets breakpoints by function name given an array of function names
51        and returns an array of strings containing the breakpoint IDs
52        ("1", "2") for each breakpoint that was set.
53        """
54        response = self.dap_server.request_setFunctionBreakpoints(
55            functions, condition=condition, hitCondition=hitCondition
56        )
57        if response is None:
58            return []
59        breakpoints = response["body"]["breakpoints"]
60        breakpoint_ids = []
61        for breakpoint in breakpoints:
62            breakpoint_ids.append("%i" % (breakpoint["id"]))
63        return breakpoint_ids
64
65    def waitUntil(self, condition_callback):
66        for _ in range(20):
67            if condition_callback():
68                return True
69            time.sleep(0.5)
70        return False
71
72    def verify_breakpoint_hit(self, breakpoint_ids):
73        """Wait for the process we are debugging to stop, and verify we hit
74        any breakpoint location in the "breakpoint_ids" array.
75        "breakpoint_ids" should be a list of breakpoint ID strings
76        (["1", "2"]). The return value from self.set_source_breakpoints()
77        or self.set_function_breakpoints() can be passed to this function"""
78        stopped_events = self.dap_server.wait_for_stopped()
79        for stopped_event in stopped_events:
80            if "body" in stopped_event:
81                body = stopped_event["body"]
82                if "reason" not in body:
83                    continue
84                if (
85                    body["reason"] != "breakpoint"
86                    and body["reason"] != "instruction breakpoint"
87                ):
88                    continue
89                if "description" not in body:
90                    continue
91                # Descriptions for breakpoints will be in the form
92                # "breakpoint 1.1", so look for any description that matches
93                # ("breakpoint 1.") in the description field as verification
94                # that one of the breakpoint locations was hit. DAP doesn't
95                # allow breakpoints to have multiple locations, but LLDB does.
96                # So when looking at the description we just want to make sure
97                # the right breakpoint matches and not worry about the actual
98                # location.
99                description = body["description"]
100                for breakpoint_id in breakpoint_ids:
101                    match_desc = "breakpoint %s." % (breakpoint_id)
102                    if match_desc in description:
103                        return
104        self.assertTrue(False, "breakpoint not hit")
105
106    def verify_stop_exception_info(self, expected_description, timeout=timeoutval):
107        """Wait for the process we are debugging to stop, and verify the stop
108        reason is 'exception' and that the description matches
109        'expected_description'
110        """
111        stopped_events = self.dap_server.wait_for_stopped(timeout=timeout)
112        for stopped_event in stopped_events:
113            print("stopped_event", stopped_event)
114            if "body" in stopped_event:
115                body = stopped_event["body"]
116                if "reason" not in body:
117                    continue
118                if body["reason"] != "exception":
119                    continue
120                if "description" not in body:
121                    continue
122                description = body["description"]
123                if expected_description == description:
124                    return True
125        return False
126
127    def verify_commands(self, flavor, output, commands):
128        self.assertTrue(output and len(output) > 0, "expect console output")
129        lines = output.splitlines()
130        prefix = "(lldb) "
131        for cmd in commands:
132            found = False
133            for line in lines:
134                if len(cmd) > 0 and (cmd[0] == "!" or cmd[0] == "?"):
135                    cmd = cmd[1:]
136                if line.startswith(prefix) and cmd in line:
137                    found = True
138                    break
139            self.assertTrue(
140                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
141            )
142
143    def get_dict_value(self, d, key_path):
144        """Verify each key in the key_path array is in contained in each
145        dictionary within "d". Assert if any key isn't in the
146        corresponding dictionary. This is handy for grabbing values from VS
147        Code response dictionary like getting
148        response['body']['stackFrames']
149        """
150        value = d
151        for key in key_path:
152            if key in value:
153                value = value[key]
154            else:
155                self.assertTrue(
156                    key in value,
157                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
158                )
159        return value
160
161    def get_stackFrames_and_totalFramesCount(
162        self, threadId=None, startFrame=None, levels=None, dump=False
163    ):
164        response = self.dap_server.request_stackTrace(
165            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
166        )
167        if response:
168            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
169            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
170            self.assertTrue(
171                totalFrames > 0,
172                "verify totalFrames count is provided by extension that supports "
173                "async frames loading",
174            )
175            return (stackFrames, totalFrames)
176        return (None, 0)
177
178    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
179        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
180            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
181        )
182        return stackFrames
183
184    def get_exceptionInfo(self, threadId=None):
185        response = self.dap_server.request_exceptionInfo(threadId=threadId)
186        return self.get_dict_value(response, ["body"])
187
188    def get_source_and_line(self, threadId=None, frameIndex=0):
189        stackFrames = self.get_stackFrames(
190            threadId=threadId, startFrame=frameIndex, levels=1
191        )
192        if stackFrames is not None:
193            stackFrame = stackFrames[0]
194            ["source", "path"]
195            if "source" in stackFrame:
196                source = stackFrame["source"]
197                if "path" in source:
198                    if "line" in stackFrame:
199                        return (source["path"], stackFrame["line"])
200        return ("", 0)
201
202    def get_stdout(self, timeout=0.0):
203        return self.dap_server.get_output("stdout", timeout=timeout)
204
205    def get_console(self, timeout=0.0):
206        return self.dap_server.get_output("console", timeout=timeout)
207
208    def collect_console(self, timeout_secs, pattern=None):
209        return self.dap_server.collect_output(
210            "console", timeout_secs=timeout_secs, pattern=pattern
211        )
212
213    def collect_stdout(self, timeout_secs, pattern=None):
214        return self.dap_server.collect_output(
215            "stdout", timeout_secs=timeout_secs, pattern=pattern
216        )
217
218    def get_local_as_int(self, name, threadId=None):
219        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
220        # 'value' may have the variable value and summary.
221        # Extract the variable value since summary can have nonnumeric characters.
222        value = value.split(" ")[0]
223        if value.startswith("0x"):
224            return int(value, 16)
225        elif value.startswith("0"):
226            return int(value, 8)
227        else:
228            return int(value)
229
230    def set_local(self, name, value, id=None):
231        """Set a top level local variable only."""
232        return self.dap_server.request_setVariable(1, name, str(value), id=id)
233
234    def set_global(self, name, value, id=None):
235        """Set a top level global variable only."""
236        return self.dap_server.request_setVariable(2, name, str(value), id=id)
237
238    def stepIn(
239        self, threadId=None, targetId=None, waitForStop=True, granularity="statement"
240    ):
241        response = self.dap_server.request_stepIn(
242            threadId=threadId, targetId=targetId, granularity=granularity
243        )
244        self.assertTrue(response["success"])
245        if waitForStop:
246            return self.dap_server.wait_for_stopped()
247        return None
248
249    def stepOver(self, threadId=None, waitForStop=True, granularity="statement"):
250        self.dap_server.request_next(threadId=threadId, granularity=granularity)
251        if waitForStop:
252            return self.dap_server.wait_for_stopped()
253        return None
254
255    def stepOut(self, threadId=None, waitForStop=True):
256        self.dap_server.request_stepOut(threadId=threadId)
257        if waitForStop:
258            return self.dap_server.wait_for_stopped()
259        return None
260
261    def continue_to_next_stop(self):
262        self.dap_server.request_continue()
263        return self.dap_server.wait_for_stopped()
264
265    def continue_to_breakpoints(self, breakpoint_ids):
266        self.dap_server.request_continue()
267        self.verify_breakpoint_hit(breakpoint_ids)
268
269    def continue_to_exception_breakpoint(self, filter_label):
270        self.dap_server.request_continue()
271        self.assertTrue(
272            self.verify_stop_exception_info(filter_label),
273            'verify we got "%s"' % (filter_label),
274        )
275
276    def continue_to_exit(self, exitCode=0):
277        self.dap_server.request_continue()
278        stopped_events = self.dap_server.wait_for_stopped()
279        self.assertEqual(
280            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
281        )
282        self.assertEqual(
283            stopped_events[0]["event"], "exited", "make sure program ran to completion"
284        )
285        self.assertEqual(
286            stopped_events[0]["body"]["exitCode"],
287            exitCode,
288            "exitCode == %i" % (exitCode),
289        )
290
291    def disassemble(self, threadId=None, frameIndex=None):
292        stackFrames = self.get_stackFrames(
293            threadId=threadId, startFrame=frameIndex, levels=1
294        )
295        self.assertIsNotNone(stackFrames)
296        memoryReference = stackFrames[0]["instructionPointerReference"]
297        self.assertIsNotNone(memoryReference)
298
299        if memoryReference not in self.dap_server.disassembled_instructions:
300            self.dap_server.request_disassemble(memoryReference=memoryReference)
301
302        return self.dap_server.disassembled_instructions[memoryReference]
303
304    def attach(
305        self,
306        program=None,
307        pid=None,
308        waitFor=None,
309        trace=None,
310        initCommands=None,
311        preRunCommands=None,
312        stopCommands=None,
313        exitCommands=None,
314        attachCommands=None,
315        coreFile=None,
316        disconnectAutomatically=True,
317        terminateCommands=None,
318        postRunCommands=None,
319        sourceMap=None,
320        sourceInitFile=False,
321        expectFailure=False,
322        gdbRemotePort=None,
323        gdbRemoteHostname=None,
324    ):
325        """Build the default Makefile target, create the DAP debug adaptor,
326        and attach to the process.
327        """
328
329        # Make sure we disconnect and terminate the DAP debug adaptor even
330        # if we throw an exception during the test case.
331        def cleanup():
332            if disconnectAutomatically:
333                self.dap_server.request_disconnect(terminateDebuggee=True)
334            self.dap_server.terminate()
335
336        # Execute the cleanup function during test case tear down.
337        self.addTearDownHook(cleanup)
338        # Initialize and launch the program
339        self.dap_server.request_initialize(sourceInitFile)
340        response = self.dap_server.request_attach(
341            program=program,
342            pid=pid,
343            waitFor=waitFor,
344            trace=trace,
345            initCommands=initCommands,
346            preRunCommands=preRunCommands,
347            stopCommands=stopCommands,
348            exitCommands=exitCommands,
349            attachCommands=attachCommands,
350            terminateCommands=terminateCommands,
351            coreFile=coreFile,
352            postRunCommands=postRunCommands,
353            sourceMap=sourceMap,
354            gdbRemotePort=gdbRemotePort,
355            gdbRemoteHostname=gdbRemoteHostname,
356        )
357        if expectFailure:
358            return response
359        if not (response and response["success"]):
360            self.assertTrue(
361                response["success"], "attach failed (%s)" % (response["message"])
362            )
363
364    def launch(
365        self,
366        program=None,
367        args=None,
368        cwd=None,
369        env=None,
370        stopOnEntry=False,
371        disableASLR=False,
372        disableSTDIO=False,
373        shellExpandArguments=False,
374        trace=False,
375        initCommands=None,
376        preRunCommands=None,
377        stopCommands=None,
378        exitCommands=None,
379        terminateCommands=None,
380        sourcePath=None,
381        debuggerRoot=None,
382        sourceInitFile=False,
383        launchCommands=None,
384        sourceMap=None,
385        disconnectAutomatically=True,
386        runInTerminal=False,
387        expectFailure=False,
388        postRunCommands=None,
389        enableAutoVariableSummaries=False,
390        displayExtendedBacktrace=False,
391        enableSyntheticChildDebugging=False,
392        commandEscapePrefix=None,
393        customFrameFormat=None,
394        customThreadFormat=None,
395    ):
396        """Sending launch request to dap"""
397
398        # Make sure we disconnect and terminate the DAP debug adapter,
399        # if we throw an exception during the test case
400        def cleanup():
401            if disconnectAutomatically:
402                self.dap_server.request_disconnect(terminateDebuggee=True)
403            self.dap_server.terminate()
404
405        # Execute the cleanup function during test case tear down.
406        self.addTearDownHook(cleanup)
407
408        # Initialize and launch the program
409        self.dap_server.request_initialize(sourceInitFile)
410        response = self.dap_server.request_launch(
411            program,
412            args=args,
413            cwd=cwd,
414            env=env,
415            stopOnEntry=stopOnEntry,
416            disableASLR=disableASLR,
417            disableSTDIO=disableSTDIO,
418            shellExpandArguments=shellExpandArguments,
419            trace=trace,
420            initCommands=initCommands,
421            preRunCommands=preRunCommands,
422            stopCommands=stopCommands,
423            exitCommands=exitCommands,
424            terminateCommands=terminateCommands,
425            sourcePath=sourcePath,
426            debuggerRoot=debuggerRoot,
427            launchCommands=launchCommands,
428            sourceMap=sourceMap,
429            runInTerminal=runInTerminal,
430            postRunCommands=postRunCommands,
431            enableAutoVariableSummaries=enableAutoVariableSummaries,
432            displayExtendedBacktrace=displayExtendedBacktrace,
433            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
434            commandEscapePrefix=commandEscapePrefix,
435            customFrameFormat=customFrameFormat,
436            customThreadFormat=customThreadFormat,
437        )
438
439        if expectFailure:
440            return response
441
442        if not (response and response["success"]):
443            self.assertTrue(
444                response["success"], "launch failed (%s)" % (response["message"])
445            )
446        return response
447
448    def build_and_launch(
449        self,
450        program,
451        args=None,
452        cwd=None,
453        env=None,
454        stopOnEntry=False,
455        disableASLR=False,
456        disableSTDIO=False,
457        shellExpandArguments=False,
458        trace=False,
459        initCommands=None,
460        preRunCommands=None,
461        stopCommands=None,
462        exitCommands=None,
463        terminateCommands=None,
464        sourcePath=None,
465        debuggerRoot=None,
466        sourceInitFile=False,
467        runInTerminal=False,
468        disconnectAutomatically=True,
469        postRunCommands=None,
470        lldbDAPEnv=None,
471        enableAutoVariableSummaries=False,
472        displayExtendedBacktrace=False,
473        enableSyntheticChildDebugging=False,
474        commandEscapePrefix=None,
475        customFrameFormat=None,
476        customThreadFormat=None,
477        launchCommands=None,
478        expectFailure=False,
479    ):
480        """Build the default Makefile target, create the DAP debug adaptor,
481        and launch the process.
482        """
483        self.build_and_create_debug_adaptor(lldbDAPEnv)
484        self.assertTrue(os.path.exists(program), "executable must exist")
485
486        return self.launch(
487            program,
488            args,
489            cwd,
490            env,
491            stopOnEntry,
492            disableASLR,
493            disableSTDIO,
494            shellExpandArguments,
495            trace,
496            initCommands,
497            preRunCommands,
498            stopCommands,
499            exitCommands,
500            terminateCommands,
501            sourcePath,
502            debuggerRoot,
503            sourceInitFile,
504            runInTerminal=runInTerminal,
505            disconnectAutomatically=disconnectAutomatically,
506            postRunCommands=postRunCommands,
507            enableAutoVariableSummaries=enableAutoVariableSummaries,
508            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
509            displayExtendedBacktrace=displayExtendedBacktrace,
510            commandEscapePrefix=commandEscapePrefix,
511            customFrameFormat=customFrameFormat,
512            customThreadFormat=customThreadFormat,
513            launchCommands=launchCommands,
514            expectFailure=expectFailure,
515        )
516
517    def getBuiltinDebugServerTool(self):
518        # Tries to find simulation/lldb-server/gdbserver tool path.
519        server_tool = None
520        if lldbplatformutil.getPlatform() == "linux":
521            server_tool = lldbgdbserverutils.get_lldb_server_exe()
522            if server_tool is None:
523                self.dap_server.request_disconnect(terminateDebuggee=True)
524                self.assertIsNotNone(server_tool, "lldb-server not found.")
525        elif lldbplatformutil.getPlatform() == "macosx":
526            server_tool = lldbgdbserverutils.get_debugserver_exe()
527            if server_tool is None:
528                self.dap_server.request_disconnect(terminateDebuggee=True)
529                self.assertIsNotNone(server_tool, "debugserver not found.")
530        return server_tool
531