From a222dd4aad072917d44ba18232bb32c01b5e1c18 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Wed, 29 May 2024 08:23:48 -0300 Subject: [PATCH] fix(realtime): handle timeout when subscribing to channel (#349) * fix(realtime): handle timeout when subscribing to channel * test: add test for channel subscribe timeout * Fix typo * test: add test for subscribe with timeout * remove duplicated test --- Sources/Realtime/V2/PushV2.swift | 14 +- Sources/Realtime/V2/RealtimeChannelV2.swift | 13 +- Sources/_Helpers/Task+withTimeout.swift | 39 +++ Tests/RealtimeTests/RealtimeTests.swift | 284 ++++++++++++-------- Tests/_HelpersTests/WithTimeoutTests.swift | 34 +++ 5 files changed, 272 insertions(+), 112 deletions(-) create mode 100644 Sources/_Helpers/Task+withTimeout.swift create mode 100644 Tests/_HelpersTests/WithTimeoutTests.swift diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 54ec2b87..fbbdfd9b 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -23,8 +23,18 @@ actor PushV2 { await channel?.socket?.push(message) if channel?.config.broadcast.acknowledgeBroadcasts == true { - return await withCheckedContinuation { - receivedContinuation = $0 + do { + return try await withTimeout(interval: channel?.socket?.options.timeoutInterval ?? 10) { + await withCheckedContinuation { + self.receivedContinuation = $0 + } + } + } catch is TimeoutError { + channel?.logger?.debug("Push timed out.") + return .timeout + } catch { + channel?.logger?.error("Error sending push: \(error)") + return .error } } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 41538c07..f4362269 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -108,7 +108,18 @@ public actor RealtimeChannelV2 { ) ) - _ = await statusChange.first { @Sendable in $0 == .subscribed } + do { + try await withTimeout(interval: socket?.options.timeoutInterval ?? 10) { [self] in + _ = await statusChange.first { @Sendable in $0 == .subscribed } + } + } catch { + if error is TimeoutError { + logger?.debug("subscribe timed out.") + await subscribe() + } else { + logger?.error("subscribe failed: \(error)") + } + } } public func unsubscribe() async { diff --git a/Sources/_Helpers/Task+withTimeout.swift b/Sources/_Helpers/Task+withTimeout.swift new file mode 100644 index 00000000..290b759e --- /dev/null +++ b/Sources/_Helpers/Task+withTimeout.swift @@ -0,0 +1,39 @@ +// +// Task+withTimeout.swift +// +// +// Created by Guilherme Souza on 19/04/24. +// + +import Foundation + +@discardableResult +package func withTimeout( + interval: TimeInterval, + @_inheritActorContext operation: @escaping @Sendable () async throws -> R +) async throws -> R { + try await withThrowingTaskGroup(of: R.self) { group in + defer { + group.cancelAll() + } + + let deadline = Date(timeIntervalSinceNow: interval) + + group.addTask { + try await operation() + } + + group.addTask { + let interval = deadline.timeIntervalSinceNow + if interval > 0 { + try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(interval)) + } + try Task.checkCancellation() + throw TimeoutError() + } + + return try await group.next()! + } +} + +package struct TimeoutError: Error, Hashable {} diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 4bb39313..f4a1e3c5 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -27,81 +27,59 @@ final class RealtimeTests: XCTestCase { options: RealtimeClientOptions( headers: ["apikey": apiKey], heartbeatInterval: 1, - reconnectDelay: 1 + reconnectDelay: 1, + timeoutInterval: 2, + logger: TestLogger() ), ws: ws ) } - func testBehavior_Closure() async { - let channel = await sut.channel("public:messages") - _ = await channel.onPostgresChange(InsertAction.self, table: "messages") { _ in } - _ = await channel.onPostgresChange(UpdateAction.self, table: "messages") { _ in } - _ = await channel.onPostgresChange(DeleteAction.self, table: "messages") { _ in } - - let statusChange = await sut.statusChange - - await connectSocketAndWait() - - let status = await statusChange.prefix(3).collect() - XCTAssertEqual(status, [.disconnected, .connecting, .connected]) - - let messageTask = await sut.messageTask - XCTAssertNotNil(messageTask) - - let heartbeatTask = await sut.heartbeatTask - XCTAssertNotNil(heartbeatTask) - - let subscription = Task { - await channel.subscribe() - } - await Task.megaYield() - ws.mockReceive(.messagesSubscribed) - - // Wait until channel subscribed - await subscription.value + override func tearDown() async throws { + await sut.disconnect() - XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages]) + try await super.tearDown() } - func testBehavior_AsyncAwait() async { - let channel = await sut.channel("public:messages") - _ = await channel.postgresChange(InsertAction.self, table: "messages") - _ = await channel.postgresChange(UpdateAction.self, table: "messages") - _ = await channel.postgresChange(DeleteAction.self, table: "messages") + func testBehavior() async throws { + try await withTimeout(interval: 2) { [self] in + let channel = await sut.channel("public:messages") + _ = await channel.postgresChange(InsertAction.self, table: "messages") + _ = await channel.postgresChange(UpdateAction.self, table: "messages") + _ = await channel.postgresChange(DeleteAction.self, table: "messages") - let statusChange = await sut.statusChange + let statusChange = await sut.statusChange - await connectSocketAndWait() + await connectSocketAndWait() - let status = await statusChange.prefix(3).collect() - XCTAssertEqual(status, [.disconnected, .connecting, .connected]) + let status = await statusChange.prefix(3).collect() + XCTAssertEqual(status, [.disconnected, .connecting, .connected]) - let messageTask = await sut.messageTask - XCTAssertNotNil(messageTask) + let messageTask = await sut.messageTask + XCTAssertNotNil(messageTask) - let heartbeatTask = await sut.heartbeatTask - XCTAssertNotNil(heartbeatTask) + let heartbeatTask = await sut.heartbeatTask + XCTAssertNotNil(heartbeatTask) - let subscription = Task { - await channel.subscribe() - } - await Task.megaYield() - ws.mockReceive(.messagesSubscribed) + let subscription = Task { + await channel.subscribe() + } + await Task.megaYield() + ws.mockReceive(.messagesSubscribed) - // Wait until channel subscribed - await subscription.value + // Wait until channel subscribed + await subscription.value - XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages]) + XCTAssertNoDifference(ws.sentMessages.value, [.subscribeToMessages(ref: "1", joinRef: "1")]) + } } - func testHeartbeat() async throws { - let expectation = expectation(description: "heartbeat") - expectation.expectedFulfillmentCount = 2 + func testSubscribeTimeout() async throws { + let channel = await sut.channel("public:messages") + let joinEventCount = LockIsolated(0) ws.on { message in if message.event == "heartbeat" { - expectation.fulfill() return RealtimeMessageV2( joinRef: message.joinRef, ref: message.ref, @@ -114,58 +92,138 @@ final class RealtimeTests: XCTestCase { ) } + if message.event == "phx_join" { + joinEventCount.withValue { $0 += 1 } + + // Skip first join. + if joinEventCount.value == 2 { + return .messagesSubscribed + } + } + return nil } await connectSocketAndWait() - await fulfillment(of: [expectation], timeout: 3) - } - - func testHeartbeat_whenNoResponse_shouldReconnect() async throws { - let sentHeartbeatExpectation = expectation(description: "sentHeartbeat") + Task { + await channel.subscribe() + } - ws.on { - if $0.event == "heartbeat" { - sentHeartbeatExpectation.fulfill() - } + await Task.megaYield() - return nil - } + try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) + + let joinSentMessages = ws.sentMessages.value.filter { $0.event == "phx_join" } + + let expectedMessages = try [ + RealtimeMessageV2( + joinRef: "1", + ref: "1", + topic: "realtime:public:messages", + event: "phx_join", + payload: JSONObject( + RealtimeJoinPayload( + config: RealtimeJoinConfig(), + accessToken: apiKey + ) + ) + ), + RealtimeMessageV2( + joinRef: "3", + ref: "3", + topic: "realtime:public:messages", + event: "phx_join", + payload: JSONObject( + RealtimeJoinPayload( + config: RealtimeJoinConfig(), + accessToken: apiKey + ) + ) + ), + ] - let statuses = LockIsolated<[RealtimeClientV2.Status]>([]) + XCTAssertNoDifference( + joinSentMessages, + expectedMessages + ) + } - Task { - for await status in await sut.statusChange { - statuses.withValue { - $0.append(status) + func testHeartbeat() async throws { + try await withTimeout(interval: 4) { [self] in + let expectation = expectation(description: "heartbeat") + expectation.expectedFulfillmentCount = 2 + + ws.on { message in + if message.event == "heartbeat" { + expectation.fulfill() + return RealtimeMessageV2( + joinRef: message.joinRef, + ref: message.ref, + topic: "phoenix", + event: "phx_reply", + payload: [ + "response": [:], + "status": "ok", + ] + ) } + + return nil } + + await connectSocketAndWait() + + await fulfillment(of: [expectation], timeout: 3) } - await Task.megaYield() - await connectSocketAndWait() + } - await fulfillment(of: [sentHeartbeatExpectation], timeout: 2) + func testHeartbeat_whenNoResponse_shouldReconnect() async throws { + try await withTimeout(interval: 6) { [self] in + let sentHeartbeatExpectation = expectation(description: "sentHeartbeat") - let pendingHeartbeatRef = await sut.pendingHeartbeatRef - XCTAssertNotNil(pendingHeartbeatRef) + ws.on { + if $0.event == "heartbeat" { + sentHeartbeatExpectation.fulfill() + } - // Wait until next heartbeat - try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) + return nil + } - // Wait for reconnect delay - try await Task.sleep(nanoseconds: NSEC_PER_SEC * 1) + let statuses = LockIsolated<[RealtimeClientV2.Status]>([]) - XCTAssertEqual( - statuses.value, - [ - .disconnected, - .connecting, - .connected, - .disconnected, - .connecting, - ] - ) + Task { + for await status in await sut.statusChange { + statuses.withValue { + $0.append(status) + } + } + } + await Task.megaYield() + await connectSocketAndWait() + + await fulfillment(of: [sentHeartbeatExpectation], timeout: 2) + + let pendingHeartbeatRef = await sut.pendingHeartbeatRef + XCTAssertNotNil(pendingHeartbeatRef) + + // Wait until next heartbeat + try await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) + + // Wait for reconnect delay + try await Task.sleep(nanoseconds: NSEC_PER_SEC * 1) + + XCTAssertEqual( + statuses.value, + [ + .disconnected, + .connecting, + .connected, + .disconnected, + .connecting, + ] + ) + } } private func connectSocketAndWait() async { @@ -180,27 +238,29 @@ final class RealtimeTests: XCTestCase { } extension RealtimeMessageV2 { - static let subscribeToMessages = Self( - joinRef: "1", - ref: "1", - topic: "realtime:public:messages", - event: "phx_join", - payload: [ - "access_token": "anon.api.key", - "config": [ - "broadcast": [ - "self": false, - "ack": false, - ], - "postgres_changes": [ - ["table": "messages", "event": "INSERT", "schema": "public"], - ["table": "messages", "schema": "public", "event": "UPDATE"], - ["schema": "public", "table": "messages", "event": "DELETE"], + static func subscribeToMessages(ref: String?, joinRef: String?) -> RealtimeMessageV2 { + Self( + joinRef: joinRef, + ref: ref, + topic: "realtime:public:messages", + event: "phx_join", + payload: [ + "access_token": "anon.api.key", + "config": [ + "broadcast": [ + "self": false, + "ack": false, + ], + "postgres_changes": [ + ["table": "messages", "event": "INSERT", "schema": "public"], + ["table": "messages", "schema": "public", "event": "UPDATE"], + ["schema": "public", "table": "messages", "event": "DELETE"], + ], + "presence": ["key": ""], ], - "presence": ["key": ""], - ], - ] - ) + ] + ) + } static let messagesSubscribed = Self( joinRef: nil, @@ -230,3 +290,9 @@ extension RealtimeMessageV2 { ] ) } + +struct TestLogger: SupabaseLogger { + func log(message: SupabaseLogMessage) { + print(message.description) + } +} diff --git a/Tests/_HelpersTests/WithTimeoutTests.swift b/Tests/_HelpersTests/WithTimeoutTests.swift new file mode 100644 index 00000000..aefa6fb8 --- /dev/null +++ b/Tests/_HelpersTests/WithTimeoutTests.swift @@ -0,0 +1,34 @@ +// +// WithTimeoutTests.swift +// +// +// Created by Guilherme Souza on 19/04/24. +// + +import _Helpers +import Foundation +import XCTest + +final class WithTimeoutTests: XCTestCase { + func testWithTimeout() async { + do { + try await withTimeout(interval: 0.25) { + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + } + XCTFail("Task should timeout.") + } catch { + XCTAssertTrue(error is TimeoutError) + } + + do { + let answer = try await withTimeout(interval: 1.25) { + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + return 42 + } + + XCTAssertEqual(answer, 42) + } catch { + XCTFail("Should not throw error: \(error)") + } + } +}