#!/usr/bin/env python3
"""
SocketIO events.

NOTE(review): several string literals in this module are empty or contain only
line breaks (e.g. the wrappers around quoted chat lines and around the dice
total). That looks like inline markup that was lost somewhere upstream; the
literals are kept exactly as found -- confirm against the canonical source.
"""
import re
import time
import random

import bleach
from flask import session
from flask_socketio import SocketIO, emit, join_room

import tools
import database as db

socketio = SocketIO()

# "<count>d<sides>" with an optional signed modifier, e.g. "2d6" or "1d20+3".
_DICE_RE = re.compile(r"(\d+)d(\d+)([+-]\d+)?")


def _valid_dice(num, sides, mod):
    """ Return True when a dice spec lies inside the accepted limits. """
    return 0 < num < 100 and 0 < sides < 100 and -1000 < mod < 1000


def _is_quest_owner(room):
    """
    Return True when the session user matches field 3 of the quest metadata.

    Returns False when ``db.get_quest_meta`` finds nothing for ``room``.
    Presumably index 3 holds the quest master's user id -- confirm against
    the ``database`` module.
    """
    meta = db.get_quest_meta(quest_id=room)
    if not meta:
        return False
    return session.get("user_id") == meta[3]


@socketio.on('joined')
def joined(data):
    """ Sent by clients when they enter a room. """
    join_room(data["room"])


@socketio.on('message')
def message(data):
    """
    Sent by a client when the user entered a new message.

    Expects ``room``, ``message`` and ``name`` keys (``user_id`` optional).
    The text is stripped, sanitised with bleach, passed through
    ``tools.handle_img``, optionally extended with a dice roll, logged and
    broadcast to the room. Blank messages are dropped silently.
    """
    room = int(data["room"])
    text = data["message"]
    # NOTE(review): name and user_id come from the client payload, not from
    # the session, so they are only as trustworthy as the client.
    payload = {
        "date": int(time.time()),
        "name": data["name"],
        "user_id": data.get("user_id"),
        "room": room,
    }

    text = text.strip()
    if not text:
        return

    text = bleach.clean(text, tags=["b", "code", "i", "s"])

    lines = []
    for line in text.splitlines():
        # bleach.clean escapes a leading ">" to "&gt;", so match both forms;
        # checking for ">" alone would not fire on sanitised text.
        if line.startswith((">", "&gt;")):
            # NOTE(review): wrappers are empty here; markup presumably lost.
            line = '' + line + ''
        lines.append(line)
    text = "\n".join(lines)
    text = tools.handle_img(text)

    if text.startswith(("/dice", "/roll")):
        roll_msg = handle_dice(text)
        if roll_msg:
            text += '\n' + roll_msg

    payload["message"] = text
    db.log_chat_message(payload)
    emit("message", payload, room=room)


def handle_dice(message):
    """
    Handle /dice or /roll messages.

    Looks for the first ``<count>d<sides>[+-<mod>]`` spec in ``message``.
    Returns the result text, or None when no spec is found or the spec is
    outside the accepted limits.
    """
    found = _DICE_RE.search(message)
    if not found:
        return None

    try:
        # int() can still raise ValueError, e.g. on an extremely long digit run.
        dice_num, dice_sides, dice_mod = (
            0 if group is None else int(group) for group in found.groups()
        )
    except ValueError:
        return None

    # Explicit check instead of assert: asserts are removed under -O, which
    # would silently disable the limits.
    if not _valid_dice(dice_num, dice_sides, dice_mod):
        return None

    dice = [random.randint(1, dice_sides) for _ in range(dice_num)]
    total = sum(dice) + dice_mod

    msg = f"Rolled {', '.join(map(str, dice))}"
    if dice_mod:
        if dice_mod > 0:
            msg += " +"
        msg += " " + str(dice_mod)
    # NOTE(review): trailing empty string kept as found; markup presumably lost.
    msg += " = " + str(total) + ""
    return msg


@socketio.on("new_post")
def new_post(data, internal=False):
    """
    Called when the QM makes a new post.

    Ignored unless the session user owns the quest named by ``data["room"]``.
    With ``internal=True`` the post text is not passed through bleach; this is
    used for server-generated posts such as dice calls.
    """
    room = data["room"]
    if not _is_quest_owner(room):
        return

    post = data["post"]
    if not internal:
        post = bleach.clean(post.strip())
    # NOTE(review): indentation was lost in this copy, so it is assumed that
    # only the sanitising step is skipped for internal posts. The replacement
    # target is a bare line break as found; markup presumably lost.
    post = post.replace("\n", "\n")
    post = tools.handle_img(post)

    editable = data.get("editable", True)
    date = int(time.time())
    post_id = db.insert_quest_post(room, post, date)

    payload = {
        "post": [post],
        "date": date,
        "post_id": post_id,
        "editable": editable,  # TODO: enforce this on server-side
    }
    emit("new_post", payload, room=room)


@socketio.on("update_post")
def update_post(data):
    """
    Called when the QM edits and saves a post.

    Ignored unless the session user owns the quest named by ``data["room"]``.
    """
    room = data["room"]
    if not _is_quest_owner(room):
        return

    # NOTE(review): unlike new_post, the edited text is not sanitised with
    # bleach before being stored and broadcast -- confirm this is intended.
    # Both replace() arguments are bare line breaks as found; markup
    # presumably lost.
    post = data["post"].strip().replace("\n", "\n")
    post = tools.handle_img(post)

    post_id = data["post_id"]
    # NOTE(review): nothing here checks that post_id belongs to this room.
    db.update_quest_post(post_id, post)

    emit("update_post", {"post": post, "post_id": post_id}, room=room)


@socketio.on("dice_post")
def dice_post(data):
    """
    Called when the QM posts a new dice call.

    Reads ``diceNum``, ``diceSides``, ``diceMod``, ``diceChal`` and
    ``diceRolls`` from the payload (missing or falsy values count as 0),
    validates them, and publishes a non-editable post via ``new_post``.
    Invalid input is dropped silently.
    """
    room = data["room"]
    if not _is_quest_owner(room):
        return

    fields = {key: value for key, value in data.items() if value}
    try:
        dice_num = int(fields.get("diceNum", 0))
        dice_sides = int(fields.get("diceSides", 0))
        dice_mod = int(fields.get("diceMod", 0))
        dice_chal = int(fields.get("diceChal", 0))
        dice_rolls = int(fields.get("diceRolls", 0))
    except (TypeError, ValueError, OverflowError):
        # TypeError covers non-scalar payload values such as lists or dicts.
        return

    # Explicit checks instead of assert, which is removed under -O.
    if not _valid_dice(dice_num, dice_sides, dice_mod):
        return
    # dice_rolls is only range-checked; it is not used when building the post.
    if not (0 <= dice_chal < 100 and 0 <= dice_rolls < 100):
        return

    # Use the validated integers, not the raw client values: this text goes
    # out as an internal post and therefore bypasses bleach.
    post = f"\n\nRoll {dice_num}d{dice_sides}"
    if dice_mod:
        if dice_mod > 0:
            post += "+"
        post += str(dice_mod)
    if dice_chal:
        post += " vs DC" + str(dice_chal)

    new_post({"post": post, "room": room, "editable": False}, internal=True)