# Musik/database.py
#
# 205 lines
# 4.9 KiB
# Python
# Raw Normal View History

#!/usr/bin/env python3
"""
API for interacting with the Musik database.
"""
import os
import sqlite3
import multiprocessing
import mutagen
import mutagen.mp3
# Root directory handed to `build_library()` when the database is initialized.
MUSIC_DIR = "/home/iou1name/music/Music"
# File extensions (without the leading dot) that `build_library()` accepts.
MUSIC_EXT = ['flac', 'mp3', 'wav', 'm4a']
# Lock acquired by `db_execute()` and `db_execute_many()` around their
# database statements.
DB_LOCK = multiprocessing.Lock()
def db_execute(*args, **kwargs):
    """
    Opens a connection to the app's database and executes the SQL statement
    passed to this function while holding `DB_LOCK`.

    All positional and keyword arguments are forwarded unchanged to
    `sqlite3.Cursor.execute()`. Returns the cursor produced by that call;
    its rows are `sqlite3.Row` objects. Any exception raised by sqlite3
    propagates to the caller.
    """
    # The lock is used as a context manager so it is always released, even
    # when `execute()` raises. It is listed first so that it is still held
    # while the connection's context manager commits or rolls back.
    with DB_LOCK, sqlite3.connect('musik.db') as con:
        con.row_factory = sqlite3.Row
        cur = con.cursor()
        res = cur.execute(*args, **kwargs)
    return res
def db_execute_unsafe(*args, **kwargs):
    """
    Variant of `db_execute()` that does not take the database lock. Only
    use it when nothing else can be competing for the database.

    Arguments are forwarded unchanged to `sqlite3.Cursor.execute()`; the
    resulting cursor is returned and yields `sqlite3.Row` objects.
    """
    with sqlite3.connect('musik.db') as connection:
        connection.row_factory = sqlite3.Row
        return connection.cursor().execute(*args, **kwargs)
def db_execute_many(*args, **kwargs):
    """
    Same as `db_execute()` except with `cur.executemany()`.

    All positional and keyword arguments are forwarded unchanged to
    `sqlite3.Cursor.executemany()`. Returns the cursor produced by that
    call. Any exception raised by sqlite3 propagates to the caller.
    """
    # The lock is used as a context manager so it is always released, even
    # when `executemany()` raises. It is listed first so that it is still
    # held while the connection's context manager commits or rolls back.
    with DB_LOCK, sqlite3.connect('musik.db') as con:
        con.row_factory = sqlite3.Row
        cur = con.cursor()
        res = cur.executemany(*args, **kwargs)
    return res
def init_database():
    """
    Initializes the database.

    Probes the `tracks` table with a trivial query. If the probe raises
    `sqlite3.OperationalError`, the table is created and the library is
    built from `MUSIC_DIR`; otherwise nothing is changed.
    """
    try:
        db_execute_unsafe("SELECT * FROM tracks LIMIT 1").fetchone()
    except sqlite3.OperationalError:
        needs_setup = True
    else:
        needs_setup = False

    if not needs_setup:
        return

    # One entry per column, in the order every INSERT in this module uses.
    column_defs = (
        "filepath TEXT PRIMARY KEY",
        "artist TEXT",
        "albumartist TEXT",
        "date TEXT",
        "album TEXT",
        "discnumber TEXT",
        "tracknumber TEXT",
        "title TEXT",
        "genre TEXT",
        "length TEXT",
    )
    db_execute("CREATE TABLE tracks(" + ",".join(column_defs) + ")")
    build_library(MUSIC_DIR)
def build_library(root_dir):
    """
    Walks the music directory and builds a library of tracks.

    Every file under `root_dir` whose extension is listed in `MUSIC_EXT`
    is read with `read_track()` in a pool of worker processes, and the
    resulting tuples are inserted into the `tracks` table in one batch.
    Progress is printed roughly every 2.5%. Exceptions raised while
    reading a track or inserting the rows propagate to the caller.
    """
    print("Building library")
    filepaths = []
    for dir_name, _sub_dirs, files in os.walk(root_dir):
        for file in files:
            if os.path.splitext(file)[1][1:] not in MUSIC_EXT:
                continue
            # `os.walk()` already yields `dir_name` prefixed with
            # `root_dir`; joining the root a second time would duplicate it
            # whenever `root_dir` is a relative path.
            filepaths.append(os.path.join(dir_name, file))

    tracks = []
    prev_percent = 0
    with multiprocessing.Pool() as pool:
        # `read_track` is a module-level function, so worker processes can
        # look it up by name regardless of the multiprocessing start method.
        for track in pool.imap(read_track, filepaths):
            tracks.append(track)
            percent = round(len(tracks) / len(filepaths) * 100, 2)
            if percent >= prev_percent + 2.5:
                print(f"{percent}%")
                prev_percent = percent

    db_execute_many(
        "INSERT INTO tracks VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        tracks
    )
    print("Done")
