239 lines
6.0 KiB
Python
Executable File
239 lines
6.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Music streaming.
|
|
"""
|
|
import os
|
|
import json
|
|
import random
|
|
import subprocess
|
|
from urllib import parse
|
|
|
|
from flask import Flask, Response, render_template, send_file, url_for
|
|
from flask_restful import reqparse, abort, Api, Resource
|
|
import mutagen
|
|
|
|
# Root directory that is scanned for audio files and exposed to clients.
MUSIC_DIR = "/mnt/music/Music"

# File extensions (without the leading dot) that count as music.
MUSIC_EXT = ['flac', 'mp3', 'wav', 'm4a']

# Argument vector used to transcode a track with ffmpeg.  The empty string
# at index 5 (right after '-i') is the placeholder for the input file path;
# the trailing '-' is the output target, read back through the process's
# stdout by the streaming view.
FFMPEG_CMD = [
    'ffmpeg',
    '-y',
    '-loglevel', 'panic',
    '-i', '',
    '-codec:a', 'libopus',
    '-b:a', '64k',
    '-f', 'opus',
    '-',
]
|
|
|
|
class Track:
    """
    A single audio track and the metadata shown to clients.

    When built from a file the instance carries ``tracknumber``, ``title``,
    ``artist``, ``album`` and ``date`` (first tag value, or ``''`` when the
    tag is missing), ``length`` (``"M:SS"``) and ``filepath``.  When built
    from a dict, the dict's items become the instance attributes verbatim.
    """

    def __init__(self, filepath=None, d=None):
        """
        Create a track from a cached dict or by reading tags from a file.

        filepath -- path of the audio file to read tags from.
        d -- mapping of attribute names to values (e.g. a record loaded
             from the JSON library cache); when non-empty it takes
             precedence and the file is not touched.

        Raises ValueError when neither source is usable or the file format
        is not recognised.
        """
        if d:
            for attr, value in d.items():
                setattr(self, attr, value)
            return

        if filepath is None:
            raise ValueError("Track needs either a filepath or a dict.")

        if filepath.lower().endswith("mp3"):
            # ``import mutagen`` alone does not guarantee that the
            # ``mutagen.mp3`` submodule is loaded, so import it explicitly.
            from mutagen.mp3 import EasyMP3
            m = EasyMP3(filepath)
        else:
            # easy=True asks for the "easy" tag wrappers where they exist,
            # so formats such as m4a expose the same 'title'/'artist'/...
            # keys that are read below.
            m = mutagen.File(filepath, easy=True)
        if m is None:
            # mutagen.File returns None when it cannot identify the format.
            raise ValueError("Unsupported audio file: " + str(filepath))

        self.tracknumber = m.get('tracknumber', [''])[0]
        self.title = m.get('title', [''])[0]
        self.artist = m.get('artist', [''])[0]
        self.album = m.get('album', [''])[0]
        self.date = m.get('date', [''])[0]
        self.length = self._format_length(m.info.length)
        self.filepath = filepath

    @staticmethod
    def _format_length(seconds):
        """Format a duration in seconds as ``M:SS`` (seconds zero-padded)."""
        minutes, secs = divmod(int(seconds), 60)
        return "{}:{:02d}".format(minutes, secs)
|
|
|
|
|
|
def build_library(root_dir):
    """
    Walks the music directory and builds a library of tracks.

    root_dir -- directory to scan recursively.

    Returns a list with one Track per file whose extension (compared
    case-insensitively) is listed in MUSIC_EXT, in os.walk order.
    """
    print("Building library")
    tracks = []
    for dir_name, _sub_dirs, files in os.walk(root_dir):
        for file in files:
            extension = os.path.splitext(file)[1][1:].lower()
            if extension not in MUSIC_EXT:
                continue
            # os.walk already yields dir_name prefixed with root_dir, so
            # joining root_dir again would duplicate it for relative roots.
            filepath = os.path.join(dir_name, file)
            tracks.append(Track(filepath))
    print("Done")
    return tracks
|
|
|
|
|
|
def init_library():
    """
    Return the track library, preferring the on-disk JSON cache.

    If "library.json" exists in the current working directory, its records
    are turned back into Track objects.  Otherwise the music directory is
    scanned and the result is written to "library.json" for next time.
    """
    cache_file = "library.json"

    if not os.path.isfile(cache_file):
        # No cache yet: scan the music directory and persist the result.
        library = build_library(MUSIC_DIR)
        with open(cache_file, "w") as handle:
            handle.write(json.dumps([vars(entry) for entry in library]))
        return library

    with open(cache_file, "r") as handle:
        records = json.loads(handle.read())
    return [Track(d=record) for record in records]
|
|
|
|
|
|
# Flask application object; the route decorators in this module register
# their views on it.
app = Flask(__name__)

# flask_restful wrapper around the app, used to expose Resource classes.
api = Api(app)

# Library shared by all views; loaded from cache or built once at import.
tracks = init_library()
|
|
|
|
@app.route('/')
def index():
    """Render index.html with the sorted list of distinct artists."""
    artists = sorted({entry.artist for entry in tracks})
    return render_template('index.html', artists=artists)
|
|
|
|
|
|
# Request arguments accepted by the Selection resource.
parser = reqparse.RequestParser()
for _field in ('artist', 'album', 'track'):
    parser.add_argument(_field)
del _field
|
|
|
|
def validate_select_args(args):
    """
    Abort with HTTP 400 unless the selection arguments are consistent.

    A track needs both its artist and album, an album needs its artist,
    and at the very least an artist has to be given.
    """
    artist = args.get('artist')
    album = args.get('album')
    track = args.get('track')

    if track:
        if not (artist and album):
            abort(400, message="Artist and album must also be specified.")
        return

    if album:
        if not artist:
            abort(400, message="Artist must also be specified.")
        return

    if not artist:
        abort(400, message="You must specify at least an artist.")
|
|
|
|
class Selection(Resource):
    """
    REST resource for drilling down into the library.

    Depending on which of the artist/album/track arguments are supplied,
    GET returns an artist's albums, an album's tracks, or one track's
    metadata.
    """

    @staticmethod
    def _track_order(track):
        """
        Sort key that orders tracks by numeric track number, then title.

        Only the part before an optional '/' is parsed (so "3/12" counts
        as 3); anything that is not a number sorts first, as 0.
        """
        head = str(track.tracknumber).split('/')[0].strip()
        try:
            number = int(head)
        except ValueError:
            number = 0
        return (number, track.title)

    def get(self):
        """
        Return the selection described by the request arguments.

        - artist, album and track: dict of the track's attributes without
          'filepath', plus 'streampath' (URL of the stream view).
        - artist and album: list of "<tracknumber> - <title>" strings in
          track order.
        - artist only: sorted list of that artist's album names.

        Aborts with 400 on inconsistent arguments and 404 when nothing
        matches.
        """
        args = parser.parse_args()
        validate_select_args(args)
        artist = args.get('artist')
        album = args.get('album')
        title = args.get('track')

        if title:
            for track in tracks:
                if (track.artist == artist and
                        track.album == album and
                        track.title == title):
                    break
            else:
                abort(404, message="Track does not exist.")
            found = dict(vars(track))
            # The server-side path is never exposed to clients.
            found.pop('filepath', None)
            found['streampath'] = url_for(
                'stream',
                artist=track.artist,
                album=track.album,
                track=track.title)
            return found

        if album:
            found = [t for t in tracks
                     if t.artist == artist and t.album == album]
            if not found:
                abort(404, message="Album does not exist.")
            # Sort numerically: sorting the formatted strings would put
            # track "10" before track "2".
            found.sort(key=self._track_order)
            return [t.tracknumber + " - " + t.title for t in found]

        # validate_select_args guarantees an artist at this point.
        albums = {t.album for t in tracks if t.artist == artist}
        if not albums:
            abort(404, message="Artist does not exist.")
        return sorted(albums)
|
|
|
|
# Serve the Selection resource at /select.
api.add_resource(Selection, '/select')
# NOTE(review): api was already constructed with app above, so this second
# initialisation looks redundant -- confirm before removing.
api.init_app(app)
|
|
|
|
|
|
@app.route('/stream/<artist>/<album>/<track>')
def stream(artist, album, track):
    """
    Stream the requested track, transcoded on the fly by ffmpeg.

    artist, album, track -- values that must exactly match a library
    entry's artist, album and title.

    Returns a streaming Response with mimetype "audio/ogg"; aborts with
    404 when no library entry matches.
    """
    for t in tracks:
        if (t.artist == artist and
                t.album == album and
                t.title == track):
            break
    else:
        abort(404, message="Track does not exist.")

    # Build a per-request command.  The generator below only starts ffmpeg
    # once the response is iterated, so writing the path into the shared
    # FFMPEG_CMD list would let a concurrent request swap the input file.
    cmd = list(FFMPEG_CMD)
    cmd[5] = t.filepath

    def generate():
        """Yield ffmpeg's stdout in 1 KiB chunks until it is exhausted."""
        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
            data = proc.stdout.read(1024)
            while data:
                yield data
                data = proc.stdout.read(1024)

    return Response(generate(), mimetype="audio/ogg")
