Skip to content

Commit

Permalink
Merge pull request #779 from Fi3/AddMsgHandlersForDeserializedMsg
Browse files Browse the repository at this point in the history
Add handlers for deserilized messages
  • Loading branch information
Fi3 committed Mar 2, 2024
2 parents a49588d + b6ef686 commit eb42ee7
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 90 deletions.
4 changes: 2 additions & 2 deletions protocols/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protocols/v2/roles-logic-sv2/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "roles_logic_sv2"
version = "0.1.1"
version = "0.1.2"
edition = "2018"
description = "Common handlers for use within SV2 roles"
license = "MIT OR Apache-2.0"
Expand Down
43 changes: 39 additions & 4 deletions protocols/v2/roles-logic-sv2/src/handlers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
use common_messages_sv2::{
ChannelEndpointChanged, SetupConnection, SetupConnectionError, SetupConnectionSuccess,
};
use const_sv2::*;
use core::convert::TryInto;
use std::sync::Arc;
use tracing::{debug, error, info, trace};
Expand All @@ -33,9 +34,26 @@ where
self_: Arc<Mutex<Self>>,
message_type: u8,
payload: &mut [u8],
routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
Self::handle_message_common_deserilized(
self_,
(message_type, payload).try_into(),
routing_logic,
)
}
/// Takes a message and it calls the appropriate handler function
///
/// Arguments:
///
/// * `message_type`: See [`const_sv2`].
///
fn handle_message_common_deserilized(
self_: Arc<Mutex<Self>>,
message: Result<CommonMessages<'_>, Error>,
_routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
match message {
Ok(CommonMessages::SetupConnectionSuccess(m)) => {
info!(
"Received SetupConnectionSuccess: version={}, flags={:b}",
Expand Down Expand Up @@ -63,7 +81,9 @@ where
.safe_lock(|x| x.handle_channel_endpoint_changed(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?
}
Ok(CommonMessages::SetupConnection(_)) => Err(Error::UnexpectedMessage(message_type)),
Ok(CommonMessages::SetupConnection(_)) => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_SETUP_CONNECTION))
}
Err(e) => Err(e),
}
}
Expand Down Expand Up @@ -106,7 +126,6 @@ where
Err(e) => Err(e),
}
}

/// It takes a message type and a payload, and if the message is a serialized setup connection
/// message, it calls the `on_setup_connection` function on the routing logic, and then calls the
/// `handle_setup_connection` function on the router
Expand All @@ -121,7 +140,23 @@ where
payload: &mut [u8],
routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
Self::handle_message_common_deserilized(
self_,
(message_type, payload).try_into(),
routing_logic,
)
}

