Skip to content

Commit

Permalink
Change Client, Document actor to class (#187)
Browse files Browse the repository at this point in the history
* Change Client, Document actor to class

* Refactor for loop to for-in to speed
  • Loading branch information
humdrum authored Jul 17, 2024
1 parent 62bd08f commit 6860357
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 106 deletions.
84 changes: 41 additions & 43 deletions Sources/Core/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ public struct ClientOptions {
* It has documents and sends changes of the documents in local
* to the server to synchronize with other replicas in remote.
*/
public actor Client {
private var attachmentMap: [DocumentKey: Attachment]
@MainActor
public class Client {
private var attachmentMap = [DocumentKey: Attachment]()
private let syncLoopDuration: Int
private let reconnectStreamDelay: Int
private let maximumAttachmentTimeout: Int
Expand All @@ -141,17 +142,14 @@ public actor Client {
public private(set) var id: ActorID?
public nonisolated let key: String
public var isActive: Bool { self.status == .activated }
public private(set) var status: ClientStatus
public private(set) var status: ClientStatus = .deactivated

/**
* @param rpcAddr - the address of the RPC server.
* @param opts - the options of the client.
*/
public init(_ urlString: String, _ options: ClientOptions = ClientOptions()) {
public nonisolated init(_ urlString: String, _ options: ClientOptions = ClientOptions()) {
self.key = options.key ?? UUID().uuidString

self.status = .deactivated
self.attachmentMap = [String: Attachment]()
self.syncLoopDuration = options.syncLoopDuration
self.reconnectStreamDelay = options.reconnectStreamDelay
self.maximumAttachmentTimeout = options.maximumAttachmentTimeout
Expand All @@ -169,7 +167,7 @@ public actor Client {
* @param url - the url of the RPC server.
* @param opts - the options of the client.
*/
init?(_ url: URL, _ options: ClientOptions = ClientOptions()) {
convenience init?(_ url: URL, _ options: ClientOptions = ClientOptions()) {
self.init(url.absoluteString, options)
}

Expand Down Expand Up @@ -240,18 +238,18 @@ public actor Client {
throw YorkieError.unexpected(message: "Invalid client ID! [\(self.id ?? "nil")]")
}

guard await doc.status == .detached else {
guard doc.status == .detached else {
throw YorkieError.documentNotDetached(message: "\(doc) is not detached.")
}

await doc.setActor(clientID)
try await doc.update { _, presence in
doc.setActor(clientID)
try doc.update { _, presence in
presence.set(initialPresence)
}

var attachRequest = AttachDocumentRequest()
attachRequest.clientID = clientID
attachRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack())
attachRequest.changePack = Converter.toChangePack(pack: doc.createChangePack())

do {
let docKey = doc.getKey()
Expand All @@ -266,18 +264,18 @@ public actor Client {
}

let pack = try Converter.fromChangePack(message.changePack)
try await doc.applyChangePack(pack)
try doc.applyChangePack(pack)

if await doc.status == .removed {
if doc.status == .removed {
throw YorkieError.documentRemoved(message: "\(doc) is removed.")
}

await doc.applyStatus(.attached)
doc.applyStatus(.attached)

self.attachmentMap[doc.getKey()] = Attachment(doc: doc, docID: message.documentID, syncMode: syncMode, remoteChangeEventReceived: false)

if syncMode != .manual {
try await self.runWatchLoop(docKey)
try self.runWatchLoop(docKey)
try await self.waitForInitialization(semaphore, docKey)
}

Expand Down Expand Up @@ -314,14 +312,14 @@ public actor Client {
throw YorkieError.documentNotAttached(message: "\(doc.getKey()) is not attached when \(#function).")
}

try await doc.update { _, presence in
try doc.update { _, presence in
presence.clear()
}

var detachDocumentRequest = DetachDocumentRequest()
detachDocumentRequest.clientID = clientID
detachDocumentRequest.documentID = attachment.docID
detachDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack())
detachDocumentRequest.changePack = Converter.toChangePack(pack: doc.createChangePack())

do {
let detachDocumentResponse = await self.rpcClient.detachDocument(request: detachDocumentRequest, headers: self.authHeader.makeHeader(doc.getKey()))
Expand All @@ -332,10 +330,10 @@ public actor Client {

let pack = try Converter.fromChangePack(message.changePack)

try await doc.applyChangePack(pack)
try doc.applyChangePack(pack)

if await doc.status != .removed {
await doc.applyStatus(.detached)
if doc.status != .removed {
doc.applyStatus(.detached)
}

try self.stopWatchLoop(doc.getKey())
Expand Down Expand Up @@ -371,7 +369,7 @@ public actor Client {
var removeDocumentRequest = RemoveDocumentRequest()
removeDocumentRequest.clientID = clientID
removeDocumentRequest.documentID = attachment.docID
removeDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack(true))
removeDocumentRequest.changePack = Converter.toChangePack(pack: doc.createChangePack(true))

do {
let removeDocumentResponse = await self.rpcClient.removeDocument(request: removeDocumentRequest, headers: self.authHeader.makeHeader(doc.getKey()))
Expand All @@ -381,7 +379,7 @@ public actor Client {
}

let pack = try Converter.fromChangePack(message.changePack)
try await doc.applyChangePack(pack)
try doc.applyChangePack(pack)

try self.stopWatchLoop(doc.getKey())

Expand All @@ -400,7 +398,7 @@ public actor Client {
* `changeSyncMode` changes the synchronization mode of the given document.
*/
@discardableResult
public func changeSyncMode(_ doc: Document, _ syncMode: SyncMode) async throws -> Document {
public func changeSyncMode(_ doc: Document, _ syncMode: SyncMode) throws -> Document {
let docKey = doc.getKey()

guard let attachment = self.attachmentMap[docKey] else {
Expand Down Expand Up @@ -430,7 +428,7 @@ public actor Client {

// manual to realtime
if prevSyncMode == .manual {
try await self.runWatchLoop(docKey)
try self.runWatchLoop(docKey)
}

return doc
Expand Down Expand Up @@ -535,7 +533,7 @@ public actor Client {
await self.doSyncLoop()
}

private func doWatchLoop(_ docKey: DocumentKey) async throws {
private func doWatchLoop(_ docKey: DocumentKey) throws {
self.attachmentMap[docKey]?.resetWatchLoopTimer()

guard self.isActive, let id = self.id else {
Expand Down Expand Up @@ -582,13 +580,13 @@ public actor Client {

self.attachmentMap[docKey]?.connectStream(stream)

await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected)
self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected)
}

private func runWatchLoop(_ docKey: DocumentKey) async throws {
private func runWatchLoop(_ docKey: DocumentKey) throws {
Logger.debug("[WL] c:\"\(self.key)\" run watch loop")

try await self.doWatchLoop(docKey)
try self.doWatchLoop(docKey)
}

private func stopWatchLoop(_ docKey: DocumentKey) throws {
Expand Down Expand Up @@ -620,38 +618,38 @@ public actor Client {
switch body {
case .initialization(let initialization):
var onlineClients = Set<ActorID>()
let actorID = await self.attachmentMap[docKey]?.doc.actorID
let actorID = self.attachmentMap[docKey]?.doc.actorID

for pbClientID in initialization.clientIds.filter({ $0 != actorID }) {
onlineClients.insert(pbClientID)
}

self.semaphoresForInitialzation[docKey]?.signal()

await self.attachmentMap[docKey]?.doc.setOnlineClients(onlineClients)
await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.initialized)
self.attachmentMap[docKey]?.doc.setOnlineClients(onlineClients)
self.attachmentMap[docKey]?.doc.publishPresenceEvent(.initialized)
case .event(let pbWatchEvent):
let publisher = pbWatchEvent.publisher

switch pbWatchEvent.type {
case .documentChanged:
self.attachmentMap[docKey]?.remoteChangeEventReceived = true
case .documentWatched:
await self.attachmentMap[docKey]?.doc.addOnlineClient(publisher)
self.attachmentMap[docKey]?.doc.addOnlineClient(publisher)
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
// unless we also know their initial presence data at this point.
if let presence = await self.attachmentMap[docKey]?.doc.getPresence(publisher) {
await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.watched, publisher, presence)
if let presence = self.attachmentMap[docKey]?.doc.getPresence(publisher) {
self.attachmentMap[docKey]?.doc.publishPresenceEvent(.watched, publisher, presence)
}
case .documentUnwatched:
// NOTE(chacha912): There is no presence, when PresenceChange(clear) is applied before unwatching.
// In that case, the 'unwatched' event is triggered while handling the PresenceChange.
let presence = await self.attachmentMap[docKey]?.doc.getPresence(publisher)
let presence = self.attachmentMap[docKey]?.doc.getPresence(publisher)

await self.attachmentMap[docKey]?.doc.removeOnlineClient(publisher)
self.attachmentMap[docKey]?.doc.removeOnlineClient(publisher)

if let presence {
await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.unwatched, publisher, presence)
self.attachmentMap[docKey]?.doc.publishPresenceEvent(.unwatched, publisher, presence)
}
default:
break
Expand Down Expand Up @@ -695,7 +693,7 @@ public actor Client {
pushPullRequest.clientID = clientID

let doc = attachment.doc
let requestPack = await doc.createChangePack()
let requestPack = doc.createChangePack()
let localSize = requestPack.getChangeSize()

pushPullRequest.changePack = Converter.toChangePack(pack: requestPack)
Expand All @@ -719,20 +717,20 @@ public actor Client {
return doc
}

try await doc.applyChangePack(responsePack)
try doc.applyChangePack(responsePack)

if await doc.status == .removed {
if doc.status == .removed {
self.attachmentMap.removeValue(forKey: docKey)
}

await doc.publishSyncEvent(.synced)
doc.publishSyncEvent(.synced)

let remoteSize = responsePack.getChangeSize()
Logger.info("[PP] c:\"\(self.key)\" sync d:\"\(docKey)\", push:\(localSize) pull:\(remoteSize) cp:\(responsePack.getCheckpoint().toTestString)")

return doc
} catch {
await doc.publishSyncEvent(.syncFailed)
doc.publishSyncEvent(.syncFailed)

Logger.error("[PP] c:\"\(self.key)\" err : \(error)")

Expand Down
6 changes: 2 additions & 4 deletions Sources/Document/CRDT/CRDTTree.swift
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,7 @@ class CRDTTree: CRDTElement {
let allChildren = realParent.innerChildren
let index = isLeftMost ? 0 : (allChildren.firstIndex(where: { $0 === leftNode }) ?? -1) + 1

for index in index ..< allChildren.count {
let next = allChildren[index]
for next in allChildren.suffix(from: index) {
if !next.id.createdAt.after(editedAt) {
break
}
Expand Down Expand Up @@ -1141,8 +1140,7 @@ class CRDTTree: CRDTElement {
// Generate ranges by accumulating consecutive nodes.
var start: TreeToken<CRDTTreeNode>?
var end: TreeToken<CRDTTreeNode>?
for index in 0 ..< candidates.count {
let cur = candidates[index]
for (index, cur) in candidates.enumerated() {
let next = candidates[safe: index + 1]
if start == nil {
start = cur
Expand Down
Loading

0 comments on commit 6860357

Please sign in to comment.