path: root/qemu/tests/image-fuzzer/
diff options
Diffstat (limited to 'qemu/tests/image-fuzzer/')
1 files changed, 0 insertions, 437 deletions
diff --git a/qemu/tests/image-fuzzer/ b/qemu/tests/image-fuzzer/
deleted file mode 100755
index 96a1c11b2..000000000
--- a/qemu/tests/image-fuzzer/
+++ /dev/null
@@ -1,437 +0,0 @@
-#!/usr/bin/env python
-# Tool for running fuzz tests
-# Copyright (C) 2014 Maria Kustova <>
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 2 of the License, or
-# (at your option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# GNU General Public License for more details.
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <>.
-import sys
-import os
-import signal
-import subprocess
-import random
-import shutil
-from itertools import count
-import time
-import getopt
-import StringIO
-import resource
- import json
-except ImportError:
- try:
- import simplejson as json
- except ImportError:
- print >>sys.stderr, \
- "Warning: Module for JSON processing is not found.\n" \
- "'--config' and '--command' options are not supported."
-# Backing file sizes in MB
-def multilog(msg, *output):
- """ Write an object to all of specified file descriptors."""
- for fd in output:
- fd.write(msg)
- fd.flush()
-def str_signal(sig):
- """ Convert a numeric value of a system signal to the string one
- defined by the current operational system.
- """
- for k, v in signal.__dict__.items():
- if v == sig:
- return k
-def run_app(fd, q_args):
- """Start an application with specified arguments and return its exit code
- or kill signal depending on the result of execution.
- """
- class Alarm(Exception):
- """Exception for signal.alarm events."""
- pass
- def handler(*args):
- """Notify that an alarm event occurred."""
- raise Alarm
- signal.signal(signal.SIGALRM, handler)
- signal.alarm(600)
- term_signal = signal.SIGKILL
- devnull = open('/dev/null', 'r+')
- process = subprocess.Popen(q_args, stdin=devnull,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- try:
- out, err = process.communicate()
- signal.alarm(0)
- fd.write(out)
- fd.write(err)
- fd.flush()
- return process.returncode
- except Alarm:
- os.kill(, term_signal)
- fd.write('The command was terminated by timeout.\n')
- fd.flush()
- return -term_signal
-class TestException(Exception):
- """Exception for errors risen by TestEnv objects."""
- pass
-class TestEnv(object):
- """Test object.
- The class sets up test environment, generates backing and test images
- and executes application under tests with specified arguments and a test
- image provided.
- All logs are collected.
- The summary log will contain short descriptions and statuses of tests in
- a run.
- The test log will include application (e.g. 'qemu-img') logs besides info
- sent to the summary log.
- """
- def __init__(self, test_id, seed, work_dir, run_log,
- cleanup=True, log_all=False):
- """Set test environment in a specified work directory.
- Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and
- 'QEMU_IO' environment variables.
- """
- if seed is not None:
- self.seed = seed
- else:
- self.seed = str(random.randint(0, sys.maxint))
- random.seed(self.seed)
- self.init_path = os.getcwd()
- self.work_dir = work_dir
- self.current_dir = os.path.join(work_dir, 'test-' + test_id)
- self.qemu_img = \
- os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ')
- self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ')
- self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'],
- ['qemu-img', 'info', '-f', 'qcow2', '$test_img'],
- ['qemu-io', '$test_img', '-c', 'read $off $len'],
- ['qemu-io', '$test_img', '-c', 'write $off $len'],
- ['qemu-io', '$test_img', '-c',
- 'aio_read $off $len'],
- ['qemu-io', '$test_img', '-c',
- 'aio_write $off $len'],
- ['qemu-io', '$test_img', '-c', 'flush'],
- ['qemu-io', '$test_img', '-c',
- 'discard $off $len'],
- ['qemu-io', '$test_img', '-c',
- 'truncate $off']]
- for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']:
- self.commands.append(
- ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt,
- '$test_img', 'converted_image.' + fmt])
- try:
- os.makedirs(self.current_dir)
- except OSError as e:
- print >>sys.stderr, \
- "Error: The working directory '%s' cannot be used. Reason: %s"\
- % (self.work_dir, e[1])
- raise TestException
- self.log = open(os.path.join(self.current_dir, "test.log"), "w")
- self.parent_log = open(run_log, "a")
- self.failed = False
- self.cleanup = cleanup
- self.log_all = log_all
- def _create_backing_file(self):
- """Create a backing file in the current directory.
- Return a tuple of a backing file name and format.
- Format of a backing file is randomly chosen from all formats supported
- by 'qemu-img create'.
- """
- # All formats supported by the 'qemu-img create' command.
- backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2',
- 'file', 'qed', 'vpc'])
- backing_file_name = 'backing_img.' + backing_file_fmt
- backing_file_size = random.randint(MIN_BACKING_FILE_SIZE,
- MAX_BACKING_FILE_SIZE) * (1 << 20)
- cmd = self.qemu_img + ['create', '-f', backing_file_fmt,
- backing_file_name, str(backing_file_size)]
- temp_log = StringIO.StringIO()
- retcode = run_app(temp_log, cmd)
- if retcode == 0:
- temp_log.close()
- return (backing_file_name, backing_file_fmt)
- else:
- multilog("Warning: The %s backing file was not created.\n\n"
- % backing_file_fmt, sys.stderr, self.log, self.parent_log)
- self.log.write("Log for the failure:\n" + temp_log.getvalue() +
- '\n\n')
- temp_log.close()
- return (None, None)
- def execute(self, input_commands=None, fuzz_config=None):
- """ Execute a test.
- The method creates backing and test images, runs test app and analyzes
- its exit status. If the application was killed by a signal, the test
- is marked as failed.
- """
- if input_commands is None:
- commands = self.commands
- else:
- commands = input_commands
- os.chdir(self.current_dir)
- backing_file_name, backing_file_fmt = self._create_backing_file()
- img_size = image_generator.create_image(
- 'test.img', backing_file_name, backing_file_fmt, fuzz_config)
- for item in commands:
- shutil.copy('test.img', 'copy.img')
- # 'off' and 'len' are multiple of the sector size
- sector_size = 512
- start = random.randrange(0, img_size + 1, sector_size)
- end = random.randrange(start, img_size + 1, sector_size)
- if item[0] == 'qemu-img':
- current_cmd = list(self.qemu_img)
- elif item[0] == 'qemu-io':
- current_cmd = list(self.qemu_io)
- else:
- multilog("Warning: test command '%s' is not defined.\n"
- % item[0], sys.stderr, self.log, self.parent_log)
- continue
- # Replace all placeholders with their real values
- for v in item[1:]:
- c = (v
- .replace('$test_img', 'copy.img')
- .replace('$off', str(start))
- .replace('$len', str(end - start)))
- current_cmd.append(c)
- # Log string with the test header
- test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \
- "Backing file: %s\n" \
- % (self.seed, " ".join(current_cmd),
- self.current_dir, backing_file_name)
- temp_log = StringIO.StringIO()
- try:
- retcode = run_app(temp_log, current_cmd)
- except OSError as e:
- multilog("%sError: Start of '%s' failed. Reason: %s\n\n"
- % (test_summary, os.path.basename(current_cmd[0]),
- e[1]),
- sys.stderr, self.log, self.parent_log)
- raise TestException
- if retcode < 0:
- self.log.write(temp_log.getvalue())
- multilog("%sFAIL: Test terminated by signal %s\n\n"
- % (test_summary, str_signal(-retcode)),
- sys.stderr, self.log, self.parent_log)
- self.failed = True
- else:
- if self.log_all:
- self.log.write(temp_log.getvalue())
- multilog("%sPASS: Application exited with the code " \
- "'%d'\n\n" % (test_summary, retcode),
- sys.stdout, self.log, self.parent_log)
- temp_log.close()
- os.remove('copy.img')
- def finish(self):
- """Restore the test environment after a test execution."""
- self.log.close()
- self.parent_log.close()
- os.chdir(self.init_path)
- if self.cleanup and not self.failed:
- shutil.rmtree(self.current_dir)
-if __name__ == '__main__':
- def usage():
- print """
- Set up test environment in TEST_DIR and run a test in it. A module for
- test image generation should be specified via IMG_GENERATOR.
- Example:
- -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2
- Optional arguments:
- -h, --help display this help and exit
- -d, --duration=NUMBER finish tests after NUMBER of seconds
- -c, --command=JSON run tests for all commands specified in
- the JSON array
- -s, --seed=STRING seed for a test image generation,
- by default will be generated randomly
- --config=JSON take fuzzer configuration from the JSON
- array
- -k, --keep_passed don't remove folders of passed tests
- -v, --verbose log information about passed tests
- '--command' accepts a JSON array of commands. Each command presents
- an application under test with all its parameters as a list of strings,
- e.g. ["qemu-io", "$test_img", "-c", "write $off $len"].
- Supported application aliases: 'qemu-img' and 'qemu-io'.
- Supported argument aliases: $test_img for the fuzzed image, $off
- for an offset, $len for length.
- Values for $off and $len will be generated based on the virtual disk
- size of the fuzzed image.
- Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and
- 'QEMU_IO' environment variables.
- '--config' accepts a JSON array of fields to be fuzzed, e.g.
- '[["header"], ["header", "version"]]'.
- Each of the list elements can consist of a complex image element only
- as ["header"] or ["feature_name_table"] or an exact field as
- ["header", "version"]. In the first case random portion of the element
- fields will be fuzzed, in the second one the specified field will be
- fuzzed always.
- If '--config' argument is specified, fields not listed in
- the configuration array will not be fuzzed.
- """
- def run_test(test_id, seed, work_dir, run_log, cleanup, log_all,
- command, fuzz_config):
- """Setup environment for one test and execute this test."""
- try:
- test = TestEnv(test_id, seed, work_dir, run_log, cleanup,
- log_all)
- except TestException:
- sys.exit(1)
- # Python 2.4 doesn't support 'finally' and 'except' in the same 'try'
- # block
- try:
- try:
- test.execute(command, fuzz_config)
- except TestException:
- sys.exit(1)
- finally:
- test.finish()
- def should_continue(duration, start_time):
- """Return True if a new test can be started and False otherwise."""
- current_time = int(time.time())
- return (duration is None) or (current_time - start_time < duration)
- try:
- opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:',
- ['command=', 'help', 'seed=', 'config=',
- 'keep_passed', 'verbose', 'duration='])
- except getopt.error as e:
- print >>sys.stderr, \
- "Error: %s\n\nTry ' --help' for more information" % e
- sys.exit(1)
- command = None
- cleanup = True
- log_all = False
- seed = None
- config = None
- duration = None
- for opt, arg in opts:
- if opt in ('-h', '--help'):
- usage()
- sys.exit()
- elif opt in ('-c', '--command'):
- try:
- command = json.loads(arg)
- except (TypeError, ValueError, NameError) as e:
- print >>sys.stderr, \
- "Error: JSON array of test commands cannot be loaded.\n" \
- "Reason: %s" % e
- sys.exit(1)
- elif opt in ('-k', '--keep_passed'):
- cleanup = False
- elif opt in ('-v', '--verbose'):
- log_all = True
- elif opt in ('-s', '--seed'):
- seed = arg
- elif opt in ('-d', '--duration'):
- duration = int(arg)
- elif opt == '--config':
- try:
- config = json.loads(arg)
- except (TypeError, ValueError, NameError) as e:
- print >>sys.stderr, \
- "Error: JSON array with the fuzzer configuration cannot" \
- " be loaded\nReason: %s" % e
- sys.exit(1)
- if not len(args) == 2:
- print >>sys.stderr, \
- "Expected two parameters\nTry ' --help'" \
- " for more information."
- sys.exit(1)
- work_dir = os.path.realpath(args[0])
- # run_log is created in 'main', because multiple tests are expected to
- # log in it
- run_log = os.path.join(work_dir, 'run.log')
- # Add the path to the image generator module to sys.path
- sys.path.append(os.path.realpath(os.path.dirname(args[1])))
- # Remove a script extension from image generator module if any
- generator_name = os.path.splitext(os.path.basename(args[1]))[0]
- try:
- image_generator = __import__(generator_name)
- except ImportError as e:
- print >>sys.stderr, \
- "Error: The image generator '%s' cannot be imported.\n" \
- "Reason: %s" % (generator_name, e)
- sys.exit(1)
- # Enable core dumps
- resource.setrlimit(resource.RLIMIT_CORE, (-1, -1))
- # If a seed is specified, only one test will be executed.
- # Otherwise runner will terminate after a keyboard interruption
- start_time = int(time.time())
- test_id = count(1)
- while should_continue(duration, start_time):
- try:
- run_test(str(, seed, work_dir, run_log, cleanup,
- log_all, command, config)
- except (KeyboardInterrupt, SystemExit):
- sys.exit(1)
- if seed is not None:
- break