From b16a5f93415b9b924b8820c1c84e6f396b418958 Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 13 Dec 2024 16:17:42 +0800 Subject: [PATCH] Storages: Shutdown the LocalIndexScheduler before shutting down PageStorage/DeltaMergeStore (#9712) close pingcap/tiflash#9714 Storages: Shutdown the LocalIndexScheduler before shutting down PageStorage/DeltaMergeStore * Add a method `LocalIndexerScheduler::shutdown()` and ensure the running task are all finished before shutting down the GlobalPageStorage in `ContextShared::shutdown()`. Signed-off-by: JaySon-Huang --- dbms/src/Interpreters/Context.cpp | 7 ++++ dbms/src/Interpreters/executeQuery.cpp | 4 +- .../DeltaMerge/LocalIndexerScheduler.cpp | 17 ++++++-- .../DeltaMerge/LocalIndexerScheduler.h | 15 +++++-- dbms/src/Storages/DeltaMerge/Segment.cpp | 40 ++++++++++++------- dbms/src/Storages/DeltaMerge/Segment.h | 13 ++++++ .../DeltaMerge/tests/gtest_segment.cpp | 13 ++---- .../Page/tools/PageCtl/PageStorageCtlV3.cpp | 3 +- 8 files changed, 77 insertions(+), 35 deletions(-) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index ab6ad976190..8e448d40c5a 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -271,6 +271,13 @@ struct ContextShared return; shutdown_called = true; + // The local index scheduler must be shutdown to stop all + // running tasks before shutting down `global_storage_pool`. + if (global_local_indexer_scheduler) + { + global_local_indexer_scheduler->shutdown(); + } + if (global_storage_pool) { // shutdown the gc task of global storage pool before diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 51ad372db24..2897b56de64 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -311,7 +311,7 @@ std::tuple executeQueryImpl( if (elem.read_rows != 0) { - LOG_INFO( + LOG_DEBUG( execute_query_logger, "Read {} rows, {} in {:.3f} sec., {} rows/sec., {}/sec.", elem.read_rows, @@ -421,7 +421,7 @@ void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in) in->dumpTree(log_buffer); return log_buffer.toString(); }; - LOG_INFO(logger, pipeline_log_str()); + LOG_DEBUG(logger, pipeline_log_str()); } BlockIO executeQuery(const String & query, Context & context, bool internal, QueryProcessingStage::Enum stage) diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp index d513d146868..15ca84b8002 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp @@ -64,9 +64,9 @@ LocalIndexerScheduler::LocalIndexerScheduler(const Options & options) start(); } -LocalIndexerScheduler::~LocalIndexerScheduler() +void LocalIndexerScheduler::shutdown() { - LOG_INFO(logger, "LocalIndexerScheduler is destroying. Waiting scheduler and tasks to finish..."); + LOG_INFO(logger, "LocalIndexerScheduler is shutting down. Waiting scheduler and tasks to finish..."); // First quit the scheduler. Don't schedule more tasks. is_shutting_down = true; @@ -81,7 +81,15 @@ LocalIndexerScheduler::~LocalIndexerScheduler() // Then wait all running tasks to finish. pool.reset(); + LOG_INFO(logger, "LocalIndexerScheduler is shutdown."); +} +LocalIndexerScheduler::~LocalIndexerScheduler() +{ + if (!is_shutting_down) + { + shutdown(); + } LOG_INFO(logger, "LocalIndexerScheduler is destroyed"); } @@ -295,7 +303,10 @@ bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock & lock } }; - RUNTIME_CHECK(pool); + if (is_shutting_down || !pool) + // shutting down, retry again + return false; + if (!pool->trySchedule(real_job)) // Concurrent task limit reached return false; diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h index 53349740918..11f70fbdd84 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h @@ -93,6 +93,12 @@ class LocalIndexerScheduler ~LocalIndexerScheduler(); + /** + * @brief Stop the scheduler and wait for running tasks to finish. + * Note that this method won't clear the task pushed. + */ + void shutdown(); + /** * @brief Start the scheduler. In some tests we need to start scheduler * after some tasks are pushed. @@ -101,7 +107,7 @@ class LocalIndexerScheduler /** * @brief Blocks until there is no tasks remaining in the queue and there is no running tasks. - * Should be only used in tests. + * **Should be only used in tests**. */ void waitForFinish(); @@ -114,6 +120,7 @@ class LocalIndexerScheduler /** * @brief Drop all tasks matching specified keyspace id and table id. + * Note that this method won't drop the running tasks. */ size_t dropTasks(KeyspaceID keyspace_id, TableID table_id); @@ -147,9 +154,6 @@ class LocalIndexerScheduler void moveBackReadyTasks(std::unique_lock & lock); private: - bool is_started = false; - std::thread scheduler_thread; - /// Try to add a task to the pool. Returns false if the pool is full /// (for example, reaches concurrent task limit or memory limit). /// When pool is full, we will not try to schedule any more tasks at this moment. @@ -160,6 +164,9 @@ class LocalIndexerScheduler /// heavy pressure. bool tryAddTaskToPool(std::unique_lock & lock, const InternalTaskPtr & task); + std::thread scheduler_thread; + bool is_started = false; + KeyspaceID last_schedule_keyspace_id = 0; std::map last_schedule_table_id_by_ks; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 201c2075113..c355f12d46d 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -417,24 +417,34 @@ SegmentPtr Segment::restoreSegment( // DMContext & context, PageIdU64 segment_id) { - Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore - - ReadBufferFromMemory buf(page.data.begin(), page.data.size()); Segment::SegmentMetaInfo segment_info; - readSegmentMetaInfo(buf, segment_info); + try + { + Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore - auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id); - auto stable = StableValueSpace::restore(context, segment_info.stable_id); - auto segment = std::make_shared( - parent_log, - segment_info.epoch, - segment_info.range, - segment_id, - segment_info.next_segment_id, - delta, - stable); + ReadBufferFromMemory buf(page.data.begin(), page.data.size()); + readSegmentMetaInfo(buf, segment_info); - return segment; + auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id); + auto stable = StableValueSpace::restore(context, segment_info.stable_id); + auto segment = std::make_shared( + parent_log, + segment_info.epoch, + segment_info.range, + segment_id, + segment_info.next_segment_id, + delta, + stable); + + return segment; + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id)); + e.rethrow(); + } + RUNTIME_CHECK_MSG(false, "unreachable"); + return {}; } Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( // diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 0af4ab11cb7..22b50f8ad8c 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -170,6 +170,19 @@ class Segment PageIdU64 next_segment_id{}; PageIdU64 delta_id{}; PageIdU64 stable_id{}; + + String toString() const + { + return fmt::format( + "{{version={} epoch={} range={} segment_id={} next_segment_id={} delta_id={} stable_id={}}}", + version, + epoch, + range.toString(), + segment_id, + next_segment_id, + delta_id, + stable_id); + } }; using SegmentMetaInfos = std::vector; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 89dcf85b37f..5cdf249fe0f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -48,11 +48,7 @@ namespace DB::ErrorCodes extern const int DT_DELTA_INDEX_ERROR; } -namespace DB -{ -namespace DM -{ -namespace GC +namespace DB::DM::GC { bool shouldCompactStableWithTooMuchDataOutOfSegmentRange( const DMContext & context, // @@ -63,7 +59,7 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange( double invalid_data_ratio_threshold, const LoggerPtr & log); } -namespace tests +namespace DB::DM::tests { class SegmentOperationTest : public SegmentTestBasic @@ -1366,7 +1362,4 @@ try } CATCH - -} // namespace tests -} // namespace DM -} // namespace DB +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp index 404951f74fe..c6d52f515f1 100644 --- a/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp +++ b/dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -871,7 +872,7 @@ class PageStorageControlV3 ChecksumClass digest; digest.update(buffer, size); auto checksum = digest.checksum(); - fmt::print("checksum: 0x{:X}\n", checksum); + fmt::println("checksum: 0x{:X}", checksum); auto hex_str = Redact::keyToHexString(buffer, size); delete[] buffer;