diff --git a/buckaroo/buckaroo_widget.py b/buckaroo/buckaroo_widget.py index f7431bc2c..d5c8e9413 100644 --- a/buckaroo/buckaroo_widget.py +++ b/buckaroo/buckaroo_widget.py @@ -10,6 +10,7 @@ import os import sys import traceback +import warnings from datetime import datetime from typing import Literal, Union import pandas as pd @@ -30,9 +31,10 @@ from buckaroo.extension_utils import copy_extend from .serialization_utils import EMPTY_DF_WHOLE, check_and_fix_df, pd_to_obj, to_parquet, sd_to_parquet_b64 +from .cache.initial_cache import apply_initial_cache, cache_mismatch_reason, extract_column_schema from .dataflow.dataflow import CustomizableDataflow from .dataflow.dataflow_extras import (Sampling, exception_protect) -from .dataflow.styling_core import (ComponentConfig, DFViewerConfig, DisplayArgs, OverrideColumnConfig, PinnedRowConfig, StylingAnalysis, merge_column_config, EMPTY_DFVIEWER_CONFIG) +from .dataflow.styling_core import (ComponentConfig, DFViewerConfig, DisplayArgs, OverrideColumnConfig, PinnedRowConfig, StylingAnalysis, build_df_display_args, EMPTY_DFVIEWER_CONFIG) from .dataflow.autocleaning import PandasAutocleaning from pathlib import Path @@ -126,7 +128,8 @@ def __init__(self, orig_df, debug=False, column_config_overrides:Union[Literal[None], OverrideColumnConfig]=None, pinned_rows:Union[Literal[None], PinnedRowConfig]=None, extra_grid_config=None, component_config:Union[Literal[None], ComponentConfig]=None, - init_sd=None, skip_stat_columns=None, skip_main_serial=False, record_transcript=False): + init_sd=None, skip_stat_columns=None, skip_main_serial=False, record_transcript=False, + initial_cache=None): """ BuckarooWidget was originally designed to extend CustomizableDataFlow @@ -168,7 +171,38 @@ def _df_to_obj(idfself, df:pd.DataFrame): bidirectional_wire(self, self.dataflow, "operation_results") bidirectional_wire(self, self.dataflow, "buckaroo_options") bidirectional_wire(self, self.dataflow, "command_config") - + + 
self._maybe_apply_initial_cache(initial_cache) + + def _maybe_apply_initial_cache(self, bundle): + """Validate + replay a host-provided initial-load bundle (mechanism only — + no Jupyter store / prewarm). A bundle whose config_id + schema match the + widget's live configuration hydrates the display traits from the cache; a + mismatch warns and keeps the freshly-computed values. + + The dataflow is already built here, so this is for parity with the server + path (and future Jupyter cache exploitation), not a build skip. Replays + via the same ``apply_initial_cache`` the server uses, regenerating + ``df_display_args`` from a zero-row frame when display overrides are set.""" + if bundle is None: + return + df = self.dataflow.processed_df + reason = cache_mismatch_reason( + bundle, analysis_klasses=self.dataflow.analysis_klasses, + sampling_klass=getattr(self.dataflow, 'sampling_klass', None), + init_sd=getattr(self.dataflow, 'init_sd', None) or None, + skip_stat_columns=getattr(self.dataflow, 'skip_stat_columns', None), + schema=extract_column_schema(df) if df is not None else None) + if reason is not None: + warnings.warn("initial_cache ignored (config mismatch): %s" % reason) + return + apply_initial_cache( + self, bundle, df_display_klasses=self.dataflow.df_display_klasses, + column_config_overrides=self.dataflow.column_config_overrides, + component_config=self.dataflow.component_config, + extra_grid_config=self.dataflow.extra_grid_config, + pinned_rows=self.dataflow.pinned_rows, sd_to_jsondf=self._sd_to_jsondf) + def _df_to_obj(self, df:pd.DataFrame): return pd_to_obj(self.sampling_klass.serialize_sample(df)) @@ -364,26 +398,12 @@ def _handle_widget_change(self, change): 'all_stats': self._sd_to_jsondf(merged_sd), 'empty': []} - temp_display_args = {} - for display_name, A_Klass in self.dataflow.df_display_klasses.items(): - df_viewer_config = A_Klass.get_dfviewer_config(merged_sd, processed_df) - base_column_config = df_viewer_config['column_config'] - 
df_viewer_config['column_config'] = merge_column_config( - base_column_config, self.dataflow.processed_df, self.dataflow.column_config_overrides) - disp_arg = {'data_key': A_Klass.data_key, - #'df_viewer_config': json.loads(json.dumps(df_viewer_config)), - 'df_viewer_config': df_viewer_config, - 'summary_stats_key': A_Klass.summary_stats_key} - temp_display_args[display_name] = disp_arg - - if self.dataflow.pinned_rows is not None: - temp_display_args['main']['df_viewer_config']['pinned_rows'] = self.dataflow.pinned_rows - if self.dataflow.extra_grid_config: - temp_display_args['main']['df_viewer_config']['extra_grid_config'] = self.dataflow.extra_grid_config - if self.dataflow.component_config: - temp_display_args['main']['df_viewer_config']['component_config'] = self.dataflow.component_config - - self.df_display_args = temp_display_args + self.df_display_args = build_df_display_args( + self.dataflow.df_display_klasses, merged_sd, processed_df, + self.dataflow.column_config_overrides, + pinned_rows=self.dataflow.pinned_rows, + extra_grid_config=self.dataflow.extra_grid_config, + component_config=self.dataflow.component_config) _bk_flash("_handle_widget_change EXIT (df_display_args → JS)") @@ -391,11 +411,12 @@ def __init__(self, orig_df, debug=False, column_config_overrides:Union[Literal[None], OverrideColumnConfig]=None, pinned_rows:Union[Literal[None], PinnedRowConfig]=None, extra_grid_config=None, component_config:Union[Literal[None], ComponentConfig]=None, - init_sd=None, skip_stat_columns=None, record_transcript=False): + init_sd=None, skip_stat_columns=None, record_transcript=False, initial_cache=None): super().__init__(orig_df, debug, column_config_overrides, pinned_rows, extra_grid_config, component_config, init_sd, skip_stat_columns=skip_stat_columns, - skip_main_serial=True, record_transcript=record_transcript) + skip_main_serial=True, record_transcript=record_transcript, + initial_cache=initial_cache) def widget_tuple_args_bridge(change_unused): 
self._handle_widget_change(change_unused) @@ -468,10 +489,10 @@ def __init__(self, orig_df, debug=False, column_config_overrides:Union[Literal[None], OverrideColumnConfig]=None, pinned_rows:Union[Literal[None], PinnedRowConfig]=None, extra_grid_config=None, component_config:Union[Literal[None], ComponentConfig]=None, - init_sd=None, skip_stat_columns=None): + init_sd=None, skip_stat_columns=None, initial_cache=None): super().__init__(orig_df, debug, column_config_overrides, pinned_rows, extra_grid_config, component_config, init_sd, - skip_stat_columns=skip_stat_columns) + skip_stat_columns=skip_stat_columns, initial_cache=initial_cache) self.df_id = str(id(orig_df)) diff --git a/buckaroo/cache/__init__.py b/buckaroo/cache/__init__.py new file mode 100644 index 000000000..a81a0e6dc --- /dev/null +++ b/buckaroo/cache/__init__.py @@ -0,0 +1,8 @@ +"""Initial-load cache: snapshot the first render so it can be replayed +without touching the DataFrame or executing the expression. + +See docs/initial-load-cache-design.md. The handshake (config_fingerprint + +schema) decides whether a precomputed bundle matches the widget's live +configuration; a mismatch warns and recomputes — the cache is never blindly +trusted. +""" diff --git a/buckaroo/cache/fingerprint.py b/buckaroo/cache/fingerprint.py new file mode 100644 index 000000000..9f8a660af --- /dev/null +++ b/buckaroo/cache/fingerprint.py @@ -0,0 +1,61 @@ +"""Stable config fingerprint for the initial-load cache. + +``config_fingerprint`` identifies the *data-touching* configuration — the set +of inputs that determine ``merged_sd`` and the row window — so the handshake +can decide whether a cached bundle matches the widget's live config without +recomputing. It is **cross-process stable**: keyed on each class's +``module.qualname`` (plus an optional per-class ``cache_version``), never on +``id()``, so a bundle built in one process validates in another. 
+ +Display knobs (column_config_overrides, component_config, pinned_rows, theme) +are deliberately *out* of the fingerprint — they're applied at replay from the +bundle, so re-theming never invalidates the cache. See +docs/initial-load-cache-design.md. +""" +import hashlib +import json +from typing import Any, Iterable, List, Optional + +# Bump when the bundle schema or the assembly logic changes incompatibly, so +# old bundles fail the handshake (warn + recompute) rather than mis-serve. +INITIAL_CACHE_VERSION = 1 + + +def _klass_id(kls: Any) -> str: + """Stable identity for an analysis/styling class. + + ``module.qualname`` is reproducible across processes (unlike ``id``). An + optional ``cache_version`` class attribute lets a class bust its own cached + bundles when its logic changes without a global version bump. + """ + mod = getattr(kls, '__module__', '') + qn = getattr(kls, '__qualname__', None) or getattr(kls, '__name__', repr(kls)) + ver = getattr(kls, 'cache_version', '') + return f"{mod}.{qn}:{ver}" + + +def _sampling_id(sampling_klass: Any) -> str: + if sampling_klass is None: + return '' + # Sampling affects which rows reach the analysis pipeline and the window, + # so its identity + the limits that change output are part of the key. 
+ return "|".join([_klass_id(sampling_klass), f"pre={getattr(sampling_klass, 'pre_limit', '')}", + f"ser={getattr(sampling_klass, 'serialize_limit', '')}", f"cols={getattr(sampling_klass, 'max_columns', '')}"]) + + +def config_fingerprint(*, analysis_klasses: Iterable[Any], sampling_klass: Any = None, + init_sd: Optional[dict] = None, skip_stat_columns: Optional[Iterable[str]] = None, + cache_version: Optional[str] = None) -> str: + """Return a stable hex fingerprint of the data-touching configuration.""" + skip: List[str] = sorted(str(c) for c in (skip_stat_columns or [])) + payload = { + 'v': INITIAL_CACHE_VERSION, + 'analysis_klasses': [_klass_id(k) for k in analysis_klasses], + 'sampling': _sampling_id(sampling_klass), + # init_sd injects/overrides stats, so its content is part of identity. + # default=str keeps the hash deterministic past numpy / odd scalars. + 'init_sd': json.dumps(init_sd, sort_keys=True, default=str) if init_sd else None, + 'skip_stat_columns': skip, + 'cache_version': cache_version} + blob = json.dumps(payload, sort_keys=True, default=str) + return hashlib.blake2b(blob.encode(), digest_size=16).hexdigest() diff --git a/buckaroo/cache/initial_cache.py b/buckaroo/cache/initial_cache.py new file mode 100644 index 000000000..4d2461302 --- /dev/null +++ b/buckaroo/cache/initial_cache.py @@ -0,0 +1,267 @@ +"""Initial-load cache: producer, handshake, and consumer. + +Three entry points realise the design's "serve the first render without +touching the data": + +* ``get_initial_cache_data(df_or_expr, ...)`` / ``build_bundle_from_dataflow`` — + the **producer**. Runs the analysis pipeline once and snapshots everything the + frontend needs for the first paint into an ``InitialCacheData`` bundle: the + prerendered ``df_display_args``, ``df_meta``, the type-tagged ``merged_sd`` + parquet (``sd_codec``), the first row window as parquet, and the + ``config_id`` / ``column_schema`` the handshake validates against. 
+* ``cache_mismatch_reason(bundle, ...)`` — the **handshake**. Returns ``None`` + when the bundle provably matches the live configuration (version + config_id + + schema), or a human-readable reason otherwise. The widget/server compute their + *own* config_id and compare — a bundle's claim is never blindly trusted. +* ``apply_initial_cache(target, bundle, ...)`` — the **consumer**. Hydrates a + target (widget / dataflow / server session) from the bundle alone — no + DataFrame, no expression execution. With replay-time display overrides it + regenerates ``df_display_args`` from a zero-row frame (styling reads only the + summary dict + column/index structure, never row values). + +Backend dispatch for ``get_initial_cache_data`` currently covers pandas; the +polars/xorq builders land with the server integration. ``build_bundle_from_dataflow`` +is backend-agnostic — it reads an already-constructed dataflow, which is how the +server (holding a built ``ServerDataflow`` / ``XorqServerDataflow``) uses it. See +docs/initial-load-cache-design.md. +""" +import copy +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Tuple + +import pandas as pd + +from buckaroo.cache.fingerprint import config_fingerprint +from buckaroo.cache.sd_codec import deserialize_sd, serialize_sd +from buckaroo.dataflow.styling_core import build_df_display_args +from buckaroo.serialization_utils import sd_to_parquet_b64, to_parquet + +# Bump when the bundle shape or replay logic changes incompatibly, so old +# bundles fail the handshake (warn + recompute) rather than mis-render. +CACHE_FORMAT_VERSION = 1 + +# Default first-window size — matches the design's 1000-row cached window. +DEFAULT_WINDOW = 1000 + + +@dataclass +class InitialCacheData: + """In-memory first-render snapshot for one (config, data) pair. + + ``sd_parquet`` and ``first_window_parquet`` hold raw parquet bytes (no b64, + no pickle); persistence to a manifest + parquet files is the store's job. 
+ """ + config_id: str + data_id: Optional[str] + df_meta: Dict[str, Any] + column_schema: Dict[str, Any] + sd_parquet: bytes + first_window_parquet: bytes + first_window: Dict[str, int] + df_display_args: Dict[str, Any] + buckaroo_options: Dict[str, Any] + command_config: Dict[str, Any] + styling_klasses: List[str] = field(default_factory=list) + cache_format_version: int = CACHE_FORMAT_VERSION + + +def extract_column_schema(df: pd.DataFrame) -> Dict[str, Any]: + """Capture the column + index *structure* needed to (a) rebuild a zero-row + frame for styling regeneration and (b) detect a schema drift in the handshake. + + MultiIndex columns are stored as lists-of-lists with their level names so the + rebuilt frame reproduces the exact ``old_col_new_col`` mapping the styler keys + off. Row values are never captured. + """ + cols_multi = isinstance(df.columns, pd.MultiIndex) + columns = [list(c) if isinstance(c, tuple) else c for c in df.columns] + return { + 'columns': columns, + 'columns_multiindex': cols_multi, + 'columns_names': list(df.columns.names) if cols_multi else None, + 'dtypes': [str(dt) for dt in df.dtypes], + 'index_multiindex': isinstance(df.index, pd.MultiIndex), + 'index_names': [None if n is None else str(n) for n in df.index.names], + 'index_nlevels': df.index.nlevels} + + +def _zero_row_df(schema: Dict[str, Any]) -> pd.DataFrame: + """Rebuild a 0-row DataFrame with the schema's columns + index structure. + + Styling reads only column names/order (via ``old_col_new_col``) and index + structure (via ``get_left_col_configs``), so this regenerates identical + ``df_display_args`` without the real frame. 
+ """ + cols = schema['columns'] + if schema.get('columns_multiindex'): + columns: Any = pd.MultiIndex.from_tuples( + [tuple(c) for c in cols], names=schema.get('columns_names')) + else: + columns = cols + if schema.get('index_multiindex'): + index: Any = pd.MultiIndex.from_arrays( + [[] for _ in range(schema['index_nlevels'])], names=schema['index_names']) + else: + names = schema.get('index_names') or [] + index = pd.Index([], name=names[0] if names else None) + return pd.DataFrame(index=index, columns=columns) + + +def build_bundle_from_dataflow(dataflow: Any, *, data_id: Optional[str] = None, + window: int = DEFAULT_WINDOW, cache_version: Optional[str] = None) -> InitialCacheData: + """Snapshot a *built* dataflow's first render into a bundle. + + Backend-agnostic: reads only the public traits a finished dataflow exposes + (``widget_args_tuple``, ``df_display_args``, ``df_meta``, ...). This is what + the server calls — it already holds a built ``ServerDataflow`` / + ``XorqServerDataflow``. 
+ """ + _id, processed_df, merged_sd = dataflow.widget_args_tuple + config_id = config_fingerprint( + analysis_klasses=dataflow.analysis_klasses, + sampling_klass=getattr(dataflow, 'sampling_klass', None), + init_sd=getattr(dataflow, 'init_sd', None) or None, + skip_stat_columns=getattr(dataflow, 'skip_stat_columns', None), + cache_version=cache_version) + total = len(processed_df) + styling_klasses = [ + "%s.%s" % (k.__module__, getattr(k, '__qualname__', k.__name__)) + for k in dataflow.df_display_klasses.values()] + return InitialCacheData( + config_id=config_id, data_id=data_id, + df_meta=dict(dataflow.df_meta), + column_schema=extract_column_schema(processed_df), + sd_parquet=serialize_sd(merged_sd), + first_window_parquet=to_parquet(processed_df[0:window]), + first_window={'start': 0, 'end': min(window, total), 'total_rows': total}, + df_display_args=copy.deepcopy(dataflow.df_display_args), + buckaroo_options=dict(dataflow.buckaroo_options), + command_config=dict(dataflow.command_config), + styling_klasses=styling_klasses) + + +def _build_pandas_dataflow(df: pd.DataFrame, *, analysis_klasses: Any, sampling_klass: Any, + init_sd: Any, skip_stat_columns: Any, column_config_overrides: Any, component_config: Any, + extra_grid_config: Any, pinned_rows: Any) -> Any: + # ServerDataflow is the headless pandas pipeline the server + widget share, + # so a bundle built here matches what the live path produces. analysis_klasses + # / sampling_klass are class attributes, so a custom set needs a fresh subclass + # (the widget does the same with its InnerDataFlow). 
+ from buckaroo.server.data_loading import ServerDataflow + cls = ServerDataflow + if analysis_klasses is not None or sampling_klass is not None: + class _CacheProducerDataflow(ServerDataflow): + pass + if analysis_klasses is not None: + _CacheProducerDataflow.analysis_klasses = analysis_klasses + if sampling_klass is not None: + _CacheProducerDataflow.sampling_klass = sampling_klass + cls = _CacheProducerDataflow + return cls(df, column_config_overrides=column_config_overrides, pinned_rows=pinned_rows, + extra_grid_config=extra_grid_config, component_config=component_config, init_sd=init_sd, + skip_stat_columns=skip_stat_columns, skip_main_serial=True) + + +def get_initial_cache_data(df_or_expr: Any, *, analysis_klasses: Any = None, sampling_klass: Any = None, + init_sd: Any = None, skip_stat_columns: Any = None, window: int = DEFAULT_WINDOW, + data_id: Optional[str] = None, cache_version: Optional[str] = None, + column_config_overrides: Any = None, component_config: Any = None, extra_grid_config: Any = None, + pinned_rows: Any = None) -> Tuple[str, InitialCacheData]: + """Producer: build the pipeline once and return ``(config_id, bundle)``. + + Display overrides passed here are baked into the bundle's baseline + ``df_display_args``; the config_id excludes them, so re-theming at replay + never invalidates the cache. 
+ """ + if not isinstance(df_or_expr, pd.DataFrame): + raise NotImplementedError( + "get_initial_cache_data currently builds the pipeline for pandas " + "DataFrames; polars/xorq dispatch lands with the server integration.") + dataflow = _build_pandas_dataflow( + df_or_expr, analysis_klasses=analysis_klasses, sampling_klass=sampling_klass, + init_sd=init_sd, skip_stat_columns=skip_stat_columns, + column_config_overrides=column_config_overrides, component_config=component_config, + extra_grid_config=extra_grid_config, pinned_rows=pinned_rows) + bundle = build_bundle_from_dataflow( + dataflow, data_id=data_id, window=window, cache_version=cache_version) + return bundle.config_id, bundle + + +def _schema_compatible(cached: Dict[str, Any], live: Dict[str, Any]) -> bool: + return (cached.get('columns') == live.get('columns') + and cached.get('dtypes') == live.get('dtypes')) + + +def cache_mismatch_reason(bundle: InitialCacheData, *, analysis_klasses: Any, + sampling_klass: Any = None, init_sd: Any = None, skip_stat_columns: Any = None, + schema: Optional[Dict[str, Any]] = None, cache_version: Optional[str] = None) -> Optional[str]: + """Return ``None`` when the bundle is safe to replay, else why it isn't. + + Validates, in order: bundle format version, the data-touching ``config_id`` + (recomputed from the *live* config — never read from the bundle), and, when a + live ``schema`` is supplied, that its columns/dtypes match the cached ones. 
+ """ + if bundle.cache_format_version != CACHE_FORMAT_VERSION: + return ("cache_format_version mismatch: bundle %r != runtime %r" + % (bundle.cache_format_version, CACHE_FORMAT_VERSION)) + live_config_id = config_fingerprint( + analysis_klasses=analysis_klasses, sampling_klass=sampling_klass, + init_sd=init_sd, skip_stat_columns=skip_stat_columns, cache_version=cache_version) + if live_config_id != bundle.config_id: + return ("config_id mismatch: live analysis klasses / sampling / init_sd / " + "version differ from the cached bundle") + if schema is not None and not _schema_compatible(bundle.column_schema, schema): + return "schema mismatch: live columns/dtypes differ from the cached bundle" + return None + + +def serve_window_request(payload_args: Dict[str, Any], window: int = DEFAULT_WINDOW, + search_string: str = "") -> bool: + """True iff an infinite_request can be answered from the cached first window. + + The bundle caches only the head slice ``[0:window]``, unsorted and unfiltered. + A sort, a live per-client search, a non-zero start, or an end past the cached + window must fall through to the live source (the scroll path that warms the + expr). ``end`` may exceed the actual row count — the cached parquet simply has + fewer rows — so the bound is the configured ``window``, not the row total. + """ + if search_string: + return False + if payload_args.get('sort'): + return False + start = payload_args.get('start', 0) or 0 + end = payload_args.get('end', 0) or 0 + return start == 0 and 0 <= end <= window + + +def apply_initial_cache(target: Any, bundle: InitialCacheData, *, df_display_klasses: Any = None, + column_config_overrides: Any = None, component_config: Any = None, extra_grid_config: Any = None, + pinned_rows: Any = None, sd_to_jsondf: Any = None) -> None: + """Hydrate ``target`` from the bundle alone — no DataFrame, no execution. + + Sets ``df_data_dict`` / ``df_display_args`` / ``df_meta`` / ``buckaroo_options`` + / ``command_config``. 
``df_data_dict['main']`` stays empty (rows arrive via the + window channel); ``all_stats`` is re-derived from the cached ``merged_sd`` via + ``sd_to_jsondf`` (default: the pandas parquet-b64 encoder; polars overrides it). + + When replay-time display overrides are supplied *and* ``df_display_klasses`` is + given, ``df_display_args`` is regenerated from a zero-row frame so the + overrides take effect without rebuilding the source frame. Otherwise the + bundle's prerendered (baseline) ``df_display_args`` is used directly. + """ + if sd_to_jsondf is None: + sd_to_jsondf = sd_to_parquet_b64 + merged_sd = deserialize_sd(bundle.sd_parquet) + target.df_data_dict = {'main': [], 'all_stats': sd_to_jsondf(merged_sd), 'empty': []} + + has_overrides = bool(column_config_overrides or component_config or extra_grid_config or pinned_rows) + if has_overrides and df_display_klasses is not None: + zdf = _zero_row_df(bundle.column_schema) + target.df_display_args = build_df_display_args( + df_display_klasses, merged_sd, zdf, column_config_overrides or {}, + pinned_rows=pinned_rows, extra_grid_config=extra_grid_config, component_config=component_config) + else: + target.df_display_args = copy.deepcopy(bundle.df_display_args) + target.df_meta = dict(bundle.df_meta) + target.buckaroo_options = dict(bundle.buckaroo_options) + target.command_config = dict(bundle.command_config) diff --git a/buckaroo/cache/sd_codec.py b/buckaroo/cache/sd_codec.py new file mode 100644 index 000000000..434a23e0b --- /dev/null +++ b/buckaroo/cache/sd_codec.py @@ -0,0 +1,118 @@ +"""Lossless summary-dict (``merged_sd``) codec for the initial-load cache. + +Persists ``merged_sd`` to parquet **without pickle**, round-tripping every value +type the backends emit. 
The trick is a type-tagged JSON encoding: JSON-native +values pass through; the rest (``pd.Timestamp`` / ``pd.Timedelta``, stdlib +``datetime`` / ``date`` / ``time`` / ``timedelta``, ``Decimal``, ``bytes``, numpy +scalars, NaN, MultiIndex tuples) are wrapped in a small ``{__bk_t__: tag, v: ...}`` +envelope and reconstructed on decode. The tagged JSON is stored as a single cell +in a parquet container, honouring "persist via parquet". + +``value_counts`` is dropped — nothing at replay recomputes from it and the +frontend never reads it (see #880). numpy scalars decode to native Python +(value-lossless); only types where Python-native would lose information get a +tag. See docs/initial-load-cache-design.md. +""" +import base64 +import datetime +import decimal +import io +import json +import math +from typing import Any + +import numpy as np +import pandas as pd + +_TAG = '__bk_t__' # distinctive sentinel; real stat keys never collide with it +_COL = 'sd_json' + + +def _enc(v: Any) -> Any: + if v is None or isinstance(v, str): + return v + if isinstance(v, bool): # before int — bool is an int subclass + return v + if isinstance(v, np.bool_): + return bool(v) + if isinstance(v, (int, np.integer)): + return int(v) + if isinstance(v, (float, np.floating)): # np.float64 is a float subclass + f = float(v) + return {_TAG: 'nan'} if math.isnan(f) else f + if isinstance(v, np.datetime64): + return {_TAG: 'pd.Timestamp', 'v': pd.Timestamp(v).isoformat()} + if isinstance(v, pd.Timestamp): # before datetime — Timestamp is a datetime subclass + return {_TAG: 'pd.Timestamp', 'v': v.isoformat()} + if isinstance(v, pd.Timedelta): # before timedelta — Timedelta is a timedelta subclass + return {_TAG: 'pd.Timedelta', 'v': v.total_seconds()} + if isinstance(v, datetime.datetime): # before date — datetime is a date subclass + return {_TAG: 'datetime', 'v': v.isoformat()} + if isinstance(v, datetime.date): + return {_TAG: 'date', 'v': v.isoformat()} + if isinstance(v, datetime.time): + 
return {_TAG: 'time', 'v': v.isoformat()} + if isinstance(v, datetime.timedelta): + return {_TAG: 'timedelta', 'v': v.total_seconds()} + if isinstance(v, decimal.Decimal): + return {_TAG: 'decimal', 'v': str(v)} + if isinstance(v, bytes): + return {_TAG: 'bytes', 'v': base64.b64encode(v).decode('ascii')} + if isinstance(v, np.ndarray): + return [_enc(x) for x in v.tolist()] + if isinstance(v, tuple): + return {_TAG: 'tuple', 'v': [_enc(x) for x in v]} + if isinstance(v, list): + return [_enc(x) for x in v] + if isinstance(v, dict): + return {str(k): _enc(x) for k, x in v.items()} + return {_TAG: 'str', 'v': str(v)} # lossy fallback for anything exotic + + +def _dec(v: Any) -> Any: + if isinstance(v, dict): + tag = v.get(_TAG) + if tag is None: + return {k: _dec(x) for k, x in v.items()} + if tag == 'nan': + return float('nan') + if tag == 'pd.Timestamp': + return pd.Timestamp(v['v']) + if tag == 'pd.Timedelta': + return pd.Timedelta(seconds=v['v']) + if tag == 'datetime': + return datetime.datetime.fromisoformat(v['v']) + if tag == 'date': + return datetime.date.fromisoformat(v['v']) + if tag == 'time': + return datetime.time.fromisoformat(v['v']) + if tag == 'timedelta': + return datetime.timedelta(seconds=v['v']) + if tag == 'decimal': + return decimal.Decimal(v['v']) + if tag == 'bytes': + return base64.b64decode(v['v']) + if tag == 'tuple': + return tuple(_dec(x) for x in v['v']) + if tag == 'str': + return v['v'] + return v + if isinstance(v, list): + return [_dec(x) for x in v] + return v + + +def serialize_sd(sd: dict) -> bytes: + """Encode ``merged_sd`` (minus ``value_counts``) to parquet bytes, losslessly.""" + clean = {col: {k: val for k, val in (cm or {}).items() if k != 'value_counts'} + for col, cm in sd.items()} + payload = json.dumps(_enc(clean)) + buf = io.BytesIO() + pd.DataFrame({_COL: [payload]}).to_parquet(buf, index=False) + return buf.getvalue() + + +def deserialize_sd(data: bytes) -> dict: + """Inverse of ``serialize_sd``.""" + df = 
pd.read_parquet(io.BytesIO(data)) + return _dec(json.loads(df[_COL].iloc[0])) diff --git a/buckaroo/cache/store.py b/buckaroo/cache/store.py new file mode 100644 index 000000000..82036eb52 --- /dev/null +++ b/buckaroo/cache/store.py @@ -0,0 +1,182 @@ +"""Server-managed initial-load cache store. + +An in-memory LRU over a persistent on-disk directory, keyed by ``data_id`` (the +xorq expr hash, or a host-supplied file identity). Backs the server's +``/load_expr`` hit path: + +* ``put`` writes a bundle to memory *and* disk (so it survives a restart). +* ``get`` returns an in-memory hit directly, or lazily faults a bundle in from + disk on a cold hit, promoting it into the LRU. +* ``prewarm`` loads every persisted bundle under a directory eagerly at startup. +* ``report`` feeds the ``/cache`` introspection endpoint. + +Each bundle persists as a directory ``<base_dir>/<data_id>/`` holding a +``manifest.json`` (everything but the two parquet blobs) plus ``sd.parquet`` and +``first_window.parquet`` — binary on disk, no b64, no pickle. See +docs/initial-load-cache-design.md. +""" +import json +import logging +import os +from collections import OrderedDict +from typing import Any, Dict, List, Optional + +from buckaroo.cache.initial_cache import InitialCacheData + +log = logging.getLogger("buckaroo.cache.store") + +_MANIFEST = "manifest.json" +_SD = "sd.parquet" +_WINDOW = "first_window.parquet" +_DEFAULT_CAPACITY = 64 + +# A manifest is the bundle minus the two parquet byte blobs (stored as sibling +# files). Keys match InitialCacheData field names so read_bundle can splat them. 
# Bundle attributes that are persisted in the JSON manifest. The two parquet
# payloads (``sd_parquet`` / ``first_window_parquet``) are stored as separate
# binary files and are deliberately not listed here.
_MANIFEST_FIELDS = ('config_id', 'data_id', 'df_meta', 'column_schema', 'first_window',
                    'df_display_args', 'buckaroo_options', 'command_config', 'styling_klasses',
                    'cache_format_version')


def _atomic_write_bytes(path: str, data: bytes) -> None:
    """Write ``data`` to ``path`` through a sibling ``.tmp`` file + ``os.replace``.

    A reader therefore sees either the previous file or the complete new one,
    never a partial write. The temporary file is removed if anything fails.
    """
    tmp = path + '.tmp'
    try:
        with open(tmp, 'wb') as fh:
            fh.write(data)
        os.replace(tmp, path)
    except BaseException:
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise


def write_bundle(bundle: InitialCacheData, dir_path: str) -> None:
    """Persist a bundle to ``dir_path`` as a JSON manifest + two parquet files.

    The manifest's presence is the "this bundle is complete" signal, so the
    write is ordered to keep that true even when an existing bundle is being
    overwritten:

    1. the manifest is serialized first (an unserializable field fails before
       the disk is touched);
    2. any previous manifest is removed, so a crash mid-rewrite leaves an
       *incomplete* bundle (a cache miss) rather than an old manifest paired
       with new or partial parquet files;
    3. both parquet files are written tmp-then-rename;
    4. the manifest is written last, tmp-then-rename.

    Raises whatever ``json.dumps`` / the filesystem raise; callers that want
    best-effort persistence must catch.
    """
    manifest = {f: getattr(bundle, f) for f in _MANIFEST_FIELDS}
    # json.dumps emits ASCII by default, so the bytes match a text-mode write.
    manifest_bytes = json.dumps(manifest).encode('utf-8')
    manifest_path = os.path.join(dir_path, _MANIFEST)

    os.makedirs(dir_path, exist_ok=True)
    try:
        os.remove(manifest_path)
    except FileNotFoundError:
        pass
    _atomic_write_bytes(os.path.join(dir_path, _SD), bundle.sd_parquet)
    _atomic_write_bytes(os.path.join(dir_path, _WINDOW), bundle.first_window_parquet)
    _atomic_write_bytes(manifest_path, manifest_bytes)


def read_bundle(dir_path: str) -> InitialCacheData:
    """Inverse of ``write_bundle``: rebuild a bundle from ``dir_path``.

    Every manifest key is passed to ``InitialCacheData`` as a keyword argument,
    so a manifest with unexpected keys raises ``TypeError``; missing files raise
    ``OSError``. Callers in this module treat any exception as "no bundle".
    """
    with open(os.path.join(dir_path, _MANIFEST), encoding='utf-8') as fh:
        manifest = json.load(fh)
    with open(os.path.join(dir_path, _SD), 'rb') as fh:
        sd_parquet = fh.read()
    with open(os.path.join(dir_path, _WINDOW), 'rb') as fh:
        first_window_parquet = fh.read()
    return InitialCacheData(sd_parquet=sd_parquet, first_window_parquet=first_window_parquet, **manifest)


class InitialCacheStore:
    """In-memory LRU over an optional persistent on-disk directory.

    ``base_dir=None`` makes the store memory-only (handy for tests and hosts that
    don't want disk). LRU eviction only drops from memory — disk is the durable
    layer, so an evicted entry faults back in on the next ``get``.
    """

    def __init__(self, base_dir: Optional[str] = None, capacity: int = _DEFAULT_CAPACITY) -> None:
        """Create a store.

        ``base_dir`` is user-expanded (``~``) when given. ``capacity`` is the
        maximum number of bundles held in memory; values below zero behave
        like zero.
        """
        self.base_dir = os.path.expanduser(base_dir) if base_dir else None
        self.capacity = capacity
        # Insertion/recency ordered: the first item is the next eviction victim.
        self._mem: "OrderedDict[str, InitialCacheData]" = OrderedDict()
        # Hit counters are kept across eviction so a faulted-back entry keeps its count.
        self._hits: Dict[str, int] = {}
        self._disk_loads = 0
        self._misses = 0

    def _bundle_dir(self, data_id: str) -> Optional[str]:
        """Return the on-disk directory for ``data_id``, or ``None``.

        ``None`` means "no disk location": either the store is memory-only or
        ``data_id`` is not a single plain path component. Ids containing path
        separators, ``.``/``..`` or an absolute path would otherwise resolve
        outside ``base_dir`` once joined, so they are never mapped to disk.
        """
        if not self.base_dir:
            return None
        if (not isinstance(data_id, str) or data_id in ('', '.', '..')
                or '\x00' in data_id or os.path.basename(data_id) != data_id):
            return None
        return os.path.join(self.base_dir, data_id)

    def put(self, bundle: InitialCacheData) -> None:
        """Insert ``bundle`` as most-recently-used and persist it if possible.

        Raises ``ValueError`` when the bundle has no ``data_id``. Persistence is
        best-effort: a write failure (or an id that cannot be used as a
        directory name) is logged and the bundle stays memory-only.
        """
        data_id = bundle.data_id
        if not data_id:
            raise ValueError("bundle.data_id is required to store a bundle")
        self._mem[data_id] = bundle
        self._mem.move_to_end(data_id)
        self._hits.setdefault(data_id, 0)
        if self.base_dir:
            target = self._bundle_dir(data_id)
            if target is None:
                log.warning("initial-cache: data_id %r is not a safe directory name; "
                            "bundle kept in memory only", data_id)
            else:
                try:
                    write_bundle(bundle, target)
                except Exception as e:
                    log.warning("initial-cache: failed to persist bundle %s: %r", data_id, e)
        self._evict_overflow()

    def get(self, data_id: str) -> Optional[InitialCacheData]:
        """Return the bundle for ``data_id`` or ``None``.

        A memory hit refreshes recency. A memory miss falls back to disk; a
        bundle found there is re-inserted (counted in ``disk_loads``). When
        neither layer has it, ``misses`` is incremented.
        """
        if data_id in self._mem:
            self._mem.move_to_end(data_id)
            self._hits[data_id] = self._hits.get(data_id, 0) + 1
            return self._mem[data_id]
        bundle = self._load_from_disk(data_id)
        if bundle is None:
            self._misses += 1
            return None
        self._mem[data_id] = bundle
        self._mem.move_to_end(data_id)
        self._hits[data_id] = self._hits.get(data_id, 0) + 1
        self._disk_loads += 1
        self._evict_overflow()
        # Return the local reference: with a tiny capacity the entry may already
        # have been evicted from memory again.
        return bundle

    def __contains__(self, data_id: str) -> bool:
        """True when ``data_id`` is in memory or has a manifest on disk.

        Does not load the bundle and does not touch recency or counters.
        """
        if data_id in self._mem:
            return True
        d = self._bundle_dir(data_id)
        return bool(d and os.path.isfile(os.path.join(d, _MANIFEST)))

    def _load_from_disk(self, data_id: str) -> Optional[InitialCacheData]:
        """Read the persisted bundle for ``data_id``; ``None`` if absent or unreadable."""
        d = self._bundle_dir(data_id)
        if not d or not os.path.isfile(os.path.join(d, _MANIFEST)):
            return None
        try:
            return read_bundle(d)
        except Exception as e:
            log.warning("initial-cache: failed to read persisted bundle %s: %r", data_id, e)
            return None

    def _evict_overflow(self) -> None:
        """Drop least-recently-used entries until memory fits ``capacity``."""
        # Memory-only eviction — the on-disk copy (if any) remains the durable
        # layer and is faulted back in on a later get(). The ``self._mem`` guard
        # keeps a negative capacity from popping an empty dict (KeyError).
        while self._mem and len(self._mem) > self.capacity:
            self._mem.popitem(last=False)

    def prewarm(self, dir_path: Optional[str] = None) -> int:
        """Eagerly load every persisted bundle under ``dir_path`` (default
        ``base_dir``) into memory. Returns the count read from disk.

        Sub-directories without a manifest are skipped; unreadable bundles are
        logged and skipped. Eviction runs after each insert so peak memory
        stays bounded by ``capacity`` even for a large directory; the returned
        count can therefore exceed the number of bundles left in memory.
        """
        dir_path = dir_path or self.base_dir
        if not dir_path or not os.path.isdir(dir_path):
            return 0
        loaded = 0
        for name in sorted(os.listdir(dir_path)):
            sub = os.path.join(dir_path, name)
            if not os.path.isfile(os.path.join(sub, _MANIFEST)):
                continue
            try:
                bundle = read_bundle(sub)
            except Exception as e:
                log.warning("initial-cache: prewarm skipped %s: %r", sub, e)
                continue
            # Prefer the id recorded in the manifest; fall back to the directory name.
            key = bundle.data_id or name
            self._mem[key] = bundle
            self._hits.setdefault(key, 0)
            loaded += 1
            self._evict_overflow()
        return loaded

    def entries(self) -> List[Dict[str, Any]]:
        """Describe each in-memory bundle (oldest first): ids, payload size, hits, rows."""
        out: List[Dict[str, Any]] = []
        for data_id, bundle in self._mem.items():
            out.append({
                'data_id': data_id,
                'config_id': bundle.config_id,
                'bytes': len(bundle.sd_parquet) + len(bundle.first_window_parquet),
                'hits': self._hits.get(data_id, 0),
                'total_rows': bundle.first_window.get('total_rows')})
        return out

    def report(self) -> Dict[str, Any]:
        """Introspection payload for the ``/cache`` endpoint."""
        entries = self.entries()
        return {
            'entries': entries,
            'count': len(entries),
            'capacity': self.capacity,
            'total_bytes': sum(e['bytes'] for e in entries),
            'disk_loads': self._disk_loads,
            'misses': self._misses,
            'base_dir': self.base_dir}
def build_df_display_args(df_display_klasses: Dict[str, Any], merged_sd: SDType, processed_df: pd.DataFrame,
        column_config_overrides: OverrideColumnConfig, pinned_rows: Any = None, extra_grid_config: Any = None,
        component_config: Any = None) -> Dict[str, DisplayArgs]:
    """Assemble ``df_display_args`` from styling klasses + summary dict + frame.

    Shared by the live dataflow path (``_handle_widget_change``) and the
    initial-load cache replay path.

    For every ``display_name -> klass`` entry the klass's
    ``get_dfviewer_config(merged_sd, processed_df)`` result is taken, its
    ``column_config`` is merged with ``column_config_overrides`` via
    ``merge_column_config``, and the result is wrapped together with the
    klass's ``data_key`` / ``summary_stats_key``.

    ``pinned_rows`` (when not ``None``), ``extra_grid_config`` and
    ``component_config`` (when truthy) are applied to the ``'main'`` display
    only. If no ``'main'`` display was produced they are ignored instead of
    raising ``KeyError``.

    NOTE(review): the cache path relies on the styling klasses reading only
    ``merged_sd`` and the frame's column/index structure (never row values), so
    that a zero-row frame regenerates identical args — that contract lives in
    the klasses, not in this function.
    """
    display_args: Dict[str, DisplayArgs] = {}
    for display_name, A_Klass in df_display_klasses.items():
        df_viewer_config = A_Klass.get_dfviewer_config(merged_sd, processed_df)
        df_viewer_config['column_config'] = merge_column_config(
            df_viewer_config['column_config'], processed_df, column_config_overrides)
        display_args[display_name] = {
            'data_key': A_Klass.data_key,
            'df_viewer_config': df_viewer_config,
            'summary_stats_key': A_Klass.summary_stats_key}

    main_args = display_args.get('main')
    if main_args is not None:
        main_config = main_args['df_viewer_config']
        # pinned_rows uses an explicit None check so an empty list still
        # overrides the klass default; the other two knobs only apply when truthy.
        if pinned_rows is not None:
            main_config['pinned_rows'] = pinned_rows
        if extra_grid_config:
            main_config['extra_grid_config'] = extra_grid_config
        if component_config:
            main_config['component_config'] = component_config
    return display_args
def slice_window_parquet(parquet_bytes: bytes, start: int, end: int) -> bytes:
    """Cut an already-serialized row window down to rows ``[start, end)``.

    The initial-load cache parks a head window (up to ``DEFAULT_WINDOW`` rows)
    as parquet on the session, while the grid's first request asks for a
    smaller block. Per the original note, the client rejects a payload whose
    row count differs from the requested segment, so the cached window is
    re-sliced to exactly the requested rows before it is shipped.

    The table is read with pyarrow, sliced, and written back uncompressed.
    A negative ``start`` is treated as 0, ``end`` past the window clamps to the
    rows available, and an empty or inverted range yields a zero-row table.
    """
    import pyarrow.parquet as pq

    window_table = pq.read_table(BytesIO(parquet_bytes))
    available = window_table.num_rows
    first = start if start > 0 else 0
    last = end if end < available else available
    row_count = last - first if last > first else 0
    buf = BytesIO()
    pq.write_table(window_table.slice(first, row_count), buf, compression='none')
    return buf.getvalue()
tornado app. ``datasets`` is the operator-supplied list of dropdown entries the @@ -19,9 +21,18 @@ def make_app(sessions: SessionManager | None = None, port: int = 8888, open_brow dict, with ``kind`` in ``("pandas", "lazy", "xorq")``. ``None`` / ``[]`` means no datasets configured: the page omits the dropdown entirely rather than shipping the author's filesystem layout. See - issue #811.""" + issue #811. + + ``initial_cache_store`` is the server-managed initial-load cache (the + ``/load_expr`` hit path + ``/cache`` endpoint). Defaults to a memory-only + store; pass ``initial_cache_dir`` for a persistent one (prewarmed eagerly at + startup) or inject a prebuilt store directly.""" if sessions is None: sessions = SessionManager() + if initial_cache_store is None: + initial_cache_store = InitialCacheStore(base_dir=initial_cache_dir) + if initial_cache_dir: + initial_cache_store.prewarm() static_path = os.path.join(os.path.dirname(__file__), "..", "static") @@ -32,8 +43,10 @@ def make_app(sessions: SessionManager | None = None, port: int = 8888, open_brow (r"/load_expr", LoadExprHandler), (r"/reload_expr/([^/]+)", ReloadExprHandler), (r"/load_compare", LoadCompareHandler), + (r"/cache", CacheHandler), (r"/s/([^/]+)", SessionPageHandler), (r"/ws/([^/]+)", DataStreamHandler), ], sessions=sessions, port=port, open_browser=open_browser, static_path=os.path.abspath(static_path), - server_start_time=SERVER_START_TIME, datasets=datasets or []) + server_start_time=SERVER_START_TIME, datasets=datasets or [], + initial_cache_store=initial_cache_store) diff --git a/buckaroo/server/handlers.py b/buckaroo/server/handlers.py index 824c105dc..f51025a63 100644 --- a/buckaroo/server/handlers.py +++ b/buckaroo/server/handlers.py @@ -6,10 +6,12 @@ import time import traceback import uuid +import warnings import tornado.escape import tornado.web +from buckaroo.cache.initial_cache import DEFAULT_WINDOW, apply_initial_cache, cache_mismatch_reason from buckaroo.server.data_loading import 
(load_file, get_metadata, get_display_state, create_dataflow, get_buckaroo_display_state, load_file_lazy, get_metadata_lazy, get_display_state_lazy) from buckaroo.compare import col_join_dfs from buckaroo.df_util import old_col_new_col @@ -306,6 +308,10 @@ async def post(self): session.backend = backend session.xorq_dataflow = None session.expr = None + # A pandas/polars load supersedes any xorq initial-load cache window — + # otherwise the WS serve_window fast path would ship the prior expr's + # cached head slice for the new dataset. + session.initial_cache_window = None session.prompt = prompt if component_config: session.component_config = component_config @@ -428,19 +434,36 @@ async def post(self): project_root = body.get("project_root") cache_storage_path = body.get("cache_storage_path") - + # Initial-load cache: default ON for /load_expr; ``initial_cache: false`` + # turns it off (full compute, skip the store). ``request_id`` is the + # host's correlation id — echoed in the response + stamped on the log + # line so the caller can align its logs with the server's. + initial_cache_enabled = bool(body.get("initial_cache", True)) + request_id = body.get("request_id") + + store = self.application.settings["initial_cache_store"] try: expr = xorq_loading.load_expr_build_dir(build_dir) extra_klasses = ( xorq_loading.load_project_stat_klasses(project_root) + xorq_loading.load_project_post_processing_klasses(project_root) if project_root else []) + # Initial-load cache handshake (default-on). load_expr is already + # cheap; a HIT additionally skips the N+1 stat pipeline by building + # the dataflow with every column's stats skipped — the first paint + + # stats are then served from the cached bundle (apply_initial_cache + # below), while the dataflow stays fully functional for scroll / sort + # / live-search (which need only the expr + orig_col_name). 
+ bundle, cache_status, data_id = self._cache_handshake( + expr, store, initial_cache_enabled, extra_klasses, init_sd, skip_stat_columns) + hit = cache_status == "hit" + skip_cols = list(expr.columns) if hit else skip_stat_columns xorq_dataflow = xorq_loading.XorqServerDataflow( expr, skip_main_serial=True, extra_klasses=extra_klasses, cache_storage_path=cache_storage_path, column_config_overrides=column_config_overrides, extra_grid_config=extra_grid_config, init_sd=init_sd, - skip_stat_columns=skip_stat_columns) + skip_stat_columns=skip_cols) metadata = xorq_loading.get_xorq_metadata(xorq_dataflow, build_dir) except FileNotFoundError: self.set_status(404) @@ -490,6 +513,35 @@ async def post(self): "generated_py_code": "# server mode"} session.operations = [] + # Initial-load cache: replay (hit) or store (miss/mismatch). On a hit the + # first paint + stats come from the bundle (the cheap dataflow's skipped + # stats are never shown); on a miss/mismatch the freshly-built bundle is + # stored. Either way the first window is parked on the session for the WS + # serve_window fast path. Done before the component_config merge so the + # merge lands on the bundle's (replayed) df_display_args too. + if hit: + # Pass the current request's display knobs (built into xorq_dataflow) + # so the replay regenerates df_display_args with them — they're + # excluded from the cache fingerprint by design, so a hit on the same + # expr must still honor this load's overrides rather than the bundle's + # baseline (#877). Mirrors the widget path. component_config is applied + # by the merge loop below, as on the miss path. 
+ apply_initial_cache( + session, bundle, + df_display_klasses=xorq_dataflow.df_display_klasses, + column_config_overrides=xorq_dataflow.column_config_overrides, + extra_grid_config=xorq_dataflow.extra_grid_config, + pinned_rows=xorq_dataflow.pinned_rows) + session.initial_cache_window = ( + bundle.first_window_parquet, DEFAULT_WINDOW, bundle.first_window["total_rows"]) + elif cache_status in ("miss", "mismatch"): + stored = self._store_bundle(xorq_dataflow, data_id) + session.initial_cache_window = ( + stored.first_window_parquet, DEFAULT_WINDOW, stored.first_window["total_rows"]) if stored else None + else: + session.initial_cache_window = None + cache_block = {"status": cache_status, "data_id": data_id, "request_id": request_id} + if component_config and session.df_display_args: for key in session.df_display_args: dvc = session.df_display_args[key].get("df_viewer_config") @@ -516,10 +568,57 @@ async def post(self): port = self.application.settings["port"] browser_action = find_or_create_session_window(session_id, port, reload_if_found=True) - log.info("load_expr session=%s build_dir=%s rows=%d backend=xorq", - session_id, build_dir, metadata["rows"]) + log.info("load_expr session=%s build_dir=%s rows=%d backend=xorq cache=%s request_id=%s", + session_id, build_dir, metadata["rows"], cache_block["status"], request_id) self.write({"session": session_id, "server_pid": os.getpid(), - "browser_action": browser_action, **metadata}) + "browser_action": browser_action, "cache": cache_block, **metadata}) + + def _cache_handshake(self, expr, store, enabled: bool, extra_klasses, init_sd, skip_stat_columns): + """Decide the initial-load cache outcome for ``expr`` *before* building. + + Returns ``(bundle_or_None, status, data_id)`` with ``status`` in + ``{off, miss, hit, mismatch, error}``. A ``hit`` returns the cached bundle + to replay; ``miss`` / ``mismatch`` return ``None`` (the caller builds + + stores). 
The live config_id is recomputed from the klasses that *would* be + used (built-in xorq stats + project extras) and the request's + init_sd / skip columns — never read from the bundle. Best-effort: any + cache error degrades to a normal compute (``status='error'``).""" + if not enabled: + return None, "off", None + try: + from buckaroo.server import xorq_loading + from buckaroo.xorq_buckaroo import _XORQ_ANALYSIS_KLASSES + data_id = xorq_loading.expr_data_id(expr) + bundle = store.get(data_id) + if bundle is None: + return None, "miss", data_id + live_klasses = list(_XORQ_ANALYSIS_KLASSES) + list(extra_klasses or []) + reason = cache_mismatch_reason( + bundle, analysis_klasses=live_klasses, + sampling_klass=xorq_loading.XorqServerDataflow.sampling_klass, + init_sd=init_sd or None, skip_stat_columns=skip_stat_columns) + if reason is None: + return bundle, "hit", data_id + warnings.warn("initial-load cache mismatch (recomputing): %s" % reason) + return None, "mismatch", data_id + except Exception: + log.error("initial-cache handshake failed: %s", traceback.format_exc()) + return None, "error", None + + def _store_bundle(self, xorq_dataflow, data_id): + """Build + store the bundle for a miss/mismatch. Best-effort — returns the + stored bundle, or ``None`` if building/storing failed (the load still + succeeds; only the cache fast path is skipped).""" + if data_id is None: + return None + try: + from buckaroo.server import xorq_loading + bundle = xorq_loading.build_xorq_bundle(xorq_dataflow, data_id) + self.application.settings["initial_cache_store"].put(bundle) + return bundle + except Exception: + log.error("initial-cache store failed: %s", traceback.format_exc()) + return None class LoadCompareHandler(tornado.web.RequestHandler): @@ -818,6 +917,22 @@ def _render_engine_bar(datasets: list) -> tuple: return bar, datasets_json +class CacheHandler(tornado.web.RequestHandler): + """GET /cache — initial-load cache introspection. 
class CacheHandler(tornado.web.RequestHandler):
    """GET /cache — initial-load cache introspection.

    Writes the store's ``report()`` payload: the in-memory LRU's entries
    (data_id, config_id, bytes, hits, total_rows), ``count``, ``capacity``,
    ``total_bytes``, the ``disk_loads`` / ``misses`` counters and ``base_dir``
    — so a host can see what's cached and how the cache is performing.
    Broader session/server enumeration is #860.
    """

    def get(self):
        store = self.application.settings.get("initial_cache_store")
        if store is None:
            # No store configured: answer with the same keys ``report()``
            # returns so clients can rely on a single response schema.
            self.write({
                "entries": [],
                "count": 0,
                "capacity": 0,
                "total_bytes": 0,
                "disk_loads": 0,
                "misses": 0,
                "base_dir": None})
            return
        self.write(store.report())
Two clients sharing a session diff --git a/buckaroo/server/websocket_handler.py b/buckaroo/server/websocket_handler.py index 6288aafa3..9c6758aed 100644 --- a/buckaroo/server/websocket_handler.py +++ b/buckaroo/server/websocket_handler.py @@ -7,6 +7,8 @@ import tornado.websocket +from buckaroo.cache.initial_cache import serve_window_request +from buckaroo.serialization_utils import slice_window_parquet from buckaroo.server.data_loading import (handle_infinite_request, handle_infinite_request_buckaroo, handle_infinite_request_lazy, get_buckaroo_display_state) from buckaroo.server.session import build_state_message @@ -189,6 +191,34 @@ def _send_highlight_overlay(self, session): except Exception: log.debug("highlight overlay write failed for session=%s", self.session_id) + def _serve_from_cache_if_window(self, session, pa): + """Ship ``pa`` from the initial-load cache window if it qualifies. + + The bundle's first window (parked on the session by /load_expr) answers + the head slice — unsorted, unfiltered — without executing the expr. + Returns True when served; False (sort / live search / deeper slice) so + the caller falls through to the live dispatch. ``search_string`` is the + per-client live term (#851), read off the handler.""" + icw = getattr(session, "initial_cache_window", None) + if icw is None: + return False + window_parquet, window_size, total_rows = icw + if not serve_window_request(pa, window_size, self.search_string or ""): + return False + # Ship exactly the requested [start, end] slice. The cached window holds + # up to window_size rows, but AG Grid's first block is smaller, and the + # client rejects a payload whose row count != the requested segment once + # total_rows > window_size (#877) — so the response key stays pa while the + # parquet is sliced to match. length stays the full total (scrollbar size). 
+ start = pa.get("start", 0) or 0 + end = pa.get("end", 0) or 0 + sliced = slice_window_parquet(window_parquet, start, end) if window_parquet else b"" + self.write_message(json.dumps( + {"type": "infinite_resp", "key": pa, "data": [], "length": total_rows})) + if sliced: + self.write_message(sliced, binary=True) + return True + def _handle_infinite_request(self, payload_args): sessions = self.application.settings["sessions"] session = sessions.get(self.session_id) @@ -220,15 +250,19 @@ def _dispatch(pa): return handle_infinite_request(session.df, pa) try: - resp_msg, parquet_bytes = _dispatch(payload_args) - # Two-frame sequence: JSON text frame, then binary Parquet frame - self.write_message(json.dumps(resp_msg)) - if parquet_bytes: - self.write_message(parquet_bytes, binary=True) + # Initial-load cache fast path: the cached head window is shipped + # without touching the expr. Falls through to the live (warmed) + # dispatch for sorts, searches, and deeper slices. + if not self._serve_from_cache_if_window(session, payload_args): + resp_msg, parquet_bytes = _dispatch(payload_args) + # Two-frame sequence: JSON text frame, then binary Parquet frame + self.write_message(json.dumps(resp_msg)) + if parquet_bytes: + self.write_message(parquet_bytes, binary=True) # Handle second_request (eager loading) second_pa = payload_args.get("second_request") - if second_pa: + if second_pa and not self._serve_from_cache_if_window(session, second_pa): resp2, parquet2 = _dispatch(second_pa) self.write_message(json.dumps(resp2)) if parquet2: diff --git a/buckaroo/server/xorq_loading.py b/buckaroo/server/xorq_loading.py index abddb89c4..b66ab30e1 100644 --- a/buckaroo/server/xorq_loading.py +++ b/buckaroo/server/xorq_loading.py @@ -9,11 +9,15 @@ from __future__ import annotations import builtins +import copy import inspect import logging import traceback from pathlib import Path +from buckaroo.cache.fingerprint import config_fingerprint +from buckaroo.cache.initial_cache import 
def expr_data_id(expr) -> str:
    """Return the store key for ``expr``: xorq's expression hash.

    Per the original note, ``get_expr_hash`` canonicalizes and tokenizes the
    expression, which makes the id content-based and independent of the build
    directory path.
    """
    from xorq.common.utils.provenance_utils import get_expr_hash as _hash_expr  # noqa: PLC0415
    return _hash_expr(expr)


