Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix: rocksdb options not changed even if update in Pegasus config file #1108

Merged
merged 23 commits into from
Aug 25, 2022
108 changes: 103 additions & 5 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1560,9 +1560,10 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)

ddebug_replica("start to open rocksDB's rdb({})", rdb_path);

// Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// Here we create a `_table_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// will be used elsewhere.
rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
_table_data_cf_opts = _data_cf_opts;
_table_data_cf_opts_recalculated = false;
bool has_incompatible_db_options = false;
if (db_exist) {
// When DB exists, meta CF and data CF must be present.
Expand Down Expand Up @@ -1609,7 +1610,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// We don't use `loaded_data_cf_opts` directly because pointer-typed options will
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts);
_db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
} else {
Expand All @@ -1620,7 +1621,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
}

std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
{{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
auto s = rocksdb::CheckOptionsCompatibility(rdb_path,
rocksdb::Env::Default(),
_db_opts,
Expand Down Expand Up @@ -2634,6 +2635,10 @@ void pegasus_server_impl::update_usage_scenario(const std::map<std::string, std:
old_usage_scenario,
new_usage_scenario);
}
} else {
// When an old db is opened and the rocksDB related configs in server config.ini has been
// changed, the options related to usage scenario need to be recalculated with new values.
recalculate_data_cf_options(_table_data_cf_opts);
}
}

Expand Down Expand Up @@ -2993,7 +2998,8 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
void pegasus_server_impl::reset_usage_scenario_options(
const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
{
// reset usage scenario related options, refer to options set in 'set_usage_scenario' function.
// reset usage scenario related options, refer to options set in 'set_usage_scenario'
// function.
target_opts->level0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger;
target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger;
target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger;
Expand All @@ -3007,6 +3013,98 @@ void pegasus_server_impl::reset_usage_scenario_options(
target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
}

void pegasus_server_impl::recalculate_data_cf_options(
const rocksdb::ColumnFamilyOptions &cur_data_cf_opts)
{
#define UPDATE_NUMBER_OPTION_IF_NEEDED(option, value) \
do { \
if ((value) != cur_data_cf_opts.option) { \
new_options[#option] = std::to_string((value)); \
} \
} while (0)

#define UPDATE_BOOL_OPTION_IF_NEEDED(option, value) \
do { \
if ((value) != cur_data_cf_opts.option) { \
if ((value)) { \
new_options[#option] = "true"; \
} else { \
new_options[#option] = "false"; \
} \
} \
} while (0)

#define UPDATE_OPTION_IF_NEEDED(option) UPDATE_NUMBER_OPTION_IF_NEEDED(option, _data_cf_opts.option)

if (_table_data_cf_opts_recalculated)
return;
std::unordered_map<std::string, std::string> new_options;
if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario ||
ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == _usage_scenario) {
if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) {
if (!check_value_if_nearby(_data_cf_opts.write_buffer_size,
cur_data_cf_opts.write_buffer_size)) {
new_options["write_buffer_size"] =
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size));
}
UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger);
} else {
uint64_t buffer_size = dsn::rand::next_u64(_data_cf_opts.write_buffer_size,
_data_cf_opts.write_buffer_size * 2);
if (!(cur_data_cf_opts.write_buffer_size >= _data_cf_opts.write_buffer_size &&
cur_data_cf_opts.write_buffer_size <= _data_cf_opts.write_buffer_size * 2)) {
new_options["write_buffer_size"] = std::to_string(buffer_size);
uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base);
new_options["level0_file_num_compaction_trigger"] =
std::to_string(std::max<uint64_t>(4UL, max_size / buffer_size));
} else if (!check_value_if_nearby(_data_cf_opts.max_bytes_for_level_base,
cur_data_cf_opts.max_bytes_for_level_base)) {
uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base);
new_options["level0_file_num_compaction_trigger"] =
std::to_string(std::max<uint64_t>(4UL, max_size / buffer_size));
}
}
UPDATE_OPTION_IF_NEEDED(level0_slowdown_writes_trigger);
UPDATE_OPTION_IF_NEEDED(level0_stop_writes_trigger);
UPDATE_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit);
UPDATE_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit);
UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, false);
UPDATE_OPTION_IF_NEEDED(max_compaction_bytes);
UPDATE_OPTION_IF_NEEDED(max_write_buffer_number);
} else {
// ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_file_num_compaction_trigger, 1000000000);
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_slowdown_writes_trigger, 1000000000);
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_stop_writes_trigger, 1000000000);
UPDATE_NUMBER_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit, 0);
UPDATE_NUMBER_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit, 0);
UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, true);
UPDATE_NUMBER_OPTION_IF_NEEDED(max_compaction_bytes, static_cast<uint64_t>(1) << 60);
if (!check_value_if_nearby(_data_cf_opts.write_buffer_size * 4,
cur_data_cf_opts.write_buffer_size)) {
new_options["write_buffer_size"] =
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size * 4));
}
if (cur_data_cf_opts.max_write_buffer_number !=
std::max(_data_cf_opts.max_write_buffer_number, 6)) {
new_options["max_write_buffer_number"] =
std::to_string(std::max(_data_cf_opts.max_write_buffer_number, 6));
}
}
if (new_options.size() > 0) {
if (set_options(new_options)) {
ddebug_replica(
"{}: recalculate the value of the options related to usage scenario \"{}\"",
replica_name(),
_usage_scenario);
}
}
_table_data_cf_opts_recalculated = true;
#undef UPDATE_OPTION_IF_NEEDED
#undef UPDATE_BOOL_OPTION_IF_NEEDED
#undef UPDATE_NUMBER_OPTION_IF_NEEDED
}

bool pegasus_server_impl::set_options(
const std::unordered_map<std::string, std::string> &new_options)
{
Expand Down
19 changes: 19 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ class pegasus_server_impl : public pegasus_read_service
// return true if successfully changed
bool set_usage_scenario(const std::string &usage_scenario);

// recalculate option value if necessary
void recalculate_data_cf_options(const rocksdb::ColumnFamilyOptions &cur_data_cf_opts);

void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);

Expand All @@ -325,6 +328,15 @@ class pegasus_server_impl : public pegasus_read_service
return dsn::rand::next_u64(base_value - gap, base_value + gap);
}

// return true if value in range of [0.75, 1.25] * base_value
bool check_value_if_nearby(uint64_t base_value, uint64_t check_value)
{
uint64_t gap = base_value / 4;
uint64_t actual_gap =
(base_value < check_value) ? check_value - base_value : base_value - check_value;
return actual_gap <= gap;
}

// return true if expired
bool check_if_record_expired(uint32_t epoch_now, rocksdb::Slice raw_value)
{
Expand Down Expand Up @@ -413,11 +425,18 @@ class pegasus_server_impl : public pegasus_read_service
std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
std::shared_ptr<rocksdb::Statistics> _statistics;
rocksdb::DBOptions _db_opts;
// The value of option in data_cf according to conf template file config.ini
rocksdb::ColumnFamilyOptions _data_cf_opts;
// Dynamically calculate the value of current data_cf option according to the conf module file
// and usage scenario
rocksdb::ColumnFamilyOptions _table_data_cf_opts;
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
std::string _user_specified_compaction;
// Whether it is necessary to update the current data_cf, it is required when opening the db at
// the first time, but not later
bool _table_data_cf_opts_recalculated;

rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_data_cf;
Expand Down