Skip to content

Commit

Permalink
feat(node): notify storage payment on receival
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed May 1, 2024
1 parent 5252242 commit 56664e8
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 10 deletions.
18 changes: 18 additions & 0 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ pub enum NetworkEvent {
peer_id: PeerId,
keys_to_verify: Vec<NetworkAddress>,
},
/// Notification from peer claiming received a storage payment
StoragePaymentNotification {
spend_addr: NetworkAddress,
owner: String,
royalty: u64,
store_cost: u64,
},
}

/// Terminate node for the following reason
Expand Down Expand Up @@ -222,6 +229,17 @@ impl Debug for NetworkEvent {
"NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
)
}
NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
} => {
write!(
f,
"NetworkEvent::StoragePaymentNotification({spend_addr:?} {owner} {store_cost} {royalty})"
)
}
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions sn_networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ impl SwarmDriver {
error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
}
}
Request::Cmd(sn_protocol::messages::Cmd::StoragePaymentReceived {
spend_addr,
owner,
royalty,
store_cost,
}) => {
let response = Response::Cmd(
sn_protocol::messages::CmdResponse::StoragePaymentReceived(Ok(())),
);
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, response)
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;

self.send_event(NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
})
}
Request::Query(query) => {
self.send_event(NetworkEvent::QueryRequestReceived {
query,
Expand Down
69 changes: 69 additions & 0 deletions sn_networking/src/spends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use crate::{Network, NetworkError, Result};
use futures::future::join_all;
use sn_protocol::NetworkAddress;
use sn_transfers::{is_genesis_spend, SignedSpend, SpendAddress, TransferError};
use std::{collections::BTreeSet, iter::Iterator};

Expand Down Expand Up @@ -54,4 +55,72 @@ impl Network {

Ok(())
}

/// This function verifies a received notification of an owner's claim of storage payment
/// It fetches the correspondent spend, then carry out verification.
/// Once verified, the owner and amount will be reported to Node for further record.
pub async fn handle_storage_payment_notification(
&self,
spend_addr: NetworkAddress,
owner: String,
royalty: u64,
store_cost: u64,
) -> Option<(String, u64, u64)> {
let spend_address = if let Some(spend_address) = spend_addr.as_spend_address() {
spend_address
} else {
error!("When verify storage payment notification, cannot parse SpendAddress from {spend_addr:?}");
return None;
};

let spend = match self.get_spend(spend_address).await {
Ok(spend) => spend,
Err(err) => {
error!(
"When verify storage payment notification, cannot get spend {spend_address:?} {err:?}"
);
return None;
}
};

// TODO: using proper CASH_NOTE_PURPOSE consts once PR 1653 merged

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let royalty_keyword = "CASH_NOTE_REASON_FOR_NETWORK_ROYALTIES".to_string();
let change_keyword = "CASH_NOTE_REASON_FOR_CHANGE".to_string();

// 1, The spend's outputs shall have equal number of royalty and store_cost payments
// 2, The claimed payment shall be within the spend's outputs
let num_of_royalties = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == royalty_keyword)
.count();
let num_of_change = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == change_keyword)
.count();
let num_of_store_cost = spend.spent_tx().outputs.len() - num_of_royalties - num_of_change;

let payments_match = num_of_store_cost == num_of_royalties;

let find_royalty = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == royalty_keyword && o.amount.as_nano() == royalty);
let find_store_cost = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == owner && o.amount.as_nano() == store_cost);

