Skip to content

Commit

Permalink
Pass remaining attempts count when estimating memory for next task at…
Browse files Browse the repository at this point in the history
…tempt
  • Loading branch information
losipiuk committed Aug 19, 2024
1 parent 6495054 commit e872728
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2461,7 +2461,8 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
MemoryRequirements newMemoryLimits = partitionMemoryEstimator.getNextRetryMemoryRequirements(
partition.getMemoryRequirements(),
taskStatus.getPeakMemoryReservation(),
errorCode);
errorCode,
partition.getRemainingAttempts());
partition.setPostFailureMemoryRequirements(newMemoryLimits);

if (errorCode != null && isOutOfMemoryError(errorCode)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public MemoryRequirements getInitialMemoryRequirements()
}

@Override
public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode)
public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode, int remainingAttempts)
{
DataSize previousMemory = previousMemoryRequirements.getRequiredMemory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public MemoryRequirements getInitialMemoryRequirements()
}

@Override
public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode)
public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode, int remainingAttempts)
{
return new MemoryRequirements(DataSize.ofBytes(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface PartitionMemoryEstimator
{
MemoryRequirements getInitialMemoryRequirements();

MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode);
MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode, int remainingAttempts);

void registerPartitionFinished(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional<ErrorCode> errorCode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,43 +103,49 @@ public void testEstimator()
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
StandardErrorCode.CORRUPT_PAGE.toErrorCode()))
StandardErrorCode.CORRUPT_PAGE.toErrorCode(),
5))
.isEqualTo(new MemoryRequirements(DataSize.of(50, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode()))
StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode(),
5))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode()))
StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode(),
5))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode()))
EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode(),
5))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

// peak memory of failed task 70MB
assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(70, MEGABYTE),
StandardErrorCode.CORRUPT_PAGE.toErrorCode()))
StandardErrorCode.CORRUPT_PAGE.toErrorCode(),
5))
.isEqualTo(new MemoryRequirements(DataSize.of(70, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(70, MEGABYTE),
EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode()))
EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode(),
5))
.isEqualTo(new MemoryRequirements(DataSize.of(210, MEGABYTE)));

// register a couple successful attempts; 90th percentile is at 300MB
Expand All @@ -164,7 +170,8 @@ public void testEstimator()
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(70, MEGABYTE),
EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode()))
EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode(),
5))
.isEqualTo(new MemoryRequirements(DataSize.of(300, MEGABYTE)));

// a couple oom errors are registered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,38 @@ public void testInformationSchemaScan()
assertThat(estimator.getNextRetryMemoryRequirements(
new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)),
DataSize.of(5, BYTE),
StandardErrorCode.NOT_SUPPORTED.toErrorCode()))
StandardErrorCode.NOT_SUPPORTED.toErrorCode(),
5))
.isEqualTo(noMemoryRequirements);
assertThat(estimator.getNextRetryMemoryRequirements(
new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)),
DataSize.of(5, BYTE),
StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES.toErrorCode()))
StandardErrorCode.NOT_SUPPORTED.toErrorCode(),
1))
.isEqualTo(noMemoryRequirements);
assertThat(estimator.getNextRetryMemoryRequirements(
new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)),
DataSize.of(5, BYTE),
StandardErrorCode.NOT_SUPPORTED.toErrorCode(),
0))
.isEqualTo(noMemoryRequirements);
assertThat(estimator.getNextRetryMemoryRequirements(
new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)),
DataSize.of(5, BYTE),
StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES.toErrorCode(),
5))
.isEqualTo(noMemoryRequirements);
assertThat(estimator.getNextRetryMemoryRequirements(
new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)),
DataSize.of(5, BYTE),
StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES.toErrorCode(),
1))
.isEqualTo(noMemoryRequirements);
assertThat(estimator.getNextRetryMemoryRequirements(
new PartitionMemoryEstimator.MemoryRequirements(DataSize.ofBytes(1)),
DataSize.of(5, BYTE),
StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES.toErrorCode(),
0))
.isEqualTo(noMemoryRequirements);
}

Expand Down Expand Up @@ -245,7 +271,7 @@ public MemoryRequirements getInitialMemoryRequirements()
}

@Override
public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode)
public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode, int remainingAttempts)
{
throw new RuntimeException("not implemented");
}
Expand Down

0 comments on commit e872728

Please # to comment.