Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Libraries/ConnectNIO/Internal/ConnectStreamChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ final class ConnectStreamChannelHandler: NIOCore.ChannelInboundHandler, @uncheck

private var context: NIOCore.ChannelHandlerContext?
private var isClosed = false
private var hasResponded = false
private var pendingClose: NIOHTTP1.HTTPClientRequestPart?
private var pendingData = Foundation.Data()
private var receivedStatus: NIOHTTP1.HTTPResponseStatus?
Expand Down Expand Up @@ -81,6 +82,7 @@ final class ConnectStreamChannelHandler: NIOCore.ChannelInboundHandler, @uncheck
}

self.closeConnection()
self.hasResponded = true
self.responseCallbacks.receiveClose(.canceled, [:], ConnectError.canceled())
}
}
Expand Down Expand Up @@ -149,6 +151,7 @@ final class ConnectStreamChannelHandler: NIOCore.ChannelInboundHandler, @uncheck
self.responseCallbacks.receiveResponseData(Data(buffer: byteBuffer))
context.fireChannelRead(data)
case .end(let trailers):
self.hasResponded = true
self.responseCallbacks.receiveClose(
self.receivedStatus.map { .fromNIOStatus($0) } ?? .ok,
trailers.map { .fromNIOHeaders($0) } ?? [:],
Expand All @@ -167,7 +170,22 @@ final class ConnectStreamChannelHandler: NIOCore.ChannelInboundHandler, @uncheck
}

func channelInactive(context: ChannelHandlerContext) {
let shouldNotify = !self.hasResponded
self.closeConnection()
if shouldNotify {
self.hasResponded = true
self.responseCallbacks.receiveClose(
.unavailable,
[:],
ConnectError(
code: .unavailable,
message: "Channel became inactive",
exception: nil,
details: [],
metadata: [:]
)
)
}
context.fireChannelInactive()
}

Expand All @@ -176,6 +194,7 @@ final class ConnectStreamChannelHandler: NIOCore.ChannelInboundHandler, @uncheck
return
}

self.hasResponded = true
self.responseCallbacks.receiveClose(
.fromHTTPStatus((error as NSError).code),
[:],
Expand All @@ -190,6 +209,7 @@ final class ConnectStreamChannelHandler: NIOCore.ChannelInboundHandler, @uncheck
}

self.closeConnection()
self.hasResponded = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we be consistent about setting hasResponded before closeConnection() calls?

self.responseCallbacks.receiveClose(.deadlineExceeded, [:], ConnectError.deadlineExceeded())
}
}
23 changes: 23 additions & 0 deletions Libraries/ConnectNIO/Internal/ConnectUnaryChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ final class ConnectUnaryChannelHandler: NIOCore.ChannelInboundHandler, @unchecke

private var context: NIOCore.ChannelHandlerContext?
private var isClosed = false
private var hasResponded = false
private var receivedHead: NIOHTTP1.HTTPResponseHead?
private var receivedData: Foundation.Data?
private var receivedEnd: NIOHTTP1.HTTPHeaders?
Expand All @@ -51,6 +52,7 @@ final class ConnectUnaryChannelHandler: NIOCore.ChannelInboundHandler, @unchecke
}

self.closeConnection()
self.hasResponded = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively to my previous comment, what do you think about moving self.hasResponded = true to closeConnection? Seems like that would simplify this a bit

self.onResponse(HTTPResponse(
code: .canceled,
headers: [:],
Expand Down Expand Up @@ -143,6 +145,7 @@ final class ConnectUnaryChannelHandler: NIOCore.ChannelInboundHandler, @unchecke
context.fireChannelRead(data)
case .end(let trailers):
self.receivedEnd = trailers
self.hasResponded = true
self.onResponse(self.createResponse(error: nil))
self.onMetrics(.init(taskMetrics: nil))
self.closeConnection()
Expand All @@ -158,7 +161,25 @@ final class ConnectUnaryChannelHandler: NIOCore.ChannelInboundHandler, @unchecke
}

func channelInactive(context: ChannelHandlerContext) {
let shouldNotify = !self.hasResponded
self.closeConnection()
if shouldNotify {
self.hasResponded = true
self.onResponse(.init(
code: .unavailable,
headers: [:],
message: nil,
trailers: [:],
error: ConnectError(
code: .unavailable,
message: "Channel became inactive",
exception: nil,
details: [],
metadata: [:]
),
tracingInfo: nil
))
}
context.fireChannelInactive()
}

Expand All @@ -167,6 +188,7 @@ final class ConnectUnaryChannelHandler: NIOCore.ChannelInboundHandler, @unchecke
return
}

self.hasResponded = true
self.onResponse(self.createResponse(error: error))
self.closeConnection()
}
Expand All @@ -177,6 +199,7 @@ final class ConnectUnaryChannelHandler: NIOCore.ChannelInboundHandler, @unchecke
}

self.closeConnection()
self.hasResponded = true
self.onResponse(.init(
code: .deadlineExceeded,
headers: [:],
Expand Down