aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--functest/tests/unit/utils/test_openstack_utils.py55
-rw-r--r--functest/utils/openstack_utils.py34
2 files changed, 89 insertions, 0 deletions
diff --git a/functest/tests/unit/utils/test_openstack_utils.py b/functest/tests/unit/utils/test_openstack_utils.py
index 673ad5e20..7f3995d03 100644
--- a/functest/tests/unit/utils/test_openstack_utils.py
+++ b/functest/tests/unit/utils/test_openstack_utils.py
@@ -229,6 +229,12 @@ class OSUtilsTesting(unittest.TestCase):
self.sec_group = {'id': 'sec_group_id',
'name': 'test_sec_group'}
+ self.sec_group_rule = {'id': 'sec_group_rule_id',
+ 'direction': 'direction',
+ 'protocol': 'protocol',
+ 'port_range_max': 'port_max',
+ 'security_group_id': self.sec_group['id'],
+ 'port_range_min': 'port_min'}
self.neutron_floatingip = {'id': 'fip_id',
'floating_ip_address': 'test_ip'}
self.neutron_client = mock.Mock()
@@ -260,6 +266,9 @@ class OSUtilsTesting(unittest.TestCase):
'show_bgpvpn.return_value': self.mock_return,
'list_security_groups.return_value': {'security_groups':
[self.sec_group]},
+ 'list_security_group_rules.'
+ 'return_value': {'security_group_rules':
+ [self.sec_group_rule]},
'create_security_group_rule.return_value': mock.Mock(),
'create_security_group.return_value': {'security_group':
self.sec_group},
@@ -1247,6 +1256,52 @@ class OSUtilsTesting(unittest.TestCase):
'test_sec_group'),
'sec_group_id')
+ def test_get_security_group_rules_default(self):
+ self.assertEqual(openstack_utils.
+ get_security_group_rules(self.neutron_client,
+ self.sec_group['id']),
+ [self.sec_group_rule])
+
+ @mock.patch('functest.utils.openstack_utils.logger.error')
+ def test_get_security_group_rules_exception(self, mock_logger_error):
+ self.assertEqual(openstack_utils.
+ get_security_group_rules(Exception,
+ 'sec_group_id'),
+ None)
+ self.assertTrue(mock_logger_error.called)
+
+ def test_check_security_group_rules_not_exists(self):
+ self.assertEqual(openstack_utils.
+ check_security_group_rules(self.neutron_client,
+ 'sec_group_id_2',
+ 'direction',
+ 'protocol',
+ 'port_min',
+ 'port_max'),
+ True)
+
+ def test_check_security_group_rules_exists(self):
+ self.assertEqual(openstack_utils.
+ check_security_group_rules(self.neutron_client,
+ self.sec_group['id'],
+ 'direction',
+ 'protocol',
+ 'port_min',
+ 'port_max'),
+ False)
+
+ @mock.patch('functest.utils.openstack_utils.logger.error')
+ def test_check_security_group_rules_exception(self, mock_logger_error):
+ self.assertEqual(openstack_utils.
+ check_security_group_rules(Exception,
+ 'sec_group_id',
+ 'direction',
+ 'protocol',
+ 'port_max',
+ 'port_min'),
+ None)
+ self.assertTrue(mock_logger_error.called)
+
def test_create_security_group_default(self):
self.assertEqual(openstack_utils.
create_security_group(self.neutron_client,
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py
index ffc870f62..4663f7ba6 100644
--- a/functest/utils/openstack_utils.py
+++ b/functest/utils/openstack_utils.py
@@ -1054,6 +1054,40 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
return False
+def get_security_group_rules(neutron_client, sg_id):
+ try:
+ security_rules = neutron_client.list_security_group_rules()[
+ 'security_group_rules']
+ security_rules = [rule for rule in security_rules
+ if rule["security_group_id"] == sg_id]
+ return security_rules
+ except Exception, e:
+ logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
+ " %s" % e)
+ return None
+
+
+def check_security_group_rules(neutron_client, sg_id, direction, protocol,
+ port_min=None, port_max=None):
+ try:
+ security_rules = get_security_group_rules(neutron_client, sg_id)
+ security_rules = [rule for rule in security_rules
+ if (rule["direction"].lower() == direction
+ and rule["protocol"].lower() == protocol
+ and rule["port_range_min"] == port_min
+ and rule["port_range_max"] == port_max)]
+ if len(security_rules) == 0:
+ return True
+ else:
+ return False
+ except Exception, e:
+ logger.error("Error [check_security_group_rules("
+ " neutron_client, sg_id, direction,"
+ " protocol, port_min=None, port_max=None)]: "
+ "%s" % e)
+ return None
+
+
def create_security_group_full(neutron_client,
sg_name, sg_description):
sg_id = get_security_group_id(neutron_client, sg_name)