From 3151b338bf0c13c43b026940a910a408af51cd19 Mon Sep 17 00:00:00 2001 From: care0717 Date: Sun, 13 Jun 2021 20:15:58 +0900 Subject: [PATCH] feat: recover from wal --- src/bin/server.rs | 68 +++++++++++++++++++++++++++++++++++++++-------- src/executor.rs | 37 ++++++++++++++------------ 2 files changed, 77 insertions(+), 28 deletions(-) diff --git a/src/bin/server.rs b/src/bin/server.rs index 343b60a..3cf6812 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -2,18 +2,27 @@ use lsm_engine::avl::AvlTreeMap; use lsm_engine::decoder; use lsm_engine::executor::Executor; use lsm_engine::memtable::Memtable; +use std::convert::TryInto; +use std::error::Error; use std::fs::{File, OpenOptions}; -use std::io::{stdout, BufWriter, Error, ErrorKind, Write}; +use std::io::{stdout, BufWriter, ErrorKind, Read, Write}; +use std::mem::size_of; use std::net::{TcpListener, TcpStream}; +use std::path::Path; +use std::result::Result::Ok; use std::sync::{Arc, RwLock}; -use std::thread; +use std::{fs, thread}; fn main() { let listener = TcpListener::bind("0.0.0.0:33333").expect("Error. failed to bind."); + let wal_path = Path::new("data/wal/wal.bin"); + let map = recover(wal_path).unwrap(); + let memtable: Arc>>> = - Arc::new(RwLock::new(Box::new(AvlTreeMap::new()))); + Arc::new(RwLock::new(Box::new(map))); let wal = Arc::new(RwLock::new( OpenOptions::new() + .create(true) .append(true) .open("data/wal/wal.bin") .unwrap(), @@ -38,7 +47,7 @@ fn handler( stream: TcpStream, memtable: Arc>>>, wal: Arc>, -) -> Result<(), Error> { +) -> Result<(), Box> { println!("Connection from {}", stream.peer_addr()?); let mut decoder = decoder::new(&stream); let mut writer = BufWriter::new(&stream); @@ -46,16 +55,22 @@ fn handler( loop { let decoded = decoder.decode(); match decoded { - Ok(c) => { - let result = executor.execute(c); - print!("{}\n", result); - writer.write(format!("{}\n", result).as_bytes())?; - writer.flush()?; - } + Ok(c) => match executor.execute(c) { + Ok(result) => { + print!("{}\n", result); + writer.write(format!("{}\n", result).as_bytes())?; + writer.flush()?; + } + Err(e) => { + print!("{}\n", e); + writer.write(format!("{}\n", e).as_bytes())?; + writer.flush()?; + } + }, Err(e) => { print!("{}", e); if e.kind() == ErrorKind::UnexpectedEof { - return Err(e); + return Err(Box::new(e)); } writer.write(format!("{}\n", e.to_string()).as_bytes())?; writer.flush()?; @@ -64,3 +79,34 @@ fn handler( stdout().flush()?; } } + +fn recover(path: &Path) -> Result, Box> { + let mut map: AvlTreeMap = AvlTreeMap::new(); + if let Ok(mut wal) = File::open(path) { + let mut buffer = Vec::new(); + wal.read_to_end(&mut buffer)?; + let mut index = buffer.len(); + while index > 0 { + index -= size_of::(); + let key_len = + i16::from_le_bytes(buffer[index..(index + size_of::())].try_into()?) as usize; + index -= key_len; + let key = String::from_utf8(buffer[index..(index + key_len)].to_vec())?; + index -= size_of::(); + let value_len = + i32::from_le_bytes(buffer[index..(index + size_of::())].try_into()?) as usize; + index -= value_len; + let value = String::from_utf8(buffer[index..(index + value_len)].to_vec())?; + if map.search(&key).is_none() { + map.insert(key, value); + } + } + } else { + if let Some(parent) = path.parent() { + if !parent.exists() { + fs::create_dir_all(parent)?; + } + } + } + return Ok(map); +} diff --git a/src/executor.rs b/src/executor.rs index 9e8d49c..a55d80b 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,5 +1,6 @@ use crate::command::Command; use crate::memtable::Memtable; +use std::error::Error; use std::fs::File; use std::io::Write; use std::sync::{Arc, RwLock}; @@ -15,29 +16,31 @@ impl Executor { ) -> Self { Self { memtable, wal } } - pub fn execute(&mut self, command: Command) -> String { + pub fn execute(&mut self, command: Command) -> Result> { match command { Command::Set { key, body } => { - let mut file = self.wal.write().unwrap(); - file.write(body.as_bytes()); + let mut file = self.wal.write()?; + file.write(body.as_bytes())?; let body_len = body.len() as i32; - file.write(&body_len.to_le_bytes()); - file.write(key.as_bytes()); + file.write(&body_len.to_le_bytes())?; + file.write(key.as_bytes())?; let key_len = key.len() as i16; - file.write(&key_len.to_le_bytes()); - self.memtable.write().unwrap().insert(key, body); - "STORED".to_string() + file.write(&key_len.to_le_bytes())?; + self.memtable.write()?.insert(key, body); + Ok("STORED".to_string()) + } + Command::Get { key } => { + let value = self + .memtable + .read()? + .search(&key) + .unwrap_or(&"".to_string()) + .to_string(); + Ok(format!("VALUE {} {}", key, value)) } - Command::Get { key } => self - .memtable - .read() - .unwrap() - .search(&key) - .unwrap_or(&"".to_string()) - .to_string(), Command::Delete { key } => { - self.memtable.write().unwrap().delete(&key); - "DELETED".to_string() + self.memtable.write()?.delete(&key); + Ok("DELETED".to_string()) } } }