#!/usr/bin/env python3 """ Some miscellaneous tools and helper functions. Primarily for quests. """ import os import re import json import time import hashlib import magic import requests from django.conf import settings from channels.layers import get_channel_layer from asgiref.sync import async_to_sync IMG_DIR = "/var/www/html/img/" ALLOWED_MIMES = [ "image/jpeg", "image/png", "image/gif", "video/webm" ] def download_img(url): """ Downloads the requested URL, ensures the mimetype is an acceptable type, and saves it to file with the hash as filename. Returns a URL to image. """ # TODO: external server timeout = 10 # TODO: put in settings url = url.replace('..', '') # TODO: why is this here? if url.startswith(settings.IMG_SVR_URL): if '/' not in url.replace(settings.IMG_SVR_URL, ''): return url try: with requests.get(url, stream=True) as r: r.raise_for_status() data = b'' start_time = time.time() for chunk in r.iter_content(102400): if time.time() - start_time > timeout: raise ValueError('TIMEOUT_REACHED') data += chunk if len(data) > 4*1024*1024: # TODO: put in settings raise ValueError('RESPONSE_TOO_LARGE') mime = magic.from_buffer(data, mime=True) assert mime in ALLOWED_MIMES h = hashlib.sha256() h.update(data) fname = h.hexdigest() fname += "." + mime.partition("/")[2] with open(os.path.join(IMG_DIR, fname), "wb") as file: file.write(data) return settings.IMG_SVR_URL + fname except requests.exceptions.RequestException: return "INVALID_URL" except AssertionError: return "INVALID_MIME_TYPE" except ValueError as e: return str(e) except Exception as e: print(e) # TODO: log this return "UNKNOWN_ERROR" def handle_img(text, limit=5): """ Finds all image urls in the given set of text and attempts to handle them appropriately. `limit` will limit how many urls are processed. The rest will be ignored. If an error occurs during handling, the raw (unlinked) url will be inserted. """ # TODO: handle webms urls = re.findall( r"""\[img(?: title=['"](.*)['"])?\](.*)\[\/img\]""", text.replace('
', '\n') ) urls = urls[:limit] for match_pair in urls: title, external_url = match_pair internal_url = download_img(external_url) if not internal_url.startswith("http"): # download errored # TODO: error message? text = re.sub(r"\[img.*?\[\/img\]", external_url, text, 1) if not title: title = os.path.basename(external_url) img_tag = f'' text = re.sub(r"\[img.*?\[\/img\]", img_tag, text, 1) return text def send_to_websocket(event, quest_id, data={}): """ Acts like QuestConsumer.send() but callable from views. """ channel_layer = get_channel_layer() group_name = f'quest_{quest_id}' data = json.dumps({'event': event, 'data': data}) async_to_sync(channel_layer.group_send)( group_name, { 'type': 'dispatch_send', 'message': data } )