Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update for up-core-api 1.5.7 change #25

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<name>Kotlin Library for uProtocol</name>
<description>Language specific uProtocol library for building and using UUri, UUID, UAttributes, UTransport, and more
</description>
<version>1.5.6-SNAPSHOT</version>
<version>1.5.7-SNAPSHOT</version>
<packaging>jar</packaging>
<url>https://github.com/eclipse-uprotocol/up-kotlin/</url>

Expand All @@ -43,7 +43,7 @@
<cloudevents.version>2.4.2</cloudevents.version>

<protobuf.version>3.21.10</protobuf.version>
<git.tag.name>uprotocol-core-api-1.5.6</git.tag.name>
<git.tag.name>uprotocol-core-api-1.5.7</git.tag.name>

</properties>

Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/org/eclipse/uprotocol/cloudevent/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ val result: UMessage = UCloudEvent.toMessage(cloudEvent)
assertNotNull(result)
assertTrue(UCloudEvent.getRequestId(cloudEvent).isPresent())
assertTrue(UCloudEvent.getTtl(cloudEvent).isPresent())
assertEquals(UCloudEvent.getTtl(cloudEvent).get(), result.getAttributes().getTtl())
assertEquals(UCloudEvent.getTtl(cloudEvent), result.getAttributes().getTtl())
assertEquals(UCloudEvent.getPayload(cloudEvent).toByteString(),result.getPayload().getValue())
assertEquals(UCloudEvent.getSource(cloudEvent),LongUriSerializer.instance().serialize(result.getSource()))
assertTrue(UCloudEvent.getPriority(cloudEvent).isPresent())
assertEquals(UCloudEvent.getPriority(cloudEvent).get(), String.valueOf(result.getAttributes().getPriority()))
assertEquals(UCloudEvent.getPriority(cloudEvent), String.valueOf(result.getAttributes().getPriority()))

