aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/auth/plugins/totp.py
blob: d0b61b3b6f4390053e4b83ba614f04a4ea352e42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Time-based One-time Password Algorithm (TOTP) auth plugin

TOTP is an algorithm that computes a one-time password from a shared secret
key and the current time.

TOTP is an implementation of a hash-based message authentication code (HMAC).
It combines a secret key with the current timestamp using a cryptographic hash
function to generate a one-time password. The timestamp typically increases in
30-second intervals, so passwords generated close together in time from the
same secret key will be equal.
"""

import base64

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.twofactor import totp as crypto_totp
from oslo_log import log
from oslo_utils import timeutils
import six

from keystone import auth
from keystone.auth import plugins
from keystone.common import dependency
from keystone import exception
from keystone.i18n import _


METHOD_NAME = 'totp'

LOG = log.getLogger(__name__)


def _generate_totp_passcode(secret):
    """Generate TOTP passcode.

    :param bytes secret: A base32 encoded secret for the TOTP authentication
    :returns: totp passcode as bytes
    """
    if isinstance(secret, six.text_type):
        # NOTE(dstanek): since this may be coming from the JSON stored in the
        # database it may be UTF-8 encoded
        secret = secret.encode('utf-8')

    # NOTE(nonameentername): cryptography takes a non base32 encoded value for
    # TOTP. Add the correct padding to be able to base32 decode
    while len(secret) % 8 != 0:
        secret = secret + b'='

    decoded = base64.b32decode(secret)
    totp = crypto_totp.TOTP(
        decoded, 6, hashes.SHA1(), 30, backend=default_backend())
    return totp.generate(timeutils.utcnow_ts(microsecond=True))


@dependency.requires('credential_api')
class TOTP(auth.AuthMethodHandler):

    def authenticate(self, context, auth_payload, auth_context):
        """Try to authenticate using TOTP"""
        user_info = plugins.TOTPUserInfo.create(auth_payload, METHOD_NAME)
        auth_passcode = auth_payload.get('user').get('passcode')

        credentials = self.credential_api.list_credentials_for_user(
            user_info.user_id, type='totp')

        valid_passcode = False
        for credential in credentials:
            try:
                generated_passcode = _generate_totp_passcode(
                    credential['blob'])
                if auth_passcode == generated_passcode:
                    valid_passcode = True
                    break
            except (ValueError, KeyError):
                LOG.debug('No TOTP match; credential id: %s, user_id: %s',
                          credential['id'], user_info.user_id)
            except (TypeError):
                LOG.debug('Base32 decode failed for TOTP credential %s',
                          credential['id'])

        if not valid_passcode:
            # authentication failed because of invalid username or passcode
            msg = _('Invalid username or TOTP passcode')
            raise exception.Unauthorized(msg)

        auth_context['user_id'] = user_info.user_id