From 41ce1778631894401573161e627a78c2b44182a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= Date: Thu, 30 Apr 2020 13:59:24 +0200 Subject: [PATCH] Try to detect the race conditions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I9b468ec1cf79e0a66abeb1fb48f5f0f067c2c198 Signed-off-by: Cédric Ollivier --- rally/cli/main.py | 6 +++ rally/plugins/task/runners/constant.py | 41 +++++++++++++++---- rally/plugins/task/runners/rps.py | 8 ++-- .../task/scenarios/requests/http_requests.py | 9 ++++ .../plugins/task/scenarios/requests/utils.py | 9 ++++ rally/task/runner.py | 29 +++++++++++-- rally/task/utils.py | 15 +++++++ 7 files changed, 102 insertions(+), 15 deletions(-) diff --git a/rally/cli/main.py b/rally/cli/main.py index 235a57113..d931924d8 100644 --- a/rally/cli/main.py +++ b/rally/cli/main.py @@ -15,6 +15,12 @@ """CLI interface for Rally.""" +import threading +threading.stack_size(1024 * 1024) + +import multiprocessing as mp +mp.set_start_method('fork') + import sys from rally.cli import cliutils diff --git a/rally/plugins/task/runners/constant.py b/rally/plugins/task/runners/constant.py index 5feb1fee1..2f6142cfa 100644 --- a/rally/plugins/task/runners/constant.py +++ b/rally/plugins/task/runners/constant.py @@ -15,7 +15,7 @@ import collections import multiprocessing -import queue as Queue +import queue import threading import time @@ -24,6 +24,10 @@ from rally.common import validation from rally import consts from rally.task import runner +from rally.common import cfg +from rally.common import logging +CONF = cfg.CONF +LOG = logging.getLogger(__file__) def _worker_process(queue, iteration_gen, timeout, concurrency, times, duration, context, cls, method_name, args, event_queue, @@ -55,6 +59,9 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, """ def _to_be_continued(iteration, current_duration, aborted, times=None, duration=None): + LOG.warning( + "!! _to_be_continued %(iteration)s %(current_duration)s %(aborted)s %(times)s %(duration)s !! " % + {"iteration": iteration, "current_duration": current_duration, "aborted": aborted, "times": times, "duration": duration}) if times is not None: return iteration < times and not aborted.is_set() elif duration is not None: @@ -74,7 +81,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, method_name=method_name, args=args) if timeout: - timeout_queue = Queue.Queue() + timeout_queue = queue.Queue() collector_thr_by_timeout = threading.Thread( target=utils.timeout_thread, args=(timeout_queue, ) @@ -82,6 +89,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, collector_thr_by_timeout.start() iteration = next(iteration_gen) + LOG.warning("!! iteration %(iteration)s !! " % {"iteration": iteration}) start_time = time.time() # NOTE(msimonin): keep the previous behaviour # > when duration is 0, scenario executes exactly 1 time @@ -93,13 +101,25 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, worker_args = ( queue, cls, method_name, scenario_context, args, event_queue) + LOG.warn( + "Cedric _to_be_continued threading.Thread {} {}".format( + runner._worker_thread, worker_args)) thread = threading.Thread(target=runner._worker_thread, args=worker_args) + LOG.warn( + "Cedric _to_be_continued thread.start() {} {}".format( + runner._worker_thread, worker_args)) thread.start() + LOG.warn( + "Cedric _to_be_continued thread.start() {} {}".format( + runner._worker_thread, worker_args)) if timeout: timeout_queue.put((thread, time.time() + timeout)) pool.append(thread) + LOG.warn( + "Cedric _to_be_continued pool.append {} {}".format( + pool, thread)) alive_threads_in_pool += 1 while alive_threads_in_pool == concurrency: @@ -128,7 +148,14 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times, # Wait until all threads are done while pool: - pool.popleft().join() + thr = pool.popleft() + LOG.warn( + "Cedric _worker_process wait_all_threads {} {} BEFORE JOIN".format( + pool, thr)) + thr.join() + LOG.warn( + "Cedric _worker_process wait_all_threads {} {} AFTER JOIN".format( + pool, thr)) if timeout: timeout_queue.put((None, None,)) @@ -229,8 +256,8 @@ class ConstantScenarioRunner(runner.ScenarioRunner): concurrency_per_worker=concurrency_per_worker, concurrency_overhead=concurrency_overhead) - result_queue = multiprocessing.Queue() - event_queue = multiprocessing.Queue() + result_queue = queue.Queue() + event_queue = queue.Queue() def worker_args_gen(concurrency_overhead): while True: @@ -324,8 +351,8 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner): concurrency_per_worker=concurrency_per_worker, concurrency_overhead=concurrency_overhead) - result_queue = multiprocessing.Queue() - event_queue = multiprocessing.Queue() + result_queue = queue.Queue() + event_queue = queue.Queue() def worker_args_gen(concurrency_overhead): while True: diff --git a/rally/plugins/task/runners/rps.py b/rally/plugins/task/runners/rps.py index 98a706d11..26b55feab 100644 --- a/rally/plugins/task/runners/rps.py +++ b/rally/plugins/task/runners/rps.py @@ -15,7 +15,7 @@ import collections import multiprocessing -import queue as Queue +import queue import threading import time @@ -69,7 +69,7 @@ def _worker_process(queue, iteration_gen, timeout, times, max_concurrent, (sleep * info["processes_counter"]) / info["processes_to_start"]) start = time.time() - timeout_queue = Queue.Queue() + timeout_queue = queue.Queue() if timeout: collector_thr_by_timeout = threading.Thread( @@ -260,8 +260,8 @@ class RPSScenarioRunner(runner.ScenarioRunner): concurrency_per_worker=concurrency_per_worker, concurrency_overhead=concurrency_overhead) - result_queue = multiprocessing.Queue() - event_queue = multiprocessing.Queue() + result_queue = queue.Queue() + event_queue = queue.Queue() def worker_args_gen(times_overhead, concurrency_overhead): """Generate arguments for process worker. diff --git a/rally/plugins/task/scenarios/requests/http_requests.py b/rally/plugins/task/scenarios/requests/http_requests.py index e85ee5af2..4afdf29f1 100644 --- a/rally/plugins/task/scenarios/requests/http_requests.py +++ b/rally/plugins/task/scenarios/requests/http_requests.py @@ -15,6 +15,11 @@ import random from rally.plugins.task.scenarios.requests import utils from rally.task import scenario +from rally.common import cfg +from rally.common import logging +CONF = cfg.CONF +LOG = logging.getLogger(__file__) + """Scenarios for HTTP requests.""" @@ -34,6 +39,10 @@ class HttpRequestsCheckRequest(utils.RequestScenario): :param kwargs: optional additional request parameters """ + LOG.warn("Cedric run url {}".format(url)) + LOG.warn("Cedric run method {}".format(method)) + LOG.warn("Cedric run status_code {}".format(status_code)) + LOG.warn("Cedric run kwargs {}".format(kwargs)) self._check_request(url, method, status_code, **kwargs) diff --git a/rally/plugins/task/scenarios/requests/utils.py b/rally/plugins/task/scenarios/requests/utils.py index 8fd35347a..cd69900a5 100644 --- a/rally/plugins/task/scenarios/requests/utils.py +++ b/rally/plugins/task/scenarios/requests/utils.py @@ -15,6 +15,11 @@ import requests from rally.task import atomic from rally.task import scenario +from rally.common import cfg +from rally.common import logging +CONF = cfg.CONF +LOG = logging.getLogger(__file__) + class RequestScenario(scenario.Scenario): """Base class for Request scenarios with basic atomic actions.""" @@ -31,6 +36,10 @@ class RequestScenario(scenario.Scenario): not equal to expected status code """ + LOG.warn("Cedric _check_request url {}".format(url)) + LOG.warn("Cedric _check_request method {}".format(method)) + LOG.warn("Cedric _check_request status_code {}".format(status_code)) + LOG.warn("Cedric _check_request kwargs {}".format(kwargs)) resp = requests.request(method, url, **kwargs) if status_code != resp.status_code: error_msg = "Expected HTTP request code is `%s` actual `%s`" diff --git a/rally/task/runner.py b/rally/task/runner.py index 3397e1193..295558f44 100644 --- a/rally/task/runner.py +++ b/rally/task/runner.py @@ -87,6 +87,11 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs, def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs, event_queue): + LOG.debug( + "queue.put _run_scenario_once:\n\t%(cls)s\n\t%(method_name)s\n\t" + "%(context_obj)s\n\t%(scenario_kwargs)s\n\t%(event_queue)s" % + {"cls": cls, "method_name": method_name, "context_obj": context_obj, + "scenario_kwargs": scenario_kwargs, "event_queue": event_queue}) queue.put(_run_scenario_once(cls, method_name, context_obj, scenario_kwargs, event_queue)) @@ -186,6 +191,8 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, for i in range(processes_to_start): kwrgs = {"processes_to_start": processes_to_start, "processes_counter": i} + LOG.warning( + "!! _create_process_pool %(kwrgs)s !! " % {"kwrgs": kwrgs}) process = multiprocessing.Process(target=worker_process, args=next(worker_args_gen), kwargs={"info": kwrgs}) @@ -202,22 +209,28 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, :param event_queue: multiprocessing.Queue that receives the events """ while process_pool: + LOG.warn("Cedric _join_processes process_pool {}".format(process_pool)) while process_pool and not process_pool[0].is_alive(): + LOG.warn("Cedric _join_processes process_pool {}".format(process_pool)) process_pool.popleft().join() if result_queue.empty() and event_queue.empty(): # sleep a bit to avoid 100% usage of CPU by this method + LOG.warn("Cedric _join_processes result_queue is empty {}".format(result_queue)) + LOG.warn("Cedric _join_processes event_queue is empty {}".format(event_queue)) time.sleep(0.01) while not event_queue.empty(): - self.send_event(**event_queue.get()) + col_event_queue = event_queue.get() + LOG.warn("Cedric _join_processes event_queue is not empty {}".format(col_event_queue)) + self.send_event(**col_event_queue) while not result_queue.empty(): - self._send_result(result_queue.get()) + col_result_queue = result_queue.get() + LOG.warn("Cedric _join_processes result_queue is not empty {}".format(col_result_queue)) + self._send_result(col_result_queue) self._flush_results() - result_queue.close() - event_queue.close() def _flush_results(self): if self.result_batch: @@ -245,8 +258,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, if len(self.result_batch) >= self.batch_size: sorted_batch = sorted(self.result_batch, key=lambda r: result["timestamp"]) + LOG.debug("result_queue.append:\n\t%(sorted_batch)s" % { + "sorted_batch": sorted_batch + }) self.result_queue.append(sorted_batch) del self.result_batch[:] + else: + LOG.debug("WAHT DOEST IT MEAN? ") def send_event(self, type, value=None): """Store event to send it to consumer later. @@ -254,6 +272,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin, :param type: Event type :param value: Optional event data """ + LOG.debug("send_event:\n\t%(type)s\n\t%(value)s" % { + "type": type, "value": value + }) self.event_queue.append({"type": type, "value": value}) diff --git a/rally/task/utils.py b/rally/task/utils.py index 1252a1fc7..783adf3c6 100644 --- a/rally/task/utils.py +++ b/rally/task/utils.py @@ -176,6 +176,9 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], timeout=60, check_interval=1, check_deletion=False, id_attr="id"): + LOG.debug( + "Waiting for status %(resource)s" % {"resource": resource}) + resource_repr = getattr(resource, "name", repr(resource)) if not isinstance(ready_statuses, (set, list, tuple)): raise ValueError("Ready statuses should be supplied as set, list or " @@ -187,7 +190,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], # make all statuses upper case ready_statuses = set(s.upper() for s in ready_statuses or []) + LOG.debug("%(resource)s: ready_statuses %(ready_statuses)s" % { + "resource": resource_repr, "ready_statuses": ready_statuses}) failure_statuses = set(s.upper() for s in failure_statuses or []) + LOG.debug("%(resource)s: failure_statuses %(failure_statuses)s" % { + "resource": resource_repr, "failure_statuses": failure_statuses}) if (ready_statuses & failure_statuses): raise ValueError( @@ -205,9 +212,13 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], start = time.time() latest_status = get_status(resource, status_attr) + LOG.debug("%(resource)s: latest_status %(latest_status)s" % { + "resource": resource_repr, "latest_status": latest_status}) latest_status_update = start while True: + LOG.debug("%(resource)s: timeout %(timeout)s" % { + "resource": resource_repr, "timeout": timeout}) try: if id_attr == "id": resource = update_resource(resource) @@ -240,7 +251,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"], status=status, fault="Status in failure list %s" % str(failure_statuses)) + LOG.debug("%(resource)s: check_interval %(check_interval)s" % { + "resource": resource_repr, "check_interval": check_interval}) time.sleep(check_interval) + LOG.debug("%(resource)s: elapsed_time %(elapsed_time)s" % { + "resource": resource_repr, "elapsed_time": time.time() - start}) if time.time() - start > timeout: raise exceptions.TimeoutException( desired_status="('%s')" % "', '".join(ready_statuses), -- 2.26.2