diff --git a/Directory.Packages.props b/Directory.Packages.props index d2e90f1f5..8c825fa6c 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -36,7 +36,9 @@ - + + + diff --git a/src/Testcontainers.Kafka/KafkaBuilder.cs b/src/Testcontainers.Kafka/KafkaBuilder.cs index 210129972..efa81c439 100644 --- a/src/Testcontainers.Kafka/KafkaBuilder.cs +++ b/src/Testcontainers.Kafka/KafkaBuilder.cs @@ -10,10 +10,14 @@ public sealed class KafkaBuilder : ContainerBuilder /// Initializes a new instance of the class. /// @@ -43,6 +47,49 @@ public override KafkaContainer Build() return new KafkaContainer(DockerResourceConfiguration); } + /// + /// Adds a listener to the Kafka configuration in the format host:port. + /// + /// + /// The host will be included as a network alias, allowing additional connections + /// to the Kafka broker within the same container network. + /// + /// This method is useful for registering custom listeners beyond the default ones, + /// enabling specific connection points for Kafka brokers. + /// + /// Default listeners include: + /// - PLAINTEXT://0.0.0.0:9092 + /// - BROKER://0.0.0.0:9093 + /// - CONTROLLER://0.0.0.0:9094 + /// + /// The MsSql database. + /// A configured instance of . + public KafkaBuilder WithListener(string kafka) + { + var index = DockerResourceConfiguration.Listeners?.Count() ?? 0; + var protocol = $"{ProtocolPrefix}-{index}"; + var listener = $"{protocol}://{kafka}"; + var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT"; + + var listeners = new[] { listener }; + var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap }; + + var host = kafka.Split(':')[0]; + + var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"] + .Split(',') + .Concat(listeners); + + var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] + .Split(',') + .Concat(listenersSecurityProtocolMap); + + return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners)) + .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners)) + .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap)) + .WithNetworkAliases(host); + } + /// protected override KafkaBuilder Init() { @@ -51,10 +98,12 @@ protected override KafkaBuilder Init() .WithPortBinding(KafkaPort, true) .WithPortBinding(BrokerPort, true) .WithPortBinding(ZookeeperPort, true) - .WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort) - .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT") + .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}") + .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT") .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") .WithEnvironment("KAFKA_BROKER_ID", "1") + .WithEnvironment("KAFKA_NODE_ID", "1") + .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort) .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") @@ -68,6 +117,7 @@ protected override KafkaBuilder Init() .WithStartupCallback((container, ct) => { const char lf = '\n'; + var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty()); var startupScript = new StringBuilder(); startupScript.Append("#!/bin/bash"); startupScript.Append(lf); @@ -79,7 +129,7 @@ protected override KafkaBuilder Init() startupScript.Append(lf); startupScript.Append("zookeeper-server-start zookeeper.properties &"); startupScript.Append(lf); - startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort); + startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners); startupScript.Append(lf); startupScript.Append("echo '' > /etc/confluent/docker/ensure"); startupScript.Append(lf); diff --git a/src/Testcontainers.Kafka/KafkaConfiguration.cs b/src/Testcontainers.Kafka/KafkaConfiguration.cs index f741cb306..7299e3488 100644 --- a/src/Testcontainers.Kafka/KafkaConfiguration.cs +++ b/src/Testcontainers.Kafka/KafkaConfiguration.cs @@ -7,8 +7,14 @@ public sealed class KafkaConfiguration : ContainerConfiguration /// /// Initializes a new instance of the class. /// - public KafkaConfiguration() + /// A list of listeners. + /// A list of advertised listeners. + public KafkaConfiguration( + IEnumerable listeners = null, + IEnumerable advertisedListeners = null) { + Listeners = listeners; + AdvertisedListeners = advertisedListeners; } /// @@ -49,5 +55,17 @@ public KafkaConfiguration(KafkaConfiguration resourceConfiguration) public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue) : base(oldValue, newValue) { + Listeners = BuildConfiguration.Combine(oldValue.Listeners, newValue.Listeners); + AdvertisedListeners = BuildConfiguration.Combine(oldValue.AdvertisedListeners, newValue.AdvertisedListeners); } + + /// + /// Gets a list of listeners. + /// + public IEnumerable Listeners { get; } + + /// + /// Gets a list of advertised listeners. + /// + public IEnumerable AdvertisedListeners { get; } } \ No newline at end of file diff --git a/src/Testcontainers.Kafka/KafkaContainer.cs b/src/Testcontainers.Kafka/KafkaContainer.cs index 41407fa7d..89a20db5b 100644 --- a/src/Testcontainers.Kafka/KafkaContainer.cs +++ b/src/Testcontainers.Kafka/KafkaContainer.cs @@ -4,6 +4,8 @@ namespace Testcontainers.Kafka; [PublicAPI] public sealed class KafkaContainer : DockerContainer { + private readonly KafkaConfiguration _configuration; + /// /// Initializes a new instance of the class. /// @@ -11,6 +13,7 @@ public sealed class KafkaContainer : DockerContainer public KafkaContainer(KafkaConfiguration configuration) : base(configuration) { + _configuration = configuration; } /// @@ -21,4 +24,15 @@ public string GetBootstrapAddress() { return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString(); } + + /// + /// Gets a list of advertised listeners. + /// + public IEnumerable AdvertisedListeners + { + get + { + return _configuration.AdvertisedListeners; + } + } } \ No newline at end of file diff --git a/src/Testcontainers.Kafka/Usings.cs b/src/Testcontainers.Kafka/Usings.cs index 4ee39f0be..da93d34a3 100644 --- a/src/Testcontainers.Kafka/Usings.cs +++ b/src/Testcontainers.Kafka/Usings.cs @@ -1,4 +1,6 @@ global using System; +global using System.Collections.Generic; +global using System.Linq; global using System.Text; global using Docker.DotNet.Models; global using DotNet.Testcontainers.Builders; diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs new file mode 100644 index 000000000..d5d9ba26d --- /dev/null +++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs @@ -0,0 +1,68 @@ +namespace Testcontainers.Kafka; + +public sealed class KafkaContainerNetworkTest : IAsyncLifetime +{ + private const string Message = "Message produced by kafkacat"; + + private const string Listener = "kafka:19092"; + + private const string DataFilePath = "/data/msgs.txt"; + + private readonly INetwork _network; + + private readonly IContainer _kafkaContainer; + + private readonly IContainer _kCatContainer; + + public KafkaContainerNetworkTest() + { + _network = new NetworkBuilder() + .Build(); + + _kafkaContainer = new KafkaBuilder() + .WithImage("confluentinc/cp-kafka:6.1.9") + .WithNetwork(_network) + .WithListener(Listener) + .Build(); + + _kCatContainer = new ContainerBuilder() + .WithImage("confluentinc/cp-kafkacat:6.1.9") + .WithNetwork(_network) + .WithEntrypoint(CommonCommands.SleepInfinity) + .WithResourceMapping(Encoding.Default.GetBytes(Message), DataFilePath) + .Build(); + } + + public async Task InitializeAsync() + { + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _kCatContainer.StartAsync() + .ConfigureAwait(false); + } + + public async Task DisposeAsync() + { + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _kCatContainer.StartAsync() + .ConfigureAwait(false); + + await _network.DisposeAsync() + .ConfigureAwait(false); + } + + [Fact] + public async Task ConsumesProducedKafkaMessage() + { + _ = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-t", "msgs", "-P", "-l", DataFilePath }) + .ConfigureAwait(true); + + var execResult = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-C", "-t", "msgs", "-c", "1" }) + .ConfigureAwait(true); + + Assert.Equal(Message, execResult.Stdout.Trim()); + } +} \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs b/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs new file mode 100644 index 000000000..9220c202b --- /dev/null +++ b/tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs @@ -0,0 +1,129 @@ +namespace Testcontainers.Kafka; + +public sealed class KafkaContainerRegistryTest : IAsyncLifetime +{ + private const string Schema = @" + { + ""$schema"": ""http://json-schema.org/draft-04/schema#"", + ""title"": ""User"", + ""type"": ""object"", + ""additionalProperties"": false, + ""properties"": { + ""FirstName"": { + ""type"": [""null"", ""string""] + }, + ""LastName"": { + ""type"": [""null"", ""string""] + } + } + }"; + + private const ushort RestPort = 8085; + + private const string SchemaRegistryNetworkAlias = "schema-registry"; + + private const string Listener = "kafka:19092"; + + private readonly INetwork _network; + + private readonly KafkaContainer _kafkaContainer; + + private readonly IContainer _schemaRegistryContainer; + + public KafkaContainerRegistryTest() + { + _network = new NetworkBuilder() + .Build(); + + _kafkaContainer = new KafkaBuilder() + .WithImage("confluentinc/cp-kafka:6.1.9") + .WithNetwork(_network) + .WithListener(Listener) + .Build(); + + _schemaRegistryContainer = new ContainerBuilder() + .WithImage("confluentinc/cp-schema-registry:6.1.9") + .WithPortBinding(RestPort, true) + .WithNetwork(_network) + .WithNetworkAliases(SchemaRegistryNetworkAlias) + .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + RestPort) + .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") + .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + Listener) + .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", SchemaRegistryNetworkAlias) + .WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(request => + request.ForPort(RestPort).ForPath("/subjects"))) + .Build(); + } + + public async Task InitializeAsync() + { + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _schemaRegistryContainer.StartAsync() + .ConfigureAwait(false); + } + + public async Task DisposeAsync() + { + await _kafkaContainer.StartAsync() + .ConfigureAwait(false); + + await _schemaRegistryContainer.StartAsync() + .ConfigureAwait(false); + + await _network.DisposeAsync() + .ConfigureAwait(false); + } + + [Fact] + public async Task ConsumerReturnsProducerMessage() + { + // Given + const string topic = "user"; + + var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topic); + + var bootstrapServer = _kafkaContainer.GetBootstrapAddress(); + + var producerConfig = new ProducerConfig(); + producerConfig.BootstrapServers = bootstrapServer; + + var consumerConfig = new ConsumerConfig(); + consumerConfig.BootstrapServers = bootstrapServer; + consumerConfig.GroupId = "sample-consumer"; + consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; + + var message = new Message(); + message.Value = new User("John", "Doe"); + + var schemaRegistryConfig = new SchemaRegistryConfig(); + schemaRegistryConfig.Url = new UriBuilder(Uri.UriSchemeHttp, _schemaRegistryContainer.Hostname, _schemaRegistryContainer.GetMappedPublicPort(RestPort)).ToString(); + + // When + using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig); + _ = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(Schema, SchemaType.Json)) + .ConfigureAwait(true); + + using var producer = new ProducerBuilder(producerConfig) + .SetValueSerializer(new JsonSerializer(schemaRegistry)) + .Build(); + + _ = await producer.ProduceAsync(topic, message) + .ConfigureAwait(true); + + using var consumer = new ConsumerBuilder(consumerConfig) + .SetValueDeserializer(new JsonDeserializer().AsSyncOverAsync()) + .Build(); + + consumer.Subscribe(topic); + + var result = consumer.Consume(TimeSpan.FromSeconds(15)); + + // Then + Assert.NotNull(result); + Assert.Equal(message.Value, result.Message.Value); + } + + private record User(string FirstName, string LastName); +} \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/Testcontainers.Kafka.Tests.csproj b/tests/Testcontainers.Kafka.Tests/Testcontainers.Kafka.Tests.csproj index 94298bbaa..a968f03fd 100644 --- a/tests/Testcontainers.Kafka.Tests/Testcontainers.Kafka.Tests.csproj +++ b/tests/Testcontainers.Kafka.Tests/Testcontainers.Kafka.Tests.csproj @@ -10,6 +10,8 @@ + + diff --git a/tests/Testcontainers.Kafka.Tests/Usings.cs b/tests/Testcontainers.Kafka.Tests/Usings.cs index a7fcce194..3e69bfb4f 100644 --- a/tests/Testcontainers.Kafka.Tests/Usings.cs +++ b/tests/Testcontainers.Kafka.Tests/Usings.cs @@ -1,5 +1,15 @@ global using System; +global using System.Collections.Generic; +global using System.Diagnostics; +global using System.Text; +global using System.Threading; global using System.Threading.Tasks; global using Confluent.Kafka; +global using Confluent.Kafka.SyncOverAsync; +global using Confluent.SchemaRegistry; +global using Confluent.SchemaRegistry.Serdes; +global using DotNet.Testcontainers.Builders; global using DotNet.Testcontainers.Commons; +global using DotNet.Testcontainers.Containers; +global using DotNet.Testcontainers.Networks; global using Xunit; \ No newline at end of file