diff options
Diffstat (limited to 'functest')
-rw-r--r-- | functest/tests/unit/utils/test_openstack_utils.py | 1784 | ||||
-rw-r--r-- | functest/utils/openstack_utils.py | 1486 |
2 files changed, 0 insertions, 3270 deletions
diff --git a/functest/tests/unit/utils/test_openstack_utils.py b/functest/tests/unit/utils/test_openstack_utils.py deleted file mode 100644 index 259e60d5..00000000 --- a/functest/tests/unit/utils/test_openstack_utils.py +++ /dev/null @@ -1,1784 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import copy -import logging -import os -import unittest - -import mock - -from functest.utils import openstack_utils - - -class OSUtilsTesting(unittest.TestCase): - - def _get_env_cred_dict(self, os_prefix=''): - return {'OS_USERNAME': os_prefix + 'username', - 'OS_PASSWORD': os_prefix + 'password', - 'OS_AUTH_URL': os_prefix + 'auth_url', - 'OS_TENANT_NAME': os_prefix + 'tenant_name', - 'OS_USER_DOMAIN_NAME': os_prefix + 'user_domain_name', - 'OS_PROJECT_DOMAIN_NAME': os_prefix + 'project_domain_name', - 'OS_PROJECT_NAME': os_prefix + 'project_name', - 'OS_ENDPOINT_TYPE': os_prefix + 'endpoint_type', - 'OS_REGION_NAME': os_prefix + 'region_name', - 'OS_CACERT': os_prefix + 'https_cacert', - 'OS_INSECURE': os_prefix + 'https_insecure'} - - def _get_os_env_vars(self): - return {'username': 'test_username', 'password': 'test_password', - 'auth_url': 'test_auth_url', 'tenant_name': 'test_tenant_name', - 'user_domain_name': 'test_user_domain_name', - 'project_domain_name': 'test_project_domain_name', - 'project_name': 'test_project_name', - 'endpoint_type': 'test_endpoint_type', - 'region_name': 'test_region_name', - 'https_cacert': 'test_https_cacert', - 'https_insecure': 'test_https_insecure'} - - def setUp(self): - self.env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD'] - self.tenant_name = 'test_tenant_name' - self.env_cred_dict = self._get_env_cred_dict() - self.os_environs = self._get_env_cred_dict(os_prefix='test_') - self.os_env_vars = self._get_os_env_vars() - - mock_obj = mock.Mock() - attrs = {'name': 'test_flavor', - 'id': 'flavor_id', - 'ram': 2} - mock_obj.configure_mock(**attrs) - self.flavor = mock_obj - - mock_obj = mock.Mock() - attrs = {'name': 'test_aggregate', - 'id': 'aggregate_id', - 'hosts': ['host_name']} - mock_obj.configure_mock(**attrs) - self.aggregate = mock_obj - - mock_obj = mock.Mock() - attrs = {'id': 'instance_id', - 'name': 'test_instance', - 'status': 'ok'} - mock_obj.configure_mock(**attrs) - self.instance = mock_obj - - mock_obj = mock.Mock() - attrs = {'id': 'azone_id', - 'zoneName': 'test_azone', - 'status': 'ok'} - mock_obj.configure_mock(**attrs) - self.availability_zone = mock_obj - - mock_obj = mock.Mock() - attrs = {'floating_network_id': 'floating_id', - 'floating_ip_address': 'test_floating_ip'} - mock_obj.configure_mock(**attrs) - self.floating_ip = mock_obj - - mock_obj = mock.Mock() - attrs = {'id': 'hypervisor_id', - 'hypervisor_hostname': 'test_hostname', - 'state': 'up'} - mock_obj.configure_mock(**attrs) - self.hypervisor = mock_obj - - mock_obj = mock.Mock() - attrs = {'id': 'image_id', - 'name': 'test_image'} - mock_obj.configure_mock(**attrs) - self.image = mock_obj - - mock_obj = mock.Mock() - self.mock_return = mock_obj - - self.nova_client = mock.Mock() - attrs = {'servers.list.return_value': [self.instance], - 'servers.get.return_value': self.instance, - 'servers.find.return_value': self.instance, - 'servers.create.return_value': self.instance, - 'flavors.list.return_value': [self.flavor], - 'flavors.find.return_value': self.flavor, - 'servers.add_floating_ip.return_value': mock.Mock(), - 'servers.force_delete.return_value': mock.Mock(), - 'aggregates.list.return_value': [self.aggregate], - 'aggregates.add_host.return_value': mock.Mock(), - 'aggregates.remove_host.return_value': mock.Mock(), - 'aggregates.get.return_value': self.aggregate, - 'aggregates.delete.return_value': mock.Mock(), - 'availability_zones.list.return_value': - [self.availability_zone], - 'hypervisors.list.return_value': [self.hypervisor], - 'create.return_value': mock.Mock(), - 'add_security_group.return_value': mock.Mock(), - 'images.list.return_value': [self.image], - 'images.delete.return_value': mock.Mock(), - } - self.nova_client.configure_mock(**attrs) - - self.glance_client = mock.Mock() - attrs = {'images.list.return_value': [self.image], - 'images.create.return_value': self.image, - 'images.upload.return_value': mock.Mock()} - self.glance_client.configure_mock(**attrs) - - mock_obj = mock.Mock() - attrs = {'id': 'volume_id', - 'name': 'test_volume'} - mock_obj.configure_mock(**attrs) - self.volume = mock_obj - - self.cinder_client = mock.Mock() - attrs = {'volumes.list.return_value': [self.volume], - 'quotas.update.return_value': mock.Mock(), - 'volumes.detach.return_value': mock.Mock(), - 'volumes.force_delete.return_value': mock.Mock(), - 'volumes.delete.return_value': mock.Mock() - } - self.cinder_client.configure_mock(**attrs) - - self.resource = mock.Mock() - attrs = {'id': 'resource_test_id', - 'name': 'resource_test_name' - } - - self.heat_client = mock.Mock() - attrs = {'resources.get.return_value': self.resource} - self.heat_client.configure_mock(**attrs) - - mock_obj = mock.Mock() - attrs = {'id': 'tenant_id', - 'name': 'test_tenant'} - mock_obj.configure_mock(**attrs) - self.tenant = mock_obj - - mock_obj = mock.Mock() - attrs = {'id': 'user_id', - 'name': 'test_user'} - mock_obj.configure_mock(**attrs) - self.user = mock_obj - - mock_obj = mock.Mock() - attrs = {'id': 'role_id', - 'name': 'test_role'} - mock_obj.configure_mock(**attrs) - self.role = mock_obj - - mock_obj = mock.Mock() - attrs = {'id': 'domain_id', - 'name': 'test_domain'} - mock_obj.configure_mock(**attrs) - self.domain = mock_obj - - self.keystone_client = mock.Mock() - attrs = {'projects.list.return_value': [self.tenant], - 'tenants.list.return_value': [self.tenant], - 'users.list.return_value': [self.user], - 'roles.list.return_value': [self.role], - 'domains.list.return_value': [self.domain], - 'projects.create.return_value': self.tenant, - 'tenants.create.return_value': self.tenant, - 'users.create.return_value': self.user, - 'roles.grant.return_value': mock.Mock(), - 'roles.add_user_role.return_value': mock.Mock(), - 'projects.delete.return_value': mock.Mock(), - 'tenants.delete.return_value': mock.Mock(), - 'users.delete.return_value': mock.Mock(), - } - self.keystone_client.configure_mock(**attrs) - - self.router = {'id': 'router_id', - 'name': 'test_router'} - - self.subnet = {'id': 'subnet_id', - 'name': 'test_subnet'} - - self.networks = [{'id': 'network_id', - 'name': 'test_network', - 'router:external': False, - 'shared': True, - 'subnets': [self.subnet]}, - {'id': 'network_id1', - 'name': 'test_network1', - 'router:external': True, - 'shared': True, - 'subnets': [self.subnet]}] - - self.port = {'id': 'port_id', - 'name': 'test_port'} - - self.sec_group = {'id': 'sec_group_id', - 'name': 'test_sec_group'} - - self.sec_group_rule = {'id': 'sec_group_rule_id', - 'direction': 'direction', - 'protocol': 'protocol', - 'port_range_max': 'port_max', - 'security_group_id': self.sec_group['id'], - 'port_range_min': 'port_min'} - self.neutron_floatingip = {'id': 'fip_id', - 'floating_ip_address': 'test_ip'} - self.neutron_client = mock.Mock() - attrs = {'list_networks.return_value': {'networks': self.networks}, - 'list_routers.return_value': {'routers': [self.router]}, - 'list_ports.return_value': {'ports': [self.port]}, - 'list_subnets.return_value': {'subnets': [self.subnet]}, - 'create_network.return_value': {'network': self.networks[0]}, - 'create_subnet.return_value': {'subnets': [self.subnet]}, - 'create_router.return_value': {'router': self.router}, - 'create_port.return_value': {'port': self.port}, - 'create_floatingip.return_value': {'floatingip': - self.neutron_floatingip}, - 'update_network.return_value': mock.Mock(), - 'update_port.return_value': {'port': self.port}, - 'add_interface_router.return_value': mock.Mock(), - 'add_gateway_router.return_value': mock.Mock(), - 'delete_network.return_value': mock.Mock(), - 'delete_subnet.return_value': mock.Mock(), - 'delete_router.return_value': mock.Mock(), - 'delete_port.return_value': mock.Mock(), - 'remove_interface_router.return_value': mock.Mock(), - 'remove_gateway_router.return_value': mock.Mock(), - 'list_security_groups.return_value': {'security_groups': - [self.sec_group]}, - 'list_security_group_rules.' - 'return_value': {'security_group_rules': - [self.sec_group_rule]}, - 'create_security_group_rule.return_value': mock.Mock(), - 'create_security_group.return_value': {'security_group': - self.sec_group}, - 'update_quota.return_value': mock.Mock(), - 'delete_security_group.return_value': mock.Mock(), - 'list_floatingips.return_value': {'floatingips': - [self.floating_ip]}, - 'delete_floatingip.return_value': mock.Mock(), - } - self.neutron_client.configure_mock(**attrs) - - self.empty_client = mock.Mock() - attrs = {'list_networks.return_value': {'networks': []}, - 'list_routers.return_value': {'routers': []}, - 'list_ports.return_value': {'ports': []}, - 'list_subnets.return_value': {'subnets': []}} - self.empty_client.configure_mock(**attrs) - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value=None) - def test_is_keystone_v3_missing_identity(self, mock_os_getenv): - self.assertEqual(openstack_utils.is_keystone_v3(), False) - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='3') - def test_is_keystone_v3_default(self, mock_os_getenv): - self.assertEqual(openstack_utils.is_keystone_v3(), True) - - @mock.patch('functest.utils.openstack_utils.is_keystone_v3', - return_value=False) - def test_get_rc_env_vars_missing_identity(self, mock_get_rc_env): - exp_resp = self.env_vars - exp_resp.extend(['OS_TENANT_NAME']) - self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp) - - @mock.patch('functest.utils.openstack_utils.is_keystone_v3', - return_value=True) - def test_get_rc_env_vars_default(self, mock_get_rc_env): - exp_resp = self.env_vars - exp_resp.extend(['OS_PROJECT_NAME', - 'OS_USER_DOMAIN_NAME', - 'OS_PROJECT_DOMAIN_NAME']) - self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp) - - @mock.patch('functest.utils.openstack_utils.get_rc_env_vars') - def test_check_credentials_missing_env(self, mock_get_rc_env): - exp_resp = self.env_vars - exp_resp.extend(['OS_TENANT_NAME']) - mock_get_rc_env.return_value = exp_resp - with mock.patch.dict('functest.utils.openstack_utils.os.environ', {}, - clear=True): - self.assertEqual(openstack_utils.check_credentials(), False) - - @mock.patch('functest.utils.openstack_utils.get_rc_env_vars') - def test_check_credentials_default(self, mock_get_rc_env): - exp_resp = ['OS_TENANT_NAME'] - mock_get_rc_env.return_value = exp_resp - with mock.patch.dict('functest.utils.openstack_utils.os.environ', - {'OS_TENANT_NAME': self.tenant_name}, - clear=True): - self.assertEqual(openstack_utils.check_credentials(), True) - - def test_get_env_cred_dict(self): - self.assertDictEqual(openstack_utils.get_env_cred_dict(), - self.env_cred_dict) - - @mock.patch('functest.utils.openstack_utils.get_rc_env_vars') - def test_get_credentials_default(self, mock_get_rc_env): - mock_get_rc_env.return_value = self.env_cred_dict.keys() - with mock.patch.dict('functest.utils.openstack_utils.os.environ', - self.os_environs, - clear=True): - self.assertDictEqual(openstack_utils.get_credentials(), - self.os_env_vars) - - def _get_credentials_missing_env(self, var): - dic = copy.deepcopy(self.os_environs) - dic.pop(var) - with mock.patch('functest.utils.openstack_utils.get_rc_env_vars', - return_value=self.env_cred_dict.keys()), \ - mock.patch.dict('functest.utils.openstack_utils.os.environ', - dic, - clear=True): - self.assertRaises(openstack_utils.MissingEnvVar, - lambda: openstack_utils.get_credentials()) - - def test_get_credentials_missing_username(self): - self._get_credentials_missing_env('OS_USERNAME') - - def test_get_credentials_missing_password(self): - self._get_credentials_missing_env('OS_PASSWORD') - - def test_get_credentials_missing_auth_url(self): - self._get_credentials_missing_env('OS_AUTH_URL') - - def test_get_credentials_missing_tenantname(self): - self._get_credentials_missing_env('OS_TENANT_NAME') - - def test_get_credentials_missing_domainname(self): - self._get_credentials_missing_env('OS_USER_DOMAIN_NAME') - - def test_get_credentials_missing_projectname(self): - self._get_credentials_missing_env('OS_PROJECT_NAME') - - def test_get_credentials_missing_endpoint_type(self): - self._get_credentials_missing_env('OS_ENDPOINT_TYPE') - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value=None) - def test_get_keystone_client_version_missing_env(self, mock_os_getenv): - self.assertEqual(openstack_utils.get_keystone_client_version(), - openstack_utils.DEFAULT_API_VERSION) - - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='3') - def test_get_keystone_client_version_default(self, mock_os_getenv, - mock_logger_info): - self.assertEqual(openstack_utils.get_keystone_client_version(), - '3') - mock_logger_info.assert_called_once_with("OS_IDENTITY_API_VERSION is " - "set in env as '%s'", '3') - - @mock.patch('functest.utils.openstack_utils.get_session') - @mock.patch('functest.utils.openstack_utils.keystoneclient.Client') - @mock.patch('functest.utils.openstack_utils.get_keystone_client_version', - return_value='3') - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='public') - def test_get_keystone_client_with_interface(self, mock_os_getenv, - mock_keystoneclient_version, - mock_key_client, - mock_get_session): - mock_keystone_obj = mock.Mock() - mock_session_obj = mock.Mock() - mock_key_client.return_value = mock_keystone_obj - mock_get_session.return_value = mock_session_obj - self.assertEqual(openstack_utils.get_keystone_client(), - mock_keystone_obj) - mock_key_client.assert_called_once_with('3', - session=mock_session_obj, - interface='public') - - @mock.patch('functest.utils.openstack_utils.get_session') - @mock.patch('functest.utils.openstack_utils.keystoneclient.Client') - @mock.patch('functest.utils.openstack_utils.get_keystone_client_version', - return_value='3') - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='admin') - def test_get_keystone_client_no_interface(self, mock_os_getenv, - mock_keystoneclient_version, - mock_key_client, - mock_get_session): - mock_keystone_obj = mock.Mock() - mock_session_obj = mock.Mock() - mock_key_client.return_value = mock_keystone_obj - mock_get_session.return_value = mock_session_obj - self.assertEqual(openstack_utils.get_keystone_client(), - mock_keystone_obj) - mock_key_client.assert_called_once_with('3', - session=mock_session_obj, - interface='admin') - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value=None) - def test_get_nova_client_version_missing_env(self, mock_os_getenv): - self.assertEqual(openstack_utils.get_nova_client_version(), - openstack_utils.DEFAULT_API_VERSION) - - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='3') - def test_get_nova_client_version_default(self, mock_os_getenv, - mock_logger_info): - self.assertEqual(openstack_utils.get_nova_client_version(), - '3') - mock_logger_info.assert_called_once_with("OS_COMPUTE_API_VERSION is " - "set in env as '%s'", '3') - - def test_get_nova_client(self): - mock_nova_obj = mock.Mock() - mock_session_obj = mock.Mock() - with mock.patch('functest.utils.openstack_utils' - '.get_nova_client_version', return_value='3'), \ - mock.patch('functest.utils.openstack_utils' - '.novaclient.Client', - return_value=mock_nova_obj) \ - as mock_nova_client, \ - mock.patch('functest.utils.openstack_utils.get_session', - return_value=mock_session_obj): - self.assertEqual(openstack_utils.get_nova_client(), - mock_nova_obj) - mock_nova_client.assert_called_once_with('3', - session=mock_session_obj) - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value=None) - def test_get_cinder_client_version_missing_env(self, mock_os_getenv): - self.assertEqual(openstack_utils.get_cinder_client_version(), - openstack_utils.DEFAULT_API_VERSION) - - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='3') - def test_get_cinder_client_version_default(self, mock_os_getenv, - mock_logger_info): - self.assertEqual(openstack_utils.get_cinder_client_version(), - '3') - mock_logger_info.assert_called_once_with("OS_VOLUME_API_VERSION is " - "set in env as '%s'", '3') - - def test_get_cinder_client(self): - mock_cinder_obj = mock.Mock() - mock_session_obj = mock.Mock() - with mock.patch('functest.utils.openstack_utils' - '.get_cinder_client_version', return_value='3'), \ - mock.patch('functest.utils.openstack_utils' - '.cinderclient.Client', - return_value=mock_cinder_obj) \ - as mock_cind_client, \ - mock.patch('functest.utils.openstack_utils.get_session', - return_value=mock_session_obj): - self.assertEqual(openstack_utils.get_cinder_client(), - mock_cinder_obj) - mock_cind_client.assert_called_once_with('3', - session=mock_session_obj) - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value=None) - def test_get_neutron_client_version_missing_env(self, mock_os_getenv): - self.assertEqual(openstack_utils.get_neutron_client_version(), - openstack_utils.DEFAULT_API_VERSION) - - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='3') - def test_get_neutron_client_version_default(self, mock_os_getenv, - mock_logger_info): - self.assertEqual(openstack_utils.get_neutron_client_version(), - '3') - mock_logger_info.assert_called_once_with("OS_NETWORK_API_VERSION is " - "set in env as '%s'", '3') - - def test_get_neutron_client(self): - mock_neutron_obj = mock.Mock() - mock_session_obj = mock.Mock() - with mock.patch('functest.utils.openstack_utils' - '.get_neutron_client_version', return_value='3'), \ - mock.patch('functest.utils.openstack_utils' - '.neutronclient.Client', - return_value=mock_neutron_obj) \ - as mock_neut_client, \ - mock.patch('functest.utils.openstack_utils.get_session', - return_value=mock_session_obj): - self.assertEqual(openstack_utils.get_neutron_client(), - mock_neutron_obj) - mock_neut_client.assert_called_once_with('3', - session=mock_session_obj) - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value=None) - def test_get_glance_client_version_missing_env(self, mock_os_getenv): - self.assertEqual(openstack_utils.get_glance_client_version(), - openstack_utils.DEFAULT_API_VERSION) - - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value='3') - def test_get_glance_client_version_default(self, mock_os_getenv, - mock_logger_info): - self.assertEqual(openstack_utils.get_glance_client_version(), - '3') - mock_logger_info.assert_called_once_with("OS_IMAGE_API_VERSION is " - "set in env as '%s'", '3') - - def test_get_glance_client(self): - mock_glance_obj = mock.Mock() - mock_session_obj = mock.Mock() - with mock.patch('functest.utils.openstack_utils' - '.get_glance_client_version', return_value='3'), \ - mock.patch('functest.utils.openstack_utils' - '.glanceclient.Client', - return_value=mock_glance_obj) \ - as mock_glan_client, \ - mock.patch('functest.utils.openstack_utils.get_session', - return_value=mock_session_obj): - self.assertEqual(openstack_utils.get_glance_client(), - mock_glance_obj) - mock_glan_client.assert_called_once_with('3', - session=mock_session_obj) - - @mock.patch('functest.utils.openstack_utils.os.getenv', - return_value=None) - def test_get_heat_client_version_missing_env(self, mock_os_getenv): - self.assertEqual(openstack_utils.get_heat_client_version(), - openstack_utils.DEFAULT_HEAT_API_VERSION) - - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.os.getenv', return_value='1') - def test_get_heat_client_version_default(self, mock_os_getenv, - mock_logger_info): - self.assertEqual(openstack_utils.get_heat_client_version(), '1') - mock_logger_info.assert_called_once_with( - "OS_ORCHESTRATION_API_VERSION is set in env as '%s'", '1') - - def test_get_heat_client(self): - mock_heat_obj = mock.Mock() - mock_session_obj = mock.Mock() - with mock.patch('functest.utils.openstack_utils' - '.get_heat_client_version', return_value='1'), \ - mock.patch('functest.utils.openstack_utils' - '.heatclient.Client', - return_value=mock_heat_obj) \ - as mock_heat_client, \ - mock.patch('functest.utils.openstack_utils.get_session', - return_value=mock_session_obj): - self.assertEqual(openstack_utils.get_heat_client(), - mock_heat_obj) - mock_heat_client.assert_called_once_with('1', - session=mock_session_obj) - - def test_get_instances_default(self): - self.assertEqual(openstack_utils.get_instances(self.nova_client), - [self.instance]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_instances_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_instances(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_instance_status_default(self): - self.assertEqual(openstack_utils.get_instance_status(self.nova_client, - self.instance), - 'ok') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_instance_status_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_instance_status(Exception, - self.instance), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_instance_by_name_default(self): - self.assertEqual(openstack_utils. - get_instance_by_name(self.nova_client, - 'test_instance'), - self.instance) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_instance_by_name_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_instance_by_name(Exception, - 'test_instance'), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_flavor_id_default(self): - self.assertEqual(openstack_utils. - get_flavor_id(self.nova_client, - 'test_flavor'), - self.flavor.id) - - def test_get_flavor_id_by_ram_range_default(self): - self.assertEqual(openstack_utils. - get_flavor_id_by_ram_range(self.nova_client, - 1, 3), - self.flavor.id) - - def test_get_aggregates_default(self): - self.assertEqual(openstack_utils. - get_aggregates(self.nova_client), - [self.aggregate]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_aggregates_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_aggregates(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_aggregate_id_default(self): - with mock.patch('functest.utils.openstack_utils.get_aggregates', - return_value=[self.aggregate]): - self.assertEqual(openstack_utils. - get_aggregate_id(self.nova_client, - 'test_aggregate'), - 'aggregate_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_aggregate_id_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_aggregate_id(Exception, - 'test_aggregate'), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_availability_zone_names_default(self): - with mock.patch('functest.utils.openstack_utils' - '.get_availability_zones', - return_value=[self.availability_zone]): - self.assertEqual(openstack_utils. - get_availability_zone_names(self.nova_client), - ['test_azone']) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_availability_zone_names_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_availability_zone_names(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_availability_zones_default(self): - self.assertEqual(openstack_utils. - get_availability_zones(self.nova_client), - [self.availability_zone]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_availability_zones_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_availability_zones(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_floating_ips_default(self): - self.assertEqual(openstack_utils. - get_floating_ips(self.neutron_client), - [self.floating_ip]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_floating_ips_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_floating_ips(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_hypervisors_default(self): - self.assertEqual(openstack_utils. - get_hypervisors(self.nova_client), - ['test_hostname']) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_hypervisors_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_hypervisors(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_aggregate_default(self): - self.assertTrue(openstack_utils. - create_aggregate(self.nova_client, - 'test_aggregate', - 'azone')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_aggregate_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_aggregate(Exception, - 'test_aggregate', - 'azone'), - None) - self.assertTrue(mock_logger_error.called) - - def test_add_host_to_aggregate_default(self): - with mock.patch('functest.utils.openstack_utils.get_aggregate_id'): - self.assertTrue(openstack_utils. - add_host_to_aggregate(self.nova_client, - 'test_aggregate', - 'test_hostname')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_add_host_to_aggregate_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - add_host_to_aggregate(Exception, - 'test_aggregate', - 'test_hostname'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_aggregate_with_host_default(self): - with mock.patch('functest.utils.openstack_utils.create_aggregate'), \ - mock.patch('functest.utils.openstack_utils.' - 'add_host_to_aggregate'): - self.assertTrue(openstack_utils. - create_aggregate_with_host(self.nova_client, - 'test_aggregate', - 'test_azone', - 'test_hostname')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_aggregate_with_host_exception(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.create_aggregate', - side_effect=Exception): - self.assertEqual(openstack_utils. - create_aggregate_with_host(Exception, - 'test_aggregate', - 'test_azone', - 'test_hostname'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_instance_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_nova_client', - return_value=self.nova_client): - self.assertEqual(openstack_utils. - create_instance('test_flavor', - 'image_id', - 'network_id'), - self.instance) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_instance_exception(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.' - 'get_nova_client', - return_value=self.nova_client): - self.nova_client.flavors.find.side_effect = Exception - self.assertEqual(openstack_utils. - create_instance('test_flavor', - 'image_id', - 'network_id'), - None) - self.assertTrue(mock_logger_error) - - def test_create_floating_ip_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_external_net_id', - return_value='external_net_id'): - exp_resp = {'fip_addr': 'test_ip', 'fip_id': 'fip_id'} - self.assertEqual(openstack_utils. - create_floating_ip(self.neutron_client), - exp_resp) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_floating_ip_exception(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.' - 'get_external_net_id', - return_value='external_net_id'): - self.assertEqual(openstack_utils. - create_floating_ip(Exception), - None) - self.assertTrue(mock_logger_error) - - def test_add_floating_ip_default(self): - with mock.patch('functest.utils.openstack_utils.get_aggregate_id'): - self.assertTrue(openstack_utils. - add_floating_ip(self.nova_client, - 'test_serverid', - 'test_floatingip_addr')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_add_floating_ip_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - add_floating_ip(Exception, - 'test_serverid', - 'test_floatingip_addr')) - self.assertTrue(mock_logger_error.called) - - def test_delete_instance_default(self): - self.assertTrue(openstack_utils. - delete_instance(self.nova_client, - 'instance_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_instance_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_instance(Exception, - 'instance_id')) - self.assertTrue(mock_logger_error.called) - - def test_delete_floating_ip_default(self): - self.assertTrue(openstack_utils. - delete_floating_ip(self.neutron_client, - 'floating_ip_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_floating_ip_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_floating_ip(Exception, - 'floating_ip_id')) - self.assertTrue(mock_logger_error.called) - - def test_remove_host_from_aggregate_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_aggregate_id'): - self.assertTrue(openstack_utils. - remove_host_from_aggregate(self.nova_client, - 'agg_name', - 'host_name')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_remove_host_from_aggregate_exception(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.' - 'get_aggregate_id', side_effect=Exception): - self.assertFalse(openstack_utils. - remove_host_from_aggregate(self.nova_client, - 'agg_name', - 'host_name')) - self.assertTrue(mock_logger_error.called) - - def test_remove_hosts_from_aggregate_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_aggregate_id'), \ - mock.patch('functest.utils.openstack_utils.' - 'remove_host_from_aggregate', - return_value=True) \ - as mock_method: - openstack_utils.remove_hosts_from_aggregate(self.nova_client, - 'test_aggregate') - mock_method.assert_any_call(self.nova_client, - 'test_aggregate', - 'host_name') - - def test_delete_aggregate_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'remove_hosts_from_aggregate'): - self.assertTrue(openstack_utils. - delete_aggregate(self.nova_client, - 'agg_name')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_aggregate_exception(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.' - 'remove_hosts_from_aggregate', side_effect=Exception): - self.assertFalse(openstack_utils. - delete_aggregate(self.nova_client, - 'agg_name')) - self.assertTrue(mock_logger_error.called) - - def test_get_network_list_default(self): - self.assertEqual(openstack_utils. - get_network_list(self.neutron_client), - self.networks) - - def test_get_network_list_missing_network(self): - self.assertEqual(openstack_utils. - get_network_list(self.empty_client), - None) - - def test_get_router_list_default(self): - self.assertEqual(openstack_utils. - get_router_list(self.neutron_client), - [self.router]) - - def test_get_router_list_missing_router(self): - self.assertEqual(openstack_utils. - get_router_list(self.empty_client), - None) - - def test_get_port_list_default(self): - self.assertEqual(openstack_utils. - get_port_list(self.neutron_client), - [self.port]) - - def test_get_port_list_missing_port(self): - self.assertEqual(openstack_utils. - get_port_list(self.empty_client), - None) - - def test_get_network_id_default(self): - self.assertEqual(openstack_utils. - get_network_id(self.neutron_client, - 'test_network'), - 'network_id') - - def test_get_subnet_id_default(self): - self.assertEqual(openstack_utils. - get_subnet_id(self.neutron_client, - 'test_subnet'), - 'subnet_id') - - def test_get_router_id_default(self): - self.assertEqual(openstack_utils. - get_router_id(self.neutron_client, - 'test_router'), - 'router_id') - - def test_get_private_net_default(self): - self.assertEqual(openstack_utils. - get_private_net(self.neutron_client), - self.networks[0]) - - def test_get_private_net_missing_net(self): - self.assertEqual(openstack_utils. - get_private_net(self.empty_client), - None) - - def test_get_external_net_default(self): - self.assertEqual(openstack_utils. - get_external_net(self.neutron_client), - 'test_network1') - - def test_get_external_net_missing_net(self): - self.assertEqual(openstack_utils. - get_external_net(self.empty_client), - None) - - def test_get_external_net_id_default(self): - self.assertEqual(openstack_utils. - get_external_net_id(self.neutron_client), - 'network_id1') - - def test_get_external_net_id_missing_net(self): - self.assertEqual(openstack_utils. - get_external_net_id(self.empty_client), - None) - - def test_check_neutron_net_default(self): - self.assertTrue(openstack_utils. - check_neutron_net(self.neutron_client, - 'test_network')) - - def test_check_neutron_net_missing_net(self): - self.assertFalse(openstack_utils. - check_neutron_net(self.empty_client, - 'test_network')) - - def test_create_neutron_net_default(self): - self.assertEqual(openstack_utils. - create_neutron_net(self.neutron_client, - 'test_network'), - 'network_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_neutron_net_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_neutron_net(Exception, - 'test_network'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_neutron_subnet_default(self): - self.assertEqual(openstack_utils. - create_neutron_subnet(self.neutron_client, - 'test_subnet', - 'test_cidr', - 'network_id'), - 'subnet_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_neutron_subnet_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_neutron_subnet(Exception, - 'test_subnet', - 'test_cidr', - 'network_id'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_neutron_router_default(self): - self.assertEqual(openstack_utils. - create_neutron_router(self.neutron_client, - 'test_router'), - 'router_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_neutron_router_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_neutron_router(Exception, - 'test_router'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_neutron_port_default(self): - self.assertEqual(openstack_utils. - create_neutron_port(self.neutron_client, - 'test_port', - 'network_id', - 'test_ip'), - 'port_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_neutron_port_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_neutron_port(Exception, - 'test_port', - 'network_id', - 'test_ip'), - None) - self.assertTrue(mock_logger_error.called) - - def test_update_neutron_net_default(self): - self.assertTrue(openstack_utils. - update_neutron_net(self.neutron_client, - 'network_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_update_neutron_net_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - update_neutron_net(Exception, - 'network_id')) - self.assertTrue(mock_logger_error.called) - - def test_update_neutron_port_default(self): - self.assertEqual(openstack_utils. - update_neutron_port(self.neutron_client, - 'port_id', - 'test_owner'), - 'port_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_update_neutron_port_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - update_neutron_port(Exception, - 'port_id', - 'test_owner'), - None) - self.assertTrue(mock_logger_error.called) - - def test_add_interface_router_default(self): - self.assertTrue(openstack_utils. - add_interface_router(self.neutron_client, - 'router_id', - 'subnet_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_add_interface_router_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - add_interface_router(Exception, - 'router_id', - 'subnet_id')) - self.assertTrue(mock_logger_error.called) - - def test_add_gateway_router_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_external_net_id', - return_value='network_id'): - self.assertTrue(openstack_utils. - add_gateway_router(self.neutron_client, - 'router_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_add_gateway_router_exception(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.' - 'get_external_net_id', - return_value='network_id'): - self.assertFalse(openstack_utils. - add_gateway_router(Exception, - 'router_id')) - self.assertTrue(mock_logger_error.called) - - def test_delete_neutron_net_default(self): - self.assertTrue(openstack_utils. - delete_neutron_net(self.neutron_client, - 'network_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_neutron_net_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_neutron_net(Exception, - 'network_id')) - self.assertTrue(mock_logger_error.called) - - def test_delete_neutron_subnet_default(self): - self.assertTrue(openstack_utils. - delete_neutron_subnet(self.neutron_client, - 'subnet_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_neutron_subnet_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_neutron_subnet(Exception, - 'subnet_id')) - self.assertTrue(mock_logger_error.called) - - def test_delete_neutron_router_default(self): - self.assertTrue(openstack_utils. - delete_neutron_router(self.neutron_client, - 'router_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_neutron_router_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_neutron_router(Exception, - 'router_id')) - self.assertTrue(mock_logger_error.called) - - def test_delete_neutron_port_default(self): - self.assertTrue(openstack_utils. - delete_neutron_port(self.neutron_client, - 'port_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_neutron_port_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_neutron_port(Exception, - 'port_id')) - self.assertTrue(mock_logger_error.called) - - def test_remove_interface_router_default(self): - self.assertTrue(openstack_utils. - remove_interface_router(self.neutron_client, - 'router_id', - 'subnet_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_remove_interface_router_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - remove_interface_router(Exception, - 'router_id', - 'subnet_id')) - self.assertTrue(mock_logger_error.called) - - def test_remove_gateway_router_default(self): - self.assertTrue(openstack_utils. - remove_gateway_router(self.neutron_client, - 'router_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_remove_gateway_router_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - remove_gateway_router(Exception, - 'router_id')) - self.assertTrue(mock_logger_error.called) - - def test_get_security_groups_default(self): - self.assertEqual(openstack_utils. - get_security_groups(self.neutron_client), - [self.sec_group]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_security_groups_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_security_groups(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_security_group_id_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_security_groups', - return_value=[self.sec_group]): - self.assertEqual(openstack_utils. - get_security_group_id(self.neutron_client, - 'test_sec_group'), - 'sec_group_id') - - def test_get_security_group_rules_default(self): - self.assertEqual(openstack_utils. - get_security_group_rules(self.neutron_client, - self.sec_group['id']), - [self.sec_group_rule]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_security_group_rules_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_security_group_rules(Exception, - 'sec_group_id'), - None) - self.assertTrue(mock_logger_error.called) - - def test_check_security_group_rules_not_exists(self): - self.assertEqual(openstack_utils. - check_security_group_rules(self.neutron_client, - 'sec_group_id_2', - 'direction', - 'protocol', - 'port_min', - 'port_max'), - True) - - def test_check_security_group_rules_exists(self): - self.assertEqual(openstack_utils. - check_security_group_rules(self.neutron_client, - self.sec_group['id'], - 'direction', - 'protocol', - 'port_min', - 'port_max'), - False) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_check_security_group_rules_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - check_security_group_rules(Exception, - 'sec_group_id', - 'direction', - 'protocol', - 'port_max', - 'port_min'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_security_group_default(self): - self.assertEqual(openstack_utils. - create_security_group(self.neutron_client, - 'test_sec_group', - 'sec_group_desc'), - self.sec_group) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_security_group_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_security_group(Exception, - 'test_sec_group', - 'sec_group_desc'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_secgroup_rule_default(self): - self.assertTrue(openstack_utils. - create_secgroup_rule(self.neutron_client, - 'sg_id', - 'direction', - 'protocol', - 80, - 80)) - self.assertTrue(openstack_utils. - create_secgroup_rule(self.neutron_client, - 'sg_id', - 'direction', - 'protocol')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_secgroup_rule_invalid_port_range(self, mock_logger_error): - self.assertFalse(openstack_utils. - create_secgroup_rule(self.neutron_client, - 'sg_id', - 'direction', - 'protocol', - 80)) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_secgroup_rule_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - create_secgroup_rule(Exception, - 'sg_id', - 'direction', - 'protocol')) - - @mock.patch('functest.utils.openstack_utils.logger.info') - def test_create_security_group_full_default(self, mock_logger_info): - with mock.patch('functest.utils.openstack_utils.' - 'get_security_group_id', - return_value='sg_id'): - self.assertEqual(openstack_utils. - create_security_group_full(self.neutron_client, - 'sg_name', - 'sg_desc'), - 'sg_id') - self.assertTrue(mock_logger_info) - - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_security_group_full_sec_group_fail(self, - mock_logger_error, - mock_logger_info): - with mock.patch('functest.utils.openstack_utils.' - 'get_security_group_id', - return_value=''), \ - mock.patch('functest.utils.openstack_utils.' - 'create_security_group', - return_value=False): - self.assertEqual(openstack_utils. - create_security_group_full(self.neutron_client, - 'sg_name', - 'sg_desc'), - None) - self.assertTrue(mock_logger_error) - self.assertTrue(mock_logger_info) - - @mock.patch('functest.utils.openstack_utils.logger.debug') - @mock.patch('functest.utils.openstack_utils.logger.info') - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_security_group_full_secgroup_rule_fail(self, - mock_logger_error, - mock_logger_info, - mock_logger_debug): - with mock.patch('functest.utils.openstack_utils.' - 'get_security_group_id', - return_value=''), \ - mock.patch('functest.utils.openstack_utils.' - 'create_security_group', - return_value={'id': 'sg_id', - 'name': 'sg_name'}), \ - mock.patch('functest.utils.openstack_utils.' - 'create_secgroup_rule', - return_value=False): - self.assertEqual(openstack_utils. - create_security_group_full(self.neutron_client, - 'sg_name', - 'sg_desc'), - None) - self.assertTrue(mock_logger_error) - self.assertTrue(mock_logger_info) - self.assertTrue(mock_logger_debug) - - def test_add_secgroup_to_instance_default(self): - self.assertTrue(openstack_utils. - add_secgroup_to_instance(self.nova_client, - 'instance_id', - 'sec_group_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_add_secgroup_to_instance_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - add_secgroup_to_instance(Exception, - 'instance_id', - 'sec_group_id')) - self.assertTrue(mock_logger_error.called) - - def test_update_sg_quota_default(self): - self.assertTrue(openstack_utils. - update_sg_quota(self.neutron_client, - 'tenant_id', - 'sg_quota', - 'sg_rule_quota')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_update_sg_quota_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - update_sg_quota(Exception, - 'tenant_id', - 'sg_quota', - 'sg_rule_quota')) - self.assertTrue(mock_logger_error.called) - - def test_delete_security_group_default(self): - self.assertTrue(openstack_utils. - delete_security_group(self.neutron_client, - 'sec_group_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_security_group_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_security_group(Exception, - 'sec_group_id')) - self.assertTrue(mock_logger_error.called) - - def test_get_images_default(self): - self.assertEqual(openstack_utils. - get_images(self.glance_client), - [self.image]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_images_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_images(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_image_id_default(self): - self.assertEqual(openstack_utils. - get_image_id(self.glance_client, - 'test_image'), - 'image_id') - - # create_glance_image, get_or_create_image - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_glance_image_file_present(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.' - 'os.path.isfile', - return_value=False): - self.assertEqual(openstack_utils. - create_glance_image(self.glance_client, - 'test_image', - 'file_path'), - None) - self.assertTrue(mock_logger_error.called) - - @mock.patch('functest.utils.openstack_utils.logger.info') - def test_create_glance_image_already_exist(self, mock_logger_info): - with mock.patch('functest.utils.openstack_utils.' - 'os.path.isfile', - return_value=True), \ - mock.patch('functest.utils.openstack_utils.get_image_id', - return_value='image_id'): - self.assertEqual(openstack_utils. - create_glance_image(self.glance_client, - 'test_image', - 'file_path'), - 'image_id') - self.assertTrue(mock_logger_info.called) - - @mock.patch('functest.utils.openstack_utils.logger.info') - def test_create_glance_image_default(self, mock_logger_info): - with mock.patch('functest.utils.openstack_utils.' - 'os.path.isfile', - return_value=True), \ - mock.patch('functest.utils.openstack_utils.get_image_id', - return_value=''), \ - mock.patch('six.moves.builtins.open', - mock.mock_open(read_data='1')) as m: - self.assertEqual(openstack_utils. - create_glance_image(self.glance_client, - 'test_image', - 'file_path'), - 'image_id') - m.assert_called_once_with('file_path') - self.assertTrue(mock_logger_info.called) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_glance_image_exception(self, mock_logger_error): - with mock.patch('functest.utils.openstack_utils.' - 'os.path.isfile', - return_value=True), \ - mock.patch('functest.utils.openstack_utils.get_image_id', - side_effect=Exception): - self.assertEqual(openstack_utils. - create_glance_image(self.glance_client, - 'test_image', - 'file_path'), - None) - self.assertTrue(mock_logger_error.called) - - def test_delete_glance_image_default(self): - self.assertTrue(openstack_utils. - delete_glance_image(self.nova_client, - 'image_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_glance_image_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_glance_image(Exception, - 'image_id')) - self.assertTrue(mock_logger_error.called) - - def test_get_volumes_default(self): - self.assertEqual(openstack_utils. - get_volumes(self.cinder_client), - [self.volume]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_volumes_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_volumes(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_update_cinder_quota_default(self): - self.assertTrue(openstack_utils. - update_cinder_quota(self.cinder_client, - 'tenant_id', - 'vols_quota', - 'snap_quota', - 'giga_quota')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_update_cinder_quota_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - update_cinder_quota(Exception, - 'tenant_id', - 'vols_quota', - 'snap_quota', - 'giga_quota')) - self.assertTrue(mock_logger_error.called) - - def test_delete_volume_default(self): - self.assertTrue(openstack_utils. - delete_volume(self.cinder_client, - 'volume_id', - forced=False)) - - self.assertTrue(openstack_utils. - delete_volume(self.cinder_client, - 'volume_id', - forced=True)) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_volume_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_volume(Exception, - 'volume_id', - forced=True)) - self.assertTrue(mock_logger_error.called) - - def test_get_tenants_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=True): - self.assertEqual(openstack_utils. - get_tenants(self.keystone_client), - [self.tenant]) - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=False): - self.assertEqual(openstack_utils. - get_tenants(self.keystone_client), - [self.tenant]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_tenants_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_tenants(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_users_default(self): - self.assertEqual(openstack_utils. - get_users(self.keystone_client), - [self.user]) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_users_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_users(Exception), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_tenant_id_default(self): - self.assertEqual(openstack_utils. - get_tenant_id(self.keystone_client, - 'test_tenant'), - 'tenant_id') - - def test_get_user_id_default(self): - self.assertEqual(openstack_utils. - get_user_id(self.keystone_client, - 'test_user'), - 'user_id') - - def test_get_role_id_default(self): - self.assertEqual(openstack_utils. - get_role_id(self.keystone_client, - 'test_role'), - 'role_id') - - def test_get_domain_id_default(self): - self.assertEqual(openstack_utils. - get_domain_id(self.keystone_client, - 'test_domain'), - 'domain_id') - - def test_create_tenant_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=True): - os.environ['OS_PROJECT_DOMAIN_NAME'] = 'Default' - self.assertEqual(openstack_utils. - create_tenant(self.keystone_client, - 'test_tenant', - 'tenant_desc'), - 'tenant_id') - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=False): - self.assertEqual(openstack_utils. - create_tenant(self.keystone_client, - 'test_tenant', - 'tenant_desc'), - 'tenant_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_tenant_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_tenant(Exception, - 'test_tenant', - 'tenant_desc'), - None) - self.assertTrue(mock_logger_error.called) - - def test_create_user_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=True): - self.assertEqual(openstack_utils. - create_user(self.keystone_client, - 'test_user', - 'password', - 'email', - 'tenant_id'), - 'user_id') - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=False): - self.assertEqual(openstack_utils. - create_user(self.keystone_client, - 'test_user', - 'password', - 'email', - 'tenant_id'), - 'user_id') - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_create_user_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - create_user(Exception, - 'test_user', - 'password', - 'email', - 'tenant_id'), - None) - self.assertTrue(mock_logger_error.called) - - def test_add_role_user_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=True): - self.assertTrue(openstack_utils. - add_role_user(self.keystone_client, - 'user_id', - 'role_id', - 'tenant_id')) - - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=False): - self.assertTrue(openstack_utils. - add_role_user(self.keystone_client, - 'user_id', - 'role_id', - 'tenant_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_add_role_user_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - add_role_user(Exception, - 'user_id', - 'role_id', - 'tenant_id')) - self.assertTrue(mock_logger_error.called) - - def test_delete_tenant_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=True): - self.assertTrue(openstack_utils. - delete_tenant(self.keystone_client, - 'tenant_id')) - - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=False): - self.assertTrue(openstack_utils. - delete_tenant(self.keystone_client, - 'tenant_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_tenant_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_tenant(Exception, - 'tenant_id')) - self.assertTrue(mock_logger_error.called) - - def test_delete_user_default(self): - self.assertTrue(openstack_utils. - delete_user(self.keystone_client, - 'user_id')) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_delete_user_exception(self, mock_logger_error): - self.assertFalse(openstack_utils. - delete_user(Exception, - 'user_id')) - self.assertTrue(mock_logger_error.called) - - def test_get_resource_default(self): - with mock.patch('functest.utils.openstack_utils.' - 'is_keystone_v3', return_value=True): - self.assertEqual(openstack_utils. - get_resource(self.heat_client, - 'stack_id', - 'resource'), - self.resource) - - @mock.patch('functest.utils.openstack_utils.logger.error') - def test_get_resource_exception(self, mock_logger_error): - self.assertEqual(openstack_utils. - get_resource(Exception, - 'stack_id', - 'resource'), - None) - self.assertTrue(mock_logger_error.called) - - def test_get_or_create_user_for_vnf_get(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_user_id', - return_value='user_id'), \ - mock.patch('functest.utils.openstack_utils.get_tenant_id', - return_value='tenant_id'): - self.assertFalse(openstack_utils. - get_or_create_user_for_vnf(self.keystone_client, - 'my_vnf')) - - def test_get_or_create_user_for_vnf_create(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_user_id', - return_value=None), \ - mock.patch('functest.utils.openstack_utils.get_tenant_id', - return_value='tenant_id'): - self.assertTrue(openstack_utils. - get_or_create_user_for_vnf(self.keystone_client, - 'my_vnf')) - - def test_get_or_create_user_for_vnf_error_get_user_id(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_user_id', - side_effect=Exception): - self.assertRaises(Exception) - - def test_get_or_create_user_for_vnf_error_get_tenant_id(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_user_id', - return_value='user_id'), \ - mock.patch('functest.utils.openstack_utils.get_tenant_id', - side_effect='Exception'): - self.assertRaises(Exception) - - def test_get_or_create_tenant_for_vnf_get(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_tenant_id', - return_value='tenant_id'): - self.assertFalse( - openstack_utils.get_or_create_tenant_for_vnf( - self.keystone_client, 'tenant_name', 'tenant_description')) - - def test_get_or_create_tenant_for_vnf_create(self): - with mock.patch('functest.utils.openstack_utils.get_tenant_id', - return_value=None): - self.assertTrue( - openstack_utils.get_or_create_tenant_for_vnf( - self.keystone_client, 'tenant_name', 'tenant_description')) - - def test_get_or_create_tenant_for_vnf_error_get_tenant_id(self): - with mock.patch('functest.utils.openstack_utils.' - 'get_tenant_id', - side_effect=Exception): - self.assertRaises(Exception) - - def test_download_and_add_image_on_glance_image_creation_failure(self): - with mock.patch('functest.utils.openstack_utils.' - 'os.makedirs'), \ - mock.patch('functest.utils.openstack_utils.' - 'ft_utils.download_url', - return_value=True), \ - mock.patch('functest.utils.openstack_utils.' - 'create_glance_image', - return_value=''): - resp = openstack_utils.download_and_add_image_on_glance( - self.glance_client, - 'image_name', - 'http://url', - 'data_dir') - self.assertEqual(resp, False) - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py deleted file mode 100644 index 98da48b8..00000000 --- a/functest/utils/openstack_utils.py +++ /dev/null @@ -1,1486 +0,0 @@ -#!/usr/bin/env python -# -# jose.lausuch@ericsson.com -# valentin.boucher@orange.com -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -# - -import logging -import os.path -import sys -import time - -from keystoneauth1 import loading -from keystoneauth1 import session -from cinderclient import client as cinderclient -from glanceclient import client as glanceclient -from heatclient import client as heatclient -from novaclient import client as novaclient -from keystoneclient import client as keystoneclient -from neutronclient.neutron import client as neutronclient - -from functest.utils import env -import functest.utils.functest_utils as ft_utils - -logger = logging.getLogger(__name__) - -DEFAULT_API_VERSION = '2' -DEFAULT_HEAT_API_VERSION = '1' - - -# ********************************************* -# CREDENTIALS -# ********************************************* -class MissingEnvVar(Exception): - - def __init__(self, var): - self.var = var - - def __str__(self): - return str.format("Please set the mandatory env var: {}", self.var) - - -def is_keystone_v3(): - keystone_api_version = os.getenv('OS_IDENTITY_API_VERSION') - if (keystone_api_version is None or - keystone_api_version == '2'): - return False - else: - return True - - -def get_rc_env_vars(): - env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD'] - if is_keystone_v3(): - env_vars.extend(['OS_PROJECT_NAME', - 'OS_USER_DOMAIN_NAME', - 'OS_PROJECT_DOMAIN_NAME']) - else: - env_vars.extend(['OS_TENANT_NAME']) - return env_vars - - -def check_credentials(): - """ - Check if the OpenStack credentials (openrc) are sourced - """ - env_vars = get_rc_env_vars() - return all(map(lambda v: v in os.environ and os.environ[v], env_vars)) - - -def get_env_cred_dict(): - env_cred_dict = { - 'OS_USERNAME': 'username', - 'OS_PASSWORD': 'password', - 'OS_AUTH_URL': 'auth_url', - 'OS_TENANT_NAME': 'tenant_name', - 'OS_USER_DOMAIN_NAME': 'user_domain_name', - 'OS_PROJECT_DOMAIN_NAME': 'project_domain_name', - 'OS_PROJECT_NAME': 'project_name', - 'OS_ENDPOINT_TYPE': 'endpoint_type', - 'OS_REGION_NAME': 'region_name', - 'OS_CACERT': 'https_cacert', - 'OS_INSECURE': 'https_insecure' - } - return env_cred_dict - - -def get_credentials(other_creds={}): - """Returns a creds dictionary filled with parsed from env - """ - creds = {} - env_vars = get_rc_env_vars() - env_cred_dict = get_env_cred_dict() - - for envvar in env_vars: - if os.getenv(envvar) is None: - raise MissingEnvVar(envvar) - else: - creds_key = env_cred_dict.get(envvar) - creds.update({creds_key: os.getenv(envvar)}) - - if 'tenant' in other_creds.keys(): - if is_keystone_v3(): - tenant = 'project_name' - else: - tenant = 'tenant_name' - other_creds[tenant] = other_creds.pop('tenant') - - creds.update(other_creds) - - return creds - - -def get_session_auth(other_creds={}): - loader = loading.get_plugin_loader('password') - creds = get_credentials(other_creds) - auth = loader.load_from_options(**creds) - return auth - - -def get_endpoint(service_type, interface='public'): - auth = get_session_auth() - return get_session().get_endpoint(auth=auth, - service_type=service_type, - interface=interface) - - -def get_session(other_creds={}): - auth = get_session_auth(other_creds) - https_cacert = os.getenv('OS_CACERT', '') - https_insecure = os.getenv('OS_INSECURE', '').lower() == 'true' - return session.Session(auth=auth, - verify=(https_cacert or not https_insecure)) - - -# ********************************************* -# CLIENTS -# ********************************************* -def get_keystone_client_version(): - api_version = os.getenv('OS_IDENTITY_API_VERSION') - if api_version is not None: - logger.info("OS_IDENTITY_API_VERSION is set in env as '%s'", - api_version) - return api_version - return DEFAULT_API_VERSION - - -def get_keystone_client(other_creds={}): - sess = get_session(other_creds) - return keystoneclient.Client(get_keystone_client_version(), - session=sess, - interface=os.getenv('OS_INTERFACE', 'admin')) - - -def get_nova_client_version(): - api_version = os.getenv('OS_COMPUTE_API_VERSION') - if api_version is not None: - logger.info("OS_COMPUTE_API_VERSION is set in env as '%s'", - api_version) - return api_version - return DEFAULT_API_VERSION - - -def get_nova_client(other_creds={}): - sess = get_session(other_creds) - return novaclient.Client(get_nova_client_version(), session=sess) - - -def get_cinder_client_version(): - api_version = os.getenv('OS_VOLUME_API_VERSION') - if api_version is not None: - logger.info("OS_VOLUME_API_VERSION is set in env as '%s'", - api_version) - return api_version - return DEFAULT_API_VERSION - - -def get_cinder_client(other_creds={}): - sess = get_session(other_creds) - return cinderclient.Client(get_cinder_client_version(), session=sess) - - -def get_neutron_client_version(): - api_version = os.getenv('OS_NETWORK_API_VERSION') - if api_version is not None: - logger.info("OS_NETWORK_API_VERSION is set in env as '%s'", - api_version) - return api_version - return DEFAULT_API_VERSION - - -def get_neutron_client(other_creds={}): - sess = get_session(other_creds) - return neutronclient.Client(get_neutron_client_version(), session=sess) - - -def get_glance_client_version(): - api_version = os.getenv('OS_IMAGE_API_VERSION') - if api_version is not None: - logger.info("OS_IMAGE_API_VERSION is set in env as '%s'", api_version) - return api_version - return DEFAULT_API_VERSION - - -def get_glance_client(other_creds={}): - sess = get_session(other_creds) - return glanceclient.Client(get_glance_client_version(), session=sess) - - -def get_heat_client_version(): - api_version = os.getenv('OS_ORCHESTRATION_API_VERSION') - if api_version is not None: - logger.info("OS_ORCHESTRATION_API_VERSION is set in env as '%s'", - api_version) - return api_version - return DEFAULT_HEAT_API_VERSION - - -def get_heat_client(other_creds={}): - sess = get_session(other_creds) - return heatclient.Client(get_heat_client_version(), session=sess) - - -def download_and_add_image_on_glance(glance, image_name, image_url, data_dir): - try: - dest_path = data_dir - if not os.path.exists(dest_path): - os.makedirs(dest_path) - file_name = image_url.rsplit('/')[-1] - if not ft_utils.download_url(image_url, dest_path): - return False - except Exception: - raise Exception("Impossible to download image from {}".format( - image_url)) - - try: - image = create_glance_image( - glance, image_name, dest_path + file_name) - if not image: - return False - else: - return image - except Exception: - raise Exception("Impossible to put image {} in glance".format( - image_name)) - - -# ********************************************* -# NOVA -# ********************************************* -def get_instances(nova_client): - try: - instances = nova_client.servers.list(search_opts={'all_tenants': 1}) - return instances - except Exception as e: - logger.error("Error [get_instances(nova_client)]: %s" % e) - return None - - -def get_instance_status(nova_client, instance): - try: - instance = nova_client.servers.get(instance.id) - return instance.status - except Exception as e: - logger.error("Error [get_instance_status(nova_client)]: %s" % e) - return None - - -def get_instance_by_name(nova_client, instance_name): - try: - instance = nova_client.servers.find(name=instance_name) - return instance - except Exception as e: - logger.error("Error [get_instance_by_name(nova_client, '%s')]: %s" - % (instance_name, e)) - return None - - -def get_flavor_id(nova_client, flavor_name): - flavors = nova_client.flavors.list(detailed=True) - id = '' - for f in flavors: - if f.name == flavor_name: - id = f.id - break - return id - - -def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram): - flavors = nova_client.flavors.list(detailed=True) - id = '' - for f in flavors: - if min_ram <= f.ram and f.ram <= max_ram: - id = f.id - break - return id - - -def get_aggregates(nova_client): - try: - aggregates = nova_client.aggregates.list() - return aggregates - except Exception as e: - logger.error("Error [get_aggregates(nova_client)]: %s" % e) - return None - - -def get_aggregate_id(nova_client, aggregate_name): - try: - aggregates = get_aggregates(nova_client) - _id = [ag.id for ag in aggregates if ag.name == aggregate_name][0] - return _id - except Exception as e: - logger.error("Error [get_aggregate_id(nova_client, %s)]:" - " %s" % (aggregate_name, e)) - return None - - -def get_availability_zones(nova_client): - try: - availability_zones = nova_client.availability_zones.list() - return availability_zones - except Exception as e: - logger.error("Error [get_availability_zones(nova_client)]: %s" % e) - return None - - -def get_availability_zone_names(nova_client): - try: - az_names = [az.zoneName for az in get_availability_zones(nova_client)] - return az_names - except Exception as e: - logger.error("Error [get_availability_zone_names(nova_client)]:" - " %s" % e) - return None - - -def create_flavor(nova_client, flavor_name, ram, disk, vcpus, public=True): - try: - flavor = nova_client.flavors.create( - flavor_name, ram, vcpus, disk, is_public=public) - try: - extra_specs = ft_utils.get_functest_config( - 'general.flavor_extra_specs') - flavor.set_keys(extra_specs) - except ValueError: - # flavor extra specs are not configured, therefore skip the update - pass - - except Exception as e: - logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', " - "'%s')]: %s" % (flavor_name, ram, disk, vcpus, e)) - return None - return flavor.id - - -def get_or_create_flavor(flavor_name, ram, disk, vcpus, public=True): - flavor_exists = False - nova_client = get_nova_client() - - flavor_id = get_flavor_id(nova_client, flavor_name) - if flavor_id != '': - logger.info("Using existing flavor '%s'..." % flavor_name) - flavor_exists = True - else: - logger.info("Creating flavor '%s' with '%s' RAM, '%s' disk size, " - "'%s' vcpus..." % (flavor_name, ram, disk, vcpus)) - flavor_id = create_flavor( - nova_client, flavor_name, ram, disk, vcpus, public=public) - if not flavor_id: - raise Exception("Failed to create flavor '%s'..." % (flavor_name)) - else: - logger.debug("Flavor '%s' with ID=%s created successfully." - % (flavor_name, flavor_id)) - - return flavor_exists, flavor_id - - -def get_floating_ips(neutron_client): - try: - floating_ips = neutron_client.list_floatingips() - return floating_ips['floatingips'] - except Exception as e: - logger.error("Error [get_floating_ips(neutron_client)]: %s" % e) - return None - - -def get_hypervisors(nova_client): - try: - nodes = [] - hypervisors = nova_client.hypervisors.list() - for hypervisor in hypervisors: - if hypervisor.state == "up": - nodes.append(hypervisor.hypervisor_hostname) - return nodes - except Exception as e: - logger.error("Error [get_hypervisors(nova_client)]: %s" % e) - return None - - -def create_aggregate(nova_client, aggregate_name, av_zone): - try: - nova_client.aggregates.create(aggregate_name, av_zone) - return True - except Exception as e: - logger.error("Error [create_aggregate(nova_client, %s, %s)]: %s" - % (aggregate_name, av_zone, e)) - return None - - -def add_host_to_aggregate(nova_client, aggregate_name, compute_host): - try: - aggregate_id = get_aggregate_id(nova_client, aggregate_name) - nova_client.aggregates.add_host(aggregate_id, compute_host) - return True - except Exception as e: - logger.error("Error [add_host_to_aggregate(nova_client, %s, %s)]: %s" - % (aggregate_name, compute_host, e)) - return None - - -def create_aggregate_with_host( - nova_client, aggregate_name, av_zone, compute_host): - try: - create_aggregate(nova_client, aggregate_name, av_zone) - add_host_to_aggregate(nova_client, aggregate_name, compute_host) - return True - except Exception as e: - logger.error("Error [create_aggregate_with_host(" - "nova_client, %s, %s, %s)]: %s" - % (aggregate_name, av_zone, compute_host, e)) - return None - - -def create_instance(flavor_name, - image_id, - network_id, - instance_name="functest-vm", - confdrive=True, - userdata=None, - av_zone='', - fixed_ip=None, - files=None): - nova_client = get_nova_client() - try: - flavor = nova_client.flavors.find(name=flavor_name) - except: - flavors = nova_client.flavors.list() - logger.error("Error: Flavor '%s' not found. Available flavors are: " - "\n%s" % (flavor_name, flavors)) - return None - if fixed_ip is not None: - nics = {"net-id": network_id, "v4-fixed-ip": fixed_ip} - else: - nics = {"net-id": network_id} - if userdata is None: - instance = nova_client.servers.create( - name=instance_name, - flavor=flavor, - image=image_id, - nics=[nics], - availability_zone=av_zone, - files=files - ) - else: - instance = nova_client.servers.create( - name=instance_name, - flavor=flavor, - image=image_id, - nics=[nics], - config_drive=confdrive, - userdata=userdata, - availability_zone=av_zone, - files=files - ) - return instance - - -def create_instance_and_wait_for_active(flavor_name, - image_id, - network_id, - instance_name="", - config_drive=False, - userdata="", - av_zone='', - fixed_ip=None, - files=None): - SLEEP = 3 - VM_BOOT_TIMEOUT = 180 - nova_client = get_nova_client() - instance = create_instance(flavor_name, - image_id, - network_id, - instance_name, - config_drive, - userdata, - av_zone=av_zone, - fixed_ip=fixed_ip, - files=files) - count = VM_BOOT_TIMEOUT / SLEEP - for n in range(count, -1, -1): - status = get_instance_status(nova_client, instance) - if status is None: - time.sleep(SLEEP) - continue - elif status.lower() == "active": - return instance - elif status.lower() == "error": - logger.error("The instance %s went to ERROR status." - % instance_name) - return None - time.sleep(SLEEP) - logger.error("Timeout booting the instance %s." % instance_name) - return None - - -def create_floating_ip(neutron_client): - extnet_id = get_external_net_id(neutron_client) - props = {'floating_network_id': extnet_id} - try: - ip_json = neutron_client.create_floatingip({'floatingip': props}) - fip_addr = ip_json['floatingip']['floating_ip_address'] - fip_id = ip_json['floatingip']['id'] - except Exception as e: - logger.error("Error [create_floating_ip(neutron_client)]: %s" % e) - return None - return {'fip_addr': fip_addr, 'fip_id': fip_id} - - -def add_floating_ip(nova_client, server_id, floatingip_addr): - try: - nova_client.servers.add_floating_ip(server_id, floatingip_addr) - return True - except Exception as e: - logger.error("Error [add_floating_ip(nova_client, '%s', '%s')]: %s" - % (server_id, floatingip_addr, e)) - return False - - -def delete_instance(nova_client, instance_id): - try: - nova_client.servers.force_delete(instance_id) - return True - except Exception as e: - logger.error("Error [delete_instance(nova_client, '%s')]: %s" - % (instance_id, e)) - return False - - -def delete_floating_ip(neutron_client, floatingip_id): - try: - neutron_client.delete_floatingip(floatingip_id) - return True - except Exception as e: - logger.error("Error [delete_floating_ip(neutron_client, '%s')]: %s" - % (floatingip_id, e)) - return False - - -def remove_host_from_aggregate(nova_client, aggregate_name, compute_host): - try: - aggregate_id = get_aggregate_id(nova_client, aggregate_name) - nova_client.aggregates.remove_host(aggregate_id, compute_host) - return True - except Exception as e: - logger.error("Error [remove_host_from_aggregate(nova_client, %s, %s)]:" - " %s" % (aggregate_name, compute_host, e)) - return False - - -def remove_hosts_from_aggregate(nova_client, aggregate_name): - aggregate_id = get_aggregate_id(nova_client, aggregate_name) - hosts = nova_client.aggregates.get(aggregate_id).hosts - assert( - all(remove_host_from_aggregate(nova_client, aggregate_name, host) - for host in hosts)) - - -def delete_aggregate(nova_client, aggregate_name): - try: - remove_hosts_from_aggregate(nova_client, aggregate_name) - nova_client.aggregates.delete(aggregate_name) - return True - except Exception as e: - logger.error("Error [delete_aggregate(nova_client, %s)]: %s" - % (aggregate_name, e)) - return False - - -# ********************************************* -# NEUTRON -# ********************************************* -def get_network_list(neutron_client): - network_list = neutron_client.list_networks()['networks'] - if len(network_list) == 0: - return None - else: - return network_list - - -def get_router_list(neutron_client): - router_list = neutron_client.list_routers()['routers'] - if len(router_list) == 0: - return None - else: - return router_list - - -def get_port_list(neutron_client): - port_list = neutron_client.list_ports()['ports'] - if len(port_list) == 0: - return None - else: - return port_list - - -def get_network_id(neutron_client, network_name): - networks = neutron_client.list_networks()['networks'] - id = '' - for n in networks: - if n['name'] == network_name: - id = n['id'] - break - return id - - -def get_subnet_id(neutron_client, subnet_name): - subnets = neutron_client.list_subnets()['subnets'] - id = '' - for s in subnets: - if s['name'] == subnet_name: - id = s['id'] - break - return id - - -def get_router_id(neutron_client, router_name): - routers = neutron_client.list_routers()['routers'] - id = '' - for r in routers: - if r['name'] == router_name: - id = r['id'] - break - return id - - -def get_private_net(neutron_client): - # Checks if there is an existing shared private network - networks = neutron_client.list_networks()['networks'] - if len(networks) == 0: - return None - for net in networks: - if (net['router:external'] is False) and (net['shared'] is True): - return net - return None - - -def get_external_net(neutron_client): - if (env.get('EXTERNAL_NETWORK')): - return env.get('EXTERNAL_NETWORK') - for network in neutron_client.list_networks()['networks']: - if network['router:external']: - return network['name'] - return None - - -def get_external_net_id(neutron_client): - if (env.get('EXTERNAL_NETWORK')): - networks = neutron_client.list_networks( - name=env.get('EXTERNAL_NETWORK')) - net_id = networks['networks'][0]['id'] - return net_id - for network in neutron_client.list_networks()['networks']: - if network['router:external']: - return network['id'] - return None - - -def check_neutron_net(neutron_client, net_name): - for network in neutron_client.list_networks()['networks']: - if network['name'] == net_name: - for subnet in network['subnets']: - return True - return False - - -def create_neutron_net(neutron_client, name): - json_body = {'network': {'name': name, - 'admin_state_up': True}} - try: - network = neutron_client.create_network(body=json_body) - network_dict = network['network'] - return network_dict['id'] - except Exception as e: - logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s" - % (name, e)) - return None - - -def create_neutron_subnet(neutron_client, name, cidr, net_id, - dns=['8.8.8.8', '8.8.4.4']): - json_body = {'subnets': [{'name': name, 'cidr': cidr, - 'ip_version': 4, 'network_id': net_id, - 'dns_nameservers': dns}]} - - try: - subnet = neutron_client.create_subnet(body=json_body) - return subnet['subnets'][0]['id'] - except Exception as e: - logger.error("Error [create_neutron_subnet(neutron_client, '%s', " - "'%s', '%s')]: %s" % (name, cidr, net_id, e)) - return None - - -def create_neutron_router(neutron_client, name): - json_body = {'router': {'name': name, 'admin_state_up': True}} - try: - router = neutron_client.create_router(json_body) - return router['router']['id'] - except Exception as e: - logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s" - % (name, e)) - return None - - -def create_neutron_port(neutron_client, name, network_id, ip): - json_body = {'port': { - 'admin_state_up': True, - 'name': name, - 'network_id': network_id, - 'fixed_ips': [{"ip_address": ip}] - }} - try: - port = neutron_client.create_port(body=json_body) - return port['port']['id'] - except Exception as e: - logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', " - "'%s')]: %s" % (name, network_id, ip, e)) - return None - - -def update_neutron_net(neutron_client, network_id, shared=False): - json_body = {'network': {'shared': shared}} - try: - neutron_client.update_network(network_id, body=json_body) - return True - except Exception as e: - logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: " - "%s" % (network_id, str(shared), e)) - return False - - -def update_neutron_port(neutron_client, port_id, device_owner): - json_body = {'port': { - 'device_owner': device_owner, - }} - try: - port = neutron_client.update_port(port=port_id, - body=json_body) - return port['port']['id'] - except Exception as e: - logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:" - " %s" % (port_id, device_owner, e)) - return None - - -def add_interface_router(neutron_client, router_id, subnet_id): - json_body = {"subnet_id": subnet_id} - try: - neutron_client.add_interface_router(router=router_id, body=json_body) - return True - except Exception as e: - logger.error("Error [add_interface_router(neutron_client, '%s', " - "'%s')]: %s" % (router_id, subnet_id, e)) - return False - - -def add_gateway_router(neutron_client, router_id): - ext_net_id = get_external_net_id(neutron_client) - router_dict = {'network_id': ext_net_id} - try: - neutron_client.add_gateway_router(router_id, router_dict) - return True - except Exception as e: - logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s" - % (router_id, e)) - return False - - -def delete_neutron_net(neutron_client, network_id): - try: - neutron_client.delete_network(network_id) - return True - except Exception as e: - logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s" - % (network_id, e)) - return False - - -def delete_neutron_subnet(neutron_client, subnet_id): - try: - neutron_client.delete_subnet(subnet_id) - return True - except Exception as e: - logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s" - % (subnet_id, e)) - return False - - -def delete_neutron_router(neutron_client, router_id): - try: - neutron_client.delete_router(router=router_id) - return True - except Exception as e: - logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s" - % (router_id, e)) - return False - - -def delete_neutron_port(neutron_client, port_id): - try: - neutron_client.delete_port(port_id) - return True - except Exception as e: - logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s" - % (port_id, e)) - return False - - -def remove_interface_router(neutron_client, router_id, subnet_id): - json_body = {"subnet_id": subnet_id} - try: - neutron_client.remove_interface_router(router=router_id, - body=json_body) - return True - except Exception as e: - logger.error("Error [remove_interface_router(neutron_client, '%s', " - "'%s')]: %s" % (router_id, subnet_id, e)) - return False - - -def remove_gateway_router(neutron_client, router_id): - try: - neutron_client.remove_gateway_router(router_id) - return True - except Exception as e: - logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s" - % (router_id, e)) - return False - - -def create_network_full(neutron_client, - net_name, - subnet_name, - router_name, - cidr, - dns=['8.8.8.8', '8.8.4.4']): - - # Check if the network already exists - network_id = get_network_id(neutron_client, net_name) - subnet_id = get_subnet_id(neutron_client, subnet_name) - router_id = get_router_id(neutron_client, router_name) - - if network_id != '' and subnet_id != '' and router_id != '': - logger.info("A network with name '%s' already exists..." % net_name) - else: - neutron_client.format = 'json' - logger.info('Creating neutron network %s...' % net_name) - network_id = create_neutron_net(neutron_client, net_name) - - if not network_id: - return False - - logger.debug("Network '%s' created successfully" % network_id) - logger.debug('Creating Subnet....') - subnet_id = create_neutron_subnet(neutron_client, subnet_name, - cidr, network_id, dns) - if not subnet_id: - return None - - logger.debug("Subnet '%s' created successfully" % subnet_id) - logger.debug('Creating Router...') - router_id = create_neutron_router(neutron_client, router_name) - - if not router_id: - return None - - logger.debug("Router '%s' created successfully" % router_id) - logger.debug('Adding router to subnet...') - - if not add_interface_router(neutron_client, router_id, subnet_id): - return None - - logger.debug("Interface added successfully.") - - logger.debug('Adding gateway to router...') - if not add_gateway_router(neutron_client, router_id): - return None - - logger.debug("Gateway added successfully.") - - network_dic = {'net_id': network_id, - 'subnet_id': subnet_id, - 'router_id': router_id} - return network_dic - - -def create_shared_network_full(net_name, subnt_name, router_name, subnet_cidr): - neutron_client = get_neutron_client() - - network_dic = create_network_full(neutron_client, - net_name, - subnt_name, - router_name, - subnet_cidr) - if network_dic: - if not update_neutron_net(neutron_client, - network_dic['net_id'], - shared=True): - logger.error("Failed to update network %s..." % net_name) - return None - else: - logger.debug("Network '%s' is available..." % net_name) - else: - logger.error("Network %s creation failed" % net_name) - return None - return network_dic - - -# ********************************************* -# SEC GROUPS -# ********************************************* - - -def get_security_groups(neutron_client): - try: - security_groups = neutron_client.list_security_groups()[ - 'security_groups'] - return security_groups - except Exception as e: - logger.error("Error [get_security_groups(neutron_client)]: %s" % e) - return None - - -def get_security_group_id(neutron_client, sg_name): - security_groups = get_security_groups(neutron_client) - id = '' - for sg in security_groups: - if sg['name'] == sg_name: - id = sg['id'] - break - return id - - -def create_security_group(neutron_client, sg_name, sg_description): - json_body = {'security_group': {'name': sg_name, - 'description': sg_description}} - try: - secgroup = neutron_client.create_security_group(json_body) - return secgroup['security_group'] - except Exception as e: - logger.error("Error [create_security_group(neutron_client, '%s', " - "'%s')]: %s" % (sg_name, sg_description, e)) - return None - - -def create_secgroup_rule(neutron_client, sg_id, direction, protocol, - port_range_min=None, port_range_max=None): - # We create a security group in 2 steps - # 1 - we check the format and set the json body accordingly - # 2 - we call neturon client to create the security group - - # Format check - json_body = {'security_group_rule': {'direction': direction, - 'security_group_id': sg_id, - 'protocol': protocol}} - # parameters may be - # - both None => we do nothing - # - both Not None => we add them to the json description - # but one cannot be None is the other is not None - if (port_range_min is not None and port_range_max is not None): - # add port_range in json description - json_body['security_group_rule']['port_range_min'] = port_range_min - json_body['security_group_rule']['port_range_max'] = port_range_max - logger.debug("Security_group format set (port range included)") - else: - # either both port range are set to None => do nothing - # or one is set but not the other => log it and return False - if port_range_min is None and port_range_max is None: - logger.debug("Security_group format set (no port range mentioned)") - else: - logger.error("Bad security group format." - "One of the port range is not properly set:" - "range min: {}," - "range max: {}".format(port_range_min, - port_range_max)) - return False - - # Create security group using neutron client - try: - neutron_client.create_security_group_rule(json_body) - return True - except: - logger.exception("Impossible to create_security_group_rule," - "security group rule probably already exists") - return False - - -def get_security_group_rules(neutron_client, sg_id): - try: - security_rules = neutron_client.list_security_group_rules()[ - 'security_group_rules'] - security_rules = [rule for rule in security_rules - if rule["security_group_id"] == sg_id] - return security_rules - except Exception as e: - logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:" - " %s" % e) - return None - - -def check_security_group_rules(neutron_client, sg_id, direction, protocol, - port_min=None, port_max=None): - try: - security_rules = get_security_group_rules(neutron_client, sg_id) - security_rules = [rule for rule in security_rules - if (rule["direction"].lower() == direction and - rule["protocol"].lower() == protocol and - rule["port_range_min"] == port_min and - rule["port_range_max"] == port_max)] - if len(security_rules) == 0: - return True - else: - return False - except Exception as e: - logger.error("Error [check_security_group_rules(" - " neutron_client, sg_id, direction," - " protocol, port_min=None, port_max=None)]: " - "%s" % e) - return None - - -def create_security_group_full(neutron_client, - sg_name, sg_description): - sg_id = get_security_group_id(neutron_client, sg_name) - if sg_id != '': - logger.info("Using existing security group '%s'..." % sg_name) - else: - logger.info("Creating security group '%s'..." % sg_name) - SECGROUP = create_security_group(neutron_client, - sg_name, - sg_description) - if not SECGROUP: - logger.error("Failed to create the security group...") - return None - - sg_id = SECGROUP['id'] - - logger.debug("Security group '%s' with ID=%s created successfully." - % (SECGROUP['name'], sg_id)) - - logger.debug("Adding ICMP rules in security group '%s'..." - % sg_name) - if not create_secgroup_rule(neutron_client, sg_id, - 'ingress', 'icmp'): - logger.error("Failed to create the security group rule...") - return None - - logger.debug("Adding SSH rules in security group '%s'..." - % sg_name) - if not create_secgroup_rule( - neutron_client, sg_id, 'ingress', 'tcp', '22', '22'): - logger.error("Failed to create the security group rule...") - return None - - if not create_secgroup_rule( - neutron_client, sg_id, 'egress', 'tcp', '22', '22'): - logger.error("Failed to create the security group rule...") - return None - return sg_id - - -def add_secgroup_to_instance(nova_client, instance_id, secgroup_id): - try: - nova_client.servers.add_security_group(instance_id, secgroup_id) - return True - except Exception as e: - logger.error("Error [add_secgroup_to_instance(nova_client, '%s', " - "'%s')]: %s" % (instance_id, secgroup_id, e)) - return False - - -def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota): - json_body = {"quota": { - "security_group": sg_quota, - "security_group_rule": sg_rule_quota - }} - - try: - neutron_client.update_quota(tenant_id=tenant_id, - body=json_body) - return True - except Exception as e: - logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', " - "'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e)) - return False - - -def delete_security_group(neutron_client, secgroup_id): - try: - neutron_client.delete_security_group(secgroup_id) - return True - except Exception as e: - logger.error("Error [delete_security_group(neutron_client, '%s')]: %s" - % (secgroup_id, e)) - return False - - -# ********************************************* -# GLANCE -# ********************************************* -def get_images(glance_client): - try: - images = glance_client.images.list() - return images - except Exception as e: - logger.error("Error [get_images]: %s" % e) - return None - - -def get_image_id(glance_client, image_name): - images = glance_client.images.list() - id = '' - for i in images: - if i.name == image_name: - id = i.id - break - return id - - -def create_glance_image(glance_client, - image_name, - file_path, - disk="qcow2", - extra_properties={}, - container="bare", - public="public"): - if not os.path.isfile(file_path): - logger.error("Error: file %s does not exist." % file_path) - return None - try: - image_id = get_image_id(glance_client, image_name) - if image_id != '': - logger.info("Image %s already exists." % image_name) - else: - logger.info("Creating image '%s' from '%s'..." % (image_name, - file_path)) - - image = glance_client.images.create(name=image_name, - visibility=public, - disk_format=disk, - container_format=container, - **extra_properties) - image_id = image.id - with open(file_path) as image_data: - glance_client.images.upload(image_id, image_data) - return image_id - except Exception as e: - logger.error("Error [create_glance_image(glance_client, '%s', '%s', " - "'%s')]: %s" % (image_name, file_path, public, e)) - return None - - -def get_or_create_image(name, path, format, extra_properties): - image_exists = False - glance_client = get_glance_client() - - image_id = get_image_id(glance_client, name) - if image_id != '': - logger.info("Using existing image '%s'..." % name) - image_exists = True - else: - logger.info("Creating image '%s' from '%s'..." % (name, path)) - image_id = create_glance_image(glance_client, - name, - path, - format, - extra_properties) - if not image_id: - logger.error("Failed to create a Glance image...") - else: - logger.debug("Image '%s' with ID=%s created successfully." - % (name, image_id)) - - return image_exists, image_id - - -def delete_glance_image(glance_client, image_id): - try: - glance_client.images.delete(image_id) - return True - except Exception as e: - logger.error("Error [delete_glance_image(glance_client, '%s')]: %s" - % (image_id, e)) - return False - - -# ********************************************* -# CINDER -# ********************************************* -def get_volumes(cinder_client): - try: - volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1}) - return volumes - except Exception as e: - logger.error("Error [get_volumes(cinder_client)]: %s" % e) - return None - - -def update_cinder_quota(cinder_client, tenant_id, vols_quota, - snapshots_quota, gigabytes_quota): - quotas_values = {"volumes": vols_quota, - "snapshots": snapshots_quota, - "gigabytes": gigabytes_quota} - - try: - cinder_client.quotas.update(tenant_id, **quotas_values) - return True - except Exception as e: - logger.error("Error [update_cinder_quota(cinder_client, '%s', '%s', " - "'%s' '%s')]: %s" % (tenant_id, vols_quota, - snapshots_quota, gigabytes_quota, e)) - return False - - -def delete_volume(cinder_client, volume_id, forced=False): - try: - if forced: - try: - cinder_client.volumes.detach(volume_id) - except: - logger.error(sys.exc_info()[0]) - cinder_client.volumes.force_delete(volume_id) - else: - cinder_client.volumes.delete(volume_id) - return True - except Exception as e: - logger.error("Error [delete_volume(cinder_client, '%s', '%s')]: %s" - % (volume_id, str(forced), e)) - return False - - -# ********************************************* -# KEYSTONE -# ********************************************* -def get_tenants(keystone_client): - try: - if is_keystone_v3(): - tenants = keystone_client.projects.list() - else: - tenants = keystone_client.tenants.list() - return tenants - except Exception as e: - logger.error("Error [get_tenants(keystone_client)]: %s" % e) - return None - - -def get_users(keystone_client): - try: - users = keystone_client.users.list() - return users - except Exception as e: - logger.error("Error [get_users(keystone_client)]: %s" % e) - return None - - -def get_tenant_id(keystone_client, tenant_name): - tenants = get_tenants(keystone_client) - id = '' - for t in tenants: - if t.name == tenant_name: - id = t.id - break - return id - - -def get_user_id(keystone_client, user_name): - users = get_users(keystone_client) - id = '' - for u in users: - if u.name == user_name: - id = u.id - break - return id - - -def get_role_id(keystone_client, role_name): - roles = keystone_client.roles.list() - id = '' - for r in roles: - if r.name == role_name: - id = r.id - break - return id - - -def get_domain_id(keystone_client, domain_name): - domains = keystone_client.domains.list() - id = '' - for d in domains: - if d.name == domain_name: - id = d.id - break - return id - - -def create_tenant(keystone_client, tenant_name, tenant_description): - try: - if is_keystone_v3(): - domain_name = os.environ['OS_PROJECT_DOMAIN_NAME'] - domain_id = get_domain_id(keystone_client, domain_name) - tenant = keystone_client.projects.create( - name=tenant_name, - description=tenant_description, - domain=domain_id, - enabled=True) - else: - tenant = keystone_client.tenants.create(tenant_name, - tenant_description, - enabled=True) - return tenant.id - except Exception as e: - logger.error("Error [create_tenant(keystone_client, '%s', '%s')]: %s" - % (tenant_name, tenant_description, e)) - return None - - -def get_or_create_tenant(keystone_client, tenant_name, tenant_description): - tenant_id = get_tenant_id(keystone_client, tenant_name) - if not tenant_id: - tenant_id = create_tenant(keystone_client, tenant_name, - tenant_description) - - return tenant_id - - -def get_or_create_tenant_for_vnf(keystone_client, tenant_name, - tenant_description): - """Get or Create a Tenant - - Args: - keystone_client: keystone client reference - tenant_name: the name of the tenant - tenant_description: the description of the tenant - - return False if tenant retrieved though get - return True if tenant created - raise Exception if error during processing - """ - try: - tenant_id = get_tenant_id(keystone_client, tenant_name) - if not tenant_id: - tenant_id = create_tenant(keystone_client, tenant_name, - tenant_description) - return True - else: - return False - except: - raise Exception("Impossible to create a Tenant for the VNF {}".format( - tenant_name)) - - -def create_user(keystone_client, user_name, user_password, - user_email, tenant_id): - try: - if is_keystone_v3(): - user = keystone_client.users.create(name=user_name, - password=user_password, - email=user_email, - project_id=tenant_id, - enabled=True) - else: - user = keystone_client.users.create(user_name, - user_password, - user_email, - tenant_id, - enabled=True) - return user.id - except Exception as e: - logger.error("Error [create_user(keystone_client, '%s', '%s', '%s'" - "'%s')]: %s" % (user_name, user_password, - user_email, tenant_id, e)) - return None - - -def get_or_create_user(keystone_client, user_name, user_password, - tenant_id, user_email=None): - user_id = get_user_id(keystone_client, user_name) - if not user_id: - user_id = create_user(keystone_client, user_name, user_password, - user_email, tenant_id) - return user_id - - -def get_or_create_user_for_vnf(keystone_client, vnf_ref): - """Get or Create user for VNF - - Args: - keystone_client: keystone client reference - vnf_ref: VNF reference used as user name & password, tenant name - - return False if user retrieved through get - return True if user created - raise Exception if error during processing - """ - try: - user_id = get_user_id(keystone_client, vnf_ref) - tenant_id = get_tenant_id(keystone_client, vnf_ref) - created = False - if not user_id: - user_id = create_user(keystone_client, vnf_ref, vnf_ref, - "", tenant_id) - created = True - try: - role_id = get_role_id(keystone_client, 'admin') - tenant_id = get_tenant_id(keystone_client, vnf_ref) - add_role_user(keystone_client, user_id, role_id, tenant_id) - except: - logger.warn("Cannot associate user to role admin on tenant") - return created - except: - raise Exception("Impossible to create a user for the VNF {}".format( - vnf_ref)) - - -def add_role_user(keystone_client, user_id, role_id, tenant_id): - try: - if is_keystone_v3(): - keystone_client.roles.grant(role=role_id, - user=user_id, - project=tenant_id) - else: - keystone_client.roles.add_user_role(user_id, role_id, tenant_id) - return True - except Exception as e: - logger.error("Error [add_role_user(keystone_client, '%s', '%s'" - "'%s')]: %s " % (user_id, role_id, tenant_id, e)) - return False - - -def delete_tenant(keystone_client, tenant_id): - try: - if is_keystone_v3(): - keystone_client.projects.delete(tenant_id) - else: - keystone_client.tenants.delete(tenant_id) - return True - except Exception as e: - logger.error("Error [delete_tenant(keystone_client, '%s')]: %s" - % (tenant_id, e)) - return False - - -def delete_user(keystone_client, user_id): - try: - keystone_client.users.delete(user_id) - return True - except Exception as e: - logger.error("Error [delete_user(keystone_client, '%s')]: %s" - % (user_id, e)) - return False - - -# ********************************************* -# HEAT -# ********************************************* -def get_resource(heat_client, stack_id, resource): - try: - resources = heat_client.resources.get(stack_id, resource) - return resources - except Exception as e: - logger.error("Error [get_resource]: %s" % e) - return None |