From ae0e5d85b477ae4fe36e8550af1df13316e24312 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 30 Jan 2025 08:39:38 -0300 Subject: [PATCH] fix duplicated files --- Sources/Realtime/Defaults.swift | 108 -- Sources/Realtime/Delegated.swift | 102 -- Sources/Realtime/Deprecated.swift | 81 -- Sources/Realtime/HeartbeatTimer.swift | 136 --- Sources/Realtime/PhoenixTransport.swift | 316 ------ Sources/Realtime/Presence.swift | 417 ------- Sources/Realtime/Push.swift | 265 ----- Sources/Realtime/RealtimeChannel.swift | 1038 ----------------- Sources/Realtime/RealtimeClient.swift | 1072 ------------------ Sources/Realtime/RealtimeMessage.swift | 87 -- Sources/Realtime/TimeoutTimer.swift | 108 -- Sources/Realtime/V2/CallbackManager.swift | 208 ---- Sources/Realtime/V2/PostgresAction.swift | 92 -- Sources/Realtime/V2/PostgresActionData.swift | 25 - Sources/Realtime/V2/PresenceAction.swift | 143 --- Sources/Realtime/V2/PushV2.swift | 61 - Sources/Realtime/V2/RealtimeChannelV2.swift | 568 ---------- Sources/Realtime/V2/RealtimeClientV2.swift | 558 --------- Sources/Realtime/V2/RealtimeJoinConfig.swift | 92 -- Sources/Realtime/V2/RealtimeMessageV2.swift | 82 -- Sources/Realtime/V2/Types.swift | 86 -- 21 files changed, 5645 deletions(-) delete mode 100644 Sources/Realtime/Defaults.swift delete mode 100644 Sources/Realtime/Delegated.swift delete mode 100644 Sources/Realtime/Deprecated.swift delete mode 100644 Sources/Realtime/HeartbeatTimer.swift delete mode 100644 Sources/Realtime/PhoenixTransport.swift delete mode 100644 Sources/Realtime/Presence.swift delete mode 100644 Sources/Realtime/Push.swift delete mode 100644 Sources/Realtime/RealtimeChannel.swift delete mode 100644 Sources/Realtime/RealtimeClient.swift delete mode 100644 Sources/Realtime/RealtimeMessage.swift delete mode 100644 Sources/Realtime/TimeoutTimer.swift delete mode 100644 Sources/Realtime/V2/CallbackManager.swift delete mode 100644 Sources/Realtime/V2/PostgresAction.swift delete mode 100644 Sources/Realtime/V2/PostgresActionData.swift delete mode 100644 Sources/Realtime/V2/PresenceAction.swift delete mode 100644 Sources/Realtime/V2/PushV2.swift delete mode 100644 Sources/Realtime/V2/RealtimeChannelV2.swift delete mode 100644 Sources/Realtime/V2/RealtimeClientV2.swift delete mode 100644 Sources/Realtime/V2/RealtimeJoinConfig.swift delete mode 100644 Sources/Realtime/V2/RealtimeMessageV2.swift delete mode 100644 Sources/Realtime/V2/Types.swift diff --git a/Sources/Realtime/Defaults.swift b/Sources/Realtime/Defaults.swift deleted file mode 100644 index e74f08bc..00000000 --- a/Sources/Realtime/Defaults.swift +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/// A collection of default values and behaviors used across the Client -public enum Defaults { - /// Default timeout when sending messages - public static let timeoutInterval: TimeInterval = 10.0 - - /// Default interval to send heartbeats on - public static let heartbeatInterval: TimeInterval = 30.0 - - /// Default maximum amount of time which the system may delay heartbeat events in order to - /// minimize power usage - public static let heartbeatLeeway: DispatchTimeInterval = .milliseconds(10) - - /// Default reconnect algorithm for the socket - public static let reconnectSteppedBackOff: (Int) -> TimeInterval = { tries in - tries > 9 ? 5.0 : [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.5, 1.0, 2.0][tries - 1] - } - - /** Default rejoin algorithm for individual channels */ - public static let rejoinSteppedBackOff: (Int) -> TimeInterval = { tries in - tries > 3 ? 10 : [1, 2, 5][tries - 1] - } - - public static let vsn = "2.0.0" - - /// Default encode function, utilizing JSONSerialization.data - public static let encode: (Any) -> Data = { json in - try! JSONSerialization - .data( - withJSONObject: json, - options: JSONSerialization.WritingOptions() - ) - } - - /// Default decode function, utilizing JSONSerialization.jsonObject - public static let decode: (Data) -> Any? = { data in - guard - let json = - try? JSONSerialization - .jsonObject( - with: data, - options: JSONSerialization.ReadingOptions() - ) - else { return nil } - return json - } - - public static let heartbeatQueue: DispatchQueue = .init( - label: "com.phoenix.socket.heartbeat" - ) -} - -/// Represents the multiple states that a Channel can be in -/// throughout it's lifecycle. -public enum ChannelState: String { - case closed - case errored - case joined - case joining - case leaving -} - -/// Represents the different events that can be sent through -/// a channel regarding a Channel's lifecycle. -public enum ChannelEvent { - public static let join = "phx_join" - public static let leave = "phx_leave" - public static let close = "phx_close" - public static let error = "phx_error" - public static let reply = "phx_reply" - public static let system = "system" - public static let broadcast = "broadcast" - public static let accessToken = "access_token" - public static let presence = "presence" - public static let presenceDiff = "presence_diff" - public static let presenceState = "presence_state" - public static let postgresChanges = "postgres_changes" - - public static let heartbeat = "heartbeat" - - static func isLifecyleEvent(_ event: String) -> Bool { - switch event { - case join, leave, reply, error, close: true - default: false - } - } -} diff --git a/Sources/Realtime/Delegated.swift b/Sources/Realtime/Delegated.swift deleted file mode 100644 index 6e548914..00000000 --- a/Sources/Realtime/Delegated.swift +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -/// Provides a memory-safe way of passing callbacks around while not creating -/// retain cycles. This file was copied from https://github.com/dreymonde/Delegated -/// instead of added as a dependency to reduce the number of packages that -/// ship with SwiftPhoenixClient -public struct Delegated { - private(set) var callback: ((Input) -> Output?)? - - public init() {} - - public mutating func delegate( - to target: Target, - with callback: @escaping (Target, Input) -> Output - ) { - self.callback = { [weak target] input in - guard let target else { - return nil - } - return callback(target, input) - } - } - - public func call(_ input: Input) -> Output? { - callback?(input) - } - - public var isDelegateSet: Bool { - callback != nil - } -} - -extension Delegated { - public mutating func stronglyDelegate( - to target: Target, - with callback: @escaping (Target, Input) -> Output - ) { - self.callback = { input in - callback(target, input) - } - } - - public mutating func manuallyDelegate(with callback: @escaping (Input) -> Output) { - self.callback = callback - } - - public mutating func removeDelegate() { - callback = nil - } -} - -extension Delegated where Input == Void { - public mutating func delegate( - to target: Target, - with callback: @escaping (Target) -> Output - ) { - delegate(to: target, with: { target, _ in callback(target) }) - } - - public mutating func stronglyDelegate( - to target: Target, - with callback: @escaping (Target) -> Output - ) { - stronglyDelegate(to: target, with: { target, _ in callback(target) }) - } -} - -extension Delegated where Input == Void { - public func call() -> Output? { - call(()) - } -} - -extension Delegated where Output == Void { - public func call(_ input: Input) { - callback?(input) - } -} - -extension Delegated where Input == Void, Output == Void { - public func call() { - call(()) - } -} diff --git a/Sources/Realtime/Deprecated.swift b/Sources/Realtime/Deprecated.swift deleted file mode 100644 index 27d32f91..00000000 --- a/Sources/Realtime/Deprecated.swift +++ /dev/null @@ -1,81 +0,0 @@ -// -// Deprecated.swift -// -// -// Created by Guilherme Souza on 23/12/23. -// - -import Foundation -import Helpers - -@available(*, deprecated, renamed: "RealtimeMessage") -public typealias Message = RealtimeMessage - -extension RealtimeClientV2 { - @available(*, deprecated, renamed: "channels") - public var subscriptions: [String: RealtimeChannelV2] { - channels - } - - @available(*, deprecated, renamed: "RealtimeClientOptions") - public struct Configuration: Sendable { - var url: URL - var apiKey: String - var headers: [String: String] - var heartbeatInterval: TimeInterval - var reconnectDelay: TimeInterval - var timeoutInterval: TimeInterval - var disconnectOnSessionLoss: Bool - var connectOnSubscribe: Bool - var logger: (any SupabaseLogger)? - - public init( - url: URL, - apiKey: String, - headers: [String: String] = [:], - heartbeatInterval: TimeInterval = 15, - reconnectDelay: TimeInterval = 7, - timeoutInterval: TimeInterval = 10, - disconnectOnSessionLoss: Bool = true, - connectOnSubscribe: Bool = true, - logger: (any SupabaseLogger)? = nil - ) { - self.url = url - self.apiKey = apiKey - self.headers = headers - self.heartbeatInterval = heartbeatInterval - self.reconnectDelay = reconnectDelay - self.timeoutInterval = timeoutInterval - self.disconnectOnSessionLoss = disconnectOnSessionLoss - self.connectOnSubscribe = connectOnSubscribe - self.logger = logger - } - } - - @available(*, deprecated, renamed: "RealtimeClientStatus") - public typealias Status = RealtimeClientStatus - - @available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)") - public convenience init(config: Configuration) { - self.init( - url: config.url, - options: RealtimeClientOptions( - headers: config.headers, - heartbeatInterval: config.heartbeatInterval, - reconnectDelay: config.reconnectDelay, - timeoutInterval: config.timeoutInterval, - disconnectOnSessionLoss: config.disconnectOnSessionLoss, - connectOnSubscribe: config.connectOnSubscribe, - logger: config.logger - ) - ) - } -} - -extension RealtimeChannelV2 { - @available(*, deprecated, renamed: "RealtimeSubscription") - public typealias Subscription = ObservationToken - - @available(*, deprecated, renamed: "RealtimeChannelStatus") - public typealias Status = RealtimeChannelStatus -} diff --git a/Sources/Realtime/HeartbeatTimer.swift b/Sources/Realtime/HeartbeatTimer.swift deleted file mode 100644 index 7bd4ccbf..00000000 --- a/Sources/Realtime/HeartbeatTimer.swift +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/** - Heartbeat Timer class which manages the lifecycle of the underlying - timer which triggers when a heartbeat should be fired. This heartbeat - runs on it's own Queue so that it does not interfere with the main - queue but guarantees thread safety. - */ - -class HeartbeatTimer { - // ---------------------------------------------------------------------- - - // MARK: - Dependencies - - // ---------------------------------------------------------------------- - // The interval to wait before firing the Timer - let timeInterval: TimeInterval - - /// The maximum amount of time which the system may delay the delivery of the timer events - let leeway: DispatchTimeInterval - - // The DispatchQueue to schedule the timers on - let queue: DispatchQueue - - // UUID which specifies the Timer instance. Verifies that timers are different - let uuid: String = UUID().uuidString - - // ---------------------------------------------------------------------- - - // MARK: - Properties - - // ---------------------------------------------------------------------- - // The underlying, cancelable, resettable, timer. - private var temporaryTimer: (any DispatchSourceTimer)? - // The event handler that is called by the timer when it fires. - private var temporaryEventHandler: (() -> Void)? - - /** - Create a new HeartbeatTimer - - - Parameters: - - timeInterval: Interval to fire the timer. Repeats - - queue: Queue to schedule the timer on - - leeway: The maximum amount of time which the system may delay the delivery of the timer events - */ - init( - timeInterval: TimeInterval, queue: DispatchQueue = Defaults.heartbeatQueue, - leeway: DispatchTimeInterval = Defaults.heartbeatLeeway - ) { - self.timeInterval = timeInterval - self.queue = queue - self.leeway = leeway - } - - /** - Create a new HeartbeatTimer - - - Parameter timeInterval: Interval to fire the timer. Repeats - */ - convenience init(timeInterval: TimeInterval) { - self.init(timeInterval: timeInterval, queue: Defaults.heartbeatQueue) - } - - func start(eventHandler: @escaping () -> Void) { - queue.sync { - // Create a new DispatchSourceTimer, passing the event handler - let timer = DispatchSource.makeTimerSource(flags: [], queue: queue) - timer.setEventHandler(handler: eventHandler) - - // Schedule the timer to first fire in `timeInterval` and then - // repeat every `timeInterval` - timer.schedule( - deadline: DispatchTime.now() + self.timeInterval, - repeating: self.timeInterval, - leeway: self.leeway - ) - - // Start the timer - timer.resume() - self.temporaryEventHandler = eventHandler - self.temporaryTimer = timer - } - } - - func stop() { - // Must be queued synchronously to prevent threading issues. - queue.sync { - // DispatchSourceTimer will automatically cancel when released - temporaryTimer = nil - temporaryEventHandler = nil - } - } - - /** - True if the Timer exists and has not been cancelled. False otherwise - */ - var isValid: Bool { - guard let timer = temporaryTimer else { return false } - return !timer.isCancelled - } - - /** - Calls the Timer's event handler immediately. This method - is primarily used in tests (not ideal) - */ - func fire() { - guard isValid else { return } - temporaryEventHandler?() - } -} - -extension HeartbeatTimer: Equatable { - static func == (lhs: HeartbeatTimer, rhs: HeartbeatTimer) -> Bool { - lhs.uuid == rhs.uuid - } -} diff --git a/Sources/Realtime/PhoenixTransport.swift b/Sources/Realtime/PhoenixTransport.swift deleted file mode 100644 index 79c85400..00000000 --- a/Sources/Realtime/PhoenixTransport.swift +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -// ---------------------------------------------------------------------- - -// MARK: - Transport Protocol - -// ---------------------------------------------------------------------- -/** - Defines a `Socket`'s Transport layer. - */ -// sourcery: AutoMockable -public protocol PhoenixTransport { - /// The current `ReadyState` of the `Transport` layer - var readyState: PhoenixTransportReadyState { get } - - /// Delegate for the `Transport` layer - var delegate: (any PhoenixTransportDelegate)? { get set } - - /** - Connect to the server - - - Parameters: - - headers: Headers to include in the URLRequests when opening the Websocket connection. Can be empty [:] - */ - func connect(with headers: [String: String]) - - /** - Disconnect from the server. - - - Parameters: - - code: Status code as defined by Section 7.4 of RFC 6455. - - reason: Reason why the connection is closing. Optional. - */ - func disconnect(code: Int, reason: String?) - - /** - Sends a message to the server. - - - Parameter data: Data to send. - */ - func send(data: Data) -} - -// ---------------------------------------------------------------------- - -// MARK: - Transport Delegate Protocol - -// ---------------------------------------------------------------------- -/// Delegate to receive notifications of events that occur in the `Transport` layer -public protocol PhoenixTransportDelegate { - /** - Notified when the `Transport` opens. - - - Parameter response: Response from the server indicating that the WebSocket handshake was successful and the connection has been upgraded to webSockets - */ - func onOpen(response: URLResponse?) - - /** - Notified when the `Transport` receives an error. - - - Parameter error: Client-side error from the underlying `Transport` implementation - - Parameter response: Response from the server, if any, that occurred with the Error - - */ - func onError(error: any Error, response: URLResponse?) - - /** - Notified when the `Transport` receives a message from the server. - - - Parameter message: Message received from the server - */ - func onMessage(message: String) - - /** - Notified when the `Transport` closes. - - - Parameter code: Code that was sent when the `Transport` closed - - Parameter reason: A concise human-readable prose explanation for the closure - */ - func onClose(code: Int, reason: String?) -} - -// ---------------------------------------------------------------------- - -// MARK: - Transport Ready State Enum - -// ---------------------------------------------------------------------- -/// Available `ReadyState`s of a `Transport` layer. -public enum PhoenixTransportReadyState { - /// The `Transport` is opening a connection to the server. - case connecting - - /// The `Transport` is connected to the server. - case open - - /// The `Transport` is closing the connection to the server. - case closing - - /// The `Transport` has disconnected from the server. - case closed -} - -// ---------------------------------------------------------------------- - -// MARK: - Default Websocket Transport Implementation - -// ---------------------------------------------------------------------- -/// A `Transport` implementation that relies on URLSession's native WebSocket -/// implementation. -/// -/// This implementation ships default with SwiftPhoenixClient however -/// SwiftPhoenixClient supports earlier OS versions using one of the submodule -/// `Transport` implementations. Or you can create your own implementation using -/// your own WebSocket library or implementation. -@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, *) -open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketDelegate { - /// The URL to connect to - let url: URL - - /// The URLSession configuration - let configuration: URLSessionConfiguration - - /// The underling URLSession. Assigned during `connect()` - private var session: URLSession? = nil - - /// The ongoing task. Assigned during `connect()` - private var task: URLSessionWebSocketTask? = nil - - /** - Initializes a `Transport` layer built using URLSession's WebSocket - - Example: - - ```swift - let url = URL("wss://example.com/socket") - let transport: Transport = URLSessionTransport(url: url) - ``` - - Using a custom `URLSessionConfiguration` - - ```swift - let url = URL("wss://example.com/socket") - let configuration = URLSessionConfiguration.default - let transport: Transport = URLSessionTransport(url: url, configuration: configuration) - ``` - - - parameter url: URL to connect to - - parameter configuration: Provide your own URLSessionConfiguration. Uses `.default` if none provided - */ - public init(url: URL, configuration: URLSessionConfiguration = .default) { - // URLSession requires that the endpoint be "wss" instead of "https". - let endpoint = url.absoluteString - let wsEndpoint = - endpoint - .replacingOccurrences(of: "http://", with: "ws://") - .replacingOccurrences(of: "https://", with: "wss://") - - // Force unwrapping should be safe here since a valid URL came in and we just - // replaced the protocol. - self.url = URL(string: wsEndpoint)! - self.configuration = configuration - - super.init() - } - - // MARK: - Transport - - public var readyState: PhoenixTransportReadyState = .closed - public var delegate: (any PhoenixTransportDelegate)? = nil - - public func connect(with headers: [String: String]) { - // Set the transport state as connecting - readyState = .connecting - - // Create the session and websocket task - session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil) - var request = URLRequest(url: url) - - for (key, value) in headers { - guard let value = value as? String else { continue } - request.addValue(value, forHTTPHeaderField: key) - } - - task = session?.webSocketTask(with: request) - - // Start the task - task?.resume() - } - - open func disconnect(code: Int, reason: String?) { - /* - TODO: - 1. Provide a "strict" mode that fails if an invalid close code is given - 2. If strict mode is disabled, default to CloseCode.invalid - 3. Provide default .normalClosure function - */ - guard let closeCode = URLSessionWebSocketTask.CloseCode(rawValue: code) else { - fatalError("Could not create a CloseCode with invalid code: [\(code)].") - } - - readyState = .closing - task?.cancel(with: closeCode, reason: reason?.data(using: .utf8)) - session?.finishTasksAndInvalidate() - } - - open func send(data: Data) { - Task { - try? await task?.send(.string(String(data: data, encoding: .utf8)!)) - } - } - - // MARK: - URLSessionWebSocketDelegate - - open func urlSession( - _: URLSession, - webSocketTask: URLSessionWebSocketTask, - didOpenWithProtocol _: String? - ) { - // The Websocket is connected. Set Transport state to open and inform delegate - readyState = .open - delegate?.onOpen(response: webSocketTask.response) - - // Start receiving messages - receive() - } - - open func urlSession( - _: URLSession, - webSocketTask _: URLSessionWebSocketTask, - didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, - reason: Data? - ) { - // A close frame was received from the server. - readyState = .closed - delegate?.onClose( - code: closeCode.rawValue, reason: reason.flatMap { String(data: $0, encoding: .utf8) } - ) - } - - open func urlSession( - _: URLSession, - task: URLSessionTask, - didCompleteWithError error: (any Error)? - ) { - // The task has terminated. Inform the delegate that the transport has closed abnormally - // if this was caused by an error. - guard let err = error else { return } - - abnormalErrorReceived(err, response: task.response) - } - - // MARK: - Private - - private func receive() { - Task { - do { - let result = try await task?.receive() - switch result { - case .data: - print("Data received. This method is unsupported by the Client") - case let .string(text): - self.delegate?.onMessage(message: text) - default: - fatalError("Unknown result was received. [\(String(describing: result))]") - } - - // Since `.receive()` is only good for a single message, it must - // be called again after a message is received in order to - // received the next message. - self.receive() - } catch { - print("Error when receiving \(error)") - self.abnormalErrorReceived(error, response: nil) - } - } - } - - private func abnormalErrorReceived(_ error: any Error, response: URLResponse?) { - // Set the state of the Transport to closed - readyState = .closed - - // Inform the Transport's delegate that an error occurred. - delegate?.onError(error: error, response: response) - - // An abnormal error is results in an abnormal closure, such as internet getting dropped - // so inform the delegate that the Transport has closed abnormally. This will kick off - // the reconnect logic. - delegate?.onClose( - code: RealtimeClient.CloseCode.abnormal.rawValue, reason: error.localizedDescription - ) - } -} diff --git a/Sources/Realtime/Presence.swift b/Sources/Realtime/Presence.swift deleted file mode 100644 index 2370697f..00000000 --- a/Sources/Realtime/Presence.swift +++ /dev/null @@ -1,417 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/// The Presence object provides features for syncing presence information from -/// the server with the client and handling presences joining and leaving. -/// -/// ## Syncing state from the server -/// -/// To sync presence state from the server, first instantiate an object and pass -/// your channel in to track lifecycle events: -/// -/// let channel = socket.channel("some:topic") -/// let presence = Presence(channel) -/// -/// If you have custom syncing state events, you can configure the `Presence` -/// object to use those instead. -/// -/// let options = Options(events: [.state: "my_state", .diff: "my_diff"]) -/// let presence = Presence(channel, opts: options) -/// -/// Next, use the presence.onSync callback to react to state changes from the -/// server. For example, to render the list of users every time the list -/// changes, you could write: -/// -/// presence.onSync { renderUsers(presence.list()) } -/// -/// ## Listing Presences -/// -/// presence.list is used to return a list of presence information based on the -/// local state of metadata. By default, all presence metadata is returned, but -/// a listBy function can be supplied to allow the client to select which -/// metadata to use for a given presence. For example, you may have a user -/// online from different devices with a metadata status of "online", but they -/// have set themselves to "away" on another device. In this case, the app may -/// choose to use the "away" status for what appears on the UI. The example -/// below defines a listBy function which prioritizes the first metadata which -/// was registered for each user. This could be the first tab they opened, or -/// the first device they came online from: -/// -/// let listBy: (String, Presence.Map) -> Presence.Meta = { id, pres in -/// let first = pres["metas"]!.first! -/// first["count"] = pres["metas"]!.count -/// first["id"] = id -/// return first -/// } -/// let onlineUsers = presence.list(by: listBy) -/// -/// (NOTE: The underlying behavior is a `map` on the `presence.state`. You are -/// mapping the `state` dictionary into whatever datastructure suites your needs) -/// -/// ## Handling individual presence join and leave events -/// -/// The presence.onJoin and presence.onLeave callbacks can be used to react to -/// individual presences joining and leaving the app. For example: -/// -/// let presence = Presence(channel) -/// presence.onJoin { [weak self] (key, current, newPres) in -/// if let cur = current { -/// print("user additional presence", cur) -/// } else { -/// print("user entered for the first time", newPres) -/// } -/// } -/// -/// presence.onLeave { [weak self] (key, current, leftPres) in -/// if current["metas"]?.isEmpty == true { -/// print("user has left from all devices", leftPres) -/// } else { -/// print("user left from a device", current) -/// } -/// } -/// -/// presence.onSync { renderUsers(presence.list()) } -@available( - *, - deprecated, - renamed: "PresenceV2", - message: "Presence class is deprecated in favor of PresenceV2. See migration guide: https://github.com/supabase-community/supabase-swift/blob/main/docs/migrations/RealtimeV2%20Migration%20Guide.md" -) -public final class Presence { - // ---------------------------------------------------------------------- - - // MARK: - Enums and Structs - - // ---------------------------------------------------------------------- - /// Custom options that can be provided when creating Presence - /// - /// ### Example: - /// - /// let options = Options(events: [.state: "my_state", .diff: "my_diff"]) - /// let presence = Presence(channel, opts: options) - public struct Options { - let events: [Events: String] - - /// Default set of Options used when creating Presence. Uses the - /// phoenix events "presence_state" and "presence_diff" - public static let defaults = Options(events: [ - .state: "presence_state", - .diff: "presence_diff", - ]) - - public init(events: [Events: String]) { - self.events = events - } - } - - /// Presense Events - public enum Events: String { - case state - case diff - } - - // ---------------------------------------------------------------------- - - // MARK: - Typaliases - - // ---------------------------------------------------------------------- - /// Meta details of a Presence. Just a dictionary of properties - public typealias Meta = [String: Any] - - /// A mapping of a String to an array of Metas. e.g. {"metas": [{id: 1}]} - public typealias Map = [String: [Meta]] - - /// A mapping of a Presence state to a mapping of Metas - public typealias State = [String: Map] - - // Diff has keys "joins" and "leaves", pointing to a Presence.State each - // containing the users that joined and left. - public typealias Diff = [String: State] - - /// Closure signature of OnJoin callbacks - public typealias OnJoin = (_ key: String, _ current: Map?, _ new: Map) -> Void - - /// Closure signature for OnLeave callbacks - public typealias OnLeave = (_ key: String, _ current: Map, _ left: Map) -> Void - - //// Closure signature for OnSync callbacks - public typealias OnSync = () -> Void - - /// Collection of callbacks with default values - struct Caller { - var onJoin: OnJoin = { _, _, _ in } - var onLeave: OnLeave = { _, _, _ in } - var onSync: OnSync = {} - } - - // ---------------------------------------------------------------------- - - // MARK: - Properties - - // ---------------------------------------------------------------------- - /// The channel the Presence belongs to - weak var channel: RealtimeChannel? - - /// Caller to callback hooks - var caller: Caller - - /// The state of the Presence - public private(set) var state: State - - /// Pending `join` and `leave` diffs that need to be synced - public private(set) var pendingDiffs: [Diff] - - /// The channel's joinRef, set when state events occur - public private(set) var joinRef: String? - - public var isPendingSyncState: Bool { - guard let safeJoinRef = joinRef else { return true } - return safeJoinRef != channel?.joinRef - } - - /// Callback to be informed of joins - public var onJoin: OnJoin { - get { caller.onJoin } - set { caller.onJoin = newValue } - } - - /// Set the OnJoin callback - public func onJoin(_ callback: @escaping OnJoin) { - onJoin = callback - } - - /// Callback to be informed of leaves - public var onLeave: OnLeave { - get { caller.onLeave } - set { caller.onLeave = newValue } - } - - /// Set the OnLeave callback - public func onLeave(_ callback: @escaping OnLeave) { - onLeave = callback - } - - /// Callback to be informed of synces - public var onSync: OnSync { - get { caller.onSync } - set { caller.onSync = newValue } - } - - /// Set the OnSync callback - public func onSync(_ callback: @escaping OnSync) { - onSync = callback - } - - public init(channel: RealtimeChannel, opts: Options = Options.defaults) { - state = [:] - pendingDiffs = [] - self.channel = channel - joinRef = nil - caller = Caller() - - guard // Do not subscribe to events if they were not provided - let stateEvent = opts.events[.state], - let diffEvent = opts.events[.diff] - else { return } - - self.channel?.delegateOn(stateEvent, filter: ChannelFilter(), to: self) { (self, message) in - guard let newState = message.rawPayload as? State else { return } - - self.joinRef = self.channel?.joinRef - self.state = Presence.syncState( - self.state, - newState: newState, - onJoin: self.caller.onJoin, - onLeave: self.caller.onLeave - ) - - for diff in self.pendingDiffs { - self.state = Presence.syncDiff( - self.state, - diff: diff, - onJoin: self.caller.onJoin, - onLeave: self.caller.onLeave - ) - } - - self.pendingDiffs = [] - self.caller.onSync() - } - - self.channel?.delegateOn(diffEvent, filter: ChannelFilter(), to: self) { (self, message) in - guard let diff = message.rawPayload as? Diff else { return } - if self.isPendingSyncState { - self.pendingDiffs.append(diff) - } else { - self.state = Presence.syncDiff( - self.state, - diff: diff, - onJoin: self.caller.onJoin, - onLeave: self.caller.onLeave - ) - self.caller.onSync() - } - } - } - - /// Returns the array of presences, with deault selected metadata. - public func list() -> [Map] { - list(by: { _, pres in pres }) - } - - /// Returns the array of presences, with selected metadata - public func list(by transformer: (String, Map) -> T) -> [T] { - Presence.listBy(state, transformer: transformer) - } - - /// Filter the Presence state with a given function - public func filter(by filter: ((String, Map) -> Bool)?) -> State { - Presence.filter(state, by: filter) - } - - // ---------------------------------------------------------------------- - - // MARK: - Static - - // ---------------------------------------------------------------------- - - // Used to sync the list of presences on the server - // with the client's state. An optional `onJoin` and `onLeave` callback can - // be provided to react to changes in the client's local presences across - // disconnects and reconnects with the server. - // - // - returns: Presence.State - @discardableResult - public static func syncState( - _ currentState: State, - newState: State, - onJoin: OnJoin = { _, _, _ in }, - onLeave: OnLeave = { _, _, _ in } - ) -> State { - let state = currentState - var leaves: Presence.State = [:] - var joins: Presence.State = [:] - - for (key, presence) in state { - if newState[key] == nil { - leaves[key] = presence - } - } - - for (key, newPresence) in newState { - if let currentPresence = state[key] { - let newRefs = newPresence["metas"]!.map { $0["phx_ref"] as! String } - let curRefs = currentPresence["metas"]!.map { $0["phx_ref"] as! String } - - let joinedMetas = newPresence["metas"]!.filter { (meta: Meta) -> Bool in - !curRefs.contains { $0 == meta["phx_ref"] as! String } - } - let leftMetas = currentPresence["metas"]!.filter { (meta: Meta) -> Bool in - !newRefs.contains { $0 == meta["phx_ref"] as! String } - } - - if joinedMetas.count > 0 { - joins[key] = newPresence - joins[key]!["metas"] = joinedMetas - } - - if leftMetas.count > 0 { - leaves[key] = currentPresence - leaves[key]!["metas"] = leftMetas - } - } else { - joins[key] = newPresence - } - } - - return Presence.syncDiff( - state, - diff: ["joins": joins, "leaves": leaves], - onJoin: onJoin, - onLeave: onLeave - ) - } - - // Used to sync a diff of presence join and leave - // events from the server, as they happen. Like `syncState`, `syncDiff` - // accepts optional `onJoin` and `onLeave` callbacks to react to a user - // joining or leaving from a device. - // - // - returns: Presence.State - @discardableResult - public static func syncDiff( - _ currentState: State, - diff: Diff, - onJoin: OnJoin = { _, _, _ in }, - onLeave: OnLeave = { _, _, _ in } - ) -> State { - var state = currentState - diff["joins"]?.forEach { key, newPresence in - let currentPresence = state[key] - state[key] = newPresence - - if let curPresence = currentPresence { - let joinedRefs = state[key]!["metas"]!.map { $0["phx_ref"] as! String } - let curMetas = curPresence["metas"]!.filter { (meta: Meta) -> Bool in - !joinedRefs.contains { $0 == meta["phx_ref"] as! String } - } - state[key]!["metas"]!.insert(contentsOf: curMetas, at: 0) - } - - onJoin(key, currentPresence, newPresence) - } - - diff["leaves"]?.forEach { key, leftPresence in - guard var curPresence = state[key] else { return } - let refsToRemove = leftPresence["metas"]!.map { $0["phx_ref"] as! String } - let keepMetas = curPresence["metas"]!.filter { (meta: Meta) -> Bool in - !refsToRemove.contains { $0 == meta["phx_ref"] as! String } - } - - curPresence["metas"] = keepMetas - onLeave(key, curPresence, leftPresence) - - if keepMetas.count > 0 { - state[key]!["metas"] = keepMetas - } else { - state.removeValue(forKey: key) - } - } - - return state - } - - public static func filter( - _ presences: State, - by filter: ((String, Map) -> Bool)? - ) -> State { - let safeFilter = filter ?? { _, _ in true } - return presences.filter(safeFilter) - } - - public static func listBy( - _ presences: State, - transformer: (String, Map) -> T - ) -> [T] { - presences.map(transformer) - } -} diff --git a/Sources/Realtime/Push.swift b/Sources/Realtime/Push.swift deleted file mode 100644 index 7f681b6d..00000000 --- a/Sources/Realtime/Push.swift +++ /dev/null @@ -1,265 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation - -/// Represnts pushing data to a `Channel` through the `Socket` -public class Push { - /// The channel sending the Push - public weak var channel: RealtimeChannel? - - /// The event, for example `phx_join` - public let event: String - - /// The payload, for example ["user_id": "abc123"] - public var payload: Payload - - /// The push timeout. Default is 10.0 seconds - public var timeout: TimeInterval - - /// The server's response to the Push - var receivedMessage: RealtimeMessage? - - /// Timer which triggers a timeout event - var timeoutTimer: TimerQueue - - /// WorkItem to be performed when the timeout timer fires - var timeoutWorkItem: DispatchWorkItem? - - /// Hooks into a Push. Where .receive("ok", callback(Payload)) are stored - var receiveHooks: [PushStatus: [Delegated]] - - /// True if the Push has been sent - var sent: Bool - - /// The reference ID of the Push - var ref: String? - - /// The event that is associated with the reference ID of the Push - var refEvent: String? - - /// Initializes a Push - /// - /// - parameter channel: The Channel - /// - parameter event: The event, for example ChannelEvent.join - /// - parameter payload: Optional. The Payload to send, e.g. ["user_id": "abc123"] - /// - parameter timeout: Optional. The push timeout. Default is 10.0s - init( - channel: RealtimeChannel, - event: String, - payload: Payload = [:], - timeout: TimeInterval = Defaults.timeoutInterval - ) { - self.channel = channel - self.event = event - self.payload = payload - self.timeout = timeout - receivedMessage = nil - timeoutTimer = TimerQueue.main - receiveHooks = [:] - sent = false - ref = nil - } - - /// Resets and sends the Push - /// - parameter timeout: Optional. The push timeout. Default is 10.0s - public func resend(_ timeout: TimeInterval = Defaults.timeoutInterval) { - self.timeout = timeout - reset() - send() - } - - /// Sends the Push. If it has already timed out, then the call will - /// be ignored and return early. Use `resend` in this case. - public func send() { - guard !hasReceived(status: .timeout) else { return } - - startTimeout() - sent = true - channel?.socket?.push( - topic: channel?.topic ?? "", - event: event, - payload: payload, - ref: ref, - joinRef: channel?.joinRef - ) - } - - /// Receive a specific event when sending an Outbound message. Subscribing - /// to status events with this method does not guarantees no retain cycles. - /// You should pass `weak self` in the capture list of the callback. You - /// can call `.delegateReceive(status:, to:, callback:) and the library will - /// handle it for you. - /// - /// Example: - /// - /// channel - /// .send(event:"custom", payload: ["body": "example"]) - /// .receive("error") { [weak self] payload in - /// print("Error: ", payload) - /// } - /// - /// - parameter status: Status to receive - /// - parameter callback: Callback to fire when the status is recevied - @discardableResult - public func receive( - _ status: PushStatus, - callback: @escaping ((RealtimeMessage) -> Void) - ) -> Push { - var delegated = Delegated() - delegated.manuallyDelegate(with: callback) - - return receive(status, delegated: delegated) - } - - /// Receive a specific event when sending an Outbound message. Automatically - /// prevents retain cycles. See `manualReceive(status:, callback:)` if you - /// want to handle this yourself. - /// - /// Example: - /// - /// channel - /// .send(event:"custom", payload: ["body": "example"]) - /// .delegateReceive("error", to: self) { payload in - /// print("Error: ", payload) - /// } - /// - /// - parameter status: Status to receive - /// - parameter owner: The class that is calling .receive. Usually `self` - /// - parameter callback: Callback to fire when the status is recevied - @discardableResult - public func delegateReceive( - _ status: PushStatus, - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> Push { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return receive(status, delegated: delegated) - } - - /// Shared behavior between `receive` calls - @discardableResult - func receive(_ status: PushStatus, delegated: Delegated) -> Push { - // If the message has already been received, pass it to the callback immediately - if hasReceived(status: status), let receivedMessage { - delegated.call(receivedMessage) - } - - if receiveHooks[status] == nil { - /// Create a new array of hooks if no previous hook is associated with status - receiveHooks[status] = [delegated] - } else { - /// A previous hook for this status already exists. Just append the new hook - receiveHooks[status]?.append(delegated) - } - - return self - } - - /// Resets the Push as it was after it was first tnitialized. - func reset() { - cancelRefEvent() - ref = nil - refEvent = nil - receivedMessage = nil - sent = false - } - - /// Finds the receiveHook which needs to be informed of a status response - /// - /// - parameter status: Status which was received, e.g. "ok", "error", "timeout" - /// - parameter response: Response that was received - private func matchReceive(_ status: PushStatus, message: RealtimeMessage) { - receiveHooks[status]?.forEach { $0.call(message) } - } - - /// Reverses the result on channel.on(ChannelEvent, callback) that spawned the Push - private func cancelRefEvent() { - guard let refEvent else { return } - channel?.off(refEvent) - } - - /// Cancel any ongoing Timeout Timer - func cancelTimeout() { - timeoutWorkItem?.cancel() - timeoutWorkItem = nil - } - - /// Starts the Timer which will trigger a timeout after a specific _timeout_ - /// time, in milliseconds, is reached. - func startTimeout() { - // Cancel any existing timeout before starting a new one - if let safeWorkItem = timeoutWorkItem, !safeWorkItem.isCancelled { - cancelTimeout() - } - - guard - let channel, - let socket = channel.socket - else { return } - - let ref = socket.makeRef() - let refEvent = channel.replyEventName(ref) - - self.ref = ref - self.refEvent = refEvent - - /// If a response is received before the Timer triggers, cancel timer - /// and match the received event to it's corresponding hook - channel.delegateOn(refEvent, filter: ChannelFilter(), to: self) { (self, message) in - self.cancelRefEvent() - self.cancelTimeout() - self.receivedMessage = message - - /// Check if there is event a status available - guard let status = message.status else { return } - self.matchReceive(status, message: message) - } - - /// Setup and start the Timeout timer. - let workItem = DispatchWorkItem { - self.trigger(.timeout, payload: [:]) - } - - timeoutWorkItem = workItem - timeoutTimer.queue(timeInterval: timeout, execute: workItem) - } - - /// Checks if a status has already been received by the Push. - /// - /// - parameter status: Status to check - /// - return: True if given status has been received by the Push. - func hasReceived(status: PushStatus) -> Bool { - receivedMessage?.status == status - } - - /// Triggers an event to be sent though the Channel - func trigger(_ status: PushStatus, payload: Payload) { - /// If there is no ref event, then there is nothing to trigger on the channel - guard let refEvent else { return } - - var mutPayload = payload - mutPayload["status"] = status.rawValue - - channel?.trigger(event: refEvent, payload: mutPayload) - } -} diff --git a/Sources/Realtime/RealtimeChannel.swift b/Sources/Realtime/RealtimeChannel.swift deleted file mode 100644 index 380df82f..00000000 --- a/Sources/Realtime/RealtimeChannel.swift +++ /dev/null @@ -1,1038 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import ConcurrencyExtras -import Foundation -import Helpers -import Swift -import HTTPTypes - -/// Container class of bindings to the channel -struct Binding { - let type: String - let filter: [String: String] - - // The callback to be triggered - let callback: Delegated - - let id: String? -} - -public struct ChannelFilter { - public var event: String? - public var schema: String? - public let table: String? - public let filter: String? - - public init( - event: String? = nil, schema: String? = nil, table: String? = nil, filter: String? = nil - ) { - self.event = event - self.schema = schema - self.table = table - self.filter = filter - } - - var asDictionary: [String: String] { - [ - "event": event, - "schema": schema, - "table": table, - "filter": filter, - ].compactMapValues { $0 } - } -} - -public enum ChannelResponse { - case ok, timedOut, error -} - -public enum RealtimeListenTypes: String { - case postgresChanges = "postgres_changes" - case broadcast - case presence -} - -/// Represents the broadcast and presence options for a channel. -public struct RealtimeChannelOptions { - /// Used to track presence payload across clients. Must be unique per client. If `nil`, the server - /// will generate one. - var presenceKey: String? - /// Enables the client to receive their own`broadcast` messages - var broadcastSelf: Bool - /// Instructs the server to acknowledge the client's `broadcast` messages - var broadcastAcknowledge: Bool - - public init( - presenceKey: String? = nil, - broadcastSelf: Bool = false, - broadcastAcknowledge: Bool = false - ) { - self.presenceKey = presenceKey - self.broadcastSelf = broadcastSelf - self.broadcastAcknowledge = broadcastAcknowledge - } - - /// Parameters used to configure the channel - var params: [String: [String: Any]] { - [ - "config": [ - "presence": [ - "key": presenceKey ?? "", - ], - "broadcast": [ - "ack": broadcastAcknowledge, - "self": broadcastSelf, - ], - ], - ] - } -} - -public enum RealtimeSubscribeStates { - case subscribed - case timedOut - case closed - case channelError -} - -/// -/// Represents a RealtimeChannel which is bound to a topic -/// -/// A RealtimeChannel can bind to multiple events on a given topic and -/// be informed when those events occur within a topic. -/// -/// ### Example: -/// -/// let channel = socket.channel("room:123", params: ["token": "Room Token"]) -/// channel.on("new_msg") { payload in print("Got message", payload") } -/// channel.push("new_msg, payload: ["body": "This is a message"]) -/// .receive("ok") { payload in print("Sent message", payload) } -/// .receive("error") { payload in print("Send failed", payload) } -/// .receive("timeout") { payload in print("Networking issue...", payload) } -/// -/// channel.join() -/// .receive("ok") { payload in print("RealtimeChannel Joined", payload) } -/// .receive("error") { payload in print("Failed ot join", payload) } -/// .receive("timeout") { payload in print("Networking issue...", payload) } -/// -@available( - *, - deprecated, - message: "Use new RealtimeChannelV2 class instead. See migration guide: https://github.com/supabase-community/supabase-swift/blob/main/docs/migrations/RealtimeV2%20Migration%20Guide.md" -) -public class RealtimeChannel { - /// The topic of the RealtimeChannel. e.g. "rooms:friends" - public let topic: String - - /// The params sent when joining the channel - public var params: Payload { - didSet { joinPush.payload = params } - } - - public private(set) lazy var presence = Presence(channel: self) - - /// The Socket that the channel belongs to - weak var socket: RealtimeClient? - - var subTopic: String - - /// Current state of the RealtimeChannel - var state: ChannelState - - /// Collection of event bindings - let bindings: LockIsolated<[String: [Binding]]> - - /// Timeout when attempting to join a RealtimeChannel - var timeout: TimeInterval - - /// Set to true once the channel calls .join() - var joinedOnce: Bool - - /// Push to send when the channel calls .join() - var joinPush: Push! - - /// Buffer of Pushes that will be sent once the RealtimeChannel's socket connects - var pushBuffer: [Push] - - /// Timer to attempt to rejoin - var rejoinTimer: TimeoutTimer - - /// Refs of stateChange hooks - var stateChangeRefs: [String] - - /// Initialize a RealtimeChannel - /// - /// - parameter topic: Topic of the RealtimeChannel - /// - parameter params: Optional. Parameters to send when joining. - /// - parameter socket: Socket that the channel is a part of - init(topic: String, params: [String: Any] = [:], socket: RealtimeClient) { - state = ChannelState.closed - self.topic = topic - subTopic = topic.replacingOccurrences(of: "realtime:", with: "") - self.params = params - self.socket = socket - bindings = LockIsolated([:]) - timeout = socket.timeout - joinedOnce = false - pushBuffer = [] - stateChangeRefs = [] - rejoinTimer = TimeoutTimer() - - // Setup Timer delgation - rejoinTimer.callback - .delegate(to: self) { (self) in - if self.socket?.isConnected == true { self.rejoin() } - } - - rejoinTimer.timerCalculation - .delegate(to: self) { (self, tries) -> TimeInterval in - self.socket?.rejoinAfter(tries) ?? 5.0 - } - - // Respond to socket events - let onErrorRef = self.socket?.delegateOnError( - to: self, - callback: { (self, _) in - self.rejoinTimer.reset() - } - ) - if let ref = onErrorRef { stateChangeRefs.append(ref) } - - let onOpenRef = self.socket?.delegateOnOpen( - to: self, - callback: { (self) in - self.rejoinTimer.reset() - if self.isErrored { self.rejoin() } - } - ) - if let ref = onOpenRef { stateChangeRefs.append(ref) } - - // Setup Push Event to be sent when joining - joinPush = Push( - channel: self, - event: ChannelEvent.join, - payload: self.params, - timeout: timeout - ) - - /// Handle when a response is received after join() - joinPush.delegateReceive(.ok, to: self) { (self, _) in - // Mark the RealtimeChannel as joined - self.state = ChannelState.joined - - // Reset the timer, preventing it from attempting to join again - self.rejoinTimer.reset() - - // Send and buffered messages and clear the buffer - self.pushBuffer.forEach { $0.send() } - self.pushBuffer = [] - } - - // Perform if RealtimeChannel errors while attempting to joi - joinPush.delegateReceive(.error, to: self) { (self, _) in - self.state = .errored - if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } - } - - // Handle when the join push times out when sending after join() - joinPush.delegateReceive(.timeout, to: self) { (self, _) in - // log that the channel timed out - self.socket?.logItems( - "channel", "timeout \(self.topic) \(self.joinRef ?? "") after \(self.timeout)s" - ) - - // Send a Push to the server to leave the channel - let leavePush = Push( - channel: self, - event: ChannelEvent.leave, - timeout: self.timeout - ) - leavePush.send() - - // Mark the RealtimeChannel as in an error and attempt to rejoin if socket is connected - self.state = ChannelState.errored - self.joinPush.reset() - - if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } - } - - /// Perfom when the RealtimeChannel has been closed - delegateOnClose(to: self) { (self, _) in - // Reset any timer that may be on-going - self.rejoinTimer.reset() - - // Log that the channel was left - self.socket?.logItems( - "channel", "close topic: \(self.topic) joinRef: \(self.joinRef ?? "nil")" - ) - - // Mark the channel as closed and remove it from the socket - self.state = ChannelState.closed - self.socket?.remove(self) - } - - /// Perfom when the RealtimeChannel errors - delegateOnError(to: self) { (self, message) in - // Log that the channel received an error - self.socket?.logItems( - "channel", "error topic: \(self.topic) joinRef: \(self.joinRef ?? "nil") mesage: \(message)" - ) - - // If error was received while joining, then reset the Push - if self.isJoining { - // Make sure that the "phx_join" isn't buffered to send once the socket - // reconnects. The channel will send a new join event when the socket connects. - if let safeJoinRef = self.joinRef { - self.socket?.removeFromSendBuffer(ref: safeJoinRef) - } - - // Reset the push to be used again later - self.joinPush.reset() - } - - // Mark the channel as errored and attempt to rejoin if socket is currently connected - self.state = ChannelState.errored - if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } - } - - // Perform when the join reply is received - delegateOn(ChannelEvent.reply, filter: ChannelFilter(), to: self) { (self, message) in - // Trigger bindings - self.trigger( - event: self.replyEventName(message.ref), - payload: message.rawPayload, - ref: message.ref, - joinRef: message.joinRef - ) - } - } - - deinit { - rejoinTimer.reset() - } - - /// Overridable message hook. Receives all events for specialized message - /// handling before dispatching to the channel callbacks. - /// - /// - parameter msg: The Message received by the client from the server - /// - return: Must return the message, modified or unmodified - public var onMessage: (_ message: RealtimeMessage) -> RealtimeMessage = { message in - message - } - - /// Joins the channel - /// - /// - parameter timeout: Optional. Defaults to RealtimeChannel's timeout - /// - return: Push event - @discardableResult - public func subscribe( - timeout: TimeInterval? = nil, - callback: ((RealtimeSubscribeStates, (any Error)?) -> Void)? = nil - ) -> RealtimeChannel { - if socket?.isConnected == false { - socket?.connect() - } - - guard !joinedOnce else { - fatalError( - "tried to join multiple times. 'join' " - + "can only be called a single time per channel instance" - ) - } - - onError { message in - let values = message.payload.values.map { "\($0) " } - let error = RealtimeError(values.isEmpty ? "error" : values.joined(separator: ", ")) - callback?(.channelError, error) - } - - onClose { _ in - callback?(.closed, nil) - } - - // Join the RealtimeChannel - if let safeTimeout = timeout { - self.timeout = safeTimeout - } - - let broadcast = params["config", as: [String: Any].self]?["broadcast"] - let presence = params["config", as: [String: Any].self]?["presence"] - - var accessTokenPayload: Payload = [:] - var config: Payload = [ - "postgres_changes": bindings.value["postgres_changes"]?.map(\.filter) ?? [], - ] - - config["broadcast"] = broadcast - config["presence"] = presence - - if let accessToken = socket?.accessToken { - accessTokenPayload["access_token"] = accessToken - } - - params["config"] = config - - joinedOnce = true - rejoin() - - joinPush - .delegateReceive(.ok, to: self) { (self, message) in - if self.socket?.accessToken != nil { - self.socket?.setAuth(self.socket?.accessToken) - } - - guard let serverPostgresFilters = message.payload["postgres_changes"] as? [[String: Any]] - else { - callback?(.subscribed, nil) - return - } - - let clientPostgresBindings = self.bindings.value["postgres_changes"] ?? [] - let bindingsCount = clientPostgresBindings.count - var newPostgresBindings: [Binding] = [] - - for i in 0 ..< bindingsCount { - let clientPostgresBinding = clientPostgresBindings[i] - - let event = clientPostgresBinding.filter["event"] - let schema = clientPostgresBinding.filter["schema"] - let table = clientPostgresBinding.filter["table"] - let filter = clientPostgresBinding.filter["filter"] - - let serverPostgresFilter = serverPostgresFilters[i] - - if serverPostgresFilter["event", as: String.self] == event, - serverPostgresFilter["schema", as: String.self] == schema, - serverPostgresFilter["table", as: String.self] == table, - serverPostgresFilter["filter", as: String.self] == filter - { - newPostgresBindings.append( - Binding( - type: clientPostgresBinding.type, - filter: clientPostgresBinding.filter, - callback: clientPostgresBinding.callback, - id: serverPostgresFilter["id", as: Int.self].flatMap(String.init) - ) - ) - } else { - self.unsubscribe() - callback?( - .channelError, - RealtimeError("Mismatch between client and server bindings for postgres changes.") - ) - return - } - } - - self.bindings.withValue { [newPostgresBindings] in - $0["postgres_changes"] = newPostgresBindings - } - callback?(.subscribed, nil) - } - .delegateReceive(.error, to: self) { _, message in - let values = message.payload.values.map { "\($0) " } - let error = RealtimeError(values.isEmpty ? "error" : values.joined(separator: ", ")) - callback?(.channelError, error) - } - .delegateReceive(.timeout, to: self) { _, _ in - callback?(.timedOut, nil) - } - - return self - } - - public func presenceState() -> Presence.State { - presence.state - } - - public func track(_ payload: Payload, opts: Payload = [:]) async -> ChannelResponse { - await send( - type: .presence, - payload: [ - "event": "track", - "payload": payload, - ], - opts: opts - ) - } - - public func untrack(opts: Payload = [:]) async -> ChannelResponse { - await send( - type: .presence, - payload: ["event": "untrack"], - opts: opts - ) - } - - /// Hook into when the RealtimeChannel is closed. Does not handle retain cycles. - /// Use `delegateOnClose(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.onClose() { [weak self] message in - /// self?.print("RealtimeChannel \(message.topic) has closed" - /// } - /// - /// - parameter handler: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func onClose(_ handler: @escaping ((RealtimeMessage) -> Void)) -> RealtimeChannel { - on(ChannelEvent.close, filter: ChannelFilter(), handler: handler) - } - - /// Hook into when the RealtimeChannel is closed. Automatically handles retain - /// cycles. Use `onClose()` to handle yourself. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.delegateOnClose(to: self) { (self, message) in - /// self.print("RealtimeChannel \(message.topic) has closed" - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func delegateOnClose( - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> RealtimeChannel { - delegateOn( - ChannelEvent.close, filter: ChannelFilter(), to: owner, callback: callback - ) - } - - /// Hook into when the RealtimeChannel receives an Error. Does not handle retain - /// cycles. Use `delegateOnError(to:)` for automatic handling of retain - /// cycles. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.onError() { [weak self] (message) in - /// self?.print("RealtimeChannel \(message.topic) has errored" - /// } - /// - /// - parameter handler: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func onError(_ handler: @escaping ((_ message: RealtimeMessage) -> Void)) - -> RealtimeChannel - { - on(ChannelEvent.error, filter: ChannelFilter(), handler: handler) - } - - /// Hook into when the RealtimeChannel receives an Error. Automatically handles - /// retain cycles. Use `onError()` to handle yourself. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// channel.delegateOnError(to: self) { (self, message) in - /// self.print("RealtimeChannel \(message.topic) has closed" - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the RealtimeChannel closes - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func delegateOnError( - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> RealtimeChannel { - delegateOn( - ChannelEvent.error, filter: ChannelFilter(), to: owner, callback: callback - ) - } - - /// Subscribes on channel events. Does not handle retain cycles. Use - /// `delegateOn(_:, to:)` for automatic handling of retain cycles. - /// - /// Subscription returns a ref counter, which can be used later to - /// unsubscribe the exact event listener - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// let ref1 = channel.on("event") { [weak self] (message) in - /// self?.print("do stuff") - /// } - /// let ref2 = channel.on("event") { [weak self] (message) in - /// self?.print("do other stuff") - /// } - /// channel.off("event", ref1) - /// - /// Since unsubscription of ref1, "do stuff" won't print, but "do other - /// stuff" will keep on printing on the "event" - /// - /// - parameter event: Event to receive - /// - parameter handler: Called with the event's message - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func on( - _ event: String, - filter: ChannelFilter, - handler: @escaping ((RealtimeMessage) -> Void) - ) -> RealtimeChannel { - var delegated = Delegated() - delegated.manuallyDelegate(with: handler) - - return on(event, filter: filter, delegated: delegated) - } - - /// Subscribes on channel events. Automatically handles retain cycles. Use - /// `on()` to handle yourself. - /// - /// Subscription returns a ref counter, which can be used later to - /// unsubscribe the exact event listener - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// let ref1 = channel.delegateOn("event", to: self) { (self, message) in - /// self?.print("do stuff") - /// } - /// let ref2 = channel.delegateOn("event", to: self) { (self, message) in - /// self?.print("do other stuff") - /// } - /// channel.off("event", ref1) - /// - /// Since unsubscription of ref1, "do stuff" won't print, but "do other - /// stuff" will keep on printing on the "event" - /// - /// - parameter event: Event to receive - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called with the event's message - /// - return: Ref counter of the subscription. See `func off()` - @discardableResult - public func delegateOn( - _ event: String, - filter: ChannelFilter, - to owner: Target, - callback: @escaping ((Target, RealtimeMessage) -> Void) - ) -> RealtimeChannel { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return on(event, filter: filter, delegated: delegated) - } - - /// Shared method between `on` and `manualOn` - @discardableResult - private func on( - _ type: String, filter: ChannelFilter, delegated: Delegated - ) -> RealtimeChannel { - bindings.withValue { - $0[type.lowercased(), default: []].append( - Binding(type: type.lowercased(), filter: filter.asDictionary, callback: delegated, id: nil) - ) - } - - return self - } - - /// Unsubscribes from a channel event. If a `ref` is given, only the exact - /// listener will be removed. Else all listeners for the `event` will be - /// removed. - /// - /// Example: - /// - /// let channel = socket.channel("topic") - /// let ref1 = channel.on("event") { _ in print("ref1 event" } - /// let ref2 = channel.on("event") { _ in print("ref2 event" } - /// let ref3 = channel.on("other_event") { _ in print("ref3 other" } - /// let ref4 = channel.on("other_event") { _ in print("ref4 other" } - /// channel.off("event", ref1) - /// channel.off("other_event") - /// - /// After this, only "ref2 event" will be printed if the channel receives - /// "event" and nothing is printed if the channel receives "other_event". - /// - /// - parameter event: Event to unsubscribe from - /// - parameter ref: Ref counter returned when subscribing. Can be omitted - public func off(_ type: String, filter: [String: String] = [:]) { - bindings.withValue { - $0[type.lowercased()] = $0[type.lowercased(), default: []].filter { bind in - !(bind.type.lowercased() == type.lowercased() && bind.filter == filter) - } - } - } - - /// Push a payload to the RealtimeChannel - /// - /// Example: - /// - /// channel - /// .push("event", payload: ["message": "hello") - /// .receive("ok") { _ in { print("message sent") } - /// - /// - parameter event: Event to push - /// - parameter payload: Payload to push - /// - parameter timeout: Optional timeout - @discardableResult - public func push( - _ event: String, - payload: Payload, - timeout: TimeInterval = Defaults.timeoutInterval - ) -> Push { - guard joinedOnce else { - fatalError( - "Tried to push \(event) to \(topic) before joining. Use channel.join() before pushing events" - ) - } - - let pushEvent = Push( - channel: self, - event: event, - payload: payload, - timeout: timeout - ) - if canPush { - pushEvent.send() - } else { - pushEvent.startTimeout() - pushBuffer.append(pushEvent) - } - - return pushEvent - } - - public func send( - type: RealtimeListenTypes, - event: String? = nil, - payload: Payload, - opts: Payload = [:] - ) async -> ChannelResponse { - var payload = payload - payload["type"] = type.rawValue - if let event { - payload["event"] = event - } - - if !canPush, type == .broadcast { - var headers = socket?.headers ?? [:] - headers["Content-Type"] = "application/json" - headers["apikey"] = socket?.accessToken - - let body = [ - "messages": [ - "topic": subTopic, - "payload": payload, - "event": event as Any, - ], - ] - - do { - let request = try HTTPRequest( - url: broadcastEndpointURL, - method: .post, - headers: HTTPFields(headers.compactMapValues { $0 }), - body: JSONSerialization.data(withJSONObject: body) - ) - - let response = try await socket?.http.send(request) - guard let response, 200 ..< 300 ~= response.statusCode else { - return .error - } - return .ok - } catch { - return .error - } - } else { - return await withCheckedContinuation { continuation in - let push = self.push( - type.rawValue, payload: payload, - timeout: (opts["timeout"] as? TimeInterval) ?? self.timeout - ) - - if let type = payload["type"] as? String, type == "broadcast", - let config = self.params["config"] as? [String: Any], - let broadcast = config["broadcast"] as? [String: Any] - { - let ack = broadcast["ack"] as? Bool - if ack == nil || ack == false { - continuation.resume(returning: .ok) - return - } - } - - push - .receive(.ok) { _ in - continuation.resume(returning: .ok) - } - .receive(.timeout) { _ in - continuation.resume(returning: .timedOut) - } - } - } - } - - /// Leaves the channel - /// - /// Unsubscribes from server events, and instructs channel to terminate on - /// server - /// - /// Triggers onClose() hooks - /// - /// To receive leave acknowledgements, use the a `receive` - /// hook to bind to the server ack, ie: - /// - /// Example: - //// - /// channel.leave().receive("ok") { _ in { print("left") } - /// - /// - parameter timeout: Optional timeout - /// - return: Push that can add receive hooks - @discardableResult - public func unsubscribe(timeout: TimeInterval = Defaults.timeoutInterval) -> Push { - // If attempting a rejoin during a leave, then reset, cancelling the rejoin - rejoinTimer.reset() - - // Now set the state to leaving - state = .leaving - - /// Delegated callback for a successful or a failed channel leave - var onCloseDelegate = Delegated() - onCloseDelegate.delegate(to: self) { (self, _) in - self.socket?.logItems("channel", "leave \(self.topic)") - - // Triggers onClose() hooks - self.trigger(event: ChannelEvent.close, payload: ["reason": "leave"]) - } - - // Push event to send to the server - let leavePush = Push( - channel: self, - event: ChannelEvent.leave, - timeout: timeout - ) - - // Perform the same behavior if successfully left the channel - // or if sending the event timed out - leavePush - .receive(.ok, delegated: onCloseDelegate) - .receive(.timeout, delegated: onCloseDelegate) - leavePush.send() - - // If the RealtimeChannel cannot send push events, trigger a success locally - if !canPush { - leavePush.trigger(.ok, payload: [:]) - } - - // Return the push so it can be bound to - return leavePush - } - - /// Overridable message hook. Receives all events for specialized message - /// handling before dispatching to the channel callbacks. - /// - /// - parameter event: The event the message was for - /// - parameter payload: The payload for the message - /// - parameter ref: The reference of the message - /// - return: Must return the payload, modified or unmodified - public func onMessage(callback: @escaping (RealtimeMessage) -> RealtimeMessage) { - onMessage = callback - } - - // ---------------------------------------------------------------------- - - // MARK: - Internal - - // ---------------------------------------------------------------------- - /// Checks if an event received by the Socket belongs to this RealtimeChannel - func isMember(_ message: RealtimeMessage) -> Bool { - // Return false if the message's topic does not match the RealtimeChannel's topic - guard message.topic == topic else { return false } - - guard - let safeJoinRef = message.joinRef, - safeJoinRef != joinRef, - ChannelEvent.isLifecyleEvent(message.event) - else { return true } - - socket?.logItems( - "channel", "dropping outdated message", message.topic, message.event, message.rawPayload, - safeJoinRef - ) - return false - } - - /// Sends the payload to join the RealtimeChannel - func sendJoin(_ timeout: TimeInterval) { - state = ChannelState.joining - joinPush.resend(timeout) - } - - /// Rejoins the channel - func rejoin(_ timeout: TimeInterval? = nil) { - // Do not attempt to rejoin if the channel is in the process of leaving - guard !isLeaving else { return } - - // Leave potentially duplicate channels - socket?.leaveOpenTopic(topic: topic) - - // Send the joinPush - sendJoin(timeout ?? self.timeout) - } - - /// Triggers an event to the correct event bindings created by - /// `channel.on("event")`. - /// - /// - parameter message: Message to pass to the event bindings - func trigger(_ message: RealtimeMessage) { - let typeLower = message.event.lowercased() - - let events = Set([ - ChannelEvent.close, - ChannelEvent.error, - ChannelEvent.leave, - ChannelEvent.join, - ]) - - if message.ref != message.joinRef, events.contains(typeLower) { - return - } - - let handledMessage = message - - let bindings: [Binding] = if ["insert", "update", "delete"].contains(typeLower) { - self.bindings.value["postgres_changes", default: []].filter { bind in - bind.filter["event"] == "*" || bind.filter["event"] == typeLower - } - } else { - self.bindings.value[typeLower, default: []].filter { bind in - if ["broadcast", "presence", "postgres_changes"].contains(typeLower) { - let bindEvent = bind.filter["event"]?.lowercased() - - if let bindId = bind.id.flatMap(Int.init) { - let ids = message.payload["ids", as: [Int].self] ?? [] - return ids.contains(bindId) - && ( - bindEvent == "*" - || bindEvent - == message.payload["data", as: [String: Any].self]?["type", as: String.self]? - .lowercased() - ) - } - - return bindEvent == "*" - || bindEvent == message.payload["event", as: String.self]?.lowercased() - } - - return bind.type.lowercased() == typeLower - } - } - - bindings.forEach { $0.callback.call(handledMessage) } - } - - /// Triggers an event to the correct event bindings created by - //// `channel.on("event")`. - /// - /// - parameter event: Event to trigger - /// - parameter payload: Payload of the event - /// - parameter ref: Ref of the event. Defaults to empty - /// - parameter joinRef: Ref of the join event. Defaults to nil - func trigger( - event: String, - payload: Payload = [:], - ref: String = "", - joinRef: String? = nil - ) { - let message = RealtimeMessage( - ref: ref, - topic: topic, - event: event, - payload: payload, - joinRef: joinRef ?? self.joinRef - ) - trigger(message) - } - - /// - parameter ref: The ref of the event push - /// - return: The event name of the reply - func replyEventName(_ ref: String) -> String { - "chan_reply_\(ref)" - } - - /// The Ref send during the join message. - var joinRef: String? { - joinPush.ref - } - - /// - return: True if the RealtimeChannel can push messages, meaning the socket - /// is connected and the channel is joined - var canPush: Bool { - socket?.isConnected == true && isJoined - } - - var broadcastEndpointURL: URL { - var url = socket?.endPoint ?? "" - url = url.replacingOccurrences(of: "^ws", with: "http", options: .regularExpression, range: nil) - url = url.replacingOccurrences( - of: "(/socket/websocket|/socket|/websocket)/?$", with: "", options: .regularExpression, - range: nil - ) - url = - "\(url.replacingOccurrences(of: "/+$", with: "", options: .regularExpression, range: nil))/api/broadcast" - return URL(string: url)! - } -} - -// ---------------------------------------------------------------------- - -// MARK: - Public API - -// ---------------------------------------------------------------------- -extension RealtimeChannel { - /// - return: True if the RealtimeChannel has been closed - public var isClosed: Bool { - state == .closed - } - - /// - return: True if the RealtimeChannel experienced an error - public var isErrored: Bool { - state == .errored - } - - /// - return: True if the channel has joined - public var isJoined: Bool { - state == .joined - } - - /// - return: True if the channel has requested to join - public var isJoining: Bool { - state == .joining - } - - /// - return: True if the channel has requested to leave - public var isLeaving: Bool { - state == .leaving - } -} - -extension [String: Any] { - subscript(_ key: Key, as _: T.Type) -> T? { - self[key] as? T - } -} diff --git a/Sources/Realtime/RealtimeClient.swift b/Sources/Realtime/RealtimeClient.swift deleted file mode 100644 index 6366dc77..00000000 --- a/Sources/Realtime/RealtimeClient.swift +++ /dev/null @@ -1,1072 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import ConcurrencyExtras -import Foundation -import Helpers - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -public enum SocketError: Error { - case abnormalClosureError -} - -/// Alias for a JSON dictionary [String: Any] -public typealias Payload = [String: Any] - -/// Alias for a function returning an optional JSON dictionary (`Payload?`) -public typealias PayloadClosure = () -> Payload? - -/// Struct that gathers callbacks assigned to the Socket -struct StateChangeCallbacks { - var open: LockIsolated<[(ref: String, callback: Delegated)]> = .init([]) - var close: LockIsolated<[(ref: String, callback: Delegated<(Int, String?), Void>)]> = .init([]) - var error: LockIsolated<[(ref: String, callback: Delegated<(any Error, URLResponse?), Void>)]> = - .init([]) - var message: LockIsolated<[(ref: String, callback: Delegated)]> = .init([]) -} - -/// ## Socket Connection -/// A single connection is established to the server and -/// channels are multiplexed over the connection. -/// Connect to the server using the `RealtimeClient` class: -/// -/// ```swift -/// let socket = new RealtimeClient("/socket", paramsClosure: { ["userToken": "123" ] }) -/// socket.connect() -/// ``` -/// -/// The `RealtimeClient` constructor takes the mount point of the socket, -/// the authentication params, as well as options that can be found in -/// the Socket docs, such as configuring the heartbeat. -@available( - *, - deprecated, - message: "Use new RealtimeClientV2 class instead. See migration guide: https://github.com/supabase-community/supabase-swift/blob/main/docs/migrations/RealtimeV2%20Migration%20Guide.md" -) -public class RealtimeClient: PhoenixTransportDelegate { - // ---------------------------------------------------------------------- - - // MARK: - Public Attributes - - // ---------------------------------------------------------------------- - /// The string WebSocket endpoint (ie `"ws://example.com/socket"`, - /// `"wss://example.com"`, etc.) That was passed to the Socket during - /// initialization. The URL endpoint will be modified by the Socket to - /// include `"/websocket"` if missing. - public let endPoint: String - - /// The fully qualified socket URL - public private(set) var endPointUrl: URL - - /// Resolves to return the `paramsClosure` result at the time of calling. - /// If the `Socket` was created with static params, then those will be - /// returned every time. - public var params: Payload? { - paramsClosure?() - } - - /// The optional params closure used to get params when connecting. Must - /// be set when initializing the Socket. - public let paramsClosure: PayloadClosure? - - /// The WebSocket transport. Default behavior is to provide a - /// URLSessionWebsocketTask. See README for alternatives. - private let transport: (URL) -> any PhoenixTransport - - /// Phoenix serializer version, defaults to "2.0.0" - public let vsn: String - - /// Override to provide custom encoding of data before writing to the socket - public var encode: (Any) -> Data = Defaults.encode - - /// Override to provide custom decoding of data read from the socket - public var decode: (Data) -> Any? = Defaults.decode - - /// Timeout to use when opening connections - public var timeout: TimeInterval = Defaults.timeoutInterval - - /// Custom headers to be added to the socket connection request - public var headers: [String: String] = [:] - - /// Interval between sending a heartbeat - public var heartbeatInterval: TimeInterval = Defaults.heartbeatInterval - - /// The maximum amount of time which the system may delay heartbeats in order to optimize power - /// usage - public var heartbeatLeeway: DispatchTimeInterval = Defaults.heartbeatLeeway - - /// Interval between socket reconnect attempts, in seconds - public var reconnectAfter: (Int) -> TimeInterval = Defaults.reconnectSteppedBackOff - - /// Interval between channel rejoin attempts, in seconds - public var rejoinAfter: (Int) -> TimeInterval = Defaults.rejoinSteppedBackOff - - /// The optional function to receive logs - public var logger: ((String) -> Void)? - - /// Disables heartbeats from being sent. Default is false. - public var skipHeartbeat: Bool = false - - /// Enable/Disable SSL certificate validation. Default is false. This - /// must be set before calling `socket.connect()` in order to be applied - public var disableSSLCertValidation: Bool = false - - #if os(Linux) || os(Windows) - #else - /// Configure custom SSL validation logic, eg. SSL pinning. This - /// must be set before calling `socket.connect()` in order to apply. - // public var security: SSLTrustValidator? - - /// Configure the encryption used by your client by setting the - /// allowed cipher suites supported by your server. This must be - /// set before calling `socket.connect()` in order to apply. - public var enabledSSLCipherSuites: [SSLCipherSuite]? - #endif - - // ---------------------------------------------------------------------- - - // MARK: - Private Attributes - - // ---------------------------------------------------------------------- - /// Callbacks for socket state changes - var stateChangeCallbacks: StateChangeCallbacks = .init() - - /// Collection on channels created for the Socket - public internal(set) var channels: [RealtimeChannel] = [] - - /// Buffers messages that need to be sent once the socket has connected. It is an array - /// of tuples, with the ref of the message to send and the callback that will send the message. - var sendBuffer: [(ref: String?, callback: () throws -> Void)] = [] - - /// Ref counter for messages - var ref: UInt64 = .min // 0 (max: 18,446,744,073,709,551,615) - - /// Timer that triggers sending new Heartbeat messages - var heartbeatTimer: HeartbeatTimer? - - /// Ref counter for the last heartbeat that was sent - var pendingHeartbeatRef: String? - - /// Timer to use when attempting to reconnect - var reconnectTimer: TimeoutTimer - - /// Close status - var closeStatus: CloseStatus = .unknown - - /// The connection to the server - var connection: (any PhoenixTransport)? = nil - - /// The HTTPClient to perform HTTP requests. - let http: any HTTPClientType - - var accessToken: String? - - // ---------------------------------------------------------------------- - - // MARK: - Initialization - - // ---------------------------------------------------------------------- - @available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, *) - public convenience init( - _ endPoint: String, - headers: [String: String] = [:], - params: Payload? = nil, - vsn: String = Defaults.vsn - ) { - self.init( - endPoint: endPoint, - headers: headers, - transport: { url in URLSessionTransport(url: url) }, - paramsClosure: { params }, - vsn: vsn - ) - } - - @available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, *) - public convenience init( - _ endPoint: String, - headers: [String: String] = [:], - paramsClosure: PayloadClosure?, - vsn: String = Defaults.vsn - ) { - self.init( - endPoint: endPoint, - headers: headers, - transport: { url in URLSessionTransport(url: url) }, - paramsClosure: paramsClosure, - vsn: vsn - ) - } - - public init( - endPoint: String, - headers: [String: String] = [:], - transport: @escaping ((URL) -> any PhoenixTransport), - paramsClosure: PayloadClosure? = nil, - vsn: String = Defaults.vsn - ) { - self.transport = transport - self.paramsClosure = paramsClosure - self.endPoint = endPoint - self.vsn = vsn - - var headers = headers - if headers["X-Client-Info"] == nil { - headers["X-Client-Info"] = "realtime-swift/\(version)" - } - self.headers = headers - http = HTTPClient(fetch: { try await URLSession.shared.data(for: $0) }, interceptors: []) - - let params = paramsClosure?() - if let jwt = (params?["Authorization"] as? String)?.split(separator: " ").last { - accessToken = String(jwt) - } else { - accessToken = params?["apikey"] as? String - } - endPointUrl = RealtimeClient.buildEndpointUrl( - endpoint: endPoint, - paramsClosure: paramsClosure, - vsn: vsn - ) - - reconnectTimer = TimeoutTimer() - reconnectTimer.callback.delegate(to: self) { (self) in - self.logItems("Socket attempting to reconnect") - self.teardown(reason: "reconnection") { self.connect() } - } - reconnectTimer.timerCalculation - .delegate(to: self) { (self, tries) -> TimeInterval in - let interval = self.reconnectAfter(tries) - self.logItems("Socket reconnecting in \(interval)s") - return interval - } - } - - deinit { - reconnectTimer.reset() - } - - // ---------------------------------------------------------------------- - - // MARK: - Public - - // ---------------------------------------------------------------------- - /// - return: The socket protocol, wss or ws - public var websocketProtocol: String { - switch endPointUrl.scheme { - case "https": "wss" - case "http": "ws" - default: endPointUrl.scheme ?? "" - } - } - - /// - return: True if the socket is connected - public var isConnected: Bool { - connectionState == .open - } - - /// - return: The state of the connect. [.connecting, .open, .closing, .closed] - public var connectionState: PhoenixTransportReadyState { - connection?.readyState ?? .closed - } - - /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. - /// - Parameter token: A JWT string. - public func setAuth(_ token: String?) { - accessToken = token - - for channel in channels { - if token != nil { - channel.params["user_token"] = token - } - - if channel.joinedOnce, channel.isJoined { - channel.push(ChannelEvent.accessToken, payload: ["access_token": token as Any]) - } - } - } - - /// Connects the Socket. The params passed to the Socket on initialization - /// will be sent through the connection. If the Socket is already connected, - /// then this call will be ignored. - public func connect() { - // Do not attempt to reconnect if the socket is currently connected - guard !isConnected else { return } - - // Reset the close status when attempting to connect - closeStatus = .unknown - - // We need to build this right before attempting to connect as the - // parameters could be built upon demand and change over time - endPointUrl = RealtimeClient.buildEndpointUrl( - endpoint: endPoint, - paramsClosure: paramsClosure, - vsn: vsn - ) - - connection = transport(endPointUrl) - connection?.delegate = self - // self.connection?.disableSSLCertValidation = disableSSLCertValidation - // - // #if os(Linux) - // #else - // self.connection?.security = security - // self.connection?.enabledSSLCipherSuites = enabledSSLCipherSuites - // #endif - - connection?.connect(with: headers) - } - - /// Disconnects the socket - /// - /// - parameter code: Optional. Closing status code - /// - parameter callback: Optional. Called when disconnected - public func disconnect( - code: CloseCode = CloseCode.normal, - reason: String? = nil, - callback: (() -> Void)? = nil - ) { - // The socket was closed cleanly by the User - closeStatus = CloseStatus(closeCode: code.rawValue) - - // Reset any reconnects and teardown the socket connection - reconnectTimer.reset() - teardown(code: code, reason: reason, callback: callback) - } - - func teardown( - code: CloseCode = CloseCode.normal, reason: String? = nil, callback: (() -> Void)? = nil - ) { - connection?.delegate = nil - connection?.disconnect(code: code.rawValue, reason: reason) - connection = nil - - // The socket connection has been turndown, heartbeats are not needed - heartbeatTimer?.stop() - - // Since the connection's delegate was nil'd out, inform all state - // callbacks that the connection has closed - stateChangeCallbacks.close.value.forEach { $0.callback.call((code.rawValue, reason)) } - callback?() - } - - // ---------------------------------------------------------------------- - - // MARK: - Register Socket State Callbacks - - // ---------------------------------------------------------------------- - - /// Registers callbacks for connection open events. Does not handle retain - /// cycles. Use `delegateOnOpen(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onOpen() { [weak self] in - /// self?.print("Socket Connection Open") - /// } - /// - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func onOpen(callback: @escaping () -> Void) -> String { - onOpen { _ in callback() } - } - - /// Registers callbacks for connection open events. Does not handle retain - /// cycles. Use `delegateOnOpen(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onOpen() { [weak self] response in - /// self?.print("Socket Connection Open") - /// } - /// - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func onOpen(callback: @escaping (URLResponse?) -> Void) -> String { - var delegated = Delegated() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.open.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection open events. Automatically handles - /// retain cycles. Use `onOpen()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnOpen(to: self) { self in - /// self.print("Socket Connection Open") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func delegateOnOpen( - to owner: T, - callback: @escaping ((T) -> Void) - ) -> String { - delegateOnOpen(to: owner) { owner, _ in callback(owner) } - } - - /// Registers callbacks for connection open events. Automatically handles - /// retain cycles. Use `onOpen()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnOpen(to: self) { self, response in - /// self.print("Socket Connection Open") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is opened - @discardableResult - public func delegateOnOpen( - to owner: T, - callback: @escaping ((T, URLResponse?) -> Void) - ) -> String { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.open.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection close events. Does not handle retain - /// cycles. Use `delegateOnClose(_:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onClose() { [weak self] in - /// self?.print("Socket Connection Close") - /// } - /// - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func onClose(callback: @escaping () -> Void) -> String { - onClose { _, _ in callback() } - } - - /// Registers callbacks for connection close events. Does not handle retain - /// cycles. Use `delegateOnClose(_:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onClose() { [weak self] code, reason in - /// self?.print("Socket Connection Close") - /// } - /// - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func onClose(callback: @escaping (Int, String?) -> Void) -> String { - var delegated = Delegated<(Int, String?), Void>() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.close.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection close events. Automatically handles - /// retain cycles. Use `onClose()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnClose(self) { self in - /// self.print("Socket Connection Close") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func delegateOnClose( - to owner: T, - callback: @escaping ((T) -> Void) - ) -> String { - delegateOnClose(to: owner) { owner, _ in callback(owner) } - } - - /// Registers callbacks for connection close events. Automatically handles - /// retain cycles. Use `onClose()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnClose(self) { self, code, reason in - /// self.print("Socket Connection Close") - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket is closed - @discardableResult - public func delegateOnClose( - to owner: T, - callback: @escaping ((T, (Int, String?)) -> Void) - ) -> String { - var delegated = Delegated<(Int, String?), Void>() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.close.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection error events. Does not handle retain - /// cycles. Use `delegateOnError(to:)` for automatic handling of retain cycles. - /// - /// Example: - /// - /// socket.onError() { [weak self] (error) in - /// self?.print("Socket Connection Error", error) - /// } - /// - /// - parameter callback: Called when the Socket errors - @discardableResult - public func onError(callback: @escaping ((any Error, URLResponse?)) -> Void) -> String { - var delegated = Delegated<(any Error, URLResponse?), Void>() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.error.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection error events. Automatically handles - /// retain cycles. Use `manualOnError()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnError(to: self) { (self, error) in - /// self.print("Socket Connection Error", error) - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket errors - @discardableResult - public func delegateOnError( - to owner: T, - callback: @escaping ((T, (any Error, URLResponse?)) -> Void) - ) -> String { - var delegated = Delegated<(any Error, URLResponse?), Void>() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.error.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection message events. Does not handle - /// retain cycles. Use `delegateOnMessage(_to:)` for automatic handling of - /// retain cycles. - /// - /// Example: - /// - /// socket.onMessage() { [weak self] (message) in - /// self?.print("Socket Connection Message", message) - /// } - /// - /// - parameter callback: Called when the Socket receives a message event - @discardableResult - public func onMessage(callback: @escaping (RealtimeMessage) -> Void) -> String { - var delegated = Delegated() - delegated.manuallyDelegate(with: callback) - - return stateChangeCallbacks.message.withValue { [delegated] in - append(callback: delegated, to: &$0) - } - } - - /// Registers callbacks for connection message events. Automatically handles - /// retain cycles. Use `onMessage()` to handle yourself. - /// - /// Example: - /// - /// socket.delegateOnMessage(self) { (self, message) in - /// self.print("Socket Connection Message", message) - /// } - /// - /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Socket receives a message event - @discardableResult - public func delegateOnMessage( - to owner: T, - callback: @escaping ((T, RealtimeMessage) -> Void) - ) -> String { - var delegated = Delegated() - delegated.delegate(to: owner, with: callback) - - return stateChangeCallbacks.message.withValue { [delegated] in - self.append(callback: delegated, to: &$0) - } - } - - private func append(callback: T, to array: inout [(ref: String, callback: T)]) - -> String - { - let ref = makeRef() - array.append((ref, callback)) - return ref - } - - /// Releases all stored callback hooks (onError, onOpen, onClose, etc.) You should - /// call this method when you are finished when the Socket in order to release - /// any references held by the socket. - public func releaseCallbacks() { - stateChangeCallbacks.open.setValue([]) - stateChangeCallbacks.close.setValue([]) - stateChangeCallbacks.error.setValue([]) - stateChangeCallbacks.message.setValue([]) - } - - // ---------------------------------------------------------------------- - - // MARK: - Channel Initialization - - // ---------------------------------------------------------------------- - /// Initialize a new Channel - /// - /// Example: - /// - /// let channel = socket.channel("rooms", params: ["user_id": "abc123"]) - /// - /// - parameter topic: Topic of the channel - /// - parameter params: Optional. Parameters for the channel - /// - return: A new channel - public func channel( - _ topic: String, - params: RealtimeChannelOptions = .init() - ) -> RealtimeChannel { - let channel = RealtimeChannel( - topic: "realtime:\(topic)", params: params.params, socket: self - ) - channels.append(channel) - - return channel - } - - /// Unsubscribes and removes a single channel - public func remove(_ channel: RealtimeChannel) { - channel.unsubscribe() - off(channel.stateChangeRefs) - channels.removeAll(where: { $0.joinRef == channel.joinRef }) - - if channels.isEmpty { - disconnect() - } - } - - /// Unsubscribes and removes all channels - public func removeAllChannels() { - for channel in channels { - remove(channel) - } - } - - /// Removes `onOpen`, `onClose`, `onError,` and `onMessage` registrations. - /// - /// - /// - Parameter refs: List of refs returned by calls to `onOpen`, `onClose`, etc - public func off(_ refs: [String]) { - stateChangeCallbacks.open.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - stateChangeCallbacks.close.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - stateChangeCallbacks.error.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - stateChangeCallbacks.message.withValue { - $0 = $0.filter { - !refs.contains($0.ref) - } - } - } - - // ---------------------------------------------------------------------- - - // MARK: - Sending Data - - // ---------------------------------------------------------------------- - /// Sends data through the Socket. This method is internal. Instead, you - /// should call `push(_:, payload:, timeout:)` on the Channel you are - /// sending an event to. - /// - /// - parameter topic: - /// - parameter event: - /// - parameter payload: - /// - parameter ref: Optional. Defaults to nil - /// - parameter joinRef: Optional. Defaults to nil - func push( - topic: String, - event: String, - payload: Payload, - ref: String? = nil, - joinRef: String? = nil - ) { - let callback: (() throws -> Void) = { [weak self] in - guard let self else { return } - let body: [Any?] = [joinRef, ref, topic, event, payload] - let data = encode(body) - - logItems("push", "Sending \(String(data: data, encoding: String.Encoding.utf8) ?? "")") - connection?.send(data: data) - } - - /// If the socket is connected, then execute the callback immediately. - if isConnected { - try? callback() - } else { - /// If the socket is not connected, add the push to a buffer which will - /// be sent immediately upon connection. - sendBuffer.append((ref: ref, callback: callback)) - } - } - - /// - return: the next message ref, accounting for overflows - public func makeRef() -> String { - ref = (ref == UInt64.max) ? 0 : ref + 1 - return String(ref) - } - - /// Logs the message. Override Socket.logger for specialized logging. noops by default - /// - /// - parameter items: List of items to be logged. Behaves just like debugPrint() - func logItems(_ items: Any...) { - let msg = items.map { String(describing: $0) }.joined(separator: ", ") - logger?("SwiftPhoenixClient: \(msg)") - } - - // ---------------------------------------------------------------------- - - // MARK: - Connection Events - - // ---------------------------------------------------------------------- - /// Called when the underlying Websocket connects to it's host - func onConnectionOpen(response: URLResponse?) { - logItems("transport", "Connected to \(endPoint)") - - // Reset the close status now that the socket has been connected - closeStatus = .unknown - - // Send any messages that were waiting for a connection - flushSendBuffer() - - // Reset how the socket tried to reconnect - reconnectTimer.reset() - - // Restart the heartbeat timer - resetHeartbeat() - - // Inform all onOpen callbacks that the Socket has opened - stateChangeCallbacks.open.value.forEach { $0.callback.call(response) } - } - - func onConnectionClosed(code: Int, reason: String?) { - logItems("transport", "close") - - // Send an error to all channels - triggerChannelError() - - // Prevent the heartbeat from triggering if the - heartbeatTimer?.stop() - - // Only attempt to reconnect if the socket did not close normally, - // or if it was closed abnormally but on client side (e.g. due to heartbeat timeout) - if closeStatus.shouldReconnect { - reconnectTimer.scheduleTimeout() - } - - stateChangeCallbacks.close.value.forEach { $0.callback.call((code, reason)) } - } - - func onConnectionError(_ error: any Error, response: URLResponse?) { - logItems("transport", error, response ?? "") - - // Send an error to all channels - triggerChannelError() - - // Inform any state callbacks of the error - stateChangeCallbacks.error.value.forEach { $0.callback.call((error, response)) } - } - - func onConnectionMessage(_ rawMessage: String) { - logItems("receive ", rawMessage) - - guard - let data = rawMessage.data(using: String.Encoding.utf8), - let json = decode(data) as? [Any?], - let message = RealtimeMessage(json: json) - else { - logItems("receive: Unable to parse JSON: \(rawMessage)") - return - } - - // Clear heartbeat ref, preventing a heartbeat timeout disconnect - if message.ref == pendingHeartbeatRef { pendingHeartbeatRef = nil } - - if message.event == "phx_close" { - print("Close Event Received") - } - - // Dispatch the message to all channels that belong to the topic - channels - .filter { $0.isMember(message) } - .forEach { $0.trigger(message) } - - // Inform all onMessage callbacks of the message - stateChangeCallbacks.message.value.forEach { $0.callback.call(message) } - } - - /// Triggers an error event to all of the connected Channels - func triggerChannelError() { - for channel in channels { - // Only trigger a channel error if it is in an "opened" state - if !(channel.isErrored || channel.isLeaving || channel.isClosed) { - channel.trigger(event: ChannelEvent.error) - } - } - } - - /// Send all messages that were buffered before the socket opened - func flushSendBuffer() { - guard isConnected, sendBuffer.count > 0 else { return } - sendBuffer.forEach { try? $0.callback() } - sendBuffer = [] - } - - /// Removes an item from the sendBuffer with the matching ref - func removeFromSendBuffer(ref: String) { - sendBuffer = sendBuffer.filter { $0.ref != ref } - } - - /// Builds a fully qualified socket `URL` from `endPoint` and `params`. - static func buildEndpointUrl( - endpoint: String, paramsClosure params: PayloadClosure?, vsn: String - ) -> URL { - guard - let url = URL(string: endpoint), - var urlComponents = URLComponents(url: url, resolvingAgainstBaseURL: false) - else { fatalError("Malformed URL: \(endpoint)") } - - // Ensure that the URL ends with "/websocket - if !urlComponents.path.contains("/websocket") { - // Do not duplicate '/' in the path - if urlComponents.path.last != "/" { - urlComponents.path.append("/") - } - - // append 'websocket' to the path - urlComponents.path.append("websocket") - } - - urlComponents.queryItems = [URLQueryItem(name: "vsn", value: vsn)] - - // If there are parameters, append them to the URL - if let params = params?() { - urlComponents.queryItems?.append( - contentsOf: params.map { - URLQueryItem(name: $0.key, value: String(describing: $0.value)) - } - ) - } - - guard let qualifiedUrl = urlComponents.url - else { fatalError("Malformed URL while adding parameters") } - return qualifiedUrl - } - - // Leaves any channel that is open that has a duplicate topic - func leaveOpenTopic(topic: String) { - guard - let dupe = channels.first(where: { $0.topic == topic && ($0.isJoined || $0.isJoining) }) - else { return } - - logItems("transport", "leaving duplicate topic: [\(topic)]") - dupe.unsubscribe() - } - - // ---------------------------------------------------------------------- - - // MARK: - Heartbeat - - // ---------------------------------------------------------------------- - func resetHeartbeat() { - // Clear anything related to the heartbeat - pendingHeartbeatRef = nil - heartbeatTimer?.stop() - - // Do not start up the heartbeat timer if skipHeartbeat is true - guard !skipHeartbeat else { return } - - heartbeatTimer = HeartbeatTimer(timeInterval: heartbeatInterval, leeway: heartbeatLeeway) - heartbeatTimer?.start(eventHandler: { [weak self] in - self?.sendHeartbeat() - }) - } - - /// Sends a heartbeat payload to the phoenix servers - func sendHeartbeat() { - // Do not send if the connection is closed - guard isConnected else { return } - - // If there is a pending heartbeat ref, then the last heartbeat was - // never acknowledged by the server. Close the connection and attempt - // to reconnect. - if let _ = pendingHeartbeatRef { - pendingHeartbeatRef = nil - logItems( - "transport", - "heartbeat timeout. Attempting to re-establish connection" - ) - - // Close the socket manually, flagging the closure as abnormal. Do not use - // `teardown` or `disconnect` as they will nil out the websocket delegate. - abnormalClose("heartbeat timeout") - - return - } - - // The last heartbeat was acknowledged by the server. Send another one - pendingHeartbeatRef = makeRef() - push( - topic: "phoenix", - event: ChannelEvent.heartbeat, - payload: [:], - ref: pendingHeartbeatRef - ) - } - - func abnormalClose(_ reason: String) { - closeStatus = .abnormal - - /* - We use NORMAL here since the client is the one determining to close the - connection. However, we set to close status to abnormal so that - the client knows that it should attempt to reconnect. - - If the server subsequently acknowledges with code 1000 (normal close), - the socket will keep the `.abnormal` close status and trigger a reconnection. - */ - connection?.disconnect(code: CloseCode.normal.rawValue, reason: reason) - } - - // ---------------------------------------------------------------------- - - // MARK: - TransportDelegate - - // ---------------------------------------------------------------------- - public func onOpen(response: URLResponse?) { - onConnectionOpen(response: response) - } - - public func onError(error: any Error, response: URLResponse?) { - onConnectionError(error, response: response) - } - - public func onMessage(message: String) { - onConnectionMessage(message) - } - - public func onClose(code: Int, reason: String? = nil) { - closeStatus.update(transportCloseCode: code) - onConnectionClosed(code: code, reason: reason) - } -} - -// ---------------------------------------------------------------------- - -// MARK: - Close Codes - -// ---------------------------------------------------------------------- -extension RealtimeClient { - public enum CloseCode: Int { - case abnormal = 999 - - case normal = 1000 - - case goingAway = 1001 - } -} - -// ---------------------------------------------------------------------- - -// MARK: - Close Status - -// ---------------------------------------------------------------------- -extension RealtimeClient { - /// Indicates the different closure states a socket can be in. - enum CloseStatus { - /// Undetermined closure state - case unknown - /// A clean closure requested either by the client or the server - case clean - /// An abnormal closure requested by the client - case abnormal - - /// Temporarily close the socket, pausing reconnect attempts. Useful on mobile - /// clients when disconnecting a because the app resigned active but should - /// reconnect when app enters active state. - case temporary - - init(closeCode: Int) { - switch closeCode { - case CloseCode.abnormal.rawValue: - self = .abnormal - case CloseCode.goingAway.rawValue: - self = .temporary - default: - self = .clean - } - } - - mutating func update(transportCloseCode: Int) { - switch self { - case .unknown, .clean, .temporary: - // Allow transport layer to override these statuses. - self = .init(closeCode: transportCloseCode) - case .abnormal: - // Do not allow transport layer to override the abnormal close status. - // The socket itself should reset it on the next connection attempt. - // See `Socket.abnormalClose(_:)` for more information. - break - } - } - - var shouldReconnect: Bool { - switch self { - case .unknown, .abnormal: - true - case .clean, .temporary: - false - } - } - } -} diff --git a/Sources/Realtime/RealtimeMessage.swift b/Sources/Realtime/RealtimeMessage.swift deleted file mode 100644 index 3feb0066..00000000 --- a/Sources/Realtime/RealtimeMessage.swift +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -import Foundation -import Helpers - -/// Data that is received from the Server. -public struct RealtimeMessage { - /// Reference number. Empty if missing - public let ref: String - - /// Join Reference number - let joinRef: String? - - /// Message topic - public let topic: String - - /// Message event - public let event: String - - /// The raw payload from the Message, including a nested response from - /// phx_reply events. It is recommended to use `payload` instead. - let rawPayload: Payload - - /// Message payload - public var payload: Payload { - guard let response = rawPayload["response"] as? Payload - else { return rawPayload } - return response - } - - /// Convenience accessor. Equivalent to getting the status as such: - /// ```swift - /// message.payload["status"] - /// ``` - public var status: PushStatus? { - (rawPayload["status"] as? String).flatMap(PushStatus.init(rawValue:)) - } - - init( - ref: String = "", - topic: String = "", - event: String = "", - payload: Payload = [:], - joinRef: String? = nil - ) { - self.ref = ref - self.topic = topic - self.event = event - rawPayload = payload - self.joinRef = joinRef - } - - init?(json: [Any?]) { - guard json.count > 4 else { return nil } - joinRef = json[0] as? String - ref = json[1] as? String ?? "" - - if let topic = json[2] as? String, - let event = json[3] as? String, - let payload = json[4] as? Payload - { - self.topic = topic - self.event = event - rawPayload = payload - } else { - return nil - } - } -} diff --git a/Sources/Realtime/TimeoutTimer.swift b/Sources/Realtime/TimeoutTimer.swift deleted file mode 100644 index b6b37c4c..00000000 --- a/Sources/Realtime/TimeoutTimer.swift +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright (c) 2021 David Stump -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -/// Creates a timer that can perform calculated reties by setting -/// `timerCalculation` , such as exponential backoff. -/// -/// ### Example -/// -/// let reconnectTimer = TimeoutTimer() -/// -/// // Receive a callbcak when the timer is fired -/// reconnectTimer.callback.delegate(to: self) { (_) in -/// print("timer was fired") -/// } -/// -/// // Provide timer interval calculation -/// reconnectTimer.timerCalculation.delegate(to: self) { (_, tries) -> TimeInterval in -/// return tries > 2 ? 1000 : [1000, 5000, 10000][tries - 1] -/// } -/// -/// reconnectTimer.scheduleTimeout() // fires after 1000ms -/// reconnectTimer.scheduleTimeout() // fires after 5000ms -/// reconnectTimer.reset() -/// reconnectTimer.scheduleTimeout() // fires after 1000ms - -import Foundation - -// sourcery: AutoMockable -class TimeoutTimer { - /// Callback to be informed when the underlying Timer fires - var callback = Delegated() - - /// Provides TimeInterval to use when scheduling the timer - var timerCalculation = Delegated() - - /// The work to be done when the queue fires - var workItem: DispatchWorkItem? - - /// The number of times the underlyingTimer hass been set off. - var tries: Int = 0 - - /// The Queue to execute on. In testing, this is overridden - var queue: TimerQueue = .main - - /// Resets the Timer, clearing the number of tries and stops - /// any scheduled timeout. - func reset() { - tries = 0 - clearTimer() - } - - /// Schedules a timeout callback to fire after a calculated timeout duration. - func scheduleTimeout() { - // Clear any ongoing timer, not resetting the number of tries - clearTimer() - - // Get the next calculated interval, in milliseconds. Do not - // start the timer if the interval is returned as nil. - guard let timeInterval = timerCalculation.call(tries + 1) else { return } - - let workItem = DispatchWorkItem { - self.tries += 1 - self.callback.call() - } - - self.workItem = workItem - queue.queue(timeInterval: timeInterval, execute: workItem) - } - - /// Invalidates any ongoing Timer. Will not clear how many tries have been made - private func clearTimer() { - workItem?.cancel() - workItem = nil - } -} - -/// Wrapper class around a DispatchQueue. Allows for providing a fake clock -/// during tests. -class TimerQueue { - // Can be overriden in tests - static var main = TimerQueue() - - func queue(timeInterval: TimeInterval, execute: DispatchWorkItem) { - // TimeInterval is always in seconds. Multiply it by 1000 to convert - // to milliseconds and round to the nearest millisecond. - let dispatchInterval = Int(round(timeInterval * 1000)) - - let dispatchTime = DispatchTime.now() + .milliseconds(dispatchInterval) - DispatchQueue.main.asyncAfter(deadline: dispatchTime, execute: execute) - } -} diff --git a/Sources/Realtime/V2/CallbackManager.swift b/Sources/Realtime/V2/CallbackManager.swift deleted file mode 100644 index 3d92e184..00000000 --- a/Sources/Realtime/V2/CallbackManager.swift +++ /dev/null @@ -1,208 +0,0 @@ -import ConcurrencyExtras -import Foundation -import Helpers - -final class CallbackManager: Sendable { - struct MutableState { - var id = 0 - var serverChanges: [PostgresJoinConfig] = [] - var callbacks: [RealtimeCallback] = [] - } - - private let mutableState = LockIsolated(MutableState()) - - var serverChanges: [PostgresJoinConfig] { - mutableState.serverChanges - } - - var callbacks: [RealtimeCallback] { - mutableState.callbacks - } - - deinit { - reset() - } - - @discardableResult - func addBroadcastCallback( - event: String, - callback: @escaping @Sendable (JSONObject) -> Void - ) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append( - .broadcast( - BroadcastCallback( - id: $0.id, - event: event, - callback: callback - ) - ) - ) - return $0.id - } - } - - @discardableResult - func addPostgresCallback( - filter: PostgresJoinConfig, - callback: @escaping @Sendable (AnyAction) -> Void - ) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append( - .postgres( - PostgresCallback( - id: $0.id, - filter: filter, - callback: callback - ) - ) - ) - return $0.id - } - } - - @discardableResult - func addPresenceCallback(callback: @escaping @Sendable (any PresenceAction) -> Void) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append(.presence(PresenceCallback(id: $0.id, callback: callback))) - return $0.id - } - } - - @discardableResult - func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback))) - return $0.id - } - } - - func setServerChanges(changes: [PostgresJoinConfig]) { - mutableState.withValue { - $0.serverChanges = changes - } - } - - func removeCallback(id: Int) { - mutableState.withValue { - $0.callbacks.removeAll { $0.id == id } - } - } - - func triggerPostgresChanges(ids: [Int], data: AnyAction) { - // Read mutableState at start to acquire lock once. - let mutableState = mutableState.value - - let filters = mutableState.serverChanges.filter { - ids.contains($0.id) - } - let postgresCallbacks = mutableState.callbacks.compactMap { - if case let .postgres(callback) = $0 { - return callback - } - return nil - } - - let callbacks = postgresCallbacks.filter { cc in - filters.contains { sc in - cc.filter == sc - } - } - - for item in callbacks { - item.callback(data) - } - } - - func triggerBroadcast(event: String, json: JSONObject) { - let broadcastCallbacks = mutableState.callbacks.compactMap { - if case let .broadcast(callback) = $0 { - return callback - } - return nil - } - let callbacks = broadcastCallbacks.filter { $0.event == event } - callbacks.forEach { $0.callback(json) } - } - - func triggerPresenceDiffs( - joins: [String: PresenceV2], - leaves: [String: PresenceV2], - rawMessage: RealtimeMessageV2 - ) { - let presenceCallbacks = mutableState.callbacks.compactMap { - if case let .presence(callback) = $0 { - return callback - } - return nil - } - for presenceCallback in presenceCallbacks { - presenceCallback.callback( - PresenceActionImpl( - joins: joins, - leaves: leaves, - rawMessage: rawMessage - ) - ) - } - } - - func triggerSystem(message: RealtimeMessageV2) { - let systemCallbacks = mutableState.callbacks.compactMap { - if case .system(let callback) = $0 { - return callback - } - return nil - } - - for systemCallback in systemCallbacks { - systemCallback.callback(message) - } - } - - func reset() { - mutableState.setValue(MutableState()) - } -} - -struct PostgresCallback { - var id: Int - var filter: PostgresJoinConfig - var callback: @Sendable (AnyAction) -> Void -} - -struct BroadcastCallback { - var id: Int - var event: String - var callback: @Sendable (JSONObject) -> Void -} - -struct PresenceCallback { - var id: Int - var callback: @Sendable (any PresenceAction) -> Void -} - -struct SystemCallback { - var id: Int - var callback: @Sendable (RealtimeMessageV2) -> Void -} - -enum RealtimeCallback { - case postgres(PostgresCallback) - case broadcast(BroadcastCallback) - case presence(PresenceCallback) - case system(SystemCallback) - - var id: Int { - switch self { - case let .postgres(callback): callback.id - case let .broadcast(callback): callback.id - case let .presence(callback): callback.id - case let .system(callback): callback.id - } - } -} diff --git a/Sources/Realtime/V2/PostgresAction.swift b/Sources/Realtime/V2/PostgresAction.swift deleted file mode 100644 index 320fea96..00000000 --- a/Sources/Realtime/V2/PostgresAction.swift +++ /dev/null @@ -1,92 +0,0 @@ -// -// PostgresAction.swift -// -// -// Created by Guilherme Souza on 23/12/23. -// - -import Foundation -import Helpers - -public struct Column: Equatable, Codable, Sendable { - public let name: String - public let type: String -} - -public protocol PostgresAction: Equatable, Sendable { - static var eventType: PostgresChangeEvent { get } -} - -public protocol HasRecord { - var record: JSONObject { get } -} - -public protocol HasOldRecord { - var oldRecord: JSONObject { get } -} - -public protocol HasRawMessage { - var rawMessage: RealtimeMessageV2 { get } -} - -public struct InsertAction: PostgresAction, HasRecord, HasRawMessage { - public static let eventType: PostgresChangeEvent = .insert - - public let columns: [Column] - public let commitTimestamp: Date - public let record: [String: AnyJSON] - public let rawMessage: RealtimeMessageV2 -} - -public struct UpdateAction: PostgresAction, HasRecord, HasOldRecord, HasRawMessage { - public static let eventType: PostgresChangeEvent = .update - - public let columns: [Column] - public let commitTimestamp: Date - public let record, oldRecord: [String: AnyJSON] - public let rawMessage: RealtimeMessageV2 -} - -public struct DeleteAction: PostgresAction, HasOldRecord, HasRawMessage { - public static let eventType: PostgresChangeEvent = .delete - - public let columns: [Column] - public let commitTimestamp: Date - public let oldRecord: [String: AnyJSON] - public let rawMessage: RealtimeMessageV2 -} - -public enum AnyAction: PostgresAction, HasRawMessage { - public static let eventType: PostgresChangeEvent = .all - - case insert(InsertAction) - case update(UpdateAction) - case delete(DeleteAction) - - var wrappedAction: any PostgresAction & HasRawMessage { - switch self { - case let .insert(action): action - case let .update(action): action - case let .delete(action): action - } - } - - public var rawMessage: RealtimeMessageV2 { - wrappedAction.rawMessage - } -} - -extension HasRecord { - public func decodeRecord(as _: T.Type = T.self, decoder: JSONDecoder) throws -> T { - try record.decode(as: T.self, decoder: decoder) - } -} - -extension HasOldRecord { - public func decodeOldRecord( - as _: T.Type = T.self, - decoder: JSONDecoder - ) throws -> T { - try oldRecord.decode(as: T.self, decoder: decoder) - } -} diff --git a/Sources/Realtime/V2/PostgresActionData.swift b/Sources/Realtime/V2/PostgresActionData.swift deleted file mode 100644 index bf46856d..00000000 --- a/Sources/Realtime/V2/PostgresActionData.swift +++ /dev/null @@ -1,25 +0,0 @@ -// -// PostgresActionData.swift -// -// -// Created by Guilherme Souza on 26/12/23. -// - -import Foundation -import Helpers - -struct PostgresActionData: Codable { - var type: String - var record: [String: AnyJSON]? - var oldRecord: [String: AnyJSON]? - var columns: [Column] - var commitTimestamp: Date - - enum CodingKeys: String, CodingKey { - case type - case record - case oldRecord = "old_record" - case columns - case commitTimestamp = "commit_timestamp" - } -} diff --git a/Sources/Realtime/V2/PresenceAction.swift b/Sources/Realtime/V2/PresenceAction.swift deleted file mode 100644 index f019ea34..00000000 --- a/Sources/Realtime/V2/PresenceAction.swift +++ /dev/null @@ -1,143 +0,0 @@ -// -// PresenceAction.swift -// -// -// Created by Guilherme Souza on 24/12/23. -// - -import Foundation -import Helpers - -public struct PresenceV2: Hashable, Sendable { - /// The presence reference of the object. - public let ref: String - - /// The object the other client is tracking. Can be done via the - /// ``RealtimeChannelV2/track(state:)`` method. - public let state: JSONObject -} - -extension PresenceV2: Codable { - struct _StringCodingKey: CodingKey { - var stringValue: String - - init(_ stringValue: String) { - self.init(stringValue: stringValue)! - } - - init?(stringValue: String) { - self.stringValue = stringValue - } - - var intValue: Int? - - init?(intValue: Int) { - stringValue = "\(intValue)" - self.intValue = intValue - } - } - - public init(from decoder: any Decoder) throws { - let container = try decoder.singleValueContainer() - - let json = try container.decode(JSONObject.self) - - let codingPath = container.codingPath + [ - _StringCodingKey("metas"), - _StringCodingKey(intValue: 0)!, - ] - - guard var meta = json["metas"]?.arrayValue?.first?.objectValue else { - throw DecodingError.typeMismatch( - JSONObject.self, - DecodingError.Context( - codingPath: codingPath, - debugDescription: "A presence should at least have a phx_ref." - ) - ) - } - - guard let presenceRef = meta["phx_ref"]?.stringValue else { - throw DecodingError.typeMismatch( - String.self, - DecodingError.Context( - codingPath: codingPath + [_StringCodingKey("phx_ref")], - debugDescription: "A presence should at least have a phx_ref." - ) - ) - } - - meta["phx_ref"] = nil - self = PresenceV2(ref: presenceRef, state: meta) - } - - public func encode(to encoder: any Encoder) throws { - var container = encoder.container(keyedBy: _StringCodingKey.self) - try container.encode(ref, forKey: _StringCodingKey("phx_ref")) - try container.encode(state, forKey: _StringCodingKey("state")) - } - - /// Decode ``state``. - /// - /// - Note: You can also receive your own presence, but without your state so be aware of - /// exceptions. - public func decodeState( - as _: T.Type = T.self, - decoder: JSONDecoder = AnyJSON.decoder - ) throws -> T { - try state.decode(as: T.self, decoder: decoder) - } -} - -/// Represents a presence action. -public protocol PresenceAction: Sendable, HasRawMessage { - /// Represents a map of ``PresenceV2`` objects indexed by their key. - /// - /// Your own key can be customized when creating the channel within the presence config. - var joins: [String: PresenceV2] { get } - - /// Represents a map of ``PresenceV2`` objects indexed by their key. - /// - /// Your own key can be customized when creating the channel within the presence config. - var leaves: [String: PresenceV2] { get } -} - -extension PresenceAction { - /// Decode all ``PresenceAction/joins`` values. - /// - Parameters: - /// - ignoreOtherTypes: Whether to ignore presences which cannot be decoded such as your own - /// presence. - public func decodeJoins( - as _: T.Type = T.self, - ignoreOtherTypes: Bool = true, - decoder: JSONDecoder = AnyJSON.decoder - ) throws -> [T] { - if ignoreOtherTypes { - return joins.values.compactMap { try? $0.decodeState(as: T.self, decoder: decoder) } - } - - return try joins.values.map { try $0.decodeState(as: T.self, decoder: decoder) } - } - - /// Decode all ``PresenceAction/leaves`` values. - /// - Parameters: - /// - ignoreOtherTypes: Whether to ignore presences which cannot be decoded such as your own - /// presence. - public func decodeLeaves( - as _: T.Type = T.self, - ignoreOtherTypes: Bool = true, - decoder: JSONDecoder = AnyJSON.decoder - ) throws -> [T] { - if ignoreOtherTypes { - return leaves.values.compactMap { try? $0.decodeState(as: T.self, decoder: decoder) } - } - - return try leaves.values.map { try $0.decodeState(as: T.self, decoder: decoder) } - } -} - -struct PresenceActionImpl: PresenceAction { - var joins: [String: PresenceV2] - var leaves: [String: PresenceV2] - var rawMessage: RealtimeMessageV2 -} diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift deleted file mode 100644 index 884fc981..00000000 --- a/Sources/Realtime/V2/PushV2.swift +++ /dev/null @@ -1,61 +0,0 @@ -// -// PushV2.swift -// -// -// Created by Guilherme Souza on 02/01/24. -// - -import Foundation -import Helpers - -/// Represents the different status of a push -public enum PushStatus: String, Sendable { - case ok - case error - case timeout -} - -actor PushV2 { - private weak var channel: RealtimeChannelV2? - let message: RealtimeMessageV2 - - private var receivedContinuation: CheckedContinuation? - - init(channel: RealtimeChannelV2?, message: RealtimeMessageV2) { - self.channel = channel - self.message = message - } - - func send() async -> PushStatus { - guard let channel = channel else { - return .error - } - - channel.socket.push(message) - - if !channel.config.broadcast.acknowledgeBroadcasts { - // channel was configured with `ack = false`, - // don't wait for a response and return `ok`. - return .ok - } - - do { - return try await withTimeout(interval: channel.socket.options.timeoutInterval) { - await withCheckedContinuation { continuation in - self.receivedContinuation = continuation - } - } - } catch is TimeoutError { - channel.logger?.debug("Push timed out.") - return .timeout - } catch { - channel.logger?.error("Error sending push: \(error.localizedDescription)") - return .error - } - } - - func didReceive(status: PushStatus) { - receivedContinuation?.resume(returning: status) - receivedContinuation = nil - } -} diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift deleted file mode 100644 index 50c1958e..00000000 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ /dev/null @@ -1,568 +0,0 @@ -import ConcurrencyExtras -import Foundation -import HTTPTypes -import Helpers -import IssueReporting - -#if canImport(FoundationNetworking) - import FoundationNetworking - - extension HTTPURLResponse { - convenience init() { - self.init( - url: URL(string: "http://127.0.0.1")!, - statusCode: 200, - httpVersion: nil, - headerFields: nil - )! - } - } -#endif - -public struct RealtimeChannelConfig: Sendable { - public var broadcast: BroadcastJoinConfig - public var presence: PresenceJoinConfig - public var isPrivate: Bool -} - -public final class RealtimeChannelV2: Sendable { - struct MutableState { - var clientChanges: [PostgresJoinConfig] = [] - var joinRef: String? - var pushes: [String: PushV2] = [:] - } - - private let mutableState = LockIsolated(MutableState()) - - let topic: String - let config: RealtimeChannelConfig - let logger: (any SupabaseLogger)? - let socket: RealtimeClientV2 - var joinRef: String? { mutableState.joinRef } - - let callbackManager = CallbackManager() - private let statusSubject = AsyncValueSubject(.unsubscribed) - - public private(set) var status: RealtimeChannelStatus { - get { statusSubject.value } - set { statusSubject.yield(newValue) } - } - - public var statusChange: AsyncStream { - statusSubject.values - } - - /// Listen for connection status changes. - /// - Parameter listener: Closure that will be called when connection status changes. - /// - Returns: An observation handle that can be used to stop listening. - /// - /// - Note: Use ``statusChange`` if you prefer to use Async/Await. - public func onStatusChange( - _ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void - ) -> RealtimeSubscription { - let task = statusSubject.onChange { listener($0) } - return RealtimeSubscription { task.cancel() } - } - - init( - topic: String, - config: RealtimeChannelConfig, - socket: RealtimeClientV2, - logger: (any SupabaseLogger)? - ) { - self.topic = topic - self.config = config - self.logger = logger - self.socket = socket - } - - deinit { - callbackManager.reset() - } - - /// Subscribes to the channel - public func subscribe() async { - if socket.status != .connected { - if socket.options.connectOnSubscribe != true { - reportIssue( - "You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?" - ) - return - } - await socket.connect() - } - - status = .subscribing - logger?.debug("Subscribing to channel \(topic)") - - let joinConfig = RealtimeJoinConfig( - broadcast: config.broadcast, - presence: config.presence, - postgresChanges: mutableState.clientChanges, - isPrivate: config.isPrivate - ) - - let payload = RealtimeJoinPayload( - config: joinConfig, - accessToken: await socket._getAccessToken() - ) - - let joinRef = socket.makeRef() - mutableState.withValue { $0.joinRef = joinRef } - - logger?.debug("Subscribing to channel with body: \(joinConfig)") - - await push( - ChannelEvent.join, - ref: joinRef, - payload: try! JSONObject(payload) - ) - - do { - try await withTimeout(interval: socket.options.timeoutInterval) { [self] in - _ = await statusChange.first { @Sendable in $0 == .subscribed } - } - } catch { - if error is TimeoutError { - logger?.debug("Subscribe timed out.") - await subscribe() - } else { - logger?.error("Subscribe failed: \(error)") - } - } - } - - public func unsubscribe() async { - status = .unsubscribing - logger?.debug("Unsubscribing from channel \(topic)") - - await push(ChannelEvent.leave) - } - - @available( - *, - deprecated, - message: - "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." - ) - public func updateAuth(jwt: String?) async { - logger?.debug("Updating auth token for channel \(topic)") - await push( - ChannelEvent.accessToken, - payload: ["access_token": jwt.map { .string($0) } ?? .null] - ) - } - - /// Send a broadcast message with `event` and a `Codable` payload. - /// - Parameters: - /// - event: Broadcast message event. - /// - message: Message payload. - public func broadcast(event: String, message: some Codable) async throws { - try await broadcast(event: event, message: JSONObject(message)) - } - - /// Send a broadcast message with `event` and a raw `JSON` payload. - /// - Parameters: - /// - event: Broadcast message event. - /// - message: Message payload. - public func broadcast(event: String, message: JSONObject) async { - if status != .subscribed { - struct Message: Encodable { - let topic: String - let event: String - let payload: JSONObject - let `private`: Bool - } - - var headers: HTTPFields = [.contentType: "application/json"] - if let apiKey = socket.options.apikey { - headers[.apiKey] = apiKey - } - if let accessToken = await socket._getAccessToken() { - headers[.authorization] = "Bearer \(accessToken)" - } - - let task = Task { [headers] in - _ = try? await socket.http.send( - HTTPRequest( - url: socket.broadcastURL, - method: .post, - headers: headers, - body: JSONEncoder().encode( - [ - "messages": [ - Message( - topic: topic, - event: event, - payload: message, - private: config.isPrivate - ) - ] - ] - ) - ) - ) - } - - if config.broadcast.acknowledgeBroadcasts { - try? await withTimeout(interval: socket.options.timeoutInterval) { - await task.value - } - } - } else { - await push( - ChannelEvent.broadcast, - payload: [ - "type": "broadcast", - "event": .string(event), - "payload": .object(message), - ] - ) - } - } - - /// Tracks the given state in the channel. - /// - Parameter state: The state to be tracked, conforming to `Codable`. - /// - Throws: An error if the tracking fails. - public func track(_ state: some Codable) async throws { - try await track(state: JSONObject(state)) - } - - /// Tracks the given state in the channel. - /// - Parameter state: The state to be tracked as a `JSONObject`. - public func track(state: JSONObject) async { - if status != .subscribed { - reportIssue( - "You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?" - ) - } - - await push( - ChannelEvent.presence, - payload: [ - "type": "presence", - "event": "track", - "payload": .object(state), - ] - ) - } - - /// Stops tracking the current state in the channel. - public func untrack() async { - await push( - ChannelEvent.presence, - payload: [ - "type": "presence", - "event": "untrack", - ] - ) - } - - func onMessage(_ message: RealtimeMessageV2) async { - do { - guard let eventType = message._eventType else { - logger?.debug("Received message without event type: \(message)") - return - } - - switch eventType { - case .tokenExpired: - // deprecated type - break - - case .system: - if message.status == .ok { - logger?.debug("Subscribed to channel \(message.topic)") - status = .subscribed - } else { - logger?.debug( - "Failed to subscribe to channel \(message.topic): \(message.payload)" - ) - } - - callbackManager.triggerSystem(message: message) - - case .reply: - guard - let ref = message.ref, - let status = message.payload["status"]?.stringValue - else { - throw RealtimeError("Received a reply with unexpected payload: \(message)") - } - - await didReceiveReply(ref: ref, status: status) - - if message.payload["response"]?.objectValue?.keys - .contains(ChannelEvent.postgresChanges) == true - { - let serverPostgresChanges = try message.payload["response"]? - .objectValue?["postgres_changes"]? - .decode(as: [PostgresJoinConfig].self) - - callbackManager.setServerChanges(changes: serverPostgresChanges ?? []) - - if self.status != .subscribed { - self.status = .subscribed - logger?.debug("Subscribed to channel \(message.topic)") - } - } - - case .postgresChanges: - guard let data = message.payload["data"] else { - logger?.debug("Expected \"data\" key in message payload.") - return - } - - let ids = message.payload["ids"]?.arrayValue?.compactMap(\.intValue) ?? [] - - let postgresActions = try data.decode(as: PostgresActionData.self) - - let action: AnyAction - switch postgresActions.type { - case "UPDATE": - action = .update( - UpdateAction( - columns: postgresActions.columns, - commitTimestamp: postgresActions.commitTimestamp, - record: postgresActions.record ?? [:], - oldRecord: postgresActions.oldRecord ?? [:], - rawMessage: message - ) - ) - - case "DELETE": - action = .delete( - DeleteAction( - columns: postgresActions.columns, - commitTimestamp: postgresActions.commitTimestamp, - oldRecord: postgresActions.oldRecord ?? [:], - rawMessage: message - ) - ) - - case "INSERT": - action = .insert( - InsertAction( - columns: postgresActions.columns, - commitTimestamp: postgresActions.commitTimestamp, - record: postgresActions.record ?? [:], - rawMessage: message - ) - ) - - default: - throw RealtimeError("Unknown event type: \(postgresActions.type)") - } - - callbackManager.triggerPostgresChanges(ids: ids, data: action) - - case .broadcast: - let payload = message.payload - - guard let event = payload["event"]?.stringValue else { - throw RealtimeError("Expected 'event' key in 'payload' for broadcast event.") - } - - callbackManager.triggerBroadcast(event: event, json: payload) - - case .close: - socket._remove(self) - logger?.debug("Unsubscribed from channel \(message.topic)") - status = .unsubscribed - - case .error: - logger?.debug( - "Received an error in channel \(message.topic). That could be as a result of an invalid access token" - ) - - case .presenceDiff: - let joins = try message.payload["joins"]?.decode(as: [String: PresenceV2].self) ?? [:] - let leaves = try message.payload["leaves"]?.decode(as: [String: PresenceV2].self) ?? [:] - callbackManager.triggerPresenceDiffs(joins: joins, leaves: leaves, rawMessage: message) - - case .presenceState: - let joins = try message.payload.decode(as: [String: PresenceV2].self) - callbackManager.triggerPresenceDiffs(joins: joins, leaves: [:], rawMessage: message) - } - } catch { - logger?.debug("Failed: \(error)") - } - } - - /// Listen for clients joining / leaving the channel using presences. - public func onPresenceChange( - _ callback: @escaping @Sendable (any PresenceAction) -> Void - ) -> RealtimeSubscription { - let id = callbackManager.addPresenceCallback(callback: callback) - return RealtimeSubscription { [weak callbackManager, logger] in - logger?.debug("Removing presence callback with id: \(id)") - callbackManager?.removeCallback(id: id) - } - } - - /// Listen for postgres changes in a channel. - public func onPostgresChange( - _: AnyAction.Type, - schema: String = "public", - table: String? = nil, - filter: String? = nil, - callback: @escaping @Sendable (AnyAction) -> Void - ) -> RealtimeSubscription { - _onPostgresChange( - event: .all, - schema: schema, - table: table, - filter: filter - ) { - callback($0) - } - } - - /// Listen for postgres changes in a channel. - public func onPostgresChange( - _: InsertAction.Type, - schema: String = "public", - table: String? = nil, - filter: String? = nil, - callback: @escaping @Sendable (InsertAction) -> Void - ) -> RealtimeSubscription { - _onPostgresChange( - event: .insert, - schema: schema, - table: table, - filter: filter - ) { - guard case let .insert(action) = $0 else { return } - callback(action) - } - } - - /// Listen for postgres changes in a channel. - public func onPostgresChange( - _: UpdateAction.Type, - schema: String = "public", - table: String? = nil, - filter: String? = nil, - callback: @escaping @Sendable (UpdateAction) -> Void - ) -> RealtimeSubscription { - _onPostgresChange( - event: .update, - schema: schema, - table: table, - filter: filter - ) { - guard case let .update(action) = $0 else { return } - callback(action) - } - } - - /// Listen for postgres changes in a channel. - public func onPostgresChange( - _: DeleteAction.Type, - schema: String = "public", - table: String? = nil, - filter: String? = nil, - callback: @escaping @Sendable (DeleteAction) -> Void - ) -> RealtimeSubscription { - _onPostgresChange( - event: .delete, - schema: schema, - table: table, - filter: filter - ) { - guard case let .delete(action) = $0 else { return } - callback(action) - } - } - - func _onPostgresChange( - event: PostgresChangeEvent, - schema: String, - table: String?, - filter: String?, - callback: @escaping @Sendable (AnyAction) -> Void - ) -> RealtimeSubscription { - guard status != .subscribed else { - reportIssue( - "You cannot call postgresChange after joining the channel, this won't work as expected." - ) - return RealtimeSubscription {} - } - - let config = PostgresJoinConfig( - event: event, - schema: schema, - table: table, - filter: filter - ) - - mutableState.withValue { - $0.clientChanges.append(config) - } - - let id = callbackManager.addPostgresCallback(filter: config, callback: callback) - return RealtimeSubscription { [weak callbackManager, logger] in - logger?.debug("Removing postgres callback with id: \(id)") - callbackManager?.removeCallback(id: id) - } - } - - /// Listen for broadcast messages sent by other clients within the same channel under a specific `event`. - public func onBroadcast( - event: String, - callback: @escaping @Sendable (JSONObject) -> Void - ) -> RealtimeSubscription { - let id = callbackManager.addBroadcastCallback(event: event, callback: callback) - return RealtimeSubscription { [weak callbackManager, logger] in - logger?.debug("Removing broadcast callback with id: \(id)") - callbackManager?.removeCallback(id: id) - } - } - - /// Listen for `system` event. - public func onSystem( - callback: @escaping @Sendable (RealtimeMessageV2) -> Void - ) -> RealtimeSubscription { - let id = callbackManager.addSystemCallback(callback: callback) - return RealtimeSubscription { [weak callbackManager, logger] in - logger?.debug("Removing system callback with id: \(id)") - callbackManager?.removeCallback(id: id) - } - } - - /// Listen for `system` event. - public func onSystem( - callback: @escaping @Sendable () -> Void - ) -> RealtimeSubscription { - self.onSystem { _ in callback() } - } - - @discardableResult - func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { - let push = mutableState.withValue { - let message = RealtimeMessageV2( - joinRef: $0.joinRef, - ref: ref ?? socket.makeRef(), - topic: self.topic, - event: event, - payload: payload - ) - - let push = PushV2(channel: self, message: message) - if let ref = message.ref { - $0.pushes[ref] = push - } - - return push - } - - return await push.send() - } - - private func didReceiveReply(ref: String, status: String) async { - let push = mutableState.withValue { - $0.pushes.removeValue(forKey: ref) - } - await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) - } -} diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift deleted file mode 100644 index 34a01940..00000000 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ /dev/null @@ -1,558 +0,0 @@ -// -// RealtimeClientV2.swift -// -// -// Created by Guilherme Souza on 26/12/23. -// - -import ConcurrencyExtras -import Foundation -import Helpers - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -public typealias JSONObject = Helpers.JSONObject - -/// Factory function for returning a new WebSocket connection. -typealias WebSocketTransport = @Sendable () async throws -> any WebSocket - -public final class RealtimeClientV2: Sendable { - struct MutableState { - var accessToken: String? - var ref = 0 - var pendingHeartbeatRef: String? - - /// Long-running task that keeps sending heartbeat messages. - var heartbeatTask: Task? - - /// Long-running task for listening for incoming messages from WebSocket. - var messageTask: Task? - - var connectionTask: Task? - var channels: [RealtimeChannelV2] = [] - var sendBuffer: [@Sendable () -> Void] = [] - - var conn: (any WebSocket)? - } - - let url: URL - let options: RealtimeClientOptions - let wsTransport: WebSocketTransport - let mutableState = LockIsolated(MutableState()) - let http: any HTTPClientType - let apikey: String? - - var conn: (any WebSocket)? { - mutableState.conn - } - - /// All managed channels indexed by their topics. - public var channels: [String: RealtimeChannelV2] { - mutableState.channels.reduce( - into: [:], - { $0[$1.topic] = $1 } - ) - } - - private let statusSubject = AsyncValueSubject(.disconnected) - - /// Listen for connection status changes. - /// - /// You can also use ``onStatusChange(_:)`` for a closure based method. - public var statusChange: AsyncStream { - statusSubject.values - } - - /// The current connection status. - public private(set) var status: RealtimeClientStatus { - get { statusSubject.value } - set { statusSubject.yield(newValue) } - } - - /// Listen for connection status changes. - /// - Parameter listener: Closure that will be called when connection status changes. - /// - Returns: An observation handle that can be used to stop listening. - /// - /// - Note: Use ``statusChange`` if you prefer to use Async/Await. - public func onStatusChange( - _ listener: @escaping @Sendable (RealtimeClientStatus) -> Void - ) -> RealtimeSubscription { - let task = statusSubject.onChange { listener($0) } - return RealtimeSubscription { task.cancel() } - } - - public convenience init(url: URL, options: RealtimeClientOptions) { - var interceptors: [any HTTPClientInterceptor] = [] - - if let logger = options.logger { - interceptors.append(LoggerInterceptor(logger: logger)) - } - - self.init( - url: url, - options: options, - wsTransport: { - let configuration = URLSessionConfiguration.default - configuration.httpAdditionalHeaders = options.headers.dictionary - return try await URLSessionWebSocket.connect( - to: Self.realtimeWebSocketURL( - baseURL: Self.realtimeBaseURL(url: url), - apikey: options.apikey - ), - configuration: configuration - ) - }, - http: HTTPClient( - fetch: options.fetch ?? { try await URLSession.shared.data(for: $0) }, - interceptors: interceptors - ) - ) - } - - init( - url: URL, - options: RealtimeClientOptions, - wsTransport: @escaping WebSocketTransport, - http: any HTTPClientType - ) { - self.url = url - self.options = options - self.wsTransport = wsTransport - self.http = http - apikey = options.apikey - - mutableState.withValue { - if let accessToken = options.headers[.authorization]?.split(separator: " ").last { - $0.accessToken = String(accessToken) - } else { - $0.accessToken = options.apikey - } - } - } - - deinit { - mutableState.withValue { - $0.heartbeatTask?.cancel() - $0.messageTask?.cancel() - $0.channels = [] - } - } - - /// Connects the socket. - /// - /// Suspends until connected. - public func connect() async { - await connect(reconnect: false) - } - - func connect(reconnect: Bool) async { - if status == .disconnected { - let connectionTask = Task { - if reconnect { - try? await _clock.sleep(for: options.reconnectDelay) - - if Task.isCancelled { - options.logger?.debug("Reconnect cancelled, returning") - return - } - } - - if status == .connected { - options.logger?.debug("WebsSocket already connected") - return - } - - status = .connecting - - do { - let conn = try await wsTransport() - mutableState.withValue { $0.conn = conn } - onConnected(reconnect: reconnect) - } catch { - onError(error) - } - } - - mutableState.withValue { - $0.connectionTask = connectionTask - } - } - - _ = await statusChange.first { @Sendable in $0 == .connected } - } - - private func onConnected(reconnect: Bool) { - status = .connected - options.logger?.debug("Connected to realtime WebSocket") - listenForMessages() - startHeartbeating() - if reconnect { - rejoinChannels() - } - - flushSendBuffer() - } - - private func onDisconnected() { - options.logger? - .debug( - "WebSocket disconnected. Trying again in \(options.reconnectDelay)" - ) - reconnect() - } - - private func onError(_ error: (any Error)?) { - options.logger? - .debug( - "WebSocket error \(error?.localizedDescription ?? ""). Trying again in \(options.reconnectDelay)" - ) - reconnect() - } - - private func onClose(code: Int?, reason: String?) { - options.logger?.debug( - "WebSocket closed. Code: \(code?.description ?? ""), Reason: \(reason ?? "")") - - reconnect() - } - - private func reconnect() { - Task { - disconnect() - await connect(reconnect: true) - } - } - - /// Creates a new channel and bind it to this client. - /// - Parameters: - /// - topic: Channel's topic. - /// - options: Configuration options for the channel. - /// - Returns: Channel instance. - /// - /// - Note: This method doesn't subscribe to the channel, call ``RealtimeChannelV2/subscribe()`` on the returned channel instance. - public func channel( - _ topic: String, - options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } - ) -> RealtimeChannelV2 { - var config = RealtimeChannelConfig( - broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false), - presence: PresenceJoinConfig(key: ""), - isPrivate: false - ) - options(&config) - - let channel = RealtimeChannelV2( - topic: "realtime:\(topic)", - config: config, - socket: self, - logger: self.options.logger - ) - - mutableState.withValue { - $0.channels.append(channel) - } - - return channel - } - - @available( - *, deprecated, - message: - "Client handles channels automatically, this method will be removed on the next major release." - ) - public func addChannel(_ channel: RealtimeChannelV2) { - mutableState.withValue { - $0.channels.append(channel) - } - } - - /// Unsubscribe and removes channel. - /// - /// If there is no channel left, client is disconnected. - public func removeChannel(_ channel: RealtimeChannelV2) async { - if channel.status == .subscribed { - await channel.unsubscribe() - } - - if channels.isEmpty { - options.logger?.debug("No more subscribed channel in socket") - disconnect() - } - } - - func _remove(_ channel: RealtimeChannelV2) { - mutableState.withValue { - $0.channels.removeAll { - $0.joinRef == channel.joinRef - } - } - } - - /// Unsubscribes and removes all channels. - public func removeAllChannels() async { - await withTaskGroup(of: Void.self) { group in - for channel in channels.values { - group.addTask { await self.removeChannel(channel) } - } - - await group.waitForAll() - } - } - - func _getAccessToken() async -> String? { - if let accessToken = try? await options.accessToken?() { - return accessToken - } - return mutableState.accessToken - } - - private func rejoinChannels() { - Task { - for channel in channels.values { - await channel.subscribe() - } - } - } - - private func listenForMessages() { - let messageTask = Task { [weak self] in - guard let self, let conn = self.conn else { return } - - do { - for await event in conn.events { - if Task.isCancelled { return } - - switch event { - case .binary: - self.options.logger?.error("Unsupported binary event received.") - break - case .text(let text): - let data = Data(text.utf8) - let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) - await onMessage(message) - - case let .close(code, reason): - onClose(code: code, reason: reason) - } - } - } catch { - onError(error) - } - } - mutableState.withValue { - $0.messageTask = messageTask - } - } - - private func startHeartbeating() { - let heartbeatTask = Task { [weak self, options] in - while !Task.isCancelled { - try? await _clock.sleep(for: options.heartbeatInterval) - if Task.isCancelled { - break - } - self?.sendHeartbeat() - } - } - mutableState.withValue { - $0.heartbeatTask = heartbeatTask - } - } - - private func sendHeartbeat() { - let pendingHeartbeatRef: String? = mutableState.withValue { - if $0.pendingHeartbeatRef != nil { - $0.pendingHeartbeatRef = nil - return nil - } - - let ref = makeRef() - $0.pendingHeartbeatRef = ref - return ref - } - - if let pendingHeartbeatRef { - push( - RealtimeMessageV2( - joinRef: nil, - ref: pendingHeartbeatRef, - topic: "phoenix", - event: "heartbeat", - payload: [:] - ) - ) - } else { - options.logger?.debug("Heartbeat timeout") - reconnect() - } - } - - /// Disconnects client. - /// - Parameters: - /// - code: A numeric status code to send on disconnect. - /// - reason: A custom reason for the disconnect. - public func disconnect(code: Int? = nil, reason: String? = nil) { - options.logger?.debug("Closing WebSocket connection") - - conn?.close(code: code, reason: reason) - - mutableState.withValue { - $0.ref = 0 - $0.messageTask?.cancel() - $0.heartbeatTask?.cancel() - $0.connectionTask?.cancel() - $0.conn = nil - } - - status = .disconnected - } - - /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. - /// - /// If `token` is nil it will use the ``RealtimeClientOptions/accessToken`` callback function or the token set on the client. - /// - /// On callback used, it will set the value of the token internal to the client. - /// - Parameter token: A JWT string to override the token set on the client. - public func setAuth(_ token: String? = nil) async { - var token = token - - if token == nil { - token = try? await options.accessToken?() - } - - if token == nil { - token = mutableState.accessToken - } - - if let token, let payload = JWT.decodePayload(token), - let exp = payload["exp"] as? TimeInterval, exp < Date().timeIntervalSince1970 - { - options.logger?.warning( - "InvalidJWTToken: Invalid value for JWT claim \"exp\" with value \(exp)") - return - } - - mutableState.withValue { [token] in - $0.accessToken = token - } - - for channel in channels.values { - if channel.status == .subscribed { - options.logger?.debug("Updating auth token for channel \(channel.topic)") - await channel.push( - ChannelEvent.accessToken, - payload: ["access_token": token.map { .string($0) } ?? .null] - ) - } - } - } - - private func onMessage(_ message: RealtimeMessageV2) async { - let channels = mutableState.withValue { - if let ref = message.ref, ref == $0.pendingHeartbeatRef { - $0.pendingHeartbeatRef = nil - options.logger?.debug("heartbeat received") - } else { - options.logger? - .debug("Received event \(message.event) for channel \(message.topic)") - } - - return $0.channels.filter { $0.topic == message.topic } - } - - for channel in channels { - await channel.onMessage(message) - } - } - - /// Push out a message if the socket is connected. - /// - /// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. - public func push(_ message: RealtimeMessageV2) { - let callback = { @Sendable [weak self] in - do { - // Check cancellation before sending, because this push may have been cancelled before a connection was established. - try Task.checkCancellation() - let data = try JSONEncoder().encode(message) - self?.conn?.send(String(decoding: data, as: UTF8.self)) - } catch { - self?.options.logger?.error( - """ - Failed to send message: - \(message) - - Error: - \(error) - """) - } - } - - if status == .connected { - callback() - } else { - mutableState.withValue { - $0.sendBuffer.append(callback) - } - } - } - - private func flushSendBuffer() { - mutableState.withValue { - $0.sendBuffer.forEach { $0() } - $0.sendBuffer = [] - } - } - - func makeRef() -> String { - mutableState.withValue { - $0.ref += 1 - return $0.ref.description - } - } - - static func realtimeBaseURL(url: URL) -> URL { - guard var components = URLComponents(url: url, resolvingAgainstBaseURL: false) else { - return url - } - - if components.scheme == "https" { - components.scheme = "wss" - } else if components.scheme == "http" { - components.scheme = "ws" - } - - guard let url = components.url else { - return url - } - - return url - } - - static func realtimeWebSocketURL(baseURL: URL, apikey: String?) -> URL { - guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false) - else { - return baseURL - } - - components.queryItems = components.queryItems ?? [] - if let apikey { - components.queryItems!.append(URLQueryItem(name: "apikey", value: apikey)) - } - components.queryItems!.append(URLQueryItem(name: "vsn", value: "1.0.0")) - - components.path.append("/websocket") - components.path = components.path.replacingOccurrences(of: "//", with: "/") - - guard let url = components.url else { - return baseURL - } - - return url - } - - var broadcastURL: URL { - url.appendingPathComponent("api/broadcast") - } -} diff --git a/Sources/Realtime/V2/RealtimeJoinConfig.swift b/Sources/Realtime/V2/RealtimeJoinConfig.swift deleted file mode 100644 index 06b0a492..00000000 --- a/Sources/Realtime/V2/RealtimeJoinConfig.swift +++ /dev/null @@ -1,92 +0,0 @@ -// -// RealtimeJoinConfig.swift -// -// -// Created by Guilherme Souza on 24/12/23. -// - -import Foundation - -struct RealtimeJoinPayload: Codable { - var config: RealtimeJoinConfig - var accessToken: String? - - enum CodingKeys: String, CodingKey { - case config - case accessToken = "access_token" - } -} - -struct RealtimeJoinConfig: Codable, Hashable { - var broadcast: BroadcastJoinConfig = .init() - var presence: PresenceJoinConfig = .init() - var postgresChanges: [PostgresJoinConfig] = [] - var isPrivate: Bool = false - - enum CodingKeys: String, CodingKey { - case broadcast - case presence - case isPrivate = "private" - case postgresChanges = "postgres_changes" - } -} - -public struct BroadcastJoinConfig: Codable, Hashable, Sendable { - /// Instructs server to acknowledge that broadcast message was received. - public var acknowledgeBroadcasts: Bool = false - /// Broadcast messages back to the sender. - /// - /// By default, broadcast messages are only sent to other clients. - public var receiveOwnBroadcasts: Bool = false - - enum CodingKeys: String, CodingKey { - case acknowledgeBroadcasts = "ack" - case receiveOwnBroadcasts = "self" - } -} - -public struct PresenceJoinConfig: Codable, Hashable, Sendable { - /// Track presence payload across clients. - public var key: String = "" -} - -public enum PostgresChangeEvent: String, Codable, Sendable { - case insert = "INSERT" - case update = "UPDATE" - case delete = "DELETE" - case all = "*" -} - -struct PostgresJoinConfig: Codable, Hashable, Sendable { - var event: PostgresChangeEvent? - var schema: String - var table: String? - var filter: String? - var id: Int = 0 - - static func == (lhs: Self, rhs: Self) -> Bool { - lhs.schema == rhs.schema - && lhs.table == rhs.table - && lhs.filter == rhs.filter - && (lhs.event == rhs.event || rhs.event == .all) - } - - func hash(into hasher: inout Hasher) { - hasher.combine(schema) - hasher.combine(table) - hasher.combine(filter) - hasher.combine(event) - } - - func encode(to encoder: any Encoder) throws { - var container = encoder.container(keyedBy: CodingKeys.self) - try container.encode(event, forKey: .event) - try container.encode(schema, forKey: .schema) - try container.encodeIfPresent(table, forKey: .table) - try container.encodeIfPresent(filter, forKey: .filter) - - if id != 0 { - try container.encode(id, forKey: .id) - } - } -} diff --git a/Sources/Realtime/V2/RealtimeMessageV2.swift b/Sources/Realtime/V2/RealtimeMessageV2.swift deleted file mode 100644 index 200498cd..00000000 --- a/Sources/Realtime/V2/RealtimeMessageV2.swift +++ /dev/null @@ -1,82 +0,0 @@ -import Foundation -import Helpers - -public struct RealtimeMessageV2: Hashable, Codable, Sendable { - public let joinRef: String? - public let ref: String? - public let topic: String - public let event: String - public let payload: JSONObject - - public init(joinRef: String?, ref: String?, topic: String, event: String, payload: JSONObject) { - self.joinRef = joinRef - self.ref = ref - self.topic = topic - self.event = event - self.payload = payload - } - - /// Status for the received message if any. - public var status: PushStatus? { - payload["status"] - .flatMap(\.stringValue) - .flatMap(PushStatus.init(rawValue:)) - } - - @available( - *, deprecated, - message: "Access to event type will be removed, please inspect raw event value instead." - ) - public var eventType: EventType? { _eventType } - - var _eventType: EventType? { - switch event { - case ChannelEvent.system: .system - case ChannelEvent.postgresChanges: - .postgresChanges - case ChannelEvent.broadcast: - .broadcast - case ChannelEvent.close: - .close - case ChannelEvent.error: - .error - case ChannelEvent.presenceDiff: - .presenceDiff - case ChannelEvent.presenceState: - .presenceState - case ChannelEvent.reply: - .reply - default: - nil - } - } - - public enum EventType { - case system - case postgresChanges - case broadcast - case close - case error - case presenceDiff - case presenceState - @available( - *, deprecated, - message: - "tokenExpired gets returned as system, check payload for verifying if is a token expiration." - ) - case tokenExpired - case reply - } - - private enum CodingKeys: String, CodingKey { - case joinRef = "join_ref" - case ref - case topic - case event - case payload - } -} - -extension RealtimeMessageV2: HasRawMessage { - public var rawMessage: RealtimeMessageV2 { self } -} diff --git a/Sources/Realtime/V2/Types.swift b/Sources/Realtime/V2/Types.swift deleted file mode 100644 index 184b8cb5..00000000 --- a/Sources/Realtime/V2/Types.swift +++ /dev/null @@ -1,86 +0,0 @@ -// -// Types.swift -// -// -// Created by Guilherme Souza on 13/05/24. -// - -import Foundation -import HTTPTypes -import Helpers - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -/// Options for initializing ``RealtimeClientV2``. -public struct RealtimeClientOptions: Sendable { - package var headers: HTTPFields - var heartbeatInterval: TimeInterval - var reconnectDelay: TimeInterval - var timeoutInterval: TimeInterval - var disconnectOnSessionLoss: Bool - var connectOnSubscribe: Bool - var fetch: (@Sendable (_ request: URLRequest) async throws -> (Data, URLResponse))? - package var accessToken: (@Sendable () async throws -> String?)? - package var logger: (any SupabaseLogger)? - - public static let defaultHeartbeatInterval: TimeInterval = 15 - public static let defaultReconnectDelay: TimeInterval = 7 - public static let defaultTimeoutInterval: TimeInterval = 10 - public static let defaultDisconnectOnSessionLoss = true - public static let defaultConnectOnSubscribe: Bool = true - - public init( - headers: [String: String] = [:], - heartbeatInterval: TimeInterval = Self.defaultHeartbeatInterval, - reconnectDelay: TimeInterval = Self.defaultReconnectDelay, - timeoutInterval: TimeInterval = Self.defaultTimeoutInterval, - disconnectOnSessionLoss: Bool = Self.defaultDisconnectOnSessionLoss, - connectOnSubscribe: Bool = Self.defaultConnectOnSubscribe, - fetch: (@Sendable (_ request: URLRequest) async throws -> (Data, URLResponse))? = nil, - accessToken: (@Sendable () async throws -> String?)? = nil, - logger: (any SupabaseLogger)? = nil - ) { - self.headers = HTTPFields(headers) - self.heartbeatInterval = heartbeatInterval - self.reconnectDelay = reconnectDelay - self.timeoutInterval = timeoutInterval - self.disconnectOnSessionLoss = disconnectOnSessionLoss - self.connectOnSubscribe = connectOnSubscribe - self.fetch = fetch - self.accessToken = accessToken - self.logger = logger - } - - var apikey: String? { - headers[.apiKey] - } -} - -public typealias RealtimeSubscription = ObservationToken - -public enum RealtimeChannelStatus: Sendable { - case unsubscribed - case subscribing - case subscribed - case unsubscribing -} - -public enum RealtimeClientStatus: Sendable, CustomStringConvertible { - case disconnected - case connecting - case connected - - public var description: String { - switch self { - case .disconnected: "Disconnected" - case .connecting: "Connecting" - case .connected: "Connected" - } - } -} - -extension HTTPField.Name { - static let apiKey = Self("apiKey")! -}