Skip to content

Commit 162ebdc

Browse files
Merge pull request #31 from RumbleDB/omar-changes
Support custom multi-catalogs for Iceberg
2 parents c29ad2c + d1efd35 commit 162ebdc

File tree

7 files changed

+286
-7
lines changed

7 files changed

+286
-7
lines changed

.github/workflows/python-app.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,7 @@ jobs:
3838
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
3939
- name: Test with pytest
4040
run: |
41-
pytest
41+
# run tests independently to enable tests using spark session
42+
for f in tests/test_*.py; do
43+
pytest -q "$f" || exit 1
44+
done

.gitignore

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ coverage.xml
5151
.pytest_cache/
5252
cover/
5353

54+
# Iceberg & Delta runtime
55+
outputjson/
56+
outputparquet/
57+
outputtext/
58+
iceberg-warehouse/
59+
spark-warehouse/
60+
metastore_db/
61+
5462
# Translations
5563
*.mo
5664
*.pot

src/jsoniq/jars/rumbledb-2.0.0.jar

76 MB
Binary file not shown.

src/jsoniq/jars/rumbledb-2.0.8.jar

76 MB
Binary file not shown.

src/jsoniq/session.py

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ def __init__(self):
7474
sys.stderr.write("[Error] Could not determine Java version. Please ensure Java is installed and JAVA_HOME is properly set.\n")
7575
sys.exit(43)
7676
self._sparkbuilder = SparkSession.builder.config("spark.jars", jar_path_str)
77+
self._appendable_keys = {
78+
"spark.jars.packages",
79+
"spark.sql.extensions",
80+
}
7781

7882
def getOrCreate(self):
7983
if RumbleSession._rumbleSession is None:
@@ -122,16 +126,60 @@ def config(self, key=None, value=None, conf=None, *, map=None):
122126
self._sparkbuilder = self._sparkbuilder.config(key=key, value=value, conf=conf, map=map)
123127
return self;
124128

129+
def _append_config(self, key, value):
    """Append *value* to a comma-separated Spark config entry.

    Only keys registered in ``self._appendable_keys`` may be appended to;
    any other key raises ``ValueError``. When the underlying builder already
    holds a value for *key*, the new value is appended after a comma instead
    of overwriting it.
    """
    if key not in self._appendable_keys:
        raise ValueError(f"{key} is not an appendable Spark config key.")
    # NOTE(review): reads the builder's private _options dict — assumed stable
    # across the PySpark versions this project supports; confirm on upgrades.
    existing = self._sparkbuilder._options.get(key)
    merged = f"{existing},{value}" if existing else value
    self._sparkbuilder = self._sparkbuilder.config(key=key, value=merged)
    return self
137+
125138
def withDelta(self):
    """Enable Delta Lake on the session catalog (spark_catalog).

    Adds the Delta package and SQL extension via the appendable-config
    mechanism so they compose with other storage integrations, then points
    spark_catalog at the Delta catalog implementation. Returns self for
    chaining.
    """
    self._append_config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.0")
    self._append_config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    self._sparkbuilder = self._sparkbuilder.config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    return self
144+
145+
def withIceberg(self, catalog_names=None):
    """
    Configure Iceberg catalog(s).

    - If no catalogs are provided (None or empty), the session catalog
      (spark_catalog) is configured for Iceberg.
    - If catalogs are provided, table names must be fully qualified with the
      catalog (<catalog>.<namespace>.<table>). No implicit default is applied.
    - A single catalog name may also be passed as a plain string; it is
      treated as a one-element list.
    - Each configured catalog uses its own warehouse directory under
      ./iceberg-warehouse/<catalog>.
    - These are the default settings for the Iceberg catalog, which can be
      overridden if needed.

    :param catalog_names: None, a string, or a list/tuple/set of catalog names.
    :raises ValueError: if catalog_names is of the wrong type, or any element
        is not a non-empty string.
    :returns: self, for chaining.
    """
    self._append_config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0")
    self._append_config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    if catalog_names is None:
        catalog_names = []
    elif isinstance(catalog_names, str):
        # Convenience generalization: withIceberg("my_catalog") is shorthand
        # for withIceberg(["my_catalog"]).
        catalog_names = [catalog_names]
    if not isinstance(catalog_names, (list, tuple, set)):
        raise ValueError("catalog_names must be a list, tuple, or set of strings.")
    catalog_names = list(catalog_names)
    if len(catalog_names) == 0:
        # No explicit catalogs: make the built-in session catalog Iceberg-aware.
        catalog_names = ["spark_catalog"]
        catalog_class = "org.apache.iceberg.spark.SparkSessionCatalog"
    else:
        catalog_class = "org.apache.iceberg.spark.SparkCatalog"
    for catalog_name in catalog_names:
        if not isinstance(catalog_name, str) or not catalog_name:
            raise ValueError("catalog_names must contain non-empty strings.")
        # One warehouse directory per catalog keeps their data isolated.
        warehouse = f"./iceberg-warehouse/{catalog_name}"
        self._sparkbuilder = self._sparkbuilder \
            .config(f"spark.sql.catalog.{catalog_name}", catalog_class) \
            .config(f"spark.sql.catalog.{catalog_name}.type", "hadoop") \
            .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse)
    self._sparkbuilder = self._sparkbuilder \
        .config("spark.sql.iceberg.check-ordering", "false")
    return self
