nd-backerupper/backerupper.py

201 lines
8.2 KiB
Python
Executable File

#!/usr/bin/python3
import sys
from time import sleep
import argparse
import os
from mutagen import id3
from mutagen.id3._frames import TXXX
from mutagen._util import MutagenError
from sqlite3 import OperationalError
import sqlite3
def get_user_id(db: str, name: str) -> str:
"""Fetches the navidrome user id for the user with either the user_name or name <name> from the navidrome databse <db>.
input:
db: path to a navidrome sql3 database. Must contain at least the user table.
name: the name of the user of which the user_id should be returned. Can be either user_name or name.
output:
navidrome user_id: str
"""
try:
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute(f"""SELECT id FROM user WHERE user_name='{name}' or name='{name}'""")
id = cursor.fetchall()[0][0]
conn.close()
return id
except OperationalError as e:
print(f"Failed to read id from {db}: {e}")
return ''
class ExportService():
@staticmethod
def exp(db: str, user_id: str):
"""Exports media_file annotations from a given navidrome database <db> to id3v2 tags. Only exports annotations belonging to the navidrome user with id <user_id>"""
a = ExportService.get_annotations(db, user_id)
ExportService.write_annotations(a)
@staticmethod
def get_annotations(db: str, user_id: str) -> list:
"""Fetches all annotations belonging to the user with id <user_id> from the provided sql3 navidrome database <db>.
input:
db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables.
user_id: the user_id to fetch annotations from.
output:
list of annotations in the format (path, playCount, rating, starred).
"""
try:
conn = sqlite3.connect(db)
cursor = conn.cursor()
cursor.execute(f"""SELECT path,play_count,rating,starred FROM annotation INNER JOIN media_file ON item_id=id WHERE user_id='{user_id}' AND item_type='media_file'""")
annos = cursor.fetchall()
conn.close()
return annos
except sqlite3.OperationalError as e:
print(f"Failed to read data from {db}: {e}")
return []
@staticmethod
def write_annotations(annos: list):
"""Writes a list of annotations to id3v2 tags.
The tags used are TXXX=FNVD_<NAME>, where <NAME> is one of PlayCount, Rating, Starred.
input:
annos: list of annotations. Format must be (path, playCount, rating, starred).
"""
count = 0
for a in annos:
try:
file = id3.ID3(a[0])
file.add(TXXX(desc="FNVD_PlayCount", text=str(a[1])))
file.add(TXXX(desc="FNVD_Rating", text=str(a[2])))
file.add(TXXX(desc="FNVD_Starred", text=str(a[3])))
file.save()
count += 1
except MutagenError as e:
print(f"Failed to write data: {e}")
print(f"Exported {count} annotations!")
class ImportService():
@staticmethod
def imp(path: str, db: str, user_id: str):
"""Imports media_file annotations from files located in the <path> directory into to he given navidrome database <db>, as belonging to user with id <user_id>."""
a = ImportService.get_annotations(path)
ImportService.write_annotations(a, db, user_id)
@staticmethod
def get_annotations(path: str) -> dict:
"""Finds all mp3 files in path <path> and extracts the FNVD_PlayCount, FNVD_Rating, and FNVD_Starred id3v2 fields into a dict
input:
path: string containing the path for the root directory of the music folder.
output:
a dict containing annotations in the format {path: (playCount, rating, starred)}.
"""
realpath = os.path.realpath(path)
annos = {}
for dir,_,files in os.walk(realpath):
for mp3_file in files:
if mp3_file.endswith('.mp3'):
play_count = 0
rating = 0
starred = 0
try:
file = id3.ID3(dir+'/'+mp3_file)
fields = file.getall('TXXX')
for f in fields:
if f.desc == 'FNVD_PlayCount':
play_count = int(f.text[0])
if f.desc == 'FNVD_Rating':
rating = int(f.text[0])
if f.desc == 'FNVD_Starred':
starred = int(f.text[0])
if play_count or rating or starred:
annos[dir+'/'+mp3_file] = (play_count, rating, starred)
except MutagenError as e:
print(f"failed for file {dir+'/'+mp3_file}: {e}")
return annos
@staticmethod
def write_annotations(annos: dict, db: str, user_id: str):
"""Writes a dict of annotations to the navidrome database <db> as user with id <user_id>.
input:
annos: dict of annotations. Format must be {path: (playCount, rating, starred)}.
db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables.
user_id: the user_id to ascribe annotations to.
"""
try:
conn = sqlite3.connect(db)
cursor = conn.cursor()
# step 1: fetch path and item_id for existing annotations WHERE path IN annos.paths
cursor.execute(f"""SELECT path,id FROM media_file WHERE path IN ('{"','".join(annos.keys())}')""")
# step 2: create list with item_ids
items = []
for path,id in cursor.fetchall(): # filepaths without an item_id in navidrome are excluded from items, and thus ignored for insertion.
pc,r,s = annos[path]
items.append(f"('{user_id}', '{id}', 'media_file', {pc}, {r}, {s})")
# step 3: insert list into annotation
cursor.execute(f"""INSERT INTO annotation (user_id, item_id, item_type, play_count, rating, starred) VALUES {','.join(items)} ON CONFLICT(user_id, item_id, item_type) DO UPDATE SET play_count = excluded.play_count, rating = excluded.rating, starred=excluded.starred""")
conn.commit()
conn.close()
print(f"Imported {len(items)} annotations!")
except sqlite3.OperationalError as e:
print(f"Failed to write data: {e}")
def main():
parser = argparse.ArgumentParser(description="Navidrome backer-upper")
parser.add_argument("-u", "--user", type=str, nargs=1, metavar="user", default='user', help="Either the username or name of the account for which you want to manage annotations. default: 'user'")
parser.add_argument("-d", "--db", type=str, nargs=1, metavar="database", default='./navidrome.db', help="Filepath to the navidrome database you want to backup (or restore)")
parser.add_argument("ie", type=str, nargs=1, metavar="import|export", default=None, choices=['import', 'export'], help="What you want to do with the annotations. choices: [import, export]")
parser.add_argument("-p", "--path", type=str, nargs=1, required='import' in sys.argv, metavar="mediapath", default=None, help="The folder to import metadata from (import only)")
args = parser.parse_args()
if args.user == "user":
print(f"No user specified, using default ('{args.user}')...")
if args.db == './navidrome.db':
print(f"No database specified, using default ({args.db})...")
if args.ie[0] == "import":
print(f"Importing annotations from '{args.path[0]}'...")
elif args.ie[0] == "export":
print(f"Exporting annotations...")
sleep(2)
# run code
user_id = get_user_id(args.db, args.user)
if args.ie[0] == "import":
ImportService.imp(args.path[0], args.db, user_id)
elif args.ie[0] == "export":
ExportService.exp(args.db, user_id)
if __name__ == '__main__':
main()
# db = 'navidrome.db'
# user = "djairo"
# user_id = get_user_id(db, user)
# ExportService.exp(db, user_id)
# ImportService.imp('/media/drive/music/', db, user_id)