diff options
Diffstat (limited to 'testing/robot/lib/FDSLibrary.py')
-rw-r--r-- | testing/robot/lib/FDSLibrary.py | 75 |
1 files changed, 64 insertions, 11 deletions
diff --git a/testing/robot/lib/FDSLibrary.py b/testing/robot/lib/FDSLibrary.py index 0cb43ee..32c18eb 100644 --- a/testing/robot/lib/FDSLibrary.py +++ b/testing/robot/lib/FDSLibrary.py @@ -7,35 +7,58 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from keystoneauth1 import loading +from keystoneauth1 import session +from glanceclient import client as glance from neutronclient.v2_0 import client as neutron from novaclient import client as nova from novaclient.exceptions import NotFound +from robot.api import logger import time import datetime import os import subprocess + class FDSLibrary(): def __init__(self): + auth_obj = loading.get_plugin_loader('password').load_from_options(auth_url=os.getenv('OS_AUTH_URL'), + username=os.getenv('OS_USERNAME'), + password=os.getenv('OS_PASSWORD'), + project_id=os.getenv('OS_PROJECT_ID')) + logger.debug("Initializing glance client.") + self.glance_client = glance.Client('2', session=session.Session(auth=auth_obj)) + logger.debug("Initializing neutron client.") self.neutron_client = neutron.Client(username=os.getenv('OS_USERNAME'), password=os.getenv('OS_PASSWORD'), tenant_name=os.getenv('OS_TENANT_NAME'), auth_url=os.getenv('OS_AUTH_URL')) - - self.nova_client = nova.Client('2', - os.getenv('OS_USERNAME'), - os.getenv('OS_PASSWORD'), - os.getenv('OS_TENANT_NAME'), - os.getenv('OS_AUTH_URL')) + logger.debug("Initializing nova client.") + self.nova_client = nova.Client('2', session=session.Session(auth=auth_obj)) def check_flavor_exists(self, flavor): flavor_list_names = [x.name for x in self.nova_client.flavors.list()] return flavor in flavor_list_names + def create_flavor(self, name, ram, vcpus="1", disk="0"): + response = self.nova_client.flavors.create(name, ram, vcpus, disk) + return response + def check_image_exists(self, image): - image_list_names = [x.name for x in self.nova_client.images.list()] + image_list_names = [x.name for x in self.glance_client.images.list()] return image in image_list_names + def create_image(self, image_name, file_path, disk="qcow2", + container="bare", public="public", property="hw_mem_page_size=large"): + image = self.glance_client.images.create(name=image_name, + visibility=public, + disk_format=disk, + container_format=container, + property=property) + with open(file_path) as image_data: + self.glance_client.images.upload(image.id, image_data) + return image.id + def create_network(self, name): body = {'network': {'name': name}} response = self.neutron_client.create_network(body=body) @@ -101,11 +124,33 @@ class FDSLibrary(): time.sleep(5) return False - def create_security_group(self): - pass + def create_security_group(self, name): + body = {'security_group': { + 'name': name + }} + response = self.neutron_client.create_security_group(body=body) + return response - def create_security_rule(self): - pass + def create_security_rule(self, sg_id, dir, eth, desc=None, proto=None, port_min=None, port_max=None, r_sg_id=None, r_prefix=None): + body = {'security_group_rule': { + 'security_group_id': sg_id, + 'ethertype': eth, + 'direction': dir + }} + if desc is not None: + body['security_group_rule']['description'] = desc + if proto is not None: + body['security_group_rule']['protocol'] = proto + if port_min is not None: + body['security_group_rule']['port_range_min'] = port_min + if port_max is not None: + body['security_group_rule']['port_range_max'] = port_max + if r_sg_id is not None: + body['security_group_rule']['remote_group_id'] = r_sg_id + if r_prefix is not None: + body['security_group_rule']['remote_ip_prefix'] = r_prefix + response = self.neutron_client.create_security_group_rule(body=body) + return response def poll_server(self, vm_id, status, timeout=300): try: @@ -144,6 +189,14 @@ class FDSLibrary(): response = self.neutron_client.delete_network(net_id) return response + def delete_security_group(self, sg_id): + response = self.neutron_client.delete_security_group(sg_id) + return response + + def delete_security_rule(self, rule_id): + response = self.neutron_client.delete_security_group_rule(rule_id) + return response + def ping_vm(self, ip_address): try: output = subprocess.check_output(['ping', '-c', '4', ip_address]) |