Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace anyhow with thiserror #115

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
683 changes: 345 additions & 338 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions banyan-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ serde = { version = "1.0.133", features = ["derive", "rc"] }
serde_json = "1.0.74"
smol_str = { version = "0.1.21", features = ["serde"] }
structopt = "0.3.25"
thiserror = "1"
tokio = { version = "1.15.0", features = ["full"] }
tracing = "0.1.29"
tracing-subscriber = "0.3.5"
Expand Down
4 changes: 4 additions & 0 deletions banyan-utils/examples/filter_tiny.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ impl<S> OpsCountingStore<S> {
}

impl<L, S: ReadOnlyStore<L>> ReadOnlyStore<L> for OpsCountingStore<S> {
type Error = anyhow::Error;

fn get(&self, link: &L) -> anyhow::Result<Box<[u8]>> {
self.reads.fetch_add(1, Ordering::SeqCst);
self.inner.get(link)
}
}

impl<L, S: BlockWriter<L> + Send + Sync> BlockWriter<L> for OpsCountingStore<S> {
type Error = anyhow::Error;

fn put(&mut self, data: Vec<u8>) -> anyhow::Result<L> {
self.writes.fetch_add(1, Ordering::SeqCst);
self.inner.put(data)
Expand Down
8 changes: 6 additions & 2 deletions banyan-utils/src/bin/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ enum Storage {
Sqlite(SqliteStore<DefaultParams>),
}
impl ReadOnlyStore<Sha256Digest> for Storage {
fn get(&self, link: &Sha256Digest) -> Result<Box<[u8]>> {
type Error = Error;

fn get(&self, link: &Sha256Digest) -> Result<Box<[u8]>, Self::Error> {
match self {
Self::Memory(m) => m.get(link),
Storage::Ipfs(i) => i.get(link),
Expand All @@ -47,7 +49,9 @@ impl ReadOnlyStore<Sha256Digest> for Storage {
}

impl BlockWriter<Sha256Digest> for Storage {
fn put(&mut self, data: Vec<u8>) -> Result<Sha256Digest> {
type Error = Error;

fn put(&mut self, data: Vec<u8>) -> Result<Sha256Digest, Self::Error> {
match self {
Self::Memory(m) => m.put(data),
Storage::Ipfs(i) => i.put(data),
Expand Down
14 changes: 14 additions & 0 deletions banyan-utils/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Banyan(#[from] banyan::error::Error),

#[error(transparent)]
Reqwest(#[from] reqwest::Error),

#[error("unsupported codec {}", .0)]
UnsupportedCodec(u64),

#[error("join error")]
JoinError,
}
26 changes: 15 additions & 11 deletions banyan-utils/src/ipfs.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! helper methods to work with ipfs/ipld
use anyhow::{anyhow, Result};
use banyan::store::{BlockWriter, ReadOnlyStore};
use futures::prelude::*;
use libipld::Cid;
use serde::{de::IgnoredAny, de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use std::{convert::TryInto, fmt, str::FromStr};

use crate::tags::Sha256Digest;
use crate::error::Error;

pub fn block_get(key: &Cid) -> Result<Box<[u8]>> {
pub fn block_get(key: &Cid) -> Result<Box<[u8]>, Error> {
let url = reqwest::Url::parse_with_params(
"http://localhost:5001/api/v0/block/get",
&[("arg", format!("{}", key))],
Expand Down Expand Up @@ -71,7 +71,7 @@ struct IpfsPubsubEventIo {
_topic_ids: IgnoredAny,
}

pub fn pubsub_sub(topic: &str) -> Result<impl Stream<Item = reqwest::Result<Vec<u8>>>> {
pub fn pubsub_sub(topic: &str) -> Result<impl Stream<Item = reqwest::Result<Vec<u8>>>, Error> {
let url = reqwest::Url::parse_with_params(
"http://localhost:5001/api/v0/pubsub/sub",
&[("arg", topic)],
Expand All @@ -94,7 +94,7 @@ pub fn pubsub_sub(topic: &str) -> Result<impl Stream<Item = reqwest::Result<Vec<
Ok(data)
}

pub async fn pubsub_pub(topic: &str, data: &[u8]) -> Result<()> {
pub async fn pubsub_pub(topic: &str, data: &[u8]) -> Result<(), Error> {
use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
let topic = percent_encode(topic.as_bytes(), NON_ALPHANUMERIC).to_string();
let data = percent_encode(data, NON_ALPHANUMERIC).to_string();
Expand All @@ -107,16 +107,16 @@ pub async fn pubsub_pub(topic: &str, data: &[u8]) -> Result<()> {
Ok(())
}

fn format_codec(codec: u64) -> Result<&'static str> {
fn format_codec(codec: u64) -> Result<&'static str, Error> {
match codec {
0x71 => Ok("cbor"),
0x70 => Ok("protobuf"),
0x55 => Ok("raw"),
_ => Err(anyhow!("unsupported codec {}", codec)),
_ => Err(Error::UnsupportedCodec(codec)),
}
}

pub fn block_put(data: &[u8], codec: u64, pin: bool) -> Result<Cid> {
pub fn block_put(data: &[u8], codec: u64, pin: bool) -> Result<Cid, Error> {
let url = reqwest::Url::parse_with_params(
"http://localhost:5001/api/v0/block/put",
&[("format", format_codec(codec)?), ("pin", &pin.to_string())],
Expand All @@ -135,19 +135,23 @@ pub fn block_put(data: &[u8], codec: u64, pin: bool) -> Result<Cid> {
pub struct IpfsStore;

impl ReadOnlyStore<Sha256Digest> for IpfsStore {
fn get(&self, link: &Sha256Digest) -> Result<Box<[u8]>> {
type Error = Error;

fn get(&self, link: &Sha256Digest) -> Result<Box<[u8]>, Error> {
let cid: Cid = (*link).into();
std::thread::spawn(move || crate::ipfs::block_get(&cid))
.join()
.map_err(|_| anyhow!("join error!"))?
.map_err(|_| Error::JoinError)?
}
}

impl BlockWriter<Sha256Digest> for IpfsStore {
fn put(&mut self, data: Vec<u8>) -> Result<Sha256Digest> {
type Error = Error;

fn put(&mut self, data: Vec<u8>) -> Result<Sha256Digest, Self::Error> {
let cid = std::thread::spawn(move || crate::ipfs::block_put(&data, 0x71, false))
.join()
.map_err(|_| anyhow!("join error!"))??;
.map_err(|_| Error::JoinError)??;
assert!(cid.hash().code() == 0x12);
assert!(cid.hash().digest().len() == 32);
cid.try_into()
Expand Down
1 change: 1 addition & 0 deletions banyan-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(clippy::upper_case_acronyms)]
pub mod dump;
pub mod error;
pub mod ipfs;
pub mod sqlite;
pub mod tag_index;
Expand Down
12 changes: 8 additions & 4 deletions banyan-utils/src/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! helper methods to work with ipfs/ipld
use anyhow::{anyhow, Result};
use banyan::store::{BlockWriter, ReadOnlyStore};
use ipfs_sqlite_block_store::BlockStore;
use libipld::{codec::References, store::StoreParams, Block, Cid, Ipld};
use parking_lot::Mutex;
use std::sync::Arc;

use crate::error::Error;
use crate::tags::Sha256Digest;

#[derive(Clone)]
Expand All @@ -21,13 +21,15 @@ impl<S: StoreParams> ReadOnlyStore<Sha256Digest> for SqliteStore<S>
where
Ipld: References<S::Codecs>,
{
fn get(&self, link: &Sha256Digest) -> Result<Box<[u8]>> {
type Error = Error;

fn get(&self, link: &Sha256Digest) -> Result<Box<[u8]>, Self::Error> {
let cid = Cid::from(*link);
let block = self.0.lock().get_block(&cid)?;
if let Some(block) = block {
Ok(block.into())
} else {
Err(anyhow!("block not found!"))
Err(Error::NotThere)
}
}
}
Expand All @@ -36,7 +38,9 @@ impl<S: StoreParams> BlockWriter<Sha256Digest> for SqliteStore<S>
where
Ipld: References<S::Codecs>,
{
fn put(&mut self, data: Vec<u8>) -> Result<Sha256Digest> {
type Error = Error;

fn put(&mut self, data: Vec<u8>) -> Result<Sha256Digest, Self::Error> {
let digest = Sha256Digest::new(&data);
let cid = digest.into();
let block = Block::new_unchecked(cid, data);
Expand Down
4 changes: 4 additions & 0 deletions banyan-utils/tests/ops_counting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ impl<S> OpsCountingStore<S> {
}

impl<L, S: ReadOnlyStore<L>> ReadOnlyStore<L> for OpsCountingStore<S> {
type Error = anyhow::Error;

fn get(&self, link: &L) -> anyhow::Result<Box<[u8]>> {
self.reads.fetch_add(1, Ordering::SeqCst);
self.inner.get(link)
}
}

impl<L, S: BlockWriter<L> + Send + Sync> BlockWriter<L> for OpsCountingStore<S> {
type Error = anyhow::Error;

fn put(&mut self, data: Vec<u8>) -> anyhow::Result<L> {
self.writes.fetch_add(1, Ordering::SeqCst);
self.inner.put(data)
Expand Down
2 changes: 1 addition & 1 deletion banyan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ metrics = ["prometheus", "lazy_static"]
default = ["metrics"]

[dependencies]
anyhow = "1.0.52"
cbor-data = "0.8.8"
chacha20 = "0.8.1"
cid = "0.8.6"
Expand All @@ -27,6 +26,7 @@ maplit = "1.0.2"
parking_lot = "0.12.1"
prometheus = { version = "0.13.0", optional = true }
smallvec = "1.7.0"
thiserror = "1.0.37"
tracing = "0.1.29"
weight-cache = "0.2.3"
# the only experimental feature we are using is ZSTD_decompressBound,
Expand Down
57 changes: 57 additions & 0 deletions banyan/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Tree must not be empty")]
TreeMustNotBeEmpty,

#[error("Index out of bounds: {}, length: {}", .tried, .length)]
IndexOutOfBounds { length: usize, tried: u64 },

#[error("must have more than 1 element when extending")]
MustHaveMoreThanOneElement,

#[error("Invalid: {}", .0)]
Invalid(&'static str),

#[error("Single item too large")]
ItemTooLarge,

#[error("Found purged data")]
PurgedDataFound,

#[error("Max size exceeded")]
MaxSizeExceeded,

#[error("Not there")]
NotThere,

#[error("Multiple strong references")]
MultipleStrongRef,

#[error("cypher stream seek offset wraparound")]
SeekOffsetWraparound,

#[error("expected ipld bytes")]
ExpectedIpldBytes,

#[error(transparent)]
Io(#[from] std::io::Error),

#[error(transparent)]
Ipld(#[from] libipld::error::Error),

#[error(transparent)]
CBorParse(#[from] cbor_data::ParseError),

#[error(transparent)]
CBorCodec(#[from] cbor_data::codec::CodecError),

#[error(transparent)]
Cid(#[from] cid::Error),

#[error(transparent)]
FromInt(#[from] std::num::TryFromIntError),

#[cfg(feature = "metrics")]
#[error(transparent)]
Prometheus(#[from] prometheus::Error),
}
4 changes: 2 additions & 2 deletions banyan/src/forest/index_iter.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::{Forest, Secrets, TreeTypes};
use crate::{
error::Error,
index::{CompactSeq, Index, NodeInfo},
query::Query,
store::ReadOnlyStore,
};
use anyhow::Result;
use smallvec::{smallvec, SmallVec};

#[derive(PartialEq)]
Expand Down Expand Up @@ -102,7 +102,7 @@ where
R: ReadOnlyStore<T::Link>,
Q: Query<T>,
{
type Item = Result<Index<T>>;
type Item = Result<Index<T>, Error>;

fn next(&mut self) -> Option<Self::Item> {
let res = loop {
Expand Down
29 changes: 23 additions & 6 deletions banyan/src/forest/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! creation and traversal of banyan trees
use super::index::*;
use crate::error::Error;
use crate::store::{BlockWriter, BranchCache, ReadOnlyStore};
use core::{fmt::Debug, hash::Hash, iter::FromIterator, ops::Range};
use libipld::cbor::DagCbor;
Expand Down Expand Up @@ -242,12 +243,28 @@ impl Config {
}
}

pub fn validate(&self) -> anyhow::Result<()> {
anyhow::ensure!(self.max_summary_branches > 1);
anyhow::ensure!(self.max_key_branches > 0);
anyhow::ensure!(self.target_leaf_size > 0 && self.target_leaf_size <= 1024 * 1024);
anyhow::ensure!(self.max_uncompressed_leaf_size <= 16 * 1024 * 1024);
anyhow::ensure!(self.zstd_level >= 1 && self.zstd_level <= 22);
pub fn validate(&self) -> Result<(), Error> {
if self.max_summary_branches <= 1 {
return Err(Error::Invalid("need at least 2 for max_summary_branches"));
}
if self.max_key_branches < 2 {
return Err(Error::Invalid("need at least 2 for max_key_branches"));
}
if self.target_leaf_size == 0 || self.target_leaf_size > 1024 * 1024 {
return Err(Error::Invalid(
"need at least one for target_leaf_size, but not more than 1048576",
));
}
if self.max_uncompressed_leaf_size > 16 * 1024 * 1024 {
return Err(Error::Invalid(
"Cannot have more than 16777216 for max_uncompressed_leaf_size",
));
}
if self.zstd_level < 1 || self.zstd_level > 22 {
return Err(Error::Invalid(
"need at least 1 for zstd_level, but not more than 22",
));
}
Ok(())
}
}
Expand Down
3 changes: 2 additions & 1 deletion banyan/src/forest/prom.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::error::Error;
use lazy_static::lazy_static;
use prometheus::{exponential_buckets, Histogram, HistogramOpts, Registry};

Expand Down Expand Up @@ -52,7 +53,7 @@ lazy_static! {
.unwrap();
}

pub(crate) fn register_metrics(registry: &Registry) -> anyhow::Result<()> {
pub(crate) fn register_metrics(registry: &Registry) -> Result<(), Error> {
registry.register(Box::new(LEAF_LOAD_HIST.clone()))?;
registry.register(Box::new(BRANCH_LOAD_HIST.clone()))?;
registry.register(Box::new(LEAF_STORE_HIST.clone()))?;
Expand Down