Skip to content
This repository has been archived by the owner on Jul 18, 2023. It is now read-only.

Add compression to LineProtocolClient #81

Merged
merged 8 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.2.0 [unreleased]

### Features
1. [#81](https://github.com/influxdata/influxdb-csharp/pull/81): Add compression to LineProtocolClient

### CI
1. [#86](https://github.com/influxdata/influxdb-csharp/pull/86): AppVeyor deploy preview releases

Expand All @@ -8,4 +11,3 @@
### Features
1. [#52](https://github.com/influxdata/influxdb-csharp/pull/52): Basic Changes and Move to .NET Standard 2.0 / Framework 4.6.1
1. [#59](https://github.com/influxdata/influxdb-csharp/pull/59): Add maxBatchSize parameter to IntervalBatcher

39 changes: 34 additions & 5 deletions src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using InfluxDB.LineProtocol.Payload;
using System;
using System.IO;
using System.Net;
using System.IO.Compression;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.Sockets;
using System.Text;
using System.Threading;
Expand All @@ -13,9 +14,10 @@ namespace InfluxDB.LineProtocol.Client
public class LineProtocolClient : LineProtocolClientBase
{
private readonly HttpClient _httpClient;
private readonly bool _enableCompression;

public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null)
: this(new HttpClientHandler(), serverBaseAddress, database, username, password)
public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null, bool enableCompression = false)
: this(new HttpClientHandler(), serverBaseAddress, database, username, password, enableCompression)
{
}

Expand All @@ -24,7 +26,8 @@ protected LineProtocolClient(
Uri serverBaseAddress,
string database,
string username,
string password)
string password,
bool enableCompression)
:base(serverBaseAddress, database, username, password)
{
if (serverBaseAddress == null)
Expand All @@ -34,6 +37,7 @@ protected LineProtocolClient(

// Overload that allows injecting handler is protected to avoid HttpMessageHandler being part of our public api which would force clients to reference System.Net.Http when using the lib.
_httpClient = new HttpClient(handler) { BaseAddress = serverBaseAddress };
_enableCompression = enableCompression;
}

protected override async Task<LineProtocolWriteResult> OnSendAsync(
Expand Down Expand Up @@ -64,7 +68,21 @@ protected override async Task<LineProtocolWriteResult> OnSendAsync(
break;
}

var content = new StringContent(payload, Encoding.UTF8);
HttpContent content;

if (_enableCompression)
{
var compressed = Compress(Encoding.UTF8.GetBytes(payload));

content = new ByteArrayContent(compressed);
content.Headers.ContentEncoding.Add("gzip");
content.Headers.ContentType = new MediaTypeHeaderValue("text/plain") { CharSet = "utf-8" };
}
else
{
content = new StringContent(payload, Encoding.UTF8);
}

var response = await _httpClient.PostAsync(endpoint, content, cancellationToken).ConfigureAwait(false);
if (response.IsSuccessStatusCode)
{
Expand All @@ -80,5 +98,16 @@ protected override async Task<LineProtocolWriteResult> OnSendAsync(

return new LineProtocolWriteResult(false, $"{response.StatusCode} {response.ReasonPhrase} {body}");
}

private byte[] Compress(byte[] input)
{
using (var ms = new MemoryStream())
{
using (var gz = new GZipStream(ms, CompressionLevel.Fastest))
gz.Write(input, 0, input.Length);

return ms.ToArray();
}
}
}
}
1 change: 1 addition & 0 deletions src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

<ItemGroup>
<PackageReference Include="System.Collections" Version="4.3.0" />
<PackageReference Include="System.IO.Compression" Version="4.3.0" />
<PackageReference Include="System.Linq" Version="4.3.0" />
<PackageReference Include="System.Threading" Version="4.3.0" />
<PackageReference Include="System.Net.Http" Version="4.3.3" />
Expand Down
70 changes: 70 additions & 0 deletions test/InfluxDB.LineProtocol.Tests/Client/LineProtocolClientTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using InfluxDB.LineProtocol.Payload;
using RichardSzalay.MockHttp;
using Xunit;

namespace InfluxDB.LineProtocol.Tests.Client
{
public class LineProtocolClientTests
{
[Fact]
public async Task Sending_uncompressed_data()
{
var client = new MockLineProtocolClient("foo", false);

var payload = new LineProtocolPayload();

payload.Add(new LineProtocolPoint("bar", new Dictionary<string, object> { { "baz", 42 } }));

client.Handler
.Expect($"{client.BaseAddress}write?db=foo")
.WithContent("bar baz=42i\n")
.Respond(HttpStatusCode.NoContent);

var result = await client.WriteAsync(payload);

Assert.True(result.Success);
}

[Fact]
public async Task Sending_compressed_data()
{
var client = new MockLineProtocolClient("foo", true);

var payload = new LineProtocolPayload();

payload.Add(new LineProtocolPoint("bar", new Dictionary<string, object> { { "baz", 42 } }));

client.Handler
.Expect($"{client.BaseAddress}write?db=foo")
.WithHeaders("Content-Encoding", "gzip")
.With(req =>
{
var expected = "bar baz=42i\n";
var actual = Gunzip(req.Content.ReadAsStreamAsync().Result);

return expected == actual;
})
.Respond(HttpStatusCode.NoContent);

var result = await client.WriteAsync(payload);

Assert.True(result.Success);
}

private string Gunzip(Stream compressed)
{
using (var decompressed = new MemoryStream())
using (var gunzip = new GZipStream(compressed, CompressionMode.Decompress))
{
gunzip.CopyTo(decompressed);
return Encoding.UTF8.GetString(decompressed.ToArray());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ namespace InfluxDB.LineProtocol.Tests.Client
{
public class MockLineProtocolClient : LineProtocolClient
{
public MockLineProtocolClient(string database) : this(new MockHttpMessageHandler(), new Uri("http://localhost:8086"), database)
public MockLineProtocolClient(string database, bool enableCompression = false) : this(new MockHttpMessageHandler(), new Uri("http://localhost:8086"), database, enableCompression)
{
}

private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database) : base(handler, serverBaseAddress, database, null, null)
private MockLineProtocolClient(MockHttpMessageHandler handler, Uri serverBaseAddress, string database, bool enableCompression) : base(handler, serverBaseAddress, database, null, null, enableCompression)
{
Handler = handler;
BaseAddress = serverBaseAddress;
Expand Down