Pyrite/database.py
2025-02-12 09:20:38 -05:00

118 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""
Contains helper functions for accessing the database.
"""
import os
from urllib import parse
import config
async def get_random_track(request, playlist_name=""):
"""Selects a random track."""
async with request.app.state.db_pool.acquire() as conn:
if playlist_name:
track = await conn.fetchrow(
"SELECT * FROM track "
"LEFT JOIN playlist_track ON (track.filepath = playlist_track.filepath) "
"LEFT JOIN playlist ON (playlist_track.playlist_id = playlist.id) "
"WHERE playlist.name = $1 ORDER BY random() LIMIT 1",
playlist_name)
else:
track = await conn.fetchrow(
"SELECT * FROM track ORDER BY random() LIMIT 1")
track = dict(track)
fpath = track.pop('filepath')
art256 = os.path.join(os.path.dirname(fpath), 'folder-256x256.jpg')
art64 = os.path.join(os.path.dirname(fpath), 'folder-64x64.jpg')
track['source'] = convert_filepath(fpath)
track['artwork'] = [{
'src': convert_filepath(art256),
'sizes': '256x256',
'type': 'image/jpeg'
}
]
return track
async def get_artists(request, playlist_name=""):
"""Returns all artists in the database."""
async with request.app.state.db_pool.acquire() as conn:
if playlist_name:
artists = await conn.fetch(
"SELECT DISTINCT track.albumartist FROM track "
"LEFT JOIN playlist_track ON (track.filepath = playlist_track.filepath) "
"LEFT JOIN playlist ON (playlist_track.playlist_id = playlist.id) "
"WHERE playlist.name = $1", playlist_name)
else:
artists = await conn.fetch(
"SELECT DISTINCT albumartist FROM track ORDER BY albumartist")
artists = [a['albumartist'] for a in artists]
return artists
async def get_albums(request, albumartist, playlist_name=""):
"""Return all albums associated with a particular albumartist."""
async with request.app.state.db_pool.acquire() as conn:
if playlist_name:
albums = await conn.fetch(
"SELECT DISTINCT ON (track.album, track.date) track.filepath, track.album, track.date "
"FROM track "
"LEFT JOIN playlist_track ON (track.filepath = playlist_track.filepath) "
"LEFT JOIN playlist ON (playlist_track.playlist_id = playlist.id) "
"WHERE track.albumartist = $1 AND playlist.name = $2 ORDER BY date",
albumartist, playlist_name)
else:
albums = await conn.fetch(
"SELECT DISTINCT ON (album, date) filepath, album, date "
"FROM track "
"WHERE albumartist = $1 ORDER BY date",
albumartist)
albums = [dict(a) for a in albums]
for album in albums:
fpath = album.pop('filepath')
cpath = os.path.join(os.path.dirname(fpath), 'folder-64x64.jpg')
album['cover_art_path'] = convert_filepath(cpath)
return albums
async def get_tracks(request, albumartist, album, date, playlist_name=""):
"""Select an album and return all associated tracks."""
async with request.app.state.db_pool.acquire() as conn:
if playlist_name:
tracks = await conn.fetch(
"SELECT * FROM track "
"LEFT JOIN playlist_track ON (track.filepath = playlist_track.filepath) "
"LEFT JOIN playlist ON (playlist_track.playlist_id = playlist.id) "
"WHERE track.albumartist = $1 AND "
"track.album = $2 AND track.date = $3 AND playlist.name = $4 "
"ORDER BY track.discnumber ASC, track.tracknumber ASC",
albumartist, album, date, playlist_name)
else:
tracks = await conn.fetch(
"SELECT * FROM track "
"WHERE albumartist = $1 AND "
"album = $2 AND date = $3 "
"ORDER BY discnumber ASC, tracknumber ASC",
albumartist, album, date)
tracks = [dict(t) for t in tracks]
for track in tracks:
track['filepath'] = convert_filepath(track['filepath'])
track.pop('last_modified')
return tracks
async def get_playlists(request):
"""Return all playlists in the database."""
async with request.app.state.db_pool.acquire() as conn:
playlists = await conn.fetch("SELECT name FROM playlist")
playlists = [p['name'] for p in playlists]
return playlists
def convert_filepath(path):
"""Convert a filepath to a URL."""
path = config.server_homepage + '/library/' + path
#path = parse.quote(path)
return path