Skip to content

feat: improve WriteApi performance #97

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 13 commits into from
Jul 28, 2020
1 change: 1 addition & 0 deletions Client/Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

<ItemGroup>
<PackageReference Include="JsonSubTypes" Version="1.5.2" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.4" />
<PackageReference Include="Microsoft.Net.Http.Headers" Version="2.1.1" />
<PackageReference Include="System.Collections.Immutable" Version="1.7.1" />
<PackageReference Include="System.Configuration.ConfigurationManager" Version="4.5.0" />
Expand Down
72 changes: 41 additions & 31 deletions Client/WriteApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using InfluxDB.Client.Core.Exceptions;
using InfluxDB.Client.Internal;
using InfluxDB.Client.Writes;
using Microsoft.Extensions.ObjectPool;
using RestSharp;

namespace InfluxDB.Client
Expand All @@ -26,8 +27,11 @@ public class WriteApi : IDisposable
private readonly MeasurementMapper _measurementMapper = new MeasurementMapper();
private readonly InfluxDBClientOptions _options;
private readonly Subject<BatchWriteData> _subject = new Subject<BatchWriteData>();
private static readonly ObjectPoolProvider _objectPoolProvider = new DefaultObjectPoolProvider();
private static readonly ObjectPool<StringBuilder> _stringBuilderPool = _objectPoolProvider.CreateStringBuilderPool();
private readonly IDisposable _unsubscribeDisposeCommand;


private bool _disposed;
protected internal WriteApi(
InfluxDBClientOptions options,
Expand All @@ -52,26 +56,25 @@ protected internal WriteApi(
//
// https://github.com/dotnet/reactive/issues/19

var tempBoundary = new Subject<IObservable<BatchWriteData>>();

_subject

IObservable<IObservable<BatchWriteRecord>> batches = _subject
//
// Batching
//
.Publish(connectedSource =>
{
var trigger = Observable.Merge(
// triggered by time & count
connectedSource.Window(TimeSpan.FromMilliseconds(
writeOptions.FlushInterval),
writeOptions.BatchSize,
writeOptions.WriteScheduler),
// flush trigger
_flush
);
return connectedSource
.Window(tempBoundary)
.Merge(Observable.Defer(() =>
{
connectedSource
.Window(TimeSpan.FromMilliseconds(writeOptions.FlushInterval), writeOptions.BatchSize,
writeOptions.WriteScheduler)
.Merge(_flush)
.Subscribe(tempBoundary);

return Observable.Empty<IObservable<BatchWriteData>>();
}));
.Window(trigger);
})
//
// Group by key - same bucket, same org
Expand All @@ -83,7 +86,7 @@ protected internal WriteApi(
.Select(grouped =>
{
var aggregate = grouped
.Aggregate(new StringBuilder(""), (builder, batchWrite) =>
.Aggregate(_stringBuilderPool.Get(), (builder, batchWrite) =>
{
var data = batchWrite.ToLineProtocol();

Expand All @@ -95,27 +98,34 @@ protected internal WriteApi(
}

return builder.Append(data);
}).Select(builder => builder.ToString());

return aggregate.Select(records => new BatchWriteRecord(grouped.Key, records));
})
//
// Jitter
//
.Select(source =>
{
if (writeOptions.JitterInterval <= 0)
{
return source;
}
}).Select(builder =>
{
var result = builder.ToString();
builder.Clear();
_stringBuilderPool.Return(builder);
return result;
});

return source.Delay(_ => Observable.Timer(TimeSpan.FromMilliseconds(JitterDelay(writeOptions)), writeOptions.WriteScheduler));
})
.Concat()
return aggregate.Select(records => new BatchWriteRecord(grouped.Key, records))
.Where(batchWriteItem => !string.IsNullOrEmpty(batchWriteItem.ToLineProtocol()));
});

if (writeOptions.JitterInterval > 0)
{
batches = batches
//
// Jitter
//
.Select(source =>
{
return source.Delay(_ => Observable.Timer(TimeSpan.FromMilliseconds(JitterDelay(writeOptions)), writeOptions.WriteScheduler));
});
}
var query = batches
.Concat()
//
// Map to Async request
//
.Where(batchWriteItem => !string.IsNullOrEmpty(batchWriteItem.ToLineProtocol()))
.Select(batchWriteItem =>
{
var org = batchWriteItem.Options.OrganizationId;
Expand Down
20 changes: 10 additions & 10 deletions Client/Writes/PointData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ private PointData(string measurementName)
Precision = WritePrecision.Ns;
}

/// <summary>
/// Create a new Point withe specified a measurement name.
/// </summary>
/// <param name="measurementName">the measurement name</param>
/// <returns>the new Point</returns>
public static PointData Measurement(string measurementName)
{
return new PointData(measurementName);
}

private PointData(string measurementName,
WritePrecision precision,
BigInteger? time,
Expand All @@ -48,16 +58,6 @@ private PointData(string measurementName,
_fields = fields;
}

/// <summary>
/// Create a new Point withe specified a measurement name.
/// </summary>
/// <param name="measurementName">the measurement name</param>
/// <returns>the new Point</returns>
public static PointData Measurement(string measurementName)
{
return new PointData(measurementName);
}

/// <summary>
/// Adds or replaces a tag value for a point.
/// </summary>
Expand Down