diff options
Diffstat (limited to 'yardstick/common/messaging')
-rw-r--r-- | yardstick/common/messaging/__init__.py | 10 | ||||
-rw-r--r-- | yardstick/common/messaging/consumer.py | 11 | ||||
-rw-r--r-- | yardstick/common/messaging/payloads.py | 20 | ||||
-rw-r--r-- | yardstick/common/messaging/producer.py | 13 |
4 files changed, 36 insertions, 18 deletions
diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py index f0f012ec3..089c99c9f 100644 --- a/yardstick/common/messaging/__init__.py +++ b/yardstick/common/messaging/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Intel Corporation +# Copyright (c) 2018-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,11 +26,3 @@ TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER + # RPC server. RPC_SERVER_EXECUTOR = 'threading' - -# Topics. -RUNNER = 'runner' - -# Methods. -# RUNNER methods: -RUNNER_INFO = 'runner_info' -RUNNER_LOOP = 'runner_loop' diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py index 24ec6f184..7ce9bdaf7 100644 --- a/yardstick/common/messaging/consumer.py +++ b/yardstick/common/messaging/consumer.py @@ -29,9 +29,10 @@ LOG = logging.getLogger(__name__) class NotificationHandler(object): """Abstract class to define a endpoint object for a MessagingConsumer""" - def __init__(self, _id, ctx_pids, queue): + def __init__(self, _id, ctx_ids, queue): + super(NotificationHandler, self).__init__() self._id = _id - self._ctx_pids = ctx_pids + self._ctx_ids = ctx_ids self._queue = queue @@ -43,11 +44,11 @@ class MessagingConsumer(object): the messages published by a `MessagingNotifier`. """ - def __init__(self, topic, pids, endpoints, fanout=True): + def __init__(self, topic, ids, endpoints, fanout=True): """Init function. :param topic: (string) MQ exchange topic - :param pids: (list of int) list of PIDs of the processes implementing + :param ids: (list of int) list of IDs of the processes implementing the MQ Notifier which will be in the message context :param endpoints: (list of class) list of classes implementing the methods (see `MessagingNotifier.send_message) used by @@ -58,7 +59,7 @@ class MessagingConsumer(object): :returns: `MessagingConsumer` class object """ - self._pids = pids + self._ids = ids self._endpoints = endpoints self._transport = oslo_messaging.get_rpc_transport( cfg.CONF, url=messaging.TRANSPORT_URL) diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py index d29d79808..8ede1e58e 100644 --- a/yardstick/common/messaging/payloads.py +++ b/yardstick/common/messaging/payloads.py @@ -51,3 +51,23 @@ class Payload(object): def dict_to_obj(cls, _dict): """Returns a Payload object built from the dictionary elements""" return cls(**_dict) + + +class TrafficGeneratorPayload(Payload): + """Base traffic generator payload class""" + REQUIRED_FIELDS = { + 'version', # (str) version of the payload transmitted. + 'iteration', # (int) iteration index during the traffic injection, + # starting from 1. + 'kpi' # (dict) collection of KPIs collected from the traffic + # injection. The content will depend on the generator and the + # traffic type. + } + + +class RunnerPayload(Payload): + """Base runner payload class""" + REQUIRED_FIELDS = { + 'version', # (str) version of the payload transmitted. + 'data' # (dict) generic container of data to be used if needed. + } diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py index b6adc0c17..aadab649d 100644 --- a/yardstick/common/messaging/producer.py +++ b/yardstick/common/messaging/producer.py @@ -34,18 +34,18 @@ class MessagingProducer(object): messages in a message queue. """ - def __init__(self, topic, pid=os.getpid(), fanout=True): + def __init__(self, topic, _id=os.getpid(), fanout=True): """Init function. :param topic: (string) MQ exchange topic - :param pid: (int) PID of the process implementing this MQ Notifier + :param id: (int) ID of the process implementing this MQ Notifier :param fanout: (bool) MQ clients may request that a copy of the message be delivered to all servers listening on a topic by setting fanout to ``True``, rather than just one of them :returns: `MessagingNotifier` class object """ self._topic = topic - self._pid = pid + self._id = _id self._fanout = fanout self._transport = oslo_messaging.get_rpc_transport( cfg.CONF, url=messaging.TRANSPORT_URL) @@ -65,6 +65,11 @@ class MessagingProducer(object): consumer endpoints :param payload: (subclass `Payload`) payload content """ - self._notifier.cast({'pid': self._pid}, + self._notifier.cast({'id': self._id}, method, **payload.obj_to_dict()) + + @property + def id(self): + """Return MQ producer ID""" + return self._id |