diff --git a/test/e2e/table.py b/test/e2e/table.py index 5d5bf33..e4af7e3 100644 --- a/test/e2e/table.py +++ b/test/e2e/table.py @@ -72,6 +72,18 @@ def __call__(self, record: dict) -> bool: def ttl_on_attribute_matches(attr_name: str) -> TableMatchFunc: return TTLAttributeMatcher(attr_name) +class PITRMatcher: + def __init__(self, enabled: bool): + self.enabled = enabled + + def __call__(self, record: dict) -> bool: + pitr_enabled = get_point_in_time_recovery_enabled(record['TableName']) + if pitr_enabled is None: + return False + return pitr_enabled == self.enabled + +def point_in_time_recovery_matches(enabled: bool) -> TableMatchFunc: + return PITRMatcher(enabled) class StreamSpecificationMatcher: def __init__(self, enabled: bool): @@ -230,4 +242,16 @@ def get_time_to_live(table_name): resp = c.describe_time_to_live(TableName=table_name) return resp['TimeToLiveDescription'] except c.exceptions.ResourceNotFoundException: - return None \ No newline at end of file + return None + +def get_point_in_time_recovery_enabled(table_name): + """Returns whether point in time recovery is enabled for the table with a supplied name. + + If no such Table exists, returns None. + """ + c = boto3.client('dynamodb', region_name=get_region()) + try: + resp = c.describe_continuous_backups(TableName=table_name) + return resp['ContinuousBackupsDescription']['PointInTimeRecoveryDescription']['PointInTimeRecoveryStatus'] == 'ENABLED' + except c.exceptions.ResourceNotFoundException: + return None diff --git a/test/e2e/tests/test_table.py b/test/e2e/tests/test_table.py index 4ef443f..a5974fe 100644 --- a/test/e2e/tests/test_table.py +++ b/test/e2e/tests/test_table.py @@ -265,6 +265,59 @@ def test_enable_ttl(self, table_lsi): ttl_status = ttl["TimeToLiveStatus"] assert ttl_status in ("ENABLED", "ENABLING") + def test_enable_point_in_time_recovery(self, table_lsi): + (ref, res) = table_lsi + + table_name = res["spec"]["tableName"] + + # Check DynamoDB Table exists + assert self.table_exists(table_name) + + # Get CR latest revision + cr = k8s.wait_resource_consumed_by_controller(ref) + + # Update PITR + updates = { + "spec": { + "continuousBackups": { + "pointInTimeRecoveryEnabled": True + } + } + } + + # Patch k8s resource + k8s.patch_custom_resource(ref, updates) + + table.wait_until( + table_name, + table.point_in_time_recovery_matches(True), + ) + + pitr_enabled = table.get_point_in_time_recovery_enabled(table_name) + assert pitr_enabled is not None + assert pitr_enabled + + # turn off pitr again and ensure it is disabled + updates = { + "spec": { + "continuousBackups": { + "pointInTimeRecoveryEnabled": False + } + } + } + + # Patch k8s resource + k8s.patch_custom_resource(ref, updates) + + table.wait_until( + table_name, + table.point_in_time_recovery_matches(False), + ) + + pitr_enabled = table.get_point_in_time_recovery_enabled(table_name) + assert pitr_enabled is not None + assert not pitr_enabled + def test_enable_stream_specification(self, table_lsi): (ref, res) = table_lsi @@ -706,4 +759,4 @@ def test_multi_updates(self, table_gsi): assert latestTable["ProvisionedThroughput"] is not None assert latestTable["ProvisionedThroughput"]["ReadCapacityUnits"] == 10 - assert latestTable["ProvisionedThroughput"]["WriteCapacityUnits"] == 10 \ No newline at end of file + assert latestTable["ProvisionedThroughput"]["WriteCapacityUnits"] == 10