Skip to content

Commit 7dc19ea

Browse files
authored
Implement response handlers (#54)
* Implement resopnse handlers Signed-off-by: Daniel Widdis <[email protected]> * Add missing test which uncovered missing return statement Signed-off-by: Daniel Widdis <[email protected]> * Ignore unreachable protobuf code branch Signed-off-by: Daniel Widdis <[email protected]> * Fix event loop deprecation warning Signed-off-by: Daniel Widdis <[email protected]> --------- Signed-off-by: Daniel Widdis <[email protected]>
1 parent ab6ce5a commit 7dc19ea

19 files changed

+375
-109
lines changed

.github/workflows/test.yml

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ jobs:
3030
poetry install
3131
protoc -I=$PROTOBUF_SRC_DIR --python_out=$PROTOBUF_SRC_DIR $PROTOBUF_SRC_DIR/*.proto
3232
2to3 --no-diff -n -w $PROTOBUF_SRC_DIR
33+
find $PROTOBUF_SRC_DIR -type f -name "*_pb2.py" | xargs sed -i.bak -E 's/(USE_C_DESCRIPTORS == False:)/\1 # pragma: no cover/g'
3334
poetry run coverage run --source=src -m pytest -v
3435
poetry run coverage xml
3536
- name: Upload Coverage Report

src/opensearch_sdk_py/actions/internal/discovery_extensions_request_handler.py

+35-13
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,58 @@
99

1010
import logging
1111

12+
from opensearch_sdk_py.actions.internal.register_rest_actions_response_handler import RegisterRestActionsResponseHandler
1213
from opensearch_sdk_py.actions.request_handler import RequestHandler
14+
from opensearch_sdk_py.actions.response_handlers import ResponseHandlers
1315
from opensearch_sdk_py.api.action_extension import ActionExtension
1416
from opensearch_sdk_py.transport.initialize_extension_request import InitializeExtensionRequest
17+
from opensearch_sdk_py.transport.initialize_extension_response import InitializeExtensionResponse
1518
from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest
19+
from opensearch_sdk_py.transport.outbound_message_response import OutboundMessageResponse
1620
from opensearch_sdk_py.transport.register_rest_actions_request import RegisterRestActionsRequest
1721
from opensearch_sdk_py.transport.stream_input import StreamInput
1822
from opensearch_sdk_py.transport.stream_output import StreamOutput
1923

2024

2125
class DiscoveryExtensionsRequestHandler(RequestHandler):
22-
def __init__(self, extension: ActionExtension) -> None:
26+
def __init__(self, extension: ActionExtension, response_handlers: ResponseHandlers) -> None:
2327
super().__init__("internal:discovery/extensions", extension)
28+
self.response_handlers = response_handlers
2429

2530
def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput:
2631
initialize_extension_request = InitializeExtensionRequest().read_from(input)
2732
logging.debug(f"< {initialize_extension_request}")
2833

34+
# Create the response message preserving the request id, but don't send it yet.
35+
# This will be sent when response handler calls send()
36+
self.response = OutboundMessageResponse(
37+
request.thread_context_struct,
38+
request.features,
39+
InitializeExtensionResponse(self.extension.name, self.extension.implemented_interfaces),
40+
request.version,
41+
request.request_id,
42+
request.is_handshake,
43+
request.is_compress,
44+
)
45+
2946
# Sometime between tcp and transport handshakes and the eventual response,
3047
# the uniqueId gets added to the thread context.
3148
# request.thread_context_struct.request_headers["extension_unique_id"] = self.extension.name
32-
self.extension.init_response_request_id = request.request_id
33-
34-
return self.send(
35-
OutboundMessageRequest(
36-
thread_context=request.thread_context_struct,
37-
features=request.features,
38-
message=RegisterRestActionsRequest(self.extension.name, self.extension.named_routes),
39-
version=request.version,
40-
action="internal:discovery/registerrestactions",
41-
is_handshake=False,
42-
is_compress=False,
43-
)
49+
50+
# Now send our own initialization requests.
51+
52+
# Create the request, this gets us an auto-increment request id
53+
register_request = OutboundMessageRequest(
54+
thread_context=request.thread_context_struct,
55+
features=request.features,
56+
message=RegisterRestActionsRequest(self.extension.name, self.extension.named_routes),
57+
version=request.version,
58+
action="internal:discovery/registerrestactions",
59+
is_handshake=False,
60+
is_compress=False,
4461
)
62+
# Register response handler to handle this request ID invoking this class's send()
63+
register_response_handler = RegisterRestActionsResponseHandler(self)
64+
self.response_handlers.register(register_request.request_id, register_response_handler)
65+
# Now send the request
66+
return register_response_handler.send(register_request)

src/opensearch_sdk_py/actions/internal/extension_rest_request_handler.py

+16-17
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,20 @@ def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamO
3030
route = f"{extension_rest_request.method.name} {extension_rest_request.path}"
3131
response = self.extension.handle(route, extension_rest_request)
3232

33-
return self.send(
34-
OutboundMessageResponse(
35-
request.thread_context_struct,
36-
request.features,
37-
RestExecuteOnExtensionResponse(
38-
status=response.status,
39-
content_type=response.content_type,
40-
content=response.content,
41-
headers=response.headers,
42-
consumed_params=response.consumed_params,
43-
content_consumed=response.content_consumed,
44-
),
45-
request.version,
46-
request.request_id,
47-
request.is_handshake,
48-
request.is_compress,
49-
)
33+
self.response = OutboundMessageResponse(
34+
request.thread_context_struct,
35+
request.features,
36+
RestExecuteOnExtensionResponse(
37+
status=response.status,
38+
content_type=response.content_type,
39+
content=response.content,
40+
headers=response.headers,
41+
consumed_params=response.consumed_params,
42+
content_consumed=response.content_consumed,
43+
),
44+
request.version,
45+
request.request_id,
46+
request.is_handshake,
47+
request.is_compress,
5048
)
49+
return self.send()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#
2+
# Copyright OpenSearch Contributors
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
# The OpenSearch Contributors require contributions made to
6+
# this file be licensed under the Apache-2.0 license or a
7+
# compatible open source license.
8+
#
9+
10+
import logging
11+
12+
from opensearch_sdk_py.actions.request_response_handler import RequestResponseHandler
13+
from opensearch_sdk_py.actions.response_handler import ResponseHandler
14+
from opensearch_sdk_py.transport.acknowledged_response import AcknowledgedResponse
15+
from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest
16+
from opensearch_sdk_py.transport.stream_input import StreamInput
17+
from opensearch_sdk_py.transport.stream_output import StreamOutput
18+
19+
20+
class RegisterRestActionsResponseHandler(ResponseHandler):
21+
def __init__(self, next_handler: RequestResponseHandler) -> None:
22+
self.next_handler = next_handler
23+
24+
def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput:
25+
ack_response = AcknowledgedResponse().read_from(input)
26+
logging.debug(f"< {ack_response}")
27+
if ack_response.status:
28+
return self.next_handler.send()
29+
else:
30+
# TODO error handling
31+
return None

src/opensearch_sdk_py/actions/internal/request_error_handler.py

+13-14
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,17 @@ def __init__(
3131
super().__init__("internal:error", extension)
3232

3333
def handle(self, request: OutboundMessageRequest, input: StreamInput = None) -> StreamOutput:
34-
return self.send(
35-
OutboundMessageResponse(
36-
request.thread_context_struct,
37-
request.features,
38-
RestExecuteOnExtensionResponse(
39-
status=self.status,
40-
content_type=self.content_type,
41-
content=self.content,
42-
),
43-
request.version,
44-
request.request_id,
45-
request.is_handshake,
46-
request.is_compress,
47-
)
34+
self.response = OutboundMessageResponse(
35+
request.thread_context_struct,
36+
request.features,
37+
RestExecuteOnExtensionResponse(
38+
status=self.status,
39+
content_type=self.content_type,
40+
content=self.content,
41+
),
42+
request.version,
43+
request.request_id,
44+
request.is_handshake,
45+
request.is_compress,
4846
)
47+
return self.send()

src/opensearch_sdk_py/actions/internal/tcp_handshake_request_handler.py

+9-10
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,13 @@ def __init__(self, extension: Extension) -> None:
2626
def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamOutput:
2727
tcp_handshake = TransportHandshakerHandshakeRequest().read_from(input)
2828
logging.debug(f"< {tcp_handshake}")
29-
return self.send(
30-
OutboundMessageResponse(
31-
request.thread_context_struct,
32-
request.features,
33-
TransportHandshakerHandshakeResponse(request.version),
34-
request.version,
35-
request.request_id,
36-
request.is_handshake,
37-
request.is_compress,
38-
)
29+
self.response = OutboundMessageResponse(
30+
request.thread_context_struct,
31+
request.features,
32+
TransportHandshakerHandshakeResponse(request.version),
33+
request.version,
34+
request.request_id,
35+
request.is_handshake,
36+
request.is_compress,
3937
)
38+
return self.send()

src/opensearch_sdk_py/actions/internal/transport_handshake_request_handler.py

+21-22
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,25 @@ def handle(self, request: OutboundMessageRequest, input: StreamInput) -> StreamO
3030
transport_handshake = TransportServiceHandshakeRequest().read_from(input)
3131
logging.debug(f"< {transport_handshake}")
3232

33-
return self.send(
34-
OutboundMessageResponse(
35-
request.thread_context_struct,
36-
request.features,
37-
TransportServiceHandshakeResponse(
38-
DiscoveryNode(
39-
node_name="hello-world",
40-
node_id="hello-world",
41-
address=TransportAddress("127.0.0.1", 1234),
42-
roles={
43-
DiscoveryNodeRole.CLUSTER_MANAGER_ROLE,
44-
DiscoveryNodeRole.DATA_ROLE,
45-
DiscoveryNodeRole.INGEST_ROLE,
46-
DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
47-
},
48-
)
49-
),
50-
request.version,
51-
request.request_id,
52-
request.is_handshake,
53-
request.is_compress,
54-
)
33+
self.response = OutboundMessageResponse(
34+
request.thread_context_struct,
35+
request.features,
36+
TransportServiceHandshakeResponse(
37+
DiscoveryNode(
38+
node_name="hello-world",
39+
node_id="hello-world",
40+
address=TransportAddress("127.0.0.1", 1234),
41+
roles={
42+
DiscoveryNodeRole.CLUSTER_MANAGER_ROLE,
43+
DiscoveryNodeRole.DATA_ROLE,
44+
DiscoveryNodeRole.INGEST_ROLE,
45+
DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
46+
},
47+
)
48+
),
49+
request.version,
50+
request.request_id,
51+
request.is_handshake,
52+
request.is_compress,
5553
)
54+
return self.send()

src/opensearch_sdk_py/actions/request_handler.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,31 @@
88
#
99

1010
import logging
11-
from abc import ABC, abstractmethod
11+
from abc import abstractmethod
1212
from typing import Optional
1313

14+
from opensearch_sdk_py.actions.request_response_handler import RequestResponseHandler
1415
from opensearch_sdk_py.extension import Extension
15-
from opensearch_sdk_py.transport.outbound_message import OutboundMessage
1616
from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest
17+
from opensearch_sdk_py.transport.outbound_message_response import OutboundMessageResponse
1718
from opensearch_sdk_py.transport.stream_input import StreamInput
1819
from opensearch_sdk_py.transport.stream_output import StreamOutput
1920

2021

21-
class RequestHandler(ABC):
22+
class RequestHandler(RequestResponseHandler):
2223
def __init__(self, action: str, extension: Extension):
2324
self.action = action
2425
self.extension = extension
26+
self.response: OutboundMessageResponse = None
2527

2628
@abstractmethod
2729
def handle(self, request: OutboundMessageRequest, input: StreamInput = None) -> Optional[bytes]:
2830
pass # pragma: no cover
2931

30-
def send(self, message: OutboundMessage) -> StreamOutput:
32+
def send(self) -> StreamOutput:
3133
output = StreamOutput()
32-
message.write_to(output)
34+
self.response.write_to(output)
3335
raw_out = output.getvalue()
34-
logging.info(f"> {message.__str__()}, size={len(raw_out)} byte(s)")
36+
logging.info(f"> {self.response.__str__()}, size={len(raw_out)} byte(s)")
3537
logging.debug(f"> #{raw_out}")
3638
return output
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Copyright OpenSearch Contributors
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
# The OpenSearch Contributors require contributions made to
6+
# this file be licensed under the Apache-2.0 license or a
7+
# compatible open source license.
8+
#
9+
10+
from abc import ABC, abstractmethod
11+
12+
from opensearch_sdk_py.transport.stream_output import StreamOutput
13+
14+
15+
class RequestResponseHandler(ABC):
16+
@abstractmethod
17+
def send(self) -> StreamOutput:
18+
pass # pragma: no cover
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#
2+
# Copyright OpenSearch Contributors
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
# The OpenSearch Contributors require contributions made to
6+
# this file be licensed under the Apache-2.0 license or a
7+
# compatible open source license.
8+
#
9+
10+
import logging
11+
from abc import abstractmethod
12+
from typing import Optional
13+
14+
from opensearch_sdk_py.actions.request_response_handler import RequestResponseHandler
15+
from opensearch_sdk_py.transport.outbound_message_request import OutboundMessageRequest
16+
from opensearch_sdk_py.transport.outbound_message_response import OutboundMessageResponse
17+
from opensearch_sdk_py.transport.stream_input import StreamInput
18+
from opensearch_sdk_py.transport.stream_output import StreamOutput
19+
20+
21+
class ResponseHandler(RequestResponseHandler):
22+
@abstractmethod
23+
def handle(self, response: OutboundMessageResponse, input: StreamInput = None) -> Optional[bytes]:
24+
pass # pragma: no cover
25+
26+
def send(self, request: OutboundMessageRequest) -> StreamOutput:
27+
output = StreamOutput()
28+
request.write_to(output)
29+
raw_out = output.getvalue()
30+
logging.info(f"> {request.__str__()}, size={len(raw_out)} byte(s)")
31+
logging.debug(f"> #{raw_out}")
32+
return output
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#
2+
# Copyright OpenSearch Contributors
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
# The OpenSearch Contributors require contributions made to
6+
# this file be licensed under the Apache-2.0 license or a
7+
# compatible open source license.
8+
#
9+
10+
from typing import Dict, Optional
11+
12+
from opensearch_sdk_py.actions.response_handler import ResponseHandler
13+
from opensearch_sdk_py.extension import Extension
14+
from opensearch_sdk_py.transport.outbound_message_response import OutboundMessageResponse
15+
from opensearch_sdk_py.transport.stream_input import StreamInput
16+
17+
18+
class ResponseHandlers(Dict[int, ResponseHandler]):
19+
def __init__(self, extension: Extension) -> None:
20+
self.extension = extension
21+
22+
def register(self, request_id: int, handler: ResponseHandler) -> None:
23+
self[request_id] = handler
24+
25+
def handle(self, response: OutboundMessageResponse, input: StreamInput = None) -> Optional[bytes]:
26+
handler = self[response.request_id]
27+
del self[response.request_id]
28+
return handler.handle(response, input) if handler else None

0 commit comments

Comments
 (0)