diff --git a/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClient.kt b/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClient.kt index 24a1902..5e96f1d 100644 --- a/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClient.kt +++ b/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClient.kt @@ -36,8 +36,8 @@ import java.util.logging.Logger */ class InMemoryUSubscriptionClient( private val transport: UTransport, - private val rpcClient: RpcClient, - private val notifier: Notifier, + private val rpcClient: RpcClient = InMemoryRpcClient(transport), + private val notifier: Notifier = SimpleNotifier(transport), dispatcher: CoroutineDispatcher = Dispatchers.IO ) : USubscriptionClient { // Map to store subscription change notification handlers @@ -47,14 +47,6 @@ class InMemoryUSubscriptionClient( private val mutex = Mutex() - /** - * Creates a new USubscription client passing [UTransport] - * used to provide additional options for the RPC requests to uSubscription service. - * - * @param transport the transport to use for sending the notifications - */ - constructor(transport: UTransport):this(transport, InMemoryRpcClient(transport), SimpleNotifier(transport)) - // transport Notification listener that will process subscription change notifications private val mNotificationListener = UListener { message -> handleNotifications(message) @@ -214,8 +206,17 @@ class InMemoryUSubscriptionClient( } return rpcClient.invokeMethod(REGISTER_NOTIFICATIONS_METHOD, UPayload.pack(request), options) .mapToMessage().mapCatching { response -> - mHandlers[topic]?.takeIf { it != handler}?.let { - throw UStatusException(UCode.ALREADY_EXISTS, "Handler already registered") + handler?.let { handler-> + mutex.withLock { + mHandlers[topic]?.let { + if (it != handler){ + throw UStatusException(UCode.ALREADY_EXISTS, "Handler already registered") + } + it + }?:run { + mHandlers[topic] = handler + } + } } response } @@ -226,15 +227,13 @@ class InMemoryUSubscriptionClient( * * @param topic The topic to unregister for notifications. * @param options The [CallOptions] to be used for the request. - * @param handler The [SubscriptionChangeHandler] to be unregistered. * @return [Result] with [NotificationsResponse] if uSubscription service accepts the * request to unregister the caller to be notified of subscription changes, or * [Result] with [UStatusException] that indicates the reason for the failure. */ override suspend fun unregisterForNotifications( topic: UUri, - options: CallOptions, - handler: SubscriptionChangeHandler? + options: CallOptions ): Result { val request = notificationsRequest { this.topic = topic @@ -304,6 +303,7 @@ class InMemoryUSubscriptionClient( * @param message The notification message from the USubscription service */ private fun handleNotifications(message: UMessage) { + println("msg") // Ignore messages that are not notifications message.takeIf { it.attributes.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION }?.let { msg -> // Unpack the notification message from uSubscription called Update diff --git a/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.kt b/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.kt index 67c549f..c5ce9dc 100644 --- a/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.kt +++ b/src/main/kotlin/org/eclipse/uprotocol/client/usubscription/v3/USubscriptionClient.kt @@ -107,15 +107,13 @@ interface USubscriptionClient { * * @param topic The topic to unregister for notifications. * @param options The [CallOptions] to be used for the request. - * @param handler The [SubscriptionChangeHandler] to be unregistered. * @return [Result] with [NotificationsResponse] if uSubscription service accepts the * request to unregister the caller to be notified of subscription changes, or * [Result] with [UStatusException] that indicates the reason for the failure. */ suspend fun unregisterForNotifications( topic: UUri, - options: CallOptions = CallOptions(), - handler: SubscriptionChangeHandler? = null + options: CallOptions = CallOptions() ): Result /** diff --git a/src/test/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClientTest.kt b/src/test/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClientTest.kt index 1602800..1dc9e15 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClientTest.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/client/usubscription/v3/InMemoryUSubscriptionClientTest.kt @@ -13,10 +13,7 @@ package org.eclipse.uprotocol.client.usubscription.v3 -import io.mockk.coEvery -import io.mockk.coVerify -import io.mockk.mockk -import io.mockk.slot +import io.mockk.* import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.UnconfinedTestDispatcher @@ -26,10 +23,11 @@ import org.eclipse.uprotocol.core.usubscription.v3.* import org.eclipse.uprotocol.transport.* import org.eclipse.uprotocol.v1.* import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import java.util.concurrent.CompletableFuture @ExperimentalCoroutinesApi class InMemoryUSubscriptionClientTest { @@ -82,7 +80,7 @@ class InMemoryUSubscriptionClientTest { @Test @DisplayName("Test subscribe happy path without handler") fun test_subscribe_happy_path_without_handler() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport = TestUTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) @@ -95,14 +93,14 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.subscribe(topic, {}) val payloadSlot = slot() coVerify { mockClient.invokeMethod(any(), capture(payloadSlot), CallOptions()) } - Assertions.assertEquals(1, transport.listeners.size) - Assertions.assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) + assertEquals(1, transport.listeners.size) + assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) } @Test @DisplayName("Test subscribe happy path with null handler") fun test_subscribe_happy_path_with_NULL_handler() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport = TestUTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) @@ -115,14 +113,14 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.subscribe(topic, {}, CallOptions(100), handler = null) val payloadSlot = slot() coVerify { mockClient.invokeMethod(any(), capture(payloadSlot), any()) } - Assertions.assertEquals(1, transport.listeners.size) - Assertions.assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) + assertEquals(1, transport.listeners.size) + assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) } @Test @DisplayName("Test subscribe happy path with handler") fun test_subscribe_happy_path_with_handler() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val handler = SubscriptionChangeHandler { _, _ -> } @@ -137,13 +135,13 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.subscribe(topic, {}, handler = handler) val payloadSlot = slot() coVerify { mockClient.invokeMethod(any(), capture(payloadSlot), CallOptions()) } - Assertions.assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) + assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) } @Test @DisplayName("Test subscribe but state is not subscribed") fun test_subscribe_state_not_subscribed() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport = TestUTransport() val handler = SubscriptionChangeHandler { _, _ -> } @@ -161,14 +159,14 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.subscribe(topic, {}, handler = handler) val payloadSlot = slot() coVerify { mockClient.invokeMethod(any(), capture(payloadSlot), CallOptions()) } - Assertions.assertEquals(0, transport.listeners.size) - Assertions.assertEquals(SubscriptionStatus.State.UNSUBSCRIBED, result.getOrNull()?.status?.state) + assertEquals(0, transport.listeners.size) + assertEquals(SubscriptionStatus.State.UNSUBSCRIBED, result.getOrNull()?.status?.state) } @Test @DisplayName("Test subscribe pending subscribe") fun test_subscribe_pending_subscribe() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport = TestUTransport() val handler = SubscriptionChangeHandler { _, _ -> } @@ -185,15 +183,15 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.subscribe(topic, {}, handler = handler) val payloadSlot = slot() coVerify { mockClient.invokeMethod(any(), capture(payloadSlot), CallOptions()) } - Assertions.assertEquals(1, transport.listeners.size) - Assertions.assertEquals(SubscriptionStatus.State.SUBSCRIBE_PENDING, result.getOrNull()?.status?.state) + assertEquals(1, transport.listeners.size) + assertEquals(SubscriptionStatus.State.SUBSCRIBE_PENDING, result.getOrNull()?.status?.state) } @Test @DisplayName("Test subscribe transport failure") fun test_subscribe_transport_failure() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val handler = SubscriptionChangeHandler { _, _ -> } @@ -206,19 +204,19 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.subscribe(topic, {}, handler = handler) val payloadSlot = slot() coVerify { mockClient.invokeMethod(any(), capture(payloadSlot), CallOptions()) } - Assertions.assertTrue(result.exceptionOrNull() is UStatusException) - Assertions.assertEquals(UCode.FAILED_PRECONDITION, (result.exceptionOrNull() as UStatusException).code) + assertTrue(result.exceptionOrNull() is UStatusException) + assertEquals(UCode.FAILED_PRECONDITION, (result.exceptionOrNull() as UStatusException).code) } @Test @DisplayName("Test subscribe, then send notification") fun test_subscribe_then_notification() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() var isHandlerCalled = false val handler = SubscriptionChangeHandler { uri, status -> - Assertions.assertEquals(topic, uri) - Assertions.assertEquals(SubscriptionStatus.State.SUBSCRIBED, status.state) + assertEquals(topic, uri) + assertEquals(SubscriptionStatus.State.SUBSCRIBED, status.state) isHandlerCalled = true } val notificationListenerSlot = slot() @@ -247,13 +245,13 @@ class InMemoryUSubscriptionClientTest { ) } notificationListenerSlot.captured.onReceive(testMessage) - Assertions.assertTrue(isHandlerCalled) + assertTrue(isHandlerCalled) } @Test @DisplayName("Test subscribe, then send notification but exception") fun test_subscribe_then_notification_but_exception() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val handler = SubscriptionChangeHandler { _, _ -> throw Exception("test Error") @@ -289,7 +287,7 @@ class InMemoryUSubscriptionClientTest { @Test @DisplayName("Test subscribe, then send empty notification") fun test_subscribe_then_empty_notification() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() var isHandlerCalled = false val handler = SubscriptionChangeHandler { _, _ -> @@ -325,7 +323,7 @@ class InMemoryUSubscriptionClientTest { @Test @DisplayName("Test subscribe, then send other messages not for notification") fun test_subscribe_then_not_notification() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() var isHandlerCalled = false val handler = SubscriptionChangeHandler { _, _ -> @@ -362,7 +360,7 @@ class InMemoryUSubscriptionClientTest { @Test @DisplayName("Test subscribe, then send notification but wrong topic") fun test_subscribe_then_notification_but_wrong_topic() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() var isHandlerCalled = false val handler = SubscriptionChangeHandler { _, _ -> @@ -400,7 +398,7 @@ class InMemoryUSubscriptionClientTest { @Test @DisplayName("Test subscribe twice with same handler") fun test_subscribe_twice_with_same_handler() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val handler = SubscriptionChangeHandler { _, _ -> } @@ -414,13 +412,13 @@ class InMemoryUSubscriptionClientTest { subscriber.subscribe(topic, {}, handler = handler) val result = subscriber.subscribe(topic, {}, handler = handler) - Assertions.assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) + assertEquals(SubscriptionStatus.State.SUBSCRIBED, result.getOrNull()?.status?.state) } @Test @DisplayName("Test subscribe twice with new handler") fun test_subscribe_twice_with_new_handler() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val handler1 = SubscriptionChangeHandler { _, _ -> } @@ -436,14 +434,14 @@ class InMemoryUSubscriptionClientTest { subscriber.subscribe(topic, {}, handler = handler1) val result = subscriber.subscribe(topic, {}, handler = handler2) - Assertions.assertTrue(result.exceptionOrNull() is UStatusException) - Assertions.assertEquals(UCode.ALREADY_EXISTS, (result.exceptionOrNull() as UStatusException).code) + assertTrue(result.exceptionOrNull() is UStatusException) + assertEquals(UCode.ALREADY_EXISTS, (result.exceptionOrNull() as UStatusException).code) } @Test @DisplayName("Test subscribe and then unsubscribe happy path") fun test_subscribe_and_then_unsubscribe_happy_path() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val listener = UListener { } val handler = SubscriptionChangeHandler { _, _ -> @@ -457,13 +455,13 @@ class InMemoryUSubscriptionClientTest { subscriber.subscribe(topic, listener, handler = handler) val result = subscriber.unsubscribe(topic, listener) - Assertions.assertEquals(UCode.OK, result.code) + assertEquals(UCode.OK, result.code) } @Test @DisplayName("Test subscribe and then unsubscribe happy path with handler") fun test_subscribe_and_then_unsubscribe_happy_path_with_handler() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val listener = UListener { } val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) @@ -475,13 +473,13 @@ class InMemoryUSubscriptionClientTest { subscriber.subscribe(topic, listener) val result = subscriber.unsubscribe(topic, listener) - Assertions.assertEquals(UCode.OK, result.code) + assertEquals(UCode.OK, result.code) } @Test @DisplayName("Test unsubscribe happy path") fun test_unsubscribe_happy_path() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( @@ -493,13 +491,13 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.unsubscribe(topic, { throw UnsupportedOperationException("Unimplemented method 'onReceive'") }) - Assertions.assertEquals(UCode.NOT_FOUND, result.code) + assertEquals(UCode.NOT_FOUND, result.code) } @Test @DisplayName("Test unsubscribe happy path with CallOption") fun test_unsubscribe_happy_path_with_callOption() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = TestUTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( @@ -511,13 +509,13 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.unsubscribe(topic, { throw UnsupportedOperationException("should not execute") }, CallOptions()) - Assertions.assertEquals(UCode.NOT_FOUND, result.code) + assertEquals(UCode.NOT_FOUND, result.code) } @Test @DisplayName("Test unregisterListener after we successfully subscribed to a topic") fun testUnregisterListener() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val myListener = UListener { } val transport: UTransport = TestUTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) @@ -530,13 +528,13 @@ class InMemoryUSubscriptionClientTest { subscriber.subscribe(topic, myListener, CallOptions(100)) val result = subscriber.unregisterListener(topic, myListener) - Assertions.assertEquals(UCode.OK, result.code) + assertEquals(UCode.OK, result.code) } @Test @DisplayName("Test unsubscribe with UStatusException") fun testUnsubscribeWithUStatusException() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = CommStatusTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) @@ -546,14 +544,14 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.unsubscribe(topic, {}) - Assertions.assertEquals(UCode.FAILED_PRECONDITION, result.code) - Assertions.assertEquals("CommStatus Error", result.message) + assertEquals(UCode.FAILED_PRECONDITION, result.code) + assertEquals("CommStatus Error", result.message) } @Test @DisplayName("Test unsubscribe with OtherException") fun testUnsubscribeWithOtherException() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = CommStatusTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) @@ -563,14 +561,14 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.unsubscribe(topic, {}) - Assertions.assertEquals(UCode.INVALID_ARGUMENT, result.code) - Assertions.assertEquals("Some Error", result.message) + assertEquals(UCode.INVALID_ARGUMENT, result.code) + assertEquals("Some Error", result.message) } @Test @DisplayName("Test unsubscribe with OtherException No Message") fun testUnsubscribeWithOtherExceptionNoMessage() = testScope.runTest { - val topic = createTopic() + val topic = testTopic val transport: UTransport = CommStatusTransport() val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) @@ -580,16 +578,336 @@ class InMemoryUSubscriptionClientTest { val result = subscriber.unsubscribe(topic, {}) - Assertions.assertEquals(UCode.INVALID_ARGUMENT, result.code) - Assertions.assertEquals("Invalid argument", result.message) + assertEquals(UCode.INVALID_ARGUMENT, result.code) + assertEquals("Invalid argument", result.message) } - private fun createTopic(): UUri { - return uUri { - authorityName = "hartley" - ueId = 3 - ueVersionMajor = 1 - resourceId = 0x8000 + @Test + @DisplayName("Test register for Notifications without Handler") + fun testRegisterForNotificationsWithoutHandler() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val response = notificationsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + val result = subscriber.registerForNotifications(topic) + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(6, uriSlot.captured.resourceId) + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test register for Notifications Exception") + fun testRegisterForNotificationsException() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + val exception = IllegalStateException("Some Error") + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.failure(exception) + val result = subscriber.registerForNotifications(topic) + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(6, uriSlot.captured.resourceId) + assertTrue(result.isFailure) + assertEquals(exception, result.exceptionOrNull()) + } + + @Test + @DisplayName("Test register for Notifications with CallOption") + fun testRegisterForNotificationsWithCallOption() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val response = notificationsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + val callOptions = CallOptions(100, token = "testToken") + val result = subscriber.registerForNotifications(topic, callOptions) + coVerify { mockClient.invokeMethod(any(), any(), callOptions) } + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test register for Notifications with Handler") + fun testRegisterForNotificationsWithHandler() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + + val response = notificationsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + val notificationListenerSlot = slot() + coEvery { + mockNotifier.registerNotificationListener( + any(), + capture(notificationListenerSlot) + ) + } returns OK_STATUS + var counter = 0 + val handler = SubscriptionChangeHandler { t, status -> + counter += 1 + assertEquals(topic, t) + assertEquals(SubscriptionStatus.State.SUBSCRIBED, status.state) + } + + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.registerForNotifications(topic, handler = handler) + coVerify { mockClient.invokeMethod(any(), any(), any()) } + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + notificationListenerSlot.captured.onReceive(uMessage { + forNotification(testTopic, testSink) + setPayload( + UPayload.pack(update { + this.topic = testTopic + status = subscriptionStatus { state = SubscriptionStatus.State.SUBSCRIBED } + }) + ) + }) + assertEquals(1, counter) + } + + @Test + @DisplayName("Test register for Notifications twice with diff Handler") + fun testRegisterForNotificationsTwiceWithDiffHandler() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + val response = notificationsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + val handler1 = SubscriptionChangeHandler { _, _ -> } + + val handler2 = SubscriptionChangeHandler { _, _ -> + } + + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result1 = subscriber.registerForNotifications(topic, handler = handler1) + val result2 = subscriber.registerForNotifications(topic, handler = handler2) + coVerify(exactly = 2) { mockClient.invokeMethod(any(), any(), any()) } + assertTrue(result1.isSuccess) + assertTrue(result2.isFailure) + assertEquals(response, result1.getOrNull()) + assertEquals(UCode.ALREADY_EXISTS, (result2.exceptionOrNull() as UStatusException).code) + assertEquals("Handler already registered", (result2.exceptionOrNull() as UStatusException).message) + } + + @Test + @DisplayName("Test register for Notifications twice with same Handler") + fun testRegisterForNotificationsTwiceWithSameHandler() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + val response = notificationsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + val handler = SubscriptionChangeHandler { _, _ -> + } + + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result1 = subscriber.registerForNotifications(topic, handler = handler) + val result2 = subscriber.registerForNotifications(topic, handler = handler) + coVerify(exactly = 2) { mockClient.invokeMethod(any(), any(), any()) } + assertTrue(result1.isSuccess) + assertTrue(result2.isSuccess) + assertEquals(response, result1.getOrNull()) + assertEquals(response, result2.getOrNull()) + } + + @Test + @DisplayName("Test unregister for Notifications") + fun testUnregisterForNotifications() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + + val response = notificationsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.unregisterForNotifications(topic) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(7, uriSlot.captured.resourceId) + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test unregister for Notifications Exception") + fun testUnregisterForNotificationsException() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + val exception = IllegalStateException("Some Error") + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.failure(exception) + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.unregisterForNotifications(topic) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(7, uriSlot.captured.resourceId) + assertTrue(result.isFailure) + assertEquals(exception, result.exceptionOrNull()) + } + + @Test + @DisplayName("Test unregister for Notifications with CallOption") + fun testUnregisterForNotificationsWithCallOption() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + + val response = notificationsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + + val callOptions = CallOptions(100, token = "testToken") + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.unregisterForNotifications(topic, callOptions) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), callOptions) } + assertEquals(7, uriSlot.captured.resourceId) + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test FetchSubscribers") + fun testFetchSubscribers() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + + val response = fetchSubscribersResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.fetchSubscribers(topic) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(8, uriSlot.captured.resourceId) + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test FetchSubscribers Exception") + fun testFetchSubscribersException() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + val exception = IllegalStateException("Some Error") + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.failure( + exception + ) + + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.fetchSubscribers(topic) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(8, uriSlot.captured.resourceId) + assertTrue(result.isFailure) + assertEquals(exception, result.exceptionOrNull()) + } + + @Test + @DisplayName("Test FetchSubscribers with CallOption") + fun testFetchSubscribersWithCallOption() = testScope.runTest { + val topic = testTopic + val transport = TestUTransport() + + val response = fetchSubscribersResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + + val callOptions = CallOptions(100, token = "testToken") + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.fetchSubscribers(topic, callOptions) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), callOptions) } + assertEquals(8, uriSlot.captured.resourceId) + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test FetchSubscriptions") + fun testFetchSubscriptions() = testScope.runTest { + val transport = TestUTransport() + + val response = fetchSubscriptionsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.fetchSubscriptions(fetchSubscriptionsRequest { }) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(3, uriSlot.captured.resourceId) + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test FetchSubscriptions with CallOption") + fun testFetchSubscriptionsWithCallOption() = testScope.runTest { + val transport = TestUTransport() + + val response = fetchSubscriptionsResponse { } + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.success( + UPayload.pack(response) + ) + + val callOptions = CallOptions(100, token = "testToken") + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.fetchSubscriptions(fetchSubscriptionsRequest { }, callOptions) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), callOptions) } + assertEquals(3, uriSlot.captured.resourceId) + assertTrue(result.isSuccess) + assertEquals(response, result.getOrNull()) + } + + @Test + @DisplayName("Test FetchSubscriptions Exception") + fun testFetchSubscriptionsException() = testScope.runTest { + val transport = TestUTransport() + val exception = IllegalStateException("Some Error") + coEvery { mockClient.invokeMethod(any(), any(), any()) } returns Result.failure( + exception + ) + + val subscriber: USubscriptionClient = InMemoryUSubscriptionClient(transport, mockClient, mockNotifier) + val result = subscriber.fetchSubscriptions(fetchSubscriptionsRequest { }) + + val uriSlot = slot() + coVerify { mockClient.invokeMethod(capture(uriSlot), any(), any()) } + assertEquals(3, uriSlot.captured.resourceId) + assertTrue(result.isFailure) + assertEquals(exception, result.exceptionOrNull()) + } + + private val testTopic = uUri { + authorityName = "hartley" + ueId = 3 + ueVersionMajor = 1 + resourceId = 0x8000 } } \ No newline at end of file diff --git a/src/test/kotlin/org/eclipse/uprotocol/client/utwin/v2/SimpleUTwinClientTest.kt b/src/test/kotlin/org/eclipse/uprotocol/client/utwin/v2/SimpleUTwinClientTest.kt index dafd3bc..aead4a6 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/client/utwin/v2/SimpleUTwinClientTest.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/client/utwin/v2/SimpleUTwinClientTest.kt @@ -12,14 +12,18 @@ */ package org.eclipse.uprotocol.client.utwin.v2 -import io.mockk.coEvery -import io.mockk.mockk -import io.mockk.unmockkAll +import io.mockk.* import kotlinx.coroutines.test.runTest +import org.eclipse.uprotocol.communication.CallOptions import org.eclipse.uprotocol.communication.RpcClient import org.eclipse.uprotocol.communication.UPayload import org.eclipse.uprotocol.communication.UStatusException +import org.eclipse.uprotocol.core.udiscovery.v3.addNodesResponse +import org.eclipse.uprotocol.core.utwin.v2.GetLastMessagesRequest +import org.eclipse.uprotocol.core.utwin.v2.GetLastMessagesResponse import org.eclipse.uprotocol.core.utwin.v2.getLastMessagesRequest +import org.eclipse.uprotocol.core.utwin.v2.getLastMessagesResponse +import org.eclipse.uprotocol.core.utwin.v2.messageResponse import org.eclipse.uprotocol.transport.UTransport import org.eclipse.uprotocol.v1.* import org.junit.jupiter.api.AfterEach @@ -34,7 +38,6 @@ import kotlin.test.assertTrue * This is the test code for said implementation. */ class SimpleUTwinClientTest { - private val transport: UTransport = mockk(relaxed = true) private val rpcClient: RpcClient = mockk(relaxed = true) private val topic: UUri = uUri { @@ -58,12 +61,57 @@ class SimpleUTwinClientTest { @DisplayName("Test calling getLastMessages() with valid topics") fun testGetLastMessages() = runTest { val topics = UUriBatch.newBuilder().addUris(topic).build() - coEvery { rpcClient.invokeMethod(any(), any(), any()) } returns Result.success(UPayload.pack( - getLastMessagesRequest { } + val optionSlot = slot() + coEvery { rpcClient.invokeMethod(any(), any(), capture(optionSlot)) } returns Result.success(UPayload.pack( + getLastMessagesResponse { } )) val client = SimpleUTwinClient(rpcClient) val response = client.getLastMessages(topics) assertTrue(response.isSuccess) + assertEquals("", optionSlot.captured.token) + assertEquals(CallOptions.TIMEOUT_DEFAULT, optionSlot.captured.timeout) + assertEquals(UPriority.UPRIORITY_CS4, optionSlot.captured.priority) + } + + @Test + @DisplayName("Test calling getLastMessages() with valid topics and Data in response") + fun testGetLastMessagesWithDataInResponse() = runTest { + val topics = UUriBatch.newBuilder().addUris(topic).build() + val optionSlot = slot() + coEvery { rpcClient.invokeMethod(any(), any(), capture(optionSlot)) } returns Result.success(UPayload.pack( + getLastMessagesResponse { + this.responses.add(messageResponse { + this.topic = uUri { } + }) + } + )) + val client = SimpleUTwinClient(rpcClient) + val response = client.getLastMessages(topics) + assertTrue(response.isSuccess) + assertEquals("", optionSlot.captured.token) + assertEquals(CallOptions.TIMEOUT_DEFAULT, optionSlot.captured.timeout) + assertEquals(UPriority.UPRIORITY_CS4, optionSlot.captured.priority) + } + + @Test + @DisplayName("Test calling getLastMessages() with valid topics and CallOptions") + fun testGetLastMessagesWithCallOptions() = runTest { + val topics = UUriBatch.newBuilder().addUris(topic).build() + val options = CallOptions( + timeout = 1000, + priority = UPriority.UPRIORITY_CS2, + token = "testToken" + ) + val optionSlot = slot() + coEvery { rpcClient.invokeMethod(any(), any(), capture(optionSlot)) } returns Result.success(UPayload.pack( + getLastMessagesResponse { } + )) + val client = SimpleUTwinClient(rpcClient) + val response = client.getLastMessages(topics, options) + assertTrue(response.isSuccess) + assertEquals("testToken", optionSlot.captured.token) + assertEquals(1000, optionSlot.captured.timeout) + assertEquals(UPriority.UPRIORITY_CS2, optionSlot.captured.priority) } diff --git a/src/test/kotlin/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.kt b/src/test/kotlin/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.kt index add6f2f..335d963 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.kt @@ -296,7 +296,13 @@ class InMemoryRpcServerTest { server.registerRequestHandler(method, handler) - transport.send(uMessage { forNotification(transport.getSource(), method) }) + transport.send(uMessage { + forNotification(transport.getSource().copy { + resourceId = 0x8000 + }, createMethodUri().copy { + resourceId = 0 + }) + }) assertTrue(transport.receivedResponse.isEmpty()) } diff --git a/src/test/kotlin/org/eclipse/uprotocol/communication/TestUTransport.kt b/src/test/kotlin/org/eclipse/uprotocol/communication/TestUTransport.kt index 622f830..df5c128 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/communication/TestUTransport.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/communication/TestUTransport.kt @@ -78,6 +78,7 @@ open class TestUTransport( val validator = message.attributes.getValidator() if (validator.validate(message.attributes).isFailure()) { + println("flag1") return uStatus { code = UCode.INVALID_ARGUMENT this.message = "Invalid message attributes" @@ -94,6 +95,7 @@ open class TestUTransport( if (message.attributes.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION) { sendJob = scope.launch { + println("flag2") listeners.forEach { listener -> listener.onReceive(message) } } } diff --git a/src/test/kotlin/org/eclipse/uprotocol/communication/UClientTest.kt b/src/test/kotlin/org/eclipse/uprotocol/communication/UClientTest.kt index 624462a..1ce67f4 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/communication/UClientTest.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/communication/UClientTest.kt @@ -69,8 +69,6 @@ class UClientTest { val listener = UListener { assertNotNull(it) } - val subscriptionChangeHandler = SubscriptionChangeHandler { _, _ -> - } client.notify(createTopic(), createDestinationUri()) client.publish(createTopic()) diff --git a/src/test/kotlin/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.kt b/src/test/kotlin/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.kt index 74d53a8..2960817 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.kt @@ -17,6 +17,7 @@ import io.mockk.every import io.mockk.mockk import org.eclipse.uprotocol.transport.* import org.eclipse.uprotocol.transport.validator.UAttributesValidator.Companion.getValidator +import org.eclipse.uprotocol.uuid.factory.UUIDV7 import org.eclipse.uprotocol.v1.* import org.eclipse.uprotocol.validation.ValidationResult import org.junit.jupiter.api.Assertions.* @@ -179,12 +180,17 @@ internal class UAttributesValidatorTest { @Test @DisplayName("Test validation of request message has an invalid sink attribute") fun testUAttributeValidatorRequestMissingSink() { - val message: UMessage = uMessage { - forRequest(defaultUUri, defaultUUri, 1000) + val attributes = uAttributes { + source = defaultUUri + sink = defaultUUri + ttl = 1000 + id = UUIDV7() + type = UMessageType.UMESSAGE_TYPE_REQUEST + priority = UPriority.UPRIORITY_CS4 } - val validator: UAttributesValidator = message.attributes.getValidator() - val result = validator.validate(message.attributes) + val validator: UAttributesValidator = attributes.getValidator() + val result = validator.validate(attributes) assertTrue(result.isFailure()) assertEquals(validator.toString(), "UAttributesValidator.Request") assertEquals(result.getMessage(), "Invalid Sink Uri") @@ -314,11 +320,16 @@ internal class UAttributesValidatorTest { @Test @DisplayName("Test notification validation where the sink is NOT the defaultResourceId") fun testUAttributeValidatorNotificationDefaultResourceId() { - val message: UMessage = uMessage { - forNotification(topicUUri, topicUUri) + val attributes = uAttributes { + source = topicUUri + id = UUIDV7() + sink = methodUUri + type = UMessageType.UMESSAGE_TYPE_NOTIFICATION + priority = UPriority.UPRIORITY_CS1 + } - val validator: UAttributesValidator = message.attributes.getValidator() - val result = validator.validate(message.attributes) + val validator: UAttributesValidator = attributes.getValidator() + val result = validator.validate(attributes) assertTrue(result.isFailure()) assertEquals(validator.toString(), "UAttributesValidator.Notification") @@ -404,11 +415,16 @@ internal class UAttributesValidatorTest { @Test @DisplayName("Test validateTtl of a request message where ttl is less than 0") fun testUAttributeValidatorValidateTtlLessThanZero() { - val message: UMessage = uMessage { - forRequest(defaultUUri, methodUUri, -1) + val attributes = uAttributes { + source = defaultUUri + sink = methodUUri + ttl = -1 + id = UUIDV7() + type = UMessageType.UMESSAGE_TYPE_REQUEST + priority = UPriority.UPRIORITY_CS4 } - val validator: UAttributesValidator = message.attributes.getValidator() - val result = validator.validate(message.attributes) + val validator: UAttributesValidator = attributes.getValidator() + val result = validator.validate(attributes) assertTrue(result.isFailure()) assertEquals(validator.toString(), "UAttributesValidator.Request") @@ -475,14 +491,17 @@ internal class UAttributesValidatorTest { @Test @DisplayName("Test validateSink for a response message where the sink is NOT the defaultResourceId") fun testUAttributeValidatorValidateSinkResponseDefaultResourceId() { - val request: UMessage = uMessage { - forRequest(methodUUri, defaultUUri, 1000) - } - val response: UMessage = uMessage { - forResponse(request.attributes) + val attributes = uAttributes { + source = methodUUri + sink = methodUUri + ttl = 1000 + id = UUIDV7() + type = UMessageType.UMESSAGE_TYPE_RESPONSE + reqid = UUIDV7() + priority = UPriority.UPRIORITY_CS4 } - val validator: UAttributesValidator = response.attributes.getValidator() - val result = validator.validate(response.attributes) + val validator: UAttributesValidator = attributes.getValidator() + val result = validator.validate(attributes) assertTrue(result.isFailure()) assertEquals(validator.toString(), "UAttributesValidator.Response") diff --git a/src/test/kotlin/org/eclipse/uprotocol/uri/validator/UriFilterTest.kt b/src/test/kotlin/org/eclipse/uprotocol/uri/validator/UriFilterTest.kt index b0d44d3..be52299 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/uri/validator/UriFilterTest.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/uri/validator/UriFilterTest.kt @@ -17,8 +17,7 @@ import org.eclipse.uprotocol.uri.factory.UUriFactory import org.eclipse.uprotocol.v1.UUri import org.eclipse.uprotocol.v1.uAttributes import org.eclipse.uprotocol.v1.uUri -import org.junit.jupiter.api.Assertions.assertFalse -import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -136,6 +135,26 @@ class UriFilterTest { })) } + @Test + @DisplayName("Test UriFilter constructor") + fun testUriFilterConstructor() { + val uriFilter1 = UriFilter() + assertEquals(UUriFactory.ANY, uriFilter1.source) + assertEquals(UUriFactory.ANY, uriFilter1.sink) + + val uriFilter2 = UriFilter(SOURCE_URI) + assertEquals(SOURCE_URI, uriFilter2.source) + assertEquals(UUriFactory.ANY, uriFilter2.sink) + + val uriFilter3 = UriFilter(sink = SINK_URI) + assertEquals(UUriFactory.ANY, uriFilter3.source) + assertEquals(SINK_URI, uriFilter3.sink) + + val uriFilter4 = UriFilter(SOURCE_URI, SINK_URI) + assertEquals(SOURCE_URI, uriFilter4.source) + assertEquals(SINK_URI, uriFilter4.sink) + } + companion object { private val SOURCE_URI: UUri = UUri.newBuilder().setAuthorityName("source").build() private val SINK_URI: UUri = UUri.newBuilder().setAuthorityName("sink").build()