| #!/usr/bin/env python |
| |
| # Tool for running fuzz tests |
| # |
| # Copyright (C) 2014 Maria Kustova <maria.k@catit.be> |
| # |
| # This program is free software: you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation, either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import sys |
| import os |
| import signal |
| import subprocess |
| import random |
| import shutil |
| from itertools import count |
| import time |
| import getopt |
| import StringIO |
| import resource |
| |
| try: |
| import json |
| except ImportError: |
| try: |
| import simplejson as json |
| except ImportError: |
| print >>sys.stderr, \ |
| "Warning: Module for JSON processing is not found.\n" \ |
| "'--config' and '--command' options are not supported." |
| |
# Inclusive bounds, in MB, for the randomly chosen size of a backing file
MAX_BACKING_FILE_SIZE = 10
MIN_BACKING_FILE_SIZE = 1
| |
| |
def multilog(msg, *output):
    """Send 'msg' to every file-like object given after it.

    Each object in 'output' has to provide 'write' and 'flush'; the message
    is flushed right after it is written.
    """
    for stream in output:
        stream.write(msg)
        stream.flush()
| |
| |
def str_signal(sig):
    """Convert a numeric value of a system signal to the string one
    defined by the current operating system, e.g. 9 -> 'SIGKILL'.

    Only the 'SIG*' signal constants of the 'signal' module are taken into
    account; other constants of that module ('SIG_DFL', 'SIG_IGN',
    'ITIMER_*', 'NSIG', ...) are ignored even if their values are equal to
    'sig'.

    Return None if the module has no name for 'sig'.
    """
    # Names are visited in sorted order to make the result deterministic
    # when several names share one number (e.g. SIGABRT and SIGIOT)
    for name in sorted(vars(signal)):
        if not name.startswith('SIG') or name.startswith('SIG_'):
            continue
        if getattr(signal, name) == sig:
            return name
    return None
