#!/usr/bin/env python3
"""
Some miscellaneous tools and helper functions. Primarily for quests.
"""
import os
import re
import json
import time
import hashlib
import magic
import requests
from django.conf import settings
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
IMG_DIR = "/var/www/html/img/"
ALLOWED_MIMES = [
"image/jpeg",
"image/png",
"image/gif",
"video/webm"
]
def download_img(url):
"""
Downloads the requested URL, ensures the mimetype is an acceptable
type, and saves it to file with the hash as filename. Returns a
URL to image.
"""
# TODO: external server
timeout = 10 # TODO: put in settings
url = url.replace('..', '') # TODO: why is this here?
if url.startswith(settings.IMG_SVR_URL):
if '/' not in url.replace(settings.IMG_SVR_URL, ''):
return url
try:
with requests.get(url, stream=True) as r:
r.raise_for_status()
data = b''
start_time = time.time()
for chunk in r.iter_content(102400):
if time.time() - start_time > timeout:
raise ValueError('TIMEOUT_REACHED')
data += chunk
if len(data) > 4*1024*1024: # TODO: put in settings
raise ValueError('RESPONSE_TOO_LARGE')
mime = magic.from_buffer(data, mime=True)
assert mime in ALLOWED_MIMES
h = hashlib.sha256()
h.update(data)
fname = h.hexdigest()
fname += "." + mime.partition("/")[2]
with open(os.path.join(IMG_DIR, fname), "wb") as file:
file.write(data)
return settings.IMG_SVR_URL + fname
except requests.exceptions.RequestException:
return "INVALID_URL"
except AssertionError:
return "INVALID_MIME_TYPE"
except ValueError as e:
return str(e)
except Exception as e:
print(e) # TODO: log this
return "UNKNOWN_ERROR"
def handle_img(text, limit=5):
"""
Finds all image urls in the given set of text and attempts to handle
them appropriately. `limit` will limit how many urls are processed.
The rest will be ignored. If an error occurs during handling, the raw
(unlinked) url will be inserted.
"""
# TODO: handle webms
urls = re.findall(
r"""\[img(?: title=['"](.*)['"])?\](.*)\[\/img\]""",
text.replace('
', '\n')
)
urls = urls[:limit]
for match_pair in urls:
title, external_url = match_pair
internal_url = download_img(external_url)
if not internal_url.startswith("http"): # download errored
# TODO: error message?
text = re.sub(r"\[img.*?\[\/img\]", external_url, text, 1)
if not title:
title = os.path.basename(external_url)
img_tag = f''
text = re.sub(r"\[img.*?\[\/img\]", img_tag, text, 1)
return text
def send_to_websocket(event, quest_id, data={}):
"""
Acts like QuestConsumer.send() but callable from views.
"""
channel_layer = get_channel_layer()
group_name = f'quest_{quest_id}'
data = json.dumps({'event': event, 'data': data})
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': 'dispatch_send',
'message': data
}
)