diff --git a/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java b/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java index d013133..be3b2aa 100644 --- a/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java +++ b/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java @@ -26,11 +26,14 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -48,6 +51,7 @@ import java.util.HashMap; import java.util.Map; import java.util.PriorityQueue; +import java.util.concurrent.RunnableFuture; /** *

Siddhi Runtime Operator

@@ -79,7 +83,7 @@ * @param Input Element Type * @param Output Element Type */ -public abstract class AbstractSiddhiOperator extends AbstractStreamOperator implements OneInputStreamOperator { +public abstract class AbstractSiddhiOperator extends AbstractStreamOperator implements OneInputStreamOperator, StreamCheckpointedOperator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class); private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11; @@ -263,7 +267,6 @@ public void dispose() throws Exception { @Override public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - super.snapshotState(out, checkpointId, timestamp); final ObjectOutputStream oos = new ObjectOutputStream(out); // Write siddhi snapshot @@ -278,9 +281,13 @@ public void snapshotState(FSDataOutputStream out, long checkpointId, long timest oos.flush(); } + @Override + public RunnableFuture snapshotState(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception { + return super.snapshotState(checkpointId, timestamp, streamFactory); + } + @Override public void restoreState(FSDataInputStream state) throws Exception { - super.restoreState(state); final ObjectInputStream ois = new ObjectInputStream(state); // Restore siddhi snapshot