Skip to content

Commit

Permalink
feat: POC player state tracking #74
Browse files Browse the repository at this point in the history
* Implements generic Player state tracking when using MemorySource
  * State tracking will replace processRecentPlays and all its data structures
* Tracks reported player position OR real-time listened when position not available
* Records listened sessions for more exhaustive "listen to" duration and eventually comparing incoming data playedAt for dups
* Uses own tracked data to calculate most likely player state (stopped/started/paused) as well as health (stale/orphaned from no data)
* Tracks player's reported state, if provided
  • Loading branch information
FoxxMD committed Aug 9, 2023
1 parent 8579715 commit 37d4a1c
Show file tree
Hide file tree
Showing 15 changed files with 559 additions and 75 deletions.
41 changes: 40 additions & 1 deletion src/common/infrastructure/Atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {FixedSizeList} from 'fixed-size-list';
import {MESSAGE} from 'triple-beam';
import {Logger} from '@foxxmd/winston';
import TupleMap from "../TupleMap.js";
import is from "@sindresorhus/is";

export type SourceType = 'spotify' | 'plex' | 'tautulli' | 'subsonic' | 'jellyfin' | 'lastfm' | 'deezer' | 'ytmusic' | 'mpris' | 'mopidy' | 'listenbrainz' | 'jriver' | 'kodi';
export const sourceTypes: SourceType[] = ['spotify', 'plex', 'tautulli', 'subsonic', 'jellyfin', 'lastfm', 'deezer', 'ytmusic', 'mpris', 'mopidy', 'listenbrainz', 'jriver', 'kodi'];
Expand Down Expand Up @@ -39,13 +40,39 @@ export const REPORTED_PLAYER_STATUSES = {
unknown: 'unknown' as ReportedPlayerStatus
}

export type CalculatedPlayerStatus = ReportedPlayerStatus | 'stale' | 'orphaned';
export const CALCULATED_PLAYER_STATUSES = {
...REPORTED_PLAYER_STATUSES,
stale: 'stale' as CalculatedPlayerStatus,
orphaned: 'orphaned' as CalculatedPlayerStatus,
}

export interface ConfigMeta {
source: string
mode?: string
configureAs: string
}

export type ListenRange = [Dayjs, Dayjs]
export type SourceData = (PlayObject | PlayerStateData);

export interface PlayerStateData {
platformId: PlayPlatformId
play: PlayObject
status?: ReportedPlayerStatus
position?: number
timestamp?: Dayjs
}

export const asPlayerStateData = (obj: object): obj is PlayerStateData => {
return 'platformId' in obj && 'play' in obj;
}

export interface PlayProgress {
timestamp: Dayjs
position?: number
positionPercent?: number
}
export type ListenRange = [PlayProgress, PlayProgress]

export interface TrackData {
artists?: string[]
Expand Down Expand Up @@ -247,3 +274,15 @@ export interface RegExResult {
export interface NamedGroup {
[name: string]: any
}

export interface numberFormatOptions {
toFixed: number,
defaultVal?: any,
prefix?: string,
suffix?: string,
round?: {
type?: string,
enable: boolean,
indicate?: boolean,
}
}
2 changes: 1 addition & 1 deletion src/sources/AbstractSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import dayjs, {Dayjs} from "dayjs";
import {
buildTrackString,
capitalize, closePlayDate, genGroupId,
genGroupIdStr, mergeArr,
genGroupIdStrFromPlay, mergeArr,
playObjDataMatch, pollingBackoff,
sleep, sortByNewestPlayDate,
sortByOldestPlayDate
Expand Down
2 changes: 1 addition & 1 deletion src/sources/JRiverSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export class JRiverSource extends MemorySource {
}
}

return this.processRecentPlays(play);
return this.processRecentPlaysNew(play);
}

}
11 changes: 9 additions & 2 deletions src/sources/JellyfinSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import {
truncateStringToLength
} from "../utils.js";
import {JellySourceConfig} from "../common/infrastructure/config/source/jellyfin.js";
import {FormatPlayObjectOptions, InternalConfig, PlayObject} from "../common/infrastructure/Atomic.js";
import {FormatPlayObjectOptions, InternalConfig, PlayObject, PlayPlatformId} from "../common/infrastructure/Atomic.js";
import EventEmitter from "events";
import {PlayerStateOptions} from "./PlayerState/AbstractPlayerState.js";
import {Logger} from "@foxxmd/winston";
import {JellyfinPlayerState} from "./PlayerState/JellyfinPlayerState.js";

