diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 2203632f6ec9..76ae4883a990 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -16,6 +16,7 @@ -compile(nowarn_export_all). -compile(export_all). +-import(rabbit_ct_helpers, [await_condition/2]). -define(WAIT, 5000). suite() -> @@ -2158,8 +2159,10 @@ leader_locator_client_local(Config) -> declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - ?assertMatch(Server1, proplists:get_value(leader, - find_queue_info(Config, [leader]))), + await_condition( + fun () -> + Server1 == proplists:get_value(leader, find_queue_info(Config, [leader])) + end, 60), ?assertMatch(#'queue.delete_ok'{}, delete(Config, Server1, Q)), @@ -2170,8 +2173,13 @@ leader_locator_client_local(Config) -> declare(Config, Server2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - ?assertMatch(Server2, proplists:get_value(leader, - find_queue_info(Q2, Config, 0, [leader]))), + %% the amqqueue:pid field is updated async for khepri + %% so we need to await the condition here + await_condition( + fun () -> + Server2 == proplists:get_value(leader, + find_queue_info(Q2, Config, 0, [leader])) + end, 60), ?assertMatch(#'queue.delete_ok'{}, delete(Config, Server2, Q2)), @@ -2182,9 +2190,11 @@ leader_locator_client_local(Config) -> declare(Config, Server3, Q3, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - - ?assertEqual(Server3, proplists:get_value(leader, - find_queue_info(Q3, Config, 0, [leader]))), + await_condition( + fun () -> + Server3 == proplists:get_value(leader, + find_queue_info(Q3, Config, 0, [leader])) + end, 60), ?assertMatch(#'queue.delete_ok'{}, delete(Config, Server3, Q3)), ok. @@ -2225,7 +2235,7 @@ leader_locator_balanced_maintenance(Config) -> declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])), - rabbit_ct_helpers:await_condition( + await_condition( fun() -> Info = find_queue_info(Config, [leader]), Leader = proplists:get_value(leader, Info),