diff --git a/dapr/clients/grpc/_request.py b/dapr/clients/grpc/_request.py index af7613299..bdd699b1e 100644 --- a/dapr/clients/grpc/_request.py +++ b/dapr/clients/grpc/_request.py @@ -285,6 +285,7 @@ def __init__( data: Optional[Union[bytes, str]] = None, etag: Optional[str] = None, operation_type: TransactionOperationType = TransactionOperationType.upsert, + metadata: Optional[Dict[str, str]] = None, ): """Initializes TransactionalStateOperation item from :obj:`runtime_v1.TransactionalStateOperation`. @@ -305,6 +306,7 @@ def __init__( self._data = data # type: ignore self._etag = etag self._operation_type = operation_type + self._metadata = metadata @property def key(self) -> str: @@ -326,6 +328,11 @@ def operation_type(self) -> TransactionOperationType: """Gets etag.""" return self._operation_type + @property + def metadata(self) -> Dict[str, str]: + """Gets metadata.""" + return {} if self._metadata is None else self._metadata + class EncryptRequestIterator(DaprRequest): """An iterator for cryptography encrypt API requests. diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index 5a18ad057..9868af083 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -939,6 +939,7 @@ def execute_state_transaction( key=o.key, value=to_bytes(o.data) if o.data is not None else to_bytes(''), etag=common_v1.Etag(value=o.etag) if o.etag is not None else None, + metadata=o.metadata, ), ) for o in operations diff --git a/examples/state_store/README.md b/examples/state_store/README.md index f42074762..b778193c5 100644 --- a/examples/state_store/README.md +++ b/examples/state_store/README.md @@ -41,13 +41,15 @@ expected_stdout_lines: - "== APP == Cannot save bulk due to bad etags. ErrorCode=StatusCode.ABORTED" - "== APP == Got value=b'value_1' eTag=1" - "== APP == Got items with etags: [(b'value_1_updated', '2'), (b'value_2', '2')]" + - "== APP == Transaction with outbox pattern executed successfully!" + - "== APP == Got value after outbox pattern: b'val1'" - "== APP == Got values after transaction delete: [b'', b'']" - "== APP == Got value after delete: b''" timeout_seconds: 5 --> ```bash -dapr run -- python3 state_store.py +dapr run --resources-path components/ -- python3 state_store.py ``` @@ -68,6 +70,10 @@ The output should be as follows: == APP == Got items with etags: [(b'value_1_updated', '2'), (b'value_2', '2')] +== APP == Transaction with outbox pattern executed successfully! + +== APP == Got value after outbox pattern: b'val1' + == APP == Got values after transaction delete: [b'', b''] == APP == Got value after delete: b'' diff --git a/examples/state_store/components/pubsub.yaml b/examples/state_store/components/pubsub.yaml new file mode 100644 index 000000000..18764d8ce --- /dev/null +++ b/examples/state_store/components/pubsub.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/examples/state_store/components/statestore.yaml b/examples/state_store/components/statestore.yaml new file mode 100644 index 000000000..c69fd7171 --- /dev/null +++ b/examples/state_store/components/statestore.yaml @@ -0,0 +1,18 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" + - name: outboxPublishPubsub + value: "pubsub" + - name: outboxPublishTopic + value: "test" \ No newline at end of file diff --git a/examples/state_store/state_store.py b/examples/state_store/state_store.py index a7b449d61..301c675bc 100644 --- a/examples/state_store/state_store.py +++ b/examples/state_store/state_store.py @@ -88,6 +88,24 @@ ).items print(f'Got items with etags: {[(i.data, i.etag) for i in items]}') + # Outbox pattern + # pass in the ("outbox.projection", "true") metadata to the transaction to enable the outbox pattern. + d.execute_state_transaction( + store_name=storeName, + operations=[ + TransactionalStateOperation( + key='key1', data='val1', metadata={'outbox.projection': 'false'} + ), + TransactionalStateOperation( + key='key1', data='val2', metadata={'outbox.projection': 'true'} + ), + ], + ) + print('Transaction with outbox pattern executed successfully!') + + val = d.get_state(store_name=storeName, key='key1').data + print(f'Got value after outbox pattern: {val}') + # Transaction delete d.execute_state_transaction( store_name=storeName,