Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): notify storage payment on receival #1655

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,14 @@ jobs:
if: always()
timeout-minutes: 1

- name: Check bootstrap_node directory
run: |
ls -l $BOOTSTRAP_NODE_DATA_PATH
mv $BOOTSTRAP_NODE_DATA_PATH/safenode.log ./bootstrap_node.log
continue-on-error: true
if: always()
timeout-minutes: 1

- name: Upload faucet log
uses: actions/upload-artifact@main
with:
Expand All @@ -455,3 +463,11 @@ jobs:
# path: spend_dag_and_statistics.txt
# continue-on-error: true
# if: always()

- name: Upload bootstrap_node log
uses: actions/upload-artifact@main
with:
name: memory_check_bootstrap_node_log
path: bootstrap_node.log
continue-on-error: true
if: always()
15 changes: 15 additions & 0 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,21 @@ jobs:
CLIENT_DATA_PATH: /home/runner/.local/share/safe/client
timeout-minutes: 10

- name: Verify spends statistics
shell: bash
timeout-minutes: 10
run: |
min_storage_payment_times="3000"
storage_payment_notification_times=$(rg "Current storage statistics is" $NODE_DATA_PATH/*/logs/* -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
if (( $(echo "$storage_payment_notification_times < $min_storage_payment_times" | bc -l) )); then
echo "Storage payment notification times below the threshold: $storage_payment_notification_times"
exit 1
fi
env:
NODE_DATA_PATH: /home/runner/.local/share/safe/node
if: always()

replication_bench_with_heavy_upload:
if: "!startsWith(github.event.head_commit.message, 'chore(release):')"
name: Replication bench with heavy upload
Expand Down
21 changes: 21 additions & 0 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use libp2p::{

use sn_protocol::{
messages::{Query, Request, Response},
storage::ChunkAddress,
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::PaymentQuote;
Expand Down Expand Up @@ -143,6 +144,14 @@ pub enum NetworkEvent {
peer_id: PeerId,
keys_to_verify: Vec<NetworkAddress>,
},
/// Notification from peer claiming received a storage payment
StoragePaymentNotification {
chunk_addr: ChunkAddress,
spend_addr: NetworkAddress,
owner: String,
royalty: u64,
store_cost: u64,
},
}

/// Terminate node for the following reason
Expand Down Expand Up @@ -222,6 +231,18 @@ impl Debug for NetworkEvent {
"NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
)
}
NetworkEvent::StoragePaymentNotification {
chunk_addr,
spend_addr,
owner,
royalty,
store_cost,
} => {
write!(
f,
"NetworkEvent::StoragePaymentNotification({chunk_addr:?} {spend_addr:?} {owner} {store_cost} {royalty})"
)
}
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions sn_networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ impl SwarmDriver {
error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
}
}
Request::Cmd(sn_protocol::messages::Cmd::StoragePaymentReceived {
chunk_addr,
spend_addr,
owner,
royalty,
store_cost,
}) => {
let response = Response::Cmd(
sn_protocol::messages::CmdResponse::StoragePaymentReceived(Ok(())),
);
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, response)
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;

self.send_event(NetworkEvent::StoragePaymentNotification {
chunk_addr,
spend_addr,
owner,
royalty,
store_cost,
})
}
Request::Query(query) => {
self.send_event(NetworkEvent::QueryRequestReceived {
query,
Expand Down
65 changes: 64 additions & 1 deletion sn_networking/src/spends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

use crate::{Network, NetworkError, Result};
use futures::future::join_all;
use sn_transfers::{is_genesis_spend, SignedSpend, SpendAddress, TransferError};
use sn_transfers::{
is_genesis_spend, SignedSpend, SpendAddress, TransferError, CASHNOTE_PURPOSE_OF_CHANGE,
CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES,
};
use std::{collections::BTreeSet, iter::Iterator};

impl Network {
Expand Down Expand Up @@ -54,4 +57,64 @@ impl Network {

Ok(())
}

/// This function verifies a received notification of an owner's claim of storage payment
/// It fetches the correspondent spend, then carry out verification.
/// Once verified, the owner and amount will be reported to Node for further record.
pub async fn handle_storage_payment_notification(
&self,
spend_address: SpendAddress,
owner: String,
royalty: u64,
store_cost: u64,
) -> Option<(String, u64, u64)> {
let spend = match self.get_spend(spend_address).await {
Ok(spend) => spend,
Err(err) => {
error!(
"When verify storage payment notification, cannot get spend {spend_address:?} {err:?}"
);
return None;
}
};

let royalty_keyword = CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES.to_string();
let change_keyword = CASHNOTE_PURPOSE_OF_CHANGE.to_string();

// 1, The spend's outputs shall have equal number of royalty and store_cost payments
// 2, The claimed payment shall be within the spend's outputs
let num_of_royalties = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == royalty_keyword)
.count();
let num_of_change = spend
.spent_tx()
.outputs
.iter()
.filter(|o| o.purpose == change_keyword)
.count();
let num_of_store_cost = spend.spent_tx().outputs.len() - num_of_royalties - num_of_change;

let payments_match = num_of_store_cost == num_of_royalties;

let find_royalty = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == royalty_keyword && o.amount.as_nano() == royalty);
let find_store_cost = spend
.spent_tx()
.outputs
.iter()
.any(|o| o.purpose == owner && o.amount.as_nano() == store_cost);

if payments_match && find_royalty && find_store_cost {
Some((owner, royalty, store_cost))
} else {
error!("Claimed storage payment of ({owner} {royalty} {store_cost}) cann't be verified by the spend {spend:?}");
None
}
}
}
48 changes: 47 additions & 1 deletion sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ use sn_logging::metrics::init_metrics;
use sn_logging::{Level, LogFormat, LogOutputDest, ReloadHandle};
use sn_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver};
use sn_peers_acquisition::{get_peers_from_args, PeersArgs};
use sn_protocol::{node::get_safenode_root_dir, node_rpc::NodeCtrl};
use sn_protocol::{
node::get_safenode_root_dir,
node_rpc::NodeCtrl,
storage::{ChunkAddress, SpendAddress},
};
use sn_transfers::CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES;
use std::{
collections::{BTreeMap, BTreeSet},
env,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -335,6 +341,11 @@ You can check your reward balance by running:

fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Sender<NodeCtrl>) {
let _handle = tokio::spawn(async move {
let mut spends_statistics: BTreeMap<String, Vec<u64>> = Default::default();
let mut claimed_spends: BTreeMap<ChunkAddress, BTreeSet<SpendAddress>> = Default::default();

let royalty_reason = CASHNOTE_PURPOSE_OF_NETWORK_ROYALTIES.to_string();

loop {
match node_events_rx.recv().await {
Ok(NodeEvent::ConnectedToNetwork) => Marker::NodeConnectedToNetwork.log(),
Expand Down Expand Up @@ -366,6 +377,41 @@ fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Se
break;
}
}
Ok(NodeEvent::StoragePayments {
chunk_address,
spend_address,
owner,
royalty,
store_cost,
}) => {
let holders = claimed_spends.entry(chunk_address).or_default();
if !holders.insert(spend_address) {
warn!("Chunk {chunk_address:?} already claimed storage payment within spend {spend_address:?}");
continue;
}
{
let holders = spends_statistics.entry(royalty_reason.clone()).or_default();
holders.push(royalty);
}
{
let holders = spends_statistics.entry(owner.clone()).or_default();
holders.push(store_cost);
}
let statistics: Vec<_> = spends_statistics
.iter()
.map(|(owner, payments)| {
let total_amount: u64 = payments.iter().sum();
format!("{},{},{total_amount}\n", owner.clone(), payments.len())
})
.collect();

let mut content = "\nCurrent storage statistics is:".to_string();
content = format!("{content}\nOwner, Times, Amount\n");
for entry in statistics.iter() {
content = format!("{content}\n{entry}");
}
info!("{content}");
}
Ok(event) => {
/* we ignore other events */
trace!("Currently ignored node event {event:?}");
Expand Down
15 changes: 14 additions & 1 deletion sn_node/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use crate::error::{Error, Result};

use serde::{Deserialize, Serialize};
use sn_protocol::storage::{ChunkAddress, RegisterAddress};
use sn_protocol::storage::{ChunkAddress, RegisterAddress, SpendAddress};
use sn_transfers::UniquePubkey;
use tokio::sync::broadcast;

Expand Down Expand Up @@ -68,6 +68,19 @@ pub enum NodeEvent {
ChannelClosed,
/// Terminates the node
TerminateNode(String),
/// StoragePayment notification
StoragePayments {
/// Address of the chunk that the storage payment paid for
chunk_address: ChunkAddress,
/// Address of the spend that contains the claimed storage payment
spend_address: SpendAddress,
/// Owner of the node that received the storage payment
owner: String,
/// Royalty fee paid for the storage
royalty: u64,
/// Cost paid for the storage
store_cost: u64,
},
}

impl NodeEvent {
Expand Down
44 changes: 42 additions & 2 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ pub(crate) struct Node {
// We keep a copy of the Sender which is clonable and we can obtain a receiver from.
node_cmds: broadcast::Sender<NodeCmd>,
// Peers that are dialed at startup of node.
initial_peers: Arc<Vec<Multiaddr>>,
pub(crate) initial_peers: Arc<Vec<Multiaddr>>,
reward_address: Arc<MainPubkey>,
#[cfg(feature = "open-metrics")]
pub(crate) node_metrics: NodeMetrics,
/// node owner's discord username, in readable format
owner: String,
pub(crate) owner: String,
}

impl Node {
Expand Down Expand Up @@ -485,6 +485,46 @@ impl Node {
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
});
}
NetworkEvent::StoragePaymentNotification {
chunk_addr,
spend_addr,
owner,
royalty,
store_cost,
} => {
event_header = "StoragePaymentNotification";

let spend_address = if let Some(spend_address) = spend_addr.as_spend_address() {
spend_address
} else {
error!("When verify storage payment notification, cannot parse SpendAddress from {spend_addr:?}");
return;
};

let network = self.network.clone();
let events_channel = self.events_channel.clone();
info!("Received StoragePaymentNotification, notifying owner {owner:?} received \
{store_cost} tokens for store_cost and {royalty} tokens for royalty for chunk {chunk_addr:?}, with spend {spend_addr:?}");
let _handle = spawn(async move {
if let Some((owner, royalty, store_cost)) = network
.handle_storage_payment_notification(
spend_address,
owner,
royalty,
store_cost,
)
.await
{
events_channel.broadcast(NodeEvent::StoragePayments {
chunk_address: chunk_addr,
spend_address,
owner,
royalty,
store_cost,
});
}
});
}
}

trace!(
Expand Down