Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Iceberg demo #248

Merged
merged 11 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions demos/guru_scripts/docker/tutorial/4.x/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ Now that you have a graph schema, you can load data using one of the following m

run loading job load_local_file
```

- Load from Iceberg table, or alternative Spark data sources, through [Tigergraph Spark Connector](https://docs.tigergraph.com/tigergraph-server/current/data-loading/load-from-spark-dataframe)
- Please follow the Jupyter Notebook PySpark demo: [26_load_iceberg.ipynb](./gsql/26_load_iceberg.ipynb)

[Go back to top](#top)

Expand Down
334 changes: 334 additions & 0 deletions demos/guru_scripts/docker/tutorial/4.x/gsql/26_load_iceberg.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Loading from Iceberg table through [Tigergraph Spark Connector](https://docs.tigergraph.com/tigergraph-server/current/data-loading/load-from-spark-dataframe)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data preparation for Iceberg table.\n",
"The cell below will create Iceberg tables that corresponds to the graph schema, then insert sample data to them. Omit this step if using other data sources."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Account table\n",
"spark.sql(\"\"\"\n",
" CREATE TABLE demo.financialGraph.Account (\n",
" name STRING,\n",
" isBlocked BOOLEAN\n",
" ) USING iceberg\n",
"\"\"\")\n",
"spark.sql(\"\"\"\n",
" INSERT INTO demo.financialGraph.Account (name, isBlocked) VALUES\n",
" ('Scott', FALSE),\n",
" ('Jenny', FALSE),\n",
" ('Steven', TRUE),\n",
" ('Paul', FALSE),\n",
" ('Ed', FALSE)\n",
"\"\"\")\n",
"print(\"Displaying data from Account table:\")\n",
"spark.sql(\"SELECT * FROM demo.financialGraph.Account\").show()\n",
"# +------+---------+\n",
"# | name|isBlocked|\n",
"# +------+---------+\n",
"# | Scott| false|\n",
"# | Jenny| false|\n",
"# |Steven| true|\n",
"# | Paul| false|\n",
"# | Ed| false|\n",
"# +------+---------+\n",
"\n",
"\n",
"# City table\n",
"spark.sql(\"\"\"\n",
" CREATE TABLE demo.financialGraph.City (\n",
" name STRING\n",
" ) USING iceberg\n",
"\"\"\")\n",
"spark.sql(\"\"\"\n",
" INSERT INTO demo.financialGraph.City (name) VALUES\n",
" ('New York'),\n",
" ('Gainesville'),\n",
" ('San Francisco')\n",
"\"\"\")\n",
"print(\"Displaying data from City table:\")\n",
"spark.sql(\"SELECT * FROM demo.financialGraph.City\").show()\n",
"# +-------------+\n",
"# | name|\n",
"# +-------------+\n",
"# | New York|\n",
"# | Gainesville|\n",
"# |San Francisco|\n",
"# +-------------+\n",
"\n",
"\n",
"# Phone table\n",
"spark.sql(\"\"\"\n",
" CREATE TABLE demo.financialGraph.Phone (\n",
" number STRING,\n",
" isBlocked BOOLEAN\n",
" ) USING iceberg\n",
"\"\"\")\n",
"spark.sql(\"\"\"\n",
" INSERT INTO demo.financialGraph.Phone (number, isBlocked) VALUES\n",
" ('718-245-5888', FALSE),\n",
" ('650-658-9867', TRUE),\n",
" ('352-871-8978', FALSE)\n",
"\"\"\")\n",
"print(\"Displaying data from Phone table:\")\n",
"spark.sql(\"SELECT * FROM demo.financialGraph.Phone\").show()\n",
"# +------------+---------+\n",
"# | number|isBlocked|\n",
"# +------------+---------+\n",
"# |718-245-5888| false|\n",
"# |650-658-9867| true|\n",
"# |352-871-8978| false|\n",
"# +------------+---------+\n",
"\n",
"\n",
"# Transfer table\n",
"spark.sql(\"\"\"\n",
" CREATE TABLE demo.financialGraph.transfer (\n",
" from_account STRING,\n",
" to_account STRING,\n",
" date DATE,\n",
" amount INT\n",
" ) USING iceberg\n",
"\"\"\")\n",
"spark.sql(\"\"\"\n",
" INSERT INTO demo.financialGraph.transfer (from_account, to_account, date, amount) VALUES\n",
" ('Scott', 'Ed', CAST('2024-01-04' AS DATE), 20000),\n",
" ('Scott', 'Ed', CAST('2024-02-01' AS DATE), 800),\n",
" ('Scott', 'Ed', CAST('2024-02-14' AS DATE), 500),\n",
" ('Jenny', 'Scott', CAST('2024-04-04' AS DATE), 1000),\n",
" ('Paul', 'Jenny', CAST('2024-02-01' AS DATE), 653),\n",
" ('Steven', 'Jenny', CAST('2024-05-01' AS DATE), 8560),\n",
" ('Ed', 'Paul', CAST('2024-01-04' AS DATE), 1500),\n",
" ('Paul', 'Steven', CAST('2023-05-09' AS DATE), 20000)\n",
"\"\"\")\n",
"print(\"Displaying data from Transfer table:\")\n",
"spark.sql(\"SELECT * FROM demo.financialGraph.transfer\").show()\n",
"# +------------+----------+----------+------+\n",
"# |from_account|to_account| date|amount|\n",
"# +------------+----------+----------+------+\n",
"# | Scott| Ed|2024-01-04| 20000|\n",
"# | Scott| Ed|2024-02-01| 800|\n",
"# | Scott| Ed|2024-02-14| 500|\n",
"# | Jenny| Scott|2024-04-04| 1000|\n",
"# | Paul| Jenny|2024-02-01| 653|\n",
"# | Steven| Jenny|2024-05-01| 8560|\n",
"# | Ed| Paul|2024-01-04| 1500|\n",
"# | Paul| Steven|2023-05-09| 20000|\n",
"# +------------+----------+----------+------+\n",
"\n",
"\n",
"# hasPhone table\n",
"spark.sql(\"\"\"\n",
" CREATE TABLE demo.financialGraph.hasPhone (\n",
" account STRING,\n",
" phone STRING\n",
" ) USING iceberg\n",
"\"\"\")\n",
"spark.sql(\"\"\"\n",
" INSERT INTO demo.financialGraph.hasPhone (account, phone) VALUES\n",
" ('Scott', '718-245-5888'),\n",
" ('Jenny', '718-245-5888'),\n",
" ('Jenny', '650-658-9867'),\n",
" ('Paul', '650-658-9867'),\n",
" ('Ed', '352-871-8978')\n",
"\"\"\")\n",
"print(\"Displaying data from hasPhone table:\")\n",
"spark.sql(\"SELECT * FROM demo.financialGraph.hasPhone\").show()\n",
"# +-------+------------+\n",
"# |account| phone|\n",
"# +-------+------------+\n",
"# | Scott|718-245-5888|\n",
"# | Jenny|718-245-5888|\n",
"# | Jenny|650-658-9867|\n",
"# | Paul|650-658-9867|\n",
"# | Ed|352-871-8978|\n",
"# +-------+------------+\n",
"\n",
"\n",
"# isLocatedIn table\n",
"spark.sql(\"\"\"\n",
" CREATE TABLE demo.financialGraph.isLocatedIn (\n",
" account STRING,\n",
" city STRING\n",
" ) USING iceberg\n",
"\"\"\")\n",
"spark.sql(\"\"\"\n",
" INSERT INTO demo.financialGraph.isLocatedIn (account, city) VALUES\n",
" ('Scott', 'New York'),\n",
" ('Jenny', 'San Francisco'),\n",
" ('Steven', 'San Francisco'),\n",
" ('Paul', 'Gainesville'),\n",
" ('Ed', 'Gainesville')\n",
"\"\"\")\n",
"print(\"Displaying data from isLocatedIn table:\")\n",
"spark.sql(\"SELECT * FROM demo.financialGraph.isLocatedIn\").show()\n",
"# +-------+-------------+\n",
"# |account| city|\n",
"# +-------+-------------+\n",
"# | Scott| New York|\n",
"# | Jenny|San Francisco|\n",
"# | Steven|San Francisco|\n",
"# | Paul| Gainesville|\n",
"# | Ed| Gainesville|\n",
"# +-------+-------------+"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define the job to load data from an Iceberg table into the target vertex or edge.\n",
"Copy the content below to your container and save as file `load3.gsql`.\n",
"```gsql\n",
"USE GRAPH financialGraph\n",
"\n",
"DROP JOB load_iceberg\n",
"\n",
"//load from iceberg tables\n",
"CREATE LOADING JOB load_iceberg {\n",
" DEFINE FILENAME account;\n",
" DEFINE FILENAME phone;\n",
" DEFINE FILENAME city;\n",
" DEFINE FILENAME hasPhone;\n",
" DEFINE FILENAME locatedIn;\n",
" DEFINE FILENAME transferdata;\n",
" //define the mapping from the source table to the target graph element type. The mapping is specified by VALUES clause. \n",
" LOAD account TO VERTEX Account VALUES ($0, $1);\n",
" LOAD phone TO VERTEX Phone VALUES ($0, $1);\n",
" LOAD city TO VERTEX City VALUES ($0);\n",
" LOAD hasPhone TO Edge hasPhone VALUES ($0, $1);\n",
" LOAD locatedIn TO Edge isLocatedIn VALUES ($0, $1);\n",
" LOAD transferdata TO Edge transfer VALUES ($0, $1, $2, $3);\n",
"}\n",
"```\n",
"Next, run the following in your container's bash command line.\n",
"```bash\n",
"gsql load3.gsql\n",
"```\n",
"Or copy the content and paste in GSQL shell editor of TigerGraph Savanna to run.\n",
"\n",
"---\n",
"\n",
"* The `FILENAME` variables are defined but unassigned, they will be referenced in the following Spark write step. \n",
"* The `LOAD` statement maps the data source to the target schema elements by the **column index**, for example:\n",
"\n",
" For `VERTEX Account ( name STRING PRIMARY KEY, isBlocked BOOL)` and Iceberg table:\n",
" ```\n",
" +------+---------+\n",
" | name|isBlocked|\n",
" +------+---------+\n",
" | Scott| false|\n",
" | Jenny| false|\n",
" +------+---------+\n",
" ```\n",
" The first column(`$0`) is `name` and second column(`$1`) is `isBlock`, so that we can define the LOAD statement as `LOAD account TO VERTEX Account VALUES ($0, $1)`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data loading through TigerGraph Spark connector\n",
"The TigerGraph Spark Connector employs Apache Spark to read data from a Spark DataFrame (from Iceberg table, or alternative Spark data sources) and write to TigerGraph."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prerequisite\n",
"Add the JAR of TigerGraph Spark Connector to Spark's `jars` folder. You can download the JAR from [Maven Central](https://central.sonatype.com/artifact/com.tigergraph/tigergraph-spark-connector/versions)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define connection options\n",
"* Fill in the actual version in \"version\", e.g., \"4.1.0\".\n",
"* Make sure the given \"url\" is accessible by your Spark cluster.\n",
"* For TigerGraph Savanna users: replace the \"url\" with \"https://<cloud_domain_name>:443\".\n",
"* Choose one of the following authentication methods:\n",
" - \"username\" and \"password\"\n",
" - \"secret\"\n",
" - \"token\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"connection_opts = {\n",
" \"url\": \"http://localhost:14240\",\n",
" \"version\": \"<tg_version>\",\n",
" \"graph\": \"financialGraph\",\n",
" \"username\": \"tigergraph\",\n",
" \"password\": \"tigergraph\"\n",
" # alternative: \"secret\": \"<secret>\"\n",
" # alternative: \"token\": \"<JWT>\"\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read Iceberg tables as Spark DataFrame and write to TigerGraph"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the list of tables and their corresponding loading job filenames for TigerGraph\n",
"tables = [\n",
" (\"demo.financialGraph.Account\", \"account\"),\n",
" (\"demo.financialGraph.City\", \"city\"),\n",
" (\"demo.financialGraph.Phone\", \"phone\"),\n",
" (\"demo.financialGraph.transfer\", \"transfer\"),\n",
" (\"demo.financialGraph.hasPhone\", \"hasphone\"),\n",
" (\"demo.financialGraph.isLocatedIn\", \"islocatedin\")\n",
"]\n",
"\n",
"# Loop through each table, read it as a DataFrame, and write to TigerGraph\n",
"for table_name, filename in tables:\n",
" df = spark.sql(f\"SELECT * FROM {table_name}\")\n",
" df.write \\\n",
" .format(\"tigergraph\") \\\n",
" .mode(\"append\") \\\n",
" .options(**connection_opts) \\\n",
" .option(\"loading.job\", \"load_iceberg\") \\\n",
" .option(\"loading.filename\", filename) \\\n",
" .option(\"loading.separator\", \"|\") \\\n",
" .save()\n",
" print(f\"Data from {table_name} table has been written to TigerGraph using filename {filename}.\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Loading