|  | #!/usr/bin/env python3 | 
|  |  | 
|  | # Tool for running fuzz tests | 
|  | # | 
|  | # Copyright (C) 2014 Maria Kustova <maria.k@catit.be> | 
|  | # | 
|  | # This program is free software: you can redistribute it and/or modify | 
|  | # it under the terms of the GNU General Public License as published by | 
|  | # the Free Software Foundation, either version 2 of the License, or | 
|  | # (at your option) any later version. | 
|  | # | 
|  | # This program is distributed in the hope that it will be useful, | 
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | 
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
|  | # GNU General Public License for more details. | 
|  | # | 
|  | # You should have received a copy of the GNU General Public License | 
|  | # along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
|  | # | 
|  |  | 
|  | import sys | 
|  | import os | 
|  | import signal | 
|  | import subprocess | 
|  | import random | 
|  | import shutil | 
|  | from itertools import count | 
|  | import time | 
|  | import getopt | 
|  | import io | 
|  | import resource | 
|  |  | 
|  | try: | 
|  | import json | 
|  | except ImportError: | 
|  | try: | 
|  | import simplejson as json | 
|  | except ImportError: | 
|  | print("Warning: Module for JSON processing is not found.\n" \ | 
|  | "'--config' and '--command' options are not supported.", file=sys.stderr) | 
|  |  | 
# Lower and upper bounds, in MB, for the size of a generated backing file
MIN_BACKING_FILE_SIZE = 1
MAX_BACKING_FILE_SIZE = 10
|  |  | 
|  |  | 
def multilog(msg, *output):
    """Send ``msg`` to every stream passed in ``output``.

    Streams are handled one at a time, in the order given: the message is
    written and the stream is flushed before moving on to the next one.
    """
    for stream in output:
        stream.write(msg)
        stream.flush()
|  |  | 
|  |  | 
def str_signal(sig):
    """Convert a numeric value of a system signal to its symbolic name.

    The name is looked up in the ``signal.Signals`` enumeration, so only
    real signals can match. Scanning every attribute of the ``signal``
    module instead could return an unrelated constant that happens to share
    the number (e.g. ``SIG_IGN`` or ``ITIMER_VIRTUAL`` for 1).

    Arguments:
        sig -- signal number (an int or a ``signal.Signals`` member)

    Return the signal name, e.g. 'SIGKILL', or None if the number does not
    correspond to a signal known to this platform.
    """
    try:
        return signal.Signals(sig).name
    except ValueError:
        # Not a valid signal number on this system
        return None
