Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(indexer): fix events query #797

Open
wants to merge 9 commits into
base: development
Choose a base branch
from
60 changes: 13 additions & 47 deletions applications/tari_indexer/src/graphql/model/events.rs
Expand Up @@ -25,7 +25,7 @@ use std::{collections::BTreeMap, str::FromStr, sync::Arc};
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
use log::*;
use serde::{Deserialize, Serialize};
use tari_template_lib::{prelude::ComponentAddress, Hash};
use tari_template_lib::prelude::{ComponentAddress, ResourceAddress};
use tari_transaction::TransactionId;

use crate::substate_manager::SubstateManager;
Expand All @@ -35,19 +35,19 @@ const LOG_TARGET: &str = "tari::indexer::graphql::events";
#[derive(SimpleObject, Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Event {
pub component_address: Option<[u8; 32]>,
pub template_address: [u8; 32],
pub tx_hash: [u8; 32],
pub component_address: Option<String>,
pub template_address: String,
pub tx_hash: String,
pub topic: String,
pub payload: BTreeMap<String, String>,
}

impl Event {
fn from_engine_event(event: tari_engine_types::events::Event) -> Result<Self, anyhow::Error> {
Ok(Self {
component_address: event.component_address().map(|comp_addr| comp_addr.into_array()),
template_address: event.template_address().into_array(),
tx_hash: event.tx_hash().into_array(),
component_address: event.component_address().map(|comp_addr| comp_addr.to_string()),
template_address: event.template_address().to_string(),
tx_hash: event.tx_hash().to_string(),
topic: event.topic(),
payload: event.into_payload().into_iter().collect(),
})
Expand Down Expand Up @@ -91,6 +91,7 @@ impl EventQuery {
&self,
ctx: &Context<'_>,
component_address: String,
resource_address: Option<String>,
version: Option<u32>,
) -> Result<Vec<Event>, anyhow::Error> {
let version = version.unwrap_or_default();
Expand All @@ -100,51 +101,16 @@ impl EventQuery {
);
let substate_manager = ctx.data_unchecked::<Arc<SubstateManager>>();
let events = substate_manager
.scan_events_for_substate_from_network(ComponentAddress::from_str(&component_address)?, Some(version))
.scan_events_for_substate_from_network(
ComponentAddress::from_str(&component_address)?,
resource_address.map(|r| ResourceAddress::from_str(&r)).transpose()?,
Some(version),
)
.await?
.iter()
.map(|e| Event::from_engine_event(e.clone()))
.collect::<Result<Vec<Event>, anyhow::Error>>()?;

Ok(events)
}

pub async fn save_event(
&self,
ctx: &Context<'_>,
component_address: String,
template_address: String,
tx_hash: String,
topic: String,
payload: String,
version: u64,
) -> Result<Event, anyhow::Error> {
info!(
target: LOG_TARGET,
"Saving event for component_address = {}, tx_hash = {} and topic = {}", component_address, tx_hash, topic
);

let component_address = ComponentAddress::from_hex(&component_address)?;
let template_address = Hash::from_str(&template_address)?;
let tx_hash = TransactionId::from_hex(&tx_hash)?;

let payload = serde_json::from_str(&payload)?;
let substate_manager = ctx.data_unchecked::<Arc<SubstateManager>>();
substate_manager.save_event_to_db(
component_address,
template_address,
tx_hash,
topic.clone(),
&payload,
version,
)?;

Ok(Event {
component_address: Some(component_address.into_array()),
template_address: template_address.into_array(),
tx_hash: tx_hash.into_array(),
topic,
payload: payload.into_iter().collect(),
})
}
}
33 changes: 24 additions & 9 deletions applications/tari_indexer/src/substate_manager.rs
Expand Up @@ -23,7 +23,7 @@
use std::{collections::HashMap, convert::TryInto, str::FromStr, sync::Arc};

use anyhow::anyhow;
use log::info;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tari_common_types::types::FixedHash;
use tari_crypto::tari_utilities::message_format::MessageFormat;
Expand All @@ -40,7 +40,7 @@ use tari_indexer_lib::{
NonFungibleSubstate,
};
use tari_template_lib::{
models::TemplateAddress,
models::{ResourceAddress, TemplateAddress},
prelude::{ComponentAddress, Metadata},
};
use tari_transaction::TransactionId;
Expand Down Expand Up @@ -349,6 +349,7 @@ impl SubstateManager {
pub async fn scan_events_for_substate_from_network(
&self,
component_address: ComponentAddress,
resource_address: Option<ResourceAddress>,
version: Option<u32>,
) -> Result<Vec<Event>, anyhow::Error> {
let mut events = vec![];
Expand All @@ -364,7 +365,7 @@ impl SubstateManager {
let stored_events = match tx.get_all_events(&component_address) {
Ok(events) => events,
Err(e) => {
info!(
warn!(
target: LOG_TARGET,
"Failed to get all events for component_address = {}, version = {} with error = {}",
component_address,
Expand All @@ -388,7 +389,7 @@ impl SubstateManager {
}
let network_version_events = self
.substate_scanner
.get_events_for_component_and_version(component_address, v)
.get_events_for_component_and_version(component_address, v, false)
.await?;
events.extend(network_version_events);
}
Expand All @@ -397,32 +398,46 @@ impl SubstateManager {
let version = version.max(latest_version_in_db);

// check if there are newest events for this component address in the network
// Include all events for the transaction so that we can cache them for future queries
let network_events = self
.substate_scanner
.get_events_for_component(component_address, Some(version))
.get_events_for_component(component_address, Some(version), true)
.await?;

// stores the newest network events to the db
// because the same component address with different version
// can be processed in the same transaction, we need to avoid
// duplicates
for (version, event) in network_events {
for (version, event) in network_events.into_iter().filter(|(v, _e)| v > &latest_version_in_db) {
let template_address = event.template_address();
let tx_hash = TransactionId::new(event.tx_hash().into_array());
let topic = event.topic();
let payload = event.payload();
self.save_event_to_db(
component_address,
event.component_address().unwrap_or(component_address),
template_address,
tx_hash,
topic,
payload,
u64::from(version),
)?;
events.push(event);
if event.component_address() == Some(component_address) {
events.push(event);
}
// events.push(event);
}

Ok(events)
match resource_address {
None => Ok(events),
Some(res) => {
let s = res.to_string();
let events = events
.into_iter()
.filter(|e| e.payload().get("resource") == Some(&s))
.collect::<Vec<_>>();
Ok(events)
},
}
}

pub async fn scan_and_update_substates(&self) -> Result<usize, anyhow::Error> {
Expand Down
Expand Up @@ -73,15 +73,11 @@ impl TryFrom<EventData> for crate::graphql::model::events::Event {
type Error = anyhow::Error;

fn try_from(event_data: EventData) -> Result<Self, Self::Error> {
let component_address = event_data
.component_address
.map(|comp_addr| ComponentAddress::from_str(comp_addr.as_str()))
.transpose()?
.map(|comp_addr| comp_addr.into_array());
let component_address = event_data.component_address;

let template_address = Hash::from_hex(&event_data.template_address)?.into_array();
let template_address = event_data.template_address.clone();

let tx_hash = Hash::from_hex(&event_data.tx_hash)?.into_array();
let tx_hash = event_data.tx_hash.clone();

let payload = serde_json::from_str(event_data.payload.as_str())?;

Expand Down
Expand Up @@ -391,8 +391,8 @@ impl SubstateStoreReadTransaction for SqliteSubstateStoreReadTransaction<'_> {
version
);
let res = sql_query(
"SELECT component_address, template_address, tx_hash, topic, payload FROM events WHERE component_address \
= ? AND version = ?",
"SELECT template_address, tx_hash, topic, payload, version, component_address FROM events WHERE \
component_address = ? AND version = ?",
)
.bind::<Nullable<Text>, _>(Some(component_address.to_string()))
.bind::<Integer, _>(version as i32)
Expand All @@ -405,13 +405,15 @@ impl SubstateStoreReadTransaction for SqliteSubstateStoreReadTransaction<'_> {
}

fn get_all_events(&mut self, component_address: &ComponentAddress) -> Result<Vec<EventData>, StorageError> {
let res =
sql_query("SELECT component_address, tx_hash, topic, payload FROM events WHERE component_address = ?")
.bind::<Text, _>(component_address.to_string())
.get_results::<EventData>(self.connection())
.map_err(|e| StorageError::QueryError {
reason: format!("get_events_by_version: {}", e),
})?;
let res = sql_query(
"SELECT component_address, template_address, tx_hash, topic, payload, version FROM events WHERE \
component_address = ?",
)
.bind::<Text, _>(component_address.to_string())
.get_results::<EventData>(self.connection())
.map_err(|e| StorageError::QueryError {
reason: format!("get_events_by_version: {}", e),
})?;
Ok(res)
}
}
Expand Down
9 changes: 7 additions & 2 deletions dan_layer/indexer_lib/src/substate_scanner.rs
Expand Up @@ -398,6 +398,7 @@ where
&self,
component_address: ComponentAddress,
version: u32,
include_other_events_from_tx: bool,
) -> Result<Vec<Event>, IndexerError> {
let substate_address = SubstateId::Component(component_address);

Expand All @@ -411,7 +412,10 @@ where
// to the current component address
let component_tx_events = tx_events
.into_iter()
.filter(|e| e.component_address().is_some() && e.component_address().unwrap() == component_address)
.filter(|e| {
include_other_events_from_tx ||
(e.component_address().is_some() && e.component_address().unwrap() == component_address)
})
.collect::<Vec<Event>>();
Ok(component_tx_events)
},
Expand All @@ -425,13 +429,14 @@ where
&self,
component_address: ComponentAddress,
version: Option<u32>,
include_other_events_from_tx: bool,
) -> Result<Vec<(u32, Event)>, IndexerError> {
let mut events = vec![];
let mut version: u32 = version.unwrap_or_default();

loop {
match self
.get_events_for_component_and_version(component_address, version)
.get_events_for_component_and_version(component_address, version, include_other_events_from_tx)
.await
{
Ok(component_tx_events) => events.extend(
Expand Down
1 change: 1 addition & 0 deletions dan_layer/storage/src/consensus_models/block.rs
Expand Up @@ -134,6 +134,7 @@ impl Block {
block
}

#[allow(clippy::too_many_arguments)]
pub fn load(
id: BlockId,
network: Network,
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/indexer.rs
Expand Up @@ -126,7 +126,7 @@ impl IndexerProcess {
.unwrap_or_else(|e| panic!("Failed to save event via graphql client: {}", e));
let res = res.get("saveEvent").unwrap();

assert_eq!(res.component_address, Some([0u8; 32]));
assert_eq!(res.component_address, None);
}

pub fn get_jrpc_indexer_client(&self) -> IndexerJsonRpcClient {
Expand Down
11 changes: 5 additions & 6 deletions integration_tests/tests/steps/indexer.rs
Expand Up @@ -12,7 +12,6 @@ use integration_tests::{
TariWorld,
};
use libp2p::Multiaddr;
use tari_crypto::tari_utilities::hex::Hex;
use tari_indexer_client::types::AddPeerRequest;

#[when(expr = "indexer {word} connects to all other validators")]
Expand Down Expand Up @@ -90,21 +89,21 @@ async fn works_indexer_graphql(world: &mut TariWorld, indexer_name: String) {
// insert event mock data in the substate manager database
indexer.insert_event_mock_data().await;
let mut graphql_client = indexer.get_graphql_indexer_client().await;
let component_address = [0u8; 32];
let template_address = [0u8; 32];
let tx_hash = [0u8; 32];
let component_address = None;
let template_address = "00000000000000000000000000000000";
let tx_hash = "00000000000000000000000000000000";
let query = format!(
"{{ getEventsForTransaction(txHash: {:?}) {{ componentAddress, templateAddress, txHash, topic, payload }}
}}",
tx_hash.to_hex()
tx_hash
);
let res = graphql_client
.send_request::<HashMap<String, Vec<tari_indexer::graphql::model::events::Event>>>(&query, None, None)
.await
.expect("Failed to obtain getEventsForTransaction query result");
let res = res.get("getEventsForTransaction").unwrap();
assert_eq!(res.len(), 1);
assert_eq!(res[0].component_address, Some(component_address));
assert_eq!(res[0].component_address, component_address);
assert_eq!(res[0].template_address, template_address);
assert_eq!(res[0].tx_hash, tx_hash);
assert_eq!(res[0].topic, "my_event");
Expand Down