
Commit 22518e7

Rename stream_id to state_id
1 parent 323802e · commit 22518e7

File tree

28 files changed: +313 −326 lines changed
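The rename is mechanical but touches every layer that keys state by identifier. As a quick orientation, here is a minimal sketch of how the renamed property surfaces to application code (broker address and topic name are hypothetical):

from quixstreams import Application

app = Application(broker_address="localhost:9092")  # hypothetical broker
topic = app.topic("orders")                         # hypothetical topic name
sdf = app.dataframe(topic)

# What was `sdf.stream_id` before this commit is `sdf.state_id` after it;
# by default it is derived from the topic name(s), e.g. "orders".
print(sdf.state_id)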


quixstreams/app.py (+8 −12)
@@ -994,15 +994,13 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
             )
             committed_offsets[tp.partition][tp.topic] = tp.offset

-        # Match the assigned TP with a stream ID via DataFrameRegistry
+        # Match the assigned TP with a state ID via DataFrameRegistry
         for tp in non_changelog_tps:
-            stream_ids = self._dataframe_registry.get_stream_ids(
-                topic_name=tp.topic
-            )
-            # Assign store partitions for the given stream ids
-            for stream_id in stream_ids:
+            state_ids = self._dataframe_registry.get_state_ids(topic_name=tp.topic)
+            # Assign store partitions for the given state ids
+            for state_id in state_ids:
                 self._state_manager.on_partition_assign(
-                    stream_id=stream_id,
+                    state_id=state_id,
                     partition=tp.partition,
                     committed_offsets=committed_offsets[tp.partition],
                 )
@@ -1044,12 +1042,10 @@ def _revoke_state_partitions(self, topic_partitions: List[TopicPartition]):
         ]
         for tp in non_changelog_tps:
             if self._state_manager.stores:
-                stream_ids = self._dataframe_registry.get_stream_ids(
-                    topic_name=tp.topic
-                )
-                for stream_id in stream_ids:
+                state_ids = self._dataframe_registry.get_state_ids(topic_name=tp.topic)
+                for state_id in state_ids:
                     self._state_manager.on_partition_revoke(
-                        stream_id=stream_id, partition=tp.partition
+                        state_id=state_id, partition=tp.partition
                     )

     def _setup_signal_handlers(self):
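The two hunks above only rename identifiers, but they show the rebalance flow: on assignment (and revocation), each non-changelog topic partition is resolved to its state ids through the DataFrameRegistry, and the state manager assigns (or revokes) one store partition per state id. A toy illustration of that fan-out, with a plain dict standing in for the registry (topic and state-id names are hypothetical):

topics_to_state_ids = {"orders": {"orders", "orders--groupby--user_id"}}

def on_assign(assigned: list[tuple[str, int]]) -> None:
    for topic, partition in assigned:
        # One topic can back several state ids (e.g. after a group_by),
        # so each matching store gets its partition assigned.
        for state_id in sorted(topics_to_state_ids.get(topic, ())):
            print(f"assign: state_id={state_id}, partition={partition}")

on_assign([("orders", 0), ("orders", 1)])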

quixstreams/checkpointing/checkpoint.py (+7 −11)
@@ -148,28 +148,26 @@ def __init__(
         self._producer.begin_transaction()

     def get_store_transaction(
-        self, stream_id: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME
+        self, state_id: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME
     ) -> PartitionTransaction:
         """
         Get a PartitionTransaction for the given store, topic and partition.

         It will return already started transaction if there's one.

-        :param stream_id: stream id
+        :param state_id: state id
         :param partition: partition number
         :param store_name: store name
         :return: instance of `PartitionTransaction`
         """
-        transaction = self._store_transactions.get((stream_id, partition, store_name))
+        transaction = self._store_transactions.get((state_id, partition, store_name))
         if transaction is not None:
             return transaction

-        store = self._state_manager.get_store(
-            stream_id=stream_id, store_name=store_name
-        )
+        store = self._state_manager.get_store(state_id=state_id, store_name=store_name)
         transaction = store.start_partition_transaction(partition=partition)

-        self._store_transactions[(stream_id, partition, store_name)] = transaction
+        self._store_transactions[(state_id, partition, store_name)] = transaction
         return transaction

     def close(self):
@@ -227,13 +225,11 @@ def commit(self):

         # Step 2. Produce the changelogs
         for (
-            stream_id,
+            state_id,
             partition,
             store_name,
         ), transaction in self._store_transactions.items():
-            topics = self._dataframe_registry.get_topics_for_stream_id(
-                stream_id=stream_id
-            )
+            topics = self._dataframe_registry.get_topics_for_state_id(state_id=state_id)
             processed_offsets = {
                 topic: offset
                 for (topic, partition_), offset in self._tp_offsets.items()
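The checkpoint caches one transaction per (state_id, partition, store_name) key and reuses it until commit, at which point the registry maps each state_id back to its topics to resolve processed offsets. A toy version of the caching behaviour (the Transaction class here is a stand-in for PartitionTransaction):

class Transaction:
    pass

_store_transactions: dict[tuple[str, int, str], Transaction] = {}

def get_store_transaction(state_id: str, partition: int, store_name: str = "default") -> Transaction:
    key = (state_id, partition, store_name)
    if key not in _store_transactions:
        # The real code first fetches the store from the state manager,
        # then starts a partition transaction on it.
        _store_transactions[key] = Transaction()
    return _store_transactions[key]

# Within one checkpoint, the same key yields the same transaction:
assert get_store_transaction("orders", 0) is get_store_transaction("orders", 0)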

quixstreams/dataframe/dataframe.py (+22 −21)
@@ -135,7 +135,7 @@ def __init__(
         registry: DataFrameRegistry,
         processing_context: ProcessingContext,
         stream: Optional[Stream] = None,
-        stream_id: Optional[str] = None,
+        state_id: Optional[str] = None,
     ):
         if not topics:
             raise ValueError("At least one Topic must be passed")
@@ -146,15 +146,15 @@
         )

         self._stream: Stream = stream or Stream()
-        self._stream_id: str = stream_id or topic_manager.stream_id_from_topics(
+        self._state_id: str = state_id or topic_manager.state_id_from_topics(
             self.topics
         )
         self._topic_manager = topic_manager
         self._registry = registry
         self._processing_context = processing_context
         self._producer = processing_context.producer
-        self._registry.register_stream_id(
-            stream_id=self.stream_id, topic_names=[t.name for t in self._topics]
+        self._registry.register_state_id(
+            state_id=self.state_id, topic_names=[t.name for t in self._topics]
         )

     @property
@@ -166,20 +166,20 @@ def stream(self) -> Stream:
         return self._stream

     @property
-    def stream_id(self) -> str:
+    def state_id(self) -> str:
         """
         An identifier of the data stream this StreamingDataFrame
         manipulates in the application.

         It is used as a common prefix for state stores and group-by topics.
-        A new `stream_id` is set when StreamingDataFrames are merged via `.merge()`
+        A new `state_id` is set when StreamingDataFrames are merged via `.merge()`
         or grouped via `.group_by()`.

-        StreamingDataFrames with different `stream_id` cannot access the same state stores.
+        StreamingDataFrames with different `state_id` cannot access the same state stores.

-        By default, a topic name or a combination of topic names are used as `stream_id`.
+        By default, a topic name or a combination of topic names are used as `state_id`.
         """
-        return self._stream_id
+        return self._state_id

     @property
     def topics(self) -> tuple[Topic, ...]:
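The docstring above says the default `state_id` is a topic name or a combination of topic names. The exact scheme lives in `TopicManager.state_id_from_topics`; as a rough sketch of the idea only, assuming a simple sorted join:

def state_id_from_topics(topic_names: list[str]) -> str:
    # Single topic: its own name; multiple topics: a deterministic combination.
    return "--".join(sorted(topic_names))

print(state_id_from_topics(["orders"]))           # orders
print(state_id_from_topics(["users", "orders"]))  # orders--users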
@@ -286,7 +286,7 @@ def func(d: dict, state: State):
             stateful_func = _as_stateful(
                 func=with_metadata_func,
                 processing_context=self._processing_context,
-                stream_id=self.stream_id,
+                state_id=self.state_id,
             )
             stream = self.stream.add_apply(stateful_func, expand=expand, metadata=True)  # type: ignore[call-overload]
         else:
@@ -395,7 +395,7 @@ def func(values: list, state: State):
             stateful_func = _as_stateful(
                 func=with_metadata_func,
                 processing_context=self._processing_context,
-                stream_id=self.stream_id,
+                state_id=self.state_id,
             )
             return self._add_update(stateful_func, metadata=True)
         else:
@@ -497,7 +497,7 @@ def func(d: dict, state: State):
             stateful_func = _as_stateful(
                 func=with_metadata_func,
                 processing_context=self._processing_context,
-                stream_id=self.stream_id,
+                state_id=self.state_id,
             )
             stream = self.stream.add_filter(stateful_func, metadata=True)
         else:
@@ -603,7 +603,7 @@ def func(d: dict, state: State):

         groupby_topic = self._topic_manager.repartition_topic(
             operation=operation,
-            stream_id=self.stream_id,
+            state_id=self.state_id,
             config=repartition_config,
             key_serializer=key_serializer,
             value_serializer=value_serializer,
@@ -631,7 +631,7 @@ def _callback(value, _, timestamp, headers):
         stream = self.stream.add_transform(_callback, expand=False)

         groupby_sdf = self.__dataframe_clone__(
-            stream=stream, stream_id=f"{self.stream_id}--groupby--{operation}"
+            stream=stream, state_id=f"{self.state_id}--groupby--{operation}"
         )
         self._registry.register_groupby(
             source_sdf=self, new_sdf=groupby_sdf, register_new_root=False
@@ -1683,15 +1683,15 @@ def _add_update(

     def _register_store(self):
         """
-        Register the default store for the current stream_id in StateStoreManager.
+        Register the default store for the current state_id in StateStoreManager.
         """
         self.ensure_topics_copartitioned()

         # Generate a changelog topic config based on the underlying topics.
         changelog_topic_config = self._topic_manager.derive_topic_config(self._topics)

         self._processing_context.state_manager.register_store(
-            stream_id=self.stream_id, changelog_config=changelog_topic_config
+            state_id=self.state_id, changelog_config=changelog_topic_config
         )

     def _groupby_key(
@@ -1711,21 +1711,22 @@ def __dataframe_clone__(
         self,
         *topics: Topic,
         stream: Optional[Stream] = None,
-        stream_id: Optional[str] = None,
+        state_id: Optional[str] = None,
     ) -> "StreamingDataFrame":
         """
         Clone the StreamingDataFrame with a new `stream`, `topics`,
-        and optional `stream_id` parameters.
+        and optional `state_id` parameters.

         :param topics: one or more `Topic` objects
         :param stream: instance of `Stream`, optional.
+        :param state_id: str, optional.
         :return: a new `StreamingDataFrame`.
         """

         clone = self.__class__(
             *(topics or self._topics),
             stream=stream,
-            stream_id=stream_id,
+            state_id=state_id,
             processing_context=self._processing_context,
             topic_manager=self._topic_manager,
             registry=self._registry,
@@ -1840,13 +1841,13 @@ def wrapper(
 def _as_stateful(
     func: Callable[[Any, Any, int, Any, State], T],
     processing_context: ProcessingContext,
-    stream_id: str,
+    state_id: str,
 ) -> Callable[[Any, Any, int, Any], T]:
     @functools.wraps(func)
     def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
         ctx = message_context()
         transaction = processing_context.checkpoint.get_store_transaction(
-            stream_id=stream_id,
+            state_id=state_id,
             partition=ctx.partition,
         )
         # Pass a State object with an interface limited to the key updates only
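The last hunk shows the pattern that makes stateful callbacks work: `_as_stateful` closes over the `state_id` and, per message, asks the checkpoint for the store transaction of the current partition before invoking the user function. A simplified standalone sketch (`checkpoint` is a stand-in for the real processing context, and the partition is hard-coded where the real code reads it from the message context):

import functools

def as_stateful(func, checkpoint, state_id: str):
    @functools.wraps(func)
    def wrapper(value, key, timestamp, headers):
        # Real code: the partition comes from message_context(), and the
        # transaction is narrowed to a key-scoped State view.
        state = checkpoint.get_store_transaction(state_id=state_id, partition=0)
        return func(value, key, timestamp, headers, state)
    return wrapper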

quixstreams/dataframe/registry.py (+18 −18)
@@ -24,8 +24,8 @@ def __init__(self) -> None:
         self._registry: dict[str, Stream] = {}
         self._topics: list[Topic] = []
         self._repartition_origins: set[str] = set()
-        self._topics_to_stream_ids: dict[str, set[str]] = {}
-        self._stream_ids_to_topics: dict[str, set[str]] = {}
+        self._topics_to_state_ids: dict[str, set[str]] = {}
+        self._state_ids_to_topics: dict[str, set[str]] = {}

     @property
     def consumer_topics(self) -> list[Topic]:
@@ -72,19 +72,19 @@ def register_groupby(
         :param new_sdf: the SDF generated by `sdf.group_by()`.
         """

-        if source_sdf.stream_id in self._repartition_origins:
+        if source_sdf.state_id in self._repartition_origins:
             raise GroupByNestingLimit(
                 "Subsequent (nested) `SDF.group_by()` operations are not allowed."
             )

-        if new_sdf.stream_id in self._repartition_origins:
+        if new_sdf.state_id in self._repartition_origins:
             raise GroupByDuplicate(
                 "A `SDF.group_by()` operation appears to be the same as another, "
                 "either from using the same column or name parameter; "
                 "adjust by setting a unique name with `SDF.group_by(name=<NAME>)` "
             )

-        self._repartition_origins.add(new_sdf.stream_id)
+        self._repartition_origins.add(new_sdf.state_id)

         if register_new_root:
             try:
@@ -114,34 +114,34 @@ def compose_all(
             executors[topic] = root_executors[root_stream]
         return executors

-    def register_stream_id(self, stream_id: str, topic_names: list[str]):
+    def register_state_id(self, state_id: str, topic_names: list[str]):
         """
-        Register a mapping between the stream_id and topic names.
+        Register a mapping between the state_id and topic names.
         This mapping is later used to match topics to state stores
         during assignment and commits.

-        The same stream id can be registered multiple times.
-        :param stream_id: stream id of StreamingDataFrame
+        The same state id can be registered multiple times.
+        :param state_id: state id of StreamingDataFrame
         :param topic_names: list of topics to map the stream id with
         """
         for topic_name in topic_names:
-            self._topics_to_stream_ids.setdefault(topic_name, set()).add(stream_id)
-            self._stream_ids_to_topics.setdefault(stream_id, set()).add(topic_name)
+            self._topics_to_state_ids.setdefault(topic_name, set()).add(state_id)
+            self._state_ids_to_topics.setdefault(state_id, set()).add(topic_name)

-    def get_stream_ids(self, topic_name: str) -> list[str]:
+    def get_state_ids(self, topic_name: str) -> list[str]:
         """
-        Get a list of stream ids for the given topic name
+        Get a list of state ids for the given topic name

         :param topic_name: a name of the topic
-        :return: a list of stream ids
+        :return: a list of state ids
         """
-        return list(self._topics_to_stream_ids[topic_name])
+        return list(self._topics_to_state_ids[topic_name])

-    def get_topics_for_stream_id(self, stream_id: str) -> list[str]:
+    def get_topics_for_state_id(self, state_id: str) -> list[str]:
         """
         Get a list of topics for the given stream id.

-        :param stream_id: stream id
+        :param state_id: state id
         :return: a list of topic names
         """
-        return list(self._stream_ids_to_topics[stream_id])
+        return list(self._state_ids_to_topics[state_id])
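The registry keeps the topic-to-state-id relationship as two dicts of sets, filled together so lookups work in both directions. A toy reproduction of that bookkeeping, mirroring the renamed methods above (topic names are hypothetical):

topics_to_state_ids: dict[str, set[str]] = {}
state_ids_to_topics: dict[str, set[str]] = {}

def register_state_id(state_id: str, topic_names: list[str]) -> None:
    for topic_name in topic_names:
        topics_to_state_ids.setdefault(topic_name, set()).add(state_id)
        state_ids_to_topics.setdefault(state_id, set()).add(topic_name)

register_state_id("orders--users", ["orders", "users"])
print(sorted(topics_to_state_ids["orders"]))         # ['orders--users']
print(sorted(state_ids_to_topics["orders--users"]))  # ['orders', 'users']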

quixstreams/dataframe/windows/base.py (+4 −4)
@@ -74,7 +74,7 @@ def register_store(self) -> None:
         # Create a config for the changelog topic based on the underlying SDF topics
         changelog_config = TopicManager.derive_topic_config(self._dataframe.topics)
         self._dataframe.processing_context.state_manager.register_windowed_store(
-            stream_id=self._dataframe.stream_id,
+            state_id=self._dataframe.state_id,
             store_name=self._name,
             changelog_config=changelog_config,
         )
@@ -88,7 +88,7 @@ def _apply_window(

         windowed_func = _as_windowed(
             func=func,
-            stream_id=self._dataframe.stream_id,
+            state_id=self._dataframe.state_id,
             processing_context=self._dataframe.processing_context,
             store_name=name,
         )
@@ -400,7 +400,7 @@ def _as_windowed(
     func: TransformRecordCallbackExpandedWindowed,
     processing_context: "ProcessingContext",
    store_name: str,
-    stream_id: str,
+    state_id: str,
 ) -> TransformExpandedCallback:
     @functools.wraps(func)
     def wrapper(
@@ -410,7 +410,7 @@ def wrapper(
         transaction = cast(
             WindowedPartitionTransaction,
             processing_context.checkpoint.get_store_transaction(
-                stream_id=stream_id, partition=ctx.partition, store_name=store_name
+                state_id=state_id, partition=ctx.partition, store_name=store_name
             ),
         )
         if key is None:
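Windowed stores follow the same renaming: a window registers its store under the dataframe's `state_id` plus a per-window `store_name`, so several windows can coexist on one state id. A toy registry illustrating that keying (store names here are hypothetical):

windowed_stores: dict[tuple[str, str], str] = {}

def register_windowed_store(state_id: str, store_name: str) -> None:
    # Keyed by (state_id, store_name), mirroring how get_store_transaction
    # addresses windowed stores in the hunks above.
    windowed_stores[(state_id, store_name)] = f"store for {state_id}/{store_name}"

register_windowed_store("orders", "tumbling_window_60s_sum")
register_windowed_store("orders", "hopping_window_60s_10s_count")
print(len(windowed_stores))  # 2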
