#!/usr/bin/env python3 """ Endpoints for registering and authenticating FIDO2 security keys. """ import json import secrets import functools from datetime import datetime from aiohttp import web from fido2.client import ClientData from fido2.server import Fido2Server from fido2.webauthn import PublicKeyCredentialRpEntity from fido2.ctap2 import AttestationObject, AuthenticatorData, \ AttestedCredentialData from fido2 import cbor import config routes = web.RouteTableDef() rp = PublicKeyCredentialRpEntity(config.server_domain, 'Buckler') server = Fido2Server(rp) def auth_required(func): """ Wrapper for views with should be protected by authtication. """ @functools.wraps(func) async def wrapper(request, *args, **kwargs): login_url = request.app.router['login'].url_for() sid = request.cookies.get('session') try: user_id = int(request.cookies.get('userid')) except (ValueError, TypeError): user_id = None if not sid or not user_id: raise web.HTTPFound(location=login_url) async with request.app['pool'].acquire() as conn: session = await conn.fetchrow( "SELECT user_info.id, user_info.username, user_info.admin, " "user_session.last_used " "FROM user_info LEFT JOIN user_session " "ON (user_info.id = user_session.user_id) " "WHERE user_info.id = $1 AND user_session.id = $2 " "AND user_info.active = TRUE AND user_session.expires > NOW()", user_id, sid) if session: request['session'] = dict(session) resp = await func(request, *args, **kwargs) tz = session['last_used'].tzinfo delta = datetime.now(tz) - session['last_used'] if delta.seconds > 600: async with request.app['pool'].acquire() as conn: await conn.execute( "UPDATE user_session SET last_used = NOW(), " "expires = NOW() + INTERVAL '30 DAYS' " "WHERE user_id = $1 AND id = $2", user_id, sid) resp.set_cookie( 'userid', user_id, max_age=30*24*60*60, secure=True, httponly=True) #samesite='strict') resp.set_cookie( 'session', sid, max_age=30*24*60*60, secure=True, httponly=True) #samesite='strict') return resp else: raise web.HTTPFound(location=login_url) return wrapper @routes.post('/register/begin', name='register_begin') @auth_required async def register_begin(request): user_id = request['session']['id'] username = request['session']['username'] async with request.app['pool'].acquire() as conn: exist_cred = await conn.fetch( "SELECT * FROM user_credential WHERE user_id = $1", user_id) exist_cred = [AttestedCredentialData(c['credential']) for c in exist_cred] registration_data, state = server.register_begin({ 'id': str(user_id).encode('utf8'), 'name': username, 'displayName': username, }, exist_cred, user_verification='discouraged') resp = web.Response(body=cbor.encode(registration_data)) resp.set_cookie( 'state', json.dumps(state), secure=True, httponly=True) #samesite='strict') return resp @routes.post('/register/complete',name='register_complete') @auth_required async def register_complete(request): user_id = request['session']['id'] username = request['session']['username'] data = await request.read() data = cbor.decode(data) client_data = ClientData(data['clientDataJSON']) att_obj = AttestationObject(data['attestationObject']) nick = data['security_key_nick'] try: assert 64 >= len(nick) >= 1 except AssertionError: d = {'ok': False, 'status': 400, 'message': "security key nick must be between 1 and 64 chars" } return web.json_response(d, status=400) state = json.loads(request.cookies.get('state')) auth_data = server.register_complete( state, client_data, att_obj ) async with request.app['pool'].acquire() as conn: await conn.execute( "INSERT INTO user_credential (user_id, nick, credential) " "VALUES ($1, $2, $3)", user_id, nick, auth_data.credential_data) resp = web.json_response({'ok': True}) resp.set_cookie( 'state', '', max_age=0, secure=True, httponly=True) #samesite='strict') return resp @routes.post('/authenticate/begin', name='authenticate_begin') async def authenticate_begin(request): user_id = int(request.cookies.get('userid')) if not user_id: raise web.HTTPUnauthorized data = await request.read() data = cbor.decode(data) async with request.app['pool'].acquire() as conn: credentials = await conn.fetch( "SELECT * FROM user_credential WHERE user_id = $1", user_id) credentials =[AttestedCredentialData(c['credential']) for c in credentials] auth_data, state = server.authenticate_begin(credentials) resp = web.Response(body=cbor.encode(auth_data)) resp.set_cookie( 'state', json.dumps(state), secure=True, httponly=True) #samesite='strict') return resp @routes.post('/authenticate/complete', name='authenticate_complete') async def authenticate_complete(request): user_id = int(request.cookies.get('userid')) async with request.app['pool'].acquire() as conn: credentials = await conn.fetch( "SELECT * FROM user_credential WHERE user_id = $1", user_id) credentials =[AttestedCredentialData(c['credential']) for c in credentials] data = await request.read() data = cbor.decode(data) credential_id = data['credentialId'] client_data = ClientData(data['clientDataJSON']) auth_data = AuthenticatorData(data['authenticatorData']) signature = data['signature'] state = json.loads(request.cookies.get('state')) server.authenticate_complete( state, credentials, credential_id, client_data, auth_data, signature ) # TODO: refactor so code isn't duplicated url = request.cookies.get('redirect') if not url: url = request.app.router['index'].url_for() resp = web.json_response({'ok': True, 'redirect': str(url)}) resp.set_cookie( 'state', '', max_age=0, secure=True, httponly=True) #samesite='strict') resp.set_cookie( 'redirect', '', max_age=0, secure=True, httponly=True) #samesite='strict') resp.set_cookie( 'userid', user_id, max_age=30*24*60*60, secure=True, httponly=True) #samesite='strict') sid = secrets.token_urlsafe(64) ip_address = request.headers['X-Real-IP'] async with request.app['pool'].acquire() as conn: await conn.execute( "INSERT INTO user_session (user_id, id, ip_address)" "VALUES ($1, $2, $3)", user_id, sid, ip_address) resp.set_cookie( 'session', sid, max_age=30*24*60*60, secure=True, httponly=True) #samesite='strict') return resp