Skip to content

Commit

Permalink
feat(node): notify storage payment on receival
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed May 3, 2024
1 parent bc3cd63 commit abd67cf
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 15 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,14 @@ jobs:
if: always()
timeout-minutes: 1

- name: Check bootstrap_node directory
run: |
ls -l $BOOTSTRAP_NODE_DATA_PATH
mv $BOOTSTRAP_NODE_DATA_PATH/safenode.log ./bootstrap_node.log
continue-on-error: true
if: always()
timeout-minutes: 1

- name: Upload faucet log
uses: actions/upload-artifact@main
with:
Expand All @@ -455,3 +463,11 @@ jobs:
# path: spend_dag_and_statistics.txt
# continue-on-error: true
# if: always()

- name: Upload bootstrap_node log
uses: actions/upload-artifact@main
with:
name: memory_check_bootstrap_node_log
path: bootstrap_node.log
continue-on-error: true
if: always()
15 changes: 15 additions & 0 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,21 @@ jobs:
CLIENT_DATA_PATH: /home/runner/.local/share/safe/client
timeout-minutes: 10

- name: Verify spends statistics
shell: bash
timeout-minutes: 10
run: |
min_storage_payment_times="3000"
storage_payment_notification_times=$(rg "Current storage statistics is" $NODE_DATA_PATH/*/logs/* -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
if (( $(echo "$storage_payment_notification_times < $min_storage_payment_times" | bc -l) )); then
echo "Storage payment notification times below the threshold: $storage_payment_notification_times"
exit 1
fi
env:
NODE_DATA_PATH: /home/runner/.local/share/safe/node
if: always()

replication_bench_with_heavy_upload:
if: "!startsWith(github.event.head_commit.message, 'chore(release):')"
name: Replication bench with heavy upload
Expand Down
18 changes: 18 additions & 0 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ pub enum NetworkEvent {
peer_id: PeerId,
keys_to_verify: Vec<NetworkAddress>,
},
/// Notification from peer claiming received a storage payment
StoragePaymentNotification {
spend_addr: NetworkAddress,
owner: String,
royalty: u64,
store_cost: u64,
},
}

/// Terminate node for the following reason
Expand Down Expand Up @@ -222,6 +229,17 @@ impl Debug for NetworkEvent {
"NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
)
}
NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
} => {
write!(
f,
"NetworkEvent::StoragePaymentNotification({spend_addr:?} {owner} {store_cost} {royalty})"
)
}
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions sn_networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ impl SwarmDriver {
error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
}
}
Request::Cmd(sn_protocol::messages::Cmd::StoragePaymentReceived {
spend_addr,
owner,
royalty,
store_cost,
}) => {
let response = Response::Cmd(
sn_protocol::messages::CmdResponse::StoragePaymentReceived(Ok(())),
);
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, response)
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;

self.send_event(NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
})
}
Request::Query(query) => {
self.send_event(NetworkEvent::QueryRequestReceived {
query,
Expand Down
65 changes: 64 additions & 1 deletion sn_networking/src/spends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

use crate::{Network, NetworkError, Result};
use futures::future::join_all;
use sn_transfers::{is_genesis_spend, SignedSpend, SpendAddress, TransferError};
use sn_transfers::{
is_genesis_spend, SignedSpend, SpendAddress, TransferError, CASHNOTE_PURPOSE_OF_CHANGE,
CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES,
};
use std::{collections::BTreeSet, iter::Iterator};

impl Network {
Expand Down Expand Up @@ -54,4 +57,64 @@ impl Network {

Ok(())
}

/// This function verifies a received notification of an owner's claim of storage payment
/// It fetches the correspondent spend, then carry out verification.
/// Once verified, the owner and amount will be reported to Node for further record.
pub async fn handle_storage_payment_notification(
&self,
spend_address: SpendAddress,
owner: String,
royalty: u64,
store_cost: u64,
) -> Option<(String, u64, u64)> {
let spend = match self.get_spend(spend_address).await {
Ok(spend) => spend,
Err(err) => {
error!(
"When verify storage payment notification, cannot get spend {spend_address:?} {err:?}"
);
return None;
}
};

let royalty_keyword = CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES.to_string();
let change_keyword = CASHNOTE_PURPOSE_OF_CHANGE.to_string();

// 1, The spend's outputs shall have equal number of royalty and store_cost payments
// 2, The claimed payment shall be within the spend's outputs
let num_of_royalties = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == royalty_keyword)
.count();
let num_of_change = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == change_keyword)
.count();
let num_of_store_cost = spend.spent_tx().outputs.len() - num_of_royalties - num_of_change;

let payments_match = num_of_store_cost == num_of_royalties;

let find_royalty = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == royalty_keyword && o.amount.as_nano() == royalty);
let find_store_cost = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == owner && o.amount.as_nano() == store_cost);

if payments_match && find_royalty && find_store_cost {
Some((owner, royalty, store_cost))
} else {
error!("Claimed storage payment of ({owner} {royalty} {store_cost}) cann't be verified by the spend {spend:?}");
None
}
}
}
43 changes: 42 additions & 1 deletion sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use sn_logging::metrics::init_metrics;
use sn_logging::{Level, LogFormat, LogOutputDest, ReloadHandle};
use sn_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver};
use sn_peers_acquisition::{get_peers_from_args, PeersArgs};
use sn_protocol::{node::get_safenode_root_dir, node_rpc::NodeCtrl};
use sn_protocol::{node::get_safenode_root_dir, node_rpc::NodeCtrl, storage::SpendAddress};
use sn_transfers::CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES;
use std::{
collections::{BTreeMap, BTreeSet},
env,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -335,6 +337,11 @@ You can check your reward balance by running:

fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Sender<NodeCtrl>) {
let _handle = tokio::spawn(async move {
let mut spends_statistics: BTreeMap<String, Vec<u64>> = Default::default();
let mut claimed_spends: BTreeMap<SpendAddress, BTreeSet<String>> = Default::default();

let royalty_reason = CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES.to_string();

loop {
match node_events_rx.recv().await {
Ok(NodeEvent::ConnectedToNetwork) => Marker::NodeConnectedToNetwork.log(),
Expand Down Expand Up @@ -366,6 +373,40 @@ fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Se
break;
}
}
Ok(NodeEvent::StoragePayments {
spend_address,
owner,
royalty,
store_cost,
}) => {
let holders = claimed_spends.entry(spend_address).or_default();
if !holders.insert(owner.clone()) {
warn!("Owner {owner} already claimed storage payment within spend {spend_address:?}");
continue;
}
{
let holders = spends_statistics.entry(royalty_reason.clone()).or_default();
holders.push(royalty);
}
{
let holders = spends_statistics.entry(owner.clone()).or_default();
holders.push(store_cost);
}
let statistics: Vec<_> = spends_statistics
.iter()
.map(|(owner, payments)| {
let total_amount: u64 = payments.iter().sum();
format!("{},{},{total_amount}\n", owner.clone(), payments.len())
})
.collect();

let mut content = "\nCurrent storage statistics is:".to_string();
content = format!("{content}\nOwner, Times, Amount\n");
for entry in statistics.iter() {
content = format!("{content}\n{entry}");
}
info!("{content}");
}
Ok(event) => {
/* we ignore other events */
trace!("Currently ignored node event {event:?}");
Expand Down
13 changes: 12 additions & 1 deletion sn_node/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use crate::error::{Error, Result};

use serde::{Deserialize, Serialize};
use sn_protocol::storage::{ChunkAddress, RegisterAddress};
use sn_protocol::storage::{ChunkAddress, RegisterAddress, SpendAddress};
use sn_transfers::UniquePubkey;
use tokio::sync::broadcast;

Expand Down Expand Up @@ -68,6 +68,17 @@ pub enum NodeEvent {
ChannelClosed,
/// Terminates the node
TerminateNode(String),
/// StoragePayment notification
StoragePayments {
/// Address of the spend that contains the claimed storage payment
spend_address: SpendAddress,
/// Owner of the node that received the storage payment
owner: String,
/// Royalty fee paid for the storage
royalty: u64,
/// Cost paid for the storage
store_cost: u64,
},
}

impl NodeEvent {
Expand Down
42 changes: 40 additions & 2 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ pub(crate) struct Node {
// We keep a copy of the Sender which is clonable and we can obtain a receiver from.
node_cmds: broadcast::Sender<NodeCmd>,
// Peers that are dialed at startup of node.
initial_peers: Arc<Vec<Multiaddr>>,
pub(crate) initial_peers: Arc<Vec<Multiaddr>>,
reward_address: Arc<MainPubkey>,
#[cfg(feature = "open-metrics")]
pub(crate) node_metrics: NodeMetrics,
/// node owner's discord username, in readable format
owner: String,
pub(crate) owner: String,
}

impl Node {
Expand Down Expand Up @@ -485,6 +485,44 @@ impl Node {
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
});
}
NetworkEvent::StoragePaymentNotification {
spend_addr,
owner,
royalty,
store_cost,
} => {
event_header = "StoragePaymentNotification";

let spend_address = if let Some(spend_address) = spend_addr.as_spend_address() {
spend_address
} else {
error!("When verify storage payment notification, cannot parse SpendAddress from {spend_addr:?}");
return;
};

let network = self.network.clone();
let events_channel = self.events_channel.clone();
info!("Received StoragePaymentNotification, notifying owner {owner:?} received \
{store_cost} tokens for store_cost and {royalty} tokens for royalty, with spend {spend_addr:?}");
let _handle = spawn(async move {
if let Some((owner, royalty, store_cost)) = network
.handle_storage_payment_notification(
spend_address,
owner,
royalty,
store_cost,
)
.await
{
events_channel.broadcast(NodeEvent::StoragePayments {
spend_address,
owner,
royalty,
store_cost,
});
}
});
}
}

trace!(
Expand Down

0 comments on commit abd67cf

Please sign in to comment.