Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: add initial txn execute state #507

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ tracing-subscriber = "0.3.1"
utils = { path = "../utils", version = "0.1.0" }
xline = { path = "../xline" }
xline-client = { path = "../xline-client" }
xlineapi = { path = "../xlineapi" }
2 changes: 1 addition & 1 deletion benchmark/src/bench_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::fmt::Debug;
use anyhow::Result;
use etcd_client::{Client as EtcdClient, PutOptions};
use thiserror::Error;
use xline::server::Command;
use xline_client::{
error::XlineClientError as ClientError,
types::kv::{PutRequest, PutResponse},
Client, ClientOptions,
};
use xlineapi::command::Command;

/// The client used in benchmark
#[derive(Error, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
new_vid
}
}
// since a reset is needed, all other vertices doesn't matter anymore, so delete them all
CEEvent::Reset(snapshot, finish_tx) => {
// since a reset is needed, all other vertexes doesn't matter anymore, so delete them all
self.entry_vid.clear();
self.vs.clear();

Expand Down
2 changes: 1 addition & 1 deletion curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
) -> Result<FetchReadStateResponse, CurpError> {
self.check_cluster_version(req.cluster_version)?;
let cmd = req.cmd()?;
let state = self.curp.handle_fetch_read_state(&cmd);
let state = self.curp.handle_fetch_read_state(&cmd)?;
Ok(FetchReadStateResponse::new(state))
}
}
Expand Down
24 changes: 18 additions & 6 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,17 +720,29 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
}

/// Handle `fetch_read_state`
pub(super) fn handle_fetch_read_state(&self, cmd: &C) -> ReadState {
let ids = self.ctx.sp.map_lock(|sp| {
sp.pool
pub(super) fn handle_fetch_read_state(&self, cmd: &C) -> Result<ReadState, CurpError> {
if self.st.read().role != Role::Leader {
return Err(CurpError::Internal("not leader".to_owned()));
}

let ids = {
let sp_l = self.ctx.sp.lock();
let ucp_l = self.ctx.ucp.lock();
sp_l.pool
.iter()
.filter_map(|(id, c)| c.is_conflict_with_cmd(cmd).then_some(*id))
.chain(
ucp_l
.iter()
.filter_map(|(id, c)| c.is_conflict_with_cmd(cmd).then_some(*id)),
)
.unique()
.collect_vec()
});
};
if ids.is_empty() {
ReadState::CommitIndex(self.log.read().commit_index)
Ok(ReadState::CommitIndex(self.log.read().commit_index))
} else {
ReadState::Ids(IdSet::new(ids))
Ok(ReadState::Ids(IdSet::new(ids)))
}
}
}
Expand Down
1 change: 0 additions & 1 deletion xline-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ tokio = { version = "0.2.23", package = "madsim-tokio", features = ["sync"] }
tonic = { version = "0.3.0", package = "madsim-tonic" }
tower = { version = "0.4", features = ["discover"] }
utils = { path = "../utils", features = ["parking_lot"] }
xline = { path = "../xline" }
xlineapi = { path = "../xlineapi" }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion xline-client/examples/error_handling.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! An example to show how the errors are organized in `xline-client`
use anyhow::Result;
use xline::storage::ExecuteError;
use xline_client::{error::XlineClientError, types::kv::PutRequest, Client, ClientOptions};
use xlineapi::execute_error::ExecuteError;

