From 41b103d9a6804a97ca85e2b09b628cea06219faf Mon Sep 17 00:00:00 2001 From: tomsou Date: Mon, 27 Mar 2017 15:56:02 +0000 Subject: Check a rule existence for a specific security group Implement a functionality to check if a rule concerning a specific security group exists - function get_security_group_rules(neutron_client, sg_id) returns the list of the security rules for a specific security group - function check_security_group_rules(neutron_client, sg_id, direction, protocol, port_min=None, port_max=None) checks if a specific rule for a specific security group exists and returns True or False - implement unit tests for the two new functions This new functionality is needed for sdnvpn project Change-Id: Ib930bc9a76141932f4164d88e2640b49f3df4d77 Signed-off-by: tomsou --- functest/tests/unit/utils/test_openstack_utils.py | 55 +++++++++++++++++++++++ functest/utils/openstack_utils.py | 34 ++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/functest/tests/unit/utils/test_openstack_utils.py b/functest/tests/unit/utils/test_openstack_utils.py index 673ad5e2..7f3995d0 100644 --- a/functest/tests/unit/utils/test_openstack_utils.py +++ b/functest/tests/unit/utils/test_openstack_utils.py @@ -229,6 +229,12 @@ class OSUtilsTesting(unittest.TestCase): self.sec_group = {'id': 'sec_group_id', 'name': 'test_sec_group'} + self.sec_group_rule = {'id': 'sec_group_rule_id', + 'direction': 'direction', + 'protocol': 'protocol', + 'port_range_max': 'port_max', + 'security_group_id': self.sec_group['id'], + 'port_range_min': 'port_min'} self.neutron_floatingip = {'id': 'fip_id', 'floating_ip_address': 'test_ip'} self.neutron_client = mock.Mock() @@ -260,6 +266,9 @@ class OSUtilsTesting(unittest.TestCase): 'show_bgpvpn.return_value': self.mock_return, 'list_security_groups.return_value': {'security_groups': [self.sec_group]}, + 'list_security_group_rules.' + 'return_value': {'security_group_rules': + [self.sec_group_rule]}, 'create_security_group_rule.return_value': mock.Mock(), 'create_security_group.return_value': {'security_group': self.sec_group}, @@ -1247,6 +1256,52 @@ class OSUtilsTesting(unittest.TestCase): 'test_sec_group'), 'sec_group_id') + def test_get_security_group_rules_default(self): + self.assertEqual(openstack_utils. + get_security_group_rules(self.neutron_client, + self.sec_group['id']), + [self.sec_group_rule]) + + @mock.patch('functest.utils.openstack_utils.logger.error') + def test_get_security_group_rules_exception(self, mock_logger_error): + self.assertEqual(openstack_utils. + get_security_group_rules(Exception, + 'sec_group_id'), + None) + self.assertTrue(mock_logger_error.called) + + def test_check_security_group_rules_not_exists(self): + self.assertEqual(openstack_utils. + check_security_group_rules(self.neutron_client, + 'sec_group_id_2', + 'direction', + 'protocol', + 'port_min', + 'port_max'), + True) + + def test_check_security_group_rules_exists(self): + self.assertEqual(openstack_utils. + check_security_group_rules(self.neutron_client, + self.sec_group['id'], + 'direction', + 'protocol', + 'port_min', + 'port_max'), + False) + + @mock.patch('functest.utils.openstack_utils.logger.error') + def test_check_security_group_rules_exception(self, mock_logger_error): + self.assertEqual(openstack_utils. + check_security_group_rules(Exception, + 'sec_group_id', + 'direction', + 'protocol', + 'port_max', + 'port_min'), + None) + self.assertTrue(mock_logger_error.called) + def test_create_security_group_default(self): self.assertEqual(openstack_utils. create_security_group(self.neutron_client, diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py index ffc870f6..4663f7ba 100644 --- a/functest/utils/openstack_utils.py +++ b/functest/utils/openstack_utils.py @@ -1054,6 +1054,40 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol, return False +def get_security_group_rules(neutron_client, sg_id): + try: + security_rules = neutron_client.list_security_group_rules()[ + 'security_group_rules'] + security_rules = [rule for rule in security_rules + if rule["security_group_id"] == sg_id] + return security_rules + except Exception, e: + logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:" + " %s" % e) + return None + + +def check_security_group_rules(neutron_client, sg_id, direction, protocol, + port_min=None, port_max=None): + try: + security_rules = get_security_group_rules(neutron_client, sg_id) + security_rules = [rule for rule in security_rules + if (rule["direction"].lower() == direction + and rule["protocol"].lower() == protocol + and rule["port_range_min"] == port_min + and rule["port_range_max"] == port_max)] + if len(security_rules) == 0: + return True + else: + return False + except Exception, e: + logger.error("Error [check_security_group_rules(" + " neutron_client, sg_id, direction," + " protocol, port_min=None, port_max=None)]: " + "%s" % e) + return None + + def create_security_group_full(neutron_client, sg_name, sg_description): sg_id = get_security_group_id(neutron_client, sg_name) -- cgit 1.2.3-korg