[FLINK-29549] Flink Glue Catalog integration #191


Open · FranMorilloAWS wants to merge 3 commits into main from flink-glue-integration

Conversation

FranMorilloAWS

Purpose of the change

Implements an AWS Glue Catalog integration for Flink (FLINK-29549).

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment
  • Added unit tests
  • Manually verified by running the Kinesis connector on a local Flink cluster.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)


boring-cyborg bot commented Mar 3, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Contributor @leekeiabstraction left a comment:

Provided early comments/questions

Access tables from different catalogs in the same query:

```sql
-- Join tables from different catalogs
```
Contributor:

This example joins on a customer id match. IIUC, Glue shouldn't keep actual values, only metadata on the schema. Should the example join on something more concrete, e.g. a field name match?
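For context, a minimal sketch of the kind of cross-catalog join being discussed, written against Flink's Table API. The catalog name, option keys, table names, and join key are illustrative assumptions, not taken from this PR:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CrossCatalogJoinExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register a Glue-backed catalog via SQL; these option keys are assumptions,
        // not necessarily the final names used by this PR.
        tEnv.executeSql(
                "CREATE CATALOG glue_catalog WITH ("
                        + "'type' = 'glue',"
                        + "'region' = 'us-east-1',"
                        + "'default-database' = 'sales')");

        // Join a table resolved through the Glue catalog with a table from the
        // default in-memory catalog in a single query.
        tEnv.executeSql(
                        "SELECT o.order_id, c.customer_name "
                                + "FROM glue_catalog.sales.orders AS o "
                                + "JOIN default_catalog.default_database.customers AS c "
                                + "ON o.customer_id = c.customer_id")
                .print();
    }
}
```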

<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
Contributor:

Is there a motivation for specifying 3.8.1 here? (Similarly for shade plugin and flink dependencies)

The pom.xml at repo root already defines build plugins. Should we point to pom.xml at repo root as parent and rely on convention over configuration?

```xml
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
```

Contributor:

Also applies to flink-catalog-aws-glue/pom.xml

Contributor:

+1 - let's delegate this to the project setup on flink-connector-aws

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.18.0</version>
Contributor:

Why do we use 1.18 here over ${flink.version}?

Contributor:

Similar comments apply to the rest of this pom.xml: the Flink packages, jackson-databind, the connector, etc.

</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bedrockruntime</artifactId>
Contributor:

Can you elaborate why bedrockruntime is needed?


// Define configuration options that users must provide
public static final ConfigOption<String> REGION =
ConfigOptions.key("region")
Contributor:

Should we have a qualifier/prefix so that it's clear that the config is for the GlueCatalogConnector?

For example:

catalog.glue.region
catalog.glue.default-database
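For illustration, a minimal sketch of how the prefixed keys suggested above could be declared with Flink's ConfigOptions builder (the key names follow the reviewer's suggestion and are not the PR's actual option names):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hedged sketch of the prefixed keys suggested above; the PR keeps the
// unprefixed names, so these identifiers are illustrative only.
public class GlueCatalogOptionsSketch {

    public static final ConfigOption<String> REGION =
            ConfigOptions.key("catalog.glue.region")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("AWS region used by the Glue catalog client.");

    public static final ConfigOption<String> DEFAULT_DATABASE =
            ConfigOptions.key("catalog.glue.default-database")
                    .stringType()
                    .defaultValue("default")
                    .withDescription("Database used when none is specified in the query.");
}
```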

Author:

They don't do that distinction for the other catalogs.

Since users would already set the catalog type to glue, I'm not sure it makes sense to be repetitive and add glue to all configs.

return true;
} catch (EntityNotFoundException e) {
return false;
} catch (Exception e) {
Contributor:

Let's use a more specific exception here.
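As an illustration, a hedged sketch of what a narrower catch could look like, assuming this is the database-existence check; the method and parameter names are assumptions, not the PR's exact code:

```java
import org.apache.flink.table.catalog.exceptions.CatalogException;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
import software.amazon.awssdk.services.glue.model.GlueException;

// Sketch only: catch the SDK's GlueException instead of a blanket Exception,
// and wrap unexpected failures in Flink's CatalogException.
public boolean glueDatabaseExists(GlueClient glueClient, String databaseName) {
    try {
        glueClient.getDatabase(GetDatabaseRequest.builder().name(databaseName).build());
        return true;
    } catch (EntityNotFoundException e) {
        return false;
    } catch (GlueException e) {
        throw new CatalogException("Error checking existence of database " + databaseName, e);
    }
}
```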

* @param functionPath fully qualified function path
* @throws CatalogException In case of Unexpected errors.
*/
public void dropGlueFunction(ObjectPath functionPath) throws CatalogException {
Contributor:

It seems like we do not throw CatalogException within the method; do we need to catch, wrap, and rethrow?
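If the wrap-and-rethrow route is taken, it could look roughly like the sketch below. The Glue delete call and the glueClient field are assumptions about the surrounding operations class, not the PR's exact implementation:

```java
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
import software.amazon.awssdk.services.glue.model.GlueException;

// Hedged sketch of the catch/wrap/rethrow option discussed above;
// glueClient is an assumed field on the surrounding operations class.
public void dropGlueFunction(ObjectPath functionPath) throws CatalogException {
    try {
        glueClient.deleteUserDefinedFunction(
                DeleteUserDefinedFunctionRequest.builder()
                        .databaseName(functionPath.getDatabaseName())
                        .functionName(functionPath.getObjectName())
                        .build());
    } catch (GlueException e) {
        throw new CatalogException("Error dropping function " + functionPath.getFullName(), e);
    }
}
```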

Comment on lines 36 to 41
case JAVA:
return GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX + function.getClassName();
case SCALA:
return GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX + function.getClassName();
case PYTHON:
return GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX + function.getClassName();
Contributor:

It seems like we are using a self-defined pattern for class names:

    public static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:";
    public static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:";
    public static final String FLINK_JAVA_FUNCTION_PREFIX = "flink:java:";

Documentation on Glue's UserDefinedFunction actually says "The Java class that contains the function code." Should we use the Java namespace + class name format instead?

https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/glue/model/UserDefinedFunction.html#className()

Contributor @Samrat002 left a comment:

Hi @FranMorilloAWS,

It's great to see efforts toward integrating AWS Glue Catalog with Flink. I previously submitted a pull request that implements proposed FLIP-277: apache/flink-connector-aws#47.

I have a couple of questions:

  1. Could you please share the reasons for not continuing the existing work in PR [FLINK-30481][FLIP-277] GlueCatalog Implementation #47? What restriction led you to take over the ongoing work and duplicate the effort?

  2. I noticed that significant portions of the code are copied from PR [FLINK-30481][FLIP-277] GlueCatalog Implementation #47. While I'm happy to assist with the review, it would be in the collaborative spirit of open source to retain the original commits when incorporating code from previous contributions.

Contributor @hlteoh37 left a comment:

Thanks for the contribution @FranMorilloAWS! It's really nice to see detailed docs and well-thought-through APIs!

Added some comments around the structure of the repo - will continue looking at the code structure

<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
Contributor:

+1 - let's delegate this to the project setup on flink-connector-aws

Contributor @Samrat002 left a comment:

Thank you @FranMorilloAWS.

I have done a very high-level pass.

Key notices:

  1. The Glue client offers paginated requests. These are generally helpful for responses that have a very large number of records, for example listing tables. Please incorporate those changes (see the paginator sketch after this comment).

  2. Missing unit tests for type conversion.

  3. IMO, this PR can be reduced to the scope of databases and tables; functions, partitions, and other features like stats can be part of a future PR.

Cheers, Samrat
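On point 1, a minimal sketch of what a paginated table listing could look like with the AWS SDK v2 paginator (method and parameter names are assumptions, not the PR's code):

```java
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.GetTablesRequest;
import software.amazon.awssdk.services.glue.model.GetTablesResponse;

// Hedged sketch: use the SDK's built-in paginator so large listings are
// fetched page by page instead of relying on a single response.
public List<String> listGlueTables(GlueClient glueClient, String databaseName) {
    List<String> tableNames = new ArrayList<>();
    GetTablesRequest request = GetTablesRequest.builder().databaseName(databaseName).build();
    for (GetTablesResponse page : glueClient.getTablesPaginator(request)) {
        page.tableList().forEach(table -> tableNames.add(table.name()));
    }
    return tableNames;
}
```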


/** Utilities for Glue catalog Function related operations. */
@Internal
public class GlueFunctionsOperations extends AbstractGlueOperations {
Contributor:

IMO, function support can be part of a follow-up PR. WDYT?

Author:

Unfortunately, functions are needed to be able to select specific columns.

throw new IllegalArgumentException("Glue type cannot be null or empty");
}

// Trim but don't lowercase - we'll handle case-insensitivity per type
Contributor:

Why is case-insensitivity handled per type?
Doesn't handling case-insensitivity per type increase code complexity?

Author:

This approach is necessary because:

  • Glue types like "string", "int", "boolean" should be matched case-insensitively (i.e., "STRING" and "string" are the same type).
  • It matches AWS Glue's behavior, where type names are case-insensitive.
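A minimal sketch of the per-type, case-insensitive matching described above; the helper name and the handful of covered types are illustrative only, not the PR's actual mapping:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Illustrative only: trim the raw Glue type and compare case-insensitively per type.
public DataType glueTypeToFlinkType(String glueType) {
    if (glueType == null || glueType.trim().isEmpty()) {
        throw new IllegalArgumentException("Glue type cannot be null or empty");
    }
    String trimmed = glueType.trim();
    if ("string".equalsIgnoreCase(trimmed)) {
        return DataTypes.STRING();
    } else if ("int".equalsIgnoreCase(trimmed)) {
        return DataTypes.INT();
    } else if ("boolean".equalsIgnoreCase(trimmed)) {
        return DataTypes.BOOLEAN();
    }
    throw new UnsupportedOperationException("Unsupported Glue type: " + glueType);
}
```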

// Create a synchronized client builder to avoid concurrent modification exceptions
this.glueClient = GlueClient.builder()
.region(Region.of(region))
.credentialsProvider(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create())
Contributor:

The catalog will only work with DefaultCredentialsProvider.

Add other modes via aws.credentials.provider:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836075#FLIP277:NativeGlueCatalogSupportinFlink-Configurations

Contributor:

The Glue client should at least support:

  1. glueendpoint
  2. httpClient
  3. different credential configurations

Not supporting these minimal requirements may create constraints on adoption (see the client-builder sketch after this comment).
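A hedged sketch of a more configurable client setup along those lines; the parameter names and how they would be wired to catalog options are assumptions:

```java
import java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.GlueClientBuilder;

// Hedged sketch only; the surrounding configuration plumbing is assumed.
public GlueClient createGlueClient(
        String region, String glueEndpoint, AwsCredentialsProvider credentialsProvider) {
    GlueClientBuilder builder =
            GlueClient.builder()
                    .region(Region.of(region))
                    // e.g. resolved from an aws.credentials.provider option instead of
                    // hard-coding DefaultCredentialsProvider
                    .credentialsProvider(credentialsProvider);
    if (glueEndpoint != null) {
        // Optional endpoint override, e.g. for a VPC endpoint or a local Glue mock.
        builder.endpointOverride(URI.create(glueEndpoint));
    }
    return builder.build();
}
```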

Author:

Can we add those after the first release?

@FranMorilloAWS force-pushed the flink-glue-integration branch from 1725a0f to b1f2e33 on April 21, 2025, 13:07
Comment on lines +138 to +139
GetDatabaseResponse response = glueClient.getDatabase(
GetDatabaseRequest.builder()
.name(databaseName)
.build()
);
Contributor:

validateDatabaseName only does the pattern match using !VALID_NAME_PATTERN.matcher(databaseName).matches().

I am more concerned about

  1. When a user tries to run SHOW CREATE DATABASE MyDataBase, which database will be returned? MyDatabase, myDatabase, or some other casing variant?

  2. What will the SHOW DATABASES output look like?

I don't see any mechanism in the code that can identify this anomaly.

Here is one way to fix this problem:

Add an identifier before the character in the name to mark that the character is uppercase.

myDatabase will translate to -my-database; a character succeeding the identifier - will be interpreted as uppercase.

@FranMorilloAWS (Author):

@Samrat002 Considering that Glue will lowercase all database names, we shouldn't allow users to create databases with uppercase characters. Therefore, if they run SHOW CREATE DATABASE they must give the database name in lowercase; otherwise the command will fail. I added additional tests in the GlueCatalog and GlueDatabaseOperations tests to show this.
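A minimal sketch of the lower-case-only validation described above; the regex and the choice of exception are assumptions, not the PR's exact code:

```java
import java.util.Locale;
import java.util.regex.Pattern;
import org.apache.flink.table.catalog.exceptions.CatalogException;

// Illustrative only: reject names that are not already lower case, matching
// the behaviour described in the comment above. The pattern is an assumption.
private static final Pattern VALID_NAME_PATTERN = Pattern.compile("[a-z0-9_]+");

private static void validateDatabaseName(String databaseName) {
    if (databaseName == null
            || !databaseName.equals(databaseName.toLowerCase(Locale.ROOT))
            || !VALID_NAME_PATTERN.matcher(databaseName).matches()) {
        throw new CatalogException(
                "Database name must be lower case and match "
                        + VALID_NAME_PATTERN
                        + ": "
                        + databaseName);
    }
}
```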


4 participants