425 lines
11 KiB
Python
Executable File
425 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Simple file host using Flask.
|
|
"""
|
|
import os
|
|
import time
|
|
import string
|
|
import secrets
|
|
import sqlite3
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
from passlib.hash import argon2
|
|
from flask import Flask, session, request, abort, redirect, url_for, g, \
|
|
render_template
|
|
from werkzeug.utils import secure_filename
|
|
from flask_paranoid import Paranoid
|
|
|
|
class ReverseProxied:
    """
    WSGI middleware that lets the app live behind a reverse proxy.

    Wrap the application in this middleware and configure the
    front-end server to add these headers, to let you quietly bind
    this to a URL other than / and to an HTTP scheme that is
    different than what is used locally.

    In nginx:
    location /myprefix {
        proxy_pass http://192.168.0.1:5001;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Scheme $scheme;
        proxy_set_header X-Script-Name /myprefix;
    }

    :param app: the WSGI application
    """

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        """
        Rewrite SCRIPT_NAME, PATH_INFO and wsgi.url_scheme from the proxy
        headers (when present), then delegate to the wrapped application.
        """
        script_name = environ.get('HTTP_X_SCRIPT_NAME', '')
        if script_name:
            environ['SCRIPT_NAME'] = script_name
            # A WSGI server may omit PATH_INFO entirely when it is empty,
            # so read it with a default instead of indexing.
            path_info = environ.get('PATH_INFO', '')
            if path_info.startswith(script_name):
                environ['PATH_INFO'] = path_info[len(script_name):]

        scheme = environ.get('HTTP_X_SCHEME', '')
        if scheme:
            environ['wsgi.url_scheme'] = scheme
        return self.app(environ, start_response)
|
|
|
|
|
|
class CronThread(threading.Thread):
    """
    Background thread that removes uploads whose delete_date has passed.

    Once a minute it reads every upload row with a delete_date and calls
    delete_file() for those that are due. Set the ``stop`` event to end
    the loop.
    """

    def __init__(self):
        super().__init__()
        # Daemon thread: nothing in this file ever sets ``stop``, so a
        # non-daemon thread would keep the process alive after the server
        # itself has shut down.
        self.daemon = True
        self.stop = threading.Event()

    def run(self):
        """
        Sweep expired uploads until ``stop`` is set.
        """
        while not self.stop.is_set():
            try:
                records = db_execute(
                    "SELECT filename, delete_date FROM uploads WHERE delete_date"
                ).fetchall()
                now = time.time()
                for filename, delete_date in records:
                    if now >= delete_date:
                        delete_file(filename)
            except Exception:
                # One failed sweep (locked database, permission error, ...)
                # must not end the thread; log it and retry next cycle.
                app.logger.exception("Expired-upload sweep failed")
            # Wait on the event rather than sleeping so that setting
            # ``stop`` wakes the thread immediately.
            self.stop.wait(60)
|
|
|
|
|
|
# Application object, wrapped so proxy-supplied prefix/scheme headers are
# honoured.
app = Flask(__name__)
app.wsgi_app = ReverseProxied(app.wsgi_app)

# Static configuration, set in a single call.
app.config.update(
    MAX_CONTENT_LENGTH=128 * 1024 * 1024,  # request body cap: 128 MiB
    UPLOAD_DIR="/usr/local/www/html/up",
    UPLOAD_URL="https://steelbea.me/up/",
    DB_NAME="fileHost.db",
    DB_LOCK=threading.Lock(),  # acquired by db_execute around each statement
)
|
|
|
|
def db_execute(*args, **kwargs):
    """
    Opens a connection to the app's database, executes the SQL statement
    passed to this function and returns the resulting cursor.

    All arguments are forwarded unchanged to ``sqlite3.Cursor.execute``.
    Exceptions raised by sqlite3 propagate to the caller.
    """
    # Take the lock as a context manager so it is released even when the
    # statement raises. With a manual acquire/release pair, a failing
    # statement left the lock held and the next call blocked forever.
    # The lock wraps the connection's own ``with`` block so the commit (or
    # rollback) on exit also happens while the lock is held.
    with app.config.get("DB_LOCK"):
        with sqlite3.connect(app.config.get("DB_NAME")) as con:
            cur = con.cursor()
            return cur.execute(*args, **kwargs)
|
|
|
|
|
|
def init():
    """
    Initializes the application.

    Creates the upload directory, loads (or generates and stores) the
    session secret key, makes sure both database tables exist, starts the
    cron thread and installs Flask-Paranoid.
    """
    os.makedirs(app.config.get("UPLOAD_DIR"), exist_ok=True)

    # init secret key: try to read it, and generate one if the file is
    # missing or empty (an empty key would leave sessions unusable).
    try:
        with open("secret_key", "rb") as key_file:
            secret_key = key_file.read()
    except FileNotFoundError:
        secret_key = b""
    if not secret_key:
        secret_key = os.urandom(64)
        with open("secret_key", "wb") as key_file:
            key_file.write(secret_key)
    app.secret_key = secret_key

    # init db: IF NOT EXISTS makes each statement safe on its own, so a
    # database holding only one of the two tables is completed rather than
    # failing on the table that already exists.
    db_execute("CREATE TABLE IF NOT EXISTS users("
               "id INTEGER PRIMARY KEY,"
               "username TEXT,"
               "pw_hash TEXT,"
               "admin BOOL DEFAULT FALSE,"
               "token)")

    db_execute("CREATE TABLE IF NOT EXISTS uploads("
               "filename TEXT,"
               "uploaded_by TEXT,"
               "uploaded_date INTEGER DEFAULT (STRFTIME('%s', 'now')),"
               "delete_date INTEGER)")

    # init cron thread
    cron = CronThread()
    cron.start()
    app.config["CRON_THREAD"] = cron

    # init paranoid
    app.config["paranoid"] = Paranoid(app)
    app.config["paranoid"].redirect_view = 'login'
|
|
|
|
|
|
def add_user(username, password, admin="FALSE"):
    """
    Creates a user row with an argon2 hash of the given password.

    Returns False without changing anything when the username is already
    present, True once the row has been inserted.
    """
    existing = db_execute(
        "SELECT username FROM users WHERE username = ?", (username,)
    ).fetchone()
    if existing:
        return False

    db_execute(
        "INSERT INTO users (username, pw_hash, admin) VALUES (?,?,?)",
        (username, argon2.hash(password), admin),
    )
    return True
|
|
|
|
|
|
def verify_password(username, password):
    """
    Verifies a user's password.

    On success stores the username in ``g.user`` and the admin flag in
    ``g.admin`` and returns True. Returns False when the user is unknown,
    no password was supplied, or the password does not match.
    """
    user = verify_username(username)
    # A missing form field arrives here as None; reject it up front rather
    # than handing a non-string to the hasher.
    if not user or password is None:
        return False

    # Pick the columns by position instead of unpacking the whole row: the
    # users table is created with five columns (id, username, pw_hash,
    # admin, token), so a four-name unpack of ``SELECT *`` raises
    # ValueError.
    pw_hash, admin = user[2], user[3]

    if not argon2.verify(password, pw_hash):
        return False

    g.user = username
    g.admin = admin == "TRUE"
    return True
|
|
|
|
|
|
def verify_username(username):
    """
    Looks the username up in the users table.

    Returns the matching row when there is one, otherwise False.
    """
    row = db_execute(
        "SELECT * FROM users WHERE username = ?", (username,)
    ).fetchone()
    return row if row else False
|
|
|
|
|
|
def delete_file(filename):
    """
    Deletes a file from the upload directory and from the database.

    Returns True when the file was removed from disk, False when the name
    is unusable or the file was not there. The database row is removed in
    both of the latter cases' file-missing variant, so a stale record
    cannot linger once its file is gone.
    """
    # Only plain names inside the upload directory are accepted. The name
    # can come straight from a request, and a value containing a path
    # separator (or an absolute path) would make os.path.join point
    # outside UPLOAD_DIR.
    if (not filename or filename in (".", "..")
            or os.path.basename(filename) != filename):
        return False

    path = os.path.join(app.config.get("UPLOAD_DIR"), filename)
    try:
        os.remove(path)
        removed = True
    except FileNotFoundError:
        removed = False

    # Drop the record whether or not the file was still on disk.
    db_execute("DELETE FROM uploads WHERE filename = ?", (filename,))
    return removed
|
|
|
|
|
|
|
|
@app.route("/delete_file", methods=["POST"])
def deleteFile():
    """
    Allows a user to delete a file from the upload directory and the database.

    Reads the logged-in user from the session and the target from the
    ``fname`` form field. Admins may delete any file; other users only
    files they uploaded. Responds 401 when not logged in or not the
    uploader, otherwise a short status string.
    """
    username = session.get("username")
    filename = request.form.get("fname")

    user = verify_username(username)
    if not user:
        abort(401)
    if not filename:
        return "Error: File not found."

    # Take the admin flag from the user row (column 3 of the users table).
    # ``g.admin`` is only assigned by verify_password, which is not called
    # on this session-authenticated path, so reading it here raised
    # AttributeError.
    is_admin = user[3] == "TRUE"
    if not is_admin:
        row = db_execute(
            "SELECT uploaded_by FROM uploads WHERE filename=?",
            (filename,)).fetchone()
        if row is None:
            # Unknown file: report it instead of indexing into None.
            return "Error: File not found."
        if row[0] != username:
            abort(401)

    if delete_file(filename):
        return "Success"
    return "Error: File not found."
|
|
|
|
|
|
@app.route("/add_user", methods=["POST"])
def addUser():
    """
    Allows an admin to add a user via API POST. No frontend allowed.

    Form fields: ``user``/``pass`` authenticate the caller, ``new_user``/
    ``new_pass`` describe the account to create, and the optional
    ``admin`` value is stored as the new account's admin flag (default
    "FALSE"). Responds 401 for bad credentials or a non-admin caller and
    400 when the new account's name or password is missing.
    """
    username = request.form.get("user")
    password = request.form.get("pass")
    new_username = request.form.get("new_user")
    new_password = request.form.get("new_pass")
    admin = request.form.get("admin") or "FALSE"

    # A missing ``pass`` field is simply a failed login.
    if not password or not verify_password(username, password):
        abort(401)
    if not g.admin:
        abort(401)

    # Without this check a missing field reaches add_user as None and
    # either crashes the hasher or creates a row with a NULL username.
    if not new_username or not new_password:
        abort(400)

    if add_user(new_username, new_password, admin):
        return "Success"
    return "Username already exists."
|
|
|
|
|
|
@app.route("/logout", methods=["POST", "GET"])
def logout():
    """
    Logs the user out and removes his session cookie.

    Always redirects to the login page.
    """
    # Pop with a default so a visitor who is not logged in is redirected
    # instead of triggering a KeyError.
    session.pop("username", None)
    return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/change_password", methods=["POST", "GET"])
def change_password():
    """
    Allows the user to change their password.

    GET renders the form. POST checks the current password and that both
    copies of the new password agree, stores the new hash, logs the user
    out and redirects to the login page. Responds 401 when not logged in.
    """
    username = session.get("username")
    if not verify_username(username):
        abort(401)

    if request.method == "GET":
        return render_template("change_password.html")

    current_password = request.form.get("current_password")
    new_password = request.form.get("new_password")
    new_password_verify = request.form.get("new_password_verify")

    # A missing field arrives as None; treat it as a mismatch instead of
    # passing it to the hasher.
    if not current_password or not verify_password(username, current_password):
        return "The current password does not match!"

    if new_password != new_password_verify:
        return "The new passwords do not match!"

    # Both fields absent (None == None) or both empty would otherwise get
    # past the comparison above.
    if not new_password:
        return "The new password must not be empty!"

    pw_hash = argon2.hash(new_password)
    db_execute("UPDATE users SET pw_hash = ? WHERE username = ?",
               (pw_hash, username))
    session.pop("username", None)
    return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/login", methods=["POST", "GET"])
def login():
    """
    Logs the user in.

    GET renders the login form. POST checks the ``user``/``pass`` form
    fields; on success the username is stored in the session and the
    client is redirected to the index page, otherwise the response is 401.
    """
    if request.method == "GET":
        return render_template("login.html")

    username = request.form.get("user")
    password = request.form.get("pass")

    # Missing or empty fields are a failed login, not a server error.
    if not username or not password:
        abort(401)
    if not verify_password(username, password):
        abort(401)

    session["username"] = username
    return redirect(url_for("index"))
|
|
|
|
|
|
@app.route("/manage_uploads", methods=["POST", "GET"])
def manage_uploads():
    """
    Allows the user to view and/or delete uploads they've made.

    GET renders the user's uploads with formatted upload dates. POST
    treats every form field name except ``submit`` as a filename to
    delete. Responds 401 when not logged in or when any requested file
    belongs to another user.
    """
    username = session.get("username")
    if not verify_username(username):
        abort(401)

    if request.method == "GET":
        rows = db_execute(
            "SELECT filename, uploaded_date FROM uploads WHERE uploaded_by = ?",
            (username,)).fetchall()
        uploads = [
            (fname, datetime.fromtimestamp(stamp).strftime("%Y-%m-%d %H:%M"))
            for fname, stamp in rows
        ]
        return render_template("manage_uploads.html", uploads=uploads,
                               upload_url=app.config.get("UPLOAD_URL"))

    # Filter "submit" out instead of list.remove(), which raises
    # ValueError when the field is absent.
    requested = [fname for fname in request.form if fname != "submit"]

    # Check ownership of every file before deleting any, so a rejected
    # request does not leave a partial delete behind.
    owned = []
    for filename in requested:
        row = db_execute(
            "SELECT uploaded_by FROM uploads WHERE filename=?",
            (filename,)).fetchone()
        if row is None:
            # Already gone (e.g. expired since the page was rendered).
            continue
        if row[0] != username:
            abort(401)
        owned.append(filename)

    for filename in owned:
        delete_file(filename)
    return redirect(url_for("manage_uploads"))
|
|
|
|
|
|
@app.route("/gallery/<path:username>", methods=["GET"])
def gallery(username):
    """
    Renders the public gallery page listing every upload of the given user.

    Responds with a plain message when the username is not in the
    database.
    """
    if not verify_username(username):
        return "User not found, or user has gallery disabled."

    rows = db_execute(
        "SELECT filename, uploaded_date FROM uploads WHERE uploaded_by = ?",
        (username,)).fetchall()

    # Turn each stored epoch timestamp into a display string.
    listing = [
        (name, datetime.fromtimestamp(stamp).strftime("%Y-%m-%d %H:%M"))
        for name, stamp in rows
    ]
    return render_template("gallery.html", uploads=listing, user=username,
                           upload_url=app.config.get("UPLOAD_URL"))
|
|
|
|
|
|
def _parse_delete_time(form):
    """
    Works out the auto-delete timestamp requested by an upload form.

    Returns ``(delete_date, error)``. ``delete_date`` is an epoch
    timestamp, or None when ``delflag`` is not "on". ``error`` is a
    message for the client when ``delnum``/``deltype`` are invalid,
    otherwise None.
    """
    from html import escape

    if form.get("delflag") != "on":
        return None, None

    raw_num = form.get("delnum")
    try:
        amount = int(raw_num)
    except (TypeError, ValueError):
        # TypeError covers a missing field (None).
        amount = None
    # Explicit range check; an ``assert`` would be stripped under -O.
    if amount is None or not 1 <= amount <= 59:
        # The value is echoed into an HTML response, so escape it.
        return None, 'Invalid entry: "delnum=' + escape(str(raw_num)) + '"'

    seconds_per_unit = {"minute": 60, "hour": 3600, "day": 3600*24,
                        "week": 3600*24*7}
    raw_type = form.get("deltype")
    if raw_type not in seconds_per_unit:
        return None, 'Invalid entry: "deltype=' + escape(str(raw_type)) + '"'

    return int(time.time()) + amount * seconds_per_unit[raw_type], None


@app.route("/", methods=["POST", "GET"])
def index():
    """
    Saves the uploaded file and returns a URL pointing to it.

    A logged-in session is accepted as-is; without one, GET redirects to
    the login page and POST must carry ``user``/``pass`` form fields (401
    on failure). GET renders the upload page. POST stores every file in
    the ``file`` field under a random 8-character prefix (or a fully
    random name when ``randname`` is "on"), records it in the database
    with an optional delete date, and returns the resulting URLs joined
    by ``<br />``.
    """
    username = session.get("username")
    if not username:
        if request.method == "GET":
            return redirect(url_for("login"))

        username = request.form.get("user")
        password = request.form.get("pass")
        # Missing credentials are a failed login, not a server error.
        if not (username and password
                and verify_password(username, password)):
            abort(401)
    elif not verify_username(username):
        abort(401)

    if request.method == "GET":
        return render_template("index.html")

    # The delete options apply to the whole request, so validate them
    # once, before any file is written.
    delete_time, error = _parse_delete_time(request.form)
    if error:
        return error

    fdir = app.config.get("UPLOAD_DIR")
    random_name = request.form.get("randname") == "on"

    urls = []
    for file in request.files.getlist("file"):
        if not file.filename:
            # Empty file field (form submitted without choosing a file).
            continue

        fname = secure_filename(file.filename)
        pre = get_rand_chars(8)
        if random_name:
            fname = pre + os.path.splitext(fname)[1]
        else:
            fname = pre + "_" + fname

        # Save first, record second: a failed save then leaves no database
        # row pointing at a file that does not exist.
        file.save(os.path.join(fdir, fname))

        if delete_time is not None:
            db_execute(
                "INSERT INTO uploads (filename, uploaded_by, delete_date) "
                "VALUES (?,?,?)", (fname, username, delete_time))
        else:
            db_execute("INSERT INTO uploads (filename, uploaded_by) VALUES (?,?)",
                       (fname, username))

        urls.append(app.config.get("UPLOAD_URL") + fname)
    return "<br />".join(urls)
|
|
|
|
|
|
def get_rand_chars(n):
    """
    Builds a string of n characters, each picked with ``secrets.choice``
    from the ASCII letters (both cases) and the digits.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(n))
|
|
|
|
|
|
# Runs on import as well as on direct execution, so the app is ready when
# served by an external WSGI server.
init()
if __name__ == "__main__":
    import sys

    # Optional bootstrap: "<script> USERNAME PASSWORD" creates an admin
    # account before the development server starts. Both values are
    # required; checking only for "more than one argument" raised
    # IndexError when the password was left out.
    cli_args = sys.argv[1:]
    if len(cli_args) >= 2:
        add_user(cli_args[0], cli_args[1], "TRUE")
    elif cli_args:
        sys.exit("usage: " + sys.argv[0] + " [ADMIN_USERNAME ADMIN_PASSWORD]")

    app.run(host='0.0.0.0', port=5000)
|