Skip to content

Commit

Permalink
Modify send state machine for reliable resets
Browse files Browse the repository at this point in the history
Summary:
We've made the following changes to the send stream state machine:
* We allow the sending of multiple resets
* We only transition from `ResetSent` to `Closed` once we've successfully delivered all data until the `reliableSize` of some reset that has been ACKed by the peer. This transition can happen either when we get an ACK for a reset or an ACK for stream data.

Reviewed By: kvtsoy

Differential Revision: D66885041

fbshipit-source-id: f2f2617a56ea5713b77c9b48d7b1fa94de8e79db
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Dec 13, 2024
1 parent b4b4a14 commit 8242617
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 17 deletions.
48 changes: 32 additions & 16 deletions quic/state/stream/StreamSendHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ namespace quic {
* | Send Stream
* |
* v
* Send::Open ---------------+
* | |
* | Ack all bytes | Send RST
* | ti FIN |
* v v
* Send::Closed <------ ResetSent
* RST
* Acked
* Send::Open ------------------------------+
* | |
* | Ack all bytes |
* | till FIN | Send RST
* | |
* v v
* Send::Closed <---------------------- ResetSent
* RST ACKed and all bytes
* till smallest ACKed
* reliable reset offset
* ACKed.
*
*/

void sendStopSendingSMHandler(
Expand Down Expand Up @@ -125,7 +129,8 @@ void sendAckSMHandler(
QuicStreamState& stream,
const WriteStreamFrame& ackedFrame) {
switch (stream.sendState) {
case StreamSendState::Open: {
case StreamSendState::Open:
case StreamSendState::ResetSent: {
if (!ackedFrame.fromBufMeta) {
// Clean up the acked buffers from the retransmissionBuffer.
auto ackedBuffer = stream.retransmissionBuffer.find(ackedFrame.offset);
Expand Down Expand Up @@ -165,17 +170,22 @@ void sendAckSMHandler(
// This stream may be able to invoke some deliveryCallbacks:
stream.conn.streamManager->addDeliverable(stream.id);

// Check for whether or not we have ACKed all bytes until our FIN.
if (allBytesTillFinAcked(stream)) {
// Check for whether or not we have ACKed all bytes until our FIN or,
// in the case that we've sent a Reset, until the minimum reliable size of
// some reset acked by the peer.
bool allReliableDataDelivered =
(stream.minReliableSizeAcked &&
(*stream.minReliableSizeAcked == 0 ||
stream.allBytesAckedTill(*stream.minReliableSizeAcked - 1)));
if (allBytesTillFinAcked(stream) || allReliableDataDelivered) {
stream.sendState = StreamSendState::Closed;
if (stream.inTerminalStates()) {
stream.conn.streamManager->addClosed(stream.id);
}
}
break;
}
case StreamSendState::Closed:
case StreamSendState::ResetSent: {
case StreamSendState::Closed: {
DCHECK(stream.retransmissionBuffer.empty());
DCHECK(stream.pendingWrites.empty());
break;
Expand Down Expand Up @@ -205,9 +215,15 @@ void sendRstAckSMHandler(
stream.minReliableSizeAcked =
std::min(*stream.minReliableSizeAcked, reliableSize.value_or(0));
}
stream.sendState = StreamSendState::Closed;
if (stream.inTerminalStates()) {
stream.conn.streamManager->addClosed(stream.id);

if (*stream.minReliableSizeAcked == 0 ||
stream.allBytesAckedTill(*stream.minReliableSizeAcked - 1)) {
// We can only transition to Closed if we have successfully delivered
// all reliable data in some reset that was ACKed by the peer.
stream.sendState = StreamSendState::Closed;
if (stream.inTerminalStates()) {
stream.conn.streamManager->addClosed(stream.id);
}
}
break;
}
Expand Down
69 changes: 68 additions & 1 deletion quic/state/stream/test/StreamStateMachineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ TEST_F(QuicResetSentStateTest, ReliableRstAckNoReduction) {
stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
stream.updateAckedIntervals(0, 3, false);
sendRstAckSMHandler(stream, 5);

EXPECT_EQ(stream.sendState, StreamSendState::Closed);
Expand All @@ -346,6 +347,7 @@ TEST_F(QuicResetSentStateTest, ReliableRstAckReduction) {
stream.readBuffer.emplace_back(
folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false);
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
stream.updateAckedIntervals(0, 1, false);
sendRstAckSMHandler(stream, 1);

EXPECT_EQ(stream.sendState, StreamSendState::Closed);
Expand All @@ -370,7 +372,7 @@ TEST_F(QuicResetSentStateTest, ReliableRstAckFirstTime) {
RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0);
sendRstAckSMHandler(stream, 1);

EXPECT_EQ(stream.sendState, StreamSendState::Closed);
EXPECT_EQ(stream.sendState, StreamSendState::ResetSent);
EXPECT_FALSE(stream.finalReadOffset);
EXPECT_FALSE(stream.readBuffer.empty());
EXPECT_EQ(*stream.minReliableSizeAcked, 1);
Expand Down Expand Up @@ -399,6 +401,71 @@ TEST_F(QuicResetSentStateTest, RstAfterReliableRst) {
EXPECT_EQ(*stream.minReliableSizeAcked, 0);
}

// A reliable RESET has been ACKed, and all bytes till the reliable
// size have been ACKed.
TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition1) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.sendState = StreamSendState::ResetSent;
stream.updateAckedIntervals(0, 5, false);
sendRstAckSMHandler(stream, 5);
EXPECT_EQ(stream.sendState, StreamSendState::Closed);
}

// A reliable RESET has been ACKed, but not all bytes till the
// reliable size have been ACKed.
TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition2) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.sendState = StreamSendState::ResetSent;
stream.updateAckedIntervals(0, 4, false);
sendRstAckSMHandler(stream, 5);
EXPECT_EQ(stream.sendState, StreamSendState::ResetSent);
}

// A reliable RESET with a reliable size was ACKed previously, and
// now we're getting an ACK for stream data until that reliable size
TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition3) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.sendState = StreamSendState::ResetSent;
stream.minReliableSizeAcked = 7;
WriteStreamFrame streamFrame(id, 0, 7, false);
auto buf = folly::IOBuf::create(7);
buf->append(7);
stream.retransmissionBuffer.emplace(
std::piecewise_construct,
std::forward_as_tuple(0),
std::forward_as_tuple(std::make_unique<WriteStreamBuffer>(
ChainedByteRangeHead(buf), 0, false)));
sendAckSMHandler(stream, streamFrame);
EXPECT_EQ(stream.sendState, StreamSendState::Closed);
}

// A reliable RESET with a reliable size was ACKed previously, and
// now we're getting an ACK for stream data, but not until the
// reliable size
TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition4) {
auto conn = createConn();
StreamId id = 5;
QuicStreamState stream(id, *conn);
stream.sendState = StreamSendState::ResetSent;
stream.minReliableSizeAcked = 8;
WriteStreamFrame streamFrame(id, 0, 7, false);
auto buf = folly::IOBuf::create(7);
buf->append(7);
stream.retransmissionBuffer.emplace(
std::piecewise_construct,
std::forward_as_tuple(0),
std::forward_as_tuple(std::make_unique<WriteStreamBuffer>(
ChainedByteRangeHead(buf), 0, false)));
sendAckSMHandler(stream, streamFrame);
EXPECT_EQ(stream.sendState, StreamSendState::ResetSent);
}

class QuicClosedStateTest : public Test {};

TEST_F(QuicClosedStateTest, RstAck) {
Expand Down

0 comments on commit 8242617

Please # to comment.