Commit b1f2e33 (Rebased Commit)
1 parent b55dec1 · 26 files changed: +5459 −0
flink-catalog-aws-glue/.gitignore (+2)

./target
./.idea
flink-catalog-aws-glue/README.md (+358)
# Flink AWS Glue Catalog Connector

The Flink AWS Glue Catalog connector provides integration between Apache Flink and the AWS Glue Data Catalog. This connector enables Flink applications to use AWS Glue as a metadata catalog for tables, databases, and schemas, allowing seamless SQL queries against AWS resources.

## Features

- Register AWS Glue as a catalog in Flink applications
- Access Glue databases and tables through Flink SQL
- Support for various AWS data sources (S3, Kinesis, MSK)
- Mapping between Flink and AWS Glue data types
- Compatibility with Flink's Table API and SQL interface

## Prerequisites

Before getting started, ensure you have the following:

- **AWS account** with appropriate permissions for AWS Glue and other required services
- **AWS credentials** properly configured

## Getting Started

### 1. Add Dependency

Add the AWS Glue Catalog connector to your Flink project:
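For a Maven build, the dependency takes roughly the following shape. The artifact ID and version property below are placeholders, not confirmed coordinates; check the connector's release page for the exact values:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Placeholder coordinates: verify the published artifactId and version -->
    <artifactId>flink-catalog-aws-glue</artifactId>
    <version>${glue.connector.version}</version>
</dependency>
```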
### 2. Configure AWS Credentials

Ensure AWS credentials are configured using one of these methods:

- Environment variables
- AWS credentials file
- IAM roles (for applications running on AWS)
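For example, with environment variables (the values below are placeholders for illustration; substitute your own credentials):

```shell
# Placeholder values; replace with your own credentials
export AWS_ACCESS_KEY_ID="your-access-key-id"
export AWS_SECRET_ACCESS_KEY="your-secret-access-key"
export AWS_DEFAULT_REGION="us-east-1"
```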
### 3. Register the Glue Catalog

You can register the AWS Glue catalog using either the Table API or SQL:

#### Using Table API (Java/Scala)

```java
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.glue.GlueCatalog;

// Create a Glue catalog instance
Catalog glueCatalog = new GlueCatalog(
        "glue_catalog", // Catalog name
        "default",      // Default database
        "us-east-1");   // AWS region

// Register it with the table environment
tableEnv.registerCatalog("glue_catalog", glueCatalog);
tableEnv.useCatalog("glue_catalog");
```

#### Using Table API (Python)

```python
from pyflink.table.catalog import GlueCatalog

# Create and register the Glue catalog
glue_catalog = GlueCatalog(
    "glue_catalog",  # Catalog name
    "default",       # Default database
    "us-east-1")     # AWS region

t_env.register_catalog("glue_catalog", glue_catalog)
t_env.use_catalog("glue_catalog")
```

#### Using SQL

In the Flink SQL Client, create and use the Glue catalog:

```sql
-- Create a catalog using Glue
CREATE CATALOG glue_catalog WITH (
    'type' = 'glue',
    'catalog-name' = 'glue_catalog',
    'default-database' = 'default',
    'region' = 'us-east-1'
);

-- Use the created catalog
USE CATALOG glue_catalog;

-- Use a specific database (backticks needed because DEFAULT is a reserved keyword)
USE `default`;
```
### 4. Create or Reference Glue Tables

Once the catalog is registered, you can create new tables or reference existing ones:

```sql
-- Create a new table in Glue
CREATE TABLE customer_table (
    id BIGINT,
    name STRING,
    region STRING
) WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'customer-stream',
    'aws.region' = 'us-east-1',
    'format' = 'json'
);

-- Query an existing Glue table
SELECT * FROM glue_catalog.sales_db.order_table;
```
## Catalog Operations

The AWS Glue Catalog connector supports several catalog operations through SQL. The following operations are currently implemented:

### Database Operations

```sql
-- Create a new database
CREATE DATABASE sales_db;

-- Create a database with a comment
CREATE DATABASE sales_db COMMENT 'Database for sales data';

-- Create a database if it doesn't exist
CREATE DATABASE IF NOT EXISTS sales_db;

-- Drop a database
DROP DATABASE sales_db;

-- Drop a database if it exists
DROP DATABASE IF EXISTS sales_db;

-- Use a specific database
USE sales_db;
```
### Table Operations

```sql
-- Create a table
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    order_date TIMESTAMP,
    amount DECIMAL(10, 2)
);

-- Create a table with a comment and properties
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    order_date TIMESTAMP,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) COMMENT 'Table storing order information'
WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'customer-stream',
    'aws.region' = 'us-east-1',
    'format' = 'json'
);

-- Create a table if it doesn't exist
CREATE TABLE IF NOT EXISTS orders (
    order_id BIGINT,
    customer_id BIGINT
);

-- Drop a table
DROP TABLE orders;

-- Drop a table if it exists
DROP TABLE IF EXISTS orders;

-- Show table details
DESCRIBE orders;
```
### View Operations

```sql
-- Create a view
CREATE VIEW order_summary AS
SELECT customer_id, COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM orders
GROUP BY customer_id;

-- Create a temporary view (only available in the current session)
CREATE TEMPORARY VIEW temp_view AS
SELECT * FROM orders WHERE amount > 100;

-- Drop a view
DROP VIEW order_summary;

-- Drop a view if it exists
DROP VIEW IF EXISTS order_summary;
```
### Function Operations

```sql
-- Register a function
CREATE FUNCTION multiply_func AS 'com.example.functions.MultiplyFunction';

-- Register a temporary function
CREATE TEMPORARY FUNCTION temp_function AS 'com.example.functions.TempFunction';

-- Drop a function
DROP FUNCTION multiply_func;

-- Drop a temporary function
DROP TEMPORARY FUNCTION temp_function;
```
### Listing Resources

Query available catalogs, databases, and tables:

```sql
-- List all catalogs
SHOW CATALOGS;

-- List databases in the current catalog
SHOW DATABASES;

-- List tables in the current database
SHOW TABLES;

-- List tables in a specific database
SHOW TABLES FROM sales_db;

-- List views in the current database
SHOW VIEWS;

-- List functions
SHOW FUNCTIONS;
```
## Case Sensitivity in AWS Glue

### Understanding Case Handling

AWS Glue handles case sensitivity in a specific way:

1. **Top-level column names** are automatically lowercased in Glue (e.g., `UserProfile` becomes `userprofile`)
2. **Nested struct field names** preserve their original case in Glue (e.g., inside a struct, `FirstName` stays as `FirstName`)

However, when writing queries in Flink SQL, you should use the **original column names** as defined in your `CREATE TABLE` statement, not the form in which they are stored in Glue.

### Example with Nested Fields

Consider this table definition:

```sql
CREATE TABLE nested_json_test (
    `Id` INT,
    `UserProfile` ROW<
        `FirstName` VARCHAR(255),
        `lastName` VARCHAR(255)
    >,
    `event_data` ROW<
        `EventType` VARCHAR(50),
        `eventTimestamp` TIMESTAMP(3)
    >,
    `metadata` MAP<VARCHAR(100), VARCHAR(255)>
)
```

When stored in Glue, the schema looks like:

```json
{
    "userprofile": {            // Note: lowercased
        "FirstName": "string",  // Note: original case preserved
        "lastName": "string"    // Note: original case preserved
    }
}
```
### Querying Nested Fields
284+
285+
When querying, always use the original column names as defined in your `CREATE TABLE` statement:
286+
287+
```sql
288+
-- CORRECT: Use the original column names from CREATE TABLE
289+
SELECT UserProfile.FirstName FROM nested_json_test;
290+
291+
-- INCORRECT: This doesn't match your schema definition
292+
SELECT `userprofile`.`FirstName` FROM nested_json_test;
293+
294+
-- For nested fields within nested fields, also use original case
295+
SELECT event_data.EventType, event_data.eventTimestamp FROM nested_json_test;
296+
297+
-- Accessing map fields
298+
SELECT metadata['source_system'] FROM nested_json_test;
299+
```
300+
301+
### Important Notes on Case Sensitivity

1. Always use the original column names as defined in your `CREATE TABLE` statement
2. Use backticks (`` ` ``) when column names contain special characters or spaces
3. Regardless of how Glue stores the data internally, your queries should match your schema definition
4. When creating tables, defining the schema with backticks is recommended for clarity
## Data Type Mapping

The connector handles mapping between Flink data types and AWS Glue data types automatically. The following table shows the basic type mappings:

| Flink Type | AWS Glue Type |
|------------|---------------|
| CHAR       | string        |
| VARCHAR    | string        |
| BOOLEAN    | boolean       |
| BINARY     | binary        |
| VARBINARY  | binary        |
| DECIMAL    | decimal       |
| TINYINT    | byte          |
| SMALLINT   | short         |
| INTEGER    | int           |
| BIGINT     | long          |
| FLOAT      | float         |
| DOUBLE     | double        |
| DATE       | date          |
| TIME       | string        |
| TIMESTAMP  | timestamp     |
| ROW        | struct        |
| ARRAY      | array         |
| MAP        | map           |
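To illustrate the mappings above, a hypothetical DDL like the following (table and column names are for illustration only) would have its columns stored in Glue as `long`, `string`, `timestamp`, and `struct` respectively:

```sql
CREATE TABLE type_mapping_demo (
    id BIGINT,                             -- Glue: long
    name VARCHAR(255),                     -- Glue: string
    created_at TIMESTAMP(3),               -- Glue: timestamp
    address ROW<city STRING, zip STRING>   -- Glue: struct<city:string,zip:string>
);
```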
## Limitations and Considerations

1. **Case Sensitivity**: As detailed above, always use the original column names from your schema definition when querying.
2. **AWS Service Limits**: Be aware of AWS Glue service limits that may affect your application.
3. **Authentication**: Ensure proper AWS credentials with appropriate permissions are available.
4. **Region Selection**: The Glue catalog must be registered with the AWS region where your Glue resources exist.
5. **Unsupported Operations**: The following operations are not currently supported:
   - ALTER DATABASE (modifying database properties)
   - ALTER TABLE (modifying table properties or schema)
   - RENAME TABLE
   - Partition management operations (ADD/DROP PARTITION)
## Troubleshooting

### Common Issues

1. **"Table not found"**: Verify that the table exists in the specified Glue database and catalog.
2. **Authentication errors**: Check AWS credentials and permissions.
3. **Case sensitivity errors**: Ensure you're using the original column names as defined in your schema.
4. **Type conversion errors**: Verify that data types are compatible between Flink and Glue.
## Additional Resources

- [Apache Flink Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/catalogs/)
- [AWS Glue Documentation](https://docs.aws.amazon.com/glue/)
- [Flink SQL Documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/)

0 commit comments

Comments
 (0)