180+
181+
def withMongo(self):
    """Add the MongoDB Spark connector package to the session being built."""
    connector = "org.mongodb.spark:mongo-spark-connector_2.13:10.5.0"
    self._append_config("spark.jars.packages", connector)
    return self
136184

137185
def __getattr__(self, name):
@@ -248,4 +296,4 @@ def jsoniq(self, str, **kwargs):
248296
return seq;
249297

250298
def __getattr__(self, item):
251-
return getattr(self._sparksession, item)
299+
return getattr(self._sparksession, item)

tests/test_catalogs_update.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
from jsoniq import RumbleSession
2+
from unittest import TestCase
3+
import uuid
4+
5+
6+
class TestCatalogsUpdate(TestCase):
7+
"""
8+
Default Spark session catalog for Delta + custom Iceberg catalogs.
9+
- Delta uses spark_catalog.
10+
- Iceberg uses named catalogs (e.g., iceberg, ice_b, ice_one).
11+
"""
12+
@classmethod
13+
def setUpClass(cls):
14+
RumbleSession._rumbleSession = None
15+
RumbleSession._builder = RumbleSession.Builder()
16+
cls.rumble = (
17+
RumbleSession.builder
18+
.withDelta()
19+
.withIceberg(["iceberg", "ice_b", "ice_one"])
20+
.getOrCreate()
21+
)
22+
23+
def _create_insert_count(self, rumble, create_query, insert_query, count_query):
24+
rumble.jsoniq(create_query).applyPUL()
25+
rumble.jsoniq(insert_query).applyPUL()
26+
count_value = rumble.jsoniq(count_query).json()
27+
self.assertEqual(count_value, (2,))
28+
29+
def _assert_query_fails(self, rumble, query):
30+
with self.assertRaises(Exception):
31+
rumble.jsoniq(query).json()
32+
33+
@staticmethod
34+
def _cleanup_warehouses():
35+
import os
36+
import shutil
37+
38+
for dirname in ("spark-warehouse", "iceberg-warehouse"):
39+
path = os.path.join(os.getcwd(), dirname)
40+
shutil.rmtree(path, ignore_errors=True)
41+
42+
@classmethod
43+
def tearDownClass(cls):
44+
try:
45+
if cls.rumble is not None:
46+
cls.rumble._sparksession.stop()
47+
finally:
48+
RumbleSession._rumbleSession = None
49+
cls._cleanup_warehouses()
50+
51+
def test_default_catalogs(self):
52+
"""
53+
Delta uses spark_catalog and Iceberg uses the named catalog "iceberg".
54+
Also verifies that cross-catalog reads are rejected.
55+
"""
56+
suffix = uuid.uuid4().hex
57+
delta_table = f"default.delta_default_{suffix}"
58+
iceberg_table = f"iceberg.default.iceberg_default_{suffix}"
59+
60+
self._create_insert_count(
61+
self.rumble,
62+
f'create collection table("{delta_table}") with {{"k": 1}}',
63+
f'insert {{"k": 2}} last into collection table("{delta_table}")',
64+
f'count(table("{delta_table}"))'
65+
)
66+
self._create_insert_count(
67+
self.rumble,
68+
f'create collection iceberg-table("{iceberg_table}") with {{"k": 1}}',
69+
f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_table}")',
70+
f'count(iceberg-table("{iceberg_table}"))'
71+
)
72+
self._assert_query_fails(
73+
self.rumble,
74+
f'iceberg-table("ice_b.{iceberg_table.split(".", 1)[1]}")'
75+
)
76+
77+
def test_single_custom_catalogs(self):
78+
"""
79+
Iceberg on a single custom catalog (ice_one).
80+
Ensures unqualified access does not resolve to this catalog.
81+
"""
82+
suffix = uuid.uuid4().hex
83+
iceberg_table = f"ice_one.default.ice_single_{suffix}"
84+
85+
self._create_insert_count(
86+
self.rumble,
87+
f'create collection iceberg-table("{iceberg_table}") with {{"k": 1}}',
88+
f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_table}")',
89+
f'count(iceberg-table("{iceberg_table}"))'
90+
)
91+
self._assert_query_fails(
92+
self.rumble,
93+
f'iceberg-table("{iceberg_table.split(".", 1)[1]}")'
94+
)
95+
96+
def test_multiple_catalogs(self):
97+
"""
98+
Iceberg on multiple catalogs (iceberg + ice_b).
99+
Verifies isolation by asserting cross-catalog reads fail.
100+
"""
101+
suffix = uuid.uuid4().hex
102+
iceberg_default_table = f"iceberg.default.iceberg_multi_default_{suffix}"
103+
iceberg_custom_table = f"ice_b.default.iceberg_multi_{suffix}"
104+
105+
self._create_insert_count(
106+
self.rumble,
107+
f'create collection iceberg-table("{iceberg_default_table}") with {{"k": 1}}',
108+
f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_default_table}")',
109+
f'count(iceberg-table("{iceberg_default_table}"))'
110+
)
111+
self._create_insert_count(
112+
self.rumble,
113+
f'create collection iceberg-table("{iceberg_custom_table}") with {{"k": 1}}',
114+
f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_custom_table}")',
115+
f'count(iceberg-table("{iceberg_custom_table}"))'
116+
)
117+
self._assert_query_fails(
118+
self.rumble,
119+
f'iceberg-table("ice_b.{iceberg_default_table.split(".", 1)[1]}")'
120+
)
121+
self._assert_query_fails(
122+
self.rumble,
123+
f'iceberg-table("{iceberg_custom_table.split(".", 1)[1]}")'
124+
)
125+
126+
def test_resolution_order(self):
127+
"""
128+
Matches Iceberg's catalog/namespace resolution order for spark.table().
129+
Ensures unqualified access fails when spark_catalog is not Iceberg.
130+
"""
131+
suffix = uuid.uuid4().hex
132+
table_name = f"iceberg.default.iceberg_res_{suffix}"
133+
short_name = f"iceberg_res_{suffix}"
134+
multi_ns_table = f"iceberg.ns1.ns2.iceberg_res_{suffix}_ns"
135+
136+
self._create_insert_count(
137+
self.rumble,
138+
f'create collection iceberg-table("{table_name}") with {{"k": 1}}',
139+
f'insert {{"k": 2}} last into collection iceberg-table("{table_name}")',
140+
f'count(iceberg-table("{table_name}"))'
141+
)
142+
143+
# catalog.table -> catalog.currentNamespace.table
144+
self._create_insert_count(
145+
self.rumble,
146+
f'create collection iceberg-table("iceberg.{short_name}_2") with {{"k": 1}}',
147+
f'insert {{"k": 2}} last into collection iceberg-table("iceberg.{short_name}_2")',
148+
f'count(iceberg-table("iceberg.{short_name}_2"))'
149+
)
150+
151+
# catalog.namespace1.namespace2.table -> catalog.namespace1.namespace2.table
152+
self._create_insert_count(
153+
self.rumble,
154+
f'create collection iceberg-table("{multi_ns_table}") with {{"k": 1}}',
155+
f'insert {{"k": 2}} last into collection iceberg-table("{multi_ns_table}")',
156+
f'count(iceberg-table("{multi_ns_table}"))'
157+
)
158+
159+
# namespace.table (current catalog) should fail because spark_catalog is not Iceberg here.
160+
self._assert_query_fails(
161+
self.rumble,
162+
f'iceberg-table("default.{short_name}")'
163+
)
164+
# table (current catalog + namespace) should also fail for the same reason.
165+
self._assert_query_fails(
166+
self.rumble,
167+
f'iceberg-table("{short_name}")'
168+
)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from jsoniq import RumbleSession
2+
from unittest import TestCase
3+
import uuid
4+
5+
6+
class TestIcebergDefaultCatalog(TestCase):
7+
"""
8+
Iceberg uses the session catalog (spark_catalog).
9+
- Delta custom catalogs are not tested here (to be added later).
10+
"""
11+
12+
@classmethod
13+
def setUpClass(cls):
14+
RumbleSession._rumbleSession = None
15+
RumbleSession._builder = RumbleSession.Builder()
16+
cls.rumble = RumbleSession.builder.withIceberg().getOrCreate()
17+
18+
@classmethod
19+
def tearDownClass(cls):
20+
try:
21+
cls.rumble._sparksession.stop()
22+
finally:
23+
RumbleSession._rumbleSession = None
24+
cls._cleanup_warehouses()
25+
26+
@staticmethod
27+
def _cleanup_warehouses():
28+
import os
29+
import shutil
30+
31+
for dirname in ("spark-warehouse", "iceberg-warehouse"):
32+
path = os.path.join(os.getcwd(), dirname)
33+
shutil.rmtree(path, ignore_errors=True)
34+
35+
def test_default_catalog(self):
36+
"""
37+
Iceberg using spark_catalog with a default namespace.
38+
This test runs in its own session to avoid Delta/spark_catalog conflicts.
39+
"""
40+
suffix = uuid.uuid4().hex
41+
iceberg_table = f"default.iceberg_default_session_{suffix}"
42+
43+
self.rumble.jsoniq(
44+
f'create collection iceberg-table("{iceberg_table}") with {{"k": 1}}'
45+
).applyPUL()
46+
self.rumble.jsoniq(
47+
f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_table}")'
48+
).applyPUL()
49+
count_value = self.rumble.jsoniq(
50+
f'count(iceberg-table("{iceberg_table}"))'
51+
).json()
52+
self.assertEqual(count_value, (2,))

0 commit comments

Comments
 (0)