Skip to content

Adopting an existing, pre-bound server-socket channel #748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
| --- | --- |
| `port` | The port the server will bind to. If `0`, the server will bind to a random port. |
| `socket-address` | A `java.net.SocketAddress` specifying both the port and interface to bind to. |
| `existing-channel` | A pre-bound `java.nio.channels.ServerSocketChannel` for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`. |
| `bootstrap-transform` | A function that takes an `io.netty.bootstrap.ServerBootstrap` object, which represents the server, and modifies it. |
| `http-versions` | An optional vector of allowable HTTP versions to negotiate via ALPN, in preference order. Defaults to `[:http1]`. |
| `ssl-context` | An `io.netty.handler.ssl.SslContext` object or a map of SSL context options (see `aleph.netty/ssl-server-context` for more details) if an SSL connection is desired. When passing an `io.netty.handler.ssl.SslContext` object, it must have an ALPN config matching the `http-versions` option (see `aleph.netty/ssl-server-context` and `aleph.netty/application-protocol-config`). If only HTTP/1.1 is desired, ALPN config is optional.
Expand Down
8 changes: 5 additions & 3 deletions src/aleph/http/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@
[handler
{:keys [port
socket-address
existing-channel
executor
http-versions
ssl-context
Expand Down Expand Up @@ -773,9 +774,10 @@
(netty/start-server
{:pipeline-builder pipeline-builder
:bootstrap-transform bootstrap-transform
:socket-address (if socket-address
socket-address
(InetSocketAddress. port))
:socket-address (cond
socket-address socket-address
(nil? existing-channel) (InetSocketAddress. port))
:existing-channel existing-channel
:on-close (when (and shutdown-executor?
(or (instance? ExecutorService executor)
(instance? ExecutorService continue-executor)))
Expand Down
35 changes: 31 additions & 4 deletions src/aleph/netty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Unpooled)
(io.netty.channel
Channel
ChannelFactory
ChannelFuture
ChannelHandler
ChannelHandlerContext
Expand Down Expand Up @@ -1667,6 +1668,22 @@
(.addFirst pipeline "channel-tracker" ^ChannelHandler (channel-tracking-handler chan-group))
(pipeline-builder pipeline)))

(defn- validate-existing-channel
[existing-channel]
(when (some? existing-channel)
(when-not (instance? java.nio.channels.ServerSocketChannel existing-channel)
(throw (IllegalArgumentException.
(str "The existing-channel type is not supported: " (pr-str existing-channel)))))
(when (nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel existing-channel))
(throw (IllegalArgumentException.
(str "The existing-channel is not bound: " (pr-str existing-channel)))))))

(defn- wrapping-channel-factory
^ChannelFactory [^java.nio.channels.ServerSocketChannel channel]
(proxy [ChannelFactory] []
(newChannel []
(NioServerSocketChannel. channel))))

(defn ^:no-doc start-server
([pipeline-builder
ssl-context
Expand All @@ -1685,11 +1702,13 @@
bootstrap-transform
on-close
^SocketAddress socket-address
existing-channel
transport
shutdown-timeout]
:or {shutdown-timeout default-shutdown-timeout}
:as opts}]
(ensure-transport-available! transport)
(validate-existing-channel existing-channel)
(let [num-cores (.availableProcessors (Runtime/getRuntime))
num-threads (* 2 num-cores)
thread-factory (enumerating-thread-factory "aleph-server-pool" false)
Expand All @@ -1715,23 +1734,30 @@
(.option ChannelOption/SO_REUSEADDR true)
(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE)
(.group group)
(.channel channel-class)
(cond-> (nil? existing-channel) (.channel channel-class))
(cond-> (some? existing-channel) (.channelFactory (wrapping-channel-factory existing-channel)))
;;TODO: add a server (.handler) call to the bootstrap, for logging or something
(.childHandler (pipeline-initializer pipeline-builder))
(.childOption ChannelOption/SO_REUSEADDR true)
(.childOption ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE)
bootstrap-transform)

^ServerSocketChannel
ch (-> b (.bind socket-address) .sync .channel)]
ch (-> (if (nil? existing-channel)
(.bind b socket-address)
(.register b))
.sync
.channel)]

(reify
Closeable
(close [_]
(when (compare-and-set! closed? false true)
;; This is the three step closing sequence:
;; 1. Stop listening to incoming requests
(-> ch .close .sync)
(if (nil? existing-channel)
(-> ch .close .sync)
(-> ch .deregister .sync))
(-> (if (pos? shutdown-timeout)
;; 2. Wait for in-flight requests to stop processing within the supplied timeout
;; interval.
Expand Down Expand Up @@ -1759,7 +1785,8 @@
(port [_]
(-> ch .localAddress .getPort))
(wait-for-close [_]
(-> ch .closeFuture .await)
(when (nil? existing-channel)
(-> ch .closeFuture .await))
(-> group .terminationFuture .await)
nil)))

Expand Down
10 changes: 6 additions & 4 deletions src/aleph/tcp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@
| --- | ---
| `port` | the port the server will bind to. If `0`, the server will bind to a random port.
| `socket-address` | a `java.net.SocketAddress` specifying both the port and interface to bind to.
| `existing-channel` | a pre-bound `java.nio.channels.ServerSocketChannel` for the server to use rather than opening and binding its own server-socket. It won't be closed by the server. Possibly obtained from `System/inheritedChannel`. |
| `ssl-context` | an `io.netty.handler.ssl.SslContext` object or a map of SSL context options (see `aleph.netty/ssl-server-context` for more details). If given, the server will only accept SSL connections and call the handler once the SSL session has been successfully established. If a self-signed certificate is all that's required, `(aleph.netty/self-signed-ssl-context)` will suffice.
| `bootstrap-transform` | a function that takes an `io.netty.bootstrap.ServerBootstrap` object, which represents the server, and modifies it.
| `pipeline-transform` | a function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it.
| `raw-stream?` | if true, messages from the stream will be `io.netty.buffer.ByteBuf` objects rather than byte-arrays. This will minimize copying, but means that care must be taken with Netty's buffer reference counting. Only recommended for advanced users.
| `shutdown-timeout` | interval in seconds within which in-flight requests must be processed, defaults to 15 seconds. A value of 0 bypasses waiting entirely.
| `transport` | the transport to use, one of `:nio`, `:epoll`, `:kqueue` or `:io-uring` (defaults to `:nio`)."
[handler
{:keys [port socket-address ssl-context bootstrap-transform pipeline-transform epoll?
{:keys [port socket-address existing-channel ssl-context bootstrap-transform pipeline-transform epoll?
shutdown-timeout transport]
:or {bootstrap-transform identity
pipeline-transform identity
Expand All @@ -101,9 +102,10 @@
(server-channel-handler handler options))
(pipeline-transform pipeline))
:bootstrap-transform bootstrap-transform
:socket-address (if socket-address
socket-address
(InetSocketAddress. port))
:socket-address (cond
socket-address socket-address
(nil? existing-channel) (InetSocketAddress. port))
:existing-channel existing-channel
:transport (netty/determine-transport transport epoll?)
:shutdown-timeout shutdown-timeout})))

Expand Down
41 changes: 39 additions & 2 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[aleph.resource-leak-detector]
[aleph.ssl :as test-ssl]
[aleph.tcp :as tcp]
[aleph.testutils :refer [str=]]
[aleph.testutils :refer [str= bound-channel]]
[clj-commons.byte-streams :as bs]
[clojure.java.io :as io]
[clojure.string :as str]
Expand Down Expand Up @@ -47,7 +47,10 @@
(java.io
Closeable
File)
(java.net UnknownHostException)
(java.net
BindException
UnknownHostException)
(java.nio.channels ServerSocketChannel)
(java.util.concurrent
SynchronousQueue
ThreadPoolExecutor
Expand Down Expand Up @@ -1747,6 +1750,40 @@
(catch Exception e
(is (instance? ProxyConnectException e)))))))))

(deftest test-existing-channel
(testing "validation"
(is (thrown-with-msg? Exception #"existing-channel"
(with-http-servers basic-handler {:existing-channel "a string"})))
(with-open [unbound-channel (ServerSocketChannel/open)]
(is (thrown-with-msg? Exception #"existing-channel"
(with-http-servers basic-handler {:existing-channel unbound-channel})))))

(testing "with a bound server-socket channel"
(testing "- unknown to the server"
(with-open [_ (bound-channel port)]
;; The port is already bound by a server-socket channel, but
;; we are not telling Aleph about it, so we should get a
;; BindException when Aleph tries to bind to the same port.
(is (thrown? BindException
(with-http-servers basic-handler {})))))

(testing "- known to the server"
(with-open [channel (bound-channel port)]
;; This time, we shouldn't get a BindException, because we are
;; telling Aleph to use an existing server-socket channel,
;; which should be already bound, so Aleph doesn't try to bind.
(with-http-servers basic-handler {:existing-channel channel}
(is (= 200 (:status @(http-get "/string")))))
;; The existing channel should not be closed on a server shutdown,
;; because that channel is not owned by the server.
(is (.isOpen channel)))))

(testing "the :port option is not required when :existing-channel is passed"
(with-redefs [http-server-options (dissoc http-server-options :port)]
(with-open [channel (bound-channel port)]
(with-http1-server basic-handler {:existing-channel channel}
(is (= 200 (:status @(http-get "/string")))))))))

(deftest ^:leak test-leak-in-raw-stream-handler
;; NOTE: Expecting 2 leaks because `with-raw-handler` will run its body for both http1 and
;; http2. It would be nicer to put this assertion into the body but the http1 server seems to
Expand Down
10 changes: 10 additions & 0 deletions test/aleph/tcp_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[aleph.netty :as netty]
[aleph.resource-leak-detector]
[aleph.tcp :as tcp]
[aleph.testutils :refer [bound-channel]]
[clj-commons.byte-streams :as bs]
[clojure.test :refer [deftest testing is]]
[manifold.stream :as s]))
Expand Down Expand Up @@ -55,4 +56,13 @@
(catch Exception _
(is (not (netty/io-uring-available?)))))))

(deftest test-existing-channel
(testing "the :port option is not required when :existing-channel is passed"
(let [port 8083]
(with-open [channel (bound-channel port)]
(with-server (tcp/start-server echo-handler {:existing-channel channel :shutdown-timeout 0})
(let [c @(tcp/client {:host "localhost" :port port})]
(s/put! c "foo")
(is (= "foo" (bs/to-string @(s/take! c))))))))))

(aleph.resource-leak-detector/instrument-tests!)
9 changes: 8 additions & 1 deletion test/aleph/testutils.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
(ns aleph.testutils
(:import (io.netty.util AsciiString)))
(:import (io.netty.util AsciiString)
(java.net InetSocketAddress)
(java.nio.channels ServerSocketChannel)))

(defn str=
"AsciiString-aware equals"
[^CharSequence x ^CharSequence y]
(AsciiString/contentEquals x y))

(defn bound-channel
"Returns a new server-socket channel bound to a `port`."
^ServerSocketChannel [port]
(doto (ServerSocketChannel/open)
(.bind (InetSocketAddress. port))))