Skip to content

Commit

Permalink
Throttle metadata scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Oct 25, 2024
1 parent 1b6b216 commit 823bf60
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 48 deletions.
2 changes: 1 addition & 1 deletion music_assistant/server/controllers/media/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ async def get(
):
# schedule a refresh of the metadata on access of the item
# e.g. the item is being played or opened in the UI
self.mass.metadata.schedule_update_metadata(library_item)
self.mass.metadata.schedule_update_metadata(library_item.uri)
return library_item
# grab full details from the provider
return await self.get_provider_item(
Expand Down
110 changes: 68 additions & 42 deletions music_assistant/server/controllers/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import asyncio
import collections
import logging
import os
import random
Expand Down Expand Up @@ -52,6 +53,7 @@
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.images import create_collage, get_image_thumb
from music_assistant.server.helpers.throttle_retry import Throttler
from music_assistant.server.models.core_controller import CoreController

if TYPE_CHECKING:
Expand Down Expand Up @@ -104,6 +106,7 @@
REFRESH_INTERVAL_ALBUMS = 60 * 60 * 24 * 90 # 90 days
REFRESH_INTERVAL_TRACKS = 60 * 60 * 24 * 90 # 90 days
REFRESH_INTERVAL_PLAYLISTS = 60 * 60 * 24 * 7 # 7 days
PERIODIC_SCAN_INTERVAL = 60 * 60 * 24 # 1 day
CONF_ENABLE_ONLINE_METADATA = "enable_online_metadata"


Expand All @@ -123,7 +126,10 @@ def __init__(self, *args, **kwargs) -> None:
"Music Assistant's core controller which handles all metadata for music."
)
self.manifest.icon = "book-information-variant"
self._scanner_task: asyncio.Task | None = None
self._lookup_jobs: MetadataLookupQueue = MetadataLookupQueue()
self._lookup_task: asyncio.Task | None = None
self._throttler = Throttler(1, 30)
self._missing_metadata_scan_task: asyncio.Task | None = None

