Skip to content

Commit d8fb500

Browse files
RUST-2004 Benchmark client bulk write (#1293)
1 parent daeeff7 commit d8fb500

20 files changed

+440
-278
lines changed

.evergreen/benchmarks.yml

+1-2
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ tasks:
290290
commands:
291291
- func: "bootstrap mongo-orchestration"
292292
vars:
293-
MONGODB_VERSION: "v6.0-perf"
293+
MONGODB_VERSION: "v8.0-perf"
294294
# Note that drivers-evergreen-tools expects `SSL` as the environment
295295
# variable, not `TLS`, so we have to use that for the actual value used in the
296296
# script; we use `TLS` for the metadata that isn't used by the actual shell
@@ -364,4 +364,3 @@ buildvariants:
364364
display_name: "Compile"
365365
tasks:
366366
- "benchmark-compile"
367-

benchmarks/src/bench.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod bson_decode;
22
pub mod bson_encode;
3+
pub mod bulk_write;
34
pub mod find_many;
45
pub mod find_one;
56
pub mod gridfs_download;
@@ -59,17 +60,20 @@ pub static TARGET_ITERATION_COUNT: Lazy<usize> = Lazy::new(|| {
5960

6061
#[async_trait::async_trait]
6162
pub trait Benchmark: Sized {
63+
/// The options used to construct the benchmark.
6264
type Options;
65+
/// The state needed to perform the benchmark task.
66+
type TaskState: Default;
6367

6468
/// execute once before benchmarking
6569
async fn setup(options: Self::Options) -> Result<Self>;
6670

6771
/// execute at the beginning of every iteration
68-
async fn before_task(&mut self) -> Result<()> {
69-
Ok(())
72+
async fn before_task(&self) -> Result<Self::TaskState> {
73+
Ok(Default::default())
7074
}
7175

72-
async fn do_task(&self) -> Result<()>;
76+
async fn do_task(&self, state: Self::TaskState) -> Result<()>;
7377

7478
/// execute at the end of every iteration
7579
async fn after_task(&self) -> Result<()> {
@@ -108,7 +112,7 @@ fn finished(duration: Duration, iter: usize) -> bool {
108112
pub async fn run_benchmark<B: Benchmark + Send + Sync>(
109113
options: B::Options,
110114
) -> Result<Vec<Duration>> {
111-
let mut test = B::setup(options).await?;
115+
let test = B::setup(options).await?;
112116

113117
let mut test_durations = Vec::new();
114118

@@ -127,9 +131,9 @@ pub async fn run_benchmark<B: Benchmark + Send + Sync>(
127131
while !finished(benchmark_timer.elapsed(), iter) {
128132
progress_bar.inc(1);
129133

130-
test.before_task().await?;
134+
let state = test.before_task().await?;
131135
let timer = Instant::now();
132-
test.do_task().await?;
136+
test.do_task(state).await?;
133137
test_durations.push(timer.elapsed());
134138
test.after_task().await?;
135139

@@ -152,13 +156,13 @@ pub async fn drop_database(uri: &str, database: &str) -> Result<()> {
152156
.run_command(doc! { "hello": true })
153157
.await?;
154158

155-
client.database(&database).drop().await?;
159+
client.database(database).drop().await?;
156160

157161
// in sharded clusters, take additional steps to ensure database is dropped completely.
158162
// see: https://www.mongodb.com/docs/manual/reference/method/db.dropDatabase/#replica-set-and-sharded-clusters
159163
let is_sharded = hello.get_str("msg").ok() == Some("isdbgrid");
160164
if is_sharded {
161-
client.database(&database).drop().await?;
165+
client.database(database).drop().await?;
162166
for host in options.hosts {
163167
client
164168
.database("admin")

benchmarks/src/bench/bson_decode.rs

+8-20
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
use std::{convert::TryInto, path::PathBuf};
1+
use anyhow::Result;
2+
use mongodb::bson::Document;
23

3-
use anyhow::{bail, Result};
4-
use mongodb::bson::{Bson, Document};
5-
use serde_json::Value;
6-
7-
use crate::{bench::Benchmark, fs::read_to_string};
4+
use crate::bench::Benchmark;
85

96
pub struct BsonDecodeBenchmark {
107
num_iter: usize,
@@ -13,36 +10,27 @@ pub struct BsonDecodeBenchmark {
1310

1411
pub struct Options {
1512
pub num_iter: usize,
16-
pub path: PathBuf,
13+
pub doc: Document,
1714
}
1815

1916
#[async_trait::async_trait]
2017
impl Benchmark for BsonDecodeBenchmark {
2118
type Options = Options;
19+
type TaskState = ();
2220

2321
async fn setup(options: Self::Options) -> Result<Self> {
24-
let mut file = read_to_string(&options.path).await?;
25-
26-
let json: Value = serde_json::from_str(&mut file)?;
27-
let doc = match json.try_into()? {
28-
Bson::Document(doc) => doc,
29-
_ => bail!("invalid json test file"),
30-
};
31-
3222
let mut bytes: Vec<u8> = Vec::new();
33-
doc.to_writer(&mut bytes)?;
23+
options.doc.to_writer(&mut bytes)?;
3424

3525
Ok(BsonDecodeBenchmark {
3626
num_iter: options.num_iter,
3727
bytes,
3828
})
3929
}
4030

41-
async fn do_task(&self) -> Result<()> {
31+
async fn do_task(&self, _state: Self::TaskState) -> Result<()> {
4232
for _ in 0..self.num_iter {
43-
// `&[u8]` implements `Read`, and `from_reader` needs a `&mut R: Read`, so we need a
44-
// `&mut &[u8]`.
45-
let _doc = Document::from_reader(&mut &self.bytes[..])?;
33+
let _doc = Document::from_reader(&self.bytes[..])?;
4634
}
4735

4836
Ok(())

benchmarks/src/bench/bson_encode.rs

+7-17
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
use std::{convert::TryInto, path::PathBuf};
1+
use anyhow::Result;
2+
use mongodb::bson::Document;
23

3-
use anyhow::{bail, Result};
4-
use mongodb::bson::{Bson, Document};
5-
use serde_json::Value;
6-
7-
use crate::{bench::Benchmark, fs::read_to_string};
4+
use crate::bench::Benchmark;
85

96
pub struct BsonEncodeBenchmark {
107
num_iter: usize,
@@ -13,29 +10,22 @@ pub struct BsonEncodeBenchmark {
1310

1411
pub struct Options {
1512
pub num_iter: usize,
16-
pub path: PathBuf,
13+
pub doc: Document,
1714
}
1815

1916
#[async_trait::async_trait]
2017
impl Benchmark for BsonEncodeBenchmark {
2118
type Options = Options;
19+
type TaskState = ();
2220

2321
async fn setup(options: Self::Options) -> Result<Self> {
24-
let mut file = read_to_string(&options.path).await?;
25-
26-
let json: Value = serde_json::from_str(&mut file)?;
27-
let doc = match json.try_into()? {
28-
Bson::Document(doc) => doc,
29-
_ => bail!("invalid json test file"),
30-
};
31-
3222
Ok(BsonEncodeBenchmark {
3323
num_iter: options.num_iter,
34-
doc,
24+
doc: options.doc,
3525
})
3626
}
3727

38-
async fn do_task(&self) -> Result<()> {
28+
async fn do_task(&self, _state: Self::TaskState) -> Result<()> {
3929
for _ in 0..self.num_iter {
4030
let mut bytes: Vec<u8> = Vec::new();
4131
self.doc.to_writer(&mut bytes)?;

benchmarks/src/bench/bulk_write.rs

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use anyhow::Result;
2+
use mongodb::{
3+
bson::{doc, Document},
4+
options::{DeleteOneModel, InsertOneModel, ReplaceOneModel, WriteModel},
5+
Client,
6+
Namespace,
7+
};
8+
use once_cell::sync::Lazy;
9+
10+
use super::{drop_database, Benchmark, COLL_NAME, DATABASE_NAME};
11+
12+
pub struct InsertBulkWriteBenchmark {
13+
client: Client,
14+
uri: String,
15+
write_models: Vec<WriteModel>,
16+
}
17+
18+
pub struct Options {
19+
pub uri: String,
20+
pub doc: Document,
21+
pub num_models: usize,
22+
}
23+
24+
#[async_trait::async_trait]
25+
impl Benchmark for InsertBulkWriteBenchmark {
26+
type Options = Options;
27+
type TaskState = Vec<WriteModel>;
28+
29+
async fn setup(options: Self::Options) -> Result<Self> {
30+
let client = Client::with_uri_str(&options.uri).await?;
31+
drop_database(options.uri.as_str(), DATABASE_NAME.as_str()).await?;
32+
33+
let write_models = vec![
34+
WriteModel::InsertOne(
35+
InsertOneModel::builder()
36+
.namespace(Namespace::new(DATABASE_NAME.as_str(), COLL_NAME.as_str()))
37+
.document(options.doc.clone())
38+
.build()
39+
);
40+
options.num_models
41+
];
42+
43+
Ok(Self {
44+
client,
45+
uri: options.uri,
46+
write_models,
47+
})
48+
}
49+
50+
async fn before_task(&self) -> Result<Self::TaskState> {
51+
self.client
52+
.database(&DATABASE_NAME)
53+
.collection::<Document>(&COLL_NAME)
54+
.drop()
55+
.await?;
56+
self.client
57+
.database(&DATABASE_NAME)
58+
.create_collection(COLL_NAME.as_str())
59+
.await?;
60+
Ok(self.write_models.clone())
61+
}
62+
63+
async fn do_task(&self, write_models: Self::TaskState) -> Result<()> {
64+
self.client.bulk_write(write_models).await?;
65+
Ok(())
66+
}
67+
68+
async fn teardown(&self) -> Result<()> {
69+
drop_database(self.uri.as_str(), DATABASE_NAME.as_str()).await?;
70+
Ok(())
71+
}
72+
}
73+
74+
/// Names of the ten collections, `corpus_1` through `corpus_10`, built on first access.
static COLLECTION_NAMES: Lazy<Vec<String>> = Lazy::new(|| {
    (1..=10)
        .map(|suffix| format!("corpus_{}", suffix))
        .collect()
});
76+
77+
pub struct MixedBulkWriteBenchmark {
78+
client: Client,
79+
uri: String,
80+
write_models: Vec<WriteModel>,
81+
}
82+
83+
#[async_trait::async_trait]
84+
impl Benchmark for MixedBulkWriteBenchmark {
85+
type Options = Options;
86+
type TaskState = Vec<WriteModel>;
87+
88+
async fn setup(options: Self::Options) -> Result<Self> {
89+
let client = Client::with_uri_str(&options.uri).await?;
90+
drop_database(options.uri.as_str(), DATABASE_NAME.as_str()).await?;
91+
92+
let mut write_models = Vec::new();
93+
for i in 0..options.num_models {
94+
let collection_name = COLLECTION_NAMES.get(i % 10).unwrap();
95+
let namespace = Namespace::new(DATABASE_NAME.as_str(), collection_name);
96+
if i % 3 == 0 {
97+
write_models.push(
98+
InsertOneModel::builder()
99+
.namespace(namespace)
100+
.document(options.doc.clone())
101+
.build()
102+
.into(),
103+
);
104+
} else if i % 3 == 1 {
105+
write_models.push(
106+
ReplaceOneModel::builder()
107+
.namespace(namespace)
108+
.filter(doc! {})
109+
.replacement(options.doc.clone())
110+
.build()
111+
.into(),
112+
);
113+
} else {
114+
write_models.push(
115+
DeleteOneModel::builder()
116+
.namespace(namespace)
117+
.filter(doc! {})
118+
.build()
119+
.into(),
120+
);
121+
}
122+
}
123+
124+
Ok(Self {
125+
client,
126+
uri: options.uri,
127+
write_models,
128+
})
129+
}
130+
131+
async fn before_task(&self) -> Result<Self::TaskState> {
132+
let database = self.client.database(&DATABASE_NAME);
133+
database.drop().await?;
134+
for collection_name in COLLECTION_NAMES.iter() {
135+
database.create_collection(collection_name).await?;
136+
}
137+
Ok(self.write_models.clone())
138+
}
139+
140+
async fn do_task(&self, write_models: Self::TaskState) -> Result<()> {
141+
self.client.bulk_write(write_models).await?;
142+
Ok(())
143+
}
144+
145+
async fn teardown(&self) -> Result<()> {
146+
drop_database(self.uri.as_str(), DATABASE_NAME.as_str()).await?;
147+
Ok(())
148+
}
149+
}

0 commit comments

Comments
 (0)