fix(ingest): fix deps for fivetran #13385

Status: Merged (2 commits, May 2, 2025)
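
What this PR does, as read from the diff: BigQueryConnectionConfig previously lived in bigquery_v2/bigquery_config.py alongside the full BigQuery source config and its google-cloud imports, and make_sqlalchemy_uri lived in sql/sql_config.py. The PR extracts them into two new standalone modules, bigquery_v2/bigquery_connection.py and sql/sqlalchemy_uri.py, and updates every import site. Sources that only need the connection config or the URI helper, such as the fivetran source this PR is named for, no longer have to pull in the heavier config modules and their dependency chains.

For downstream code, only the import paths change:

    # Old locations (removed by this PR):
    # from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryConnectionConfig
    # from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri

    # New locations:
    from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
        BigQueryConnectionConfig,
    )
    from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri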
---
datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -2,10 +2,8 @@
 import os
 import re
 from datetime import timedelta
-from typing import Any, Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union

-from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
-from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

 from datahub.configuration.common import AllowDenyPattern, ConfigModel
@@ -18,7 +16,9 @@
 from datahub.ingestion.glossary.classification_mixin import (
     ClassificationSourceConfigMixin,
 )
-from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
 from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -105,64 +105,6 @@ class BigQueryUsageConfig(BaseUsageConfig):
 )


-class BigQueryConnectionConfig(ConfigModel):
-    credential: Optional[GCPCredential] = Field(
-        default=None, description="BigQuery credential informations"
-    )
-
-    _credentials_path: Optional[str] = PrivateAttr(None)
-
-    extra_client_options: Dict[str, Any] = Field(
-        default={},
-        description="Additional options to pass to google.cloud.logging_v2.client.Client.",
-    )
-
-    project_on_behalf: Optional[str] = Field(
-        default=None,
-        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
-    )
-
-    def __init__(self, **data: Any):
-        super().__init__(**data)
-
-        if self.credential:
-            self._credentials_path = self.credential.create_credential_temp_file()
-            logger.debug(
-                f"Creating temporary credential file at {self._credentials_path}"
-            )
-            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
-
-    def get_bigquery_client(self) -> bigquery.Client:
-        client_options = self.extra_client_options
-        return bigquery.Client(self.project_on_behalf, **client_options)
-
-    def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
-        return resourcemanager_v3.ProjectsClient()
-
-    def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
-        return datacatalog_v1.PolicyTagManagerClient()
-
-    def make_gcp_logging_client(
-        self, project_id: Optional[str] = None
-    ) -> GCPLoggingClient:
-        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
-        # why we disable gRPC here.
-        client_options = self.extra_client_options.copy()
-        client_options["_use_grpc"] = False
-        if project_id is not None:
-            return GCPLoggingClient(**client_options, project=project_id)
-        else:
-            return GCPLoggingClient(**client_options)
-
-    def get_sql_alchemy_url(self) -> str:
-        if self.project_on_behalf:
-            return f"bigquery://{self.project_on_behalf}"
-        # When project_id is not set, we will attempt to detect the project ID
-        # based on the credentials or environment variables.
-        # See https://github.com/mxmzdlv/pybigquery#authentication.
-        return "bigquery://"
-
-
 class GcsLineageProviderConfig(ConfigModel):
     """
     Any source that produces gcs lineage from/to Datasets should inherit this class.
---
datahub/ingestion/source/bigquery_v2/bigquery_connection.py (new file)
@@ -0,0 +1,70 @@
+import logging
+import os
+from typing import Any, Dict, Optional
+
+from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
+from google.cloud.logging_v2.client import Client as GCPLoggingClient
+from pydantic import Field, PrivateAttr
+
+from datahub.configuration.common import ConfigModel
+from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential
+
+logger = logging.getLogger(__name__)
+
+
+class BigQueryConnectionConfig(ConfigModel):
+    credential: Optional[GCPCredential] = Field(
+        default=None, description="BigQuery credential informations"
+    )
+
+    _credentials_path: Optional[str] = PrivateAttr(None)
+
+    extra_client_options: Dict[str, Any] = Field(
+        default={},
+        description="Additional options to pass to google.cloud.logging_v2.client.Client.",
+    )
+
+    project_on_behalf: Optional[str] = Field(
+        default=None,
+        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
+    )
+
+    def __init__(self, **data: Any):
+        super().__init__(**data)
+
+        if self.credential:
+            self._credentials_path = self.credential.create_credential_temp_file()
+            logger.debug(
+                f"Creating temporary credential file at {self._credentials_path}"
+            )
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
+
+    def get_bigquery_client(self) -> bigquery.Client:
+        client_options = self.extra_client_options
+        return bigquery.Client(self.project_on_behalf, **client_options)
+
+    def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
+        return resourcemanager_v3.ProjectsClient()
+
+    def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
+        return datacatalog_v1.PolicyTagManagerClient()
+
+    def make_gcp_logging_client(
+        self, project_id: Optional[str] = None
+    ) -> GCPLoggingClient:
+        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
+        # why we disable gRPC here.
+        client_options = self.extra_client_options.copy()
+        client_options["_use_grpc"] = False
+        if project_id is not None:
+            return GCPLoggingClient(**client_options, project=project_id)
+        else:
+            return GCPLoggingClient(**client_options)
+
+    def get_sql_alchemy_url(self) -> str:
+        if self.project_on_behalf:
+            return f"bigquery://{self.project_on_behalf}"
+        # When project_id is not set, we will attempt to detect the project ID
+        # based on the credentials or environment variables.
+        # See https://github.com/mxmzdlv/pybigquery#authentication.
+        return "bigquery://"
---
@@ -10,10 +10,12 @@
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.bigquery_v2.bigquery_config import (
-    BigQueryConnectionConfig,
     BigQueryFilterConfig,
     BigQueryIdentifierConfig,
 )
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import (
     BigQueryQueriesExtractorReport,
     BigQuerySchemaApiPerfReport,
---
@@ -16,7 +16,7 @@
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.emitter.mce_builder import DEFAULT_ENV
 from datahub.ingestion.api.report import Report
-from datahub.ingestion.source.bigquery_v2.bigquery_config import (
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
     BigQueryConnectionConfig,
 )
 from datahub.ingestion.source.snowflake.snowflake_connection import (
---
@@ -28,7 +28,7 @@
     OAuthIdentityProvider,
 )
 from datahub.ingestion.source.snowflake.oauth_generator import OAuthTokenGenerator
-from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.utilities.config_clean import (
     remove_protocol,
     remove_suffix,
---
@@ -35,13 +35,14 @@
     SQLAlchemySource,
     register_custom_type,
 )
-from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
 from datahub.ingestion.source.sql.sql_report import SQLSourceReport
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
     gen_database_container,
     gen_database_key,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
 from datahub.metadata.schema_classes import MapTypeClass, RecordTypeClass
 from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
---
@@ -36,7 +36,6 @@
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
     SQLCommonConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
@@ -46,6 +45,7 @@
     gen_schema_key,
     get_domain_wu,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stateful_ingestion_base import JobId
 from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
---
@@ -44,9 +44,9 @@
 )
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_report import SQLSourceReport
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.sql.stored_procedures.base import (
     generate_procedure_lineage,
 )
---
datahub/ingestion/source/sql/sql_config.py
@@ -4,7 +4,6 @@

 import pydantic
 from pydantic import Field
-from sqlalchemy.engine import URL

 from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.source_common import (
@@ -20,6 +19,7 @@
     ClassificationSourceConfigMixin,
 )
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StatefulStaleMetadataRemovalConfig,
 )
@@ -184,36 +184,3 @@ def get_sql_alchemy_url(

 class BasicSQLAlchemyConfig(SQLAlchemyConnectionConfig, SQLCommonConfig):
     pass
-
-
-def make_sqlalchemy_uri(
-    scheme: str,
-    username: Optional[str],
-    password: Optional[str],
-    at: Optional[str],
-    db: Optional[str],
-    uri_opts: Optional[Dict[str, Any]] = None,
-) -> str:
-    host: Optional[str] = None
-    port: Optional[int] = None
-    if at:
-        try:
-            host, port_str = at.rsplit(":", 1)
-            port = int(port_str)
-        except ValueError:
-            host = at
-            port = None
-    if uri_opts:
-        uri_opts = {k: v for k, v in uri_opts.items() if v is not None}
-
-    return str(
-        URL.create(
-            drivername=scheme,
-            username=username,
-            password=password,
-            host=host,
-            port=port,
-            database=db,
-            query=uri_opts or {},
-        )
-    )
---
datahub/ingestion/source/sql/sqlalchemy_uri.py (new file)
@@ -0,0 +1,36 @@
+from typing import Any, Dict, Optional
+
+from sqlalchemy.engine import URL
+
+
+def make_sqlalchemy_uri(
+    scheme: str,
+    username: Optional[str],
+    password: Optional[str],
+    at: Optional[str],
+    db: Optional[str],
+    uri_opts: Optional[Dict[str, Any]] = None,
+) -> str:
+    host: Optional[str] = None
+    port: Optional[int] = None
+    if at:
+        try:
+            host, port_str = at.rsplit(":", 1)
+            port = int(port_str)
+        except ValueError:
+            host = at
+            port = None
+    if uri_opts:
+        uri_opts = {k: v for k, v in uri_opts.items() if v is not None}
+
+    return str(
+        URL.create(
+            drivername=scheme,
+            username=username,
+            password=password,
+            host=host,
+            port=port,
+            database=db,
+            query=uri_opts or {},
+        )
+    )
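
A quick usage sketch of the relocated helper (credentials and host below are hypothetical placeholders; note that "at" may be "host" or "host:port", and None-valued uri_opts entries are dropped before the URI is built):

    from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri

    uri = make_sqlalchemy_uri(
        scheme="postgresql+psycopg2",
        username="datahub",
        password="s3cr3t",
        at="db.example.com:5432",
        db="analytics",
        uri_opts={"sslmode": "require", "connect_timeout": None},
    )
    print(uri)
    # postgresql+psycopg2://datahub:s3cr3t@db.example.com:5432/analytics?sslmode=require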
---
@@ -14,12 +14,12 @@
 from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, logger
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
     gen_database_key,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri


 class TwoTierSQLAlchemyConfig(BasicSQLAlchemyConfig):
---
@@ -17,7 +17,8 @@
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.source.ge_data_profiler import DATABRICKS
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
-from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StatefulStaleMetadataRemovalConfig,
 )
---
@@ -14,7 +14,7 @@
 from performance.databricks.unity_proxy_mock import _convert_column_type
 from sqlalchemy import create_engine

-from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri

 logger = logging.getLogger(__name__)
 T = TypeVar("T")
---
@@ -20,9 +20,11 @@
     BigQueryTableRef,
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import (
-    BigQueryConnectionConfig,
     BigQueryV2Config,
 )
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
 from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigqueryDataset,