summaryrefslogtreecommitdiffstats
path: root/snaps/openstack/utils/tests/neutron_utils_tests.py
diff options
context:
space:
mode:
authorspisarski <s.pisarski@cablelabs.com>2017-06-22 12:43:09 -0600
committerspisarski <s.pisarski@cablelabs.com>2017-06-22 14:28:08 -0600
commitc7ba89444d160cb81656a49cb93416ee5013aa8f (patch)
tree0f46d74f98e17e256c3e1eb9a592c3bcc29044e8 /snaps/openstack/utils/tests/neutron_utils_tests.py
parentdbfb9c4e94e500592a8b93f42b7b87230d0af311 (diff)
Use neutron to create floating IPs.
This patch moves the floating IP creation out of nova and into neutron. Other changes include the use of domain objects for VM and Floating IP instances, addition of new nova_utils tests to exercise the create server functionality, and more PEP8 compliance. JIRA: SNAPS-92 Change-Id: I16c12b26b56008901633e90ae307586ad2045f9b Signed-off-by: spisarski <s.pisarski@cablelabs.com>
Diffstat (limited to 'snaps/openstack/utils/tests/neutron_utils_tests.py')
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py686
1 files changed, 464 insertions, 222 deletions
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index e90b94c..1e89dda 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -14,14 +14,16 @@
# limitations under the License.
import uuid
-from snaps.openstack.utils import keystone_utils
-from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction
-from snaps.openstack.tests import openstack_tests
-from snaps.openstack.utils import neutron_utils
-from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings
from snaps.openstack import create_router
-from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
+ PortSettings
+from snaps.openstack.create_security_group import SecurityGroupSettings, \
+ SecurityGroupRuleSettings, Direction
+from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests import validation_utils
+from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.utils import keystone_utils
+from snaps.openstack.utils import neutron_utils
__author__ = 'spisarski'
@@ -57,13 +59,14 @@ class NeutronSmokeTests(OSComponentTestCase):
with self.assertRaises(Exception):
neutron = neutron_utils.neutron_client(
- OSCreds(username='user', password='pass', auth_url='url', project_name='project'))
+ OSCreds(username='user', password='pass', auth_url='url',
+ project_name='project'))
neutron.list_networks()
def test_retrieve_ext_network_name(self):
"""
- Tests the neutron_utils.get_external_network_names to ensure the configured self.ext_net_name is contained
- within the returned list
+ Tests the neutron_utils.get_external_network_names to ensure the
+ configured self.ext_net_name is contained within the returned list
:return:
"""
neutron = neutron_utils.neutron_client(self.os_creds)
@@ -86,7 +89,8 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.network = None
- self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net')
+ self.net_config = openstack_tests.get_pub_net_config(
+ net_name=guid + '-pub-net')
def tearDown(self):
"""
@@ -94,31 +98,40 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
"""
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'], False)
+ validate_network(self.neutron, self.network['network']['name'],
+ False)
def test_create_network(self):
"""
Tests the neutron_utils.create_neutron_net() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
def test_create_network_empty_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function with an empty network name
+ Tests the neutron_utils.create_neutron_net() function with an empty
+ network name
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, self.os_creds,
- network_settings=NetworkSettings(name=''))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds,
+ network_settings=NetworkSettings(name=''))
def test_create_network_null_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function when the network name is None
+ Tests the neutron_utils.create_neutron_net() function when the network
+ name is None
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, self.os_creds,
- network_settings=NetworkSettings())
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds,
+ network_settings=NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
@@ -133,7 +146,8 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
self.network = None
self.subnet = None
self.net_config = openstack_tests.get_pub_net_config(
- net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name)
+ net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
+ external_net=self.ext_net_name)
def tearDown(self):
"""
@@ -142,71 +156,108 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
if self.subnet:
neutron_utils.delete_subnet(self.neutron, self.subnet)
validate_subnet(self.neutron, self.subnet.get('name'),
- self.net_config.network_settings.subnet_settings[0].cidr, False)
+ self.net_config.network_settings.subnet_settings[
+ 0].cidr, False)
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'], False)
+ validate_network(self.neutron, self.network['network']['name'],
+ False)
def test_create_subnet(self):
"""
Tests the neutron_utils.create_neutron_net() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, network=self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, network=self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
def test_create_subnet_null_name(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet name is None
+ Tests the neutron_utils.create_neutron_subnet() function for an
+ Exception when the subnet name is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
with self.assertRaises(Exception):
SubnetSettings(cidr=self.net_config.subnet_cidr)
def test_create_subnet_empty_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function with an empty name
+ Tests the neutron_utils.create_neutron_net() function with an empty
+ name
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, network=self.network)
- validate_subnet(self.neutron, '', self.net_config.network_settings.subnet_settings[0].cidr, True)
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, network=self.network)
+ validate_subnet(self.neutron, '',
+ subnet_setting.cidr, True)
def test_create_subnet_null_cidr(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is None
+ Tests the neutron_utils.create_neutron_subnet() function for an
+ Exception when the subnet CIDR value is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
with self.assertRaises(Exception):
- sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name)
- neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
+ sub_sets = SubnetSettings(cidr=None,
+ name=self.net_config.subnet_name)
+ neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
+ network=self.network)
def test_create_subnet_empty_cidr(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is empty
+ Tests the neutron_utils.create_neutron_subnet() function for an
+ Exception when the subnet CIDR value is empty
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
with self.assertRaises(Exception):
- sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name)
- neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
+ sub_sets = SubnetSettings(cidr='',
+ name=self.net_config.subnet_name)
+ neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
+ network=self.network)
class NeutronUtilsRouterTests(OSComponentTestCase):
@@ -232,7 +283,8 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
Cleans the remote OpenStack objects
"""
if self.interface_router:
- neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet)
+ neutron_utils.remove_interface_router(self.neutron, self.router,
+ self.subnet)
if self.router:
neutron_utils.delete_router(self.neutron, self.router)
@@ -244,30 +296,40 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
if self.subnet:
neutron_utils.delete_subnet(self.neutron, self.subnet)
validate_subnet(self.neutron, self.subnet.get('name'),
- self.net_config.network_settings.subnet_settings[0].cidr, False)
+ self.net_config.network_settings.subnet_settings[
+ 0].cidr, False)
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'], False)
+ validate_network(self.neutron, self.network['network']['name'],
+ False)
def test_create_router_simple(self):
"""
- Tests the neutron_utils.create_neutron_net() function when an external gateway is requested
+ Tests the neutron_utils.create_neutron_net() function when an external
+ gateway is requested
"""
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
def test_create_router_with_public_interface(self):
"""
- Tests the neutron_utils.create_neutron_net() function when an external gateway is requested
+ Tests the neutron_utils.create_neutron_net() function when an external
+ gateway is requested
"""
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.net_config = openstack_tests.OSNetworkConfig(
self.net_config.network_settings.name,
- self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name,
+ subnet_setting.name,
+ subnet_setting.cidr,
+ self.net_config.router_settings.name,
self.ext_net_name)
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
# TODO - Add validation that the router gatway has been set
def test_create_router_empty_name(self):
@@ -276,83 +338,125 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
"""
with self.assertRaises(Exception):
this_router_settings = create_router.RouterSettings(name='')
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
+ self.router = neutron_utils.create_router(self.neutron,
+ self.os_creds,
+ this_router_settings)
def test_create_router_null_name(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function when the subnet CIDR value is None
+ Tests the neutron_utils.create_neutron_subnet() function when the
+ subnet CIDR value is None
"""
with self.assertRaises(Exception):
this_router_settings = create_router.RouterSettings()
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
+ self.router = neutron_utils.create_router(self.neutron,
+ self.os_creds,
+ this_router_settings)
validate_router(self.neutron, None, True)
def test_add_interface_router(self):
"""
Tests the neutron_utils.add_interface_router() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
-
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
-
- self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
- validate_interface_router(self.interface_router, self.router, self.subnet)
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
+
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
+
+ self.interface_router = neutron_utils.add_interface_router(
+ self.neutron, self.router, self.subnet)
+ validate_interface_router(self.interface_router, self.router,
+ self.subnet)
def test_add_interface_router_null_router(self):
"""
- Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.add_interface_router() function for an
+ Exception when the router value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
+ self.interface_router = neutron_utils.add_interface_router(
+ self.neutron, self.router, self.subnet)
def test_add_interface_router_null_subnet(self):
"""
- Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None
+ Tests the neutron_utils.add_interface_router() function for an
+ Exception when the subnet value is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
with self.assertRaises(Exception):
- self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
+ self.interface_router = neutron_utils.add_interface_router(
+ self.neutron, self.router, self.subnet)
def test_create_port(self):
"""
Tests the neutron_utils.create_port() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting, self.os_creds, self.network)
+ validate_subnet(self.neutron, subnet_setting.name,
+ subnet_setting.cidr, True)
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
name=self.port_name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}],
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}],
network_name=self.net_config.network_settings.name))
validate_port(self.neutron, self.port, self.port_name)
@@ -360,111 +464,172 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
"""
Tests the neutron_utils.create_port() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting, self.os_creds, self.network)
+ validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
+ True)
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}]))
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}]))
validate_port(self.neutron, self.port, self.port_name)
def test_create_port_null_name(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the port name value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the port name value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}]))
def test_create_port_null_network_object(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the network object is None
+ Tests the neutron_utils.create_port() function for an Exception when
+ the network object is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
-
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- self.neutron, self.port_name, self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name':
+ self.net_config.network_settings.subnet_settings[
+ 0].name,
+ 'ip': ip_1}]))
def test_create_port_null_ip(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the IP value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the IP value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': None}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': None}]))
def test_create_port_invalid_ip(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the IP value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the IP value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron,
+ subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': 'foo'}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': 'foo'}]))
def test_create_port_invalid_ip_to_subnet(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the IP value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the IP value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron,
+ subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name,
- 'ip': '10.197.123.100'}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': '10.197.123.100'}]))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
@@ -490,7 +655,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
for security_group in self.security_groups:
try:
- neutron_utils.delete_security_group(self.neutron, security_group)
+ neutron_utils.delete_security_group(self.neutron,
+ security_group)
except:
pass
@@ -499,72 +665,102 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
Tests the neutron_utils.create_security_group() function
"""
sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
- security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
+ security_group = neutron_utils.create_security_group(self.neutron,
+ self.keystone,
+ sec_grp_settings)
- self.assertTrue(sec_grp_settings.name, security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.name,
+ security_group['security_group']['name'])
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group['security_group'], sec_grp_get['security_group']))
neutron_utils.delete_security_group(self.neutron, security_group)
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNone(sec_grp_get)
def test_create_sec_grp_no_name(self):
"""
- Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure
- that attempting to create a security group without a name will raise an exception
+ Tests the SecurityGroupSettings constructor and
+ neutron_utils.create_security_group() function to ensure that
+ attempting to create a security group without a name will raise an
+ exception
"""
with self.assertRaises(Exception):
sec_grp_settings = SecurityGroupSettings()
self.security_groups.append(
- neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))
+ neutron_utils.create_security_group(self.neutron,
+ self.keystone,
+ sec_grp_settings))
def test_create_sec_grp_no_rules(self):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
- self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
+ description='hello group')
+ self.security_groups.append(
+ neutron_utils.create_security_group(self.neutron, self.keystone,
+ sec_grp_settings))
- self.assertTrue(sec_grp_settings.name, self.security_groups[0]['security_group']['name'])
- self.assertTrue(sec_grp_settings.description, self.security_groups[0]['security_group']['description'])
+ self.assertTrue(sec_grp_settings.name,
+ self.security_groups[0]['security_group']['name'])
+ self.assertTrue(sec_grp_settings.description,
+ self.security_groups[0]['security_group'][
+ 'description'])
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
- self.security_groups[0]['security_group'], sec_grp_get['security_group']))
+ self.security_groups[0]['security_group'],
+ sec_grp_get['security_group']))
def test_create_sec_grp_one_rule(self):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=[sec_grp_rule_settings])
+ sec_grp_rule_settings = SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=[sec_grp_rule_settings])
- self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))
- free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_groups[0])
+ self.security_groups.append(
+ neutron_utils.create_security_group(self.neutron, self.keystone,
+ sec_grp_settings))
+ free_rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.security_groups[0])
for free_rule in free_rules:
self.security_group_rules.append(free_rule)
self.security_group_rules.append(
- neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0]))
+ neutron_utils.create_security_group_rule(
+ self.neutron, sec_grp_settings.rule_settings[0]))
# Refresh object so it is populated with the newly added rule
- security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ security_group = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings.name)
- rules = neutron_utils.get_rules_by_security_group(self.neutron, security_group)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron,
+ security_group)
- self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules))
+ self.assertTrue(
+ validation_utils.objects_equivalent(self.security_group_rules,
+ rules))
- self.assertTrue(sec_grp_settings.name, security_group['security_group']['name'])
- self.assertTrue(sec_grp_settings.description, security_group['security_group']['description'])
+ self.assertTrue(sec_grp_settings.name,
+ security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.description,
+ security_group['security_group']['description'])
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group['security_group'], sec_grp_get['security_group']))
@@ -576,18 +772,59 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-1', description='hello group')))
+ SecurityGroupSettings(name=self.sec_grp_name + '-1',
+ description='hello group')))
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-2', description='hello group')))
+ SecurityGroupSettings(name=self.sec_grp_name + '-2',
+ description='hello group')))
sec_grp_1b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[0]['security_group']['id'])
sec_grp_2b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[1]['security_group']['id'])
- self.assertEqual(self.security_groups[0]['security_group']['id'], sec_grp_1b['security_group']['id'])
- self.assertEqual(self.security_groups[1]['security_group']['id'], sec_grp_2b['security_group']['id'])
+ self.assertEqual(self.security_groups[0]['security_group']['id'],
+ sec_grp_1b['security_group']['id'])
+ self.assertEqual(self.security_groups[1]['security_group']['id'],
+ sec_grp_2b['security_group']['id'])
+
+
+class NeutronUtilsFloatingIpTests(OSComponentTestCase):
+ """
+ Test basic nova keypair functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateImage object that is responsible for downloading
+ and creating an OS image file within OpenStack
+ """
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.floating_ip = None
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.floating_ip:
+ neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
+
+ def test_floating_ips(self):
+ """
+ Tests the creation of a floating IP
+ :return:
+ """
+ initial_fips = neutron_utils.get_floating_ips(self.neutron)
+
+ self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
+ self.ext_net_name)
+ all_fips = neutron_utils.get_floating_ips(self.neutron)
+ self.assertEqual(len(initial_fips) + 1, len(all_fips))
+ returned = neutron_utils.get_floating_ip(self.neutron,
+ self.floating_ip)
+ self.assertEqual(self.floating_ip.id, returned.id)
+ self.assertEqual(self.floating_ip.ip, returned.ip)
"""
@@ -597,8 +834,9 @@ Validation routines
def validate_network(neutron, name, exists):
"""
- Returns true if a network for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a network for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a network for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a network for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param name: The expected network name
:param exists: Whether or not the network name should exist or not
@@ -614,8 +852,9 @@ def validate_network(neutron, name, exists):
def validate_subnet(neutron, name, cidr, exists):
"""
- Returns true if a subnet for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a subnet for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a subnet for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a subnet for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param name: The expected subnet name
:param cidr: The expected CIDR value
@@ -632,8 +871,9 @@ def validate_subnet(neutron, name, cidr, exists):
def validate_router(neutron, name, exists):
"""
- Returns true if a router for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a router for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a router for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a router for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param name: The expected router name
:param exists: Whether or not the network name should exist or not
@@ -647,7 +887,8 @@ def validate_router(neutron, name, exists):
def validate_interface_router(interface_router, router, subnet):
"""
- Returns true if the router ID & subnet ID have been properly included into the interface router object
+ Returns true if the router ID & subnet ID have been properly included into
+ the interface router object
:param interface_router: the object to validate
:param router: to validate against the interface_router
:param subnet: to validate against the interface_router
@@ -661,8 +902,9 @@ def validate_interface_router(interface_router, router, subnet):
def validate_port(neutron, port_obj, this_port_name):
"""
- Returns true if a port for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a port for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a port for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a port for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param port_obj: The port object to lookup
:param this_port_name: The expected router name