|  |  | 
|  |  | 
def run_app(fd, q_args, timeout=600):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.

    The application runs with its standard input connected to the null
    device; its stdout and stderr are captured as text (undecodable bytes
    are replaced) and written to 'fd' once it has finished.

    Arguments:
        fd -- writable text stream that receives the application output
        q_args -- command line of the application as a list of strings
        timeout -- number of seconds the application is allowed to run
                   (None disables the limit)

    Return the exit code of the application (negative if it was terminated
    by a signal). If the application exceeds the timeout, it is killed with
    SIGKILL, a note is written to 'fd' instead of the captured output and
    the negated SIGKILL number is returned.

    OSError is raised if the application cannot be started.
    """
    term_signal = signal.SIGKILL
    process = subprocess.Popen(q_args, stdin=subprocess.DEVNULL,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               errors='replace')
    try:
        out, err = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        # Reap the child and close its pipes so that neither a zombie
        # process nor open descriptors are left behind
        process.communicate()
        fd.write('The command was terminated by timeout.\n')
        fd.flush()
        return -term_signal

    fd.write(out)
    fd.write(err)
    fd.flush()
    return process.returncode
|  |  | 
|  |  | 
class TestException(Exception):
    """Error raised by TestEnv objects when a test cannot be set up or run."""
|  |  | 
|  |  | 
class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    # Image formats used both for backing file creation and for the default
    # 'qemu-img convert' test commands. The order matters: it is indexed by
    # the seeded random generator, so changing it changes what a seed
    # reproduces.
    _IMG_FORMATS = ('raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc')

    # 'off' and 'len' placeholders are multiples of the sector size
    _SECTOR_SIZE = 512

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        Arguments:
            test_id -- string identifier of the test; the test directory is
                       named 'test-<test_id>' inside 'work_dir'
            seed -- seed for the random generator; if None, a new one is
                    generated
            work_dir -- directory that holds the directories of all tests
            run_log -- path to the summary log, opened for appending
            cleanup -- if True, finish() removes the test directory unless
                       the test failed
            log_all -- if True, passed commands are logged as well

        TestException is raised if the test directory cannot be created.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxsize))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        self.qemu_img = self._tool_command('QEMU_IMG', 'qemu-img')
        self.qemu_io = self._tool_command('QEMU_IO', 'qemu-io')
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in self._IMG_FORMATS:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            print("Error: The working directory '%s' cannot be used. Reason: %s"
                  % (self.work_dir, e.strerror), file=sys.stderr)
            raise TestException from e
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        try:
            self.parent_log = open(run_log, "a")
        except OSError:
            # Do not leak the test log if the summary log is unusable
            self.log.close()
            raise
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    @staticmethod
    def _tool_command(env_var, default):
        """Return the command prefix of a tool as a list of strings.

        The value of the 'env_var' environment variable ('default' if it is
        not set) is split on any run of whitespace, so repeated spaces or
        tabs do not produce empty arguments. A blank value falls back to
        'default'.
        """
        return os.environ.get(env_var, default).split() or [default]

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format, or (None, None) if
        'qemu-img create' exited with a non-zero code (a warning and the
        command output are logged in that case).

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.

        TestException is raised if 'qemu-img' cannot be started at all.
        """
        backing_file_fmt = random.choice(self._IMG_FORMATS)
        backing_file_name = 'backing_img.' + backing_file_fmt
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        with io.StringIO() as temp_log:
            try:
                retcode = run_app(temp_log, cmd)
            except OSError as e:
                # Same handling as for a test command that fails to start
                multilog("Error: Start of '%s' failed. Reason: %s\n\n"
                         % (os.path.basename(cmd[0]), e.strerror),
                         sys.stderr, self.log, self.parent_log)
                raise TestException from e

            if retcode == 0:
                return (backing_file_name, backing_file_fmt)

            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log, self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            return (None, None)

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.

        Arguments:
            input_commands -- list of commands to run, each a list of strings
                              starting with the 'qemu-img' or 'qemu-io'
                              alias; the default command set is used if None
            fuzz_config -- fuzzer configuration passed through to the image
                           generator

        The image generator is taken from the module-level name
        'image_generator', which has to be bound before this method is
        called. The value returned by its create_image() is used as the
        upper bound for generated offsets (presumably the virtual disk size
        in bytes -- confirm against the generator).

        TestException is raised if an application cannot be started.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        for item in commands:
            # Every command gets a fresh copy of the fuzzed image
            shutil.copy('test.img', 'copy.img')
            # The random values are drawn before the command is validated to
            # keep the random sequence (and so seed reproducibility) intact
            start = random.randrange(0, img_size + 1, self._SECTOR_SIZE)
            end = random.randrange(start, img_size + 1, self._SECTOR_SIZE)

            if item[0] == 'qemu-img':
                current_cmd = list(self.qemu_img)
            elif item[0] == 'qemu-io':
                current_cmd = list(self.qemu_io)
            else:
                multilog("Warning: test command '%s' is not defined.\n"
                         % item[0], sys.stderr, self.log, self.parent_log)
                continue
            # Replace all placeholders with their real values
            for v in item[1:]:
                c = (v
                     .replace('$test_img', 'copy.img')
                     .replace('$off', str(start))
                     .replace('$len', str(end - start)))
                current_cmd.append(c)

            # Log string with the test header
            test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                           "Backing file: %s\n" \
                           % (self.seed, " ".join(current_cmd),
                              self.current_dir, backing_file_name)
            with io.StringIO() as temp_log:
                try:
                    retcode = run_app(temp_log, current_cmd)
                except OSError as e:
                    multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                             % (test_summary,
                                os.path.basename(current_cmd[0]),
                                e.strerror),
                             sys.stderr, self.log, self.parent_log)
                    raise TestException from e
                app_output = temp_log.getvalue()

            if retcode < 0:
                # A negative code means the application was killed by a signal
                self.log.write(app_output)
                multilog("%sFAIL: Test terminated by signal %s\n\n"
                         % (test_summary, str_signal(-retcode)),
                         sys.stderr, self.log, self.parent_log)
                self.failed = True
            elif self.log_all:
                self.log.write(app_output)
                multilog("%sPASS: Application exited with the code "
                         "'%d'\n\n" % (test_summary, retcode),
                         sys.stdout, self.log, self.parent_log)
            os.remove('copy.img')

    def finish(self):
        """Restore the test environment after a test execution.

        Both logs are closed and the initial working directory is restored.
        The test directory is removed if cleanup was requested and the test
        did not fail.
        """
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)
|  |  | 
if __name__ == '__main__':

    def usage():
        """Print the command line help to the standard output."""
        print("""
Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

Set up test environment in TEST_DIR and run a test in it. A module for
test image generation should be specified via IMG_GENERATOR.

Example:
runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

Optional arguments:
-h, --help                    display this help and exit
-d, --duration=NUMBER         finish tests after NUMBER of seconds
-c, --command=JSON            run tests for all commands specified in
the JSON array
-s, --seed=STRING             seed for a test image generation,
by default will be generated randomly
--config=JSON                 take fuzzer configuration from the JSON
array
-k, --keep_passed             don't remove folders of passed tests
-v, --verbose                 log information about passed tests

JSON:

'--command' accepts a JSON array of commands. Each command presents
an application under test with all its parameters as a list of strings,
e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

Supported application aliases: 'qemu-img' and 'qemu-io'.

Supported argument aliases: $test_img for the fuzzed image, $off
for an offset, $len for length.

Values for $off and $len will be generated based on the virtual disk
size of the fuzzed image.

Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
'QEMU_IO' environment variables.

'--config' accepts a JSON array of fields to be fuzzed, e.g.
'[["header"], ["header", "version"]]'.

Each of the list elements can consist of a complex image element only
as ["header"] or ["feature_name_table"] or an exact field as
["header", "version"]. In the first case random portion of the element
fields will be fuzzed, in the second one the specified field will be
fuzzed always.

If '--config' argument is specified, fields not listed in
the configuration array will not be fuzzed.
""")

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test.

        The runner exits with the status 1 if the environment cannot be set
        up or the test raises TestException. The test environment is always
        finished once it has been created.
        """
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        try:
            test.execute(command, fuzz_config)
        except TestException:
            sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise.

        'duration' is the time budget in seconds (None means no limit) and
        'start_time' is the time in seconds since the epoch at which the run
        started.
        """
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        print("Error: %s\n\nTry 'runner.py --help' for more information" % e,
              file=sys.stderr)
        sys.exit(1)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            # NameError covers the case when no JSON module could be imported
            try:
                command = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array of test commands cannot be loaded.\n"
                      "Reason: %s" % e, file=sys.stderr)
                sys.exit(1)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            # Report a malformed value instead of failing with a traceback
            try:
                duration = int(arg)
            except ValueError:
                print("Error: The duration must be an integer number of "
                      "seconds, but '%s' was specified" % arg,
                      file=sys.stderr)
                sys.exit(1)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                print("Error: JSON array with the fuzzer configuration cannot"
                      " be loaded\nReason: %s" % e, file=sys.stderr)
                sys.exit(1)

    if len(args) != 2:
        print("Expected two parameters\nTry 'runner.py --help'"
              " for more information.", file=sys.stderr)
        sys.exit(1)

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    # The module has to be bound to the module-level name 'image_generator',
    # because TestEnv.execute() looks it up as a global
    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        print("Error: The image generator '%s' cannot be imported.\n"
              "Reason: %s" % (generator_name, e), file=sys.stderr)
        sys.exit(1)

    # Enable core dumps
    try:
        resource.setrlimit(resource.RLIMIT_CORE,
                           (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
    except (ValueError, OSError):
        # The hard limit is lower and cannot be raised by this process:
        # use as much as is allowed instead of aborting the run
        hard_limit = resource.getrlimit(resource.RLIMIT_CORE)[1]
        resource.setrlimit(resource.RLIMIT_CORE, (hard_limit, hard_limit))

    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break