Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft] adapter: distributed timestamp oracle backed by "postgres" #21671

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion misc/python/materialize/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def main() -> int:
scratch = MZ_ROOT / "scratch"
db = urlparse(args.postgres).path.removeprefix("/")
_run_sql(args.postgres, f"CREATE DATABASE IF NOT EXISTS {db}")
for schema in ["consensus", "adapter", "storage"]:
for schema in ["consensus", "adapter", "timestamp_oracle", "storage"]:
if args.reset:
_run_sql(args.postgres, f"DROP SCHEMA IF EXISTS {schema} CASCADE")
_run_sql(args.postgres, f"CREATE SCHEMA IF NOT EXISTS {schema}")
Expand Down Expand Up @@ -218,6 +218,7 @@ def main() -> int:
f"--persist-consensus-url={args.postgres}?options=--search_path=consensus",
f"--persist-blob-url=file://{mzdata}/persist/blob",
f"--adapter-stash-url={args.postgres}?options=--search_path=adapter",
f"--timestamp-oracle-url={args.postgres}?options=--search_path=timestamp_oracle",
f"--storage-stash-url={args.postgres}?options=--search_path=storage",
f"--environment-id={environment_id}",
"--bootstrap-role=materialize",
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/cloudtest/k8s/environmentd.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def args(self) -> list[str]:
"--orchestrator-kubernetes-image-pull-policy=if-not-present",
f"--persist-consensus-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=consensus",
f"--adapter-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter",
f"--timestamp-oracle-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=timestamp_oracle",
f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage",
"--internal-sql-listen-addr=0.0.0.0:6877",
"--internal-http-listen-addr=0.0.0.0:6878",
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def __init__(
depends_graph["cockroach"] = {"condition": "service_healthy"}
command += [
"--adapter-stash-url=postgres://root@cockroach:26257?options=--search_path=adapter",
"--timestamp-oracle-url=postgres://root@cockroach:26257?options=--search_path=timestamp_oracle",
"--storage-stash-url=postgres://root@cockroach:26257?options=--search_path=storage",
"--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus",
]
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes = "1.3.0"
bytesize = "1.1.0"
chrono = { version = "0.4.23", default-features = false, features = ["std"] }
dec = "0.4.8"
deadpool-postgres = "0.10.3"
derivative = "2.2.0"
differential-dataflow = "0.12.0"
enum-kinds = "0.5.1"
Expand Down Expand Up @@ -43,6 +44,7 @@ mz-persist-client = { path = "../persist-client" }
mz-pgcopy = { path = "../pgcopy" }
mz-pgrepr = { path = "../pgrepr" }
mz-pgwire-common = { path = "../pgwire-common" }
mz-postgres-client = { path = "../postgres-client" }
mz-postgres-util = { path = "../postgres-util" }
mz-prof = { path = "../prof" }
mz-proto = { path = "../proto" }
Expand Down
64 changes: 57 additions & 7 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ pub enum Message<T = mz_repr::Timestamp> {
Vec<oneshot::Sender<()>>,
/// Permit which limits how many group commits we run at once.
Option<GroupCommitPermit>,
/// Tracing span.
Span,
),
AdvanceTimelines,
ClusterEvent(ClusterEvent),
Expand All @@ -243,6 +245,7 @@ pub enum Message<T = mz_repr::Timestamp> {
stmt: Statement<Raw>,
params: mz_sql::plan::Params,
},
DetermineLinearizedReadTs(Vec<PeekStageTimestamp>),
PeekStageReady {
ctx: ExecuteContext,
stage: PeekStage,
Expand Down Expand Up @@ -286,6 +289,7 @@ impl Message {
Message::ExecuteSingleStatementTransaction { .. } => {
"execute_single_statement_transaction"
}
Message::DetermineLinearizedReadTs(_) => "determine_linearized_read_ts",
Message::PeekStageReady { .. } => "peek_stage_ready",
Message::DrainStatementLog => "drain_statement_log",
}
Expand Down Expand Up @@ -351,6 +355,7 @@ pub enum RealTimeRecencyContext {
view_id: GlobalId,
index_id: GlobalId,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_ids: BTreeSet<GlobalId>,
in_immediate_multi_stmt_txn: bool,
key: Vec<MirScalarExpr>,
Expand All @@ -372,7 +377,7 @@ impl RealTimeRecencyContext {
pub enum PeekStage {
Validate(PeekStageValidate),
Optimize(PeekStageOptimize),
Timestamp(PeekStageTimestamp),
ReadHolds(PeekStageReadHolds),
Finish(PeekStageFinish),
}

Expand All @@ -381,7 +386,7 @@ impl PeekStage {
match self {
PeekStage::Validate(_) => None,
PeekStage::Optimize(PeekStageOptimize { validity, .. })
| PeekStage::Timestamp(PeekStageTimestamp { validity, .. })
| PeekStage::ReadHolds(PeekStageReadHolds { validity, .. })
| PeekStage::Finish(PeekStageFinish { validity, .. }) => Some(validity),
}
}
Expand All @@ -393,6 +398,13 @@ pub struct PeekStageValidate {
target_cluster: TargetCluster,
}

/// A peek (read) that needs a linearized read timestamp.
///
/// Values of this type are sent over the coordinator's
/// `linearized_reads_ts_tx` channel. The coordinator's main loop drains every
/// value currently queued on that channel and delivers them together as one
/// `Message::DetermineLinearizedReadTs` batch.
#[derive(Debug)]
pub struct PeekStageTimestamp {
/// Execution context carried along with the peek.
ctx: ExecuteContext,
/// NOTE(review): presumably the timeline whose oracle supplies the
/// linearized read timestamp; confirm against the message handler.
linearized_timeline: Timeline,
/// The optimize-stage state for this peek.
/// NOTE(review): presumably resumed once the timestamp is known (its
/// `oracle_read_ts` field looks like the destination); confirm.
optimize_stage: PeekStageOptimize,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
validity: PlanValidity,
Expand All @@ -406,11 +418,12 @@ pub struct PeekStageOptimize {
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
}

#[derive(Debug)]
pub struct PeekStageTimestamp {
pub struct PeekStageReadHolds {
validity: PlanValidity,
dataflow: DataflowDescription<OptimizedMirRelationExpr>,
finishing: RowSetFinishing,
Expand All @@ -423,6 +436,7 @@ pub struct PeekStageTimestamp {
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
key: Vec<MirScalarExpr>,
typ: RelationType,
Expand All @@ -442,6 +456,7 @@ pub struct PeekStageFinish {
view_id: GlobalId,
index_id: GlobalId,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_ids: BTreeSet<GlobalId>,
real_time_recency_ts: Option<mz_repr::Timestamp>,
key: Vec<MirScalarExpr>,
Expand Down Expand Up @@ -540,6 +555,7 @@ impl PlanValidity {
pub struct Config {
pub dataflow_client: mz_controller::Controller,
pub storage: Box<dyn mz_catalog::DurableCatalogState>,
pub timestamp_oracle_url: Option<String>,
pub unsafe_mode: bool,
pub all_features: bool,
pub build_info: &'static BuildInfo,
Expand Down Expand Up @@ -951,6 +967,9 @@ pub struct Coordinator {
/// Channel for strict serializable reads ready to commit.
strict_serializable_reads_tx: mpsc::UnboundedSender<PendingReadTxn>,

/// Channel for reads that need a linearized read timestamp.
linearized_reads_ts_tx: mpsc::UnboundedSender<PeekStageTimestamp>,

/// Mechanism for totally ordering write and read timestamps, so that all reads
/// reflect exactly the set of writes that precede them, and no writes that follow.
global_timelines: BTreeMap<Timeline, TimelineState<Timestamp>>,
Expand Down Expand Up @@ -1048,6 +1067,9 @@ pub struct Coordinator {
/// Coordinator metrics.
metrics: Metrics,

/// For registering new metrics.
metrics_registry: MetricsRegistry,

/// Tracing handle.
tracing_handle: TracingHandle,

Expand All @@ -1056,6 +1078,9 @@ pub struct Coordinator {

/// Whether to start replicas with the new variable-length row encoding scheme.
variable_length_row_encoding: bool,

/// Postgres connection URL for the Postgres/CRDB-backed timestamp oracle.
timestamp_oracle_url: Option<String>,
}

impl Coordinator {
Expand Down Expand Up @@ -1849,6 +1874,7 @@ impl Coordinator {
mut self,
mut internal_cmd_rx: mpsc::UnboundedReceiver<Message>,
mut strict_serializable_reads_rx: mpsc::UnboundedReceiver<PendingReadTxn>,
mut linearized_reads_ts_rx: mpsc::UnboundedReceiver<PeekStageTimestamp>,
mut cmd_rx: mpsc::UnboundedReceiver<Command>,
group_commit_rx: appends::GroupCommitWaiter,
) {
Expand Down Expand Up @@ -1932,6 +1958,12 @@ impl Coordinator {
() = self.controller.ready() => {
Message::ControllerReady
}
// `recv()` on `UnboundedReceiver` is cancellation safe:
// https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
m = cmd_rx.recv() => match m {
None => break,
Some(m) => Message::Command(m),
},
// See [`appends::GroupCommitWaiter`] for notes on why this is cancel safe.
permit = group_commit_rx.ready() => {
let span = info_span!(parent: None, "group_commit_notify");
Expand All @@ -1940,10 +1972,13 @@ impl Coordinator {
},
// `recv()` on `UnboundedReceiver` is cancellation safe:
// https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
m = cmd_rx.recv() => match m {
None => break,
Some(m) => Message::Command(m),
},
Some(pending_peek) = linearized_reads_ts_rx.recv() => {
let mut pending_peeks = vec![pending_peek];
while let Ok(pending_peek) = linearized_reads_ts_rx.try_recv() {
pending_peeks.push(pending_peek);
}
Message::DetermineLinearizedReadTs(pending_peeks)
}
// `recv()` on `UnboundedReceiver` is cancellation safe:
// https://docs.rs/tokio/1.8.0/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
Some(pending_read_txn) = strict_serializable_reads_rx.recv() => {
Expand Down Expand Up @@ -2047,6 +2082,7 @@ pub async fn serve(
Config {
dataflow_client,
storage,
timestamp_oracle_url,
unsafe_mode,
all_features,
build_info,
Expand Down Expand Up @@ -2079,6 +2115,7 @@ pub async fn serve(
let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
let (group_commit_tx, group_commit_rx) = appends::notifier();
let (strict_serializable_reads_tx, strict_serializable_reads_rx) = mpsc::unbounded_channel();
let (linearized_reads_ts_tx, linearized_reads_ts_rx) = mpsc::unbounded_channel();

// Validate and process availability zones.
if !availability_zones.iter().all_unique() {
Expand Down Expand Up @@ -2158,11 +2195,20 @@ pub async fn serve(
let persistence =
CatalogTimestampPersistence::new(timeline.clone(), Arc::clone(&catalog));

let timestamp_oracle_impl = catalog.system_config().timestamp_oracle();

println!(
"------------- TIMESTAMP ORACLE IMPL: {}",
timestamp_oracle_impl
);

handle.block_on(Coordinator::ensure_timeline_state_with_initial_time(
&timeline,
initial_timestamp,
coord_now.clone(),
persistence,
timestamp_oracle_url.clone(),
metrics_registry.clone(),
&mut timestamp_oracles,
));
}
Expand All @@ -2182,6 +2228,7 @@ pub async fn serve(
internal_cmd_tx,
group_commit_tx,
strict_serializable_reads_tx,
linearized_reads_ts_tx,
global_timelines: timestamp_oracles,
transient_id_counter: 1,
active_conns: BTreeMap::new(),
Expand All @@ -2205,9 +2252,11 @@ pub async fn serve(
storage_usage_collection_interval,
segment_client,
metrics,
metrics_registry,
tracing_handle,
statement_logging: StatementLogging::new(),
variable_length_row_encoding,
timestamp_oracle_url,
};
let bootstrap = handle.block_on(async {
coord
Expand All @@ -2232,6 +2281,7 @@ pub async fn serve(
handle.block_on(coord.serve(
internal_cmd_rx,
strict_serializable_reads_rx,
linearized_reads_ts_rx,
cmd_rx,
group_commit_rx,
));
Expand Down
Loading