def build_xorq_bundle(xorq_dataflow: XorqServerDataflow, data_id: str,
                      window: int = DEFAULT_WINDOW) -> InitialCacheData:
    """Snapshot a built ``XorqServerDataflow``'s first render into a bundle.

    This is the xorq counterpart of ``build_bundle_from_dataflow``. The
    dataflow's processed frame is an expression, so the row total comes from
    ``_expr_count``, the first window from ``window_to_parquet`` over rows
    ``[0, min(window, total))``, and the column schema from ``expr.schema()``.
    The index part of the schema is fixed to a single unnamed level.

    ``config_id`` is fingerprinted from the dataflow's own analysis klasses,
    sampling klass, ``init_sd`` and ``skip_stat_columns``; ``df_display_args``
    is deep-copied so later mutation of the dataflow does not leak into the
    stored bundle.
    """
    _unused_id, expr, merged_sd = xorq_dataflow.widget_args_tuple
    total_rows = _expr_count(expr)
    window_end = min(window, total_rows)

    expr_schema = expr.schema()
    column_names = list(expr.columns)
    column_schema = {
        'columns': [str(name) for name in column_names],
        'columns_multiindex': False,
        'columns_names': None,
        'dtypes': [str(expr_schema[name]) for name in column_names],
        'index_multiindex': False,
        'index_names': [None],
        'index_nlevels': 1}

    config_id = config_fingerprint(
        analysis_klasses=xorq_dataflow.analysis_klasses,
        sampling_klass=getattr(xorq_dataflow, 'sampling_klass', None),
        init_sd=getattr(xorq_dataflow, 'init_sd', None) or None,
        skip_stat_columns=getattr(xorq_dataflow, 'skip_stat_columns', None))

    # Recorded as "module.qualname" strings so the manifest stays JSON-friendly.
    styling_klasses = []
    for klass in xorq_dataflow.df_display_klasses.values():
        qualname = getattr(klass, '__qualname__', klass.__name__)
        styling_klasses.append("%s.%s" % (klass.__module__, qualname))

    return InitialCacheData(
        config_id=config_id,
        data_id=data_id,
        df_meta=dict(xorq_dataflow.df_meta),
        column_schema=column_schema,
        sd_parquet=serialize_sd(merged_sd),
        first_window_parquet=window_to_parquet(expr, 0, window_end, None, True),
        first_window={'start': 0, 'end': window_end, 'total_rows': total_rows},
        df_display_args=copy.deepcopy(xorq_dataflow.df_display_args),
        buckaroo_options=dict(xorq_dataflow.buckaroo_options),
        command_config=dict(xorq_dataflow.command_config),
        styling_klasses=styling_klasses)
