195 lines
4.8 KiB
Python
195 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
API for interacting with the Musik database.
|
|
"""
|
|
import os
|
|
import sqlite3
|
|
import multiprocessing
|
|
|
|
import mutagen
|
|
import mutagen.mp3
|
|
|
|
# Root directory that is scanned for audio files.
MUSIC_DIR = "/home/iou1name/music/Music"
# File extensions (without the leading dot) that count as music.
MUSIC_EXT = ['flac', 'mp3', 'wav', 'm4a']
# Serializes statement execution against the database file.
DB_LOCK = multiprocessing.Lock()


def _db_call(method_name, args, kwargs):
    """
    Runs the cursor method named `method_name` (`execute` or `executemany`)
    against the app's database while holding `DB_LOCK` and returns the
    cursor it produced.
    """
    # The context managers unwind in reverse order: the connection commits
    # (or rolls back on error) first and only then is the lock released.
    # Because the lock is a context manager it is released even when the
    # statement raises, so callers never have to release it by hand.
    # The connection is deliberately left open: sqlite3's connection context
    # manager does not close it, and callers still fetch from the cursor.
    with DB_LOCK, sqlite3.connect('musik.db') as con:
        con.row_factory = sqlite3.Row
        return getattr(con.cursor(), method_name)(*args, **kwargs)


def db_execute(*args, **kwargs):
    """
    Opens a connection to the app's database and executes the SQL statement
    passed to this function.

    All arguments are forwarded to `cursor.execute()`. Returns the cursor,
    whose rows are `sqlite3.Row` objects. Any exception raised by sqlite3
    propagates to the caller after the lock has been released.
    """
    return _db_call('execute', args, kwargs)


def db_execute_many(*args, **kwargs):
    """
    Same as `db_execute()` except with `cur.executemany()`.
    """
    return _db_call('executemany', args, kwargs)


def init_database():
    """
    Initializes the database.

    Creates the `tracks` table when it does not exist yet, then builds the
    library from `MUSIC_DIR`.
    """
    try:
        # Probe query: fails with OperationalError when the table is missing.
        db_execute("SELECT * FROM tracks LIMIT 1").fetchone()
    except sqlite3.OperationalError:
        # db_execute() has already released DB_LOCK at this point, so no
        # manual release is needed before issuing the next statement.
        db_execute(
            "CREATE TABLE tracks("
            "filepath TEXT PRIMARY KEY,"
            "artist TEXT,"
            "albumartist TEXT,"
            "date TEXT,"
            "album TEXT,"
            "discnumber TEXT,"
            "tracknumber TEXT,"
            "title TEXT,"
            "genre TEXT,"
            "length TEXT,"
            "last_modified INTEGER"
            ")"
        )
    # NOTE(review): the scan is assumed to run on every start (not only
    # after table creation), since build_library() reuses unchanged rows --
    # confirm against the original indentation.
    build_library(MUSIC_DIR)
|
|
|
|
|
|
def build_library(root_dir):
    """
    Walks the music directory and builds a library of tracks.

    Every file under `root_dir` whose extension is listed in `MUSIC_EXT` is
    collected. Files whose stored `last_modified` is not older than the
    file's current mtime keep their existing database row; all others are
    re-read with `read_track()` in a process pool. The `tracks` table is
    then emptied and refilled with the combined result.
    """
    print("Building library")
    filepaths = []
    for dir_name, _, files in os.walk(root_dir):
        for file in files:
            if os.path.splitext(file)[1][1:] not in MUSIC_EXT:
                continue
            # os.walk() already yields dir_name prefixed with root_dir, so
            # joining root_dir again would duplicate it for relative roots.
            filepath = os.path.join(dir_name, file)
            filepaths.append((filepath, os.path.getmtime(filepath)))

    tracks_prev = {
        track['filepath']: track
        for track in db_execute("SELECT * FROM tracks")
    }

    # Decide in the parent process which rows can be reused. Only the
    # module-level read_track() is sent to the pool, so the work items are
    # picklable under every multiprocessing start method.
    tracks = [None] * len(filepaths)
    stale = []
    for index, (filepath, last_modified) in enumerate(filepaths):
        track_prev = tracks_prev.get(filepath)
        if (track_prev is not None
                and track_prev['last_modified'] >= last_modified):
            tracks[index] = tuple(track_prev)
        else:
            stale.append((index, filepath))

    if stale:
        done = len(filepaths) - len(stale)
        prev_percent = 0
        with multiprocessing.Pool() as pool:
            # imap() yields results in submission order, so they line up
            # with the indices recorded in `stale`.
            results = pool.imap(read_track, [path for _, path in stale])
            for (index, _), data in zip(stale, results):
                tracks[index] = data
                done += 1
                percent = round(done / len(filepaths) * 100, 2)
                if percent >= prev_percent + 2.5:
                    print(f"{percent}%")
                    prev_percent = percent

    db_execute("DELETE FROM tracks")
    db_execute_many(
        "INSERT INTO tracks VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        tracks
    )
    print("Done")
|
|
|
|
|
|
def read_track(filepath):
    """
    Reads the specified file and extracts relevant information from it.

    Returns a tuple in the column order of the `tracks` table:
    (filepath, artist, albumartist, date, album, discnumber, tracknumber,
    title, genre, length, last_modified). Missing tags become empty
    strings, `albumartist` falls back to `artist`, and `length` is
    formatted as "M:SS".
    """
    if filepath.endswith("mp3"):
        m = mutagen.mp3.EasyMP3(filepath)
    else:
        m = mutagen.File(filepath)

    def first(key):
        """Returns the first value stored under `key`, or '' if none."""
        values = m.get(key)
        return values[0] if values else ''

    artist = first('artist')
    albumartist = first('albumartist') if m.get('albumartist') else artist

    total_seconds = int(m.info.length)
    # Zero-pad the seconds so 3 min 5 s reads "3:05" rather than "3:5".
    length = f"{total_seconds // 60}:{total_seconds % 60:02d}"

    # Spelled out explicitly (instead of relying on locals() ordering) so
    # the tuple always matches the column order of the INSERT statement.
    return (
        filepath,
        artist,
        albumartist,
        first('date'),
        first('album'),
        first('discnumber'),
        first('tracknumber'),
        first('title'),
        first('genre'),
        length,
        os.path.getmtime(filepath),
    )
|
|
|
|
|
|
def get_artist_albums(artist):
    """
    Looks up the distinct album names whose album artist equals `artist`
    and returns them as a list, ordered by ascending date.
    """
    query = (
        "SELECT DISTINCT album FROM tracks WHERE albumartist = ? ORDER BY date ASC"
    )
    cursor = db_execute(query, (artist,))
    # Each row holds a single column: the album name.
    return [record[0] for record in cursor.fetchall()]
|
|
|
|
def get_album_tracks(artist, album):
    """
    Returns a list of all tracks on the `album` produced by `artist`.

    Each entry is a `(discnumber, tracknumber, title)` tuple. Entries are
    ordered by disc and then track number, compared numerically.
    """
    # discnumber/tracknumber are TEXT columns, so a plain ORDER BY would put
    # "10" before "2". CAST(... AS INTEGER) uses the leading integer of the
    # text (so "3/12" sorts as 3); the raw text only breaks ties, e.g. for
    # non-numeric values that all cast to 0.
    tracks = db_execute(
        "SELECT discnumber, tracknumber, title FROM tracks "
        "WHERE albumartist = ? AND album = ? "
        "ORDER BY CAST(discnumber AS INTEGER) ASC, "
        "CAST(tracknumber AS INTEGER) ASC, "
        "discnumber ASC, tracknumber ASC",
        (artist, album)
    ).fetchall()
    return [tuple(row) for row in tracks]
|
|
|
|
def get_track(artist, album, discnumber, tracknumber):
    """
    Fetches the first row of `tracks` matching the given album artist,
    album, disc number and track number, and returns it as a dictionary
    keyed by column name.

    Raises `TypeError` (from `dict(None)`) when no row matches.
    """
    query = (
        "SELECT * FROM tracks "
        "WHERE albumartist = ? AND album = ? "
        "AND discnumber = ? AND tracknumber = ?"
    )
    params = (artist, album, discnumber, tracknumber)
    row = db_execute(query, params).fetchone()
    return dict(row)
|
|
|
|
def get_random_track():
    """
    Picks one row of `tracks` at random and returns it as a dictionary
    keyed by column name.

    Raises `TypeError` (from `dict(None)`) when the table is empty.
    """
    cursor = db_execute("SELECT * FROM tracks ORDER BY random() LIMIT 1")
    row = cursor.fetchone()
    return dict(row)
|
|
|
|
def get_all_artists():
    """
    Collects every distinct `albumartist` value stored in `tracks` and
    returns them as a list in ascending order.
    """
    query = "SELECT DISTINCT albumartist FROM tracks ORDER BY albumartist ASC"
    names = []
    for record in db_execute(query):
        # Each row holds a single column: the album artist name.
        names.append(record[0])
    return names
|