Skip to content

Commit

Permalink
shared http clientsession
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Sep 16, 2020
1 parent 136721d commit 955a3db
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 223 deletions.
2 changes: 1 addition & 1 deletion music_assistant/http_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def fill_buffer():
cur_chunk += 1

# HANDLE FIRST PART OF TRACK
if cur_chunk == 1 and is_last_chunk:
if not chunk and cur_chunk == 1 and is_last_chunk:
LOGGER.warning("Stream error, skip track %s", queue_track.item_id)
break
if cur_chunk <= 2 and not last_fadeout_data:
Expand Down
21 changes: 17 additions & 4 deletions music_assistant/mass.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import threading
from typing import Any, Awaitable, Callable, List, Optional, Union

import aiohttp
from music_assistant.cache import Cache
from music_assistant.config import MassConfig
from music_assistant.constants import (
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(self, datapath):
"""

self.loop = None
self._http_session = None
self._event_listeners = []
self._providers = {}
self.config = MassConfig(self, datapath)
Expand All @@ -57,13 +59,18 @@ def __init__(self, datapath):

async def async_start(self):
"""Start running the music assistant server."""
# initialize loop
self.loop = asyncio.get_event_loop()
self.loop.set_exception_handler(self.__handle_exception)
if LOGGER.level == logging.DEBUG:
self.loop.set_debug(True)
# create shared aiohttp ClientSession
self._http_session = aiohttp.ClientSession(
loop=self.loop,
connector=aiohttp.TCPConnector(enable_cleanup_closed=True, ssl=False),
)
await self.database.async_setup()
await self.cache.async_setup()
await self.metadata.async_setup()
await self.music_manager.async_setup()
await self.player_manager.async_setup()
await self.web.async_setup()
Expand All @@ -79,6 +86,13 @@ async def async_stop(self):
for prov in self._providers.values():
await prov.async_on_stop()
await self.player_manager.async_close()
await self._http_session.connector.close()
self._http_session.detach()

@property
def http_session(self):
"""Return the default http session."""
return self._http_session

async def async_register_provider(self, provider: Provider):
"""Register a new Provider/Plugin."""
Expand Down Expand Up @@ -184,6 +198,7 @@ def remove_listener():

return remove_listener

@callback
def add_job(
self, target: Callable[..., Any], *args: Any
) -> Optional[asyncio.Future]:
Expand All @@ -205,9 +220,7 @@ def add_job(
if threading.current_thread() is not threading.main_thread():
# called from other thread
if asyncio.iscoroutine(check_target):
task = asyncio.run_coroutine_threadsafe(
target, self.loop
) # type: ignore
task = asyncio.run_coroutine_threadsafe(target, self.loop) # type: ignore
elif asyncio.iscoroutinefunction(check_target):
task = asyncio.run_coroutine_threadsafe(target(*args), self.loop)
elif is_callback(check_target):
Expand Down
25 changes: 2 additions & 23 deletions music_assistant/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ def __init__(self, mass):
self.musicbrainz = MusicBrainz(mass)
self.fanarttv = FanartTv(mass)

async def async_setup(self):
"""Async setup of metadata module."""
await self.musicbrainz.async_setup()
await self.fanarttv.async_setup()

async def async_get_artist_metadata(self, mb_artist_id, cur_metadata):
"""Get/update rich metadata for an artist by providing the musicbrainz artist id."""
metadata = cur_metadata
Expand Down Expand Up @@ -120,14 +115,6 @@ def __init__(self, mass):
"""Initialize class."""
self.mass = mass
self.cache = mass.cache
self.throttler = None
self._http_session = None

async def async_setup(self):
"""Perform async setup."""
self._http_session = aiohttp.ClientSession(
loop=self.mass.loop, connector=aiohttp.TCPConnector()
)
self.throttler = Throttler(rate_limit=1, period=1)

async def async_search_artist_by_album(
Expand Down Expand Up @@ -209,7 +196,7 @@ async def async_get_data(self, endpoint: str, params: Optional[dict] = None):
headers = {"User-Agent": "Music Assistant/1.0.0 https://github.com/marcelveldt"}
params["fmt"] = "json"
async with self.throttler:
async with self._http_session.get(
async with self.mass.http_session.get(
url, headers=headers, params=params, verify_ssl=False
) as response:
try:
Expand All @@ -231,14 +218,6 @@ def __init__(self, mass):
"""Initialize class."""
self.mass = mass
self.cache = mass.cache
self._http_session = None
self.throttler = None

async def async_setup(self):
"""Perform async setup."""
self._http_session = aiohttp.ClientSession(
loop=self.mass.loop, connector=aiohttp.TCPConnector()
)
self.throttler = Throttler(rate_limit=1, period=2)

async def async_get_artist_images(self, mb_artist_id):
Expand Down Expand Up @@ -271,7 +250,7 @@ async def async_get_data(self, endpoint, params=None):
url = "http://webservice.fanart.tv/v3/%s" % endpoint
params["api_key"] = "639191cb0774661597f28a47e7e2bad5"
async with self.throttler:
async with self._http_session.get(
async with self.mass.http_session.get(
url, params=params, verify_ssl=False
) as response:
try:
Expand Down
3 changes: 2 additions & 1 deletion music_assistant/models/player_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ def use_queue_stream(self):
it will send a constant stream of audio to the player with all tracks.
"""
supports_crossfade = PlayerFeature.CROSSFADE in self.player.features
return self.crossfade_enabled and not supports_crossfade
supports_queue = PlayerFeature.QUEUE in self.player.features
return not supports_crossfade if self.crossfade_enabled else not supports_queue

@callback
def get_item(self, index):
Expand Down
1 change: 0 additions & 1 deletion music_assistant/models/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ async def async_on_start(self) -> bool:
@abstractmethod
async def async_on_stop(self):
"""Handle correct close/cleanup of the provider on exit. Called on shutdown."""
raise NotImplementedError

async def async_on_reload(self):
"""Handle configuration changes for this provider. Called on reload."""
Expand Down
42 changes: 41 additions & 1 deletion music_assistant/player_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
EVENT_PLAYER_REMOVED,
)
from music_assistant.models.config_entry import ConfigEntry, ConfigEntryType
from music_assistant.models.media_types import MediaItem, MediaType
from music_assistant.models.media_types import MediaItem, MediaType, Track
from music_assistant.models.player import (
Player,
PlayerControl,
Expand All @@ -24,6 +24,7 @@
from music_assistant.models.player_queue import PlayerQueue, QueueItem, QueueOption
from music_assistant.models.playerprovider import PlayerProvider
from music_assistant.models.provider import ProviderType
from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
from music_assistant.utils import (
async_iter_items,
callback,
Expand Down Expand Up @@ -251,6 +252,45 @@ async def async_play_media(
if queue_opt == QueueOption.Add:
return await player_queue.async_append(queue_items)

async def async_cmd_play_uri(self, player_id: str, uri: str):
"""
Play the specified uri/url on the given player.
Will create a fake track on the queue.
:param player_id: player_id of the player to handle the command.
:param uri: Url/Uri that can be played by a player.
:param queue_opt:
QueueOption.Play -> Insert new items in queue and start playing at inserted position
QueueOption.Replace -> Replace queue contents with these items
QueueOption.Next -> Play item(s) after current playing item
QueueOption.Add -> Append new items at end of the queue
"""
player = self._players[player_id]
if not player:
return
queue_item = QueueItem(
Track(
item_id=uri,
provider="",
name="uri",
)
)
queue_item.streamdetails = StreamDetails(
type=StreamType.URL,
provider="",
item_id=uri,
path=uri,
content_type=ContentType(uri.split(".")[-1]),
sample_rate=44100,
bit_depth=16,
)
# turn on player
await self.async_cmd_power_on(player_id)
# load item into the queue
player_queue = self.get_player_queue(player_id)
return await player_queue.async_insert([queue_item], 0)

async def async_cmd_stop(self, player_id: str) -> None:
"""
Send STOP command to given player.
Expand Down
Loading

0 comments on commit 955a3db

Please sign in to comment.