Skip to content

Commit

Permalink
New (unfinished) test to prove out multiframe bug
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-casperlabs committed Feb 28, 2024
1 parent 3b75b2a commit c282665
Showing 1 changed file with 98 additions and 0 deletions.
98 changes: 98 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1515,4 +1515,102 @@ mod tests {

info!("all joined");
}

#[tokio::test]
async fn send_two_large_requests() {
const NUM_REQUESTS: u16 = 20;
const PAYLOAD_SIZE: usize = 10_000;

tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.ok();

info!("starting send-two-large test");

let channel_cfg = ChannelConfiguration::new()
.with_max_request_payload_size(PAYLOAD_SIZE as u32 * 2)
.with_max_response_payload_size(0)
.with_request_limit(NUM_REQUESTS / 2);

let protocol_builder =
ProtocolBuilder::with_default_channel_config(channel_cfg).max_frame_size(1024);

let rpc_builder: RpcBuilder<1> = RpcBuilder::new(IoCoreBuilder::with_default_buffer_size(
protocol_builder,
NUM_REQUESTS as usize * 2,
))
.with_bubble_timeouts(true)
.with_default_timeout(Duration::from_secs(5));

let pipe_buffer = (PAYLOAD_SIZE + 16) * NUM_REQUESTS as usize + 1024;

let (alice_stream, bob_stream) = tokio::io::duplex(pipe_buffer);

let mut alice = CompleteSetup::new(&rpc_builder, alice_stream);
let mut bob = CompleteSetup::new(&rpc_builder, bob_stream);

let alice_join_handle = tokio::spawn(async move {
while let Some(incoming_request) = alice
.server
.next_request()
.await
.expect("alice should never error")
{
eprintln!("alice received: {}", incoming_request);
panic!("did not expect alice to receive anything");
}

eprintln!("alice quit quietly");
});

// Preload alice's queue with requests.
let data: Vec<u8> = iter::repeat(0xFF).take(PAYLOAD_SIZE).collect();
let payload = Bytes::from(data);

let mut guards = Vec::new();

for idx in 0..NUM_REQUESTS {
let guard = alice
.client
.create_request(ChannelId::new(0))
.with_payload(payload.clone())
.try_queue_for_sending()
.expect("should never fail to queue, did you make the memory buffer too small?");
guards.push(guard);
eprintln!("pushed {}", idx);
}

let bob_join_handle = tokio::spawn(async move {
while let Some(incoming_request) = bob
.server
.next_request()
.await
.expect("bob should never error")
{
eprintln!("bob received: {}", incoming_request);
incoming_request.respond(None);
}

eprintln!("bob quit quietly");
});

// Both background tasks are running, wait for requests to finish.
for (idx, guard) in guards.into_iter().enumerate() {
guard
.wait_for_response()
.await
.expect("failed to receive response");
eprintln!("guard {} done", idx);
}

// Join both server tasks to ensure there were no panics.
alice_join_handle.await.expect("alice server panicked");
bob_join_handle.await.expect("bob server panicked");

// Drop both clients, resulting in a server shutdown.
eprintln!("dropping clients");
drop(alice.client);
drop(bob.client);
}
}

0 comments on commit c282665

Please # to comment.