524 lines
12 KiB
Python
524 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Individual functions for handling WebSocket events. They are called by the
|
|
QuestConsumer object in consumers.py.
|
|
"""
|
|
# TODO: quest owner only events
|
|
# TODO: try/except on database calls
|
|
import re
|
|
import time
|
|
import types
|
|
import random
|
|
|
|
import bleach
|
|
from django.conf import settings
|
|
from django.db import IntegrityError
|
|
from django.utils.timezone import localtime
|
|
from django.urls import reverse
|
|
|
|
from quest.models import *
|
|
from quest.tools import handle_img
|
|
from quest.forms import DiceCallForm, PollForm
|
|
from user.models import User
|
|
|
|
def message(socket, data):
    """
    Gets called when the server receives a 'message' event.

    Cleans the chat text, applies greentext / quote-link / image markup,
    resolves an optional ``/dice`` or ``/roll`` command, stores the result
    as a ``Message`` and sends it out with ``socket.send('message', ...)``.
    When a roll was made and the quest has an open dice call, the roll is
    also stored as a ``DiceRoll`` and an 'update_post' event is sent.

    socket -- consumer object; ``scope['user']``, ``quest_id`` and ``send``
              are used.
    data   -- event payload; only the 'message' key is read.

    Returns None. Missing, non-string or blank messages, and malformed or
    out-of-range dice commands, are dropped silently.
    """
    # TODO: validation
    raw = data.get('message')
    if not isinstance(raw, str):
        # slicing a missing (None) message would raise TypeError
        return

    # cleaning
    message = raw[:512].strip()
    if not message:
        return
    message = bleach.clean(message, tags=["b", "code", "i", "s"])

    # NOTE(review): bleach.clean appears to escape a bare '>' as '&gt;' --
    # confirm against the installed bleach version. Both spellings are
    # accepted here so the markup below works either way.
    gt = r"(?:>|&gt;)"
    green_re = re.compile(gt + r"(?!" + gt + r")")
    quote_re = re.compile(gt + r"{2}(\d+)")

    def quote_link(match):
        # build the anchor for one ">>123" style reference
        msg_id = match.group(1)
        link = '<a class="quotelink" '
        link += 'href="javascript:scroll_to_msg(\'' + msg_id + '\')" '
        link += 'onmousemove="show_preview(event, \'' + msg_id + '\')" '
        link += 'onmouseout="clear_preview()">' + match.group(0) + '</a>'
        return link

    # greentext + quote links
    # Quotes are linked in a single regex pass per line, *before* the
    # greentext span is wrapped around it: repeated str.replace() calls
    # would rewrite text inside anchors that were already generated, and
    # the '>' closing the span tag must never be read as part of a quote.
    lines = []
    for line in message.splitlines():
        is_green = green_re.match(line) is not None
        line = quote_re.sub(quote_link, line)
        if is_green:
            line = '<span class="greenText">' + line + '</span>'
        lines.append(line)
    message = "<br>".join(lines)

    # handle image
    message = handle_img(message)

    # dice rolling
    is_roll = message.startswith(("/dice", "/roll"))
    if is_roll:
        # the dice spec is read from the untouched client text
        reg = re.search(r"(\d+)d(\d+)([+-]\d+)?", raw)
        if not reg:
            return
        try:
            dice_num, dice_sides, dice_mod = (
                0 if part is None else int(part) for part in reg.groups())
        except ValueError:
            return
        # explicit range checks: assert statements vanish under `python -O`
        if not 1 <= dice_num <= 256:
            return
        if not 1 <= dice_sides <= 256:
            return
        if not -65536 <= dice_mod <= 65536:
            return

        dice_results = [
            random.randint(1, dice_sides) for _ in range(dice_num)]
        total = sum(dice_results) + dice_mod
        results_text = ", ".join(map(str, dice_results))
        if dice_mod > 0:
            results_text += " + " + str(dice_mod)
        elif dice_mod < 0:
            results_text += " - " + str(-dice_mod)
        roll_msg = "Rolled " + results_text + " = " + str(total)
        message += '<hr class="msgSrvHr"><b>' + roll_msg + "</b>"

    user = socket.scope['user']
    quest = Quest.objects.get(id=socket.quest_id)

    m = Message(
        quest=quest,
        message=message)
    if user.username:
        # users without a username are stored without an author
        m.user = user
    m.save()

    anonymize = (user.anonymize or quest.anonymize) and user != quest.owner

    socket.send('message', {
        'message_id': m.id,
        'message': message,
        'date': int(time.time()),
        'name': user.username,
        'anonymize': anonymize,
    })

    if not is_roll:
        return

    # append rolls to dicecall
    try:
        dc = DiceCall.objects.get(
            post__quest__id=socket.quest_id,
            open=True
        )
    except DiceCall.DoesNotExist:
        return

    dice_roll = f"{dice_num}d{dice_sides}"
    if dice_mod:
        if dice_mod > 0:
            dice_roll += "+"
        dice_roll += str(dice_mod)

    # a strict dice call only accepts exactly the requested roll
    if dc.strict and dc.dice_roll != dice_roll:
        return

    dr = DiceRoll(
        dicecall=dc,
        message=m,
        roll=dice_roll,
        results=results_text,
        total=total,
    )
    dr.save()

    # close the dice call once the requested number of rolls is reached
    if DiceRoll.objects.filter(dicecall=dc).count() == dc.rolls_taken:
        dc.open = False
        dc.save()
        socket.send('close_post', {'post_id': dc.post_id})

    post_text = roll_msg + " (" + dice_roll + ")"
    if dc.dice_challenge:
        if total >= dc.dice_challenge:
            post_text += " - Pass"
        else:
            post_text += " - Fail"
    socket.send('update_post', {
        'post_text': post_text,
        'post_type': 'dice',
        'post_id': dc.post_id,
    })
|
|
|
|
|
|
def text_post(socket, data):
    """
    Called when the QM creates a new text post.

    Sanitises the submitted text, stores it as a 'text' Post on the given
    page of the socket's quest, sends a 'new_post' event and then a chat
    message from the server user announcing the post.

    socket -- consumer object; ``quest_id`` and ``send`` are used.
    data   -- event payload; reads 'text' and 'page_num'.

    Returns None. A missing or non-string 'text' is ignored.
    """
    # TODO: security
    post_text = data.get('text')
    page_num = data.get('page_num')
    if not isinstance(post_text, str):
        # .strip() on a missing (None) text would raise AttributeError
        return

    # cleaning
    post_text = bleach.clean(post_text.strip())
    post_text = post_text.replace("\n", "<br>")

    # TODO: handle image

    quest = Quest.objects.get(id=socket.quest_id)
    p = Post(
        quest=quest,
        page=Page.objects.get(quest=quest, page_num=page_num),
        post_type='text',
        post_text=post_text)
    p.save()

    socket.send('new_post', {
        'post_text': post_text,
        'post_type': 'text',
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
        'post_id': p.id,
    })

    # announce the post in chat as the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    m = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} made a new post!",
    )
    m.save()
    socket.send('message', {
        'message_id': m.id,
        'message': m.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
|
|
|
|
|
|
def dice_post(socket, data):
    """
    Called when the QM makes a new dice post.

    The payload is validated with DiceCallForm. Dice calls that are still
    open on this quest are closed first, then a new dice Post / DiceCall
    pair is saved and announced with 'new_post' plus a server chat message.
    Invalid form data ends the handler without any change.
    """
    page_num = data.get('page_num')
    dice_form = DiceCallForm(data)
    if not dice_form.is_valid():
        return  # error message?
    fields = dice_form.cleaned_data

    # close whatever dice calls are still open before opening the new one
    still_open = Post.objects.filter(
        quest__id=socket.quest_id, post_type='dice', dicecall__open=True)
    for stale_post in still_open:
        stale_post.dicecall.open = False
        stale_post.dicecall.save()
    socket.send('close_all_posts')

    # textual dice spec, e.g. "2d6+1"; a zero/empty modifier is left out
    dice_roll = str(fields['diceNum']) + "d" + str(fields['diceSides'])
    modifier = fields['diceMod']
    if modifier:
        sign = "+" if modifier > 0 else ""
        dice_roll += sign + str(modifier)

    post_text = "Roll " + dice_roll
    challenge = fields['diceChal']
    if challenge:
        post_text += " vs DC" + str(challenge)

    quest = Quest.objects.get(id=socket.quest_id)
    page = Page.objects.get(quest=quest, page_num=page_num)
    p = Post(quest=quest, page=page, post_type='dice', post_text=post_text)
    p.save()

    dc = DiceCall(
        post=p,
        dice_roll=dice_roll,
        strict=fields['diceStrict'],
        dice_challenge=fields['diceChal'],
        rolls_taken=fields['diceRollsTaken'],
        open=True,
    )
    dc.save()

    socket.send('new_post', {
        'post_id': p.id,
        'post_type': 'dice',
        'post_text': post_text,
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
    })

    # chat announcement sent as the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    announcement = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} made a new dice call!",
    )
    announcement.save()
    socket.send('message', {
        'message_id': announcement.id,
        'message': announcement.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
|
|
|
|
|
|
def poll_post(socket, data):
    """
    Called when the QM makes a new poll post.

    The payload is validated with PollForm. A 'poll' Post and its Poll are
    saved, every remaining cleaned form value becomes a PollOption, and the
    result is announced with 'new_post' plus a server chat message.
    Invalid form data ends the handler without any change.
    """
    page_num = data.get('page_num')
    poll_form = PollForm(data)
    if not poll_form.is_valid():
        return  # error message?
    fields = poll_form.cleaned_data

    quest = Quest.objects.get(id=socket.quest_id)
    page = Page.objects.get(quest=quest, page_num=page_num)
    p = Post(quest=quest, page=page, post_type='poll', post_text="Poll")
    p.save()

    # popping the two settings leaves only the option texts in `fields`
    multi_choice = fields.pop('multi_choice')
    allow_writein = fields.pop('allow_writein')
    pl = Poll(
        post=p,
        multi_choice=multi_choice,
        allow_writein=allow_writein,
        open=True,
    )
    pl.save()

    options = []
    for option_text in fields.values():
        choice = PollOption(poll=pl, text=option_text)
        choice.save()
        options.append(choice)

    socket.send('new_post', {
        'post_type': 'poll',
        'post_id': p.id,
        'post_text': "Poll",
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
        'options': [(choice.id, choice.text) for choice in options],
    })

    # chat announcement sent as the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    announcement = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} made a new poll!",
    )
    announcement.save()
    socket.send('message', {
        'message_id': announcement.id,
        'message': announcement.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
|
|
|
|
|
|
def edit_post(socket, data):
    """
    Called when the QM saves an edited post.

    Sanitises the new text, stores it on the post and sends 'update_post'.

    socket -- consumer object; ``quest_id`` and ``send`` are used.
    data   -- event payload; reads 'post_id' and 'post_text'.

    Returns None. The edit is ignored when 'post_text' is missing or not a
    string, or when no post with that id exists in the socket's quest.
    """
    post_id = data.get('post_id')
    post_text = data.get('post_text')
    if not isinstance(post_text, str):
        # .strip() on a missing (None) text would raise AttributeError
        return

    # cleaning
    post_text = bleach.clean(post_text.strip())
    post_text = post_text.replace("\n", "<br>")

    # TODO: handle image

    try:
        # restricted to this socket's quest so that a client connected to
        # one quest cannot rewrite a post that belongs to another
        p = Post.objects.get(id=post_id, quest__id=socket.quest_id)
    except Post.DoesNotExist:
        return
    p.post_text = post_text
    p.save()

    socket.send('update_post', {
        'post_text': post_text,
        'post_type': 'text',
        'post_id': p.id,
    })
|
|
|
|
|
|
def close_post(socket, data):
    """
    Called when the QM closes an open post.

    Marks the post's dice call or poll (chosen by the 'post_type' sent by
    the client) as closed and sends 'close_post' with the post id.

    socket -- consumer object; ``quest_id`` and ``send`` are used.
    data   -- event payload; reads 'post_id' and 'post_type'.

    Returns None. Nothing happens when no post with that id exists in the
    socket's quest.
    """
    post_id = data.get('post_id')
    try:
        # restricted to this socket's quest so that a client connected to
        # one quest cannot close a post that belongs to another
        p = Post.objects.get(id=post_id, quest__id=socket.quest_id)
    except Post.DoesNotExist:
        return

    post_type = data.get('post_type')
    if post_type == 'dice':
        p.dicecall.open = False
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = False
        p.poll.save()

    socket.send('close_post', {'post_id': post_id})
|
|
|
|
|
|
def open_post(socket, data):
    """
    Called when the QM opens a closed post.

    For a dice post, every dice call still open on the quest is closed
    first (announced with 'close_all_posts') before this one is reopened.
    For a poll, the poll is simply reopened. 'open_post' is sent at the end.

    socket -- consumer object; ``quest_id`` and ``send`` are used.
    data   -- event payload; reads 'post_id' and 'post_type'.

    Returns None. Nothing happens when no post with that id exists in the
    socket's quest.
    """
    # TODO: only posts on last page can be opened
    post_id = data.get('post_id')
    try:
        # restricted to this socket's quest so that a client connected to
        # one quest cannot reopen a post that belongs to another
        p = Post.objects.get(id=post_id, quest__id=socket.quest_id)
    except Post.DoesNotExist:
        return

    post_type = data.get('post_type')
    if post_type == 'dice':
        still_open = Post.objects.filter(
            quest__id=socket.quest_id, post_type='dice', dicecall__open=True)
        for other in still_open:
            other.dicecall.open = False
            other.dicecall.save()
        socket.send('close_all_posts', {'post_type': 'dice'})

        p.dicecall.open = True
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = True
        p.poll.save()

    socket.send('open_post', {'post_id': post_id})
|
|
|
|
|
|
def vote(socket, data):
    """
    Called when a player votes in a poll.

    With 'polarity' false the caller's existing vote for the option is
    removed; otherwise a vote is cast. On a poll that is not multi-choice,
    the caller's earlier votes on that poll are deleted first and each
    removal is announced. Votes are matched by the client's IP address.

    socket -- consumer object; ``scope['client']``, ``scope['user']``,
              ``send`` and ``self_send`` are used.
    data   -- event payload; reads 'post_id', 'option_id' and 'polarity'.

    Returns None. The request is ignored when the vote to retract, the
    poll, or the option within that poll does not exist, when the poll is
    closed, or when saving the vote raises IntegrityError.
    """
    post_id = data.get('post_id')
    option_id = data.get('option_id')
    polarity = data.get('polarity')
    ip_address = socket.scope['client'][0]
    user = socket.scope['user']

    # `== False` on purpose: only an explicit false retracts a vote, a
    # missing polarity (None) still counts as casting one
    if polarity == False:  # noqa: E712
        try:
            existing = PollVote.objects.get(
                ip_address=ip_address, option__id=option_id)
        except PollVote.DoesNotExist:
            # nothing to retract; do not announce a change that never happened
            return
        if not existing.option.poll.open:
            return
        existing.delete()
    else:
        try:
            poll = Poll.objects.get(post_id=post_id)
            # the option has to belong to the poll named by post_id,
            # otherwise the single-choice rule below checks the wrong poll
            option = PollOption.objects.get(id=option_id, poll=poll)
        except (Poll.DoesNotExist, PollOption.DoesNotExist):
            return
        if not poll.open:
            return

        if poll.multi_choice == False:  # noqa: E712
            previous_votes = PollVote.objects.filter(
                ip_address=ip_address,
                option__poll=poll
            )
            for old_vote in previous_votes:
                old_vote.delete()

                removed = {
                    'option_id': old_vote.option.id,
                    'polarity': False,
                }
                socket.send('vote', removed)
                socket.self_send('set_option_box', removed)

        new_vote = PollVote(
            option=option,
            ip_address=ip_address
        )
        if user.username:
            new_vote.user = user
        try:
            new_vote.save()
        except IntegrityError:  # presumably a duplicate vote
            return

    socket.send('vote', {
        'option_id': option_id,
        'polarity': polarity,
    })
|
|
|
|
|
|
def write_in(socket, data):
    """
    Called when a player creates a new write-in.

    Stores the sanitised text, prefixed with "Write in: ", as a new
    PollOption on the poll of the given post and sends 'update_post'.

    socket -- consumer object; only ``send`` is used.
    data   -- event payload; reads 'post_id' and 'option_text'.

    Returns None. The request is ignored when the text is missing, not a
    string, blank, or longer than 200 characters including the prefix,
    when the poll does not exist, or when the poll is closed or was created
    without write-ins allowed.
    """
    post_id = data.get('post_id')
    option_text = data.get('option_text', '')
    if not isinstance(option_text, str):
        # an explicit null would raise AttributeError on .strip()
        return

    option_text = option_text.strip()
    if not option_text:
        return
    option_text = "Write in: " + bleach.clean(option_text)
    if len(option_text) > 200:
        # error message?
        return

    try:
        p = Poll.objects.get(post_id=post_id)
    except Poll.DoesNotExist:
        return
    # enforce the poll settings on the server; the event can be sent by
    # any client regardless of what its UI offers
    if not p.open or not p.allow_writein:
        return

    o = PollOption(
        poll=p,
        text=option_text
    )
    o.save()

    socket.send('update_post', {
        'post_id': post_id,
        'post_type': 'poll',
        'option_id': o.id,
        'option_text': option_text,
    })
|
|
|
|
|
|
def new_page(socket, data):
    """
    Called when the QM creates a new page.

    Regular pages are numbered 1, 2, 3, ...; appendix pages are lettered
    starting at 'a'. The new Page is saved, announced with 'new_page' and
    followed by a server chat message.

    socket -- consumer object; ``quest_id`` and ``send`` are used.
    data   -- event payload; reads 'page_title' and 'appendix'.

    Returns None.
    """
    title = data.get('page_title')
    appendix = bool(data.get('appendix'))

    quest = Quest.objects.get(id=socket.quest_id)
    if appendix:
        page = Page.objects.filter(
            quest=quest,
            appendix=True
        ).order_by('page_num').last()
        if page:
            # next letter after the last appendix page
            page_num = chr(ord(page.page_num) + 1)
        else:
            page_num = 'a'
    else:
        # page_num also holds letters, so it is stored as text and the
        # database would sort '10' before '9'. Take the maximum numerically
        # instead of trusting order_by(); a quest without regular pages
        # starts at 1.
        numbers = Page.objects.filter(
            quest=quest,
            appendix=False
        ).values_list('page_num', flat=True)
        page_num = max((int(number) for number in numbers), default=0) + 1

    p = Page(
        quest=quest,
        page_num=page_num,
        title=title,
        appendix=appendix,
    )
    p.save()

    socket.send('new_page', {
        'page_num': p.page_num,
        'title': title,
        'appendix': appendix,
        'url': reverse('quest:quest', args=[socket.quest_id, p.page_num]),
    })

    # announce the page in chat as the server user
    server = User.objects.get(id=settings.SERVER_USER_ID)
    m = Message(
        quest=quest,
        user=server,
        message=f"{quest.owner.username} started a new page!",
    )
    m.save()
    socket.send('message', {
        'message_id': m.id,
        'message': m.message,
        'date': int(time.time()),
        'name': server.username,
        'anonymize': False,
    })
|
|
|
|
|
|
# Registry of event handlers, keyed by function name.
# Only public functions defined in this module are registered. Scanning the
# whole namespace for plain functions would also pick up helpers imported
# from other modules (for example ``reverse`` or ``handle_img``) and make
# them look like events.
events = {
    obj.__name__: obj
    for obj in list(globals().values())
    if isinstance(obj, types.FunctionType)
    and obj.__module__ == __name__
    and not obj.__name__.startswith("_")
}
|