Skip to content

feat: supports aggregateWindow in LINQ expressions #282

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 8 commits into from
Feb 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## 3.4.0 [unreleased]

### Breaking Changes
Changed type of `Duration.magnitude` from `int?` to `long?`.

### Features
1. [#282](https://github.com/influxdata/influxdb-client-csharp/pull/282): Add support for AggregateWindow function [LINQ]

## 3.3.0 [2022-02-04]

### Bug Fixes
8 changes: 8 additions & 0 deletions Client.Linq.Test/DomainObjects.cs
Original file line number Diff line number Diff line change
@@ -86,4 +86,12 @@ public class DataEntityWithLong
{
public long EndWithTicks { get; set; }
}

class SensorDateTimeAsField
{
[Column("data")]
public int Value { get; set; }

[Column( "dataTime")] public DateTime DateTimeField { get; set; }
}
}
106 changes: 106 additions & 0 deletions Client.Linq.Test/InfluxDBQueryVisitorTest.cs
Original file line number Diff line number Diff line change
@@ -929,6 +929,112 @@ public void FilterByLong()
Assert.AreEqual("p3", endWithTicksAssignment?.Id.Name);
Assert.AreEqual("637656739543829486", (endWithTicksAssignment?.Init as IntegerLiteral)?.Value);
}

[Test]
public void AggregateWindow()
{
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
where s.Value == 5
select s;
var visitor = BuildQueryVisitor(query);

StringAssert.Contains("aggregateWindow(every: p3, period: p4, fn: p5)", visitor.BuildFluxQuery());

var ast = visitor.BuildFluxAST();

Assert.NotNull(ast);
Assert.NotNull(ast.Body);
Assert.AreEqual(6, ast.Body.Count);

var everyAssignment = ((OptionStatement) ast.Body[2]).Assignment as VariableAssignment;
Assert.AreEqual("p3", everyAssignment?.Id.Name);
Assert.AreEqual(20000000, (everyAssignment.Init as DurationLiteral)?.Values[0].Magnitude);
Assert.AreEqual("us", (everyAssignment.Init as DurationLiteral)?.Values[0].Unit);

var periodAssignment = ((OptionStatement) ast.Body[3]).Assignment as VariableAssignment;
Assert.AreEqual("p4", periodAssignment?.Id.Name);
Assert.AreEqual(40000000, (periodAssignment.Init as DurationLiteral)?.Values[0].Magnitude);
Assert.AreEqual("us", (periodAssignment.Init as DurationLiteral)?.Values[0].Unit);

var fnAssignment = ((OptionStatement) ast.Body[4]).Assignment as VariableAssignment;
Assert.AreEqual("p5", fnAssignment?.Id.Name);
Assert.AreEqual("mean", (fnAssignment.Init as Identifier)?.Name);
}

[Test]
public void AggregateWindowFluxQuery()
{
var queries = new[]
{
(
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
select s,
"aggregateWindow(every: p3, period: p4, fn: p5)",
""
),
(
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
select s,
"aggregateWindow(every: p3, fn: p4)",
""
),
(
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
where s.Value == 5
select s,
"aggregateWindow(every: p3, fn: p4)",
" |> filter(fn: (r) => (r[\"data\"] == p5))"
),
(
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
where s.Value == 5
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
select s,
"aggregateWindow(every: p4, fn: p5)",
" |> filter(fn: (r) => (r[\"data\"] == p3))"
),
(
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
where s.Deployment == "prod"
where s.Value == 5
where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
select s,
"filter(fn: (r) => (r[\"deployment\"] == p3)) |> aggregateWindow(every: p5, fn: p6)",
" |> filter(fn: (r) => (r[\"data\"] == p4))"
),
(
from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _queryApi)
where s.Deployment == "prod" && s.Value == 5 && s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), null, "mean")
select s,
"filter(fn: (r) => (r[\"deployment\"] == p3)) |> aggregateWindow(every: p5, fn: p6)",
" |> filter(fn: (r) => (r[\"data\"] == p4))"
)
};

foreach (var (queryable, expected, filter) in queries)
{
var visitor = BuildQueryVisitor(queryable);

var flux = "start_shifted = int(v: time(v: p2))\n\nfrom(bucket: p1) |> range(start: time(v: start_shifted)) |> " + expected + " |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") |> drop(columns: [\"_start\", \"_stop\", \"_measurement\"])" + filter;
Assert.AreEqual(flux, visitor.BuildFluxQuery());
}
}

