Buckler/auth.py

217 lines
6.0 KiB
Python

#!/usr/bin/env python3
"""
Endpoints for registering and authenticating FIDO2 security keys.
"""
import json
import secrets
import functools
from datetime import datetime
from aiohttp import web
from fido2.client import ClientData
from fido2.server import Fido2Server
from fido2.webauthn import PublicKeyCredentialRpEntity
from fido2.ctap2 import AttestationObject, AuthenticatorData, \
AttestedCredentialData
from fido2 import cbor
import config
# Route table that the handlers in this module register themselves on.
routes = web.RouteTableDef()
# Relying-party entity built from the configured server domain and the
# name 'Buckler'.
rp = PublicKeyCredentialRpEntity(config.server_domain, 'Buckler')
# FIDO2 server shared by the register/authenticate handlers below.
server = Fido2Server(rp)
def auth_required(func):
    """
    Wrapper for views which should be protected by authentication.

    The wrapped view only runs when the ``session`` and ``userid`` cookies
    match an unexpired session row belonging to an active user; otherwise
    the client is redirected to the ``login`` route. On success the matching
    row is exposed to the view as ``request['session']`` (keys ``id``,
    ``username``, ``admin`` and ``last_used``).

    When the session was last used more than ten minutes ago, its
    ``last_used``/``expires`` columns are bumped and both cookies are
    re-issued for another 30 days.
    """
    @functools.wraps(func)
    async def wrapper(request, *args, **kwargs):
        login_url = request.app.router['login'].url_for()
        sid = request.cookies.get('session')
        try:
            user_id = int(request.cookies.get('userid', '0'))
        except (ValueError, TypeError):
            user_id = None
        # A missing cookie, a non-numeric id and the id 0 are all treated
        # as "not logged in".
        if not sid or not user_id:
            raise web.HTTPFound(location=login_url)

        async with request.app['pool'].acquire() as conn:
            session = await conn.fetchrow(
                "SELECT user_info.id, user_info.username, user_info.admin, "
                "user_session.last_used "
                "FROM user_info LEFT JOIN user_session "
                "ON (user_info.id = user_session.user_id) "
                "WHERE user_info.id = $1 AND user_session.id = $2 "
                "AND user_info.active = TRUE AND user_session.expires > NOW()",
                user_id, sid)
        if not session:
            raise web.HTTPFound(location=login_url)

        request['session'] = dict(session)
        resp = await func(request, *args, **kwargs)

        last_used = session['last_used']
        idle = datetime.now(last_used.tzinfo) - last_used
        # total_seconds() measures the whole interval; the ``seconds``
        # attribute only holds the remainder after whole days, so it would
        # skip the renewal for e.g. a session idle for 1 day and 5 minutes.
        if idle.total_seconds() > 600:
            max_age = 30 * 24 * 60 * 60
            async with request.app['pool'].acquire() as conn:
                await conn.execute(
                    "UPDATE user_session SET last_used = NOW(), "
                    "expires = NOW() + INTERVAL '30 DAYS' "
                    "WHERE user_id = $1 AND id = $2",
                    user_id, sid)
            resp.set_cookie(
                'userid',
                str(user_id),
                max_age=max_age,
                secure=True,
                httponly=True)
            resp.set_cookie(
                'session',
                sid,
                max_age=max_age,
                secure=True,
                httponly=True)
        return resp
    return wrapper
@routes.post('/register/begin', name='register_begin')
@auth_required
async def register_begin(request):
    """
    Start registering a new security key for the logged-in user.

    Loads the user's stored credentials, hands them to the FIDO2 server
    together with a user entity built from the session, and answers with the
    CBOR-encoded registration data. The server-side state of the ceremony is
    stored JSON-encoded in the ``state`` cookie.
    """
    session = request['session']
    user_id, username = session['id'], session['username']

    async with request.app['pool'].acquire() as conn:
        rows = await conn.fetch(
            "SELECT * FROM user_credential WHERE user_id = $1",
            user_id)
    known_credentials = [
        AttestedCredentialData(row['credential']) for row in rows
    ]

    user_entity = {
        'id': str(user_id).encode('utf8'),
        'name': username,
        'displayName': username,
    }
    options, state = server.register_begin(
        user_entity,
        known_credentials,
        user_verification='discouraged')

    resp = web.Response(body=cbor.encode(options))
    resp.set_cookie('state', json.dumps(state))
    return resp
