Skip to content

Commit

Permalink
[refactor] hyperledger#4423: Sign all query parameters, implement que…
Browse files Browse the repository at this point in the history
…ry filters in wasm

Signed-off-by: Nikita Strygin <dcnick3@users.noreply.github.com>
  • Loading branch information
DCNick3 committed May 2, 2024
1 parent 85cfd97 commit 55a305e
Show file tree
Hide file tree
Showing 15 changed files with 395 additions and 228 deletions.
36 changes: 15 additions & 21 deletions client/src/client.rs
Expand Up @@ -361,11 +361,11 @@ pub struct Client {
}

/// Query request
#[derive(Debug, Clone, serde::Serialize)]
#[derive(Debug, Clone)]
pub struct QueryRequest {
torii_url: Url,
headers: HashMap<String, String>,
request: crate::data_model::query::QueryRequest<Vec<u8>>,
request: crate::data_model::query::QueryRequest<SignedQuery>,
}

impl QueryRequest {
Expand All @@ -377,12 +377,8 @@ impl QueryRequest {
torii_url: format!("http://{torii_url}").parse().unwrap(),
headers: HashMap::new(),
request: crate::data_model::query::QueryRequest::Query(
crate::data_model::query::QueryWithParameters {
query: Vec::default(),
sorting: Sorting::default(),
pagination: Pagination::default(),
fetch_size: FetchSize::default(),
},
ClientQueryBuilder::new(FindAllAccounts, "alice@wonderland".parse().unwrap())
.sign(&KeyPair::random()),
),
}
}
Expand All @@ -395,11 +391,9 @@ impl QueryRequest {
.headers(self.headers);

match self.request {
crate::data_model::query::QueryRequest::Query(query_with_params) => builder
.params(query_with_params.sorting().clone().into_query_parameters())
.params(query_with_params.pagination().into_query_parameters())
.params(query_with_params.fetch_size().into_query_parameters())
.body(query_with_params.query().clone()),
crate::data_model::query::QueryRequest::Query(signed_query) => {
builder.body(signed_query.encode())
}
crate::data_model::query::QueryRequest::Cursor(cursor) => {
builder.params(Vec::from(cursor))
}
Expand Down Expand Up @@ -490,7 +484,7 @@ impl Client {
///
/// # Errors
/// Fails if signature generation fails
pub fn sign_query(&self, query: QueryBuilder) -> SignedQuery {
pub fn sign_query(&self, query: ClientQueryBuilder) -> SignedQuery {
query.sign(&self.key_pair)
}

Expand Down Expand Up @@ -822,17 +816,17 @@ impl Client {
where
<R::Output as TryFrom<QueryOutputBox>>::Error: Into<eyre::Error>,
{
let query_builder = QueryBuilder::new(request, self.account_id.clone()).with_filter(filter);
let request = self.sign_query(query_builder).encode_versioned();
let query_builder = ClientQueryBuilder::new(request, self.account_id.clone())
.with_filter(filter)
.with_pagination(pagination)
.with_sorting(sorting)
.with_fetch_size(fetch_size);
let request = self.sign_query(query_builder);

let query_request = QueryRequest {
torii_url: self.torii_url.clone(),
headers: self.headers.clone(),
request: crate::data_model::query::QueryRequest::Query(
crate::data_model::query::QueryWithParameters::new(
request, sorting, pagination, fetch_size,
),
),
request: crate::data_model::query::QueryRequest::Query(request),
};

(
Expand Down
Binary file modified configs/swarm/executor.wasm 100644 → 100755
Binary file not shown.
90 changes: 19 additions & 71 deletions core/src/query/store.rs
@@ -1,31 +1,23 @@
//! This module contains [`LiveQueryStore`] actor.

use std::{
cmp::Ordering,
num::NonZeroU64,
time::{Duration, Instant},
};

use indexmap::IndexMap;
use iroha_config::parameters::actual::LiveQueryStore as Config;
use iroha_data_model::{
asset::AssetValue,
query::{
cursor::ForwardCursor, error::QueryExecutionFail, pagination::Pagination, sorting::Sorting,
FetchSize, QueryId, QueryOutputBox, DEFAULT_FETCH_SIZE, MAX_FETCH_SIZE,
},
BatchedResponse, BatchedResponseV1, HasMetadata, IdentifiableBox, ValidationFail,
query::{cursor::ForwardCursor, error::QueryExecutionFail, QueryId, QueryOutputBox},
BatchedResponse, BatchedResponseV1, ValidationFail,
};
use iroha_logger::trace;
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};

use super::{
cursor::{Batch as _, Batched, UnknownCursor},
pagination::Paginate as _,
};
use crate::smartcontracts::query::LazyQueryOutput;
use super::cursor::{Batched, UnknownCursor};
use crate::smartcontracts::query::ProcessedQueryOutput;

/// Query service error.
#[derive(Debug, thiserror::Error, Copy, Clone, Serialize, Deserialize, Encode, Decode)]
Expand Down Expand Up @@ -157,36 +149,26 @@ pub struct LiveQueryStoreHandle {
}

impl LiveQueryStoreHandle {
/// Apply sorting and pagination to the query output.
/// Construct a batched response from a post-processed query output.
///
/// # Errors
///
/// - Returns [`Error::ConnectionClosed`] if [`LiveQueryStore`] is dropped,
/// - Otherwise throws up query output handling errors.
pub fn handle_query_output(
&self,
query_output: LazyQueryOutput<'_>,
sorting: &Sorting,
pagination: Pagination,
fetch_size: FetchSize,
query_output: ProcessedQueryOutput,
) -> Result<BatchedResponse<QueryOutputBox>> {
match query_output {
LazyQueryOutput::QueryOutput(batch) => {
ProcessedQueryOutput::Single(batch) => {
let cursor = ForwardCursor::default();
let result = BatchedResponseV1 { batch, cursor };
Ok(result.into())
}
LazyQueryOutput::Iter(iter) => {
let fetch_size = fetch_size.fetch_size.unwrap_or(DEFAULT_FETCH_SIZE);
if fetch_size > MAX_FETCH_SIZE {
return Err(Error::FetchSizeTooBig);
}

let live_query = Self::apply_sorting_and_pagination(iter, sorting, pagination);
ProcessedQueryOutput::Iter(live_query) => {
let query_id = uuid::Uuid::new_v4().to_string();

let curr_cursor = Some(0);
let live_query = live_query.batched(fetch_size);
self.construct_query_response(query_id, curr_cursor, live_query)
}
}
Expand Down Expand Up @@ -260,57 +242,18 @@ impl LiveQueryStoreHandle {

Ok(query_response.into())
}

fn apply_sorting_and_pagination(
iter: impl Iterator<Item = QueryOutputBox>,
sorting: &Sorting,
pagination: Pagination,
) -> Vec<QueryOutputBox> {
if let Some(key) = &sorting.sort_by_metadata_key {
let mut pairs: Vec<(Option<QueryOutputBox>, QueryOutputBox)> = iter
.map(|value| {
let key = match &value {
QueryOutputBox::Identifiable(IdentifiableBox::Asset(asset)) => {
match asset.value() {
AssetValue::Store(store) => store.get(key).cloned().map(Into::into),
_ => None,
}
}
QueryOutputBox::Identifiable(v) => TryInto::<&dyn HasMetadata>::try_into(v)
.ok()
.and_then(|has_metadata| has_metadata.metadata().get(key))
.cloned()
.map(Into::into),
_ => None,
};
(key, value)
})
.collect();
pairs.sort_by(
|(left_key, _), (right_key, _)| match (left_key, right_key) {
(Some(l), Some(r)) => l.cmp(r),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
},
);
pairs
.into_iter()
.map(|(_, val)| val)
.paginate(pagination)
.collect()
} else {
iter.paginate(pagination).collect()
}
}
}

#[cfg(test)]
mod tests {
use iroha_data_model::metadata::MetadataValueBox;
use iroha_data_model::{
metadata::MetadataValueBox,
query::{predicate::PredicateBox, FetchSize, Pagination, Sorting},
};
use nonzero_ext::nonzero;

use super::*;
use crate::smartcontracts::query::LazyQueryOutput;

#[test]
fn query_message_order_preserved() {
Expand All @@ -319,6 +262,7 @@ mod tests {
let query_store_handle = threaded_rt.block_on(async { query_store.start() });

for i in 0..10_000 {
let filter = PredicateBox::default();
let pagination = Pagination::default();
let fetch_size = FetchSize {
fetch_size: Some(nonzero!(1_u32)),
Expand All @@ -331,8 +275,12 @@ mod tests {

let mut counter = 0;

let query_output = query_output
.apply_postprocessing(&filter, &sorting, pagination, fetch_size)
.unwrap();

let (batch, mut cursor) = query_store_handle
.handle_query_output(query_output, &sorting, pagination, fetch_size)
.handle_query_output(query_output)
.unwrap()
.into();
let QueryOutputBox::Vec(v) = batch else {
Expand Down
121 changes: 111 additions & 10 deletions core/src/smartcontracts/isi/query.rs
@@ -1,14 +1,23 @@
//! Query functionality. The common error type is also defined here,
//! alongside functions for converting them into HTTP responses.
use std::cmp::Ordering;

use eyre::Result;
use iroha_data_model::{
prelude::*,
query::{error::QueryExecutionFail as Error, QueryOutputBox},
query::{
error::QueryExecutionFail as Error, predicate::PredicateBox, Pagination, QueryOutputBox,
Sorting,
},
};
use parity_scale_codec::{Decode, Encode};

use crate::{
prelude::ValidQuery,
query::{
cursor::{Batch as _, Batched},
pagination::Paginate as _,
},
state::{StateReadOnly, WorldReadOnly},
};

Expand All @@ -26,6 +35,97 @@ pub enum LazyQueryOutput<'a> {
Iter(Box<dyn Iterator<Item = QueryOutputBox> + 'a>),
}

impl LazyQueryOutput<'_> {
/// If the underlying output is an iterator, apply all the query postprocessing:
/// - filtering
/// - sorting
/// - pagination
/// - batching
pub fn apply_postprocessing(
self,
filter: &PredicateBox,
sorting: &Sorting,
pagination: Pagination,
fetch_size: FetchSize,
) -> Result<ProcessedQueryOutput, Error> {
match self {
// nothing applies to the singular results
LazyQueryOutput::QueryOutput(output) => Ok(ProcessedQueryOutput::Single(output)),
LazyQueryOutput::Iter(iter) => {
// filter the results
let iter = iter.filter(move |v| filter.applies(v));

// sort & paginate
let output = match &sorting.sort_by_metadata_key {
Some(key) => {
// if sorting was requested, we need to retrieve all the results first
let mut pairs: Vec<(Option<QueryOutputBox>, QueryOutputBox)> = iter
.map(|value| {
let key = match &value {
QueryOutputBox::Identifiable(IdentifiableBox::Asset(asset)) => {
match asset.value() {
AssetValue::Store(store) => {
store.get(key).cloned().map(Into::into)
}
_ => None,
}
}
QueryOutputBox::Identifiable(v) => {
TryInto::<&dyn HasMetadata>::try_into(v)
.ok()
.and_then(|has_metadata| {
has_metadata.metadata().get(key)
})
.cloned()
.map(Into::into)
}
_ => None,
};
(key, value)
})
.collect();
pairs.sort_by(|(left_key, _), (right_key, _)| {
match (left_key, right_key) {
(Some(l), Some(r)) => l.cmp(r),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
}
});
pairs
.into_iter()
.map(|(_, val)| val)
.paginate(pagination)
.collect::<Vec<_>>()
}
// no sorting, can just paginate the results without constructing the full output vec
None => iter.paginate(pagination).collect::<Vec<_>>(),
};

let fetch_size = fetch_size
.fetch_size
.unwrap_or(iroha_data_model::query::DEFAULT_FETCH_SIZE);
if fetch_size > iroha_data_model::query::MAX_FETCH_SIZE {
return Err(Error::FetchSizeTooBig);
}

// split the results into batches of fetch_size
Ok(ProcessedQueryOutput::Iter(output.batched(fetch_size)))
}
}
}
}

/// An evaluated & post-processed query output that is ready to be sent to the live query store
///
/// It has all the parameters (filtering, sorting, pagination and batching) applied already
pub enum ProcessedQueryOutput {
/// A single query output
Single(QueryOutputBox),
/// An iterable query result, batched into fetch_size-sized chunks
Iter(Batched<Vec<QueryOutputBox>>),
}

impl Lazy for QueryOutputBox {
type Lazy<'a> = LazyQueryOutput<'a>;
}
Expand Down Expand Up @@ -96,17 +196,18 @@ impl ValidQueryRequest {
///
/// # Errors
/// Forwards `self.query.execute` error.
pub fn execute<'state>(
pub fn execute_and_process<'state>(
&'state self,
state_ro: &'state impl StateReadOnly,
) -> Result<LazyQueryOutput<'state>, Error> {
let output = self.0.query().execute(state_ro)?;

Ok(if let LazyQueryOutput::Iter(iter) = output {
LazyQueryOutput::Iter(Box::new(iter.filter(|val| self.0.filter().applies(val))))
} else {
output
})
) -> Result<ProcessedQueryOutput, Error> {
let query = &self.0;

query.query().execute(state_ro)?.apply_postprocessing(
query.filter(),
query.sorting(),
query.pagination(),
query.fetch_size(),
)

// We're not handling the LimitedMetadata case, because
// the predicate when applied to it is ambiguous. We could
Expand Down

0 comments on commit 55a305e

Please sign in to comment.