Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into uprotocol1.5.6_bring_up
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/kotlin/org/eclipse/uprotocol/rpc/RpcClient.kt
#	src/main/kotlin/org/eclipse/uprotocol/rpc/RpcMapper.kt
#	src/test/kotlin/org/eclipse/uprotocol/rpc/RpcTest.kt
  • Loading branch information
dddj698 committed Mar 1, 2024
2 parents f715756 + f9a2cff commit b2e1c77
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 39 deletions.
1 change: 0 additions & 1 deletion src/main/kotlin/org/eclipse/uprotocol/rpc/RpcClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package org.eclipse.uprotocol.rpc

import kotlinx.coroutines.flow.Flow
import org.eclipse.uprotocol.v1.UPayload
import org.eclipse.uprotocol.v1.UAttributes
import org.eclipse.uprotocol.v1.UMessage
import org.eclipse.uprotocol.v1.UUri

Expand Down
47 changes: 28 additions & 19 deletions src/main/kotlin/org/eclipse/uprotocol/rpc/RpcMapper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import com.google.protobuf.Message
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import org.eclipse.uprotocol.v1.UCode
import org.eclipse.uprotocol.v1.UMessage
import org.eclipse.uprotocol.v1.UPayload
import org.eclipse.uprotocol.v1.UStatus
import java.util.concurrent.CompletionException

