@@ -180,28 +180,26 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
180180 // We'll attempt to upgrade. This may take a while, so while we're waiting more data can come in.
181181 self . upgradeState = . awaitingUpgrader
182182
183- let eventLoop = context. eventLoop
184- let loopBoundContext = context. loopBound
185183 self . handleUpgrade ( context: context, request: request, requestedProtocols: requestedProtocols)
186- . hop ( to: eventLoop) // the user might return a future from another EventLoop.
187184 . whenSuccess { callback in
188- eventLoop. assertInEventLoop ( )
189185 if let callback = callback {
190186 self . gotUpgrader ( upgrader: callback)
191187 } else {
192- self . notUpgrading ( context: loopBoundContext . value , data: requestPart)
188+ self . notUpgrading ( context: context , data: requestPart)
193189 }
194190 }
195191 }
196192
197193 /// The core of the upgrade handling logic.
198194 ///
199- /// - Returns: An `EventLoopFuture` that will contain a callback to invoke if upgrade is requested, or nil if upgrade has failed. Never returns a failed future.
195+ /// - Returns: An isolated `EventLoopFuture` that will contain a callback to invoke if upgrade is requested,
196+ /// or nil if upgrade has failed. Never returns a failed future.
200197 private func handleUpgrade(
201198 context: ChannelHandlerContext ,
202199 request: HTTPRequestHead ,
203200 requestedProtocols: [ String ]
204- ) -> EventLoopFuture < ( ( ) -> Void ) ? > {
201+ ) -> EventLoopFuture < ( ( ) -> Void ) ? > . Isolated {
202+
205203 let connectionHeader = Set ( request. headers [ canonicalForm: " connection " ] . map { $0. lowercased ( ) } )
206204 let allHeaderNames = Set ( request. headers. map { $0. name. lowercased ( ) } )
207205
@@ -219,18 +217,21 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
219217 /// Attempt to upgrade a single protocol.
220218 ///
221219 /// Will recurse through `protocolIterator` if upgrade fails.
220+ ///
221+ /// - Returns: An isolated `EventLoopFuture` that will contain a callback to invoke if upgrade is requested,
222+ /// or nil if upgrade has failed. Never returns a failed future.
222223 private func handleUpgradeForProtocol(
223224 context: ChannelHandlerContext ,
224225 protocolIterator: Array < String > . Iterator ,
225226 request: HTTPRequestHead ,
226227 allHeaderNames: Set < String > ,
227228 connectionHeader: Set < String >
228- ) -> EventLoopFuture < ( ( ) -> Void ) ? > {
229+ ) -> EventLoopFuture < ( ( ) -> Void ) ? > . Isolated {
229230 // We want a local copy of the protocol iterator. We'll pass it to the next invocation of the function.
230231 var protocolIterator = protocolIterator
231232 guard let proto = protocolIterator. next ( ) else {
232233 // We're done! No suitable protocol for upgrade.
233- return context. eventLoop. makeSucceededFuture ( nil )
234+ return context. eventLoop. makeSucceededIsolatedFuture ( nil )
234235 }
235236
236237 guard let upgrader = self . upgraders [ proto. lowercased ( ) ] else {
@@ -256,66 +257,67 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
256257
257258 let responseHeaders = self . buildUpgradeHeaders ( protocol: proto)
258259 let pipeline = context. pipeline
259- let loopBoundContext = context . loopBound
260+
260261 return upgrader. buildUpgradeResponse (
261262 channel: context. channel,
262263 upgradeRequest: request,
263264 initialResponseHeaders: responseHeaders
264- ) . map { finalResponseHeaders in
265- {
266- // Ok, we're upgrading.
267- self . upgradeState = . upgrading
268-
269- // Before we finish the upgrade we have to remove the HTTPDecoder and any other non-Encoder HTTP
270- // handlers from the pipeline, to prevent them parsing any more data. We'll buffer the data until
271- // that completes.
272- // While there are a lot of Futures involved here it's quite possible that all of this code will
273- // actually complete synchronously: we just want to program for the possibility that it won't.
274- // Once that's done, we send the upgrade response, then remove the HTTP encoder, then call the
275- // internal handler, then call the user code, and then finally when the user code is done we do
276- // our final cleanup steps, namely we replay the received data we buffered in the meantime and
277- // then remove ourselves from the pipeline.
278- self . removeExtraHandlers ( pipeline: pipeline) . flatMap {
279- self . sendUpgradeResponse (
280- context: loopBoundContext. value,
281- upgradeRequest: request,
282- responseHeaders: finalResponseHeaders
283- )
284- } . flatMap {
285- pipeline. syncOperations. removeHandler ( self . httpEncoder)
286- } . flatMap { ( ) -> EventLoopFuture < Void > in
287- let context = loopBoundContext. value
288- self . upgradeCompletionHandler ( context)
289- return upgrader. upgrade ( context: context, upgradeRequest: request)
290- } . whenComplete { result in
291- let context = loopBoundContext. value
292- switch result {
293- case . success:
294- context. fireUserInboundEventTriggered (
295- HTTPServerUpgradeEvents . upgradeComplete ( toProtocol: proto, upgradeRequest: request)
296- )
297- self . upgradeState = . upgradeComplete
298- // When we remove ourselves we'll be delivering any buffered data.
299- context. pipeline. syncOperations. removeHandler ( context: context, promise: nil )
300-
301- case . failure( let error) :
302- // Remain in the '.upgrading' state.
303- context. fireErrorCaught ( error)
304- }
265+ ) . hop ( to: context. eventLoop)
266+ . assumeIsolated ( )
267+ . map { finalResponseHeaders in
268+ {
269+ // Ok, we're upgrading.
270+ self . upgradeState = . upgrading
271+
272+ // Before we finish the upgrade we have to remove the HTTPDecoder and any other non-Encoder HTTP
273+ // handlers from the pipeline, to prevent them parsing any more data. We'll buffer the data until
274+ // that completes.
275+ // While there are a lot of Futures involved here it's quite possible that all of this code will
276+ // actually complete synchronously: we just want to program for the possibility that it won't.
277+ // Once that's done, we send the upgrade response, then remove the HTTP encoder, then call the
278+ // internal handler, then call the user code, and then finally when the user code is done we do
279+ // our final cleanup steps, namely we replay the received data we buffered in the meantime and
280+ // then remove ourselves from the pipeline.
281+ self . removeExtraHandlers ( pipeline: pipeline)
282+ . assumeIsolated ( )
283+ . flatMap {
284+ self . sendUpgradeResponse (
285+ context: context,
286+ upgradeRequest: request,
287+ responseHeaders: finalResponseHeaders
288+ )
289+ } . flatMap {
290+ pipeline. syncOperations. removeHandler ( self . httpEncoder)
291+ } . flatMap { ( ) -> EventLoopFuture < Void > in
292+ self . upgradeCompletionHandler ( context)
293+ return upgrader. upgrade ( context: context, upgradeRequest: request)
294+ } . whenComplete { result in
295+ switch result {
296+ case . success:
297+ context. fireUserInboundEventTriggered (
298+ HTTPServerUpgradeEvents . upgradeComplete ( toProtocol: proto, upgradeRequest: request)
299+ )
300+ self . upgradeState = . upgradeComplete
301+ // When we remove ourselves we'll be delivering any buffered data.
302+ context. pipeline. syncOperations. removeHandler ( context: context, promise: nil )
303+
304+ case . failure( let error) :
305+ // Remain in the '.upgrading' state.
306+ context. fireErrorCaught ( error)
307+ }
308+ }
305309 }
310+ } . flatMapError { error in
311+ // No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration.
312+ context. fireErrorCaught ( error)
313+ return self . handleUpgradeForProtocol (
314+ context: context,
315+ protocolIterator: protocolIterator,
316+ request: request,
317+ allHeaderNames: allHeaderNames,
318+ connectionHeader: connectionHeader
319+ )
306320 }
307- } . flatMapError { error in
308- // No upgrade here. We want to fire the error down the pipeline, and then try another loop iteration.
309- let context = loopBoundContext. value
310- context. fireErrorCaught ( error)
311- return self . handleUpgradeForProtocol (
312- context: context,
313- protocolIterator: protocolIterator,
314- request: request,
315- allHeaderNames: allHeaderNames,
316- connectionHeader: connectionHeader
317- )
318- }
319321 }
320322
321323 private func gotUpgrader( upgrader: @escaping ( ( ) -> Void ) ) {
@@ -379,7 +381,7 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
379381 }
380382
381383 return . andAllSucceed(
382- self . extraHTTPHandlers. map { pipeline. removeHandler ( $0) } ,
384+ self . extraHTTPHandlers. map { pipeline. syncOperations . removeHandler ( $0) } ,
383385 on: pipeline. eventLoop
384386 )
385387 }
0 commit comments