Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions python/python/lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@
bytes_read_counter,
iops_counter,
)
from .mem_wal import (
ExecutionPlan,
LsmPointLookupPlanner,
LsmScanner,
LsmVectorSearchPlanner,
MergedGeneration,
RegionField,
RegionSnapshot,
RegionSpec,
RegionWriter,
)
from .namespace import (
DescribeTableRequest,
LanceNamespace,
Expand Down Expand Up @@ -83,6 +94,15 @@
"set_logger",
"write_dataset",
"FFILanceTableProvider",
"ExecutionPlan",
"LsmPointLookupPlanner",
"LsmScanner",
"LsmVectorSearchPlanner",
"MergedGeneration",
"RegionField",
"RegionSpec",
"RegionSnapshot",
"RegionWriter",
]


Expand Down
201 changes: 201 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

from lance.namespace import LanceNamespace

from . import mem_wal
from .commit import CommitLock
from .io import StorageOptionsProvider
from .lance.indices import IndexDescription
Expand Down Expand Up @@ -411,6 +412,25 @@ def analyze_plan(
reader = _coerce_reader(data_obj, schema)
return super(MergeInsertBuilder, self).analyze_plan(reader)

def mark_generations_as_merged(
    self, generations: "List[mem_wal.MergedGeneration]"
) -> "MergeInsertBuilder":
    """Record that the given MemWAL generations are merged by this operation.

    Invoke this prior to executing the merge_insert whenever the source
    rows originate from flushed MemWAL generations, so that the commit
    also marks those generations as merged into the base table.

    Parameters
    ----------
    generations : list of MergedGeneration
        The flushed generations to be marked as merged.

    Returns
    -------
    MergeInsertBuilder
        This builder, to allow call chaining.
    """
    from .mem_wal import _to_raw_merged_generations

    converted = _to_raw_merged_generations(generations)
    super(MergeInsertBuilder, self).mark_generations_as_merged(converted)
    return self


class LanceDataset(pa.dataset.Dataset):
"""A Lance Dataset in Lance format where the data is stored at the given uri."""
Expand Down Expand Up @@ -4021,6 +4041,187 @@ def centroids(

return ivf.centroids

def initialize_mem_wal(
    self,
    *,
    maintained_indexes: Optional[List[str]] = None,
    region_spec: Optional["mem_wal.RegionSpec"] = None,
) -> None:
    """Set up MemWAL support on this dataset.

    This is a one-time operation that must happen before any call to
    `mem_wal_writer`. It requires that at least one field of the dataset
    schema carries the ``lance-schema:unenforced-primary-key`` Arrow
    field metadata.

    Parameters
    ----------
    maintained_indexes : list of str, optional
        Names of vector indexes that should stay up to date as rows are
        written through the MemWAL. Every name must refer to an index
        that already exists on the dataset.
    region_spec : RegionSpec, optional
        A partitioning specification used for automatic region routing.
        If given, Lance derives a region identifier from each written
        row and dispatches the write to the matching
        `~lance.mem_wal.RegionWriter` on its own. If left as ``None``
        (the default), callers are responsible for choosing region IDs
        and passing them to `mem_wal_writer` themselves.

    Raises
    ------
    IOError
        - No field of the dataset is annotated with
          ``lance-schema:unenforced-primary-key``.
        - A name in *maintained_indexes* matches no index on the dataset.
        - MemWAL was already initialized for this dataset.

    Examples
    --------
    Manual region management (no region spec)::

        import lance
        import pyarrow as pa
        import tempfile

        schema = pa.schema([
            pa.field("id", pa.int64(), nullable=False,
                     metadata={"lance-schema:unenforced-primary-key": "true"}),
            pa.field("val", pa.float32()),
        ])
        table = pa.table({"id": [1], "val": [0.1]}, schema=schema)
        with tempfile.TemporaryDirectory() as tmpdir:
            ds = lance.write_dataset(table, tmpdir)
            ds.initialize_mem_wal()

    Automatic routing keyed on ``tenant_id`` via a region spec::

        from lance.mem_wal import RegionField, RegionSpec

        spec = RegionSpec(
            spec_id=1,
            fields=[RegionField(field_id="tenant_id", source_ids=[0],
                                result_type="int64")],
        )
        ds.initialize_mem_wal(region_spec=spec)
    """
    # Delegate straight to the native dataset handle; validation of the
    # primary-key field and index names happens on the Rust side.
    self._ds.initialize_mem_wal(
        maintained_indexes=maintained_indexes,
        region_spec=region_spec,
    )

def mem_wal_writer(
    self,
    region_id: str,
    *,
    durable_write: Optional[bool] = None,
    sync_indexed_write: Optional[bool] = None,
    max_wal_buffer_size: Optional[int] = None,
    max_wal_flush_interval_ms: Optional[int] = None,
    max_memtable_size: Optional[int] = None,
    max_memtable_rows: Optional[int] = None,
    max_memtable_batches: Optional[int] = None,
    max_unflushed_memtable_bytes: Optional[int] = None,
    ivf_index_partition_capacity_safety_factor: Optional[int] = None,
    manifest_scan_batch_size: Optional[int] = None,
    async_index_buffer_rows: Optional[int] = None,
    async_index_interval_ms: Optional[int] = None,
    backpressure_log_interval_ms: Optional[int] = None,
    stats_log_interval_ms: Optional[int] = None,
) -> "mem_wal.RegionWriter":
    """Obtain a RegionWriter for the given region.

    `initialize_mem_wal` must have been called on this dataset first.
    Every *region* acts as an independent write shard; using distinct
    region IDs lets multiple writers ingest in parallel without
    contending with each other.

    Parameters
    ----------
    region_id : str
        UUID string identifying the write region (e.g.
        ``str(uuid.uuid4())``).
    durable_write : bool, optional
        Whether to fsync WAL writes (default: ``True``).
    sync_indexed_write : bool, optional
        Whether index updates are synchronous (default: ``True``).
    max_wal_buffer_size : int, optional
        Maximum WAL buffer size in bytes (default: 10 MB).
    max_wal_flush_interval_ms : int, optional
        Maximum WAL flush interval in milliseconds (default: 100).
    max_memtable_size : int, optional
        Maximum MemTable size in bytes (default: 256 MB).
    max_memtable_rows : int, optional
        Maximum rows per MemTable (default: 100 000).
    max_memtable_batches : int, optional
        Maximum batches per MemTable (default: 8 000).
    max_unflushed_memtable_bytes : int, optional
        Maximum unflushed bytes before backpressure (default: 1 GB).
    ivf_index_partition_capacity_safety_factor : int, optional
        Safety factor for IVF partition capacity (default: 8).
    manifest_scan_batch_size : int, optional
        Batch size for manifest scans (default: 2).
    async_index_buffer_rows : int, optional
        Buffer rows for async index updates (default: 10 000).
    async_index_interval_ms : int, optional
        Interval for async index updates in milliseconds (default: 1000).
    backpressure_log_interval_ms : int, optional
        Interval for backpressure log messages in milliseconds
        (default: 30 000).
    stats_log_interval_ms : int, optional
        Interval for statistics log messages in milliseconds
        (default: 60 000). Pass ``0`` to disable.

    Returns
    -------
    RegionWriter
        A context-manager-compatible writer for the specified region.

    Examples
    --------
    >>> import lance
    >>> import pyarrow as pa
    >>> import tempfile
    >>> import uuid
    >>> schema = pa.schema([
    ...     pa.field("id", pa.int64(), nullable=False,
    ...              metadata={"lance-schema:unenforced-primary-key": "true"}),
    ...     pa.field("val", pa.float32()),
    ... ])
    >>> with tempfile.TemporaryDirectory() as tmpdir:
    ...     ds = lance.write_dataset(
    ...         pa.table({"id": [1], "val": [0.1]}, schema=schema),
    ...         tmpdir,
    ...     )
    ...     ds.initialize_mem_wal()
    ...     region_id = str(uuid.uuid4())
    ...     new_data = pa.table({"id": [2], "val": [0.2]}, schema=schema)
    ...     with ds.mem_wal_writer(region_id) as writer:
    ...         writer.put(new_data)
    """
    from .mem_wal import RegionWriter

    # Collect every tunable, then drop entries the caller left unset so
    # the native layer falls back to its own defaults for those.
    candidate_options = {
        "durable_write": durable_write,
        "sync_indexed_write": sync_indexed_write,
        "max_wal_buffer_size": max_wal_buffer_size,
        "max_wal_flush_interval_ms": max_wal_flush_interval_ms,
        "max_memtable_size": max_memtable_size,
        "max_memtable_rows": max_memtable_rows,
        "max_memtable_batches": max_memtable_batches,
        "max_unflushed_memtable_bytes": max_unflushed_memtable_bytes,
        "ivf_index_partition_capacity_safety_factor": (
            ivf_index_partition_capacity_safety_factor
        ),
        "manifest_scan_batch_size": manifest_scan_batch_size,
        "async_index_buffer_rows": async_index_buffer_rows,
        "async_index_interval_ms": async_index_interval_ms,
        "backpressure_log_interval_ms": backpressure_log_interval_ms,
        "stats_log_interval_ms": stats_log_interval_ms,
    }
    writer_options = {
        key: value for key, value in candidate_options.items() if value is not None
    }
    raw_writer = self._ds.mem_wal_writer(region_id, **writer_options)
    return RegionWriter(raw_writer)


class SqlQuery:
"""
Expand Down
Loading
Loading