Skip to content

fix: enhancement of postgres change handlers #54

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ jobs:
- name: Build
run: dotnet build --configuration Release --no-restore

#- name: Add hosts entries
# run: |
# echo "127.0.0.1 realtime-dev.localhost" | sudo tee -a /etc/hosts
# echo "172.17.0.1 host.docker.internal" | sudo tee -a /etc/hosts
- uses: supabase/setup-cli@v1
with:
version: latest

#- name: Initialize Testing Stack
# run: docker-compose up -d
- name: Start Supabsae
run: supabase start

#- name: Test
# run: dotnet test --no-restore
- name: Test
run: dotnet test --no-restore
6 changes: 3 additions & 3 deletions Realtime/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
public ClientOptions Options { get; }

private Func<Dictionary<string, string>>? _getHeaders { get; set; }

/// <inheritdoc />
public Func<Dictionary<string, string>>? GetHeaders
{
get => _getHeaders;
set
{
_getHeaders = value;

if (Socket != null)
Socket.GetHeaders = value;
}
Expand Down Expand Up @@ -345,7 +345,7 @@
/// <exception cref="Exception"></exception>
public RealtimeChannel Channel(string channelName)
{
var topic = $"realtime:{channelName}";
var topic = channelName.StartsWith("realtime:") ? channelName : $"realtime:{channelName}";

if (_subscriptions.TryGetValue(topic, out var channel))
return channel;
Expand Down Expand Up @@ -386,7 +386,7 @@
var options = new ChannelOptions(Options, () => AccessToken, SerializerSettings);

var subscription = new RealtimeChannel(Socket!, key, options);
subscription.Register(changesOptions);

Check warning on line 389 in Realtime/Client.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 389 in Realtime/Client.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 389 in Realtime/Client.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 389 in Realtime/Client.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'

_subscriptions.Add(key, subscription);

Expand Down
11 changes: 11 additions & 0 deletions Realtime/Interfaces/IRealtimeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ public interface IRealtimeChannel
/// </summary>
string Topic { get; }

/// <summary>
/// Registers and adds a postgres change handler.
/// </summary>
/// <param name="postgresChangeHandler">The handler to process the event.</param>
/// <param name="listenType">The type of event this callback should process.</param>
/// <param name="schema">The schema to listen to.</param>
/// <param name="table">The table to listen to.</param>
/// <param name="filter">The filter to apply.</param>
/// <returns></returns>
public IRealtimeChannel OnPostgresChange(PostgresChangesHandler postgresChangeHandler, ListenType listenType = ListenType.All, string schema = "public", string? table = null, string? filter = null);

/// <summary>
/// Add a state changed listener
/// </summary>
Expand Down
3 changes: 2 additions & 1 deletion Realtime/Interfaces/IRealtimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Supabase.Realtime.Interfaces;
/// </summary>
/// <typeparam name="TSocket"></typeparam>
/// <typeparam name="TChannel"></typeparam>
public interface IRealtimeClient<TSocket, TChannel>: IGettableHeaders
public interface IRealtimeClient<TSocket, TChannel> : IGettableHeaders
where TSocket : IRealtimeSocket
where TChannel : IRealtimeChannel
{
Expand Down Expand Up @@ -95,6 +95,7 @@ public interface IRealtimeClient<TSocket, TChannel>: IGettableHeaders
/// <param name="value"></param>
/// <param name="parameters"></param>
/// <returns></returns>
[Obsolete("Please use Channel(string channelName) instead.")]
TChannel Channel(string database = "realtime", string schema = "public", string table = "*",
string? column = null, string? value = null, Dictionary<string, string>? parameters = null);

Expand Down
3 changes: 2 additions & 1 deletion Realtime/PostgresChanges/PostgresChangesOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
/// <summary>
/// The table for this listener, can be: `*` matching all tables in schema.
/// </summary>
[JsonProperty("table")]
[JsonProperty("table", NullValueHandling = NullValueHandling.Ignore)]
public string? Table { get; set; }

/// <summary>
Expand All @@ -71,6 +71,7 @@
/// The parameters passed to the server
/// </summary>
[JsonProperty("parameters", NullValueHandling = NullValueHandling.Ignore)]
[System.Obsolete("The Parameters property is deprecated and will be removed in a future version.")]
public Dictionary<string, string>? Parameters { get; set; }

/// <summary>
Expand All @@ -95,5 +96,5 @@
Schema = schema;
Table = table;
Filter = filter;
Parameters = parameters;

Check warning on line 99 in Realtime/PostgresChanges/PostgresChangesOptions.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'PostgresChangesOptions.Parameters' is obsolete: 'The Parameters property is deprecated and will be removed in a future version.'

Check warning on line 99 in Realtime/PostgresChanges/PostgresChangesOptions.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'PostgresChangesOptions.Parameters' is obsolete: 'The Parameters property is deprecated and will be removed in a future version.'

Check warning on line 99 in Realtime/PostgresChanges/PostgresChangesOptions.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'PostgresChangesOptions.Parameters' is obsolete: 'The Parameters property is deprecated and will be removed in a future version.'

Check warning on line 99 in Realtime/PostgresChanges/PostgresChangesOptions.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'PostgresChangesOptions.Parameters' is obsolete: 'The Parameters property is deprecated and will be removed in a future version.'
}
Expand Down
25 changes: 22 additions & 3 deletions Realtime/RealtimeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,28 @@
}

/// <summary>
/// Add a postgres changes listener. Should be paired with <see cref="Register"/>.
/// Registers and adds a postgres change handler.
/// </summary>
/// <param name="postgresChangeHandler">The handler to process the event.</param>
/// <param name="listenType">The type of event this callback should process.</param>
/// <param name="schema">The schema to listen to.</param>
/// <param name="table">The table to listen to.</param>
/// <param name="filter">The filter to apply.</param>
/// <returns></returns>
public IRealtimeChannel OnPostgresChange(PostgresChangesHandler postgresChangeHandler, ListenType listenType = ListenType.All, string schema = "public", string? table = null, string? filter = null)
{
var postgresChangesOptions = new PostgresChangesOptions(schema, table, listenType, filter);
Register(postgresChangesOptions);

Check warning on line 338 in Realtime/RealtimeChannel.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 338 in Realtime/RealtimeChannel.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 338 in Realtime/RealtimeChannel.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 338 in Realtime/RealtimeChannel.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.Register(PostgresChangesOptions)' is obsolete: 'Use OnPostgresChange instead.'
AddPostgresChangeHandler(listenType, postgresChangeHandler);

Check warning on line 339 in Realtime/RealtimeChannel.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.AddPostgresChangeHandler(PostgresChangesOptions.ListenType, IRealtimeChannel.PostgresChangesHandler)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 339 in Realtime/RealtimeChannel.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.AddPostgresChangeHandler(PostgresChangesOptions.ListenType, IRealtimeChannel.PostgresChangesHandler)' is obsolete: 'Use OnPostgresChange instead.'

Check warning on line 339 in Realtime/RealtimeChannel.cs

View workflow job for this annotation

GitHub Actions / build-and-test

'RealtimeChannel.AddPostgresChangeHandler(PostgresChangesOptions.ListenType, IRealtimeChannel.PostgresChangesHandler)' is obsolete: 'Use OnPostgresChange instead.'
return this;
}

/// <summary>
/// Adds a postgres changes listener. Should be paired with <see cref="Register"/>.
/// </summary>
/// <param name="listenType">The type of event this callback should process.</param>
/// <param name="postgresChangeHandler"></param>
[Obsolete("Use OnPostgresChange instead.")]
public void AddPostgresChangeHandler(ListenType listenType, PostgresChangesHandler postgresChangeHandler)
{
if (!_postgresChangesHandlers.ContainsKey(listenType))
Expand Down Expand Up @@ -425,6 +443,7 @@
/// </summary>
/// <param name="postgresChangesOptions"></param>
/// <returns></returns>
[Obsolete("Use OnPostgresChange instead.")]
public IRealtimeChannel Register(PostgresChangesOptions postgresChangesOptions)
{
PostgresChangesOptions.Add(postgresChangesOptions);
Expand Down Expand Up @@ -694,7 +713,7 @@
_isRejoining = false;

NotifyErrorOccurred(new RealtimeException(message.Json)
{ Reason = FailureHint.Reason.ChannelJoinFailure });
{ Reason = FailureHint.Reason.ChannelJoinFailure });
break;
}
}
Expand Down Expand Up @@ -733,7 +752,7 @@
break;
case PhoenixStatusError:
NotifyErrorOccurred(new RealtimeException(message.Json)
{ Reason = FailureHint.Reason.ChannelJoinFailure });
{ Reason = FailureHint.Reason.ChannelJoinFailure });
break;
}

