Skip to content

Commit

Permalink
feat(node): notify storage payment on receival
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed May 2, 2024
1 parent d884f9d commit 81483c4
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 16 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,14 @@ jobs:
if: always()
timeout-minutes: 1

- name: Check bootstrap_node directory
run: |
ls -l $BOOTSTRAP_NODE_DATA_PATH
mv $BOOTSTRAP_NODE_DATA_PATH/safenode.log ./bootstrap_node.log
continue-on-error: true
if: always()
timeout-minutes: 1

- name: Upload faucet log
uses: actions/upload-artifact@main
with:
Expand All @@ -455,3 +463,11 @@ jobs:
# path: spend_dag_and_statistics.txt
# continue-on-error: true
# if: always()

- name: Upload bootstrap_node log
uses: actions/upload-artifact@main
with:
name: memory_check_bootstrap_node_log
path: bootstrap_node.log
continue-on-error: true
if: always()
15 changes: 15 additions & 0 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,21 @@ jobs:
CLIENT_DATA_PATH: /home/runner/.local/share/safe/client
timeout-minutes: 10

- name: Verify spends statistics
shell: bash
timeout-minutes: 10
run: |
min_storage_payment_times="3000"
storage_payment_notification_times=$(rg "Current storage statistics is" $NODE_DATA_PATH/*/logs/* -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
if (( $(echo "$storage_payment_notification_times < $min_storage_payment_times" | bc -l) )); then
echo "Storage payment notification times below the threshold: $storage_payment_notification_times"
exit 1
fi
env:
NODE_DATA_PATH: /home/runner/.local/share/safe/node
if: always()

replication_bench_with_heavy_upload:
if: "!startsWith(github.event.head_commit.message, 'chore(release):')"
name: Replication bench with heavy upload
Expand Down
12 changes: 10 additions & 2 deletions sn_logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ fn current_exe_name() -> String {
std::path::Path::new(&arg).file_name().map(|s| {
let mut name = s.to_string_lossy().into_owned();
// remove sn_ prefix if present
name = name.strip_prefix("sn_").unwrap_or(&name).to_owned();
name = if let Some(name) = name.strip_prefix("sn_") {
name.to_string()
} else {
name.to_owned()
};

// remove .exe prefix on windows
if cfg!(windows) && name.to_lowercase().ends_with(".exe") {
name = name.strip_suffix(".exe").unwrap_or(&name).to_owned();
name = if let Some(name) = name.strip_suffix(".exe") {
name.to_string()
} else {
name.to_owned()
};
}

// if the name is safe, identify it is the client
Expand Down
18 changes: 18 additions & 0 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ pub enum NetworkEvent {
peer_id: PeerId,
keys_to_verify: Vec<NetworkAddress>,
},
/// Notification from peer claiming received a storage payment
StoragePaymentNotification {
spend_addr: NetworkAddress,
owner: String,
royalty: u64,
store_cost: u64,
},
}

/// Terminate node for the following reason
Expand Down Expand Up @@ -222,6 +229,17 @@ impl Debug for NetworkEvent {
"NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
)
}
NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
} => {
write!(
f,
"NetworkEvent::StoragePaymentNotification({spend_addr:?} {owner} {store_cost} {royalty})"
)
}
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions sn_networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ impl SwarmDriver {
error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
}
}
Request::Cmd(sn_protocol::messages::Cmd::StoragePaymentReceived {
spend_addr,
owner,
royalty,
store_cost,
}) => {
let response = Response::Cmd(
sn_protocol::messages::CmdResponse::StoragePaymentReceived(Ok(())),
);
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, response)
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;

self.send_event(NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
})
}
Request::Query(query) => {
self.send_event(NetworkEvent::QueryRequestReceived {
query,
Expand Down
73 changes: 72 additions & 1 deletion sn_networking/src/spends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

use crate::{Network, NetworkError, Result};
use futures::future::join_all;
use sn_transfers::{is_genesis_spend, SignedSpend, SpendAddress, TransferError};
use sn_protocol::NetworkAddress;
use sn_transfers::{
is_genesis_spend, SignedSpend, SpendAddress, TransferError, CASHNOTE_PURPOSE_OF_CHANGE,
CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES,
};
use std::{collections::BTreeSet, iter::Iterator};

impl Network {
Expand Down Expand Up @@ -54,4 +58,71 @@ impl Network {

Ok(())
}

/// This function verifies a received notification of an owner's claim of storage payment
/// It fetches the correspondent spend, then carry out verification.
/// Once verified, the owner and amount will be reported to Node for further record.
pub async fn handle_storage_payment_notification(
&self,
spend_addr: NetworkAddress,
owner: String,
royalty: u64,
store_cost: u64,
) -> Option<(String, u64, u64)> {
let spend_address = if let Some(spend_address) = spend_addr.as_spend_address() {
spend_address
} else {
error!("When verify storage payment notification, cannot parse SpendAddress from {spend_addr:?}");
return None;
};

let spend = match self.get_spend(spend_address).await {
Ok(spend) => spend,
Err(err) => {
error!(
"When verify storage payment notification, cannot get spend {spend_address:?} {err:?}"
);
return None;
}
};

let royalty_keyword = CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES.to_string();
let change_keyword = CASHNOTE_PURPOSE_OF_CHANGE.to_string();

// 1, The spend's outputs shall have equal number of royalty and store_cost payments
// 2, The claimed payment shall be within the spend's outputs
let num_of_royalties = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == royalty_keyword)
.count();
let num_of_change = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == change_keyword)
.count();
let num_of_store_cost = spend.spent_tx().outputs.len() - num_of_royalties - num_of_change;

let payments_match = num_of_store_cost == num_of_royalties;

let find_royalty = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == royalty_keyword && o.amount.as_nano() == royalty);
let find_store_cost = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == owner && o.amount.as_nano() == store_cost);

if payments_match && find_royalty && find_store_cost {
Some((owner, royalty, store_cost))
} else {
error!("Claimed storage payment of ({owner} {royalty} {store_cost}) cann't be verified by the spend {spend:?}");
None
}
}
}
28 changes: 28 additions & 0 deletions sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use sn_logging::{Level, LogFormat, LogOutputDest, ReloadHandle};
use sn_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver};
use sn_peers_acquisition::{get_peers_from_args, PeersArgs};
use sn_protocol::{node::get_safenode_root_dir, node_rpc::NodeCtrl};
use sn_transfers::CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES;
use std::{
collections::BTreeMap,
env,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -335,6 +337,10 @@ You can check your reward balance by running:

fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Sender<NodeCtrl>) {
let _handle = tokio::spawn(async move {
let mut spends_statistics: BTreeMap<String, Vec<u64>> = Default::default();

let royalty_reason = CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES.to_string();

loop {
match node_events_rx.recv().await {
Ok(NodeEvent::ConnectedToNetwork) => Marker::NodeConnectedToNetwork.log(),
Expand Down Expand Up @@ -366,6 +372,28 @@ fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Se
break;
}
}
Ok(NodeEvent::StoragePayments {
owner,
royalty,
store_cost,
}) => {
{
let holders = spends_statistics.entry(royalty_reason.clone()).or_default();
holders.push(royalty);
}
{
let holders = spends_statistics.entry(owner.clone()).or_default();
holders.push(store_cost);
}
let statistics: Vec<_> = spends_statistics
.iter()
.map(|(owner, payments)| {
let total_amount: u64 = payments.iter().sum();
format!("{},{},{total_amount}\n", owner.clone(), payments.len())
})
.collect();
info!("Current storage statistics is:\n{statistics:?}");
}
Ok(event) => {
/* we ignore other events */
trace!("Currently ignored node event {event:?}");
Expand Down
9 changes: 9 additions & 0 deletions sn_node/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ pub enum NodeEvent {
ChannelClosed,
/// Terminates the node
TerminateNode(String),
/// StoragePayment notification
StoragePayments {
/// Owner of the node that received the storage payment
owner: String,
/// Royalty fee paid for the storage
royalty: u64,
/// Cost paid for the storage
store_cost: u64,
},
}

impl NodeEvent {
Expand Down
28 changes: 26 additions & 2 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ pub(crate) struct Node {
// We keep a copy of the Sender which is clonable and we can obtain a receiver from.
node_cmds: broadcast::Sender<NodeCmd>,
// Peers that are dialed at startup of node.
initial_peers: Arc<Vec<Multiaddr>>,
pub(crate) initial_peers: Arc<Vec<Multiaddr>>,
reward_address: Arc<MainPubkey>,
#[cfg(feature = "open-metrics")]
pub(crate) node_metrics: NodeMetrics,
/// node owner's discord username, in readable format
owner: String,
pub(crate) owner: String,
}

impl Node {
Expand Down Expand Up @@ -485,6 +485,30 @@ impl Node {
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
});
}
NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
} => {
event_header = "StoragePaymentNotification";
let network = self.network.clone();
let events_channel = self.events_channel.clone();
info!("Received StoragePaymentNotification, notifying owner {owner:?} received \
{store_cost} tokens for store_cost and {royalty} tokens for royalty, with spend {spend_addr:?}");
let _handle = spawn(async move {
if let Some((owner, royalty, store_cost)) = network
.handle_storage_payment_notification(spend_addr, owner, royalty, store_cost)
.await
{
events_channel.broadcast(NodeEvent::StoragePayments {
owner,
royalty,
store_cost,
});
}
});
}
}

trace!(
Expand Down

0 comments on commit 81483c4

Please sign in to comment.