summaryrefslogtreecommitdiffstats
path: root/snaps
diff options
context:
space:
mode:
Diffstat (limited to 'snaps')
-rw-r--r--snaps/domain/network.py1
-rw-r--r--snaps/domain/project.py4
-rw-r--r--snaps/domain/test/network_tests.py10
-rw-r--r--snaps/domain/test/project_tests.py18
-rw-r--r--snaps/openstack/create_keypairs.py15
-rw-r--r--snaps/openstack/tests/create_keypairs_tests.py44
-rw-r--r--snaps/openstack/tests/create_project_tests.py38
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py248
-rw-r--r--snaps/openstack/utils/keystone_utils.py12
-rw-r--r--snaps/openstack/utils/neutron_utils.py14
10 files changed, 329 insertions, 75 deletions
diff --git a/snaps/domain/network.py b/snaps/domain/network.py
index 68889f1..2c71db8 100644
--- a/snaps/domain/network.py
+++ b/snaps/domain/network.py
@@ -130,6 +130,7 @@ class SecurityGroup:
"""
self.name = kwargs.get('name')
self.id = kwargs.get('id')
+ self.description = kwargs.get('description')
self.project_id = kwargs.get('project_id', kwargs.get('tenant_id'))
def __eq__(self, other):
diff --git a/snaps/domain/project.py b/snaps/domain/project.py
index 818645b..73357c7 100644
--- a/snaps/domain/project.py
+++ b/snaps/domain/project.py
@@ -19,14 +19,16 @@ class Project:
SNAPS domain object for Projects. Should contain attributes that
are shared amongst cloud providers
"""
- def __init__(self, name, project_id):
+ def __init__(self, name, project_id, domain_id=None):
"""
Constructor
:param name: the project's name
:param project_id: the project's id
+ :param domain_id: the project's domain id
"""
self.name = name
self.id = project_id
+ self.domain_id = domain_id
def __eq__(self, other):
return self.name == other.name and self.id == other.id
diff --git a/snaps/domain/test/network_tests.py b/snaps/domain/test/network_tests.py
index 592090b..4fd20d4 100644
--- a/snaps/domain/test/network_tests.py
+++ b/snaps/domain/test/network_tests.py
@@ -133,10 +133,11 @@ class SecurityGroupDomainObjectTests(unittest.TestCase):
def test_construction_proj_id_kwargs(self):
sec_grp = SecurityGroup(
- **{'name': 'name', 'id': 'id',
- 'project_id': 'foo'})
+ **{'name': 'name', 'id': 'id', 'project_id': 'foo',
+ 'description': 'test desc'})
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
+ self.assertEqual('test desc', sec_grp.description)
self.assertEqual('foo', sec_grp.project_id)
def test_construction_tenant_id_kwargs(self):
@@ -146,11 +147,14 @@ class SecurityGroupDomainObjectTests(unittest.TestCase):
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
self.assertEqual('foo', sec_grp.project_id)
+ self.assertIsNone(sec_grp.description)
def test_construction_named(self):
- sec_grp = SecurityGroup(tenant_id='foo', id='id', name='name')
+ sec_grp = SecurityGroup(description='test desc', tenant_id='foo',
+ id='id', name='name')
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
+ self.assertEqual('test desc', sec_grp.description)
self.assertEqual('foo', sec_grp.project_id)
diff --git a/snaps/domain/test/project_tests.py b/snaps/domain/test/project_tests.py
index 4056bba..73939f0 100644
--- a/snaps/domain/test/project_tests.py
+++ b/snaps/domain/test/project_tests.py
@@ -22,12 +22,26 @@ class ProjectDomainObjectTests(unittest.TestCase):
Tests the construction of the snaps.domain.test.Project class
"""
- def test_construction_positional(self):
+ def test_construction_positional_minimal(self):
project = Project('foo', '123-456')
self.assertEqual('foo', project.name)
self.assertEqual('123-456', project.id)
+ self.assertIsNone(project.domain_id)
- def test_construction_named(self):
+ def test_construction_positional_all(self):
+ project = Project('foo', '123-456', 'hello')
+ self.assertEqual('foo', project.name)
+ self.assertEqual('123-456', project.id)
+ self.assertEqual('hello', project.domain_id)
+
+ def test_construction_named_minimal(self):
project = Project(project_id='123-456', name='foo')
self.assertEqual('foo', project.name)
self.assertEqual('123-456', project.id)
+ self.assertIsNone(project.domain_id)
+
+ def test_construction_named_all(self):
+ project = Project(domain_id='hello', project_id='123-456', name='foo')
+ self.assertEqual('foo', project.name)
+ self.assertEqual('123-456', project.id)
+ self.assertEqual('hello', project.domain_id)
diff --git a/snaps/openstack/create_keypairs.py b/snaps/openstack/create_keypairs.py
index 16374ad..c81fef5 100644
--- a/snaps/openstack/create_keypairs.py
+++ b/snaps/openstack/create_keypairs.py
@@ -68,13 +68,13 @@ class OpenStackKeypair:
self.keypair_settings.public_filepath)
if self.keypair_settings.delete_on_clean is not None:
- self.__delete_keys_on_clean = self.keypair_settings.delete_on_clean
+ delete_on_clean = self.keypair_settings.delete_on_clean
+ self.__delete_keys_on_clean = delete_on_clean
else:
self.__delete_keys_on_clean = False
else:
logger.info("Creating new keypair")
- # TODO - Make this value configurable
- keys = nova_utils.create_keys(1024)
+ keys = nova_utils.create_keys(self.keypair_settings.key_size)
self.__keypair = nova_utils.upload_keypair(
self.__nova, self.keypair_settings.name,
nova_utils.public_key_openssh(keys))
@@ -83,7 +83,8 @@ class OpenStackKeypair:
self.keypair_settings.private_filepath)
if self.keypair_settings.delete_on_clean is not None:
- self.__delete_keys_on_clean = self.keypair_settings.delete_on_clean
+ delete_on_clean = self.keypair_settings.delete_on_clean
+ self.__delete_keys_on_clean = delete_on_clean
else:
self.__delete_keys_on_clean = True
elif self.__keypair and not os.path.isfile(
@@ -137,6 +138,8 @@ class KeypairSettings:
public key file is or will be stored
:param private_filepath: The path where the generated private key file
will be stored
+ :param key_size: The number of bytes for the key size when it needs to
+ be generated (Must be >=512 default 1024)
:param delete_on_clean: when True, the key files will be deleted when
OpenStackKeypair#clean() is called
:return:
@@ -145,6 +148,7 @@ class KeypairSettings:
self.name = kwargs.get('name')
self.public_filepath = kwargs.get('public_filepath')
self.private_filepath = kwargs.get('private_filepath')
+ self.key_size = int(kwargs.get('key_size', 1024))
if kwargs.get('delete_on_clean') is not None:
if isinstance(kwargs.get('delete_on_clean'), bool):
@@ -157,6 +161,9 @@ class KeypairSettings:
if not self.name:
raise KeypairSettingsError('Name is a required attribute')
+ if self.key_size < 512:
+ raise KeypairSettingsError('key_size must be >=512')
+
class KeypairSettingsError(Exception):
"""
diff --git a/snaps/openstack/tests/create_keypairs_tests.py b/snaps/openstack/tests/create_keypairs_tests.py
index 2824c34..0b35095 100644
--- a/snaps/openstack/tests/create_keypairs_tests.py
+++ b/snaps/openstack/tests/create_keypairs_tests.py
@@ -39,9 +39,14 @@ class KeypairSettingsUnitTests(unittest.TestCase):
with self.assertRaises(KeypairSettingsError):
KeypairSettings(**dict())
+ def test_small_key_size(self):
+ with self.assertRaises(KeypairSettingsError):
+ KeypairSettings(name='foo', key_size=511)
+
def test_name_only(self):
settings = KeypairSettings(name='foo')
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
self.assertIsNone(settings.delete_on_clean)
@@ -49,6 +54,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_config_with_name_only(self):
settings = KeypairSettings(**{'name': 'foo'})
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
self.assertIsNone(settings.delete_on_clean)
@@ -56,6 +62,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_name_pub_only(self):
settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub')
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
self.assertIsNone(settings.delete_on_clean)
@@ -64,6 +71,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
settings = KeypairSettings(
**{'name': 'foo', 'public_filepath': '/foo/bar.pub'})
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
self.assertIsNone(settings.delete_on_clean)
@@ -71,6 +79,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_name_priv_only(self):
settings = KeypairSettings(name='foo', private_filepath='/foo/bar')
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertIsNone(settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertIsNone(settings.delete_on_clean)
@@ -79,6 +88,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
settings = KeypairSettings(
**{'name': 'foo', 'private_filepath': '/foo/bar'})
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertIsNone(settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertIsNone(settings.delete_on_clean)
@@ -86,8 +96,10 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_all_delete_bool(self):
settings = KeypairSettings(
name='foo', public_filepath='/foo/bar.pub',
- private_filepath='/foo/bar', delete_on_clean=True)
+ private_filepath='/foo/bar', delete_on_clean=True,
+ key_size=999)
self.assertEqual('foo', settings.name)
+ self.assertEqual(999, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertTrue(settings.delete_on_clean)
@@ -97,6 +109,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
name='foo', public_filepath='/foo/bar.pub',
private_filepath='/foo/bar', delete_on_clean='True')
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertTrue(settings.delete_on_clean)
@@ -106,6 +119,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
name='foo', public_filepath='/foo/bar.pub',
private_filepath='/foo/bar', delete_on_clean='true')
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertTrue(settings.delete_on_clean)
@@ -115,6 +129,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
name='foo', public_filepath='/foo/bar.pub',
private_filepath='/foo/bar', delete_on_clean='False')
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertFalse(settings.delete_on_clean)
@@ -122,8 +137,10 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_all_delete_str_false_lc(self):
settings = KeypairSettings(
name='foo', public_filepath='/foo/bar.pub',
- private_filepath='/foo/bar', delete_on_clean='false')
+ private_filepath='/foo/bar', delete_on_clean='false',
+ key_size='999')
self.assertEqual('foo', settings.name)
+ self.assertEqual(999, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertFalse(settings.delete_on_clean)
@@ -131,8 +148,10 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_config_all_delete_false_bool(self):
settings = KeypairSettings(
**{'name': 'foo', 'public_filepath': '/foo/bar.pub',
- 'private_filepath': '/foo/bar', 'delete_on_clean': False})
+ 'private_filepath': '/foo/bar', 'delete_on_clean': False,
+ 'key_size': 999})
self.assertEqual('foo', settings.name)
+ self.assertEqual(999, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertFalse(settings.delete_on_clean)
@@ -142,6 +161,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
**{'name': 'foo', 'public_filepath': '/foo/bar.pub',
'private_filepath': '/foo/bar', 'delete_on_clean': 'False'})
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertFalse(settings.delete_on_clean)
@@ -151,6 +171,7 @@ class KeypairSettingsUnitTests(unittest.TestCase):
**{'name': 'foo', 'public_filepath': '/foo/bar.pub',
'private_filepath': '/foo/bar', 'delete_on_clean': 'false'})
self.assertEqual('foo', settings.name)
+ self.assertEqual(1024, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertFalse(settings.delete_on_clean)
@@ -158,8 +179,10 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_config_all_delete_false_str_foo(self):
settings = KeypairSettings(
**{'name': 'foo', 'public_filepath': '/foo/bar.pub',
- 'private_filepath': '/foo/bar', 'delete_on_clean': 'foo'})
+ 'private_filepath': '/foo/bar', 'delete_on_clean': 'foo',
+ 'key_size': '999'})
self.assertEqual('foo', settings.name)
+ self.assertEqual(999, settings.key_size)
self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertEqual('/foo/bar', settings.private_filepath)
self.assertFalse(settings.delete_on_clean)
@@ -213,6 +236,19 @@ class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.get_keypair())
self.assertEqual(self.keypair_creator.get_keypair(), keypair)
+ def test_create_keypair_large_key(self):
+ """
+ Tests the creation of a generated keypair without saving to file
+ :return:
+ """
+ self.keypair_creator = OpenStackKeypair(self.os_creds, KeypairSettings(
+ name=self.keypair_name, key_size=10000))
+ self.keypair_creator.create()
+
+ keypair = nova_utils.keypair_exists(self.nova,
+ self.keypair_creator.get_keypair())
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
+
def test_create_delete_keypair(self):
"""
Tests the creation then deletion of an OpenStack keypair to ensure
diff --git a/snaps/openstack/tests/create_project_tests.py b/snaps/openstack/tests/create_project_tests.py
index f2af0d9..f388ba5 100644
--- a/snaps/openstack/tests/create_project_tests.py
+++ b/snaps/openstack/tests/create_project_tests.py
@@ -15,6 +15,8 @@
import unittest
import uuid
+from keystoneclient.exceptions import BadRequest
+
from snaps.openstack.create_project import (
OpenStackProject, ProjectSettings, ProjectSettingsError)
from snaps.openstack.create_security_group import OpenStackSecurityGroup
@@ -98,6 +100,19 @@ class CreateProjectSuccessTests(OSComponentTestCase):
if self.project_creator:
self.project_creator.clean()
+ def test_create_project_bad_domain(self):
+ """
+ Tests the creation of an OpenStack project with an invalid domain
+ value. This test will not do anything with a keystone v2.0 client.
+ """
+ if self.keystone.version != keystone_utils.V2_VERSION_STR:
+ self.project_settings.domain = 'foo'
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
+
+ with self.assertRaises(BadRequest):
+ self.project_creator.create()
+
def test_create_project(self):
"""
Tests the creation of an OpenStack project.
@@ -111,6 +126,8 @@ class CreateProjectSuccessTests(OSComponentTestCase):
keystone=self.keystone, project_name=self.project_settings.name)
self.assertIsNotNone(retrieved_project)
self.assertEqual(created_project, retrieved_project)
+ self.assertTrue(validate_project(self.keystone, self.project_settings,
+ created_project))
def test_create_project_2x(self):
"""
@@ -130,6 +147,8 @@ class CreateProjectSuccessTests(OSComponentTestCase):
project2 = OpenStackProject(self.os_creds,
self.project_settings).create()
self.assertEqual(retrieved_project, project2)
+ self.assertTrue(validate_project(self.keystone, self.project_settings,
+ created_project))
def test_create_delete_project(self):
"""
@@ -147,7 +166,8 @@ class CreateProjectSuccessTests(OSComponentTestCase):
self.assertIsNone(self.project_creator.get_project())
- # TODO - Expand tests
+ self.assertTrue(validate_project(self.keystone, self.project_settings,
+ created_project))
class CreateProjectUserTests(OSComponentTestCase):
@@ -254,3 +274,19 @@ class CreateProjectUserTests(OSComponentTestCase):
self.assertEqual(self.project_creator.get_project().id,
sec_grp.project_id)
+
+
+def validate_project(keystone, project_settings, project):
+ """
+ Validates that the project_settings used to create the project have been
+ properly set
+ :param keystone: the keystone client for version checking
+ :param project_settings: the settings used to create the project
+ :param project: the SNAPS-OO Project domain object
+ :return: T/F
+ """
+ if keystone.version == keystone_utils.V2_VERSION_STR:
+ return project_settings.name == project.name
+ else:
+ return (project_settings.name == project.name and
+ project_settings.domain == project.domain_id)
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index 7cae62b..a0392ea 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -120,17 +120,20 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
- rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
- direction=Direction.ingress)
+ rule_setting = SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_1')
with self.assertRaises(SecurityGroupSettingsError):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
rule_settings = list()
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.egress))
+ sec_grp_name='bar', direction=Direction.egress,
+ description='test_rule_1'))
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.ingress))
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_2'))
settings = SecurityGroupSettings(
name='bar', description='fubar', project_name='foo',
rule_settings=rule_settings)
@@ -209,6 +212,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
def test_create_group_admin_user_to_new_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -233,6 +241,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_group_new_user_to_admin_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -257,6 +270,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -269,6 +287,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
created_sec_grp = self.sec_grp_creator.create()
self.assertIsNotNone(created_sec_grp)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
self.assertIsNone(neutron_utils.get_security_group(
self.neutron, self.sec_grp_creator.sec_grp_settings.name))
@@ -283,8 +306,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -302,6 +326,46 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ def test_create_group_with_one_complex_rule(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_group_with_several_rules(self):
"""
Tests the creation of an OpenStack Security Group with one simple
@@ -310,20 +374,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -341,6 +405,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_add_rule(self):
"""
Tests the creation of an OpenStack Security Group with one simple
@@ -349,8 +418,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -362,6 +432,15 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_name)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
@@ -370,7 +449,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
- direction=Direction.egress, protocol=Protocol.icmp))
+ direction=Direction.egress, protocol=Protocol.icmp,
+ description='test_rule_2'))
rules2 = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) + 1, len(rules2))
@@ -383,20 +463,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -414,6 +494,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(
rule_id=rules[0].id)
rules_after_del = neutron_utils.get_rules_by_security_group(
@@ -429,20 +514,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -454,17 +539,86 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_name)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
rules_after_del = neutron_utils.get_rules_by_security_group(
self.neutron,
self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) - 1, len(rules_after_del))
-# TODO - Add more tests with different rules. Rule creation parameters can be
-# somewhat complex
+
+def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
+ """
+ Returns True is the settings on a security group are properly contained
+ on the SNAPS SecurityGroup domain object
+ :param neutron: the neutron client
+ :param sec_grp_settings: the security group configuration
+ :param sec_grp: the SNAPS-OO security group object
+ :param rules: collection of SNAPS-OO security group rule objects
+ :return: T/F
+ """
+ return (sec_grp.description == sec_grp_settings.description and
+ sec_grp.name == sec_grp_settings.name and
+ validate_sec_grp_rules(
+ neutron, sec_grp_settings.rule_settings, rules))
+
+
+def validate_sec_grp_rules(neutron, rule_settings, rules):
+ """
+ Returns True is the settings on a security group rule are properly
+ contained on the SNAPS SecurityGroupRule domain object.
+ This function will only operate on rules that contain a description as
+ this is the only means to tell if the rule is custom or defaulted by
+ OpenStack
+ :param neutron: the neutron client
+ :param rule_settings: collection of SecurityGroupRuleSettings objects
+ :param rules: a collection of SecurityGroupRule domain objects
+ :return: T/F
+ """
+
+ for rule_setting in rule_settings:
+ if rule_setting.description:
+ match = False
+ for rule in rules:
+ if rule_setting.protocol == Protocol.null:
+ setting_proto = None
+ else:
+ setting_proto = rule_setting.protocol.name
+
+ sec_grp = neutron_utils.get_security_group(
+ neutron, rule_setting.sec_grp_name)
+
+ setting_eth_type = create_security_group.Ethertype.IPv4
+ if rule_setting.ethertype:
+ setting_eth_type = rule_setting.ethertype
+
+ if not sec_grp:
+ return False
+
+ if (rule.description == rule_setting.description and
+ rule.direction == rule_setting.direction.name and
+ rule.ethertype == setting_eth_type.name and
+ rule.port_range_max == rule_setting.port_range_max and
+ rule.port_range_min == rule_setting.port_range_min and
+ rule.protocol == setting_proto and
+ rule.remote_group_id == rule_setting.remote_group_id and
+ rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
+ rule.security_group_id == sec_grp.id):
+ match = True
+ break
+
+ if not match:
+ return False
+
+ return True
diff --git a/snaps/openstack/utils/keystone_utils.py b/snaps/openstack/utils/keystone_utils.py
index 8446df0..e070a49 100644
--- a/snaps/openstack/utils/keystone_utils.py
+++ b/snaps/openstack/utils/keystone_utils.py
@@ -127,8 +127,12 @@ def get_project(keystone=None, os_creds=None, project_name=None):
projects = keystone.projects.list(**{'name': project_name})
for project in projects:
+ domain_id = None
+ if keystone.version != V2_VERSION_STR:
+ domain_id = project.domain_id
if project.name == project_name:
- return Project(name=project.name, project_id=project.id)
+ return Project(name=project.name, project_id=project.id,
+ domain_id=domain_id)
return None
@@ -140,6 +144,8 @@ def create_project(keystone, project_settings):
:param project_settings: the project configuration
:return: SNAPS-OO Project domain object
"""
+ domain_id = None
+
if keystone.version == V2_VERSION_STR:
os_project = keystone.tenants.create(
project_settings.name, project_settings.description,
@@ -149,8 +155,10 @@ def create_project(keystone, project_settings):
project_settings.name, project_settings.domain,
description=project_settings.description,
enabled=project_settings.enabled)
+ domain_id = os_project.domain_id
- return Project(name=os_project.name, project_id=os_project.id)
+ return Project(
+ name=os_project.name, project_id=os_project.id, domain_id=domain_id)
def delete_project(keystone, project):
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index bf8cb08..2f8ef7e 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -331,11 +331,7 @@ def create_security_group(neutron, keystone, sec_grp_settings):
sec_grp_settings.name)
os_group = neutron.create_security_group(
sec_grp_settings.dict_for_neutron(keystone))
- return SecurityGroup(
- id=os_group['security_group']['id'],
- name=os_group['security_group']['name'],
- project_id=os_group['security_group'].get(
- 'project_id', os_group['security_group'].get('tenant_id')))
+ return SecurityGroup(**os_group['security_group'])
def delete_security_group(neutron, sec_grp):
@@ -360,9 +356,7 @@ def get_security_group(neutron, name):
groups = neutron.list_security_groups(**{'name': name})
for group in groups['security_groups']:
if group['name'] == name:
- return SecurityGroup(
- id=group['id'], name=group['name'],
- project_id=group.get('project_id', group.get('tenant_id')))
+ return SecurityGroup(**group)
return None
@@ -378,9 +372,7 @@ def get_security_group_by_id(neutron, sec_grp_id):
groups = neutron.list_security_groups(**{'id': sec_grp_id})
for group in groups['security_groups']:
if group['id'] == sec_grp_id:
- return SecurityGroup(
- id=group['id'], name=group['name'],
- project_id=group.get('project_id', group.get('tenant_id')))
+ return SecurityGroup(**group)
return None