| |
| |
def run_app(fd, q_args):
    """Start an application with specified arguments and return its exit code
    or kill signal depending on the result of execution.

    Arguments:
    fd -- writable file-like object receiving the captured output of the
          application: its stdout followed by its stderr
    q_args -- command line as a list, the executable goes first

    Return the 'returncode' of the finished process (a negative value -N
    means that the process was terminated by signal N). If the application
    does not finish within the time limit, it is killed with SIGKILL, a note
    about the timeout is written to 'fd' and -SIGKILL is returned.

    An OSError raised by subprocess.Popen (e.g. the executable is not found)
    is propagated to the caller.

    The function installs its own SIGALRM handler, so it has to be called
    from the main thread.
    """

    class Alarm(Exception):
        """Exception for signal.alarm events."""

    def handler(*args):
        """Notify that an alarm event occurred."""
        raise Alarm

    # Time limit for one run of the application, in seconds
    timeout = 600
    term_signal = signal.SIGKILL

    signal.signal(signal.SIGALRM, handler)
    devnull = open('/dev/null', 'r+')
    try:
        signal.alarm(timeout)
        process = subprocess.Popen(q_args, stdin=devnull,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        try:
            out, err = process.communicate()
        except Alarm:
            os.kill(process.pid, term_signal)
            # Reap the killed child and release its pipes, otherwise every
            # timeout leaves a zombie process and two open descriptors
            process.wait()
            for pipe in (process.stdout, process.stderr):
                pipe.close()
            fd.write('The command was terminated by timeout.\n')
            fd.flush()
            return -term_signal

        signal.alarm(0)
        fd.write(out)
        fd.write(err)
        fd.flush()
        return process.returncode
    finally:
        # The alarm must not outlive this call, also when Popen has failed
        signal.alarm(0)
        devnull.close()
| |
| |
class TestException(Exception):
    """Error raised by a TestEnv object when a test cannot be set up or run."""
| |
| |
class TestEnv(object):

    """Test object.

    The class sets up test environment, generates backing and test images
    and executes application under tests with specified arguments and a test
    image provided.

    All logs are collected.

    The summary log will contain short descriptions and statuses of tests in
    a run.

    The test log will include application (e.g. 'qemu-img') logs besides info
    sent to the summary log.
    """

    # Image formats used for backing files and as targets of
    # 'qemu-img convert'. The order matters: it defines which format
    # random.choice() picks for a given seed.
    _IMG_FORMATS = ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']

    def __init__(self, test_id, seed, work_dir, run_log,
                 cleanup=True, log_all=False):
        """Set test environment in a specified work directory.

        Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        Arguments:
        test_id -- string appended to 'test-' to name the test directory
        seed -- seed of the random generator; if None, a random one is used
        work_dir -- directory where the test directory is created
        run_log -- path to the summary log, opened for appending
        cleanup -- remove the test directory in finish() if the test passed
        log_all -- also write application output of passed commands to the
                   test log

        Raise TestException if the test directory cannot be created.
        """
        if seed is not None:
            self.seed = seed
        else:
            self.seed = str(random.randint(0, sys.maxint))
        random.seed(self.seed)

        self.init_path = os.getcwd()
        self.work_dir = work_dir
        self.current_dir = os.path.join(work_dir, 'test-' + test_id)
        # split() without a separator ignores repeated and surrounding
        # whitespace, so no empty strings get into the command line. An empty
        # variable is treated as an unset one.
        self.qemu_img = \
            os.environ.get('QEMU_IMG', 'qemu-img').split() or ['qemu-img']
        self.qemu_io = \
            os.environ.get('QEMU_IO', 'qemu-io').split() or ['qemu-io']
        self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
                         ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
                         ['qemu-io', '$test_img', '-c', 'read $off $len'],
                         ['qemu-io', '$test_img', '-c', 'write $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_read $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'aio_write $off $len'],
                         ['qemu-io', '$test_img', '-c', 'flush'],
                         ['qemu-io', '$test_img', '-c',
                          'discard $off $len'],
                         ['qemu-io', '$test_img', '-c',
                          'truncate $off']]
        for fmt in self._IMG_FORMATS:
            self.commands.append(
                ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
                 '$test_img', 'converted_image.' + fmt])

        try:
            os.makedirs(self.current_dir)
        except OSError as e:
            sys.stderr.write(
                "Error: The working directory '%s' cannot be used. "
                "Reason: %s\n" % (self.work_dir, e.strerror))
            raise TestException
        self.log = open(os.path.join(self.current_dir, "test.log"), "w")
        self.parent_log = open(run_log, "a")
        self.failed = False
        self.cleanup = cleanup
        self.log_all = log_all

    def _create_backing_file(self):
        """Create a backing file in the current directory.

        Return a tuple of a backing file name and format or (None, None) if
        'qemu-img create' has exited with a non-zero code.

        Format of a backing file is randomly chosen from all formats supported
        by 'qemu-img create'.

        Raise TestException if 'qemu-img' cannot be started.
        """
        backing_file_fmt = random.choice(self._IMG_FORMATS)
        backing_file_name = 'backing_img.' + backing_file_fmt
        backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
                                           MAX_BACKING_FILE_SIZE) * (1 << 20)
        cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
                               backing_file_name, str(backing_file_size)]
        temp_log = StringIO.StringIO()
        try:
            try:
                retcode = run_app(temp_log, cmd)
            except OSError as e:
                # Report the failure the same way execute() does
                multilog("Error: Start of '%s' failed. Reason: %s\n\n"
                         % (os.path.basename(cmd[0]), e.strerror),
                         sys.stderr, self.log, self.parent_log)
                raise TestException

            if retcode == 0:
                return (backing_file_name, backing_file_fmt)

            multilog("Warning: The %s backing file was not created.\n\n"
                     % backing_file_fmt, sys.stderr, self.log, self.parent_log)
            self.log.write("Log for the failure:\n" + temp_log.getvalue() +
                           '\n\n')
            return (None, None)
        finally:
            temp_log.close()

    def execute(self, input_commands=None, fuzz_config=None):
        """ Execute a test.

        The method creates backing and test images, runs test app and analyzes
        its exit status. If the application was killed by a signal, the test
        is marked as failed.

        Arguments:
        input_commands -- list of commands to run instead of the default
                          ones; every command is a list starting with the
                          'qemu-img' or 'qemu-io' alias
        fuzz_config -- fuzzer configuration passed to the image generator
                       as is

        The image generator is taken from the module-level name
        'image_generator'; the value returned by its create_image() is used
        as the upper bound for generated offsets.

        Raise TestException if an application cannot be started.
        """
        if input_commands is None:
            commands = self.commands
        else:
            commands = input_commands

        os.chdir(self.current_dir)
        backing_file_name, backing_file_fmt = self._create_backing_file()
        img_size = image_generator.create_image(
            'test.img', backing_file_name, backing_file_fmt, fuzz_config)
        # 'off' and 'len' are multiple of the sector size
        sector_size = 512
        for item in commands:
            # Offsets are drawn for every command, known or not, to keep
            # the random sequence of a seed independent from the aliases
            start = random.randrange(0, img_size + 1, sector_size)
            end = random.randrange(start, img_size + 1, sector_size)

            if item[0] == 'qemu-img':
                current_cmd = list(self.qemu_img)
            elif item[0] == 'qemu-io':
                current_cmd = list(self.qemu_io)
            else:
                multilog("Warning: test command '%s' is not defined.\n"
                         % item[0], sys.stderr, self.log, self.parent_log)
                continue
            # Replace all placeholders with their real values
            for v in item[1:]:
                c = (v
                     .replace('$test_img', 'copy.img')
                     .replace('$off', str(start))
                     .replace('$len', str(end - start)))
                current_cmd.append(c)

            # Log string with the test header
            test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
                           "Backing file: %s\n" \
                           % (self.seed, " ".join(current_cmd),
                              self.current_dir, backing_file_name)

            # Every command works on its own copy of the test image
            shutil.copy('test.img', 'copy.img')
            temp_log = StringIO.StringIO()
            try:
                try:
                    retcode = run_app(temp_log, current_cmd)
                except OSError as e:
                    multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
                             % (test_summary,
                                os.path.basename(current_cmd[0]),
                                e.strerror),
                             sys.stderr, self.log, self.parent_log)
                    raise TestException

                if retcode < 0:
                    self.log.write(temp_log.getvalue())
                    # Fall back to the number for signals without a name
                    sig_name = str_signal(-retcode) or str(-retcode)
                    multilog("%sFAIL: Test terminated by signal %s\n\n"
                             % (test_summary, sig_name),
                             sys.stderr, self.log, self.parent_log)
                    self.failed = True
                else:
                    if self.log_all:
                        self.log.write(temp_log.getvalue())
                        multilog("%sPASS: Application exited with the code "
                                 "'%d'\n\n" % (test_summary, retcode),
                                 sys.stdout, self.log, self.parent_log)
            finally:
                temp_log.close()
                os.remove('copy.img')

    def finish(self):
        """Restore the test environment after a test execution.

        Both logs are closed, the initial working directory is restored and
        the test directory is removed unless the test has failed or cleanup
        is disabled.
        """
        self.log.close()
        self.parent_log.close()
        os.chdir(self.init_path)
        if self.cleanup and not self.failed:
            shutil.rmtree(self.current_dir)
