# DExTer : Debugging Experience Tester
# ~~~~~~   ~         ~~         ~   ~~
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Conditional Controller Class for DExTer."""


import os
import time
from collections import defaultdict
from itertools import chain

from dex.debugger.DebuggerControllers.ControllerHelpers import in_source_file, update_step_watches
from dex.debugger.DebuggerControllers.DebuggerControllerBase import DebuggerControllerBase
from dex.debugger.DebuggerBase import DebuggerBase
from dex.utils.Exceptions import DebuggerException
from dex.utils.Timeout import Timeout


class BreakpointRange:
    """A range of breakpoints and a set of conditions.

    The leading breakpoint (on line `range_from`) is always active.

    When the leading breakpoint is hit the trailing range should be activated
    when `expression` evaluates to any value in `values`. If there are no
    conditions (`expression` is None) then the trailing breakpoint range should
    always be activated upon hitting the leading breakpoint.

    Args:
        expression: None for no conditions, or a str expression to compare
        against `values`.

        path: Source path passed to the debugger when setting breakpoints.

        range_from: Line of the leading breakpoint.

        range_to: Last line (inclusive) of the trailing breakpoint range.

        values: Values that `expression` is compared against.

        hit_count: None for no limit, or int to set the number of times the
        leading breakpoint is triggered before it is removed.

        finish_on_remove: If True, the controller stops the debugging session
        once the leading breakpoint has expired.
    """

    def __init__(self, expression: str, path: str, range_from: int, range_to: int,
                 values: list, hit_count: int, finish_on_remove: bool):
        self.expression = expression
        self.path = path
        self.range_from = range_from
        self.range_to = range_to
        self.conditional_values = values
        self.max_hit_count = hit_count
        self.current_hit_count = 0
        self.finish_on_remove = finish_on_remove

    def has_conditions(self):
        """Return True if this range is guarded by a conditional expression."""
        return self.expression is not None

    def get_conditional_expression_list(self):
        """Return one '(<expression>) == (<value>)' string per conditional value."""
        return ['({}) == ({})'.format(self.expression, value)
                for value in self.conditional_values]

    def add_hit(self):
        """Record one more hit of the leading breakpoint."""
        self.current_hit_count += 1

    def should_be_removed(self):
        """Return True once the hit limit is reached; never when the limit is None."""
        if self.max_hit_count is None:
            return False
        return self.current_hit_count >= self.max_hit_count


class ConditionalController(DebuggerControllerBase):
    """Debugger controller driven by DexLimitSteps / DexFinishTest commands.

    A leading breakpoint is set for every BreakpointRange. Each time one is
    hit, unconditional trailing breakpoints are added over the rest of its
    line range; trailing breakpoints are deleted again as soon as they are
    hit.
    """

    def __init__(self, context, step_collection):
        self._bp_ranges = None
        self._watches = set()
        self._step_index = 0
        self._pause_between_steps = context.options.pause_between_steps
        self._max_steps = context.options.max_steps
        # Map {id: BreakpointRange}
        self._leading_bp_handles = {}
        super().__init__(context, step_collection)
        self._build_bp_ranges()

    def _build_bp_ranges(self):
        """Populate `self._bp_ranges` from the step collection's commands.

        Raises:
            DebuggerException: if there are no DexLimitSteps commands.
        """
        commands = self.step_collection.commands
        self._bp_ranges = []
        # Keep the try body to the lookup alone so that an unrelated KeyError
        # raised while building the ranges is not misreported as a missing
        # command.
        try:
            limit_commands = commands['DexLimitSteps']
        except KeyError:
            raise DebuggerException('Missing DexLimitSteps commands, cannot conditionally step.')
        for lc in limit_commands:
            bpr = BreakpointRange(
                lc.expression,
                lc.path,
                lc.from_line,
                lc.to_line,
                lc.values,
                lc.hit_count,
                False)
            self._bp_ranges.append(bpr)
        if 'DexFinishTest' in commands:
            finish_commands = commands['DexFinishTest']
            for ic in finish_commands:
                # A finish command is a single-line range (from == to) whose
                # hit limit is one more than the command's hit_count and which
                # ends the session when it expires.
                bpr = BreakpointRange(
                    ic.expression,
                    ic.path,
                    ic.on_line,
                    ic.on_line,
                    ic.values,
                    ic.hit_count + 1,
                    True)
                self._bp_ranges.append(bpr)

    def _set_leading_bps(self):
        """Set the leading breakpoint(s) for every BreakpointRange."""
        # Set a leading breakpoint for each BreakpointRange, building a
        # map of {leading bp id: BreakpointRange}.
        for bpr in self._bp_ranges:
            if bpr.has_conditions():
                # Add a conditional breakpoint for each condition.
                for cond_expr in bpr.get_conditional_expression_list():
                    bp_id = self.debugger.add_conditional_breakpoint(bpr.path,
                                                                     bpr.range_from,
                                                                     cond_expr)
                    self._leading_bp_handles[bp_id] = bpr
            else:
                # Add an unconditional breakpoint.
                bp_id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
                self._leading_bp_handles[bp_id] = bpr

    def _run_debugger_custom(self, cmdline):
        """Launch the debuggee and record steps until it finishes.

        The loop also ends early when a timeout is reached or when an expired
        leading breakpoint with `finish_on_remove` set is hit.

        Raises:
            DebuggerException: if the debugger is dbgeng.
        """
        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
        if self.debugger.get_name() == 'dbgeng':
            raise DebuggerException('DexLimitSteps commands are not supported by dbgeng')

        self.step_collection.clear_steps()
        self._set_leading_bps()

        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
            self._watches.update(command_obj.get_watches())

        self.debugger.launch(cmdline)
        time.sleep(self._pause_between_steps)

        exit_desired = False
        timed_out = False
        total_timeout = Timeout(self.context.options.timeout_total)

        while not self.debugger.is_finished:

            # The breakpoint timeout is restarted for every wait; the total
            # timeout spans the whole session.
            breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
            while self.debugger.is_running and not timed_out:
                # Check to see whether we've timed out while we're waiting.
                if total_timeout.timed_out():
                    self.context.logger.error('Debugger session has been '
                        f'running for {total_timeout.elapsed}s, timeout reached!')
                    timed_out = True
                if breakpoint_timeout.timed_out():
                    self.context.logger.error(f'Debugger session has not '
                        f'hit a breakpoint for {breakpoint_timeout.elapsed}s, timeout '
                        'reached!')
                    timed_out = True

            if timed_out:
                break

            step_info = self.debugger.get_step_info(self._watches, self._step_index)
            if step_info.current_frame:
                self._step_index += 1
                update_step_watches(step_info, self._watches, self.step_collection.commands)
                self.step_collection.new_step(self.context, step_info)

            bp_to_delete = []
            for bp_id in self.debugger.get_triggered_breakpoint_ids():
                try:
                    # See if this is one of our leading breakpoints.
                    bpr = self._leading_bp_handles[bp_id]
                except KeyError:
                    # This is a trailing bp. Mark it for removal.
                    bp_to_delete.append(bp_id)
                    continue

                bpr.add_hit()
                if bpr.should_be_removed():
                    if bpr.finish_on_remove:
                        exit_desired = True
                    bp_to_delete.append(bp_id)
                    del self._leading_bp_handles[bp_id]
                # Add a range of trailing breakpoints covering the lines
                # requested in the DexLimitSteps command. Ignore first line as
                # that's covered by the leading bp we just hit and include the
                # final line.
                for line in range(bpr.range_from + 1, bpr.range_to + 1):
                    self.debugger.add_breakpoint(bpr.path, line)

            # Remove any trailing or expired leading breakpoints we just hit.
            self.debugger.delete_breakpoints(bp_to_delete)

            if exit_desired:
                break
            self.debugger.go()
            time.sleep(self._pause_between_steps)