1import os 2import sys 3import time 4 5from subprocess import CalledProcessError, check_call 6from typing import List, IO, Optional, Tuple 7 8 9def which(command: str, paths: Optional[str] = None) -> Optional[str]: 10 """which(command, [paths]) - Look up the given command in the paths string 11 (or the PATH environment variable, if unspecified).""" 12 13 if paths is None: 14 paths = os.environ.get("PATH", "") 15 16 # Check for absolute match first. 17 if os.path.exists(command): 18 return command 19 20 # Would be nice if Python had a lib function for this. 21 if not paths: 22 paths = os.defpath 23 24 # Get suffixes to search. 25 # On Cygwin, 'PATHEXT' may exist but it should not be used. 26 if os.pathsep == ";": 27 pathext = os.environ.get("PATHEXT", "").split(";") 28 else: 29 pathext = [""] 30 31 # Search the paths... 32 for path in paths.split(os.pathsep): 33 for ext in pathext: 34 p = os.path.join(path, command + ext) 35 if os.path.exists(p): 36 return p 37 38 return None 39 40 41def has_no_extension(file_name: str) -> bool: 42 root, ext = os.path.splitext(file_name) 43 return ext == "" 44 45 46def is_valid_single_input_file(file_name: str) -> bool: 47 root, ext = os.path.splitext(file_name) 48 return ext in (".i", ".ii", ".c", ".cpp", ".m", "") 49 50 51def time_to_str(time: float) -> str: 52 """ 53 Convert given time in seconds into a human-readable string. 54 """ 55 return f"{time:.2f}s" 56 57 58def memory_to_str(memory: int) -> str: 59 """ 60 Convert given number of bytes into a human-readable string. 61 """ 62 if memory: 63 try: 64 import humanize 65 66 return humanize.naturalsize(memory, gnu=True) 67 except ImportError: 68 # no formatter installed, let's keep it in bytes 69 return f"{memory}B" 70 71 # If memory is 0, we didn't succeed measuring it. 72 return "N/A" 73 74 75def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]: 76 """ 77 Run command with arguments. Wait for command to complete and measure 78 execution time and peak memory consumption. 79 If the exit code was zero then return, otherwise raise 80 CalledProcessError. The CalledProcessError object will have the 81 return code in the returncode attribute. 82 83 The arguments are the same as for the call and check_call functions. 84 85 Return a tuple of execution time and peak memory. 86 """ 87 peak_mem = 0 88 start_time = time.time() 89 90 try: 91 import psutil as ps 92 93 def get_memory(process: ps.Process) -> int: 94 mem = 0 95 96 # we want to gather memory usage from all of the child processes 97 descendants = list(process.children(recursive=True)) 98 descendants.append(process) 99 100 for subprocess in descendants: 101 try: 102 mem += subprocess.memory_info().rss 103 except (ps.NoSuchProcess, ps.AccessDenied): 104 continue 105 106 return mem 107 108 with ps.Popen(*popenargs, **kwargs) as process: 109 # while the process is running calculate resource utilization. 110 while process.is_running() and process.status() != ps.STATUS_ZOMBIE: 111 # track the peak utilization of the process 112 peak_mem = max(peak_mem, get_memory(process)) 113 time.sleep(0.5) 114 115 if process.is_running(): 116 process.kill() 117 118 if process.returncode != 0: 119 cmd = kwargs.get("args") 120 if cmd is None: 121 cmd = popenargs[0] 122 raise CalledProcessError(process.returncode, cmd) 123 124 except ImportError: 125 # back off to subprocess if we don't have psutil installed 126 peak_mem = 0 127 check_call(*popenargs, **kwargs) 128 129 return time.time() - start_time, peak_mem 130 131 132def run_script( 133 script_path: str, 134 build_log_file: IO, 135 cwd: str, 136 out=sys.stdout, 137 err=sys.stderr, 138 verbose: int = 0, 139): 140 """ 141 Run the provided script if it exists. 142 """ 143 if os.path.exists(script_path): 144 try: 145 if verbose == 1: 146 out.write(f" Executing: {script_path}\n") 147 148 check_call( 149 f"chmod +x '{script_path}'", 150 cwd=cwd, 151 stderr=build_log_file, 152 stdout=build_log_file, 153 shell=True, 154 ) 155 156 check_call( 157 f"'{script_path}'", 158 cwd=cwd, 159 stderr=build_log_file, 160 stdout=build_log_file, 161 shell=True, 162 ) 163 164 except CalledProcessError: 165 err.write( 166 f"Error: Running {script_path} failed. " 167 f"See {build_log_file.name} for details.\n" 168 ) 169 sys.exit(-1) 170 171 172def is_comment_csv_line(entries: List[str]) -> bool: 173 """ 174 Treat CSV lines starting with a '#' as a comment. 175 """ 176 return len(entries) > 0 and entries[0].startswith("#") 177