Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix 659: sync and async timer state rework #906

Merged
merged 14 commits into from
Feb 25, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public SNIPacket(SNIHandle owner,int id)
{
if (_data != null)
{
Debug.Fail($@"finalizer called for unreleased SNIPacket, tag: {_traceTag}");
//Debug.Fail($@"finalizer called for unreleased SNIPacket, tag: {_traceTag}");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2783,7 +2783,7 @@ private void CleanUpStateObject(bool isCancelRequested = true)
{
_stateObj.CancelRequest();
}
_stateObj._internalTimeout = false;
_stateObj.SetTimeoutStateStopped();
_stateObj.CloseSession();
_stateObj._bulkCopyOpperationInProgress = false;
_stateObj._bulkCopyWriteTimeout = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ private bool TryCloseInternal(bool closeReader)
{
_sharedState._dataReady = true; // set _sharedState._dataReady to not confuse CleanPartialRead
}
_stateObj._internalTimeout = false;
_stateObj.SetTimeoutStateStopped();
if (_sharedState._dataReady)
{
cleanDataFailed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,7 @@ internal bool TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataRead
// If there is data ready, but we didn't exit the loop, then something is wrong
Debug.Assert(!dataReady, "dataReady not expected - did we forget to skip the row?");

if (stateObj._internalTimeout)
if (stateObj.IsTimeoutStateExpired)
{
runBehavior = RunBehavior.Attention;
}
Expand Down Expand Up @@ -2520,7 +2520,7 @@ internal bool TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataRead
stateObj._attentionSent = false;
stateObj.HasReceivedAttention = false;

if (RunBehavior.Clean != (RunBehavior.Clean & runBehavior) && !stateObj._internalTimeout)
if (RunBehavior.Clean != (RunBehavior.Clean & runBehavior) && !stateObj.IsTimeoutStateExpired)
{
// Add attention error to collection - if not RunBehavior.Clean!
stateObj.AddError(new SqlError(0, 0, TdsEnums.MIN_ERROR_CLASS, _server, SQLMessage.OperationCancelled(), "", 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ sealed internal class LastIOTimer

internal abstract class TdsParserStateObject
{
private const int TimeoutStopped = 0;
private const int TimeoutRunning = 1;
private const int TimeoutExpiredAsync = 2;
private const int TimeoutExpiredSync = 3;

private static int _objectTypeCount; // EventSource counter
internal readonly int _objectID = Interlocked.Increment(ref _objectTypeCount);

Expand Down Expand Up @@ -113,12 +118,15 @@ internal enum SnapshottedStateFlags : byte
// Timeout variables
private long _timeoutMilliseconds;
private long _timeoutTime; // variable used for timeout computations, holds the value of the hi-res performance counter at which this request should expire
private int _timeoutState; // expected to be one of the constant values TimeoutStopped, TimeoutRunning, TimeoutExpiredAsync, TimeoutExpiredSync
private int _timeoutIdentitySource;
private TimeoutState _spareTimeoutState;
private volatile int _timeoutIdentityValue;
internal volatile bool _attentionSent; // true if we sent an Attention to the server
internal volatile bool _attentionSending;
internal bool _internalTimeout; // an internal timeout occurred

private readonly LastIOTimer _lastSuccessfulIOTimer;

private readonly LastIOTimer _lastSuccessfulIOTimer;

// secure password information to be stored
// At maximum number of secure string that need to be stored is two; one for login password and the other for new change password
private SecureString[] _securePasswords = new SecureString[2] { null, null };
Expand Down Expand Up @@ -760,7 +768,7 @@ private void ResetCancelAndProcessAttention()
// operations.
Parser.ProcessPendingAck(this);
}
_internalTimeout = false;
SetTimeoutStateStopped();
}
}

Expand Down Expand Up @@ -1042,7 +1050,7 @@ internal bool TryProcessHeader()
return false;
}

if (_internalTimeout)
if (IsTimeoutStateExpired)
{
ThrowExceptionAndWarning();
return true;
Expand Down Expand Up @@ -2247,11 +2255,67 @@ internal void OnConnectionClosed()
}
}

private void OnTimeout(object state)
public void SetTimeoutStateStopped()
{
Interlocked.Exchange(ref _timeoutState, TimeoutStopped);
_timeoutIdentityValue = 0;
}

public bool IsTimeoutStateExpired
{
if (!_internalTimeout)
get
{
_internalTimeout = true;
int state = _timeoutState;
return state == TimeoutExpiredAsync || state == TimeoutExpiredSync;
}
}

private sealed class TimeoutState
{
public int IdentityValue { get; set; }
}

private void OnTimeoutAsync(object state)
{
#if DEBUG
Thread.Sleep(13000);
#endif
int currentIdentityValue = _timeoutIdentityValue;
TimeoutState timeoutState = (TimeoutState)state;
if (timeoutState.IdentityValue == _timeoutIdentityValue)
{
// the return value is not useful here because no choice is going to be made using it
// we only want to make this call to set the state knowing that it will be seen later
OnTimeoutCore(TimeoutRunning, TimeoutExpiredAsync);
}
else
{
Debug.WriteLine($"OnTimeoutAsync called with identity state={timeoutState.IdentityValue} but current identity is {currentIdentityValue} so it is being ignored");
}
timeoutState.IdentityValue = 0;
Interlocked.CompareExchange(ref _spareTimeoutState, timeoutState, null);
}

private bool OnTimeoutSync()
{
return OnTimeoutCore(TimeoutRunning, TimeoutExpiredSync);
}

/// <summary>
/// attempts to change the timout state from the expected state to the target state and if it succeeds
/// will setup the the stateobject into the timeout expired state
/// </summary>
/// <param name="expectedState">the state that is the expected current state, state will change only if this is correct</param>
/// <param name="targetState">the state that will be changed to if the expected state is correct</param>
/// <returns>boolean value indicating whether the call changed the timeout state</returns>
private bool OnTimeoutCore(int expectedState, int targetState)
{
Debug.Assert(targetState == TimeoutExpiredAsync || targetState == TimeoutExpiredSync, "OnTimeoutCore must have an expiry state as the targetState");

bool retval = false;
if (Interlocked.CompareExchange(ref _timeoutState, targetState, expectedState) == expectedState)
{
retval = true;
// lock protects against Close and Cancel
lock (this)
{
Expand Down Expand Up @@ -2349,6 +2413,7 @@ private void OnTimeout(object state)
}
}
}
return retval;
}

internal void ReadSni(TaskCompletionSource<object> completion)
Expand Down Expand Up @@ -2383,19 +2448,33 @@ internal void ReadSni(TaskCompletionSource<object> completion)
{
Debug.Assert(completion != null, "Async on but null asyncResult passed");

if (_networkPacketTimeout == null)
// if the state is currently stopped then change it to running and allocate a new identity value from
// the identity source. The identity value is used to correlate timer callback events to the currently
// running timeout and prevents a late timer callback affecting a result it does not relate to
int previousTimeoutState = Interlocked.CompareExchange(ref _timeoutState, TimeoutRunning, TimeoutStopped);
if (previousTimeoutState == TimeoutStopped)
{
_networkPacketTimeout = ADP.UnsafeCreateTimer(
new TimerCallback(OnTimeout),
null,
Timeout.Infinite,
Timeout.Infinite);
Debug.Assert(_timeoutIdentityValue == 0, "timer was previously stopped without resetting the _identityValue");
_timeoutIdentityValue = Interlocked.Increment(ref _timeoutIdentitySource);
}

_networkPacketTimeout?.Dispose();
TimeoutState state = Interlocked.Exchange(ref _spareTimeoutState, null) ?? new TimeoutState();
state.IdentityValue = _timeoutIdentityValue;
_networkPacketTimeout = ADP.UnsafeCreateTimer(
new TimerCallback(OnTimeoutAsync),
state,
Timeout.Infinite,
Timeout.Infinite
);


// -1 == Infinite
// 0 == Already timed out (NOTE: To simulate the same behavior as sync we will only timeout on 0 if we receive an IO Pending from SNI)
// >0 == Actual timeout remaining
int msecsRemaining = GetTimeoutRemaining();

Debug.Assert(previousTimeoutState == TimeoutStopped, "previous timeout state was not Stopped");
if (msecsRemaining > 0)
{
ChangeNetworkPacketTimeout(msecsRemaining, Timeout.Infinite);
Expand Down Expand Up @@ -2445,12 +2524,15 @@ internal void ReadSni(TaskCompletionSource<object> completion)
_networkPacketTaskSource.TrySetResult(null);
}
// Disable timeout timer on error
SetTimeoutStateStopped();
ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);
}
else if (msecsRemaining == 0)
{ // Got IO Pending, but we have no time left to wait
// Immediately schedule the timeout timer to fire
ChangeNetworkPacketTimeout(0, Timeout.Infinite);
{
// Got IO Pending, but we have no time left to wait
// disable the timer and set the error state by calling OnTimeoutSync
ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);
OnTimeoutSync();
}
// DO NOT HANDLE PENDING READ HERE - which is TdsEnums.SNI_SUCCESS_IO_PENDING state.
// That is handled by user who initiated async read, or by ReadNetworkPacket which is sync over async.
Expand Down Expand Up @@ -2565,13 +2647,13 @@ private void ReadSniError(TdsParserStateObject stateObj, uint error)
Debug.Assert(_syncOverAsync, "Should never reach here with async on!");
bool fail = false;

