Skip to content

Commit

Permalink
Update HttpSysDelegator to support detaching from and re-initializing…
Browse files Browse the repository at this point in the history
… queues when the http.sys queue no longer exists (#2426)

* Update HttpSysDelegator to support detaching from and re-initializing queues when the http.sys queue no longer exists

* Use a new event id

---------

Co-authored-by: nglore <nglore+odspmdb@microsoft.com>
Co-authored-by: Miha Zupan <mihazupan.zupan1@gmail.com>
  • Loading branch information
3 people authored Mar 5, 2024
1 parent 7e7ff8d commit 55b639a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 25 deletions.
90 changes: 65 additions & 25 deletions src/ReverseProxy/Delegation/HttpSysDelegator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace Yarp.ReverseProxy.Delegation;

internal sealed class HttpSysDelegator : IHttpSysDelegator, IClusterChangeListener
{
private const int ERROR_OBJECT_NO_LONGER_EXISTS = 0x1A97;

private readonly IServerDelegationFeature? _serverDelegationFeature;
private readonly ILogger<HttpSysDelegator> _logger;
private readonly ConcurrentDictionary<QueueKey, WeakReference<DelegationQueue>> _queues;
Expand All @@ -44,7 +46,7 @@ public void ResetQueue(string queueName, string urlPrefix)
var key = new QueueKey(queueName, urlPrefix);
if (_queues.TryGetValue(key, out var queueWeakRef) && queueWeakRef.TryGetTarget(out var queue))
{
queue.Detatch();
queue.Detach();
Log.QueueReset(_logger, queueName, urlPrefix);
}
}
Expand Down Expand Up @@ -72,28 +74,50 @@ public void DelegateRequest(HttpContext context, DestinationState destination)
return;
}

// Opportunistically retry initialization if it failed previously.
// This helps when the target queue wasn't yet created because
// the target process hadn't yet started up.
var queueState = queue.Initialize(_serverDelegationFeature);
if (!queueState.IsInitialized)
{
Log.QueueNotInitialized(_logger, destination, queueState.InitializationException);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(ForwarderError.NoAvailableDestinations, queueState.InitializationException));
return;
}
Delegate(context, destination, _serverDelegationFeature, requestDelegationFeature, queue, _logger, reattachIfNeeded: true);

try
{
Log.DelegatingRequest(_logger, destination);
requestDelegationFeature.DelegateRequest(queueState.Rule);
}
catch (Exception ex)
static void Delegate(
HttpContext context,
DestinationState destination,
IServerDelegationFeature serverDelegationFeature,
IHttpSysRequestDelegationFeature requestDelegationFeature,
DelegationQueue queue,
ILogger logger,
bool reattachIfNeeded)
{
Log.DelegationFailed(_logger, destination, ex);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(ForwarderError.Request, ex));
// Opportunistically retry initialization if it failed previously.
// This helps when the target queue wasn't yet created because
// the target process hadn't yet started up.
var queueState = queue.Initialize(serverDelegationFeature);
if (!queueState.IsInitialized)
{
Log.QueueNotInitialized(logger, destination, queueState.InitializationException);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(ForwarderError.NoAvailableDestinations, queueState.InitializationException));
return;
}

try
{
Log.DelegatingRequest(logger, destination);
requestDelegationFeature.DelegateRequest(queueState.Rule);
}
catch (HttpSysException ex) when (reattachIfNeeded && ex.ErrorCode == ERROR_OBJECT_NO_LONGER_EXISTS)
{
Log.QueueNoLongerExists(logger, destination.GetHttpSysDelegationQueue(), destination.Model?.Config?.Address, ex);

// The target queue is gone, detach from it so we can try to re-attach
queue.Detach(queueState);

// Attempt to delegate one more time which will try re-initialize the queue
Delegate(context, destination, serverDelegationFeature, requestDelegationFeature, queue, logger, reattachIfNeeded: false);
}
catch (Exception ex)
{
Log.DelegationFailed(logger, destination, ex);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
context.Features.Set<IForwarderErrorFeature>(new ForwarderErrorFeature(ForwarderError.Request, ex));
}
}
}

Expand Down Expand Up @@ -245,12 +269,18 @@ public DelegationQueueState Initialize(IServerDelegationFeature delegationFeatur
return state;
}

public void Detatch()
public void Detach(DelegationQueueState? state = null)
{
lock (_syncRoot)
if (state == null || state == _currentState)
{
_currentState.Rule?.Dispose();
_currentState = new DelegationQueueState();
lock (_syncRoot)
{
if (state == null || state == _currentState)
{
_currentState.Rule?.Dispose();
_currentState = new DelegationQueueState();
}
}
}
}

Expand Down Expand Up @@ -352,6 +382,11 @@ private static class Log
EventIds.DelegationQueueReset,
"Detached from queue with name '{queueName}' and url prefix '{urlPrefix}'");

private static readonly Action<ILogger, string?, string?, Exception?> _queueNoLongerExists = LoggerMessage.Define<string?, string?>(
LogLevel.Information,
EventIds.DelegationQueueNoLongerExists,
"Destination queue with name '{queueName}' and url prefix '{urlPrefix}' no longer exists. Detaching and attempting to re-initialize.");

private static readonly Action<ILogger, string, string?, string?, Exception?> _delegatingRequest = LoggerMessage.Define<string, string?, string?>(
LogLevel.Information,
EventIds.DelegatingRequest,
Expand Down Expand Up @@ -382,6 +417,11 @@ public static void QueueReset(ILogger logger, string queueName, string urlPrefix
_queueReset(logger, queueName, urlPrefix, null);
}

public static void QueueNoLongerExists(ILogger logger, string? queueName, string? urlPrefix, Exception? ex)
{
_queueNoLongerExists(logger, queueName, urlPrefix, ex);
}

public static void DelegatingRequest(ILogger logger, DestinationState destination)
{
_delegatingRequest(logger, destination.DestinationId, destination.GetHttpSysDelegationQueue(), destination.Model?.Config?.Address, null);
Expand Down
1 change: 1 addition & 0 deletions src/ReverseProxy/Utilities/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ internal static class EventIds
public static readonly EventId RetryingWebSocketDowngradeNoHttp2 = new EventId(62, "RetryingWebSocketDowngradeNoHttp2");
public static readonly EventId InvalidSecWebSocketKeyHeader = new EventId(63, "InvalidSecWebSocketKeyHeader");
public static readonly EventId TimeoutNotApplied = new(64, nameof(TimeoutNotApplied));
public static readonly EventId DelegationQueueNoLongerExists = new(65, nameof(DelegationQueueNoLongerExists));
}

0 comments on commit 55b639a

Please # to comment.