diff --git a/src/stream/src/executor/sync_kv_log_store.rs b/src/stream/src/executor/sync_kv_log_store.rs index 6514fbec71110..7225d5d759dc4 100644 --- a/src/stream/src/executor/sync_kv_log_store.rs +++ b/src/stream/src/executor/sync_kv_log_store.rs @@ -528,37 +528,38 @@ impl SyncedKvLogStoreExecutor { truncation_offset: Option, buffer: &mut SyncedLogStoreBuffer, ) -> StreamExecutorResult> { + // TODO(kwannoel): As an optimization we can also change flushed chunks to be flushed items + // to reduce memory consumption of logstore. + let epoch = barrier.epoch.prev; let mut writer = write_state.start_writer(false); writer.write_barrier(epoch, barrier.is_checkpoint())?; - // FIXME(kwannoel): Flush all unflushed chunks - // As an optimization we can also change it into flushed items instead. - // This will reduce memory consumption of logstore. - - // TODO: may stop the for loop when seeing any of flushed item to avoid always iterating the whole buffer - for (epoch, item) in &mut buffer.buffer { - match item { - LogStoreBufferItem::StreamChunk { - chunk, - start_seq_id, - end_seq_id, - flushed, - .. - } => { - if !*flushed { - writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?; - *flushed = true; + if barrier.is_checkpoint() { + for (epoch, item) in buffer.buffer.iter_mut().rev() { + match item { + LogStoreBufferItem::StreamChunk { + chunk, + start_seq_id, + end_seq_id, + flushed, + .. + } => { + if !*flushed { + writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?; + *flushed = true; + } else { + break; + } } + LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {} } - LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {} } } + // Apply truncation let (flush_info, _) = writer.finish().await?; flush_info.report(metrics); - - // Apply truncation let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, truncation_offset); // Add to buffer