226 lines
11 KiB
Python
Executable File
226 lines
11 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import re
|
|
import sys
|
|
from time import sleep
|
|
import argparse
|
|
import os
|
|
from mutagen import id3
|
|
from mutagen.id3._frames import TXXX
|
|
from mutagen._util import MutagenError
|
|
from sqlite3 import OperationalError
|
|
import sqlite3
|
|
|
|
def get_user_id(db: str, name: str) -> str:
    """Fetches the navidrome user id for the user with either the user_name or name <name> from the navidrome database <db>.

    input:
        db: path to a navidrome sql3 database. Must contain at least the user table.
        name: the name of the user of which the user_id should be returned. Can be either user_name or name.
    output:
        navidrome user_id: str. An empty string is returned when the database
        cannot be read or when no user matches <name>.
    """
    conn = None
    try:
        conn = sqlite3.connect(db)
        # Bound parameters: a name containing a quote (e.g. O'Brien) must not
        # be able to break or alter the statement.
        row = conn.execute(
            "SELECT id FROM user WHERE user_name=? OR name=?", (name, name)
        ).fetchone()
    except OperationalError as e:
        print(f"Failed to read id from {db}: {e}")
        return ''
    finally:
        # Release the database handle on the error path as well.
        if conn is not None:
            conn.close()

    if row is None:
        print(f"Failed to read id from {db}: no user named '{name}'")
        return ''
    return row[0]
|
|
|
|
|
|
class ExportService():
    """Exports navidrome annotations (play count, rating, starred) to id3v2 tags."""

    @staticmethod
    def _substitute(pattern: str, replacement: str, path: str) -> str:
        """Applies the regex substitution <pattern> -> <replacement> to <path>; an empty pattern means no substitution."""
        # re.sub with an empty pattern matches between every character and
        # would splice <replacement> throughout the whole path.
        if not pattern:
            return path
        return re.sub(pattern, replacement, path)

    @staticmethod
    def _tag_text(value) -> str:
        """Renders an annotation value as tag text; NULL database values become '0'."""
        # The import side parses these tags with int(), which rejects 'None'.
        return "0" if value is None else str(value)

    @staticmethod
    def exp(db: str, user_id: str, re_nav: str, re_re: str):
        """Exports media_file annotations from a given navidrome database <db> to id3v2 tags. Only exports annotations belonging to the navidrome user with id <user_id>.

        input:
            db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables.
            user_id: the user_id to fetch annotations from.
            re_nav: path substitution (if necessary); Navidrome folder structure component.
            re_re: path substitution (if necessary); real folder structure component.
        """
        # The path substitution is applied while reading, so the paths handed
        # to write_annotations are already real filesystem paths.
        a = ExportService.get_annotations(db, user_id, re_nav, re_re)
        print(f"Fetched {len(a)} annotations from db!")
        ExportService.write_annotations(a)

    @staticmethod
    def get_annotations(db: str, user_id: str, re_nav: str = "", re_re: str = "") -> dict:
        """Fetches all annotations belonging to the user with id <user_id> from the provided sql3 navidrome database <db>.

        input:
            db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables.
            user_id: the user_id to fetch annotations from.
            re_nav: path substitution (if necessary); Navidrome folder structure component (regular expression).
            re_re: path substitution (if necessary); real folder structure component.
        output:
            dict of annotations in the format {path: (playCount, rating, starred)}.
            If the database cannot be read, the error is printed and whatever
            was collected so far (possibly nothing) is returned.
        """
        annos = {}
        conn = None
        try:
            conn = sqlite3.connect(db)
            # user_id is bound as a parameter instead of being pasted into the SQL text.
            rows = conn.execute(
                "SELECT path,play_count,rating,starred "
                "FROM annotation INNER JOIN media_file ON item_id=id "
                "WHERE user_id=? AND item_type='media_file'",
                (user_id,),
            )
            for path, play_count, rating, starred in rows:
                real_path = ExportService._substitute(re_nav, re_re, path)
                annos[real_path] = (play_count, rating, starred)
        except sqlite3.OperationalError as e:
            print(f"Failed to read data from {db}: {e}")
        finally:
            # Close the connection on the error path too.
            if conn is not None:
                conn.close()
        return annos

    @staticmethod
    def write_annotations(annos: dict, re_nav: str = "", re_re: str = ""):
        """Writes a dict of annotations to id3v2 tags.

        The tags used are TXXX=FNVD_<NAME>, where <NAME> is one of PlayCount, Rating, Starred.

        input:
            annos: dict of annotations. Format must be {path: (playCount, rating, starred)}.
            re_nav: path substitution (if necessary); Navidrome folder structure component (regular expression).
            re_re: path substitution (if necessary); real folder structure component.
        """
        count = 0
        for path, values in annos.items():
            try:
                tags = id3.ID3(ExportService._substitute(re_nav, re_re, path))
                tags.add(TXXX(desc="FNVD_PlayCount", text=ExportService._tag_text(values[0])))
                tags.add(TXXX(desc="FNVD_Rating", text=ExportService._tag_text(values[1])))
                tags.add(TXXX(desc="FNVD_Starred", text=ExportService._tag_text(values[2])))
                tags.save()
            except MutagenError as e:
                # A single unwritable file must not abort the whole export.
                print(f"Failed to write data: {e}")
                continue
            count += 1

        print(f"Exported {count} annotations!")
