Compare commits

..

2 Commits

Author SHA1 Message Date
3d71b50384 update buckler middleware 2020-09-23 09:05:06 -04:00
342c1a7c87 convert threading processes to asyncio 2020-09-23 09:04:27 -04:00
4 changed files with 59 additions and 44 deletions

View File

@ -10,4 +10,4 @@ Python packages: `gunicorn aiohttp aiohttp_jinja2`
2. Walk the dinosaur
## Usage
`gunicorn aberrant:app --bind localhost:5250 --worker-class aiohttp.GunicornWebWorker`
`gunicorn aberrant:init_app --bind localhost:5250 --worker-class aiohttp.GunicornWebWorker`

View File

@ -3,6 +3,7 @@
The primary module for serving the Aberrant application.
"""
import json
import asyncio
import jinja2
import aiohttp_jinja2
@ -14,14 +15,10 @@ import events
import rtorrent
import buckler_aiohttp
app = web.Application(middlewares=[buckler_aiohttp.buckler_session])
app.on_shutdown.append(rtorrent.stop_watch)
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))
rtorrent.init()
routes = web.RouteTableDef()
@routes.get(config.prefix + "/", name='index')
@routes.get('/', name='index')
async def index(request):
"""The index page."""
torrents = rtorrent.get_active()
@ -29,7 +26,7 @@ async def index(request):
return render_template("index.html", request, locals())
@routes.get(config.prefix + '/ws', name='ws')
@routes.get('/ws', name='ws')
async def websocket_handler(request):
"""The websocket endpoint."""
ws = web.WebSocketResponse(heartbeat=30)
@ -56,7 +53,39 @@ async def websocket_handler(request):
return ws
app.router.add_routes(routes)
async def watcher_loop():
try:
while True:
await rtorrent.update_torrents()
await asyncio.sleep(10)
except asyncio.CancelledError:
return
async def start_background_tasks(app):
rtorrent.init()
app['watcher'] = asyncio.create_task(watcher_loop())
async def cleanup_background_tasks(app):
app['watcher'].cancel()
await app['watcher']
async def init_app():
"""Initializes the application."""
app = web.Application(middlewares=[buckler_aiohttp.buckler_session])
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
app.router.add_routes(routes)
app_wrap = web.Application()
app_wrap.add_subapp(config.url_prefix, app)
return app_wrap
if __name__ == "__main__":
aiohttp.web.run_app(app, host='0.0.0.0', port=5250)
app = init_app()
web.run_app(app, host='0.0.0.0', port=5250)

View File

@ -16,8 +16,8 @@ async def buckler_session(request, handler):
Verifies the user with the configured Buckler app and retrieves any
session data they may have. Redirects them to the login page otherwise.
"""
user_id = request.cookies.get('userid')
user_sid = request.cookies.get('session')
user_id = request.cookies.get('userid', '')
user_sid = request.cookies.get('session', '')
url = config.buckler['url'] + '/get_session'
params = {
@ -30,7 +30,14 @@ async def buckler_session(request, handler):
async with session.get(url, params=params) as resp:
data = await resp.json()
if data.get('error'):
raise web.HTTPFound(location=config.buckler['login_url'])
resp = web.HTTPFound(config.buckler['login_url'])
resp.set_cookie(
'redirect',
request.url,
secure=True,
httponly=True)
#samesite='strict')
raise resp
request['session'] = data['session_data']
request['meta'] = data['meta']
@ -51,11 +58,13 @@ async def buckler_session(request, handler):
max_age=30*24*60*60,
secure=True,
httponly=True)
#samesite='strict')
resp.set_cookie(
'session',
user_sid,
max_age=30*24*60*60,
secure=True,
httponly=True)
#samesite='strict')
return resp

View File

@ -4,15 +4,14 @@ This module handles the interface with rTorrent via XMLRPC.
"""
import re
import time
import threading
import asyncio
from collections import defaultdict
import config
import rtorrent_xmlrpc
WATCH_HANDLE = None
sp = []
torrents = [[]] * len(config.rtorrent_insts)
torrents = [[] for _ in range(len(config.rtorrent_insts))]
class Torrent:
def __init__(self, raw, rt_id):
@ -55,26 +54,12 @@ class Torrent:
self.hashing = raw[10]
class Watch(threading.Thread):
"""A thread class that continously queries the rTorrent instances."""
def __init__(self):
super(Watch, self).__init__()
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def run(self):
global torrents
while not self.stopped():
for n in range(len(torrents)):
if self.stopped():
break
torrents[n] = get_all(n)
self._stop_event.wait(0.5)
async def update_torrents():
for n in range(len(torrents)):
try:
torrents[n] = await asyncio.wait_for(get_all(n), 5)
except asyncio.TimeoutError:
continue
def size_units(rate):
@ -126,7 +111,7 @@ def all_torrents():
res += item
return res
def get_all(n):
async def get_all(n):
"""Gets all torrent information from a instance and returns it."""
res = sp[n].d.multicall2('', 'main',
'd.hash=',
@ -145,18 +130,10 @@ def get_all(n):
def init():
"""Initializes the rTorrent interface."""
global WATCH_HANDLE
global sp
for rt_port in config.rtorrent_insts:
s = rtorrent_xmlrpc.SCGIServerProxy(rt_port)
sp.append(s)
WATCH_HANDLE = Watch()
WATCH_HANDLE.start()
async def stop_watch(*args, **kwargs):
"""Stops the watch thread."""
global WATCH_HANDLE
WATCH_HANDLE.stop()
def get_active():
"""Returns all actively seeding or leeching torrents."""