summaryrefslogtreecommitdiffstats
path: root/snaps
diff options
context:
space:
mode:
Diffstat (limited to 'snaps')
-rw-r--r--snaps/domain/network.py23
-rw-r--r--snaps/domain/test/network_tests.py27
-rw-r--r--snaps/openstack/create_instance.py17
-rw-r--r--snaps/openstack/tests/create_instance_tests.py8
-rw-r--r--snaps/openstack/utils/neutron_utils.py19
-rw-r--r--snaps/openstack/utils/nova_utils.py2
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py2
-rw-r--r--snaps/test_suite_builder.py5
8 files changed, 76 insertions, 27 deletions
diff --git a/snaps/domain/network.py b/snaps/domain/network.py
index 9e02ba6..8ac5300 100644
--- a/snaps/domain/network.py
+++ b/snaps/domain/network.py
@@ -14,6 +14,29 @@
# limitations under the License.
+class Port:
+ """
+ SNAPS domain object for ports. Should contain attributes that
+ are shared amongst cloud providers
+ """
+ def __init__(self, **kwargs):
+ """
+ Constructor
+ :param name: the security group's name
+ :param id: the security group's id
+ :param ips: a list of IP addresses
+ """
+ self.name = kwargs.get('name')
+ self.id = kwargs.get('id')
+ self.ips = kwargs.get('ips')
+ self.mac_address = kwargs.get('mac_address')
+ self.allowed_address_pairs = kwargs.get('allowed_address_pairs')
+
+ def __eq__(self, other):
+ return (self.name == other.name and self.id == other.id and
+ self.ips == other.ips, self.mac_address == other.mac_address)
+
+
class SecurityGroup:
"""
SNAPS domain object for SecurityGroups. Should contain attributes that
diff --git a/snaps/domain/test/network_tests.py b/snaps/domain/test/network_tests.py
index a2f1374..13015b2 100644
--- a/snaps/domain/test/network_tests.py
+++ b/snaps/domain/test/network_tests.py
@@ -14,7 +14,28 @@
# limitations under the License.
import unittest
-from snaps.domain.network import SecurityGroup, SecurityGroupRule
+from snaps.domain.network import Port, SecurityGroup, SecurityGroupRule
+
+
+class PortDomainObjectTests(unittest.TestCase):
+ """
+ Tests the construction of the snaps.domain.network.Port class
+ """
+
+ def test_construction_kwargs(self):
+ ips = ['10', '11']
+ port = Port(
+ **{'name': 'name', 'id': 'id', 'ips': ips})
+ self.assertEqual('name', port.name)
+ self.assertEqual('id', port.id)
+ self.assertEqual(ips, port.ips)
+
+ def test_construction_named(self):
+ ips = ['10', '11']
+ port = Port(ips=ips, id='id', name='name')
+ self.assertEqual('name', port.name)
+ self.assertEqual('id', port.id)
+ self.assertEqual(ips, port.ips)
class SecurityGroupDomainObjectTests(unittest.TestCase):
@@ -52,8 +73,8 @@ class SecurityGroupRuleDomainObjectTests(unittest.TestCase):
def test_construction_kwargs(self):
sec_grp_rule = SecurityGroupRule(
- **{'id': 'id', 'security_group_id': 'grp_id', 'description': 'desc',
- 'direction': 'dir', 'ethertype': 'eType',
+ **{'id': 'id', 'security_group_id': 'grp_id',
+ 'description': 'desc', 'direction': 'dir', 'ethertype': 'eType',
'port_range_min': '10.0.0.100', 'port_range_max': '10.0.0.200',
'protocol': 'proto', 'remote_group_id': 'group_id',
'remote_ip_prefix': 'ip_prefix'})
diff --git a/snaps/openstack/create_instance.py b/snaps/openstack/create_instance.py
index 99ab87c..c970a31 100644
--- a/snaps/openstack/create_instance.py
+++ b/snaps/openstack/create_instance.py
@@ -298,13 +298,13 @@ class OpenStackVmInstance:
if subnet:
# Take IP of subnet if there is one configured on which to place
# the floating IP
- for fixed_ip in port['port']['fixed_ips']:
+ for fixed_ip in port.fixed_ips:
if fixed_ip['subnet_id'] == subnet['subnet']['id']:
ip = fixed_ip['ip_address']
break
else:
# Simply take the first
- ip = port['port']['fixed_ips'][0]['ip_address']
+ ip = port.ips[0]['ip_address']
if ip:
count = timeout / poll_interval
@@ -363,7 +363,6 @@ class OpenStackVmInstance:
"""
port = self.get_port_by_name(port_name)
if port:
- port_dict = port['port']
if subnet_name:
subnet = neutron_utils.get_subnet_by_name(self.__neutron,
subnet_name)
@@ -372,13 +371,12 @@ class OpenStackVmInstance:
'not be located with name - %s',
subnet_name)
return None
- for fixed_ip in port_dict['fixed_ips']:
+ for fixed_ip in port.ips:
if fixed_ip['subnet_id'] == subnet['subnet']['id']:
return fixed_ip['ip_address']
else:
- fixed_ips = port_dict['fixed_ips']
- if fixed_ips and len(fixed_ips) > 0:
- return fixed_ips[0]['ip_address']
+ if port.ips and len(port.ips) > 0:
+ return port.ips[0]['ip_address']
return None
def get_port_mac(self, port_name):
@@ -392,8 +390,7 @@ class OpenStackVmInstance:
"""
port = self.get_port_by_name(port_name)
if port:
- port_dict = port['port']
- return port_dict['mac_address']
+ return port.mac_address
return None
def get_port_by_name(self, port_name):
@@ -450,7 +447,7 @@ class OpenStackVmInstance:
:param ip: The IP on which to apply the playbook.
:return: the return value from ansible
"""
- port_ip = port['port']['fixed_ips'][0]['ip_address']
+ port_ip = port.ips[0]['ip_address']
variables = {
'floating_ip': ip,
'nic_name': nic_name,
diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py
index dc8d79b..a13a38c 100644
--- a/snaps/openstack/tests/create_instance_tests.py
+++ b/snaps/openstack/tests/create_instance_tests.py
@@ -988,10 +988,10 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
port = self.inst_creator.get_port_by_name(port_settings.name)
self.assertIsNotNone(port)
- self.assertIsNotNone(port['port'].get('allowed_address_pairs'))
- self.assertEqual(1, len(port['port']['allowed_address_pairs']))
- validation_utils.objects_equivalent(pair, port['port'][
- 'allowed_address_pairs'][0])
+ self.assertIsNotNone(port.allowed_address_pairs)
+ self.assertEqual(1, len(port.allowed_address_pairs))
+ validation_utils.objects_equivalent(pair,
+ port.allowed_address_pairs[0])
def test_set_allowed_address_pairs_bad_mac(self):
"""
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index 0dfc61a..e1bed66 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -17,7 +17,7 @@ import logging
from neutronclient.common.exceptions import NotFound
from neutronclient.neutron.client import Client
-from snaps.domain.network import SecurityGroup, SecurityGroupRule
+from snaps.domain.network import Port, SecurityGroup, SecurityGroupRule
from snaps.domain.vm_inst import FloatingIp
from snaps.openstack.utils import keystone_utils
@@ -280,23 +280,27 @@ def create_port(neutron, os_creds, port_settings):
:param neutron: the client
:param os_creds: the OpenStack credentials
:param port_settings: the settings object for port configuration
- :return: the port object
+ :return: the SNAPS-OO Port domain object
"""
json_body = port_settings.dict_for_neutron(neutron, os_creds)
logger.info('Creating port for network with name - %s',
port_settings.network_name)
- return neutron.create_port(body=json_body)
+ os_port = neutron.create_port(body=json_body)['port']
+ return Port(name=os_port['name'], id=os_port['id'],
+ ips=os_port['fixed_ips'],
+ mac_address=os_port['mac_address'],
+ allowed_address_pairs=os_port['allowed_address_pairs'])
def delete_port(neutron, port):
"""
Removes an OpenStack port
:param neutron: the client
- :param port: the port object
+ :param port: the SNAPS-OO Port domain object
:return:
"""
- logger.info('Deleting port with name ' + port['port']['name'])
- neutron.delete_port(port['port']['id'])
+ logger.info('Deleting port with name ' + port.name)
+ neutron.delete_port(port.id)
def get_port_by_name(neutron, port_name):
@@ -309,7 +313,8 @@ def get_port_by_name(neutron, port_name):
ports = neutron.list_ports(**{'name': port_name})
for port in ports['ports']:
if port['name'] == port_name:
- return {'port': port}
+ return Port(name=port['name'], id=port['id'],
+ ips=port['fixed_ips'], mac_address=port['mac_address'])
return None
diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py
index ccbf3c4..1ced4d7 100644
--- a/snaps/openstack/utils/nova_utils.py
+++ b/snaps/openstack/utils/nova_utils.py
@@ -69,7 +69,7 @@ def create_server(nova, neutron, glance, instance_settings, image_settings,
nics = []
for port in ports:
kv = dict()
- kv['port-id'] = port['port']['id']
+ kv['port-id'] = port.id
nics.append(kv)
logger.info('Creating VM with name - ' + instance_settings.name)
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index 516628b..0080b57 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -901,6 +901,6 @@ def validate_port(neutron, port_obj, this_port_name):
ports = neutron.list_ports()
for port, port_insts in ports.items():
for inst in port_insts:
- if inst['id'] == port_obj['port']['id']:
+ if inst['id'] == port_obj.id:
return inst['name'] == this_port_name
return False
diff --git a/snaps/test_suite_builder.py b/snaps/test_suite_builder.py
index 5c366ab..ab044e9 100644
--- a/snaps/test_suite_builder.py
+++ b/snaps/test_suite_builder.py
@@ -20,7 +20,8 @@ from snaps.domain.test.flavor_tests import FlavorDomainObjectTests
from snaps.domain.test.image_tests import ImageDomainObjectTests
from snaps.domain.test.keypair_tests import KeypairDomainObjectTests
from snaps.domain.test.network_tests import (
- SecurityGroupDomainObjectTests, SecurityGroupRuleDomainObjectTests)
+ SecurityGroupDomainObjectTests, SecurityGroupRuleDomainObjectTests,
+ PortDomainObjectTests)
from snaps.domain.test.project_tests import ProjectDomainObjectTests
from snaps.domain.test.stack_tests import StackDomainObjectTests
from snaps.domain.test.user_tests import UserDomainObjectTests
@@ -125,6 +126,8 @@ def add_unit_tests(suite):
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
PortSettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ PortDomainObjectTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
FloatingIpSettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
VmInstanceSettingsUnitTests))