Skip to content

Commit

Permalink
feat: Implement WAL-based RaftLog storage
Browse files Browse the repository at this point in the history
Replace sled-based storage with a dedicated WAL implementation optimized
for Raft Log operations. The new implementation provides:

Performance improvements:
- Non-blocking batched fdatasync
- FILO caching for latest logs (configurable size/count)
- ~0.5ms write latency

Compatibility:
- Backward compatible with existing data format
- Auto-upgrades from V003 to V004 format on startup
- Supports rolling upgrades (no protocol changes)
- databend-metactl supports both V003/V004 import/export

Technical details:
- New storage format: V004
- Optimized specifically for Raft Log operations
- Preserves all existing functionality
  • Loading branch information
drmingdrmer committed Nov 6, 2024
1 parent bb56f16 commit 91aaac5
Show file tree
Hide file tree
Showing 52 changed files with 1,650 additions and 1,313 deletions.
115 changes: 75 additions & 40 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ flagset = "0.4"
flatbuffers = "24" # Must use the same version with arrow-ipc
flate2 = "1"
foreign_vec = "0.1.0"
fs_extra = "1.3.0"
futures = "0.3.24"
futures-async-stream = { version = "0.2.7" }
futures-util = "0.3.24"
Expand Down Expand Up @@ -408,6 +409,7 @@ prost = { version = "0.13" }
prost-build = { version = "0.13" }
prqlc = "0.11.3"
quanta = "0.11.1"
raft-log = { version = "0.2.1" }
rand = { version = "0.8.5", features = ["small_rng"] }
rayon = "1.9.0"
recursive = "0.1.1"
Expand Down Expand Up @@ -617,7 +619,7 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "3038c145" }
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.6" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.7" }
orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "dfb1ede" }
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/binaries/metactl/export_from_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn export_from_dir(args: &ExportArgs) -> anyhow::Result<()> {
eprintln!();
eprintln!("Export:");

let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
let sto_inn = StoreInner::open(&raft_config).await?;
let mut lines = Arc::new(sto_inn).export();

eprintln!(" From: {}", raft_config.raft_dir);
Expand Down
15 changes: 7 additions & 8 deletions src/meta/binaries/metactl/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ use databend_common_meta_raft_store::config::RaftConfig;
use databend_common_meta_raft_store::key_spaces::RaftStoreEntry;
use databend_common_meta_raft_store::key_spaces::SMEntry;
use databend_common_meta_raft_store::ondisk::DataVersion;
use databend_common_meta_raft_store::sm_v003::adapter::SnapshotUpgradeV002ToV003;
use databend_common_meta_raft_store::sm_v003::adapter::SnapshotUpgradeV002ToV004;
use databend_common_meta_raft_store::sm_v003::write_entry::WriteEntry;
use databend_common_meta_raft_store::sm_v003::SnapshotStoreV003;
use databend_common_meta_raft_store::state::RaftState;
use databend_common_meta_raft_store::state_machine::MetaSnapshotId;
use databend_common_meta_sled_store::get_sled_db;
use databend_common_meta_sled_store::init_sled_db;
Expand Down Expand Up @@ -108,6 +107,7 @@ async fn import_lines<B: BufRead + 'static>(
}
DataVersion::V002 => import_v002(raft_config, it).await?,
DataVersion::V003 => import_v003(raft_config, it).await?,
DataVersion::V004 => crate::import_v004::import_v004(raft_config, it).await?,
};

Ok(max_log_id)
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn import_v003(
let writer = snapshot_store.new_writer()?;
let (tx, join_handle) = writer.spawn_writer_thread("import_v003");

let mut converter = SnapshotUpgradeV002ToV003 {
let mut converter = SnapshotUpgradeV002ToV004 {
sys_data: sys_data.clone(),
};

Expand Down Expand Up @@ -305,10 +305,9 @@ async fn init_new_cluster(
eprintln!();
eprintln!("Initialize Cluster with: {:?}", nodes);

let db = get_sled_db();
let raft_config: RaftConfig = args.clone().into();

let mut sto = RaftStore::open_create(&raft_config, Some(()), None).await?;
let mut sto = RaftStore::open(&raft_config).await?;

let last_applied = {
let sm2 = sto.get_state_machine().await;
Expand Down Expand Up @@ -372,9 +371,9 @@ async fn init_new_cluster(
}
}

// Reset node id
let raft_state = RaftState::open_create(&db, &raft_config, Some(()), None).await?;
raft_state.set_node_id(args.id).await?;
// TODO: Reset node id
// let raft_state = RaftState::open_create(&db, &raft_config, Some(()), None).await?;
// raft_state.set_node_id(args.id).await?;

Ok(())
}
Expand Down
Loading

0 comments on commit 91aaac5

Please # to comment.