1# DExTer : Debugging Experience Tester
2# ~~~~~~   ~         ~~         ~   ~~
3#
4# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
5# See https://llvm.org/LICENSE.txt for license information.
6# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7"""Conditional Controller Class for DExTer.-"""
8
9
10import os
11import time
12from collections import defaultdict
13from itertools import chain
14
15from dex.debugger.DebuggerControllers.ControllerHelpers import (
16    in_source_file,
17    update_step_watches,
18)
19from dex.debugger.DebuggerControllers.DebuggerControllerBase import (
20    DebuggerControllerBase,
21)
22from dex.debugger.DebuggerBase import DebuggerBase
23from dex.utils.Exceptions import DebuggerException
24from dex.utils.Timeout import Timeout
25
26
27class BreakpointRange:
28    """A range of breakpoints and a set of conditions.
29
30    The leading breakpoint (on line `range_from`) is always active.
31
32    When the leading breakpoint is hit the trailing range should be activated
33    when `expression` evaluates to any value in `values`. If there are no
34    conditions (`expression` is None) then the trailing breakpoint range should
35    always be activated upon hitting the leading breakpoint.
36
37    Args:
38       expression: None for no conditions, or a str expression to compare
39       against `values`.
40
41       hit_count: None for no limit, or int to set the number of times the
42                  leading breakpoint is triggered before it is removed.
43    """
44
45    def __init__(
46        self,
47        expression: str,
48        path: str,
49        range_from: int,
50        range_to: int,
51        values: list,
52        hit_count: int,
53        finish_on_remove: bool,
54    ):
55        self.expression = expression
56        self.path = path
57        self.range_from = range_from
58        self.range_to = range_to
59        self.conditional_values = values
60        self.max_hit_count = hit_count
61        self.current_hit_count = 0
62        self.finish_on_remove = finish_on_remove
63
64    def has_conditions(self):
65        return self.expression is not None
66
67    def get_conditional_expression_list(self):
68        conditional_list = []
69        for value in self.conditional_values:
70            # (<expression>) == (<value>)
71            conditional_expression = "({}) == ({})".format(self.expression, value)
72            conditional_list.append(conditional_expression)
73        return conditional_list
74
75    def add_hit(self):
76        self.current_hit_count += 1
77
78    def should_be_removed(self):
79        if self.max_hit_count is None:
80            return False
81        return self.current_hit_count >= self.max_hit_count
82
83
84class ConditionalController(DebuggerControllerBase):
85    def __init__(self, context, step_collection):
86        self._bp_ranges = None
87        self._watches = set()
88        self._step_index = 0
89        self._pause_between_steps = context.options.pause_between_steps
90        self._max_steps = context.options.max_steps
91        # Map {id: BreakpointRange}
92        self._leading_bp_handles = {}
93        super(ConditionalController, self).__init__(context, step_collection)
94        self._build_bp_ranges()
95
96    def _build_bp_ranges(self):
97        commands = self.step_collection.commands
98        self._bp_ranges = []
99        try:
100            limit_commands = commands["DexLimitSteps"]
101            for lc in limit_commands:
102                bpr = BreakpointRange(
103                    lc.expression,
104                    lc.path,
105                    lc.from_line,
106                    lc.to_line,
107                    lc.values,
108                    lc.hit_count,
109                    False,
110                )
111                self._bp_ranges.append(bpr)
112        except KeyError:
113            raise DebuggerException(
114                "Missing DexLimitSteps commands, cannot conditionally step."
115            )
116        if "DexFinishTest" in commands:
117            finish_commands = commands["DexFinishTest"]
118            for ic in finish_commands:
119                bpr = BreakpointRange(
120                    ic.expression,
121                    ic.path,
122                    ic.on_line,
123                    ic.on_line,
124                    ic.values,
125                    ic.hit_count + 1,
126                    True,
127                )
128                self._bp_ranges.append(bpr)
129
130    def _set_leading_bps(self):
131        # Set a leading breakpoint for each BreakpointRange, building a
132        # map of {leading bp id: BreakpointRange}.
133        for bpr in self._bp_ranges:
134            if bpr.has_conditions():
135                # Add a conditional breakpoint for each condition.
136                for cond_expr in bpr.get_conditional_expression_list():
137                    id = self.debugger.add_conditional_breakpoint(
138                        bpr.path, bpr.range_from, cond_expr
139                    )
140                    self._leading_bp_handles[id] = bpr
141            else:
142                # Add an unconditional breakpoint.
143                id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
144                self._leading_bp_handles[id] = bpr
145
146    def _run_debugger_custom(self, cmdline):
147        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
148        if self.debugger.get_name() == "dbgeng":
149            raise DebuggerException(
150                "DexLimitSteps commands are not supported by dbgeng"
151            )
152
153        self.step_collection.clear_steps()
154        self._set_leading_bps()
155
156        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
157            self._watches.update(command_obj.get_watches())
158
159        self.debugger.launch(cmdline)
160        time.sleep(self._pause_between_steps)
161
162        exit_desired = False
163        timed_out = False
164        total_timeout = Timeout(self.context.options.timeout_total)
165
166        while not self.debugger.is_finished:
167            breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
168            while self.debugger.is_running and not timed_out:
169                # Check to see whether we've timed out while we're waiting.
170                if total_timeout.timed_out():
171                    self.context.logger.error(
172                        "Debugger session has been "
173                        f"running for {total_timeout.elapsed}s, timeout reached!"
174                    )
175                    timed_out = True
176                if breakpoint_timeout.timed_out():
177                    self.context.logger.error(
178                        f"Debugger session has not "
179                        f"hit a breakpoint for {breakpoint_timeout.elapsed}s, timeout "
180                        "reached!"
181                    )
182                    timed_out = True
183
184            if timed_out:
185                break
186
187            step_info = self.debugger.get_step_info(self._watches, self._step_index)
188            if step_info.current_frame:
189                self._step_index += 1
190                update_step_watches(
191                    step_info, self._watches, self.step_collection.commands
192                )
193                self.step_collection.new_step(self.context, step_info)
194
195            bp_to_delete = []
196            for bp_id in self.debugger.get_triggered_breakpoint_ids():
197                try:
198                    # See if this is one of our leading breakpoints.
199                    bpr = self._leading_bp_handles[bp_id]
200                except KeyError:
201                    # This is a trailing bp. Mark it for removal.
202                    bp_to_delete.append(bp_id)
203                    continue
204
205                bpr.add_hit()
206                if bpr.should_be_removed():
207                    if bpr.finish_on_remove:
208                        exit_desired = True
209                    bp_to_delete.append(bp_id)
210                    del self._leading_bp_handles[bp_id]
211                # Add a range of trailing breakpoints covering the lines
212                # requested in the DexLimitSteps command. Ignore first line as
213                # that's covered by the leading bp we just hit and include the
214                # final line.
215                for line in range(bpr.range_from + 1, bpr.range_to + 1):
216                    self.debugger.add_breakpoint(bpr.path, line)
217
218            # Remove any trailing or expired leading breakpoints we just hit.
219            self.debugger.delete_breakpoints(bp_to_delete)
220
221            if exit_desired:
222                break
223            self.debugger.go()
224            time.sleep(self._pause_between_steps)
225