Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Provider pattern for EventPipleline and new SyncEventPipeline for synchronous flush #101

Merged
merged 4 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Analytics-CSharp/Segment/Analytics/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ private set

public IList<IFlushPolicy> FlushPolicies { get; }

public IEventPipelineProvider EventPipelineProvider { get; }

/// <summary>
/// Configuration that analytics can use
/// </summary>
Expand Down Expand Up @@ -82,7 +84,8 @@ public Configuration(string writeKey,
IAnalyticsErrorHandler analyticsErrorHandler = null,
IStorageProvider storageProvider = default,
IHTTPClientProvider httpClientProvider = default,
IList<IFlushPolicy> flushPolicies = default)
IList<IFlushPolicy> flushPolicies = default,
EventPipelineProvider eventPipelineProvider = default)
{
WriteKey = writeKey;
FlushAt = flushAt;
Expand All @@ -98,6 +101,7 @@ public Configuration(string writeKey,
FlushPolicies = flushPolicies == null ? new ConcurrentList<IFlushPolicy>() : new ConcurrentList<IFlushPolicy>(flushPolicies);
FlushPolicies.Add(new CountFlushPolicy(flushAt));
FlushPolicies.Add(new FrequencyFlushPolicy(flushInterval * 1000L));
EventPipelineProvider = eventPipelineProvider ?? new EventPipelineProvider();
}

public Configuration(string writeKey,
Expand Down
10 changes: 2 additions & 8 deletions Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Segment.Analytics.Plugins
/// </summary>
public class SegmentDestination : DestinationPlugin, ISubscriber
{
private EventPipeline _pipeline = null;
private IEventPipeline _pipeline = null;

public override string Key => "Segment.io";

Expand Down Expand Up @@ -64,13 +64,7 @@ public override void Configure(Analytics analytics)
// Add DestinationMetadata enrichment plugin
Add(new DestinationMetadataPlugin());

_pipeline = new EventPipeline(
analytics,
Key,
analytics.Configuration.WriteKey,
analytics.Configuration.FlushPolicies,
analytics.Configuration.ApiHost
);
_pipeline = analytics.Configuration.EventPipelineProvider.Create(analytics, Key);

analytics.AnalyticsScope.Launch(analytics.AnalyticsDispatcher, async () =>
{
Expand Down
4 changes: 2 additions & 2 deletions Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Segment.Analytics.Utilities
{
internal class EventPipeline
public class EventPipeline: IEventPipeline
{
private readonly Analytics _analytics;

Expand All @@ -23,7 +23,7 @@ internal class EventPipeline

private readonly IStorage _storage;

internal string ApiHost { get; set; }
public string ApiHost { get; set; }

public bool Running { get; private set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Segment.Analytics.Utilities
{
public class EventPipelineProvider:IEventPipelineProvider
{
public EventPipelineProvider()
{
}

public IEventPipeline Create(Analytics analytics, string key)
{
return new EventPipeline(analytics, key,
analytics.Configuration.WriteKey,
analytics.Configuration.FlushPolicies,
analytics.Configuration.ApiHost);
}
}
}
13 changes: 13 additions & 0 deletions Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Segment.Analytics.Utilities
{
public interface IEventPipeline
{
bool Running { get; }
string ApiHost { get; set; }

void Put(RawEvent @event);
void Flush();
void Start();
void Stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Collections.Generic;

namespace Segment.Analytics.Utilities
{
public interface IEventPipelineProvider
{
IEventPipeline Create(Analytics analytics, string key);
}
}
202 changes: 202 additions & 0 deletions Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
using System.Collections.Generic;
using System.Threading;
using global::System;
using global::System.Linq;
using Segment.Analytics.Policies;
using Segment.Concurrent;
using Segment.Serialization;

namespace Segment.Analytics.Utilities
{
internal sealed class FlushEvent : RawEvent
{
public override string Type => "flush";
public readonly SemaphoreSlim _semaphore;

internal FlushEvent(SemaphoreSlim semaphore)
{
_semaphore = semaphore;
}
}


public class SyncEventPipeline: IEventPipeline
{
private readonly Analytics _analytics;

private readonly string _logTag;

private readonly IList<IFlushPolicy> _flushPolicies;

private Channel<RawEvent> _writeChannel;

private Channel<FlushEvent> _uploadChannel;

private readonly HTTPClient _httpClient;

private readonly IStorage _storage;

public string ApiHost { get; set; }

public bool Running { get; private set; }

internal int _flushTimeout = -1;
internal CancellationToken _flushCancellationToken = CancellationToken.None;

public SyncEventPipeline(
Analytics analytics,
string logTag,
string apiKey,
IList<IFlushPolicy> flushPolicies,
string apiHost = HTTPClient.DefaultAPIHost,
int flushTimeout = -1,
CancellationToken? flushCancellationToken = null)
{
_analytics = analytics;
_logTag = logTag;
_flushPolicies = flushPolicies;
ApiHost = apiHost;

_writeChannel = new Channel<RawEvent>();
_uploadChannel = new Channel<FlushEvent>();
_httpClient = analytics.Configuration.HttpClientProvider.CreateHTTPClient(apiKey, apiHost: apiHost);
_httpClient.AnalyticsRef = analytics;
_storage = analytics.Storage;
Running = false;
_flushTimeout = flushTimeout;
_flushCancellationToken = flushCancellationToken ?? CancellationToken.None;
}

public void Put(RawEvent @event) => _writeChannel.Send(@event);

public void Flush() {
FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(1,1));
_writeChannel.Send(flushEvent);
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
}

public void Start()
{
if (Running) return;

// avoid to re-establish a channel if the pipeline just gets created
if (_writeChannel.isCancelled)
{
_writeChannel = new Channel<RawEvent>();
_uploadChannel = new Channel<FlushEvent>();
}

Running = true;
Schedule();
Write();
Upload();
}

public void Stop()
{
if (!Running) return;
Running = false;

_uploadChannel.Cancel();
_writeChannel.Cancel();
Unschedule();
}

private void Write() => _analytics.AnalyticsScope.Launch(_analytics.FileIODispatcher, async () =>
{
while (!_writeChannel.isCancelled)
{
RawEvent e = await _writeChannel.Receive();
bool isPoison = e is FlushEvent;

if (!isPoison)
{
try
{
string str = JsonUtility.ToJson(e);
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " running " + str);
await _storage.Write(StorageConstants.Events, str);

foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.UpdateState(e);
}
}
catch (Exception exception)
{
Analytics.Logger.Log(LogLevel.Error, exception, _logTag + ": Error writing events to storage.");
}
}

if (isPoison || _flushPolicies.Any(o => o.ShouldFlush()))
{
FlushEvent flushEvent = e as FlushEvent ?? new FlushEvent(null);
_uploadChannel.Send(flushEvent);
foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.Reset();
}
}
}
});

private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODispatcher, async () =>
{
while (!_uploadChannel.isCancelled)
{
FlushEvent flushEvent = await _uploadChannel.Receive();
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " performing flush");

await Scope.WithContext(_analytics.FileIODispatcher, async () => await _storage.Rollover());

string[] fileUrlList = _storage.Read(StorageConstants.Events).Split(',');
foreach (string url in fileUrlList)
{
if (string.IsNullOrEmpty(url))
{
continue;
}

byte[] data = _storage.ReadAsBytes(url);
if (data == null)
{
continue;
}

bool shouldCleanup = true;
try
{
shouldCleanup = await _httpClient.Upload(data);
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url);
}
catch (Exception e)
{
Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url");
}

if (shouldCleanup)
{
_storage.RemoveFile(url);
}
}
flushEvent._semaphore?.Release();
}
});

private void Schedule()
{
foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.Schedule(_analytics);
}
}

private void Unschedule()
{
foreach (IFlushPolicy flushPolicy in _flushPolicies)
{
flushPolicy.Unschedule();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Threading;

namespace Segment.Analytics.Utilities
{
public class SyncEventPipelineProvider: IEventPipelineProvider
{
internal int _flushTimeout = -1;
internal CancellationToken? _flushCancellationToken = null;

public SyncEventPipelineProvider(
int flushTimeout = -1,
CancellationToken? flushCancellationToken = null)
{
_flushTimeout = flushTimeout;
_flushCancellationToken = flushCancellationToken;
}

public IEventPipeline Create(Analytics analytics, string key)
{
return new SyncEventPipeline(analytics, key,
analytics.Configuration.WriteKey,
analytics.Configuration.FlushPolicies,
analytics.Configuration.ApiHost,
_flushTimeout,
_flushCancellationToken);
}
}
}
Loading
Loading