Feature/enable fan out for the same source #314

Open · wants to merge 2 commits into base: develop
24 changes: 14 additions & 10 deletions src/Storages/Streaming/ProxyStream.cpp
@@ -107,11 +107,12 @@ ProxyStream::ProxyStream(
QueryProcessingStage::Enum ProxyStream::getQueryProcessingStage(
ContextPtr context_,
QueryProcessingStage::Enum to_stage,
const StorageSnapshotPtr & storage_snapshot,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & query_info) const
{
if (storage)
return storage->getQueryProcessingStage(context_, to_stage, storage_snapshot, query_info);
return storage->getQueryProcessingStage(
context_, to_stage, storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_), query_info);
else
/// When it is created by a subquery, not a table
return QueryProcessingStage::FetchColumns;
@@ -174,7 +175,7 @@ void ProxyStream::read(
void ProxyStream::doRead(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum processed_stage,
@@ -206,32 +207,35 @@ void ProxyStream::doRead(
return;
}

assert(storage);
auto proxy_storage_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_);
if (auto * view = storage->as<StorageView>())
{
auto view_context = createProxySubqueryContext(context_, query_info, isStreamingQuery());
view->read(query_plan, column_names, storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
view->read(
query_plan, column_names, proxy_storage_snapshot, query_info, view_context, processed_stage, max_block_size, num_streams);
query_plan.addInterpreterContext(view_context);
return;
}
else if (auto * materialized_view = storage->as<StorageMaterializedView>())
return materialized_view->read(
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (auto * external_stream = storage->as<StorageExternalStream>())
return external_stream->read(
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (auto * random_stream = storage->as<StorageRandom>())
return random_stream->read(
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (auto * file_stream = storage->as<StorageFile>())
return file_stream->read(
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
else if (nested_proxy_storage)
return nested_proxy_storage->read(
query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);

auto * distributed = storage->as<StorageStream>();
assert(distributed);
distributed->read(query_plan, column_names, storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
distributed->read(query_plan, column_names, proxy_storage_snapshot, query_info, context_, processed_stage, max_block_size, num_streams);
}

Names ProxyStream::getRequiredColumnsForProxyStorage(const Names & column_names) const
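Note on the ProxyStream hunks: both getQueryProcessingStage() and doRead() stop using the caller-supplied storage snapshot and instead rebuild one from the proxied storage via storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context_). A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real proton API:

```cpp
#include <memory>

// Simplified stand-ins for IStorage / StorageSnapshot, only to show the shape.
struct Snapshot { int metadata_version = 0; };

struct Storage
{
    int metadata_version = 0;
    std::shared_ptr<Snapshot> getStorageSnapshot() const
    {
        // The snapshot is always derived from this storage's current metadata.
        return std::make_shared<Snapshot>(Snapshot{metadata_version});
    }
};

struct ProxyStream
{
    std::shared_ptr<Storage> storage;

    // The caller's snapshot may describe the proxy rather than the proxied
    // storage; rebuilding it here keeps column resolution consistent for
    // every fan-out reader created downstream.
    void read(const std::shared_ptr<Snapshot> & /*caller_snapshot*/) const
    {
        auto proxy_snapshot = storage->getStorageSnapshot();
        // ... pass proxy_snapshot to the underlying view/stream read() ...
    }
};
```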
101 changes: 62 additions & 39 deletions src/Storages/Streaming/SourceColumnsDescription.cpp
@@ -1,11 +1,13 @@
#include "SourceColumnsDescription.h"
#include <Storages/Streaming/SourceColumnsDescription.h>

#include <Core/Block.h>
#include <NativeLog/Record/Record.h>
#include <Storages/StorageSnapshot.h>
#include <base/ClockUtils.h>
#include <Common/ProtonCommon.h>

#include <numeric>

namespace DB
{
SourceColumnsDescription::PhysicalColumnPositions &
@@ -30,21 +32,39 @@ void SourceColumnsDescription::PhysicalColumnPositions::clear()
subcolumns.clear();
}

SourceColumnsDescription::SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot)
SourceColumnsDescription::SourceColumnsDescription(
const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read)
: SourceColumnsDescription(
storage_snapshot->getColumnsByNames(GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
storage_snapshot->getColumnsByNames(
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withVirtuals().withExtendedObjects(), required_column_names),
storage_snapshot->getMetadataForQuery()->getSampleBlock(),
storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()))
storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects()),
enable_partial_read)
{
}

SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns)
SourceColumnsDescription::SourceColumnsDescription(
const NamesAndTypesList & columns_to_read,
const Block & schema,
const NamesAndTypesList & all_extended_columns,
bool enable_partial_read)
{
/// FIXME: when we have multiple schema versions, the header and the schema may mismatch
auto column_size = columns_to_read.size();

if (enable_partial_read)
{
/// Read just the required physical columns
physical_column_positions_to_read.positions.reserve(column_size);
}
else
{
/// Read full physical columns
physical_column_positions_to_read.positions.resize(schema.columns());
std::iota(physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), 0);
}

positions.reserve(column_size);
physical_column_positions_to_read.positions.reserve(column_size);
subcolumns_to_read.reserve(column_size);

std::vector<uint16_t> read_all_subcolumns_positions;
@@ -112,45 +132,48 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
auto pos_in_schema = schema.getPositionByName(name_in_storage);
const auto & column_in_storage = schema.getByName(name_in_storage);

/// Calculate main column pos
size_t physical_pos_in_schema_to_read = 0;
/// We don't need to read duplicate physical columns from schema
auto physical_pos_iter = std::find(
physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
if (physical_pos_iter == physical_column_positions_to_read.positions.end())
size_t physical_pos_in_schema_to_read = pos_in_schema;
/// Special case: re-calculate the position within the partially read schema
if (enable_partial_read)
{
physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
/// We don't need to read duplicate physical columns from schema
auto physical_pos_iter = std::find(
physical_column_positions_to_read.positions.begin(), physical_column_positions_to_read.positions.end(), pos_in_schema);
if (physical_pos_iter == physical_column_positions_to_read.positions.end())
{
physical_pos_in_schema_to_read = physical_column_positions_to_read.positions.size();
physical_column_positions_to_read.positions.emplace_back(pos_in_schema);
}
else
physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();
}

/// json, array(json), tuple(..., json, ...)
if (column_in_storage.type->hasDynamicSubcolumns())
/// json, array(json), tuple(..., json, ...)
if (column_in_storage.type->hasDynamicSubcolumns())
{
/// We'd like to read the parent json column only once if multiple subcolumns of the same json are required
/// like `select json.a, json.b from stream`
auto find_iter = std::find_if(
physical_object_columns_to_read.begin(),
physical_object_columns_to_read.end(),
[&name_in_storage](const auto & col_name_type) { return col_name_type.name == name_in_storage; });

if (find_iter == physical_object_columns_to_read.end())
{
/// We'd like to read the parent json column only once if multiple subcolumns of the same json are required
/// like `select json.a, json.b from stream`
auto find_iter = std::find_if(
physical_object_columns_to_read.begin(),
physical_object_columns_to_read.end(),
[&column](const auto & col_name_type) { return col_name_type.name == column.name; });

if (find_iter == physical_object_columns_to_read.end())
if (column.isSubcolumn())
{
if (column.isSubcolumn())
{
/// When reading a subcolumn of a json like `select json.a from stream`, we will need to read the parent `json` column
auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
assert(name_and_type);
physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
}
else
{
/// This column is the parent json column itself, like `select json from stream`; use the name and type directly
physical_object_columns_to_read.emplace_back(column);
}
/// When reading a subcolumn of a json like `select json.a from stream`, we will need to read the parent `json` column
auto name_and_type = all_extended_columns.tryGetByName(name_in_storage);
assert(name_and_type);
physical_object_columns_to_read.emplace_back(std::move(*name_and_type));
}
else
{
/// This column is the parent json column itself, like `select json from stream`; use the name and type directly
physical_object_columns_to_read.emplace_back(column);
}
}
}
else
physical_pos_in_schema_to_read = physical_pos_iter - physical_column_positions_to_read.positions.begin();

/// For a subcolumn, which depends on the main column
if (column.isSubcolumn())
@@ -181,7 +204,7 @@ SourceColumnsDescription::SourceColumnsDescription(const NamesAndTypesList & col
physical_column_positions_to_read.subcolumns.erase(pos);

/// Clients may read only virtual columns; add `_tp_time` so we know how many rows there are
if (physical_column_positions_to_read.positions.empty())
if (enable_partial_read && physical_column_positions_to_read.positions.empty())
physical_column_positions_to_read.positions.emplace_back(schema.getPositionByName(ProtonConsts::RESERVED_EVENT_TIME));
}
}
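Note on enable_partial_read: when the flag is off, the reader takes every physical column of the schema (positions 0..N-1 filled via std::iota); when it is on, it keeps only the deduplicated positions of the required columns, and each column's read position becomes an index into that dense vector. A self-contained sketch of the two modes, with simplified types rather than the proton implementation:

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Sketch of the two position-selection modes guarded by `enable_partial_read`.
std::vector<size_t> selectPositions(
    const std::vector<size_t> & required_positions, size_t schema_columns, bool enable_partial_read)
{
    std::vector<size_t> positions;
    if (!enable_partial_read)
    {
        // Full read: every physical column of the schema, in order.
        positions.resize(schema_columns);
        std::iota(positions.begin(), positions.end(), 0);
        return positions;
    }

    // Partial read: only the required columns, deduplicated; a column's
    // position to read is its index into this dense vector.
    positions.reserve(required_positions.size());
    for (auto pos : required_positions)
        if (std::find(positions.begin(), positions.end(), pos) == positions.end())
            positions.push_back(pos);
    return positions;
}
```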
8 changes: 6 additions & 2 deletions src/Storages/Streaming/SourceColumnsDescription.h
@@ -21,8 +21,12 @@ using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
struct SourceColumnsDescription
{
SourceColumnsDescription() = default;
SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot);
SourceColumnsDescription(const NamesAndTypesList & columns_to_read, const Block & schema, const NamesAndTypesList & all_extended_columns);
SourceColumnsDescription(const Names & required_column_names, StorageSnapshotPtr storage_snapshot, bool enable_partial_read = true);
SourceColumnsDescription(
const NamesAndTypesList & columns_to_read,
const Block & schema,
const NamesAndTypesList & all_extended_columns,
bool enable_partial_read = true);

enum class ReadColumnType : uint8_t
{
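The likely reason the flag defaults to true but can be turned off: a shared, multiplexed source is consumed by queries with different column sets, so it has to materialize the full schema, while a dedicated source can read only what its single query needs. A hedged usage sketch of the two constructors declared above; these call sites are illustrative, not taken from this diff:

```cpp
// Dedicated source: partial read is fine, one query owns the source.
SourceColumnsDescription dedicated_columns(required_column_names, storage_snapshot);

// Shared / fan-out source: read the full schema so any query attaching to the
// multiplexed channel can find its columns (assumed call site).
SourceColumnsDescription shared_columns(
    required_column_names, storage_snapshot, /*enable_partial_read=*/ false);
```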
43 changes: 27 additions & 16 deletions src/Storages/Streaming/StorageStream.cpp
@@ -486,15 +486,19 @@ void StorageStream::readConcat(
for (auto & stream_shard : shards_to_read)
{
auto create_streaming_source = [this, header, storage_snapshot, stream_shard, seek_to_info = query_info.seek_to_info, context_](
Int64 & max_sn_in_parts) {
Int64 & max_sn_in_parts) -> SourcePtr {
if (max_sn_in_parts < 0)
{
/// Fall back to seeking the streaming store
auto offsets = stream_shard->getOffsets(seek_to_info);
LOG_INFO(log, "Fused read fallbacks to seek stream for shard={} since there are no historical data", stream_shard->shard);

return std::make_shared<StreamingStoreSource>(
stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
if (context_->getSettingsRef().query_resource_group.value == "shared")
return source_multiplexers->createChannel(
stream_shard, header.getNames(), storage_snapshot, context_, offsets[stream_shard->shard]);
else
return std::make_shared<StreamingStoreSource>(
stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
}

auto committed = stream_shard->storage->inMemoryCommittedSN();
@@ -526,7 +530,12 @@
max_sn_in_parts,
committed);

return std::make_shared<StreamingStoreSource>(stream_shard, header, storage_snapshot, context_, max_sn_in_parts + 1, log);
if (context_->getSettingsRef().query_resource_group.value == "shared")
return source_multiplexers->createChannel(
stream_shard, header.getNames(), storage_snapshot, context_, max_sn_in_parts + 1);
else
return std::make_shared<StreamingStoreSource>(
stream_shard, header, storage_snapshot, context_, max_sn_in_parts + 1, log);
}
else
{
@@ -542,8 +551,12 @@

/// We need to reset max_sn_in_parts to tell the caller that we are seeking the streaming store directly
max_sn_in_parts = -1;
return std::make_shared<StreamingStoreSource>(
stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
if (context_->getSettingsRef().query_resource_group.value == "shared")
return source_multiplexers->createChannel(
stream_shard, header.getNames(), storage_snapshot, context_, offsets[stream_shard->shard]);
else
return std::make_shared<StreamingStoreSource>(
stream_shard, header, storage_snapshot, context_, offsets[stream_shard->shard], log);
}
};

@@ -601,22 +614,18 @@ void StorageStream::readStreaming(

assert(query_info.seek_to_info);
const auto & settings_ref = context_->getSettingsRef();
/// 1) Checkpointed queries shall not be multiplexed
/// 2) Queries which seek to a specific timestamp shall not be multiplexed
auto share_resource_group = (settings_ref.query_resource_group.value == "shared")
&& (query_info.seek_to_info->getSeekTo().empty() || query_info.seek_to_info->getSeekTo() == "latest")
&& (settings_ref.exec_mode == ExecuteMode::NORMAL);

if (share_resource_group)
if (settings_ref.query_resource_group.value == "shared")
{
auto offsets = stream_shards.back()->getOffsets(query_info.seek_to_info);
for (auto stream_shard : shards_to_read)
{
const auto & offset = offsets[stream_shard->shard];
if (!column_names.empty())
pipes.emplace_back(
stream_shard->source_multiplexers->createChannel(stream_shard->shard, column_names, storage_snapshot, context_));
source_multiplexers->createChannel(std::move(stream_shard), column_names, storage_snapshot, context_, offset));
else
pipes.emplace_back(stream_shard->source_multiplexers->createChannel(
stream_shard->shard, {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_));
pipes.emplace_back(source_multiplexers->createChannel(
std::move(stream_shard), {ProtonConsts::RESERVED_EVENT_TIME}, storage_snapshot, context_, offset));
}

LOG_INFO(log, "Starting reading {} streams in shared resource group", pipes.size());
@@ -945,6 +954,8 @@ void StorageStream::startup()
assert(native_log->enabled());
}

source_multiplexers.reset(new StreamingStoreSourceMultiplexers(getContext(), log));

log_initialized.test_and_set();

LOG_INFO(log, "Started");
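Note on the StorageStream.cpp hunks: every seek site in readConcat() and readStreaming() now makes the same decision: if query_resource_group is "shared", attach to a multiplexed channel via source_multiplexers->createChannel(...) with an explicit per-query offset; otherwise build a dedicated StreamingStoreSource. The old guards that kept checkpointed queries and timestamp seeks out of the shared path are gone, since channels now accept per-query offsets. A self-contained model of the fan-out idea; all names are illustrative, not the real API:

```cpp
#include <map>
#include <memory>
#include <string>

// Toy model: queries in the "shared" resource group attach to one multiplexed
// channel per shard; everyone else gets a private source.
struct Source { virtual ~Source() = default; };
struct DedicatedSource : Source { };
struct SharedChannel : Source { int readers = 0; };

class Multiplexers
{
public:
    std::shared_ptr<SharedChannel> createChannel(int shard)
    {
        auto & channel = channels[shard];
        if (!channel)
            channel = std::make_shared<SharedChannel>(); // first reader creates it
        ++channel->readers;                              // later readers fan out
        return channel;
    }

private:
    std::map<int, std::shared_ptr<SharedChannel>> channels;
};

std::shared_ptr<Source> createSource(const std::string & resource_group, int shard, Multiplexers & muxers)
{
    if (resource_group == "shared")
        return muxers.createChannel(shard);          // fan-out: one upstream read, many queries
    return std::make_shared<DedicatedSource>();      // dedicated: the query owns its source
}
```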
7 changes: 5 additions & 2 deletions src/Storages/Streaming/StorageStream.h
@@ -10,6 +10,7 @@

#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/Streaming/StreamingStoreSourceMultiplexer.h>

namespace nlog
{
@@ -298,8 +299,7 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg
UInt64 base_block_id,
UInt64 sub_block_id);

void
appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);
void appendToNativeLog(nlog::RecordPtr & record, IngestMode /*ingest_mode*/, klog::AppendCallback callback, klog::CallbackData data);

void appendToKafka(
nlog::RecordPtr & record,
@@ -354,5 +354,8 @@ class StorageStream final : public shared_ptr_helper<StorageStream>, public Merg

std::atomic_flag inited;
std::atomic_flag stopped;

/// Multiplex latest records of each shard.
std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;
};
}
2 changes: 0 additions & 2 deletions src/Storages/Streaming/StreamShard.cpp
@@ -124,8 +124,6 @@ StreamShard::~StreamShard()

void StreamShard::startup()
{
source_multiplexers.reset(new StreamingStoreSourceMultiplexers(shared_from_this(), storage_stream->getContext(), log));

initLog();

/// for virtual tables or in-memory storage type, there is no storage object
2 changes: 0 additions & 2 deletions src/Storages/Streaming/StreamShard.h
@@ -148,8 +148,6 @@ class StreamShard final : public std::enable_shared_from_this<StreamShard>

std::unique_ptr<StreamCallbackData> callback_data;

std::unique_ptr<StreamingStoreSourceMultiplexers> source_multiplexers;

// For random shard index generation
mutable std::mutex rng_mutex;
pcg64 rng;
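The last two files move ownership of the multiplexers: StreamShard no longer builds StreamingStoreSourceMultiplexers in its startup(), and StorageStream now holds a single set for the whole stream, created once in StorageStream::startup(). A simplified sketch of the ownership change, reusing the toy Multiplexers type from the sketch above:

```cpp
#include <memory>
#include <vector>

struct Multiplexers { /* shared per-shard channels */ };
struct StreamShard { /* no multiplexer member any more */ };

struct StorageStream
{
    std::vector<std::shared_ptr<StreamShard>> stream_shards;

    // One multiplexer set per storage, shared by all shards, so channels can
    // be handed out storage-wide; initialized once at startup.
    std::unique_ptr<Multiplexers> source_multiplexers;

    void startup() { source_multiplexers = std::make_unique<Multiplexers>(); }
};
```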