From 7da45d65be36d36b880cc55c5036e96c24b53f00 Mon Sep 17 00:00:00 2001 From: Qiaowei Ren Date: Thu, 1 Mar 2018 14:38:11 +0800 Subject: remove ceph code This patch removes initial ceph code, due to license issue. Change-Id: I092d44f601cdf34aed92300fe13214925563081c Signed-off-by: Qiaowei Ren --- src/ceph/qa/tasks/cephfs/filesystem.py | 1213 -------------------------------- 1 file changed, 1213 deletions(-) delete mode 100644 src/ceph/qa/tasks/cephfs/filesystem.py (limited to 'src/ceph/qa/tasks/cephfs/filesystem.py') diff --git a/src/ceph/qa/tasks/cephfs/filesystem.py b/src/ceph/qa/tasks/cephfs/filesystem.py deleted file mode 100644 index 9638fd5..0000000 --- a/src/ceph/qa/tasks/cephfs/filesystem.py +++ /dev/null @@ -1,1213 +0,0 @@ - -from StringIO import StringIO -import json -import logging -from gevent import Greenlet -import os -import time -import datetime -import re -import errno -import random - -from teuthology.exceptions import CommandFailedError -from teuthology import misc -from teuthology.nuke import clear_firewall -from teuthology.parallel import parallel -from tasks.ceph_manager import write_conf -from tasks import ceph_manager - - -log = logging.getLogger(__name__) - - -DAEMON_WAIT_TIMEOUT = 120 -ROOT_INO = 1 - - -class ObjectNotFound(Exception): - def __init__(self, object_name): - self._object_name = object_name - - def __str__(self): - return "Object not found: '{0}'".format(self._object_name) - -class FSStatus(object): - """ - Operations on a snapshot of the FSMap. - """ - def __init__(self, mon_manager): - self.mon = mon_manager - self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json")) - - def __str__(self): - return json.dumps(self.map, indent = 2, sort_keys = True) - - # Expose the fsmap for manual inspection. - def __getitem__(self, key): - """ - Get a field from the fsmap. - """ - return self.map[key] - - def get_filesystems(self): - """ - Iterator for all filesystems. - """ - for fs in self.map['filesystems']: - yield fs - - def get_all(self): - """ - Iterator for all the mds_info components in the FSMap. - """ - for info in self.get_standbys(): - yield info - for fs in self.map['filesystems']: - for info in fs['mdsmap']['info'].values(): - yield info - - def get_standbys(self): - """ - Iterator for all standbys. - """ - for info in self.map['standbys']: - yield info - - def get_fsmap(self, fscid): - """ - Get the fsmap for the given FSCID. - """ - for fs in self.map['filesystems']: - if fscid is None or fs['id'] == fscid: - return fs - raise RuntimeError("FSCID {0} not in map".format(fscid)) - - def get_fsmap_byname(self, name): - """ - Get the fsmap for the given file system name. - """ - for fs in self.map['filesystems']: - if name is None or fs['mdsmap']['fs_name'] == name: - return fs - raise RuntimeError("FS {0} not in map".format(name)) - - def get_replays(self, fscid): - """ - Get the standby:replay MDS for the given FSCID. - """ - fs = self.get_fsmap(fscid) - for info in fs['mdsmap']['info'].values(): - if info['state'] == 'up:standby-replay': - yield info - - def get_ranks(self, fscid): - """ - Get the ranks for the given FSCID. - """ - fs = self.get_fsmap(fscid) - for info in fs['mdsmap']['info'].values(): - if info['rank'] >= 0: - yield info - - def get_rank(self, fscid, rank): - """ - Get the rank for the given FSCID. - """ - for info in self.get_ranks(fscid): - if info['rank'] == rank: - return info - raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank)) - - def get_mds(self, name): - """ - Get the info for the given MDS name. - """ - for info in self.get_all(): - if info['name'] == name: - return info - return None - - def get_mds_addr(self, name): - """ - Return the instance addr as a string, like "10.214.133.138:6807\/10825" - """ - info = self.get_mds(name) - if info: - return info['addr'] - else: - log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging - raise RuntimeError("MDS id '{0}' not found in map".format(name)) - -class CephCluster(object): - @property - def admin_remote(self): - first_mon = misc.get_first_mon(self._ctx, None) - (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys() - return result - - def __init__(self, ctx): - self._ctx = ctx - self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager')) - - def get_config(self, key, service_type=None): - """ - Get config from mon by default, or a specific service if caller asks for it - """ - if service_type is None: - service_type = 'mon' - - service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0] - return self.json_asok(['config', 'get', key], service_type, service_id)[key] - - def set_ceph_conf(self, subsys, key, value): - if subsys not in self._ctx.ceph['ceph'].conf: - self._ctx.ceph['ceph'].conf[subsys] = {} - self._ctx.ceph['ceph'].conf[subsys][key] = value - write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they - # used a different config path this won't work. - - def clear_ceph_conf(self, subsys, key): - del self._ctx.ceph['ceph'].conf[subsys][key] - write_conf(self._ctx) - - def json_asok(self, command, service_type, service_id): - proc = self.mon_manager.admin_socket(service_type, service_id, command) - response_data = proc.stdout.getvalue() - log.info("_json_asok output: {0}".format(response_data)) - if response_data.strip(): - return json.loads(response_data) - else: - return None - - -class MDSCluster(CephCluster): - """ - Collective operations on all the MDS daemons in the Ceph cluster. These - daemons may be in use by various Filesystems. - - For the benefit of pre-multi-filesystem tests, this class is also - a parent of Filesystem. The correct way to use MDSCluster going forward is - as a separate instance outside of your (multiple) Filesystem instances. - """ - def __init__(self, ctx): - super(MDSCluster, self).__init__(ctx) - - self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds')) - - if len(self.mds_ids) == 0: - raise RuntimeError("This task requires at least one MDS") - - if hasattr(self._ctx, "daemons"): - # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task - self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids]) - - def _one_or_all(self, mds_id, cb, in_parallel=True): - """ - Call a callback for a single named MDS, or for all. - - Note that the parallelism here isn't for performance, it's to avoid being overly kind - to the cluster by waiting a graceful ssh-latency of time between doing things, and to - avoid being overly kind by executing them in a particular order. However, some actions - don't cope with being done in parallel, so it's optional (`in_parallel`) - - :param mds_id: MDS daemon name, or None - :param cb: Callback taking single argument of MDS daemon name - :param in_parallel: whether to invoke callbacks concurrently (else one after the other) - """ - if mds_id is None: - if in_parallel: - with parallel() as p: - for mds_id in self.mds_ids: - p.spawn(cb, mds_id) - else: - for mds_id in self.mds_ids: - cb(mds_id) - else: - cb(mds_id) - - def get_config(self, key, service_type=None): - """ - get_config specialization of service_type="mds" - """ - if service_type != "mds": - return super(MDSCluster, self).get_config(key, service_type) - - # Some tests stop MDS daemons, don't send commands to a dead one: - service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0] - return self.json_asok(['config', 'get', key], service_type, service_id)[key] - - def mds_stop(self, mds_id=None): - """ - Stop the MDS daemon process(se). If it held a rank, that rank - will eventually go laggy. - """ - self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop()) - - def mds_fail(self, mds_id=None): - """ - Inform MDSMonitor of the death of the daemon process(es). If it held - a rank, that rank will be relinquished. - """ - self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_)) - - def mds_restart(self, mds_id=None): - self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart()) - - def mds_fail_restart(self, mds_id=None): - """ - Variation on restart that includes marking MDSs as failed, so that doing this - operation followed by waiting for healthy daemon states guarantees that they - have gone down and come up, rather than potentially seeing the healthy states - that existed before the restart. - """ - def _fail_restart(id_): - self.mds_daemons[id_].stop() - self.mon_manager.raw_cluster_cmd("mds", "fail", id_) - self.mds_daemons[id_].restart() - - self._one_or_all(mds_id, _fail_restart) - - def newfs(self, name='cephfs', create=True): - return Filesystem(self._ctx, name=name, create=create) - - def status(self): - return FSStatus(self.mon_manager) - - def delete_all_filesystems(self): - """ - Remove all filesystems that exist, and any pools in use by them. - """ - pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools'] - pool_id_name = {} - for pool in pools: - pool_id_name[pool['pool']] = pool['pool_name'] - - # mark cluster down for each fs to prevent churn during deletion - status = self.status() - for fs in status.get_filesystems(): - self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true") - - # get a new copy as actives may have since changed - status = self.status() - for fs in status.get_filesystems(): - mdsmap = fs['mdsmap'] - metadata_pool = pool_id_name[mdsmap['metadata_pool']] - - for gid in mdsmap['up'].values(): - self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__()) - - self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it') - self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete', - metadata_pool, metadata_pool, - '--yes-i-really-really-mean-it') - for data_pool in mdsmap['data_pools']: - data_pool = pool_id_name[data_pool] - try: - self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete', - data_pool, data_pool, - '--yes-i-really-really-mean-it') - except CommandFailedError as e: - if e.exitstatus == 16: # EBUSY, this data pool is used - pass # by two metadata pools, let the 2nd - else: # pass delete it - raise - - def get_standby_daemons(self): - return set([s['name'] for s in self.status().get_standbys()]) - - def get_mds_hostnames(self): - result = set() - for mds_id in self.mds_ids: - mds_remote = self.mon_manager.find_remote('mds', mds_id) - result.add(mds_remote.hostname) - - return list(result) - - def set_clients_block(self, blocked, mds_id=None): - """ - Block (using iptables) client communications to this MDS. Be careful: if - other services are running on this MDS, or other MDSs try to talk to this - MDS, their communications may also be blocked as collatoral damage. - - :param mds_id: Optional ID of MDS to block, default to all - :return: - """ - da_flag = "-A" if blocked else "-D" - - def set_block(_mds_id): - remote = self.mon_manager.find_remote('mds', _mds_id) - status = self.status() - - addr = status.get_mds_addr(_mds_id) - ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups() - - remote.run( - args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m", - "comment", "--comment", "teuthology"]) - remote.run( - args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m", - "comment", "--comment", "teuthology"]) - - self._one_or_all(mds_id, set_block, in_parallel=False) - - def clear_firewall(self): - clear_firewall(self._ctx) - - def get_mds_info(self, mds_id): - return FSStatus(self.mon_manager).get_mds(mds_id) - - def is_full(self): - flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags'] - return 'full' in flags - - def is_pool_full(self, pool_name): - pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools'] - for pool in pools: - if pool['pool_name'] == pool_name: - return 'full' in pool['flags_names'].split(",") - - raise RuntimeError("Pool not found '{0}'".format(pool_name)) - -class Filesystem(MDSCluster): - """ - This object is for driving a CephFS filesystem. The MDS daemons driven by - MDSCluster may be shared with other Filesystems. - """ - def __init__(self, ctx, fscid=None, name=None, create=False, - ec_profile=None): - super(Filesystem, self).__init__(ctx) - - self.name = name - self.ec_profile = ec_profile - self.id = None - self.metadata_pool_name = None - self.metadata_overlay = False - self.data_pool_name = None - self.data_pools = None - - client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client')) - self.client_id = client_list[0] - self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1] - - if name is not None: - if fscid is not None: - raise RuntimeError("cannot specify fscid when creating fs") - if create and not self.legacy_configured(): - self.create() - else: - if fscid is not None: - self.id = fscid - self.getinfo(refresh = True) - - # Stash a reference to the first created filesystem on ctx, so - # that if someone drops to the interactive shell they can easily - # poke our methods. - if not hasattr(self._ctx, "filesystem"): - self._ctx.filesystem = self - - def getinfo(self, refresh = False): - status = self.status() - if self.id is not None: - fsmap = status.get_fsmap(self.id) - elif self.name is not None: - fsmap = status.get_fsmap_byname(self.name) - else: - fss = [fs for fs in status.get_filesystems()] - if len(fss) == 1: - fsmap = fss[0] - elif len(fss) == 0: - raise RuntimeError("no file system available") - else: - raise RuntimeError("more than one file system available") - self.id = fsmap['id'] - self.name = fsmap['mdsmap']['fs_name'] - self.get_pool_names(status = status, refresh = refresh) - return status - - def set_metadata_overlay(self, overlay): - if self.id is not None: - raise RuntimeError("cannot specify fscid when configuring overlay") - self.metadata_overlay = overlay - - def deactivate(self, rank): - if rank < 0: - raise RuntimeError("invalid rank") - elif rank == 0: - raise RuntimeError("cannot deactivate rank 0") - self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank)) - - def set_max_mds(self, max_mds): - self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds) - - def set_allow_dirfrags(self, yes): - self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it') - - def get_pgs_per_fs_pool(self): - """ - Calculate how many PGs to use when creating a pool, in order to avoid raising any - health warnings about mon_pg_warn_min_per_osd - - :return: an integer number of PGs - """ - pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd')) - osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd'))) - return pg_warn_min_per_osd * osd_count - - def create(self): - if self.name is None: - self.name = "cephfs" - if self.metadata_pool_name is None: - self.metadata_pool_name = "{0}_metadata".format(self.name) - if self.data_pool_name is None: - data_pool_name = "{0}_data".format(self.name) - else: - data_pool_name = self.data_pool_name - - log.info("Creating filesystem '{0}'".format(self.name)) - - pgs_per_fs_pool = self.get_pgs_per_fs_pool() - - self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', - self.metadata_pool_name, pgs_per_fs_pool.__str__()) - if self.metadata_overlay: - self.mon_manager.raw_cluster_cmd('fs', 'new', - self.name, self.metadata_pool_name, data_pool_name, - '--allow-dangerous-metadata-overlay') - else: - if self.ec_profile: - log.info("EC profile is %s", self.ec_profile) - cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name] - cmd.extend(self.ec_profile) - self.mon_manager.raw_cluster_cmd(*cmd) - self.mon_manager.raw_cluster_cmd( - 'osd', 'pool', 'create', - data_pool_name, pgs_per_fs_pool.__str__(), 'erasure', - data_pool_name) - self.mon_manager.raw_cluster_cmd( - 'osd', 'pool', 'set', - data_pool_name, 'allow_ec_overwrites', 'true') - else: - self.mon_manager.raw_cluster_cmd( - 'osd', 'pool', 'create', - data_pool_name, pgs_per_fs_pool.__str__()) - self.mon_manager.raw_cluster_cmd('fs', 'new', - self.name, self.metadata_pool_name, data_pool_name) - self.check_pool_application(self.metadata_pool_name) - self.check_pool_application(data_pool_name) - # Turn off spurious standby count warnings from modifying max_mds in tests. - try: - self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0') - except CommandFailedError as e: - if e.exitstatus == 22: - # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise) - pass - else: - raise - - self.getinfo(refresh = True) - - - def check_pool_application(self, pool_name): - osd_map = self.mon_manager.get_osd_dump_json() - for pool in osd_map['pools']: - if pool['pool_name'] == pool_name: - if "application_metadata" in pool: - if not "cephfs" in pool['application_metadata']: - raise RuntimeError("Pool %p does not name cephfs as application!".\ - format(pool_name)) - - - def __del__(self): - if getattr(self._ctx, "filesystem", None) == self: - delattr(self._ctx, "filesystem") - - def exists(self): - """ - Whether a filesystem exists in the mon's filesystem list - """ - fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty')) - return self.name in [fs['name'] for fs in fs_list] - - def legacy_configured(self): - """ - Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is - the case, the caller should avoid using Filesystem.create - """ - try: - out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools') - pools = json.loads(out_text) - metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools] - if metadata_pool_exists: - self.metadata_pool_name = 'metadata' - except CommandFailedError as e: - # For use in upgrade tests, Ceph cuttlefish and earlier don't support - # structured output (--format) from the CLI. - if e.exitstatus == 22: - metadata_pool_exists = True - else: - raise - - return metadata_pool_exists - - def _df(self): - return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")) - - def get_mds_map(self): - return self.status().get_fsmap(self.id)['mdsmap'] - - def add_data_pool(self, name): - self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__()) - self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name) - self.get_pool_names(refresh = True) - for poolid, fs_name in self.data_pools.items(): - if name == fs_name: - return poolid - raise RuntimeError("could not get just created pool '{0}'".format(name)) - - def get_pool_names(self, refresh = False, status = None): - if refresh or self.metadata_pool_name is None or self.data_pools is None: - if status is None: - status = self.status() - fsmap = status.get_fsmap(self.id) - - osd_map = self.mon_manager.get_osd_dump_json() - id_to_name = {} - for p in osd_map['pools']: - id_to_name[p['pool']] = p['pool_name'] - - self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']] - self.data_pools = {} - for data_pool in fsmap['mdsmap']['data_pools']: - self.data_pools[data_pool] = id_to_name[data_pool] - - def get_data_pool_name(self, refresh = False): - if refresh or self.data_pools is None: - self.get_pool_names(refresh = True) - assert(len(self.data_pools) == 1) - return self.data_pools.values()[0] - - def get_data_pool_id(self, refresh = False): - """ - Don't call this if you have multiple data pools - :return: integer - """ - if refresh or self.data_pools is None: - self.get_pool_names(refresh = True) - assert(len(self.data_pools) == 1) - return self.data_pools.keys()[0] - - def get_data_pool_names(self, refresh = False): - if refresh or self.data_pools is None: - self.get_pool_names(refresh = True) - return self.data_pools.values() - - def get_metadata_pool_name(self): - return self.metadata_pool_name - - def set_data_pool_name(self, name): - if self.id is not None: - raise RuntimeError("can't set filesystem name if its fscid is set") - self.data_pool_name = name - - def get_namespace_id(self): - return self.id - - def get_pool_df(self, pool_name): - """ - Return a dict like: - {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0} - """ - for pool_df in self._df()['pools']: - if pool_df['name'] == pool_name: - return pool_df['stats'] - - raise RuntimeError("Pool name '{0}' not found".format(pool_name)) - - def get_usage(self): - return self._df()['stats']['total_used_bytes'] - - def are_daemons_healthy(self): - """ - Return true if all daemons are in one of active, standby, standby-replay, and - at least max_mds daemons are in 'active'. - - Unlike most of Filesystem, this function is tolerant of new-style `fs` - commands being missing, because we are part of the ceph installation - process during upgrade suites, so must fall back to old style commands - when we get an EINVAL on a new style command. - - :return: - """ - - active_count = 0 - try: - mds_map = self.get_mds_map() - except CommandFailedError as cfe: - # Old version, fall back to non-multi-fs commands - if cfe.exitstatus == errno.EINVAL: - mds_map = json.loads( - self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json')) - else: - raise - - log.info("are_daemons_healthy: mds map: {0}".format(mds_map)) - - for mds_id, mds_status in mds_map['info'].items(): - if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]: - log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state'])) - return False - elif mds_status['state'] == 'up:active': - active_count += 1 - - log.info("are_daemons_healthy: {0}/{1}".format( - active_count, mds_map['max_mds'] - )) - - if active_count >= mds_map['max_mds']: - # The MDSMap says these guys are active, but let's check they really are - for mds_id, mds_status in mds_map['info'].items(): - if mds_status['state'] == 'up:active': - try: - daemon_status = self.mds_asok(["status"], mds_id=mds_status['name']) - except CommandFailedError as cfe: - if cfe.exitstatus == errno.EINVAL: - # Old version, can't do this check - continue - else: - # MDS not even running - return False - - if daemon_status['state'] != 'up:active': - # MDS hasn't taken the latest map yet - return False - - return True - else: - return False - - def get_daemon_names(self, state=None): - """ - Return MDS daemon names of those daemons in the given state - :param state: - :return: - """ - status = self.get_mds_map() - result = [] - for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])): - if mds_status['state'] == state or state is None: - result.append(mds_status['name']) - - return result - - def get_active_names(self): - """ - Return MDS daemon names of those daemons holding ranks - in state up:active - - :return: list of strings like ['a', 'b'], sorted by rank - """ - return self.get_daemon_names("up:active") - - def get_all_mds_rank(self): - status = self.get_mds_map() - result = [] - for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])): - if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay': - result.append(mds_status['rank']) - - return result - - def get_rank_names(self): - """ - Return MDS daemon names of those daemons holding a rank, - sorted by rank. This includes e.g. up:replay/reconnect - as well as active, but does not include standby or - standby-replay. - """ - status = self.get_mds_map() - result = [] - for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])): - if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay': - result.append(mds_status['name']) - - return result - - def wait_for_daemons(self, timeout=None): - """ - Wait until all daemons are healthy - :return: - """ - - if timeout is None: - timeout = DAEMON_WAIT_TIMEOUT - - elapsed = 0 - while True: - if self.are_daemons_healthy(): - return - else: - time.sleep(1) - elapsed += 1 - - if elapsed > timeout: - raise RuntimeError("Timed out waiting for MDS daemons to become healthy") - - def get_lone_mds_id(self): - """ - Get a single MDS ID: the only one if there is only one - configured, else the only one currently holding a rank, - else raise an error. - """ - if len(self.mds_ids) != 1: - alive = self.get_rank_names() - if len(alive) == 1: - return alive[0] - else: - raise ValueError("Explicit MDS argument required when multiple MDSs in use") - else: - return self.mds_ids[0] - - def recreate(self): - log.info("Creating new filesystem") - self.delete_all_filesystems() - self.id = None - self.create() - - def put_metadata_object_raw(self, object_id, infile): - """ - Save an object to the metadata pool - """ - temp_bin_path = infile - self.client_remote.run(args=[ - 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path - ]) - - def get_metadata_object_raw(self, object_id): - """ - Retrieve an object from the metadata pool and store it in a file. - """ - temp_bin_path = '/tmp/' + object_id + '.bin' - - self.client_remote.run(args=[ - 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path - ]) - - return temp_bin_path - - def get_metadata_object(self, object_type, object_id): - """ - Retrieve an object from the metadata pool, pass it through - ceph-dencoder to dump it to JSON, and return the decoded object. - """ - temp_bin_path = '/tmp/out.bin' - - self.client_remote.run(args=[ - 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path - ]) - - stdout = StringIO() - self.client_remote.run(args=[ - 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json' - ], stdout=stdout) - dump_json = stdout.getvalue().strip() - try: - dump = json.loads(dump_json) - except (TypeError, ValueError): - log.error("Failed to decode JSON: '{0}'".format(dump_json)) - raise - - return dump - - def get_journal_version(self): - """ - Read the JournalPointer and Journal::Header objects to learn the version of - encoding in use. - """ - journal_pointer_object = '400.00000000' - journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object) - journal_ino = journal_pointer_dump['journal_pointer']['front'] - - journal_header_object = "{0:x}.00000000".format(journal_ino) - journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object) - - version = journal_header_dump['journal_header']['stream_format'] - log.info("Read journal version {0}".format(version)) - - return version - - def mds_asok(self, command, mds_id=None): - if mds_id is None: - mds_id = self.get_lone_mds_id() - - return self.json_asok(command, 'mds', mds_id) - - def read_cache(self, path, depth=None): - cmd = ["dump", "tree", path] - if depth is not None: - cmd.append(depth.__str__()) - result = self.mds_asok(cmd) - if len(result) == 0: - raise RuntimeError("Path not found in cache: {0}".format(path)) - - return result - - def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None): - """ - Block until the MDS reaches a particular state, or a failure condition - is met. - - When there are multiple MDSs, succeed when exaclty one MDS is in the - goal state, or fail when any MDS is in the reject state. - - :param goal_state: Return once the MDS is in this state - :param reject: Fail if the MDS enters this state before the goal state - :param timeout: Fail if this many seconds pass before reaching goal - :return: number of seconds waited, rounded down to integer - """ - - started_at = time.time() - while True: - status = self.status() - if rank is not None: - mds_info = status.get_rank(self.id, rank) - current_state = mds_info['state'] if mds_info else None - log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state)) - elif mds_id is not None: - # mds_info is None if no daemon with this ID exists in the map - mds_info = status.get_mds(mds_id) - current_state = mds_info['state'] if mds_info else None - log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state)) - else: - # In general, look for a single MDS - states = [m['state'] for m in status.get_ranks(self.id)] - if [s for s in states if s == goal_state] == [goal_state]: - current_state = goal_state - elif reject in states: - current_state = reject - else: - current_state = None - log.info("mapped states {0} to {1}".format(states, current_state)) - - elapsed = time.time() - started_at - if current_state == goal_state: - log.info("reached state '{0}' in {1}s".format(current_state, elapsed)) - return elapsed - elif reject is not None and current_state == reject: - raise RuntimeError("MDS in reject state {0}".format(current_state)) - elif timeout is not None and elapsed > timeout: - log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id))) - raise RuntimeError( - "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format( - elapsed, goal_state, current_state - )) - else: - time.sleep(1) - - def _read_data_xattr(self, ino_no, xattr_name, type, pool): - mds_id = self.mds_ids[0] - remote = self.mds_daemons[mds_id].remote - if pool is None: - pool = self.get_data_pool_name() - - obj_name = "{0:x}.00000000".format(ino_no) - - args = [ - os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name - ] - try: - proc = remote.run( - args=args, - stdout=StringIO()) - except CommandFailedError as e: - log.error(e.__str__()) - raise ObjectNotFound(obj_name) - - data = proc.stdout.getvalue() - - p = remote.run( - args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"], - stdout=StringIO(), - stdin=data - ) - - return json.loads(p.stdout.getvalue().strip()) - - def _write_data_xattr(self, ino_no, xattr_name, data, pool=None): - """ - Write to an xattr of the 0th data object of an inode. Will - succeed whether the object and/or xattr already exist or not. - - :param ino_no: integer inode number - :param xattr_name: string name of the xattr - :param data: byte array data to write to the xattr - :param pool: name of data pool or None to use primary data pool - :return: None - """ - remote = self.mds_daemons[self.mds_ids[0]].remote - if pool is None: - pool = self.get_data_pool_name() - - obj_name = "{0:x}.00000000".format(ino_no) - args = [ - os.path.join(self._prefix, "rados"), "-p", pool, "setxattr", - obj_name, xattr_name, data - ] - remote.run( - args=args, - stdout=StringIO()) - - def read_backtrace(self, ino_no, pool=None): - """ - Read the backtrace from the data pool, return a dict in the format - given by inode_backtrace_t::dump, which is something like: - - :: - - rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin - ceph-dencoder type inode_backtrace_t import out.bin decode dump_json - - { "ino": 1099511627778, - "ancestors": [ - { "dirino": 1, - "dname": "blah", - "version": 11}], - "pool": 1, - "old_pools": []} - - :param pool: name of pool to read backtrace from. If omitted, FS must have only - one data pool and that will be used. - """ - return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool) - - def read_layout(self, ino_no, pool=None): - """ - Read 'layout' xattr of an inode and parse the result, returning a dict like: - :: - { - "stripe_unit": 4194304, - "stripe_count": 1, - "object_size": 4194304, - "pool_id": 1, - "pool_ns": "", - } - - :param pool: name of pool to read backtrace from. If omitted, FS must have only - one data pool and that will be used. - """ - return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool) - - def _enumerate_data_objects(self, ino, size): - """ - Get the list of expected data objects for a range, and the list of objects - that really exist. - - :return a tuple of two lists of strings (expected, actual) - """ - stripe_size = 1024 * 1024 * 4 - - size = max(stripe_size, size) - - want_objects = [ - "{0:x}.{1:08x}".format(ino, n) - for n in range(0, ((size - 1) / stripe_size) + 1) - ] - - exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n") - - return want_objects, exist_objects - - def data_objects_present(self, ino, size): - """ - Check that *all* the expected data objects for an inode are present in the data pool - """ - - want_objects, exist_objects = self._enumerate_data_objects(ino, size) - missing = set(want_objects) - set(exist_objects) - - if missing: - log.info("Objects missing (ino {0}, size {1}): {2}".format( - ino, size, missing - )) - return False - else: - log.info("All objects for ino {0} size {1} found".format(ino, size)) - return True - - def data_objects_absent(self, ino, size): - want_objects, exist_objects = self._enumerate_data_objects(ino, size) - present = set(want_objects) & set(exist_objects) - - if present: - log.info("Objects not absent (ino {0}, size {1}): {2}".format( - ino, size, present - )) - return False - else: - log.info("All objects for ino {0} size {1} are absent".format(ino, size)) - return True - - def dirfrag_exists(self, ino, frag): - try: - self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)]) - except CommandFailedError as e: - return False - else: - return True - - def rados(self, args, pool=None, namespace=None, stdin_data=None): - """ - Call into the `rados` CLI from an MDS - """ - - if pool is None: - pool = self.get_metadata_pool_name() - - # Doesn't matter which MDS we use to run rados commands, they all - # have access to the pools - mds_id = self.mds_ids[0] - remote = self.mds_daemons[mds_id].remote - - # NB we could alternatively use librados pybindings for this, but it's a one-liner - # using the `rados` CLI - args = ([os.path.join(self._prefix, "rados"), "-p", pool] + - (["--namespace", namespace] if namespace else []) + - args) - p = remote.run( - args=args, - stdin=stdin_data, - stdout=StringIO()) - return p.stdout.getvalue().strip() - - def list_dirfrag(self, dir_ino): - """ - Read the named object and return the list of omap keys - - :return a list of 0 or more strings - """ - - dirfrag_obj_name = "{0:x}.00000000".format(dir_ino) - - try: - key_list_str = self.rados(["listomapkeys", dirfrag_obj_name]) - except CommandFailedError as e: - log.error(e.__str__()) - raise ObjectNotFound(dirfrag_obj_name) - - return key_list_str.split("\n") if key_list_str else [] - - def erase_metadata_objects(self, prefix): - """ - For all objects in the metadata pool matching the prefix, - erase them. - - This O(N) with the number of objects in the pool, so only suitable - for use on toy test filesystems. - """ - all_objects = self.rados(["ls"]).split("\n") - matching_objects = [o for o in all_objects if o.startswith(prefix)] - for o in matching_objects: - self.rados(["rm", o]) - - def erase_mds_objects(self, rank): - """ - Erase all the per-MDS objects for a particular rank. This includes - inotable, sessiontable, journal - """ - - def obj_prefix(multiplier): - """ - MDS object naming conventions like rank 1's - journal is at 201.*** - """ - return "%x." % (multiplier * 0x100 + rank) - - # MDS_INO_LOG_OFFSET - self.erase_metadata_objects(obj_prefix(2)) - # MDS_INO_LOG_BACKUP_OFFSET - self.erase_metadata_objects(obj_prefix(3)) - # MDS_INO_LOG_POINTER_OFFSET - self.erase_metadata_objects(obj_prefix(4)) - # MDSTables & SessionMap - self.erase_metadata_objects("mds{rank:d}_".format(rank=rank)) - - @property - def _prefix(self): - """ - Override this to set a different - """ - return "" - - def _run_tool(self, tool, args, rank=None, quiet=False): - # Tests frequently have [client] configuration that jacks up - # the objecter log level (unlikely to be interesting here) - # and does not set the mds log level (very interesting here) - if quiet: - base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1'] - else: - base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1'] - - if rank is not None: - base_args.extend(["--rank", "%d" % rank]) - - t1 = datetime.datetime.now() - r = self.tool_remote.run( - args=base_args + args, - stdout=StringIO()).stdout.getvalue().strip() - duration = datetime.datetime.now() - t1 - log.info("Ran {0} in time {1}, result:\n{2}".format( - base_args + args, duration, r - )) - return r - - @property - def tool_remote(self): - """ - An arbitrary remote to use when invoking recovery tools. Use an MDS host because - it'll definitely have keys with perms to access cephfs metadata pool. This is public - so that tests can use this remote to go get locally written output files from the tools. - """ - mds_id = self.mds_ids[0] - return self.mds_daemons[mds_id].remote - - def journal_tool(self, args, rank=None, quiet=False): - """ - Invoke cephfs-journal-tool with the passed arguments, and return its stdout - """ - return self._run_tool("cephfs-journal-tool", args, rank, quiet) - - def table_tool(self, args, quiet=False): - """ - Invoke cephfs-table-tool with the passed arguments, and return its stdout - """ - return self._run_tool("cephfs-table-tool", args, None, quiet) - - def data_scan(self, args, quiet=False, worker_count=1): - """ - Invoke cephfs-data-scan with the passed arguments, and return its stdout - - :param worker_count: if greater than 1, multiple workers will be run - in parallel and the return value will be None - """ - - workers = [] - - for n in range(0, worker_count): - if worker_count > 1: - # data-scan args first token is a command, followed by args to it. - # insert worker arguments after the command. - cmd = args[0] - worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:] - else: - worker_args = args - - workers.append(Greenlet.spawn(lambda wargs=worker_args: - self._run_tool("cephfs-data-scan", wargs, None, quiet))) - - for w in workers: - w.get() - - if worker_count == 1: - return workers[0].value - else: - return None -- cgit 1.2.3-korg