Titivillus/quest/tools.py

#!/usr/bin/env python3
"""
Some miscellaneous tools and helper functions. Primarily for quests.
"""
import os
import re
import json
import time
import hashlib
import magic
import requests
from django.conf import settings
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync


IMG_DIR = "/var/www/html/img/"
ALLOWED_MIMES = [
    "image/jpeg",
    "image/png",
    "image/gif",
    "video/webm",
]


def download_img(url):
    """
    Downloads the requested URL, ensures the mimetype is an acceptable
    type, and saves it to a file named after its sha256 hash. Returns a
    URL to the rehosted image, or an error code string on failure.
    """
    # TODO: external server
    timeout = 10  # TODO: put in settings
    url = url.replace('..', '')  # TODO: why is this here?

    # Urls already hosted on the image server are returned unchanged.
    if url.startswith(settings.IMG_SVR_URL):
        if '/' not in url.replace(settings.IMG_SVR_URL, ''):
            return url

    try:
        # Stream the download in 100 KiB chunks, enforcing a wall-clock
        # timeout and a maximum response size as we go.
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            data = b''
            start_time = time.time()
            for chunk in r.iter_content(102400):
                if time.time() - start_time > timeout:
                    raise ValueError('TIMEOUT_REACHED')
                data += chunk
                if len(data) > 4*1024*1024:  # TODO: put in settings
                    raise ValueError('RESPONSE_TOO_LARGE')

        # Sniff the real mimetype from the bytes rather than trusting the
        # url or the Content-Type header.
        mime = magic.from_buffer(data, mime=True)
        assert mime in ALLOWED_MIMES

        # Name the file after the sha256 of its contents, with an extension
        # derived from the mimetype.
        h = hashlib.sha256()
        h.update(data)
        fname = h.hexdigest()
        fname += "." + mime.partition("/")[2]
        with open(os.path.join(IMG_DIR, fname), "wb") as file:
            file.write(data)

        return settings.IMG_SVR_URL + fname
    except requests.exceptions.RequestException:
        return "INVALID_URL"
    except AssertionError:
        return "INVALID_MIME_TYPE"
    except ValueError as e:
        return str(e)
    except Exception as e:
        print(e)  # TODO: log this
        return "UNKNOWN_ERROR"


def handle_img(text, limit=5):
    """
    Finds all image urls in the given text and attempts to handle them
    appropriately. `limit` caps how many urls are processed; the rest are
    ignored. If an error occurs during handling, the raw (unlinked) url
    is inserted instead.
    """
    # TODO: handle webms
    urls = re.findall(
        r"""\[img(?: title=['"](.*)['"])?\](.*)\[\/img\]""",
        text.replace('<br>', '\n')
    )
    urls = urls[:limit]

    for match_pair in urls:
        title, external_url = match_pair
        internal_url = download_img(external_url)
        if not internal_url.startswith("http"):  # download errored
            # TODO: error message?
            # Insert the bare external url and move on to the next match.
            text = re.sub(r"\[img.*?\[\/img\]", external_url, text, 1)
            continue

        if not title:
            title = os.path.basename(external_url)
        img_tag = f'<img src="{internal_url}" title="{title}">'
        text = re.sub(r"\[img.*?\[\/img\]", img_tag, text, 1)

    return text
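
# Rough usage sketch for handle_img (illustrative; the post text and rehosted
# url below are made up):
#
#   raw = 'Look: [img title="a cat"]https://example.com/cat.png[/img]'
#   html = handle_img(raw)
#   # -> 'Look: <img src="https://img.example/<sha256>.png" title="a cat">'
#   # (or the bare external url in place of the tag if the download errored)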


def send_to_websocket(event, quest_id, data={}):
    """
    Acts like QuestConsumer.send() but callable from views.
    """
    channel_layer = get_channel_layer()
    group_name = f'quest_{quest_id}'
    data = json.dumps({'event': event, 'data': data})
    async_to_sync(channel_layer.group_send)(
        group_name,
        {
            'type': 'dispatch_send',
            'message': data
        }
    )
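
# Rough usage sketch for send_to_websocket, e.g. from a Django view
# (illustrative; the event name and payload are made up):
#
#   send_to_websocket('new_post', quest_id=42, data={'post': 'Hello, quest!'})
#
# This pushes a JSON-encoded message to the channel layer group "quest_42",
# where the consumer's dispatch_send handler is expected to forward it to
# connected clients.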