
Commit bdb58e1

Adds Databricks backend (#325)
* Initial DB framework, ls works (no details)
* Switches to using folder_url instead of path
* pin_exists() works now
* pin_versions() works
* pin_open() works
* Makes _list_folders into _list_items
* Adds mkdir & put, gets pin_write() working
* Formatting improvements
* Removes conditional to use a custom board object
* Improvements to file reading, and adds initial rm
* pin_delete() works, new 'exists' approach
* Adds some test pieces
* Partially runs tests
* Adds support for `detail` in ls
* Figures out how to properly cache the fs
* Fixes teardown for tests
* extends `rm` one more level (fix later)
* No errors in tests, moving on to addressing failures
* Adds recursive file/folder mapper
* fully recursive put (clean later)
* Improvements to _map_put
* Starts moving to discrete functions
* Finishes moving everything to discrete functions
* Fixes typo
* Creates custom cache class, fixes issue with reading wrong pin
* Removes _open from PinsDBCache after confirming that it's not needed
* Removes data.csv file
* Adds get() and _databricks_get()
* Removes test data
* Fixes issue with not handling board versioning for test boards
* Passes constructor test
* Removes PinsDBCache and manual registration of dbc
* Emulates the structure of the other test boards
* Adds notes to the function, cleans up arguments
* Restores BoardRsConnect to helpers
* Adds full BoardRsConnect call
* Gets fs via calling the function for the tests
* Removes Databricks from CI tests
* Adds databricks-sdk to requirements dev file
* Updates rest of dev reqs versions
* Attempts to avoid double forward slash in Linux
* Checks if path_to_version is a str before removing trailing slash
* Fixes typo on isinstance call
* Removes protocol assignment for DatabricksFs
* Passes pre-commit
* Adds databricks-sdk to minimum reqs
* Addresses additional issue from pre-commit and attempts to solve pyright error
* Update pins/databricks/fs.py (Co-authored-by: Isabel Zimmerman <[email protected]>)
* Update requirements/minimum.txt (Co-authored-by: Isabel Zimmerman <[email protected]>)
* Update pins/databricks/fs.py (Co-authored-by: Isabel Zimmerman <[email protected]>)
* Update pins/databricks/fs.py (Co-authored-by: Isabel Zimmerman <[email protected]>)
* Switches from os to pathlib
* Converts functions to staticmethods
* add docs for board_databricks
* load in quartodoc
* update all tests
* clean up class, make true optional import
* run dbc tests
* add databricks into pyright deps
* load databricks creds
* error earlier
* pass for windows cache
* resolve dependencies
* resolve dependencies again
* cannot clean up constructors, skip

---------

Co-authored-by: Isabel Zimmerman <[email protected]>
Co-authored-by: isabel zimmerman <[email protected]>
1 parent ef9f358 · commit bdb58e1

File tree

16 files changed: +603 −135 lines

.env.dev

Lines changed: 5 additions & 0 deletions
@@ -30,3 +30,8 @@ RSC_LICENSE=
 # (Note that the local file backend always uses a temporary directory.)
 #
 # PINS_TEST_S3__PATH="ci-pins"
+
+# Databricks backend ----
+DATABRICKS_HOST=
+DATABRICKS_TOKEN=
+DATABRICKS_VOLUME=
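The databricks-sdk resolves credentials from the environment, so filling in these variables is all the configuration the new backend needs locally. A minimal sketch, assuming python-dotenv is installed and the values in .env.dev are filled in:

import os

from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv

# Load the variables added above; WorkspaceClient() with no arguments falls back
# to DATABRICKS_HOST / DATABRICKS_TOKEN from the environment.
load_dotenv(".env.dev")
client = WorkspaceClient()
print(client.config.host)

# DATABRICKS_VOLUME is presumably consumed by the test helpers (not shown in this diff).
print(os.environ.get("DATABRICKS_VOLUME"))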

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
@@ -71,6 +71,8 @@ jobs:
   AWS_REGION: "us-east-1"
   AZURE_STORAGE_ACCOUNT_NAME: ${{ secrets.AZURE_STORAGE_ACCOUNT_NAME }}
   AZURE_STORAGE_ACCOUNT_KEY: ${{ secrets.AZURE_STORAGE_ACCOUNT_KEY }}
+  DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
+  DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
   PYTEST_OPTS: ${{ matrix.pytest_opts }}
   REQUIREMENTS: ${{ matrix.requirements }}
   ACTION_OS: ${{ matrix.os }}

docs/_quarto.yml

Lines changed: 3 additions & 0 deletions
@@ -49,6 +49,8 @@ website:
         href: reference/board_gcs.qmd
       - text: "`board_azure`"
         href: reference/board_azure.qmd
+      - text: "`board_databricks`"
+        href: reference/board_databricks.qmd
       - text: "`board_connect`"
         href: reference/board_connect.qmd
       - text: "`board_url`"
@@ -99,6 +101,7 @@ quartodoc:
     - board_s3
     - board_gcs
     - board_azure
+    - board_databricks
     - board_connect
     - board_url
     - board

pins/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@
     board_azure,
     board_s3,
     board_gcs,
+    board_databricks,
     board,
 )
 from .boards import board_deparse
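A quick check that the re-export works as intended, assuming the surrounding import in __init__.py pulls from pins.constructors like the neighboring names:

# The new constructor is now importable from the package top level.
from pins import board_databricks

print(board_databricks.__module__)  # expected: "pins.constructors"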

pins/boards.py

Lines changed: 2 additions & 0 deletions
@@ -870,6 +870,8 @@ def board_deparse(board: BaseBoard):
         return f"board_gcs({repr(board.board)}{allow_pickle})"
     elif prot == "http":
         return f"board_url({repr(board.board)}, {board.pin_paths}{allow_pickle})"
+    elif prot == "dbc":
+        return f"board_databricks({repr(board.board)}{allow_pickle})"
     else:
         raise NotImplementedError(
             f"board deparsing currently not supported for protocol: {prot}"

pins/constructors.py

Lines changed: 64 additions & 0 deletions
@@ -10,6 +10,7 @@
 from .boards import BaseBoard, BoardManual, BoardRsConnect, board_deparse
 from .cache import PinsAccessTimeCache, PinsCache, PinsRscCacheMapper, prefix_cache
 from .config import get_cache_dir, get_data_dir
+from .errors import PinsError

 # Kept here for backward-compatibility reasons
 # Note that this is not a constructor, but a function to represent them.
@@ -87,6 +88,11 @@ def board(

         fs = RsConnectFs(**storage_options)

+    elif protocol == "dbc":
+        from pins.databricks.fs import DatabricksFs
+
+        fs = DatabricksFs(**storage_options)
+
     else:
         fs = fsspec.filesystem(protocol, **storage_options)

@@ -569,3 +575,61 @@ def board_azure(path, versioned=True, cache=DEFAULT, allow_pickle_read=None):

     opts = {"use_listings_cache": False}
     return board("abfs", path, versioned, cache, allow_pickle_read, storage_options=opts)
+
+
+def board_databricks(path, versioned=True, cache=DEFAULT, allow_pickle_read=None):
+    """Create a board to read and write pins from a Databricks Volume folder.
+
+    Parameters
+    ----------
+    path:
+        The path to the target folder inside Unity Catalog. The path must include the
+        catalog, schema, and volume names, preceded by 'Volumes/', for example:
+        "/Volumes/my-catalog/my-schema/my-volume".
+    versioned:
+        Whether or not pins should be versioned.
+    cache:
+        Whether to use a cache. By default, pins attempts to select the right cache
+        directory, given your filesystem. If `None` is passed, then no cache will be
+        used. You can set the cache using the `PINS_CACHE_DIR` environment variable.
+    allow_pickle_read: optional, bool
+        Whether to allow reading pins that use the pickle protocol. Pickles are unsafe,
+        and can execute arbitrary code. Only allow reading pickles if you trust the
+        board to execute Python code on your computer.
+
+        You can enable reading pickles by setting this to `True`, or by setting the
+        environment variable `PINS_ALLOW_PICKLE_READ`. If both are set, this argument
+        takes precedence.
+
+    Notes
+    -----
+    The Databricks board uses the `databricks-sdk` library to authenticate and interact
+    with the Databricks Volume.
+
+    See <https://docs.databricks.com/aws/en/dev-tools/sdk-python>
+
+    Examples
+    --------
+
+    >>> import pytest; pytest.skip()
+
+    >>> import pins
+    >>> from dotenv import load_dotenv
+    >>> load_dotenv()  # eg, for a .env file with DATABRICKS_HOST and DATABRICKS_TOKEN set
+    >>> board = pins.board_databricks("/Volumes/examples/my-board/test-volume")
+    >>> board.pin_list()
+    ['df_csv']
+
+    >>> board.pin_read("df_csv")
+       x  y  z
+    0  1  a  3
+    1  2  b  4
+    """
+    try:
+        import databricks.sdk  # noqa: F401
+    except ModuleNotFoundError:
+        raise PinsError(
+            "Install the `databricks-sdk` package for Databricks board support."
+        )
+    return board("dbc", path, versioned, cache, allow_pickle_read)

pins/databricks/__init__.py

Whitespace-only changes.

pins/databricks/fs.py

Lines changed: 196 additions & 0 deletions
@@ -0,0 +1,196 @@
import shutil
from io import BytesIO
from pathlib import Path, PurePath

from fsspec import AbstractFileSystem

from pins.errors import PinsError


class DatabricksFs(AbstractFileSystem):
    protocol = "dbc"

    def ls(self, path, detail=False, **kwargs):
        return self._databricks_ls(path, detail)

    def exists(self, path: str, **kwargs):
        return self._databricks_exists(path)

    def open(self, path: str, mode: str = "rb", *args, **kwargs):
        if mode != "rb":
            raise NotImplementedError
        return self._databricks_open(path)

    def get(self, rpath, lpath, recursive=False, **kwargs):
        self._databricks_get(rpath, lpath, recursive, **kwargs)

    def mkdir(self, path, create_parents=True, **kwargs):
        if not create_parents:
            raise NotImplementedError
        self._databricks_mkdir(path)

    def put(
        self,
        lpath,
        rpath,
        recursive=True,
        maxdepth=None,
        **kwargs,
    ):
        if not recursive:
            raise NotImplementedError
        if maxdepth is not None:
            raise NotImplementedError
        self._databricks_put(lpath, rpath)

    def rm(self, path, recursive=True, maxdepth=None) -> None:
        if not recursive:
            raise NotImplementedError
        if maxdepth is not None:
            raise NotImplementedError
        if self._databricks_exists(path):
            self._databricks_rm_dir(path)

    @staticmethod
    def _databricks_put(lpath, rpath):
        from databricks.sdk import WorkspaceClient

        w = WorkspaceClient()
        path = Path(lpath).absolute()
        orig_path = path

        def _upload_files(path):
            contents = Path(path)
            for item in contents.iterdir():
                abs_path = PurePath(path).joinpath(item)
                is_file = Path(abs_path).is_file()
                if is_file:
                    rel_path = abs_path.relative_to(orig_path)
                    db_path = PurePath(rpath).joinpath(rel_path)
                    file = open(abs_path, "rb")
                    w.files.upload(str(db_path), BytesIO(file.read()), overwrite=True)
                else:
                    _upload_files(abs_path)

        _upload_files(path)

    def _databricks_get(self, board, rpath, lpath, recursive=False, **kwargs):
        from databricks.sdk import WorkspaceClient

        w = WorkspaceClient()
        file_type = self._databricks_is_type(rpath)
        if file_type == "file":
            board.fs.get(rpath, lpath, **kwargs)
            return

        def _get_files(path, recursive, **kwargs):
            raw_contents = w.files.list_directory_contents(path)
            contents = list(raw_contents)
            details = list(map(self._databricks_content_details, contents))
            for item in details:
                item_path = item.get("path")
                if item.get("is_directory"):
                    if recursive:
                        _get_files(item_path, recursive=recursive, **kwargs)
                else:
                    rel_path = PurePath(item_path).relative_to(rpath)
                    target_path = PurePath(lpath).joinpath(rel_path)
                    board.fs.get(item_path, str(target_path))

        _get_files(rpath, recursive, **kwargs)

    def _databricks_open(self, path):
        from databricks.sdk import WorkspaceClient

        if not self._databricks_exists(path):
            raise PinsError(f"File or directory does not exist at path: {path}")
        w = WorkspaceClient()
        resp = w.files.download(path)
        f = BytesIO()
        shutil.copyfileobj(resp.contents, f)
        f.seek(0)
        return f

    def _databricks_exists(self, path: str):
        if self._databricks_is_type(path) == "nothing":
            return False
        else:
            return True

    @staticmethod
    def _databricks_is_type(path: str):
        from databricks.sdk import WorkspaceClient
        from databricks.sdk.errors import NotFound

        w = WorkspaceClient()
        try:
            w.files.get_metadata(path)
        except NotFound:
            try:
                w.files.get_directory_metadata(path)
            except NotFound:
                return "nothing"
            else:
                return "directory"
        else:
            return "file"

    def _databricks_ls(self, path, detail):
        from databricks.sdk import WorkspaceClient

        if not self._databricks_exists(path):
            raise PinsError(f"File or directory does not exist at path: {path}")
        w = WorkspaceClient()
        if self._databricks_is_type(path) == "file":
            if detail:
                return [dict(name=path, size=None, type="file")]
            else:
                return path

        contents_raw = w.files.list_directory_contents(path)
        contents = list(contents_raw)
        items = []
        for item in contents:
            item = self._databricks_content_details(item)
            item_path = item.get("path")
            item_path = item_path.rstrip("/")
            if detail:
                if item.get("is_directory"):
                    item_type = "directory"
                else:
                    item_type = "file"
                items.append(dict(name=item_path, size=None, type=item_type))
            else:
                items.append(item_path)
        return items

    def _databricks_rm_dir(self, path):
        from databricks.sdk import WorkspaceClient

        w = WorkspaceClient()
        raw_contents = w.files.list_directory_contents(path)
        contents = list(raw_contents)
        details = list(map(self._databricks_content_details, contents))
        for item in details:
            item_path = item.get("path")
            if item.get("is_directory"):
                self._databricks_rm_dir(item_path)
            else:
                w.files.delete(item_path)
        w.files.delete_directory(path)

    @staticmethod
    def _databricks_mkdir(path):
        from databricks.sdk import WorkspaceClient

        w = WorkspaceClient()
        w.files.create_directory(path)

    @staticmethod
    def _databricks_content_details(item):
        details = {
            "path": item.path,
            "name": item.name,
            "is_directory": item.is_directory,
        }
        return details
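Each public method defers to a _databricks_* helper that constructs a WorkspaceClient on demand, so databricks-sdk stays an optional import until a dbc board is actually used. A sketch of driving the filesystem directly, with hypothetical paths and valid Databricks credentials assumed:

from pins.databricks.fs import DatabricksFs

fs = DatabricksFs()
volume = "/Volumes/my-catalog/my-schema/my-volume"  # hypothetical path

# exists()/ls() are backed by the Files API metadata and directory-listing calls above.
if fs.exists(volume):
    print(fs.ls(volume, detail=True))

# open() only supports binary reads and returns an in-memory BytesIO buffer.
with fs.open(f"{volume}/some-pin/data.csv") as f:
    print(f.read()[:100])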

pins/drivers.py

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,8 @@ def load_path(filename: str, path_to_version, pin_type=None):
         filename = "data.csv"

     if path_to_version is not None:
+        if isinstance(path_to_version, str):
+            path_to_version = path_to_version.rstrip("/")
         path_to_file = f"{path_to_version}/{filename}"
     else:
         # BoardUrl doesn't have versions, and the file is the full url
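The new guard strips a trailing slash from string version paths before joining, avoiding a double slash in the final file path. A tiny illustration of the behaviour being fixed (the version path is hypothetical):

filename = "data.csv"
path_to_version = "/Volumes/my-catalog/my-schema/my-volume/df_csv/20220214T163720Z-9bfad/"

print(f"{path_to_version}/{filename}")  # ...9bfad//data.csv  (double slash)

path_to_version = path_to_version.rstrip("/")
print(f"{path_to_version}/{filename}")  # ...9bfad/data.csv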

pins/tests/conftest.py

Lines changed: 9 additions & 1 deletion
@@ -6,10 +6,17 @@
 from importlib_resources import files
 from pytest import mark as m

-from pins.tests.helpers import BoardBuilder, RscBoardBuilder, Snapshot, rm_env
+from pins.tests.helpers import (
+    BoardBuilder,
+    DbcBoardBuilder,
+    RscBoardBuilder,
+    Snapshot,
+    rm_env,
+)

 EXAMPLE_REL_PATH = "pins/tests/pins-compat"
 PATH_TO_EXAMPLE_BOARD = files("pins") / "tests/pins-compat"
+PATH_TO_EXAMPLE_BOARD_DBC = "/Volumes/workshops/my-board/my-volume/test"
 PATH_TO_EXAMPLE_VERSION = PATH_TO_EXAMPLE_BOARD / "df_csv/20220214T163720Z-9bfad/"
 EXAMPLE_PIN_NAME = "df_csv"

@@ -21,6 +28,7 @@
     pytest.param(lambda: BoardBuilder("s3"), id="s3", marks=m.fs_s3),
     pytest.param(lambda: BoardBuilder("gcs"), id="gcs", marks=m.fs_gcs),
     pytest.param(lambda: BoardBuilder("abfs"), id="abfs", marks=m.fs_abfs),
+    pytest.param(lambda: DbcBoardBuilder("dbc"), id="dbc", marks=m.fs_dbc),
 ]

 # rsc should only be used once, because users are created at docker setup time
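With the extra pytest.param entry, the Databricks builder joins the same parametrized board fixtures as the other backends, gated behind the fs_dbc mark. A hedged sketch of selecting only those tests, assuming the mark is registered in the project's pytest configuration and DATABRICKS_HOST / DATABRICKS_TOKEN are set:

import pytest

# Run only the Databricks-marked board tests; every other backend is deselected.
raise SystemExit(pytest.main(["-m", "fs_dbc", "pins"]))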