/// It takes a message do setup connection message, it calls
/// the `on_setup_connection` function on the routing logic, and then calls the
/// `handle_setup_connection` function on the router
///
fn handle_message_common_deserilized(
self_: Arc<Mutex<Self>>,
message: Result<CommonMessages<'_>, Error>,
routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
match message {
Ok(CommonMessages::SetupConnection(m)) => {
info!(
"Received SetupConnection: version={}, flags={:b}",
Expand Down
18 changes: 16 additions & 2 deletions protocols/v2/roles-logic-sv2/src/handlers/job_declaration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ where
message_type: u8,
payload: &mut [u8],
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
Self::handle_message_job_declaration_deserialized(self_, (message_type, payload).try_into())
}

fn handle_message_job_declaration_deserialized(
self_: Arc<Mutex<Self>>,
message: Result<JobDeclaration<'_>, Error>,
) -> Result<SendTo, Error> {
match message {
Ok(JobDeclaration::AllocateMiningJobTokenSuccess(message)) => {
debug!(
"Received AllocateMiningJobTokenSuccess with id: {}",
Expand Down Expand Up @@ -117,7 +124,14 @@ where
message_type: u8,
payload: &mut [u8],
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
Self::handle_message_job_declaration_deserialized(self_, (message_type, payload).try_into())
}

fn handle_message_job_declaration_deserialized(
self_: Arc<Mutex<Self>>,
message: Result<JobDeclaration<'_>, Error>,
) -> Result<SendTo, Error> {
match message {
Ok(JobDeclaration::AllocateMiningJobToken(message)) => {
debug!(
"Received AllocateMiningJobToken with id: {}",
Expand Down
111 changes: 89 additions & 22 deletions protocols/v2/roles-logic-sv2/src/handlers/mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
use super::SendTo_;

use crate::utils::Mutex;
use const_sv2::*;
use std::{fmt::Debug as D, sync::Arc};
use tracing::{debug, error, info, trace};

Expand Down Expand Up @@ -49,6 +50,25 @@ pub trait ParseDownstreamMiningMessages<
payload: &mut [u8],
routing_logic: MiningRoutingLogic<Self, Up, Selector, Router>,
) -> Result<SendTo<Up>, Error>
where
Self: IsMiningDownstream + Sized,
{
match Self::handle_message_mining_deserialized(
self_mutex,
(message_type, payload).try_into(),
routing_logic,
) {
Err(Error::UnexpectedMessage(0)) => Err(Error::UnexpectedMessage(message_type)),
result => result,
}
}

/// Used to route SV2 mining messages from the downstream
fn handle_message_mining_deserialized(
self_mutex: Arc<Mutex<Self>>,
message: Result<Mining<'_>, Error>,
routing_logic: MiningRoutingLogic<Self, Up, Selector, Router>,
) -> Result<SendTo<Up>, Error>
where
Self: IsMiningDownstream + Sized,
{
Expand All @@ -61,7 +81,7 @@ pub trait ParseDownstreamMiningMessages<
)
})
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?;
match (message_type, payload).try_into() {
match message {
Ok(Mining::OpenStandardMiningChannel(mut m)) => {
info!(
"Received OpenStandardMiningChannel from: {} with id: {}",
Expand Down Expand Up @@ -106,7 +126,9 @@ pub trait ParseDownstreamMiningMessages<
SupportedChannelTypes::Standard => self_mutex
.safe_lock(|self_| self_.handle_open_standard_mining_channel(m, upstream))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_STANDARD_MINING_CHANNEL,
)),
SupportedChannelTypes::Group => self_mutex
.safe_lock(|self_| self_.handle_open_standard_mining_channel(m, upstream))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -137,11 +159,15 @@ pub trait ParseDownstreamMiningMessages<
channel_type
);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|self_| self_.handle_open_extended_mining_channel(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
)),
SupportedChannelTypes::GroupAndExtended => self_mutex
.safe_lock(|self_| self_.handle_open_extended_mining_channel(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -181,7 +207,9 @@ pub trait ParseDownstreamMiningMessages<
.safe_lock(|self_| self_.handle_submit_shares_standard(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?
}
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SUBMIT_SHARES_STANDARD,
)),
SupportedChannelTypes::Group => {
debug!("Received SubmitSharesStandard->Group message");
trace!("SubmitSharesStandard {:?}", m);
Expand All @@ -201,11 +229,15 @@ pub trait ParseDownstreamMiningMessages<
debug!("Received SubmitSharesExtended message");
trace!("SubmitSharesExtended {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|self_| self_.handle_submit_shares_extended(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED,
)),
SupportedChannelTypes::GroupAndExtended => self_mutex
.safe_lock(|self_| self_.handle_submit_shares_extended(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand All @@ -224,10 +256,10 @@ pub trait ParseDownstreamMiningMessages<
(SupportedChannelTypes::GroupAndExtended, true) => self_mutex
.safe_lock(|self_| self_.handle_set_custom_mining_job(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
_ => Err(Error::UnexpectedMessage(message_type)),
_ => Err(Error::UnexpectedMessage(MESSAGE_TYPE_SET_CUSTOM_MINING_JOB)),
}
}
Ok(_) => Err(Error::UnexpectedMessage(message_type)),
Ok(_) => Err(Error::UnexpectedMessage(0)),
Err(e) => Err(e),
}
}
Expand Down Expand Up @@ -290,12 +322,27 @@ pub trait ParseUpstreamMiningMessages<
message_type: u8,
payload: &mut [u8],
routing_logic: MiningRoutingLogic<Down, Self, Selector, Router>,
) -> Result<SendTo<Down>, Error> {
match Self::handle_message_mining_deserialized(
self_mutex,
(message_type, payload).try_into(),
routing_logic,
) {
Err(Error::UnexpectedMessage(0)) => Err(Error::UnexpectedMessage(message_type)),
result => result,
}
}

fn handle_message_mining_deserialized(
self_mutex: Arc<Mutex<Self>>,
message: Result<Mining, Error>,
routing_logic: MiningRoutingLogic<Down, Self, Selector, Router>,
) -> Result<SendTo<Down>, Error> {
let (channel_type, is_work_selection_enabled) = self_mutex
.safe_lock(|s| (s.get_channel_type(), s.is_work_selection_enabled()))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?;

match (message_type, payload).try_into() {
match message {
Ok(Mining::OpenStandardMiningChannelSuccess(mut m)) => {
let remote = match routing_logic {
MiningRoutingLogic::None => None,
Expand All @@ -313,7 +360,9 @@ pub trait ParseUpstreamMiningMessages<
SupportedChannelTypes::Standard => self_mutex
.safe_lock(|s| s.handle_open_standard_mining_channel_success(m, remote))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_STANDARD_MINING_CHANNEL_SUCCESS,
)),
SupportedChannelTypes::Group => self_mutex
.safe_lock(|s| s.handle_open_standard_mining_channel_success(m, remote))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand All @@ -326,11 +375,15 @@ pub trait ParseUpstreamMiningMessages<
info!("Received OpenExtendedMiningChannelSuccess with request id: {} and channel id: {}", m.request_id, m.channel_id);
debug!("OpenStandardMiningChannelSuccess: {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCES,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|s| s.handle_open_extended_mining_channel_success(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCES,
)),
SupportedChannelTypes::GroupAndExtended => self_mutex
.safe_lock(|s| s.handle_open_extended_mining_channel_success(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -469,18 +522,24 @@ pub trait ParseUpstreamMiningMessages<
SupportedChannelTypes::Standard => self_mutex
.safe_lock(|x| x.handle_new_mining_job(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_NEW_MINING_JOB))
}
SupportedChannelTypes::Group => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_NEW_MINING_JOB))
}
SupportedChannelTypes::GroupAndExtended => {
Err(Error::UnexpectedMessage(message_type))
Err(Error::UnexpectedMessage(MESSAGE_TYPE_NEW_MINING_JOB))
}
}
}
Ok(Mining::NewExtendedMiningJob(m)) => {
info!("Received new extended mining job for channel id: {} with job id: {} is_future: {}",m.channel_id, m.job_id, m.is_future());
debug!("NewExtendedMiningJob: {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|x| x.handle_new_extended_mining_job(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -527,7 +586,9 @@ pub trait ParseUpstreamMiningMessages<
(SupportedChannelTypes::GroupAndExtended, true) => self_mutex
.safe_lock(|x| x.handle_set_custom_mining_job_success(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
_ => Err(Error::UnexpectedMessage(message_type)),
_ => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SET_CUSTOM_MINING_JOB_SUCCESS,
)),
}
}

Expand All @@ -546,7 +607,9 @@ pub trait ParseUpstreamMiningMessages<
(SupportedChannelTypes::GroupAndExtended, true) => self_mutex
.safe_lock(|x| x.handle_set_custom_mining_job_error(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
_ => Err(Error::UnexpectedMessage(message_type)),
_ => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SET_CUSTOM_MINING_JOB_ERROR,
)),
}
}
Ok(Mining::SetTarget(m)) => {
Expand Down Expand Up @@ -590,8 +653,12 @@ pub trait ParseUpstreamMiningMessages<
info!("Received SetGroupChannel");
debug!("SetGroupChannel: {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_SET_GROUP_CHANNEL))
}
SupportedChannelTypes::Extended => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_SET_GROUP_CHANNEL))
}
SupportedChannelTypes::Group => self_mutex
.safe_lock(|x| x.handle_set_group_channel(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand All @@ -600,7 +667,7 @@ pub trait ParseUpstreamMiningMessages<
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
}
}
Ok(_) => Err(Error::UnexpectedMessage(message_type)),
Ok(_) => Err(Error::UnexpectedMessage(0)),
Err(e) => Err(e),
}
}
Expand Down

0 comments on commit eb42ee7

Please sign in to comment.