1# DExTer : Debugging Experience Tester
2# ~~~~~~   ~         ~~         ~   ~~
3#
4# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
5# See https://llvm.org/LICENSE.txt for license information.
6# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7"""Conditional Controller Class for DExTer.-"""
8
9
10import os
11import time
12from collections import defaultdict
13from itertools import chain
14
15from dex.debugger.DebuggerControllers.ControllerHelpers import in_source_file, update_step_watches
16from dex.debugger.DebuggerControllers.DebuggerControllerBase import DebuggerControllerBase
17from dex.debugger.DebuggerBase import DebuggerBase
18from dex.utils.Exceptions import DebuggerException
19from dex.utils.Timeout import Timeout
20
21
22class BreakpointRange:
23    """A range of breakpoints and a set of conditions.
24
25    The leading breakpoint (on line `range_from`) is always active.
26
27    When the leading breakpoint is hit the trailing range should be activated
28    when `expression` evaluates to any value in `values`. If there are no
29    conditions (`expression` is None) then the trailing breakpoint range should
30    always be activated upon hitting the leading breakpoint.
31
32    Args:
33       expression: None for no conditions, or a str expression to compare
34       against `values`.
35
36       hit_count: None for no limit, or int to set the number of times the
37                  leading breakpoint is triggered before it is removed.
38    """
39
40    def __init__(self, expression: str, path: str, range_from: int, range_to: int,
41                 values: list, hit_count: int, finish_on_remove: bool):
42        self.expression = expression
43        self.path = path
44        self.range_from = range_from
45        self.range_to = range_to
46        self.conditional_values = values
47        self.max_hit_count = hit_count
48        self.current_hit_count = 0
49        self.finish_on_remove = finish_on_remove
50
51    def has_conditions(self):
52        return self.expression != None
53
54    def get_conditional_expression_list(self):
55        conditional_list = []
56        for value in self.conditional_values:
57            # (<expression>) == (<value>)
58            conditional_expression = '({}) == ({})'.format(self.expression, value)
59            conditional_list.append(conditional_expression)
60        return conditional_list
61
62    def add_hit(self):
63        self.current_hit_count += 1
64
65    def should_be_removed(self):
66        if self.max_hit_count == None:
67            return False
68        return self.current_hit_count >= self.max_hit_count
69
70
71class ConditionalController(DebuggerControllerBase):
72    def __init__(self, context, step_collection):
73      self._bp_ranges = None
74      self._watches = set()
75      self._step_index = 0
76      self._pause_between_steps = context.options.pause_between_steps
77      self._max_steps = context.options.max_steps
78      # Map {id: BreakpointRange}
79      self._leading_bp_handles = {}
80      super(ConditionalController, self).__init__(context, step_collection)
81      self._build_bp_ranges()
82
83    def _build_bp_ranges(self):
84        commands = self.step_collection.commands
85        self._bp_ranges = []
86        try:
87            limit_commands = commands['DexLimitSteps']
88            for lc in limit_commands:
89                bpr = BreakpointRange(
90                  lc.expression,
91                  lc.path,
92                  lc.from_line,
93                  lc.to_line,
94                  lc.values,
95                  lc.hit_count,
96                  False)
97                self._bp_ranges.append(bpr)
98        except KeyError:
99            raise DebuggerException('Missing DexLimitSteps commands, cannot conditionally step.')
100        if 'DexFinishTest' in commands:
101            finish_commands = commands['DexFinishTest']
102            for ic in finish_commands:
103                bpr = BreakpointRange(
104                  ic.expression,
105                  ic.path,
106                  ic.on_line,
107                  ic.on_line,
108                  ic.values,
109                  ic.hit_count + 1,
110                  True)
111                self._bp_ranges.append(bpr)
112
113    def _set_leading_bps(self):
114        # Set a leading breakpoint for each BreakpointRange, building a
115        # map of {leading bp id: BreakpointRange}.
116        for bpr in self._bp_ranges:
117            if bpr.has_conditions():
118                # Add a conditional breakpoint for each condition.
119                for cond_expr in bpr.get_conditional_expression_list():
120                    id = self.debugger.add_conditional_breakpoint(bpr.path,
121                                                                  bpr.range_from,
122                                                                  cond_expr)
123                    self._leading_bp_handles[id] = bpr
124            else:
125                # Add an unconditional breakpoint.
126                id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
127                self._leading_bp_handles[id] = bpr
128
129    def _run_debugger_custom(self, cmdline):
130        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
131        if self.debugger.get_name() == 'dbgeng':
132            raise DebuggerException('DexLimitSteps commands are not supported by dbgeng')
133
134        self.step_collection.clear_steps()
135        self._set_leading_bps()
136
137        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
138            self._watches.update(command_obj.get_watches())
139
140        self.debugger.launch(cmdline)
141        time.sleep(self._pause_between_steps)
142
143        exit_desired = False
144        timed_out = False
145        total_timeout = Timeout(self.context.options.timeout_total)
146
147        while not self.debugger.is_finished:
148
149            breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
150            while self.debugger.is_running and not timed_out:
151                # Check to see whether we've timed out while we're waiting.
152                if total_timeout.timed_out():
153                    self.context.logger.error('Debugger session has been '
154                        f'running for {total_timeout.elapsed}s, timeout reached!')
155                    timed_out = True
156                if breakpoint_timeout.timed_out():
157                    self.context.logger.error(f'Debugger session has not '
158                        f'hit a breakpoint for {breakpoint_timeout.elapsed}s, timeout '
159                        'reached!')
160                    timed_out = True
161
162            if timed_out:
163                break
164
165            step_info = self.debugger.get_step_info(self._watches, self._step_index)
166            if step_info.current_frame:
167                self._step_index += 1
168                update_step_watches(step_info, self._watches, self.step_collection.commands)
169                self.step_collection.new_step(self.context, step_info)
170
171            bp_to_delete = []
172            for bp_id in self.debugger.get_triggered_breakpoint_ids():
173                try:
174                    # See if this is one of our leading breakpoints.
175                    bpr = self._leading_bp_handles[bp_id]
176                except KeyError:
177                    # This is a trailing bp. Mark it for removal.
178                    bp_to_delete.append(bp_id)
179                    continue
180
181                bpr.add_hit()
182                if bpr.should_be_removed():
183                    if bpr.finish_on_remove:
184                        exit_desired = True
185                    bp_to_delete.append(bp_id)
186                    del self._leading_bp_handles[bp_id]
187                # Add a range of trailing breakpoints covering the lines
188                # requested in the DexLimitSteps command. Ignore first line as
189                # that's covered by the leading bp we just hit and include the
190                # final line.
191                for line in range(bpr.range_from + 1, bpr.range_to + 1):
192                    self.debugger.add_breakpoint(bpr.path, line)
193
194            # Remove any trailing or expired leading breakpoints we just hit.
195            self.debugger.delete_breakpoints(bp_to_delete)
196
197            if exit_desired:
198                break
199            self.debugger.go()
200            time.sleep(self._pause_between_steps)
201