#!/usr/bin/env python3
"""
Individual functions for handling WebSocket events.

Gets called by the QuestConsumer object in consumers.py.  Every public
function defined in this module is registered in the ``events`` dict at the
bottom of the file under its own name; each handler takes ``(socket, data)``
where ``socket`` exposes ``scope``, ``quest_id``, ``send`` and ``self_send``
and ``data`` is the dict payload of the event.
"""
# TODO: quest owner only events
# TODO: try/except on database calls
import re
import time
import types
import random

import bleach
from django.db import IntegrityError
from django.utils.timezone import localtime
from django.urls import reverse

from quest.models import *
from quest.tools import handle_img
from quest.forms import DiceCallForm, PollForm

# Chat messages starting with one of these prefixes are treated as dice rolls.
_ROLL_COMMANDS = ("/dice", "/roll")
# "<num>d<sides>" with an optional signed modifier, e.g. "2d6+3".
_DICE_PATTERN = re.compile(r"(\d+)d(\d+)([+-]\d+)?")
_QUOTE_PATTERN = re.compile(r">>\d+")


def _parse_dice_roll(text):
    """
    Extract ``(dice_num, dice_sides, dice_mod)`` from the first dice
    expression found in ``text``.

    Returns None when there is no dice expression, when a number cannot be
    converted, or when a value is outside the accepted limits.  The limits
    are checked explicitly rather than with ``assert`` so that they still
    apply when Python runs with ``-O``.
    """
    match = _DICE_PATTERN.search(text)
    if not match:
        return None
    try:
        dice_num, dice_sides, dice_mod = (
            0 if group is None else int(group) for group in match.groups()
        )
    except ValueError:
        return None
    if not 1 <= dice_num <= 256:
        return None
    if not 1 <= dice_sides <= 256:
        return None
    if not -65536 <= dice_mod <= 65536:
        return None
    return dice_num, dice_sides, dice_mod


def _format_dice_roll(dice_num, dice_sides, dice_mod):
    """
    Build the dice notation string (e.g. "2d6+3", "1d20-1", "4d6") used to
    store a dice call and to compare rolls against it.
    """
    dice_roll = f"{dice_num}d{dice_sides}"
    if dice_mod:
        if dice_mod > 0:
            dice_roll += "+"
        dice_roll += str(dice_mod)
    return dice_roll


def _close_open_dicecalls(quest_id):
    """
    Mark every open dice call belonging to the given quest as closed.
    """
    open_posts = Post.objects.filter(
        quest__id=quest_id,
        post_type='dice',
        dicecall__open=True,
    )
    for post in open_posts:
        post.dicecall.open = False
        post.dicecall.save()


def _get_quest_post(socket, post_id):
    """
    Return the Post with ``post_id`` if it belongs to the socket's quest,
    otherwise None.  Scoping by quest stops an event received on one quest
    from touching a post of another quest.
    """
    try:
        return Post.objects.get(id=post_id, quest__id=socket.quest_id)
    except (Post.DoesNotExist, ValueError, TypeError):
        # Unknown id, id from another quest, or an id that is not a number.
        return None


def _get_quest_poll(socket, post_id):
    """
    Return the Poll attached to ``post_id`` if that post belongs to the
    socket's quest, otherwise None.
    """
    try:
        return Poll.objects.get(
            post_id=post_id,
            post__quest__id=socket.quest_id,
        )
    except (Poll.DoesNotExist, ValueError, TypeError):
        return None


