summaryrefslogtreecommitdiffstats
path: root/functest
diff options
context:
space:
mode:
authorCédric Ollivier <cedric.ollivier@orange.com>2018-02-26 10:59:25 +0100
committerCédric Ollivier <cedric.ollivier@orange.com>2018-02-26 11:03:24 +0100
commit9c8dbc5be69b856c45ba61c525e74a4826f6a5c7 (patch)
treed971bea71e362dcc202acbd244f6ea77c045038b /functest
parentd6c5f7efe042f24bad35dab461d8820a7d507f7b (diff)
Remove openstack utils
They have been obsolete since we fully switched to snaps. The files are being moved to SDNVPN which is the last project using them [1]. [1] https://gerrit.opnfv.org/gerrit/#/c/52631/ Change-Id: I277b92a04cf6bc7b30015eed76f03f17830fad2b Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
Diffstat (limited to 'functest')
-rw-r--r--functest/tests/unit/utils/test_openstack_utils.py1784
-rw-r--r--functest/utils/openstack_utils.py1486
2 files changed, 0 insertions, 3270 deletions
diff --git a/functest/tests/unit/utils/test_openstack_utils.py b/functest/tests/unit/utils/test_openstack_utils.py
deleted file mode 100644
index 259e60d5..00000000
--- a/functest/tests/unit/utils/test_openstack_utils.py
+++ /dev/null
@@ -1,1784 +0,0 @@
-#!/usr/bin/env python
-
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import copy
-import logging
-import os
-import unittest
-
-import mock
-
-from functest.utils import openstack_utils
-
-
-class OSUtilsTesting(unittest.TestCase):
-
- def _get_env_cred_dict(self, os_prefix=''):
- return {'OS_USERNAME': os_prefix + 'username',
- 'OS_PASSWORD': os_prefix + 'password',
- 'OS_AUTH_URL': os_prefix + 'auth_url',
- 'OS_TENANT_NAME': os_prefix + 'tenant_name',
- 'OS_USER_DOMAIN_NAME': os_prefix + 'user_domain_name',
- 'OS_PROJECT_DOMAIN_NAME': os_prefix + 'project_domain_name',
- 'OS_PROJECT_NAME': os_prefix + 'project_name',
- 'OS_ENDPOINT_TYPE': os_prefix + 'endpoint_type',
- 'OS_REGION_NAME': os_prefix + 'region_name',
- 'OS_CACERT': os_prefix + 'https_cacert',
- 'OS_INSECURE': os_prefix + 'https_insecure'}
-
- def _get_os_env_vars(self):
- return {'username': 'test_username', 'password': 'test_password',
- 'auth_url': 'test_auth_url', 'tenant_name': 'test_tenant_name',
- 'user_domain_name': 'test_user_domain_name',
- 'project_domain_name': 'test_project_domain_name',
- 'project_name': 'test_project_name',
- 'endpoint_type': 'test_endpoint_type',
- 'region_name': 'test_region_name',
- 'https_cacert': 'test_https_cacert',
- 'https_insecure': 'test_https_insecure'}
-
- def setUp(self):
- self.env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD']
- self.tenant_name = 'test_tenant_name'
- self.env_cred_dict = self._get_env_cred_dict()
- self.os_environs = self._get_env_cred_dict(os_prefix='test_')
- self.os_env_vars = self._get_os_env_vars()
-
- mock_obj = mock.Mock()
- attrs = {'name': 'test_flavor',
- 'id': 'flavor_id',
- 'ram': 2}
- mock_obj.configure_mock(**attrs)
- self.flavor = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'name': 'test_aggregate',
- 'id': 'aggregate_id',
- 'hosts': ['host_name']}
- mock_obj.configure_mock(**attrs)
- self.aggregate = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'id': 'instance_id',
- 'name': 'test_instance',
- 'status': 'ok'}
- mock_obj.configure_mock(**attrs)
- self.instance = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'id': 'azone_id',
- 'zoneName': 'test_azone',
- 'status': 'ok'}
- mock_obj.configure_mock(**attrs)
- self.availability_zone = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'floating_network_id': 'floating_id',
- 'floating_ip_address': 'test_floating_ip'}
- mock_obj.configure_mock(**attrs)
- self.floating_ip = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'id': 'hypervisor_id',
- 'hypervisor_hostname': 'test_hostname',
- 'state': 'up'}
- mock_obj.configure_mock(**attrs)
- self.hypervisor = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'id': 'image_id',
- 'name': 'test_image'}
- mock_obj.configure_mock(**attrs)
- self.image = mock_obj
-
- mock_obj = mock.Mock()
- self.mock_return = mock_obj
-
- self.nova_client = mock.Mock()
- attrs = {'servers.list.return_value': [self.instance],
- 'servers.get.return_value': self.instance,
- 'servers.find.return_value': self.instance,
- 'servers.create.return_value': self.instance,
- 'flavors.list.return_value': [self.flavor],
- 'flavors.find.return_value': self.flavor,
- 'servers.add_floating_ip.return_value': mock.Mock(),
- 'servers.force_delete.return_value': mock.Mock(),
- 'aggregates.list.return_value': [self.aggregate],
- 'aggregates.add_host.return_value': mock.Mock(),
- 'aggregates.remove_host.return_value': mock.Mock(),
- 'aggregates.get.return_value': self.aggregate,
- 'aggregates.delete.return_value': mock.Mock(),
- 'availability_zones.list.return_value':
- [self.availability_zone],
- 'hypervisors.list.return_value': [self.hypervisor],
- 'create.return_value': mock.Mock(),
- 'add_security_group.return_value': mock.Mock(),
- 'images.list.return_value': [self.image],
- 'images.delete.return_value': mock.Mock(),
- }
- self.nova_client.configure_mock(**attrs)
-
- self.glance_client = mock.Mock()
- attrs = {'images.list.return_value': [self.image],
- 'images.create.return_value': self.image,
- 'images.upload.return_value': mock.Mock()}
- self.glance_client.configure_mock(**attrs)
-
- mock_obj = mock.Mock()
- attrs = {'id': 'volume_id',
- 'name': 'test_volume'}
- mock_obj.configure_mock(**attrs)
- self.volume = mock_obj
-
- self.cinder_client = mock.Mock()
- attrs = {'volumes.list.return_value': [self.volume],
- 'quotas.update.return_value': mock.Mock(),
- 'volumes.detach.return_value': mock.Mock(),
- 'volumes.force_delete.return_value': mock.Mock(),
- 'volumes.delete.return_value': mock.Mock()
- }
- self.cinder_client.configure_mock(**attrs)
-
- self.resource = mock.Mock()
- attrs = {'id': 'resource_test_id',
- 'name': 'resource_test_name'
- }
-
- self.heat_client = mock.Mock()
- attrs = {'resources.get.return_value': self.resource}
- self.heat_client.configure_mock(**attrs)
-
- mock_obj = mock.Mock()
- attrs = {'id': 'tenant_id',
- 'name': 'test_tenant'}
- mock_obj.configure_mock(**attrs)
- self.tenant = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'id': 'user_id',
- 'name': 'test_user'}
- mock_obj.configure_mock(**attrs)
- self.user = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'id': 'role_id',
- 'name': 'test_role'}
- mock_obj.configure_mock(**attrs)
- self.role = mock_obj
-
- mock_obj = mock.Mock()
- attrs = {'id': 'domain_id',
- 'name': 'test_domain'}
- mock_obj.configure_mock(**attrs)
- self.domain = mock_obj
-
- self.keystone_client = mock.Mock()
- attrs = {'projects.list.return_value': [self.tenant],
- 'tenants.list.return_value': [self.tenant],
- 'users.list.return_value': [self.user],
- 'roles.list.return_value': [self.role],
- 'domains.list.return_value': [self.domain],
- 'projects.create.return_value': self.tenant,
- 'tenants.create.return_value': self.tenant,
- 'users.create.return_value': self.user,
- 'roles.grant.return_value': mock.Mock(),
- 'roles.add_user_role.return_value': mock.Mock(),
- 'projects.delete.return_value': mock.Mock(),
- 'tenants.delete.return_value': mock.Mock(),
- 'users.delete.return_value': mock.Mock(),
- }
- self.keystone_client.configure_mock(**attrs)
-
- self.router = {'id': 'router_id',
- 'name': 'test_router'}
-
- self.subnet = {'id': 'subnet_id',
- 'name': 'test_subnet'}
-
- self.networks = [{'id': 'network_id',
- 'name': 'test_network',
- 'router:external': False,
- 'shared': True,
- 'subnets': [self.subnet]},
- {'id': 'network_id1',
- 'name': 'test_network1',
- 'router:external': True,
- 'shared': True,
- 'subnets': [self.subnet]}]
-
- self.port = {'id': 'port_id',
- 'name': 'test_port'}
-
- self.sec_group = {'id': 'sec_group_id',
- 'name': 'test_sec_group'}
-
- self.sec_group_rule = {'id': 'sec_group_rule_id',
- 'direction': 'direction',
- 'protocol': 'protocol',
- 'port_range_max': 'port_max',
- 'security_group_id': self.sec_group['id'],
- 'port_range_min': 'port_min'}
- self.neutron_floatingip = {'id': 'fip_id',
- 'floating_ip_address': 'test_ip'}
- self.neutron_client = mock.Mock()
- attrs = {'list_networks.return_value': {'networks': self.networks},
- 'list_routers.return_value': {'routers': [self.router]},
- 'list_ports.return_value': {'ports': [self.port]},
- 'list_subnets.return_value': {'subnets': [self.subnet]},
- 'create_network.return_value': {'network': self.networks[0]},
- 'create_subnet.return_value': {'subnets': [self.subnet]},
- 'create_router.return_value': {'router': self.router},
- 'create_port.return_value': {'port': self.port},
- 'create_floatingip.return_value': {'floatingip':
- self.neutron_floatingip},
- 'update_network.return_value': mock.Mock(),
- 'update_port.return_value': {'port': self.port},
- 'add_interface_router.return_value': mock.Mock(),
- 'add_gateway_router.return_value': mock.Mock(),
- 'delete_network.return_value': mock.Mock(),
- 'delete_subnet.return_value': mock.Mock(),
- 'delete_router.return_value': mock.Mock(),
- 'delete_port.return_value': mock.Mock(),
- 'remove_interface_router.return_value': mock.Mock(),
- 'remove_gateway_router.return_value': mock.Mock(),
- 'list_security_groups.return_value': {'security_groups':
- [self.sec_group]},
- 'list_security_group_rules.'
- 'return_value': {'security_group_rules':
- [self.sec_group_rule]},
- 'create_security_group_rule.return_value': mock.Mock(),
- 'create_security_group.return_value': {'security_group':
- self.sec_group},
- 'update_quota.return_value': mock.Mock(),
- 'delete_security_group.return_value': mock.Mock(),
- 'list_floatingips.return_value': {'floatingips':
- [self.floating_ip]},
- 'delete_floatingip.return_value': mock.Mock(),
- }
- self.neutron_client.configure_mock(**attrs)
-
- self.empty_client = mock.Mock()
- attrs = {'list_networks.return_value': {'networks': []},
- 'list_routers.return_value': {'routers': []},
- 'list_ports.return_value': {'ports': []},
- 'list_subnets.return_value': {'subnets': []}}
- self.empty_client.configure_mock(**attrs)
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value=None)
- def test_is_keystone_v3_missing_identity(self, mock_os_getenv):
- self.assertEqual(openstack_utils.is_keystone_v3(), False)
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='3')
- def test_is_keystone_v3_default(self, mock_os_getenv):
- self.assertEqual(openstack_utils.is_keystone_v3(), True)
-
- @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
- return_value=False)
- def test_get_rc_env_vars_missing_identity(self, mock_get_rc_env):
- exp_resp = self.env_vars
- exp_resp.extend(['OS_TENANT_NAME'])
- self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
-
- @mock.patch('functest.utils.openstack_utils.is_keystone_v3',
- return_value=True)
- def test_get_rc_env_vars_default(self, mock_get_rc_env):
- exp_resp = self.env_vars
- exp_resp.extend(['OS_PROJECT_NAME',
- 'OS_USER_DOMAIN_NAME',
- 'OS_PROJECT_DOMAIN_NAME'])
- self.assertEqual(openstack_utils.get_rc_env_vars(), exp_resp)
-
- @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
- def test_check_credentials_missing_env(self, mock_get_rc_env):
- exp_resp = self.env_vars
- exp_resp.extend(['OS_TENANT_NAME'])
- mock_get_rc_env.return_value = exp_resp
- with mock.patch.dict('functest.utils.openstack_utils.os.environ', {},
- clear=True):
- self.assertEqual(openstack_utils.check_credentials(), False)
-
- @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
- def test_check_credentials_default(self, mock_get_rc_env):
- exp_resp = ['OS_TENANT_NAME']
- mock_get_rc_env.return_value = exp_resp
- with mock.patch.dict('functest.utils.openstack_utils.os.environ',
- {'OS_TENANT_NAME': self.tenant_name},
- clear=True):
- self.assertEqual(openstack_utils.check_credentials(), True)
-
- def test_get_env_cred_dict(self):
- self.assertDictEqual(openstack_utils.get_env_cred_dict(),
- self.env_cred_dict)
-
- @mock.patch('functest.utils.openstack_utils.get_rc_env_vars')
- def test_get_credentials_default(self, mock_get_rc_env):
- mock_get_rc_env.return_value = self.env_cred_dict.keys()
- with mock.patch.dict('functest.utils.openstack_utils.os.environ',
- self.os_environs,
- clear=True):
- self.assertDictEqual(openstack_utils.get_credentials(),
- self.os_env_vars)
-
- def _get_credentials_missing_env(self, var):
- dic = copy.deepcopy(self.os_environs)
- dic.pop(var)
- with mock.patch('functest.utils.openstack_utils.get_rc_env_vars',
- return_value=self.env_cred_dict.keys()), \
- mock.patch.dict('functest.utils.openstack_utils.os.environ',
- dic,
- clear=True):
- self.assertRaises(openstack_utils.MissingEnvVar,
- lambda: openstack_utils.get_credentials())
-
- def test_get_credentials_missing_username(self):
- self._get_credentials_missing_env('OS_USERNAME')
-
- def test_get_credentials_missing_password(self):
- self._get_credentials_missing_env('OS_PASSWORD')
-
- def test_get_credentials_missing_auth_url(self):
- self._get_credentials_missing_env('OS_AUTH_URL')
-
- def test_get_credentials_missing_tenantname(self):
- self._get_credentials_missing_env('OS_TENANT_NAME')
-
- def test_get_credentials_missing_domainname(self):
- self._get_credentials_missing_env('OS_USER_DOMAIN_NAME')
-
- def test_get_credentials_missing_projectname(self):
- self._get_credentials_missing_env('OS_PROJECT_NAME')
-
- def test_get_credentials_missing_endpoint_type(self):
- self._get_credentials_missing_env('OS_ENDPOINT_TYPE')
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value=None)
- def test_get_keystone_client_version_missing_env(self, mock_os_getenv):
- self.assertEqual(openstack_utils.get_keystone_client_version(),
- openstack_utils.DEFAULT_API_VERSION)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='3')
- def test_get_keystone_client_version_default(self, mock_os_getenv,
- mock_logger_info):
- self.assertEqual(openstack_utils.get_keystone_client_version(),
- '3')
- mock_logger_info.assert_called_once_with("OS_IDENTITY_API_VERSION is "
- "set in env as '%s'", '3')
-
- @mock.patch('functest.utils.openstack_utils.get_session')
- @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
- @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
- return_value='3')
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='public')
- def test_get_keystone_client_with_interface(self, mock_os_getenv,
- mock_keystoneclient_version,
- mock_key_client,
- mock_get_session):
- mock_keystone_obj = mock.Mock()
- mock_session_obj = mock.Mock()
- mock_key_client.return_value = mock_keystone_obj
- mock_get_session.return_value = mock_session_obj
- self.assertEqual(openstack_utils.get_keystone_client(),
- mock_keystone_obj)
- mock_key_client.assert_called_once_with('3',
- session=mock_session_obj,
- interface='public')
-
- @mock.patch('functest.utils.openstack_utils.get_session')
- @mock.patch('functest.utils.openstack_utils.keystoneclient.Client')
- @mock.patch('functest.utils.openstack_utils.get_keystone_client_version',
- return_value='3')
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='admin')
- def test_get_keystone_client_no_interface(self, mock_os_getenv,
- mock_keystoneclient_version,
- mock_key_client,
- mock_get_session):
- mock_keystone_obj = mock.Mock()
- mock_session_obj = mock.Mock()
- mock_key_client.return_value = mock_keystone_obj
- mock_get_session.return_value = mock_session_obj
- self.assertEqual(openstack_utils.get_keystone_client(),
- mock_keystone_obj)
- mock_key_client.assert_called_once_with('3',
- session=mock_session_obj,
- interface='admin')
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value=None)
- def test_get_nova_client_version_missing_env(self, mock_os_getenv):
- self.assertEqual(openstack_utils.get_nova_client_version(),
- openstack_utils.DEFAULT_API_VERSION)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='3')
- def test_get_nova_client_version_default(self, mock_os_getenv,
- mock_logger_info):
- self.assertEqual(openstack_utils.get_nova_client_version(),
- '3')
- mock_logger_info.assert_called_once_with("OS_COMPUTE_API_VERSION is "
- "set in env as '%s'", '3')
-
- def test_get_nova_client(self):
- mock_nova_obj = mock.Mock()
- mock_session_obj = mock.Mock()
- with mock.patch('functest.utils.openstack_utils'
- '.get_nova_client_version', return_value='3'), \
- mock.patch('functest.utils.openstack_utils'
- '.novaclient.Client',
- return_value=mock_nova_obj) \
- as mock_nova_client, \
- mock.patch('functest.utils.openstack_utils.get_session',
- return_value=mock_session_obj):
- self.assertEqual(openstack_utils.get_nova_client(),
- mock_nova_obj)
- mock_nova_client.assert_called_once_with('3',
- session=mock_session_obj)
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value=None)
- def test_get_cinder_client_version_missing_env(self, mock_os_getenv):
- self.assertEqual(openstack_utils.get_cinder_client_version(),
- openstack_utils.DEFAULT_API_VERSION)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='3')
- def test_get_cinder_client_version_default(self, mock_os_getenv,
- mock_logger_info):
- self.assertEqual(openstack_utils.get_cinder_client_version(),
- '3')
- mock_logger_info.assert_called_once_with("OS_VOLUME_API_VERSION is "
- "set in env as '%s'", '3')
-
- def test_get_cinder_client(self):
- mock_cinder_obj = mock.Mock()
- mock_session_obj = mock.Mock()
- with mock.patch('functest.utils.openstack_utils'
- '.get_cinder_client_version', return_value='3'), \
- mock.patch('functest.utils.openstack_utils'
- '.cinderclient.Client',
- return_value=mock_cinder_obj) \
- as mock_cind_client, \
- mock.patch('functest.utils.openstack_utils.get_session',
- return_value=mock_session_obj):
- self.assertEqual(openstack_utils.get_cinder_client(),
- mock_cinder_obj)
- mock_cind_client.assert_called_once_with('3',
- session=mock_session_obj)
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value=None)
- def test_get_neutron_client_version_missing_env(self, mock_os_getenv):
- self.assertEqual(openstack_utils.get_neutron_client_version(),
- openstack_utils.DEFAULT_API_VERSION)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='3')
- def test_get_neutron_client_version_default(self, mock_os_getenv,
- mock_logger_info):
- self.assertEqual(openstack_utils.get_neutron_client_version(),
- '3')
- mock_logger_info.assert_called_once_with("OS_NETWORK_API_VERSION is "
- "set in env as '%s'", '3')
-
- def test_get_neutron_client(self):
- mock_neutron_obj = mock.Mock()
- mock_session_obj = mock.Mock()
- with mock.patch('functest.utils.openstack_utils'
- '.get_neutron_client_version', return_value='3'), \
- mock.patch('functest.utils.openstack_utils'
- '.neutronclient.Client',
- return_value=mock_neutron_obj) \
- as mock_neut_client, \
- mock.patch('functest.utils.openstack_utils.get_session',
- return_value=mock_session_obj):
- self.assertEqual(openstack_utils.get_neutron_client(),
- mock_neutron_obj)
- mock_neut_client.assert_called_once_with('3',
- session=mock_session_obj)
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value=None)
- def test_get_glance_client_version_missing_env(self, mock_os_getenv):
- self.assertEqual(openstack_utils.get_glance_client_version(),
- openstack_utils.DEFAULT_API_VERSION)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value='3')
- def test_get_glance_client_version_default(self, mock_os_getenv,
- mock_logger_info):
- self.assertEqual(openstack_utils.get_glance_client_version(),
- '3')
- mock_logger_info.assert_called_once_with("OS_IMAGE_API_VERSION is "
- "set in env as '%s'", '3')
-
- def test_get_glance_client(self):
- mock_glance_obj = mock.Mock()
- mock_session_obj = mock.Mock()
- with mock.patch('functest.utils.openstack_utils'
- '.get_glance_client_version', return_value='3'), \
- mock.patch('functest.utils.openstack_utils'
- '.glanceclient.Client',
- return_value=mock_glance_obj) \
- as mock_glan_client, \
- mock.patch('functest.utils.openstack_utils.get_session',
- return_value=mock_session_obj):
- self.assertEqual(openstack_utils.get_glance_client(),
- mock_glance_obj)
- mock_glan_client.assert_called_once_with('3',
- session=mock_session_obj)
-
- @mock.patch('functest.utils.openstack_utils.os.getenv',
- return_value=None)
- def test_get_heat_client_version_missing_env(self, mock_os_getenv):
- self.assertEqual(openstack_utils.get_heat_client_version(),
- openstack_utils.DEFAULT_HEAT_API_VERSION)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.os.getenv', return_value='1')
- def test_get_heat_client_version_default(self, mock_os_getenv,
- mock_logger_info):
- self.assertEqual(openstack_utils.get_heat_client_version(), '1')
- mock_logger_info.assert_called_once_with(
- "OS_ORCHESTRATION_API_VERSION is set in env as '%s'", '1')
-
- def test_get_heat_client(self):
- mock_heat_obj = mock.Mock()
- mock_session_obj = mock.Mock()
- with mock.patch('functest.utils.openstack_utils'
- '.get_heat_client_version', return_value='1'), \
- mock.patch('functest.utils.openstack_utils'
- '.heatclient.Client',
- return_value=mock_heat_obj) \
- as mock_heat_client, \
- mock.patch('functest.utils.openstack_utils.get_session',
- return_value=mock_session_obj):
- self.assertEqual(openstack_utils.get_heat_client(),
- mock_heat_obj)
- mock_heat_client.assert_called_once_with('1',
- session=mock_session_obj)
-
- def test_get_instances_default(self):
- self.assertEqual(openstack_utils.get_instances(self.nova_client),
- [self.instance])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_instances_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_instances(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_instance_status_default(self):
- self.assertEqual(openstack_utils.get_instance_status(self.nova_client,
- self.instance),
- 'ok')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_instance_status_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_instance_status(Exception,
- self.instance),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_instance_by_name_default(self):
- self.assertEqual(openstack_utils.
- get_instance_by_name(self.nova_client,
- 'test_instance'),
- self.instance)
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_instance_by_name_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_instance_by_name(Exception,
- 'test_instance'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_flavor_id_default(self):
- self.assertEqual(openstack_utils.
- get_flavor_id(self.nova_client,
- 'test_flavor'),
- self.flavor.id)
-
- def test_get_flavor_id_by_ram_range_default(self):
- self.assertEqual(openstack_utils.
- get_flavor_id_by_ram_range(self.nova_client,
- 1, 3),
- self.flavor.id)
-
- def test_get_aggregates_default(self):
- self.assertEqual(openstack_utils.
- get_aggregates(self.nova_client),
- [self.aggregate])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_aggregates_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_aggregates(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_aggregate_id_default(self):
- with mock.patch('functest.utils.openstack_utils.get_aggregates',
- return_value=[self.aggregate]):
- self.assertEqual(openstack_utils.
- get_aggregate_id(self.nova_client,
- 'test_aggregate'),
- 'aggregate_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_aggregate_id_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_aggregate_id(Exception,
- 'test_aggregate'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_availability_zone_names_default(self):
- with mock.patch('functest.utils.openstack_utils'
- '.get_availability_zones',
- return_value=[self.availability_zone]):
- self.assertEqual(openstack_utils.
- get_availability_zone_names(self.nova_client),
- ['test_azone'])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_availability_zone_names_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_availability_zone_names(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_availability_zones_default(self):
- self.assertEqual(openstack_utils.
- get_availability_zones(self.nova_client),
- [self.availability_zone])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_availability_zones_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_availability_zones(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_floating_ips_default(self):
- self.assertEqual(openstack_utils.
- get_floating_ips(self.neutron_client),
- [self.floating_ip])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_floating_ips_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_floating_ips(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_hypervisors_default(self):
- self.assertEqual(openstack_utils.
- get_hypervisors(self.nova_client),
- ['test_hostname'])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_hypervisors_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_hypervisors(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_aggregate_default(self):
- self.assertTrue(openstack_utils.
- create_aggregate(self.nova_client,
- 'test_aggregate',
- 'azone'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_aggregate_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_aggregate(Exception,
- 'test_aggregate',
- 'azone'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_add_host_to_aggregate_default(self):
- with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
- self.assertTrue(openstack_utils.
- add_host_to_aggregate(self.nova_client,
- 'test_aggregate',
- 'test_hostname'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_add_host_to_aggregate_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- add_host_to_aggregate(Exception,
- 'test_aggregate',
- 'test_hostname'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_aggregate_with_host_default(self):
- with mock.patch('functest.utils.openstack_utils.create_aggregate'), \
- mock.patch('functest.utils.openstack_utils.'
- 'add_host_to_aggregate'):
- self.assertTrue(openstack_utils.
- create_aggregate_with_host(self.nova_client,
- 'test_aggregate',
- 'test_azone',
- 'test_hostname'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_aggregate_with_host_exception(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.create_aggregate',
- side_effect=Exception):
- self.assertEqual(openstack_utils.
- create_aggregate_with_host(Exception,
- 'test_aggregate',
- 'test_azone',
- 'test_hostname'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_instance_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_nova_client',
- return_value=self.nova_client):
- self.assertEqual(openstack_utils.
- create_instance('test_flavor',
- 'image_id',
- 'network_id'),
- self.instance)
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_instance_exception(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_nova_client',
- return_value=self.nova_client):
- self.nova_client.flavors.find.side_effect = Exception
- self.assertEqual(openstack_utils.
- create_instance('test_flavor',
- 'image_id',
- 'network_id'),
- None)
- self.assertTrue(mock_logger_error)
-
- def test_create_floating_ip_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_external_net_id',
- return_value='external_net_id'):
- exp_resp = {'fip_addr': 'test_ip', 'fip_id': 'fip_id'}
- self.assertEqual(openstack_utils.
- create_floating_ip(self.neutron_client),
- exp_resp)
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_floating_ip_exception(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_external_net_id',
- return_value='external_net_id'):
- self.assertEqual(openstack_utils.
- create_floating_ip(Exception),
- None)
- self.assertTrue(mock_logger_error)
-
- def test_add_floating_ip_default(self):
- with mock.patch('functest.utils.openstack_utils.get_aggregate_id'):
- self.assertTrue(openstack_utils.
- add_floating_ip(self.nova_client,
- 'test_serverid',
- 'test_floatingip_addr'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_add_floating_ip_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- add_floating_ip(Exception,
- 'test_serverid',
- 'test_floatingip_addr'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_instance_default(self):
- self.assertTrue(openstack_utils.
- delete_instance(self.nova_client,
- 'instance_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_instance_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_instance(Exception,
- 'instance_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_floating_ip_default(self):
- self.assertTrue(openstack_utils.
- delete_floating_ip(self.neutron_client,
- 'floating_ip_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_floating_ip_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_floating_ip(Exception,
- 'floating_ip_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_remove_host_from_aggregate_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_aggregate_id'):
- self.assertTrue(openstack_utils.
- remove_host_from_aggregate(self.nova_client,
- 'agg_name',
- 'host_name'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_remove_host_from_aggregate_exception(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_aggregate_id', side_effect=Exception):
- self.assertFalse(openstack_utils.
- remove_host_from_aggregate(self.nova_client,
- 'agg_name',
- 'host_name'))
- self.assertTrue(mock_logger_error.called)
-
- def test_remove_hosts_from_aggregate_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_aggregate_id'), \
- mock.patch('functest.utils.openstack_utils.'
- 'remove_host_from_aggregate',
- return_value=True) \
- as mock_method:
- openstack_utils.remove_hosts_from_aggregate(self.nova_client,
- 'test_aggregate')
- mock_method.assert_any_call(self.nova_client,
- 'test_aggregate',
- 'host_name')
-
- def test_delete_aggregate_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'remove_hosts_from_aggregate'):
- self.assertTrue(openstack_utils.
- delete_aggregate(self.nova_client,
- 'agg_name'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_aggregate_exception(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.'
- 'remove_hosts_from_aggregate', side_effect=Exception):
- self.assertFalse(openstack_utils.
- delete_aggregate(self.nova_client,
- 'agg_name'))
- self.assertTrue(mock_logger_error.called)
-
- def test_get_network_list_default(self):
- self.assertEqual(openstack_utils.
- get_network_list(self.neutron_client),
- self.networks)
-
- def test_get_network_list_missing_network(self):
- self.assertEqual(openstack_utils.
- get_network_list(self.empty_client),
- None)
-
- def test_get_router_list_default(self):
- self.assertEqual(openstack_utils.
- get_router_list(self.neutron_client),
- [self.router])
-
- def test_get_router_list_missing_router(self):
- self.assertEqual(openstack_utils.
- get_router_list(self.empty_client),
- None)
-
- def test_get_port_list_default(self):
- self.assertEqual(openstack_utils.
- get_port_list(self.neutron_client),
- [self.port])
-
- def test_get_port_list_missing_port(self):
- self.assertEqual(openstack_utils.
- get_port_list(self.empty_client),
- None)
-
- def test_get_network_id_default(self):
- self.assertEqual(openstack_utils.
- get_network_id(self.neutron_client,
- 'test_network'),
- 'network_id')
-
- def test_get_subnet_id_default(self):
- self.assertEqual(openstack_utils.
- get_subnet_id(self.neutron_client,
- 'test_subnet'),
- 'subnet_id')
-
- def test_get_router_id_default(self):
- self.assertEqual(openstack_utils.
- get_router_id(self.neutron_client,
- 'test_router'),
- 'router_id')
-
- def test_get_private_net_default(self):
- self.assertEqual(openstack_utils.
- get_private_net(self.neutron_client),
- self.networks[0])
-
- def test_get_private_net_missing_net(self):
- self.assertEqual(openstack_utils.
- get_private_net(self.empty_client),
- None)
-
- def test_get_external_net_default(self):
- self.assertEqual(openstack_utils.
- get_external_net(self.neutron_client),
- 'test_network1')
-
- def test_get_external_net_missing_net(self):
- self.assertEqual(openstack_utils.
- get_external_net(self.empty_client),
- None)
-
- def test_get_external_net_id_default(self):
- self.assertEqual(openstack_utils.
- get_external_net_id(self.neutron_client),
- 'network_id1')
-
- def test_get_external_net_id_missing_net(self):
- self.assertEqual(openstack_utils.
- get_external_net_id(self.empty_client),
- None)
-
- def test_check_neutron_net_default(self):
- self.assertTrue(openstack_utils.
- check_neutron_net(self.neutron_client,
- 'test_network'))
-
- def test_check_neutron_net_missing_net(self):
- self.assertFalse(openstack_utils.
- check_neutron_net(self.empty_client,
- 'test_network'))
-
- def test_create_neutron_net_default(self):
- self.assertEqual(openstack_utils.
- create_neutron_net(self.neutron_client,
- 'test_network'),
- 'network_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_neutron_net_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_neutron_net(Exception,
- 'test_network'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_neutron_subnet_default(self):
- self.assertEqual(openstack_utils.
- create_neutron_subnet(self.neutron_client,
- 'test_subnet',
- 'test_cidr',
- 'network_id'),
- 'subnet_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_neutron_subnet_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_neutron_subnet(Exception,
- 'test_subnet',
- 'test_cidr',
- 'network_id'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_neutron_router_default(self):
- self.assertEqual(openstack_utils.
- create_neutron_router(self.neutron_client,
- 'test_router'),
- 'router_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_neutron_router_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_neutron_router(Exception,
- 'test_router'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_neutron_port_default(self):
- self.assertEqual(openstack_utils.
- create_neutron_port(self.neutron_client,
- 'test_port',
- 'network_id',
- 'test_ip'),
- 'port_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_neutron_port_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_neutron_port(Exception,
- 'test_port',
- 'network_id',
- 'test_ip'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_update_neutron_net_default(self):
- self.assertTrue(openstack_utils.
- update_neutron_net(self.neutron_client,
- 'network_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_update_neutron_net_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- update_neutron_net(Exception,
- 'network_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_update_neutron_port_default(self):
- self.assertEqual(openstack_utils.
- update_neutron_port(self.neutron_client,
- 'port_id',
- 'test_owner'),
- 'port_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_update_neutron_port_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- update_neutron_port(Exception,
- 'port_id',
- 'test_owner'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_add_interface_router_default(self):
- self.assertTrue(openstack_utils.
- add_interface_router(self.neutron_client,
- 'router_id',
- 'subnet_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_add_interface_router_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- add_interface_router(Exception,
- 'router_id',
- 'subnet_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_add_gateway_router_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_external_net_id',
- return_value='network_id'):
- self.assertTrue(openstack_utils.
- add_gateway_router(self.neutron_client,
- 'router_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_add_gateway_router_exception(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_external_net_id',
- return_value='network_id'):
- self.assertFalse(openstack_utils.
- add_gateway_router(Exception,
- 'router_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_neutron_net_default(self):
- self.assertTrue(openstack_utils.
- delete_neutron_net(self.neutron_client,
- 'network_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_neutron_net_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_neutron_net(Exception,
- 'network_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_neutron_subnet_default(self):
- self.assertTrue(openstack_utils.
- delete_neutron_subnet(self.neutron_client,
- 'subnet_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_neutron_subnet_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_neutron_subnet(Exception,
- 'subnet_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_neutron_router_default(self):
- self.assertTrue(openstack_utils.
- delete_neutron_router(self.neutron_client,
- 'router_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_neutron_router_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_neutron_router(Exception,
- 'router_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_neutron_port_default(self):
- self.assertTrue(openstack_utils.
- delete_neutron_port(self.neutron_client,
- 'port_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_neutron_port_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_neutron_port(Exception,
- 'port_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_remove_interface_router_default(self):
- self.assertTrue(openstack_utils.
- remove_interface_router(self.neutron_client,
- 'router_id',
- 'subnet_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_remove_interface_router_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- remove_interface_router(Exception,
- 'router_id',
- 'subnet_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_remove_gateway_router_default(self):
- self.assertTrue(openstack_utils.
- remove_gateway_router(self.neutron_client,
- 'router_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_remove_gateway_router_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- remove_gateway_router(Exception,
- 'router_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_get_security_groups_default(self):
- self.assertEqual(openstack_utils.
- get_security_groups(self.neutron_client),
- [self.sec_group])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_security_groups_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_security_groups(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_security_group_id_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_security_groups',
- return_value=[self.sec_group]):
- self.assertEqual(openstack_utils.
- get_security_group_id(self.neutron_client,
- 'test_sec_group'),
- 'sec_group_id')
-
- def test_get_security_group_rules_default(self):
- self.assertEqual(openstack_utils.
- get_security_group_rules(self.neutron_client,
- self.sec_group['id']),
- [self.sec_group_rule])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_security_group_rules_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_security_group_rules(Exception,
- 'sec_group_id'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_check_security_group_rules_not_exists(self):
- self.assertEqual(openstack_utils.
- check_security_group_rules(self.neutron_client,
- 'sec_group_id_2',
- 'direction',
- 'protocol',
- 'port_min',
- 'port_max'),
- True)
-
- def test_check_security_group_rules_exists(self):
- self.assertEqual(openstack_utils.
- check_security_group_rules(self.neutron_client,
- self.sec_group['id'],
- 'direction',
- 'protocol',
- 'port_min',
- 'port_max'),
- False)
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_check_security_group_rules_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- check_security_group_rules(Exception,
- 'sec_group_id',
- 'direction',
- 'protocol',
- 'port_max',
- 'port_min'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_security_group_default(self):
- self.assertEqual(openstack_utils.
- create_security_group(self.neutron_client,
- 'test_sec_group',
- 'sec_group_desc'),
- self.sec_group)
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_security_group_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_security_group(Exception,
- 'test_sec_group',
- 'sec_group_desc'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_secgroup_rule_default(self):
- self.assertTrue(openstack_utils.
- create_secgroup_rule(self.neutron_client,
- 'sg_id',
- 'direction',
- 'protocol',
- 80,
- 80))
- self.assertTrue(openstack_utils.
- create_secgroup_rule(self.neutron_client,
- 'sg_id',
- 'direction',
- 'protocol'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_secgroup_rule_invalid_port_range(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- create_secgroup_rule(self.neutron_client,
- 'sg_id',
- 'direction',
- 'protocol',
- 80))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_secgroup_rule_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- create_secgroup_rule(Exception,
- 'sg_id',
- 'direction',
- 'protocol'))
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- def test_create_security_group_full_default(self, mock_logger_info):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_security_group_id',
- return_value='sg_id'):
- self.assertEqual(openstack_utils.
- create_security_group_full(self.neutron_client,
- 'sg_name',
- 'sg_desc'),
- 'sg_id')
- self.assertTrue(mock_logger_info)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_security_group_full_sec_group_fail(self,
- mock_logger_error,
- mock_logger_info):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_security_group_id',
- return_value=''), \
- mock.patch('functest.utils.openstack_utils.'
- 'create_security_group',
- return_value=False):
- self.assertEqual(openstack_utils.
- create_security_group_full(self.neutron_client,
- 'sg_name',
- 'sg_desc'),
- None)
- self.assertTrue(mock_logger_error)
- self.assertTrue(mock_logger_info)
-
- @mock.patch('functest.utils.openstack_utils.logger.debug')
- @mock.patch('functest.utils.openstack_utils.logger.info')
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_security_group_full_secgroup_rule_fail(self,
- mock_logger_error,
- mock_logger_info,
- mock_logger_debug):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_security_group_id',
- return_value=''), \
- mock.patch('functest.utils.openstack_utils.'
- 'create_security_group',
- return_value={'id': 'sg_id',
- 'name': 'sg_name'}), \
- mock.patch('functest.utils.openstack_utils.'
- 'create_secgroup_rule',
- return_value=False):
- self.assertEqual(openstack_utils.
- create_security_group_full(self.neutron_client,
- 'sg_name',
- 'sg_desc'),
- None)
- self.assertTrue(mock_logger_error)
- self.assertTrue(mock_logger_info)
- self.assertTrue(mock_logger_debug)
-
- def test_add_secgroup_to_instance_default(self):
- self.assertTrue(openstack_utils.
- add_secgroup_to_instance(self.nova_client,
- 'instance_id',
- 'sec_group_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_add_secgroup_to_instance_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- add_secgroup_to_instance(Exception,
- 'instance_id',
- 'sec_group_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_update_sg_quota_default(self):
- self.assertTrue(openstack_utils.
- update_sg_quota(self.neutron_client,
- 'tenant_id',
- 'sg_quota',
- 'sg_rule_quota'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_update_sg_quota_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- update_sg_quota(Exception,
- 'tenant_id',
- 'sg_quota',
- 'sg_rule_quota'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_security_group_default(self):
- self.assertTrue(openstack_utils.
- delete_security_group(self.neutron_client,
- 'sec_group_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_security_group_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_security_group(Exception,
- 'sec_group_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_get_images_default(self):
- self.assertEqual(openstack_utils.
- get_images(self.glance_client),
- [self.image])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_images_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_images(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_image_id_default(self):
- self.assertEqual(openstack_utils.
- get_image_id(self.glance_client,
- 'test_image'),
- 'image_id')
-
- # create_glance_image, get_or_create_image
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_glance_image_file_present(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.'
- 'os.path.isfile',
- return_value=False):
- self.assertEqual(openstack_utils.
- create_glance_image(self.glance_client,
- 'test_image',
- 'file_path'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- def test_create_glance_image_already_exist(self, mock_logger_info):
- with mock.patch('functest.utils.openstack_utils.'
- 'os.path.isfile',
- return_value=True), \
- mock.patch('functest.utils.openstack_utils.get_image_id',
- return_value='image_id'):
- self.assertEqual(openstack_utils.
- create_glance_image(self.glance_client,
- 'test_image',
- 'file_path'),
- 'image_id')
- self.assertTrue(mock_logger_info.called)
-
- @mock.patch('functest.utils.openstack_utils.logger.info')
- def test_create_glance_image_default(self, mock_logger_info):
- with mock.patch('functest.utils.openstack_utils.'
- 'os.path.isfile',
- return_value=True), \
- mock.patch('functest.utils.openstack_utils.get_image_id',
- return_value=''), \
- mock.patch('six.moves.builtins.open',
- mock.mock_open(read_data='1')) as m:
- self.assertEqual(openstack_utils.
- create_glance_image(self.glance_client,
- 'test_image',
- 'file_path'),
- 'image_id')
- m.assert_called_once_with('file_path')
- self.assertTrue(mock_logger_info.called)
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_glance_image_exception(self, mock_logger_error):
- with mock.patch('functest.utils.openstack_utils.'
- 'os.path.isfile',
- return_value=True), \
- mock.patch('functest.utils.openstack_utils.get_image_id',
- side_effect=Exception):
- self.assertEqual(openstack_utils.
- create_glance_image(self.glance_client,
- 'test_image',
- 'file_path'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_glance_image_default(self):
- self.assertTrue(openstack_utils.
- delete_glance_image(self.nova_client,
- 'image_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_glance_image_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_glance_image(Exception,
- 'image_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_get_volumes_default(self):
- self.assertEqual(openstack_utils.
- get_volumes(self.cinder_client),
- [self.volume])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_volumes_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_volumes(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_update_cinder_quota_default(self):
- self.assertTrue(openstack_utils.
- update_cinder_quota(self.cinder_client,
- 'tenant_id',
- 'vols_quota',
- 'snap_quota',
- 'giga_quota'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_update_cinder_quota_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- update_cinder_quota(Exception,
- 'tenant_id',
- 'vols_quota',
- 'snap_quota',
- 'giga_quota'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_volume_default(self):
- self.assertTrue(openstack_utils.
- delete_volume(self.cinder_client,
- 'volume_id',
- forced=False))
-
- self.assertTrue(openstack_utils.
- delete_volume(self.cinder_client,
- 'volume_id',
- forced=True))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_volume_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_volume(Exception,
- 'volume_id',
- forced=True))
- self.assertTrue(mock_logger_error.called)
-
- def test_get_tenants_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=True):
- self.assertEqual(openstack_utils.
- get_tenants(self.keystone_client),
- [self.tenant])
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=False):
- self.assertEqual(openstack_utils.
- get_tenants(self.keystone_client),
- [self.tenant])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_tenants_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_tenants(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_users_default(self):
- self.assertEqual(openstack_utils.
- get_users(self.keystone_client),
- [self.user])
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_users_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_users(Exception),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_tenant_id_default(self):
- self.assertEqual(openstack_utils.
- get_tenant_id(self.keystone_client,
- 'test_tenant'),
- 'tenant_id')
-
- def test_get_user_id_default(self):
- self.assertEqual(openstack_utils.
- get_user_id(self.keystone_client,
- 'test_user'),
- 'user_id')
-
- def test_get_role_id_default(self):
- self.assertEqual(openstack_utils.
- get_role_id(self.keystone_client,
- 'test_role'),
- 'role_id')
-
- def test_get_domain_id_default(self):
- self.assertEqual(openstack_utils.
- get_domain_id(self.keystone_client,
- 'test_domain'),
- 'domain_id')
-
- def test_create_tenant_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=True):
- os.environ['OS_PROJECT_DOMAIN_NAME'] = 'Default'
- self.assertEqual(openstack_utils.
- create_tenant(self.keystone_client,
- 'test_tenant',
- 'tenant_desc'),
- 'tenant_id')
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=False):
- self.assertEqual(openstack_utils.
- create_tenant(self.keystone_client,
- 'test_tenant',
- 'tenant_desc'),
- 'tenant_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_tenant_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_tenant(Exception,
- 'test_tenant',
- 'tenant_desc'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_create_user_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=True):
- self.assertEqual(openstack_utils.
- create_user(self.keystone_client,
- 'test_user',
- 'password',
- 'email',
- 'tenant_id'),
- 'user_id')
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=False):
- self.assertEqual(openstack_utils.
- create_user(self.keystone_client,
- 'test_user',
- 'password',
- 'email',
- 'tenant_id'),
- 'user_id')
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_create_user_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- create_user(Exception,
- 'test_user',
- 'password',
- 'email',
- 'tenant_id'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_add_role_user_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=True):
- self.assertTrue(openstack_utils.
- add_role_user(self.keystone_client,
- 'user_id',
- 'role_id',
- 'tenant_id'))
-
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=False):
- self.assertTrue(openstack_utils.
- add_role_user(self.keystone_client,
- 'user_id',
- 'role_id',
- 'tenant_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_add_role_user_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- add_role_user(Exception,
- 'user_id',
- 'role_id',
- 'tenant_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_tenant_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=True):
- self.assertTrue(openstack_utils.
- delete_tenant(self.keystone_client,
- 'tenant_id'))
-
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=False):
- self.assertTrue(openstack_utils.
- delete_tenant(self.keystone_client,
- 'tenant_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_tenant_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_tenant(Exception,
- 'tenant_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_delete_user_default(self):
- self.assertTrue(openstack_utils.
- delete_user(self.keystone_client,
- 'user_id'))
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_delete_user_exception(self, mock_logger_error):
- self.assertFalse(openstack_utils.
- delete_user(Exception,
- 'user_id'))
- self.assertTrue(mock_logger_error.called)
-
- def test_get_resource_default(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'is_keystone_v3', return_value=True):
- self.assertEqual(openstack_utils.
- get_resource(self.heat_client,
- 'stack_id',
- 'resource'),
- self.resource)
-
- @mock.patch('functest.utils.openstack_utils.logger.error')
- def test_get_resource_exception(self, mock_logger_error):
- self.assertEqual(openstack_utils.
- get_resource(Exception,
- 'stack_id',
- 'resource'),
- None)
- self.assertTrue(mock_logger_error.called)
-
- def test_get_or_create_user_for_vnf_get(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_user_id',
- return_value='user_id'), \
- mock.patch('functest.utils.openstack_utils.get_tenant_id',
- return_value='tenant_id'):
- self.assertFalse(openstack_utils.
- get_or_create_user_for_vnf(self.keystone_client,
- 'my_vnf'))
-
- def test_get_or_create_user_for_vnf_create(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_user_id',
- return_value=None), \
- mock.patch('functest.utils.openstack_utils.get_tenant_id',
- return_value='tenant_id'):
- self.assertTrue(openstack_utils.
- get_or_create_user_for_vnf(self.keystone_client,
- 'my_vnf'))
-
- def test_get_or_create_user_for_vnf_error_get_user_id(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_user_id',
- side_effect=Exception):
- self.assertRaises(Exception)
-
- def test_get_or_create_user_for_vnf_error_get_tenant_id(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_user_id',
- return_value='user_id'), \
- mock.patch('functest.utils.openstack_utils.get_tenant_id',
- side_effect='Exception'):
- self.assertRaises(Exception)
-
- def test_get_or_create_tenant_for_vnf_get(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_tenant_id',
- return_value='tenant_id'):
- self.assertFalse(
- openstack_utils.get_or_create_tenant_for_vnf(
- self.keystone_client, 'tenant_name', 'tenant_description'))
-
- def test_get_or_create_tenant_for_vnf_create(self):
- with mock.patch('functest.utils.openstack_utils.get_tenant_id',
- return_value=None):
- self.assertTrue(
- openstack_utils.get_or_create_tenant_for_vnf(
- self.keystone_client, 'tenant_name', 'tenant_description'))
-
- def test_get_or_create_tenant_for_vnf_error_get_tenant_id(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'get_tenant_id',
- side_effect=Exception):
- self.assertRaises(Exception)
-
- def test_download_and_add_image_on_glance_image_creation_failure(self):
- with mock.patch('functest.utils.openstack_utils.'
- 'os.makedirs'), \
- mock.patch('functest.utils.openstack_utils.'
- 'ft_utils.download_url',
- return_value=True), \
- mock.patch('functest.utils.openstack_utils.'
- 'create_glance_image',
- return_value=''):
- resp = openstack_utils.download_and_add_image_on_glance(
- self.glance_client,
- 'image_name',
- 'http://url',
- 'data_dir')
- self.assertEqual(resp, False)
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py
deleted file mode 100644
index 98da48b8..00000000
--- a/functest/utils/openstack_utils.py
+++ /dev/null
@@ -1,1486 +0,0 @@
-#!/usr/bin/env python
-#
-# jose.lausuch@ericsson.com
-# valentin.boucher@orange.com
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-
-import logging
-import os.path
-import sys
-import time
-
-from keystoneauth1 import loading
-from keystoneauth1 import session
-from cinderclient import client as cinderclient
-from glanceclient import client as glanceclient
-from heatclient import client as heatclient
-from novaclient import client as novaclient
-from keystoneclient import client as keystoneclient
-from neutronclient.neutron import client as neutronclient
-
-from functest.utils import env
-import functest.utils.functest_utils as ft_utils
-
-logger = logging.getLogger(__name__)
-
-DEFAULT_API_VERSION = '2'
-DEFAULT_HEAT_API_VERSION = '1'
-
-
-# *********************************************
-# CREDENTIALS
-# *********************************************
-class MissingEnvVar(Exception):
-
- def __init__(self, var):
- self.var = var
-
- def __str__(self):
- return str.format("Please set the mandatory env var: {}", self.var)
-
-
-def is_keystone_v3():
- keystone_api_version = os.getenv('OS_IDENTITY_API_VERSION')
- if (keystone_api_version is None or
- keystone_api_version == '2'):
- return False
- else:
- return True
-
-
-def get_rc_env_vars():
- env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD']
- if is_keystone_v3():
- env_vars.extend(['OS_PROJECT_NAME',
- 'OS_USER_DOMAIN_NAME',
- 'OS_PROJECT_DOMAIN_NAME'])
- else:
- env_vars.extend(['OS_TENANT_NAME'])
- return env_vars
-
-
-def check_credentials():
- """
- Check if the OpenStack credentials (openrc) are sourced
- """
- env_vars = get_rc_env_vars()
- return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
-
-
-def get_env_cred_dict():
- env_cred_dict = {
- 'OS_USERNAME': 'username',
- 'OS_PASSWORD': 'password',
- 'OS_AUTH_URL': 'auth_url',
- 'OS_TENANT_NAME': 'tenant_name',
- 'OS_USER_DOMAIN_NAME': 'user_domain_name',
- 'OS_PROJECT_DOMAIN_NAME': 'project_domain_name',
- 'OS_PROJECT_NAME': 'project_name',
- 'OS_ENDPOINT_TYPE': 'endpoint_type',
- 'OS_REGION_NAME': 'region_name',
- 'OS_CACERT': 'https_cacert',
- 'OS_INSECURE': 'https_insecure'
- }
- return env_cred_dict
-
-
-def get_credentials(other_creds={}):
- """Returns a creds dictionary filled with parsed from env
- """
- creds = {}
- env_vars = get_rc_env_vars()
- env_cred_dict = get_env_cred_dict()
-
- for envvar in env_vars:
- if os.getenv(envvar) is None:
- raise MissingEnvVar(envvar)
- else:
- creds_key = env_cred_dict.get(envvar)
- creds.update({creds_key: os.getenv(envvar)})
-
- if 'tenant' in other_creds.keys():
- if is_keystone_v3():
- tenant = 'project_name'
- else:
- tenant = 'tenant_name'
- other_creds[tenant] = other_creds.pop('tenant')
-
- creds.update(other_creds)
-
- return creds
-
-
-def get_session_auth(other_creds={}):
- loader = loading.get_plugin_loader('password')
- creds = get_credentials(other_creds)
- auth = loader.load_from_options(**creds)
- return auth
-
-
-def get_endpoint(service_type, interface='public'):
- auth = get_session_auth()
- return get_session().get_endpoint(auth=auth,
- service_type=service_type,
- interface=interface)
-
-
-def get_session(other_creds={}):
- auth = get_session_auth(other_creds)
- https_cacert = os.getenv('OS_CACERT', '')
- https_insecure = os.getenv('OS_INSECURE', '').lower() == 'true'
- return session.Session(auth=auth,
- verify=(https_cacert or not https_insecure))
-
-
-# *********************************************
-# CLIENTS
-# *********************************************
-def get_keystone_client_version():
- api_version = os.getenv('OS_IDENTITY_API_VERSION')
- if api_version is not None:
- logger.info("OS_IDENTITY_API_VERSION is set in env as '%s'",
- api_version)
- return api_version
- return DEFAULT_API_VERSION
-
-
-def get_keystone_client(other_creds={}):
- sess = get_session(other_creds)
- return keystoneclient.Client(get_keystone_client_version(),
- session=sess,
- interface=os.getenv('OS_INTERFACE', 'admin'))
-
-
-def get_nova_client_version():
- api_version = os.getenv('OS_COMPUTE_API_VERSION')
- if api_version is not None:
- logger.info("OS_COMPUTE_API_VERSION is set in env as '%s'",
- api_version)
- return api_version
- return DEFAULT_API_VERSION
-
-
-def get_nova_client(other_creds={}):
- sess = get_session(other_creds)
- return novaclient.Client(get_nova_client_version(), session=sess)
-
-
-def get_cinder_client_version():
- api_version = os.getenv('OS_VOLUME_API_VERSION')
- if api_version is not None:
- logger.info("OS_VOLUME_API_VERSION is set in env as '%s'",
- api_version)
- return api_version
- return DEFAULT_API_VERSION
-
-
-def get_cinder_client(other_creds={}):
- sess = get_session(other_creds)
- return cinderclient.Client(get_cinder_client_version(), session=sess)
-
-
-def get_neutron_client_version():
- api_version = os.getenv('OS_NETWORK_API_VERSION')
- if api_version is not None:
- logger.info("OS_NETWORK_API_VERSION is set in env as '%s'",
- api_version)
- return api_version
- return DEFAULT_API_VERSION
-
-
-def get_neutron_client(other_creds={}):
- sess = get_session(other_creds)
- return neutronclient.Client(get_neutron_client_version(), session=sess)
-
-
-def get_glance_client_version():
- api_version = os.getenv('OS_IMAGE_API_VERSION')
- if api_version is not None:
- logger.info("OS_IMAGE_API_VERSION is set in env as '%s'", api_version)
- return api_version
- return DEFAULT_API_VERSION
-
-
-def get_glance_client(other_creds={}):
- sess = get_session(other_creds)
- return glanceclient.Client(get_glance_client_version(), session=sess)
-
-
-def get_heat_client_version():
- api_version = os.getenv('OS_ORCHESTRATION_API_VERSION')
- if api_version is not None:
- logger.info("OS_ORCHESTRATION_API_VERSION is set in env as '%s'",
- api_version)
- return api_version
- return DEFAULT_HEAT_API_VERSION
-
-
-def get_heat_client(other_creds={}):
- sess = get_session(other_creds)
- return heatclient.Client(get_heat_client_version(), session=sess)
-
-
-def download_and_add_image_on_glance(glance, image_name, image_url, data_dir):
- try:
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
- except Exception:
- raise Exception("Impossible to download image from {}".format(
- image_url))
-
- try:
- image = create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
- else:
- return image
- except Exception:
- raise Exception("Impossible to put image {} in glance".format(
- image_name))
-
-
-# *********************************************
-# NOVA
-# *********************************************
-def get_instances(nova_client):
- try:
- instances = nova_client.servers.list(search_opts={'all_tenants': 1})
- return instances
- except Exception as e:
- logger.error("Error [get_instances(nova_client)]: %s" % e)
- return None
-
-
-def get_instance_status(nova_client, instance):
- try:
- instance = nova_client.servers.get(instance.id)
- return instance.status
- except Exception as e:
- logger.error("Error [get_instance_status(nova_client)]: %s" % e)
- return None
-
-
-def get_instance_by_name(nova_client, instance_name):
- try:
- instance = nova_client.servers.find(name=instance_name)
- return instance
- except Exception as e:
- logger.error("Error [get_instance_by_name(nova_client, '%s')]: %s"
- % (instance_name, e))
- return None
-
-
-def get_flavor_id(nova_client, flavor_name):
- flavors = nova_client.flavors.list(detailed=True)
- id = ''
- for f in flavors:
- if f.name == flavor_name:
- id = f.id
- break
- return id
-
-
-def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
- flavors = nova_client.flavors.list(detailed=True)
- id = ''
- for f in flavors:
- if min_ram <= f.ram and f.ram <= max_ram:
- id = f.id
- break
- return id
-
-
-def get_aggregates(nova_client):
- try:
- aggregates = nova_client.aggregates.list()
- return aggregates
- except Exception as e:
- logger.error("Error [get_aggregates(nova_client)]: %s" % e)
- return None
-
-
-def get_aggregate_id(nova_client, aggregate_name):
- try:
- aggregates = get_aggregates(nova_client)
- _id = [ag.id for ag in aggregates if ag.name == aggregate_name][0]
- return _id
- except Exception as e:
- logger.error("Error [get_aggregate_id(nova_client, %s)]:"
- " %s" % (aggregate_name, e))
- return None
-
-
-def get_availability_zones(nova_client):
- try:
- availability_zones = nova_client.availability_zones.list()
- return availability_zones
- except Exception as e:
- logger.error("Error [get_availability_zones(nova_client)]: %s" % e)
- return None
-
-
-def get_availability_zone_names(nova_client):
- try:
- az_names = [az.zoneName for az in get_availability_zones(nova_client)]
- return az_names
- except Exception as e:
- logger.error("Error [get_availability_zone_names(nova_client)]:"
- " %s" % e)
- return None
-
-
-def create_flavor(nova_client, flavor_name, ram, disk, vcpus, public=True):
- try:
- flavor = nova_client.flavors.create(
- flavor_name, ram, vcpus, disk, is_public=public)
- try:
- extra_specs = ft_utils.get_functest_config(
- 'general.flavor_extra_specs')
- flavor.set_keys(extra_specs)
- except ValueError:
- # flavor extra specs are not configured, therefore skip the update
- pass
-
- except Exception as e:
- logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', "
- "'%s')]: %s" % (flavor_name, ram, disk, vcpus, e))
- return None
- return flavor.id
-
-
-def get_or_create_flavor(flavor_name, ram, disk, vcpus, public=True):
- flavor_exists = False
- nova_client = get_nova_client()
-
- flavor_id = get_flavor_id(nova_client, flavor_name)
- if flavor_id != '':
- logger.info("Using existing flavor '%s'..." % flavor_name)
- flavor_exists = True
- else:
- logger.info("Creating flavor '%s' with '%s' RAM, '%s' disk size, "
- "'%s' vcpus..." % (flavor_name, ram, disk, vcpus))
- flavor_id = create_flavor(
- nova_client, flavor_name, ram, disk, vcpus, public=public)
- if not flavor_id:
- raise Exception("Failed to create flavor '%s'..." % (flavor_name))
- else:
- logger.debug("Flavor '%s' with ID=%s created successfully."
- % (flavor_name, flavor_id))
-
- return flavor_exists, flavor_id
-
-
-def get_floating_ips(neutron_client):
- try:
- floating_ips = neutron_client.list_floatingips()
- return floating_ips['floatingips']
- except Exception as e:
- logger.error("Error [get_floating_ips(neutron_client)]: %s" % e)
- return None
-
-
-def get_hypervisors(nova_client):
- try:
- nodes = []
- hypervisors = nova_client.hypervisors.list()
- for hypervisor in hypervisors:
- if hypervisor.state == "up":
- nodes.append(hypervisor.hypervisor_hostname)
- return nodes
- except Exception as e:
- logger.error("Error [get_hypervisors(nova_client)]: %s" % e)
- return None
-
-
-def create_aggregate(nova_client, aggregate_name, av_zone):
- try:
- nova_client.aggregates.create(aggregate_name, av_zone)
- return True
- except Exception as e:
- logger.error("Error [create_aggregate(nova_client, %s, %s)]: %s"
- % (aggregate_name, av_zone, e))
- return None
-
-
-def add_host_to_aggregate(nova_client, aggregate_name, compute_host):
- try:
- aggregate_id = get_aggregate_id(nova_client, aggregate_name)
- nova_client.aggregates.add_host(aggregate_id, compute_host)
- return True
- except Exception as e:
- logger.error("Error [add_host_to_aggregate(nova_client, %s, %s)]: %s"
- % (aggregate_name, compute_host, e))
- return None
-
-
-def create_aggregate_with_host(
- nova_client, aggregate_name, av_zone, compute_host):
- try:
- create_aggregate(nova_client, aggregate_name, av_zone)
- add_host_to_aggregate(nova_client, aggregate_name, compute_host)
- return True
- except Exception as e:
- logger.error("Error [create_aggregate_with_host("
- "nova_client, %s, %s, %s)]: %s"
- % (aggregate_name, av_zone, compute_host, e))
- return None
-
-
-def create_instance(flavor_name,
- image_id,
- network_id,
- instance_name="functest-vm",
- confdrive=True,
- userdata=None,
- av_zone='',
- fixed_ip=None,
- files=None):
- nova_client = get_nova_client()
- try:
- flavor = nova_client.flavors.find(name=flavor_name)
- except:
- flavors = nova_client.flavors.list()
- logger.error("Error: Flavor '%s' not found. Available flavors are: "
- "\n%s" % (flavor_name, flavors))
- return None
- if fixed_ip is not None:
- nics = {"net-id": network_id, "v4-fixed-ip": fixed_ip}
- else:
- nics = {"net-id": network_id}
- if userdata is None:
- instance = nova_client.servers.create(
- name=instance_name,
- flavor=flavor,
- image=image_id,
- nics=[nics],
- availability_zone=av_zone,
- files=files
- )
- else:
- instance = nova_client.servers.create(
- name=instance_name,
- flavor=flavor,
- image=image_id,
- nics=[nics],
- config_drive=confdrive,
- userdata=userdata,
- availability_zone=av_zone,
- files=files
- )
- return instance
-
-
-def create_instance_and_wait_for_active(flavor_name,
- image_id,
- network_id,
- instance_name="",
- config_drive=False,
- userdata="",
- av_zone='',
- fixed_ip=None,
- files=None):
- SLEEP = 3
- VM_BOOT_TIMEOUT = 180
- nova_client = get_nova_client()
- instance = create_instance(flavor_name,
- image_id,
- network_id,
- instance_name,
- config_drive,
- userdata,
- av_zone=av_zone,
- fixed_ip=fixed_ip,
- files=files)
- count = VM_BOOT_TIMEOUT / SLEEP
- for n in range(count, -1, -1):
- status = get_instance_status(nova_client, instance)
- if status is None:
- time.sleep(SLEEP)
- continue
- elif status.lower() == "active":
- return instance
- elif status.lower() == "error":
- logger.error("The instance %s went to ERROR status."
- % instance_name)
- return None
- time.sleep(SLEEP)
- logger.error("Timeout booting the instance %s." % instance_name)
- return None
-
-
-def create_floating_ip(neutron_client):
- extnet_id = get_external_net_id(neutron_client)
- props = {'floating_network_id': extnet_id}
- try:
- ip_json = neutron_client.create_floatingip({'floatingip': props})
- fip_addr = ip_json['floatingip']['floating_ip_address']
- fip_id = ip_json['floatingip']['id']
- except Exception as e:
- logger.error("Error [create_floating_ip(neutron_client)]: %s" % e)
- return None
- return {'fip_addr': fip_addr, 'fip_id': fip_id}
-
-
-def add_floating_ip(nova_client, server_id, floatingip_addr):
- try:
- nova_client.servers.add_floating_ip(server_id, floatingip_addr)
- return True
- except Exception as e:
- logger.error("Error [add_floating_ip(nova_client, '%s', '%s')]: %s"
- % (server_id, floatingip_addr, e))
- return False
-
-
-def delete_instance(nova_client, instance_id):
- try:
- nova_client.servers.force_delete(instance_id)
- return True
- except Exception as e:
- logger.error("Error [delete_instance(nova_client, '%s')]: %s"
- % (instance_id, e))
- return False
-
-
-def delete_floating_ip(neutron_client, floatingip_id):
- try:
- neutron_client.delete_floatingip(floatingip_id)
- return True
- except Exception as e:
- logger.error("Error [delete_floating_ip(neutron_client, '%s')]: %s"
- % (floatingip_id, e))
- return False
-
-
-def remove_host_from_aggregate(nova_client, aggregate_name, compute_host):
- try:
- aggregate_id = get_aggregate_id(nova_client, aggregate_name)
- nova_client.aggregates.remove_host(aggregate_id, compute_host)
- return True
- except Exception as e:
- logger.error("Error [remove_host_from_aggregate(nova_client, %s, %s)]:"
- " %s" % (aggregate_name, compute_host, e))
- return False
-
-
-def remove_hosts_from_aggregate(nova_client, aggregate_name):
- aggregate_id = get_aggregate_id(nova_client, aggregate_name)
- hosts = nova_client.aggregates.get(aggregate_id).hosts
- assert(
- all(remove_host_from_aggregate(nova_client, aggregate_name, host)
- for host in hosts))
-
-
-def delete_aggregate(nova_client, aggregate_name):
- try:
- remove_hosts_from_aggregate(nova_client, aggregate_name)
- nova_client.aggregates.delete(aggregate_name)
- return True
- except Exception as e:
- logger.error("Error [delete_aggregate(nova_client, %s)]: %s"
- % (aggregate_name, e))
- return False
-
-
-# *********************************************
-# NEUTRON
-# *********************************************
-def get_network_list(neutron_client):
- network_list = neutron_client.list_networks()['networks']
- if len(network_list) == 0:
- return None
- else:
- return network_list
-
-
-def get_router_list(neutron_client):
- router_list = neutron_client.list_routers()['routers']
- if len(router_list) == 0:
- return None
- else:
- return router_list
-
-
-def get_port_list(neutron_client):
- port_list = neutron_client.list_ports()['ports']
- if len(port_list) == 0:
- return None
- else:
- return port_list
-
-
-def get_network_id(neutron_client, network_name):
- networks = neutron_client.list_networks()['networks']
- id = ''
- for n in networks:
- if n['name'] == network_name:
- id = n['id']
- break
- return id
-
-
-def get_subnet_id(neutron_client, subnet_name):
- subnets = neutron_client.list_subnets()['subnets']
- id = ''
- for s in subnets:
- if s['name'] == subnet_name:
- id = s['id']
- break
- return id
-
-
-def get_router_id(neutron_client, router_name):
- routers = neutron_client.list_routers()['routers']
- id = ''
- for r in routers:
- if r['name'] == router_name:
- id = r['id']
- break
- return id
-
-
-def get_private_net(neutron_client):
- # Checks if there is an existing shared private network
- networks = neutron_client.list_networks()['networks']
- if len(networks) == 0:
- return None
- for net in networks:
- if (net['router:external'] is False) and (net['shared'] is True):
- return net
- return None
-
-
-def get_external_net(neutron_client):
- if (env.get('EXTERNAL_NETWORK')):
- return env.get('EXTERNAL_NETWORK')
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['name']
- return None
-
-
-def get_external_net_id(neutron_client):
- if (env.get('EXTERNAL_NETWORK')):
- networks = neutron_client.list_networks(
- name=env.get('EXTERNAL_NETWORK'))
- net_id = networks['networks'][0]['id']
- return net_id
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['id']
- return None
-
-
-def check_neutron_net(neutron_client, net_name):
- for network in neutron_client.list_networks()['networks']:
- if network['name'] == net_name:
- for subnet in network['subnets']:
- return True
- return False
-
-
-def create_neutron_net(neutron_client, name):
- json_body = {'network': {'name': name,
- 'admin_state_up': True}}
- try:
- network = neutron_client.create_network(body=json_body)
- network_dict = network['network']
- return network_dict['id']
- except Exception as e:
- logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
- % (name, e))
- return None
-
-
-def create_neutron_subnet(neutron_client, name, cidr, net_id,
- dns=['8.8.8.8', '8.8.4.4']):
- json_body = {'subnets': [{'name': name, 'cidr': cidr,
- 'ip_version': 4, 'network_id': net_id,
- 'dns_nameservers': dns}]}
-
- try:
- subnet = neutron_client.create_subnet(body=json_body)
- return subnet['subnets'][0]['id']
- except Exception as e:
- logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
- "'%s', '%s')]: %s" % (name, cidr, net_id, e))
- return None
-
-
-def create_neutron_router(neutron_client, name):
- json_body = {'router': {'name': name, 'admin_state_up': True}}
- try:
- router = neutron_client.create_router(json_body)
- return router['router']['id']
- except Exception as e:
- logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
- % (name, e))
- return None
-
-
-def create_neutron_port(neutron_client, name, network_id, ip):
- json_body = {'port': {
- 'admin_state_up': True,
- 'name': name,
- 'network_id': network_id,
- 'fixed_ips': [{"ip_address": ip}]
- }}
- try:
- port = neutron_client.create_port(body=json_body)
- return port['port']['id']
- except Exception as e:
- logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
- "'%s')]: %s" % (name, network_id, ip, e))
- return None
-
-
-def update_neutron_net(neutron_client, network_id, shared=False):
- json_body = {'network': {'shared': shared}}
- try:
- neutron_client.update_network(network_id, body=json_body)
- return True
- except Exception as e:
- logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: "
- "%s" % (network_id, str(shared), e))
- return False
-
-
-def update_neutron_port(neutron_client, port_id, device_owner):
- json_body = {'port': {
- 'device_owner': device_owner,
- }}
- try:
- port = neutron_client.update_port(port=port_id,
- body=json_body)
- return port['port']['id']
- except Exception as e:
- logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
- " %s" % (port_id, device_owner, e))
- return None
-
-
-def add_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
- try:
- neutron_client.add_interface_router(router=router_id, body=json_body)
- return True
- except Exception as e:
- logger.error("Error [add_interface_router(neutron_client, '%s', "
- "'%s')]: %s" % (router_id, subnet_id, e))
- return False
-
-
-def add_gateway_router(neutron_client, router_id):
- ext_net_id = get_external_net_id(neutron_client)
- router_dict = {'network_id': ext_net_id}
- try:
- neutron_client.add_gateway_router(router_id, router_dict)
- return True
- except Exception as e:
- logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s"
- % (router_id, e))
- return False
-
-
-def delete_neutron_net(neutron_client, network_id):
- try:
- neutron_client.delete_network(network_id)
- return True
- except Exception as e:
- logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s"
- % (network_id, e))
- return False
-
-
-def delete_neutron_subnet(neutron_client, subnet_id):
- try:
- neutron_client.delete_subnet(subnet_id)
- return True
- except Exception as e:
- logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s"
- % (subnet_id, e))
- return False
-
-
-def delete_neutron_router(neutron_client, router_id):
- try:
- neutron_client.delete_router(router=router_id)
- return True
- except Exception as e:
- logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s"
- % (router_id, e))
- return False
-
-
-def delete_neutron_port(neutron_client, port_id):
- try:
- neutron_client.delete_port(port_id)
- return True
- except Exception as e:
- logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s"
- % (port_id, e))
- return False
-
-
-def remove_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
- try:
- neutron_client.remove_interface_router(router=router_id,
- body=json_body)
- return True
- except Exception as e:
- logger.error("Error [remove_interface_router(neutron_client, '%s', "
- "'%s')]: %s" % (router_id, subnet_id, e))
- return False
-
-
-def remove_gateway_router(neutron_client, router_id):
- try:
- neutron_client.remove_gateway_router(router_id)
- return True
- except Exception as e:
- logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s"
- % (router_id, e))
- return False
-
-
-def create_network_full(neutron_client,
- net_name,
- subnet_name,
- router_name,
- cidr,
- dns=['8.8.8.8', '8.8.4.4']):
-
- # Check if the network already exists
- network_id = get_network_id(neutron_client, net_name)
- subnet_id = get_subnet_id(neutron_client, subnet_name)
- router_id = get_router_id(neutron_client, router_name)
-
- if network_id != '' and subnet_id != '' and router_id != '':
- logger.info("A network with name '%s' already exists..." % net_name)
- else:
- neutron_client.format = 'json'
- logger.info('Creating neutron network %s...' % net_name)
- network_id = create_neutron_net(neutron_client, net_name)
-
- if not network_id:
- return False
-
- logger.debug("Network '%s' created successfully" % network_id)
- logger.debug('Creating Subnet....')
- subnet_id = create_neutron_subnet(neutron_client, subnet_name,
- cidr, network_id, dns)
- if not subnet_id:
- return None
-
- logger.debug("Subnet '%s' created successfully" % subnet_id)
- logger.debug('Creating Router...')
- router_id = create_neutron_router(neutron_client, router_name)
-
- if not router_id:
- return None
-
- logger.debug("Router '%s' created successfully" % router_id)
- logger.debug('Adding router to subnet...')
-
- if not add_interface_router(neutron_client, router_id, subnet_id):
- return None
-
- logger.debug("Interface added successfully.")
-
- logger.debug('Adding gateway to router...')
- if not add_gateway_router(neutron_client, router_id):
- return None
-
- logger.debug("Gateway added successfully.")
-
- network_dic = {'net_id': network_id,
- 'subnet_id': subnet_id,
- 'router_id': router_id}
- return network_dic
-
-
-def create_shared_network_full(net_name, subnt_name, router_name, subnet_cidr):
- neutron_client = get_neutron_client()
-
- network_dic = create_network_full(neutron_client,
- net_name,
- subnt_name,
- router_name,
- subnet_cidr)
- if network_dic:
- if not update_neutron_net(neutron_client,
- network_dic['net_id'],
- shared=True):
- logger.error("Failed to update network %s..." % net_name)
- return None
- else:
- logger.debug("Network '%s' is available..." % net_name)
- else:
- logger.error("Network %s creation failed" % net_name)
- return None
- return network_dic
-
-
-# *********************************************
-# SEC GROUPS
-# *********************************************
-
-
-def get_security_groups(neutron_client):
- try:
- security_groups = neutron_client.list_security_groups()[
- 'security_groups']
- return security_groups
- except Exception as e:
- logger.error("Error [get_security_groups(neutron_client)]: %s" % e)
- return None
-
-
-def get_security_group_id(neutron_client, sg_name):
- security_groups = get_security_groups(neutron_client)
- id = ''
- for sg in security_groups:
- if sg['name'] == sg_name:
- id = sg['id']
- break
- return id
-
-
-def create_security_group(neutron_client, sg_name, sg_description):
- json_body = {'security_group': {'name': sg_name,
- 'description': sg_description}}
- try:
- secgroup = neutron_client.create_security_group(json_body)
- return secgroup['security_group']
- except Exception as e:
- logger.error("Error [create_security_group(neutron_client, '%s', "
- "'%s')]: %s" % (sg_name, sg_description, e))
- return None
-
-
-def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
- port_range_min=None, port_range_max=None):
- # We create a security group in 2 steps
- # 1 - we check the format and set the json body accordingly
- # 2 - we call neturon client to create the security group
-
- # Format check
- json_body = {'security_group_rule': {'direction': direction,
- 'security_group_id': sg_id,
- 'protocol': protocol}}
- # parameters may be
- # - both None => we do nothing
- # - both Not None => we add them to the json description
- # but one cannot be None is the other is not None
- if (port_range_min is not None and port_range_max is not None):
- # add port_range in json description
- json_body['security_group_rule']['port_range_min'] = port_range_min
- json_body['security_group_rule']['port_range_max'] = port_range_max
- logger.debug("Security_group format set (port range included)")
- else:
- # either both port range are set to None => do nothing
- # or one is set but not the other => log it and return False
- if port_range_min is None and port_range_max is None:
- logger.debug("Security_group format set (no port range mentioned)")
- else:
- logger.error("Bad security group format."
- "One of the port range is not properly set:"
- "range min: {},"
- "range max: {}".format(port_range_min,
- port_range_max))
- return False
-
- # Create security group using neutron client
- try:
- neutron_client.create_security_group_rule(json_body)
- return True
- except:
- logger.exception("Impossible to create_security_group_rule,"
- "security group rule probably already exists")
- return False
-
-
-def get_security_group_rules(neutron_client, sg_id):
- try:
- security_rules = neutron_client.list_security_group_rules()[
- 'security_group_rules']
- security_rules = [rule for rule in security_rules
- if rule["security_group_id"] == sg_id]
- return security_rules
- except Exception as e:
- logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
- " %s" % e)
- return None
-
-
-def check_security_group_rules(neutron_client, sg_id, direction, protocol,
- port_min=None, port_max=None):
- try:
- security_rules = get_security_group_rules(neutron_client, sg_id)
- security_rules = [rule for rule in security_rules
- if (rule["direction"].lower() == direction and
- rule["protocol"].lower() == protocol and
- rule["port_range_min"] == port_min and
- rule["port_range_max"] == port_max)]
- if len(security_rules) == 0:
- return True
- else:
- return False
- except Exception as e:
- logger.error("Error [check_security_group_rules("
- " neutron_client, sg_id, direction,"
- " protocol, port_min=None, port_max=None)]: "
- "%s" % e)
- return None
-
-
-def create_security_group_full(neutron_client,
- sg_name, sg_description):
- sg_id = get_security_group_id(neutron_client, sg_name)
- if sg_id != '':
- logger.info("Using existing security group '%s'..." % sg_name)
- else:
- logger.info("Creating security group '%s'..." % sg_name)
- SECGROUP = create_security_group(neutron_client,
- sg_name,
- sg_description)
- if not SECGROUP:
- logger.error("Failed to create the security group...")
- return None
-
- sg_id = SECGROUP['id']
-
- logger.debug("Security group '%s' with ID=%s created successfully."
- % (SECGROUP['name'], sg_id))
-
- logger.debug("Adding ICMP rules in security group '%s'..."
- % sg_name)
- if not create_secgroup_rule(neutron_client, sg_id,
- 'ingress', 'icmp'):
- logger.error("Failed to create the security group rule...")
- return None
-
- logger.debug("Adding SSH rules in security group '%s'..."
- % sg_name)
- if not create_secgroup_rule(
- neutron_client, sg_id, 'ingress', 'tcp', '22', '22'):
- logger.error("Failed to create the security group rule...")
- return None
-
- if not create_secgroup_rule(
- neutron_client, sg_id, 'egress', 'tcp', '22', '22'):
- logger.error("Failed to create the security group rule...")
- return None
- return sg_id
-
-
-def add_secgroup_to_instance(nova_client, instance_id, secgroup_id):
- try:
- nova_client.servers.add_security_group(instance_id, secgroup_id)
- return True
- except Exception as e:
- logger.error("Error [add_secgroup_to_instance(nova_client, '%s', "
- "'%s')]: %s" % (instance_id, secgroup_id, e))
- return False
-
-
-def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
- json_body = {"quota": {
- "security_group": sg_quota,
- "security_group_rule": sg_rule_quota
- }}
-
- try:
- neutron_client.update_quota(tenant_id=tenant_id,
- body=json_body)
- return True
- except Exception as e:
- logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', "
- "'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e))
- return False
-
-
-def delete_security_group(neutron_client, secgroup_id):
- try:
- neutron_client.delete_security_group(secgroup_id)
- return True
- except Exception as e:
- logger.error("Error [delete_security_group(neutron_client, '%s')]: %s"
- % (secgroup_id, e))
- return False
-
-
-# *********************************************
-# GLANCE
-# *********************************************
-def get_images(glance_client):
- try:
- images = glance_client.images.list()
- return images
- except Exception as e:
- logger.error("Error [get_images]: %s" % e)
- return None
-
-
-def get_image_id(glance_client, image_name):
- images = glance_client.images.list()
- id = ''
- for i in images:
- if i.name == image_name:
- id = i.id
- break
- return id
-
-
-def create_glance_image(glance_client,
- image_name,
- file_path,
- disk="qcow2",
- extra_properties={},
- container="bare",
- public="public"):
- if not os.path.isfile(file_path):
- logger.error("Error: file %s does not exist." % file_path)
- return None
- try:
- image_id = get_image_id(glance_client, image_name)
- if image_id != '':
- logger.info("Image %s already exists." % image_name)
- else:
- logger.info("Creating image '%s' from '%s'..." % (image_name,
- file_path))
-
- image = glance_client.images.create(name=image_name,
- visibility=public,
- disk_format=disk,
- container_format=container,
- **extra_properties)
- image_id = image.id
- with open(file_path) as image_data:
- glance_client.images.upload(image_id, image_data)
- return image_id
- except Exception as e:
- logger.error("Error [create_glance_image(glance_client, '%s', '%s', "
- "'%s')]: %s" % (image_name, file_path, public, e))
- return None
-
-
-def get_or_create_image(name, path, format, extra_properties):
- image_exists = False
- glance_client = get_glance_client()
-
- image_id = get_image_id(glance_client, name)
- if image_id != '':
- logger.info("Using existing image '%s'..." % name)
- image_exists = True
- else:
- logger.info("Creating image '%s' from '%s'..." % (name, path))
- image_id = create_glance_image(glance_client,
- name,
- path,
- format,
- extra_properties)
- if not image_id:
- logger.error("Failed to create a Glance image...")
- else:
- logger.debug("Image '%s' with ID=%s created successfully."
- % (name, image_id))
-
- return image_exists, image_id
-
-
-def delete_glance_image(glance_client, image_id):
- try:
- glance_client.images.delete(image_id)
- return True
- except Exception as e:
- logger.error("Error [delete_glance_image(glance_client, '%s')]: %s"
- % (image_id, e))
- return False
-
-
-# *********************************************
-# CINDER
-# *********************************************
-def get_volumes(cinder_client):
- try:
- volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
- return volumes
- except Exception as e:
- logger.error("Error [get_volumes(cinder_client)]: %s" % e)
- return None
-
-
-def update_cinder_quota(cinder_client, tenant_id, vols_quota,
- snapshots_quota, gigabytes_quota):
- quotas_values = {"volumes": vols_quota,
- "snapshots": snapshots_quota,
- "gigabytes": gigabytes_quota}
-
- try:
- cinder_client.quotas.update(tenant_id, **quotas_values)
- return True
- except Exception as e:
- logger.error("Error [update_cinder_quota(cinder_client, '%s', '%s', "
- "'%s' '%s')]: %s" % (tenant_id, vols_quota,
- snapshots_quota, gigabytes_quota, e))
- return False
-
-
-def delete_volume(cinder_client, volume_id, forced=False):
- try:
- if forced:
- try:
- cinder_client.volumes.detach(volume_id)
- except:
- logger.error(sys.exc_info()[0])
- cinder_client.volumes.force_delete(volume_id)
- else:
- cinder_client.volumes.delete(volume_id)
- return True
- except Exception as e:
- logger.error("Error [delete_volume(cinder_client, '%s', '%s')]: %s"
- % (volume_id, str(forced), e))
- return False
-
-
-# *********************************************
-# KEYSTONE
-# *********************************************
-def get_tenants(keystone_client):
- try:
- if is_keystone_v3():
- tenants = keystone_client.projects.list()
- else:
- tenants = keystone_client.tenants.list()
- return tenants
- except Exception as e:
- logger.error("Error [get_tenants(keystone_client)]: %s" % e)
- return None
-
-
-def get_users(keystone_client):
- try:
- users = keystone_client.users.list()
- return users
- except Exception as e:
- logger.error("Error [get_users(keystone_client)]: %s" % e)
- return None
-
-
-def get_tenant_id(keystone_client, tenant_name):
- tenants = get_tenants(keystone_client)
- id = ''
- for t in tenants:
- if t.name == tenant_name:
- id = t.id
- break
- return id
-
-
-def get_user_id(keystone_client, user_name):
- users = get_users(keystone_client)
- id = ''
- for u in users:
- if u.name == user_name:
- id = u.id
- break
- return id
-
-
-def get_role_id(keystone_client, role_name):
- roles = keystone_client.roles.list()
- id = ''
- for r in roles:
- if r.name == role_name:
- id = r.id
- break
- return id
-
-
-def get_domain_id(keystone_client, domain_name):
- domains = keystone_client.domains.list()
- id = ''
- for d in domains:
- if d.name == domain_name:
- id = d.id
- break
- return id
-
-
-def create_tenant(keystone_client, tenant_name, tenant_description):
- try:
- if is_keystone_v3():
- domain_name = os.environ['OS_PROJECT_DOMAIN_NAME']
- domain_id = get_domain_id(keystone_client, domain_name)
- tenant = keystone_client.projects.create(
- name=tenant_name,
- description=tenant_description,
- domain=domain_id,
- enabled=True)
- else:
- tenant = keystone_client.tenants.create(tenant_name,
- tenant_description,
- enabled=True)
- return tenant.id
- except Exception as e:
- logger.error("Error [create_tenant(keystone_client, '%s', '%s')]: %s"
- % (tenant_name, tenant_description, e))
- return None
-
-
-def get_or_create_tenant(keystone_client, tenant_name, tenant_description):
- tenant_id = get_tenant_id(keystone_client, tenant_name)
- if not tenant_id:
- tenant_id = create_tenant(keystone_client, tenant_name,
- tenant_description)
-
- return tenant_id
-
-
-def get_or_create_tenant_for_vnf(keystone_client, tenant_name,
- tenant_description):
- """Get or Create a Tenant
-
- Args:
- keystone_client: keystone client reference
- tenant_name: the name of the tenant
- tenant_description: the description of the tenant
-
- return False if tenant retrieved though get
- return True if tenant created
- raise Exception if error during processing
- """
- try:
- tenant_id = get_tenant_id(keystone_client, tenant_name)
- if not tenant_id:
- tenant_id = create_tenant(keystone_client, tenant_name,
- tenant_description)
- return True
- else:
- return False
- except:
- raise Exception("Impossible to create a Tenant for the VNF {}".format(
- tenant_name))
-
-
-def create_user(keystone_client, user_name, user_password,
- user_email, tenant_id):
- try:
- if is_keystone_v3():
- user = keystone_client.users.create(name=user_name,
- password=user_password,
- email=user_email,
- project_id=tenant_id,
- enabled=True)
- else:
- user = keystone_client.users.create(user_name,
- user_password,
- user_email,
- tenant_id,
- enabled=True)
- return user.id
- except Exception as e:
- logger.error("Error [create_user(keystone_client, '%s', '%s', '%s'"
- "'%s')]: %s" % (user_name, user_password,
- user_email, tenant_id, e))
- return None
-
-
-def get_or_create_user(keystone_client, user_name, user_password,
- tenant_id, user_email=None):
- user_id = get_user_id(keystone_client, user_name)
- if not user_id:
- user_id = create_user(keystone_client, user_name, user_password,
- user_email, tenant_id)
- return user_id
-
-
-def get_or_create_user_for_vnf(keystone_client, vnf_ref):
- """Get or Create user for VNF
-
- Args:
- keystone_client: keystone client reference
- vnf_ref: VNF reference used as user name & password, tenant name
-
- return False if user retrieved through get
- return True if user created
- raise Exception if error during processing
- """
- try:
- user_id = get_user_id(keystone_client, vnf_ref)
- tenant_id = get_tenant_id(keystone_client, vnf_ref)
- created = False
- if not user_id:
- user_id = create_user(keystone_client, vnf_ref, vnf_ref,
- "", tenant_id)
- created = True
- try:
- role_id = get_role_id(keystone_client, 'admin')
- tenant_id = get_tenant_id(keystone_client, vnf_ref)
- add_role_user(keystone_client, user_id, role_id, tenant_id)
- except:
- logger.warn("Cannot associate user to role admin on tenant")
- return created
- except:
- raise Exception("Impossible to create a user for the VNF {}".format(
- vnf_ref))
-
-
-def add_role_user(keystone_client, user_id, role_id, tenant_id):
- try:
- if is_keystone_v3():
- keystone_client.roles.grant(role=role_id,
- user=user_id,
- project=tenant_id)
- else:
- keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
- return True
- except Exception as e:
- logger.error("Error [add_role_user(keystone_client, '%s', '%s'"
- "'%s')]: %s " % (user_id, role_id, tenant_id, e))
- return False
-
-
-def delete_tenant(keystone_client, tenant_id):
- try:
- if is_keystone_v3():
- keystone_client.projects.delete(tenant_id)
- else:
- keystone_client.tenants.delete(tenant_id)
- return True
- except Exception as e:
- logger.error("Error [delete_tenant(keystone_client, '%s')]: %s"
- % (tenant_id, e))
- return False
-
-
-def delete_user(keystone_client, user_id):
- try:
- keystone_client.users.delete(user_id)
- return True
- except Exception as e:
- logger.error("Error [delete_user(keystone_client, '%s')]: %s"
- % (user_id, e))
- return False
-
-
-# *********************************************
-# HEAT
-# *********************************************
-def get_resource(heat_client, stack_id, resource):
- try:
- resources = heat_client.resources.get(stack_id, resource)
- return resources
- except Exception as e:
- logger.error("Error [get_resource]: %s" % e)
- return None