diff --git a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java index 3dcfeabdcb..b3423d3b6f 100644 --- a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java @@ -110,14 +110,14 @@ public void setUp() { 0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS)); S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); - NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of( + NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, DeltaMap.of( 1L, new S3StreamSetObject(1L, BROKER0, List.of( new StreamOffsetRange(STREAM1, 0L, 100L)), 1L), 2L, new S3StreamSetObject(2L, BROKER0, List.of( new StreamOffsetRange(STREAM2, 0L, 100L)), 2L))); - S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, walMetadataImage0)); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, walMetadataImage0)); image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); ranges = new HashMap<>(ranges); @@ -125,8 +125,8 @@ public void setUp() { streamObjects = new HashMap<>(streamObjects); streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); - streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); + streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); ranges = new HashMap<>(ranges); @@ -134,8 +134,8 @@ public void setUp() { streamObjects = new HashMap<>(streamObjects); streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS)); streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); - streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); + streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); } diff --git a/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java b/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java index c2181e064a..c41c76e1db 100644 --- a/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java +++ b/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java @@ -30,8 +30,12 @@ public class DeltaMap { private static final double MERGE_DELETE_THRESHOLD = 0.1; private final int[] deltaThresholds; final List> deltas; - Set deleted; - boolean newDeleted; + Set removed; + boolean newRemoved; + + public DeltaMap() { + this(new int[]{}); + } public DeltaMap(int[] deltaThresholds) { this.deltaThresholds = deltaThresholds; @@ -40,26 +44,59 @@ public DeltaMap(int[] deltaThresholds) { deltas.add(new HashMap<>(threshold)); } deltas.add(new HashMap<>()); - deleted = new HashSet<>(); + removed = new HashSet<>(); } - public DeltaMap(int[] deltaThresholds, List> deltas, Set deleted) { + public DeltaMap(int[] deltaThresholds, List> deltas, Set removed) { this.deltaThresholds = deltaThresholds; this.deltas = deltas; - this.deleted = deleted; + this.removed = removed; + } + + @SuppressWarnings("unchecked") + public static DeltaMap of(Object... kvList) { + if (kvList.length % 2 != 0) { + throw new IllegalArgumentException("kvList must be even length"); + } + DeltaMap map = new DeltaMap<>(new int[]{}); + for (int i = 0; i < kvList.length; i += 2) { + map.put((K) kvList[i], (V) kvList[i + 1]); + } + return map; + } + + public void put(K key, V value) { + Map delta0 = deltas.get(0); + delta0.put(key, value); + if (!removed.isEmpty()) { + Set deleted = getRemovedForModify(); + deleted.remove(key); + } } public void putAll(Map addDelta) { Map delta0 = deltas.get(0); delta0.putAll(addDelta); - if (!deleted.isEmpty()) { - Set deleted = getDeletedForModify(); + if (!removed.isEmpty()) { + Set deleted = getRemovedForModify(); addDelta.forEach((k, v) -> deleted.remove(k)); } } + public boolean containsKey(K key) { + if (removed.contains(key)) { + return false; + } + for (Map delta : deltas) { + if (delta.containsKey(key)) { + return true; + } + } + return false; + } + public V get(K key) { - if (deleted.contains(key)) { + if (removed.contains(key)) { return null; } for (Map delta : deltas) { @@ -71,26 +108,35 @@ public V get(K key) { return null; } - public void deleteAll(Collection newDeleted) { - getDeletedForModify().addAll(newDeleted); + public V getOrDefault(K key, V defaultValue) { + V value = get(key); + return value == null ? defaultValue : value; + } + + public void remove(K key) { + getRemovedForModify().add(key); + } + + public void removeAll(Collection newDeleted) { + getRemovedForModify().addAll(newDeleted); } - private Set getDeletedForModify() { - if (!newDeleted) { + private Set getRemovedForModify() { + if (!newRemoved) { int mapSize = deltas.stream().mapToInt(Map::size).sum(); - if (mapSize > 0 && 1.0 * deleted.size() / mapSize >= MERGE_DELETE_THRESHOLD) { + if (mapSize > 0 && 1.0 * removed.size() / mapSize >= MERGE_DELETE_THRESHOLD) { compact(); } - this.deleted = new HashSet<>(deleted); - newDeleted = true; + this.removed = new HashSet<>(removed); + newRemoved = true; } - return deleted; + return removed; } public boolean isEmpty() { for (Map delta : deltas) { int deltaSize = delta.size(); - for (K deletedKey : deleted) { + for (K deletedKey : removed) { if (delta.containsKey(deletedKey)) { deltaSize--; } else { @@ -105,7 +151,7 @@ public boolean isEmpty() { } public void forEach(BiConsumer consumer) { - Set done = new HashSet<>(deleted); + Set done = new HashSet<>(removed); for (int i = 0; i < deltas.size(); i++) { boolean lastDelta = i == deltas.size() - 1; deltas.get(i).forEach((k, v) -> { @@ -133,7 +179,7 @@ public DeltaMap copy() { deltas.set(i + 1, next); } } - return new DeltaMap<>(deltaThresholds, deltas, this.deleted); + return new DeltaMap<>(deltaThresholds, deltas, this.removed); } private Map compact() { @@ -141,12 +187,12 @@ private Map compact() { for (int i = deltas.size() - 1; i >= 0; i--) { all.putAll(deltas.get(i)); } - deleted.forEach(all::remove); + removed.forEach(all::remove); for (int i = 0; i < deltas.size() - 1; i++) { deltas.set(i, new HashMap<>(deltaThresholds[i])); } deltas.set(deltas.size() - 1, all); - deleted = new HashSet<>(); + removed = new HashSet<>(); return all; } diff --git a/metadata/src/main/java/org/apache/kafka/image/NodeS3StreamSetObjectMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/NodeS3StreamSetObjectMetadataImage.java index b45e4a9346..4a37683df0 100644 --- a/metadata/src/main/java/org/apache/kafka/image/NodeS3StreamSetObjectMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/NodeS3StreamSetObjectMetadataImage.java @@ -18,42 +18,30 @@ package org.apache.kafka.image; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.SortedMap; -import java.util.TreeMap; - import com.automq.stream.s3.metadata.S3StreamConstant; import org.apache.kafka.common.metadata.NodeWALMetadataRecord; -import org.apache.kafka.metadata.stream.S3StreamSetObject; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.metadata.stream.S3StreamSetObject; import org.apache.kafka.server.common.ApiMessageAndVersion; -public class NodeS3StreamSetObjectMetadataImage { +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +public class NodeS3StreamSetObjectMetadataImage { public static final NodeS3StreamSetObjectMetadataImage EMPTY = new NodeS3StreamSetObjectMetadataImage(S3StreamConstant.INVALID_BROKER_ID, - S3StreamConstant.INVALID_BROKER_EPOCH, Collections.emptyMap()); + S3StreamConstant.INVALID_BROKER_EPOCH, new DeltaMap<>(new int[]{100})); private final int nodeId; private final long nodeEpoch; - private final Map s3Objects; - private final SortedMap orderIndex; + private final DeltaMap s3Objects; + private List orderIndex; - public NodeS3StreamSetObjectMetadataImage(int nodeId, long nodeEpoch, Map streamSetObjects) { + public NodeS3StreamSetObjectMetadataImage(int nodeId, long nodeEpoch, DeltaMap streamSetObjects) { this.nodeId = nodeId; this.nodeEpoch = nodeEpoch; - this.s3Objects = new HashMap<>(streamSetObjects); - // build order index - if (s3Objects.isEmpty()) { - this.orderIndex = Collections.emptySortedMap(); - } else { - this.orderIndex = new TreeMap<>(); - s3Objects.values().forEach(obj -> orderIndex.put(obj.orderId(), obj)); - } + this.s3Objects = streamSetObjects; } @Override @@ -75,23 +63,23 @@ public int hashCode() { public void write(ImageWriter writer, ImageWriterOptions options) { writer.write(new ApiMessageAndVersion(new NodeWALMetadataRecord() - .setNodeId(nodeId) - .setNodeEpoch(nodeEpoch), (short) 0)); - s3Objects.values().forEach(wal -> { - writer.write(wal.toRecord()); - }); + .setNodeId(nodeId) + .setNodeEpoch(nodeEpoch), (short) 0)); + s3Objects.forEach((k, v) -> writer.write(v.toRecord())); } - public Map getObjects() { + public DeltaMap getObjects() { return s3Objects; } - public SortedMap getOrderIndex() { - return orderIndex; - } - public List orderList() { - return new ArrayList<>(orderIndex.values()); + if (orderIndex == null) { + List objects = new ArrayList<>(); + s3Objects.forEach((k, v) -> objects.add(v)); + objects.sort(Comparator.comparingLong(S3StreamSetObject::orderId)); + orderIndex = objects; + } + return orderIndex; } public int getNodeId() { @@ -105,9 +93,9 @@ public long getNodeEpoch() { @Override public String toString() { return "NodeS3WALMetadataImage{" + - "nodeId=" + nodeId + - ", nodeEpoch=" + nodeEpoch + - ", objects=" + s3Objects + - '}'; + "nodeId=" + nodeId + + ", nodeEpoch=" + nodeEpoch + + ", objects=" + s3Objects + + '}'; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java index bbddd70999..c7158c8c46 100644 --- a/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/NodeS3WALMetadataDelta.java @@ -59,12 +59,12 @@ public void replay(RemoveStreamSetObjectRecord record) { } public NodeS3StreamSetObjectMetadataImage apply() { - Map newS3StreamSetObjects = new HashMap<>(image.getObjects()); + DeltaMap streamSetObjects = image.getObjects().copy(); // add all changed stream set objects - newS3StreamSetObjects.putAll(addedS3StreamSetObjects); + streamSetObjects.putAll(addedS3StreamSetObjects); // remove all removed stream set objects - removedS3StreamSetObjects.forEach(newS3StreamSetObjects::remove); - return new NodeS3StreamSetObjectMetadataImage(this.nodeId, this.nodeEpoch, newS3StreamSetObjects); + streamSetObjects.removeAll(removedS3StreamSetObjects); + return new NodeS3StreamSetObjectMetadataImage(this.nodeId, this.nodeEpoch, streamSetObjects); } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java index e0c9a639d8..034ab8106d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java @@ -78,7 +78,7 @@ public S3ObjectsImage apply() { // put all new changed objects newObjectsMetadata.putAll(changedObjects); // remove all removed objects - newObjectsMetadata.deleteAll(removedObjectIds); + newObjectsMetadata.removeAll(removedObjectIds); return new S3ObjectsImage(currentAssignedObjectId, newObjectsMetadata); } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java index 93ceeb82da..7f85bfce99 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java @@ -62,9 +62,7 @@ public void replay(AssignedStreamIdRecord record) { public void replay(S3StreamRecord record) { getOrCreateStreamMetadataDelta(record.streamId()).replay(record); - if (deletedStreams.contains(record.streamId())) { - deletedStreams.remove(record.streamId()); - } + deletedStreams.remove(record.streamId()); } public void replay(RemoveS3StreamRecord record) { @@ -75,9 +73,7 @@ public void replay(RemoveS3StreamRecord record) { public void replay(NodeWALMetadataRecord record) { getOrCreateNodeStreamMetadataDelta(record.nodeId()).replay(record); - if (deletedNodes.contains(record.nodeId())) { - deletedNodes.remove(record.nodeId()); - } + deletedNodes.remove(record.nodeId()); } public void replay(RemoveNodeWALMetadataRecord record) { @@ -139,26 +135,16 @@ private NodeS3WALMetadataDelta getOrCreateNodeStreamMetadataDelta(Integer nodeId } S3StreamsMetadataImage apply() { - Map newStreams = new HashMap<>(image.streamsMetadata()); - Map newNodeStreams = new HashMap<>(image.nodeWALMetadata()); - + DeltaMap streams = image.streamsMetadata().copy(); // apply the delta changes of old streams since the last image - this.changedStreams.forEach((streamId, delta) -> { - S3StreamMetadataImage newS3StreamMetadataImage = delta.apply(); - newStreams.put(streamId, newS3StreamMetadataImage); - }); - // remove the deleted streams - deletedStreams.forEach(newStreams::remove); - + changedStreams.forEach((streamId, delta) -> streams.put(streamId, delta.apply())); + streams.removeAll(deletedStreams); + DeltaMap nodes = image.nodeWALMetadata().copy(); // apply the delta changes of old nodes since the last image - this.changedNodes.forEach((nodeId, delta) -> { - NodeS3StreamSetObjectMetadataImage newNodeS3StreamSetObjectMetadataImage = delta.apply(); - newNodeStreams.put(nodeId, newNodeS3StreamSetObjectMetadataImage); - }); + this.changedNodes.forEach((nodeId, delta) -> nodes.put(nodeId, delta.apply())); // remove the deleted nodes - deletedNodes.forEach(newNodeStreams::remove); - - return new S3StreamsMetadataImage(currentAssignedStreamId, newStreams, newNodeStreams); + nodes.removeAll(deletedNodes); + return new S3StreamsMetadataImage(currentAssignedStreamId, streams, nodes); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 196537b31c..c70e32c2de 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -43,18 +43,18 @@ public final class S3StreamsMetadataImage { public static final S3StreamsMetadataImage EMPTY = - new S3StreamsMetadataImage(-1, Collections.emptyMap(), Collections.emptyMap()); + new S3StreamsMetadataImage(-1, new DeltaMap<>(new int[]{1000, 10000}), new DeltaMap<>(new int[]{1000, 10000})); - private long nextAssignedStreamId; + private final long nextAssignedStreamId; - private final Map streamsMetadata; + private final DeltaMap streamsMetadata; - private final Map nodeStreamSetObjectMetadata; + private final DeltaMap nodeStreamSetObjectMetadata; public S3StreamsMetadataImage( long assignedStreamId, - Map streamsMetadata, - Map nodeStreamSetObjectMetadata) { + DeltaMap streamsMetadata, + DeltaMap nodeStreamSetObjectMetadata) { this.nextAssignedStreamId = assignedStreamId + 1; this.streamsMetadata = streamsMetadata; this.nodeStreamSetObjectMetadata = nodeStreamSetObjectMetadata; @@ -69,8 +69,8 @@ public void write(ImageWriter writer, ImageWriterOptions options) { writer.write( new ApiMessageAndVersion( new AssignedStreamIdRecord().setAssignedStreamId(nextAssignedStreamId - 1), (short) 0)); - streamsMetadata.values().forEach(image -> image.write(writer, options)); - nodeStreamSetObjectMetadata.values().forEach(image -> image.write(writer, options)); + streamsMetadata.forEach((k, v) -> v.write(writer, options)); + nodeStreamSetObjectMetadata.forEach((k, v) -> v.write(writer, options)); } public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { @@ -315,11 +315,11 @@ public int hashCode() { return Objects.hash(nextAssignedStreamId, streamsMetadata, nodeStreamSetObjectMetadata); } - public Map nodeWALMetadata() { + public DeltaMap nodeWALMetadata() { return nodeStreamSetObjectMetadata; } - public Map streamsMetadata() { + public DeltaMap streamsMetadata() { return streamsMetadata; } @@ -338,13 +338,7 @@ public long nextAssignedStreamId() { @Override public String toString() { - return "S3StreamsMetadataImage{" + - "nextAssignedStreamId=" + nextAssignedStreamId + - ", streamsMetadata=" + streamsMetadata.entrySet().stream(). - map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + - ", nodeWALMetadata=" + nodeStreamSetObjectMetadata.entrySet().stream(). - map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + - '}'; + return "S3StreamsMetadataImage{nextAssignedStreamId=" + nextAssignedStreamId + '}'; } static class StreamOffsetRanges extends AbstractOrderedCollection { diff --git a/metadata/src/test/java/org/apache/kafka/image/DeltaMapTest.java b/metadata/src/test/java/org/apache/kafka/image/DeltaMapTest.java index 0efe3c64c2..74e3420f6f 100644 --- a/metadata/src/test/java/org/apache/kafka/image/DeltaMapTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/DeltaMapTest.java @@ -31,7 +31,7 @@ public class DeltaMapTest { public void testCopy() { DeltaMap map = new DeltaMap<>(new int[]{2}); map.putAll(Map.of(1, 1, 2, 2, 3, 3, 5, 5)); - map.deleteAll(List.of(3)); + map.removeAll(List.of(3)); DeltaMap copy1 = map.copy(); copy1.putAll(Map.of(1, 3)); @@ -49,23 +49,23 @@ public void testCopy() { public void testDelete() { DeltaMap map = new DeltaMap<>(new int[]{10}); map.putAll(Map.of(1, 1, 2, 2, 3, 3)); - map.deleteAll(List.of(3)); + map.removeAll(List.of(3)); map = map.copy(); // trigger compact delete map.putAll(Map.of(4, 4)); assertNull(map.get(3)); - assertEquals(0, map.deleted.size()); + assertEquals(0, map.removed.size()); assertEquals(0, map.deltas.get(0).size()); assertEquals(3, map.deltas.get(1).size()); - map.deleteAll(List.of(4)); - assertEquals(1, map.deleted.size()); + map.removeAll(List.of(4)); + assertEquals(1, map.removed.size()); assertNull(map.get(4)); map.putAll(Map.of(4, 44)); assertEquals(44, map.get(4)); - assertEquals(0, map.deleted.size()); + assertEquals(0, map.removed.size()); } } diff --git a/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java index 8f4c1acc7d..d7d6d2df30 100644 --- a/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java @@ -17,13 +17,6 @@ package org.apache.kafka.image; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - import com.automq.stream.s3.metadata.S3StreamConstant; import com.automq.stream.s3.metadata.StreamOffsetRange; import org.apache.kafka.common.metadata.NodeWALMetadataRecord; @@ -38,6 +31,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + @Timeout(value = 40) @Tag("S3Unit") public class NodeMetadataImageTest { @@ -50,7 +48,7 @@ public class NodeMetadataImageTest { @Test public void testS3Objects() { - NodeS3StreamSetObjectMetadataImage image0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Collections.emptyMap()); + NodeS3StreamSetObjectMetadataImage image0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, new DeltaMap<>()); List delta0Records = new ArrayList<>(); NodeS3WALMetadataDelta delta0 = new NodeS3WALMetadataDelta(image0); // 1. create StreamSetObject0 and StreamSetObject1 @@ -82,7 +80,7 @@ public void testS3Objects() { RecordTestUtils.replayAll(delta0, delta0Records); // verify delta and check image's write NodeS3StreamSetObjectMetadataImage image1 = new NodeS3StreamSetObjectMetadataImage(BROKER0, 1, - Map.of( + DeltaMap.of( 0L, new S3StreamSetObject(0L, BROKER0, List.of( new StreamOffsetRange(STREAM0, 0L, 100L), new StreamOffsetRange(STREAM1, 0L, 200L)), 0L), @@ -109,7 +107,7 @@ public void testS3Objects() { RecordTestUtils.replayAll(delta1, delta1Records); // verify delta and check image's write NodeS3StreamSetObjectMetadataImage image2 = new NodeS3StreamSetObjectMetadataImage(BROKER0, 2, - Map.of( + DeltaMap.of( 0L, new S3StreamSetObject(0L, BROKER0, List.of( new StreamOffsetRange(STREAM1, 0L, 200L)), 0L), 1L, new S3StreamSetObject(1L, BROKER0, List.of( @@ -125,7 +123,7 @@ public void testS3Objects() { RecordTestUtils.replayAll(delta2, delta2Records); // verify delta and check image's write NodeS3StreamSetObjectMetadataImage image3 = new NodeS3StreamSetObjectMetadataImage(BROKER0, 2, - Map.of( + DeltaMap.of( 0L, new S3StreamSetObject(0L, BROKER0, List.of( new StreamOffsetRange(STREAM1, 0L, 200L)), 0L))); assertEquals(image3, delta2.apply()); diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index e8131ec1ec..74e656d729 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -34,8 +34,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -77,7 +75,7 @@ public void testAssignedChange() { .setAssignedStreamId(0), (short) 0); S3StreamsMetadataDelta delta0 = new S3StreamsMetadataDelta(image0); RecordTestUtils.replayAll(delta0, List.of(record0)); - S3StreamsMetadataImage image1 = new S3StreamsMetadataImage(0, Collections.emptyMap(), Collections.emptyMap()); + S3StreamsMetadataImage image1 = new S3StreamsMetadataImage(0, new DeltaMap<>(), new DeltaMap<>()); assertEquals(image1, delta0.apply()); testToImageAndBack(image1); @@ -85,7 +83,7 @@ public void testAssignedChange() { .setAssignedStreamId(10), (short) 0); S3StreamsMetadataDelta delta1 = new S3StreamsMetadataDelta(image1); RecordTestUtils.replayAll(delta1, List.of(record1)); - S3StreamsMetadataImage image2 = new S3StreamsMetadataImage(10, Collections.emptyMap(), Collections.emptyMap()); + S3StreamsMetadataImage image2 = new S3StreamsMetadataImage(10, new DeltaMap<>(), new DeltaMap<>()); assertEquals(image2, delta1.apply()); } @@ -101,21 +99,21 @@ private void testToImageAndBack(S3StreamsMetadataImage image) { @Test public void testGetObjects() { - Map broker0Objects = Map.of( + DeltaMap broker0Objects = DeltaMap.of( 0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 100L, 120L)), 0L), 1L, new S3StreamSetObject(1, BROKER0, List.of(new StreamOffsetRange(STREAM0, 120L, 140L)), 1L), 2L, new S3StreamSetObject(2, BROKER0, List.of(new StreamOffsetRange(STREAM0, 180L, 200L)), 2L), 3L, new S3StreamSetObject(3, BROKER0, List.of( new StreamOffsetRange(STREAM0, 400L, 420L)), 3L), 4L, new S3StreamSetObject(4, BROKER0, List.of(new StreamOffsetRange(STREAM0, 520L, 600L)), 4L)); - Map broker1Objects = Map.of( + DeltaMap broker1Objects = DeltaMap.of( 5L, new S3StreamSetObject(5, BROKER1, List.of(new StreamOffsetRange(STREAM0, 140L, 160L)), 0L), 6L, new S3StreamSetObject(6, BROKER1, List.of(new StreamOffsetRange(STREAM0, 160L, 180L)), 1L), 7L, new S3StreamSetObject(7, BROKER1, List.of(new StreamOffsetRange(STREAM0, 420L, 520L)), 2L)); NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, - new HashMap<>(broker0Objects)); + broker0Objects); NodeS3StreamSetObjectMetadataImage broker1WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH, - new HashMap<>(broker1Objects)); + broker1Objects); Map ranges = Map.of( 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0), 1, new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1), @@ -127,8 +125,8 @@ public void testGetObjects() { 9L, new S3StreamObject(9, STREAM0, 200L, 300L, S3StreamConstant.INVALID_TS), 10L, new S3StreamObject(10, STREAM0, 300L, 400L, S3StreamConstant.INVALID_TS)); S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 4, 10, ranges, streamObjects); - S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage)); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage)); // 1. search stream_1 InRangeObjects objects = streamsImage.getObjects(STREAM1, 10, 100, Integer.MAX_VALUE);