#!/usr/bin/env python3 """ SocketIO events. """ # TODO: harden the fuck up import re import time import random import functools import bleach from flask import session, request from flask_socketio import SocketIO, emit, join_room import tools import database as db socketio = SocketIO() def qm_only(msg=""): """ A decorator function to protect certain endpoints so that only the QM can access them. """ # TODO: better docstring, test this more thoroughly def actual_decorator(func): @functools.wraps(func) def _nop(*args, **kwargs): data = args[0] room = data.get("room") res = db.get_quest_meta(quest_id=room) if not res: return msg if session.get("user_id") != res[3]: return msg return func(*args, **kwargs) return _nop return actual_decorator @socketio.on('joined') def joined(data): """ Sent by clients when they enter a room. """ room = data["room"] join_room(room) @socketio.on('message') def message(data): """ Sent by a client when the user entered a new message. """ room = int(data["room"]) message = data["message"] name = data["name"] user_id = data.get("user_id") data = {} date = int(time.time()) data["date"] = date data["name"] = name data["user_id"] = user_id data["room"] = room message = message.strip() if not message: return tags = ["b", "code", "i", "s"] message = bleach.clean(message, tags=tags) lines = [] for line in message.splitlines(): if line.startswith(">"): line = '' + line + '' lines.append(line) message = "
".join(lines) message = tools.handle_img(message) data["message"] = message roll_msg = "" if message.startswith("/dice") or message.startswith("/roll"): roll_msg = handle_dice(data) if roll_msg: data["message"] += '
' + roll_msg + "" message_id = db.log_chat_message(data) emit("message", data, room=room) if roll_msg: dice_call_id = db.get_quest_meta(data["room"])[4] if dice_call_id: dice_call = db.get_dice_call(dice_call_id) dice_roll = re.search(r"(\d+d\d+(?:[+-]\d+)?)", message).group(1) if dice_call[2] and dice_roll != dice_call[1]: return roll_results = re.search(r"Rolled (.+) =", roll_msg).group(1) roll_total = int(re.search(r"= (\d+)$", roll_msg).group(1)) roll_data = (dice_roll, roll_results, roll_total) db.insert_quest_roll(message_id, room, dice_call_id, roll_data) if len(db.get_dice_rolls(post_id=dice_call_id)) == dice_call[4]: db.set_post_closed(room) emit("close_dice_call", {"post_id": dice_call_id}, room=room) room = data["room"] data = {} data["post"] = roll_msg + " (" + dice_roll + ")" if dice_call[3]: if roll_total >= dice_call[3]: data["post"] += " - Pass" else: data["post"] += " - Fail" data["post_type"] = "dice" data["post_id"] = dice_call_id emit("update_post", data, room=room) def handle_dice(data): """ Handle /dice or /roll messages. """ reg = re.search(r"(\d+)d(\d+)([+-]\d+)?", data["message"]) if not reg: return try: groups = [0 if d is None else int(d) for d in reg.groups()] diceNum, diceSides, diceMod = groups assert 0 < diceNum < 100 assert 0 < diceSides < 100 assert -1000 < diceMod < 1000 except (ValueError, AssertionError): return dice = [random.randint(1, diceSides) for _ in range(diceNum)] total = sum(dice) if diceMod: total += diceMod msg = f"Rolled {', '.join(map(str, dice))}" if diceMod: if diceMod > 0: msg += " + " + str(diceMod) else: msg += " - " + str(diceMod)[1:] msg += " = " + str(total) return msg @socketio.on("new_post") @qm_only() def new_post(data, internal=False): """ Called when the QM makes a new post. """ room = data.get("room") post = data.get("post") post = bleach.clean(post.strip()) post = post.replace("\n", "
") post = tools.handle_img(post) date = int(time.time()) post_id = db.insert_quest_post(room, "text", post, date) db.set_post_closed(room) data = {} data["post"] = [post] data["post_type"] = "text" data["date"] = date data["post_id"] = post_id emit("new_post", data, room=room) @socketio.on("update_post") @qm_only() def update_post(data): """ Called when the QM edits and saves a post. """ # TODO: enforce only update text posts room = data["room"] post = data["post"] post = post.strip().replace("
", "
") post = tools.handle_img(post) post_id = data["post_id"] db.update_quest_post(post_id, post) data = {} data["post"] = post data["post_id"] = post_id data["post_type"] = "text" emit("update_post", data, room=room) @socketio.on("dice_post") @qm_only() def dice_post(data): """ Called when the QM posts a new dice call. """ room = data["room"] data = {k: v for k, v in data.items() if v} try: diceNum = int(data.get("diceNum", 0)) diceSides = int(data.get("diceSides", 0)) diceMod = int(data.get("diceMod", 0)) diceChal = int(data.get("diceChal", 0)) diceRollsTaken = int(data.get("diceRollsTaken", 0)) assert 0 < diceNum < 100 assert 0 < diceSides < 100 assert -1000 < diceMod < 1000 assert 0 <= diceChal < 100 assert 0 <= diceRollsTaken < 100 except (ValueError, AssertionError): return diceStrict = bool(data.get("diceStrict")) dice_roll = f"{data['diceNum']}d{data['diceSides']}" if diceMod: if diceMod > 0: dice_roll += "+" dice_roll += str(diceMod) post = "Roll " + dice_roll if diceChal: post += " vs DC" + str(diceChal) date = int(time.time()) post_id = db.insert_quest_post(room, "dice", post, date) new_call = (dice_roll, diceStrict, diceChal, diceRollsTaken) db.insert_dice_call(post_id, room, new_call) db.set_post_open(post_id, room) data = {} data["post"] = post data["post_type"] = "dice" data["date"] = date data["post_id"] = post_id emit("new_post", data, room=room) @socketio.on('close_post') @qm_only() def close_dice_call(data): """ Closes an active post. """ room = data["room"] post_id = data.get("post_id") db.set_post_closed(room) data = {} data["post_id"] = post_id emit("close_post", data, room=room) @socketio.on("open_post") @qm_only() def open_dice_call(data): """ Opens an active post. This is only permitted if the active post is the last post in the quest. """ # TODO: enforce only open if last post room = data["room"] post_id = data.get("post_id") db.set_post_open(post_id, room) data = {} data["post_id"] = post_id emit("open_post", data, room=room) @socketio.on("poll_post") @qm_only() def poll_post(data): """ Called when the QM posts a new poll. """ room = data.pop("room", None) multi_choice = bool(data.pop("pollAllowMultipleChoices", None)) allow_writein = bool(data.pop("pollAllowUserOptions", None)) options = [] for key, value in data.items(): if not value or not key.startswith("pollOption-"): continue if len(value) >= 200: return value = bleach.clean(value).replace("\n", "") options.append(value) post = "Poll" date = int(time.time()) post_id = db.insert_quest_post(room, "poll", post, date) db.insert_poll(post_id, room, multi_choice, allow_writein) new_options = [] for option in options: option_id = db.insert_poll_option(post_id, option) new_options.append((option_id, option)) db.set_post_open(post_id, room) data = {} data["post"] = post data["post_id"] = post_id data["post_type"] = "poll" data["date"] = int(time.time()) data["options"] = new_options emit("new_post", data, room=room) @socketio.on("vote") def vote(data): """ Called when a user changes their vote on a poll. """ room = data.get("room") option_id = data.get("option_id") post_id = data.get("post_id") polarity = data.get("polarity") #ip_address = request.remote_addr ip_address = request.headers.get("X-Real-Ip") if not polarity: db.remove_poll_vote(option_id, ip_address) else: poll = db.get_poll(post_id) if poll[2]: # multi-vote allowed db.insert_poll_vote(option_id, ip_address) else: votes = db.get_poll_votes_voted(post_id, ip_address) for vote in votes: db.remove_poll_vote(vote[0], ip_address) data = {} data["option_id"] = vote[0] data["polarity"] = False emit("vote", data, room=room) emit("toggle_option_box", data, room=request.sid) db.insert_poll_vote(option_id, ip_address) data = {} data["option_id"] = option_id data["polarity"] = polarity emit("vote", data, room=room)