channel.basic_ack for TwistedChannel does not return Deferred #1341

Open
AntonGsv opened this issue Feb 3, 2022 · 1 comment

AntonGsv commented Feb 3, 2022

I want to call reactor.stop() immediately after acknowledging a message with channel.basic_ack(method.delivery_tag). However, basic_ack sends the acknowledgement asynchronously and does not return a Deferred, so there is nothing to wait on before stopping the reactor. The only workaround I have found so far is to add a delay before calling reactor.stop().
A similar issue has been reported on Stack Overflow.
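
One possible alternative to a fixed delay, sketched here under assumptions and not confirmed by the maintainers: close the connection gracefully and stop the reactor only once the close has completed. The helper name stop_reactor_after_close is hypothetical. It assumes that connection.close() on TwistedProtocolConnection returns a Deferred that fires when the connection has finished closing (or None if the connection never became ready), and that frames written earlier, such as the Basic.Ack, go out before the close handshake.

```python
def stop_reactor_after_close(connection, reactor):
    """Close the AMQP connection, then stop the reactor.

    Hypothetical helper. Assumes connection.close() returns a Deferred
    that fires once the connection is closed, or None when there is no
    open connection to wait for.
    """
    closed = connection.close()

    if closed is None:
        # Nothing to wait for, so stop right away.
        reactor.stop()
        return None

    def _stop(result):
        # Runs on success and on failure, so the process never hangs here.
        reactor.stop()
        return result

    closed.addBoth(_stop)
    return closed
```

In the consumer below, this would be called as stop_reactor_after_close(self.connection, reactor) in place of the bare reactor.stop() that follows basic_ack.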

For reproduction:

  • run the code from the example
  • add any message to the QUEUE_NAME queue

Expected behavior - the message is removed from the QUEUE_NAME queue because of the call to channel.basic_ack(method.delivery_tag)
Received behavior - the message is still in the queue

import logging

import pika
from pika.adapters import twisted_connection
from pika.adapters.twisted_connection import ClosableDeferredQueue, TwistedProtocolConnection
from pika.adapters.twisted_connection import TwistedChannel
from twisted.internet import defer, task
from twisted.internet import protocol, reactor
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
logging.getLogger("pika").setLevel(logging.WARNING)


class CONFIG:
    RABBITMQ_HOST: str = 'localhost'
    RABBITMQ_PORT: int = 5672
    RABBITMQ_VIRTUAL_HOST: str = '/'
    RABBITMQ_USERNAME: str = 'guest'
    RABBITMQ_PASSWORD: str = 'guest'
    QUEUE_NAME: str = 'INPUT_TASK'
    PREFETCH_COUNT: int = 1


class TwistedConsumer:
    channel: TwistedChannel
    queue_object: ClosableDeferredQueue
    consumer_tag: str
    connection: TwistedProtocolConnection

    def start_consuming(self) -> Deferred:
        return self._connect()

    def _connect(self) -> Deferred:
        cc = protocol.ClientCreator(
            reactor,
            twisted_connection.TwistedProtocolConnection,
            pika.ConnectionParameters(
                host=CONFIG.RABBITMQ_HOST,
                port=CONFIG.RABBITMQ_PORT,
                virtual_host=CONFIG.RABBITMQ_VIRTUAL_HOST,
                credentials=pika.credentials.PlainCredentials(
                    username=CONFIG.RABBITMQ_USERNAME,
                    password=CONFIG.RABBITMQ_PASSWORD
                ),
            )
        )

        deferred_connection: Deferred = cc.connectTCP(CONFIG.RABBITMQ_HOST, CONFIG.RABBITMQ_PORT)
        deferred_connection.addCallback(self.on_connected)
        deferred_connection.addCallback(self.run)
        deferred_connection.addErrback(self.twisted_errback)
        return deferred_connection

    def on_connected(self, _protocol: TwistedProtocolConnection) -> Deferred:
        return _protocol.ready

    def twisted_errback(self, failure: Failure):
        logging.info(failure)

    @defer.inlineCallbacks
    def run(self, connection: TwistedProtocolConnection) -> Deferred:
        self.connection = connection
        self.channel = yield connection.channel()
        yield self.channel.queue_declare(queue=CONFIG.QUEUE_NAME, durable=True)

        yield self.channel.basic_qos(prefetch_count=CONFIG.PREFETCH_COUNT)
        self.queue_object, self.consumer_tag = yield self.channel.basic_consume(queue=CONFIG.QUEUE_NAME,
                                                                                auto_ack=False)

        for _ in range(CONFIG.PREFETCH_COUNT):
            task.LoopingCall(self.on_message_consumed).start(interval=0, now=False)

    @defer.inlineCallbacks
    def on_message_consumed(self) -> None:
        logging.info('on_message_consumed start')
        deferred: Deferred = self.queue_object.get()
        channel, method, properties, body = yield deferred
        delivery_tag: int = method.delivery_tag
        logging.info(f'received message {delivery_tag}')
        yield self.sleep(2)  # process message
        result: None = self.channel.basic_ack(method.delivery_tag)
        yield result  # it doesn't work
        logging.info(f'on_message_consumed end {delivery_tag}')
        reactor.stop()

    def sleep(self, second: int) -> Deferred:
        d = Deferred()
        reactor.callLater(second, d.callback, None)
        return d


if __name__ == '__main__':
    consumer = TwistedConsumer()
    reactor.callFromThread(consumer.start_consuming)
    reactor.run()

Dependency versions:

python = "^3.8"
pika = "^1.1.0"
Twisted = {version = ">=17.9.0", extras = ["http2"]}

lukebakken self-assigned this May 24, 2022
lukebakken added this to the 1.3.0 milestone May 24, 2022

lukebakken (Member) commented

Thank you, I will investigate this.

lukebakken removed this from the 1.3.0 milestone Jun 29, 2022