Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions buckaroo/server/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,28 @@ async def post(self):
if session_id is None:
return

# Optional backend="xorq" routes /load through the same push-down
# path as /load_expr but sourced from a parquet rather than a
# build_dir. Default "pandas" keeps existing callers unaffected.
backend_arg = body.get("backend", "pandas")
if backend_arg not in ("pandas", "xorq"):
self.set_status(400)
self.write({"error_code": "invalid_backend",
"message": f"backend must be 'pandas' or 'xorq' (got {backend_arg!r})"})
return
if backend_arg == "xorq" and mode != "buckaroo":
self.set_status(400)
self.write({"error_code": "invalid_mode_for_backend",
"message": "backend='xorq' requires mode='buckaroo' (XorqInfiniteBuckaroo)"})
return

sessions = self.application.settings["sessions"]
session = sessions.get_or_create(session_id, path)
session.mode = mode
# Loading via /load is always pandas — clear any xorq state left
# by a prior /load_expr on the same session so WS dispatch routes
# to the new pandas dataflow rather than a stale xorq one.
session.backend = "pandas"
session.backend = backend_arg

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Defer backend mutation until xorq load succeeds

Setting session.backend to "xorq" before dependency/path validation means a failed xorq /load request can leave an existing pandas session in an inconsistent state: old session.dataflow is still present, but backend now says xorq. In that state, DataStreamHandler._handle_buckaroo_state_change selects session.xorq_dataflow (which is None) and returns early, so subsequent buckaroo state updates from clients are ignored for that session even though data is still loaded.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed — bug is real. Pinned in test_xorq_load_failure_preserves_session_backend (commit 2e557a4): session.backend = backend_arg and session.xorq_dataflow = None run before the backend dispatch in handlers.py:254, so a 404/501/500 in the xorq branch leaves a previously-pandas session with backend="xorq" but xorq_dataflow=None, and DataStreamHandler._handle_buckaroo_state_change then drops further state updates.

Fix to follow: move the session mutations (backend, xorq_dataflow reset, expr, etc.) into the success arm of the xorq branch (after the import, path-exists, and load have all succeeded), and likewise wrap the pandas-default mutations after _load_file_with_error_handling returns a non-None file_obj. A failed /load should be a no-op on existing session state.

# Reset any xorq/pandas state left by a prior load on the same
# session so WS dispatch routes to the new dataflow rather than a
# stale one. The branches below repopulate the relevant fields.
session.xorq_dataflow = None
session.expr = None
# Reset the live-typed row-fetch filter so a search term carried
Expand All @@ -251,6 +266,81 @@ async def post(self):
if component_config:
session.component_config = component_config

if backend_arg == "xorq":
# XorqInfiniteBuckaroo over a materialised parquet. Mirrors
# LoadExprHandler's xorq-branch session setup but sourced from
# a file path instead of a build_dir.
try:
from buckaroo.server import xorq_loading # noqa: PLC0415
except ImportError:
self.set_status(501)
self.write({"error_code": "xorq_not_installed",
"message": "xorq is not installed on this server. "
"Install with `pip install buckaroo[xorq]`."})
return

if not os.path.exists(path):
self.set_status(404)
self.write({"error_code": "file_not_found",
"message": f"File not found: {path}"})
return

try:
expr = xorq_loading.load_expr_parquet_path(path)
xorq_dataflow = xorq_loading.XorqServerDataflow(
expr, skip_main_serial=True)
metadata = xorq_loading.get_xorq_metadata(xorq_dataflow, path)
except Exception:
tb = traceback.format_exc()
log.error("load (xorq) error path=%s: %s", path, tb)
resp: dict = {"error_code": "load_error",
"message": "Failed to load parquet via xorq backend"}
Comment on lines +293 to +297

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Return xorq_not_installed when xorq import fails

The new xorq path promises a 501 xorq_not_installed, but missing xorq is actually raised later inside load_expr_parquet_path (from xorq.api import ...) and is caught by this broad except, which returns a generic 500 load_error. Clients can’t distinguish “server misconfigured” from runtime load failures, and automation expecting the documented 501 branch will mis-handle this case.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed — bug is real. Pinned in test_xorq_not_installed_returns_501 (commit 2e557a4). from buckaroo.server import xorq_loading always succeeds because the module-level imports of xorq.api in xorq_stats_v2 / xorq_stat_pipeline are already guarded; the real from xorq.api import connect, deferred_read_parquet is lazy inside load_expr_parquet_path, so when xorq is missing it raises ImportError from inside the try: in handlers.py:283, gets caught by except Exception:, and clients see a generic 500 load_error instead of the documented 501 xorq_not_installed.

Fix to follow: replace the from buckaroo.server import xorq_loading probe with an explicit importlib.util.find_spec("xorq.api") (or a direct import xorq.api inside its own try/except) before calling load_expr_parquet_path, so the 501 branch is actually reachable when xorq is not installed.

