Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More bugfixes & some suggested changes #11

Merged
merged 8 commits into from Mar 26, 2024
6 changes: 3 additions & 3 deletions src/delegation/agent.rs
@@ -1,8 +1,8 @@
use super::{payload::Payload, policy::Predicate, store::Store, Delegation};
use crate::ability::arguments::Named;
use crate::did;
use crate::{
ability::arguments::Named,
crypto::{signature::Envelope, varsig, Nonce},
did,
did::Did,
time::Timestamp,
};
Expand Down Expand Up @@ -94,7 +94,7 @@ where

let proofs = &self
.store
.get_chain(&self.did, &subject, "/".into(), vec![], now)
.get_chain(&self.did, &subject, &command, vec![], now)
.map_err(DelegateError::StoreError)?
.ok_or(DelegateError::ProofsNotFound)?;
let to_delegate = proofs.first().1.payload();
Expand Down
55 changes: 21 additions & 34 deletions src/delegation/store/memory.rs
Expand Up @@ -10,10 +10,11 @@ use libipld_core::codec::Encode;
use libipld_core::ipld::Ipld;
use libipld_core::{cid::Cid, codec::Codec};
use nonempty::NonEmpty;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::borrow::Cow;
use std::{
collections::{BTreeMap, BTreeSet},
convert::Infallible,
sync::{Arc, Mutex, MutexGuard},
};
use web_time::SystemTime;

Expand Down Expand Up @@ -79,7 +80,7 @@ pub struct MemoryStore<
V: varsig::Header<C> = varsig::header::Preset,
C: Codec + TryFrom<u64> + Into<u64> = varsig::encoding::Preset,
> {
inner: Arc<RwLock<MemoryStoreInner<DID, V, C>>>,
inner: Arc<Mutex<MemoryStoreInner<DID, V, C>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To account for starvation I guess?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, due to this conversation. I think RwLocks being able to cause deadlocks is... not great. Defaulting to Mutex seems like the better option 👍

}

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -101,25 +102,15 @@ impl<DID: did::Did + Ord, V: varsig::Header<C>, C: Codec + TryFrom<u64> + Into<u
}

pub fn len(&self) -> usize {
self.read().ucans.len()
self.lock().ucans.len()
}

pub fn is_empty(&self) -> bool {
self.read().ucans.is_empty() // FIXME account for revocations?
self.lock().ucans.is_empty() // FIXME account for revocations?
}

fn read(&self) -> RwLockReadGuard<'_, MemoryStoreInner<DID, V, C>> {
match self.inner.read() {
Ok(guard) => guard,
Err(poison) => {
// We ignore lock poisoning for simplicity
poison.into_inner()
}
}
}

