#!/usr/bin/env python3
"""
Individual functions for handling WebSocket events. Gets called by the
QuestConsumer object in consumers.py.
"""
# TODO: quest owner only events

import re
import time
import types
import random

import bleach
from django.utils.timezone import localtime

from quest.models import Message, Quest, Post, DiceCall, DiceRoll
from quest.forms import DiceCallForm


# Chat messages starting with one of these prefixes are treated as dice rolls.
_ROLL_COMMANDS = ("/dice", "/roll")

# Matches dice notation such as "2d6", "1d20+3" or "4d8-2".
_ROLL_PATTERN = re.compile(r"(\d+)d(\d+)([+-]\d+)?")

# NOTE(review): the source shows a bare newline as the line separator. If the
# client renders these strings as HTML this was presumably meant to be a
# line-break tag -- confirm against the templates before changing it.
_LINE_BREAK = "\n"


def _parse_roll(text):
    """
    Extract ``(dice_num, dice_sides, dice_mod)`` from the first dice notation
    found in ``text``.

    Returns ``None`` when there is no notation or when a value is outside the
    accepted range (1-99 dice, 1-99 sides, modifier strictly between -1000
    and 1000). A missing modifier is reported as 0.
    """
    match = _ROLL_PATTERN.search(text)
    if not match:
        return None
    try:
        dice_num, dice_sides, dice_mod = (
            0 if group is None else int(group) for group in match.groups()
        )
    except ValueError:
        # int() may refuse extremely long digit strings on recent Pythons.
        return None
    # Explicit checks rather than ``assert``: assertions are removed when
    # Python runs with -O, which would let e.g. "1d0" reach random.randint.
    if not 0 < dice_num < 100:
        return None
    if not 0 < dice_sides < 100:
        return None
    if not -1000 < dice_mod < 1000:
        return None
    return dice_num, dice_sides, dice_mod


def _dice_notation(dice_num, dice_sides, dice_mod):
    """
    Build the canonical notation string, e.g. ``2d6``, ``1d20+3``, ``4d8-2``.
    """
    notation = f"{dice_num}d{dice_sides}"
    if dice_mod:
        if dice_mod > 0:
            notation += "+"
        notation += str(dice_mod)
    return notation


def _close_open_dice_calls(quest_id):
    """
    Mark every currently open dice call of the given quest as closed.
    """
    open_posts = Post.objects.filter(
        quest__id=quest_id, post_type='dice', dicecall__open=True)
    for post in open_posts:
        post.dicecall.open = False
        post.dicecall.save()


def message(socket, data):
    """
    Gets called when the server receives a 'message' event.

    Cleans the chat text, optionally performs a dice roll when the text
    starts with ``/dice`` or ``/roll``, stores the message, sends it back
    through ``socket`` and, if the quest has an open dice call, records the
    roll against it.

    Returns silently (nothing stored or sent) when the message is missing,
    not a string, blank, or is a roll command with invalid dice notation.
    """
    # TODO: validation
    raw_message = data.get('message')
    quest_id = data.get('quest_id')
    if not isinstance(raw_message, str):
        return

    # cleaning
    text = raw_message.strip()
    if not text:
        return
    text = bleach.clean(text, tags=["b", "code", "i", "s"])

    # greentext
    # NOTE(review): bleach.clean() is expected to escape ">" in text, so this
    # check (and the quote regex below) may never match as written -- confirm.
    # NOTE(review): the wrapper strings are empty in the source, so this is
    # currently a no-op; the surrounding markup looks to be missing -- confirm.
    lines = []
    for line in text.splitlines():
        if line.startswith(">") and not line.startswith(">>"):
            line = '' + line + ''
        lines.append(line)
    text = _LINE_BREAK.join(lines)

    # quote links
    # NOTE(review): the replacement is an empty string in the source, so
    # quotes are simply removed and msg_id is unused; presumably a link built
    # from msg_id was intended -- confirm.
    for quote in re.findall(r">>\d+", text):
        msg_id = quote.replace(">>", "")
        msg = ''
        text = text.replace(quote, msg)

    # handle image

    # dice rolling
    is_roll = text.startswith(_ROLL_COMMANDS)
    if is_roll:
        # The notation is read from the raw payload, not the cleaned text.
        roll = _parse_roll(raw_message)
        if roll is None:
            return
        dice_num, dice_sides, dice_mod = roll
        dice_results = [
            random.randint(1, dice_sides) for _ in range(dice_num)
        ]
        total = sum(dice_results) + dice_mod
        # "3, 5 + 2": the individual results plus the modifier, if any.
        rolled = ', '.join(map(str, dice_results))
        if dice_mod > 0:
            rolled += " + " + str(dice_mod)
        elif dice_mod < 0:
            rolled += " - " + str(-dice_mod)
        roll_msg = f"Rolled {rolled} = {total}"
        # NOTE(review): separator/suffix literals reproduced as shown in the
        # source; any markup around the roll text appears to be missing.
        text += '\n' + roll_msg + ""

    user = socket.scope['user']
    m = Message(quest=Quest.objects.get(id=quest_id), message=text)
    if user.username:
        # Only attach the user when it has a username.
        m.user = user
    m.save()

    socket.send('message', {
        'message_id': m.id,
        'message': text,
        'date': int(time.time()),
        'name': user.username,
    })

    if not is_roll:
        return

    # append rolls to dicecall
    try:
        dc = DiceCall.objects.get(post__quest__id=quest_id, open=True)
    except DiceCall.DoesNotExist:
        return
    dice_roll = _dice_notation(dice_num, dice_sides, dice_mod)
    if dc.strict and dc.dice_roll != dice_roll:
        # Strict dice calls only accept exactly the requested notation.
        return
    DiceRoll(
        dicecall=dc,
        message=m,
        roll=dice_roll,
        results=rolled,
        total=total,
    ).save()

    # Close the dice call once the requested number of rolls is reached.
    if DiceRoll.objects.filter(dicecall=dc).count() == dc.rolls_taken:
        dc.open = False
        dc.save()
        socket.send('close_post', {'post_id': dc.post_id})

    post_text = roll_msg + " (" + dice_roll + ")"
    if dc.dice_challenge:
        post_text += " - Pass" if total >= dc.dice_challenge else " - Fail"
    socket.send('update_post', {
        'post_text': post_text,
        'post_type': 'dice',
        'post_id': dc.post_id,
    })


