diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index 1f777feeccf781..29d13da550a0cc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -2,10 +2,8 @@
 import os
 import re
 from datetime import timedelta
-from typing import Any, Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union
 
-from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
-from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator
 
 from datahub.configuration.common import AllowDenyPattern, ConfigModel
@@ -18,7 +16,9 @@
 from datahub.ingestion.glossary.classification_mixin import (
     ClassificationSourceConfigMixin,
 )
-from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
 from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -105,64 +105,6 @@ class BigQueryUsageConfig(BaseUsageConfig):
     )
 
 
-class BigQueryConnectionConfig(ConfigModel):
-    credential: Optional[GCPCredential] = Field(
-        default=None, description="BigQuery credential informations"
-    )
-
-    _credentials_path: Optional[str] = PrivateAttr(None)
-
-    extra_client_options: Dict[str, Any] = Field(
-        default={},
-        description="Additional options to pass to google.cloud.logging_v2.client.Client.",
-    )
-
-    project_on_behalf: Optional[str] = Field(
-        default=None,
-        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
-    )
-
-    def __init__(self, **data: Any):
-        super().__init__(**data)
-
-        if self.credential:
-            self._credentials_path = self.credential.create_credential_temp_file()
-            logger.debug(
-                f"Creating temporary credential file at {self._credentials_path}"
-            )
-            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
-
-    def get_bigquery_client(self) -> bigquery.Client:
-        client_options = self.extra_client_options
-        return bigquery.Client(self.project_on_behalf, **client_options)
-
-    def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
-        return resourcemanager_v3.ProjectsClient()
-
-    def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
-        return datacatalog_v1.PolicyTagManagerClient()
-
-    def make_gcp_logging_client(
-        self, project_id: Optional[str] = None
-    ) -> GCPLoggingClient:
-        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
-        # why we disable gRPC here.
-        client_options = self.extra_client_options.copy()
-        client_options["_use_grpc"] = False
-        if project_id is not None:
-            return GCPLoggingClient(**client_options, project=project_id)
-        else:
-            return GCPLoggingClient(**client_options)
-
-    def get_sql_alchemy_url(self) -> str:
-        if self.project_on_behalf:
-            return f"bigquery://{self.project_on_behalf}"
-        # When project_id is not set, we will attempt to detect the project ID
-        # based on the credentials or environment variables.
-        # See https://github.com/mxmzdlv/pybigquery#authentication.
-        return "bigquery://"
-
-
 class GcsLineageProviderConfig(ConfigModel):
     """
     Any source that produces gcs lineage from/to Datasets should inherit this class.
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_connection.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_connection.py
new file mode 100644
index 00000000000000..2aede7005206b3
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_connection.py
@@ -0,0 +1,70 @@
+import logging
+import os
+from typing import Any, Dict, Optional
+
+from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
+from google.cloud.logging_v2.client import Client as GCPLoggingClient
+from pydantic import Field, PrivateAttr
+
+from datahub.configuration.common import ConfigModel
+from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential
+
+logger = logging.getLogger(__name__)
+
+
+class BigQueryConnectionConfig(ConfigModel):
+    credential: Optional[GCPCredential] = Field(
+        default=None, description="BigQuery credential informations"
+    )
+
+    _credentials_path: Optional[str] = PrivateAttr(None)
+
+    extra_client_options: Dict[str, Any] = Field(
+        default={},
+        description="Additional options to pass to google.cloud.logging_v2.client.Client.",
+    )
+
+    project_on_behalf: Optional[str] = Field(
+        default=None,
+        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
+    )
+
+    def __init__(self, **data: Any):
+        super().__init__(**data)
+
+        if self.credential:
+            self._credentials_path = self.credential.create_credential_temp_file()
+            logger.debug(
+                f"Creating temporary credential file at {self._credentials_path}"
+            )
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
+
+    def get_bigquery_client(self) -> bigquery.Client:
+        client_options = self.extra_client_options
+        return bigquery.Client(self.project_on_behalf, **client_options)
+
+    def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
+        return resourcemanager_v3.ProjectsClient()
+
+    def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
+        return datacatalog_v1.PolicyTagManagerClient()
+
+    def make_gcp_logging_client(
+        self, project_id: Optional[str] = None
+    ) -> GCPLoggingClient:
+        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
+        # why we disable gRPC here.
+        client_options = self.extra_client_options.copy()
+        client_options["_use_grpc"] = False
+        if project_id is not None:
+            return GCPLoggingClient(**client_options, project=project_id)
+        else:
+            return GCPLoggingClient(**client_options)
+
+    def get_sql_alchemy_url(self) -> str:
+        if self.project_on_behalf:
+            return f"bigquery://{self.project_on_behalf}"
+        # When project_id is not set, we will attempt to detect the project ID
+        # based on the credentials or environment variables.
+        # See https://github.com/mxmzdlv/pybigquery#authentication.
+        return "bigquery://"
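The class moves verbatim, so behavior is unchanged and only the import path is new. A minimal usage sketch (the project ID is a placeholder, and creating the Google client assumes application-default credentials are available in the environment):

```python
from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
    BigQueryConnectionConfig,
)

# project_on_behalf is optional; when set, it is used both for job execution
# and for the SQLAlchemy URI below.
config = BigQueryConnectionConfig(project_on_behalf="my-project")
assert config.get_sql_alchemy_url() == "bigquery://my-project"

# The client factories wrap the Google SDKs directly; this call resolves
# credentials from GOOGLE_APPLICATION_CREDENTIALS or the ambient environment.
client = config.get_bigquery_client()
```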
+ return "bigquery://" diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py index 47f21c9f32353a..779036aab90331 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py @@ -10,10 +10,12 @@ from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_config import ( - BigQueryConnectionConfig, BigQueryFilterConfig, BigQueryIdentifierConfig, ) +from datahub.ingestion.source.bigquery_v2.bigquery_connection import ( + BigQueryConnectionConfig, +) from datahub.ingestion.source.bigquery_v2.bigquery_report import ( BigQueryQueriesExtractorReport, BigQuerySchemaApiPerfReport, diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index b048b6157fbb9f..6d1c2b2a4aa791 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -16,7 +16,7 @@ from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.emitter.mce_builder import DEFAULT_ENV from datahub.ingestion.api.report import Report -from datahub.ingestion.source.bigquery_v2.bigquery_config import ( +from datahub.ingestion.source.bigquery_v2.bigquery_connection import ( BigQueryConnectionConfig, ) from datahub.ingestion.source.snowflake.snowflake_connection import ( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py index 75c5a3e058ed33..dd405caabac929 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_connection.py @@ -28,7 +28,7 @@ OAuthIdentityProvider, ) from datahub.ingestion.source.snowflake.oauth_generator import OAuthTokenGenerator -from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri +from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri from datahub.utilities.config_clean import ( remove_protocol, remove_suffix, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 88c66439ab86eb..aeab9c494ccebc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -35,13 +35,14 @@ SQLAlchemySource, register_custom_type, ) -from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri +from datahub.ingestion.source.sql.sql_config import SQLCommonConfig from datahub.ingestion.source.sql.sql_report import SQLSourceReport from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, gen_database_container, gen_database_key, ) +from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField from datahub.metadata.schema_classes import MapTypeClass, RecordTypeClass from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column diff --git 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py
index bc24125b28766c..d4bf95e2e7618a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py
@@ -36,7 +36,6 @@
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
     SQLCommonConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
@@ -46,6 +45,7 @@
     gen_schema_key,
     get_domain_wu,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stateful_ingestion_base import JobId
 from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
index adcd20408d0d8f..078f1a33c46fb7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
@@ -44,9 +44,9 @@
 )
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_report import SQLSourceReport
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.sql.stored_procedures.base import (
     generate_procedure_lineage,
 )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py
index 3ead59eed2d39a..691ae26d5465cf 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py
@@ -4,7 +4,6 @@
 
 import pydantic
 from pydantic import Field
-from sqlalchemy.engine import URL
 
 from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.source_common import (
@@ -20,6 +19,7 @@
     ClassificationSourceConfigMixin,
 )
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StatefulStaleMetadataRemovalConfig,
 )
@@ -184,36 +184,3 @@
 
 class BasicSQLAlchemyConfig(SQLAlchemyConnectionConfig, SQLCommonConfig):
     pass
-
-
-def make_sqlalchemy_uri(
-    scheme: str,
-    username: Optional[str],
-    password: Optional[str],
-    at: Optional[str],
-    db: Optional[str],
-    uri_opts: Optional[Dict[str, Any]] = None,
-) -> str:
-    host: Optional[str] = None
-    port: Optional[int] = None
-    if at:
-        try:
-            host, port_str = at.rsplit(":", 1)
-            port = int(port_str)
-        except ValueError:
-            host = at
-            port = None
-    if uri_opts:
-        uri_opts = {k: v for k, v in uri_opts.items() if v is not None}
-
-    return str(
-        URL.create(
-            drivername=scheme,
-            username=username,
-            password=password,
-            host=host,
-            port=port,
-            database=db,
-            query=uri_opts or {},
-        )
-    )
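make_sqlalchemy_uri is removed here and recreated unchanged in the new module below: it splits host and port out of `at`, drops None-valued query options, and delegates quoting to SQLAlchemy's URL.create (available in SQLAlchemy 1.4+). A quick sketch of its behavior with placeholder connection values:

```python
from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri

uri = make_sqlalchemy_uri(
    scheme="postgresql+psycopg2",
    username="datahub",
    password="s3cr3t",
    at="localhost:5432",
    db="analytics",
    uri_opts={"sslmode": "require", "application_name": None},  # None is dropped
)
# postgresql+psycopg2://datahub:s3cr3t@localhost:5432/analytics?sslmode=require
print(uri)
```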
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_uri.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_uri.py
new file mode 100644
index 00000000000000..cd9f48846ad578
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_uri.py
@@ -0,0 +1,36 @@
+from typing import Any, Dict, Optional
+
+from sqlalchemy.engine import URL
+
+
+def make_sqlalchemy_uri(
+    scheme: str,
+    username: Optional[str],
+    password: Optional[str],
+    at: Optional[str],
+    db: Optional[str],
+    uri_opts: Optional[Dict[str, Any]] = None,
+) -> str:
+    host: Optional[str] = None
+    port: Optional[int] = None
+    if at:
+        try:
+            host, port_str = at.rsplit(":", 1)
+            port = int(port_str)
+        except ValueError:
+            host = at
+            port = None
+    if uri_opts:
+        uri_opts = {k: v for k, v in uri_opts.items() if v is not None}
+
+    return str(
+        URL.create(
+            drivername=scheme,
+            username=username,
+            password=password,
+            host=host,
+            port=port,
+            database=db,
+            query=uri_opts or {},
+        )
+    )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py
index 98ad2f6027dfdf..fa29e33dc421ab 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py
@@ -14,12 +14,12 @@
 from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, logger
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
     gen_database_key,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 
 
 class TwoTierSQLAlchemyConfig(BasicSQLAlchemyConfig):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py
index 6c3f7a51294797..9af14d90001a78 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -17,7 +17,8 @@
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.source.ge_data_profiler import DATABRICKS
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
-from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StatefulStaleMetadataRemovalConfig,
 )
diff --git a/metadata-ingestion/tests/performance/databricks/generator.py b/metadata-ingestion/tests/performance/databricks/generator.py
index b11771e55b2c9e..b001ef329e4546 100644
--- a/metadata-ingestion/tests/performance/databricks/generator.py
+++ b/metadata-ingestion/tests/performance/databricks/generator.py
@@ -14,7 +14,7 @@
 from performance.databricks.unity_proxy_mock import _convert_column_type
 from sqlalchemy import create_engine
 
-from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 
 logger = logging.getLogger(__name__)
 T = TypeVar("T")
diff --git a/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py b/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py
index a61475f68546dd..aa11cbe6c5792b 100644
--- a/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py
+++ b/metadata-ingestion/tests/unit/bigquery/test_bigquery_source.py
@@ -20,9 +20,11 @@
     BigQueryTableRef,
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import (
-    BigQueryConnectionConfig,
     BigQueryV2Config,
 )
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
 from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigqueryDataset,
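A minimal sketch of a unit test against the relocated class (the test itself is illustrative, not part of this diff); it exercises the documented fallback when project_on_behalf is unset:

```python
from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
    BigQueryConnectionConfig,
)


def test_sql_alchemy_url_defaults_to_project_detection() -> None:
    # Without project_on_behalf, the URI leaves project detection to the
    # BigQuery client libraries / SQLAlchemy dialect.
    config = BigQueryConnectionConfig()
    assert config.get_sql_alchemy_url() == "bigquery://"
```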