# -*- coding: utf-8 -*-
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""This module is a collection of methods commonly used in this project."""
import collections
import functools
import json
import logging
import os
import os.path
import re
import shlex
import subprocess
import sys

# Name of the environment variable that carries the JSON encoded wrapper
# parameters (written by `wrapper_environment`, read by `compiler_wrapper`).
ENVIRONMENT_KEY = "INTERCEPT_BUILD"

# Record of a single process execution: process id, working directory and
# the command (list of tokens).
Execution = collections.namedtuple("Execution", ["pid", "cwd", "cmd"])

CtuConfig = collections.namedtuple(
    "CtuConfig", ["collect", "analyze", "dir", "extdef_map_cmd"]
)


def duplicate_check(method):
    """Predicate to detect duplicated entries.

    A unique hash method can be used to detect duplicates. Entries are
    represented as dictionaries, which have no default hash method.
    This implementation uses a set to store the hash values seen so far.

    :param method: callable which maps an entry to a hashable value
    :return: a predicate which returns False the first time an entry (as
        identified by `method`) is seen and True on every later occurrence.
        The state is kept on the returned predicate itself."""

    def predicate(entry):
        entry_hash = predicate.unique(entry)
        if entry_hash not in predicate.state:
            predicate.state.add(entry_hash)
            return False
        return True

    predicate.unique = method
    predicate.state = set()
    return predicate


def run_build(command, *args, **kwargs):
    """Run and report build command execution

    Extra positional and keyword arguments are forwarded to
    `subprocess.call` unchanged.

    :param command: array of tokens
    :return: exit code of the process
    """
    # Only used for the debug message; the child gets whatever `env` is
    # present in kwargs (or inherits the current environment).
    environment = kwargs.get("env", os.environ)
    logging.debug("run build %s, in environment: %s", command, environment)
    exit_code = subprocess.call(command, *args, **kwargs)
    logging.debug("build finished with exit code: %d", exit_code)
    return exit_code


def run_command(command, cwd=None):
    """Run a given command and report the execution.

    Standard error is merged into standard output.

    :param command: array of tokens
    :param cwd: the working directory where the command will be executed
    :return: output of the command, as a list of lines
    :raises subprocess.CalledProcessError: when the command exits with a
        non-zero code; the `output` attribute of the exception is replaced
        with the decoded list of output lines.
    """

    def decode_when_needed(result):
        """check_output returns bytes or string depend on python version"""
        # Undecodable bytes are replaced instead of raising: a stray
        # non-UTF-8 byte in the tool output must not hide the real result
        # (or the real CalledProcessError) behind a UnicodeDecodeError.
        if isinstance(result, bytes):
            return result.decode("utf-8", "replace")
        return result

    try:
        directory = os.path.abspath(cwd) if cwd else os.getcwd()
        logging.debug("exec command %s in %s", command, directory)
        output = subprocess.check_output(
            command, cwd=directory, stderr=subprocess.STDOUT
        )
        return decode_when_needed(output).splitlines()
    except subprocess.CalledProcessError as ex:
        ex.output = decode_when_needed(ex.output).splitlines()
        raise


def reconfigure_logging(verbose_level):
    """Reconfigure logging level and format based on the verbose flag.

    :param verbose_level: number of `-v` flags received by the command
    :return: no return value
    """
    # Exit when nothing to do.
    if verbose_level == 0:
        return

    root = logging.getLogger()
    # Tune logging level: every `-v` lowers the threshold by 10, starting
    # from WARNING and never going below 0.
    level = logging.WARNING - min(logging.WARNING, (10 * verbose_level))
    root.setLevel(level)
    # Be verbose with messages: above three `-v` flags the function name is
    # included as well.
    if verbose_level <= 3:
        fmt_string = "%(name)s: %(levelname)s: %(message)s"
    else:
        fmt_string = "%(name)s: %(levelname)s: %(funcName)s: %(message)s"
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(fmt=fmt_string))
    # Replace (not extend) the handlers of the root logger.
    root.handlers = [handler]


def command_entry_point(function):
    """Decorator for command entry methods.

    The decorator initialize/shutdown logging and guard on programming
    errors (catch exceptions).

    The decorated method can have arbitrary parameters, the return value will
    be the exit code of the process. On keyboard interrupt the wrapper
    returns 130, on any other exception it logs the error and returns 64."""

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        """Do housekeeping tasks and execute the wrapped method."""

        try:
            logging.basicConfig(
                format="%(name)s: %(message)s", level=logging.WARNING, stream=sys.stdout
            )
            # This hack to get the executable name as %(name).
            logging.getLogger().name = os.path.basename(sys.argv[0])
            return function(*args, **kwargs)
        except KeyboardInterrupt:
            logging.warning("Keyboard interrupt")
            return 130  # Signal received exit code for bash.
        except Exception:
            logging.exception("Internal error.")
            if logging.getLogger().isEnabledFor(logging.DEBUG):
                logging.error(
                    "Please report this bug and attach the output to the bug report"
                )
            else:
                logging.error(
                    "Please run this command again and turn on "
                    "verbose mode (add '-vvvv' as argument)."
                )
            return 64  # Some non used exit code for internal errors.
        finally:
            logging.shutdown()

    return wrapper


def compiler_wrapper(function):
    """Implements compiler wrapper base functionality.

    A compiler wrapper executes the real compiler, then implement some
    functionality, then returns with the real compiler exit code.

    The wrapper parameters (verbosity and the real compiler commands) are
    read as JSON from the environment variable named by ENVIRONMENT_KEY.

    :param function: the extra functionality what the wrapper want to
                     do on top of the compiler call. If it throws exception,
                     it will be caught and logged.
    :return: the exit code of the real compiler.

    The :param function: will receive the following arguments:

    :param result:       the exit code of the compilation.
    :param execution:    the command executed by the wrapper."""

    def is_cxx_compiler():
        """Find out was it a C++ compiler call. Compiler wrapper names
        contain the compiler type. C++ compiler wrappers ends with `c++`,
        but might have `.exe` extension on windows."""

        wrapper_command = os.path.basename(sys.argv[0])
        return re.match(r"(.+)c\+\+(.*)", wrapper_command)

    def run_compiler(executable):
        """Execute compilation with the real compiler."""

        command = executable + sys.argv[1:]
        logging.debug("compilation: %s", command)
        result = subprocess.call(command)
        logging.debug("compilation exit code: %d", result)
        return result

    # Get relevant parameters from environment.
    parameters = json.loads(os.environ[ENVIRONMENT_KEY])
    reconfigure_logging(parameters["verbose"])
    # Execute the requested compilation. Do crash if anything goes wrong.
    cxx = is_cxx_compiler()
    compiler = parameters["cxx"] if cxx else parameters["cc"]
    result = run_compiler(compiler)
    # Call the wrapped method and ignore its return value.
    try:
        call = Execution(
            pid=os.getpid(),
            cwd=os.getcwd(),
            cmd=["c++" if cxx else "cc"] + sys.argv[1:],
        )
        function(result, call)
    except BaseException:
        # Deliberately broad: whatever happens in the extra functionality,
        # the build must only observe the real compiler's result.
        logging.exception("Compiler wrapper failed complete.")
    # Always return the real compiler exit code.
    return result


def wrapper_environment(args):
    """Set up environment for interpose compiler wrapper.

    :param args: object with `verbose`, `cc` and `cxx` attributes; `cc` and
        `cxx` are command line strings which get split into tokens.
    :return: dictionary with a single entry, keyed by ENVIRONMENT_KEY, whose
        value is the JSON encoded parameters read by `compiler_wrapper`."""

    return {
        ENVIRONMENT_KEY: json.dumps(
            {
                "verbose": args.verbose,
                "cc": shlex.split(args.cc),
                "cxx": shlex.split(args.cxx),
            }
        )
    }