Skip to content

Commit

Permalink
fix scanner (#13292)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 11, 2025
1 parent c5eecd5 commit 039db66
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
AFL_VERIFY(FinishedSources.emplace(frontSource).second);
while (FinishedSources.size() && (*FinishedSources.begin())->GetFinish() < SortedSources.front()->GetStart()) {
auto finishedSource = *FinishedSources.begin();
if (!finishedSource->GetResultRecordsCount() && Context->GetCommonContext()->GetReadMetadata()->HasLimit() &&
InFlightLimit < MaxInFlight) {
if (!finishedSource->GetResultRecordsCount() && InFlightLimit < MaxInFlight) {
InFlightLimit = 2 * InFlightLimit;
}
FetchedCount += finishedSource->GetResultRecordsCount();
Expand All @@ -75,6 +74,7 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("event", "limit_exhausted")(
"limit", Context->GetCommonContext()->GetReadMetadata()->GetLimitRobust())("fetched", FetchedCount);
SortedSources.clear();
break;
}
}
}
Expand Down Expand Up @@ -133,6 +133,8 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocal;
} else {
break;
}
}
}
Expand All @@ -148,6 +150,8 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
for (auto it = FetchingInFlightSources.begin(); it != FetchingInFlightSources.end(); ++it) {
if ((*it)->GetFinish() < SortedSources.front()->GetStart()) {
++inFlightCountLocalNew;
} else {
break;
}
}
AFL_VERIFY(inFlightCountLocal <= inFlightCountLocalNew);
Expand Down

0 comments on commit 039db66

Please # to comment.