Titivillus/quest/events.py

637 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Individual functions for handling WebSocket events. They are called by the
QuestConsumer object in consumers.py.
"""
import re
import time
import types
import random
import bleach
from django.conf import settings
from django.db import IntegrityError
from django.utils.timezone import localtime
from django.urls import reverse
from quest.models import *
from quest.tools import handle_img
from quest.forms import DiceCallForm, PollForm
from user.models import User
def message(socket, data):
    """
    Gets called when the server receives a 'message' event.

    Cleans the chat text, applies markup (greentext, quote links, image
    handling), resolves an optional /dice or /roll command, stores the
    message and broadcasts it. A valid roll is additionally recorded against
    the quest's open dice call, if there is one.

    socket -- consumer object; this function reads scope['user'] and
              quest_id from it and broadcasts through send().
    data   -- event payload; 'message' holds the raw chat text.

    Returns None. Invalid or empty input is dropped silently.
    """
    raw_message = data.get('message')
    # Reject missing, empty and non-string payloads before any str handling.
    if not raw_message or not isinstance(raw_message, str):
        return

    # cleaning
    text = raw_message[:512].strip()
    text = re.sub(r'\n\n+', '\n\n', text)
    if not text:
        return
    text = bleach.clean(text, tags=["b", "code", "i", "s"])

    # greentext
    # The text has already been through bleach.clean, which escapes ">" to
    # "&gt;" (the quote-link pattern below relies on the same escaping), so
    # the escaped marker is tested first; the raw one is kept as a fallback.
    lines = []
    for line in text.splitlines():
        for marker in ("&gt;", ">"):
            if line.startswith(marker):
                # a doubled marker is a quote link, not greentext
                if not line.startswith(marker * 2):
                    line = '<span class="greenText">' + line + '</span>'
                break
        lines.append(line)
    text = "<br>".join(lines)

    # quote links
    def quote_link(match):
        """Build the anchor markup for one '>>id' reference."""
        quote = match.group(0)
        msg_id = match.group(1)
        link = '<a class="quotelink" '
        link += 'href="javascript:scroll_to_msg(\'' + msg_id + '\')" '
        link += 'onmousemove="show_preview(event, \'' + msg_id + '\')" '
        link += 'onmouseout="clear_preview()">' + quote + '</a>'
        return link

    # A single substitution pass: repeated str.replace would re-wrap
    # duplicated quotes and corrupt ids sharing a prefix (>>1 vs >>12).
    text = re.sub(r"&gt;&gt;(\d+)", quote_link, text)

    # handle image
    text = handle_img(text)

    # dice rolling
    is_roll = text.startswith(("/dice", "/roll"))
    if is_roll:
        # The dice expression is parsed from the raw payload, not the
        # marked-up text.
        reg = re.search(r"(\d+)d(\d+)([+-]\d+)?", raw_message)
        if not reg:
            return
        try:
            dice_num, dice_sides, dice_mod = (
                0 if group is None else int(group) for group in reg.groups()
            )
        except ValueError:
            return
        # Explicit bounds checks: assert statements vanish under "python -O",
        # which would let a client request an unbounded number of dice.
        if not 1 <= dice_num <= 256:
            return
        if not 1 <= dice_sides <= 256:
            return
        if not -65536 <= dice_mod <= 65536:
            return
        dice_results = [random.randint(1, dice_sides) for _ in range(dice_num)]
        total = sum(dice_results) + dice_mod
        rolled = ", ".join(map(str, dice_results))
        if dice_mod > 0:
            rolled += " + " + str(dice_mod)
        elif dice_mod < 0:
            rolled += " - " + str(-dice_mod)
        roll_msg = "Rolled " + rolled + " = " + str(total)
        text += '<hr class="msgSrvHr"><b>' + roll_msg + "</b>"

    user = socket.scope['user']
    quest = Quest.objects.get(id=socket.quest_id)
    m = Message(
        quest=quest,
        message=text)
    if user.is_authenticated:
        m.user = user
    m.save()

    if user.is_authenticated:
        # the quest owner is never anonymized
        anonymize = (user.anonymize or quest.anonymize) and user != quest.owner
    else:
        anonymize = True

    socket.send('message', {
        'message_id': m.id,
        'message': text,
        'date': int(time.time()),
        'name': user.username,
        'anonymize': anonymize,
    })

    # append rolls to dicecall
    if not is_roll:
        return
    try:
        dc = DiceCall.objects.get(
            post__quest__id=socket.quest_id,
            open=True
        )
    except DiceCall.DoesNotExist:
        return
    dice_roll = f"{dice_num}d{dice_sides}"
    if dice_mod:
        if dice_mod > 0:
            dice_roll += "+"
        dice_roll += str(dice_mod)
    # a strict dice call only accepts exactly the requested roll
    if dc.strict and dc.dice_roll != dice_roll:
        return
    dr = DiceRoll(
        dicecall=dc,
        message=m,
        roll=dice_roll,
        # everything between "Rolled " and " = " (results plus modifier)
        results=rolled,
        total=total,
    )
    dr.save()
    # close the dice call once the configured number of rolls is reached
    if DiceRoll.objects.filter(dicecall=dc).count() == dc.rolls_taken:
        dc.open = False
        dc.save()
        socket.send('close_post', {'post_id': dc.post_id})

    post_text = roll_msg + " (" + dice_roll + ")"
    if dc.dice_challenge:
        if total >= dc.dice_challenge:
            post_text += " - Pass"
        else:
            post_text += " - Fail"
    socket.send('update_post', {
        'post_text': post_text,
        'post_type': 'dice',
        'post_id': dc.post_id,
    })
def text_post(socket, data):
    """
    Handles the event sent when the QM creates a new text post.

    Only the quest owner may post. The text is cleaned, stored as a Post on
    the requested page and broadcast as a 'new_post' event, followed by a
    chat announcement authored by the server user.
    """
    quest = Quest.objects.get(id=socket.quest_id)
    user = socket.scope['user']
    if quest.owner != user:  # 401 not allowed
        return

    post_text = data.get('text')
    page_num = data.get('page_num')
    # every value gathered so far has to be truthy
    if not all((socket, data, quest, user, post_text, page_num)):
        return

    try:
        page = Page.objects.get(quest=quest, page_num=page_num)
    except Page.DoesNotExist:
        return

    # cleaning
    cleaned = bleach.clean(post_text.strip())
    cleaned = cleaned.replace("\n", "<br>")
    # handle image
    post_text = handle_img(cleaned)

    post = Post(
        quest=quest,
        page=page,
        post_type='text',
        post_text=post_text,
    )
    post.save()

    socket.send('new_post', {
        'post_text': post_text,
        'post_type': 'text',
        'date': localtime(post.timestamp).strftime('%Y-%m-%d %H:%M'),
        'post_id': post.id,
    })

    # announce the new post in chat on behalf of the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    notice = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} made a new post!",
    )
    notice.save()
    socket.send('message', {
        'message_id': notice.id,
        'message': notice.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
def dice_post(socket, data):
    """
    Handles the event sent when the QM makes a new dice post.

    Only the quest owner may post. The payload is validated with
    DiceCallForm; every dice call still open in the quest is closed, then a
    new dice Post with its DiceCall is stored and broadcast, followed by a
    chat announcement authored by the server user.
    """
    quest = Quest.objects.get(id=socket.quest_id)
    user = socket.scope['user']
    if quest.owner != user:  # 401 not allowed
        return

    try:
        page = Page.objects.get(quest=quest, page_num=data.get('page_num'))
    except Page.DoesNotExist:
        return

    dice_form = DiceCallForm(data)
    if not dice_form.is_valid():
        # TODO: error message event
        return
    cleaned = dice_form.cleaned_data

    # only one dice call may be open at a time: close the others first
    open_dice_posts = Post.objects.filter(
        quest=quest,
        post_type='dice',
        dicecall__open=True,
    )
    for open_post_ in open_dice_posts:
        open_post_.dicecall.open = False
        open_post_.dicecall.save()
    socket.send('close_all_posts', {'post_type': 'dice'})

    # e.g. "2d6", "2d6+3" or "2d6-1"
    modifier = cleaned['diceMod']
    dice_roll = str(cleaned['diceNum']) + "d" + str(cleaned['diceSides'])
    if modifier:
        sign = "+" if modifier > 0 else ""
        dice_roll += sign + str(modifier)

    post_text = "Roll " + dice_roll
    challenge = cleaned['diceChal']
    if challenge:
        post_text += " vs DC" + str(challenge)

    post = Post(
        quest=quest,
        page=page,
        post_type='dice',
        post_text=post_text,
    )
    post.save()
    dice_call = DiceCall(
        post=post,
        dice_roll=dice_roll,
        strict=cleaned['diceStrict'],
        dice_challenge=challenge,
        rolls_taken=cleaned['diceRollsTaken'],
        open=True,
    )
    dice_call.save()

    socket.send('new_post', {
        'post_id': post.id,
        'post_type': 'dice',
        'post_text': post_text,
        'date': localtime(post.timestamp).strftime('%Y-%m-%d %H:%M'),
    })

    # announce the dice call in chat on behalf of the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    notice = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} made a new dice call!",
    )
    notice.save()
    socket.send('message', {
        'message_id': notice.id,
        'message': notice.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
def poll_post(socket, data):
    """
    Called when the QM makes a new poll post.

    Only the quest owner may post. The payload is validated with PollForm;
    a Post of type 'poll', its Poll and one PollOption per remaining form
    value are stored, the post is broadcast as 'new_post', and a chat
    announcement authored by the server user follows.

    socket -- consumer object; provides scope['user'], quest_id and send().
    data   -- event payload; 'page_num' plus the PollForm fields.
    """
    quest = Quest.objects.get(id=socket.quest_id)
    user = socket.scope['user']
    if quest.owner != user:  # 401 not allowed
        return
    page_num = data.get('page_num')
    try:
        page = Page.objects.get(quest=quest, page_num=page_num)
    except Page.DoesNotExist:
        return
    form = PollForm(data)
    if not form.is_valid():
        return  # error message?
    cleaned = form.cleaned_data
    # Pop the settings so that only the option fields remain in the dict.
    multi_choice = cleaned.pop('multi_choice')
    allow_writein = cleaned.pop('allow_writein')

    p = Post(
        quest=quest,
        page=page,
        post_type='poll',
        post_text="Poll"
    )
    p.save()
    pl = Poll(
        post=p,
        multi_choice=multi_choice,
        allow_writein=allow_writein,
        open=True
    )
    pl.save()

    # every remaining cleaned value becomes one poll option
    options = []
    for option_text in cleaned.values():
        o = PollOption(
            poll=pl,
            text=option_text
        )
        o.save()
        options.append(o)

    socket.send('new_post', {
        'post_type': 'poll',
        'post_id': p.id,
        'post_text': "Poll",
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
        'options': [(o.id, o.text) for o in options],
        'allow_writein': allow_writein,
    })

    # announce the poll in chat on behalf of the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    m = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} made a new poll!",
    )
    m.save()
    socket.send('message', {
        'message_id': m.id,
        'message': m.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
def edit_post(socket, data):
    """
    Called when the QM saves an edited post.

    Only the quest owner may edit, and only posts that belong to the quest
    this socket is connected to. The new text is cleaned, saved and
    broadcast as an 'update_post' event.

    socket -- consumer object; provides scope['user'], quest_id and send().
    data   -- event payload; 'post_id' and 'post_text'.
    """
    quest = Quest.objects.get(id=socket.quest_id)
    user = socket.scope['user']
    if quest.owner != user:  # 401 not allowed
        return
    post_id = data.get('post_id')
    post_text = data.get('post_text')
    if not post_id or not post_text or not isinstance(post_text, str):
        return
    try:
        # Scope the lookup to this quest: being the owner of one quest must
        # not allow editing posts of another quest by guessing their ids.
        p = Post.objects.get(id=post_id, quest=quest)
    except (Post.DoesNotExist, TypeError, ValueError):
        # TypeError/ValueError: the id was not something numeric
        return

    # cleaning
    post_text = bleach.clean(post_text.strip())
    post_text = post_text.replace("\n", "<br>")
    # handle image
    post_text = handle_img(post_text)

    p.post_text = post_text
    p.save()

    socket.send('update_post', {
        'post_text': post_text,
        'post_type': 'text',
        'post_id': p.id,
    })
def close_post(socket, data):
    """
    Called when the QM closes an open post.

    Only the quest owner may close posts, and only posts that belong to the
    quest this socket is connected to. For 'dice' and 'poll' posts the
    related DiceCall / Poll is marked closed; a 'close_post' event is then
    broadcast.

    socket -- consumer object; provides scope['user'], quest_id and send().
    data   -- event payload; 'post_id' and 'post_type'.
    """
    quest = Quest.objects.get(id=socket.quest_id)
    user = socket.scope['user']
    if quest.owner != user:  # 401 not allowed
        return
    post_id = data.get('post_id')
    post_type = data.get('post_type')
    try:
        # Scope the lookup to this quest so a QM cannot close posts that
        # belong to somebody else's quest.
        p = Post.objects.get(id=post_id, quest=quest)
    except (Post.DoesNotExist, TypeError, ValueError):
        # TypeError/ValueError: the id was not something numeric
        return
    # The type comes from the client; when it names a dice call or poll it
    # must match the stored post, otherwise the related object is missing.
    if post_type in ('dice', 'poll') and p.post_type != post_type:
        return

    if post_type == 'dice':
        p.dicecall.open = False
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = False
        p.poll.save()

    socket.send('close_post', {'post_id': post_id})
def open_post(socket, data):
    """
    Called when the QM opens a closed post.

    Only the quest owner may open posts, and only posts that belong to the
    quest this socket is connected to. Opening a dice post first closes
    every other open dice call in the quest; opening a poll simply marks it
    open. An 'open_post' event is then broadcast.

    socket -- consumer object; provides scope['user'], quest_id and send().
    data   -- event payload; 'post_id' and 'post_type'.
    """
    quest = Quest.objects.get(id=socket.quest_id)
    user = socket.scope['user']
    if quest.owner != user:  # 401 not allowed
        return
    post_id = data.get('post_id')
    post_type = data.get('post_type')
    try:
        # Scope the lookup to this quest so a QM cannot open posts that
        # belong to somebody else's quest.
        p = Post.objects.get(id=post_id, quest=quest)
    except (Post.DoesNotExist, TypeError, ValueError):
        # TypeError/ValueError: the id was not something numeric
        return
    # The type comes from the client; when it names a dice call or poll it
    # must match the stored post, otherwise the related object is missing.
    if post_type in ('dice', 'poll') and p.post_type != post_type:
        return

    if post_type == 'dice':
        # only one dice call may be open at a time
        posts = Post.objects.filter(
            quest=quest,
            post_type='dice',
            dicecall__open=True
        )
        for post in posts:
            post.dicecall.open = False
            post.dicecall.save()
        socket.send('close_all_posts', {'post_type': 'dice'})
        p.dicecall.open = True
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = True
        p.poll.save()

    socket.send('open_post', {'post_id': post_id})
def new_page(socket, data):
    """
    Called when the QM creates a new page.

    Only the quest owner may add pages. Regular pages are numbered
    1, 2, 3, ...; appendix pages are lettered 'a', 'b', 'c', ... The new
    page is stored and broadcast as a 'new_page' event, followed by a chat
    announcement authored by the server user.

    socket -- consumer object; provides scope['user'], quest_id and send().
    data   -- event payload; 'page_title' (required) and 'appendix'
              (optional, truthy for an appendix page).
    """
    quest = Quest.objects.get(id=socket.quest_id)
    user = socket.scope['user']
    if quest.owner != user:  # 401 not allowed
        return
    title = data.get('page_title')
    appendix = bool(data.get('appendix'))
    # Only the title is mandatory. 'appendix' is legitimately False for a
    # regular page and must not take part in the emptiness check.
    if not title:
        return

    if appendix:
        last_appendix = Page.objects.filter(
            quest=quest,
            appendix=True
        ).order_by('page_num').last()
        if last_appendix:
            last_letter = last_appendix.page_num
            # ord() needs exactly one character, and the character after
            # 'z' is '{', which is not a usable page id.
            # TODO: error message event
            if len(last_letter) != 1 or last_letter >= 'z':
                return
            page_num = chr(ord(last_letter) + 1)
        else:
            page_num = 'a'
    else:
        # page_num also holds appendix letters, so it is compared as an
        # integer here: ordering it as text would place "9" after "10" and
        # hand out an already used number.
        numbers = Page.objects.filter(
            quest=quest,
            appendix=False
        ).values_list('page_num', flat=True)
        page_num = max((int(number) for number in numbers), default=0) + 1

    p = Page(
        quest=quest,
        page_num=page_num,
        title=title,
        appendix=appendix,
    )
    p.save()

    socket.send('new_page', {
        'page_num': p.page_num,
        'title': title,
        'appendix': appendix,
        'url': reverse('quest:quest', args=[socket.quest_id, p.page_num]),
    })

    # announce the page in chat on behalf of the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    m = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} started a new page!",
    )
    m.save()
    socket.send('message', {
        'message_id': m.id,
        'message': m.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
def vote(socket, data):
    """
    Called when a player votes in a poll.

    Votes are keyed by the client's IP address. A falsy 'polarity' removes
    the caller's vote for the option, a truthy one adds a vote. In a poll
    without multi_choice, the caller's earlier votes are retracted (and
    broadcast as retracted) before the new one is stored.

    socket -- consumer object; provides scope['client'], scope['user'],
              send() and self_send().
    data   -- event payload; 'post_id', 'option_id' and 'polarity'.
    """
    post_id = data.get('post_id')
    option_id = data.get('option_id')
    polarity = data.get('polarity')
    ip_address = socket.scope['client'][0]
    user = socket.scope['user']
    # polarity is only checked for presence: False is a valid value meaning
    # "remove my vote" and must not be rejected as empty.
    if not post_id or not option_id or not ip_address or polarity is None:
        return
    try:
        poll = Poll.objects.get(post_id=post_id)
    except (Poll.DoesNotExist, TypeError, ValueError):
        # TypeError/ValueError: the id was not something numeric
        return
    if not poll.open:
        return

    if not polarity:  # player removes his vote
        try:
            existing = PollVote.objects.get(
                ip_address=ip_address,
                option__id=option_id,
                option__poll=poll
            )
        except (PollVote.DoesNotExist, TypeError, ValueError):
            return
        existing.delete()
    else:  # player makes a new vote
        try:
            # The option has to belong to the poll that was checked above;
            # otherwise the id of any open poll would unlock voting on the
            # options of a closed one.
            option = PollOption.objects.get(id=option_id, poll=poll)
        except (PollOption.DoesNotExist, TypeError, ValueError):
            return
        already_voted = PollVote.objects.filter(
            option=option,
            ip_address=ip_address
        ).exists()
        if already_voted:  # if player has voted for this option already
            return
        if not poll.multi_choice:
            previous_votes = PollVote.objects.filter(
                ip_address=ip_address,
                option__poll=poll
            )
            for previous in previous_votes:
                retracted = {
                    'option_id': previous.option.id,
                    'polarity': False,
                }
                previous.delete()
                socket.send('vote', retracted)
                socket.self_send('set_option_box', retracted)
        pv = PollVote(
            option=option,
            ip_address=ip_address
        )
        if user.is_authenticated:
            pv.user = user
        pv.save()

    socket.send('vote', {
        'option_id': option_id,
        'polarity': polarity,
    })
def write_in(socket, data):
    """
    Called when a player creates a new write-in.

    The poll must be open and must allow write-ins. The option text is
    stripped, cleaned and prefixed with "Write-in: "; the result may be at
    most 200 characters long. The new option is stored and broadcast as an
    'update_post' event.

    socket -- consumer object; provides send().
    data   -- event payload; 'post_id' and 'option_text'.
    """
    post_id = data.get('post_id')
    option_text = data.get('option_text')
    if not post_id or not option_text or not isinstance(option_text, str):
        return
    try:
        p = Poll.objects.get(post_id=post_id)
    except (Poll.DoesNotExist, TypeError, ValueError):
        # TypeError/ValueError: the id was not something numeric
        return
    # Same rule as voting: a closed poll accepts no further input.
    if not p.open:
        return
    if not p.allow_writein:
        return

    option_text = option_text.strip()
    if not option_text:
        return
    option_text = "Write-in: " + bleach.clean(option_text)
    # the limit applies to the final text, prefix and escaping included
    if len(option_text) > 200:
        # TODO: error message event
        return

    o = PollOption(
        poll=p,
        text=option_text
    )
    o.save()

    socket.send('update_post', {
        'post_id': post_id,
        'post_type': 'poll',
        'option_id': o.id,
        'option_text': option_text,
    })
# Registry mapping an event name to its handler function.
# Only public functions defined in this module are registered. Functions that
# were merely imported into the namespace (helpers, star-imports) must not be
# reachable as events, and neither should private helpers whose name starts
# with an underscore.
events = {
    obj.__name__: obj
    for name, obj in sorted(globals().items())
    if isinstance(obj, types.FunctionType)
    and obj.__module__ == __name__
    and not name.startswith("_")
}