Skip to content

Commit

Permalink
adapter: apply_write off main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Oct 2, 2023
1 parent 8cf3ac4 commit 2d157a8
Showing 1 changed file with 34 additions and 18 deletions.
52 changes: 34 additions & 18 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mz_ore::vec::VecExt;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_sql::plan::Plan;
use mz_storage_client::client::TimestamplessUpdate;
use mz_storage_types::sources::Timeline;
use tokio::sync::{oneshot, Notify, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore};
use tracing::{debug, warn, Instrument, Span};

Expand Down Expand Up @@ -427,25 +428,40 @@ impl Coordinator {
notifies: Vec<oneshot::Sender<()>>,
_permit: Option<GroupCommitPermit>,
) {
self.apply_local_write(timestamp).await;
for response in responses {
let (ctx, result) = response.finalize();
ctx.retire(result);
}
for tx in notifies {
let _ = tx.send(());
}
let mut shared_oracle = self
.get_shared_timestamp_oracle(&Timeline::EpochMilliseconds)
.expect("todo");

let internal_cmd_tx = self.internal_cmd_tx.clone();

task::spawn(|| format!("group_commit_apply:{timestamp}"), async move {
shared_oracle.apply_write(timestamp).await;

// Advancing timelines will update all timeline read holds, and update the read timestamps
// of non-realtime timelines. There are no guarantees that we need to provide with the
// ordering of advancing timelines and user transactions. Updating read holds are only to
// allow compaction and free some memory. Non-realtime timelines can only be written to by
// upstream sources, which we don't provide ordering guarantees for with respect to user
// transactions. We send the `AdvanceTimelines` message here out of convenience, because we
// know at least the real-time timeline will have a read hold that can be updated.
self.internal_cmd_tx
.send(Message::AdvanceTimelines)
.expect("sending to self.internal_cmd_tx cannot fail");
for response in responses {
let (ctx, result) = response.finalize();
ctx.retire(result);
}
for tx in notifies {
let _ = tx.send(());
}

// Advancing timelines will update all timeline read holds, and update the read timestamps
// of non-realtime timelines. There are no guarantees that we need to provide with the
// ordering of advancing timelines and user transactions. Updating read holds are only to
// allow compaction and free some memory. Non-realtime timelines can only be written to by
// upstream sources, which we don't provide ordering guarantees for with respect to user
// transactions. We send the `AdvanceTimelines` message here out of convenience, because we
// know at least the real-time timeline will have a read hold that can be updated.
internal_cmd_tx
.send(Message::AdvanceTimelines)
.expect("sending to self.internal_cmd_tx cannot fail");

// Make sure we hold these until here, after we returned the
// notifies.
// TODO: Or maybe not?
// drop(write_lock_guard);
// drop(permit);
});
}

/// Submit a write to be executed during the next group commit and trigger a group commit.
Expand Down

0 comments on commit 2d157a8

Please # to comment.