xref: /llvm-project/lldb/examples/python/crashlog_scripted_process.py (revision 86dddbe3b54eae22db6e208e6bc1c3cda9b7e149)
1import os, json, struct, signal, uuid, tempfile
2
3from typing import Any, Dict
4
5import lldb
6from lldb.plugins.scripted_process import ScriptedProcess
7from lldb.plugins.scripted_process import ScriptedThread
8
9from lldb.macosx.crashlog import CrashLog, CrashLogParser
10
11
class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose threads and images come from a parsed crash log.

    The constructor only reads the launch arguments (crash log path and image
    loading options); the process is populated later, when a parsed crash log
    object is handed to `set_crashlog`.
    """

    class CrashLogOptions:
        """Defaults for the options handed to ``crashlog.load_images``.

        Each attribute can be overridden by a boolean launch argument of the
        same name (see `CrashLogScriptedProcess.__init__`).
        """

        load_all_images = False
        crashed_only = True
        no_parallel_image_loading = False

    # Boolean launch arguments that are copied onto ``self.options`` when
    # present; the names match the attributes of ``CrashLogOptions``.
    _BOOLEAN_OPTION_NAMES = (
        "load_all_images",
        "crashed_only",
        "no_parallel_image_loading",
    )

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Read the crash log path and loading options from ``args``.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Structured data holding a string ``file_path`` entry and,
                optionally, the boolean entries listed in
                ``_BOOLEAN_OPTION_NAMES``.
        """
        super().__init__(exe_ctx, args)

        # Give every attribute a default *before* any early return below, so
        # that a process created with an invalid target or without a crash
        # log path does not raise AttributeError when it is used later.
        self.crashlog_path = None
        self.options = self.CrashLogOptions()
        self.crashed_thread_idx = 0
        self.exception = None
        self.extended_thread_info = None

        if not self.target or not self.target.IsValid():
            # TODO: Report an error; a constructor cannot return one.
            return

        path_value = args.GetValueForKey("file_path")
        if (
            path_value
            and path_value.IsValid()
            and path_value.GetType() == lldb.eStructuredDataTypeString
        ):
            self.crashlog_path = path_value.GetStringValue(4096)

        if not self.crashlog_path:
            # TODO: Report an error; a constructor cannot return one.
            return

        # Entries that are missing, invalid or not booleans leave the
        # corresponding default untouched.
        for option_name in self._BOOLEAN_OPTION_NAMES:
            option_value = args.GetValueForKey(option_name)
            if (
                option_value
                and option_value.IsValid()
                and option_value.GetType() == lldb.eStructuredDataTypeBoolean
            ):
                setattr(self.options, option_name, option_value.GetBooleanValue())

        self.pid = super().get_process_id()

    def set_crashlog(self, crashlog):
        """Populate the process from an already parsed crash log.

        Copies the pid, address mask, crashed thread index and exception from
        ``crashlog``, asks it to load its images, and creates one
        `CrashLogScriptedThread` per crash log thread.

        Args:
            crashlog: Parsed crash log object. It is expected to provide
                ``process_id``, ``addr_mask``, ``crashed_thread_idx``,
                ``exception``, ``threads`` and ``load_images``; ``asi`` and
                ``asb`` are used only when present.
        """
        self.crashlog = crashlog

        process_id = crashlog.process_id
        if process_id:
            if isinstance(process_id, int):
                self.pid = process_id
            elif isinstance(process_id, str):
                # Base 0 lets int() honour a "0x"/"0o"/"0b" prefix.
                self.pid = int(process_id, 0)
            else:
                self.pid = super().get_process_id()

        self.addr_mask = crashlog.addr_mask
        self.crashed_thread_idx = crashlog.crashed_thread_idx
        self.loaded_images = []
        self.exception = crashlog.exception
        self.app_specific_thread = None
        if hasattr(crashlog, "asi"):
            self.metadata["asi"] = crashlog.asi
        if hasattr(crashlog, "asb"):
            self.extended_thread_info = crashlog.asb

        crashlog.load_images(self.options, self.loaded_images)

        for thread in crashlog.threads:
            if getattr(thread, "app_specific_backtrace", None):
                # We don't want to include the Application Specific Backtrace
                # Thread into the Scripted Process' Thread list.
                # Instead, we will try to extract the stackframe pcs from the
                # backtrace and inject that as the extended thread info.
                self.app_specific_thread = thread
                continue

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

        if self.app_specific_thread:
            # Takes precedence over any ``asb`` value stored above.
            self.extended_thread_info = CrashLogScriptedThread.resolve_stackframes(
                self.app_specific_thread, self.addr_mask, self.target
            )

    def read_memory_at_address(
        self, addr: int, size: int, error: lldb.SBError
    ) -> lldb.SBData:
        """Return an empty ``SBData``; ``addr``, ``size`` and ``error`` are unused."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the image list filled in by `set_crashlog`."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread class."""
        return f"{CrashLogScriptedThread.__module__}.{CrashLogScriptedThread.__name__}"

    def get_process_metadata(self):
        """Return the metadata dictionary (holds ``"asi"`` when the crash log has it)."""
        return self.metadata
