summaryrefslogtreecommitdiffstats
path: root/qemu/tests/image-fuzzer/runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'qemu/tests/image-fuzzer/runner.py')
-rwxr-xr-xqemu/tests/image-fuzzer/runner.py437
1 files changed, 437 insertions, 0 deletions
diff --git a/qemu/tests/image-fuzzer/runner.py b/qemu/tests/image-fuzzer/runner.py
new file mode 100755
index 000000000..0a8743ef4
--- /dev/null
+++ b/qemu/tests/image-fuzzer/runner.py
@@ -0,0 +1,437 @@
+#!/usr/bin/env python
+
+# Tool for running fuzz tests
+#
+# Copyright (C) 2014 Maria Kustova <maria.k@catit.be>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+import os
+import signal
+import subprocess
+import random
+import shutil
+from itertools import count
+import time
+import getopt
+import StringIO
+import resource
+
+try:
+ import json
+except ImportError:
+ try:
+ import simplejson as json
+ except ImportError:
+ print >>sys.stderr, \
+ "Warning: Module for JSON processing is not found.\n" \
+ "'--config' and '--command' options are not supported."
+
+# Backing file sizes in MB
+MAX_BACKING_FILE_SIZE = 10
+MIN_BACKING_FILE_SIZE = 1
+
+
+def multilog(msg, *output):
+ """ Write an object to all of specified file descriptors."""
+ for fd in output:
+ fd.write(msg)
+ fd.flush()
+
+
+def str_signal(sig):
+ """ Convert a numeric value of a system signal to the string one
+ defined by the current operational system.
+ """
+ for k, v in signal.__dict__.items():
+ if v == sig:
+ return k
+
+
+def run_app(fd, q_args):
+ """Start an application with specified arguments and return its exit code
+ or kill signal depending on the result of execution.
+ """
+
+ class Alarm(Exception):
+ """Exception for signal.alarm events."""
+ pass
+
+ def handler(*args):
+ """Notify that an alarm event occurred."""
+ raise Alarm
+
+ signal.signal(signal.SIGALRM, handler)
+ signal.alarm(600)
+ term_signal = signal.SIGKILL
+ devnull = open('/dev/null', 'r+')
+ process = subprocess.Popen(q_args, stdin=devnull,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ try:
+ out, err = process.communicate()
+ signal.alarm(0)
+ fd.write(out)
+ fd.write(err)
+ fd.flush()
+ return process.returncode
+
+ except Alarm:
+ os.kill(process.pid, term_signal)
+ fd.write('The command was terminated by timeout.\n')
+ fd.flush()
+ return -term_signal
+
+
+class TestException(Exception):
+ """Exception for errors risen by TestEnv objects."""
+ pass
+
+
+class TestEnv(object):
+
+ """Test object.
+
+ The class sets up test environment, generates backing and test images
+ and executes application under tests with specified arguments and a test
+ image provided.
+
+ All logs are collected.
+
+ The summary log will contain short descriptions and statuses of tests in
+ a run.
+
+ The test log will include application (e.g. 'qemu-img') logs besides info
+ sent to the summary log.
+ """
+
+ def __init__(self, test_id, seed, work_dir, run_log,
+ cleanup=True, log_all=False):
+ """Set test environment in a specified work directory.
+
+ Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
+ 'QEMU_IO' environment variables.
+ """
+ if seed is not None:
+ self.seed = seed
+ else:
+ self.seed = str(random.randint(0, sys.maxint))
+ random.seed(self.seed)
+
+ self.init_path = os.getcwd()
+ self.work_dir = work_dir
+ self.current_dir = os.path.join(work_dir, 'test-' + test_id)
+ self.qemu_img = \
+ os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
+ self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
+ self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
+ ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
+ ['qemu-io', '$test_img', '-c', 'read $off $len'],
+ ['qemu-io', '$test_img', '-c', 'write $off $len'],
+ ['qemu-io', '$test_img', '-c',
+ 'aio_read $off $len'],
+ ['qemu-io', '$test_img', '-c',
+ 'aio_write $off $len'],
+ ['qemu-io', '$test_img', '-c', 'flush'],
+ ['qemu-io', '$test_img', '-c',
+ 'discard $off $len'],
+ ['qemu-io', '$test_img', '-c',
+ 'truncate $off']]
+ for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
+ self.commands.append(
+ ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
+ '$test_img', 'converted_image.' + fmt])
+
+ try:
+ os.makedirs(self.current_dir)
+ except OSError, e:
+ print >>sys.stderr, \
+ "Error: The working directory '%s' cannot be used. Reason: %s"\
+ % (self.work_dir, e[1])
+ raise TestException
+ self.log = open(os.path.join(self.current_dir, "test.log"), "w")
+ self.parent_log = open(run_log, "a")
+ self.failed = False
+ self.cleanup = cleanup
+ self.log_all = log_all
+
+ def _create_backing_file(self):
+ """Create a backing file in the current directory.
+
+ Return a tuple of a backing file name and format.
+
+ Format of a backing file is randomly chosen from all formats supported
+ by 'qemu-img create'.
+ """
+ # All formats supported by the 'qemu-img create' command.
+ backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
+ 'file', 'qed', 'vpc'])
+ backing_file_name = 'backing_img.' + backing_file_fmt
+ backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
+ MAX_BACKING_FILE_SIZE) * (1 << 20)
+ cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
+ backing_file_name, str(backing_file_size)]
+ temp_log = StringIO.StringIO()
+ retcode = run_app(temp_log, cmd)
+ if retcode == 0:
+ temp_log.close()
+ return (backing_file_name, backing_file_fmt)
+ else:
+ multilog("Warning: The %s backing file was not created.\n\n"
+ % backing_file_fmt, sys.stderr, self.log, self.parent_log)
+ self.log.write("Log for the failure:\n" + temp_log.getvalue() +
+ '\n\n')
+ temp_log.close()
+ return (None, None)
+
+ def execute(self, input_commands=None, fuzz_config=None):
+ """ Execute a test.
+
+ The method creates backing and test images, runs test app and analyzes
+ its exit status. If the application was killed by a signal, the test
+ is marked as failed.
+ """
+ if input_commands is None:
+ commands = self.commands
+ else:
+ commands = input_commands
+
+ os.chdir(self.current_dir)
+ backing_file_name, backing_file_fmt = self._create_backing_file()
+ img_size = image_generator.create_image(
+ 'test.img', backing_file_name, backing_file_fmt, fuzz_config)
+ for item in commands:
+ shutil.copy('test.img', 'copy.img')
+ # 'off' and 'len' are multiple of the sector size
+ sector_size = 512
+ start = random.randrange(0, img_size + 1, sector_size)
+ end = random.randrange(start, img_size + 1, sector_size)
+
+ if item[0] == 'qemu-img':
+ current_cmd = list(self.qemu_img)
+ elif item[0] == 'qemu-io':
+ current_cmd = list(self.qemu_io)
+ else:
+ multilog("Warning: test command '%s' is not defined.\n"
+ % item[0], sys.stderr, self.log, self.parent_log)
+ continue
+ # Replace all placeholders with their real values
+ for v in item[1:]:
+ c = (v
+ .replace('$test_img', 'copy.img')
+ .replace('$off', str(start))
+ .replace('$len', str(end - start)))
+ current_cmd.append(c)
+
+ # Log string with the test header
+ test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
+ "Backing file: %s\n" \
+ % (self.seed, " ".join(current_cmd),
+ self.current_dir, backing_file_name)
+ temp_log = StringIO.StringIO()
+ try:
+ retcode = run_app(temp_log, current_cmd)
+ except OSError, e:
+ multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
+ % (test_summary, os.path.basename(current_cmd[0]),
+ e[1]),
+ sys.stderr, self.log, self.parent_log)
+ raise TestException
+
+ if retcode < 0:
+ self.log.write(temp_log.getvalue())
+ multilog("%sFAIL: Test terminated by signal %s\n\n"
+ % (test_summary, str_signal(-retcode)),
+ sys.stderr, self.log, self.parent_log)
+ self.failed = True
+ else:
+ if self.log_all:
+ self.log.write(temp_log.getvalue())
+ multilog("%sPASS: Application exited with the code " \
+ "'%d'\n\n" % (test_summary, retcode),
+ sys.stdout, self.log, self.parent_log)
+ temp_log.close()
+ os.remove('copy.img')
+
+ def finish(self):
+ """Restore the test environment after a test execution."""
+ self.log.close()
+ self.parent_log.close()
+ os.chdir(self.init_path)
+ if self.cleanup and not self.failed:
+ shutil.rmtree(self.current_dir)
+
+if __name__ == '__main__':
+
+ def usage():
+ print """
+ Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR
+
+ Set up test environment in TEST_DIR and run a test in it. A module for
+ test image generation should be specified via IMG_GENERATOR.
+
+ Example:
+ runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2
+
+ Optional arguments:
+ -h, --help display this help and exit
+ -d, --duration=NUMBER finish tests after NUMBER of seconds
+ -c, --command=JSON run tests for all commands specified in
+ the JSON array
+ -s, --seed=STRING seed for a test image generation,
+ by default will be generated randomly
+ --config=JSON take fuzzer configuration from the JSON
+ array
+ -k, --keep_passed don't remove folders of passed tests
+ -v, --verbose log information about passed tests
+
+ JSON:
+
+ '--command' accepts a JSON array of commands. Each command presents
+ an application under test with all its paramaters as a list of strings,
+ e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].
+
+ Supported application aliases: 'qemu-img' and 'qemu-io'.
+
+ Supported argument aliases: $test_img for the fuzzed image, $off
+ for an offset, $len for length.
+
+ Values for $off and $len will be generated based on the virtual disk
+ size of the fuzzed image.
+
+ Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
+ 'QEMU_IO' environment variables.
+
+ '--config' accepts a JSON array of fields to be fuzzed, e.g.
+ '[["header"], ["header", "version"]]'.
+
+ Each of the list elements can consist of a complex image element only
+ as ["header"] or ["feature_name_table"] or an exact field as
+ ["header", "version"]. In the first case random portion of the element
+ fields will be fuzzed, in the second one the specified field will be
+ fuzzed always.
+
+ If '--config' argument is specified, fields not listed in
+ the configuration array will not be fuzzed.
+ """
+
+ def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
+ command, fuzz_config):
+ """Setup environment for one test and execute this test."""
+ try:
+ test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
+ log_all)
+ except TestException:
+ sys.exit(1)
+
+ # Python 2.4 doesn't support 'finally' and 'except' in the same 'try'
+ # block
+ try:
+ try:
+ test.execute(command, fuzz_config)
+ except TestException:
+ sys.exit(1)
+ finally:
+ test.finish()
+
+ def should_continue(duration, start_time):
+ """Return True if a new test can be started and False otherwise."""
+ current_time = int(time.time())
+ return (duration is None) or (current_time - start_time < duration)
+
+ try:
+ opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
+ ['command=', 'help', 'seed=', 'config=',
+ 'keep_passed', 'verbose', 'duration='])
+ except getopt.error, e:
+ print >>sys.stderr, \
+ "Error: %s\n\nTry 'runner.py --help' for more information" % e
+ sys.exit(1)
+
+ command = None
+ cleanup = True
+ log_all = False
+ seed = None
+ config = None
+ duration = None
+ for opt, arg in opts:
+ if opt in ('-h', '--help'):
+ usage()
+ sys.exit()
+ elif opt in ('-c', '--command'):
+ try:
+ command = json.loads(arg)
+ except (TypeError, ValueError, NameError), e:
+ print >>sys.stderr, \
+ "Error: JSON array of test commands cannot be loaded.\n" \
+ "Reason: %s" % e
+ sys.exit(1)
+ elif opt in ('-k', '--keep_passed'):
+ cleanup = False
+ elif opt in ('-v', '--verbose'):
+ log_all = True
+ elif opt in ('-s', '--seed'):
+ seed = arg
+ elif opt in ('-d', '--duration'):
+ duration = int(arg)
+ elif opt == '--config':
+ try:
+ config = json.loads(arg)
+ except (TypeError, ValueError, NameError), e:
+ print >>sys.stderr, \
+ "Error: JSON array with the fuzzer configuration cannot" \
+ " be loaded\nReason: %s" % e
+ sys.exit(1)
+
+ if not len(args) == 2:
+ print >>sys.stderr, \
+ "Expected two parameters\nTry 'runner.py --help'" \
+ " for more information."
+ sys.exit(1)
+
+ work_dir = os.path.realpath(args[0])
+ # run_log is created in 'main', because multiple tests are expected to
+ # log in it
+ run_log = os.path.join(work_dir, 'run.log')
+
+ # Add the path to the image generator module to sys.path
+ sys.path.append(os.path.realpath(os.path.dirname(args[1])))
+ # Remove a script extension from image generator module if any
+ generator_name = os.path.splitext(os.path.basename(args[1]))[0]
+
+ try:
+ image_generator = __import__(generator_name)
+ except ImportError, e:
+ print >>sys.stderr, \
+ "Error: The image generator '%s' cannot be imported.\n" \
+ "Reason: %s" % (generator_name, e)
+ sys.exit(1)
+
+ # Enable core dumps
+ resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
+ # If a seed is specified, only one test will be executed.
+ # Otherwise runner will terminate after a keyboard interruption
+ start_time = int(time.time())
+ test_id = count(1)
+ while should_continue(duration, start_time):
+ try:
+ run_test(str(test_id.next()), seed, work_dir, run_log, cleanup,
+ log_all, command, config)
+ except (KeyboardInterrupt, SystemExit):
+ sys.exit(1)
+
+ if seed is not None:
+ break