Follow-ups split out: #880 (trim the summary-stats +*wire* payload to what the frontend reads) and #881 (DFViewer transport +abstraction — JSON / b64-parquet / binary per embedding). + +**Implemented (CI-green), in TDD increments:** +1. `build_df_display_args` refactor (shared assembler). +2. `config_fingerprint` (`buckaroo/cache/fingerprint.py`). +3. Lossless type-tagged SD codec (`buckaroo/cache/sd_codec.py`). +4. Producer + handshake + consumer (`buckaroo/cache/initial_cache.py`): + `get_initial_cache_data` / `build_bundle_from_dataflow`, `cache_mismatch_reason`, + `apply_initial_cache`, `serve_window_request`. +4a. `InitialCacheStore` — LRU + disk persistence + `prewarm` (`buckaroo/cache/store.py`). +4b. Server integration: `/load_expr` default-on store + hit fast path (cheap + `skip_stat_columns=all` rebuild + bundle replay), `cache_mismatch_reason` + handshake, WS `serve_window` fast path, `/cache` endpoint, `request_id` + correlation-id (`server/{handlers,app,websocket_handler,session,xorq_loading}.py`). +5. Widget `initial_cache=` kwarg + handshake in `BuckarooWidgetBase` + (mechanism-only parity, no Jupyter store/driver — `buckaroo_widget.py`). + +**Remaining:** pandas/polars `/load` opt-in (host-supplied `data_id`). + +## Problem + +Buckaroo's first render is expensive. For a xorq expression the cost is *executing +it*: a normal load reconstructs the expr once, then computes summary stats as **one +batched `expr.aggregate(...)` plus one histogram query per column** — ≈ N+1 executions +(`xorq_stat_pipeline.py:157`, the "N+1 filter evaluations" the code optimizes around) +— plus a window query and a row count. That work repeats on every session open even +when neither the data nor the configuration changed. + +The driving consumer is xorq desktop / pydata-app, where an expression has a stable, +content-based identity. 
Such a host can hand Buckaroo a precomputed first-render +bundle and skip the pipeline — *provided the bundle provably matches the widget's +configuration*. + +## Measured cost (pydata test-1 catalog, xorq 0.3.26, warm process) + +| step | cost | calls | +|---|---|---| +| `load_expr` (build-dir reconstruction) | **~15–18 ms, flat** across 0.5–158 MB | once | +| summary stats (batched aggregate + per-column histograms) | tens of ms → **~85 ms** (158 MB) warm; more cold/wide | **≈ N+1** expr executions | +| first window (`limit` pushdown) + row `count` | ~4 ms warm | 2 | + +So a cold first load ≈ `load(1) + (1+N) stat executions + window(2)`. The bundle +replaces all of that with a cache read, leaving only a one-time ~17 ms `load_expr` to +re-warm the expr for later scrolling. `load_expr` is *not* the cost — the per-column +stat executions are, and they scale with width. + +## Three-layer cache + +``` +buckaroo initial-load bundle ← NEW: hit = first paint with zero execution + ↓ miss +xorq ParquetSnapshotCache ← unchanged (cache_storage_path); per-stat-query snapshots + ↓ miss +live expr execution +``` + +The existing `.buckaroo_stat_cache/parquet/letsql_cache-snapshot-*.parquet` files +(≈ columns+1 per entry) are the xorq snapshot layer. The new bundle sits above it: a +bundle hit never even reaches the snapshot cache. 
+ +## The handshake — validate, never blindly trust + +The backend provides an **optional** bundle alongside the df/expr (which is held but +not executed): + +``` +widget computes its OWN config_id from its live analysis_klasses + config, +and reads its live schema (df.columns/dtypes, or get_expr_hash — no execution) + │ + ┌────┴───────────────────────────────┐ + │ config_id + schema + version match │ → hydrate from bundle; df/expr NEVER touched + └────┬───────────────────────────────┘ + │ any mismatch + ▼ + warnings.warn(reason) + cache:{status:"mismatch",reason} → normal pipeline (execute) +``` + +The widget computes `config_id` itself and compares — it never reads-and-trusts the +bundle's claim. A stale/foreign bundle costs a warning + a normal compute, never a +wrong render. For xorq the `get_expr_hash` match already implies the schema, so the +config_id (analysis klasses) check is the load-bearing one. + +## Entry points + +```python +# buckaroo/cache/initial_cache.py +def get_initial_cache_data(df_or_expr, *, analysis_klasses=None, styling_klasses=None, + sampling_klass=None, init_sd=None, skip_stat_columns=None, window=1000, + data_id=None, cache_version=None) -> tuple[str, InitialCacheData]: + """Producer: run the pipeline ONCE, snapshot first window + stats + config.""" + +def cache_mismatch_reason(bundle, *, analysis_klasses, sampling_klass, init_sd, + skip_stat_columns, schema) -> str | None: # None ⇒ safe to use +def apply_initial_cache(target, bundle) -> None: # set df_data_dict/display_args/meta +``` + +The **consumer** is the server's `/load_expr` path running the handshake against its +in-memory store (below). The widget gets the same handshake (mechanism, not a driver). + +## Keying & storage + +- **xorq `data_id` = `get_expr_hash(expr)`** (`xorq/.../provenance_utils.py:18-25`): + canonicalize → `SnapshotStrategy` tokenize → truncate. Content-based, + path-independent; the build-dir basename already *is* this hash. 
Verified safe: + `ExprLoader.load_expr` reads only named files (`compiler.py:663-684`) and never + re-verifies a content hash (`:655-657`), so it's *technically* safe to write into a + build dir — but we don't (build dirs are packageable + reproducibility-checked; + keep them pristine). +- **Store: server-managed, keyed by `data_id`, OUTSIDE the build/catalog dir.** + Persistent (survives restart), with an **in-memory LRU** over it. Lazy-on-miss + populates it; `prewarm(dir)` loads it eagerly at startup. +- **pandas/polars:** the host supplies `data_id` (path+mtime+size, or a content hash). + Buckaroo never fingerprints a frame itself. + +## config_id — the handshake key + +Stable cross-process fingerprint (`module.qualname`, not `id()`), over the +data-touching computation only: + +| In the key | Out (replay-time display; regenerated from the bundle) | +|---|---| +| `analysis_klasses` (ordered) | `column_config_overrides` | +| `sampling_klass` params | `component_config` | +| `init_sd`, `skip_stat_columns` | `extra_grid_config`, `pinned_rows` | +| `INITIAL_CACHE_VERSION` (+ optional `cache_version`) | `styling_klasses` | + +Re-theming or overriding a column never invalidates the cache. + +## The bundle — `InitialCacheData` + +Persisted as parquet (binary) + a JSON manifest. No b64 on disk. 
+ +```python +{ + 'cache_format_version': int, 'config_id': str, 'data_id': str | None, + 'df_meta': dict, # columns, rows_shown, total_rows, filtered_rows + 'column_schema': {'columns': [...], 'index': {...}, 'dtypes': [...]}, + 'sd_parquet': bytes, # merged_sd MINUS value_counts, lossless type-tagged + 'first_window_parquet': bytes, # to_parquet(rows[0:window]) — window IS cached, not stats-only + 'first_window': {'start': 0, 'end': int, 'total_rows': int}, + 'df_display_args': dict, # prerender for the zero-override common case + 'buckaroo_options': dict, 'command_config': dict, + 'styling_klasses': [str, ...], +} +``` + +The `all_stats` wire payload is derived from `sd_parquet` at serve time (matching +today's `parquet_b64` in `initial_state` — server delivery unchanged; #880 trims it). + +### Stats codec (no pickle, cross-backend) + +`value_counts` is **dropped** — nothing at replay recomputes from it (and it's the +`pd.Series` that made round-tripping hard). The remaining values round-trip via a +**type-tagged** cell codec extending the lossy `sd_to_parquet_b64` encoder +(`serialization_utils.py:361-394`), tagging the non-JSON-native types so they +reconstruct exactly. The type surface differs per backend, so the round-trip is +**tested across pandas/polars/xorq**: + +| type | pandas | polars | xorq | +|---|---|---|---| +| `pd.Timestamp`/`pd.Timedelta` | ✓ | — | — | +| stdlib `datetime`/`date`/`time`/`timedelta` | — | ✓ (`pl_stats_v2.py:86,91,92`) | via `_to_python_scalar` | +| `Decimal`, `bytes` | — | ✓ | — | +| `np.datetime64` | mode→Timestamp | — | →`datetime.date` | + +## Styling stays configurable + +`get_dfviewer_config(sd, df)` reads only the summary dict + column/index *structure*, +never a row value (`styling_core.py:422-473`, `customizations/styling.py:70-142`). So +a **zero-row DataFrame** rebuilt from `column_schema` regenerates `df_display_args` +exactly — feeding `merge_column_config` (`styling_core.py:231-254`) the same +`old_col_new_col` mapping.
With no display knobs passed, the prerendered +`df_display_args` is used directly; with knobs, regenerate — frame never built either +way. Refactor: lift the assembly loop (`dataflow.py:705-723`) into a module-level +`build_df_display_args(...)` shared by the live path and the cache path. + +## Serving & the warm path + +- `/load_expr` (default-on): `load_expr` (cheap) → compute `data_id` → store lookup. + **Hit** (handshake passes) → serve cached first paint, no stats, no window + execution. **Miss** → full compute + store. +- After a hit: write the cached response, then `IOLoop.current().add_callback(warm)` — + `warm` is the **cheap `load_expr` + wire `(expr, cached merged_sd)`** onto the + session (~17 ms, synchronous, no `async def`, respects the no-async constraint). The + first scroll is then a ~4 ms window pushdown; the N+1 stat queries never re-run. +- `serve_window` predicate: `start==0 ∧ end≤window ∧ no sort ∧ no search`; anything + else falls through to the live `handle_infinite_request_xorq` pushdown. + +## Observability & metadata + +- **Correlation id:** the POST carries `request_id`; buckaroo echoes it and stamps it + on every log line for that load. `/load_expr` returns `cache: {status, reason}` + (`hit|miss|mismatch`). Lets the host align its logs with buckaroo's. +- **`/cache` endpoint:** cache introspection — `[{expr_hash, data_id, bytes, + last_used, hits}]`, totals, LRU capacity, hit/miss rate. Broader session/server + enumeration is #860. + +## Integration — additive + +- **Server:** `/load_expr` (+ `/load`) accept an `initial_cache` flag — **default ON + for `/load_expr`**, a POST field (`initial_cache: false`) turns it off (full compute, + skip store). `/load` (pandas/polars) is default-off unless the host passes a + `data_id`. Server-managed store (LRU) + `prewarm(dir)` + `/cache` endpoint + + correlation-id logging. Session already retains `build_dir` (`session.py:18-36`) for + the unexecuted-fallback. 
+- **Widget (Jupyter):** `initial_cache=` kwarg + handshake + `apply_initial_cache` in + the shared `BuckarooWidgetBase.__init__` (`buckaroo_widget.py:125`) — the *same* code + path as the server, so parity is guaranteed. **No Jupyter store / driver / prewarm + built now** (per scope). + +## Scope + +In: `buckaroo/cache/` (producer, handshake, `apply_initial_cache`, type-tagged SD +codec, `config_fingerprint`); `build_df_display_args` refactor; server store + LRU + +`prewarm` + `/cache` + `/load_expr` default-on integration + correlation-id; widget +`initial_cache` mechanism; tests. + +Out: trimming the stats *wire* payload (#880); transport abstraction / binary stats on +the server (#881); a Jupyter store/driver; pandas/polars auto-fingerprinting; caching +past the first window (sort/filter/scroll/ops stay on the source); writing sidecars +into build dirs; live-source staleness detection (delegated to xorq — see risks). + +## Files + +1. `buckaroo/cache/{__init__,initial_cache,fingerprint}.py` *(new)* — producer, + handshake, `apply_initial_cache`, `config_fingerprint` (uses `get_expr_hash` for + xorq `data_id`), `INITIAL_CACHE_VERSION`. +2. `buckaroo/cache/store.py` *(new)* — server-managed `{data_id: bundle}` LRU store, + disk persistence, `prewarm(dir)`. +3. `buckaroo/cache/sd_codec.py` *(new)* — type-tagged lossless SD↔parquet codec (drops + `value_counts`), extending the lossy encoder in `buckaroo/serialization_utils.py`. +4. `buckaroo/dataflow/dataflow.py` — extract `build_df_display_args` (into `dataflow/styling_core.py`); pure refactor. +5. `buckaroo/buckaroo_widget.py` — `initial_cache` kwarg + handshake. +6. `buckaroo/server/{handlers,app,session,websocket_handler}.py`, `data_loading.py` — + `/load_expr` default-on + opt-out, store wiring, `serve_window` fast path, `/cache` + endpoint, correlation-id. +7. `tests/unit/cache/` (incl. `test_sd_codec.py`) + `tests/unit/server/test_initial_cache_server.py` *(new)*. + +## Implementation order (TDD — failing tests, then fix) + +1. **Refactor.** Extract `build_df_display_args`; existing suite stays green. Own commit. +2.
**Failing tests** (one commit): + - `config_fingerprint` stable cross-process; differs when an analysis class changes. + - **SD codec cross-backend round-trip** (temporal/`Decimal`/`bytes`/histogram via + pandas/polars/xorq; `value_counts` absent). + - `get_initial_cache_data` → bundle whose `initial_state` / `df_display_args` / + first-window parquet equal a live `XorqServerDataflow`'s, **with the expr raising + on execution** (prove the hit path + `serve_window({0,1000})` never execute it). + - **handshake mismatch:** wrong `analysis_klasses` (config_id) or schema ⇒ + `warnings.warn` + `cache.status=="mismatch"` + a normal compute (expr *is* + executed). Assert both. + - **replay-override parity:** capture with no overrides, replay with non-trivial + `component_config` + `column_config_overrides`; `df_display_args` byte-equal to a + live dataflow with the same knobs, frame untouched. + - `serve_window` returns `None` for sort / search / `start>0` / `end>window`. + - server `/load_expr` default-on: first call (miss) executes + stores; second call + (hit) serves without executing; `initial_cache:false` always executes. + - `prewarm(dir)` loads bundles; `/cache` reports them. + Push, watch CI fail. +3. **Fix.** Cache module + store + codec + refactor wiring + widget kwarg + server + integration. Push, watch green. + +## Open questions / risks + +- **Live-source staleness.** `get_expr_hash` is *structural* — an expr over a mutable + source can keep its hash while the data changes. v1 scopes the cache to pinned/built + exprs (the catalog model). For live sources, detection is delegated to xorq's cache + invalidation when the real expr is next touched (error if invalid). Dependency: I'll + confirm `ParquetSnapshotCache`/`SnapshotStrategy` exposes that signal when building + the scroll/miss path — not a v1 first-paint blocker (the hit path never executes). 
+- **Eager vs deferred stat execution.** Instrumenting `XorqStatPipeline._execute` + showed 0 calls for some no-`cache_storage` constructs, i.e. the stat queries + defer/short-circuit on that path. The N+1 cost model (from the code + snapshot-file + count) is authoritative; the precise eager/deferred timing can be pinned by + instrumenting `process_table` if a number is needed. +- **Non-deterministic sampling** (`data_loading.py:28-38`, unseeded `df.sample` for + >1M-row frames) — we snapshot one realization; consider a seed. +- **MultiIndex** — the zero-row df must reproduce the exact `old_col_new_col` mapping; + test with a MultiIndex fixture. diff --git a/tests/unit/cache/test_fingerprint.py b/tests/unit/cache/test_fingerprint.py new file mode 100644 index 000000000..2e9932e61 --- /dev/null +++ b/tests/unit/cache/test_fingerprint.py @@ -0,0 +1,64 @@ +"""Tests for the initial-load cache config fingerprint. + +config_fingerprint produces a stable, cross-process identifier of the +*data-touching* configuration (analysis klasses, sampling, init_sd, +skip columns, version). It is the key the handshake validates: a bundle +whose config_fingerprint matches the widget's live config is safe to use +without recomputing. 
+""" +import subprocess +import sys + +from buckaroo.cache.fingerprint import config_fingerprint, INITIAL_CACHE_VERSION +from buckaroo.customizations.analysis import TypingStats, DefaultSummaryStats +from buckaroo.customizations.histogram import Histogram + + +def test_deterministic_and_hexish(): + a = config_fingerprint(analysis_klasses=[TypingStats, DefaultSummaryStats]) + b = config_fingerprint(analysis_klasses=[TypingStats, DefaultSummaryStats]) + assert a == b + assert isinstance(a, str) and len(a) >= 8 + int(a, 16) # hex digest + + +def test_membership_changes_fingerprint(): + one = config_fingerprint(analysis_klasses=[TypingStats]) + two = config_fingerprint(analysis_klasses=[TypingStats, DefaultSummaryStats]) + three = config_fingerprint(analysis_klasses=[TypingStats, DefaultSummaryStats, Histogram]) + assert len({one, two, three}) == 3 + + +def test_version_changes_fingerprint(): + base = config_fingerprint(analysis_klasses=[TypingStats]) + assert config_fingerprint(analysis_klasses=[TypingStats], cache_version="v2") != base + + +def test_init_sd_and_skip_columns_change_fingerprint(): + base = config_fingerprint(analysis_klasses=[TypingStats]) + assert config_fingerprint( + analysis_klasses=[TypingStats], init_sd={'a': {'mean': 1}}) != base + assert config_fingerprint( + analysis_klasses=[TypingStats], skip_stat_columns=['a']) != base + + +def test_skip_columns_order_insensitive(): + # skip_stat_columns is a set of column names; order must not matter. + x = config_fingerprint(analysis_klasses=[TypingStats], skip_stat_columns=['a', 'b']) + y = config_fingerprint(analysis_klasses=[TypingStats], skip_stat_columns=['b', 'a']) + assert x == y + + +def test_stable_across_processes(): + # The whole point: a bundle built in one process must validate in another. + # An id()-based key would fail this; a qualname-based one passes. 
+ code = ( + "from buckaroo.cache.fingerprint import config_fingerprint;" + "from buckaroo.customizations.analysis import TypingStats, DefaultSummaryStats;" + "print(config_fingerprint(analysis_klasses=[TypingStats, DefaultSummaryStats]))") + out = subprocess.check_output([sys.executable, "-c", code]).decode().strip() + assert out == config_fingerprint(analysis_klasses=[TypingStats, DefaultSummaryStats]) + + +def test_version_constant_is_int(): + assert isinstance(INITIAL_CACHE_VERSION, int) diff --git a/tests/unit/cache/test_initial_cache.py b/tests/unit/cache/test_initial_cache.py new file mode 100644 index 000000000..2f235f805 --- /dev/null +++ b/tests/unit/cache/test_initial_cache.py @@ -0,0 +1,179 @@ +"""Tests for the initial-load cache producer / handshake / consumer. + +These exercise the *backend-agnostic core* (pandas only — no xorq / polars / +server needed): + +* ``get_initial_cache_data`` / ``build_bundle_from_dataflow`` — the producer + runs the pipeline once and snapshots a bundle equal to a live dataflow's + first render. +* ``cache_mismatch_reason`` — the handshake: matching config ⇒ ``None``; + wrong analysis klasses / schema / version ⇒ a reason string. +* ``apply_initial_cache`` — the consumer hydrates a target from the bundle + alone (no DataFrame), and regenerates ``df_display_args`` from a zero-row + frame when replay-time display overrides are supplied (styling is data-free). + +The xorq "expr raises on execution" hit-path proof and the server default-on +wiring land with the server integration. See docs/initial-load-cache-design.md. 
+""" +import types +from io import BytesIO + +import pandas as pd + +from buckaroo.cache.fingerprint import config_fingerprint +from buckaroo.cache.sd_codec import deserialize_sd +from buckaroo.cache.initial_cache import ( + CACHE_FORMAT_VERSION, InitialCacheData, get_initial_cache_data, + build_bundle_from_dataflow, cache_mismatch_reason, apply_initial_cache, + extract_column_schema) +from buckaroo.pluggable_analysis_framework.utils import filter_analysis +from buckaroo.server.data_loading import ServerDataflow, ServerSampling, create_dataflow + + +def _df(): + return pd.DataFrame({ + 'ints': [1, 2, 3, 4, 5], + 'floats': [1.5, 2.5, 3.5, 4.5, 5.5], + 'strs': ['a', 'b', 'c', 'd', 'e'], + 'dates': pd.to_datetime(['2020-01-01', '2020-06-01', '2021-01-01', '2021-06-01', '2022-01-01'])}) + + +def test_producer_matches_live_dataflow(): + df = _df() + live = create_dataflow(df) + config_id, bundle = get_initial_cache_data(df) + + assert isinstance(bundle, InitialCacheData) + assert bundle.config_id == config_id + assert bundle.cache_format_version == CACHE_FORMAT_VERSION + # config_id is the fingerprint of the data-touching config — independent + # of id(); reproducible from the klass list alone. + assert config_id == config_fingerprint( + analysis_klasses=ServerDataflow.analysis_klasses, sampling_klass=ServerSampling) + # The prerendered display args + meta equal a live dataflow's, byte-for-byte. + assert bundle.df_display_args == live.df_display_args + assert bundle.df_meta == live.df_meta + # merged_sd round-trips (minus value_counts) and keeps the same columns. + live_sd = live.widget_args_tuple[2] + assert set(deserialize_sd(bundle.sd_parquet).keys()) == set(live_sd.keys()) + + +def test_build_bundle_from_dataflow_directly(): + # The API the server uses in increment 4: it already has a built dataflow. 
+ df = _df() + dataflow = create_dataflow(df) + bundle = build_bundle_from_dataflow(dataflow, data_id="abc123") + assert bundle.data_id == "abc123" + assert bundle.df_display_args == dataflow.df_display_args + assert bundle.first_window['total_rows'] == len(df) + + +def test_first_window_parquet_roundtrips(): + df = _df() + _cid, bundle = get_initial_cache_data(df, window=3) + assert bundle.first_window == {'start': 0, 'end': 3, 'total_rows': 5} + back = pd.read_parquet(BytesIO(bundle.first_window_parquet)) + # window cached the first 3 rows (renamed-col scheme has an 'index' col). + assert len(back) == 3 + assert 'index' in back.columns + + +def test_apply_reproduces_display_without_a_dataframe(): + df = _df() + live = create_dataflow(df) + _cid, bundle = get_initial_cache_data(df) + + # A bare target — no DataFrame, no pipeline. Hydration is bundle-only. + target = types.SimpleNamespace() + apply_initial_cache(target, bundle) + + assert target.df_display_args == live.df_display_args + assert target.df_meta == bundle.df_meta + assert target.df_data_dict['main'] == [] + assert target.df_data_dict['empty'] == [] + # all_stats is the wire payload derived from the cached merged_sd. + all_stats = target.df_data_dict['all_stats'] + assert isinstance(all_stats, dict) and all_stats.get('format') == 'parquet_b64' + + +def test_handshake_matches(): + df = _df() + _cid, bundle = get_initial_cache_data(df) + reason = cache_mismatch_reason( + bundle, analysis_klasses=ServerDataflow.analysis_klasses, + sampling_klass=ServerSampling, schema=extract_column_schema(df)) + assert reason is None + + +def test_handshake_wrong_analysis_klasses(): + df = _df() + _cid, bundle = get_initial_cache_data(df) + # Drop the last klass — a different data-touching config ⇒ config_id differs. 
+ fewer = ServerDataflow.analysis_klasses[:-1] + reason = cache_mismatch_reason( + bundle, analysis_klasses=fewer, sampling_klass=ServerSampling, + schema=extract_column_schema(df)) + assert reason is not None + assert 'config' in reason.lower() + + +def test_handshake_wrong_schema(): + df = _df() + _cid, bundle = get_initial_cache_data(df) + other = df.rename(columns={'ints': 'renamed'}) + reason = cache_mismatch_reason( + bundle, analysis_klasses=ServerDataflow.analysis_klasses, + sampling_klass=ServerSampling, schema=extract_column_schema(other)) + assert reason is not None + assert 'schema' in reason.lower() + + +def test_handshake_version_mismatch(): + df = _df() + _cid, bundle = get_initial_cache_data(df) + bundle.cache_format_version = CACHE_FORMAT_VERSION + 1 + reason = cache_mismatch_reason( + bundle, analysis_klasses=ServerDataflow.analysis_klasses, + sampling_klass=ServerSampling) + assert reason is not None + assert 'version' in reason.lower() + + +def test_replay_override_parity(): + # Capture with no overrides; replay with non-trivial display knobs. + # The replayed df_display_args must equal a live dataflow built with the + # same knobs — proving styling regenerates from a zero-row frame. + df = _df() + overrides = {'ints': {'color_map_config': { + 'color_rule': 'color_categorical', 'val_column': 'ints'}}} + component_config = {'height_fraction': 2} + + live = create_dataflow(df) + live_with_knobs = ServerDataflow( + df, column_config_overrides=overrides, component_config=component_config, + skip_main_serial=True) + + _cid, bundle = get_initial_cache_data(df) + # Sanity: the baseline bundle (captured with no overrides) equals the un-knob'd live render.
+ assert bundle.df_display_args == live.df_display_args + + df_display_klasses = filter_analysis(ServerDataflow.analysis_klasses, "df_display_name") + target = types.SimpleNamespace() + apply_initial_cache( + target, bundle, df_display_klasses=df_display_klasses, + column_config_overrides=overrides, component_config=component_config) + assert target.df_display_args == live_with_knobs.df_display_args + + +def test_zero_row_df_reproduces_multiindex_col_mapping(): + # The MultiIndex risk: a zero-row frame rebuilt from the schema must + # reproduce the exact old_col_new_col mapping the styler keys off. + from buckaroo.cache.initial_cache import _zero_row_df + from buckaroo.df_util import old_col_new_col + + cols = pd.MultiIndex.from_tuples( + [('a', 'x'), ('a', 'y'), ('b', 'z')], names=['lvl1', 'lvl2']) + df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=cols) + zdf = _zero_row_df(extract_column_schema(df)) + assert len(zdf) == 0 + assert old_col_new_col(zdf) == old_col_new_col(df) diff --git a/tests/unit/cache/test_sd_codec.py b/tests/unit/cache/test_sd_codec.py new file mode 100644 index 000000000..1df6ea7cd --- /dev/null +++ b/tests/unit/cache/test_sd_codec.py @@ -0,0 +1,96 @@ +"""Tests for the lossless summary-dict codec used by the initial-load cache. + +serialize_sd / deserialize_sd persist a ``merged_sd`` losslessly to parquet +without pickle, dropping ``value_counts`` (never recomputed from, never read by +the frontend). The value types differ per backend (pandas → pd.Timestamp; +polars → stdlib datetime / Decimal / bytes; xorq → mixed), so the codec must +round-trip the full union. See docs/initial-load-cache-design.md. 
+""" +import datetime +import decimal +import math + +import numpy as np +import pandas as pd + +from buckaroo.cache.sd_codec import serialize_sd, deserialize_sd + + +def test_roundtrip_value_types(): + sd = { + 'a': { + '_type': 'integer', 'dtype': 'int64', + 'min': np.int64(3), 'max': 10, + 'mean': np.float64(5.5), + 'nan_stat': float('nan'), + 'np_nan': np.float64('nan'), + 'flag': np.bool_(True), 'py_flag': False, + 'ts': pd.Timestamp('2020-01-02T03:04:05'), + 'td': pd.Timedelta(days=2, hours=3), + 'pydt': datetime.datetime(2021, 1, 1, 12, 30, 0), + 'pydate': datetime.date(2021, 1, 1), + 'pytime': datetime.time(13, 30, 15), + 'pytd': datetime.timedelta(hours=5, minutes=10), + 'dec': decimal.Decimal('3.14'), + 'blob': b'\x00\x01hello', + 'histogram': [{'name': '0-5', 'population': np.float64(40.0)}, + {'name': 'NA', 'NA': 2.0}], + 'orig_col_name': 'A', + 'value_counts': pd.Series([1, 2, 3]), # must be DROPPED + }, + 'b': {'_type': 'string', 'dtype': 'object', 'orig_col_name': 'B'}, + } + out = deserialize_sd(serialize_sd(sd)) + + assert set(out.keys()) == {'a', 'b'} + a = out['a'] + assert 'value_counts' not in a + assert a['_type'] == 'integer' and a['dtype'] == 'int64' + assert a['min'] == 3 and a['max'] == 10 + assert a['mean'] == 5.5 + assert math.isnan(a['nan_stat']) and math.isnan(a['np_nan']) + assert a['flag'] is True and a['py_flag'] is False + assert a['ts'] == pd.Timestamp('2020-01-02T03:04:05') + assert a['td'] == pd.Timedelta(days=2, hours=3) + assert a['pydt'] == datetime.datetime(2021, 1, 1, 12, 30, 0) + assert a['pydate'] == datetime.date(2021, 1, 1) + assert a['pytime'] == datetime.time(13, 30, 15) + assert a['pytd'] == datetime.timedelta(hours=5, minutes=10) + assert a['dec'] == decimal.Decimal('3.14') + assert a['blob'] == b'\x00\x01hello' + assert a['histogram'] == [{'name': '0-5', 'population': 40.0}, {'name': 'NA', 'NA': 2.0}] + assert a['orig_col_name'] == 'A' + assert out['b']['orig_col_name'] == 'B' + + +def 
test_orig_col_name_tuple_roundtrips(): + # MultiIndex columns carry orig_col_name as a tuple — must not degrade to a list. + sd = {'a': {'_type': 'float', 'orig_col_name': ('lvl1', 'lvl2')}} + out = deserialize_sd(serialize_sd(sd)) + assert out['a']['orig_col_name'] == ('lvl1', 'lvl2') + + +def test_returns_bytes_and_empty_ok(): + assert isinstance(serialize_sd({}), bytes) + assert deserialize_sd(serialize_sd({})) == {} + + +def test_roundtrip_real_pandas_pipeline_sd(): + from buckaroo.customizations.analysis import ( + TypingStats, DefaultSummaryStats, ComputedDefaultSummaryStats) + from buckaroo.customizations.histogram import Histogram + from buckaroo.pluggable_analysis_framework.df_stats_v2 import DfStatsV2 + + df = pd.DataFrame({'ints': [1, 2, 3, None], 'floats': [1.5, 2.5, 3.5, 4.5], 'strs': ['a', 'b', 'c', 'd'], + 'dates': pd.to_datetime(['2020-01-01', '2020-06-01', '2021-01-01', '2021-06-01'])}) + sd = DfStatsV2( + df, [TypingStats, DefaultSummaryStats, Histogram, ComputedDefaultSummaryStats], + 'test_df').sdf + out = deserialize_sd(serialize_sd(sd)) + # Every non-value_counts stat key survives the round-trip, per column. + assert set(out.keys()) == set(sd.keys()) + for col, meta in sd.items(): + for k in meta: + if k == 'value_counts': + continue + assert k in out[col], f"{col}.{k} lost in round-trip" diff --git a/tests/unit/cache/test_store.py b/tests/unit/cache/test_store.py new file mode 100644 index 000000000..f37024351 --- /dev/null +++ b/tests/unit/cache/test_store.py @@ -0,0 +1,136 @@ +"""Tests for the server-managed initial-load cache store. + +``InitialCacheStore`` is an in-memory LRU over a persistent on-disk directory, +keyed by ``data_id`` (the xorq expr hash, or a host-supplied file identity). It +backs the server's ``/load_expr`` hit path: ``prewarm`` loads bundles eagerly at +startup, ``get`` lazily faults them in from disk on a cold hit, and ``report`` +feeds the ``/cache`` introspection endpoint. 
+ +``serve_window_request`` is the pure predicate deciding whether an infinite_request +can be answered from the cached first window (head slice, unsorted, unfiltered). + +All pure Python — no server, no xorq. See docs/initial-load-cache-design.md. +""" +import pandas as pd + +from buckaroo.cache.initial_cache import ( + DEFAULT_WINDOW, get_initial_cache_data, serve_window_request) +from buckaroo.cache.sd_codec import deserialize_sd +from buckaroo.cache.store import InitialCacheStore, write_bundle, read_bundle + + +def _bundle(data_id="d1", n=5): + df = pd.DataFrame({'ints': list(range(n)), 'strs': [chr(97 + i) for i in range(n)]}) + _cid, bundle = get_initial_cache_data(df, data_id=data_id) + return bundle + + +def test_put_get_memory(): + store = InitialCacheStore(base_dir=None) + b = _bundle("d1") + store.put(b) + got = store.get("d1") + assert got is b # in-memory hit returns the same object + + +def test_get_missing_returns_none(): + store = InitialCacheStore(base_dir=None) + assert store.get("nope") is None + + +def test_put_requires_data_id(): + store = InitialCacheStore(base_dir=None) + b = _bundle(data_id=None) + try: + store.put(b) + assert False, "expected ValueError for a bundle with no data_id" + except ValueError: + pass + + +def test_lru_eviction_memory_only(): + store = InitialCacheStore(base_dir=None, capacity=2) + store.put(_bundle("a")) + store.put(_bundle("b")) + store.get("a") # touch 'a' so 'b' is now least-recently-used + store.put(_bundle("c")) # over capacity → evict LRU ('b') + assert store.get("a") is not None + assert store.get("c") is not None + assert store.get("b") is None # evicted, no disk to fall back to + + +def test_disk_persistence_survives_new_store(tmp_path): + base = str(tmp_path / "cache") + s1 = InitialCacheStore(base_dir=base) + original = _bundle("xhash") + s1.put(original) + + # A fresh store over the same dir lazy-loads the bundle from disk. 
+ s2 = InitialCacheStore(base_dir=base) + loaded = s2.get("xhash") + assert loaded is not None + assert loaded.config_id == original.config_id + assert loaded.df_meta == original.df_meta + assert loaded.first_window == original.first_window + assert loaded.first_window_parquet == original.first_window_parquet + assert set(deserialize_sd(loaded.sd_parquet)) == set(deserialize_sd(original.sd_parquet)) + # Simple (non-MultiIndex) df → df_display_args is plain JSON, round-trips exactly. + assert loaded.df_display_args == original.df_display_args + + +def test_lru_evicts_from_memory_but_disk_backs_it(tmp_path): + base = str(tmp_path / "cache") + store = InitialCacheStore(base_dir=base, capacity=1) + store.put(_bundle("a")) + store.put(_bundle("b")) # evicts 'a' from memory, but disk keeps it + assert "a" not in store._mem + assert store.get("a") is not None # faulted back in from disk + + +def test_prewarm_loads_bundles(tmp_path): + base = str(tmp_path / "cache") + seed = InitialCacheStore(base_dir=base) + seed.put(_bundle("one")) + seed.put(_bundle("two")) + + fresh = InitialCacheStore(base_dir=base) + assert fresh.prewarm() == 2 + assert "one" in fresh._mem and "two" in fresh._mem + + +def test_report_shape(): + store = InitialCacheStore(base_dir=None, capacity=10) + store.put(_bundle("a")) + store.put(_bundle("b")) + store.get("a") + rep = store.report() + assert rep['count'] == 2 + assert rep['capacity'] == 10 + assert rep['total_bytes'] > 0 + by_id = {e['data_id']: e for e in rep['entries']} + assert by_id['a']['hits'] >= 1 + assert by_id['a']['bytes'] > 0 + assert 'config_id' in by_id['a'] + + +def test_write_read_bundle_roundtrip(tmp_path): + d = str(tmp_path / "one") + b = _bundle("rt") + write_bundle(b, d) + back = read_bundle(d) + assert back.data_id == "rt" + assert back.config_id == b.config_id + assert back.first_window_parquet == b.first_window_parquet + assert back.cache_format_version == b.cache_format_version + + +def test_serve_window_predicate(): + w 
= DEFAULT_WINDOW + # Head slice, unsorted, unfiltered → serve from cache. + assert serve_window_request({'start': 0, 'end': 50}, w) is True + assert serve_window_request({'start': 0, 'end': w}, w) is True + # Anything else falls through to the live source. + assert serve_window_request({'start': 0, 'end': w + 1}, w) is False + assert serve_window_request({'start': 50, 'end': 100}, w) is False + assert serve_window_request({'start': 0, 'end': 50, 'sort': 'a'}, w) is False + assert serve_window_request({'start': 0, 'end': 50}, w, search_string="x") is False diff --git a/tests/unit/cache/test_widget_initial_cache.py b/tests/unit/cache/test_widget_initial_cache.py new file mode 100644 index 000000000..5e68f80a6 --- /dev/null +++ b/tests/unit/cache/test_widget_initial_cache.py @@ -0,0 +1,56 @@ +"""Widget-side initial-load cache mechanism. + +``BuckarooWidgetBase`` accepts an optional ``initial_cache=`` bundle. After the +widget builds its dataflow, a matching bundle (handshake passes) hydrates the +widget's display traits from the cache; a mismatch warns and keeps the +freshly-computed values. This is the *same* validate-don't-trust path the server +runs — mechanism only, no Jupyter store/driver/prewarm (per scope). + +The bundle is built with the widget's own analysis klasses + sampling so the +config_id matches; we tag a sentinel onto the bundle's df_meta to prove the +replay actually ran (vs the widget's own computation, which equals the bundle's +on a match). See docs/initial-load-cache-design.md. +""" +import pandas as pd +import pytest + +from buckaroo.buckaroo_widget import BuckarooInfiniteWidget, InfinitePdSampling +from buckaroo.cache.initial_cache import get_initial_cache_data + + +def _matching_bundle(df): + # Same klasses + sampling as the widget ⇒ the handshake's config_id matches. 
+ _cid, bundle = get_initial_cache_data( + df, analysis_klasses=BuckarooInfiniteWidget.analysis_klasses, + sampling_klass=InfinitePdSampling) + return bundle + + +def test_widget_replays_matching_bundle(): + df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']}) + bundle = _matching_bundle(df) + bundle.df_meta = {**bundle.df_meta, 'sentinel': 'from_cache'} + w = BuckarooInfiniteWidget(df, initial_cache=bundle) + # The sentinel only reaches the widget if apply_initial_cache ran. + assert w.df_meta.get('sentinel') == 'from_cache' + assert 'main' in w.df_display_args + + +def test_widget_warns_and_ignores_mismatch(): + df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']}) + bundle = _matching_bundle(df) + bundle.config_id = 'deadbeef0000' # force a config_id mismatch + bundle.df_meta = {**bundle.df_meta, 'sentinel': 'from_cache'} + with pytest.warns(UserWarning): + w = BuckarooInfiniteWidget(df, initial_cache=bundle) + # Mismatch ⇒ the bundle is ignored; the widget shows its own computed meta. + assert 'sentinel' not in w.df_meta + assert w.df_meta['total_rows'] == 3 + + +def test_widget_without_initial_cache_constructs_normally(): + # The new kwarg must default cleanly — no bundle, normal construction. 
+ df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']}) + w = BuckarooInfiniteWidget(df) + assert w.df_meta['total_rows'] == 3 + assert 'main' in w.df_display_args diff --git a/tests/unit/dataflow/dataflow_test.py b/tests/unit/dataflow/dataflow_test.py index 51265e2f2..27b8e46c2 100644 --- a/tests/unit/dataflow/dataflow_test.py +++ b/tests/unit/dataflow/dataflow_test.py @@ -2,6 +2,7 @@ import pandas as pd from buckaroo.dataflow.dataflow import DataFlow from buckaroo.dataflow import dataflow as dft +from buckaroo.dataflow.styling_core import merge_column_config from buckaroo.dataflow.autocleaning import SENTINEL_DF_1, SENTINEL_DF_2 simple_df = pd.DataFrame({'int_col':[1, 2, 3], 'str_col':['a', 'b', 'c']}) @@ -121,7 +122,7 @@ def test_merge_column_config(): {'header_name':'foo', 'col_name':'a', 'displayer_args': {'displayer': 'obj'}}, {'header_name':'bar', 'col_name':'b', 'displayer_args': {'displayer': 'obj'}}] temp_df=pd.DataFrame({'foo':[], 'bar':[], 'volume_colors':[]}) - merged = dft.merge_column_config(computed_column_config, temp_df, overrides) + merged = merge_column_config(computed_column_config, temp_df, overrides) expected = [ {'header_name':'foo', 'col_name': 'a', 'displayer_args': {'displayer': 'obj'}, @@ -138,7 +139,7 @@ def test_merge_column_config_hide(): {'header_name':'foo', 'col_name':'a', 'displayer_args': {'displayer': 'obj'}}, {'header_name':'bar', 'col_name':'b', 'displayer_args': {'displayer': 'obj'}}] temp_df=pd.DataFrame({'foo':[], 'bar':[], 'volume_colors':[]}) - merged = dft.merge_column_config( + merged = merge_column_config( computed_column_config, temp_df, overrides) expected = [ diff --git a/tests/unit/serialization_utils_test.py b/tests/unit/serialization_utils_test.py index 51db27e05..a28cb9ec8 100644 --- a/tests/unit/serialization_utils_test.py +++ b/tests/unit/serialization_utils_test.py @@ -4,7 +4,7 @@ from buckaroo.ddd_library import get_multiindex_with_names_index_df, get_multiindex_cols_df, get_multiindex_index_df from 
buckaroo.serialization_utils import ( is_ser_dt_safe, is_dataframe_datetime_safe, check_and_fix_df, pd_to_obj, - to_parquet, DuplicateColumnsException, _json_encode_cell) + to_parquet, slice_window_parquet, DuplicateColumnsException, _json_encode_cell) @@ -163,3 +163,34 @@ def test_json_encode_cell_numpy_array_roundtrips(): assert decoded == [3.0, 7.5, 12.0, 16.5, 21.0] + + +def _read_window(parquet_bytes): + import pyarrow.parquet as pq + from io import BytesIO + return pq.read_table(BytesIO(parquet_bytes)) + + +def test_slice_window_parquet_honors_requested_range(): + # A 10-row head window re-sliced to a smaller block ships exactly the + # requested rows — the wire contract the SmartRowCache enforces (#877). + df = pd.DataFrame({'x': range(10), 's': [chr(97 + i) for i in range(10)]}) + window = to_parquet(df) + assert _read_window(window).num_rows == 10 + + sliced = slice_window_parquet(window, 0, 4) + t = _read_window(sliced) + assert t.num_rows == 4 + # Columns + the appended index column survive the re-slice unchanged. + assert t.column_names == _read_window(window).column_names + assert t.column('a').to_pylist() == [0, 1, 2, 3] + assert t.column('index').to_pylist() == [0, 1, 2, 3] + + +def test_slice_window_parquet_clamps_past_end(): + # end past the window's row count clamps to what's available; the whole + # window comes back rather than erroring. + df = pd.DataFrame({'x': range(6)}) + window = to_parquet(df) + assert _read_window(slice_window_parquet(window, 0, 1000)).num_rows == 6 + assert _read_window(slice_window_parquet(window, 0, 6)).num_rows == 6 diff --git a/tests/unit/server/test_initial_cache_server.py b/tests/unit/server/test_initial_cache_server.py new file mode 100644 index 000000000..2d304a3c0 --- /dev/null +++ b/tests/unit/server/test_initial_cache_server.py @@ -0,0 +1,260 @@ +"""Server integration for the initial-load cache — store wiring + observability. 
+ +These cover the *additive* half of the server integration (4b-i): ``/load_expr`` +builds and stores an ``InitialCacheData`` bundle keyed by the expr hash, echoes a +correlation ``request_id``, and the ``/cache`` endpoint reports the store. The +hit fast-path (serve-from-cache + serve_window) lands separately. + +xorq-gated: the bundle's first window comes from a real expr via +``window_to_parquet``. See docs/initial-load-cache-design.md. +""" +import json +import shutil +import sys +import tempfile + +import pytest +import tornado.httpclient +import tornado.testing +import tornado.websocket + +xo = pytest.importorskip("xorq.api") + +from buckaroo.server.app import make_app as _make_app # noqa: E402 + +pytestmark = pytest.mark.skipif( + sys.platform == "win32", reason="Temp file locking prevents cleanup on Windows") + + +def make_app(): + return _make_app(open_browser=False) + + +def _build_expr_dir(builds_root): + expr = xo.memtable({ + 'idx': list(range(10)), + 'name': ['alpha', 'beta', 'gamma', 'alpha', 'delta', + 'epsilon', 'alpha', 'zeta', 'eta', 'alpha']}, name='t') + return str(xo.build_expr(expr, builds_dir=builds_root)) + + +async def _post(port, path, body): + client = tornado.httpclient.AsyncHTTPClient() + return await client.fetch( + f"http://localhost:{port}{path}", method="POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}, raise_error=False) + + +async def _get(port, path): + client = tornado.httpclient.AsyncHTTPClient() + return await client.fetch(f"http://localhost:{port}{path}", method="GET", raise_error=False) + + +class TestInitialCacheServer(tornado.testing.AsyncHTTPTestCase): + def get_app(self): + return make_app() + + @tornado.testing.gen_test + async def test_load_expr_stores_bundle_and_cache_reports_it(self): + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + resp = await _post(self.get_http_port(), "/load_expr", + {"session": "ic-1", "build_dir": build_path}) + 
self.assertEqual(resp.code, 200) + body = json.loads(resp.body) + self.assertEqual(body["rows"], 10) + # The load reports a cache block: first load is a miss that stores. + cache = body["cache"] + self.assertEqual(cache["status"], "miss") + self.assertTrue(cache["data_id"]) + + # /cache reflects the stored bundle. + crep = json.loads((await _get(self.get_http_port(), "/cache")).body) + self.assertEqual(crep["count"], 1) + entry = crep["entries"][0] + self.assertEqual(entry["data_id"], cache["data_id"]) + self.assertEqual(entry["total_rows"], 10) + self.assertGreater(entry["bytes"], 0) + finally: + shutil.rmtree(builds_root, ignore_errors=True) + + @tornado.testing.gen_test + async def test_request_id_echoed(self): + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + resp = await _post(self.get_http_port(), "/load_expr", + {"session": "ic-rq", "build_dir": build_path, "request_id": "rq-7"}) + body = json.loads(resp.body) + self.assertEqual(body["cache"]["request_id"], "rq-7") + finally: + shutil.rmtree(builds_root, ignore_errors=True) + + @tornado.testing.gen_test + async def test_initial_cache_false_skips_store(self): + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + resp = await _post(self.get_http_port(), "/load_expr", + {"session": "ic-off", "build_dir": build_path, "initial_cache": False}) + self.assertEqual(resp.code, 200) + self.assertEqual(json.loads(resp.body)["cache"]["status"], "off") + crep = json.loads((await _get(self.get_http_port(), "/cache")).body) + self.assertEqual(crep["count"], 0) + finally: + shutil.rmtree(builds_root, ignore_errors=True) + + @tornado.testing.gen_test + async def test_cache_endpoint_empty(self): + crep = json.loads((await _get(self.get_http_port(), "/cache")).body) + self.assertEqual(crep["count"], 0) + self.assertEqual(crep["entries"], []) + + @tornado.testing.gen_test + async def test_load_expr_hit_on_repeat(self): + """Second load of the same expr is a 
cache HIT (same content-based + data_id). The hit must still render correctly: the WS initial_state + carries the bundle's df_display_args + df_meta.""" + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + r1 = json.loads((await _post(self.get_http_port(), "/load_expr", + {"session": "h-miss", "build_dir": build_path})).body) + self.assertEqual(r1["cache"]["status"], "miss") + + r2 = json.loads((await _post(self.get_http_port(), "/load_expr", + {"session": "h-hit", "build_dir": build_path})).body) + self.assertEqual(r2["cache"]["status"], "hit") + self.assertEqual(r2["rows"], 10) + self.assertEqual(r2["cache"]["data_id"], r1["cache"]["data_id"]) + + ws = await tornado.websocket.websocket_connect( + f"ws://localhost:{self.get_http_port()}/ws/h-hit") + init = json.loads(await ws.read_message()) + self.assertEqual(init["df_meta"]["total_rows"], 10) + self.assertIn("main", init["df_display_args"]) + ws.close() + finally: + shutil.rmtree(builds_root, ignore_errors=True) + + @tornado.testing.gen_test + async def test_hit_session_scrolls(self): + """A cache-hit session serves both the cached head window (the fast + path) and a sorted slice (which falls through to the warmed expr) — + both return the full 10-row count.""" + import io + + import pyarrow.parquet as pq + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + await _post(self.get_http_port(), "/load_expr", + {"session": "hs-a", "build_dir": build_path}) + await _post(self.get_http_port(), "/load_expr", + {"session": "hs-b", "build_dir": build_path}) # hit + + ws = await tornado.websocket.websocket_connect( + f"ws://localhost:{self.get_http_port()}/ws/hs-b") + await ws.read_message() # initial_state + + # Head window — served from the cache fast path. 
+ ws.write_message(json.dumps({"type": "infinite_request", + "payload_args": {"start": 0, "end": 10, "sourceName": "default", "origEnd": 10}})) + r = json.loads(await ws.read_message()) + self.assertEqual(r["length"], 10) + self.assertEqual(pq.read_table(io.BytesIO(await ws.read_message())).num_rows, 10) + + # Sorted slice — falls through to the warmed expr (cheap dataflow). + ws.write_message(json.dumps({"type": "infinite_request", + "payload_args": {"start": 0, "end": 10, "sort": "a", + "sort_direction": "asc", "sourceName": "default", "origEnd": 10}})) + r2 = json.loads(await ws.read_message()) + self.assertEqual(r2["length"], 10) + self.assertNotIn("error_info", r2) + await ws.read_message() # binary + ws.close() + finally: + shutil.rmtree(builds_root, ignore_errors=True) + + @tornado.testing.gen_test + async def test_hit_serves_requested_subwindow_not_whole_window(self): + """The cache fast path must ship exactly the requested ``[start, end]`` + slice, not the whole cached window. + + AG Grid's first block is ``cacheBlockSize`` rows (visible + 50), smaller + than the cached window. Shipping the full window against a smaller key + makes the client's ``SmartRowCache.addRows`` reject the payload (row count + != requested segment) whenever ``total_rows > window``, so the initial + paint fails. 
Here the 10-row head window is asked for ``[0, 4]`` — the + binary frame must carry 4 rows.""" + import io + + import pyarrow.parquet as pq + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + await _post(self.get_http_port(), "/load_expr", + {"session": "sw-a", "build_dir": build_path}) + await _post(self.get_http_port(), "/load_expr", + {"session": "sw-b", "build_dir": build_path}) # hit + + ws = await tornado.websocket.websocket_connect( + f"ws://localhost:{self.get_http_port()}/ws/sw-b") + await ws.read_message() # initial_state + + ws.write_message(json.dumps({"type": "infinite_request", + "payload_args": {"start": 0, "end": 4, "sourceName": "default", "origEnd": 4}})) + r = json.loads(await ws.read_message()) + self.assertEqual(r["length"], 10) # total stays the full row count + self.assertEqual(r["key"]["end"], 4) + # The binary frame must hold exactly the requested 4 rows. + self.assertEqual(pq.read_table(io.BytesIO(await ws.read_message())).num_rows, 4) + ws.close() + finally: + shutil.rmtree(builds_root, ignore_errors=True) + + @tornado.testing.gen_test + async def test_hit_replays_display_overrides(self): + """A cache hit must honor the *current* request's display knobs, which are + deliberately excluded from the cache fingerprint. + + First load stores a baseline bundle (no overrides). 
A later hit of the + same expr supplies ``extra_grid_config``; the replayed df_display_args + must carry it, not the bundle's bare baseline.""" + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + await _post(self.get_http_port(), "/load_expr", + {"session": "ov-a", "build_dir": build_path}) # miss, baseline bundle + r2 = json.loads((await _post(self.get_http_port(), "/load_expr", + {"session": "ov-b", "build_dir": build_path, + "extra_grid_config": {"rowHeight": 99}})).body) # hit + override + self.assertEqual(r2["cache"]["status"], "hit") + + ws = await tornado.websocket.websocket_connect( + f"ws://localhost:{self.get_http_port()}/ws/ov-b") + init = json.loads(await ws.read_message()) + dvc = init["df_display_args"]["main"]["df_viewer_config"] + self.assertEqual(dvc.get("extra_grid_config"), {"rowHeight": 99}) + ws.close() + finally: + shutil.rmtree(builds_root, ignore_errors=True) + + @tornado.testing.gen_test + async def test_mismatch_recomputes(self): + """Same expr (same data_id) but a different data-touching config + (init_sd) ⇒ the cached bundle's config_id no longer matches ⇒ the load + recomputes rather than mis-serving. Reported as status='mismatch'.""" + builds_root = tempfile.mkdtemp() + try: + build_path = _build_expr_dir(builds_root) + await _post(self.get_http_port(), "/load_expr", + {"session": "mm-1", "build_dir": build_path}) # miss, stores config A + r2 = json.loads((await _post(self.get_http_port(), "/load_expr", + {"session": "mm-2", "build_dir": build_path, + "init_sd": {"name": {"custom_stat": 5}}})).body) + self.assertEqual(r2["cache"]["status"], "mismatch") + self.assertEqual(r2["rows"], 10) + finally: + shutil.rmtree(builds_root, ignore_errors=True)