aboutsummaryrefslogtreecommitdiffstats
path: root/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch
diff options
context:
space:
mode:
Diffstat (limited to 'build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch')
-rw-r--r--build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch1006
1 files changed, 1006 insertions, 0 deletions
diff --git a/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch b/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch
new file mode 100644
index 000000000..152197e74
--- /dev/null
+++ b/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch
@@ -0,0 +1,1006 @@
+From: Alexandru Avadanii <Alexandru.Avadanii@enea.com>
+Date: Sun, 26 Mar 2017 19:16:47 +0200
+Subject: [PATCH] Revert "Distributed serialization implementation"
+
+This reverts commit f3c440f76961ef161ab6650b3a41a3c775073ba0.
+---
+ nailgun/nailgun/consts.py | 4 -
+ nailgun/nailgun/fixtures/openstack.yaml | 49 ---
+ nailgun/nailgun/lcm/task_serializer.py | 15 +-
+ nailgun/nailgun/lcm/transaction_serializer.py | 317 +------------------
+ .../nailgun/orchestrator/deployment_serializers.py | 1 -
+ nailgun/nailgun/settings.py | 22 +-
+ nailgun/nailgun/settings.yaml | 9 -
+ .../fuel_statistics/installation_info.py | 10 -
+ .../integration/test_cluster_changes_handler.py | 3 -
+ .../test/unit/test_lcm_transaction_serializer.py | 351 +--------------------
+ nailgun/requirements.txt | 2 -
+ 11 files changed, 14 insertions(+), 769 deletions(-)
+
+diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py
+index 1a2f09f..5490d6c 100644
+--- a/nailgun/nailgun/consts.py
++++ b/nailgun/nailgun/consts.py
+@@ -528,7 +528,3 @@ HYPERVISORS = Enum(
+ )
+
+ DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci'
+-
+-SERIALIZATION_POLICY = Enum(
+- 'distributed'
+-)
+diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml
+index 5557710..8b4d763 100644
+--- a/nailgun/nailgun/fixtures/openstack.yaml
++++ b/nailgun/nailgun/fixtures/openstack.yaml
+@@ -1114,55 +1114,6 @@
+ group: "security"
+ weight: 20
+ type: "radio"
+- serialization_policy:
+- value: "default"
+- values:
+- - data: "default"
+- label: "Default serialization"
+- description: "Run serialization on the master node only"
+- - data: "distributed"
+- label: "Distributed serialization"
+- description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only form that environment for wich serialization is performing."
+- label: "Serialization policy"
+- group: "general"
+- weight: 30
+- type: "radio"
+- ds_use_discover:
+- group: "general"
+- label: "Use discovered nodes as workers for serialization"
+- type: "checkbox"
+- value: true
+- weight: 31
+- restrictions:
+- - condition: "settings:common.serialization_policy.value != 'distributed'"
+- action: "hide"
+- ds_use_provisioned:
+- group: "general"
+- label: "Use provisioned nodes as workers for serialization"
+- type: "checkbox"
+- value: true
+- weight: 32
+- restrictions:
+- - condition: "settings:common.serialization_policy.value != 'distributed'"
+- action: "hide"
+- ds_use_error:
+- group: "general"
+- label: "Use nodes in error state as workers for serialization"
+- type: "checkbox"
+- value: true
+- weight: 33
+- restrictions:
+- - condition: "settings:common.serialization_policy.value != 'distributed'"
+- action: "hide"
+- ds_use_ready:
+- group: "general"
+- label: "Use ready nodes as workers for serialization"
+- type: "checkbox"
+- value: false
+- weight: 34
+- restrictions:
+- - condition: "settings:common.serialization_policy.value != 'distributed'"
+- action: "hide"
+ public_network_assignment:
+ metadata:
+ weight: 10
+diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py
+index 0d70f36..c004115 100644
+--- a/nailgun/nailgun/lcm/task_serializer.py
++++ b/nailgun/nailgun/lcm/task_serializer.py
+@@ -110,8 +110,6 @@ class Context(object):
+ return evaluate
+
+ def get_formatter_context(self, node_id):
+- # TODO(akislitsky) remove formatter context from the
+- # tasks serialization workflow
+ data = self._transaction.get_new_data(node_id)
+ return {
+ 'CLUSTER_ID': data.get('cluster', {}).get('id'),
+@@ -149,14 +147,9 @@ class DeploymentTaskSerializer(object):
+ :return: the result
+ """
+
+- def serialize(self, node_id, formatter_context=None):
+- """Serialize task in expected by orchestrator format
++ def serialize(self, node_id):
++ """Serialize task in expected by orchestrator format.
+
+- If serialization is performed on the remote worker
+- we should pass formatter_context parameter with values
+- from the master node settings
+-
+- :param formatter_context: formatter context
+ :param node_id: the target node_id
+ """
+
+@@ -164,12 +157,10 @@ class DeploymentTaskSerializer(object):
+ "serialize task %s for node %s",
+ self.task_template['id'], node_id
+ )
+- formatter_context = formatter_context \
+- or self.context.get_formatter_context(node_id)
+ task = utils.traverse(
+ self.task_template,
+ utils.text_format_safe,
+- formatter_context,
++ self.context.get_formatter_context(node_id),
+ {
+ 'yaql_exp': self.context.get_yaql_interpreter(
+ node_id, self.task_template['id'])
+diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py
+index 32953c1..c02146b 100644
+--- a/nailgun/nailgun/lcm/transaction_serializer.py
++++ b/nailgun/nailgun/lcm/transaction_serializer.py
+@@ -14,21 +14,13 @@
+ # License for the specific language governing permissions and limitations
+ # under the License.
+
+-import datetime
++from distutils.version import StrictVersion
+ import multiprocessing
+-import os
+-from Queue import Queue
+-import shutil
+-import tempfile
+
+-import distributed
+-from distutils.version import StrictVersion
+ import six
+-import toolz
+
+ from nailgun import consts
+ from nailgun import errors
+-from nailgun.lcm.task_serializer import Context
+ from nailgun.lcm.task_serializer import TasksSerializersFactory
+ from nailgun.logger import logger
+ from nailgun.settings import settings
+@@ -136,308 +128,7 @@ class MultiProcessingConcurrencyPolicy(object):
+ pool.join()
+
+
+-def _distributed_serialize_tasks_for_node(formatter_contexts_idx,
+- node_and_tasks, scattered_data):
+- """Remote serialization call for DistributedProcessingPolicy
+-
+- Code of the function is copied to the workers and executed there, thus
+- we are including all required imports inside the function.
+-
+- :param formatter_contexts_idx: dict of formatter contexts with node_id
+- value as key
+- :param node_and_tasks: list of node_id, task_data tuples
+- :param scattered_data: feature object, that points to data copied to
+- workers
+- :return: [(node_id, serialized), error]
+- """
+-
+- try:
+- factory = TasksSerializersFactory(scattered_data['context'])
+-
+- # Restoring settings
+- settings.config = scattered_data['settings_config']
+- for k in formatter_contexts_idx:
+- formatter_contexts_idx[k]['SETTINGS'] = settings
+-
+- except Exception as e:
+- logger.exception("Distributed serialization failed")
+- return [((None, None), e)]
+-
+- result = []
+-
+- for node_and_task in node_and_tasks:
+-
+- node_id = None
+- try:
+- node_id, task = node_and_task
+-
+- logger.debug("Starting distributed node %s task %s serialization",
+- node_id, task['id'])
+-
+- formatter_context = formatter_contexts_idx[node_id]
+-
+- serializer = factory.create_serializer(task)
+- serialized = serializer.serialize(
+- node_id, formatter_context=formatter_context)
+-
+- logger.debug("Distributed node %s task %s serialization "
+- "result: %s", node_id, task['id'], serialized)
+-
+- result.append(((node_id, serialized), None))
+- except Exception as e:
+- logger.exception("Distributed serialization failed")
+- result.append(((node_id, None), e))
+- break
+-
+- logger.debug("Processed tasks count: %s", len(result))
+- return result
+-
+-
+-class DistributedProcessingPolicy(object):
+-
+- def __init__(self):
+- self.sent_jobs = Queue()
+- self.sent_jobs_count = 0
+-
+- def _consume_jobs(self, chunk_size=None):
+- """Consumes jobs
+-
+- If chunk_size is set function consumes specified number of
+- Finished tasks or less if sent_jobs_ids queue became empty.
+- If chunk_size is None function consumes jobs until
+- sent_jobs_ids queue became empty.
+- Jobs with statuses Cancelled, Abandoned, Terminated will be
+- resent and their ids added to sent_jobs_ids queue
+-
+- :param chunk_size: size of consuming chunk
+- :return: generator on job results
+- """
+- logger.debug("Consuming jobs started")
+-
+- jobs_to_consume = []
+- while not self.sent_jobs.empty():
+- job = self.sent_jobs.get()
+- jobs_to_consume.append(job)
+-
+- if chunk_size is not None:
+- chunk_size -= 1
+- if chunk_size <= 0:
+- break
+-
+- for ready_job in distributed.as_completed(jobs_to_consume):
+- results = ready_job.result()
+- self.sent_jobs_count -= 1
+-
+- for result in results:
+- (node_id, serialized), exc = result
+- logger.debug("Got ready task for node %s, serialized: %s, "
+- "error: %s", node_id, serialized, exc)
+- if exc is not None:
+- raise exc
+- yield node_id, serialized
+-
+- logger.debug("Consuming jobs finished")
+-
+- def _get_formatter_context(self, task_context, formatter_contexts_idx,
+- node_id):
+- try:
+- return formatter_contexts_idx[node_id]
+- except KeyError:
+- pass
+-
+- logger.debug("Calculating formatter context for node %s", node_id)
+- formatter_context = task_context.get_formatter_context(
+- node_id)
+- # Settings file is already sent to the workers
+- formatter_context.pop('SETTINGS', None)
+- formatter_contexts_idx[node_id] = formatter_context
+-
+- return formatter_context
+-
+- def _upload_nailgun_code(self, job_cluster):
+- """Creates zip of current nailgun code and uploads it to workers
+-
+- TODO(akislitsky): add workers scope when it will be implemented
+- in the distributed library
+-
+- :param job_cluster: distributed.Client
+- """
+- logger.debug("Compressing nailgun code")
+- file_dir = os.path.dirname(__file__)
+- nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..'))
+- archive = os.path.join(tempfile.gettempdir(), 'nailgun')
+- result = shutil.make_archive(archive, 'zip', nailgun_root_dir,
+- 'nailgun')
+- logger.debug("Nailgun code saved to: %s", result)
+-
+- logger.debug("Uploading nailgun archive %s to workers", result)
+- job_cluster.upload_file(result)
+-
+- def _scatter_data(self, job_cluster, context, workers):
+- logger.debug("Scattering data to workers started")
+- shared_data = {'context': context, 'settings_config': settings.config}
+- scattered = job_cluster.scatter(shared_data, broadcast=True,
+- workers=workers)
+- # Waiting data is scattered to workers
+- distributed.wait(scattered.values())
+- logger.debug("Scattering data to workers finished")
+-
+- return scattered
+-
+- def _get_allowed_nodes_statuses(self, context):
+- """Extracts node statuses that allows distributed serialization"""
+- common = context.new.get('common', {})
+- cluster = common.get('cluster', {})
+- logger.debug("Getting allowed nodes statuses to use as serialization "
+- "workers for cluster %s", cluster.get('id'))
+- check_fields = {
+- 'ds_use_ready': consts.NODE_STATUSES.ready,
+- 'ds_use_provisioned': consts.NODE_STATUSES.provisioned,
+- 'ds_use_discover': consts.NODE_STATUSES.discover,
+- 'ds_use_error': consts.NODE_STATUSES.error
+- }
+- statuses = set()
+- for field, node_status in check_fields.items():
+- if common.get(field):
+- statuses.add(node_status)
+-
+- logger.debug("Allowed nodes statuses to use as serialization workers "
+- "for cluster %s are: %s", cluster.get('id'), statuses)
+- return statuses
+-
+- def _get_allowed_nodes_ips(self, context):
+- """Filters online nodes from cluster by their status
+-
+- In the cluster settings we select nodes statuses allowed for
+- using in the distributed serialization. Accordingly to selected
+- statuses nodes are going to be filtered.
+-
+- :param context: TransactionContext
+- :return: set of allowed nodes ips
+- """
+- ips = set()
+- allowed_statuses = self._get_allowed_nodes_statuses(context)
+- for node in six.itervalues(context.new.get('nodes', {})):
+- if node.get('status') in allowed_statuses:
+- ips.add(node.get('ip'))
+- ips.add(settings.MASTER_IP)
+- return ips
+-
+- def _get_allowed_workers(self, job_cluster, allowed_ips):
+- """Calculates workers addresses for distributed serialization
+-
+- Only workers that placed on the allowed nodes must be selected
+- for the serialization.
+-
+- :param job_cluster: distributed.Client
+- :param allowed_ips: allowed for serialization nodes ips
+- :return: list of workers addresses in format 'ip:port'
+- """
+- logger.debug("Getting allowed workers")
+- workers = {}
+-
+- # Worker has address like tcp://ip:port
+- info = job_cluster.scheduler_info()
+- for worker_addr in six.iterkeys(info['workers']):
+- ip_port = worker_addr.split('//')[1]
+- ip = ip_port.split(':')[0]
+- if ip not in allowed_ips:
+- continue
+- try:
+- pool = workers[ip]
+- pool.add(ip_port)
+- except KeyError:
+- workers[ip] = set([ip_port])
+-
+- return list(toolz.itertoolz.concat(six.itervalues(workers)))
+-
+- def execute(self, context, _, tasks):
+- """Executes task serialization on distributed nodes
+-
+- :param context: the transaction context
+- :param _: serializers factory
+- :param tasks: the tasks to serialize
+- :return sequence of serialized tasks
+- """
+- logger.debug("Performing distributed tasks processing")
+- sched_address = '{0}:{1}'.format(settings.MASTER_IP,
+- settings.LCM_DS_JOB_SHEDULER_PORT)
+- job_cluster = distributed.Client(sched_address)
+-
+- allowed_ips = self._get_allowed_nodes_ips(context)
+- workers = self._get_allowed_workers(job_cluster, allowed_ips)
+- logger.debug("Allowed workers list for serialization: %s", workers)
+- workers_ips = set([ip_port.split(':')[0] for ip_port in workers])
+- logger.debug("Allowed workers ips list for serialization: %s",
+- workers_ips)
+-
+- task_context = Context(context)
+- formatter_contexts_idx = {}
+- workers_num = len(workers)
+- max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF
+- logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue)
+-
+- start = datetime.datetime.utcnow()
+- tasks_count = 0
+-
+- try:
+- self._upload_nailgun_code(job_cluster)
+- scattered = self._scatter_data(job_cluster, context, workers)
+-
+- for tasks_chunk in toolz.partition_all(
+- settings.LCM_DS_TASKS_PER_JOB, tasks):
+-
+- formatter_contexts_for_tasks = {}
+-
+- # Collecting required contexts for tasks
+- for task in tasks_chunk:
+- node_id, task_data = task
+- formatter_context = self._get_formatter_context(
+- task_context, formatter_contexts_idx, node_id)
+- if node_id not in formatter_contexts_for_tasks:
+- formatter_contexts_for_tasks[node_id] = \
+- formatter_context
+-
+- logger.debug("Submitting job for tasks chunk: %s", tasks_chunk)
+- job = job_cluster.submit(
+- _distributed_serialize_tasks_for_node,
+- formatter_contexts_for_tasks,
+- tasks_chunk,
+- scattered,
+- workers=workers_ips
+- )
+-
+- self.sent_jobs.put(job)
+- self.sent_jobs_count += 1
+-
+- # We are limit the max number of tasks by the number of nodes
+- # which are used in the serialization
+- if self.sent_jobs_count >= max_jobs_in_queue:
+- for result in self._consume_jobs(chunk_size=workers_num):
+- tasks_count += 1
+- yield result
+-
+- # We have no tasks any more but have unconsumed jobs
+- for result in self._consume_jobs():
+- tasks_count += 1
+- yield result
+- finally:
+- end = datetime.datetime.utcnow()
+- logger.debug("Distributed tasks processing finished. "
+- "Total time: %s. Tasks processed: %s",
+- end - start, tasks_count)
+- job_cluster.shutdown()
+-
+-
+-def is_distributed_processing_enabled(context):
+- common = context.new.get('common', {})
+- return common.get('serialization_policy') == \
+- consts.SERIALIZATION_POLICY.distributed
+-
+-
+-def get_processing_policy(context):
+- if is_distributed_processing_enabled(context):
+- return DistributedProcessingPolicy()
++def get_concurrency_policy():
+ cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
+ if not cpu_num:
+ try:
+@@ -471,7 +162,7 @@ class TransactionSerializer(object):
+ # ids of nodes in this group and how many nodes in this group can fail
+ # and deployment will not be interrupted
+ self.fault_tolerance_groups = []
+- self.processing_policy = get_processing_policy(context)
++ self.concurrency_policy = get_concurrency_policy()
+
+ @classmethod
+ def serialize(cls, context, tasks, resolver):
+@@ -525,7 +216,7 @@ class TransactionSerializer(object):
+ :param tasks: the deployment tasks
+ :return the mapping tasks per node
+ """
+- serialized = self.processing_policy.execute(
++ serialized = self.concurrency_policy.execute(
+ self.context,
+ self.serializer_factory_class,
+ self.expand_tasks(tasks)
+diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py
+index 99d52a2..9473733 100644
+--- a/nailgun/nailgun/orchestrator/deployment_serializers.py
++++ b/nailgun/nailgun/orchestrator/deployment_serializers.py
+@@ -221,7 +221,6 @@ class DeploymentMultinodeSerializer(object):
+ 'role': role,
+ 'vms_conf': node.vms_conf,
+ 'fail_if_error': role in self.critical_roles,
+- 'ip': node.ip,
+ # TODO(eli): need to remove, requried for the fake thread only
+ 'online': node.online,
+ }
+diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py
+index 1f9222b..eee08c9 100644
+--- a/nailgun/nailgun/settings.py
++++ b/nailgun/nailgun/settings.py
+@@ -38,21 +38,7 @@ class NailgunSettings(object):
+ if test_config:
+ settings_files.append(test_config)
+
+- # If settings.yaml doesn't exist we should have default
+- # config structure. Nailgun without settings is used
+- # when we distribute source code to the workers for
+- # distributed serialization
+- self.config = {
+- 'VERSION': {},
+- 'DATABASE': {
+- 'engine': 'postgresql',
+- 'name': '',
+- 'host': '',
+- 'port': '0',
+- 'user': '',
+- 'passwd': ''
+- }
+- }
++ self.config = {}
+ for sf in settings_files:
+ try:
+ logger.debug("Trying to read config file %s" % sf)
+@@ -61,9 +47,9 @@ class NailgunSettings(object):
+ logger.error("Error while reading config file %s: %s" %
+ (sf, str(e)))
+
+- self.config['VERSION']['api'] = self.config.get('API')
++ self.config['VERSION']['api'] = self.config['API']
+ self.config['VERSION']['feature_groups'] = \
+- self.config.get('FEATURE_GROUPS')
++ self.config['FEATURE_GROUPS']
+
+ fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE)
+ if fuel_release:
+@@ -75,7 +61,7 @@ class NailgunSettings(object):
+ self.config['VERSION']['openstack_version'] = \
+ fuel_openstack_version
+
+- if int(self.config.get("DEVELOPMENT", 0)):
++ if int(self.config.get("DEVELOPMENT")):
+ logger.info("DEVELOPMENT MODE ON:")
+ here = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), '..')
+diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml
+index c2faaa8..92b3518 100644
+--- a/nailgun/nailgun/settings.yaml
++++ b/nailgun/nailgun/settings.yaml
+@@ -177,15 +177,6 @@ YAQL_MEMORY_QUOTA: 104857600
+
+ LCM_CHECK_TASK_VERSION: False
+
+-# Coefficient for calculation max jobs queue length. If jobs number reaches the
+-# len(nodes) * load_coef we stop generate and start consume of jobs.
+-LCM_DS_NODE_LOAD_COEFF: 2
+-# Port of dask-scheduler on the master node
+-LCM_DS_JOB_SHEDULER_PORT: 8002
+-# Size of tasks chunk sending to the distributed worker
+-LCM_DS_TASKS_PER_JOB: 100
+-
+-
+ DPDK_MAX_CPUS_PER_NIC: 4
+
+ TRUNCATE_LOG_ENTRIES: 100
+diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
+index d024cdd..482d8c5 100644
+--- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
++++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
+@@ -56,16 +56,6 @@ class InstallationInfo(object):
+ 'propagate_task_deploy', None),
+ WhiteListRule(('common', 'security_groups', 'value'),
+ 'security_groups', None),
+- WhiteListRule(('common', 'serialization_policy', 'value'),
+- 'serialization_policy', None),
+- WhiteListRule(('common', 'ds_use_discover', 'value'),
+- 'ds_use_discover', None),
+- WhiteListRule(('common', 'ds_use_provisioned', 'value'),
+- 'ds_use_provisioned', None),
+- WhiteListRule(('common', 'ds_use_ready', 'value'),
+- 'ds_use_ready', None),
+- WhiteListRule(('common', 'ds_use_error', 'value'),
+- 'ds_use_error', None),
+ WhiteListRule(('corosync', 'verified', 'value'),
+ 'corosync_verified', None),
+
+diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
+index b8f4e98..a8661ab 100644
+--- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
++++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
+@@ -187,7 +187,6 @@ class TestHandlers(BaseIntegrationTest):
+ 'fail_if_error': is_critical,
+ 'vms_conf': [],
+ 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
+- 'ip': node.ip,
+
+ 'network_data': {
+ 'eth1': {
+@@ -604,7 +603,6 @@ class TestHandlers(BaseIntegrationTest):
+ 'online': node.online,
+ 'fail_if_error': is_critical,
+ 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
+- 'ip': node.ip,
+ 'priority': 100,
+ 'vms_conf': [],
+ 'network_scheme': {
+@@ -1098,7 +1096,6 @@ class TestHandlers(BaseIntegrationTest):
+ 'fail_if_error': is_critical,
+ 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
+ 'priority': 100,
+- 'ip': node.ip,
+ 'vms_conf': [],
+
+ 'network_scheme': {
+diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
+index 9751d55..6450e19 100644
+--- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
++++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
+@@ -14,24 +14,20 @@
+ # License for the specific language governing permissions and limitations
+ # under the License.
+
+-import copy
+-import exceptions
+ import mock
+ import multiprocessing.dummy
+
+ from nailgun import consts
+ from nailgun import errors
+ from nailgun import lcm
+-from nailgun.lcm import TransactionContext
+-from nailgun.settings import settings
+-from nailgun.test.base import BaseTestCase
+ from nailgun.utils.resolvers import TagResolver
+
++from nailgun.test.base import BaseUnitTest
+
+-class TestTransactionSerializer(BaseTestCase):
++
++class TestTransactionSerializer(BaseUnitTest):
+ @classmethod
+ def setUpClass(cls):
+- super(TestTransactionSerializer, cls).setUpClass()
+ cls.tasks = [
+ {
+ 'id': 'task1', 'roles': ['controller'],
+@@ -466,344 +462,3 @@ class TestTransactionSerializer(BaseTestCase):
+ 9,
+ lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10)
+ )
+-
+- def _get_context_for_distributed_serialization(self):
+- new = copy.deepcopy(self.context.new)
+- new['common']['serialization_policy'] = \
+- consts.SERIALIZATION_POLICY.distributed
+- return TransactionContext(new)
+-
+- @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
+- @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
+- def test_distributed_serialization(self, _, as_completed):
+- context = self._get_context_for_distributed_serialization()
+-
+- with mock.patch(
+- 'nailgun.lcm.transaction_serializer.distributed.Client'
+- ) as job_cluster:
+- job = mock.Mock()
+- job.result.return_value = [
+- (('1', {"id": "task1", "type": "skipped"}), None)
+- ]
+-
+- submit = mock.Mock()
+- submit.return_value = job
+-
+- as_completed.return_value = [job]
+-
+- job_cluster.return_value.submit = submit
+- job_cluster.return_value.scheduler_info.return_value = \
+- {'workers': {'tcp://worker': {}}}
+-
+- lcm.TransactionSerializer.serialize(
+- context, self.tasks, self.resolver)
+- self.assertTrue(submit.called)
+- # 4 controller task + 1 compute + 1 cinder
+- self.assertTrue(6, submit.call_count)
+-
+- @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
+- @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
+- @mock.patch('nailgun.lcm.transaction_serializer.'
+- 'DistributedProcessingPolicy._get_formatter_context')
+- def test_distributed_serialization_workers_scope(self, formatter_context,
+- as_completed, _):
+- context = self._get_context_for_distributed_serialization()
+-
+- node_id = '1'
+- task = {
+- 'id': 'task1', 'roles': ['controller'],
+- 'type': 'puppet', 'version': '2.0.0'
+- }
+-
+- with mock.patch(
+- 'nailgun.lcm.transaction_serializer.distributed.Client'
+- ) as job_cluster:
+-
+- # Mocking job processing
+- job = mock.Mock()
+- job.result.return_value = [((node_id, task), None)]
+-
+- submit = mock.Mock()
+- submit.return_value = job
+-
+- as_completed.return_value = [job]
+-
+- scatter = mock.Mock()
+- job_cluster.return_value.scatter = scatter
+-
+- job_cluster.return_value.scatter.return_value = {}
+- job_cluster.return_value.submit = submit
+-
+- formatter_context.return_value = {node_id: {}}
+-
+- # Configuring available workers
+- job_cluster.return_value.scheduler_info.return_value = \
+- {
+- 'workers': {
+- 'tcp://{0}'.format(settings.MASTER_IP): {},
+- 'tcp://192.168.0.1:33334': {},
+- 'tcp://127.0.0.2:33335': {},
+- }
+- }
+-
+- # Performing serialization
+- lcm.TransactionSerializer.serialize(
+- context, [task], self.resolver
+- )
+-
+- # Checking data is scattered only to expected workers
+- scatter.assert_called_once()
+- scatter.assert_called_with(
+- {'context': context, 'settings_config': settings.config},
+- broadcast=True,
+- workers=[settings.MASTER_IP]
+- )
+-
+- # Checking submit job only to expected workers
+- submit.assert_called_once()
+- serializer = lcm.transaction_serializer
+- submit.assert_called_with(
+- serializer._distributed_serialize_tasks_for_node,
+- {node_id: formatter_context()},
+- ((node_id, task),),
+- job_cluster().scatter(),
+- workers=set([settings.MASTER_IP])
+- )
+-
+- def test_distributed_serialization_get_allowed_nodes_ips(self):
+- policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+-
+- context_data = {
+- 'common': {
+- 'serialization_policy':
+- consts.SERIALIZATION_POLICY.distributed,
+- 'ds_use_error': True,
+- 'ds_use_provisioned': True,
+- 'ds_use_discover': True,
+- 'ds_use_ready': False
+- },
+- 'nodes': {
+- '1': {'status': consts.NODE_STATUSES.error,
+- 'ip': '10.20.0.3'},
+- '2': {'status': consts.NODE_STATUSES.provisioned,
+- 'ip': '10.20.0.4'},
+- '3': {'status': consts.NODE_STATUSES.discover,
+- 'ip': '10.20.0.5'},
+- '4': {'status': consts.NODE_STATUSES.ready,
+- 'ip': '10.20.0.6'},
+- }
+- }
+-
+- actual = policy._get_allowed_nodes_ips(
+- TransactionContext(context_data))
+- self.assertItemsEqual(
+- [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'],
+- actual
+- )
+-
+- def test_distributed_serialization_get_allowed_nodes_statuses(self):
+- policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+- context_data = {}
+- actual = policy._get_allowed_nodes_statuses(
+- TransactionContext(context_data))
+- self.assertItemsEqual([], actual)
+-
+- context_data['common'] = {
+- 'ds_use_discover': False,
+- 'ds_use_provisioned': False,
+- 'ds_use_error': False,
+- 'ds_use_ready': False
+- }
+- actual = policy._get_allowed_nodes_statuses(
+- TransactionContext(context_data))
+- self.assertItemsEqual([], actual)
+-
+- context_data['common']['ds_use_discover'] = True
+- actual = policy._get_allowed_nodes_statuses(
+- TransactionContext(context_data))
+- expected = [consts.NODE_STATUSES.discover]
+- self.assertItemsEqual(expected, actual)
+-
+- context_data['common']['ds_use_provisioned'] = True
+- actual = policy._get_allowed_nodes_statuses(
+- TransactionContext(context_data))
+- expected = [consts.NODE_STATUSES.discover,
+- consts.NODE_STATUSES.provisioned]
+- self.assertItemsEqual(expected, actual)
+-
+- context_data['common']['ds_use_error'] = True
+- actual = policy._get_allowed_nodes_statuses(
+- TransactionContext(context_data))
+- expected = [consts.NODE_STATUSES.discover,
+- consts.NODE_STATUSES.provisioned,
+- consts.NODE_STATUSES.error]
+- self.assertItemsEqual(expected, actual)
+-
+- context_data['common']['ds_use_ready'] = True
+- actual = policy._get_allowed_nodes_statuses(
+- TransactionContext(context_data))
+- expected = [consts.NODE_STATUSES.discover,
+- consts.NODE_STATUSES.provisioned,
+- consts.NODE_STATUSES.error,
+- consts.NODE_STATUSES.ready]
+- self.assertItemsEqual(expected, actual)
+-
+- def test_distributed_serialization_get_allowed_workers(self):
+- policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+-
+- with mock.patch(
+- 'nailgun.lcm.transaction_serializer.distributed.Client'
+- ) as job_cluster:
+- job_cluster.scheduler_info.return_value = \
+- {'workers': {
+- 'tcp://10.20.0.2:1': {},
+- 'tcp://10.20.0.2:2': {},
+- 'tcp://10.20.0.3:1': {},
+- 'tcp://10.20.0.3:2': {},
+- 'tcp://10.20.0.3:3': {},
+- 'tcp://10.20.0.4:1': {},
+- 'tcp://10.20.0.5:1': {}
+- }}
+- allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5'])
+-
+- expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1',
+- '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1']
+- actual = policy._get_allowed_workers(job_cluster, allowed_ips)
+- self.assertItemsEqual(expected, actual)
+-
+- def test_distributed_serialization_serialize_task(self):
+- task = {
+- 'id': 'task1', 'roles': ['controller'],
+- 'type': 'puppet', 'version': '2.0.0',
+- 'parameters': {
+- 'master_ip': '{MN_IP}',
+- 'host': {'yaql_exp': '$.public_ssl.hostname'},
+- 'attr': {'yaql_exp': '$node.attributes.a_str'}
+- }
+- }
+-
+- formatter_contexts_idx = {
+- '1': {'MN_IP': '10.0.0.1'},
+- '2': {}
+- }
+- scattered_data = {
+- 'settings_config': settings.config,
+- 'context': self.context
+- }
+-
+- serializer = lcm.transaction_serializer
+- actual = serializer._distributed_serialize_tasks_for_node(
+- formatter_contexts_idx, [('1', task), ('2', task)], scattered_data)
+-
+- expected = [
+- (
+- (
+- '1',
+- {
+- 'id': 'task1',
+- 'type': 'puppet',
+- 'parameters': {
+- 'cwd': '/',
+- 'master_ip': '10.0.0.1',
+- 'host': 'localhost',
+- 'attr': 'text1'
+- },
+- 'fail_on_error': True
+- }
+- ),
+- None
+- ),
+- (
+- (
+- '2',
+- {
+- 'id': 'task1',
+- 'type': 'puppet',
+- 'parameters': {
+- 'cwd': '/',
+- 'master_ip': '{MN_IP}',
+- 'host': 'localhost',
+- 'attr': 'text2'
+- },
+- 'fail_on_error': True
+- }
+- ),
+- None
+- )
+- ]
+-
+- self.assertItemsEqual(expected, actual)
+-
+- def test_distributed_serialization_serialize_task_failure(self):
+- task = {
+- 'id': 'task1', 'roles': ['controller'],
+- 'type': 'puppet', 'version': '2.0.0',
+- 'parameters': {
+- 'fake': {'yaql_exp': '$.some.fake_param'}
+- }
+- }
+-
+- formatter_contexts_idx = {'2': {}}
+- scattered_data = {
+- 'settings_config': settings.config,
+- 'context': self.context
+- }
+-
+- serializer = lcm.transaction_serializer
+- result = serializer._distributed_serialize_tasks_for_node(
+- formatter_contexts_idx, [('2', task)], scattered_data)
+- (_, __), err = result[0]
+- self.assertIsInstance(err, exceptions.KeyError)
+-
+-
+-class TestConcurrencyPolicy(BaseTestCase):
+-
+- @mock.patch(
+- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+- return_value=1
+- )
+- def test_one_cpu(self, cpu_count):
+- policy = lcm.transaction_serializer.get_processing_policy(
+- lcm.TransactionContext({}))
+- self.assertIsInstance(
+- policy,
+- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+- )
+- self.assertTrue(cpu_count.is_called)
+-
+- @mock.patch(
+- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+- return_value=0
+- )
+- def test_zero_cpu(self, cpu_count):
+- policy = lcm.transaction_serializer.get_processing_policy(
+- lcm.TransactionContext({}))
+- self.assertIsInstance(
+- policy,
+- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+- )
+- self.assertTrue(cpu_count.is_called)
+-
+- @mock.patch(
+- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+- side_effect=NotImplementedError
+- )
+- def test_cpu_count_not_implemented(self, cpu_count):
+- policy = lcm.transaction_serializer.get_processing_policy(
+- lcm.TransactionContext({}))
+- self.assertIsInstance(
+- policy,
+- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+- )
+- self.assertTrue(cpu_count.is_called)
+-
+- def test_distributed_serialization_enabled_in_cluster(self):
+- context_data = {'common': {
+- 'serialization_policy': consts.SERIALIZATION_POLICY.distributed
+- }}
+- policy = lcm.transaction_serializer.get_processing_policy(
+- lcm.TransactionContext(context_data))
+- self.assertIsInstance(
+- policy,
+- lcm.transaction_serializer.DistributedProcessingPolicy
+- )
+diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt
+index c702e8a..96bed25 100644
+--- a/nailgun/requirements.txt
++++ b/nailgun/requirements.txt
+@@ -47,5 +47,3 @@ stevedore>=1.5.0
+ # See: https://bugs.launchpad.net/fuel/+bug/1519727
+ setuptools<=18.5
+ yaql>=1.0.0
+-# Distributed nodes serialization
+-distributed==1.16.0