|
|
|
|
|
|
@app.route('/get_dir/<path:directory>/')
@app.route('/get_dir/')
def get_dir(directory=""):
    """
    Returns the contents of the requested directory.

    directory -- path relative to MUSIC_DIR; the token "DOTDOT" stands in
    for ".." so clients can navigate upwards.

    Returns a JSON list whose first element is the directory's path
    relative to MUSIC_DIR (with a trailing '/'), followed by ".." (except
    at the root) and the sorted entries, directories suffixed with '/'.
    Returns the string "False" when the path leaves MUSIC_DIR or is not a
    directory.
    """
    root = os.path.abspath(MUSIC_DIR)
    directory = directory.replace("DOTDOT", "..")
    directory = os.path.abspath(os.path.join(root, directory))

    # Compare whole path components: a bare startswith(root) would also
    # accept sibling directories such as "<root>2".
    if directory != root and not directory.startswith(root + os.sep):
        return "False"
    if not os.path.isdir(directory):
        return "False"

    nav_items = sorted(os.listdir(directory))
    if directory != root:
        nav_items.insert(0, "..")

    # Strip only the leading root; str.replace would also remove any later
    # occurrence of the same text inside the path.
    listing = [directory[len(root):] + '/']
    for item in nav_items:
        if os.path.isdir(os.path.join(directory, item)):
            item += '/'
        listing.append(item)

    return json.dumps(listing)
