@@ -61,10 +61,10 @@ actor IncomingStreamManager: Loggable {
6161 // MARK: - Packet processing
6262
6363 /// Handles a data stream header.
64- func handle( header: Livekit_DataStream . Header , from identityString: String ) {
64+ func handle( header: Livekit_DataStream . Header , from identityString: String , encryptionType : EncryptionType ) {
6565 let identity = Participant . Identity ( from: identityString)
6666
67- guard let streamInfo = Self . streamInfo ( from: header) else {
67+ guard let streamInfo = Self . streamInfo ( from: header, encryptionType : encryptionType ) else {
6868 return
6969 }
7070 openStream ( with: streamInfo, from: identity)
@@ -110,9 +110,18 @@ actor IncomingStreamManager: Loggable {
110110 }
111111
112112 /// Handles a data stream chunk.
113- func handle( chunk: Livekit_DataStream . Chunk ) {
113+ func handle( chunk: Livekit_DataStream . Chunk , encryptionType : EncryptionType ) {
114114 guard !chunk. content. isEmpty, let descriptor = openStreams [ chunk. streamID] else { return }
115115
116+ if descriptor. info. encryptionType != encryptionType {
117+ let error = StreamError . encryptionTypeMismatch (
118+ expected: descriptor. info. encryptionType,
119+ received: encryptionType
120+ )
121+ descriptor. continuation. finish ( throwing: error)
122+ return
123+ }
124+
116125 let readLength = descriptor. readLength + chunk. content. count
117126
118127 if let totalLength = descriptor. info. totalLength {
@@ -126,10 +135,20 @@ actor IncomingStreamManager: Loggable {
126135 }
127136
128137 /// Handles a data stream trailer.
129- func handle( trailer: Livekit_DataStream . Trailer ) {
138+ func handle( trailer: Livekit_DataStream . Trailer , encryptionType : EncryptionType ) {
130139 guard let descriptor = openStreams [ trailer. streamID] else {
131140 return
132141 }
142+
143+ if descriptor. info. encryptionType != encryptionType {
144+ let error = StreamError . encryptionTypeMismatch (
145+ expected: descriptor. info. encryptionType,
146+ received: encryptionType
147+ )
148+ descriptor. continuation. finish ( throwing: error)
149+ return
150+ }
151+
133152 if let totalLength = descriptor. info. totalLength {
134153 guard descriptor. readLength == totalLength else {
135154 descriptor. continuation. finish ( throwing: StreamError . incomplete)
@@ -186,10 +205,10 @@ public typealias TextStreamHandler = @Sendable (TextStreamReader, Participant.Id
186205// MARK: - From protocol types
187206
188207extension IncomingStreamManager {
189- static func streamInfo( from header: Livekit_DataStream . Header ) -> StreamInfo ? {
208+ static func streamInfo( from header: Livekit_DataStream . Header , encryptionType : EncryptionType ) -> StreamInfo ? {
190209 switch header. contentHeader {
191- case let . byteHeader( byteHeader) : ByteStreamInfo ( header, byteHeader)
192- case let . textHeader( textHeader) : TextStreamInfo ( header, textHeader)
210+ case let . byteHeader( byteHeader) : ByteStreamInfo ( header, byteHeader, encryptionType )
211+ case let . textHeader( textHeader) : TextStreamInfo ( header, textHeader, encryptionType )
193212 default : nil
194213 }
195214 }
@@ -198,14 +217,16 @@ extension IncomingStreamManager {
198217extension ByteStreamInfo {
199218 convenience init (
200219 _ header: Livekit_DataStream . Header ,
201- _ byteHeader: Livekit_DataStream . ByteHeader
220+ _ byteHeader: Livekit_DataStream . ByteHeader ,
221+ _ encryptionType: EncryptionType
202222 ) {
203223 self . init (
204224 id: header. streamID,
205225 topic: header. topic,
206226 timestamp: header. timestampDate,
207227 totalLength: header. hasTotalLength ? Int ( header. totalLength) : nil ,
208228 attributes: header. attributes,
229+ encryptionType: encryptionType,
209230 // ---
210231 mimeType: header. mimeType,
211232 name: byteHeader. name
@@ -216,14 +237,16 @@ extension ByteStreamInfo {
216237extension TextStreamInfo {
217238 convenience init (
218239 _ header: Livekit_DataStream . Header ,
219- _ textHeader: Livekit_DataStream . TextHeader
240+ _ textHeader: Livekit_DataStream . TextHeader ,
241+ _ encryptionType: EncryptionType
220242 ) {
221243 self . init (
222244 id: header. streamID,
223245 topic: header. topic,
224246 timestamp: header. timestampDate,
225247 totalLength: header. hasTotalLength ? Int ( header. totalLength) : nil ,
226248 attributes: header. attributes,
249+ encryptionType: encryptionType,
227250 // ---
228251 operationType: TextStreamInfo . OperationType ( textHeader. operationType) ,
229252 version: Int ( textHeader. version) ,
0 commit comments