val cloudEvent1: CloudEvent = UCloudEvent.fromMessage(result)
assertEquals(cloudEvent,cloudEvent1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.google.protobuf.Empty
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.eclipse.uprotocol.cloudevent.datamodel.UCloudEventAttributes
import org.eclipse.uprotocol.uri.Uri
import org.eclipse.uprotocol.uuid.factory.UUIDV8
import org.eclipse.uprotocol.uuid.serializer.LongUuidSerializer
import org.eclipse.uprotocol.v1.UMessageType
Expand All @@ -46,15 +47,15 @@ object CloudEventFactory {
/**
* Create a CloudEvent for an event for the use case of: RPC Request message.
*
* @param applicationUriForRPC The uri for the application requesting the RPC.
* @param serviceMethodUri The uri for the method to be called on the service Ex.: :/body.access/1/rpc.UpdateDoor
* @param applicationUriForRPC The [Uri] for the application requesting the RPC.
* @param serviceMethodUri The [Uri] for the method to be called on the service Ex.: :/body.access/1/rpc.UpdateDoor
* @param protoPayload Protobuf Any object with the Message command to be executed on the sink service.
* @param attributes Additional attributes such as ttl, hash, priority and token.
* @return Returns an request CloudEvent.
*/
fun request(
applicationUriForRPC: String,
serviceMethodUri: String,
applicationUriForRPC: Uri,
serviceMethodUri: Uri,
protoPayload: Any,
attributes: UCloudEventAttributes
): CloudEvent {
Expand All @@ -67,24 +68,24 @@ object CloudEventFactory {
attributes
)
.withType(UCloudEvent.getEventType(UMessageType.UMESSAGE_TYPE_REQUEST))
.withExtension("sink", URI.create(serviceMethodUri))
.withExtension("sink", URI.create(serviceMethodUri.value))
.build()
}

/**
* Create a CloudEvent for an event for the use case of: RPC Response message.
*
* @param applicationUriForRPC The destination of the response. The uri for the original application that requested the RPC and this response is for.
* @param serviceMethodUri The uri for the method that was called on the service Ex.: :/body.access/1/rpc.UpdateDoor
* @param applicationUriForRPC The destination of the response. The [Uri] for the original application that requested the RPC and this response is for.
* @param serviceMethodUri The [Uri] for the method that was called on the service Ex.: :/body.access/1/rpc.UpdateDoor
* @param requestId The cloud event id from the original request cloud event that this response if for.
* @param protoPayload The protobuf serialized response message as defined by the application interface or the
* google.rpc.Status message containing the details of an error.
* @param attributes Additional attributes such as ttl, hash and priority.
* @return Returns an response CloudEvent.
*/
fun response(
applicationUriForRPC: String,
serviceMethodUri: String,
applicationUriForRPC: Uri,
serviceMethodUri: Uri,
requestId: String,
protoPayload: Any,
attributes: UCloudEventAttributes
Expand All @@ -98,24 +99,24 @@ object CloudEventFactory {
attributes
)
.withType(UCloudEvent.getEventType(UMessageType.UMESSAGE_TYPE_RESPONSE))
.withExtension("sink", URI.create(applicationUriForRPC))
.withExtension("sink", URI.create(applicationUriForRPC.value))
.withExtension("reqid", requestId)
.build()
}

/**
* Create a CloudEvent for an event for the use case of: RPC Response message that failed.
*
* @param applicationUriForRPC The destination of the response. The uri for the original application that requested the RPC and this response is for.
* @param serviceMethodUri The uri for the method that was called on the service Ex.: :/body.access/1/rpc.UpdateDoor
* @param applicationUriForRPC The destination of the response. The [Uri] for the original application that requested the RPC and this response is for.
* @param serviceMethodUri The [Uri] for the method that was called on the service Ex.: :/body.access/1/rpc.UpdateDoor
* @param requestId The cloud event id from the original request cloud event that this response if for.
* @param communicationStatus A UCode value that indicates of a platform communication error while delivering this CloudEvent.
* @param attributes Additional attributes such as ttl, hash and priority.
* @return Returns a response CloudEvent Response for the use case of RPC Response message that failed.
*/
fun failedResponse(
applicationUriForRPC: String,
serviceMethodUri: String,
applicationUriForRPC: Uri,
serviceMethodUri: Uri,
requestId: String,
communicationStatus: Int,
attributes: UCloudEventAttributes
Expand All @@ -130,7 +131,7 @@ object CloudEventFactory {
attributes
)
.withType(UCloudEvent.getEventType(UMessageType.UMESSAGE_TYPE_RESPONSE))
.withExtension("sink", URI.create(applicationUriForRPC))
.withExtension("sink", URI.create(applicationUriForRPC.value))
.withExtension("reqid", requestId)
.withExtension("commstatus", communicationStatus)
.build()
Expand All @@ -139,12 +140,12 @@ object CloudEventFactory {
/**
* Create a CloudEvent for an event for the use case of: Publish generic message.
*
* @param source The uri of the topic being published.
* @param source The [Uri] of the topic being published.
* @param protoPayload protobuf Any object with the Message to be published.
* @param attributes Additional attributes such as ttl, hash and priority.
* @return Returns a publish CloudEvent.
*/
fun publish(source: String, protoPayload: Any, attributes: UCloudEventAttributes): CloudEvent {
fun publish(source: Uri, protoPayload: Any, attributes: UCloudEventAttributes): CloudEvent {
val id = generateCloudEventId()
return buildBaseCloudEvent(id, source, protoPayload.toByteArray(), protoPayload.typeUrl, attributes)
.withType(UCloudEvent.getEventType(UMessageType.UMESSAGE_TYPE_PUBLISH))
Expand All @@ -155,22 +156,22 @@ object CloudEventFactory {
* Create a CloudEvent for an event for the use case of: Publish a notification message.<br></br>
* A published event containing the sink (destination) is often referred to as a notification, it is an event sent to a specific consumer.
*
* @param source The uri of the topic being published.
* @param sink The uri of the destination of this notification.
* @param source The [Uri] of the topic being published.
* @param sink The [Uri] of the destination of this notification.
* @param protoPayload protobuf Any object with the Message to be published.
* @param attributes Additional attributes such as ttl, hash and priority.
* @return Returns a publish CloudEvent.
*/
fun notification(
source: String,
sink: String,
source: Uri,
sink: Uri,
protoPayload: Any,
attributes: UCloudEventAttributes
): CloudEvent {
val id = generateCloudEventId()
return buildBaseCloudEvent(id, source, protoPayload.toByteArray(), protoPayload.typeUrl, attributes)
.withType(UCloudEvent.getEventType(UMessageType.UMESSAGE_TYPE_PUBLISH))
.withExtension("sink", URI.create(sink))
.withExtension("sink", URI.create(sink.value))
.build()
}

Expand All @@ -196,14 +197,15 @@ object CloudEventFactory {
* ready to be serialized and sent to the transport layer.
*/
fun buildBaseCloudEvent(
id: String?, source: String,
id: String?,
source: Uri,
protoPayloadBytes: ByteArray,
protoPayloadSchema: String?,
attributes: UCloudEventAttributes
): CloudEventBuilder {
val cloudEventBuilder: CloudEventBuilder = CloudEventBuilder.v1()
.withId(id)
.withSource(URI.create(source)) /* Not needed:
.withSource(URI.create(source.value)) /* Not needed:
.withDataContentType(PROTOBUF_CONTENT_TYPE)
.withDataSchema(URI.create(protoPayloadSchema))
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.cloudevents.CloudEvent
import io.cloudevents.CloudEventData
import io.cloudevents.core.builder.CloudEventBuilder
import org.eclipse.uprotocol.UprotocolOptions
import org.eclipse.uprotocol.uri.Uri
import org.eclipse.uprotocol.uri.serializer.LongUriSerializer
import org.eclipse.uprotocol.uuid.factory.getTime
import org.eclipse.uprotocol.uuid.factory.isUuid
Expand Down Expand Up @@ -59,11 +60,11 @@ object UCloudEvent {
/**
* Extract the sink from a cloud event. The sink attribute is optional.
* @param cloudEvent CloudEvent with sink to be extracted.
* @return Returns a String value of a CloudEvent sink attribute if it exists,
* @return Returns a Uri String value of a CloudEvent sink attribute if it exists,
* otherwise Null is returned.
*/
fun getSink(cloudEvent: CloudEvent): String? {
return extractStringValueFromExtension("sink", cloudEvent)
fun getSink(cloudEvent: CloudEvent): Uri? {
return extractStringValueFromExtension("sink", cloudEvent)?.let { Uri(it) }
}

/**
Expand Down Expand Up @@ -118,7 +119,7 @@ object UCloudEvent {


/**
* Extract the string value of the trafceparent attribute from a cloud event. The traceparent attribute is optional.
* Extract the string value of the traceparent attribute from a cloud event. The traceparent attribute is optional.
* @param cloudEvent CloudEvent with traceparent to be extracted.
* @return Returns a String value of a CloudEvent traceparent if it exists,
* otherwise Null is returned.
Expand All @@ -128,17 +129,17 @@ object UCloudEvent {
}

/**
* Extract the integer value of the communication status attribute from a cloud event. The communication status attribute is optional.
* Fetch the UCode from the CloudEvent commstatus integer value. The communication status attribute is optional.
* If there was a platform communication error that occurred while delivering this cloudEvent, it will be indicated in this attribute.
* If the attribute does not exist, it is assumed that everything was UCode.OK_VALUE.
* @param cloudEvent CloudEvent with the platformError to be extracted.
* @return Returns a UCode value that indicates of a platform communication error while delivering this CloudEvent or UCode.OK_VALUE.
* @return Returns a UCode that indicates of a platform communication error while delivering this CloudEvent or UCode.OK.
*/
fun getCommunicationStatus(cloudEvent: CloudEvent): Int {
fun getCommunicationStatus(cloudEvent: CloudEvent): UCode {
return try {
extractIntegerValueFromExtension("commstatus", cloudEvent) ?: UCode.OK_VALUE
UCode.forNumber(extractIntegerValueFromExtension("commstatus", cloudEvent) ?: UCode.OK_VALUE)
} catch (e: Exception) {
UCode.OK_VALUE
UCode.OK
}
}

Expand All @@ -148,7 +149,7 @@ object UCloudEvent {
* @return returns true if the provided CloudEvent is marked with having a platform delivery problem.
*/
fun hasCommunicationStatusProblem(cloudEvent: CloudEvent): Boolean {
return getCommunicationStatus(cloudEvent) != UCode.OK_VALUE
return getCommunicationStatus(cloudEvent) != UCode.OK
}

/**
Expand Down Expand Up @@ -300,9 +301,14 @@ object UCloudEvent {
}

/**
* Get the string representation of the UMessageType
* Get the string representation of the UMessageType.
*
* Note: The UMessageType is determined by the type of the CloudEvent. If
* the UMessageType is UMESSAGE_TYPE_NOTIFICATION, we assume the CloudEvent type
* is "pub.v1" and the sink is present.
* @param type The UMessageType
* @return returns the string representation of the UMessageType
*
*/
fun getEventType(type: UMessageType): String {
return getCeName(type.valueDescriptor)
Expand Down Expand Up @@ -333,8 +339,13 @@ object UCloudEvent {
}

/**
* Get the UMessageType from the string representation
* @param type The string representation of the UMessageType
* Get the UMessageType from the string representation.
*
* Note: The UMessageType is determined by the type of the CloudEvent.
* If the CloudEvent type is "pub.v1" and the sink is present, the UMessageType is assumed to be
* UMESSAGE_TYPE_NOTIFICATION, this is because uProtocol CloudEvent definition did not have an explicit
* notification type.
*
* @return returns the UMessageType
*/
fun getMessageType(type: String): UMessageType {
Expand Down Expand Up @@ -370,7 +381,7 @@ object UCloudEvent {
priority = getUPriority(p, UPriority.UPRIORITY_UNSPECIFIED)
}

getSink(event)?.let { sink = LongUriSerializer.INSTANCE.deserialize(it) }
getSink(event)?.let { sink = LongUriSerializer.INSTANCE.deserialize(it.value) }

getRequestId(event)?.let { reqid = LongUuidSerializer.INSTANCE.deserialize(it) }

Expand Down Expand Up @@ -428,7 +439,7 @@ object UCloudEvent {
}

if (attributes.hasCommstatus()) {
builder.withExtension("commstatus", attributes.commstatus)
builder.withExtension("commstatus", attributes.commstatus.number)
}

if (attributes.hasReqid()) {
Expand Down
Loading
Loading