1# DExTer : Debugging Experience Tester
2# ~~~~~~   ~         ~~         ~   ~~
3#
4# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
5# See https://llvm.org/LICENSE.txt for license information.
6# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Conditional Controller Class for DExTer."""
8
9
10import os
11import time
12from collections import defaultdict
13from itertools import chain
14
15from dex.debugger.DebuggerControllers.ControllerHelpers import in_source_file, update_step_watches
16from dex.debugger.DebuggerControllers.DebuggerControllerBase import DebuggerControllerBase
17from dex.debugger.DebuggerBase import DebuggerBase
18from dex.utils.Exceptions import DebuggerException
19
20
class BreakpointRange:
    """A range of breakpoints and a set of conditions.

    The leading breakpoint (on line `range_from`) is always active.

    When the leading breakpoint is hit the trailing range should be activated
    when `expression` evaluates to any value in `values`. If there are no
    conditions (`expression` is None) then the trailing breakpoint range should
    always be activated upon hitting the leading breakpoint.

    Args:
       expression: None for no conditions, or a str expression to compare
                   against `values`.

       path: path of the file the breakpoints in this range belong to.

       range_from: line of the leading breakpoint.

       range_to: last line of the trailing breakpoint range.

       values: values `expression` is compared against; one conditional
               expression is generated per value.

       hit_count: None for no limit, or int to set the number of times the
                  leading breakpoint is triggered before it is removed.
    """

    def __init__(self, expression: str, path: str, range_from: int, range_to: int,
                 values: list, hit_count: int):
        self.expression = expression
        self.path = path
        self.range_from = range_from
        self.range_to = range_to
        self.conditional_values = values
        self.max_hit_count = hit_count
        # Number of times add_hit() has been called so far.
        self.current_hit_count = 0

    def has_conditions(self):
        """Return True if an `expression` was supplied for this range."""
        # Identity test: an exotic operand must not be able to influence the
        # result through a custom __eq__/__ne__.
        return self.expression is not None

    def get_conditional_expression_list(self):
        """Return one '(<expression>) == (<value>)' string per value."""
        return ['({}) == ({})'.format(self.expression, value)
                for value in self.conditional_values]

    def add_hit(self):
        """Record one more hit of the leading breakpoint."""
        self.current_hit_count += 1

    def should_be_removed(self):
        """Return True once the hit count has reached the configured limit.

        Always returns False when no limit (`hit_count` of None) was given.
        """
        if self.max_hit_count is None:
            return False
        return self.current_hit_count >= self.max_hit_count
67
68
class ConditionalController(DebuggerControllerBase):
    """Debugger controller driven by DexLimitSteps commands.

    A leading breakpoint is set for every DexLimitSteps command. The debugger
    is then resumed with `go()` from stop to stop; a step is recorded at each
    stop that has a current frame. Hitting a leading breakpoint arms
    breakpoints on the remaining lines of that command's range, and each of
    those trailing breakpoints is deleted once it has been hit.
    """

    def __init__(self, context, step_collection):
        """Store the run context and build the breakpoint ranges.

        Raises:
            DebuggerException: if `step_collection` has no DexLimitSteps
                               commands.
        """
        self.context = context
        self.step_collection = step_collection
        self._bp_ranges = None
        self._build_bp_ranges()
        self._watches = set()
        self._step_index = 0
        self._pause_between_steps = context.options.pause_between_steps
        # NOTE(review): stored but not read anywhere within this class.
        self._max_steps = context.options.max_steps
        # Map {id: BreakpointRange}
        self._leading_bp_handles = {}

    def _build_bp_ranges(self):
        """Create a BreakpointRange for each DexLimitSteps command.

        Raises:
            DebuggerException: if there are no DexLimitSteps commands.
        """
        commands = self.step_collection.commands
        self._bp_ranges = []
        # Only the lookup is guarded, so that a KeyError raised for any other
        # reason is not misreported as a missing command.
        try:
            limit_commands = commands['DexLimitSteps']
        except KeyError:
            raise DebuggerException('Missing DexLimitSteps commands, cannot conditionally step.')

        for lc in limit_commands:
            bpr = BreakpointRange(
                lc.expression,
                lc.path,
                lc.from_line,
                lc.to_line,
                lc.values,
                lc.hit_count)
            self._bp_ranges.append(bpr)

    def _set_leading_bps(self):
        """Set the leading breakpoint(s) for every BreakpointRange.

        Populates the map of {leading bp id: BreakpointRange}. A range with
        conditions gets one conditional breakpoint per condition, all mapped
        to the same BreakpointRange.
        """
        for bpr in self._bp_ranges:
            if bpr.has_conditions():
                # Add a conditional breakpoint for each condition.
                for cond_expr in bpr.get_conditional_expression_list():
                    bp_id = self.debugger.add_conditional_breakpoint(bpr.path,
                                                                     bpr.range_from,
                                                                     cond_expr)
                    self._leading_bp_handles[bp_id] = bpr
            else:
                # Add an unconditional breakpoint.
                bp_id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
                self._leading_bp_handles[bp_id] = bpr

    def _run_debugger_custom(self):
        """Run the debugger to completion, recording a step at each stop.

        Raises:
            DebuggerException: if the active debugger is dbgeng.
        """
        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
        if self.debugger.get_name() == 'dbgeng':
            raise DebuggerException('DexLimitSteps commands are not supported by dbgeng')

        self.step_collection.clear_steps()
        self._set_leading_bps()

        # Gather the watches requested by every command, not just
        # DexLimitSteps.
        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
            self._watches.update(command_obj.get_watches())

        self.debugger.launch()
        time.sleep(self._pause_between_steps)
        while not self.debugger.is_finished:
            # NOTE(review): busy-waits until the debugger reports it is no
            # longer running.
            while self.debugger.is_running:
                pass

            step_info = self.debugger.get_step_info(self._watches, self._step_index)
            if step_info.current_frame:
                self._step_index += 1
                update_step_watches(step_info, self._watches, self.step_collection.commands)
                self.step_collection.new_step(self.context, step_info)

            bp_to_delete = []
            for bp_id in self.debugger.get_triggered_breakpoint_ids():
                try:
                    # See if this is one of our leading breakpoints.
                    bpr = self._leading_bp_handles[bp_id]
                except KeyError:
                    # This is a trailing bp. Mark it for removal.
                    bp_to_delete.append(bp_id)
                    continue

                bpr.add_hit()
                if bpr.should_be_removed():
                    bp_to_delete.append(bp_id)
                    del self._leading_bp_handles[bp_id]
                # Add a range of trailing breakpoints covering the lines
                # requested in the DexLimitSteps command. Ignore first line as
                # that's covered by the leading bp we just hit and include the
                # final line.
                for line in range(bpr.range_from + 1, bpr.range_to + 1):
                    self.debugger.add_breakpoint(bpr.path, line)

            # Remove any trailing or expired leading breakpoints we just hit.
            for bp_id in bp_to_delete:
                self.debugger.delete_breakpoint(bp_id)

            self.debugger.go()
            time.sleep(self._pause_between_steps)
165