From 342c1a7c87407bd0bfd0b8214429a14542f82b7c Mon Sep 17 00:00:00 2001 From: iou1name Date: Wed, 23 Sep 2020 09:04:27 -0400 Subject: [PATCH] convert threading processes to asyncio --- README.md | 2 +- aberrant.py | 45 +++++++++++++++++++++++++++++++++++++-------- rtorrent.py | 41 +++++++++-------------------------------- 3 files changed, 47 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 539b75c..1beba1f 100644 --- a/README.md +++ b/README.md @@ -10,4 +10,4 @@ Python packages: `gunicorn aiohttp aiohttp_jinja2` 2. Walk the dinosaur ## Usage -`gunicorn aberrant:app --bind localhost:5250 --worker-class aiohttp.GunicornWebWorker` +`gunicorn aberrant:init_app --bind localhost:5250 --worker-class aiohttp.GunicornWebWorker` diff --git a/aberrant.py b/aberrant.py index 637afd2..3d9eb21 100644 --- a/aberrant.py +++ b/aberrant.py @@ -3,6 +3,7 @@ The primary module for serving the Aberrant application. """ import json +import asyncio import jinja2 import aiohttp_jinja2 @@ -14,14 +15,10 @@ import events import rtorrent import buckler_aiohttp -app = web.Application(middlewares=[buckler_aiohttp.buckler_session]) -app.on_shutdown.append(rtorrent.stop_watch) -aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates')) -rtorrent.init() routes = web.RouteTableDef() -@routes.get(config.prefix + "/", name='index') +@routes.get('/', name='index') async def index(request): """The index page.""" torrents = rtorrent.get_active() @@ -29,7 +26,7 @@ async def index(request): return render_template("index.html", request, locals()) -@routes.get(config.prefix + '/ws', name='ws') +@routes.get('/ws', name='ws') async def websocket_handler(request): """The websocket endpoint.""" ws = web.WebSocketResponse(heartbeat=30) @@ -56,7 +53,39 @@ async def websocket_handler(request): return ws -app.router.add_routes(routes) +async def watcher_loop(): + try: + while True: + await rtorrent.update_torrents() + await asyncio.sleep(10) + except asyncio.CancelledError: + return + + +async def start_background_tasks(app): + rtorrent.init() + app['watcher'] = asyncio.create_task(watcher_loop()) + + +async def cleanup_background_tasks(app): + app['watcher'].cancel() + await app['watcher'] + + +async def init_app(): + """Initializes the application.""" + app = web.Application(middlewares=[buckler_aiohttp.buckler_session]) + aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates')) + app.on_startup.append(start_background_tasks) + app.on_cleanup.append(cleanup_background_tasks) + + app.router.add_routes(routes) + + app_wrap = web.Application() + app_wrap.add_subapp(config.url_prefix, app) + return app_wrap + if __name__ == "__main__": - aiohttp.web.run_app(app, host='0.0.0.0', port=5250) + app = init_app() + web.run_app(app, host='0.0.0.0', port=5250) diff --git a/rtorrent.py b/rtorrent.py index a2f9734..8d5239f 100644 --- a/rtorrent.py +++ b/rtorrent.py @@ -4,15 +4,14 @@ This module handles the interface with rTorrent via XMLRPC. """ import re import time -import threading +import asyncio from collections import defaultdict import config import rtorrent_xmlrpc -WATCH_HANDLE = None sp = [] -torrents = [[]] * len(config.rtorrent_insts) +torrents = [[] for _ in range(len(config.rtorrent_insts))] class Torrent: def __init__(self, raw, rt_id): @@ -55,26 +54,12 @@ class Torrent: self.hashing = raw[10] -class Watch(threading.Thread): - """A thread class that continously queries the rTorrent instances.""" - def __init__(self): - super(Watch, self).__init__() - self._stop_event = threading.Event() - - def stop(self): - self._stop_event.set() - - def stopped(self): - return self._stop_event.is_set() - - def run(self): - global torrents - while not self.stopped(): - for n in range(len(torrents)): - if self.stopped(): - break - torrents[n] = get_all(n) - self._stop_event.wait(0.5) +async def update_torrents(): + for n in range(len(torrents)): + try: + torrents[n] = await asyncio.wait_for(get_all(n), 5) + except asyncio.TimeoutError: + continue def size_units(rate): @@ -126,7 +111,7 @@ def all_torrents(): res += item return res -def get_all(n): +async def get_all(n): """Gets all torrent information from a instance and returns it.""" res = sp[n].d.multicall2('', 'main', 'd.hash=', @@ -145,18 +130,10 @@ def get_all(n): def init(): """Initializes the rTorrent interface.""" - global WATCH_HANDLE global sp for rt_port in config.rtorrent_insts: s = rtorrent_xmlrpc.SCGIServerProxy(rt_port) sp.append(s) - WATCH_HANDLE = Watch() - WATCH_HANDLE.start() - -async def stop_watch(*args, **kwargs): - """Stops the watch thread.""" - global WATCH_HANDLE - WATCH_HANDLE.stop() def get_active(): """Returns all actively seeding or leeching torrents."""