def message(socket, data):
    """
    Gets called when the server receives a 'message' event.

    Cleans the chat text, performs a dice roll when the text starts with
    "/dice" or "/roll", stores the Message and sends a 'message' event.  A
    roll is additionally recorded against the quest's open dice call, if
    there is one, followed by an 'update_post' event (and 'close_post' once
    the dice call has received all of its rolls).

    Args:
        socket: consumer object providing ``scope``, ``quest_id``, ``send``.
        data: event payload; reads the 'message' key.
    """
    # TODO: validation
    raw_text = data.get('message')
    if not isinstance(raw_text, str):
        # Missing or malformed payload: nothing to post.
        return

    # cleaning
    text = raw_text[:512].strip()
    if not text:
        return
    tags = ["b", "code", "i", "s"]
    text = bleach.clean(text, tags=tags)

    # greentext
    # NOTE(review): the wrapper literals below are empty and the join string
    # is a bare newline in this copy of the file; presumably markup was lost
    # when the file was extracted. Confirm against the repository.
    lines = []
    for line in text.splitlines():
        if line.startswith(">") and not line.startswith(">>"):
            line = '' + line + ''
        lines.append(line)
    text = "\n".join(lines)

    # quote links
    # NOTE(review): `msg` is empty here, so quotes are simply removed and
    # `msg_id` is unused; presumably a link built from `msg_id` was lost in
    # extraction. Confirm against the repository.
    for quote in _QUOTE_PATTERN.findall(text):
        msg_id = quote.replace(">>", "")
        msg = ''
        text = text.replace(quote, msg)

    # handle image
    text = handle_img(text)

    # dice rolling
    is_roll = text.startswith(_ROLL_COMMANDS)
    if is_roll:
        # The dice expression is read from the raw, uncleaned payload.
        parsed = _parse_dice_roll(raw_text)
        if parsed is None:
            return
        dice_num, dice_sides, dice_mod = parsed
        dice_results = [
            random.randint(1, dice_sides) for _ in range(dice_num)
        ]
        total = sum(dice_results) + dice_mod
        # Individual results plus the modifier, e.g. "3, 5 + 2".
        results_text = ', '.join(map(str, dice_results))
        if dice_mod > 0:
            results_text += " + " + str(dice_mod)
        elif dice_mod < 0:
            results_text += " - " + str(dice_mod)[1:]
        roll_msg = f"Rolled {results_text} = {total}"
        # NOTE(review): separator/closing literals look stripped, see above.
        text += '\n' + roll_msg + ""

    user = socket.scope['user']
    m = Message(
        quest=Quest.objects.get(id=socket.quest_id),
        message=text,
    )
    if user.username:
        m.user = user
    m.save()

    socket.send('message', {
        'message_id': m.id,
        'message': text,
        'date': int(time.time()),
        'name': user.username,
    })

    # append rolls to dicecall
    if not is_roll:
        return
    try:
        dc = DiceCall.objects.get(
            post__quest__id=socket.quest_id,
            open=True,
        )
    except DiceCall.DoesNotExist:
        return

    dice_roll = _format_dice_roll(dice_num, dice_sides, dice_mod)
    if dc.strict and dc.dice_roll != dice_roll:
        # Strict dice calls only accept exactly the requested roll.
        return

    dr = DiceRoll(
        dicecall=dc,
        message=m,
        roll=dice_roll,
        results=results_text,
        total=total,
    )
    dr.save()

    if DiceRoll.objects.filter(dicecall=dc).count() == dc.rolls_taken:
        dc.open = False
        dc.save()
        socket.send('close_post', {'post_id': dc.post_id})

    post_text = roll_msg + " (" + dice_roll + ")"
    if dc.dice_challenge:
        if total >= dc.dice_challenge:
            post_text += " - Pass"
        else:
            post_text += " - Fail"
    socket.send('update_post', {
        'post_text': post_text,
        'post_type': 'dice',
        'post_id': dc.post_id,
    })


def text_post(socket, data):
    """
    Called when the QM creates a new text post.

    Reads 'text' and 'page_num' from ``data``, stores a text Post for the
    socket's quest and sends a 'new_post' event.  Does nothing when 'text'
    is missing or not a string.
    """
    # TODO: security
    post_text = data.get('text')
    page_num = data.get('page_num')
    if not isinstance(post_text, str):
        return

    # cleaning
    post_text = bleach.clean(post_text.strip())
    # NOTE(review): as written this replaces a newline with a newline;
    # presumably the replacement markup was lost in extraction. Confirm.
    post_text = post_text.replace("\n", "\n")

    # handle image
    # NOTE(review): no image handling is performed here.

    p = Post(
        quest=Quest.objects.get(id=socket.quest_id),
        page_num=page_num,
        post_type='text',
        post_text=post_text,
    )
    p.save()

    socket.send('new_post', {
        'post_text': post_text,
        'post_type': 'text',
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
        'post_id': p.id,
    })


