Skip to content

Commit

Permalink
fix(store): ignore operation with illegal argument in recover
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <admin@lv5.moe>
  • Loading branch information
ShadowySpirits committed Oct 23, 2023
1 parent 0a0f1ec commit edced8b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 24 deletions.
26 changes: 8 additions & 18 deletions store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.automq.stream.s3.wal.BlockWALService;
import com.automq.stream.s3.wal.WriteAheadLog;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -52,28 +53,20 @@ public class S3StreamStore implements StreamStore {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class);
private final Config s3Config;
private final StreamClient streamClient;
private final StoreMetadataService metadataService;
private final StreamManager streamManager;
private final ObjectManager objectManager;
private final WriteAheadLog writeAheadLog;
private final S3Operator operator;
private final Storage storage;
private final CompactionManager compactionManager;
private final S3BlockCache blockCache;
private final ThreadPoolExecutor storeWorkingThreadPool;

public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, StoreMetadataService metadataService,
S3Operator operator) {
this.s3Config = configFrom(streamConfig);

// Build meta service and related manager
this.metadataService = metadataService;
this.streamManager = new S3StreamManager(metadataService);
this.objectManager = new S3ObjectManager(metadataService);
StreamManager streamManager = new S3StreamManager(metadataService);
ObjectManager objectManager = new S3ObjectManager(metadataService);

this.operator = operator;
this.writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build();
this.blockCache = new DefaultS3BlockCache(s3Config.s3CacheSize(), objectManager, operator);
WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config.s3CacheSize(), objectManager, operator);

// Build the s3 storage
this.storage = new S3Storage(s3Config, writeAheadLog, streamManager, objectManager, blockCache, operator);
Expand Down Expand Up @@ -117,13 +110,10 @@ public CompletableFuture<Void> close(List<Long> streamIds) {
List<CompletableFuture<Void>> futureList = streamIds.stream()
.map(streamId -> {
Optional<Stream> stream = streamClient.getStream(streamId);
if (stream.isEmpty()) {
return stream.get().close();
}
return null;
return stream.map(Stream::close).orElse(null);
})
.filter(x -> x != null)
.collect(java.util.stream.Collectors.toList());
.filter(Objects::nonNull)
.toList();
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApplyAsync(result -> result, storeWorkingThreadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,11 @@ public CompletableFuture<AckResult> ack(String receiptHandle) {
AckOperation operation = new AckOperation(handle.topicId(), handle.queueId(), operationStreamId,
snapshotStreamId, stateMachine, handle.consumerGroupId(), handle.operationId(), System.currentTimeMillis(),
AckOperation.AckOperationType.ACK_NORMAL);
return operationLogService.logAckOperation(operation).thenApply(nil -> {
inflightService.decreaseInflightCount(handle.consumerGroupId(), handle.topicId(), handle.queueId(), 1);
return new AckResult(AckResult.Status.SUCCESS);
}).exceptionally(throwable -> new AckResult(AckResult.Status.ERROR));
return operationLogService.logAckOperation(operation)
.thenApply(nil -> {
inflightService.decreaseInflightCount(handle.consumerGroupId(), handle.topicId(), handle.queueId(), 1);
return new AckResult(AckResult.Status.SUCCESS);
}).exceptionally(throwable -> new AckResult(AckResult.Status.ERROR));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.automq.rocketmq.common.config.StoreConfig;
import com.automq.rocketmq.store.api.MessageStateMachine;
import com.automq.rocketmq.store.api.StreamStore;
import com.automq.rocketmq.store.exception.StoreErrorCode;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.operation.AckOperation;
import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation;
Expand Down Expand Up @@ -87,7 +88,9 @@ public CompletableFuture<Void> recover(MessageStateMachine stateMachine, long op
replay(batchWithContext.baseOffset(), operation);
} catch (StoreException e) {
LOGGER.error("Topic {}, queue: {}: Replay operation:{} failed when recover", stateMachine.topicId(), stateMachine.queueId(), operation, e);
throw new CompletionException(e);
if (e.code() != StoreErrorCode.ILLEGAL_ARGUMENT) {
throw new CompletionException(e);
}
}
}
}));
Expand Down Expand Up @@ -122,7 +125,8 @@ public CompletableFuture<LogResult> logAckOperation(AckOperation operation) {
}

@Override
public CompletableFuture<LogResult> logChangeInvisibleDurationOperation(ChangeInvisibleDurationOperation operation) {
public CompletableFuture<LogResult> logChangeInvisibleDurationOperation(
ChangeInvisibleDurationOperation operation) {
return streamStore.append(operation.operationStreamId(),
new SingleRecord(ByteBuffer.wrap(SerializeUtil.encodeChangeInvisibleDurationOperation(operation))))
.thenApply(result -> {
Expand Down

0 comments on commit edced8b

Please # to comment.