Skip to content

QueryService stats support #584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion tests/query/test_query_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from concurrent.futures import _base as b
from unittest import mock


from ydb.query.base import QueryStatsMode
from ydb.query.session import QuerySession


Expand Down Expand Up @@ -143,3 +143,34 @@ def cancel(self):
assert "attach stream thread" not in thread_names

_check_session_state_empty(session)

@pytest.mark.parametrize(
"stats_mode",
[
None,
QueryStatsMode.UNSPECIFIED,
QueryStatsMode.NONE,
QueryStatsMode.BASIC,
QueryStatsMode.FULL,
QueryStatsMode.PROFILE,
],
)
def test_stats_mode(self, session: QuerySession, stats_mode: QueryStatsMode):
session.create()

for _ in session.execute("SELECT 1; SELECT 2; SELECT 3;", stats_mode=stats_mode):
pass

stats = session.last_query_stats

if stats_mode in [None, QueryStatsMode.NONE, QueryStatsMode.UNSPECIFIED]:
assert stats is None
return

assert stats is not None
assert len(stats.query_phases) > 0

if stats_mode != QueryStatsMode.BASIC:
assert len(stats.query_plan) > 0
else:
assert stats.query_plan == ""
30 changes: 30 additions & 0 deletions tests/query/test_query_transaction.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from ydb.query.base import QueryStatsMode
from ydb.query.transaction import QueryTxContext
from ydb.query.transaction import QueryTxStateEnum

Expand Down Expand Up @@ -104,3 +105,32 @@ def test_tx_identity_after_begin_works(self, tx: QueryTxContext):

assert identity.tx_id == tx.tx_id
assert identity.session_id == tx.session_id

@pytest.mark.parametrize(
"stats_mode",
[
None,
QueryStatsMode.UNSPECIFIED,
QueryStatsMode.NONE,
QueryStatsMode.BASIC,
QueryStatsMode.FULL,
QueryStatsMode.PROFILE,
],
)
def test_stats_mode(self, tx: QueryTxContext, stats_mode: QueryStatsMode):
for _ in tx.execute("SELECT 1; SELECT 2; SELECT 3;", commit_tx=True, stats_mode=stats_mode):
pass

stats = tx.last_query_stats

if stats_mode in [None, QueryStatsMode.NONE, QueryStatsMode.UNSPECIFIED]:
assert stats is None
return

assert stats is not None
assert len(stats.query_phases) > 0

if stats_mode != QueryStatsMode.BASIC:
assert len(stats.query_plan) > 0
else:
assert stats.query_plan == ""
2 changes: 1 addition & 1 deletion ydb/aio/query/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def retry_tx_async(
"""Special interface to execute a bunch of commands with transaction in a safe, retriable way.

:param callee: A function, that works with session.
:param tx_mode: Transaction mode, which is a one from the following choises:
:param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
Expand Down
13 changes: 11 additions & 2 deletions ydb/aio/query/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,34 @@ async def execute(
exec_mode: base.QueryExecMode = None,
concurrent_result_sets: bool = False,
settings: Optional[BaseRequestSettings] = None,
*,
stats_mode: Optional[base.QueryStatsMode] = None,
) -> AsyncResponseContextIterator:
"""Sends a query to Query Service

:param query: (YQL or SQL text) to be executed.
:param syntax: Syntax of the query, which is a one from the following choises:
:param syntax: Syntax of the query, which is a one from the following choices:
1) QuerySyntax.YQL_V1, which is default;
2) QuerySyntax.PG.
:param parameters: dict with parameters and YDB types;
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
:param stats_mode: Mode of query statistics to gather, which is a one from the following choices:
1) QueryStatsMode:NONE, which is default;
2) QueryStatsMode.BASIC;
3) QueryStatsMode.FULL;
4) QueryStatsMode.PROFILE;

:return: Iterator with result sets
"""
self._state._check_session_ready_to_use()

stream_it = await self._execute_call(
query=query,
parameters=parameters,
commit_tx=True,
syntax=syntax,
exec_mode=exec_mode,
parameters=parameters,
stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
settings=settings,
)
Expand All @@ -147,6 +155,7 @@ async def execute(
rpc_state=None,
response_pb=resp,
session_state=self._state,
session=self,
settings=self._settings,
),
)
16 changes: 12 additions & 4 deletions ydb/aio/query/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, driver, session_state, session, tx_mode):

:param driver: A driver instance
:param session_state: A state of session
:param tx_mode: Transaction mode, which is a one from the following choises:
:param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
Expand Down Expand Up @@ -142,32 +142,40 @@ async def execute(
exec_mode: Optional[base.QueryExecMode] = None,
concurrent_result_sets: Optional[bool] = False,
settings: Optional[BaseRequestSettings] = None,
*,
stats_mode: Optional[base.QueryStatsMode] = None,
) -> AsyncResponseContextIterator:
"""Sends a query to Query Service

:param query: (YQL or SQL text) to be executed.
:param parameters: dict with parameters and YDB types;
:param commit_tx: A special flag that allows transaction commit.
:param syntax: Syntax of the query, which is a one from the following choises:
:param syntax: Syntax of the query, which is a one from the following choices:
1) QuerySyntax.YQL_V1, which is default;
2) QuerySyntax.PG.
:param exec_mode: Exec mode of the query, which is a one from the following choises:
:param exec_mode: Exec mode of the query, which is a one from the following choices:
1) QueryExecMode.EXECUTE, which is default;
2) QueryExecMode.EXPLAIN;
3) QueryExecMode.VALIDATE;
4) QueryExecMode.PARSE.
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
:param stats_mode: Mode of query statistics to gather, which is a one from the following choices:
1) QueryStatsMode:NONE, which is default;
2) QueryStatsMode.BASIC;
3) QueryStatsMode.FULL;
4) QueryStatsMode.PROFILE;

