xref: /llvm-project/lldb/packages/Python/lldbsuite/test/lldbutil.py (revision aba0476f23fc2a851792e9d85c25ee34a5ea7ed0)
1"""
2This LLDB module contains miscellaneous utilities.
3Some of the test suite takes advantage of the utility functions defined here.
4They can also be useful for general purpose lldb scripting.
5"""
6
7# System modules
8import errno
9import io
10import os
11import re
12import sys
13import subprocess
14from typing import Dict
15
16# LLDB modules
17import lldb
18from . import lldbtest_config
19from . import configuration
20
21# How often failed simulator process launches are retried.
22SIMULATOR_RETRY = 3
23
24# ===================================================
25# Utilities for locating/checking executable programs
26# ===================================================
27
28
29def is_exe(fpath):
30    """Returns True if fpath is an executable."""
31    return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
32
33
34def which(program):
35    """Returns the full path to a program; None otherwise."""
36    fpath, fname = os.path.split(program)
37    if fpath:
38        if is_exe(program):
39            return program
40    else:
41        for path in os.environ["PATH"].split(os.pathsep):
42            exe_file = os.path.join(path, program)
43            if is_exe(exe_file):
44                return exe_file
45    return None
46
47
48def mkdir_p(path):
49    try:
50        os.makedirs(path)
51    except OSError as e:
52        if e.errno != errno.EEXIST:
53            raise
54    if not os.path.isdir(path):
55        raise OSError(errno.ENOTDIR, "%s is not a directory" % path)
56
57
58# ============================
59# Dealing with SDK and triples
60# ============================
61
62
63def get_xcode_sdk(os, env):
64    # Respect --apple-sdk <path> if it's specified. If the SDK is simply
65    # mounted from some disk image, and not actually installed, this is the
66    # only way to use it.
67    if configuration.apple_sdk:
68        return configuration.apple_sdk
69    if os == "ios":
70        if env == "simulator":
71            return "iphonesimulator"
72        if env == "macabi":
73            return "macosx"
74        return "iphoneos"
75    elif os == "tvos":
76        if env == "simulator":
77            return "appletvsimulator"
78        return "appletvos"
79    elif os == "watchos":
80        if env == "simulator":
81            return "watchsimulator"
82        return "watchos"
83    return os
84
85
86def get_xcode_sdk_version(sdk):
87    return (
88        subprocess.check_output(["xcrun", "--sdk", sdk, "--show-sdk-version"])
89        .rstrip()
90        .decode("utf-8")
91    )
92
93
94def get_xcode_sdk_root(sdk):
95    return (
96        subprocess.check_output(["xcrun", "--sdk", sdk, "--show-sdk-path"])
97        .rstrip()
98        .decode("utf-8")
99    )
100
101
102def get_xcode_clang(sdk):
103    return (
104        subprocess.check_output(["xcrun", "-sdk", sdk, "-f", "clang"])
105        .rstrip()
106        .decode("utf-8")
107    )
108
109
110# ===================================================
111# Disassembly for an SBFunction or an SBSymbol object
112# ===================================================
113
114
115def disassemble(target, function_or_symbol):
116    """Disassemble the function or symbol given a target.
117
118    It returns the disassembly content in a string object.
119    """
120    buf = io.StringIO()
121    insts = function_or_symbol.GetInstructions(target)
122    for i in insts:
123        print(i, file=buf)
124    return buf.getvalue()
125
126
127# ==========================================================
128# Integer (byte size 1, 2, 4, and 8) to bytearray conversion
129# ==========================================================
130
131
132def int_to_bytearray(val, bytesize):
133    """Utility function to convert an integer into a bytearray.
134
135    It returns the bytearray in the little endian format.  It is easy to get the
136    big endian format, just do ba.reverse() on the returned object.
137    """
138    import struct
139
140    if bytesize == 1:
141        return bytearray([val])
142
143    # Little endian followed by a format character.
144    template = "<%c"
145    if bytesize == 2:
146        fmt = template % "h"
147    elif bytesize == 4:
148        fmt = template % "i"
149    elif bytesize == 4:
150        fmt = template % "q"
151    else:
152        return None
153
154    packed = struct.pack(fmt, val)
155    return bytearray(packed)
156
157
158def bytearray_to_int(bytes, bytesize):
159    """Utility function to convert a bytearray into an integer.
160
161    It interprets the bytearray in the little endian format. For a big endian
162    bytearray, just do ba.reverse() on the object before passing it in.
163    """
164    import struct
165
166    if bytesize == 1:
167        return bytes[0]
168
169    # Little endian followed by a format character.
170    template = "<%c"
171    if bytesize == 2:
172        fmt = template % "h"
173    elif bytesize == 4:
174        fmt = template % "i"
175    elif bytesize == 4:
176        fmt = template % "q"
177    else:
178        return None
179
180    unpacked = struct.unpack_from(fmt, bytes)
181    return unpacked[0]
182
183
184# ==============================================================
185# Get the description of an lldb object or None if not available
186# ==============================================================
187def get_description(obj, option=None):
188    """Calls lldb_obj.GetDescription() and returns a string, or None.
189
190    For SBTarget, SBBreakpointLocation, and SBWatchpoint lldb objects, an extra
191    option can be passed in to describe the detailed level of description
192    desired:
193        o lldb.eDescriptionLevelBrief
194        o lldb.eDescriptionLevelFull
195        o lldb.eDescriptionLevelVerbose
196    """
197    method = getattr(obj, "GetDescription")
198    if not method:
199        return None
200    tuple = (lldb.SBTarget, lldb.SBBreakpointLocation, lldb.SBWatchpoint)
201    if isinstance(obj, tuple):
202        if option is None:
203            option = lldb.eDescriptionLevelBrief
204
205    stream = lldb.SBStream()
206    if option is None:
207        success = method(stream)
208    else:
209        success = method(stream, option)
210    if not success:
211        return None
212    return stream.GetData()
213
214
215# =================================================
216# Convert some enum value to its string counterpart
217# =================================================
218
219
220def _enum_names(prefix: str) -> Dict[int, str]:
221    """Generate a mapping of enum value to name, for the enum prefix."""
222    suffix_start = len(prefix)
223    return {
224        getattr(lldb, attr): attr[suffix_start:].lower()
225        for attr in dir(lldb)
226        if attr.startswith(prefix)
227    }
228
229
230_STATE_NAMES = _enum_names(prefix="eState")
231
232
233def state_type_to_str(enum: int) -> str:
234    """Returns the stateType string given an enum."""
235    name = _STATE_NAMES.get(enum)
236    if name:
237        return name
238    raise Exception(f"Unknown StateType enum: {enum}")
239
240
241_STOP_REASON_NAMES = _enum_names(prefix="eStopReason")
242
243
244def stop_reason_to_str(enum: int) -> str:
245    """Returns the stopReason string given an enum."""
246    name = _STOP_REASON_NAMES.get(enum)
247    if name:
248        return name
249    raise Exception(f"Unknown StopReason enum: {enum}")
250
251
252_SYMBOL_TYPE_NAMES = _enum_names(prefix="eSymbolType")
253
254
255def symbol_type_to_str(enum: int) -> str:
256    """Returns the symbolType string given an enum."""
257    name = _SYMBOL_TYPE_NAMES.get(enum)
258    if name:
259        return name
260    raise Exception(f"Unknown SymbolType enum: {enum}")
261
262
263_VALUE_TYPE_NAMES = _enum_names(prefix="eValueType")
264
265
266def value_type_to_str(enum: int) -> str:
267    """Returns the valueType string given an enum."""
268    name = _VALUE_TYPE_NAMES.get(enum)
269    if name:
270        return name
271    raise Exception(f"Unknown ValueType enum: {enum}")
272
273
274# ==================================================
275# Get stopped threads due to each stop reason.
276# ==================================================
277
278
279def sort_stopped_threads(
280    process,
281    breakpoint_threads=None,
282    crashed_threads=None,
283    watchpoint_threads=None,
284    signal_threads=None,
285    exiting_threads=None,
286    other_threads=None,
287):
288    """Fills array *_threads with threads stopped for the corresponding stop
289    reason.
290    """
291    for lst in [
292        breakpoint_threads,
293        watchpoint_threads,
294        signal_threads,
295        exiting_threads,
296        other_threads,
297    ]:
298        if lst is not None:
299            lst[:] = []
300
301    for thread in process:
302        dispatched = False
303        for reason, list in [
304            (lldb.eStopReasonBreakpoint, breakpoint_threads),
305            (lldb.eStopReasonException, crashed_threads),
306            (lldb.eStopReasonWatchpoint, watchpoint_threads),
307            (lldb.eStopReasonSignal, signal_threads),
308            (lldb.eStopReasonThreadExiting, exiting_threads),
309            (None, other_threads),
310        ]:
311            if not dispatched and list is not None:
312                if thread.GetStopReason() == reason or reason is None:
313                    list.append(thread)
314                    dispatched = True
315
316
317# ==================================================
318# Utility functions for setting breakpoints
319# ==================================================
320
321
322def run_break_set_by_script(
323    test, class_name, extra_options=None, num_expected_locations=1
324):
325    """Set a scripted breakpoint.  Check that it got the right number of locations."""
326    test.assertTrue(class_name is not None, "Must pass in a class name.")
327    command = "breakpoint set -P " + class_name
328    if extra_options is not None:
329        command += " " + extra_options
330
331    break_results = run_break_set_command(test, command)
332    check_breakpoint_result(test, break_results, num_locations=num_expected_locations)
333    return get_bpno_from_match(break_results)
334
335
336def run_break_set_by_file_and_line(
337    test,
338    file_name,
339    line_number,
340    extra_options=None,
341    num_expected_locations=1,
342    loc_exact=False,
343    module_name=None,
344):
345    """Set a breakpoint by file and line, returning the breakpoint number.
346
347    If extra_options is not None, then we append it to the breakpoint set command.
348
349    If num_expected_locations is -1, we check that we got AT LEAST one location. If num_expected_locations is -2, we don't
350    check the actual number at all. Otherwise, we check that num_expected_locations equals the number of locations.
351
352    If loc_exact is true, we check that there is one location, and that location must be at the input file and line number.
353    """
354
355    if file_name is None:
356        command = "breakpoint set -l %d" % (line_number)
357    else:
358        command = 'breakpoint set -f "%s" -l %d' % (file_name, line_number)
359
360    if module_name:
361        command += " --shlib '%s'" % (module_name)
362
363    if extra_options:
364        command += " " + extra_options
365
366    break_results = run_break_set_command(test, command)
367
368    if num_expected_locations == 1 and loc_exact:
369        check_breakpoint_result(
370            test,
371            break_results,
372            num_locations=num_expected_locations,
373            file_name=file_name,
374            line_number=line_number,
375            module_name=module_name,
376        )
377    else:
378        check_breakpoint_result(
379            test, break_results, num_locations=num_expected_locations
380        )
381
382    return get_bpno_from_match(break_results)
383
384
385def run_break_set_by_symbol(
386    test,
387    symbol,
388    extra_options=None,
389    num_expected_locations=-1,
390    sym_exact=False,
391    module_name=None,
392):
393    """Set a breakpoint by symbol name.  Common options are the same as run_break_set_by_file_and_line.
394
395    If sym_exact is true, then the output symbol must match the input exactly, otherwise we do a substring match.
396    """
397    command = 'breakpoint set -n "%s"' % (symbol)
398
399    if module_name:
400        command += " --shlib '%s'" % (module_name)
401
402    if extra_options:
403        command += " " + extra_options
404
405    break_results = run_break_set_command(test, command)
406
407    if num_expected_locations == 1 and sym_exact:
408        check_breakpoint_result(
409            test,
410            break_results,
411            num_locations=num_expected_locations,
412            symbol_name=symbol,
413            module_name=module_name,
414        )
415    else:
416        check_breakpoint_result(
417            test, break_results, num_locations=num_expected_locations
418        )
419
420    return get_bpno_from_match(break_results)
421
422
423def run_break_set_by_selector(
424    test, selector, extra_options=None, num_expected_locations=-1, module_name=None
425):
426    """Set a breakpoint by selector.  Common options are the same as run_break_set_by_file_and_line."""
427
428    command = 'breakpoint set -S "%s"' % (selector)
429
430    if module_name:
431        command += ' --shlib "%s"' % (module_name)
432
433    if extra_options:
434        command += " " + extra_options
435
436    break_results = run_break_set_command(test, command)
437
438    if num_expected_locations == 1:
439        check_breakpoint_result(
440            test,
441            break_results,
442            num_locations=num_expected_locations,
443            symbol_name=selector,
444            symbol_match_exact=False,
445            module_name=module_name,
446        )
447    else:
448        check_breakpoint_result(
449            test, break_results, num_locations=num_expected_locations
450        )
451
452    return get_bpno_from_match(break_results)
453
454
455def run_break_set_by_regexp(
456    test, regexp, extra_options=None, num_expected_locations=-1
457):
458    """Set a breakpoint by regular expression match on symbol name.  Common options are the same as run_break_set_by_file_and_line."""
459
460    command = 'breakpoint set -r "%s"' % (regexp)
461    if extra_options:
462        command += " " + extra_options
463
464    break_results = run_break_set_command(test, command)
465
466    check_breakpoint_result(test, break_results, num_locations=num_expected_locations)
467
468    return get_bpno_from_match(break_results)
469
470
471def run_break_set_by_source_regexp(
472    test, regexp, extra_options=None, num_expected_locations=-1
473):
474    """Set a breakpoint by source regular expression.  Common options are the same as run_break_set_by_file_and_line."""
475    command = 'breakpoint set -p "%s"' % (regexp)
476    if extra_options:
477        command += " " + extra_options
478
479    break_results = run_break_set_command(test, command)
480
481    check_breakpoint_result(test, break_results, num_locations=num_expected_locations)
482
483    return get_bpno_from_match(break_results)
484
485
486def run_break_set_by_file_colon_line(
487    test,
488    specifier,
489    path,
490    line_number,
491    column_number=0,
492    extra_options=None,
493    num_expected_locations=-1,
494):
495    command = 'breakpoint set -y "%s"' % (specifier)
496    if extra_options:
497        command += " " + extra_options
498
499    print("About to run: '%s'", command)
500    break_results = run_break_set_command(test, command)
501    check_breakpoint_result(
502        test,
503        break_results,
504        num_locations=num_expected_locations,
505        file_name=path,
506        line_number=line_number,
507        column_number=column_number,
508    )
509
510    return get_bpno_from_match(break_results)
511
512
513def run_break_set_command(test, command):
514    """Run the command passed in - it must be some break set variant - and analyze the result.
515    Returns a dictionary of information gleaned from the command-line results.
516    Will assert if the breakpoint setting fails altogether.
517
518    Dictionary will contain:
519        bpno          - breakpoint of the newly created breakpoint, -1 on error.
520        num_locations - number of locations set for the breakpoint.
521
522    If there is only one location, the dictionary MAY contain:
523        file          - source file name
524        line_no       - source line number
525        column        - source column number
526        symbol        - symbol name
527        inline_symbol - inlined symbol name
528        offset        - offset from the original symbol
529        module        - module
530        address       - address at which the breakpoint was set."""
531
532    patterns = [
533        r"^Breakpoint (?P<bpno>[0-9]+): (?P<num_locations>[0-9]+) locations\.$",
534        r"^Breakpoint (?P<bpno>[0-9]+): (?P<num_locations>no) locations \(pending\)\.",
535        r"^Breakpoint (?P<bpno>[0-9]+): where = (?P<module>.*)`(?P<symbol>[+\-]{0,1}[^+]+)( \+ (?P<offset>[0-9]+)){0,1}( \[inlined\] (?P<inline_symbol>.*)){0,1} at (?P<file>[^:]+):(?P<line_no>[0-9]+)(?P<column>(:[0-9]+)?), address = (?P<address>0x[0-9a-fA-F]+)$",
536        r"^Breakpoint (?P<bpno>[0-9]+): where = (?P<module>.*)`(?P<symbol>.*)( \+ (?P<offset>[0-9]+)){0,1}, address = (?P<address>0x[0-9a-fA-F]+)$",
537    ]
538    match_object = test.match(command, patterns)
539    break_results = match_object.groupdict()
540
541    # We always insert the breakpoint number, setting it to -1 if we couldn't find it
542    # Also, make sure it gets stored as an integer.
543    if not "bpno" in break_results:
544        break_results["bpno"] = -1
545    else:
546        break_results["bpno"] = int(break_results["bpno"])
547
548    # We always insert the number of locations
549    # If ONE location is set for the breakpoint, then the output doesn't mention locations, but it has to be 1...
550    # We also make sure it is an integer.
551
552    if not "num_locations" in break_results:
553        num_locations = 1
554    else:
555        num_locations = break_results["num_locations"]
556        if num_locations == "no":
557            num_locations = 0
558        else:
559            num_locations = int(break_results["num_locations"])
560
561    break_results["num_locations"] = num_locations
562
563    if "line_no" in break_results:
564        break_results["line_no"] = int(break_results["line_no"])
565
566    return break_results
567
568
569def get_bpno_from_match(break_results):
570    return int(break_results["bpno"])
571
572
573def check_breakpoint_result(
574    test,
575    break_results,
576    file_name=None,
577    line_number=-1,
578    column_number=0,
579    symbol_name=None,
580    symbol_match_exact=True,
581    module_name=None,
582    offset=-1,
583    num_locations=-1,
584):
585    out_num_locations = break_results["num_locations"]
586
587    if num_locations == -1:
588        test.assertTrue(
589            out_num_locations > 0, "Expecting one or more locations, got none."
590        )
591    elif num_locations != -2:
592        test.assertTrue(
593            num_locations == out_num_locations,
594            "Expecting %d locations, got %d." % (num_locations, out_num_locations),
595        )
596
597    if file_name:
598        out_file_name = ""
599        if "file" in break_results:
600            out_file_name = break_results["file"]
601        test.assertTrue(
602            file_name.endswith(out_file_name),
603            "Breakpoint file name '%s' doesn't match resultant name '%s'."
604            % (file_name, out_file_name),
605        )
606
607    if line_number != -1:
608        out_line_number = -1
609        if "line_no" in break_results:
610            out_line_number = break_results["line_no"]
611
612        test.assertTrue(
613            line_number == out_line_number,
614            "Breakpoint line number %s doesn't match resultant line %s."
615            % (line_number, out_line_number),
616        )
617
618    if column_number != 0:
619        out_column_number = 0
620        if "column" in break_results:
621            out_column_number = break_results["column"]
622
623        test.assertTrue(
624            column_number == out_column_number,
625            "Breakpoint column number %s doesn't match resultant column %s."
626            % (column_number, out_column_number),
627        )
628
629    if symbol_name:
630        out_symbol_name = ""
631        # Look first for the inlined symbol name, otherwise use the symbol
632        # name:
633        if "inline_symbol" in break_results and break_results["inline_symbol"]:
634            out_symbol_name = break_results["inline_symbol"]
635        elif "symbol" in break_results:
636            out_symbol_name = break_results["symbol"]
637
638        if symbol_match_exact:
639            test.assertTrue(
640                symbol_name == out_symbol_name,
641                "Symbol name '%s' doesn't match resultant symbol '%s'."
642                % (symbol_name, out_symbol_name),
643            )
644        else:
645            test.assertTrue(
646                out_symbol_name.find(symbol_name) != -1,
647                "Symbol name '%s' isn't in resultant symbol '%s'."
648                % (symbol_name, out_symbol_name),
649            )
650
651    if module_name:
652        out_module_name = None
653        if "module" in break_results:
654            out_module_name = break_results["module"]
655
656        test.assertTrue(
657            module_name.find(out_module_name) != -1,
658            "Symbol module name '%s' isn't in expected module name '%s'."
659            % (out_module_name, module_name),
660        )
661
662
663def check_breakpoint(
664    test,
665    bpno,
666    expected_locations=None,
667    expected_resolved_count=None,
668    expected_hit_count=None,
669    location_id=None,
670    expected_location_resolved=True,
671    expected_location_hit_count=None,
672):
673    """
674    Test breakpoint or breakpoint location.
675    Breakpoint resolved count is always checked. If not specified the assumption is that all locations
676    should be resolved.
677    To test a breakpoint location, breakpoint number (bpno) and location_id must be set. In this case
678    the resolved count for a breakpoint is not tested by default. The location is expected to be resolved,
679    unless expected_location_resolved is set to False.
680    test - test context
681    bpno - breakpoint number to test
682    expected_locations - expected number of locations for this breakpoint. If 'None' this parameter is not tested.
683    expected_resolved_count - expected resolved locations number for the breakpoint.  If 'None' - all locations should be resolved.
684    expected_hit_count - expected hit count for this breakpoint. If 'None' this parameter is not tested.
685    location_id - If not 'None' sets the location ID for the breakpoint to test.
686    expected_location_resolved - Extected resolved status for the location_id (True/False). Default - True.
687    expected_location_hit_count - Expected hit count for the breakpoint at location_id. Must be set if the location_id parameter is set.
688    """
689
690    if isinstance(test.target, lldb.SBTarget):
691        target = test.target
692    else:
693        target = test.target()
694    bkpt = target.FindBreakpointByID(bpno)
695
696    test.assertTrue(bkpt.IsValid(), "Breakpoint is not valid.")
697
698    if expected_locations is not None:
699        test.assertEqual(expected_locations, bkpt.GetNumLocations())
700
701    if expected_resolved_count is not None:
702        test.assertEqual(expected_resolved_count, bkpt.GetNumResolvedLocations())
703    else:
704        expected_resolved_count = bkpt.GetNumLocations()
705        if location_id is None:
706            test.assertEqual(expected_resolved_count, bkpt.GetNumResolvedLocations())
707
708    if expected_hit_count is not None:
709        test.assertEqual(expected_hit_count, bkpt.GetHitCount())
710
711    if location_id is not None:
712        loc_bkpt = bkpt.FindLocationByID(location_id)
713        test.assertTrue(loc_bkpt.IsValid(), "Breakpoint location is not valid.")
714        test.assertEqual(loc_bkpt.IsResolved(), expected_location_resolved)
715        if expected_location_hit_count is not None:
716            test.assertEqual(expected_location_hit_count, loc_bkpt.GetHitCount())
717
718
719# ==================================================
720# Utility functions related to Threads and Processes
721# ==================================================
722
723
724def get_stopped_threads(process, reason):
725    """Returns the thread(s) with the specified stop reason in a list.
726
727    The list can be empty if no such thread exists.
728    """
729    threads = []
730    for t in process:
731        if t.GetStopReason() == reason:
732            threads.append(t)
733    return threads
734
735
736def get_stopped_thread(process, reason):
737    """A convenience function which returns the first thread with the given stop
738    reason or None.
739
740    Example usages:
741
742    1. Get the stopped thread due to a breakpoint condition
743
744    ...
745        from lldbutil import get_stopped_thread
746        thread = get_stopped_thread(process, lldb.eStopReasonPlanComplete)
747        self.assertTrue(thread.IsValid(), "There should be a thread stopped due to breakpoint condition")
748    ...
749
750    2. Get the thread stopped due to a breakpoint
751
752    ...
753        from lldbutil import get_stopped_thread
754        thread = get_stopped_thread(process, lldb.eStopReasonBreakpoint)
755        self.assertTrue(thread.IsValid(), "There should be a thread stopped due to breakpoint")
756    ...
757
758    """
759    threads = get_stopped_threads(process, reason)
760    if len(threads) == 0:
761        return None
762    return threads[0]
763
764
765def get_threads_stopped_at_breakpoint_id(process, bpid):
766    """For a stopped process returns the thread stopped at the breakpoint passed in bkpt"""
767    stopped_threads = []
768    threads = []
769
770    stopped_threads = get_stopped_threads(process, lldb.eStopReasonBreakpoint)
771
772    if len(stopped_threads) == 0:
773        return threads
774
775    for thread in stopped_threads:
776        # Make sure we've hit our breakpoint.
777        # From the docs of GetStopReasonDataAtIndex: "Breakpoint stop reasons
778        # will have data that consists of pairs of breakpoint IDs followed by
779        # the breakpoint location IDs".
780        # Iterate over all such pairs looking for `bpid`.
781        break_ids = [
782            thread.GetStopReasonDataAtIndex(idx)
783            for idx in range(0, thread.GetStopReasonDataCount(), 2)
784        ]
785        if bpid in break_ids:
786            threads.append(thread)
787
788    return threads
789
790
791def get_threads_stopped_at_breakpoint(process, bkpt):
792    return get_threads_stopped_at_breakpoint_id(process, bkpt.GetID())
793
794
795def get_one_thread_stopped_at_breakpoint_id(process, bpid, require_exactly_one=True):
796    threads = get_threads_stopped_at_breakpoint_id(process, bpid)
797    if len(threads) == 0:
798        return None
799    if require_exactly_one and len(threads) != 1:
800        return None
801
802    return threads[0]
803
804
805def get_one_thread_stopped_at_breakpoint(process, bkpt, require_exactly_one=True):
806    return get_one_thread_stopped_at_breakpoint_id(
807        process, bkpt.GetID(), require_exactly_one
808    )
809
810
811def is_thread_crashed(test, thread):
812    """In the test suite we dereference a null pointer to simulate a crash. The way this is
813    reported depends on the platform."""
814    if test.platformIsDarwin():
815        return (
816            thread.GetStopReason() == lldb.eStopReasonException
817            and "EXC_BAD_ACCESS" in thread.GetStopDescription(100)
818        )
819    elif test.getPlatform() in ["linux", "freebsd"]:
820        return (
821            thread.GetStopReason() == lldb.eStopReasonSignal
822            and thread.GetStopReasonDataAtIndex(0)
823            == thread.GetProcess().GetUnixSignals().GetSignalNumberFromName("SIGSEGV")
824        )
825    elif test.getPlatform() == "windows":
826        return "Exception 0xc0000005" in thread.GetStopDescription(200)
827    else:
828        return "invalid address" in thread.GetStopDescription(100)
829
830
831def get_crashed_threads(test, process):
832    threads = []
833    if process.GetState() != lldb.eStateStopped:
834        return threads
835    for thread in process:
836        if is_thread_crashed(test, thread):
837            threads.append(thread)
838    return threads
839
840
841# Helper functions for run_to_{source,name}_breakpoint:
842
843
844def run_to_breakpoint_make_target(test, exe_name="a.out", in_cwd=True):
845    exe = test.getBuildArtifact(exe_name) if in_cwd else exe_name
846
847    # Create the target
848    target = test.dbg.CreateTarget(exe)
849    test.assertTrue(target, "Target: %s is not valid." % (exe_name))
850
851    # Set environment variables for the inferior.
852    if lldbtest_config.inferior_env:
853        test.runCmd(
854            "settings set target.env-vars {}".format(lldbtest_config.inferior_env)
855        )
856
857    return target
858
859
860def run_to_breakpoint_do_run(
861    test, target, bkpt, launch_info=None, only_one_thread=True, extra_images=None
862):
863    # Launch the process, and do not stop at the entry point.
864    if not launch_info:
865        launch_info = target.GetLaunchInfo()
866        launch_info.SetWorkingDirectory(test.get_process_working_directory())
867
868    if extra_images:
869        environ = test.registerSharedLibrariesWithTarget(target, extra_images)
870        launch_info.SetEnvironmentEntries(environ, True)
871
872    error = lldb.SBError()
873    process = target.Launch(launch_info, error)
874
875    # Unfortunate workaround for the iPhone simulator.
876    retry = SIMULATOR_RETRY
877    while (
878        retry
879        and error.Fail()
880        and error.GetCString()
881        and "Unable to boot the Simulator" in error.GetCString()
882    ):
883        retry -= 1
884        print("** Simulator is unresponsive. Retrying %d more time(s)" % retry)
885        import time
886
887        time.sleep(60)
888        error = lldb.SBError()
889        process = target.Launch(launch_info, error)
890
891    test.assertTrue(
892        process,
893        "Could not create a valid process for %s: %s"
894        % (target.GetExecutable().GetFilename(), error.GetCString()),
895    )
896    test.assertFalse(error.Fail(), "Process launch failed: %s" % (error.GetCString()))
897
898    def processStateInfo(process):
899        info = "state: {}".format(state_type_to_str(process.state))
900        if process.state == lldb.eStateExited:
901            info += ", exit code: {}".format(process.GetExitStatus())
902            if process.exit_description:
903                info += ", exit description: '{}'".format(process.exit_description)
904        stdout = process.GetSTDOUT(999)
905        if stdout:
906            info += ", stdout: '{}'".format(stdout)
907        stderr = process.GetSTDERR(999)
908        if stderr:
909            info += ", stderr: '{}'".format(stderr)
910        return info
911
912    if process.state != lldb.eStateStopped:
913        test.fail(
914            "Test process is not stopped at breakpoint: {}".format(
915                processStateInfo(process)
916            )
917        )
918
919    # Frame #0 should be at our breakpoint.
920    threads = get_threads_stopped_at_breakpoint(process, bkpt)
921
922    num_threads = len(threads)
923    if only_one_thread:
924        test.assertEqual(
925            num_threads,
926            1,
927            "Expected 1 thread to stop at breakpoint, %d did." % (num_threads),
928        )
929    else:
930        test.assertGreater(num_threads, 0, "No threads stopped at breakpoint")
931
932    thread = threads[0]
933    return (target, process, thread, bkpt)
934
935
936def run_to_name_breakpoint(
937    test,
938    bkpt_name,
939    launch_info=None,
940    exe_name="a.out",
941    bkpt_module=None,
942    in_cwd=True,
943    only_one_thread=True,
944    extra_images=None,
945):
946    """Start up a target, using exe_name as the executable, and run it to
947    a breakpoint set by name on bkpt_name restricted to bkpt_module.
948
949    If you want to pass in launch arguments or environment
950    variables, you can optionally pass in an SBLaunchInfo.  If you
951    do that, remember to set the working directory as well.
952
953    If your executable isn't called a.out, you can pass that in.
954    And if your executable isn't in the CWD, pass in the absolute
955    path to the executable in exe_name, and set in_cwd to False.
956
957    If you need to restrict the breakpoint to a particular module,
958    pass the module name (a string not a FileSpec) in bkpt_module.  If
959    nothing is passed in setting will be unrestricted.
960
961    If the target isn't valid, the breakpoint isn't found, or hit, the
962    function will cause a testsuite failure.
963
964    If successful it returns a tuple with the target process and
965    thread that hit the breakpoint, and the breakpoint that we set
966    for you.
967
968    If only_one_thread is true, we require that there be only one
969    thread stopped at the breakpoint.  Otherwise we only require one
970    or more threads stop there.  If there are more than one, we return
971    the first thread that stopped.
972    """
973
974    target = run_to_breakpoint_make_target(test, exe_name, in_cwd)
975
976    breakpoint = target.BreakpointCreateByName(bkpt_name, bkpt_module)
977
978    test.assertTrue(
979        breakpoint.GetNumLocations() > 0,
980        "No locations found for name breakpoint: '%s'." % (bkpt_name),
981    )
982    return run_to_breakpoint_do_run(
983        test, target, breakpoint, launch_info, only_one_thread, extra_images
984    )
985
986
987def run_to_source_breakpoint(
988    test,
989    bkpt_pattern,
990    source_spec,
991    launch_info=None,
992    exe_name="a.out",
993    bkpt_module=None,
994    in_cwd=True,
995    only_one_thread=True,
996    extra_images=None,
997    has_locations_before_run=True,
998):
999    """Start up a target, using exe_name as the executable, and run it to
1000    a breakpoint set by source regex bkpt_pattern.
1001
1002    The rest of the behavior is the same as run_to_name_breakpoint.
1003    """
1004
1005    target = run_to_breakpoint_make_target(test, exe_name, in_cwd)
1006    # Set the breakpoints
1007    breakpoint = target.BreakpointCreateBySourceRegex(
1008        bkpt_pattern, source_spec, bkpt_module
1009    )
1010    if has_locations_before_run:
1011        test.assertTrue(
1012            breakpoint.GetNumLocations() > 0,
1013            'No locations found for source breakpoint: "%s", file: "%s", dir: "%s"'
1014            % (bkpt_pattern, source_spec.GetFilename(), source_spec.GetDirectory()),
1015        )
1016    return run_to_breakpoint_do_run(
1017        test, target, breakpoint, launch_info, only_one_thread, extra_images
1018    )
1019
1020
1021def run_to_line_breakpoint(
1022    test,
1023    source_spec,
1024    line_number,
1025    column=0,
1026    launch_info=None,
1027    exe_name="a.out",
1028    bkpt_module=None,
1029    in_cwd=True,
1030    only_one_thread=True,
1031    extra_images=None,
1032):
1033    """Start up a target, using exe_name as the executable, and run it to
1034    a breakpoint set by (source_spec, line_number(, column)).
1035
1036    The rest of the behavior is the same as run_to_name_breakpoint.
1037    """
1038
1039    target = run_to_breakpoint_make_target(test, exe_name, in_cwd)
1040    # Set the breakpoints
1041    breakpoint = target.BreakpointCreateByLocation(
1042        source_spec, line_number, column, 0, lldb.SBFileSpecList()
1043    )
1044    test.assertTrue(
1045        breakpoint.GetNumLocations() > 0,
1046        'No locations found for line breakpoint: "%s:%d(:%d)", dir: "%s"'
1047        % (source_spec.GetFilename(), line_number, column, source_spec.GetDirectory()),
1048    )
1049    return run_to_breakpoint_do_run(
1050        test, target, breakpoint, launch_info, only_one_thread, extra_images
1051    )
1052
1053
1054def continue_to_breakpoint(process, bkpt):
1055    """Continues the process, if it stops, returns the threads stopped at bkpt; otherwise, returns None"""
1056    process.Continue()
1057    if process.GetState() != lldb.eStateStopped:
1058        return None
1059    else:
1060        return get_threads_stopped_at_breakpoint(process, bkpt)
1061
1062
1063def continue_to_source_breakpoint(test, process, bkpt_pattern, source_spec):
1064    """
1065    Sets a breakpoint set by source regex bkpt_pattern, continues the process, and deletes the breakpoint again.
1066    Otherwise the same as `continue_to_breakpoint`
1067    """
1068    breakpoint = process.target.BreakpointCreateBySourceRegex(
1069        bkpt_pattern, source_spec, None
1070    )
1071    test.assertTrue(
1072        breakpoint.GetNumLocations() > 0,
1073        'No locations found for source breakpoint: "%s", file: "%s", dir: "%s"'
1074        % (bkpt_pattern, source_spec.GetFilename(), source_spec.GetDirectory()),
1075    )
1076    stopped_threads = continue_to_breakpoint(process, breakpoint)
1077    process.target.BreakpointDelete(breakpoint.GetID())
1078    return stopped_threads
1079
1080
1081def get_caller_symbol(thread):
1082    """
1083    Returns the symbol name for the call site of the leaf function.
1084    """
1085    depth = thread.GetNumFrames()
1086    if depth <= 1:
1087        return None
1088    caller = thread.GetFrameAtIndex(1).GetSymbol()
1089    if caller:
1090        return caller.GetName()
1091    else:
1092        return None
1093
1094
1095def get_function_names(thread):
1096    """
1097    Returns a sequence of function names from the stack frames of this thread.
1098    """
1099
1100    def GetFuncName(i):
1101        return thread.GetFrameAtIndex(i).GetFunctionName()
1102
1103    return list(map(GetFuncName, list(range(thread.GetNumFrames()))))
1104
1105
1106def get_symbol_names(thread):
1107    """
1108    Returns a sequence of symbols for this thread.
1109    """
1110
1111    def GetSymbol(i):
1112        return thread.GetFrameAtIndex(i).GetSymbol().GetName()
1113
1114    return list(map(GetSymbol, list(range(thread.GetNumFrames()))))
1115
1116
1117def get_pc_addresses(thread):
1118    """
1119    Returns a sequence of pc addresses for this thread.
1120    """
1121
1122    def GetPCAddress(i):
1123        return thread.GetFrameAtIndex(i).GetPCAddress()
1124
1125    return list(map(GetPCAddress, list(range(thread.GetNumFrames()))))
1126
1127
1128def get_filenames(thread):
1129    """
1130    Returns a sequence of file names from the stack frames of this thread.
1131    """
1132
1133    def GetFilename(i):
1134        return thread.GetFrameAtIndex(i).GetLineEntry().GetFileSpec().GetFilename()
1135
1136    return list(map(GetFilename, list(range(thread.GetNumFrames()))))
1137
1138
1139def get_line_numbers(thread):
1140    """
1141    Returns a sequence of line numbers from the stack frames of this thread.
1142    """
1143
1144    def GetLineNumber(i):
1145        return thread.GetFrameAtIndex(i).GetLineEntry().GetLine()
1146
1147    return list(map(GetLineNumber, list(range(thread.GetNumFrames()))))
1148
1149
1150def get_module_names(thread):
1151    """
1152    Returns a sequence of module names from the stack frames of this thread.
1153    """
1154
1155    def GetModuleName(i):
1156        return thread.GetFrameAtIndex(i).GetModule().GetFileSpec().GetFilename()
1157
1158    return list(map(GetModuleName, list(range(thread.GetNumFrames()))))
1159
1160
1161def print_stacktrace(thread, string_buffer=False):
1162    """Prints a simple stack trace of this thread."""
1163
1164    output = io.StringIO() if string_buffer else sys.stdout
1165    target = thread.GetProcess().GetTarget()
1166
1167    depth = thread.GetNumFrames()
1168
1169    mods = get_module_names(thread)
1170    funcs = get_function_names(thread)
1171    symbols = get_symbol_names(thread)
1172    files = get_filenames(thread)
1173    lines = get_line_numbers(thread)
1174    addrs = get_pc_addresses(thread)
1175
1176    if thread.GetStopReason() != lldb.eStopReasonInvalid:
1177        desc = "stop reason=" + stop_reason_to_str(thread.GetStopReason())
1178    else:
1179        desc = ""
1180    print(
1181        "Stack trace for thread id={0:#x} name={1} queue={2} ".format(
1182            thread.GetThreadID(), thread.GetName(), thread.GetQueueName()
1183        )
1184        + desc,
1185        file=output,
1186    )
1187
1188    for i in range(depth):
1189        frame = thread.GetFrameAtIndex(i)
1190        function = frame.GetFunction()
1191
1192        load_addr = addrs[i].GetLoadAddress(target)
1193        if not function:
1194            file_addr = addrs[i].GetFileAddress()
1195            start_addr = frame.GetSymbol().GetStartAddress().GetFileAddress()
1196            symbol_offset = file_addr - start_addr
1197            print(
1198                "  frame #{num}: {addr:#016x} {mod}`{symbol} + {offset}".format(
1199                    num=i,
1200                    addr=load_addr,
1201                    mod=mods[i],
1202                    symbol=symbols[i],
1203                    offset=symbol_offset,
1204                ),
1205                file=output,
1206            )
1207        else:
1208            print(
1209                "  frame #{num}: {addr:#016x} {mod}`{func} at {file}:{line} {args}".format(
1210                    num=i,
1211                    addr=load_addr,
1212                    mod=mods[i],
1213                    func="%s [inlined]" % funcs[i] if frame.IsInlined() else funcs[i],
1214                    file=files[i],
1215                    line=lines[i],
1216                    args=get_args_as_string(frame, showFuncName=False)
1217                    if not frame.IsInlined()
1218                    else "()",
1219                ),
1220                file=output,
1221            )
1222
1223    if string_buffer:
1224        return output.getvalue()
1225
1226
1227def print_stacktraces(process, string_buffer=False):
1228    """Prints the stack traces of all the threads."""
1229
1230    output = io.StringIO() if string_buffer else sys.stdout
1231
1232    print("Stack traces for " + str(process), file=output)
1233
1234    for thread in process:
1235        print(print_stacktrace(thread, string_buffer=True), file=output)
1236
1237    if string_buffer:
1238        return output.getvalue()
1239
1240
1241def expect_state_changes(test, listener, process, states, timeout=30):
1242    """Listens for state changed events on the listener and makes sure they match what we
1243    expect. Stop-and-restart events (where GetRestartedFromEvent() returns true) are ignored.
1244    """
1245
1246    for expected_state in states:
1247
1248        def get_next_event():
1249            event = lldb.SBEvent()
1250            if not listener.WaitForEventForBroadcasterWithType(
1251                timeout,
1252                process.GetBroadcaster(),
1253                lldb.SBProcess.eBroadcastBitStateChanged,
1254                event,
1255            ):
1256                test.fail(
1257                    "Timed out while waiting for a transition to state %s"
1258                    % lldb.SBDebugger.StateAsCString(expected_state)
1259                )
1260            return event
1261
1262        event = get_next_event()
1263        while lldb.SBProcess.GetStateFromEvent(
1264            event
1265        ) == lldb.eStateStopped and lldb.SBProcess.GetRestartedFromEvent(event):
1266            # Ignore restarted event and the subsequent running event.
1267            event = get_next_event()
1268            test.assertEqual(
1269                lldb.SBProcess.GetStateFromEvent(event),
1270                lldb.eStateRunning,
1271                "Restarted event followed by a running event",
1272            )
1273            event = get_next_event()
1274
1275        test.assertEqual(lldb.SBProcess.GetStateFromEvent(event), expected_state)
1276
1277
1278def start_listening_from(broadcaster, event_mask):
1279    """Creates a listener for a specific event mask and add it to the source broadcaster."""
1280
1281    listener = lldb.SBListener("lldb.test.listener")
1282    broadcaster.AddListener(listener, event_mask)
1283    return listener
1284
1285
1286def fetch_next_event(test, listener, broadcaster, match_class=False, timeout=10):
1287    """Fetch one event from the listener and return it if it matches the provided broadcaster.
1288    If `match_class` is set to True, this will match an event with an entire broadcaster class.
1289    Fails otherwise."""
1290
1291    event = lldb.SBEvent()
1292
1293    if listener.WaitForEvent(timeout, event):
1294        if match_class:
1295            if event.GetBroadcasterClass() == broadcaster:
1296                return event
1297        else:
1298            if event.BroadcasterMatchesRef(broadcaster):
1299                return event
1300
1301        stream = lldb.SBStream()
1302        event.GetDescription(stream)
1303        test.fail(
1304            "received event '%s' from unexpected broadcaster '%s'."
1305            % (stream.GetData(), event.GetBroadcaster().GetName())
1306        )
1307
1308    test.fail("couldn't fetch an event before reaching the timeout.")
1309
1310
1311# ===================================
1312# Utility functions related to Frames
1313# ===================================
1314
1315
1316def get_parent_frame(frame):
1317    """
1318    Returns the parent frame of the input frame object; None if not available.
1319    """
1320    thread = frame.GetThread()
1321    parent_found = False
1322    for f in thread:
1323        if parent_found:
1324            return f
1325        if f.GetFrameID() == frame.GetFrameID():
1326            parent_found = True
1327
1328    # If we reach here, no parent has been found, return None.
1329    return None
1330
1331
1332def get_args_as_string(frame, showFuncName=True):
1333    """
1334    Returns the args of the input frame object as a string.
1335    """
1336    # arguments     => True
1337    # locals        => False
1338    # statics       => False
1339    # in_scope_only => True
1340    vars = frame.GetVariables(True, False, False, True)  # type of SBValueList
1341    args = []  # list of strings
1342    for var in vars:
1343        args.append("(%s)%s=%s" % (var.GetTypeName(), var.GetName(), var.GetValue()))
1344    if frame.GetFunction():
1345        name = frame.GetFunction().GetName()
1346    elif frame.GetSymbol():
1347        name = frame.GetSymbol().GetName()
1348    else:
1349        name = ""
1350    if showFuncName:
1351        return "%s(%s)" % (name, ", ".join(args))
1352    else:
1353        return "(%s)" % (", ".join(args))
1354
1355
1356def get_registers(frame, kind):
1357    """Returns the registers given the frame and the kind of registers desired.
1358
1359    Returns None if there's no such kind.
1360    """
1361    registerSet = frame.GetRegisters()  # Return type of SBValueList.
1362    for value in registerSet:
1363        if kind.lower() in value.GetName().lower():
1364            return value
1365
1366    return None
1367
1368
1369def get_GPRs(frame):
1370    """Returns the general purpose registers of the frame as an SBValue.
1371
1372    The returned SBValue object is iterable.  An example:
1373        ...
1374        from lldbutil import get_GPRs
1375        regs = get_GPRs(frame)
1376        for reg in regs:
1377            print("%s => %s" % (reg.GetName(), reg.GetValue()))
1378        ...
1379    """
1380    return get_registers(frame, "general purpose")
1381
1382
1383def get_FPRs(frame):
1384    """Returns the floating point registers of the frame as an SBValue.
1385
1386    The returned SBValue object is iterable.  An example:
1387        ...
1388        from lldbutil import get_FPRs
1389        regs = get_FPRs(frame)
1390        for reg in regs:
1391            print("%s => %s" % (reg.GetName(), reg.GetValue()))
1392        ...
1393    """
1394    return get_registers(frame, "floating point")
1395
1396
1397def get_ESRs(frame):
1398    """Returns the exception state registers of the frame as an SBValue.
1399
1400    The returned SBValue object is iterable.  An example:
1401        ...
1402        from lldbutil import get_ESRs
1403        regs = get_ESRs(frame)
1404        for reg in regs:
1405            print("%s => %s" % (reg.GetName(), reg.GetValue()))
1406        ...
1407    """
1408    return get_registers(frame, "exception state")
1409
1410
1411# ======================================
1412# Utility classes/functions for SBValues
1413# ======================================
1414
1415
1416class BasicFormatter(object):
1417    """The basic formatter inspects the value object and prints the value."""
1418
1419    def format(self, value, buffer=None, indent=0):
1420        if not buffer:
1421            output = io.StringIO()
1422        else:
1423            output = buffer
1424        # If there is a summary, it suffices.
1425        val = value.GetSummary()
1426        # Otherwise, get the value.
1427        if val is None:
1428            val = value.GetValue()
1429        if val is None and value.GetNumChildren() > 0:
1430            val = "%s (location)" % value.GetLocation()
1431        print(
1432            "{indentation}({type}) {name} = {value}".format(
1433                indentation=" " * indent,
1434                type=value.GetTypeName(),
1435                name=value.GetName(),
1436                value=val,
1437            ),
1438            file=output,
1439        )
1440        return output.getvalue()
1441
1442
1443class ChildVisitingFormatter(BasicFormatter):
1444    """The child visiting formatter prints the value and its immediate children.
1445
1446    The constructor takes a keyword arg: indent_child, which defaults to 2.
1447    """
1448
1449    def __init__(self, indent_child=2):
1450        """Default indentation of 2 SPC's for the children."""
1451        self.cindent = indent_child
1452
1453    def format(self, value, buffer=None):
1454        if not buffer:
1455            output = io.StringIO()
1456        else:
1457            output = buffer
1458
1459        BasicFormatter.format(self, value, buffer=output)
1460        for child in value:
1461            BasicFormatter.format(self, child, buffer=output, indent=self.cindent)
1462
1463        return output.getvalue()
1464
1465
1466class RecursiveDecentFormatter(BasicFormatter):
1467    """The recursive decent formatter prints the value and the decendents.
1468
1469    The constructor takes two keyword args: indent_level, which defaults to 0,
1470    and indent_child, which defaults to 2.  The current indentation level is
1471    determined by indent_level, while the immediate children has an additional
1472    indentation by inden_child.
1473    """
1474
1475    def __init__(self, indent_level=0, indent_child=2):
1476        self.lindent = indent_level
1477        self.cindent = indent_child
1478
1479    def format(self, value, buffer=None):
1480        if not buffer:
1481            output = io.StringIO()
1482        else:
1483            output = buffer
1484
1485        BasicFormatter.format(self, value, buffer=output, indent=self.lindent)
1486        new_indent = self.lindent + self.cindent
1487        for child in value:
1488            if child.GetSummary() is not None:
1489                BasicFormatter.format(self, child, buffer=output, indent=new_indent)
1490            else:
1491                if child.GetNumChildren() > 0:
1492                    rdf = RecursiveDecentFormatter(indent_level=new_indent)
1493                    rdf.format(child, buffer=output)
1494                else:
1495                    BasicFormatter.format(self, child, buffer=output, indent=new_indent)
1496
1497        return output.getvalue()
1498
1499
1500# ===========================================================
1501# Utility functions for path manipulation on remote platforms
1502# ===========================================================
1503
1504
1505def join_remote_paths(*paths):
1506    # TODO: update with actual platform name for remote windows once it exists
1507    if lldb.remote_platform.GetName() == "remote-windows":
1508        return os.path.join(*paths).replace(os.path.sep, "\\")
1509    return os.path.join(*paths).replace(os.path.sep, "/")
1510
1511
1512def append_to_process_working_directory(test, *paths):
1513    remote = lldb.remote_platform
1514    if remote:
1515        return join_remote_paths(remote.GetWorkingDirectory(), *paths)
1516    return os.path.join(test.getBuildDir(), *paths)
1517
1518
1519# ==================================================
1520# Utility functions to get the correct signal number
1521# ==================================================
1522
1523import signal
1524
1525
1526def get_signal_number(signal_name):
1527    platform = lldb.remote_platform
1528    if platform and platform.IsValid():
1529        signals = platform.GetUnixSignals()
1530        if signals.IsValid():
1531            signal_number = signals.GetSignalNumberFromName(signal_name)
1532            if signal_number > 0:
1533                return signal_number
1534    # No remote platform; fall back to using local python signals.
1535    return getattr(signal, signal_name)
1536
1537
1538def get_actions_for_signal(
1539    testcase, signal_name, from_target=False, expected_absent=False
1540):
1541    """Returns a triple of (pass, stop, notify)"""
1542    return_obj = lldb.SBCommandReturnObject()
1543    command = "process handle {0}".format(signal_name)
1544    if from_target:
1545        command += " -t"
1546    testcase.dbg.GetCommandInterpreter().HandleCommand(command, return_obj)
1547    match = re.match(
1548        "NAME *PASS *STOP *NOTIFY.*(false|true|not set) *(false|true|not set) *(false|true|not set)",
1549        return_obj.GetOutput(),
1550        re.IGNORECASE | re.DOTALL,
1551    )
1552    if match and expected_absent:
1553        testcase.fail('Signal "{0}" was supposed to be absent'.format(signal_name))
1554    if not match:
1555        if expected_absent:
1556            return (None, None, None)
1557        testcase.fail("Unable to retrieve default signal disposition.")
1558    return (match.group(1), match.group(2), match.group(3))
1559
1560
1561def set_actions_for_signal(
1562    testcase, signal_name, pass_action, stop_action, notify_action, expect_success=True
1563):
1564    return_obj = lldb.SBCommandReturnObject()
1565    command = "process handle {0}".format(signal_name)
1566    if pass_action is not None:
1567        command += " -p {0}".format(pass_action)
1568    if stop_action is not None:
1569        command += " -s {0}".format(stop_action)
1570    if notify_action is not None:
1571        command += " -n {0}".format(notify_action)
1572
1573    testcase.dbg.GetCommandInterpreter().HandleCommand(command, return_obj)
1574    testcase.assertEqual(
1575        expect_success,
1576        return_obj.Succeeded(),
1577        "Setting signal handling for {0} worked as expected".format(signal_name),
1578    )
1579
1580
1581def skip_if_callable(test, mycallable, reason):
1582    if callable(mycallable):
1583        if mycallable(test):
1584            test.skipTest(reason)
1585            return True
1586    return False
1587
1588
1589def skip_if_library_missing(test, target, library):
1590    def find_library(target, library):
1591        for module in target.modules:
1592            filename = module.file.GetFilename()
1593            if isinstance(library, str):
1594                if library == filename:
1595                    return False
1596            elif hasattr(library, "match"):
1597                if library.match(filename):
1598                    return False
1599        return True
1600
1601    def find_library_callable(test):
1602        return find_library(target, library)
1603
1604    return skip_if_callable(
1605        test,
1606        find_library_callable,
1607        "could not find library matching '%s' in target %s" % (library, target),
1608    )
1609
1610
1611def install_to_target(test, path):
1612    if lldb.remote_platform:
1613        filename = os.path.basename(path)
1614        remote_path = append_to_process_working_directory(test, filename)
1615        err = lldb.remote_platform.Install(
1616            lldb.SBFileSpec(path, True), lldb.SBFileSpec(remote_path, False)
1617        )
1618        if err.Fail():
1619            raise Exception(
1620                "remote_platform.Install('%s', '%s') failed: %s"
1621                % (path, remote_path, err)
1622            )
1623        path = remote_path
1624    return path
1625
1626
1627def read_file_on_target(test, remote):
1628    if lldb.remote_platform:
1629        local = test.getBuildArtifact("file_from_target")
1630        error = lldb.remote_platform.Get(
1631            lldb.SBFileSpec(remote, False), lldb.SBFileSpec(local, True)
1632        )
1633        test.assertTrue(
1634            error.Success(), "Reading file {0} failed: {1}".format(remote, error)
1635        )
1636    else:
1637        local = remote
1638    with open(local, "r") as f:
1639        return f.read()
1640
1641
1642def read_file_from_process_wd(test, name):
1643    path = append_to_process_working_directory(test, name)
1644    return read_file_on_target(test, path)
1645
1646
1647def wait_for_file_on_target(testcase, file_path, max_attempts=6):
1648    for i in range(max_attempts):
1649        err, retcode, msg = testcase.run_platform_command("ls %s" % file_path)
1650        if err.Success() and retcode == 0:
1651            break
1652        if i < max_attempts:
1653            # Exponential backoff!
1654            import time
1655
1656            time.sleep(pow(2, i) * 0.25)
1657    else:
1658        testcase.fail(
1659            "File %s not found even after %d attempts." % (file_path, max_attempts)
1660        )
1661
1662    return read_file_on_target(testcase, file_path)
1663
1664
1665def packetlog_get_process_info(log):
1666    """parse a gdb-remote packet log file and extract the response to qProcessInfo"""
1667    process_info = dict()
1668    with open(log, "r") as logfile:
1669        process_info_ostype = None
1670        expect_process_info_response = False
1671        for line in logfile:
1672            if expect_process_info_response:
1673                for pair in line.split(";"):
1674                    keyval = pair.split(":")
1675                    if len(keyval) == 2:
1676                        process_info[keyval[0]] = keyval[1]
1677                break
1678            if "send packet: $qProcessInfo#" in line:
1679                expect_process_info_response = True
1680    return process_info
1681
1682
1683def packetlog_get_dylib_info(log):
1684    """parse a gdb-remote packet log file and extract the *last* complete
1685    (=> fetch_all_solibs=true) response to jGetLoadedDynamicLibrariesInfos"""
1686    import json
1687
1688    dylib_info = None
1689    with open(log, "r") as logfile:
1690        dylib_info = None
1691        expect_dylib_info_response = False
1692        for line in logfile:
1693            if expect_dylib_info_response:
1694                while line[0] != "$":
1695                    line = line[1:]
1696                line = line[1:]
1697                # Unescape '}'.
1698                dylib_info = json.loads(line.replace("}]", "}")[:-4])
1699                expect_dylib_info_response = False
1700            if (
1701                'send packet: $jGetLoadedDynamicLibrariesInfos:{"fetch_all_solibs":true}'
1702                in line
1703            ):
1704                expect_dylib_info_response = True
1705
1706    return dylib_info
1707