Skip to content

Commit

Permalink
feat: write wal when set
Browse files Browse the repository at this point in the history
  • Loading branch information
care0717 committed Jun 13, 2021
1 parent b3169f9 commit bf967c8
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
data
8 changes: 6 additions & 2 deletions src/avl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ where
current_tree: &'a AvlTree<T, U>,
}

impl<'a, T: 'a + Ord + Clone + Sync + Send, U: Clone + Sync + Send> Iterator for AvlTreeSetIter<'a, T, U> {
impl<'a, T: 'a + Ord + Clone + Sync + Send, U: Clone + Sync + Send> Iterator
for AvlTreeSetIter<'a, T, U>
{
type Item = (&'a T, &'a U);
fn next(&mut self) -> Option<Self::Item> {
loop {
Expand Down Expand Up @@ -84,7 +86,9 @@ impl<'a, T: 'a + Ord + Clone + Sync + Send, U: Clone + Sync + Send> Iterator for
}
}
}
impl<T: Ord + Clone + Sync + Send, U: Clone + Sync + Send> FromIterator<(T, U)> for AvlTreeMap<T, U> {
impl<T: Ord + Clone + Sync + Send, U: Clone + Sync + Send> FromIterator<(T, U)>
for AvlTreeMap<T, U>
{
fn from_iter<I: IntoIterator<Item = (T, U)>>(iter: I) -> Self {
let mut set = Self::new();

Expand Down
27 changes: 20 additions & 7 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,48 @@
use lsm_engine::avl::AvlTreeMap;
use lsm_engine::decoder;
use lsm_engine::executor::Executor;
use lsm_engine::memtable::Memtable;
use std::fs::{File, OpenOptions};
use std::io::{stdout, BufWriter, Error, ErrorKind, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::sync::{Arc, RwLock};
use lsm_engine::memtable::Memtable;
use lsm_engine::avl::AvlTreeMap;
use std::thread;

fn main() {
let listener = TcpListener::bind("0.0.0.0:33333").expect("Error. failed to bind.");
let mut memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>> = Arc::new(RwLock::new(Box::new(AvlTreeMap::new())));
let memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>> =
Arc::new(RwLock::new(Box::new(AvlTreeMap::new())));
let wal = Arc::new(RwLock::new(
OpenOptions::new()
.append(true)
.open("data/wal/wal.bin")
.unwrap(),
));
for streams in listener.incoming() {
match streams {
Err(e) => {
eprintln!("error: {}", e)
}
Ok(stream) => {
let memtable = memtable.clone();
let wal = wal.clone();
thread::spawn(move || {
handler(stream, memtable).unwrap_or_else(|error| eprintln!("{:?}", error));
handler(stream, memtable, wal).unwrap_or_else(|error| eprintln!("{:?}", error));
});
}
}
}
}

fn handler(stream: TcpStream, memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>>) -> Result<(), Error> {
fn handler(
stream: TcpStream,
memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>>,
wal: Arc<RwLock<File>>,
) -> Result<(), Error> {
println!("Connection from {}", stream.peer_addr()?);
let mut decoder = decoder::new(&stream);
let mut writer = BufWriter::new(&stream);
let mut executor = Executor::new(memtable);
let mut executor = Executor::new(memtable, wal);
loop {
let decoded = decoder.decode();
match decoded {
Expand Down
5 changes: 4 additions & 1 deletion src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ impl<R: Read> Decoder<R> {
if nbytes == 0 {
return Err(Error::new(ErrorKind::UnexpectedEof, "got eof\n"));
}
Ok(command::new_command_set(_key.to_string(), buf))
Ok(command::new_command_set(
_key.to_string(),
buf.trim().parse().unwrap(),
))
}

fn decode_get(&mut self, commands: Vec<&str>) -> Result<command::Command, Error> {
Expand Down
35 changes: 23 additions & 12 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,40 @@
use crate::command::Command;
use crate::memtable::Memtable;
use std::sync::{RwLock, Arc};
use std::fs::File;
use std::io::Write;
use std::sync::{Arc, RwLock};

pub struct Executor {
memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>>,
wal: Arc<RwLock<File>>,
}
impl Executor {
pub fn new(memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>>) -> Self {
Self {
memtable,
}
pub fn new(
memtable: Arc<RwLock<Box<dyn Memtable<String, String>>>>,
wal: Arc<RwLock<File>>,
) -> Self {
Self { memtable, wal }
}
pub fn execute(&mut self, command: Command) -> String {
match command {
Command::Set { key, body } => {
let mut file = self.wal.write().unwrap();
file.write(body.as_bytes());
let body_len = body.len() as i32;
file.write(&body_len.to_le_bytes());
file.write(key.as_bytes());
let key_len = key.len() as i16;
file.write(&key_len.to_le_bytes());
self.memtable.write().unwrap().insert(key, body);
"STORED".to_string()
}
Command::Get { key } => {
self
.memtable.read().unwrap()
.search(&key)
.unwrap_or(&"".to_string())
.to_string()
},
Command::Get { key } => self
.memtable
.read()
.unwrap()
.search(&key)
.unwrap_or(&"".to_string())
.to_string(),
Command::Delete { key } => {
self.memtable.write().unwrap().delete(&key);
"DELETED".to_string()
Expand Down

0 comments on commit bf967c8

Please # to comment.