#!/usr/bin/env python3
"""
API for interacting with the Musik database.
"""
import os
import sqlite3
import multiprocessing

import mutagen
import mutagen.mp3

MUSIC_DIR = "/home/iou1name/music/Music"
MUSIC_EXT = ['flac', 'mp3', 'wav', 'm4a']
DB_LOCK = multiprocessing.Lock()


def _run(method_name, args, kwargs):
    """
    Opens a connection to the app's database and calls the cursor method
    named `method_name` (`'execute'` or `'executemany'`) with the given
    arguments. Returns whatever that cursor method returns.

    The connection's `with` block commits on success and rolls back on
    error, but does not close the connection; it has to stay open so that
    callers can still fetch rows from the returned cursor.
    """
    with sqlite3.connect('musik.db') as con:
        con.row_factory = sqlite3.Row
        cur = con.cursor()
        return getattr(cur, method_name)(*args, **kwargs)


def db_execute(*args, **kwargs):
    """
    Opens a connection to the app's database and executes the SQL
    statements passed to this function.

    `DB_LOCK` is held for the whole transaction (including the commit)
    and is always released, even if the statement raises.
    """
    with DB_LOCK:
        return _run('execute', args, kwargs)


def db_execute_unsafe(*args, **kwargs):
    """
    A non-thread-safe version of `db_execute()`. Only use if you're
    certain there will be no competing threads for the database.
    """
    return _run('execute', args, kwargs)


def db_execute_many(*args, **kwargs):
    """
    Same as `db_execute()` except with `cur.executemany()`.
    """
    with DB_LOCK:
        return _run('executemany', args, kwargs)


def init_database():
    """
    Initializes the database.

    Probes the `tracks` table; if the probe fails with
    `sqlite3.OperationalError` the table is created and the library is
    built from `MUSIC_DIR`.
    """
    try:
        db_execute_unsafe("SELECT * FROM tracks LIMIT 1").fetchone()
    except sqlite3.OperationalError:
        db_execute("CREATE TABLE tracks("
            "filepath TEXT PRIMARY KEY,"
            "artist TEXT,"
            "albumartist TEXT,"
            "date TEXT,"
            "album TEXT,"
            "discnumber TEXT,"
            "tracknumber TEXT,"
            "title TEXT,"
            "genre TEXT,"
            "length TEXT"
            ")"
        )
        build_library(MUSIC_DIR)


def build_library(root_dir):
    """Walks the music directory and builds a library of tracks."""
    print("Building library")
    filepaths = []
    for dir_name, _sub_dirs, files in os.walk(root_dir):
        for file in files:
            if os.path.splitext(file)[1][1:] not in MUSIC_EXT:
                continue
            # `dir_name` already starts with `root_dir`; joining the root
            # again would duplicate it whenever `root_dir` is relative.
            filepaths.append(os.path.join(dir_name, file))

    tracks = []
    prev_percent = 0
    with multiprocessing.Pool() as pool:
        # `read_track` is a module-level function, so worker processes
        # can import it by name regardless of the start method.
        for track in pool.imap(read_track, filepaths):
            tracks.append(track)
            percent = round(len(tracks) / len(filepaths) * 100, 2)
            if percent >= prev_percent + 2.5:
                print(f"{percent}%")
                prev_percent = percent

    db_execute_many(
        "INSERT INTO tracks VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        tracks
    )
    print("Done")


def _first_tag(tags, key):
    """Returns the first value stored under `key`, or '' if there is none."""
    values = tags.get(key)
    return values[0] if values else ''


def read_track(filepath):
    """
    Reads the specified file and extracts relevant information from it.

    Returns a tuple in the column order of the `tracks` table:
    (filepath, artist, albumartist, date, album, discnumber, tracknumber,
    title, genre, length). `length` is formatted as "M:SS".
    """
    if filepath.endswith("mp3"):
        m = mutagen.mp3.EasyMP3(filepath)
    else:
        # NOTE(review): mutagen.File() may return None for files it cannot
        # identify, which would fail below -- confirm and handle if needed.
        m = mutagen.File(filepath)

    artist = _first_tag(m, 'artist')
    # Fall back to the track artist when no album artist is tagged.
    if m.get('albumartist'):
        albumartist = _first_tag(m, 'albumartist')
    else:
        albumartist = artist
    date = _first_tag(m, 'date')
    album = _first_tag(m, 'album')
    discnumber = _first_tag(m, 'discnumber')
    tracknumber = _first_tag(m, 'tracknumber')
    title = _first_tag(m, 'title')
    genre = _first_tag(m, 'genre')

    # Zero-pad the seconds so that e.g. 185 s reads "3:05", not "3:5".
    minutes, seconds = divmod(int(m.info.length), 60)
    length = f"{minutes}:{seconds:02d}"

    return (
        filepath,
        artist,
        albumartist,
        date,
        album,
        discnumber,
        tracknumber,
        title,
        genre,
        length,
    )


def insert_track(data):
    """
    Inserts a new item into the `tracks` table. `data` should be a tuple
    containing an item for every column in the `tracks` table. See the
    table declaration in `init_database()` for more information.
    """
    db_execute(
        "INSERT INTO tracks VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        data
    )


def get_artist_albums(artist):
    """
    Returns a list of all albums produced by the specified `artist`.
    """
    rows = db_execute(
        "SELECT DISTINCT album FROM tracks WHERE artist = ? ORDER BY date ASC",
        (artist,)
    ).fetchall()
    return [row[0] for row in rows]


def get_album_tracks(artist, album):
    """
    Returns a list of all tracks on the `album` produced by `artist`, as
    (discnumber, tracknumber, title) tuples ordered by disc and track.
    """
    # The number columns are TEXT; cast them so that track "10" sorts
    # after track "2". The cast reads the leading digits, so "3/12"
    # sorts as 3.
    rows = db_execute(
        "SELECT discnumber, tracknumber, title FROM tracks "
        "WHERE artist = ? AND album = ? "
        "ORDER BY CAST(discnumber AS INTEGER) ASC, "
        "CAST(tracknumber AS INTEGER) ASC",
        (artist, album)
    ).fetchall()
    return [tuple(row) for row in rows]


def get_track(artist, album, discnumber, tracknumber):
    """
    Returns a dictionary containing all information about a specific track.

    Raises `TypeError` if no row matches, since `fetchone()` then returns
    `None`.
    """
    track = db_execute(
        "SELECT * FROM tracks "
        "WHERE artist = ? AND album = ? "
        "AND discnumber = ? AND tracknumber = ?",
        (artist, album, discnumber, tracknumber)
    ).fetchone()
    return dict(track)


def get_random_track():
    """
    Returns a random track as a dictionary.

    Raises `TypeError` if the `tracks` table is empty, since `fetchone()`
    then returns `None`.
    """
    track = db_execute(
        "SELECT * FROM tracks ORDER BY random() LIMIT 1"
    ).fetchone()
    return dict(track)


def get_all_artists():
    """
    Returns a list of all distinct album-artist names, sorted by name.
    """
    # Order by the selected column; ordering a DISTINCT result by a column
    # that is not selected gives an ill-defined order.
    rows = db_execute(
        "SELECT DISTINCT albumartist FROM tracks ORDER BY albumartist ASC"
    )
    return [row[0] for row in rows]