|
|
|
|
|
|
class ImportService():
    """Imports annotations (play count, rating, starred) from id3v2 tags into a navidrome database."""

    # Number of paths bound per SELECT; kept well below SQLite's limit on
    # bound parameters in a single statement.
    _BATCH_SIZE = 500

    @staticmethod
    def _substitute(pattern: str, replacement: str, path: str) -> str:
        """Applies the regex substitution <pattern> -> <replacement> to <path>; an empty pattern means no substitution."""
        # re.sub with an empty pattern matches between every character and
        # would splice <replacement> throughout the whole path.
        if not pattern:
            return path
        return re.sub(pattern, replacement, path)

    @staticmethod
    def _parse_int(text) -> int:
        """Returns the first value of a TXXX frame as an int, or 0 when it is missing or not numeric."""
        try:
            return int(text[0])
        except (IndexError, TypeError, ValueError):
            # e.g. the literal text 'None' written for a NULL database value.
            return 0

    @staticmethod
    def imp(path: str, db: str, user_id: str, re_nav: str, re_re: str):
        """Imports media_file annotations from files located in the <path> directory into the given navidrome database <db>, as belonging to user with id <user_id>.

        input:
            path: string containing the path for the root directory of the music folder.
            db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables.
            user_id: the user_id to ascribe annotations to.
            re_nav: path substitution (if necessary); Navidrome folder structure component.
            re_re: path substitution (if necessary); real folder structure component.
        """
        # The path substitution is applied while reading, so the keys handed
        # to write_annotations are already navidrome paths.
        a = ImportService.get_annotations(path, re_nav, re_re)
        print(f"Read {len(a)} annotations from files!")
        ImportService.write_annotations(a, db, user_id)

    @staticmethod
    def get_annotations(path: str, re_nav: str = "", re_re: str = "") -> dict:
        """Finds all mp3 files in path <path> and extracts the FNVD_PlayCount, FNVD_Rating, and FNVD_Starred id3v2 fields into a dict.

        Files for which all three values are zero or absent are left out.

        input:
            path: string containing the path for the root directory of the music folder.
            re_nav: path substitution (if necessary); Navidrome folder structure component.
            re_re: path substitution (if necessary); real folder structure component (regular expression).
        output:
            a dict containing annotations in the format {path: (playCount, rating, starred)}.
        """
        realpath = os.path.realpath(path)
        annos = {}

        print(f"Reading files (this may take a while)...")
        for folder, _, files in os.walk(realpath):
            for mp3_file in files:
                if not mp3_file.endswith('.mp3'):
                    continue

                full_path = folder + '/' + mp3_file
                values = {'FNVD_PlayCount': 0, 'FNVD_Rating': 0, 'FNVD_Starred': 0}
                try:
                    tags = id3.ID3(full_path)
                    for frame in tags.getall('TXXX'):
                        if frame.desc in values:
                            values[frame.desc] = ImportService._parse_int(frame.text)
                except MutagenError as e:
                    # Unreadable or untagged files are reported and skipped.
                    print(f"failed for file {full_path}: {e}")
                    continue

                play_count = values['FNVD_PlayCount']
                rating = values['FNVD_Rating']
                starred = values['FNVD_Starred']
                if play_count or rating or starred:
                    # Only the directory part is translated to the navidrome layout.
                    nav_dir = ImportService._substitute(re_re, re_nav, folder)
                    annos[nav_dir + '/' + mp3_file] = (play_count, rating, starred)
        return annos

    @staticmethod
    def write_annotations(annos: dict, db: str, user_id: str, re_nav: str = "", re_re: str = ""):
        """Writes a dict of annotations to the navidrome database <db> as user with id <user_id>.

        Paths that have no row in media_file are ignored. Existing annotations
        for the same (user_id, item_id, item_type) are overwritten.

        input:
            annos: dict of annotations. Format must be {path: (playCount, rating, starred)}.
            db: path to a navidrome sql3 database. This database must contain at least the annotation and media_file tables.
            user_id: the user_id to ascribe annotations to.
            re_nav: path substitution (if necessary); Navidrome folder structure component.
            re_re: path substitution (if necessary); real folder structure component (regular expression).
        """
        # Key the annotations by the path as navidrome stores it, so the rows
        # returned by the SELECT below can be looked up directly.
        by_nav_path = {
            ImportService._substitute(re_re, re_nav, k): v for k, v in annos.items()
        }
        nav_paths = list(by_nav_path)

        conn = None
        try:
            conn = sqlite3.connect(db)
            cursor = conn.cursor()

            # step 1+2: resolve paths to item_ids, in batches of bound parameters.
            # Binding sends each path verbatim (quotes and backslashes included).
            items = []
            for start in range(0, len(nav_paths), ImportService._BATCH_SIZE):
                batch = nav_paths[start:start + ImportService._BATCH_SIZE]
                placeholders = ",".join("?" * len(batch))
                cursor.execute(
                    f"SELECT path,id FROM media_file WHERE path IN ({placeholders})",
                    batch,
                )
                # filepaths without an item_id in navidrome never show up here,
                # and are thus ignored for insertion.
                for nav_path, item_id in cursor.fetchall():
                    pc, r, s = by_nav_path[nav_path]
                    items.append((user_id, item_id, 'media_file', pc, r, s))

            # step 3: upsert into annotation; skipped when nothing matched,
            # since an INSERT without rows is not valid SQL.
            if items:
                cursor.executemany(
                    "INSERT INTO annotation (user_id, item_id, item_type, play_count, rating, starred) "
                    "VALUES (?, ?, ?, ?, ?, ?) "
                    "ON CONFLICT(user_id, item_id, item_type) DO UPDATE SET "
                    "play_count = excluded.play_count, rating = excluded.rating, starred=excluded.starred",
                    items,
                )
                conn.commit()

            print(f"Imported {len(items)} annotations!")

        except sqlite3.OperationalError as e:
            print(f"Failed to write data: {e}")
        finally:
            # Close the connection on the error path too.
            if conn is not None:
                conn.close()
