#!/usr/bin/env python

"""
This is a generic fuzz testing tool, see --help for more information.
"""

import os
import sys
import random
import subprocess
import itertools

# NOTE: this script is written to run unchanged on Python 2 and Python 3.
# Every print() call therefore passes exactly one pre-formatted argument, which
# behaves identically as a Python 2 print statement and a Python 3 function
# call.


def _to_bytes(value):
    """Return `value` as a byte string, UTF-8 encoding text if necessary."""
    if isinstance(value, bytes):
        return value
    return value.encode('utf-8')


def _read_bytes(path):
    """Return the complete contents of the file at `path` as bytes."""
    with open(path, 'rb') as f:
        return f.read()


def _write_bytes(path, data):
    """Overwrite the file at `path` with the byte string `data`."""
    with open(path, 'wb') as f:
        f.write(data)


def compute_line_and_column(data, pos):
    """Return the 1-based (line, column) of offset `pos` inside `data`.

    Lines are delimited by '\\n'. `data` may be a byte string or a text
    string; `pos` counts elements of `data` from zero.
    """
    newline = b'\n' if isinstance(data, bytes) else u'\n'
    line = data.count(newline, 0, pos) + 1
    # rfind() yields -1 when no newline precedes `pos`, which makes the
    # column come out as pos + 1, i.e. the 1-based offset on the first line.
    column = pos - data.rfind(newline, 0, pos)
    return line, column


class TestGenerator(object):
    """Enumerates every single-edit mutation of a set of input files.

    A test is identified by an integer in range(num_tests) and decoded by
    get_test() into a tuple (kind, position, insert_string, picked_position):

      kind            - 'nothing', 'delete', 'insert' or 'replace'
      position        - (input index, byte offset) of the edit, or None
      insert_string   - bytes to insert/replace with, or None
      picked_position - (input index, byte offset) of the independently
                        picked byte when pick_input is set, else None
    """

    def __init__(self, inputs, delete, insert, replace,
                 insert_strings, pick_input):
        """Read all inputs and compute the size of the test space.

        inputs         - paths of the files to fuzz
        delete/insert/replace - whether each mutation kind is enabled
        insert_strings - strings used for insertion and replacement
        pick_input     - additionally pick an input byte for every test
        """
        # Inputs are handled as raw bytes so they can be restored exactly.
        self.inputs = [(path, _read_bytes(path)) for path in inputs]

        self.delete = bool(delete)
        self.insert = bool(insert)
        self.replace = bool(replace)
        self.pick_input = bool(pick_input)
        self.insert_strings = [_to_bytes(s) for s in insert_strings]

        self.num_positions = sum(len(data) for _, data in self.inputs)
        self.num_insert_strings = len(self.insert_strings)
        edits_per_position = (
            self.delete
            + (self.insert + self.replace) * self.num_insert_strings)
        # The extra test is the 'nothing' test, which leaves inputs untouched.
        self.num_tests = edits_per_position * self.num_positions + 1

        if self.pick_input:
            self.num_tests *= self.num_positions

    def position_to_source_index(self, position):
        """Map a global byte offset to (input index, offset in that input).

        Raises ValueError when `position` lies beyond the last input byte.
        """
        for i, (_, data) in enumerate(self.inputs):
            n = len(data)
            if position < n:
                return (i, position)
            position -= n
        raise ValueError('Invalid position.')

    def get_test(self, index):
        """Decode test number `index` into its description tuple."""
        assert 0 <= index < self.num_tests

        picked_position = None
        if self.pick_input:
            index, picked_position = divmod(index, self.num_positions)
            picked_position = self.position_to_source_index(picked_position)

        if index == 0:
            return ('nothing', None, None, picked_position)

        index -= 1
        index, position = divmod(index, self.num_positions)
        position = self.position_to_source_index(position)
        if self.delete:
            if index == 0:
                return ('delete', position, None, picked_position)
            index -= 1

        index, insert_index = divmod(index, self.num_insert_strings)
        insert_str = self.insert_strings[insert_index]
        if self.insert:
            if index == 0:
                return ('insert', position, insert_str, picked_position)
            index -= 1

        assert self.replace
        assert index == 0
        return ('replace', position, insert_str, picked_position)


class TestApplication(object):
    """Applies one test from a TestGenerator to the files on disk."""

    def __init__(self, tg, test):
        """Bind the generator `tg` to a `test` tuple from tg.get_test()."""
        self.tg = tg
        self.test = test

    def apply(self):
        """Overwrite the affected input file with its mutated contents.

        Raises ValueError when the test kind is not recognized.
        """
        kind = self.test[0]
        if kind == 'nothing':
            return

        i, j = self.test[1]
        name, data = self.tg.inputs[i]
        if kind == 'delete':
            data = data[:j] + data[j+1:]
        elif kind == 'insert':
            data = data[:j] + _to_bytes(self.test[2]) + data[j:]
        elif kind == 'replace':
            data = data[:j] + _to_bytes(self.test[2]) + data[j+1:]
        else:
            # Wrap the tuple so it is formatted as one value rather than
            # being unpacked as the format arguments.
            raise ValueError('Invalid test %r' % (self.test,))
        _write_bytes(name, data)

    def revert(self):
        """Restore the affected input file to its original contents."""
        if self.test[0] != 'nothing':
            i, _ = self.test[1]
            name, data = self.tg.inputs[i]
            _write_bytes(name, data)


