feat(ingest): Default urn lowercasing and migration #13038

Open
wants to merge 4 commits into base: master
Conversation

treff7es
Contributor

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change/potential downtime/deprecation/big changes, an entry has been made in Updating DataHub

@github-actions github-actions bot added the ingestion, docs, product, and devops labels Mar 31, 2025
@datahub-cyborg datahub-cyborg bot added the needs-review label Mar 31, 2025

codecov bot commented Mar 31, 2025

❌ 5 Tests Failed:

Tests completed: 4364 | Failed: 5 | Passed: 4359 | Skipped: 73
View the top 3 failed test(s) by shortest run time
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_bigquery_sink_ingest
Stack Traces | 0.001s run time
docker_compose_runner = <function docker_compose_runner.<locals>.run at 0x7f1c09b22230>
pytestconfig = <_pytest.config.Config object at 0x7f1c5278f730>
test_resources_dir = PosixPath('.../tests/integration/kafka-connect')

    @pytest.fixture(scope="module")
    def kafka_connect_runner(docker_compose_runner, pytestconfig, test_resources_dir):
        test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka"
    
        # Share Compose configurations between files and projects
        # https://docs.docker.com/compose/extends/
        docker_compose_file = [
            str(test_resources_dir_kafka / "docker-compose.yml"),
            str(test_resources_dir / "docker-compose.override.yml"),
        ]
        with docker_compose_runner(
            docker_compose_file, "kafka-connect", cleanup=False
        ) as docker_services:
            wait_for_port(
                docker_services,
                "test_mysql",
                3306,
                timeout=120,
                checker=lambda: is_mysql_up("test_mysql", 3306),
            )
    
        with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
            # We sometimes run into issues where the broker fails to come up on the first try because
            # of all the other processes that are running. By running docker compose twice, we can
            # avoid some test flakes. How does this work? The "key" is the same between both
            # calls to the docker_compose_runner and the first one sets cleanup=False.
    
>           wait_for_port(docker_services, "test_broker", 29092, timeout=120)

.../integration/kafka-connect/test_kafka_connect.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../datahub/testing/docker_utils.py:36: in wait_for_port
    docker_services.wait_until_responsive(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Services(_docker_compose=DockerComposeExecutor(_compose_command='docker compose', _compose_files=['.../runner/work/d...tegration/kafka-connect/docker-compose.override.yml'], _compose_project_name='pytest5032-kafka-connect'), _services={})
check = <function wait_for_port.<locals>.<lambda> at 0x7f1c09b22690>
timeout = 120, pause = 0.5, clock = <built-in function perf_counter>

    def wait_until_responsive(
        self,
        check: Any,
        timeout: float,
        pause: float,
        clock: Any = timeit.default_timer,
    ) -> None:
        """Wait until a service is responsive."""
    
        ref = clock()
        now = ref
        while (now - ref) < timeout:
            if check():
                return
            time.sleep(pause)
            now = clock()
    
>       raise Exception("Timeout reached while waiting on service!")
E       Exception: Timeout reached while waiting on service!

venv/lib/python3.8.../site-packages/pytest_docker/plugin.py:121: Exception
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_ingest_stateful
Stack Traces | 0.001s run time
(Same kafka_connect_runner fixture failure and stack trace as above: "Exception: Timeout reached while waiting on service!" while waiting for test_broker on port 29092.)
tests.integration.kafka-connect.test_kafka_connect::test_kafka_connect_mongosourceconnect_ingest
Stack Traces | 0.001s run time
(Same kafka_connect_runner fixture failure and stack trace as above: "Exception: Timeout reached while waiting on service!" while waiting for test_broker on port 29092.)


@sgomezvillamor
Contributor

Should this PR change the default value also?

class LowerCaseDatasetUrnConfigMixin(ConfigModel):
    convert_urns_to_lowercase: bool = Field(
        default=False,
        description="Whether to convert dataset urns to lowercase.",
    )
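For reference, a minimal sketch of what this flag controls, assuming the usual pattern of lowercasing the dataset name before the urn is built. This is illustrative only and not this PR's diff; make_dataset_urn is DataHub's standard urn helper, and the example names are made up.

    # Illustrative sketch only, not this PR's code: with convert_urns_to_lowercase
    # enabled, the dataset name is lowercased before the urn is constructed, so
    # mixed-case names like "MyDb.MySchema.MyTable" and "MYDB.MYSCHEMA.MYTABLE"
    # resolve to the same urn.
    from datahub.emitter.mce_builder import make_dataset_urn

    name = "MyDb.MySchema.MyTable"
    urn = make_dataset_urn(platform="snowflake", name=name.lower(), env="PROD")
    # -> urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.myschema.mytable,PROD)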

@datahub-cyborg datahub-cyborg bot added the pending-submitter-response label and removed the needs-review label Apr 2, 2025
@treff7es
Contributor Author

treff7es commented Apr 4, 2025

Should this PR change the default value also?

class LowerCaseDatasetUrnConfigMixin(ConfigModel):
    convert_urns_to_lowercase: bool = Field(
        default=False,
        description="Whether to convert dataset urns to lowercase.",
    )

The plan is to enable this only for SQL sources, since mixed-case urns cause issues in SQL parsing.
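A minimal sketch of what "enable only for SQL sources" could look like, assuming a SQL-specific mixin that overrides the pydantic default; the subclass name and approach here are hypothetical and may not match what the PR actually does.

    # Hypothetical sketch, not the PR's actual change: flip the default only for
    # configs that SQL-based sources inherit from, leaving other sources opt-in.
    from pydantic import BaseModel, Field

    class LowerCaseDatasetUrnConfigMixin(BaseModel):
        # Mirrors the existing mixin quoted above (ConfigModel is pydantic-based).
        convert_urns_to_lowercase: bool = Field(
            default=False,
            description="Whether to convert dataset urns to lowercase.",
        )

    class SqlLowerCaseDatasetUrnConfigMixin(LowerCaseDatasetUrnConfigMixin):
        # Hypothetical SQL-only override: lowercasing on by default so SQL parsing
        # sees consistent casing across table references.
        convert_urns_to_lowercase: bool = Field(
            default=True,
            description="Whether to convert dataset urns to lowercase.",
        )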

@datahub-cyborg datahub-cyborg bot added the needs-review label and removed the pending-submitter-response label Apr 4, 2025