diff --git a/benches/ops/ops.rs b/benches/ops/ops.rs index a7c690b6e03c..dad82b8e679e 100644 --- a/benches/ops/ops.rs +++ b/benches/ops/ops.rs @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::SeekFrom; - use criterion::BenchmarkId; use criterion::Criterion; +use futures::future::join_all; use futures::io; use futures::io::BufReader; -use futures::AsyncSeekExt; use opendal::Operator; use opendal_test::services::fs; use opendal_test::services::s3; @@ -71,13 +69,15 @@ pub fn bench(c: &mut Criterion) { .iter(|| bench_buf_read(input.0.clone(), input.1)) }, ); + + group.throughput(criterion::Throughput::Bytes((TOTAL_SIZE / 2) as u64)); group.bench_with_input( - BenchmarkId::new("seek_read", &path), + BenchmarkId::new("range_read", &path), &(op.clone(), &path), |b, input| { - let pos = rng.gen_range(0..TOTAL_SIZE - BATCH_SIZE) as u64; + let pos = rng.gen_range(0..(TOTAL_SIZE / 2) as u64) as u64; b.to_async(&runtime) - .iter(|| bench_seek_read(input.0.clone(), input.1, pos)) + .iter(|| bench_range_read(input.0.clone(), input.1, pos)) }, ); group.throughput(criterion::Throughput::Bytes((TOTAL_SIZE / 2) as u64)); @@ -99,6 +99,40 @@ pub fn bench(c: &mut Criterion) { .iter(|| bench_write(input.0.clone(), input.1, input.2.clone())) }, ); + + // runtime + const RUNTIME_THREAD: usize = 4; + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.enable_all().worker_threads(RUNTIME_THREAD); + + let runtime = builder.build().unwrap(); + for parallel in [2, 4, 6, 8, 10, 12, 16] { + group.throughput(criterion::Throughput::Bytes( + parallel as u64 * TOTAL_SIZE as u64 / 2, + )); + group.bench_with_input( + BenchmarkId::new(&format!("parallel_read_{}", parallel), &path), + &(op.clone(), &path, content.clone()), + |b, input| { + let pos = rng.gen_range(0..(TOTAL_SIZE / 2) as u64) as u64; + b.to_async(&runtime).iter(|| { + let futures = (0..parallel) + .map(|_| async { + bench_range_read(input.0.clone(), input.1, pos).await; + let mut d = 0; + // mock same little cpu work + for c in pos..pos + 100u64 { + d += c & 0x1f1f1f1f + c % 256; + } + let _ = d; + }) + .collect::>(); + join_all(futures) + }) + }, + ); + } + group.finish(); } } @@ -115,10 +149,8 @@ pub async fn bench_read(op: Operator, path: &str) { io::copy(&mut r, &mut io::sink()).await.unwrap(); } -pub async fn bench_seek_read(op: Operator, path: &str, pos: u64) { - let mut r = op.object(path).limited_reader(TOTAL_SIZE as u64); - r.seek(SeekFrom::Start(pos)).await.expect("seek"); - r.seek(SeekFrom::Start(0)).await.expect("seek"); +pub async fn bench_range_read(op: Operator, path: &str, pos: u64) { + let mut r = op.object(path).range_reader(pos, (TOTAL_SIZE / 2) as u64); io::copy(&mut r, &mut io::sink()).await.unwrap(); }