diff --git a/Sources/MongoClient/Cursor.swift b/Sources/MongoClient/Cursor.swift index 6fd16b10..f58e1826 100644 --- a/Sources/MongoClient/Cursor.swift +++ b/Sources/MongoClient/Cursor.swift @@ -90,16 +90,21 @@ public final class MongoCursor { command.maxTimeMS = self.maxTimeMS command.readConcern = readConcern - let newCursor = try await connection.executeCodable( - command, - decodeAs: GetMoreReply.self, - namespace: namespace, - in: self.transaction, - sessionId: session?.sessionId, - traceLabel: "\(traceLabel ?? "UnknownOperation").getMore", - serviceContext: context - ) - + let newCursor = try await withTaskCancellationHandler { + try await connection.executeCodable( + command, + decodeAs: GetMoreReply.self, + namespace: namespace, + in: self.transaction, + sessionId: session?.sessionId, + traceLabel: "\(traceLabel ?? "UnknownOperation").getMore", + serviceContext: context + ) + } onCancel: { + Task { + try await self.close() + } + } self.id = newCursor.cursor.id return newCursor.cursor.nextBatch } diff --git a/Sources/MongoKitten/ChangeStream.swift b/Sources/MongoKitten/ChangeStream.swift index 7fef685f..e1ed3bb3 100644 --- a/Sources/MongoKitten/ChangeStream.swift +++ b/Sources/MongoKitten/ChangeStream.swift @@ -116,8 +116,9 @@ extension MongoCollection { } /// A change stream is a stream of change notifications for a collection or database -public struct ChangeStream { +public struct ChangeStream: AsyncSequence { public typealias Notification = ChangeStreamNotification + public typealias Element = Notification typealias InputCursor = FinalizedCursor> internal let cursor: InputCursor @@ -137,6 +138,24 @@ public struct ChangeStream { public mutating func setGetMoreInterval(to interval: TimeAmount? = nil) { self.getMoreInterval = interval } + + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = Notification + + private var iterator: InputCursor.AsyncIterator + + init(iterator: InputCursor.AsyncIterator) { + self.iterator = iterator + } + + public mutating func next() async throws -> Element? { + try await iterator.next() + } + } + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(iterator: cursor.makeAsyncIterator()) + } /// Iterates over the change stream notifications and calls the given handler for each notification /// - Parameter handler: The handler to call for each notification