def dice_post(socket, data):
    """
    Called when the QM makes a new dice post.

    Validates ``data`` with DiceCallForm, closes the quest's currently open
    dice calls, then creates the dice Post with its DiceCall and sends a
    'new_post' event.  Does nothing when the form is invalid.
    """
    page_num = data.get('page_num')

    form = DiceCallForm(data)
    if not form.is_valid():
        return  # error message?
    form = form.cleaned_data

    _close_open_dicecalls(socket.quest_id)
    socket.send('close_all_posts')

    dice_roll = _format_dice_roll(
        form['diceNum'], form['diceSides'], form['diceMod'],
    )
    post_text = "Roll " + dice_roll
    if form['diceChal']:
        post_text += " vs DC" + str(form['diceChal'])

    p = Post(
        quest=Quest.objects.get(id=socket.quest_id),
        page_num=page_num,
        post_type='dice',
        post_text=post_text,
    )
    p.save()
    dc = DiceCall(
        post=p,
        dice_roll=dice_roll,
        strict=form['diceStrict'],
        dice_challenge=form['diceChal'],
        rolls_taken=form['diceRollsTaken'],
        open=True,
    )
    dc.save()

    socket.send('new_post', {
        'post_id': p.id,
        'post_type': 'dice',
        'post_text': post_text,
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
    })


def poll_post(socket, data):
    """
    Called when the QM makes a new poll post.

    Validates ``data`` with PollForm, creates the poll Post, its Poll and
    one PollOption for every remaining cleaned form value, then sends a
    'new_post' event listing the options as ``(id, text)`` pairs.  Does
    nothing when the form is invalid.
    """
    page_num = data.get('page_num')

    form = PollForm(data)
    if not form.is_valid():
        return  # error message?
    form = form.cleaned_data

    p = Post(
        quest=Quest.objects.get(id=socket.quest_id),
        page_num=page_num,
        post_type='poll',
        post_text="Poll",
    )
    p.save()
    pl = Poll(
        post=p,
        multi_choice=form.pop('multi_choice'),
        allow_writein=form.pop('allow_writein'),
        open=True,
    )
    pl.save()

    # With the two settings popped, every value left is an option text.
    options = []
    for option_text in form.values():
        o = PollOption(poll=pl, text=option_text)
        o.save()
        options.append(o)

    socket.send('new_post', {
        'post_type': 'poll',
        'post_id': p.id,
        'post_text': "Poll",
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
        'options': [(o.id, o.text) for o in options],
    })


def edit_post(socket, data):
    """
    Called when the QM saves an edited post.

    Reads 'post_id' and 'post_text' from ``data``, stores the cleaned text
    on the post and sends an 'update_post' event.  Does nothing when the
    text is missing or the post is not part of the socket's quest.
    """
    post_id = data.get('post_id')
    post_text = data.get('post_text')
    if not isinstance(post_text, str):
        return

    # cleaning
    post_text = bleach.clean(post_text.strip())
    # NOTE(review): as written this replaces a newline with a newline;
    # presumably the replacement markup was lost in extraction. Confirm.
    post_text = post_text.replace("\n", "\n")

    # handle image
    # NOTE(review): no image handling is performed here.

    p = _get_quest_post(socket, post_id)
    if p is None:
        return
    p.post_text = post_text
    p.save()

    socket.send('update_post', {
        'post_text': post_text,
        'post_type': 'text',
        'post_id': p.id,
    })


def close_post(socket, data):
    """
    Called when the QM closes an open post.

    Closes the dice call or poll of the post named by 'post_id', depending
    on 'post_type', and sends a 'close_post' event.  Does nothing when the
    post is not part of the socket's quest.
    """
    post_id = data.get('post_id')
    post_type = data.get('post_type')

    p = _get_quest_post(socket, post_id)
    if p is None:
        return

    if post_type == 'dice':
        p.dicecall.open = False
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = False
        p.poll.save()

    socket.send('close_post', {'post_id': post_id})


def open_post(socket, data):
    """
    Called when the QM opens a closed post.

    For a dice post, every other open dice call of the quest is closed
    first (announced with 'close_all_posts') before this one is opened.
    Sends an 'open_post' event.  Does nothing when the post is not part of
    the socket's quest.
    """
    # TODO: only posts on last page can be opened
    post_id = data.get('post_id')
    post_type = data.get('post_type')

    p = _get_quest_post(socket, post_id)
    if p is None:
        return

    if post_type == 'dice':
        _close_open_dicecalls(socket.quest_id)
        socket.send('close_all_posts', {'post_type': 'dice'})
        p.dicecall.open = True
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = True
        p.poll.save()

    socket.send('open_post', {'post_id': post_id})


