227 lines
5.4 KiB
Python
227 lines
5.4 KiB
Python
|
#!/usr/bin/env python3
|
||
|
"""
|
||
|
Contains authentication methods for the app.
|
||
|
"""
|
||
|
import os
|
||
|
import functools
|
||
|
|
||
|
from fido2.client import ClientData
|
||
|
from fido2.server import Fido2Server, RelyingParty
|
||
|
from fido2.ctap2 import AttestationObject, AuthenticatorData, \
|
||
|
AttestedCredentialData
|
||
|
from fido2 import cbor
|
||
|
from flask import Blueprint, session, render_template, request, abort, \
|
||
|
redirect, url_for, g
|
||
|
from passlib.hash import argon2
|
||
|
|
||
|
import config
|
||
|
import db
|
||
|
|
||
|
auth_views = Blueprint("auth_views", __name__)
|
||
|
|
||
|
rp = RelyingParty('steelbea.me', 'Juice')
|
||
|
server = Fido2Server(rp)
|
||
|
|
||
|
|
||
|
def auth_required(func):
|
||
|
"""
|
||
|
Wrapper for views which should be protected by authentication.
|
||
|
"""
|
||
|
@functools.wraps(func)
|
||
|
def wrapper(*args, **kwargs):
|
||
|
username = session.get('username')
|
||
|
token = session.get('token')
|
||
|
if not username or not token:
|
||
|
return redirect(url_for('auth_views.login'))
|
||
|
|
||
|
data = db.get_user(username)
|
||
|
if not data:
|
||
|
session.pop('username')
|
||
|
return redirect(url_for('auth_views.login'))
|
||
|
token_hash = data[2]
|
||
|
if not token_hash:
|
||
|
return redirect(url_for('auth_views.login'))
|
||
|
|
||
|
if argon2.verify(token, token_hash):
|
||
|
return func(*args, **kwargs)
|
||
|
else:
|
||
|
session.pop('token')
|
||
|
return redirect(url_for('auth_views.login'))
|
||
|
return wrapper
|
||
|
|
||
|
|
||
|
@auth_views.route('/api/register/begin', methods=['POST'])
|
||
|
def register_begin():
|
||
|
if not config.registration_open:
|
||
|
return "Registration is closed."
|
||
|
|
||
|
username = session.get('username')
|
||
|
if not username:
|
||
|
return 'invalid'
|
||
|
|
||
|
data = db.get_user(username)
|
||
|
if not data:
|
||
|
session.pop('username')
|
||
|
return 'invalid'
|
||
|
user_id = data[0]
|
||
|
|
||
|
exist_cred = db.get_credentials(user_id)
|
||
|
exist_cred = [AttestedCredentialData(cd[2]) for cd in exist_cred]
|
||
|
|
||
|
registration_data, state = server.register_begin({
|
||
|
'id': str(user_id).encode('utf8'),
|
||
|
'name': username,
|
||
|
'displayName': username,
|
||
|
}, exist_cred, user_verification='discouraged')
|
||
|
|
||
|
session['state'] = state
|
||
|
return cbor.encode(registration_data)
|
||
|
|
||
|
|
||
|
@auth_views.route('/api/register/complete', methods=['POST'])
|
||
|
def register_complete():
|
||
|
if not config.registration_open:
|
||
|
return "Registration is closed."
|
||
|
|
||
|
username = session.get('username')
|
||
|
if not username:
|
||
|
return 'invalid'
|
||
|
|
||
|
data = db.get_user(username)
|
||
|
if not data:
|
||
|
session.pop('username')
|
||
|
return 'invalid'
|
||
|
user_id = data[0]
|
||
|
|
||
|
data = cbor.decode(request.get_data())
|
||
|
client_data = ClientData(data['clientDataJSON'])
|
||
|
att_obj = AttestationObject(data['attestationObject'])
|
||
|
|
||
|
auth_data = server.register_complete(
|
||
|
session.pop('state'),
|
||
|
client_data,
|
||
|
att_obj
|
||
|
)
|
||
|
|
||
|
db.set_credential(user_id, auth_data.credential_data)
|
||
|
return cbor.encode({'status': 'OK'})
|
||
|
|
||
|
|
||
|
@auth_views.route('/api/authenticate/begin', methods=['POST'])
|
||
|
def authenticate_begin():
|
||
|
username = session.get('username')
|
||
|
if not username:
|
||
|
return 'invalid'
|
||
|
|
||
|
data = db.get_user(session['username'])
|
||
|
if not data:
|
||
|
session.pop('username')
|
||
|
return 'invalid'
|
||
|
user_id = data[0]
|
||
|
|
||
|
credentials = db.get_credentials(user_id)
|
||
|
credentials = [AttestedCredentialData(cd[2]) for cd in credentials]
|
||
|
|
||
|
auth_data, state = server.authenticate_begin(credentials)
|
||
|
session['state'] = state
|
||
|
return cbor.encode(auth_data)
|
||
|
|
||
|
|
||
|
@auth_views.route('/api/authenticate/complete', methods=['POST'])
|
||
|
def authenticate_complete():
|
||
|
username = session.get('username')
|
||
|
if not username:
|
||
|
return 'invalid'
|
||
|
|
||
|
data = db.get_user(session['username'])
|
||
|
if not data:
|
||
|
session.pop('username')
|
||
|
return 'invalid'
|
||
|
user_id = data[0]
|
||
|
|
||
|
credentials = db.get_credentials(user_id)
|
||
|
credentials = [AttestedCredentialData(cd[2]) for cd in credentials]
|
||
|
|
||
|
data = cbor.decode(request.get_data())
|
||
|
credential_id = data['credentialId']
|
||
|
client_data = ClientData(data['clientDataJSON'])
|
||
|
auth_data = AuthenticatorData(data['authenticatorData'])
|
||
|
signature = data['signature']
|
||
|
|
||
|
server.authenticate_complete(
|
||
|
session.pop('state'),
|
||
|
credentials,
|
||
|
credential_id,
|
||
|
client_data,
|
||
|
auth_data,
|
||
|
signature
|
||
|
)
|
||
|
|
||
|
token = os.urandom(32)
|
||
|
token_hash = argon2.hash(token)
|
||
|
db.set_user_token(username, token_hash)
|
||
|
session['token'] = token
|
||
|
|
||
|
return cbor.encode({'status': 'OK'})
|
||
|
|
||
|
|
||
|
@auth_views.route('/register', methods=['GET', 'POST'])
|
||
|
def register():
|
||
|
"""
|
||
|
Registration page.
|
||
|
"""
|
||
|
if not config.registration_open:
|
||
|
return "Registration is closed."
|
||
|
|
||
|
if request.method == 'GET':
|
||
|
params = {
|
||
|
'title': 'Register',
|
||
|
'heading': 'Register an account',
|
||
|
'form_url': url_for('auth_views.register'),
|
||
|
}
|
||
|
return render_template('auth.html', **params)
|
||
|
|
||
|
username = request.form.get('username')
|
||
|
|
||
|
if len(username) > 64:
|
||
|
return "username too long"
|
||
|
elif len(username) < 3:
|
||
|
return "username too short"
|
||
|
|
||
|
db.set_user(username) # TODO: handle username error
|
||
|
session['username'] = username
|
||
|
|
||
|
params = {
|
||
|
'title': 'Register',
|
||
|
'heading': 'Register your authenticator',
|
||
|
'api_begin': url_for('auth_views.register_begin'),
|
||
|
'api_complete': url_for('auth_views.register_complete'),
|
||
|
}
|
||
|
return render_template('auth_fido.html', **params)
|
||
|
|
||
|
|
||
|
@auth_views.route('/login', methods=['GET', 'POST'])
|
||
|
def login():
|
||
|
"""
|
||
|
Login page.
|
||
|
"""
|
||
|
if request.method == 'GET':
|
||
|
params = {
|
||
|
'title': 'Login',
|
||
|
'heading': 'Login',
|
||
|
'form_url': url_for('auth_views.login'),
|
||
|
}
|
||
|
return render_template('auth.html', **params)
|
||
|
|
||
|
|
||
|
username = request.form.get('username')
|
||
|
session['username'] = username
|
||
|
|
||
|
params = {
|
||
|
'title': 'Login',
|
||
|
'heading': 'Login with your authenticator',
|
||
|
'api_begin': url_for('auth_views.authenticate_begin'),
|
||
|
'api_complete': url_for('auth_views.authenticate_complete'),
|
||
|
}
|
||
|
return render_template('auth_fido.html', **params)
|