Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-848] Make Kafka integration test run with new protocol too #2212

Merged
merged 9 commits into from May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 12 additions & 3 deletions .semaphore/semaphore.yml
Expand Up @@ -119,12 +119,21 @@ blocks:
commands:
- docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY
jobs:
- name: 'Build and test'
- name: 'Build and test with "classic" protocol'
commands:
- cd test/docker && docker-compose up -d && sleep 30 && cd ../..
- export SEMAPHORE_SKIP_FLAKY_TETSTS='true'
- export SEMAPHORE_SKIP_FLAKY_TESTS='true'
- dotnet restore
- cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../..
- name: 'Build and test with "consumer" protocol'
commands:
- cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && cd ../..
- sleep 300
- export SEMAPHORE_SKIP_FLAKY_TESTS='true'
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- dotnet restore
- cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../..

- name: 'Schema registry and serdes integration tests'
dependencies: [ ]
task:
Expand All @@ -138,7 +147,7 @@ blocks:
- name: 'Build and test'
commands:
- cd test/docker && docker-compose up -d && cd ../..
- export SEMAPHORE_SKIP_FLAKY_TETSTS='true'
- export SEMAPHORE_SKIP_FLAKY_TESTS='true'
- dotnet restore
- cd test/Confluent.SchemaRegistry.Serdes.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../..
# - cd test/Confluent.SchemaRegistry.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../..
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
# 2.4.0

## Enhancements

