amqp_channel_flow usage to indicate for publisher that queue is not ready #811

Open
arun-sathya opened this issue Jan 8, 2024 · 0 comments

Comments

@arun-sathya

I want to check whether my usage of amqp_channel_flow is correct.

Version Details

  • RabbitMQ 3.9.13 on Erlang 24.2.1
  • rabbitmq-c client v0.11.0

Scenario
The consumer consumes messages from a queue declared with the exclusive and auto-delete bits set. The queue has an upper limit set through x-max-length-bytes, and overflow is set to reject-publish. Internally, the consumer tracks how many bytes are currently being processed and uses amqp_channel_flow to pause and resume delivery of messages from the queue.
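The byte tracking described above could be sketched roughly as follows. This is only an illustration of the bookkeeping, assuming a high and a low watermark so that the consumer does not toggle on every message. The type flow_gate_t and its functions are hypothetical names and are not part of rabbitmq-c. How delivery is actually paused and resumed is left to the caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical bookkeeping for bytes that are still being processed.
 * Not part of rabbitmq-c; the watermarks are assumptions for this sketch. */
typedef struct {
    size_t in_flight_bytes; /* bytes received but not yet fully processed */
    size_t low_watermark;   /* resume once in-flight bytes drop to this */
    size_t high_watermark;  /* pause once in-flight bytes reach this */
    bool paused;
} flow_gate_t;

void flow_gate_init(flow_gate_t *g, size_t low, size_t high) {
    assert(low <= high);
    g->in_flight_bytes = 0;
    g->low_watermark = low;
    g->high_watermark = high;
    g->paused = false;
}

/* Account for a newly received message body.
 * Returns true only when this message moves the gate into the paused state. */
bool flow_gate_on_receive(flow_gate_t *g, size_t bytes) {
    g->in_flight_bytes += bytes;
    if (!g->paused && g->in_flight_bytes >= g->high_watermark) {
        g->paused = true;
        return true;
    }
    return false;
}

/* Account for a message whose processing has finished.
 * Returns true only when this moves the gate back into the running state. */
bool flow_gate_on_done(flow_gate_t *g, size_t bytes) {
    assert(bytes <= g->in_flight_bytes);
    g->in_flight_bytes -= bytes;
    if (g->paused && g->in_flight_bytes <= g->low_watermark) {
        g->paused = false;
        return true;
    }
    return false;
}
```

The consumer would call flow_gate_on_receive with envelope.message.body.len after each delivery and flow_gate_on_done when processing completes. A true return value marks the point where it wants to pause or resume.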

The publisher checks whether the queue is ready to accept a message before sending. To do so, it redeclares the queue in passive mode and checks consumer_count. If the consumer count is zero, it retries the send after some time.
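The publisher's check-and-retry step could look roughly like the sketch below. It keeps the retry logic separate from the broker call: the probe callback stands in for the passive declare plus consumer_count check, and the wait callback stands in for a sleep. All names here (wait_until_ready, queue_ready_fn, wait_fn, demo_probe, demo_wait) are hypothetical and not part of rabbitmq-c, and the fixed delay and attempt limit are assumptions.

```c
#include <stdbool.h>

/* Hypothetical callbacks: 'ready' would wrap the passive declare and the
 * consumer_count check, 'wait' would wrap a sleep between attempts. */
typedef bool (*queue_ready_fn)(void *ctx);
typedef void (*wait_fn)(unsigned delay_ms, void *ctx);

/* Probe up to max_attempts times, waiting delay_ms between attempts.
 * Returns the 1-based attempt on which the queue was ready, or 0 on give-up. */
int wait_until_ready(queue_ready_fn ready, wait_fn wait, void *ctx,
                     int max_attempts, unsigned delay_ms) {
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (ready(ctx)) {
            return attempt;
        }
        if (attempt < max_attempts) {
            wait(delay_ms, ctx);
        }
    }
    return 0;
}

/* Demo probe: reports "ready" once the int counter behind ctx reaches zero,
 * decrementing it on every unsuccessful probe. */
bool demo_probe(void *ctx) {
    int *remaining = ctx;
    if (*remaining == 0) {
        return true;
    }
    --*remaining;
    return false;
}

/* Demo wait: does nothing, so the sketch runs without real delays. */
void demo_wait(unsigned delay_ms, void *ctx) {
    (void)delay_ms;
    (void)ctx;
}
```

In a real publisher the probe would use its own channel and connection handle passed through ctx, and the message would be published only after wait_until_ready returns a non-zero attempt number.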

When the consumer calls amqp_channel_flow(conn, 1, 0) to pause reading from the queue, a connection exception is raised:
operation channel.flow caused a connection exception not_implemented: "active=false"

Pseudo Code Of Consumer
{
// declare an exclusive, auto-delete queue Q1 (passive=0, durable=0, exclusive=1, auto_delete=1)
amqp_queue_declare_ok_t *declare_ok = amqp_queue_declare(conn, channel, Q1, 0, 0, 1, 1, amqp_empty_table);

// basic consume (no_local=0, no_ack=1, exclusive=0)
amqp_basic_consume(conn, 1, Q1, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

while (1)
{
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;
    amqp_maybe_release_buffers(conn);
    res = amqp_consume_message(conn, &envelope, NULL, 0);
    if (AMQP_RESPONSE_NORMAL != res.reply_type) {
        break;
    }
    // Under some condition (too much content is already in the pipeline) the consumer
    // wants to stop receiving messages for some time, so it sends channel.flow.
    // This call causes the connection exception:
    //   operation channel.flow caused a connection exception not_implemented: "active=false"
    amqp_channel_flow(conn, 1, 0);

    // after some time, resume the consumer so that it accepts more messages from the queue
    amqp_channel_flow(conn, 1, 1);

    // release the memory held by the received message
    amqp_destroy_envelope(&envelope);
}

}
Pseudo Code Of Publisher
{
// check whether a consumer is ready to accept before sending: declare the exclusive queue in passive mode
amqp_queue_declare_ok_t *declare_ok = amqp_queue_declare(conn, channel, Q1, 1, 0, 0, 1, amqp_empty_table);
if (declare_ok != NULL && declare_ok->consumer_count > 0)
    // publish message
else
    // retry after some time
}