:return: Iterator with result sets
"""
await self._ensure_prev_stream_finished()

stream_it = await self._execute_call(
query=query,
parameters=parameters,
commit_tx=commit_tx,
syntax=syntax,
exec_mode=exec_mode,
parameters=parameters,
stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
settings=settings,
)
Expand Down
13 changes: 11 additions & 2 deletions ydb/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

if typing.TYPE_CHECKING:
from .transaction import BaseQueryTxContext
from .session import BaseQuerySession


class QuerySyntax(enum.IntEnum):
Expand All @@ -41,7 +42,7 @@ class QueryExecMode(enum.IntEnum):
EXECUTE = 50


class StatsMode(enum.IntEnum):
class QueryStatsMode(enum.IntEnum):
UNSPECIFIED = 0
NONE = 10
BASIC = 20
Expand Down Expand Up @@ -132,12 +133,13 @@ def create_execute_query_request(
tx_mode: Optional[BaseQueryTxMode],
syntax: Optional[QuerySyntax],
exec_mode: Optional[QueryExecMode],
stats_mode: Optional[QueryStatsMode],
parameters: Optional[dict],
concurrent_result_sets: Optional[bool],
) -> ydb_query.ExecuteQueryRequest:
syntax = QuerySyntax.YQL_V1 if not syntax else syntax
exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
stats_mode = StatsMode.NONE # TODO: choise is not supported yet
stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode

tx_control = None
if not tx_id and not tx_mode:
Expand Down Expand Up @@ -189,6 +191,7 @@ def wrap_execute_query_response(
response_pb: _apis.ydb_query.ExecuteQueryResponsePart,
session_state: IQuerySessionState,
tx: Optional["BaseQueryTxContext"] = None,
session: Optional["BaseQuerySession"] = None,
commit_tx: Optional[bool] = False,
settings: Optional[QueryClientSettings] = None,
) -> convert.ResultSet:
Expand All @@ -198,6 +201,12 @@ def wrap_execute_query_response(
elif tx and response_pb.tx_meta and not tx.tx_id:
tx._move_to_beginned(response_pb.tx_meta.id)

if response_pb.HasField("exec_stats"):
if tx is not None:
tx._last_query_stats = response_pb.exec_stats
if session is not None:
session._last_query_stats = response_pb.exec_stats

if response_pb.HasField("result_set"):
return convert.ResultSet.from_message(response_pb.result_set, settings)

Expand Down
2 changes: 1 addition & 1 deletion ydb/query/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def retry_tx_sync(
"""Special interface to execute a bunch of commands with transaction in a safe, retriable way.

:param callee: A function, that works with session.
:param tx_mode: Transaction mode, which is a one from the following choises:
:param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
Expand Down
31 changes: 25 additions & 6 deletions ydb/query/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
.with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT)
)

self._last_query_stats = None

@property
def last_query_stats(self):
return self._last_query_stats

def _get_client_settings(
self,
driver: common_utils.SupportedDriverType,
Expand Down Expand Up @@ -189,22 +195,26 @@ def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
def _execute_call(
self,
query: str,
parameters: dict = None,
commit_tx: bool = False,
syntax: base.QuerySyntax = None,
exec_mode: base.QueryExecMode = None,
parameters: dict = None,
stats_mode: Optional[base.QueryStatsMode] = None,
concurrent_result_sets: bool = False,
settings: Optional[BaseRequestSettings] = None,
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
self._last_query_stats = None

request = base.create_execute_query_request(
query=query,
session_id=self._state.session_id,
parameters=parameters,
commit_tx=commit_tx,
session_id=self._state.session_id,
tx_mode=None,
tx_id=None,
syntax=syntax,
exec_mode=exec_mode,
parameters=parameters,
stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
)

Expand Down Expand Up @@ -293,7 +303,7 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio
def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext:
"""Creates a transaction context manager with specified transaction mode.

:param tx_mode: Transaction mode, which is a one from the following choises:
:param tx_mode: Transaction mode, which is a one from the following choices:
1) QuerySerializableReadWrite() which is default mode;
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
3) QuerySnapshotReadOnly();
Expand Down Expand Up @@ -321,26 +331,34 @@ def execute(
exec_mode: base.QueryExecMode = None,
concurrent_result_sets: bool = False,
settings: Optional[BaseRequestSettings] = None,
*,
stats_mode: Optional[base.QueryStatsMode] = None,
) -> base.SyncResponseContextIterator:
"""Sends a query to Query Service

:param query: (YQL or SQL text) to be executed.
:param syntax: Syntax of the query, which is a one from the following choises:
:param syntax: Syntax of the query, which is a one from the following choices:
1) QuerySyntax.YQL_V1, which is default;
2) QuerySyntax.PG.
:param parameters: dict with parameters and YDB types;
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
:param stats_mode: Mode of query statistics to gather, which is a one from the following choices:
1) QueryStatsMode:NONE, which is default;
2) QueryStatsMode.BASIC;
3) QueryStatsMode.FULL;
4) QueryStatsMode.PROFILE;

:return: Iterator with result sets
"""
self._state._check_session_ready_to_use()

stream_it = self._execute_call(
query=query,
parameters=parameters,
commit_tx=True,
syntax=syntax,
exec_mode=exec_mode,
parameters=parameters,
stats_mode=stats_mode,
concurrent_result_sets=concurrent_result_sets,
settings=settings,
)
Expand All @@ -351,6 +369,7 @@ def execute(
rpc_state=None,
response_pb=resp,
session_state=self._state,
session=self,
settings=self._settings,
),
)
Loading
Loading