def quote(str):
    """Return `str` wrapped in double quotes (no escaping is performed)."""
    return '"' + str + '"'


def run_one_test(test_application, index, input_files, args):
    """Run the test command once and report whether it passed.

    test_application - the TestApplication whose test is currently applied
    index            - the test number, used for interpolation and log names
    input_files      - paths of all inputs, in generator order
    args             - command argument templates, each %-interpolated

    Reads the module-level `opts` set up by main(). Returns True when the
    command's exit code is the expected one or one of the extra exit codes.
    """
    test = test_application.test

    # Interpolate arguments.
    options = {'index': index,
               'inputs': ' '.join(quote(f) for f in input_files)}

    # Add picked input interpolation arguments, if used.
    picked = test[3]
    if picked is not None:
        file_index, pos = picked
        options['picked_input'] = input_files[file_index]
        options['picked_input_pos'] = pos
        # Compute the line and column.
        file_data = test_application.tg.inputs[file_index][1]
        line, column = compute_line_and_column(file_data, pos)
        options['picked_input_line'] = line
        options['picked_input_col'] = column

    test_args = [a % options for a in args]
    if opts.verbose:
        print('%s: note: executing %r' % (sys.argv[0], test_args))

    stdout = None
    stderr = None
    stdout_log_path = None
    stderr_log_path = None
    try:
        if opts.log_dir:
            stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index)
            stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index)
            stdout = open(stdout_log_path, 'wb')
            stderr = open(stderr_log_path, 'wb')
        else:
            # The child shares our stdout; flush so output stays ordered.
            sys.stdout.flush()
        p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr)
        p.communicate()
        exit_code = p.wait()
    finally:
        # Close the logs even when the command could not be started.
        if stdout is not None:
            stdout.close()
        if stderr is not None:
            stderr.close()

    test_result = (exit_code == opts.expected_exit_code or
                   exit_code in opts.extra_exit_codes)

    # Remove the logs for passes, unless logging all results.
    if stdout_log_path is not None and not opts.log_all and test_result:
        os.remove(stdout_log_path)
        os.remove(stderr_log_path)

    if not test_result:
        print('FAIL: %d' % index)
    elif not opts.succinct:
        print('PASS: %d' % index)
    return test_result


