Skip to content

Commit 7fd42c4

Browse files
committed
Merged PR 3115: Wait to Complete Pipe
Fixed the `PipeWriterStream` to properly detect a canceled write and throw an `OperationCanceledException` in those cases. And making sure `Complete` is called in a safe manner by ensuring it is in a lock so writes can't be in-progress.
1 parent f198e55 commit 7fd42c4

File tree

8 files changed

+164
-12
lines changed

8 files changed

+164
-12
lines changed

eng/PatchConfig.props

+6
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,10 @@ Later on, this will be checked using this condition:
4444
Microsoft.AspNetCore.CookiePolicy;
4545
</PackagesInPatch>
4646
</PropertyGroup>
47+
<PropertyGroup Condition=" '$(VersionPrefix)' == '2.1.14' ">
48+
<PackagesInPatch>
49+
Microsoft.AspNetCore.Http.Connections;
50+
Microsoft.AspNetCore.SignalR.Core;
51+
</PackagesInPatch>
52+
</PropertyGroup>
4753
</Project>

src/SignalR/clients/ts/FunctionalTests/selenium/run-tests.ts

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
import { ChildProcess, spawn } from "child_process";
2-
import * as fs from "fs";
2+
import * as _fs from "fs";
33
import { EOL } from "os";
44
import * as path from "path";
5+
import { promisify } from "util";
56
import { PassThrough, Readable } from "stream";
67

78
import { run } from "../../webdriver-tap-runner/lib";
89

910
import * as _debug from "debug";
1011
const debug = _debug("signalr-functional-tests:run");
1112