#[tokio::main]
async fn main() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use pbkdf2::{
Pbkdf2,
};
use tonic::transport::Channel;
use xline::server::Command;
use xlineapi::{
command::{command_from_request_wrapper, Command},
AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse, AuthRoleDeleteResponse,
AuthRoleGetResponse, AuthRoleGrantPermissionResponse, AuthRoleListResponse,
AuthRoleRevokePermissionResponse, AuthStatusResponse, AuthUserAddResponse,
Expand Down Expand Up @@ -713,7 +713,7 @@ impl AuthClient {
) -> Result<Res> {
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(request.into(), self.token.clone());
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);

let res_wrapper = if use_fast_path {
let (cmd_res, _sync_error) = self.curp_client.propose(cmd, true).await?;
Expand Down
21 changes: 6 additions & 15 deletions xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use curp::client::Client as CurpClient;
use xline::server::{Command, KeyRange};
use xlineapi::command::{command_from_request_wrapper, Command};
use xlineapi::{
CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken,
TxnResponse,
Expand Down Expand Up @@ -55,13 +55,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn put(&self, request: PutRequest) -> Result<PutResponse> {
let key_ranges = vec![KeyRange::new_one_key(request.key())];
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::PutRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -101,13 +100,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::RangeRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -140,13 +138,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn delete(&self, request: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::DeleteRangeRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -190,18 +187,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn txn(&self, request: TxnRequest) -> Result<TxnResponse> {
let key_ranges = request
.inner
.compare
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect();
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::TxnRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, Some(sync_res)) = self.curp_client.propose(cmd, false).await? else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
Expand Down Expand Up @@ -254,7 +245,7 @@ impl KvClient {
xlineapi::CompactionRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);

let res_wrapper = if use_fast_path {
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Expand Down
6 changes: 3 additions & 3 deletions xline-client/src/clients/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::sync::Arc;
use curp::client::Client as CurpClient;
use futures::channel::mpsc::channel;
use tonic::{transport::Channel, Streaming};
use xline::server::Command;
use xlineapi::{
command::{command_from_request_wrapper, Command},
LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse, LeaseRevokeResponse,
LeaseTimeToLiveResponse, RequestWithToken,
};
Expand Down Expand Up @@ -90,7 +90,7 @@ impl LeaseClient {
xlineapi::LeaseGrantRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -266,7 +266,7 @@ impl LeaseClient {
xlineapi::LeaseLeasesRequest {}.into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down
26 changes: 3 additions & 23 deletions xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::{
};

use clippy_utilities::OverflowArithmetic;
use curp::{client::Client as CurpClient, cmd::ProposeId};
use curp::client::Client as CurpClient;
use futures::{Future, FutureExt};
use tonic::transport::Channel;
use xline::server::{Command, CommandResponse, KeyRange, SyncResponse};
use xlineapi::{
command::{command_from_request_wrapper, Command, CommandResponse, KeyRange, SyncResponse},
Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, EventType,
LockResponse, PutRequest, RangeRequest, RangeResponse, Request, RequestOp, RequestWithToken,
RequestWrapper, Response, ResponseHeader, SortOrder, SortTarget, TargetUnion, TxnRequest,
Expand Down Expand Up @@ -233,26 +233,6 @@ impl LockClient {
Ok(UnlockResponse { header })
}

/// Generate `Command` proposal from `Request`
fn command_from_request_wrapper(propose_id: ProposeId, wrapper: RequestWithToken) -> Command {
#[allow(clippy::wildcard_enum_match_arm)]
let keys = match wrapper.request {
RequestWrapper::DeleteRangeRequest(ref req) => {
vec![KeyRange::new_one_key(req.key.as_slice())]
}
RequestWrapper::RangeRequest(ref req) => {
vec![KeyRange::new(req.key.as_slice(), req.range_end.as_slice())]
}
RequestWrapper::TxnRequest(ref req) => req
.compare
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect(),
_ => vec![],
};
Command::new(keys, wrapper, propose_id)
}

/// Propose request and get result with fast/slow path
async fn propose<T>(
&self,
Expand All @@ -266,7 +246,7 @@ impl LockClient {
RequestWithToken::new_with_token(request.into(), self.token.clone());
let propose_id = self.curp_client.gen_propose_id().await?;

let cmd = Self::command_from_request_wrapper(propose_id, request_with_token);
let cmd = command_from_request_wrapper(propose_id, request_with_token);
self.curp_client
.propose(cmd, use_fast_path)
.await
Expand Down
2 changes: 1 addition & 1 deletion xline-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use curp::{
error::{ClientBuildError, ClientError},
};
use thiserror::Error;
use xline::server::Command;
use xlineapi::command::Command;

/// The result type for `xline-client`
pub type Result<T> = std::result::Result<T, XlineClientError<Command>>;
Expand Down
2 changes: 1 addition & 1 deletion xline-client/src/types/auth.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use xline::server::KeyRange;
use xlineapi::command::KeyRange;
pub use xlineapi::{
AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse, AuthRoleDeleteResponse,
AuthRoleGetResponse, AuthRoleGrantPermissionResponse, AuthRoleListResponse,
Expand Down
2 changes: 1 addition & 1 deletion xline-client/src/types/kv.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use xline::server::KeyRange;
use xlineapi::command::KeyRange;
pub use xlineapi::{
CompactionResponse, CompareResult, CompareTarget, DeleteRangeResponse, PutResponse,
RangeResponse, Response, ResponseOp, SortOrder, SortTarget, TargetUnion, TxnResponse,
Expand Down
3 changes: 1 addition & 2 deletions xline-client/src/types/watch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::fmt::Debug;

use futures::channel::mpsc::Sender;
use xline::server::KeyRange;
use xlineapi::{command::KeyRange, RequestUnion, WatchCancelRequest, WatchProgressRequest};
pub use xlineapi::{Event, EventType, KeyValue, WatchResponse};
use xlineapi::{RequestUnion, WatchCancelRequest, WatchProgressRequest};

use crate::error::{Result, XlineClientError};

Expand Down
2 changes: 0 additions & 2 deletions xline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ mod revision_number;
mod rpc {
pub(crate) use xlineapi::*;
}
/// Request validation module
mod request_validation;
/// restore module, only for test
pub mod restore;
/// Revision check
Expand Down
7 changes: 3 additions & 4 deletions xline/src/revision_check.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
rpc::{CompactionRequest, RangeRequest, Request, TxnRequest},
storage::ExecuteError,
};
use xlineapi::execute_error::ExecuteError;

use crate::rpc::{CompactionRequest, RangeRequest, Request, TxnRequest};

/// A union of requests that need revision check
pub(crate) enum RevisionRequest<'a> {
Expand Down
Loading