Skip to content

Commit

Permalink
Merge pull request #4609 from pwojcikdev/fair-queuing/message-process…
Browse files Browse the repository at this point in the history
…or-3

Replace `tcp_message_manager`
  • Loading branch information
pwojcikdev committed May 10, 2024
2 parents 21e48cd + c962248 commit 6eeaf21
Show file tree
Hide file tree
Showing 18 changed files with 436 additions and 355 deletions.
62 changes: 0 additions & 62 deletions nano/core_test/network.cpp
Expand Up @@ -878,68 +878,6 @@ TEST (network, tcp_no_accept_excluded_peers)
ASSERT_TIMELY_EQ (5s, node0->network.size (), 1);
}

/*
namespace nano
{
TEST (network, tcp_message_manager)
{
nano::transport::tcp_message_manager manager (1);
item.node_id = nano::account (100);
ASSERT_EQ (0, manager.entries.size ());
manager.put_message (item);
ASSERT_EQ (1, manager.entries.size ());
ASSERT_EQ (manager.get_message ().node_id, item.node_id);
ASSERT_EQ (0, manager.entries.size ());
// Fill the queue
manager.entries = decltype (manager.entries) (manager.max_entries, item);
ASSERT_EQ (manager.entries.size (), manager.max_entries);
// This task will wait until a message is consumed
auto future = std::async (std::launch::async, [&] {
manager.put_message (item);
});
// This should give sufficient time to execute put_message
// and prove that it waits on condition variable
std::this_thread::sleep_for (200ms);
ASSERT_EQ (manager.entries.size (), manager.max_entries);
ASSERT_EQ (manager.get_message ().node_id, item.node_id);
ASSERT_NE (std::future_status::timeout, future.wait_for (1s));
ASSERT_EQ (manager.entries.size (), manager.max_entries);
nano::tcp_message_manager manager2 (2);
size_t message_count = 10'000;
std::vector<std::thread> consumers;
for (auto i = 0; i < 4; ++i)
{
consumers.emplace_back ([&] {
for (auto i = 0; i < message_count; ++i)
{
ASSERT_EQ (manager.get_message ().node_id, item.node_id);
}
});
}
std::vector<std::thread> producers;
for (auto i = 0; i < 4; ++i)
{
producers.emplace_back ([&] {
for (auto i = 0; i < message_count; ++i)
{
manager.put_message (item);
}
});
}
for (auto & t : boost::range::join (producers, consumers))
{
t.join ();
}
}
}
*/

TEST (network, cleanup_purge)
{
auto test_start = std::chrono::steady_clock::now ();
Expand Down
10 changes: 10 additions & 0 deletions nano/core_test/toml.cpp
Expand Up @@ -273,6 +273,9 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue);
ASSERT_EQ (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads);
ASSERT_EQ (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size);

ASSERT_EQ (conf.node.message_processor.threads, defaults.node.message_processor.threads);
ASSERT_EQ (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue);
}

TEST (toml, optional_child)
Expand Down Expand Up @@ -584,6 +587,10 @@ TEST (toml, daemon_config_deserialize_no_defaults)
threads = 999
batch_size = 999
[node.message_processor]
threads = 999
max_queue = 999
[opencl]
device = 999
enable = true
Expand Down Expand Up @@ -741,6 +748,9 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue);
ASSERT_NE (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads);
ASSERT_NE (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size);

ASSERT_NE (conf.node.message_processor.threads, defaults.node.message_processor.threads);
ASSERT_NE (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue);
}

/** There should be no required values **/
Expand Down
3 changes: 2 additions & 1 deletion nano/lib/logging_enums.hpp
Expand Up @@ -49,7 +49,7 @@ enum class type
election,
blockprocessor,
network,
network_processed,
message,
channel,
channel_sent,
socket,
Expand Down Expand Up @@ -78,6 +78,7 @@ enum class type
thread_runner,
signal_manager,
peer_history,
message_processor,

// bootstrap
bulk_pull_client,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/stats_enums.hpp
Expand Up @@ -72,6 +72,9 @@ enum class type
syn_cookies,
peer_history,
port_mapping,
message_processor,
message_processor_overfill,
message_processor_type,

bootstrap_ascending,
bootstrap_ascending_accounts,
Expand Down
4 changes: 2 additions & 2 deletions nano/lib/thread_roles.cpp
Expand Up @@ -22,8 +22,8 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::work:
thread_role_name_string = "Work pool";
break;
case nano::thread_role::name::packet_processing:
thread_role_name_string = "Pkt processing";
case nano::thread_role::name::message_processing:
thread_role_name_string = "Msg processing";
break;
case nano::thread_role::name::vote_processing:
thread_role_name_string = "Vote processing";
Expand Down
2 changes: 1 addition & 1 deletion nano/lib/thread_roles.hpp
Expand Up @@ -12,7 +12,7 @@ enum class name
unknown,
io,
work,
packet_processing,
message_processing,
vote_processing,
block_processing,
request_loop,
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Expand Up @@ -99,6 +99,8 @@ add_library(
local_vote_history.hpp
make_store.hpp
make_store.cpp
message_processor.hpp
message_processor.cpp
network.hpp
network.cpp
nodeconfig.hpp
Expand Down

0 comments on commit 6eeaf21

Please sign in to comment.