Overwrought/overwrought_server.py

107 lines
2.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
The overwrought server, for exchanging mods from master to slave recipients.
"""
import os
import json
import sqlite3
import threading
from passlib.hash import argon2
from flask import Flask, send_file, request, abort
import config_server
# WSGI application object; the route decorators below register against it.
app = Flask(__name__)
# A single threading.Lock kept in the app config. get() and post() acquire
# it around their access to modpack.db.
app.config['db_lock'] = threading.Lock()
# Create a 'mods' directory in the current working directory if it is
# missing (no error if it already exists).
# NOTE(review): post() saves jars under config_server.mods_dir rather than
# this literal 'mods' -- confirm both refer to the same directory.
os.makedirs('mods', exist_ok=True)
def init_db():
    """
    If no database is found, a dummy database is created to allow the
    initial modpack upload to go smoothly.

    Opens (creating if necessary) ``modpack.db`` in the current working
    directory and makes sure it contains a ``mod`` table with a single
    ``filename`` TEXT column. An existing ``mod`` table and its rows are
    left untouched, so calling this repeatedly is harmless.
    """
    con = sqlite3.connect('modpack.db')
    try:
        # IF NOT EXISTS replaces the old "SELECT, and CREATE on
        # OperationalError" probe, which also fired on unrelated errors.
        con.execute("CREATE TABLE IF NOT EXISTS mod(filename TEXT)")
        con.commit()
    finally:
        # Always release the connection, even if the statement failed.
        con.close()


# Run at import time so the table exists before the first request.
init_db()
@app.route('/')
def index():
    """
    Serve the mod summary page as the site's landing page.

    The file sent is ``<modpack_name>.html``, where the name comes from
    ``config_server.modpack_name``.
    """
    summary_page = config_server.modpack_name + '.html'
    return send_file(summary_page)
@app.route('/get')
def get():
    """
    Returns all mod file entries contained in the database as JSON.
    Determining which files to update is left as an exercise up to the
    client.

    The response body is a JSON array holding the ``filename`` value of
    every row in the ``mod`` table of ``modpack.db``.
    """
    # Hold the lock only for the database read. The ``with`` block and the
    # ``finally`` ensure the lock is released and the connection is closed
    # even when the query raises, so one failed request cannot block all
    # later ones.
    with app.config['db_lock']:
        con = sqlite3.connect('modpack.db')
        try:
            # The cursor comes from the connection; the sqlite3 module
            # itself has no cursor() function.
            cur = con.cursor()
            rows = cur.execute('SELECT filename FROM mod').fetchall()
        finally:
            con.close()
    # Each row is a 1-tuple; unwrap it to a flat list of filenames.
    return json.dumps([row[0] for row in rows])
@app.route('/post', methods=['POST'])
def post():
    """
    Allows the modpack owner to upload new mods and a new database to the
    server. Owner must be validated.

    Expected multipart form content:
      * ``password`` -- form field verified against
        ``config_server.pw_hash``.
      * ``database`` -- file that replaces ``modpack.db``.
      * ``<modpack_name>.html`` -- file that replaces the summary page.
      * any number of files whose field name ends in ``.jar``; each one is
        saved into ``config_server.mods_dir`` under its field name.

    Aborts with 401 when the password is missing or wrong, and with 400
    when a required file is missing or a jar field name contains directory
    components. Nothing is written to disk unless all checks pass.

    Returns a short plain-text body on success.
    """
    password = request.form.get('password')
    # A missing field yields None, which is not a valid secret to verify;
    # treat it as a failed login instead of letting the request error out.
    if password is None or not argon2.verify(password, config_server.pw_hash):
        abort(401)

    summary_name = config_server.modpack_name + '.html'
    db = request.files.get('database')
    summary = request.files.get(summary_name)
    if db is None or summary is None:
        abort(400)

    jars = [
        (fname, file)
        for fname, file in request.files.items()
        if fname.endswith('.jar')
    ]
    # The field name is chosen by the client and becomes part of a path on
    # disk, so refuse anything that is not a bare file name.
    for fname, _ in jars:
        if os.path.basename(fname.replace('\\', '/')) != fname:
            abort(400)

    # Replace the database under the lock so get() never reads a file that
    # is only partly written; ``with`` releases the lock if save() raises.
    with app.config['db_lock']:
        db.save('modpack.db')
    summary.save(summary_name)
    for fname, file in jars:
        file.save(os.path.join(config_server.mods_dir, fname))

    # A Flask view must return a response; returning None is an error.
    return 'OK'
def generate_hash(password):
    """
    Hash ``password`` with the argon2 handler and return the result.
    """
    return argon2.hash(password)
if __name__ == "__main__":
    # Command-line helper: the only supported action is 'hash', which
    # prints the argon2 hash of the given target string.
    import argparse

    cli = argparse.ArgumentParser(
        description="The overwrought server. Use gunicorn to start."
    )
    cli.add_argument('action', choices=['hash'], help="What action to perform.")
    cli.add_argument('target', help="The target to perform the action on.")
    options = cli.parse_args()

    if options.action == "hash":
        print(generate_hash(options.target))