convert threading processes to asyncio

This commit is contained in:
iou1name 2020-09-23 09:04:27 -04:00
parent 99490dce06
commit 342c1a7c87
3 changed files with 47 additions and 41 deletions

View File

@ -10,4 +10,4 @@ Python packages: `gunicorn aiohttp aiohttp_jinja2`
2. Walk the dinosaur 2. Walk the dinosaur
## Usage ## Usage
`gunicorn aberrant:app --bind localhost:5250 --worker-class aiohttp.GunicornWebWorker` `gunicorn aberrant:init_app --bind localhost:5250 --worker-class aiohttp.GunicornWebWorker`

View File

@ -3,6 +3,7 @@
The primary module for serving the Aberrant application. The primary module for serving the Aberrant application.
""" """
import json import json
import asyncio
import jinja2 import jinja2
import aiohttp_jinja2 import aiohttp_jinja2
@ -14,14 +15,10 @@ import events
import rtorrent import rtorrent
import buckler_aiohttp import buckler_aiohttp
app = web.Application(middlewares=[buckler_aiohttp.buckler_session])
app.on_shutdown.append(rtorrent.stop_watch)
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))
rtorrent.init()
routes = web.RouteTableDef() routes = web.RouteTableDef()
@routes.get(config.prefix + "/", name='index') @routes.get('/', name='index')
async def index(request): async def index(request):
"""The index page.""" """The index page."""
torrents = rtorrent.get_active() torrents = rtorrent.get_active()
@ -29,7 +26,7 @@ async def index(request):
return render_template("index.html", request, locals()) return render_template("index.html", request, locals())
@routes.get(config.prefix + '/ws', name='ws') @routes.get('/ws', name='ws')
async def websocket_handler(request): async def websocket_handler(request):
"""The websocket endpoint.""" """The websocket endpoint."""
ws = web.WebSocketResponse(heartbeat=30) ws = web.WebSocketResponse(heartbeat=30)
@ -56,7 +53,39 @@ async def websocket_handler(request):
return ws return ws
app.router.add_routes(routes) async def watcher_loop():
try:
while True:
await rtorrent.update_torrents()
await asyncio.sleep(10)
except asyncio.CancelledError:
return
async def start_background_tasks(app):
rtorrent.init()
app['watcher'] = asyncio.create_task(watcher_loop())
async def cleanup_background_tasks(app):
app['watcher'].cancel()
await app['watcher']
async def init_app():
"""Initializes the application."""
app = web.Application(middlewares=[buckler_aiohttp.buckler_session])
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
app.router.add_routes(routes)
app_wrap = web.Application()
app_wrap.add_subapp(config.url_prefix, app)
return app_wrap
if __name__ == "__main__": if __name__ == "__main__":
aiohttp.web.run_app(app, host='0.0.0.0', port=5250) app = init_app()
web.run_app(app, host='0.0.0.0', port=5250)

View File

@ -4,15 +4,14 @@ This module handles the interface with rTorrent via XMLRPC.
""" """
import re import re
import time import time
import threading import asyncio
from collections import defaultdict from collections import defaultdict
import config import config
import rtorrent_xmlrpc import rtorrent_xmlrpc
WATCH_HANDLE = None
sp = [] sp = []
torrents = [[]] * len(config.rtorrent_insts) torrents = [[] for _ in range(len(config.rtorrent_insts))]
class Torrent: class Torrent:
def __init__(self, raw, rt_id): def __init__(self, raw, rt_id):
@ -55,26 +54,12 @@ class Torrent:
self.hashing = raw[10] self.hashing = raw[10]
class Watch(threading.Thread): async def update_torrents():
"""A thread class that continously queries the rTorrent instances.""" for n in range(len(torrents)):
def __init__(self): try:
super(Watch, self).__init__() torrents[n] = await asyncio.wait_for(get_all(n), 5)
self._stop_event = threading.Event() except asyncio.TimeoutError:
continue
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def run(self):
global torrents
while not self.stopped():
for n in range(len(torrents)):
if self.stopped():
break
torrents[n] = get_all(n)
self._stop_event.wait(0.5)
def size_units(rate): def size_units(rate):
@ -126,7 +111,7 @@ def all_torrents():
res += item res += item
return res return res
def get_all(n): async def get_all(n):
"""Gets all torrent information from a instance and returns it.""" """Gets all torrent information from a instance and returns it."""
res = sp[n].d.multicall2('', 'main', res = sp[n].d.multicall2('', 'main',
'd.hash=', 'd.hash=',
@ -145,18 +130,10 @@ def get_all(n):
def init(): def init():
"""Initializes the rTorrent interface.""" """Initializes the rTorrent interface."""
global WATCH_HANDLE
global sp global sp
for rt_port in config.rtorrent_insts: for rt_port in config.rtorrent_insts:
s = rtorrent_xmlrpc.SCGIServerProxy(rt_port) s = rtorrent_xmlrpc.SCGIServerProxy(rt_port)
sp.append(s) sp.append(s)
WATCH_HANDLE = Watch()
WATCH_HANDLE.start()
async def stop_watch(*args, **kwargs):
"""Stops the watch thread."""
global WATCH_HANDLE
WATCH_HANDLE.stop()
def get_active(): def get_active():
"""Returns all actively seeding or leeching torrents.""" """Returns all actively seeding or leeching torrents."""