122
123
class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by a single thread record of a crash log."""

    def __init__(self, process, args, crashlog_thread):
        """Mirror the identifying fields of ``crashlog_thread`` onto this thread.

        Args:
            process: Owning scripted process, forwarded to the base class.
            args: Forwarded to the base class unchanged.
            crashlog_thread: Crash log thread record providing ``index``,
                ``id``, ``name``, ``queue``, ``frames`` and ``registers``.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = self.originating_process.crashed_thread_idx == self.idx
        self.create_stackframes()

    def _zeroed_register_ctx(self):
        """Return a dict mapping every known register name to 0."""
        return {reg["name"]: 0 for reg in self.register_info["registers"]}

    def create_register_ctx(self):
        """Build the register name -> value mapping for this thread.

        Only the crashed thread uses the register values recorded in the crash
        log; any other thread, or a crashed thread without recorded registers,
        gets an all-zero context. Registers listed in ``register_info`` but
        absent from the crash log are set to 0.
        """
        if not self.has_crashed:
            return self._zeroed_register_ctx()

        if not self.backing_thread or not self.backing_thread.registers:
            return self._zeroed_register_ctx()

        recorded = self.backing_thread.registers
        for reg in self.register_info["registers"]:
            reg_name = reg["name"]
            self.register_ctx[reg_name] = (
                recorded[reg_name] if reg_name in recorded else 0
            )

        return self.register_ctx

    @staticmethod
    def resolve_stackframes(thread, addr_mask, target):
        """Turn the frames of a crash log thread into ``{"idx", "pc"}`` dicts.

        Args:
            thread: Crash log thread whose ``frames`` expose ``pc`` and ``index``.
            addr_mask: Bit mask and-ed with every frame pc.
            target: Target used to resolve each pc as a load address.

        Returns:
            A list with one ``{"idx": frame.index, "pc": pc}`` entry per frame
            whose address is valid once set as a load address in ``target``.
        """
        frames = []
        for frame in thread.frames:
            frame_pc = frame.pc & addr_mask
            # Frame 0 and null pcs are used as-is; every other pc is moved
            # back by one. NOTE(review): presumably so the address falls
            # inside the call instruction rather than on the return address
            # -- confirm.
            if frame.index == 0 or frame_pc == 0:
                pc = frame_pc
            else:
                pc = frame_pc - 1
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(pc, target)
            if not sym_addr.IsValid():
                continue
            frames.append({"idx": frame.index, "pc": pc})
        return frames

    def create_stackframes(self):
        """Resolve and cache this thread's frames in ``self.frames``.

        Returns:
            The resolved frame list, or ``None`` when the thread has not
            crashed and ``load_all_images`` is off, or when the backing
            thread has no frames.
        """
        if not (self.originating_process.options.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not self.backing_thread.frames:
            return None

        self.frames = CrashLogScriptedThread.resolve_stackframes(
            self.backing_thread, self.originating_process.addr_mask, self.target
        )

        return self.frames

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Describe why the thread stopped.

        Returns:
            ``{"type": eStopReasonNone}`` for threads that did not crash;
            otherwise an ``eStopReasonException`` entry whose ``"data"`` holds
            the process exception under ``"mach_exception"`` when one is set.
        """
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone}
        # TODO: Investigate what stop reason should be reported when crashed
        stop_reason = {"type": lldb.eStopReasonException, "data": {}}
        if self.originating_process.exception:
            stop_reason["data"]["mach_exception"] = self.originating_process.exception
        return stop_reason

    def get_register_context(self) -> bytes:
        """Pack the register values, in dict order, as unsigned 64-bit integers.

        The register context is created on first use.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack(
            f"{len(self.register_ctx)}Q", *self.register_ctx.values()
        )

    def get_extended_info(self):
        """Return the extended info, taken from the process for the crashed thread."""
        if self.has_crashed:
            self.extended_info = self.originating_process.extended_thread_info
        return self.extended_info
207