summaryrefslogtreecommitdiffstats
path: root/testcases/vPing/CI/libraries
diff options
context:
space:
mode:
authorjose.lausuch <jose.lausuch@ericsson.com>2016-04-20 16:03:44 +0200
committerjose.lausuch <jose.lausuch@ericsson.com>2016-04-21 00:01:59 +0200
commitef62824e0471d07a4a3a40c401fc433070d961c6 (patch)
tree6d1bd61e7d49446ba1b1a52bb9978e1c5b88173c /testcases/vPing/CI/libraries
parent3a85a34474a9d7a9384f22bce35e7b81177830e3 (diff)
Fix Flake8 Violations in the Functest scripts
JIRA: FUNCTEST-213 Change-Id: I66c02dd6ff12ffb9798ebe44a4cfe7bfc73e76c3 Signed-off-by: jose.lausuch <jose.lausuch@ericsson.com>
Diffstat (limited to 'testcases/vPing/CI/libraries')
-rw-r--r--testcases/vPing/CI/libraries/vPing_ssh.py134
-rw-r--r--testcases/vPing/CI/libraries/vPing_userdata.py70
2 files changed, 123 insertions, 81 deletions
diff --git a/testcases/vPing/CI/libraries/vPing_ssh.py b/testcases/vPing/CI/libraries/vPing_ssh.py
index 7adf8a23d..43ab8525e 100644
--- a/testcases/vPing/CI/libraries/vPing_ssh.py
+++ b/testcases/vPing/CI/libraries/vPing_ssh.py
@@ -63,7 +63,7 @@ ch.setFormatter(formatter)
logger.addHandler(ch)
paramiko.util.log_to_file("/var/log/paramiko.log")
-REPO_PATH = os.environ['repos_dir']+'/functest/'
+REPO_PATH = os.environ['repos_dir'] + '/functest/'
if not os.path.exists(REPO_PATH):
logger.error("Functest repository directory not found '%s'" % REPO_PATH)
exit(-1)
@@ -159,15 +159,20 @@ def waitVmDeleted(nova, vm):
def create_private_neutron_net(neutron):
# Check if the network already exists
- network_id = openstack_utils.get_network_id(neutron, NEUTRON_PRIVATE_NET_NAME)
- subnet_id = openstack_utils.get_subnet_id(neutron, NEUTRON_PRIVATE_SUBNET_NAME)
- router_id = openstack_utils.get_router_id(neutron, NEUTRON_ROUTER_NAME)
+ network_id = openstack_utils.get_network_id(neutron,
+ NEUTRON_PRIVATE_NET_NAME)
+ subnet_id = openstack_utils.get_subnet_id(neutron,
+ NEUTRON_PRIVATE_SUBNET_NAME)
+ router_id = openstack_utils.get_router_id(neutron,
+ NEUTRON_ROUTER_NAME)
if network_id != '' and subnet_id != '' and router_id != '':
- logger.info("Using existing network '%s'..." % NEUTRON_PRIVATE_NET_NAME)
+ logger.info("Using existing network '%s'..."
+ % NEUTRON_PRIVATE_NET_NAME)
else:
neutron.format = 'json'
- logger.info('Creating neutron network %s...' % NEUTRON_PRIVATE_NET_NAME)
+ logger.info('Creating neutron network %s...'
+ % NEUTRON_PRIVATE_NET_NAME)
network_id = openstack_utils. \
create_neutron_net(neutron, NEUTRON_PRIVATE_NET_NAME)
@@ -193,7 +198,9 @@ def create_private_neutron_net(neutron):
logger.debug("Router '%s' created successfully" % router_id)
logger.debug('Adding router to subnet...')
- if not openstack_utils.add_interface_router(neutron, router_id, subnet_id):
+ if not openstack_utils.add_interface_router(neutron,
+ router_id,
+ subnet_id):
return False
logger.debug("Interface added successfully.")
@@ -209,37 +216,42 @@ def create_private_neutron_net(neutron):
def create_security_group(neutron_client):
- sg_id = openstack_utils.get_security_group_id(neutron_client, SECGROUP_NAME)
+ sg_id = openstack_utils.get_security_group_id(neutron_client,
+ SECGROUP_NAME)
if sg_id != '':
logger.info("Using existing security group '%s'..." % SECGROUP_NAME)
else:
logger.info("Creating security group '%s'..." % SECGROUP_NAME)
SECGROUP = openstack_utils.create_security_group(neutron_client,
- SECGROUP_NAME,
- SECGROUP_DESCR)
+ SECGROUP_NAME,
+ SECGROUP_DESCR)
if not SECGROUP:
logger.error("Failed to create the security group...")
return False
sg_id = SECGROUP['id']
- logger.debug("Security group '%s' with ID=%s created successfully." %\
- (SECGROUP['name'], sg_id))
+ logger.debug("Security group '%s' with ID=%s created successfully."
+ % (SECGROUP['name'], sg_id))
- logger.debug("Adding ICMP rules in security group '%s'..." % SECGROUP_NAME)
- if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
- 'ingress', 'icmp'):
+ logger.debug("Adding ICMP rules in security group '%s'..."
+ % SECGROUP_NAME)
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id,
+ 'ingress', 'icmp'):
logger.error("Failed to create the security group rule...")
return False
- logger.debug("Adding SSH rules in security group '%s'..." % SECGROUP_NAME)
- if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
- 'ingress', 'tcp', '22', '22'):
+ logger.debug("Adding SSH rules in security group '%s'..."
+ % SECGROUP_NAME)
+ if not openstack_utils.\
+ create_secgroup_rule(neutron_client, sg_id,
+ 'ingress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return False
- if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
- 'egress', 'tcp', '22', '22'):
+ if not openstack_utils.\
+ create_secgroup_rule(neutron_client, sg_id,
+ 'egress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return False
return sg_id
@@ -290,7 +302,7 @@ def cleanup(nova, neutron, image_id, network_dic, sg_id, floatingip):
router_id = network_dic["router_id"]
if not openstack_utils.remove_interface_router(neutron, router_id,
- subnet_id):
+ subnet_id):
logger.error("Unable to remove subnet '%s' from router '%s'" % (
subnet_id, router_id))
return False
@@ -324,7 +336,8 @@ def cleanup(nova, neutron, image_id, network_dic, sg_id, floatingip):
logger.debug("Releasing floating ip '%s'..." % floatingip['fip_addr'])
if not openstack_utils.delete_floating_ip(nova, floatingip['fip_id']):
- logger.error("Unable to delete floatingip '%s'" % floatingip['fip_addr'])
+ logger.error("Unable to delete floatingip '%s'"
+ % floatingip['fip_addr'])
return False
logger.debug(
"Floating IP '%s' deleted successfully" % floatingip['fip_addr'])
@@ -350,7 +363,8 @@ def push_results(start_time_ts, duration, test_status):
'duration': duration,
'status': test_status})
except:
- logger.error("Error pushing results into Database '%s'" % sys.exc_info()[0])
+ logger.error("Error pushing results into Database '%s'"
+ % sys.exc_info()[0])
def main():
@@ -361,8 +375,9 @@ def main():
neutron_client = neutronclient.Client(**creds_neutron)
creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
- glance_endpoint = keystone_client.service_catalog.url_for(service_type='image',
- endpoint_type='publicURL')
+ glance_endpoint = keystone_client.\
+ service_catalog.url_for(service_type='image',
+ endpoint_type='publicURL')
glance_client = glanceclient.Client(1, glance_endpoint,
token=keystone_client.auth_token)
EXIT_CODE = -1
@@ -380,13 +395,13 @@ def main():
logger.info("Creating image '%s' from '%s'..." % (GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH))
image_id = openstack_utils.create_glance_image(glance_client,
- GLANCE_IMAGE_NAME,
- GLANCE_IMAGE_PATH)
+ GLANCE_IMAGE_NAME,
+ GLANCE_IMAGE_PATH)
if not image_id:
logger.error("Failed to create a Glance image...")
return(EXIT_CODE)
- logger.debug("Image '%s' with ID=%s created successfully." %\
- (GLANCE_IMAGE_NAME, image_id))
+ logger.debug("Image '%s' with ID=%s created successfully."
+ % (GLANCE_IMAGE_NAME, image_id))
network_dic = create_private_neutron_net(neutron_client)
if not network_dic:
@@ -437,7 +452,8 @@ def main():
if not waitVmActive(nova_client, vm1):
logger.error("Instance '%s' cannot be booted. Status is '%s'" % (
NAME_VM_1, openstack_utils.get_instance_status(nova_client, vm1)))
- cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatingip)
+ cleanup(nova_client, neutron_client, image_id, network_dic, sg_id,
+ floatingip)
return (EXIT_CODE)
else:
logger.info("Instance '%s' is ACTIVE." % NAME_VM_1)
@@ -446,7 +462,8 @@ def main():
test_ip = vm1.networks.get(NEUTRON_PRIVATE_NET_NAME)[0]
logger.debug("Instance '%s' got private ip '%s'." % (NAME_VM_1, test_ip))
- logger.info("Adding '%s' to security group '%s'..." % (NAME_VM_1, SECGROUP_NAME))
+ logger.info("Adding '%s' to security group '%s'..."
+ % (NAME_VM_1, SECGROUP_NAME))
openstack_utils.add_secgroup_to_instance(nova_client, vm1.id, sg_id)
# boot VM 2
@@ -464,29 +481,34 @@ def main():
if not waitVmActive(nova_client, vm2):
logger.error("Instance '%s' cannot be booted. Status is '%s'" % (
NAME_VM_2, openstack_utils.get_instance_status(nova_client, vm2)))
- cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatip_dic)
+ cleanup(nova_client, neutron_client, image_id, network_dic, sg_id,
+ floatip_dic)
return (EXIT_CODE)
else:
logger.info("Instance '%s' is ACTIVE." % NAME_VM_2)
- logger.info("Adding '%s' to security group '%s'..." % (NAME_VM_2, SECGROUP_NAME))
+ logger.info("Adding '%s' to security group '%s'..." % (NAME_VM_2,
+ SECGROUP_NAME))
openstack_utils.add_secgroup_to_instance(nova_client, vm2.id, sg_id)
logger.info("Creating floating IP for VM '%s'..." % NAME_VM_2)
floatip_dic = openstack_utils.create_floating_ip(neutron_client)
floatip = floatip_dic['fip_addr']
- floatip_id = floatip_dic['fip_id']
+ # floatip_id = floatip_dic['fip_id']
if floatip is None:
logger.error("Cannot create floating IP.")
- cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatip_dic)
+ cleanup(nova_client, neutron_client, image_id, network_dic, sg_id,
+ floatip_dic)
return (EXIT_CODE)
logger.info("Floating IP created: '%s'" % floatip)
- logger.info("Associating floating ip: '%s' to VM '%s' " % (floatip, NAME_VM_2))
+ logger.info("Associating floating ip: '%s' to VM '%s' "
+ % (floatip, NAME_VM_2))
if not openstack_utils.add_floating_ip(nova_client, vm2.id, floatip):
logger.error("Cannot associate floating IP to VM.")
- cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatip_dic)
+ cleanup(nova_client, neutron_client, image_id, network_dic, sg_id,
+ floatip_dic)
return (EXIT_CODE)
logger.info("Trying to establish SSH connection to %s..." % floatip)
@@ -502,7 +524,8 @@ def main():
cidr_first_octet = NEUTRON_PRIVATE_SUBNET_CIDR.split('.')[0]
while timeout > 0:
try:
- ssh.connect(floatip, username=username, password=password, timeout=2)
+ ssh.connect(floatip, username=username,
+ password=password, timeout=2)
logger.debug("SSH connection established to %s." % floatip)
break
except:
@@ -513,26 +536,33 @@ def main():
console_log = vm2.get_console_output()
# print each "Sending discover" captured on the console log
- if len(re.findall("Sending discover", console_log)) > discover_count and not got_ip:
+ if len(re.findall("Sending discover", console_log)) > discover_count \
+ and not got_ip:
discover_count += 1
- logger.debug("Console-log '%s': Sending discover..." % NAME_VM_2)
+ logger.debug("Console-log '%s': Sending discover..."
+ % NAME_VM_2)
- # check if eth0 got an ip, the line looks like this: "inet addr:192.168."....
+ # check if eth0 got an ip,the line looks like this:
+ # "inet addr:192.168."....
# if the dhcp agent fails to assing ip, this line will not appear
- if "inet addr:"+cidr_first_octet in console_log and not got_ip:
+ if "inet addr:" + cidr_first_octet in console_log and not got_ip:
got_ip = True
- logger.debug("The instance '%s' succeeded to get the IP from the dhcp agent.")
+ logger.debug("The instance '%s' succeeded to get the IP "
+ "from the dhcp agent.")
- # if dhcp doesn't work, it shows "No lease, failing". The test will fail...
+ # if dhcp doesnt work,it shows "No lease, failing".The test will fail
if "No lease, failing" in console_log and not nolease and not got_ip:
nolease = True
- logger.debug("Console-log '%s': No lease, failing..." % NAME_VM_2)
- logger.info("The instance failed to get an IP from "\
- "the DHCP agent. The test will probably timeout...")
+ logger.debug("Console-log '%s': No lease, failing..."
+ % NAME_VM_2)
+ logger.info("The instance failed to get an IP from the "
+ "DHCP agent. The test will probably timeout...")
if timeout == 0: # 300 sec timeout (5 min)
- logger.error("Cannot establish connection to IP '%s'. Aborting" % floatip)
- cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatip_dic)
+ logger.error("Cannot establish connection to IP '%s'. Aborting"
+ % floatip)
+ cleanup(nova_client, neutron_client, image_id, network_dic, sg_id,
+ floatip_dic)
return (EXIT_CODE)
scp = SCPClient(ssh.get_transport())
@@ -541,7 +571,8 @@ def main():
try:
scp.put(ping_script, "~/")
except:
- logger.error("Cannot SCP the file '%s' to VM '%s'" % (ping_script, floatip))
+ logger.error("Cannot SCP the file '%s' to VM '%s'"
+ % (ping_script, floatip))
cmd = 'chmod 755 ~/ping.sh'
(stdin, stdout, stderr) = ssh.exec_command(cmd)
@@ -579,7 +610,8 @@ def main():
logger.debug("Pinging %s. Waiting for response..." % test_ip)
sec += 1
- cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatip_dic)
+ cleanup(nova_client, neutron_client, image_id, network_dic, sg_id,
+ floatip_dic)
test_status = "NOK"
if EXIT_CODE == 0:
diff --git a/testcases/vPing/CI/libraries/vPing_userdata.py b/testcases/vPing/CI/libraries/vPing_userdata.py
index 9ffb56be3..1fc9e1d37 100644
--- a/testcases/vPing/CI/libraries/vPing_userdata.py
+++ b/testcases/vPing/CI/libraries/vPing_userdata.py
@@ -60,7 +60,7 @@ formatter = logging.Formatter('%(asctime)s - %(name)s'
ch.setFormatter(formatter)
logger.addHandler(ch)
-REPO_PATH = os.environ['repos_dir']+'/functest/'
+REPO_PATH = os.environ['repos_dir'] + '/functest/'
if not os.path.exists(REPO_PATH):
logger.error("Functest repository directory not found '%s'" % REPO_PATH)
exit(-1)
@@ -156,9 +156,12 @@ def waitVmDeleted(nova, vm):
def create_private_neutron_net(neutron):
# Check if the network already exists
- network_id = openstack_utils.get_network_id(neutron, NEUTRON_PRIVATE_NET_NAME)
- subnet_id = openstack_utils.get_subnet_id(neutron, NEUTRON_PRIVATE_SUBNET_NAME)
- router_id = openstack_utils.get_router_id(neutron, NEUTRON_ROUTER_NAME)
+ network_id = openstack_utils.get_network_id(neutron,
+ NEUTRON_PRIVATE_NET_NAME)
+ subnet_id = openstack_utils.get_subnet_id(neutron,
+ NEUTRON_PRIVATE_SUBNET_NAME)
+ router_id = openstack_utils.get_router_id(neutron,
+ NEUTRON_ROUTER_NAME)
if network_id != '' and subnet_id != '' and router_id != '':
logger.info("Using existing network '%s'.." % NEUTRON_PRIVATE_NET_NAME)
@@ -190,7 +193,8 @@ def create_private_neutron_net(neutron):
logger.debug("Router '%s' created successfully" % router_id)
logger.debug('Adding router to subnet...')
- if not openstack_utils.add_interface_router(neutron, router_id, subnet_id):
+ if not openstack_utils.add_interface_router(neutron, router_id,
+ subnet_id):
return False
logger.debug("Interface added successfully.")
@@ -206,38 +210,42 @@ def create_private_neutron_net(neutron):
def create_security_group(neutron_client):
- sg_id = openstack_utils.get_security_group_id(neutron_client, SECGROUP_NAME)
+ sg_id = openstack_utils.get_security_group_id(neutron_client,
+ SECGROUP_NAME)
if sg_id != '':
logger.info("Using existing security group '%s'..." % SECGROUP_NAME)
else:
logger.info("Creating security group '%s'..." % SECGROUP_NAME)
SECGROUP = openstack_utils.create_security_group(neutron_client,
- SECGROUP_NAME,
- SECGROUP_DESCR)
+ SECGROUP_NAME,
+ SECGROUP_DESCR)
if not SECGROUP:
logger.error("Failed to create the security group...")
return False
sg_id = SECGROUP['id']
- logger.debug("Security group '%s' with ID=%s created successfully." %\
- (SECGROUP['name'], sg_id))
+ logger.debug("Security group '%s' with ID=%s created successfully."
+ % (SECGROUP['name'], sg_id))
- logger.debug("Adding ICMP rules in security group '%s'..." % SECGROUP_NAME)
- if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
- 'ingress', 'icmp'):
+ logger.debug("Adding ICMP rules in security group '%s'..."
+ % SECGROUP_NAME)
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id,
+ 'ingress', 'icmp'):
logger.error("Failed to create the security group rule...")
return False
- logger.debug("Adding SSH rules in security group '%s'..." % SECGROUP_NAME)
- if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
- 'ingress', 'tcp',
- '22', '22'):
+ logger.debug("Adding SSH rules in security group '%s'..."
+ % SECGROUP_NAME)
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id,
+ 'ingress', 'tcp',
+ '22', '22'):
logger.error("Failed to create the security group rule...")
return False
- if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
- 'egress', 'tcp', '22', '22'):
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id,
+ 'egress', 'tcp',
+ '22', '22'):
logger.error("Failed to create the security group rule...")
return False
return sg_id
@@ -288,7 +296,7 @@ def cleanup(nova, neutron, image_id, network_dic):
router_id = network_dic["router_id"]
if not openstack_utils.remove_interface_router(neutron, router_id,
- subnet_id):
+ subnet_id):
logger.error("Unable to remove subnet '%s' from router '%s'" % (
subnet_id, router_id))
return False
@@ -336,7 +344,8 @@ def push_results(start_time_ts, duration, test_status):
'duration': duration,
'status': test_status})
except:
- logger.error("Error pushing results into Database '%s'" % sys.exc_info()[0])
+ logger.error("Error pushing results into Database '%s'"
+ % sys.exc_info()[0])
def main():
@@ -347,8 +356,9 @@ def main():
neutron_client = neutronclient.Client(**creds_neutron)
creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
- glance_endpoint = keystone_client.service_catalog.url_for(service_type='image',
- endpoint_type='publicURL')
+ glance_endpoint = keystone_client.\
+ service_catalog.url_for(service_type='image',
+ endpoint_type='publicURL')
glance_client = glanceclient.Client(1, glance_endpoint,
token=keystone_client.auth_token)
EXIT_CODE = -1
@@ -366,13 +376,13 @@ def main():
logger.info("Creating image '%s' from '%s'..." % (GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH))
image_id = openstack_utils.create_glance_image(glance_client,
- GLANCE_IMAGE_NAME,
- GLANCE_IMAGE_PATH)
+ GLANCE_IMAGE_NAME,
+ GLANCE_IMAGE_PATH)
if not image_id:
logger.error("Failed to create a Glance image...")
return(EXIT_CODE)
- logger.debug("Image '%s' with ID=%s created successfully." %\
- (GLANCE_IMAGE_NAME, image_id))
+ logger.debug("Image '%s' with ID=%s created successfully."
+ % (GLANCE_IMAGE_NAME, image_id))
network_dic = create_private_neutron_net(neutron_client)
if not network_dic:
@@ -381,7 +391,7 @@ def main():
return(EXIT_CODE)
network_id = network_dic["net_id"]
- sg_id = create_security_group(neutron_client)
+ create_security_group(neutron_client)
# Check if the given flavor exists
try:
@@ -500,8 +510,8 @@ def main():
break
elif sec % 10 == 0:
if "request failed" in console_log:
- logger.debug("It seems userdata is not supported in nova boot." + \
- " Waiting a bit...")
+ logger.debug("It seems userdata is not supported in "
+ "nova boot. Waiting a bit...")
metadata_tries += 1
else:
logger.debug("Pinging %s. Waiting for response..." % test_ip)