Skip to content

Commit c884338

Browse files
authored
Core acceptor pool doc, cleanup and standalone example (#393)
* Better document acceptor module and add a TCP Echo Server example * autopep8 formating * Rename ThreadlessWork --> Work class * Make initialize, is_inactive and shutdown as optional interface methods. Also introduce Readables & Writables custom types. * Move websocket code into its own module * Add websocket client example * Cleanup websocket client
1 parent 9be6c29 commit c884338

27 files changed

+467
-239
lines changed

Makefile

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ devtools:
2525
pushd dashboard && npm run devtools && popd
2626

2727
autopep8:
28+
autopep8 --recursive --in-place --aggressive examples
2829
autopep8 --recursive --in-place --aggressive proxy
2930
autopep8 --recursive --in-place --aggressive tests
3031
autopep8 --recursive --in-place --aggressive setup.py
@@ -73,8 +74,8 @@ lib-clean:
7374
rm -rf .hypothesis
7475

7576
lib-lint:
76-
flake8 --ignore=W504 --max-line-length=127 --max-complexity=19 proxy/ tests/ setup.py
77-
mypy --strict --ignore-missing-imports proxy/ tests/ setup.py
77+
flake8 --ignore=W504 --max-line-length=127 --max-complexity=19 examples/ proxy/ tests/ setup.py
78+
mypy --strict --ignore-missing-imports examples/ proxy/ tests/ setup.py
7879

7980
lib-test: lib-clean lib-version lib-lint
8081
pytest -v tests/

examples/README.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Proxy.py Library Examples
2+
3+
This directory contains examples that demonstrate `proxy.py` core library capabilities.
4+
5+
Looking for `proxy.py` plugin examples? Check [proxy/plugin](https://github.com/abhinavsingh/proxy.py/tree/develop/proxy/plugin) directory.

examples/tcp_echo_server.py

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
import socket
13+
import selectors
14+
15+
from typing import Dict
16+
17+
from proxy.core.acceptor import AcceptorPool, Work
18+
from proxy.common.flags import Flags
19+
from proxy.common.types import Readables, Writables
20+
21+
22+
class EchoServerHandler(Work):
23+
"""EchoServerHandler implements Work interface.
24+
25+
An instance of EchoServerHandler is created for each client
26+
connection. EchoServerHandler lifecycle is controlled by
27+
Threadless core using asyncio. Implementation must provide
28+
get_events and handle_events method. Optionally, also implement
29+
intialize, is_inactive and shutdown method.
30+
"""
31+
32+
def get_events(self) -> Dict[socket.socket, int]:
33+
# We always want to read from client
34+
# Register for EVENT_READ events
35+
events = {self.client.connection: selectors.EVENT_READ}
36+
# If there is pending buffer for client
37+
# also register for EVENT_WRITE events
38+
if self.client.has_buffer():
39+
events[self.client.connection] |= selectors.EVENT_WRITE
40+
return events
41+
42+
def handle_events(
43+
self,
44+
readables: Readables,
45+
writables: Writables) -> bool:
46+
"""Return True to shutdown work."""
47+
if self.client.connection in readables:
48+
data = self.client.recv()
49+
if data is None:
50+
# Client closed connection, signal shutdown
51+
return True
52+
# Queue data back to client
53+
self.client.queue(data)
54+
55+
if self.client.connection in writables:
56+
self.client.flush()
57+
58+
return False
59+
60+
61+
def main() -> None:
62+
# This example requires `threadless=True`
63+
pool = AcceptorPool(
64+
flags=Flags(num_workers=1, threadless=True),
65+
work_klass=EchoServerHandler)
66+
try:
67+
pool.setup()
68+
while True:
69+
time.sleep(1)
70+
finally:
71+
pool.shutdown()
72+
73+
74+
if __name__ == '__main__':
75+
main()

examples/websocket_client.py

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
from proxy.http.websocket import WebsocketClient, WebsocketFrame, websocketOpcodes
13+
14+
15+
# globals
16+
client: WebsocketClient
17+
last_dispatch_time: float
18+
static_frame = memoryview(WebsocketFrame.text(b'hello'))
19+
num_echos = 10
20+
21+
22+
def on_message(frame: WebsocketFrame) -> None:
23+
"""WebsocketClient on_message callback."""
24+
global client, num_echos, last_dispatch_time
25+
print('Received %r after %d millisec' % (frame.data, (time.time() - last_dispatch_time) * 1000))
26+
assert(frame.data == b'hello' and frame.opcode == websocketOpcodes.TEXT_FRAME)
27+
if num_echos > 0:
28+
client.queue(static_frame)
29+
last_dispatch_time = time.time()
30+
num_echos -= 1
31+
else:
32+
client.close()
33+
34+
35+
if __name__ == '__main__':
36+
# Constructor establishes socket connection
37+
client = WebsocketClient(b'echo.websocket.org', 80, b'/', on_message=on_message)
38+
# Perform handshake
39+
client.handshake()
40+
# Queue some data for client
41+
client.queue(static_frame)
42+
last_dispatch_time = time.time()
43+
# Start event loop
44+
client.run()

proxy/common/flags.py

+7-8
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
import sys
2222
import inspect
2323

24-
from typing import Optional, Union, Dict, List, TypeVar, Type, cast, Any, Tuple
24+
from typing import Optional, Dict, List, TypeVar, Type, cast, Any, Tuple
2525

26+
from .types import IpAddress
2627
from .utils import text_, bytes_
2728
from .constants import DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_BACKLOG, DEFAULT_BASIC_AUTH
2829
from .constants import DEFAULT_TIMEOUT, DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HTTP_PROXY, DEFAULT_DISABLE_HEADERS
@@ -67,8 +68,7 @@ def __init__(
6768
ca_signing_key_file: Optional[str] = None,
6869
ca_file: Optional[str] = None,
6970
num_workers: int = 0,
70-
hostname: Union[ipaddress.IPv4Address,
71-
ipaddress.IPv6Address] = DEFAULT_IPV6_HOSTNAME,
71+
hostname: IpAddress = DEFAULT_IPV6_HOSTNAME,
7272
port: int = DEFAULT_PORT,
7373
backlog: int = DEFAULT_BACKLOG,
7474
static_server_dir: str = DEFAULT_STATIC_SERVER_DIR,
@@ -99,8 +99,7 @@ def __init__(
9999
self.ca_signing_key_file: Optional[str] = ca_signing_key_file
100100
self.ca_file = ca_file
101101
self.num_workers: int = num_workers if num_workers > 0 else multiprocessing.cpu_count()
102-
self.hostname: Union[ipaddress.IPv4Address,
103-
ipaddress.IPv6Address] = hostname
102+
self.hostname: IpAddress = hostname
104103
self.family: socket.AddressFamily = socket.AF_INET6 if hostname.version == 6 else socket.AF_INET
105104
self.port: int = port
106105
self.backlog: int = backlog
@@ -161,7 +160,8 @@ def initialize(
161160
# Setup limits
162161
Flags.set_open_file_limit(args.open_file_limit)
163162

164-
# Prepare list of plugins to load based upon --enable-* and --disable-* flags
163+
# Prepare list of plugins to load based upon --enable-* and --disable-*
164+
# flags
165165
default_plugins: List[Tuple[str, bool]] = []
166166
if args.enable_dashboard:
167167
default_plugins.append((PLUGIN_WEB_SERVER, True))
@@ -249,8 +249,7 @@ def initialize(
249249
opts.get(
250250
'ca_file',
251251
args.ca_file)),
252-
hostname=cast(Union[ipaddress.IPv4Address,
253-
ipaddress.IPv6Address],
252+
hostname=cast(IpAddress,
254253
opts.get('hostname', ipaddress.ip_address(args.hostname))),
255254
port=cast(int, opts.get('port', args.port)),
256255
backlog=cast(int, opts.get('backlog', args.backlog)),

proxy/common/types.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
:license: BSD, see LICENSE for more details.
1010
"""
1111
import queue
12+
import ipaddress
1213

13-
from typing import TYPE_CHECKING, Dict, Any
14+
from typing import TYPE_CHECKING, Dict, Any, List, Union
1415

1516
from typing_extensions import Protocol
1617

@@ -23,3 +24,8 @@
2324
class HasFileno(Protocol):
2425
def fileno(self) -> int:
2526
... # pragma: no cover
27+
28+
29+
Readables = List[Union[int, HasFileno]]
30+
Writables = List[Union[int, HasFileno]]
31+
IpAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]

proxy/common/utils.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ def build_http_pkt(line: List[bytes],
101101
def build_websocket_handshake_request(
102102
key: bytes,
103103
method: bytes = b'GET',
104-
url: bytes = b'/') -> bytes:
104+
url: bytes = b'/',
105+
host: bytes = b'localhost') -> bytes:
105106
"""
106107
Build and returns a Websocket handshake request packet.
107108
@@ -112,6 +113,7 @@ def build_websocket_handshake_request(
112113
return build_http_request(
113114
method, url,
114115
headers={
116+
b'Host': host,
115117
b'Connection': b'upgrade',
116118
b'Upgrade': b'websocket',
117119
b'Sec-WebSocket-Key': key,

proxy/core/acceptor/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010
"""
1111
from .acceptor import Acceptor
1212
from .pool import AcceptorPool
13+
from .work import Work
14+
from .threadless import Threadless
1315

1416
__all__ = [
1517
'Acceptor',
1618
'AcceptorPool',
19+
'Work',
20+
'Threadless',
1721
]

proxy/core/acceptor/acceptor.py

+10-10
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,36 @@
1414
import selectors
1515
import socket
1616
import threading
17-
# import time
17+
1818
from multiprocessing import connection
1919
from multiprocessing.reduction import send_handle, recv_handle
2020
from typing import Optional, Type, Tuple
2121

22+
from .work import Work
23+
from .threadless import Threadless
24+
2225
from ..connection import TcpClientConnection
23-
from ..threadless import ThreadlessWork, Threadless
2426
from ..event import EventQueue, eventNames
2527
from ...common.flags import Flags
2628

2729
logger = logging.getLogger(__name__)
2830

2931

3032
class Acceptor(multiprocessing.Process):
31-
"""Socket client acceptor.
33+
"""Socket server acceptor process.
3234
33-
Accepts client connection over received server socket handle and
34-
starts a new work thread.
35+
Accepts client connection over received server socket handle at startup. Spawns a separate
36+
thread to handle each client request. However, when `--threadless` is enabled, Acceptor also
37+
pre-spawns a `Threadless` process at startup. Accepted client connections are passed to
38+
`Threadless` process which internally uses asyncio event loop to handle client connections.
3539
"""
3640

3741
def __init__(
3842
self,
3943
idd: int,
4044
work_queue: connection.Connection,
4145
flags: Flags,
42-
work_klass: Type[ThreadlessWork],
46+
work_klass: Type[Work],
4347
lock: multiprocessing.synchronize.Lock,
4448
event_queue: Optional[EventQueue] = None) -> None:
4549
super().__init__()
@@ -108,11 +112,7 @@ def run_once(self) -> None:
108112
if len(events) == 0:
109113
return
110114
conn, addr = self.sock.accept()
111-
112-
# now = time.time()
113-
# fileno: int = conn.fileno()
114115
self.start_work(conn, addr)
115-
# logger.info('Work started for fd %d in %f seconds', fileno, time.time() - now)
116116

117117
def run(self) -> None:
118118
self.selector = selectors.DefaultSelector()

proxy/core/acceptor/pool.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
from typing import List, Optional, Type
1919

2020
from .acceptor import Acceptor
21-
from ..threadless import ThreadlessWork
21+
from .work import Work
22+
2223
from ..event import EventQueue, EventDispatcher
2324
from ...common.flags import Flags
2425

@@ -31,11 +32,20 @@ class AcceptorPool:
3132
"""AcceptorPool.
3233
3334
Pre-spawns worker processes to utilize all cores available on the system. Server socket connection is
34-
dispatched over a pipe to workers. Each worker accepts incoming client request and spawns a
35-
separate thread to handle the client request.
35+
dispatched over a pipe to workers. Each Acceptor instance accepts for new client connection.
36+
37+
Example usage:
38+
39+
pool = AcceptorPool(flags=..., work_klass=...)
40+
try:
41+
pool.setup()
42+
while True:
43+
time.sleep(1)
44+
finally:
45+
pool.shutdown()
3646
"""
3747

38-
def __init__(self, flags: Flags, work_klass: Type[ThreadlessWork]) -> None:
48+
def __init__(self, flags: Flags, work_klass: Type[Work]) -> None:
3949
self.flags = flags
4050
self.socket: Optional[socket.socket] = None
4151
self.acceptors: List[Acceptor] = []

0 commit comments

Comments
 (0)