Skip to content

Commit

Permalink
refactor get_block_infos
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrieldemian committed Oct 23, 2023
1 parent 46e0abe commit 0eea607
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 204 deletions.
3 changes: 2 additions & 1 deletion crates/vincenzo/src/bitfield.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ mod tests {

#[test]
fn new_bitvec() {
let a = bitvec![u8, Msb0; 0; 0 as usize];
let a = bitvec![u8, Msb0; 0; 1];
// a.set(9, true);
println!("a {a:#?}");
println!("len {:#?}", a.len());

Expand Down
109 changes: 99 additions & 10 deletions crates/vincenzo/src/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ pub enum DiskMsg {
#[derive(Clone, Copy, Hash, PartialEq, Eq, Default, Debug)]
pub enum PieceOrder {
/// Random First, select random pieces to download
#[default]
Random,
/// Rarest First, select the rarest pieces to download,
/// and the most common to download at the end.
Rarest,
/// Sequential downloads, only used in streaming.
#[default]
Sequential,
}

Expand Down Expand Up @@ -155,8 +155,10 @@ impl Disk {
// now with the dirs created, we create the file
if let Some(file_ext) = last {
file_dir.push(file_ext);

self.open_file(file_dir).await?;
self.open_file(file_dir)
.await?
.set_len(file.length as u64)
.await?;
}
}
}
Expand Down Expand Up @@ -190,8 +192,6 @@ impl Disk {
}
};
}
debug!("pieces_blocks len {:?}", pieces_blocks.len());
debug!("pieces_blocks first {:?}", pieces_blocks.get(0));
self.pieces_blocks.insert(info_hash, pieces_blocks);
self.downloaded_pieces.insert(info_hash, 0);

