summaryrefslogtreecommitdiffstats
path: root/src/ceph/qa/tasks/mds_thrash.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph/qa/tasks/mds_thrash.py')
-rw-r--r--src/ceph/qa/tasks/mds_thrash.py555
1 files changed, 0 insertions, 555 deletions
diff --git a/src/ceph/qa/tasks/mds_thrash.py b/src/ceph/qa/tasks/mds_thrash.py
deleted file mode 100644
index 75d236d..0000000
--- a/src/ceph/qa/tasks/mds_thrash.py
+++ /dev/null
@@ -1,555 +0,0 @@
-"""
-Thrash mds by simulating failures
-"""
-import logging
-import contextlib
-import ceph_manager
-import itertools
-import random
-import signal
-import time
-
-from gevent import sleep
-from gevent.greenlet import Greenlet
-from gevent.event import Event
-from teuthology import misc as teuthology
-
-from tasks.cephfs.filesystem import MDSCluster, Filesystem
-
-log = logging.getLogger(__name__)
-
-class DaemonWatchdog(Greenlet):
- """
- DaemonWatchdog::
-
- Watch Ceph daemons for failures. If an extended failure is detected (i.e.
- not intentional), then the watchdog will unmount file systems and send
- SIGTERM to all daemons. The duration of an extended failure is configurable
- with watchdog_daemon_timeout.
-
- watchdog_daemon_timeout [default: 300]: number of seconds a daemon
- is allowed to be failed before the watchdog will bark.
- """
-
- def __init__(self, ctx, manager, config, thrashers):
- Greenlet.__init__(self)
- self.ctx = ctx
- self.config = config
- self.e = None
- self.logger = log.getChild('daemon_watchdog')
- self.manager = manager
- self.name = 'watchdog'
- self.stopping = Event()
- self.thrashers = thrashers
-
- def _run(self):
- try:
- self.watch()
- except Exception as e:
- # See _run exception comment for MDSThrasher
- self.e = e
- self.logger.exception("exception:")
- # allow successful completion so gevent doesn't see an exception...
-
- def log(self, x):
- """Write data to logger"""
- self.logger.info(x)
-
- def stop(self):
- self.stopping.set()
-
- def bark(self):
- self.log("BARK! unmounting mounts and killing all daemons")
- for mount in self.ctx.mounts.values():
- try:
- mount.umount_wait(force=True)
- except:
- self.logger.exception("ignoring exception:")
- daemons = []
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
- for daemon in daemons:
- try:
- daemon.signal(signal.SIGTERM)
- except:
- self.logger.exception("ignoring exception:")
-
- def watch(self):
- self.log("watchdog starting")
- daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
- daemon_failure_time = {}
- while not self.stopping.is_set():
- bark = False
- now = time.time()
-
- mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
- mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
- clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
-
- #for daemon in mons:
- # self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
- #for daemon in mdss:
- # self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
-
- daemon_failures = []
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
- for daemon in daemon_failures:
- name = daemon.role + '.' + daemon.id_
- dt = daemon_failure_time.setdefault(name, (daemon, now))
- assert dt[0] is daemon
- delta = now-dt[1]
- self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
- if delta > daemon_timeout:
- bark = True
-
- # If a daemon is no longer failed, remove it from tracking:
- for name in daemon_failure_time.keys():
- if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
- self.log("daemon {name} has been restored".format(name=name))
- del daemon_failure_time[name]
-
- for thrasher in self.thrashers:
- if thrasher.e is not None:
- self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
- bark = True
-
- if bark:
- self.bark()
- return
-
- sleep(5)
-
- self.log("watchdog finished")
-
-class MDSThrasher(Greenlet):
- """
- MDSThrasher::
-
- The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).
-
- The config is optional. Many of the config parameters are a a maximum value
- to use when selecting a random value from a range. To always use the maximum
- value, set no_random to true. The config is a dict containing some or all of:
-
- max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
- any given time.
-
- max_thrash_delay: [default: 30] maximum number of seconds to delay before
- thrashing again.
-
- max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
- the replay state before thrashing.
-
- max_revive_delay: [default: 10] maximum number of seconds to delay before
- bringing back a thrashed MDS.
-
- randomize: [default: true] enables randomization and use the max/min values
-
- seed: [no default] seed the random number generator
-
- thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
- during replay. Value should be between 0.0 and 1.0.
-
- thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
- cluster will be modified to a value [1, current) or (current, starting
- max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
- deactivated to reach the new max_mds. Value should be between 0.0 and 1.0.
-
- thrash_while_stopping: [default: false] thrash an MDS while there
- are MDS in up:stopping (because max_mds was changed and some
- MDS were deactivated).
-
- thrash_weights: allows specific MDSs to be thrashed more/less frequently.
- This option overrides anything specified by max_thrash. This option is a
- dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
- 0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
- specified will be automatically given a weight of 0.0 (not thrashed).
- For a given MDS, by default the trasher delays for up to
- max_thrash_delay, trashes, waits for the MDS to recover, and iterates.
- If a non-zero weight is specified for an MDS, for each iteration the
- thrasher chooses whether to thrash during that iteration based on a
- random value [0-1] not exceeding the weight of that MDS.
-
- Examples::
-
-
- The following example sets the likelihood that mds.a will be thrashed
- to 80%, mds.b to 20%, and other MDSs will not be thrashed. It also sets the
- likelihood that an MDS will be thrashed in replay to 40%.
- Thrash weights do not have to sum to 1.
-
- tasks:
- - ceph:
- - mds_thrash:
- thrash_weights:
- - mds.a: 0.8
- - mds.b: 0.2
- thrash_in_replay: 0.4
- - ceph-fuse:
- - workunit:
- clients:
- all: [suites/fsx.sh]
-
- The following example disables randomization, and uses the max delay values:
-
- tasks:
- - ceph:
- - mds_thrash:
- max_thrash_delay: 10
- max_revive_delay: 1
- max_replay_thrash_delay: 4
-
- """
-
- def __init__(self, ctx, manager, config, fs, max_mds):
- Greenlet.__init__(self)
-
- self.config = config
- self.ctx = ctx
- self.e = None
- self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
- self.fs = fs
- self.manager = manager
- self.max_mds = max_mds
- self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
- self.stopping = Event()
-
- self.randomize = bool(self.config.get('randomize', True))
- self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
- self.max_thrash = int(self.config.get('max_thrash', 1))
- self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
- self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
- assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
- v=self.thrash_in_replay)
- self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
- self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
-
- def _run(self):
- try:
- self.do_thrash()
- except Exception as e:
- # Log exceptions here so we get the full backtrace (gevent loses them).
- # Also allow succesful completion as gevent exception handling is a broken mess:
- #
- # 2017-02-03T14:34:01.259 CRITICAL:root: File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
- # File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
- # self.print_exception(context, type, value, tb)
- # File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
- # traceback.print_exception(type, value, tb, file=errstream)
- # File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
- # _print(file, 'Traceback (most recent call last):')
- # File "/usr/lib/python2.7/traceback.py", line 13, in _print
- # file.write(str+terminator)
- # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
- self.e = e
- self.logger.exception("exception:")
- # allow successful completion so gevent doesn't see an exception...
-
- def log(self, x):
- """Write data to logger assigned to this MDThrasher"""
- self.logger.info(x)
-
- def stop(self):
- self.stopping.set()
-
- def kill_mds(self, mds):
- if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
- remotes.iterkeys())
- self.log('kill_mds on mds.{m} doing powercycle of {s}'.
- format(m=mds, s=remote.name))
- self._assert_ipmi(remote)
- remote.console.power_off()
- else:
- self.ctx.daemons.get_daemon('mds', mds).stop()
-
- @staticmethod
- def _assert_ipmi(remote):
- assert remote.console.has_ipmi_credentials, (
- "powercycling requested but RemoteConsole is not "
- "initialized. Check ipmi config.")
-
- def revive_mds(self, mds, standby_for_rank=None):
- """
- Revive mds -- do an ipmpi powercycle (if indicated by the config)
- and then restart (using --hot-standby if specified.
- """
- if self.config.get('powercycle'):
- (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
- remotes.iterkeys())
- self.log('revive_mds on mds.{m} doing powercycle of {s}'.
- format(m=mds, s=remote.name))
- self._assert_ipmi(remote)
- remote.console.power_on()
- self.manager.make_admin_daemon_dir(self.ctx, remote)
- args = []
- if standby_for_rank:
- args.extend(['--hot-standby', standby_for_rank])
- self.ctx.daemons.get_daemon('mds', mds).restart(*args)
-
- def wait_for_stable(self, rank = None, gid = None):
- self.log('waiting for mds cluster to stabilize...')
- for itercount in itertools.count():
- status = self.fs.status()
- max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
- ranks = list(status.get_ranks(self.fs.id))
- stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
- actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)
-
- if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
- if itercount % 5 == 0:
- self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
- else:
- if rank is not None:
- try:
- info = status.get_rank(self.fs.id, rank)
- if info['gid'] != gid and "up:active" == info['state']:
- self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
- return status
- except:
- pass # no rank present
- if len(actives) >= max_mds:
- # no replacement can occur!
- self.log("cluster has %d actives (max_mds is %d), no MDS can replace rank %d".format(len(actives), max_mds, rank))
- return status
- else:
- if len(actives) >= max_mds:
- self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
- return status, None
- if itercount > 300/2: # 5 minutes
- raise RuntimeError('timeout waiting for cluster to stabilize')
- elif itercount % 5 == 0:
- self.log('mds map: {status}'.format(status=status))
- else:
- self.log('no change')
- sleep(2)
-
- def do_thrash(self):
- """
- Perform the random thrashing action
- """
-
- self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
- stats = {
- "max_mds": 0,
- "deactivate": 0,
- "kill": 0,
- }
-
- while not self.stopping.is_set():
- delay = self.max_thrash_delay
- if self.randomize:
- delay = random.randrange(0.0, self.max_thrash_delay)
-
- if delay > 0.0:
- self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
- self.stopping.wait(delay)
- if self.stopping.is_set():
- continue
-
- status = self.fs.status()
-
- if random.random() <= self.thrash_max_mds:
- max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
- options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
- if len(options) > 0:
- sample = random.sample(options, 1)
- new_max_mds = sample[0]
- self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
- self.fs.set_max_mds(new_max_mds)
- stats['max_mds'] += 1
-
- targets = filter(lambda r: r['rank'] >= new_max_mds, status.get_ranks(self.fs.id))
- if len(targets) > 0:
- # deactivate mds in decending order
- targets = sorted(targets, key=lambda r: r['rank'], reverse=True)
- for target in targets:
- self.log("deactivating rank %d" % target['rank'])
- self.fs.deactivate(target['rank'])
- stats['deactivate'] += 1
- status = self.wait_for_stable()[0]
- else:
- status = self.wait_for_stable()[0]
-
- count = 0
- for info in status.get_ranks(self.fs.id):
- name = info['name']
- label = 'mds.' + name
- rank = info['rank']
- gid = info['gid']
-
- # if thrash_weights isn't specified and we've reached max_thrash,
- # we're done
- count = count + 1
- if 'thrash_weights' not in self.config and count > self.max_thrash:
- break
-
- weight = 1.0
- if 'thrash_weights' in self.config:
- weight = self.config['thrash_weights'].get(label, '0.0')
- skip = random.randrange(0.0, 1.0)
- if weight <= skip:
- self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
- continue
-
- self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
- self.kill_mds(name)
- stats['kill'] += 1
-
- # wait for mon to report killed mds as crashed
- last_laggy_since = None
- itercount = 0
- while True:
- status = self.fs.status()
- info = status.get_mds(name)
- if not info:
- break
- if 'laggy_since' in info:
- last_laggy_since = info['laggy_since']
- break
- if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
- break
- self.log(
- 'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
- label=label))
- itercount = itercount + 1
- if itercount > 10:
- self.log('mds map: {status}'.format(status=status))
- sleep(2)
-
- if last_laggy_since:
- self.log(
- '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
- else:
- self.log('{label} down, removed from mdsmap'.format(label=label, since=last_laggy_since))
-
- # wait for a standby mds to takeover and become active
- status = self.wait_for_stable(rank, gid)
-
- # wait for a while before restarting old active to become new
- # standby
- delay = self.max_revive_delay
- if self.randomize:
- delay = random.randrange(0.0, self.max_revive_delay)
-
- self.log('waiting for {delay} secs before reviving {label}'.format(
- delay=delay, label=label))
- sleep(delay)
-
- self.log('reviving {label}'.format(label=label))
- self.revive_mds(name)
-
- for itercount in itertools.count():
- if itercount > 300/2: # 5 minutes
- raise RuntimeError('timeout waiting for MDS to revive')
- status = self.fs.status()
- info = status.get_mds(name)
- if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
- self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
- break
- self.log(
- 'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
- sleep(2)
-
- for stat in stats:
- self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))
-
- # don't do replay thrashing right now
-# for info in status.get_replays(self.fs.id):
-# # this might race with replay -> active transition...
-# if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
-# delay = self.max_replay_thrash_delay
-# if self.randomize:
-# delay = random.randrange(0.0, self.max_replay_thrash_delay)
-# sleep(delay)
-# self.log('kill replaying mds.{id}'.format(id=self.to_kill))
-# self.kill_mds(self.to_kill)
-#
-# delay = self.max_revive_delay
-# if self.randomize:
-# delay = random.randrange(0.0, self.max_revive_delay)
-#
-# self.log('waiting for {delay} secs before reviving mds.{id}'.format(
-# delay=delay, id=self.to_kill))
-# sleep(delay)
-#
-# self.log('revive mds.{id}'.format(id=self.to_kill))
-# self.revive_mds(self.to_kill)
-
-
-@contextlib.contextmanager
-def task(ctx, config):
- """
- Stress test the mds by thrashing while another task/workunit
- is running.
-
- Please refer to MDSThrasher class for further information on the
- available options.
- """
-
- mds_cluster = MDSCluster(ctx)
-
- if config is None:
- config = {}
- assert isinstance(config, dict), \
- 'mds_thrash task only accepts a dict for configuration'
- mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
- assert len(mdslist) > 1, \
- 'mds_thrash task requires at least 2 metadata servers'
-
- # choose random seed
- if 'seed' in config:
- seed = int(config['seed'])
- else:
- seed = int(time.time())
- log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
- random.seed(seed)
-
- (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
- manager = ceph_manager.CephManager(
- first, ctx=ctx, logger=log.getChild('ceph_manager'),
- )
-
- # make sure everyone is in active, standby, or standby-replay
- log.info('Wait for all MDSs to reach steady state...')
- status = mds_cluster.status()
- while True:
- steady = True
- for info in status.get_all():
- state = info['state']
- if state not in ('up:active', 'up:standby', 'up:standby-replay'):
- steady = False
- break
- if steady:
- break
- sleep(2)
- status = mds_cluster.status()
- log.info('Ready to start thrashing')
-
- thrashers = []
-
- watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
- watchdog.start()
-
- manager.wait_for_clean()
- assert manager.is_clean()
- for fs in status.get_filesystems():
- thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
- thrasher.start()
- thrashers.append(thrasher)
-
- try:
- log.debug('Yielding')
- yield
- finally:
- log.info('joining mds_thrashers')
- for thrasher in thrashers:
- thrasher.stop()
- if thrasher.e:
- raise RuntimeError('error during thrashing')
- thrasher.join()
- log.info('done joining')
-
- watchdog.stop()
- watchdog.join()