[NO NEED REVIEW right now] Feature/continue relax too many parts #516

Draft: wants to merge 11 commits into base: develop
14 changes: 11 additions & 3 deletions docs/en/operations/settings/merge-tree-settings.md
@@ -41,11 +41,11 @@ Possible values:

- Any positive integer.

Default value: 300.
Default value: 3000.

To achieve maximum performance of `SELECT` queries, it is necessary to minimize the number of parts processed, see [Merge Tree](../../development/architecture.md#merge-tree).

You can set a larger value to 600 (1200), this will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also in case of a merge issue (for example, due to insufficient disk space) you will notice it later than it could be with the original 300.
Prior to 23.6 this setting was set to 300. You can set a different (higher) value; it will reduce the probability of the `Too many parts` error, but at the same time `SELECT` performance might degrade. Also, in case of a merge issue (for example, due to insufficient disk space), you will notice it later than you would with the original 300.


## parts_to_delay_insert {#parts-to-delay-insert}
@@ -97,8 +97,16 @@ max_k = parts_to_throw_insert - parts_to_delay_insert
k = 1 + parts_count_in_partition - parts_to_delay_insert
delay_milliseconds = pow(max_delay_to_insert * 1000, k / max_k)
```
For example, if a partition has 299 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, `INSERT` is delayed for `pow( 1 * 1000, (1 + 299 - 150) / (300 - 150) ) = 1000` milliseconds.

For example, if a partition has 299 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, `INSERT` is delayed for `pow( 1 * 1000, (1 + 299 - 150) / (300 - 150) ) = 1000` milliseconds.
Starting from version 23.1, the formula has been changed to:
```code
allowed_parts_over_threshold = parts_to_throw_insert - parts_to_delay_insert
parts_over_threshold = parts_count_in_partition - parts_to_delay_insert + 1
delay_milliseconds = max(min_delay_to_insert_ms, (max_delay_to_insert * 1000) * parts_over_threshold / allowed_parts_over_threshold)
```

For example, if a partition has 224 active parts and parts_to_throw_insert = 300, parts_to_delay_insert = 150, max_delay_to_insert = 1, min_delay_to_insert_ms = 10, `INSERT` is delayed for `max( 10, 1 * 1000 * (224 - 150 + 1) / (300 - 150) ) = 500` milliseconds.
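
As a quick cross-check of the arithmetic above, here is a minimal standalone sketch of the post-23.1 delay computation (an illustrative re-implementation for this document, not code from the PR; the function name `delayMilliseconds` and the `main` harness are invented for the example):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

/// Illustrative re-implementation of the post-23.1 delay formula.
uint64_t delayMilliseconds(
    uint64_t parts_count_in_partition,
    uint64_t parts_to_delay_insert,
    uint64_t parts_to_throw_insert,
    uint64_t max_delay_to_insert,    /// seconds
    uint64_t min_delay_to_insert_ms)
{
    uint64_t allowed_parts_over_threshold = parts_to_throw_insert - parts_to_delay_insert;
    uint64_t parts_over_threshold = parts_count_in_partition - parts_to_delay_insert + 1;
    return std::max(
        min_delay_to_insert_ms,
        max_delay_to_insert * 1000 * parts_over_threshold / allowed_parts_over_threshold);
}

int main()
{
    /// The worked example from the text: 224 active parts -> a 500 ms delay.
    assert(delayMilliseconds(224, 150, 300, 1, 10) == 500);
    return 0;
}
```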

## max_parts_in_total {#max-parts-in-total}

3 changes: 2 additions & 1 deletion src/Core/Settings.h
@@ -271,7 +271,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_replica_delay_for_distributed_queries, 300, "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified value (not inclusive). Zero means do not take delay into account.", 0) \
M(Bool, fallback_to_stale_replicas_for_distributed_queries, true, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is enabled, the query will be performed anyway, otherwise the error will be reported.", 0) \
M(UInt64, preferred_max_column_in_block_size_bytes, 0, "Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size.", 0) \
\
M(UInt64, parts_to_delay_insert, 0, "If the destination table contains at least that many active parts in a single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_throw_insert, 0, "If more than this number active parts in a single partition of the destination table, throw 'Too many parts ...' exception.", 0) \
M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \
M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \
2 changes: 1 addition & 1 deletion src/Interpreters/AsynchronousMetrics.cpp
@@ -1429,7 +1429,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti

const auto & settings = getContext()->getSettingsRef();

calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountForPartition());
calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountAndSizeForPartition().first);
total_number_of_bytes += table_merge_tree->totalBytes(settings).value();
total_number_of_rows += table_merge_tree->totalRows(settings).value();
total_number_of_parts += table_merge_tree->getPartsCount();
2 changes: 0 additions & 2 deletions src/Loggers/OwnPatternFormatter.cpp
@@ -4,8 +4,6 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/HashTable/Hash.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/CurrentThread.h>
#include <base/terminalColors.h>


142 changes: 97 additions & 45 deletions src/Storages/MergeTree/MergeTreeData.cpp
@@ -3438,42 +3438,49 @@ size_t MergeTreeData::getPartsCount() const
}


size_t MergeTreeData::getMaxPartsCountForPartitionWithState(DataPartState state) const
std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const
{
auto lock = lockParts();

size_t res = 0;
size_t cur_count = 0;
size_t cur_parts_count = 0;
size_t cur_parts_size = 0;
size_t max_parts_count = 0;
size_t argmax_parts_size = 0;

const String * cur_partition_id = nullptr;

for (const auto & part : getDataPartsStateRange(state))
{
if (cur_partition_id && part->info.partition_id == *cur_partition_id)
{
++cur_count;
}
else
if (!cur_partition_id || part->info.partition_id != *cur_partition_id)
{
cur_partition_id = &part->info.partition_id;
cur_count = 1;
cur_parts_count = 0;
cur_parts_size = 0;
}

res = std::max(res, cur_count);
++cur_parts_count;
cur_parts_size += part->getBytesOnDisk();

if (cur_parts_count > max_parts_count)
{
max_parts_count = cur_parts_count;
argmax_parts_size = cur_parts_size;
}
}

return res;
return {max_parts_count, argmax_parts_size};
}


size_t MergeTreeData::getMaxPartsCountForPartition() const
std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartition() const
{
return getMaxPartsCountForPartitionWithState(DataPartState::Active);
return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Active);
}


size_t MergeTreeData::getMaxInactivePartsCountForPartition() const
size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const
{
return getMaxPartsCountForPartitionWithState(DataPartState::Outdated);
return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first;
}


@@ -3492,72 +3499,117 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
}


void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const
{
const auto settings = getSettings();
const auto & query_settings = query_context->getSettingsRef();
const size_t parts_count_in_total = getPartsCount();
if (parts_count_in_total >= settings->max_parts_in_total)

/// Check if we have too many parts in total
if (allow_throw && parts_count_in_total >= settings->max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-stream setting.", ErrorCodes::TOO_MANY_PARTS);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified "
"with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.",
toString(parts_count_in_total));
}

size_t parts_count_in_partition = getMaxPartsCountForPartition();
ssize_t k_inactive = -1;
if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
size_t outdated_parts_over_threshold = 0;
{
size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition();
if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
size_t outdated_parts_count_in_partition = 0;
if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();

if (allow_throw && settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts",
inactive_parts_count_in_partition);
outdated_parts_count_in_partition);
}
k_inactive = ssize_t(inactive_parts_count_in_partition) - ssize_t(settings->inactive_parts_to_delay_insert);
if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert)
outdated_parts_over_threshold = outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1;
}

if (parts_count_in_partition >= settings->parts_to_throw_insert)
auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0;
const auto active_parts_to_delay_insert
= query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
const auto active_parts_to_throw_insert
= query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
size_t active_parts_over_threshold = 0;
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({}). Merges are processing significantly slower than inserts",
parts_count_in_partition);
bool parts_are_large_enough_in_average
= settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;

if (allow_throw && parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
parts_count_in_partition,
ReadableSize(average_part_size));
}
if (active_parts_to_delay_insert > 0 && parts_count_in_partition >= active_parts_to_delay_insert
&& !parts_are_large_enough_in_average)
/// if parts_count == parts_to_delay_insert -> we're 1 part over threshold
active_parts_over_threshold = parts_count_in_partition - active_parts_to_delay_insert + 1;
}

if (k_inactive < 0 && parts_count_in_partition < settings->parts_to_delay_insert)
/// no need for delay
if (!active_parts_over_threshold && !outdated_parts_over_threshold)
return;

const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(settings->parts_to_delay_insert);
size_t max_k;
size_t k;
if (k_active > k_inactive)
UInt64 delay_milliseconds = 0;
{
max_k = settings->parts_to_throw_insert - settings->parts_to_delay_insert;
k = k_active + 1;
}
else
{
max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
k = k_inactive + 1;
size_t parts_over_threshold = 0;
size_t allowed_parts_over_threshold = 1;
const bool use_active_parts_threshold = (active_parts_over_threshold >= outdated_parts_over_threshold);
if (use_active_parts_threshold)
{
parts_over_threshold = active_parts_over_threshold;
allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert;
}
else
{
parts_over_threshold = outdated_parts_over_threshold;
allowed_parts_over_threshold = outdated_parts_over_threshold; /// if throw threshold is not set, will use max delay
if (settings->inactive_parts_to_throw_insert > 0)
allowed_parts_over_threshold = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
}

const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold)
{
delay_milliseconds = max_delay_milliseconds;
}
else
{
double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms;
delay_milliseconds = std::max(min_delay_milliseconds, static_cast<UInt64>(max_delay_milliseconds * delay_factor));
}
}
const UInt64 delay_milliseconds = static_cast<UInt64>(::pow(settings->max_delay_to_insert * 1000, static_cast<double>(k) / max_k));

ProfileEvents::increment(ProfileEvents::DelayedInserts);
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);

CurrentMetrics::Increment metric_increment(CurrentMetrics::DelayedInserts);

LOG_INFO(log, "Delaying inserting block by {} ms. because there are {} parts", delay_milliseconds, parts_count_in_partition);
LOG_INFO(log, "Delaying inserting block by {} ms. because there are {} parts and their average size is {}",
delay_milliseconds, parts_count_in_partition, ReadableSize(average_part_size));

if (until)
until->tryWait(delay_milliseconds);
else
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<size_t>(delay_milliseconds)));
}


MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/) const
{
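
Putting the pieces of the rewritten `delayInsertOrThrowIfNeeded` together: whichever of the two over-threshold counters (active vs. outdated) is larger drives the delay, which then scales linearly between `min_delay_to_insert_ms` and `max_delay_to_insert`. A condensed sketch of just that selection logic (a simplification for illustration: locking, exceptions, profile events, and the average-part-size bypass are omitted, and `pickDelayMs` is an invented name):

```cpp
#include <algorithm>
#include <cstdint>

/// The larger of the two "parts over threshold" counters wins; the delay
/// grows linearly with it, saturating at max_delay_sec (default 1 second).
uint64_t pickDelayMs(
    uint64_t active_over, uint64_t active_allowed,
    uint64_t outdated_over, uint64_t outdated_allowed,
    uint64_t max_delay_sec, uint64_t min_delay_ms)
{
    if (active_over == 0 && outdated_over == 0)
        return 0; /// no need for delay

    const uint64_t over = std::max(active_over, outdated_over);
    const uint64_t allowed = (active_over >= outdated_over) ? active_allowed : outdated_allowed;

    const uint64_t max_delay_ms = max_delay_sec > 0 ? max_delay_sec * 1000 : 1000;
    if (allowed == 0 || over > allowed)
        return max_delay_ms; /// saturate at the maximum delay

    const double factor = static_cast<double>(over) / allowed;
    return std::max(min_delay_ms, static_cast<uint64_t>(max_delay_ms * factor));
}
```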
12 changes: 8 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.h
@@ -506,9 +506,13 @@ class MergeTreeData : public IStorage, public WithMutableContext
size_t getTotalActiveSizeInRows() const;

size_t getPartsCount() const;
size_t getMaxPartsCountForPartitionWithState(DataPartState state) const;
size_t getMaxPartsCountForPartition() const;
size_t getMaxInactivePartsCountForPartition() const;

/// Returns a pair: the maximum number of parts in a single partition across all partitions, and the sum size of parts inside that partition.
/// (If there are multiple partitions with the maximum number of parts, the sum size of parts is returned for an arbitrary one of them.)
std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;

size_t getMaxOutdatedPartsCountForPartition() const;

/// Get min value of part->info.getDataVersion() for all active parts.
/// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
Expand All @@ -528,7 +532,7 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const;

/// Renames temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts.
6 changes: 4 additions & 2 deletions src/Storages/MergeTree/MergeTreeSettings.h
@@ -66,11 +66,13 @@ struct Settings;
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
M(UInt64, parts_to_throw_insert, 3000, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \
M(UInt64, max_avg_part_size_for_too_many_parts, 1ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed nor rejected. This makes it possible to have hundreds of terabytes in a single table on a single server if the parts are successfully merged into larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
\
/** Replication settings. */ \
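
Two behavioral details of the settings above are easy to miss: a per-query `parts_to_delay_insert` / `parts_to_throw_insert` value of 0 means "not set" and falls back to the table-level value, and the active-parts check is bypassed once parts in the relevant partition are large enough on average. A hedged sketch of both rules (illustrative helper names, not the PR's code):

```cpp
#include <cstdint>

/// Illustrative: a query-level value of 0 means "not set", so the
/// table-level MergeTree setting is used as the fallback.
uint64_t resolveThreshold(uint64_t query_value, uint64_t table_value)
{
    return query_value ? query_value : table_value;
}

/// Illustrative: the active-parts check applies only while the average part
/// size stays at or below max_avg_part_size_for_too_many_parts
/// (1 GiB by default; 0 disables the bypass).
bool tooManyPartsCheckActive(uint64_t avg_part_size_bytes, uint64_t max_avg_part_size)
{
    return max_avg_part_size == 0 || avg_part_size_bytes <= max_avg_part_size;
}
```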