Skip to content

Commit d6b6bf6

Browse files
rolincovabednar
authored andcommitted
feat: Added WriteApiAsync for simple asynchronous write without batching (#23)
1 parent 367ccd6 commit d6b6bf6

File tree

5 files changed

+532
-1
lines changed

5 files changed

+532
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Features
44
1. [#96](https://github.com/influxdata/influxdb-client-csharp/pull/96): The PointData builder is now immutable
5+
1. [#23](https://github.com/influxdata/influxdb-client-csharp/issues/23): Added WriteApiAsync for asynchronous write without batching
56

67
### API
78
1. [#94](https://github.com/influxdata/influxdb-client-csharp/pull/94): Update swagger to latest version

Client.Test/ItWriteApiAsyncTest.cs

+160
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using InfluxDB.Client.Api.Domain;
6+
using InfluxDB.Client.Core;
7+
using InfluxDB.Client.Core.Flux.Domain;
8+
using InfluxDB.Client.Core.Test;
9+
using InfluxDB.Client.Writes;
10+
using NodaTime;
11+
using NUnit.Framework;
12+
13+
namespace InfluxDB.Client.Test
14+
{
15+
[TestFixture]
16+
public class ItWriteApiAsyncTest : AbstractItClientTest
17+
{
18+
[SetUp]
19+
public new async Task SetUp()
20+
{
21+
_organization = await FindMyOrg();
22+
23+
var retention = new BucketRetentionRules(BucketRetentionRules.TypeEnum.Expire, 3600);
24+
25+
_bucket = await Client.GetBucketsApi()
26+
.CreateBucketAsync(GenerateName("h2o"), retention, _organization);
27+
28+
//
29+
// Add Permissions to read and write to the Bucket
30+
//
31+
var resource = new PermissionResource(PermissionResource.TypeEnum.Buckets, _bucket.Id, null,
32+
_organization.Id);
33+
34+
var readBucket = new Permission(Permission.ActionEnum.Read, resource);
35+
var writeBucket = new Permission(Permission.ActionEnum.Write, resource);
36+
37+
var loggedUser = await Client.GetUsersApi().MeAsync();
38+
Assert.IsNotNull(loggedUser);
39+
40+
var authorization = await Client.GetAuthorizationsApi()
41+
.CreateAuthorizationAsync(_organization,
42+
new List<Permission> {readBucket, writeBucket});
43+
44+
_token = authorization.Token;
45+
46+
Client.Dispose();
47+
var options = new InfluxDBClientOptions.Builder().Url(InfluxDbUrl).AuthenticateToken(_token.ToCharArray())
48+
.Org(_organization.Id).Bucket(_bucket.Id).Build();
49+
Client = InfluxDBClientFactory.Create(options);
50+
51+
_writeApi = Client.GetWriteApiAsync();
52+
}
53+
54+
private Bucket _bucket;
55+
private Organization _organization;
56+
private string _token;
57+
58+
private WriteApiAsync _writeApi;
59+
60+
[Measurement("h2o")]
61+
private class H20Measurement
62+
{
63+
[Column("location", IsTag = true)] public string Location { get; set; }
64+
65+
[Column("water_level")] public double? Level { get; set; }
66+
67+
[Column(IsTimestamp = true)] public DateTime Time { get; set; }
68+
}
69+
70+
[Test]
71+
public async Task Write()
72+
{
73+
// By LineProtocol
74+
await _writeApi.WriteRecordAsync(WritePrecision.S, "h2o,location=coyote_creek water_level=1.0 1");
75+
await _writeApi.WriteRecordAsync(_bucket.Name, _organization.Name, WritePrecision.S,
76+
"h2o,location=coyote_creek water_level=2.0 2");
77+
await _writeApi.WriteRecordsAsync(WritePrecision.S,
78+
new List<string>
79+
{
80+
"h2o,location=coyote_creek water_level=3.0 3",
81+
"h2o,location=coyote_creek water_level=4.0 4"
82+
});
83+
await _writeApi.WriteRecordsAsync(_bucket.Name, _organization.Name, WritePrecision.S,
84+
"h2o,location=coyote_creek water_level=5.0 5",
85+
"h2o,location=coyote_creek water_level=6.0 6");
86+
87+
// By DataPoint
88+
await _writeApi.WritePointAsync(PointData.Measurement("h2o").Tag("location", "coyote_creek")
89+
.Field("water_level", 7.0D).Timestamp(7L, WritePrecision.S));
90+
await _writeApi.WritePointAsync(_bucket.Name, _organization.Name,
91+
PointData.Measurement("h2o").Tag("location", "coyote_creek").Field("water_level", 8.0D)
92+
.Timestamp(8L, WritePrecision.S));
93+
await _writeApi.WritePointsAsync(new List<PointData>
94+
{
95+
PointData.Measurement("h2o").Tag("location", "coyote_creek").Field("water_level", 9.0D)
96+
.Timestamp(9L, WritePrecision.S),
97+
PointData.Measurement("h2o").Tag("location", "coyote_creek").Field("water_level", 10.0D)
98+
.Timestamp(10L, WritePrecision.S)
99+
});
100+
await _writeApi.WritePointsAsync(_bucket.Name, _organization.Name,
101+
PointData.Measurement("h2o").Tag("location", "coyote_creek")
102+
.Field("water_level", 11.0D)
103+
.Timestamp(11L, WritePrecision.S),
104+
PointData.Measurement("h2o").Tag("location", "coyote_creek")
105+
.Field("water_level", 12.0D)
106+
.Timestamp(12L, WritePrecision.S));
107+
108+
// By Measurement
109+
DateTime dtDateTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
110+
await _writeApi.WriteMeasurementAsync(WritePrecision.S,
111+
new H20Measurement
112+
{
113+
Location = "coyote_creek", Level = 13.0D, Time = dtDateTime.AddSeconds(13)
114+
});
115+
116+
await _writeApi.WriteMeasurementAsync(_bucket.Name, _organization.Name, WritePrecision.S,
117+
new H20Measurement
118+
{
119+
Location = "coyote_creek", Level = 14.0D, Time = dtDateTime.AddSeconds(14)
120+
});
121+
await _writeApi.WriteMeasurementsAsync(WritePrecision.S, new List<H20Measurement>
122+
{
123+
new H20Measurement
124+
{
125+
Location = "coyote_creek", Level = 15.0D, Time = dtDateTime.AddSeconds(15)
126+
},
127+
new H20Measurement
128+
{
129+
Location = "coyote_creek", Level = 16.0D, Time = dtDateTime.AddSeconds(16)
130+
}
131+
});
132+
await _writeApi.WriteMeasurementsAsync(_bucket.Name, _organization.Name, WritePrecision.S,
133+
new H20Measurement
134+
{
135+
Location = "coyote_creek", Level = 17.0D, Time = dtDateTime.AddSeconds(17)
136+
},
137+
new H20Measurement
138+
{
139+
Location = "coyote_creek", Level = 18.0D, Time = dtDateTime.AddSeconds(18)
140+
});
141+
142+
List<FluxTable> query = await Client.GetQueryApi().QueryAsync(
143+
"from(bucket:\"" + _bucket.Name +
144+
"\") |> range(start: 1970-01-01T00:00:00.000000001Z)",
145+
_organization.Id);
146+
147+
Assert.AreEqual(1, query.Count);
148+
Assert.AreEqual(18, query[0].Records.Count);
149+
150+
for (var ii = 1; ii <= 17; ii++)
151+
{
152+
var record = query[0].Records[ii - 1];
153+
Assert.AreEqual("h2o", record.GetMeasurement());
154+
Assert.AreEqual((double) ii, record.GetValue());
155+
Assert.AreEqual("water_level", record.GetField());
156+
Assert.AreEqual(Instant.FromDateTimeUtc(dtDateTime.AddSeconds(ii)), record.GetTime());
157+
}
158+
}
159+
}
160+
}

Client/InfluxDBClient.cs

+14
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,20 @@ public WriteApi GetWriteApi()
104104
{
105105
return GetWriteApi(WriteOptions.CreateNew().Build());
106106
}
107+
108+
/// <summary>
109+
/// Get the Write async client.
110+
/// </summary>
111+
/// <returns>the new client instance for the Write API Async without batching</returns>
112+
public WriteApiAsync GetWriteApiAsync()
113+
{
114+
var service = new WriteService((Configuration) _apiClient.Configuration)
115+
{
116+
ExceptionFactory = _exceptionFactory
117+
};
118+
119+
return new WriteApiAsync(_options, service, this);
120+
}
107121

108122
/// <summary>
109123
/// Get the Write client.

Client/README.md

+53-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,8 @@ namespace Examples
300300

301301
## Writes
302302

303-
For writing data we use [WriteApi](https://github.com/influxdata/influxdb-client-csharp/blob/master/Client/WriteApi.cs#L1) that supports:
303+
For writing data we use [WriteApi](https://github.com/influxdata/influxdb-client-csharp/blob/master/Client/WriteApi.cs#L1) or
304+
[WriteApiAsync](https://github.com/influxdata/influxdb-client-csharp/blob/feat/write-api-async/Client/WriteApiAsync.cs) which is simplified version of WriteApi without batching support both supports:
304305

305306
1. writing data using [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/), Data Point, POCO
306307
2. use batching for writes
@@ -499,6 +500,57 @@ namespace Examples
499500
}
500501
```
501502

503+
#### Using WriteApiAsync
504+
```c#
505+
using InfluxDB.Client;
506+
using InfluxDB.Client.Api.Domain;
507+
508+
namespace Examples
509+
{
510+
public static class WriteLineProtocol
511+
{
512+
private static readonly char[] Token = "".ToCharArray();
513+
514+
public static void Main(string[] args)
515+
{
516+
var influxDBClient = InfluxDBClientFactory.Create("http://localhost:9999", Token);
517+
518+
//
519+
// Write Data
520+
//
521+
using (var writeApiAsync = influxDBClient.GetWriteApiAsync())
522+
{
523+
//
524+
//
525+
// Write by LineProtocol
526+
//
527+
writeApiAsync.WriteRecordAsync("bucket_name", "org_id", WritePrecision.Ns, "temperature,location=north value=60.0");
528+
529+
//
530+
//
531+
// Write by Data Point
532+
//
533+
var point = PointData.Measurement("temperature")
534+
.Tag("location", "west")
535+
.Field("value", 55D)
536+
.Timestamp(DateTime.UtcNow.AddSeconds(-10), WritePrecision.Ns);
537+
538+
writeApiAsync.WritePointAsync("bucket_name", "org_id", point);
539+
540+
//
541+
// Write by POCO
542+
//
543+
var temperature = new Temperature {Location = "south", Value = 62D, Time = DateTime.UtcNow};
544+
545+
writeApiAsync.WriteMeasurementAsync("bucket_name", "org_id", WritePrecision.Ns, temperature);
546+
}
547+
548+
influxDBClient.Dispose();
549+
}
550+
}
551+
}
552+
```
553+
502554
#### Default Tags
503555

504556
Sometimes is useful to store same information in every measurement e.g. `hostname`, `location`, `customer`.

0 commit comments

Comments
 (0)