1import os, json, struct, signal, uuid, tempfile 2 3from typing import Any, Dict 4 5import lldb 6from lldb.plugins.scripted_process import ScriptedProcess 7from lldb.plugins.scripted_process import ScriptedThread 8 9from lldb.macosx.crashlog import CrashLog, CrashLogParser 10 11 12class CrashLogScriptedProcess(ScriptedProcess): 13 def set_crashlog(self, crashlog): 14 self.crashlog = crashlog 15 if self.crashlog.process_id: 16 if type(self.crashlog.process_id) is int: 17 self.pid = self.crashlog.process_id 18 elif type(self.crashlog.process_id) is str: 19 self.pid = int(self.crashlog.process_id, 0) 20 else: 21 self.pid = super().get_process_id() 22 self.addr_mask = self.crashlog.addr_mask 23 self.crashed_thread_idx = self.crashlog.crashed_thread_idx 24 self.loaded_images = [] 25 self.exception = self.crashlog.exception 26 self.app_specific_thread = None 27 if hasattr(self.crashlog, "asi"): 28 self.metadata["asi"] = self.crashlog.asi 29 if hasattr(self.crashlog, "asb"): 30 self.extended_thread_info = self.crashlog.asb 31 32 if self.load_all_images: 33 for image in self.crashlog.images: 34 image.resolve = True 35 else: 36 for thread in self.crashlog.threads: 37 if thread.did_crash(): 38 for ident in thread.idents: 39 for image in self.crashlog.find_images_with_identifier(ident): 40 image.resolve = True 41 42 with tempfile.TemporaryDirectory() as obj_dir: 43 for image in self.crashlog.images: 44 if image not in self.loaded_images: 45 if image.uuid == uuid.UUID(int=0): 46 continue 47 err = image.add_module(self.target, obj_dir) 48 if err: 49 # Append to SBCommandReturnObject 50 print(err) 51 else: 52 self.loaded_images.append(image) 53 54 for thread in self.crashlog.threads: 55 if ( 56 hasattr(thread, "app_specific_backtrace") 57 and thread.app_specific_backtrace 58 ): 59 # We don't want to include the Application Specific Backtrace 60 # Thread into the Scripted Process' Thread list. 61 # Instead, we will try to extract the stackframe pcs from the 62 # backtrace and inject that as the extended thread info. 63 self.app_specific_thread = thread 64 continue 65 66 self.threads[thread.index] = CrashLogScriptedThread(self, None, thread) 67 68 if self.app_specific_thread: 69 self.extended_thread_info = CrashLogScriptedThread.resolve_stackframes( 70 self.app_specific_thread, self.addr_mask, self.target 71 ) 72 73 def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData): 74 super().__init__(exe_ctx, args) 75 76 if not self.target or not self.target.IsValid(): 77 # Return error 78 return 79 80 self.crashlog_path = None 81 82 crashlog_path = args.GetValueForKey("file_path") 83 if crashlog_path and crashlog_path.IsValid(): 84 if crashlog_path.GetType() == lldb.eStructuredDataTypeString: 85 self.crashlog_path = crashlog_path.GetStringValue(4096) 86 87 if not self.crashlog_path: 88 # Return error 89 return 90 91 load_all_images = args.GetValueForKey("load_all_images") 92 if load_all_images and load_all_images.IsValid(): 93 if load_all_images.GetType() == lldb.eStructuredDataTypeBoolean: 94 self.load_all_images = load_all_images.GetBooleanValue() 95 96 if not self.load_all_images: 97 self.load_all_images = False 98 99 self.pid = super().get_process_id() 100 self.crashed_thread_idx = 0 101 self.exception = None 102 self.extended_thread_info = None 103 104 def read_memory_at_address( 105 self, addr: int, size: int, error: lldb.SBError 106 ) -> lldb.SBData: 107 # NOTE: CrashLogs don't contain any memory. 108 return lldb.SBData() 109 110 def get_loaded_images(self): 111 # TODO: Iterate over corefile_target modules and build a data structure 112 # from it. 113 return self.loaded_images 114 115 def should_stop(self) -> bool: 116 return True 117 118 def is_alive(self) -> bool: 119 return True 120 121 def get_scripted_thread_plugin(self): 122 return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__ 123 124 def get_process_metadata(self): 125 return self.metadata 126 127 128class CrashLogScriptedThread(ScriptedThread): 129 def create_register_ctx(self): 130 if not self.has_crashed: 131 return dict.fromkeys( 132 [*map(lambda reg: reg["name"], self.register_info["registers"])], 0 133 ) 134 135 if not self.backing_thread or not len(self.backing_thread.registers): 136 return dict.fromkeys( 137 [*map(lambda reg: reg["name"], self.register_info["registers"])], 0 138 ) 139 140 for reg in self.register_info["registers"]: 141 reg_name = reg["name"] 142 if reg_name in self.backing_thread.registers: 143 self.register_ctx[reg_name] = self.backing_thread.registers[reg_name] 144 else: 145 self.register_ctx[reg_name] = 0 146 147 return self.register_ctx 148 149 def resolve_stackframes(thread, addr_mask, target): 150 frames = [] 151 for frame in thread.frames: 152 frame_pc = frame.pc & addr_mask 153 pc = frame_pc if frame.index == 0 or frame_pc == 0 else frame_pc - 1 154 sym_addr = lldb.SBAddress() 155 sym_addr.SetLoadAddress(pc, target) 156 if not sym_addr.IsValid(): 157 continue 158 frames.append({"idx": frame.index, "pc": pc}) 159 return frames 160 161 def create_stackframes(self): 162 if not (self.scripted_process.load_all_images or self.has_crashed): 163 return None 164 165 if not self.backing_thread or not len(self.backing_thread.frames): 166 return None 167 168 self.frames = CrashLogScriptedThread.resolve_stackframes( 169 self.backing_thread, self.scripted_process.addr_mask, self.target 170 ) 171 172 return self.frames 173 174 def __init__(self, process, args, crashlog_thread): 175 super().__init__(process, args) 176 177 self.backing_thread = crashlog_thread 178 self.idx = self.backing_thread.index 179 self.tid = self.backing_thread.id 180 if self.backing_thread.app_specific_backtrace: 181 self.name = "Application Specific Backtrace" 182 else: 183 self.name = self.backing_thread.name 184 self.queue = self.backing_thread.queue 185 self.has_crashed = self.scripted_process.crashed_thread_idx == self.idx 186 self.create_stackframes() 187 188 def get_state(self): 189 if not self.has_crashed: 190 return lldb.eStateStopped 191 return lldb.eStateCrashed 192 193 def get_stop_reason(self) -> Dict[str, Any]: 194 if not self.has_crashed: 195 return {"type": lldb.eStopReasonNone} 196 # TODO: Investigate what stop reason should be reported when crashed 197 stop_reason = {"type": lldb.eStopReasonException, "data": {}} 198 if self.scripted_process.exception: 199 stop_reason["data"]["mach_exception"] = self.scripted_process.exception 200 return stop_reason 201 202 def get_register_context(self) -> str: 203 if not self.register_ctx: 204 self.register_ctx = self.create_register_ctx() 205 206 return struct.pack( 207 "{}Q".format(len(self.register_ctx)), *self.register_ctx.values() 208 ) 209 210 def get_extended_info(self): 211 if self.has_crashed: 212 self.extended_info = self.scripted_process.extended_thread_info 213 return self.extended_info 214