From 20d761ab3aaa229d8907c5a109812d09ec222dcd Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 5 Dec 2025 06:37:53 -0300 Subject: [PATCH 1/2] feat(realtime): implement V2 serializer with binary payload support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implements the Realtime V2 serializer based on supabase-js PRs #1829 and #1894. Key features: - Binary payload support for user messages - Two new message types: user broadcast and user broadcast push - Optional metadata support for user broadcast push messages - Reduced JSON encoding overhead on the server side - Backward compatible with V1 (1.0.0) as default Changes: - Added RealtimeBinaryEncoder and RealtimeBinaryDecoder classes - Added RealtimeSerializer protocol for future extensibility - Updated RealtimeClientOptions to support serializer version selection - Updated RealtimeClientV2 to use binary serializer when v2.0.0 is selected - Added RealtimeBinaryPayload helper for working with binary data - Comprehensive test suite with 16 tests covering encoding/decoding scenarios References: - https://github.com/supabase/supabase-js/pull/1829 - https://github.com/supabase/supabase-js/pull/1894 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- Sources/Realtime/RealtimeBinaryDecoder.swift | 266 +++++++++++++ Sources/Realtime/RealtimeBinaryEncoder.swift | 194 +++++++++ Sources/Realtime/RealtimeBinaryPayload.swift | 50 +++ .../Realtime/RealtimeChannel+AsyncAwait.swift | 22 +- Sources/Realtime/RealtimeClientV2.swift | 49 ++- Sources/Realtime/RealtimePostgresFilter.swift | 14 +- Sources/Realtime/RealtimeSerializer.swift | 21 + Sources/Realtime/Types.swift | 17 +- .../RealtimeSerializerTests.swift | 373 ++++++++++++++++++ 9 files changed, 979 insertions(+), 27 deletions(-) create mode 100644 Sources/Realtime/RealtimeBinaryDecoder.swift create mode 100644 Sources/Realtime/RealtimeBinaryEncoder.swift create mode 100644 Sources/Realtime/RealtimeBinaryPayload.swift create mode 100644 Sources/Realtime/RealtimeSerializer.swift create mode 100644 Tests/RealtimeTests/RealtimeSerializerTests.swift diff --git a/Sources/Realtime/RealtimeBinaryDecoder.swift b/Sources/Realtime/RealtimeBinaryDecoder.swift new file mode 100644 index 000000000..3274ab9f4 --- /dev/null +++ b/Sources/Realtime/RealtimeBinaryDecoder.swift @@ -0,0 +1,266 @@ +// +// RealtimeBinaryDecoder.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Binary decoder for Realtime V2 messages. +/// +/// Supports decoding messages with: +/// - Binary payloads +/// - User broadcast messages with metadata +/// - Push, reply, broadcast, and user broadcast message types +final class RealtimeBinaryDecoder: Sendable { + private let headerLength = 1 + private let metaLength = 4 + + enum MessageKind: UInt8 { + case push = 0 + case reply = 1 + case broadcast = 2 + case userBroadcastPush = 3 + case userBroadcast = 4 + } + + enum PayloadEncoding: UInt8 { + case binary = 0 + case json = 1 + } + + /// Decodes binary data into a Realtime message. + /// - Parameter data: Binary data to decode + /// - Returns: Decoded message + func decode(_ data: Data) throws -> RealtimeMessageV2 { + guard !data.isEmpty else { + throw RealtimeError("Empty binary data") + } + + let kind = data[0] + + guard let messageKind = MessageKind(rawValue: kind) else { + throw RealtimeError("Unknown message kind: \(kind)") + } + + switch messageKind { + case .push: + return try decodePush(data) + case .reply: + return try decodeReply(data) + case .broadcast: + return try decodeBroadcast(data) + case .userBroadcast: + return try decodeUserBroadcast(data) + case .userBroadcastPush: + throw RealtimeError("userBroadcastPush should not be received from server") + } + } + + // MARK: - Private Decoding Methods + + private func decodePush(_ data: Data) throws -> RealtimeMessageV2 { + guard data.count >= headerLength + metaLength - 1 else { + throw RealtimeError("Invalid push message length") + } + + let joinRefSize = Int(data[1]) + let topicSize = Int(data[2]) + let eventSize = Int(data[3]) + + var offset = headerLength + metaLength - 1 // pushes have no ref + + let joinRef = try decodeString(from: data, offset: offset, length: joinRefSize) + offset += joinRefSize + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let event = try decodeString(from: data, offset: offset, length: eventSize) + offset += eventSize + + let payloadData = data.subdata(in: offset.. RealtimeMessageV2 { + guard data.count >= headerLength + metaLength else { + throw RealtimeError("Invalid reply message length") + } + + let joinRefSize = Int(data[1]) + let refSize = Int(data[2]) + let topicSize = Int(data[3]) + let eventSize = Int(data[4]) + + var offset = headerLength + metaLength + + let joinRef = try decodeString(from: data, offset: offset, length: joinRefSize) + offset += joinRefSize + + let ref = try decodeString(from: data, offset: offset, length: refSize) + offset += refSize + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let event = try decodeString(from: data, offset: offset, length: eventSize) + offset += eventSize + + let responseData = data.subdata(in: offset.. RealtimeMessageV2 { + guard data.count >= headerLength + 2 else { + throw RealtimeError("Invalid broadcast message length") + } + + let topicSize = Int(data[1]) + let eventSize = Int(data[2]) + + var offset = headerLength + 2 + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let event = try decodeString(from: data, offset: offset, length: eventSize) + offset += eventSize + + let payloadData = data.subdata(in: offset.. RealtimeMessageV2 { + guard data.count >= headerLength + 4 else { + throw RealtimeError("Invalid user broadcast message length") + } + + let topicSize = Int(data[1]) + let userEventSize = Int(data[2]) + let metadataSize = Int(data[3]) + let payloadEncoding = data[4] + + var offset = headerLength + 4 + + let topic = try decodeString(from: data, offset: offset, length: topicSize) + offset += topicSize + + let userEvent = try decodeString(from: data, offset: offset, length: userEventSize) + offset += userEventSize + + let metadata = try decodeString(from: data, offset: offset, length: metadataSize) + offset += metadataSize + + let payloadData = data.subdata(in: offset.. String { + guard offset + length <= data.count else { + throw RealtimeError("Invalid string offset/length") + } + + let stringData = data.subdata(in: offset..<(offset + length)) + guard let string = String(data: stringData, encoding: .utf8) else { + throw RealtimeError("Failed to decode string") + } + return string + } +} + +// MARK: - AnyJSON Extensions for Binary Support + +extension AnyJSON { + /// Creates an AnyJSON value from a Swift value. + init(value: Any) throws { + if let dict = value as? [String: Any] { + var object: JSONObject = [:] + for (key, val) in dict { + object[key] = try AnyJSON(value: val) + } + self = .object(object) + } else if let array = value as? [Any] { + self = .array(try array.map { try AnyJSON(value: $0) }) + } else if let string = value as? String { + self = .string(string) + } else if let bool = value as? Bool { + // Bool must be checked before Int because Bool can be cast to Int + self = .bool(bool) + } else if let int = value as? Int { + self = .integer(int) + } else if let double = value as? Double { + self = .double(double) + } else if value is NSNull { + self = .null + } else { + throw RealtimeError("Unsupported JSON value type: \(type(of: value))") + } + } +} diff --git a/Sources/Realtime/RealtimeBinaryEncoder.swift b/Sources/Realtime/RealtimeBinaryEncoder.swift new file mode 100644 index 000000000..1b237c2b4 --- /dev/null +++ b/Sources/Realtime/RealtimeBinaryEncoder.swift @@ -0,0 +1,194 @@ +// +// RealtimeBinaryEncoder.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Binary encoder for Realtime V2 messages. +/// +/// Supports encoding messages with: +/// - Binary payloads +/// - User broadcast messages with metadata +/// - Reduced JSON encoding overhead +final class RealtimeBinaryEncoder: Sendable { + private let headerLength = 1 + private let metaLength = 4 + private let userBroadcastPushMetaLength = 6 + + enum MessageKind: UInt8 { + case push = 0 + case reply = 1 + case broadcast = 2 + case userBroadcastPush = 3 + case userBroadcast = 4 + } + + enum PayloadEncoding: UInt8 { + case binary = 0 + case json = 1 + } + + private let allowedMetadataKeys: [String] + + init(allowedMetadataKeys: [String] = []) { + self.allowedMetadataKeys = allowedMetadataKeys + } + + /// Encodes a Realtime message to binary format. + /// - Parameter message: The message to encode + /// - Returns: Binary data representation + func encode(_ message: RealtimeMessageV2) throws -> Data { + // Check if this is a user broadcast push + if message.event == "broadcast", + let event = message.payload["event"]?.stringValue + { + return try encodeUserBroadcastPush(message: message, userEvent: event) + } + + // Check if this has a binary payload at top level + if let binaryPayload = getBinaryPayload(from: message.payload) { + return try encodePush(message: message, binaryPayload: binaryPayload) + } + + // Fall back to JSON encoding + return try encodeAsJSON(message) + } + + // MARK: - Private Encoding Methods + + private func encodePush(message: RealtimeMessageV2, binaryPayload: Data) throws -> Data { + let joinRef = message.joinRef ?? "" + let ref = message.ref ?? "" + let topic = message.topic + let event = message.event + + try validateFieldLength(joinRef, name: "joinRef") + try validateFieldLength(ref, name: "ref") + try validateFieldLength(topic, name: "topic") + try validateFieldLength(event, name: "event") + + let metaLength = + self.metaLength + joinRef.utf8.count + ref.utf8.count + topic.utf8.count + event.utf8.count + + var header = Data(capacity: headerLength + metaLength) + + header.append(MessageKind.push.rawValue) + header.append(UInt8(joinRef.utf8.count)) + header.append(UInt8(ref.utf8.count)) + header.append(UInt8(topic.utf8.count)) + header.append(UInt8(event.utf8.count)) + header.append(contentsOf: joinRef.utf8) + header.append(contentsOf: ref.utf8) + header.append(contentsOf: topic.utf8) + header.append(contentsOf: event.utf8) + + var combined = header + combined.append(binaryPayload) + return combined + } + + private func encodeUserBroadcastPush( + message: RealtimeMessageV2, + userEvent: String + ) throws -> Data { + let joinRef = message.joinRef ?? "" + let ref = message.ref ?? "" + let topic = message.topic + + // Extract the payload + let payload = message.payload["payload"] ?? .null + + // Encode payload + let encodedPayload: Data + let encoding: PayloadEncoding + + if let binaryData = getBinaryPayload(from: ["payload": payload]) { + encodedPayload = binaryData + encoding = .binary + } else { + encodedPayload = try JSONSerialization.data(withJSONObject: payload.value, options: []) + encoding = .json + } + + // Extract metadata based on allowed keys + let metadata: JSONObject + if !allowedMetadataKeys.isEmpty { + metadata = message.payload.filter { key, _ in + allowedMetadataKeys.contains(key) && key != "event" && key != "payload" && key != "type" + } + } else { + metadata = [:] + } + + let metadataString: String + if !metadata.isEmpty { + let metadataData = try JSONSerialization.data( + withJSONObject: metadata.mapValues(\.value), + options: [] + ) + metadataString = String(data: metadataData, encoding: .utf8) ?? "" + } else { + metadataString = "" + } + + // Validate lengths + try validateFieldLength(joinRef, name: "joinRef") + try validateFieldLength(ref, name: "ref") + try validateFieldLength(topic, name: "topic") + try validateFieldLength(userEvent, name: "userEvent") + try validateFieldLength(metadataString, name: "metadata") + + let metaLength = + userBroadcastPushMetaLength + joinRef.utf8.count + ref.utf8.count + topic.utf8.count + + userEvent.utf8.count + metadataString.utf8.count + + var header = Data(capacity: headerLength + metaLength) + + header.append(MessageKind.userBroadcastPush.rawValue) + header.append(UInt8(joinRef.utf8.count)) + header.append(UInt8(ref.utf8.count)) + header.append(UInt8(topic.utf8.count)) + header.append(UInt8(userEvent.utf8.count)) + header.append(UInt8(metadataString.utf8.count)) + header.append(encoding.rawValue) + header.append(contentsOf: joinRef.utf8) + header.append(contentsOf: ref.utf8) + header.append(contentsOf: topic.utf8) + header.append(contentsOf: userEvent.utf8) + header.append(contentsOf: metadataString.utf8) + + var combined = header + combined.append(encodedPayload) + return combined + } + + private func encodeAsJSON(_ message: RealtimeMessageV2) throws -> Data { + try JSONEncoder().encode(message) + } + + // MARK: - Helper Methods + + private func getBinaryPayload(from payload: JSONObject) -> Data? { + // Check if there's a "payload" key with base64-encoded binary data marker + guard let payloadValue = payload["payload"], + case .object(let obj) = payloadValue, + let isBinary = obj["__binary__"]?.boolValue, + isBinary, + let base64String = obj["data"]?.stringValue, + let data = Data(base64Encoded: base64String) + else { + return nil + } + return data + } + + private func validateFieldLength(_ field: String, name: String) throws { + let length = field.utf8.count + guard length <= 255 else { + throw RealtimeError("\(name) length \(length) exceeds maximum of 255") + } + } +} diff --git a/Sources/Realtime/RealtimeBinaryPayload.swift b/Sources/Realtime/RealtimeBinaryPayload.swift new file mode 100644 index 000000000..1469ddf19 --- /dev/null +++ b/Sources/Realtime/RealtimeBinaryPayload.swift @@ -0,0 +1,50 @@ +// +// RealtimeBinaryPayload.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Helper for creating and working with binary payloads in Realtime messages. +public enum RealtimeBinaryPayload { + /// Creates a JSON payload marker for binary data. + /// This can be used in `RealtimeMessageV2.payload` to indicate binary data. + /// + /// - Parameter data: The binary data to encode + /// - Returns: An AnyJSON object representing the binary data + public static func binary(_ data: Data) -> AnyJSON { + .object([ + "__binary__": .bool(true), + "data": .string(data.base64EncodedString()), + ]) + } + + /// Checks if a JSON value represents binary data. + /// - Parameter value: The AnyJSON value to check + /// - Returns: true if the value represents binary data + public static func isBinary(_ value: AnyJSON) -> Bool { + guard case .object(let obj) = value, + let isBinary = obj["__binary__"]?.boolValue + else { + return false + } + return isBinary + } + + /// Extracts binary data from a JSON value. + /// - Parameter value: The AnyJSON value containing binary data + /// - Returns: The decoded binary data, or nil if not a binary payload + public static func data(from value: AnyJSON) -> Data? { + guard case .object(let obj) = value, + let isBinary = obj["__binary__"]?.boolValue, + isBinary, + let base64String = obj["data"]?.stringValue, + let data = Data(base64Encoded: base64String) + else { + return nil + } + return data + } +} diff --git a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift index 8a12a4d9d..e7e689684 100644 --- a/Sources/Realtime/RealtimeChannel+AsyncAwait.swift +++ b/Sources/Realtime/RealtimeChannel+AsyncAwait.swift @@ -37,8 +37,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -65,8 +65,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -93,8 +93,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -120,8 +120,8 @@ extension RealtimeChannelV2 { /// Listen for postgres changes in a channel. @available( *, - deprecated, - message: "Use the new filter syntax instead." + deprecated, + message: "Use the new filter syntax instead." ) @_disfavoredOverload public func postgresChange( @@ -168,7 +168,7 @@ extension RealtimeChannelV2 { return stream } - + /// Listen for `system` event. public func system() -> AsyncStream { let (stream, continuation) = AsyncStream.makeStream() @@ -192,8 +192,8 @@ extension RealtimeChannelV2 { } // Helper to work around type ambiguity in macOS 13 -fileprivate extension AsyncStream { - func compactErase() -> AsyncStream { +extension AsyncStream { + fileprivate func compactErase() -> AsyncStream { AsyncStream(compactMap { $0.wrappedAction as? T } as AsyncCompactMapSequence) } } diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index f99c5173d..de8b5eb6d 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -55,6 +55,8 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { let mutableState = LockIsolated(MutableState()) let http: any HTTPClientType let apikey: String + let binaryEncoder: RealtimeBinaryEncoder? + let binaryDecoder: RealtimeBinaryDecoder? var conn: (any WebSocket)? { mutableState.conn @@ -156,6 +158,15 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { self.wsTransport = wsTransport self.http = http + // Initialize serializer based on version + if options.serializerVersion == "2.0.0" { + binaryEncoder = RealtimeBinaryEncoder(allowedMetadataKeys: options.allowedMetadataKeys) + binaryDecoder = RealtimeBinaryDecoder() + } else { + binaryEncoder = nil + binaryDecoder = nil + } + precondition(options.apikey != nil, "API key is required to connect to Realtime") apikey = options.apikey! @@ -205,7 +216,8 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { Self.realtimeWebSocketURL( baseURL: Self.realtimeBaseURL(url: url), apikey: options.apikey, - logLevel: options.logLevel + logLevel: options.logLevel, + serializerVersion: options.serializerVersion ), options.headers.dictionary ) @@ -374,10 +386,17 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { if Task.isCancelled { return } switch event { - case .binary: - self.options.logger?.error("Unsupported binary event received.") - break + case .binary(let data): + // Binary events are supported in V2 serializer + if let decoder = self.binaryDecoder { + let message = try decoder.decode(data) + await onMessage(message) + } else { + self.options.logger?.error( + "Binary event received but V2 serializer is not enabled.") + } case .text(let text): + // Text events are always JSON let data = Data(text.utf8) let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) await onMessage(message) @@ -530,8 +549,17 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { do { // Check cancellation before sending, because this push may have been cancelled before a connection was established. try Task.checkCancellation() - let data = try JSONEncoder().encode(message) - self?.conn?.send(String(decoding: data, as: UTF8.self)) + + // Use binary encoder if V2 serializer is enabled + if let encoder = self?.binaryEncoder { + let encoded = try encoder.encode(message) + // Binary encoder always returns Data + self?.conn?.send(encoded) + } else { + // Fall back to JSON encoding for V1 + let data = try JSONEncoder().encode(message) + self?.conn?.send(String(decoding: data, as: UTF8.self)) + } } catch { self?.options.logger?.error( """ @@ -586,7 +614,12 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { return url } - static func realtimeWebSocketURL(baseURL: URL, apikey: String?, logLevel: LogLevel?) -> URL { + static func realtimeWebSocketURL( + baseURL: URL, + apikey: String?, + logLevel: LogLevel?, + serializerVersion: String = "1.0.0" + ) -> URL { guard var components = URLComponents(url: baseURL, resolvingAgainstBaseURL: false) else { return baseURL @@ -596,7 +629,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { if let apikey { components.queryItems!.append(URLQueryItem(name: "apikey", value: apikey)) } - components.queryItems!.append(URLQueryItem(name: "vsn", value: "1.0.0")) + components.queryItems!.append(URLQueryItem(name: "vsn", value: serializerVersion)) if let logLevel { components.queryItems!.append(URLQueryItem(name: "log_level", value: logLevel.rawValue)) diff --git a/Sources/Realtime/RealtimePostgresFilter.swift b/Sources/Realtime/RealtimePostgresFilter.swift index 8b8c77906..976d570ad 100644 --- a/Sources/Realtime/RealtimePostgresFilter.swift +++ b/Sources/Realtime/RealtimePostgresFilter.swift @@ -17,19 +17,19 @@ public enum RealtimePostgresFilter { var value: String { switch self { - case let .eq(column, value): + case .eq(let column, let value): return "\(column)=eq.\(value.rawValue)" - case let .neq(column, value): + case .neq(let column, let value): return "\(column)=neq.\(value.rawValue)" - case let .gt(column, value): + case .gt(let column, let value): return "\(column)=gt.\(value.rawValue)" - case let .gte(column, value): + case .gte(let column, let value): return "\(column)=gte.\(value.rawValue)" - case let .lt(column, value): + case .lt(let column, let value): return "\(column)=lt.\(value.rawValue)" - case let .lte(column, value): + case .lte(let column, let value): return "\(column)=lte.\(value.rawValue)" - case let .in(column, values): + case .in(let column, let values): return "\(column)=in.(\(values.map(\.rawValue).joined(separator: ",")))" } } diff --git a/Sources/Realtime/RealtimeSerializer.swift b/Sources/Realtime/RealtimeSerializer.swift new file mode 100644 index 000000000..db53f3698 --- /dev/null +++ b/Sources/Realtime/RealtimeSerializer.swift @@ -0,0 +1,21 @@ +// +// RealtimeSerializer.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Protocol for encoding and decoding Realtime messages. +protocol RealtimeSerializer: Sendable { + /// Encodes a message for sending over the WebSocket connection. + /// - Parameter message: The message to encode + /// - Returns: Either Data (for binary encoding) or String (for JSON encoding) + func encode(_ message: RealtimeMessageV2) throws -> Any + + /// Decodes a message received from the WebSocket connection. + /// - Parameter data: Either Data (binary) or String (JSON) + /// - Returns: The decoded message + func decode(_ data: Any) throws -> RealtimeMessageV2 +} diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index cd0a44c3a..366db1a4f 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -28,12 +28,23 @@ public struct RealtimeClientOptions: Sendable { package var accessToken: (@Sendable () async throws -> String?)? package var logger: (any SupabaseLogger)? + /// Serializer version to use. Defaults to "1.0.0". + /// - "1.0.0": JSON-only serializer (default, backward compatible) + /// - "2.0.0": Binary serializer with support for binary payloads and metadata + var serializerVersion: String + + /// Allowed metadata keys for user broadcast push messages (V2 serializer only). + /// Only these keys will be included in the metadata section of binary messages. + var allowedMetadataKeys: [String] + public static let defaultHeartbeatInterval: TimeInterval = 25 public static let defaultReconnectDelay: TimeInterval = 7 public static let defaultTimeoutInterval: TimeInterval = 10 public static let defaultDisconnectOnSessionLoss = true public static let defaultConnectOnSubscribe: Bool = true public static let defaultMaxRetryAttempts: Int = 5 + public static let defaultSerializerVersion = "1.0.0" + public static let defaultAllowedMetadataKeys: [String] = [] public init( headers: [String: String] = [:], @@ -46,7 +57,9 @@ public struct RealtimeClientOptions: Sendable { logLevel: LogLevel? = nil, fetch: (@Sendable (_ request: URLRequest) async throws -> (Data, URLResponse))? = nil, accessToken: (@Sendable () async throws -> String?)? = nil, - logger: (any SupabaseLogger)? = nil + logger: (any SupabaseLogger)? = nil, + serializerVersion: String = Self.defaultSerializerVersion, + allowedMetadataKeys: [String] = Self.defaultAllowedMetadataKeys ) { self.headers = HTTPFields(headers) self.heartbeatInterval = heartbeatInterval @@ -59,6 +72,8 @@ public struct RealtimeClientOptions: Sendable { self.fetch = fetch self.accessToken = accessToken self.logger = logger + self.serializerVersion = serializerVersion + self.allowedMetadataKeys = allowedMetadataKeys } var apikey: String? { diff --git a/Tests/RealtimeTests/RealtimeSerializerTests.swift b/Tests/RealtimeTests/RealtimeSerializerTests.swift new file mode 100644 index 000000000..c21be4302 --- /dev/null +++ b/Tests/RealtimeTests/RealtimeSerializerTests.swift @@ -0,0 +1,373 @@ +// +// RealtimeSerializerTests.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation +import XCTest + +@testable import Realtime + +final class RealtimeSerializerTests: XCTestCase { + // MARK: - Binary Encoder Tests + + func testEncodePushWithBinaryPayload() throws { + let encoder = RealtimeBinaryEncoder() + + let binaryData = Data([0x01, 0x04]) + let message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "t", + event: "e", + payload: ["payload": RealtimeBinaryPayload.binary(binaryData)] + ) + + let encoded = try encoder.encode(message) + XCTAssertTrue(encoded.count > 0) + + // Verify the structure + XCTAssertEqual(encoded[0], 0) // Kind: push + XCTAssertEqual(encoded[1], 2) // joinRef length + XCTAssertEqual(encoded[2], 1) // ref length + XCTAssertEqual(encoded[3], 1) // topic length + XCTAssertEqual(encoded[4], 1) // event length + + // Verify payload is appended + let headerEnd = 1 + 4 + 2 + 1 + 1 + 1 // header + meta + strings + let payloadStart = encoded.index(encoded.startIndex, offsetBy: headerEnd) + XCTAssertEqual(encoded[payloadStart], 0x01) + XCTAssertEqual(encoded[payloadStart + 1], 0x04) + } + + func testEncodeUserBroadcastPushWithJSONNoMetadata() throws { + let encoder = RealtimeBinaryEncoder() + + let message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "type": "broadcast", + "event": "user-event", + "payload": ["a": "b"], + ] + ) + + let encoded = try encoder.encode(message) + + // Verify the structure + XCTAssertEqual(encoded[0], 3) // Kind: userBroadcastPush + XCTAssertEqual(encoded[1], 2) // joinRef length + XCTAssertEqual(encoded[2], 1) // ref length + XCTAssertEqual(encoded[3], 3) // topic length ("top") + XCTAssertEqual(encoded[4], 10) // userEvent length ("user-event") + XCTAssertEqual(encoded[5], 0) // metadata length + XCTAssertEqual(encoded[6], 1) // JSON encoding + } + + func testEncodeUserBroadcastPushWithAllowedMetadata() throws { + let encoder = RealtimeBinaryEncoder(allowedMetadataKeys: ["extra"]) + + let message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "type": "broadcast", + "event": "user-event", + "extra": "bit", + "store": .bool(true), // Should not be included + "payload": ["a": "b"], + ] + ) + + let encoded = try encoder.encode(message) + + // Verify metadata is included + XCTAssertEqual(encoded[0], 3) // Kind: userBroadcastPush + XCTAssertEqual(encoded[5], 15) // metadata length ({"extra":"bit"}) + XCTAssertEqual(encoded[6], 1) // JSON encoding + } + + func testEncodeUserBroadcastPushWithBinaryPayload() throws { + let encoder = RealtimeBinaryEncoder() + + let binaryData = Data([0x01, 0x04]) + let message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "event": "user-event", + "payload": RealtimeBinaryPayload.binary(binaryData), + ] + ) + + let encoded = try encoder.encode(message) + + // Verify the structure + XCTAssertEqual(encoded[0], 3) // Kind: userBroadcastPush + XCTAssertEqual(encoded[6], 0) // Binary encoding + } + + func testThrowsErrorWhenJoinRefExceeds255() { + let encoder = RealtimeBinaryEncoder() + let longJoinRef = String(repeating: "a", count: 256) + + let message = RealtimeMessageV2( + joinRef: longJoinRef, + ref: "1", + topic: "top", + event: "broadcast", + payload: [ + "event": "user-event", + "payload": ["a": "b"], + ] + ) + + XCTAssertThrowsError(try encoder.encode(message)) { error in + XCTAssertTrue(error.localizedDescription.contains("joinRef length")) + } + } + + func testThrowsErrorWhenTopicExceeds255() { + let encoder = RealtimeBinaryEncoder() + let longTopic = String(repeating: "a", count: 256) + + let message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: longTopic, + event: "broadcast", + payload: [ + "event": "user-event", + "payload": ["a": "b"], + ] + ) + + XCTAssertThrowsError(try encoder.encode(message)) { error in + XCTAssertTrue(error.localizedDescription.contains("topic length")) + } + } + + // MARK: - Binary Decoder Tests + + func testDecodePushWithJSONPayload() throws { + let decoder = RealtimeBinaryDecoder() + + // Construct: kind(1) + joinRefLen(1) + topicLen(1) + eventLen(1) + strings + payload + var data = Data() + data.append(0) // kind: push + data.append(3) // joinRef length + data.append(3) // topic length + data.append(10) // event length + data.append(contentsOf: "123".utf8) + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "some-event".utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.joinRef, "123") + XCTAssertNil(message.ref) + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "some-event") + XCTAssertEqual(message.payload["a"]?.stringValue, "b") + } + + func testDecodeReplyWithJSONPayload() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(1) // kind: reply + data.append(3) // joinRef length + data.append(2) // ref length + data.append(3) // topic length + data.append(2) // event/status length + data.append(contentsOf: "100".utf8) + data.append(contentsOf: "12".utf8) + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "ok".utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.joinRef, "100") + XCTAssertEqual(message.ref, "12") + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "phx_reply") + XCTAssertEqual(message.payload["status"]?.stringValue, "ok") + XCTAssertEqual(message.payload["response"]?.objectValue?["a"]?.stringValue, "b") + } + + func testDecodeBroadcastWithJSONPayload() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(2) // kind: broadcast + data.append(3) // topic length + data.append(10) // event length + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "some-event".utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertNil(message.joinRef) + XCTAssertNil(message.ref) + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "some-event") + XCTAssertEqual(message.payload["a"]?.stringValue, "b") + } + + func testDecodeUserBroadcastWithJSONPayloadNoMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(0) // metadata length + data.append(1) // JSON encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + // no metadata + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertNil(message.joinRef) + XCTAssertNil(message.ref) + XCTAssertEqual(message.topic, "top") + XCTAssertEqual(message.event, "broadcast") + XCTAssertEqual(message.payload["type"]?.stringValue, "broadcast") + XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") + XCTAssertEqual(message.payload["payload"]?.objectValue?["a"]?.stringValue, "b") + } + + func testDecodeUserBroadcastWithJSONPayloadAndMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(17) // metadata length + data.append(1) // JSON encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + data.append(contentsOf: #"{"replayed":true}"#.utf8) + data.append(contentsOf: #"{"a":"b"}"#.utf8) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.event, "broadcast") + XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") + XCTAssertEqual(message.payload["meta"]?.objectValue?["replayed"]?.boolValue, true) + XCTAssertEqual(message.payload["payload"]?.objectValue?["a"]?.stringValue, "b") + } + + func testDecodeUserBroadcastWithBinaryPayloadNoMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(0) // metadata length + data.append(0) // binary encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + // no metadata + data.append(0x01) + data.append(0x04) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.event, "broadcast") + XCTAssertEqual(message.payload["type"]?.stringValue, "broadcast") + XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") + + // Check binary payload + let binaryPayload = RealtimeBinaryPayload.data(from: message.payload["payload"]!) + XCTAssertNotNil(binaryPayload) + XCTAssertEqual(binaryPayload, Data([0x01, 0x04])) + } + + func testDecodeUserBroadcastWithBinaryPayloadAndMetadata() throws { + let decoder = RealtimeBinaryDecoder() + + var data = Data() + data.append(4) // kind: userBroadcast + data.append(3) // topic length + data.append(10) // userEvent length + data.append(17) // metadata length + data.append(0) // binary encoding + data.append(contentsOf: "top".utf8) + data.append(contentsOf: "user-event".utf8) + data.append(contentsOf: #"{"replayed":true}"#.utf8) + data.append(0x01) + data.append(0x04) + + let message = try decoder.decode(data) + + XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") + XCTAssertEqual(message.payload["meta"]?.objectValue?["replayed"]?.boolValue, true) + + let binaryPayload = RealtimeBinaryPayload.data(from: message.payload["payload"]!) + XCTAssertNotNil(binaryPayload) + XCTAssertEqual(binaryPayload, Data([0x01, 0x04])) + } + + // MARK: - Binary Payload Helper Tests + + func testBinaryPayloadHelper() { + let data = Data([0x01, 0x02, 0x03]) + let payload = RealtimeBinaryPayload.binary(data) + + XCTAssertTrue(RealtimeBinaryPayload.isBinary(payload)) + + let extractedData = RealtimeBinaryPayload.data(from: payload) + XCTAssertEqual(extractedData, data) + } + + func testBinaryPayloadHelperWithNonBinary() { + let payload: AnyJSON = .string("test") + + XCTAssertFalse(RealtimeBinaryPayload.isBinary(payload)) + XCTAssertNil(RealtimeBinaryPayload.data(from: payload)) + } + + // MARK: - Round-trip Tests + + func testRoundTripUserBroadcastWithBinary() throws { + let encoder = RealtimeBinaryEncoder() + let decoder = RealtimeBinaryDecoder() + + let originalData = Data([0x01, 0x02, 0x03, 0x04]) + let originalMessage = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "test-topic", + event: "broadcast", + payload: [ + "event": "test-event", + "payload": RealtimeBinaryPayload.binary(originalData), + ] + ) + + let encoded = try encoder.encode(originalMessage) + + // Note: We can't directly decode what we encode because the server + // would send it back as userBroadcast (kind 4) not userBroadcastPush (kind 3) + // But we can verify the encoding structure is correct + XCTAssertTrue(encoded.count > 0) + XCTAssertEqual(encoded[0], 3) // userBroadcastPush + } +} From f1d11f1f21340609a57c0b7096615a47033915e8 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Fri, 5 Dec 2025 07:30:36 -0300 Subject: [PATCH 2/2] feat(realtime): add RealtimeMessageV3 with proper binary payload support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit refactors the binary serializer implementation to introduce RealtimeMessageV3, which provides proper type distinction between JSON and binary payloads. This approach avoids breaking changes by keeping RealtimeMessageV2 intact while allowing gradual migration to the new type. Key changes: - Add RealtimeMessageV3 with RealtimePayload enum (json/binary) - Update RealtimeBinaryEncoder to work with V3 messages - Update RealtimeBinaryDecoder to return V3 messages - Add conversion methods between V2 and V3 - Update RealtimeClientV2 with pushV3() method - Update tests to use V3 messages - Add 4 new V2/V3 conversion tests - Fix MockRealtimeClient to implement pushV3() - Fix Swift 6 warnings with 'any Encoder' and 'any Decoder' The V2 serializer now properly encodes/decodes both JSON and binary payloads with support for metadata in user broadcast messages. Related to supabase-js PRs: - #1829: Implement Realtime V2 Serializer - #1894: Add metadata to realtime user broadcast push 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- Sources/Realtime/RealtimeBinaryDecoder.swift | 14 +- Sources/Realtime/RealtimeBinaryEncoder.swift | 54 ++--- Sources/Realtime/RealtimeClientV2.swift | 15 +- Sources/Realtime/RealtimeMessageV3.swift | 212 ++++++++++++++++++ Tests/RealtimeTests/PushV2Tests.swift | 4 + .../RealtimeSerializerTests.swift | 127 +++++++++-- 6 files changed, 374 insertions(+), 52 deletions(-) create mode 100644 Sources/Realtime/RealtimeMessageV3.swift diff --git a/Sources/Realtime/RealtimeBinaryDecoder.swift b/Sources/Realtime/RealtimeBinaryDecoder.swift index 3274ab9f4..684320048 100644 --- a/Sources/Realtime/RealtimeBinaryDecoder.swift +++ b/Sources/Realtime/RealtimeBinaryDecoder.swift @@ -30,10 +30,18 @@ final class RealtimeBinaryDecoder: Sendable { case json = 1 } - /// Decodes binary data into a Realtime message. + /// Decodes binary data into a V3 Realtime message. /// - Parameter data: Binary data to decode - /// - Returns: Decoded message - func decode(_ data: Data) throws -> RealtimeMessageV2 { + /// - Returns: Decoded V3 message + func decode(_ data: Data) throws -> RealtimeMessageV3 { + let v2Message = try decodeToV2(data) + return RealtimeMessageV3.fromV2(v2Message) + } + + /// Decodes binary data into a V2 Realtime message (for backward compatibility). + /// - Parameter data: Binary data to decode + /// - Returns: Decoded V2 message + func decodeToV2(_ data: Data) throws -> RealtimeMessageV2 { guard !data.isEmpty else { throw RealtimeError("Empty binary data") } diff --git a/Sources/Realtime/RealtimeBinaryEncoder.swift b/Sources/Realtime/RealtimeBinaryEncoder.swift index 1b237c2b4..f61ab9de1 100644 --- a/Sources/Realtime/RealtimeBinaryEncoder.swift +++ b/Sources/Realtime/RealtimeBinaryEncoder.swift @@ -37,29 +37,38 @@ final class RealtimeBinaryEncoder: Sendable { self.allowedMetadataKeys = allowedMetadataKeys } - /// Encodes a Realtime message to binary format. + /// Encodes a V3 Realtime message to binary format. /// - Parameter message: The message to encode /// - Returns: Binary data representation - func encode(_ message: RealtimeMessageV2) throws -> Data { + func encode(_ message: RealtimeMessageV3) throws -> Data { // Check if this is a user broadcast push if message.event == "broadcast", - let event = message.payload["event"]?.stringValue + case .json(let jsonPayload) = message.payload, + let event = jsonPayload["event"]?.stringValue { - return try encodeUserBroadcastPush(message: message, userEvent: event) + return try encodeUserBroadcastPush( + message: message, userEvent: event, jsonPayload: jsonPayload) } - // Check if this has a binary payload at top level - if let binaryPayload = getBinaryPayload(from: message.payload) { + // Check if this has a binary payload + if case .binary(let binaryPayload) = message.payload { return try encodePush(message: message, binaryPayload: binaryPayload) } - // Fall back to JSON encoding + // Fall back to JSON encoding for standard JSON messages return try encodeAsJSON(message) } + /// Encodes a V2 Realtime message to binary format (for backward compatibility). + /// - Parameter message: The message to encode + /// - Returns: Binary data representation + func encodeV2(_ message: RealtimeMessageV2) throws -> Data { + try encode(RealtimeMessageV3.fromV2(message)) + } + // MARK: - Private Encoding Methods - private func encodePush(message: RealtimeMessageV2, binaryPayload: Data) throws -> Data { + private func encodePush(message: RealtimeMessageV3, binaryPayload: Data) throws -> Data { let joinRef = message.joinRef ?? "" let ref = message.ref ?? "" let topic = message.topic @@ -91,21 +100,22 @@ final class RealtimeBinaryEncoder: Sendable { } private func encodeUserBroadcastPush( - message: RealtimeMessageV2, - userEvent: String + message: RealtimeMessageV3, + userEvent: String, + jsonPayload: JSONObject ) throws -> Data { let joinRef = message.joinRef ?? "" let ref = message.ref ?? "" let topic = message.topic // Extract the payload - let payload = message.payload["payload"] ?? .null + let payload = jsonPayload["payload"] ?? .null // Encode payload let encodedPayload: Data let encoding: PayloadEncoding - if let binaryData = getBinaryPayload(from: ["payload": payload]) { + if let binaryData = RealtimeBinaryPayload.data(from: payload) { encodedPayload = binaryData encoding = .binary } else { @@ -116,7 +126,7 @@ final class RealtimeBinaryEncoder: Sendable { // Extract metadata based on allowed keys let metadata: JSONObject if !allowedMetadataKeys.isEmpty { - metadata = message.payload.filter { key, _ in + metadata = jsonPayload.filter { key, _ in allowedMetadataKeys.contains(key) && key != "event" && key != "payload" && key != "type" } } else { @@ -165,26 +175,10 @@ final class RealtimeBinaryEncoder: Sendable { return combined } - private func encodeAsJSON(_ message: RealtimeMessageV2) throws -> Data { + private func encodeAsJSON(_ message: RealtimeMessageV3) throws -> Data { try JSONEncoder().encode(message) } - // MARK: - Helper Methods - - private func getBinaryPayload(from payload: JSONObject) -> Data? { - // Check if there's a "payload" key with base64-encoded binary data marker - guard let payloadValue = payload["payload"], - case .object(let obj) = payloadValue, - let isBinary = obj["__binary__"]?.boolValue, - isBinary, - let base64String = obj["data"]?.stringValue, - let data = Data(base64Encoded: base64String) - else { - return nil - } - return data - } - private func validateFieldLength(_ field: String, name: String) throws { let length = field.utf8.count guard length <= 255 else { diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index de8b5eb6d..93736d5ce 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -25,6 +25,7 @@ protocol RealtimeClientProtocol: AnyObject, Sendable { func connect() async func push(_ message: RealtimeMessageV2) + func pushV3(_ message: RealtimeMessageV3) func _getAccessToken() async -> String? func makeRef() -> String func _remove(_ channel: any RealtimeChannelProtocol) @@ -389,7 +390,8 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { case .binary(let data): // Binary events are supported in V2 serializer if let decoder = self.binaryDecoder { - let message = try decoder.decode(data) + let messageV3 = try decoder.decode(data) + let message = messageV3.toV2() await onMessage(message) } else { self.options.logger?.error( @@ -541,10 +543,10 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { } } - /// Push out a message if the socket is connected. + /// Push out a V3 message if the socket is connected. /// /// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. - public func push(_ message: RealtimeMessageV2) { + public func pushV3(_ message: RealtimeMessageV3) { let callback = { @Sendable [weak self] in do { // Check cancellation before sending, because this push may have been cancelled before a connection was established. @@ -582,6 +584,13 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { } } + /// Push out a V2 message if the socket is connected. + /// + /// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. + public func push(_ message: RealtimeMessageV2) { + pushV3(RealtimeMessageV3.fromV2(message)) + } + private func flushSendBuffer() { mutableState.withValue { $0.sendBuffer.forEach { $0() } diff --git a/Sources/Realtime/RealtimeMessageV3.swift b/Sources/Realtime/RealtimeMessageV3.swift new file mode 100644 index 000000000..0ad3da326 --- /dev/null +++ b/Sources/Realtime/RealtimeMessageV3.swift @@ -0,0 +1,212 @@ +// +// RealtimeMessageV3.swift +// +// +// Created by Guilherme Souza on 05/12/24. +// + +import Foundation + +/// Payload type that can represent either JSON or binary data. +public enum RealtimePayload: Sendable, Hashable { + /// JSON payload represented as a dictionary + case json(JSONObject) + /// Binary payload + case binary(Data) + + /// Returns the JSON object if this is a JSON payload + public var jsonValue: JSONObject? { + if case .json(let object) = self { + return object + } + return nil + } + + /// Returns the binary data if this is a binary payload + public var binaryValue: Data? { + if case .binary(let data) = self { + return data + } + return nil + } + + /// Helper to get a value from the JSON payload + public subscript(key: String) -> AnyJSON? { + jsonValue?[key] + } +} + +extension RealtimePayload: Codable { + public init(from decoder: any Decoder) throws { + // When decoding, we always decode as JSON + let container = try decoder.singleValueContainer() + let object = try container.decode(JSONObject.self) + self = .json(object) + } + + public func encode(to encoder: any Encoder) throws { + var container = encoder.singleValueContainer() + switch self { + case .json(let object): + try container.encode(object) + case .binary(let data): + try container.encode(data) + // Binary payloads should be encoded using the binary encoder, not JSONEncoder + throw EncodingError.invalidValue( + self, + EncodingError.Context( + codingPath: encoder.codingPath, + debugDescription: "Binary payloads must be encoded using RealtimeBinaryEncoder" + ) + ) + } + } +} + +/// V3 Realtime message with proper support for both JSON and binary payloads. +/// +/// This type is designed to work seamlessly with the V2 serializer while maintaining +/// backward compatibility through conversion to/from `RealtimeMessageV2`. +public struct RealtimeMessageV3: Hashable, Sendable { + public let joinRef: String? + public let ref: String? + public let topic: String + public let event: String + public let payload: RealtimePayload + + public init( + joinRef: String?, + ref: String?, + topic: String, + event: String, + payload: RealtimePayload + ) { + self.joinRef = joinRef + self.ref = ref + self.topic = topic + self.event = event + self.payload = payload + } + + /// Convenience initializer for JSON payloads + public init( + joinRef: String?, + ref: String?, + topic: String, + event: String, + payload: JSONObject + ) { + self.init( + joinRef: joinRef, + ref: ref, + topic: topic, + event: event, + payload: .json(payload) + ) + } + + /// Convenience initializer for binary payloads + public init( + joinRef: String?, + ref: String?, + topic: String, + event: String, + binaryPayload: Data + ) { + self.init( + joinRef: joinRef, + ref: ref, + topic: topic, + event: event, + payload: .binary(binaryPayload) + ) + } + + /// Status for the received message if any. + public var status: PushStatus? { + payload["status"] + .flatMap(\.stringValue) + .flatMap(PushStatus.init(rawValue:)) + } + + /// Converts to V2 message format (for backward compatibility) + public func toV2() -> RealtimeMessageV2 { + let jsonPayload: JSONObject + switch payload { + case .json(let object): + jsonPayload = object + case .binary(let data): + // Wrap binary data in the special marker format + jsonPayload = ["payload": RealtimeBinaryPayload.binary(data)] + } + + return RealtimeMessageV2( + joinRef: joinRef, + ref: ref, + topic: topic, + event: event, + payload: jsonPayload + ) + } + + /// Creates from V2 message format + public static func fromV2(_ message: RealtimeMessageV2) -> RealtimeMessageV3 { + // Check if this is a direct binary payload (not a broadcast message with nested binary) + // A direct binary payload would have just the "payload" key with binary marker + if message.payload.count == 1, + let payloadValue = message.payload["payload"], + let binaryData = RealtimeBinaryPayload.data(from: payloadValue) + { + return RealtimeMessageV3( + joinRef: message.joinRef, + ref: message.ref, + topic: message.topic, + event: message.event, + binaryPayload: binaryData + ) + } + + // Otherwise it's a JSON payload (including broadcast messages with nested binary) + return RealtimeMessageV3( + joinRef: message.joinRef, + ref: message.ref, + topic: message.topic, + event: message.event, + payload: message.payload + ) + } +} + +extension RealtimeMessageV3: Codable { + private enum CodingKeys: String, CodingKey { + case joinRef = "join_ref" + case ref + case topic + case event + case payload + } + + public init(from decoder: any Decoder) throws { + let container = try decoder.container(keyedBy: CodingKeys.self) + joinRef = try container.decodeIfPresent(String.self, forKey: .joinRef) + ref = try container.decodeIfPresent(String.self, forKey: .ref) + topic = try container.decode(String.self, forKey: .topic) + event = try container.decode(String.self, forKey: .event) + payload = try container.decode(RealtimePayload.self, forKey: .payload) + } + + public func encode(to encoder: any Encoder) throws { + var container = encoder.container(keyedBy: CodingKeys.self) + try container.encodeIfPresent(joinRef, forKey: .joinRef) + try container.encodeIfPresent(ref, forKey: .ref) + try container.encode(topic, forKey: .topic) + try container.encode(event, forKey: .event) + try container.encode(payload, forKey: .payload) + } +} + +extension RealtimeMessageV3: HasRawMessage { + public var rawMessage: RealtimeMessageV2 { + toV2() + } +} diff --git a/Tests/RealtimeTests/PushV2Tests.swift b/Tests/RealtimeTests/PushV2Tests.swift index 44882981c..d511c52b9 100644 --- a/Tests/RealtimeTests/PushV2Tests.swift +++ b/Tests/RealtimeTests/PushV2Tests.swift @@ -324,6 +324,10 @@ private final class MockRealtimeClient: RealtimeClientProtocol, @unchecked Senda } } + func pushV3(_ message: RealtimeMessageV3) { + push(message.toV2()) + } + func _getAccessToken() async -> String? { return nil } diff --git a/Tests/RealtimeTests/RealtimeSerializerTests.swift b/Tests/RealtimeTests/RealtimeSerializerTests.swift index c21be4302..c2e33e963 100644 --- a/Tests/RealtimeTests/RealtimeSerializerTests.swift +++ b/Tests/RealtimeTests/RealtimeSerializerTests.swift @@ -11,18 +11,18 @@ import XCTest @testable import Realtime final class RealtimeSerializerTests: XCTestCase { - // MARK: - Binary Encoder Tests + // MARK: - Binary Encoder Tests (V3) func testEncodePushWithBinaryPayload() throws { let encoder = RealtimeBinaryEncoder() let binaryData = Data([0x01, 0x04]) - let message = RealtimeMessageV2( + let message = RealtimeMessageV3( joinRef: "10", ref: "1", topic: "t", event: "e", - payload: ["payload": RealtimeBinaryPayload.binary(binaryData)] + binaryPayload: binaryData ) let encoded = try encoder.encode(message) @@ -45,7 +45,7 @@ final class RealtimeSerializerTests: XCTestCase { func testEncodeUserBroadcastPushWithJSONNoMetadata() throws { let encoder = RealtimeBinaryEncoder() - let message = RealtimeMessageV2( + let message = RealtimeMessageV3( joinRef: "10", ref: "1", topic: "top", @@ -72,7 +72,7 @@ final class RealtimeSerializerTests: XCTestCase { func testEncodeUserBroadcastPushWithAllowedMetadata() throws { let encoder = RealtimeBinaryEncoder(allowedMetadataKeys: ["extra"]) - let message = RealtimeMessageV2( + let message = RealtimeMessageV3( joinRef: "10", ref: "1", topic: "top", @@ -98,7 +98,7 @@ final class RealtimeSerializerTests: XCTestCase { let encoder = RealtimeBinaryEncoder() let binaryData = Data([0x01, 0x04]) - let message = RealtimeMessageV2( + let message = RealtimeMessageV3( joinRef: "10", ref: "1", topic: "top", @@ -120,7 +120,7 @@ final class RealtimeSerializerTests: XCTestCase { let encoder = RealtimeBinaryEncoder() let longJoinRef = String(repeating: "a", count: 256) - let message = RealtimeMessageV2( + let message = RealtimeMessageV3( joinRef: longJoinRef, ref: "1", topic: "top", @@ -140,7 +140,7 @@ final class RealtimeSerializerTests: XCTestCase { let encoder = RealtimeBinaryEncoder() let longTopic = String(repeating: "a", count: 256) - let message = RealtimeMessageV2( + let message = RealtimeMessageV3( joinRef: "10", ref: "1", topic: longTopic, @@ -291,11 +291,18 @@ final class RealtimeSerializerTests: XCTestCase { let message = try decoder.decode(data) XCTAssertEqual(message.event, "broadcast") - XCTAssertEqual(message.payload["type"]?.stringValue, "broadcast") - XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") + + // For V3 messages, check the payload structure + guard case .json(let jsonPayload) = message.payload else { + XCTFail("Expected JSON payload") + return + } + + XCTAssertEqual(jsonPayload["type"]?.stringValue, "broadcast") + XCTAssertEqual(jsonPayload["event"]?.stringValue, "user-event") // Check binary payload - let binaryPayload = RealtimeBinaryPayload.data(from: message.payload["payload"]!) + let binaryPayload = RealtimeBinaryPayload.data(from: jsonPayload["payload"]!) XCTAssertNotNil(binaryPayload) XCTAssertEqual(binaryPayload, Data([0x01, 0x04])) } @@ -317,10 +324,15 @@ final class RealtimeSerializerTests: XCTestCase { let message = try decoder.decode(data) - XCTAssertEqual(message.payload["event"]?.stringValue, "user-event") - XCTAssertEqual(message.payload["meta"]?.objectValue?["replayed"]?.boolValue, true) + guard case .json(let jsonPayload) = message.payload else { + XCTFail("Expected JSON payload") + return + } - let binaryPayload = RealtimeBinaryPayload.data(from: message.payload["payload"]!) + XCTAssertEqual(jsonPayload["event"]?.stringValue, "user-event") + XCTAssertEqual(jsonPayload["meta"]?.objectValue?["replayed"]?.boolValue, true) + + let binaryPayload = RealtimeBinaryPayload.data(from: jsonPayload["payload"]!) XCTAssertNotNil(binaryPayload) XCTAssertEqual(binaryPayload, Data([0x01, 0x04])) } @@ -348,10 +360,9 @@ final class RealtimeSerializerTests: XCTestCase { func testRoundTripUserBroadcastWithBinary() throws { let encoder = RealtimeBinaryEncoder() - let decoder = RealtimeBinaryDecoder() let originalData = Data([0x01, 0x02, 0x03, 0x04]) - let originalMessage = RealtimeMessageV2( + let originalMessage = RealtimeMessageV3( joinRef: "10", ref: "1", topic: "test-topic", @@ -370,4 +381,88 @@ final class RealtimeSerializerTests: XCTestCase { XCTAssertTrue(encoded.count > 0) XCTAssertEqual(encoded[0], 3) // userBroadcastPush } + + // MARK: - V2 / V3 Conversion Tests + + func testV2ToV3ConversionWithJSON() { + let v2Message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "test", + event: "broadcast", + payload: ["key": "value"] + ) + + let v3Message = RealtimeMessageV3.fromV2(v2Message) + + XCTAssertEqual(v3Message.joinRef, "10") + XCTAssertEqual(v3Message.ref, "1") + XCTAssertEqual(v3Message.topic, "test") + XCTAssertEqual(v3Message.event, "broadcast") + XCTAssertEqual(v3Message.payload["key"]?.stringValue, "value") + } + + func testV2ToV3ConversionWithBinary() { + let binaryData = Data([0x01, 0x02, 0x03]) + let v2Message = RealtimeMessageV2( + joinRef: "10", + ref: "1", + topic: "test", + event: "msg", + payload: ["payload": RealtimeBinaryPayload.binary(binaryData)] + ) + + let v3Message = RealtimeMessageV3.fromV2(v2Message) + + XCTAssertEqual(v3Message.joinRef, "10") + XCTAssertEqual(v3Message.ref, "1") + XCTAssertEqual(v3Message.topic, "test") + XCTAssertEqual(v3Message.event, "msg") + + guard case .binary(let extractedData) = v3Message.payload else { + XCTFail("Expected binary payload") + return + } + XCTAssertEqual(extractedData, binaryData) + } + + func testV3ToV2ConversionWithJSON() { + let v3Message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "test", + event: "broadcast", + payload: ["key": "value"] + ) + + let v2Message = v3Message.toV2() + + XCTAssertEqual(v2Message.joinRef, "10") + XCTAssertEqual(v2Message.ref, "1") + XCTAssertEqual(v2Message.topic, "test") + XCTAssertEqual(v2Message.event, "broadcast") + XCTAssertEqual(v2Message.payload["key"]?.stringValue, "value") + } + + func testV3ToV2ConversionWithBinary() { + let binaryData = Data([0x01, 0x02, 0x03]) + let v3Message = RealtimeMessageV3( + joinRef: "10", + ref: "1", + topic: "test", + event: "msg", + binaryPayload: binaryData + ) + + let v2Message = v3Message.toV2() + + XCTAssertEqual(v2Message.joinRef, "10") + XCTAssertEqual(v2Message.ref, "1") + XCTAssertEqual(v2Message.topic, "test") + XCTAssertEqual(v2Message.event, "msg") + + // Binary data should be wrapped in the special marker format + let extractedData = RealtimeBinaryPayload.data(from: v2Message.payload["payload"]!) + XCTAssertEqual(extractedData, binaryData) + } }