Skip to content

Commit

Permalink
Snapshot queued events
Browse files Browse the repository at this point in the history
  • Loading branch information
haoch committed Jul 27, 2017
1 parent fd6ac14 commit 5e07371
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.contrib.siddhi.operator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -30,7 +32,9 @@
import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
import org.apache.flink.contrib.siddhi.schema.StreamSchema;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
Expand Down Expand Up @@ -84,6 +88,7 @@ public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOper
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";

private final SiddhiOperatorContext siddhiPlan;
private final String executionExpression;
Expand All @@ -98,6 +103,7 @@ public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOper
private transient PriorityQueue<StreamRecord<IN>> priorityQueue;

private transient ListState<byte[]> siddhiRuntimeState;
private transient ListState<byte[]> queuedRecordsState;

/**
* @param siddhiPlan Siddhi CEP Execution Plan
Expand Down Expand Up @@ -138,7 +144,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {

if (isProcessingTime) {
processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
this.checkpointState();
this.checkpointSiddhiRuntimeState();
} else {
PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
// event time processing
Expand Down Expand Up @@ -224,23 +230,23 @@ private void startSiddhiRuntime() {
this.siddhiRuntime = siddhiManager.createExecutionPlanRuntime(executionExpression);
this.siddhiRuntime.start();
registerInputAndOutput(this.siddhiRuntime);
LOGGER.info("Siddhi runtime {} started", siddhiRuntime.getName());
LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
} else {
throw new IllegalStateException("Siddhi runtime has already been initialized");
throw new IllegalStateException("Siddhi has already been initialized");
}
}


private void shutdownSiddhiRuntime() {
if (this.siddhiRuntime != null) {
this.siddhiRuntime.shutdown();
LOGGER.info("Siddhi runtime {} shutdown", this.siddhiRuntime.getName());
LOGGER.info("Siddhi {} shutdown", this.siddhiRuntime.getName());
this.siddhiRuntime = null;
this.siddhiManager.shutdown();
this.siddhiManager = null;
this.inputStreamHandlers = null;
} else {
throw new IllegalStateException("Siddhi runtime has already shutdown");
throw new IllegalStateException("Siddhi has already shutdown");
}
}

Expand All @@ -261,38 +267,71 @@ public void dispose() throws Exception {
super.dispose();
}


private void checkpointState() throws Exception {
this.siddhiRuntimeState.clear();
this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot());
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
checkpointState();
checkpointSiddhiRuntimeState();
checkpointRecordQueueState();
}

private void restoreState() throws Exception {
final Iterator<byte[]> iterator = siddhiRuntimeState.get().iterator();
if (iterator.hasNext()) {
this.siddhiRuntime.restore(iterator.next());
LOGGER.info("Restore siddhi state");
final Iterator<byte[]> siddhiState = siddhiRuntimeState.get().iterator();
if (siddhiState.hasNext()) {
this.siddhiRuntime.restore(siddhiState.next());
}

LOGGER.info("Restore queued records state");
final Iterator<byte[]> queueState = queuedRecordsState.get().iterator();
if (queueState.hasNext()) {
final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(queueState.next());
final DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(byteArrayInputStream);
try {
this.priorityQueue = restoreQueuerState(dataInputView);
} finally {
dataInputView.close();
byteArrayInputStream.close();
}
}
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
if (siddhiRuntimeState == null) {
siddhiRuntimeState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME,
siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(SIDDHI_RUNTIME_STATE_NAME,
new BytePrimitiveArraySerializer()));
}
if (queuedRecordsState == null) {
queuedRecordsState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(QUEUED_RECORDS_STATE_NAME, new BytePrimitiveArraySerializer()));
}
if (context.isRestored()) {
restoreState();
}
}


private void checkpointSiddhiRuntimeState() throws Exception {
this.siddhiRuntimeState.clear();
this.siddhiRuntimeState.add(this.siddhiRuntime.snapshot());
this.queuedRecordsState.clear();
}

private void checkpointRecordQueueState() throws Exception {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStream);
try {
snapshotQueueState(this.priorityQueue, dataOutputView);
this.queuedRecordsState.clear();
this.queuedRecordsState.add(byteArrayOutputStream.toByteArray());
} finally {
dataOutputView.close();
byteArrayOutputStream.close();
}
}

protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> queue, DataOutputView dataOutputView) throws IOException;

protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView dataInputView) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.siddhi.schema.StreamSchema;
import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
Expand Down

0 comments on commit 5e07371

Please # to comment.