Skip to content

Commit

Permalink
[plugin-azure-data-factory] Add ability to collect pipeline runs
Browse files Browse the repository at this point in the history
  • Loading branch information
valfirst committed Nov 16, 2021
1 parent 61e46de commit 4978be0
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 14 deletions.
60 changes: 60 additions & 0 deletions docs/modules/plugins/pages/plugin-azure-data-factory.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,63 @@ When I run pipeline `$pipelineName` in Data Factory `$factoryName` from resource
----
When I run pipeline `vividus-pipeline` in Data Factory `vividus-data-factory` from resource group `vividus-resource-group-ingestion` with wait timeout `PT30S`
----

=== Collect pipeline runs

Collects pipeline runs in Data Factory based on input filter conditions.

[source,gherkin]
----
When I collect runs of pipeline `$pipelineName` filtered by:$filters in Data Factory `$factoryName` from resource group `$resourceGroupName` and save them as JSON to $scopes variable `$variableName`
----

* `$pipelineName` - The name of the pipeline whose runs should be found.
* `$filters` - The ExamplesTable with filters to be applied to the pipeline runs to limit the resulting set.
+
.The supported filter types
[cols="1,1,2", options="header"]
|===
|Type
|Alias
|Description

|`LAST_UPDATED_AFTER`
|`last updated after`
|The time at or after which the run event was updated in {iso-date-format-link} format.

|`LAST_UPDATED_BEFORE`
|`last updated before`
|The time at or before which the run event was updated in {iso-date-format-link} format.

|===
+
The filters can be combined in any order and in any composition.
+
.The combination of filters
[source,gherkin]
----
|filterType |filterValue |
|last updated after |2021-11-15T00:00:00+03:00|
|last updated before|2021-11-15T00:00:00+03:00|
----

* `$factoryName` - The name of the factory.
* `$resourceGroupName` - The name of the resource group of the factory.
* `$scopes` - xref:commons:variables.adoc#_scopes[The comma-separated set of the variables scopes].
* `$variableName` - The variable name to store the pipeline runs in JSON format.

[IMPORTANT]
====
The client should have permission to run action `Microsoft.DataFactory/factories/pipelineruns/read`
over scope `/subscriptions/{subscription ID}/resourceGroups/{resource group name}/providers/Microsoft.DataFactory`.
====

.Find pipeline runs from the last day
[source,gherkin]
----
When I collect runs of pipeline `vividus-pipeline` filtered by:
|filterType |filterValue |
|LAST_UPDATED_AFTER |#{generateDate(-P1D, yyyy-MM-dd'T'HH:mm:ssXXX)} |
|LAST_UPDATED_BEFORE |#{generateDate(P, yyyy-MM-dd'T'HH:mm:ssXXX)} |
in Data Factory `vividus-data-factory` from resource group `vividus-resource-group-ingestion` and save them as JSON to scenario variable `pipeline-runs`
----
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.vividus.azure.util;

import java.io.IOException;

