summaryrefslogtreecommitdiffstats
path: root/snaps
diff options
context:
space:
mode:
Diffstat (limited to 'snaps')
-rw-r--r--snaps/openstack/create_project.py8
-rw-r--r--snaps/openstack/create_router.py24
-rw-r--r--snaps/openstack/create_security_group.py34
-rw-r--r--snaps/openstack/tests/create_project_tests.py7
-rw-r--r--snaps/openstack/tests/create_router_tests.py13
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py18
-rw-r--r--snaps/tests/file_utils_tests.py47
7 files changed, 100 insertions, 51 deletions
diff --git a/snaps/openstack/create_project.py b/snaps/openstack/create_project.py
index 0384ccc..a20033e 100644
--- a/snaps/openstack/create_project.py
+++ b/snaps/openstack/create_project.py
@@ -131,5 +131,11 @@ class ProjectSettings:
self.enabled = True
if not self.name:
- raise Exception(
+ raise ProjectSettingsError(
"The attribute name is required for ProjectSettings")
+
+
+class ProjectSettingsError(Exception):
+ """
+ Exception to be thrown when project settings attributes are incorrect
+ """
diff --git a/snaps/openstack/create_router.py b/snaps/openstack/create_router.py
index db6ffe3..e50009c 100644
--- a/snaps/openstack/create_router.py
+++ b/snaps/openstack/create_router.py
@@ -39,7 +39,7 @@ class OpenStackRouter:
self.__os_creds = os_creds
if not router_settings:
- raise Exception('router_settings is required')
+ raise RouterCreationError('router_settings is required')
self.router_settings = router_settings
self.__neutron = None
@@ -84,7 +84,7 @@ class OpenStackRouter:
self.__internal_router_interface = neutron_utils.add_interface_router(
self.__neutron, self.__router, subnet=internal_subnet)
else:
- raise Exception(
+ raise RouterCreationError(
'Subnet not found with name ' + internal_subnet_name)
for port_setting in self.router_settings.port_settings:
@@ -108,7 +108,7 @@ class OpenStackRouter:
self.__router,
port=port)
else:
- raise Exception(
+ raise RouterCreationError(
'Error creating port with name - ' + port_setting.name)
return self.__router
@@ -163,6 +163,12 @@ class OpenStackRouter:
return self.__internal_router_interface
+class RouterCreationError(Exception):
+ """
+ Exception to be thrown when an router instance cannot be created
+ """
+
+
class RouterSettings:
"""
Class representing a router configuration
@@ -209,7 +215,7 @@ class RouterSettings:
PortSettings(**interface['port']))
if not self.name:
- raise Exception('Name is required')
+ raise RouterSettingsError('Name is required')
def dict_for_neutron(self, neutron, os_creds):
"""
@@ -239,7 +245,7 @@ class RouterSettings:
if project_id:
out['project_id'] = project_id
else:
- raise Exception(
+ raise RouterSettingsError(
'Could not find project ID for project named - ' +
self.project_name)
if self.admin_state_up is not None:
@@ -251,7 +257,7 @@ class RouterSettings:
ext_gw['network_id'] = ext_net.id
out['external_gateway_info'] = ext_gw
else:
- raise Exception(
+ raise RouterSettingsError(
'Could not find the external network named - ' +
self.external_gateway)
@@ -259,3 +265,9 @@ class RouterSettings:
# TODO: Add external_fixed_ips Tests
return {'router': out}
+
+
+class RouterSettingsError(Exception):
+ """
+ Exception to be thrown when router settings attributes are incorrect
+ """
diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py
index 3dbf559..4291796 100644
--- a/snaps/openstack/create_security_group.py
+++ b/snaps/openstack/create_security_group.py
@@ -240,11 +240,11 @@ class SecurityGroupSettings:
**rule_setting))
if not self.name:
- raise Exception('The attribute name is required')
+ raise SecurityGroupSettingsError('The attribute name is required')
for rule_setting in self.rule_settings:
if rule_setting.sec_grp_name is not self.name:
- raise Exception(
+ raise SecurityGroupSettingsError(
'Rule settings must correspond with the name of this '
'security group')
@@ -272,7 +272,7 @@ class SecurityGroupSettings:
if project_id:
out['project_id'] = project_id
else:
- raise Exception(
+ raise SecurityGroupSettingsError(
'Could not find project ID for project named - ' +
self.project_name)
@@ -305,6 +305,13 @@ class Ethertype(enum.Enum):
IPv6 = 6
+class SecurityGroupSettingsError(Exception):
+ """
+ Exception to be thrown when security group settings attributes are
+ invalid
+ """
+
+
class SecurityGroupRuleSettings:
"""
Class representing a keypair configuration
@@ -368,7 +375,8 @@ class SecurityGroupRuleSettings:
self.remote_ip_prefix = kwargs.get('remote_ip_prefix')
if not self.direction or not self.sec_grp_name:
- raise Exception('direction and sec_grp_name are required')
+ raise SecurityGroupRuleSettingsError(
+ 'direction and sec_grp_name are required')
def dict_for_neutron(self, neutron):
"""
@@ -399,7 +407,7 @@ class SecurityGroupRuleSettings:
if sec_grp:
out['security_group_id'] = sec_grp.id
else:
- raise Exception(
+ raise SecurityGroupRuleSettingsError(
'Cannot locate security group with name - ' +
self.sec_grp_name)
if self.remote_group_id:
@@ -490,7 +498,8 @@ def map_direction(direction):
elif dir_str == 'ingress':
return Direction.ingress
else:
- raise Exception('Invalid Direction - ' + dir_str)
+ raise SecurityGroupRuleSettingsError(
+ 'Invalid Direction - ' + dir_str)
def map_protocol(protocol):
@@ -516,7 +525,8 @@ def map_protocol(protocol):
elif proto_str == 'null':
return Protocol.null
else:
- raise Exception('Invalid Protocol - ' + proto_str)
+ raise SecurityGroupRuleSettingsError(
+ 'Invalid Protocol - ' + proto_str)
def map_ethertype(ethertype):
@@ -538,4 +548,12 @@ def map_ethertype(ethertype):
elif eth_str == 'IPv4':
return Ethertype.IPv4
else:
- raise Exception('Invalid Ethertype - ' + eth_str)
+ raise SecurityGroupRuleSettingsError(
+ 'Invalid Ethertype - ' + eth_str)
+
+
+class SecurityGroupRuleSettingsError(Exception):
+ """
+ Exception to be thrown when security group rule settings attributes are
+ invalid
+ """
diff --git a/snaps/openstack/tests/create_project_tests.py b/snaps/openstack/tests/create_project_tests.py
index 3c6b2d1..f2af0d9 100644
--- a/snaps/openstack/tests/create_project_tests.py
+++ b/snaps/openstack/tests/create_project_tests.py
@@ -15,7 +15,8 @@
import unittest
import uuid
-from snaps.openstack.create_project import OpenStackProject, ProjectSettings
+from snaps.openstack.create_project import (
+ OpenStackProject, ProjectSettings, ProjectSettingsError)
from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.create_security_group import SecurityGroupSettings
from snaps.openstack.create_user import OpenStackUser
@@ -32,11 +33,11 @@ class ProjectSettingsUnitTests(unittest.TestCase):
"""
def test_no_params(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(ProjectSettingsError):
ProjectSettings()
def test_empty_config(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(ProjectSettingsError):
ProjectSettings(**dict())
def test_name_only(self):
diff --git a/snaps/openstack/tests/create_router_tests.py b/snaps/openstack/tests/create_router_tests.py
index 5f2534d..efa0993 100644
--- a/snaps/openstack/tests/create_router_tests.py
+++ b/snaps/openstack/tests/create_router_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,7 +20,8 @@ from snaps.openstack import create_router
from snaps.openstack.create_network import (
NetworkSettings, PortSettings)
from snaps.openstack.create_network import OpenStackNetwork
-from snaps.openstack.create_router import RouterSettings
+from snaps.openstack.create_router import (
+ RouterSettings, RouterSettingsError, RouterCreationError)
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils
@@ -38,11 +39,11 @@ class RouterSettingsUnitTests(unittest.TestCase):
"""
def test_no_params(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(RouterSettingsError):
RouterSettings()
def test_empty_config(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(RouterSettingsError):
RouterSettings(**dict())
def test_name_only(self):
@@ -338,7 +339,7 @@ class CreateRouterNegativeTests(OSIntegrationTestCase):
"""
Test creating a router without a name.
"""
- with self.assertRaises(Exception):
+ with self.assertRaises(RouterSettingsError):
router_settings = RouterSettings(
name=None, external_gateway=self.ext_net_name)
self.router_creator = create_router.OpenStackRouter(
@@ -349,7 +350,7 @@ class CreateRouterNegativeTests(OSIntegrationTestCase):
"""
Test creating a router without a valid network gateway name.
"""
- with self.assertRaises(Exception):
+ with self.assertRaises(RouterSettingsError):
router_settings = RouterSettings(name=self.guid + '-pub-router',
external_gateway="Invalid_name")
self.router_creator = create_router.OpenStackRouter(
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index 75c6387..dd28d7d 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -19,7 +19,9 @@ from snaps.openstack import create_security_group
from snaps.openstack.create_security_group import (SecurityGroupSettings,
SecurityGroupRuleSettings,
Direction, Ethertype,
- Protocol)
+ Protocol,
+ SecurityGroupRuleSettingsError,
+ SecurityGroupSettingsError)
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils
@@ -33,19 +35,19 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
"""
def test_no_params(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupRuleSettingsError):
SecurityGroupRuleSettings()
def test_empty_config(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupRuleSettingsError):
SecurityGroupRuleSettings(**dict())
def test_name_only(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupRuleSettingsError):
SecurityGroupRuleSettings(sec_grp_name='foo')
def test_config_with_name_only(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupRuleSettingsError):
SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
def test_name_and_direction(self):
@@ -105,11 +107,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
"""
def test_no_params(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupSettingsError):
SecurityGroupSettings()
def test_empty_config(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupSettingsError):
SecurityGroupSettings(**dict())
def test_name_only(self):
@@ -123,7 +125,7 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
def test_invalid_rule(self):
rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
direction=Direction.ingress)
- with self.assertRaises(Exception):
+ with self.assertRaises(SecurityGroupSettingsError):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
diff --git a/snaps/tests/file_utils_tests.py b/snaps/tests/file_utils_tests.py
index 62d96e8..f3a622a 100644
--- a/snaps/tests/file_utils_tests.py
+++ b/snaps/tests/file_utils_tests.py
@@ -19,6 +19,7 @@ import shutil
import uuid
from snaps import file_utils
+from snaps.openstack.tests import openstack_tests
__author__ = 'spisarski'
@@ -30,36 +31,39 @@ class FileUtilsTests(unittest.TestCase):
def setUp(self):
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
- self.tmpDir = 'tmp/' + str(guid)
- if not os.path.exists(self.tmpDir):
- os.makedirs(self.tmpDir)
+ self.tmp_dir = '.tmp/'
+ self.test_dir = self.tmp_dir + str(guid)
+ if not os.path.exists(self.test_dir):
+ os.makedirs(self.test_dir)
- self.tmpFile = self.tmpDir + '/bar.txt'
+ self.tmpFile = self.test_dir + '/bar.txt'
if not os.path.exists(self.tmpFile):
open(self.tmpFile, 'wb')
def tearDown(self):
- if os.path.exists(self.tmpDir) and os.path.isdir(self.tmpDir):
- shutil.rmtree(self.tmpDir)
+ if os.path.exists(self.test_dir) and os.path.isdir(self.test_dir):
+ shutil.rmtree(self.tmp_dir)
def testFileIsDirectory(self):
"""
- Ensure the file_utils.fileExists() method returns false with a directory
+ Ensure the file_utils.fileExists() method returns false with a
+ directory
"""
- result = file_utils.file_exists(self.tmpDir)
+ result = file_utils.file_exists(self.test_dir)
self.assertFalse(result)
- # TODO - Cleanup directory
def testFileNotExist(self):
"""
- Ensure the file_utils.fileExists() method returns false with a bogus file
+ Ensure the file_utils.fileExists() method returns false with a bogus
+ file
"""
result = file_utils.file_exists('/foo/bar.txt')
self.assertFalse(result)
def testFileExists(self):
"""
- Ensure the file_utils.fileExists() method returns false with a directory
+ Ensure the file_utils.fileExists() method returns false with a
+ directory
"""
if not os.path.exists(self.tmpFile):
os.makedirs(self.tmpFile)
@@ -72,31 +76,36 @@ class FileUtilsTests(unittest.TestCase):
Tests the file_utils.download() method when given a bad URL
"""
with self.assertRaises(Exception):
- file_utils.download('http://bunkUrl.com/foo/bar.iso', self.tmpDir)
+ file_utils.download('http://bunkUrl.com/foo/bar.iso',
+ self.test_dir)
def testDownloadBadDir(self):
"""
Tests the file_utils.download() method when given a bad URL
"""
with self.assertRaises(Exception):
- file_utils.download('http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img', '/foo/bar')
+ file_utils.download(openstack_tests.CIRROS_DEFAULT_IMAGE_URL,
+ '/foo/bar')
def testCirrosImageDownload(self):
"""
- Tests the file_utils.download() method when given a good Cirros QCOW2 URL
+ Tests the file_utils.download() method when given a good Cirros QCOW2
+ URL
"""
- image_file = file_utils.download('http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img',
- self.tmpDir)
+ image_file = file_utils.download(
+ openstack_tests.CIRROS_DEFAULT_IMAGE_URL, self.test_dir)
self.assertIsNotNone(image_file)
- self.assertTrue(image_file.name.endswith("cirros-0.3.4-x86_64-disk.img"))
- self.assertTrue(image_file.name.startswith(self.tmpDir))
+ self.assertTrue(
+ image_file.name.endswith("cirros-0.3.4-x86_64-disk.img"))
+ self.assertTrue(image_file.name.startswith(self.test_dir))
def testReadOSEnvFile(self):
"""
Tests that the OS Environment file is correctly parsed
:return:
"""
- rc_file_path = pkg_resources.resource_filename('snaps.openstack.tests.conf', 'overcloudrc_test')
+ rc_file_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.conf', 'overcloudrc_test')
os_env_dict = file_utils.read_os_env_file(rc_file_path)
self.assertEqual('test_pw', os_env_dict['OS_PASSWORD'])
self.assertEqual('http://foo:5000/v2.0/', os_env_dict['OS_AUTH_URL'])