Skip to content

Commit

Permalink
*: Remove RegionPtrWithBlock (#9735)
Browse files Browse the repository at this point in the history
ref #6233

* Remove `RegionPtrWithBlock`
* Remove `TiDB::TableInfo::schema_version`

Signed-off-by: JaySon-Huang <tshent@qq.com>
  • Loading branch information
JaySon-Huang authored Dec 19, 2024
1 parent ea36205 commit bfe0366
Show file tree
Hide file tree
Showing 18 changed files with 73 additions and 365 deletions.
2 changes: 0 additions & 2 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ DBGInvoker::DBGInvoker()

regSchemalessFunc("region_snapshot", MockRaftCommand::dbgFuncRegionSnapshot);
regSchemalessFunc("region_snapshot_data", MockRaftCommand::dbgFuncRegionSnapshotWithData);
regSchemalessFunc("region_snapshot_pre_handle_block", /**/ MockRaftCommand::dbgFuncRegionSnapshotPreHandleBlock);
regSchemalessFunc("region_snapshot_apply_block", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyBlock);
regSchemalessFunc("region_snapshot_pre_handle_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles);
regSchemalessFunc(
"region_snapshot_pre_handle_file_pks",
Expand Down
10 changes: 0 additions & 10 deletions dbms/src/Debug/dbgKVStore/dbgFuncMockRaftCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,6 @@ struct MockRaftCommand
// ./storage-client.sh "DBGInvoke region_ingest_sst(database_name, table_name, region_id, start, end)"
static void dbgFuncIngestSST(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Apply snapshot for a region. (pre-handle)
// Usage:
// ./storages-client.sh "DBGInvoke region_snapshot_pre_handle_block(database_name, table_name, region_id, start, end, handle_id1, tso1, del1, r1_c1, r1_c2, ..., handle_id2, tso2, del2, r2_c1, r2_c2, ... )"
static void dbgFuncRegionSnapshotPreHandleBlock(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Apply snapshot for a region. (apply a pre-handle snapshot)
// Usage:
// ./storages-client.sh "DBGInvoke region_snapshot_apply_block(region_id)"
static void dbgFuncRegionSnapshotApplyBlock(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Simulate a region pre-handle snapshot data to DTFiles
// Usage:
// ./storage-client.sh "DBGInvoke region_snapshot_pre_handle_file(database_name, table_name, region_id, start, end, schema_string, pk_name[, test-fields=1, cfs="write,default"])"
Expand Down
180 changes: 12 additions & 168 deletions dbms/src/Debug/dbgKVStore/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/setThreadName.h>
Expand Down Expand Up @@ -87,9 +88,10 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)
// Get start key and end key from multiple columns if it is clustered_index.
std::vector<Field> start_keys;
std::vector<Field> end_keys;
const auto & pk_idx_cols = table_info.getPrimaryIndexInfo().idx_cols;
for (size_t i = 0; i < handle_column_size; i++)
{
auto & column_info = table_info.columns[table_info.getPrimaryIndexInfo().idx_cols[i].offset];
auto & column_info = table_info.columns[pk_idx_cols[i].offset];
auto start_field
= RegionBench::convertField(column_info, typeid_cast<const ASTLiteral &>(*args[3 + i]).value);
TiDB::DatumBumpy start_datum = TiDB::DatumBumpy(start_field, column_info.tp);
Expand All @@ -108,8 +110,7 @@ RegionPtr GenDbgRegionSnapshotWithData(Context & context, const ASTs & args)

const size_t len = table->table_info.columns.size() + 3;

if ((args_end - args_begin) % len)
throw Exception("Number of insert values and columns do not match.", ErrorCodes::LOGICAL_ERROR);
RUNTIME_CHECK_MSG((((args_end - args_begin) % len) == 0), "Number of insert values and columns do not match.");

// Parse row values
for (auto it = args_begin; it != args_end; it += len)
Expand Down Expand Up @@ -170,9 +171,15 @@ void MockRaftCommand::dbgFuncRegionSnapshotWithData(Context & context, const AST
auto table_id = region->getMappedTableID();
auto cnt = region->writeCFCount();

// Mock to apply a snapshot with data in `region`
// Mock to apply a snapshot with committed rows in `region`
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithBlock>(region, tmt);
tmt.getKVStore()->checkAndApplyPreHandledSnapshot<RegionPtrWithSnapshotFiles>(region, tmt);
// Decode the committed rows into Block and flush to the IStorage layer.
// This does not ensure the atomicity of "apply snapshot". But we only use it for writing tests now.
if (auto region_applied = tmt.getKVStore()->getRegion(region_id); region_applied)
{
tmt.getRegionTable().tryWriteBlockByRegion(region_applied);
}
output(fmt::format("put region #{}, range{} to table #{} with {} records", region_id, range_string, table_id, cnt));
}

Expand Down Expand Up @@ -488,30 +495,10 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG
struct GlobalRegionMap
{
using Key = std::string;
using BlockVal = std::pair<RegionPtr, RegionPtrWithBlock::CachePtr>;
std::unordered_map<Key, BlockVal> regions_block;
using SnapPath = std::pair<RegionPtr, std::vector<DM::ExternalDTFileInfo>>;
std::unordered_map<Key, SnapPath> regions_snap_files;
std::mutex mutex;

void insertRegionCache(const Key & name, BlockVal && val)
{
auto _ = std::lock_guard(mutex);
regions_block[name] = std::move(val);
}
BlockVal popRegionCache(const Key & name)
{
auto _ = std::lock_guard(mutex);
if (auto it = regions_block.find(name); it == regions_block.end())
throw Exception(std::string(__PRETTY_FUNCTION__) + " ... " + name);
else
{
auto ret = std::move(it->second);
regions_block.erase(it);
return ret;
}
}

void insertRegionSnap(const Key & name, SnapPath && val)
{
auto _ = std::lock_guard(mutex);
Expand All @@ -533,149 +520,6 @@ struct GlobalRegionMap

static GlobalRegionMap GLOBAL_REGION_MAP;

/// Mock to pre-decode snapshot to block then apply

/// Pre-decode region data into block cache and remove committed data from `region`.
///
/// Steps, in order:
///  1. Run `region->tryCompactionFilter` with a GC safe point (0 unless a non-mock PD client is available).
///  2. Read the committed rows via `ReadRegionCommitCache(region, true)`.
///  3. Decode those rows into a `Block` using the storage's decoding schema snapshot. If the first
///     (non-forced) attempt returns false, sync schemas for the keyspace and retry with `force_decode = true`.
///  4. Remove the rows that were read from `region` via `RemoveRegionCommitCache`.
///
/// Returns `nullptr` when `ReadRegionCommitCache` yields no value; otherwise a `RegionPreDecodeBlockData`
/// built from the block, `DEFAULT_UNSPECIFIED_SCHEMA_VERSION` and the rows that were read.
/// Throws `Exception` with `ErrorCodes::LOGICAL_ERROR` when the forced decode also returns false. Exceptions
/// other than `ILLFORMAT_RAFT_ROW` raised while reading the commit cache are rethrown.
RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & region, Context & context)
{
auto keyspace_id = region->getKeyspaceID();
const auto & tmt = context.getTMTContext();
{
// The safe point stays 0 when the PD client is a mock.
Timestamp gc_safe_point = 0;
if (auto pd_client = tmt.getPDClient(); !pd_client->isMock())
{
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(
pd_client,
keyspace_id,
false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
/**
* In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually.
* If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files.
*/
region->tryCompactionFilter(gc_safe_point);
}
std::optional<RegionDataReadInfoList> data_list_read = std::nullopt;
try
{
// The second argument is `lock_region`.
data_list_read = ReadRegionCommitCache(region, true);
// No value was read: there is nothing to pre-decode, so the caller gets a null cache.
if (!data_list_read)
return nullptr;
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::ILLFORMAT_RAFT_ROW)
{
// BR or Lightning may write illegal data into TiKV, skip pre-decode and ingest sst later.
LOG_WARNING(
Logger::get(__PRETTY_FUNCTION__),
"Got error while reading region committed cache: {}. Skip pre-decode and keep original cache.",
e.displayText());
// set data_list_read and let apply snapshot process use empty block
data_list_read = RegionDataReadInfoList();
}
else
throw;
}

TableID table_id = region->getMappedTableID();
// Never updated below; it is passed to the result unchanged.
Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
Block res_block;

// Decode `data_list_read` into `res_block` under a structure read lock.
// Returns false when the storage is missing or tombstoned and `force_decode` is false (schema sync needed).
// Returns true when the storage is missing under `force_decode`, or when locking reports TABLE_IS_DROPPED.
// Otherwise returns whatever `RegionBlockReader::read` returns.
const auto atomic_decode = [&](bool force_decode) -> bool {
// NOTE(review): `watch` is not read anywhere in this lambda.
Stopwatch watch;
auto storage = tmt.getStorages().get(keyspace_id, table_id);
if (storage == nullptr || storage->isTombstone())
{
if (!force_decode) // Need to update.
return false;
if (storage == nullptr) // Table must have just been GC-ed.
return true;
// A tombstoned but still present storage falls through and is decoded when `force_decode` is true.
}

/// Get a structure read lock throughout decode, during which schema must not change.
TableStructureLockHolder lock;
try
{
lock = storage->lockStructureForShare(getThreadNameAndID());
}
catch (DB::Exception & e)
{
// If the storage is physical dropped (but not removed from `ManagedStorages`) when we want to decode snapshot, consider the decode done.
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
return true;
else
throw;
}

DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot
= storage->getSchemaSnapshotAndBlockForDecoding(lock, false, true).first;
res_block = createBlockSortByColumnID(decoding_schema_snapshot);
auto reader = RegionBlockReader(decoding_schema_snapshot);
return reader.read(res_block, *data_list_read, force_decode);
};

/// In TiFlash, the actions between applying raft log and schema changes are not strictly synchronized.
/// There could be a chance that some raft logs come after a table gets tombstoned. Take care of it when
/// decoding data. Check the test case for more details.
FAIL_POINT_PAUSE(FailPoints::pause_before_apply_raft_snapshot);

// First attempt is not forced; if it returns false, sync the schema and force the decode.
if (!atomic_decode(false))
{
tmt.getSchemaSyncerManager()->syncSchemas(context, keyspace_id);

if (!atomic_decode(true))
throw Exception(
"Pre-decode " + region->toString() + " cache to table " + std::to_string(table_id) + " block failed",
ErrorCodes::LOGICAL_ERROR);
}

// The rows just read are removed from the region's commit cache.
RemoveRegionCommitCache(region, *data_list_read);

return std::make_unique<RegionPreDecodeBlockData>(std::move(res_block), schema_version, std::move(*data_list_read));
}

/// Debug command: build a mock region snapshot from `args`, pre-decode its committed rows into a
/// block cache, and stash `{region, block_cache}` in `GLOBAL_REGION_MAP` under the key
/// `"__snap_" + region_id`, where `dbgFuncRegionSnapshotApplyBlock` later looks it up.
///
/// A human-readable summary of the region and of the pre-decoded cache is passed to `output`.
/// `GenRegionPreDecodeBlockData` may return a null cache; in that case the summary reports an
/// empty cache instead of dereferencing the null pointer, and the null cache is stored as-is.
void MockRaftCommand::dbgFuncRegionSnapshotPreHandleBlock(
    Context & context,
    const ASTs & args,
    DBGInvoker::Printer output)
{
    FmtBuffer fmt_buf;
    auto region = GenDbgRegionSnapshotWithData(context, args);
    const auto region_name = "__snap_" + std::to_string(region->id());
    fmt_buf.fmtAppend("pre-handle {} snapshot with data {}", region->toString(false), region->dataInfo());
    auto & tmt = context.getTMTContext();
    auto block_cache = GenRegionPreDecodeBlockData(region, tmt.getContext());
    fmt_buf.append(", pre-decode block cache");
    if (block_cache)
    {
        fmt_buf.fmtAppend(
            " {{ schema_version: ?, data_list size: {}, block row: {} col: {} bytes: {} }}",
            block_cache->data_list_read.size(),
            block_cache->block.rows(),
            block_cache->block.columns(),
            block_cache->block.bytes());
    }
    else
    {
        // `GenRegionPreDecodeBlockData` returns nullptr when no committed data could be read from the
        // region; report that rather than crashing on a null dereference.
        // `append` copies the text verbatim, so the braces need no escaping here.
        fmt_buf.append(" { empty }");
    }
    GLOBAL_REGION_MAP.insertRegionCache(region_name, {region, std::move(block_cache)});
    output(fmt_buf.toString());
}

/// Debug command: apply a snapshot that `dbgFuncRegionSnapshotPreHandleBlock` stashed earlier.
///
/// Expects exactly one argument, the region id. The stashed `{region, block_cache}` pair is popped
/// from `GLOBAL_REGION_MAP` under the key `"__snap_" + region_id` and handed to
/// `KVStore::checkAndApplyPreHandledSnapshot`. Throws `Exception` with `ErrorCodes::BAD_ARGUMENTS`
/// when the argument count is not 1.
void MockRaftCommand::dbgFuncRegionSnapshotApplyBlock(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
    if (args.size() != 1)
        throw Exception("Args not matched, should be: region-id", ErrorCodes::BAD_ARGUMENTS);

    // The only argument is a literal holding the region id.
    const auto & id_literal = typeid_cast<const ASTLiteral &>(*args.front());
    const auto region_id = static_cast<RegionID>(safeGet<UInt64>(id_literal.value));

    // Take the stashed pair out of the global map; it is removed from the map by the pop.
    const auto cache_key = "__snap_" + std::to_string(region_id);
    auto [region, block_cache] = GLOBAL_REGION_MAP.popRegionCache(cache_key);

    auto & tmt = context.getTMTContext();
    auto kvstore = tmt.getKVStore();
    kvstore->checkAndApplyPreHandledSnapshot<RegionPtrWithBlock>({region, std::move(block_cache)}, tmt);

    output(fmt::format("success apply {} with block cache", region->id()));
}


/// Mock to pre-decode snapshot to DTFile(s) then apply

// Simulate a region pre-handle snapshot data to DTFiles
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgKVStore/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer

TMTContext & tmt = context.getTMTContext();
RegionPtr region = RegionBench::createRegion(table_info, region_id, start_keys, end_keys);
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithSnapshotFiles>(region, nullptr, 0, tmt);

output(fmt::format(
"put region #{}, range{} to table #{} with kvstore.onSnapshot",
Expand All @@ -96,7 +96,7 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer

TMTContext & tmt = context.getTMTContext();
RegionPtr region = RegionBench::createRegion(table_id, region_id, start, end);
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithSnapshotFiles>(region, nullptr, 0, tmt);

output(fmt::format(
"put region #{}, range[{}, {}) to table #{} with kvstore.onSnapshot",
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgNaturalDag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void NaturalDag::buildTables(Context & context)
auto raft_index = RAFT_INIT_LOG_INDEX;
region_meta.setApplied(raft_index, RAFT_INIT_LOG_TERM);
RegionPtr region_ptr = RegionBench::makeRegion(std::move(region_meta));
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region_ptr, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithSnapshotFiles>(region_ptr, nullptr, 0, tmt);

auto & pairs = region.pairs;
for (auto & pair : pairs)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void concurrentBatchInsert(
Regions regions
= createRegions(table_info.id, concurrent_num, key_num_each_region, handle_begin, curr_max_region_id + 1);
for (const RegionPtr & region : regions)
debug_kvstore.onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);
debug_kvstore.onSnapshot<RegionPtrWithSnapshotFiles>(region, nullptr, 0, tmt);

std::list<std::thread> threads;
for (Int64 i = 0; i < concurrent_num; i++, handle_begin += key_num_each_region)
Expand Down
26 changes: 8 additions & 18 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static void inline writeCommittedBlockDataIntoStorage(
template <typename ReadList>
static inline bool atomicReadWrite(
AtomicReadWriteCtx & rw_ctx,
const RegionPtrWithBlock & region,
const RegionPtr & region,
ReadList & data_list_read,
bool force_decode)
{
Expand Down Expand Up @@ -147,9 +147,7 @@ static inline bool atomicReadWrite(
should_handle_version_col = false;
}

// Currently, RegionPtrWithBlock with a not-null CachePtr is only used in debug functions
// to apply a pre-decoded snapshot. So it will not take place here.
// In short, we always decode here because there is no pre-decode cache.
// Decode `data_list_read` according to the schema snapshot into `Block`
{
LOG_TRACE(
rw_ctx.log,
Expand All @@ -169,6 +167,7 @@ static inline bool atomicReadWrite(
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_decode)
.Observe(rw_ctx.region_decode_cost / 1000.0);
}

if constexpr (std::is_same_v<ReadList, RegionDataReadInfoList>)
{
RUNTIME_CHECK(block_ptr != nullptr);
Expand Down Expand Up @@ -198,12 +197,12 @@ static inline bool atomicReadWrite(

template DM::WriteResult writeRegionDataToStorage<RegionUncommittedDataList>(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
RegionUncommittedDataList & data_list_read,
const LoggerPtr & log);
template DM::WriteResult writeRegionDataToStorage<RegionDataReadInfoList>(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
RegionDataReadInfoList & data_list_read,
const LoggerPtr & log);

Expand All @@ -212,7 +211,7 @@ template DM::WriteResult writeRegionDataToStorage<RegionDataReadInfoList>(
template <typename ReadList>
DM::WriteResult writeRegionDataToStorage(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
ReadList & data_list_read,
const LoggerPtr & log)
{
Expand Down Expand Up @@ -431,21 +430,12 @@ static inline void reportUpstreamLatency(const RegionDataReadInfoList & data_lis

DM::WriteResult RegionTable::writeCommittedByRegion(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
RegionDataReadInfoList & data_list_to_remove,
const LoggerPtr & log,
bool lock_region)
{
std::optional<RegionDataReadInfoList> maybe_data_list_read = std::nullopt;
if (region.pre_decode_cache)
{
// If schema version changed, use the kv data to rebuild block cache
maybe_data_list_read = std::move(region.pre_decode_cache->data_list_read);
}
else
{
maybe_data_list_read = ReadRegionCommitCache(region, lock_region);
}
std::optional<RegionDataReadInfoList> maybe_data_list_read = ReadRegionCommitCache(region, lock_region);

if (!maybe_data_list_read.has_value())
return std::nullopt;
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class Region;
using RegionPtr = std::shared_ptr<Region>;
class StorageDeltaMerge;
class TMTContext;
struct RegionPtrWithBlock;

std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & region, bool lock_region);
void RemoveRegionCommitCache(
Expand All @@ -48,7 +47,7 @@ Block GenRegionBlockDataWithSchema(
template <typename ReadList>
DM::WriteResult writeRegionDataToStorage(
Context & context,
const RegionPtrWithBlock & region,
const RegionPtr & region,
ReadList & data_list_read,
const LoggerPtr & log);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Decode/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const
}
}

RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtrWithBlock & region)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtr & region)
{
const RegionID region_id = region->id();

Expand Down
Loading

0 comments on commit bfe0366

Please sign in to comment.