Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Management UI: extend Get Message feature to streams #11030

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion deps/rabbitmq_management/priv/www/js/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,11 @@ function get_msgs(params) {
with_req('POST', path, JSON.stringify(params), function(resp) {
var msgs = JSON.parse(resp.responseText);
if (msgs.length == 0) {
show_popup('info', 'Queue is empty');
if ("offset" in params) {
show_popup('info', 'No messages in stream at given offset');
} else {
show_popup('info', 'Queue is empty');
}
} else {
$('#msg-wrapper').slideUp(200);
replace_content('msg-wrapper', format('messages', {'msgs': msgs}));
Expand Down
12 changes: 10 additions & 2 deletions deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -309,19 +309,21 @@

<%= format('publish', {'mode': 'queue', 'queue': queue}) %>

<% if (!is_stream(queue)) { %>
<div class="section-hidden">
<h2>Get messages</h2>
<div class="hider">
<% if (!is_stream(queue)) { %>
<p>
Warning: getting messages from a queue is a destructive action.
<span class="help" id="message-get-requeue"></span>
</p>
<% } %>
<form action="#/queues/get" method="post">
<input type="hidden" name="vhost" value="<%= fmt_string(queue.vhost) %>"/>
<input type="hidden" name="name" value="<%= fmt_string(queue.name) %>"/>
<input type="hidden" name="truncate" value="50000"/>
<table class="form">
<% if (!is_stream(queue)) { %>
<tr>
<th><label>Ack Mode:</label></th>
<td>
Expand All @@ -333,6 +335,7 @@
</select>
</td>
</tr>
<% } %>
<tr>
<th><label>Encoding:</label></th>
<td>
Expand All @@ -347,13 +350,18 @@
<th><label>Messages:</label></th>
<td><input type="text" name="count" value="1"/></td>
</tr>
<% if (is_stream(queue)) { %>
<tr>
<th><label>Offset:</label></th>
<td><input type="text" name="offset" value="0"/></td>
</tr>
<% } %>
</table>
<input type="submit" value="Get Message(s)" />
</form>
<div id="msg-wrapper"></div>
</div>
</div>
<% } %>

<% if (is_user_policymaker) { %>
<div class="section-hidden">
Expand Down
141 changes: 116 additions & 25 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_queue_get.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

-module(rabbit_mgmt_wm_queue_get).

-include_lib("kernel/include/logger.hrl").
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used anywhere? or just a leftover from debugging?

-export([init/2, resource_exists/2, is_authorized/2, allow_missing_post/2,
allowed_methods/2, accept_content/2, content_types_provided/2,
content_types_accepted/2]).
Expand Down Expand Up @@ -47,34 +48,124 @@ accept_content(ReqData, Context) ->
do_it(ReqData0, Context) ->
VHost = rabbit_mgmt_util:vhost(ReqData0),
Q = rabbit_mgmt_util:id(queue, ReqData0),
rabbit_mgmt_util:with_decode(
[ackmode, count, encoding], ReqData0, Context,
fun([AckModeBin, CountBin, EncBin], Body, ReqData) ->
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
AckMode = list_to_atom(binary_to_list(AckModeBin)),
Count = rabbit_mgmt_util:parse_int(CountBin),
Enc = case EncBin of
<<"auto">> -> auto;
<<"base64">> -> base64;
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
end,
Trunc = case maps:get(truncate, Body, undefined) of
undefined -> none;
TruncBin -> rabbit_mgmt_util:parse_int(
TruncBin)
end,

Reply = basic_gets(Count, Ch, Q, AckMode, Enc, Trunc),
maybe_return(Reply, Ch, AckMode),
rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
ReqData, Context)
end)
end).
Resource = rabbit_misc:r(<<"/">>, queue, Q),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Resource = rabbit_misc:r(<<"/">>, queue, Q),
Resource = rabbit_misc:r(VHost, queue, Q),

{ok, Queue} = rabbit_amqqueue:lookup(Resource),
case amqqueue:get_type(Queue) of
rabbit_stream_queue ->
rabbit_mgmt_util:with_decode(
[count, encoding, offset], ReqData0, Context,
fun([CountBin, EncBin, OffsetBin], Body, ReqData) ->
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
Count = rabbit_mgmt_util:parse_int(CountBin),
Enc = case EncBin of
<<"auto">> -> auto;
<<"base64">> -> base64;
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
end,
Offset = rabbit_mgmt_util:parse_int(OffsetBin),
Trunc = case maps:get(truncate, Body, undefined) of
undefined -> none;
TruncBin -> rabbit_mgmt_util:parse_int(
TruncBin)
end,
CTag = <<"ctag">>,
Reply = start_subscription_gets(
Count, Ch, Q, CTag, Offset, Enc, Trunc),
rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
ReqData, Context)
end)
end);
_ ->
rabbit_mgmt_util:with_decode(
[ackmode, count, encoding], ReqData0, Context,
fun([AckModeBin, CountBin, EncBin], Body, ReqData) ->
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
AckMode = list_to_atom(binary_to_list(AckModeBin)),
Count = rabbit_mgmt_util:parse_int(CountBin),
Enc = case EncBin of
<<"auto">> -> auto;
<<"base64">> -> base64;
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
end,
Trunc = case maps:get(truncate, Body, undefined) of
undefined -> none;
TruncBin -> rabbit_mgmt_util:parse_int(
TruncBin)
end,
Reply = basic_gets(Count, Ch, Q, AckMode, Enc,
Trunc),
maybe_return(Reply, Ch, AckMode),
rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
ReqData, Context)
end)
end)
end.

