Skip to content

Commit

Permalink
adapter: de-prioritize group commit
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Oct 2, 2023
1 parent 4964252 commit 8cf3ac4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
12 changes: 6 additions & 6 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1962,18 +1962,18 @@ impl Coordinator {
() = self.controller.ready() => {
Message::ControllerReady
}
// See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
permit = group_commit_rx.ready() => {
let span = info_span!(parent: None, "group_commit_notify");
span.follows_from(Span::current());
Message::GroupCommitInitiate(span, Some(permit))
},
// `recv()` on `UnboundedReceiver` is cancellation safe:
// https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
m = cmd_rx.recv() => match m {
None => break,
Some(m) => Message::Command(m),
},
// See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
permit = group_commit_rx.ready() => {
let span = info_span!(parent: None, "group_commit_notify");
span.follows_from(Span::current());
Message::GroupCommitInitiate(span, Some(permit))
},
// `recv()` on `UnboundedReceiver` is cancellation safe:
// https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
Some(pending_peek) = linearized_reads_ts_rx.recv() => {
Expand Down
14 changes: 11 additions & 3 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ impl Coordinator {
write_lock_guard: Option<tokio::sync::OwnedMutexGuard<()>>,
permit: Option<GroupCommitPermit>,
) {
drop(permit);

let (write_lock_guard, pending_writes): (_, Vec<_>) = if let Some(guard) = write_lock_guard
{
// If the caller passed in the write lock, then we can execute a group commit.
Expand Down Expand Up @@ -371,8 +373,14 @@ impl Coordinator {
.await
.expect("One-shot dropped while waiting synchronously")
.unwrap_or_terminate("cannot fail to apply appends");
self.group_commit_apply(timestamp, responses, write_lock_guard, notifies, permit)
.await;
self.group_commit_apply(
timestamp,
responses,
write_lock_guard,
notifies,
None, /* no permit */
)
.await;
} else {
debug!("async group_commit_apply");
let internal_cmd_tx = self.internal_cmd_tx.clone();
Expand All @@ -386,7 +394,7 @@ impl Coordinator {
responses,
write_lock_guard,
notifies,
permit,
None, /* no permit */
Span::current(),
)) {
warn!("Server closed with non-responded writes, {e}");
Expand Down

0 comments on commit 8cf3ac4

Please # to comment.