1#!/usr/bin/env python 2 3""" 4CmpRuns - A simple tool for comparing two static analyzer runs to determine 5which reports have been added, removed, or changed. 6 7This is designed to support automated testing using the static analyzer, from 8two perspectives: 9 1. To monitor changes in the static analyzer's reports on real code bases, 10 for regression testing. 11 12 2. For use by end users who want to integrate regular static analyzer testing 13 into a buildbot like environment. 14 15Usage: 16 17 # Load the results of both runs, to obtain lists of the corresponding 18 # AnalysisDiagnostic objects. 19 # 20 resultsA = load_results_from_single_run(singleRunInfoA, delete_empty) 21 resultsB = load_results_from_single_run(singleRunInfoB, delete_empty) 22 23 # Generate a relation from diagnostics in run A to diagnostics in run B 24 # to obtain a list of triples (a, b, confidence). 25 diff = compare_results(resultsA, resultsB) 26 27""" 28import json 29import os 30import plistlib 31import re 32import sys 33 34from math import log 35from collections import defaultdict 36from copy import copy 37from enum import Enum 38from typing import (Any, cast, Dict, List, NamedTuple, Optional, Sequence, 39 TextIO, TypeVar, Tuple, Union) 40 41 42Number = Union[int, float] 43Stats = Dict[str, Dict[str, Number]] 44Plist = Dict[str, Any] 45JSON = Dict[str, Any] 46# Type for generics 47T = TypeVar('T') 48 49STATS_REGEXP = re.compile(r"Statistics: (\{.+\})", re.MULTILINE | re.DOTALL) 50 51 52class Colors: 53 """ 54 Color for terminal highlight. 55 """ 56 RED = '\x1b[2;30;41m' 57 GREEN = '\x1b[6;30;42m' 58 CLEAR = '\x1b[0m' 59 60 61class HistogramType(str, Enum): 62 RELATIVE = "relative" 63 LOG_RELATIVE = "log-relative" 64 ABSOLUTE = "absolute" 65 66 67class ResultsDirectory(NamedTuple): 68 path: str 69 root: str = "" 70 71 72class SingleRunInfo: 73 """ 74 Information about analysis run: 75 path - the analysis output directory 76 root - the name of the root directory, which will be disregarded when 77 determining the source file name 78 """ 79 def __init__(self, results: ResultsDirectory, 80 verbose_log: Optional[str] = None): 81 self.path = results.path 82 self.root = results.root.rstrip("/\\") 83 self.verbose_log = verbose_log 84 85 86class AnalysisDiagnostic: 87 def __init__(self, data: Plist, report: "AnalysisReport", 88 html_report: Optional[str]): 89 self._data = data 90 self._loc = self._data['location'] 91 self._report = report 92 self._html_report = html_report 93 self._report_size = len(self._data['path']) 94 95 def get_file_name(self) -> str: 96 root = self._report.run.root 97 file_name = self._report.files[self._loc['file']] 98 99 if file_name.startswith(root) and len(root) > 0: 100 return file_name[len(root) + 1:] 101 102 return file_name 103 104 def get_root_file_name(self) -> str: 105 path = self._data['path'] 106 107 if not path: 108 return self.get_file_name() 109 110 p = path[0] 111 if 'location' in p: 112 file_index = p['location']['file'] 113 else: # control edge 114 file_index = path[0]['edges'][0]['start'][0]['file'] 115 116 out = self._report.files[file_index] 117 root = self._report.run.root 118 119 if out.startswith(root): 120 return out[len(root):] 121 122 return out 123 124 def get_line(self) -> int: 125 return self._loc['line'] 126 127 def get_column(self) -> int: 128 return self._loc['col'] 129 130 def get_path_length(self) -> int: 131 return self._report_size 132 133 def get_category(self) -> str: 134 return self._data['category'] 135 136 def get_description(self) -> str: 137 return self._data['description'] 138 139 def get_issue_identifier(self) -> str: 140 id = self.get_file_name() + "+" 141 142 if "issue_context" in self._data: 143 id += self._data["issue_context"] + "+" 144 145 if "issue_hash_content_of_line_in_context" in self._data: 146 id += str(self._data["issue_hash_content_of_line_in_context"]) 147 148 return id 149 150 def get_html_report(self) -> str: 151 if self._html_report is None: 152 return " " 153 154 return os.path.join(self._report.run.path, self._html_report) 155 156 def get_readable_name(self) -> str: 157 if "issue_context" in self._data: 158 funcname_postfix = "#" + self._data["issue_context"] 159 else: 160 funcname_postfix = "" 161 162 root_filename = self.get_root_file_name() 163 file_name = self.get_file_name() 164 165 if root_filename != file_name: 166 file_prefix = f"[{root_filename}] {file_name}" 167 else: 168 file_prefix = root_filename 169 170 line = self.get_line() 171 col = self.get_column() 172 return f"{file_prefix}{funcname_postfix}:{line}:{col}" \ 173 f", {self.get_category()}: {self.get_description()}" 174 175 # Note, the data format is not an API and may change from one analyzer 176 # version to another. 177 def get_raw_data(self) -> Plist: 178 return self._data 179 180 181class AnalysisRun: 182 def __init__(self, info: SingleRunInfo): 183 self.path = info.path 184 self.root = info.root 185 self.info = info 186 self.reports: List[AnalysisReport] = [] 187 # Cumulative list of all diagnostics from all the reports. 188 self.diagnostics: List[AnalysisDiagnostic] = [] 189 self.clang_version: Optional[str] = None 190 self.raw_stats: List[JSON] = [] 191 192 def get_clang_version(self) -> Optional[str]: 193 return self.clang_version 194 195 def read_single_file(self, path: str, delete_empty: bool): 196 with open(path, "rb") as plist_file: 197 data = plistlib.load(plist_file) 198 199 if 'statistics' in data: 200 self.raw_stats.append(json.loads(data['statistics'])) 201 data.pop('statistics') 202 203 # We want to retrieve the clang version even if there are no 204 # reports. Assume that all reports were created using the same 205 # clang version (this is always true and is more efficient). 206 if 'clang_version' in data: 207 if self.clang_version is None: 208 self.clang_version = data.pop('clang_version') 209 else: 210 data.pop('clang_version') 211 212 # Ignore/delete empty reports. 213 if not data['files']: 214 if delete_empty: 215 os.remove(path) 216 return 217 218 # Extract the HTML reports, if they exists. 219 if 'HTMLDiagnostics_files' in data['diagnostics'][0]: 220 htmlFiles = [] 221 for d in data['diagnostics']: 222 # FIXME: Why is this named files, when does it have multiple 223 # files? 224 assert len(d['HTMLDiagnostics_files']) == 1 225 htmlFiles.append(d.pop('HTMLDiagnostics_files')[0]) 226 else: 227 htmlFiles = [None] * len(data['diagnostics']) 228 229 report = AnalysisReport(self, data.pop('files')) 230 diagnostics = [AnalysisDiagnostic(d, report, h) 231 for d, h in zip(data.pop('diagnostics'), htmlFiles)] 232 233 assert not data 234 235 report.diagnostics.extend(diagnostics) 236 self.reports.append(report) 237 self.diagnostics.extend(diagnostics) 238 239 240class AnalysisReport: 241 def __init__(self, run: AnalysisRun, files: List[str]): 242 self.run = run 243 self.files = files 244 self.diagnostics: List[AnalysisDiagnostic] = [] 245 246 247def load_results(results: ResultsDirectory, delete_empty: bool = True, 248 verbose_log: Optional[str] = None) -> AnalysisRun: 249 """ 250 Backwards compatibility API. 251 """ 252 return load_results_from_single_run(SingleRunInfo(results, 253 verbose_log), 254 delete_empty) 255 256 257def load_results_from_single_run(info: SingleRunInfo, 258 delete_empty: bool = True) -> AnalysisRun: 259 """ 260 # Load results of the analyzes from a given output folder. 261 # - info is the SingleRunInfo object 262 # - delete_empty specifies if the empty plist files should be deleted 263 264 """ 265 path = info.path 266 run = AnalysisRun(info) 267 268 if os.path.isfile(path): 269 run.read_single_file(path, delete_empty) 270 else: 271 for dirpath, dirnames, filenames in os.walk(path): 272 for f in filenames: 273 if not f.endswith('plist'): 274 continue 275 276 p = os.path.join(dirpath, f) 277 run.read_single_file(p, delete_empty) 278 279 return run 280 281 282def cmp_analysis_diagnostic(d): 283 return d.get_issue_identifier() 284 285 286PresentInBoth = Tuple[AnalysisDiagnostic, AnalysisDiagnostic] 287PresentOnlyInOld = Tuple[AnalysisDiagnostic, None] 288PresentOnlyInNew = Tuple[None, AnalysisDiagnostic] 289ComparisonResult = List[Union[PresentInBoth, 290 PresentOnlyInOld, 291 PresentOnlyInNew]] 292 293 294def compare_results(results_old: AnalysisRun, results_new: AnalysisRun, 295 histogram: Optional[HistogramType] = None 296 ) -> ComparisonResult: 297 """ 298 compare_results - Generate a relation from diagnostics in run A to 299 diagnostics in run B. 300 301 The result is the relation as a list of triples (a, b) where 302 each element {a,b} is None or a matching element from the respective run 303 """ 304 305 res: ComparisonResult = [] 306 307 # Map size_before -> size_after 308 path_difference_data: List[float] = [] 309 310 # Quickly eliminate equal elements. 311 neq_old: List[AnalysisDiagnostic] = [] 312 neq_new: List[AnalysisDiagnostic] = [] 313 314 diags_old = copy(results_old.diagnostics) 315 diags_new = copy(results_new.diagnostics) 316 317 diags_old.sort(key=cmp_analysis_diagnostic) 318 diags_new.sort(key=cmp_analysis_diagnostic) 319 320 while diags_old and diags_new: 321 a = diags_old.pop() 322 b = diags_new.pop() 323 324 if a.get_issue_identifier() == b.get_issue_identifier(): 325 if a.get_path_length() != b.get_path_length(): 326 327 if histogram == HistogramType.RELATIVE: 328 path_difference_data.append( 329 float(a.get_path_length()) / b.get_path_length()) 330 331 elif histogram == HistogramType.LOG_RELATIVE: 332 path_difference_data.append( 333 log(float(a.get_path_length()) / b.get_path_length())) 334 335 elif histogram == HistogramType.ABSOLUTE: 336 path_difference_data.append( 337 a.get_path_length() - b.get_path_length()) 338 339 res.append((a, b)) 340 341 elif a.get_issue_identifier() > b.get_issue_identifier(): 342 diags_new.append(b) 343 neq_old.append(a) 344 345 else: 346 diags_old.append(a) 347 neq_new.append(b) 348 349 neq_old.extend(diags_old) 350 neq_new.extend(diags_new) 351 352 # FIXME: Add fuzzy matching. One simple and possible effective idea would 353 # be to bin the diagnostics, print them in a normalized form (based solely 354 # on the structure of the diagnostic), compute the diff, then use that as 355 # the basis for matching. This has the nice property that we don't depend 356 # in any way on the diagnostic format. 357 358 for a in neq_old: 359 res.append((a, None)) 360 for b in neq_new: 361 res.append((None, b)) 362 363 if histogram: 364 from matplotlib import pyplot 365 pyplot.hist(path_difference_data, bins=100) 366 pyplot.show() 367 368 return res 369 370 371def compute_percentile(values: Sequence[T], percentile: float) -> T: 372 """ 373 Return computed percentile. 374 """ 375 return sorted(values)[int(round(percentile * len(values) + 0.5)) - 1] 376 377 378def derive_stats(results: AnalysisRun) -> Stats: 379 # Assume all keys are the same in each statistics bucket. 380 combined_data = defaultdict(list) 381 382 # Collect data on paths length. 383 for report in results.reports: 384 for diagnostic in report.diagnostics: 385 combined_data['PathsLength'].append(diagnostic.get_path_length()) 386 387 for stat in results.raw_stats: 388 for key, value in stat.items(): 389 combined_data[str(key)].append(value) 390 391 combined_stats: Stats = {} 392 393 for key, values in combined_data.items(): 394 combined_stats[key] = { 395 "max": max(values), 396 "min": min(values), 397 "mean": sum(values) / len(values), 398 "90th %tile": compute_percentile(values, 0.9), 399 "95th %tile": compute_percentile(values, 0.95), 400 "median": sorted(values)[len(values) // 2], 401 "total": sum(values) 402 } 403 404 return combined_stats 405 406 407# TODO: compare_results decouples comparison from the output, we should 408# do it here as well 409def compare_stats(results_old: AnalysisRun, results_new: AnalysisRun, 410 out: TextIO = sys.stdout): 411 stats_old = derive_stats(results_old) 412 stats_new = derive_stats(results_new) 413 414 old_keys = set(stats_old.keys()) 415 new_keys = set(stats_new.keys()) 416 keys = sorted(old_keys & new_keys) 417 418 for key in keys: 419 out.write(f"{key}\n") 420 421 nested_keys = sorted(set(stats_old[key]) & set(stats_new[key])) 422 423 for nested_key in nested_keys: 424 val_old = float(stats_old[key][nested_key]) 425 val_new = float(stats_new[key][nested_key]) 426 427 report = f"{val_old:.3f} -> {val_new:.3f}" 428 429 # Only apply highlighting when writing to TTY and it's not Windows 430 if out.isatty() and os.name != 'nt': 431 if val_new != 0: 432 ratio = (val_new - val_old) / val_new 433 if ratio < -0.2: 434 report = Colors.GREEN + report + Colors.CLEAR 435 elif ratio > 0.2: 436 report = Colors.RED + report + Colors.CLEAR 437 438 out.write(f"\t {nested_key} {report}\n") 439 440 removed_keys = old_keys - new_keys 441 if removed_keys: 442 out.write(f"REMOVED statistics: {removed_keys}\n") 443 444 added_keys = new_keys - old_keys 445 if added_keys: 446 out.write(f"ADDED statistics: {added_keys}\n") 447 448 out.write("\n") 449 450 451def dump_scan_build_results_diff(dir_old: ResultsDirectory, 452 dir_new: ResultsDirectory, 453 delete_empty: bool = True, 454 out: TextIO = sys.stdout, 455 show_stats: bool = False, 456 stats_only: bool = False, 457 histogram: Optional[HistogramType] = None, 458 verbose_log: Optional[str] = None): 459 """ 460 Compare directories with analysis results and dump results. 461 462 :param delete_empty: delete empty plist files 463 :param out: buffer to dump comparison results to. 464 :param show_stats: compare execution stats as well. 465 :param stats_only: compare ONLY execution stats. 466 :param histogram: optional histogram type to plot path differences. 467 :param verbose_log: optional path to an additional log file. 468 """ 469 results_old = load_results(dir_old, delete_empty, verbose_log) 470 results_new = load_results(dir_new, delete_empty, verbose_log) 471 472 if show_stats or stats_only: 473 compare_stats(results_old, results_new) 474 if stats_only: 475 return 476 477 # Open the verbose log, if given. 478 if verbose_log: 479 auxLog: Optional[TextIO] = open(verbose_log, "w") 480 else: 481 auxLog = None 482 483 diff = compare_results(results_old, results_new, histogram) 484 found_diffs = 0 485 total_added = 0 486 total_removed = 0 487 488 for res in diff: 489 old, new = res 490 if old is None: 491 # TODO: mypy still doesn't understand that old and new can't be 492 # both Nones, we should introduce a better type solution 493 new = cast(AnalysisDiagnostic, new) 494 out.write(f"ADDED: {new.get_readable_name()}\n") 495 found_diffs += 1 496 total_added += 1 497 if auxLog: 498 auxLog.write(f"('ADDED', {new.get_readable_name()}, " 499 f"{new.get_html_report()})\n") 500 501 elif new is None: 502 out.write(f"REMOVED: {old.get_readable_name()}\n") 503 found_diffs += 1 504 total_removed += 1 505 if auxLog: 506 auxLog.write(f"('REMOVED', {old.get_readable_name()}, " 507 f"{old.get_html_report()})\n") 508 else: 509 pass 510 511 total_reports = len(results_new.diagnostics) 512 out.write(f"TOTAL REPORTS: {total_reports}\n") 513 out.write(f"TOTAL ADDED: {total_added}\n") 514 out.write(f"TOTAL REMOVED: {total_removed}\n") 515 516 if auxLog: 517 auxLog.write(f"('TOTAL NEW REPORTS', {total_reports})\n") 518 auxLog.write(f"('TOTAL DIFFERENCES', {found_diffs})\n") 519 auxLog.close() 520 521 # TODO: change to NamedTuple 522 return found_diffs, len(results_old.diagnostics), \ 523 len(results_new.diagnostics) 524 525 526if __name__ == "__main__": 527 print("CmpRuns.py should not be used on its own.") 528 print("Please use 'SATest.py compare' instead") 529 sys.exit(1) 530