diff options
Diffstat (limited to 'build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch')
-rw-r--r-- | build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch | 1006 |
1 files changed, 0 insertions, 1006 deletions
diff --git a/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch b/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch deleted file mode 100644 index 152197e74..000000000 --- a/build/f_repos/patch/fuel-web/0002-Revert-Distributed-serialization-implementation.patch +++ /dev/null @@ -1,1006 +0,0 @@ -From: Alexandru Avadanii <Alexandru.Avadanii@enea.com> -Date: Sun, 26 Mar 2017 19:16:47 +0200 -Subject: [PATCH] Revert "Distributed serialization implementation" - -This reverts commit f3c440f76961ef161ab6650b3a41a3c775073ba0. ---- - nailgun/nailgun/consts.py | 4 - - nailgun/nailgun/fixtures/openstack.yaml | 49 --- - nailgun/nailgun/lcm/task_serializer.py | 15 +- - nailgun/nailgun/lcm/transaction_serializer.py | 317 +------------------ - .../nailgun/orchestrator/deployment_serializers.py | 1 - - nailgun/nailgun/settings.py | 22 +- - nailgun/nailgun/settings.yaml | 9 - - .../fuel_statistics/installation_info.py | 10 - - .../integration/test_cluster_changes_handler.py | 3 - - .../test/unit/test_lcm_transaction_serializer.py | 351 +-------------------- - nailgun/requirements.txt | 2 - - 11 files changed, 14 insertions(+), 769 deletions(-) - -diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py -index 1a2f09f..5490d6c 100644 ---- a/nailgun/nailgun/consts.py -+++ b/nailgun/nailgun/consts.py -@@ -528,7 +528,3 @@ HYPERVISORS = Enum( - ) - - DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci' -- --SERIALIZATION_POLICY = Enum( -- 'distributed' --) -diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml -index 5557710..8b4d763 100644 ---- a/nailgun/nailgun/fixtures/openstack.yaml -+++ b/nailgun/nailgun/fixtures/openstack.yaml -@@ -1114,55 +1114,6 @@ - group: "security" - weight: 20 - type: "radio" -- serialization_policy: -- value: "default" -- values: -- - data: "default" -- label: "Default serialization" -- description: "Run serialization on the master node only" -- - data: "distributed" -- label: "Distributed serialization" -- description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only form that environment for wich serialization is performing." -- label: "Serialization policy" -- group: "general" -- weight: 30 -- type: "radio" -- ds_use_discover: -- group: "general" -- label: "Use discovered nodes as workers for serialization" -- type: "checkbox" -- value: true -- weight: 31 -- restrictions: -- - condition: "settings:common.serialization_policy.value != 'distributed'" -- action: "hide" -- ds_use_provisioned: -- group: "general" -- label: "Use provisioned nodes as workers for serialization" -- type: "checkbox" -- value: true -- weight: 32 -- restrictions: -- - condition: "settings:common.serialization_policy.value != 'distributed'" -- action: "hide" -- ds_use_error: -- group: "general" -- label: "Use nodes in error state as workers for serialization" -- type: "checkbox" -- value: true -- weight: 33 -- restrictions: -- - condition: "settings:common.serialization_policy.value != 'distributed'" -- action: "hide" -- ds_use_ready: -- group: "general" -- label: "Use ready nodes as workers for serialization" -- type: "checkbox" -- value: false -- weight: 34 -- restrictions: -- - condition: "settings:common.serialization_policy.value != 'distributed'" -- action: "hide" - public_network_assignment: - metadata: - weight: 10 -diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py -index 0d70f36..c004115 100644 ---- a/nailgun/nailgun/lcm/task_serializer.py -+++ b/nailgun/nailgun/lcm/task_serializer.py -@@ -110,8 +110,6 @@ class Context(object): - return evaluate - - def get_formatter_context(self, node_id): -- # TODO(akislitsky) remove formatter context from the -- # tasks serialization workflow - data = self._transaction.get_new_data(node_id) - return { - 'CLUSTER_ID': data.get('cluster', {}).get('id'), -@@ -149,14 +147,9 @@ class DeploymentTaskSerializer(object): - :return: the result - """ - -- def serialize(self, node_id, formatter_context=None): -- """Serialize task in expected by orchestrator format -+ def serialize(self, node_id): -+ """Serialize task in expected by orchestrator format. - -- If serialization is performed on the remote worker -- we should pass formatter_context parameter with values -- from the master node settings -- -- :param formatter_context: formatter context - :param node_id: the target node_id - """ - -@@ -164,12 +157,10 @@ class DeploymentTaskSerializer(object): - "serialize task %s for node %s", - self.task_template['id'], node_id - ) -- formatter_context = formatter_context \ -- or self.context.get_formatter_context(node_id) - task = utils.traverse( - self.task_template, - utils.text_format_safe, -- formatter_context, -+ self.context.get_formatter_context(node_id), - { - 'yaql_exp': self.context.get_yaql_interpreter( - node_id, self.task_template['id']) -diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py -index 32953c1..c02146b 100644 ---- a/nailgun/nailgun/lcm/transaction_serializer.py -+++ b/nailgun/nailgun/lcm/transaction_serializer.py -@@ -14,21 +14,13 @@ - # License for the specific language governing permissions and limitations - # under the License. - --import datetime -+from distutils.version import StrictVersion - import multiprocessing --import os --from Queue import Queue --import shutil --import tempfile - --import distributed --from distutils.version import StrictVersion - import six --import toolz - - from nailgun import consts - from nailgun import errors --from nailgun.lcm.task_serializer import Context - from nailgun.lcm.task_serializer import TasksSerializersFactory - from nailgun.logger import logger - from nailgun.settings import settings -@@ -136,308 +128,7 @@ class MultiProcessingConcurrencyPolicy(object): - pool.join() - - --def _distributed_serialize_tasks_for_node(formatter_contexts_idx, -- node_and_tasks, scattered_data): -- """Remote serialization call for DistributedProcessingPolicy -- -- Code of the function is copied to the workers and executed there, thus -- we are including all required imports inside the function. -- -- :param formatter_contexts_idx: dict of formatter contexts with node_id -- value as key -- :param node_and_tasks: list of node_id, task_data tuples -- :param scattered_data: feature object, that points to data copied to -- workers -- :return: [(node_id, serialized), error] -- """ -- -- try: -- factory = TasksSerializersFactory(scattered_data['context']) -- -- # Restoring settings -- settings.config = scattered_data['settings_config'] -- for k in formatter_contexts_idx: -- formatter_contexts_idx[k]['SETTINGS'] = settings -- -- except Exception as e: -- logger.exception("Distributed serialization failed") -- return [((None, None), e)] -- -- result = [] -- -- for node_and_task in node_and_tasks: -- -- node_id = None -- try: -- node_id, task = node_and_task -- -- logger.debug("Starting distributed node %s task %s serialization", -- node_id, task['id']) -- -- formatter_context = formatter_contexts_idx[node_id] -- -- serializer = factory.create_serializer(task) -- serialized = serializer.serialize( -- node_id, formatter_context=formatter_context) -- -- logger.debug("Distributed node %s task %s serialization " -- "result: %s", node_id, task['id'], serialized) -- -- result.append(((node_id, serialized), None)) -- except Exception as e: -- logger.exception("Distributed serialization failed") -- result.append(((node_id, None), e)) -- break -- -- logger.debug("Processed tasks count: %s", len(result)) -- return result -- -- --class DistributedProcessingPolicy(object): -- -- def __init__(self): -- self.sent_jobs = Queue() -- self.sent_jobs_count = 0 -- -- def _consume_jobs(self, chunk_size=None): -- """Consumes jobs -- -- If chunk_size is set function consumes specified number of -- Finished tasks or less if sent_jobs_ids queue became empty. -- If chunk_size is None function consumes jobs until -- sent_jobs_ids queue became empty. -- Jobs with statuses Cancelled, Abandoned, Terminated will be -- resent and their ids added to sent_jobs_ids queue -- -- :param chunk_size: size of consuming chunk -- :return: generator on job results -- """ -- logger.debug("Consuming jobs started") -- -- jobs_to_consume = [] -- while not self.sent_jobs.empty(): -- job = self.sent_jobs.get() -- jobs_to_consume.append(job) -- -- if chunk_size is not None: -- chunk_size -= 1 -- if chunk_size <= 0: -- break -- -- for ready_job in distributed.as_completed(jobs_to_consume): -- results = ready_job.result() -- self.sent_jobs_count -= 1 -- -- for result in results: -- (node_id, serialized), exc = result -- logger.debug("Got ready task for node %s, serialized: %s, " -- "error: %s", node_id, serialized, exc) -- if exc is not None: -- raise exc -- yield node_id, serialized -- -- logger.debug("Consuming jobs finished") -- -- def _get_formatter_context(self, task_context, formatter_contexts_idx, -- node_id): -- try: -- return formatter_contexts_idx[node_id] -- except KeyError: -- pass -- -- logger.debug("Calculating formatter context for node %s", node_id) -- formatter_context = task_context.get_formatter_context( -- node_id) -- # Settings file is already sent to the workers -- formatter_context.pop('SETTINGS', None) -- formatter_contexts_idx[node_id] = formatter_context -- -- return formatter_context -- -- def _upload_nailgun_code(self, job_cluster): -- """Creates zip of current nailgun code and uploads it to workers -- -- TODO(akislitsky): add workers scope when it will be implemented -- in the distributed library -- -- :param job_cluster: distributed.Client -- """ -- logger.debug("Compressing nailgun code") -- file_dir = os.path.dirname(__file__) -- nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..')) -- archive = os.path.join(tempfile.gettempdir(), 'nailgun') -- result = shutil.make_archive(archive, 'zip', nailgun_root_dir, -- 'nailgun') -- logger.debug("Nailgun code saved to: %s", result) -- -- logger.debug("Uploading nailgun archive %s to workers", result) -- job_cluster.upload_file(result) -- -- def _scatter_data(self, job_cluster, context, workers): -- logger.debug("Scattering data to workers started") -- shared_data = {'context': context, 'settings_config': settings.config} -- scattered = job_cluster.scatter(shared_data, broadcast=True, -- workers=workers) -- # Waiting data is scattered to workers -- distributed.wait(scattered.values()) -- logger.debug("Scattering data to workers finished") -- -- return scattered -- -- def _get_allowed_nodes_statuses(self, context): -- """Extracts node statuses that allows distributed serialization""" -- common = context.new.get('common', {}) -- cluster = common.get('cluster', {}) -- logger.debug("Getting allowed nodes statuses to use as serialization " -- "workers for cluster %s", cluster.get('id')) -- check_fields = { -- 'ds_use_ready': consts.NODE_STATUSES.ready, -- 'ds_use_provisioned': consts.NODE_STATUSES.provisioned, -- 'ds_use_discover': consts.NODE_STATUSES.discover, -- 'ds_use_error': consts.NODE_STATUSES.error -- } -- statuses = set() -- for field, node_status in check_fields.items(): -- if common.get(field): -- statuses.add(node_status) -- -- logger.debug("Allowed nodes statuses to use as serialization workers " -- "for cluster %s are: %s", cluster.get('id'), statuses) -- return statuses -- -- def _get_allowed_nodes_ips(self, context): -- """Filters online nodes from cluster by their status -- -- In the cluster settings we select nodes statuses allowed for -- using in the distributed serialization. Accordingly to selected -- statuses nodes are going to be filtered. -- -- :param context: TransactionContext -- :return: set of allowed nodes ips -- """ -- ips = set() -- allowed_statuses = self._get_allowed_nodes_statuses(context) -- for node in six.itervalues(context.new.get('nodes', {})): -- if node.get('status') in allowed_statuses: -- ips.add(node.get('ip')) -- ips.add(settings.MASTER_IP) -- return ips -- -- def _get_allowed_workers(self, job_cluster, allowed_ips): -- """Calculates workers addresses for distributed serialization -- -- Only workers that placed on the allowed nodes must be selected -- for the serialization. -- -- :param job_cluster: distributed.Client -- :param allowed_ips: allowed for serialization nodes ips -- :return: list of workers addresses in format 'ip:port' -- """ -- logger.debug("Getting allowed workers") -- workers = {} -- -- # Worker has address like tcp://ip:port -- info = job_cluster.scheduler_info() -- for worker_addr in six.iterkeys(info['workers']): -- ip_port = worker_addr.split('//')[1] -- ip = ip_port.split(':')[0] -- if ip not in allowed_ips: -- continue -- try: -- pool = workers[ip] -- pool.add(ip_port) -- except KeyError: -- workers[ip] = set([ip_port]) -- -- return list(toolz.itertoolz.concat(six.itervalues(workers))) -- -- def execute(self, context, _, tasks): -- """Executes task serialization on distributed nodes -- -- :param context: the transaction context -- :param _: serializers factory -- :param tasks: the tasks to serialize -- :return sequence of serialized tasks -- """ -- logger.debug("Performing distributed tasks processing") -- sched_address = '{0}:{1}'.format(settings.MASTER_IP, -- settings.LCM_DS_JOB_SHEDULER_PORT) -- job_cluster = distributed.Client(sched_address) -- -- allowed_ips = self._get_allowed_nodes_ips(context) -- workers = self._get_allowed_workers(job_cluster, allowed_ips) -- logger.debug("Allowed workers list for serialization: %s", workers) -- workers_ips = set([ip_port.split(':')[0] for ip_port in workers]) -- logger.debug("Allowed workers ips list for serialization: %s", -- workers_ips) -- -- task_context = Context(context) -- formatter_contexts_idx = {} -- workers_num = len(workers) -- max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF -- logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue) -- -- start = datetime.datetime.utcnow() -- tasks_count = 0 -- -- try: -- self._upload_nailgun_code(job_cluster) -- scattered = self._scatter_data(job_cluster, context, workers) -- -- for tasks_chunk in toolz.partition_all( -- settings.LCM_DS_TASKS_PER_JOB, tasks): -- -- formatter_contexts_for_tasks = {} -- -- # Collecting required contexts for tasks -- for task in tasks_chunk: -- node_id, task_data = task -- formatter_context = self._get_formatter_context( -- task_context, formatter_contexts_idx, node_id) -- if node_id not in formatter_contexts_for_tasks: -- formatter_contexts_for_tasks[node_id] = \ -- formatter_context -- -- logger.debug("Submitting job for tasks chunk: %s", tasks_chunk) -- job = job_cluster.submit( -- _distributed_serialize_tasks_for_node, -- formatter_contexts_for_tasks, -- tasks_chunk, -- scattered, -- workers=workers_ips -- ) -- -- self.sent_jobs.put(job) -- self.sent_jobs_count += 1 -- -- # We are limit the max number of tasks by the number of nodes -- # which are used in the serialization -- if self.sent_jobs_count >= max_jobs_in_queue: -- for result in self._consume_jobs(chunk_size=workers_num): -- tasks_count += 1 -- yield result -- -- # We have no tasks any more but have unconsumed jobs -- for result in self._consume_jobs(): -- tasks_count += 1 -- yield result -- finally: -- end = datetime.datetime.utcnow() -- logger.debug("Distributed tasks processing finished. " -- "Total time: %s. Tasks processed: %s", -- end - start, tasks_count) -- job_cluster.shutdown() -- -- --def is_distributed_processing_enabled(context): -- common = context.new.get('common', {}) -- return common.get('serialization_policy') == \ -- consts.SERIALIZATION_POLICY.distributed -- -- --def get_processing_policy(context): -- if is_distributed_processing_enabled(context): -- return DistributedProcessingPolicy() -+def get_concurrency_policy(): - cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR - if not cpu_num: - try: -@@ -471,7 +162,7 @@ class TransactionSerializer(object): - # ids of nodes in this group and how many nodes in this group can fail - # and deployment will not be interrupted - self.fault_tolerance_groups = [] -- self.processing_policy = get_processing_policy(context) -+ self.concurrency_policy = get_concurrency_policy() - - @classmethod - def serialize(cls, context, tasks, resolver): -@@ -525,7 +216,7 @@ class TransactionSerializer(object): - :param tasks: the deployment tasks - :return the mapping tasks per node - """ -- serialized = self.processing_policy.execute( -+ serialized = self.concurrency_policy.execute( - self.context, - self.serializer_factory_class, - self.expand_tasks(tasks) -diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py -index 99d52a2..9473733 100644 ---- a/nailgun/nailgun/orchestrator/deployment_serializers.py -+++ b/nailgun/nailgun/orchestrator/deployment_serializers.py -@@ -221,7 +221,6 @@ class DeploymentMultinodeSerializer(object): - 'role': role, - 'vms_conf': node.vms_conf, - 'fail_if_error': role in self.critical_roles, -- 'ip': node.ip, - # TODO(eli): need to remove, requried for the fake thread only - 'online': node.online, - } -diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py -index 1f9222b..eee08c9 100644 ---- a/nailgun/nailgun/settings.py -+++ b/nailgun/nailgun/settings.py -@@ -38,21 +38,7 @@ class NailgunSettings(object): - if test_config: - settings_files.append(test_config) - -- # If settings.yaml doesn't exist we should have default -- # config structure. Nailgun without settings is used -- # when we distribute source code to the workers for -- # distributed serialization -- self.config = { -- 'VERSION': {}, -- 'DATABASE': { -- 'engine': 'postgresql', -- 'name': '', -- 'host': '', -- 'port': '0', -- 'user': '', -- 'passwd': '' -- } -- } -+ self.config = {} - for sf in settings_files: - try: - logger.debug("Trying to read config file %s" % sf) -@@ -61,9 +47,9 @@ class NailgunSettings(object): - logger.error("Error while reading config file %s: %s" % - (sf, str(e))) - -- self.config['VERSION']['api'] = self.config.get('API') -+ self.config['VERSION']['api'] = self.config['API'] - self.config['VERSION']['feature_groups'] = \ -- self.config.get('FEATURE_GROUPS') -+ self.config['FEATURE_GROUPS'] - - fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE) - if fuel_release: -@@ -75,7 +61,7 @@ class NailgunSettings(object): - self.config['VERSION']['openstack_version'] = \ - fuel_openstack_version - -- if int(self.config.get("DEVELOPMENT", 0)): -+ if int(self.config.get("DEVELOPMENT")): - logger.info("DEVELOPMENT MODE ON:") - here = os.path.abspath( - os.path.join(os.path.dirname(__file__), '..') -diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml -index c2faaa8..92b3518 100644 ---- a/nailgun/nailgun/settings.yaml -+++ b/nailgun/nailgun/settings.yaml -@@ -177,15 +177,6 @@ YAQL_MEMORY_QUOTA: 104857600 - - LCM_CHECK_TASK_VERSION: False - --# Coefficient for calculation max jobs queue length. If jobs number reaches the --# len(nodes) * load_coef we stop generate and start consume of jobs. --LCM_DS_NODE_LOAD_COEFF: 2 --# Port of dask-scheduler on the master node --LCM_DS_JOB_SHEDULER_PORT: 8002 --# Size of tasks chunk sending to the distributed worker --LCM_DS_TASKS_PER_JOB: 100 -- -- - DPDK_MAX_CPUS_PER_NIC: 4 - - TRUNCATE_LOG_ENTRIES: 100 -diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py -index d024cdd..482d8c5 100644 ---- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py -+++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py -@@ -56,16 +56,6 @@ class InstallationInfo(object): - 'propagate_task_deploy', None), - WhiteListRule(('common', 'security_groups', 'value'), - 'security_groups', None), -- WhiteListRule(('common', 'serialization_policy', 'value'), -- 'serialization_policy', None), -- WhiteListRule(('common', 'ds_use_discover', 'value'), -- 'ds_use_discover', None), -- WhiteListRule(('common', 'ds_use_provisioned', 'value'), -- 'ds_use_provisioned', None), -- WhiteListRule(('common', 'ds_use_ready', 'value'), -- 'ds_use_ready', None), -- WhiteListRule(('common', 'ds_use_error', 'value'), -- 'ds_use_error', None), - WhiteListRule(('corosync', 'verified', 'value'), - 'corosync_verified', None), - -diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py -index b8f4e98..a8661ab 100644 ---- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py -+++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py -@@ -187,7 +187,6 @@ class TestHandlers(BaseIntegrationTest): - 'fail_if_error': is_critical, - 'vms_conf': [], - 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), -- 'ip': node.ip, - - 'network_data': { - 'eth1': { -@@ -604,7 +603,6 @@ class TestHandlers(BaseIntegrationTest): - 'online': node.online, - 'fail_if_error': is_critical, - 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), -- 'ip': node.ip, - 'priority': 100, - 'vms_conf': [], - 'network_scheme': { -@@ -1098,7 +1096,6 @@ class TestHandlers(BaseIntegrationTest): - 'fail_if_error': is_critical, - 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), - 'priority': 100, -- 'ip': node.ip, - 'vms_conf': [], - - 'network_scheme': { -diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py -index 9751d55..6450e19 100644 ---- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py -+++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py -@@ -14,24 +14,20 @@ - # License for the specific language governing permissions and limitations - # under the License. - --import copy --import exceptions - import mock - import multiprocessing.dummy - - from nailgun import consts - from nailgun import errors - from nailgun import lcm --from nailgun.lcm import TransactionContext --from nailgun.settings import settings --from nailgun.test.base import BaseTestCase - from nailgun.utils.resolvers import TagResolver - -+from nailgun.test.base import BaseUnitTest - --class TestTransactionSerializer(BaseTestCase): -+ -+class TestTransactionSerializer(BaseUnitTest): - @classmethod - def setUpClass(cls): -- super(TestTransactionSerializer, cls).setUpClass() - cls.tasks = [ - { - 'id': 'task1', 'roles': ['controller'], -@@ -466,344 +462,3 @@ class TestTransactionSerializer(BaseTestCase): - 9, - lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10) - ) -- -- def _get_context_for_distributed_serialization(self): -- new = copy.deepcopy(self.context.new) -- new['common']['serialization_policy'] = \ -- consts.SERIALIZATION_POLICY.distributed -- return TransactionContext(new) -- -- @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') -- @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') -- def test_distributed_serialization(self, _, as_completed): -- context = self._get_context_for_distributed_serialization() -- -- with mock.patch( -- 'nailgun.lcm.transaction_serializer.distributed.Client' -- ) as job_cluster: -- job = mock.Mock() -- job.result.return_value = [ -- (('1', {"id": "task1", "type": "skipped"}), None) -- ] -- -- submit = mock.Mock() -- submit.return_value = job -- -- as_completed.return_value = [job] -- -- job_cluster.return_value.submit = submit -- job_cluster.return_value.scheduler_info.return_value = \ -- {'workers': {'tcp://worker': {}}} -- -- lcm.TransactionSerializer.serialize( -- context, self.tasks, self.resolver) -- self.assertTrue(submit.called) -- # 4 controller task + 1 compute + 1 cinder -- self.assertTrue(6, submit.call_count) -- -- @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') -- @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') -- @mock.patch('nailgun.lcm.transaction_serializer.' -- 'DistributedProcessingPolicy._get_formatter_context') -- def test_distributed_serialization_workers_scope(self, formatter_context, -- as_completed, _): -- context = self._get_context_for_distributed_serialization() -- -- node_id = '1' -- task = { -- 'id': 'task1', 'roles': ['controller'], -- 'type': 'puppet', 'version': '2.0.0' -- } -- -- with mock.patch( -- 'nailgun.lcm.transaction_serializer.distributed.Client' -- ) as job_cluster: -- -- # Mocking job processing -- job = mock.Mock() -- job.result.return_value = [((node_id, task), None)] -- -- submit = mock.Mock() -- submit.return_value = job -- -- as_completed.return_value = [job] -- -- scatter = mock.Mock() -- job_cluster.return_value.scatter = scatter -- -- job_cluster.return_value.scatter.return_value = {} -- job_cluster.return_value.submit = submit -- -- formatter_context.return_value = {node_id: {}} -- -- # Configuring available workers -- job_cluster.return_value.scheduler_info.return_value = \ -- { -- 'workers': { -- 'tcp://{0}'.format(settings.MASTER_IP): {}, -- 'tcp://192.168.0.1:33334': {}, -- 'tcp://127.0.0.2:33335': {}, -- } -- } -- -- # Performing serialization -- lcm.TransactionSerializer.serialize( -- context, [task], self.resolver -- ) -- -- # Checking data is scattered only to expected workers -- scatter.assert_called_once() -- scatter.assert_called_with( -- {'context': context, 'settings_config': settings.config}, -- broadcast=True, -- workers=[settings.MASTER_IP] -- ) -- -- # Checking submit job only to expected workers -- submit.assert_called_once() -- serializer = lcm.transaction_serializer -- submit.assert_called_with( -- serializer._distributed_serialize_tasks_for_node, -- {node_id: formatter_context()}, -- ((node_id, task),), -- job_cluster().scatter(), -- workers=set([settings.MASTER_IP]) -- ) -- -- def test_distributed_serialization_get_allowed_nodes_ips(self): -- policy = lcm.transaction_serializer.DistributedProcessingPolicy() -- -- context_data = { -- 'common': { -- 'serialization_policy': -- consts.SERIALIZATION_POLICY.distributed, -- 'ds_use_error': True, -- 'ds_use_provisioned': True, -- 'ds_use_discover': True, -- 'ds_use_ready': False -- }, -- 'nodes': { -- '1': {'status': consts.NODE_STATUSES.error, -- 'ip': '10.20.0.3'}, -- '2': {'status': consts.NODE_STATUSES.provisioned, -- 'ip': '10.20.0.4'}, -- '3': {'status': consts.NODE_STATUSES.discover, -- 'ip': '10.20.0.5'}, -- '4': {'status': consts.NODE_STATUSES.ready, -- 'ip': '10.20.0.6'}, -- } -- } -- -- actual = policy._get_allowed_nodes_ips( -- TransactionContext(context_data)) -- self.assertItemsEqual( -- [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'], -- actual -- ) -- -- def test_distributed_serialization_get_allowed_nodes_statuses(self): -- policy = lcm.transaction_serializer.DistributedProcessingPolicy() -- context_data = {} -- actual = policy._get_allowed_nodes_statuses( -- TransactionContext(context_data)) -- self.assertItemsEqual([], actual) -- -- context_data['common'] = { -- 'ds_use_discover': False, -- 'ds_use_provisioned': False, -- 'ds_use_error': False, -- 'ds_use_ready': False -- } -- actual = policy._get_allowed_nodes_statuses( -- TransactionContext(context_data)) -- self.assertItemsEqual([], actual) -- -- context_data['common']['ds_use_discover'] = True -- actual = policy._get_allowed_nodes_statuses( -- TransactionContext(context_data)) -- expected = [consts.NODE_STATUSES.discover] -- self.assertItemsEqual(expected, actual) -- -- context_data['common']['ds_use_provisioned'] = True -- actual = policy._get_allowed_nodes_statuses( -- TransactionContext(context_data)) -- expected = [consts.NODE_STATUSES.discover, -- consts.NODE_STATUSES.provisioned] -- self.assertItemsEqual(expected, actual) -- -- context_data['common']['ds_use_error'] = True -- actual = policy._get_allowed_nodes_statuses( -- TransactionContext(context_data)) -- expected = [consts.NODE_STATUSES.discover, -- consts.NODE_STATUSES.provisioned, -- consts.NODE_STATUSES.error] -- self.assertItemsEqual(expected, actual) -- -- context_data['common']['ds_use_ready'] = True -- actual = policy._get_allowed_nodes_statuses( -- TransactionContext(context_data)) -- expected = [consts.NODE_STATUSES.discover, -- consts.NODE_STATUSES.provisioned, -- consts.NODE_STATUSES.error, -- consts.NODE_STATUSES.ready] -- self.assertItemsEqual(expected, actual) -- -- def test_distributed_serialization_get_allowed_workers(self): -- policy = lcm.transaction_serializer.DistributedProcessingPolicy() -- -- with mock.patch( -- 'nailgun.lcm.transaction_serializer.distributed.Client' -- ) as job_cluster: -- job_cluster.scheduler_info.return_value = \ -- {'workers': { -- 'tcp://10.20.0.2:1': {}, -- 'tcp://10.20.0.2:2': {}, -- 'tcp://10.20.0.3:1': {}, -- 'tcp://10.20.0.3:2': {}, -- 'tcp://10.20.0.3:3': {}, -- 'tcp://10.20.0.4:1': {}, -- 'tcp://10.20.0.5:1': {} -- }} -- allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5']) -- -- expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1', -- '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1'] -- actual = policy._get_allowed_workers(job_cluster, allowed_ips) -- self.assertItemsEqual(expected, actual) -- -- def test_distributed_serialization_serialize_task(self): -- task = { -- 'id': 'task1', 'roles': ['controller'], -- 'type': 'puppet', 'version': '2.0.0', -- 'parameters': { -- 'master_ip': '{MN_IP}', -- 'host': {'yaql_exp': '$.public_ssl.hostname'}, -- 'attr': {'yaql_exp': '$node.attributes.a_str'} -- } -- } -- -- formatter_contexts_idx = { -- '1': {'MN_IP': '10.0.0.1'}, -- '2': {} -- } -- scattered_data = { -- 'settings_config': settings.config, -- 'context': self.context -- } -- -- serializer = lcm.transaction_serializer -- actual = serializer._distributed_serialize_tasks_for_node( -- formatter_contexts_idx, [('1', task), ('2', task)], scattered_data) -- -- expected = [ -- ( -- ( -- '1', -- { -- 'id': 'task1', -- 'type': 'puppet', -- 'parameters': { -- 'cwd': '/', -- 'master_ip': '10.0.0.1', -- 'host': 'localhost', -- 'attr': 'text1' -- }, -- 'fail_on_error': True -- } -- ), -- None -- ), -- ( -- ( -- '2', -- { -- 'id': 'task1', -- 'type': 'puppet', -- 'parameters': { -- 'cwd': '/', -- 'master_ip': '{MN_IP}', -- 'host': 'localhost', -- 'attr': 'text2' -- }, -- 'fail_on_error': True -- } -- ), -- None -- ) -- ] -- -- self.assertItemsEqual(expected, actual) -- -- def test_distributed_serialization_serialize_task_failure(self): -- task = { -- 'id': 'task1', 'roles': ['controller'], -- 'type': 'puppet', 'version': '2.0.0', -- 'parameters': { -- 'fake': {'yaql_exp': '$.some.fake_param'} -- } -- } -- -- formatter_contexts_idx = {'2': {}} -- scattered_data = { -- 'settings_config': settings.config, -- 'context': self.context -- } -- -- serializer = lcm.transaction_serializer -- result = serializer._distributed_serialize_tasks_for_node( -- formatter_contexts_idx, [('2', task)], scattered_data) -- (_, __), err = result[0] -- self.assertIsInstance(err, exceptions.KeyError) -- -- --class TestConcurrencyPolicy(BaseTestCase): -- -- @mock.patch( -- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', -- return_value=1 -- ) -- def test_one_cpu(self, cpu_count): -- policy = lcm.transaction_serializer.get_processing_policy( -- lcm.TransactionContext({})) -- self.assertIsInstance( -- policy, -- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy -- ) -- self.assertTrue(cpu_count.is_called) -- -- @mock.patch( -- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', -- return_value=0 -- ) -- def test_zero_cpu(self, cpu_count): -- policy = lcm.transaction_serializer.get_processing_policy( -- lcm.TransactionContext({})) -- self.assertIsInstance( -- policy, -- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy -- ) -- self.assertTrue(cpu_count.is_called) -- -- @mock.patch( -- 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', -- side_effect=NotImplementedError -- ) -- def test_cpu_count_not_implemented(self, cpu_count): -- policy = lcm.transaction_serializer.get_processing_policy( -- lcm.TransactionContext({})) -- self.assertIsInstance( -- policy, -- lcm.transaction_serializer.SingleWorkerConcurrencyPolicy -- ) -- self.assertTrue(cpu_count.is_called) -- -- def test_distributed_serialization_enabled_in_cluster(self): -- context_data = {'common': { -- 'serialization_policy': consts.SERIALIZATION_POLICY.distributed -- }} -- policy = lcm.transaction_serializer.get_processing_policy( -- lcm.TransactionContext(context_data)) -- self.assertIsInstance( -- policy, -- lcm.transaction_serializer.DistributedProcessingPolicy -- ) -diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt -index c702e8a..96bed25 100644 ---- a/nailgun/requirements.txt -+++ b/nailgun/requirements.txt -@@ -47,5 +47,3 @@ stevedore>=1.5.0 - # See: https://bugs.launchpad.net/fuel/+bug/1519727 - setuptools<=18.5 - yaql>=1.0.0 --# Distributed nodes serialization --distributed==1.16.0 |