- References librdkafka.redist 2.4.0. Refer to the [librdkafka v2.4.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.0) for more information.
- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol):
Integration tests running with the new consumer group protocol. The feature is an Early Access: not production ready, still not supported (#2212).


# 2.3.0

## Enhancements
Expand Down
15 changes: 15 additions & 0 deletions Confluent.Kafka.sln
Expand Up @@ -71,6 +71,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuthProducer", "examples\O
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JsonWithReferences", "examples\JsonWithReferences\JsonWithReferences.csproj", "{2931D890-9420-4EA7-BCEE-AAD53108A629}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.Kafka.TestsCommon", "test\Confluent.Kafka.TestsCommon\Confluent.Kafka.TestsCommon.csproj", "{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -456,6 +458,18 @@ Global
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|x64.Build.0 = Release|Any CPU
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|x86.ActiveCfg = Release|Any CPU
{2931D890-9420-4EA7-BCEE-AAD53108A629}.Release|x86.Build.0 = Release|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Debug|x64.ActiveCfg = Debug|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Debug|x64.Build.0 = Debug|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Debug|x86.ActiveCfg = Debug|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Debug|x86.Build.0 = Debug|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Release|Any CPU.Build.0 = Release|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Release|x64.ActiveCfg = Release|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Release|x64.Build.0 = Release|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Release|x86.ActiveCfg = Release|Any CPU
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{09C3255B-1972-4EB8-91D0-FB9F5CD82BCB} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
Expand Down Expand Up @@ -486,5 +500,6 @@ Global
{98D7F3E1-80EE-437C-8915-528BFD80E9B2} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
{2931D890-9420-4EA7-BCEE-AAD53108A629} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
{1DDD2809-5B7B-4B95-80D3-A3A516D6D356} = {90058283-1F8F-465D-89E4-D4374A27E612}
EndGlobalSection
EndGlobal
2 changes: 1 addition & 1 deletion examples/AdminClient/AdminClient.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/AvroBlogExamples/AvroBlogExamples.csproj
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/AvroGeneric/AvroGeneric.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/AvroSpecific/AvroSpecific.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/Configuration/Configuration.csproj
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
Expand Down
2 changes: 1 addition & 1 deletion examples/ConfluentCloud/ConfluentCloud.csproj
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/Consumer/Consumer.csproj
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/ExactlyOnce/ExactlyOnce.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
<PackageReference Include="Microsoft.FASTER.Core" Version="1.8.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion examples/ExactlyOnceOldBroker/ExactlyOnceOldBroker.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
<PackageReference Include="RocksDbSharp" Version="6.2.2" />
<PackageReference Include="RocksDbNative" Version="6.2.2" />
Expand Down
2 changes: 1 addition & 1 deletion examples/JsonSerialization/JsonSerialization.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/JsonWithReferences/JsonWithReferences.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj" />
<ProjectReference Include="../../src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion examples/OAuthConsumer/OAuthConsumer.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/OAuthOIDC/OAuthOIDC.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/OAuthProducer/OAuthProducer.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/Producer/Producer.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/Protobuf/Protobuf.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion examples/TlsAuth/TlsAuth.csproj
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.3.0" /> -->
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.4.0-RC2" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

Expand Down
36 changes: 34 additions & 2 deletions src/Confluent.Kafka/Config_gen.cs
@@ -1,4 +1,4 @@
// *** Auto-generated from librdkafka v2.3.0 *** - do not modify manually.
// *** Auto-generated from librdkafka v2.4.0-RC2 *** - do not modify manually.
//
// Copyright 2018-2022 Confluent Inc.
//
Expand Down Expand Up @@ -176,6 +176,22 @@ public enum PartitionAssignmentStrategy
CooperativeSticky
}

/// <summary>
/// GroupProtocol enum values
/// </summary>
public enum GroupProtocol
{
/// <summary>
/// Classic
/// </summary>
Classic,

/// <summary>
/// Consumer
/// </summary>
Consumer
}

/// <summary>
/// IsolationLevel enum values
/// </summary>
Expand Down Expand Up @@ -1341,13 +1357,29 @@ public ConsumerConfig ThrowIfContainsNonUserConfigurable()
public int? HeartbeatIntervalMs { get { return GetInt("heartbeat.interval.ms"); } set { this.SetObject("heartbeat.interval.ms", value); } }

/// <summary>
/// Group protocol type. NOTE: Currently, the only supported group protocol type is `consumer`.
/// Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`.
///
/// default: consumer
/// importance: low
/// </summary>
public string GroupProtocolType { get { return Get("group.protocol.type"); } set { this.SetObject("group.protocol.type", value); } }

/// <summary>
/// Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases.
///
/// default: classic
/// importance: high
/// </summary>
public GroupProtocol? GroupProtocol { get { return (GroupProtocol?)GetEnum(typeof(GroupProtocol), "group.protocol"); } set { this.SetObject("group.protocol", value); } }

/// <summary>
/// Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null
///
/// default: ''
/// importance: medium
/// </summary>
public string GroupRemoteAssignor { get { return Get("group.remote.assignor"); } set { this.SetObject("group.remote.assignor", value); } }

/// <summary>
/// How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
///
Expand Down
4 changes: 2 additions & 2 deletions src/Confluent.Kafka/Confluent.Kafka.csproj
Expand Up @@ -13,7 +13,7 @@
<PackageReadmeFile>README.md</PackageReadmeFile>
<Title>Confluent.Kafka</Title>
<AssemblyName>Confluent.Kafka</AssemblyName>
<VersionPrefix>2.3.0</VersionPrefix>
<VersionPrefix>2.4.0-RC2</VersionPrefix>
<TargetFrameworks>netstandard2.0;netstandard1.3;net462;net6.0</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand All @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="librdkafka.redist" Version="2.3.0">
<PackageReference Include="librdkafka.redist" Version="2.4.0-RC2">
<PrivateAssets Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">None</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Memory" Version="4.5.0" />
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry.Serdes.Avro</PackageId>
<Title>Confluent.SchemaRegistry.Serdes.Avro</Title>
<AssemblyName>Confluent.SchemaRegistry.Serdes.Avro</AssemblyName>
<VersionPrefix>2.3.0</VersionPrefix>
<VersionPrefix>2.4.0-RC2</VersionPrefix>
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry.Serdes.Json</PackageId>
<Title>Confluent.SchemaRegistry.Serdes.Json</Title>
<AssemblyName>Confluent.SchemaRegistry.Serdes.Json</AssemblyName>
<VersionPrefix>2.3.0</VersionPrefix>
<VersionPrefix>2.4.0-RC2</VersionPrefix>
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry.Serdes.Protobuf</PackageId>
<Title>Confluent.SchemaRegistry.Serdes.Protobuf</Title>
<AssemblyName>Confluent.SchemaRegistry.Serdes.Protobuf</AssemblyName>
<VersionPrefix>2.3.0</VersionPrefix>
<VersionPrefix>2.4.0-RC2</VersionPrefix>
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry</PackageId>
<Title>Confluent.SchemaRegistry</Title>
<AssemblyName>Confluent.SchemaRegistry</AssemblyName>
<VersionPrefix>2.3.0</VersionPrefix>
<VersionPrefix>2.4.0-RC2</VersionPrefix>
<TargetFrameworks>netstandard2.0;netstandard1.4</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Expand Up @@ -16,6 +16,7 @@

<ItemGroup>
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
<ProjectReference Include="../Confluent.Kafka.TestsCommon/Confluent.Kafka.TestsCommon.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 4 additions & 0 deletions test/Confluent.Kafka.IntegrationTests/TemporaryTopic.cs
Expand Up @@ -15,6 +15,7 @@
// Refer to LICENSE for more information.

using System;
using System.Threading;
using System.Collections.Generic;
using Confluent.Kafka.Admin;

Expand All @@ -39,6 +40,9 @@ public TemporaryTopic(string prefix, string bootstrapServers, int numPartitions)
adminClient.CreateTopicsAsync(new List<TopicSpecification> {
new TopicSpecification { Name = Name, NumPartitions = numPartitions, ReplicationFactor = 1 } }).Wait();
adminClient.Dispose();

// Wait for propagation (KRaft mainly)
Thread.Sleep(1000);
}

public void Dispose()
Expand Down
3 changes: 2 additions & 1 deletion test/Confluent.Kafka.IntegrationTests/Tests/AddBroker.cs
Expand Up @@ -18,6 +18,7 @@

using System;
using Xunit;
using Confluent.Kafka.TestsCommon;


namespace Confluent.Kafka.IntegrationTests
Expand All @@ -37,7 +38,7 @@ public void AddBrokers(string bootstrapServers)
{
var producerConfig = new ProducerConfig { BootstrapServers = "localhost:65533" };

using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
using (var producer = new TestProducerBuilder<Null, string>(producerConfig).Build())
using (var adminClient = new DependentAdminClientBuilder(producer.Handle).Build())
{
try
Expand Down