if _BUCKAROO_DEBUG:
resp["details"] = tb
self.set_status(500)
self.write(resp)
return

session.expr = expr
session.xorq_dataflow = xorq_dataflow
session.df = None
session.dataflow = None
session.ldf = None
session.metadata = metadata
session.df_display_args = xorq_dataflow.df_display_args
session.df_data_dict = xorq_dataflow.df_data_dict
session.df_meta = xorq_dataflow.df_meta
session.buckaroo_state = {
"cleaning_method": "", "post_processing": "", "sampled": False,
"show_commands": False, "df_display": "main",
"search_string": "", "quick_command_args": {}}
session.buckaroo_options = xorq_dataflow.buckaroo_options
session.command_config = xorq_dataflow.command_config
session.operation_results = {
"transformed_df": {"schema": {"fields": []}, "data": []},
"generated_py_code": "# server mode (xorq backend via /load)"}
session.operations = []

if component_config and session.df_display_args:
for key in session.df_display_args:
dvc = session.df_display_args[key].get("df_viewer_config")
if dvc is not None:
dvc["component_config"] = {
**dvc.get("component_config", {}),
**component_config,
}

self._push_state_to_clients(session, metadata)
browser_action = "skipped" if no_browser else self._handle_browser_window(session_id)

log.info("load session=%s path=%s rows=%d backend=xorq browser=%s",
session_id, path, metadata["rows"], browser_action)
self.write({"session": session_id, "server_pid": os.getpid(),
"browser_action": browser_action, **metadata})
return

# Pandas-default / lazy-polars path. Identical to before — the
# session.backend assignment above already set "pandas".
# Load data in appropriate mode
file_obj, metadata = self._load_file_with_error_handling(path, is_lazy=(mode == "lazy"))
if file_obj is None:
Expand Down
24 changes: 24 additions & 0 deletions buckaroo/server/xorq_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ def get_xorq_metadata(xorq_dataflow: XorqServerDataflow, build_dir: str) -> dict
return {"path": build_dir, "rows": _expr_count(expr), "columns": columns}


def load_expr_parquet_path(path: str):
"""Wrap a local parquet file in a xorq deferred-read expression.

Counterpart to ``load_expr_build_dir`` for the case where the caller
holds a materialised parquet (e.g. a host that materialised a
catalog entry to a snapshot file via ``xorq catalog run``) and wants
XorqInfiniteBuckaroo's push-down query behaviour rather than the
eager pandas/polars load that ``data_loading.load_file`` does.

``deferred_read_parquet`` wires the file into xorq's datafusion
backend without materialising it; ``XorqServerDataflow`` then takes
that expression and answers every ``infinite_request`` via
``handle_infinite_request_xorq`` (the same code path the
build-dir-driven xorq loader uses).
"""
from xorq.api import connect, deferred_read_parquet # noqa: PLC0415
from xorq.vendor import ibis # noqa: PLC0415

if ibis.options.default_backend is None:
ibis.options.default_backend = connect()
table_name = Path(path).stem.replace("-", "_") or "parquet"
return deferred_read_parquet(path, table_name=table_name)


# ---------------------------------------------------------------------------
# project-authored summary stats (loaded from <project_root>/stats/*.py)
# ---------------------------------------------------------------------------
Expand Down
137 changes: 137 additions & 0 deletions tests/unit/server/test_load_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,140 @@ async def test_session_reuse_xorq_then_pandas(self):
finally:
shutil.rmtree(builds_root, ignore_errors=True)
os.unlink(csv_path)


class TestLoadBackendXorq(tornado.testing.AsyncHTTPTestCase):
"""POST /load with backend="xorq" — wraps a parquet path in a
deferred-read xorq expression and serves it through the same
push-down path as /load_expr."""

def get_app(self):
return make_app()

def test_invalid_backend_returns_400(self):
resp = self.fetch("/load", method="POST",
body=json.dumps({"session": "lb-bad", "path": "/tmp/x.parquet",
"backend": "bogus"}),
headers={"Content-Type": "application/json"})
self.assertEqual(resp.code, 400)
self.assertEqual(json.loads(resp.body)["error_code"], "invalid_backend")

def test_xorq_backend_requires_buckaroo_mode(self):
resp = self.fetch("/load", method="POST",
body=json.dumps({"session": "lb-mode", "path": "/tmp/x.parquet",
"backend": "xorq", "mode": "lazy"}),
headers={"Content-Type": "application/json"})
self.assertEqual(resp.code, 400)
self.assertEqual(json.loads(resp.body)["error_code"],
"invalid_mode_for_backend")

