xref: /llvm-project/lldb/examples/python/crashlog_scripted_process.py (revision 6bff2d51dcc7a369ba82fd44faa9eb207c621267)
1import os, json, struct, signal, uuid, tempfile
2
3from typing import Any, Dict
4
5import lldb
6from lldb.plugins.scripted_process import ScriptedProcess
7from lldb.plugins.scripted_process import ScriptedThread
8
9from lldb.macosx.crashlog import CrashLog, CrashLogParser
10
11
class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose state is rebuilt from a parsed crash log.

    The constructor reads the ``file_path`` and optional ``load_all_images``
    keys from ``args``; the crash log contents themselves are attached
    afterwards through :meth:`set_crashlog`.
    """

    def set_crashlog(self, crashlog):
        """Populate this process (pid, images, threads) from ``crashlog``.

        Args:
            crashlog: Parsed crash log. This method reads its ``process_id``,
                ``addr_mask``, ``crashed_thread_idx``, ``exception``,
                ``images`` and ``threads`` attributes, plus the optional
                ``asi`` and ``asb`` attributes when present.
        """
        self.crashlog = crashlog

        process_id = self.crashlog.process_id
        if process_id:
            if isinstance(process_id, int):
                self.pid = process_id
            elif isinstance(process_id, str):
                # Base 0 lets int() honour prefixes such as "0x".
                self.pid = int(process_id, 0)
            else:
                self.pid = super().get_process_id()

        self.addr_mask = self.crashlog.addr_mask
        self.crashed_thread_idx = self.crashlog.crashed_thread_idx
        self.loaded_images = []
        self.exception = self.crashlog.exception
        self.app_specific_thread = None
        if hasattr(self.crashlog, "asi"):
            self.metadata["asi"] = self.crashlog.asi
        if hasattr(self.crashlog, "asb"):
            self.extended_thread_info = self.crashlog.asb

        # Flag which images should be resolved: either all of them, or only
        # the ones referenced by a thread that crashed.
        if self.load_all_images:
            for image in self.crashlog.images:
                image.resolve = True
        else:
            for thread in self.crashlog.threads:
                if not thread.did_crash():
                    continue
                for ident in thread.idents:
                    for image in self.crashlog.find_images_with_identifier(ident):
                        image.resolve = True

        null_uuid = uuid.UUID(int=0)
        with tempfile.TemporaryDirectory() as obj_dir:
            for image in self.crashlog.images:
                if image in self.loaded_images:
                    continue
                # Images carrying an all-zero UUID are skipped.
                if image.uuid == null_uuid:
                    continue
                err = image.add_module(self.target, obj_dir)
                if err:
                    # NOTE(review): the original comment says this should be
                    # appended to an SBCommandReturnObject; for now the error
                    # is only printed.
                    print(err)
                else:
                    self.loaded_images.append(image)

        for thread in self.crashlog.threads:
            if getattr(thread, "app_specific_backtrace", None):
                # We don't want to include the Application Specific Backtrace
                # Thread into the Scripted Process' Thread list.
                # Instead, we will try to extract the stackframe pcs from the
                # backtrace and inject that as the extended thread info.
                self.app_specific_thread = thread
                continue

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

        if self.app_specific_thread:
            # Takes precedence over any `asb` value stored above.
            self.extended_thread_info = CrashLogScriptedThread.resolve_stackframes(
                self.app_specific_thread, self.addr_mask, self.target
            )

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Read the crash log path and options from ``args``.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Structured data. ``file_path`` (string) is required;
                ``load_all_images`` (boolean) is optional and defaults to
                ``False``.

        The constructor returns early, without raising, when the target is
        invalid or no ``file_path`` string was supplied.
        """
        super().__init__(exe_ctx, args)

        # Establish every default before validating the inputs so that the
        # attributes exist even when one of the early returns below is taken,
        # and so that `load_all_images` is never read before being assigned.
        self.crashlog_path = None
        self.load_all_images = False
        self.pid = super().get_process_id()
        self.crashed_thread_idx = 0
        self.exception = None
        self.extended_thread_info = None

        if not self.target or not self.target.IsValid():
            # Return error
            return

        crashlog_path = args.GetValueForKey("file_path")
        if crashlog_path and crashlog_path.IsValid():
            if crashlog_path.GetType() == lldb.eStructuredDataTypeString:
                self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            # Return error
            return

        load_all_images = args.GetValueForKey("load_all_images")
        if load_all_images and load_all_images.IsValid():
            if load_all_images.GetType() == lldb.eStructuredDataTypeBoolean:
                self.load_all_images = bool(load_all_images.GetBooleanValue())

    def read_memory_at_address(
        self, addr: int, size: int, error: lldb.SBError
    ) -> lldb.SBData:
        """Return an empty ``SBData``; ``addr``, ``size`` and ``error`` are unused."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the list of images that ``set_crashlog`` managed to add."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread class."""
        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__

    def get_process_metadata(self):
        """Return the metadata dictionary (holds ``asi`` when the crash log has it)."""
        return self.metadata
