Skip to content

Commit

Permalink
Add subscribeStatus(callback) function to Document (#190)
Browse files Browse the repository at this point in the history
* Add subscribeStatus(callback) function to Document

* Trigger 'detached' event on Client.deactivate()

* Add test code for Document.subscribeStatus(callback)

* Update EventCollector based on coderabbitai review feedback
  • Loading branch information
hiddenviewer authored Oct 31, 2024
1 parent 5ba1c32 commit 13ac1a5
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 7 deletions.
23 changes: 16 additions & 7 deletions Sources/Core/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ public class Client {
return
}

for item in self.attachmentMap {
try self.stopWatchLoop(item.key)
}

let deactivateRequest = DeactivateClientRequest.with { $0.clientID = clientID }

let deactivateResponse = await self.rpcClient.deactivateClient(request: deactivateRequest)
Expand All @@ -219,6 +215,11 @@ public class Client {
throw YorkieError.rpcError(message: deactivateResponse.error.debugDescription)
}

for (key, attachment) in self.attachmentMap {
attachment.doc.applyStatus(.detached)
try self.detachInternal(key)
}

self.status = .deactivated

Logger.info("Client(\(self.key) deactivated.")
Expand Down Expand Up @@ -336,9 +337,7 @@ public class Client {
doc.applyStatus(.detached)
}

try self.stopWatchLoop(doc.getKey())

self.attachmentMap.removeValue(forKey: doc.getKey())
try self.detachInternal(doc.getKey())

Logger.info("[DD] c:\"\(self.key)\" detaches d:\"\(doc.getKey())\"")

Expand Down Expand Up @@ -683,6 +682,16 @@ public class Client {
}
}

private func detachInternal(_ docKey: DocumentKey) throws {
guard self.attachmentMap[docKey] != nil else {
return
}

try self.stopWatchLoop(docKey)

self.attachmentMap.removeValue(forKey: docKey)
}