@tornado.testing.gen_test
async def test_xorq_load_failure_preserves_session_backend(self):
"""Codex P1 on #840: a failed xorq /load must not flip
session.backend to 'xorq' before the load is known to succeed.
Otherwise a session previously serving pandas ends up with
backend='xorq' + xorq_dataflow=None, and the WS state-change
handler silently drops further updates (websocket_handler.py:59)."""
d = tempfile.mkdtemp()
good_path = os.path.join(d, "good.parquet")
bogus_path = os.path.join(d, "nonexistent.parquet")
try:
pd.DataFrame({"idx": list(range(3))}).to_parquet(good_path)

# 1. Successful pandas load establishes prior session state.
resp = await _post(self.get_http_port(), "/load",
{"session": "lb-roll", "path": good_path, "mode": "buckaroo"})
self.assertEqual(resp.code, 200)

sessions = self._app.settings["sessions"]
session = sessions.get("lb-roll")
self.assertEqual(session.backend, "pandas")
prior_dataflow = session.dataflow
self.assertIsNotNone(prior_dataflow)

# 2. Failed xorq load (path doesn't exist → 404).
resp = await _post(self.get_http_port(), "/load",
{"session": "lb-roll", "path": bogus_path,
"mode": "buckaroo", "backend": "xorq"})
self.assertEqual(resp.code, 404)

# 3. Session must not have been half-mutated. backend stays
# 'pandas'; the existing pandas dataflow stays reachable so
# WS dispatch can still answer buckaroo_state_change.
self.assertEqual(session.backend, "pandas",
"session.backend must not flip to 'xorq' on failed xorq load")
self.assertIs(session.dataflow, prior_dataflow,
"session.dataflow must survive a failed xorq load")
self.assertIsNone(session.xorq_dataflow)
finally:
shutil.rmtree(d, ignore_errors=True)

def test_xorq_not_installed_returns_501(self):
"""Codex P2 on #840: when xorq.api is not importable the handler
must return 501 xorq_not_installed, not a generic 500 load_error.
The probe in handlers.py has to be explicit — importing
``buckaroo.server.xorq_loading`` succeeds even without xorq
because the transitive ``import xorq.api`` calls in
``xorq_stats_v2`` and ``xorq_stat_pipeline`` are guarded with
try/except.

Uses a real parquet path so the path-exists check passes; that
way a 500 result here proves the bug isn't masked as file-not-
found and is genuinely the unreachable 501 branch."""
from unittest.mock import patch
d = tempfile.mkdtemp()
parquet_path = os.path.join(d, "p.parquet")
try:
pd.DataFrame({"idx": [0]}).to_parquet(parquet_path)
# sys.modules[name] = None forces a subsequent `import name`
# to raise ImportError. patch.dict restores the mapping so
# other tests still see the real xorq.api.
with patch.dict(sys.modules, {"xorq.api": None}):
resp = self.fetch("/load", method="POST",
body=json.dumps({"session": "lb-noxorq",
"path": parquet_path,
"mode": "buckaroo", "backend": "xorq"}),
headers={"Content-Type": "application/json"})
self.assertEqual(resp.code, 501)
self.assertEqual(json.loads(resp.body)["error_code"],
"xorq_not_installed")
finally:
shutil.rmtree(d, ignore_errors=True)

@tornado.testing.gen_test
async def test_load_parquet_via_xorq_backend(self):
"""Happy path: POST /load with backend=xorq, then issue an
infinite_request — the row count must come from the parquet via
XorqServerDataflow, proving routing went through the xorq path."""
d = tempfile.mkdtemp()
parquet_path = os.path.join(d, "lb_fixture.parquet")
try:
pd.DataFrame({"idx": list(range(7)), "name": ["a", "b", "c", "d", "e", "f", "g"]}).to_parquet(parquet_path)

resp = await _post(self.get_http_port(), "/load",
{"session": "lb-ok", "path": parquet_path,
"mode": "buckaroo", "backend": "xorq"})
self.assertEqual(resp.code, 200)
body = json.loads(resp.body)
self.assertEqual(body["session"], "lb-ok")
self.assertEqual(body["rows"], 7)
self.assertEqual({c["name"] for c in body["columns"]}, {"idx", "name"})

ws = await tornado.websocket.websocket_connect(
f"ws://localhost:{self.get_http_port()}/ws/lb-ok")
await ws.read_message() # initial_state

ws.write_message(json.dumps({
"type": "infinite_request",
"payload_args": {"start": 0, "end": 10,
"sourceName": "default", "origEnd": 10}}))
r = json.loads(await ws.read_message())
self.assertEqual(r["type"], "infinite_resp")
self.assertEqual(r["length"], 7)

binary_frame = await ws.read_message()
table = pq.read_table(io.BytesIO(binary_frame))
self.assertEqual(table.num_rows, 7)
ws.close()
finally:
shutil.rmtree(d, ignore_errors=True)
Loading