Expand All @@ -45,30 +45,39 @@ inline fun <reified T : Message> Flow<UMessage>.toResponse(): Flow<T> {
return catch { exception ->
throw CompletionException(exception.message, exception)
}.map { message ->
if (!message.hasPayload()){
if (!message.hasPayload()) {
throw RuntimeException("Server returned a null payload. Expected [${T::class.java.name}]")
}
val any = Any.parseFrom(message.payload.value)
if (any.`is`(T::class.java)) {
unpackPayload(any)
} else {
throw RuntimeException("Unknown payload type [${any.typeUrl}]. Expected [${T::class.java.name}]")
try {
val any = Any.parseFrom(message.payload.value)
if (any.`is`(T::class.java)) {
any.unpack(T::class.java)
} else {
throw RuntimeException("Unknown payload type [${any.typeUrl}]. Expected [${T::class.java.name}]")
}
} catch (e: InvalidProtocolBufferException) {
throw RuntimeException("${e.message} [${UStatus::class.java.name}]", e)
}
}
}

/**
* Inline function to unpack a payload of type [Any] into an object of type T, which is what was
* packing into the [Any] object.
* @param payload an [Any] message containing a type of expectedClazz.
* @return Returns an object of type T and of the class name specified, that was packed into the [Any] object.
* @param <T> The message type of the object packed into the [Any].
* </T> */
@PublishedApi
internal inline fun <reified T : Message> unpackPayload(payload: Any): T {
return try {
payload.unpack(T::class.java)
} catch (e: InvalidProtocolBufferException) {
throw RuntimeException("${e.message} [${UStatus::class.java.name}]", e)
* Map a response of Flow&lt;Any&gt; from Link into a Flow containing a Result containing
* the declared expected return type T.
* @return Returns a Flow containing an Result containing the declared expected return type T, if T is UStatus
* and has code not equals to OK, failure Result will be emitted.
* @param <T> The declared expected return type of the RPC method.
*/
inline fun <reified T : Message> Flow<UMessage>.toResult(): Flow<Result<T>> {
return toResponse<T>().map { response ->
response.runCatching {
if (this is UStatus && code != UCode.OK) {
throw IllegalStateException("${message}, UStatus: $code")
} else {
this
}
}
}.catch {
emit(Result.failure(it))
}
}
128 changes: 109 additions & 19 deletions src/test/kotlin/org/eclipse/uprotocol/rpc/RpcTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ package org.eclipse.uprotocol.rpc

import com.google.protobuf.Any
import com.google.protobuf.Int32Value
import com.google.protobuf.InvalidProtocolBufferException
import com.google.protobuf.kotlin.toByteString
import io.cloudevents.v1.proto.CloudEvent
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.runTest
import org.eclipse.uprotocol.uri.serializer.LongUriSerializer
import org.eclipse.uprotocol.v1.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.util.concurrent.CompletionException
Expand All @@ -51,7 +49,7 @@ internal class RpcTest {
}
private var happyPath: RpcClient = object : RpcClient {
override fun invokeMethod(methodUri: UUri, requestPayload: UPayload, options: CallOptions): Flow<UMessage> {
return flowOf(buildUMessage())
return flowOf(testUMessage)
}
}
private var withStatusCodeInsteadOfHappyPath: RpcClient = object : RpcClient {
Expand Down Expand Up @@ -94,7 +92,7 @@ internal class RpcTest {
})
}
}
private var thatCompletesWithAnException: RpcClient = object : RpcClient {
private var thatFlowWithAnException: RpcClient = object : RpcClient {
override fun invokeMethod(methodUri: UUri, requestPayload: UPayload, options: CallOptions): Flow<UMessage> {
return flow {
throw RuntimeException("Boom")
Expand Down Expand Up @@ -124,10 +122,10 @@ internal class RpcTest {
fun test_compose_happy_path() = runTest {
val payload: UPayload = buildUPayload()
returnsNumber3.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResponse<Int32Value>().map {
Int32Value.of(it.value + 5)
}.first().run {
assertEquals(Int32Value.of(8), this)
}
Int32Value.of(it.value + 5)
}.first().run {
assertEquals(Int32Value.of(8), this)
}
}

@Test
Expand All @@ -150,8 +148,8 @@ internal class RpcTest {
fun test_success_invoke_method_happy_flow_using_toResponse() = runTest {
val payload: UPayload = buildUPayload()
happyPath.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResponse<CloudEvent>().first().run {
assertEquals(buildCloudEvent(), this)
}
assertEquals(buildCloudEvent(), this)
}
}

@Test
Expand All @@ -176,7 +174,7 @@ internal class RpcTest {
fun test_fail_invoke_method_when_invoke_method_threw_an_exception_using_toResponse() = runTest {
val payload: UPayload = buildUPayload()
try {
thatCompletesWithAnException.invokeMethod(buildTopic(), payload, buildUCallOptions())
thatFlowWithAnException.invokeMethod(buildTopic(), payload, buildUCallOptions())
.toResponse<CloudEvent>().first()
fail("should not reach here")
} catch (e: Exception) {
Expand Down Expand Up @@ -228,11 +226,11 @@ internal class RpcTest {
.first()
fail("should not reach here")
} catch (e: Exception) {
assertThrows(InvalidProtocolBufferException::class.java) {
assertThrows(RuntimeException::class.java) {
throw e
}
assertEquals(
"Protocol message contained an invalid tag (zero).",
"Protocol message contained an invalid tag (zero). [org.eclipse.uprotocol.v1.UStatus]",
e.message,
)
}
Expand All @@ -242,7 +240,8 @@ internal class RpcTest {
fun test_no_payload_in_umessage() = runTest {
val payload: UPayload = buildUPayload()
try {
thatReturnsUMessageWithoutPayload.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResponse<CloudEvent>()
thatReturnsUMessageWithoutPayload.invokeMethod(buildTopic(), payload, buildUCallOptions())
.toResponse<CloudEvent>()
.first()
fail("should not reach here")
} catch (e: Exception) {
Expand All @@ -256,6 +255,99 @@ internal class RpcTest {
}
}


@Test
fun test_toResult_nonUStatus_happy_path() = runTest {
val payload = buildUPayload()
returnsNumber3.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResult<Int32Value>().first().run {
assertTrue(isSuccess)
assertEquals(Int32Value.of(3), getOrNull())
}
}

@Test
fun test_toResult_uStatus_happy_path() = runTest {
val payload = buildUPayload()
withStatusCodeHappyPath.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResult<UStatus>().first()
.run {
assertTrue(isSuccess)
assertEquals(UCode.OK, getOrNull()?.code)
assertEquals("all good", getOrNull()?.message)

}
}

@Test
fun test_toResult_uStatus_not_ok() = runTest {
val payload = buildUPayload()
withStatusCodeInsteadOfHappyPath.invokeMethod(buildTopic(), payload, buildUCallOptions()).toResult<UStatus>()
.first().run {
assertTrue(isFailure)
assertTrue(exceptionOrNull() is IllegalStateException)
assertEquals("boom, UStatus: INVALID_ARGUMENT", exceptionOrNull()?.message)
}
}

@Test
fun test_toResult_no_payload_in_umessage() = runTest {
val payload: UPayload = buildUPayload()
thatReturnsUMessageWithoutPayload.invokeMethod(buildTopic(), payload, buildUCallOptions())
.toResult<CloudEvent>()
.first().run {
assertTrue(isFailure)
assertTrue(exceptionOrNull() is RuntimeException)
assertEquals(
"Server returned a null payload. Expected [io.cloudevents.v1.proto.CloudEvent]",
exceptionOrNull()?.message
)
}
}


@Test
fun test_fail_invoke_method_when_invoke_method_threw_an_exception_using_toResult() = runTest {
val payload: UPayload = buildUPayload()
thatFlowWithAnException.invokeMethod(buildTopic(), payload, buildUCallOptions())
.toResult<CloudEvent>().first().run {
assertTrue(isFailure)
assertTrue(exceptionOrNull() is CompletionException)
assertEquals(
"Boom",
exceptionOrNull()?.message
)
}
}

@Test
fun test_fail_invoke_method_when_invoke_method_returns_a_status_using_toResult() = runTest {
val payload: UPayload = buildUPayload()
withStatusCodeInsteadOfHappyPath.invokeMethod(buildTopic(), payload, buildUCallOptions())
.toResult<CloudEvent>().first().run {
assertTrue(isFailure)
assertTrue(exceptionOrNull() is RuntimeException)
assertEquals(
"Unknown payload type [type.googleapis.com/uprotocol.v1.UStatus]. Expected [io.cloudevents.v1.proto.CloudEvent]",
exceptionOrNull()?.message
)
}
}

@Test
@DisplayName("test toResult with invalid payload that is not of type any")
fun test_invalid_payload_that_is_not_type_any_toResult() = runTest {
val payload: UPayload = buildUPayload()
thatBarfsCrapyPayload.invokeMethod(buildTopic(), payload, buildUCallOptions())
.toResult<CloudEvent>().first().run {
assertTrue(isFailure)
assertTrue(exceptionOrNull() is RuntimeException)
assertEquals(
"Protocol message contained an invalid tag (zero). [org.eclipse.uprotocol.v1.UStatus]",
exceptionOrNull()?.message
)
}

}

private fun buildCloudEvent(): CloudEvent {
return CloudEvent.newBuilder().setSpecVersion("1.0").setId("HARTLEY IS THE BEST")
.setSource("https://example.com").build()
Expand All @@ -269,10 +361,8 @@ internal class RpcTest {
}
}

private fun buildUMessage(): UMessage {
return uMessage {
payload = buildUPayload()
}
private val testUMessage = uMessage {
payload = buildUPayload()
}

private fun buildTopic(): UUri {
Expand Down

0 comments on commit b2e1c77

Please sign in to comment.