Rename stream_id to state_id #842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 2 commits into main
20 changes: 8 additions & 12 deletions quixstreams/app.py
@@ -994,15 +994,13 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
                 )
             committed_offsets[tp.partition][tp.topic] = tp.offset
 
-        # Match the assigned TP with a stream ID via DataFrameRegistry
+        # Match the assigned TP with a state ID via DataFrameRegistry
         for tp in non_changelog_tps:
-            stream_ids = self._dataframe_registry.get_stream_ids(
-                topic_name=tp.topic
-            )
-            # Assign store partitions for the given stream ids
-            for stream_id in stream_ids:
+            state_ids = self._dataframe_registry.get_state_ids(topic_name=tp.topic)
+            # Assign store partitions for the given state ids
+            for state_id in state_ids:
                 self._state_manager.on_partition_assign(
-                    stream_id=stream_id,
+                    state_id=state_id,
                     partition=tp.partition,
                     committed_offsets=committed_offsets[tp.partition],
                 )
@@ -1044,12 +1042,10 @@ def _revoke_state_partitions(self, topic_partitions: List[TopicPartition]):
         ]
         for tp in non_changelog_tps:
             if self._state_manager.stores:
-                stream_ids = self._dataframe_registry.get_stream_ids(
-                    topic_name=tp.topic
-                )
-                for stream_id in stream_ids:
+                state_ids = self._dataframe_registry.get_state_ids(topic_name=tp.topic)
+                for state_id in state_ids:
                     self._state_manager.on_partition_revoke(
-                        stream_id=stream_id, partition=tp.partition
+                        state_id=state_id, partition=tp.partition
                     )
 
     def _setup_signal_handlers(self):
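These two hunks are the rebalance side of the rename: on assignment, each topic partition is resolved to the state ids registered for its topic, and one store partition is assigned per (state_id, partition) pair; revocation walks the same mapping. A minimal sketch of that resolution step, using a plain dict in place of DataFrameRegistry and print calls in place of the state manager:

```python
# A minimal sketch, not quixstreams internals: a plain dict stands in for
# DataFrameRegistry, and prints stand in for StateStoreManager calls.
from typing import Dict, List, Set, Tuple

topics_to_state_ids: Dict[str, Set[str]] = {
    "orders": {"orders"},
    "clicks": {"clicks--orders"},  # hypothetical merged/group-by state id
}

def on_assign(assigned: List[Tuple[str, int]]) -> None:
    for topic, partition in assigned:
        # One store partition is assigned per (state_id, partition) pair
        for state_id in topics_to_state_ids.get(topic, ()):
            print(f"assign store partition: state_id={state_id}, partition={partition}")

def on_revoke(revoked: List[Tuple[str, int]]) -> None:
    for topic, partition in revoked:
        for state_id in topics_to_state_ids.get(topic, ()):
            print(f"revoke store partition: state_id={state_id}, partition={partition}")

on_assign([("orders", 0), ("clicks", 0)])
on_revoke([("clicks", 0)])
```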
18 changes: 7 additions & 11 deletions quixstreams/checkpointing/checkpoint.py
@@ -148,28 +148,26 @@ def __init__(
         self._producer.begin_transaction()
 
     def get_store_transaction(
-        self, stream_id: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME
+        self, state_id: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME
     ) -> PartitionTransaction:
         """
         Get a PartitionTransaction for the given store, topic and partition.
 
         It will return already started transaction if there's one.
 
-        :param stream_id: stream id
+        :param state_id: state id
         :param partition: partition number
         :param store_name: store name
         :return: instance of `PartitionTransaction`
         """
-        transaction = self._store_transactions.get((stream_id, partition, store_name))
+        transaction = self._store_transactions.get((state_id, partition, store_name))
         if transaction is not None:
             return transaction
 
-        store = self._state_manager.get_store(
-            stream_id=stream_id, store_name=store_name
-        )
+        store = self._state_manager.get_store(state_id=state_id, store_name=store_name)
         transaction = store.start_partition_transaction(partition=partition)
 
-        self._store_transactions[(stream_id, partition, store_name)] = transaction
+        self._store_transactions[(state_id, partition, store_name)] = transaction
         return transaction
 
     def close(self):
@@ -227,13 +225,11 @@ def commit(self):
 
         # Step 2. Produce the changelogs
         for (
-            stream_id,
+            state_id,
             partition,
             store_name,
         ), transaction in self._store_transactions.items():
-            topics = self._dataframe_registry.get_topics_for_stream_id(
-                stream_id=stream_id
-            )
+            topics = self._dataframe_registry.get_topics_for_state_id(state_id=state_id)
             processed_offsets = {
                 topic: offset
                 for (topic, partition_), offset in self._tp_offsets.items()
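The checkpoint caches transactions under a (state_id, partition, store_name) key, so each store partition gets at most one open transaction per checkpoint, and commit() later iterates those same keys to produce changelogs. A rough standalone sketch of the caching pattern; FakeStore and FakeTransaction are stand-ins, not quixstreams classes:

```python
# Rough standalone sketch of the checkpoint's transaction cache; the store and
# transaction types are stand-ins for the real state-manager classes.
from typing import Dict, Tuple

class FakeTransaction: ...

class FakeStore:
    def start_partition_transaction(self, partition: int) -> FakeTransaction:
        return FakeTransaction()

class CheckpointSketch:
    def __init__(self, stores: Dict[Tuple[str, str], FakeStore]):
        self._stores = stores
        self._store_transactions: Dict[Tuple[str, int, str], FakeTransaction] = {}

    def get_store_transaction(
        self, state_id: str, partition: int, store_name: str = "default"
    ) -> FakeTransaction:
        key = (state_id, partition, store_name)
        if key in self._store_transactions:
            return self._store_transactions[key]  # reuse the started transaction
        store = self._stores[(state_id, store_name)]
        transaction = store.start_partition_transaction(partition=partition)
        self._store_transactions[key] = transaction
        return transaction

checkpoint = CheckpointSketch({("orders", "default"): FakeStore()})
t1 = checkpoint.get_store_transaction("orders", 0)
t2 = checkpoint.get_store_transaction("orders", 0)
assert t1 is t2  # same (state_id, partition, store) within one checkpoint
```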
72 changes: 54 additions & 18 deletions quixstreams/dataframe/dataframe.py
@@ -92,7 +92,7 @@ class StreamingDataFrame:
     What it Does:
 
     - Builds a data processing pipeline, declaratively (not executed immediately)
-    - Executes this pipeline on inputs at runtime (Kafka message values)
+    - Executes this pipeline on inputs at runtime (Kafka message values)
     - Provides functions/interface similar to Pandas Dataframes/Series
     - Enables stateful processing (and manages everything related to it)
 
@@ -135,21 +135,26 @@ def __init__(
         registry: DataFrameRegistry,
         processing_context: ProcessingContext,
         stream: Optional[Stream] = None,
+        state_id: Optional[str] = None,
     ):
         if not topics:
             raise ValueError("At least one Topic must be passed")
 
-        self._stream: Stream = stream or Stream()
         # Implicitly deduplicate Topic objects into a tuple and sort them by name
         self._topics: tuple[Topic, ...] = tuple(
             sorted({t.name: t for t in topics}.values(), key=attrgetter("name"))
         )
 
+        self._stream: Stream = stream or Stream()
+        self._state_id: str = state_id or topic_manager.state_id_from_topics(
+            self.topics
+        )
         self._topic_manager = topic_manager
         self._registry = registry
         self._processing_context = processing_context
         self._producer = processing_context.producer
-        self._registry.register_stream_id(
-            stream_id=self.stream_id, topic_names=[t.name for t in self._topics]
+        self._registry.register_state_id(
+            state_id=self.state_id, topic_names=[t.name for t in self._topics]
         )
 
     @property
@@ -161,20 +166,20 @@ def stream(self) -> Stream:
         return self._stream
 
     @property
-    def stream_id(self) -> str:
+    def state_id(self) -> str:
         """
         An identifier of the data stream this StreamingDataFrame
         manipulates in the application.
 
         It is used as a common prefix for state stores and group-by topics.
-        A new `stream_id` is set when StreamingDataFrames are merged via `.merge()`
+        A new `state_id` is set when StreamingDataFrames are merged via `.merge()`
         or grouped via `.group_by()`.
 
-        StreamingDataFrames with different `stream_id` cannot access the same state stores.
+        StreamingDataFrames with different `state_id` cannot access the same state stores.
 
-        By default, a topic name or a combination of topic names are used as `stream_id`.
+        By default, a topic name or a combination of topic names is used as `state_id`.
         """
-        return self._topic_manager.stream_id_from_topics(self.topics)
+        return self._state_id
 
     @property
     def topics(self) -> tuple[Topic, ...]:
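Since the default state_id is still derived from the topic names, the rename is behavior-preserving for existing single-topic apps; only merge() and group_by() produce new ids. An illustration of the derivation, under the assumption that multi-topic ids join the sorted topic names; the real scheme lives in TopicManager.state_id_from_topics and may differ:

```python
# Illustration only: assumes multi-topic state ids are the sorted topic names
# joined together; the actual format is defined by TopicManager and may differ.
def state_id_from_topics(topic_names: list[str]) -> str:
    return "--".join(sorted(topic_names))

print(state_id_from_topics(["orders"]))            # orders
print(state_id_from_topics(["clicks", "orders"]))  # clicks--orders
```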
@@ -281,7 +286,7 @@ def func(d: dict, state: State):
             stateful_func = _as_stateful(
                 func=with_metadata_func,
                 processing_context=self._processing_context,
-                stream_id=self.stream_id,
+                state_id=self.state_id,
             )
             stream = self.stream.add_apply(stateful_func, expand=expand, metadata=True)  # type: ignore[call-overload]
         else:
@@ -390,7 +395,7 @@ def func(values: list, state: State):
             stateful_func = _as_stateful(
                 func=with_metadata_func,
                 processing_context=self._processing_context,
-                stream_id=self.stream_id,
+                state_id=self.state_id,
             )
             return self._add_update(stateful_func, metadata=True)
         else:
@@ -492,7 +497,7 @@ def func(d: dict, state: State):
             stateful_func = _as_stateful(
                 func=with_metadata_func,
                 processing_context=self._processing_context,
-                stream_id=self.stream_id,
+                state_id=self.state_id,
             )
             stream = self.stream.add_filter(stateful_func, metadata=True)
         else:
@@ -591,9 +596,14 @@ def func(d: dict, state: State):
         # Generate a config for the new repartition topic based on the underlying topics
         repartition_config = self._topic_manager.derive_topic_config(self._topics)
 
+        # If the topic has only one partition, we don't need a repartition topic:
+        # we can change the message keys directly, as all records go to the same partition.
+        if repartition_config.num_partitions == 1:
+            return self._single_partition_groupby(operation, key)
+
         groupby_topic = self._topic_manager.repartition_topic(
             operation=operation,
-            stream_id=self.stream_id,
+            state_id=self.state_id,
             config=repartition_config,
             key_serializer=key_serializer,
             value_serializer=value_serializer,
@@ -606,6 +616,29 @@ def func(d: dict, state: State):
         self._registry.register_groupby(source_sdf=self, new_sdf=groupby_sdf)
         return groupby_sdf
 
+    def _single_partition_groupby(
+        self, operation: str, key: Union[str, Callable[[Any], Any]]
+    ) -> "StreamingDataFrame":
+        if isinstance(key, str):
+
+            def _callback(value, _, timestamp, headers):
+                return value, value[key], timestamp, headers
+        else:
+
+            def _callback(value, _, timestamp, headers):
+                return value, key(value), timestamp, headers
+
+        stream = self.stream.add_transform(_callback, expand=False)
+
+        groupby_sdf = self.__dataframe_clone__(
+            stream=stream, state_id=f"{self.state_id}--groupby--{operation}"
+        )
+        self._registry.register_groupby(
+            source_sdf=self, new_sdf=groupby_sdf, register_new_root=False
+        )
+
+        return groupby_sdf
+
     def contains(self, keys: Union[str, list[str]]) -> StreamingSeries:
         """
         Check if keys are present in the Row value.
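The new single-partition path skips the repartition topic entirely: when every record already lands on the one partition, rekeying can happen inline via add_transform without breaking ordering or co-partitioning. A small sketch of the rekeying callback pair, mirroring the (value, key, timestamp, headers) tuple shape in the diff:

```python
# Sketch of the inline rekeying used by the single-partition group_by path;
# this is an illustrative helper, not the library's internals.
from typing import Any, Callable, Union

def make_rekey_callback(key: Union[str, Callable[[Any], Any]]):
    if isinstance(key, str):
        # The new key comes from a column of the message value
        return lambda value, _key, ts, headers: (value, value[key], ts, headers)
    # The new key is computed by a user-supplied function
    return lambda value, _key, ts, headers: (value, key(value), ts, headers)

callback = make_rekey_callback("user_id")
print(callback({"user_id": "u1", "amount": 3}, b"old-key", 1700000000000, None))
# ({'user_id': 'u1', 'amount': 3}, 'u1', 1700000000000, None)
```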
@@ -1650,15 +1683,15 @@ def _add_update(
 
     def _register_store(self):
         """
-        Register the default store for the current stream_id in StateStoreManager.
+        Register the default store for the current state_id in StateStoreManager.
         """
         self.ensure_topics_copartitioned()
 
         # Generate a changelog topic config based on the underlying topics.
         changelog_topic_config = self._topic_manager.derive_topic_config(self._topics)
 
         self._processing_context.state_manager.register_store(
-            stream_id=self.stream_id, changelog_config=changelog_topic_config
+            state_id=self.state_id, changelog_config=changelog_topic_config
         )
 
     def _groupby_key(
@@ -1678,19 +1711,22 @@ def __dataframe_clone__(
         self,
         *topics: Topic,
         stream: Optional[Stream] = None,
+        state_id: Optional[str] = None,
     ) -> "StreamingDataFrame":
         """
         Clone the StreamingDataFrame with a new `stream`, `topics`,
-        and optional `stream_id` parameters.
+        and optional `state_id` parameters.
 
         :param topics: one or more `Topic` objects
         :param stream: instance of `Stream`, optional.
+        :param state_id: str, optional.
         :return: a new `StreamingDataFrame`.
         """
 
         clone = self.__class__(
             *(topics or self._topics),
             stream=stream,
+            state_id=state_id,
             processing_context=self._processing_context,
             topic_manager=self._topic_manager,
             registry=self._registry,
@@ -1805,13 +1841,13 @@ def wrapper(
 def _as_stateful(
     func: Callable[[Any, Any, int, Any, State], T],
     processing_context: ProcessingContext,
-    stream_id: str,
+    state_id: str,
 ) -> Callable[[Any, Any, int, Any], T]:
     @functools.wraps(func)
     def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
         ctx = message_context()
         transaction = processing_context.checkpoint.get_store_transaction(
-            stream_id=stream_id,
+            state_id=state_id,
             partition=ctx.partition,
         )
         # Pass a State object with an interface limited to the key updates only
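_as_stateful binds the state_id once at composition time and resolves a store transaction per message. A simplified view of that closure; the context objects are stubs, and the as_state(prefix=...) call is an assumption about how the key-scoped State is obtained, not verified quixstreams API:

```python
# Simplified view of the closure: state_id is captured once, the transaction is
# looked up per message. `checkpoint`, `get_partition`, and `as_state` are
# assumptions/stubs here, not verified quixstreams API.
import functools
from typing import Any, Callable

def as_stateful(
    func: Callable[..., Any],
    checkpoint: Any,
    state_id: str,
    get_partition: Callable[[], int],
) -> Callable[..., Any]:
    @functools.wraps(func)
    def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
        transaction = checkpoint.get_store_transaction(
            state_id=state_id, partition=get_partition()
        )
        # Hand the user function a State limited to this message's key
        state = transaction.as_state(prefix=key)
        return func(value, key, timestamp, headers, state)

    return wrapper
```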
55 changes: 34 additions & 21 deletions quixstreams/dataframe/registry.py
@@ -24,8 +24,8 @@ def __init__(self) -> None:
         self._registry: dict[str, Stream] = {}
         self._topics: list[Topic] = []
         self._repartition_origins: set[str] = set()
-        self._topics_to_stream_ids: dict[str, set[str]] = {}
-        self._stream_ids_to_topics: dict[str, set[str]] = {}
+        self._topics_to_state_ids: dict[str, set[str]] = {}
+        self._state_ids_to_topics: dict[str, set[str]] = {}
 
     @property
     def consumer_topics(self) -> list[Topic]:
@@ -61,26 +61,39 @@ def register_root(
         self._registry[topic.name] = dataframe.stream
 
     def register_groupby(
-        self, source_sdf: "StreamingDataFrame", new_sdf: "StreamingDataFrame"
+        self,
+        source_sdf: "StreamingDataFrame",
+        new_sdf: "StreamingDataFrame",
+        register_new_root: bool = True,
     ):
         """
         Register a "groupby" SDF, which is one generated with `SDF.group_by()`.
         :param source_sdf: the SDF used by `sdf.group_by()`
         :param new_sdf: the SDF generated by `sdf.group_by()`.
         """
-        if source_sdf.stream_id in self._repartition_origins:
+        if source_sdf.state_id in self._repartition_origins:
             raise GroupByNestingLimit(
                 "Subsequent (nested) `SDF.group_by()` operations are not allowed."
             )
-        try:
-            self.register_root(new_sdf)
-        except StreamingDataFrameDuplicate:
+
+        if new_sdf.state_id in self._repartition_origins:
             raise GroupByDuplicate(
                 "A `SDF.group_by()` operation appears to be the same as another, "
                 "either from using the same column or name parameter; "
                 "adjust by setting a unique name with `SDF.group_by(name=<NAME>)` "
             )
-        self._repartition_origins.add(new_sdf.stream_id)
+
+        self._repartition_origins.add(new_sdf.state_id)
+
+        if register_new_root:
+            try:
+                self.register_root(new_sdf)
+            except StreamingDataFrameDuplicate:
+                raise GroupByDuplicate(
+                    "A `SDF.group_by()` operation appears to be the same as another, "
+                    "either from using the same column or name parameter; "
+                    "adjust by setting a unique name with `SDF.group_by(name=<NAME>)` "
+                )
 
     def compose_all(
         self, sink: Optional[VoidExecutor] = None
Expand All @@ -100,34 +113,34 @@ def compose_all(
executors[topic] = root_executors[root_stream]
return executors

def register_stream_id(self, stream_id: str, topic_names: list[str]):
def register_state_id(self, state_id: str, topic_names: list[str]):
"""
Register a mapping between the stream_id and topic names.
Register a mapping between the state_id and topic names.
This mapping is later used to match topics to state stores
during assignment and commits.

The same stream id can be registered multiple times.
:param stream_id: stream id of StreamingDataFrame
The same state id can be registered multiple times.
:param state_id: state id of StreamingDataFrame
:param topic_names: list of topics to map the stream id with
"""
for topic_name in topic_names:
self._topics_to_stream_ids.setdefault(topic_name, set()).add(stream_id)
self._stream_ids_to_topics.setdefault(stream_id, set()).add(topic_name)
self._topics_to_state_ids.setdefault(topic_name, set()).add(state_id)
self._state_ids_to_topics.setdefault(state_id, set()).add(topic_name)

def get_stream_ids(self, topic_name: str) -> list[str]:
def get_state_ids(self, topic_name: str) -> list[str]:
"""
Get a list of stream ids for the given topic name
Get a list of state ids for the given topic name

:param topic_name: a name of the topic
:return: a list of stream ids
:return: a list of state ids
"""
return list(self._topics_to_stream_ids[topic_name])
return list(self._topics_to_state_ids[topic_name])

def get_topics_for_stream_id(self, stream_id: str) -> list[str]:
def get_topics_for_state_id(self, state_id: str) -> list[str]:
"""
Get a list of topics for the given stream id.

:param stream_id: stream id
:param state_id: state id
:return: a list of topic names
"""
return list(self._stream_ids_to_topics[stream_id])
return list(self._state_ids_to_topics[state_id])
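The registry keeps the mapping in both directions, topics to state ids and back. A standalone round-trip of just the three renamed methods, without the Stream bookkeeping:

```python
# Standalone round-trip of the renamed registry mapping; only the three
# mapping methods are reproduced here, without the Stream bookkeeping.
class RegistrySketch:
    def __init__(self) -> None:
        self._topics_to_state_ids: dict[str, set[str]] = {}
        self._state_ids_to_topics: dict[str, set[str]] = {}

    def register_state_id(self, state_id: str, topic_names: list[str]) -> None:
        for topic_name in topic_names:
            self._topics_to_state_ids.setdefault(topic_name, set()).add(state_id)
            self._state_ids_to_topics.setdefault(state_id, set()).add(topic_name)

    def get_state_ids(self, topic_name: str) -> list[str]:
        return list(self._topics_to_state_ids[topic_name])

    def get_topics_for_state_id(self, state_id: str) -> list[str]:
        return list(self._state_ids_to_topics[state_id])

registry = RegistrySketch()
registry.register_state_id("clicks--orders", ["clicks", "orders"])
print(registry.get_state_ids("orders"))                    # ['clicks--orders']
print(registry.get_topics_for_state_id("clicks--orders"))  # ['clicks', 'orders']
```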