Expand Down
127 changes: 96 additions & 31 deletions RealtimeTests/ChannelPostgresChangesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using RealtimeTests.Models;
using Supabase.Realtime;
using Supabase.Realtime.Interfaces;
using Supabase.Realtime.PostgresChanges;
using static Supabase.Realtime.Constants;
using static Supabase.Realtime.PostgresChanges.PostgresChangesOptions;

Expand Down Expand Up @@ -38,15 +37,13 @@ public async Task ChannelPayloadReturnsModel()
{
var tsc = new TaskCompletionSource<bool>();

var channel = _socketClient!.Channel("example");
channel.Register(new PostgresChangesOptions("public", "*"));
channel.AddPostgresChangeHandler(ListenType.Inserts, (_, changes) =>
{
var model = changes.Model<Todo>();
tsc.SetResult(model != null);
});

await channel.Subscribe();
await _socketClient!.Channel("example")
.OnPostgresChange((_, changes) =>
{
var model = changes.Model<Todo>();
tsc.SetResult(model != null);
}, ListenType.Inserts)
.Subscribe();

await _restClient!.Table<Todo>().Insert(new Todo { UserId = 1, Details = "Client Models a response? ✅" });

Expand All @@ -59,11 +56,10 @@ public async Task ChannelReceivesInsertCallback()
{
var tsc = new TaskCompletionSource<bool>();

var channel = _socketClient!.Channel("realtime", "public", "todos");

channel.AddPostgresChangeHandler(ListenType.Inserts, (_, _) => tsc.SetResult(true));
await _socketClient!.Channel("realtime:public:todos")
.OnPostgresChange((_, _) => tsc.SetResult(true), ListenType.Inserts, table: "todos")
.Subscribe();

await channel.Subscribe();
await _restClient!.Table<Todo>()
.Insert(new Todo { UserId = 1, Details = "Client receives insert callback? ✅" });

Expand All @@ -83,9 +79,8 @@ public async Task ChannelReceivesUpdateCallback()
var oldDetails = model.Details;
var newDetails = $"I'm an updated item ✏️ - {DateTime.Now}";

var channel = _socketClient!.Channel("realtime", "public", "todos");

channel.AddPostgresChangeHandler(ListenType.Updates, (_, changes) =>
await _socketClient!.Channel("realtime:public:todos")
.OnPostgresChange((_, changes) =>
{
var oldModel = changes.OldModel<Todo>();

Expand All @@ -101,9 +96,8 @@ public async Task ChannelReceivesUpdateCallback()
}

tsc.SetResult(true);
});

await channel.Subscribe();
}, ListenType.Updates, table: "todos")
.Subscribe();

