Skip to content

Commit

Permalink
fix(s3stream): fix sanity check when stream is trimmed in the middle
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh committed Dec 16, 2023
1 parent 1310eda commit b29f254
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetada
}).filter(Objects::nonNull).forEach(request::addStreamObject);

request.setCompactedObjectIds(Collections.singletonList(objectToSplit.objectId()));
if (!sanityCheckCompactionResult(streamMetadataList, Collections.singletonList(objectToSplit), request)) {
if (isSanityCheckFailed(streamMetadataList, Collections.singletonList(objectToSplit), request)) {
logger.error("Sanity check failed, force split result is illegal");
return null;
}
Expand All @@ -445,6 +445,14 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta
objectsToCompact.size(), objectsToCompact.stream().mapToLong(S3ObjectMetadata::objectSize).sum());
Map<Long, List<StreamDataBlock>> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList,
objectsToCompact, s3Operator, logger);
for (List<StreamDataBlock> blocks : streamDataBlockMap.values()) {
for (StreamDataBlock block : blocks) {
if (block.getBlockSize() > compactionCacheSize) {
logger.error("Block {} size exceeds compaction cache size {}, skip compaction", block, compactionCacheSize);
return null;
}
}
}
long now = System.currentTimeMillis();
Set<Long> excludedObjectIds = new HashSet<>();
List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, excludedObjectIds);
Expand All @@ -457,16 +465,16 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta
request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds));
List<S3ObjectMetadata> compactedObjectMetadata = objectsToCompact.stream()
.filter(e -> compactedObjectIds.contains(e.objectId())).toList();
if (!sanityCheckCompactionResult(streamMetadataList, compactedObjectMetadata, request)) {
if (isSanityCheckFailed(streamMetadataList, compactedObjectMetadata, request)) {
logger.error("Sanity check failed, compaction result is illegal");
return null;
}

return request;
}

