diff --git a/gradle.properties b/gradle.properties index 235f7d4c..136c0ae3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,7 +11,7 @@ android.suppressUnsupportedOptionWarnings=android.suppressUnsupportedOptionWarni kotlin.code.style=official kotlin.mpp.stability.nowarn=true GROUP=dev.yorkie -VERSION_NAME=0.4.24-rc +VERSION_NAME=0.4.24-rc2 POM_DESCRIPTION=Document store for building collaborative editing applications. POM_INCEPTION_YEAR=2022 POM_URL=https://github.com/yorkie-team/yorkie-android-sdk diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index 9baa4cc2..1e3cf4fd 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -40,6 +40,7 @@ import dev.yorkie.util.createSingleThreadDispatcher import java.io.Closeable import java.io.InterruptedIOException import java.util.UUID +import java.util.concurrent.TimeoutException import kotlin.collections.Map.Entry import kotlin.coroutines.coroutineContext import kotlin.time.Duration @@ -75,6 +76,7 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeoutOrNull import okhttp3.OkHttpClient /** @@ -310,32 +312,39 @@ public class Client @VisibleForTesting internal constructor( while (true) { ensureActive() latestStream.safeClose() - val stream = service.watchDocument( - attachment.document.key.documentBasedRequestHeader, - ).also { - latestStream = it - } + val stream = withTimeoutOrNull(1_000) { + service.watchDocument( + attachment.document.key.documentBasedRequestHeader, + ).also { + latestStream = it + } + } ?: continue val streamJob = launch(start = CoroutineStart.UNDISPATCHED) { val channel = stream.responseChannel() - var retry = 0 while (!stream.isReceiveClosed() && !channel.isClosedForReceive) { - val receiveResult = channel.receiveCatching() - receiveResult.onSuccess { - attachment.document.publishEvent(StreamConnectionChanged.Connected) - handleWatchDocumentsResponse(attachment.document.key, it) - retry = 0 - }.onFailure { - if (receiveResult.isClosed) { - return@onFailure + withTimeoutOrNull(60_000) { + val receiveResult = channel.receiveCatching() + receiveResult.onSuccess { + attachment.document.publishEvent(StreamConnectionChanged.Connected) + handleWatchDocumentsResponse(attachment.document.key, it) + }.onFailure { + if (receiveResult.isClosed) { + stream.safeClose() + return@onFailure + } + handleWatchStreamFailure(attachment.document, stream, it) + }.onClosed { + handleWatchStreamFailure( + attachment.document, + stream, + it ?: ClosedReceiveChannelException("Channel was closed"), + ) } - retry++ - handleWatchStreamFailure(attachment.document, stream, it, retry > 3) - }.onClosed { + } ?: run { handleWatchStreamFailure( attachment.document, stream, - it ?: ClosedReceiveChannelException("Channel was closed"), - true, + TimeoutException("channel timed out"), ) } } @@ -362,12 +371,10 @@ public class Client @VisibleForTesting internal constructor( document: Document, stream: ServerOnlyStreamInterface<*, *>, cause: Throwable?, - closeStream: Boolean, ) { onWatchStreamCanceled(document) - if (closeStream) { - stream.safeClose() - } + stream.safeClose() + cause?.let(::sendWatchStreamException) coroutineContext.ensureActive() delay(options.reconnectStreamDelay.inWholeMilliseconds) @@ -409,11 +416,14 @@ public class Client @VisibleForTesting internal constructor( } private suspend fun ServerOnlyStreamInterface<*, *>?.safeClose() { - if (this == null || isReceiveClosed()) { + if (this == null) { return } withContext(NonCancellable) { - receiveClose() + runCatching { + responseChannel().cancel() + receiveClose() + } } }