Titivillus/quest/events.py

125 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""
Individual functions for handling WebSocket events. They are called by the
QuestConsumer object in consumers.py.
"""
import re
import time
import types
import random
import bleach
from django.utils.timezone import localtime
from quest.models import Message, Quest, Post
def message(socket, data):
    """
    Gets called when the server receives a 'message' event.

    Sanitizes the chat message, applies greentext and quote-link markup,
    optionally appends a dice roll, stores the result as a Message and
    sends it back out through the socket.

    Args:
        socket: the consumer handling the connection. Only
            ``socket.scope['user']`` and ``socket.send(event, data)`` are
            used here.
        data: dict-like event payload. The keys read are ``'message'``
            (the raw text) and ``'quest_id'`` (id of the Quest the message
            belongs to).

    Returns:
        None. The handler returns early, without saving or sending
        anything, when the message is missing, not a string, empty after
        stripping, or is a dice command that is malformed or out of range.

    Note:
        The Quest lookup is not guarded; whatever ``Quest.objects.get``
        raises for an unknown ``quest_id`` propagates to the caller.
    """
    raw_message = data.get('message')
    quest_id = data.get('quest_id')

    # A missing or non-string payload cannot be processed; drop it the same
    # way an empty message is dropped instead of failing on .strip().
    if not isinstance(raw_message, str):
        return

    # cleaning
    message = raw_message.strip()
    if not message:
        return
    tags = ["b", "code", "i", "s"]
    message = bleach.clean(message, tags=tags)

    # greentext
    # bleach.clean() has already escaped ">" into "&gt;" at this point, so
    # the escaped form is what has to be matched (a leading "&gt;&gt;" is a
    # quote, not greentext).
    lines = []
    for line in message.splitlines():
        if line.startswith("&gt;") and not line.startswith("&gt;&gt;"):
            line = '<span class="greenText">' + line + '</span>'
        lines.append(line)
    message = "<br>".join(lines)

    # quote links
    def quote_link(match):
        """Build the anchor markup for a single '>>123' style quote."""
        quote = match.group(0)
        msg_id = match.group(1)
        link = '<a class="quotelink" '
        link += 'href="javascript:scrollToMsg(\'' + msg_id + '\')" '
        link += 'onmousemove="showPreview(event, \'' + msg_id + '\')" '
        link += 'onmouseout="clearPreview()">' + quote + '</a>'
        return link

    # A single substitution pass: each quote is rewritten exactly once, so
    # repeated quotes or quotes that are a prefix of another one (>>1 and
    # >>12) cannot be re-replaced inside markup that was already generated.
    message = re.sub(r"&gt;&gt;(\d+)", quote_link, message)

    # handle image

    # dice rolling
    if message.startswith(("/dice", "/roll")):
        # The dice expression is parsed from the raw, unprocessed input.
        reg = re.search(r"(\d+)d(\d+)([+-]\d+)?", raw_message)
        if not reg:
            return
        try:
            dice_num, dice_sides, dice_mod = (
                0 if d is None else int(d) for d in reg.groups()
            )
        except ValueError:
            return
        # Explicit checks rather than assert: assert statements are removed
        # when Python runs with -O, which would disable these limits.
        if not (0 < dice_num < 100
                and 0 < dice_sides < 100
                and -1000 < dice_mod < 1000):
            return

        dice = [random.randint(1, dice_sides) for _ in range(dice_num)]
        total = sum(dice) + dice_mod
        roll_msg = f"Rolled {', '.join(map(str, dice))}"
        if dice_mod:
            if dice_mod > 0:
                roll_msg += " + " + str(dice_mod)
            else:
                # [1:] drops the minus sign, it is written out separately
                roll_msg += " - " + str(dice_mod)[1:]
        roll_msg += " = " + str(total)
        message += '<hr class="msgSrvHr" /><b>' + roll_msg + "</b>"

    user = socket.scope['user']

    m = Message(
        quest=Quest.objects.get(id=quest_id),
        message=message)
    # Only attach the user when it has a username; otherwise the message is
    # stored without one.
    if user.username:
        m.user = user
    m.save()

    data = {}
    data['message_id'] = m.id
    data['message'] = message
    data['date'] = int(time.time())
    data['name'] = user.username
    socket.send('message', data)
def text_post(socket, data):
    """
    Called when the QM creates a new text post.

    Sanitizes the submitted text, stores it as a Post of type 'text' and
    sends a 'new_post' event through the socket.

    Args:
        socket: the consumer handling the connection. Only
            ``socket.send(event, data)`` is used here.
        data: dict-like event payload. The keys read are ``'quest_id'``,
            ``'text'`` and ``'page_num'``.

    Returns:
        None. Returns early, without saving or sending anything, when
        ``'text'`` is missing or is not a string.

    Note:
        The Quest lookup is not guarded; whatever ``Quest.objects.get``
        raises for an unknown ``quest_id`` propagates to the caller.
    """
    quest_id = data.get('quest_id')
    text = data.get('text')
    page_num = data.get('page_num')

    # A missing or non-string payload would fail on .strip(); ignore it.
    if not isinstance(text, str):
        return

    # cleaning
    text = bleach.clean(text.strip())
    text = text.replace("\n", "<br>")

    # handle image

    p = Post(
        quest=Quest.objects.get(id=quest_id),
        page_num=page_num,
        post_type='text',
        post_text=text)
    p.save()

    data = {}
    data['text'] = text
    data['post_type'] = 'text'
    # The saved post's timestamp, converted with localtime() and formatted
    # to minute precision.
    data['date'] = localtime(p.timestamp).strftime('%Y-%m-%d %H:%M')
    data['post_id'] = p.id
    socket.send('new_post', data)
# Registry of event handlers, keyed by function name.
#
# Only public functions defined in this module are registered. Functions that
# were merely imported into the namespace (e.g. helpers pulled in with
# "from ... import ...") and names starting with an underscore are skipped,
# so they are not exposed as events.
# list() takes a snapshot of the namespace before anything is added to it.
events = {
    obj.__name__: obj
    for obj in list(globals().values())
    if isinstance(obj, types.FunctionType)
    and obj.__module__ == __name__
    and not obj.__name__.startswith("_")
}