#!/usr/bin/env python3 """ Simple file host using Flask. """ import os import string import secrets import sqlite3 import threading from passlib.hash import argon2 from flask import Flask, session, request, abort, redirect, url_for, g, \ render_template from werkzeug.utils import secure_filename class ReverseProxied(object): """ Wrap the application in this middleware and configure the front-end server to add these headers, to let you quietly bind this to a URL other than / and to an HTTP scheme that is different than what is used locally. In nginx: location /myprefix { proxy_pass http://192.168.0.1:5001; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Scheme $scheme; proxy_set_header X-Script-Name /myprefix; } :param app: the WSGI application """ def __init__(self, app): self.app = app def __call__(self, environ, start_response): script_name = environ.get('HTTP_X_SCRIPT_NAME', '') if script_name: environ['SCRIPT_NAME'] = script_name path_info = environ['PATH_INFO'] if path_info.startswith(script_name): environ['PATH_INFO'] = path_info[len(script_name):] scheme = environ.get('HTTP_X_SCHEME', '') if scheme: environ['wsgi.url_scheme'] = scheme return self.app(environ, start_response) app = Flask(__name__) app.wsgi_app = ReverseProxied(app.wsgi_app) app.config['MAX_CONTENT_LENGTH'] = 128 * 1024 * 1024 app.config["UPLOAD_DIR"] = "/usr/local/www/html/up" app.config["DB_NAME"] = "fileHost.db" app.config["DB_LOCK"] = threading.Lock() def db_execute(*args, **kwargs): """ Opens a connection to the app's database and executes the SQL statements passed to this function. """ with sqlite3.connect(app.config.get("DB_NAME")) as con: app.config.get("DB_LOCK").acquire() cur = con.cursor() res = cur.execute(*args, **kwargs) app.config.get("DB_LOCK").release() return res def init(): """ Initializes the application. """ os.makedirs(app.config.get("UPLOAD_DIR"), exist_ok=True) if os.path.exists("secret_key"): with open("secret_key", "rb") as file: secret_key = file.read() else: secret_key = os.urandom(64) with open("secret_key", "wb") as file: file.write(secret_key) app.secret_key = secret_key try: db_execute("SELECT * FROM users").fetchone() db_execute("SELECT * FROM uploads").fetchone() except sqlite3.OperationalError: db_execute("CREATE TABLE users(" "id INTEGER PRIMARY KEY," "username TEXT," "pw_hash TEXT," "admin BOOL DEFAULT FALSE)") db_execute("CREATE TABLE uploads(" "filename TEXT, uploaded_by TEXT," "uploaded_date INT DEFAULT (STRFTIME('%s', 'now')))") def add_user(username, password, admin="FALSE"): """ Adds a user to the database. """ u = db_execute("SELECT username FROM users WHERE username = ?", (username,)).fetchone() if u: return False pw_hash = argon2.hash(password) db_execute("INSERT INTO users (username, pw_hash, admin) VALUES (?,?,?)", (username, pw_hash, admin)) return True def verify_password(username, password): """ Verifies a user's password. """ user = verify_username(username) if not user: return False _, _, pw_hash, admin = user if argon2.verify(password, pw_hash): g.user = username g.admin = admin == "TRUE" return True else: return False def verify_username(username): """ Checks to see if the given username is in the database. """ user = db_execute("SELECT * FROM users WHERE username = ?", (username,)).fetchone() if user: return user else: return False @app.route("/delete_file", methods=["POST"]) def delete_file(): """ Allows an admin to delete a file from the upload directory and the database. """ username = request.form.get("user") password = request.form.get("pass") filename = request.form.get("fname") if not verify_password(username, password): abort(401) if not g.admin: abort(401) try: os.remove(os.path.join(app.config.get("UPLOAD_DIR"), filename)) db.execute("DELETE FROM uploads WHERE filename = ?", (filename,)) con.commit() except FileNotFoundError: return "Error: File not found." return "Success" @app.route("/add_user", methods=["POST"]) def addUser(): """ Allows an admin to add a user via API POST. No frontend allowed. """ username = request.form.get("user") password = request.form.get("pass") new_username = request.form.get("new_user") new_password = request.form.get("new_pass") admin = request.form.get("admin") or "FALSE" if not verify_password(username, password): abort(401) if not g.admin: abort(401) res = add_user(new_username, new_password, admin) if res: return "Success" else: return "Username already exists." @app.route("/logout", methods=["POST", "GET"]) def logout(): """ Logs the user out and removes his session cookie. """ session.pop("username") return redirect(url_for("login")) @app.route("/change_password", methods=["POST", "GET"]) def change_password(): """ Allows the user to change their password. """ username = session.get("username") if not verify_username(username): abort(401) if request.method == "GET": return render_template("change_password.html") current_password = request.form.get("current_password") new_password = request.form.get("new_password") new_password_verify = request.form.get("new_password_verify") if not verify_password(username, current_password): return "The current password does not match!" if new_password != new_password_verify: return "The new passwords do not match!" pw_hash = argon2.hash(new_password) db_execute("UPDATE users SET pw_hash = ? WHERE username = ?", (pw_hash, username)) session.pop("username") return redirect(url_for("login")) @app.route("/login", methods=["POST", "GET"]) def login(): """ Logs the user in. """ if request.method == "GET": return render_template("login.html") username = request.form.get("user") password = request.form.get("pass") if verify_password(username, password): session["username"] = username return redirect(url_for("index")) else: abort(401) @app.route("/", methods=["POST", "GET"]) def index(): """ Saves the uploaded file and returns a URL pointing to it. """ if not session.get("username"): if request.method == "GET": return redirect(url_for("login")) username = request.form.get("user") password = request.form.get("pass") if not verify_password(username, password): abort(401) else: username = session.get("username") if not verify_username(username): abort(401) if request.method == "GET": return render_template("index.html") file = request.files.get('file') fname = secure_filename(file.filename) fdir = app.config.get("UPLOAD_DIR") pre = get_rand_chars(8) if request.form.get("randname") == "on": fname = pre + os.path.splitext(fname)[1] else: fname = pre + "_" + fname file.save(os.path.join(fdir, fname)) db_execute("INSERT INTO uploads (filename, uploaded_by) VALUES (?,?)", (fname, username)) #TODO: make this not hardcoded # url = request.url_root + "up/" + fname url = "https://steelbea.me/up/" + fname return url def get_rand_chars(n): """ Returns n number of random characters. Character set includes lowercase and uppercase ascii letters and digits. """ chars = [] for _ in range(n): char = secrets.choice(string.ascii_letters + string.digits) chars.append(char) return "".join(chars) init() # TODO: make these not global variables? if __name__ == "__main__": import sys add_user(sys.argv[1], sys.argv[2], "TRUE") app.run(host='0.0.0.0', port=5000)