From 5909b13da4820bab475b25ecb908b0f44bdaee5d Mon Sep 17 00:00:00 2001 From: Alfredo Date: Wed, 28 Nov 2018 20:17:20 -0300 Subject: [PATCH 1/2] add option elasticsearch-start-es-after-block --- .../plugins/elasticsearch/elasticsearch_plugin.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index dcffaea5f9..bfde1e02e8 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -59,6 +59,7 @@ class elasticsearch_plugin_impl std::string _elasticsearch_basic_auth = ""; std::string _elasticsearch_index_prefix = "bitshares-"; bool _elasticsearch_operation_object = false; + uint32_t _elasticsearch_start_es_after_block = 0; // disabled CURL *curl; // curl handler vector bulk_lines; // vector of op lines vector prepare; @@ -428,6 +429,7 @@ void elasticsearch_plugin::plugin_set_program_options( ("elasticsearch-basic-auth", boost::program_options::value(), "Pass basic auth to elasticsearch database('')") ("elasticsearch-index-prefix", boost::program_options::value(), "Add a prefix to the index(bitshares-)") ("elasticsearch-operation-object", boost::program_options::value(), "Save operation as object(false)") + ("elasticsearch-start-es-after-block", boost::program_options::value(), "Start doing ES job after block(0)") ; cfg.add(cli); } @@ -435,11 +437,14 @@ void elasticsearch_plugin::plugin_set_program_options( void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options) { database().applied_block.connect( [&]( const signed_block& b) { - if(!my->update_account_histories(b)) - { - FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying."); + if(my->_elasticsearch_start_es_after_block == 0 || b.block_num() > my->_elasticsearch_start_es_after_block) { + if (!my->update_account_histories(b)) { + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, + "Error populating ES database, we are going to keep trying."); + } } } ); + my->_oho_index = database().add_index< primary_index< operation_history_index > >(); database().add_index< primary_index< account_transaction_history_index > >(); @@ -464,6 +469,9 @@ void elasticsearch_plugin::plugin_initialize(const boost::program_options::varia if (options.count("elasticsearch-operation-object")) { my->_elasticsearch_operation_object = options["elasticsearch-operation-object"].as(); } + if (options.count("elasticsearch-start-es-after-block")) { + my->_elasticsearch_start_es_after_block = options["elasticsearch-start-es-after-block"].as(); + } } void elasticsearch_plugin::plugin_startup() From ee4cc4b96ca278e2b8ed28fe5b52cecd9119f755 Mon Sep 17 00:00:00 2001 From: Alfredo Date: Wed, 28 Nov 2018 20:50:05 -0300 Subject: [PATCH 2/2] move start_es_after_block check more deeper --- .../elasticsearch/elasticsearch_plugin.cpp | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp index bfde1e02e8..6af2df19ac 100644 --- a/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp +++ b/libraries/plugins/elasticsearch/elasticsearch_plugin.cpp @@ -75,7 +75,7 @@ class elasticsearch_plugin_impl std::string index_name; bool is_sync = false; private: - bool add_elasticsearch( const account_id_type account_id, const optional& oho ); + bool add_elasticsearch( const account_id_type account_id, const optional& oho, const uint32_t block_number ); const account_transaction_history_object& addNewEntry(const account_statistics_object& stats_obj, const account_id_type& account_id, const optional & oho); @@ -164,7 +164,7 @@ bool elasticsearch_plugin_impl::update_account_histories( const signed_block& b for( auto& account_id : impacted ) { - if(!add_elasticsearch( account_id, oho )) + if(!add_elasticsearch( account_id, oho, b.block_num() )) return false; } } @@ -277,13 +277,16 @@ void elasticsearch_plugin_impl::doVisitor(const optional & oho) + const optional & oho, + const uint32_t block_number) { const auto &stats_obj = getStatsObject(account_id); const auto &ath = addNewEntry(stats_obj, account_id, oho); growStats(stats_obj, ath); - createBulkLine(ath); - prepareBulk(ath.id); + if(_elasticsearch_start_es_after_block == 0 || block_number > _elasticsearch_start_es_after_block) { + createBulkLine(ath); + prepareBulk(ath.id); + } cleanObjects(ath.id, account_id); if (curl && bulk_lines.size() >= limit_documents) { // we are in bulk time, ready to add data to elasticsearech @@ -437,12 +440,8 @@ void elasticsearch_plugin::plugin_set_program_options( void elasticsearch_plugin::plugin_initialize(const boost::program_options::variables_map& options) { database().applied_block.connect( [&]( const signed_block& b) { - if(my->_elasticsearch_start_es_after_block == 0 || b.block_num() > my->_elasticsearch_start_es_after_block) { - if (!my->update_account_histories(b)) { - FC_THROW_EXCEPTION(graphene::chain::plugin_exception, - "Error populating ES database, we are going to keep trying."); - } - } + if (!my->update_account_histories(b)) + FC_THROW_EXCEPTION(graphene::chain::plugin_exception, "Error populating ES database, we are going to keep trying."); } ); my->_oho_index = database().add_index< primary_index< operation_history_index > >();