{ "__inputs": [ { "name": "DS_YARDSTICK", "label": "yardstick", "description": "", "type": "datasource", "pluginId": "influxdb", "pluginName": "InfluxDB" } ], "__requires": [ { "type": "grafana", "id": "grafana", "name": "Grafana", "version": "4.4.3" }, { "type": "panel", "id": "graph", "name": "Graph", "version": "" }, { "type": "datasource", "id": "influxdb", "name": "InfluxDB", "version": "1.0.0" }, { "type": "panel", "id": "text", "name": "Text", "version": "" } ], "annotations": { "list": [] }, "editable": true, "gnetId": null, "graphTooltip": 0, "hideControls": false, "id": null, "links": [], "refresh": false, "rows": [ { "collapse": false, "height": "100px", "panels": [ { "content": "
The vPE handles packet processing, routing, QinQ encapsulation, flows, ACL rules, adds/removes MPLS tags and performs QoS
\nThe KPI is the number of packets per second for a specified packet size (min packet size is 68 byte) with an accepted minimal packet loss
\n#!/usr/bin/env python
##############################################################################
# Copyright (c) 2015 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
# Unittest for yardstick.benchmark.scenarios.compute.lmbench.Lmbench
from __future__ import absolute_import
import mock
import unittest
import os
from yardstick.benchmark.scenarios.compute import cpuload
@mock.patch('yardstick.benchmark.scenarios.compute.cpuload.ssh')
class CPULoadTestCase(unittest.TestCase):
def setUp(self):
self.ctx = {
'host': {
'ip': '172.16.0.137',
'user': 'cirros',
'key_filename': "mykey.key"
}
}
self.result = {}
def test_setup_mpstat_installed(self, mock_ssh):
options = {
"interval": 1,
"count": 1
}
args = {'options': options}
l = cpuload.CPULoad(args, self.ctx)
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
l.setup()
self.assertIsNotNone(l.client)
self.assertTrue(l.setup_done)
self.assertTrue(l.has_mpstat)
def test_setup_mpstat_not_installed(self, mock_ssh):
options = {
"interval": 1,
"count": 1
}
args = {'options': options}
l = cpuload.CPULoad(args, self.ctx)
mock_ssh.SSH.from_node().execute.return_value = (127, '', '')
l.setup()
self.assertIsNotNone(l.client)
self.assertTrue(l.setup_done)
self.assertFalse(l.has_mpstat)
def test_execute_command_success(self, mock_ssh):
options = {
"interval": 1,
"count": 1
}
args = {'options': options}
l = cpuload.CPULoad(args, self.ctx)
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
l.setup()
expected_result = 'abcdefg'
mock_ssh.SSH.from_node().execute.return_value = (0, expected_result, '')
result = l._execute_command("foo")
self.assertEqual(result, expected_result)
def test_execute_command_failed(self, mock_ssh):
options = {
"interval": 1,
"count": 1
}
args = {'options': options}
l = cpuload.CPULoad(args, self.ctx)
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
l.setup()
mock_ssh.SSH.from_node().execute.return_value = (127, '', 'abcdefg'