const shortDeviceId = truncateStringToLength(10, '');

Expand Down Expand Up @@ -306,7 +309,7 @@ export default class JellyfinSource extends MemorySource {
scrobbleOpts.checkAll = true;

} else {
newPlays = this.processRecentPlays([playObj]);
newPlays = this.processRecentPlaysNew([playObj]);
}

if(newPlays.length > 0) {
Expand All @@ -318,4 +321,8 @@ export default class JellyfinSource extends MemorySource {
}
}
}

getNewPlayer = (logger: Logger, id: PlayPlatformId, opts: PlayerStateOptions) => {
return new JellyfinPlayerState(logger, id, opts);
}
}
2 changes: 1 addition & 1 deletion src/sources/KodiSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class KodiSource extends MemorySource {

let play = await this.client.getRecentlyPlayed(options);

return this.processRecentPlays(play);
return this.processRecentPlaysNew(play);
}

}
2 changes: 1 addition & 1 deletion src/sources/MPRISSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ export class MPRISSource extends MemorySource {
if(options.display === true) {
return deduped;
}
return this.processRecentPlays(deduped);
return this.processRecentPlaysNew(deduped);
}
}

Expand Down
152 changes: 132 additions & 20 deletions src/sources/MemorySource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,27 @@ import {
buildTrackString,
toProgressAwarePlayObject,
getProgress,
genGroupIdStr, playPassesScrobbleThreshold, timePassesScrobbleThreshold, thresholdResultSummary, genGroupId
genGroupIdStrFromPlay,
playPassesScrobbleThreshold,
timePassesScrobbleThreshold,
thresholdResultSummary,
genGroupId,
genGroupIdStr, getPlatformIdFromData
} from "../utils.js";
import dayjs from "dayjs";
import {
asPlayerStateData,
DeviceId,
GroupedPlays,
PlayObject, PlayUserId,
GroupedPlays, PlayerStateData,
PlayObject, PlayPlatformId, PlayUserId,
ProgressAwarePlayObject,
ScrobbleThresholdResult
} from "../common/infrastructure/Atomic.js";
import TupleMap from "../common/TupleMap.js";
import {AbstractPlayerState, PlayerStateOptions} from "./PlayerState/AbstractPlayerState.js";
import {GenericPlayerState} from "./PlayerState/GenericPlayerState.js";
import {Logger} from "@foxxmd/winston";

