Skip to content

Commit

Permalink
Add Id column as identifier for SQL outbox messages
Browse files Browse the repository at this point in the history
  • Loading branch information
tbd-develop committed May 1, 2024
1 parent 14178ef commit f8e73b3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
5 changes: 4 additions & 1 deletion samples/outbox-sql-sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@

var publisher = app.Services.GetRequiredService<IEventPublisher>();

await publisher.PublishAsync(Guid.NewGuid(), new SampleEvent { SomeValue = "Hello, World", SomeOtherValue = 99 });
var key = Guid.NewGuid();

await publisher.PublishAsync(key, new SampleEvent { SomeValue = "Hello, World", SomeOtherValue = 99 });
await publisher.PublishAsync(key, new SampleEvent { SomeValue = "Hello, Another World", SomeOtherValue = 10001 });

var factory = app.Services.GetRequiredService<IDbContextFactory<OutboxDbContext>>();

Expand Down
25 changes: 21 additions & 4 deletions src/TbdDevelop.Kafka.Outbox.SqlServer/SqlServerOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,22 @@ await context.OutboxMessages.AddAsync(new OutboxMessageContent
var @event = JsonSerializer.Deserialize(message.Content, type, SerializerOptions);

return (IOutboxMessage)Activator.CreateInstance(
typeof(OutboxMessage<>).MakeGenericType(type),
message.Key, message.DateAdded, @event)!;
typeof(SqlOutboxMessage<>).MakeGenericType(type),
message.Id, message.Key, message.DateAdded, @event)!;
}

public async Task Commit(IOutboxMessage message, CancellationToken cancellationToken = default)
{
await using var context = await factory.CreateDbContextAsync(cancellationToken);

if (message is not ISqlOutboxMessage sqlMessage)
{
return;
}

var current =
await context.OutboxMessages.SingleOrDefaultAsync(m => m.Key == message.Key,
cancellationToken);
await context.OutboxMessages.FirstOrDefaultAsync(m =>
m.Id == sqlMessage.Id, cancellationToken);

if (current is null)
{
Expand All @@ -75,4 +80,16 @@ await context.OutboxMessages.SingleOrDefaultAsync(m => m.Key == message.Key,

await context.SaveChangesAsync(cancellationToken);
}

private sealed class SqlOutboxMessage<TEvent>(int id, Guid key, DateTime dateAdded, TEvent @event)
: OutboxMessage<TEvent>(key, dateAdded, @event), ISqlOutboxMessage
where TEvent : IEvent
{
public int Id { get; } = id;
}

private interface ISqlOutboxMessage : IOutboxMessage
{
int Id { get; }
}
}

0 comments on commit f8e73b3

Please # to comment.