diff options
Diffstat (limited to 'docker/storperf-master/storperf/storperf_master.py')
-rw-r--r-- | docker/storperf-master/storperf/storperf_master.py | 757 |
1 files changed, 464 insertions, 293 deletions
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index ad6e16c..73f8f0d 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -7,23 +7,26 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from datetime import datetime -import logging -import os -import socket -from threading import Thread -from time import sleep -from cinderclient import client as cinderclient -from keystoneauth1 import loading -from keystoneauth1 import session -import paramiko +from datetime import datetime +from io import StringIO +from multiprocessing.pool import ThreadPool from scp import SCPClient - -import heatclient.client as heatclient -from storperf.db.configuration_db import ConfigurationDB +from snaps.config.stack import StackConfig +from snaps.openstack.create_stack import OpenStackHeatStack +from snaps.openstack.os_credentials import OSCreds +from snaps.openstack.utils import heat_utils, cinder_utils, glance_utils +from snaps.thread_utils import worker_pool from storperf.db.job_db import JobDB from storperf.test_executor import TestExecutor +from storperf.utilities import ip_helper +from time import sleep +import json +import logging +import os +import paramiko +import socket +import uuid class ParameterError(Exception): @@ -35,257 +38,347 @@ class StorPerfMaster(object): def __init__(self): self.logger = logging.getLogger(__name__) - self.configuration_db = ConfigurationDB() + self.reset_values() + self.job_db = JobDB() + self.stack_settings = StackConfig( + name=self.stack_name, + template_path='storperf/resources/hot/agent-group.yaml') + + self.os_creds = OSCreds( + username=os.environ.get('OS_USERNAME'), + password=os.environ.get('OS_PASSWORD'), + auth_url=os.environ.get('OS_AUTH_URL'), + identity_api_version=os.environ.get('OS_IDENTITY_API_VERSION'), + user_domain_name=os.environ.get('OS_USER_DOMAIN_NAME'), + user_domain_id=os.environ.get('OS_USER_DOMAIN_ID'), + region_name=os.environ.get('OS_REGION_NAME'), + project_domain_name=os.environ.get('OS_PROJECT_DOMAIN_NAME'), + project_domain_id=os.environ.get('OS_PROJECT_DOMAIN_ID'), + project_name=os.environ.get('OS_PROJECT_NAME')) + + self.logger.debug("OSCreds: %s" % self.os_creds) + + self.heat_stack = OpenStackHeatStack(self.os_creds, + self.stack_settings) + + self._snaps_pool = worker_pool(20) + + def reset_values(self): + self._stack_name = 'StorPerfAgentGroup' + self.username = None + self.password = None + self._ssh_key = None + self._test_executor = None + self._agent_count = 1 + self._agent_image = None + self._agent_flavor = None + self._availability_zone = None + self._public_network = None + self._volume_count = 1 + self._volume_size = 1 + self._volume_type = None + self._slave_addresses = [] + self._filename = None + self._deadline = None + self._steady_state_samples = 10 + self._queue_depths = "1" + self._block_sizes = "4096" + self._workload_modules = [] + self._custom_workloads = [] + self._subnet_CIDR = '172.16.0.0/16' + self.slave_info = {} + self.stackless = False + self.mkfs = None + self.mount_device = None + self._last_snaps_check_time = None + self._cached_stack_id = None - template_file = open("storperf/resources/hot/agent-group.yaml") - self._agent_group_hot = template_file.read() - template_file = open("storperf/resources/hot/storperf-agent.yaml") - self._agent_resource_hot = template_file.read() - self._hot_files = { - 'storperf-agent.yaml': self._agent_resource_hot - } - self.logger.debug( - "Loaded agent-group template as: " + self._agent_group_hot) - self.logger.debug( - "Loaded agent-resource template as: " + self._agent_resource_hot) - - self._cinder_client = None - self._heat_client = None - self._test_executor = TestExecutor() - self._last_openstack_auth = datetime.now() + @property + def volume_count(self): + self._get_stack_info() + return self._volume_count + + @volume_count.setter + def volume_count(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change volume count after stack is created") + self._volume_count = value @property def volume_size(self): - value = self.configuration_db.get_configuration_value( - 'stack', - 'volume_size') - if (value is None): - self.volume_size = 1 - value = 1 - return int(value) + self._get_stack_info() + return self._volume_size @volume_size.setter def volume_size(self, value): if (self.stack_id is not None): raise ParameterError( "ERROR: Cannot change volume size after stack is created") + self._volume_size = value + + @property + def volume_type(self): + self._get_stack_info() + return self._volume_type + + @volume_type.setter + def volume_type(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change volume type after stack is created") + self._volume_type = value - self.configuration_db.set_configuration_value( - 'stack', - 'volume_size', - value) + @property + def stack_name(self): + return self._stack_name + + @stack_name.setter + def stack_name(self, value): + if value is None: + self.stackless = True + else: + self.stackless = False + self._stack_name = value + self.stack_settings.name = self.stack_name + self.stack_id = None + self._last_snaps_check_time = None @property - def agent_count(self): - value = self.configuration_db.get_configuration_value( - 'stack', - 'agent_count') + def subnet_CIDR(self): + return self._subnet_CIDR + + @subnet_CIDR.setter + def subnet_CIDR(self, value): + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change subnet CIDR after stack is created") + self._subnet_CIDR = value - if (value is None): - self.agent_count = 1 - value = 1 - return int(value) + @property + def agent_count(self): + self._get_stack_info() + return self._agent_count @agent_count.setter def agent_count(self, value): if (self.stack_id is not None): raise ParameterError( "ERROR: Cannot change agent count after stack is created") - - self.configuration_db.set_configuration_value( - 'stack', - 'agent_count', - value) + self._agent_count = value @property def agent_image(self): - value = self.configuration_db.get_configuration_value( - 'stack', - 'agent_image') - - if (value is None): - value = 'Ubuntu 14.04' - self.agent_image = value - - return value + self._get_stack_info() + return self._agent_image @agent_image.setter def agent_image(self, value): if (self.stack_id is not None): raise ParameterError( "ERROR: Cannot change agent image after stack is created") - - self.configuration_db.set_configuration_value( - 'stack', - 'agent_image', - value) + self._agent_image = value @property def public_network(self): - return self.configuration_db.get_configuration_value( - 'stack', - 'public_network') + self._get_stack_info() + return self._public_network @public_network.setter def public_network(self, value): if (self.stack_id is not None): raise ParameterError( "ERROR: Cannot change public network after stack is created") - - self.configuration_db.set_configuration_value( - 'stack', - 'public_network', - value) + self._public_network = value @property def agent_flavor(self): - return self.configuration_db.get_configuration_value( - 'stack', - 'agent_flavor') + self._get_stack_info() + return self._agent_flavor @agent_flavor.setter def agent_flavor(self, value): if (self.stack_id is not None): raise ParameterError( "ERROR: Cannot change flavor after stack is created") + self._agent_flavor = value + + @property + def slave_addresses(self): + return self._slave_addresses - self.configuration_db.set_configuration_value( - 'stack', - 'agent_flavor', - value) + @slave_addresses.setter + def slave_addresses(self, value): + self._slave_addresses = value @property def stack_id(self): - return self.configuration_db.get_configuration_value( - 'stack', - 'stack_id') + self._get_stack_info() + return self._cached_stack_id @stack_id.setter def stack_id(self, value): - self.configuration_db.set_configuration_value( - 'stack', - 'stack_id', - value) + self._cached_stack_id = value + + def _get_stack_info(self): + if self.stackless: + self._cached_stack_id = None + return None + + if self._last_snaps_check_time is not None: + time_since_check = datetime.now() - self._last_snaps_check_time + if time_since_check.total_seconds() < 60: + return self._cached_stack_id + + self.heat_stack.initialize() + + if self.heat_stack.get_stack() is not None: + self._cached_stack_id = self.heat_stack.get_stack().id + cinder_cli = cinder_utils.cinder_client(self.os_creds) + glance_cli = glance_utils.glance_client(self.os_creds) + + router_worker = self._snaps_pool.apply_async( + self.heat_stack.get_router_creators) + + vm_inst_creators = self.heat_stack.get_vm_inst_creators() + self._agent_count = len(vm_inst_creators) + vm1 = vm_inst_creators[0] + self._availability_zone = \ + vm1.instance_settings.availability_zone + self._agent_flavor = vm1.instance_settings.flavor.name + + self._slave_addresses = [] + for instance in vm_inst_creators: + floating_ip = instance.get_floating_ip() + self._slave_addresses.append(floating_ip.ip) + self.logger.debug("Found VM at %s" % floating_ip.ip) + + server = vm1.get_vm_inst() + + image_worker = self._snaps_pool.apply_async( + glance_utils.get_image_by_id, (glance_cli, server.image_id)) + + self._volume_count = len(server.volume_ids) + if self._volume_count > 0: + volume_id = server.volume_ids[0]['id'] + volume = cinder_utils.get_volume_by_id( + cinder_cli, volume_id) + self.logger.debug("Volume id %s, size=%s, type=%s" % + (volume.id, + volume.size, + volume.type)) + self._volume_size = volume.size + self._volume_type = volume.type + + image = image_worker.get() + self._agent_image = image.name + + router_creators = router_worker.get() + router1 = router_creators[0] + self._public_network = \ + router1.router_settings.external_gateway + + self._last_snaps_check_time = datetime.now() + else: + self._cached_stack_id = None + + return self._cached_stack_id @property def availability_zone(self): - return self.configuration_db.get_configuration_value( - 'stack', - 'availability_zone') + self._get_stack_info() + return self._availability_zone @availability_zone.setter def availability_zone(self, value): - self.configuration_db.set_configuration_value( - 'stack', - 'availability_zone', - value) + if (self.stack_id is not None): + raise ParameterError( + "ERROR: Cannot change zone after stack is created") + self._availability_zone = value @property def volume_quota(self): - self._attach_to_openstack() - quotas = self._cinder_client.quotas.get( - os.environ.get('OS_TENANT_ID')) - return int(quotas.volumes) + # (TODO) Use SNAPS equivalent for Volume Quotas + pass @property def filename(self): - return self._test_executor.filename + return self._filename @filename.setter def filename(self, value): - self._test_executor.filename = value + self._filename = value @property def deadline(self): - return self._test_executor.deadline + return self._deadline @deadline.setter def deadline(self, value): - self._test_executor.deadline = value + self._deadline = value @property def steady_state_samples(self): - return self._test_executor.steady_state_samples + return self._steady_state_samples @steady_state_samples.setter def steady_state_samples(self, value): - self._test_executor.steady_state_samples = value + self._steady_state_samples = value @property def queue_depths(self): - return self._test_executor.queue_depths + return self._queue_depths @queue_depths.setter def queue_depths(self, value): - self._test_executor.queue_depths = value + self._queue_depths = value @property def block_sizes(self): - return self._test_executor.block_sizes + return self._block_sizes @block_sizes.setter def block_sizes(self, value): - self._test_executor.block_sizes = value - - @property - def is_stack_created(self): - if (self.stack_id is not None): - self._attach_to_openstack() - - stack = self._heat_client.stacks.get(self.stack_id) - status = getattr(stack, 'stack_status') - - self.logger.info("Status=" + status) - if (status == u'CREATE_COMPLETE'): - return True - - return False + self._block_sizes = value @property def workloads(self): - return self.configuration_db.get_configuration_value( - 'workload', - 'workloads') + return self._workload_modules @workloads.setter def workloads(self, value): - self._test_executor.register_workloads(value) + executor = TestExecutor() + executor.register_workloads(value) + self._workload_modules = value - self.configuration_db.set_configuration_value( - 'workload', - 'workloads', - str(self._test_executor.workload_modules)) + @property + def custom_workloads(self): + return self._custom_workloads + + @custom_workloads.setter + def custom_workloads(self, value): + self.logger.info("Custom workloads = %s" % value) + self._custom_workloads = value @property - def username(self): - return self.configuration_db.get_configuration_value( - 'stack', - 'username' - ) - - @username.setter - def username(self, value): - self.configuration_db.set_configuration_value( - 'stack', - 'username', - value - ) + def ssh_key(self): + if self._ssh_key is None: + return None + key = StringIO(self._ssh_key) + pkey = paramiko.RSAKey.from_private_key(key) + key.close() + return pkey + + @ssh_key.setter + def ssh_key(self, value): + self._ssh_key = value @property - def password(self): - return self.configuration_db.get_configuration_value( - 'stack', - 'password' - ) - - @password.setter - def password(self, value): - self.configuration_db.set_configuration_value( - 'stack', - 'password', - value - ) + def is_stack_created(self): + return (self.stack_id is not None and + (self.heat_stack.get_status() == u'CREATE_COMPLETE' or + self.heat_stack.get_status() == u'UPDATE_COMPLETE')) def get_logs(self, lines=None): LOG_DIR = './storperf.log' @@ -305,103 +398,135 @@ class StorPerfMaster(object): return logs def create_stack(self): - if (self.stack_id is not None): - raise ParameterError("ERROR: Stack has already been created") - - self._attach_to_openstack() - volume_quota = self.volume_quota - if (volume_quota > 0 and self.agent_count > volume_quota): - message = "ERROR: Volume quota too low: " + \ - str(self.agent_count) + " > " + str(self.volume_quota) - raise ParameterError(message) - - self.logger.debug("Creating stack") - stack = self._heat_client.stacks.create( - stack_name="StorPerfAgentGroup", - template=self._agent_group_hot, - files=self._hot_files, - parameters=self._make_parameters()) - - self.stack_id = stack['stack']['id'] - - while True: - stack = self._heat_client.stacks.get(self.stack_id) - status = getattr(stack, 'stack_status') - self.logger.debug("Stack status=%s" % (status,)) - if (status == u'CREATE_COMPLETE'): - return True - if (status == u'DELETE_COMPLETE'): - self.stack_id = None - return True - if (status == u'CREATE_FAILED'): - self.status_reason = getattr(stack, 'stack_status_reason') - sleep(5) - self._heat_client.stacks.delete(stack_id=self.stack_id) - sleep(2) + self.stackless = False + + self.stack_settings.resource_files = [ + 'storperf/resources/hot/storperf-agent.yaml', + 'storperf/resources/hot/storperf-volume.yaml'] + self.stack_settings.env_values = self._make_parameters() + try: + self.heat_stack.create(block=True) + except Exception as e: + self.logger.error("Stack creation failed") + self.logger.exception(e) + heat_cli = heat_utils.heat_client(self.os_creds) + if self.heat_stack.get_stack() is not None: + res = heat_utils.get_resources(heat_cli, + self.heat_stack.get_stack().id) + reason = "" + failed = False + for resource in res: + if resource.status == u'CREATE_FAILED': + failed = True + reason += "%s: %s " % (resource.name, + resource.status_reason) + self.logger.error("%s - %s: %s" % (resource.name, + resource.status, + resource.status_reason)) + + if failed: + try: + self.heat_stack.clean() + except Exception: + pass + raise Exception(reason) + else: + raise e def delete_stack(self): - if (self.stack_id is None): - raise ParameterError("ERROR: Stack does not exist") - - self._attach_to_openstack() - while True: - stack = self._heat_client.stacks.get(self.stack_id) - status = getattr(stack, 'stack_status') - self.logger.debug("Stack status=%s" % (status,)) - if (status == u'CREATE_COMPLETE'): - self._heat_client.stacks.delete(stack_id=self.stack_id) - if (status == u'DELETE_COMPLETE'): - self.stack_id = None - return True - if (status == u'DELETE_FAILED'): - sleep(5) - self._heat_client.stacks.delete(stack_id=self.stack_id) - sleep(2) + if self._test_executor is not None: + self._test_executor.terminate() + + stack_id = None + if (self.stack_id is not None): + stack_id = self.stack_id + try: + self.heat_stack.clean() + except Exception as e: + self.logger.error("Stack creation failed") + raise Exception(e) + self.stack_id = None + return stack_id + + def executor_event(self, executor): + if executor.terminated: + self._test_executor = None def execute_workloads(self, metadata={}): - if (self.stack_id is None): - raise ParameterError("ERROR: Stack does not exist") + if (self._test_executor is not None and + (not self._test_executor.terminated and + self._test_executor.job_id is not None)): + raise Exception("ERROR: Job {} is already running".format( + self._test_executor.job_id)) - job_list = self.job_db.fetch_jobs() - for job in job_list: - report = self.fetch_job_status(job) - if report['Status'] == 'Running': - raise "ERROR: Job {} is already running".format(job) + if (not self.stackless and + self.stack_id is None): + raise ParameterError("ERROR: Stack %s does not exist" % + self.stack_name) - self._attach_to_openstack() + self._test_executor = TestExecutor() + self._test_executor.register(self.executor_event) + self._test_executor.register_workloads(self._workload_modules) + self._test_executor.custom_workloads = self.custom_workloads + self._test_executor.block_sizes = self._block_sizes + self._test_executor.filename = self._filename + self._test_executor.deadline = self._deadline + self._test_executor.steady_state_samples = self._steady_state_samples + self._test_executor.queue_depths = self._queue_depths - stack = self._heat_client.stacks.get(self.stack_id) - outputs = getattr(stack, 'outputs') - slaves = outputs[0]['output_value'] + slaves = self._slave_addresses - setup_threads = [] + setup_pool = ThreadPool(processes=len(slaves)) + workers = [] for slave in slaves: - t = Thread(target=self._setup_slave, args=(slave,)) - setup_threads.append(t) - t.start() + worker = setup_pool.apply_async( + self._setup_slave, (slave,)) + workers.append(worker) - for thread in setup_threads: - thread.join() + for worker in workers: + worker.get() - self._test_executor.slaves = slaves + setup_pool.close() + self._test_executor.slaves = slaves + self._test_executor.volume_count = self.volume_count params = metadata - params['agent_count'] = self.agent_count + params['agent_count'] = len(slaves) + params['agent_flavor'] = self.agent_flavor + params['agent_image'] = self.agent_image + params['agent_info'] = json.dumps(self.slave_info) + params['avaiability_zone'] = self.availability_zone + params['block_sizes'] = self.block_sizes + params['deadline'] = self.deadline params['public_network'] = self.public_network + params['stack_name'] = self.stack_name + params['steady_state_samples'] = self.steady_state_samples + params['subnet_CIDR'] = self.subnet_CIDR + params['target'] = self.filename + params['volume_count'] = self.volume_count params['volume_size'] = self.volume_size - if self.username and self.password: + params['volume_type'] = self.volume_type + if self.username: params['username'] = self.username + if self.password: params['password'] = self.password + if self.ssh_key: + params['ssh_key'] = self.ssh_key job_id = self._test_executor.execute(params) + self.slave_info = {} return job_id def terminate_workloads(self): - return self._test_executor.terminate() + if self._test_executor is not None: + return self._test_executor.terminate() + else: + return True def fetch_results(self, job_id): - if self._test_executor.job_db.job_id == job_id: + if (self._test_executor is not None and + self._test_executor.job_db.job_id == job_id): return self._test_executor.metadata['details']['metrics'] workload_params = self.job_db.fetch_workload_params(job_id) @@ -414,7 +539,19 @@ class StorPerfMaster(object): return self.job_db.fetch_workload_params(job_id) def fetch_job_status(self, job_id): - return self._test_executor.execution_status(job_id) + results = {} + + if (self._test_executor is not None and + self._test_executor.job_id == job_id): + results['Status'] = 'Running' + results['Workloads'] = self._test_executor.workload_status + else: + jobs = self.job_db.fetch_jobs() + for job in jobs: + if job == job_id: + results['Status'] = "Completed" + + return results def fetch_all_jobs(self, metrics_type): job_list = self.job_db.fetch_jobs() @@ -442,7 +579,8 @@ class StorPerfMaster(object): timer = 10 while not alive: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - result = s.connect_ex((slave, 22)) + host, port = ip_helper.parse_address_and_port(slave) + result = s.connect_ex((host, port)) s.close() if result: @@ -459,13 +597,31 @@ class StorPerfMaster(object): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.username and self.password: - ssh.connect(slave, - username=self.username, - password=self.password) + ssh.connect( + host, + port=port, + username=self.username, + password=self.password, + timeout=2) + elif self.username and self.ssh_key: + ssh.connect( + host, + port=port, + username=self.username, + pkey=self.ssh_key, + timeout=2) else: - ssh.connect(slave, username='storperf', - key_filename='storperf/resources/ssh/storperf_rsa', - timeout=2) + ssh.connect( + slave, + port=port, + username='storperf', + key_filename='storperf/resources/ssh/storperf_rsa', + timeout=2) + + uname = self._get_uname(ssh) + logger.debug("Slave uname is %s" % uname) + self.slave_info[slave] = {} + self.slave_info[slave]['uname'] = uname available = self._check_root_fs(ssh) logger.debug("Available space on / is %s" % available) @@ -484,6 +640,16 @@ class StorPerfMaster(object): logger.debug("Transferring fio to %s" % slave) scp.put('/usr/local/bin/fio', '~/') + if self.mkfs is not None: + self._mkfs(ssh, logger) + + if self.mount_device is not None: + self._mount(ssh, logger) + + def _get_uname(self, ssh): + (_, stdout, _) = ssh.exec_command("uname -a") + return stdout.readline() + def _check_root_fs(self, ssh): (_, stdout, _) = ssh.exec_command("df /") stdout.readline() @@ -492,6 +658,59 @@ class StorPerfMaster(object): available = lines[3] return int(available) + def _mkfs(self, ssh, logger): + command = "sudo umount %s" % (self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mkfs.%s %s" % (self.mkfs, self.mount_device) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Error executing on {0}: {1}".format( + command, error_messages)) + + def _mount(self, ssh, logger): + command = "sudo mkdir -p %s" % (self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + for line in iter(stderr.readline, b''): + logger.error(line) + + command = "sudo mount %s %s" % (self.mount_device, self.filename) + logger.info("Attempting %s" % command) + (_, stdout, stderr) = ssh.exec_command(command) + rc = stdout.channel.recv_exit_status() + for line in iter(stdout.readline, b''): + logger.info(line) + error_messages = "" + for line in iter(stderr.readline, b''): + logger.error(line) + error_messages += line.rstrip() + + if rc != 0: + raise Exception( + "Could not mount {0}: {1}".format( + self.mount_device, error_messages)) + def _resize_root_fs(self, ssh, logger): command = "sudo /usr/sbin/resize2fs /dev/vda1" logger.info("Attempting %s" % command) @@ -503,65 +722,17 @@ class StorPerfMaster(object): logger.error(line) def _make_parameters(self): + random_str = uuid.uuid4().hex[:6].upper() heat_parameters = {} heat_parameters['public_network'] = self.public_network heat_parameters['agent_count'] = self.agent_count + heat_parameters['volume_count'] = self.volume_count heat_parameters['volume_size'] = self.volume_size + heat_parameters['keypair_name'] = 'storperf_agent_keypair' + random_str + heat_parameters['subnet_CIDR'] = self.subnet_CIDR + if self.volume_type is not None: + heat_parameters['volume_type'] = self.volume_type heat_parameters['agent_image'] = self.agent_image heat_parameters['agent_flavor'] = self.agent_flavor heat_parameters['availability_zone'] = self.availability_zone return heat_parameters - - def _attach_to_openstack(self): - - time_since_last_auth = datetime.now() - self._last_openstack_auth - - if (self._heat_client is None or - time_since_last_auth.total_seconds() > 600): - self._last_openstack_auth = datetime.now() - - creds = { - "username": os.environ.get('OS_USERNAME'), - "password": os.environ.get('OS_PASSWORD'), - "auth_url": os.environ.get('OS_AUTH_URL'), - "project_domain_id": - os.environ.get('OS_PROJECT_DOMAIN_ID'), - "project_domain_name": - os.environ.get('OS_PROJECT_DOMAIN_NAME'), - "project_id": os.environ.get('OS_PROJECT_ID'), - "project_name": os.environ.get('OS_PROJECT_NAME'), - "tenant_name": os.environ.get('OS_TENANT_NAME'), - "tenant_id": os.environ.get("OS_TENANT_ID"), - "user_domain_id": os.environ.get('OS_USER_DOMAIN_ID'), - "user_domain_name": os.environ.get('OS_USER_DOMAIN_NAME') - } - - self.logger.debug("Creds: %s" % creds) - - loader = loading.get_plugin_loader('password') - auth = loader.load_from_options(**creds) - - https_cacert = os.getenv('OS_CACERT', '') - https_insecure = os.getenv('OS_INSECURE', '').lower() == 'true' - - self.logger.info("cacert=%s" % https_cacert) - - sess = session.Session(auth=auth, - verify=(https_cacert or not https_insecure)) - - self.logger.debug("Looking up orchestration endpoint") - heat_endpoint = sess.get_endpoint(auth=auth, - service_type="orchestration", - endpoint_type='publicURL') - - self.logger.debug("Orchestration endpoint is %s" % heat_endpoint) - - self._heat_client = heatclient.Client( - "1", - endpoint=heat_endpoint, - session=sess) - - self.logger.debug("Creating cinder client") - self._cinder_client = cinderclient.Client("2", session=sess, - cacert=https_cacert) - self.logger.debug("OpenStack authentication complete") |