def vote(socket, data):
    """
    Called when a player votes in a poll.

    Reads 'post_id', 'option_id' and 'polarity' from ``data``.  A polarity
    of False removes the vote cast from the client's IP address for that
    option; anything else casts a vote.  In a poll without multiple choice
    the client's previous votes are removed first, each announced with a
    'vote' event and a 'set_option_box' event sent through ``self_send``.
    Finally sends a 'vote' event for the requested option.

    Does nothing when the vote to remove, the poll or the option cannot be
    found, or when saving the vote raises IntegrityError.
    """
    post_id = data.get('post_id')
    option_id = data.get('option_id')
    polarity = data.get('polarity')
    ip_address = socket.scope['client'][0]
    user = socket.scope['user']

    # Equality (not truthiness) is deliberate: a missing polarity (None)
    # is handled as casting a vote, exactly as before.
    if polarity == False:  # noqa: E712
        try:
            v = PollVote.objects.get(
                ip_address=ip_address,
                option__id=option_id,
            )
        except PollVote.DoesNotExist:
            # Nothing to remove, so there is nothing to announce either.
            return
        v.delete()
    else:
        p = _get_quest_poll(socket, post_id)
        if p is None:
            return
        try:
            # The option must belong to the poll whose rules are applied
            # below, otherwise the single-choice rule could be bypassed.
            option = PollOption.objects.get(id=option_id, poll=p)
        except (PollOption.DoesNotExist, ValueError, TypeError):
            return

        if not p.multi_choice:
            previous_votes = PollVote.objects.filter(
                ip_address=ip_address,
                option__poll=p,
            )
            for previous in previous_votes:
                previous.delete()
                undo = {
                    'option_id': previous.option_id,
                    'polarity': False,
                }
                socket.send('vote', undo)
                socket.self_send('set_option_box', undo)

        v = PollVote(option=option, ip_address=ip_address)
        if user.username:
            v.user = user
        try:
            v.save()
        except IntegrityError:
            # shouldn't we check this first?
            return

    socket.send('vote', {
        'option_id': option_id,
        'polarity': polarity,
    })


def write_in(socket, data):
    """
    Called when a player creates a new write-in.

    Reads 'post_id' and 'option_text' from ``data``, adds a PollOption
    prefixed with "Write in: " to the poll and sends an 'update_post'
    event.  Does nothing when the text is empty, when the prefixed text is
    longer than 200 characters, or when the poll is not part of the
    socket's quest.
    """
    # TODO: check that the poll is open and allows write-ins
    post_id = data.get('post_id')
    option_text = data.get('option_text', '')

    if not isinstance(option_text, str):
        return
    option_text = option_text.strip()
    if not option_text:
        return
    option_text = "Write in: " + bleach.clean(option_text)
    if len(option_text) > 200:
        # error message?
        return

    p = _get_quest_poll(socket, post_id)
    if p is None:
        return
    o = PollOption(poll=p, text=option_text)
    o.save()

    socket.send('update_post', {
        'post_id': post_id,
        'post_type': 'poll',
        'option_id': o.id,
        'option_text': option_text,
    })


def new_page(socket, data):
    """
    Called when the QM creates a new page.

    Creates a PageTitle numbered one past the quest's current page count
    and sends a 'new_page' event carrying the page number, title and URL.
    """
    title = data.get('page_title')
    quest = Quest.objects.get(id=socket.quest_id)

    p = PageTitle(
        quest=quest,
        page_num=PageTitle.objects.filter(quest=quest).count() + 1,
        title=title,
    )
    p.save()

    socket.send('new_page', {
        'page_num': p.page_num,
        'title': title,
        'url': reverse('quest:quest', args=[socket.quest_id, p.page_num]),
    })


# Event name -> handler.  Only public functions defined in this module are
# registered: imported callables (reverse, localtime, handle_img, anything
# pulled in by the star import) and the private _helpers above must not be
# reachable as WebSocket events.
events = {
    obj.__name__: obj
    for obj in list(globals().values())
    if isinstance(obj, types.FunctionType)
    and obj.__module__ == __name__
    and not obj.__name__.startswith('_')
}