fileHost/fileHost.py

481 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Simple file host using Flask.
"""
import os
import re
import time
import atexit
import string
import secrets
import sqlite3
import tempfile
import functools
import threading
from datetime import datetime
from passlib.hash import argon2
from flask import Flask, session, request, abort, redirect, url_for, g, \
render_template
from werkzeug.utils import secure_filename
from werkzeug.datastructures import FileStorage
from flask_paranoid import Paranoid
from apscheduler.schedulers.background import BackgroundScheduler
import requests
import buckler_flask
app = Flask(__name__)
app.session_interface = buckler_flask.BucklerSessionInterface()
app.before_request(buckler_flask.require_auth)
app.config['MAX_CONTENT_LENGTH'] = 128 * 1024 * 1024
app.config["UPLOAD_DIR"] = "/var/www/html/up"
app.config["UPLOAD_URL"] = "https://steelbea.me/up/"
app.config["DB_NAME"] = "fileHost.db"
app.config["DB_LOCK"] = threading.Lock()
paranoid = Paranoid(app)
paranoid.redirect_view = 'login'
scheduler = BackgroundScheduler(timezone="America/New_York")
scheduler.start()
def db_execute(*args, **kwargs):
"""
Opens a connection to the app's database and executes the SQL statements
passed to this function.
"""
with sqlite3.connect(app.config.get("DB_NAME")) as con:
app.config.get("DB_LOCK").acquire()
cur = con.cursor()
res = cur.execute(*args, **kwargs)
app.config.get("DB_LOCK").release()
return res
@scheduler.scheduled_job("interval", minutes=1)
def delete_this():
"""
Removes files that are past the expiration date.
"""
records = db_execute(
"SELECT filename, delete_date FROM uploads WHERE delete_date"
).fetchall()
for filename, delete_date in records:
if time.time() >= delete_date:
delete_file(filename)
def init():
"""
Initializes the application.
"""
os.makedirs(app.config.get("UPLOAD_DIR"), exist_ok=True)
# init secret key
if os.path.exists("secret_key"):
with open("secret_key", "rb") as file:
secret_key = file.read()
else:
secret_key = os.urandom(64)
with open("secret_key", "wb") as file:
file.write(secret_key)
app.secret_key = secret_key
# init db
try:
db_execute("SELECT * FROM users").fetchone()
db_execute("SELECT * FROM uploads").fetchone()
except sqlite3.OperationalError:
db_execute("CREATE TABLE users("
"id INTEGER PRIMARY KEY,"
"username TEXT,"
"pw_hash TEXT,"
"admin BOOL DEFAULT FALSE)")
db_execute("CREATE TABLE uploads("
"filename TEXT,"
"uploaded_by TEXT,"
"uploaded_date INTEGER DEFAULT (STRFTIME('%s', 'now')),"
"delete_date INTEGER)")
def add_user(username, password, admin="FALSE"):
"""
Adds a user to the database.
"""
u = db_execute("SELECT username FROM users WHERE username = ?",
(username,)).fetchone()
if u:
return False
pw_hash = argon2.hash(password)
db_execute("INSERT INTO users (username, pw_hash, admin) VALUES (?,?,?)",
(username, pw_hash, admin))
return True
def verify_password(username, password):
"""
Verifies a user's password.
"""
user = verify_username(username)
if not user:
return False
_, _, pw_hash, admin = user
if argon2.verify(password, pw_hash):
g.user = username
g.admin = admin == "TRUE"
return True
else:
return False
def verify_username(username):
"""
Checks to see if the given username is in the database.
"""
user = db_execute("SELECT * FROM users WHERE username = ?",
(username,)).fetchone()
if user:
return user
else:
return False
def delete_file(filename):
"""
Deletes a file from the upload directory and from the database.
"""
try:
os.remove(os.path.join(app.config.get("UPLOAD_DIR"), filename))
db_execute("DELETE FROM uploads WHERE filename = ?", (filename,))
except FileNotFoundError:
return False
return True
def login_required(url=None):
"""
A decorator function to protect certain endpoints by requiring the user
to either pass a valid session cookie, or pass their username and
password along with the request to login.
"""
def actual_decorator(func):
@functools.wraps(func)
def _nop(*args, **kwargs):
username = session.meta['username']
if verify_username(username):
return func(*args, **kwargs)
username = request.form.get("user")
password = request.form.get("pass")
if verify_password(username, password):
session["username"] = username
return func(*args, **kwargs)
if url:
return redirect(url_for(url))
else:
abort(401)
return _nop
return actual_decorator
@app.route("/delete_file", methods=["POST"])
def deleteFile():
"""
Allows a user to delete a file from the upload directory and the database.
"""
username = session.meta['username']
filename = request.form.get("fname")
if not verify_username(username):
abort(401)
if not g.admin:
uploader = db_execute(
"SELECT uploaded_by FROM uploads WHERE filename=?",
(filename,)).fetchone()[0]
if uploader != username:
abort(401)
res = delete_file(filename)
if res:
return "Success"
else:
return "Error: File not found."
@app.route("/add_user", methods=["POST"])
def addUser():
"""
Allows an admin to add a user via API POST. No frontend allowed.
"""
username = request.form.get("user")
password = request.form.get("pass")
new_username = request.form.get("new_user")
new_password = request.form.get("new_pass")
admin = request.form.get("admin") or "FALSE"
if not verify_password(username, password):
abort(401)
if not g.admin:
abort(401)
res = add_user(new_username, new_password, admin)
if res:
return "Success"
else:
return "Username already exists."
@app.route("/logout", methods=["GET"])
def logout():
"""
Logs the user out and removes his session cookie.
"""
session.pop("username")
return redirect(url_for("login"))
@app.route("/change_password", methods=["POST", "GET"])
#@login_required()
def change_password():
"""
Allows the user to change their password.
"""
if request.method == "GET":
return render_template("change_password.html")
username = session.meta['username']
current_password = request.form.get("current_password")
new_password = request.form.get("new_password")
new_password_verify = request.form.get("new_password_verify")
if not verify_password(username, current_password):
return "The current password does not match!"
if new_password != new_password_verify:
return "The new passwords do not match!"
pw_hash = argon2.hash(new_password)
db_execute("UPDATE users SET pw_hash = ? WHERE username = ?",
(pw_hash, username))
session.pop("username")
return redirect(url_for("login"))
@app.route("/login", methods=["POST", "GET"])
def login():
"""
Logs the user in.
"""
if request.method == "GET":
return render_template("login.html")
username = request.form.get("user")
password = request.form.get("pass")
if verify_password(username, password):
session["username"] = username
return redirect(url_for("index"))
else:
abort(401)
@app.route("/manage_uploads", methods=["POST", "GET"])
#@login_required()
def manage_uploads():
"""
Allows the user to view and/or delete uploads they've made.
"""
username = session.meta['username']
if request.method == "GET":
uploads = db_execute(
"SELECT filename, uploaded_date FROM uploads WHERE uploaded_by = ?",
(username,)).fetchall()
new_uploads = []
for file, date in uploads:
date = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M")
new_uploads.append((file, date))
return render_template("manage_uploads.html", uploads=new_uploads,
upload_url=app.config.get("UPLOAD_URL"))
deletes = [fname for fname,_ in request.form.items()]
deletes.remove("submit")
for filename in deletes:
uploader = db_execute(
"SELECT uploaded_by FROM uploads WHERE filename=?",
(filename,)).fetchone()[0]
if uploader != username:
abort(401)
delete_file(filename)
return redirect(url_for("manage_uploads"))
@app.route("/gallery/<path:username>", methods=["GET"])
def gallery(username):
"""
Displays a publicly accessable gallery of files the user has uploaded.
"""
if not verify_username(username):
return "User not found, or user has gallery disabled."
uploads = db_execute(
"SELECT filename, uploaded_date FROM uploads WHERE uploaded_by = ?",
(username,)).fetchall()
new_uploads = []
for file, date in uploads:
date = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M")
new_uploads.append((file, date))
return render_template("gallery.html", uploads=new_uploads, user=username,
upload_url=app.config.get("UPLOAD_URL"))
@app.route("/upload", methods=["POST"])
#@login_required("login")
def upload():
"""
Saves the uploaded files and returns URLs pointing to them.
"""
username = session.meta['username']
if request.form.get("url"):
try:
f = [download_file(request.form.get("url"))]
except ValueError as e:
return str(e)
if request.form.get("text"):
try:
f = [text_paste(request.form.get("text"))]
except Exception as e:
return str(e)
else:
f = []
urls = []
for file in request.files.getlist("files") + f:
fname = secure_filename(file.filename)
pre = get_rand_chars(8)
fdir = app.config.get("UPLOAD_DIR")
if request.form.get("randname") == "on" or request.form.get("text"):
fname = pre + os.path.splitext(fname)[1]
if request.form.get("text"):
fname += ".txt"
else:
fname = pre + "_" + fname
if request.form.get("delflag") == "on":
try:
delete_time = int(request.form.get("delnum"))
assert delete_time >= 1 and delete_time <= 59
except (ValueError, AssertionError):
return 'Invalid entry: "delnum=' + request.form.get("delnum") + '"'
del_dict = {"minute": 60, "hour": 3600, "day": 3600*24, "week": 3600*24*7}
try:
delete_time *= del_dict[request.form.get("deltype")]
except KeyError:
return 'Invalid entry: "deltype=' + request.form.get("deltype")+'"'
delete_time = int(time.time()) + delete_time
db_execute(
"INSERT INTO UPLOADS (filename, uploaded_by, delete_date)"
"VALUES (?,?,?)", (fname, username, delete_time))
else:
db_execute("INSERT INTO uploads (filename, uploaded_by) VALUES (?,?)",
(fname, username))
file.save(os.path.join(fdir, fname))
url = app.config.get("UPLOAD_URL") + fname
urls.append(url)
if request.form.get("html"):
return render_template("result.html", urls=urls)
else:
return "\n".join(urls)
@app.route("/", methods=["GET", "POST"])
#@login_required("login")
def index():
"""
Saves the uploaded file and returns a URL pointing to it.
"""
if request.method == "GET":
return render_template("index.html")
else:
# so I can point my cli script to /upload/ instead of /upload/upload/
return upload()
def get_rand_chars(n):
"""
Returns n number of random characters. Character set includes lowercase
and uppercase ascii letters and digits.
"""
chars = []
for _ in range(n):
char = secrets.choice(string.ascii_letters + string.digits)
chars.append(char)
return "".join(chars)
def download_file(url):
"""
Downloads the file at the given url while observing file size and
timeout limitations.
"""
timeout = 10
requests_kwargs = {
'stream': True,
'headers': {'User-Agent': "Steelbea.me LTD needs YOUR files."},
'timeout': timeout,
'verify': True
}
t = tempfile.TemporaryFile()
with requests.get(url, **requests_kwargs) as r:
size = 0
start_time = time.time()
for chunk in r.iter_content(102400):
if time.time() - start_time > timeout:
raise ValueError('timeout reached')
size += len(chunk)
if size > app.config['MAX_CONTENT_LENGTH']:
raise ValueError('response too large')
t.write(chunk)
if r.headers.get('Content-Disposition'):
fname = re.search(r'filename="(.+)"',
r.headers['content-disposition'])
else:
fname = os.path.basename(url)
t.seek(0)
f = FileStorage(stream=t, filename=fname, name='url')
return f
def text_paste(text):
"""
Simple wrapper to handle a text paste upload.
"""
t = tempfile.TemporaryFile()
t.write(bytes(text, encoding='utf8'))
t.seek(0)
f = FileStorage(stream=t, filename="", name='text')
return f
init()
atexit.register(scheduler.shutdown)
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
add_user(sys.argv[1], sys.argv[2], "TRUE")
app.run(host='0.0.0.0', port=5000)