From: Alexandru Avadanii <Alexandru.Avadanii@enea.com>
Date: Sun, 26 Mar 2017 19:16:47 +0200
Subject: [PATCH] Revert "Distributed serialization implementation"

This reverts commit f3c440f76961ef161ab6650b3a41a3c775073ba0.
---
 nailgun/nailgun/consts.py                          |   4 -
 nailgun/nailgun/fixtures/openstack.yaml            |  49 ---
 nailgun/nailgun/lcm/task_serializer.py             |  15 +-
 nailgun/nailgun/lcm/transaction_serializer.py      | 317 +------------------
 .../nailgun/orchestrator/deployment_serializers.py |   1 -
 nailgun/nailgun/settings.py                        |  22 +-
 nailgun/nailgun/settings.yaml                      |   9 -
 .../fuel_statistics/installation_info.py           |  10 -
 .../integration/test_cluster_changes_handler.py    |   3 -
 .../test/unit/test_lcm_transaction_serializer.py   | 351 +--------------------
 nailgun/requirements.txt                           |   2 -
 11 files changed, 14 insertions(+), 769 deletions(-)

diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py
index 1a2f09f..5490d6c 100644
--- a/nailgun/nailgun/consts.py
+++ b/nailgun/nailgun/consts.py
@@ -528,7 +528,3 @@ HYPERVISORS = Enum(
 )

 DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci'
-
-SERIALIZATION_POLICY = Enum(
-    'distributed'
-)
diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml
index 5557710..8b4d763 100644
--- a/nailgun/nailgun/fixtures/openstack.yaml
+++ b/nailgun/nailgun/fixtures/openstack.yaml
@@ -1114,55 +1114,6 @@
             group: "security"
             weight: 20
             type: "radio"
-          serialization_policy:
-            value: "default"
-            values:
-              - data: "default"
-                label: "Default serialization"
-                description: "Run serialization on the master node only"
-              - data: "distributed"
-                label: "Distributed serialization"
-                description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only form that environment for wich serialization is performing."
-            label: "Serialization policy"
-            group: "general"
-            weight: 30
-            type: "radio"
-          ds_use_discover:
-            group: "general"
-            label: "Use discovered nodes as workers for serialization"
-            type: "checkbox"
-            value: true
-            weight: 31
-            restrictions:
-              - condition: "settings:common.serialization_policy.value != 'distributed'"
-                action: "hide"
-          ds_use_provisioned:
-            group: "general"
-            label: "Use provisioned nodes as workers for serialization"
-            type: "checkbox"
-            value: true
-            weight: 32
-            restrictions:
-              - condition: "settings:common.serialization_policy.value != 'distributed'"
-                action: "hide"
-          ds_use_error:
-            group: "general"
-            label: "Use nodes in error state as workers for serialization"
-            type: "checkbox"
-            value: true
-            weight: 33
-            restrictions:
-              - condition: "settings:common.serialization_policy.value != 'distributed'"
-                action: "hide"
-          ds_use_ready:
-            group: "general"
-            label: "Use ready nodes as workers for serialization"
-            type: "checkbox"
-            value: false
-            weight: 34
-            restrictions:
-              - condition: "settings:common.serialization_policy.value != 'distributed'"
-                action: "hide"
         public_network_assignment:
           metadata:
             weight: 10
diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py
index 0d70f36..c004115 100644
--- a/nailgun/nailgun/lcm/task_serializer.py
+++ b/nailgun/nailgun/lcm/task_serializer.py
@@ -110,8 +110,6 @@ class Context(object):
         return evaluate

     def get_formatter_context(self, node_id):
-        # TODO(akislitsky) remove formatter context from the
-        # tasks serialization workflow
         data = self._transaction.get_new_data(node_id)
         return {
             'CLUSTER_ID': data.get('cluster', {}).get('id'),
@@ -149,14 +147,9 @@ class DeploymentTaskSerializer(object):
         :return: the result
         """

-    def serialize(self, node_id, formatter_context=None):
-        """Serialize task in expected by orchestrator format
+    def serialize(self, node_id):
+        """Serialize task in expected by orchestrator format.

-        If serialization is performed on the remote worker
-        we should pass formatter_context parameter with values
-        from the master node settings
-
-        :param formatter_context: formatter context
         :param node_id: the target node_id
         """

@@ -164,12 +157,10 @@ class DeploymentTaskSerializer(object):
             "serialize task %s for node %s",
             self.task_template['id'], node_id
         )