await _restClient.Table<Todo>()
.Set(x => x.Details!, newDetails)
Expand All @@ -119,11 +113,9 @@ public async Task ChannelReceivesDeleteCallback()
{
var tsc = new TaskCompletionSource<bool>();

var channel = _socketClient!.Channel("realtime", "public", "todos");

channel.AddPostgresChangeHandler(ListenType.Deletes, (_, _) => tsc.SetResult(true));

await channel.Subscribe();
await _socketClient!.Channel("realtime:public:todos")
.OnPostgresChange((_, _) => tsc.SetResult(true), ListenType.Deletes, table: "todos")
.Subscribe();

var result = await _restClient!.Table<Todo>().Get();
var model = result.Models.Last();
Expand All @@ -143,9 +135,7 @@ public async Task ChannelReceivesWildcardCallback()

List<Task> tasks = new List<Task> { insertTsc.Task, updateTsc.Task, deleteTsc.Task };

var channel = _socketClient!.Channel("realtime", "public", "todos");

channel.AddPostgresChangeHandler(ListenType.All, (_, changes) =>
await _socketClient!.Channel("realtime:public:todos").OnPostgresChange((_, changes) =>
{
switch (changes.Payload?.Data?.Type)
{
Expand All @@ -159,12 +149,11 @@ public async Task ChannelReceivesWildcardCallback()
deleteTsc.SetResult(true);
break;
}
});

await channel.Subscribe();
}, ListenType.All, table: "todos").Subscribe();