boolean sanityCheckCompactionResult(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> compactedObjects,
CommitStreamSetObjectRequest request) {
boolean isSanityCheckFailed(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> compactedObjects,
CommitStreamSetObjectRequest request) {
Map<Long, StreamMetadata> streamMetadataMap = streamMetadataList.stream()
.collect(Collectors.toMap(StreamMetadata::getStreamId, e -> e));
Map<Long, S3ObjectMetadata> objectMetadataMap = compactedObjects.stream()
Expand All @@ -481,14 +489,22 @@ boolean sanityCheckCompactionResult(List<StreamMetadata> streamMetadataList, Lis
for (long objectId : request.getCompactedObjectIds()) {
S3ObjectMetadata metadata = objectMetadataMap.get(objectId);
for (StreamOffsetRange streamOffsetRange : metadata.getOffsetRanges()) {
if (!streamMetadataMap.containsKey(streamOffsetRange.getStreamId()) ||
streamOffsetRange.getEndOffset() <= streamMetadataMap.get(streamOffsetRange.getStreamId()).getStartOffset()) {
if (!streamMetadataMap.containsKey(streamOffsetRange.getStreamId())) {
// skip non-exist stream
continue;
}
long streamStartOffset = streamMetadataMap.get(streamOffsetRange.getStreamId()).getStartOffset();
if (streamOffsetRange.getEndOffset() <= streamStartOffset) {
// skip stream offset range that has been trimmed
continue;
}
if (streamOffsetRange.getStartOffset() < streamStartOffset) {
// trim stream offset range
streamOffsetRange = new StreamOffsetRange(streamOffsetRange.getStreamId(), streamStartOffset, streamOffsetRange.getEndOffset());
}
if (!sortedStreamOffsetRanges.containsKey(streamOffsetRange.getStreamId())) {
logger.error("Sanity check failed, stream {} is missing after compact", streamOffsetRange.getStreamId());
return false;
return true;
}
boolean contained = false;
for (StreamOffsetRange compactedStreamOffsetRange : sortedStreamOffsetRanges.get(streamOffsetRange.getStreamId())) {
Expand All @@ -500,12 +516,12 @@ boolean sanityCheckCompactionResult(List<StreamMetadata> streamMetadataList, Lis
}
if (!contained) {
logger.error("Sanity check failed, object {} offset range {} is missing after compact", objectId, streamOffsetRange);
return false;
return true;
}
}
}

return true;
return false;
}

private List<StreamOffsetRange> sortAndMerge(List<StreamOffsetRange> streamOffsetRangeList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import com.automq.stream.s3.Config;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.ObjectWriter;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.TestUtils;
import com.automq.stream.s3.compact.operator.DataBlockReader;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metadata.StreamState;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.objects.StreamObject;
Expand Down Expand Up @@ -53,9 +55,11 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -222,6 +226,76 @@ public void testCompactWithDataTrimmed2() {
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactionWithDataTrimmed3() {
    // Build a fourth stream-set object whose STREAM_3 data blocks (1 KiB payloads)
    // exceed the configured compaction cache size; compaction must then bail out.
    objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(preparedObjectId -> {
        assertEquals(OBJECT_3, preparedObjectId);
        ObjectWriter writer = ObjectWriter.writer(OBJECT_3, s3Operator, 1024, 1024);
        StreamRecordBatch batch1 = new StreamRecordBatch(STREAM_1, 0, 500, 20, TestUtils.random(20));
        StreamRecordBatch batch2 = new StreamRecordBatch(STREAM_3, 0, 0, 10, TestUtils.random(1024));
        StreamRecordBatch batch3 = new StreamRecordBatch(STREAM_3, 0, 10, 10, TestUtils.random(1024));
        writer.write(STREAM_1, List.of(batch1));
        writer.write(STREAM_3, List.of(batch2, batch3));
        writer.close().join();
        List<StreamOffsetRange> offsetRanges = List.of(
                new StreamOffsetRange(STREAM_1, 500, 520),
                new StreamOffsetRange(STREAM_3, 0, 20)
        );
        S3ObjectMetadata metadata = new S3ObjectMetadata(OBJECT_3, S3ObjectType.STREAM_SET, offsetRanges, System.currentTimeMillis(),
                System.currentTimeMillis(), writer.size(), OBJECT_3);
        S3_WAL_OBJECT_METADATA_LIST.add(metadata);
        List.of(batch1, batch2, batch3).forEach(StreamRecordBatch::release);
    }).join();
    // STREAM_3 start offset is 10, i.e. the stream has been trimmed in the middle of OBJECT_3's range [0, 20).
    when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
            List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED),
                    new StreamMetadata(STREAM_1, 0, 25, 500, StreamState.OPENED),
                    new StreamMetadata(STREAM_2, 0, 30, 270, StreamState.OPENED),
                    new StreamMetadata(STREAM_3, 0, 10, 20, StreamState.OPENED))));
    compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
    List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
    // Oversized block forces buildCompactRequest to give up and return null.
    CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST);
    assertNull(request);
}

@Test
public void testCompactionWithDataTrimmed4() {
    // Same layout as testCompactionWithDataTrimmed3, but with small (200-byte) blocks that
    // fit in the compaction cache, so compaction should proceed even though STREAM_3 was
    // trimmed in the middle of OBJECT_3's offset range [0, 20) (start offset is now 10).
    objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> {
        assertEquals(OBJECT_3, objectId);
        ObjectWriter objectWriter = ObjectWriter.writer(OBJECT_3, s3Operator, 200, 1024);
        StreamRecordBatch r1 = new StreamRecordBatch(STREAM_1, 0, 500, 20, TestUtils.random(20));
        StreamRecordBatch r2 = new StreamRecordBatch(STREAM_3, 0, 0, 10, TestUtils.random(200));
        StreamRecordBatch r3 = new StreamRecordBatch(STREAM_3, 0, 10, 10, TestUtils.random(200));
        objectWriter.write(STREAM_1, List.of(r1));
        objectWriter.write(STREAM_3, List.of(r2, r3));
        objectWriter.close().join();
        List<StreamOffsetRange> streamsIndices = List.of(
                new StreamOffsetRange(STREAM_1, 500, 520),
                new StreamOffsetRange(STREAM_3, 0, 20)
        );
        S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_3, S3ObjectType.STREAM_SET, streamsIndices, System.currentTimeMillis(),
                System.currentTimeMillis(), objectWriter.size(), OBJECT_3);
        S3_WAL_OBJECT_METADATA_LIST.add(objectMetadata);
        List.of(r1, r2, r3).forEach(StreamRecordBatch::release);
    }).join();
    when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
            List.of(new StreamMetadata(STREAM_0, 0, 0, 20, StreamState.OPENED),
                    new StreamMetadata(STREAM_1, 0, 25, 500, StreamState.OPENED),
                    new StreamMetadata(STREAM_2, 0, 30, 270, StreamState.OPENED),
                    new StreamMetadata(STREAM_3, 0, 10, 20, StreamState.OPENED))));
    compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
    List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
    CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST);

    // buildCompactRequest returns null on failure paths (oversized block, failed sanity check);
    // fail fast with a clear message instead of an NPE on the dereference below.
    Assertions.assertNotNull(request);
    assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2, OBJECT_3), request.getCompactedObjectIds());
    assertEquals(OBJECT_0, request.getOrderId());
    assertTrue(request.getObjectId() > OBJECT_3);
    request.getStreamObjects().forEach(s -> assertTrue(s.getObjectId() > OBJECT_3));
    assertEquals(4, request.getStreamObjects().size());
    assertEquals(2, request.getStreamRanges().size());

    Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactWithNonExistStream() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
Expand Down

0 comments on commit b29f254

Please sign in to comment.