aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/scenarios/storage/storperf.py
blob: f2fcce651c3b1136c1232911893140c67078913a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
##############################################################################
# Copyright (c) 2016 Huawei Technologies Co.,Ltd.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
from __future__ import absolute_import

import logging
import os
import time

from oslo_serialization import jsonutils
import requests

from yardstick.benchmark.scenarios import base


# Module-level logger, named after this module per project convention.
LOG = logging.getLogger(__name__)


class StorPerf(base.Scenario):
    """Execute StorPerf benchmark.

    Once the StorPerf container has been started and the ReST API exposed,
    you can interact directly with it using the ReST API. StorPerf comes with a
    Swagger interface that is accessible through the exposed port at:
    http://StorPerf:5000/swagger/index.html

    Command line options:
    target = [device or path] (Optional):
    The path to either an attached storage device (/dev/vdb, etc) or a
    directory path (/opt/storperf) that will be used to execute the performance
    test. In the case of a device, the entire device will be used.
    If not specified, the current directory will be used.

    workload = [workload module] (Optional):
    If not specified, the default is to run all workloads.
    The workload types are:
        rs: 100% Read, sequential data
        ws: 100% Write, sequential data
        rr: 100% Read, random access
        wr: 100% Write, random access
        rw: 70% Read / 30% write, random access

    report = [job_id] (Optional):
    Query the status of the supplied job_id and report on metrics.
    If a workload is supplied, will report on only that subset.
    """
    __scenario_type__ = "StorPerf"

    def __init__(self, scenario_cfg, context_cfg):
        """Scenario construction."""
        super(StorPerf, self).__init__()
        self.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg
        self.options = self.scenario_cfg["options"]

        # IP (or hostname) of the node exposing the StorPerf ReST API.
        self.target = self.options.get("StorPerf_ip", None)
        # Seconds to sleep between successive status polls.
        self.query_interval = self.options.get("query_interval", 10)
        # Maximum allowed job time (seconds); also used as the default
        # job "deadline" sent to StorPerf in run().
        self.timeout = self.options.get('timeout', 3600)

        self.setup_done = False
        self.job_completed = False

    def _query_setup_state(self):
        """Query the stack status.

        :return: True once StorPerf reports the agent stack as created,
                 False otherwise.
        """
        LOG.info("Querying the stack state...")
        setup_query = requests.get('http://%s:5000/api/v1.0/configurations'
                                   % self.target)

        setup_query_content = jsonutils.loads(
            setup_query.content)
        if ("stack_created" in setup_query_content and
                setup_query_content["stack_created"]):
            LOG.debug("stack_created: %s",
                      setup_query_content["stack_created"])
            return True

        return False

    def setup(self):
        """Create the StorPerf agent stack and wait until it is ready.

        Blocks, polling every ``query_interval`` seconds, until StorPerf
        reports the stack as created, then pre-fills the target disks.

        :raises RuntimeError: if the configuration request is rejected.
        """
        env_args = {}
        env_args_payload_list = ["agent_count", "agent_flavor",
                                 "public_network", "agent_image",
                                 "volume_size", "volume_type",
                                 "volume_count", "availability_zone",
                                 "stack_name", "subnet_CIDR"]

        # Forward only the options the user actually supplied.
        for env_argument in env_args_payload_list:
            try:
                env_args[env_argument] = self.options[env_argument]
            except KeyError:
                pass

        LOG.info("Creating a stack on node %s with parameters %s",
                 self.target, env_args)
        setup_res = requests.post('http://%s:5000/api/v1.0/configurations'
                                  % self.target, json=env_args)

        setup_res_content = jsonutils.loads(
            setup_res.content)

        if setup_res.status_code != 200:
            raise RuntimeError("Failed to create a stack, error message:",
                               setup_res_content["message"])
        # Reaching here implies status_code == 200 (the redundant elif
        # in the original has been dropped).
        LOG.info("stack_id: %s", setup_res_content["stack_id"])

        while not self._query_setup_state():
            time.sleep(self.query_interval)

        # We do not want to load the results of the disk initialization,
        # so it is not added to the results here.
        self.initialize_disks()
        self.setup_done = True

    def _query_job_state(self, job_id):
        """Query the status of the supplied job_id and report on metrics.

        Sets ``self.job_completed`` to True once StorPerf reports the job
        status as "Completed".

        :param job_id: identifier returned when the job was started.
        :raises RuntimeError: if the status request is rejected.
        """
        LOG.info("Fetching report for %s...", job_id)
        report_res = requests.get('http://{}:5000/api/v1.0/jobs'.format
                                  (self.target),
                                  params={'id': job_id, 'type': 'status'})

        report_res_content = jsonutils.loads(
            report_res.content)

        if report_res.status_code != 200:
            raise RuntimeError("Failed to fetch report, error message:",
                               report_res_content["message"])
        job_status = report_res_content["Status"]

        LOG.debug("Job is: %s...", job_status)
        self.job_completed = job_status == "Completed"

        # TODO: Support using the StorPerf ReST API to read the job ETA
        #       and use it to pace polling instead of a fixed interval.

    def run(self, result):
        """Execute StorPerf benchmark.

        Starts a StorPerf job, polls until it completes, then merges the
        job metrics into *result*.

        :param result: mutable mapping updated in place with the metrics.
        :raises RuntimeError: if the job cannot be started or polled.
        """
        if not self.setup_done:
            self.setup()

        metadata = {"build_tag": "latest",
                    "test_case": "opnfv_yardstick_tc074"}
        metadata_payload_dict = {"pod_name": "NODE_NAME",
                                 "scenario_name": "DEPLOY_SCENARIO",
                                 "version": "YARDSTICK_BRANCH"}

        # Enrich the metadata from the environment when available.
        for key, value in metadata_payload_dict.items():
            try:
                metadata[key] = os.environ[value]
            except KeyError:
                pass

        job_args = {"metadata": metadata}
        job_args_payload_list = ["block_sizes", "queue_depths", "deadline",
                                 "target", "workload", "workloads",
                                 "agent_count", "steady_state_samples"]
        # Default the deadline to the scenario timeout; an explicit
        # "deadline" option (copied in the loop below) overrides it.
        # Using self.timeout instead of self.options["timeout"] avoids a
        # KeyError when no timeout option was supplied.
        job_args["deadline"] = self.timeout

        for job_argument in job_args_payload_list:
            try:
                job_args[job_argument] = self.options[job_argument]
            except KeyError:
                pass

        api_version = "v1.0"

        # A non-empty "workloads" collection requires the v2.0 API.
        # (The original condition had a misplaced parenthesis comparing
        # the whole boolean chain to 0; truthiness is equivalent.)
        if job_args.get("workloads"):
            api_version = "v2.0"

        LOG.info("Starting a job with parameters %s", job_args)
        job_res = requests.post('http://%s:5000/api/%s/jobs' % (self.target,
                                                                api_version),
                                json=job_args)

        job_res_content = jsonutils.loads(job_res.content)

        if job_res.status_code != 200:
            raise RuntimeError("Failed to start a job, error message:",
                               job_res_content["message"])

        job_id = job_res_content["job_id"]
        LOG.info("Started job id: %s...", job_id)

        while not self.job_completed:
            self._query_job_state(job_id)
            time.sleep(self.query_interval)

        # TODO: Support using the job ETA to pace polling and terminate
        #       jobs whose ETA exceeds self.timeout.

        result_res = requests.get('http://%s:5000/api/v1.0/jobs?id=%s' %
                                  (self.target, job_id))
        result_res_content = jsonutils.loads(
            result_res.content)

        result.update(result_res_content)

    def initialize_disks(self):
        """Fill the target with random data prior to executing workloads.

        Blocks until the initialization job completes, then resets
        ``self.job_completed`` so the subsequent benchmark job can poll.

        :raises RuntimeError: if the initialization job cannot be started.
        """
        job_args = {}
        job_args_payload_list = ["target"]

        for job_argument in job_args_payload_list:
            try:
                job_args[job_argument] = self.options[job_argument]
            except KeyError:
                pass

        LOG.info("Starting initialization with parameters %s", job_args)
        job_res = requests.post('http://%s:5000/api/v1.0/initializations' %
                                self.target, json=job_args)

        job_res_content = jsonutils.loads(job_res.content)

        if job_res.status_code != 200:
            raise RuntimeError(
                "Failed to start initialization job, error message:",
                job_res_content["message"])
        job_id = job_res_content["job_id"]
        LOG.info("Started initialization as job id: %s...", job_id)

        while not self.job_completed:
            self._query_job_state(job_id)
            time.sleep(self.query_interval)

        # Reset so the benchmark job in run() polls from scratch.
        self.job_completed = False

    def teardown(self):
        """Deletes the agent configuration and the stack.

        :raises RuntimeError: if StorPerf rejects the delete request.
        """
        teardown_res = requests.delete(
            'http://%s:5000/api/v1.0/configurations' % self.target)

        if teardown_res.status_code == 400:
            # Bug fix: requests.Response has no "json_data" attribute, so
            # the original jsonutils.loads(teardown_res.json_data) raised
            # AttributeError instead of reporting the StorPerf error.
            teardown_res_content = jsonutils.loads(
                teardown_res.content)
            raise RuntimeError("Failed to reset environment, error message:",
                               teardown_res_content['message'])

        self.setup_done = False