#!/usr/bin/python3 import sys from time import sleep import argparse import os from mutagen import id3 from mutagen.id3._frames import TXXX from mutagen._util import MutagenError from sqlite3 import OperationalError import sqlite3 def get_user_id(db: str, name: str) -> str: """Fetches the navidrome user id for the user with either the user_name or name from the navidrome databse . input: db: path to a navidrome sql3 database. Must contain at least the user table. name: the name of the user of which the user_id should be returned. Can be either user_name or name. output: navidrome user_id: str """ try: conn = sqlite3.connect(db) cursor = conn.cursor() cursor.execute(f"""SELECT id FROM user WHERE user_name='{name}' or name='{name}'""") id = cursor.fetchall()[0][0] conn.close() return id except OperationalError as e: print(f"Failed to read id from {db}: {e}") return '' class ExportService(): @staticmethod def exp(db: str, user_id: str): """Exports media_file annotations from a given navidrome database to id3v2 tags. Only exports annotations belonging to the navidrome user with id """ a = ExportService.get_annotations(db, user_id) ExportService.write_annotations(a) @staticmethod def get_annotations(db: str, user_id: str) -> list: """Fetches all annotations belonging to the user with id from the provided sql3 navidrome database . input: db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables. user_id: the user_id to fetch annotations from. output: list of annotations in the format (path, playCount, rating, starred). """ try: conn = sqlite3.connect(db) cursor = conn.cursor() cursor.execute(f"""SELECT path,play_count,rating,starred FROM annotation INNER JOIN media_file ON item_id=id WHERE user_id='{user_id}' AND item_type='media_file'""") annos = cursor.fetchall() conn.close() return annos except sqlite3.OperationalError as e: print(f"Failed to read data from {db}: {e}") return [] @staticmethod def write_annotations(annos: list): """Writes a list of annotations to id3v2 tags. The tags used are TXXX=FNVD_, where is one of PlayCount, Rating, Starred. input: annos: list of annotations. Format must be (path, playCount, rating, starred). """ count = 0 for a in annos: try: file = id3.ID3(a[0]) file.add(TXXX(desc="FNVD_PlayCount", text=str(a[1]))) file.add(TXXX(desc="FNVD_Rating", text=str(a[2]))) file.add(TXXX(desc="FNVD_Starred", text=str(a[3]))) file.save() count += 1 except MutagenError as e: print(f"Failed to write data: {e}") print(f"Exported {count} annotations!") class ImportService(): @staticmethod def imp(path: str, db: str, user_id: str): """Imports media_file annotations from files located in the directory into to he given navidrome database , as belonging to user with id .""" a = ImportService.get_annotations(path) ImportService.write_annotations(a, db, user_id) @staticmethod def get_annotations(path: str) -> dict: """Finds all mp3 files in path and extracts the FNVD_PlayCount, FNVD_Rating, and FNVD_Starred id3v2 fields into a dict input: path: string containing the path for the root directory of the music folder. output: a dict containing annotations in the format {path: (playCount, rating, starred)}. """ realpath = os.path.realpath(path) annos = {} for dir,_,files in os.walk(realpath): for mp3_file in files: if mp3_file.endswith('.mp3'): play_count = 0 rating = 0 starred = 0 try: file = id3.ID3(dir+'/'+mp3_file) fields = file.getall('TXXX') for f in fields: if f.desc == 'FNVD_PlayCount': play_count = int(f.text[0]) if f.desc == 'FNVD_Rating': rating = int(f.text[0]) if f.desc == 'FNVD_Starred': starred = int(f.text[0]) if play_count or rating or starred: annos[dir+'/'+mp3_file] = (play_count, rating, starred) except MutagenError as e: print(f"failed for file {dir+'/'+mp3_file}: {e}") return annos @staticmethod def write_annotations(annos: dict, db: str, user_id: str): """Writes a dict of annotations to the navidrome database as user with id . input: annos: dict of annotations. Format must be {path: (playCount, rating, starred)}. db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables. user_id: the user_id to ascribe annotations to. """ try: conn = sqlite3.connect(db) cursor = conn.cursor() # step 1: fetch path and item_id for existing annotations WHERE path IN annos.paths cursor.execute(f"""SELECT path,id FROM media_file WHERE path IN ('{"','".join(annos.keys())}')""") # step 2: create list with item_ids items = [] for path,id in cursor.fetchall(): # filepaths without an item_id in navidrome are excluded from items, and thus ignored for insertion. pc,r,s = annos[path] items.append(f"('{user_id}', '{id}', 'media_file', {pc}, {r}, {s})") # step 3: insert list into annotation cursor.execute(f"""INSERT INTO annotation (user_id, item_id, item_type, play_count, rating, starred) VALUES {','.join(items)} ON CONFLICT(user_id, item_id, item_type) DO UPDATE SET play_count = excluded.play_count, rating = excluded.rating, starred=excluded.starred""") conn.commit() conn.close() print(f"Imported {len(items)} annotations!") except sqlite3.OperationalError as e: print(f"Failed to write data: {e}") def main(): parser = argparse.ArgumentParser(description="Navidrome backer-upper") parser.add_argument("-u", "--user", type=str, nargs=1, metavar="user", default='user', help="Either the username or name of the account for which you want to manage annotations. default: 'user'") parser.add_argument("-d", "--db", type=str, nargs=1, metavar="database", default='./navidrome.db', help="Filepath to the navidrome database you want to backup (or restore)") parser.add_argument("ie", type=str, nargs=1, metavar="import|export", default=None, choices=['import', 'export'], help="What you want to do with the annotations. choices: [import, export]") parser.add_argument("-p", "--path", type=str, nargs=1, required='import' in sys.argv, metavar="mediapath", default=None, help="The folder to import metadata from (import only)") args = parser.parse_args() if args.user == "user": print(f"No user specified, using default ('{args.user}')...") if args.db == './navidrome.db': print(f"No database specified, using default ({args.db})...") if args.ie[0] == "import": print(f"Importing annotations from '{args.path[0]}'...") elif args.ie[0] == "export": print(f"Exporting annotations...") sleep(2) # run code user_id = get_user_id(args.db, args.user) if args.ie[0] == "import": ImportService.imp(args.path[0], args.db, user_id) elif args.ie[0] == "export": ExportService.exp(args.db, user_id) if __name__ == '__main__': main()