fn write(&self) -> RwLockWriteGuard<'_, MemoryStoreInner<DID, V, C>> {
match self.inner.write() {
fn lock(&self) -> MutexGuard<'_, MemoryStoreInner<DID, V, C>> {
match self.inner.lock() {
Ok(guard) => guard,
Err(poison) => {
// We ignore lock poisoning for simplicity
Expand Down Expand Up @@ -169,7 +160,7 @@ where
cid: &Cid,
) -> Result<Option<Arc<Delegation<DID, V, Enc>>>, Self::DelegationStoreError> {
// cheap Arc clone
Ok(self.read().ucans.get(cid).cloned())
Ok(self.lock().ucans.get(cid).cloned())
// FIXME
}

Expand All @@ -178,44 +169,40 @@ where
cid: Cid,
delegation: Delegation<DID, V, Enc>,
) -> Result<(), Self::DelegationStoreError> {
let mut write_tx = self.write();
let mut tx = self.lock();

write_tx
.index
tx.index
.entry(delegation.subject().clone())
.or_default()
.entry(delegation.audience().clone())
.or_default()
.insert(cid);

write_tx.ucans.insert(cid.clone(), Arc::new(delegation));
tx.ucans.insert(cid.clone(), Arc::new(delegation));

Ok(())
}

fn revoke(&self, cid: Cid) -> Result<(), Self::DelegationStoreError> {
self.write().revocations.insert(cid);
self.lock().revocations.insert(cid);
Ok(())
}

fn get_chain(
&self,
aud: &DID,
subject: &DID,
command: String,
policy: Vec<Predicate>,
command: &str,
policy: Vec<Predicate>, // FIXME
now: SystemTime,
) -> Result<Option<NonEmpty<(Cid, Arc<Delegation<DID, V, Enc>>)>>, Self::DelegationStoreError>
{
let blank_set = BTreeSet::new();
let blank_map = BTreeMap::new();
let read_tx = self.read();
let tx = self.lock();

let all_powerlines = read_tx.index.get(&None).unwrap_or(&blank_map);
let all_aud_for_subject = read_tx
.index
.get(&Some(subject.clone()))
.unwrap_or(&blank_map);
let all_powerlines = tx.index.get(&None).unwrap_or(&blank_map);
let all_aud_for_subject = tx.index.get(&Some(subject.clone())).unwrap_or(&blank_map);
let powerline_candidates = all_powerlines.get(aud).unwrap_or(&blank_set);
let sub_candidates = all_aud_for_subject.get(aud).unwrap_or(&blank_set);

Expand All @@ -224,9 +211,9 @@ where
let mut hypothesis_chain = vec![];

let corrected_target_command = if command.ends_with('/') {
command
Cow::Borrowed(command)
} else {
format!("{}/", command)
Cow::Owned(format!("{command}/"))
};

'outer: loop {
Expand All @@ -238,11 +225,11 @@ where

'inner: for cid in parent_cid_candidates {
// CHECKS
if read_tx.revocations.contains(cid) {
if tx.revocations.contains(cid) {
continue;
}

if let Some(delegation) = read_tx.ucans.get(cid) {
if let Some(delegation) = tx.ucans.get(cid) {
if delegation.check_time(now).is_err() {
continue;
}
Expand Down
8 changes: 4 additions & 4 deletions src/delegation/store/traits.rs
Expand Up @@ -50,7 +50,7 @@ where
&self,
audience: &DID,
subject: &DID,
command: String,
command: &str,
policy: Vec<Predicate>,
now: SystemTime,
) -> Result<Option<NonEmpty<(Cid, Arc<Delegation<DID, V, C>>)>>, Self::DelegationStoreError>;
Expand All @@ -59,7 +59,7 @@ where
&self,
audience: &DID,
subject: &DID,
command: String,
command: &str,
policy: Vec<Predicate>,
now: SystemTime,
) -> Result<Option<NonEmpty<Cid>>, Self::DelegationStoreError> {
Expand All @@ -71,7 +71,7 @@ where
&self,
issuer: DID,
audience: &DID,
command: String,
command: &str,
policy: Vec<Predicate>,
now: SystemTime,
) -> Result<bool, Self::DelegationStoreError> {
Expand Down Expand Up @@ -125,7 +125,7 @@ where
&self,
audience: &DID,
subject: &DID,
command: String,
command: &str,
policy: Vec<Predicate>,
now: SystemTime,
) -> Result<Option<NonEmpty<(Cid, Arc<Delegation<DID, V, C>>)>>, Self::DelegationStoreError>
Expand Down
63 changes: 33 additions & 30 deletions src/invocation/agent.rs
Expand Up @@ -3,19 +3,21 @@ use super::{
store::Store,
Invocation,
};
use crate::ability::arguments::Named;
use crate::ability::command::ToCommand;
use crate::ability::parse::ParseAbility;
use crate::delegation::Delegation;
use crate::invocation::payload::PayloadBuilder;
use crate::{
ability::{self, arguments, parse::ParseAbilityError, ucan::revoke::Revoke},
ability::{
self, arguments,
arguments::Named,
command::ToCommand,
parse::{ParseAbility, ParseAbilityError},
ucan::revoke::Revoke,
},
crypto::{
signature::{self, Envelope},
varsig, Nonce,
},
delegation,
did::{self, Did},
invocation::payload::PayloadBuilder,
time::Timestamp,
};
use enum_as_inner::EnumAsInner;
Expand Down Expand Up @@ -102,16 +104,9 @@ where
vec![]
} else {
self.delegation_store
.get_chain(
&self.did,
&subject.clone(),
ability.to_command(),
vec![],
now,
)
.map_err(InvokeError::DelegationStoreError)?
.map(|chain| chain.map(|(cid, _)| cid).into())
.unwrap_or(vec![]) // FIXME
.get_chain_cids(&self.did, &subject, &ability.to_command(), vec![], now)? // FIXME policy
.ok_or(InvokeError::ProofsNotFound)?
.into()
};

let payload = Payload {
Expand Down Expand Up @@ -223,7 +218,7 @@ where
.map(|(d, cid)| {
Ok(&d
.as_ref()
.ok_or(ReceiveError::MissingDelegation(*cid))?
.ok_or(ReceiveError::DelegationNotFound(*cid))?
.payload)
})
.collect::<Result<_, ReceiveError<T, DID, D::DelegationStoreError, S, V, C>>>()?;
Expand Down Expand Up @@ -306,8 +301,8 @@ pub enum ReceiveError<
> where
<S as Store<T, DID, V, C>>::InvocationStoreError: fmt::Debug,
{
#[error("missing delegation: {0}")]
MissingDelegation(Cid),
#[error("couldn't find delegation: {0}")]
DelegationNotFound(Cid),

#[error("encoding error: {0}")]
EncodingError(#[from] libipld_core::error::Error),
Expand All @@ -328,7 +323,10 @@ pub enum ReceiveError<
#[derive(Debug, Error)]
pub enum InvokeError<D> {
#[error("delegation store error: {0}")]
DelegationStoreError(#[source] D),
DelegationStoreError(#[from] D),

#[error("The current agent does not have the necessary proofs to invoke.")]
ProofsNotFound,

#[error("store error: {0}")]
SignError(#[source] signature::SignError),
Expand All @@ -337,23 +335,28 @@ pub enum InvokeError<D> {
#[cfg(test)]
mod tests {
use super::*;
use crate::ability::crud::read::Read;
use crate::crypto::varsig;
use crate::crypto::varsig::encoding;
use crate::crypto::varsig::header;
use crate::invocation::{payload::ValidationError, Agent};
use crate::{
ability::{arguments::Named, command::Command},
crypto::signature::Envelope,
ability::{arguments::Named, command::Command, crud::read::Read},
crypto::{
signature::Envelope,
varsig,
varsig::{encoding, header},
},
delegation::store::Store,
invocation::promise::{CantResolve, Resolvable},
invocation::{
payload::ValidationError,
promise::{CantResolve, Resolvable},
Agent,
},
ipld,
};
use libipld_core::{cid::Cid, ipld::Ipld};
use pretty_assertions as pretty;
use rand::thread_rng;
use std::ops::{Add, Sub};
use std::time::{Duration, SystemTime};
use std::{
ops::{Add, Sub},
time::{Duration, SystemTime},
};
use testresult::TestResult;

#[derive(Debug, Clone, PartialEq)]
Expand Down
26 changes: 8 additions & 18 deletions src/invocation/store/memory.rs
@@ -1,7 +1,7 @@
use crate::{crypto::varsig, did::Did, invocation::Invocation};
use super::Store;
use crate::{crypto::varsig, did::Did, invocation::Invocation};
use libipld_core::{cid::Cid, codec::Codec};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::sync::{Arc, Mutex, MutexGuard};
use std::{collections::BTreeMap, convert::Infallible};

#[derive(Debug, Clone)]
Expand All @@ -11,7 +11,7 @@ pub struct MemoryStore<
V: varsig::Header<C> = varsig::header::Preset,
C: Codec + TryFrom<u64> + Into<u64> = varsig::encoding::Preset,
> {
inner: Arc<RwLock<MemoryStoreInner<T, DID, V, C>>>,
inner: Arc<Mutex<MemoryStoreInner<T, DID, V, C>>>,
}

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -27,18 +27,8 @@ pub struct MemoryStoreInner<
impl<T, DID: Did, V: varsig::Header<Enc>, Enc: Codec + Into<u64> + TryFrom<u64>>
MemoryStore<T, DID, V, Enc>
{
fn read(&self) -> RwLockReadGuard<'_, MemoryStoreInner<T, DID, V, Enc>> {
match self.inner.read() {
Ok(guard) => guard,
Err(poison) => {
// There's no logic errors through lock poisoning in our case
poison.into_inner()
}
}
}

fn write(&self) -> RwLockWriteGuard<'_, MemoryStoreInner<T, DID, V, Enc>> {
match self.inner.write() {
fn lock(&self) -> MutexGuard<'_, MemoryStoreInner<T, DID, V, Enc>> {
match self.inner.lock() {
Ok(guard) => guard,
Err(poison) => {
// There's no logic errors through lock poisoning in our case
Expand All @@ -53,7 +43,7 @@ impl<T, DID: Did, V: varsig::Header<Enc>, Enc: Codec + Into<u64> + TryFrom<u64>>
{
fn default() -> Self {
Self {
inner: Arc::new(RwLock::new(MemoryStoreInner {
inner: Arc::new(Mutex::new(MemoryStoreInner {
store: BTreeMap::new(),
})),
}
Expand All @@ -69,15 +59,15 @@ impl<T, DID: Did, V: varsig::Header<Enc>, Enc: Codec + Into<u64> + TryFrom<u64>>
&self,
cid: Cid,
) -> Result<Option<Arc<Invocation<T, DID, V, Enc>>>, Self::InvocationStoreError> {
Ok(self.read().store.get(&cid).cloned())
Ok(self.lock().store.get(&cid).cloned())
}

fn put(
&self,
cid: Cid,
invocation: Invocation<T, DID, V, Enc>,
) -> Result<(), Self::InvocationStoreError> {
self.write().store.insert(cid, Arc::new(invocation));
self.lock().store.insert(cid, Arc::new(invocation));
Ok(())
}
}