Skip to content

Commit

Permalink
feat: recover from wal
Browse files Browse the repository at this point in the history
  • Loading branch information
care0717 committed Jun 13, 2021
1 parent bf967c8 commit 3151b33
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 28 deletions.
68 changes: 57 additions & 11 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@ use lsm_engine::avl::AvlTreeMap;
use lsm_engine::decoder;
use lsm_engine::executor::Executor;
use lsm_engine::memtable::Memtable;
use std::convert::TryInto;
use std::error::Error;
use std::fs::{File, OpenOptions};
use std::io::{stdout, BufWriter, Error, ErrorKind, Write};
use std::io::{stdout, BufWriter, ErrorKind, Read, Write};
use std::mem::size_of;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::result::Result::Ok;
use std::sync::{Arc, RwLock};
use std::thread;
use std::{fs, thread};

fn main() {
let listener = TcpListener::bind("0.0.0.0:33333").expect("Error. failed to bind.");
let wal_path = Path::new("data/wal/wal.bin");
let map = recover(wal_path).unwrap();

let memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>> =
Arc::new(RwLock::new(Box::new(AvlTreeMap::new())));
Arc::new(RwLock::new(Box::new(map)));
let wal = Arc::new(RwLock::new(
OpenOptions::new()
.create(true)
.append(true)
.open("data/wal/wal.bin")
.unwrap(),
Expand All @@ -38,24 +47,30 @@ fn handler(
stream: TcpStream,
memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>>,
wal: Arc<RwLock<File>>,
) -> Result<(), Error> {
) -> Result<(), Box<dyn Error>> {
println!("Connection from {}", stream.peer_addr()?);
let mut decoder = decoder::new(&stream);
let mut writer = BufWriter::new(&stream);
let mut executor = Executor::new(memtable, wal);
loop {
let decoded = decoder.decode();
match decoded {
Ok(c) => {
let result = executor.execute(c);
print!("{}\n", result);
writer.write(format!("{}\n", result).as_bytes())?;
writer.flush()?;
}
Ok(c) => match executor.execute(c) {
Ok(result) => {
print!("{}\n", result);
writer.write(format!("{}\n", result).as_bytes())?;
writer.flush()?;
}
Err(e) => {
print!("{}\n", e);
writer.write(format!("{}\n", e).as_bytes())?;
writer.flush()?;
}
},
Err(e) => {
print!("{}", e);
if e.kind() == ErrorKind::UnexpectedEof {
return Err(e);
return Err(Box::new(e));
}
writer.write(format!("{}\n", e.to_string()).as_bytes())?;
writer.flush()?;
Expand All @@ -64,3 +79,34 @@ fn handler(
stdout().flush()?;
}
}

fn recover(path: &Path) -> Result<AvlTreeMap<String, String>, Box<dyn Error>> {
let mut map: AvlTreeMap<String, String> = AvlTreeMap::new();
if let Ok(mut wal) = File::open(path) {
let mut buffer = Vec::new();
wal.read_to_end(&mut buffer)?;
let mut index = buffer.len();
while index > 0 {
index -= size_of::<i16>();
let key_len =
i16::from_le_bytes(buffer[index..(index + size_of::<i16>())].try_into()?) as usize;
index -= key_len;
let key = String::from_utf8(buffer[index..(index + key_len)].to_vec())?;
index -= size_of::<i32>();
let value_len =
i32::from_le_bytes(buffer[index..(index + size_of::<i32>())].try_into()?) as usize;
index -= value_len;
let value = String::from_utf8(buffer[index..(index + value_len)].to_vec())?;
if map.search(&key).is_none() {
map.insert(key, value);
}
}
} else {
if let Some(parent) = path.parent() {
if !parent.exists() {
fs::create_dir_all(parent)?;
}
}
}
return Ok(map);
}
37 changes: 20 additions & 17 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::command::Command;
use crate::memtable::Memtable;
use std::error::Error;
use std::fs::File;
use std::io::Write;
use std::sync::{Arc, RwLock};
Expand All @@ -15,29 +16,31 @@ impl Executor {
) -> Self {
Self { memtable, wal }
}
pub fn execute(&mut self, command: Command) -> String {
pub fn execute(&mut self, command: Command) -> Result<String, Box<dyn Error + '_>> {
match command {
Command::Set { key, body } => {
let mut file = self.wal.write().unwrap();
file.write(body.as_bytes());
let mut file = self.wal.write()?;
file.write(body.as_bytes())?;
let body_len = body.len() as i32;
file.write(&body_len.to_le_bytes());
file.write(key.as_bytes());
file.write(&body_len.to_le_bytes())?;
file.write(key.as_bytes())?;
let key_len = key.len() as i16;
file.write(&key_len.to_le_bytes());
self.memtable.write().unwrap().insert(key, body);
"STORED".to_string()
file.write(&key_len.to_le_bytes())?;
self.memtable.write()?.insert(key, body);
Ok("STORED".to_string())
}
Command::Get { key } => {
let value = self
.memtable
.read()?
.search(&key)
.unwrap_or(&"".to_string())
.to_string();
Ok(format!("VALUE {} {}", key, value))
}
Command::Get { key } => self
.memtable
.read()
.unwrap()
.search(&key)
.unwrap_or(&"".to_string())
.to_string(),
Command::Delete { key } => {
self.memtable.write().unwrap().delete(&key);
"DELETED".to_string()
self.memtable.write()?.delete(&key);
Ok("DELETED".to_string())
}
}
}
Expand Down

0 comments on commit 3151b33

Please # to comment.