Skip to content

Commit

Permalink
Test 2
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed May 1, 2024
1 parent 5274a73 commit 4e1451d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .semaphore/semaphore.yml
Expand Up @@ -127,7 +127,8 @@ blocks:
- cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../..
- name: 'Build and test with "consumer" protocol'
commands:
- cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && sleep 300 && cd ../..
- cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && cd ../..
- sleep 300
- export SEMAPHORE_SKIP_FLAKY_TESTS='true'
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- dotnet restore
Expand Down
Expand Up @@ -85,6 +85,7 @@ public void AdminClient_IncrementalAlterConfigs(string bootstrapServers)
}
};
adminClient.IncrementalAlterConfigsAsync(toUpdate);
Thread.Sleep(TimeSpan.FromMilliseconds(200));
describeConfigsResult = adminClient.DescribeConfigsAsync(new List<ConfigResource> { configResource }).Result;
Assert.Equal("10001", describeConfigsResult[0].Entries["flush.ms"].Value);
Assert.Equal("delete,compact", describeConfigsResult[0].Entries["cleanup.policy"].Value);
Expand All @@ -95,6 +96,7 @@ public void AdminClient_IncrementalAlterConfigs(string bootstrapServers)
{ configResource, new List<ConfigEntry> { new ConfigEntry { Name = "flush.ms", Value = "20002" , IncrementalOperation = AlterConfigOpType.Set } } }
};
adminClient.IncrementalAlterConfigsAsync(toUpdate, new IncrementalAlterConfigsOptions { ValidateOnly = true }).Wait();
Thread.Sleep(TimeSpan.FromMilliseconds(200));
describeConfigsResult = adminClient.DescribeConfigsAsync(new List<ConfigResource> { configResource }).Result;
Assert.Equal("10001", describeConfigsResult[0].Entries["flush.ms"].Value);

Expand All @@ -116,6 +118,7 @@ public void AdminClient_IncrementalAlterConfigs(string bootstrapServers)
{ configResource2, new List<ConfigEntry> { new ConfigEntry { Name = "flush.ms", Value = "333" , IncrementalOperation = AlterConfigOpType.Set } } }
};
adminClient.IncrementalAlterConfigsAsync(toUpdate).Wait();
Thread.Sleep(TimeSpan.FromMilliseconds(200));
describeConfigsResult = adminClient.DescribeConfigsAsync(new List<ConfigResource> { configResource, configResource2 }).Result;
Assert.Equal(2, describeConfigsResult.Count);
Assert.Equal("222", describeConfigsResult[0].Entries["flush.ms"].Value);
Expand Down
Expand Up @@ -17,6 +17,7 @@
#pragma warning disable xUnit1026

using System;
using System.Threading;
using System.Collections.Generic;
using Xunit;
using Confluent.Kafka.TestsCommon;
Expand Down Expand Up @@ -51,22 +52,41 @@ public void Consumer_Commit_Committed_Position(string bootstrapServers)

var firstMessage = messages[0];
var lastMessage = messages[N - 1];

using (var consumer = new TestConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
{
if (TestConsumerGroupProtocol.IsClassic())
consumer.Assign(new TopicPartitionOffset(singlePartitionTopic, 0, firstMsgOffset));
else
// FIXME: There are problems with new coordinator when
// manually assigning and committing offsets for a group
// id it's returning NotCoordinatorForGroup
consumer.Subscribe(singlePartitionTopic);
consumer.Assign(new TopicPartitionOffset(singlePartitionTopic, 0, firstMsgOffset));
// Test #0.5 (invalid cases)
var offset = consumer.Position(new TopicPartition("invalid-topic", 0));
Assert.Equal(Offset.Unset, offset);

// Test #1
// This is one of the first tests, it seems with KRaft
// group coordinator is loaded on demand.
var record = consumer.Consume(TimeSpan.FromMilliseconds(30000));
var os = consumer.Commit();
List<TopicPartitionOffset> os = null;
while (os == null)
{
try
{
os = consumer.Commit();
}
catch (KafkaException e)
{
Console.WriteLine(e.Error);
if (e.Error == ErrorCode.GroupLoadInProgress ||
e.Error == ErrorCode.NotCoordinatorForGroup)
{
Thread.Sleep(1000);
continue;
}
else
{
throw;
}
}
}

Assert.Equal(firstMsgOffset + 1, os[0].Offset);
offset = consumer.Position(new TopicPartition(singlePartitionTopic, 0));
var co = consumer.Committed(new List<TopicPartition> { new TopicPartition(singlePartitionTopic, 0) }, TimeSpan.FromSeconds(10));
Expand Down

0 comments on commit 4e1451d

Please sign in to comment.