Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

add option elasticsearch-start-es-after-block to es plugin #1458

Merged
merged 2 commits into from
Dec 21, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions libraries/plugins/elasticsearch/elasticsearch_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class elasticsearch_plugin_impl
std::string _elasticsearch_basic_auth = "";
std::string _elasticsearch_index_prefix = "bitshares-";
bool _elasticsearch_operation_object = false;
uint32_t _elasticsearch_start_es_after_block = 0; // disabled
CURL *curl; // curl handler
vector <string> bulk_lines; // vector of op lines
vector<std::string> prepare;
Expand All @@ -74,7 +75,7 @@ class elasticsearch_plugin_impl
std::string index_name;
bool is_sync = false;
private:
bool add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho );
bool add_elasticsearch( const account_id_type account_id, const optional<operation_history_object>& oho, const uint32_t block_number );
const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj,
const account_id_type& account_id,
const optional <operation_history_object>& oho);
Expand Down Expand Up @@ -163,7 +164,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b

for( auto& account_id : impacted )
{
if(!add_elasticsearch( account_id, oho ))
if(!add_elasticsearch( account_id, oho, b.block_num() ))
return false;
}
}
Expand Down Expand Up @@ -276,13 +277,16 @@ void elasticsearch_plugin_impl::doVisitor(const optional <operation_history_obje
}

bool elasticsearch_plugin_impl::add_elasticsearch( const account_id_type account_id,
const optional <operation_history_object>& oho)
const optional <operation_history_object>& oho,
const uint32_t block_number)
{
const auto &stats_obj = getStatsObject(account_id);
const auto &ath = addNewEntry(stats_obj, account_id, oho);
growStats(stats_obj, ath);
createBulkLine(ath);
prepareBulk(ath.id);
if(_elasticsearch_start_es_after_block == 0 || block_number > _elasticsearch_start_es_after_block) {
createBulkLine(ath);
prepareBulk(ath.id);
}
cleanObjects(ath.id, account_id);

if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech
Expand Down Expand Up @@ -428,18 +432,18 @@ void elasticsearch_plugin::plugin_set_program_options(
("elasticsearch-basic-auth", boost::program_options::value<std::string>(), "Pass basic auth to elasticsearch database('')")
("elasticsearch-index-prefix", boost::program_options::value<std::string>(), "Add a prefix to the index(bitshares-)")
("elasticsearch-operation-object", boost::program_options::value<bool>(), "Save operation as object(false)")
("elasticsearch-start-es-after-block", boost::program_options::value<uint32_t>(), "Start doing ES job after block(0)")
;
cfg.add(cli);
}

void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
database().applied_block.connect( [&]( const signed_block& b) {
if(!my->update_account_histories(b))
{
if (!my->update_account_histories(b))
FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying.");
}
} );

my->_oho_index = database().add_index< primary_index< operation_history_index > >();
database().add_index< primary_index< account_transaction_history_index > >();

Expand All @@ -464,6 +468,9 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia
if (options.count("elasticsearch-operation-object")) {
my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as<bool>();
}
if (options.count("elasticsearch-start-es-after-block")) {
my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as<uint32_t>();
}
}

void elasticsearch_plugin::plugin_startup()
Expand Down