diff --git a/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt b/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt index 0c45c9abe..70133b979 100644 --- a/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt +++ b/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt @@ -161,7 +161,7 @@ fun io.ably.lib.types.PresenceMessage.toTracking(gson: Gson): PresenceMessage? = PresenceMessage( this.action.toTracking(), presenceData, - this.clientId + this.memberKey(), ) } diff --git a/common/src/main/java/com/ably/tracking/common/AblyModels.kt b/common/src/main/java/com/ably/tracking/common/AblyModels.kt index 4e3145399..9d85a5d69 100644 --- a/common/src/main/java/com/ably/tracking/common/AblyModels.kt +++ b/common/src/main/java/com/ably/tracking/common/AblyModels.kt @@ -2,7 +2,19 @@ package com.ably.tracking.common import com.ably.tracking.Resolution -data class PresenceMessage(val action: PresenceAction, val data: PresenceData, val clientId: String) +/** + * Encapsulates the properties of an Ably presence message which are needed by asset tracking SDKs. + */ +data class PresenceMessage( + val action: PresenceAction, + val data: PresenceData, + + /** + * Combination of Ably `clientId` and `connectionId`. + * See: https://sdk.ably.com/builds/ably/specification/main/features/#TP3h + */ + val memberKey: String, +) enum class PresenceAction { PRESENT_OR_ENTER, LEAVE_OR_ABSENT, UPDATE; diff --git a/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt b/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt index d2a0c8f57..1e9878246 100644 --- a/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt +++ b/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt @@ -260,7 +260,7 @@ data class Trackable( override fun hashCode(): Int = id.hashCode() } -data class Subscriber(val id: String, val trackable: Trackable) +data class Subscriber(val memberKey: String, val trackable: Trackable) sealed class Proximity diff --git a/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt b/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt index 92de376bb..07aa693a5 100644 --- a/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt +++ b/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt @@ -52,16 +52,17 @@ internal interface CorePublisher { val routingProfile: RoutingProfile val trackableStateFlows: Map> - fun addSubscriber(id: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties) + fun addSubscriber(memberKey: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties) fun updateSubscriber( - id: String, + memberKey: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties ) - fun removeSubscriber(id: String, trackable: Trackable, properties: PublisherProperties) + fun removeSubscriber(memberKey: String, trackable: Trackable, properties: PublisherProperties) fun removeAllSubscribers(trackable: Trackable, properties: PublisherProperties) + fun setDestination(destination: Destination, properties: PublisherProperties) fun removeCurrentDestination(properties: PublisherProperties) fun startLocationUpdates(properties: PublisherProperties) @@ -492,8 +493,13 @@ constructor( } } - override fun addSubscriber(id: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties) { - val subscriber = Subscriber(id, trackable) + override fun addSubscriber( + memberKey: String, + trackable: Trackable, + data: PresenceData, + properties: PublisherProperties + ) { + val subscriber = Subscriber(memberKey, trackable) if (properties.subscribers[trackable.id] == null) { properties.subscribers[trackable.id] = mutableSetOf() } @@ -504,13 +510,13 @@ constructor( } override fun updateSubscriber( - id: String, + memberKey: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties ) { properties.subscribers[trackable.id]?.let { subscribers -> - subscribers.find { it.id == id }?.let { subscriber -> + subscribers.find { it.memberKey == memberKey }?.let { subscriber -> data.resolution.let { resolution -> saveOrRemoveResolutionRequest(resolution, trackable, subscriber, properties) resolveResolution(trackable, properties) @@ -519,9 +525,9 @@ constructor( } } - override fun removeSubscriber(id: String, trackable: Trackable, properties: PublisherProperties) { + override fun removeSubscriber(memberKey: String, trackable: Trackable, properties: PublisherProperties) { properties.subscribers[trackable.id]?.let { subscribers -> - subscribers.find { it.id == id }?.let { subscriber -> + subscribers.find { it.memberKey == memberKey }?.let { subscriber -> subscribers.remove(subscriber) properties.requests[trackable.id]?.remove(subscriber) hooks.subscribers?.onSubscriberRemoved(subscriber) diff --git a/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt b/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt index d0d308e14..57b3df43e 100644 --- a/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt +++ b/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt @@ -19,7 +19,7 @@ internal class PresenceMessageWorker( PresenceAction.PRESENT_OR_ENTER -> { if (presenceMessage.data.type == ClientTypes.SUBSCRIBER) { corePublisher.addSubscriber( - presenceMessage.clientId, + presenceMessage.memberKey, trackable, presenceMessage.data, properties @@ -28,13 +28,13 @@ internal class PresenceMessageWorker( } PresenceAction.LEAVE_OR_ABSENT -> { if (presenceMessage.data.type == ClientTypes.SUBSCRIBER) { - corePublisher.removeSubscriber(presenceMessage.clientId, trackable, properties) + corePublisher.removeSubscriber(presenceMessage.memberKey, trackable, properties) } } PresenceAction.UPDATE -> { if (presenceMessage.data.type == ClientTypes.SUBSCRIBER) { corePublisher.updateSubscriber( - presenceMessage.clientId, + presenceMessage.memberKey, trackable, presenceMessage.data, properties diff --git a/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt b/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt index 2ef5fa898..a69e5510f 100644 --- a/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt +++ b/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt @@ -133,7 +133,7 @@ class PresenceMessageWorkerTest { PresenceMessage( action, PresenceData(if (isSubscriber) ClientTypes.SUBSCRIBER else ClientTypes.PUBLISHER), - "test-client-id" + "test-member-key" ), corePublisher ) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index c13c11c13..6719bbd2d 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -7,7 +7,9 @@ import com.ably.tracking.common.Ably import com.ably.tracking.common.ClientTypes import com.ably.tracking.common.ConnectionState import com.ably.tracking.common.ConnectionStateChange +import com.ably.tracking.common.PresenceAction import com.ably.tracking.common.PresenceData +import com.ably.tracking.common.PresenceMessage import com.ably.tracking.common.createSingleThreadDispatcher import com.ably.tracking.common.workerqueue.Properties import com.ably.tracking.common.workerqueue.WorkerQueue @@ -34,9 +36,6 @@ internal interface CoreSubscriber { } internal interface SubscriberInteractor { - fun updateTrackableState(properties: SubscriberProperties) - fun updatePublisherPresence(properties: SubscriberProperties, isPublisherPresent: Boolean) - fun updatePublisherResolutionInformation(presenceData: PresenceData) fun subscribeForRawEvents(presenceData: PresenceData) fun subscribeForEnhancedEvents(presenceData: PresenceData) fun subscribeForChannelState() @@ -62,39 +61,33 @@ private class DefaultCoreSubscriber( private val trackableId: String, ) : CoreSubscriber, SubscriberInteractor { - private val scope = CoroutineScope(singleThreadDispatcher + SupervisorJob()) private val workerQueue: WorkerQueue - private val _trackableStates: MutableStateFlow = - MutableStateFlow(TrackableState.Offline()) - private val _publisherPresence: MutableStateFlow = MutableStateFlow(false) - private val _enhancedLocations: MutableSharedFlow = - MutableSharedFlow(replay = 1) - private val _rawLocations: MutableSharedFlow = MutableSharedFlow(replay = 1) - private val _resolutions: MutableSharedFlow = MutableSharedFlow(replay = 1) - private val _nextLocationUpdateIntervals: MutableSharedFlow = - MutableSharedFlow(replay = 1) + + private val eventFlows: SubscriberProperties.EventFlows override val enhancedLocations: SharedFlow - get() = _enhancedLocations.asSharedFlow() + get() = eventFlows.enhancedLocations override val rawLocations: SharedFlow - get() = _rawLocations.asSharedFlow() + get() = eventFlows.rawLocations override val trackableStates: StateFlow - get() = _trackableStates.asStateFlow() + get() = eventFlows.trackableStates override val publisherPresence: StateFlow - get() = _publisherPresence + get() = eventFlows.publisherPresence override val resolutions: SharedFlow - get() = _resolutions.asSharedFlow() + get() = eventFlows.resolutions override val nextLocationUpdateIntervals: SharedFlow - get() = _nextLocationUpdateIntervals.asSharedFlow() + get() = eventFlows.nextLocationUpdateIntervals init { val workerFactory = WorkerFactory(this, ably, trackableId) - val properties = SubscriberProperties(initialResolution) + val scope = CoroutineScope(singleThreadDispatcher + SupervisorJob()) + eventFlows = SubscriberProperties.EventFlows(scope) + val properties = SubscriberProperties(initialResolution, eventFlows) workerQueue = WorkerQueue( properties = properties, scope = scope, @@ -110,31 +103,6 @@ private class DefaultCoreSubscriber( workerQueue.enqueue(workerSpecification) } - override fun updatePublisherPresence(properties: SubscriberProperties, isPublisherPresent: Boolean) { - if (isPublisherPresent != properties.isPublisherOnline) { - properties.isPublisherOnline = isPublisherPresent - scope.launch { _publisherPresence.emit(isPublisherPresent) } - } - } - - override fun updateTrackableState(properties: SubscriberProperties) { - val newTrackableState = when (properties.lastConnectionStateChange.state) { - ConnectionState.ONLINE -> { - when (properties.lastChannelConnectionStateChange.state) { - ConnectionState.ONLINE -> if (properties.isPublisherOnline) TrackableState.Online else TrackableState.Offline() - ConnectionState.OFFLINE -> TrackableState.Offline() - ConnectionState.FAILED -> TrackableState.Failed(properties.lastChannelConnectionStateChange.errorInformation!!) // are we sure error information will always be present? - } - } - ConnectionState.OFFLINE -> TrackableState.Offline() - ConnectionState.FAILED -> TrackableState.Failed(properties.lastConnectionStateChange.errorInformation!!) // are we sure error information will always be present? - } - if (newTrackableState != properties.trackableState) { - properties.trackableState = newTrackableState - scope.launch { _trackableStates.emit(newTrackableState) } - } - } - override fun subscribeForChannelState() { ably.subscribeForChannelStateChange(trackableId) { enqueue(WorkerSpecification.UpdateChannelConnectionState(it)) @@ -143,37 +111,171 @@ private class DefaultCoreSubscriber( override fun subscribeForEnhancedEvents(presenceData: PresenceData) { ably.subscribeForEnhancedEvents(trackableId, presenceData) { - scope.launch { _enhancedLocations.emit(it) } + eventFlows.emitEnhanced(it) } } override fun subscribeForRawEvents(presenceData: PresenceData) { ably.subscribeForRawEvents(trackableId, presenceData) { - scope.launch { _rawLocations.emit(it) } - } - } - - override fun updatePublisherResolutionInformation(presenceData: PresenceData) { - presenceData.resolution?.let { publisherResolution -> - scope.launch { _resolutions.emit(publisherResolution) } - scope.launch { _nextLocationUpdateIntervals.emit(publisherResolution.desiredInterval) } + eventFlows.emitRaw(it) } } override fun notifyAssetIsOffline() { - scope.launch { _trackableStates.emit(TrackableState.Offline()) } + // TODO what is this method achieving, why is it not in normal flow? + // Perhaps related to: https://github.com/ably/ably-asset-tracking-android/issues/802 + eventFlows.emit(TrackableState.Offline()) } } internal data class SubscriberProperties private constructor( var presenceData: PresenceData, + private val eventFlows: EventFlows, + override var isStopped: Boolean = false, - var isPublisherOnline: Boolean = false, - var trackableState: TrackableState = TrackableState.Offline(), - var lastConnectionStateChange: ConnectionStateChange = + + private var presentPublisherMemberKeys: MutableSet = HashSet(), + private var lastEmittedValueOfIsPublisherVisible: Boolean? = null, + private var lastEmittedTrackableState: TrackableState = TrackableState.Offline(), + private var lastConnectionStateChange: ConnectionStateChange = + ConnectionStateChange(ConnectionState.OFFLINE, null), + private var lastChannelConnectionStateChange: ConnectionStateChange = ConnectionStateChange(ConnectionState.OFFLINE, null), - var lastChannelConnectionStateChange: ConnectionStateChange = - ConnectionStateChange(ConnectionState.OFFLINE, null) + private var pendingPublisherResolutions: PendingResolutions = PendingResolutions(), ) : Properties { - constructor(initialResolution: Resolution?) : this(PresenceData(ClientTypes.SUBSCRIBER, initialResolution)) + internal constructor( + initialResolution: Resolution?, + eventFlows: EventFlows, + ) : this(PresenceData(ClientTypes.SUBSCRIBER, initialResolution), eventFlows) + + fun updateForConnectionStateChangeAndThenEmitStateEventsIfRequired(stateChange: ConnectionStateChange) { + lastConnectionStateChange = stateChange + emitStateEventsIfRequired() + } + + fun updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired(stateChange: ConnectionStateChange) { + lastChannelConnectionStateChange = stateChange + emitStateEventsIfRequired() + } + + fun updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages: List) { + for (presenceMessage in presenceMessages) { + // We are only interested in presence updates from publishers. + if (presenceMessage.data.type == ClientTypes.PUBLISHER) { + + if (presenceMessage.action == PresenceAction.LEAVE_OR_ABSENT) { + // LEAVE or ABSENT + presentPublisherMemberKeys.remove(presenceMessage.memberKey) + } else { + // PRESENT, ENTER or UDPATE + presentPublisherMemberKeys.add(presenceMessage.memberKey) + presenceMessage.data.resolution?.let { publisherResolution -> + pendingPublisherResolutions.add(publisherResolution) + } + } + } + } + emitStateEventsIfRequired() + } + + fun emitStateEventsIfRequired() { + val isAPublisherPresent = (presentPublisherMemberKeys.isNotEmpty()) + + val trackableState = when (lastConnectionStateChange.state) { + ConnectionState.ONLINE -> { + when (lastChannelConnectionStateChange.state) { + ConnectionState.ONLINE -> if (isAPublisherPresent) TrackableState.Online else TrackableState.Offline() + ConnectionState.OFFLINE -> TrackableState.Offline() + ConnectionState.FAILED -> TrackableState.Failed(lastChannelConnectionStateChange.errorInformation!!) // are we sure error information will always be present? + } + } + ConnectionState.OFFLINE -> TrackableState.Offline() + ConnectionState.FAILED -> TrackableState.Failed(lastConnectionStateChange.errorInformation!!) // are we sure error information will always be present? + } + + if (trackableState != lastEmittedTrackableState) { + lastEmittedTrackableState = trackableState + eventFlows.emit(trackableState) + } + + // It is possible for presentPublisherMemberKeys to not be empty, even when we have no connectivity from our side, + // because we've had presence entry events without subsequent leave events. + // Therefore, from the perspective of a user consuming events from publisherPresenceStateFlow, what matters + // is what we're computing for isPublisherVisible (not the simple isAPublisherPresent). + val isPublisherVisible = (isAPublisherPresent && lastConnectionStateChange.state == ConnectionState.ONLINE) + if (lastEmittedValueOfIsPublisherVisible != isPublisherVisible) { + lastEmittedValueOfIsPublisherVisible = isPublisherVisible + eventFlows.emitPublisherPresence(isPublisherVisible) + } + + eventFlows.emit(pendingPublisherResolutions.drain()) + } + + internal class EventFlows(private val scope: CoroutineScope) { + private val _enhancedLocations: MutableSharedFlow = MutableSharedFlow(replay = 1) + private val _rawLocations: MutableSharedFlow = MutableSharedFlow(replay = 1) + private val _trackableStates: MutableStateFlow = MutableStateFlow(TrackableState.Offline()) + private val _publisherPresence: MutableStateFlow = MutableStateFlow(false) + private val _resolutions: MutableSharedFlow = MutableSharedFlow(replay = 1) + private val _nextLocationUpdateIntervals: MutableSharedFlow = MutableSharedFlow(replay = 1) + + fun emitEnhanced(locationUpdate: LocationUpdate) { + scope.launch { _enhancedLocations.emit(locationUpdate) } + } + + fun emitRaw(locationUpdate: LocationUpdate) { + scope.launch { _rawLocations.emit(locationUpdate) } + } + + fun emitPublisherPresence(isPublisherPresent: Boolean) { + scope.launch { _publisherPresence.emit(isPublisherPresent) } + } + + fun emit(trackableState: TrackableState) { + scope.launch { _trackableStates.emit(trackableState) } + } + + fun emit(resolutions: Array) { + if (resolutions.isNotEmpty()) { + scope.launch { + for (resolution in resolutions) { + _resolutions.emit(resolution) + _nextLocationUpdateIntervals.emit(resolution.desiredInterval) + } + } + } + } + + val enhancedLocations: SharedFlow + get() = _enhancedLocations.asSharedFlow() + + val rawLocations: SharedFlow + get() = _rawLocations.asSharedFlow() + + val trackableStates: StateFlow + get() = _trackableStates.asStateFlow() + + val publisherPresence: StateFlow + get() = _publisherPresence + + val resolutions: SharedFlow + get() = _resolutions.asSharedFlow() + + val nextLocationUpdateIntervals: SharedFlow + get() = _nextLocationUpdateIntervals.asSharedFlow() + } + + private class PendingResolutions { + private val resolutions: MutableList = ArrayList() + + fun add(resolution: Resolution) { + resolutions.add(resolution) + } + + fun drain(): Array { + val array = resolutions.toTypedArray() + resolutions.clear() + return array + } + } } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt deleted file mode 100644 index 2c6a31506..000000000 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt +++ /dev/null @@ -1,32 +0,0 @@ -package com.ably.tracking.subscriber - -import com.ably.tracking.common.ClientTypes -import com.ably.tracking.common.PresenceAction -import com.ably.tracking.common.PresenceMessage - -internal fun processPresenceMessage( - presenceMessage: PresenceMessage, - properties: SubscriberProperties, - subscriberInteractor: SubscriberInteractor, -) { - when (presenceMessage.action) { - PresenceAction.PRESENT_OR_ENTER -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherPresence(properties, true) - subscriberInteractor.updateTrackableState(properties) - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - PresenceAction.LEAVE_OR_ABSENT -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherPresence(properties, false) - subscriberInteractor.updateTrackableState(properties) - } - } - PresenceAction.UPDATE -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - } -} diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt index 26f25c6be..a08863a26 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt @@ -38,7 +38,6 @@ internal class WorkerFactory( when (workerSpecification) { is WorkerSpecification.StartConnection -> StartConnectionWorker( ably, - subscriberInteractor, trackableId, workerSpecification.callbackFunction ) @@ -52,16 +51,13 @@ internal class WorkerFactory( workerSpecification.callbackFunction ) is WorkerSpecification.UpdateConnectionState -> UpdateConnectionStateWorker( - workerSpecification.connectionStateChange, - subscriberInteractor + workerSpecification.connectionStateChange ) is WorkerSpecification.UpdateChannelConnectionState -> UpdateChannelConnectionStateWorker( - workerSpecification.channelConnectionStateChange, - subscriberInteractor + workerSpecification.channelConnectionStateChange ) is WorkerSpecification.UpdatePublisherPresence -> UpdatePublisherPresenceWorker( - workerSpecification.presenceMessage, - subscriberInteractor + workerSpecification.presenceMessage ) is WorkerSpecification.ChangeResolution -> ChangeResolutionWorker( ably, @@ -69,7 +65,11 @@ internal class WorkerFactory( workerSpecification.resolution, workerSpecification.callbackFunction ) - is WorkerSpecification.Disconnect -> DisconnectWorker(ably, workerSpecification.trackableId, workerSpecification.callbackFunction) + is WorkerSpecification.Disconnect -> DisconnectWorker( + ably, + workerSpecification.trackableId, + workerSpecification.callbackFunction + ) is WorkerSpecification.StopConnection -> StopConnectionWorker( ably, subscriberInteractor, @@ -77,7 +77,6 @@ internal class WorkerFactory( ) is WorkerSpecification.ProcessInitialPresenceMessages -> ProcessInitialPresenceMessagesWorker( workerSpecification.presenceMessages, - subscriberInteractor, workerSpecification.callbackFunction, ) } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt index aaac20f29..2a92e3e7e 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt @@ -3,14 +3,11 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.PresenceMessage import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.common.workerqueue.CallbackWorker -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.processPresenceMessage import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class ProcessInitialPresenceMessagesWorker( private val presenceMessages: List, - private val subscriberInteractor: SubscriberInteractor, callbackFunction: ResultCallbackFunction, ) : CallbackWorker(callbackFunction) { override fun doWork( @@ -18,9 +15,7 @@ internal class ProcessInitialPresenceMessagesWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - presenceMessages.forEach { presenceMessage -> - processPresenceMessage(presenceMessage, properties, subscriberInteractor) - } + properties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages) postWork(WorkerSpecification.SubscribeToChannel(callbackFunction)) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt index 41a945cbc..c9adbec3c 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt @@ -3,13 +3,11 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.Ably import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.common.workerqueue.CallbackWorker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class StartConnectionWorker( private val ably: Ably, - private val subscriberInteractor: SubscriberInteractor, private val trackableId: String, callbackFunction: ResultCallbackFunction ) : CallbackWorker(callbackFunction) { @@ -18,7 +16,7 @@ internal class StartConnectionWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - subscriberInteractor.updateTrackableState(properties) + properties.emitStateEventsIfRequired() doAsyncWork { val startAblyConnectionResult = ably.startConnection() if (startAblyConnectionResult.isFailure) { diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt index e4273e849..c025e9a1c 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt @@ -1,22 +1,19 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.ConnectionStateChange -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties import com.ably.tracking.common.workerqueue.Worker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class UpdateChannelConnectionStateWorker( private val channelConnectionStateChange: ConnectionStateChange, - private val subscriberInteractor: SubscriberInteractor ) : Worker { override fun doWork( properties: SubscriberProperties, doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.lastChannelConnectionStateChange = channelConnectionStateChange - subscriberInteractor.updateTrackableState(properties) + properties.updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired(channelConnectionStateChange) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt index 35aa56fb4..94dbcd009 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt @@ -1,22 +1,19 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.ConnectionStateChange -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties import com.ably.tracking.common.workerqueue.Worker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class UpdateConnectionStateWorker( private val connectionStateChange: ConnectionStateChange, - private val subscriberInteractor: SubscriberInteractor ) : Worker { override fun doWork( properties: SubscriberProperties, doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.lastConnectionStateChange = connectionStateChange - subscriberInteractor.updateTrackableState(properties) + properties.updateForConnectionStateChangeAndThenEmitStateEventsIfRequired(connectionStateChange) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt index 8b911df1f..44a72a3a5 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt @@ -1,22 +1,19 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.PresenceMessage -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.processPresenceMessage import com.ably.tracking.common.workerqueue.Worker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class UpdatePublisherPresenceWorker( private val presenceMessage: PresenceMessage, - private val subscriberInteractor: SubscriberInteractor ) : Worker { override fun doWork( properties: SubscriberProperties, doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - processPresenceMessage(presenceMessage, properties, subscriberInteractor) + properties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(listOf(presenceMessage)) return properties } diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt index db01a5266..0801d0155 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt @@ -27,7 +27,7 @@ internal class ChangeResolutionWorkerTest { @Test fun `should return Properties with updated resolution and notify callback`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) ably.mockUpdatePresenceDataSuccess(trackableId) // when diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt index 30dd60196..1829cd1db 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt @@ -26,7 +26,7 @@ internal class DisconnectWorkerTest { @Test fun `should call ably disconnect and notify callback`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) ably.mockDisconnectSuccess(trackableId) // when diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt index c1976cab8..55a63303f 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt @@ -1,65 +1,46 @@ package com.ably.tracking.subscriber.workerqueue.workers -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution import com.ably.tracking.common.ClientTypes import com.ably.tracking.common.PresenceAction import com.ably.tracking.common.PresenceData import com.ably.tracking.common.PresenceMessage import com.ably.tracking.common.ResultCallbackFunction -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.processPresenceMessage import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs import io.mockk.every import io.mockk.just import io.mockk.mockk -import io.mockk.mockkStatic -import io.mockk.runs -import io.mockk.unmockkStatic import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest -import org.junit.After import org.junit.Assert -import org.junit.Before import org.junit.Test @ExperimentalCoroutinesApi internal class ProcessInitialPresenceMessagesWorkerTest { - private val subscriberInteractor: SubscriberInteractor = mockk() + private val subscriberProperties: SubscriberProperties = mockk() private val asyncWorks = mutableListOf Unit>() private val postedWorks = mutableListOf() - @Before - fun setup() { - mockkStatic("com.ably.tracking.subscriber.PresenceMessageProcessorKt") - every { processPresenceMessage(any(), any(), any()) } just runs - } - - @After - fun cleanup() { - unmockkStatic("com.ably.tracking.subscriber.PresenceMessageProcessorKt") - } - @Test fun `should process all presence messages`() = runBlockingTest { // given - val initialProperties = anyProperties() val presenceMessages = listOf(anyPresenceMessage(), anyPresenceMessage(), anyPresenceMessage()) - val worker = ProcessInitialPresenceMessagesWorker(presenceMessages, subscriberInteractor) {} + val worker = ProcessInitialPresenceMessagesWorker(presenceMessages) {} + every { subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(any()) } just Runs // when worker.doWork( - initialProperties, + subscriberProperties, asyncWorks.appendWork(), - postedWorks.appendSpecification() + postedWorks.appendSpecification(), ) // then - verify(exactly = presenceMessages.size) { - processPresenceMessage(any(), initialProperties, subscriberInteractor) + verify { + subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages) } } @@ -67,24 +48,27 @@ internal class ProcessInitialPresenceMessagesWorkerTest { fun `should post subscribe to channel work after processing presence messages`() = runBlockingTest { // given val callbackFunction: ResultCallbackFunction = {} - val worker = ProcessInitialPresenceMessagesWorker(emptyList(), subscriberInteractor, callbackFunction) + val presenceMessages = emptyList() + val worker = ProcessInitialPresenceMessagesWorker(presenceMessages, callbackFunction) + every { subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(any()) } just Runs // when worker.doWork( - anyProperties(), + subscriberProperties, asyncWorks.appendWork(), - postedWorks.appendSpecification() + postedWorks.appendSpecification(), ) // then Assert.assertEquals(WorkerSpecification.SubscribeToChannel(callbackFunction), postedWorks[0]) + verify { + subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages) + } } private fun anyPresenceMessage() = PresenceMessage( - PresenceAction.PRESENT_OR_ENTER, - PresenceData(ClientTypes.PUBLISHER, null, null), - clientId = "" + action = PresenceAction.PRESENT_OR_ENTER, + data = PresenceData(ClientTypes.PUBLISHER, null, null), + memberKey = "", ) - - private fun anyProperties() = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) } diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt index 428875786..bdd0978e9 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt @@ -1,21 +1,19 @@ package com.ably.tracking.subscriber.workerqueue.workers -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution import com.ably.tracking.common.Ably +import com.ably.tracking.common.PresenceData import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.workerqueue.WorkerSpecification import com.ably.tracking.test.common.mockConnectSuccess import com.ably.tracking.test.common.mockConnectFailure import com.ably.tracking.test.common.mockStartConnectionFailure import com.ably.tracking.test.common.mockStartConnectionSuccess +import io.mockk.Runs import io.mockk.coVerify import io.mockk.every import io.mockk.just import io.mockk.mockk -import io.mockk.runs import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest @@ -27,11 +25,10 @@ internal class StartConnectionWorkerTest { private val ably: Ably = mockk() private val trackableId = "123123" - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updateTrackableState(any()) } just runs - } private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) - private val startConnectionWorker = StartConnectionWorker(ably, subscriberInteractor, trackableId, callbackFunction) + private val subscriberProperties: SubscriberProperties = mockk() + private val presenceData: PresenceData = mockk() + private val startConnectionWorker = StartConnectionWorker(ably, trackableId, callbackFunction) private val asyncWorks = mutableListOf Unit>() private val postedWorks = mutableListOf() @@ -39,22 +36,27 @@ internal class StartConnectionWorkerTest { @Test fun `should call ably connect and post update trackable worker specification on success`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) ably.mockStartConnectionSuccess() ably.mockConnectSuccess(trackableId) + every { subscriberProperties.emitStateEventsIfRequired() } just Runs + every { subscriberProperties.presenceData } returns presenceData // when val updatedProperties = - startConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) + startConnectionWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) asyncWorks.executeAll() // then - Assert.assertEquals(initialProperties, updatedProperties) - verify { subscriberInteractor.updateTrackableState(initialProperties) } + Assert.assertEquals(subscriberProperties, updatedProperties) coVerify { ably.startConnection() ably.connect( - trackableId, initialProperties.presenceData, + trackableId, + presenceData, useRewind = true, willSubscribe = true ) @@ -65,22 +67,27 @@ internal class StartConnectionWorkerTest { @Test fun `should call ably connect and notify callback on failure`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) ably.mockStartConnectionSuccess() ably.mockConnectFailure(trackableId) + every { subscriberProperties.emitStateEventsIfRequired() } just Runs + every { subscriberProperties.presenceData } returns presenceData // when val updatedProperties = - startConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) + startConnectionWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) asyncWorks.executeAll() // then - Assert.assertEquals(initialProperties, updatedProperties) - verify { subscriberInteractor.updateTrackableState(initialProperties) } + Assert.assertEquals(subscriberProperties, updatedProperties) coVerify { ably.startConnection() ably.connect( - trackableId, initialProperties.presenceData, + trackableId, + presenceData, useRewind = true, willSubscribe = true ) @@ -92,22 +99,26 @@ internal class StartConnectionWorkerTest { @Test fun `should notify callback about failure when starting Ably connection fails`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) ably.mockStartConnectionFailure() ably.mockConnectSuccess(trackableId) + every { subscriberProperties.emitStateEventsIfRequired() } just Runs // when val updatedProperties = - startConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) + startConnectionWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) asyncWorks.executeAll() // then - Assert.assertEquals(initialProperties, updatedProperties) - verify { subscriberInteractor.updateTrackableState(initialProperties) } + Assert.assertEquals(subscriberProperties, updatedProperties) coVerify { ably.startConnection() } coVerify(exactly = 0) { ably.connect( - trackableId, initialProperties.presenceData, + trackableId, + subscriberProperties.presenceData, useRewind = true, willSubscribe = true ) diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt index b329dc6e0..c580bd683 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt @@ -35,7 +35,7 @@ internal class StopConnectionWorkerTest { @Test fun `should call ably close and notify callback with success`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) ably.mockCloseSuccessWithDelay(10) // when @@ -51,7 +51,7 @@ internal class StopConnectionWorkerTest { @Test fun `should call ably close and notify callback with failure when it fails`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) ably.mockCloseFailure() // when diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt index 8fc60c445..804f9d0bb 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt @@ -37,7 +37,7 @@ internal class SubscribeForPresenceMessagesWorkerTest { @Test fun `should post update presence work when presence listener is called`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) val presenceListenerSlot: CapturingSlot<(PresenceMessage) -> Unit> = slot() val presenceMessage = createPresenceMessage() ably.mockGetCurrentPresenceSuccess(trackableId) @@ -57,35 +57,42 @@ internal class SubscribeForPresenceMessagesWorkerTest { } private fun createPresenceMessage() = PresenceMessage( - PresenceAction.PRESENT_OR_ENTER, - PresenceData(ClientTypes.PUBLISHER, null, null), - clientId = "" + action = PresenceAction.PRESENT_OR_ENTER, + data = PresenceData(ClientTypes.PUBLISHER, null, null), + memberKey = "", ) @Test - fun `should post process initial presence messages work when both get current presence and subscribe to presence return success`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val initialPresenceMessages = listOf(anyPresenceMessage()) - ably.mockGetCurrentPresenceSuccess(trackableId, initialPresenceMessages) - ably.mockSubscribeToPresenceSuccess(trackableId) - - // when - subscribeForPresenceMessagesWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - asyncWorks.executeAll() - - // then - Assert.assertEquals(WorkerSpecification.ProcessInitialPresenceMessages(initialPresenceMessages, callbackFunction), postedWorks[0]) - } + fun `should post process initial presence messages work when both get current presence and subscribe to presence return success`() = + runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + val initialPresenceMessages = listOf(anyPresenceMessage()) + ably.mockGetCurrentPresenceSuccess(trackableId, initialPresenceMessages) + ably.mockSubscribeToPresenceSuccess(trackableId) + + // when + subscribeForPresenceMessagesWorker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertEquals( + WorkerSpecification.ProcessInitialPresenceMessages( + initialPresenceMessages, + callbackFunction + ), + postedWorks[0], + ) + } @Test fun `should post disconnect work when subscribe to presence returns failure`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) ably.mockGetCurrentPresenceSuccess(trackableId) ably.mockSubscribeToPresenceError(trackableId) @@ -104,7 +111,7 @@ internal class SubscribeForPresenceMessagesWorkerTest { @Test fun `should post disconnect work when get current presence returns failure`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) ably.mockGetCurrentPresenceError(trackableId) ably.mockSubscribeToPresenceSuccess(trackableId) diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt index 735d917c1..7bc3e62b8 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt @@ -33,7 +33,7 @@ internal class SubscribeToChannelWorkerTest { @Test fun `should notify callback after calling subscriberInteractor`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) // when subscribeToChannelWorker.doWork( diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt index 4b218e3e2..891e1884f 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt @@ -1,50 +1,45 @@ package com.ably.tracking.subscriber.workerqueue.workers -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ConnectionState import com.ably.tracking.common.ConnectionStateChange import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs import io.mockk.every import io.mockk.just import io.mockk.mockk -import io.mockk.runs import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert import org.junit.Test @ExperimentalCoroutinesApi internal class UpdateChannelConnectionStateWorkerTest { - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updateTrackableState(any()) } just runs - } - private val channelConnectionStateChange = ConnectionStateChange(ConnectionState.ONLINE, null) + private val subscriberProperties: SubscriberProperties = mockk() + private val channelConnectionStateChange: ConnectionStateChange = mockk() private val updateChannelConnectionStateWorker = - UpdateChannelConnectionStateWorker(channelConnectionStateChange, subscriberInteractor) + UpdateChannelConnectionStateWorker(channelConnectionStateChange) private val asyncWorks = mutableListOf Unit>() private val postedWorks = mutableListOf() @Test - fun `should call updateTrackableState and update properties`() = runBlockingTest { + fun `should call updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + every { subscriberProperties.updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired(any()) } just Runs // when - val updatedProperties = - updateChannelConnectionStateWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) + updateChannelConnectionStateWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) // then - Assert.assertEquals(channelConnectionStateChange, updatedProperties.lastChannelConnectionStateChange) - verify { subscriberInteractor.updateTrackableState(updatedProperties) } + verify { + subscriberProperties.updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired( + channelConnectionStateChange + ) + } } } diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt index 361853edf..0d776f93e 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt @@ -1,50 +1,45 @@ package com.ably.tracking.subscriber.workerqueue.workers -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ConnectionState import com.ably.tracking.common.ConnectionStateChange import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs import io.mockk.every import io.mockk.just import io.mockk.mockk -import io.mockk.runs import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert import org.junit.Test @ExperimentalCoroutinesApi internal class UpdateConnectionStateWorkerTest { - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updateTrackableState(any()) } just runs - } - private val connectionStateChange = ConnectionStateChange(ConnectionState.ONLINE, null) + private val subscriberProperties: SubscriberProperties = mockk() + private val connectionStateChange: ConnectionStateChange = mockk() private val updateConnectionStateWorker = - UpdateConnectionStateWorker(connectionStateChange, subscriberInteractor) + UpdateConnectionStateWorker(connectionStateChange) private val asyncWorks = mutableListOf Unit>() private val postedWorks = mutableListOf() @Test - fun `should call updateTrackableState and update properties`() = runBlockingTest { + fun `should call updateForConnectionStateChangeAndThenEmitStateEventsIfRequired`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) + every { subscriberProperties.updateForConnectionStateChangeAndThenEmitStateEventsIfRequired(any()) } just Runs // when - val updatedProperties = - updateConnectionStateWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) + updateConnectionStateWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) // then - Assert.assertEquals(connectionStateChange, updatedProperties.lastConnectionStateChange) - verify { subscriberInteractor.updateTrackableState(updatedProperties) } + verify { + subscriberProperties.updateForConnectionStateChangeAndThenEmitStateEventsIfRequired( + connectionStateChange + ) + } } } diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt index 69face439..f7263fffe 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt @@ -1,18 +1,12 @@ package com.ably.tracking.subscriber.workerqueue.workers -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ClientTypes -import com.ably.tracking.common.PresenceAction -import com.ably.tracking.common.PresenceData import com.ably.tracking.common.PresenceMessage import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs import io.mockk.every import io.mockk.just import io.mockk.mockk -import io.mockk.runs import io.mockk.verify import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest @@ -21,81 +15,27 @@ import org.junit.Test @ExperimentalCoroutinesApi internal class UpdatePublisherPresenceWorkerTest { - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updatePublisherPresence(any(), any()) } just runs - every { updateTrackableState(any()) } just runs - every { updatePublisherResolutionInformation(any()) } just runs - } - + private val subscriberProperties: SubscriberProperties = mockk() + private val presenceMessage: PresenceMessage = mockk() private val asyncWorks = mutableListOf Unit>() private val postedWorks = mutableListOf() @Test - fun `should call subscriber interactor for PRESENT_OR_ENTER presence message`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val presenceMessage = createPresenceMessage(PresenceAction.PRESENT_OR_ENTER) - val worker = UpdatePublisherPresenceWorker(presenceMessage, subscriberInteractor) - - // when - worker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - verify { - subscriberInteractor.updatePublisherPresence(initialProperties, true) - subscriberInteractor.updateTrackableState(initialProperties) - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - - @Test - fun `should call subscriber interactor for LEAVE_OR_ABSENT presence message`() = runBlockingTest { + fun `should call updateForPresenceMessagesAndThenEmitStateEventsIfRequired`() = runBlockingTest { // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val presenceMessage = createPresenceMessage(PresenceAction.LEAVE_OR_ABSENT) - val worker = UpdatePublisherPresenceWorker(presenceMessage, subscriberInteractor) + val worker = UpdatePublisherPresenceWorker(presenceMessage) + every { subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(any()) } just Runs // when worker.doWork( - initialProperties, + subscriberProperties, asyncWorks.appendWork(), postedWorks.appendSpecification() ) // then verify { - subscriberInteractor.updatePublisherPresence(initialProperties, false) - subscriberInteractor.updateTrackableState(initialProperties) + subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(listOf(presenceMessage)) } } - - @Test - fun `should call subscriber interactor for UPDATE presence message`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val presenceMessage = createPresenceMessage(PresenceAction.UPDATE) - val worker = UpdatePublisherPresenceWorker(presenceMessage, subscriberInteractor) - - // when - worker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - verify { - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - - private fun createPresenceMessage(action: PresenceAction) = PresenceMessage( - action, - PresenceData(ClientTypes.PUBLISHER, null, null), - clientId = "" - ) }