Skip to content

Commit 82da680

Browse files
author
Nick Müller
committed
Added AzureBlobFileSystem support for StructuredDatasets
Signed-off-by: Nick Müller <[email protected]>
1 parent c6e7237 commit 82da680

File tree

5 files changed

+14
-4
lines changed

5 files changed

+14
-4
lines changed

flytekit/types/structured/basic_dfs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
 from flytekit.models.literals import StructuredDatasetMetadata
 from flytekit.models.types import StructuredDatasetType
 from flytekit.types.structured.structured_dataset import (
+    ABFS,
     GCS,
     LOCAL,
     PARQUET,
@@ -106,7 +107,7 @@ def decode(
 
 
 # Don't override default protocol
-for protocol in [LOCAL, S3, GCS]:
+for protocol in [LOCAL, S3, GCS, ABFS]:
     StructuredDatasetTransformerEngine.register(PandasToParquetEncodingHandler(protocol), default_for_type=False)
     StructuredDatasetTransformerEngine.register(ParquetToPandasDecodingHandler(protocol), default_for_type=False)
     StructuredDatasetTransformerEngine.register(ArrowToParquetEncodingHandler(protocol), default_for_type=False)

flytekit/types/structured/structured_dataset.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
 # Protocols
 BIGQUERY = "bq"
 S3 = "s3"
+ABFS = "abfs"
 GCS = "gs"
 LOCAL = "/"
 

plugins/flytekit-data-fsspec/flytekitplugins/fsspec/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
 
 from flytekit import StructuredDatasetTransformerEngine, logger
 from flytekit.configuration import internal
-from flytekit.types.structured.structured_dataset import GCS, S3
+from flytekit.types.structured.structured_dataset import ABFS, GCS, S3
 
 from .arrow import ArrowToParquetEncodingHandler, ParquetToArrowDecodingHandler
 from .pandas import PandasToParquetEncodingHandler, ParquetToPandasDecodingHandler
@@ -41,6 +41,9 @@ def _register(protocol: str):
     StructuredDatasetTransformerEngine.register(ParquetToArrowDecodingHandler(protocol), True, True)
 
 
+if importlib.util.find_spec("adlfs"):
+    _register(ABFS)
+
 if importlib.util.find_spec("s3fs"):
     _register(S3)
 

plugins/flytekit-polars/flytekitplugins/polars/sd_transformers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
 from flytekit.models.literals import StructuredDatasetMetadata
 from flytekit.models.types import StructuredDatasetType
 from flytekit.types.structured.structured_dataset import (
+    ABFS,
     GCS,
     LOCAL,
     PARQUET,
@@ -62,7 +63,7 @@ def decode(
         return pl.read_parquet(path)
 
 
-for protocol in [LOCAL, S3, GCS]:
+for protocol in [LOCAL, S3, GCS, ABFS]:
     StructuredDatasetTransformerEngine.register(
         PolarsDataFrameToParquetEncodingHandler(protocol), default_for_type=False
     )

plugins/flytekit-spark/flytekitplugins/spark/sd_transformers.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
 from flytekit.models.literals import StructuredDatasetMetadata
 from flytekit.models.types import StructuredDatasetType
 from flytekit.types.structured.structured_dataset import (
+    ABFS,
+    GCS,
+    LOCAL,
     PARQUET,
+    S3,
     StructuredDataset,
     StructuredDatasetDecoder,
     StructuredDatasetEncoder,
@@ -48,6 +52,6 @@ def decode(
         return user_ctx.spark_session.read.parquet(flyte_value.uri)
 
 
-for protocol in ["/", "s3"]:
+for protocol in [LOCAL, S3, GCS, ABFS]:
     StructuredDatasetTransformerEngine.register(SparkToParquetEncodingHandler(protocol), default_for_type=False)
     StructuredDatasetTransformerEngine.register(ParquetToSparkDecodingHandler(protocol), default_for_type=False)

0 commit comments

Comments
 (0)