Skip to content

Commit

Permalink
refactor(stream): shortcircuit flushing chunks on barrier (risingwave…
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored and Standing-Man committed Feb 28, 2025
1 parent 0961ce8 commit a6f5c46
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions src/stream/src/executor/sync_kv_log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,37 +528,38 @@ impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
truncation_offset: Option<ReaderTruncationOffsetType>,
buffer: &mut SyncedLogStoreBuffer,
) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
// TODO(kwannoel): As an optimization we can also change flushed chunks to be flushed items
// to reduce memory consumption of logstore.

let epoch = barrier.epoch.prev;
let mut writer = write_state.start_writer(false);
writer.write_barrier(epoch, barrier.is_checkpoint())?;

// FIXME(kwannoel): Flush all unflushed chunks
// As an optimization we can also change it into flushed items instead.
// This will reduce memory consumption of logstore.

// TODO: may stop the for loop when seeing any of flushed item to avoid always iterating the whole buffer
for (epoch, item) in &mut buffer.buffer {
match item {
LogStoreBufferItem::StreamChunk {
chunk,
start_seq_id,
end_seq_id,
flushed,
..
} => {
if !*flushed {
writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
*flushed = true;
if barrier.is_checkpoint() {
for (epoch, item) in buffer.buffer.iter_mut().rev() {
match item {
LogStoreBufferItem::StreamChunk {
chunk,
start_seq_id,
end_seq_id,
flushed,
..
} => {
if !*flushed {
writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
*flushed = true;
} else {
break;
}
}
LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
}
LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
}
}

// Apply truncation
let (flush_info, _) = writer.finish().await?;
flush_info.report(metrics);

// Apply truncation
let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, truncation_offset);

// Add to buffer
Expand Down

0 comments on commit a6f5c46

Please # to comment.