1+ use prometheus:: GaugeVec ;
12use rocksdb;
23
4+ use std:: convert:: TryInto ;
35use std:: path:: Path ;
6+ use std:: sync:: Arc ;
7+ use std:: thread;
8+ use std:: time:: Duration ;
49
510use crate :: config:: Config ;
6- use crate :: util:: { bincode, Bytes } ;
11+ use crate :: new_index:: db_metrics:: RocksDbMetrics ;
12+ use crate :: util:: { bincode, spawn_thread, Bytes } ;
713
814static DB_VERSION : u32 = 1 ;
915
@@ -71,7 +77,7 @@ impl<'a> Iterator for ReverseScanIterator<'a> {
7177
7278#[ derive( Debug ) ]
7379pub struct DB {
74- db : rocksdb:: DB ,
80+ db : Arc < rocksdb:: DB > ,
7581}
7682
7783#[ derive( Copy , Clone , Debug ) ]
@@ -89,18 +95,29 @@ impl DB {
8995 db_opts. set_compaction_style ( rocksdb:: DBCompactionStyle :: Level ) ;
9096 db_opts. set_compression_type ( rocksdb:: DBCompressionType :: Snappy ) ;
9197 db_opts. set_target_file_size_base ( 1_073_741_824 ) ;
92- db_opts. set_write_buffer_size ( 256 << 20 ) ;
9398 db_opts. set_disable_auto_compactions ( !config. initial_sync_compaction ) ; // for initial bulk load
9499
100+
101+ let parallelism: i32 = config. db_parallelism . try_into ( )
102+ . expect ( "db_parallelism value too large for i32" ) ;
103+
104+ // Configure parallelism (background jobs and thread pools)
105+ db_opts. increase_parallelism ( parallelism) ;
106+
107+ // Configure write buffer size (not set by increase_parallelism)
108+ db_opts. set_write_buffer_size ( config. db_write_buffer_size_mb * 1024 * 1024 ) ;
109+
95110 // db_opts.set_advise_random_on_open(???);
96111 db_opts. set_compaction_readahead_size ( 1 << 20 ) ;
97- db_opts. increase_parallelism ( 2 ) ;
98112
99- // let mut block_opts = rocksdb::BlockBasedOptions::default();
100- // block_opts.set_block_size(???);
113+ // Configure block cache
114+ let mut block_opts = rocksdb:: BlockBasedOptions :: default ( ) ;
115+ let cache_size_bytes = config. db_block_cache_mb * 1024 * 1024 ;
116+ block_opts. set_block_cache ( & rocksdb:: Cache :: new_lru_cache ( cache_size_bytes) ) ;
117+ db_opts. set_block_based_table_factory ( & block_opts) ;
101118
102119 let db = DB {
103- db : rocksdb:: DB :: open ( & db_opts, path) . expect ( "failed to open RocksDB" ) ,
120+ db : Arc :: new ( rocksdb:: DB :: open ( & db_opts, path) . expect ( "failed to open RocksDB" ) )
104121 } ;
105122 db. verify_compatibility ( config) ;
106123 db
@@ -220,4 +237,54 @@ impl DB {
220237 Some ( _) => ( ) ,
221238 }
222239 }
240+
241+ pub fn start_stats_exporter ( & self , db_metrics : Arc < RocksDbMetrics > , db_name : & str ) {
242+ let db_arc = Arc :: clone ( & self . db ) ;
243+ let label = db_name. to_string ( ) ;
244+
245+ let update_gauge = move |gauge : & GaugeVec , property : & str | {
246+ if let Ok ( Some ( value) ) = db_arc. property_value ( property) {
247+ if let Ok ( v) = value. parse :: < f64 > ( ) {
248+ gauge. with_label_values ( & [ & label] ) . set ( v) ;
249+ }
250+ }
251+ } ;
252+
253+ spawn_thread ( "db_stats_exporter" , move || loop {
254+ update_gauge ( & db_metrics. num_immutable_mem_table , "rocksdb.num-immutable-mem-table" ) ;
255+ update_gauge ( & db_metrics. mem_table_flush_pending , "rocksdb.mem-table-flush-pending" ) ;
256+ update_gauge ( & db_metrics. compaction_pending , "rocksdb.compaction-pending" ) ;
257+ update_gauge ( & db_metrics. background_errors , "rocksdb.background-errors" ) ;
258+ update_gauge ( & db_metrics. cur_size_active_mem_table , "rocksdb.cur-size-active-mem-table" ) ;
259+ update_gauge ( & db_metrics. cur_size_all_mem_tables , "rocksdb.cur-size-all-mem-tables" ) ;
260+ update_gauge ( & db_metrics. size_all_mem_tables , "rocksdb.size-all-mem-tables" ) ;
261+ update_gauge ( & db_metrics. num_entries_active_mem_table , "rocksdb.num-entries-active-mem-table" ) ;
262+ update_gauge ( & db_metrics. num_entries_imm_mem_tables , "rocksdb.num-entries-imm-mem-tables" ) ;
263+ update_gauge ( & db_metrics. num_deletes_active_mem_table , "rocksdb.num-deletes-active-mem-table" ) ;
264+ update_gauge ( & db_metrics. num_deletes_imm_mem_tables , "rocksdb.num-deletes-imm-mem-tables" ) ;
265+ update_gauge ( & db_metrics. estimate_num_keys , "rocksdb.estimate-num-keys" ) ;
266+ update_gauge ( & db_metrics. estimate_table_readers_mem , "rocksdb.estimate-table-readers-mem" ) ;
267+ update_gauge ( & db_metrics. is_file_deletions_enabled , "rocksdb.is-file-deletions-enabled" ) ;
268+ update_gauge ( & db_metrics. num_snapshots , "rocksdb.num-snapshots" ) ;
269+ update_gauge ( & db_metrics. oldest_snapshot_time , "rocksdb.oldest-snapshot-time" ) ;
270+ update_gauge ( & db_metrics. num_live_versions , "rocksdb.num-live-versions" ) ;
271+ update_gauge ( & db_metrics. current_super_version_number , "rocksdb.current-super-version-number" ) ;
272+ update_gauge ( & db_metrics. estimate_live_data_size , "rocksdb.estimate-live-data-size" ) ;
273+ update_gauge ( & db_metrics. min_log_number_to_keep , "rocksdb.min-log-number-to-keep" ) ;
274+ update_gauge ( & db_metrics. min_obsolete_sst_number_to_keep , "rocksdb.min-obsolete-sst-number-to-keep" ) ;
275+ update_gauge ( & db_metrics. total_sst_files_size , "rocksdb.total-sst-files-size" ) ;
276+ update_gauge ( & db_metrics. live_sst_files_size , "rocksdb.live-sst-files-size" ) ;
277+ update_gauge ( & db_metrics. base_level , "rocksdb.base-level" ) ;
278+ update_gauge ( & db_metrics. estimate_pending_compaction_bytes , "rocksdb.estimate-pending-compaction-bytes" ) ;
279+ update_gauge ( & db_metrics. num_running_compactions , "rocksdb.num-running-compactions" ) ;
280+ update_gauge ( & db_metrics. num_running_flushes , "rocksdb.num-running-flushes" ) ;
281+ update_gauge ( & db_metrics. actual_delayed_write_rate , "rocksdb.actual-delayed-write-rate" ) ;
282+ update_gauge ( & db_metrics. is_write_stopped , "rocksdb.is-write-stopped" ) ;
283+ update_gauge ( & db_metrics. estimate_oldest_key_time , "rocksdb.estimate-oldest-key-time" ) ;
284+ update_gauge ( & db_metrics. block_cache_capacity , "rocksdb.block-cache-capacity" ) ;
285+ update_gauge ( & db_metrics. block_cache_usage , "rocksdb.block-cache-usage" ) ;
286+ update_gauge ( & db_metrics. block_cache_pinned_usage , "rocksdb.block-cache-pinned-usage" ) ;
287+ thread:: sleep ( Duration :: from_secs ( 5 ) ) ;
288+ } ) ;
289+ }
223290}
0 commit comments