Skip to content

Commit 7f39e07

Browse files
committed
Close inactive connections and requests
This new middleware introduces a timeout of closing inactive connections between requests after a configured amount of seconds. This builds on top of reactphp#405 and partially on reactphp#422
1 parent 05e170d commit 7f39e07

8 files changed

+253
-12
lines changed

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ multiple concurrent HTTP requests without blocking.
8181
* [ServerRequest](#serverrequest)
8282
* [ResponseException](#responseexception)
8383
* [React\Http\Middleware](#reacthttpmiddleware)
84+
* [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware)
8485
* [StreamingRequestMiddleware](#streamingrequestmiddleware)
8586
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
8687
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
@@ -2679,6 +2680,22 @@ access its underlying response object.
26792680

26802681
### React\Http\Middleware
26812682

2683+
#### InactiveConnectionTimeoutMiddleware
2684+
2685+
The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the
2686+
`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly open.
2687+
2688+
The following example configures the `HttpServer` to close any inactive connections after one and a half second:
2689+
2690+
```php
2691+
$http = new React\Http\HttpServer(
2692+
new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5),
2693+
$handler
2694+
);
2695+
```
2696+
> Internally, this class is used as a "value object" to override the default timeout of one minute.
2697+
As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
2698+
26822699
#### StreamingRequestMiddleware
26832700

26842701
The `React\Http\Middleware\StreamingRequestMiddleware` can be used to

src/HttpServer.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
use Evenement\EventEmitter;
66
use React\EventLoop\Loop;
77
use React\EventLoop\LoopInterface;
8+
use React\Http\Io\Clock;
89
use React\Http\Io\IniUtil;
910
use React\Http\Io\MiddlewareRunner;
11+
use React\Http\Io\RequestHeaderParser;
1012
use React\Http\Io\StreamingServer;
13+
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
1114
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
1215
use React\Http\Middleware\StreamingRequestMiddleware;
1316
use React\Http\Middleware\RequestBodyBufferMiddleware;
@@ -219,10 +222,13 @@ public function __construct($requestHandlerOrLoop)
219222
}
220223

221224
$streaming = false;
225+
$idleConnectionTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT;
222226
foreach ((array) $requestHandlers as $handler) {
223227
if ($handler instanceof StreamingRequestMiddleware) {
224228
$streaming = true;
225-
break;
229+
}
230+
if ($handler instanceof InactiveConnectionTimeoutMiddleware) {
231+
$idleConnectionTimeout = $handler->getTimeout();
226232
}
227233
}
228234

@@ -252,10 +258,11 @@ public function __construct($requestHandlerOrLoop)
252258
* doing anything with the request.
253259
*/
254260
$middleware = \array_filter($middleware, function ($handler) {
255-
return !($handler instanceof StreamingRequestMiddleware);
261+
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
256262
});
257263

258-
$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware));
264+
$clock = new Clock($loop);
265+
$this->streamingServer = new StreamingServer(new MiddlewareRunner($middleware), new RequestHeaderParser($loop, $clock, $idleConnectionTimeout), $clock);
259266

260267
$that = $this;
261268
$this->streamingServer->on('error', function ($error) use ($that) {

src/Io/RequestHeaderParser.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,25 @@ public function __construct(Clock $clock)
3737

3838
public function handle(ConnectionInterface $conn)
3939
{
40+
$loop = $this->loop;
41+
$idleConnectionTimeout = $this->idleConnectionTimeout;
42+
$that = $this;
43+
$idleConnectionTimeoutHandler = function () use ($that, $conn) {
44+
$that->emit('error', array(
45+
new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT),
46+
$conn
47+
));
48+
$conn->close();
49+
};
50+
$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
51+
$conn->on('close', function () use ($loop, &$timer) {
52+
$loop->cancelTimer($timer);
53+
});
4054
$buffer = '';
4155
$maxSize = $this->maxSize;
4256
$that = $this;
4357
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
58+
// $loop->cancelTimer($timer);
4459
// append chunk of data to buffer and look for end of request headers
4560
$buffer .= $data;
4661
$endOfHeader = \strpos($buffer, "\r\n\r\n");
@@ -59,6 +74,7 @@ public function handle(ConnectionInterface $conn)
5974

6075
// ignore incomplete requests
6176
if ($endOfHeader === false) {
77+
// $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
6278
return;
6379
}
6480

src/Io/StreamingServer.php

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ final class StreamingServer extends EventEmitter
8686

8787
/** @var Clock */
8888
private $clock;
89+
private $loop;
90+
private $idleConnectionTimeout;
8991

9092
/**
9193
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -95,7 +97,6 @@ final class StreamingServer extends EventEmitter
9597
* connections in order to then parse incoming data as HTTP.
9698
* See also [listen()](#listen) for more details.
9799
*
98-
* @param LoopInterface $loop
99100
* @param callable $requestHandler
100101
* @see self::listen()
101102
*/
@@ -134,7 +135,7 @@ public function __construct(LoopInterface $loop, $requestHandler)
134135
*/
135136
public function listen(ServerInterface $socket)
136137
{
137-
$socket->on('connection', array($this->parser, 'handle'));
138+
$socket->on('connection', array($this, 'parseRequest'));
138139
}
139140

140141
/** @internal */
@@ -359,7 +360,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
359360

360361
// either wait for next request over persistent connection or end connection
361362
if ($persist) {
362-
$this->parser->handle($connection);
363+
$this->parseRequest($connection);
363364
} else {
364365
$connection->end();
365366
}
@@ -380,13 +381,34 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
380381
// write streaming body and then wait for next request over persistent connection
381382
if ($persist) {
382383
$body->pipe($connection, array('end' => false));
383-
$parser = $this->parser;
384-
$body->on('end', function () use ($connection, $parser, $body) {
384+
$that = $this;
385+
$body->on('end', function () use ($connection, $body, &$that) {
385386
$connection->removeListener('close', array($body, 'close'));
386-
$parser->handle($connection);
387+
$that->parseRequest($connection);
387388
});
388389
} else {
389390
$body->pipe($connection);
390391
}
391392
}
393+
394+
/**
395+
* @internal
396+
*/
397+
public function parseRequest(ConnectionInterface $connection)
398+
{
399+
$idleConnectionTimeout = $this->idleConnectionTimeout;
400+
$loop = $this->loop;
401+
$idleConnectionTimeoutHandler = function () use ($connection) {
402+
$connection->close();
403+
};
404+
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
405+
$connection->once('close', function () use ($loop, &$timer) {
406+
$loop->cancelTimer($timer);
407+
});
408+
$connection->once('data', function () use ($loop, &$timer) {
409+
$loop->cancelTimer($timer);
410+
});
411+
412+
$this->parser->handle($connection);
413+
}
392414
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
namespace React\Http\Middleware;
4+
5+
use Psr\Http\Message\ResponseInterface;
6+
use Psr\Http\Message\ServerRequestInterface;
7+
use React\Http\Io\HttpBodyStream;
8+
use React\Http\Io\PauseBufferStream;
9+
use React\Promise;
10+
use React\Promise\PromiseInterface;
11+
use React\Promise\Deferred;
12+
use React\Stream\ReadableStreamInterface;
13+
14+
/**
15+
* Closes any inactive connection after the specified amount of seconds since last activity.
16+
*
17+
* This allows you to set an alternative timeout to the default one minute (60 seconds). For example
18+
* thirteen and a half seconds:
19+
*
20+
* ```php
21+
* $http = new React\Http\HttpServer(
22+
* new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(13.5),
23+
* $handler
24+
* );
25+
*
26+
* > Internally, this class is used as a "value object" to override the default timeout of one minute.
27+
* As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
28+
*/
29+
final class InactiveConnectionTimeoutMiddleware
30+
{
31+
/**
32+
* @internal
33+
*/
34+
const DEFAULT_TIMEOUT = 60;
35+
36+
/**
37+
* @var float
38+
*/
39+
private $timeout;
40+
41+
/**
42+
* @param float $timeout
43+
*/
44+
public function __construct($timeout = self::DEFAULT_TIMEOUT)
45+
{
46+
$this->timeout = $timeout;
47+
}
48+
49+
public function __invoke(ServerRequestInterface $request, $next)
50+
{
51+
return $next($request);
52+
}
53+
54+
/**
55+
* @return float
56+
* @internal
57+
*/
58+
public function getTimeout()
59+
{
60+
return $this->timeout;
61+
}
62+
}

tests/HttpServerTest.php

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use React\EventLoop\Loop;
77
use React\Http\HttpServer;
88
use React\Http\Io\IniUtil;
9+
use React\Http\Io\StreamingServer;
10+
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
911
use React\Http\Middleware\StreamingRequestMiddleware;
1012
use React\Promise;
1113
use React\Promise\Deferred;
@@ -60,10 +62,18 @@ public function testConstructWithoutLoopAssignsLoopAutomatically()
6062
$ref->setAccessible(true);
6163
$clock = $ref->getValue($streamingServer);
6264

65+
$ref = new \ReflectionProperty($streamingServer, 'parser');
66+
$ref->setAccessible(true);
67+
$parser = $ref->getValue($streamingServer);
68+
6369
$ref = new \ReflectionProperty($clock, 'loop');
6470
$ref->setAccessible(true);
6571
$loop = $ref->getValue($clock);
6672

73+
$ref = new \ReflectionProperty($parser, 'loop');
74+
$ref->setAccessible(true);
75+
$loop = $ref->getValue($parser);
76+
6777
$this->assertInstanceOf('React\EventLoop\LoopInterface', $loop);
6878
}
6979

@@ -257,6 +267,18 @@ function (ServerRequestInterface $request) use (&$streaming) {
257267
$this->assertEquals(true, $streaming);
258268
}
259269

270+
public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
271+
{
272+
$this->connection->expects($this->once())->method('close');
273+
274+
$http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever());
275+
276+
$http->listen($this->socket);
277+
$this->socket->emit('connection', array($this->connection));
278+
279+
Loop::run();
280+
}
281+
260282
public function testForwardErrors()
261283
{
262284
$exception = new \Exception();
@@ -439,7 +461,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency()
439461

440462
public function testConstructFiltersOutConfigurationMiddlewareBefore()
441463
{
442-
$http = new HttpServer(new StreamingRequestMiddleware(), function () { });
464+
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { });
443465

444466
$ref = new \ReflectionProperty($http, 'streamingServer');
445467
$ref->setAccessible(true);

tests/Io/RequestHeaderParserTest.php

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ public function testServerParamsWillBeReusedForMultipleRequestsFromSameConnectio
813813
$clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock();
814814
$clock->expects($this->exactly(2))->method('now')->willReturn(1652972091.3958);
815815

816-
$parser = new RequestHeaderParser($clock);
816+
$parser = $this->createRequestHeaderParser($clock);
817817

818818
$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock();
819819
$connection->expects($this->once())->method('getLocalAddress')->willReturn('tcp://127.1.1.1:8000');
@@ -848,7 +848,7 @@ public function testServerParamsWillBeRememberedUntilConnectionIsClosed()
848848
{
849849
$clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock();
850850

851-
$parser = new RequestHeaderParser($clock);
851+
$parser = $this->createRequestHeaderParser($clock);
852852

853853
$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock();
854854

@@ -887,6 +887,49 @@ public function testQueryParametersWillBeSet()
887887
$this->assertEquals('this', $queryParams['test']);
888888
}
889889

890+
public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
891+
{
892+
$callback = null;
893+
$caughtError = null;
894+
$timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock();
895+
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
896+
$loop->expects($this->exactly(2))->method('addTimer')->with(0.1, $this->callback(function ($cb) use (&$tick, &$callback) {
897+
$callback = $cb;
898+
return true;
899+
}))->willReturn($timer);
900+
$loop->expects($this->any())->method('cancelTimer')->with($timer);
901+
902+
$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
903+
$connection->expects($this->once())->method('close');
904+
905+
$parser = $this->createRequestHeaderParser(new Clock($loop), 0.1, $loop);
906+
$parser->on('error', function ($error) use (&$caughtError) {
907+
$caughtError = $error;
908+
});
909+
910+
$parser->handle($connection);
911+
912+
$connection->emit('data', array("GET /foo.php?hello=world&test=this HTTP/"));
913+
914+
self::assertTrue(is_callable($callback));
915+
$callback();
916+
917+
self::assertInstanceOf('\RuntimeException', $caughtError);
918+
self::assertSame('Request timed out', $caughtError->getMessage());
919+
}
920+
921+
public function testIdleConnectionWillNotBeClosedAfterConfiguredTimeoutOnFullRequest()
922+
{
923+
$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
924+
$connection->expects($this->never())->method('close');
925+
926+
$parser = $this->createRequestHeaderParser(new Clock($this->getMockBuilder('React\EventLoop\LoopInterface')->getMock()), 0.1);
927+
928+
$parser->handle($connection);
929+
930+
$connection->emit('data', array($this->createGetRequest()));
931+
}
932+
890933
private function createGetRequest()
891934
{
892935
$data = "GET / HTTP/1.1\r\n";

0 commit comments

Comments
 (0)