#!/usr/bin/env python3 """ Simple file host using Flask. """ import os import time import string import urllib import functools from flask import Flask, session, request, abort, redirect, url_for, g, \ render_template from flask_socketio import SocketIO, emit, join_room from flask_paranoid import Paranoid import MySQLdb import bleach from passlib.hash import argon2 class ReverseProxied(object): """ Wrap the application in this middleware and configure the front-end server to add these headers, to let you quietly bind this to a URL other than / and to an HTTP scheme that is different than what is used locally. In nginx: location /myprefix { proxy_pass http://192.168.0.1:5001; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Scheme $scheme; proxy_set_header X-Script-Name /myprefix; } :param app: the WSGI application """ def __init__(self, app): self.app = app def __call__(self, environ, start_response): script_name = environ.get('HTTP_X_SCRIPT_NAME', '') if script_name: environ['SCRIPT_NAME'] = script_name path_info = environ['PATH_INFO'] if path_info.startswith(script_name): environ['PATH_INFO'] = path_info[len(script_name):] scheme = environ.get('HTTP_X_SCHEME', '') if scheme: environ['wsgi.url_scheme'] = scheme return self.app(environ, start_response) app = Flask(__name__) app.wsgi_app = ReverseProxied(app.wsgi_app) app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 socketio = SocketIO(app) paranoid = Paranoid(app) paranoid.redirect_view = 'index' def db_execute(*args, **kwargs): """ Opens a connection to the app's database and executes the SQL statements passed to this function. """ passwd = app.config.get("DB_KEY") with MySQLdb.connect(user="anonkun", passwd=passwd, db="anonkun") as cur: cur.execute(*args, **kwargs) return cur def init(): """ Initializes the application. """ # init secret key if os.path.exists("secret_key"): with open("secret_key", "rb") as file: secret_key = file.read() else: secret_key = os.urandom(64) with open("secret_key", "wb") as file: file.write(secret_key) app.secret_key = secret_key # init db with open("db_key", "r") as file: passwd = file.read().strip() # TODO: encrypt this app.config["DB_KEY"] = passwd try: db_execute("SELECT * FROM `users`").fetchone() except MySQLdb.ProgrammingError: # database not initialized with open("anonkun.sql", "r") as file: commands = file.read().split(";") for cmd in commands: cmd = cmd.strip() if not cmd: continue db_execute(cmd) def login_required(url=None): """ A decorator function to protect certain endpoints by requiring the user to either pass a valid session cookie, or pass thier username and password along with the request to login. """ def actual_decorator(func): @functools.wraps(func) def _nop(*args, **kwargs): username = session.get("username") if verify_username(username): return func(*args, **kwargs) username = request.form.get("user") password = request.form.get("pass") if verify_password(username, password): return func(*args, **kwargs) if url: return redirect(url_for(url)) else: abort(401) return _nop return actual_decorator def add_user(username, password): """ Adds a user to the database. """ if verify_username(username): # username taken return "username_taken" elif len(username) > 20: return "username_too_long" pw_hash = argon2.hash(password) db_execute( "INSERT INTO `users` (`username`, `password_hash`) VALUES (%s, %s)", (username, pw_hash)) return "success" def verify_password(username, password): """ Verifies a user's password. """ user = verify_username(username) if not user: return False user_id, _, pw_hash = user if argon2.verify(password, pw_hash): session["user_id"] = user_id return True else: return False def verify_username(username): """ Checks to see if the given username is in the database. """ user = db_execute("SELECT * FROM `users` WHERE `username` = %s", (username,)).fetchone() if user: return user else: return False @socketio.on('joined', namespace="/chat") def joined(data): """ Sent by clients when they enter a room. """ room = data["room"] join_room(room) @socketio.on('message', namespace="/chat") def text(data): """ Sent by a client when the user entered a new message. """ room = data["room"] message = data["message"] name = data["name"] date = int(time.time()) data["date"] = date emit('message', data, room=room) @app.template_filter("strftime") def unix2string(unix): """ Converts a unix timestamp into a string. """ form = "%Y-%m-%d %H:%M:%S" t = time.localtime(unix) return time.strftime(form, t) messages = [{"name":"Anonymous", "date":1528998539, "message":"lol"}, {"name":"Namefag", "date":1528998521, "message":"kek"}] @app.route("/quest/") def quest(quest_title): """ An arbituary quest page. """ quest_title, _, extra = quest_title.partition("/") res = db_execute("SELECT * FROM `quests` WHERE `ident_title` = %s", (quest_title,)) data = res.fetchone() if not data: abort(404) quest_id, quest_title, _, owner_id, quest_data = data res = db_execute("SELECT * FROM `chat_messages` WHERE `room_id` = '%s'", (quest_id,)) messages = res.fetchall() return render_template('quest.html', quest_title=quest_title, quest_body=quest_data, room_id=quest_id, messages=messages) @app.route("/create_quest", methods=["GET", "POST"]) @login_required("login") def create_quest(): """ Starts a new quest. """ if request.method == "GET": return render_template("create_quest.html") canon_title = request.form.get("quest_title") quest_body = request.form.get("quest_body") ident_title = sanitize_title(canon_title) quest_body = bleach.clean(quest_body.strip()) quest_body = quest_body.replace("\n", "
") db_execute( "INSERT INTO `quests` (`canon_title`, `ident_title`, `quest_data`) " \ "VALUES (%s, %s, %s)", (canon_title, ident_title, quest_body)) return redirect(url_for('quest', quest_title=ident_title)) @app.route("/login", methods=["GET", "POST"]) def login(): """ Logs the user in. """ if request.method == "GET": return render_template("login.html") username = request.form.get("user") password = request.form.get("pass") if verify_password(username, password): session["username"] = username return redirect(url_for("index")) else: abort(401) @app.route("/signup", methods=["GET", "POST"]) def signup(): """ Create a new account. """ if request.method == "GET": return render_template("signup.html") username = request.form.get("user") password = request.form.get("pass") verify_password = request.form.get("verify_pass") if len(username) > 20: "username_too_long" elif len(username) < 3: "username_too_short" chrs = [c not in string.ascii_letters + string.digits for c in username] if any(chrs): "username_bad_chars" if verify_username(username): "username_taken" if len(password) > 1024: "password_too_long" elif len(password) < 8: "password_too_short" if password != verify_password: "passwords_dont_match" res = add_user(username, password) return redirect(url_for("index")) @app.route("/") def index(): """ The index page. """ return render_template("index.html") def sanitize_title(canon_title): """ Sanitizes the given canonical title for a quest and returns a url-friendly version. """ ident_title = canon_title.lower().replace(" ", "-") ident_title = urllib.parse.quote(ident_title) return ident_title init() if __name__ == "__main__": app.run(host='0.0.0.0', port=5050)