pika throws ChannelWrongStateError('Channel is closed.') on publish #1240

Open
SantjagoCorkez opened this issue Aug 28, 2019 · 16 comments

@SantjagoCorkez

SantjagoCorkez commented Aug 28, 2019

pika version: 1.1.0

After a network interface link flapped, the socket underneath pika's connection to RabbitMQ went dead. The client code did not handle this state transition, so every subsequent publish() on pika's Channel raised:

  File "/opt/venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2210, in basic_publish
    mandatory=mandatory)
  File "/opt/venv/lib/python3.6/site-packages/pika/channel.py", line 421, in basic_publish
    self._raise_if_not_open()
  File "/opt/venv/lib/python3.6/site-packages/pika/channel.py", line 1389, in _raise_if_not_open
    raise exceptions.ChannelWrongStateError('Channel is closed.')
pika.exceptions.ChannelWrongStateError: Channel is closed.

As a result, code like this:

try:
    self.channel.basic_publish(*a, **k)
except (exceptions.ConnectionClosed, exceptions.ChannelClosed) as error:
    log.debug('Try to reconnect in 5 seconds')
    time.sleep(5)
    self.reconnect()

would fail.

This behaviour was implemented at 761ef5c

But I strongly disagree with this approach, because it breaks Python's typed exception support. Typed exceptions exist so that calling code can quickly match the kind of error the runtime raised without inspecting its details. The commit mentioned above breaks this and pushes users back to string pattern matching. Instead of raising the same exception in both of these cases:

  • channel is in opening state but is not ready to operate
  • channel is [already] closed

one should provide a distinct exception type for each case if the error recovery is supposed to differ, IMO.

For these two cases the recovery approaches are, to me, obviously different. For a channel in the opening state, wait a little and try again (don't reconnect, since there's no reason to); for a channel in the closed state, recover by reconnecting. Right now the only way for calling code to decide which approach to use is to parse Exception.args.
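
To illustrate the suggestion, here is a minimal sketch of what per-state exception types could look like. The names `ChannelOpeningError` and `ChannelAlreadyClosedError` are hypothetical and are not part of pika, and `ChannelWrongStateError` below is a local stand-in for the real class. The only point is that `except` clauses could then pick the recovery path without inspecting `Exception.args`.

```python
class ChannelWrongStateError(Exception):
    """Local stand-in for pika.exceptions.ChannelWrongStateError."""


class ChannelOpeningError(ChannelWrongStateError):
    """Hypothetical: the channel is still opening and not ready yet."""


class ChannelAlreadyClosedError(ChannelWrongStateError):
    """Hypothetical: the channel is closed and must be recreated."""


def choose_recovery(error):
    """Map an exception to a recovery action by type, not by message text."""
    try:
        raise error
    except ChannelOpeningError:
        return "wait_and_retry"
    except ChannelAlreadyClosedError:
        return "reconnect"
    except ChannelWrongStateError:
        # Generic wrong-state error: the caller cannot tell which case it is.
        return "unknown_state"
```

Because the hypothetical types subclass the existing one, handlers that already catch `ChannelWrongStateError` would keep working.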

@lukebakken lukebakken self-assigned this Aug 30, 2019
@lukebakken lukebakken added this to the 1.1.1 milestone Aug 30, 2019
@lukebakken lukebakken modified the milestones: 1.1.1, 1.2.0 Feb 24, 2020
@mato2000

Hi, I had the same error, but in a slightly different situation. First, a basic_publish raised a StreamLostError that I didn't handle. Then another basic_publish on the same channel raised a ChannelWrongStateError with the same message.

I think it's confusing, since the documentation says it will raise only UnroutableError and NackError.
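
A minimal sketch of a publish wrapper that covers this sequence (one error kills the connection, the next call fails with a different error). The function name and its parameters are hypothetical. It takes the publish and reconnect steps as callables so that it does not depend on any particular pika API.

```python
import time


def publish_with_reconnect(publish, reconnect, recoverable, retries=3, delay=0.0):
    """Call publish(); on a recoverable error, reconnect and try again.

    publish     -- zero-argument callable that performs the basic_publish
    reconnect   -- zero-argument callable that rebuilds connection and channel
    recoverable -- exception class (or tuple of classes) worth retrying on
    """
    for attempt in range(retries + 1):
        try:
            return publish()
        except recoverable:
            if attempt == retries:
                raise  # out of attempts: let the caller see the last error
            time.sleep(delay)
            reconnect()
```

With pika 1.x, `recoverable` could plausibly be `(pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError)`, which to my understanding are the base classes of `StreamLostError`, `ConnectionClosed`, `ChannelClosed` and `ChannelWrongStateError`. Please verify this against the installed version.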

@lukebakken lukebakken removed this from the 1.2.0 milestone Jan 27, 2021
@lukebakken lukebakken added this to the 2.0.0 milestone May 21, 2022
@lukebakken
Member

I'll be sure that the documentation is updated as well.

@lukebakken lukebakken removed this from the 2.0.0 milestone May 23, 2022
@lukebakken
Member

I'm not exactly sure what scenario could cause ChannelWrongStateError when the state is OPENING because a user of this library should not attempt other channel operations until the on_open_callback is called.

I'll address this issue in this manner:

  • Add an is_opening method so that every state has a corresponding check.
  • Ensure the documentation and examples demonstrate handling the ChannelWrongStateError exception.
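
As a rough sketch of how such per-state checks could be used from calling code: `is_open` and `is_closed` are existing channel properties, while `is_opening` is the addition proposed above, so the helper reads it defensively. The function name `channel_state` is hypothetical.

```python
def channel_state(channel):
    """Return a coarse state name for a channel-like object."""
    if channel.is_open:
        return "open"
    # is_opening is the check proposed above; treat it as False if absent.
    if getattr(channel, "is_opening", False):
        return "opening"
    if channel.is_closed:
        return "closed"
    # Neither open, opening, nor closed (for example, still closing).
    return "transitioning"
```

A caller could then wait and retry on `"opening"` and rebuild the connection on `"closed"`, instead of parsing the exception message.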

In another issue I will see how feasible it would be to have a Recoverable / UnRecoverable sort of exception in the exception hierarchy for connections and channels.

lukebakken added a commit that referenced this issue May 23, 2022
Bump versions for win32 testing

Fixes #1240
@pika pika deleted a comment from SantjagoCorkez May 25, 2022
@pika pika locked as resolved and limited conversation to collaborators May 25, 2022
@pika pika deleted a comment from dgslv May 25, 2022
@pika pika deleted a comment from SantjagoCorkez May 25, 2022
@pika pika deleted a comment from 123mw123 May 25, 2022
@pika pika unlocked this conversation May 25, 2022
@pika pika deleted a comment from azharukan May 25, 2022
@lukebakken
Member

I've deleted my own and others' off-topic comments here. @SantjagoCorkez's point that the changes in 761ef5c may make it more difficult to distinguish between recoverable and non-recoverable errors is valid, and that's what I'm going to address, as well as the documentation around the issue.

@Diaislam

I have the same issue

@lukebakken lukebakken added this to the 1.3.0 milestone Jun 28, 2022
lukebakken added a commit that referenced this issue Jun 28, 2022
lukebakken added a commit that referenced this issue Jun 28, 2022
lukebakken added a commit that referenced this issue Jun 28, 2022
@lukebakken lukebakken reopened this Jun 28, 2022
@lukebakken lukebakken modified the milestones: 1.3.0, 2.0.0 Jun 29, 2022
@lukebakken
Member

Re-doing the exception hierarchy will have to wait until version 2.0

@Diaislam

If you use the basic publisher/consumer code, the channel will eventually get closed while waiting for messages, whatever parameters and heartbeat you choose. Even if you reconnect each time you publish an event, it's still the same.
Use the asynchronous code instead. I had the consumer waiting for messages for days, and when I started the consumer it immediately processed the messages. I've been testing for weeks now with no issues so far.

@lukebakken
Member

@Diaislam without code that I can run and instructions on how to reproduce what you describe, there's nothing I can do. If you'd like to investigate, open a new issue and help me help you.

@activaigor

activaigor commented Aug 12, 2023

Still happening to me, even on pika==1.3.0.
The producer is pretty straightforward:

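import logging
import time
from typing import Type

import pika
from pika.adapters.blocking_connection import BlockingChannel
from pika.exceptions import ChannelClosed, ChannelWrongStateError, ConnectionClosed

# BaseRabbitModel and RabbitMQClient are project-specific classes that are not shown here.


class RabbitMQProducer: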

    def __init__(self, model_class: Type[BaseRabbitModel]):
        self._model_class = model_class
        self._client = self._make_connection()

    def send(self, model: BaseRabbitModel):
        assert isinstance(
            model, self._model_class
        ), "You should be consistent in using RabbitMQProducer for the model class you've specified"
        try:
            self._publish(model)
        except (ConnectionClosed, ChannelClosed, ChannelWrongStateError) as error:
            logging.exception(error)
            self._fix_connection()
            self._publish(model)

    def _publish(self, model: BaseRabbitModel):
        self._client.basic_publish(
            exchange=self._model_class.Meta.exchange,
            routing_key=self._model_class.Meta.routing_key,
            body=model.to_json_bytes(),
            properties=pika.BasicProperties(content_type='application/json')
        )

    def _make_connection(self) -> BlockingChannel:
        return RabbitMQClient.make_for_producer(self._model_class)

    def _fix_connection(self):
        logging.warning("Fixing rabbitmq connection...")
        if self._client.is_closed:
            logging.warning("The connection is closed, restart it")
            self._client = self._make_connection()
        else:
            logging.warning("Most likely connection is currently on the opening state, let's wait for a while...")
            time.sleep(0.5)

@MahmudulHassan5809

I am also getting this error.

import pika, json
from django.conf import settings


class RabbitMQ:
    def __init__(self, amqp_url):
        self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue="main")

    def publish(self, method, body):
        properties = pika.BasicProperties(method)
        self.channel.basic_publish(
            exchange="",
            routing_key="main",
            body=json.dumps(body),
            properties=properties,
        )

    def close_connection(self):
        self.connection.close()


# Usage
amqp_url = settings.AMQP_SERVER_URL
rabbitmq = RabbitMQ(amqp_url)

Any reason why?

@lukebakken
Member

@activaigor @MahmudulHassan5809 neither of you has provided enough information for me to assist you:

  • What does RabbitMQ log at the same time?
  • Was there a network issue just prior to the exception in your python code?

@MahmudulHassan5809

@lukebakken hello, these are the logs I got from RabbitMQ. Is this enough, or am I missing something?

2023-10-23 16:12:10.750047+00:00 [info] <0.20042.3> connection <0.20042.3> ([::1]:42330 -> [::1]:5672): user 'ghouse' authenticated and granted access to vhost '/'
2023-10-23 16:12:10.751565+00:00 [info] <0.20045.3> connection <0.20045.3> ([::1]:42334 -> [::1]:5672): user 'ghouse' authenticated and granted access to vhost '/'
2023-10-23 16:12:10.816502+00:00 [info] <0.20068.3> accepting AMQP connection <0.20068.3> ([::1]:42348 -> [::1]:5672)
2023-10-23 16:12:10.818977+00:00 [info] <0.20068.3> connection <0.20068.3> ([::1]:42348 -> [::1]:5672): user 'ghouse' authenticated and granted access to vhost '/'
2023-10-23 16:15:10.752762+00:00 [erro] <0.20042.3> closing AMQP connection <0.20042.3> ([::1]:42330 -> [::1]:5672):
2023-10-23 16:15:10.752762+00:00 [erro] <0.20042.3> missed heartbeats from client, timeout: 60s
2023-10-23 16:15:10.753537+00:00 [erro] <0.20045.3> closing AMQP connection <0.20045.3> ([::1]:42334 -> [::1]:5672):
2023-10-23 16:15:10.753537+00:00 [erro] <0.20045.3> missed heartbeats from client, timeout: 60s
2023-10-23 16:15:10.821715+00:00 [erro] <0.20068.3> closing AMQP connection <0.20068.3> ([::1]:42348 -> [::1]:5672):
2023-10-23 16:15:10.821715+00:00 [erro] <0.20068.3> missed heartbeats from client, timeout: 60s

There was no network issue. After a few tries RabbitMQ works fine, and then it again gives me "channel closed" or pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
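
The `missed heartbeats from client` lines suggest that the client stopped servicing the socket for longer than the heartbeat timeout. One common cause with `BlockingConnection` is an idle period during which no pika call runs, for example a plain `time.sleep`. Below is a minimal sketch, assuming a single thread owns the connection, of waiting while still letting the connection process I/O. The helper name `idle_servicing_io` is hypothetical; `process_data_events(time_limit=...)` is the `BlockingConnection` method it relies on.

```python
import time


def idle_servicing_io(connection, total_seconds, slice_seconds=1.0):
    """Wait about total_seconds while the connection keeps servicing I/O.

    connection -- object with process_data_events(time_limit=...), such as
                  a pika BlockingConnection owned by the calling thread
    """
    deadline = time.monotonic() + total_seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Each call lets the connection read and write frames, including heartbeats.
        connection.process_data_events(time_limit=min(slice_seconds, remaining))
```

This only helps when the publishing thread is the one that waits. It does not make the connection safe to share between threads. As far as I know, `BlockingConnection.sleep(duration)` is a built-in alternative for the simple case.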

@pika pika deleted a comment from MahmudulHassan5809 Oct 23, 2023
@lukebakken
Member

@MahmudulHassan5809 - follow-up #1449

@vasu-dholakiya

Is there any update here? In the connection pool with a heartbeat, connections are getting dropped as well.

@Diaislam
Copy link

Diaislam commented Oct 30, 2023

Is there any update here? In the connection pool with a heartbeat, connections are getting dropped as well.

Try this code:

def connect(self):
    credentials = pika.PlainCredentials(self.username, self.password)
    parameters = pika.ConnectionParameters(host = self.server,
                                           credentials = credentials,
                                           heartbeat=heartbeat, 
                                           blocked_connection_timeout=bct)
    self.connection =  pika.BlockingConnection(parameters)
 
    return pika.SelectConnection(
        parameters=parameters,
        on_open_callback=self.on_connection_open,
        on_open_error_callback=self.on_connection_open_error,
        on_close_callback=self.on_connection_closed)

# declare the exchange and queue
def set_channel(self):

    self.channel = self.connection.channel()
    self.channel.confirm_delivery()
    self.channel.exchange_declare(exchange=self.exchange,
                                  exchange_type='direct',
                                  durable=True)

    self.channel.queue_declare(queue="queue")
    self.channel.queue_bind(queue="queue", exchange="exchange", routing_key='standard_key')
    self.channel.basic_qos(prefetch_count=1)

# send a message to the queue
def send_message(self,message):
    
    if not self.connection or self.connection.is_closed:
        self.connect()
        self.set_channel()
        
    self.channel.basic_publish(exchange = "exchange",
                               routing_key = 'standard_key',
                               mandatory = True,
                               body = json.dumps(message),
                               properties = pika.BasicProperties(delivery_mode = 2)) # delivery_mode = 2 marks the message as persistent

def on_connection_open(self, _unused_connection):
    logging.info('Connection opened')
    self.open_channel()

def on_connection_open_error(self, _unused_connection, err):

    logging.error('Connection open failed: %s', err)
    self.reconnect()

def on_connection_closed(self, _unused_connection, reason):

    self.channel = None
    if self._closing:
        self.connection.ioloop.stop()
    else:
        logging.warning('Connection closed, reconnect necessary: %s', reason)
        self.reconnect()

def reconnect(self):
    """Will be invoked if the connection can't be opened or is
    closed. Indicates that a reconnect is necessary then stops the
    ioloop.
    """
    self.should_reconnect = True
    self.stop()

Initialize the instance variables used above in your class.