[Test]
public void AggregateWindowOnlyForTimestamp()
{
var query = from s in InfluxDBQueryable<SensorDateTimeAsField>.Queryable("my-bucket", "my-org", _queryApi)
where s.DateTimeField.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
where s.Value == 5
select s;

var nse = Assert.Throws<NotSupportedException>(() => BuildQueryVisitor(query));
Assert.AreEqual("AggregateWindow() has to be used only for Timestamp member, e.g. [Column(IsTimestamp = true)].", nse?.Message);
}

private InfluxDBQueryVisitor BuildQueryVisitor(IQueryable queryable, Expression expression = null)
{
17 changes: 17 additions & 0 deletions Client.Linq.Test/ItInfluxDBQueryableTest.cs
Original file line number Diff line number Diff line change
@@ -454,6 +454,23 @@ orderby s.Timestamp

Assert.AreEqual(8, count);
}

[Test]
public void QueryAggregateWindow()
{
var query = from s in InfluxDBQueryable<Sensor>.Queryable("my-bucket", "my-org", _client.GetQueryApiSync())
where s.Timestamp.AggregateWindow(TimeSpan.FromDays(4), null, "mean")
where s.Timestamp > new DateTime(2020, 11, 15, 0, 0, 0, DateTimeKind.Utc)
where s.Timestamp < new DateTime(2020, 11, 18, 0, 0, 0, DateTimeKind.Utc)
select s;

var sensors = query.ToList();

Assert.AreEqual(2, sensors.Count);
// (28 + 12 + 89) / 3 = 43
Assert.AreEqual(43, sensors[0].Value);
Assert.AreEqual(43, sensors.Last().Value);
}

[TearDown]
protected void After()
44 changes: 44 additions & 0 deletions Client.Linq.Test/VariableAggregatorTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core.Test;
using InfluxDB.Client.Linq.Internal;
using NUnit.Framework;

namespace Client.Linq.Test
{
[TestFixture]
public class VariableAggregatorTest : AbstractTest
{
[Test]
public void TimeStamp()
{
var data = new[]
{
(
TimeSpan.FromMilliseconds(1),
1000
),
(
TimeSpan.FromMilliseconds(-1),
-1000
),
(
TimeSpan.FromDays(2 * 365),
63072000000000
)
};

foreach (var (timeSpan, expected) in data)
{
var aggregator = new VariableAggregator();
aggregator.AddNamedVariable(timeSpan);

var duration =
(((aggregator.GetStatements()[0] as OptionStatement)?.Assignment as VariableAssignment)?.Init as
DurationLiteral)?.Values[0];
Assert.NotNull(duration);
Assert.AreEqual(expected, duration.Magnitude);
}
}
}
}
25 changes: 25 additions & 0 deletions Client.Linq/InfluxDBQueryable.cs
Original file line number Diff line number Diff line change
@@ -237,5 +237,30 @@ public static InfluxDBQueryable<T> ToInfluxQueryable<T>(this IQueryable<T> sourc

return queryable;
}

/// <summary>
/// The extension to use Flux window operator. For more info see https://docs.influxdata.com/flux/v0.x/stdlib/universe/aggregatewindow/
///
/// <example>
/// <code>
/// var query = from s in InfluxDBQueryable&lt;Sensor&gt;.Queryable("my-bucket", "my-org", _queryApi)
/// where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), "mean")
/// where s.Value == 5
/// select s;
/// </code>
/// </example>
/// </summary>
/// <param name="timestamp">The entity value which is market as a Timestamp.</param>
/// <param name="every">Duration of time between windows.</param>
/// <param name="period">Duration of the window.</param>
/// <param name="fn">Aggregate or selector function used to operate on each window of time.</param>
/// <returns>NotSupportedException if it's called outside LINQ expression.</returns>
/// <exception cref="NotSupportedException">Caused by calling outside of LINQ expression.</exception>
// ReSharper disable UnusedParameter.Global
public static bool AggregateWindow(this DateTime timestamp, TimeSpan every, TimeSpan? period = null, string fn = "mean")
{
throw new NotSupportedException("This should be used only in LINQ expression. " +
"Something like: 'where s.Timestamp.AggregateWindow(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(40), \"mean\")'.");
}
}
}
28 changes: 28 additions & 0 deletions Client.Linq/Internal/QueryAggregator.cs
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@ internal class QueryAggregator
private readonly List<string> _filterByTags;
private readonly List<string> _filterByFields;
private readonly List<(string, string, bool, string)> _orders;
private (string Every, string Period, string Fn)? _aggregateWindow;

internal QueryAggregator()
{
@@ -73,6 +74,7 @@ internal QueryAggregator()
_filterByTags = new List<string>();
_filterByFields = new List<string>();
_orders = new List<(string, string, bool, string)>();
_aggregateWindow = null;
}

internal void AddBucket(string bucket)
@@ -91,6 +93,12 @@ internal void AddRangeStop(string rangeStop, RangeExpressionType expressionType)
_rangeStopAssignment = rangeStop;
_rangeStopExpression = expressionType;
}

internal void AddAggregateWindow(string everyVariable, string periodVariable, string fnVariable)
{
_aggregateWindow = (everyVariable, periodVariable, fnVariable);
}


internal void AddLimitN(string limitNAssignment)
{
@@ -155,6 +163,7 @@ internal string BuildFluxQuery(QueryableOptimizerSettings settings)
BuildOperator("from", "bucket", _bucketAssignment),
BuildRange(transforms),
BuildFilter(_filterByTags),
BuildAggregateWindow(_aggregateWindow),
"pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"
};

@@ -209,6 +218,25 @@ internal string BuildFluxQuery(QueryableOptimizerSettings settings)
return query.ToString();
}

private string BuildAggregateWindow((string Every, string Period, string Fn)? aggregateWindow)
{
if (aggregateWindow == null)
{
return null;
}

var (every, period, fn) = aggregateWindow.Value;
var list = new List<string>
{
$"every: {every}",
period != null ? $"period: {period}" : null,
$"fn: {fn}"
};


return $"aggregateWindow({JoinList(list, ", ")})";
}

private string BuildDrop(QueryableOptimizerSettings settings)
{
var columns = new List<string>();
55 changes: 53 additions & 2 deletions Client.Linq/Internal/QueryExpressionTreeVisitor.cs
Original file line number Diff line number Diff line change
@@ -2,12 +2,17 @@
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core;
using InfluxDB.Client.Linq.Internal.Expressions;
using Remotion.Linq.Clauses;
using Remotion.Linq.Clauses.Expressions;
using Remotion.Linq.Clauses.ResultOperators;
using Remotion.Linq.Parsing;
using BinaryExpression = System.Linq.Expressions.BinaryExpression;
using Expression = System.Linq.Expressions.Expression;
using MemberExpression = System.Linq.Expressions.MemberExpression;
using UnaryExpression = System.Linq.Expressions.UnaryExpression;

namespace InfluxDB.Client.Linq.Internal
{
@@ -140,7 +145,51 @@ protected override Expression VisitUnary(UnaryExpression expression)

return base.VisitUnary(expression);
}


protected override Expression VisitMethodCall(MethodCallExpression expression)
{
if (expression.Method.Name.Equals("AggregateWindow"))
{
var member = (MemberExpression)expression.Arguments[0];
if (_context.MemberResolver.ResolveMemberType(member.Member) != MemberType.Timestamp)
{
throw new NotSupportedException(
"AggregateWindow() has to be used only for Timestamp member, e.g. [Column(IsTimestamp = true)].");
}

//
// every
//
var every = (TimeSpan) ((ConstantExpression)expression.Arguments[1]).Value;
Arguments.CheckNotNull(every, "every");
var everyVariable = _context.Variables.AddNamedVariable(every);

//
// period
//
string periodVariable = null;
var period = ((ConstantExpression)expression.Arguments[2]).Value as TimeSpan?;
if (period.HasValue)
{
Arguments.CheckNotNull(period, "period");
periodVariable = _context.Variables.AddNamedVariable(period);
}

//
// fn
//
var fn = ((ConstantExpression)expression.Arguments[3]).Value as string;
Arguments.CheckNonEmptyString(fn, "fn");
var fnVariable = _context.Variables.AddNamedVariable(new Identifier("Identifier", "mean"));

_context.QueryAggregator.AddAggregateWindow(everyVariable, periodVariable, fnVariable);

return expression;
}

return base.VisitMethodCall(expression);
}

protected override Exception CreateUnhandledItemException<T>(T unhandledItem, string visitMethod)
{
var message = $"The expression '{unhandledItem}', type: '{typeof(T)}' is not supported.";
@@ -353,7 +402,7 @@ internal static void NormalizeExpressions(List<IExpressionPart> parts)
foreach (var index in indexes)
{
// ()
if (parts[index + 1] is RightParenthesis)
if (parts.Count > index + 1 && parts[index + 1] is RightParenthesis)
{
parts.RemoveAt(index + 1);
parts.RemoveAt(index);
@@ -369,6 +418,8 @@ internal static void NormalizeExpressions(List<IExpressionPart> parts)
{
parts.RemoveAt(parts.Count - 1);
parts.RemoveAt(0);

NormalizeExpressions(parts);
}
}
}
Loading