From b2590d79a80dc12948d028ffee5b911b59daa777 Mon Sep 17 00:00:00 2001 From: mbeierl Date: Fri, 15 Jun 2018 16:01:39 -0400 Subject: Adds Threads to Heat Calls Adds thread pool and converts looping calls to OpenStack to be submitted to it, allowing them to be run in parallel. Places a configurable maximum size (default 5) of concurrent calls on the thread pool. JIRA: SNAPS-316 Change-Id: Id209c5fe9b7ee7d755d3210a124b38fc3234f0d7 Signed-off-by: mbeierl --- snaps/openstack/create_stack.py | 47 +++++++++++------ snaps/openstack/utils/__init__.py | 3 +- snaps/openstack/utils/heat_utils.py | 102 ++++++++++++++++++++++++++++-------- snaps/thread_utils.py | 27 ++++++++++ 4 files changed, 140 insertions(+), 39 deletions(-) create mode 100644 snaps/thread_utils.py (limited to 'snaps') diff --git a/snaps/openstack/create_stack.py b/snaps/openstack/create_stack.py index 43ac307..7ecf449 100644 --- a/snaps/openstack/create_stack.py +++ b/snaps/openstack/create_stack.py @@ -23,16 +23,17 @@ from snaps.config.stack import StackConfig from snaps.openstack.create_flavor import OpenStackFlavor from snaps.openstack.create_instance import OpenStackVmInstance from snaps.openstack.create_keypairs import OpenStackKeypair -from snaps.openstack.create_security_group import OpenStackSecurityGroup +from snaps.openstack.create_network import OpenStackNetwork from snaps.openstack.create_router import OpenStackRouter +from snaps.openstack.create_security_group import OpenStackSecurityGroup from snaps.openstack.create_volume import OpenStackVolume from snaps.openstack.create_volume_type import OpenStackVolumeType from snaps.openstack.openstack_creator import OpenStackCloudObject from snaps.openstack.utils import ( nova_utils, settings_utils, glance_utils, cinder_utils) - -from snaps.openstack.create_network import OpenStackNetwork from snaps.openstack.utils import heat_utils, neutron_utils +from snaps.thread_utils import worker_pool + __author__ = 'spisarski' @@ -295,6 +296,23 @@ class
OpenStackHeatStack(OpenStackCloudObject, object): return out + def __create_vm_inst(self, heat_keypair_option, stack_server): + + vm_inst_settings = settings_utils.create_vm_inst_config( + self.__nova, self._keystone, self.__neutron, stack_server, + self._os_creds.project_name) + image_settings = settings_utils.determine_image_config( + self.__glance, stack_server, self.image_settings) + keypair_settings = settings_utils.determine_keypair_config( + self.__heat_cli, self.__stack, stack_server, + keypair_settings=self.keypair_settings, + priv_key_key=heat_keypair_option) + vm_inst_creator = OpenStackVmInstance( + self._os_creds, vm_inst_settings, image_settings, + keypair_settings) + vm_inst_creator.initialize() + return vm_inst_creator + def get_vm_inst_creators(self, heat_keypair_option=None): """ Returns a list of VM Instance creator objects as configured by the heat @@ -308,21 +326,16 @@ class OpenStackHeatStack(OpenStackCloudObject, object): self.__heat_cli, self.__nova, self.__neutron, self._keystone, self.__stack, self._os_creds.project_name) + workers = [] for stack_server in stack_servers: - vm_inst_settings = settings_utils.create_vm_inst_config( - self.__nova, self._keystone, self.__neutron, stack_server, - self._os_creds.project_name) - image_settings = settings_utils.determine_image_config( - self.__glance, stack_server, self.image_settings) - keypair_settings = settings_utils.determine_keypair_config( - self.__heat_cli, self.__stack, stack_server, - keypair_settings=self.keypair_settings, - priv_key_key=heat_keypair_option) - vm_inst_creator = OpenStackVmInstance( - self._os_creds, vm_inst_settings, image_settings, - keypair_settings) - out.append(vm_inst_creator) - vm_inst_creator.initialize() + worker = worker_pool().apply_async( + self.__create_vm_inst, + (heat_keypair_option, + stack_server)) + workers.append(worker) + + for worker in workers: + out.append(worker.get()) return out diff --git a/snaps/openstack/utils/__init__.py 
b/snaps/openstack/utils/__init__.py index 7f92908..5435f8f 100644 --- a/snaps/openstack/utils/__init__.py +++ b/snaps/openstack/utils/__init__.py @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__author__ = 'spisarski' \ No newline at end of file + +__author__ = 'spisarski' diff --git a/snaps/openstack/utils/heat_utils.py b/snaps/openstack/utils/heat_utils.py index b38a7b9..a90690b 100644 --- a/snaps/openstack/utils/heat_utils.py +++ b/snaps/openstack/utils/heat_utils.py @@ -15,17 +15,18 @@ import logging import os -import yaml from heatclient.client import Client from heatclient.common.template_format import yaml_loader from novaclient.exceptions import NotFound from oslo_serialization import jsonutils +import yaml from snaps import file_utils from snaps.domain.stack import Stack, Resource, Output - from snaps.openstack.utils import ( keystone_utils, neutron_utils, nova_utils, cinder_utils) +from snaps.thread_utils import worker_pool + __author__ = 'spisarski' @@ -220,8 +221,14 @@ def get_stack_networks(heat_cli, neutron, stack): out = list() resources = get_resources(heat_cli, stack.id, 'OS::Neutron::Net') + workers = [] for resource in resources: - network = neutron_utils.get_network_by_id(neutron, resource.id) + worker = worker_pool().apply_async(neutron_utils.get_network_by_id, + (neutron, resource.id)) + workers.append(worker) + + for worker in workers: + network = worker.get() if network: out.append(network) @@ -239,8 +246,14 @@ def get_stack_routers(heat_cli, neutron, stack): out = list() resources = get_resources(heat_cli, stack.id, 'OS::Neutron::Router') + workers = [] for resource in resources: - router = neutron_utils.get_router_by_id(neutron, resource.id) + worker = worker_pool().apply_async(neutron_utils.get_router_by_id, + (neutron, resource.id)) + workers.append(worker) + + for worker in workers: + 
router = worker.get() if router: out.append(router) @@ -258,9 +271,15 @@ def get_stack_security_groups(heat_cli, neutron, stack): out = list() resources = get_resources(heat_cli, stack.id, 'OS::Neutron::SecurityGroup') + workers = [] for resource in resources: - security_group = neutron_utils.get_security_group_by_id( - neutron, resource.id) + worker = worker_pool().apply_async( + neutron_utils.get_security_group_by_id, + (neutron, resource.id)) + workers.append(worker) + + for worker in workers: + security_group = worker.get() if security_group: out.append(security_group) @@ -281,26 +300,39 @@ def get_stack_servers(heat_cli, nova, neutron, keystone, stack, project_name): out = list() srvr_res = get_resources(heat_cli, stack.id, 'OS::Nova::Server') + workers = [] for resource in srvr_res: + worker = worker_pool().apply_async( + nova_utils.get_server_object_by_id, + (nova, neutron, keystone, resource.id, project_name)) + workers.append((resource.id, worker)) + + for worker in workers: + resource_id = worker[0] try: - server = nova_utils.get_server_object_by_id( - nova, neutron, keystone, resource.id, project_name) + server = worker[1].get() if server: out.append(server) except NotFound: - logger.warn('VmInst cannot be located with ID %s', resource.id) + logger.warn('VmInst cannot be located with ID %s', resource_id) res_grps = get_resources(heat_cli, stack.id, 'OS::Heat::ResourceGroup') for res_grp in res_grps: res_ress = get_resources(heat_cli, res_grp.id) + workers = [] for res_res in res_ress: res_res_srvrs = get_resources( heat_cli, res_res.id, 'OS::Nova::Server') for res_srvr in res_res_srvrs: - server = nova_utils.get_server_object_by_id( - nova, neutron, keystone, res_srvr.id, project_name) - if server: - out.append(server) + worker = worker_pool().apply_async( + nova_utils.get_server_object_by_id, + (nova, neutron, keystone, res_srvr.id, project_name)) + workers.append(worker) + + for worker in workers: + server = worker.get() + if server: + 
out.append(server) return out @@ -316,13 +348,20 @@ def get_stack_keypairs(heat_cli, nova, stack): out = list() resources = get_resources(heat_cli, stack.id, 'OS::Nova::KeyPair') + workers = [] for resource in resources: + worker = worker_pool().apply_async( + nova_utils.get_keypair_by_id, (nova, resource.id)) + workers.append((resource.id, worker)) + + for worker in workers: + resource_id = worker[0] try: - keypair = nova_utils.get_keypair_by_id(nova, resource.id) + keypair = worker[1].get() if keypair: out.append(keypair) except NotFound: - logger.warn('Keypair cannot be located with ID %s', resource.id) + logger.warn('Keypair cannot be located with ID %s', resource_id) return out @@ -338,13 +377,20 @@ def get_stack_volumes(heat_cli, cinder, stack): out = list() resources = get_resources(heat_cli, stack.id, 'OS::Cinder::Volume') + workers = [] for resource in resources: + worker = worker_pool().apply_async( + cinder_utils.get_volume_by_id, (cinder, resource.id)) + workers.append((resource.id, worker)) + + for worker in workers: + resource_id = worker[0] try: - server = cinder_utils.get_volume_by_id(cinder, resource.id) + server = worker[1].get() if server: out.append(server) except NotFound: - logger.warn('Volume cannot be located with ID %s', resource.id) + logger.warn('Volume cannot be located with ID %s', resource_id) return out @@ -360,13 +406,20 @@ def get_stack_volume_types(heat_cli, cinder, stack): out = list() resources = get_resources(heat_cli, stack.id, 'OS::Cinder::VolumeType') + workers = [] for resource in resources: + worker = worker_pool().apply_async( + cinder_utils.get_volume_type_by_id, (cinder, resource.id)) + workers.append((resource.id, worker)) + + for worker in workers: + resource_id = worker[0] try: - vol_type = cinder_utils.get_volume_type_by_id(cinder, resource.id) + vol_type = worker[1].get() if vol_type: out.append(vol_type) except NotFound: - logger.warn('VolumeType cannot be located with ID %s', resource.id) + logger.warn('VolumeType 
cannot be located with ID %s', resource_id) return out @@ -383,13 +436,20 @@ def get_stack_flavors(heat_cli, nova, stack): out = list() resources = get_resources(heat_cli, stack.id, 'OS::Nova::Flavor') + workers = [] for resource in resources: + worker = worker_pool().apply_async( + nova_utils.get_flavor_by_id, (nova, resource.id)) + workers.append((resource.id, worker)) + + for worker in workers: + resource_id = worker[0] try: - flavor = nova_utils.get_flavor_by_id(nova, resource.id) + flavor = worker[1].get() if flavor: out.append(flavor) except NotFound: - logger.warn('Flavor cannot be located with ID %s', resource.id) + logger.warn('Flavor cannot be located with ID %s', resource_id) return out diff --git a/snaps/thread_utils.py b/snaps/thread_utils.py new file mode 100644 index 0000000..3a3eb4d --- /dev/null +++ b/snaps/thread_utils.py @@ -0,0 +1,27 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from multiprocessing.pool import ThreadPool + +_pool = None + + +# Define a thread pool with a limit for how many simultaneous API requests +# can be in progress at once. +def worker_pool(size=5): + global _pool + if _pool is None: + _pool = ThreadPool(processes=size) + return _pool -- cgit 1.2.3-korg