def read_track(filepath):
    """
    Reads the specified file and extracts relevant information from it.

    Returns a tuple in the column order of the `tracks` table:
    (filepath, artist, albumartist, date, album, discnumber, tracknumber,
    title, genre, length). Missing tags become empty strings, a missing
    album artist falls back to the artist, and `length` is formatted as
    "M:SS".
    """
    if filepath.lower().endswith(".mp3"):
        m = mutagen.mp3.EasyMP3(filepath)
    else:
        # `easy=True` requests the wrappers that expose plain tag names
        # such as 'artist' and 'album' where mutagen provides them.
        m = mutagen.File(filepath, easy=True)

    def first(tag):
        """Returns the first value stored for `tag`, or '' if absent."""
        return m.get(tag, [''])[0]

    artist = first('artist')
    # An absent or empty album artist falls back to the track artist.
    albumartist = (m.get('albumartist') or [artist])[0]

    # Zero-pad the seconds so 3 min 5 s is rendered "3:05" rather than "3:5".
    minutes, seconds = divmod(int(m.info.length), 60)
    length = f"{minutes}:{seconds:02d}"

    # Built explicitly so the order never depends on local-variable layout.
    return (
        filepath,
        artist,
        albumartist,
        first('date'),
        first('album'),
        first('discnumber'),
        first('tracknumber'),
        first('title'),
        first('genre'),
        length,
    )
def insert_track(data):
    """
    Adds one row to the `tracks` table. `data` must be a tuple holding a
    value for every column of that table, in column order. See the table
    declaration in `init_database()` for the list of columns.
    """
    statement = "INSERT INTO tracks VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    db_execute(statement, data)
def get_artist_albums(artist):
    """
    Returns a list of all albums produced by the specified `artist`.

    `artist` is matched against the `albumartist` column. The album names
    are distinct and ordered by the `date` column, ascending.
    """
    rows = db_execute(
        "SELECT DISTINCT album FROM tracks WHERE albumartist = ? ORDER BY date ASC",
        (artist,)
    ).fetchall()
    return [row[0] for row in rows]
def get_album_tracks(artist, album):
    """
    Returns a list of all tracks on the `album` produced by `artist`.

    Each item is a `(discnumber, tracknumber, title)` tuple. `artist` is
    matched against the `albumartist` column. Tracks are ordered by disc
    and then by track number.
    """
    # Disc and track numbers are stored as TEXT, so a plain ORDER BY would
    # place "10" before "2". Sort on their integer value first and fall back
    # to the raw text for values that carry no leading number.
    rows = db_execute(
        "SELECT discnumber, tracknumber, title FROM tracks "
        "WHERE albumartist = ? AND album = ? "
        "ORDER BY CAST(discnumber AS INTEGER) ASC, "
        "CAST(tracknumber AS INTEGER) ASC, "
        "discnumber ASC, tracknumber ASC",
        (artist, album)
    ).fetchall()
    return [tuple(row) for row in rows]
def get_track(artist, album, discnumber, tracknumber):
    """
    Returns a dictionary containing all information about a specific track.

    The track is looked up by `albumartist`, `album`, `discnumber` and
    `tracknumber`; the dictionary is keyed by column name. If no row
    matches, `fetchone()` yields `None` and the `dict()` conversion raises
    `TypeError`.
    """
    track = db_execute(
        "SELECT * FROM tracks "
        "WHERE albumartist = ? AND album = ? "
        "AND discnumber = ? AND tracknumber = ?",
        (artist, album, discnumber, tracknumber)
    ).fetchone()
    return dict(track)
def get_random_track():
    """
    Picks one row of the `tracks` table at random and returns it as a
    dictionary keyed by column name.
    """
    query = "SELECT * FROM tracks ORDER BY random() LIMIT 1"
    row = db_execute(query).fetchone()
    return dict(row)
def get_all_artists():
    """
    Returns a list of all artist names.

    The names are the distinct values of the `albumartist` column, in
    ascending order.
    """
    rows = db_execute(
        "SELECT DISTINCT albumartist FROM tracks ORDER BY albumartist ASC"
    )
    return [row[0] for row in rows]