diff --git a/Sources/Core/Client.swift b/Sources/Core/Client.swift index 03c38349..33a3bb75 100644 --- a/Sources/Core/Client.swift +++ b/Sources/Core/Client.swift @@ -206,10 +206,6 @@ public class Client { return } - for item in self.attachmentMap { - try self.stopWatchLoop(item.key) - } - let deactivateRequest = DeactivateClientRequest.with { $0.clientID = clientID } let deactivateResponse = await self.rpcClient.deactivateClient(request: deactivateRequest) @@ -219,6 +215,11 @@ public class Client { throw YorkieError.rpcError(message: deactivateResponse.error.debugDescription) } + for (key, attachment) in self.attachmentMap { + attachment.doc.applyStatus(.detached) + try self.detachInternal(key) + } + self.status = .deactivated Logger.info("Client(\(self.key) deactivated.") @@ -336,9 +337,7 @@ public class Client { doc.applyStatus(.detached) } - try self.stopWatchLoop(doc.getKey()) - - self.attachmentMap.removeValue(forKey: doc.getKey()) + try self.detachInternal(doc.getKey()) Logger.info("[DD] c:\"\(self.key)\" detaches d:\"\(doc.getKey())\"") @@ -683,6 +682,16 @@ public class Client { } } + private func detachInternal(_ docKey: DocumentKey) throws { + guard self.attachmentMap[docKey] != nil else { + return + } + + try self.stopWatchLoop(docKey) + + self.attachmentMap.removeValue(forKey: docKey) + } + @discardableResult private func syncInternal(_ attachment: Attachment, _ syncMode: SyncMode) async throws -> Document { guard let clientID = self.id else { diff --git a/Sources/Document/Document.swift b/Sources/Document/Document.swift index 2f4db13f..8a5aa62b 100644 --- a/Sources/Document/Document.swift +++ b/Sources/Document/Document.swift @@ -88,6 +88,7 @@ public class Document { private var subscribeCallbacks = [String: SubscribeCallback]() private var presenceSubscribeCallback = [String: SubscribeCallback]() private var connectionSubscribeCallback: SubscribeCallback? + private var statusSubscribeCallback: SubscribeCallback? private var syncSubscribeCallback: SubscribeCallback? /** @@ -194,6 +195,14 @@ public class Document { self.connectionSubscribeCallback = callback } + /** + * `subscribeStatus` registers a callback to subscribe to events on the document. + * The callback will be called when the document status changes. + */ + public func subscribeStatus(_ callback: @escaping SubscribeCallback) { + self.statusSubscribeCallback = callback + } + /** * `subscribePresence` registers a callback to subscribe to events on the document. * The callback will be called when the targetPath or any of its nested values change. @@ -639,6 +648,8 @@ public class Document { } } else if event.type == .connectionChanged { self.connectionSubscribeCallback?(event, self) + } else if event.type == .statusChanged { + self.statusSubscribeCallback?(event, self) } else if event.type == .syncStatusChanged { self.syncSubscribeCallback?(event, self) } else if event.type == .snapshot { diff --git a/Tests/Integration/DocumentIntegrationTests.swift b/Tests/Integration/DocumentIntegrationTests.swift index 27b660d1..9e850312 100644 --- a/Tests/Integration/DocumentIntegrationTests.swift +++ b/Tests/Integration/DocumentIntegrationTests.swift @@ -470,4 +470,108 @@ final class DocumentIntegrationTests: XCTestCase { try await self.c1.deactivate() try await self.c2.deactivate() } + + func test_subscribe_document_status_changed_event() async throws { + let c1 = Client(rpcAddress) + let c2 = Client(rpcAddress) + try await c1.activate() + try await c2.activate() + + let docKey = "\(self.description)-\(Date().description)".toDocKey + let d1 = Document(key: docKey) + let d2 = Document(key: docKey) + + let eventCollectorD1 = EventCollector(doc: d1) + let eventCollectorD2 = EventCollector(doc: d2) + await eventCollectorD1.subscribeDocumentStatus() + await eventCollectorD2.subscribeDocumentStatus() + + // 1. When the client attaches a document, it receives an attached event. + try await c1.attach(d1) + try await c2.attach(d2) + + // 2. When c1 detaches a document, it receives a detached event. + try await c1.detach(d1) + + // 3. When c2 deactivates, it should also receive a detached event. + try await c2.deactivate() + + await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .detached) + + await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached) + + // 4. When other document is attached, it receives an attached event. + let docKey2 = "\(self.description)-\(Date().description)".toDocKey + let d3 = Document(key: docKey2) + let d4 = Document(key: docKey2) + let eventCollectorD3 = EventCollector(doc: d3) + let eventCollectorD4 = EventCollector(doc: d4) + await eventCollectorD3.subscribeDocumentStatus() + await eventCollectorD4.subscribeDocumentStatus() + + try await c1.attach(d3, [:], .manual) + + try await c2.activate() + try await c2.attach(d4, [:], .manual) + + // 5. When c1 removes a document, it receives a removed event. + try await c1.remove(d3) + + // 6. When c2 syncs, it should also receive a removed event. + try await c2.sync() + + await eventCollectorD3.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD3.verifyNthValue(at: 2, isEqualTo: .removed) + + await eventCollectorD4.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD4.verifyNthValue(at: 2, isEqualTo: .removed) + + // 7. If the document is in the removed state, a detached event should not occur when deactivating. + let eventCount3 = eventCollectorD3.count + let eventCount4 = eventCollectorD4.count + try await c1.deactivate() + try await c2.deactivate() + + try await Task.sleep(nanoseconds: 500_000_000) + + XCTAssertEqual(eventCount3, eventCollectorD3.count) + XCTAssertEqual(eventCount4, eventCollectorD4.count) + } + + func test_document_status_changes_to_detached_when_deactivating() async throws { + let c1 = Client(rpcAddress) + let c2 = Client(rpcAddress) + try await c1.activate() + try await c2.activate() + + let docKey = "\(self.description)-\(Date().description)".toDocKey + let d1 = Document(key: docKey) + let d2 = Document(key: docKey) + + let eventCollectorD1 = EventCollector(doc: d1) + let eventCollectorD2 = EventCollector(doc: d2) + await eventCollectorD1.subscribeDocumentStatus() + await eventCollectorD2.subscribeDocumentStatus() + + // 1. When the client attaches a document, it receives an attached event. + try await c1.attach(d1, [:], .manual) + try await c2.attach(d2, [:], .manual) + + await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached) + + // 2. When c1 removes a document, it receives a removed event. + try await c1.remove(d1) + await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .removed) + + // 3. When c2 deactivates, it should also receive a removed event. + try await c2.deactivate() + // NOTE: For now, document status changes to `Detached` when deactivating. + // This behavior may change in the future. + await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached) + + try await c1.deactivate() + } } diff --git a/Tests/Integration/IntegrationHelper.swift b/Tests/Integration/IntegrationHelper.swift index 6d74a069..2498df80 100644 --- a/Tests/Integration/IntegrationHelper.swift +++ b/Tests/Integration/IntegrationHelper.swift @@ -153,3 +153,64 @@ func subscribeDocs(_ d1: Document, _ d2: Document, _ d1Expected: [any OperationI } } } + +class EventCollector { + private let queue = DispatchQueue(label: "com.yorkie.eventcollector", attributes: .concurrent) + private var _values: [T] = [] + + let doc: Document + var values: [T] { + self.queue.sync { self._values } + } + + var count: Int { + return self.values.count + } + + init(doc: Document) { + self.doc = doc + } + + func add(event: T) { + self.queue.async(flags: .barrier) { + self._values.append(event) + } + } + + func asyncStream() -> AsyncStream { + return AsyncStream { continuation in + for value in self.values { + continuation.yield(value) + } + continuation.finish() + } + } + + func verifyNthValue(at nth: Int, isEqualTo targetValue: T) async { + if nth > self.values.count { + XCTFail("Expected \(nth)th value: \(targetValue), but only received \(self.values.count) values") + return + } + + var counter = 0 + for await value in self.asyncStream() { + counter += 1 + + if counter == nth { + XCTAssertTrue(value == targetValue, "Expected \(nth)th value: \(targetValue), actual value: \(value)") + return + } + } + + XCTFail("Stream ended before finding \(nth)th value") + } + + func subscribeDocumentStatus() async where T == DocumentStatus { + await self.doc.subscribeStatus { [weak self] event, _ in + guard let status = (event as? StatusChangedEvent)?.value.status else { + return + } + self?.add(event: status) + } + } +}