fileHost/fileHost.py

321 lines
7.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Simple file host using Flask.
"""
import os
import sqlite3
import secrets
import string
from passlib.hash import argon2
from flask import Flask, session, request, abort, redirect, url_for, g, \
render_template
from werkzeug.utils import secure_filename
class ReverseProxied(object):
"""
Wrap the application in this middleware and configure the
front-end server to add these headers, to let you quietly bind
this to a URL other than / and to an HTTP scheme that is
different than what is used locally.
In nginx:
location /myprefix {
proxy_pass http://192.168.0.1:5001;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Scheme $scheme;
proxy_set_header X-Script-Name /myprefix;
}
:param app: the WSGI application
"""
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
script_name = environ.get('HTTP_X_SCRIPT_NAME', '')
if script_name:
environ['SCRIPT_NAME'] = script_name
path_info = environ['PATH_INFO']
if path_info.startswith(script_name):
environ['PATH_INFO'] = path_info[len(script_name):]
scheme = environ.get('HTTP_X_SCHEME', '')
if scheme:
environ['wsgi.url_scheme'] = scheme
return self.app(environ, start_response)
app = Flask(__name__)
app.wsgi_app = ReverseProxied(app.wsgi_app)
app.config['MAX_CONTENT_LENGTH'] = 128 * 1024 * 1024
app.config["UPLOAD_DIR"] = "/usr/local/www/html/up"
app.config["DB_NAME"] = "fileHost.db"
def init():
"""
Initializes the application.
"""
if not os.path.exists("secret_key"):
print("Error: secret_key file not found.")
print("If this is the first time the program is being run, run it \
without your WSGI host first.")
os.exit()
with open("secret_key", "rb") as file:
secret_key = file.read()
app.secret_key = secret_key
if not os.path.exists(app.config.get("DB_NAME")):
print("error: database not found.")
print("If this is the first time the program is being run, run it \
without your WSGI host first.")
os.exit()
con = sqlite3.connect(app.config.get("DB_NAME"))
db = con.cursor()
return con, db
def init_database():
"""
Initializes appropriate tables in the database.
"""
db.execute("CREATE TABLE users(id INTEGER PRIMARY KEY, \
username TEXT, pw_hash TEXT, admin BOOL DEFAULT FALSE)")
db.execute("CREATE TABLE uploads(filename TEXT, uploaded_by TEXT, \
uploaded_date INT DEFAULT (STRFTIME('%s', 'now')))")
def add_user(username, password, admin="FALSE"):
"""
Adds a user to the database.
"""
u = db.execute("SELECT username FROM users WHERE username = ?",
(username,)).fetchone()
if u:
return False
pw_hash = argon2.hash(password)
db.execute("INSERT INTO users (username, pw_hash, admin) VALUES (?,?,?)",
(username, pw_hash, admin))
con.commit()
return True
def verify_password(username, password):
"""
Verifies a user's password.
"""
user = verify_username(username)
if not user:
return False
_, _, pw_hash, admin = user
if argon2.verify(password, pw_hash):
g.user = username
g.admin = admin == "TRUE"
return True
else:
return False
def verify_username(username):
"""
Checks to see if the given username is in the database.
"""
user = db.execute("SELECT * FROM users WHERE username = ?",
(username,)).fetchone()
if user:
return user
else:
return False
@app.route("/delete_file", methods=["POST"])
def delete_file():
"""
Allows an admin to delete a file from the upload directory and the database.
"""
username = request.form.get("user")
password = request.form.get("pass")
filename = request.form.get("fname")
if not verify_password(username, password):
abort(401)
if not g.admin:
abort(401)
try:
os.remove(os.path.join(app.config.get("UPLOAD_DIR"), filename))
db.execute("DELETE FROM uploads WHERE filename = ?", (filename,))
con.commit()
except FileNotFoundError:
return "Error: File not found."
return "Success"
@app.route("/add_user", methods=["POST"])
def addUser():
"""
Allows an admin to add a user via API POST. No frontend allowed.
"""
username = request.form.get("user")
password = request.form.get("pass")
new_username = request.form.get("new_user")
new_password = request.form.get("new_pass")
admin = request.form.get("admin") or "FALSE"
if not verify_password(username, password):
abort(401)
if not g.admin:
abort(401)
res = add_user(new_username, new_password, admin)
if res:
return "Success"
else:
return "Username already exists."
@app.route("/logout", methods=["POST", "GET"])
def logout():
"""
Logs the user out and removes his session cookie.
"""
session.pop("username")
return redirect(url_for("login"))
@app.route("/change_password", methods=["POST", "GET"])
def change_password():
"""
Allows the user to change their password.
"""
username = session.get("username")
if not verify_username(username):
abort(401)
if request.method == "GET":
return render_template("change_password.html")
current_password = request.form.get("current_password")
new_password = request.form.get("new_password")
new_password_verify = request.form.get("new_password_verify")
if not verify_password(username, current_password):
return "The current password does not match!"
if new_password != new_password_verify:
return "The new passwords do not match!"
pw_hash = argon2.hash(new_password)
db.execute("UPDATE users SET pw_hash = ? WHERE username = ?",
(pw_hash, username))
session.pop("username")
return redirect(url_for("login"))
@app.route("/login", methods=["POST", "GET"])
def login():
"""
Logs the user in.
"""
if request.method == "GET":
return render_template("login.html")
username = request.form.get("user")
password = request.form.get("pass")
if verify_password(username, password):
session["username"] = username
return redirect(url_for("index"))
else:
abort(401)
@app.route("/", methods=["POST", "GET"])
def index():
"""
Saves the uploaded file and returns a URL pointing to it.
"""
if not session.get("username"):
if request.method == "GET":
return redirect(url_for("login"))
username = request.form.get("user")
password = request.form.get("pass")
if not verify_password(username, password):
abort(401)
else:
username = session.get("username")
if not verify_username(username):
abort(401)
if request.method == "GET":
return render_template("index.html")
file = request.files.get('file')
fname = secure_filename(file.filename)
fdir = app.config.get("UPLOAD_DIR")
pre = get_rand_chars(8)
if request.form.get("randname") == "on":
fname = pre + os.path.splitext(fname)[1]
else:
fname = pre + "_" + fname
file.save(os.path.join(fdir, fname))
db.execute("INSERT INTO uploads (filename, uploaded_by) VALUES (?,?)",
(fname, username))
con.commit()
#TODO: make this not hardcoded
# url = request.url_root + "up/" + fname
url = "https://steelbea.me/up/" + fname
return url
def get_rand_chars(n):
"""
Returns n number of random characters. Character set includes lowercase
and uppercase ascii letters and digits.
"""
chars = []
for _ in range(n):
char = secrets.choice(string.ascii_letters + string.digits)
chars.append(char)
return "".join(chars)
if __name__ == "__main__":
os.makedirs(app.config.get("UPLOAD_DIR"), exist_ok=True)
initDB = not os.path.exists(app.config.get("DB_NAME"))
con = sqlite3.connect(app.config.get("DB_NAME"))
db = con.cursor()
if initDB:
print("Initializing new database and admin account.")
init_database()
import sys
add_user(sys.argv[1], sys.argv[2], "TRUE")
if not os.path.exists("secret_key"):
secret_key = os.urandom(64)
with open("secret_key", "wb") as file:
file.write(secret_key)
else:
with open("secret_key", "rb") as file:
secret_key = file.read()
app.secret_key = secret_key
app.run(host='0.0.0.0', port=5000)
else:
con, db = init()