diff --git a/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcClient.kt b/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcClient.kt index 6dc25b6..cde5529 100644 --- a/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcClient.kt +++ b/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcClient.kt @@ -26,7 +26,6 @@ package org.eclipse.uprotocol.rpc import kotlinx.coroutines.flow.Flow import org.eclipse.uprotocol.v1.UPayload -import org.eclipse.uprotocol.v1.UAttributes import org.eclipse.uprotocol.v1.UMessage import org.eclipse.uprotocol.v1.UUri diff --git a/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcMapper.kt b/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcMapper.kt index 9c48d23..f5e8008 100644 --- a/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcMapper.kt +++ b/src/main/kotlin/org/eclipse/uprotocol/rpc/RpcMapper.kt @@ -30,8 +30,8 @@ import com.google.protobuf.Message import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.map +import org.eclipse.uprotocol.v1.UCode import org.eclipse.uprotocol.v1.UMessage -import org.eclipse.uprotocol.v1.UPayload import org.eclipse.uprotocol.v1.UStatus import java.util.concurrent.CompletionException @@ -45,30 +45,39 @@ inline fun Flow.toResponse(): Flow { return catch { exception -> throw CompletionException(exception.message, exception) }.map { message -> - if (!message.hasPayload()){ + if (!message.hasPayload()) { throw RuntimeException("Server returned a null payload. Expected [${T::class.java.name}]") } - val any = Any.parseFrom(message.payload.value) - if (any.`is`(T::class.java)) { - unpackPayload(any) - } else { - throw RuntimeException("Unknown payload type [${any.typeUrl}]. Expected [${T::class.java.name}]") + try { + val any = Any.parseFrom(message.payload.value) + if (any.`is`(T::class.java)) { + any.unpack(T::class.java) + } else { + throw RuntimeException("Unknown payload type [${any.typeUrl}]. Expected [${T::class.java.name}]") + } + } catch (e: InvalidProtocolBufferException) { + throw RuntimeException("${e.message} [${UStatus::class.java.name}]", e) } } } /** - * Inline function to unpack a payload of type [Any] into an object of type T, which is what was - * packing into the [Any] object. - * @param payload an [Any] message containing a type of expectedClazz. - * @return Returns an object of type T and of the class name specified, that was packed into the [Any] object. - * @param The message type of the object packed into the [Any]. - * */ -@PublishedApi -internal inline fun unpackPayload(payload: Any): T { - return try { - payload.unpack(T::class.java) - } catch (e: InvalidProtocolBufferException) { - throw RuntimeException("${e.message} [${UStatus::class.java.name}]", e) + * Map a response of Flow<Any> from Link into a Flow containing a Result containing + * the declared expected return type T. + * @return Returns a Flow containing an Result containing the declared expected return type T, if T is UStatus + * and has code not equals to OK, failure Result will be emitted. + * @param The declared expected return type of the RPC method. + */ +inline fun Flow.toResult(): Flow> { + return toResponse().map { response -> + response.runCatching { + if (this is UStatus && code != UCode.OK) { + throw IllegalStateException("${message}, UStatus: $code") + } else { + this + } + } + }.catch { + emit(Result.failure(it)) } } diff --git a/src/test/kotlin/org/eclipse/uprotocol/rpc/RpcTest.kt b/src/test/kotlin/org/eclipse/uprotocol/rpc/RpcTest.kt index cad53f7..7508fee 100644 --- a/src/test/kotlin/org/eclipse/uprotocol/rpc/RpcTest.kt +++ b/src/test/kotlin/org/eclipse/uprotocol/rpc/RpcTest.kt @@ -23,15 +23,13 @@ package org.eclipse.uprotocol.rpc import com.google.protobuf.Any import com.google.protobuf.Int32Value -import com.google.protobuf.InvalidProtocolBufferException import com.google.protobuf.kotlin.toByteString import io.cloudevents.v1.proto.CloudEvent import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.runTest import org.eclipse.uprotocol.uri.serializer.LongUriSerializer import org.eclipse.uprotocol.v1.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import java.util.concurrent.CompletionException @@ -51,7 +49,7 @@ internal class RpcTest { } private var happyPath: RpcClient = object : RpcClient { override fun invokeMethod(methodUri: UUri, requestPayload: UPayload, options: CallOptions): Flow { - return flowOf(buildUMessage()) + return flowOf(testUMessage) } } private var withStatusCodeInsteadOfHappyPath: RpcClient = object : RpcClient { @@ -94,7 +92,7 @@ internal class RpcTest { }) } } - private var thatCompletesWithAnException: RpcClient = object : RpcClient { + private var thatFlowWithAnException: RpcClient = object : RpcClient { override fun invokeMethod(methodUri: UUri, requestPayload: UPayload, options: CallOptions): Flow { return flow { throw RuntimeException("Boom") @@ -124,10 +122,10 @@ internal class RpcTest { fun test_compose_happy_path() = runTest { val payload: UPayload = buildUPayload() returnsNumber3.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResponse().map { - Int32Value.of(it.value + 5) - }.first().run { - assertEquals(Int32Value.of(8), this) - } + Int32Value.of(it.value + 5) + }.first().run { + assertEquals(Int32Value.of(8), this) + } } @Test @@ -150,8 +148,8 @@ internal class RpcTest { fun test_success_invoke_method_happy_flow_using_toResponse() = runTest { val payload: UPayload = buildUPayload() happyPath.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResponse().first().run { - assertEquals(buildCloudEvent(), this) - } + assertEquals(buildCloudEvent(), this) + } } @Test @@ -176,7 +174,7 @@ internal class RpcTest { fun test_fail_invoke_method_when_invoke_method_threw_an_exception_using_toResponse() = runTest { val payload: UPayload = buildUPayload() try { - thatCompletesWithAnException.invokeMethod(buildTopic(), payload, buildUCallOptions()) + thatFlowWithAnException.invokeMethod(buildTopic(), payload, buildUCallOptions()) .toResponse().first() fail("should not reach here") } catch (e: Exception) { @@ -228,11 +226,11 @@ internal class RpcTest { .first() fail("should not reach here") } catch (e: Exception) { - assertThrows(InvalidProtocolBufferException::class.java) { + assertThrows(RuntimeException::class.java) { throw e } assertEquals( - "Protocol message contained an invalid tag (zero).", + "Protocol message contained an invalid tag (zero). [org.eclipse.uprotocol.v1.UStatus]", e.message, ) } @@ -242,7 +240,8 @@ internal class RpcTest { fun test_no_payload_in_umessage() = runTest { val payload: UPayload = buildUPayload() try { - thatReturnsUMessageWithoutPayload.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResponse() + thatReturnsUMessageWithoutPayload.invokeMethod(buildTopic(), payload, buildUCallOptions()) + .toResponse() .first() fail("should not reach here") } catch (e: Exception) { @@ -256,6 +255,99 @@ internal class RpcTest { } } + + @Test + fun test_toResult_nonUStatus_happy_path() = runTest { + val payload = buildUPayload() + returnsNumber3.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResult().first().run { + assertTrue(isSuccess) + assertEquals(Int32Value.of(3), getOrNull()) + } + } + + @Test + fun test_toResult_uStatus_happy_path() = runTest { + val payload = buildUPayload() + withStatusCodeHappyPath.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResult().first() + .run { + assertTrue(isSuccess) + assertEquals(UCode.OK, getOrNull()?.code) + assertEquals("all good", getOrNull()?.message) + + } + } + + @Test + fun test_toResult_uStatus_not_ok() = runTest { + val payload = buildUPayload() + withStatusCodeInsteadOfHappyPath.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResult() + .first().run { + assertTrue(isFailure) + assertTrue(exceptionOrNull() is IllegalStateException) + assertEquals("boom, UStatus: INVALID_ARGUMENT", exceptionOrNull()?.message) + } + } + + @Test + fun test_toResult_no_payload_in_umessage() = runTest { + val payload: UPayload = buildUPayload() + thatReturnsUMessageWithoutPayload.invokeMethod(buildTopic(), payload, buildUCallOptions()) + .toResult() + .first().run { + assertTrue(isFailure) + assertTrue(exceptionOrNull() is RuntimeException) + assertEquals( + "Server returned a null payload. Expected [io.cloudevents.v1.proto.CloudEvent]", + exceptionOrNull()?.message + ) + } + } + + + @Test + fun test_fail_invoke_method_when_invoke_method_threw_an_exception_using_toResult() = runTest { + val payload: UPayload = buildUPayload() + thatFlowWithAnException.invokeMethod(buildTopic(), payload, buildUCallOptions()) + .toResult().first().run { + assertTrue(isFailure) + assertTrue(exceptionOrNull() is CompletionException) + assertEquals( + "Boom", + exceptionOrNull()?.message + ) + } + } + + @Test + fun test_fail_invoke_method_when_invoke_method_returns_a_status_using_toResult() = runTest { + val payload: UPayload = buildUPayload() + withStatusCodeInsteadOfHappyPath.invokeMethod(buildTopic(), payload, buildUCallOptions()) + .toResult().first().run { + assertTrue(isFailure) + assertTrue(exceptionOrNull() is RuntimeException) + assertEquals( + "Unknown payload type [type.googleapis.com/uprotocol.v1.UStatus]. Expected [io.cloudevents.v1.proto.CloudEvent]", + exceptionOrNull()?.message + ) + } + } + + @Test + @DisplayName("test toResult with invalid payload that is not of type any") + fun test_invalid_payload_that_is_not_type_any_toResult() = runTest { + val payload: UPayload = buildUPayload() + thatBarfsCrapyPayload.invokeMethod(buildTopic(), payload, buildUCallOptions()) + .toResult().first().run { + assertTrue(isFailure) + assertTrue(exceptionOrNull() is RuntimeException) + assertEquals( + "Protocol message contained an invalid tag (zero). [org.eclipse.uprotocol.v1.UStatus]", + exceptionOrNull()?.message + ) + } + + } + private fun buildCloudEvent(): CloudEvent { return CloudEvent.newBuilder().setSpecVersion("1.0").setId("HARTLEY IS THE BEST") .setSource("https://example.com").build() @@ -269,10 +361,8 @@ internal class RpcTest { } } - private fun buildUMessage(): UMessage { - return uMessage { - payload = buildUPayload() - } + private val testUMessage = uMessage { + payload = buildUPayload() } private fun buildTopic(): UUri {