-
Notifications
You must be signed in to change notification settings - Fork 57
/
Copy pathtest_query_session.py
176 lines (131 loc) · 5.31 KB
/
test_query_session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import pytest
import threading
import time
from concurrent.futures import _base as b
from unittest import mock
from ydb.query.base import QueryStatsMode
from ydb.query.session import QuerySession
def _check_session_state_empty(session: QuerySession):
assert session._state.session_id is None
assert session._state.node_id is None
assert not session._state.attached
def _check_session_state_full(session: QuerySession):
assert session._state.session_id is not None
assert session._state.node_id is not None
assert session._state.attached
class TestQuerySession:
def test_session_normal_lifecycle(self, session: QuerySession):
_check_session_state_empty(session)
session.create()
_check_session_state_full(session)
session.delete()
_check_session_state_empty(session)
def test_second_create_do_nothing(self, session: QuerySession):
session.create()
_check_session_state_full(session)
session_id_before = session._state.session_id
node_id_before = session._state.node_id
session.create()
_check_session_state_full(session)
assert session._state.session_id == session_id_before
assert session._state.node_id == node_id_before
def test_second_delete_do_nothing(self, session: QuerySession):
session.create()
session.delete()
session.delete()
def test_delete_before_create_not_possible(self, session: QuerySession):
with pytest.raises(RuntimeError):
session.delete()
def test_create_after_delete_not_possible(self, session: QuerySession):
session.create()
session.delete()
with pytest.raises(RuntimeError):
session.create()
def test_transaction_before_create_raises(self, session: QuerySession):
with pytest.raises(RuntimeError):
session.transaction()
def test_transaction_after_delete_raises(self, session: QuerySession):
session.create()
session.delete()
with pytest.raises(RuntimeError):
session.transaction()
def test_transaction_after_create_not_raises(self, session: QuerySession):
session.create()
session.transaction()
def test_execute_before_create_raises(self, session: QuerySession):
with pytest.raises(RuntimeError):
session.execute("select 1;")
def test_execute_after_delete_raises(self, session: QuerySession):
session.create()
session.delete()
with pytest.raises(RuntimeError):
session.execute("select 1;")
def test_basic_execute(self, session: QuerySession):
session.create()
it = session.execute("select 1;")
result_sets = [result_set for result_set in it]
assert len(result_sets) == 1
assert len(result_sets[0].rows) == 1
assert len(result_sets[0].columns) == 1
assert list(result_sets[0].rows[0].values()) == [1]
def test_two_results(self, session: QuerySession):
session.create()
res = []
counter = 0
with session.execute("select 1; select 2") as results:
for result_set in results:
counter += 1
if len(result_set.rows) > 0:
res.append(list(result_set.rows[0].values()))
assert res == [[1], [2]]
assert counter == 2
def test_thread_leaks(self, session: QuerySession):
session.create()
thread_names = [t.name for t in threading.enumerate()]
assert "first response attach stream thread" not in thread_names
assert "attach stream thread" in thread_names
def test_first_resp_timeout(self, session: QuerySession):
class FakeStream:
def __iter__(self):
return self
def __next__(self):
time.sleep(10)
return 1
def cancel(self):
pass
fake_stream = mock.Mock(spec=FakeStream)
session._attach_call = mock.MagicMock(return_value=fake_stream)
assert session._attach_call() == fake_stream
session._create_call()
with pytest.raises(b.TimeoutError):
session._attach(0.1)
fake_stream.cancel.assert_called()
thread_names = [t.name for t in threading.enumerate()]
assert "first response attach stream thread" not in thread_names
assert "attach stream thread" not in thread_names
_check_session_state_empty(session)
@pytest.mark.parametrize(
"stats_mode",
[
None,
QueryStatsMode.UNSPECIFIED,
QueryStatsMode.NONE,
QueryStatsMode.BASIC,
QueryStatsMode.FULL,
QueryStatsMode.PROFILE,
],
)
def test_stats_mode(self, session: QuerySession, stats_mode: QueryStatsMode):
session.create()
for _ in session.execute("SELECT 1; SELECT 2; SELECT 3;", stats_mode=stats_mode):
pass
stats = session.last_query_stats
if stats_mode in [None, QueryStatsMode.NONE, QueryStatsMode.UNSPECIFIED]:
assert stats is None
return
assert stats is not None
assert len(stats.query_phases) > 0
if stats_mode != QueryStatsMode.BASIC:
assert len(stats.query_plan) > 0
else:
assert stats.query_plan == ""