AdminClient caching all topics in metadata request calls #1737
@pranavrth, any ideas here? Or is there any other context you need?
I didn't understand the flow of your code:
The main worker here is the `topic_manipulation` function.
We only see the no-longer-existing topics in the metadata requests; they never appear in our print statements of the topics returned by any of the list, create, or delete topic calls.

```python
from confluent_kafka.admin import AdminClient, NewTopic, KafkaError
import confluent_kafka
from random import randint
from time import sleep
from uuid import uuid4
from datetime import datetime
from pprint import pprint
import os

global_timeout = 60
numOfTopicsFromLastCreateAttempt = 0
topicID = 0


def printwithSeparator(message):
    print('-' * 20)
    print(message)
    print('-' * 20)


def create_admin_client():
    conf = {
        'bootstrap.servers': os.getenv('EVENTHUBS_BROKER'),
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': os.getenv('EVENTHUBS_CONNECTION_STRING'),
        'client.id': f'python-admin-{randint(0, 10000)}-{confluent_kafka.__version__}',
        'allow.auto.create.topics': 'false',
        'connections.max.idle.ms': 180000,
        'socket.keepalive.enable': True,
        'request.timeout.ms': 60000,
        'socket.timeout.ms': 60000,
        'message.timeout.ms': 60000,
        'debug': 'admin,metadata',
    }
    admin = AdminClient(conf)
    printwithSeparator(f'Admin { conf["client.id"] } started')
    return admin


admin = create_admin_client()


def describeTopics(inTopics):
    described = []
    for topic in inTopics:
        try:
            # Request metadata for this single topic only
            md = admin.list_topics(topic, timeout=global_timeout)
            currTopic = list(md.topics.values())[0]
            described.append(str(currTopic))
        except Exception as e:
            # printwithSeparator takes one argument, so format the error into it
            printwithSeparator("Error describing topic {}: {}".format(topic, e))
    printwithSeparator(f'Described {len(described)} Topics: [{", ".join(described)}]')


def listTopic(in_timeout=global_timeout):
    try:
        md = admin.list_topics(timeout=in_timeout)
        topics = set()
        for t in md.topics.values():
            if t.error is not None:
                printwithSeparator("Error getting topic {}: {}".format(t, t.error))
            else:
                topics.add(f'{t}')
        printwithSeparator(f'populated {len(topics)} topic(s) [{", ".join(topics)}]')
    except Exception as e:
        printwithSeparator("Error listing topics: {}".format(e))
        return set()
    return topics


def delete_topics(topics, in_timeout=global_timeout):
    if len(topics) == 0:
        return
    deleted = []  # renamed so it no longer shadows this function's name
    try:
        fs = admin.delete_topics(topics, request_timeout=in_timeout)
        for topic, f in fs.items():
            try:
                f.result()  # raises if the deletion failed
                deleted.append(topic)
            except Exception as e:
                printwithSeparator("Failed to delete topic {}: {}".format(topic, e))
    except Exception as e:
        printwithSeparator("Error deleting topics {}: {}".format(topics, e))
    printwithSeparator(f'Deleted {len(deleted)} Topics: [{", ".join(deleted)}]')


def createTenTopics():
    global topicID
    newtopics = []
    topics = []
    for i in range(10):
        # topicID only increases, so a topic name is never reused
        newtopics.append(NewTopic(f'dyna-topic-{topicID}', num_partitions=randint(1, 32), replication_factor=1))
        topicID += 1
    try:
        fs = admin.create_topics(newtopics, request_timeout=global_timeout)
        for topic, f in fs.items():
            try:
                f.result()  # raises if the creation failed
                topics.append(topic)
            except Exception as e:
                printwithSeparator("Failed to create topic {}: {}".format(topic, e))
        printwithSeparator(f'Tried to create {len(newtopics)} Created {len(topics)} Topics: [{", ".join(topics)}]')
    except Exception as e:
        printwithSeparator("Error creating random topics: {}".format(e))
    return len(topics)


def topic_manipulation():
    global numOfTopicsFromLastCreateAttempt
    topics = listTopic()
    printwithSeparator(f'last created topics:{numOfTopicsFromLastCreateAttempt} populated {len(topics)}')
    if topics:
        topics = list(topics)
        describeTopics(topics)
        delete_topics(topics)
        sleep(10)
    numOfTopicsFromLastCreateAttempt = createTenTopics()
    sleep(10)


def main():
    try:
        while True:
            try:
                topic_manipulation()
            except Exception as e:
                printwithSeparator(f'Caught Exception: {e}')
    except KeyboardInterrupt:
        printwithSeparator("Caught KeyboardInterrupt")


main()
```
We see the topic list in the metadata requests (API key 3) keep growing over time, even though `list_topics` on the admin client only returns the topics that currently exist in the namespace. For example, Namespace.servicebus.windows.net contains only 5 topics, yet the admin client sends metadata requests with a large list of topics. Here's the debug log showing topics that were deleted but that the admin client was still requesting:

```text
%7|1715967551.780|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Request metadata for 40 topic(s): refresh unavailable topics
%7|1715967551.902|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: ===== Received metadata (for 31 requested topics): refresh unavailable topics =====
%7|1715967551.902|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: ClusterId: namespace.servicebus.windows.net, ControllerId: 0
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: 1 brokers, 31 topics
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Broker #0/1: namespace.servicebus.windows.net:9093 NodeId 0
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-2 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-2 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-4 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-4 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-3 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-3 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-1 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-1 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-6 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-6 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-0 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-0 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-5 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-5 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-8 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-8 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-9 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-9 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-7 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-7 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Topic dyna-topic-15 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-15 (PartCnt 0): Broker: Unknown topic or partition
```
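To quantify this growth from a captured debug log, a small parser can extract the requested-topic counts and the names the broker reports as unknown, then compare them with what `list_topics` returns. This is a minimal sketch that assumes the exact librdkafka line formats seen in the log above (`Request metadata for N topic(s)` and `Error in metadata reply for topic ... Unknown topic or partition`); the helpers `summarize_metadata_log` and `stale_topics` are hypothetical, and other librdkafka versions may word these lines differently.

```python
import re

# Assumed line formats, copied from the librdkafka debug output in this issue:
#   "... Request metadata for 40 topic(s): refresh unavailable topics"
#   "... Error in metadata reply for topic dyna-topic-2 (PartCnt 0): Broker: Unknown topic or partition"
REQUEST_RE = re.compile(r"Request metadata for (\d+) topic\(s\)")
UNKNOWN_RE = re.compile(
    r"Error in metadata reply for topic (\S+) \(PartCnt \d+\): "
    r"Broker: Unknown topic or partition"
)


def summarize_metadata_log(lines):
    """Return (requested topic counts in log order, set of topics reported unknown)."""
    requested_counts = []
    unknown_topics = set()
    for line in lines:
        request = REQUEST_RE.search(line)
        if request:
            requested_counts.append(int(request.group(1)))
        unknown = UNKNOWN_RE.search(line)
        if unknown:
            unknown_topics.add(unknown.group(1))
    return requested_counts, unknown_topics


def stale_topics(lines, live_topics):
    """Sorted names the client still requests that are absent from list_topics() output."""
    _, unknown = summarize_metadata_log(lines)
    return sorted(unknown - set(live_topics))


if __name__ == "__main__":
    sample = [
        "%7|1715967551.780|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: "
        "sasl_ssl://namespace.servicebus.windows.net:9093/0: "
        "Request metadata for 40 topic(s): refresh unavailable topics",
        "%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: "
        "Error in metadata reply for topic dyna-topic-2 (PartCnt 0): "
        "Broker: Unknown topic or partition",
    ]
    print(summarize_metadata_log(sample))  # ([40], {'dyna-topic-2'})
    print(stale_topics(sample, live_topics=[]))  # ['dyna-topic-2']
```

Run periodically against the log and the set returned by `listTopic()`, a rising first value and a growing stale list would match the behaviour described here.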
Hey @pranavrth, hopefully this is enough to reproduce the issue.
Sure. I will check it this week.
Description
I was testing the admin client by repeatedly deleting topics and recreating them under new names over a period of three days, and I see that the admin client caches every topic name, which eventually causes timeouts. I have reached a point where only 5 topics exist on the broker, but the client is looking up 8k topics.
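One possible mitigation, not confirmed anywhere in this thread, is to replace the admin client periodically so that whatever topic list it has accumulated is discarded along with it. The sketch below assumes the stale names are held only inside a single client instance; `RecyclingAdmin` and `max_cycles` are hypothetical names, and in the script above the factory would be `create_admin_client`.

```python
from typing import Any, Callable


class RecyclingAdmin:
    """Hypothetical helper that swaps in a fresh admin client every `max_cycles` uses.

    Assumption: stale topic names live inside one client instance, so a new
    instance starts without them. The old instance is simply dropped here.
    """

    def __init__(self, factory: Callable[[], Any], max_cycles: int = 100) -> None:
        if max_cycles < 1:
            raise ValueError("max_cycles must be at least 1")
        self._factory = factory
        self._max_cycles = max_cycles
        self._uses = 0
        self._client = factory()

    def get(self) -> Any:
        """Return the current client, rebuilding it after `max_cycles` uses."""
        if self._uses >= self._max_cycles:
            self._client = self._factory()  # drop the old instance, start fresh
            self._uses = 0
        self._uses += 1
        return self._client


if __name__ == "__main__":
    # Stand-in factory so the sketch runs without a broker.
    created = []

    def fake_factory():
        created.append(object())
        return created[-1]

    holder = RecyclingAdmin(fake_factory, max_cycles=2)
    first, second, third = holder.get(), holder.get(), holder.get()
    print(first is second, third is first, len(created))  # True False 2
```

Because the helpers in the script read the module-level `admin`, using this would mean fetching the client once per `topic_manipulation()` cycle via `holder.get()` and passing it to those helpers instead.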
How to reproduce
Use a regular admin client (version 1.9.2 or 2.3): list the current topics, delete them, then create new topics, and repeat this over and over for about a week; you will then see the deleted topics still being requested in the metadata calls.
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`):
- Client configuration:

```python
conf = {
    'bootstrap.servers': self.stressconfigs.get_appr_broker(self.mode),
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': self.stressconfigs.get_appr_Connectiontring(self.mode),
    'client.id': 'python-admin-client' + self.clientId,
    'allow.auto.create.topics': 'false',
    'connections.max.idle.ms': 180000,
    'socket.keepalive.enable': True,
    'request.timeout.ms': 60000,
    'socket.timeout.ms': 60000,
    'message.timeout.ms': 60000,
}
```

- Provide client logs (with `'debug': '..'` as necessary)