@@ -95,11 +95,14 @@ impl Default for Config {
95
95
}
96
96
}
97
97
98
- fn default_opts ( ) -> rocksdb:: Options {
98
+ fn default_opts ( parallelism : u8 ) -> rocksdb:: Options {
99
99
let mut block_opts = rocksdb:: BlockBasedOptions :: default ( ) ;
100
100
block_opts. set_checksum_type ( rocksdb:: ChecksumType :: CRC32c ) ;
101
101
102
102
let mut opts = rocksdb:: Options :: default ( ) ;
103
+ opts. increase_parallelism ( parallelism. into ( ) ) ;
104
+ opts. set_max_subcompactions ( parallelism. into ( ) ) ;
105
+
103
106
opts. set_keep_log_file_num ( 10 ) ;
104
107
opts. set_max_open_files ( 16 ) ;
105
108
opts. set_compaction_style ( rocksdb:: DBCompactionStyle :: Level ) ;
@@ -114,23 +117,27 @@ fn default_opts() -> rocksdb::Options {
114
117
}
115
118
116
119
impl DBStore {
117
- fn create_cf_descriptors ( ) -> Vec < rocksdb:: ColumnFamilyDescriptor > {
120
+ fn create_cf_descriptors ( parallelism : u8 ) -> Vec < rocksdb:: ColumnFamilyDescriptor > {
118
121
COLUMN_FAMILIES
119
122
. iter ( )
120
- . map ( |& name| rocksdb:: ColumnFamilyDescriptor :: new ( name, default_opts ( ) ) )
123
+ . map ( |& name| rocksdb:: ColumnFamilyDescriptor :: new ( name, default_opts ( parallelism ) ) )
121
124
. collect ( )
122
125
}
123
126
124
- fn open_internal ( path : & Path , log_dir : Option < & Path > ) -> Result < Self > {
125
- let mut db_opts = default_opts ( ) ;
127
+ fn open_internal ( path : & Path , log_dir : Option < & Path > , parallelism : u8 ) -> Result < Self > {
128
+ let mut db_opts = default_opts ( parallelism ) ;
126
129
db_opts. create_if_missing ( true ) ;
127
130
db_opts. create_missing_column_families ( true ) ;
128
131
if let Some ( d) = log_dir {
129
132
db_opts. set_db_log_dir ( d) ;
130
133
}
131
134
132
- let db = rocksdb:: DB :: open_cf_descriptors ( & db_opts, path, Self :: create_cf_descriptors ( ) )
133
- . with_context ( || format ! ( "failed to open DB: {}" , path. display( ) ) ) ?;
135
+ let db = rocksdb:: DB :: open_cf_descriptors (
136
+ & db_opts,
137
+ path,
138
+ Self :: create_cf_descriptors ( parallelism) ,
139
+ )
140
+ . with_context ( || format ! ( "failed to open DB: {}" , path. display( ) ) ) ?;
134
141
let live_files = db. live_files ( ) ?;
135
142
info ! (
136
143
"{:?}: {} SST files, {} GB, {} Grows" ,
@@ -155,8 +162,13 @@ impl DBStore {
155
162
}
156
163
157
164
/// Opens a new RocksDB at the specified location.
158
- pub fn open ( path : & Path , log_dir : Option < & Path > , auto_reindex : bool ) -> Result < Self > {
159
- let mut store = Self :: open_internal ( path, log_dir) ?;
165
+ pub fn open (
166
+ path : & Path ,
167
+ log_dir : Option < & Path > ,
168
+ auto_reindex : bool ,
169
+ parallelism : u8 ,
170
+ ) -> Result < Self > {
171
+ let mut store = Self :: open_internal ( path, log_dir, parallelism) ?;
160
172
let config = store. get_config ( ) ;
161
173
debug ! ( "DB {:?}" , config) ;
162
174
let mut config = config. unwrap_or_default ( ) ; // use default config when DB is empty
@@ -182,13 +194,13 @@ impl DBStore {
182
194
) ;
183
195
// close DB before deletion
184
196
drop ( store) ;
185
- rocksdb:: DB :: destroy ( & default_opts ( ) , path) . with_context ( || {
197
+ rocksdb:: DB :: destroy ( & default_opts ( parallelism ) , path) . with_context ( || {
186
198
format ! (
187
199
"re-index required but the old database ({}) can not be deleted" ,
188
200
path. display( )
189
201
)
190
202
} ) ?;
191
- store = Self :: open_internal ( path, log_dir) ?;
203
+ store = Self :: open_internal ( path, log_dir, parallelism ) ?;
192
204
config = Config :: default ( ) ; // re-init config after dropping DB
193
205
}
194
206
if config. compacted {
@@ -432,13 +444,13 @@ mod tests {
432
444
fn test_reindex_new_format ( ) {
433
445
let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
434
446
{
435
- let store = DBStore :: open ( dir. path ( ) , None , false ) . unwrap ( ) ;
447
+ let store = DBStore :: open ( dir. path ( ) , None , false , 1 ) . unwrap ( ) ;
436
448
let mut config = store. get_config ( ) . unwrap ( ) ;
437
449
config. format += 1 ;
438
450
store. set_config ( config) ;
439
451
} ;
440
452
assert_eq ! (
441
- DBStore :: open( dir. path( ) , None , false )
453
+ DBStore :: open( dir. path( ) , None , false , 1 )
442
454
. err( )
443
455
. unwrap( )
444
456
. to_string( ) ,
@@ -449,7 +461,7 @@ mod tests {
449
461
)
450
462
) ;
451
463
{
452
- let store = DBStore :: open ( dir. path ( ) , None , true ) . unwrap ( ) ;
464
+ let store = DBStore :: open ( dir. path ( ) , None , true , 1 ) . unwrap ( ) ;
453
465
store. flush ( ) ;
454
466
let config = store. get_config ( ) . unwrap ( ) ;
455
467
assert_eq ! ( config. format, CURRENT_FORMAT ) ;
@@ -467,14 +479,14 @@ mod tests {
467
479
db. put ( b"F" , b"" ) . unwrap ( ) ; // insert legacy DB compaction marker (in 'default' column family)
468
480
} ;
469
481
assert_eq ! (
470
- DBStore :: open( dir. path( ) , None , false )
482
+ DBStore :: open( dir. path( ) , None , false , 1 )
471
483
. err( )
472
484
. unwrap( )
473
485
. to_string( ) ,
474
486
format!( "re-index required due to legacy format" , )
475
487
) ;
476
488
{
477
- let store = DBStore :: open ( dir. path ( ) , None , true ) . unwrap ( ) ;
489
+ let store = DBStore :: open ( dir. path ( ) , None , true , 1 ) . unwrap ( ) ;
478
490
store. flush ( ) ;
479
491
let config = store. get_config ( ) . unwrap ( ) ;
480
492
assert_eq ! ( config. format, CURRENT_FORMAT ) ;
@@ -484,7 +496,7 @@ mod tests {
484
496
#[ test]
485
497
fn test_db_prefix_scan ( ) {
486
498
let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
487
- let store = DBStore :: open ( dir. path ( ) , None , true ) . unwrap ( ) ;
499
+ let store = DBStore :: open ( dir. path ( ) , None , true , 1 ) . unwrap ( ) ;
488
500
489
501
let items = [
490
502
* b"ab " ,
@@ -509,15 +521,15 @@ mod tests {
509
521
#[ test]
510
522
fn test_db_log_in_same_dir ( ) {
511
523
let dir1 = tempfile:: tempdir ( ) . unwrap ( ) ;
512
- let _store = DBStore :: open ( dir1. path ( ) , None , true ) . unwrap ( ) ;
524
+ let _store = DBStore :: open ( dir1. path ( ) , None , true , 1 ) . unwrap ( ) ;
513
525
514
526
// LOG file is created in dir1
515
527
let dir_files = list_log_files ( dir1. path ( ) ) ;
516
528
assert_eq ! ( dir_files, vec![ OsStr :: new( "LOG" ) ] ) ;
517
529
518
530
let dir2 = tempfile:: tempdir ( ) . unwrap ( ) ;
519
531
let dir3 = tempfile:: tempdir ( ) . unwrap ( ) ;
520
- let _store = DBStore :: open ( dir2. path ( ) , Some ( dir3. path ( ) ) , true ) . unwrap ( ) ;
532
+ let _store = DBStore :: open ( dir2. path ( ) , Some ( dir3. path ( ) ) , true , 1 ) . unwrap ( ) ;
521
533
522
534
// *_LOG file is not created in dir2, but in dir3
523
535
let dir_files = list_log_files ( dir2. path ( ) ) ;
0 commit comments