| |
if __name__ == '__main__':

    def usage():
        """Print the command line help to the standard output."""
        print("""
        Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR

        Set up test environment in TEST_DIR and run a test in it. A module for
        test image generation should be specified via IMG_GENERATOR.

        Example:
          runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2

        Optional arguments:
          -h, --help                    display this help and exit
          -d, --duration=NUMBER         finish tests after NUMBER of seconds
          -c, --command=JSON            run tests for all commands specified in
                                        the JSON array
          -s, --seed=STRING             seed for a test image generation,
                                        by default will be generated randomly
          --config=JSON                 take fuzzer configuration from the JSON
                                        array
          -k, --keep_passed             don't remove folders of passed tests
          -v, --verbose                 log information about passed tests

        JSON:

        '--command' accepts a JSON array of commands. Each command presents
        an application under test with all its parameters as a list of strings,
        e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].

        Supported application aliases: 'qemu-img' and 'qemu-io'.

        Supported argument aliases: $test_img for the fuzzed image, $off
        for an offset, $len for length.

        Values for $off and $len will be generated based on the virtual disk
        size of the fuzzed image.

        Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
        'QEMU_IO' environment variables.

        '--config' accepts a JSON array of fields to be fuzzed, e.g.
        '[["header"], ["header", "version"]]'.

        Each of the list elements can consist of a complex image element only
        as ["header"] or ["feature_name_table"] or an exact field as
        ["header", "version"]. In the first case random portion of the element
        fields will be fuzzed, in the second one the specified field will be
        fuzzed always.

        If '--config' argument is specified, fields not listed in
        the configuration array will not be fuzzed.
        """)

    def fail(msg):
        """Write 'msg' and a newline to stderr and exit with the status 1."""
        sys.stderr.write(msg + "\n")
        sys.exit(1)

    def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
                 command, fuzz_config):
        """Setup environment for one test and execute this test.

        The runner exits with the status 1 if the environment cannot be set
        up or the test raises TestException; the environment is restored in
        any case once it has been created.
        """
        try:
            test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
                           log_all)
        except TestException:
            sys.exit(1)

        try:
            test.execute(command, fuzz_config)
        except TestException:
            sys.exit(1)
        finally:
            test.finish()

    def should_continue(duration, start_time):
        """Return True if a new test can be started and False otherwise.

        A test can be started if 'duration' is None (no time limit) or less
        than 'duration' seconds have passed since 'start_time'.
        """
        current_time = int(time.time())
        return (duration is None) or (current_time - start_time < duration)

    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
                                       ['command=', 'help', 'seed=', 'config=',
                                        'keep_passed', 'verbose', 'duration='])
    except getopt.error as e:
        fail("Error: %s\n\nTry 'runner.py --help' for more information" % e)

    command = None
    cleanup = True
    log_all = False
    seed = None
    config = None
    duration = None
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage()
            sys.exit()
        elif opt in ('-c', '--command'):
            # NameError is raised if no JSON module has been imported
            try:
                command = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                fail("Error: JSON array of test commands cannot be loaded.\n"
                     "Reason: %s" % e)
        elif opt in ('-k', '--keep_passed'):
            cleanup = False
        elif opt in ('-v', '--verbose'):
            log_all = True
        elif opt in ('-s', '--seed'):
            seed = arg
        elif opt in ('-d', '--duration'):
            # Report a malformed number like any other invalid option value
            # instead of terminating with a traceback
            try:
                duration = int(arg)
            except ValueError:
                fail("Error: Duration must be an integer number of seconds, "
                     "got '%s'\n\nTry 'runner.py --help' for more information"
                     % arg)
        elif opt == '--config':
            try:
                config = json.loads(arg)
            except (TypeError, ValueError, NameError) as e:
                fail("Error: JSON array with the fuzzer configuration cannot"
                     " be loaded\nReason: %s" % e)

    if len(args) != 2:
        fail("Expected two parameters\nTry 'runner.py --help'"
             " for more information.")

    work_dir = os.path.realpath(args[0])
    # run_log is created in 'main', because multiple tests are expected to
    # log in it
    run_log = os.path.join(work_dir, 'run.log')

    # Add the path to the image generator module to sys.path
    sys.path.append(os.path.realpath(os.path.dirname(args[1])))
    # Remove a script extension from image generator module if any
    generator_name = os.path.splitext(os.path.basename(args[1]))[0]

    # The module has to be bound to the global name 'image_generator',
    # because TestEnv.execute() looks it up under this name
    try:
        image_generator = __import__(generator_name)
    except ImportError as e:
        fail("Error: The image generator '%s' cannot be imported.\n"
             "Reason: %s" % (generator_name, e))

    # Enable core dumps
    resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
    # If a seed is specified, only one test will be executed.
    # Otherwise runner will terminate after a keyboard interruption
    start_time = int(time.time())
    test_id = count(1)
    while should_continue(duration, start_time):
        try:
            run_test(str(next(test_id)), seed, work_dir, run_log, cleanup,
                     log_all, command, config)
        except (KeyboardInterrupt, SystemExit):
            sys.exit(1)

        if seed is not None:
            break