#!/usr/bin/env python3 """ A music streaming application. """ import asyncio from aiohttp import web, WSMsgType import jinja2 import aiohttp_jinja2 from aiohttp_jinja2 import render_template import uvloop import config import buckler_aiohttp uvloop.install() routes = web.RouteTableDef() with open ('test.opus', 'rb') as file: test_data = file.read() def chunker(seq, size): return (seq[pos:pos + size] for pos in range(0, len(seq), size)) async def send_files(ws): for n, chunk in enumerate(chunker(test_data, 200*1024)): print(f"sending packet #{n}") await ws.send_bytes(chunk) await asyncio.sleep(5) @routes.get('/', name='index') async def index(request): """The index page.""" return render_template('index.html', request, {}) @routes.get('/ws', name='ws') async def websocket_handler(request): """The websocket endpoint.""" ws = web.WebSocketResponse(heartbeat=30) ws_ready = ws.can_prepare(request) if not ws_ready.ok: return web.Response(text="Cannot start websocket.") await ws.prepare(request) async for msg in ws: if msg.type != WSMsgType.TEXT: break if msg.data == "ping": print('ping') await ws.send_str("pong") if msg.data == 'test': print('initiating test') asyncio.create_task(send_files(ws)) await ws.close() return ws async def init_app(): """Initializes the application.""" app = web.Application(middlewares=[buckler_aiohttp.buckler_session]) aiohttp_jinja2.setup( app, trim_blocks=True, lstrip_blocks=True, undefined=jinja2.StrictUndefined, loader=jinja2.FileSystemLoader('templates'), ) app.router.add_routes(routes) app_wrap = web.Application() app_wrap.add_subapp(config.url_prefix, app) return app_wrap if __name__ == "__main__": app = init_app() aiohttp.web.run_app(app, host='0.0.0.0', port=5500)