#!/usr/bin/env python3
"""
Individual functions for handling WebSocket events. They are called by the
QuestConsumer object in consumers.py.
"""
# TODO: quest owner only events
import re
import time
import types
import random
import bleach
from django.utils.timezone import localtime
from quest.models import Message, Quest, Post, DiceCall, DiceRoll
from quest.forms import DiceCallForm
def message(socket, data):
    """
    Gets called when the server receives a 'message' event.

    Cleans the chat text, performs a dice roll when the text starts with
    "/dice" or "/roll", stores the result as a Message and sends it back
    through the socket. When the quest has an open dice call, the roll is
    also recorded against it and the dice post is updated.

    Args:
        socket: consumer object; ``scope['user']`` and ``send(event, data)``
            are used.
        data: event payload; the 'message' and 'quest_id' keys are read.

    Returns:
        None. Nothing is stored or sent when the message is missing, not a
        string, blank, or a malformed / out-of-range dice command.
    """
    # TODO: validation
    raw_message = data.get('message')
    quest_id = data.get('quest_id')

    # A missing or non-string payload would otherwise blow up on .strip().
    if not isinstance(raw_message, str):
        return

    # cleaning
    text = raw_message.strip()
    if not text:
        return
    tags = ["b", "code", "i", "s"]
    text = bleach.clean(text, tags=tags)

    # greentext
    # NOTE(review): the wrapper literals below are empty and the join
    # separator is a bare newline in this copy of the file; presumably markup
    # was lost in transit - confirm against the upstream source.
    # NOTE(review): bleach.clean may already have escaped ">" at this point;
    # verify that the ">" / ">>" checks below can still match.
    lines = []
    for line in text.splitlines():
        if line.startswith(">") and not line.startswith(">>"):
            line = '' + line + ''
        lines.append(line)
    text = "\n".join(lines)

    # quote links
    quotes = re.findall(r">>\d+", text)
    for quote in quotes:
        # NOTE(review): msg_id is not used by the (empty) literals below;
        # presumably it belonged to the stripped link markup - confirm.
        msg_id = quote.replace(">>", "")
        msg = '' + quote + ''
        text = text.replace(quote, msg)

    # handle image

    # dice rolling
    is_dice_roll = text.startswith(("/dice", "/roll"))
    if is_dice_roll:
        # The dice spec is parsed from the raw payload, not the cleaned text.
        reg = re.search(r"(\d+)d(\d+)([+-]\d+)?", raw_message)
        if not reg:
            return
        try:
            dice_num, dice_sides, dice_mod = (
                0 if group is None else int(group) for group in reg.groups())
        except ValueError:
            return
        # Explicit range checks: assert statements are removed under -O and
        # must not be relied on to validate client input.
        in_range = (
            0 < dice_num < 100
            and 0 < dice_sides < 100
            and -1000 < dice_mod < 1000
        )
        if not in_range:
            return

        dice_results = [random.randint(1, dice_sides) for _ in range(dice_num)]
        total = sum(dice_results) + dice_mod

        # e.g. "3, 5 + 2"; also stored on the DiceRoll further down.
        results_text = ", ".join(map(str, dice_results))
        if dice_mod > 0:
            results_text += " + " + str(dice_mod)
        elif dice_mod < 0:
            results_text += " - " + str(-dice_mod)
        roll_msg = "Rolled " + results_text + " = " + str(total)

        # NOTE(review): the separator is a bare newline in this copy of the
        # file; presumably markup around the roll text was lost - confirm.
        text += "\n" + roll_msg

    user = socket.scope['user']
    m = Message(
        quest=Quest.objects.get(id=quest_id),
        message=text)
    # Only attach the user when a username is set.
    if user.username:
        m.user = user
    m.save()

    data = {}
    data['message_id'] = m.id
    data['message'] = text
    data['date'] = int(time.time())
    data['name'] = user.username
    socket.send('message', data)

    # append rolls to dicecall
    if not is_dice_roll:
        return
    try:
        dc = DiceCall.objects.get(post__quest__id=quest_id, open=True)
    except DiceCall.DoesNotExist:
        return

    # Rebuild the compact spec (e.g. "2d6+3") for comparison with the call.
    dice_roll = f"{dice_num}d{dice_sides}"
    if dice_mod:
        if dice_mod > 0:
            dice_roll += "+"
        dice_roll += str(dice_mod)

    # A strict dice call only accepts rolls of exactly the requested dice.
    if dc.strict and dc.dice_roll != dice_roll:
        return

    dr = DiceRoll(
        dicecall=dc,
        message=m,
        roll=dice_roll,
        results=results_text,
        total=total,
    )
    dr.save()

    # Close the call once the configured number of rolls has been recorded.
    if DiceRoll.objects.filter(dicecall=dc).count() == dc.rolls_taken:
        dc.open = False
        dc.save()
        socket.send('close_post', {'post_id': dc.post_id})

    data = {}
    data['post_text'] = roll_msg + " (" + dice_roll + ")"
    if dc.dice_challenge:
        if total >= dc.dice_challenge:
            data["post_text"] += " - Pass"
        else:
            data["post_text"] += " - Fail"
    data['post_type'] = 'dice'
    data['post_id'] = dc.post_id
    socket.send('update_post', data)
