summaryrefslogtreecommitdiffstats
path: root/snaps/openstack/utils
diff options
context:
space:
mode:
Diffstat (limited to 'snaps/openstack/utils')
-rw-r--r--snaps/openstack/utils/neutron_utils.py50
-rw-r--r--snaps/openstack/utils/nova_utils.py8
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py38
3 files changed, 47 insertions, 49 deletions
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index 1889748..0dfc61a 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -16,6 +16,8 @@ import logging
from neutronclient.common.exceptions import NotFound
from neutronclient.neutron.client import Client
+
+from snaps.domain.network import SecurityGroup, SecurityGroupRule
from snaps.domain.vm_inst import FloatingIp
from snaps.openstack.utils import keystone_utils
@@ -321,19 +323,23 @@ def create_security_group(neutron, keystone, sec_grp_settings):
"""
logger.info('Creating security group with name - %s',
sec_grp_settings.name)
- return neutron.create_security_group(
+ os_group = neutron.create_security_group(
sec_grp_settings.dict_for_neutron(keystone))
+ return SecurityGroup(
+ id=os_group['security_group']['id'],
+ name=os_group['security_group']['name'],
+ project_id=os_group['security_group'].get(
+ 'project_id', os_group['security_group'].get('tenant_id')))
def delete_security_group(neutron, sec_grp):
"""
Deletes a security group object from OpenStack
:param neutron: the client
- :param sec_grp: the security group object to delete
+ :param sec_grp: the SNAPS SecurityGroup object to delete
"""
- logger.info('Deleting security group with name - %s',
- sec_grp['security_group']['name'])
- return neutron.delete_security_group(sec_grp['security_group']['id'])
+ logger.info('Deleting security group with name - %s', sec_grp.name)
+ neutron.delete_security_group(sec_grp.id)
def get_security_group(neutron, name):
@@ -347,7 +353,9 @@ def get_security_group(neutron, name):
groups = neutron.list_security_groups(**{'name': name})
for group in groups['security_groups']:
if group['name'] == name:
- return {'security_group': group}
+ return SecurityGroup(
+ id=group['id'], name=group['name'],
+ project_id=group.get('project_id', group.get('tenant_id')))
return None
@@ -362,7 +370,9 @@ def get_security_group_by_id(neutron, sec_grp_id):
groups = neutron.list_security_groups(**{'id': sec_grp_id})
for group in groups['security_groups']:
if group['id'] == sec_grp_id:
- return {'security_group': group}
+ return SecurityGroup(
+ id=group['id'], name=group['name'],
+ project_id=group.get('project_id', group.get('tenant_id')))
return None
@@ -375,36 +385,36 @@ def create_security_group_rule(neutron, sec_grp_rule_settings):
"""
logger.info('Creating security group to security group - %s',
sec_grp_rule_settings.sec_grp_name)
- return neutron.create_security_group_rule(
+ os_rule = neutron.create_security_group_rule(
sec_grp_rule_settings.dict_for_neutron(neutron))
+ return SecurityGroupRule(**os_rule['security_group_rule'])
def delete_security_group_rule(neutron, sec_grp_rule):
"""
Deletes a security group object from OpenStack
:param neutron: the client
- :param sec_grp_rule: the security group rule object to delete
+ :param sec_grp_rule: the SNAPS SecurityGroupRule object to delete
"""
logger.info('Deleting security group rule with ID - %s',
- sec_grp_rule['security_group_rule']['id'])
- neutron.delete_security_group_rule(
- sec_grp_rule['security_group_rule']['id'])
+ sec_grp_rule.id)
+ neutron.delete_security_group_rule(sec_grp_rule.id)
def get_rules_by_security_group(neutron, sec_grp):
"""
Retrieves all of the rules for a given security group
:param neutron: the client
- :param sec_grp: the security group object
+ :param sec_grp: the SNAPS SecurityGroup object
"""
logger.info('Retrieving security group rules associate with the '
- 'security group - %s', sec_grp['security_group']['name'])
+ 'security group - %s', sec_grp.name)
out = list()
rules = neutron.list_security_group_rules(
- **{'security_group_id': sec_grp['security_group']['id']})
+ **{'security_group_id': sec_grp.id})
for rule in rules['security_group_rules']:
- if rule['security_group_id'] == sec_grp['security_group']['id']:
- out.append({'security_group_rule': rule})
+ if rule['security_group_id'] == sec_grp.id:
+ out.append(SecurityGroupRule(**rule))
return out
@@ -412,14 +422,14 @@ def get_rule_by_id(neutron, sec_grp, rule_id):
"""
Deletes a security group object from OpenStack
:param neutron: the client
- :param sec_grp: the security group object
+ :param sec_grp: the SNAPS SecurityGroup domain object
:param rule_id: the rule's ID
"""
rules = neutron.list_security_group_rules(
- **{'security_group_id': sec_grp['security_group']['id']})
+ **{'security_group_id': sec_grp.id})
for rule in rules['security_group_rules']:
if rule['id'] == rule_id:
- return {'security_group_rule': rule}
+ return SecurityGroupRule(**rule)
return None
diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py
index db9a666..ccbf3c4 100644
--- a/snaps/openstack/utils/nova_utils.py
+++ b/snaps/openstack/utils/nova_utils.py
@@ -238,7 +238,8 @@ def keypair_exists(nova, keypair_obj):
"""
try:
os_kp = nova.keypairs.get(keypair_obj)
- return Keypair(name=os_kp.name, id=os_kp.id, public_key=os_kp.public_key)
+ return Keypair(name=os_kp.name, id=os_kp.id,
+ public_key=os_kp.public_key)
except:
return None
@@ -414,10 +415,9 @@ def remove_security_group(nova, vm, security_group):
Removes a security group from an existing VM
:param nova: the nova client
:param vm: the OpenStack server object (VM) to alter
- :param security_group: the OpenStack security group object to add
+ :param security_group: the SNAPS SecurityGroup domain object to add
"""
- nova.servers.remove_security_group(
- str(vm.id), security_group['security_group']['name'])
+ nova.servers.remove_security_group(str(vm.id), security_group.name)
def add_floating_ip_to_server(nova, vm, floating_ip, ip_addr):
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index 1e89dda..516628b 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -669,14 +669,13 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
self.keystone,
sec_grp_settings)
- self.assertTrue(sec_grp_settings.name,
- security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.name, security_group.name)
sec_grp_get = neutron_utils.get_security_group(self.neutron,
sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
- security_group['security_group'], sec_grp_get['security_group']))
+ security_group, sec_grp_get))
neutron_utils.delete_security_group(self.neutron, security_group)
sec_grp_get = neutron_utils.get_security_group(self.neutron,
@@ -707,18 +706,13 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
neutron_utils.create_security_group(self.neutron, self.keystone,
sec_grp_settings))
- self.assertTrue(sec_grp_settings.name,
- self.security_groups[0]['security_group']['name'])
- self.assertTrue(sec_grp_settings.description,
- self.security_groups[0]['security_group'][
- 'description'])
+ self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
+ self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
sec_grp_get = neutron_utils.get_security_group(self.neutron,
sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
- self.assertTrue(validation_utils.objects_equivalent(
- self.security_groups[0]['security_group'],
- sec_grp_get['security_group']))
+ self.assertEqual(self.security_groups[0], sec_grp_get)
def test_create_sec_grp_one_rule(self):
"""
@@ -751,19 +745,15 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
security_group)
self.assertTrue(
- validation_utils.objects_equivalent(self.security_group_rules,
- rules))
+ validation_utils.objects_equivalent(
+ self.security_group_rules, rules))
- self.assertTrue(sec_grp_settings.name,
- security_group['security_group']['name'])
- self.assertTrue(sec_grp_settings.description,
- security_group['security_group']['description'])
+ self.assertTrue(sec_grp_settings.name, security_group.name)
sec_grp_get = neutron_utils.get_security_group(self.neutron,
sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
- self.assertTrue(validation_utils.objects_equivalent(
- security_group['security_group'], sec_grp_get['security_group']))
+ self.assertEqual(security_group, sec_grp_get)
def test_get_sec_grp_by_id(self):
"""
@@ -780,14 +770,12 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
description='hello group')))
sec_grp_1b = neutron_utils.get_security_group_by_id(
- self.neutron, self.security_groups[0]['security_group']['id'])
+ self.neutron, self.security_groups[0].id)
sec_grp_2b = neutron_utils.get_security_group_by_id(
- self.neutron, self.security_groups[1]['security_group']['id'])
+ self.neutron, self.security_groups[1].id)
- self.assertEqual(self.security_groups[0]['security_group']['id'],
- sec_grp_1b['security_group']['id'])
- self.assertEqual(self.security_groups[1]['security_group']['id'],
- sec_grp_2b['security_group']['id'])
+ self.assertEqual(self.security_groups[0].id, sec_grp_1b.id)
+ self.assertEqual(self.security_groups[1].id, sec_grp_2b.id)
class NeutronUtilsFloatingIpTests(OSComponentTestCase):