Skip to content

Commit

Permalink
Reworked connection tracking logic.
Browse files Browse the repository at this point in the history
- Use concurrent dictionary and key by connection id instead of custom comparer.
- Removed UpdateConnection from ITransportHeartBeat.
- Release previous connection if Add called twice with same connection id.
- Only perform keep alive logic for transports that support it (SupportsKeepAlive)
- Cleaned up and simplified logic in ITransportHeartBeat.Beat.
  • Loading branch information
davidfowl committed Jun 20, 2012
1 parent 301d603 commit 8d27c97
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 183 deletions.
1 change: 0 additions & 1 deletion SignalR/SignalR.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@
<Compile Include="Transports\ITransport.cs" />
<Compile Include="Transports\ITransportHeartBeat.cs" />
<Compile Include="Transports\ITransportManager.cs" />
<Compile Include="Transports\ConnectionReference.cs" />
<Compile Include="Transports\ServerSentEventsTransport.cs" />
<Compile Include="Transports\TransportDisconnectBase.cs" />
<Compile Include="Transports\TransportHeartBeat.cs" />
Expand Down
56 changes: 0 additions & 56 deletions SignalR/Transports/ConnectionReference.cs

This file was deleted.

22 changes: 19 additions & 3 deletions SignalR/Transports/ForeverTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ protected IJsonSerializer JsonSerializer
protected virtual void OnSending(string payload)
{
HeartBeat.MarkConnection(this);

if (Sending != null)
{
Sending(payload);
Expand Down Expand Up @@ -94,7 +95,16 @@ protected Task ProcessRequestCore(ITransportConnection connection)
if (Connected != null)
{
// Return a task that completes when the connected event task & the receive loop task are both finished
return TaskAsyncHelper.Interleave(ProcessReceiveRequest, Connected, connection);
bool newConnection = HeartBeat.AddConnection(this);
return TaskAsyncHelper.Interleave(ProcessReceiveRequestWithoutTracking, () =>
{
if (newConnection)
{
return Connected();
}
return TaskAsyncHelper.Empty;
}
, connection);
}

return ProcessReceiveRequest(connection);
Expand All @@ -117,16 +127,19 @@ public virtual Task ProcessRequest(ITransportConnection connection)

public virtual Task Send(PersistentResponse response)
{
HeartBeat.MarkConnection(this);
var data = _jsonSerializer.Stringify(response);

OnSending(data);

return Context.Response.WriteAsync(data);
}

public virtual Task Send(object value)
{
var data = _jsonSerializer.Stringify(value);

OnSending(data);

return Context.Response.EndAsync(data);
}

Expand All @@ -152,8 +165,11 @@ private Task ProcessSendRequest()
private Task ProcessReceiveRequest(ITransportConnection connection, Action postReceive = null)
{
HeartBeat.AddConnection(this);
HeartBeat.MarkConnection(this);
return ProcessReceiveRequestWithoutTracking(connection, postReceive);
}

private Task ProcessReceiveRequestWithoutTracking(ITransportConnection connection, Action postReceive = null)
{
Action afterReceive = () =>
{
if (TransportConnected != null)
Expand Down
10 changes: 10 additions & 0 deletions SignalR/Transports/ITrackingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public interface ITrackingConnection
/// </summary>
bool IsTimedOut { get; }

/// <summary>
/// Gets a value that represents if the connection supprots keep alive.
/// </summary>
bool SupportsKeepAlive { get; }

/// <summary>
/// Gets a value indicating the amount of time to wait after the connection dies before firing the disconnecting the connection.
/// </summary>
Expand All @@ -47,5 +52,10 @@ public interface ITrackingConnection
/// Sends a keep alive ping over the connection.
/// </summary>
void KeepAlive();

/// <summary>
/// Kills the connection.
/// </summary>
void End();
}
}
8 changes: 1 addition & 7 deletions SignalR/Transports/ITransportHeartBeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,7 @@ public interface ITransportHeartBeat
/// Adds a new connection to the list of tracked connections.
/// </summary>
/// <param name="connection">The connection to be added.</param>
void AddConnection(ITrackingConnection connection);

/// <summary>
/// Updates an existing connection and it's metadata.
/// </summary>
/// <param name="connection">The connection to be updated.</param>
void UpdateConnection(ITrackingConnection connection);
bool AddConnection(ITrackingConnection connection);

/// <summary>
/// Marks an existing connection as active.
Expand Down
29 changes: 24 additions & 5 deletions SignalR/Transports/LongPollingTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ private string JsonpCallback
}
}

public override bool SupportsKeepAlive
{
get
{
return false;
}
}

public Func<string, Task> Received { get; set; }

public Func<Task> TransportConnected { get; set; }
Expand Down Expand Up @@ -179,22 +187,33 @@ private Task ProcessSendRequest()

private Task ProcessConnectRequest(ITransportConnection connection)
{
HeartBeat.AddConnection(this);

if (Connected != null)
{
bool newConnection = HeartBeat.AddConnection(this);

// Return a task that completes when the connected event task & the receive loop task are both finished
return TaskAsyncHelper.Interleave(ProcessReceiveRequest, Connected, connection);
return TaskAsyncHelper.Interleave(ProcessReceiveRequestWithoutTracking, () =>
{
if (newConnection)
{
return Connected();
}
return TaskAsyncHelper.Empty;
},
connection);
}

return ProcessReceiveRequest(connection);
}

private Task ProcessReceiveRequest(ITransportConnection connection, Action postReceive = null)
{
HeartBeat.UpdateConnection(this);
HeartBeat.MarkConnection(this);
HeartBeat.AddConnection(this);
return ProcessReceiveRequestWithoutTracking(connection, postReceive);
}

private Task ProcessReceiveRequestWithoutTracking(ITransportConnection connection, Action postReceive = null)
{
if (TransportConnected != null)
{
TransportConnected().Catch();
Expand Down
17 changes: 16 additions & 1 deletion SignalR/Transports/TransportDisconnectBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public abstract class TransportDisconnectBase : ITrackingConnection

protected int _isDisconnected;
private readonly CancellationTokenSource _timeoutTokenSource;
private readonly CancellationTokenSource _endTokenSource;
private readonly CancellationToken _hostShutdownToken;
private readonly CancellationTokenSource _connectionEndToken;

Expand All @@ -25,10 +26,11 @@ public TransportDisconnectBase(HostContext context, IJsonSerializer jsonSerializ
_jsonSerializer = jsonSerializer;
_heartBeat = heartBeat;
_timeoutTokenSource = new CancellationTokenSource();
_endTokenSource = new CancellationTokenSource();
_hostShutdownToken = context.HostShutdownToken();

// Create a token that represents the end of this connection's life
_connectionEndToken = CancellationTokenSource.CreateLinkedTokenSource(_timeoutTokenSource.Token, _hostShutdownToken);
_connectionEndToken = CancellationTokenSource.CreateLinkedTokenSource(_timeoutTokenSource.Token, _endTokenSource.Token, _hostShutdownToken);
}

public string ConnectionId
Expand Down Expand Up @@ -90,6 +92,14 @@ public bool IsTimedOut
}
}

public virtual bool SupportsKeepAlive
{
get
{
return true;
}
}

public virtual TimeSpan DisconnectThreshold
{
get { return TimeSpan.FromSeconds(5); }
Expand Down Expand Up @@ -144,6 +154,11 @@ public virtual void KeepAlive()
{
}

public void End()
{
_endTokenSource.Cancel();
}

protected ITransportConnection Connection { get; set; }

protected HostContext Context
Expand Down
Loading

0 comments on commit 8d27c97

Please # to comment.