diff options
Diffstat (limited to 'api/escalator/common/rpc.py')
-rw-r--r-- | api/escalator/common/rpc.py | 279 |
1 files changed, 279 insertions, 0 deletions
diff --git a/api/escalator/common/rpc.py b/api/escalator/common/rpc.py new file mode 100644 index 0000000..4d50461 --- /dev/null +++ b/api/escalator/common/rpc.py @@ -0,0 +1,279 @@ +# Copyright 2013 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +RPC Controller +""" +import datetime +import traceback + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_utils.importutils as imp +from oslo_utils import timeutils +import six +from webob import exc + +from escalator.common import client +from escalator.common import exception +from escalator.common import utils +from escalator.common import wsgi +from escalator import i18n + +LOG = logging.getLogger(__name__) +_ = i18n._ +_LE = i18n._LE + + +rpc_opts = [ + # NOTE(flaper87): Shamelessly copied + # from oslo rpc. + cfg.ListOpt('allowed_rpc_exception_modules', + default=['openstack.common.exception', + 'escalator.common.exception', + 'exceptions', + ], + help='Modules of exceptions that are permitted to be recreated' + ' upon receiving exception data from an rpc call.'), +] + +CONF = cfg.CONF +CONF.register_opts(rpc_opts) + + +class RPCJSONSerializer(wsgi.JSONResponseSerializer): + + def _sanitizer(self, obj): + def to_primitive(_type, _value): + return {"_type": _type, "_value": _value} + + if isinstance(obj, datetime.datetime): + return to_primitive("datetime", timeutils.strtime(obj)) + + return super(RPCJSONSerializer, self)._sanitizer(obj) + + +class RPCJSONDeserializer(wsgi.JSONRequestDeserializer): + + def _to_datetime(self, obj): + return timeutils.parse_strtime(obj) + + def _sanitizer(self, obj): + try: + _type, _value = obj["_type"], obj["_value"] + return getattr(self, "_to_" + _type)(_value) + except (KeyError, AttributeError): + return obj + + +class Controller(object): + """ + Base RPCController. + + This is the base controller for RPC based APIs. Commands + handled by this controller respect the following form: + + [{ + 'command': 'method_name', + 'kwargs': {...} + }] + + The controller is capable of processing more than one command + per request and will always return a list of results. + + :params raise_exc: Boolean that specifies whether to raise + exceptions instead of "serializing" them. + """ + + def __init__(self, raise_exc=False): + self._registered = {} + self.raise_exc = raise_exc + + def register(self, resource, filtered=None, excluded=None, refiner=None): + """ + Exports methods through the RPC Api. + + :params resource: Resource's instance to register. + :params filtered: List of methods that *can* be registered. Read + as "Method must be in this list". + :params excluded: List of methods to exclude. + :params refiner: Callable to use as filter for methods. + + :raises AssertionError: If refiner is not callable. + """ + + funcs = filter(lambda x: not x.startswith("_"), dir(resource)) + + if filtered: + funcs = [f for f in funcs if f in filtered] + + if excluded: + funcs = [f for f in funcs if f not in excluded] + + if refiner: + assert callable(refiner), "Refiner must be callable" + funcs = filter(refiner, funcs) + + for name in funcs: + meth = getattr(resource, name) + + if not callable(meth): + continue + + self._registered[name] = meth + + def __call__(self, req, body): + """ + Executes the command + """ + + if not isinstance(body, list): + msg = _("Request must be a list of commands") + raise exc.HTTPBadRequest(explanation=msg) + + def validate(cmd): + if not isinstance(cmd, dict): + msg = _("Bad Command: %s") % str(cmd) + raise exc.HTTPBadRequest(explanation=msg) + + command, kwargs = cmd.get("command"), cmd.get("kwargs") + + if (not command or not isinstance(command, six.string_types) or + (kwargs and not isinstance(kwargs, dict))): + msg = _("Wrong command structure: %s") % (str(cmd)) + raise exc.HTTPBadRequest(explanation=msg) + + method = self._registered.get(command) + if not method: + # Just raise 404 if the user tries to + # access a private method. No need for + # 403 here since logically the command + # is not registered to the rpc dispatcher + raise exc.HTTPNotFound(explanation=_("Command not found")) + + return True + + # If more than one command were sent then they might + # be intended to be executed sequentially, that for, + # lets first verify they're all valid before executing + # them. + commands = filter(validate, body) + + results = [] + for cmd in commands: + # kwargs is not required + command, kwargs = cmd["command"], cmd.get("kwargs", {}) + method = self._registered[command] + try: + result = method(req.context, **kwargs) + except Exception as e: + if self.raise_exc: + raise + + cls, val = e.__class__, utils.exception_to_str(e) + msg = (_LE("RPC Call Error: %(val)s\n%(tb)s") % + dict(val=val, tb=traceback.format_exc())) + LOG.error(msg) + + # NOTE(flaper87): Don't propagate all exceptions + # but the ones allowed by the user. + module = cls.__module__ + if module not in CONF.allowed_rpc_exception_modules: + cls = exception.RPCError + val = six.text_type(exception.RPCError(cls=cls, val=val)) + + cls_path = "%s.%s" % (cls.__module__, cls.__name__) + result = {"_error": {"cls": cls_path, "val": val}} + results.append(result) + return results + + +class RPCClient(client.BaseClient): + + def __init__(self, *args, **kwargs): + self._serializer = RPCJSONSerializer() + self._deserializer = RPCJSONDeserializer() + + self.raise_exc = kwargs.pop("raise_exc", True) + self.base_path = kwargs.pop("base_path", '/rpc') + super(RPCClient, self).__init__(*args, **kwargs) + + @client.handle_unauthenticated + def bulk_request(self, commands): + """ + Execute multiple commands in a single request. + + :params commands: List of commands to send. Commands + must respect the following form: + + { + 'command': 'method_name', + 'kwargs': method_kwargs + } + """ + body = self._serializer.to_json(commands) + response = super(RPCClient, self).do_request('POST', + self.base_path, + body) + return self._deserializer.from_json(response.read()) + + def do_request(self, method, **kwargs): + """ + Simple do_request override. This method serializes + the outgoing body and builds the command that will + be sent. + + :params method: The remote python method to call + :params kwargs: Dynamic parameters that will be + passed to the remote method. + """ + content = self.bulk_request([{'command': method, + 'kwargs': kwargs}]) + + # NOTE(flaper87): Return the first result if + # a single command was executed. + content = content[0] + + # NOTE(flaper87): Check if content is an error + # and re-raise it if raise_exc is True. Before + # checking if content contains the '_error' key, + # verify if it is an instance of dict - since the + # RPC call may have returned something different. + if self.raise_exc and (isinstance(content, dict) and + '_error' in content): + error = content['_error'] + try: + exc_cls = imp.import_class(error['cls']) + raise exc_cls(error['val']) + except ImportError: + # NOTE(flaper87): The exception + # class couldn't be imported, using + # a generic exception. + raise exception.RPCError(**error) + return content + + def __getattr__(self, item): + """ + This method returns a method_proxy that + will execute the rpc call in the registry + service. + """ + if item.startswith('_'): + raise AttributeError(item) + + def method_proxy(**kw): + return self.do_request(item, **kw) + + return method_proxy |