#!/usr/bin/env python3 """ Contains authentication methods for the app. """ import os import re import time import sqlite3 import functools from fido2.client import ClientData from fido2.server import Fido2Server, RelyingParty from fido2.ctap2 import AttestationObject, AuthenticatorData, \ AttestedCredentialData from fido2 import cbor from flask import Blueprint, session, render_template, request, \ redirect, url_for, jsonify from passlib.hash import argon2 import db import config from tools import make_error auth_views = Blueprint("auth_views", __name__) rp = RelyingParty('steelbea.me', 'Juice') server = Fido2Server(rp) def auth_required(func): """ Wrapper for views which should be protected by authentication. """ @functools.wraps(func) def wrapper(*args, **kwargs): username = session.get('username') token = session.get('token') if not username or not token: return redirect(url_for('auth_views.login')) data = db.get_user(username) if not data: session.pop('username') session.pop('token') return redirect(url_for('auth_views.login')) user_id = data[0] tokens = db.get_tokens(user_id) if not tokens: return redirect(url_for('auth_views.login')) for token_line in tokens: token_hash = token_line[2] date_expired = token_line[4] if int(time.time()) >= date_expired: continue if argon2.verify(token, token_hash): db.refresh_token(token_line[0]) return func(*args, **kwargs) else: session.pop('token') return redirect(url_for('auth_views.login')) return wrapper @auth_views.route('/api/register/begin', methods=['POST']) def register_begin(): if not config.registration_open: return "Registration is closed." username = session.get('username') if not username: return 'invalid' data = db.get_user(username) if not data: session.pop('username') return 'invalid' user_id = data[0] exist_cred = db.get_credentials(user_id) exist_cred = [AttestedCredentialData(cd[2]) for cd in exist_cred] registration_data, state = server.register_begin({ 'id': str(user_id).encode('utf8'), 'name': username, 'displayName': username, }, exist_cred, user_verification='discouraged') session['state'] = state return cbor.encode(registration_data) @auth_views.route('/api/register/complete', methods=['POST']) def register_complete(): if not config.registration_open: return "Registration is closed." username = session.get('username') if not username: return 'invalid' session.pop('username') data = db.get_user(username) if not data: return 'invalid' user_id = data[0] data = cbor.decode(request.get_data()) client_data = ClientData(data['clientDataJSON']) att_obj = AttestationObject(data['attestationObject']) nick = data['security_key_nick'] try: assert 64 >= len(nick) >= 1 except AssertionError: return make_error(400, "security key nick too long/short") auth_data = server.register_complete( session.pop('state'), client_data, att_obj ) db.set_credential(user_id, nick, auth_data.credential_data) return jsonify(ok=True) @auth_views.route('/api/authenticate/begin', methods=['POST']) def authenticate_begin(): data = cbor.decode(request.get_data()) username = data.get('username') user = db.get_user(username) if not user: return make_error(404, "username not found") session['username'] = username user_id = user[0] credentials = db.get_credentials(user_id) credentials = [AttestedCredentialData(cd[3]) for cd in credentials] auth_data, state = server.authenticate_begin(credentials) session['state'] = state return cbor.encode(auth_data) @auth_views.route('/api/authenticate/complete', methods=['POST']) def authenticate_complete(): username = session.get('username') user = db.get_user(username) if not user: session.pop('username') return make_error(404, "username not found") user_id = user[0] credentials = db.get_credentials(user_id) credentials = [AttestedCredentialData(cd[3]) for cd in credentials] data = cbor.decode(request.get_data()) credential_id = data['credentialId'] client_data = ClientData(data['clientDataJSON']) auth_data = AuthenticatorData(data['authenticatorData']) signature = data['signature'] server.authenticate_complete( session.pop('state'), credentials, credential_id, client_data, auth_data, signature ) token = os.urandom(32) token_hash = argon2.hash(token) db.set_token(user_id, token_hash) session['token'] = token return jsonify(ok=True) @auth_views.route('/register', methods=['GET', 'POST']) def register(): """ Registration page. """ if not config.registration_open: return "Registration is closed." if request.method == 'GET': params = { 'form_url': url_for('auth_views.register'), 'url_prefix': config.url_prefix, } return render_template('register.html', **params) username = request.form.get('username') email = request.form.get('email') try: assert 64 >= len(username) >= 3 except AssertionError: return make_error(400, "username too long/short") try: assert 100 >= len(email) except AssertionError: return "email too long" try: user_id = db.set_user(username, email) except sqlite3.IntegrityError as e: field = re.search(r'user\.(.*)', str(e)).group(1) return make_error(400, f"{field} already exists") session['username'] = username session['user_id'] = user_id params = { 'url_prefix': config.url_prefix, } return render_template('register_key.html', **params) @auth_views.route('/login') def login(): """ Login page. """ params = { 'url_prefix': config.url_prefix, } return render_template('login.html', **params)