#!/usr/bin/env python3 """ Contains authentication methods for the app. """ import os import functools from fido2.client import ClientData from fido2.server import Fido2Server, RelyingParty from fido2.ctap2 import AttestationObject, AuthenticatorData, \ AttestedCredentialData from fido2 import cbor from flask import Blueprint, session, render_template, request, abort, \ redirect, url_for, g from passlib.hash import argon2 import config import db auth_views = Blueprint("auth_views", __name__) rp = RelyingParty('steelbea.me', 'Juice') server = Fido2Server(rp) def auth_required(func): """ Wrapper for views which should be protected by authentication. """ @functools.wraps(func) def wrapper(*args, **kwargs): username = session.get('username') token = session.get('token') if not username or not token: return redirect(url_for('auth_views.login')) data = db.get_user(username) if not data: session.pop('username') return redirect(url_for('auth_views.login')) token_hash = data[2] if not token_hash: return redirect(url_for('auth_views.login')) if argon2.verify(token, token_hash): return func(*args, **kwargs) else: session.pop('token') return redirect(url_for('auth_views.login')) return wrapper @auth_views.route('/api/register/begin', methods=['POST']) def register_begin(): if not config.registration_open: return "Registration is closed." username = session.get('username') if not username: return 'invalid' data = db.get_user(username) if not data: session.pop('username') return 'invalid' user_id = data[0] exist_cred = db.get_credentials(user_id) exist_cred = [AttestedCredentialData(cd[2]) for cd in exist_cred] registration_data, state = server.register_begin({ 'id': str(user_id).encode('utf8'), 'name': username, 'displayName': username, }, exist_cred, user_verification='discouraged') session['state'] = state return cbor.encode(registration_data) @auth_views.route('/api/register/complete', methods=['POST']) def register_complete(): if not config.registration_open: return "Registration is closed." username = session.get('username') if not username: return 'invalid' data = db.get_user(username) if not data: session.pop('username') return 'invalid' user_id = data[0] data = cbor.decode(request.get_data()) client_data = ClientData(data['clientDataJSON']) att_obj = AttestationObject(data['attestationObject']) auth_data = server.register_complete( session.pop('state'), client_data, att_obj ) db.set_credential(user_id, auth_data.credential_data) return cbor.encode({'status': 'OK'}) @auth_views.route('/api/authenticate/begin', methods=['POST']) def authenticate_begin(): username = session.get('username') if not username: return 'invalid' data = db.get_user(session['username']) if not data: session.pop('username') return 'invalid' user_id = data[0] credentials = db.get_credentials(user_id) credentials = [AttestedCredentialData(cd[2]) for cd in credentials] auth_data, state = server.authenticate_begin(credentials) session['state'] = state return cbor.encode(auth_data) @auth_views.route('/api/authenticate/complete', methods=['POST']) def authenticate_complete(): username = session.get('username') if not username: return 'invalid' data = db.get_user(session['username']) if not data: session.pop('username') return 'invalid' user_id = data[0] credentials = db.get_credentials(user_id) credentials = [AttestedCredentialData(cd[2]) for cd in credentials] data = cbor.decode(request.get_data()) credential_id = data['credentialId'] client_data = ClientData(data['clientDataJSON']) auth_data = AuthenticatorData(data['authenticatorData']) signature = data['signature'] server.authenticate_complete( session.pop('state'), credentials, credential_id, client_data, auth_data, signature ) token = os.urandom(32) token_hash = argon2.hash(token) db.set_user_token(username, token_hash) session['token'] = token return cbor.encode({'status': 'OK'}) @auth_views.route('/register', methods=['GET', 'POST']) def register(): """ Registration page. """ if not config.registration_open: return "Registration is closed." if request.method == 'GET': params = { 'title': 'Register', 'heading': 'Register an account', 'form_url': url_for('auth_views.register'), } return render_template('auth.html', **params) username = request.form.get('username') if len(username) > 64: return "username too long" elif len(username) < 3: return "username too short" db.set_user(username) # TODO: handle username error session['username'] = username params = { 'title': 'Register', 'heading': 'Register your authenticator', 'api_begin': url_for('auth_views.register_begin'), 'api_complete': url_for('auth_views.register_complete'), } return render_template('auth_fido.html', **params) @auth_views.route('/login', methods=['GET', 'POST']) def login(): """ Login page. """ if request.method == 'GET': params = { 'title': 'Login', 'heading': 'Login', 'form_url': url_for('auth_views.login'), } return render_template('auth.html', **params) username = request.form.get('username') session['username'] = username params = { 'title': 'Login', 'heading': 'Login with your authenticator', 'api_begin': url_for('auth_views.authenticate_begin'), 'api_complete': url_for('auth_views.authenticate_complete'), } return render_template('auth_fido.html', **params)