diff options
author | tomsou <soth@intracom-telecom.com> | 2017-03-27 15:56:02 +0000 |
---|---|---|
committer | tomsou <soth@intracom-telecom.com> | 2017-03-30 13:28:30 +0000 |
commit | 41b103d9a6804a97ca85e2b09b628cea06219faf (patch) | |
tree | e261310774013312ae5d4d4207a8392a1ad35219 /functest/utils/openstack_utils.py | |
parent | 9439d684449825f8d580c26b1b9475ccbdde698a (diff) |
Check a rule existence for a specific security group
Implement a functionality to check if a rule concerning
a specific security group exists
- function get_security_group_rules(neutron_client, sg_id)
returns the list of the security rules for a specific security group
- function check_security_group_rules(neutron_client,
sg_id, direction,
protocol,
port_min=None,
port_max=None)
checks if a specific rule for a specific security group exists
and returns True or False
- implement unit tests for the two new functions
This new functionality is needed for sdnvpn project
Change-Id: Ib930bc9a76141932f4164d88e2640b49f3df4d77
Signed-off-by: tomsou <soth@intracom-telecom.com>
Diffstat (limited to 'functest/utils/openstack_utils.py')
-rw-r--r-- | functest/utils/openstack_utils.py | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py index ffc870f6..4663f7ba 100644 --- a/functest/utils/openstack_utils.py +++ b/functest/utils/openstack_utils.py @@ -1054,6 +1054,40 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol, return False +def get_security_group_rules(neutron_client, sg_id): + try: + security_rules = neutron_client.list_security_group_rules()[ + 'security_group_rules'] + security_rules = [rule for rule in security_rules + if rule["security_group_id"] == sg_id] + return security_rules + except Exception, e: + logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:" + " %s" % e) + return None + + +def check_security_group_rules(neutron_client, sg_id, direction, protocol, + port_min=None, port_max=None): + try: + security_rules = get_security_group_rules(neutron_client, sg_id) + security_rules = [rule for rule in security_rules + if (rule["direction"].lower() == direction + and rule["protocol"].lower() == protocol + and rule["port_range_min"] == port_min + and rule["port_range_max"] == port_max)] + if len(security_rules) == 0: + return True + else: + return False + except Exception, e: + logger.error("Error [check_security_group_rules(" + " neutron_client, sg_id, direction," + " protocol, port_min=None, port_max=None)]: " + "%s" % e) + return None + + def create_security_group_full(neutron_client, sg_name, sg_description): sg_id = get_security_group_id(neutron_client, sg_name) |