var modeledResponse = await _restClient!.Table<Todo>().Insert(new Todo
{ UserId = 1, Details = "Client receives wildcard callbacks? ✅" });
{ UserId = 1, Details = "Client receives wildcard callbacks? ✅" });
var newModel = modeledResponse.Models.First();

await _restClient.Table<Todo>().Set(x => x.Details!, "And edits.").Match(newModel).Update();
Expand All @@ -176,4 +165,80 @@ public async Task ChannelReceivesWildcardCallback()
Assert.IsTrue(updateTsc.Task.Result);
Assert.IsTrue(deleteTsc.Task.Result);
}

[TestMethod("Channel: Receives Multiple Handlers")]
public async Task ChannelReceivesMultipleHandlers()
{
var insertTsc = new TaskCompletionSource<bool>();
var updateTsc = new TaskCompletionSource<bool>();
var deleteTsc = new TaskCompletionSource<bool>();
var allHandlerTsc = new TaskCompletionSource<bool>();
var filterHandlerTsc = new TaskCompletionSource<bool>();

var insertHandlerCalledCount = 0;
var updateHandlerCalledCount = 0;
var deleteHandlerCalledCount = 0;
var allHandlerCalledCount = 0;
var filterHandlerCalledCount = 0;

var channel = _socketClient!.Channel("realtime:public:todos");

channel.OnPostgresChange((_, changes) =>
{
if (changes.Payload?.Data?.Type == EventType.Insert)
{
insertHandlerCalledCount += 1;
insertTsc.SetResult(true);
}
}, ListenType.Inserts, table: "todos");

channel.OnPostgresChange((_, changes) =>
{
if (changes.Payload?.Data?.Type == EventType.Update)
{
updateHandlerCalledCount += 1;
updateTsc.SetResult(true);
}
}, ListenType.Updates, table: "todos");

channel.OnPostgresChange((_, changes) =>
{
if (changes.Payload?.Data?.Type == EventType.Delete)
{
deleteHandlerCalledCount += 1;
deleteTsc.SetResult(true);
}
}, ListenType.Deletes, table: "todos");

channel.OnPostgresChange((_, _) =>
{
allHandlerCalledCount += 1;
allHandlerTsc.SetResult(true);
}, ListenType.All, table: "todos");

channel.OnPostgresChange((_, changes) =>
{
filterHandlerCalledCount += 1;
filterHandlerTsc.SetResult(true);
}, ListenType.Updates, table: "todos");

await channel.Subscribe();

var modeledResponse = await _restClient!.Table<Todo>().Insert(new Todo
{ UserId = 1, Details = "Testing multiple handlers" });
var newModel = modeledResponse.Models.First();

await _restClient.Table<Todo>().Set(x => x.Details!, "Filtered update").Match(newModel).Update();
await _restClient.Table<Todo>().Set(x => x.Details!, "Another update").Match(newModel).Update();
await _restClient.Table<Todo>().Match(newModel).Delete();

await Task.WhenAll(insertTsc.Task, updateTsc.Task, deleteTsc.Task, allHandlerTsc.Task, filterHandlerTsc.Task);

Assert.AreEqual(insertHandlerCalledCount, 1);
Assert.AreEqual(updateHandlerCalledCount, 2);
Assert.AreEqual(deleteHandlerCalledCount, 1);

Assert.AreEqual(allHandlerCalledCount, 4);
Assert.AreEqual(filterHandlerCalledCount, 1);
}
}
Loading
Loading