diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index c628256ad8bde..9f9febf70b280 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1962,18 +1962,18 @@ impl Coordinator { () = self.controller.ready() => { Message::ControllerReady } - // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe. - permit = group_commit_rx.ready() => { - let span = info_span!(parent: None, "group_commit_notify"); - span.follows_from(Span::current()); - Message::GroupCommitInitiate(span, Some(permit)) - }, // `recv()` on `UnboundedReceiver` is cancellation safe: // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety m = cmd_rx.recv() => match m { None => break, Some(m) => Message::Command(m), }, + // See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe. + permit = group_commit_rx.ready() => { + let span = info_span!(parent: None, "group_commit_notify"); + span.follows_from(Span::current()); + Message::GroupCommitInitiate(span, Some(permit)) + }, // `recv()` on `UnboundedReceiver` is cancellation safe: // https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety Some(pending_peek) = linearized_reads_ts_rx.recv() => { diff --git a/src/adapter/src/coord/appends.rs b/src/adapter/src/coord/appends.rs index 84c11b6baaa93..c6585a489668c 100644 --- a/src/adapter/src/coord/appends.rs +++ b/src/adapter/src/coord/appends.rs @@ -231,6 +231,8 @@ impl Coordinator { write_lock_guard: Option>, permit: Option, ) { + drop(permit); + let (write_lock_guard, pending_writes): (_, Vec<_>) = if let Some(guard) = write_lock_guard { // If the caller passed in the write lock, then we can execute a group commit. @@ -371,8 +373,14 @@ impl Coordinator { .await .expect("One-shot dropped while waiting synchronously") .unwrap_or_terminate("cannot fail to apply appends"); - self.group_commit_apply(timestamp, responses, write_lock_guard, notifies, permit) - .await; + self.group_commit_apply( + timestamp, + responses, + write_lock_guard, + notifies, + None, /* no permit */ + ) + .await; } else { debug!("async group_commit_apply"); let internal_cmd_tx = self.internal_cmd_tx.clone(); @@ -386,7 +394,7 @@ impl Coordinator { responses, write_lock_guard, notifies, - permit, + None, /* no permit */ Span::current(), )) { warn!("Server closed with non-responded writes, {e}");