Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions libs/executors/garf_executors/entrypoints/grpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""gRPC endpoint for garf."""

import argparse
import logging
from concurrent import futures

import grpc
from google.protobuf.json_format import MessageToDict
from grpc_reflection.v1alpha import reflection

import garf_executors
from garf_executors import garf_pb2, garf_pb2_grpc
from garf_executors.entrypoints.tracer import initialize_tracer


class GarfService(garf_pb2_grpc.GarfService):
def Execute(self, request, context):
query_executor = garf_executors.setup_executor(
request.source, request.context.fetcher_parameters
)
execution_context = garf_executors.execution_context.ExecutionContext(
**MessageToDict(request.context, preserving_proto_field_name=True)
)
result = query_executor.execute(
query=request.query,
title=request.title,
context=execution_context,
)
return garf_pb2.ExecuteResponse(results=[result])


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--port', dest='port', default=50051, type=int)
parser.add_argument(
'--parallel-threshold', dest='parallel_threshold', default=10, type=int
)
args, _ = parser.parse_known_args()
initialize_tracer()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=args.parallel_threshold)
)

service = GarfService()
garf_pb2_grpc.add_GarfServiceServicer_to_server(service, server)
SERVICE_NAMES = (
garf_pb2.DESCRIPTOR.services_by_name['GarfService'].full_name,
reflection.SERVICE_NAME,
)
reflection.enable_server_reflection(SERVICE_NAMES, server)
server.add_insecure_port(f'[::]:{args.port}')
server.start()
logging.info('Garf service started, listening on port %d', 50051)
server.wait_for_termination()
45 changes: 45 additions & 0 deletions libs/executors/garf_executors/garf_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 97 additions & 0 deletions libs/executors/garf_executors/garf_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

from . import garf_pb2 as garf__pb2

GRPC_GENERATED_VERSION = '1.75.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True

if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in garf_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)


class GarfServiceStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.Execute = channel.unary_unary(
'/garf.GarfService/Execute',
request_serializer=garf__pb2.ExecuteRequest.SerializeToString,
response_deserializer=garf__pb2.ExecuteResponse.FromString,
_registered_method=True)


class GarfServiceServicer(object):
"""Missing associated documentation comment in .proto file."""

def Execute(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_GarfServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'Execute': grpc.unary_unary_rpc_method_handler(
servicer.Execute,
request_deserializer=garf__pb2.ExecuteRequest.FromString,
response_serializer=garf__pb2.ExecuteResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'garf.GarfService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('garf.GarfService', rpc_method_handlers)


# This class is part of an EXPERIMENTAL API.
class GarfService(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def Execute(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/garf.GarfService/Execute',
garf__pb2.ExecuteRequest.SerializeToString,
garf__pb2.ExecuteResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
3 changes: 3 additions & 0 deletions libs/executors/generate_protos.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
python -m grpc_tools.protoc -I=../../protos/ \
--python_out=./garf_executors --grpc_python_out=./garf_executors \
../../protos/garf.proto
34 changes: 34 additions & 0 deletions libs/executors/tests/end-to-end/test_grpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import pytest
from garf_executors import garf_pb2 as pb
from garf_executors import garf_pb2_grpc


@pytest.mark.skip
def test_grpc_call():
with grpc.insecure_channel('localhost:50051') as channel:
stub = garf_pb2_grpc.GarfServiceStub(channel)
request = pb.ExecuteRequest(
source='rest',
title='example',
query='SELECT id, name AS model, data.color AS color FROM objects',
context=pb.ExecutionContext(
fetcher_parameters={'endpoint': 'https://api.restful-api.dev'},
writer='csv',
),
)
result = stub.Execute(request)
assert 'CSV' in result.results[0]
2 changes: 2 additions & 0 deletions libs/ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ exclude = [
"node_modules",
"site-packages",
"venv",
"*pb2.py",
"*pb2_grpc.py",
]

line-length = 80
Expand Down
33 changes: 33 additions & 0 deletions protos/garf.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";
package garf;
import "google/protobuf/struct.proto";


service GarfService {
rpc Execute(ExecuteRequest) returns (ExecuteResponse) {}
}

message ExecuteRequest {
string source = 1;
string title = 2;
string query = 3;
ExecutionContext context = 4;
}


message ExecutionContext {
QueryParameters query_parameters = 1;
google.protobuf.Struct fetcher_parameters = 2;
string writer = 3;
google.protobuf.Struct writer_parameters = 4;
}


message QueryParameters {
google.protobuf.Struct macro = 1;
google.protobuf.Struct template = 2;
}

message ExecuteResponse {
repeated string results = 1;
}