AdminClient caching all topics in metadata request calls #1737

Open
6 of 7 tasks
Jgomez13 opened this issue Apr 26, 2024 · 8 comments
Labels
investigate further It's unclear what the issue is at this time but there is enough interest to look into it

Comments

@Jgomez13

Jgomez13 commented Apr 26, 2024

Description

I was testing the Admin client by repeatedly deleting topics and recreating them with new names over a period of three days. I see that the admin client caches every topic name, which eventually causes timeouts. I have reached a point where only 5 topics exist on the broker, but the client is looking for 8k topics.

How to reproduce

Use a normal admin client (version 1.9.2 or 2.3). List the current topics, delete them, then create new topics. Repeat this for about a week and you will see the deleted topics accumulate in the metadata requests.

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [1.9.2]
  • Apache Kafka broker version: Microsoft Event Hubs
  • Client configuration: {...}
    conf = {
        'bootstrap.servers': self.stressconfigs.get_appr_broker(self.mode),
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': self.stressconfigs.get_appr_Connectiontring(self.mode),
        'client.id': 'python-admin-client' + self.clientId,
        'allow.auto.create.topics': 'false',
        'connections.max.idle.ms': 180000,
        'socket.keepalive.enable': True,
        'request.timeout.ms': 60000,
        'socket.timeout.ms': 60000,
        'message.timeout.ms': 60000,
    }
  • Operating system: Linux (Azure Kubernetes)
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue Yes
@Jgomez13
Author

@pranavrth pranavrth added the investigate further It's unclear what the issue is at this time but there is enough interest to look into it label Apr 29, 2024
@Jgomez13
Author

@Jgomez13
Author

@pranavrth, any ideas here? Or is any other context needed?

@pranavrth
Member

I didn't understand the flow of your code:

  • Are you just creating and deleting the topic in the Admin client?
  • Where are you seeing these topics?
  • Can you provide sample code to reproduce the issue?

@Jgomez13
Author

Jgomez13 commented May 17, 2024

The main worker here is the topic_manipulation() function:

  1. list topics
  2. describe topics
  3. delete all topics
  4. create 10 topics
  5. repeat

We only see the no-longer-existent topics in the metadata requests. They never show up in our printed output of the topics returned by list_topics, create_topics, or delete_topics.
For the repro, set these environment variables or replace them as you see fit:
brokerList = os.getenv('EVENTHUBS_BROKER')
connectionString = os.getenv('EVENTHUBS_CONNECTION_STRING')

from confluent_kafka.admin import AdminClient, NewTopic, KafkaError
import confluent_kafka
from random import randint
from time import sleep
from uuid import uuid4
from datetime import datetime
from pprint import pprint
import os 

global_timeout = 60
numOfTopicsFromLastCreateAttempt = 0
topicID = 0

def printwithSeparator(message):
    print( '-'*20)
    print(message)
    print( '-'*20)

def create_admin_client():
    conf = {
        'bootstrap.servers': os.getenv('EVENTHUBS_BROKER'), 
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password':  os.getenv('EVENTHUBS_CONNECTION_STRING'),
        'client.id': f'python-admin-{randint(0, 10000)}-{confluent_kafka.__version__}',
        'allow.auto.create.topics': 'false',
        'connections.max.idle.ms': 180000,
        'socket.keepalive.enable' : True,
        'request.timeout.ms': 60000,
        'socket.timeout.ms': 60000,
        'message.timeout.ms':  60000,
        'debug': 'admin,metadata',
    }
    admin = AdminClient(conf)
    printwithSeparator(f'Admin { conf["client.id"] } started')
    return admin
    
admin = create_admin_client()
        
def describeTopics(inTopics):
    described = []
    for topic in inTopics:
        try:
            md = admin.list_topics(topic ,timeout=global_timeout)
            currTopic =  list(md.topics.values())[0]
            described.append(str(currTopic))
        except Exception as e:
            # printwithSeparator takes a single message, so the exception is formatted into it
            printwithSeparator("Error describing topic {}: {}".format(topic, e))
    printwithSeparator(f'Described {len(described)} Topics: [{", ".join(described)}]')
   
def listTopic(in_timeout=global_timeout):
    try:
        md = admin.list_topics(timeout=in_timeout)
        topics = set()
        for t in iter(md.topics.values()):
            if t.error is not None:
                printwithSeparator("Error getting topic {}: {}".format(t, t.error))
            topics.add(f'{t}')
        printwithSeparator(f'populated {len(topics)} topic(s) [{", ".join(topics)}]')
    except Exception as e:
        printwithSeparator("Error listing topics: {}".format(e))
        return set()
    return topics

def delete_topics(topics, in_timeout=global_timeout):
    if (len(topics) == 0):
        return
    deleted = []  # topics whose deletion future completed successfully
    try:
        fs = admin.delete_topics(topics, request_timeout=in_timeout)
        for topic, f in fs.items():
            try:
                f.result()
                deleted.append(topic)
            except Exception as e:
                printwithSeparator("Failed to delete topic {}: {}".format(topic, e))
    except Exception as e:
        printwithSeparator("Error deleting topics {}: {}".format(topics, e))
    printwithSeparator(f'Deleted {len(deleted)} Topics: [{", ".join(deleted)}]')

def createTenTopics():
    newtopics = []
    global topicID
    topics =[]
    for i in range(10):
        newtopics.append(NewTopic(f'dyna-topic-{topicID}', num_partitions= randint(1, 32), replication_factor=1))
        topicID += 1
    try:
        fs = admin.create_topics(newtopics, request_timeout=global_timeout)
        for topic, f in fs.items():
            try:
                f.result()
                topics.append(topic)
            except Exception as e:
                printwithSeparator("Failed to create topic {}: {}".format(topic, e))
        printwithSeparator(f'Tried to create {len(newtopics)} Created {len(topics)} Topics: [{", ".join(topics)}]')
    except Exception as e:
        printwithSeparator("Error creating random topics: {}".format(e))
    return len(topics)

def topic_manipulation():
    global numOfTopicsFromLastCreateAttempt
    topics = listTopic()
    printwithSeparator(f'last created topics:{numOfTopicsFromLastCreateAttempt} populated {len(topics)}')
    if topics:
        topics = list(topics)
        describeTopics(topics)
        delete_topics(topics)
        sleep(10)
    numOfTopicsFromLastCreateAttempt = createTenTopics()
    sleep(10)

def main():
    try:
        while True:
            try:
                topic_manipulation()
            except Exception as e:
                printwithSeparator(f'Caught Exception: {e}')
    except KeyboardInterrupt as e:
        printwithSeparator("Caught KeyboardInterrupt")

main()
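In the repro, listTopic() stores str(t) for every entry, including entries that carry an error. When comparing against the topics named in metadata requests, it may help to collect only the names reported without an error. The following is a minimal sketch. It assumes admin is a confluent_kafka AdminClient, whose list_topics() returns a ClusterMetadata with a topics dict mapping name to TopicMetadata, where TopicMetadata.error is None on success. live_topic_names and FakeAdmin are hypothetical names used only for illustration.

```python
from types import SimpleNamespace


def live_topic_names(admin, timeout=60.0):
    """Return the names of topics that list_topics() reports without an error.

    admin: a confluent_kafka AdminClient, or any object with a compatible
    list_topics(timeout=...) method whose result has a .topics dict.
    """
    md = admin.list_topics(timeout=timeout)
    return {name for name, t in md.topics.items() if t.error is None}


class FakeAdmin:
    # Hypothetical stand-in for AdminClient so the helper runs without a broker.
    def list_topics(self, timeout=-1):
        return SimpleNamespace(topics={
            "dyna-topic-20": SimpleNamespace(error=None),
            "dyna-topic-2": SimpleNamespace(error="UNKNOWN_TOPIC_OR_PART"),
        })


print(live_topic_names(FakeAdmin()))  # {'dyna-topic-20'}
```

With a real client you would pass the AdminClient created by create_admin_client() instead of FakeAdmin.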

@Jgomez13
Author

Jgomez13 commented May 17, 2024

We see the topic list in the Metadata request (API key 3) grow steadily over time. However, list_topics() on the admin client only returns the topics that currently exist in the namespace. For example:

Namespace.servicebus.windows.net contains:
dyna-20
dyna-21
dyna-22
dyna-23
dyna-24

Even though there are only 5 topics in the namespace, the admin client still sends the Metadata request with a large list of topics:
Metadata Request => [topics] allow_auto_topic_creation
  topics => ['dyna-1','dyna-2','dyna-3','dyna-4','dyna-5','dyna-6',........'dyna-6000',null,null,null,....., 2000th null]
    name => STRING
  allow_auto_topic_creation => False
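A small helper can diff the namespace contents against the request to put numbers on the gap. The following is a minimal sketch. It assumes the request's topic array has already been extracted from a capture or broker-side log into a Python list, with None standing in for the null entries. stale_topics is a hypothetical helper, and the sample data is abbreviated from the example above.

```python
def stale_topics(live_topics, requested_topics):
    """Return requested topic names that are not among the live topics, sorted.

    live_topics: names that currently exist (for example, from list_topics()).
    requested_topics: names from a captured Metadata request; None stands for
    a null entry and is skipped.
    """
    live = set(live_topics)
    requested = {t for t in requested_topics if t is not None}
    return sorted(requested - live)


live = {"dyna-20", "dyna-21", "dyna-22", "dyna-23", "dyna-24"}
requested = ["dyna-1", "dyna-2", "dyna-20", None, None]
print(stale_topics(live, requested))  # ['dyna-1', 'dyna-2']
print(requested.count(None))          # 2
```

The sort is lexicographic, which is enough for spotting stale names. For a full capture, len(stale_topics(...)) would be the number of deleted topics the client is still asking about.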

Here's the debug log for some of the topics that were deleted but that the admin client was still requesting:

%7|1715967551.780|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: Request metadata for 40 topic(s): refresh unavailable topics
%7|1715967551.902|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: ===== Received metadata (for 31 requested topics): refresh unavailable topics =====
%7|1715967551.902|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: ClusterId: namespace.servicebus.windows.net, ControllerId: 0
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0: 1 brokers, 31 topics
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Broker #0/1: namespace.servicebus.windows.net:9093 NodeId 0
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-2 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-2 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-4 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-4 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-3 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-3 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-1 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-1 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-6 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-6 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.904|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-0 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-0 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-5 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-5 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-8 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-8 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-9 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-9 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-7 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-7 (PartCnt 0): Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: sasl_ssl://namespace.servicebus.windows.net:9093/0:   Topic dyna-topic-15 with 0 partitions: Broker: Unknown topic or partition
%7|1715967551.905|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: Error in metadata reply for topic dyna-topic-15 (PartCnt 0): Broker: Unknown topic or partition
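With 'debug': 'metadata' enabled, METADATA lines like the ones above can be tallied to show how the requested-topic count changes over a long run. The following is a minimal sketch. It assumes the debug output has been saved as a list of strings, for example read from a log file. The regular expressions are written against the exact lines in this log and may need adjusting for other librdkafka versions. parse_metadata_debug is a hypothetical helper, not a library API.

```python
import re

# Patterns written against the librdkafka 2.2.0 METADATA debug lines quoted
# above; other librdkafka versions may phrase these messages differently.
REQUEST_RE = re.compile(r"Request metadata for (\d+) topic\(s\)")
RECEIVED_RE = re.compile(r"Received metadata \(for (\d+) requested topics\)")
UNKNOWN_RE = re.compile(
    r"Error in metadata reply for topic (\S+) \(PartCnt \d+\): "
    r"Broker: Unknown topic or partition"
)


def parse_metadata_debug(lines):
    """Tally METADATA debug lines.

    Returns the topic count of each outgoing metadata request, the
    requested-topic count of each received reply, and the names of topics
    the broker reported as unknown.
    """
    result = {"requested": [], "received": [], "unknown": []}
    for line in lines:
        m = REQUEST_RE.search(line)
        if m:
            result["requested"].append(int(m.group(1)))
            continue
        m = RECEIVED_RE.search(line)
        if m:
            result["received"].append(int(m.group(1)))
            continue
        m = UNKNOWN_RE.search(line)
        if m:
            result["unknown"].append(m.group(1))
    return result


sample = [
    "%7|1715967551.780|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: "
    "sasl_ssl://namespace.servicebus.windows.net:9093/0: "
    "Request metadata for 40 topic(s): refresh unavailable topics",
    "%7|1715967551.902|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: "
    "sasl_ssl://namespace.servicebus.windows.net:9093/0: "
    "===== Received metadata (for 31 requested topics): refresh unavailable topics =====",
    "%7|1715967551.903|METADATA|python-admin-7974-2.2.0#producer-1| [thrd:main]: "
    "Error in metadata reply for topic dyna-topic-2 (PartCnt 0): "
    "Broker: Unknown topic or partition",
]
print(parse_metadata_debug(sample))
# {'requested': [40], 'received': [31], 'unknown': ['dyna-topic-2']}
```

Over a long capture, two signals would match the growth described in this issue:
- a steadily rising "requested" series;
- an "unknown" list that keeps accumulating names.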

@Jgomez13
Author

Hey @pranavrth, hopefully this is enough to repro the issue.

@pranavrth
Member

Sure. I will check it this week.


2 participants