summaryrefslogtreecommitdiffstats
path: root/sdnvpn/lib/openstack_utils.py
diff options
context:
space:
mode:
authorStamatis Katsaounis <mokats@intracom-telecom.com>2018-09-06 14:44:48 +0300
committerStamatis Katsaounis <mokats@intracom-telecom.com>2018-10-09 16:40:10 +0300
commitebf6885a907379083b360ae729f69151c1dd8115 (patch)
tree715f7716028fff85e2e3e44045ad55dff916c1d8 /sdnvpn/lib/openstack_utils.py
parentc05105a4f9f51f7bb31cad791e65d664e5a3bc4b (diff)
Replace neutron client calls with openstack sdk
JIRA: SDNVPN-220 This patch replaces almost all calls of neutron client with openstack sdk. The calls regarding bgpvpn are not replaced due to lack of support from openstack sdk at the moment. Change-Id: Idf52b8c57b895f87ba6320e350bf170faa24d0a7 Signed-off-by: Stamatis Katsaounis <mokats@intracom-telecom.com>
Diffstat (limited to 'sdnvpn/lib/openstack_utils.py')
-rw-r--r--sdnvpn/lib/openstack_utils.py387
1 files changed, 171 insertions, 216 deletions
diff --git a/sdnvpn/lib/openstack_utils.py b/sdnvpn/lib/openstack_utils.py
index bf78ef1..9126eeb 100644
--- a/sdnvpn/lib/openstack_utils.py
+++ b/sdnvpn/lib/openstack_utils.py
@@ -372,12 +372,12 @@ def get_or_create_flavor(flavor_name, ram, disk, vcpus, public=True):
return flavor_exists, flavor_id
-def get_floating_ips(neutron_client):
+def get_floating_ips(conn):
try:
- floating_ips = neutron_client.list_floatingips()
- return floating_ips['floatingips']
+ floating_ips = conn.network.ips()
+ return floating_ips
except Exception as e:
- logger.error("Error [get_floating_ips(neutron_client)]: %s" % e)
+ logger.error("Error [get_floating_ips(network)]: %s" % e)
return None
@@ -505,27 +505,25 @@ def create_instance_and_wait_for_active(flavor_name,
return None
-def create_floating_ip(neutron_client):
- extnet_id = get_external_net_id(neutron_client)
- props = {'floating_network_id': extnet_id}
+def create_floating_ip(conn):
+ extnet_id = get_external_net_id(conn)
try:
- ip_json = neutron_client.create_floatingip({'floatingip': props})
- fip_addr = ip_json['floatingip']['floating_ip_address']
- fip_id = ip_json['floatingip']['id']
+ fip = conn.network.create_ip(floating_network_id=extnet_id)
+ fip_addr = fip.floating_ip_address
+ fip_id = fip.id
except Exception as e:
- logger.error("Error [create_floating_ip(neutron_client)]: %s" % e)
+ logger.error("Error [create_floating_ip(network)]: %s" % e)
return None
return {'fip_addr': fip_addr, 'fip_id': fip_id}
-def attach_floating_ip(neutron_client, port_id):
- extnet_id = get_external_net_id(neutron_client)
- props = {'floating_network_id': extnet_id,
- 'port_id': port_id}
+def attach_floating_ip(conn, port_id):
+ extnet_id = get_external_net_id(conn)
try:
- return neutron_client.create_floatingip({'floatingip': props})
+ return conn.network.create_ip(floating_network_id=extnet_id,
+ port_id=port_id)
except Exception as e:
- logger.error("Error [Attach_floating_ip(neutron_client), %s]: %s"
+ logger.error("Error [Attach_floating_ip(network), %s]: %s"
% (port_id, e))
return None
@@ -550,12 +548,12 @@ def delete_instance(conn, instance_id):
return False
-def delete_floating_ip(neutron_client, floatingip_id):
+def delete_floating_ip(conn, floatingip_id):
try:
- neutron_client.delete_floatingip(floatingip_id)
+ conn.network.delete_ip(floatingip_id)
return True
except Exception as e:
- logger.error("Error [delete_floating_ip(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_floating_ip(network, '%s')]: %s"
% (floatingip_id, e))
return False
@@ -593,266 +591,237 @@ def delete_aggregate(cloud, aggregate_name):
# *********************************************
# NEUTRON
# *********************************************
-def get_network_list(neutron_client):
- network_list = neutron_client.list_networks()['networks']
- if len(network_list) == 0:
- return None
- else:
- return network_list
+def get_network_list(conn):
+ return conn.network.networks()
-def get_router_list(neutron_client):
- router_list = neutron_client.list_routers()['routers']
- if len(router_list) == 0:
- return None
- else:
- return router_list
+def get_router_list(conn):
+ return conn.network.routers()
-def get_port_list(neutron_client):
- port_list = neutron_client.list_ports()['ports']
- if len(port_list) == 0:
- return None
- else:
- return port_list
+def get_port_list(conn):
+ return conn.network.ports()
-def get_network_id(neutron_client, network_name):
- networks = neutron_client.list_networks()['networks']
+def get_network_id(conn, network_name):
+ networks = conn.network.networks()
id = ''
for n in networks:
- if n['name'] == network_name:
- id = n['id']
+ if n.name == network_name:
+ id = n.id
break
return id
-def get_subnet_id(neutron_client, subnet_name):
- subnets = neutron_client.list_subnets()['subnets']
+def get_subnet_id(conn, subnet_name):
+ subnets = conn.network.subnets()
id = ''
for s in subnets:
- if s['name'] == subnet_name:
- id = s['id']
+ if s.name == subnet_name:
+ id = s.id
break
return id
-def get_router_id(neutron_client, router_name):
- routers = neutron_client.list_routers()['routers']
+def get_router_id(conn, router_name):
+ routers = conn.network.routers()
id = ''
for r in routers:
- if r['name'] == router_name:
- id = r['id']
+ if r.name == router_name:
+ id = r.id
break
return id
-def get_private_net(neutron_client):
+def get_private_net(conn):
# Checks if there is an existing shared private network
- networks = neutron_client.list_networks()['networks']
- if len(networks) == 0:
- return None
+ networks = conn.network.networks()
for net in networks:
- if (net['router:external'] is False) and (net['shared'] is True):
+ if (net.is_router_external is False) and (net.is_shared is True):
return net
return None
-def get_external_net(neutron_client):
+def get_external_net(conn):
if (env.get('EXTERNAL_NETWORK')):
return env.get('EXTERNAL_NETWORK')
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['name']
+ for network in conn.network.networks():
+ if network.is_router_external:
+ return network.name
return None
-def get_external_net_id(neutron_client):
+def get_external_net_id(conn):
if (env.get('EXTERNAL_NETWORK')):
- networks = neutron_client.list_networks(
- name=env.get('EXTERNAL_NETWORK'))
- net_id = networks['networks'][0]['id']
+ networks = conn.network.networks(name=env.get('EXTERNAL_NETWORK'))
+ net_id = networks.next().id
return net_id
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['id']
+ for network in conn.network.networks():
+ if network.is_router_external:
+ return network.id
return None
-def check_neutron_net(neutron_client, net_name):
- for network in neutron_client.list_networks()['networks']:
- if network['name'] == net_name:
- for subnet in network['subnets']:
+def check_neutron_net(conn, net_name):
+ for network in conn.network.networks():
+ if network.name == net_name:
+ for subnet in network.subnet_ids:
return True
return False
-def create_neutron_net(neutron_client, name):
- json_body = {'network': {'name': name,
- 'admin_state_up': True}}
+def create_neutron_net(conn, name):
try:
- network = neutron_client.create_network(body=json_body)
- network_dict = network['network']
- return network_dict['id']
+ network = conn.network.create_network(name=name)
+ return network.id
except Exception as e:
- logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
+ logger.error("Error [create_neutron_net(network, '%s')]: %s"
% (name, e))
return None
-def create_neutron_subnet(neutron_client, name, cidr, net_id,
+def create_neutron_subnet(conn, name, cidr, net_id,
dns=['8.8.8.8', '8.8.4.4']):
- json_body = {'subnets': [{'name': name, 'cidr': cidr,
- 'ip_version': 4, 'network_id': net_id,
- 'dns_nameservers': dns}]}
-
try:
- subnet = neutron_client.create_subnet(body=json_body)
- return subnet['subnets'][0]['id']
+ subnet = conn.network.create_subnet(name=name,
+ cidr=cidr,
+ ip_version='4',
+ network_id=net_id,
+ dns_nameservers=dns)
+ return subnet.id
except Exception as e:
- logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
+ logger.error("Error [create_neutron_subnet(network, '%s', "
"'%s', '%s')]: %s" % (name, cidr, net_id, e))
return None
-def create_neutron_router(neutron_client, name):
- json_body = {'router': {'name': name, 'admin_state_up': True}}
+def create_neutron_router(conn, name):
try:
- router = neutron_client.create_router(json_body)
- return router['router']['id']
+ router = conn.network.create_router(name=name)
+ return router.id
except Exception as e:
- logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
+ logger.error("Error [create_neutron_router(network, '%s')]: %s"
% (name, e))
return None
-def create_neutron_port(neutron_client, name, network_id, ip):
- json_body = {'port': {
- 'admin_state_up': True,
- 'name': name,
- 'network_id': network_id,
- 'fixed_ips': [{"ip_address": ip}]
- }}
+def create_neutron_port(conn, name, network_id, ip):
try:
- port = neutron_client.create_port(body=json_body)
- return port['port']['id']
+ port = conn.network.create_port(name=name,
+ network_id=network_id,
+ fixed_ips=[{'ip_address': ip}])
+ return port.id
except Exception as e:
- logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
+ logger.error("Error [create_neutron_port(network, '%s', '%s', "
"'%s')]: %s" % (name, network_id, ip, e))
return None
-def update_neutron_net(neutron_client, network_id, shared=False):
- json_body = {'network': {'shared': shared}}
+def update_neutron_net(conn, network_id, shared=False):
try:
- neutron_client.update_network(network_id, body=json_body)
+ conn.network.update_network(network_id, is_shared=shared)
return True
except Exception as e:
- logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: "
+ logger.error("Error [update_neutron_net(network, '%s', '%s')]: "
"%s" % (network_id, str(shared), e))
return False
-def update_neutron_port(neutron_client, port_id, device_owner):
- json_body = {'port': {
- 'device_owner': device_owner,
- }}
+def update_neutron_port(conn, port_id, device_owner):
try:
- port = neutron_client.update_port(port=port_id,
- body=json_body)
- return port['port']['id']
+ port = conn.network.update_port(port_id, device_owner=device_owner)
+ return port.id
except Exception as e:
- logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
+ logger.error("Error [update_neutron_port(network, '%s', '%s')]:"
" %s" % (port_id, device_owner, e))
return None
-def add_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
+def add_interface_router(conn, router_id, subnet_id):
try:
- neutron_client.add_interface_router(router=router_id, body=json_body)
+ conn.network.add_interface_to_router(router_id, subnet_id=subnet_id)
return True
except Exception as e:
- logger.error("Error [add_interface_router(neutron_client, '%s', "
+ logger.error("Error [add_interface_router(network, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
-def add_gateway_router(neutron_client, router_id):
- ext_net_id = get_external_net_id(neutron_client)
+def add_gateway_router(conn, router_id):
+ ext_net_id = get_external_net_id(conn)
router_dict = {'network_id': ext_net_id}
try:
- neutron_client.add_gateway_router(router_id, router_dict)
+ conn.network.update_router(router_id,
+ external_gateway_info=router_dict)
return True
except Exception as e:
- logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s"
+ logger.error("Error [add_gateway_router(network, '%s')]: %s"
% (router_id, e))
return False
-def delete_neutron_net(neutron_client, network_id):
+def delete_neutron_net(conn, network_id):
try:
- neutron_client.delete_network(network_id)
+ conn.network.delete_network(network_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_net(network, '%s')]: %s"
% (network_id, e))
return False
-def delete_neutron_subnet(neutron_client, subnet_id):
+def delete_neutron_subnet(conn, subnet_id):
try:
- neutron_client.delete_subnet(subnet_id)
+ conn.network.delete_subnet(subnet_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_subnet(network, '%s')]: %s"
% (subnet_id, e))
return False
-def delete_neutron_router(neutron_client, router_id):
+def delete_neutron_router(conn, router_id):
try:
- neutron_client.delete_router(router=router_id)
+ conn.network.delete_router(router_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_router(network, '%s')]: %s"
% (router_id, e))
return False
-def delete_neutron_port(neutron_client, port_id):
+def delete_neutron_port(conn, port_id):
try:
- neutron_client.delete_port(port_id)
+ conn.network.delete_port(port_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_port(network, '%s')]: %s"
% (port_id, e))
return False
-def remove_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
+def remove_interface_router(conn, router_id, subnet_id):
try:
- neutron_client.remove_interface_router(router=router_id,
- body=json_body)
+ conn.network.remove_interface_from_router(router_id,
+ subnet_id=subnet_id)
return True
except Exception as e:
- logger.error("Error [remove_interface_router(neutron_client, '%s', "
+ logger.error("Error [remove_interface_router(network, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
-def remove_gateway_router(neutron_client, router_id):
+def remove_gateway_router(conn, router_id):
try:
- neutron_client.remove_gateway_router(router_id)
+ conn.network.update_router(router_id, external_gateway_info=None)
return True
except Exception as e:
- logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s"
+ logger.error("Error [remove_gateway_router(network, '%s')]: %s"
% (router_id, e))
return False
-def create_network_full(neutron_client,
+def create_network_full(conn,
net_name,
subnet_name,
router_name,
@@ -860,45 +829,43 @@ def create_network_full(neutron_client,
dns=['8.8.8.8', '8.8.4.4']):
# Check if the network already exists
- network_id = get_network_id(neutron_client, net_name)
- subnet_id = get_subnet_id(neutron_client, subnet_name)
- router_id = get_router_id(neutron_client, router_name)
+ network_id = get_network_id(conn, net_name)
+ subnet_id = get_subnet_id(conn, subnet_name)
+ router_id = get_router_id(conn, router_name)
if network_id != '' and subnet_id != '' and router_id != '':
logger.info("A network with name '%s' already exists..." % net_name)
else:
- neutron_client.format = 'json'
-
logger.info('Creating neutron network %s...' % net_name)
if network_id == '':
- network_id = create_neutron_net(neutron_client, net_name)
+ network_id = create_neutron_net(conn, net_name)
if not network_id:
return False
logger.debug("Network '%s' created successfully" % network_id)
logger.debug('Creating Subnet....')
if subnet_id == '':
- subnet_id = create_neutron_subnet(neutron_client, subnet_name,
- cidr, network_id, dns)
+ subnet_id = create_neutron_subnet(conn, subnet_name, cidr,
+ network_id, dns)
if not subnet_id:
return None
logger.debug("Subnet '%s' created successfully" % subnet_id)
logger.debug('Creating Router...')
if router_id == '':
- router_id = create_neutron_router(neutron_client, router_name)
+ router_id = create_neutron_router(conn, router_name)
if not router_id:
return None
logger.debug("Router '%s' created successfully" % router_id)
logger.debug('Adding router to subnet...')
- if not add_interface_router(neutron_client, router_id, subnet_id):
+ if not add_interface_router(conn, router_id, subnet_id):
return None
logger.debug("Interface added successfully.")
logger.debug('Adding gateway to router...')
- if not add_gateway_router(neutron_client, router_id):
+ if not add_gateway_router(conn, router_id):
return None
logger.debug("Gateway added successfully.")
@@ -909,15 +876,15 @@ def create_network_full(neutron_client,
def create_shared_network_full(net_name, subnt_name, router_name, subnet_cidr):
- neutron_client = get_neutron_client()
+ conn = get_os_connection()
- network_dic = create_network_full(neutron_client,
+ network_dic = create_network_full(conn,
net_name,
subnt_name,
router_name,
subnet_cidr)
if network_dic:
- if not update_neutron_net(neutron_client,
+ if not update_neutron_net(conn,
network_dic['net_id'],
shared=True):
logger.error("Failed to update network %s..." % net_name)
@@ -935,56 +902,49 @@ def create_shared_network_full(net_name, subnt_name, router_name, subnet_cidr):
# *********************************************
-def get_security_groups(neutron_client):
- try:
- security_groups = neutron_client.list_security_groups()[
- 'security_groups']
- return security_groups
- except Exception as e:
- logger.error("Error [get_security_groups(neutron_client)]: %s" % e)
- return None
+def get_security_groups(conn):
+ return conn.network.security_groups()
-def get_security_group_id(neutron_client, sg_name):
- security_groups = get_security_groups(neutron_client)
+def get_security_group_id(conn, sg_name):
+ security_groups = get_security_groups(conn)
id = ''
for sg in security_groups:
- if sg['name'] == sg_name:
- id = sg['id']
+ if sg.name == sg_name:
+ id = sg.id
break
return id
-def create_security_group(neutron_client, sg_name, sg_description):
- json_body = {'security_group': {'name': sg_name,
- 'description': sg_description}}
+def create_security_group(conn, sg_name, sg_description):
try:
- secgroup = neutron_client.create_security_group(json_body)
- return secgroup['security_group']
+ secgroup = conn.network.\
+ create_security_group(name=sg_name, description=sg_description)
+ return secgroup
except Exception as e:
- logger.error("Error [create_security_group(neutron_client, '%s', "
+ logger.error("Error [create_security_group(network, '%s', "
"'%s')]: %s" % (sg_name, sg_description, e))
return None
-def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
+def create_secgroup_rule(conn, sg_id, direction, protocol,
port_range_min=None, port_range_max=None):
# We create a security group in 2 steps
- # 1 - we check the format and set the json body accordingly
- # 2 - we call neturon client to create the security group
+ # 1 - we check the format and set the secgroup rule attributes accordingly
+ # 2 - we call openstacksdk to create the security group
# Format check
- json_body = {'security_group_rule': {'direction': direction,
- 'security_group_id': sg_id,
- 'protocol': protocol}}
+ secgroup_rule_attrs = {'direction': direction,
+ 'security_group_id': sg_id,
+ 'protocol': protocol}
# parameters may be
# - both None => we do nothing
- # - both Not None => we add them to the json description
+ # - both Not None => we add them to the secgroup rule attributes
# but one cannot be None is the other is not None
if (port_range_min is not None and port_range_max is not None):
- # add port_range in json description
- json_body['security_group_rule']['port_range_min'] = port_range_min
- json_body['security_group_rule']['port_range_max'] = port_range_max
+ # add port_range in secgroup rule attributes
+ secgroup_rule_attrs['port_range_min'] = port_range_min
+ secgroup_rule_attrs['port_range_max'] = port_range_max
logger.debug("Security_group format set (port range included)")
else:
# either both port range are set to None => do nothing
@@ -1001,7 +961,7 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
# Create security group using neutron client
try:
- neutron_client.create_security_group_rule(json_body)
+ conn.network.create_security_group_rule(**secgroup_rule_attrs)
return True
except:
logger.exception("Impossible to create_security_group_rule,"
@@ -1009,62 +969,61 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
return False
-def get_security_group_rules(neutron_client, sg_id):
+def get_security_group_rules(conn, sg_id):
try:
- security_rules = neutron_client.list_security_group_rules()[
- 'security_group_rules']
+ security_rules = conn.network.security_group_rules()
security_rules = [rule for rule in security_rules
- if rule["security_group_id"] == sg_id]
+ if rule.security_group_id == sg_id]
return security_rules
except Exception as e:
- logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
+ logger.error("Error [get_security_group_rules(network, sg_id)]:"
" %s" % e)
return None
-def check_security_group_rules(neutron_client, sg_id, direction, protocol,
+def check_security_group_rules(conn, sg_id, direction, protocol,
port_min=None, port_max=None):
try:
- security_rules = get_security_group_rules(neutron_client, sg_id)
+ security_rules = get_security_group_rules(conn, sg_id)
security_rules = [rule for rule in security_rules
- if (rule["direction"].lower() == direction and
- rule["protocol"].lower() == protocol and
- rule["port_range_min"] == port_min and
- rule["port_range_max"] == port_max)]
+ if (rule.direction.lower() == direction and
+ rule.protocol.lower() == protocol and
+ rule.port_range_min == port_min and
+ rule.port_range_max == port_max)]
if len(security_rules) == 0:
return True
else:
return False
except Exception as e:
logger.error("Error [check_security_group_rules("
- " neutron_client, sg_id, direction,"
+ " network, sg_id, direction,"
" protocol, port_min=None, port_max=None)]: "
"%s" % e)
return None
-def create_security_group_full(neutron_client,
+def create_security_group_full(conn,
sg_name, sg_description):
- sg_id = get_security_group_id(neutron_client, sg_name)
+ sg_id = get_security_group_id(conn, sg_name)
if sg_id != '':
logger.info("Using existing security group '%s'..." % sg_name)
else:
logger.info("Creating security group '%s'..." % sg_name)
- SECGROUP = create_security_group(neutron_client,
+ SECGROUP = create_security_group(conn,
sg_name,
sg_description)
if not SECGROUP:
logger.error("Failed to create the security group...")
return None
- sg_id = SECGROUP['id']
+ sg_id = SECGROUP.id
logger.debug("Security group '%s' with ID=%s created successfully."
- % (SECGROUP['name'], sg_id))
+ % (SECGROUP.name, sg_id))
logger.debug("Adding ICMP rules in security group '%s'..."
% sg_name)
- if not create_secgroup_rule(neutron_client, sg_id,
+ if not create_secgroup_rule(conn, sg_id,
'ingress', 'icmp'):
logger.error("Failed to create the security group rule...")
return None
@@ -1072,12 +1031,12 @@ def create_security_group_full(neutron_client,
logger.debug("Adding SSH rules in security group '%s'..."
% sg_name)
if not create_secgroup_rule(
- neutron_client, sg_id, 'ingress', 'tcp', '22', '22'):
+ conn, sg_id, 'ingress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return None
if not create_secgroup_rule(
- neutron_client, sg_id, 'egress', 'tcp', '22', '22'):
+ conn, sg_id, 'egress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return None
return sg_id
@@ -1093,28 +1052,24 @@ def add_secgroup_to_instance(conn, instance_id, secgroup_id):
return False
-def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
- json_body = {"quota": {
- "security_group": sg_quota,
- "security_group_rule": sg_rule_quota
- }}
-
+def update_sg_quota(conn, tenant_id, sg_quota, sg_rule_quota):
try:
- neutron_client.update_quota(tenant_id=tenant_id,
- body=json_body)
+ conn.network.update_quota(tenant_id,
+ security_group_rules=sg_rule_quota,
+ security_groups=sg_quota)
return True
except Exception as e:
- logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', "
+ logger.error("Error [update_sg_quota(network, '%s', '%s', "
"'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e))
return False
-def delete_security_group(neutron_client, secgroup_id):
+def delete_security_group(conn, secgroup_id):
try:
- neutron_client.delete_security_group(secgroup_id)
+ conn.network.delete_security_group(secgroup_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_security_group(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_security_group(network, '%s')]: %s"
% (secgroup_id, e))
return False