Skip to content

Commit bb5ecac

Browse files
authored
Fix stateful mergetree (#83)
1 parent fb5b921 commit bb5ecac

File tree

5 files changed

+38
-13
lines changed

5 files changed

+38
-13
lines changed

programs/local/LocalServer.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,7 +694,14 @@ void LocalServer::processConfig()
694694
// global once flag
695695
/// We load the temporary database first, because projections need it.
696696
static std::once_flag db_catalog_once;
697-
std::call_once(db_catalog_once, [&] { DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase(); });
697+
if (config().has("path"))
698+
{
699+
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
700+
}
701+
else
702+
{
703+
std::call_once(db_catalog_once, [&] { DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase(); });
704+
}
698705

699706
/** Init dummy default DB
700707
* NOTE: We force using isolated default database to avoid conflicts with default database from server environment

src/Core/ServerSettings.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,16 @@ namespace DB
7171
M(UInt64, concurrent_threads_soft_limit_num, 0, "Sets how many concurrent thread can be allocated before applying CPU pressure. Zero means Unlimited.", 0) \
7272
M(UInt64, concurrent_threads_soft_limit_ratio_to_cores, 0, "Same as concurrent_threads_soft_limit_num, but with ratio to cores.", 0) \
7373
\
74-
M(UInt64, background_pool_size, 16, "The maximum number of threads what will be used for merging or mutating data parts for *MergeTree-engine tables in a background.", 0) \
74+
M(UInt64, background_pool_size, 0, "The maximum number of threads what will be used for merging or mutating data parts for *MergeTree-engine tables in a background.", 0) \
7575
M(Float, background_merges_mutations_concurrency_ratio, 2, "The number of part mutation tasks that can be executed concurrently by each thread in background pool.", 0) \
7676
M(String, background_merges_mutations_scheduling_policy, "round_robin", "The policy on how to perform a scheduling for background merges and mutations. Possible values are: `round_robin` and `shortest_task_first`. ", 0) \
77-
M(UInt64, background_move_pool_size, 8, "The maximum number of threads that will be used for moving data parts to another disk or volume for *MergeTree-engine tables in a background.", 0) \
78-
M(UInt64, background_fetches_pool_size, 8, "The maximum number of threads that will be used for fetching data parts from another replica for *MergeTree-engine tables in a background.", 0) \
79-
M(UInt64, background_common_pool_size, 8, "The maximum number of threads that will be used for performing a variety of operations (mostly garbage collection) for *MergeTree-engine tables in a background.", 0) \
80-
M(UInt64, background_buffer_flush_schedule_pool_size, 16, "The maximum number of threads that will be used for performing flush operations for Buffer-engine tables in a background.", 0) \
77+
M(UInt64, background_move_pool_size, 0, "The maximum number of threads that will be used for moving data parts to another disk or volume for *MergeTree-engine tables in a background.", 0) \
78+
M(UInt64, background_fetches_pool_size, 0, "The maximum number of threads that will be used for fetching data parts from another replica for *MergeTree-engine tables in a background.", 0) \
79+
M(UInt64, background_common_pool_size, 0, "The maximum number of threads that will be used for performing a variety of operations (mostly garbage collection) for *MergeTree-engine tables in a background.", 0) \
80+
M(UInt64, background_buffer_flush_schedule_pool_size, 0, "The maximum number of threads that will be used for performing flush operations for Buffer-engine tables in a background.", 0) \
8181
M(UInt64, background_schedule_pool_size, 0, "The maximum number of threads that will be used for constantly executing some lightweight periodic operations.", 0) \
82-
M(UInt64, background_message_broker_schedule_pool_size, 16, "The maximum number of threads that will be used for executing background operations for message streaming.", 0) \
83-
M(UInt64, background_distributed_schedule_pool_size, 16, "The maximum number of threads that will be used for executing distributed sends.", 0) \
82+
M(UInt64, background_message_broker_schedule_pool_size, 0, "The maximum number of threads that will be used for executing background operations for message streaming.", 0) \
83+
M(UInt64, background_distributed_schedule_pool_size, 0, "The maximum number of threads that will be used for executing distributed sends.", 0) \
8484
M(Bool, display_secrets_in_show_and_select, false, "Allow showing secrets in SHOW and SELECT queries via a format setting and a grant", 0)
8585

8686

src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ MergeTreeBackgroundExecutor<Queue>::MergeTreeBackgroundExecutor(
4242
, pool(std::make_unique<ThreadPool>(
4343
CurrentMetrics::MergeTreeBackgroundExecutorThreads, CurrentMetrics::MergeTreeBackgroundExecutorThreadsActive))
4444
{
45-
if (max_tasks_count == 0)
46-
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");
45+
// if (max_tasks_count == 0)
46+
// throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");
4747

4848
pending.setCapacity(max_tasks_count);
4949
active.set_capacity(max_tasks_count);

src/Storages/MergeTree/MergeTreeSettings.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ struct Settings;
4343
M(UInt64, max_replicated_merges_in_queue, 1000, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
4444
M(UInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
4545
M(UInt64, max_replicated_merges_with_ttl_in_queue, 1, "How many tasks of merging parts with TTL are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
46-
M(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.", 0) \
47-
M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 20, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
46+
M(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 0, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.", 0) \
47+
M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 0, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
4848
M(UInt64, max_number_of_mutations_for_replica, 0, "Limit the number of part mutations per replica to the specified amount. Zero means no limit on the number of mutations per replica (the execution can still be constrained by other settings).", 0) \
4949
M(UInt64, max_number_of_merges_with_ttl_in_pool, 2, "When there is more than specified number of merges with TTL entries in pool, do not assign new merge with TTL. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
5050
M(Seconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \

tests/test_stateful.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
current_process = psutil.Process()
1212
check_thread_count = False
1313

14+
1415
class TestStateful(unittest.TestCase):
1516
def setUp(self) -> None:
1617
shutil.rmtree(test_state_dir, ignore_errors=True)
@@ -41,7 +42,7 @@ def test_path(self):
4142
ret = sess.query("SELECT * FROM db_xxx.view_xxx", "CSV")
4243
self.assertEqual(str(ret), "1\n2\n")
4344

44-
del sess # name sess dir will not be deleted
45+
del sess # the named session directory will not be deleted
4546

4647
sess = session.Session(test_state_dir)
4748
ret = sess.query("SELECT chdb_xxx()", "CSV")
@@ -64,6 +65,22 @@ def test_path(self):
6465
ret = sess2.query("SELECT chdb_xxx()", "CSV")
6566
self.assertEqual(str(ret), "")
6667

68+
def test_mergetree(self):
69+
sess = session.Session()
70+
sess.query(
71+
"CREATE DATABASE IF NOT EXISTS db_xxx_merge ENGINE = Atomic;", "CSV"
72+
)
73+
sess.query(
74+
"CREATE TABLE IF NOT EXISTS db_xxx_merge.log_table_xxx (x String, y Int) ENGINE = MergeTree ORDER BY x;"
75+
)
76+
# insert 1000000 random rows
77+
sess.query(
78+
"INSERT INTO db_xxx_merge.log_table_xxx (x, y) SELECT toString(rand()), rand() FROM numbers(1000000);"
79+
)
80+
sess.query("Optimize TABLE db_xxx_merge.log_table_xxx;")
81+
ret = sess.query("SELECT count(*) FROM db_xxx_merge.log_table_xxx;")
82+
self.assertEqual(str(ret), "1000000\n")
83+
6784
def test_tmp(self):
6885
sess = session.Session()
6986
sess.query("CREATE FUNCTION chdb_xxx AS () -> '0.12.0'", "CSV")
@@ -83,6 +100,7 @@ def test_zfree_thread_count(self):
83100
if check_thread_count:
84101
self.assertEqual(thread_count, 1)
85102

103+
86104
if __name__ == "__main__":
87105
shutil.rmtree(test_state_dir, ignore_errors=True)
88106
check_thread_count = True

0 commit comments

Comments
 (0)