#!/usr/bin/env python3 """ Individual functions for handling WebSocket events. Gets called by the QuestConsumer object in consumers.py. """ import re import time import types import random import bleach from django.conf import settings from django.db import IntegrityError from django.utils.timezone import localtime from django.urls import reverse from quest.models import * from quest.tools import handle_img from quest.forms import DiceCallForm, PollForm from user.models import User def message(socket, data): """ Gets called when the server receives a 'message' event. """ message = data.get('message') if not all(locals().values()): return # cleaning message = message[:512] message = message.strip() message = re.sub(r'\n\n+', '\n\n', message) if not message: return tags = ["b", "code", "i", "s"] message = bleach.clean(message, tags=tags) # greentext lines = [] for line in message.splitlines(): if line.startswith(">") and not line.startswith(">>"): line = '' + line + '' lines.append(line) message = "
".join(lines) # quote links quotes = re.findall(r">>\d+", message) for quote in quotes: msg_id = quote.replace(">>", "") msg = '' message = message.replace(quote, msg) # handle image message = handle_img(message) # dice rolling if any(map(message.startswith, ["/dice", "/roll"])): reg = re.search(r"(\d+)d(\d+)([+-]\d+)?", data["message"]) if not reg: return try: # TODO: form validation? groups = [0 if d is None else int(d) for d in reg.groups()] dice_num, dice_sides, dice_mod = groups assert 1 <= dice_num <= 256 assert 1 <= dice_sides <= 256 assert -65536 <= dice_mod <= 65536 except (ValueError, AssertionError): return dice_results = [random.randint(1, dice_sides) for _ in range(dice_num)] total = sum(dice_results) + dice_mod roll_msg = f"Rolled {', '.join(map(str, dice_results))}" if dice_mod: if dice_mod > 0: roll_msg += " + " + str(dice_mod) else: roll_msg += " - " + str(dice_mod)[1:] roll_msg += " = " + str(total) message += '
' + roll_msg + "" user = socket.scope['user'] quest = Quest.objects.get(id=socket.quest_id) m = Message( quest=quest, message=message) if user.is_authenticated: m.user = user m.save() if user.is_authenticated: anonymize = (user.anonymize or quest.anonymize) and user != quest.owner else: anonymize = True data = {} data['message_id'] = m.id data['message'] = message data['date'] = int(time.time()) data['name'] = user.username data['anonymize'] = anonymize socket.send('message', data) # append rolls to dicecall if any(map(message.startswith, ["/dice", "/roll"])): try: dc = DiceCall.objects.get( post__quest__id=socket.quest_id, open=True ) except DiceCall.DoesNotExist: return dice_roll = f"{dice_num}d{dice_sides}" if dice_mod: if dice_mod > 0: dice_roll += "+" dice_roll += str(dice_mod) if dc.strict and dc.dice_roll != dice_roll: return dr = DiceRoll( dicecall=dc, message=m, roll=dice_roll, results=re.search(r"Rolled (.+) =", roll_msg).group(1), total=total, ) dr.save() if DiceRoll.objects.filter(dicecall=dc).count() == dc.rolls_taken: dc.open = False dc.save() socket.send('close_post', {'post_id': dc.post_id}) data = {} data['post_text'] = roll_msg + " (" + dice_roll + ")" if dc.dice_challenge: if total >= dc.dice_challenge: data["post_text"] += " - Pass" else: data["post_text"] += " - Fail" data['post_type'] = 'dice' data['post_id'] = dc.post_id socket.send('update_post', data) def text_post(socket, data): """ Called when the QM creates a new text post. """ quest = Quest.objects.get(id=socket.quest_id) user = socket.scope['user'] if quest.owner != user: # 401 not allowed return post_text = data.get('text') page_num = data.get('page_num') if not all(locals().values()): return try: page = Page.objects.get(quest=quest, page_num=page_num) except Page.DoesNotExist: return # cleaning post_text = bleach.clean(post_text.strip()) post_text = post_text.replace("\n", "
") # handle image post_text = handle_img(post_text) p = Post( quest=quest, page=page, post_type='text', post_text=post_text) p.save() data = {} data['post_text'] = post_text data['post_type'] = 'text' data['date'] = localtime(p.timestamp).strftime('%Y-%m-%d %H:%M') data['post_id'] = p.id socket.send('new_post', data) m = Message( quest=quest, server=True, message=f"{quest.owner.username} made a new post!", ) m.save() data = {} data['message_id'] = m.id data['message'] = m.message data['date'] = int(time.time()) data['name'] = 'Server' data['anonymize'] = False socket.send('message', data) def dice_post(socket, data): """ Called when the QM makes a new dice post. """ quest = Quest.objects.get(id=socket.quest_id) user = socket.scope['user'] if quest.owner != user: # 401 not allowed return page_num = data.get('page_num') try: page = Page.objects.get(quest=quest, page_num=page_num) except Page.DoesNotExist: return form = DiceCallForm(data) if not form.is_valid(): # TODO: error message event return form = form.cleaned_data posts = Post.objects.filter( quest=quest, post_type='dice', dicecall__open=True ) for post in posts: post.dicecall.open = False post.dicecall.save() socket.send('close_all_posts', {'post_type': 'dice'}) dice_roll = str(form['diceNum']) + "d" dice_roll += str(form['diceSides']) if form['diceMod']: if form['diceMod'] > 0: dice_roll += "+" dice_roll += str(form['diceMod']) post_text = "Roll " + dice_roll if form['diceChal']: post_text += " vs DC" + str(form['diceChal']) p = Post( quest=quest, page=page, post_type='dice', post_text=post_text ) p.save() dc = DiceCall( post=p, dice_roll=dice_roll, strict=form['diceStrict'], dice_challenge=form['diceChal'], rolls_taken=form['diceRollsTaken'], open=True, ) dc.save() data = {} data['post_id'] = p.id data['post_type'] = 'dice' data['post_text'] = post_text data['date'] = localtime(p.timestamp).strftime('%Y-%m-%d %H:%M') socket.send('new_post', data) m = Message( quest=quest, server=True, message=f"{quest.owner.username} made a new dice call!", ) m.save() data = {} data['message_id'] = m.id data['message'] = m.message data['date'] = int(time.time()) data['name'] = 'Server' data['anonymize'] = False socket.send('message', data) def poll_post(socket, data): """ Called when the QM makes a new dice post. """ quest = Quest.objects.get(id=socket.quest_id) user = socket.scope['user'] if quest.owner != user: # 401 not allowed return page_num = data.get('page_num') try: page = Page.objects.get(quest=quest, page_num=page_num) except Page.DoesNotExist: return form = PollForm(data) if not form.is_valid(): return # error message? form = form.cleaned_data multi_choice = form.pop('multi_choice') allow_writein = form.pop('allow_writein') p = Post( quest=quest, page=page, post_type='poll', post_text="Poll" ) p.save() pl = Poll( post=p, multi_choice=multi_choice, allow_writein=allow_writein, open=True ) pl.save() options = [] for key, option in form.items(): o = PollOption( poll=pl, text=option ) o.save() options.append(o) data = {} data['post_type'] = 'poll' data['post_id'] = p.id data['post_text'] = "Poll" data['date'] = localtime(p.timestamp).strftime('%Y-%m-%d %H:%M') data['options'] = [(o.id, o.text) for o in options] data['allow_writein'] = allow_writein socket.send('new_post', data) m = Message( quest=quest, server=True, message=f"{quest.owner.username} made a new poll!", ) m.save() data = {} data['message_id'] = m.id data['message'] = m.message data['date'] = int(time.time()) data['name'] = 'Server' data['anonymize'] = False socket.send('message', data) def edit_post(socket, data): """ Called when the QM saves an edited post. """ quest = Quest.objects.get(id=socket.quest_id) user = socket.scope['user'] if quest.owner != user: # 401 not allowed return post_id = data.get('post_id') post_text = data.get('post_text') if not all(locals().values()): return try: p = Post.objects.get(id=post_id) except Post.DoesNotExist: return # cleaning post_text = bleach.clean(post_text.strip()) post_text = post_text.replace("\n", "
") # handle image post_text = handle_img(post_text) p.post_text = post_text p.save() data = {} data['post_text'] = post_text data['post_type'] = 'text' data['post_id'] = p.id socket.send('update_post', data) def close_post(socket, data): """ Called when the QM closes an open post. """ quest = Quest.objects.get(id=socket.quest_id) user = socket.scope['user'] if quest.owner != user: # 401 not allowed return post_id = data.get('post_id') try: p = Post.objects.get(id=post_id) except Post.DoesNotExist: return if data.get('post_type') == 'dice': p.dicecall.open = False p.dicecall.save() elif data.get('post_type') == 'poll': p.poll.open = False p.poll.save() data = {} data['post_id'] = post_id socket.send('close_post', data) def open_post(socket, data): """ Called when the QM opens a closed post. """ quest = Quest.objects.get(id=socket.quest_id) user = socket.scope['user'] if quest.owner != user: # 401 not allowed return post_id = data.get('post_id') try: p = Post.objects.get(id=post_id) except Post.DoesNotExist: return if data.get('post_type') == 'dice': posts = Post.objects.filter( quest=quest, post_type='dice', dicecall__open=True ) for post in posts: post.dicecall.open = False post.dicecall.save() socket.send('close_all_posts', {'post_type': 'dice'}) p.dicecall.open = True p.dicecall.save() elif data.get('post_type') == 'poll': p.poll.open = True p.poll.save() data = {} data['post_id'] = post_id socket.send('open_post', data) def new_page(socket, data): """ Called when the QM creates a new page. """ quest = Quest.objects.get(id=socket.quest_id) user = socket.scope['user'] if quest.owner != user: # 401 not allowed return title = data.get('page_title') appendix = bool(data.get('appendix')) if not all(locals().values()): return if appendix: page = Page.objects.filter( quest=quest, appendix=True ).order_by('page_num').last() if page: page_num = chr(ord(page.page_num)+1) else: page_num = 'a' else: last_page = Page.objects.filter( quest=quest, appendix=False ).order_by('page_num').last() page_num = int(last_page.page_num) + 1 p = Page( quest=quest, page_num=page_num, title=title, appendix=appendix, ) p.save() data = {} data['page_num'] = p.page_num data['title'] = title if appendix: data['appendix'] = True else: data['appendix'] = False data['url'] = reverse('quest:quest', args=[socket.quest_id, p.page_num]) socket.send('new_page', data) server = User.objects.get(id=settings.SERVER_USER_ID) m = Message( quest=quest, user=server, message=f"{quest.owner.username} started a new page!", ) m.save() data = {} data['message_id'] = m.id data['message'] = m.message data['date'] = int(time.time()) data['name'] = server.username data['anonymize'] = False socket.send('message', data) def vote(socket, data): """ Called when a player votes in a poll. """ post_id = data.get('post_id') option_id = data.get('option_id') polarity = data.get('polarity') ip_address = socket.scope['client'][0] user = socket.scope['user'] if not all(locals().values()): return try: p = Poll.objects.get(post_id=post_id) except Poll.DoesNotExist: return if not p.open: return if polarity == False: # player removes his vote try: pv = PollVote.objects.get( ip_address=ip_address, option__id=option_id ) except PollVote.DoesNotExist: return pv.delete() else: # player makes a new vote try: option = PollOption.objects.get(id=option_id) except PollOption.DoesNotExist: return pvs = PollVote.objects.filter(option=option, ip_address=ip_address) if pvs.count() != 0: # if player has voted for this option already return if p.multi_choice == False: votes = PollVote.objects.filter( ip_address=ip_address, option__poll=p ) for vote in votes: vote.delete() data = {} data['option_id'] = vote.option.id data['polarity'] = False socket.send('vote', data) socket.self_send('set_option_box', data) pv = PollVote( option=option, ip_address=ip_address ) if user.is_authenticated: pv.user = user pv.save() data = {} data['option_id'] = option_id data['polarity'] = polarity socket.send('vote', data) def write_in(socket, data): """ Called when a player creates a new write-in. """ post_id = data.get('post_id') option_text = data.get('option_text') user = socket.scope['user'] if not all(locals().values()): return try: p = Poll.objects.get(post_id=post_id) except Poll.DoesNotExist: return if not p.allow_writein: return option_text = option_text.strip() if not option_text: return option_text = "Write-in: " + bleach.clean(option_text) if len(option_text) > 200: # TODO: error message event return o = PollOption( poll=p, text=option_text ) o.save() data = {} data['post_id'] = post_id data['post_type'] = 'poll' data['option_id'] = o.id data['option_text'] = option_text socket.send('update_post', data) events = {} for obj in dir(): if type(locals()[obj]) == types.FunctionType: events[locals()[obj].__name__] = locals()[obj]