Skip to content

Commit

Permalink
librdkafka v2.1.0 (#2035)
Browse files Browse the repository at this point in the history
* librdkafka v2.1.0

* Fix KIP-320 add log truncation error code

* Fix failing Consumer_Poll_MessageError test
  • Loading branch information
emasab committed Apr 6, 2023
1 parent 5576880 commit de70d7d
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/Confluent.Kafka/Config_gen.cs
@@ -1,4 +1,4 @@
// *** Auto-generated from librdkafka v2.1.0-RC3 *** - do not modify manually.
// *** Auto-generated from librdkafka v2.1.0 *** - do not modify manually.
//
// Copyright 2018-2022 Confluent Inc.
//
Expand Down
4 changes: 2 additions & 2 deletions src/Confluent.Kafka/Confluent.Kafka.csproj
Expand Up @@ -12,7 +12,7 @@
<PackageId>Confluent.Kafka</PackageId>
<Title>Confluent.Kafka</Title>
<AssemblyName>Confluent.Kafka</AssemblyName>
<VersionPrefix>2.1.0-RC3</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<TargetFrameworks>netstandard2.0;netstandard1.3;net462;net6.0</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand All @@ -21,7 +21,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="librdkafka.redist" Version="2.1.0-RC3">
<PackageReference Include="librdkafka.redist" Version="2.1.0">
<PrivateAssets Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">None</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Memory" Version="4.5.0" />
Expand Down
17 changes: 12 additions & 5 deletions src/Confluent.Kafka/Consumer.cs
Expand Up @@ -794,6 +794,13 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
try
{
var msg = Util.Marshal.PtrToStructure<rd_kafka_message>(msgPtr);
int? msgLeaderEpoch = null;
Offset msgOffset = msg.offset;

if (msg.rkt != IntPtr.Zero && (msgOffset != Offset.Unset))
{
msgLeaderEpoch = Librdkafka.message_leader_epoch(msgPtr);
}

string topic = null;
if (this.enableTopicNameMarshaling)
Expand All @@ -810,7 +817,7 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
{
TopicPartitionOffset = new TopicPartitionOffset(topic,
msg.partition, msg.offset,
Librdkafka.message_leader_epoch(msgPtr)),
msgLeaderEpoch),
Message = null,
IsPartitionEOF = true
};
Expand Down Expand Up @@ -857,7 +864,7 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
{
TopicPartitionOffset = new TopicPartitionOffset(topic,
msg.partition, msg.offset,
Librdkafka.message_leader_epoch(msgPtr)),
msgLeaderEpoch),
Message = new Message<byte[], byte[]>
{
Timestamp = timestamp,
Expand Down Expand Up @@ -890,7 +897,7 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
{
TopicPartitionOffset = new TopicPartitionOffset(topic,
msg.partition, msg.offset,
Librdkafka.message_leader_epoch(msgPtr)),
msgLeaderEpoch),
Message = new Message<byte[], byte[]>
{
Timestamp = timestamp,
Expand Down Expand Up @@ -924,7 +931,7 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
{
TopicPartitionOffset = new TopicPartitionOffset(topic,
msg.partition, msg.offset,
Librdkafka.message_leader_epoch(msgPtr)),
msgLeaderEpoch),
Message = new Message<byte[], byte[]>
{
Timestamp = timestamp,
Expand All @@ -942,7 +949,7 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
{
TopicPartitionOffset = new TopicPartitionOffset(topic,
msg.partition, msg.offset,
Librdkafka.message_leader_epoch(msgPtr)),
msgLeaderEpoch),
Message = new Message<TKey, TValue>
{
Timestamp = timestamp,
Expand Down
6 changes: 5 additions & 1 deletion src/Confluent.Kafka/ErrorCode.cs
Expand Up @@ -329,7 +329,11 @@ public enum ErrorCode
/// No offset to automatically reset to
/// </summary>
Local_AutoOffsetReset = -140,


/// <summary>
/// Partition log truncation detected
/// </summary>
Local_LogTruncation = -139,

/// <summary>
/// Unknown broker error
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry.Serdes.Avro</PackageId>
<Title>Confluent.SchemaRegistry.Serdes.Avro</Title>
<AssemblyName>Confluent.SchemaRegistry.Serdes.Avro</AssemblyName>
<VersionPrefix>2.1.0-RC3</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry.Serdes.Json</PackageId>
<Title>Confluent.SchemaRegistry.Serdes.Json</Title>
<AssemblyName>Confluent.SchemaRegistry.Serdes.Json</AssemblyName>
<VersionPrefix>2.1.0-RC3</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry.Serdes.Protobuf</PackageId>
<Title>Confluent.SchemaRegistry.Serdes.Protobuf</Title>
<AssemblyName>Confluent.SchemaRegistry.Serdes.Protobuf</AssemblyName>
<VersionPrefix>2.1.0-RC3</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down
Expand Up @@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry</PackageId>
<Title>Confluent.SchemaRegistry</Title>
<AssemblyName>Confluent.SchemaRegistry</AssemblyName>
<VersionPrefix>2.1.0-RC3</VersionPrefix>
<VersionPrefix>2.1.0</VersionPrefix>
<TargetFrameworks>netstandard2.0;netstandard1.4</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
Expand Down

0 comments on commit de70d7d

Please sign in to comment.