Skip to content

Commit

Permalink
Fix track enqueuing (#1747)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Oct 24, 2024
1 parent 6699407 commit 720e13e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 20 deletions.
13 changes: 10 additions & 3 deletions music_assistant/common/models/player_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,27 @@ class PlayerQueue(DataClassDictMixin):
flow_mode: bool = False
resume_pos: int = 0
flow_mode_stream_log: list[PlayLogEntry] = field(default_factory=list)
next_track_enqueued: str | None = None

@property
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
return self.elapsed_time + (time.time() - self.elapsed_time_last_updated)

def __post_serialize__(self, d: dict[Any, Any]) -> dict[Any, Any]:
"""Execute action(s) on serialization."""
d.pop("flow_mode_stream_log", None)
d.pop("enqueued_media_items", None)
d.pop("next_track_enqueued", None)
return d

def to_cache(self) -> dict[str, Any]:
"""Return the dict that is suitable for storing into the cache db."""
d = self.to_dict()
d.pop("current_item", None)
d.pop("next_item", None)
d.pop("index_in_buffer", None)
d.pop("flow_mode", None)
d.pop("flow_mode_stream_log", None)
d.pop("enqueued_media_items", None)
return d

@classmethod
Expand All @@ -75,6 +81,7 @@ def from_cache(cls, d: dict[Any, Any]) -> Self:
d.pop("next_item", None)
d.pop("index_in_buffer", None)
d.pop("flow_mode", None)
d.pop("flow_mode_stream_log", None)
d.pop("enqueued_media_items", None)
d.pop("next_track_enqueued", None)
d.pop("flow_mode_stream_log", None)
return cls.from_dict(d)
51 changes: 34 additions & 17 deletions music_assistant/server/controllers/player_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import asyncio
import random
import time
from contextlib import suppress
from typing import TYPE_CHECKING, Any, TypedDict

from music_assistant.common.helpers.util import get_changed_keys
Expand Down Expand Up @@ -288,9 +287,6 @@ def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
return # no change
queue.repeat_mode = repeat_mode
self.signal_update(queue_id)
# ensure that we trigger enqueue next if repeat mode changed (if needed/supported)
task_id = f"enqueue_next_{queue_id}"
self.mass.call_later(5, self._enqueue_next, queue, queue.current_index, task_id=task_id)

@api_command("player_queues/play_media")
async def play_media(
Expand Down Expand Up @@ -735,6 +731,7 @@ async def play_index(
queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
next_index = self._get_next_index(queue_id, index, allow_repeat=False)
queue.current_item = queue_item
queue.next_track_enqueued = None
self.signal_update(queue_id)

# work out if we are playing an album and if we should prefer album loudness
Expand Down Expand Up @@ -923,7 +920,11 @@ def on_player_update(
# and has an item loaded so we are able to resume it
queue.state = player.state or PlayerState.IDLE
queue.current_item = self.get_item(queue_id, queue.current_index)
queue.next_item = self._get_next_item(queue_id)
queue.next_item = (
self.get_item(queue_id, queue.next_track_enqueued)
if queue.next_track_enqueued
else self._get_next_item(queue_id, queue.current_index)
)

# correct elapsed time when seeking
if (
Expand All @@ -935,6 +936,15 @@ def on_player_update(
):
queue.elapsed_time += queue.current_item.streamdetails.seek_position

# enqueue next track if needed
if (
queue.state == PlayerState.PLAYING
and queue.next_item is not None
and not queue.next_track_enqueued
and queue.corrected_elapsed_time > 2
):
self._check_enqueue_next(queue)

# basic throttle: do not send state changed events if queue did not actually change
prev_state = self._prev_states.get(
queue_id,
Expand Down Expand Up @@ -1002,12 +1012,17 @@ def on_player_update(
object_id=queue_item.media_item.uri,
data=round(seconds_streamed, 2),
)

if end_of_queue_reached:
# end of queue reached, clear items
self.mass.call_later(
5, self._check_clear_queue, queue, task_id=f"clear_queue_{queue_id}"
)

# clear 'next track enqueued' flag if new track is loaded
if prev_state["current_index"] != new_state["current_index"]:
queue.next_track_enqueued = None

# watch dynamic radio items refill if needed
elif "current_index" in changed_keys:
if (
Expand Down Expand Up @@ -1135,11 +1150,6 @@ def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
if queue.flow_mode:
return # nothing to do when flow mode is active
self.signal_update(queue_id)
# enqueue the next track as soon as the player reports
# it has started buffering the given queue item
if not queue.flow_mode:
task_id = f"enqueue_next_{queue_id}"
self.mass.call_later(5, self._enqueue_next, queue, item_id, task_id=task_id)

# Main queue manipulation methods

Expand Down Expand Up @@ -1180,6 +1190,7 @@ def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
self._queue_items[queue_id] = queue_items
self._queues[queue_id].items = len(self._queue_items[queue_id])
self.signal_update(queue_id, True)
self._queues[queue_id].next_track_enqueued = None

# Helper methods

Expand Down Expand Up @@ -1376,19 +1387,25 @@ async def _fill_radio_tracks(self, queue_id: str) -> None:
insert_at_index=len(self._queue_items[queue_id]) + 1,
)

async def _enqueue_next(self, queue: PlayerQueue, current_index: int | str) -> None:
"""Enqueue the next item in the queue."""
def _check_enqueue_next(self, queue: PlayerQueue) -> None:
"""Enqueue the next item in the queue (if needed)."""
if queue.flow_mode:
return
if isinstance(current_index, str):
current_index = self.index_by_id(queue.queue_id, current_index)
with suppress(QueueEmpty):
next_item = await self.load_next_item(queue.queue_id, current_index)
if queue.next_item is None:
return
if queue.next_track_enqueued == queue.next_item.queue_item_id:
return

async def _enqueue_next():
next_item = await self.load_next_item(queue.queue_id, queue.current_index)
queue.next_track_enqueued = next_item.queue_item_id
await self.mass.players.enqueue_next_media(
player_id=queue.queue_id,
media=self.player_media_from_queue_item(next_item, queue.flow_mode),
media=self.player_media_from_queue_item(next_item, False),
)

self.mass.create_task(_enqueue_next())

async def _get_radio_tracks(
self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
Expand Down

0 comments on commit 720e13e

Please sign in to comment.