summaryrefslogtreecommitdiffstats
path: root/src/ceph/qa/tasks/recovery_bench.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph/qa/tasks/recovery_bench.py')
-rw-r--r--src/ceph/qa/tasks/recovery_bench.py208
1 files changed, 0 insertions, 208 deletions
diff --git a/src/ceph/qa/tasks/recovery_bench.py b/src/ceph/qa/tasks/recovery_bench.py
deleted file mode 100644
index 5eb9fd2..0000000
--- a/src/ceph/qa/tasks/recovery_bench.py
+++ /dev/null
@@ -1,208 +0,0 @@
-"""
-Recovery system benchmarking
-"""
-from cStringIO import StringIO
-
-import contextlib
-import gevent
-import json
-import logging
-import random
-import time
-
-import ceph_manager
-from teuthology import misc as teuthology
-
-log = logging.getLogger(__name__)
-
-@contextlib.contextmanager
-def task(ctx, config):
- """
- Benchmark the recovery system.
-
- Generates objects with smalliobench, runs it normally to get a
- baseline performance measurement, then marks an OSD out and reruns
- to measure performance during recovery.
-
- The config should be as follows:
-
- recovery_bench:
- duration: <seconds for each measurement run>
- num_objects: <number of objects>
- io_size: <io size in bytes>
-
- example:
-
- tasks:
- - ceph:
- - recovery_bench:
- duration: 60
- num_objects: 500
- io_size: 4096
- """
- if config is None:
- config = {}
- assert isinstance(config, dict), \
- 'recovery_bench task only accepts a dict for configuration'
-
- log.info('Beginning recovery bench...')
-
- first_mon = teuthology.get_first_mon(ctx, config)
- (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
-
- manager = ceph_manager.CephManager(
- mon,
- ctx=ctx,
- logger=log.getChild('ceph_manager'),
- )
-
- num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
- while len(manager.get_osd_status()['up']) < num_osds:
- time.sleep(10)
-
- bench_proc = RecoveryBencher(
- manager,
- config,
- )
- try:
- yield
- finally:
- log.info('joining recovery bencher')
- bench_proc.do_join()
-
-class RecoveryBencher:
- """
- RecoveryBencher
- """
- def __init__(self, manager, config):
- self.ceph_manager = manager
- self.ceph_manager.wait_for_clean()
-
- osd_status = self.ceph_manager.get_osd_status()
- self.osds = osd_status['up']
-
- self.config = config
- if self.config is None:
- self.config = dict()
-
- else:
- def tmp(x):
- """
- Local wrapper to print value.
- """
- print x
- self.log = tmp
-
- log.info("spawning thread")
-
- self.thread = gevent.spawn(self.do_bench)
-
- def do_join(self):
- """
- Join the recovery bencher. This is called after the main
- task exits.
- """
- self.thread.get()
-
- def do_bench(self):
- """
- Do the benchmarking.
- """
- duration = self.config.get("duration", 60)
- num_objects = self.config.get("num_objects", 500)
- io_size = self.config.get("io_size", 4096)
-
- osd = str(random.choice(self.osds))
- (osd_remote,) = self.ceph_manager.ctx.cluster.only('osd.%s' % osd).remotes.iterkeys()
-
- testdir = teuthology.get_testdir(self.ceph_manager.ctx)
-
- # create the objects
- osd_remote.run(
- args=[
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'smalliobench'.format(tdir=testdir),
- '--use-prefix', 'recovery_bench',
- '--init-only', '1',
- '--num-objects', str(num_objects),
- '--io-size', str(io_size),
- ],
- wait=True,
- )
-
- # baseline bench
- log.info('non-recovery (baseline)')
- p = osd_remote.run(
- args=[
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'smalliobench',
- '--use-prefix', 'recovery_bench',
- '--do-not-init', '1',
- '--duration', str(duration),
- '--io-size', str(io_size),
- ],
- stdout=StringIO(),
- stderr=StringIO(),
- wait=True,
- )
- self.process_samples(p.stderr.getvalue())
-
- self.ceph_manager.raw_cluster_cmd('osd', 'out', osd)
- time.sleep(5)
-
- # recovery bench
- log.info('recovery active')
- p = osd_remote.run(
- args=[
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'smalliobench',
- '--use-prefix', 'recovery_bench',
- '--do-not-init', '1',
- '--duration', str(duration),
- '--io-size', str(io_size),
- ],
- stdout=StringIO(),
- stderr=StringIO(),
- wait=True,
- )
- self.process_samples(p.stderr.getvalue())
-
- self.ceph_manager.raw_cluster_cmd('osd', 'in', osd)
-
- def process_samples(self, input):
- """
- Extract samples from the input and process the results
-
- :param input: input lines in JSON format
- """
- lat = {}
- for line in input.split('\n'):
- try:
- sample = json.loads(line)
- samples = lat.setdefault(sample['type'], [])
- samples.append(float(sample['latency']))
- except Exception:
- pass
-
- for type in lat:
- samples = lat[type]
- samples.sort()
-
- num = len(samples)
-
- # median
- if num & 1 == 1: # odd number of samples
- median = samples[num / 2]
- else:
- median = (samples[num / 2] + samples[num / 2 - 1]) / 2
-
- # 99%
- ninety_nine = samples[int(num * 0.99)]
-
- log.info("%s: median %f, 99%% %f" % (type, median, ninety_nine))