|
|
|
|
|
|
def main(argv=None):
    """Command line entry point: parses the arguments and runs the requested import or export.

    input:
        argv: list of command line arguments to parse; defaults to sys.argv[1:] when None.
    """
    parser = argparse.ArgumentParser(description="Navidrome backer-upper")
    # No nargs=1 on any argument: with nargs=1 a supplied value arrives as a
    # one-element list while the default stays a plain string, so indexing
    # with [0] returned the first character of the default (or raised
    # IndexError for the empty-string defaults).
    parser.add_argument("-u", "--user", type=str, metavar="user", default='user', help="Either the username or name of the account for which you want to manage annotations. default: 'user'")
    parser.add_argument("-d", "--db", type=str, metavar="database", default='./navidrome.db', help="Filepath to the navidrome database you want to backup (or restore)")
    parser.add_argument("ie", type=str, metavar="import|export", choices=['import', 'export'], help="What you want to do with the annotations. choices: [import, export]")
    parser.add_argument("-p", "--path", type=str, metavar="mediapath", default=None, help="The folder to import metadata from (import only)")
    parser.add_argument("-f", "--replace-navidrome", type=str, metavar="from", default="", help="Path substitution to make when translating between Navidrome db and real folder structure; Navidrome component")
    parser.add_argument("-t", "--replace-real", type=str, metavar="to", default="", help="Path substitution to make when translating between Navidrome db and real folder structure; real folder component.")

    args = parser.parse_args(argv)

    # --path is only mandatory for imports; check it against the parsed
    # action rather than by scanning the raw command line for 'import'.
    if args.ie == "import" and args.path is None:
        parser.error("the following arguments are required: -p/--path")

    if args.user == "user":
        print(f"No user specified, using default ('{args.user}')...")
    if args.db == './navidrome.db':
        print(f"No database specified, using default ({args.db})...")
    if args.ie == "import":
        print(f"Importing annotations from '{args.path}'...")
    elif args.ie == "export":
        print(f"Exporting annotations...")
    # Short pause so the messages above can be read before work starts.
    sleep(1)

    # run code
    user_id = get_user_id(args.db, args.user)
    if not user_id:
        # Without a valid user id an export finds nothing and an import would
        # attach annotations to a non-existent user.
        print(f"Could not determine the user id for '{args.user}', aborting.")
        sys.exit(1)

    if args.ie == "import":
        ImportService.imp(path=args.path, db=args.db, user_id=user_id, re_nav=args.replace_navidrome, re_re=args.replace_real)
    elif args.ie == "export":
        ExportService.exp(db=args.db, user_id=user_id, re_nav=args.replace_navidrome, re_re=args.replace_real)


if __name__ == '__main__':
    main()
|