@routes.post('/register/complete', name='register_complete')
@auth_required
async def register_complete(request):
    """
    Finish registering a security key and store it for the logged-in user.

    The request body is a CBOR map with ``clientDataJSON``,
    ``attestationObject`` and ``security_key_nick``. The ceremony state is
    read from the JSON-encoded ``state`` cookie.

    Responds with a 400 JSON error when the nick is missing or not between
    1 and 64 characters long, or when the ``state`` cookie is missing or not
    valid JSON. On success the credential is inserted into
    ``user_credential``, the ``state`` cookie is cleared and
    ``{'ok': True}`` is returned.
    """
    user_id = request['session']['id']
    data = cbor.decode(await request.read())
    client_data = ClientData(data['clientDataJSON'])
    att_obj = AttestationObject(data['attestationObject'])

    # Validate explicitly: an ``assert`` would be stripped under ``-O``, and
    # a missing or non-sized nick must be a client error, not a crash.
    nick = data.get('security_key_nick')
    try:
        nick_ok = 1 <= len(nick) <= 64
    except TypeError:
        nick_ok = False
    if not nick_ok:
        d = {'ok': False,
             'status': 400,
             'message': "security key nick must be between 1 and 64 chars"
             }
        return web.json_response(d, status=400)

    # The cookie is absent when the begin step was skipped or it expired.
    try:
        state = json.loads(request.cookies.get('state'))
    except (TypeError, ValueError):
        d = {'ok': False,
             'status': 400,
             'message': "registration state is missing or invalid"
             }
        return web.json_response(d, status=400)

    auth_data = server.register_complete(
        state,
        client_data,
        att_obj
    )
    async with request.app['pool'].acquire() as conn:
        await conn.execute(
            "INSERT INTO user_credential (user_id, nick, credential) "
            "VALUES ($1, $2, $3)",
            user_id, nick, auth_data.credential_data)
    resp = web.json_response({'ok': True})
    resp.set_cookie('state', '', max_age=0)
    return resp
@routes.post('/authenticate/begin', name='authenticate_begin')
async def authenticate_begin(request):
    """
    Start a security-key authentication for the user in the ``userid`` cookie.

    Answers with the CBOR-encoded authentication data produced by the FIDO2
    server for the user's stored credentials, and stores the ceremony state
    JSON-encoded in the ``state`` cookie.

    Raises ``web.HTTPUnauthorized`` when the ``userid`` cookie is missing,
    not an integer, or zero.
    """
    # A missing or malformed cookie must yield 401, not an unhandled
    # TypeError/ValueError from int().
    try:
        user_id = int(request.cookies.get('userid'))
    except (TypeError, ValueError):
        user_id = None
    if not user_id:
        raise web.HTTPUnauthorized()

    async with request.app['pool'].acquire() as conn:
        credentials = await conn.fetch(
            "SELECT * FROM user_credential WHERE user_id = $1",
            user_id)
    credentials = [
        AttestedCredentialData(c['credential']) for c in credentials
    ]
    auth_data, state = server.authenticate_begin(credentials)
    resp = web.Response(body=cbor.encode(auth_data))
    resp.set_cookie('state', json.dumps(state))
    return resp
@routes.post('/authenticate/complete', name='authenticate_complete')
async def authenticate_complete(request):
    """
    Verify a security-key assertion and open a new login session.

    The request body is a CBOR map with ``credentialId``,
    ``clientDataJSON``, ``authenticatorData`` and ``signature``. The user is
    taken from the ``userid`` cookie and the ceremony state from the
    JSON-encoded ``state`` cookie.

    Raises ``web.HTTPUnauthorized`` when the ``userid`` cookie is missing,
    not an integer, or zero, and responds with a 400 JSON error when the
    ``state`` cookie is missing or not valid JSON. Any exception raised by
    the FIDO2 verification propagates, so no session is created unless the
    verification call returns.

    On success a new ``user_session`` row is inserted, the ``state`` and
    ``redirect`` cookies are cleared, the ``userid`` and ``session`` cookies
    are set for 30 days, and ``{'ok': True, 'redirect': <url>}`` is returned.
    """
    try:
        user_id = int(request.cookies.get('userid'))
    except (TypeError, ValueError):
        user_id = None
    if not user_id:
        raise web.HTTPUnauthorized()

    async with request.app['pool'].acquire() as conn:
        credentials = await conn.fetch(
            "SELECT * FROM user_credential WHERE user_id = $1",
            user_id)
    credentials = [
        AttestedCredentialData(c['credential']) for c in credentials
    ]

    data = cbor.decode(await request.read())
    credential_id = data['credentialId']
    client_data = ClientData(data['clientDataJSON'])
    auth_data = AuthenticatorData(data['authenticatorData'])
    signature = data['signature']

    # The cookie is absent when the begin step was skipped or it expired.
    try:
        state = json.loads(request.cookies.get('state'))
    except (TypeError, ValueError):
        d = {'ok': False,
             'status': 400,
             'message': "authentication state is missing or invalid"
             }
        return web.json_response(d, status=400)

    server.authenticate_complete(
        state,
        credentials,
        credential_id,
        client_data,
        auth_data,
        signature
    )

    # TODO: refactor so code isn't duplicated
    url = request.cookies.get('redirect')
    if not url:
        url = request.app.router['index'].url_for()
    resp = web.json_response({'ok': True, 'redirect': str(url)})
    resp.set_cookie('state', '', max_age=0)
    resp.set_cookie('redirect', '', max_age=0)

    max_age = 30 * 24 * 60 * 60
    resp.set_cookie(
        'userid',
        str(user_id),
        max_age=max_age,
        secure=True,
        httponly=True)

    sid = secrets.token_urlsafe(64)
    # Fall back to the transport peer address when the X-Real-IP header is
    # absent, so a missing header cannot fail an otherwise valid login.
    ip_address = request.headers.get('X-Real-IP') or request.remote
    async with request.app['pool'].acquire() as conn:
        await conn.execute(
            "INSERT INTO user_session (user_id, id, ip_address)"
            "VALUES ($1, $2, $3)",
            user_id,
            sid,
            ip_address)
    resp.set_cookie(
        'session',
        sid,
        max_age=max_age,
        secure=True,
        httponly=True)
    return resp