def text_post(socket, data):
    """
    Called when the QM creates a new text post.

    Cleans the submitted text, stores it as a 'text' Post on the quest and
    sends a 'new_post' event through the socket.

    Args:
        socket: consumer object; ``send(event, data)`` is used.
        data: event payload; the 'quest_id', 'text' and 'page_num' keys are
            read.

    Returns:
        None. Nothing is stored or sent when 'text' is missing or is not a
        string.
    """
    # TODO: security
    quest_id = data.get('quest_id')
    post_text = data.get('text')
    page_num = data.get('page_num')

    # A missing or non-string payload would otherwise blow up on .strip().
    if not isinstance(post_text, str):
        return

    # cleaning
    post_text = bleach.clean(post_text.strip())
    # NOTE(review): the replacement literal is a bare newline in this copy of
    # the file, which makes this call a no-op; presumably a line-break tag
    # was lost in transit - confirm against the upstream source.
    post_text = post_text.replace("\n", "\n")

    # handle image

    p = Post(
        quest=Quest.objects.get(id=quest_id),
        page_num=page_num,
        post_type='text',
        post_text=post_text)
    p.save()

    data = {}
    data['post_text'] = post_text
    data['post_type'] = 'text'
    data['date'] = localtime(p.timestamp).strftime('%Y-%m-%d %H:%M')
    data['post_id'] = p.id
    socket.send('new_post', data)
def dice_post(socket, data):
    """
    Called when the QM makes a new dice post.

    Validates the payload with DiceCallForm, closes every dice call that is
    still open on the quest, then creates a new 'dice' Post with an open
    DiceCall and sends a 'new_post' event through the socket.

    Args:
        socket: consumer object; ``send(event, data)`` is used.
        data: event payload; 'quest_id' and 'page_num' are read directly,
            the dice settings are read through DiceCallForm.

    Returns:
        None. Nothing is changed or sent when the form does not validate.
    """
    quest_id = data.get('quest_id')
    page_num = data.get('page_num')

    form = DiceCallForm(data)
    if not form.is_valid():
        return  # error message?
    cleaned = form.cleaned_data

    # Close any dice call that is still open on this quest before the new
    # one is created.
    posts = Post.objects.filter(
        quest__id=quest_id, post_type='dice', dicecall__open=True)
    for post in posts:
        post.dicecall.open = False
        post.dicecall.save()
    # Send the same payload open_post sends for this event, so receivers can
    # tell that only dice posts were closed.
    socket.send('close_all_posts', {'post_type': 'dice'})

    # Compact dice spec, e.g. "2d6+3" or "1d20-1".
    dice_roll = str(cleaned['diceNum']) + "d"
    dice_roll += str(cleaned['diceSides'])
    if cleaned['diceMod']:
        if cleaned['diceMod'] > 0:
            dice_roll += "+"
        dice_roll += str(cleaned['diceMod'])

    post_text = "Roll " + dice_roll
    if cleaned['diceChal']:
        post_text += " vs DC" + str(cleaned['diceChal'])

    p = Post(
        quest=Quest.objects.get(id=quest_id),
        page_num=page_num,
        post_type='dice',
        post_text=post_text
    )
    p.save()
    dc = DiceCall(
        post=p,
        dice_roll=dice_roll,
        strict=cleaned['diceStrict'],
        dice_challenge=cleaned['diceChal'],
        rolls_taken=cleaned['diceRollsTaken'],
        open=True,
    )
    dc.save()

    data = {}
    data['post_text'] = post_text
    data['post_type'] = 'dice'
    data['date'] = localtime(p.timestamp).strftime('%Y-%m-%d %H:%M')
    data['post_id'] = p.id
    socket.send('new_post', data)
def close_post(socket, data):
    """
    Called when the QM closes an open post.

    Marks the dice call or poll attached to the post as closed, depending on
    the 'post_type' in the payload, and sends a 'close_post' event carrying
    the post id. Any other post type leaves the database untouched, but the
    event is still sent.
    """
    post_id = data.get('post_id')
    post_type = data.get('post_type')
    post = Post.objects.get(id=post_id)

    # Pick the related object that carries the open flag.
    if post_type == 'dice':
        closable = post.dicecall
    elif post_type == 'poll':
        closable = post.poll
    else:
        closable = None

    if closable is not None:
        closable.open = False
        closable.save()

    socket.send('close_post', {'post_id': post_id})
def open_post(socket, data):
    """
    Called when the QM opens a closed post.

    For a dice post, every dice call currently open on the quest is closed
    first (and a 'close_all_posts' event is sent) before the requested one is
    opened. For a poll, only the poll itself is opened. An 'open_post' event
    carrying the post id is sent in every case.
    """
    # TODO: only posts on last page can be opened
    post_id = data.get('post_id')
    quest_id = data.get('quest_id')
    post_type = data.get('post_type')
    target = Post.objects.get(id=post_id)

    if post_type == 'dice':
        # Shut every dice call that is still open on this quest.
        still_open = Post.objects.filter(
            quest__id=quest_id, post_type='dice', dicecall__open=True)
        for other in still_open:
            other_call = other.dicecall
            other_call.open = False
            other_call.save()
        socket.send('close_all_posts', {'post_type': 'dice'})

        # The target's dice call is looked up only after the loop above.
        dice_call = target.dicecall
        dice_call.open = True
        dice_call.save()
    elif post_type == 'poll':
        poll = target.poll
        poll.open = True
        poll.save()

    socket.send('open_post', {'post_id': post_id})
# Registry mapping event names to handler functions.
# Only plain functions defined in this module are registered. Functions that
# were merely imported here are also FunctionType instances, and would
# otherwise be exposed as events.
events = {
    obj.__name__: obj
    for obj in list(globals().values())
    if isinstance(obj, types.FunctionType) and obj.__module__ == __name__
}