#!/usr/bin/env python3 """ Simple file host using Flask. """ import os import sqlite3 import secrets import string from passlib.hash import argon2 from flask import Flask, session, request, abort, redirect, url_for, g, \ render_template from werkzeug.utils import secure_filename class ReverseProxied(object): """ Wrap the application in this middleware and configure the front-end server to add these headers, to let you quietly bind this to a URL other than / and to an HTTP scheme that is different than what is used locally. In nginx: location /myprefix { proxy_pass http://192.168.0.1:5001; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Scheme $scheme; proxy_set_header X-Script-Name /myprefix; } :param app: the WSGI application """ def __init__(self, app): self.app = app def __call__(self, environ, start_response): script_name = environ.get('HTTP_X_SCRIPT_NAME', '') if script_name: environ['SCRIPT_NAME'] = script_name path_info = environ['PATH_INFO'] if path_info.startswith(script_name): environ['PATH_INFO'] = path_info[len(script_name):] scheme = environ.get('HTTP_X_SCHEME', '') if scheme: environ['wsgi.url_scheme'] = scheme return self.app(environ, start_response) app = Flask(__name__) app.wsgi_app = ReverseProxied(app.wsgi_app) app.config['MAX_CONTENT_LENGTH'] = 128 * 1024 * 1024 app.config["UPLOAD_DIR"] = "/usr/local/www/html/up" app.config["DB_NAME"] = "fileHost.db" def init(): """ Initializes the application. """ if not os.path.exists("secret_key"): print("Error: secret_key file not found.") print("If this is the first time the program is being run, run it \ without your WSGI host first.") os.exit() with open("secret_key", "rb") as file: secret_key = file.read() app.secret_key = secret_key if not os.path.exists(app.config.get("DB_NAME")): print("error: database not found.") print("If this is the first time the program is being run, run it \ without your WSGI host first.") os.exit() con = sqlite3.connect(app.config.get("DB_NAME")) db = con.cursor() return con, db def init_database(): """ Initializes appropriate tables in the database. """ db.execute("CREATE TABLE users(id INTEGER PRIMARY KEY, \ username TEXT, pw_hash TEXT, admin BOOL DEFAULT FALSE)") db.execute("CREATE TABLE uploads(filename TEXT, uploaded_by TEXT, \ uploaded_date INT DEFAULT (STRFTIME('%s', 'now')))") def add_user(username, password, admin="FALSE"): """ Adds a user to the database. """ u = db.execute("SELECT username FROM users WHERE username = ?", (username,)).fetchone() if u: return False pw_hash = argon2.hash(password) db.execute("INSERT INTO users (username, pw_hash, admin) VALUES (?,?,?)", (username, pw_hash, admin)) con.commit() return True def verify_password(username, password): """ Verifies a user's password. """ user = verify_username(username) if not user: return False _, _, pw_hash, admin = user if argon2.verify(password, pw_hash): g.user = username g.admin = admin == "TRUE" return True else: return False def verify_username(username): """ Checks to see if the given username is in the database. """ user = db.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchone() if user: return user else: return False @app.route("/delete_file", methods=["POST"]) def delete_file(): """ Allows an admin to delete a file from the upload directory and the database. """ username = request.form.get("user") password = request.form.get("pass") filename = request.form.get("fname") if not verify_password(username, password): abort(401) if not g.admin: abort(401) try: os.remove(os.path.join(app.config.get("UPLOAD_DIR"), filename)) db.execute("DELETE FROM uploads WHERE filename = ?", (filename,)) db.commit() except FileNotFoundError: return "Error: File not found." return "Success" @app.route("/add_user", methods=["POST"]) def addUser(): """ Allows an admin to add a user via API POST. No frontend allowed. """ username = request.form.get("user") password = request.form.get("pass") new_username = request.form.get("new_user") new_password = request.form.get("new_pass") admin = request.form.get("admin") or "FALSE" if not verify_password(username, password): abort(401) if not g.admin: abort(401) res = add_user(new_username, new_password, admin) if res: return "Success" else: return "Username already exists." @app.route("/logout", methods=["POST", "GET"]) def logout(): """ Logs the user out and removes his session cookie. """ session.pop("username") return redirect(url_for("login")) @app.route("/change_password", methods=["POST", "GET"]) def change_password(): """ Allows the user to change their password. """ username = session.get("username") if not verify_username(username): abort(401) if request.method == "GET": return render_template("change_password.html") current_password = request.form.get("current_password") new_password = request.form.get("new_password") new_password_verify = request.form.get("new_password_verify") if not verify_password(username, current_password): return "The current password does not match!" if new_password != new_password_verify: return "The new passwords do not match!" pw_hash = argon2.hash(new_password) db.execute("UPDATE users SET pw_hash = ? WHERE username = ?", (pw_hash, username)) session.pop("username") return redirect(url_for("login")) @app.route("/login", methods=["POST", "GET"]) def login(): """ Logs the user in. """ if request.method == "GET": return render_template("login.html") username = request.form.get("user") password = request.form.get("pass") if verify_password(username, password): session["username"] = username return redirect(url_for("index")) else: abort(401) @app.route("/", methods=["POST", "GET"]) def index(): """ Saves the uploaded file and returns a URL pointing to it. """ if not session.get("username"): if request.method == "GET": return redirect(url_for("login")) username = request.form.get("user") password = request.form.get("pass") if not verify_password(username, password): abort(401) else: username = session.get("username") if not verify_username(username): abort(401) if request.method == "GET": return render_template("index.html") file = request.files.get('file') fname = secure_filename(file.filename) fdir = app.config.get("UPLOAD_DIR") pre = get_rand_chars(8) if request.form.get("randname") == "on": fname = pre + os.path.splitext(fname)[1] else: fname = pre + "_" + fname file.save(os.path.join(fdir, fname)) db.execute("INSERT INTO uploads (filename, uploaded_by) VALUES (?,?)", (fname, username)) con.commit() #TODO: make this not hardcoded # url = request.url_root + "up/" + fname url = "https://steelbea.me/up/" + fname return url def get_rand_chars(n): """ Returns n number of random characters. Character set includes lowercase and uppercase ascii letters and digits. """ chars = [] for _ in range(n): char = secrets.choice(string.ascii_letters + string.digits) chars.append(char) return "".join(chars) if __name__ == "__main__": os.makedirs(app.config.get("UPLOAD_DIR"), exist_ok=True) initDB = not os.path.exists(app.config.get("DB_NAME")) con = sqlite3.connect(app.config.get("DB_NAME")) db = con.cursor() if initDB: print("Initializing new database and admin account.") init_database() import sys add_user(sys.argv[1], sys.argv[2], "TRUE") if not os.path.exists("secret_key"): secret_key = os.urandom(64) with open("secret_key", "wb") as file: file.write(secret_key) else: with open("secret_key", "rb") as file: secret_key = file.read() app.secret_key = secret_key app.run(host='0.0.0.0', port=5000) else: con, db = init()