From: Alexandru Avadanii Date: Sun, 26 Mar 2017 19:16:47 +0200 Subject: [PATCH] Revert "Distributed serialization implementation" This reverts commit f3c440f76961ef161ab6650b3a41a3c775073ba0. --- nailgun/nailgun/consts.py | 4 - nailgun/nailgun/fixtures/openstack.yaml | 49 --- nailgun/nailgun/lcm/task_serializer.py | 15 +- nailgun/nailgun/lcm/transaction_serializer.py | 317 +------------------ .../nailgun/orchestrator/deployment_serializers.py | 1 - nailgun/nailgun/settings.py | 22 +- nailgun/nailgun/settings.yaml | 9 - .../fuel_statistics/installation_info.py | 10 - .../integration/test_cluster_changes_handler.py | 3 - .../test/unit/test_lcm_transaction_serializer.py | 351 +-------------------- nailgun/requirements.txt | 2 - 11 files changed, 14 insertions(+), 769 deletions(-) diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py index 1a2f09f..5490d6c 100644 --- a/nailgun/nailgun/consts.py +++ b/nailgun/nailgun/consts.py @@ -528,7 +528,3 @@ HYPERVISORS = Enum( ) DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci' - -SERIALIZATION_POLICY = Enum( - 'distributed' -) diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml index 5557710..8b4d763 100644 --- a/nailgun/nailgun/fixtures/openstack.yaml +++ b/nailgun/nailgun/fixtures/openstack.yaml @@ -1114,55 +1114,6 @@ group: "security" weight: 20 type: "radio" - serialization_policy: - value: "default" - values: - - data: "default" - label: "Default serialization" - description: "Run serialization on the master node only" - - data: "distributed" - label: "Distributed serialization" - description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only form that environment for wich serialization is performing." - label: "Serialization policy" - group: "general" - weight: 30 - type: "radio" - ds_use_discover: - group: "general" - label: "Use discovered nodes as workers for serialization" - type: "checkbox" - value: true - weight: 31 - restrictions: - - condition: "settings:common.serialization_policy.value != 'distributed'" - action: "hide" - ds_use_provisioned: - group: "general" - label: "Use provisioned nodes as workers for serialization" - type: "checkbox" - value: true - weight: 32 - restrictions: - - condition: "settings:common.serialization_policy.value != 'distributed'" - action: "hide" - ds_use_error: - group: "general" - label: "Use nodes in error state as workers for serialization" - type: "checkbox" - value: true - weight: 33 - restrictions: - - condition: "settings:common.serialization_policy.value != 'distributed'" - action: "hide" - ds_use_ready: - group: "general" - label: "Use ready nodes as workers for serialization" - type: "checkbox" - value: false - weight: 34 - restrictions: - - condition: "settings:common.serialization_policy.value != 'distributed'" - action: "hide" public_network_assignment: metadata: weight: 10 diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py index 0d70f36..c004115 100644 --- a/nailgun/nailgun/lcm/task_serializer.py +++ b/nailgun/nailgun/lcm/task_serializer.py @@ -110,8 +110,6 @@ class Context(object): return evaluate def get_formatter_context(self, node_id): - # TODO(akislitsky) remove formatter context from the - # tasks serialization workflow data = self._transaction.get_new_data(node_id) return { 'CLUSTER_ID': data.get('cluster', {}).get('id'), @@ -149,14 +147,9 @@ class DeploymentTaskSerializer(object): :return: the result """ - def serialize(self, node_id, formatter_context=None): - """Serialize task in expected by orchestrator format + def serialize(self, node_id): + """Serialize task in expected by orchestrator format. - If serialization is performed on the remote worker - we should pass formatter_context parameter with values - from the master node settings - - :param formatter_context: formatter context :param node_id: the target node_id """ @@ -164,12 +157,10 @@ class DeploymentTaskSerializer(object): "serialize task %s for node %s", self.task_template['id'], node_id ) - formatter_context = formatter_context \ - or self.context.get_formatter_context(node_id) task = utils.traverse( self.task_template, utils.text_format_safe, - formatter_context, + self.context.get_formatter_context(node_id), { 'yaql_exp': self.context.get_yaql_interpreter( node_id, self.task_template['id']) diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py index 32953c1..c02146b 100644 --- a/nailgun/nailgun/lcm/transaction_serializer.py +++ b/nailgun/nailgun/lcm/transaction_serializer.py @@ -14,21 +14,13 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime +from distutils.version import StrictVersion import multiprocessing -import os -from Queue import Queue -import shutil -import tempfile -import distributed -from distutils.version import StrictVersion import six -import toolz from nailgun import consts from nailgun import errors -from nailgun.lcm.task_serializer import Context from nailgun.lcm.task_serializer import TasksSerializersFactory from nailgun.logger import logger from nailgun.settings import settings @@ -136,308 +128,7 @@ class MultiProcessingConcurrencyPolicy(object): pool.join() -def _distributed_serialize_tasks_for_node(formatter_contexts_idx, - node_and_tasks, scattered_data): - """Remote serialization call for DistributedProcessingPolicy - - Code of the function is copied to the workers and executed there, thus - we are including all required imports inside the function. - - :param formatter_contexts_idx: dict of formatter contexts with node_id - value as key - :param node_and_tasks: list of node_id, task_data tuples - :param scattered_data: feature object, that points to data copied to - workers - :return: [(node_id, serialized), error] - """ - - try: - factory = TasksSerializersFactory(scattered_data['context']) - - # Restoring settings - settings.config = scattered_data['settings_config'] - for k in formatter_contexts_idx: - formatter_contexts_idx[k]['SETTINGS'] = settings - - except Exception as e: - logger.exception("Distributed serialization failed") - return [((None, None), e)] - - result = [] - - for node_and_task in node_and_tasks: - - node_id = None - try: - node_id, task = node_and_task - - logger.debug("Starting distributed node %s task %s serialization", - node_id, task['id']) - - formatter_context = formatter_contexts_idx[node_id] - - serializer = factory.create_serializer(task) - serialized = serializer.serialize( - node_id, formatter_context=formatter_context) - - logger.debug("Distributed node %s task %s serialization " - "result: %s", node_id, task['id'], serialized) - - result.append(((node_id, serialized), None)) - except Exception as e: - logger.exception("Distributed serialization failed") - result.append(((node_id, None), e)) - break - - logger.debug("Processed tasks count: %s", len(result)) - return result - - -class DistributedProcessingPolicy(object): - - def __init__(self): - self.sent_jobs = Queue() - self.sent_jobs_count = 0 - - def _consume_jobs(self, chunk_size=None): - """Consumes jobs - - If chunk_size is set function consumes specified number of - Finished tasks or less if sent_jobs_ids queue became empty. - If chunk_size is None function consumes jobs until - sent_jobs_ids queue became empty. - Jobs with statuses Cancelled, Abandoned, Terminated will be - resent and their ids added to sent_jobs_ids queue - - :param chunk_size: size of consuming chunk - :return: generator on job results - """ - logger.debug("Consuming jobs started") - - jobs_to_consume = [] - while not self.sent_jobs.empty(): - job = self.sent_jobs.get() - jobs_to_consume.append(job) - - if chunk_size is not None: - chunk_size -= 1 - if chunk_size <= 0: - break - - for ready_job in distributed.as_completed(jobs_to_consume): - results = ready_job.result() - self.sent_jobs_count -= 1 - - for result in results: - (node_id, serialized), exc = result - logger.debug("Got ready task for node %s, serialized: %s, " - "error: %s", node_id, serialized, exc) - if exc is not None: - raise exc - yield node_id, serialized - - logger.debug("Consuming jobs finished") - - def _get_formatter_context(self, task_context, formatter_contexts_idx, - node_id): - try: - return formatter_contexts_idx[node_id] - except KeyError: - pass - - logger.debug("Calculating formatter context for node %s", node_id) - formatter_context = task_context.get_formatter_context( - node_id) - # Settings file is already sent to the workers - formatter_context.pop('SETTINGS', None) - formatter_contexts_idx[node_id] = formatter_context - - return formatter_context - - def _upload_nailgun_code(self, job_cluster): - """Creates zip of current nailgun code and uploads it to workers - - TODO(akislitsky): add workers scope when it will be implemented - in the distributed library - - :param job_cluster: distributed.Client - """ - logger.debug("Compressing nailgun code") - file_dir = os.path.dirname(__file__) - nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..')) - archive = os.path.join(tempfile.gettempdir(), 'nailgun') - result = shutil.make_archive(archive, 'zip', nailgun_root_dir, - 'nailgun') - logger.debug("Nailgun code saved to: %s", result) - - logger.debug("Uploading nailgun archive %s to workers", result) - job_cluster.upload_file(result) - - def _scatter_data(self, job_cluster, context, workers): - logger.debug("Scattering data to workers started") - shared_data = {'context': context, 'settings_config': settings.config} - scattered = job_cluster.scatter(shared_data, broadcast=True, - workers=workers) - # Waiting data is scattered to workers - distributed.wait(scattered.values()) - logger.debug("Scattering data to workers finished") - - return scattered - - def _get_allowed_nodes_statuses(self, context): - """Extracts node statuses that allows distributed serialization""" - common = context.new.get('common', {}) - cluster = common.get('cluster', {}) - logger.debug("Getting allowed nodes statuses to use as serialization " - "workers for cluster %s", cluster.get('id')) - check_fields = { - 'ds_use_ready': consts.NODE_STATUSES.ready, - 'ds_use_provisioned': consts.NODE_STATUSES.provisioned, - 'ds_use_discover': consts.NODE_STATUSES.discover, - 'ds_use_error': consts.NODE_STATUSES.error - } - statuses = set() - for field, node_status in check_fields.items(): - if common.get(field): - statuses.add(node_status) - - logger.debug("Allowed nodes statuses to use as serialization workers " - "for cluster %s are: %s", cluster.get('id'), statuses) - return statuses - - def _get_allowed_nodes_ips(self, context): - """Filters online nodes from cluster by their status - - In the cluster settings we select nodes statuses allowed for - using in the distributed serialization. Accordingly to selected - statuses nodes are going to be filtered. - - :param context: TransactionContext - :return: set of allowed nodes ips - """ - ips = set() - allowed_statuses = self._get_allowed_nodes_statuses(context) - for node in six.itervalues(context.new.get('nodes', {})): - if node.get('status') in allowed_statuses: - ips.add(node.get('ip')) - ips.add(settings.MASTER_IP) - return ips - - def _get_allowed_workers(self, job_cluster, allowed_ips): - """Calculates workers addresses for distributed serialization - - Only workers that placed on the allowed nodes must be selected - for the serialization. - - :param job_cluster: distributed.Client - :param allowed_ips: allowed for serialization nodes ips - :return: list of workers addresses in format 'ip:port' - """ - logger.debug("Getting allowed workers") - workers = {} - - # Worker has address like tcp://ip:port - info = job_cluster.scheduler_info() - for worker_addr in six.iterkeys(info['workers']): - ip_port = worker_addr.split('//')[1] - ip = ip_port.split(':')[0] - if ip not in allowed_ips: - continue - try: - pool = workers[ip] - pool.add(ip_port) - except KeyError: - workers[ip] = set([ip_port]) - - return list(toolz.itertoolz.concat(six.itervalues(workers))) - - def execute(self, context, _, tasks): - """Executes task serialization on distributed nodes - - :param context: the transaction context - :param _: serializers factory - :param tasks: the tasks to serialize - :return sequence of serialized tasks - """ - logger.debug("Performing distributed tasks processing") - sched_address = '{0}:{1}'.format(settings.MASTER_IP, - settings.LCM_DS_JOB_SHEDULER_PORT) - job_cluster = distributed.Client(sched_address) - - allowed_ips = self._get_allowed_nodes_ips(context) - workers = self._get_allowed_workers(job_cluster, allowed_ips) - logger.debug("Allowed workers list for serialization: %s", workers) - workers_ips = set([ip_port.split(':')[0] for ip_port in workers]) - logger.debug("Allowed workers ips list for serialization: %s", - workers_ips) - - task_context = Context(context) - formatter_contexts_idx = {} - workers_num = len(workers) - max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF - logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue) - - start = datetime.datetime.utcnow() - tasks_count = 0 - - try: - self._upload_nailgun_code(job_cluster) - scattered = self._scatter_data(job_cluster, context, workers) - - for tasks_chunk in toolz.partition_all( - settings.LCM_DS_TASKS_PER_JOB, tasks): - - formatter_contexts_for_tasks = {} - - # Collecting required contexts for tasks - for task in tasks_chunk: - node_id, task_data = task - formatter_context = self._get_formatter_context( - task_context, formatter_contexts_idx, node_id) - if node_id not in formatter_contexts_for_tasks: - formatter_contexts_for_tasks[node_id] = \ - formatter_context - - logger.debug("Submitting job for tasks chunk: %s", tasks_chunk) - job = job_cluster.submit( - _distributed_serialize_tasks_for_node, - formatter_contexts_for_tasks, - tasks_chunk, - scattered, - workers=workers_ips - ) - - self.sent_jobs.put(job) - self.sent_jobs_count += 1 - - # We are limit the max number of tasks by the number of nodes - # which are used in the serialization - if self.sent_jobs_count >= max_jobs_in_queue: - for result in self._consume_jobs(chunk_size=workers_num): - tasks_count += 1 - yield result - - # We have no tasks any more but have unconsumed jobs - for result in self._consume_jobs(): - tasks_count += 1 - yield result - finally: - end = datetime.datetime.utcnow() - logger.debug("Distributed tasks processing finished. " - "Total time: %s. Tasks processed: %s", - end - start, tasks_count) - job_cluster.shutdown() - - -def is_distributed_processing_enabled(context): - common = context.new.get('common', {}) - return common.get('serialization_policy') == \ - consts.SERIALIZATION_POLICY.distributed - - -def get_processing_policy(context): - if is_distributed_processing_enabled(context): - return DistributedProcessingPolicy() +def get_concurrency_policy(): cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR if not cpu_num: try: @@ -471,7 +162,7 @@ class TransactionSerializer(object): # ids of nodes in this group and how many nodes in this group can fail # and deployment will not be interrupted self.fault_tolerance_groups = [] - self.processing_policy = get_processing_policy(context) + self.concurrency_policy = get_concurrency_policy() @classmethod def serialize(cls, context, tasks, resolver): @@ -525,7 +216,7 @@ class TransactionSerializer(object): :param tasks: the deployment tasks :return the mapping tasks per node """ - serialized = self.processing_policy.execute( + serialized = self.concurrency_policy.execute( self.context, self.serializer_factory_class, self.expand_tasks(tasks) diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py index 99d52a2..9473733 100644 --- a/nailgun/nailgun/orchestrator/deployment_serializers.py +++ b/nailgun/nailgun/orchestrator/deployment_serializers.py @@ -221,7 +221,6 @@ class DeploymentMultinodeSerializer(object): 'role': role, 'vms_conf': node.vms_conf, 'fail_if_error': role in self.critical_roles, - 'ip': node.ip, # TODO(eli): need to remove, requried for the fake thread only 'online': node.online, } diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py index 1f9222b..eee08c9 100644 --- a/nailgun/nailgun/settings.py +++ b/nailgun/nailgun/settings.py @@ -38,21 +38,7 @@ class NailgunSettings(object): if test_config: settings_files.append(test_config) - # If settings.yaml doesn't exist we should have default - # config structure. Nailgun without settings is used - # when we distribute source code to the workers for - # distributed serialization - self.config = { - 'VERSION': {}, - 'DATABASE': { - 'engine': 'postgresql', - 'name': '', - 'host': '', - 'port': '0', - 'user': '', - 'passwd': '' - } - } + self.config = {} for sf in settings_files: try: logger.debug("Trying to read config file %s" % sf) @@ -61,9 +47,9 @@ class NailgunSettings(object): logger.error("Error while reading config file %s: %s" % (sf, str(e))) - self.config['VERSION']['api'] = self.config.get('API') + self.config['VERSION']['api'] = self.config['API'] self.config['VERSION']['feature_groups'] = \ - self.config.get('FEATURE_GROUPS') + self.config['FEATURE_GROUPS'] fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE) if fuel_release: @@ -75,7 +61,7 @@ class NailgunSettings(object): self.config['VERSION']['openstack_version'] = \ fuel_openstack_version - if int(self.config.get("DEVELOPMENT", 0)): + if int(self.config.get("DEVELOPMENT")): logger.info("DEVELOPMENT MODE ON:") here = os.path.abspath( os.path.join(os.path.dirname(__file__), '..') diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml index c2faaa8..92b3518 100644 --- a/nailgun/nailgun/settings.yaml +++ b/nailgun/nailgun/settings.yaml @@ -177,15 +177,6 @@ YAQL_MEMORY_QUOTA: 104857600 LCM_CHECK_TASK_VERSION: False -# Coefficient for calculation max jobs queue length. If jobs number reaches the -# len(nodes) * load_coef we stop generate and start consume of jobs. -LCM_DS_NODE_LOAD_COEFF: 2 -# Port of dask-scheduler on the master node -LCM_DS_JOB_SHEDULER_PORT: 8002 -# Size of tasks chunk sending to the distributed worker -LCM_DS_TASKS_PER_JOB: 100 - - DPDK_MAX_CPUS_PER_NIC: 4 TRUNCATE_LOG_ENTRIES: 100 diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py index d024cdd..482d8c5 100644 --- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py +++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py @@ -56,16 +56,6 @@ class InstallationInfo(object): 'propagate_task_deploy', None), WhiteListRule(('common', 'security_groups', 'value'), 'security_groups', None), - WhiteListRule(('common', 'serialization_policy', 'value'), - 'serialization_policy', None), - WhiteListRule(('common', 'ds_use_discover', 'value'), - 'ds_use_discover', None), - WhiteListRule(('common', 'ds_use_provisioned', 'value'), - 'ds_use_provisioned', None), - WhiteListRule(('common', 'ds_use_ready', 'value'), - 'ds_use_ready', None), - WhiteListRule(('common', 'ds_use_error', 'value'), - 'ds_use_error', None), WhiteListRule(('corosync', 'verified', 'value'), 'corosync_verified', None), diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py index b8f4e98..a8661ab 100644 --- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py +++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py @@ -187,7 +187,6 @@ class TestHandlers(BaseIntegrationTest): 'fail_if_error': is_critical, 'vms_conf': [], 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), - 'ip': node.ip, 'network_data': { 'eth1': { @@ -604,7 +603,6 @@ class TestHandlers(BaseIntegrationTest): 'online': node.online, 'fail_if_error': is_critical, 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), - 'ip': node.ip, 'priority': 100, 'vms_conf': [], 'network_scheme': { @@ -1098,7 +1096,6 @@ class TestHandlers(BaseIntegrationTest): 'fail_if_error': is_critical, 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), 'priority': 100, - 'ip': node.ip, 'vms_conf': [], 'network_scheme': { diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py index 9751d55..6450e19 100644 --- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py +++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py @@ -14,24 +14,20 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import exceptions import mock import multiprocessing.dummy from nailgun import consts from nailgun import errors from nailgun import lcm -from nailgun.lcm import TransactionContext -from nailgun.settings import settings -from nailgun.test.base import BaseTestCase from nailgun.utils.resolvers import TagResolver +from nailgun.test.base import BaseUnitTest -class TestTransactionSerializer(BaseTestCase): + +class TestTransactionSerializer(BaseUnitTest): @classmethod def setUpClass(cls): - super(TestTransactionSerializer, cls).setUpClass() cls.tasks = [ { 'id': 'task1', 'roles': ['controller'], @@ -466,344 +462,3 @@ class TestTransactionSerializer(BaseTestCase): 9, lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10) ) - - def _get_context_for_distributed_serialization(self): - new = copy.deepcopy(self.context.new) - new['common']['serialization_policy'] = \ - consts.SERIALIZATION_POLICY.distributed - return TransactionContext(new) - - @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') - @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') - def test_distributed_serialization(self, _, as_completed): - context = self._get_context_for_distributed_serialization() - - with mock.patch( - 'nailgun.lcm.transaction_serializer.distributed.Client' - ) as job_cluster: - job = mock.Mock() - job.result.return_value = [ - (('1', {"id": "task1", "type": "skipped"}), None) - ] - - submit = mock.Mock() - submit.return_value = job - - as_completed.return_value = [job] - - job_cluster.return_value.submit = submit - job_cluster.return_value.scheduler_info.return_value = \ - {'workers': {'tcp://worker': {}}} - - lcm.TransactionSerializer.serialize( - context, self.tasks, self.resolver) - self.assertTrue(submit.called) - # 4 controller task + 1 compute + 1 cinder - self.assertTrue(6, submit.call_count) - - @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') - @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') - @mock.patch('nailgun.lcm.transaction_serializer.' - 'DistributedProcessingPolicy._get_formatter_context') - def test_distributed_serialization_workers_scope(self, formatter_context, - as_completed, _): - context = self._get_context_for_distributed_serialization() - - node_id = '1' - task = { - 'id': 'task1', 'roles': ['controller'], - 'type': 'puppet', 'version': '2.0.0' - } - - with mock.patch( - 'nailgun.lcm.transaction_serializer.distributed.Client' - ) as job_cluster: - - # Mocking job processing - job = mock.Mock() - job.result.return_value = [((node_id, task), None)] - - submit = mock.Mock() - submit.return_value = job - - as_completed.return_value = [job] - - scatter = mock.Mock() - job_cluster.return_value.scatter = scatter - - job_cluster.return_value.scatter.return_value = {} - job_cluster.return_value.submit = submit - - formatter_context.return_value = {node_id: {}} - - # Configuring available workers - job_cluster.return_value.scheduler_info.return_value = \ - { - 'workers': { - 'tcp://{0}'.format(settings.MASTER_IP): {}, - 'tcp://192.168.0.1:33334': {}, - 'tcp://127.0.0.2:33335': {}, - } - } - - # Performing serialization - lcm.TransactionSerializer.serialize( - context, [task], self.resolver - ) - - # Checking data is scattered only to expected workers - scatter.assert_called_once() - scatter.assert_called_with( - {'context': context, 'settings_config': settings.config}, - broadcast=True, - workers=[settings.MASTER_IP] - ) - - # Checking submit job only to expected workers - submit.assert_called_once() - serializer = lcm.transaction_serializer - submit.assert_called_with( - serializer._distributed_serialize_tasks_for_node, - {node_id: formatter_context()}, - ((node_id, task),), - job_cluster().scatter(), - workers=set([settings.MASTER_IP]) - ) - - def test_distributed_serialization_get_allowed_nodes_ips(self): - policy = lcm.transaction_serializer.DistributedProcessingPolicy() - - context_data = { - 'common': { - 'serialization_policy': - consts.SERIALIZATION_POLICY.distributed, - 'ds_use_error': True, - 'ds_use_provisioned': True, - 'ds_use_discover': True, - 'ds_use_ready': False - }, - 'nodes': { - '1': {'status': consts.NODE_STATUSES.error, - 'ip': '10.20.0.3'}, - '2': {'status': consts.NODE_STATUSES.provisioned, - 'ip': '10.20.0.4'}, - '3': {'status': consts.NODE_STATUSES.discover, - 'ip': '10.20.0.5'}, - '4': {'status': consts.NODE_STATUSES.ready, - 'ip': '10.20.0.6'}, - } - } - - actual = policy._get_allowed_nodes_ips( - TransactionContext(context_data)) - self.assertItemsEqual( - [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'], - actual - ) - - def test_distributed_serialization_get_allowed_nodes_statuses(self): - policy = lcm.transaction_serializer.DistributedProcessingPolicy() - context_data = {} - actual = policy._get_allowed_nodes_statuses( - TransactionContext(context_data)) - self.assertItemsEqual([], actual) - - context_data['common'] = { - 'ds_use_discover': False, - 'ds_use_provisioned': False, - 'ds_use_error': False, - 'ds_use_ready': False - } - actual = policy._get_allowed_nodes_statuses( - TransactionContext(context_data)) - self.assertItemsEqual([], actual) - - context_data['common']['ds_use_discover'] = True - actual = policy._get_allowed_nodes_statuses( - TransactionContext(context_data)) - expected = [consts.NODE_STATUSES.discover] - self.assertItemsEqual(expected, actual) - - context_data['common']['ds_use_provisioned'] = True - actual = policy._get_allowed_nodes_statuses( - TransactionContext(context_data)) - expected = [consts.NODE_STATUSES.discover, - consts.NODE_STATUSES.provisioned] - self.assertItemsEqual(expected, actual) - - context_data['common']['ds_use_error'] = True - actual = policy._get_allowed_nodes_statuses( - TransactionContext(context_data)) - expected = [consts.NODE_STATUSES.discover, - consts.NODE_STATUSES.provisioned, - consts.NODE_STATUSES.error] - self.assertItemsEqual(expected, actual) - - context_data['common']['ds_use_ready'] = True - actual = policy._get_allowed_nodes_statuses( - TransactionContext(context_data)) - expected = [consts.NODE_STATUSES.discover, - consts.NODE_STATUSES.provisioned, - consts.NODE_STATUSES.error, - consts.NODE_STATUSES.ready] - self.assertItemsEqual(expected, actual) - - def test_distributed_serialization_get_allowed_workers(self): - policy = lcm.transaction_serializer.DistributedProcessingPolicy() - - with mock.patch( - 'nailgun.lcm.transaction_serializer.distributed.Client' - ) as job_cluster: - job_cluster.scheduler_info.return_value = \ - {'workers': { - 'tcp://10.20.0.2:1': {}, - 'tcp://10.20.0.2:2': {}, - 'tcp://10.20.0.3:1': {}, - 'tcp://10.20.0.3:2': {}, - 'tcp://10.20.0.3:3': {}, - 'tcp://10.20.0.4:1': {}, - 'tcp://10.20.0.5:1': {} - }} - allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5']) - - expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1', - '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1'] - actual = policy._get_allowed_workers(job_cluster, allowed_ips) - self.assertItemsEqual(expected, actual) - - def test_distributed_serialization_serialize_task(self): - task = { - 'id': 'task1', 'roles': ['controller'], - 'type': 'puppet', 'version': '2.0.0', - 'parameters': { - 'master_ip': '{MN_IP}', - 'host': {'yaql_exp': '$.public_ssl.hostname'}, - 'attr': {'yaql_exp': '$node.attributes.a_str'} - } - } - - formatter_contexts_idx = { - '1': {'MN_IP': '10.0.0.1'}, - '2': {} - } - scattered_data = { - 'settings_config': settings.config, - 'context': self.context - } - - serializer = lcm.transaction_serializer - actual = serializer._distributed_serialize_tasks_for_node( - formatter_contexts_idx, [('1', task), ('2', task)], scattered_data) - - expected = [ - ( - ( - '1', - { - 'id': 'task1', - 'type': 'puppet', - 'parameters': { - 'cwd': '/', - 'master_ip': '10.0.0.1', - 'host': 'localhost', - 'attr': 'text1' - }, - 'fail_on_error': True - } - ), - None - ), - ( - ( - '2', - { - 'id': 'task1', - 'type': 'puppet', - 'parameters': { - 'cwd': '/', - 'master_ip': '{MN_IP}', - 'host': 'localhost', - 'attr': 'text2' - }, - 'fail_on_error': True - } - ), - None - ) - ] - - self.assertItemsEqual(expected, actual) - - def test_distributed_serialization_serialize_task_failure(self): - task = { - 'id': 'task1', 'roles': ['controller'], - 'type': 'puppet', 'version': '2.0.0', - 'parameters': { - 'fake': {'yaql_exp': '$.some.fake_param'} - } - } - - formatter_contexts_idx = {'2': {}} - scattered_data = { - 'settings_config': settings.config, - 'context': self.context - } - - serializer = lcm.transaction_serializer - result = serializer._distributed_serialize_tasks_for_node( - formatter_contexts_idx, [('2', task)], scattered_data) - (_, __), err = result[0] - self.assertIsInstance(err, exceptions.KeyError) - - -class TestConcurrencyPolicy(BaseTestCase): - - @mock.patch( - 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', - return_value=1 - ) - def test_one_cpu(self, cpu_count): - policy = lcm.transaction_serializer.get_processing_policy( - lcm.TransactionContext({})) - self.assertIsInstance( - policy, - lcm.transaction_serializer.SingleWorkerConcurrencyPolicy - ) - self.assertTrue(cpu_count.is_called) - - @mock.patch( - 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', - return_value=0 - ) - def test_zero_cpu(self, cpu_count): - policy = lcm.transaction_serializer.get_processing_policy( - lcm.TransactionContext({})) - self.assertIsInstance( - policy, - lcm.transaction_serializer.SingleWorkerConcurrencyPolicy - ) - self.assertTrue(cpu_count.is_called) - - @mock.patch( - 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', - side_effect=NotImplementedError - ) - def test_cpu_count_not_implemented(self, cpu_count): - policy = lcm.transaction_serializer.get_processing_policy( - lcm.TransactionContext({})) - self.assertIsInstance( - policy, - lcm.transaction_serializer.SingleWorkerConcurrencyPolicy - ) - self.assertTrue(cpu_count.is_called) - - def test_distributed_serialization_enabled_in_cluster(self): - context_data = {'common': { - 'serialization_policy': consts.SERIALIZATION_POLICY.distributed - }} - policy = lcm.transaction_serializer.get_processing_policy( - lcm.TransactionContext(context_data)) - self.assertIsInstance( - policy, - lcm.transaction_serializer.DistributedProcessingPolicy - ) diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt index c702e8a..96bed25 100644 --- a/nailgun/requirements.txt +++ b/nailgun/requirements.txt @@ -47,5 +47,3 @@ stevedore>=1.5.0 # See: https://bugs.launchpad.net/fuel/+bug/1519727 setuptools<=18.5 yaql>=1.0.0 -# Distributed nodes serialization -distributed==1.16.0