# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Time-based One-time Password Algorithm (TOTP) auth plugin

TOTP is an algorithm that computes a one-time password from a shared secret
key and the current time.

TOTP is an implementation of a hash-based message authentication code (HMAC).
It combines a secret key with the current timestamp using a cryptographic hash
function to generate a one-time password. The timestamp typically increases in
30-second intervals, so passwords generated close together in time from the
same secret key will be equal.
"""

import base64

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.twofactor import totp as crypto_totp
from oslo_log import log
from oslo_utils import timeutils
import six

from keystone import auth
from keystone.auth import plugins
from keystone.common import dependency
from keystone import exception
from keystone.i18n import _


METHOD_NAME = 'totp'

LOG = log.getLogger(__name__)


def _generate_totp_passcode(secret):
    """Generate TOTP passcode.

    The secret is padded to a multiple of eight characters, base32 decoded
    and used to compute a 6-digit SHA-1 passcode with a 30-second time step
    for the current UTC timestamp.

    :param bytes secret: A base32 encoded secret for the TOTP authentication
                         (text is accepted and encoded as UTF-8 first)
    :returns: totp passcode as bytes

    """
    if isinstance(secret, six.text_type):
        # NOTE(dstanek): since this may be coming from the JSON stored in the
        # database it may be UTF-8 encoded
        secret = secret.encode('utf-8')

    # NOTE(nonameentername): cryptography takes a non base32 encoded value for
    # TOTP. Add the correct padding to be able to base32 decode
    secret += b'=' * (-len(secret) % 8)

    decoded = base64.b32decode(secret)
    totp = crypto_totp.TOTP(
        decoded, 6, hashes.SHA1(), 30, backend=default_backend())
    return totp.generate(timeutils.utcnow_ts(microsecond=True))


@dependency.requires('credential_api')
class TOTP(auth.AuthMethodHandler):
    """Auth method handler that validates a user supplied TOTP passcode."""

    def authenticate(self, context, auth_payload, auth_context):
        """Try to authenticate using TOTP

        The passcode from the request is checked against a passcode generated
        from each of the user's credentials of type ``totp``; the first match
        authenticates the user.

        :param context: request context (not used by this method)
        :param dict auth_payload: method payload; the passcode is read from
                                  ``auth_payload['user']['passcode']``
        :param dict auth_context: updated in place with ``user_id`` when
                                  authentication succeeds
        :raises keystone.exception.Unauthorized: if no credential yields a
            matching passcode

        """
        user_info = plugins.TOTPUserInfo.create(auth_payload, METHOD_NAME)
        auth_passcode = auth_payload.get('user').get('passcode')
        if isinstance(auth_passcode, six.text_type):
            # _generate_totp_passcode() returns bytes, while the passcode in
            # the request may be text (e.g. when decoded from JSON). On
            # Python 3 text never compares equal to bytes, so normalise the
            # supplied value before comparing.
            auth_passcode = auth_passcode.encode('utf-8')

        credentials = self.credential_api.list_credentials_for_user(
            user_info.user_id, type='totp')

        valid_passcode = False
        for credential in credentials:
            try:
                generated_passcode = _generate_totp_passcode(
                    credential['blob'])
                if auth_passcode == generated_passcode:
                    valid_passcode = True
                    break
            except (ValueError, KeyError):
                # A credential that cannot be used is skipped rather than
                # failing the whole request; later credentials may match.
                LOG.debug('No TOTP match; credential id: %s, user_id: %s',
                          credential['id'], user_info.user_id)
            except (TypeError):
                LOG.debug('Base32 decode failed for TOTP credential %s',
                          credential['id'])

        if not valid_passcode:
            # authentication failed because of invalid username or passcode
            msg = _('Invalid username or TOTP passcode')
            raise exception.Unauthorized(msg)

        auth_context['user_id'] = user_info.user_id