Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metadata): replace image map to delta map #629

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,32 +110,32 @@ public void setUp() {
0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects);

NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of(
NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, DeltaMap.of(
1L, new S3StreamSetObject(1L, BROKER0, List.of(
new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
2L, new S3StreamSetObject(2L, BROKER0, List.of(
new StreamOffsetRange(STREAM2, 0L, 100L)), 2L)));

S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, walMetadataImage0));
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, walMetadataImage0));
image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null);

ranges = new HashMap<>(ranges);
ranges.put(1, new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0));
streamObjects = new HashMap<>(streamObjects);
streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS));
streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects);
streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null);

ranges = new HashMap<>(ranges);
ranges.put(2, new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0));
streamObjects = new HashMap<>(streamObjects);
streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS));
streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects);
streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null);
}

Expand Down
88 changes: 67 additions & 21 deletions metadata/src/main/java/org/apache/kafka/image/DeltaMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ public class DeltaMap<K, V> {
private static final double MERGE_DELETE_THRESHOLD = 0.1;
private final int[] deltaThresholds;
final List<Map<K, V>> deltas;
Set<K> deleted;
boolean newDeleted;
Set<K> removed;
boolean newRemoved;

/**
 * Creates a map without any delta thresholds by delegating to
 * {@link #DeltaMap(int[])} with an empty threshold array.
 */
public DeltaMap() {
    this(new int[0]);
}

public DeltaMap(int[] deltaThresholds) {
this.deltaThresholds = deltaThresholds;
Expand All @@ -40,26 +44,59 @@ public DeltaMap(int[] deltaThresholds) {
deltas.add(new HashMap<>(threshold));
}
deltas.add(new HashMap<>());
deleted = new HashSet<>();
removed = new HashSet<>();
}

public DeltaMap(int[] deltaThresholds, List<Map<K, V>> deltas, Set<K> deleted) {
public DeltaMap(int[] deltaThresholds, List<Map<K, V>> deltas, Set<K> removed) {
this.deltaThresholds = deltaThresholds;
this.deltas = deltas;
this.deleted = deleted;
this.removed = removed;
}

/**
 * Builds a {@code DeltaMap} from a flat list of alternating keys and values
 * ({@code k0, v0, k1, v1, ...}), inserting the pairs in the order given.
 *
 * @param kvList alternating keys and values; its length must be even
 * @param <K>    key type the caller expects
 * @param <V>    value type the caller expects
 * @return a new map, created with an empty threshold array, holding every supplied pair
 * @throws IllegalArgumentException if {@code kvList} has an odd number of elements
 */
@SuppressWarnings("unchecked")
public static <K, V> DeltaMap<K, V> of(Object... kvList) {
    boolean oddLength = (kvList.length & 1) == 1;
    if (oddLength) {
        throw new IllegalArgumentException("kvList must be even length");
    }
    DeltaMap<K, V> result = new DeltaMap<>(new int[0]);
    int pairCount = kvList.length / 2;
    for (int pair = 0; pair < pairCount; pair++) {
        // The casts are unchecked: the caller is trusted to pass matching key/value types.
        K key = (K) kvList[2 * pair];
        V value = (V) kvList[2 * pair + 1];
        result.put(key, value);
    }
    return result;
}

/**
 * Associates {@code value} with {@code key}, reviving the key if it is currently marked as removed.
 *
 * <p>The removal marker is cleared <em>before</em> the entry is written. Obtaining the writable
 * removed set may trigger a compaction, which rebuilds the delta maps and drops every key that is
 * still marked as removed. If the entry were written first, such a compaction would discard the
 * freshly written entry and the put would be silently lost.
 *
 * @param key   the key to insert or overwrite
 * @param value the value to associate with {@code key}
 */
public void put(K key, V value) {
    if (!removed.isEmpty()) {
        // May compact and replace the delta maps, so do this before touching delta 0.
        getRemovedForModify().remove(key);
    }
    // Look up delta 0 only now: a compaction above would have swapped in a new map instance.
    deltas.get(0).put(key, value);
}

public void putAll(Map<K, V> addDelta) {
Map<K, V> delta0 = deltas.get(0);
delta0.putAll(addDelta);
if (!deleted.isEmpty()) {
Set<K> deleted = getDeletedForModify();
if (!removed.isEmpty()) {
Set<K> deleted = getRemovedForModify();
addDelta.forEach((k, v) -> deleted.remove(k));
}
}

/**
 * Reports whether {@code key} is currently present in this map.
 *
 * <p>A key counts as present when it is not in the removed set and at least one
 * delta map contains it.
 *
 * @param key the key to look up
 * @return {@code true} if the key is not marked removed and some delta holds it
 */
public boolean containsKey(K key) {
    return !removed.contains(key)
        && deltas.stream().anyMatch(layer -> layer.containsKey(key));
}

public V get(K key) {
if (deleted.contains(key)) {
if (removed.contains(key)) {
return null;
}
for (Map<K, V> delta : deltas) {
Expand All @@ -71,26 +108,35 @@ public V get(K key) {
return null;
}

public void deleteAll(Collection<K> newDeleted) {
getDeletedForModify().addAll(newDeleted);
/**
 * Looks up {@code key} via {@link #get(Object)} and falls back to {@code defaultValue}
 * whenever that lookup yields {@code null}.
 *
 * @param key          the key to look up
 * @param defaultValue the value to return when the lookup yields {@code null}
 * @return the value found for {@code key}, or {@code defaultValue} if none was found
 */
public V getOrDefault(K key, V defaultValue) {
    V found = get(key);
    if (found != null) {
        return found;
    }
    return defaultValue;
}

/**
 * Marks {@code key} as removed by adding it to the writable removed set.
 *
 * @param key the key to mark as removed
 */
public void remove(K key) {
    Set<K> writableRemoved = getRemovedForModify();
    writableRemoved.add(key);
}

/**
 * Marks every key in {@code newDeleted} as removed by adding them all to the
 * writable removed set.
 *
 * @param newDeleted the keys to mark as removed
 */
public void removeAll(Collection<K> newDeleted) {
    Set<K> writableRemoved = getRemovedForModify();
    writableRemoved.addAll(newDeleted);
}

private Set<K> getDeletedForModify() {
if (!newDeleted) {
private Set<K> getRemovedForModify() {
if (!newRemoved) {
int mapSize = deltas.stream().mapToInt(Map::size).sum();
if (mapSize > 0 && 1.0 * deleted.size() / mapSize >= MERGE_DELETE_THRESHOLD) {
if (mapSize > 0 && 1.0 * removed.size() / mapSize >= MERGE_DELETE_THRESHOLD) {
compact();
}
this.deleted = new HashSet<>(deleted);
newDeleted = true;
this.removed = new HashSet<>(removed);
newRemoved = true;
}
return deleted;
return removed;
}

public boolean isEmpty() {
for (Map<K, V> delta : deltas) {
int deltaSize = delta.size();
for (K deletedKey : deleted) {
for (K deletedKey : removed) {
if (delta.containsKey(deletedKey)) {
deltaSize--;
} else {
Expand All @@ -105,7 +151,7 @@ public boolean isEmpty() {
}

public void forEach(BiConsumer<K, V> consumer) {
Set<K> done = new HashSet<>(deleted);
Set<K> done = new HashSet<>(removed);
for (int i = 0; i < deltas.size(); i++) {
boolean lastDelta = i == deltas.size() - 1;
deltas.get(i).forEach((k, v) -> {
Expand Down Expand Up @@ -133,20 +179,20 @@ public DeltaMap<K, V> copy() {
deltas.set(i + 1, next);
}
}
return new DeltaMap<>(deltaThresholds, deltas, this.deleted);
return new DeltaMap<>(deltaThresholds, deltas, this.removed);
}

private Map<K, V> compact() {
Map<K, V> all = new HashMap<>(deltas.get(deltas.size() - 1).size());
for (int i = deltas.size() - 1; i >= 0; i--) {
all.putAll(deltas.get(i));
}
deleted.forEach(all::remove);
removed.forEach(all::remove);
for (int i = 0; i < deltas.size() - 1; i++) {
deltas.set(i, new HashMap<>(deltaThresholds[i]));
}
deltas.set(deltas.size() - 1, all);
deleted = new HashSet<>();
removed = new HashSet<>();
return all;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,30 @@

package org.apache.kafka.image;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;

import com.automq.stream.s3.metadata.S3StreamConstant;
import org.apache.kafka.common.metadata.NodeWALMetadataRecord;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class NodeS3StreamSetObjectMetadataImage {
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

public class NodeS3StreamSetObjectMetadataImage {
public static final NodeS3StreamSetObjectMetadataImage EMPTY = new NodeS3StreamSetObjectMetadataImage(S3StreamConstant.INVALID_BROKER_ID,
S3StreamConstant.INVALID_BROKER_EPOCH, Collections.emptyMap());
S3StreamConstant.INVALID_BROKER_EPOCH, new DeltaMap<>(new int[]{100}));
private final int nodeId;
private final long nodeEpoch;
private final Map<Long/*objectId*/, S3StreamSetObject> s3Objects;
private final SortedMap<Long/*orderId*/, S3StreamSetObject> orderIndex;
private final DeltaMap<Long/*objectId*/, S3StreamSetObject> s3Objects;
private List<S3StreamSetObject> orderIndex;

public NodeS3StreamSetObjectMetadataImage(int nodeId, long nodeEpoch, Map<Long, S3StreamSetObject> streamSetObjects) {
public NodeS3StreamSetObjectMetadataImage(int nodeId, long nodeEpoch, DeltaMap<Long, S3StreamSetObject> streamSetObjects) {
this.nodeId = nodeId;
this.nodeEpoch = nodeEpoch;
this.s3Objects = new HashMap<>(streamSetObjects);
// build order index
if (s3Objects.isEmpty()) {
this.orderIndex = Collections.emptySortedMap();
} else {
this.orderIndex = new TreeMap<>();
s3Objects.values().forEach(obj -> orderIndex.put(obj.orderId(), obj));
}
this.s3Objects = streamSetObjects;
}

@Override
Expand All @@ -75,23 +63,23 @@ public int hashCode() {

public void write(ImageWriter writer, ImageWriterOptions options) {
writer.write(new ApiMessageAndVersion(new NodeWALMetadataRecord()
.setNodeId(nodeId)
.setNodeEpoch(nodeEpoch), (short) 0));
s3Objects.values().forEach(wal -> {
writer.write(wal.toRecord());
});
.setNodeId(nodeId)
.setNodeEpoch(nodeEpoch), (short) 0));
s3Objects.forEach((k, v) -> writer.write(v.toRecord()));
}

public Map<Long, S3StreamSetObject> getObjects() {
public DeltaMap<Long, S3StreamSetObject> getObjects() {
return s3Objects;
}

public SortedMap<Long, S3StreamSetObject> getOrderIndex() {
return orderIndex;
}

public List<S3StreamSetObject> orderList() {
return new ArrayList<>(orderIndex.values());
if (orderIndex == null) {
List<S3StreamSetObject> objects = new ArrayList<>();
s3Objects.forEach((k, v) -> objects.add(v));
objects.sort(Comparator.comparingLong(S3StreamSetObject::orderId));
orderIndex = objects;
}
return orderIndex;
}

public int getNodeId() {
Expand All @@ -105,9 +93,9 @@ public long getNodeEpoch() {
@Override
public String toString() {
return "NodeS3WALMetadataImage{" +
"nodeId=" + nodeId +
", nodeEpoch=" + nodeEpoch +
", objects=" + s3Objects +
'}';
"nodeId=" + nodeId +
", nodeEpoch=" + nodeEpoch +
", objects=" + s3Objects +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public void replay(RemoveStreamSetObjectRecord record) {
}

public NodeS3StreamSetObjectMetadataImage apply() {
Map<Long, S3StreamSetObject> newS3StreamSetObjects = new HashMap<>(image.getObjects());
DeltaMap<Long, S3StreamSetObject> streamSetObjects = image.getObjects().copy();
// add all changed stream set objects
newS3StreamSetObjects.putAll(addedS3StreamSetObjects);
streamSetObjects.putAll(addedS3StreamSetObjects);
// remove all removed stream set objects
removedS3StreamSetObjects.forEach(newS3StreamSetObjects::remove);
return new NodeS3StreamSetObjectMetadataImage(this.nodeId, this.nodeEpoch, newS3StreamSetObjects);
streamSetObjects.removeAll(removedS3StreamSetObjects);
return new NodeS3StreamSetObjectMetadataImage(this.nodeId, this.nodeEpoch, streamSetObjects);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public S3ObjectsImage apply() {
// put all new changed objects
newObjectsMetadata.putAll(changedObjects);
// remove all removed objects
newObjectsMetadata.deleteAll(removedObjectIds);
newObjectsMetadata.removeAll(removedObjectIds);
return new S3ObjectsImage(currentAssignedObjectId, newObjectsMetadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ public void replay(AssignedStreamIdRecord record) {

public void replay(S3StreamRecord record) {
getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
if (deletedStreams.contains(record.streamId())) {
deletedStreams.remove(record.streamId());
}
deletedStreams.remove(record.streamId());
}

public void replay(RemoveS3StreamRecord record) {
Expand All @@ -75,9 +73,7 @@ public void replay(RemoveS3StreamRecord record) {

public void replay(NodeWALMetadataRecord record) {
getOrCreateNodeStreamMetadataDelta(record.nodeId()).replay(record);
if (deletedNodes.contains(record.nodeId())) {
deletedNodes.remove(record.nodeId());
}
deletedNodes.remove(record.nodeId());
}

public void replay(RemoveNodeWALMetadataRecord record) {
Expand Down Expand Up @@ -139,26 +135,16 @@ private NodeS3WALMetadataDelta getOrCreateNodeStreamMetadataDelta(Integer nodeId
}

S3StreamsMetadataImage apply() {
Map<Long, S3StreamMetadataImage> newStreams = new HashMap<>(image.streamsMetadata());
Map<Integer, NodeS3StreamSetObjectMetadataImage> newNodeStreams = new HashMap<>(image.nodeWALMetadata());

DeltaMap<Long, S3StreamMetadataImage> streams = image.streamsMetadata().copy();
// apply the delta changes of old streams since the last image
this.changedStreams.forEach((streamId, delta) -> {
S3StreamMetadataImage newS3StreamMetadataImage = delta.apply();
newStreams.put(streamId, newS3StreamMetadataImage);
});
// remove the deleted streams
deletedStreams.forEach(newStreams::remove);

changedStreams.forEach((streamId, delta) -> streams.put(streamId, delta.apply()));
streams.removeAll(deletedStreams);
DeltaMap<Integer, NodeS3StreamSetObjectMetadataImage> nodes = image.nodeWALMetadata().copy();
// apply the delta changes of old nodes since the last image
this.changedNodes.forEach((nodeId, delta) -> {
NodeS3StreamSetObjectMetadataImage newNodeS3StreamSetObjectMetadataImage = delta.apply();
newNodeStreams.put(nodeId, newNodeS3StreamSetObjectMetadataImage);
});
this.changedNodes.forEach((nodeId, delta) -> nodes.put(nodeId, delta.apply()));
// remove the deleted nodes
deletedNodes.forEach(newNodeStreams::remove);

return new S3StreamsMetadataImage(currentAssignedStreamId, newStreams, newNodeStreams);
nodes.removeAll(deletedNodes);
return new S3StreamsMetadataImage(currentAssignedStreamId, streams, nodes);
}

@Override
Expand Down
Loading