Commit f4072bb
feat(store): LocalTieredStore — Tier-1 SQLite + Tier-2 Parquet (epic #540 phase 3b)
Second slice of Phase 3 of epic #540. Adds a real tiered storage
implementation that ships the analytics value (cross-run DuckDB / Polars
queries) without yet replacing the canonical .iafbt bundle.

Layout under <root>:

    index.sqlite                                Tier-1 (always in sync)
    bundles/<handle>.iafbt                      canonical bytes
    parquet/portfolio_snapshots/run_id=<h>/...  Tier-2, hive-partitioned
    parquet/trades/run_id=<h>/...               Tier-2
    parquet/orders/run_id=<h>/...               Tier-2

Phase 3b deliberately keeps the bundle as the canonical representation;
Tier-2 sidecars are auxiliary, written best-effort, and a malformed
sidecar never blocks a write or a read. This trivially preserves
byte-identical Backtest round-trips today. Byte-identical Tier-2 ->
Backtest reassembly (no bundle on the read path) is Phase 3d.

- decompose.py: Backtest -> flat record lists for snapshots / trades /
  orders, adding run_id and window_name columns so downstream tools
  group cleanly across walk-forward windows. The extension point for
  metric_series and any future kind is the DATASETS tuple.
- LocalTieredStore: implements BacktestStore + SupportsCopyFrom.
  write() saves the bundle, upserts the Tier-1 row, and writes
  hive-partitioned Parquet sidecars per dataset. delete() removes all
  three tiers. iter_index_rows() serves from SQLite directly.
  rebuild_index() recreates Tier-1 from the bundles (useful after a
  software upgrade that adds new index columns).
- scan('portfolio_snapshots' | 'trades' | 'orders') returns a
  pyarrow.dataset.Dataset that DuckDB / Polars can query across every
  run with partition pruning on run_id.
- 15 new tests: Protocol + SupportsCopyFrom conformance, three-tier
  layout, handle normalisation, round-trip, summary_only, Tier-1
  always-in-sync (write/delete/len), Tier-2 cross-run scan, copy_from
  from LocalDirStore, rebuild_index, missing-handle errors. Includes a
  synthetic-records test that asserts hive partitions are written and
  that scan() returns the expected rows + columns.

Targeted suite (backtest_store + backtest_index + cli): 101 / 101 passing.
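For reviewers, a minimal usage sketch of the API the message above describes. The method names (write, scan) come from the commit message; the root path, handle, and backtest values are illustrative, and the exact write() signature is an assumption:

    import duckdb

    from investing_algorithm_framework.services.backtest_store import (
        LocalTieredStore,
    )

    store = LocalTieredStore("/tmp/backtests")  # <root> of the three-tier layout

    # write() persists all three tiers: the .iafbt bundle, the Tier-1
    # SQLite row, and the best-effort Tier-2 Parquet sidecars.
    # (handle / backtest assumed to come from a prior backtest run;
    # argument order is a guess.)
    store.write(handle, backtest)

    # scan() returns a pyarrow.dataset.Dataset spanning every stored run.
    # DuckDB's replacement scan picks up the local variable by name and
    # prunes partitions on run_id.
    trades = store.scan("trades")
    duckdb.sql(
        "SELECT run_id, count(*) AS n FROM trades GROUP BY run_id"
    ).show()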
1 parent 13d8a58 commit f4072bb

4 files changed

Lines changed: 723 additions & 0 deletions

File tree

investing_algorithm_framework/services/backtest_store/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,7 @@
     SupportsCopyFrom,
 )
 from .local_dir_store import LocalDirStore
+from .local_tiered_store import LocalTieredStore
 
 __all__ = [
     "BacktestStore",
@@ -27,4 +28,5 @@
     "StoreHandleNotFoundError",
     "SupportsCopyFrom",
     "LocalDirStore",
+    "LocalTieredStore",
 ]
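With this export in place, callers can import both stores side by side (a usage note, not part of the diff):

    from investing_algorithm_framework.services.backtest_store import (
        LocalDirStore,
        LocalTieredStore,
    )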
investing_algorithm_framework/services/backtest_store/decompose.py

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+"""Backtest → flat-records decomposer for Tier-2 Parquet writes
+(epic #540 phase 3b).
+
+Given a :class:`Backtest`, yields flat record lists for each Tier-2
+dataset (``portfolio_snapshots`` / ``trades`` / ``orders``). The
+output is shaped for direct ingestion by
+:func:`pyarrow.dataset.write_dataset` with hive partitioning on
+``run_id``.
+
+The bundle (``.iafbt``) remains the canonical source of truth in
+Phase 3b; these Parquet datasets are *auxiliary* — they make
+DuckDB / Polars analytics work across thousands of runs without
+re-decoding bundles. Round-tripping Tier-2 back to a Backtest is
+Phase 3d territory.
+"""
+
+from __future__ import annotations
+
+from typing import Any, Dict, Iterator, List
+
+from investing_algorithm_framework.domain import Backtest
+
+
+def _windowed_records(
+    backtest: Backtest, attr: str, run_id: str,
+) -> Iterator[Dict[str, Any]]:
+    """Yield ``to_dict()``-shaped records from every BacktestRun.
+
+    Adds ``run_id`` and ``window_name`` columns so downstream
+    columnar tools can group / partition cleanly across a Backtest's
+    walk-forward windows.
+    """
+    if not backtest.backtest_runs:
+        return
+    for window in backtest.backtest_runs:
+        items = getattr(window, attr, None) or []
+        window_name = getattr(window, "backtest_date_range_name", None) or (
+            window.create_directory_name()
+            if hasattr(window, "create_directory_name") else ""
+        )
+        for obj in items:
+            d = obj.to_dict() if hasattr(obj, "to_dict") else dict(obj)
+            d["run_id"] = run_id
+            d["window_name"] = window_name
+            yield d
+
+
+def snapshots(backtest: Backtest, run_id: str) -> List[Dict[str, Any]]:
+    """Flat portfolio_snapshots records, one per BacktestRun timestep."""
+    return list(_windowed_records(backtest, "portfolio_snapshots", run_id))
+
+
+def trades(backtest: Backtest, run_id: str) -> List[Dict[str, Any]]:
+    """Flat trades records (one per trade across all windows)."""
+    return list(_windowed_records(backtest, "trades", run_id))
+
+
+def orders(backtest: Backtest, run_id: str) -> List[Dict[str, Any]]:
+    """Flat orders records (one per order across all windows)."""
+    return list(_windowed_records(backtest, "orders", run_id))
+
+
+# Datasets exposed by LocalTieredStore. Each entry is
+# (dataset_name, decomposer_fn). Add new kinds here (e.g.
+# metric_series) once their decomposer is in place.
+DATASETS = (
+    ("portfolio_snapshots", snapshots),
+    ("trades", trades),
+    ("orders", orders),
+)
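To illustrate the shape contract the module docstring promises, here is a hedged sketch of how one of these record lists could be fed to pyarrow.dataset.write_dataset with hive partitioning. The base_dir, backtest, and run_id values are illustrative, and this is not necessarily how LocalTieredStore writes its sidecars internally:

    import pyarrow as pa
    import pyarrow.dataset as ds

    # Flat dicts, each already carrying run_id and window_name columns.
    records = trades(backtest, run_id="a1b2c3")

    if records:  # sidecars are best-effort; skip empty datasets
        ds.write_dataset(
            pa.Table.from_pylist(records),
            base_dir="/tmp/backtests/parquet/trades",  # <root>/parquet/trades
            format="parquet",
            partitioning=ds.partitioning(
                pa.schema([("run_id", pa.string())]), flavor="hive"
            ),
            existing_data_behavior="overwrite_or_ignore",
        )

    # Result on disk: /tmp/backtests/parquet/trades/run_id=a1b2c3/part-0.parquet,
    # matching the Tier-2 layout in the commit message.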
