kafkaAdmin auto configuration when creating KafkaTemplate #2780

Open
ChangguHan opened this issue Aug 23, 2023 · 12 comments

@ChangguHan

Expected Behavior
When using multiple KafkaTemplates with observation enabled, the automatically created KafkaAdmin should be able to connect to Kafka over SSL.

Current Behavior
When using multiple KafkaTemplates with observation enabled, the SASL, security, and SSL properties are not applied to the automatically created KafkaAdmin.
This becomes a problem when bootstrap.servers is set to the SSL port.

	// KafkaTemplate.java
	@Override
	public void afterSingletonsInstantiated() {
		if (this.observationEnabled && this.applicationContext != null) {
			this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
					.getIfUnique(() -> this.observationRegistry);
			if (this.kafkaAdmin == null) {
				this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
				if (this.kafkaAdmin != null) {
					Object producerServers = this.producerFactory.getConfigurationProperties()
							.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
					String adminServers = this.kafkaAdmin.getBootstrapServers();
					if (!producerServers.equals(adminServers)) {
						Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());

						// Only bootstrap.servers is overridden here; SASL, security, and SSL settings are not copied
						props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
						int opTo = this.kafkaAdmin.getOperationTimeout();
						this.kafkaAdmin = new KafkaAdmin(props);
						this.kafkaAdmin.setOperationTimeout(opTo);
					}
				}
			}
		}
		else if (this.micrometerEnabled) {
			this.micrometerHolder = obtainMicrometerHolder();
		}
	}

Context

I would like to suggest two things.

  1. When creating the KafkaAdmin at line 489, also add the SASL, security, and SSL properties.
  2. Instead of looking up a unique KafkaAdmin bean at line 480, guide users to create the KafkaTemplate with a KafkaAdmin or to use the setter.
    Then the KafkaAdmin creation logic could be simplified: when kafkaAdmin is null, create a new KafkaAdmin from the producerFactory's configuration.

The sample code would be like this.

	@Override
	public void afterSingletonsInstantiated() {
		if (this.observationEnabled && this.applicationContext != null) {
			this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
					.getIfUnique(() -> this.observationRegistry);
			if (this.kafkaAdmin == null) {
				this.kafkaAdmin = new KafkaAdmin(this.producerFactory.getConfigurationProperties());
			}
		}
		else if (this.micrometerEnabled) {
			this.micrometerHolder = obtainMicrometerHolder();
		}
	}
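
The first suggestion above (carry the security settings over along with the bootstrap servers) can be sketched with plain maps, without depending on Spring for Apache Kafka. This is only an illustration under stated assumptions: the class `AdminPropsSketch` and the method `adminPropsFrom` are hypothetical names, not part of any library, and the key filter relies on the standard Kafka client configuration names (`bootstrap.servers`, `security.protocol`, and the `sasl.` and `ssl.` prefixes).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of spring-kafka: shows one way to derive
// admin properties from producer properties.
final class AdminPropsSketch {

    /**
     * Starts from the existing admin properties, then overlays the producer's
     * bootstrap servers and any security-related settings (security.protocol,
     * sasl.*, ssl.*). Other producer-only settings such as acks are ignored.
     */
    static Map<String, Object> adminPropsFrom(Map<String, Object> adminProps,
            Map<String, Object> producerProps) {
        Map<String, Object> merged = new HashMap<>(adminProps);
        for (Map.Entry<String, Object> entry : producerProps.entrySet()) {
            String key = entry.getKey();
            if (key.equals("bootstrap.servers") || key.equals("security.protocol")
                    || key.startsWith("sasl.") || key.startsWith("ssl.")) {
                merged.put(key, entry.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> admin = new HashMap<>();
        admin.put("bootstrap.servers", "broker:9092");

        Map<String, Object> producer = new HashMap<>();
        producer.put("bootstrap.servers", "broker:9093");
        producer.put("security.protocol", "SSL");
        producer.put("ssl.truststore.location", "/path/to/truststore.jks");
        producer.put("acks", "all");

        // The merged map could then be passed to a KafkaAdmin constructor.
        System.out.println(adminPropsFrom(admin, producer));
    }
}
```

Filtering by prefix keeps producer-only keys out of the admin configuration, at the cost of missing any security-related key that does not follow the `sasl.`/`ssl.` naming.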
@garyrussell
Contributor

@ChangguHan This is a reasonable request, but the workaround is to set an admin instance yourself...

/**
 * Set the {@link KafkaAdmin}, used to find the cluster id for observation, if
 * present.
 * @param kafkaAdmin the admin.
 */
public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
	this.kafkaAdmin = kafkaAdmin;
}

@ChangguHan
Author

ChangguHan commented Aug 25, 2023

@garyrussell
Thank you for your comment.

I understand your point.
However, if the intent is to create a new instance when the bootstrap servers of the original bean and of the producerFactory differ, this code also needs to handle the SSL settings.

if (!producerServers.equals(adminServers)) {
	Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
	props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
	int opTo = this.kafkaAdmin.getOperationTimeout();
	this.kafkaAdmin = new KafkaAdmin(props);
	this.kafkaAdmin.setOperationTimeout(opTo);
}

@garyrussell garyrussell modified the milestones: 3.0.11, 3.0.12 Aug 28, 2023
@garyrussell garyrussell modified the milestones: 3.0.12, Backlog Oct 4, 2023
@Wzy19930507

This comment was marked as outdated.

@PPrydorozhnyi

I think I have an issue kinda related to this request. When I want to publish events in parallel and Kafka Admin is not initialized yet, I see several Kafka admin creations/initializations in the logs, and my publishing just stops. As a result, no events are published, and the producing process gets stuck.

I mean something like this:
`val executor = Executors.newVirtualThreadPerTaskExecutor()
val n = 1000

    for (i in 1..n) {
        CompletableFuture.runAsync(
            { template.send(inTopic, i.toString(), "value") },
            executor);
    }`

Creating the KafkaAdmin beforehand and setting it on the template solves the issue, but I am wondering whether this is the correct behavior.

@artembilan
Member

@PPrydorozhnyi ,

that is not possible if your template is a singleton bean in the application context.
The logic there is like this:

	public void afterSingletonsInstantiated() {
		if (this.observationEnabled && this.applicationContext != null) {
			this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
					.getIfUnique(() -> this.observationRegistry);
			if (this.kafkaAdmin == null) {
				this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();

And this afterSingletonsInstantiated() is called only once, when the application context is ready.

@PPrydorozhnyi

Hi @artembilan. Thanks for the quick response.

Unfortunately, I'm able to reproduce it without any custom bean scopes or manual bean creation.

Created a small project, so you could try it by yourself - Reproduce example

Could you please check? Thanks in advance.

@artembilan
Member

o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:

But that's correct, because AdminClient is not a KafkaAdmin.
We are talking about different objects.
Yes, KafkaAdmin uses AdminClient, and it does so this way:

			try (AdminClient client = createAdmin()) {
				this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
				if (this.clusterId == null) {
					this.clusterId = "null";
				}
			}

So, it is not a surprise to see several instances of the AdminClient created on the fly.

Not sure, though, why there have to be many of them, since the KafkaTemplate logic is like this:

	private String clusterId() {
		if (this.kafkaAdmin != null && this.clusterId == null) {
			this.clusterId = this.kafkaAdmin.clusterId();
		}
		return this.clusterId;
	}

The clusterId is resolved only once.

I'll run your application after lunch.

@artembilan
Member

@PPrydorozhnyi ,

Can you please update your sample project with the build tool files?
It is not clear what dependencies you use there.
According to the README it is supposed to be Gradle, so just add those artifacts to the repo.

@PPrydorozhnyi

@artembilan

yeap, sorry. added

@artembilan
Member

@PPrydorozhnyi ,

I see what is going on.
That KafkaTemplate.clusterId() is really the guilty one.
When we call send() with observation concurrently, all of those threads see the this.clusterId == null condition as true.
Therefore all of them call this.kafkaAdmin.clusterId() 😄

Probably not related to this issue, but still looks like a bug 🤷
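
The check-then-act race described above, and one possible way to close it, can be sketched with the JDK alone. This is not necessarily what the actual fix does; it is an illustration under stated assumptions. The class `ClusterIdCacheSketch` and the method `countLookups` are hypothetical names, the `Supplier` stands in for `kafkaAdmin.clusterId()`, and the 50 ms sleep is an arbitrary stand-in for a slow broker round trip.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch: caches an expensive lookup so that concurrent callers
// trigger it only once.
final class ClusterIdCacheSketch {

    private final Supplier<String> lookup; // stand-in for kafkaAdmin.clusterId()
    private final ReentrantLock lock = new ReentrantLock();
    private volatile String clusterId;

    ClusterIdCacheSketch(Supplier<String> lookup) {
        this.lookup = lookup;
    }

    String clusterId() {
        String id = this.clusterId;
        if (id == null) {
            this.lock.lock();
            try {
                // Re-check under the lock: another thread may have resolved it already.
                id = this.clusterId;
                if (id == null) {
                    id = this.lookup.get();
                    this.clusterId = id;
                }
            }
            finally {
                this.lock.unlock();
            }
        }
        return id;
    }

    /** Calls clusterId() from many threads at once and returns how many lookups ran. */
    static int countLookups(int threads) {
        AtomicInteger lookups = new AtomicInteger();
        ClusterIdCacheSketch cache = new ClusterIdCacheSketch(() -> {
            lookups.incrementAndGet();
            try {
                Thread.sleep(50); // simulate a slow remote call
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return "cluster-1";
        });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all threads together
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                cache.clusterId();
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return lookups.get();
    }

    public static void main(String[] args) {
        System.out.println("lookups: " + countLookups(16));
    }
}
```

Without the lock and the second null check, every thread that arrives before the first lookup completes would run its own lookup, which matches the repeated AdminClient creation seen in the logs. A `ReentrantLock` is used here rather than `synchronized` because the reproducer runs on virtual threads, where blocking inside a `synchronized` block can pin the carrier thread on some JDK versions.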

@artembilan
Member

See the fix: #2944

@PPrydorozhnyi

@artembilan nice, thanks a lot!
