aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services/nfvi/resource.py
blob: ba49ab5b4c6d2dd9c1bff1ae24f101fbdcaea7cb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# Copyright (c) 2016-2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Resource collection definitions """

import errno
from itertools import chain
import logging
import multiprocessing
import os
import os.path
import re

import jinja2
import pkg_resources
from oslo_config import cfg
from oslo_utils.encodeutils import safe_decode

from yardstick import ssh
from yardstick.common.exceptions import ResourceCommandError
from yardstick.common.task_template import finalize_for_yaml
from yardstick.common.utils import validate_non_string_sequence
from yardstick.network_services.nfvi.collectd import AmqpConsumer
from yardstick.benchmark.contexts import heat


LOG = logging.getLogger(__name__)

CONF = cfg.CONF
ZMQ_OVS_PORT = 5567
ZMQ_POLLING_TIME = 12000
LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
                        "hugepages"]


class ResourceProfile(object):
    """
    This profile adds a resource at the beginning of the test session
    """
    COLLECTD_CONF = "collectd.conf"
    BAR_COLLECTD_CONF_PATH = "/opt/collectd/etc/collectd.conf.d/"
    AMPQ_PORT = 5672
    DEFAULT_INTERVAL = 25
    DEFAULT_TIMEOUT = 3600
    OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"

    def __init__(self, mgmt, port_names=None, plugins=None,
                 interval=None, timeout=None, reset_mq_flag=True):

        if plugins is None:
            self.plugins = {}
        else:
            self.plugins = plugins

        if interval is None:
            self.interval = self.DEFAULT_INTERVAL
        else:
            self.interval = interval

        if timeout is None:
            self.timeout = self.DEFAULT_TIMEOUT
        else:
            self.timeout = timeout

        self.enable = True
        self._queue = multiprocessing.Queue()
        self.amqp_client = None
        self.port_names = validate_non_string_sequence(port_names, default=[])

        # we need to save mgmt so we can connect to port 5672
        self.mgmt = mgmt
        self.connection = ssh.AutoConnectSSH.from_node(mgmt)
        self._reset_mq_flag = reset_mq_flag

    @classmethod
    def make_from_node(cls, node, timeout):
        # node dict works as mgmt dict
        # don't need port names, there is no way we can
        # tell what port is used on the compute node
        collectd_options = node["collectd"]
        plugins = collectd_options.get("plugins", {})
        interval = collectd_options.get("interval")

        reset_mq_flag = (False if node.get("ctx_type") == heat.HeatContext.__context_type__
                          else True)
        return cls(node, plugins=plugins, interval=interval,
                   timeout=timeout, reset_mq_flag=reset_mq_flag)

    def check_if_system_agent_running(self, process):
        """ verify if system agent is running """
        try:
            err, pid, _ = self.connection.execute("pgrep -f %s" % process)
            # strip whitespace
            return err, pid.strip()
        except OSError as e:
            if e.errno in {errno.ECONNRESET}:
                # if we can't connect to check, then we won't be able to connect to stop it
                LOG.exception("Can't connect to host to check %s status", process)
                return 1, None
            raise

    def run_collectd_amqp(self):
        """ run amqp consumer to collect the NFVi data """
        amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
        amqp = AmqpConsumer(amqp_url, self._queue)
        try:
            amqp.run()
        except (AttributeError, RuntimeError, KeyboardInterrupt):
            amqp.stop()

    @classmethod
    def parse_simple_resource(cls, key, value):
        reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
        return {reskey: value.split(":")[1]}

    @classmethod
    def get_cpu_data(cls, res_key0, res_key1, value):
        """ Get cpu topology of the host """
        pattern = r"-(\d+)"

        if 'cpufreq' in res_key0:
            metric, source = res_key0, res_key1
        else:
            metric, source = res_key1, res_key0

        match = re.search(pattern, source, re.MULTILINE)
        if not match:
            return "error", "Invalid", "", ""

        time, value = value.split(":")
        return str(match.group(1)), metric, value, time

    @classmethod
    def parse_hugepages(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_dpdkstat(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_virt(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_ovs_stats(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_intel_pmu_stats(cls, key, value):
        return {''.join(str(v) for v in key): value.split(":")[1]}

    def parse_collectd_result(self, metrics):
        """ convert collectd data into json"""
        result = {
            "cpu": {},
            "memory": {},
            "hugepages": {},
            "dpdkstat": {},
            "virt": {},
            "ovs_stats": {},
        }
        testcase = ""

        # unicode decode
        decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
        for key, value in decoded:
            key_split = key.split("/")
            res_key_iter = (key for key in key_split if "nsb_stats" not in key)
            res_key0 = next(res_key_iter)
            res_key1 = next(res_key_iter)

            if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
                cpu_key, name, metric, testcase = \
                    self.get_cpu_data(res_key0, res_key1, value)
                result["cpu"].setdefault(cpu_key, {}).update({name: metric})

            elif "memory" in res_key0:
                result["memory"].update({res_key1: value.split(":")[0]})

            elif "hugepages" in res_key0:
                result["hugepages"].update(self.parse_hugepages(key_split, value))

            elif "dpdkstat" in res_key0:
                result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))

            elif "virt" in res_key1:
                result["virt"].update(self.parse_virt(key_split, value))

            elif "ovs_stats" in res_key0:
                result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))

        result["timestamp"] = testcase

        return result

    def amqp_process_for_nfvi_kpi(self):
        """ amqp collect and return nfvi kpis """
        if self.amqp_client is None and self.enable:
            self.amqp_client = multiprocessing.Process(
                name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
                target=self.run_collectd_amqp)
            self.amqp_client.start()

    def amqp_collect_nfvi_kpi(self):
        """ amqp collect and return nfvi kpis """
        if not self.enable:
            return {}

        if self.check_if_system_agent_running("collectd")[0] != 0:
            return {}

        metric = {}
        while not self._queue.empty():
            metric.update(self._queue.get())

        return self.parse_collectd_result(metric)

    def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
        template = pkg_resources.resource_string("yardstick.network_services.nfvi",
                                                 nfvi_cfg).decode('utf-8')
        cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
                                      finalize=finalize_for_yaml).render(
            **template_kwargs)
        # cfg_content = io.StringIO(template.format(**template_kwargs))
        cfg_file = os.path.join(config_file_path, nfvi_cfg)
        # must write as root, so use sudo
        self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)

    def _prepare_collectd_conf(self, config_file_path):
        """ Prepare collectd conf """

        kwargs = {
            "interval": self.interval,
            "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
            # Optional fields PortName is descriptive only, use whatever is present
            "port_names": self.port_names,
            # "ovs_bridge_interfaces": ["br-int"],
            "plugins": self.plugins,
        }
        self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
        self._provide_config_file(self.BAR_COLLECTD_CONF_PATH,
                                  self.COLLECTD_CONF, kwargs)

    def _setup_ovs_stats(self, connection):
        try:
            socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
        except KeyError:
            # ovs_stats is not a dict
            socket_path = self.OVS_SOCKET_PATH
        status = connection.execute("test -S {}".format(socket_path))[0]
        if status != 0:
            LOG.error("cannot find OVS socket %s", socket_path)

    def _reset_rabbitmq(self, connection):
        # Reset amqp queue
        LOG.debug("reset and setup amqp to collect data from collectd")
        # ensure collectd.conf.d exists to avoid error/warning
        cmd_list = ["sudo mkdir -p /etc/collectd/collectd.conf.d",
                    "sudo service rabbitmq-server restart",
                    "sudo rabbitmqctl stop_app",
                    "sudo rabbitmqctl reset",
                    "sudo rabbitmqctl start_app",
                    "sudo rabbitmqctl add_user admin admin",
                    "sudo rabbitmqctl authenticate_user admin admin",
                    "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'"
                    ]

        for cmd in cmd_list:
            exit_status, _, stderr = connection.execute(cmd)
            if exit_status != 0:
                raise ResourceCommandError(command=cmd, stderr=stderr)

    def _check_rabbitmq_user(self, connection, user='admin'):
        exit_status, stdout, _ = connection.execute("sudo rabbitmqctl list_users")
        if exit_status == 0:
            for line in stdout.split('\n')[1:]:
                if line.split('\t')[0] == user:
                    return True

    def _set_rabbitmq_admin_user(self, connection):
        LOG.debug("add admin user to amqp")
        cmd_list = ["sudo rabbitmqctl add_user admin admin",
                    "sudo rabbitmqctl authenticate_user admin admin",
                    "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'"
                    ]

        for cmd in cmd_list:
            exit_status, stdout, stderr = connection.execute(cmd)
            if exit_status != 0:
                raise ResourceCommandError(command=cmd, stdout=stdout, stderr=stderr)

    def _start_rabbitmq(self, connection):
        if self._reset_mq_flag:
            self._reset_rabbitmq(connection)
        else:
            if not self._check_rabbitmq_user(connection):
                self._set_rabbitmq_admin_user(connection)

        # check stdout for "sudo rabbitmqctl status" command
        cmd = "sudo rabbitmqctl status"
        _, stdout, stderr = connection.execute(cmd)
        if not re.search("RabbitMQ", stdout):
            LOG.error("rabbitmqctl status don't have RabbitMQ in running apps")
            raise ResourceCommandError(command=cmd, stderr=stderr)

    def _start_collectd(self, connection, bin_path):
        LOG.debug("Starting collectd to collect NFVi stats")
        collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
        config_file_path = os.path.join(bin_path, "collectd", "etc")
        self._prepare_collectd_conf(config_file_path)

        connection.execute('sudo pkill -x -9 collectd')
        cmd = "which %s > /dev/null 2>&1" % collectd_path
        exit_status, _, stderr = connection.execute(cmd)
        if exit_status != 0:
            raise ResourceCommandError(command=cmd, stderr=stderr)

        if "ovs_stats" in self.plugins:
            self._setup_ovs_stats(connection)

        LOG.debug("Starting collectd to collect NFVi stats")
        LOG.debug("Start collectd service..... %s second timeout", self.timeout)
        # intel_pmu plug requires large numbers of files open, so try to set
        # ulimit -n to a large value

        cmd = "sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path
        exit_status, _, stderr = connection.execute(cmd, timeout=self.timeout)
        if exit_status != 0:
            raise ResourceCommandError(command=cmd, stderr=stderr)

        LOG.debug("Done")

    def initiate_systemagent(self, bin_path):
        """ Start system agent for NFVi collection on host """
        if self.enable:
            try:
                self._start_rabbitmq(self.connection)
                self._start_collectd(self.connection, bin_path)
            except ResourceCommandError as e:
                LOG.exception("Exception during collectd and rabbitmq start: %s", str(e))
                raise

    def start(self):
        """ start nfvi collection """
        if self.enable:
            LOG.debug("Start NVFi metric collection...")

    def stop(self):
        """ stop nfvi collection """
        if not self.enable:
            return

        agent = "collectd"
        LOG.debug("Stop resource monitor...")

        if self.amqp_client is not None:
            # we proper and try to join first
            self.amqp_client.join(3)
            self.amqp_client.terminate()

        LOG.debug("Check if %s is running", agent)
        status, pid = self.check_if_system_agent_running(agent)
        LOG.debug("status %s  pid %s", status, pid)
        if status != 0:
            return

        if pid:
            self.connection.execute('sudo kill -9 "%s"' % pid)
        self.connection.execute('sudo pkill -9 "%s"' % agent)

        if self._reset_mq_flag:
            self.connection.execute('sudo service rabbitmq-server stop')
            self.connection.execute("sudo rabbitmqctl stop_app")