Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Handle multiple incoming connection at the beginning of Http3LoopbackConnection use #69453

Merged
merged 2 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 52 additions & 68 deletions src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ internal sealed class Http3LoopbackConnection : GenericLoopbackConnection

private readonly QuicConnection _connection;

// Queue for holding streams we accepted before we managed to accept the control stream
private readonly Queue<QuicStream> _delayedStreams = new Queue<QuicStream>();

// This is specifically request streams, not control streams
private readonly Dictionary<int, Http3LoopbackStream> _openStreams = new Dictionary<int, Http3LoopbackStream>();

Expand All @@ -51,6 +54,8 @@ public Http3LoopbackConnection(QuicConnection connection)
_connection = connection;
}

public long MaxHeaderListSize { get; private set; } = -1;

public override void Dispose()
{
// Close any remaining request streams (but NOT control streams, as these should not be closed while the connection is open)
Expand All @@ -59,6 +64,11 @@ public override void Dispose()
stream.Dispose();
}

foreach (QuicStream stream in _delayedStreams)
{
stream.Dispose();
}

// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
Expand Down Expand Up @@ -107,100 +117,74 @@ public override Task InitializeConnectionAsync()
throw new NotImplementedException();
}

public async Task<Http3LoopbackStream> AcceptStreamAsync()
private Task EnsureControlStreamAcceptedAsync()
{
QuicStream quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);
var stream = new Http3LoopbackStream(quicStream);

if (quicStream.CanWrite)
if (_inboundControlStream != null)
{
_openStreams.Add(checked((int)quicStream.StreamId), stream);
_currentStream = stream;
_currentStreamId = quicStream.StreamId;
return Task.CompletedTask;
}

return stream;
}

private async Task HandleControlStreamAsync(Http3LoopbackStream controlStream)
{
if (_inboundControlStream is not null)
return EnsureControlStreamAcceptedInternalAsync();
async Task EnsureControlStreamAcceptedInternalAsync()
{
throw new Exception("Received second control stream from client???");
}
Http3LoopbackStream controlStream;

while (true)
{
QuicStream quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);

Assert.False(controlStream.CanWrite);
if (!quicStream.CanWrite)
{
// control stream accepted
controlStream = new Http3LoopbackStream(quicStream);
break;
}

long? streamType = await controlStream.ReadIntegerAsync();
Assert.Equal(Http3LoopbackStream.ControlStream, streamType);
// control streams are unidirectional, so this must be a request stream
// keep it for later and wait for another stream
_delayedStreams.Enqueue(quicStream);
}

List<(long settingId, long settingValue)> settings = await controlStream.ReadSettingsAsync();
(long settingId, long settingValue) = Assert.Single(settings);
long? streamType = await controlStream.ReadIntegerAsync();
Assert.Equal(Http3LoopbackStream.ControlStream, streamType);

Assert.Equal(Http3LoopbackStream.MaxHeaderListSize, settingId);
List<(long settingId, long settingValue)> settings = await controlStream.ReadSettingsAsync();
(long settingId, long settingValue) = Assert.Single(settings);

_inboundControlStream = controlStream;
Assert.Equal(Http3LoopbackStream.MaxHeaderListSize, settingId);
MaxHeaderListSize = settingValue;

_inboundControlStream = controlStream;
}
}

// This will automatically handle the control stream, including validating its contents
public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()
{
Http3LoopbackStream requestStream = null;
await EnsureControlStreamAcceptedAsync().ConfigureAwait(false);

while (true)
if (!_delayedStreams.TryDequeue(out QuicStream quicStream))
{
var stream = await AcceptStreamAsync().ConfigureAwait(false);

// Accepted request stream.
if (stream.CanWrite)
{
// Only one expected.
Assert.True(requestStream is null, "Expected single request stream, got a second");
quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);
}

// Control stream is set --> return the request stream.
if (_inboundControlStream is not null)
{
return stream;
}
var stream = new Http3LoopbackStream(quicStream);

// Control stream not set --> need to accept another stream.
requestStream = stream;
continue;
}
Assert.True(quicStream.CanWrite, "Expected writeable stream.");

// Must be the control stream.
await HandleControlStreamAsync(stream);
_openStreams.Add(checked((int)quicStream.StreamId), stream);
_currentStream = stream;
_currentStreamId = quicStream.StreamId;

// We've already accepted request stream --> return it.
if (requestStream is not null)
{
return requestStream;
}
}
return stream;
}

public async Task<(Http3LoopbackStream clientControlStream, Http3LoopbackStream requestStream)> AcceptControlAndRequestStreamAsync()
{
Http3LoopbackStream streamA = null, streamB = null;

try
{
streamA = await AcceptStreamAsync();
streamB = await AcceptStreamAsync();
Http3LoopbackStream requestStream = await AcceptRequestStreamAsync();
Http3LoopbackStream controlStream = _inboundControlStream;

return (streamA.CanWrite, streamB.CanWrite) switch
{
(false, true) => (streamA, streamB),
(true, false) => (streamB, streamA),
_ => throw new Exception("Expected one unidirectional and one bidirectional stream; received something else.")
};
}
catch
{
streamA?.Dispose();
streamB?.Dispose();
throw;
}
return (controlStream, requestStream);
}

public async Task EstablishControlStreamAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,7 @@ public async Task ClientSettingsReceived_Success(int headerSizeLimit)
using (requestStream)
{
Assert.False(settingsStream.CanWrite, "Expected unidirectional control stream.");

long? streamType = await settingsStream.ReadIntegerAsync();
Assert.Equal(Http3LoopbackStream.ControlStream, streamType);

List<(long settingId, long settingValue)> settings = await settingsStream.ReadSettingsAsync();
(long settingId, long settingValue) = Assert.Single(settings);

Assert.Equal(Http3LoopbackStream.MaxHeaderListSize, settingId);
Assert.Equal(headerSizeLimit * 1024L, settingValue);
Assert.Equal(headerSizeLimit * 1024L, connection.MaxHeaderListSize);

await requestStream.ReadRequestDataAsync();
await requestStream.SendResponseAsync();
Expand Down