parametFrom b7a04b5993c232f4357a213517ed798cb15f3c25 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?C=C3=A9dric=20Ollivier?= <cedric.ollivier@orange.com>
Date: Thu, 30 Apr 2020 13:59:24 +0200
Subject: [PATCH] Try to detect the race conditions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Change-Id: I9b468ec1cf79e0a66abeb1fb48f5f0f067c2c198
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
---
rally/cli/main.py | 3 ++
rally/plugins/task/runners/constant.py | 30 ++++++++++++++++++-
.../task/scenarios/requests/http_requests.py | 9 ++++++
.../plugins/task/scenarios/requests/utils.py | 9 ++++++
rally/task/runner.py | 27 +++++++++++++++--
rally/task/utils.py | 15 ++++++++++
6 files changed, 90 insertions(+), 3 deletions(-)
diff --git a/rally/cli/main.py b/rally/cli/main.py
index 235a57113..bcc41cb01 100644
--- a/rally/cli/main.py
+++ b/rally/cli/main.py
@@ -15,6 +15,9 @@
"""CLI interface for Rally."""
+import threading
+threading.stack_size(1024 * 1024)
+
import sys
from rally.cli import cliutils
diff --git a/rally/plugins/task/runners/constant.py b/rally/plugins/task/runners/constant.py
index 5feb1fee1..38a01e28e 100644
--- a/rally/plugins/task/runners/constant.py
+++ b/rally/plugins/task/runners/constant.py
@@ -24,6 +24,10 @@ from rally.common import validation
from rally import consts
from rally.task import runner
+from rally.common import cfg
+from rally.common import logging
+CONF = cfg.CONF
+LOG = logging.getLogger(__file__)
def _worker_process(queue, iteration_gen, timeout, concurrency, times,
duration, context, cls, method_name, args, event_queue,
@@ -55,6 +59,9 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
"""
def _to_be_continued(iteration, current_duration, aborted, times=None,
duration=None):
+ LOG.warning(
+ "!! _to_be_continued %(iteration)s %(current_duration)s %(aborted)s %(times)s %(duration)s !! " %
+ {"iteration": iteration, "current_duration": current_duration, "aborted": aborted, "times": times, "duration": duration})
if times is not None:
return iteration < times and not aborted.is_set()
elif duration is not None:
@@ -82,6 +89,7 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
collector_thr_by_timeout.start()
iteration = next(iteration_gen)
+ LOG.warning("!! iteration %(iteration)s !! " % {"iteration": iteration})
start_time = time.time()
# NOTE(msimonin): keep the previous behaviour
# > when duration is 0, scenario executes exactly 1 time
@@ -93,13 +101,25 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
worker_args = (
queue, cls, method_name, scenario_context, args, event_queue)
+ LOG.warn(
+ "Cedric _to_be_continued threading.Thread {} {}".format(
+ runner._worker_thread, worker_args))
thread = threading.Thread(target=runner._worker_thread,
args=worker_args)
+ LOG.warn(
+ "Cedric _to_be_continued thread.start() {} {}".format(
+ runner._worker_thread, worker_args))
thread.start()
+ LOG.warn(
+ "Cedric _to_be_continued thread.start() {} {}".format(
+ runner._worker_thread, worker_args))
if timeout:
timeout_queue.put((thread, time.time() + timeout))
pool.append(thread)
+ LOG.warn(
+ "Cedric _to_be_continued pool.append {} {}".format(
+ pool, thread))
alive_threads_in_pool += 1
while alive_threads_in_pool == concurrency:
@@ -128,7 +148,14 @@ def _worker_process(queue, iteration_gen, timeout, concurrency, times,
# Wait until all threads are done
while pool:
- pool.popleft().join()
+ thr = pool.popleft()
+ LOG.warn(
+ "Cedric _worker_process wait_all_threads {} {} BEFORE JOIN".format(
+ pool, thr))
+ thr.join()
+ LOG.warn(
+ "Cedric _worker_process wait_all_threads {} {} AFTER JOIN".format(
+ pool, thr))
if timeout:
timeout_queue.put((None, None,))
@@ -340,3 +367,4 @@ class ConstantForDurationScenarioRunner(runner.ScenarioRunner):
processes_to_start, _worker_process,
worker_args_gen(concurrency_overhead))
self._join_processes(process_pool, result_queue, event_queue)
+
diff --git a/rally/plugins/task/scenarios/requests/http_requests.py b/rally/plugins/task/scenarios/requests/http_requests.py
index e85ee5af2..4afdf29f1 100644
--- a/rally/plugins/task/scenarios/requests/http_requests.py
+++ b/rally/plugins/task/scenarios/requests/http_requests.py
@@ -15,6 +15,11 @@ import random
from rally.plugins.task.scenarios.requests import utils
from rally.task import scenario
+from rally.common import cfg
+from rally.common import logging
+CONF = cfg.CONF
+LOG = logging.getLogger(__file__)
+
"""Scenarios for HTTP requests."""
@@ -34,6 +39,10 @@ class HttpRequestsCheckRequest(utils.RequestScenario):
:param kwargs: optional additional request parameters
"""
+ LOG.warn("Cedric run url {}".format(url))
+ LOG.warn("Cedric run method {}".format(method))
+ LOG.warn("Cedric run status_code {}".format(status_code))
+ LOG.warn("Cedric run kwargs {}".format(kwargs))
self._check_request(url, method, status_code, **kwargs)
diff --git a/rally/plugins/task/scenarios/requests/utils.py b/rally/plugins/task/scenarios/requests/utils.py
index 8fd35347a..cd69900a5 100644
--- a/rally/plugins/task/scenarios/requests/utils.py
+++ b/rally/plugins/task/scenarios/requests/utils.py
@@ -15,6 +15,11 @@ import requests
from rally.task import atomic
from rally.task import scenario
+from rally.common import cfg
+from rally.common import logging
+CONF = cfg.CONF
+LOG = logging.getLogger(__file__)
+
class RequestScenario(scenario.Scenario):
"""Base class for Request scenarios with basic atomic actions."""
@@ -31,6 +36,10 @@ class RequestScenario(scenario.Scenario):
not equal to expected status code
"""
+ LOG.warn("Cedric _check_request url {}".format(url))
+ LOG.warn("Cedric _check_request method {}".format(method))
+ LOG.warn("Cedric _check_request status_code {}".format(status_code))
+ LOG.warn("Cedric _check_request kwargs {}".format(kwargs))
resp = requests.request(method, url, **kwargs)
if status_code != resp.status_code:
error_msg = "Expected HTTP request code is `%s` actual `%s`"
diff --git a/rally/task/runner.py b/rally/task/runner.py
index 3397e1193..57f9428f4 100644
--- a/rally/task/runner.py
+++ b/rally/task/runner.py
@@ -87,6 +87,11 @@ def _run_scenario_once(cls, method_name, context_obj, scenario_kwargs,
def _worker_thread(queue, cls, method_name, context_obj, scenario_kwargs,
event_queue):
+ LOG.debug(
+ "queue.put _run_scenario_once:\n\t%(cls)s\n\t%(method_name)s\n\t"
+ "%(context_obj)s\n\t%(scenario_kwargs)s\n\t%(event_queue)s" %
+ {"cls": cls, "method_name": method_name, "context_obj": context_obj,
+ "scenario_kwargs": scenario_kwargs, "event_queue": event_queue})
queue.put(_run_scenario_once(cls, method_name, context_obj,
scenario_kwargs, event_queue))
@@ -186,6 +191,8 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
for i in range(processes_to_start):
kwrgs = {"processes_to_start": processes_to_start,
"processes_counter": i}
+ LOG.warning(
+ "!! _create_process_pool %(kwrgs)s !! " % {"kwrgs": kwrgs})
process = multiprocessing.Process(target=worker_process,
args=next(worker_args_gen),
kwargs={"info": kwrgs})
@@ -202,18 +209,26 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
:param event_queue: multiprocessing.Queue that receives the events
"""
while process_pool:
+ LOG.warn("Cedric _join_processes process_pool {}".format(process_pool))
while process_pool and not process_pool[0].is_alive():
+ LOG.warn("Cedric _join_processes process_pool {}".format(process_pool))
process_pool.popleft().join()
if result_queue.empty() and event_queue.empty():
# sleep a bit to avoid 100% usage of CPU by this method
+ LOG.warn("Cedric _join_processes result_queue is empty {}".format(result_queue))
+ LOG.warn("Cedric _join_processes event_queue is empty {}".format(event_queue))
time.sleep(0.01)
while not event_queue.empty():
- self.send_event(**event_queue.get())
+ col_event_queue = event_queue.get()
+ LOG.warn("Cedric _join_processes event_queue is not empty {}".format(col_event_queue))
+ self.send_event(**col_event_queue)
while not result_queue.empty():
- self._send_result(result_queue.get())
+ col_result_queue = result_queue.get()
+ LOG.warn("Cedric _join_processes result_queue is not empty {}".format(col_result_queue))
+ self._send_result(col_result_queue)
self._flush_results()
result_queue.close()
@@ -245,8 +260,13 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
if len(self.result_batch) >= self.batch_size:
sorted_batch = sorted(self.result_batch,
key=lambda r: result["timestamp"])
+ LOG.debug("result_queue.append:\n\t%(sorted_batch)s" % {
+ "sorted_batch": sorted_batch
+ })
self.result_queue.append(sorted_batch)
del self.result_batch[:]
+ else:
+ LOG.debug("WAHT DOEST IT MEAN? ")
def send_event(self, type, value=None):
"""Store event to send it to consumer later.
@@ -254,6 +274,9 @@ class ScenarioRunner(plugin.Plugin, validation.ValidatablePluginMixin,
:param type: Event type
:param value: Optional event data
"""
+ LOG.debug("send_event:\n\t%(type)s\n\t%(value)s" % {
+ "type": type, "value": value
+ })
self.event_queue.append({"type": type,
"value": value})
diff --git a/rally/task/utils.py b/rally/task/utils.py
index 1252a1fc7..783adf3c6 100644
--- a/rally/task/utils.py
+++ b/rally/task/utils.py
@@ -176,6 +176,9 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
timeout=60, check_interval=1, check_deletion=False,
id_attr="id"):
+ LOG.debug(
+ "Waiting for status %(resource)s" % {"resource": resource})
+
resource_repr = getattr(resource, "name", repr(resource))
if not isinstance(ready_statuses, (set, list, tuple)):
raise ValueError("Ready statuses should be supplied as set, list or "
@@ -187,7 +190,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
# make all statuses upper case
ready_statuses = set(s.upper() for s in ready_statuses or [])
+ LOG.debug("%(resource)s: ready_statuses %(ready_statuses)s" % {
+ "resource": resource_repr, "ready_statuses": ready_statuses})
failure_statuses = set(s.upper() for s in failure_statuses or [])
+ LOG.debug("%(resource)s: failure_statuses %(failure_statuses)s" % {
+ "resource": resource_repr, "failure_statuses": failure_statuses})
if (ready_statuses & failure_statuses):
raise ValueError(
@@ -205,9 +212,13 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
start = time.time()
latest_status = get_status(resource, status_attr)
+ LOG.debug("%(resource)s: latest_status %(latest_status)s" % {
+ "resource": resource_repr, "latest_status": latest_status})
latest_status_update = start
while True:
+ LOG.debug("%(resource)s: timeout %(timeout)s" % {
+ "resource": resource_repr, "timeout": timeout})
try:
if id_attr == "id":
resource = update_resource(resource)
@@ -240,7 +251,11 @@ def wait_for_status(resource, ready_statuses, failure_statuses=["error"],
status=status,
fault="Status in failure list %s" % str(failure_statuses))
+ LOG.debug("%(resource)s: check_interval %(check_interval)s" % {
+ "resource": resource_repr, "check_interval": check_interval})
time.sleep(check_interval)
+ LOG.debug("%(resource)s: elapsed_time %(elapsed_time)s" % {
+ "resource": resource_repr, "elapsed_time": time.time() - start})
if time.time() - start > timeout:
raise exceptions.TimeoutException(
desired_status="('%s')" % "', '".join(ready_statuses),
--
2.26.2