Skip to content

Commit

Permalink
Merge pull request #3296 from nanocurrency/election_scheduler_predicates
Browse files Browse the repository at this point in the history
Election scheduler predicates
  • Loading branch information
clemahieu committed May 27, 2021
1 parent 03debef commit ac7a3e2
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 20 deletions.
70 changes: 70 additions & 0 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1458,3 +1458,73 @@ TEST (active_transactions, vacancy)
ASSERT_EQ (1, node.active.vacancy ());
ASSERT_EQ (0, node.active.size ());
}

// Ensure transactions in excess of capacity are removed in fifo order
TEST (active_transactions, fifo)
{
nano::system system;
nano::node_config config{ nano::get_available_port (), system.logging };
config.active_elections_size = 1;
auto & node = *system.add_node (config);
nano::keypair key0;
nano::keypair key1;
nano::state_block_builder builder;
// Construct two pending entries that can be received simultaneously
auto send0 = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (nano::genesis_hash)
.representative (nano::dev_genesis_key.pub)
.link (key0.pub)
.balance (nano::genesis_amount - 1)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send0).code);
nano::blocks_confirm (node, { send0 }, true);
ASSERT_TIMELY (1s, node.block_confirmed (send0->hash ()));
ASSERT_TIMELY (1s, node.active.empty ());
auto send1 = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (send0->hash ())
.representative (nano::dev_genesis_key.pub)
.link (key1.pub)
.balance (nano::genesis_amount - 2)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (send0->hash ()))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send1).code);
nano::blocks_confirm (node, { send1 }, true);
ASSERT_TIMELY (1s, node.block_confirmed (send1->hash ()));
ASSERT_TIMELY (1s, node.active.empty ());

auto receive0 = builder.make_block ()
.account (key0.pub)
.previous (0)
.representative (nano::dev_genesis_key.pub)
.link (send0->hash ())
.balance (1)
.sign (key0.prv, key0.pub)
.work (*system.work.generate (key0.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*receive0).code);
auto receive1 = builder.make_block ()
.account (key1.pub)
.previous (0)
.representative (nano::dev_genesis_key.pub)
.link (send1->hash ())
.balance (1)
.sign (key1.prv, key1.pub)
.work (*system.work.generate (key1.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*receive1).code);
node.scheduler.manual (receive0);
// Ensure first transaction becomes active
ASSERT_TIMELY (1s, node.active.election (receive0->qualified_root ()) != nullptr);
node.scheduler.manual (receive1);
// Ensure second transaction becomes active
ASSERT_TIMELY (1s, node.active.election (receive1->qualified_root ()) != nullptr);
// Ensure excess transactions get trimmed
ASSERT_TIMELY (1s, node.active.size () == 1);
// Ensure the surviving transaction is the least recently inserted
ASSERT_TIMELY (1s, node.active.election (receive1->qualified_root ()) != nullptr);
}
3 changes: 2 additions & 1 deletion nano/core_test/election_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ TEST (election_scheduler, flush_vacancy)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build_shared ();
node.scheduler.manual (send);
ASSERT_EQ (nano::process_result::progress, node.process (*send).code);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
// Ensure this call does not block, even though no elections can be activated.
node.scheduler.flush ();
ASSERT_EQ (0, node.active.size ());
Expand Down
14 changes: 11 additions & 3 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<nano::mutex>
{
bool const confirmed_l (election_l->confirmed ());

unconfirmed_count_l += !confirmed_l;
bool const overflow_l (unconfirmed_count_l > node.config.active_elections_size && election_l->election_start < election_ttl_cutoff_l);
if (overflow_l || election_l->transition_time (solicitor))
if (election_l->transition_time (solicitor))
{
if (election_l->optimistic () && election_l->failed ())
{
Expand Down Expand Up @@ -1046,6 +1044,16 @@ void nano::active_transactions::erase_hash (nano::block_hash const & hash_a)
debug_assert (erased == 1);
}

void nano::active_transactions::erase_oldest ()
{
nano::unique_lock<nano::mutex> lock (mutex);
if (!roots.empty ())
{
auto item = roots.get<tag_random_access> ().front ();
cleanup_election (lock, *item.election);
}
}

bool nano::active_transactions::empty ()
{
nano::lock_guard<nano::mutex> lock (mutex);
Expand Down
1 change: 1 addition & 0 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class active_transactions final
std::vector<std::shared_ptr<nano::election>> list_active (size_t = std::numeric_limits<size_t>::max ());
void erase (nano::block const &);
void erase_hash (nano::block_hash const &);
void erase_oldest ();
bool empty ();
size_t size ();
void stop ();
Expand Down
42 changes: 28 additions & 14 deletions nano/node/election_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,45 @@ size_t nano::election_scheduler::priority_queue_size () const
return priority.size ();
}

bool nano::election_scheduler::priority_queue_predicate () const
{
return node.active.vacancy () > 0 && !priority.empty ();
}

bool nano::election_scheduler::manual_queue_predicate () const
{
return !manual_queue.empty ();
}

bool nano::election_scheduler::overfill_predicate () const
{
return node.active.vacancy () < 0;
}

void nano::election_scheduler::run ()
{
nano::thread_role::set (nano::thread_role::name::election_scheduler);
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] () {
auto vacancy = node.active.vacancy ();
auto has_vacancy = vacancy > 0;
auto available = !priority.empty () || !manual_queue.empty ();
return stopped || (has_vacancy && available);
return stopped || priority_queue_predicate () || manual_queue_predicate () || overfill_predicate ();
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
if (!priority.empty ())
if (overfill_predicate ())
{
node.active.erase_oldest ();
}
else if (manual_queue_predicate ())
{
auto const [block, previous_balance, election_behavior, confirmation_action] = manual_queue.front ();
nano::unique_lock<nano::mutex> lock2 (node.active.mutex);
node.active.insert_impl (lock2, block, previous_balance, election_behavior, confirmation_action);
manual_queue.pop_front ();
}
else if (priority_queue_predicate ())
{
auto block = priority.top ();
std::shared_ptr<nano::election> election;
Expand All @@ -113,15 +136,6 @@ void nano::election_scheduler::run ()
election->transition_active ();
}
priority.pop ();
++priority_queued;
}
if (!manual_queue.empty ())
{
auto const [block, previous_balance, election_behavior, confirmation_action] = manual_queue.front ();
nano::unique_lock<nano::mutex> lock2 (node.active.mutex);
node.active.insert_impl (lock2, block, previous_balance, election_behavior, confirmation_action);
manual_queue.pop_front ();
++manual_queued;
}
notify ();
}
Expand Down
5 changes: 3 additions & 2 deletions nano/node/election_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class election_scheduler final
private:
void run ();
bool empty_locked () const;
bool priority_queue_predicate () const;
bool manual_queue_predicate () const;
bool overfill_predicate () const;
nano::prioritization priority;
uint64_t priority_queued{ 0 };
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior, std::function<void (std::shared_ptr<nano::block>)>>> manual_queue;
uint64_t manual_queued{ 0 };
nano::node & node;
bool stopped;
nano::condition_variable condition;
Expand Down

0 comments on commit ac7a3e2

Please # to comment.