def main():
    """Parse the command line and run the selected fuzz tests."""
    global opts
    from optparse import OptionParser, OptionGroup
    parser = OptionParser("""%prog [options] ... test command args ...

%prog is a tool for fuzzing inputs and testing them.

The most basic usage is something like:

  $ %prog --file foo.txt ./test.sh

which will run a default list of fuzzing strategies on the input. For each
fuzzed input, it will overwrite the input files (in place), run the test script,
then restore the files back to their original contents.

NOTE: You should make sure you have a backup copy of your inputs, in case
something goes wrong!!!

You can cause the fuzzing to not restore the original files with
'--no-revert'. Generally this is used with '--test <index>' to run one failing
test and then leave the fuzzed inputs in place to examine the failure.

For each fuzzed input, %prog will run the test command given on the command
line. Each argument in the command is subject to string interpolation before
being executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard
printf format, and VARIABLE is one of:

   'index' - the test index being run
   'inputs' - the full list of test inputs
   'picked_input'      - (with --pick-input) the selected input file
   'picked_input_pos'  - (with --pick-input) the selected input position
   'picked_input_line' - (with --pick-input) the selected input line
   'picked_input_col'  - (with --pick-input) the selected input column

By default, the script will run forever continually picking new tests to
run. You can limit the number of tests that are run with '--max-tests <number>',
and you can run a particular test with '--test <index>'.

You can specify '--stop-on-fail' to stop the script on the first failure
without reverting the changes.

""")
    parser.add_option("-v", "--verbose", help="Show more output",
                      action='store_true', dest="verbose", default=False)
    parser.add_option("-s", "--succinct", help="Reduce amount of output",
                      action="store_true", dest="succinct", default=False)

    group = OptionGroup(parser, "Test Execution")
    group.add_option("", "--expected-exit-code", help="Set expected exit code",
                     type="int", dest="expected_exit_code",
                     default=0)
    group.add_option("", "--extra-exit-code",
                     help="Set additional expected exit code",
                     type="int", action="append", dest="extra_exit_codes",
                     default=[])
    group.add_option("", "--log-dir",
                     help="Capture test logs to an output directory",
                     type="string", dest="log_dir",
                     default=None)
    group.add_option("", "--log-all",
                     help="Log all outputs (not just failures)",
                     action="store_true", dest="log_all", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Input Files")
    group.add_option("", "--file", metavar="PATH",
                     help="Add an input file to fuzz",
                     type="string", action="append", dest="input_files",
                     default=[])
    group.add_option("", "--filelist", metavar="LIST",
                     help="Add a list of inputs files to fuzz (one per line)",
                     type="string", action="append", dest="filelists",
                     default=[])
    parser.add_option_group(group)

    group = OptionGroup(parser, "Fuzz Options")
    group.add_option("", "--replacement-chars", dest="replacement_chars",
                     help="Characters to insert/replace",
                     default="0{}[]<>\\;@#$^%& ")
    group.add_option("", "--replacement-string", dest="replacement_strings",
                     action="append", help="Add a replacement string to use",
                     default=[])
    group.add_option("", "--replacement-list", dest="replacement_lists",
                     help="Add a list of replacement strings (one per line)",
                     action="append", default=[])
    group.add_option("", "--no-delete", help="Don't delete characters",
                     action='store_false', dest="enable_delete", default=True)
    group.add_option("", "--no-insert", help="Don't insert strings",
                     action='store_false', dest="enable_insert", default=True)
    group.add_option("", "--no-replace", help="Don't replace strings",
                     action='store_false', dest="enable_replace", default=True)
    group.add_option("", "--no-revert", help="Don't revert changes",
                     action='store_false', dest="revert", default=True)
    group.add_option("", "--stop-on-fail", help="Stop on first failure",
                     action='store_true', dest="stop_on_fail", default=False)
    parser.add_option_group(group)

    group = OptionGroup(parser, "Test Selection")
    group.add_option("", "--test", help="Run a particular test",
                     type="int", dest="test", default=None, metavar="INDEX")
    group.add_option("", "--max-tests", help="Maximum number of tests",
                     type="int", dest="max_tests", default=None,
                     metavar="COUNT")
    group.add_option("", "--pick-input",
                     help="Randomly select an input byte as well as fuzzing",
                     action='store_true', dest="pick_input", default=False)
    parser.add_option_group(group)

    # Everything after the first positional argument is the test command.
    parser.disable_interspersed_args()

    (opts, args) = parser.parse_args()

    if not args:
        parser.error("Invalid number of arguments")

    # Collect the list of inputs.
    input_files = list(opts.input_files)
    for filelist in opts.filelists:
        with open(filelist) as f:
            for ln in f:
                ln = ln.strip()
                if ln:
                    input_files.append(ln)
    input_files.sort()

    if not input_files:
        parser.error("No input files!")

    print('%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)))

    # Make sure the log directory exists if used.
    if opts.log_dir:
        if not os.path.exists(opts.log_dir):
            try:
                os.mkdir(opts.log_dir)
            except OSError:
                print("%s: error: log directory couldn't be created!" % (
                    sys.argv[0],))
                raise SystemExit(1)

    # Get the list of insert/replacement strings.
    replacements = list(opts.replacement_chars)
    replacements.extend(opts.replacement_strings)
    for replacement_list in opts.replacement_lists:
        with open(replacement_list, 'rb') as f:
            for ln in f:
                # Drop only the line terminator; the final line of a file
                # may legitimately lack one.
                if ln.endswith(b'\n'):
                    ln = ln[:-1]
                if ln:
                    replacements.append(ln)

    # Unique and order the replacement list (as bytes, so that entries from
    # the command line and from list files compare consistently).
    replacements = sorted(set(_to_bytes(r) for r in replacements))

    # Create the test generator.
    tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert,
                       opts.enable_replace, replacements, opts.pick_input)

    print('%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions))
    print('%s: note: %d total tests.' % (sys.argv[0], tg.num_tests))

    if tg.num_tests == 0:
        # Only possible with --pick-input when all inputs are empty.
        parser.error("No input bytes to pick from!")
    if opts.test is not None and not (0 <= opts.test < tg.num_tests):
        parser.error("Invalid test index %d (must be in [0, %d))" % (
            opts.test, tg.num_tests))

    if opts.test is not None:
        it = [opts.test]
    else:
        if opts.max_tests is not None:
            bounds = itertools.repeat(tg.num_tests, opts.max_tests)
        else:
            bounds = itertools.repeat(tg.num_tests)
        # Lazily draw random test indices, one per requested test.
        it = (random.randrange(bound) for bound in bounds)

    for test in it:
        t = tg.get_test(test)

        if opts.verbose:
            print('%s: note: running test %d: %r' % (sys.argv[0], test, t))
        ta = TestApplication(tg, t)
        try:
            ta.apply()
            test_result = run_one_test(ta, test, input_files, args)
            if not test_result and opts.stop_on_fail:
                # Leave the failing inputs in place for inspection.
                opts.revert = False
                sys.exit(1)
        finally:
            if opts.revert:
                ta.revert()

        sys.stdout.flush()


if __name__ == '__main__':
    main()