xref: /llvm-project/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py (revision 2f2e31c3c980407b2660b4f5d10e7cdb3fa79138)
1import os
2import time
3
4import dap_server
5from lldbsuite.test.lldbtest import *
6
7
8class DAPTestCaseBase(TestBase):
    # Timeout value shared by the DAP tests. The base value is 10; it is
    # multiplied by 10 when the ASAN_OPTIONS environment variable is present,
    # which is used here as the signal that ASAN is enabled.
    timeoutval = 10 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
    # NOTE(review): presumably read by the lldbsuite test infrastructure to
    # avoid replicating these tests per debug-info format -- confirm against
    # lldbsuite.test.lldbtest.
    NO_DEBUG_INFO_TESTCASE = True
13
14    def create_debug_adaptor(self, lldbDAPEnv=None):
15        """Create the Visual Studio Code debug adaptor"""
16        self.assertTrue(
17            is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
18        )
19        log_file_path = self.getBuildArtifact("dap.txt")
20        self.dap_server = dap_server.DebugAdaptorServer(
21            executable=self.lldbDAPExec,
22            init_commands=self.setUpCommands(),
23            log_file=log_file_path,
24            env=lldbDAPEnv,
25        )
26
27    def build_and_create_debug_adaptor(self, lldbDAPEnv=None):
28        self.build()
29        self.create_debug_adaptor(lldbDAPEnv)
30
31    def set_source_breakpoints(self, source_path, lines, data=None):
32        """Sets source breakpoints and returns an array of strings containing
33        the breakpoint IDs ("1", "2") for each breakpoint that was set.
34        Parameter data is array of data objects for breakpoints.
35        Each object in data is 1:1 mapping with the entry in lines.
36        It contains optional location/hitCondition/logMessage parameters.
37        """
38        response = self.dap_server.request_setBreakpoints(source_path, lines, data)
39        if response is None:
40            return []
41        breakpoints = response["body"]["breakpoints"]
42        breakpoint_ids = []
43        for breakpoint in breakpoints:
44            breakpoint_ids.append("%i" % (breakpoint["id"]))
45        return breakpoint_ids
46
47    def set_function_breakpoints(self, functions, condition=None, hitCondition=None):
48        """Sets breakpoints by function name given an array of function names
49        and returns an array of strings containing the breakpoint IDs
50        ("1", "2") for each breakpoint that was set.
51        """
52        response = self.dap_server.request_setFunctionBreakpoints(
53            functions, condition=condition, hitCondition=hitCondition
54        )
55        if response is None:
56            return []
57        breakpoints = response["body"]["breakpoints"]
58        breakpoint_ids = []
59        for breakpoint in breakpoints:
60            breakpoint_ids.append("%i" % (breakpoint["id"]))
61        return breakpoint_ids
62
63    def waitUntil(self, condition_callback):
64        for _ in range(20):
65            if condition_callback():
66                return True
67            time.sleep(0.5)
68        return False
69
70    def verify_breakpoint_hit(self, breakpoint_ids):
71        """Wait for the process we are debugging to stop, and verify we hit
72        any breakpoint location in the "breakpoint_ids" array.
73        "breakpoint_ids" should be a list of breakpoint ID strings
74        (["1", "2"]). The return value from self.set_source_breakpoints()
75        or self.set_function_breakpoints() can be passed to this function"""
76        stopped_events = self.dap_server.wait_for_stopped()
77        for stopped_event in stopped_events:
78            if "body" in stopped_event:
79                body = stopped_event["body"]
80                if "reason" not in body:
81                    continue
82                if body["reason"] != "breakpoint":
83                    continue
84                if "description" not in body:
85                    continue
86                # Descriptions for breakpoints will be in the form
87                # "breakpoint 1.1", so look for any description that matches
88                # ("breakpoint 1.") in the description field as verification
89                # that one of the breakpoint locations was hit. DAP doesn't
90                # allow breakpoints to have multiple locations, but LLDB does.
91                # So when looking at the description we just want to make sure
92                # the right breakpoint matches and not worry about the actual
93                # location.
94                description = body["description"]
95                for breakpoint_id in breakpoint_ids:
96                    match_desc = "breakpoint %s." % (breakpoint_id)
97                    if match_desc in description:
98                        return
99        self.assertTrue(False, "breakpoint not hit")
100
101    def verify_stop_exception_info(self, expected_description):
102        """Wait for the process we are debugging to stop, and verify the stop
103        reason is 'exception' and that the description matches
104        'expected_description'
105        """
106        stopped_events = self.dap_server.wait_for_stopped()
107        for stopped_event in stopped_events:
108            if "body" in stopped_event:
109                body = stopped_event["body"]
110                if "reason" not in body:
111                    continue
112                if body["reason"] != "exception":
113                    continue
114                if "description" not in body:
115                    continue
116                description = body["description"]
117                if expected_description == description:
118                    return True
119        return False
120
121    def verify_commands(self, flavor, output, commands):
122        self.assertTrue(output and len(output) > 0, "expect console output")
123        lines = output.splitlines()
124        prefix = "(lldb) "
125        for cmd in commands:
126            found = False
127            for line in lines:
128                if len(cmd) > 0 and (cmd[0] == "!" or cmd[0] == "?"):
129                    cmd = cmd[1:]
130                if line.startswith(prefix) and cmd in line:
131                    found = True
132                    break
133            self.assertTrue(
134                found, "verify '%s' found in console output for '%s'" % (cmd, flavor)
135            )
136
137    def get_dict_value(self, d, key_path):
138        """Verify each key in the key_path array is in contained in each
139        dictionary within "d". Assert if any key isn't in the
140        corresponding dictionary. This is handy for grabbing values from VS
141        Code response dictionary like getting
142        response['body']['stackFrames']
143        """
144        value = d
145        for key in key_path:
146            if key in value:
147                value = value[key]
148            else:
149                self.assertTrue(
150                    key in value,
151                    'key "%s" from key_path "%s" not in "%s"' % (key, key_path, d),
152                )
153        return value
154
155    def get_stackFrames_and_totalFramesCount(
156        self, threadId=None, startFrame=None, levels=None, dump=False
157    ):
158        response = self.dap_server.request_stackTrace(
159            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
160        )
161        if response:
162            stackFrames = self.get_dict_value(response, ["body", "stackFrames"])
163            totalFrames = self.get_dict_value(response, ["body", "totalFrames"])
164            self.assertTrue(
165                totalFrames > 0,
166                "verify totalFrames count is provided by extension that supports "
167                "async frames loading",
168            )
169            return (stackFrames, totalFrames)
170        return (None, 0)
171
172    def get_stackFrames(self, threadId=None, startFrame=None, levels=None, dump=False):
173        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
174            threadId=threadId, startFrame=startFrame, levels=levels, dump=dump
175        )
176        return stackFrames
177
178    def get_source_and_line(self, threadId=None, frameIndex=0):
179        stackFrames = self.get_stackFrames(
180            threadId=threadId, startFrame=frameIndex, levels=1
181        )
182        if stackFrames is not None:
183            stackFrame = stackFrames[0]
184            ["source", "path"]
185            if "source" in stackFrame:
186                source = stackFrame["source"]
187                if "path" in source:
188                    if "line" in stackFrame:
189                        return (source["path"], stackFrame["line"])
190        return ("", 0)
191
192    def get_stdout(self, timeout=0.0):
193        return self.dap_server.get_output("stdout", timeout=timeout)
194
195    def get_console(self, timeout=0.0):
196        return self.dap_server.get_output("console", timeout=timeout)
197
198    def collect_console(self, duration):
199        return self.dap_server.collect_output("console", duration=duration)
200
201    def get_local_as_int(self, name, threadId=None):
202        value = self.dap_server.get_local_variable_value(name, threadId=threadId)
203        # 'value' may have the variable value and summary.
204        # Extract the variable value since summary can have nonnumeric characters.
205        value = value.split(" ")[0]
206        if value.startswith("0x"):
207            return int(value, 16)
208        elif value.startswith("0"):
209            return int(value, 8)
210        else:
211            return int(value)
212
213    def set_local(self, name, value, id=None):
214        """Set a top level local variable only."""
215        return self.dap_server.request_setVariable(1, name, str(value), id=id)
216
217    def set_global(self, name, value, id=None):
218        """Set a top level global variable only."""
219        return self.dap_server.request_setVariable(2, name, str(value), id=id)
220
221    def stepIn(self, threadId=None, targetId=None, waitForStop=True):
222        self.dap_server.request_stepIn(threadId=threadId, targetId=targetId)
223        if waitForStop:
224            return self.dap_server.wait_for_stopped()
225        return None
226
227    def stepOver(self, threadId=None, waitForStop=True):
228        self.dap_server.request_next(threadId=threadId)
229        if waitForStop:
230            return self.dap_server.wait_for_stopped()
231        return None
232
233    def stepOut(self, threadId=None, waitForStop=True):
234        self.dap_server.request_stepOut(threadId=threadId)
235        if waitForStop:
236            return self.dap_server.wait_for_stopped()
237        return None
238
239    def continue_to_next_stop(self):
240        self.dap_server.request_continue()
241        return self.dap_server.wait_for_stopped()
242
243    def continue_to_breakpoints(self, breakpoint_ids):
244        self.dap_server.request_continue()
245        self.verify_breakpoint_hit(breakpoint_ids)
246
247    def continue_to_exception_breakpoint(self, filter_label):
248        self.dap_server.request_continue()
249        self.assertTrue(
250            self.verify_stop_exception_info(filter_label),
251            'verify we got "%s"' % (filter_label),
252        )
253
254    def continue_to_exit(self, exitCode=0):
255        self.dap_server.request_continue()
256        stopped_events = self.dap_server.wait_for_stopped()
257        self.assertEqual(
258            len(stopped_events), 1, "stopped_events = {}".format(stopped_events)
259        )
260        self.assertEqual(
261            stopped_events[0]["event"], "exited", "make sure program ran to completion"
262        )
263        self.assertEqual(
264            stopped_events[0]["body"]["exitCode"],
265            exitCode,
266            "exitCode == %i" % (exitCode),
267        )
268
269    def disassemble(self, threadId=None, frameIndex=None):
270        stackFrames = self.get_stackFrames(
271            threadId=threadId, startFrame=frameIndex, levels=1
272        )
273        self.assertIsNotNone(stackFrames)
274        memoryReference = stackFrames[0]["instructionPointerReference"]
275        self.assertIsNotNone(memoryReference)
276
277        if memoryReference not in self.dap_server.disassembled_instructions:
278            self.dap_server.request_disassemble(memoryReference=memoryReference)
279
280        return self.dap_server.disassembled_instructions[memoryReference]
281
282    def attach(
283        self,
284        program=None,
285        pid=None,
286        waitFor=None,
287        trace=None,
288        initCommands=None,
289        preRunCommands=None,
290        stopCommands=None,
291        exitCommands=None,
292        attachCommands=None,
293        coreFile=None,
294        disconnectAutomatically=True,
295        terminateCommands=None,
296        postRunCommands=None,
297        sourceMap=None,
298        sourceInitFile=False,
299        expectFailure=False,
300    ):
301        """Build the default Makefile target, create the DAP debug adaptor,
302        and attach to the process.
303        """
304
305        # Make sure we disconnect and terminate the DAP debug adaptor even
306        # if we throw an exception during the test case.
307        def cleanup():
308            if disconnectAutomatically:
309                self.dap_server.request_disconnect(terminateDebuggee=True)
310            self.dap_server.terminate()
311
312        # Execute the cleanup function during test case tear down.
313        self.addTearDownHook(cleanup)
314        # Initialize and launch the program
315        self.dap_server.request_initialize(sourceInitFile)
316        response = self.dap_server.request_attach(
317            program=program,
318            pid=pid,
319            waitFor=waitFor,
320            trace=trace,
321            initCommands=initCommands,
322            preRunCommands=preRunCommands,
323            stopCommands=stopCommands,
324            exitCommands=exitCommands,
325            attachCommands=attachCommands,
326            terminateCommands=terminateCommands,
327            coreFile=coreFile,
328            postRunCommands=postRunCommands,
329            sourceMap=sourceMap,
330        )
331        if expectFailure:
332            return response
333        if not (response and response["success"]):
334            self.assertTrue(
335                response["success"], "attach failed (%s)" % (response["message"])
336            )
337
338    def launch(
339        self,
340        program=None,
341        args=None,
342        cwd=None,
343        env=None,
344        stopOnEntry=False,
345        disableASLR=True,
346        disableSTDIO=False,
347        shellExpandArguments=False,
348        trace=False,
349        initCommands=None,
350        preRunCommands=None,
351        stopCommands=None,
352        exitCommands=None,
353        terminateCommands=None,
354        sourcePath=None,
355        debuggerRoot=None,
356        sourceInitFile=False,
357        launchCommands=None,
358        sourceMap=None,
359        disconnectAutomatically=True,
360        runInTerminal=False,
361        expectFailure=False,
362        postRunCommands=None,
363        enableAutoVariableSummaries=False,
364        enableSyntheticChildDebugging=False,
365        commandEscapePrefix=None,
366        customFrameFormat=None,
367        customThreadFormat=None,
368    ):
369        """Sending launch request to dap"""
370
371        # Make sure we disconnect and terminate the DAP debug adapter,
372        # if we throw an exception during the test case
373        def cleanup():
374            if disconnectAutomatically:
375                self.dap_server.request_disconnect(terminateDebuggee=True)
376            self.dap_server.terminate()
377
378        # Execute the cleanup function during test case tear down.
379        self.addTearDownHook(cleanup)
380
381        # Initialize and launch the program
382        self.dap_server.request_initialize(sourceInitFile)
383        response = self.dap_server.request_launch(
384            program,
385            args=args,
386            cwd=cwd,
387            env=env,
388            stopOnEntry=stopOnEntry,
389            disableASLR=disableASLR,
390            disableSTDIO=disableSTDIO,
391            shellExpandArguments=shellExpandArguments,
392            trace=trace,
393            initCommands=initCommands,
394            preRunCommands=preRunCommands,
395            stopCommands=stopCommands,
396            exitCommands=exitCommands,
397            terminateCommands=terminateCommands,
398            sourcePath=sourcePath,
399            debuggerRoot=debuggerRoot,
400            launchCommands=launchCommands,
401            sourceMap=sourceMap,
402            runInTerminal=runInTerminal,
403            postRunCommands=postRunCommands,
404            enableAutoVariableSummaries=enableAutoVariableSummaries,
405            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
406            commandEscapePrefix=commandEscapePrefix,
407            customFrameFormat=customFrameFormat,
408            customThreadFormat=customThreadFormat,
409        )
410
411        if expectFailure:
412            return response
413
414        if not (response and response["success"]):
415            self.assertTrue(
416                response["success"], "launch failed (%s)" % (response["message"])
417            )
418        return response
419
420    def build_and_launch(
421        self,
422        program,
423        args=None,
424        cwd=None,
425        env=None,
426        stopOnEntry=False,
427        disableASLR=True,
428        disableSTDIO=False,
429        shellExpandArguments=False,
430        trace=False,
431        initCommands=None,
432        preRunCommands=None,
433        stopCommands=None,
434        exitCommands=None,
435        terminateCommands=None,
436        sourcePath=None,
437        debuggerRoot=None,
438        sourceInitFile=False,
439        runInTerminal=False,
440        disconnectAutomatically=True,
441        postRunCommands=None,
442        lldbDAPEnv=None,
443        enableAutoVariableSummaries=False,
444        enableSyntheticChildDebugging=False,
445        commandEscapePrefix=None,
446        customFrameFormat=None,
447        customThreadFormat=None,
448        launchCommands=None,
449        expectFailure=False,
450    ):
451        """Build the default Makefile target, create the DAP debug adaptor,
452        and launch the process.
453        """
454        self.build_and_create_debug_adaptor(lldbDAPEnv)
455        self.assertTrue(os.path.exists(program), "executable must exist")
456
457        return self.launch(
458            program,
459            args,
460            cwd,
461            env,
462            stopOnEntry,
463            disableASLR,
464            disableSTDIO,
465            shellExpandArguments,
466            trace,
467            initCommands,
468            preRunCommands,
469            stopCommands,
470            exitCommands,
471            terminateCommands,
472            sourcePath,
473            debuggerRoot,
474            sourceInitFile,
475            runInTerminal=runInTerminal,
476            disconnectAutomatically=disconnectAutomatically,
477            postRunCommands=postRunCommands,
478            enableAutoVariableSummaries=enableAutoVariableSummaries,
479            enableSyntheticChildDebugging=enableSyntheticChildDebugging,
480            commandEscapePrefix=commandEscapePrefix,
481            customFrameFormat=customFrameFormat,
482            customThreadFormat=customThreadFormat,
483            launchCommands=launchCommands,
484            expectFailure=expectFailure,
485        )
486