def text_post(socket, data):
    """
    Called when the QM creates a new text post.

    Cleans the text, stores it as a ``Post`` of type 'text' and sends a
    'new_post' event through ``socket``. Returns silently when the text is
    missing or not a string.
    """
    # TODO: security
    quest_id = data.get('quest_id')
    post_text = data.get('text')
    page_num = data.get('page_num')
    if not isinstance(post_text, str):
        return

    # cleaning
    post_text = bleach.clean(post_text.strip())
    # The original called ``text.replace`` on an undefined name here.
    post_text = post_text.replace("\n", _LINE_BREAK)

    # handle image

    p = Post(
        quest=Quest.objects.get(id=quest_id),
        page_num=page_num,
        post_type='text',
        post_text=post_text)
    p.save()

    socket.send('new_post', {
        'post_text': post_text,
        'post_type': 'text',
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
        'post_id': p.id,
    })


def dice_post(socket, data):
    """
    Called when the QM makes a new dice post.

    Validates ``data`` with ``DiceCallForm``, closes any dice calls that are
    still open for the quest, then creates the new ``Post`` and ``DiceCall``
    and sends a 'new_post' event. Returns silently when the form is invalid.
    """
    quest_id = data.get('quest_id')
    page_num = data.get('page_num')

    form = DiceCallForm(data)
    if not form.is_valid():
        return  # error message?
    cleaned = form.cleaned_data

    _close_open_dice_calls(quest_id)
    # NOTE(review): open_post sends this event with {'post_type': 'dice'};
    # here it is sent without a payload -- confirm which form is expected.
    socket.send('close_all_posts')

    dice_roll = _dice_notation(
        cleaned['diceNum'], cleaned['diceSides'], cleaned['diceMod'])

    post_text = "Roll " + dice_roll
    if cleaned['diceChal']:
        post_text += " vs DC" + str(cleaned['diceChal'])

    p = Post(
        quest=Quest.objects.get(id=quest_id),
        page_num=page_num,
        post_type='dice',
        post_text=post_text)
    p.save()
    DiceCall(
        post=p,
        dice_roll=dice_roll,
        strict=cleaned['diceStrict'],
        dice_challenge=cleaned['diceChal'],
        rolls_taken=cleaned['diceRollsTaken'],
        open=True,
    ).save()

    socket.send('new_post', {
        'post_text': post_text,
        'post_type': 'dice',
        'date': localtime(p.timestamp).strftime('%Y-%m-%d %H:%M'),
        'post_id': p.id,
    })


def close_post(socket, data):
    """
    Called when the QM closes an open post.

    Closes the dice call or poll attached to the post, depending on
    ``data['post_type']``, then sends a 'close_post' event.
    """
    post_id = data.get('post_id')
    post_type = data.get('post_type')

    p = Post.objects.get(id=post_id)
    if post_type == 'dice':
        p.dicecall.open = False
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = False
        p.poll.save()

    socket.send('close_post', {'post_id': post_id})


def open_post(socket, data):
    """
    Called when the QM opens a closed post.

    For dice posts, every other open dice call of the quest is closed first
    and a 'close_all_posts' event is sent. Polls are simply reopened.
    Finally an 'open_post' event is sent.
    """
    # TODO: only posts on last page can be opened
    post_id = data.get('post_id')
    quest_id = data.get('quest_id')
    post_type = data.get('post_type')

    p = Post.objects.get(id=post_id)
    if post_type == 'dice':
        _close_open_dice_calls(quest_id)
        socket.send('close_all_posts', {'post_type': 'dice'})
        p.dicecall.open = True
        p.dicecall.save()
    elif post_type == 'poll':
        p.poll.open = True
        p.poll.save()

    socket.send('open_post', {'post_id': post_id})


# Event name -> handler. Only public functions defined in this module are
# registered, so imported helpers (e.g. ``localtime``) and the private
# ``_``-prefixed helpers above are not exposed as WebSocket events.
events = {
    func.__name__: func
    for func in list(globals().values())
    if isinstance(func, types.FunctionType)
    and func.__module__ == __name__
    and not func.__name__.startswith('_')
}