Skip to content

Commit

Permalink
Merge pull request #10663 from rabbitmq/exchange-federation-qq
Browse files Browse the repository at this point in the history
Add queue type setting for exchange federation
  • Loading branch information
michaelklishin committed Mar 6, 2024
2 parents 8e0f5d3 + 2e8ff3a commit 21d4afb
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 16 deletions.
4 changes: 2 additions & 2 deletions deps/rabbitmq_federation/include/rabbit_federation.hrl
Expand Up @@ -16,7 +16,7 @@
message_ttl,
trust_user_id,
ack_mode,
ha_policy,
queue_type,
name,
bind_nowait,
resource_cleanup_mode,
Expand Down Expand Up @@ -45,4 +45,4 @@

-define(FEDERATION_GUIDE_URL, <<"https://rabbitmq.com/federation.html">>).

-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope).
-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope).
Expand Up @@ -504,15 +504,16 @@ consume_from_upstream_queue(
#upstream{prefetch_count = Prefetch,
expires = Expiry,
message_ttl = TTL,
ha_policy = HA} = Upstream,
queue_type = QueueType} = Upstream,
#upstream_params{x_or_q = X,
params = Params} = UParams,
Q = upstream_queue_name(name(X), vhost(Params), DownXName),
Args = [A || {_K, _T, V} = A
<- [{<<"x-expires">>, long, Expiry},
{<<"x-message-ttl">>, long, TTL},
{<<"x-ha-policy">>, longstr, HA},
{<<"x-internal-purpose">>, longstr, <<"federation">>}],
{<<"x-internal-purpose">>, longstr, <<"federation">>},
{<<"x-queue-type">>, longstr, atom_to_binary(QueueType)}
],
V =/= none],
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,
Expand Down
Expand Up @@ -89,7 +89,8 @@ shared_validation() ->
['no-ack', 'on-publish', 'on-confirm']), optional},
{<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(
['default', 'never']), optional},
{<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"queue-type">>, rabbit_parameter_validation:enum(
['classic', 'quorum']), optional},
{<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional},
{<<"channel-use-mode">>, rabbit_parameter_validation:enum(
['multiple', 'single']), optional}].
Expand Down
Expand Up @@ -136,7 +136,7 @@ from_upstream_or_set(US, Name, U, XorQ) ->
message_ttl = bget('message-ttl', US, U, none),
trust_user_id = bget('trust-user-id', US, U, false),
ack_mode = to_atom(bget('ack-mode', US, U, <<"on-confirm">>)),
ha_policy = bget('ha-policy', US, U, none),
queue_type = to_atom(bget('queue-type', US, U, <<"classic">>)),
name = Name,
bind_nowait = bget('bind-nowait', US, U, false),
resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>)),
Expand Down
49 changes: 49 additions & 0 deletions deps/rabbitmq_federation/test/exchange_SUITE.erl
Expand Up @@ -50,6 +50,7 @@ groups() ->
essential() ->
[
single_upstream,
single_upstream_quorum,
multiple_upstreams,
multiple_upstreams_pattern,
single_upstream_multiple_uris,
Expand Down Expand Up @@ -163,9 +164,46 @@ single_upstream(Config) ->
await_binding(Config, 0, UpX, RK),
publish_expect(Ch, UpX, RK, Q, <<"single_upstream payload">>),

Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
assert_federation_internal_queue_type(Config, Server, rabbit_classic_queue),

rabbit_ct_client_helpers:close_channel(Ch),
clean_up_federation_related_bits(Config).

single_upstream_quorum(Config) ->
FedX = <<"single_upstream_quorum.federated">>,
UpX = <<"single_upstream_quorum.upstream.x">>,
rabbit_ct_broker_helpers:set_parameter(
Config, 0, <<"federation-upstream">>, <<"localhost">>,
[
{<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)},
{<<"exchange">>, UpX},
{<<"queue-type">>, <<"quorum">>}
]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed.x">>, <<"^single_upstream_quorum.federated">>, <<"exchanges">>,
[
{<<"federation-upstream">>, <<"localhost">>}
]),

