diff --git a/CHANGELOG.md b/CHANGELOG.md index 5180813..2c8d903 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.2.0 [unreleased] +### Features +1. [#81](https://github.com/influxdata/influxdb-csharp/pull/81): Add compression to LineProtocolClient + ### CI 1. [#86](https://github.com/influxdata/influxdb-csharp/pull/86): AppVeyor deploy preview releases @@ -8,4 +11,3 @@ ### Features 1. [#52](https://github.com/influxdata/influxdb-csharp/pull/52): Basic Changes and Move to .NET Standard 2.0 / Framework 4.6.1 1. [#59](https://github.com/influxdata/influxdb-csharp/pull/59): Add maxBatchSize parameter to IntervalBatcher - diff --git a/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs b/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs index 279962c..aefeaaf 100644 --- a/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs +++ b/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs @@ -1,8 +1,9 @@ using InfluxDB.LineProtocol.Payload; using System; using System.IO; -using System.Net; +using System.IO.Compression; using System.Net.Http; +using System.Net.Http.Headers; using System.Net.Sockets; using System.Text; using System.Threading; @@ -13,9 +14,10 @@ namespace InfluxDB.LineProtocol.Client public class LineProtocolClient : LineProtocolClientBase { private readonly HttpClient _httpClient; + private readonly bool _enableCompression; - public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null) - : this(new HttpClientHandler(), serverBaseAddress, database, username, password) + public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null, bool enableCompression = false) + : this(new HttpClientHandler(), serverBaseAddress, database, username, password, enableCompression) { } @@ -24,7 +26,8 @@ protected LineProtocolClient( Uri serverBaseAddress, string database, string username, - string password) + string password, + bool enableCompression) :base(serverBaseAddress, database, username, password) { if (serverBaseAddress == null) @@ -34,6 +37,7 @@ protected LineProtocolClient( // Overload that allows injecting handler is protected to avoid HttpMessageHandler being part of our public api which would force clients to reference System.Net.Http when using the lib. _httpClient = new HttpClient(handler) { BaseAddress = serverBaseAddress }; + _enableCompression = enableCompression; } protected override async Task OnSendAsync( @@ -64,7 +68,21 @@ protected override async Task OnSendAsync( break; } - var content = new StringContent(payload, Encoding.UTF8); + HttpContent content; + + if (_enableCompression) + { + var compressed = Compress(Encoding.UTF8.GetBytes(payload)); + + content = new ByteArrayContent(compressed); + content.Headers.ContentEncoding.Add("gzip"); + content.Headers.ContentType = new MediaTypeHeaderValue("text/plain") { CharSet = "utf-8" }; + } + else + { + content = new StringContent(payload, Encoding.UTF8); + } + var response = await _httpClient.PostAsync(endpoint, content, cancellationToken).ConfigureAwait(false); if (response.IsSuccessStatusCode) { @@ -80,5 +98,16 @@ protected override async Task OnSendAsync( return new LineProtocolWriteResult(false, $"{response.StatusCode} {response.ReasonPhrase} {body}"); } + + private byte[] Compress(byte[] input) + { + using (var ms = new MemoryStream()) + { + using (var gz = new GZipStream(ms, CompressionLevel.Fastest)) + gz.Write(input, 0, input.Length); + + return ms.ToArray(); + } + } } } diff --git a/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj b/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj index e07f122..d549aaa 100644 --- a/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj +++ b/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj @@ -18,6 +18,7 @@ + diff --git a/test/InfluxDB.LineProtocol.Tests/Client/LineProtocolClientTests.cs b/test/InfluxDB.LineProtocol.Tests/Client/LineProtocolClientTests.cs new file mode 100644 index 0000000..2e1bd7f --- /dev/null +++ b/test/InfluxDB.LineProtocol.Tests/Client/LineProtocolClientTests.cs @@ -0,0 +1,70 @@ +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Net; +using System.Text; +using System.Threading.Tasks; +using InfluxDB.LineProtocol.Payload; +using RichardSzalay.MockHttp; +using Xunit; + +namespace InfluxDB.LineProtocol.Tests.Client +{ + public class LineProtocolClientTests + { + [Fact] + public async Task Sending_uncompressed_data() + { + var client = new MockLineProtocolClient("foo", false); + + var payload = new LineProtocolPayload(); + + payload.Add(new LineProtocolPoint("bar", new Dictionary { { "baz", 42 } })); + + client.Handler + .Expect($"{client.BaseAddress}write?db=foo") + .WithContent("bar baz=42i\n") + .Respond(HttpStatusCode.NoContent); + + var result = await client.WriteAsync(payload); + + Assert.True(result.Success); + } + + [Fact] + public async Task Sending_compressed_data() + { + var client = new MockLineProtocolClient("foo", true); + + var payload = new LineProtocolPayload(); + + payload.Add(new LineProtocolPoint("bar", new Dictionary { { "baz", 42 } })); + + client.Handler + .Expect($"{client.BaseAddress}write?db=foo") + .WithHeaders("Content-Encoding", "gzip") + .With(req => + { + var expected = "bar baz=42i\n"; + var actual = Gunzip(req.Content.ReadAsStreamAsync().Result); + + return expected == actual; + }) + .Respond(HttpStatusCode.NoContent); + + var result = await client.WriteAsync(payload); + + Assert.True(result.Success); + } + + private string Gunzip(Stream compressed) + { + using (var decompressed = new MemoryStream()) + using (var gunzip = new GZipStream(compressed, CompressionMode.Decompress)) + { + gunzip.CopyTo(decompressed); + return Encoding.UTF8.GetString(decompressed.ToArray()); + } + } + } +} diff --git a/test/InfluxDB.LineProtocol.Tests/Client/MockLineProtocolClient.cs b/test/InfluxDB.LineProtocol.Tests/Client/MockLineProtocolClient.cs index 47cab05..5377353 100644 --- a/test/InfluxDB.LineProtocol.Tests/Client/MockLineProtocolClient.cs +++ b/test/InfluxDB.LineProtocol.Tests/Client/MockLineProtocolClient.cs @@ -7,11 +7,11 @@ namespace InfluxDB.LineProtocol.Tests.Client { public class MockLineProtocolClient : LineProtocolClient { - public MockLineProtocolClient(string database) : this(new MockHttpMessageHandler(), new Uri("http://localhost:8086"), database) + public MockLineProtocolClient(string database, bool enableCompression = false) : this(new MockHttpMessageHandler(), new Uri("http://localhost:8086"), database, enableCompression) { } - private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database) : base(handler, serverBaseAddress, database, null, null) + private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database, bool enableCompression) : base(handler, serverBaseAddress, database, null, null, enableCompression) { Handler = handler; BaseAddress = serverBaseAddress;