export default class MemorySource extends AbstractSource {
/*
* MemorySource uses its own state to maintain a list of recently played tracks and determine if a track is valid.
Expand All @@ -42,11 +52,111 @@ export default class MemorySource extends AbstractSource {
* */
candidateRecentlyPlayed: GroupedPlays = new TupleMap<DeviceId, PlayUserId, ProgressAwarePlayObject[]>

players: Map<string, AbstractPlayerState> = new Map();

getFlatCandidateRecentlyPlayed = (): PlayObject[] => {
// TODO sort?
return Array.from(this.candidateRecentlyPlayed.values()).flat();
}

getNewPlayer = (logger: Logger, id: PlayPlatformId, opts: PlayerStateOptions) => {
return new GenericPlayerState(logger, id, opts);
}

processRecentPlaysNew = (datas: (PlayObject | PlayerStateData)[]) => {

const {
data: {
scrobbleThresholds = {}
} = {}
} = this.config;

const newStatefulPlays: PlayObject[] = [];

// create any new players from incoming data
//const incomingPlatformIds: PlayPlatformId[] = [];
for (const data of datas) {
const id = getPlatformIdFromData(data);
const idStr = genGroupIdStr(id);
if (!this.players.has(idStr)) {
//incomingPlatformIds.push(id);
this.players.set(idStr, this.getNewPlayer(this.logger, id, {
staleInterval: (this.config.data.interval ?? 30) * 3,
orphanedInterval: (this.config.data.maxInterval ?? 60) * 5
}));
}
}

const deadPlatformIds: string[] = [];

for (const [key, player] of this.players.entries()) {

let incomingData: PlayObject | PlayerStateData;
// get all incoming datas relevant for each player (this should only be one)
const relevantDatas = datas.filter(x => {
const id = getPlatformIdFromData(x);
return player.platformEquals(id);
});

// we've received some form of communication from the source for this player
if (relevantDatas.length > 0) {
this.lastActivityAt = dayjs();

if (relevantDatas.length > 1) {
this.logger.warn(`More than one data/state for Player ${player.platformIdStr} found in incoming data, will only use first found.`);
}
incomingData = relevantDatas[0];

const [currPlay, prevPlay] = asPlayerStateData(incomingData) ? player.setState(incomingData.status, incomingData.play) : player.setState(undefined, incomingData);
const candidate = prevPlay !== undefined ? prevPlay : currPlay;

if (candidate !== undefined) {
const thresholdResults = timePassesScrobbleThreshold(scrobbleThresholds, candidate.data.listenedFor, candidate.data.duration);

if (thresholdResults.passes) {
const matchingRecent = this.existingDiscovered(candidate); //sRecentlyPlayed.find(x => playObjDataMatch(x, candidate));
let stPrefix = `${buildTrackString(candidate, {include: ['trackId', 'artist', 'track']})}`;
if (matchingRecent === undefined) {
player.logger.debug(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and not matching any prior plays`);
newStatefulPlays.push(candidate);
} else {
const {data: {playDate, duration}} = candidate;
const {data: {playDate: rplayDate}} = matchingRecent;
if (!playDate.isSame(rplayDate)) {
if (duration !== undefined) {
if (playDate.isAfter(rplayDate.add(duration, 's'))) {
player.logger.debug(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and having a different timestamp than a prior play`);
newStatefulPlays.push(candidate);
}
} else {
const discoveredPlays = this.getRecentlyDiscoveredPlaysByPlatform(genGroupId(candidate));
if (discoveredPlays.length === 0 || !playObjDataMatch(discoveredPlays[0], candidate)) {
// if most recent stateful play is not this track we'll add it
player.logger.debug(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)}. Matched other recent play but could not determine time frame due to missing duration. Allowed due to not being last played track.`);
newStatefulPlays.push(candidate);
}
}
}
}
}
}
} else {
// no communication from the source was received for this player
player.checkStale();
if (player.checkOrphaned() && player.isDead()) {
player.logger.debug(`Removed after being orphaned for ${dayjs.duration(player.stateIntervalOptions.orphanedInterval, 'seconds').asMinutes()} minutes`);
deadPlatformIds.push(player.platformIdStr);
}
}
player.logSummary();
}
for (const deadId of deadPlatformIds) {
this.players.delete(deadId);
}

return newStatefulPlays;
}

processRecentPlays = (plays: PlayObject[], useExistingPlayDate = false) => {

const {
Expand All @@ -61,27 +171,28 @@ export default class MemorySource extends AbstractSource {

// -- otherwise, for sources like Spotify that accurately report when track started to play, we can use existing dates
const flatLockedPlays = useExistingPlayDate ? plays : plays.map((p: any) => {
const {data: {playDate, ...restData}, ...rest} = p;
return {data: {...restData, playDate: dayjs()}, ...rest};
const {data: {playDate, ...restData}, ...rest} = p;
return {data: {...restData, playDate: dayjs()}, ...rest};
});

// group by device-user
const groupedLockedPlays = flatLockedPlays.reduce((acc: GroupedPlays, curr: ProgressAwarePlayObject) => {
const id = genGroupId(curr);
acc.set(id, (acc.get(id) ?? []).concat(curr));
return acc;
}, new Map());

for(const [groupId, lockedPlays] of groupedLockedPlays.entries()) {
for (const [groupId, lockedPlays] of groupedLockedPlays.entries()) {
const groupIdStr = `${groupId[0]}-${groupId[1]}`;
let cRecentlyPlayed = this.candidateRecentlyPlayed.get(groupId) ?? [];
// if no candidates exist new plays are new candidates
if(cRecentlyPlayed.length === 0) {
if (cRecentlyPlayed.length === 0) {
this.logger.debug(`[Platform ${groupIdStr}] No prior candidate recent plays!`)
// update activity date here so that polling interval decreases *before* we get a new valid play
// so that we don't miss a play due to long polling interval
this.lastActivityAt = dayjs();
const progressAware: ProgressAwarePlayObject[] = [];
for(const p of lockedPlays) {
for (const p of lockedPlays) {
progressAware.push(toProgressAwarePlayObject(p));
this.logger.debug(`[Platform ${groupIdStr}] Adding new locked play: ${buildTrackString(p, {include: ['trackId', 'artist', 'track']})}`);
}
Expand All @@ -90,20 +201,20 @@ export default class MemorySource extends AbstractSource {
// otherwise determine new tracks (not found in prior candidates)
const newTracks = lockedPlays.filter((x: any) => cRecentlyPlayed.every(y => !playObjDataMatch(y, x)));
const newProgressAwareTracks: ProgressAwarePlayObject[] = [];
if(newTracks.length > 0) {
if (newTracks.length > 0) {
// update activity date here so that polling interval decreases *before* we get a new valid play
// so that we don't miss a play due to long polling interval
this.lastActivityAt = dayjs();
this.logger.debug(`[Platform ${groupIdStr}] New plays found that do not match existing candidates.`)
for(const p of newTracks) {
for (const p of newTracks) {
this.logger.debug(`[Platform ${groupIdStr}] Adding new locked play: ${buildTrackString(p, {include: ['trackId', 'artist', 'track']})}`);
newProgressAwareTracks.push(toProgressAwarePlayObject(p));
}
}
// filter prior candidates based on new recently played
cRecentlyPlayed = cRecentlyPlayed.filter(x => {
const candidateMatchedLocked = lockedPlays.some((y: any) => playObjDataMatch(x, y));
if(!candidateMatchedLocked) {
if (!candidateMatchedLocked) {
this.logger.debug(`[Platform ${groupIdStr}] Existing candidate not found in locked plays will be removed: ${buildTrackString(x, {include: ['trackId', 'artist', 'track']})}`);
}
return candidateMatchedLocked;
Expand All @@ -119,7 +230,7 @@ export default class MemorySource extends AbstractSource {
// now we check if all candidates pass tests for having been tracked long enough:
// * Has been tracked for at least [duration] seconds or [percentage] of track duration
// * If it has playback position data then it must also have progressed at least [duration] seconds or [percentage] of track duration progress since our initial tracking data
for(const candidate of cRecentlyPlayed) {
for (const candidate of cRecentlyPlayed) {
let thresholdResults: ScrobbleThresholdResult;
thresholdResults = playPassesScrobbleThreshold(candidate, scrobbleThresholds);
const {passes: firstSeenValid} = thresholdResults;
Expand All @@ -138,28 +249,28 @@ export default class MemorySource extends AbstractSource {
}
}

if(firstSeenValid && progressValid) {
if (firstSeenValid && progressValid) {
// a prior candidate has been playing for more than 30 seconds and passed progress test, time to check statefuls

const matchingRecent = this.existingDiscovered(candidate); //sRecentlyPlayed.find(x => playObjDataMatch(x, candidate));
let stPrefix = `[Platform ${groupId}] (Stateful Play) ${buildTrackString(candidate, {include: ['trackId', 'artist', 'track']})}`;
if(matchingRecent === undefined) {
if (matchingRecent === undefined) {
this.logger.debug(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and not matching any prior plays`);
newStatefulPlays.push(candidate);
//sRecentlyPlayed.push(candidate);
} else {
const {data: { playDate, duration }} = candidate;
const {data: { playDate: rplayDate }} = matchingRecent;
if(!playDate.isSame(rplayDate)) {
if(duration !== undefined) {
if(playDate.isAfter(rplayDate.add(duration, 's'))) {
const {data: {playDate, duration}} = candidate;
const {data: {playDate: rplayDate}} = matchingRecent;
if (!playDate.isSame(rplayDate)) {
if (duration !== undefined) {
if (playDate.isAfter(rplayDate.add(duration, 's'))) {
this.logger.debug(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)} and having a different timestamp than a prior play`);
newStatefulPlays.push(candidate);
//sRecentlyPlayed.push(candidate);
}
} else {
const discoveredPlays = this.getRecentlyDiscoveredPlaysByPlatform(genGroupId(candidate));
if(discoveredPlays.length === 0 || !playObjDataMatch(discoveredPlays[0], candidate)) {
if (discoveredPlays.length === 0 || !playObjDataMatch(discoveredPlays[0], candidate)) {
// if most recent stateful play is not this track we'll add it
this.logger.debug(`${stPrefix} added after ${thresholdResultSummary(thresholdResults)}. Matched other recent play but could not determine time frame due to missing duration. Allowed due to not being last played track.`);
newStatefulPlays.push(candidate);
Expand All @@ -183,6 +294,7 @@ export default class MemorySource extends AbstractSource {
return playObj.data.playDate.isBefore(dayjs().subtract(30, 's'));
}
}

function sortByPlayDate(a: ProgressAwarePlayObject, b: ProgressAwarePlayObject): number {
throw new Error("Function not implemented.");
}
Expand Down
Loading

0 comments on commit 37d4a1c

Please sign in to comment.