Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Jaeger/Zipkin Exporter Performance #1274

Merged
merged 12 commits into from
Sep 22, 2020
4 changes: 2 additions & 2 deletions src/OpenTelemetry.Exporter.Jaeger/JaegerExporterOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class JaegerExporterOptions
{
internal const string DefaultServiceName = "OpenTelemetry Exporter";

internal const int DefaultMaxPacketSize = 65000;
internal const int DefaultMaxPacketSize = 4096;

/// <summary>
/// Gets or sets the name of the service reporting telemetry. Default value: OpenTelemetry Exporter.
Expand All @@ -40,7 +40,7 @@ public class JaegerExporterOptions
public int AgentPort { get; set; } = 6831;

/// <summary>
/// Gets or sets the maximum packet size in bytes. Default value: 65000.
/// Gets or sets the maximum packet size in bytes. Default value: 4096.
/// </summary>
public int? MaxPacketSize { get; set; } = DefaultMaxPacketSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ internal static class ZipkinActivityConversionExtensions
private static readonly ConcurrentDictionary<string, ZipkinEndpoint> RemoteEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
#endif

private static readonly DictionaryEnumerator<string, object, AttributeEnumerationState>.ForEachDelegate ProcessTagsRef = ProcessTags;
private static readonly ListEnumerator<ActivityEvent, PooledList<ZipkinAnnotation>>.ForEachDelegate ProcessActivityEventsRef = ProcessActivityEvents;

internal static ZipkinSpan ToZipkinSpan(this Activity activity, ZipkinEndpoint defaultLocalEndpoint, bool useShortTraceIds = false)
Expand All @@ -67,7 +66,7 @@ internal static ZipkinSpan ToZipkinSpan(this Activity activity, ZipkinEndpoint d
Tags = PooledList<KeyValuePair<string, object>>.Create(),
};

DictionaryEnumerator<string, object, AttributeEnumerationState>.AllocationFreeForEach(activity.TagObjects, ref attributeEnumerationState, ProcessTagsRef);
activity.EnumerateTagValues(ref attributeEnumerationState);

var activitySource = activity.Source;
if (!string.IsNullOrEmpty(activitySource.Name))
Expand Down Expand Up @@ -172,58 +171,6 @@ internal static long ToEpochMicroseconds(this DateTime utcDateTime)
return microseconds - UnixEpochMicroseconds;
}

internal static bool ProcessTags(ref AttributeEnumerationState state, KeyValuePair<string, object> attribute)
{
if (attribute.Value == null)
{
return true;
}

if (attribute.Value is string strVal)
{
string key = attribute.Key;
if (RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (state.RemoteEndpointServiceName == null || priority < state.RemoteEndpointServiceNamePriority))
{
state.RemoteEndpointServiceName = strVal;
state.RemoteEndpointServiceNamePriority = priority;
}
else if (key == SemanticConventions.AttributeNetPeerName)
{
state.HostName = strVal;
}
else if (key == SemanticConventions.AttributeNetPeerIp)
{
state.IpAddress = strVal;
}
else if (key == SemanticConventions.AttributeNetPeerPort && int.TryParse(strVal, out var port))
{
state.Port = port;
}
else if (key == Resource.ServiceNameKey)
{
state.ServiceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey)
{
state.ServiceNamespace = strVal;
}

PooledList<KeyValuePair<string, object>>.Add(ref state.Tags, new KeyValuePair<string, object>(key, strVal));
}
else
{
if (attribute.Value is int intVal && attribute.Key == SemanticConventions.AttributeNetPeerPort)
{
state.Port = intVal;
}

PooledList<KeyValuePair<string, object>>.Add(ref state.Tags, attribute);
}

return true;
}

private static string EncodeTraceId(ActivityTraceId traceId, bool useShortTraceIds)
{
var id = traceId.ToHexString();
Expand Down Expand Up @@ -254,7 +201,7 @@ private static bool ProcessActivityEvents(ref PooledList<ZipkinAnnotation> annot
return true;
}

internal struct AttributeEnumerationState
internal struct AttributeEnumerationState : IActivityTagEnumerator
{
public PooledList<KeyValuePair<string, object>> Tags;

Expand All @@ -271,6 +218,58 @@ internal struct AttributeEnumerationState
public string IpAddress;

public int Port;

public bool ForEach(KeyValuePair<string, object> activityTag)
{
if (activityTag.Value == null)
{
return true;
}

if (activityTag.Value is string strVal)
{
string key = activityTag.Key;
if (RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (this.RemoteEndpointServiceName == null || priority < this.RemoteEndpointServiceNamePriority))
{
this.RemoteEndpointServiceName = strVal;
this.RemoteEndpointServiceNamePriority = priority;
}
else if (key == SemanticConventions.AttributeNetPeerName)
{
this.HostName = strVal;
}
else if (key == SemanticConventions.AttributeNetPeerIp)
{
this.IpAddress = strVal;
}
else if (key == SemanticConventions.AttributeNetPeerPort && int.TryParse(strVal, out var port))
{
this.Port = port;
}
else if (key == Resource.ServiceNameKey)
{
this.ServiceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey)
{
this.ServiceNamespace = strVal;
}

PooledList<KeyValuePair<string, object>>.Add(ref this.Tags, new KeyValuePair<string, object>(key, strVal));
}
else
{
if (activityTag.Value is int intVal && activityTag.Key == SemanticConventions.AttributeNetPeerPort)
{
this.Port = intVal;
}

PooledList<KeyValuePair<string, object>>.Add(ref this.Tags, activityTag);
}

return true;
}
}
}
}
50 changes: 25 additions & 25 deletions src/OpenTelemetry.Exporter.Zipkin/ZipkinExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
Expand Down Expand Up @@ -64,22 +63,23 @@ public override ExportResult Export(in Batch<Activity> batch)