@discardableResult
private func syncInternal(_ attachment: Attachment, _ syncMode: SyncMode) async throws -> Document {
guard let clientID = self.id else {
Expand Down
11 changes: 11 additions & 0 deletions Sources/Document/Document.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class Document {
private var subscribeCallbacks = [String: SubscribeCallback]()
private var presenceSubscribeCallback = [String: SubscribeCallback]()
private var connectionSubscribeCallback: SubscribeCallback?
private var statusSubscribeCallback: SubscribeCallback?
private var syncSubscribeCallback: SubscribeCallback?

/**
Expand Down Expand Up @@ -194,6 +195,14 @@ public class Document {
self.connectionSubscribeCallback = callback
}

/**
* `subscribeStatus` registers a callback to subscribe to events on the document.
* The callback will be called when the document status changes.
*/
public func subscribeStatus(_ callback: @escaping SubscribeCallback) {
self.statusSubscribeCallback = callback
}

/**
* `subscribePresence` registers a callback to subscribe to events on the document.
* The callback will be called when the targetPath or any of its nested values change.
Expand Down Expand Up @@ -639,6 +648,8 @@ public class Document {
}
} else if event.type == .connectionChanged {
self.connectionSubscribeCallback?(event, self)
} else if event.type == .statusChanged {
self.statusSubscribeCallback?(event, self)
} else if event.type == .syncStatusChanged {
self.syncSubscribeCallback?(event, self)
} else if event.type == .snapshot {
Expand Down
104 changes: 104 additions & 0 deletions Tests/Integration/DocumentIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -470,4 +470,108 @@ final class DocumentIntegrationTests: XCTestCase {
try await self.c1.deactivate()
try await self.c2.deactivate()
}

func test_subscribe_document_status_changed_event() async throws {
let c1 = Client(rpcAddress)
let c2 = Client(rpcAddress)
try await c1.activate()
try await c2.activate()

let docKey = "\(self.description)-\(Date().description)".toDocKey
let d1 = Document(key: docKey)
let d2 = Document(key: docKey)

let eventCollectorD1 = EventCollector<DocumentStatus>(doc: d1)
let eventCollectorD2 = EventCollector<DocumentStatus>(doc: d2)
await eventCollectorD1.subscribeDocumentStatus()
await eventCollectorD2.subscribeDocumentStatus()

// 1. When the client attaches a document, it receives an attached event.
try await c1.attach(d1)
try await c2.attach(d2)

// 2. When c1 detaches a document, it receives a detached event.
try await c1.detach(d1)

// 3. When c2 deactivates, it should also receive a detached event.
try await c2.deactivate()

await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .detached)

await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached)

// 4. When other document is attached, it receives an attached event.
let docKey2 = "\(self.description)-\(Date().description)".toDocKey
let d3 = Document(key: docKey2)
let d4 = Document(key: docKey2)
let eventCollectorD3 = EventCollector<DocumentStatus>(doc: d3)
let eventCollectorD4 = EventCollector<DocumentStatus>(doc: d4)
await eventCollectorD3.subscribeDocumentStatus()
await eventCollectorD4.subscribeDocumentStatus()

try await c1.attach(d3, [:], .manual)

try await c2.activate()
try await c2.attach(d4, [:], .manual)

// 5. When c1 removes a document, it receives a removed event.
try await c1.remove(d3)

// 6. When c2 syncs, it should also receive a removed event.
try await c2.sync()

await eventCollectorD3.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD3.verifyNthValue(at: 2, isEqualTo: .removed)

await eventCollectorD4.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD4.verifyNthValue(at: 2, isEqualTo: .removed)

// 7. If the document is in the removed state, a detached event should not occur when deactivating.
let eventCount3 = eventCollectorD3.count
let eventCount4 = eventCollectorD4.count
try await c1.deactivate()
try await c2.deactivate()

try await Task.sleep(nanoseconds: 500_000_000)

XCTAssertEqual(eventCount3, eventCollectorD3.count)
XCTAssertEqual(eventCount4, eventCollectorD4.count)
}

func test_document_status_changes_to_detached_when_deactivating() async throws {
let c1 = Client(rpcAddress)
let c2 = Client(rpcAddress)
try await c1.activate()
try await c2.activate()

let docKey = "\(self.description)-\(Date().description)".toDocKey
let d1 = Document(key: docKey)
let d2 = Document(key: docKey)

let eventCollectorD1 = EventCollector<DocumentStatus>(doc: d1)
let eventCollectorD2 = EventCollector<DocumentStatus>(doc: d2)
await eventCollectorD1.subscribeDocumentStatus()
await eventCollectorD2.subscribeDocumentStatus()

// 1. When the client attaches a document, it receives an attached event.
try await c1.attach(d1, [:], .manual)
try await c2.attach(d2, [:], .manual)

await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached)

// 2. When c1 removes a document, it receives a removed event.
try await c1.remove(d1)
await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .removed)

// 3. When c2 deactivates, it should also receive a removed event.
try await c2.deactivate()
// NOTE: For now, document status changes to `Detached` when deactivating.
// This behavior may change in the future.
await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached)

try await c1.deactivate()
}
}
61 changes: 61 additions & 0 deletions Tests/Integration/IntegrationHelper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,64 @@ func subscribeDocs(_ d1: Document, _ d2: Document, _ d1Expected: [any OperationI
}
}
}

class EventCollector<T: Equatable> {
private let queue = DispatchQueue(label: "com.yorkie.eventcollector", attributes: .concurrent)
private var _values: [T] = []

let doc: Document
var values: [T] {
self.queue.sync { self._values }
}

var count: Int {
return self.values.count
}

init(doc: Document) {
self.doc = doc
}

func add(event: T) {
self.queue.async(flags: .barrier) {
self._values.append(event)
}
}

func asyncStream() -> AsyncStream<T> {
return AsyncStream<T> { continuation in
for value in self.values {
continuation.yield(value)
}
continuation.finish()
}
}

func verifyNthValue(at nth: Int, isEqualTo targetValue: T) async {
if nth > self.values.count {
XCTFail("Expected \(nth)th value: \(targetValue), but only received \(self.values.count) values")
return
}

var counter = 0
for await value in self.asyncStream() {
counter += 1

if counter == nth {
XCTAssertTrue(value == targetValue, "Expected \(nth)th value: \(targetValue), actual value: \(value)")
return
}
}

XCTFail("Stream ended before finding \(nth)th value")
}

func subscribeDocumentStatus() async where T == DocumentStatus {
await self.doc.subscribeStatus { [weak self] event, _ in
guard let status = (event as? StatusChangedEvent)?.value.status else {
return
}
self?.add(event: status)
}
}
}

0 comments on commit 13ac1a5

Please sign in to comment.