Titivillus/quest/tools.py
2018-10-11 12:37:29 -04:00

101 lines
2.6 KiB
Python

#!/usr/bin/env python3
"""
Some miscellaneous tools and helper functions. Primarily for quests.
"""
import os
import re
import json
import hashlib
import magic
import requests
from django.conf import settings
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# Directory on disk that downloaded files are written into.
IMG_DIR = "/usr/local/www/html/img/"

# MIME types a downloaded file must have (as detected from its content)
# in order to be saved.
ALLOWED_MIMES = [
    "image/jpeg",
    "image/png",
    "image/gif",
    "video/webm",
]
def download_img(url):
    """
    Download `url`, check that its content has an allowed MIME type, and
    save it in IMG_DIR under the SHA-256 hex digest of its content.

    A URL that already points at a file directly on the image server
    (``settings.IMG_SVR_URL`` followed by a name containing no ``/``) is
    returned unchanged without being downloaded.

    Returns the image-server URL of the stored file on success, or one of
    the error codes ``"INVALID_URL"`` (the request failed, timed out or
    returned an HTTP error status), ``"INVALID_MIME_TYPE"`` (the detected
    type is not in ALLOWED_MIMES) or ``"UNKNOWN_ERROR"`` (anything else).
    """
    # TODO: file size limits
    # https://stackoverflow.com/questions/22346158/
    # TODO: prevent overwriting
    url = url.replace('..', '')
    if url.startswith(settings.IMG_SVR_URL):
        # Strip only the leading prefix; str.replace() would also remove
        # later occurrences and could hide a '/' in the remainder.
        if '/' not in url[len(settings.IMG_SVR_URL):]:
            return url
    try:
        # Without a timeout a stalled server would block this call forever.
        # requests.exceptions.Timeout is a RequestException, so a timeout
        # is reported as "INVALID_URL" below.
        res = requests.get(url, timeout=10)
        res.raise_for_status()
        content = res.content
        mime = magic.from_buffer(content, mime=True)
        # Explicit check rather than `assert`: assertions are removed when
        # Python runs with -O, which would let any file type through.
        if mime not in ALLOWED_MIMES:
            return "INVALID_MIME_TYPE"
        # File name is "<sha256 of content>.<MIME subtype>".
        fname = hashlib.sha256(content).hexdigest()
        fname += "." + mime.partition("/")[2]
        with open(os.path.join(IMG_DIR, fname), "wb") as file:
            file.write(content)
        return settings.IMG_SVR_URL + fname
    except requests.exceptions.RequestException:
        return "INVALID_URL"
    except Exception as e:
        # Boundary handler: callers expect an error code, not an exception.
        print(e)
        return "UNKNOWN_ERROR"
def handle_img(text, limit=5):
    """
    Replace ``[img]url[/img]`` / ``[img title="..."]url[/img]`` tags in
    `text` with HTML ``<img>`` tags pointing at the locally stored copy.

    Each URL is passed to download_img(). On success the tag becomes
    ``<img src="<internal url>" title="<title>">``, where the title
    defaults to the basename of the external URL when none was given.
    If download_img() returns an error code, the tag is replaced by the
    raw (unlinked) external URL instead.

    `limit` caps how many tags are processed; the rest are left as-is.
    Returns the resulting text.
    """
    # TODO: handle webms
    # NOTE(review): `title` and the URLs are interpolated into HTML without
    # escaping -- confirm that callers sanitise `text` before or after this.
    error_codes = ("INVALID_URL", "INVALID_MIME_TYPE", "UNKNOWN_ERROR")
    tag_re = re.compile(r"\[img.*?\[\/img\]")
    # Non-greedy groups so that several tags on one line are matched
    # individually rather than merged into one match.
    urls = re.findall(
        r"""\[img(?: title=['"](.*?)['"])?\](.*?)\[\/img\]""",
        text.replace('<br', '\n')
    )
    for title, ext_url in urls[:limit]:
        int_url = download_img(ext_url)
        if int_url in error_codes:
            # Failed download: fall back to the raw URL and do NOT also
            # emit an <img> tag for it.
            replacement = ext_url
        else:
            if not title:
                title = os.path.basename(ext_url)
            replacement = f'<img src="{int_url}" title="{title}">'
        # A callable replacement is inserted literally; a plain string
        # would have backslash escapes (e.g. "\1", "\g") interpreted.
        text = tag_re.sub(lambda _match, repl=replacement: repl, text, count=1)
    return text
def send_to_websocket(event, quest_id, data=None):
    """
    Acts like QuestConsumer.send() but callable from views.

    Serialises ``{'event': event, 'data': data}`` to JSON and sends it to
    the channel-layer group ``quest_<quest_id>`` in a message of type
    ``dispatch_send``. `data` is an empty dict when omitted or None.
    """
    # None instead of a `{}` default: a mutable default is a single object
    # shared by every call.
    if data is None:
        data = {}
    channel_layer = get_channel_layer()
    group_name = f'quest_{quest_id}'
    payload = json.dumps({'event': event, 'data': data})
    # group_send is a coroutine function; wrap it so it can be called from
    # synchronous code.
    async_to_sync(channel_layer.group_send)(
        group_name,
        {
            'type': 'dispatch_send',
            'message': payload,
        },
    )