You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I want to call reactor.stop() immediately after confirming with a call to channel.basic_ack(method.delivery_tag), but it executes asynchronously and does not return a Deferred. For now, the only way (that I could find) to do this is to add a wait before calling reactor.stop(). Similar issue on stackoverflow.
For reproduction:
run the code from the example
add any message to the QUEUE_NAME queue
Expected behavior - message is removed from the QUEUEUE_NAME queue because of a call to channel.basic_ack(method.delivery_tag)
Received behavior - the message is still in the queue
import logging
import pika
from pika.adapters import twisted_connection
from pika.adapters.twisted_connection import ClosableDeferredQueue, TwistedProtocolConnection
from pika.adapters.twisted_connection import TwistedChannel
from twisted.internet import defer, task
from twisted.internet import protocol, reactor
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
logging.getLogger("pika").setLevel(logging.WARNING)
class CONFIG:
RABBITMQ_HOST: str = 'localhost'
RABBITMQ_PORT: int = 5672
RABBITMQ_VIRTUAL_HOST: str = '/'
RABBITMQ_USERNAME: str = 'guest'
RABBITMQ_PASSWORD: str = 'guest'
QUEUE_NAME: str = 'INPUT_TASK'
PREFETCH_COUNT: int = 1
class TwistedConsumer:
channel: TwistedChannel
queue_object: ClosableDeferredQueue
consumer_tag: str
connection: TwistedProtocolConnection
def start_consuming(self) -> Deferred:
return self._connect()
def _connect(self) -> Deferred:
cc = protocol.ClientCreator(
reactor,
twisted_connection.TwistedProtocolConnection,
pika.ConnectionParameters(
host=CONFIG.RABBITMQ_HOST,
port=CONFIG.RABBITMQ_PORT,
virtual_host=CONFIG.RABBITMQ_VIRTUAL_HOST,
credentials=pika.credentials.PlainCredentials(
username=CONFIG.RABBITMQ_USERNAME,
password=CONFIG.RABBITMQ_PASSWORD
),
)
)
deferred_connection: Deferred = cc.connectTCP(CONFIG.RABBITMQ_HOST, CONFIG.RABBITMQ_PORT)
deferred_connection.addCallback(self.on_connected)
deferred_connection.addCallback(self.run)
deferred_connection.addErrback(self.twisted_errback)
return deferred_connection
def on_connected(self, _protocol: TwistedProtocolConnection) -> Deferred:
return _protocol.ready
def twisted_errback(self, failure: Failure):
logging.info(failure)
@defer.inlineCallbacks
def run(self, connection: TwistedProtocolConnection) -> Deferred:
self.connection = connection
self.channel = yield connection.channel()
yield self.channel.queue_declare(queue=CONFIG.QUEUE_NAME, durable=True)
yield self.channel.basic_qos(prefetch_count=CONFIG.PREFETCH_COUNT)
self.queue_object, self.consumer_tag = yield self.channel.basic_consume(queue=CONFIG.QUEUE_NAME,
auto_ack=False)
for _ in range(CONFIG.PREFETCH_COUNT):
task.LoopingCall(self.on_message_consumed).start(interval=0, now=False)
@defer.inlineCallbacks
def on_message_consumed(self) -> None:
logging.info('on_message_consumed start')
deferred: Deferred = self.queue_object.get()
channel, method, properties, body = yield deferred
delivery_tag: int = method.delivery_tag
logging.info(f'received message {delivery_tag}')
yield self.sleep(2) # process message
result: None = self.channel.basic_ack(method.delivery_tag)
yield result # it doesn't work
logging.info(f'on_message_consumed end {delivery_tag}')
reactor.stop()
def sleep(self, second: int) -> Deferred:
d = Deferred()
reactor.callLater(second, d.callback, None)
return d
if __name__ == '__main__':
consumer = TwistedConsumer()
reactor.callFromThread(consumer.start_consuming)
reactor.run()
I want to call
reactor.stop()
immediately after confirming with a call tochannel.basic_ack(method.delivery_tag)
, but it executes asynchronously and does not return aDeferred
. For now, the only way (that I could find) to do this is to add a wait before callingreactor.stop()
.Similar issue on stackoverflow.
For reproduction:
Expected behavior - message is removed from the QUEUEUE_NAME queue because of a call to
channel.basic_ack(method.delivery_tag)
Received behavior - the message is still in the queue
The text was updated successfully, but these errors were encountered: