From 8242617c546bfb761390c495dccdc4dfe78ddeb9 Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Fri, 13 Dec 2024 15:54:21 -0800 Subject: [PATCH] Modify send state machine for reliable resets Summary: We've made the following changes to the send stream state machine: * We allow the sending of multiple resets * We only transition from `ResetSent` to `Closed` once we've successfully delivered all data until the `reliableSize` of some reset that has been ACKed by the peer. This transition can happen either when we get an ACK for a reset or an ACK for stream data. Reviewed By: kvtsoy Differential Revision: D66885041 fbshipit-source-id: f2f2617a56ea5713b77c9b48d7b1fa94de8e79db --- quic/state/stream/StreamSendHandlers.cpp | 48 ++++++++----- .../stream/test/StreamStateMachineTest.cpp | 69 ++++++++++++++++++- 2 files changed, 100 insertions(+), 17 deletions(-) diff --git a/quic/state/stream/StreamSendHandlers.cpp b/quic/state/stream/StreamSendHandlers.cpp index 16b8b8cdd..1c0ee602e 100644 --- a/quic/state/stream/StreamSendHandlers.cpp +++ b/quic/state/stream/StreamSendHandlers.cpp @@ -28,14 +28,18 @@ namespace quic { * | Send Stream * | * v - * Send::Open ---------------+ - * | | - * | Ack all bytes | Send RST - * | ti FIN | - * v v - * Send::Closed <------ ResetSent - * RST - * Acked + * Send::Open ------------------------------+ + * | | + * | Ack all bytes | + * | till FIN | Send RST + * | | + * v v + * Send::Closed <---------------------- ResetSent + * RST ACKed and all bytes + * till smallest ACKed + * reliable reset offset + * ACKed. + * */ void sendStopSendingSMHandler( @@ -125,7 +129,8 @@ void sendAckSMHandler( QuicStreamState& stream, const WriteStreamFrame& ackedFrame) { switch (stream.sendState) { - case StreamSendState::Open: { + case StreamSendState::Open: + case StreamSendState::ResetSent: { if (!ackedFrame.fromBufMeta) { // Clean up the acked buffers from the retransmissionBuffer. auto ackedBuffer = stream.retransmissionBuffer.find(ackedFrame.offset); @@ -165,8 +170,14 @@ void sendAckSMHandler( // This stream may be able to invoke some deliveryCallbacks: stream.conn.streamManager->addDeliverable(stream.id); - // Check for whether or not we have ACKed all bytes until our FIN. - if (allBytesTillFinAcked(stream)) { + // Check for whether or not we have ACKed all bytes until our FIN or, + // in the case that we've sent a Reset, until the minimum reliable size of + // some reset acked by the peer. + bool allReliableDataDelivered = + (stream.minReliableSizeAcked && + (*stream.minReliableSizeAcked == 0 || + stream.allBytesAckedTill(*stream.minReliableSizeAcked - 1))); + if (allBytesTillFinAcked(stream) || allReliableDataDelivered) { stream.sendState = StreamSendState::Closed; if (stream.inTerminalStates()) { stream.conn.streamManager->addClosed(stream.id); @@ -174,8 +185,7 @@ void sendAckSMHandler( } break; } - case StreamSendState::Closed: - case StreamSendState::ResetSent: { + case StreamSendState::Closed: { DCHECK(stream.retransmissionBuffer.empty()); DCHECK(stream.pendingWrites.empty()); break; @@ -205,9 +215,15 @@ void sendRstAckSMHandler( stream.minReliableSizeAcked = std::min(*stream.minReliableSizeAcked, reliableSize.value_or(0)); } - stream.sendState = StreamSendState::Closed; - if (stream.inTerminalStates()) { - stream.conn.streamManager->addClosed(stream.id); + + if (*stream.minReliableSizeAcked == 0 || + stream.allBytesAckedTill(*stream.minReliableSizeAcked - 1)) { + // We can only transition to Closed if we have successfully delivered + // all reliable data in some reset that was ACKed by the peer. + stream.sendState = StreamSendState::Closed; + if (stream.inTerminalStates()) { + stream.conn.streamManager->addClosed(stream.id); + } } break; } diff --git a/quic/state/stream/test/StreamStateMachineTest.cpp b/quic/state/stream/test/StreamStateMachineTest.cpp index d155e29dd..2c360ec4d 100644 --- a/quic/state/stream/test/StreamStateMachineTest.cpp +++ b/quic/state/stream/test/StreamStateMachineTest.cpp @@ -323,6 +323,7 @@ TEST_F(QuicResetSentStateTest, ReliableRstAckNoReduction) { stream.readBuffer.emplace_back( folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); + stream.updateAckedIntervals(0, 3, false); sendRstAckSMHandler(stream, 5); EXPECT_EQ(stream.sendState, StreamSendState::Closed); @@ -346,6 +347,7 @@ TEST_F(QuicResetSentStateTest, ReliableRstAckReduction) { stream.readBuffer.emplace_back( folly::IOBuf::copyBuffer("One more thing"), 0xABCD, false); RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); + stream.updateAckedIntervals(0, 1, false); sendRstAckSMHandler(stream, 1); EXPECT_EQ(stream.sendState, StreamSendState::Closed); @@ -370,7 +372,7 @@ TEST_F(QuicResetSentStateTest, ReliableRstAckFirstTime) { RstStreamFrame frame(id, GenericApplicationErrorCode::UNKNOWN, 0); sendRstAckSMHandler(stream, 1); - EXPECT_EQ(stream.sendState, StreamSendState::Closed); + EXPECT_EQ(stream.sendState, StreamSendState::ResetSent); EXPECT_FALSE(stream.finalReadOffset); EXPECT_FALSE(stream.readBuffer.empty()); EXPECT_EQ(*stream.minReliableSizeAcked, 1); @@ -399,6 +401,71 @@ TEST_F(QuicResetSentStateTest, RstAfterReliableRst) { EXPECT_EQ(*stream.minReliableSizeAcked, 0); } +// A reliable RESET has been ACKed, and all bytes till the reliable +// size have been ACKed. +TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition1) { + auto conn = createConn(); + StreamId id = 5; + QuicStreamState stream(id, *conn); + stream.sendState = StreamSendState::ResetSent; + stream.updateAckedIntervals(0, 5, false); + sendRstAckSMHandler(stream, 5); + EXPECT_EQ(stream.sendState, StreamSendState::Closed); +} + +// A reliable RESET has been ACKed, but not all bytes till the +// reliable size have been ACKed. +TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition2) { + auto conn = createConn(); + StreamId id = 5; + QuicStreamState stream(id, *conn); + stream.sendState = StreamSendState::ResetSent; + stream.updateAckedIntervals(0, 4, false); + sendRstAckSMHandler(stream, 5); + EXPECT_EQ(stream.sendState, StreamSendState::ResetSent); +} + +// A reliable RESET with a reliable size was ACKed previously, and +// now we're getting an ACK for stream data until that reliable size +TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition3) { + auto conn = createConn(); + StreamId id = 5; + QuicStreamState stream(id, *conn); + stream.sendState = StreamSendState::ResetSent; + stream.minReliableSizeAcked = 7; + WriteStreamFrame streamFrame(id, 0, 7, false); + auto buf = folly::IOBuf::create(7); + buf->append(7); + stream.retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf), 0, false))); + sendAckSMHandler(stream, streamFrame); + EXPECT_EQ(stream.sendState, StreamSendState::Closed); +} + +// A reliable RESET with a reliable size was ACKed previously, and +// now we're getting an ACK for stream data, but not until the +// reliable size +TEST_F(QuicResetSentStateTest, ResetSentToClosedTransition4) { + auto conn = createConn(); + StreamId id = 5; + QuicStreamState stream(id, *conn); + stream.sendState = StreamSendState::ResetSent; + stream.minReliableSizeAcked = 8; + WriteStreamFrame streamFrame(id, 0, 7, false); + auto buf = folly::IOBuf::create(7); + buf->append(7); + stream.retransmissionBuffer.emplace( + std::piecewise_construct, + std::forward_as_tuple(0), + std::forward_as_tuple(std::make_unique( + ChainedByteRangeHead(buf), 0, false))); + sendAckSMHandler(stream, streamFrame); + EXPECT_EQ(stream.sendState, StreamSendState::ResetSent); +} + class QuicClosedStateTest : public Test {}; TEST_F(QuicClosedStateTest, RstAck) {