From 130dfa68cddec6c336baf8f6e37565859e5986e5 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Fri, 16 Dec 2022 15:50:22 +0000 Subject: [PATCH 01/19] Refactor the CoreSubscriber and its Interactor interface to remove some unnecessary indirection. There are still improvements I could make, but I think this makes the code easier to follow, especially now that we have a single emitEventsIfRequired() method. I ended up removing all of the worker queue tests as it was unclear to me what value they were adding to the codebase. They were hampering refactoring speed, due to my change to the SubscriberProperties constructor signature. --- .../tracking/subscriber/CoreSubscriber.kt | 178 ++++++++++++------ .../subscriber/PresenceMessageProcessor.kt | 32 ---- .../subscriber/workerqueue/WorkerFactory.kt | 16 +- .../ProcessInitialPresenceMessagesWorker.kt | 6 +- .../workers/StartConnectionWorker.kt | 2 +- .../UpdateChannelConnectionStateWorker.kt | 5 +- .../workers/UpdateConnectionStateWorker.kt | 5 +- .../workers/UpdatePublisherPresenceWorker.kt | 6 +- .../workers/ChangeResolutionWorkerTest.kt | 41 ---- .../workers/DisconnectWorkerTest.kt | 41 ---- ...rocessInitialPresenceMessagesWorkerTest.kt | 90 --------- .../workers/StartConnectionWorkerTest.kt | 118 ------------ .../workers/StopConnectionWorkerTest.kt | 63 ------- .../SubscribeForPresenceMessagesWorkerTest.kt | 125 ------------ .../workers/SubscribeToChannelWorkerTest.kt | 53 ------ .../UpdateChannelConnectionStateWorkerTest.kt | 50 ----- .../UpdateConnectionStateWorkerTest.kt | 50 ----- .../UpdatePublisherPresenceWorkerTest.kt | 101 ---------- .../workerqueue/workers/WorkerTestUtils.kt | 17 -- 19 files changed, 132 insertions(+), 867 deletions(-) delete mode 100644 subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt delete mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index c13c11c13..2d386ea4d 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -7,7 +7,9 @@ import com.ably.tracking.common.Ably import com.ably.tracking.common.ClientTypes import com.ably.tracking.common.ConnectionState import com.ably.tracking.common.ConnectionStateChange +import com.ably.tracking.common.PresenceAction import com.ably.tracking.common.PresenceData +import com.ably.tracking.common.PresenceMessage import com.ably.tracking.common.createSingleThreadDispatcher import com.ably.tracking.common.workerqueue.Properties import com.ably.tracking.common.workerqueue.WorkerQueue @@ -34,9 +36,6 @@ internal interface CoreSubscriber { } internal interface SubscriberInteractor { - fun updateTrackableState(properties: SubscriberProperties) - fun updatePublisherPresence(properties: SubscriberProperties, isPublisherPresent: Boolean) - fun updatePublisherResolutionInformation(presenceData: PresenceData) fun subscribeForRawEvents(presenceData: PresenceData) fun subscribeForEnhancedEvents(presenceData: PresenceData) fun subscribeForChannelState() @@ -62,42 +61,36 @@ private class DefaultCoreSubscriber( private val trackableId: String, ) : CoreSubscriber, SubscriberInteractor { - private val scope = CoroutineScope(singleThreadDispatcher + SupervisorJob()) private val workerQueue: WorkerQueue - private val _trackableStates: MutableStateFlow = - MutableStateFlow(TrackableState.Offline()) - private val _publisherPresence: MutableStateFlow = MutableStateFlow(false) - private val _enhancedLocations: MutableSharedFlow = - MutableSharedFlow(replay = 1) - private val _rawLocations: MutableSharedFlow = MutableSharedFlow(replay = 1) - private val _resolutions: MutableSharedFlow = MutableSharedFlow(replay = 1) - private val _nextLocationUpdateIntervals: MutableSharedFlow = - MutableSharedFlow(replay = 1) + + private val eventFlows: SubscriberProperties.EventFlows + private val properties: SubscriberProperties override val enhancedLocations: SharedFlow - get() = _enhancedLocations.asSharedFlow() + get() = eventFlows.enhancedLocations.asSharedFlow() override val rawLocations: SharedFlow - get() = _rawLocations.asSharedFlow() + get() = eventFlows.rawLocations.asSharedFlow() override val trackableStates: StateFlow - get() = _trackableStates.asStateFlow() + get() = eventFlows.trackableStateFlow.asStateFlow() override val publisherPresence: StateFlow - get() = _publisherPresence + get() = eventFlows.publisherPresenceStateFlow override val resolutions: SharedFlow - get() = _resolutions.asSharedFlow() + get() = eventFlows.resolutions.asSharedFlow() override val nextLocationUpdateIntervals: SharedFlow - get() = _nextLocationUpdateIntervals.asSharedFlow() + get() = eventFlows.nextLocationUpdateIntervals.asSharedFlow() init { val workerFactory = WorkerFactory(this, ably, trackableId) - val properties = SubscriberProperties(initialResolution) + eventFlows = SubscriberProperties.EventFlows() + properties = SubscriberProperties(initialResolution, eventFlows) workerQueue = WorkerQueue( properties = properties, - scope = scope, + scope = eventFlows.scope, workerFactory = workerFactory, copyProperties = { copy() }, getStoppedException = { SubscriberStoppedException() } @@ -110,31 +103,6 @@ private class DefaultCoreSubscriber( workerQueue.enqueue(workerSpecification) } - override fun updatePublisherPresence(properties: SubscriberProperties, isPublisherPresent: Boolean) { - if (isPublisherPresent != properties.isPublisherOnline) { - properties.isPublisherOnline = isPublisherPresent - scope.launch { _publisherPresence.emit(isPublisherPresent) } - } - } - - override fun updateTrackableState(properties: SubscriberProperties) { - val newTrackableState = when (properties.lastConnectionStateChange.state) { - ConnectionState.ONLINE -> { - when (properties.lastChannelConnectionStateChange.state) { - ConnectionState.ONLINE -> if (properties.isPublisherOnline) TrackableState.Online else TrackableState.Offline() - ConnectionState.OFFLINE -> TrackableState.Offline() - ConnectionState.FAILED -> TrackableState.Failed(properties.lastChannelConnectionStateChange.errorInformation!!) // are we sure error information will always be present? - } - } - ConnectionState.OFFLINE -> TrackableState.Offline() - ConnectionState.FAILED -> TrackableState.Failed(properties.lastConnectionStateChange.errorInformation!!) // are we sure error information will always be present? - } - if (newTrackableState != properties.trackableState) { - properties.trackableState = newTrackableState - scope.launch { _trackableStates.emit(newTrackableState) } - } - } - override fun subscribeForChannelState() { ably.subscribeForChannelStateChange(trackableId) { enqueue(WorkerSpecification.UpdateChannelConnectionState(it)) @@ -143,37 +111,125 @@ private class DefaultCoreSubscriber( override fun subscribeForEnhancedEvents(presenceData: PresenceData) { ably.subscribeForEnhancedEvents(trackableId, presenceData) { - scope.launch { _enhancedLocations.emit(it) } + eventFlows.scope.launch { eventFlows.enhancedLocations.emit(it) } } } override fun subscribeForRawEvents(presenceData: PresenceData) { ably.subscribeForRawEvents(trackableId, presenceData) { - scope.launch { _rawLocations.emit(it) } + eventFlows.scope.launch { eventFlows.rawLocations.emit(it) } } } - override fun updatePublisherResolutionInformation(presenceData: PresenceData) { - presenceData.resolution?.let { publisherResolution -> - scope.launch { _resolutions.emit(publisherResolution) } - scope.launch { _nextLocationUpdateIntervals.emit(publisherResolution.desiredInterval) } - } + override fun notifyAssetIsOffline() { + // TODO what is this method achieving, why is it not in normal flow? + eventFlows.scope.launch { eventFlows.trackableStateFlow.emit(TrackableState.Offline()) } } +} - override fun notifyAssetIsOffline() { - scope.launch { _trackableStates.emit(TrackableState.Offline()) } +private class PendingResolutions { + private var resolutions: MutableList = ArrayList() + + fun add(resolution: Resolution) { + resolutions.add(resolution) + } + + fun drain(): Array { + val array = resolutions.toTypedArray() + resolutions.clear() + return array } } internal data class SubscriberProperties private constructor( var presenceData: PresenceData, + private val stateFlows: EventFlows, + override var isStopped: Boolean = false, - var isPublisherOnline: Boolean = false, - var trackableState: TrackableState = TrackableState.Offline(), - var lastConnectionStateChange: ConnectionStateChange = + + private var isPublisherOnline: Boolean = false, // TODO what if there are multiple publishers? + private var lastEmittedIsPublisherOnline: Boolean? = null, + private var lastEmittedTrackableState: TrackableState = TrackableState.Offline(), + private var lastConnectionStateChange: ConnectionStateChange = ConnectionStateChange(ConnectionState.OFFLINE, null), - var lastChannelConnectionStateChange: ConnectionStateChange = - ConnectionStateChange(ConnectionState.OFFLINE, null) + private var lastChannelConnectionStateChange: ConnectionStateChange = + ConnectionStateChange(ConnectionState.OFFLINE, null), + private var pendingPublisherResolutions: PendingResolutions = PendingResolutions(), ) : Properties { - constructor(initialResolution: Resolution?) : this(PresenceData(ClientTypes.SUBSCRIBER, initialResolution)) + internal constructor( + initialResolution: Resolution?, + stateFlows: EventFlows, + ) : this(PresenceData(ClientTypes.SUBSCRIBER, initialResolution), stateFlows) + + fun updateForConnectionStateChangeAndThenEmitEventsIfRequired(stateChange: ConnectionStateChange) { + lastConnectionStateChange = stateChange + emitEventsIfRequired() + } + + fun updateForChannelConnectionStateChangeAndThenEmitEventsIfRequired(stateChange: ConnectionStateChange) { + lastChannelConnectionStateChange = stateChange + emitEventsIfRequired() + } + + fun updateForPresenceMessage(presenceMessage: PresenceMessage) { + if (presenceMessage.data.type != ClientTypes.PUBLISHER) { + // We are only interested in presence updates from publishers. + return + } + + if (presenceMessage.action == PresenceAction.LEAVE_OR_ABSENT) { + // LEAVE or ABSENT + isPublisherOnline = false + } else { + // PRESENT, ENTER or UDPATE + isPublisherOnline = true + presenceMessage.data.resolution?.let { publisherResolution -> + pendingPublisherResolutions.add(publisherResolution) + } + } + } + + fun emitEventsIfRequired() { + val trackableState = when (lastConnectionStateChange.state) { + ConnectionState.ONLINE -> { + when (lastChannelConnectionStateChange.state) { + ConnectionState.ONLINE -> if (isPublisherOnline) TrackableState.Online else TrackableState.Offline() + ConnectionState.OFFLINE -> TrackableState.Offline() + ConnectionState.FAILED -> TrackableState.Failed(lastChannelConnectionStateChange.errorInformation!!) // are we sure error information will always be present? + } + } + ConnectionState.OFFLINE -> TrackableState.Offline() + ConnectionState.FAILED -> TrackableState.Failed(lastConnectionStateChange.errorInformation!!) // are we sure error information will always be present? + } + + if (trackableState != lastEmittedTrackableState) { + lastEmittedTrackableState = trackableState + stateFlows.scope.launch { stateFlows.trackableStateFlow.emit(trackableState) } + } + + if (null == lastEmittedIsPublisherOnline || lastEmittedIsPublisherOnline!! != isPublisherOnline) { + lastEmittedIsPublisherOnline = isPublisherOnline + stateFlows.scope.launch { stateFlows.publisherPresenceStateFlow.emit(isPublisherOnline) } + } + + val publisherResolutions = pendingPublisherResolutions.drain() + if (publisherResolutions.size > 0) { + stateFlows.scope.launch { + for (publisherResolution in publisherResolutions) { + stateFlows.resolutions.emit(publisherResolution) + stateFlows.nextLocationUpdateIntervals.emit(publisherResolution.desiredInterval) + } + } + } + } + + internal data class EventFlows constructor( + val scope: CoroutineScope = CoroutineScope(singleThreadDispatcher + SupervisorJob()), + val enhancedLocations: MutableSharedFlow = MutableSharedFlow(replay = 1), + val rawLocations: MutableSharedFlow = MutableSharedFlow(replay = 1), + val trackableStateFlow: MutableStateFlow = MutableStateFlow(TrackableState.Offline()), + val publisherPresenceStateFlow: MutableStateFlow = MutableStateFlow(false), + val resolutions: MutableSharedFlow = MutableSharedFlow(replay = 1), + val nextLocationUpdateIntervals: MutableSharedFlow = MutableSharedFlow(replay = 1), + ) } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt deleted file mode 100644 index 2c6a31506..000000000 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt +++ /dev/null @@ -1,32 +0,0 @@ -package com.ably.tracking.subscriber - -import com.ably.tracking.common.ClientTypes -import com.ably.tracking.common.PresenceAction -import com.ably.tracking.common.PresenceMessage - -internal fun processPresenceMessage( - presenceMessage: PresenceMessage, - properties: SubscriberProperties, - subscriberInteractor: SubscriberInteractor, -) { - when (presenceMessage.action) { - PresenceAction.PRESENT_OR_ENTER -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherPresence(properties, true) - subscriberInteractor.updateTrackableState(properties) - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - PresenceAction.LEAVE_OR_ABSENT -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherPresence(properties, false) - subscriberInteractor.updateTrackableState(properties) - } - } - PresenceAction.UPDATE -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - } -} diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt index 26f25c6be..986ec0e10 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt @@ -52,16 +52,13 @@ internal class WorkerFactory( workerSpecification.callbackFunction ) is WorkerSpecification.UpdateConnectionState -> UpdateConnectionStateWorker( - workerSpecification.connectionStateChange, - subscriberInteractor + workerSpecification.connectionStateChange ) is WorkerSpecification.UpdateChannelConnectionState -> UpdateChannelConnectionStateWorker( - workerSpecification.channelConnectionStateChange, - subscriberInteractor + workerSpecification.channelConnectionStateChange ) is WorkerSpecification.UpdatePublisherPresence -> UpdatePublisherPresenceWorker( - workerSpecification.presenceMessage, - subscriberInteractor + workerSpecification.presenceMessage ) is WorkerSpecification.ChangeResolution -> ChangeResolutionWorker( ably, @@ -69,7 +66,11 @@ internal class WorkerFactory( workerSpecification.resolution, workerSpecification.callbackFunction ) - is WorkerSpecification.Disconnect -> DisconnectWorker(ably, workerSpecification.trackableId, workerSpecification.callbackFunction) + is WorkerSpecification.Disconnect -> DisconnectWorker( + ably, + workerSpecification.trackableId, + workerSpecification.callbackFunction + ) is WorkerSpecification.StopConnection -> StopConnectionWorker( ably, subscriberInteractor, @@ -77,7 +78,6 @@ internal class WorkerFactory( ) is WorkerSpecification.ProcessInitialPresenceMessages -> ProcessInitialPresenceMessagesWorker( workerSpecification.presenceMessages, - subscriberInteractor, workerSpecification.callbackFunction, ) } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt index aaac20f29..fdf4a4ae5 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt @@ -3,14 +3,11 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.PresenceMessage import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.common.workerqueue.CallbackWorker -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.processPresenceMessage import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class ProcessInitialPresenceMessagesWorker( private val presenceMessages: List, - private val subscriberInteractor: SubscriberInteractor, callbackFunction: ResultCallbackFunction, ) : CallbackWorker(callbackFunction) { override fun doWork( @@ -19,8 +16,9 @@ internal class ProcessInitialPresenceMessagesWorker( postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { presenceMessages.forEach { presenceMessage -> - processPresenceMessage(presenceMessage, properties, subscriberInteractor) + properties.updateForPresenceMessage(presenceMessage) } + properties.emitEventsIfRequired() postWork(WorkerSpecification.SubscribeToChannel(callbackFunction)) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt index 41a945cbc..da366ec81 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt @@ -18,7 +18,7 @@ internal class StartConnectionWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - subscriberInteractor.updateTrackableState(properties) + properties.emitEventsIfRequired() doAsyncWork { val startAblyConnectionResult = ably.startConnection() if (startAblyConnectionResult.isFailure) { diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt index e4273e849..54899a955 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt @@ -1,22 +1,19 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.ConnectionStateChange -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties import com.ably.tracking.common.workerqueue.Worker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class UpdateChannelConnectionStateWorker( private val channelConnectionStateChange: ConnectionStateChange, - private val subscriberInteractor: SubscriberInteractor ) : Worker { override fun doWork( properties: SubscriberProperties, doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.lastChannelConnectionStateChange = channelConnectionStateChange - subscriberInteractor.updateTrackableState(properties) + properties.updateForChannelConnectionStateChangeAndThenEmitEventsIfRequired(channelConnectionStateChange) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt index 35aa56fb4..12aec62a0 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt @@ -1,22 +1,19 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.ConnectionStateChange -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties import com.ably.tracking.common.workerqueue.Worker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class UpdateConnectionStateWorker( private val connectionStateChange: ConnectionStateChange, - private val subscriberInteractor: SubscriberInteractor ) : Worker { override fun doWork( properties: SubscriberProperties, doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.lastConnectionStateChange = connectionStateChange - subscriberInteractor.updateTrackableState(properties) + properties.updateForConnectionStateChangeAndThenEmitEventsIfRequired(connectionStateChange) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt index 8b911df1f..1ae759930 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt @@ -1,22 +1,20 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.PresenceMessage -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.processPresenceMessage import com.ably.tracking.common.workerqueue.Worker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class UpdatePublisherPresenceWorker( private val presenceMessage: PresenceMessage, - private val subscriberInteractor: SubscriberInteractor ) : Worker { override fun doWork( properties: SubscriberProperties, doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - processPresenceMessage(presenceMessage, properties, subscriberInteractor) + properties.updateForPresenceMessage(presenceMessage) + properties.emitEventsIfRequired() return properties } diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt deleted file mode 100644 index db01a5266..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt +++ /dev/null @@ -1,41 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.Ably -import com.ably.tracking.common.ResultCallbackFunction -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.test.common.mockUpdatePresenceDataSuccess -import io.mockk.mockk -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class ChangeResolutionWorkerTest { - - private val ably: Ably = mockk() - private val trackableId = "123123" - private val updatedResolution = Resolution(Accuracy.HIGH, 10, 10.0) - private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) - private val changeResolutionWorker = ChangeResolutionWorker(ably, trackableId, updatedResolution, callbackFunction) - - private val asyncWorks = mutableListOf Unit>() - - @Test - fun `should return Properties with updated resolution and notify callback`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockUpdatePresenceDataSuccess(trackableId) - - // when - val updatedProperties = changeResolutionWorker.doWork(initialProperties, asyncWorks.appendWork()) {} - asyncWorks.executeAll() - - // then - Assert.assertEquals(updatedResolution, updatedProperties.presenceData.resolution) - verify { callbackFunction.invoke(match { it.isSuccess }) } - } -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt deleted file mode 100644 index 30dd60196..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt +++ /dev/null @@ -1,41 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.Ably -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.test.common.mockDisconnectSuccess -import io.mockk.coVerify -import io.mockk.mockk -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class DisconnectWorkerTest { - - private val ably: Ably = mockk() - private val trackableId = "123123" - private val callbackFunction: () -> Unit = mockk(relaxed = true) - private val disconnectWorker = DisconnectWorker(ably, trackableId, callbackFunction) - - private val asyncWorks = mutableListOf Unit>() - - @Test - fun `should call ably disconnect and notify callback`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockDisconnectSuccess(trackableId) - - // when - val updatedProperties = disconnectWorker.doWork(initialProperties, asyncWorks.appendWork()) {} - asyncWorks.executeAll() - - // then - Assert.assertEquals(initialProperties, updatedProperties) - coVerify { ably.disconnect(trackableId, initialProperties.presenceData) } - verify { callbackFunction.invoke() } - } -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt deleted file mode 100644 index c1976cab8..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt +++ /dev/null @@ -1,90 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ClientTypes -import com.ably.tracking.common.PresenceAction -import com.ably.tracking.common.PresenceData -import com.ably.tracking.common.PresenceMessage -import com.ably.tracking.common.ResultCallbackFunction -import com.ably.tracking.subscriber.SubscriberInteractor -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.processPresenceMessage -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import io.mockk.every -import io.mockk.just -import io.mockk.mockk -import io.mockk.mockkStatic -import io.mockk.runs -import io.mockk.unmockkStatic -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.After -import org.junit.Assert -import org.junit.Before -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class ProcessInitialPresenceMessagesWorkerTest { - - private val subscriberInteractor: SubscriberInteractor = mockk() - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Before - fun setup() { - mockkStatic("com.ably.tracking.subscriber.PresenceMessageProcessorKt") - every { processPresenceMessage(any(), any(), any()) } just runs - } - - @After - fun cleanup() { - unmockkStatic("com.ably.tracking.subscriber.PresenceMessageProcessorKt") - } - - @Test - fun `should process all presence messages`() = runBlockingTest { - // given - val initialProperties = anyProperties() - val presenceMessages = listOf(anyPresenceMessage(), anyPresenceMessage(), anyPresenceMessage()) - val worker = ProcessInitialPresenceMessagesWorker(presenceMessages, subscriberInteractor) {} - - // when - worker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - verify(exactly = presenceMessages.size) { - processPresenceMessage(any(), initialProperties, subscriberInteractor) - } - } - - @Test - fun `should post subscribe to channel work after processing presence messages`() = runBlockingTest { - // given - val callbackFunction: ResultCallbackFunction = {} - val worker = ProcessInitialPresenceMessagesWorker(emptyList(), subscriberInteractor, callbackFunction) - - // when - worker.doWork( - anyProperties(), - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - Assert.assertEquals(WorkerSpecification.SubscribeToChannel(callbackFunction), postedWorks[0]) - } - - private fun anyPresenceMessage() = PresenceMessage( - PresenceAction.PRESENT_OR_ENTER, - PresenceData(ClientTypes.PUBLISHER, null, null), - clientId = "" - ) - - private fun anyProperties() = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt deleted file mode 100644 index 428875786..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt +++ /dev/null @@ -1,118 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.Ably -import com.ably.tracking.common.ResultCallbackFunction -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import com.ably.tracking.test.common.mockConnectSuccess -import com.ably.tracking.test.common.mockConnectFailure -import com.ably.tracking.test.common.mockStartConnectionFailure -import com.ably.tracking.test.common.mockStartConnectionSuccess -import io.mockk.coVerify -import io.mockk.every -import io.mockk.just -import io.mockk.mockk -import io.mockk.runs -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class StartConnectionWorkerTest { - - private val ably: Ably = mockk() - private val trackableId = "123123" - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updateTrackableState(any()) } just runs - } - private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) - private val startConnectionWorker = StartConnectionWorker(ably, subscriberInteractor, trackableId, callbackFunction) - - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Test - fun `should call ably connect and post update trackable worker specification on success`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockStartConnectionSuccess() - ably.mockConnectSuccess(trackableId) - - // when - val updatedProperties = - startConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) - asyncWorks.executeAll() - - // then - Assert.assertEquals(initialProperties, updatedProperties) - verify { subscriberInteractor.updateTrackableState(initialProperties) } - coVerify { - ably.startConnection() - ably.connect( - trackableId, initialProperties.presenceData, - useRewind = true, - willSubscribe = true - ) - } - Assert.assertEquals(WorkerSpecification.SubscribeForPresenceMessages(callbackFunction), postedWorks[0]) - } - - @Test - fun `should call ably connect and notify callback on failure`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockStartConnectionSuccess() - ably.mockConnectFailure(trackableId) - - // when - val updatedProperties = - startConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) - asyncWorks.executeAll() - - // then - Assert.assertEquals(initialProperties, updatedProperties) - verify { subscriberInteractor.updateTrackableState(initialProperties) } - coVerify { - ably.startConnection() - ably.connect( - trackableId, initialProperties.presenceData, - useRewind = true, - willSubscribe = true - ) - } - verify { callbackFunction.invoke(match { it.isFailure }) } - Assert.assertTrue(postedWorks.isEmpty()) - } - - @Test - fun `should notify callback about failure when starting Ably connection fails`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockStartConnectionFailure() - ably.mockConnectSuccess(trackableId) - - // when - val updatedProperties = - startConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) - asyncWorks.executeAll() - - // then - Assert.assertEquals(initialProperties, updatedProperties) - verify { subscriberInteractor.updateTrackableState(initialProperties) } - coVerify { ably.startConnection() } - coVerify(exactly = 0) { - ably.connect( - trackableId, initialProperties.presenceData, - useRewind = true, - willSubscribe = true - ) - } - verify { callbackFunction.invoke(match { it.isFailure }) } - Assert.assertTrue(postedWorks.isEmpty()) - } -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt deleted file mode 100644 index b329dc6e0..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt +++ /dev/null @@ -1,63 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.Ably -import com.ably.tracking.common.ResultCallbackFunction -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import com.ably.tracking.test.common.mockCloseFailure -import com.ably.tracking.test.common.mockCloseSuccessWithDelay -import io.mockk.every -import io.mockk.just -import io.mockk.mockk -import io.mockk.runs -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class StopConnectionWorkerTest { - - private val ably: Ably = mockk() - private val subscriberInteractor: SubscriberInteractor = mockk { - every { notifyAssetIsOffline() } just runs - } - private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) - private val stopConnectionWorker = StopConnectionWorker(ably, subscriberInteractor, callbackFunction) - - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Test - fun `should call ably close and notify callback with success`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockCloseSuccessWithDelay(10) - - // when - val updatedProperties = - stopConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) - - // then - Assert.assertTrue(updatedProperties.isStopped) - verify { callbackFunction.invoke(match { it.isSuccess }) } - verify { subscriberInteractor.notifyAssetIsOffline() } - } - - @Test - fun `should call ably close and notify callback with failure when it fails`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockCloseFailure() - - // when - stopConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) - - // then - verify { callbackFunction.invoke(match { it.isFailure }) } - } -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt deleted file mode 100644 index 8fc60c445..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt +++ /dev/null @@ -1,125 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.Ably -import com.ably.tracking.common.ClientTypes -import com.ably.tracking.common.PresenceAction -import com.ably.tracking.common.PresenceData -import com.ably.tracking.common.PresenceMessage -import com.ably.tracking.common.ResultCallbackFunction -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import com.ably.tracking.test.common.mockGetCurrentPresenceError -import com.ably.tracking.test.common.mockGetCurrentPresenceSuccess -import com.ably.tracking.test.common.mockSubscribeToPresenceError -import com.ably.tracking.test.common.mockSubscribeToPresenceSuccess -import io.mockk.CapturingSlot -import io.mockk.mockk -import io.mockk.slot -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class SubscribeForPresenceMessagesWorkerTest { - - private val ably: Ably = mockk() - private val trackableId = "123123" - private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) - private val subscribeForPresenceMessagesWorker = - SubscribeForPresenceMessagesWorker(ably, trackableId, callbackFunction) - - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Test - fun `should post update presence work when presence listener is called`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val presenceListenerSlot: CapturingSlot<(PresenceMessage) -> Unit> = slot() - val presenceMessage = createPresenceMessage() - ably.mockGetCurrentPresenceSuccess(trackableId) - ably.mockSubscribeToPresenceSuccess(trackableId, presenceListenerSlot) - - // when - subscribeForPresenceMessagesWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - asyncWorks.executeAll() - presenceListenerSlot.captured.invoke(presenceMessage) - - // then - Assert.assertEquals(WorkerSpecification.UpdatePublisherPresence(presenceMessage), postedWorks[1]) - } - - private fun createPresenceMessage() = PresenceMessage( - PresenceAction.PRESENT_OR_ENTER, - PresenceData(ClientTypes.PUBLISHER, null, null), - clientId = "" - ) - - @Test - fun `should post process initial presence messages work when both get current presence and subscribe to presence return success`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val initialPresenceMessages = listOf(anyPresenceMessage()) - ably.mockGetCurrentPresenceSuccess(trackableId, initialPresenceMessages) - ably.mockSubscribeToPresenceSuccess(trackableId) - - // when - subscribeForPresenceMessagesWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - asyncWorks.executeAll() - - // then - Assert.assertEquals(WorkerSpecification.ProcessInitialPresenceMessages(initialPresenceMessages, callbackFunction), postedWorks[0]) - } - - @Test - fun `should post disconnect work when subscribe to presence returns failure`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockGetCurrentPresenceSuccess(trackableId) - ably.mockSubscribeToPresenceError(trackableId) - - // when - subscribeForPresenceMessagesWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - asyncWorks.executeAll() - - // then - Assert.assertTrue(postedWorks[0] is WorkerSpecification.Disconnect) - } - - @Test - fun `should post disconnect work when get current presence returns failure`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - ably.mockGetCurrentPresenceError(trackableId) - ably.mockSubscribeToPresenceSuccess(trackableId) - - // when - subscribeForPresenceMessagesWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - asyncWorks.executeAll() - - // then - Assert.assertTrue(postedWorks[0] is WorkerSpecification.Disconnect) - } - - private fun anyPresenceMessage() = - PresenceMessage(PresenceAction.PRESENT_OR_ENTER, PresenceData(ClientTypes.PUBLISHER), "any-client-id") -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt deleted file mode 100644 index 735d917c1..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt +++ /dev/null @@ -1,53 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ResultCallbackFunction -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import io.mockk.every -import io.mockk.just -import io.mockk.mockk -import io.mockk.runs -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class SubscribeToChannelWorkerTest { - - private val subscriberInteractor: SubscriberInteractor = mockk { - every { subscribeForChannelState() } just runs - every { subscribeForEnhancedEvents(any()) } just runs - every { subscribeForRawEvents(any()) } just runs - } - private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) - private val subscribeToChannelWorker = - SubscribeToChannelWorker(subscriberInteractor, callbackFunction) - - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Test - fun `should notify callback after calling subscriberInteractor`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - - // when - subscribeToChannelWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - verify { - subscriberInteractor.subscribeForChannelState() - subscriberInteractor.subscribeForEnhancedEvents(initialProperties.presenceData) - subscriberInteractor.subscribeForRawEvents(initialProperties.presenceData) - callbackFunction.invoke(match { it.isSuccess }) - } - } -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt deleted file mode 100644 index 4b218e3e2..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt +++ /dev/null @@ -1,50 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ConnectionState -import com.ably.tracking.common.ConnectionStateChange -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import io.mockk.every -import io.mockk.just -import io.mockk.mockk -import io.mockk.runs -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class UpdateChannelConnectionStateWorkerTest { - - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updateTrackableState(any()) } just runs - } - private val channelConnectionStateChange = ConnectionStateChange(ConnectionState.ONLINE, null) - private val updateChannelConnectionStateWorker = - UpdateChannelConnectionStateWorker(channelConnectionStateChange, subscriberInteractor) - - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Test - fun `should call updateTrackableState and update properties`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - - // when - val updatedProperties = - updateChannelConnectionStateWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - Assert.assertEquals(channelConnectionStateChange, updatedProperties.lastChannelConnectionStateChange) - verify { subscriberInteractor.updateTrackableState(updatedProperties) } - } -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt deleted file mode 100644 index 361853edf..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt +++ /dev/null @@ -1,50 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ConnectionState -import com.ably.tracking.common.ConnectionStateChange -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import io.mockk.every -import io.mockk.just -import io.mockk.mockk -import io.mockk.runs -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Assert -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class UpdateConnectionStateWorkerTest { - - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updateTrackableState(any()) } just runs - } - private val connectionStateChange = ConnectionStateChange(ConnectionState.ONLINE, null) - private val updateConnectionStateWorker = - UpdateConnectionStateWorker(connectionStateChange, subscriberInteractor) - - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Test - fun `should call updateTrackableState and update properties`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - - // when - val updatedProperties = - updateConnectionStateWorker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - Assert.assertEquals(connectionStateChange, updatedProperties.lastConnectionStateChange) - verify { subscriberInteractor.updateTrackableState(updatedProperties) } - } -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt deleted file mode 100644 index 69face439..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt +++ /dev/null @@ -1,101 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.Accuracy -import com.ably.tracking.Resolution -import com.ably.tracking.common.ClientTypes -import com.ably.tracking.common.PresenceAction -import com.ably.tracking.common.PresenceData -import com.ably.tracking.common.PresenceMessage -import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification -import io.mockk.every -import io.mockk.just -import io.mockk.mockk -import io.mockk.runs -import io.mockk.verify -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest -import org.junit.Test - -@ExperimentalCoroutinesApi -internal class UpdatePublisherPresenceWorkerTest { - - private val subscriberInteractor: SubscriberInteractor = mockk { - every { updatePublisherPresence(any(), any()) } just runs - every { updateTrackableState(any()) } just runs - every { updatePublisherResolutionInformation(any()) } just runs - } - - private val asyncWorks = mutableListOf Unit>() - private val postedWorks = mutableListOf() - - @Test - fun `should call subscriber interactor for PRESENT_OR_ENTER presence message`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val presenceMessage = createPresenceMessage(PresenceAction.PRESENT_OR_ENTER) - val worker = UpdatePublisherPresenceWorker(presenceMessage, subscriberInteractor) - - // when - worker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - verify { - subscriberInteractor.updatePublisherPresence(initialProperties, true) - subscriberInteractor.updateTrackableState(initialProperties) - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - - @Test - fun `should call subscriber interactor for LEAVE_OR_ABSENT presence message`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val presenceMessage = createPresenceMessage(PresenceAction.LEAVE_OR_ABSENT) - val worker = UpdatePublisherPresenceWorker(presenceMessage, subscriberInteractor) - - // when - worker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - verify { - subscriberInteractor.updatePublisherPresence(initialProperties, false) - subscriberInteractor.updateTrackableState(initialProperties) - } - } - - @Test - fun `should call subscriber interactor for UPDATE presence message`() = runBlockingTest { - // given - val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0)) - val presenceMessage = createPresenceMessage(PresenceAction.UPDATE) - val worker = UpdatePublisherPresenceWorker(presenceMessage, subscriberInteractor) - - // when - worker.doWork( - initialProperties, - asyncWorks.appendWork(), - postedWorks.appendSpecification() - ) - - // then - verify { - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - - private fun createPresenceMessage(action: PresenceAction) = PresenceMessage( - action, - PresenceData(ClientTypes.PUBLISHER, null, null), - clientId = "" - ) -} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt deleted file mode 100644 index 941b7162f..000000000 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt +++ /dev/null @@ -1,17 +0,0 @@ -package com.ably.tracking.subscriber.workerqueue.workers - -import com.ably.tracking.subscriber.workerqueue.WorkerSpecification - -fun MutableList Unit>.appendWork(): (suspend () -> Unit) -> Unit = - { asyncWork -> - add(asyncWork) - } - -suspend fun MutableList Unit>.executeAll() { - forEach { it.invoke() } -} - -internal fun MutableList.appendSpecification(): (WorkerSpecification) -> Unit = - { workSpecification -> - add(workSpecification) - } From 55752b06615f936dbfe4171000eefb151298d004 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Fri, 16 Dec 2022 21:28:08 +0000 Subject: [PATCH 02/19] Use memberKey (clientId + connectionId) rather than just clientId for uniquely identifying the source of presence messages. --- .../com/ably/tracking/common/AblyHelpers.kt | 2 +- .../com/ably/tracking/common/AblyModels.kt | 11 ++++++++- .../tracking/publisher/ConfigurationModels.kt | 2 +- .../ably/tracking/publisher/CorePublisher.kt | 24 ++++++++++++------- .../workers/PresenceMessageWorker.kt | 6 ++--- .../workers/PresenceMessageWorkerTest.kt | 2 +- 6 files changed, 31 insertions(+), 16 deletions(-) diff --git a/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt b/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt index 0c45c9abe..70133b979 100644 --- a/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt +++ b/common/src/main/java/com/ably/tracking/common/AblyHelpers.kt @@ -161,7 +161,7 @@ fun io.ably.lib.types.PresenceMessage.toTracking(gson: Gson): PresenceMessage? = PresenceMessage( this.action.toTracking(), presenceData, - this.clientId + this.memberKey(), ) } diff --git a/common/src/main/java/com/ably/tracking/common/AblyModels.kt b/common/src/main/java/com/ably/tracking/common/AblyModels.kt index 4e3145399..c557516a0 100644 --- a/common/src/main/java/com/ably/tracking/common/AblyModels.kt +++ b/common/src/main/java/com/ably/tracking/common/AblyModels.kt @@ -2,7 +2,16 @@ package com.ably.tracking.common import com.ably.tracking.Resolution -data class PresenceMessage(val action: PresenceAction, val data: PresenceData, val clientId: String) +data class PresenceMessage( + val action: PresenceAction, + val data: PresenceData, + + /** + * Combination of Ably `clientId` and `connectionId`. + * See: https://sdk.ably.com/builds/ably/specification/main/features/#TP3h + */ + val memberKey: String, +) enum class PresenceAction { PRESENT_OR_ENTER, LEAVE_OR_ABSENT, UPDATE; diff --git a/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt b/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt index d2a0c8f57..1e9878246 100644 --- a/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt +++ b/publishing-sdk/src/main/java/com/ably/tracking/publisher/ConfigurationModels.kt @@ -260,7 +260,7 @@ data class Trackable( override fun hashCode(): Int = id.hashCode() } -data class Subscriber(val id: String, val trackable: Trackable) +data class Subscriber(val memberKey: String, val trackable: Trackable) sealed class Proximity diff --git a/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt b/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt index 92de376bb..07aa693a5 100644 --- a/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt +++ b/publishing-sdk/src/main/java/com/ably/tracking/publisher/CorePublisher.kt @@ -52,16 +52,17 @@ internal interface CorePublisher { val routingProfile: RoutingProfile val trackableStateFlows: Map> - fun addSubscriber(id: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties) + fun addSubscriber(memberKey: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties) fun updateSubscriber( - id: String, + memberKey: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties ) - fun removeSubscriber(id: String, trackable: Trackable, properties: PublisherProperties) + fun removeSubscriber(memberKey: String, trackable: Trackable, properties: PublisherProperties) fun removeAllSubscribers(trackable: Trackable, properties: PublisherProperties) + fun setDestination(destination: Destination, properties: PublisherProperties) fun removeCurrentDestination(properties: PublisherProperties) fun startLocationUpdates(properties: PublisherProperties) @@ -492,8 +493,13 @@ constructor( } } - override fun addSubscriber(id: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties) { - val subscriber = Subscriber(id, trackable) + override fun addSubscriber( + memberKey: String, + trackable: Trackable, + data: PresenceData, + properties: PublisherProperties + ) { + val subscriber = Subscriber(memberKey, trackable) if (properties.subscribers[trackable.id] == null) { properties.subscribers[trackable.id] = mutableSetOf() } @@ -504,13 +510,13 @@ constructor( } override fun updateSubscriber( - id: String, + memberKey: String, trackable: Trackable, data: PresenceData, properties: PublisherProperties ) { properties.subscribers[trackable.id]?.let { subscribers -> - subscribers.find { it.id == id }?.let { subscriber -> + subscribers.find { it.memberKey == memberKey }?.let { subscriber -> data.resolution.let { resolution -> saveOrRemoveResolutionRequest(resolution, trackable, subscriber, properties) resolveResolution(trackable, properties) @@ -519,9 +525,9 @@ constructor( } } - override fun removeSubscriber(id: String, trackable: Trackable, properties: PublisherProperties) { + override fun removeSubscriber(memberKey: String, trackable: Trackable, properties: PublisherProperties) { properties.subscribers[trackable.id]?.let { subscribers -> - subscribers.find { it.id == id }?.let { subscriber -> + subscribers.find { it.memberKey == memberKey }?.let { subscriber -> subscribers.remove(subscriber) properties.requests[trackable.id]?.remove(subscriber) hooks.subscribers?.onSubscriberRemoved(subscriber) diff --git a/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt b/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt index d0d308e14..57b3df43e 100644 --- a/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt +++ b/publishing-sdk/src/main/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorker.kt @@ -19,7 +19,7 @@ internal class PresenceMessageWorker( PresenceAction.PRESENT_OR_ENTER -> { if (presenceMessage.data.type == ClientTypes.SUBSCRIBER) { corePublisher.addSubscriber( - presenceMessage.clientId, + presenceMessage.memberKey, trackable, presenceMessage.data, properties @@ -28,13 +28,13 @@ internal class PresenceMessageWorker( } PresenceAction.LEAVE_OR_ABSENT -> { if (presenceMessage.data.type == ClientTypes.SUBSCRIBER) { - corePublisher.removeSubscriber(presenceMessage.clientId, trackable, properties) + corePublisher.removeSubscriber(presenceMessage.memberKey, trackable, properties) } } PresenceAction.UPDATE -> { if (presenceMessage.data.type == ClientTypes.SUBSCRIBER) { corePublisher.updateSubscriber( - presenceMessage.clientId, + presenceMessage.memberKey, trackable, presenceMessage.data, properties diff --git a/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt b/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt index 2ef5fa898..a69e5510f 100644 --- a/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt +++ b/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/PresenceMessageWorkerTest.kt @@ -133,7 +133,7 @@ class PresenceMessageWorkerTest { PresenceMessage( action, PresenceData(if (isSubscriber) ClientTypes.SUBSCRIBER else ClientTypes.PUBLISHER), - "test-client-id" + "test-member-key" ), corePublisher ) From 9198979f65bcf970aba19ccfa2ec5ea064ed7208 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Fri, 16 Dec 2022 21:31:09 +0000 Subject: [PATCH 03/19] Enable Subscribers to support presence messages from multiple Publisher instances. So, for example, this fixes behaviour for the sequence: - Publisher with Member Key A:1 Enters Presence - Publisher with Member Key A:2 Enters Presence - Publisher with Member Key A:1 Leaves Presence Previously this would flag "the" publisher as offline. With this change, it will now continue to flag "the" publisher as online. --- .../ably/tracking/subscriber/CoreSubscriber.kt | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 2d386ea4d..5f9fd6183 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -147,8 +147,8 @@ internal data class SubscriberProperties private constructor( override var isStopped: Boolean = false, - private var isPublisherOnline: Boolean = false, // TODO what if there are multiple publishers? - private var lastEmittedIsPublisherOnline: Boolean? = null, + private var presentPublisherMemberKeys: MutableSet = HashSet(), + private var lastEmittedIsAPublisherPresent: Boolean? = null, private var lastEmittedTrackableState: TrackableState = TrackableState.Offline(), private var lastConnectionStateChange: ConnectionStateChange = ConnectionStateChange(ConnectionState.OFFLINE, null), @@ -179,10 +179,10 @@ internal data class SubscriberProperties private constructor( if (presenceMessage.action == PresenceAction.LEAVE_OR_ABSENT) { // LEAVE or ABSENT - isPublisherOnline = false + presentPublisherMemberKeys.remove(presenceMessage.memberKey) } else { // PRESENT, ENTER or UDPATE - isPublisherOnline = true + presentPublisherMemberKeys.add(presenceMessage.memberKey) presenceMessage.data.resolution?.let { publisherResolution -> pendingPublisherResolutions.add(publisherResolution) } @@ -190,10 +190,12 @@ internal data class SubscriberProperties private constructor( } fun emitEventsIfRequired() { + val isAPublisherPresent = (presentPublisherMemberKeys.size > 0) + val trackableState = when (lastConnectionStateChange.state) { ConnectionState.ONLINE -> { when (lastChannelConnectionStateChange.state) { - ConnectionState.ONLINE -> if (isPublisherOnline) TrackableState.Online else TrackableState.Offline() + ConnectionState.ONLINE -> if (isAPublisherPresent) TrackableState.Online else TrackableState.Offline() ConnectionState.OFFLINE -> TrackableState.Offline() ConnectionState.FAILED -> TrackableState.Failed(lastChannelConnectionStateChange.errorInformation!!) // are we sure error information will always be present? } @@ -207,9 +209,9 @@ internal data class SubscriberProperties private constructor( stateFlows.scope.launch { stateFlows.trackableStateFlow.emit(trackableState) } } - if (null == lastEmittedIsPublisherOnline || lastEmittedIsPublisherOnline!! != isPublisherOnline) { - lastEmittedIsPublisherOnline = isPublisherOnline - stateFlows.scope.launch { stateFlows.publisherPresenceStateFlow.emit(isPublisherOnline) } + if (null == lastEmittedIsAPublisherPresent || lastEmittedIsAPublisherPresent!! != isAPublisherPresent) { + lastEmittedIsAPublisherPresent = isAPublisherPresent + stateFlows.scope.launch { stateFlows.publisherPresenceStateFlow.emit(isAPublisherPresent) } } val publisherResolutions = pendingPublisherResolutions.drain() From 86125f717535304c7a6aa8033a0ec9669701e2b5 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Fri, 16 Dec 2022 21:36:22 +0000 Subject: [PATCH 04/19] Fix bug where we were indicating that a publisher was present, despite our local connectivity state being offline. --- .../com/ably/tracking/subscriber/CoreSubscriber.kt | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 5f9fd6183..325a6e41c 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -148,7 +148,7 @@ internal data class SubscriberProperties private constructor( override var isStopped: Boolean = false, private var presentPublisherMemberKeys: MutableSet = HashSet(), - private var lastEmittedIsAPublisherPresent: Boolean? = null, + private var lastEmittedIsPublisherVisible: Boolean? = null, private var lastEmittedTrackableState: TrackableState = TrackableState.Offline(), private var lastConnectionStateChange: ConnectionStateChange = ConnectionStateChange(ConnectionState.OFFLINE, null), @@ -209,9 +209,14 @@ internal data class SubscriberProperties private constructor( stateFlows.scope.launch { stateFlows.trackableStateFlow.emit(trackableState) } } - if (null == lastEmittedIsAPublisherPresent || lastEmittedIsAPublisherPresent!! != isAPublisherPresent) { - lastEmittedIsAPublisherPresent = isAPublisherPresent - stateFlows.scope.launch { stateFlows.publisherPresenceStateFlow.emit(isAPublisherPresent) } + // It is possible for presentPublisherMemberKeys to not be empty, even when we have no connectivity from our side, + // because we've had presence entry events without subsequent leave events. + // Therefore, from the perspective of a user consuming events from publisherPresenceStateFlow, what matters + // is what we're computing for isPublisherVisible (not the simple isAPublisherPresent). + val isPublisherVisible = (isAPublisherPresent && lastConnectionStateChange.state == ConnectionState.ONLINE) + if (null == lastEmittedIsPublisherVisible || lastEmittedIsPublisherVisible!! != isPublisherVisible) { + lastEmittedIsPublisherVisible = isPublisherVisible + stateFlows.scope.launch { stateFlows.publisherPresenceStateFlow.emit(isPublisherVisible) } } val publisherResolutions = pendingPublisherResolutions.drain() From 33c52a5d6c4038764803a6a4f0ded7a675bc109a Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Mon, 19 Dec 2022 10:30:06 +0000 Subject: [PATCH 05/19] Reduce visibility of utility class. --- .../tracking/subscriber/CoreSubscriber.kt | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 325a6e41c..3fca20fa7 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -127,20 +127,6 @@ private class DefaultCoreSubscriber( } } -private class PendingResolutions { - private var resolutions: MutableList = ArrayList() - - fun add(resolution: Resolution) { - resolutions.add(resolution) - } - - fun drain(): Array { - val array = resolutions.toTypedArray() - resolutions.clear() - return array - } -} - internal data class SubscriberProperties private constructor( var presenceData: PresenceData, private val stateFlows: EventFlows, @@ -239,4 +225,18 @@ internal data class SubscriberProperties private constructor( val resolutions: MutableSharedFlow = MutableSharedFlow(replay = 1), val nextLocationUpdateIntervals: MutableSharedFlow = MutableSharedFlow(replay = 1), ) + + private class PendingResolutions { + private var resolutions: MutableList = ArrayList() + + fun add(resolution: Resolution) { + resolutions.add(resolution) + } + + fun drain(): Array { + val array = resolutions.toTypedArray() + resolutions.clear() + return array + } + } } From 09c8baa9e466483d08f38e11e6871e6f4e27fa14 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Mon, 19 Dec 2022 10:32:53 +0000 Subject: [PATCH 06/19] Use isNotEmpty() rather than size for checking collections. --- .../main/java/com/ably/tracking/subscriber/CoreSubscriber.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 3fca20fa7..981b31b17 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -176,7 +176,7 @@ internal data class SubscriberProperties private constructor( } fun emitEventsIfRequired() { - val isAPublisherPresent = (presentPublisherMemberKeys.size > 0) + val isAPublisherPresent = (presentPublisherMemberKeys.isNotEmpty()) val trackableState = when (lastConnectionStateChange.state) { ConnectionState.ONLINE -> { @@ -206,7 +206,7 @@ internal data class SubscriberProperties private constructor( } val publisherResolutions = pendingPublisherResolutions.drain() - if (publisherResolutions.size > 0) { + if (publisherResolutions.isNotEmpty()) { stateFlows.scope.launch { for (publisherResolution in publisherResolutions) { stateFlows.resolutions.emit(publisherResolution) From aeb1cbcabaa784eef023b8cfb5b86339e30dcda3 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Mon, 19 Dec 2022 11:16:19 +0000 Subject: [PATCH 07/19] Further encapsulate the subscriber event flows. --- .../tracking/subscriber/CoreSubscriber.kt | 97 +++++++++++++------ 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 981b31b17..c510199a8 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -67,30 +67,31 @@ private class DefaultCoreSubscriber( private val properties: SubscriberProperties override val enhancedLocations: SharedFlow - get() = eventFlows.enhancedLocations.asSharedFlow() + get() = eventFlows.enhancedLocations override val rawLocations: SharedFlow - get() = eventFlows.rawLocations.asSharedFlow() + get() = eventFlows.rawLocations override val trackableStates: StateFlow - get() = eventFlows.trackableStateFlow.asStateFlow() + get() = eventFlows.trackableStates override val publisherPresence: StateFlow - get() = eventFlows.publisherPresenceStateFlow + get() = eventFlows.publisherPresence override val resolutions: SharedFlow - get() = eventFlows.resolutions.asSharedFlow() + get() = eventFlows.resolutions override val nextLocationUpdateIntervals: SharedFlow - get() = eventFlows.nextLocationUpdateIntervals.asSharedFlow() + get() = eventFlows.nextLocationUpdateIntervals init { val workerFactory = WorkerFactory(this, ably, trackableId) - eventFlows = SubscriberProperties.EventFlows() + val scope = CoroutineScope(singleThreadDispatcher + SupervisorJob()) + eventFlows = SubscriberProperties.EventFlows(scope) properties = SubscriberProperties(initialResolution, eventFlows) workerQueue = WorkerQueue( properties = properties, - scope = eventFlows.scope, + scope = scope, workerFactory = workerFactory, copyProperties = { copy() }, getStoppedException = { SubscriberStoppedException() } @@ -111,25 +112,25 @@ private class DefaultCoreSubscriber( override fun subscribeForEnhancedEvents(presenceData: PresenceData) { ably.subscribeForEnhancedEvents(trackableId, presenceData) { - eventFlows.scope.launch { eventFlows.enhancedLocations.emit(it) } + eventFlows.emitEnhanced(it) } } override fun subscribeForRawEvents(presenceData: PresenceData) { ably.subscribeForRawEvents(trackableId, presenceData) { - eventFlows.scope.launch { eventFlows.rawLocations.emit(it) } + eventFlows.emitRaw(it) } } override fun notifyAssetIsOffline() { // TODO what is this method achieving, why is it not in normal flow? - eventFlows.scope.launch { eventFlows.trackableStateFlow.emit(TrackableState.Offline()) } + eventFlows.emit(TrackableState.Offline()) } } internal data class SubscriberProperties private constructor( var presenceData: PresenceData, - private val stateFlows: EventFlows, + private val eventFlows: EventFlows, override var isStopped: Boolean = false, @@ -192,7 +193,7 @@ internal data class SubscriberProperties private constructor( if (trackableState != lastEmittedTrackableState) { lastEmittedTrackableState = trackableState - stateFlows.scope.launch { stateFlows.trackableStateFlow.emit(trackableState) } + eventFlows.emit(trackableState) } // It is possible for presentPublisherMemberKeys to not be empty, even when we have no connectivity from our side, @@ -202,29 +203,65 @@ internal data class SubscriberProperties private constructor( val isPublisherVisible = (isAPublisherPresent && lastConnectionStateChange.state == ConnectionState.ONLINE) if (null == lastEmittedIsPublisherVisible || lastEmittedIsPublisherVisible!! != isPublisherVisible) { lastEmittedIsPublisherVisible = isPublisherVisible - stateFlows.scope.launch { stateFlows.publisherPresenceStateFlow.emit(isPublisherVisible) } + eventFlows.emitPublisherPresence(isPublisherVisible) } - val publisherResolutions = pendingPublisherResolutions.drain() - if (publisherResolutions.isNotEmpty()) { - stateFlows.scope.launch { - for (publisherResolution in publisherResolutions) { - stateFlows.resolutions.emit(publisherResolution) - stateFlows.nextLocationUpdateIntervals.emit(publisherResolution.desiredInterval) + eventFlows.emit(pendingPublisherResolutions.drain()) + } + + internal class EventFlows(private val scope: CoroutineScope) { + private val _enhancedLocations: MutableSharedFlow = MutableSharedFlow(replay = 1) + private val _rawLocations: MutableSharedFlow = MutableSharedFlow(replay = 1) + private val _trackableStates: MutableStateFlow = MutableStateFlow(TrackableState.Offline()) + private val _publisherPresence: MutableStateFlow = MutableStateFlow(false) + private val _resolutions: MutableSharedFlow = MutableSharedFlow(replay = 1) + private val _nextLocationUpdateIntervals: MutableSharedFlow = MutableSharedFlow(replay = 1) + + fun emitEnhanced(locationUpdate: LocationUpdate) { + scope.launch { _enhancedLocations.emit(locationUpdate) } + } + + fun emitRaw(locationUpdate: LocationUpdate) { + scope.launch { _rawLocations.emit(locationUpdate) } + } + + fun emitPublisherPresence(isPublisherPresent: Boolean) { + scope.launch { _publisherPresence.emit(isPublisherPresent) } + } + + fun emit(trackableState: TrackableState) { + scope.launch { _trackableStates.emit(trackableState) } + } + + fun emit(resolutions: Array) { + if (resolutions.isNotEmpty()) { + scope.launch { + for (resolution in resolutions) { + _resolutions.emit(resolution) + _nextLocationUpdateIntervals.emit(resolution.desiredInterval) + } } } } - } - internal data class EventFlows constructor( - val scope: CoroutineScope = CoroutineScope(singleThreadDispatcher + SupervisorJob()), - val enhancedLocations: MutableSharedFlow = MutableSharedFlow(replay = 1), - val rawLocations: MutableSharedFlow = MutableSharedFlow(replay = 1), - val trackableStateFlow: MutableStateFlow = MutableStateFlow(TrackableState.Offline()), - val publisherPresenceStateFlow: MutableStateFlow = MutableStateFlow(false), - val resolutions: MutableSharedFlow = MutableSharedFlow(replay = 1), - val nextLocationUpdateIntervals: MutableSharedFlow = MutableSharedFlow(replay = 1), - ) + val enhancedLocations: SharedFlow + get() = _enhancedLocations.asSharedFlow() + + val rawLocations: SharedFlow + get() = _rawLocations.asSharedFlow() + + val trackableStates: StateFlow + get() = _trackableStates.asStateFlow() + + val publisherPresence: StateFlow + get() = _publisherPresence + + val resolutions: SharedFlow + get() = _resolutions.asSharedFlow() + + val nextLocationUpdateIntervals: SharedFlow + get() = _nextLocationUpdateIntervals.asSharedFlow() + } private class PendingResolutions { private var resolutions: MutableList = ArrayList() From 90fd87198631d6e332f14d3b2f66b78fac403adb Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Mon, 19 Dec 2022 11:22:55 +0000 Subject: [PATCH 08/19] Add commentary to data class as scope wasn't necessarily clear. --- common/src/main/java/com/ably/tracking/common/AblyModels.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/src/main/java/com/ably/tracking/common/AblyModels.kt b/common/src/main/java/com/ably/tracking/common/AblyModels.kt index c557516a0..9d85a5d69 100644 --- a/common/src/main/java/com/ably/tracking/common/AblyModels.kt +++ b/common/src/main/java/com/ably/tracking/common/AblyModels.kt @@ -2,6 +2,9 @@ package com.ably.tracking.common import com.ably.tracking.Resolution +/** + * Encapsulates the properties of an Ably presence message which are needed by asset tracking SDKs. + */ data class PresenceMessage( val action: PresenceAction, val data: PresenceData, From b52ecca93a0a218e8e51cc5aef9ad6127e8dfde0 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Mon, 19 Dec 2022 12:49:21 +0000 Subject: [PATCH 09/19] Make reference constant. --- .../main/java/com/ably/tracking/subscriber/CoreSubscriber.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index c510199a8..9138accb6 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -264,7 +264,7 @@ internal data class SubscriberProperties private constructor( } private class PendingResolutions { - private var resolutions: MutableList = ArrayList() + private val resolutions: MutableList = ArrayList() fun add(resolution: Resolution) { resolutions.add(resolution) From 660988d51d36dcfb5190f15f1ea0d37b9cd8a5e3 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Mon, 19 Dec 2022 12:49:55 +0000 Subject: [PATCH 10/19] Move reference from field to local variable (reduce scope of accessibility). --- .../main/java/com/ably/tracking/subscriber/CoreSubscriber.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 9138accb6..798fda4e6 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -64,7 +64,6 @@ private class DefaultCoreSubscriber( private val workerQueue: WorkerQueue private val eventFlows: SubscriberProperties.EventFlows - private val properties: SubscriberProperties override val enhancedLocations: SharedFlow get() = eventFlows.enhancedLocations @@ -88,7 +87,7 @@ private class DefaultCoreSubscriber( val workerFactory = WorkerFactory(this, ably, trackableId) val scope = CoroutineScope(singleThreadDispatcher + SupervisorJob()) eventFlows = SubscriberProperties.EventFlows(scope) - properties = SubscriberProperties(initialResolution, eventFlows) + val properties = SubscriberProperties(initialResolution, eventFlows) workerQueue = WorkerQueue( properties = properties, scope = scope, From b1ed03e912ebcc495fd1a44192ddcf894330229a Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Mon, 19 Dec 2022 12:53:36 +0000 Subject: [PATCH 11/19] Add link to bug issue from TODO commentary. --- .../src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 798fda4e6..adb5b6a0f 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -123,6 +123,7 @@ private class DefaultCoreSubscriber( override fun notifyAssetIsOffline() { // TODO what is this method achieving, why is it not in normal flow? + // Perhaps related to: https://github.com/ably/ably-asset-tracking-android/issues/802 eventFlows.emit(TrackableState.Offline()) } } From afe49dda29967b3dd4bcad7e83323a2f32927d17 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Tue, 20 Dec 2022 13:26:44 +0000 Subject: [PATCH 12/19] Rename variable for readability. --- .../java/com/ably/tracking/subscriber/CoreSubscriber.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index adb5b6a0f..c6b520fcd 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -135,7 +135,7 @@ internal data class SubscriberProperties private constructor( override var isStopped: Boolean = false, private var presentPublisherMemberKeys: MutableSet = HashSet(), - private var lastEmittedIsPublisherVisible: Boolean? = null, + private var lastEmittedValueOfIsPublisherVisible: Boolean? = null, private var lastEmittedTrackableState: TrackableState = TrackableState.Offline(), private var lastConnectionStateChange: ConnectionStateChange = ConnectionStateChange(ConnectionState.OFFLINE, null), @@ -201,8 +201,8 @@ internal data class SubscriberProperties private constructor( // Therefore, from the perspective of a user consuming events from publisherPresenceStateFlow, what matters // is what we're computing for isPublisherVisible (not the simple isAPublisherPresent). val isPublisherVisible = (isAPublisherPresent && lastConnectionStateChange.state == ConnectionState.ONLINE) - if (null == lastEmittedIsPublisherVisible || lastEmittedIsPublisherVisible!! != isPublisherVisible) { - lastEmittedIsPublisherVisible = isPublisherVisible + if (null == lastEmittedValueOfIsPublisherVisible || lastEmittedValueOfIsPublisherVisible!! != isPublisherVisible) { + lastEmittedValueOfIsPublisherVisible = isPublisherVisible eventFlows.emitPublisherPresence(isPublisherVisible) } From b0cd85ec514052e415e34137338737274b1c7347 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Tue, 20 Dec 2022 14:05:35 +0000 Subject: [PATCH 13/19] Remove unused field. --- .../com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt | 1 - .../subscriber/workerqueue/workers/StartConnectionWorker.kt | 1 - 2 files changed, 2 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt index 986ec0e10..a08863a26 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt @@ -38,7 +38,6 @@ internal class WorkerFactory( when (workerSpecification) { is WorkerSpecification.StartConnection -> StartConnectionWorker( ably, - subscriberInteractor, trackableId, workerSpecification.callbackFunction ) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt index da366ec81..668d12fe5 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt @@ -9,7 +9,6 @@ import com.ably.tracking.subscriber.workerqueue.WorkerSpecification internal class StartConnectionWorker( private val ably: Ably, - private val subscriberInteractor: SubscriberInteractor, private val trackableId: String, callbackFunction: ResultCallbackFunction ) : CallbackWorker(callbackFunction) { From 5ff01eb12da28ff3071419f816b0fd786cc0b8a6 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Tue, 20 Dec 2022 14:12:25 +0000 Subject: [PATCH 14/19] Conform argument name. --- .../main/java/com/ably/tracking/subscriber/CoreSubscriber.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index c6b520fcd..a9bbc6051 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -145,8 +145,8 @@ internal data class SubscriberProperties private constructor( ) : Properties { internal constructor( initialResolution: Resolution?, - stateFlows: EventFlows, - ) : this(PresenceData(ClientTypes.SUBSCRIBER, initialResolution), stateFlows) + eventFlows: EventFlows, + ) : this(PresenceData(ClientTypes.SUBSCRIBER, initialResolution), eventFlows) fun updateForConnectionStateChangeAndThenEmitEventsIfRequired(stateChange: ConnectionStateChange) { lastConnectionStateChange = stateChange From 8ea1a1b10ac75ecffa83ee6452871ca0ea529e64 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Tue, 20 Dec 2022 14:13:33 +0000 Subject: [PATCH 15/19] Rename methods for clarity. --- .../com/ably/tracking/subscriber/CoreSubscriber.kt | 10 +++++----- .../workers/ProcessInitialPresenceMessagesWorker.kt | 2 +- .../workerqueue/workers/StartConnectionWorker.kt | 3 +-- .../workers/UpdateChannelConnectionStateWorker.kt | 2 +- .../workerqueue/workers/UpdateConnectionStateWorker.kt | 2 +- .../workers/UpdatePublisherPresenceWorker.kt | 2 +- 6 files changed, 10 insertions(+), 11 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index a9bbc6051..6a2a32a4d 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -148,14 +148,14 @@ internal data class SubscriberProperties private constructor( eventFlows: EventFlows, ) : this(PresenceData(ClientTypes.SUBSCRIBER, initialResolution), eventFlows) - fun updateForConnectionStateChangeAndThenEmitEventsIfRequired(stateChange: ConnectionStateChange) { + fun updateForConnectionStateChangeAndThenEmitStateEventsIfRequired(stateChange: ConnectionStateChange) { lastConnectionStateChange = stateChange - emitEventsIfRequired() + emitStateEventsIfRequired() } - fun updateForChannelConnectionStateChangeAndThenEmitEventsIfRequired(stateChange: ConnectionStateChange) { + fun updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired(stateChange: ConnectionStateChange) { lastChannelConnectionStateChange = stateChange - emitEventsIfRequired() + emitStateEventsIfRequired() } fun updateForPresenceMessage(presenceMessage: PresenceMessage) { @@ -176,7 +176,7 @@ internal data class SubscriberProperties private constructor( } } - fun emitEventsIfRequired() { + fun emitStateEventsIfRequired() { val isAPublisherPresent = (presentPublisherMemberKeys.isNotEmpty()) val trackableState = when (lastConnectionStateChange.state) { diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt index fdf4a4ae5..61f4d35f1 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt @@ -18,7 +18,7 @@ internal class ProcessInitialPresenceMessagesWorker( presenceMessages.forEach { presenceMessage -> properties.updateForPresenceMessage(presenceMessage) } - properties.emitEventsIfRequired() + properties.emitStateEventsIfRequired() postWork(WorkerSpecification.SubscribeToChannel(callbackFunction)) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt index 668d12fe5..c9adbec3c 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorker.kt @@ -3,7 +3,6 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.Ably import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.subscriber.SubscriberProperties -import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.common.workerqueue.CallbackWorker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification @@ -17,7 +16,7 @@ internal class StartConnectionWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.emitEventsIfRequired() + properties.emitStateEventsIfRequired() doAsyncWork { val startAblyConnectionResult = ably.startConnection() if (startAblyConnectionResult.isFailure) { diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt index 54899a955..c025e9a1c 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorker.kt @@ -13,7 +13,7 @@ internal class UpdateChannelConnectionStateWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.updateForChannelConnectionStateChangeAndThenEmitEventsIfRequired(channelConnectionStateChange) + properties.updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired(channelConnectionStateChange) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt index 12aec62a0..94dbcd009 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorker.kt @@ -13,7 +13,7 @@ internal class UpdateConnectionStateWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.updateForConnectionStateChangeAndThenEmitEventsIfRequired(connectionStateChange) + properties.updateForConnectionStateChangeAndThenEmitStateEventsIfRequired(connectionStateChange) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt index 1ae759930..8bf9d57ca 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt @@ -14,7 +14,7 @@ internal class UpdatePublisherPresenceWorker( postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { properties.updateForPresenceMessage(presenceMessage) - properties.emitEventsIfRequired() + properties.emitStateEventsIfRequired() return properties } From 078d33da7f9c58dfdd09499b9790f1dc2380d787 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Tue, 20 Dec 2022 14:30:45 +0000 Subject: [PATCH 16/19] Increase responsibilities of update method to simplify use at upstream call sites. --- .../tracking/subscriber/CoreSubscriber.kt | 28 ++++++++++--------- .../ProcessInitialPresenceMessagesWorker.kt | 5 +--- .../workers/UpdatePublisherPresenceWorker.kt | 3 +- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index 6a2a32a4d..ff2a9bdf4 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -158,22 +158,24 @@ internal data class SubscriberProperties private constructor( emitStateEventsIfRequired() } - fun updateForPresenceMessage(presenceMessage: PresenceMessage) { - if (presenceMessage.data.type != ClientTypes.PUBLISHER) { + fun updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages: List) { + for (presenceMessage in presenceMessages) { // We are only interested in presence updates from publishers. - return - } - - if (presenceMessage.action == PresenceAction.LEAVE_OR_ABSENT) { - // LEAVE or ABSENT - presentPublisherMemberKeys.remove(presenceMessage.memberKey) - } else { - // PRESENT, ENTER or UDPATE - presentPublisherMemberKeys.add(presenceMessage.memberKey) - presenceMessage.data.resolution?.let { publisherResolution -> - pendingPublisherResolutions.add(publisherResolution) + if (presenceMessage.data.type == ClientTypes.PUBLISHER) { + + if (presenceMessage.action == PresenceAction.LEAVE_OR_ABSENT) { + // LEAVE or ABSENT + presentPublisherMemberKeys.remove(presenceMessage.memberKey) + } else { + // PRESENT, ENTER or UDPATE + presentPublisherMemberKeys.add(presenceMessage.memberKey) + presenceMessage.data.resolution?.let { publisherResolution -> + pendingPublisherResolutions.add(publisherResolution) + } + } } } + emitStateEventsIfRequired() } fun emitStateEventsIfRequired() { diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt index 61f4d35f1..2a92e3e7e 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt @@ -15,10 +15,7 @@ internal class ProcessInitialPresenceMessagesWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - presenceMessages.forEach { presenceMessage -> - properties.updateForPresenceMessage(presenceMessage) - } - properties.emitStateEventsIfRequired() + properties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages) postWork(WorkerSpecification.SubscribeToChannel(callbackFunction)) return properties } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt index 8bf9d57ca..44a72a3a5 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt @@ -13,8 +13,7 @@ internal class UpdatePublisherPresenceWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): SubscriberProperties { - properties.updateForPresenceMessage(presenceMessage) - properties.emitStateEventsIfRequired() + properties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(listOf(presenceMessage)) return properties } From da21c9b3a7b685ffe10fbc5c6045253e0389efb8 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Tue, 20 Dec 2022 14:41:25 +0000 Subject: [PATCH 17/19] Simplify nullable Boolean check. --- .../main/java/com/ably/tracking/subscriber/CoreSubscriber.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt index ff2a9bdf4..6719bbd2d 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/CoreSubscriber.kt @@ -203,7 +203,7 @@ internal data class SubscriberProperties private constructor( // Therefore, from the perspective of a user consuming events from publisherPresenceStateFlow, what matters // is what we're computing for isPublisherVisible (not the simple isAPublisherPresent). val isPublisherVisible = (isAPublisherPresent && lastConnectionStateChange.state == ConnectionState.ONLINE) - if (null == lastEmittedValueOfIsPublisherVisible || lastEmittedValueOfIsPublisherVisible!! != isPublisherVisible) { + if (lastEmittedValueOfIsPublisherVisible != isPublisherVisible) { lastEmittedValueOfIsPublisherVisible = isPublisherVisible eventFlows.emitPublisherPresence(isPublisherVisible) } From fca078904b8233ba3b0e62cb96de112c41c5ed88 Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Wed, 21 Dec 2022 12:28:20 +0000 Subject: [PATCH 18/19] Reinstate removed tests. I initially removed them in 130dfa68cddec6c336baf8f6e37565859e5986e5 but, quite rightly, had push back from other members of the team so I have put time into reinstating them which required refactors to reflect my changes elsewhere. --- .../workers/ChangeResolutionWorkerTest.kt | 41 ++++++ .../workers/DisconnectWorkerTest.kt | 41 ++++++ ...rocessInitialPresenceMessagesWorkerTest.kt | 74 ++++++++++ .../workers/StartConnectionWorkerTest.kt | 129 +++++++++++++++++ .../workers/StopConnectionWorkerTest.kt | 63 +++++++++ .../SubscribeForPresenceMessagesWorkerTest.kt | 131 ++++++++++++++++++ .../workers/SubscribeToChannelWorkerTest.kt | 53 +++++++ .../UpdateChannelConnectionStateWorkerTest.kt | 45 ++++++ .../UpdateConnectionStateWorkerTest.kt | 45 ++++++ .../UpdatePublisherPresenceWorkerTest.kt | 41 ++++++ .../workerqueue/workers/WorkerTestUtils.kt | 17 +++ 11 files changed, 680 insertions(+) create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt create mode 100644 subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt new file mode 100644 index 000000000..0801d0155 --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ChangeResolutionWorkerTest.kt @@ -0,0 +1,41 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.Accuracy +import com.ably.tracking.Resolution +import com.ably.tracking.common.Ably +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.test.common.mockUpdatePresenceDataSuccess +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Assert +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class ChangeResolutionWorkerTest { + + private val ably: Ably = mockk() + private val trackableId = "123123" + private val updatedResolution = Resolution(Accuracy.HIGH, 10, 10.0) + private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) + private val changeResolutionWorker = ChangeResolutionWorker(ably, trackableId, updatedResolution, callbackFunction) + + private val asyncWorks = mutableListOf Unit>() + + @Test + fun `should return Properties with updated resolution and notify callback`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + ably.mockUpdatePresenceDataSuccess(trackableId) + + // when + val updatedProperties = changeResolutionWorker.doWork(initialProperties, asyncWorks.appendWork()) {} + asyncWorks.executeAll() + + // then + Assert.assertEquals(updatedResolution, updatedProperties.presenceData.resolution) + verify { callbackFunction.invoke(match { it.isSuccess }) } + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt new file mode 100644 index 000000000..1829cd1db --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/DisconnectWorkerTest.kt @@ -0,0 +1,41 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.Accuracy +import com.ably.tracking.Resolution +import com.ably.tracking.common.Ably +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.test.common.mockDisconnectSuccess +import io.mockk.coVerify +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Assert +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class DisconnectWorkerTest { + + private val ably: Ably = mockk() + private val trackableId = "123123" + private val callbackFunction: () -> Unit = mockk(relaxed = true) + private val disconnectWorker = DisconnectWorker(ably, trackableId, callbackFunction) + + private val asyncWorks = mutableListOf Unit>() + + @Test + fun `should call ably disconnect and notify callback`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + ably.mockDisconnectSuccess(trackableId) + + // when + val updatedProperties = disconnectWorker.doWork(initialProperties, asyncWorks.appendWork()) {} + asyncWorks.executeAll() + + // then + Assert.assertEquals(initialProperties, updatedProperties) + coVerify { ably.disconnect(trackableId, initialProperties.presenceData) } + verify { callbackFunction.invoke() } + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt new file mode 100644 index 000000000..55a63303f --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt @@ -0,0 +1,74 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.common.ClientTypes +import com.ably.tracking.common.PresenceAction +import com.ably.tracking.common.PresenceData +import com.ably.tracking.common.PresenceMessage +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Assert +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class ProcessInitialPresenceMessagesWorkerTest { + + private val subscriberProperties: SubscriberProperties = mockk() + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should process all presence messages`() = runBlockingTest { + // given + val presenceMessages = listOf(anyPresenceMessage(), anyPresenceMessage(), anyPresenceMessage()) + val worker = ProcessInitialPresenceMessagesWorker(presenceMessages) {} + every { subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(any()) } just Runs + + // when + worker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification(), + ) + + // then + verify { + subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages) + } + } + + @Test + fun `should post subscribe to channel work after processing presence messages`() = runBlockingTest { + // given + val callbackFunction: ResultCallbackFunction = {} + val presenceMessages = emptyList() + val worker = ProcessInitialPresenceMessagesWorker(presenceMessages, callbackFunction) + every { subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(any()) } just Runs + + // when + worker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification(), + ) + + // then + Assert.assertEquals(WorkerSpecification.SubscribeToChannel(callbackFunction), postedWorks[0]) + verify { + subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(presenceMessages) + } + } + + private fun anyPresenceMessage() = PresenceMessage( + action = PresenceAction.PRESENT_OR_ENTER, + data = PresenceData(ClientTypes.PUBLISHER, null, null), + memberKey = "", + ) +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt new file mode 100644 index 000000000..bdd0978e9 --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StartConnectionWorkerTest.kt @@ -0,0 +1,129 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.common.Ably +import com.ably.tracking.common.PresenceData +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import com.ably.tracking.test.common.mockConnectSuccess +import com.ably.tracking.test.common.mockConnectFailure +import com.ably.tracking.test.common.mockStartConnectionFailure +import com.ably.tracking.test.common.mockStartConnectionSuccess +import io.mockk.Runs +import io.mockk.coVerify +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Assert +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class StartConnectionWorkerTest { + + private val ably: Ably = mockk() + private val trackableId = "123123" + private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) + private val subscriberProperties: SubscriberProperties = mockk() + private val presenceData: PresenceData = mockk() + private val startConnectionWorker = StartConnectionWorker(ably, trackableId, callbackFunction) + + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should call ably connect and post update trackable worker specification on success`() = runBlockingTest { + // given + ably.mockStartConnectionSuccess() + ably.mockConnectSuccess(trackableId) + every { subscriberProperties.emitStateEventsIfRequired() } just Runs + every { subscriberProperties.presenceData } returns presenceData + + // when + val updatedProperties = + startConnectionWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertEquals(subscriberProperties, updatedProperties) + coVerify { + ably.startConnection() + ably.connect( + trackableId, + presenceData, + useRewind = true, + willSubscribe = true + ) + } + Assert.assertEquals(WorkerSpecification.SubscribeForPresenceMessages(callbackFunction), postedWorks[0]) + } + + @Test + fun `should call ably connect and notify callback on failure`() = runBlockingTest { + // given + ably.mockStartConnectionSuccess() + ably.mockConnectFailure(trackableId) + every { subscriberProperties.emitStateEventsIfRequired() } just Runs + every { subscriberProperties.presenceData } returns presenceData + + // when + val updatedProperties = + startConnectionWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertEquals(subscriberProperties, updatedProperties) + coVerify { + ably.startConnection() + ably.connect( + trackableId, + presenceData, + useRewind = true, + willSubscribe = true + ) + } + verify { callbackFunction.invoke(match { it.isFailure }) } + Assert.assertTrue(postedWorks.isEmpty()) + } + + @Test + fun `should notify callback about failure when starting Ably connection fails`() = runBlockingTest { + // given + ably.mockStartConnectionFailure() + ably.mockConnectSuccess(trackableId) + every { subscriberProperties.emitStateEventsIfRequired() } just Runs + + // when + val updatedProperties = + startConnectionWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertEquals(subscriberProperties, updatedProperties) + coVerify { ably.startConnection() } + coVerify(exactly = 0) { + ably.connect( + trackableId, + subscriberProperties.presenceData, + useRewind = true, + willSubscribe = true + ) + } + verify { callbackFunction.invoke(match { it.isFailure }) } + Assert.assertTrue(postedWorks.isEmpty()) + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt new file mode 100644 index 000000000..c580bd683 --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/StopConnectionWorkerTest.kt @@ -0,0 +1,63 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.Accuracy +import com.ably.tracking.Resolution +import com.ably.tracking.common.Ably +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.SubscriberInteractor +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import com.ably.tracking.test.common.mockCloseFailure +import com.ably.tracking.test.common.mockCloseSuccessWithDelay +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Assert +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class StopConnectionWorkerTest { + + private val ably: Ably = mockk() + private val subscriberInteractor: SubscriberInteractor = mockk { + every { notifyAssetIsOffline() } just runs + } + private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) + private val stopConnectionWorker = StopConnectionWorker(ably, subscriberInteractor, callbackFunction) + + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should call ably close and notify callback with success`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + ably.mockCloseSuccessWithDelay(10) + + // when + val updatedProperties = + stopConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) + + // then + Assert.assertTrue(updatedProperties.isStopped) + verify { callbackFunction.invoke(match { it.isSuccess }) } + verify { subscriberInteractor.notifyAssetIsOffline() } + } + + @Test + fun `should call ably close and notify callback with failure when it fails`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + ably.mockCloseFailure() + + // when + stopConnectionWorker.doWork(initialProperties, asyncWorks.appendWork(), postedWorks.appendSpecification()) + + // then + verify { callbackFunction.invoke(match { it.isFailure }) } + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt new file mode 100644 index 000000000..30bfc7f89 --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt @@ -0,0 +1,131 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.Accuracy +import com.ably.tracking.Resolution +import com.ably.tracking.common.Ably +import com.ably.tracking.common.ClientTypes +import com.ably.tracking.common.PresenceAction +import com.ably.tracking.common.PresenceData +import com.ably.tracking.common.PresenceMessage +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import com.ably.tracking.test.common.mockGetCurrentPresenceError +import com.ably.tracking.test.common.mockGetCurrentPresenceSuccess +import com.ably.tracking.test.common.mockSubscribeToPresenceError +import com.ably.tracking.test.common.mockSubscribeToPresenceSuccess +import io.mockk.CapturingSlot +import io.mockk.mockk +import io.mockk.slot +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Assert +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class SubscribeForPresenceMessagesWorkerTest { + + private val ably: Ably = mockk() + private val trackableId = "123123" + private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) + private val subscribeForPresenceMessagesWorker = + SubscribeForPresenceMessagesWorker(ably, trackableId, callbackFunction) + + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should post update presence work when presence listener is called`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + val presenceListenerSlot: CapturingSlot<(PresenceMessage) -> Unit> = slot() + val presenceMessage = createPresenceMessage() + ably.mockGetCurrentPresenceSuccess(trackableId) + ably.mockSubscribeToPresenceSuccess(trackableId, presenceListenerSlot) + + // when + subscribeForPresenceMessagesWorker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + presenceListenerSlot.captured.invoke(presenceMessage) + + // then + Assert.assertEquals(WorkerSpecification.UpdatePublisherPresence(presenceMessage), postedWorks[1]) + } + + private fun createPresenceMessage() = PresenceMessage( + action = PresenceAction.PRESENT_OR_ENTER, + data = PresenceData(ClientTypes.PUBLISHER, null, null), + memberKey = "", + ) + + @Test + fun `should post process initial presence messages work when both get current presence and subscribe to presence return success`() = + runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + val initialPresenceMessages = listOf(anyPresenceMessage()) + ably.mockGetCurrentPresenceSuccess(trackableId, initialPresenceMessages) + ably.mockSubscribeToPresenceSuccess(trackableId) + + // when + subscribeForPresenceMessagesWorker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertEquals( + WorkerSpecification.ProcessInitialPresenceMessages( + initialPresenceMessages, + callbackFunction + ), postedWorks[0] + ) + } + + @Test + fun `should post disconnect work when subscribe to presence returns failure`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + ably.mockGetCurrentPresenceSuccess(trackableId) + ably.mockSubscribeToPresenceError(trackableId) + + // when + subscribeForPresenceMessagesWorker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertTrue(postedWorks[0] is WorkerSpecification.Disconnect) + } + + @Test + fun `should post disconnect work when get current presence returns failure`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + ably.mockGetCurrentPresenceError(trackableId) + ably.mockSubscribeToPresenceSuccess(trackableId) + + // when + subscribeForPresenceMessagesWorker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertTrue(postedWorks[0] is WorkerSpecification.Disconnect) + } + + private fun anyPresenceMessage() = + PresenceMessage(PresenceAction.PRESENT_OR_ENTER, PresenceData(ClientTypes.PUBLISHER), "any-client-id") +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt new file mode 100644 index 000000000..7bc3e62b8 --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeToChannelWorkerTest.kt @@ -0,0 +1,53 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.Accuracy +import com.ably.tracking.Resolution +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.SubscriberInteractor +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class SubscribeToChannelWorkerTest { + + private val subscriberInteractor: SubscriberInteractor = mockk { + every { subscribeForChannelState() } just runs + every { subscribeForEnhancedEvents(any()) } just runs + every { subscribeForRawEvents(any()) } just runs + } + private val callbackFunction: ResultCallbackFunction = mockk(relaxed = true) + private val subscribeToChannelWorker = + SubscribeToChannelWorker(subscriberInteractor, callbackFunction) + + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should notify callback after calling subscriberInteractor`() = runBlockingTest { + // given + val initialProperties = SubscriberProperties(Resolution(Accuracy.BALANCED, 100, 100.0), mockk()) + + // when + subscribeToChannelWorker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + + // then + verify { + subscriberInteractor.subscribeForChannelState() + subscriberInteractor.subscribeForEnhancedEvents(initialProperties.presenceData) + subscriberInteractor.subscribeForRawEvents(initialProperties.presenceData) + callbackFunction.invoke(match { it.isSuccess }) + } + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt new file mode 100644 index 000000000..891e1884f --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateChannelConnectionStateWorkerTest.kt @@ -0,0 +1,45 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.common.ConnectionStateChange +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class UpdateChannelConnectionStateWorkerTest { + + private val subscriberProperties: SubscriberProperties = mockk() + private val channelConnectionStateChange: ConnectionStateChange = mockk() + private val updateChannelConnectionStateWorker = + UpdateChannelConnectionStateWorker(channelConnectionStateChange) + + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should call updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired`() = runBlockingTest { + // given + every { subscriberProperties.updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired(any()) } just Runs + + // when + updateChannelConnectionStateWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + + // then + verify { + subscriberProperties.updateForChannelConnectionStateChangeAndThenEmitStateEventsIfRequired( + channelConnectionStateChange + ) + } + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt new file mode 100644 index 000000000..0d776f93e --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdateConnectionStateWorkerTest.kt @@ -0,0 +1,45 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.common.ConnectionStateChange +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class UpdateConnectionStateWorkerTest { + + private val subscriberProperties: SubscriberProperties = mockk() + private val connectionStateChange: ConnectionStateChange = mockk() + private val updateConnectionStateWorker = + UpdateConnectionStateWorker(connectionStateChange) + + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should call updateForConnectionStateChangeAndThenEmitStateEventsIfRequired`() = runBlockingTest { + // given + every { subscriberProperties.updateForConnectionStateChangeAndThenEmitStateEventsIfRequired(any()) } just Runs + + // when + updateConnectionStateWorker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + + // then + verify { + subscriberProperties.updateForConnectionStateChangeAndThenEmitStateEventsIfRequired( + connectionStateChange + ) + } + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt new file mode 100644 index 000000000..f7263fffe --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorkerTest.kt @@ -0,0 +1,41 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.common.PresenceMessage +import com.ably.tracking.subscriber.SubscriberProperties +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.Runs +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class UpdatePublisherPresenceWorkerTest { + + private val subscriberProperties: SubscriberProperties = mockk() + private val presenceMessage: PresenceMessage = mockk() + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Test + fun `should call updateForPresenceMessagesAndThenEmitStateEventsIfRequired`() = runBlockingTest { + // given + val worker = UpdatePublisherPresenceWorker(presenceMessage) + every { subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(any()) } just Runs + + // when + worker.doWork( + subscriberProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + + // then + verify { + subscriberProperties.updateForPresenceMessagesAndThenEmitStateEventsIfRequired(listOf(presenceMessage)) + } + } +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt new file mode 100644 index 000000000..941b7162f --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/WorkerTestUtils.kt @@ -0,0 +1,17 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification + +fun MutableList Unit>.appendWork(): (suspend () -> Unit) -> Unit = + { asyncWork -> + add(asyncWork) + } + +suspend fun MutableList Unit>.executeAll() { + forEach { it.invoke() } +} + +internal fun MutableList.appendSpecification(): (WorkerSpecification) -> Unit = + { workSpecification -> + add(workSpecification) + } From 88067b84956be3fad019cd5763f3d7caacf2e6cd Mon Sep 17 00:00:00 2001 From: Quintin Willison Date: Wed, 21 Dec 2022 14:23:06 +0000 Subject: [PATCH 19/19] Fix formatting. --- .../workers/SubscribeForPresenceMessagesWorkerTest.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt index 30bfc7f89..804f9d0bb 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt @@ -84,7 +84,8 @@ internal class SubscribeForPresenceMessagesWorkerTest { WorkerSpecification.ProcessInitialPresenceMessages( initialPresenceMessages, callbackFunction - ), postedWorks[0] + ), + postedWorks[0], ) }