async def get_config_entries(
self,
Expand Down Expand Up @@ -172,12 +178,19 @@ async def setup(self, config: CoreConfig) -> None:
self._collage_images_dir = os.path.join(self.mass.storage_path, "collage_images")
if not await asyncio.to_thread(os.path.exists, self._collage_images_dir):
await asyncio.to_thread(os.mkdir, self._collage_images_dir)

self.mass.streams.register_dynamic_route("/imageproxy", self.handle_imageproxy)
# the lookup task is used to process metadata lookup jobs
self._lookup_task = self.mass.create_task(self._process_metadata_lookup_jobs())
# just run the scan for missing metadata once at startup
# TODO: allow enabling/disabling this in the UI and configuring the interval/time
self._missing_metadata_scan_task = self.mass.create_task(self._scan_missing_metadata())

async def close(self) -> None:
"""Handle logic on server stop."""
self.stop_metadata_scanner()
if self._lookup_task and not self._lookup_task.done():
self._lookup_task.cancel()
if self._missing_metadata_scan_task and not self._missing_metadata_scan_task.done():
self._missing_metadata_scan_task.cancel()
self.mass.streams.unregister_dynamic_route("/imageproxy")

@property
Expand Down Expand Up @@ -243,39 +256,25 @@ async def update_metadata(
if item.provider != "library":
# this shouldn't happen but just in case.
raise RuntimeError("Metadata can only be updated for library items")
if item.media_type == MediaType.ARTIST:
await self._update_artist_metadata(item, force_refresh=force_refresh)
if item.media_type == MediaType.ALBUM:
await self._update_album_metadata(item, force_refresh=force_refresh)
if item.media_type == MediaType.TRACK:
await self._update_track_metadata(item, force_refresh=force_refresh)
if item.media_type == MediaType.PLAYLIST:
await self._update_playlist_metadata(item, force_refresh=force_refresh)
# just in case it was in the queue, prevent duplicate lookups
self._lookup_jobs.pop(item.uri)
async with self._throttler:
if item.media_type == MediaType.ARTIST:
await self._update_artist_metadata(item, force_refresh=force_refresh)
if item.media_type == MediaType.ALBUM:
await self._update_album_metadata(item, force_refresh=force_refresh)
if item.media_type == MediaType.TRACK:
await self._update_track_metadata(item, force_refresh=force_refresh)
if item.media_type == MediaType.PLAYLIST:
await self._update_playlist_metadata(item, force_refresh=force_refresh)
return item

def schedule_update_metadata(self, item: MediaItemType) -> None:
"""Schedule metadata update for given item."""
task_id = f"metadata_update_{item.uri}"
self.mass.call_later(5, self.update_metadata, item, task_id=task_id)

@api_command("metadata/start_scan")
def start_metadata_scanner(self) -> None:
"""
Start scanner for (missing) metadata.
Usually this is triggered by the music controller after finishing a library sync.
"""
if self._scanner_task and not self._scanner_task.done():
# already running
def schedule_update_metadata(self, uri: str) -> None:
"""Schedule metadata update for given MediaItem uri."""
if "library" not in uri:
return
self._scanner_task = self.mass.create_task(self._metadata_scanner())

@api_command("metadata/stop_scan")
def stop_metadata_scanner(self) -> None:
"""Stop scanner for (missing) metadata."""
if self._scanner_task and not self._scanner_task.done():
self._scanner_task.cancel()
self._scanner_task = None
with suppress(asyncio.QueueFull):
self._lookup_jobs.put_nowait(uri)

async def get_image_data_for_item(
self,
Expand Down Expand Up @@ -736,8 +735,24 @@ async def _get_artist_mbid(self, artist: Artist) -> str | None:
)
return None

async def _metadata_scanner(self) -> None:
"""Scanner for (missing) metadata."""
async def _process_metadata_lookup_jobs(self) -> None:
    """Consume queued item uris and refresh the metadata of each one.

    Loops indefinitely: it waits for the next uri on the lookup queue, resolves it
    to a media item and updates that item's metadata. Any ``Exception`` raised
    while handling a single uri is logged and the loop moves on to the next job.
    """
    while True:
        uri = await self._lookup_jobs.get()
        try:
            media_item = await self.mass.music.get_item_by_uri(uri)
            await self.update_metadata(media_item)
        except Exception as err:
            # only attach the traceback when the logger is enabled for level 10
            include_traceback = self.logger.isEnabledFor(10)
            self.logger.error(
                "Error while updating metadata for %s: %s",
                uri,
                str(err),
                exc_info=err if include_traceback else None,
            )

async def _scan_missing_metadata(self) -> None:
"""Scanner for (missing) metadata, periodically in the background."""
self._periodic_scan = None
# Scan for missing artist images
self.logger.debug("Start lookup for missing artist images...")
query = (
Expand All @@ -746,9 +761,7 @@ async def _metadata_scanner(self) -> None:
f"OR json_extract({DB_TABLE_ARTISTS}.metadata,'$.images') = '[]')"
)
for artist in await self.mass.music.artists.library_items(extra_query=query):
await self._update_artist_metadata(artist)
# sleep a bit to not overload the providers
await asyncio.sleep(10)
self.schedule_update_metadata(artist.uri)

# Scan for missing album images
self.logger.debug("Start lookup for missing album images...")
Expand All @@ -760,9 +773,7 @@ async def _metadata_scanner(self) -> None:
for album in await self.mass.music.albums.library_items(
limit=50, order_by="random", extra_query=query
):
await self._update_album_metadata(album)
# sleep a bit to not overload the providers
await asyncio.sleep(10)
self.schedule_update_metadata(album.uri)

# Force refresh playlist metadata every refresh interval
# this will e.g. update the playlist image and genres if the tracks have changed
Expand All @@ -774,4 +785,19 @@ async def _metadata_scanner(self) -> None:
for playlist in await self.mass.music.playlists.library_items(
limit=10, order_by="random", extra_query=query
):
await self._update_playlist_metadata(playlist, True)
self.schedule_update_metadata(playlist.uri)


class MetadataLookupQueue(asyncio.Queue):
    """Queue of media item uris that are waiting for a metadata lookup.

    Differs from a plain ``asyncio.Queue`` in three ways:
    - a uri that is already waiting in the queue is not added a second time,
    - the backing deque is always bounded,
    - a waiting uri can be withdrawn again with ``pop``.
    """

    def _init(self, maxlen: int = 100):
        """Create the backing deque.

        ``asyncio.Queue.__init__`` always calls this hook with its ``maxsize``
        argument, which is 0 when the queue is created without arguments. A deque
        with ``maxlen=0`` silently discards everything appended to it, so a
        non-positive value falls back to the intended default of 100 entries.
        """
        if maxlen <= 0:
            maxlen = 100
        self._queue: collections.deque[str] = collections.deque(maxlen=maxlen)

    def _put(self, item: str) -> None:
        """Append item unless an equal entry is already waiting in the queue."""
        if item not in self._queue:
            self._queue.append(item)

    def pop(self, item: str) -> None:
        """Remove item from the queue; do nothing if it is not queued."""
        try:
            self._queue.remove(item)
        except ValueError:
            # the item not being queued is the normal case, not an error
            pass
6 changes: 1 addition & 5 deletions music_assistant/server/controllers/music.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ async def remove_item_from_library(
Destructive! Will remove the item and all dependants.
"""
self.mass.metadata.stop_metadata_scanner()
ctrl = self.get_controller(media_type)
item = await ctrl.get_library_item(library_item_id)
# remove from all providers
Expand Down Expand Up @@ -845,16 +844,14 @@ def on_sync_task_done(task: asyncio.Task) -> None:
else:
self.logger.info("Sync task for %s completed", provider.name)
self.mass.signal_event(EventType.SYNC_TASKS_UPDATED, data=self.in_progress_syncs)
# schedule db cleanup + metadata scan after sync
# schedule db cleanup after sync
if not self.in_progress_syncs:
self.mass.create_task(self._cleanup_database())
self.mass.metadata.start_metadata_scanner()

task.add_done_callback(on_sync_task_done)

async def cleanup_provider(self, provider_instance: str) -> None:
"""Cleanup provider records from the database."""
self.mass.metadata.stop_metadata_scanner()
if provider_instance.startswith(("filesystem", "jellyfin", "plex", "opensubsonic")):
# removal of a local provider can become messy very fast due to the relations
# such as images pointing at the files etc. so we just reset the whole db
Expand Down Expand Up @@ -1120,7 +1117,6 @@ async def __migrate_database(self, prev_version: int) -> None:

async def _reset_database(self) -> None:
"""Reset the database."""
self.mass.metadata.stop_metadata_scanner()
await self.close()
db_path = os.path.join(self.mass.storage_path, "library.db")
await asyncio.to_thread(os.remove, db_path)
Expand Down

0 comments on commit 823bf60

Please sign in to comment.