8
8
from ydb .tests .olap .common .thread_helper import TestThread , TestThreads
9
9
from ydb import PrimitiveType
10
10
from typing import List , Dict , Any
11
- from ydb .tests .olap .lib .utils import get_external_param , external_param_is_true
11
+ from ydb .tests .olap .lib .utils import get_external_param
12
+
13
+ import random
14
+ import logging
15
+ logger = logging .getLogger (__name__ )
12
16
13
17
14
18
class TestInsert (BaseTestSet ):
@@ -26,16 +30,25 @@ class TestInsert(BaseTestSet):
26
30
)
27
31
28
32
def _loop_upsert (self , ctx : TestContext , data : list , table : str ):
33
+ min_time = 10.0 / len (data )
34
+ max_time = min_time * 10
35
+
29
36
sth = ScenarioTestHelper (ctx )
30
37
table_name = "log" + table
31
38
for batch in data :
32
39
sth .bulk_upsert_data (table_name , self .schema_log , batch )
40
+ logger .info ("Upsert" )
41
+ time .sleep (random .uniform (min_time , max_time ))
42
+
43
+ def _loop_insert (self , ctx : TestContext , rows_count : int , table : str ):
44
+ min_time = 1.0 / rows_count
45
+ max_time = min_time * 10
33
46
34
- def _loop_insert (self , ctx : TestContext , rows_count : int , table : str , ignore_read_errors : bool ):
35
47
sth = ScenarioTestHelper (ctx )
36
48
log : str = sth .get_full_path ("log" + table )
37
49
cnt : str = sth .get_full_path ("cnt" + table )
38
50
for i in range (rows_count ):
51
+ logger .info ("Insert" )
39
52
for c in range (10 ):
40
53
try :
41
54
result = sth .execute_query (
@@ -50,12 +63,9 @@ def _loop_insert(self, ctx: TestContext, rows_count: int, table: str, ignore_rea
50
63
51
64
break
52
65
except Exception :
53
- if ignore_read_errors :
54
- pass
55
- else :
56
- if c >= 9 :
57
- raise
58
- time .sleep (1 )
66
+ if c >= 9 :
67
+ raise
68
+ time .sleep (random .uniform (min_time , max_time ))
59
69
60
70
def scenario_read_data_during_bulk_upsert (self , ctx : TestContext ):
61
71
sth = ScenarioTestHelper (ctx )
@@ -65,7 +75,6 @@ def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
65
75
rows_count = int (get_external_param ("rows_count" , "1000" ))
66
76
inserts_count = int (get_external_param ("inserts_count" , str (self .def_inserts_count )))
67
77
tables_count = int (get_external_param ("tables_count" , "1" ))
68
- ignore_read_errors = external_param_is_true ("ignore_read_errors" )
69
78
for table in range (tables_count ):
70
79
sth .execute_scheme_query (
71
80
CreateTable (cnt_table_name + str (table )).with_schema (self .schema_cnt )
@@ -86,7 +95,7 @@ def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
86
95
for table in range (tables_count ):
87
96
thread1 .append (TestThread (target = self ._loop_upsert , args = [ctx , data , str (table )]))
88
97
for table in range (tables_count ):
89
- thread2 .append (TestThread (target = self ._loop_insert , args = [ctx , inserts_count , str (table ), ignore_read_errors ]))
98
+ thread2 .append (TestThread (target = self ._loop_insert , args = [ctx , inserts_count , str (table )]))
90
99
91
100
thread1 .start_all ()
92
101
thread2 .start_all ()
0 commit comments