Ch = rabbit_ct_client_helpers:open_channel(Config, 0),

Xs = [
exchange_declare_method(FedX)
],
declare_exchanges(Ch, Xs),

RK = <<"key">>,
Q = declare_and_bind_queue(Ch, FedX, RK),
await_binding(Config, 0, UpX, RK),
publish_expect(Ch, UpX, RK, Q, <<"single_upstream_quorum payload">>),

Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
assert_federation_internal_queue_type(Config, Server, rabbit_quorum_queue),

rabbit_ct_client_helpers:close_channel(Ch),
clean_up_federation_related_bits(Config).

multiple_upstreams(Config) ->
FedX = <<"multiple_upstreams.federated">>,
Expand Down Expand Up @@ -870,3 +908,14 @@ await_credentials_obfuscation_seeding_on_two_nodes(Config) ->
end),

timer:sleep(1000).

assert_federation_internal_queue_type(Config, Server, Expected) ->
Qs = all_queues_on(Config, Server),
FedQs = lists:filter(
fun(Q) ->
lists:member(
{<<"x-internal-purpose">>, longstr, <<"federation">>}, amqqueue:get_arguments(Q))
end,
Qs),
FedQTypes = lists:map(fun(Q) -> amqqueue:get_type(Q) end, FedQs),
?assertEqual([Expected], lists:uniq(FedQTypes)).
4 changes: 2 additions & 2 deletions deps/rabbitmq_federation_management/priv/www/js/federation.js
Expand Up @@ -75,8 +75,8 @@ HELP['federation-expires'] =
HELP['federation-ttl'] =
'Time in milliseconds that undelivered messages should be held upstream when there is a network outage or backlog. Leave this blank to mean "forever".';

HELP['ha-policy'] =
'Determines the "x-ha-policy" argument for the upstream queue for a federated exchange. Default is "none", meaning the queue is not HA.';
HELP['queue-type'] =
'Defines the queue type for the upstream queue for a federated exchange. Default is "classic". Set to "quorum" for high availability.';

HELP['queue'] =
'The name of the upstream queue. Default is to use the same name as the federated queue.';
Expand Down
Expand Up @@ -56,8 +56,8 @@
</tr>

<tr>
<th>HA Policy</th>
<td><%= fmt_string(upstream.value['ha-policy']) %></td>
<th>Queue Type</th>
<td><%= fmt_string(upstream.value['queue-type']) %></td>
</tr>

<tr>
Expand Down
Expand Up @@ -19,7 +19,7 @@
<th>Max Hops</th>
<th>Expiry</th>
<th>Message TTL</th>
<th>HA Policy</th>
<th>Queue Type</th>
<th>Queue</th>
<th>Consumer tag</th>
</tr>
Expand All @@ -43,7 +43,7 @@
<td class="r"><%= upstream.value['max-hops'] %></td>
<td class="r"><%= fmt_time(upstream.value.expires, 'ms') %></td>
<td class="r"><%= fmt_time(upstream.value['message-ttl'], 'ms') %></td>
<td class="r"><%= fmt_string(upstream.value['ha-policy']) %></td>
<td class="r"><%= fmt_string(upstream.value['queue-type']) %></td>
<td class="r"><%= fmt_string(upstream.value['queue']) %></td>
<td class="r"><%= fmt_string(upstream.value['consumer-tag']) %></td>
</tr>
Expand Down Expand Up @@ -195,11 +195,11 @@
<tr>
<th>
<label>
HA Policy:
<span class="help" id="ha-policy"></span>
Queue Type:
<span class="help" id="queue-type"></span>
</label>
</th>
<td><input type="text" name="ha-policy"/></td>
<td><input type="text" name="queue-type"/></td>
</tr>
</tr>

Expand Down

0 comments on commit 21d4afb

Please sign in to comment.