import os, json, struct, signal, uuid, tempfile

from typing import Any, Dict

import lldb
from lldb.plugins.scripted_process import ScriptedProcess
from lldb.plugins.scripted_process import ScriptedThread

from lldb.macosx.crashlog import CrashLog, CrashLogParser


class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose threads and images come from a parsed crashlog."""

    class CrashLogOptions:
        """Boolean switches read from the scripted-process arguments."""

        load_all_images = False
        crashed_only = True
        no_parallel_image_loading = False

    # Names of the boolean keys looked up in the structured-data arguments;
    # each one maps to the CrashLogOptions attribute of the same name.
    _BOOLEAN_OPTION_KEYS = (
        "load_all_images",
        "crashed_only",
        "no_parallel_image_loading",
    )

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Read the crashlog path and the options out of ``args``.

        Construction stops early (without raising) when the target is invalid
        or when ``args`` carries no string ``file_path`` entry. Every
        attribute this class adds is given its default before those early
        exits, so the instance can still be queried afterwards.
        """
        super().__init__(exe_ctx, args)

        # Defaults first: the early returns below must not leave the object
        # without the attributes that set_crashlog() and the threads read.
        self.crashlog_path = None
        self.options = self.CrashLogOptions()
        self.crashed_thread_idx = 0
        self.exception = None
        self.extended_thread_info = None

        if not self.target or not self.target.IsValid():
            # Return error
            return

        crashlog_path = args.GetValueForKey("file_path")
        if crashlog_path and crashlog_path.IsValid():
            if crashlog_path.GetType() == lldb.eStructuredDataTypeString:
                self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            # Return error
            return

        # An option is only overridden when the key is present, valid and
        # actually holds a boolean; otherwise the class default stays.
        for key in self._BOOLEAN_OPTION_KEYS:
            value = args.GetValueForKey(key)
            if (
                value
                and value.IsValid()
                and value.GetType() == lldb.eStructuredDataTypeBoolean
            ):
                setattr(self.options, key, value.GetBooleanValue())

        self.pid = super().get_process_id()

    def set_crashlog(self, crashlog):
        """Attach a parsed crashlog and build the thread list from it.

        Copies the pid, address mask, crashed thread index and exception from
        ``crashlog``, loads its images, and creates one CrashLogScriptedThread
        per crashlog thread. A thread flagged as the application specific
        backtrace is kept out of the thread list; its frames become the
        extended thread info instead.
        """
        self.crashlog = crashlog

        process_id = self.crashlog.process_id
        if process_id:
            # Exact type checks are kept as in the original implementation.
            if type(process_id) is int:
                self.pid = process_id
            elif type(process_id) is str:
                # Base 0 lets int() pick the radix from the string prefix.
                self.pid = int(process_id, 0)
            else:
                self.pid = super().get_process_id()

        self.addr_mask = self.crashlog.addr_mask
        self.crashed_thread_idx = self.crashlog.crashed_thread_idx
        self.loaded_images = []
        self.exception = self.crashlog.exception
        self.app_specific_thread = None
        if hasattr(self.crashlog, "asi"):
            self.metadata["asi"] = self.crashlog.asi
        if hasattr(self.crashlog, "asb"):
            self.extended_thread_info = self.crashlog.asb

        crashlog.load_images(self.options, self.loaded_images)

        for thread in self.crashlog.threads:
            if getattr(thread, "app_specific_backtrace", False):
                # We don't want to include the Application Specific Backtrace
                # Thread into the Scripted Process' Thread list.
                # Instead, we will try to extract the stackframe pcs from the
                # backtrace and inject that as the extended thread info.
                self.app_specific_thread = thread
                continue

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

        if self.app_specific_thread:
            self.extended_thread_info = CrashLogScriptedThread.resolve_stackframes(
                self.app_specific_thread, self.addr_mask, self.target
            )

    def read_memory_at_address(
        self, addr: int, size: int, error: lldb.SBError
    ) -> lldb.SBData:
        """Return an empty SBData regardless of the requested range."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the image list populated by set_crashlog()."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread class."""
        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__

    def get_process_metadata(self):
        """Return the metadata dictionary (holds "asi" when the crashlog has it)."""
        return self.metadata


class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by one thread entry of a parsed crashlog."""

    def create_register_ctx(self):
        """Build the register-name -> value mapping for this thread.

        Threads that did not crash, or that have no backing thread or no
        recorded registers, get every register set to 0. The crashed thread
        gets the recorded value for each known register and 0 for the rest.
        """
        if (
            not self.has_crashed
            or not self.backing_thread
            or not len(self.backing_thread.registers)
        ):
            return dict.fromkeys(
                [reg["name"] for reg in self.register_info["registers"]], 0
            )

        recorded = self.backing_thread.registers
        for reg in self.register_info["registers"]:
            reg_name = reg["name"]
            self.register_ctx[reg_name] = (
                recorded[reg_name] if reg_name in recorded else 0
            )

        return self.register_ctx

    @staticmethod
    def resolve_stackframes(thread, addr_mask, target):
        """Turn the frames of ``thread`` into a list of ``{"idx", "pc"}`` dicts.

        Each frame pc is masked with ``addr_mask``. For every frame other than
        index 0 whose masked pc is non-zero, the pc is decremented by one.
        NOTE(review): presumably this moves a return address back into the
        calling instruction so that it symbolicates correctly -- confirm.
        Frames whose address is not valid once set as a load address in
        ``target`` are skipped.
        """
        frames = []
        for frame in thread.frames:
            frame_pc = frame.pc & addr_mask
            pc = frame_pc if frame.index == 0 or frame_pc == 0 else frame_pc - 1
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(pc, target)
            if not sym_addr.IsValid():
                continue
            frames.append({"idx": frame.index, "pc": pc})
        return frames

    def create_stackframes(self):
        """Populate and return ``self.frames``, or return None when skipped.

        Frames are only resolved for the crashed thread, unless the
        ``load_all_images`` option is set, and only when the backing thread
        actually has frames.
        """
        if not (self.originating_process.options.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not len(self.backing_thread.frames):
            return None

        self.frames = CrashLogScriptedThread.resolve_stackframes(
            self.backing_thread, self.originating_process.addr_mask, self.target
        )

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Mirror index, id, name and queue from ``crashlog_thread``.

        Also records whether this is the crashed thread and resolves its
        stack frames where applicable.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = self.originating_process.crashed_thread_idx == self.idx
        self.create_stackframes()

    def get_state(self):
        """Return eStateCrashed for the crashed thread, eStateStopped otherwise."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop-reason dictionary for this thread.

        Non-crashed threads report eStopReasonNone. The crashed thread reports
        eStopReasonException, with the process exception attached under
        ``data["mach_exception"]`` when one is available.
        """
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone}
        # TODO: Investigate what stop reason should be reported when crashed
        stop_reason = {"type": lldb.eStopReasonException, "data": {}}
        if self.originating_process.exception:
            stop_reason["data"]["mach_exception"] = self.originating_process.exception
        return stop_reason

    def get_register_context(self) -> bytes:
        """Return the register values packed as consecutive "Q" fields.

        The register context is created on first use. Values are packed in the
        iteration order of ``self.register_ctx``.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack(
            "{}Q".format(len(self.register_ctx)), *self.register_ctx.values()
        )

    def get_extended_info(self):
        """Return the extended info, taken from the process for the crashed thread."""
        if self.has_crashed:
            self.extended_info = self.originating_process.extended_thread_info
        return self.extended_info