-        formatter_context = formatter_context \
-            or self.context.get_formatter_context(node_id)
         task = utils.traverse(
             self.task_template,
             utils.text_format_safe,
-            formatter_context,
+            self.context.get_formatter_context(node_id),
             {
                 'yaql_exp': self.context.get_yaql_interpreter(
                     node_id, self.task_template['id'])
diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py
index 32953c1..c02146b 100644
--- a/nailgun/nailgun/lcm/transaction_serializer.py
+++ b/nailgun/nailgun/lcm/transaction_serializer.py
@@ -14,21 +14,13 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.

-import datetime
+from distutils.version import StrictVersion
 import multiprocessing
-import os
-from Queue import Queue
-import shutil
-import tempfile

-import distributed
-from distutils.version import StrictVersion
 import six
-import toolz

 from nailgun import consts
 from nailgun import errors
-from nailgun.lcm.task_serializer import Context
 from nailgun.lcm.task_serializer import TasksSerializersFactory
 from nailgun.logger import logger
 from nailgun.settings import settings
@@ -136,308 +128,7 @@ class MultiProcessingConcurrencyPolicy(object):
             pool.join()


-def _distributed_serialize_tasks_for_node(formatter_contexts_idx,
-                                          node_and_tasks, scattered_data):
-    """Remote serialization call for DistributedProcessingPolicy
-
-    Code of the function is copied to the workers and executed there, thus
-    we are including all required imports inside the function.
-
-    :param formatter_contexts_idx: dict of formatter contexts with node_id
-    value as key
-    :param node_and_tasks: list of node_id, task_data tuples
-    :param scattered_data: feature object, that points to data copied to
-    workers
-    :return: [(node_id, serialized), error]
-    """
-
-    try:
-        factory = TasksSerializersFactory(scattered_data['context'])
-
-        # Restoring settings
-        settings.config = scattered_data['settings_config']
-        for k in formatter_contexts_idx:
-            formatter_contexts_idx[k]['SETTINGS'] = settings
-
-    except Exception as e:
-        logger.exception("Distributed serialization failed")
-        return [((None, None), e)]
-
-    result = []
-
-    for node_and_task in node_and_tasks:
-
-        node_id = None
-        try:
-            node_id, task = node_and_task
-
-            logger.debug("Starting distributed node %s task %s serialization",
-                         node_id, task['id'])
-
-            formatter_context = formatter_contexts_idx[node_id]
-
-            serializer = factory.create_serializer(task)
-            serialized = serializer.serialize(
-                node_id, formatter_context=formatter_context)
-
-            logger.debug("Distributed node %s task %s serialization "
-                         "result: %s", node_id, task['id'], serialized)
-
-            result.append(((node_id, serialized), None))
-        except Exception as e:
-            logger.exception("Distributed serialization failed")
-            result.append(((node_id, None), e))
-            break
-
-    logger.debug("Processed tasks count: %s", len(result))
-    return result
-
-
-class DistributedProcessingPolicy(object):
-
-    def __init__(self):
-        self.sent_jobs = Queue()
-        self.sent_jobs_count = 0
-
-    def _consume_jobs(self, chunk_size=None):
-        """Consumes jobs
-
-        If chunk_size is set function consumes specified number of
-        Finished tasks or less if sent_jobs_ids queue became empty.
-        If chunk_size is None function consumes jobs until
-        sent_jobs_ids queue became empty.
-        Jobs with statuses Cancelled, Abandoned, Terminated will be
-        resent and their ids added to sent_jobs_ids queue
-
-        :param chunk_size: size of consuming chunk
-        :return: generator on job results
-        """
-        logger.debug("Consuming jobs started")
-
-        jobs_to_consume = []
-        while not self.sent_jobs.empty():
-            job = self.sent_jobs.get()
-            jobs_to_consume.append(job)
-
-            if chunk_size is not None:
-                chunk_size -= 1
-                if chunk_size <= 0:
-                    break
-
-        for ready_job in distributed.as_completed(jobs_to_consume):
-            results = ready_job.result()
-            self.sent_jobs_count -= 1
-
-            for result in results:
-                (node_id, serialized), exc = result
-                logger.debug("Got ready task for node %s, serialized: %s, "
-                             "error: %s", node_id, serialized, exc)
-                if exc is not None:
-                    raise exc
-                yield node_id, serialized
-
-        logger.debug("Consuming jobs finished")
-
-    def _get_formatter_context(self, task_context, formatter_contexts_idx,
-                               node_id):
-        try:
-            return formatter_contexts_idx[node_id]
-        except KeyError:
-            pass
-
-        logger.debug("Calculating formatter context for node %s", node_id)
-        formatter_context = task_context.get_formatter_context(
-            node_id)
-        # Settings file is already sent to the workers
-        formatter_context.pop('SETTINGS', None)
-        formatter_contexts_idx[node_id] = formatter_context
-
-        return formatter_context
-
-    def _upload_nailgun_code(self, job_cluster):
-        """Creates zip of current nailgun code and uploads it to workers
-
-        TODO(akislitsky): add workers scope when it will be implemented
-        in the distributed library
-
-        :param job_cluster: distributed.Client
-        """
-        logger.debug("Compressing nailgun code")
-        file_dir = os.path.dirname(__file__)
-        nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..'))
-        archive = os.path.join(tempfile.gettempdir(), 'nailgun')
-        result = shutil.make_archive(archive, 'zip', nailgun_root_dir,
-                                     'nailgun')
-        logger.debug("Nailgun code saved to: %s", result)
-
-        logger.debug("Uploading nailgun archive %s to workers", result)
-        job_cluster.upload_file(result)
-
-    def _scatter_data(self, job_cluster, context, workers):
-        logger.debug("Scattering data to workers started")
-        shared_data = {'context': context, 'settings_config': settings.config}
-        scattered = job_cluster.scatter(shared_data, broadcast=True,
-                                        workers=workers)
-        # Waiting data is scattered to workers
-        distributed.wait(scattered.values())
-        logger.debug("Scattering data to workers finished")
-
-        return scattered
-
-    def _get_allowed_nodes_statuses(self, context):
-        """Extracts node statuses that allows distributed serialization"""
-        common = context.new.get('common', {})
-        cluster = common.get('cluster', {})
-        logger.debug("Getting allowed nodes statuses to use as serialization "
-                     "workers for cluster %s", cluster.get('id'))
-        check_fields = {
-            'ds_use_ready': consts.NODE_STATUSES.ready,
-            'ds_use_provisioned': consts.NODE_STATUSES.provisioned,
-            'ds_use_discover': consts.NODE_STATUSES.discover,
-            'ds_use_error': consts.NODE_STATUSES.error
-        }
-        statuses = set()
-        for field, node_status in check_fields.items():
-            if common.get(field):
-                statuses.add(node_status)
-
-        logger.debug("Allowed nodes statuses to use as serialization workers "
-                     "for cluster %s are: %s", cluster.get('id'), statuses)
-        return statuses
-
-    def _get_allowed_nodes_ips(self, context):
-        """Filters online nodes from cluster by their status
-
-        In the cluster settings we select nodes statuses allowed for
-        using in the distributed serialization. Accordingly to selected
-        statuses nodes are going to be filtered.
-
-        :param context: TransactionContext
-        :return: set of allowed nodes ips
-        """
-        ips = set()
-        allowed_statuses = self._get_allowed_nodes_statuses(context)
-        for node in six.itervalues(context.new.get('nodes', {})):
-            if node.get('status') in allowed_statuses:
-                ips.add(node.get('ip'))
-        ips.add(settings.MASTER_IP)
-        return ips
-
-    def _get_allowed_workers(self, job_cluster, allowed_ips):
-        """Calculates workers addresses for distributed serialization
-
-        Only workers that placed on the allowed nodes must be selected
-        for the serialization.
-
-        :param job_cluster: distributed.Client
-        :param allowed_ips: allowed for serialization nodes ips
-        :return: list of workers addresses in format 'ip:port'
-        """
-        logger.debug("Getting allowed workers")
-        workers = {}
-
-        # Worker has address like tcp://ip:port
-        info = job_cluster.scheduler_info()
-        for worker_addr in six.iterkeys(info['workers']):
-            ip_port = worker_addr.split('//')[1]
-            ip = ip_port.split(':')[0]
-            if ip not in allowed_ips:
-                continue
-            try:
-                pool = workers[ip]
-                pool.add(ip_port)
-            except KeyError:
-                workers[ip] = set([ip_port])
-
-        return list(toolz.itertoolz.concat(six.itervalues(workers)))
-
-    def execute(self, context, _, tasks):
-        """Executes task serialization on distributed nodes
-
-        :param context: the transaction context
-        :param _: serializers factory
-        :param tasks: the tasks to serialize
-        :return sequence of serialized tasks
-        """
-        logger.debug("Performing distributed tasks processing")
-        sched_address = '{0}:{1}'.format(settings.MASTER_IP,
-                                         settings.LCM_DS_JOB_SHEDULER_PORT)
-        job_cluster = distributed.Client(sched_address)
-
-        allowed_ips = self._get_allowed_nodes_ips(context)
-        workers = self._get_allowed_workers(job_cluster, allowed_ips)
-        logger.debug("Allowed workers list for serialization: %s", workers)
-        workers_ips = set([ip_port.split(':')[0] for ip_port in workers])
-        logger.debug("Allowed workers ips list for serialization: %s",
-                     workers_ips)
-
-        task_context = Context(context)
-        formatter_contexts_idx = {}
-        workers_num = len(workers)
-        max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF
-        logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue)
-
-        start = datetime.datetime.utcnow()
-        tasks_count = 0
-
-        try:
-            self._upload_nailgun_code(job_cluster)
-            scattered = self._scatter_data(job_cluster, context, workers)
-
-            for tasks_chunk in toolz.partition_all(
-                    settings.LCM_DS_TASKS_PER_JOB, tasks):
-
-                formatter_contexts_for_tasks = {}
-
-                # Collecting required contexts for tasks
-                for task in tasks_chunk:
-                    node_id, task_data = task
-                    formatter_context = self._get_formatter_context(
-                        task_context, formatter_contexts_idx, node_id)
-                    if node_id not in formatter_contexts_for_tasks:
-                        formatter_contexts_for_tasks[node_id] = \
-                            formatter_context
-
-                logger.debug("Submitting job for tasks chunk: %s", tasks_chunk)
-                job = job_cluster.submit(
-                    _distributed_serialize_tasks_for_node,
-                    formatter_contexts_for_tasks,
-                    tasks_chunk,
-                    scattered,
-                    workers=workers_ips
-                )
-
-                self.sent_jobs.put(job)
-                self.sent_jobs_count += 1
-
-                # We are limit the max number of tasks by the number of nodes
-                # which are used in the serialization
-                if self.sent_jobs_count >= max_jobs_in_queue:
-                    for result in self._consume_jobs(chunk_size=workers_num):
-                        tasks_count += 1
-                        yield result
-
-            # We have no tasks any more but have unconsumed jobs
-            for result in self._consume_jobs():
-                tasks_count += 1
-                yield result
-        finally:
-            end = datetime.datetime.utcnow()
-            logger.debug("Distributed tasks processing finished. "
-                         "Total time: %s. Tasks processed: %s",
-                         end - start, tasks_count)
-            job_cluster.shutdown()
-
-
-def is_distributed_processing_enabled(context):
-    common = context.new.get('common', {})
-    return common.get('serialization_policy') == \
-        consts.SERIALIZATION_POLICY.distributed
-
-
-def get_processing_policy(context):
-    if is_distributed_processing_enabled(context):
-        return DistributedProcessingPolicy()
+def get_concurrency_policy():
     cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
     if not cpu_num:
         try:
@@ -471,7 +162,7 @@ class TransactionSerializer(object):
         # ids of nodes in this group and how many nodes in this group can fail
         # and deployment will not be interrupted
         self.fault_tolerance_groups = []
-        self.processing_policy = get_processing_policy(context)
+        self.concurrency_policy = get_concurrency_policy()

     @classmethod
     def serialize(cls, context, tasks, resolver):
@@ -525,7 +216,7 @@ class TransactionSerializer(object):
         :param tasks: the deployment tasks
         :return the mapping tasks per node
         """
-        serialized = self.processing_policy.execute(
+        serialized = self.concurrency_policy.execute(
             self.context,
             self.serializer_factory_class,
             self.expand_tasks(tasks)
diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py
index 99d52a2..9473733 100644
--- a/nailgun/nailgun/orchestrator/deployment_serializers.py
+++ b/nailgun/nailgun/orchestrator/deployment_serializers.py
@@ -221,7 +221,6 @@ class DeploymentMultinodeSerializer(object):
             'role': role,
             'vms_conf': node.vms_conf,
             'fail_if_error': role in self.critical_roles,
-            'ip': node.ip,
             # TODO(eli): need to remove, requried for the fake thread only
             'online': node.online,
         }
diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py
index 1f9222b..eee08c9 100644
--- a/nailgun/nailgun/settings.py
+++ b/nailgun/nailgun/settings.py
@@ -38,21 +38,7 @@ class NailgunSettings(object):
         if test_config:
             settings_files.append(test_config)

-        # If settings.yaml doesn't exist we should have default
-        # config structure. Nailgun without settings is used
-        # when we distribute source code to the workers for
-        # distributed serialization
-        self.config = {
-            'VERSION': {},
-            'DATABASE': {
-                'engine': 'postgresql',
-                'name': '',
-                'host': '',
-                'port': '0',
-                'user': '',
-                'passwd': ''
-            }
-        }
+        self.config = {}
         for sf in settings_files:
             try:
                 logger.debug("Trying to read config file %s" % sf)
@@ -61,9 +47,9 @@ class NailgunSettings(object):
                 logger.error("Error while reading config file %s: %s" %
                              (sf, str(e)))

-        self.config['VERSION']['api'] = self.config.get('API')
+        self.config['VERSION']['api'] = self.config['API']
         self.config['VERSION']['feature_groups'] = \
-            self.config.get('FEATURE_GROUPS')
+            self.config['FEATURE_GROUPS']

         fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE)
         if fuel_release:
@@ -75,7 +61,7 @@ class NailgunSettings(object):
             self.config['VERSION']['openstack_version'] = \
                 fuel_openstack_version

-        if int(self.config.get("DEVELOPMENT", 0)):
+        if int(self.config.get("DEVELOPMENT")):
             logger.info("DEVELOPMENT MODE ON:")
             here = os.path.abspath(
                 os.path.join(os.path.dirname(__file__), '..')
diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml
index c2faaa8..92b3518 100644
--- a/nailgun/nailgun/settings.yaml
+++ b/nailgun/nailgun/settings.yaml
@@ -177,15 +177,6 @@ YAQL_MEMORY_QUOTA: 104857600

 LCM_CHECK_TASK_VERSION: False

-# Coefficient for calculation max jobs queue length. If jobs number reaches the
-# len(nodes) * load_coef we stop generate and start consume of jobs.
-LCM_DS_NODE_LOAD_COEFF: 2
-# Port of dask-scheduler on the master node
-LCM_DS_JOB_SHEDULER_PORT: 8002
-# Size of tasks chunk sending to the distributed worker
-LCM_DS_TASKS_PER_JOB: 100
-
-
 DPDK_MAX_CPUS_PER_NIC: 4

 TRUNCATE_LOG_ENTRIES: 100
diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
index d024cdd..482d8c5 100644
--- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
+++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
@@ -56,16 +56,6 @@ class InstallationInfo(object):
                       'propagate_task_deploy', None),
         WhiteListRule(('common', 'security_groups', 'value'),
                       'security_groups', None),
-        WhiteListRule(('common', 'serialization_policy', 'value'),
-                      'serialization_policy', None),
-        WhiteListRule(('common', 'ds_use_discover', 'value'),
-                      'ds_use_discover', None),
-        WhiteListRule(('common', 'ds_use_provisioned', 'value'),
-                      'ds_use_provisioned', None),
-        WhiteListRule(('common', 'ds_use_ready', 'value'),
-                      'ds_use_ready', None),
-        WhiteListRule(('common', 'ds_use_error', 'value'),
-                      'ds_use_error', None),
         WhiteListRule(('corosync', 'verified', 'value'),
                       'corosync_verified', None),

diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
index b8f4e98..a8661ab 100644
--- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
+++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
@@ -187,7 +187,6 @@ class TestHandlers(BaseIntegrationTest):
                     'fail_if_error': is_critical,
                     'vms_conf': [],
                     'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
-                    'ip': node.ip,

                     'network_data': {
                         'eth1': {
@@ -604,7 +603,6 @@ class TestHandlers(BaseIntegrationTest):
                     'online': node.online,
                     'fail_if_error': is_critical,
                     'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
-                    'ip': node.ip,
                     'priority': 100,
                     'vms_conf': [],
                     'network_scheme': {
@@ -1098,7 +1096,6 @@ class TestHandlers(BaseIntegrationTest):
                     'fail_if_error': is_critical,
                     'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
                     'priority': 100,
-                    'ip': node.ip,
                     'vms_conf': [],

                     'network_scheme': {
diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
index 9751d55..6450e19 100644
--- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
+++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
@@ -14,24 +14,20 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.

-import copy
-import exceptions
 import mock
 import multiprocessing.dummy

 from nailgun import consts
 from nailgun import errors
 from nailgun import lcm
-from nailgun.lcm import TransactionContext
-from nailgun.settings import settings
-from nailgun.test.base import BaseTestCase
 from nailgun.utils.resolvers import TagResolver

+from nailgun.test.base import BaseUnitTest

-class TestTransactionSerializer(BaseTestCase):
+
+class TestTransactionSerializer(BaseUnitTest):
     @classmethod
     def setUpClass(cls):
-        super(TestTransactionSerializer, cls).setUpClass()
         cls.tasks = [
             {
                 'id': 'task1', 'roles': ['controller'],
@@ -466,344 +462,3 @@ class TestTransactionSerializer(BaseTestCase):
             9,
             lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10)
         )
-
-    def _get_context_for_distributed_serialization(self):
-        new = copy.deepcopy(self.context.new)
-        new['common']['serialization_policy'] = \
-            consts.SERIALIZATION_POLICY.distributed
-        return TransactionContext(new)
-
-    @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
-    @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
-    def test_distributed_serialization(self, _, as_completed):
-        context = self._get_context_for_distributed_serialization()
-
-        with mock.patch(
-                'nailgun.lcm.transaction_serializer.distributed.Client'
-        ) as job_cluster:
-            job = mock.Mock()
-            job.result.return_value = [
-                (('1', {"id": "task1", "type": "skipped"}), None)
-            ]
-
-            submit = mock.Mock()
-            submit.return_value = job
-
-            as_completed.return_value = [job]
-
-            job_cluster.return_value.submit = submit
-            job_cluster.return_value.scheduler_info.return_value = \
-                {'workers': {'tcp://worker': {}}}
-
-            lcm.TransactionSerializer.serialize(
-                context, self.tasks, self.resolver)
-            self.assertTrue(submit.called)
-            # 4 controller task + 1 compute + 1 cinder
-            self.assertTrue(6, submit.call_count)
-
-    @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
-    @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
-    @mock.patch('nailgun.lcm.transaction_serializer.'
-                'DistributedProcessingPolicy._get_formatter_context')
-    def test_distributed_serialization_workers_scope(self, formatter_context,
-                                                     as_completed, _):
-        context = self._get_context_for_distributed_serialization()
-
-        node_id = '1'
-        task = {
-            'id': 'task1', 'roles': ['controller'],
-            'type': 'puppet', 'version': '2.0.0'
-        }
-
-        with mock.patch(
-                'nailgun.lcm.transaction_serializer.distributed.Client'
-        ) as job_cluster:
-
-            # Mocking job processing
-            job = mock.Mock()
-            job.result.return_value = [((node_id, task), None)]
-
-            submit = mock.Mock()
-            submit.return_value = job
-
-            as_completed.return_value = [job]
-
-            scatter = mock.Mock()
-            job_cluster.return_value.scatter = scatter
-
-            job_cluster.return_value.scatter.return_value = {}
-            job_cluster.return_value.submit = submit
-
-            formatter_context.return_value = {node_id: {}}
-
-            # Configuring available workers
-            job_cluster.return_value.scheduler_info.return_value = \
-                {
-                    'workers': {
-                        'tcp://{0}'.format(settings.MASTER_IP): {},
-                        'tcp://192.168.0.1:33334': {},
-                        'tcp://127.0.0.2:33335': {},
-                    }
-                }
-
-            # Performing serialization
-            lcm.TransactionSerializer.serialize(
-                context, [task], self.resolver
-            )
-
-            # Checking data is scattered only to expected workers
-            scatter.assert_called_once()
-            scatter.assert_called_with(
-                {'context': context, 'settings_config': settings.config},
-                broadcast=True,
-                workers=[settings.MASTER_IP]
-            )
-
-            # Checking submit job only to expected workers
-            submit.assert_called_once()
-            serializer = lcm.transaction_serializer
-            submit.assert_called_with(
-                serializer._distributed_serialize_tasks_for_node,
-                {node_id: formatter_context()},
-                ((node_id, task),),
-                job_cluster().scatter(),
-                workers=set([settings.MASTER_IP])
-            )
-
-    def test_distributed_serialization_get_allowed_nodes_ips(self):
-        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
-
-        context_data = {
-            'common': {
-                'serialization_policy':
-                    consts.SERIALIZATION_POLICY.distributed,
-                'ds_use_error': True,
-                'ds_use_provisioned': True,
-                'ds_use_discover': True,
-                'ds_use_ready': False
-            },
-            'nodes': {
-                '1': {'status': consts.NODE_STATUSES.error,
-                      'ip': '10.20.0.3'},
-                '2': {'status': consts.NODE_STATUSES.provisioned,
-                      'ip': '10.20.0.4'},
-                '3': {'status': consts.NODE_STATUSES.discover,
-                      'ip': '10.20.0.5'},
-                '4': {'status': consts.NODE_STATUSES.ready,
-                      'ip': '10.20.0.6'},
-            }
-        }
-
-        actual = policy._get_allowed_nodes_ips(
-            TransactionContext(context_data))
-        self.assertItemsEqual(
-            [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'],
-            actual
-        )
-
-    def test_distributed_serialization_get_allowed_nodes_statuses(self):
-        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
-        context_data = {}
-        actual = policy._get_allowed_nodes_statuses(
-            TransactionContext(context_data))
-        self.assertItemsEqual([], actual)
-
-        context_data['common'] = {
-            'ds_use_discover': False,
-            'ds_use_provisioned': False,
-            'ds_use_error': False,
-            'ds_use_ready': False
-        }
-        actual = policy._get_allowed_nodes_statuses(
-            TransactionContext(context_data))
-        self.assertItemsEqual([], actual)
-
-        context_data['common']['ds_use_discover'] = True
-        actual = policy._get_allowed_nodes_statuses(
-            TransactionContext(context_data))
-        expected = [consts.NODE_STATUSES.discover]
-        self.assertItemsEqual(expected, actual)
-
-        context_data['common']['ds_use_provisioned'] = True
-        actual = policy._get_allowed_nodes_statuses(
-            TransactionContext(context_data))
-        expected = [consts.NODE_STATUSES.discover,
-                    consts.NODE_STATUSES.provisioned]
-        self.assertItemsEqual(expected, actual)
-
-        context_data['common']['ds_use_error'] = True
-        actual = policy._get_allowed_nodes_statuses(
-            TransactionContext(context_data))
-        expected = [consts.NODE_STATUSES.discover,
-                    consts.NODE_STATUSES.provisioned,
-                    consts.NODE_STATUSES.error]
-        self.assertItemsEqual(expected, actual)
-
-        context_data['common']['ds_use_ready'] = True
-        actual = policy._get_allowed_nodes_statuses(
-            TransactionContext(context_data))
-        expected = [consts.NODE_STATUSES.discover,
-                    consts.NODE_STATUSES.provisioned,
-                    consts.NODE_STATUSES.error,
-                    consts.NODE_STATUSES.ready]
-        self.assertItemsEqual(expected, actual)
-
-    def test_distributed_serialization_get_allowed_workers(self):
-        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
-
-        with mock.patch(
-                'nailgun.lcm.transaction_serializer.distributed.Client'
-        ) as job_cluster:
-            job_cluster.scheduler_info.return_value = \
-                {'workers': {
-                    'tcp://10.20.0.2:1': {},
-                    'tcp://10.20.0.2:2': {},
-                    'tcp://10.20.0.3:1': {},
-                    'tcp://10.20.0.3:2': {},
-                    'tcp://10.20.0.3:3': {},
-                    'tcp://10.20.0.4:1': {},
-                    'tcp://10.20.0.5:1': {}
-                }}
-            allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5'])
-
-            expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1',
-                        '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1']
-            actual = policy._get_allowed_workers(job_cluster, allowed_ips)
-            self.assertItemsEqual(expected, actual)
-
-    def test_distributed_serialization_serialize_task(self):
-        task = {
-            'id': 'task1', 'roles': ['controller'],
-            'type': 'puppet', 'version': '2.0.0',
-            'parameters': {
-                'master_ip': '{MN_IP}',
-                'host': {'yaql_exp': '$.public_ssl.hostname'},
-                'attr': {'yaql_exp': '$node.attributes.a_str'}
-            }
-        }
-
-        formatter_contexts_idx = {
-            '1': {'MN_IP': '10.0.0.1'},
-            '2': {}
-        }
-        scattered_data = {
-            'settings_config': settings.config,
-            'context': self.context
-        }
-
-        serializer = lcm.transaction_serializer
-        actual = serializer._distributed_serialize_tasks_for_node(
-            formatter_contexts_idx, [('1', task), ('2', task)], scattered_data)
-
-        expected = [
-            (
-                (
-                    '1',
-                    {
-                        'id': 'task1',
-                        'type': 'puppet',
-                        'parameters': {
-                            'cwd': '/',
-                            'master_ip': '10.0.0.1',
-                            'host': 'localhost',
-                            'attr': 'text1'
-                        },
-                        'fail_on_error': True
-                    }
-                ),
-                None
-            ),
-            (
-                (
-                    '2',
-                    {
-                        'id': 'task1',
-                        'type': 'puppet',
-                        'parameters': {
-                            'cwd': '/',
-                            'master_ip': '{MN_IP}',
-                            'host': 'localhost',
-                            'attr': 'text2'
-                        },
-                        'fail_on_error': True
-                    }
-                ),
-                None
-            )
-        ]
-
-        self.assertItemsEqual(expected, actual)
-
-    def test_distributed_serialization_serialize_task_failure(self):
-        task = {
-            'id': 'task1', 'roles': ['controller'],
-            'type': 'puppet', 'version': '2.0.0',
-            'parameters': {
-                'fake': {'yaql_exp': '$.some.fake_param'}
-            }
-        }
-
-        formatter_contexts_idx = {'2': {}}
-        scattered_data = {
-            'settings_config': settings.config,
-            'context': self.context
-        }
-
-        serializer = lcm.transaction_serializer
-        result = serializer._distributed_serialize_tasks_for_node(
-            formatter_contexts_idx, [('2', task)], scattered_data)
-        (_, __), err = result[0]
-        self.assertIsInstance(err, exceptions.KeyError)
-
-
-class TestConcurrencyPolicy(BaseTestCase):
-
-    @mock.patch(
-        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
-        return_value=1
-    )
-    def test_one_cpu(self, cpu_count):
-        policy = lcm.transaction_serializer.get_processing_policy(
-            lcm.TransactionContext({}))
-        self.assertIsInstance(
-            policy,
-            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
-        )
-        self.assertTrue(cpu_count.is_called)
-
-    @mock.patch(
-        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
-        return_value=0
-    )
-    def test_zero_cpu(self, cpu_count):
-        policy = lcm.transaction_serializer.get_processing_policy(
-            lcm.TransactionContext({}))
-        self.assertIsInstance(
-            policy,
-            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
-        )
-        self.assertTrue(cpu_count.is_called)
-
-    @mock.patch(
-        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
-        side_effect=NotImplementedError
-    )
-    def test_cpu_count_not_implemented(self, cpu_count):
-        policy = lcm.transaction_serializer.get_processing_policy(
-            lcm.TransactionContext({}))
-        self.assertIsInstance(
-            policy,
-            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
-        )
-        self.assertTrue(cpu_count.is_called)
-
-    def test_distributed_serialization_enabled_in_cluster(self):
-        context_data = {'common': {
-            'serialization_policy': consts.SERIALIZATION_POLICY.distributed
-        }}
-        policy = lcm.transaction_serializer.get_processing_policy(
-            lcm.TransactionContext(context_data))
-        self.assertIsInstance(
-            policy,
-            lcm.transaction_serializer.DistributedProcessingPolicy
-        )
diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt
index c702e8a..96bed25 100644
--- a/nailgun/requirements.txt
+++ b/nailgun/requirements.txt
@@ -47,5 +47,3 @@ stevedore>=1.5.0
 # See: https://bugs.launchpad.net/fuel/+bug/1519727
 setuptools<=18.5
 yaql>=1.0.0
-# Distributed nodes serialization
-distributed==1.16.0