summaryrefslogtreecommitdiffstats
path: root/src/ceph/qa/tasks/recovery_bench.py
diff options
context:
space:
mode:
authorQiaowei Ren <qiaowei.ren@intel.com>2018-01-04 13:43:33 +0800
committerQiaowei Ren <qiaowei.ren@intel.com>2018-01-05 11:59:39 +0800
commit812ff6ca9fcd3e629e49d4328905f33eee8ca3f5 (patch)
tree04ece7b4da00d9d2f98093774594f4057ae561d4 /src/ceph/qa/tasks/recovery_bench.py
parent15280273faafb77777eab341909a3f495cf248d9 (diff)
initial code repo
This patch creates initial code repo. For ceph, luminous stable release will be used for base code, and next changes and optimization for ceph will be added to it. For opensds, currently any changes can be upstreamed into original opensds repo (https://github.com/opensds/opensds), and so stor4nfv will directly clone opensds code to deploy stor4nfv environment. And the scripts for deployment based on ceph and opensds will be put into 'ci' directory. Change-Id: I46a32218884c75dda2936337604ff03c554648e4 Signed-off-by: Qiaowei Ren <qiaowei.ren@intel.com>
Diffstat (limited to 'src/ceph/qa/tasks/recovery_bench.py')
-rw-r--r--src/ceph/qa/tasks/recovery_bench.py208
1 files changed, 208 insertions, 0 deletions
diff --git a/src/ceph/qa/tasks/recovery_bench.py b/src/ceph/qa/tasks/recovery_bench.py
new file mode 100644
index 0000000..5eb9fd2
--- /dev/null
+++ b/src/ceph/qa/tasks/recovery_bench.py
@@ -0,0 +1,208 @@
+"""
+Recovery system benchmarking
+"""
+from cStringIO import StringIO
+
+import contextlib
+import gevent
+import json
+import logging
+import random
+import time
+
+import ceph_manager
+from teuthology import misc as teuthology
+
+log = logging.getLogger(__name__)
+
+@contextlib.contextmanager
+def task(ctx, config):
+ """
+ Benchmark the recovery system.
+
+ Generates objects with smalliobench, runs it normally to get a
+ baseline performance measurement, then marks an OSD out and reruns
+ to measure performance during recovery.
+
+ The config should be as follows:
+
+ recovery_bench:
+ duration: <seconds for each measurement run>
+ num_objects: <number of objects>
+ io_size: <io size in bytes>
+
+ example:
+
+ tasks:
+ - ceph:
+ - recovery_bench:
+ duration: 60
+ num_objects: 500
+ io_size: 4096
+ """
+ if config is None:
+ config = {}
+ assert isinstance(config, dict), \
+ 'recovery_bench task only accepts a dict for configuration'
+
+ log.info('Beginning recovery bench...')
+
+ first_mon = teuthology.get_first_mon(ctx, config)
+ (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()
+
+ manager = ceph_manager.CephManager(
+ mon,
+ ctx=ctx,
+ logger=log.getChild('ceph_manager'),
+ )
+
+ num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
+ while len(manager.get_osd_status()['up']) < num_osds:
+ time.sleep(10)
+
+ bench_proc = RecoveryBencher(
+ manager,
+ config,
+ )
+ try:
+ yield
+ finally:
+ log.info('joining recovery bencher')
+ bench_proc.do_join()
+
+class RecoveryBencher:
+ """
+ RecoveryBencher
+ """
+ def __init__(self, manager, config):
+ self.ceph_manager = manager
+ self.ceph_manager.wait_for_clean()
+
+ osd_status = self.ceph_manager.get_osd_status()
+ self.osds = osd_status['up']
+
+ self.config = config
+ if self.config is None:
+ self.config = dict()
+
+ else:
+ def tmp(x):
+ """
+ Local wrapper to print value.
+ """
+ print x
+ self.log = tmp
+
+ log.info("spawning thread")
+
+ self.thread = gevent.spawn(self.do_bench)
+
+ def do_join(self):
+ """
+ Join the recovery bencher. This is called after the main
+ task exits.
+ """
+ self.thread.get()
+
+ def do_bench(self):
+ """
+ Do the benchmarking.
+ """
+ duration = self.config.get("duration", 60)
+ num_objects = self.config.get("num_objects", 500)
+ io_size = self.config.get("io_size", 4096)
+
+ osd = str(random.choice(self.osds))
+ (osd_remote,) = self.ceph_manager.ctx.cluster.only('osd.%s' % osd).remotes.iterkeys()
+
+ testdir = teuthology.get_testdir(self.ceph_manager.ctx)
+
+ # create the objects
+ osd_remote.run(
+ args=[
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'smalliobench'.format(tdir=testdir),
+ '--use-prefix', 'recovery_bench',
+ '--init-only', '1',
+ '--num-objects', str(num_objects),
+ '--io-size', str(io_size),
+ ],
+ wait=True,
+ )
+
+ # baseline bench
+ log.info('non-recovery (baseline)')
+ p = osd_remote.run(
+ args=[
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'smalliobench',
+ '--use-prefix', 'recovery_bench',
+ '--do-not-init', '1',
+ '--duration', str(duration),
+ '--io-size', str(io_size),
+ ],
+ stdout=StringIO(),
+ stderr=StringIO(),
+ wait=True,
+ )
+ self.process_samples(p.stderr.getvalue())
+
+ self.ceph_manager.raw_cluster_cmd('osd', 'out', osd)
+ time.sleep(5)
+
+ # recovery bench
+ log.info('recovery active')
+ p = osd_remote.run(
+ args=[
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'smalliobench',
+ '--use-prefix', 'recovery_bench',
+ '--do-not-init', '1',
+ '--duration', str(duration),
+ '--io-size', str(io_size),
+ ],
+ stdout=StringIO(),
+ stderr=StringIO(),
+ wait=True,
+ )
+ self.process_samples(p.stderr.getvalue())
+
+ self.ceph_manager.raw_cluster_cmd('osd', 'in', osd)
+
+ def process_samples(self, input):
+ """
+ Extract samples from the input and process the results
+
+ :param input: input lines in JSON format
+ """
+ lat = {}
+ for line in input.split('\n'):
+ try:
+ sample = json.loads(line)
+ samples = lat.setdefault(sample['type'], [])
+ samples.append(float(sample['latency']))
+ except Exception:
+ pass
+
+ for type in lat:
+ samples = lat[type]
+ samples.sort()
+
+ num = len(samples)
+
+ # median
+ if num & 1 == 1: # odd number of samples
+ median = samples[num / 2]
+ else:
+ median = (samples[num / 2] + samples[num / 2 - 1]) / 2
+
+ # 99%
+ ninety_nine = samples[int(num * 0.99)]
+
+ log.info("%s: median %f, 99%% %f" % (type, median, ninety_nine))