126
127
class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by one thread entry of a parsed crash log."""

    def create_register_ctx(self):
        """Build the mapping of register name to value for this thread.

        Returns:
            A dict keyed by every name listed in
            ``self.register_info["registers"]``. All values are 0 unless this
            is the crashed thread and the backing thread recorded registers,
            in which case recorded values are used and missing ones are 0.
        """
        register_names = [reg["name"] for reg in self.register_info["registers"]]

        recorded = self.backing_thread.registers if self.backing_thread else None
        if not self.has_crashed or not recorded:
            # Nothing usable was captured for this thread: report zeroes.
            return dict.fromkeys(register_names, 0)

        for reg_name in register_names:
            self.register_ctx[reg_name] = (
                recorded[reg_name] if reg_name in recorded else 0
            )

        return self.register_ctx

    @staticmethod
    def resolve_stackframes(thread, addr_mask, target):
        """Turn the frames of ``thread`` into a list of ``{"idx", "pc"}`` dicts.

        Args:
            thread: Object exposing ``frames``; each frame provides ``pc`` and
                ``index``.
            addr_mask: Bit mask AND-ed with every frame pc.
            target: Target used to resolve each pc as a load address.

        Returns:
            One dict per frame whose address resolved to a valid ``SBAddress``;
            frames that did not resolve are left out.
        """
        frames = []
        for frame in thread.frames:
            frame_pc = frame.pc & addr_mask
            # Frame 0 and null pcs are kept as-is; every other pc is moved back
            # by one (presumably because it is a return address and the lookup
            # should land inside the calling instruction -- TODO confirm).
            pc = frame_pc if frame.index == 0 or frame_pc == 0 else frame_pc - 1
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(pc, target)
            if not sym_addr.IsValid():
                continue
            frames.append({"idx": frame.index, "pc": pc})
        return frames

    def create_stackframes(self):
        """Compute and cache ``self.frames`` from the backing thread.

        Returns:
            The resolved frame list, or ``None`` when the thread is neither
            crashed nor covered by ``load_all_images``, or has no frames.
        """
        if not (self.scripted_process.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not self.backing_thread.frames:
            return None

        self.frames = CrashLogScriptedThread.resolve_stackframes(
            self.backing_thread, self.scripted_process.addr_mask, self.target
        )

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Mirror the identity of ``crashlog_thread`` and resolve its frames.

        Args:
            process: Owning scripted process, forwarded to the base class.
            args: Forwarded to the base class.
            crashlog_thread: Crash log thread providing ``index``, ``id``,
                ``name``, ``queue``, ``frames`` and ``registers``.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        # The attribute is looked up defensively, matching how the process
        # class treats it as optional.
        if getattr(self.backing_thread, "app_specific_backtrace", False):
            self.name = "Application Specific Backtrace"
        else:
            self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = self.scripted_process.crashed_thread_idx == self.idx
        self.create_stackframes()

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop reason dictionary for this thread.

        Non-crashed threads report ``eStopReasonNone``. The crashed thread
        reports ``eStopReasonException`` and, when the process recorded an
        exception, carries it under ``data["mach_exception"]``.
        """
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone}
        # TODO: Investigate what stop reason should be reported when crashed
        stop_reason = {"type": lldb.eStopReasonException, "data": {}}
        if self.scripted_process.exception:
            stop_reason["data"]["mach_exception"] = self.scripted_process.exception
        return stop_reason

    def get_register_context(self) -> bytes:
        """Return the register values packed as consecutive unsigned 64-bit ints.

        The register mapping is created on first use and packed in the
        mapping's iteration order.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack(
            "{}Q".format(len(self.register_ctx)), *self.register_ctx.values()
        )

    def get_extended_info(self):
        """Return the extended info; the crashed thread uses the process-level one."""
        if self.has_crashed:
            self.extended_info = self.scripted_process.extended_thread_info
        return self.extended_info
214