13+
const ARTIFACTS_DIR = path.resolve(__dirname, "..", "..", "..", "..", "artifacts");
14+
const LOGS_DIR = path.resolve(ARTIFACTS_DIR, "logs");
15+
16+
// Promisify things from fs we want to use.
17+
const fs = {
18+
createWriteStream: _fs.createWriteStream,
19+
exists: promisify(_fs.exists),
20+
mkdir: promisify(_fs.mkdir),
21+
};
22+
1223
process.on("unhandledRejection", (reason) => {
1324
console.error(`Unhandled promise rejection: ${reason}`);
1425
process.exit(1);
@@ -102,6 +113,13 @@ if (chromePath) {
102113
try {
103114
const serverPath = path.resolve(__dirname, "..", "bin", configuration, "netcoreapp2.1", "FunctionalTests.dll");
104115

116+
if (!await fs.exists(ARTIFACTS_DIR)) {
117+
await fs.mkdir(ARTIFACTS_DIR);
118+
}
119+
if (!await fs.exists(LOGS_DIR)) {
120+
await fs.mkdir(LOGS_DIR);
121+
}
122+
105123
debug(`Launching Functional Test Server: ${serverPath}`);
106124
const dotnet = spawn("dotnet", [serverPath], {
107125
env: {
@@ -117,6 +135,9 @@ if (chromePath) {
117135
}
118136
}
119137

138+
const logStream = fs.createWriteStream(path.resolve(LOGS_DIR, "ts.functionaltests.dotnet.log"));
139+
dotnet.stdout.pipe(logStream);
140+
120141
process.on("SIGINT", cleanup);
121142
process.on("exit", cleanup);
122143

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs

+25-3
Original file line numberDiff line numberDiff line change
@@ -274,13 +274,35 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
274274
// Cancel any pending flushes from back pressure
275275
Application?.Output.CancelPendingFlush();
276276

277-
// Shutdown both sides and wait for nothing
277+
// Normally it isn't safe to try and acquire this lock because the Send can hold onto it for a long time if there is backpressure
278+
// It is safe to wait for this lock now because the Send will be in one of 4 states
279+
// 1. In the middle of a write which is in the middle of being canceled by the CancelPendingFlush above, when it throws
280+
// an OperationCanceledException it will complete the PipeWriter which will make any other Send waiting on the lock
281+
// throw an InvalidOperationException if they call Write
282+
// 2. About to write and see that there is a pending cancel from the CancelPendingFlush, go to 1 to see what happens
283+
// 3. Enters the Send and sees the Dispose state from DisposeAndRemoveAsync and releases the lock
284+
// 4. No Send in progress
285+
await WriteLock.WaitAsync();
286+
try
287+
{
288+
// Complete the applications read loop
289+
Application?.Output.Complete(transportTask.Exception?.InnerException);
290+
}
291+
finally
292+
{
293+
WriteLock.Release();
294+
}
295+
296+
Log.WaitingForTransportAndApplication(_logger, TransportType);
297+
298+
// Wait for application so we can complete the writer safely
299+
await applicationTask.NoThrow();
300+
301+
// Shutdown application side now that it's finished
278302
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
279-
Application?.Output.Complete(transportTask.Exception?.InnerException);
280303

281304
try
282305
{
283-
Log.WaitingForTransportAndApplication(_logger, TransportType);
284306
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
285307
await Task.WhenAll(applicationTask, transportTask);
286308
}

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs

+10-5
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,14 @@ private async Task ProcessSend(HttpContext context, HttpConnectionDispatcherOpti
511511

512512
context.Response.StatusCode = StatusCodes.Status404NotFound;
513513
context.Response.ContentType = "text/plain";
514+
515+
// There are no writes anymore (since this is the write "loop")
516+
// So it is safe to complete the writer
517+
// We complete the writer here because we already have the WriteLock acquired
518+
// and it's unsafe to complete outside of the lock
519+
// Other code isn't guaranteed to be able to acquire the lock before another write
520+
// even if CancelPendingFlush is called, and the other write could hang if there is backpressure
521+
connection.Application.Output.Complete();
514522
return;
515523
}
516524

@@ -549,11 +557,8 @@ private async Task ProcessDeleteAsync(HttpContext context)
549557

550558
Log.TerminatingConection(_logger);
551559

552-
// Complete the receiving end of the pipe
553-
connection.Application.Output.Complete();
554-
555-
// Dispose the connection gracefully, but don't wait for it. We assign it here so we can wait in tests
556-
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
560+
// Dispose the connection, but don't wait for it. We assign it here so we can wait in tests
561+
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
557562

558563
context.Response.StatusCode = StatusCodes.Status202Accepted;
559564
context.Response.ContentType = "text/plain";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Runtime.CompilerServices;
5+
6+
namespace System.Threading.Tasks
7+
{
8+
internal static class TaskExtensions
9+
{
10+
public static async Task NoThrow(this Task task)
11+
{
12+
await new NoThrowAwaiter(task);
13+
}
14+
}
15+
16+
internal readonly struct NoThrowAwaiter : ICriticalNotifyCompletion
17+
{
18+
private readonly Task _task;
19+
public NoThrowAwaiter(Task task) { _task = task; }
20+
public NoThrowAwaiter GetAwaiter() => this;
21+
public bool IsCompleted => _task.IsCompleted;
22+
// Observe exception
23+
public void GetResult() { _ = _task.Exception; }
24+
public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
25+
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
26+
}
27+
}

src/SignalR/common/Shared/PipeWriterStream.cs

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -76,7 +76,15 @@ private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> source, CancellationToken
7676

7777
_length += source.Length;
7878
var task = _pipeWriter.WriteAsync(source);
79-
if (!task.IsCompletedSuccessfully)
79+
if (task.IsCompletedSuccessfully)
80+
{
81+
// Cancellation can be triggered by PipeWriter.CancelPendingFlush
82+
if (task.Result.IsCanceled)
83+
{
84+
throw new OperationCanceledException();
85+
}
86+
}
87+
else if (!task.IsCompletedSuccessfully)
8088
{
8189
return WriteSlowAsync(task);
8290
}

src/SignalR/server/Core/src/HubConnectionContext.cs

+61
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class HubConnectionContext
3333

3434
private long _lastSendTimestamp = Stopwatch.GetTimestamp();
3535
private ReadOnlyMemory<byte> _cachedPingMessage;
36+
private volatile bool _connectionAborted;
3637

3738
/// <summary>
3839
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
@@ -99,6 +100,12 @@ public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancel
99100
return new ValueTask(WriteSlowAsync(message));
100101
}
101102

103+
if (_connectionAborted)
104+
{
105+
_writeLock.Release();
106+
return default;
107+
}
108+
102109
// This method should never throw synchronously
103110
var task = WriteCore(message);
104111

@@ -129,6 +136,12 @@ public virtual ValueTask WriteAsync(SerializedHubMessage message, CancellationTo
129136
return new ValueTask(WriteSlowAsync(message));
130137
}
131138

139+
if (_connectionAborted)
140+
{
141+
_writeLock.Release();
142+
return default;
143+
}
144+
132145
// This method should never throw synchronously
133146
var task = WriteCore(message);
134147

@@ -158,6 +171,8 @@ private ValueTask<FlushResult> WriteCore(HubMessage message)
158171
{
159172
Log.FailedWritingMessage(_logger, ex);
160173

174+
Abort();
175+
161176
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
162177
}
163178
}
@@ -175,6 +190,8 @@ private ValueTask<FlushResult> WriteCore(SerializedHubMessage message)
175190
{
176191
Log.FailedWritingMessage(_logger, ex);
177192

193+
Abort();
194+
178195
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
179196
}
180197
}
@@ -188,6 +205,8 @@ private async Task CompleteWriteAsync(ValueTask<FlushResult> task)
188205
catch (Exception ex)
189206
{
190207
Log.FailedWritingMessage(_logger, ex);
208+
209+
Abort();
191210
}
192211
finally
193212
{
@@ -201,13 +220,20 @@ private async Task WriteSlowAsync(HubMessage message)
201220
await _writeLock.WaitAsync();
202221
try
203222
{
223+
if (_connectionAborted)
224+
{
225+
return;
226+
}
227+
204228
// Failed to get the lock immediately when entering WriteAsync so await until it is available
205229

206230
await WriteCore(message);
207231
}
208232
catch (Exception ex)
209233
{
210234
Log.FailedWritingMessage(_logger, ex);
235+
236+
Abort();
211237
}
212238
finally
213239
{
@@ -219,6 +245,11 @@ private async Task WriteSlowAsync(SerializedHubMessage message)
219245
{
220246
try
221247
{
248+
if (_connectionAborted)
249+
{
250+
return;
251+
}
252+
222253
// Failed to get the lock immediately when entering WriteAsync so await until it is available
223254
await _writeLock.WaitAsync();
224255

@@ -227,6 +258,8 @@ private async Task WriteSlowAsync(SerializedHubMessage message)
227258
catch (Exception ex)
228259
{
229260
Log.FailedWritingMessage(_logger, ex);
261+
262+
Abort();
230263
}
231264
finally
232265
{
@@ -250,13 +283,20 @@ private async Task TryWritePingSlowAsync()
250283
{
251284
try
252285
{
286+
if (_connectionAborted)
287+
{
288+
return;
289+
}
290+
253291
await _connectionContext.Transport.Output.WriteAsync(_cachedPingMessage);
254292

255293
Log.SentPing(_logger);
256294
}
257295
catch (Exception ex)
258296
{
259297
Log.FailedWritingMessage(_logger, ex);
298+
299+
Abort();
260300
}
261301
finally
262302
{
@@ -293,6 +333,12 @@ private async Task WriteHandshakeResponseAsync(HandshakeResponseMessage message)
293333
/// </summary>
294334
public virtual void Abort()
295335
{
336+
_connectionAborted = true;
337+
338+
// Cancel any current writes or writes that are about to happen and have already gone past the _connectionAborted bool
339+
// We have to do this outside of the lock otherwise it could hang if the write is observing backpressure
340+
_connectionContext.Transport.Output.CancelPendingFlush();
341+
296342
// If we already triggered the token then noop, this isn't thread safe but it's good enough
297343
// to avoid spawning a new task in the most common cases
298344
if (_connectionAbortedTokenSource.IsCancellationRequested)
@@ -423,9 +469,24 @@ internal void Abort(Exception exception)
423469
internal Task AbortAsync()
424470
{
425471
Abort();
472+
473+
// Acquire lock to make sure all writes are completed
474+
if (!_writeLock.Wait(0))
475+
{
476+
return AbortAsyncSlow();
477+
}
478+
479+
_writeLock.Release();
426480
return _abortCompletedTcs.Task;
427481
}
428482

483+
private async Task AbortAsyncSlow()
484+
{
485+
await _writeLock.WaitAsync();
486+
_writeLock.Release();
487+
await _abortCompletedTcs.Task;
488+
}
489+
429490
private void KeepAliveTick()
430491
{
431492
var timestamp = Stopwatch.GetTimestamp();

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs

+3-1
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,11 @@ public async Task AbortFromHubMethodForcesClientDisconnect()
7979
{
8080
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
8181

82-
await client.InvokeAsync(nameof(AbortHub.Kill));
82+
await client.SendInvocationAsync(nameof(AbortHub.Kill));
8383

8484
await connectionHandlerTask.OrTimeout();
85+
86+
Assert.Null(client.TryRead());
8587
}
8688
}
8789

0 commit comments

Comments
 (0)