diff options
Diffstat (limited to 'api/escalator/common')
-rw-r--r-- | api/escalator/common/__init__.py | 0 | ||||
-rw-r--r-- | api/escalator/common/auth.py | 294 | ||||
-rw-r--r-- | api/escalator/common/client.py | 594 | ||||
-rw-r--r-- | api/escalator/common/config.py | 204 | ||||
-rw-r--r-- | api/escalator/common/crypt.py | 68 | ||||
-rw-r--r-- | api/escalator/common/exception.py | 521 | ||||
-rw-r--r-- | api/escalator/common/rpc.py | 279 | ||||
-rw-r--r-- | api/escalator/common/utils.py | 944 | ||||
-rw-r--r-- | api/escalator/common/wsgi.py | 911 |
9 files changed, 3815 insertions, 0 deletions
diff --git a/api/escalator/common/__init__.py b/api/escalator/common/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/api/escalator/common/__init__.py diff --git a/api/escalator/common/auth.py b/api/escalator/common/auth.py new file mode 100644 index 0000000..d3e2893 --- /dev/null +++ b/api/escalator/common/auth.py @@ -0,0 +1,294 @@ +# Copyright 2011 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +This auth module is intended to allow OpenStack client-tools to select from a +variety of authentication strategies, including NoAuth (the default), and +Keystone (an identity management system). + + > auth_plugin = AuthPlugin(creds) + + > auth_plugin.authenticate() + + > auth_plugin.auth_token + abcdefg + + > auth_plugin.management_url + http://service_endpoint/ +""" +import httplib2 +from oslo_serialization import jsonutils +from oslo_log import log as logging +# NOTE(jokke): simplified transition to py3, behaves like py2 xrange +from six.moves import range +import six.moves.urllib.parse as urlparse + +from escalator.common import exception +from escalator import i18n + + +LOG = logging.getLogger(__name__) +_ = i18n._ + + +class BaseStrategy(object): + + def __init__(self): + self.auth_token = None + # TODO(sirp): Should expose selecting public/internal/admin URL. + self.management_url = None + + def authenticate(self): + raise NotImplementedError + + @property + def is_authenticated(self): + raise NotImplementedError + + @property + def strategy(self): + raise NotImplementedError + + +class NoAuthStrategy(BaseStrategy): + + def authenticate(self): + pass + + @property + def is_authenticated(self): + return True + + @property + def strategy(self): + return 'noauth' + + +class KeystoneStrategy(BaseStrategy): + MAX_REDIRECTS = 10 + + def __init__(self, creds, insecure=False, configure_via_auth=True): + self.creds = creds + self.insecure = insecure + self.configure_via_auth = configure_via_auth + super(KeystoneStrategy, self).__init__() + + def check_auth_params(self): + # Ensure that supplied credential parameters are as required + for required in ('username', 'password', 'auth_url', + 'strategy'): + if self.creds.get(required) is None: + raise exception.MissingCredentialError(required=required) + if self.creds['strategy'] != 'keystone': + raise exception.BadAuthStrategy(expected='keystone', + received=self.creds['strategy']) + # For v2.0 also check tenant is present + if self.creds['auth_url'].rstrip('/').endswith('v2.0'): + if self.creds.get("tenant") is None: + raise exception.MissingCredentialError(required='tenant') + + def authenticate(self): + """Authenticate with the Keystone service. + + There are a few scenarios to consider here: + + 1. Which version of Keystone are we using? v1 which uses headers to + pass the credentials, or v2 which uses a JSON encoded request body? + + 2. Keystone may respond back with a redirection using a 305 status + code. + + 3. We may attempt a v1 auth when v2 is what's called for. In this + case, we rewrite the url to contain /v2.0/ and retry using the v2 + protocol. + """ + def _authenticate(auth_url): + # If OS_AUTH_URL is missing a trailing slash add one + if not auth_url.endswith('/'): + auth_url += '/' + token_url = urlparse.urljoin(auth_url, "tokens") + # 1. Check Keystone version + is_v2 = auth_url.rstrip('/').endswith('v2.0') + if is_v2: + self._v2_auth(token_url) + else: + self._v1_auth(token_url) + + self.check_auth_params() + auth_url = self.creds['auth_url'] + for _ in range(self.MAX_REDIRECTS): + try: + _authenticate(auth_url) + except exception.AuthorizationRedirect as e: + # 2. Keystone may redirect us + auth_url = e.url + except exception.AuthorizationFailure: + # 3. In some configurations nova makes redirection to + # v2.0 keystone endpoint. Also, new location does not + # contain real endpoint, only hostname and port. + if 'v2.0' not in auth_url: + auth_url = urlparse.urljoin(auth_url, 'v2.0/') + else: + # If we successfully auth'd, then memorize the correct auth_url + # for future use. + self.creds['auth_url'] = auth_url + break + else: + # Guard against a redirection loop + raise exception.MaxRedirectsExceeded(redirects=self.MAX_REDIRECTS) + + def _v1_auth(self, token_url): + creds = self.creds + + headers = {} + headers['X-Auth-User'] = creds['username'] + headers['X-Auth-Key'] = creds['password'] + + tenant = creds.get('tenant') + if tenant: + headers['X-Auth-Tenant'] = tenant + + resp, resp_body = self._do_request(token_url, 'GET', headers=headers) + + def _management_url(self, resp): + for url_header in ('x-image-management-url', + 'x-server-management-url', + 'x-escalator'): + try: + return resp[url_header] + except KeyError as e: + not_found = e + raise not_found + + if resp.status in (200, 204): + try: + if self.configure_via_auth: + self.management_url = _management_url(self, resp) + self.auth_token = resp['x-auth-token'] + except KeyError: + raise exception.AuthorizationFailure() + elif resp.status == 305: + raise exception.AuthorizationRedirect(uri=resp['location']) + elif resp.status == 400: + raise exception.AuthBadRequest(url=token_url) + elif resp.status == 401: + raise exception.NotAuthenticated() + elif resp.status == 404: + raise exception.AuthUrlNotFound(url=token_url) + else: + raise Exception(_('Unexpected response: %s') % resp.status) + + def _v2_auth(self, token_url): + + creds = self.creds + + creds = { + "auth": { + "tenantName": creds['tenant'], + "passwordCredentials": { + "username": creds['username'], + "password": creds['password'] + } + } + } + + headers = {} + headers['Content-Type'] = 'application/json' + req_body = jsonutils.dumps(creds) + + resp, resp_body = self._do_request( + token_url, 'POST', headers=headers, body=req_body) + + if resp.status == 200: + resp_auth = jsonutils.loads(resp_body)['access'] + creds_region = self.creds.get('region') + if self.configure_via_auth: + endpoint = get_endpoint(resp_auth['serviceCatalog'], + endpoint_region=creds_region) + self.management_url = endpoint + self.auth_token = resp_auth['token']['id'] + elif resp.status == 305: + raise exception.RedirectException(resp['location']) + elif resp.status == 400: + raise exception.AuthBadRequest(url=token_url) + elif resp.status == 401: + raise exception.NotAuthenticated() + elif resp.status == 404: + raise exception.AuthUrlNotFound(url=token_url) + else: + raise Exception(_('Unexpected response: %s') % resp.status) + + @property + def is_authenticated(self): + return self.auth_token is not None + + @property + def strategy(self): + return 'keystone' + + def _do_request(self, url, method, headers=None, body=None): + headers = headers or {} + conn = httplib2.Http() + conn.force_exception_to_status_code = True + conn.disable_ssl_certificate_validation = self.insecure + headers['User-Agent'] = 'escalator-client' + resp, resp_body = conn.request(url, method, headers=headers, body=body) + return resp, resp_body + + +def get_plugin_from_strategy(strategy, creds=None, insecure=False, + configure_via_auth=True): + if strategy == 'noauth': + return NoAuthStrategy() + elif strategy == 'keystone': + return KeystoneStrategy(creds, insecure, + configure_via_auth=configure_via_auth) + else: + raise Exception(_("Unknown auth strategy '%s'") % strategy) + + +def get_endpoint(service_catalog, service_type='image', endpoint_region=None, + endpoint_type='publicURL'): + """ + Select an endpoint from the service catalog + + We search the full service catalog for services + matching both type and region. If the client + supplied no region then any 'image' endpoint + is considered a match. There must be one -- and + only one -- successful match in the catalog, + otherwise we will raise an exception. + """ + endpoint = None + for service in service_catalog: + s_type = None + try: + s_type = service['type'] + except KeyError: + msg = _('Encountered service with no "type": %s') % s_type + LOG.warn(msg) + continue + + if s_type == service_type: + for ep in service['endpoints']: + if endpoint_region is None or endpoint_region == ep['region']: + if endpoint is not None: + # This is a second match, abort + raise exception.RegionAmbiguity(region=endpoint_region) + endpoint = ep + if endpoint and endpoint.get(endpoint_type): + return endpoint[endpoint_type] + else: + raise exception.NoServiceEndpoint() diff --git a/api/escalator/common/client.py b/api/escalator/common/client.py new file mode 100644 index 0000000..586d638 --- /dev/null +++ b/api/escalator/common/client.py @@ -0,0 +1,594 @@ +# Copyright 2010-2011 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# HTTPSClientAuthConnection code comes courtesy of ActiveState website: +# http://code.activestate.com/recipes/ +# 577548-https-httplib-client-connection-with-certificate-v/ + +import collections +import copy +import errno +import functools +import httplib +import os +import re + +try: + from eventlet.green import socket + from eventlet.green import ssl +except ImportError: + import socket + import ssl + +import osprofiler.web + +try: + import sendfile # noqa + SENDFILE_SUPPORTED = True +except ImportError: + SENDFILE_SUPPORTED = False + +from oslo_log import log as logging +from oslo_utils import encodeutils +import six +# NOTE(jokke): simplified transition to py3, behaves like py2 xrange +from six.moves import range +import six.moves.urllib.parse as urlparse + +from escalator.common import auth +from escalator.common import exception +from escalator.common import utils +from escalator import i18n + +LOG = logging.getLogger(__name__) +_ = i18n._ + +# common chunk size for get and put +CHUNKSIZE = 65536 + +VERSION_REGEX = re.compile(r"/?v[0-9\.]+") + + +def handle_unauthenticated(func): + """ + Wrap a function to re-authenticate and retry. + """ + @functools.wraps(func) + def wrapped(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except exception.NotAuthenticated: + self._authenticate(force_reauth=True) + return func(self, *args, **kwargs) + return wrapped + + +def handle_redirects(func): + """ + Wrap the _do_request function to handle HTTP redirects. + """ + MAX_REDIRECTS = 5 + + @functools.wraps(func) + def wrapped(self, method, url, body, headers): + for _ in range(MAX_REDIRECTS): + try: + return func(self, method, url, body, headers) + except exception.RedirectException as redirect: + if redirect.url is None: + raise exception.InvalidRedirect() + url = redirect.url + raise exception.MaxRedirectsExceeded(redirects=MAX_REDIRECTS) + return wrapped + + +class HTTPSClientAuthConnection(httplib.HTTPSConnection): + """ + Class to make a HTTPS connection, with support for + full client-based SSL Authentication + + :see http://code.activestate.com/recipes/ + 577548-https-httplib-client-connection-with-certificate-v/ + """ + + def __init__(self, host, port, key_file, cert_file, + ca_file, timeout=None, insecure=False): + httplib.HTTPSConnection.__init__(self, host, port, key_file=key_file, + cert_file=cert_file) + self.key_file = key_file + self.cert_file = cert_file + self.ca_file = ca_file + self.timeout = timeout + self.insecure = insecure + + def connect(self): + """ + Connect to a host on a given (SSL) port. + If ca_file is pointing somewhere, use it to check Server Certificate. + + Redefined/copied and extended from httplib.py:1105 (Python 2.6.x). + This is needed to pass cert_reqs=ssl.CERT_REQUIRED as parameter to + ssl.wrap_socket(), which forces SSL to check server certificate against + our client certificate. + """ + sock = socket.create_connection((self.host, self.port), self.timeout) + if self._tunnel_host: + self.sock = sock + self._tunnel() + # Check CA file unless 'insecure' is specificed + if self.insecure is True: + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, + cert_reqs=ssl.CERT_NONE) + else: + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, + ca_certs=self.ca_file, + cert_reqs=ssl.CERT_REQUIRED) + + +class BaseClient(object): + + """A base client class""" + + DEFAULT_PORT = 80 + DEFAULT_DOC_ROOT = None + # Standard CA file locations for Debian/Ubuntu, RedHat/Fedora, + # Suse, FreeBSD/OpenBSD + DEFAULT_CA_FILE_PATH = ('/etc/ssl/certs/ca-certificates.crt:' + '/etc/pki/tls/certs/ca-bundle.crt:' + '/etc/ssl/ca-bundle.pem:' + '/etc/ssl/cert.pem') + + OK_RESPONSE_CODES = ( + httplib.OK, + httplib.CREATED, + httplib.ACCEPTED, + httplib.NO_CONTENT, + ) + + REDIRECT_RESPONSE_CODES = ( + httplib.MOVED_PERMANENTLY, + httplib.FOUND, + httplib.SEE_OTHER, + httplib.USE_PROXY, + httplib.TEMPORARY_REDIRECT, + ) + + def __init__(self, host, port=None, timeout=None, use_ssl=False, + auth_token=None, creds=None, doc_root=None, key_file=None, + cert_file=None, ca_file=None, insecure=False, + configure_via_auth=True): + """ + Creates a new client to some service. + + :param host: The host where service resides + :param port: The port where service resides + :param timeout: Connection timeout. + :param use_ssl: Should we use HTTPS? + :param auth_token: The auth token to pass to the server + :param creds: The credentials to pass to the auth plugin + :param doc_root: Prefix for all URLs we request from host + :param key_file: Optional PEM-formatted file that contains the private + key. + If use_ssl is True, and this param is None (the + default), then an environ variable + ESCALATOR_CLIENT_KEY_FILE is looked for. If no such + environ variable is found, ClientConnectionError + will be raised. + :param cert_file: Optional PEM-formatted certificate chain file. + If use_ssl is True, and this param is None (the + default), then an environ variable + ESCALATOR_CLIENT_CERT_FILE is looked for. If no such + environ variable is found, ClientConnectionError + will be raised. + :param ca_file: Optional CA cert file to use in SSL connections + If use_ssl is True, and this param is None (the + default), then an environ variable + ESCALATOR_CLIENT_CA_FILE is looked for. + :param insecure: Optional. If set then the server's certificate + will not be verified. + :param configure_via_auth: Optional. Defaults to True. If set, the + URL returned from the service catalog for the image + endpoint will **override** the URL supplied to in + the host parameter. + """ + self.host = host + self.port = port or self.DEFAULT_PORT + self.timeout = timeout + # A value of '0' implies never timeout + if timeout == 0: + self.timeout = None + self.use_ssl = use_ssl + self.auth_token = auth_token + self.creds = creds or {} + self.connection = None + self.configure_via_auth = configure_via_auth + # doc_root can be a nullstring, which is valid, and why we + # cannot simply do doc_root or self.DEFAULT_DOC_ROOT below. + self.doc_root = (doc_root if doc_root is not None + else self.DEFAULT_DOC_ROOT) + + self.key_file = key_file + self.cert_file = cert_file + self.ca_file = ca_file + self.insecure = insecure + self.auth_plugin = self.make_auth_plugin(self.creds, self.insecure) + self.connect_kwargs = self.get_connect_kwargs() + + def get_connect_kwargs(self): + connect_kwargs = {} + + # Both secure and insecure connections have a timeout option + connect_kwargs['timeout'] = self.timeout + + if self.use_ssl: + if self.key_file is None: + self.key_file = os.environ.get('ESCALATOR_CLIENT_KEY_FILE') + if self.cert_file is None: + self.cert_file = os.environ.get('ESCALATOR_CLIENT_CERT_FILE') + if self.ca_file is None: + self.ca_file = os.environ.get('ESCALATOR_CLIENT_CA_FILE') + + # Check that key_file/cert_file are either both set or both unset + if self.cert_file is not None and self.key_file is None: + msg = _("You have selected to use SSL in connecting, " + "and you have supplied a cert, " + "however you have failed to supply either a " + "key_file parameter or set the " + "ESCALATOR_CLIENT_KEY_FILE environ variable") + raise exception.ClientConnectionError(msg) + + if self.key_file is not None and self.cert_file is None: + msg = _("You have selected to use SSL in connecting, " + "and you have supplied a key, " + "however you have failed to supply either a " + "cert_file parameter or set the " + "ESCALATOR_CLIENT_CERT_FILE environ variable") + raise exception.ClientConnectionError(msg) + + if (self.key_file is not None and + not os.path.exists(self.key_file)): + msg = _("The key file you specified %s does not " + "exist") % self.key_file + raise exception.ClientConnectionError(msg) + connect_kwargs['key_file'] = self.key_file + + if (self.cert_file is not None and + not os.path.exists(self.cert_file)): + msg = _("The cert file you specified %s does not " + "exist") % self.cert_file + raise exception.ClientConnectionError(msg) + connect_kwargs['cert_file'] = self.cert_file + + if (self.ca_file is not None and + not os.path.exists(self.ca_file)): + msg = _("The CA file you specified %s does not " + "exist") % self.ca_file + raise exception.ClientConnectionError(msg) + + if self.ca_file is None: + for ca in self.DEFAULT_CA_FILE_PATH.split(":"): + if os.path.exists(ca): + self.ca_file = ca + break + + connect_kwargs['ca_file'] = self.ca_file + connect_kwargs['insecure'] = self.insecure + + return connect_kwargs + + def configure_from_url(self, url): + """ + Setups the connection based on the given url. + + The form is: + + <http|https>://<host>:port/doc_root + """ + LOG.debug("Configuring from URL: %s", url) + parsed = urlparse.urlparse(url) + self.use_ssl = parsed.scheme == 'https' + self.host = parsed.hostname + self.port = parsed.port or 80 + self.doc_root = parsed.path.rstrip('/') + + # We need to ensure a version identifier is appended to the doc_root + if not VERSION_REGEX.match(self.doc_root): + if self.DEFAULT_DOC_ROOT: + doc_root = self.DEFAULT_DOC_ROOT.lstrip('/') + self.doc_root += '/' + doc_root + msg = ("Appending doc_root %(doc_root)s to URL %(url)s" % + {'doc_root': doc_root, 'url': url}) + LOG.debug(msg) + + # ensure connection kwargs are re-evaluated after the service catalog + # publicURL is parsed for potential SSL usage + self.connect_kwargs = self.get_connect_kwargs() + + def make_auth_plugin(self, creds, insecure): + """ + Returns an instantiated authentication plugin. + """ + strategy = creds.get('strategy', 'noauth') + plugin = auth.get_plugin_from_strategy(strategy, creds, insecure, + self.configure_via_auth) + return plugin + + def get_connection_type(self): + """ + Returns the proper connection type + """ + if self.use_ssl: + return HTTPSClientAuthConnection + else: + return httplib.HTTPConnection + + def _authenticate(self, force_reauth=False): + """ + Use the authentication plugin to authenticate and set the auth token. + + :param force_reauth: For re-authentication to bypass cache. + """ + auth_plugin = self.auth_plugin + + if not auth_plugin.is_authenticated or force_reauth: + auth_plugin.authenticate() + + self.auth_token = auth_plugin.auth_token + + management_url = auth_plugin.management_url + if management_url and self.configure_via_auth: + self.configure_from_url(management_url) + + @handle_unauthenticated + def do_request(self, method, action, body=None, headers=None, + params=None): + """ + Make a request, returning an HTTP response object. + + :param method: HTTP verb (GET, POST, PUT, etc.) + :param action: Requested path to append to self.doc_root + :param body: Data to send in the body of the request + :param headers: Headers to send with the request + :param params: Key/value pairs to use in query string + :returns: HTTP response object + """ + if not self.auth_token: + self._authenticate() + + url = self._construct_url(action, params) + # NOTE(ameade): We need to copy these kwargs since they can be altered + # in _do_request but we need the originals if handle_unauthenticated + # calls this function again. + return self._do_request(method=method, url=url, + body=copy.deepcopy(body), + headers=copy.deepcopy(headers)) + + def _construct_url(self, action, params=None): + """ + Create a URL object we can use to pass to _do_request(). + """ + action = urlparse.quote(action) + path = '/'.join([self.doc_root or '', action.lstrip('/')]) + scheme = "https" if self.use_ssl else "http" + netloc = "%s:%d" % (self.host, self.port) + + if isinstance(params, dict): + for (key, value) in params.items(): + if value is None: + del params[key] + continue + if not isinstance(value, six.string_types): + value = str(value) + params[key] = encodeutils.safe_encode(value) + query = urlparse.urlencode(params) + else: + query = None + + url = urlparse.ParseResult(scheme, netloc, path, '', query, '') + log_msg = _("Constructed URL: %s") + LOG.debug(log_msg, url.geturl()) + return url + + def _encode_headers(self, headers): + """ + Encodes headers. + + Note: This should be used right before + sending anything out. + + :param headers: Headers to encode + :returns: Dictionary with encoded headers' + names and values + """ + to_str = encodeutils.safe_encode + return dict([(to_str(h), to_str(v)) for h, v in + six.iteritems(headers)]) + + @handle_redirects + def _do_request(self, method, url, body, headers): + """ + Connects to the server and issues a request. Handles converting + any returned HTTP error status codes to ESCALATOR exceptions + and closing the server connection. Returns the result data, or + raises an appropriate exception. + + :param method: HTTP method ("GET", "POST", "PUT", etc...) + :param url: urlparse.ParsedResult object with URL information + :param body: data to send (as string, filelike or iterable), + or None (default) + :param headers: mapping of key/value pairs to add as headers + + :note + + If the body param has a read attribute, and method is either + POST or PUT, this method will automatically conduct a chunked-transfer + encoding and use the body as a file object or iterable, transferring + chunks of data using the connection's send() method. This allows large + objects to be transferred efficiently without buffering the entire + body in memory. + """ + if url.query: + path = url.path + "?" + url.query + else: + path = url.path + + try: + connection_type = self.get_connection_type() + headers = self._encode_headers(headers or {}) + headers.update(osprofiler.web.get_trace_id_headers()) + + if 'x-auth-token' not in headers and self.auth_token: + headers['x-auth-token'] = self.auth_token + + c = connection_type(url.hostname, url.port, **self.connect_kwargs) + + def _pushing(method): + return method.lower() in ('post', 'put') + + def _simple(body): + return body is None or isinstance(body, six.string_types) + + def _filelike(body): + return hasattr(body, 'read') + + def _sendbody(connection, iter): + connection.endheaders() + for sent in iter: + # iterator has done the heavy lifting + pass + + def _chunkbody(connection, iter): + connection.putheader('Transfer-Encoding', 'chunked') + connection.endheaders() + for chunk in iter: + connection.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + connection.send('0\r\n\r\n') + + # Do a simple request or a chunked request, depending + # on whether the body param is file-like or iterable and + # the method is PUT or POST + # + if not _pushing(method) or _simple(body): + # Simple request... + c.request(method, path, body, headers) + elif _filelike(body) or self._iterable(body): + c.putrequest(method, path) + + use_sendfile = self._sendable(body) + + # According to HTTP/1.1, Content-Length and Transfer-Encoding + # conflict. + for header, value in headers.items(): + if use_sendfile or header.lower() != 'content-length': + c.putheader(header, str(value)) + + iter = utils.chunkreadable(body) + + if use_sendfile: + # send actual file without copying into userspace + _sendbody(c, iter) + else: + # otherwise iterate and chunk + _chunkbody(c, iter) + else: + raise TypeError('Unsupported image type: %s' % body.__class__) + + res = c.getresponse() + + def _retry(res): + return res.getheader('Retry-After') + + status_code = self.get_status_code(res) + if status_code in self.OK_RESPONSE_CODES: + return res + elif status_code in self.REDIRECT_RESPONSE_CODES: + raise exception.RedirectException(res.getheader('Location')) + elif status_code == httplib.UNAUTHORIZED: + raise exception.NotAuthenticated(res.read()) + elif status_code == httplib.FORBIDDEN: + raise exception.Forbidden(res.read()) + elif status_code == httplib.NOT_FOUND: + raise exception.NotFound(res.read()) + elif status_code == httplib.CONFLICT: + raise exception.Duplicate(res.read()) + elif status_code == httplib.BAD_REQUEST: + raise exception.Invalid(res.read()) + elif status_code == httplib.MULTIPLE_CHOICES: + raise exception.MultipleChoices(body=res.read()) + elif status_code == httplib.REQUEST_ENTITY_TOO_LARGE: + raise exception.LimitExceeded(retry=_retry(res), + body=res.read()) + elif status_code == httplib.INTERNAL_SERVER_ERROR: + raise exception.ServerError() + elif status_code == httplib.SERVICE_UNAVAILABLE: + raise exception.ServiceUnavailable(retry=_retry(res)) + else: + raise exception.UnexpectedStatus(status=status_code, + body=res.read()) + + except (socket.error, IOError) as e: + raise exception.ClientConnectionError(e) + + def _seekable(self, body): + # pipes are not seekable, avoids sendfile() failure on e.g. + # cat /path/to/image | escalator add ... + # or where add command is launched via popen + try: + os.lseek(body.fileno(), 0, os.SEEK_CUR) + return True + except OSError as e: + return (e.errno != errno.ESPIPE) + + def _sendable(self, body): + return (SENDFILE_SUPPORTED and + hasattr(body, 'fileno') and + self._seekable(body) and + not self.use_ssl) + + def _iterable(self, body): + return isinstance(body, collections.Iterable) + + def get_status_code(self, response): + """ + Returns the integer status code from the response, which + can be either a Webob.Response (used in testing) or httplib.Response + """ + if hasattr(response, 'status_int'): + return response.status_int + else: + return response.status + + def _extract_params(self, actual_params, allowed_params): + """ + Extract a subset of keys from a dictionary. The filters key + will also be extracted, and each of its values will be returned + as an individual param. + + :param actual_params: dict of keys to filter + :param allowed_params: list of keys that 'actual_params' will be + reduced to + :retval subset of 'params' dict + """ + try: + # expect 'filters' param to be a dict here + result = dict(actual_params.get('filters')) + except TypeError: + result = {} + + for allowed_param in allowed_params: + if allowed_param in actual_params: + result[allowed_param] = actual_params[allowed_param] + + return result diff --git a/api/escalator/common/config.py b/api/escalator/common/config.py new file mode 100644 index 0000000..66a59f1 --- /dev/null +++ b/api/escalator/common/config.py @@ -0,0 +1,204 @@ + +# Copyright 2011 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Routines for configuring Escalator +""" + +import logging +import logging.config +import logging.handlers +import os +import tempfile + +from oslo_concurrency import lockutils +from oslo_config import cfg +from oslo_policy import policy +from paste import deploy + +from escalator import i18n +from escalator.version import version_info as version + +_ = i18n._ + +paste_deploy_opts = [ + cfg.StrOpt('flavor', + help=_('Partial name of a pipeline in your paste configuration ' + 'file with the service name removed. For example, if ' + 'your paste section name is ' + '[pipeline:escalator-api-keystone] use the value ' + '"keystone"')), + cfg.StrOpt('config_file', + help=_('Name of the paste configuration file.')), +] +task_opts = [ + cfg.IntOpt('task_time_to_live', + default=48, + help=_("Time in hours for which a task lives after, either " + "succeeding or failing"), + deprecated_opts=[cfg.DeprecatedOpt('task_time_to_live', + group='DEFAULT')]), + cfg.StrOpt('task_executor', + default='taskflow', + help=_("Specifies which task executor to be used to run the " + "task scripts.")), + cfg.StrOpt('work_dir', + default=None, + help=_('Work dir for asynchronous task operations. ' + 'The directory set here will be used to operate over ' + 'images - normally before they are imported in the ' + 'destination store. When providing work dir, make sure ' + 'enough space is provided for concurrent tasks to run ' + 'efficiently without running out of space. A rough ' + 'estimation can be done by multiplying the number of ' + '`max_workers` - or the N of workers running - by an ' + 'average image size (e.g 500MB). The image size ' + 'estimation should be done based on the average size in ' + 'your deployment. Note that depending on the tasks ' + 'running you may need to multiply this number by some ' + 'factor depending on what the task does. For example, ' + 'you may want to double the available size if image ' + 'conversion is enabled. All this being said, remember ' + 'these are just estimations and you should do them ' + 'based on the worst case scenario and be prepared to ' + 'act in case they were wrong.')), +] +common_opts = [ + cfg.IntOpt('limit_param_default', default=25, + help=_('Default value for the number of items returned by a ' + 'request if not specified explicitly in the request')), + cfg.IntOpt('api_limit_max', default=1000, + help=_('Maximum permissible number of items that could be ' + 'returned by a request')), + cfg.BoolOpt('enable_v1_api', default=True, + help=_("Deploy the v1 OPNFV Escalator API.")), + cfg.BoolOpt('enable_v2_api', default=True, + help=_("Deploy the v2 OpenStack Images API.")), + cfg.StrOpt('pydev_worker_debug_host', + help=_('The hostname/IP of the pydev process listening for ' + 'debug connections')), + cfg.IntOpt('pydev_worker_debug_port', default=5678, + help=_('The port on which a pydev process is listening for ' + 'connections.')), + cfg.StrOpt('digest_algorithm', default='sha1', + help=_('Digest algorithm which will be used for digital ' + 'signature; the default is sha1 the default in Kilo ' + 'for a smooth upgrade process, and it will be updated ' + 'with sha256 in next release(L). Use the command ' + '"openssl list-message-digest-algorithms" to get the ' + 'available algorithms supported by the version of ' + 'OpenSSL on the platform. Examples are "sha1", ' + '"sha256", "sha512", etc.')), +] + +CONF = cfg.CONF +CONF.register_opts(paste_deploy_opts, group='paste_deploy') +CONF.register_opts(task_opts, group='task') +CONF.register_opts(common_opts) +policy.Enforcer(CONF) + + +def parse_args(args=None, usage=None, default_config_files=None): + if "OSLO_LOCK_PATH" not in os.environ: + lockutils.set_defaults(tempfile.gettempdir()) + + CONF(args=args, + project='escalator', + version=version.cached_version_string(), + usage=usage, + default_config_files=default_config_files) + + +def _get_deployment_flavor(flavor=None): + """ + Retrieve the paste_deploy.flavor config item, formatted appropriately + for appending to the application name. + + :param flavor: if specified, use this setting rather than the + paste_deploy.flavor configuration setting + """ + if not flavor: + flavor = CONF.paste_deploy.flavor + return '' if not flavor else ('-' + flavor) + + +def _get_paste_config_path(): + paste_suffix = '-paste.ini' + conf_suffix = '.conf' + if CONF.config_file: + # Assume paste config is in a paste.ini file corresponding + # to the last config file + path = CONF.config_file[-1].replace(conf_suffix, paste_suffix) + else: + path = CONF.prog + paste_suffix + return CONF.find_file(os.path.basename(path)) + + +def _get_deployment_config_file(): + """ + Retrieve the deployment_config_file config item, formatted as an + absolute pathname. + """ + path = CONF.paste_deploy.config_file + if not path: + path = _get_paste_config_path() + if not path: + msg = _("Unable to locate paste config file for %s.") % CONF.prog + raise RuntimeError(msg) + return os.path.abspath(path) + + +def load_paste_app(app_name, flavor=None, conf_file=None): + """ + Builds and returns a WSGI app from a paste config file. + + We assume the last config file specified in the supplied ConfigOpts + object is the paste config file, if conf_file is None. + + :param app_name: name of the application to load + :param flavor: name of the variant of the application to load + :param conf_file: path to the paste config file + + :raises RuntimeError when config file cannot be located or application + cannot be loaded from config file + """ + # append the deployment flavor to the application name, + # in order to identify the appropriate paste pipeline + app_name += _get_deployment_flavor(flavor) + + if not conf_file: + conf_file = _get_deployment_config_file() + + try: + logger = logging.getLogger(__name__) + logger.debug("Loading %(app_name)s from %(conf_file)s", + {'conf_file': conf_file, 'app_name': app_name}) + + app = deploy.loadapp("config:%s" % conf_file, name=app_name) + + # Log the options used when starting if we're in debug mode... + if CONF.debug: + CONF.log_opt_values(logger, logging.DEBUG) + + return app + except (LookupError, ImportError) as e: + msg = (_("Unable to load %(app_name)s from " + "configuration file %(conf_file)s." + "\nGot: %(e)r") % {'app_name': app_name, + 'conf_file': conf_file, + 'e': e}) + logger.error(msg) + raise RuntimeError(msg) diff --git a/api/escalator/common/crypt.py b/api/escalator/common/crypt.py new file mode 100644 index 0000000..3638f11 --- /dev/null +++ b/api/escalator/common/crypt.py @@ -0,0 +1,68 @@ + +# Copyright 2011 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Routines for URL-safe encrypting/decrypting +""" + +import base64 + +from Crypto.Cipher import AES +from Crypto import Random +from Crypto.Random import random +# NOTE(jokke): simplified transition to py3, behaves like py2 xrange +from six.moves import range + + +def urlsafe_encrypt(key, plaintext, blocksize=16): + """ + Encrypts plaintext. Resulting ciphertext will contain URL-safe characters + :param key: AES secret key + :param plaintext: Input text to be encrypted + :param blocksize: Non-zero integer multiple of AES blocksize in bytes (16) + + :returns : Resulting ciphertext + """ + def pad(text): + """ + Pads text to be encrypted + """ + pad_length = (blocksize - len(text) % blocksize) + sr = random.StrongRandom() + pad = ''.join(chr(sr.randint(1, 0xFF)) for i in range(pad_length - 1)) + # We use chr(0) as a delimiter between text and padding + return text + chr(0) + pad + + # random initial 16 bytes for CBC + init_vector = Random.get_random_bytes(16) + cypher = AES.new(key, AES.MODE_CBC, init_vector) + padded = cypher.encrypt(pad(str(plaintext))) + return base64.urlsafe_b64encode(init_vector + padded) + + +def urlsafe_decrypt(key, ciphertext): + """ + Decrypts URL-safe base64 encoded ciphertext + :param key: AES secret key + :param ciphertext: The encrypted text to decrypt + + :returns : Resulting plaintext + """ + # Cast from unicode + ciphertext = base64.urlsafe_b64decode(str(ciphertext)) + cypher = AES.new(key, AES.MODE_CBC, ciphertext[:16]) + padded = cypher.decrypt(ciphertext[16:]) + return padded[:padded.rfind(chr(0))] diff --git a/api/escalator/common/exception.py b/api/escalator/common/exception.py new file mode 100644 index 0000000..6905074 --- /dev/null +++ b/api/escalator/common/exception.py @@ -0,0 +1,521 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Escalator exception subclasses""" + +import six +import six.moves.urllib.parse as urlparse + +from escalator import i18n + +_ = i18n._ + +_FATAL_EXCEPTION_FORMAT_ERRORS = False + + +class RedirectException(Exception): + + def __init__(self, url): + self.url = urlparse.urlparse(url) + + +class EscalatorException(Exception): + """ + Base Escalator Exception + + To correctly use this class, inherit from it and define + a 'message' property. That message will get printf'd + with the keyword arguments provided to the constructor. + """ + message = _("An unknown exception occurred") + + def __init__(self, message=None, *args, **kwargs): + if not message: + message = self.message + try: + if kwargs: + message = message % kwargs + except Exception: + if _FATAL_EXCEPTION_FORMAT_ERRORS: + raise + else: + # at least get the core message out if something happened + pass + self.msg = message + self.message = message + super(EscalatorException, self).__init__(message) + + def __unicode__(self): + # NOTE(flwang): By default, self.msg is an instance of Message, which + # can't be converted by str(). Based on the definition of + # __unicode__, it should return unicode always. + return six.text_type(self.msg) + + +class MissingCredentialError(EscalatorException): + message = _("Missing required credential: %(required)s") + + +class BadAuthStrategy(EscalatorException): + message = _("Incorrect auth strategy, expected \"%(expected)s\" but " + "received \"%(received)s\"") + + +class NotFound(EscalatorException): + message = _("An object with the specified identifier was not found.") + + +class BadStoreUri(EscalatorException): + message = _("The Store URI was malformed.") + + +class Duplicate(EscalatorException): + message = _("An object with the same identifier already exists.") + + +class Conflict(EscalatorException): + message = _("An object with the same identifier is currently being " + "operated on.") + + +class AuthBadRequest(EscalatorException): + message = _("Connect error/bad request to Auth service at URL %(url)s.") + + +class AuthUrlNotFound(EscalatorException): + message = _("Auth service at URL %(url)s not found.") + + +class AuthorizationFailure(EscalatorException): + message = _("Authorization failed.") + + +class NotAuthenticated(EscalatorException): + message = _("You are not authenticated.") + + +class Forbidden(EscalatorException): + message = _("You are not authorized to complete this action.") + + +class ProtectedMetadefNamespaceDelete(Forbidden): + message = _("Metadata definition namespace %(namespace)s is protected" + " and cannot be deleted.") + + +class ProtectedMetadefNamespacePropDelete(Forbidden): + message = _("Metadata definition property %(property_name)s is protected" + " and cannot be deleted.") + + +class ProtectedMetadefObjectDelete(Forbidden): + message = _("Metadata definition object %(object_name)s is protected" + " and cannot be deleted.") + + +class ProtectedMetadefResourceTypeAssociationDelete(Forbidden): + message = _("Metadata definition resource-type-association" + " %(resource_type)s is protected and cannot be deleted.") + + +class ProtectedMetadefResourceTypeSystemDelete(Forbidden): + message = _("Metadata definition resource-type %(resource_type_name)s is" + " a seeded-system type and cannot be deleted.") + + +class ProtectedMetadefTagDelete(Forbidden): + message = _("Metadata definition tag %(tag_name)s is protected" + " and cannot be deleted.") + + +class Invalid(EscalatorException): + message = _("Data supplied was not valid.") + + +class InvalidSortKey(Invalid): + message = _("Sort key supplied was not valid.") + + +class InvalidSortDir(Invalid): + message = _("Sort direction supplied was not valid.") + + +class InvalidPropertyProtectionConfiguration(Invalid): + message = _("Invalid configuration in property protection file.") + + +class InvalidFilterRangeValue(Invalid): + message = _("Unable to filter using the specified range.") + + +class InvalidOptionValue(Invalid): + message = _("Invalid value for option %(option)s: %(value)s") + + +class ReadonlyProperty(Forbidden): + message = _("Attribute '%(property)s' is read-only.") + + +class ReservedProperty(Forbidden): + message = _("Attribute '%(property)s' is reserved.") + + +class AuthorizationRedirect(EscalatorException): + message = _("Redirecting to %(uri)s for authorization.") + + +class ClientConnectionError(EscalatorException): + message = _("There was an error connecting to a server") + + +class ClientConfigurationError(EscalatorException): + message = _("There was an error configuring the client.") + + +class MultipleChoices(EscalatorException): + message = _("The request returned a 302 Multiple Choices. This generally " + "means that you have not included a version indicator in a " + "request URI.\n\nThe body of response returned:\n%(body)s") + + +class LimitExceeded(EscalatorException): + message = _("The request returned a 413 Request Entity Too Large. This " + "generally means that rate limiting or a quota threshold was " + "breached.\n\nThe response body:\n%(body)s") + + def __init__(self, *args, **kwargs): + self.retry_after = (int(kwargs['retry']) if kwargs.get('retry') + else None) + super(LimitExceeded, self).__init__(*args, **kwargs) + + +class ServiceUnavailable(EscalatorException): + message = _("The request returned 503 Service Unavailable. This " + "generally occurs on service overload or other transient " + "outage.") + + def __init__(self, *args, **kwargs): + self.retry_after = (int(kwargs['retry']) if kwargs.get('retry') + else None) + super(ServiceUnavailable, self).__init__(*args, **kwargs) + + +class ServerError(EscalatorException): + message = _("The request returned 500 Internal Server Error.") + + +class UnexpectedStatus(EscalatorException): + message = _("The request returned an unexpected status: %(status)s." + "\n\nThe response body:\n%(body)s") + + +class InvalidContentType(EscalatorException): + message = _("Invalid content type %(content_type)s") + + +class BadRegistryConnectionConfiguration(EscalatorException): + message = _("Registry was not configured correctly on API server. " + "Reason: %(reason)s") + + +class BadDriverConfiguration(EscalatorException): + message = _("Driver %(driver_name)s could not be configured correctly. " + "Reason: %(reason)s") + + +class MaxRedirectsExceeded(EscalatorException): + message = _("Maximum redirects (%(redirects)s) was exceeded.") + + +class InvalidRedirect(EscalatorException): + message = _("Received invalid HTTP redirect.") + + +class NoServiceEndpoint(EscalatorException): + message = _("Response from Keystone does not contain a Glance endpoint.") + + +class RegionAmbiguity(EscalatorException): + message = _("Multiple 'image' service matches for region %(region)s. This " + "generally means that a region is required and you have not " + "supplied one.") + + +class WorkerCreationFailure(EscalatorException): + message = _("Server worker creation failed: %(reason)s.") + + +class SchemaLoadError(EscalatorException): + message = _("Unable to load schema: %(reason)s") + + +class InvalidObject(EscalatorException): + message = _("Provided object does not match schema " + "'%(schema)s': %(reason)s") + + +class UnsupportedHeaderFeature(EscalatorException): + message = _("Provided header feature is unsupported: %(feature)s") + + +class InUseByStore(EscalatorException): + message = _("The image cannot be deleted because it is in use through " + "the backend store outside of escalator.") + + +class SIGHUPInterrupt(EscalatorException): + message = _("System SIGHUP signal received.") + + +class RPCError(EscalatorException): + message = _("%(cls)s exception was raised in the last rpc call: %(val)s") + + +class TaskException(EscalatorException): + message = _("An unknown task exception occurred") + + +class BadTaskConfiguration(EscalatorException): + message = _("Task was not configured properly") + + +class TaskNotFound(TaskException, NotFound): + message = _("Task with the given id %(task_id)s was not found") + + +class InvalidTaskStatus(TaskException, Invalid): + message = _("Provided status of task is unsupported: %(status)s") + + +class InvalidTaskType(TaskException, Invalid): + message = _("Provided type of task is unsupported: %(type)s") + + +class InvalidTaskStatusTransition(TaskException, Invalid): + message = _("Status transition from %(cur_status)s to" + " %(new_status)s is not allowed") + + +class DuplicateLocation(Duplicate): + message = _("The location %(location)s already exists") + + +class InvalidParameterValue(Invalid): + message = _("Invalid value '%(value)s' for parameter '%(param)s': " + "%(extra_msg)s") + + +class MetadefDuplicateNamespace(Duplicate): + message = _("The metadata definition namespace=%(namespace_name)s" + " already exists.") + + +class MetadefDuplicateObject(Duplicate): + message = _("A metadata definition object with name=%(object_name)s" + " already exists in namespace=%(namespace_name)s.") + + +class MetadefDuplicateProperty(Duplicate): + message = _("A metadata definition property with name=%(property_name)s" + " already exists in namespace=%(namespace_name)s.") + + +class MetadefDuplicateResourceType(Duplicate): + message = _("A metadata definition resource-type with" + " name=%(resource_type_name)s already exists.") + + +class MetadefDuplicateResourceTypeAssociation(Duplicate): + message = _("The metadata definition resource-type association of" + " resource-type=%(resource_type_name)s to" + " namespace=%(namespace_name)s" + " already exists.") + + +class MetadefDuplicateTag(Duplicate): + message = _("A metadata tag with name=%(name)s" + " already exists in namespace=%(namespace_name)s.") + + +class MetadefForbidden(Forbidden): + message = _("You are not authorized to complete this action.") + + +class MetadefIntegrityError(Forbidden): + message = _("The metadata definition %(record_type)s with" + " name=%(record_name)s not deleted." + " Other records still refer to it.") + + +class MetadefNamespaceNotFound(NotFound): + message = _("Metadata definition namespace=%(namespace_name)s" + "was not found.") + + +class MetadefObjectNotFound(NotFound): + message = _("The metadata definition object with" + " name=%(object_name)s was not found in" + " namespace=%(namespace_name)s.") + + +class MetadefPropertyNotFound(NotFound): + message = _("The metadata definition property with" + " name=%(property_name)s was not found in" + " namespace=%(namespace_name)s.") + + +class MetadefResourceTypeNotFound(NotFound): + message = _("The metadata definition resource-type with" + " name=%(resource_type_name)s, was not found.") + + +class MetadefResourceTypeAssociationNotFound(NotFound): + message = _("The metadata definition resource-type association of" + " resource-type=%(resource_type_name)s to" + " namespace=%(namespace_name)s," + " was not found.") + + +class MetadefTagNotFound(NotFound): + message = _("The metadata definition tag with" + " name=%(name)s was not found in" + " namespace=%(namespace_name)s.") + + +class InvalidVersion(Invalid): + message = _("Version is invalid: %(reason)s") + + +class InvalidArtifactTypePropertyDefinition(Invalid): + message = _("Invalid property definition") + + +class InvalidArtifactTypeDefinition(Invalid): + message = _("Invalid type definition") + + +class InvalidArtifactPropertyValue(Invalid): + message = _("Property '%(name)s' may not have value '%(val)s': %(msg)s") + + def __init__(self, message=None, *args, **kwargs): + super(InvalidArtifactPropertyValue, self).__init__(message, *args, + **kwargs) + self.name = kwargs.get('name') + self.value = kwargs.get('val') + + +class ArtifactNotFound(NotFound): + message = _("Artifact with id=%(id)s was not found") + + +class ArtifactForbidden(Forbidden): + message = _("Artifact with id=%(id)s is not accessible") + + +class ArtifactDuplicateNameTypeVersion(Duplicate): + message = _("Artifact with the specified type, name and version" + " already exists") + + +class InvalidArtifactStateTransition(Invalid): + message = _("Artifact cannot change state from %(source)s to %(target)s") + + +class ArtifactDuplicateDirectDependency(Duplicate): + message = _("Artifact with the specified type, name and version" + " already has the direct dependency=%(dep)s") + + +class ArtifactDuplicateTransitiveDependency(Duplicate): + message = _("Artifact with the specified type, name and version" + " already has the transitive dependency=%(dep)s") + + +class ArtifactUnsupportedPropertyOperator(Invalid): + message = _("Operator %(op)s is not supported") + + +class ArtifactUnsupportedShowLevel(Invalid): + message = _("Show level %(shl)s is not supported in this operation") + + +class ArtifactPropertyValueNotFound(NotFound): + message = _("Property's %(prop)s value has not been found") + + +class ArtifactInvalidProperty(Invalid): + message = _("Artifact has no property %(prop)s") + + +class ArtifactInvalidPropertyParameter(Invalid): + message = _("Cannot use this parameter with the operator %(op)s") + + +class ArtifactLoadError(EscalatorException): + message = _("Cannot load artifact '%(name)s'") + + +class ArtifactNonMatchingTypeName(ArtifactLoadError): + message = _( + "Plugin name '%(plugin)s' should match artifact typename '%(name)s'") + + +class ArtifactPluginNotFound(NotFound): + message = _("No plugin for '%(name)s' has been loaded") + + +class UnknownArtifactType(NotFound): + message = _("Artifact type with name '%(name)s' and version '%(version)s' " + "is not known") + + +class ArtifactInvalidStateTransition(Invalid): + message = _("Artifact state cannot be changed from %(curr)s to %(to)s") + + +class JsonPatchException(EscalatorException): + message = _("Invalid jsonpatch request") + + +class InvalidJsonPatchBody(JsonPatchException): + message = _("The provided body %(body)s is invalid " + "under given schema: %(schema)s") + + +class InvalidJsonPatchPath(JsonPatchException): + message = _("The provided path '%(path)s' is invalid: %(explanation)s") + + def __init__(self, message=None, *args, **kwargs): + self.explanation = kwargs.get("explanation") + super(InvalidJsonPatchPath, self).__init__(message, *args, **kwargs) + + +class ThreadBinException(EscalatorException): + + def __init__(self, *args): + super(ThreadBinException, self).__init__(*args) + + +class SubprocessCmdFailed(EscalatorException): + message = _("suprocess command failed.") + + +class DeleteConstrainted(EscalatorException): + message = _("delete is not allowed.") + + +class TrustMeFailed(EscalatorException): + message = _("Trust me script failed.") diff --git a/api/escalator/common/rpc.py b/api/escalator/common/rpc.py new file mode 100644 index 0000000..4d50461 --- /dev/null +++ b/api/escalator/common/rpc.py @@ -0,0 +1,279 @@ +# Copyright 2013 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +RPC Controller +""" +import datetime +import traceback + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_utils.importutils as imp +from oslo_utils import timeutils +import six +from webob import exc + +from escalator.common import client +from escalator.common import exception +from escalator.common import utils +from escalator.common import wsgi +from escalator import i18n + +LOG = logging.getLogger(__name__) +_ = i18n._ +_LE = i18n._LE + + +rpc_opts = [ + # NOTE(flaper87): Shamelessly copied + # from oslo rpc. + cfg.ListOpt('allowed_rpc_exception_modules', + default=['openstack.common.exception', + 'escalator.common.exception', + 'exceptions', + ], + help='Modules of exceptions that are permitted to be recreated' + ' upon receiving exception data from an rpc call.'), +] + +CONF = cfg.CONF +CONF.register_opts(rpc_opts) + + +class RPCJSONSerializer(wsgi.JSONResponseSerializer): + + def _sanitizer(self, obj): + def to_primitive(_type, _value): + return {"_type": _type, "_value": _value} + + if isinstance(obj, datetime.datetime): + return to_primitive("datetime", timeutils.strtime(obj)) + + return super(RPCJSONSerializer, self)._sanitizer(obj) + + +class RPCJSONDeserializer(wsgi.JSONRequestDeserializer): + + def _to_datetime(self, obj): + return timeutils.parse_strtime(obj) + + def _sanitizer(self, obj): + try: + _type, _value = obj["_type"], obj["_value"] + return getattr(self, "_to_" + _type)(_value) + except (KeyError, AttributeError): + return obj + + +class Controller(object): + """ + Base RPCController. + + This is the base controller for RPC based APIs. Commands + handled by this controller respect the following form: + + [{ + 'command': 'method_name', + 'kwargs': {...} + }] + + The controller is capable of processing more than one command + per request and will always return a list of results. + + :params raise_exc: Boolean that specifies whether to raise + exceptions instead of "serializing" them. + """ + + def __init__(self, raise_exc=False): + self._registered = {} + self.raise_exc = raise_exc + + def register(self, resource, filtered=None, excluded=None, refiner=None): + """ + Exports methods through the RPC Api. + + :params resource: Resource's instance to register. + :params filtered: List of methods that *can* be registered. Read + as "Method must be in this list". + :params excluded: List of methods to exclude. + :params refiner: Callable to use as filter for methods. + + :raises AssertionError: If refiner is not callable. + """ + + funcs = filter(lambda x: not x.startswith("_"), dir(resource)) + + if filtered: + funcs = [f for f in funcs if f in filtered] + + if excluded: + funcs = [f for f in funcs if f not in excluded] + + if refiner: + assert callable(refiner), "Refiner must be callable" + funcs = filter(refiner, funcs) + + for name in funcs: + meth = getattr(resource, name) + + if not callable(meth): + continue + + self._registered[name] = meth + + def __call__(self, req, body): + """ + Executes the command + """ + + if not isinstance(body, list): + msg = _("Request must be a list of commands") + raise exc.HTTPBadRequest(explanation=msg) + + def validate(cmd): + if not isinstance(cmd, dict): + msg = _("Bad Command: %s") % str(cmd) + raise exc.HTTPBadRequest(explanation=msg) + + command, kwargs = cmd.get("command"), cmd.get("kwargs") + + if (not command or not isinstance(command, six.string_types) or + (kwargs and not isinstance(kwargs, dict))): + msg = _("Wrong command structure: %s") % (str(cmd)) + raise exc.HTTPBadRequest(explanation=msg) + + method = self._registered.get(command) + if not method: + # Just raise 404 if the user tries to + # access a private method. No need for + # 403 here since logically the command + # is not registered to the rpc dispatcher + raise exc.HTTPNotFound(explanation=_("Command not found")) + + return True + + # If more than one command were sent then they might + # be intended to be executed sequentially, that for, + # lets first verify they're all valid before executing + # them. + commands = filter(validate, body) + + results = [] + for cmd in commands: + # kwargs is not required + command, kwargs = cmd["command"], cmd.get("kwargs", {}) + method = self._registered[command] + try: + result = method(req.context, **kwargs) + except Exception as e: + if self.raise_exc: + raise + + cls, val = e.__class__, utils.exception_to_str(e) + msg = (_LE("RPC Call Error: %(val)s\n%(tb)s") % + dict(val=val, tb=traceback.format_exc())) + LOG.error(msg) + + # NOTE(flaper87): Don't propagate all exceptions + # but the ones allowed by the user. + module = cls.__module__ + if module not in CONF.allowed_rpc_exception_modules: + cls = exception.RPCError + val = six.text_type(exception.RPCError(cls=cls, val=val)) + + cls_path = "%s.%s" % (cls.__module__, cls.__name__) + result = {"_error": {"cls": cls_path, "val": val}} + results.append(result) + return results + + +class RPCClient(client.BaseClient): + + def __init__(self, *args, **kwargs): + self._serializer = RPCJSONSerializer() + self._deserializer = RPCJSONDeserializer() + + self.raise_exc = kwargs.pop("raise_exc", True) + self.base_path = kwargs.pop("base_path", '/rpc') + super(RPCClient, self).__init__(*args, **kwargs) + + @client.handle_unauthenticated + def bulk_request(self, commands): + """ + Execute multiple commands in a single request. + + :params commands: List of commands to send. Commands + must respect the following form: + + { + 'command': 'method_name', + 'kwargs': method_kwargs + } + """ + body = self._serializer.to_json(commands) + response = super(RPCClient, self).do_request('POST', + self.base_path, + body) + return self._deserializer.from_json(response.read()) + + def do_request(self, method, **kwargs): + """ + Simple do_request override. This method serializes + the outgoing body and builds the command that will + be sent. + + :params method: The remote python method to call + :params kwargs: Dynamic parameters that will be + passed to the remote method. + """ + content = self.bulk_request([{'command': method, + 'kwargs': kwargs}]) + + # NOTE(flaper87): Return the first result if + # a single command was executed. + content = content[0] + + # NOTE(flaper87): Check if content is an error + # and re-raise it if raise_exc is True. Before + # checking if content contains the '_error' key, + # verify if it is an instance of dict - since the + # RPC call may have returned something different. + if self.raise_exc and (isinstance(content, dict) and + '_error' in content): + error = content['_error'] + try: + exc_cls = imp.import_class(error['cls']) + raise exc_cls(error['val']) + except ImportError: + # NOTE(flaper87): The exception + # class couldn't be imported, using + # a generic exception. + raise exception.RPCError(**error) + return content + + def __getattr__(self, item): + """ + This method returns a method_proxy that + will execute the rpc call in the registry + service. + """ + if item.startswith('_'): + raise AttributeError(item) + + def method_proxy(**kw): + return self.do_request(item, **kw) + + return method_proxy diff --git a/api/escalator/common/utils.py b/api/escalator/common/utils.py new file mode 100644 index 0000000..ccdc7f8 --- /dev/null +++ b/api/escalator/common/utils.py @@ -0,0 +1,944 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2014 SoftLayer Technologies, Inc. +# Copyright 2015 Mirantis, Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +System-level utilities and helper functions. +""" + +import errno +from functools import reduce + +try: + from eventlet import sleep +except ImportError: + from time import sleep +from eventlet.green import socket + +import functools +import os +import platform +import re +import subprocess +import sys +import uuid +import copy + +from OpenSSL import crypto +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import encodeutils +from oslo_utils import excutils +from oslo_utils import netutils +from oslo_utils import strutils +import six +from webob import exc +import ConfigParser + +from escalator.common import exception +from escalator import i18n +# from providerclient.v1 import client as provider_client + +CONF = cfg.CONF + +LOG = logging.getLogger(__name__) +_ = i18n._ +_LE = i18n._LE + + +ESCALATOR_TEST_SOCKET_FD_STR = 'ESCALATOR_TEST_SOCKET_FD' + + +def chunkreadable(iter, chunk_size=65536): + """ + Wrap a readable iterator with a reader yielding chunks of + a preferred size, otherwise leave iterator unchanged. + + :param iter: an iter which may also be readable + :param chunk_size: maximum size of chunk + """ + return chunkiter(iter, chunk_size) if hasattr(iter, 'read') else iter + + +def chunkiter(fp, chunk_size=65536): + """ + Return an iterator to a file-like obj which yields fixed size chunks + + :param fp: a file-like object + :param chunk_size: maximum size of chunk + """ + while True: + chunk = fp.read(chunk_size) + if chunk: + yield chunk + else: + break + + +def cooperative_iter(iter): + """ + Return an iterator which schedules after each + iteration. This can prevent eventlet thread starvation. + + :param iter: an iterator to wrap + """ + try: + for chunk in iter: + sleep(0) + yield chunk + except Exception as err: + with excutils.save_and_reraise_exception(): + msg = _LE("Error: cooperative_iter exception %s") % err + LOG.error(msg) + + +def cooperative_read(fd): + """ + Wrap a file descriptor's read with a partial function which schedules + after each read. This can prevent eventlet thread starvation. + + :param fd: a file descriptor to wrap + """ + def readfn(*args): + result = fd.read(*args) + sleep(0) + return result + return readfn + + +MAX_COOP_READER_BUFFER_SIZE = 134217728 # 128M seems like a sane buffer limit + + +class CooperativeReader(object): + + """ + An eventlet thread friendly class for reading in image data. + + When accessing data either through the iterator or the read method + we perform a sleep to allow a co-operative yield. When there is more than + one image being uploaded/downloaded this prevents eventlet thread + starvation, ie allows all threads to be scheduled periodically rather than + having the same thread be continuously active. + """ + + def __init__(self, fd): + """ + :param fd: Underlying image file object + """ + self.fd = fd + self.iterator = None + # NOTE(markwash): if the underlying supports read(), overwrite the + # default iterator-based implementation with cooperative_read which + # is more straightforward + if hasattr(fd, 'read'): + self.read = cooperative_read(fd) + else: + self.iterator = None + self.buffer = '' + self.position = 0 + + def read(self, length=None): + """Return the requested amount of bytes, fetching the next chunk of + the underlying iterator when needed. + + This is replaced with cooperative_read in __init__ if the underlying + fd already supports read(). + """ + if length is None: + if len(self.buffer) - self.position > 0: + # if no length specified but some data exists in buffer, + # return that data and clear the buffer + result = self.buffer[self.position:] + self.buffer = '' + self.position = 0 + return str(result) + else: + # otherwise read the next chunk from the underlying iterator + # and return it as a whole. Reset the buffer, as subsequent + # calls may specify the length + try: + if self.iterator is None: + self.iterator = self.__iter__() + return self.iterator.next() + except StopIteration: + return '' + finally: + self.buffer = '' + self.position = 0 + else: + result = bytearray() + while len(result) < length: + if self.position < len(self.buffer): + to_read = length - len(result) + chunk = self.buffer[self.position:self.position + to_read] + result.extend(chunk) + + # This check is here to prevent potential OOM issues if + # this code is called with unreasonably high values of read + # size. Currently it is only called from the HTTP clients + # of Glance backend stores, which use httplib for data + # streaming, which has readsize hardcoded to 8K, so this + # check should never fire. Regardless it still worths to + # make the check, as the code may be reused somewhere else. + if len(result) >= MAX_COOP_READER_BUFFER_SIZE: + raise exception.LimitExceeded() + self.position += len(chunk) + else: + try: + if self.iterator is None: + self.iterator = self.__iter__() + self.buffer = self.iterator.next() + self.position = 0 + except StopIteration: + self.buffer = '' + self.position = 0 + return str(result) + return str(result) + + def __iter__(self): + return cooperative_iter(self.fd.__iter__()) + + +class LimitingReader(object): + + """ + Reader designed to fail when reading image data past the configured + allowable amount. + """ + + def __init__(self, data, limit): + """ + :param data: Underlying image data object + :param limit: maximum number of bytes the reader should allow + """ + self.data = data + self.limit = limit + self.bytes_read = 0 + + def __iter__(self): + for chunk in self.data: + self.bytes_read += len(chunk) + if self.bytes_read > self.limit: + raise exception.ImageSizeLimitExceeded() + else: + yield chunk + + def read(self, i): + result = self.data.read(i) + self.bytes_read += len(result) + if self.bytes_read > self.limit: + raise exception.ImageSizeLimitExceeded() + return result + + +def get_dict_meta(response): + result = {} + for key, value in response.json.items(): + result[key] = value + return result + + +def create_mashup_dict(image_meta): + """ + Returns a dictionary-like mashup of the image core properties + and the image custom properties from given image metadata. + + :param image_meta: metadata of image with core and custom properties + """ + + def get_items(): + for key, value in six.iteritems(image_meta): + if isinstance(value, dict): + for subkey, subvalue in six.iteritems( + create_mashup_dict(value)): + if subkey not in image_meta: + yield subkey, subvalue + else: + yield key, value + + return dict(get_items()) + + +def safe_mkdirs(path): + try: + os.makedirs(path) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + +def safe_remove(path): + try: + os.remove(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + + +class PrettyTable(object): + + """Creates an ASCII art table for use in bin/escalator + + """ + + def __init__(self): + self.columns = [] + + def add_column(self, width, label="", just='l'): + """Add a column to the table + + :param width: number of characters wide the column should be + :param label: column heading + :param just: justification for the column, 'l' for left, + 'r' for right + """ + self.columns.append((width, label, just)) + + def make_header(self): + label_parts = [] + break_parts = [] + for width, label, _ in self.columns: + # NOTE(sirp): headers are always left justified + label_part = self._clip_and_justify(label, width, 'l') + label_parts.append(label_part) + + break_part = '-' * width + break_parts.append(break_part) + + label_line = ' '.join(label_parts) + break_line = ' '.join(break_parts) + return '\n'.join([label_line, break_line]) + + def make_row(self, *args): + row = args + row_parts = [] + for data, (width, _, just) in zip(row, self.columns): + row_part = self._clip_and_justify(data, width, just) + row_parts.append(row_part) + + row_line = ' '.join(row_parts) + return row_line + + @staticmethod + def _clip_and_justify(data, width, just): + # clip field to column width + clipped_data = str(data)[:width] + + if just == 'r': + # right justify + justified = clipped_data.rjust(width) + else: + # left justify + justified = clipped_data.ljust(width) + + return justified + + +def get_terminal_size(): + + def _get_terminal_size_posix(): + import fcntl + import struct + import termios + + height_width = None + + try: + height_width = struct.unpack('hh', fcntl.ioctl(sys.stderr.fileno(), + termios.TIOCGWINSZ, + struct.pack( + 'HH', 0, 0))) + except Exception: + pass + + if not height_width: + try: + p = subprocess.Popen(['stty', 'size'], + shell=False, + stdout=subprocess.PIPE, + stderr=open(os.devnull, 'w')) + result = p.communicate() + if p.returncode == 0: + return tuple(int(x) for x in result[0].split()) + except Exception: + pass + + return height_width + + def _get_terminal_size_win32(): + try: + from ctypes import create_string_buffer + from ctypes import windll + handle = windll.kernel32.GetStdHandle(-12) + csbi = create_string_buffer(22) + res = windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi) + except Exception: + return None + if res: + import struct + unpack_tmp = struct.unpack("hhhhHhhhhhh", csbi.raw) + (bufx, bufy, curx, cury, wattr, + left, top, right, bottom, maxx, maxy) = unpack_tmp + height = bottom - top + 1 + width = right - left + 1 + return (height, width) + else: + return None + + def _get_terminal_size_unknownOS(): + raise NotImplementedError + + func = {'posix': _get_terminal_size_posix, + 'win32': _get_terminal_size_win32} + + height_width = func.get(platform.os.name, _get_terminal_size_unknownOS)() + + if height_width is None: + raise exception.Invalid() + + for i in height_width: + if not isinstance(i, int) or i <= 0: + raise exception.Invalid() + + return height_width[0], height_width[1] + + +def mutating(func): + """Decorator to enforce read-only logic""" + @functools.wraps(func) + def wrapped(self, req, *args, **kwargs): + if req.context.read_only: + msg = "Read-only access" + LOG.debug(msg) + raise exc.HTTPForbidden(msg, request=req, + content_type="text/plain") + return func(self, req, *args, **kwargs) + return wrapped + + +def setup_remote_pydev_debug(host, port): + error_msg = _LE('Error setting up the debug environment. Verify that the' + ' option pydev_worker_debug_host is pointing to a valid ' + 'hostname or IP on which a pydev server is listening on' + ' the port indicated by pydev_worker_debug_port.') + + try: + try: + from pydev import pydevd + except ImportError: + import pydevd + + pydevd.settrace(host, + port=port, + stdoutToServer=True, + stderrToServer=True) + return True + except Exception: + with excutils.save_and_reraise_exception(): + LOG.exception(error_msg) + + +def validate_key_cert(key_file, cert_file): + try: + error_key_name = "private key" + error_filename = key_file + with open(key_file, 'r') as keyfile: + key_str = keyfile.read() + key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_str) + + error_key_name = "certificate" + error_filename = cert_file + with open(cert_file, 'r') as certfile: + cert_str = certfile.read() + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str) + except IOError as ioe: + raise RuntimeError(_("There is a problem with your %(error_key_name)s " + "%(error_filename)s. Please verify it." + " Error: %(ioe)s") % + {'error_key_name': error_key_name, + 'error_filename': error_filename, + 'ioe': ioe}) + except crypto.Error as ce: + raise RuntimeError(_("There is a problem with your %(error_key_name)s " + "%(error_filename)s. Please verify it. OpenSSL" + " error: %(ce)s") % + {'error_key_name': error_key_name, + 'error_filename': error_filename, + 'ce': ce}) + + try: + data = str(uuid.uuid4()) + digest = CONF.digest_algorithm + if digest == 'sha1': + LOG.warn('The FIPS (FEDERAL INFORMATION PROCESSING STANDARDS)' + ' state that the SHA-1 is not suitable for' + ' general-purpose digital signature applications (as' + ' specified in FIPS 186-3) that require 112 bits of' + ' security. The default value is sha1 in Kilo for a' + ' smooth upgrade process, and it will be updated' + ' with sha256 in next release(L).') + out = crypto.sign(key, data, digest) + crypto.verify(cert, out, data, digest) + except crypto.Error as ce: + raise RuntimeError(_("There is a problem with your key pair. " + "Please verify that cert %(cert_file)s and " + "key %(key_file)s belong together. OpenSSL " + "error %(ce)s") % {'cert_file': cert_file, + 'key_file': key_file, + 'ce': ce}) + + +def get_test_suite_socket(): + global ESCALATOR_TEST_SOCKET_FD_STR + if ESCALATOR_TEST_SOCKET_FD_STR in os.environ: + fd = int(os.environ[ESCALATOR_TEST_SOCKET_FD_STR]) + sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + sock = socket.SocketType(_sock=sock) + sock.listen(CONF.backlog) + del os.environ[ESCALATOR_TEST_SOCKET_FD_STR] + os.close(fd) + return sock + return None + + +def is_uuid_like(val): + """Returns validation of a value as a UUID. + + For our purposes, a UUID is a canonical form string: + aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa + """ + try: + return str(uuid.UUID(val)) == val + except (TypeError, ValueError, AttributeError): + return False + + +def exception_to_str(exc): + try: + error = six.text_type(exc) + except UnicodeError: + try: + error = str(exc) + except UnicodeError: + error = ("Caught '%(exception)s' exception." % + {"exception": exc.__class__.__name__}) + return encodeutils.safe_encode(error, errors='ignore') + + +try: + REGEX_4BYTE_UNICODE = re.compile(u'[\U00010000-\U0010ffff]') +except re.error: + # UCS-2 build case + REGEX_4BYTE_UNICODE = re.compile(u'[\uD800-\uDBFF][\uDC00-\uDFFF]') + + +def no_4byte_params(f): + """ + Checks that no 4 byte unicode characters are allowed + in dicts' keys/values and string's parameters + """ + def wrapper(*args, **kwargs): + + def _is_match(some_str): + return (isinstance(some_str, unicode) and + REGEX_4BYTE_UNICODE.findall(some_str) != []) + + def _check_dict(data_dict): + # a dict of dicts has to be checked recursively + for key, value in data_dict.iteritems(): + if isinstance(value, dict): + _check_dict(value) + else: + if _is_match(key): + msg = _("Property names can't contain 4 byte unicode.") + raise exception.Invalid(msg) + if _is_match(value): + msg = (_("%s can't contain 4 byte unicode characters.") + % key.title()) + raise exception.Invalid(msg) + + for data_dict in [arg for arg in args if isinstance(arg, dict)]: + _check_dict(data_dict) + # now check args for str values + for arg in args: + if _is_match(arg): + msg = _("Param values can't contain 4 byte unicode.") + raise exception.Invalid(msg) + # check kwargs as well, as params are passed as kwargs via + # registry calls + _check_dict(kwargs) + return f(*args, **kwargs) + return wrapper + + +def stash_conf_values(): + """ + Make a copy of some of the current global CONF's settings. + Allows determining if any of these values have changed + when the config is reloaded. + """ + conf = {} + conf['bind_host'] = CONF.bind_host + conf['bind_port'] = CONF.bind_port + conf['tcp_keepidle'] = CONF.cert_file + conf['backlog'] = CONF.backlog + conf['key_file'] = CONF.key_file + conf['cert_file'] = CONF.cert_file + + return conf + + +def validate_ip_format(ip_str): + ''' + valid ip_str format = '10.43.178.9' + invalid ip_str format : '123. 233.42.12', spaces existed in field + '3234.23.453.353', out of range + '-2.23.24.234', negative number in field + '1.2.3.4d', letter in field + '10.43.1789', invalid format + ''' + if not ip_str: + msg = (_("No ip given when check ip")) + LOG.error(msg) + raise exc.HTTPBadRequest(msg, content_type="text/plain") + + valid_fromat = False + if ip_str.count('.') == 3 and all(num.isdigit() and 0 <= int( + num) < 256 for num in ip_str.rstrip().split('.')): + valid_fromat = True + if not valid_fromat: + msg = (_("%s invalid ip format!") % ip_str) + LOG.error(msg) + raise exc.HTTPBadRequest(msg, content_type="text/plain") + + +def valid_cidr(cidr): + if not cidr: + msg = (_("No CIDR given.")) + LOG.error(msg) + raise exc.HTTPBadRequest(explanation=msg) + + cidr_division = cidr.split('/') + if (len(cidr_division) != 2 or + not cidr_division[0] or + not cidr_division[1]): + msg = (_("CIDR format error.")) + LOG.error(msg) + raise exc.HTTPBadRequest(explanation=msg) + + netmask_err_msg = (_("CIDR netmask error, " + "it should be a integer between 0-32.")) + try: + netmask_cidr = int(cidr_division[1]) + except ValueError: + LOG.warn(netmask_err_msg) + raise exc.HTTPBadRequest(explanation=netmask_err_msg) + + if (netmask_cidr < 0 and + netmask_cidr > 32): + LOG.warn(netmask_err_msg) + raise exc.HTTPBadRequest(explanation=netmask_err_msg) + + validate_ip_format(cidr_division[0]) + + +def ip_into_int(ip): + """ + Switch ip string to decimalism integer.. + :param ip: ip string + :return: decimalism integer + """ + return reduce(lambda x, y: (x << 8) + y, map(int, ip.split('.'))) + + +def int_into_ip(num): + inter_ip = lambda x: '.'.join( + [str(x / (256 ** i) % 256) for i in range(3, -1, -1)]) + return inter_ip(num) + + +def is_ip_in_cidr(ip, cidr): + """ + Check ip is in cidr + :param ip: Ip will be checked, like:192.168.1.2. + :param cidr: Ip range,like:192.168.0.0/24. + :return: If ip in cidr, return True, else return False. + """ + if not ip: + msg = "Error, ip is empty" + raise exc.HTTPBadRequest(explanation=msg) + if not cidr: + msg = "Error, CIDR is empty" + raise exc.HTTPBadRequest(explanation=msg) + network = cidr.split('/') + mask = ~(2**(32 - int(network[1])) - 1) + return (ip_into_int(ip) & mask) == (ip_into_int(network[0]) & mask) + + +def is_ip_in_ranges(ip, ip_ranges): + """ + Check ip is in range + : ip: Ip will be checked, like:192.168.1.2. + : ip_ranges : Ip ranges, like: + [{'start':'192.168.0.10', 'end':'192.168.0.20'} + {'start':'192.168.0.50', 'end':'192.168.0.60'}] + :return: If ip in ip_ranges, return True, else return False. + """ + if not ip: + msg = "Error, ip is empty" + raise exc.HTTPBadRequest(explanation=msg) + + if not ip_ranges: + return True + + for ip_range in ip_ranges: + start_ip_int = ip_into_int(ip_range['start']) + end_ip_int = ip_into_int(ip_range['end']) + ip_int = ip_into_int(ip) + if ip_int >= start_ip_int and ip_int <= end_ip_int: + return True + + return False + + +def merge_ip_ranges(ip_ranges): + if not ip_ranges: + return ip_ranges + sort_ranges_by_start_ip = {} + for ip_range in ip_ranges: + start_ip_int = ip_into_int(ip_range['start']) + sort_ranges_by_start_ip.update({str(start_ip_int): ip_range}) + sort_ranges = [sort_ranges_by_start_ip[key] for key in + sorted(sort_ranges_by_start_ip.keys())] + last_range_end_ip = None + + merged_ip_ranges = [] + for ip_range in sort_ranges: + if last_range_end_ip is None: + last_range_end_ip = ip_range['end'] + merged_ip_ranges.append(ip_range) + continue + else: + last_range_end_ip_int = ip_into_int(last_range_end_ip) + ip_range_start_ip_int = ip_into_int(ip_range['start']) + if (last_range_end_ip_int + 1) == ip_range_start_ip_int: + merged_ip_ranges[-1]['end'] = ip_range['end'] + else: + merged_ip_ranges.append(ip_range) + return merged_ip_ranges + + +def _split_ip_ranges(ip_ranges): + ip_ranges_start = set() + ip_ranges_end = set() + if not ip_ranges: + return (ip_ranges_start, ip_ranges_end) + + for ip_range in ip_ranges: + ip_ranges_start.add(ip_range['start']) + ip_ranges_end.add(ip_range['end']) + + return (ip_ranges_start, ip_ranges_end) + + +# [{'start':'192.168.0.10', 'end':'192.168.0.20'}, +# {'start':'192.168.0.21', 'end':'192.168.0.22'}] and +# [{'start':'192.168.0.10', 'end':'192.168.0.22'}] is equal here +def is_ip_ranges_equal(ip_ranges1, ip_ranges2): + if not ip_ranges1 and not ip_ranges2: + return True + if ((ip_ranges1 and not ip_ranges2) or + (ip_ranges2 and not ip_ranges1)): + return False + ip_ranges_1 = copy.deepcopy(ip_ranges1) + ip_ranges_2 = copy.deepcopy(ip_ranges2) + merged_ip_ranges1 = merge_ip_ranges(ip_ranges_1) + merged_ip_ranges2 = merge_ip_ranges(ip_ranges_2) + ip_ranges1_start, ip_ranges1_end = _split_ip_ranges(merged_ip_ranges1) + ip_ranges2_start, ip_ranges2_end = _split_ip_ranges(merged_ip_ranges2) + if (ip_ranges1_start == ip_ranges2_start and + ip_ranges1_end == ip_ranges2_end): + return True + else: + return False + + +def get_dvs_interfaces(host_interfaces): + dvs_interfaces = [] + if not isinstance(host_interfaces, list): + host_interfaces = eval(host_interfaces) + for interface in host_interfaces: + if not isinstance(interface, dict): + interface = eval(interface) + if ('vswitch_type' in interface and + interface['vswitch_type'] == 'dvs'): + dvs_interfaces.append(interface) + + return dvs_interfaces + + +def get_clc_pci_info(pci_info): + clc_pci = [] + flag1 = 'Intel Corporation Coleto Creek PCIe Endpoint' + flag2 = '8086:0435' + for pci in pci_info: + if flag1 in pci or flag2 in pci: + clc_pci.append(pci.split()[0]) + return clc_pci + + +def cpu_str_to_list(spec): + """Parse a CPU set specification. + + :param spec: cpu set string eg "1-4,^3,6" + + Each element in the list is either a single + CPU number, a range of CPU numbers, or a + caret followed by a CPU number to be excluded + from a previous range. + + :returns: a set of CPU indexes + """ + + cpusets = [] + if not spec: + return cpusets + + cpuset_ids = set() + cpuset_reject_ids = set() + for rule in spec.split(','): + rule = rule.strip() + # Handle multi ',' + if len(rule) < 1: + continue + # Note the count limit in the .split() call + range_parts = rule.split('-', 1) + if len(range_parts) > 1: + # So, this was a range; start by converting the parts to ints + try: + start, end = [int(p.strip()) for p in range_parts] + except ValueError: + raise exception.Invalid(_("Invalid range expression %r") + % rule) + # Make sure it's a valid range + if start > end: + raise exception.Invalid(_("Invalid range expression %r") + % rule) + # Add available CPU ids to set + cpuset_ids |= set(range(start, end + 1)) + elif rule[0] == '^': + # Not a range, the rule is an exclusion rule; convert to int + try: + cpuset_reject_ids.add(int(rule[1:].strip())) + except ValueError: + raise exception.Invalid(_("Invalid exclusion " + "expression %r") % rule) + else: + # OK, a single CPU to include; convert to int + try: + cpuset_ids.add(int(rule)) + except ValueError: + raise exception.Invalid(_("Invalid inclusion " + "expression %r") % rule) + + # Use sets to handle the exclusion rules for us + cpuset_ids -= cpuset_reject_ids + cpusets = list(cpuset_ids) + cpusets.sort() + return cpusets + + +def cpu_list_to_str(cpu_list): + """Parse a CPU list to string. + + :param cpu_list: eg "[1,2,3,4,6,7]" + + :returns: a string of CPU ranges, eg 1-4,6,7 + """ + spec = '' + if not cpu_list: + return spec + + cpu_list.sort() + count = 0 + group_cpus = [] + tmp_cpus = [] + for cpu in cpu_list: + if count == 0: + init = cpu + tmp_cpus.append(cpu) + else: + if cpu == (init + count): + tmp_cpus.append(cpu) + else: + group_cpus.append(tmp_cpus) + tmp_cpus = [] + count = 0 + init = cpu + tmp_cpus.append(cpu) + count += 1 + + group_cpus.append(tmp_cpus) + + for group in group_cpus: + if len(group) > 2: + group_spec = ("%s-%s" % (group[0], group[0]+len(group)-1)) + else: + group_str = [str(num) for num in group] + group_spec = ','.join(group_str) + if spec: + spec += ',' + group_spec + else: + spec = group_spec + + return spec + + +def simple_subprocess_call(cmd): + return_code = subprocess.call(cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + return return_code + + +def translate_quotation_marks_for_shell(orig_str): + translated_str = '' + quotation_marks = '"' + quotation_marks_count = orig_str.count(quotation_marks) + if quotation_marks_count > 0: + replace_marks = '\\"' + translated_str = orig_str.replace(quotation_marks, replace_marks) + else: + translated_str = orig_str + return translated_str + + +def translate_marks_4_sed_command(ori_str): + translated_str = ori_str + translated_marks = { + '/': '\/', + '.': '\.', + '"': '\\"'} + for translated_mark in translated_marks: + if translated_str.count(translated_mark): + translated_str = translated_str.\ + replace(translated_mark, translated_marks[translated_mark]) + return translated_str + + + diff --git a/api/escalator/common/wsgi.py b/api/escalator/common/wsgi.py new file mode 100644 index 0000000..c4e8bfd --- /dev/null +++ b/api/escalator/common/wsgi.py @@ -0,0 +1,911 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2010 OpenStack Foundation +# Copyright 2014 IBM Corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Utility methods for working with WSGI servers +""" +from __future__ import print_function + +import errno +import functools +import os +import signal +import sys +import time + +import eventlet +from eventlet.green import socket +from eventlet.green import ssl +import eventlet.greenio +import eventlet.wsgi +from oslo_serialization import jsonutils +from oslo_concurrency import processutils +from oslo_config import cfg +from oslo_log import log as logging +from oslo_log import loggers +import routes +import routes.middleware +import six +import webob.dec +import webob.exc +from webob import multidict + +from escalator.common import exception +from escalator.common import utils +from escalator import i18n + + +_ = i18n._ +_LE = i18n._LE +_LI = i18n._LI +_LW = i18n._LW + +bind_opts = [ + cfg.StrOpt('bind_host', default='0.0.0.0', + help=_('Address to bind the server. Useful when ' + 'selecting a particular network interface.')), + cfg.IntOpt('bind_port', + help=_('The port on which the server will listen.')), +] + +socket_opts = [ + cfg.IntOpt('backlog', default=4096, + help=_('The backlog value that will be used when creating the ' + 'TCP listener socket.')), + cfg.IntOpt('tcp_keepidle', default=600, + help=_('The value for the socket option TCP_KEEPIDLE. This is ' + 'the time in seconds that the connection must be idle ' + 'before TCP starts sending keepalive probes.')), + cfg.StrOpt('ca_file', help=_('CA certificate file to use to verify ' + 'connecting clients.')), + cfg.StrOpt('cert_file', help=_('Certificate file to use when starting API ' + 'server securely.')), + cfg.StrOpt('key_file', help=_('Private key file to use when starting API ' + 'server securely.')), +] + +eventlet_opts = [ + cfg.IntOpt('workers', default=processutils.get_worker_count(), + help=_('The number of child process workers that will be ' + 'created to service requests. The default will be ' + 'equal to the number of CPUs available.')), + cfg.IntOpt('max_header_line', default=16384, + help=_('Maximum line size of message headers to be accepted. ' + 'max_header_line may need to be increased when using ' + 'large tokens (typically those generated by the ' + 'Keystone v3 API with big service catalogs')), + cfg.BoolOpt('http_keepalive', default=True, + help=_('If False, server will return the header ' + '"Connection: close", ' + 'If True, server will return "Connection: Keep-Alive" ' + 'in its responses. In order to close the client socket ' + 'connection explicitly after the response is sent and ' + 'read successfully by the client, you simply have to ' + 'set this option to False when you create a wsgi ' + 'server.')), +] + +profiler_opts = [ + cfg.BoolOpt("enabled", default=False, + help=_('If False fully disable profiling feature.')), + cfg.BoolOpt("trace_sqlalchemy", default=False, + help=_("If False doesn't trace SQL requests.")) +] + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF +CONF.register_opts(bind_opts) +CONF.register_opts(socket_opts) +CONF.register_opts(eventlet_opts) +CONF.register_opts(profiler_opts, group="profiler") + +ASYNC_EVENTLET_THREAD_POOL_LIST = [] + + +def get_bind_addr(default_port=None): + """Return the host and port to bind to.""" + return (CONF.bind_host, CONF.bind_port or default_port) + + +def ssl_wrap_socket(sock): + """ + Wrap an existing socket in SSL + + :param sock: non-SSL socket to wrap + + :returns: An SSL wrapped socket + """ + utils.validate_key_cert(CONF.key_file, CONF.cert_file) + + ssl_kwargs = { + 'server_side': True, + 'certfile': CONF.cert_file, + 'keyfile': CONF.key_file, + 'cert_reqs': ssl.CERT_NONE, + } + + if CONF.ca_file: + ssl_kwargs['ca_certs'] = CONF.ca_file + ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + + return ssl.wrap_socket(sock, **ssl_kwargs) + + +def get_socket(default_port): + """ + Bind socket to bind ip:port in conf + + note: Mostly comes from Swift with a few small changes... + + :param default_port: port to bind to if none is specified in conf + + :returns : a socket object as returned from socket.listen or + ssl.wrap_socket if conf specifies cert_file + """ + bind_addr = get_bind_addr(default_port) + + # TODO(jaypipes): eventlet's greened socket module does not actually + # support IPv6 in getaddrinfo(). We need to get around this in the + # future or monitor upstream for a fix + address_family = [ + addr[0] for addr in socket.getaddrinfo(bind_addr[0], + bind_addr[1], + socket.AF_UNSPEC, + socket.SOCK_STREAM) + if addr[0] in (socket.AF_INET, socket.AF_INET6) + ][0] + + use_ssl = CONF.key_file or CONF.cert_file + if use_ssl and (not CONF.key_file or not CONF.cert_file): + raise RuntimeError(_("When running server in SSL mode, you must " + "specify both a cert_file and key_file " + "option value in your configuration file")) + + sock = utils.get_test_suite_socket() + retry_until = time.time() + 30 + + while not sock and time.time() < retry_until: + try: + sock = eventlet.listen(bind_addr, + backlog=CONF.backlog, + family=address_family) + except socket.error as err: + if err.args[0] != errno.EADDRINUSE: + raise + eventlet.sleep(0.1) + if not sock: + raise RuntimeError(_("Could not bind to %(host)s:%(port)s after" + " trying for 30 seconds") % + {'host': bind_addr[0], + 'port': bind_addr[1]}) + + return sock + + +def set_eventlet_hub(): + try: + eventlet.hubs.use_hub('poll') + except Exception: + try: + eventlet.hubs.use_hub('selects') + except Exception: + msg = _("eventlet 'poll' nor 'selects' hubs are available " + "on this platform") + raise exception.WorkerCreationFailure( + reason=msg) + + +def get_asynchronous_eventlet_pool(size=1000): + """Return eventlet pool to caller. + + Also store pools created in global list, to wait on + it after getting signal for graceful shutdown. + + :param size: eventlet pool size + :returns: eventlet pool + """ + global ASYNC_EVENTLET_THREAD_POOL_LIST + + pool = eventlet.GreenPool(size=size) + # Add pool to global ASYNC_EVENTLET_THREAD_POOL_LIST + ASYNC_EVENTLET_THREAD_POOL_LIST.append(pool) + + return pool + + +class Server(object): + """Server class to manage multiple WSGI sockets and applications. + + This class requires initialize_escalator_store set to True if + escalator store needs to be initialized. + """ + + def __init__(self, threads=1000, initialize_escalator_store=False): + os.umask(0o27) # ensure files are created with the correct privileges + self._logger = logging.getLogger("eventlet.wsgi.server") + self._wsgi_logger = loggers.WritableLogger(self._logger) + self.threads = threads + self.children = set() + self.stale_children = set() + self.running = True + # NOTE(abhishek): Allows us to only re-initialize escalator_store when + # the API's configuration reloads. + self.initialize_escalator_store = initialize_escalator_store + self.pgid = os.getpid() + try: + # NOTE(flaper87): Make sure this process + # runs in its own process group. + os.setpgid(self.pgid, self.pgid) + except OSError: + # NOTE(flaper87): When running escalator-control, + # (escalator's functional tests, for example) + # setpgid fails with EPERM as escalator-control + # creates a fresh session, of which the newly + # launched service becomes the leader (session + # leaders may not change process groups) + # + # Running escalator-(api) is safe and + # shouldn't raise any error here. + self.pgid = 0 + + def hup(self, *args): + """ + Reloads configuration files with zero down time + """ + signal.signal(signal.SIGHUP, signal.SIG_IGN) + raise exception.SIGHUPInterrupt + + def kill_children(self, *args): + """Kills the entire process group.""" + signal.signal(signal.SIGTERM, signal.SIG_IGN) + signal.signal(signal.SIGINT, signal.SIG_IGN) + self.running = False + os.killpg(self.pgid, signal.SIGTERM) + + def start(self, application, default_port): + """ + Run a WSGI server with the given application. + + :param application: The application to be run in the WSGI server + :param default_port: Port to bind to if none is specified in conf + """ + self.application = application + self.default_port = default_port + self.configure() + self.start_wsgi() + + def start_wsgi(self): + + if CONF.workers == 0: + # Useful for profiling, test, debug etc. + self.pool = self.create_pool() + self.pool.spawn_n(self._single_run, self.application, self.sock) + return + else: + LOG.info(_LI("Starting %d workers") % CONF.workers) + signal.signal(signal.SIGTERM, self.kill_children) + signal.signal(signal.SIGINT, self.kill_children) + signal.signal(signal.SIGHUP, self.hup) + while len(self.children) < CONF.workers: + self.run_child() + + def create_pool(self): + return eventlet.GreenPool(size=self.threads) + + def _remove_children(self, pid): + if pid in self.children: + self.children.remove(pid) + LOG.info(_LI('Removed dead child %s') % pid) + elif pid in self.stale_children: + self.stale_children.remove(pid) + LOG.info(_LI('Removed stale child %s') % pid) + else: + LOG.warn(_LW('Unrecognised child %s') % pid) + + def _verify_and_respawn_children(self, pid, status): + if len(self.stale_children) == 0: + LOG.debug('No stale children') + if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0: + LOG.error(_LE('Not respawning child %d, cannot ' + 'recover from termination') % pid) + if not self.children and not self.stale_children: + LOG.info( + _LI('All workers have terminated. Exiting')) + self.running = False + else: + if len(self.children) < CONF.workers: + self.run_child() + + def wait_on_children(self): + while self.running: + try: + pid, status = os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + self._remove_children(pid) + self._verify_and_respawn_children(pid, status) + except OSError as err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + LOG.info(_LI('Caught keyboard interrupt. Exiting.')) + break + except exception.SIGHUPInterrupt: + self.reload() + continue + eventlet.greenio.shutdown_safe(self.sock) + self.sock.close() + LOG.debug('Exited') + + def configure(self, old_conf=None, has_changed=None): + """ + Apply configuration settings + + :param old_conf: Cached old configuration settings (if any) + :param has changed: callable to determine if a parameter has changed + """ + eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line + self.configure_socket(old_conf, has_changed) + if self.initialize_escalator_store: + initialize_escalator_store() + + def reload(self): + """ + Reload and re-apply configuration settings + + Existing child processes are sent a SIGHUP signal + and will exit after completing existing requests. + New child processes, which will have the updated + configuration, are spawned. This allows preventing + interruption to the service. + """ + def _has_changed(old, new, param): + old = old.get(param) + new = getattr(new, param) + return (new != old) + + old_conf = utils.stash_conf_values() + has_changed = functools.partial(_has_changed, old_conf, CONF) + CONF.reload_config_files() + os.killpg(self.pgid, signal.SIGHUP) + self.stale_children = self.children + self.children = set() + + # Ensure any logging config changes are picked up + logging.setup(CONF, 'escalator') + + self.configure(old_conf, has_changed) + self.start_wsgi() + + def wait(self): + """Wait until all servers have completed running.""" + try: + if self.children: + self.wait_on_children() + else: + self.pool.waitall() + except KeyboardInterrupt: + pass + + def run_child(self): + def child_hup(*args): + """Shuts down child processes, existing requests are handled.""" + signal.signal(signal.SIGHUP, signal.SIG_IGN) + eventlet.wsgi.is_accepting = False + self.sock.close() + + pid = os.fork() + if pid == 0: + signal.signal(signal.SIGHUP, child_hup) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + # ignore the interrupt signal to avoid a race whereby + # a child worker receives the signal before the parent + # and is respawned unnecessarily as a result + signal.signal(signal.SIGINT, signal.SIG_IGN) + # The child has no need to stash the unwrapped + # socket, and the reference prevents a clean + # exit on sighup + self._sock = None + self.run_server() + LOG.info(_LI('Child %d exiting normally') % os.getpid()) + # self.pool.waitall() is now called in wsgi's server so + # it's safe to exit here + sys.exit(0) + else: + LOG.info(_LI('Started child %s') % pid) + self.children.add(pid) + + def run_server(self): + """Run a WSGI server.""" + if cfg.CONF.pydev_worker_debug_host: + utils.setup_remote_pydev_debug(cfg.CONF.pydev_worker_debug_host, + cfg.CONF.pydev_worker_debug_port) + + eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0" + self.pool = self.create_pool() + try: + eventlet.wsgi.server(self.sock, + self.application, + log=self._wsgi_logger, + custom_pool=self.pool, + debug=False, + keepalive=CONF.http_keepalive) + except socket.error as err: + if err[0] != errno.EINVAL: + raise + + # waiting on async pools + if ASYNC_EVENTLET_THREAD_POOL_LIST: + for pool in ASYNC_EVENTLET_THREAD_POOL_LIST: + pool.waitall() + + def _single_run(self, application, sock): + """Start a WSGI server in a new green thread.""" + LOG.info(_LI("Starting single process server")) + eventlet.wsgi.server(sock, application, custom_pool=self.pool, + log=self._wsgi_logger, + debug=False, + keepalive=CONF.http_keepalive) + + def configure_socket(self, old_conf=None, has_changed=None): + """ + Ensure a socket exists and is appropriately configured. + + This function is called on start up, and can also be + called in the event of a configuration reload. + + When called for the first time a new socket is created. + If reloading and either bind_host or bind port have been + changed the existing socket must be closed and a new + socket opened (laws of physics). + + In all other cases (bind_host/bind_port have not changed) + the existing socket is reused. + + :param old_conf: Cached old configuration settings (if any) + :param has changed: callable to determine if a parameter has changed + """ + # Do we need a fresh socket? + new_sock = (old_conf is None or ( + has_changed('bind_host') or + has_changed('bind_port'))) + # Will we be using https? + use_ssl = not (not CONF.cert_file or not CONF.key_file) + # Were we using https before? + old_use_ssl = (old_conf is not None and not ( + not old_conf.get('key_file') or + not old_conf.get('cert_file'))) + # Do we now need to perform an SSL wrap on the socket? + wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock) + # Do we now need to perform an SSL unwrap on the socket? + unwrap_sock = use_ssl is False and old_use_ssl is True + + if new_sock: + self._sock = None + if old_conf is not None: + self.sock.close() + _sock = get_socket(self.default_port) + _sock.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, 1) + # sockets can hang around forever without keepalive + _sock.setsockopt(socket.SOL_SOCKET, + socket.SO_KEEPALIVE, 1) + self._sock = _sock + + if wrap_sock: + self.sock = ssl_wrap_socket(self._sock) + + if unwrap_sock: + self.sock = self._sock + + if new_sock and not use_ssl: + self.sock = self._sock + + # Pick up newly deployed certs + if old_conf is not None and use_ssl is True and old_use_ssl is True: + if has_changed('cert_file') or has_changed('key_file'): + utils.validate_key_cert(CONF.key_file, CONF.cert_file) + if has_changed('cert_file'): + self.sock.certfile = CONF.cert_file + if has_changed('key_file'): + self.sock.keyfile = CONF.key_file + + if new_sock or (old_conf is not None and has_changed('tcp_keepidle')): + # This option isn't available in the OS X version of eventlet + if hasattr(socket, 'TCP_KEEPIDLE'): + self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, + CONF.tcp_keepidle) + + if old_conf is not None and has_changed('backlog'): + self.sock.listen(CONF.backlog) + + +class Middleware(object): + """ + Base WSGI middleware wrapper. These classes require an application to be + initialized that will be called next. By default the middleware will + simply call its wrapped app, or you can override __call__ to customize its + behavior. + """ + + def __init__(self, application): + self.application = application + + @classmethod + def factory(cls, global_conf, **local_conf): + def filter(app): + return cls(app) + return filter + + def process_request(self, req): + """ + Called on each request. + + If this returns None, the next application down the stack will be + executed. If it returns a response then that response will be returned + and execution will stop here. + + """ + return None + + def process_response(self, response): + """Do whatever you'd like to the response.""" + return response + + @webob.dec.wsgify + def __call__(self, req): + response = self.process_request(req) + if response: + return response + response = req.get_response(self.application) + response.request = req + try: + return self.process_response(response) + except webob.exc.HTTPException as e: + return e + + +class Debug(Middleware): + """ + Helper class that can be inserted into any WSGI application chain + to get information about the request and response. + """ + + @webob.dec.wsgify + def __call__(self, req): + print(("*" * 40) + " REQUEST ENVIRON") + for key, value in req.environ.items(): + print(key, "=", value) + print('') + resp = req.get_response(self.application) + + print(("*" * 40) + " RESPONSE HEADERS") + for (key, value) in six.iteritems(resp.headers): + print(key, "=", value) + print('') + + resp.app_iter = self.print_generator(resp.app_iter) + + return resp + + @staticmethod + def print_generator(app_iter): + """ + Iterator that prints the contents of a wrapper string iterator + when iterated. + """ + print(("*" * 40) + " BODY") + for part in app_iter: + sys.stdout.write(part) + sys.stdout.flush() + yield part + print() + + +class APIMapper(routes.Mapper): + """ + Handle route matching when url is '' because routes.Mapper returns + an error in this case. + """ + + def routematch(self, url=None, environ=None): + if url is "": + result = self._match("", environ) + return result[0], result[1] + return routes.Mapper.routematch(self, url, environ) + + +class RejectMethodController(object): + + def reject(self, req, allowed_methods, *args, **kwargs): + LOG.debug("The method %s is not allowed for this resource" % + req.environ['REQUEST_METHOD']) + raise webob.exc.HTTPMethodNotAllowed( + headers=[('Allow', allowed_methods)]) + + +class Router(object): + """ + WSGI middleware that maps incoming requests to WSGI apps. + """ + + def __init__(self, mapper): + """ + Create a router for the given routes.Mapper. + + Each route in `mapper` must specify a 'controller', which is a + WSGI app to call. You'll probably want to specify an 'action' as + well and have your controller be a wsgi.Controller, who will route + the request to the action method. + + Examples: + mapper = routes.Mapper() + sc = ServerController() + + # Explicit mapping of one route to a controller+action + mapper.connect(None, "/svrlist", controller=sc, action="list") + + # Actions are all implicitly defined + mapper.resource("server", "servers", controller=sc) + + # Pointing to an arbitrary WSGI app. You can specify the + # {path_info:.*} parameter so the target app can be handed just that + # section of the URL. + mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp()) + """ + mapper.redirect("", "/") + self.map = mapper + self._router = routes.middleware.RoutesMiddleware(self._dispatch, + self.map) + + @classmethod + def factory(cls, global_conf, **local_conf): + return cls(APIMapper()) + + @webob.dec.wsgify + def __call__(self, req): + """ + Route the incoming request to a controller based on self.map. + If no match, return either a 404(Not Found) or 501(Not Implemented). + """ + return self._router + + @staticmethod + @webob.dec.wsgify + def _dispatch(req): + """ + Called by self._router after matching the incoming request to a route + and putting the information into req.environ. Either returns 404, + 501, or the routed WSGI app's response. + """ + match = req.environ['wsgiorg.routing_args'][1] + if not match: + implemented_http_methods = ['GET', 'HEAD', 'POST', 'PUT', + 'DELETE', 'PATCH'] + if req.environ['REQUEST_METHOD'] not in implemented_http_methods: + return webob.exc.HTTPNotImplemented() + else: + return webob.exc.HTTPNotFound() + app = match['controller'] + return app + + +class Request(webob.Request): + """Add some OpenStack API-specific logic to the base webob.Request.""" + + def best_match_content_type(self): + """Determine the requested response content-type.""" + supported = ('application/json',) + bm = self.accept.best_match(supported) + return bm or 'application/json' + + def get_content_type(self, allowed_content_types): + """Determine content type of the request body.""" + if "Content-Type" not in self.headers: + raise exception.InvalidContentType(content_type=None) + + content_type = self.content_type + + if content_type not in allowed_content_types: + raise exception.InvalidContentType(content_type=content_type) + else: + return content_type + + def best_match_language(self): + """Determines best available locale from the Accept-Language header. + + :returns: the best language match or None if the 'Accept-Language' + header was not available in the request. + """ + if not self.accept_language: + return None + langs = i18n.get_available_languages('escalator') + return self.accept_language.best_match(langs) + + def get_content_range(self): + """Return the `Range` in a request.""" + range_str = self.headers.get('Content-Range') + if range_str is not None: + range_ = webob.byterange.ContentRange.parse(range_str) + if range_ is None: + msg = _('Malformed Content-Range header: %s') % range_str + raise webob.exc.HTTPBadRequest(explanation=msg) + return range_ + + +class JSONRequestDeserializer(object): + valid_transfer_encoding = frozenset(['chunked', 'compress', 'deflate', + 'gzip', 'identity']) + + def has_body(self, request): + """ + Returns whether a Webob.Request object will possess an entity body. + + :param request: Webob.Request object + """ + request_encoding = request.headers.get('transfer-encoding', '').lower() + is_valid_encoding = request_encoding in self.valid_transfer_encoding + if is_valid_encoding and request.is_body_readable: + return True + elif request.content_length > 0: + return True + + return False + + @staticmethod + def _sanitizer(obj): + """Sanitizer method that will be passed to jsonutils.loads.""" + return obj + + def from_json(self, datastring): + try: + return jsonutils.loads(datastring, object_hook=self._sanitizer) + except ValueError: + msg = _('Malformed JSON in request body.') + raise webob.exc.HTTPBadRequest(explanation=msg) + + def default(self, request): + if self.has_body(request): + return {'body': self.from_json(request.body)} + else: + return {} + + +class JSONResponseSerializer(object): + + def _sanitizer(self, obj): + """Sanitizer method that will be passed to jsonutils.dumps.""" + if hasattr(obj, "to_dict"): + return obj.to_dict() + if isinstance(obj, multidict.MultiDict): + return obj.mixed() + return jsonutils.to_primitive(obj) + + def to_json(self, data): + return jsonutils.dumps(data, default=self._sanitizer) + + def default(self, response, result): + response.content_type = 'application/json' + response.body = self.to_json(result) + + +def translate_exception(req, e): + """Translates all translatable elements of the given exception.""" + + # The RequestClass attribute in the webob.dec.wsgify decorator + # does not guarantee that the request object will be a particular + # type; this check is therefore necessary. + if not hasattr(req, "best_match_language"): + return e + + locale = req.best_match_language() + + if isinstance(e, webob.exc.HTTPError): + e.explanation = i18n.translate(e.explanation, locale) + e.detail = i18n.translate(e.detail, locale) + if getattr(e, 'body_template', None): + e.body_template = i18n.translate(e.body_template, locale) + return e + + +class Resource(object): + """ + WSGI app that handles (de)serialization and controller dispatch. + + Reads routing information supplied by RoutesMiddleware and calls + the requested action method upon its deserializer, controller, + and serializer. Those three objects may implement any of the basic + controller action methods (create, update, show, index, delete) + along with any that may be specified in the api router. A 'default' + method may also be implemented to be used in place of any + non-implemented actions. Deserializer methods must accept a request + argument and return a dictionary. Controller methods must accept a + request argument. Additionally, they must also accept keyword + arguments that represent the keys returned by the Deserializer. They + may raise a webob.exc exception or return a dict, which will be + serialized by requested content type. + """ + + def __init__(self, controller, deserializer=None, serializer=None): + """ + :param controller: object that implement methods created by routes lib + :param deserializer: object that supports webob request deserialization + through controller-like actions + :param serializer: object that supports webob response serialization + through controller-like actions + """ + self.controller = controller + self.serializer = serializer or JSONResponseSerializer() + self.deserializer = deserializer or JSONRequestDeserializer() + + @webob.dec.wsgify(RequestClass=Request) + def __call__(self, request): + """WSGI method that controls (de)serialization and method dispatch.""" + action_args = self.get_action_args(request.environ) + action = action_args.pop('action', None) + + try: + deserialized_request = self.dispatch(self.deserializer, + action, request) + action_args.update(deserialized_request) + action_result = self.dispatch(self.controller, action, + request, **action_args) + except webob.exc.WSGIHTTPException as e: + exc_info = sys.exc_info() + raise translate_exception(request, e), None, exc_info[2] + + try: + response = webob.Response(request=request) + self.dispatch(self.serializer, action, response, action_result) + return response + except webob.exc.WSGIHTTPException as e: + return translate_exception(request, e) + except webob.exc.HTTPException as e: + return e + # return unserializable result (typically a webob exc) + except Exception: + return action_result + + def dispatch(self, obj, action, *args, **kwargs): + """Find action-specific method on self and call it.""" + try: + method = getattr(obj, action) + except AttributeError: + method = getattr(obj, 'default') + + return method(*args, **kwargs) + + def get_action_args(self, request_environment): + """Parse dictionary created by routes library.""" + try: + args = request_environment['wsgiorg.routing_args'][1].copy() + except Exception: + return {} + + try: + del args['controller'] + except KeyError: + pass + + try: + del args['format'] + except KeyError: + pass + + return args |