diff --git a/docs/modules/plugins/pages/plugin-azure-data-factory.adoc b/docs/modules/plugins/pages/plugin-azure-data-factory.adoc index 1f6e47a48d..005b8351f5 100644 --- a/docs/modules/plugins/pages/plugin-azure-data-factory.adoc +++ b/docs/modules/plugins/pages/plugin-azure-data-factory.adoc @@ -66,3 +66,63 @@ When I run pipeline `$pipelineName` in Data Factory `$factoryName` from resource ---- When I run pipeline `vividus-pipeline` in Data Factory `vividus-data-factory` from resource group `vividus-resource-group-ingestion` with wait timeout `PT30S` ---- + +=== Collect pipeline runs + +Collects pipeline runs in Data factory based on input filter conditions. + +[source,gherkin] +---- +When I collect runs of pipeline `$pipelineName` filtered by:$filters in Data Factory `$factoryName` from resource group `$resourceGroupName` and save them as JSON to $scopes variable `$variableName` +---- + +* `$pipelineName` - The name of the pipeline to find runs. +* `$filters` - The ExamplesTable with filters to be applied to the pipeline runs to limit the resulting set. ++ +.The supported filter types +[cols="1,1,2", options="header"] +|=== +|Type +|Alias +|Description + +|`LAST_UPDATED_AFTER` +|`last updated after` +|The time at or after which the run event was updated in {iso-date-format-link} format. + +|`LAST_UPDATED_BEFORE` +|`last updated before` +|The time at or before which the run event was updated in {iso-date-format-link} format. + +|=== ++ +The filters can be combined in any order and in any composition. ++ +.The combination of filters +[source,gherkin] +---- +|filterType |filterValue | +|last updated after |2021-11-15T00:00:00+03:00| +|last updated before|2021-11-15T00:00:00+03:00| +---- + +* `$factoryName` - The name of the factory. +* `$resourceGroupName` - The name of the resource group of the factory. +* `$scopes` - xref:commons:variables.adoc#_scopes[The comma-separated set of the variables scopes]. +* `$variableName` - The variable name to store the pipeline runs in JSON format. + +[IMPORTANT] +==== +The client should have permission to run action `Microsoft.DataFactory/factories/pipelineruns/read` +over scope `/subscriptions/{subscription ID}/resourceGroups/{resource group name}/providers/Microsoft.DataFactory`. +==== + +.Find pipeline runs from the last day +[source,gherkin] +---- +When I collect runs of pipeline `vividus-pipeline` filtered by: +|filterType |filterValue | +|LAST_UPDATED_AFTER |#{generateDate(-P1D, yyyy-MM-dd'T'HH:mm:ssXXX)} | +|LAST_UPDATED_BEFORE |#{generateDate(P, yyyy-MM-dd'T'HH:mm:ssXXX)} | +in Data Factory `vividus-data-factory` from resource group `vividus-resource-group-ingestion` and save them as JSON to scenario variable `pipeline-runs` +---- diff --git a/vividus-extension-azure/src/main/java/org/vividus/azure/util/InnersJacksonAdapter.java b/vividus-extension-azure/src/main/java/org/vividus/azure/util/InnersJacksonAdapter.java index 674cf48248..d4ee15a9cb 100644 --- a/vividus-extension-azure/src/main/java/org/vividus/azure/util/InnersJacksonAdapter.java +++ b/vividus-extension-azure/src/main/java/org/vividus/azure/util/InnersJacksonAdapter.java @@ -16,7 +16,10 @@ package org.vividus.azure.util; +import java.io.IOException; + import com.azure.core.util.serializer.JacksonAdapter; +import com.azure.core.util.serializer.SerializerEncoding; import com.fasterxml.jackson.annotation.JsonProperty.Access; import com.fasterxml.jackson.databind.introspect.Annotated; import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector; @@ -39,4 +42,9 @@ public Access findPropertyAccess(Annotated annotated) inner.setAnnotationIntrospector(annotationIntrospector); }); } + + public String serializeToJson(Object object) throws IOException + { + return super.serialize(object, SerializerEncoding.JSON); + } } diff --git a/vividus-extension-azure/src/test/java/org/vividus/azure/util/InnersJacksonAdapterTests.java b/vividus-extension-azure/src/test/java/org/vividus/azure/util/InnersJacksonAdapterTests.java index 2fc78e1f52..f0c9f8f486 100644 --- a/vividus-extension-azure/src/test/java/org/vividus/azure/util/InnersJacksonAdapterTests.java +++ b/vividus-extension-azure/src/test/java/org/vividus/azure/util/InnersJacksonAdapterTests.java @@ -40,7 +40,7 @@ void shouldSerializeToJson() throws IOException var jacksonAdapter = new InnersJacksonAdapter(); var inner = (TestInner) jacksonAdapter.deserialize(inputJson, TestInner.class, SerializerEncoding.JSON); inner.withPrimaryLocation("westus"); - String outputJson = jacksonAdapter.serialize(inner, SerializerEncoding.JSON); + String outputJson = jacksonAdapter.serializeToJson(inner); assertEquals("{\"name\":\"azure-resource\",\"tags\":{},\"properties\":{\"primaryLocation\":\"westus\"}}", outputJson); } diff --git a/vividus-plugin-azure-data-factory/src/main/java/org/vividus/azure/datafactory/steps/DataFactorySteps.java b/vividus-plugin-azure-data-factory/src/main/java/org/vividus/azure/datafactory/steps/DataFactorySteps.java index 5e23ce8bb9..5d98a78e36 100644 --- a/vividus-plugin-azure-data-factory/src/main/java/org/vividus/azure/datafactory/steps/DataFactorySteps.java +++ b/vividus-plugin-azure-data-factory/src/main/java/org/vividus/azure/datafactory/steps/DataFactorySteps.java @@ -16,16 +16,30 @@ package org.vividus.azure.datafactory.steps; +import java.io.IOException; import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Set; +import java.util.function.BiConsumer; import com.azure.core.credential.TokenCredential; import com.azure.core.management.profile.AzureProfile; import com.azure.resourcemanager.datafactory.DataFactoryManager; +import com.azure.resourcemanager.datafactory.fluent.models.PipelineRunInner; import com.azure.resourcemanager.datafactory.models.PipelineRun; +import com.azure.resourcemanager.datafactory.models.RunFilterParameters; +import com.azure.resourcemanager.datafactory.models.RunQueryFilter; +import com.azure.resourcemanager.datafactory.models.RunQueryFilterOperand; +import com.azure.resourcemanager.datafactory.models.RunQueryFilterOperator; +import org.jbehave.core.annotations.AsParameters; import org.jbehave.core.annotations.When; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.vividus.azure.util.InnersJacksonAdapter; +import org.vividus.bdd.context.IBddVariableContext; +import org.vividus.bdd.variable.VariableScope; import org.vividus.softassert.ISoftAssert; import org.vividus.util.wait.DurationBasedWaiter; import org.vividus.util.wait.WaitMode; @@ -36,12 +50,17 @@ public class DataFactorySteps private static final int RETRY_TIMES = 10; private final DataFactoryManager dataFactoryManager; + private final InnersJacksonAdapter innersJacksonAdapter; + private final IBddVariableContext bddVariableContext; private final ISoftAssert softAssert; - public DataFactorySteps(AzureProfile azureProfile, TokenCredential tokenCredential, ISoftAssert softAssert) + public DataFactorySteps(AzureProfile azureProfile, TokenCredential tokenCredential, + InnersJacksonAdapter innersJacksonAdapter, IBddVariableContext bddVariableContext, ISoftAssert softAssert) { this.dataFactoryManager = DataFactoryManager.authenticate(tokenCredential, azureProfile); this.softAssert = softAssert; + this.innersJacksonAdapter = innersJacksonAdapter; + this.bddVariableContext = bddVariableContext; } /** @@ -96,10 +115,118 @@ public void runPipeline(String pipelineName, String factoryName, String resource } } + /** + * Collects pipeline runs in Data factory based on input filter conditions. + * + * @param pipelineName The name of the pipeline to find runs. + * @param filters The ExamplesTable with filters to be applied to the pipeline runs to limit the + * resulting set. The supported filter types are: + * + * The filters can be combined in any order and in any composition, e.g.
+ * + * |filterType |filterValue |
+ * |last updated after |2021-11-15T00:00:00+03:00|
+ * |last updated before|2021-11-15T00:00:00+03:00|
+ *
+ * @param factoryName The name of the factory. + * @param resourceGroupName The name of the resource group of the factory. + * @param scopes The set (comma separated list of scopes e.g.: STORY, NEXT_BATCHES) of the variable + * scopes.
+ * Available scopes: + * + * @param variableName The variable name to store the pipeline runs in JSON format. + * @throws IOException if an I/O error occurs + */ + @When("I collect runs of pipeline `$pipelineName` filtered by:$filters in Data Factory `$factoryName` from " + + "resource group `$resourceGroupName` and save them as JSON to $scopes variable `$variableName`") + @SuppressWarnings("PMD.UseObjectForClearerAPI") + public void collectPipelineRuns(String pipelineName, List filters, String factoryName, + String resourceGroupName, Set scopes, String variableName) throws IOException + { + RunFilterParameters filterParameters = new RunFilterParameters() + .withFilters( + List.of( + new RunQueryFilter() + .withOperand(RunQueryFilterOperand.PIPELINE_NAME) + .withOperator(RunQueryFilterOperator.EQUALS) + .withValues(List.of(pipelineName)) + ) + ); + filters.forEach(filter -> filter.getFilterType().addFilter(filterParameters, filter.getFilterValue())); + LOGGER.atInfo().addArgument(() -> { + try + { + return innersJacksonAdapter.serializeToJson(filterParameters); + } + catch (IOException e) + { + return ""; + } + }).log("Collecting pipeline runs filtered by: {}"); + List runs = dataFactoryManager.pipelineRuns().queryByFactory(resourceGroupName, factoryName, + filterParameters).innerModel().value(); + bddVariableContext.putVariable(scopes, variableName, innersJacksonAdapter.serializeToJson(runs)); + } + private PipelineRun getPipelineRun(String resourceGroupName, String factoryName, String runId) { PipelineRun pipelineRun = dataFactoryManager.pipelineRuns().get(resourceGroupName, factoryName, runId); LOGGER.atInfo().addArgument(pipelineRun::status).log("The current pipeline run status is \"{}\""); return pipelineRun; } + + @AsParameters + public static class RunFilter + { + private RunFilterType filterType; + private OffsetDateTime filterValue; + + public RunFilterType getFilterType() + { + return filterType; + } + + public void setFilterType(RunFilterType filterType) + { + this.filterType = filterType; + } + + public OffsetDateTime getFilterValue() + { + return filterValue; + } + + public void setFilterValue(OffsetDateTime filterValue) + { + this.filterValue = filterValue; + } + } + + public enum RunFilterType + { + LAST_UPDATED_AFTER(RunFilterParameters::withLastUpdatedAfter), + LAST_UPDATED_BEFORE(RunFilterParameters::withLastUpdatedBefore); + + private final BiConsumer filterSetter; + + RunFilterType(BiConsumer filterSetter) + { + this.filterSetter = filterSetter; + } + + public void addFilter(RunFilterParameters filterParameters, OffsetDateTime filterValue) + { + filterSetter.accept(filterParameters, filterValue); + } + } } diff --git a/vividus-plugin-azure-data-factory/src/test/java/org/vividus/azure/datafactory/steps/DataFactoryStepsTests.java b/vividus-plugin-azure-data-factory/src/test/java/org/vividus/azure/datafactory/steps/DataFactoryStepsTests.java index 23037191e8..9208c74e45 100644 --- a/vividus-plugin-azure-data-factory/src/test/java/org/vividus/azure/datafactory/steps/DataFactoryStepsTests.java +++ b/vividus-plugin-azure-data-factory/src/test/java/org/vividus/azure/datafactory/steps/DataFactoryStepsTests.java @@ -21,33 +21,54 @@ import static com.github.valfirst.slf4jtest.LoggingEvent.warn; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.BiConsumer; import com.azure.core.credential.TokenCredential; import com.azure.core.management.profile.AzureProfile; import com.azure.resourcemanager.datafactory.DataFactoryManager; +import com.azure.resourcemanager.datafactory.fluent.models.PipelineRunInner; +import com.azure.resourcemanager.datafactory.fluent.models.PipelineRunsQueryResponseInner; import com.azure.resourcemanager.datafactory.models.CreateRunResponse; import com.azure.resourcemanager.datafactory.models.PipelineRun; import com.azure.resourcemanager.datafactory.models.PipelineRuns; +import com.azure.resourcemanager.datafactory.models.PipelineRunsQueryResponse; import com.azure.resourcemanager.datafactory.models.Pipelines; +import com.azure.resourcemanager.datafactory.models.RunFilterParameters; +import com.azure.resourcemanager.datafactory.models.RunQueryFilterOperand; +import com.azure.resourcemanager.datafactory.models.RunQueryFilterOperator; import com.github.valfirst.slf4jtest.LoggingEvent; import com.github.valfirst.slf4jtest.TestLogger; import com.github.valfirst.slf4jtest.TestLoggerFactory; import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension; +import org.apache.commons.lang3.function.FailableBiConsumer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockedStatic; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; +import org.vividus.azure.datafactory.steps.DataFactorySteps.RunFilter; +import org.vividus.azure.datafactory.steps.DataFactorySteps.RunFilterType; +import org.vividus.azure.util.InnersJacksonAdapter; +import org.vividus.bdd.context.IBddVariableContext; +import org.vividus.bdd.variable.VariableScope; import org.vividus.softassert.ISoftAssert; @ExtendWith({ MockitoExtension.class, TestLoggerFactoryExtension.class }) @@ -66,16 +87,19 @@ class DataFactoryStepsTests + "pipeline `$pipelineName` in Data Factory `$factoryName` from resource group " + "`$resourceGroupName` with wait timeout `$waitTimeout` and expect run status to be equal to" + " `$expectedPipelineRunStatus`\""; + private static final String FILTER_LOG_MESSAGE = "Collecting pipeline runs filtered by: {}"; private final TestLogger logger = TestLoggerFactory.getTestLogger(DataFactorySteps.class); @Mock private AzureProfile azureProfile; @Mock private TokenCredential tokenCredential; + @Spy private InnersJacksonAdapter innersJacksonAdapter = new InnersJacksonAdapter(); + @Mock private IBddVariableContext bddVariableContext; @Mock private ISoftAssert softAssert; @Test @SuppressWarnings("removal") - void shouldRunSuccessfulPipeline() + void shouldRunSuccessfulPipeline() throws IOException { shouldRunPipeline(mock(PipelineRun.class), SUCCEEDED, SUCCEEDED, (steps, loggingEvents) -> { loggingEvents.add(0, warn(DEPRECATION_NOTICE)); @@ -85,7 +109,7 @@ void shouldRunSuccessfulPipeline() @Test @SuppressWarnings("removal") - void shouldRunFailingPipeline() + void shouldRunFailingPipeline() throws IOException { var errorMessage = "error message"; PipelineRun finalPipelineRunState = mock(PipelineRun.class); @@ -99,15 +123,36 @@ void shouldRunFailingPipeline() } @Test - void shouldRunFailingPipelineAsExpected() + void shouldRunFailingPipelineAsExpected() throws IOException { shouldRunPipeline(mock(PipelineRun.class), FAILED, FAILED, (steps, loggingEvents) -> { steps.runPipeline(PIPELINE_NAME, FACTORY_NAME, RESOURCE_GROUP_NAME, WAIT_TIMEOUT, FAILED); }); } + @Test + void shouldCollectPipelineRunsSuccessfully() throws IOException + { + shouldCollectPipelineRuns(); + assertThat(logger.getLoggingEvents(), is(List.of(info(FILTER_LOG_MESSAGE, + "{\"lastUpdatedAfter\":\"2021-11-14T21:00:00Z\",\"lastUpdatedBefore\":\"2021-11-15T21:00:00Z\"," + + "\"filters\":[{\"operand\":\"PipelineName\",\"operator\":\"Equals\"," + + "\"values\":[\"pipelineName\"]}]}"))) + ); + } + + @Test + void shouldCollectPipelineRunsWithErrorOnFiltersLogging() throws IOException + { + var ioException = new IOException("IO error"); + when(innersJacksonAdapter.serializeToJson(any())).thenThrow(ioException).thenCallRealMethod(); + shouldCollectPipelineRuns(); + assertThat(logger.getLoggingEvents(), + is(List.of(info(FILTER_LOG_MESSAGE, "")))); + } + private void shouldRunPipeline(PipelineRun finalPipelineRunState, String finalRunStatus, String expectedRunStatus, - BiConsumer> pipelineRunner) + BiConsumer> pipelineRunner) throws IOException { String runId = "run-id"; @@ -149,14 +194,60 @@ private void shouldRunPipeline(PipelineRun finalPipelineRunState, String finalRu }); } - private void executeSteps(BiConsumer consumer) + private void shouldCollectPipelineRuns() throws IOException + { + executeSteps((dataFactoryManager, steps) -> { + var pipelineRunInner = new PipelineRunInner().withAdditionalProperties(Map.of("key", "PipelineRunInner")); + + var innerQueryResponse = mock(PipelineRunsQueryResponseInner.class); + when(innerQueryResponse.value()).thenReturn(List.of(pipelineRunInner)); + + var queryResponse = mock(PipelineRunsQueryResponse.class); + when(queryResponse.innerModel()).thenReturn(innerQueryResponse); + + var pipelineRuns = mock(PipelineRuns.class); + var runFilterParametersCaptor = ArgumentCaptor.forClass(RunFilterParameters.class); + when(pipelineRuns.queryByFactory(eq(RESOURCE_GROUP_NAME), eq(FACTORY_NAME), + runFilterParametersCaptor.capture())).thenReturn(queryResponse); + + when(dataFactoryManager.pipelineRuns()).thenReturn(pipelineRuns); + + var filter1 = new RunFilter(); + filter1.setFilterType(RunFilterType.LAST_UPDATED_AFTER); + filter1.setFilterValue(OffsetDateTime.parse("2021-11-15T00:00:00+03:00")); + var filter2 = new RunFilter(); + filter2.setFilterType(RunFilterType.LAST_UPDATED_BEFORE); + filter2.setFilterValue(OffsetDateTime.parse("2021-11-16T00:00:00+03:00")); + + var scopes = Set.of(VariableScope.SCENARIO); + var variableName = "varName"; + steps.collectPipelineRuns(PIPELINE_NAME, List.of(filter1, filter2), FACTORY_NAME, RESOURCE_GROUP_NAME, + scopes, variableName); + + verify(bddVariableContext).putVariable(scopes, variableName, "[{\"key\":\"PipelineRunInner\"}]"); + + var runFilterParameters = runFilterParametersCaptor.getValue(); + assertEquals(filter1.getFilterValue(), runFilterParameters.lastUpdatedAfter()); + assertEquals(filter2.getFilterValue(), runFilterParameters.lastUpdatedBefore()); + var runQueryFilters = runFilterParameters.filters(); + assertEquals(1, runQueryFilters.size()); + var runQueryFilter = runQueryFilters.get(0); + assertEquals(RunQueryFilterOperand.PIPELINE_NAME, runQueryFilter.operand()); + assertEquals(RunQueryFilterOperator.EQUALS, runQueryFilter.operator()); + assertEquals(List.of(PIPELINE_NAME), runQueryFilter.values()); + }); + } + + private void executeSteps(FailableBiConsumer consumer) + throws IOException { try (MockedStatic dataFactoryManagerStaticMock = mockStatic(DataFactoryManager.class)) { var dataFactoryManager = mock(DataFactoryManager.class); dataFactoryManagerStaticMock.when(() -> DataFactoryManager.authenticate(tokenCredential, azureProfile)) .thenReturn(dataFactoryManager); - var steps = new DataFactorySteps(azureProfile, tokenCredential, softAssert); + var steps = new DataFactorySteps(azureProfile, tokenCredential, innersJacksonAdapter, bddVariableContext, + softAssert); consumer.accept(dataFactoryManager, steps); } } diff --git a/vividus-plugin-azure-event-grid/src/main/java/org/vividus/azure/eventgrid/EventGridManagementSteps.java b/vividus-plugin-azure-event-grid/src/main/java/org/vividus/azure/eventgrid/EventGridManagementSteps.java index 282385dbf1..f0c5247d8c 100644 --- a/vividus-plugin-azure-event-grid/src/main/java/org/vividus/azure/eventgrid/EventGridManagementSteps.java +++ b/vividus-plugin-azure-event-grid/src/main/java/org/vividus/azure/eventgrid/EventGridManagementSteps.java @@ -24,7 +24,6 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.management.profile.AzureProfile; -import com.azure.core.util.serializer.SerializerEncoding; import com.azure.resourcemanager.eventgrid.EventGridManager; import com.azure.resourcemanager.eventgrid.fluent.models.SystemTopicInner; @@ -75,7 +74,6 @@ public void listSystemTopics(String resourceGroupName, Set scopes .stream() .collect(toList()); - bddVariableContext.putVariable(scopes, variableName, - innersJacksonAdapter.serialize(systemTopics, SerializerEncoding.JSON)); + bddVariableContext.putVariable(scopes, variableName, innersJacksonAdapter.serializeToJson(systemTopics)); } } diff --git a/vividus-plugin-azure-storage-account/src/main/java/org/vividus/azure/storage/StorageAccountManagementSteps.java b/vividus-plugin-azure-storage-account/src/main/java/org/vividus/azure/storage/StorageAccountManagementSteps.java index 3ac7daaa37..f3b28e76c3 100644 --- a/vividus-plugin-azure-storage-account/src/main/java/org/vividus/azure/storage/StorageAccountManagementSteps.java +++ b/vividus-plugin-azure-storage-account/src/main/java/org/vividus/azure/storage/StorageAccountManagementSteps.java @@ -24,7 +24,6 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.management.profile.AzureProfile; -import com.azure.core.util.serializer.SerializerEncoding; import com.azure.resourcemanager.storage.StorageManager; import com.azure.resourcemanager.storage.fluent.models.StorageAccountInner; @@ -75,7 +74,6 @@ public void listStorageAccounts(String resourceGroupName, Set sco .stream() .collect(toList()); - bddVariableContext.putVariable(scopes, variableName, - innersJacksonAdapter.serialize(storageAccounts, SerializerEncoding.JSON)); + bddVariableContext.putVariable(scopes, variableName, innersJacksonAdapter.serializeToJson(storageAccounts)); } }