Merged
35 changes: 21 additions & 14 deletions libs/executors/garf_executors/api_executor.py
@@ -35,7 +35,7 @@
 class ApiExecutionContext(execution_context.ExecutionContext):
   """Common context for executing one or more queries."""
 
-  writer: str = 'console'
+  writer: str | list[str] = 'console'
 
 
 class ApiQueryExecutor(executor.Executor):
@@ -94,20 +94,27 @@ def execute(
         args=context.query_parameters,
         **context.fetcher_parameters,
       )
-      writer_client = context.writer_client
-      logger.debug(
-        'Start writing data for query %s via %s writer',
-        title,
-        type(writer_client),
-      )
-      result = writer_client.write(results, title)
-      logger.debug(
-        'Finish writing data for query %s via %s writer',
-        title,
-        type(writer_client),
-      )
+      writer_clients = context.writer_clients
+      if not writer_clients:
+        logger.warning('No writers configured, skipping write operation')
+        return None
+      writing_results = []
+      for writer_client in writer_clients:
+        logger.debug(
+          'Start writing data for query %s via %s writer',
+          title,
+          type(writer_client),
+        )
+        result = writer_client.write(results, title)
+        logger.debug(
+          'Finish writing data for query %s via %s writer',
+          title,
+          type(writer_client),
+        )
+        writing_results.append(result)
       logger.info('%s executed successfully', title)
-      return result
+      # Return the last writer's result for backward compatibility
+      return writing_results[-1] if writing_results else None
     except Exception as e:
       logger.error('%s generated an exception: %s', title, str(e))
       raise exceptions.GarfExecutorError(
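
Taken together, the api_executor hunks swap the single writer_client call for a fan-out over context.writer_clients while keeping the old return contract. A minimal sketch of the intended call pattern (the fetcher wiring is assumed here; only the context shape comes from this change):

  from garf_executors import api_executor

  # Hypothetical multi-writer context: the report is printed to the
  # console and also dumped as JSON into /tmp/reports.
  context = api_executor.ApiExecutionContext(
    writer=['console', 'json'],
    writer_parameters={'destination_folder': '/tmp/reports'},
  )
  # executor.execute(query, title='my_query', context=context) now writes
  # once per configured writer and returns the last writer's result, so
  # single-writer callers observe the same value as before.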
35 changes: 21 additions & 14 deletions libs/executors/garf_executors/bq_executor.py
@@ -103,20 +103,27 @@ def execute(
       else:
         results = report.GarfReport()
       if context.writer and results:
-        writer_client = context.writer_client
-        logger.debug(
-          'Start writing data for query %s via %s writer',
-          title,
-          type(writer_client),
-        )
-        writing_result = writer_client.write(results, title)
-        logger.debug(
-          'Finish writing data for query %s via %s writer',
-          title,
-          type(writer_client),
-        )
-        logger.info('%s executed successfully', title)
-        return writing_result
+        writer_clients = context.writer_clients
+        if not writer_clients:
+          logger.warning('No writers configured, skipping write operation')
+        else:
+          writing_results = []
+          for writer_client in writer_clients:
+            logger.debug(
+              'Start writing data for query %s via %s writer',
+              title,
+              type(writer_client),
+            )
+            writing_result = writer_client.write(results, title)
+            logger.debug(
+              'Finish writing data for query %s via %s writer',
+              title,
+              type(writer_client),
+            )
+            writing_results.append(writing_result)
+          logger.info('%s executed successfully', title)
+          # Return the last writer's result for backward compatibility
+          return writing_results[-1] if writing_results else None
       return results
     except google_cloud_exceptions.GoogleCloudError as e:
       raise BigQueryExecutorError(e) from e
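
Note the behavioral split: the API executor above returns None outright when no writers are configured, whereas the BigQuery executor (and the SQL executor below) only logs the warning and falls through to return the raw report, so fetch-only usage keeps working.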
41 changes: 36 additions & 5 deletions libs/executors/garf_executors/execution_context.py
@@ -35,7 +35,7 @@ class ExecutionContext(pydantic.BaseModel):
   Attributes:
     query_parameters: Parameters to dynamically change query text.
     fetcher_parameters: Parameters to specify fetching setup.
-    writer: Type of writer to use.
+    writer: Type of writer to use. Can be a single writer string or list of writers.
     writer_parameters: Optional parameters to setup writer.
   """
 
@@ -45,7 +45,7 @@
   fetcher_parameters: dict[str, str | bool | int | list[str | int]] | None = (
     pydantic.Field(default_factory=dict)
   )
-  writer: str | None = None
+  writer: str | list[str] | None = None
   writer_parameters: dict[str, str] | None = pydantic.Field(
     default_factory=dict
   )
@@ -75,9 +75,40 @@ def save(self, path: str | pathlib.Path | os.PathLike[str]) -> str:
 
   @property
   def writer_client(self) -> abs_writer.AbsWriter:
-    writer_client = writer.create_writer(self.writer, **self.writer_parameters)
-    if self.writer == 'bq':
+    """Returns single writer client."""
+    if isinstance(self.writer, list) and len(self.writer) > 0:
+      writer_type = self.writer[0]
+    else:
+      writer_type = self.writer
+
+    writer_params = self.writer_parameters or {}
+
+    if not writer_type:
+      raise ValueError('No writer specified')
+
+    writer_client = writer.create_writer(writer_type, **writer_params)
+    if writer_type == 'bq':
       _ = writer_client.create_or_get_dataset()
-    if self.writer == 'sheet':
+    if writer_type == 'sheet':
       writer_client.init_client()
     return writer_client
+
+  @property
+  def writer_clients(self) -> list[abs_writer.AbsWriter]:
+    """Returns list of writer clients."""
+    if not self.writer:
+      return []
+
+    # Convert single writer to list for uniform processing
+    writers_to_use = self.writer if isinstance(self.writer, list) else [self.writer]
+    writer_params = self.writer_parameters or {}
+
+    clients = []
+    for writer_type in writers_to_use:
+      writer_client = writer.create_writer(writer_type, **writer_params)
+      if writer_type == 'bq':
+        _ = writer_client.create_or_get_dataset()
+      if writer_type == 'sheet':
+        writer_client.init_client()
+      clients.append(writer_client)
+    return clients
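
The two properties are intentionally redundant: writer_clients is the new plural API, while writer_client survives for callers that predate writer lists and now resolves to the first configured writer. A short sketch of the resulting behavior, assuming the 'console' and 'json' writer types are registered with writer.create_writer:

  from garf_executors.execution_context import ExecutionContext

  single = ExecutionContext(writer='console')
  multi = ExecutionContext(
    writer=['console', 'json'],
    writer_parameters={'destination_folder': '/tmp'},
  )

  assert len(single.writer_clients) == 1  # a str is wrapped into a one-item list
  assert len(multi.writer_clients) == 2
  # The legacy property picks the first writer from a list.
  assert type(multi.writer_client).__name__ == 'ConsoleWriter'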
35 changes: 21 additions & 14 deletions libs/executors/garf_executors/sql_executor.py
@@ -106,19 +106,26 @@ def execute(
       finally:
         conn.connection.execute(f'DROP TABLE {temp_table_name}')
       if context.writer and results:
-        writer_client = context.writer_client
-        logger.debug(
-          'Start writing data for query %s via %s writer',
-          title,
-          type(writer_client),
-        )
-        writing_result = writer_client.write(results, title)
-        logger.debug(
-          'Finish writing data for query %s via %s writer',
-          title,
-          type(writer_client),
-        )
-        logger.info('%s executed successfully', title)
-        return writing_result
+        writer_clients = context.writer_clients
+        if not writer_clients:
+          logger.warning('No writers configured, skipping write operation')
+        else:
+          writing_results = []
+          for writer_client in writer_clients:
+            logger.debug(
+              'Start writing data for query %s via %s writer',
+              title,
+              type(writer_client),
+            )
+            writing_result = writer_client.write(results, title)
+            logger.debug(
+              'Finish writing data for query %s via %s writer',
+              title,
+              type(writer_client),
+            )
+            writing_results.append(writing_result)
+          logger.info('%s executed successfully', title)
+          # Return the last writer's result for backward compatibility
+          return writing_results[-1] if writing_results else None
       span.set_attribute('execute.num_results', len(results))
       return results
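
All three executors now share the same fan-out shape. Distilled to its core (simplified names, not the actual module code), the pattern is:

  def write_all(results, title, writer_clients):
    """Sketch of the shared multi-writer loop, not the real executor code."""
    if not writer_clients:
      return None  # each executor logs a warning here instead of failing
    writing_results = [client.write(results, title) for client in writer_clients]
    # Backward compatibility: a caller that configured exactly one writer
    # still receives that writer's result, as before this change.
    return writing_results[-1]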
22 changes: 22 additions & 0 deletions libs/executors/tests/unit/test_api_executor.py
@@ -85,3 +85,25 @@ def test_from_fetcher_alias_returns_initialized_executor(self, tmp_path):
       },
     )
     assert isinstance(executor.fetcher, fake_fetcher.FakeApiReportFetcher)
+
+  def test_execute_with_multiple_writers_saves_to_both(self, executor, tmp_path, capsys):
+    """Test that multiple writers (console and json) both execute."""
+    context = api_executor.ApiExecutionContext(
+      writer=['console', 'json'],
+      writer_parameters={'destination_folder': str(tmp_path)},
+    )
+    executor.execute(
+      query=_TEST_QUERY,
+      title='test',
+      context=context,
+    )
+    # Verify JSON file was created
+    json_file = pathlib.Path(context.writer_clients[1].destination_folder) / 'test.json'
+    assert json_file.exists()
+    with pathlib.Path.open(json_file, 'r', encoding='utf-8') as f:
+      result = json.load(f)
+    assert result == _TEST_DATA
+
+    # Verify console output was generated
+    output = capsys.readouterr().out
+    assert 'showing results' in output and 'test' in output
50 changes: 49 additions & 1 deletion libs/executors/tests/unit/test_execution_context.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import yaml
+import pytest
 from garf_core import query_editor
 from garf_executors.execution_context import ExecutionContext
 
@@ -80,4 +81,51 @@ def test_save_returns_correct_data(self, tmp_path):
     context.save(tmp_config)
     with open(tmp_config, 'r', encoding='utf-8') as f:
       config_data = yaml.safe_load(f)
-    assert config_data == data
+    # Check that the data is saved correctly without extra fields
+    assert config_data['writer'] == data['writer']
+    assert config_data['writer_parameters'] == data['writer_parameters']
+
+  def test_multiple_writers_creates_multiple_clients(self, tmp_path):
+    context = ExecutionContext(
+      writer=['console', 'json'],
+      writer_parameters={'destination_folder': str(tmp_path)},
+    )
+    writer_clients = context.writer_clients
+    assert len(writer_clients) == 2
+    assert writer_clients[0].__class__.__name__ == 'ConsoleWriter'
+    assert writer_clients[1].__class__.__name__ == 'JsonWriter'
+
+  def test_multiple_writers_without_parameters_creates_empty_dicts(self):
+    context = ExecutionContext(
+      writer=['console', 'json'],
+    )
+    writer_clients = context.writer_clients
+    assert len(writer_clients) == 2
+
+  def test_backward_compatibility_single_writer_still_works(self, tmp_path):
+    context = ExecutionContext(
+      writer='json',
+      writer_parameters={'destination_folder': str(tmp_path)},
+    )
+    # Should work with writer_client property
+    writer_client = context.writer_client
+    assert writer_client.__class__.__name__ == 'JsonWriter'
+    # Should also work with writer_clients property
+    writer_clients = context.writer_clients
+    assert len(writer_clients) == 1
+    assert writer_clients[0].__class__.__name__ == 'JsonWriter'
+
+
+  def test_from_file_with_multiple_writers(self, tmp_path):
+    tmp_config = tmp_path / 'config.yaml'
+    data = {
+      'writer': ['console', 'json'],
+      'writer_parameters': {
+        'destination_folder': '/tmp',
+      },
+    }
+    with open(tmp_config, 'w', encoding='utf-8') as f:
+      yaml.dump(data, f, encoding='utf-8')
+    context = ExecutionContext.from_file(tmp_config)
+    assert context.writer == ['console', 'json']
+    assert len(context.writer_clients) == 2