if payments_match && find_royalty && find_store_cost {
Some((owner, royalty, store_cost))
} else {
error!("Claimed storage payment of ({owner} {royalty} {store_cost}) cann't be verified by the spend {spend:?}");
None
}
}
}
28 changes: 28 additions & 0 deletions sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use sn_peers_acquisition::{get_peers_from_args, PeersArgs};
use sn_protocol::{node::get_safenode_root_dir, node_rpc::NodeCtrl};
use std::{
env,
collections::BTreeMap,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
Expand Down Expand Up @@ -335,6 +336,11 @@ You can check your reward balance by running:

fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Sender<NodeCtrl>) {
let _handle = tokio::spawn(async move {
let mut spends_statistics: BTreeMap<String, Vec<u64>> = Default::default();

// TODO: use the proper CASH_NOTE_PURPOSE const
let royalty_reason = "ROYALTY".to_string();

loop {
match node_events_rx.recv().await {
Ok(NodeEvent::ConnectedToNetwork) => Marker::NodeConnectedToNetwork.log(),
Expand Down Expand Up @@ -366,6 +372,28 @@ fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Se
break;
}
}
Ok(NodeEvent::StoragePayments {
owner,
royalty,
store_cost,
}) => {
{
let holders = spends_statistics.entry(royalty_reason.clone()).or_default();
holders.push(royalty);
}
{
let holders = spends_statistics.entry(owner.clone()).or_default();
holders.push(store_cost);
}
let statistics: Vec<_> = spends_statistics
.iter()
.map(|(owner, payments)| {
let total_amount: u64 = payments.iter().sum();
format!("{},{},{total_amount}\n", owner.clone(), payments.len())
})
.collect();
info!("Current storage statistics is:\n{statistics:?}");
}
Ok(event) => {
/* we ignore other events */
trace!("Currently ignored node event {event:?}");
Expand Down
9 changes: 9 additions & 0 deletions sn_node/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ pub enum NodeEvent {
ChannelClosed,
/// Terminates the node
TerminateNode(String),
/// StoragePayment notification
StoragePayments {
/// Owner of the node that received the storage payment
owner: String,
/// Royalty fee paid for the storage
royalty: u64,
/// Cost paid for the storage
store_cost: u64,
},
}

impl NodeEvent {
Expand Down
28 changes: 26 additions & 2 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ pub(crate) struct Node {
// We keep a copy of the Sender which is clonable and we can obtain a receiver from.
node_cmds: broadcast::Sender<NodeCmd>,
// Peers that are dialed at startup of node.
initial_peers: Arc<Vec<Multiaddr>>,
pub(crate) initial_peers: Arc<Vec<Multiaddr>>,
reward_address: Arc<MainPubkey>,
#[cfg(feature = "open-metrics")]
pub(crate) node_metrics: NodeMetrics,
/// node owner's discord username, in readable format
owner: String,
pub(crate) owner: String,
}

impl Node {
Expand Down Expand Up @@ -485,6 +485,30 @@ impl Node {
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
});
}
NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
} => {
event_header = "StoragePaymentNotification";
let network = self.network.clone();
let events_channel = self.events_channel.clone();
info!("Received StoragePaymentNotification, notifying owner {owner:?} received \
{store_cost} tokens for store_cost and {royalty} tokens for royalty, with spend {spend_addr:?}");
let _handle = spawn(async move {
if let Some((owner, royalty, store_cost)) = network
.handle_storage_payment_notification(spend_addr, owner, royalty, store_cost)
.await
{
events_channel.broadcast(NodeEvent::StoragePayments {
owner,
royalty,
store_cost,
});
}
});
}
}

trace!(
Expand Down
66 changes: 58 additions & 8 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{node::Node, quote::verify_quote_for_storecost, Error, Marker, Result};
use libp2p::kad::{Record, RecordKey};
use libp2p::{
kad::{Record, RecordKey},
multiaddr::Protocol,
PeerId,
};
use sn_networking::{get_raw_signed_spends_from_record, GetRecordError, NetworkError};
use sn_protocol::{
messages::CmdOk,
messages::{Cmd, CmdOk, Request},
storage::{
try_deserialize_record, try_serialize_record, Chunk, RecordHeader, RecordKind, RecordType,
SpendAddress,
Expand All @@ -23,7 +27,7 @@ use sn_transfers::{
SignedSpend, Transfer, UniquePubkey, WalletError, NETWORK_ROYALTIES_PK,
};
use std::collections::BTreeSet;
use tokio::task::JoinSet;
use tokio::{spawn, task::JoinSet};
use xor_name::XorName;

impl Node {
Expand All @@ -38,6 +42,7 @@ impl Node {
let already_exists = self
.validate_key_and_existence(&chunk.network_address(), &record_key)
.await?;
let store_cost = payment.quote.cost;

// Validate the payment and that we received what we asked.
// This stores any payments to disk
Expand All @@ -56,7 +61,15 @@ impl Node {
}

// Finally before we store, lets bail for any payment issues
payment_res?;
let spend_address = payment_res?;

// The storage payment notification will only be sent out on the first non-existing
let royalties_fee = calculate_royalties_fee(store_cost);
self.send_storage_payment_notification(
NetworkAddress::from_spend_address(spend_address),
royalties_fee,
store_cost,
);

// Writing chunk to disk takes time, hence try to execute it first.
// So that when the replicate target asking for the copy,
Expand Down Expand Up @@ -219,6 +232,39 @@ impl Node {
}
}

fn send_storage_payment_notification(
&self,
spend_addr: NetworkAddress,
royalties_fee: NanoTokens,
store_cost: NanoTokens,
) {
let request = Request::Cmd(Cmd::StoragePaymentReceived {
spend_addr,
owner: self.owner.clone(),
royalty: royalties_fee.as_nano(),
store_cost: store_cost.as_nano(),
});
let network = self.network.clone();

// TODO: shall use a proper `collector`?

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
if let Some(target_peer) = self.get_first_initial_peer() {
let _handle = spawn(async move {
network.send_req_ignore_reply(request, target_peer);
});
}
}

fn get_first_initial_peer(&self) -> Option<PeerId> {
if let Some(first_multi_addr) = self.initial_peers.first() {
for protocol in first_multi_addr.iter() {
if let Protocol::P2p(peer_id) = protocol {
return Some(peer_id);
}
}
}
None
}

/// Check key is valid compared to the network name, and if we already have this data or not.
/// returns true if data already exists locally
async fn validate_key_and_existence(
Expand Down Expand Up @@ -487,7 +533,7 @@ impl Node {
&self,
address: &NetworkAddress,
payment: Payment,
) -> Result<()> {
) -> Result<SpendAddress> {
let key = address.to_record_key();
let pretty_key = PrettyPrintRecordKey::from(&key).into_owned();
trace!("Validating record payment for {pretty_key}");
Expand Down Expand Up @@ -521,10 +567,14 @@ impl Node {
.reward_wallet_balance
.set(new_balance as i64);

if royalties_cash_notes_r.is_empty() {
// TODO: so far, it's assumed the store_cost and royalty are paid by single spend

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
// may need to cover the case that they are covered by different input spends
let parent_spend_address = if let Some(cnr) = royalties_cash_notes_r.first() {
cnr.parent_spend
} else {
warn!("No network royalties payment found for record {pretty_key}");
return Err(Error::NoNetworkRoyaltiesPayment(pretty_key.into_owned()));
}
};

// check if the quote is valid
let storecost = payment.quote.cost;
Expand All @@ -550,7 +600,7 @@ impl Node {
// vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
info!("Total payment of {received_fee:?} nanos accepted for record {pretty_key}");

Ok(())
Ok(parent_spend_address)
}

async fn register_validation(
Expand Down
8 changes: 8 additions & 0 deletions sn_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ impl NetworkAddress {
}
}

/// Try to return the represented `SpendAddress`.
pub fn as_spend_address(&self) -> Option<SpendAddress> {
match self {
NetworkAddress::SpendAddress(spend_address) => Some(*spend_address),
_ => None,
}
}

/// Return the convertable `RecordKey`.
pub fn to_record_key(&self) -> RecordKey {
match self {
Expand Down

0 comments on commit 56664e8

Please sign in to comment.