Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Optional

from sqlalchemy.engine import Engine
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.sql import text

from metadata.generated.schema.entity.automations.workflow import (
Expand All @@ -41,13 +42,17 @@
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections_utils import kill_active_connections
from metadata.ingestion.source.database.redshift.models import RedshiftInstanceType
from metadata.ingestion.source.database.redshift.queries import (
REDSHIFT_GET_ALL_RELATIONS,
REDSHIFT_GET_DATABASE_NAMES,
REDSHIFT_TEST_GET_QUERIES,
REDSHIFT_TEST_GET_QUERIES_MAP,
REDSHIFT_TEST_PARTITION_DETAILS,
)
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


def get_connection(connection: RedshiftConnection) -> Engine:
Expand All @@ -61,6 +66,42 @@ def get_connection(connection: RedshiftConnection) -> Engine:
)


def get_redshift_instance_type(engine: Engine) -> RedshiftInstanceType:
"""
Detect whether the connected Amazon Redshift deployment is Provisioned
or Serverless by probing for STL system table availability.

Serverless deployments do not have access to STL_* system tables due to
their architecture. Use SYS_* views instead for Serverless compatibility.

Reference: https://docs.aws.amazon.com/redshift/latest/dg/cm_chap_system-tables.html#sys_view_migration-use_cases

Args:
engine (Engine): SQLAlchemy engine connected to a Redshift endpoint.

Returns:
RedshiftInstanceType: PROVISIONED if STL tables are accessible,
SERVERLESS otherwise.
"""
probe_query = text("SELECT 1 FROM pg_catalog.stl_query LIMIT 1")

try:
with engine.connect() as conn:
conn.execute(probe_query)

logger.info(
"Redshift instance type detected: PROVISIONED (STL tables accessible)"
)
return RedshiftInstanceType.PROVISIONED

except ProgrammingError:
logger.info(
"Redshift instance type detected: SERVERLESS "
"(STL tables not accessible, will use SYS_* views)"
)
return RedshiftInstanceType.SERVERLESS


def test_connection(
metadata: OpenMetadata,
engine: Engine,
Expand All @@ -80,12 +121,27 @@ def test_connection(

def test_get_queries_permissions(engine_: Engine):
"""Check if we have the right permissions to list queries"""
redshift_instance_type = get_redshift_instance_type(engine_)

with engine_.connect() as conn:
res = conn.execute(REDSHIFT_TEST_GET_QUERIES).fetchone()
if not all(res):
raise SourceConnectionException(
f"We don't have the right permissions to list queries - {res}"
)
if redshift_instance_type == RedshiftInstanceType.PROVISIONED:
res = conn.execute(
REDSHIFT_TEST_GET_QUERIES_MAP[RedshiftInstanceType.PROVISIONED]
).fetchone()
if not all(res):
raise SourceConnectionException(
"We don't have the right permissions to list queries from stl views (Redshift Provisioned)"
f" - {res}"
)
else:
res = conn.execute(
REDSHIFT_TEST_GET_QUERIES_MAP[RedshiftInstanceType.SERVERLESS]
).fetchone()
if not all(res):
raise SourceConnectionException(
"We don't have the right permissions to list queries from sys views (Redshift Serverless)"
f" - {res}"
)

test_fn = {
"CheckAccess": partial(test_connection_engine_step, engine),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@

from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.redshift.connection import (
get_redshift_instance_type,
)
from metadata.ingestion.source.database.redshift.models import RedshiftInstanceType
from metadata.ingestion.source.database.redshift.queries import (
REDSHIFT_GET_STORED_PROCEDURE_QUERIES,
REDSHIFT_SQL_STATEMENT,
REDSHIFT_GET_STORED_PROCEDURE_QUERIES_MAP,
REDSHIFT_SQL_STATEMENT_MAP,
)
from metadata.ingestion.source.database.redshift.query_parser import (
OpenMetadata,
RedshiftQueryParserSource,
WorkflowSource,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
StoredProcedureLineageMixin,
Expand All @@ -54,7 +60,7 @@
class RedshiftLineageSource(
RedshiftQueryParserSource, StoredProcedureLineageMixin, LineageSource
):
filters = """
provisioned_filters = """
AND (
querytxt ILIKE '%%create%%table%%as%%select%%'
OR querytxt ILIKE '%%insert%%into%%select%%'
Expand All @@ -63,7 +69,30 @@ class RedshiftLineageSource(
)
"""

sql_stmt = REDSHIFT_SQL_STATEMENT
serverless_filters = """
AND (
(query_text ILIKE '%%create%%table%%as%%select%%' AND query_type = 'CTAS')
OR (query_text ILIKE '%%insert%%into%%select%%' AND query_type = 'INSERT')
OR (query_text ILIKE '%%update%%' AND query_type = 'UPDATE')
OR (query_text ILIKE '%%merge%%' AND query_type = 'MERGE')
)
"""

def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
super().__init__(config, metadata)

self.redshift_instance_type = get_redshift_instance_type(self.engine)

if self.redshift_instance_type == RedshiftInstanceType.PROVISIONED:
self.sql_stmt = REDSHIFT_SQL_STATEMENT_MAP[RedshiftInstanceType.PROVISIONED]
self.filters = self.provisioned_filters
logger.info(
"Using STL views for lineage processing of Redshift Provisioned"
)
else:
self.sql_stmt = REDSHIFT_SQL_STATEMENT_MAP[RedshiftInstanceType.SERVERLESS]
self.filters = self.serverless_filters
logger.info("Using SYS views for lineage processing of Redshift Serverless")

def yield_table_query(self) -> Iterator[TableQuery]:
"""
Expand Down Expand Up @@ -101,6 +130,8 @@ def get_stored_procedure_sql_statement(self) -> str:
Return the SQL statement to get the stored procedure queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = REDSHIFT_GET_STORED_PROCEDURE_QUERIES.format(start_date=start)
query = REDSHIFT_GET_STORED_PROCEDURE_QUERIES_MAP[
self.redshift_instance_type
].format(start_date=start)

return query
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,11 @@ def query_table_names_and_types(
result = self.connection.execute(
sql.text(
REDSHIFT_GET_ALL_RELATION_INFO.format(
view_filter="OR c.relkind IN ('v', 'm')"
if self.source_config.includeViews
else "AND c.relkind NOT IN ('v', 'm')"
view_filter=(
"OR c.relkind IN ('v', 'm')"
if self.source_config.includeViews
else "AND c.relkind NOT IN ('v', 'm')"
)
)
),
{"schema": schema_name},
Expand Down Expand Up @@ -263,7 +265,7 @@ def get_database_names_raw(self) -> Iterable[str]:
yield from self._execute_database_query(REDSHIFT_GET_DATABASE_NAMES)

def _set_incremental_table_processor(self, database: str):
"""Prepares the needed data for doing incremental metadata extration for a given database.
"""Prepares the needed data for doing incremental metadata extraction for a given database.

1. Queries Redshift to get the changes done after the `self.incremental.start_datetime_utc`
2. Sets the table map with the changes within the RedshiftIncrementalTableProcessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Redshift models
"""
import re
from enum import Enum
from typing import Dict, List, Optional, Tuple

from pydantic import BaseModel
Expand All @@ -20,6 +21,13 @@
SchemaName = str


class RedshiftInstanceType(Enum):
"""Redshift Instance Types"""

PROVISIONED = "PROVISIONED"
SERVERLESS = "SERVERLESS"


class RedshiftStoredProcedure(BaseModel):
"""Redshift stored procedure list query results"""

Expand Down
Loading
Loading