Expand Down Expand Up @@ -253,6 +253,8 @@ impl Disk {
.filter(|v| v.info_hash == info_hash)
.collect();

debug!("calculating score of {:?} peers", peer_ctxs.len());

if peer_ctxs.is_empty() {
return Err(Error::NoPeers);
}
Expand Down Expand Up @@ -302,7 +304,7 @@ impl Disk {
debug!("pieces changed to rarest {pieces:?}");
let piece_order = self.piece_order.get_mut(&info_hash).unwrap();
if *piece_order == PieceOrder::Random {
*piece_order = PieceOrder::Rarest;
// *piece_order = PieceOrder::Rarest;
}

Ok(())
Expand Down Expand Up @@ -551,13 +553,11 @@ impl Disk {

let torrent_tx = torrent_ctx.tx.clone();

// the write operation is heavy, so we want to spawn a thread here,
// and only increment the downloaded count after writing,
// because otherwise the UI could say the torrent is complete
// while not every byte is written to disk.
// note: spawning a task to do this write was causing bugs on Windows.
// todo: fix this

fs_file.write_all(&block.block).await.unwrap();

let _ = torrent_tx
.send(TorrentMsg::IncrementDownloaded(len as u64))
.await;
Expand Down Expand Up @@ -1039,6 +1039,95 @@ mod tests {
tokio::fs::remove_dir_all(download_dir).await.unwrap();
}

#[tokio::test]
async fn write_out_of_order() {
let name = "arch";

let info = Info {
file_length: None,
name: name.to_owned(),
piece_length: 3,
pieces: vec![20; 0],
files: Some(vec![metainfo::File {
length: 3,
path: vec!["out.txt".to_owned()],
}]),
};

let magnet = format!("magnet:?xt=urn:btih:9999999999999999999999999999999999999999&dn={name}&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.bittor.pw%3A1337%2Fannounce&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=udp%3A%2F%2Fbt.xxx-tracker.com%3A2710%2Fannounce&tr=udp%3A%2F%2Fpublic.popcorn-tracker.org%3A6969%2Fannounce&tr=udp%3A%2F%2Feddie4.nl%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.torrent.eu.org%3A451%2Fannounce&tr=udp%3A%2F%2Fp4p.arenabg.com%3A1337%2Fannounce&tr=udp%3A%2F%2Ftracker.tiny-vps.com%3A6969%2Fannounce&tr=udp%3A%2F%2Fopen.stealth.si%3A80%2Fannounce");
let mut rng = rand::thread_rng();
let download_dir: String = (0..20).map(|_| rng.sample(Alphanumeric) as char).collect();

let (disk_tx, _) = mpsc::channel::<DiskMsg>(3);

let (_, rx) = mpsc::channel(5);
let mut disk = Disk::new(rx, download_dir.clone());

let (fr_tx, _) = mpsc::channel::<DaemonMsg>(300);
let magnet = Magnet::new(&magnet).unwrap();
let torrent = Torrent::new(disk_tx, fr_tx, magnet);
let mut info_t = torrent.ctx.info.write().await;
*info_t = info.clone();
drop(info_t);

disk.new_torrent(torrent.ctx.clone()).await.unwrap();

let info_hash = torrent.ctx.info_hash;

let mut p = torrent.ctx.bitfield.write().await;
*p = Bitfield::from_vec(vec![255]);
drop(info);
drop(p);

//
// WRITE
//

let block = Block {
index: 0,
begin: 1,
block: vec![1],
};
let result = disk.write_block(block, info_hash).await;
assert!(result.is_ok());
let block = Block {
index: 0,
begin: 0,
block: vec![2],
};
let result = disk.write_block(block, info_hash).await;
assert!(result.is_ok());
let block = Block {
index: 0,
begin: 2,
block: vec![3],
};
let result = disk.write_block(block.clone(), info_hash).await;
assert!(result.is_ok());

//
// READ
//
let block_info = BlockInfo {
index: 0,
begin: 0,
len: 2,
};
let mut d = disk
.open_file(format!("{download_dir}/arch/out.txt"))
.await
.unwrap();

let mut buf = Vec::new();
d.read_to_end(&mut buf).await.unwrap();

println!("buf {buf:?}");

// let result = disk.read_block(block_info, info_hash).await;
// assert_eq!(result.unwrap(), vec![0, 0, 3]);
tokio::fs::remove_dir_all(&download_dir).await.unwrap();
}

// if we can write, read blocks, and then validate the hash of the pieces
#[tokio::test]
async fn read_write_blocks_and_validate_pieces() {
Expand Down
99 changes: 50 additions & 49 deletions crates/vincenzo/src/metainfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,59 +62,54 @@ impl Info {
}
/// Get all block_infos of a torrent
/// Returns an Err if the Info is malformed, if it does not have `files` or `file_length`.
// pub fn get_block_infos_by_piece(
// &self,
// piece: u32,
// ) -> Result<VecDeque<BlockInfo>, error::Error> {
// let is_last_piece = self.pieces() >= piece;
// let piece_len = {
// if is_last_piece {
// let r = (self.get_size() % self.piece_length as u64) as u32;
// r.max(self.piece_length)
// } else {
// self.piece_length
// }
// };
//
// let mut infos: VecDeque<BlockInfo> = VecDeque::new();
// let mut cursor: u32 = 0;
//
// while cursor < piece_len {
// let len = 0;
// let info = BlockInfo {
// index: piece,
// begin: cursor,
// len,
// };
// infos.push_back(info);
// cursor += len;
// }
//
// Ok(infos)
// }
/// Get all block_infos of a torrent
/// Returns an Err if the Info is malformed, if it does not have `files` or `file_length`.
pub fn get_block_infos(&self) -> Result<VecDeque<BlockInfo>, error::Error> {
// multi file torrent
if let Some(files) = &self.files {
let mut infos: VecDeque<BlockInfo> = VecDeque::new();
for (_, file) in files.iter().enumerate() {
let back = infos.back();
let r = file.get_block_infos(self.piece_length, back);
infos.extend(r);
let mut block_infos = Vec::new();

let total_pieces = self.pieces();
let max_block_len = BLOCK_LEN;

let total_size = self.get_size() as u32;

let mut processed_bytes = 0;
let mut file_index = 0;
let mut offset_within_file = 0;

for piece_index in 0..total_pieces as u32 {
let mut piece_offset = 0;

while piece_offset < self.piece_length && processed_bytes < total_size {
let remaining_bytes_in_current_file =
if let Some(files) = &self.files {
files[file_index].length - offset_within_file
} else {
total_size - processed_bytes
};

let block_len = std::cmp::min(
std::cmp::min(remaining_bytes_in_current_file, self.piece_length - piece_offset),
max_block_len
);

block_infos.push(BlockInfo {
index: piece_index,
begin: piece_offset,
len: block_len,
});

processed_bytes += block_len;
piece_offset += block_len;
offset_within_file += block_len;

if let Some(files) = &self.files {
if offset_within_file == files[file_index].length {
file_index += 1;
offset_within_file = 0;
}
}
}
return Ok(infos);
}

// single file torrent
if let Some(length) = self.file_length {
let file = File {
length,
path: vec![self.name.to_owned()],
};
return Ok(file.get_block_infos(self.piece_length, None));
}
Err(error::Error::FileOpenError("".to_owned()))
Ok(block_infos.into())
}
/// Get the total size of the torrent, in bytes.
pub fn get_size(&self) -> u64 {
Expand Down Expand Up @@ -483,6 +478,7 @@ mod tests {
let info = Info {
file_length: Some(30),
piece_length: 15,
pieces: vec![0; 40],
..Default::default()
};
assert_eq!(
Expand Down Expand Up @@ -513,6 +509,7 @@ mod tests {
let info = Info {
file_length: Some(32868),
piece_length: BLOCK_LEN,
pieces: vec![0; 60],
..Default::default()
};
let blocks = info.get_block_infos().unwrap();
Expand Down Expand Up @@ -552,6 +549,7 @@ mod tests {
path: vec!["a.txt".to_owned()],
}]),
piece_length: BLOCK_LEN,
pieces: vec![0; 40],
..Default::default()
};
let blocks = info.get_block_infos().unwrap();
Expand Down Expand Up @@ -586,6 +584,7 @@ mod tests {
path: vec!["a.txt".to_owned()],
}]),
piece_length: 32668,
pieces: vec![0; 40],
..Default::default()
};
let blocks = info.get_block_infos().unwrap();
Expand Down Expand Up @@ -631,6 +630,7 @@ mod tests {
},
]),
piece_length: 32668, // -100 of block_len
pieces: vec![0; 40],
..Default::default()
};
let blocks = info.get_block_infos().unwrap();
Expand Down Expand Up @@ -681,6 +681,7 @@ mod tests {
},
]),
piece_length: 32668, // -100 of block_len
pieces: vec![0; 40],
..Default::default()
};
let blocks = info.get_block_infos().unwrap();
Expand Down
Loading

0 comments on commit 0eea607

Please # to comment.