start_subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) ->
qos(Ch, Count),
subscribe(Ch, Queue, false, Offset, CTag),
Replies = subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc),
cancel_subscription(Ch, CTag),
Replies.

subscription_gets(0, _Ch, _Queue, _CTag, _Offset, _Enc, _Trunc) ->
[];
subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) ->
case subscription_get(Ch, Enc, Trunc) of
none -> [];
Reply -> [Reply | subscription_gets(Count - 1, Ch, Queue, CTag, Offset, Enc, Trunc)]
end.

subscription_get(Ch, Enc, Trunc) ->
receive
{#'basic.deliver'{redelivered = Redelivered,
exchange = Exchange,
routing_key = RoutingKey,
delivery_tag = DeliveryTag,
consumer_tag = ConsumerTag},
#amqp_msg{props = Props, payload = Payload}} ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we want to ack the message. If the stream has 20 messages, and we ask for 10 (prefetch = 10) when we ack the first message, the broker will send the 11th message. (Prefetch count means the max number of outstanding ie unacknowledged messages)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature should not arguably use AMQP 0-9-1 at all. And the original feature should never have used an AMQP 0-9-1 Erlang client to begin with.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it would probably be better to use rabbit_queue_type directly to get a message off the stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont say that an amqp connection should be used to implement this feature, just want to mention the aspect, or implicit benefit that the server side of an amqp connection does additional authorization checks.

Would like to cite the recent attempt for the delete queue endpoint to use an internal API #9550 which was reverted #10062 (comment). Maybe can you recall what was the difficulty, why the path outlined in #10062 was abandoned?

Or maybe that restriction is not applicable for a simple reading from a queue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a way to not move anywhere.. why would not using the 0-9-1 protocol be ok expect it's maybe not 100% optimal? This solves a real world problem with minimal change.

multiple = false}),
[{payload_bytes, size(Payload)},
{redelivered, Redelivered},
{exchange, Exchange},
{routing_key, RoutingKey},
{consumer_tag, ConsumerTag},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumer tag is just hardcoded "ctag" so it does not need to be included.

{properties, rabbit_mgmt_format:basic_properties(Props)}] ++
payload_part(maybe_truncate(Payload, Trunc), Enc)
after
300 ->
none
end.

subscribe(Ch, Queue, NoAck, Offset, CTag) ->
amqp_channel:subscribe(
Ch,
#'basic.consume'{queue = Queue,
no_ack = NoAck,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NoAck is always false so we can hardcode it I think
Also hardcode the CTag

consumer_tag = CTag,
arguments = [{<<"x-stream-offset">>, long, Offset}]},
self()),
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
end.

qos(Ch, Prefetch) ->
#'basic.qos_ok'{} = amqp_channel:call(
Ch,
#'basic.qos'{global = false, prefetch_count = Prefetch}).

cancel_subscription(Ch, CTag) ->
amqp_channel:call(
Ch,
#'basic.cancel'{
consumer_tag = CTag,
nowait = false}).

basic_gets(0, _, _, _, _, _) ->
[];
Expand Down