diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f552310e8..2dfd7713e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1704,6 +1704,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options, ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", cfh->GetName().c_str()); Status s; + cfh->imm.BeginManualOperation(); if (immutable_db_options_.atomic_flush) { s = AtomicFlushMemTables({cfh->cfd()}, flush_options, FlushReason::kManualFlush); @@ -1711,6 +1712,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options, s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush); } + cfh->imm.CompleteManualOperation(); ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush finished, status: %s\n", cfh->GetName().c_str(), s.ToString().c_str()); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index f447ee735..0740a62a7 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -329,7 +329,8 @@ bool MemTableListVersion::TrimHistory(autovector* to_delete, // not yet started. bool MemTableList::IsFlushPending() const { if ((flush_requested_ && num_flush_not_started_ > 0) || - (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { + (num_flush_not_started_ >= min_write_buffer_number_to_merge_) || + (active_manuals_ && num_flush_not_started_)) { assert(imm_flush_needed.load(std::memory_order_relaxed)); return true; } diff --git a/db/memtable_list.h b/db/memtable_list.h index 866ecccb6..ebcac56fb 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -391,6 +391,12 @@ class MemTableList { void RemoveOldMemTables(uint64_t log_number, autovector* to_delete); + void BeginManualOperation() {++active_manuals_}; + + void CompleteManualOperation() { + assert(active_manuals >= 1); + --active_manuals_}; + private: friend Status InstallMemtableAtomicFlushResults( const autovector* imm_lists, @@ -436,6 +442,9 @@ class MemTableList { // Cached value of current_->HasHistory(). std::atomic current_has_history_; + + // count of manual flush/compactions active + std::atomic active_manuals_{0}; }; // Installs memtable atomic flush results.