diff --git a/libs/executors/garf_executors/api_executor.py b/libs/executors/garf_executors/api_executor.py index c5f4f87..3297aa6 100644 --- a/libs/executors/garf_executors/api_executor.py +++ b/libs/executors/garf_executors/api_executor.py @@ -35,7 +35,7 @@ class ApiExecutionContext(execution_context.ExecutionContext): """Common context for executing one or more queries.""" - writer: str = 'console' + writer: str | list[str] = 'console' class ApiQueryExecutor(executor.Executor): @@ -94,20 +94,27 @@ def execute( args=context.query_parameters, **context.fetcher_parameters, ) - writer_client = context.writer_client - logger.debug( - 'Start writing data for query %s via %s writer', - title, - type(writer_client), - ) - result = writer_client.write(results, title) - logger.debug( - 'Finish writing data for query %s via %s writer', - title, - type(writer_client), - ) + writer_clients = context.writer_clients + if not writer_clients: + logger.warning('No writers configured, skipping write operation') + return None + writing_results = [] + for writer_client in writer_clients: + logger.debug( + 'Start writing data for query %s via %s writer', + title, + type(writer_client), + ) + result = writer_client.write(results, title) + logger.debug( + 'Finish writing data for query %s via %s writer', + title, + type(writer_client), + ) + writing_results.append(result) logger.info('%s executed successfully', title) - return result + # Return the last writer's result for backward compatibility + return writing_results[-1] if writing_results else None except Exception as e: logger.error('%s generated an exception: %s', title, str(e)) raise exceptions.GarfExecutorError( diff --git a/libs/executors/garf_executors/bq_executor.py b/libs/executors/garf_executors/bq_executor.py index 110e08c..f696b1d 100644 --- a/libs/executors/garf_executors/bq_executor.py +++ b/libs/executors/garf_executors/bq_executor.py @@ -103,20 +103,27 @@ def execute( else: results = report.GarfReport() if context.writer and results: - writer_client = context.writer_client - logger.debug( - 'Start writing data for query %s via %s writer', - title, - type(writer_client), - ) - writing_result = writer_client.write(results, title) - logger.debug( - 'Finish writing data for query %s via %s writer', - title, - type(writer_client), - ) - logger.info('%s executed successfully', title) - return writing_result + writer_clients = context.writer_clients + if not writer_clients: + logger.warning('No writers configured, skipping write operation') + else: + writing_results = [] + for writer_client in writer_clients: + logger.debug( + 'Start writing data for query %s via %s writer', + title, + type(writer_client), + ) + writing_result = writer_client.write(results, title) + logger.debug( + 'Finish writing data for query %s via %s writer', + title, + type(writer_client), + ) + writing_results.append(writing_result) + logger.info('%s executed successfully', title) + # Return the last writer's result for backward compatibility + return writing_results[-1] if writing_results else None return results except google_cloud_exceptions.GoogleCloudError as e: raise BigQueryExecutorError(e) from e diff --git a/libs/executors/garf_executors/execution_context.py b/libs/executors/garf_executors/execution_context.py index 2f2598a..12adbf4 100644 --- a/libs/executors/garf_executors/execution_context.py +++ b/libs/executors/garf_executors/execution_context.py @@ -35,7 +35,7 @@ class ExecutionContext(pydantic.BaseModel): Attributes: query_parameters: Parameters to dynamically change query text. fetcher_parameters: Parameters to specify fetching setup. - writer: Type of writer to use. + writer: Type of writer to use. Can be a single writer string or list of writers. writer_parameters: Optional parameters to setup writer. """ @@ -45,7 +45,7 @@ class ExecutionContext(pydantic.BaseModel): fetcher_parameters: dict[str, str | bool | int | list[str | int]] | None = ( pydantic.Field(default_factory=dict) ) - writer: str | None = None + writer: str | list[str] | None = None writer_parameters: dict[str, str] | None = pydantic.Field( default_factory=dict ) @@ -75,9 +75,40 @@ def save(self, path: str | pathlib.Path | os.PathLike[str]) -> str: @property def writer_client(self) -> abs_writer.AbsWriter: - writer_client = writer.create_writer(self.writer, **self.writer_parameters) - if self.writer == 'bq': + """Returns single writer client.""" + if isinstance(self.writer, list) and len(self.writer) > 0: + writer_type = self.writer[0] + else: + writer_type = self.writer + + writer_params = self.writer_parameters or {} + + if not writer_type: + raise ValueError('No writer specified') + + writer_client = writer.create_writer(writer_type, **writer_params) + if writer_type == 'bq': _ = writer_client.create_or_get_dataset() - if self.writer == 'sheet': + if writer_type == 'sheet': writer_client.init_client() return writer_client + + @property + def writer_clients(self) -> list[abs_writer.AbsWriter]: + """Returns list of writer clients.""" + if not self.writer: + return [] + + # Convert single writer to list for uniform processing + writers_to_use = self.writer if isinstance(self.writer, list) else [self.writer] + writer_params = self.writer_parameters or {} + + clients = [] + for writer_type in writers_to_use: + writer_client = writer.create_writer(writer_type, **writer_params) + if writer_type == 'bq': + _ = writer_client.create_or_get_dataset() + if writer_type == 'sheet': + writer_client.init_client() + clients.append(writer_client) + return clients diff --git a/libs/executors/garf_executors/sql_executor.py b/libs/executors/garf_executors/sql_executor.py index aa7afd9..853dbf2 100644 --- a/libs/executors/garf_executors/sql_executor.py +++ b/libs/executors/garf_executors/sql_executor.py @@ -106,19 +106,26 @@ def execute( finally: conn.connection.execute(f'DROP TABLE {temp_table_name}') if context.writer and results: - writer_client = context.writer_client - logger.debug( - 'Start writing data for query %s via %s writer', - title, - type(writer_client), - ) - writing_result = writer_client.write(results, title) - logger.debug( - 'Finish writing data for query %s via %s writer', - title, - type(writer_client), - ) - logger.info('%s executed successfully', title) - return writing_result + writer_clients = context.writer_clients + if not writer_clients: + logger.warning('No writers configured, skipping write operation') + else: + writing_results = [] + for writer_client in writer_clients: + logger.debug( + 'Start writing data for query %s via %s writer', + title, + type(writer_client), + ) + writing_result = writer_client.write(results, title) + logger.debug( + 'Finish writing data for query %s via %s writer', + title, + type(writer_client), + ) + writing_results.append(writing_result) + logger.info('%s executed successfully', title) + # Return the last writer's result for backward compatibility + return writing_results[-1] if writing_results else None span.set_attribute('execute.num_results', len(results)) return results diff --git a/libs/executors/tests/unit/test_api_executor.py b/libs/executors/tests/unit/test_api_executor.py index 7f27334..0df11cc 100644 --- a/libs/executors/tests/unit/test_api_executor.py +++ b/libs/executors/tests/unit/test_api_executor.py @@ -85,3 +85,25 @@ def test_from_fetcher_alias_returns_initialized_executor(self, tmp_path): }, ) assert isinstance(executor.fetcher, fake_fetcher.FakeApiReportFetcher) + + def test_execute_with_multiple_writers_saves_to_both(self, executor, tmp_path, capsys): + """Test that multiple writers (console and json) both execute.""" + context = api_executor.ApiExecutionContext( + writer=['console', 'json'], + writer_parameters={'destination_folder': str(tmp_path)}, + ) + executor.execute( + query=_TEST_QUERY, + title='test', + context=context, + ) + # Verify JSON file was created + json_file = pathlib.Path(context.writer_clients[1].destination_folder) / 'test.json' + assert json_file.exists() + with pathlib.Path.open(json_file, 'r', encoding='utf-8') as f: + result = json.load(f) + assert result == _TEST_DATA + + # Verify console output was generated + output = capsys.readouterr().out + assert 'showing results' in output and 'test' in output \ No newline at end of file diff --git a/libs/executors/tests/unit/test_execution_context.py b/libs/executors/tests/unit/test_execution_context.py index d56cae9..6f4a51d 100644 --- a/libs/executors/tests/unit/test_execution_context.py +++ b/libs/executors/tests/unit/test_execution_context.py @@ -13,6 +13,7 @@ # limitations under the License. import yaml +import pytest from garf_core import query_editor from garf_executors.execution_context import ExecutionContext @@ -80,4 +81,51 @@ def test_save_returns_correct_data(self, tmp_path): context.save(tmp_config) with open(tmp_config, 'r', encoding='utf-8') as f: config_data = yaml.safe_load(f) - assert config_data == data + # Check that the data is saved correctly without extra fields + assert config_data['writer'] == data['writer'] + assert config_data['writer_parameters'] == data['writer_parameters'] + + def test_multiple_writers_creates_multiple_clients(self, tmp_path): + context = ExecutionContext( + writer=['console', 'json'], + writer_parameters={'destination_folder': str(tmp_path)}, + ) + writer_clients = context.writer_clients + assert len(writer_clients) == 2 + assert writer_clients[0].__class__.__name__ == 'ConsoleWriter' + assert writer_clients[1].__class__.__name__ == 'JsonWriter' + + def test_multiple_writers_without_parameters_creates_empty_dicts(self): + context = ExecutionContext( + writer=['console', 'json'], + ) + writer_clients = context.writer_clients + assert len(writer_clients) == 2 + + def test_backward_compatibility_single_writer_still_works(self, tmp_path): + context = ExecutionContext( + writer='json', + writer_parameters={'destination_folder': str(tmp_path)}, + ) + # Should work with writer_client property + writer_client = context.writer_client + assert writer_client.__class__.__name__ == 'JsonWriter' + # Should also work with writer_clients property + writer_clients = context.writer_clients + assert len(writer_clients) == 1 + assert writer_clients[0].__class__.__name__ == 'JsonWriter' + + + def test_from_file_with_multiple_writers(self, tmp_path): + tmp_config = tmp_path / 'config.yaml' + data = { + 'writer': ['console', 'json'], + 'writer_parameters': { + 'destination_folder': '/tmp', + }, + } + with open(tmp_config, 'w', encoding='utf-8') as f: + yaml.dump(data, f, encoding='utf-8') + context = ExecutionContext.from_file(tmp_config) + assert context.writer == ['console', 'json'] + assert len(context.writer_clients) == 2 \ No newline at end of file