diff options
author | Cédric Ollivier <cedric.ollivier@orange.com> | 2020-06-03 15:25:58 +0200 |
---|---|---|
committer | Cédric Ollivier <cedric.ollivier@orange.com> | 2020-06-03 15:25:58 +0200 |
commit | 37b7e536d679303ad6ddfade450201efd90a983a (patch) | |
tree | 8320532f53b6c8643356646da76046d0e963b63a | |
parent | 8159c62598ed3e6b35367a8227f087e512cd03d0 (diff) |
Switching to Threading instead of multiprocessing
Change-Id: Id5059a06447357f4c9b058bad374ed6cbe4d742c
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
-rw-r--r-- | docker/core/Try-to-detect-the-race-conditions.patch | 417 |
1 files changed, 74 insertions, 343 deletions
diff --git a/docker/core/Try-to-detect-the-race-conditions.patch b/docker/core/Try-to-detect-the-race-conditions.patch index 82f60e2d0..9413d7617 100644 --- a/docker/core/Try-to-detect-the-race-conditions.patch +++ b/docker/core/Try-to-detect-the-race-conditions.patch @@ -1,376 +1,107 @@ -From 41ce1778631894401573161e627a78c2b44182a1 Mon Sep 17 00:00:00 2001 +From 304497b81fbbe9cb8608b947cae76aeaa2b0934e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com> -Date: Thu, 30 Apr 2020 13:59:24 +0200 -Subject: [PATCH] Try to detect the race conditions +Date: Wed, 3 Jun 2020 15:23:59 +0200 +Subject: [PATCH 11/11] Try to detect the race conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit -Change-Id: I9b468ec1cf79e0a66abeb1fb48f5f0f067c2c198 +Change-Id: I582933832e23d188c7fa5999e713dd5d7e82d2da Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com> --- - rally/cli/main.py | 6 +++ - rally/plugins/task/runners/constant.py | 41 +++++++++++++++---- - rally/plugins/task/runners/rps.py | 8 ++-- - .../task/scenarios/requests/http_requests.py | 9 ++++ - .../plugins/task/scenarios/requests/utils.py | 9 ++++ - rally/task/runner.py | 29 +++++++++++-- - rally/task/utils.py | 15 +++++++ - 7 files changed, 102 insertions(+), 15 deletions(-) + rally/cli/main.py | 5 ++++- + rally/task/runner.py | 23 ++++++++++++++++++----- + 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/rally/cli/main.py b/rally/cli/main.py -index 235a57113..d931924d8 100644 +index 235a57113..14c057c0e 100644 --- a/rally/cli/main.py +++ b/rally/cli/main.py -@@ -15,6 +15,12 @@ +@@ -15,6 +15,10 @@ """CLI interface for Rally.""" ++STACK_SIZE = 1024 * 1024 +import threading -+threading.stack_size(1024 * 1024) -+ -+import multiprocessing as mp -+mp.set_start_method('fork') ++threading.stack_size(STACK_SIZE) + import sys from rally.cli import cliutils -diff --git a/rally/plugins/task/runners/constant.py b/rally/plugins/task/runners/constant.py -index 5feb1fee1..2f6142cfa 100644 ---- a/rally/plugins/task/runners/constant.py -+++ b/rally/plugins/task/runners/constant.py -@@ -15,7 +15,7 @@ - - import collections - import multiprocessing --import queue as Queue -+import queue - import threading - import time - -@@ -24,6 +24,10 @@ from rally.common import validation - from rally import consts - from rally.task import runner - -+from rally.common import cfg -+from rally.common import logging -+CONF = cfg.CONF -+LOG = logging.getLogger(__file__) - - def _worker_process(queue, iteration_gen, timeout, concurrency, times, - duration, context, cls, method_name, args, event_queue, -@@ -55,6 +59,9 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, - """ - def _to_be_continued(iteration, current_duration, aborted, times=None, - duration=None): -+ LOG.warning( -+ "!! _to_be_continued %(iteration)s %(current_duration)s %(aborted)s %(times)s %(duration)s !! " % -+ {"iteration": iteration, "current_duration": current_duration, "aborted": aborted, "times": times, "duration": duration}) - if times is not None: - return iteration < times and not aborted.is_set() - elif duration is not None: -@@ -74,7 +81,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, - method_name=method_name, args=args) - - if timeout: -- timeout_queue = Queue.Queue() -+ timeout_queue = queue.Queue() - collector_thr_by_timeout = threading.Thread( - target=utils.timeout_thread, - args=(timeout_queue, ) -@@ -82,6 +89,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, - collector_thr_by_timeout.start() - - iteration = next(iteration_gen) -+ LOG.warning("!! iteration %(iteration)s !! " % {"iteration": iteration}) - start_time = time.time() - # NOTE(msimonin): keep the previous behaviour - # > when duration is 0, scenario executes exactly 1 time -@@ -93,13 +101,25 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, - worker_args = ( - queue, cls, method_name, scenario_context, args, event_queue) - -+ LOG.warn( -+ "Cedric _to_be_continued threading.Thread {} {}".format( -+ runner._worker_thread, worker_args)) - thread = threading.Thread(target=runner._worker_thread, - args=worker_args) - -+ LOG.warn( -+ "Cedric _to_be_continued thread.start() {} {}".format( -+ runner._worker_thread, worker_args)) - thread.start() -+ LOG.warn( -+ "Cedric _to_be_continued thread.start() {} {}".format( -+ runner._worker_thread, worker_args)) - if timeout: - timeout_queue.put((thread, time.time() + timeout)) - pool.append(thread) -+ LOG.warn( -+ "Cedric _to_be_continued pool.append {} {}".format( -+ pool, thread)) - alive_threads_in_pool += 1 - - while alive_threads_in_pool == concurrency: -@@ -128,7 +148,14 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, - - # Wait until all threads are done - while pool: -- pool.popleft().join() -+ thr = pool.popleft() -+ LOG.warn( -+ "Cedric _worker_process wait_all_threads {} {} BEFORE JOIN".format( -+ pool, thr)) -+ thr.join() -+ LOG.warn( -+ "Cedric _worker_process wait_all_threads {} {} AFTER JOIN".format( -+ pool, thr)) - - if timeout: - timeout_queue.put((None, None,)) -@@ -229,8 +256,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner): - concurrency_per_worker=concurrency_per_worker, - concurrency_overhead=concurrency_overhead) - -- result_queue = multiprocessing.Queue() -- event_queue = multiprocessing.Queue() -+ result_queue = queue.Queue() -+ event_queue = queue.Queue() - - def worker_args_gen(concurrency_overhead): - while True: -@@ -324,8 +351,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): - concurrency_per_worker=concurrency_per_worker, - concurrency_overhead=concurrency_overhead) - -- result_queue = multiprocessing.Queue() -- event_queue = multiprocessing.Queue() -+ result_queue = queue.Queue() -+ event_queue = queue.Queue() - - def worker_args_gen(concurrency_overhead): - while True: -diff --git a/rally/plugins/task/runners/rps.py b/rally/plugins/task/runners/rps.py -index 98a706d11..26b55feab 100644 ---- a/rally/plugins/task/runners/rps.py -+++ b/rally/plugins/task/runners/rps.py -@@ -15,7 +15,7 @@ - +@@ -25,7 +29,6 @@ from rally.cli.commands import plugin + from rally.cli.commands import task + from rally.cli.commands import verify + +- + categories = { + "db": db.DBCommands, + "env": env.EnvCommands, +diff --git a/rally/task/runner.py b/rally/task/runner.py +index 3397e1193..b2fde8550 100644 +--- a/rally/task/runner.py ++++ b/rally/task/runner.py +@@ -17,6 +17,7 @@ import abc import collections + import copy import multiprocessing --import queue as Queue -+import queue - import threading ++import threading import time -@@ -69,7 +69,7 @@ def _worker_process(queue, iteration_gen, timeout, times, max_concurrent, - (sleep * info["processes_counter"]) / info["processes_to_start"]) - - start = time.time() -- timeout_queue = Queue.Queue() -+ timeout_queue = queue.Queue() - - if timeout: - collector_thr_by_timeout = threading.Thread( -@@ -260,8 +260,8 @@ class RPSScenarioRunner(runner.ScenarioRunner): - concurrency_per_worker=concurrency_per_worker, - concurrency_overhead=concurrency_overhead) - -- result_queue = multiprocessing.Queue() -- event_queue = multiprocessing.Queue() -+ result_queue = queue.Queue() -+ event_queue = queue.Queue() - - def worker_args_gen(times_overhead, concurrency_overhead): - """Generate arguments for process worker. -diff --git a/rally/plugins/task/scenarios/requests/http_requests.py b/rally/plugins/task/scenarios/requests/http_requests.py -index e85ee5af2..4afdf29f1 100644 ---- a/rally/plugins/task/scenarios/requests/http_requests.py -+++ b/rally/plugins/task/scenarios/requests/http_requests.py -@@ -15,6 +15,11 @@ import random - from rally.plugins.task.scenarios.requests import utils - from rally.task import scenario - -+from rally.common import cfg -+from rally.common import logging -+CONF = cfg.CONF -+LOG = logging.getLogger(__file__) -+ - - """Scenarios for HTTP requests.""" - -@@ -34,6 +39,10 @@ class HttpRequestsCheckRequest(utils.RequestScenario): - :param kwargs: optional additional request parameters - """ - -+ LOG.warn("Cedric run url {}".format(url)) -+ LOG.warn("Cedric run method {}".format(method)) -+ LOG.warn("Cedric run status_code {}".format(status_code)) -+ LOG.warn("Cedric run kwargs {}".format(kwargs)) - self._check_request(url, method, status_code, **kwargs) - - -diff --git a/rally/plugins/task/scenarios/requests/utils.py b/rally/plugins/task/scenarios/requests/utils.py -index 8fd35347a..cd69900a5 100644 ---- a/rally/plugins/task/scenarios/requests/utils.py -+++ b/rally/plugins/task/scenarios/requests/utils.py -@@ -15,6 +15,11 @@ import requests - from rally.task import atomic - from rally.task import scenario - -+from rally.common import cfg -+from rally.common import logging -+CONF = cfg.CONF -+LOG = logging.getLogger(__file__) -+ - - class RequestScenario(scenario.Scenario): - """Base class for Request scenarios with basic atomic actions.""" -@@ -31,6 +36,10 @@ class RequestScenario(scenario.Scenario): - not equal to expected status code - """ - -+ LOG.warn("Cedric _check_request url {}".format(url)) -+ LOG.warn("Cedric _check_request method {}".format(method)) -+ LOG.warn("Cedric _check_request status_code {}".format(status_code)) -+ LOG.warn("Cedric _check_request kwargs {}".format(kwargs)) - resp = requests.request(method, url, **kwargs) - if status_code != resp.status_code: - error_msg = "Expected HTTP request code is `%s` actual `%s`" -diff --git a/rally/task/runner.py b/rally/task/runner.py -index 3397e1193..295558f44 100644 ---- a/rally/task/runner.py -+++ b/rally/task/runner.py -@@ -87,6 +87,11 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, + from rally.common import logging +@@ -51,10 +52,14 @@ def _get_scenario_context(iteration, context_obj): + def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, + event_queue): + iteration = context_obj["iteration"] ++ LOG.info("DEBUGRACE %s putting in event_queue iteration: %s", ++ threading.get_native_id(), iteration) + event_queue.put({ + "type": "iteration", + "value": iteration, + }) ++ LOG.info("DEBUGRACE %s put in event_queue iteration: %s", ++ threading.get_native_id(), iteration) + + # provide arguments isolation between iterations + scenario_kwargs = copy.deepcopy(scenario_kwargs) +@@ -65,6 +70,8 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, + scenario_inst = cls(context_obj) + error = [] + try: ++ LOG.info("DEBUGRACE %s running %s %s", ++ threading.get_native_id(), scenario_inst, scenario_inst) + with rutils.Timer() as timer: + getattr(scenario_inst, method_name)(**scenario_kwargs) + except Exception as e: +@@ -87,8 +94,14 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs, event_queue): -+ LOG.debug( -+ "queue.put _run_scenario_once:\n\t%(cls)s\n\t%(method_name)s\n\t" -+ "%(context_obj)s\n\t%(scenario_kwargs)s\n\t%(event_queue)s" % -+ {"cls": cls, "method_name": method_name, "context_obj": context_obj, -+ "scenario_kwargs": scenario_kwargs, "event_queue": event_queue}) - queue.put(_run_scenario_once(cls, method_name, context_obj, - scenario_kwargs, event_queue)) - -@@ -186,6 +191,8 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, +- queue.put(_run_scenario_once(cls, method_name, context_obj, +- scenario_kwargs, event_queue)) ++ result = _run_scenario_once(cls, method_name, context_obj, ++ scenario_kwargs, event_queue) ++ LOG.info("DEBUGRACE %s putting in result_queue context_obj: %s", ++ threading.get_native_id(), ++ context_obj) ++ queue.put(result) ++ LOG.info("DEBUGRACE %s put in result_queue context_obj: %s: %s", ++ threading.get_native_id(), context_obj, result) + + + def _log_worker_info(**info): +@@ -186,9 +199,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, for i in range(processes_to_start): kwrgs = {"processes_to_start": processes_to_start, "processes_counter": i} -+ LOG.warning( -+ "!! _create_process_pool %(kwrgs)s !! " % {"kwrgs": kwrgs}) - process = multiprocessing.Process(target=worker_process, - args=next(worker_args_gen), - kwargs={"info": kwrgs}) -@@ -202,22 +209,28 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, - :param event_queue: multiprocessing.Queue that receives the events - """ - while process_pool: -+ LOG.warn("Cedric _join_processes process_pool {}".format(process_pool)) - while process_pool and not process_pool[0].is_alive(): -+ LOG.warn("Cedric _join_processes process_pool {}".format(process_pool)) - process_pool.popleft().join() - - if result_queue.empty() and event_queue.empty(): - # sleep a bit to avoid 100% usage of CPU by this method -+ LOG.warn("Cedric _join_processes result_queue is empty {}".format(result_queue)) -+ LOG.warn("Cedric _join_processes event_queue is empty {}".format(event_queue)) - time.sleep(0.01) - - while not event_queue.empty(): -- self.send_event(**event_queue.get()) -+ col_event_queue = event_queue.get() -+ LOG.warn("Cedric _join_processes event_queue is not empty {}".format(col_event_queue)) -+ self.send_event(**col_event_queue) - - while not result_queue.empty(): -- self._send_result(result_queue.get()) -+ col_result_queue = result_queue.get() -+ LOG.warn("Cedric _join_processes result_queue is not empty {}".format(col_result_queue)) -+ self._send_result(col_result_queue) - - self._flush_results() -- result_queue.close() -- event_queue.close() - - def _flush_results(self): - if self.result_batch: -@@ -245,8 +258,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, - if len(self.result_batch) >= self.batch_size: - sorted_batch = sorted(self.result_batch, - key=lambda r: result["timestamp"]) -+ LOG.debug("result_queue.append:\n\t%(sorted_batch)s" % { -+ "sorted_batch": sorted_batch -+ }) - self.result_queue.append(sorted_batch) - del self.result_batch[:] -+ else: -+ LOG.debug("WAHT DOEST IT MEAN? ") - - def send_event(self, type, value=None): - """Store event to send it to consumer later. -@@ -254,6 +272,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, - :param type: Event type - :param value: Optional event data - """ -+ LOG.debug("send_event:\n\t%(type)s\n\t%(value)s" % { -+ "type": type, "value": value -+ }) - self.event_queue.append({"type": type, - "value": value}) - -diff --git a/rally/task/utils.py b/rally/task/utils.py -index 1252a1fc7..783adf3c6 100644 ---- a/rally/task/utils.py -+++ b/rally/task/utils.py -@@ -176,6 +176,9 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], - timeout=60, check_interval=1, check_deletion=False, - id_attr="id"): - -+ LOG.debug( -+ "Waiting for status %(resource)s" % {"resource": resource}) -+ - resource_repr = getattr(resource, "name", repr(resource)) - if not isinstance(ready_statuses, (set, list, tuple)): - raise ValueError("Ready statuses should be supplied as set, list or " -@@ -187,7 +190,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], - - # make all statuses upper case - ready_statuses = set(s.upper() for s in ready_statuses or []) -+ LOG.debug("%(resource)s: ready_statuses %(ready_statuses)s" % { -+ "resource": resource_repr, "ready_statuses": ready_statuses}) - failure_statuses = set(s.upper() for s in failure_statuses or []) -+ LOG.debug("%(resource)s: failure_statuses %(failure_statuses)s" % { -+ "resource": resource_repr, "failure_statuses": failure_statuses}) - - if (ready_statuses & failure_statuses): - raise ValueError( -@@ -205,9 +212,13 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], - start = time.time() - - latest_status = get_status(resource, status_attr) -+ LOG.debug("%(resource)s: latest_status %(latest_status)s" % { -+ "resource": resource_repr, "latest_status": latest_status}) - latest_status_update = start - - while True: -+ LOG.debug("%(resource)s: timeout %(timeout)s" % { -+ "resource": resource_repr, "timeout": timeout}) - try: - if id_attr == "id": - resource = update_resource(resource) -@@ -240,7 +251,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], - status=status, - fault="Status in failure list %s" % str(failure_statuses)) +- process = multiprocessing.Process(target=worker_process, +- args=next(worker_args_gen), +- kwargs={"info": kwrgs}) ++ process = threading.Thread(target=worker_process, ++ args=next(worker_args_gen), ++ kwargs={"info": kwrgs}) + process.start() + process_pool.append(process) -+ LOG.debug("%(resource)s: check_interval %(check_interval)s" % { -+ "resource": resource_repr, "check_interval": check_interval}) - time.sleep(check_interval) -+ LOG.debug("%(resource)s: elapsed_time %(elapsed_time)s" % { -+ "resource": resource_repr, "elapsed_time": time.time() - start}) - if time.time() - start > timeout: - raise exceptions.TimeoutException( - desired_status="('%s')" % "', '".join(ready_statuses), -- 2.26.2 |