Merged
README.md (5 changes: 2 additions & 3 deletions)
@@ -594,9 +594,8 @@ The `--config-dir` flag will specify the directory with the YAML files to be executed

### Validation Reports

The result handlers tell DVT where to store the results of each validation. The
tool can write the results of a validation run to Google BigQuery, PostgreSQL
or print to stdout (default). View the schema of the results table [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/terraform/results_schema.json).
The result handlers tell DVT where to store the results of each validation. The tool can write the results of a validation run to Google BigQuery, PostgreSQL, or print to stdout (default). See [result handler setup](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/installation.md#result-handler-setup) for prerequisites. View the schema of the BigQuery results table [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/terraform/results_schema.json).


To output to BigQuery or PostgreSQL, simply include the `-rh` flag during a validation run, specifying
the schema and table name for the results.
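
As an illustration, a run that writes results through a saved connection might look like the sketch below. This is a hypothetical invocation: the connection name `my_bq_conn` and the validated table are placeholders, and the `-rh` value follows the `connection_name.dataset.table` form this PR adds.

```
data-validation validate column \
  -sc my_bq_conn -tc my_bq_conn \
  -tbls bigquery-public-data.new_york_citibike.citibike_trips \
  -rh my_bq_conn.pso_data_validator.results
```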
data_validation/cli_tools.py (19 changes: 7 additions & 12 deletions)
@@ -1308,21 +1308,16 @@ def _get_result_handler(rc_value: str, sa_file=None) -> dict:
if config[0] in connections:
# We received connection_name.results_table.
conn_from_file = get_connection(config[0])
if conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_BIGQUERY:
if conn_from_file[consts.SOURCE_TYPE] in [
consts.SOURCE_TYPE_BIGQUERY,
consts.SOURCE_TYPE_POSTGRES,
]:
result_handler = {
consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE],
consts.PROJECT_ID: conn_from_file["project_id"],
consts.TABLE_ID: config[1],
consts.API_ENDPOINT: conn_from_file.get("api_endpoint", None),
consts.STORAGE_API_ENDPOINT: conn_from_file.get(
"storage_api_endpoint", None
),
}
elif conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_POSTGRES:
result_handler = {
consts.RH_TYPE: conn_from_file[consts.SOURCE_TYPE],
consts.TABLE_ID: config[1],
consts.RH_CONN: conn_from_file,
# Only store the connection name in the result handler to avoid accidentally
# storing credentials in config files.
consts.RH_CONN: config[0],
}
# TODO Add filesystem handler too.
else:
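To make the cli_tools.py change concrete, here is a hedged before/after sketch of the dict `_get_result_handler` now builds for an argument like `my_pg.schema.results`. The literal key strings are illustrative stand-ins for the `consts` values.

```python
# Illustrative only: real code uses consts.RH_TYPE, consts.TABLE_ID, consts.RH_CONN.

# Before: the full connection dict, credentials included, was embedded in the
# result handler config, so it could leak into saved validation YAML files.
result_handler_before = {
    "type": "Postgres",
    "table_id": "schema.results",
    "result_handler_connection": {
        "host": "localhost",
        "user": "dvt_u",
        "password": "dvt_p",  # the credential that could end up in a config file
    },
}

# After: only the connection *name* is stored; the factory resolves it from the
# saved connection file at run time.
result_handler_after = {
    "type": "Postgres",
    "table_id": "schema.results",
    "result_handler_connection": "my_pg",
}
```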
data_validation/result_handlers/base_backend.py (4 changes: 2 additions & 2 deletions)
@@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING
from typing import Optional, TYPE_CHECKING

import ibis

@@ -58,7 +58,7 @@ class BaseBackendResultHandler:
"""Write results of data validation to a backend."""

_table_id: str = None
_status_list: list = None
_status_list: Optional[list] = None
_text_format: str = None

def _filter_by_status_list(self, result_df: "DataFrame") -> "DataFrame":
data_validation/result_handlers/bigquery.py (53 changes: 49 additions & 4 deletions)
@@ -15,6 +15,10 @@
"""Output validation report to BigQuery tables"""

import logging
from typing import Optional

import google.oauth2.service_account


from data_validation import clients, consts, exceptions, util
from data_validation.result_handlers.base_backend import BaseBackendResultHandler
@@ -24,6 +28,14 @@
BQRH_NO_WRITE_MESSAGE = "No results to write to BigQuery"


def credentials_from_key_path(sa_key_path):
if not sa_key_path:
return None
return google.oauth2.service_account.Credentials.from_service_account_file(
sa_key_path
)


class BigQueryResultHandler(BaseBackendResultHandler):
"""Write results of data validation to BigQuery.

@@ -38,7 +50,7 @@ class BigQueryResultHandler(BaseBackendResultHandler):
def __init__(
self,
bigquery_client,
status_list: list = None,
status_list: Optional[list] = None,
table_id: str = "pso_data_validator.results",
text_format: str = consts.FORMAT_TYPE_TABLE,
):
@@ -50,10 +62,10 @@ def __init__(
@staticmethod
def get_handler_for_project(
project_id,
status_list=None,
status_list: Optional[list] = None,
table_id: str = "pso_data_validator.results",
credentials=None,
api_endpoint: str = None,
sa_key_path: Optional[str] = None,
api_endpoint: Optional[str] = None,
text_format: str = consts.FORMAT_TYPE_TABLE,
):
"""Return BigQueryResultHandler instance for given project.
@@ -70,6 +82,39 @@ def get_handler_for_project(
This allows the user to influence the text results written via logger.debug.
See: https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/871
"""

credentials = credentials_from_key_path(sa_key_path)
client = clients.get_google_bigquery_client(
project_id, credentials=credentials, api_endpoint=api_endpoint
)
return BigQueryResultHandler(
client,
status_list=status_list,
table_id=table_id,
text_format=text_format,
)

@staticmethod
def get_handler_for_connection(
connection_config: dict,
status_list: Optional[list] = None,
table_id: str = "pso_data_validator.results",
text_format: str = consts.FORMAT_TYPE_TABLE,
):
"""Return BigQueryResultHandler instance for given connection config.

Args:
table_id (str): Table ID used for validation results.
status_list (list): provided status to filter the results with
text_format (str, optional):
This allows the user to influence the text results written via logger.debug.
See: https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/871
"""
project_id = connection_config[consts.PROJECT_ID]
credentials = credentials_from_key_path(
connection_config.get(consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH)
)
api_endpoint = connection_config.get(consts.API_ENDPOINT)
client = clients.get_google_bigquery_client(
project_id, credentials=credentials, api_endpoint=api_endpoint
)
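A hedged usage sketch of the two entry points above; the project ID, table ID, key path, and the minimal connection dict are placeholders, not the library's documented API surface.

```python
from data_validation.result_handlers.bigquery import BigQueryResultHandler

# Legacy path: explicit project ID, optionally with a service-account key file.
legacy_handler = BigQueryResultHandler.get_handler_for_project(
    "my-project",
    table_id="pso_data_validator.results",
    sa_key_path="/path/to/key.json",  # None falls back to default credentials
)

# New path introduced here: reuse a saved DVT connection config.
conn_config = {"project_id": "my-project"}  # minimal illustrative config
conn_handler = BigQueryResultHandler.get_handler_for_connection(
    conn_config,
    table_id="pso_data_validator.results",
)
```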
data_validation/result_handlers/factory.py (63 changes: 34 additions & 29 deletions)
@@ -14,11 +14,9 @@

"""Build a result handler object."""

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

import google.oauth2.service_account

from data_validation import consts, exceptions
from data_validation import consts, exceptions, state_manager
from data_validation.result_handlers.bigquery import BigQueryResultHandler
from data_validation.result_handlers.postgres import PostgresResultHandler
from data_validation.result_handlers.text import TextResultHandler
@@ -31,7 +29,7 @@ def build_result_handler(
result_handler_config: dict,
validation_type: str,
filter_status: list,
text_format: str = None,
text_format: Optional[str] = None,
) -> "BaseBackendResultHandler":
"""Return a result handler object based on supplied args."""
text_format = text_format or consts.FORMAT_TYPE_TABLE
@@ -48,33 +46,40 @@
)

result_type = result_handler_config[consts.RH_TYPE]
if result_type == consts.SOURCE_TYPE_BIGQUERY:
project_id = result_handler_config[consts.PROJECT_ID]
table_id = result_handler_config[consts.TABLE_ID]
key_path = result_handler_config.get(consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH)
if key_path:
credentials = (
google.oauth2.service_account.Credentials.from_service_account_file(
key_path
)
table_id = result_handler_config[consts.TABLE_ID]

if consts.RH_CONN in result_handler_config and result_type in [
consts.SOURCE_TYPE_BIGQUERY,
consts.SOURCE_TYPE_POSTGRES,
]:
mgr = state_manager.StateManager()
conn_from_file = mgr.get_connection_config(
result_handler_config[consts.RH_CONN]
)
if result_type == consts.SOURCE_TYPE_BIGQUERY:
return BigQueryResultHandler.get_handler_for_connection(
conn_from_file,
status_list=filter_status,
table_id=table_id,
text_format=text_format,
)
else:
credentials = None
api_endpoint = result_handler_config.get(consts.API_ENDPOINT)
elif result_type == consts.SOURCE_TYPE_POSTGRES:
return PostgresResultHandler.get_handler_for_connection(
conn_from_file,
status_list=filter_status,
table_id=table_id,
text_format=text_format,
)
elif result_type == consts.SOURCE_TYPE_BIGQUERY:
# Legacy BigQuery format.
return BigQueryResultHandler.get_handler_for_project(
project_id,
filter_status,
table_id=table_id,
credentials=credentials,
api_endpoint=api_endpoint,
text_format=text_format,
)
elif result_type == consts.SOURCE_TYPE_POSTGRES:
table_id = result_handler_config[consts.TABLE_ID]
return PostgresResultHandler.get_handler_for_connection(
result_handler_config[consts.RH_CONN],
filter_status,
result_handler_config[consts.PROJECT_ID],
status_list=filter_status,
table_id=table_id,
sa_key_path=result_handler_config.get(
consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH
),
api_endpoint=result_handler_config.get(consts.API_ENDPOINT),
text_format=text_format,
)
else:
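Condensed, the new dispatch reads roughly as the sketch below. It is not the verbatim function: the literal keys stand in for `consts.*` values, and `load_saved_connection` is a hypothetical placeholder for `state_manager.StateManager().get_connection_config`.

```python
from data_validation.result_handlers.bigquery import BigQueryResultHandler
from data_validation.result_handlers.postgres import PostgresResultHandler


def pick_handler(config: dict, filter_status: list):
    """Sketch of build_result_handler's new dispatch order."""
    result_type = config["type"]
    table_id = config["table_id"]
    if "result_handler_connection" in config and result_type in ("BigQuery", "Postgres"):
        # New path: resolve the named connection from the saved connection files.
        conn = load_saved_connection(config["result_handler_connection"])  # hypothetical helper
        handler_cls = (
            BigQueryResultHandler if result_type == "BigQuery" else PostgresResultHandler
        )
        return handler_cls.get_handler_for_connection(
            conn, status_list=filter_status, table_id=table_id
        )
    if result_type == "BigQuery":
        # Legacy project.dataset.table format.
        return BigQueryResultHandler.get_handler_for_project(
            config["project_id"],
            status_list=filter_status,
            table_id=table_id,
            sa_key_path=config.get("google_service_account_key_path"),
        )
    raise ValueError(f"Unknown result handler type: {result_type}")
```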
data_validation/result_handlers/postgres.py (6 changes: 3 additions & 3 deletions)
@@ -15,7 +15,7 @@
"""Output validation report to PostgreSQL table"""

import logging
from typing import Iterable, TYPE_CHECKING
from typing import Iterable, Optional, TYPE_CHECKING

import numpy
import sqlalchemy
@@ -75,7 +75,7 @@ class PostgresResultHandler(BaseBackendResultHandler):
def __init__(
self,
client: "BaseBackend",
status_list: list = None,
status_list: Optional[list] = None,
table_id: str = "pso_data_validator.results",
text_format: str = consts.FORMAT_TYPE_TABLE,
):
@@ -87,7 +87,7 @@ def get_handler_for_connection(
@staticmethod
def get_handler_for_connection(
connection_config: dict,
status_list=None,
status_list: Optional[list] = None,
table_id: str = "pso_data_validator.results",
text_format: str = consts.FORMAT_TYPE_TABLE,
):
docs/installation.md (2 changes: 2 additions & 0 deletions)
@@ -68,6 +68,8 @@ In order to allow the data validation tool to write to a BigQuery table, users need:
- A Google Cloud Platform project with the BigQuery API enabled.
- A Google user account with appropriate permissions. If you plan to run this tool in production, it's recommended that you create a service account specifically for running the tool. See our [guide](https://cloud.google.com/docs/authentication/production) on how to authenticate with your service account. If you are using a service account, you need to grant your service account appropriate roles on your project so that it has permissions to create and read resources.

Note that DVT uses the BigQuery streaming API to write results to BigQuery, because of BigQuery's limits on the number of mutations per table per day.
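
For background, a minimal sketch of a streaming write with the google-cloud-bigquery client is shown below. It is illustrative only: the project, table, and row fields are placeholders, and DVT's actual write path lives in its BigQuery result handler.

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project
table = client.get_table("my-project.pso_data_validator.results")

# Streaming inserts (tabledata.insertAll) are not subject to the daily quota
# on load/DML mutations per table, which is why DVT writes results this way.
errors = client.insert_rows_json(
    table, [{"run_id": "example-run", "validation_status": "success"}]
)
if errors:
    raise RuntimeError(f"BigQuery streaming insert failed: {errors}")
```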

Clone the repository onto your machine and navigate inside the directory:

```
git clone https://github.com/GoogleCloudPlatform/professional-services-data-validator.git
cd professional-services-data-validator
```
tests/unit/result_handlers/test_factory.py (21 changes: 11 additions & 10 deletions)
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from unittest import mock

import pytest
@@ -32,14 +33,7 @@
PG_CONFIG = {
consts.RH_TYPE: consts.SOURCE_TYPE_POSTGRES,
consts.TABLE_ID: "schema.table",
consts.RH_CONN: {
"host": "localhost",
"port": "5432",
"user": "dvt_u",
"password": "dvt_p",
"database": "postgres",
consts.SOURCE_TYPE: consts.SOURCE_TYPE_POSTGRES,
},
consts.RH_CONN: "some_test_pg_rh",
}

IMPALA_CONFIG = {
@@ -74,7 +68,7 @@ def test_build_result_handler_default(module_under_test):
assert handler.status_list == filter_status


def test_build_result_handler_bigquery(module_under_test):
def test_build_result_handler_bigquery_legacy(module_under_test):
config = BQ_CONFIG
filter_status = ["fail"]
handler = module_under_test.build_result_handler(
@@ -90,7 +84,14 @@
"data_validation.clients.CLIENT_LOOKUP",
{consts.SOURCE_TYPE_POSTGRES: mock.Mock()},
)
def test_build_result_handler_postgres(module_under_test):
def test_build_result_handler_postgres(fs, module_under_test):
pg_conn = '{"host": "localhost","port": "5432","user": "dvt_u","password": "dvt_p","database": "postgres", "source_type": "Postgres"}'
fs.create_file(
f"{consts.DEFAULT_ENV_DIRECTORY}/some_test_pg_rh.connection.json".replace(
"~", os.environ["HOME"]
),
contents=pg_conn,
)
config = PG_CONFIG
filter_status = ["fail"]
handler = module_under_test.build_result_handler(
tests/unit/test_cli_tools.py (9 changes: 5 additions & 4 deletions)
@@ -627,18 +627,19 @@ def test_get_result_handler_by_conn_file(fs):
res = cli_tools._get_result_handler(f"{args.connection_name}.dataset.table")
assert res == {
consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY,
consts.PROJECT_ID: args.project_id,
consts.TABLE_ID: "dataset.table",
consts.API_ENDPOINT: args.api_endpoint,
consts.STORAGE_API_ENDPOINT: args.storage_api_endpoint,
consts.RH_CONN: args.connection_name,
}

# Plus check standard format still works.
res = cli_tools._get_result_handler("project.dataset.table")
res = cli_tools._get_result_handler(
"project.dataset.table", sa_file="/tmp/some-key.json"
)
assert res == {
consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY,
consts.PROJECT_ID: "project",
consts.TABLE_ID: "dataset.table",
consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH: "/tmp/some-key.json",
}

