-
Notifications
You must be signed in to change notification settings - Fork 5.8k
/
Copy pathstorage.py
161 lines (139 loc) · 6.34 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Purpose
Shows how to use the Amazon Relational Database Service (Amazon RDS) Data Service to
interact with an Amazon Aurora Serverless database.
"""
import logging
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
class DataServiceNotReadyException(Exception):
pass
class StorageError(Exception):
pass
class Storage:
"""
Wraps calls to the Amazon RDS Data Service.
"""
def __init__(self, cluster, secret, db_name, table_name, rdsdata_client):
"""
:param cluster: The Amazon Resource Name (ARN) of an Aurora DB cluster that
contains the work item database.
:param secret: The ARN of an AWS Secrets Manager secret that contains
credentials used to connect to the database.
:param db_name: The name of the work item database.
:param table_name: The name of the work item table in the database.
:param rdsdata_client: A Boto3 Amazon RDS Data Service client.
"""
self._cluster = cluster
self._secret = secret
self._db_name = db_name
self._table_name = table_name
self._rdsdata_client = rdsdata_client
def _run_statement(self, sql, sql_params=None):
"""
Runs a SQL statement and associated parameters using the Amazon RDS Data Service.
:param sql: The SQL statement to run.
:param sql_params: The parameters associated with the SQL statement.
:return: The result of running the SQL statement.
"""
try:
run_args = {
"database": self._db_name,
"resourceArn": self._cluster,
"secretArn": self._secret,
"sql": sql,
}
if sql_params is not None:
run_args["parameters"] = sql_params
result = self._rdsdata_client.execute_statement(**run_args)
logger.info("Ran statement on %s.", self._db_name)
except ClientError as error:
if (
error.response["Error"]["Code"] == "BadRequestException"
and "Communications link failure" in error.response["Error"]["Message"]
):
raise DataServiceNotReadyException(
"The Aurora Data Service is not ready, probably because it entered "
"pause mode after a period of inactivity. Wait a minute for "
"your cluster to resume and try your request again."
) from error
else:
logger.exception("Run statement on %s failed.", self._db_name)
raise StorageError(error)
else:
return result
def get_work_items(self, archived=None):
"""
Gets work items from the database.
:param archived: When specified, only archived or non-archived work items are
returned. Otherwise, all work items are returned.
:return: The list of retrieved work items.
"""
sql_select = "SELECT iditem, description, guide, status, username, archived"
sql_where = ""
sql_params = None
if archived is not None:
sql_where = "WHERE archived=:archived"
sql_params = [{"name": "archived", "value": {"booleanValue": archived}}]
sql = f"{sql_select} FROM {self._table_name} {sql_where}"
print(sql)
results = self._run_statement(sql, sql_params=sql_params)
output = [
{
"iditem": record[0]["longValue"],
"description": record[1]["stringValue"],
"guide": record[2]["stringValue"],
"status": record[3]["stringValue"],
"username": record[4]["stringValue"],
"archived": record[5]["booleanValue"],
}
for record in results["records"]
]
return output
def add_work_item(self, work_item):
"""
Adds a work item to the database.
Note: Wrapping the INSERT statement in a WITH clause and querying the auto-generated
ID value is required by a change in the Data API for Aurora Serverless v2.
The "generatedFields" fields in the return value for DML statements are all blank.
That's why the RETURNING clause is needed to specify the columns to return, and
the entire statement is structured as a query so that the returned value can
be retrieved from the "records" result set.
This limitation might not be permanent; the DML statement might be simplified
in future.
:param work_item: The work item to add to the database. Because the ID
and archive fields are auto-generated,
you don't need to specify them when creating a new item.
:return: The generated ID of the new work item.
"""
sql = (
"WITH t1 AS ( "
f"INSERT INTO {self._table_name} (description, guide, status, username) "
" VALUES (:description, :guide, :status, :username) RETURNING iditem "
") SELECT iditem FROM t1"
)
sql_params = [
{"name": "description", "value": {"stringValue": work_item["description"]}},
{"name": "guide", "value": {"stringValue": work_item["guide"]}},
{"name": "status", "value": {"stringValue": work_item["status"]}},
{"name": "username", "value": {"stringValue": work_item["username"]}},
]
results = self._run_statement(sql, sql_params=sql_params)
# Old style, for Serverless v1:
# work_item_id = results["generatedFields"][0]["longValue"]
# New style, for Serverless v2:
work_item_id = results["records"][0][0]["longValue"]
return work_item_id
def archive_work_item(self, iditem):
"""
Archives a work item.
:param iditem: The ID of the work item to archive.
"""
sql = f"UPDATE {self._table_name} SET archived=:archived WHERE iditem=:iditem"
sql_params = [
{"name": "archived", "value": {"booleanValue": True}},
{"name": "iditem", "value": {"longValue": int(iditem)}},
]
self._run_statement(sql, sql_params=sql_params)