import com.azure.core.util.serializer.JacksonAdapter;
import com.azure.core.util.serializer.SerializerEncoding;
import com.fasterxml.jackson.annotation.JsonProperty.Access;
import com.fasterxml.jackson.databind.introspect.Annotated;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
Expand All @@ -39,4 +42,9 @@ public Access findPropertyAccess(Annotated annotated)
inner.setAnnotationIntrospector(annotationIntrospector);
});
}

    /**
     * Serializes the given object into its JSON string representation.
     *
     * @param object the object to serialize
     * @return the JSON representation of the object
     * @throws IOException if the object can't be serialized to JSON
     */
    public String serializeToJson(Object object) throws IOException
    {
        // Delegates to JacksonAdapter#serialize with a fixed JSON encoding,
        // so callers don't have to pass SerializerEncoding explicitly.
        return super.serialize(object, SerializerEncoding.JSON);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void shouldSerializeToJson() throws IOException
var jacksonAdapter = new InnersJacksonAdapter();
var inner = (TestInner) jacksonAdapter.deserialize(inputJson, TestInner.class, SerializerEncoding.JSON);
inner.withPrimaryLocation("westus");
String outputJson = jacksonAdapter.serialize(inner, SerializerEncoding.JSON);
String outputJson = jacksonAdapter.serializeToJson(inner);
assertEquals("{\"name\":\"azure-resource\",\"tags\":{},\"properties\":{\"primaryLocation\":\"westus\"}}",
outputJson);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,30 @@

package org.vividus.azure.datafactory.steps;

import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

import com.azure.core.credential.TokenCredential;
import com.azure.core.management.profile.AzureProfile;
import com.azure.resourcemanager.datafactory.DataFactoryManager;
import com.azure.resourcemanager.datafactory.fluent.models.PipelineRunInner;
import com.azure.resourcemanager.datafactory.models.PipelineRun;
import com.azure.resourcemanager.datafactory.models.RunFilterParameters;
import com.azure.resourcemanager.datafactory.models.RunQueryFilter;
import com.azure.resourcemanager.datafactory.models.RunQueryFilterOperand;
import com.azure.resourcemanager.datafactory.models.RunQueryFilterOperator;

import org.jbehave.core.annotations.AsParameters;
import org.jbehave.core.annotations.When;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vividus.azure.util.InnersJacksonAdapter;
import org.vividus.bdd.context.IBddVariableContext;
import org.vividus.bdd.variable.VariableScope;
import org.vividus.softassert.ISoftAssert;
import org.vividus.util.wait.DurationBasedWaiter;
import org.vividus.util.wait.WaitMode;
Expand All @@ -36,12 +50,17 @@ public class DataFactorySteps
private static final int RETRY_TIMES = 10;

private final DataFactoryManager dataFactoryManager;
private final InnersJacksonAdapter innersJacksonAdapter;
private final IBddVariableContext bddVariableContext;
private final ISoftAssert softAssert;

public DataFactorySteps(AzureProfile azureProfile, TokenCredential tokenCredential, ISoftAssert softAssert)
public DataFactorySteps(AzureProfile azureProfile, TokenCredential tokenCredential,
InnersJacksonAdapter innersJacksonAdapter, IBddVariableContext bddVariableContext, ISoftAssert softAssert)
{
this.dataFactoryManager = DataFactoryManager.authenticate(tokenCredential, azureProfile);
this.softAssert = softAssert;
this.innersJacksonAdapter = innersJacksonAdapter;
this.bddVariableContext = bddVariableContext;
}

/**
Expand Down Expand Up @@ -96,10 +115,118 @@ public void runPipeline(String pipelineName, String factoryName, String resource
}
}

/**
* Collects pipeline runs in Data factory based on input filter conditions.
*
* @param pipelineName The name of the pipeline to find runs.
* @param filters The ExamplesTable with filters to be applied to the pipeline runs to limit the
* resulting set. The supported filter types are:
* <ul>
* <li><code>LAST_UPDATED_AFTER</code> - the time at or after which the run event was
* updated in ISO-8601 format.</li>
* <li><code>LAST_UPDATED_BEFORE</code> - the time at or before which the run event was
* updated in ISO-8601 format.</li>
* </ul>
* The filters can be combined in any order and in any composition, e.g.<br>
* <code>
* |filterType |filterValue |<br>
* |last updated after |2021-11-15T00:00:00+03:00|<br>
* |last updated before|2021-11-15T00:00:00+03:00|<br>
* </code>
* @param factoryName The name of the factory.
* @param resourceGroupName The name of the resource group of the factory.
* @param scopes The set (comma separated list of scopes e.g.: STORY, NEXT_BATCHES) of the variable
* scopes.<br>
* <i>Available scopes:</i>
* <ul>
* <li><b>STEP</b> - the variable will be available only within the step,
* <li><b>SCENARIO</b> - the variable will be available only within the scenario,
* <li><b>STORY</b> - the variable will be available within the whole story,
* <li><b>NEXT_BATCHES</b> - the variable will be available starting from next batch
* </ul>
* @param variableName The variable name to store the pipeline runs in JSON format.
* @throws IOException if an I/O error occurs
*/
@When("I collect runs of pipeline `$pipelineName` filtered by:$filters in Data Factory `$factoryName` from "
+ "resource group `$resourceGroupName` and save them as JSON to $scopes variable `$variableName`")
@SuppressWarnings("PMD.UseObjectForClearerAPI")
public void collectPipelineRuns(String pipelineName, List<RunFilter> filters, String factoryName,
String resourceGroupName, Set<VariableScope> scopes, String variableName) throws IOException
{
RunFilterParameters filterParameters = new RunFilterParameters()
.withFilters(
List.of(
new RunQueryFilter()
.withOperand(RunQueryFilterOperand.PIPELINE_NAME)
.withOperator(RunQueryFilterOperator.EQUALS)
.withValues(List.of(pipelineName))
)
);
filters.forEach(filter -> filter.getFilterType().addFilter(filterParameters, filter.getFilterValue()));
LOGGER.atInfo().addArgument(() -> {
try
{
return innersJacksonAdapter.serializeToJson(filterParameters);
}
catch (IOException e)
{
return "<unable to log filters: " + e.getMessage() + ">";
}
}).log("Collecting pipeline runs filtered by: {}");
List<PipelineRunInner> runs = dataFactoryManager.pipelineRuns().queryByFactory(resourceGroupName, factoryName,
filterParameters).innerModel().value();
bddVariableContext.putVariable(scopes, variableName, innersJacksonAdapter.serializeToJson(runs));
}

private PipelineRun getPipelineRun(String resourceGroupName, String factoryName, String runId)
{
PipelineRun pipelineRun = dataFactoryManager.pipelineRuns().get(resourceGroupName, factoryName, runId);
LOGGER.atInfo().addArgument(pipelineRun::status).log("The current pipeline run status is \"{}\"");
return pipelineRun;
}

    @AsParameters
    public static class RunFilter
    {
        // NOTE(review): JBehave's @AsParameters maps ExamplesTable columns onto these fields by name
        // ("filterType"/"filterValue"), so the field names are part of the step's table contract.
        // The type of filter to apply (e.g. LAST_UPDATED_AFTER / LAST_UPDATED_BEFORE).
        private RunFilterType filterType;
        // The timestamp value the filter compares run update times against.
        private OffsetDateTime filterValue;

        public RunFilterType getFilterType()
        {
            return filterType;
        }

        public void setFilterType(RunFilterType filterType)
        {
            this.filterType = filterType;
        }

        public OffsetDateTime getFilterValue()
        {
            return filterValue;
        }

        public void setFilterValue(OffsetDateTime filterValue)
        {
            this.filterValue = filterValue;
        }
    }

public enum RunFilterType
{
LAST_UPDATED_AFTER(RunFilterParameters::withLastUpdatedAfter),
LAST_UPDATED_BEFORE(RunFilterParameters::withLastUpdatedBefore);

private final BiConsumer<RunFilterParameters, OffsetDateTime> filterSetter;

RunFilterType(BiConsumer<RunFilterParameters, OffsetDateTime> filterSetter)
{
this.filterSetter = filterSetter;
}

public void addFilter(RunFilterParameters filterParameters, OffsetDateTime filterValue)
{
filterSetter.accept(filterParameters, filterValue);
}
}
}
Loading

0 comments on commit 4978be0

Please sign in to comment.