Skip to content

Commit 1344888

Browse files
authored
Fixed partition_key filter for change_feed (#39895)
* Fixed partition_key filter for change_feed * Updated CHANGELOG.md * Addressed comment
1 parent aec83c4 commit 1344888

8 files changed

+315
-139
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed `partition_key` filter for `query_items_change_feed` API. See [PR 39895](https://github.com/Azure/azure-sdk-for-python/pull/39895)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py

+1
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ def set_pk_range_id_request_headers(
260260
# the current token feed range spans less than single physical partition
261261
# for this case, need to set both the partition key range id and epk filter headers
262262
request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"]
263+
request_headers[http_constants.HttpHeaders.ReadFeedKeyType] = "EffectivePartitionKeyRange"
263264
request_headers[http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min
264265
request_headers[http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max
265266

sdk/cosmos/azure-cosmos/samples/change_feed_management.py

+123-64
Original file line numberDiff line numberDiff line change
@@ -31,95 +31,151 @@
3131
CONTAINER_ID = config.settings['container_id']
3232

3333

34-
def create_items(container, size):
35-
print('Creating Items')
34+
def create_items(container, size, partition_key_value):
35+
print("Creating Items with partition key value: {}".format(partition_key_value))
3636

37-
for i in range(1, size):
37+
for i in range(size):
3838
c = str(uuid.uuid4())
3939
item_definition = {'id': 'item' + c,
4040
'address': {'street': '1 Microsoft Way' + c,
4141
'city': 'Redmond' + c,
42-
'state': 'WA',
42+
'state': partition_key_value,
4343
'zip code': 98052
4444
}
4545
}
4646

4747
created_item = container.create_item(body=item_definition)
4848

49+
def clean_up(container):
50+
print('\nClean up the container\n')
51+
52+
for item in container.query_items(query='SELECT * FROM c', enable_cross_partition_query=True):
53+
# Deleting the current item
54+
container.delete_item(item, partition_key=item['address']['state'])
4955

5056
def read_change_feed(container):
5157
print('\nReading Change Feed from the beginning\n')
5258

5359
# For a particular Partition Key Range we can use partition_key_range_id]
5460
# 'is_start_from_beginning = True' will read from the beginning of the history of the container
5561
# If no is_start_from_beginning is specified, the read change feed loop will pickup the items that happen while the loop / process is active
56-
response = container.query_items_change_feed(is_start_from_beginning=True)
57-
for doc in response:
62+
create_items(container, 10, 'WA')
63+
response_iterator = container.query_items_change_feed(is_start_from_beginning=True)
64+
for doc in response_iterator:
5865
print(doc)
5966

60-
print('\nFinished reading all the change feed\n')
61-
67+
def read_change_feed_with_start_time(container):
68+
print('\nReading Change Feed from the start time\n')
69+
# You can read change feed from a specific time.
70+
# You must pass in a datetime object for the start_time field.
6271

63-
def read_change_feed_with_start_time(container, start_time):
72+
# Create items
73+
create_items(container, 10, 'WA')
74+
start_time = datetime.now(timezone.utc)
6475
time = start_time.strftime('%a, %d %b %Y %H:%M:%S GMT')
6576
print('\nReading Change Feed from start time of {}\n'.format(time))
77+
create_items(container, 5, 'CA')
78+
create_items(container, 5, 'OR')
6679

67-
# You can read change feed from a specific time.
68-
# You must pass in a datetime object for the start_time field.
69-
response = container.query_items_change_feed(start_time=start_time)
70-
for doc in response:
80+
# Read change feed from the beginning
81+
response_iterator = container.query_items_change_feed(start_time="Beginning")
82+
for doc in response_iterator:
7183
print(doc)
7284

73-
print('\nFinished reading all the change feed from start time of {}\n'.format(time))
85+
# Read change feed from a start time
86+
response_iterator = container.query_items_change_feed(start_time=start_time)
87+
for doc in response_iterator:
88+
print(doc)
89+
90+
def read_change_feed_with_partition_key(container):
91+
print('\nReading Change Feed from the beginning of the partition key\n')
92+
# Create items
93+
create_items(container, 10, 'WA')
94+
create_items(container, 5, 'CA')
95+
create_items(container, 5, 'OR')
96+
97+
# Read change feed with partition key with LatestVersion mode.
98+
# Should only return change feed for the created items with 'CA' partition key
99+
response_iterator = container.query_items_change_feed(start_time="Beginning", partition_key="CA")
100+
for doc in response_iterator:
101+
print(doc)
74102

103+
def read_change_feed_with_continuation(container):
104+
print('\nReading Change Feed from the continuation\n')
105+
# Create items
106+
create_items(container, 10, 'WA')
107+
response_iterator = container.query_items_change_feed(start_time="Beginning")
108+
for doc in response_iterator:
109+
print(doc)
110+
continuation_token = container.client_connection.last_response_headers['etag']
75111

76-
def read_change_feed_with_continuation(container, continuation):
77-
print('\nReading change feed from continuation\n')
112+
# Create additional items
113+
create_items(container, 5, 'CA')
114+
create_items(container, 5, 'OR')
78115

79116
# You can read change feed from a specific continuation token.
80117
# You must pass in a valid continuation token.
81-
response = container.query_items_change_feed(continuation=continuation)
82-
for doc in response:
118+
# From our continuation token above, you will get all items created after the continuation
119+
response_iterator = container.query_items_change_feed(continuation=continuation_token)
120+
for doc in response_iterator:
83121
print(doc)
84122

85-
print('\nFinished reading all the change feed from continuation\n')
86-
87-
def delete_all_items(container):
88-
print('\nDeleting all item\n')
89-
90-
for item in container.query_items(query='SELECT * FROM c', enable_cross_partition_query=True):
91-
# Deleting the current item
92-
container.delete_item(item, partition_key=item['address']['state'])
93-
94-
print('Deleted all items')
95-
96123
def read_change_feed_with_all_versions_and_delete_mode(container):
97-
change_feed_mode = "AllVersionsAndDeletes"
98-
print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n")
124+
print('\nReading Change Feed with AllVersionsAndDeletes mode\n')
125+
# Read the initial change feed with 'AllVersionsAndDeletes' mode.
126+
# This initial call was made to store a point in time in a 'continuation' token
127+
response_iterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
128+
for doc in response_iterator:
129+
print(doc)
130+
continuation_token = container.client_connection.last_response_headers['etag']
99131

100-
# You can read change feed with a specific change feed mode.
101-
# You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"].
102-
response = container.query_items_change_feed(mode=change_feed_mode)
103-
for doc in response:
132+
# Read all change feed with 'AllVersionsAndDeletes' mode after create items from a continuation
133+
create_items(container, 10, 'CA')
134+
create_items(container, 10, 'OR')
135+
response_iterator = container.query_items_change_feed(mode="AllVersionsAndDeletes", continuation=continuation_token)
136+
for doc in response_iterator:
104137
print(doc)
105138

106-
print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n")
139+
# Read all change feed with 'AllVersionsAndDeletes' mode after delete items from a continuation
140+
clean_up(container)
141+
response_iterator = container.query_items_change_feed(mode="AllVersionsAndDeletes", continuation=continuation_token)
142+
for doc in response_iterator:
143+
print(doc)
107144

108-
def read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation):
109-
change_feed_mode = "AllVersionsAndDeletes"
110-
print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n")
145+
def read_change_feed_with_all_versions_and_delete_mode_with_partition_key(container):
146+
print('\nReading Change Feed with AllVersionsAndDeletes mode from the partition key\n')
111147

112-
# You can read change feed with a specific change feed mode from a specific continuation token.
113-
# You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"].
114-
# You must pass in a valid continuation token.
115-
response = container.query_items_change_feed(mode=change_feed_mode, continuation=continuation)
116-
for doc in response:
148+
# Read the initial change feed with 'AllVersionsAndDeletes' mode with partition key('CA').
149+
# This initial call was made to store a point in time and 'partition_key' in a 'continuation' token
150+
response_iterator = container.query_items_change_feed(mode="AllVersionsAndDeletes", partition_key="CA")
151+
for doc in response_iterator:
152+
print(doc)
153+
continuation_token = container.client_connection.last_response_headers['etag']
154+
155+
create_items(container, 10, 'CA')
156+
create_items(container, 10, 'OR')
157+
# Read change feed 'AllVersionsAndDeletes' mode with 'CA' partition key value from the previous continuation.
158+
# Should only print the created items with 'CA' partition key value
159+
response_iterator = container.query_items_change_feed(mode='AllVersionsAndDeletes', continuation=continuation_token)
160+
for doc in response_iterator:
117161
print(doc)
162+
continuation_token = container.client_connection.last_response_headers['etag']
118163

119-
print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n")
164+
clean_up(container)
165+
# Read change feed 'AllVersionsAndDeletes' mode with 'CA' partition key value from the previous continuation.
166+
# Should only print the deleted items with 'CA' partition key value
167+
response_iterator = container.query_items_change_feed(mode='AllVersionsAndDeletes', continuation=continuation_token)
168+
for doc in response_iterator:
169+
print(doc)
120170

121171
def run_sample():
122172
client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY})
173+
# Delete pre-existing database
174+
try:
175+
client.delete_database(DATABASE_ID)
176+
except exceptions.CosmosResourceNotFoundError:
177+
pass
178+
123179
try:
124180
# setup database for this sample
125181
try:
@@ -131,34 +187,37 @@ def run_sample():
131187
try:
132188
container = db.create_container(
133189
id=CONTAINER_ID,
134-
partition_key=partition_key.PartitionKey(path='/address/state', kind=documents.PartitionKind.Hash)
190+
partition_key=partition_key.PartitionKey(path='/address/state', kind=documents.PartitionKind.Hash),
191+
offer_throughput = 11000
135192
)
136193
print('Container with id \'{0}\' created'.format(CONTAINER_ID))
137194

138195
except exceptions.CosmosResourceExistsError:
139196
raise RuntimeError("Container with id '{}' already exists".format(CONTAINER_ID))
140197

141-
# Create items
142-
create_items(container, 100)
143-
# Timestamp post item creations
144-
timestamp = datetime.now(timezone.utc)
145-
# Create more items after time stamp
146-
create_items(container, 50)
147198
# Read change feed from beginning
148199
read_change_feed(container)
200+
clean_up(container)
201+
149202
# Read Change Feed from timestamp
150-
read_change_feed_with_start_time(container, timestamp)
151-
# Delete all items from container
152-
delete_all_items(container)
153-
# Read change feed with 'AllVersionsAndDeletes' mode
154-
read_change_feed_with_all_versions_and_delete_mode(container)
155-
continuation_token = container.client_connection.last_response_headers['etag']
156-
# Read change feed with 'AllVersionsAndDeletes' mode after create item
157-
create_items(container, 10)
158-
read_change_feed_with_all_versions_and_delete_mode_from_continuation(container,continuation_token)
203+
read_change_feed_with_start_time(container)
204+
clean_up(container)
205+
206+
# Read Change Feed from continuation
207+
read_change_feed_with_continuation(container)
208+
clean_up(container)
209+
210+
# Read Change Feed by partition_key
211+
read_change_feed_with_partition_key(container)
212+
clean_up(container)
213+
159214
# Read change feed with 'AllVersionsAndDeletes' mode after create/delete item
160-
delete_all_items(container)
161-
read_change_feed_with_all_versions_and_delete_mode_from_continuation(container,continuation_token)
215+
read_change_feed_with_all_versions_and_delete_mode(container)
216+
clean_up(container)
217+
218+
# Read change feed with 'AllVersionsAndDeletes' mode with partition key for create/delete items.
219+
read_change_feed_with_all_versions_and_delete_mode_with_partition_key(container)
220+
clean_up(container)
162221

163222
# cleanup database after sample
164223
try:
@@ -174,4 +233,4 @@ def run_sample():
174233

175234

176235
if __name__ == '__main__':
177-
run_sample()
236+
run_sample()

0 commit comments

Comments
 (0)