##############################################################################
# Copyright (c) 2016 Huawei Technologies Co.,Ltd.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
from __future__ import absolute_import

import logging
import time

import requests
from oslo_serialization import jsonutils

from yardstick.benchmark.scenarios import base

LOG = logging.getLogger(__name__)


class StorPerf(base.Scenario):
    """Execute StorPerf benchmark.
    Once the StorPerf container has been started and the ReST API exposed,
    you can interact directly with it using the ReST API. StorPerf comes with a
    Swagger interface that is accessible through the exposed port at:
    http://StorPerf:5000/swagger/index.html

  Command line options:
    target = [device or path] (Optional):
    The path to either an attached storage device (/dev/vdb, etc) or a
    directory path (/opt/storperf) that will be used to execute the performance
    test. In the case of a device, the entire device will be used.
    If not specified, the current directory will be used.

    workload = [workload module] (Optional):
    If not specified, the default is to run all workloads.
    The workload types are:
        rs: 100% Read, sequential data
        ws: 100% Write, sequential data
        rr: 100% Read, random access
        wr: 100% Write, random access
        rw: 70% Read / 30% write, random access

    nossd (Optional):
    Do not perform SSD style preconditioning.

    nowarm (Optional):
    Do not perform a warmup prior to measurements.

    report = [job_id] (Optional):
    Query the status of the supplied job_id and report on metrics.
    If a workload is supplied, will report on only that subset.
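
    Sample scenario options (illustrative only; the keys are those consumed
    by this scenario, while the values are hypothetical placeholders rather
    than StorPerf defaults):
        StorPerf_ip: "192.168.23.2"
        agent_count: 8
        public_network: "ext-net"
        volume_size: 10
        block_sizes: "4096"
        queue_depths: "4"
        workload: "ws"
        timeout: 7200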

    """
    __scenario_type__ = "StorPerf"

    def __init__(self, scenario_cfg, context_cfg):
        """Scenario construction."""
        super(StorPerf, self).__init__()
        self.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        self.options = self.scenario_cfg["options"]

        self.target = self.options.get("StorPerf_ip", None)
        self.query_interval = self.options.get("query_interval", 10)
        # Maximum allowed job time
        self.timeout = self.options.get('timeout', 3600)

        self.setup_done = False
        self.job_completed = False

    def _query_setup_state(self):
        """Query the stack status."""
        LOG.info("Querying the stack state...")
        setup_query = requests.get('http://%s:5000/api/v1.0/configurations'
                                   % self.target)

        setup_query_content = jsonutils.loads(
            setup_query.content)
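        # Only the boolean "stack_created" flag from this response is
        # consulted; any other fields in the configurations payload are
        # ignored here.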
        if setup_query_content["stack_created"]:
            self.setup_done = True
            LOG.debug("stack_created: %s",
                      setup_query_content["stack_created"])

    def setup(self):
        """Set the configuration."""
        env_args = {}
        env_args_payload_list = ["agent_count", "public_network",
                                 "agent_image", "volume_size"]

        for env_argument in env_args_payload_list:
            try:
                env_args[env_argument] = self.options[env_argument]
            except KeyError:
                pass

        LOG.info("Creating a stack on node %s with parameters %s",
                 self.target, env_args)
        setup_res = requests.post('http://%s:5000/api/v1.0/configurations'
                                  % self.target, json=env_args)

        setup_res_content = jsonutils.loads(
            setup_res.content)

        if setup_res.status_code != 200:
            raise RuntimeError("Failed to create a stack, error message:",
                               setup_res_content["message"])
        else:
            LOG.info("stack_id: %s", setup_res_content["stack_id"])

            while not self.setup_done:
                self._query_setup_state()
                time.sleep(self.query_interval)

    def _query_job_state(self, job_id):
        """Query the status of the supplied job_id and report on metrics"""
        LOG.info("Fetching report for %s...", job_id)
        report_res = requests.get(
            'http://{}:5000/api/v1.0/jobs'.format(self.target),
            params={'id': job_id, 'type': 'status'})

        report_res_content = jsonutils.loads(
            report_res.content)

        if report_res.status_code != 200:
            raise RuntimeError("Failed to fetch report, error message:",
                               report_res_content["message"])
        else:
            job_status = report_res_content["Status"]

        LOG.debug("Job is: %s...", job_status)
        self.job_completed = job_status == "Completed"

        # TODO: Support using StorPerf ReST API to read Job ETA.

        # if job_status == "completed":
        #     self.job_completed = True
        #     ETA = 0
        # elif job_status == "running":
        #     ETA = report_res_content['time']
        #
        # return ETA

    def run(self, result):
        """Execute StorPerf benchmark"""
        if not self.setup_done:
            self.setup()

        job_args = {}
        job_args_payload_list = ["block_sizes", "queue_depths", "deadline",
                                 "target", "nossd", "nowarm", "workload"]

        for job_argument in job_args_payload_list:
            try:
                job_args[job_argument] = self.options[job_argument]
            except KeyError:
                pass

        LOG.info("Starting a job with parameters %s", job_args)
        job_res = requests.post('http://%s:5000/api/v1.0/jobs' % self.target,
                                json=job_args)

        job_res_content = jsonutils.loads(job_res.content)

        if job_res.status_code != 200:
            raise RuntimeError("Failed to start a job, error message:",
                               job_res_content["message"])
        else:
            job_id = job_res_content["job_id"]
            LOG.info("Started job id: %s...", job_id)

            while not self.job_completed:
                self._query_job_state(job_id)
                time.sleep(self.query_interval)

            terminate_res = requests.delete('http://%s:5000/api/v1.0/jobs' %
                                            self.target)

            if terminate_res.status_code != 200:
                terminate_res_content = jsonutils.loads(
                    terminate_res.content)
                raise RuntimeError("Failed to start a job, error message:",
                                   terminate_res_content["message"])

        # TODO: Support using ETA to poll for completion.
        #       Read ETA, next poll in 1/2 ETA time slot.
        #       If ETA is greater than the maximum allowed job time,
        #       then terminate job immediately.

        #   while not self.job_completed:
        #       esti_time = self._query_state(job_id)
        #       if esti_time > self.timeout:
        #           terminate_res = requests.delete('http://%s:5000/api/v1.0
        #                                           /jobs' % self.target)
        #       else:
        #           time.sleep(int(esti_time) / 2)

            result_res = requests.get('http://%s:5000/api/v1.0/jobs?id=%s' %
                                      (self.target, job_id))
            result_res_content = jsonutils.loads(
                result_res.content)

            result.update(result_res_content)

    def teardown(self):
        """Deletes the agent configuration and the stack"""
        teardown_res = requests.delete(
            'http://%s:5000/api/v1.0/configurations' % self.target)

        if teardown_res.status_code == 400:
            teardown_res_content = jsonutils.loads(
                teardown_res.content)
            raise RuntimeError("Failed to reset environment, error message:",
                               teardown_res_content['message'])

        self.setup_done = False
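

# A minimal manual-run sketch, not part of the scenario API: it simply drives
# the public methods above. The StorPerf host address and option values are
# hypothetical placeholders, not defaults taken from StorPerf or Yardstick.
def _test():
    logging.basicConfig(level=logging.DEBUG)

    scenario_cfg = {
        "options": {
            "StorPerf_ip": "192.168.23.2",  # hypothetical StorPerf API host
            "agent_count": 8,
            "public_network": "ext-net",
            "volume_size": 10,
            "block_sizes": "4096",
            "queue_depths": "4",
            "workload": "ws",
            "timeout": 7200,
        }
    }

    storperf = StorPerf(scenario_cfg, {})
    result = {}
    try:
        # run() calls setup() implicitly on the first invocation.
        storperf.run(result)
        LOG.info("Job results: %s", result)
    finally:
        storperf.teardown()


if __name__ == '__main__':
    _test()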