|
|
|
|
|
|
@app.route('/get_shuffle')
def shuffle():
    """
    Returns the path of a randomly selected music file.

    Performs a random walk starting at MUSIC_DIR: a random entry is picked
    at each step, directories are descended into, and the first file with
    an extension in MUSIC_EXT wins.  A walk that has not found a file
    after a handful of picks (or hits an empty/unreadable directory) is
    restarted from the top.

    Returns the file's path relative to MUSIC_DIR (with a leading '/').
    Aborts with 404 if no music file is found after many restarts, rather
    than looping forever on a library without music.
    """
    picks_per_walk = 6
    max_walks = 100

    for _ in range(max_walks):
        path = MUSIC_DIR
        for _ in range(picks_per_walk):
            try:
                entries = os.listdir(path)
            except OSError:
                break
            if not entries:
                # random.choice would raise on an empty directory.
                break
            item = random.choice(entries)
            candidate = os.path.join(path, item)
            if os.path.isdir(candidate):
                path = candidate
            elif os.path.splitext(item)[1][1:] in MUSIC_EXT:
                # candidate always starts with MUSIC_DIR; strip the prefix.
                return candidate[len(MUSIC_DIR):]
            # Otherwise a non-music file: pick again in the same directory.

    abort(404, message="No track found.")
|
|
|
|
|
|
@app.route('/album_cover/<path:cover>')
def album_cover(cover):
    """
    View for an album cover file stored below MUSIC_DIR.

    cover -- path of the file relative to MUSIC_DIR.

    Returns the file via send_file, or the string "False" when the path
    leaves MUSIC_DIR or is not a regular file.
    """
    root = os.path.abspath(MUSIC_DIR)
    path = os.path.abspath(os.path.join(root, cover))

    # Require a real path-component boundary after the root; a bare
    # startswith(root) would also accept siblings such as "<root>2/x.jpg".
    if not path.startswith(root + os.sep):
        return "False"
    if not os.path.isfile(path):
        return "False"

    # NOTE(review): any regular file below MUSIC_DIR is served here, not
    # only images -- confirm that is intended.
    return send_file(path)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run Flask's built-in server on all interfaces, port 5150.
    app.run(host='0.0.0.0', port=5150)
|