diff --git a/libs/executors/garf_executors/entrypoints/grpc_server.py b/libs/executors/garf_executors/entrypoints/grpc_server.py new file mode 100644 index 0000000..a017e76 --- /dev/null +++ b/libs/executors/garf_executors/entrypoints/grpc_server.py @@ -0,0 +1,68 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""gRPC endpoint for garf.""" + +import argparse +import logging +from concurrent import futures + +import grpc +from google.protobuf.json_format import MessageToDict +from grpc_reflection.v1alpha import reflection + +import garf_executors +from garf_executors import garf_pb2, garf_pb2_grpc +from garf_executors.entrypoints.tracer import initialize_tracer + + +class GarfService(garf_pb2_grpc.GarfService): + def Execute(self, request, context): + query_executor = garf_executors.setup_executor( + request.source, request.context.fetcher_parameters + ) + execution_context = garf_executors.execution_context.ExecutionContext( + **MessageToDict(request.context, preserving_proto_field_name=True) + ) + result = query_executor.execute( + query=request.query, + title=request.title, + context=execution_context, + ) + return garf_pb2.ExecuteResponse(results=[result]) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--port', dest='port', default=50051, type=int) + parser.add_argument( + '--parallel-threshold', dest='parallel_threshold', default=10, type=int + ) + args, _ = parser.parse_known_args() + initialize_tracer() + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=args.parallel_threshold) + ) + + service = GarfService() + garf_pb2_grpc.add_GarfServiceServicer_to_server(service, server) + SERVICE_NAMES = ( + garf_pb2.DESCRIPTOR.services_by_name['GarfService'].full_name, + reflection.SERVICE_NAME, + ) + reflection.enable_server_reflection(SERVICE_NAMES, server) + server.add_insecure_port(f'[::]:{args.port}') + server.start() + logging.info('Garf service started, listening on port %d', 50051) + server.wait_for_termination() diff --git a/libs/executors/garf_executors/garf_pb2.py b/libs/executors/garf_executors/garf_pb2.py new file mode 100644 index 0000000..c6d579f --- /dev/null +++ b/libs/executors/garf_executors/garf_pb2.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: garf.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'garf.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import struct_pb2 as google_dot_protobuf_dot_struct__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ngarf.proto\x12\x04garf\x1a\x1cgoogle/protobuf/struct.proto\"g\n\x0e\x45xecuteRequest\x12\x0e\n\x06source\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\r\n\x05query\x18\x03 \x01(\t\x12\'\n\x07\x63ontext\x18\x04 \x01(\x0b\x32\x16.garf.ExecutionContext\"\xbc\x01\n\x10\x45xecutionContext\x12/\n\x10query_parameters\x18\x01 \x01(\x0b\x32\x15.garf.QueryParameters\x12\x33\n\x12\x66\x65tcher_parameters\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\x12\x0e\n\x06writer\x18\x03 \x01(\t\x12\x32\n\x11writer_parameters\x18\x04 \x01(\x0b\x32\x17.google.protobuf.Struct\"d\n\x0fQueryParameters\x12&\n\x05macro\x18\x01 \x01(\x0b\x32\x17.google.protobuf.Struct\x12)\n\x08template\x18\x02 \x01(\x0b\x32\x17.google.protobuf.Struct\"\"\n\x0f\x45xecuteResponse\x12\x0f\n\x07results\x18\x01 \x03(\t2G\n\x0bGarfService\x12\x38\n\x07\x45xecute\x12\x14.garf.ExecuteRequest\x1a\x15.garf.ExecuteResponse\"\x00\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'garf_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_EXECUTEREQUEST']._serialized_start=50 + _globals['_EXECUTEREQUEST']._serialized_end=153 + _globals['_EXECUTIONCONTEXT']._serialized_start=156 + _globals['_EXECUTIONCONTEXT']._serialized_end=344 + _globals['_QUERYPARAMETERS']._serialized_start=346 + _globals['_QUERYPARAMETERS']._serialized_end=446 + _globals['_EXECUTERESPONSE']._serialized_start=448 + _globals['_EXECUTERESPONSE']._serialized_end=482 + _globals['_GARFSERVICE']._serialized_start=484 + _globals['_GARFSERVICE']._serialized_end=555 +# @@protoc_insertion_point(module_scope) diff --git a/libs/executors/garf_executors/garf_pb2_grpc.py b/libs/executors/garf_executors/garf_pb2_grpc.py new file mode 100644 index 0000000..bead4d6 --- /dev/null +++ b/libs/executors/garf_executors/garf_pb2_grpc.py @@ -0,0 +1,97 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from . import garf_pb2 as garf__pb2 + +GRPC_GENERATED_VERSION = '1.75.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in garf_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class GarfServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Execute = channel.unary_unary( + '/garf.GarfService/Execute', + request_serializer=garf__pb2.ExecuteRequest.SerializeToString, + response_deserializer=garf__pb2.ExecuteResponse.FromString, + _registered_method=True) + + +class GarfServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Execute(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GarfServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Execute': grpc.unary_unary_rpc_method_handler( + servicer.Execute, + request_deserializer=garf__pb2.ExecuteRequest.FromString, + response_serializer=garf__pb2.ExecuteResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'garf.GarfService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('garf.GarfService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class GarfService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Execute(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/garf.GarfService/Execute', + garf__pb2.ExecuteRequest.SerializeToString, + garf__pb2.ExecuteResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/libs/executors/generate_protos.sh b/libs/executors/generate_protos.sh new file mode 100644 index 0000000..bff497c --- /dev/null +++ b/libs/executors/generate_protos.sh @@ -0,0 +1,3 @@ +python -m grpc_tools.protoc -I=../../protos/ \ + --python_out=./garf_executors --grpc_python_out=./garf_executors \ + ../../protos/garf.proto diff --git a/libs/executors/tests/end-to-end/test_grpc_server.py b/libs/executors/tests/end-to-end/test_grpc_server.py new file mode 100644 index 0000000..28b18d8 --- /dev/null +++ b/libs/executors/tests/end-to-end/test_grpc_server.py @@ -0,0 +1,34 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import grpc +import pytest +from garf_executors import garf_pb2 as pb +from garf_executors import garf_pb2_grpc + + +@pytest.mark.skip +def test_grpc_call(): + with grpc.insecure_channel('localhost:50051') as channel: + stub = garf_pb2_grpc.GarfServiceStub(channel) + request = pb.ExecuteRequest( + source='rest', + title='example', + query='SELECT id, name AS model, data.color AS color FROM objects', + context=pb.ExecutionContext( + fetcher_parameters={'endpoint': 'https://api.restful-api.dev'}, + writer='csv', + ), + ) + result = stub.Execute(request) + assert 'CSV' in result.results[0] diff --git a/libs/ruff.toml b/libs/ruff.toml index 2261166..4b5c14e 100644 --- a/libs/ruff.toml +++ b/libs/ruff.toml @@ -26,6 +26,8 @@ exclude = [ "node_modules", "site-packages", "venv", + "*pb2.py", + "*pb2_grpc.py", ] line-length = 80 diff --git a/protos/garf.proto b/protos/garf.proto new file mode 100644 index 0000000..c6cf60c --- /dev/null +++ b/protos/garf.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; +package garf; +import "google/protobuf/struct.proto"; + + +service GarfService { + rpc Execute(ExecuteRequest) returns (ExecuteResponse) {} +} + +message ExecuteRequest { + string source = 1; + string title = 2; + string query = 3; + ExecutionContext context = 4; +} + + +message ExecutionContext { + QueryParameters query_parameters = 1; + google.protobuf.Struct fetcher_parameters = 2; + string writer = 3; + google.protobuf.Struct writer_parameters = 4; +} + + +message QueryParameters { + google.protobuf.Struct macro = 1; + google.protobuf.Struct template = 2; +} + +message ExecuteResponse { + repeated string results = 1; +}