try
{
// take a snapshot of the batch
var activities = new List<Activity>();
foreach (var activity in batch)
var requestUri = this.options.Endpoint;

using var request = new HttpRequestMessage(HttpMethod.Post, requestUri)
{
activities.Add(activity);
}
Content = new JsonContent(this, batch),
};

using var response = this.httpClient.SendAsync(request, CancellationToken.None).GetAwaiter().GetResult();

this.SendBatchActivityAsync(activities, CancellationToken.None).GetAwaiter().GetResult();
response.EnsureSuccessStatusCode();

return ExportResult.Success;
}
catch (Exception ex)
{
ZipkinExporterEventSource.Log.FailedExport(ex);

// TODO distinguish retryable exceptions
return ExportResult.Failure;
}
}
Expand Down Expand Up @@ -140,18 +140,6 @@ private static string ResolveHostName()
return result;
}

private async Task SendBatchActivityAsync(IEnumerable<Activity> batchActivity, CancellationToken cancellationToken)
{
var requestUri = this.options.Endpoint;

using var request = new HttpRequestMessage(HttpMethod.Post, requestUri)
{
Content = new JsonContent(this, batchActivity),
};

await this.httpClient.SendAsync(request, cancellationToken);
}

private ZipkinEndpoint GetLocalZipkinEndpoint()
{
var hostName = ResolveHostName();
Expand Down Expand Up @@ -179,18 +167,18 @@ private class JsonContent : HttpContent
};

private readonly ZipkinExporter exporter;
private readonly IEnumerable<Activity> batchActivity;
private readonly Batch<Activity> batch;

#if NET452
private JsonWriter writer;
#else
private Utf8JsonWriter writer;
#endif

public JsonContent(ZipkinExporter exporter, IEnumerable<Activity> batchActivity)
public JsonContent(ZipkinExporter exporter, in Batch<Activity> batch)
{
this.exporter = exporter;
this.batchActivity = batchActivity;
this.batch = batch;

this.Headers.ContentType = JsonHeader;
}
Expand All @@ -213,18 +201,30 @@ protected override Task SerializeToStreamAsync(Stream stream, TransportContext c

this.writer.WriteStartArray();

foreach (var activity in this.batchActivity)
foreach (var activity in this.batch)
{
var zipkinSpan = activity.ToZipkinSpan(this.exporter.LocalEndpoint, this.exporter.options.UseShortTraceIds);

zipkinSpan.Write(this.writer);

zipkinSpan.Return();
#if !NET452
if (this.writer.BytesPending >= this.exporter.options.MaxPacketSize)
{
this.writer.Flush();
}
#endif
}

this.writer.WriteEndArray();

return this.writer.FlushAsync();
this.writer.Flush();

#if NET452
return Task.FromResult(true);
#else
return Task.CompletedTask;
#endif
}

protected override bool TryComputeLength(out long length)
Expand Down
27 changes: 18 additions & 9 deletions src/OpenTelemetry.Exporter.Zipkin/ZipkinExporterOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;

namespace OpenTelemetry.Exporter.Zipkin
Expand All @@ -22,25 +23,33 @@ namespace OpenTelemetry.Exporter.Zipkin
/// </summary>
public sealed class ZipkinExporterOptions
{
/// <summary>
/// Gets or sets Zipkin endpoint address. See https://zipkin.io/zipkin-api/#/default/post_spans.
/// Typically https://zipkin-server-name:9411/api/v2/spans.
/// </summary>
public Uri Endpoint { get; set; } = new Uri("http://localhost:9411/api/v2/spans");
internal const string DefaultServiceName = "OpenTelemetry Exporter";

#if !NET452
internal const int DefaultMaxPacketSize = 4096;
#endif

/// <summary>
/// Gets or sets timeout in seconds.
/// Gets or sets the name of the service reporting telemetry.
/// </summary>
public TimeSpan TimeoutSeconds { get; set; } = TimeSpan.FromSeconds(10);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeoutSeconds wasn't actually being used. The ActivityProcessor now owns timeout, so I just removed it.

public string ServiceName { get; set; } = DefaultServiceName;

/// <summary>
/// Gets or sets the name of the service reporting telemetry.
/// Gets or sets Zipkin endpoint address. See https://zipkin.io/zipkin-api/#/default/post_spans.
/// Typically https://zipkin-server-name:9411/api/v2/spans.
/// </summary>
public string ServiceName { get; set; } = "OpenTelemetry Exporter";
public Uri Endpoint { get; set; } = new Uri("http://localhost:9411/api/v2/spans");

/// <summary>
/// Gets or sets a value indicating whether short trace id should be used.
/// </summary>
public bool UseShortTraceIds { get; set; }

#if !NET452
/// <summary>
/// Gets or sets the maximum packet size in bytes. Default value: 4096.
/// </summary>
public int? MaxPacketSize { get; set; } = DefaultMaxPacketSize;
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved
#endif
}
}
Loading