Skip to content

Commit

Permalink
fix(realtime): handle timeout when subscribing to channel (#349)
Browse files Browse the repository at this point in the history
* fix(realtime): handle timeout when subscribing to channel

* test: add test for channel subscribe timeout

* Fix typo

* test: add test for subscribe with timeout

* remove duplicated test
  • Loading branch information
grdsdev authored May 29, 2024
1 parent 41e3c6e commit a222dd4
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 112 deletions.
14 changes: 12 additions & 2 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,18 @@ actor PushV2 {
await channel?.socket?.push(message)

if channel?.config.broadcast.acknowledgeBroadcasts == true {
return await withCheckedContinuation {
receivedContinuation = $0
do {
return try await withTimeout(interval: channel?.socket?.options.timeoutInterval ?? 10) {
await withCheckedContinuation {
self.receivedContinuation = $0
}
}
} catch is TimeoutError {
channel?.logger?.debug("Push timed out.")
return .timeout
} catch {
channel?.logger?.error("Error sending push: \(error)")
return .error
}
}

Expand Down
13 changes: 12 additions & 1 deletion Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,18 @@ public actor RealtimeChannelV2 {
)
)

_ = await statusChange.first { @Sendable in $0 == .subscribed }
do {
try await withTimeout(interval: socket?.options.timeoutInterval ?? 10) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
if error is TimeoutError {
logger?.debug("subscribe timed out.")
await subscribe()
} else {
logger?.error("subscribe failed: \(error)")
}
}
}

public func unsubscribe() async {
Expand Down
39 changes: 39 additions & 0 deletions Sources/_Helpers/Task+withTimeout.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Task+withTimeout.swift
//
//
// Created by Guilherme Souza on 19/04/24.
//

import Foundation

@discardableResult
package func withTimeout<R: Sendable>(
interval: TimeInterval,
@_inheritActorContext operation: @escaping @Sendable () async throws -> R
) async throws -> R {
try await withThrowingTaskGroup(of: R.self) { group in
defer {
group.cancelAll()
}

let deadline = Date(timeIntervalSinceNow: interval)

group.addTask {
try await operation()
}

group.addTask {
let interval = deadline.timeIntervalSinceNow
if interval > 0 {
try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(interval))
}
try Task.checkCancellation()
throw TimeoutError()
}

return try await group.next()!
}
}

package struct TimeoutError: Error, Hashable {}
Loading

0 comments on commit a222dd4

Please sign in to comment.