Allow multiple calls to SetStatisticsHandler / SetLogHandler / SetErrorHandler #2155

Open · wants to merge 4 commits into base: master
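Summary of the change: SetStatisticsHandler, SetErrorHandler and SetLogHandler on ConsumerBuilder and ProducerBuilder previously threw an InvalidOperationException when called a second time; this PR switches the single assignment to delegate combination (+=), so every handler passed in is kept and invoked. A minimal usage sketch of the new behavior (the bootstrap address and handler bodies are illustrative only, not part of this PR):

    using System;
    using Confluent.Kafka;

    var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // placeholder address

    // Both error handlers are now registered; each error invokes both, in registration order.
    using var producer = new ProducerBuilder<string, string>(config)
        .SetErrorHandler((p, e) => Console.WriteLine($"metrics: {e.Code}"))
        .SetErrorHandler((p, e) => Console.WriteLine($"logging: {e.Reason}"))
        .Build();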
1 change: 1 addition & 0 deletions .gitignore
@@ -20,3 +20,4 @@ UpgradeLog*.htm
*~
\#*
core
.idea/
18 changes: 3 additions & 15 deletions src/Confluent.Kafka/ConsumerBuilder.cs
@@ -157,11 +157,7 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(
Action<IConsumer<TKey, TValue>, string> statisticsHandler)
{
- if (this.StatisticsHandler != null)
- {
- throw new InvalidOperationException("Statistics handler may not be specified more than once.");
- }
- this.StatisticsHandler = statisticsHandler;
+ this.StatisticsHandler += statisticsHandler;
return this;
}

@@ -180,11 +176,7 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
public ConsumerBuilder<TKey, TValue> SetErrorHandler(
Action<IConsumer<TKey, TValue>, Error> errorHandler)
{
- if (this.ErrorHandler != null)
- {
- throw new InvalidOperationException("Error handler may not be specified more than once.");
- }
- this.ErrorHandler = errorHandler;
+ this.ErrorHandler += errorHandler;
return this;
}

@@ -210,11 +202,7 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
public ConsumerBuilder<TKey, TValue> SetLogHandler(
Action<IConsumer<TKey, TValue>, LogMessage> logHandler)
{
- if (this.LogHandler != null)
- {
- throw new InvalidOperationException("Log handler may not be specified more than once.");
- }
- this.LogHandler = logHandler;
+ this.LogHandler += logHandler;
return this;
}

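Note on the += change above: combining handlers on an Action field produces a multicast delegate, so handlers fire in the order they were registered, and adding null is a no-op (Delegate.Combine returns the existing delegate) rather than replacing a previously set handler. A standalone sketch of that behavior, independent of the Kafka types:

    using System;

    Action<string> handler = null;
    handler += msg => Console.WriteLine($"first: {msg}");
    handler += msg => Console.WriteLine($"second: {msg}");
    handler += null;               // no-op: both handlers above remain registered

    handler("stats json");         // prints "first: stats json" then "second: stats json"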
18 changes: 3 additions & 15 deletions src/Confluent.Kafka/ProducerBuilder.cs
@@ -167,11 +167,7 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
/// </remarks>
public ProducerBuilder<TKey, TValue> SetStatisticsHandler(Action<IProducer<TKey, TValue>, string> statisticsHandler)
{
- if (this.StatisticsHandler != null)
- {
- throw new InvalidOperationException("Statistics handler may not be specified more than once.");
- }
- this.StatisticsHandler = statisticsHandler;
+ this.StatisticsHandler += statisticsHandler;
return this;
}

@@ -222,11 +218,7 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
/// </remarks>
public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TValue>, Error> errorHandler)
{
- if (this.ErrorHandler != null)
- {
- throw new InvalidOperationException("Error handler may not be specified more than once.");
- }
- this.ErrorHandler = errorHandler;
+ this.ErrorHandler += errorHandler;
return this;
}

@@ -251,11 +243,7 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
/// </remarks>
public ProducerBuilder<TKey, TValue> SetLogHandler(Action<IProducer<TKey, TValue>, LogMessage> logHandler)
{
- if (this.LogHandler != null)
- {
- throw new InvalidOperationException("Log handler may not be specified more than once.");
- }
- this.LogHandler = logHandler;
+ this.LogHandler += logHandler;
return this;
}

16 changes: 16 additions & 0 deletions test/Confluent.Kafka.IntegrationTests/SkipWhenCITheory.cs
@@ -0,0 +1,16 @@
using System;
using Xunit;

namespace Confluent.Kafka.IntegrationTests;

public sealed class SkipWhenCITheory : TheoryAttribute
{
private const string JenkinsBuildIdEnvVarName = "BUILD_ID";

public SkipWhenCITheory(string reason)
{
Skip = Environment.GetEnvironmentVariables().Contains(JenkinsBuildIdEnvVarName)
? reason
: null;
}
}
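For context, xUnit skips a theory whenever its Skip property is non-null, so this attribute disables a test only when Jenkins' BUILD_ID environment variable is present; locally the test still runs. It is applied like a regular [Theory] attribute, as on ConsumerBuilder_SetErrorHandler below; a minimal illustration with a hypothetical test name:

    [SkipWhenCITheory("Needs the broker to be stopped by hand."), MemberData(nameof(KafkaParameters))]
    public void ManualOnlyScenario(string bootstrapServers) { /* ... */ }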
230 changes: 230 additions & 0 deletions test/Confluent.Kafka.IntegrationTests/Tests/Builder_Handlers.cs
@@ -0,0 +1,230 @@
// Copyright 2016-2017 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

#pragma warning disable xUnit1026

using System;
using System.Linq;
using System.Threading;
using Xunit;


namespace Confluent.Kafka.IntegrationTests
{
/// <summary>
/// Test multiple calls to SetLogHandler, SetStatisticsHandler and SetErrorHandler
/// </summary>
public partial class Tests
{
private const string UnreachableBootstrapServers = "localhost:9000";

[Theory, MemberData(nameof(KafkaParameters))]
public void ProducerBuilder_SetLogHandler(string bootstrapServers)
{
LogToFile("start ProducerBuilder_SetLogHandler");

var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers,
Debug = "all"
};

ManualResetEventSlim mres1 = new(), mres2 = new();

using var _ = new ProducerBuilder<string, string>(producerConfig)
.SetLogHandler((_, _) => mres1.Set())
.SetLogHandler((_, _) => mres2.Set())
.Build();

Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));

LogToFile("end ProducerBuilder_SetLogHandler");
}

[Theory, MemberData(nameof(KafkaParameters))]
public void ProducerBuilder_SetStatisticsHandler(string bootstrapServers)
{
LogToFile("start ProducerBuilder_SetStatisticsHandler");

var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers,
StatisticsIntervalMs = 100
};

ManualResetEventSlim mres1 = new(), mres2 = new();

using var _ = new ProducerBuilder<string, string>(producerConfig)
.SetStatisticsHandler((_, _) => mres1.Set())
.SetStatisticsHandler((_, _) => mres2.Set())
.Build();

Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));

LogToFile("end ProducerBuilder_SetStatisticsHandler");
}

[Theory, InlineData(UnreachableBootstrapServers)]
public void ProducerBuilder_SetErrorHandler(string bootstrapServers)
{
LogToFile("start ProducerBuilder_SetErrorHandler");

var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers
};

ManualResetEventSlim mres1 = new(), mres2 = new();

using var _ = new ProducerBuilder<string, string>(producerConfig)
.SetErrorHandler((_, _) => mres1.Set())
.SetErrorHandler((_, _) => mres2.Set())
.Build();

Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));

LogToFile("end ProducerBuilder_SetErrorHandler");
}

[Theory, MemberData(nameof(KafkaParameters))]
public void ConsumerBuilder_SetLogHandler(string bootstrapServers)
{
LogToFile("start ConsumerBuilder_SetLogHandler");

int N = 2;
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);

var consumerConfig = new ConsumerConfig
{
GroupId = Guid.NewGuid().ToString(),
BootstrapServers = bootstrapServers,
SessionTimeoutMs = 6000,
EnablePartitionEof = true,
Debug = "all"
};

ManualResetEventSlim mres1 = new(), mres2 = new();

using var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
.SetPartitionsAssignedHandler((c, partitions) =>
{
Assert.Single(partitions);
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
})
.SetLogHandler((_, _) => mres1.Set())
.SetLogHandler((_, _) => mres2.Set())
.Build();
consumer.Subscribe(singlePartitionTopic);

Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));

LogToFile("end ConsumerBuilder_SetLogHandler");
}

[Theory, MemberData(nameof(KafkaParameters))]
public void ConsumerBuilder_SetStatisticsHandler(string bootstrapServers)
{
LogToFile("start ConsumerBuilder_SetStatisticsHandler");

int N = 2;
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);

var consumerConfig = new ConsumerConfig
{
GroupId = Guid.NewGuid().ToString(),
BootstrapServers = bootstrapServers,
SessionTimeoutMs = 6000,
EnablePartitionEof = true,
StatisticsIntervalMs = 100
};

ManualResetEventSlim mres1 = new(), mres2 = new();

using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
.SetPartitionsAssignedHandler((c, partitions) =>
{
Assert.Single(partitions);
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
})
.SetStatisticsHandler((_, _) => mres1.Set())
.SetStatisticsHandler((_, _) => mres2.Set())
.Build())
{
consumer.Subscribe(singlePartitionTopic);

while (true)
{
var record = consumer.Consume(TimeSpan.FromMilliseconds(100));
if (record == null) { continue; }
if (record.IsPartitionEOF) { break; }
}

Assert.True(mres1.Wait(TimeSpan.FromSeconds(5)));
Assert.True(mres2.Wait(TimeSpan.FromSeconds(5)));
consumer.Close();
}

LogToFile("end ConsumerBuilder_SetStatisticsHandler");
}

[SkipWhenCITheory("Requires manually stopping the broker during the consume loop to simulate the broker being down."), MemberData(nameof(KafkaParameters))]
Review comment: I'm not sure I understand. This requires someone to manually stop the broker?

Author: Exactly. This is required during the unit test to get the error handler called.

public void ConsumerBuilder_SetErrorHandler(string bootstrapServers)
{
LogToFile("start ConsumerBuilder_SetErrorHandler");

int N = 2;
var firstProduced = Util.ProduceNullStringMessages(bootstrapServers, singlePartitionTopic, 100, N);

var consumerConfig = new ConsumerConfig
{
GroupId = Guid.NewGuid().ToString(),
BootstrapServers = bootstrapServers,
SessionTimeoutMs = 6000
};

bool errorHandler1Called = false, errorHandler2Called = false;

using (var consumer = new ConsumerBuilder<byte[], byte[]>(consumerConfig)
.SetPartitionsAssignedHandler((c, partitions) =>
{
Assert.Single(partitions);
Assert.Equal(firstProduced.TopicPartition, partitions[0]);
return partitions.Select(p => new TopicPartitionOffset(p, firstProduced.Offset));
})
.SetErrorHandler((_, _) => errorHandler1Called = true)
.SetErrorHandler((_, _) => errorHandler2Called = true)
.Build())
{
consumer.Subscribe(singlePartitionTopic);

while (!errorHandler1Called && !errorHandler2Called)
{
consumer.Consume(TimeSpan.FromMilliseconds(100));
}

consumer.Close();
}

LogToFile("end ConsumerBuilder_SetErrorHandler");
}
}
}