Skip to content

Commit 0647b58

Browse files
committed
Close inactive connections
This new middleware introduces a timeout of closing inactive connections between connections after a configured amount of seconds. This builds on top of #405 and partially on #422
1 parent 29940dc commit 0647b58

6 files changed

+155
-12
lines changed

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ multiple concurrent HTTP requests without blocking.
7373
* [ServerRequest](#serverrequest)
7474
* [ResponseException](#responseexception)
7575
* [React\Http\Middleware](#reacthttpmiddleware)
76+
* [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware)
7677
* [StreamingRequestMiddleware](#streamingrequestmiddleware)
7778
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
7879
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
@@ -2438,6 +2439,22 @@ access its underlying response object.
24382439

24392440
### React\Http\Middleware
24402441

2442+
#### InactiveConnectionTimeoutMiddleware
2443+
2444+
The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the
2445+
`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly open.
2446+
2447+
The following example configures the `HttpServer` to close any inactive connections after one and a half second:
2448+
2449+
```php
2450+
$http = new React\Http\HttpServer(
2451+
new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5),
2452+
$handler
2453+
);
2454+
```
2455+
> Internally, this class is used as a "value object" to override the default timeout of one minute.
2456+
As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
2457+
24412458
#### StreamingRequestMiddleware
24422459

24432460
The `React\Http\Middleware\StreamingRequestMiddleware` can be used to

src/HttpServer.php

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use React\Http\Io\IniUtil;
99
use React\Http\Io\MiddlewareRunner;
1010
use React\Http\Io\StreamingServer;
11+
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
1112
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
1213
use React\Http\Middleware\StreamingRequestMiddleware;
1314
use React\Http\Middleware\RequestBodyBufferMiddleware;
@@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop)
219220
}
220221

221222
$streaming = false;
223+
$idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT;
222224
foreach ((array) $requestHandlers as $handler) {
223225
if ($handler instanceof StreamingRequestMiddleware) {
224226
$streaming = true;
225-
break;
227+
}
228+
if ($handler instanceof InactiveConnectionTimeoutMiddleware) {
229+
$idleConnectTimeout = $handler->getTimeout();
226230
}
227231
}
228232

@@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop)
252256
* doing anything with the request.
253257
*/
254258
$middleware = \array_filter($middleware, function ($handler) {
255-
return !($handler instanceof StreamingRequestMiddleware);
259+
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
256260
});
257261

258-
$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware));
262+
$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectTimeout);
259263

260264
$that = $this;
261265
$this->streamingServer->on('error', function ($error) use ($that) {

src/Io/StreamingServer.php

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use React\EventLoop\LoopInterface;
99
use React\Http\Message\Response;
1010
use React\Http\Message\ServerRequest;
11+
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
1112
use React\Promise;
1213
use React\Promise\CancellablePromiseInterface;
1314
use React\Promise\PromiseInterface;
@@ -85,6 +86,7 @@ final class StreamingServer extends EventEmitter
8586
private $callback;
8687
private $parser;
8788
private $loop;
89+
private $idleConnectionTimeout;
8890

8991
/**
9092
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -96,15 +98,17 @@ final class StreamingServer extends EventEmitter
9698
*
9799
* @param LoopInterface $loop
98100
* @param callable $requestHandler
101+
* @param float $idleConnectTimeout
99102
* @see self::listen()
100103
*/
101-
public function __construct(LoopInterface $loop, $requestHandler)
104+
public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT)
102105
{
103106
if (!\is_callable($requestHandler)) {
104107
throw new \InvalidArgumentException('Invalid request handler given');
105108
}
106109

107110
$this->loop = $loop;
111+
$this->idleConnectionTimeout = $idleConnectTimeout;
108112

109113
$this->callback = $requestHandler;
110114
$this->parser = new RequestHeaderParser();
@@ -134,7 +138,29 @@ public function __construct(LoopInterface $loop, $requestHandler)
134138
*/
135139
public function listen(ServerInterface $socket)
136140
{
137-
$socket->on('connection', array($this->parser, 'handle'));
141+
$socket->on('connection', array($this, 'handle'));
142+
}
143+
144+
/** @internal */
145+
public function handle(ConnectionInterface $conn)
146+
{
147+
$timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) {
148+
$conn->close();
149+
});
150+
$loop = $this->loop;
151+
$conn->on('data', function ($data) use ($loop, $timer) {
152+
if ($data !== '') {
153+
$loop->cancelTimer($timer);
154+
}
155+
});
156+
$conn->on('end', function () use ($loop, $timer) {
157+
$loop->cancelTimer($timer);
158+
});
159+
$conn->on('close', function () use ($loop, $timer) {
160+
$loop->cancelTimer($timer);
161+
});
162+
163+
$this->parser->handle($conn);
138164
}
139165

140166
/** @internal */
@@ -345,7 +371,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
345371

346372
// either wait for next request over persistent connection or end connection
347373
if ($persist) {
348-
$this->parser->handle($connection);
374+
$this->handle($connection);
349375
} else {
350376
$connection->end();
351377
}
@@ -366,10 +392,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
366392
// write streaming body and then wait for next request over persistent connection
367393
if ($persist) {
368394
$body->pipe($connection, array('end' => false));
369-
$parser = $this->parser;
370-
$body->on('end', function () use ($connection, $parser, $body) {
395+
$that = $this;
396+
$body->on('end', function () use ($connection, $that, $body) {
371397
$connection->removeListener('close', array($body, 'close'));
372-
$parser->handle($connection);
398+
$that->handle($connection);
373399
});
374400
} else {
375401
$body->pipe($connection);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
namespace React\Http\Middleware;
4+
5+
use Psr\Http\Message\ResponseInterface;
6+
use Psr\Http\Message\ServerRequestInterface;
7+
use React\Http\Io\HttpBodyStream;
8+
use React\Http\Io\PauseBufferStream;
9+
use React\Promise;
10+
use React\Promise\PromiseInterface;
11+
use React\Promise\Deferred;
12+
use React\Stream\ReadableStreamInterface;
13+
14+
/**
15+
* Closes any inactive connection after the specified amount of seconds since last activity.
16+
*
17+
* This allows you to set an alternative timeout to the default one minute (60 seconds). For example
18+
* thirteen and a half seconds:
19+
*
20+
* ```php
21+
* $http = new React\Http\HttpServer(
22+
* new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(13.5),
23+
* $handler
24+
* );
25+
*
26+
* > Internally, this class is used as a "value object" to override the default timeout of one minute.
27+
* As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
28+
*/
29+
final class InactiveConnectionTimeoutMiddleware
30+
{
31+
const DEFAULT_TIMEOUT = 60;
32+
33+
/**
34+
* @var float
35+
*/
36+
private $timeout;
37+
38+
/**
39+
* @param float $timeout
40+
*/
41+
public function __construct($timeout = self::DEFAULT_TIMEOUT)
42+
{
43+
$this->timeout = $timeout;
44+
}
45+
46+
public function __invoke(ServerRequestInterface $request, $next)
47+
{
48+
return $next($request);
49+
}
50+
51+
/**
52+
* @return float
53+
*/
54+
public function getTimeout()
55+
{
56+
return $this->timeout;
57+
}
58+
}

tests/HttpServerTest.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
use React\EventLoop\Factory;
88
use React\Http\HttpServer;
99
use React\Http\Io\IniUtil;
10+
use React\Http\Io\StreamingServer;
11+
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
1012
use React\Http\Middleware\StreamingRequestMiddleware;
1113
use React\Promise;
1214
use React\Promise\Deferred;
@@ -254,6 +256,24 @@ function (ServerRequestInterface $request) use (&$streaming) {
254256
$this->assertEquals(true, $streaming);
255257
}
256258

259+
public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
260+
{
261+
$this->connection->expects($this->once())->method('close');
262+
263+
$loop = Factory::create();
264+
$http = new HttpServer($loop, new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever());
265+
266+
$http->listen($this->socket);
267+
$this->socket->emit('connection', array($this->connection));
268+
$this->connection->emit('data', array(''));
269+
270+
$loop->addTimer(0.12, function () use ($loop) {
271+
$loop->stop();
272+
});
273+
274+
$loop->run();
275+
}
276+
257277
public function testForwardErrors()
258278
{
259279
$exception = new \Exception();
@@ -434,7 +454,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency()
434454

435455
public function testConstructFiltersOutConfigurationMiddlewareBefore()
436456
{
437-
$http = new HttpServer(new StreamingRequestMiddleware(), function () { });
457+
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { });
438458

439459
$ref = new \ReflectionProperty($http, 'streamingServer');
440460
$ref->setAccessible(true);

tests/Io/StreamingServerTest.php

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,24 @@ public function testRequestEventWillNotBeEmittedForIncompleteHeaders()
5858
$this->connection->emit('data', array($data));
5959
}
6060

61+
public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
62+
{
63+
$this->connection->expects($this->once())->method('close');
64+
65+
$loop = Factory::create();
66+
$server = new StreamingServer($loop, $this->expectCallableNever(), 0.1);
67+
68+
$server->listen($this->socket);
69+
$this->socket->emit('connection', array($this->connection));
70+
71+
$loop->addTimer(0.12, function () use ($loop) {
72+
$loop->stop();
73+
});
74+
75+
$this->connection->emit('data', array(''));
76+
$loop->run();
77+
}
78+
6179
public function testRequestEventIsEmitted()
6280
{
6381
$server = new StreamingServer(Factory::create(), $this->expectCallableOnce());
@@ -3026,9 +3044,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle
30263044
// pretend parser just finished parsing
30273045
$server->handleRequest($this->connection, $request);
30283046

3029-
$this->assertCount(2, $this->connection->listeners('close'));
3047+
$this->assertCount(3, $this->connection->listeners('close'));
30303048
$body->end();
3031-
$this->assertCount(1, $this->connection->listeners('close'));
3049+
$this->assertCount(3, $this->connection->listeners('close'));
30323050
}
30333051

30343052
private function createGetRequest()

0 commit comments

Comments
 (0)