From c05c943327f004b0dfb2f55a11452eaf716cef61 Mon Sep 17 00:00:00 2001 From: Hyowoo Kim Date: Tue, 23 Jan 2024 17:04:48 +0900 Subject: [PATCH] Enable users to choose which dispatchers Client and Document should use (#11) --- .../kotlin/dev/yorkie/core/ClientTest.kt | 20 +++++++++++ .../kotlin/dev/yorkie/core/DocumentTest.kt | 9 +++++ .../kotlin/dev/yorkie/core/GCTest.kt | 36 ++++++++++++++----- .../kotlin/dev/yorkie/core/PresenceTest.kt | 24 +++++++++++++ .../kotlin/dev/yorkie/core/TestUtils.kt | 5 +++ .../src/main/kotlin/dev/yorkie/core/Client.kt | 22 +++++++++--- .../kotlin/dev/yorkie/document/Document.kt | 19 ++++++++-- .../test/kotlin/dev/yorkie/core/ClientTest.kt | 13 ++++++- .../dev/yorkie/document/DocumentTest.kt | 6 ++++ 9 files changed, 138 insertions(+), 16 deletions(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt index 41d96e141..cd491245d 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt @@ -155,6 +155,10 @@ class ClientTest { client2.detachAsync(document2).await() client1.deactivateAsync().await() client2.deactivateAsync().await() + document1.close() + document2.close() + client1.close() + client2.close() collectJobs.forEach(Job::cancel) } @@ -230,6 +234,10 @@ class ClientTest { client2.detachAsync(document2).await() client1.deactivateAsync().await() client2.deactivateAsync().await() + document1.close() + document2.close() + client1.close() + client2.close() } } @@ -294,6 +302,11 @@ class ClientTest { client1.deactivateAsync().await() client2.deactivateAsync().await() + + document1.close() + document2.close() + client1.close() + client2.close() collectJob.cancel() } @@ -362,6 +375,10 @@ class ClientTest { client1.deactivateAsync().await() client2.deactivateAsync().await() client3.deactivateAsync().await() + document1.close() + document2.close() + client1.close() + client2.close() } } @@ -504,6 +521,9 @@ class ClientTest { client.detachAsync(document).await() client.deactivateAsync().await() + + document.close() + client.close() } } } diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt index eebb1cc2f..92f5ee728 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt @@ -71,6 +71,8 @@ class DocumentTest { } client.deactivateAsync().await() + document.close() + client.close() } } @@ -105,6 +107,10 @@ class DocumentTest { client1.deactivateAsync().await() client2.deactivateAsync().await() + document1.close() + document2.close() + client1.close() + client2.close() } } @@ -221,6 +227,9 @@ class DocumentTest { } client.deactivateAsync().await() + + document.close() + client.close() } } diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/GCTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/GCTest.kt index 4cf20f7e9..10d63f9cf 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/GCTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/GCTest.kt @@ -10,7 +10,6 @@ import dev.yorkie.document.json.JsonTree import dev.yorkie.document.json.JsonTree.TextNode import dev.yorkie.document.json.TreeBuilder.element import dev.yorkie.document.json.TreeBuilder.text -import dev.yorkie.document.time.TimeTicket import dev.yorkie.document.time.TimeTicket.Companion.MaxTimeTicket import dev.yorkie.gson import dev.yorkie.util.IndexTreeNode @@ -46,8 +45,10 @@ class GCTest { }.await() assertJsonContentEquals("""{"1":1,"3":3}""", document.toJson()) assertEquals(4, document.garbageLength) - assertEquals(4, document.garbageCollect(TimeTicket.MaxTimeTicket)) + assertEquals(4, document.garbageCollect(MaxTimeTicket)) assertEquals(0, document.garbageLength) + + document.close() } } @@ -70,8 +71,10 @@ class GCTest { document.toJson(), ) assertEquals(1, document.garbageLength) - assertEquals(1, document.garbageCollect(TimeTicket.MaxTimeTicket)) + assertEquals(1, document.garbageCollect(MaxTimeTicket)) assertEquals(0, document.garbageLength) + + document.close() } } @@ -107,8 +110,10 @@ class GCTest { document.toJson(), ) assertEquals(4, document.garbageLength) - assertEquals(4, document.garbageCollect(TimeTicket.MaxTimeTicket)) + assertEquals(4, document.garbageCollect(MaxTimeTicket)) assertEquals(0, document.garbageLength) + + document.close() } } @@ -146,7 +151,7 @@ class GCTest { var nodeLengthBeforeGC = getNodeLength(document.getRoot().getAs("t").indexTree.root) assertEquals(2, document.garbageLength) - assertEquals(2, document.garbageCollect(TimeTicket.MaxTimeTicket)) + assertEquals(2, document.garbageCollect(MaxTimeTicket)) assertEquals(0, document.garbageLength) var nodeLengthAfterGC = getNodeLength(document.getRoot().getAs("t").indexTree.root) @@ -168,7 +173,7 @@ class GCTest { nodeLengthBeforeGC = getNodeLength(document.getRoot().getAs("t").indexTree.root) assertEquals(1, document.garbageLength) - assertEquals(1, document.garbageCollect(TimeTicket.MaxTimeTicket)) + assertEquals(1, document.garbageCollect(MaxTimeTicket)) assertEquals(0, document.garbageLength) nodeLengthAfterGC = getNodeLength(document.getRoot().getAs("t").indexTree.root) @@ -191,11 +196,13 @@ class GCTest { getNodeLength(document.getRoot().getAs("t").indexTree.root) assertEquals(5, document.garbageLength) - assertEquals(5, document.garbageCollect(TimeTicket.MaxTimeTicket)) + assertEquals(5, document.garbageCollect(MaxTimeTicket)) assertEquals(0, document.garbageLength) nodeLengthAfterGC = getNodeLength(document.getRoot().getAs("t").indexTree.root) assertEquals(5, nodeLengthBeforeGC - nodeLengthAfterGC) + + document.close() } } @@ -456,7 +463,7 @@ class GCTest { }.await() assertJsonContentEquals("""{"1":1, "3":3}""", document.toJson()) assertEquals(4, document.garbageLength) - assertEquals(0, document.garbageCollect(TimeTicket.MaxTimeTicket)) + assertEquals(0, document.garbageCollect(MaxTimeTicket)) assertEquals(4, document.garbageLength) } @@ -489,6 +496,9 @@ class GCTest { assertEquals(3, document.garbageLength) assertEquals(3, document.garbageLengthFromClone) + + document.close() + client.close() } @Test @@ -545,6 +555,11 @@ class GCTest { c1.deactivateAsync().await() c2.deactivateAsync().await() + + d1.close() + d2.close() + c1.close() + c2.close() } } @@ -600,6 +615,11 @@ class GCTest { c1.deactivateAsync().await() c2.deactivateAsync().await() + + d1.close() + d2.close() + c1.close() + c2.close() } } diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt index c8f505c34..91094a6cc 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt @@ -175,6 +175,10 @@ class PresenceTest { c2.detachAsync(d2).await() c1.deactivateAsync().await() c2.deactivateAsync().await() + d1.close() + d2.close() + c1.close() + c2.close() } } @@ -285,6 +289,10 @@ class PresenceTest { c2.detachAsync(d2).await() c1.deactivateAsync().await() c2.deactivateAsync().await() + d1.close() + d2.close() + c1.close() + c2.close() } } @@ -343,6 +351,10 @@ class PresenceTest { c1.detachAsync(d1).await() c1.deactivateAsync().await() c2.deactivateAsync().await() + d1.close() + d2.close() + c1.close() + c2.close() } } @@ -429,6 +441,12 @@ class PresenceTest { c1.deactivateAsync().await() c2.deactivateAsync().await() c3.deactivateAsync().await() + d1.close() + d2.close() + d3.close() + c1.close() + c2.close() + c3.close() } } @@ -617,6 +635,12 @@ class PresenceTest { c1.deactivateAsync().await() c2.deactivateAsync().await() c3.deactivateAsync().await() + d1.close() + d2.close() + d3.close() + c1.close() + c2.close() + c3.close() } } diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/TestUtils.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/TestUtils.kt index 67ba636be..9363a962d 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/TestUtils.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/TestUtils.kt @@ -58,5 +58,10 @@ fun withTwoClientsAndDocuments( } client1.deactivateAsync().await() client2.deactivateAsync().await() + + document1.close() + document2.close() + client1.close() + client2.close() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index 35413ccb5..8cf1a5da0 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -30,14 +30,17 @@ import io.grpc.Channel import io.grpc.Metadata import io.grpc.StatusException import io.grpc.android.AndroidChannelBuilder +import java.io.Closeable import java.util.UUID import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async +import kotlinx.coroutines.cancel import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow @@ -56,15 +59,17 @@ import kotlinx.coroutines.launch * Client that can communicate with the server. * It has [Document]s and sends changes of the documents in local * to the server to synchronize with other replicas in remote. + * + * A single-threaded, [Closeable] [dispatcher] is used as default. + * Therefore you need to [close] the client, when the client is no longer needed. + * If you provide your own [dispatcher], it is up to you to decide [close] is needed or not. */ public class Client @VisibleForTesting internal constructor( private val channel: Channel, private val options: Options = Options(), -) { - private val scope = CoroutineScope( - SupervisorJob() + - createSingleThreadDispatcher("Client(${options.key})"), - ) + private val dispatcher: CoroutineDispatcher, +) : Closeable { + private val scope = CoroutineScope(SupervisorJob() + dispatcher) private val activationJob = SupervisorJob() private val eventStream = MutableSharedFlow() @@ -103,6 +108,7 @@ public class Client @VisibleForTesting internal constructor( rpcHost: String, rpcPort: Int, options: Options = Options(), + dispatcher: CoroutineDispatcher = createSingleThreadDispatcher("Client(${options.key})"), buildChannel: (AndroidChannelBuilder) -> AndroidChannelBuilder = { it }, ) : this( channel = AndroidChannelBuilder.forAddress(rpcHost, rpcPort) @@ -110,6 +116,7 @@ public class Client @VisibleForTesting internal constructor( .context(context.applicationContext) .build(), options = options, + dispatcher = dispatcher, ) /** @@ -555,6 +562,11 @@ public class Client @VisibleForTesting internal constructor( } } + override fun close() { + scope.cancel() + (dispatcher as? Closeable)?.close() + } + private data class SyncResult(val document: Document, val result: Result) private class WatchJobHolder(val documentID: String, val job: Job) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index df12077f4..ca883652c 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -29,10 +29,13 @@ import dev.yorkie.document.time.TimeTicket import dev.yorkie.document.time.TimeTicket.Companion.InitialTimeTicket import dev.yorkie.util.YorkieLogger import dev.yorkie.util.createSingleThreadDispatcher +import java.io.Closeable +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow @@ -50,9 +53,16 @@ import kotlinx.coroutines.withContext /** * A CRDT-based data type. * We can represent the model of the application and edit it even while offline. + * + * A single-threaded, [Closeable] [dispatcher] is used as default. + * Therefore you need to [close] the client, when the client is no longer needed. + * If you provide your own [dispatcher], it is up to you to decide [close] is needed or not. */ -public class Document(public val key: Key, private val options: Options = Options()) { - private val dispatcher = createSingleThreadDispatcher("Document($key)") +public class Document( + public val key: Key, + private val options: Options = Options(), + private val dispatcher: CoroutineDispatcher = createSingleThreadDispatcher("Document($key)"), +) : Closeable { private val scope = CoroutineScope(SupervisorJob() + dispatcher) private val localChanges = mutableListOf() @@ -395,6 +405,11 @@ public class Document(public val key: Key, private val options: Options = Option return root.toJson() } + override fun close() { + scope.cancel() + (dispatcher as? Closeable)?.close() + } + public sealed interface Event { /** diff --git a/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt index 4446e4f55..92cd73ae1 100644 --- a/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/test/kotlin/dev/yorkie/core/ClientTest.kt @@ -27,6 +27,7 @@ import dev.yorkie.document.change.ChangeID import dev.yorkie.document.change.ChangePack import dev.yorkie.document.change.CheckPoint import dev.yorkie.document.time.ActorID +import dev.yorkie.util.createSingleThreadDispatcher import io.grpc.Channel import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.inprocess.InProcessServerBuilder @@ -38,6 +39,7 @@ import kotlin.test.assertTrue import kotlinx.coroutines.delay import kotlinx.coroutines.flow.first import kotlinx.coroutines.test.runTest +import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test @@ -72,7 +74,16 @@ class ClientTest { channel = grpcCleanup.register( InProcessChannelBuilder.forName(serverName).directExecutor().build(), ) - target = Client(channel, Client.Options(key = TEST_KEY, apiKey = TEST_KEY)) + target = Client( + channel, + Client.Options(key = TEST_KEY, apiKey = TEST_KEY), + createSingleThreadDispatcher("Client Test"), + ) + } + + @After + fun tearDown() { + target.close() } @Test diff --git a/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt b/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt index 0f7fe47cc..d4b1b42b5 100644 --- a/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt +++ b/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt @@ -16,6 +16,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.runTest +import org.junit.After import org.junit.Assert.assertEquals import org.junit.Assert.assertThrows import org.junit.Assert.assertTrue @@ -31,6 +32,11 @@ class DocumentTest { target = Document(Document.Key("")) } + @After + fun tearDown() { + target.close() + } + @Test fun `should not throw error when trying to delete missing key`() { runTest {