Skip to content

Commit

Permalink
Add missing Unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
dddj698 committed Aug 6, 2024
1 parent 43328de commit c8e4053
Show file tree
Hide file tree
Showing 9 changed files with 516 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import java.util.logging.Logger
*/
class InMemoryUSubscriptionClient(
private val transport: UTransport,
private val rpcClient: RpcClient,
private val notifier: Notifier,
private val rpcClient: RpcClient = InMemoryRpcClient(transport),
private val notifier: Notifier = SimpleNotifier(transport),
dispatcher: CoroutineDispatcher = Dispatchers.IO
) : USubscriptionClient {
// Map to store subscription change notification handlers
Expand All @@ -47,14 +47,6 @@ class InMemoryUSubscriptionClient(

private val mutex = Mutex()

/**
* Creates a new USubscription client passing [UTransport]
* used to provide additional options for the RPC requests to uSubscription service.
*
* @param transport the transport to use for sending the notifications
*/
constructor(transport: UTransport):this(transport, InMemoryRpcClient(transport), SimpleNotifier(transport))

// transport Notification listener that will process subscription change notifications
private val mNotificationListener = UListener { message ->
handleNotifications(message)
Expand Down Expand Up @@ -214,8 +206,17 @@ class InMemoryUSubscriptionClient(
}
return rpcClient.invokeMethod(REGISTER_NOTIFICATIONS_METHOD, UPayload.pack(request), options)
.mapToMessage<NotificationsResponse>().mapCatching { response ->
mHandlers[topic]?.takeIf { it != handler}?.let {
throw UStatusException(UCode.ALREADY_EXISTS, "Handler already registered")
handler?.let { handler->
mutex.withLock {
mHandlers[topic]?.let {
if (it != handler){
throw UStatusException(UCode.ALREADY_EXISTS, "Handler already registered")
}
it
}?:run {
mHandlers[topic] = handler
}
}
}
response
}
Expand All @@ -226,15 +227,13 @@ class InMemoryUSubscriptionClient(
*
* @param topic The topic to unregister for notifications.
* @param options The [CallOptions] to be used for the request.
* @param handler The [SubscriptionChangeHandler] to be unregistered.
* @return [Result] with [NotificationsResponse] if uSubscription service accepts the
* request to unregister the caller to be notified of subscription changes, or
* [Result] with [UStatusException] that indicates the reason for the failure.
*/
override suspend fun unregisterForNotifications(
topic: UUri,
options: CallOptions,
handler: SubscriptionChangeHandler?
options: CallOptions
): Result<NotificationsResponse> {
val request = notificationsRequest {
this.topic = topic
Expand Down Expand Up @@ -304,6 +303,7 @@ class InMemoryUSubscriptionClient(
* @param message The notification message from the USubscription service
*/
private fun handleNotifications(message: UMessage) {
println("msg")
// Ignore messages that are not notifications
message.takeIf { it.attributes.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION }?.let { msg ->
// Unpack the notification message from uSubscription called Update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,13 @@ interface USubscriptionClient {
*
* @param topic The topic to unregister for notifications.
* @param options The [CallOptions] to be used for the request.
* @param handler The [SubscriptionChangeHandler] to be unregistered.
* @return [Result] with [NotificationsResponse] if uSubscription service accepts the
* request to unregister the caller to be notified of subscription changes, or
* [Result] with [UStatusException] that indicates the reason for the failure.
*/
suspend fun unregisterForNotifications(
topic: UUri,
options: CallOptions = CallOptions(),
handler: SubscriptionChangeHandler? = null
options: CallOptions = CallOptions()
): Result<NotificationsResponse>

/**
Expand Down
Loading

0 comments on commit c8e4053

Please sign in to comment.