aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/common/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/common/messaging')
-rw-r--r--yardstick/common/messaging/__init__.py10
-rw-r--r--yardstick/common/messaging/consumer.py11
-rw-r--r--yardstick/common/messaging/payloads.py20
-rw-r--r--yardstick/common/messaging/producer.py13
4 files changed, 36 insertions, 18 deletions
diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py
index f0f012ec3..089c99c9f 100644
--- a/yardstick/common/messaging/__init__.py
+++ b/yardstick/common/messaging/__init__.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2018 Intel Corporation
+# Copyright (c) 2018-2019 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,11 +26,3 @@ TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER +
# RPC server.
RPC_SERVER_EXECUTOR = 'threading'
-
-# Topics.
-RUNNER = 'runner'
-
-# Methods.
-# RUNNER methods:
-RUNNER_INFO = 'runner_info'
-RUNNER_LOOP = 'runner_loop'
diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py
index 24ec6f184..7ce9bdaf7 100644
--- a/yardstick/common/messaging/consumer.py
+++ b/yardstick/common/messaging/consumer.py
@@ -29,9 +29,10 @@ LOG = logging.getLogger(__name__)
class NotificationHandler(object):
"""Abstract class to define a endpoint object for a MessagingConsumer"""
- def __init__(self, _id, ctx_pids, queue):
+ def __init__(self, _id, ctx_ids, queue):
+ super(NotificationHandler, self).__init__()
self._id = _id
- self._ctx_pids = ctx_pids
+ self._ctx_ids = ctx_ids
self._queue = queue
@@ -43,11 +44,11 @@ class MessagingConsumer(object):
the messages published by a `MessagingNotifier`.
"""
- def __init__(self, topic, pids, endpoints, fanout=True):
+ def __init__(self, topic, ids, endpoints, fanout=True):
"""Init function.
:param topic: (string) MQ exchange topic
- :param pids: (list of int) list of PIDs of the processes implementing
+ :param ids: (list of int) list of IDs of the processes implementing
the MQ Notifier which will be in the message context
:param endpoints: (list of class) list of classes implementing the
methods (see `MessagingNotifier.send_message) used by
@@ -58,7 +59,7 @@ class MessagingConsumer(object):
:returns: `MessagingConsumer` class object
"""
- self._pids = pids
+ self._ids = ids
self._endpoints = endpoints
self._transport = oslo_messaging.get_rpc_transport(
cfg.CONF, url=messaging.TRANSPORT_URL)
diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py
index d29d79808..8ede1e58e 100644
--- a/yardstick/common/messaging/payloads.py
+++ b/yardstick/common/messaging/payloads.py
@@ -51,3 +51,23 @@ class Payload(object):
def dict_to_obj(cls, _dict):
"""Returns a Payload object built from the dictionary elements"""
return cls(**_dict)
+
+
+class TrafficGeneratorPayload(Payload):
+ """Base traffic generator payload class"""
+ REQUIRED_FIELDS = {
+ 'version', # (str) version of the payload transmitted.
+ 'iteration', # (int) iteration index during the traffic injection,
+ # starting from 1.
+ 'kpi' # (dict) collection of KPIs collected from the traffic
+ # injection. The content will depend on the generator and the
+ # traffic type.
+ }
+
+
+class RunnerPayload(Payload):
+ """Base runner payload class"""
+ REQUIRED_FIELDS = {
+ 'version', # (str) version of the payload transmitted.
+ 'data' # (dict) generic container of data to be used if needed.
+ }
diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py
index b6adc0c17..aadab649d 100644
--- a/yardstick/common/messaging/producer.py
+++ b/yardstick/common/messaging/producer.py
@@ -34,18 +34,18 @@ class MessagingProducer(object):
messages in a message queue.
"""
- def __init__(self, topic, pid=os.getpid(), fanout=True):
+ def __init__(self, topic, _id=os.getpid(), fanout=True):
"""Init function.
:param topic: (string) MQ exchange topic
- :param pid: (int) PID of the process implementing this MQ Notifier
+ :param id: (int) ID of the process implementing this MQ Notifier
:param fanout: (bool) MQ clients may request that a copy of the message
be delivered to all servers listening on a topic by
setting fanout to ``True``, rather than just one of them
:returns: `MessagingNotifier` class object
"""
self._topic = topic
- self._pid = pid
+ self._id = _id
self._fanout = fanout
self._transport = oslo_messaging.get_rpc_transport(
cfg.CONF, url=messaging.TRANSPORT_URL)
@@ -65,6 +65,11 @@ class MessagingProducer(object):
consumer endpoints
:param payload: (subclass `Payload`) payload content
"""
- self._notifier.cast({'pid': self._pid},
+ self._notifier.cast({'id': self._id},
method,
**payload.obj_to_dict())
+
+ @property
+ def id(self):
+ """Return MQ producer ID"""
+ return self._id