diff --git a/src/HotChocolate/Core/src/Abstractions/Execution/VariableBatchRequest.cs b/src/HotChocolate/Core/src/Abstractions/Execution/VariableBatchRequest.cs index c1d8f2e35fa..d854f43cf15 100644 --- a/src/HotChocolate/Core/src/Abstractions/Execution/VariableBatchRequest.cs +++ b/src/HotChocolate/Core/src/Abstractions/Execution/VariableBatchRequest.cs @@ -112,4 +112,25 @@ public VariableBatchRequest( /// GraphQL request flags allow to limit the GraphQL executor capabilities. /// public GraphQLRequestFlags Flags { get; } + + /// + /// Creates a new request with the specified services. + /// + /// + /// The services that shall be used while executing the operation. + /// + /// + /// Returns a new request with the specified services. + /// + public VariableBatchRequest WithServices(IServiceProvider services) => + new( + Document, + DocumentId, + DocumentHash, + OperationName, + VariableValues, + Extensions, + ContextData, + services, + Flags); } diff --git a/src/HotChocolate/Core/src/Execution/RequestExecutor.cs b/src/HotChocolate/Core/src/Execution/RequestExecutor.cs index a3661bff83a..00c9098c3eb 100644 --- a/src/HotChocolate/Core/src/Execution/RequestExecutor.cs +++ b/src/HotChocolate/Core/src/Execution/RequestExecutor.cs @@ -99,7 +99,7 @@ internal async Task ExecuteAsync( if (scopeDataLoader) { - // we ensure that at the begin of each execution there is a fresh batching scope. + // we ensure that at the beginning of each execution there is a fresh batching scope. services.InitializeDataLoaderScope(); } @@ -153,7 +153,14 @@ internal async Task ExecuteAsync( _contextPool.Return(context); } - scope?.Dispose(); + if(scope is IAsyncDisposable asyncScope) + { + await asyncScope.DisposeAsync(); + } + else + { + scope?.Dispose(); + } } } @@ -174,7 +181,7 @@ public Task ExecuteBatchAsync( private async IAsyncEnumerable CreateResponseStream( OperationRequestBatch requestBatch, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + [EnumeratorCancellation] CancellationToken ct = default) { IServiceScope? scope = null; @@ -197,6 +204,31 @@ private async IAsyncEnumerable CreateResponseStream( // we ensure that at the start of each execution there is a fresh batching scope. services.InitializeDataLoaderScope(); + try + { + await foreach (var result in ExecuteBatchStream(requestBatch, services, ct).ConfigureAwait(false)) + { + yield return result; + } + } + finally + { + if(scope is IAsyncDisposable asyncScope) + { + await asyncScope.DisposeAsync(); + } + else + { + scope?.Dispose(); + } + } + } + + private async IAsyncEnumerable ExecuteBatchStream( + OperationRequestBatch requestBatch, + IServiceProvider services, + [EnumeratorCancellation] CancellationToken ct = default) + { var requests = requestBatch.Requests; var requestCount = requests.Count; var tasks = new List(requestCount); @@ -205,7 +237,7 @@ private async IAsyncEnumerable CreateResponseStream( for (var i = 0; i < requestCount; i++) { - tasks.Add(ExecuteBatchItemAsync(requests[i], i, completed, cancellationToken)); + tasks.Add(ExecuteBatchItemAsync(WithServices(requests[i], services), i, completed, ct)); } var buffer = new IOperationResult[8]; @@ -228,7 +260,7 @@ private async IAsyncEnumerable CreateResponseStream( if (task.Status is not TaskStatus.RanToCompletion) { - // we await to throw if its not successful. + // we await to throw if it's not successful. await task; } @@ -252,6 +284,21 @@ private async IAsyncEnumerable CreateResponseStream( while (tasks.Count > 0 || bufferCount > 0); } + private static IOperationRequest WithServices(IOperationRequest request, IServiceProvider services) + { + switch (request) + { + case OperationRequest operationRequest: + return operationRequest.WithServices(services); + + case VariableBatchRequest variableBatchRequest: + return variableBatchRequest.WithServices(services); + + default: + throw new InvalidOperationException("Unexpected request type."); + } + } + private async Task ExecuteBatchItemAsync( IOperationRequest request, int requestIndex,