From 56fe1a5456424756d183465ed80fd634528f7a2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 3 Apr 2025 13:50:02 +0200 Subject: [PATCH 1/2] feat: add SQL statement for egin transaction isolation level Adds an additional option to the `begin [transaction]` SQL statement to specify the isolation level of that transaction. The following format is now supported: ``` {begin | start} [transaction] [isolation level {repeatable read | serializable}] ``` --- .../client_side_statement_executor.py | 21 +++++- .../client_side_statement_parser.py | 9 ++- .../test_dbapi_isolation_level.py | 24 ++++++ .../test_client_side_statement_executor.py | 54 ++++++++++++++ tests/unit/spanner_dbapi/test_parse_utils.py | 74 +++++++++++++++++++ 5 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 tests/unit/spanner_dbapi/test_client_side_statement_executor.py diff --git a/google/cloud/spanner_dbapi/client_side_statement_executor.py b/google/cloud/spanner_dbapi/client_side_statement_executor.py index b1ed2873ae..ffda11f8b8 100644 --- a/google/cloud/spanner_dbapi/client_side_statement_executor.py +++ b/google/cloud/spanner_dbapi/client_side_statement_executor.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Union +from google.cloud.spanner_v1 import TransactionOptions if TYPE_CHECKING: from google.cloud.spanner_dbapi.cursor import Cursor @@ -58,7 +59,7 @@ def execute(cursor: "Cursor", parsed_statement: ParsedStatement): connection.commit() return None if statement_type == ClientSideStatementType.BEGIN: - connection.begin() + connection.begin(isolation_level=_get_isolation_level(parsed_statement)) return None if statement_type == ClientSideStatementType.ROLLBACK: connection.rollback() @@ -121,3 +122,19 @@ def _get_streamed_result_set(column_name, type_code, column_values): column_values_pb.append(_make_value_pb(column_value)) result_set.values.extend(column_values_pb) return StreamedResultSet(iter([result_set])) + + +def _get_isolation_level( + statement: ParsedStatement, +) -> Union[TransactionOptions.IsolationLevel, None]: + if ( + statement.client_side_statement_params is None + or len(statement.client_side_statement_params) == 0 + ): + return None + level = statement.client_side_statement_params[0] + if not isinstance(level, str) or level == "": + return None + # Replace (duplicate) whitespaces in the string with an underscore. + level = "_".join(level.split()).upper() + return TransactionOptions.IsolationLevel[level] diff --git a/google/cloud/spanner_dbapi/client_side_statement_parser.py b/google/cloud/spanner_dbapi/client_side_statement_parser.py index f978d17f03..7c26c2a98d 100644 --- a/google/cloud/spanner_dbapi/client_side_statement_parser.py +++ b/google/cloud/spanner_dbapi/client_side_statement_parser.py @@ -21,7 +21,10 @@ Statement, ) -RE_BEGIN = re.compile(r"^\s*(BEGIN|START)(\s+TRANSACTION)?\s*$", re.IGNORECASE) +RE_BEGIN = re.compile( + r"^\s*(?:BEGIN|START)(?:\s+TRANSACTION)?(?:\s+ISOLATION\s+LEVEL\s+(REPEATABLE\s+READ|SERIALIZABLE))?\s*$", + re.IGNORECASE, +) RE_COMMIT = re.compile(r"^\s*(COMMIT)(\s+TRANSACTION)?\s*$", re.IGNORECASE) RE_ROLLBACK = re.compile(r"^\s*(ROLLBACK)(\s+TRANSACTION)?\s*$", re.IGNORECASE) RE_SHOW_COMMIT_TIMESTAMP = re.compile( @@ -68,6 +71,10 @@ def parse_stmt(query): elif RE_START_BATCH_DML.match(query): client_side_statement_type = ClientSideStatementType.START_BATCH_DML elif RE_BEGIN.match(query): + match = re.search(RE_BEGIN, query) + isolation_level = match.group(1) + if isolation_level is not None: + client_side_statement_params.append(isolation_level) client_side_statement_type = ClientSideStatementType.BEGIN elif RE_RUN_BATCH.match(query): client_side_statement_type = ClientSideStatementType.RUN_BATCH diff --git a/tests/mockserver_tests/test_dbapi_isolation_level.py b/tests/mockserver_tests/test_dbapi_isolation_level.py index e2b6ddbb46..699a260803 100644 --- a/tests/mockserver_tests/test_dbapi_isolation_level.py +++ b/tests/mockserver_tests/test_dbapi_isolation_level.py @@ -117,3 +117,27 @@ def test_transaction_isolation_level(self): self.assertEqual(1, len(begin_requests)) self.assertEqual(begin_requests[0].options.isolation_level, level) MockServerTestBase.spanner_service.clear_requests() + + def test_begin_isolation_level(self): + connection = Connection(self.instance, self.database) + for level in [ + TransactionOptions.IsolationLevel.REPEATABLE_READ, + TransactionOptions.IsolationLevel.SERIALIZABLE, + ]: + isolation_level_name = level.name.replace("_", " ") + with connection.cursor() as cursor: + cursor.execute(f"begin isolation level {isolation_level_name}") + cursor.execute( + "insert into singers (id, name) values (1, 'Some Singer')" + ) + self.assertEqual(1, cursor.rowcount) + connection.commit() + begin_requests = list( + filter( + lambda msg: isinstance(msg, BeginTransactionRequest), + self.spanner_service.requests, + ) + ) + self.assertEqual(1, len(begin_requests)) + self.assertEqual(begin_requests[0].options.isolation_level, level) + MockServerTestBase.spanner_service.clear_requests() diff --git a/tests/unit/spanner_dbapi/test_client_side_statement_executor.py b/tests/unit/spanner_dbapi/test_client_side_statement_executor.py new file mode 100644 index 0000000000..888f81e830 --- /dev/null +++ b/tests/unit/spanner_dbapi/test_client_side_statement_executor.py @@ -0,0 +1,54 @@ +# Copyright 2025 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from google.cloud.spanner_dbapi.client_side_statement_executor import ( + _get_isolation_level, +) +from google.cloud.spanner_dbapi.parse_utils import classify_statement +from google.cloud.spanner_v1 import TransactionOptions + + +class TestParseUtils(unittest.TestCase): + def test_get_isolation_level(self): + self.assertIsNone(_get_isolation_level(classify_statement("begin"))) + self.assertEqual( + TransactionOptions.IsolationLevel.SERIALIZABLE, + _get_isolation_level( + classify_statement("begin isolation level serializable") + ), + ) + self.assertEqual( + TransactionOptions.IsolationLevel.SERIALIZABLE, + _get_isolation_level( + classify_statement( + "begin transaction isolation level serializable " + ) + ), + ) + self.assertEqual( + TransactionOptions.IsolationLevel.REPEATABLE_READ, + _get_isolation_level( + classify_statement("begin isolation level repeatable read") + ), + ) + self.assertEqual( + TransactionOptions.IsolationLevel.REPEATABLE_READ, + _get_isolation_level( + classify_statement( + "begin transaction isolation level repeatable read " + ) + ), + ) diff --git a/tests/unit/spanner_dbapi/test_parse_utils.py b/tests/unit/spanner_dbapi/test_parse_utils.py index 031fbc443f..f63dbb78e4 100644 --- a/tests/unit/spanner_dbapi/test_parse_utils.py +++ b/tests/unit/spanner_dbapi/test_parse_utils.py @@ -63,8 +63,28 @@ def test_classify_stmt(self): ("commit", StatementType.CLIENT_SIDE), ("begin", StatementType.CLIENT_SIDE), ("start", StatementType.CLIENT_SIDE), + ("begin isolation level serializable", StatementType.CLIENT_SIDE), + ("start isolation level serializable", StatementType.CLIENT_SIDE), + ("begin isolation level repeatable read", StatementType.CLIENT_SIDE), + ("start isolation level repeatable read", StatementType.CLIENT_SIDE), ("begin transaction", StatementType.CLIENT_SIDE), ("start transaction", StatementType.CLIENT_SIDE), + ( + "begin transaction isolation level serializable", + StatementType.CLIENT_SIDE, + ), + ( + "start transaction isolation level serializable", + StatementType.CLIENT_SIDE, + ), + ( + "begin transaction isolation level repeatable read", + StatementType.CLIENT_SIDE, + ), + ( + "start transaction isolation level repeatable read", + StatementType.CLIENT_SIDE, + ), ("rollback", StatementType.CLIENT_SIDE), (" commit TRANSACTION ", StatementType.CLIENT_SIDE), (" rollback TRANSACTION ", StatementType.CLIENT_SIDE), @@ -84,6 +104,16 @@ def test_classify_stmt(self): ("udpate table set col2=1 where col1 = 2", StatementType.UNKNOWN), ("begin foo", StatementType.UNKNOWN), ("begin transaction foo", StatementType.UNKNOWN), + ("begin transaction isolation level", StatementType.UNKNOWN), + ("begin transaction repeatable read", StatementType.UNKNOWN), + ( + "begin transaction isolation level repeatable read foo", + StatementType.UNKNOWN, + ), + ( + "begin transaction isolation level unspecified", + StatementType.UNKNOWN, + ), ("commit foo", StatementType.UNKNOWN), ("commit transaction foo", StatementType.UNKNOWN), ("rollback foo", StatementType.UNKNOWN), @@ -100,6 +130,50 @@ def test_classify_stmt(self): classify_statement(query).statement_type, want_class, query ) + def test_begin_isolation_level(self): + parsed_statement = classify_statement("begin") + self.assertEqual( + parsed_statement, + ParsedStatement( + StatementType.CLIENT_SIDE, + Statement("begin"), + ClientSideStatementType.BEGIN, + [], + ), + ) + parsed_statement = classify_statement("begin isolation level serializable") + self.assertEqual( + parsed_statement, + ParsedStatement( + StatementType.CLIENT_SIDE, + Statement("begin isolation level serializable"), + ClientSideStatementType.BEGIN, + ["serializable"], + ), + ) + parsed_statement = classify_statement("begin isolation level repeatable read") + self.assertEqual( + parsed_statement, + ParsedStatement( + StatementType.CLIENT_SIDE, + Statement("begin isolation level repeatable read"), + ClientSideStatementType.BEGIN, + ["repeatable read"], + ), + ) + parsed_statement = classify_statement( + "begin isolation level repeatable read " + ) + self.assertEqual( + parsed_statement, + ParsedStatement( + StatementType.CLIENT_SIDE, + Statement("begin isolation level repeatable read"), + ClientSideStatementType.BEGIN, + ["repeatable read"], + ), + ) + def test_partition_query_classify_stmt(self): parsed_statement = classify_statement( " PARTITION SELECT s.SongName FROM Songs AS s " From 261512bc1b8e197aee477b8592745a956a02856b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 14 Apr 2025 11:32:44 +0200 Subject: [PATCH 2/2] test: add test for invalid isolation level --- tests/mockserver_tests/test_dbapi_isolation_level.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/mockserver_tests/test_dbapi_isolation_level.py b/tests/mockserver_tests/test_dbapi_isolation_level.py index 699a260803..679740969a 100644 --- a/tests/mockserver_tests/test_dbapi_isolation_level.py +++ b/tests/mockserver_tests/test_dbapi_isolation_level.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from google.api_core.exceptions import Unknown from google.cloud.spanner_dbapi import Connection from google.cloud.spanner_v1 import ( BeginTransactionRequest, @@ -141,3 +142,9 @@ def test_begin_isolation_level(self): self.assertEqual(1, len(begin_requests)) self.assertEqual(begin_requests[0].options.isolation_level, level) MockServerTestBase.spanner_service.clear_requests() + + def test_begin_invalid_isolation_level(self): + connection = Connection(self.instance, self.database) + with connection.cursor() as cursor: + with self.assertRaises(Unknown): + cursor.execute("begin isolation level does_not_exist")