Skip to content

Commit

Permalink
feat(logstore): handle paused stream (risingwavelabs#20511)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored and Standing-Man committed Feb 28, 2025
1 parent 403a276 commit 0961ce8
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions src/stream/src/executor/sync_kv_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
new_log_store_state(self.table_id, local_state_store, self.serde);
initial_write_state.init(first_write_epoch).await?;

let mut pause_stream = first_barrier.is_pause_on_startup();
let mut initial_write_epoch = first_write_epoch;

// We only recreate the consume stream when:
Expand Down Expand Up @@ -289,8 +290,15 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {

loop {
let select_result = {
let read_future =
read_future.next_chunk(&read_state, &mut buffer, &self.metrics);
let read_future = async {
if pause_stream {
pending().await
} else {
read_future
.next_chunk(&read_state, &mut buffer, &self.metrics)
.await
}
};
pin_mut!(read_future);
let write_future = write_future.next_event();
pin_mut!(write_future);
Expand All @@ -306,6 +314,18 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
WriteFutureEvent::UpstreamMessageReceived(msg) => {
match msg {
Message::Barrier(barrier) => {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
pause_stream = true;
}
Mutation::Resume => {
pause_stream = false;
}
_ => {}
}
}

let write_state_post_write_barrier = Self::write_barrier(
&mut write_state,
barrier.clone(),
Expand Down Expand Up @@ -510,7 +530,6 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
let epoch = barrier.epoch.prev;
let mut writer = write_state.start_writer(false);
// FIXME(kwannoel): Handle paused stream.
writer.write_barrier(epoch, barrier.is_checkpoint())?;

// FIXME(kwannoel): Flush all unflushed chunks
Expand Down

0 comments on commit 0961ce8

Please # to comment.