Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Bulk delete fixes #3689

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Messages.Delete;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.JobManagement;
using Microsoft.Health.Test.Utilities;
Expand All @@ -34,7 +35,7 @@ public BulkDeleteOrchestratorJobTests()
{
_queueClient = Substitute.For<IQueueClient>();
_searchService = Substitute.For<ISearchService>();
_orchestratorJob = new BulkDeleteOrchestratorJob(_queueClient, _searchService);
_orchestratorJob = new BulkDeleteOrchestratorJob(_queueClient, _searchService.CreateMockScopeFactory());

_progress = new Progress<string>((result) => { });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Messages.Delete;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.JobManagement;
using Microsoft.Health.Test.Utilities;
Expand All @@ -43,9 +44,7 @@ public BulkDeleteProcessingJobTests()
.Returns(Task.FromResult(new SearchResult(5, new List<Tuple<string, string>>())));
_queueClient = Substitute.For<IQueueClient>();
_deleter = Substitute.For<IDeletionService>();
var deleter = Substitute.For<IScoped<IDeletionService>>();
deleter.Value.Returns(_deleter);
_processingJob = new BulkDeleteProcessingJob(() => deleter, Substitute.For<RequestContextAccessor<IFhirRequestContext>>(), Substitute.For<IMediator>(), _searchService, _queueClient);
_processingJob = new BulkDeleteProcessingJob(_deleter.CreateMockScopeFactory(), Substitute.For<RequestContextAccessor<IFhirRequestContext>>(), Substitute.For<IMediator>(), _searchService.CreateMockScopeFactory(), _queueClient);

_progress = new Progress<string>((result) => { });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static class SearchServiceExtensions
/// <param name="count">The search Count.</param>
/// <param name="continuationToken">An optional ContinuationToken</param>
/// <param name="versionType">The versions of a resource to return</param>
/// <param name="onlyIds">Whether to return only resource ids or the full resource</param>
/// <param name="logger">The logger</param>
/// <returns>Search collection and a continuationToken</returns>
/// <exception cref="PreconditionFailedException">Returns this exception when all passed in params match the search result unusedParams</exception>
Expand All @@ -53,6 +54,7 @@ public static class SearchServiceExtensions
int? count = 2, // Most "Conditional" logic needs only 0, 1 or >1, so here we can limit to "2"
string continuationToken = null,
ResourceVersionType versionType = ResourceVersionType.Latest,
bool onlyIds = false,
ILogger logger = null)
{
// Filters search parameters that can limit the number of results (e.g. _count=1)
Expand Down Expand Up @@ -83,7 +85,7 @@ public static class SearchServiceExtensions

statistics.Iterate();

SearchResult results = await searchService.SearchAsync(instanceType, searchParameters.ToImmutableList(), cancellationToken, resourceVersionTypes: versionType);
SearchResult results = await searchService.SearchAsync(instanceType, searchParameters.ToImmutableList(), cancellationToken, resourceVersionTypes: versionType, onlyIds: onlyIds);
lastContinuationToken = results?.ContinuationToken;

// Check if all parameters passed in were unused, this would result in no search parameters being applied to search results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using EnsureThat;
using Hl7.Fhir.Rest;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.JobManagement;

Expand All @@ -19,12 +20,12 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete
public class BulkDeleteOrchestratorJob : IJob
{
private readonly IQueueClient _queueClient;
private readonly ISearchService _searchService;
private readonly Func<IScoped<ISearchService>> _searchService;
private const string OperationCompleted = "Completed";

public BulkDeleteOrchestratorJob(
IQueueClient queueClient,
ISearchService searchService)
Func<IScoped<ISearchService>> searchService)
{
EnsureArg.IsNotNull(queueClient, nameof(queueClient));
EnsureArg.IsNotNull(searchService, nameof(searchService));
Expand All @@ -42,15 +43,16 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre

BulkDeleteDefinition processingDefinition = null;

using var searchService = _searchService.Invoke();
if (string.IsNullOrEmpty(definition.Type))
{
IReadOnlyList<string> resourceTypes = await _searchService.GetUsedResourceTypes(cancellationToken);
IReadOnlyList<string> resourceTypes = await searchService.Value.GetUsedResourceTypes(cancellationToken);

processingDefinition = await CreateProcessingDefinition(definition, _searchService, new List<string>(resourceTypes), cancellationToken);
processingDefinition = await CreateProcessingDefinition(definition, searchService.Value, new List<string>(resourceTypes), cancellationToken);
}
else
{
processingDefinition = await CreateProcessingDefinition(definition, _searchService, new List<string>() { definition.Type }, cancellationToken);
processingDefinition = await CreateProcessingDefinition(definition, searchService.Value, new List<string>() { definition.Type }, cancellationToken);
}

// Processing Definition can be null if bulk delete was requested on criteria that didn't match any resources. If there is nothing to delete, just finish the job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public class BulkDeleteProcessingJob : IJob
private readonly Func<IScoped<IDeletionService>> _deleterFactory;
private readonly RequestContextAccessor<IFhirRequestContext> _contextAccessor;
private readonly IMediator _mediator;
private readonly ISearchService _searchService;
private readonly Func<IScoped<ISearchService>> _searchService;
private readonly IQueueClient _queueClient;

public BulkDeleteProcessingJob(
Func<IScoped<IDeletionService>> deleterFactory,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
IMediator mediator,
ISearchService searchService,
Func<IScoped<ISearchService>> searchService,
IQueueClient queueClient)
{
_deleterFactory = EnsureArg.IsNotNull(deleterFactory, nameof(deleterFactory));
Expand Down Expand Up @@ -112,7 +112,8 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, IProgress<string> progre
if (types.Count > 1)
{
types.RemoveAt(0);
BulkDeleteDefinition processingDefinition = await BulkDeleteOrchestratorJob.CreateProcessingDefinition(definition, _searchService, types, cancellationToken);
using var searchService = _searchService.Invoke();
BulkDeleteDefinition processingDefinition = await BulkDeleteOrchestratorJob.CreateProcessingDefinition(definition, searchService.Value, types, cancellationToken);

if (processingDefinition != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public ResourceWrapper(
{
EnsureArg.IsNotNullOrEmpty(resourceId, nameof(resourceId));
EnsureArg.IsNotNullOrEmpty(resourceTypeName, nameof(resourceTypeName));
EnsureArg.IsNotNull(rawResource, nameof(rawResource));

ResourceId = resourceId;
Version = versionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,21 @@ namespace Microsoft.Health.Fhir.Core.Features.Search
{
public interface ISearchOptionsFactory
{
SearchOptions Create(string resourceType, IReadOnlyList<Tuple<string, string>> queryParameters, bool isAsyncOperation = false, ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest);
SearchOptions Create(
string resourceType,
IReadOnlyList<Tuple<string, string>> queryParameters,
bool isAsyncOperation = false,
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest,
bool onlyIds = false);

SearchOptions Create(string compartmentType, string compartmentId, string resourceType, IReadOnlyList<Tuple<string, string>> queryParameters, bool isAsyncOperation = false, bool useSmartCompartmentDefinition = false, ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest);
SearchOptions Create(
string compartmentType,
string compartmentId,
string resourceType,
IReadOnlyList<Tuple<string, string>> queryParameters,
bool isAsyncOperation = false,
bool useSmartCompartmentDefinition = false,
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest,
bool onlyIds = false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ public interface ISearchService
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="isAsyncOperation">Whether the search is part of an async operation.</param>
/// <param name="resourceVersionTypes">Which version types (latest, soft-deleted, history) to include in search.</param>
/// <param name="onlyIds">Whether to return only the resource ids, not the full resource</param>
/// <returns>A <see cref="SearchResult"/> representing the result.</returns>
Task<SearchResult> SearchAsync(
string resourceType,
IReadOnlyList<Tuple<string, string>> queryParameters,
CancellationToken cancellationToken,
bool isAsyncOperation = false,
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest);
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest,
bool onlyIds = false);

/// <summary>
/// Searches the resources using the <paramref name="searchOptions"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ internal SearchOptions(SearchOptions other)
ContinuationToken = other.ContinuationToken;
CountOnly = other.CountOnly;
IncludeTotal = other.IncludeTotal;
OnlyIds = other.OnlyIds;

MaxItemCountSpecifiedByClient = other.MaxItemCountSpecifiedByClient;
Expression = other.Expression;
Expand Down Expand Up @@ -135,6 +136,8 @@ internal set

public IReadOnlyList<(string Param, string Value)> QueryHints { get; set; }

public bool OnlyIds { get; set; }

/// <summary>
/// Performs a shallow clone of this instance
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ public async Task<SearchResult> SearchAsync(
IReadOnlyList<Tuple<string, string>> queryParameters,
CancellationToken cancellationToken,
bool isAsyncOperation = false,
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest)
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest,
bool onlyIds = false)
{
SearchOptions searchOptions = _searchOptionsFactory.Create(resourceType, queryParameters, isAsyncOperation, resourceVersionTypes);
SearchOptions searchOptions = _searchOptionsFactory.Create(resourceType, queryParameters, isAsyncOperation, resourceVersionTypes, onlyIds);

// Execute the actual search.
return await SearchAsync(searchOptions, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public override async Task<SearchResult> SearchAsync(
}

(IReadOnlyList<FhirCosmosResourceWrapper> results, string continuationToken, _) = await ExecuteSearchAsync<FhirCosmosResourceWrapper>(
_queryBuilder.BuildSqlQuerySpec(searchOptions, new QueryBuilderOptions(includeExpressions)),
_queryBuilder.BuildSqlQuerySpec(searchOptions, new QueryBuilderOptions(includeExpressions, searchOptions.OnlyIds ? QueryProjection.IdAndType : QueryProjection.Default)),
searchOptions,
searchOptions.CountOnly ? null : searchOptions.ContinuationToken,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private ConditionalDeleteResourceRequest SetupConditionalDelete(
int count = 1,
params SearchResultEntry[] searchResults)
{
_searchService.SearchAsync(Arg.Any<string>(), Arg.Any<IReadOnlyList<Tuple<string, string>>>(), Arg.Any<CancellationToken>())
_searchService.SearchAsync(Arg.Any<string>(), Arg.Any<IReadOnlyList<Tuple<string, string>>>(), Arg.Any<CancellationToken>(), resourceVersionTypes: Arg.Any<ResourceVersionType>(), onlyIds: Arg.Any<bool>())
.Returns(new SearchResult(searchResults, null, null, Enumerable.Empty<Tuple<string, string>>().ToArray()));

if (hardDelete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ public async Task<long> DeleteMultipleAsync(ConditionalDeleteResourceRequest req
{
EnsureArg.IsNotNull(request, nameof(request));

var stopwatch = new Stopwatch();
stopwatch.Start();

var searchCount = 1000;

IReadOnlyCollection<SearchResultEntry> matchedResults;
Expand All @@ -127,14 +124,13 @@ public async Task<long> DeleteMultipleAsync(ConditionalDeleteResourceRequest req
request.ConditionalParameters,
cancellationToken,
request.DeleteAll ? searchCount : request.MaxDeleteCount,
versionType: request.VersionType);
versionType: request.VersionType,
onlyIds: true);
}

long numDeleted = 0;
long numQueuedForDeletion = 0;

var initialSearchTime = stopwatch.Elapsed.TotalMilliseconds;

var deleteTasks = new List<Task<long>>();
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

Expand Down Expand Up @@ -175,12 +171,13 @@ public async Task<long> DeleteMultipleAsync(ConditionalDeleteResourceRequest req
using (var searchService = _searchServiceFactory.Invoke())
{
(matchedResults, ct) = await searchService.Value.ConditionalSearchAsync(
request.ResourceType,
request.ConditionalParameters,
cancellationToken,
request.DeleteAll ? searchCount : (int)(request.MaxDeleteCount - numQueuedForDeletion),
ct,
request.VersionType);
request.ResourceType,
request.ConditionalParameters,
cancellationToken,
request.DeleteAll ? searchCount : (int)(request.MaxDeleteCount - numQueuedForDeletion),
ct,
request.VersionType,
onlyIds: true);
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public SearchOptionsFactory(
_resourceTypeSearchParameter = _searchParameterDefinitionManager.GetSearchParameter(ResourceType.Resource.ToString(), SearchParameterNames.ResourceType);
}

public SearchOptions Create(string resourceType, IReadOnlyList<Tuple<string, string>> queryParameters, bool isAsyncOperation = false, ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest)
public SearchOptions Create(string resourceType, IReadOnlyList<Tuple<string, string>> queryParameters, bool isAsyncOperation = false, ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest, bool onlyIds = false)
{
return Create(null, null, resourceType, queryParameters, isAsyncOperation, resourceVersionTypes: resourceVersionTypes);
return Create(null, null, resourceType, queryParameters, isAsyncOperation, resourceVersionTypes: resourceVersionTypes, onlyIds: onlyIds);
}

[SuppressMessage("Design", "CA1308", Justification = "ToLower() is required to format parameter output correctly.")]
Expand All @@ -83,7 +83,8 @@ public SearchOptions Create(
IReadOnlyList<Tuple<string, string>> queryParameters,
bool isAsyncOperation = false,
bool useSmartCompartmentDefinition = false,
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest)
ResourceVersionType resourceVersionTypes = ResourceVersionType.Latest,
bool onlyIds = false)
{
var searchOptions = new SearchOptions();

Expand Down Expand Up @@ -253,6 +254,8 @@ public SearchOptions Create(
throw new BadRequestException(string.Format(Core.Resources.ElementsAndSummaryParametersAreIncompatible, KnownQueryParameterNames.Summary, KnownQueryParameterNames.Elements));
}

searchOptions.OnlyIds = onlyIds;

// Check to see if only the count should be returned
searchOptions.CountOnly = searchParams.Summary == SummaryType.Count;

Expand Down
Loading