summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yardstick/common/ansible_common.py36
-rw-r--r--yardstick/tests/unit/common/test_ansible_common.py205
2 files changed, 108 insertions, 133 deletions
diff --git a/yardstick/common/ansible_common.py b/yardstick/common/ansible_common.py
index ca5a110e2..dee7044a5 100644
--- a/yardstick/common/ansible_common.py
+++ b/yardstick/common/ansible_common.py
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import
-
import cgitb
import collections
import contextlib as cl
@@ -23,11 +21,11 @@ import os
from collections import Mapping, MutableMapping, Iterable, Callable, deque
from functools import partial
from itertools import chain
-from subprocess import CalledProcessError, Popen, PIPE
-from tempfile import NamedTemporaryFile
+import subprocess
+import tempfile
import six
-import six.moves.configparser as ConfigParser
+from six.moves import configparser
import yaml
from six import StringIO
from chainmap import ChainMap
@@ -134,10 +132,9 @@ class CustomTemporaryFile(object):
else:
self.data_types = self.DEFAULT_DATA_TYPES
# must open "w+" so unicode is encoded correctly
- self.creator = partial(NamedTemporaryFile, mode="w+", delete=False,
- dir=directory,
- prefix=prefix,
- suffix=self.suffix)
+ self.creator = partial(
+ tempfile.NamedTemporaryFile, mode="w+", delete=False,
+ dir=directory, prefix=prefix, suffix=self.suffix)
def make_context(self, data, write_func, descriptor='data'):
return TempfileContext(data, write_func, descriptor, self.data_types,
@@ -191,8 +188,8 @@ class FileNameGenerator(object):
if not prefix.endswith('_'):
prefix += '_'
- temp_file = NamedTemporaryFile(delete=False, dir=directory,
- prefix=prefix, suffix=suffix)
+ temp_file = tempfile.NamedTemporaryFile(delete=False, dir=directory,
+ prefix=prefix, suffix=suffix)
with cl.closing(temp_file):
return temp_file.name
@@ -474,7 +471,7 @@ class AnsibleCommon(object):
prefix = '_'.join([self.prefix, prefix, 'inventory'])
ini_temp_file = IniMapTemporaryFile(directory=directory, prefix=prefix)
- inventory_config = ConfigParser.ConfigParser(allow_no_value=True)
+ inventory_config = configparser.ConfigParser(allow_no_value=True)
# disable default lowercasing
inventory_config.optionxform = str
return ini_temp_file.make_context(self.inventory_dict, write_func,
@@ -510,7 +507,7 @@ class AnsibleCommon(object):
return timeout
def _generate_ansible_cfg(self, directory):
- parser = ConfigParser.ConfigParser()
+ parser = configparser.ConfigParser()
parser.add_section('defaults')
parser.set('defaults', 'host_key_checking', 'False')
@@ -541,12 +538,12 @@ class AnsibleCommon(object):
cmd = ['ansible', 'all', '-m', 'setup', '-i',
inventory_path, '--tree', sut_dir]
- proc = Popen(cmd, stdout=PIPE, cwd=directory)
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=directory)
output, _ = proc.communicate()
retcode = proc.wait()
LOG.debug("exit status = %s", retcode)
if retcode != 0:
- raise CalledProcessError(retcode, cmd, output)
+ raise subprocess.CalledProcessError(retcode, cmd, output)
def _gen_sut_info_dict(self, sut_dir):
sut_info = {}
@@ -617,12 +614,13 @@ class AnsibleCommon(object):
# 'timeout': timeout / 2,
})
with Timer() as timer:
- proc = Popen(cmd, stdout=PIPE, **exec_args)
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+ **exec_args)
output, _ = proc.communicate()
retcode = proc.wait()
LOG.debug("exit status = %s", retcode)
if retcode != 0:
- raise CalledProcessError(retcode, cmd, output)
+ raise subprocess.CalledProcessError(retcode, cmd, output)
timeout -= timer.total_seconds()
cmd.remove("--syntax-check")
@@ -632,10 +630,10 @@ class AnsibleCommon(object):
# TODO: add timeout support of use subprocess32 backport
# 'timeout': timeout,
})
- proc = Popen(cmd, stdout=PIPE, **exec_args)
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, **exec_args)
output, _ = proc.communicate()
retcode = proc.wait()
LOG.debug("exit status = %s", retcode)
if retcode != 0:
- raise CalledProcessError(retcode, cmd, output)
+ raise subprocess.CalledProcessError(retcode, cmd, output)
return output
diff --git a/yardstick/tests/unit/common/test_ansible_common.py b/yardstick/tests/unit/common/test_ansible_common.py
index 48d8a60c8..bf82f6288 100644
--- a/yardstick/tests/unit/common/test_ansible_common.py
+++ b/yardstick/tests/unit/common/test_ansible_common.py
@@ -12,28 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from __future__ import absolute_import
-
-import os
-import tempfile
+import collections
import shutil
-from collections import defaultdict
+import subprocess
+import tempfile
import mock
-import unittest
-
-from six.moves.configparser import ConfigParser
-from six.moves import StringIO
+from six import moves
+from six.moves import configparser
from yardstick.common import ansible_common
+from yardstick.tests.unit import base as ut_base
-PREFIX = 'yardstick.common.ansible_common'
+class OverwriteDictTestCase(ut_base.BaseUnitTestCase):
-class OverwriteDictTestCase(unittest.TestCase):
def test_overwrite_dict_cfg(self):
- c = ConfigParser(allow_no_value=True)
+ c = configparser.ConfigParser(allow_no_value=True)
d = {
"section_a": "empty_value",
"section_b": {"key_c": "Val_d", "key_d": "VAL_D"},
@@ -43,86 +38,78 @@ class OverwriteDictTestCase(unittest.TestCase):
# Python3 and Python2 convert empty values into None or ''
# we don't really care but we need to compare correctly for unittest
self.assertTrue(c.has_option("section_a", "empty_value"))
- self.assertEqual(sorted(c.items("section_b")), [('key_c', 'Val_d'), ('key_d', 'VAL_D')])
+ self.assertEqual(sorted(c.items("section_b")),
+ [('key_c', 'Val_d'), ('key_d', 'VAL_D')])
self.assertTrue(c.has_option("section_c", "key_c"))
self.assertTrue(c.has_option("section_c", "key_d"))
-class FilenameGeneratorTestCase(unittest.TestCase):
- @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
+class FilenameGeneratorTestCase(ut_base.BaseUnitTestCase):
+
+ @mock.patch.object(tempfile, 'NamedTemporaryFile')
def test__handle_existing_file(self, _):
- ansible_common.FileNameGenerator._handle_existing_file("/dev/null")
+ ansible_common.FileNameGenerator._handle_existing_file('/dev/null')
def test_get_generator_from_file(self):
- ansible_common.FileNameGenerator.get_generator_from_filename("/dev/null", "", "", "")
+ ansible_common.FileNameGenerator.get_generator_from_filename(
+ '/dev/null', '', '', '')
def test_get_generator_from_file_middle(self):
- ansible_common.FileNameGenerator.get_generator_from_filename("/dev/null", "", "",
- "null")
+ ansible_common.FileNameGenerator.get_generator_from_filename(
+ '/dev/null', '', '', 'null')
def test_get_generator_from_file_prefix(self):
- ansible_common.FileNameGenerator.get_generator_from_filename("/dev/null", "", "null",
- "middle")
+ ansible_common.FileNameGenerator.get_generator_from_filename(
+ '/dev/null', '', 'null', 'middle')
-class AnsibleNodeTestCase(unittest.TestCase):
- def test_ansible_node(self):
- ansible_common.AnsibleNode()
+class AnsibleNodeTestCase(ut_base.BaseUnitTestCase):
def test_ansible_node_len(self):
- a = ansible_common.AnsibleNode()
- len(a)
+ self.assertEqual(0, len(ansible_common.AnsibleNode()))
def test_ansible_node_repr(self):
- a = ansible_common.AnsibleNode()
- repr(a)
+ self.assertEqual('AnsibleNode<{}>', repr(ansible_common.AnsibleNode()))
def test_ansible_node_iter(self):
- a = ansible_common.AnsibleNode()
- for _ in a:
- pass
+ node = ansible_common.AnsibleNode(data={'a': 1, 'b': 2, 'c': 3})
+ for key in node:
+ self.assertIn(key, ('a', 'b', 'c'))
def test_is_role(self):
- a = ansible_common.AnsibleNode()
- self.assertFalse(a.is_role("", default="foo"))
+ node = ansible_common.AnsibleNode()
+ self.assertFalse(node.is_role('', default='foo'))
def test_ansible_node_get_tuple(self):
- a = ansible_common.AnsibleNode({"name": "name"})
- self.assertEqual(a.get_tuple(), ('name', a))
+ node = ansible_common.AnsibleNode({'name': 'name'})
+ self.assertEqual(node.get_tuple(), ('name', node))
def test_gen_inventory_line(self):
- a = ansible_common.AnsibleNode(defaultdict(str))
+ a = ansible_common.AnsibleNode(collections.defaultdict(str))
self.assertEqual(a.gen_inventory_line(), "")
def test_ansible_node_delitem(self):
- a = ansible_common.AnsibleNode({"name": "name"})
- del a['name']
+ node = ansible_common.AnsibleNode({'name': 'name'})
+ self.assertEqual(1, len(node))
+ del node['name']
+ self.assertEqual(0, len(node))
def test_ansible_node_getattr(self):
- a = ansible_common.AnsibleNode({"name": "name"})
- self.assertIsNone(getattr(a, "nosuch", None))
+ node = ansible_common.AnsibleNode({'name': 'name'})
+ self.assertIsNone(getattr(node, 'nosuch', None))
-class AnsibleNodeDictTestCase(unittest.TestCase):
- def test_ansible_node_dict(self):
- n = ansible_common.AnsibleNode
- ansible_common.AnsibleNodeDict(n, {})
+class AnsibleNodeDictTestCase(ut_base.BaseUnitTestCase):
def test_ansible_node_dict_len(self):
n = ansible_common.AnsibleNode
a = ansible_common.AnsibleNodeDict(n, {})
- len(a)
+ self.assertEqual(0, len(a))
def test_ansible_node_dict_repr(self):
n = ansible_common.AnsibleNode
a = ansible_common.AnsibleNodeDict(n, {})
- repr(a)
-
- def test_ansible_node_dict_iter(self):
- n = ansible_common.AnsibleNode
- a = ansible_common.AnsibleNodeDict(n, {})
- for _ in a:
- pass
+ self.assertEqual('{}', repr(a))
def test_ansible_node_dict_get(self):
n = ansible_common.AnsibleNode
@@ -144,12 +131,15 @@ class AnsibleNodeDictTestCase(unittest.TestCase):
["name ansible_ssh_pass=PASS ansible_user=user"])
-class AnsibleCommonTestCase(unittest.TestCase):
- def test_get_timeouts(self):
- self.assertAlmostEqual(ansible_common.AnsibleCommon.get_timeout(-100), 1200.0)
+class AnsibleCommonTestCase(ut_base.BaseUnitTestCase):
- def test__init__(self):
- ansible_common.AnsibleCommon({})
+ @staticmethod
+ def _delete_tmpdir(dir):
+ shutil.rmtree(dir)
+
+ def test_get_timeouts(self):
+ self.assertAlmostEqual(
+ ansible_common.AnsibleCommon.get_timeout(-100), 1200.0)
def test_reset(self):
a = ansible_common.AnsibleCommon({})
@@ -184,81 +174,68 @@ class AnsibleCommonTestCase(unittest.TestCase):
a.deploy_dir = "d"
self.assertEqual(a.deploy_dir, "d")
- @mock.patch('{}.open'.format(PREFIX))
- def test__gen_ansible_playbook_file_list(self, _):
+ @mock.patch.object(moves.builtins, 'open')
+ def test__gen_ansible_playbook_file_list(self, *args):
d = tempfile.mkdtemp()
- try:
- a = ansible_common.AnsibleCommon({})
- a._gen_ansible_playbook_file(["a"], d)
- finally:
- os.rmdir(d)
-
- @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
- @mock.patch('{}.open'.format(PREFIX))
- def test__gen_ansible_inventory_file(self, _, __):
+ self.addCleanup(self._delete_tmpdir, d)
+ a = ansible_common.AnsibleCommon({})
+ a._gen_ansible_playbook_file(["a"], d)
+
+ @mock.patch.object(tempfile, 'NamedTemporaryFile')
+ @mock.patch.object(moves.builtins, 'open')
+ def test__gen_ansible_inventory_file(self, *args):
nodes = [{
"name": "name", "user": "user", "password": "PASS",
"role": "role",
}]
d = tempfile.mkdtemp()
- try:
- a = ansible_common.AnsibleCommon(nodes)
- a.gen_inventory_ini_dict()
- inv_context = a._gen_ansible_inventory_file(d)
- with inv_context:
- c = StringIO()
- inv_context.write_func(c)
- self.assertIn("ansible_ssh_pass=PASS", c.getvalue())
- finally:
- os.rmdir(d)
-
- @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
- @mock.patch('{}.open'.format(PREFIX))
- def test__gen_ansible_playbook_file_list_multiple(self, _, __):
+ self.addCleanup(self._delete_tmpdir, d)
+ a = ansible_common.AnsibleCommon(nodes)
+ a.gen_inventory_ini_dict()
+ inv_context = a._gen_ansible_inventory_file(d)
+ with inv_context:
+ c = moves.StringIO()
+ inv_context.write_func(c)
+ self.assertIn("ansible_ssh_pass=PASS", c.getvalue())
+
+ @mock.patch.object(tempfile, 'NamedTemporaryFile')
+ @mock.patch.object(moves.builtins, 'open')
+ def test__gen_ansible_playbook_file_list_multiple(self, *args):
d = tempfile.mkdtemp()
- try:
- a = ansible_common.AnsibleCommon({})
- a._gen_ansible_playbook_file(["a", "b"], d)
- finally:
- os.rmdir(d)
-
- @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
- @mock.patch('{}.Popen'.format(PREFIX))
- @mock.patch('{}.open'.format(PREFIX))
- def test_do_install_tmp_dir(self, _, mock_popen, __):
+ self.addCleanup(self._delete_tmpdir, d)
+ a = ansible_common.AnsibleCommon({})
+ a._gen_ansible_playbook_file(["a", "b"], d)
+
+ @mock.patch.object(tempfile, 'NamedTemporaryFile')
+ @mock.patch.object(subprocess, 'Popen')
+ @mock.patch.object(moves.builtins, 'open')
+ def test_do_install_tmp_dir(self, _, mock_popen, *args):
mock_popen.return_value.communicate.return_value = "", ""
mock_popen.return_value.wait.return_value = 0
d = tempfile.mkdtemp()
- try:
- a = ansible_common.AnsibleCommon({})
- a.do_install('', d)
- finally:
- os.rmdir(d)
-
- @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
- @mock.patch('{}.Popen'.format(PREFIX))
- @mock.patch('{}.open'.format(PREFIX))
- def test_execute_ansible_check(self, _, mock_popen, __):
+ self.addCleanup(self._delete_tmpdir, d)
+ a = ansible_common.AnsibleCommon({})
+ a.do_install('', d)
+
+ @mock.patch.object(tempfile, 'NamedTemporaryFile')
+ @mock.patch.object(moves.builtins, 'open')
+ @mock.patch.object(subprocess, 'Popen')
+ def test_execute_ansible_check(self, mock_popen, *args):
mock_popen.return_value.communicate.return_value = "", ""
mock_popen.return_value.wait.return_value = 0
d = tempfile.mkdtemp()
- try:
- a = ansible_common.AnsibleCommon({})
- a.execute_ansible('', d, ansible_check=True, verbose=True)
- finally:
- os.rmdir(d)
+ self.addCleanup(self._delete_tmpdir, d)
+ a = ansible_common.AnsibleCommon({})
+ a.execute_ansible('', d, ansible_check=True, verbose=True)
def test_get_sut_info(self):
d = tempfile.mkdtemp()
a = ansible_common.AnsibleCommon({})
- try:
+ self.addCleanup(self._delete_tmpdir, d)
+ with mock.patch.object(a, '_exec_get_sut_info_cmd'):
a.get_sut_info(d)
- finally:
- shutil.rmtree(d)
def test_get_sut_info_not_exist(self):
a = ansible_common.AnsibleCommon({})
- try:
+ with self.assertRaises(OSError):
a.get_sut_info('/hello/world')
- except OSError:
- pass