Skip to content

Commit

Permalink
Test 2
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed May 1, 2024
1 parent 5274a73 commit c6c35e6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .semaphore/semaphore.yml
Expand Up @@ -127,7 +127,8 @@ blocks:
- cd test/Confluent.Kafka.IntegrationTests && dotnet test -l "console;verbosity=normal" && cd ../..
- name: 'Build and test with "consumer" protocol'
commands:
- cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && sleep 300 && cd ../..
- cd test/docker && docker-compose -f docker-compose-kraft.yaml up -d && cd ../..
- sleep 300
- export SEMAPHORE_SKIP_FLAKY_TESTS='true'
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- dotnet restore
Expand Down
Expand Up @@ -17,6 +17,7 @@
#pragma warning disable xUnit1026

using System;
using System.Threading;
using System.Collections.Generic;
using Xunit;
using Confluent.Kafka.TestsCommon;
Expand Down Expand Up @@ -51,22 +52,41 @@ public void Consumer_Commit_Committed_Position(string bootstrapServers)

var firstMessage = messages[0];
var lastMessage = messages[N - 1];

using (var consumer = new TestConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
{
if (TestConsumerGroupProtocol.IsClassic())
consumer.Assign(new TopicPartitionOffset(singlePartitionTopic, 0, firstMsgOffset));
else
// FIXME: There are problems with new coordinator when
// manually assigning and committing offsets for a group
// id it's returning NotCoordinatorForGroup
consumer.Subscribe(singlePartitionTopic);
consumer.Assign(new TopicPartitionOffset(singlePartitionTopic, 0, firstMsgOffset));
// Test #0.5 (invalid cases)
var offset = consumer.Position(new TopicPartition("invalid-topic", 0));
Assert.Equal(Offset.Unset, offset);

// Test #1
// This is one of the first tests, it seems with KRaft
// group coordinator is loaded on demand.
var record = consumer.Consume(TimeSpan.FromMilliseconds(30000));
var os = consumer.Commit();
List<TopicPartitionOffset> os = null;
while (os == null)
{
try
{
os = consumer.Commit();
}
catch (KafkaException e)
{
Console.WriteLine(e.Error);
if (e.Error == ErrorCode.GroupLoadInProgress ||
e.Error == ErrorCode.NotCoordinatorForGroup)
{
Thread.Sleep(1000);
continue;
}
else
{
throw;
}
}
}

Assert.Equal(firstMsgOffset + 1, os[0].Offset);
offset = consumer.Position(new TopicPartition(singlePartitionTopic, 0));
var co = consumer.Committed(new List<TopicPartition> { new TopicPartition(singlePartitionTopic, 0) }, TimeSpan.FromSeconds(10));
Expand Down

0 comments on commit c6c35e6

Please sign in to comment.