summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhelenyao <yaohelan@huawei.com>2016-12-07 21:24:24 -0500
committerhelenyao <yaohelan@huawei.com>2016-12-14 04:06:25 -0500
commit703d4477d2385d3c654cdac549b936c08b9c5b31 (patch)
tree4305d2b9c36c74253336e112b94e385fd2afc77b
parent7bb9e652122482a31992901c9eae7429624aabdf (diff)
Authenticate clients with keystoneauth1.session
JIRA: FUNCTEST-529 1. use keystoneauth1.session to initialize the client for each service The keystoneauth1.session.Session class was introduced into keystoneauth1 as an attempt to bring a unified interface to the various OpenStack clients that share common authentication and request parameters between a variety of services. 2. update ODL case to leverage session to get the endpoint info Change-Id: Ic8c01b9b7ed86d3bdd9f5125504bc47f46a37700 Signed-off-by: helenyao <yaohelan@huawei.com>
-rw-r--r--functest/ci/run_tests.py1
-rwxr-xr-xfunctest/opnfv_tests/sdn/odl/odl.py7
-rwxr-xr-x[-rw-r--r--]functest/tests/unit/odl/test_odl.py43
-rwxr-xr-xfunctest/utils/openstack_clean.py12
-rwxr-xr-xfunctest/utils/openstack_snapshot.py4
-rwxr-xr-xfunctest/utils/openstack_utils.py161
-rwxr-xr-xrun_unit_tests.sh1
7 files changed, 127 insertions, 102 deletions
diff --git a/functest/ci/run_tests.py b/functest/ci/run_tests.py
index 3f02c872..d6991f66 100644
--- a/functest/ci/run_tests.py
+++ b/functest/ci/run_tests.py
@@ -124,6 +124,7 @@ def run_test(test, tier_name):
logger.info("Running test case '%s'..." % test_name)
print_separator("=")
logger.debug("\n%s" % test)
+ source_rc_file()
if GlobalVariables.CLEAN_FLAG:
generate_os_snapshot()
diff --git a/functest/opnfv_tests/sdn/odl/odl.py b/functest/opnfv_tests/sdn/odl/odl.py
index 95440746..bc9f1bb7 100755
--- a/functest/opnfv_tests/sdn/odl/odl.py
+++ b/functest/opnfv_tests/sdn/odl/odl.py
@@ -146,11 +146,8 @@ class ODLTests(testcase_base.TestcaseBase):
def run(self):
try:
- kclient = op_utils.get_keystone_client()
- keystone_url = kclient.service_catalog.url_for(
- service_type='identity', endpoint_type='publicURL')
- neutron_url = kclient.service_catalog.url_for(
- service_type='network', endpoint_type='publicURL')
+ keystone_url = op_utils.get_endpoint(service_type='identity')
+ neutron_url = op_utils.get_endpoint(service_type='network')
kwargs = {'keystoneip': urlparse.urlparse(keystone_url).hostname}
kwargs['neutronip'] = urlparse.urlparse(neutron_url).hostname
kwargs['odlip'] = kwargs['neutronip']
diff --git a/functest/tests/unit/odl/test_odl.py b/functest/tests/unit/odl/test_odl.py
index ef18016b..c29bfd7f 100644..100755
--- a/functest/tests/unit/odl/test_odl.py
+++ b/functest/tests/unit/odl/test_odl.py
@@ -59,14 +59,6 @@ class ODLTesting(unittest.TestCase):
else:
return None
- @classmethod
- def _get_fake_keystone_client(cls):
- kclient = mock.Mock()
- kclient.service_catalog = mock.Mock()
- kclient.service_catalog.url_for = mock.Mock(
- side_effect=cls._fake_url_for)
- return kclient
-
def _get_main_kwargs(self, key=None):
kwargs = {'odlusername': self._odl_username,
'odlpassword': self._odl_password,
@@ -247,8 +239,9 @@ class ODLTesting(unittest.TestCase):
def _test_run(self, status=testcase_base.TestcaseBase.EX_OK,
exception=None, odlip="127.0.0.3", odlwebport="8080"):
- with mock.patch('functest.utils.openstack_utils.get_keystone_client',
- return_value=self._get_fake_keystone_client()):
+ with mock.patch('functest.utils.openstack_utils.get_endpoint',
+ side_effect=[self._fake_url_for('identity'),
+ self._fake_url_for('network')]):
if exception:
self.test.main = mock.Mock(side_effect=exception)
else:
@@ -288,12 +281,10 @@ class ODLTesting(unittest.TestCase):
odlwebport=self._odl_webport)
def test_run_missing_sdn_controller_ip(self):
- with mock.patch('functest.utils.openstack_utils.get_keystone_client',
- return_value=self._get_fake_keystone_client()):
- ft_constants.CI_INSTALLER_TYPE = None
- ft_constants.SDN_CONTROLLER_IP = None
- self.assertEqual(self.test.run(),
- testcase_base.TestcaseBase.EX_RUN_ERROR)
+ ft_constants.CI_INSTALLER_TYPE = None
+ ft_constants.SDN_CONTROLLER_IP = None
+ self.assertEqual(self.test.run(),
+ testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_without_installer_type(self):
ft_constants.SDN_CONTROLLER_IP = self._sdn_controller_ip
@@ -308,12 +299,10 @@ class ODLTesting(unittest.TestCase):
odlip=self._neutron_ip, odlwebport='8282')
def test_run_apex_missing_sdn_controller_ip(self):
- with mock.patch('functest.utils.openstack_utils.get_keystone_client',
- return_value=self._get_fake_keystone_client()):
- ft_constants.CI_INSTALLER_TYPE = "apex"
- ft_constants.SDN_CONTROLLER_IP = None
- self.assertEqual(self.test.run(),
- testcase_base.TestcaseBase.EX_RUN_ERROR)
+ ft_constants.CI_INSTALLER_TYPE = "apex"
+ ft_constants.SDN_CONTROLLER_IP = None
+ self.assertEqual(self.test.run(),
+ testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_apex(self):
ft_constants.SDN_CONTROLLER_IP = self._sdn_controller_ip
@@ -322,12 +311,10 @@ class ODLTesting(unittest.TestCase):
odlip=self._sdn_controller_ip, odlwebport='8181')
def test_run_joid_missing_sdn_controller(self):
- with mock.patch('functest.utils.openstack_utils.get_keystone_client',
- return_value=self._get_fake_keystone_client()):
- ft_constants.CI_INSTALLER_TYPE = "joid"
- ft_constants.SDN_CONTROLLER = None
- self.assertEqual(self.test.run(),
- testcase_base.TestcaseBase.EX_RUN_ERROR)
+ ft_constants.CI_INSTALLER_TYPE = "joid"
+ ft_constants.SDN_CONTROLLER = None
+ self.assertEqual(self.test.run(),
+ testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_joid(self):
ft_constants.SDN_CONTROLLER = self._sdn_controller_ip
diff --git a/functest/utils/openstack_clean.py b/functest/utils/openstack_clean.py
index 949eee90..c08568bd 100755
--- a/functest/utils/openstack_clean.py
+++ b/functest/utils/openstack_clean.py
@@ -9,6 +9,8 @@
# - Neutron networks, subnets and ports
# - Routers
# - Users and tenants
+# - Tacker VNFDs and VNFs
+# - Tacker SFCs and SFC classifiers
#
# Author:
# jose.lausuch@ericsson.com
@@ -105,7 +107,7 @@ def remove_volumes(cinder_client, default_volumes):
for volume in volumes:
volume_id = getattr(volume, 'id')
- volume_name = getattr(volume, 'display_name')
+ volume_name = getattr(volume, 'name')
logger.debug("'%s', ID=%s " % (volume_name, volume_id))
if (volume_id not in default_volumes and
volume_name not in default_volumes.values()):
@@ -393,7 +395,7 @@ def main():
default_security_groups = snapshot_yaml.get('secgroups')
default_floatingips = snapshot_yaml.get('floatingips')
default_users = snapshot_yaml.get('users')
- default_tenants = snapshot_yaml.get('tenants')
+ # default_tenants = snapshot_yaml.get('tenants')
if not os_utils.check_credentials():
logger.error("Please source the openrc credentials and run "
@@ -414,8 +416,10 @@ def main():
separator()
remove_users(keystone_client, default_users)
separator()
- remove_tenants(keystone_client, default_tenants)
- separator()
+ # TODO (Helen) tenant does not exist in V3
+ # need to figure our anohter general verification point
+ # remove_tenants(keystone_client, default_tenants)
+ # separator()
if __name__ == '__main__':
diff --git a/functest/utils/openstack_snapshot.py b/functest/utils/openstack_snapshot.py
index 4be1af44..5b50ffa5 100755
--- a/functest/utils/openstack_snapshot.py
+++ b/functest/utils/openstack_snapshot.py
@@ -62,7 +62,7 @@ def get_volumes(cinder_client):
volumes = os_utils.get_volumes(cinder_client)
if volumes is not None:
for volume in volumes:
- dic_volumes.update({volume.id: volume.display_name})
+ dic_volumes.update({volume.id: volume.name})
return {'volumes': dic_volumes}
@@ -149,7 +149,7 @@ def main():
snapshot.update(get_security_groups(neutron_client))
snapshot.update(get_floatinips(nova_client))
snapshot.update(get_users(keystone_client))
- snapshot.update(get_tenants(keystone_client))
+ # snapshot.update(get_tenants(keystone_client))
with open(OS_SNAPSHOT_FILE, 'w+') as yaml_file:
yaml_file.write(yaml.safe_dump(snapshot, default_flow_style=False))
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py
index 8f698d73..711a62f6 100755
--- a/functest/utils/openstack_utils.py
+++ b/functest/utils/openstack_utils.py
@@ -14,16 +14,21 @@ import subprocess
import sys
import time
+from keystoneauth1 import loading
+from keystoneauth1 import session
from cinderclient import client as cinderclient
-import functest.utils.functest_logger as ft_logger
-import functest.utils.functest_utils as ft_utils
from glanceclient import client as glanceclient
-from keystoneclient.v2_0 import client as keystoneclient
-from neutronclient.v2_0 import client as neutronclient
from novaclient import client as novaclient
+from keystoneclient import client as keystoneclient
+from neutronclient.neutron import client as neutronclient
+
+import functest.utils.functest_logger as ft_logger
+import functest.utils.functest_utils as ft_utils
logger = ft_logger.Logger("openstack_utils").getLogger()
+DEFAULT_API_VERSION = '2'
+
# *********************************************
# CREDENTIALS
@@ -45,14 +50,8 @@ def check_credentials():
return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
-def get_credentials(service):
- """Returns a creds dictionary filled with the following keys:
- * username
- * password/api_key (depending on the service)
- * tenant_name/project_id (depending on the service)
- * auth_url
- :param service: a string indicating the name of the service
- requesting the credentials.
+def get_credentials():
+ """Returns a creds dictionary filled with parsed from env
"""
creds = {}
@@ -73,19 +72,11 @@ def get_credentials(service):
if os.getenv(envvar) is None:
raise MissingEnvVar(envvar)
- # Unfortunately, each of the OpenStack client will request slightly
- # different entries in their credentials dict.
- if service.lower() in ("nova", "cinder"):
- password = "api_key"
- tenant = "project_id"
- else:
- password = "password"
-
# The most common way to pass these info to the script is to do it through
# environment variables.
creds.update({
"username": os.environ.get("OS_USERNAME"),
- password: os.environ.get("OS_PASSWORD"),
+ "password": os.environ.get("OS_PASSWORD"),
"auth_url": os.environ.get("OS_AUTH_URL"),
tenant: os.environ.get(tenant_env)
})
@@ -132,7 +123,7 @@ def source_credentials(rc_file):
def get_credentials_for_rally():
- creds = get_credentials("keystone")
+ creds = get_credentials()
keystone_api_version = os.getenv('OS_IDENTITY_API_VERSION')
if (keystone_api_version is None or
keystone_api_version == '2'):
@@ -156,42 +147,95 @@ def get_credentials_for_rally():
return rally_conf
+def get_session_auth():
+ loader = loading.get_plugin_loader('password')
+ creds = get_credentials()
+ auth = loader.load_from_options(**creds)
+ return auth
+
+
+def get_endpoint(service_type, endpoint_type='publicURL'):
+ auth = get_session_auth()
+ return get_session().get_endpoint(auth=auth,
+ service_type=service_type,
+ endpoint_type=endpoint_type)
+
+
+def get_session():
+ auth = get_session_auth()
+ return session.Session(auth=auth)
+
+
# *********************************************
# CLIENTS
# *********************************************
+def get_keystone_client_version():
+ api_version = os.getenv('OS_IDENTITY_API_VERSION')
+ if api_version is not None:
+ logger.info("OS_IDENTITY_API_VERSION is set in env as '%s'",
+ api_version)
+ return api_version
+ return DEFAULT_API_VERSION
+
+
def get_keystone_client():
- creds_keystone = get_credentials("keystone")
- return keystoneclient.Client(**creds_keystone)
+ sess = get_session()
+ return keystoneclient.Client(get_keystone_client_version(), session=sess)
+
+
+def get_nova_client_version():
+ api_version = os.getenv('OS_COMPUTE_API_VERSION')
+ if api_version is not None:
+ logger.info("OS_COMPUTE_API_VERSION is set in env as '%s'",
+ api_version)
+ return api_version
+ return DEFAULT_API_VERSION
def get_nova_client():
- creds_nova = get_credentials("nova")
- return novaclient.Client('2', **creds_nova)
+ sess = get_session()
+ return novaclient.Client(get_nova_client_version(), session=sess)
+
+
+def get_cinder_client_version():
+ api_version = os.getenv('OS_VOLUME_API_VERSION')
+ if api_version is not None:
+ logger.info("OS_VOLUME_API_VERSION is set in env as '%s'",
+ api_version)
+ return api_version
+ return DEFAULT_API_VERSION
def get_cinder_client():
- creds_cinder = get_credentials("cinder")
- creds_cinder.update({
- "service_type": "volume"
- })
- return cinderclient.Client('2', **creds_cinder)
+ sess = get_session()
+ return cinderclient.Client(get_cinder_client_version(), session=sess)
+
+
+def get_neutron_client_version():
+ api_version = os.getenv('OS_NETWORK_API_VERSION')
+ if api_version is not None:
+ logger.info("OS_NETWORK_API_VERSION is set in env as '%s'",
+ api_version)
+ return api_version
+ return DEFAULT_API_VERSION
def get_neutron_client():
- creds_neutron = get_credentials("neutron")
- return neutronclient.Client(**creds_neutron)
+ sess = get_session()
+ return neutronclient.Client(get_neutron_client_version(), session=sess)
+
+
+def get_glance_client_version():
+ api_version = os.getenv('OS_IMAGE_API_VERSION')
+ if api_version is not None:
+ logger.info("OS_IMAGE_API_VERSION is set in env as '%s'", api_version)
+ return api_version
+ return DEFAULT_API_VERSION
def get_glance_client():
- keystone_client = get_keystone_client()
- glance_endpoint_type = 'publicURL'
- os_endpoint_type = os.getenv('OS_ENDPOINT_TYPE')
- if os_endpoint_type is not None:
- glance_endpoint_type = os_endpoint_type
- glance_endpoint = keystone_client.service_catalog.url_for(
- service_type='image', endpoint_type=glance_endpoint_type)
- return glanceclient.Client(1, glance_endpoint,
- token=keystone_client.auth_token)
+ sess = get_session()
+ return glanceclient.Client(get_glance_client_version(), session=sess)
# *********************************************
@@ -1070,38 +1114,29 @@ def get_image_id(glance_client, image_name):
def create_glance_image(glance_client, image_name, file_path, disk="qcow2",
- container="bare", public=True):
+ container="bare", public="public"):
if not os.path.isfile(file_path):
logger.error("Error: file %s does not exist." % file_path)
return None
try:
image_id = get_image_id(glance_client, image_name)
if image_id != '':
- if logger:
- logger.info("Image %s already exists." % image_name)
+ logger.info("Image %s already exists." % image_name)
else:
- if logger:
- logger.info("Creating image '%s' from '%s'..." % (image_name,
- file_path))
- try:
- properties = ft_utils.get_functest_config(
- 'general.image_properties')
- except ValueError:
- # image properties are not configured
- # therefore don't add any properties
- properties = {}
- with open(file_path) as fimage:
- image = glance_client.images.create(name=image_name,
- is_public=public,
- disk_format=disk,
- container_format=container,
- properties=properties,
- data=fimage)
+ logger.info("Creating image '%s' from '%s'..." % (image_name,
+ file_path))
+
+ image = glance_client.images.create(name=image_name,
+ visibility=public,
+ disk_format=disk,
+ container_format=container)
image_id = image.id
+ with open(file_path) as image_data:
+ glance_client.images.upload(image_id, image_data)
return image_id
except Exception, e:
logger.error("Error [create_glance_image(glance_client, '%s', '%s', "
- "'%s')]: %s" % (image_name, file_path, str(public), e))
+ "'%s')]: %s" % (image_name, file_path, public, e))
return None
diff --git a/run_unit_tests.sh b/run_unit_tests.sh
index ecd57d8a..71d21c9d 100755
--- a/run_unit_tests.sh
+++ b/run_unit_tests.sh
@@ -53,6 +53,7 @@ export CONFIG_FUNCTEST_YAML=$(pwd)/functest/ci/config_functest.yaml
nosetests --with-xunit \
--with-coverage \
--cover-erase \
+ --cover-tests \
--cover-package=functest.core.testcase_base \
--cover-package=functest.opnfv_tests.sdn.odl.odl \
--cover-xml \
span>(visitor.get_data()) def test_visitor(self): visitor = odl.ODLResultVisitor() data = {'name': 'foo', 'parent': 'bar', 'status': 'PASS', 'starttime': "20161216 16:00:00.000", 'endtime': "20161216 16:00:01.000", 'elapsedtime': 1000, 'text': 'Hello, World!', 'critical': True} test = testcase.TestCase(name=data['name'], status=data['status'], message=data['text'], starttime=data['starttime'], endtime=data['endtime']) test.parent = mock.Mock() config = {'name': data['parent'], 'criticality.test_is_critical.return_value': data[ 'critical']} test.parent.configure_mock(**config) visitor.visit_test(test) self.assertEqual(visitor.get_data(), [data]) @mock.patch('robot.api.ExecutionResult', side_effect=DataError) def test_parse_results_raises_exceptions(self, *args): with self.assertRaises(DataError): self.test.parse_results() def test_parse_results(self, *args): config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000', 'endtime': '20161216 16:00:01.000', 'status': 'PASS'} suite = mock.Mock() suite.configure_mock(**config) with mock.patch('robot.api.ExecutionResult', return_value=mock.Mock(suite=suite)): self.test.parse_results() self.assertEqual(self.test.criteria, config['status']) self.assertEqual(self.test.start_time, timestamp_to_secs(config['starttime'])) self.assertEqual(self.test.stop_time, timestamp_to_secs(config['endtime'])) self.assertEqual(self.test.details, {'description': config['name'], 'tests': []}) @mock.patch('fileinput.input', side_effect=Exception()) def test_set_robotframework_vars_failed(self, *args): self.assertFalse(self.test.set_robotframework_vars()) @mock.patch('fileinput.input', return_value=[]) def test_set_robotframework_vars_empty(self, args): self.assertTrue(self.test.set_robotframework_vars()) @mock.patch('sys.stdout', new_callable=StringIO.StringIO) def _test_set_robotframework_vars(self, msg1, msg2, *args): line = mock.MagicMock() line.__iter__.return_value = [msg1] with mock.patch('fileinput.input', return_value=line) as mock_method: self.assertTrue(self.test.set_robotframework_vars()) mock_method.assert_called_once_with( os.path.join(odl.ODLTests.odl_test_repo, 'csit/variables/Variables.py'), inplace=True) self.assertEqual(args[0].getvalue(), "{}\n".format(msg2)) def test_set_robotframework_vars_auth_default(self): self._test_set_robotframework_vars("AUTH = []", "AUTH = [u'admin', u'admin']") def test_set_robotframework_vars_auth1(self): self._test_set_robotframework_vars("AUTH1 = []", "AUTH1 = []") @mock.patch('sys.stdout', new_callable=StringIO.StringIO) def test_set_robotframework_vars_auth_foo(self, *args): line = mock.MagicMock() line.__iter__.return_value = ["AUTH = []"] with mock.patch('fileinput.input', return_value=line) as mock_method: self.assertTrue(self.test.set_robotframework_vars('foo', 'bar')) mock_method.assert_called_once_with( os.path.join(odl.ODLTests.odl_test_repo, 'csit/variables/Variables.py'), inplace=True) self.assertEqual(args[0].getvalue(), "AUTH = [u'{}', u'{}']\n".format('foo', 'bar')) @classmethod def _fake_url_for(cls, service_type='identity', **kwargs): if service_type == 'identity': return "http://{}:5000/v2.0".format( ODLTesting._keystone_ip) elif service_type == 'network': return "http://{}:9696".format(ODLTesting._neutron_ip) else: return None def _get_main_kwargs(self, key=None): kwargs = {'odlusername': self._odl_username, 'odlpassword': self._odl_password, 'keystoneip': self._keystone_ip, 'neutronip': self._neutron_ip, 'osusername': self._os_username, 'ostenantname': self._os_tenantname, 'ospassword': self._os_password, 'odlip': self._sdn_controller_ip, 'odlwebport': self._odl_webport, 'odlrestconfport': self._odl_restconfport} if key: del kwargs[key] return kwargs def _test_main(self, status, *args): kwargs = self._get_main_kwargs() self.assertEqual(self.test.main(**kwargs), status) if len(args) > 0: args[0].assert_called_once_with( odl.ODLTests.res_dir) if len(args) > 1: variable = ['KEYSTONE:{}'.format(self._keystone_ip), 'NEUTRON:{}'.format(self._neutron_ip), 'OSUSERNAME:"{}"'.format(self._os_username), 'OSTENANTNAME:"{}"'.format(self._os_tenantname), 'OSPASSWORD:"{}"'.format(self._os_password), 'ODL_SYSTEM_IP:{}'.format(self._sdn_controller_ip), 'PORT:{}'.format(self._odl_webport), 'RESTCONFPORT:{}'.format(self._odl_restconfport)] args[1].assert_called_once_with( odl.ODLTests.basic_suite_dir, odl.ODLTests.neutron_suite_dir, log='NONE', output=os.path.join(odl.ODLTests.res_dir, 'output.xml'), report='NONE', stdout=mock.ANY, variable=variable) if len(args) > 2: args[2].assert_called_with( os.path.join(odl.ODLTests.res_dir, 'stdout.txt')) def _test_main_missing_keyword(self, key): kwargs = self._get_main_kwargs(key) self.assertEqual(self.test.main(**kwargs), testcase_base.TestcaseBase.EX_RUN_ERROR) def test_main_missing_odlusername(self): self._test_main_missing_keyword('odlusername') def test_main_missing_odlpassword(self): self._test_main_missing_keyword('odlpassword') def test_main_missing_keystoneip(self): self._test_main_missing_keyword('keystoneip') def test_main_missing_neutronip(self): self._test_main_missing_keyword('neutronip') def test_main_missing_osusername(self): self._test_main_missing_keyword('osusername') def test_main_missing_ostenantname(self): self._test_main_missing_keyword('ostenantname') def test_main_missing_ospassword(self): self._test_main_missing_keyword('ospassword') def test_main_missing_odlip(self): self._test_main_missing_keyword('odlip') def test_main_missing_odlwebport(self): self._test_main_missing_keyword('odlwebport') def test_main_missing_odlrestconfport(self): self._test_main_missing_keyword('odlrestconfport') def test_main_set_robotframework_vars_failed(self): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=False): self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR) self.test.set_robotframework_vars.assert_called_once_with( self._odl_username, self._odl_password) @mock.patch('os.makedirs', side_effect=Exception) def test_main_makedirs_exception(self, mock_method): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ self.assertRaises(Exception): self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR, mock_method) @mock.patch('os.makedirs', side_effect=OSError) def test_main_makedirs_oserror(self, mock_method): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True): self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR, mock_method) @mock.patch('robot.run', side_effect=RobotError) @mock.patch('os.makedirs') def test_main_robot_run_failed(self, *args): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ mock.patch.object(odl, 'open', mock.mock_open(), create=True), \ self.assertRaises(RobotError): self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR, *args) @mock.patch('robot.run') @mock.patch('os.makedirs') def test_main_parse_results_failed(self, *args): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ mock.patch.object(odl, 'open', mock.mock_open(), create=True), \ mock.patch.object(self.test, 'parse_results', side_effect=RobotError): self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR, *args) @mock.patch('os.remove', side_effect=Exception) @mock.patch('robot.run') @mock.patch('os.makedirs') def test_main_remove_exception(self, *args): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ mock.patch.object(self.test, 'parse_results'), \ self.assertRaises(Exception): self._test_main(testcase_base.TestcaseBase.EX_OK, *args) @mock.patch('os.remove') @mock.patch('robot.run') @mock.patch('os.makedirs') def test_main(self, *args): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ mock.patch.object(odl, 'open', mock.mock_open(), create=True), \ mock.patch.object(self.test, 'parse_results'): self._test_main(testcase_base.TestcaseBase.EX_OK, *args) @mock.patch('os.remove') @mock.patch('robot.run') @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, '')) def test_main_makedirs_oserror17(self, *args): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ mock.patch.object(odl, 'open', mock.mock_open(), create=True), \ mock.patch.object(self.test, 'parse_results'): self._test_main(testcase_base.TestcaseBase.EX_OK, *args) @mock.patch('os.remove') @mock.patch('robot.run', return_value=1) @mock.patch('os.makedirs') def test_main_testcases_in_failure(self, *args): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ mock.patch.object(odl, 'open', mock.mock_open(), create=True), \ mock.patch.object(self.test, 'parse_results'): self._test_main(testcase_base.TestcaseBase.EX_OK, *args) @mock.patch('os.remove', side_effect=OSError) @mock.patch('robot.run') @mock.patch('os.makedirs') def test_main_remove_oserror(self, *args): with mock.patch.object(self.test, 'set_robotframework_vars', return_value=True), \ mock.patch.object(odl, 'open', mock.mock_open(), create=True), \ mock.patch.object(self.test, 'parse_results'): self._test_main(testcase_base.TestcaseBase.EX_OK, *args) def _test_run_missing_env_var(self, var): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=self._fake_url_for): del os.environ[var] self.assertEqual(self.test.run(), testcase_base.TestcaseBase.EX_RUN_ERROR) def _test_run(self, status=testcase_base.TestcaseBase.EX_OK, exception=None, odlip="127.0.0.3", odlwebport="8080", odlrestconfport="8181"): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=self._fake_url_for): if exception: self.test.main = mock.Mock(side_effect=exception) else: self.test.main = mock.Mock(return_value=status) self.assertEqual(self.test.run(), status) self.test.main.assert_called_once_with( odl.ODLTests.default_suites, keystoneip=self._keystone_ip, neutronip=self._neutron_ip, odlip=odlip, odlpassword=self._odl_password, odlrestconfport=odlrestconfport, odlusername=self._odl_username, odlwebport=odlwebport, ospassword=self._os_password, ostenantname=self._os_tenantname, osusername=self._os_username) def _test_run_defining_multiple_suites( self, suites, status=testcase_base.TestcaseBase.EX_OK, exception=None, odlip="127.0.0.3", odlwebport="8080", odlrestconfport="8181"): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=self._fake_url_for): if exception: self.test.main = mock.Mock(side_effect=exception) else: self.test.main = mock.Mock(return_value=status) self.assertEqual(self.test.run(suites=suites), status) self.test.main.assert_called_once_with( suites, keystoneip=self._keystone_ip, neutronip=self._neutron_ip, odlip=odlip, odlpassword=self._odl_password, odlrestconfport=odlrestconfport, odlusername=self._odl_username, odlwebport=odlwebport, ospassword=self._os_password, ostenantname=self._os_tenantname, osusername=self._os_username) def test_run_exception(self): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=auth_plugins.MissingAuthPlugin()): self.assertEqual(self.test.run(), testcase_base.TestcaseBase.EX_RUN_ERROR) def test_run_missing_os_username(self): self._test_run_missing_env_var("OS_USERNAME") def test_run_missing_os_password(self): self._test_run_missing_env_var("OS_PASSWORD") def test_run_missing_os_tenant_name(self): self._test_run_missing_env_var("OS_TENANT_NAME") def test_run_main_false(self): os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip self._test_run(testcase_base.TestcaseBase.EX_RUN_ERROR, odlip=self._sdn_controller_ip, odlwebport=self._odl_webport) def test_run_main_exception(self): with self.assertRaises(Exception): os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip self._test_run(status=testcase_base.TestcaseBase.EX_RUN_ERROR, exception=Exception(), odlip=self._sdn_controller_ip, odlwebport=self._odl_webport) def test_run_missing_sdn_controller_ip(self): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=self._fake_url_for): self.assertEqual(self.test.run(), testcase_base.TestcaseBase.EX_RUN_ERROR) def test_run_without_installer_type(self): os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip self._test_run(testcase_base.TestcaseBase.EX_OK, odlip=self._sdn_controller_ip, odlwebport=self._odl_webport) def test_run_redefining_suites(self): os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip self._test_run_defining_multiple_suites( [odl.ODLTests.basic_suite_dir], testcase_base.TestcaseBase.EX_OK, odlip=self._sdn_controller_ip, odlwebport=self._odl_webport) def test_run_fuel(self): os.environ["INSTALLER_TYPE"] = "fuel" self._test_run(testcase_base.TestcaseBase.EX_OK, odlip=self._neutron_ip, odlwebport='8282') def test_run_apex_missing_sdn_controller_ip(self): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=self._fake_url_for): os.environ["INSTALLER_TYPE"] = "apex" self.assertEqual(self.test.run(), testcase_base.TestcaseBase.EX_RUN_ERROR) def test_run_apex(self): os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip os.environ["INSTALLER_TYPE"] = "apex" self._test_run(testcase_base.TestcaseBase.EX_OK, odlip=self._sdn_controller_ip, odlwebport='8081', odlrestconfport='8081') def test_run_netvirt_missing_sdn_controller_ip(self): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=self._fake_url_for): os.environ["INSTALLER_TYPE"] = "netvirt" self.assertEqual(self.test.run(), testcase_base.TestcaseBase.EX_RUN_ERROR) def test_run_netvirt(self): os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip os.environ["INSTALLER_TYPE"] = "netvirt" self._test_run(testcase_base.TestcaseBase.EX_OK, odlip=self._sdn_controller_ip, odlwebport='8081', odlrestconfport='8081') def test_run_joid_missing_sdn_controller(self): with mock.patch('functest.utils.openstack_utils.get_endpoint', side_effect=self._fake_url_for): os.environ["INSTALLER_TYPE"] = "joid" self.assertEqual(self.test.run(), testcase_base.TestcaseBase.EX_RUN_ERROR) def test_run_joid(self): os.environ["SDN_CONTROLLER"] = self._sdn_controller_ip os.environ["INSTALLER_TYPE"] = "joid" self._test_run(testcase_base.TestcaseBase.EX_OK, odlip=self._sdn_controller_ip, odlwebport='8080') def test_run_compass(self, *args): os.environ["INSTALLER_TYPE"] = "compass" self._test_run(testcase_base.TestcaseBase.EX_OK, odlip=self._neutron_ip, odlwebport='8181') def test_argparser_default(self): parser = odl.ODLParser() self.assertEqual(parser.parse_args(), self.defaultargs) def test_argparser_basic(self): self.defaultargs['neutronip'] = self._neutron_ip self.defaultargs['odlip'] = self._sdn_controller_ip parser = odl.ODLParser() self.assertEqual(parser.parse_args( ["--neutronip={}".format(self._neutron_ip), "--odlip={}".format(self._sdn_controller_ip) ]), self.defaultargs) @mock.patch('sys.stderr', new_callable=StringIO.StringIO) def test_argparser_fail(self, *args): self.defaultargs['foo'] = 'bar' parser = odl.ODLParser() with self.assertRaises(SystemExit): parser.parse_args(["--foo=bar"]) def _test_argparser(self, arg, value): self.defaultargs[arg] = value parser = odl.ODLParser() self.assertEqual(parser.parse_args(["--{}={}".format(arg, value)]), self.defaultargs) def test_argparser_odlusername(self): self._test_argparser('odlusername', 'foo') def test_argparser_odlpassword(self): self._test_argparser('odlpassword', 'foo') def test_argparser_keystoneip(self): self._test_argparser('keystoneip', '127.0.0.4') def test_argparser_neutronip(self): self._test_argparser('neutronip', '127.0.0.4') def test_argparser_osusername(self): self._test_argparser('osusername', 'foo') def test_argparser_ostenantname(self): self._test_argparser('ostenantname', 'foo') def test_argparser_ospassword(self): self._test_argparser('ospassword', 'foo') def test_argparser_odlip(self): self._test_argparser('odlip', '127.0.0.4') def test_argparser_odlwebport(self): self._test_argparser('odlwebport', '80') def test_argparser_odlrestconfport(self): self._test_argparser('odlrestconfport', '80') def test_argparser_pushtodb(self): self.defaultargs['pushtodb'] = True parser = odl.ODLParser() self.assertEqual(parser.parse_args(["--{}".format('pushtodb')]), self.defaultargs) def test_argparser_multiple_args(self): self.defaultargs['neutronip'] = self._neutron_ip self.defaultargs['odlip'] = self._sdn_controller_ip parser = odl.ODLParser() self.assertEqual(parser.parse_args( ["--neutronip={}".format(self._neutron_ip), "--odlip={}".format(self._sdn_controller_ip) ]), self.defaultargs) if __name__ == "__main__": unittest.main(verbosity=2)