diff options
author | Alexandru Avadanii <Alexandru.Avadanii@enea.com> | 2017-03-26 19:18:52 +0200 |
---|---|---|
committer | Alexandru Avadanii <Alexandru.Avadanii@enea.com> | 2017-03-26 19:31:07 +0200 |
commit | 8b28c44a4be6beef8a71e672f543001bcc0a080f (patch) | |
tree | 9773f9cd21f403b7aa21249e9e720fc423979462 /build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch | |
parent | cc3f3226b0e48c035549247d8a716f91675955f8 (diff) |
Revert: Blueprint "bp/distributed-serialization"
Until upstream fixes bug [1], partially revert blueprint
implementation [2]:
- fuel-agent: no-op, we overwrite fuel_bootstrap_cli.yaml anyway;
- fuel-library: no-op, keep newly added port config in place;
- fuel-web: revert all blueprint related changes;
NOTE: When this blueprint becomes part of Fuel@OPNFV, we need to
update our "fuel_bootstrap_cli.yaml".
[1] https://bugs.launchpad.net/fuel/+bug/1676197
[2] https://review.openstack.org/#/q/topic:bp/
distributed-serialization+status:merged+branch:stable/newton
JIRA: FUEL-265
Change-Id: Icf392a3446e5c89c2592d1733bc1f3401b7d69ad
Signed-off-by: Alexandru Avadanii <Alexandru.Avadanii@enea.com>
Diffstat (limited to 'build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch')
-rw-r--r-- | build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch | 1006 |
1 files changed, 1006 insertions, 0 deletions
diff --git a/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch b/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch new file mode 100644 index 000000000..152197e74 --- /dev/null +++ b/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch @@ -0,0 +1,1006 @@ +From: Alexandru Avadanii <Alexandru.Avadanii@enea.com> +Date: Sun, 26 Mar 2017 19:16:47 +0200 +Subject: [PATCH] Revert "Distributed serialization implementation" + +This reverts commit f3c440f76961ef161ab6650b3a41a3c775073ba0. +--- + nailgun/nailgun/consts.py | 4 - + nailgun/nailgun/fixtures/openstack.yaml | 49 --- + nailgun/nailgun/lcm/task_serializer.py | 15 +- + nailgun/nailgun/lcm/transaction_serializer.py | 317 +------------------ + .../nailgun/orchestrator/deployment_serializers.py | 1 - + nailgun/nailgun/settings.py | 22 +- + nailgun/nailgun/settings.yaml | 9 - + .../fuel_statistics/installation_info.py | 10 - + .../integration/test_cluster_changes_handler.py | 3 - + .../test/unit/test_lcm_transaction_serializer.py | 351 +-------------------- + nailgun/requirements.txt | 2 - + 11 files changed, 14 insertions(+), 769 deletions(-) + +diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py +index 1a2f09f..5490d6c 100644 +--- a/nailgun/nailgun/consts.py ++++ b/nailgun/nailgun/consts.py +@@ -528,7 +528,3 @@ HYPERVISORS = Enum( + ) + + DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci' +- +-SERIALIZATION_POLICY = Enum( +- 'distributed' +-) +diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml +index 5557710..8b4d763 100644 +--- a/nailgun/nailgun/fixtures/openstack.yaml ++++ b/nailgun/nailgun/fixtures/openstack.yaml +@@ -1114,55 +1114,6 @@ + group: "security" + weight: 20 + type: "radio" +- serialization_policy: +- value: "default" +- values: +- - data: "default" +- label: "Default serialization" +- description: "Run serialization on the master node only" +- - data: "distributed" +- label: "Distributed serialization" +- description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only form that environment for wich serialization is performing." +- label: "Serialization policy" +- group: "general" +- weight: 30 +- type: "radio" +- ds_use_discover: +- group: "general" +- label: "Use discovered nodes as workers for serialization" +- type: "checkbox" +- value: true +- weight: 31 +- restrictions: +- - condition: "settings:common.serialization_policy.value != 'distributed'" +- action: "hide" +- ds_use_provisioned: +- group: "general" +- label: "Use provisioned nodes as workers for serialization" +- type: "checkbox" +- value: true +- weight: 32 +- restrictions: +- - condition: "settings:common.serialization_policy.value != 'distributed'" +- action: "hide" +- ds_use_error: +- group: "general" +- label: "Use nodes in error state as workers for serialization" +- type: "checkbox" +- value: true +- weight: 33 +- restrictions: +- - condition: "settings:common.serialization_policy.value != 'distributed'" +- action: "hide" +- ds_use_ready: +- group: "general" +- label: "Use ready nodes as workers for serialization" +- type: "checkbox" +- value: false +- weight: 34 +- restrictions: +- - condition: "settings:common.serialization_policy.value != 'distributed'" +- action: "hide" + public_network_assignment: + metadata: + weight: 10 +diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py +index 0d70f36..c004115 100644 +--- a/nailgun/nailgun/lcm/task_serializer.py ++++ b/nailgun/nailgun/lcm/task_serializer.py +@@ -110,8 +110,6 @@ class Context(object): + return evaluate + + def get_formatter_context(self, node_id): +- # TODO(akislitsky) remove formatter context from the +- # tasks serialization workflow + data = self._transaction.get_new_data(node_id) + return { + 'CLUSTER_ID': data.get('cluster', {}).get('id'), +@@ -149,14 +147,9 @@ class DeploymentTaskSerializer(object): + :return: the result + """ + +- def serialize(self, node_id, formatter_context=None): +- """Serialize task in expected by orchestrator format ++ def serialize(self, node_id): ++ """Serialize task in expected by orchestrator format. + +- If serialization is performed on the remote worker +- we should pass formatter_context parameter with values +- from the master node settings +- +- :param formatter_context: formatter context + :param node_id: the target node_id + """ + +@@ -164,12 +157,10 @@ class DeploymentTaskSerializer(object): + "serialize task %s for node %s", + self.task_template['id'], node_id + ) +- formatter_context = formatter_context \ +- or self.context.get_formatter_context(node_id) + task = utils.traverse( + self.task_template, + utils.text_format_safe, +- formatter_context, ++ self.context.get_formatter_context(node_id), + { + 'yaql_exp': self.context.get_yaql_interpreter( + node_id, self.task_template['id']) +diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py +index 32953c1..c02146b 100644 +--- a/nailgun/nailgun/lcm/transaction_serializer.py ++++ b/nailgun/nailgun/lcm/transaction_serializer.py +@@ -14,21 +14,13 @@ + # License for the specific language governing permissions and limitations + # under the License. + +-import datetime ++from distutils.version import StrictVersion + import multiprocessing +-import os +-from Queue import Queue +-import shutil +-import tempfile + +-import distributed +-from distutils.version import StrictVersion + import six +-import toolz + + from nailgun import consts + from nailgun import errors +-from nailgun.lcm.task_serializer import Context + from nailgun.lcm.task_serializer import TasksSerializersFactory + from nailgun.logger import logger + from nailgun.settings import settings +@@ -136,308 +128,7 @@ class MultiProcessingConcurrencyPolicy(object): + pool.join() + + +-def _distributed_serialize_tasks_for_node(formatter_contexts_idx, +- node_and_tasks, scattered_data): +- """Remote serialization call for DistributedProcessingPolicy +- +- Code of the function is copied to the workers and executed there, thus +- we are including all required imports inside the function. +- +- :param formatter_contexts_idx: dict of formatter contexts with node_id +- value as key +- :param node_and_tasks: list of node_id, task_data tuples +- :param scattered_data: feature object, that points to data copied to +- workers +- :return: [(node_id, serialized), error] +- """ +- +- try: +- factory = TasksSerializersFactory(scattered_data['context']) +- +- # Restoring settings +- settings.config = scattered_data['settings_config'] +- for k in formatter_contexts_idx: +- formatter_contexts_idx[k]['SETTINGS'] = settings +- +- except Exception as e: +- logger.exception("Distributed serialization failed") +- return [((None, None), e)] +- +- result = [] +- +- for node_and_task in node_and_tasks: +- +- node_id = None +- try: +- node_id, task = node_and_task +- +- logger.debug("Starting distributed node %s task %s serialization", +- node_id, task['id']) +- +- formatter_context = formatter_contexts_idx[node_id] +- +- serializer = factory.create_serializer(task) +- serialized = serializer.serialize( +- node_id, formatter_context=formatter_context) +- +- logger.debug("Distributed node %s task %s serialization " +- "result: %s", node_id, task['id'], serialized) +- +- result.append(((node_id, serialized), None)) +- except Exception as e: +- logger.exception("Distributed serialization failed") +- result.append(((node_id, None), e)) +- break +- +- logger.debug("Processed tasks count: %s", len(result)) +- return result +- +- +-class DistributedProcessingPolicy(object): +- +- def __init__(self): +- self.sent_jobs = Queue() +- self.sent_jobs_count = 0 +- +- def _consume_jobs(self, chunk_size=None): +- """Consumes jobs +- +- If chunk_size is set function consumes specified number of +- Finished tasks or less if sent_jobs_ids queue became empty. +- If chunk_size is None function consumes jobs until +- sent_jobs_ids queue became empty. +- Jobs with statuses Cancelled, Abandoned, Terminated will be +- resent and their ids added to sent_jobs_ids queue +- +- :param chunk_size: size of consuming chunk +- :return: generator on job results +- """ +- logger.debug("Consuming jobs started") +- +- jobs_to_consume = [] +- while not self.sent_jobs.empty(): +- job = self.sent_jobs.get() +- jobs_to_consume.append(job) +- +- if chunk_size is not None: +- chunk_size -= 1 +- if chunk_size <= 0: +- break +- +- for ready_job in distributed.as_completed(jobs_to_consume): +- results = ready_job.result() +- self.sent_jobs_count -= 1 +- +- for result in results: +- (node_id, serialized), exc = result +- logger.debug("Got ready task for node %s, serialized: %s, " +- "error: %s", node_id, serialized, exc) +- if exc is not None: +- raise exc +- yield node_id, serialized +- +- logger.debug("Consuming jobs finished") +- +- def _get_formatter_context(self, task_context, formatter_contexts_idx, +- node_id): +- try: +- return formatter_contexts_idx[node_id] +- except KeyError: +- pass +- +- logger.debug("Calculating formatter context for node %s", node_id) +- formatter_context = task_context.get_formatter_context( +- node_id) +- # Settings file is already sent to the workers +- formatter_context.pop('SETTINGS', None) +- formatter_contexts_idx[node_id] = formatter_context +- +- return formatter_context +- +- def _upload_nailgun_code(self, job_cluster): +- """Creates zip of current nailgun code and uploads it to workers +- +- TODO(akislitsky): add workers scope when it will be implemented +- in the distributed library +- +- :param job_cluster: distributed.Client +- """ +- logger.debug("Compressing nailgun code") +- file_dir = os.path.dirname(__file__) +- nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..')) +- archive = os.path.join(tempfile.gettempdir(), 'nailgun') +- result = shutil.make_archive(archive, 'zip', nailgun_root_dir, +- 'nailgun') +- logger.debug("Nailgun code saved to: %s", result) +- +- logger.debug("Uploading nailgun archive %s to workers", result) +- job_cluster.upload_file(result) +- +- def _scatter_data(self, job_cluster, context, workers): +- logger.debug("Scattering data to workers started") +- shared_data = {'context': context, 'settings_config': settings.config} +- scattered = job_cluster.scatter(shared_data, broadcast=True, +- workers=workers) +- # Waiting data is scattered to workers +- distributed.wait(scattered.values()) +- logger.debug("Scattering data to workers finished") +- +- return scattered +- +- def _get_allowed_nodes_statuses(self, context): +- """Extracts node statuses that allows distributed serialization""" +- common = context.new.get('common', {}) +- cluster = common.get('cluster', {}) +- logger.debug("Getting allowed nodes statuses to use as serialization " +- "workers for cluster %s", cluster.get('id')) +- check_fields = { +- 'ds_use_ready': consts.NODE_STATUSES.ready, +- 'ds_use_provisioned': consts.NODE_STATUSES.provisioned, +- 'ds_use_discover': consts.NODE_STATUSES.discover, +- 'ds_use_error': consts.NODE_STATUSES.error +- } +- statuses = set() +- for field, node_status in check_fields.items(): +- if common.get(field): +- statuses.add(node_status) +- +- logger.debug("Allowed nodes statuses to use as serialization workers " +- "for cluster %s are: %s", cluster.get('id'), statuses) +- return statuses +- +- def _get_allowed_nodes_ips(self, context): +- """Filters online nodes from cluster by their status +- +- In the cluster settings we select nodes statuses allowed for +- using in the distributed serialization. Accordingly to selected +- statuses nodes are going to be filtered. +- +- :param context: TransactionContext +- :return: set of allowed nodes ips +- """ +- ips = set() +- allowed_statuses = self._get_allowed_nodes_statuses(context) +- for node in six.itervalues(context.new.get('nodes', {})): +- if node.get('status') in allowed_statuses: +- ips.add(node.get('ip')) +- ips.add(settings.MASTER_IP) +- return ips +- +- def _get_allowed_workers(self, job_cluster, allowed_ips): +- """Calculates workers addresses for distributed serialization +- +- Only workers that placed on the allowed nodes must be selected +- for the serialization. +- +- :param job_cluster: distributed.Client +- :param allowed_ips: allowed for serialization nodes ips +- :return: list of workers addresses in format 'ip:port' +- """ +- logger.debug("Getting allowed workers") +- workers = {} +- +- # Worker has address like tcp://ip:port +- info = job_cluster.scheduler_info() +- for worker_addr in six.iterkeys(info['workers']): +- ip_port = worker_addr.split('//')[1] +- ip = ip_port.split(':')[0] +- if ip not in allowed_ips: +- continue +- try: +- pool = workers[ip] +- pool.add(ip_port) +- except KeyError: +- workers[ip] = set([ip_port]) +- +- return list(toolz.itertoolz.concat(six.itervalues(workers))) +- +- def execute(self, context, _, tasks): +- """Executes task serialization on distributed nodes +- +- :param context: the transaction context +- :param _: serializers factory +- :param tasks: the tasks to serialize +- :return sequence of serialized tasks +- """ +- logger.debug("Performing distributed tasks processing") +- sched_address = '{0}:{1}'.format(settings.MASTER_IP, +- settings.LCM_DS_JOB_SHEDULER_PORT) +- job_cluster = distributed.Client(sched_address) +- +- allowed_ips = self._get_allowed_nodes_ips(context) +- workers = self._get_allowed_workers(job_cluster, allowed_ips) +- logger.debug("Allowed workers list for serialization: %s", workers) +- workers_ips = set([ip_port.split(':')[0] for ip_port in workers]) +- logger.debug("Allowed workers ips list for serialization: %s", +- workers_ips) +- +- task_context = Context(context) +- formatter_contexts_idx = {} +- workers_num = len(workers) +- max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF +- logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue) +- +- start = datetime.datetime.utcnow() +- tasks_count = 0 +- +- try: +- self._upload_nailgun_code(job_cluster) +- scattered = self._scatter_data(job_cluster, context, workers) +- +- for tasks_chunk in toolz.partition_all( +- settings.LCM_DS_TASKS_PER_JOB, tasks): +- +- formatter_contexts_for_tasks = {} +- +- # Collecting required contexts for tasks +- for task in tasks_chunk: +- node_id, task_data = task +- formatter_context = self._get_formatter_context( +- task_context, formatter_contexts_idx, node_id) +- if node_id not in formatter_contexts_for_tasks: +- formatter_contexts_for_tasks[node_id] = \ +- formatter_context +- +- logger.debug("Submitting job for tasks chunk: %s", tasks_chunk) +- job = job_cluster.submit( +- _distributed_serialize_tasks_for_node, +- formatter_contexts_for_tasks, +- tasks_chunk, +- scattered, +- workers=workers_ips +- ) +- +- self.sent_jobs.put(job) +- self.sent_jobs_count += 1 +- +- # We are limit the max number of tasks by the number of nodes +- # which are used in the serialization +- if self.sent_jobs_count >= max_jobs_in_queue: +- for result in self._consume_jobs(chunk_size=workers_num): +- tasks_count += 1 +- yield result +- +- # We have no tasks any more but have unconsumed jobs +- for result in self._consume_jobs(): +- tasks_count += 1 +- yield result +- finally: +- end = datetime.datetime.utcnow() +- logger.debug("Distributed tasks processing finished. " +- "Total time: %s. Tasks processed: %s", +- end - start, tasks_count) +- job_cluster.shutdown() +- +- +-def is_distributed_processing_enabled(context): +- common = context.new.get('common', {}) +- return common.get('serialization_policy') == \ +- consts.SERIALIZATION_POLICY.distributed +- +- +-def get_processing_policy(context): +- if is_distributed_processing_enabled(context): +- return DistributedProcessingPolicy() ++def get_concurrency_policy(): + cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR + if not cpu_num: + try: +@@ -471,7 +162,7 @@ class TransactionSerializer(object): + # ids of nodes in this group and how many nodes in this group can fail + # and deployment will not be interrupted + self.fault_tolerance_groups = [] +- self.processing_policy = get_processing_policy(context) ++ self.concurrency_policy = get_concurrency_policy() + + @classmethod + def serialize(cls, context, tasks, resolver): +@@ -525,7 +216,7 @@ class TransactionSerializer(object): + :param tasks: the deployment tasks + :return the mapping tasks per node + """ +- serialized = self.processing_policy.execute( ++ serialized = self.concurrency_policy.execute( + self.context, + self.serializer_factory_class, + self.expand_tasks(tasks) +diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py +index 99d52a2..9473733 100644 +--- a/nailgun/nailgun/orchestrator/deployment_serializers.py ++++ b/nailgun/nailgun/orchestrator/deployment_serializers.py +@@ -221,7 +221,6 @@ class DeploymentMultinodeSerializer(object): + 'role': role, + 'vms_conf': node.vms_conf, + 'fail_if_error': role in self.critical_roles, +- 'ip': node.ip, + # TODO(eli): need to remove, requried for the fake thread only + 'online': node.online, + } +diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py +index 1f9222b..eee08c9 100644 +--- a/nailgun/nailgun/settings.py ++++ b/nailgun/nailgun/settings.py +@@ -38,21 +38,7 @@ class NailgunSettings(object): + if test_config: + settings_files.append(test_config) + +- # If settings.yaml doesn't exist we should have default +- # config structure. Nailgun without settings is used +- # when we distribute source code to the workers for +- # distributed serialization +- self.config = { +- 'VERSION': {}, +- 'DATABASE': { +- 'engine': 'postgresql', +- 'name': '', +- 'host': '', +- 'port': '0', +- 'user': '', +- 'passwd': '' +- } +- } ++ self.config = {} + for sf in settings_files: + try: + logger.debug("Trying to read config file %s" % sf) +@@ -61,9 +47,9 @@ class NailgunSettings(object): + logger.error("Error while reading config file %s: %s" % + (sf, str(e))) + +- self.config['VERSION']['api'] = self.config.get('API') ++ self.config['VERSION']['api'] = self.config['API'] + self.config['VERSION']['feature_groups'] = \ +- self.config.get('FEATURE_GROUPS') ++ self.config['FEATURE_GROUPS'] + + fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE) + if fuel_release: +@@ -75,7 +61,7 @@ class NailgunSettings(object): + self.config['VERSION']['openstack_version'] = \ + fuel_openstack_version + +- if int(self.config.get("DEVELOPMENT", 0)): ++ if int(self.config.get("DEVELOPMENT")): + logger.info("DEVELOPMENT MODE ON:") + here = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..') +diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml +index c2faaa8..92b3518 100644 +--- a/nailgun/nailgun/settings.yaml ++++ b/nailgun/nailgun/settings.yaml +@@ -177,15 +177,6 @@ YAQL_MEMORY_QUOTA: 104857600 + + LCM_CHECK_TASK_VERSION: False + +-# Coefficient for calculation max jobs queue length. If jobs number reaches the +-# len(nodes) * load_coef we stop generate and start consume of jobs. +-LCM_DS_NODE_LOAD_COEFF: 2 +-# Port of dask-scheduler on the master node +-LCM_DS_JOB_SHEDULER_PORT: 8002 +-# Size of tasks chunk sending to the distributed worker +-LCM_DS_TASKS_PER_JOB: 100 +- +- + DPDK_MAX_CPUS_PER_NIC: 4 + + TRUNCATE_LOG_ENTRIES: 100 +diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py +index d024cdd..482d8c5 100644 +--- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py ++++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py +@@ -56,16 +56,6 @@ class InstallationInfo(object): + 'propagate_task_deploy', None), + WhiteListRule(('common', 'security_groups', 'value'), + 'security_groups', None), +- WhiteListRule(('common', 'serialization_policy', 'value'), +- 'serialization_policy', None), +- WhiteListRule(('common', 'ds_use_discover', 'value'), +- 'ds_use_discover', None), +- WhiteListRule(('common', 'ds_use_provisioned', 'value'), +- 'ds_use_provisioned', None), +- WhiteListRule(('common', 'ds_use_ready', 'value'), +- 'ds_use_ready', None), +- WhiteListRule(('common', 'ds_use_error', 'value'), +- 'ds_use_error', None), + WhiteListRule(('corosync', 'verified', 'value'), + 'corosync_verified', None), + +diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py +index b8f4e98..a8661ab 100644 +--- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py ++++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py +@@ -187,7 +187,6 @@ class TestHandlers(BaseIntegrationTest): + 'fail_if_error': is_critical, + 'vms_conf': [], + 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), +- 'ip': node.ip, + + 'network_data': { + 'eth1': { +@@ -604,7 +603,6 @@ class TestHandlers(BaseIntegrationTest): + 'online': node.online, + 'fail_if_error': is_critical, + 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), +- 'ip': node.ip, + 'priority': 100, + 'vms_conf': [], + 'network_scheme': { +@@ -1098,7 +1096,6 @@ class TestHandlers(BaseIntegrationTest): + 'fail_if_error': is_critical, + 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), + 'priority': 100, +- 'ip': node.ip, + 'vms_conf': [], + + 'network_scheme': { +diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py +index 9751d55..6450e19 100644 +--- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py ++++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py +@@ -14,24 +14,20 @@ + # License for the specific language governing permissions and limitations + # under the License. + +-import copy +-import exceptions + import mock + import multiprocessing.dummy + + from nailgun import consts + from nailgun import errors + from nailgun import lcm +-from nailgun.lcm import TransactionContext +-from nailgun.settings import settings +-from nailgun.test.base import BaseTestCase + from nailgun.utils.resolvers import TagResolver + ++from nailgun.test.base import BaseUnitTest + +-class TestTransactionSerializer(BaseTestCase): ++ ++class TestTransactionSerializer(BaseUnitTest): + @classmethod + def setUpClass(cls): +- super(TestTransactionSerializer, cls).setUpClass() + cls.tasks = [ + { + 'id': 'task1', 'roles': ['controller'], +@@ -466,344 +462,3 @@ class TestTransactionSerializer(BaseTestCase): + 9, + lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10) + ) +- +- def _get_context_for_distributed_serialization(self): +- new = copy.deepcopy(self.context.new) +- new['common']['serialization_policy'] = \ +- consts.SERIALIZATION_POLICY.distributed +- return TransactionContext(new) +- +- @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') +- @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') +- def test_distributed_serialization(self, _, as_completed): +- context = self._get_context_for_distributed_serialization() +- +- with mock.patch( +- 'nailgun.lcm.transaction_serializer.distributed.Client' +- ) as job_cluster: +- job = mock.Mock() +- job.result.return_value = [ +- (('1', {"id": "task1", "type": "skipped"}), None) +- ] +- +- submit = mock.Mock() +- submit.return_value = job +- +- as_completed.return_value = [job] +- +- job_cluster.return_value.submit = submit +- job_cluster.return_value.scheduler_info.return_value = \ +- {'workers': {'tcp://worker': {}}} +- +- lcm.TransactionSerializer.serialize( +- context, self.tasks, self.resolver) +- self.assertTrue(submit.called) +- # 4 controller task + 1 compute + 1 cinder +- self.assertTrue(6, submit.call_count) +- +- @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') +- @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') +- @mock.patch('nailgun.lcm.transaction_serializer.' +- 'DistributedProcessingPolicy._get_formatter_context') +- def test_distributed_serialization_workers_scope(self, formatter_context, +- as_completed, _): +- context = self._get_context_for_distributed_serialization() +- +- node_id = '1' +- task = { +- 'id': 'task1', 'roles': ['controller'], +- 'type': 'puppet', 'version': '2.0.0' +- } +- +- with mock.patch( +- 'nailgun.lcm.transaction_serializer.distributed.Client' +- ) as job_cluster: +- +- # Mocking job processing +- job = mock.Mock() +- job.result.return_value = [((node_id, task), None)] +- +- submit = mock.Mock() +- submit.return_value = job +- +- as_completed.return_value = [job] +- +- scatter = mock.Mock() +- job_cluster.return_value.scatter = scatter +- +- job_cluster.return_value.scatter.return_value = {} +- job_cluster.return_value.submit = submit +- +- formatter_context.return_value = {node_id: {}} +- +- # Configuring available workers +- job_cluster.return_value.scheduler_info.return_value = \ +- { +- 'workers': { +- 'tcp://{0}'.format(settings.MASTER_IP): {}, +- 'tcp://192.168.0.1:33334': {}, +- 'tcp://127.0.0.2:33335': {}, +- } +- } +- +- # Performing serialization +- lcm.TransactionSerializer.serialize( +- context, [task], self.resolver +- ) +- +- # Checking data is scattered only to expected workers +- scatter.assert_called_once() +- scatter.assert_called_with( +- {'context': context, 'settings_config': settings.config}, +- broadcast=True, +- workers=[settings.MASTER_IP] +- ) +- +- # Checking submit job only to expected workers +- submit.assert_called_once() +- serializer = lcm.transaction_serializer +- submit.assert_called_with( +- serializer._distributed_serialize_tasks_for_node, +- {node_id: formatter_context()}, +- ((node_id, task),), +- job_cluster().scatter(), +- workers=set([settings.MASTER_IP]) +- ) +- +- def test_distributed_serialization_get_allowed_nodes_ips(self): +- policy = lcm.transaction_serializer.DistributedProcessingPolicy() +- +- context_data = { +- 'common': { +- 'serialization_policy': +- consts.SERIALIZATION_POLICY.distributed, +- 'ds_use_error': True, +- 'ds_use_provisioned': True, +- 'ds_use_discover': True, +- 'ds_use_ready': False +- }, +- 'nodes': { +- '1': {'status': consts.NODE_STATUSES.error, +- 'ip': '10.20.0.3'}, +- '2': {'status': consts.NODE_STATUSES.provisioned, +- 'ip': '10.20.0.4'}, +- '3': {'status': consts.NODE_STATUSES.discover, +- 'ip': '10.20.0.5'}, +- '4': {'status': consts.NODE_STATUSES.ready, +- 'ip': '10.20.0.6'}, +- } +- } +- +- actual = policy._get_allowed_nodes_ips( +- TransactionContext(context_data)) +- self.assertItemsEqual( +- [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'], +- actual +- ) +- +- def test_distributed_serialization_get_allowed_nodes_statuses(self): +- policy = lcm.transaction_serializer.DistributedProcessingPolicy() +- context_data = {} +- actual = policy._get_allowed_nodes_statuses( +- TransactionContext(context_data)) +- self.assertItemsEqual([], actual) +- +- context_data['common'] = { +- 'ds_use_discover': False, +- 'ds_use_provisioned': False, +- 'ds_use_error': False, +- 'ds_use_ready': False +- } +- actual = policy._get_allowed_nodes_statuses( +- TransactionContext(context_data)) +- self.assertItemsEqual([], actual) +- +- context_data['common']['ds_use_discover'] = True +- actual = policy._get_allowed_nodes_statuses( +- TransactionContext(context_data)) +- expected = [consts.NODE_STATUSES.discover] +- self.assertItemsEqual(expected, actual) +- +- context_data['common']['ds_use_provisioned'] = True +- actual = policy._get_allowed_nodes_statuses( +- TransactionContext(context_data)) +- expected = [consts.NODE_STATUSES.discover, +- consts.NODE_STATUSES.provisioned] +- self.assertItemsEqual(expected, actual) +- +- context_data['common']['ds_use_error'] = True +- actual = policy._get_allowed_nodes_statuses( +- TransactionContext(context_data)) +- expected = [consts.NODE_STATUSES.discover, +- consts.NODE_STATUSES.provisioned, +- consts.NODE_STATUSES.error] +- self.assertItemsEqual(expected, actual) +- +- context_data['common']['ds_use_ready'] = True +- actual = policy._get_allowed_nodes_statuses( +- TransactionContext(context_data)) +- expected = [consts.NODE_STATUSES.discover, +- consts.NODE_STATUSES.provisioned, +- consts.NODE_STATUSES.error, +- consts.NODE_STATUSES.ready] +- self.assertItemsEqual(expected, actual) +- +- def test_distributed_serialization_get_allowed_workers(self): +- policy = lcm.transaction_serializer.DistributedProcessingPolicy() +- +- with mock.patch( +- 'nailgun.lcm.transaction_serializer.distributed.Client' +- ) as job_cluster: +- job_cluster.scheduler_info.return_value = \ +- {'workers': { +- 'tcp://10.20.0.2:1': {}, +- 'tcp://10.20.0.2:2': {}, +- 'tcp://10.20.0.3:1': {}, +- 'tcp://10.20.0.3:2': {}, +- 'tcp://10.20.0.3:3': {}, +- 'tcp://10.20.0.4:1': {}, +- 'tcp://10.20.0.5:1': {} +- }} +- allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5']) +- +- expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1', +- '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1'] +- actual = policy._get_allowed_workers(job_cluster, allowed_ips) +- self.assertItemsEqual(expected, actual) +- +- def test_distributed_serialization_serialize_task(self): +- task = { +- 'id': 'task1', 'roles': ['controller'], +- 'type': 'puppet', 'version': '2.0.0', +- 'parameters': { +- 'master_ip': '{MN_IP}', +- 'host': {'yaql_exp': '$.public_ssl.hostname'}, +- 'attr': {'yaql_exp': '$node.attributes.a_str'} +- } +- } +- +- formatter_contexts_idx = { +- '1': {'MN_IP': '10.0.0.1'}, +- '2': {} +- } +- scattered_data = { +- 'settings_config': settings.config, +- 'context': self.context +- } +- +- serializer = lcm.transaction_serializer +- actual = serializer._distributed_serialize_tasks_for_node( +- formatter_contexts_idx, [('1', task), ('2', task)], scattered_data) +- +- expected = [ +- ( +- ( +- '1', +- { +- 'id': 'task1', +- 'type': 'puppet', +- 'parameters': { +- 'cwd': '/', +- 'master_ip': '10.0.0.1', +- 'host': 'localhost', +- 'attr': 'text1' +- }, +- 'fail_on_error': True +- } +- ), +- None +- ), +- ( +- ( +- '2', +- { +- 'id': 'task1', +- 'type': 'puppet', +- 'parameters': { +- 'cwd': '/', +- 'master_ip': '{MN_IP}', +- 'host': 'localhost', +- 'attr': 'text2' +- }, +- 'fail_on_error': True +- } +- ), +- None +- ) +- ] +- +- self.assertItemsEqual(expected, actual) +- +- def test_distributed_serialization_serialize_task_failure(self): +- task = { +- 'id': 'task1', 'roles': ['controller'], +- 'type': 'puppet', 'version': '2.0.0', +- 'parameters': { +- 'fake': {'yaql_exp': '$.some.fake_param'} +- } +- } +- +- formatter_contexts_idx = {'2': {}} +- scattered_data = { +- 'settings_config': settings.config, +- 'context': self.context +- } +- +- serializer = lcm.transaction_serializer +- result = serializer._distributed_serialize_tasks_for_node( +- formatter_contexts_idx, [('2', task)], scattered_data) +- (_, __), err = result[0] +- self.assertIsInstance(err, exceptions.KeyError) +- +- +-class TestConcurrencyPolicy(BaseTestCase): +- +- @mock.patch( +- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', +- return_value=1 +- ) +- def test_one_cpu(self, cpu_count): +- policy = lcm.transaction_serializer.get_processing_policy( +- lcm.TransactionContext({})) +- self.assertIsInstance( +- policy, +- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy +- ) +- self.assertTrue(cpu_count.is_called) +- +- @mock.patch( +- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', +- return_value=0 +- ) +- def test_zero_cpu(self, cpu_count): +- policy = lcm.transaction_serializer.get_processing_policy( +- lcm.TransactionContext({})) +- self.assertIsInstance( +- policy, +- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy +- ) +- self.assertTrue(cpu_count.is_called) +- +- @mock.patch( +- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', +- side_effect=NotImplementedError +- ) +- def test_cpu_count_not_implemented(self, cpu_count): +- policy = lcm.transaction_serializer.get_processing_policy( +- lcm.TransactionContext({})) +- self.assertIsInstance( +- policy, +- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy +- ) +- self.assertTrue(cpu_count.is_called) +- +- def test_distributed_serialization_enabled_in_cluster(self): +- context_data = {'common': { +- 'serialization_policy': consts.SERIALIZATION_POLICY.distributed +- }} +- policy = lcm.transaction_serializer.get_processing_policy( +- lcm.TransactionContext(context_data)) +- self.assertIsInstance( +- policy, +- lcm.transaction_serializer.DistributedProcessingPolicy +- ) +diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt +index c702e8a..96bed25 100644 +--- a/nailgun/requirements.txt ++++ b/nailgun/requirements.txt +@@ -47,5 +47,3 @@ stevedore>=1.5.0 + # See: https://bugs.launchpad.net/fuel/+bug/1519727 + setuptools<=18.5 + yaql>=1.0.0 +-# Distributed nodes serialization +-distributed==1.16.0 |