diff --git a/README.md b/README.md
index 882e23f39..af30e1673 100644
--- a/README.md
+++ b/README.md
@@ -594,9 +594,8 @@ The `--config-dir` flag will specify the directory with the YAML files to be exe
 
 ### Validation Reports
 
-The result handlers tell DVT where to store the results of each validation. The
-tool can write the results of a validation run to Google BigQuery, PostgreSQL
-or print to stdout (default). View the schema of the results table [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/terraform/results_schema.json).
+The result handlers tell DVT where to store the results of each validation. The tool can write the results of a validation run to Google BigQuery or PostgreSQL, or print them to stdout (default). See [result handler setup](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/installation.md#result-handler-setup) for prerequisites. View the schema of the BigQuery results table [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/terraform/results_schema.json).
+
 
 To output to BigQuery or PostgreSQL, simply include the `-rh` flag during a validation run including the schema and table name for the results.
 
diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py
index 9c2779f99..b4f887417 100644
--- a/data_validation/cli_tools.py
+++ b/data_validation/cli_tools.py
@@ -1308,21 +1308,16 @@ def _get_result_handler(rc_value: str, sa_file=None) -> dict:
     if config[0] in connections:
         # We received connection_name.results_table.
         conn_from_file = get_connection(config[0])
-        if conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_BIGQUERY:
+        if conn_from_file[consts.SOURCE_TYPE] in [
+            consts.SOURCE_TYPE_BIGQUERY,
+            consts.SOURCE_TYPE_POSTGRES,
+        ]:
             result_handler = {
                 consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE],
-                consts.PROJECT_ID: conn_from_file["project_id"],
                 consts.TABLE_ID: config[1],
-                consts.API_ENDPOINT: conn_from_file.get("api_endpoint", None),
-                consts.STORAGE_API_ENDPOINT: conn_from_file.get(
-                    "storage_api_endpoint", None
-                ),
-            }
-        elif conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_POSTGRES:
-            result_handler = {
-                consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE],
-                consts.TABLE_ID: config[1],
-                consts.RH_CONN: conn_from_file,
+                # Only store the connection name in the result handler to avoid accidentally
+                # storing credentials in config files.
+                consts.RH_CONN: config[0],
             }
         # TODO Add filesytem handler too.
     else:
diff --git a/data_validation/result_handlers/base_backend.py b/data_validation/result_handlers/base_backend.py
index f2ae40e50..ae16b81cf 100644
--- a/data_validation/result_handlers/base_backend.py
+++ b/data_validation/result_handlers/base_backend.py
@@ -13,7 +13,7 @@
 # limitations under the License.
import logging -from typing import TYPE_CHECKING +from typing import Optional, TYPE_CHECKING import ibis @@ -58,7 +58,7 @@ class BaseBackendResultHandler: """Write results of data validation to a backend.""" _table_id: str = None - _status_list: list = None + _status_list: Optional[list] = None _text_format: str = None def _filter_by_status_list(self, result_df: "DataFrame") -> "DataFrame": diff --git a/data_validation/result_handlers/bigquery.py b/data_validation/result_handlers/bigquery.py index b00a6310c..1f43fc08e 100644 --- a/data_validation/result_handlers/bigquery.py +++ b/data_validation/result_handlers/bigquery.py @@ -15,6 +15,10 @@ """Output validation report to BigQuery tables""" import logging +from typing import Optional + +import google.oauth2.service_account + from data_validation import clients, consts, exceptions, util from data_validation.result_handlers.base_backend import BaseBackendResultHandler @@ -24,6 +28,14 @@ BQRH_NO_WRITE_MESSAGE = "No results to write to BigQuery" +def credentials_from_key_path(sa_key_path): + if not sa_key_path: + return None + return google.oauth2.service_account.Credentials.from_service_account_file( + sa_key_path + ) + + class BigQueryResultHandler(BaseBackendResultHandler): """Write results of data validation to BigQuery. @@ -38,7 +50,7 @@ class BigQueryResultHandler(BaseBackendResultHandler): def __init__( self, bigquery_client, - status_list: list = None, + status_list: Optional[list] = None, table_id: str = "pso_data_validator.results", text_format: str = consts.FORMAT_TYPE_TABLE, ): @@ -50,10 +62,10 @@ def __init__( @staticmethod def get_handler_for_project( project_id, - status_list=None, + status_list: Optional[list] = None, table_id: str = "pso_data_validator.results", - credentials=None, - api_endpoint: str = None, + sa_key_path: Optional[str] = None, + api_endpoint: Optional[str] = None, text_format: str = consts.FORMAT_TYPE_TABLE, ): """Return BigQueryResultHandler instance for given project. @@ -70,6 +82,39 @@ def get_handler_for_project( This allows the user to influence the text results written via logger.debug. See: https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/871 """ + + credentials = credentials_from_key_path(sa_key_path) + client = clients.get_google_bigquery_client( + project_id, credentials=credentials, api_endpoint=api_endpoint + ) + return BigQueryResultHandler( + client, + status_list=status_list, + table_id=table_id, + text_format=text_format, + ) + + @staticmethod + def get_handler_for_connection( + connection_config: dict, + status_list: Optional[list] = None, + table_id: str = "pso_data_validator.results", + text_format: str = consts.FORMAT_TYPE_TABLE, + ): + """Return BigQueryResultHandler instance for given connection config. + + Args: + table_id (str): Table ID used for validation results. + status_list (list): provided status to filter the results with + text_format (str, optional): + This allows the user to influence the text results written via logger.debug. 
+ See: https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/871 + """ + project_id = connection_config[consts.PROJECT_ID] + credentials = credentials_from_key_path( + connection_config.get(consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH) + ) + api_endpoint = connection_config.get(consts.API_ENDPOINT) client = clients.get_google_bigquery_client( project_id, credentials=credentials, api_endpoint=api_endpoint ) diff --git a/data_validation/result_handlers/factory.py b/data_validation/result_handlers/factory.py index 5ad79950f..7c4b6981b 100644 --- a/data_validation/result_handlers/factory.py +++ b/data_validation/result_handlers/factory.py @@ -14,11 +14,9 @@ """Build a result handler object.""" -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional -import google.oauth2.service_account - -from data_validation import consts, exceptions +from data_validation import consts, exceptions, state_manager from data_validation.result_handlers.bigquery import BigQueryResultHandler from data_validation.result_handlers.postgres import PostgresResultHandler from data_validation.result_handlers.text import TextResultHandler @@ -31,7 +29,7 @@ def build_result_handler( result_handler_config: dict, validation_type: str, filter_status: list, - text_format: str = None, + text_format: Optional[str] = None, ) -> "BaseBackendResultHandler": """Return a result handler object based on supplied args.""" text_format = text_format or consts.FORMAT_TYPE_TABLE @@ -48,33 +46,40 @@ def build_result_handler( ) result_type = result_handler_config[consts.RH_TYPE] - if result_type == consts.SOURCE_TYPE_BIGQUERY: - project_id = result_handler_config[consts.PROJECT_ID] - table_id = result_handler_config[consts.TABLE_ID] - key_path = result_handler_config.get(consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH) - if key_path: - credentials = ( - google.oauth2.service_account.Credentials.from_service_account_file( - key_path - ) + table_id = result_handler_config[consts.TABLE_ID] + + if consts.RH_CONN in result_handler_config and result_type in [ + consts.SOURCE_TYPE_BIGQUERY, + consts.SOURCE_TYPE_POSTGRES, + ]: + mgr = state_manager.StateManager() + conn_from_file = mgr.get_connection_config( + result_handler_config[consts.RH_CONN] + ) + if result_type == consts.SOURCE_TYPE_BIGQUERY: + return BigQueryResultHandler.get_handler_for_connection( + conn_from_file, + status_list=filter_status, + table_id=table_id, + text_format=text_format, ) - else: - credentials = None - api_endpoint = result_handler_config.get(consts.API_ENDPOINT) + elif result_type == consts.SOURCE_TYPE_POSTGRES: + return PostgresResultHandler.get_handler_for_connection( + conn_from_file, + status_list=filter_status, + table_id=table_id, + text_format=text_format, + ) + elif result_type == consts.SOURCE_TYPE_BIGQUERY: + # Legacy BigQuery format. 
         return BigQueryResultHandler.get_handler_for_project(
-            project_id,
-            filter_status,
-            table_id=table_id,
-            credentials=credentials,
-            api_endpoint=api_endpoint,
-            text_format=text_format,
-        )
-    elif result_type == consts.SOURCE_TYPE_POSTGRES:
-        table_id = result_handler_config[consts.TABLE_ID]
-        return PostgresResultHandler.get_handler_for_connection(
-            result_handler_config[consts.RH_CONN],
-            filter_status,
+            result_handler_config[consts.PROJECT_ID],
+            status_list=filter_status,
             table_id=table_id,
+            sa_key_path=result_handler_config.get(
+                consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH
+            ),
+            api_endpoint=result_handler_config.get(consts.API_ENDPOINT),
             text_format=text_format,
         )
     else:
diff --git a/data_validation/result_handlers/postgres.py b/data_validation/result_handlers/postgres.py
index 22b6b910e..39af18b97 100644
--- a/data_validation/result_handlers/postgres.py
+++ b/data_validation/result_handlers/postgres.py
@@ -15,7 +15,7 @@
 """Output validation report to PostgreSQL table"""
 
 import logging
-from typing import Iterable, TYPE_CHECKING
+from typing import Iterable, Optional, TYPE_CHECKING
 
 import numpy
 import sqlalchemy
@@ -75,7 +75,7 @@ class PostgresResultHandler(BaseBackendResultHandler):
     def __init__(
         self,
         client: "BaseBackend",
-        status_list: list = None,
+        status_list: Optional[list] = None,
         table_id: str = "pso_data_validator.results",
         text_format: str = consts.FORMAT_TYPE_TABLE,
     ):
@@ -87,7 +87,7 @@ def get_handler_for_connection(
     @staticmethod
     def get_handler_for_connection(
         connection_config: dict,
-        status_list=None,
+        status_list: Optional[list] = None,
         table_id: str = "pso_data_validator.results",
         text_format: str = consts.FORMAT_TYPE_TABLE,
     ):
diff --git a/docs/installation.md b/docs/installation.md
index b927915c7..96fdd4573 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -68,6 +68,8 @@ In order to allow the data validation tool to write to a BigQuery table, users n
 - A Google Cloud Platform project with the BigQuery API enabled.
 - A Google user account with appropriate permissions. If you plan to run this tool in production, it's recommended that you create a service account specifically for running the tool. See our [guide](https://cloud.google.com/docs/authentication/production) on how to authenticate with your service account. If you are using a service account, you need to grant your service account appropriate roles on your project so that it has permissions to create and read resources.
 
+Note that DVT uses the BigQuery streaming API to write results to BigQuery; this is because of limits on the number of mutations per table per day.
+
 Clone the repository onto your machine and navigate inside the directory:
 
 ```
diff --git a/tests/unit/result_handlers/test_factory.py b/tests/unit/result_handlers/test_factory.py
index fd6b38f1f..42c7665f6 100644
--- a/tests/unit/result_handlers/test_factory.py
+++ b/tests/unit/result_handlers/test_factory.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os from unittest import mock import pytest @@ -32,14 +33,7 @@ PG_CONFIG = { consts.RH_TYPE: consts.SOURCE_TYPE_POSTGRES, consts.TABLE_ID: "schema.table", - consts.RH_CONN: { - "host": "localhost", - "port": "5432", - "user": "dvt_u", - "password": "dvt_p", - "database": "postgres", - consts.SOURCE_TYPE: consts.SOURCE_TYPE_POSTGRES, - }, + consts.RH_CONN: "some_test_pg_rh", } IMPALA_CONFIG = { @@ -74,7 +68,7 @@ def test_build_result_handler_default(module_under_test): assert handler.status_list == filter_status -def test_build_result_handler_bigquery(module_under_test): +def test_build_result_handler_bigquery_legacy(module_under_test): config = BQ_CONFIG filter_status = ["fail"] handler = module_under_test.build_result_handler( @@ -90,7 +84,14 @@ def test_build_result_handler_bigquery(module_under_test): "data_validation.clients.CLIENT_LOOKUP", {consts.SOURCE_TYPE_POSTGRES: mock.Mock()}, ) -def test_build_result_handler_postgres(module_under_test): +def test_build_result_handler_postgres(fs, module_under_test): + pg_conn = '{"host": "localhost","port": "5432","user": "dvt_u","password": "dvt_p","database": "postgres", "source_type": "Postgres"}' + fs.create_file( + f"{consts.DEFAULT_ENV_DIRECTORY}/some_test_pg_rh.connection.json".replace( + "~", os.environ["HOME"] + ), + contents=pg_conn, + ) config = PG_CONFIG filter_status = ["fail"] handler = module_under_test.build_result_handler( diff --git a/tests/unit/test_cli_tools.py b/tests/unit/test_cli_tools.py index 80145b581..2c0f65d09 100644 --- a/tests/unit/test_cli_tools.py +++ b/tests/unit/test_cli_tools.py @@ -627,18 +627,19 @@ def test_get_result_handler_by_conn_file(fs): res = cli_tools._get_result_handler(f"{args.connection_name}.dataset.table") assert res == { consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY, - consts.PROJECT_ID: args.project_id, consts.TABLE_ID: "dataset.table", - consts.API_ENDPOINT: args.api_endpoint, - consts.STORAGE_API_ENDPOINT: args.storage_api_endpoint, + consts.RH_CONN: args.connection_name, } # Plus check standard format still works. - res = cli_tools._get_result_handler("project.dataset.table") + res = cli_tools._get_result_handler( + "project.dataset.table", sa_file="/tmp/some-key.json" + ) assert res == { consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY, consts.PROJECT_ID: "project", consts.TABLE_ID: "dataset.table", + consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH: "/tmp/some-key.json", }
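Reviewer note: a minimal sketch of the two result handler config shapes that `build_result_handler` accepts after this change, using the `consts` keys shown in the tests above; the connection name is illustrative.

```python
from data_validation import consts

# Legacy format (still produced for `-rh project.dataset.table`): the project,
# table and optional service account key path are stored inline in the config.
legacy_rh = {
    consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY,
    consts.PROJECT_ID: "project",
    consts.TABLE_ID: "dataset.table",
    consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH: "/tmp/some-key.json",
}

# Connection-based format (produced for `-rh connection_name.dataset.table`):
# only the connection name is stored, so credentials never land in saved
# validation configs; the factory resolves the name via StateManager at run time.
connection_rh = {
    consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY,  # or consts.SOURCE_TYPE_POSTGRES
    consts.TABLE_ID: "dataset.table",
    consts.RH_CONN: "my_bq_conn",  # illustrative connection name
}
```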