diff --git a/Sources/SwiftCentrifuge/Client.swift b/Sources/SwiftCentrifuge/Client.swift index 57d826d..44dc1e7 100644 --- a/Sources/SwiftCentrifuge/Client.swift +++ b/Sources/SwiftCentrifuge/Client.swift @@ -892,17 +892,21 @@ fileprivate extension CentrifugeClient { } private func handleData(data: Data) { - guard let replies = try? CentrifugeSerializer.deserializeCommands(data: data) else { return } - for reply in replies { - if reply.id > 0 { - self.opCallbacks[reply.id]?(CentrifugeResolveData(error: nil, reply: reply)) - } else { - if !reply.hasPush { - self.handlePing() + do { + let replies = try CentrifugeSerializer.deserializeCommands(data: data) + for reply in replies { + if reply.id > 0 { + self.opCallbacks[reply.id]?(CentrifugeResolveData(error: nil, reply: reply)) } else { - self.handlePush(push: reply.push) + if !reply.hasPush { + self.handlePing() + } else { + self.handlePush(push: reply.push) + } } } + } catch { + self.log.error("error deserializeCommands: \(error)") } } diff --git a/Sources/SwiftCentrifuge/Helpers.swift b/Sources/SwiftCentrifuge/Helpers.swift index 9e1ea82..ce1b3f4 100644 --- a/Sources/SwiftCentrifuge/Helpers.swift +++ b/Sources/SwiftCentrifuge/Helpers.swift @@ -35,6 +35,26 @@ public struct StreamPosition { var epoch: String = "" } +// Helper function to decode a varint. +func readVarint(from data: Data) throws -> (value: Int, length: Int) { + var value = 0 + var length = 0 + for byte in data { + value |= Int(byte & 0x7F) << (7 * length) + length += 1 + if (byte & 0x80) == 0 { + return (value, length) + } + } + throw ProtobufDecodeError.failedToReadVarintLengthPrefix +} + +private enum ProtobufDecodeError: Error { + case failedToReadVarintLengthPrefix + case notEnoughDataForMessage + case failedToParseMessage(Error) +} + internal enum CentrifugeSerializer { static func serializeCommands(commands: [Centrifugal_Centrifuge_Protocol_Command]) throws -> Data { let stream = OutputStream.toMemory() @@ -46,20 +66,35 @@ internal enum CentrifugeSerializer { return stream.property(forKey: .dataWrittenToMemoryStreamKey) as! Data } + // Note, not using BinaryDelimited.parse to work with all Protobuf versions - SwiftProtobuf 1.27.0 + // introduced noBytesAvailable error which was not available before. static func deserializeCommands(data: Data) throws -> [Centrifugal_Centrifuge_Protocol_Reply] { - var commands = [Centrifugal_Centrifuge_Protocol_Reply]() - let stream = InputStream(data: data as Data) - stream.open() - while true { + var replies = [Centrifugal_Centrifuge_Protocol_Reply]() + var currentIndex = data.startIndex + + while currentIndex < data.endIndex { + // Read the varint length prefix. + let remainingData = data[currentIndex...] + let (messageLength, lengthBytes) = try readVarint(from: remainingData) + // Calculate the total length of the message (varint length prefix + message data). + let totalLength = lengthBytes + messageLength + // Ensure there is enough data left. + guard currentIndex + totalLength <= data.endIndex else { + throw ProtobufDecodeError.notEnoughDataForMessage + } + // Extract the message data. + let messageData = data[(currentIndex + lengthBytes)..<(currentIndex + totalLength)] + // Parse the Protobuf message payload to Centrifuge Reply. do { - let res = try BinaryDelimited.parse(messageType: Centrifugal_Centrifuge_Protocol_Reply.self, from: stream) - commands.append(res) - } catch BinaryDelimited.Error.truncated { - // End of stream - break + let reply = try Centrifugal_Centrifuge_Protocol_Reply(serializedData: messageData) + replies.append(reply) + } catch { + throw ProtobufDecodeError.failedToParseMessage(error) } + // Move to the next message. + currentIndex += totalLength } - return commands + return replies } }