if (_internalTimeout)
if (IsTimeoutStateExpired)
{ // This is now our second timeout - time to give up.
fail = true;
}
else
{
stateObj._internalTimeout = true;
stateObj.SetTimeoutStateStopped();
Debug.Assert(_parser.Connection != null, "SqlConnectionInternalTds handler can not be null at this point.");
AddError(new SqlError(TdsEnums.TIMEOUT_EXPIRED, (byte)0x00, TdsEnums.MIN_ERROR_CLASS, _parser.Server, _parser.Connection.TimeoutErrorInternal.GetErrorMessage(), "", 0, TdsEnums.SNI_WAIT_TIMEOUT));

Expand Down Expand Up @@ -2794,6 +2876,25 @@ public void ReadAsyncCallback(IntPtr key, PacketHandle packet, uint error)

ChangeNetworkPacketTimeout(Timeout.Infinite, Timeout.Infinite);

// the timer thread may be unreliable under high contention scenarios it cannot be
// assumed that the timeout has happened on the timer thread callback, check the timeout
// synchrnously and then call OnTimeoutSync to force an atomic change of state
if (TimeoutHasExpired)
{
OnTimeoutSync();
}

// try to change to the stopped state but only do so if currently in the running state
// and use cmpexch so that all changes out of the running state are atomic
int previousState = Interlocked.CompareExchange(ref _timeoutState, TimeoutRunning, TimeoutStopped);

// if the state is anything other than running then this query has reached an end so
// set the correlation _timeoutIdentityValue to 0 to prevent late callbacks executing
if (_timeoutState != TimeoutRunning)
{
_timeoutIdentityValue = 0;
}

ProcessSniPacket(packet, error);
}
catch (Exception e)
Expand Down Expand Up @@ -3862,7 +3963,7 @@ internal void AssertStateIsClean()
// Attention\Cancellation\Timeouts
Debug.Assert(!HasReceivedAttention && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {HasReceivedAttention}, Sending: {_attentionSending}");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(!_internalTimeout, "StateObj still has internal timeout set");
Debug.Assert(_timeoutState == TimeoutStopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
}
Expand Down