diff options
Diffstat (limited to 'src/ceph/qa/tasks/workunit.py')
-rw-r--r-- | src/ceph/qa/tasks/workunit.py | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/src/ceph/qa/tasks/workunit.py b/src/ceph/qa/tasks/workunit.py new file mode 100644 index 0000000..f69b396 --- /dev/null +++ b/src/ceph/qa/tasks/workunit.py @@ -0,0 +1,486 @@ +""" +Workunit task -- Run ceph on sets of specific clients +""" +import logging +import pipes +import os +import re + +from copy import deepcopy +from util import get_remote_for_role + +from teuthology import misc +from teuthology.config import config as teuth_config +from teuthology.orchestra.run import CommandFailedError +from teuthology.parallel import parallel +from teuthology.orchestra import run + +log = logging.getLogger(__name__) + + +class Refspec: + def __init__(self, refspec): + self.refspec = refspec + + def __str__(self): + return self.refspec + + def _clone(self, git_url, clonedir, opts=None): + if opts is None: + opts = [] + return (['rm', '-rf', clonedir] + + [run.Raw('&&')] + + ['git', 'clone'] + opts + + [git_url, clonedir]) + + def _cd(self, clonedir): + return ['cd', clonedir] + + def _checkout(self): + return ['git', 'checkout', self.refspec] + + def clone(self, git_url, clonedir): + return (self._clone(git_url, clonedir) + + [run.Raw('&&')] + + self._cd(clonedir) + + [run.Raw('&&')] + + self._checkout()) + + +class Branch(Refspec): + def __init__(self, tag): + Refspec.__init__(self, tag) + + def clone(self, git_url, clonedir): + opts = ['--depth', '1', + '--branch', self.refspec] + return (self._clone(git_url, clonedir, opts) + + [run.Raw('&&')] + + self._cd(clonedir)) + + +class Head(Refspec): + def __init__(self): + Refspec.__init__(self, 'HEAD') + + def clone(self, git_url, clonedir): + opts = ['--depth', '1'] + return (self._clone(git_url, clonedir, opts) + + [run.Raw('&&')] + + self._cd(clonedir)) + + +def task(ctx, config): + """ + Run ceph on all workunits found under the specified path. + + For example:: + + tasks: + - ceph: + - ceph-fuse: [client.0] + - workunit: + clients: + client.0: [direct_io, xattrs.sh] + client.1: [snaps] + branch: foo + + You can also run a list of workunits on all clients: + tasks: + - ceph: + - ceph-fuse: + - workunit: + tag: v0.47 + clients: + all: [direct_io, xattrs.sh, snaps] + + If you have an "all" section it will run all the workunits + on each client simultaneously, AFTER running any workunits specified + for individual clients. (This prevents unintended simultaneous runs.) + + To customize tests, you can specify environment variables as a dict. You + can also specify a time limit for each work unit (defaults to 3h): + + tasks: + - ceph: + - ceph-fuse: + - workunit: + sha1: 9b28948635b17165d17c1cf83d4a870bd138ddf6 + clients: + all: [snaps] + env: + FOO: bar + BAZ: quux + timeout: 3h + + This task supports roles that include a ceph cluster, e.g.:: + + tasks: + - ceph: + - workunit: + clients: + backup.client.0: [foo] + client.1: [bar] # cluster is implicitly 'ceph' + + You can also specify an alternative top-level dir to 'qa/workunits', like + 'qa/standalone', with:: + + tasks: + - install: + - workunit: + basedir: qa/standalone + clients: + client.0: + - test-ceph-helpers.sh + + :param ctx: Context + :param config: Configuration + """ + assert isinstance(config, dict) + assert isinstance(config.get('clients'), dict), \ + 'configuration must contain a dictionary of clients' + + # mimic the behavior of the "install" task, where the "overrides" are + # actually the defaults of that task. in other words, if none of "sha1", + # "tag", or "branch" is specified by a "workunit" tasks, we will update + # it with the information in the "workunit" sub-task nested in "overrides". + overrides = deepcopy(ctx.config.get('overrides', {}).get('workunit', {})) + refspecs = {'branch': Branch, 'tag': Refspec, 'sha1': Refspec} + if any(map(lambda i: i in config, refspecs.iterkeys())): + for i in refspecs.iterkeys(): + overrides.pop(i, None) + misc.deep_merge(config, overrides) + + for spec, cls in refspecs.iteritems(): + refspec = config.get(spec) + if refspec: + refspec = cls(refspec) + break + if refspec is None: + refspec = Head() + + timeout = config.get('timeout', '3h') + + log.info('Pulling workunits from ref %s', refspec) + + created_mountpoint = {} + + if config.get('env') is not None: + assert isinstance(config['env'], dict), 'env must be a dictionary' + clients = config['clients'] + + # Create scratch dirs for any non-all workunits + log.info('Making a separate scratch dir for every client...') + for role in clients.iterkeys(): + assert isinstance(role, basestring) + if role == "all": + continue + + assert 'client' in role + created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir')) + created_mountpoint[role] = created_mnt_dir + + # Execute any non-all workunits + with parallel() as p: + for role, tests in clients.iteritems(): + if role != "all": + p.spawn(_run_tests, ctx, refspec, role, tests, + config.get('env'), + basedir=config.get('basedir','qa/workunits'), + timeout=timeout) + + # Clean up dirs from any non-all workunits + for role, created in created_mountpoint.items(): + _delete_dir(ctx, role, created) + + # Execute any 'all' workunits + if 'all' in clients: + all_tasks = clients["all"] + _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'), + config.get('basedir', 'qa/workunits'), + config.get('subdir'), timeout=timeout) + + +def _client_mountpoint(ctx, cluster, id_): + """ + Returns the path to the expected mountpoint for workunits running + on some kind of filesystem. + """ + # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet, + # only include the cluster name in the dir if the cluster is not 'ceph' + if cluster == 'ceph': + dir_ = 'mnt.{0}'.format(id_) + else: + dir_ = 'mnt.{0}.{1}'.format(cluster, id_) + return os.path.join(misc.get_testdir(ctx), dir_) + + +def _delete_dir(ctx, role, created_mountpoint): + """ + Delete file used by this role, and delete the directory that this + role appeared in. + + :param ctx: Context + :param role: "role.#" where # is used for the role id. + """ + cluster, _, id_ = misc.split_role(role) + remote = get_remote_for_role(ctx, role) + mnt = _client_mountpoint(ctx, cluster, id_) + client = os.path.join(mnt, 'client.{id}'.format(id=id_)) + + # Remove the directory inside the mount where the workunit ran + remote.run( + args=[ + 'sudo', + 'rm', + '-rf', + '--', + client, + ], + ) + log.info("Deleted dir {dir}".format(dir=client)) + + # If the mount was an artificially created dir, delete that too + if created_mountpoint: + remote.run( + args=[ + 'rmdir', + '--', + mnt, + ], + ) + log.info("Deleted artificial mount point {dir}".format(dir=client)) + + +def _make_scratch_dir(ctx, role, subdir): + """ + Make scratch directories for this role. This also makes the mount + point if that directory does not exist. + + :param ctx: Context + :param role: "role.#" where # is used for the role id. + :param subdir: use this subdir (False if not used) + """ + created_mountpoint = False + cluster, _, id_ = misc.split_role(role) + remote = get_remote_for_role(ctx, role) + dir_owner = remote.user + mnt = _client_mountpoint(ctx, cluster, id_) + # if neither kclient nor ceph-fuse are required for a workunit, + # mnt may not exist. Stat and create the directory if it doesn't. + try: + remote.run( + args=[ + 'stat', + '--', + mnt, + ], + ) + log.info('Did not need to create dir {dir}'.format(dir=mnt)) + except CommandFailedError: + remote.run( + args=[ + 'mkdir', + '--', + mnt, + ], + ) + log.info('Created dir {dir}'.format(dir=mnt)) + created_mountpoint = True + + if not subdir: + subdir = 'client.{id}'.format(id=id_) + + if created_mountpoint: + remote.run( + args=[ + 'cd', + '--', + mnt, + run.Raw('&&'), + 'mkdir', + '--', + subdir, + ], + ) + else: + remote.run( + args=[ + # cd first so this will fail if the mount point does + # not exist; pure install -d will silently do the + # wrong thing + 'cd', + '--', + mnt, + run.Raw('&&'), + 'sudo', + 'install', + '-d', + '-m', '0755', + '--owner={user}'.format(user=dir_owner), + '--', + subdir, + ], + ) + + return created_mountpoint + + +def _spawn_on_all_clients(ctx, refspec, tests, env, basedir, subdir, timeout=None): + """ + Make a scratch directory for each client in the cluster, and then for each + test spawn _run_tests() for each role. + + See run_tests() for parameter documentation. + """ + is_client = misc.is_type('client') + client_remotes = {} + created_mountpoint = {} + for remote, roles_for_host in ctx.cluster.remotes.items(): + for role in roles_for_host: + if is_client(role): + client_remotes[role] = remote + created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir) + + for unit in tests: + with parallel() as p: + for role, remote in client_remotes.items(): + p.spawn(_run_tests, ctx, refspec, role, [unit], env, + basedir, + subdir, + timeout=timeout) + + # cleanup the generated client directories + for role, _ in client_remotes.items(): + _delete_dir(ctx, role, created_mountpoint[role]) + + +def _run_tests(ctx, refspec, role, tests, env, basedir, + subdir=None, timeout=None): + """ + Run the individual test. Create a scratch directory and then extract the + workunits from git. Make the executables, and then run the tests. + Clean up (remove files created) after the tests are finished. + + :param ctx: Context + :param refspec: branch, sha1, or version tag used to identify this + build + :param tests: specific tests specified. + :param env: environment set in yaml file. Could be None. + :param subdir: subdirectory set in yaml file. Could be None + :param timeout: If present, use the 'timeout' command on the remote host + to limit execution time. Must be specified by a number + followed by 's' for seconds, 'm' for minutes, 'h' for + hours, or 'd' for days. If '0' or anything that evaluates + to False is passed, the 'timeout' command is not used. + """ + testdir = misc.get_testdir(ctx) + assert isinstance(role, basestring) + cluster, type_, id_ = misc.split_role(role) + assert type_ == 'client' + remote = get_remote_for_role(ctx, role) + mnt = _client_mountpoint(ctx, cluster, id_) + # subdir so we can remove and recreate this a lot without sudo + if subdir is None: + scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp') + else: + scratch_tmp = os.path.join(mnt, subdir) + clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role) + srcdir = '{cdir}/{basedir}'.format(cdir=clonedir, + basedir=basedir) + + git_url = teuth_config.get_ceph_qa_suite_git_url() + # if we are running an upgrade test, and ceph-ci does not have branches like + # `jewel`, so should use ceph.git as an alternative. + try: + remote.run(logger=log.getChild(role), + args=refspec.clone(git_url, clonedir)) + except CommandFailedError: + if git_url.endswith('/ceph-ci.git'): + alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git') + elif git_url.endswith('/ceph-ci'): + alt_git_url = re.sub(r'/ceph-ci$', '/ceph.git', git_url) + else: + raise + log.info( + "failed to check out '%s' from %s; will also try in %s", + refspec, + git_url, + alt_git_url, + ) + remote.run(logger=log.getChild(role), + args=refspec.clone(alt_git_url, clonedir)) + remote.run( + logger=log.getChild(role), + args=[ + 'cd', '--', srcdir, + run.Raw('&&'), + 'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi', + run.Raw('&&'), + 'find', '-executable', '-type', 'f', '-printf', r'%P\0'.format(srcdir=srcdir), + run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)), + ], + ) + + workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role) + workunits = sorted(misc.get_file(remote, workunits_file).split('\0')) + assert workunits + + try: + assert isinstance(tests, list) + for spec in tests: + log.info('Running workunits matching %s on %s...', spec, role) + prefix = '{spec}/'.format(spec=spec) + to_run = [w for w in workunits if w == spec or w.startswith(prefix)] + if not to_run: + raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec)) + for workunit in to_run: + log.info('Running workunit %s...', workunit) + args = [ + 'mkdir', '-p', '--', scratch_tmp, + run.Raw('&&'), + 'cd', '--', scratch_tmp, + run.Raw('&&'), + run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'), + run.Raw('CEPH_REF={ref}'.format(ref=refspec)), + run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)), + run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)), + run.Raw('CEPH_ID="{id}"'.format(id=id_)), + run.Raw('PATH=$PATH:/usr/sbin'), + run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)), + run.Raw('CEPH_ROOT={dir}'.format(dir=clonedir)), + ] + if env is not None: + for var, val in env.iteritems(): + quoted_val = pipes.quote(val) + env_arg = '{var}={val}'.format(var=var, val=quoted_val) + args.append(run.Raw(env_arg)) + args.extend([ + 'adjust-ulimits', + 'ceph-coverage', + '{tdir}/archive/coverage'.format(tdir=testdir)]) + if timeout and timeout != '0': + args.extend(['timeout', timeout]) + args.extend([ + '{srcdir}/{workunit}'.format( + srcdir=srcdir, + workunit=workunit, + ), + ]) + remote.run( + logger=log.getChild(role), + args=args, + label="workunit test {workunit}".format(workunit=workunit) + ) + remote.run( + logger=log.getChild(role), + args=['sudo', 'rm', '-rf', '--', scratch_tmp], + ) + finally: + log.info('Stopping %s on %s...', tests, role) + remote.run( + logger=log.getChild(role), + args=[ + 'rm', '-rf', '--', workunits_file, clonedir, + ], + ) |