# DExTer : Debugging Experience Tester
# ~~~~~~   ~         ~~         ~   ~~
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Conditional Controller Class for DExTer."""


import os
import time
from collections import defaultdict
from itertools import chain
from typing import Optional

from dex.debugger.DebuggerControllers.ControllerHelpers import (
    in_source_file,
    update_step_watches,
)
from dex.debugger.DebuggerControllers.DebuggerControllerBase import (
    DebuggerControllerBase,
)
from dex.debugger.DebuggerBase import DebuggerBase
from dex.utils.Exceptions import DebuggerException
from dex.utils.Timeout import Timeout


class BreakpointRange:
    """A range of breakpoints and a set of conditions.

    The leading breakpoint (on line `range_from`) is always active.

    When the leading breakpoint is hit the trailing range should be activated
    when `expression` evaluates to any value in `values`. If there are no
    conditions (`expression` is None) then the trailing breakpoint range should
    always be activated upon hitting the leading breakpoint.

    Args:
        expression: None for no conditions, or a str expression to compare
        against `values`.

        path: file path handed to the debugger when setting the leading and
        trailing breakpoints.

        range_from: line of the leading breakpoint.

        range_to: last line (inclusive) of the trailing breakpoint range.

        values: values that `expression` is compared against; one
        `(<expression>) == (<value>)` condition is generated per entry.

        hit_count: None for no limit, or int to set the number of times the
        leading breakpoint is triggered before it is removed.

        finish_on_remove: when True, the controller stops stepping once the
        leading breakpoint has reached its hit limit and is removed.
    """

    def __init__(
        self,
        expression: Optional[str],
        path: str,
        range_from: int,
        range_to: int,
        values: list,
        hit_count: Optional[int],
        finish_on_remove: bool,
    ):
        self.expression = expression
        self.path = path
        self.range_from = range_from
        self.range_to = range_to
        self.conditional_values = values
        self.max_hit_count = hit_count
        self.current_hit_count = 0
        self.finish_on_remove = finish_on_remove

    def has_conditions(self):
        """Return True if a conditional expression guards this range."""
        return self.expression is not None

    def get_conditional_expression_list(self):
        """Return one `(<expression>) == (<value>)` string per value."""
        return [
            f"({self.expression}) == ({value})" for value in self.conditional_values
        ]

    def add_hit(self):
        """Record one more hit of the leading breakpoint."""
        self.current_hit_count += 1

    def should_be_removed(self):
        """Return True once the hit limit (if there is one) has been reached."""
        if self.max_hit_count is None:
            return False
        return self.current_hit_count >= self.max_hit_count


class ConditionalController(DebuggerControllerBase):
    """Controller that only steps inside ranges described by DexLimitSteps.

    A leading breakpoint is set for every DexLimitSteps (and DexFinishTest)
    command. Each time a leading breakpoint is hit, breakpoints are added on
    the remaining lines of its range so that a step is recorded for each of
    them.
    """

    def __init__(self, context, step_collection):
        self._bp_ranges = None
        self._watches = set()
        self._step_index = 0
        self._pause_between_steps = context.options.pause_between_steps
        self._max_steps = context.options.max_steps
        # Map {id: BreakpointRange}
        self._leading_bp_handles = {}
        super().__init__(context, step_collection)
        self._build_bp_ranges()

    def _build_bp_ranges(self):
        """Populate `self._bp_ranges` from the collected commands.

        Raises:
            DebuggerException: if there are no DexLimitSteps commands.
        """
        commands = self.step_collection.commands
        self._bp_ranges = []
        # Keep the try body to the lookup alone so that a KeyError raised
        # while reading a command's attributes is not misreported as a
        # missing DexLimitSteps command.
        try:
            limit_commands = commands["DexLimitSteps"]
        except KeyError:
            raise DebuggerException(
                "Missing DexLimitSteps commands, cannot conditionally step."
            )
        for lc in limit_commands:
            self._bp_ranges.append(
                BreakpointRange(
                    lc.expression,
                    lc.path,
                    lc.from_line,
                    lc.to_line,
                    lc.values,
                    lc.hit_count,
                    False,
                )
            )
        if "DexFinishTest" in commands:
            for ic in commands["DexFinishTest"]:
                # A finish command is a single-line range (no trailing
                # breakpoints) whose leading breakpoint is removed, ending the
                # run, on hit number `hit_count + 1`.
                # NOTE(review): assumes DexFinishTest always supplies an int
                # hit_count (None would raise TypeError here) - confirm.
                self._bp_ranges.append(
                    BreakpointRange(
                        ic.expression,
                        ic.path,
                        ic.on_line,
                        ic.on_line,
                        ic.values,
                        ic.hit_count + 1,
                        True,
                    )
                )

    def _set_leading_bps(self):
        """Set the leading breakpoint(s) for every BreakpointRange.

        Builds the map of {leading bp id: BreakpointRange} in
        `self._leading_bp_handles`.
        """
        for bpr in self._bp_ranges:
            if bpr.has_conditions():
                # Add a conditional breakpoint for each condition.
                bp_ids = [
                    self.debugger.add_conditional_breakpoint(
                        bpr.path, bpr.range_from, cond_expr
                    )
                    for cond_expr in bpr.get_conditional_expression_list()
                ]
            else:
                # Add an unconditional breakpoint.
                bp_ids = [self.debugger.add_breakpoint(bpr.path, bpr.range_from)]
            for bp_id in bp_ids:
                self._leading_bp_handles[bp_id] = bpr

    def _wait_for_debugger_stop(self, total_timeout):
        """Wait until the debugger stops running or a timeout expires.

        Args:
            total_timeout: Timeout covering the whole debugging session.

        Returns:
            True if the total or the per-breakpoint timeout was reached,
            otherwise False.
        """
        # The per-breakpoint timeout restarts for every wait.
        breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
        timed_out = False
        # NOTE(review): this loop polls `is_running` without sleeping.
        while self.debugger.is_running and not timed_out:
            # Check to see whether we've timed out while we're waiting.
            if total_timeout.timed_out():
                self.context.logger.error(
                    "Debugger session has been "
                    f"running for {total_timeout.elapsed}s, timeout reached!"
                )
                timed_out = True
            if breakpoint_timeout.timed_out():
                self.context.logger.error(
                    "Debugger session has not "
                    f"hit a breakpoint for {breakpoint_timeout.elapsed}s, timeout "
                    "reached!"
                )
                timed_out = True
        return timed_out

    def _process_triggered_breakpoints(self):
        """Update breakpoints according to the ones that were just triggered.

        Returns:
            True if a breakpoint range flagged `finish_on_remove` reached its
            hit limit, meaning the debugging session should end.
        """
        exit_desired = False
        bp_to_delete = []
        for bp_id in self.debugger.get_triggered_breakpoint_ids():
            try:
                # See if this is one of our leading breakpoints.
                bpr = self._leading_bp_handles[bp_id]
            except KeyError:
                # This is a trailing bp. Mark it for removal.
                bp_to_delete.append(bp_id)
                continue

            bpr.add_hit()
            if bpr.should_be_removed():
                if bpr.finish_on_remove:
                    exit_desired = True
                bp_to_delete.append(bp_id)
                del self._leading_bp_handles[bp_id]
            # Add a range of trailing breakpoints covering the lines
            # requested in the DexLimitSteps command. Ignore first line as
            # that's covered by the leading bp we just hit and include the
            # final line.
            for line in range(bpr.range_from + 1, bpr.range_to + 1):
                self.debugger.add_breakpoint(bpr.path, line)

        # Remove any trailing or expired leading breakpoints we just hit.
        self.debugger.delete_breakpoints(bp_to_delete)
        return exit_desired

    def _run_debugger_custom(self, cmdline):
        """Launch the debuggee and record steps inside the breakpoint ranges.

        Args:
            cmdline: passed unchanged to `self.debugger.launch`.

        Raises:
            DebuggerException: if the debugger is dbgeng, which is not
            supported for DexLimitSteps commands.
        """
        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
        if self.debugger.get_name() == "dbgeng":
            raise DebuggerException(
                "DexLimitSteps commands are not supported by dbgeng"
            )

        self.step_collection.clear_steps()
        self._set_leading_bps()

        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
            self._watches.update(command_obj.get_watches())

        self.debugger.launch(cmdline)
        time.sleep(self._pause_between_steps)

        total_timeout = Timeout(self.context.options.timeout_total)

        while not self.debugger.is_finished:
            if self._wait_for_debugger_stop(total_timeout):
                break

            step_info = self.debugger.get_step_info(self._watches, self._step_index)
            if step_info.current_frame:
                self._step_index += 1
                update_step_watches(
                    step_info, self._watches, self.step_collection.commands
                )
                self.step_collection.new_step(self.context, step_info)

            if self._process_triggered_breakpoints():
                break
            self.debugger.go()
            time.sleep(self._pause_between_steps)