Skip to content

Commit

Permalink
move startup and replication logic into a new server sub-crate (#295)
Browse files Browse the repository at this point in the history
* add new server sub-crate

* stub out examples dir

* test characterizing local network interfaces

* test advertising an mDNS service

* test mDNS peer discovery

* stub out a ServerBuilder struct

* move ServerBuilder into its own module

* stub out Cluster and Txn structs

* impl TxnServer

* implement ServerBuilder::build

* move the garbage collection thread into the gateway module

* move garbage collection logic into the new server module

* temporarily delete the gateway module

* impl Deserialize + Serialize for Claim

* impl ReplicaHandler

* stub out impl RPCClient for Txn

* impl Server::authorize_and_route

* first draft Kernel::authorize_and_route

* stub out Txn claim methods

* impl Txn::grant

* add a generic State type parameter to Kernel

* remove the gateway module and add a generic State type parameter to Txn

* rename RPCClient -> Gateway and add a remote-only RPCClient to server::Builder

* test starting a Server with a mock State

* impl Hash for Value

* stub out a Cluster keyring type

* first draft of Txn::claim

* accept a Borrow<str> in Map::option, Map::require, and Map::remove

* test executing a request to /transact/hypothetical

* move CacheBlock into the state sub-crate

* update the server sub-crate to depend on the state sub-crate

* add feature flags to the error crate

* add chain and collection feature flags to the state sub-crate

* copy cluster::Dir

* copy cluster::Class

* stub out a root /class endpoint

* test creating a new cluster directory at runtime

* update inline documentation to include the sequence of events to bootstrap a server & fix documentation build warnings

* implement a GET handler for cluster::Dir

* only claim write transactions & correctly route cluster GET requests when a token is provided

* impl RPCClient for Client & move mDNS service logic into Server::make_discoverable

* split Builder into two stages

* stub out Kernel::replicate_and_join

* implement Kernel::replicate_and_join

* require RPC clients to implement rjwt::Resolve

* impl Gateway for Txn

* don't use localhost for networking since it does not support mDNS discovery

* impl rjwt::Resolve for Client

* remove impl fmt::Display for Value

* implement transaction claim management for a distributed commit

* implement Cluster::replicate_commit

* test adding a host to a replica set

* first draft of ReplicaSetHandler

* use a consistent txn id when fetching a public key

* remove async_hash dependency for sub-crates which depend on the transact sub-crate

* validate the hash of a new cluster replica before allowing it to join the replica set

* add egress authorization & tracking to Client

* test adding a new host to a running cluster & verify that the cluster state is correctly replicated

* test adding a new class set version to a running cluster

* implement a versioned Library & its handlers

* implement Cluster::replicate_rollback

* update to the latest txfs

* provide an improved error message if a duplicate directory entry is requested

* remove unused code

* update trait bounds in the chain and collection sub-crates, and add them as optional deps of the server sub-crate

* impl Service
  • Loading branch information
haydnv committed Mar 6, 2024
1 parent c579183 commit 101f275
Show file tree
Hide file tree
Showing 101 changed files with 6,884 additions and 1,172 deletions.
13 changes: 13 additions & 0 deletions host/Cargo.toml
Expand Up @@ -11,6 +11,19 @@ repository = "https://github.com/haydnv/tinychain.git"
keywords = ["distributed", "transactional", "host", "platform", "runtime"]
categories = ["concurrency", "database-implementations", "data-structures", "hardware-support", "web-programming"]

[workspace]
members = [
"chain",
"collection",
"error",
"generic",
"scalar",
"server",
"state",
"transact",
"value",
]

[[bin]]
name = "tinychain"
path = "src/main.rs"
Expand Down
3 changes: 1 addition & 2 deletions host/chain/Cargo.toml
Expand Up @@ -9,12 +9,11 @@ readme = "README.md"
repository = "https://github.com/haydnv/tinychain.git"

[dependencies]
async-hash = "0.5"
async-trait = "0.1"
bytes = "1.3"
destream = "0.7"
futures = "0.3"
freqfs = "0.9"
freqfs = "~0.9.1"
get-size = "0.1"
hex = "0.4"
log = { version = "0.4" }
Expand Down
13 changes: 9 additions & 4 deletions host/chain/src/block.rs
Expand Up @@ -5,10 +5,10 @@
use std::fmt;
use std::marker::PhantomData;

use async_hash::{Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::de;
use freqfs::FileSave;
use futures::future::TryFutureExt;
use futures::join;
use log::debug;
Expand All @@ -20,8 +20,9 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, Output, Sha256};
use tc_transact::public::{Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::{Map, ThreadSafe, Tuple};

Expand Down Expand Up @@ -203,7 +204,7 @@ where
State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
T: Send + Sync,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
self.history.hash(txn_id).await
}
}
Expand Down Expand Up @@ -271,7 +272,11 @@ where
impl<'en, State, T> IntoView<'en, State::FE> for BlockChain<State, State::Txn, State::FE, T>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<ChainBlock> + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<ChainBlock>
+ AsType<BTreeNode>
+ AsType<TensorNode>,
T: IntoView<'en, State::FE, Txn = State::Txn> + Send + Sync,
{
type Txn = State::Txn;
Expand Down
2 changes: 1 addition & 1 deletion host/chain/src/data/block.rs
@@ -1,7 +1,6 @@
use std::collections::btree_map::{BTreeMap, Entry};
use std::fmt;

use async_hash::{Digest, Hash, Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::{de, en};
Expand All @@ -11,6 +10,7 @@ use log::debug;

use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::hash::{Digest, Hash, Output, Sha256};
use tc_transact::TxnId;
use tc_value::Value;

Expand Down
26 changes: 16 additions & 10 deletions host/chain/src/data/history.rs
Expand Up @@ -3,12 +3,10 @@ use std::fmt;
use std::iter;
use std::marker::PhantomData;

use async_hash::generic_array::GenericArray;
use async_hash::{Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::{de, en};
use freqfs::{DirLock, DirWriteGuard, FileLock, FileReadGuard, FileReadGuardOwned, FileWriteGuard};
use freqfs::*;
use futures::stream::{self, StreamExt};
use futures::{try_join, TryFutureExt, TryStreamExt};
use get_size::GetSize;
Expand All @@ -21,9 +19,10 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, GenericArray, Output, Sha256};
use tc_transact::lock::{TxnLock, TxnTaskQueue};
use tc_transact::public::{Public, Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::{label, Label, Map, TCBoxStream, TCBoxTryStream, ThreadSafe, Tuple};

Expand Down Expand Up @@ -430,7 +429,7 @@ where
State: StateInstance,
State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
let latest_block_id = self.latest.read(txn_id).await?;
let latest_block = self.read_block(*latest_block_id).await?;

Expand Down Expand Up @@ -617,7 +616,11 @@ where
impl<'en, State> IntoView<'en, State::FE> for History<State, State::Txn, State::FE>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<ChainBlock> + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<ChainBlock>
+ AsType<BTreeNode>
+ AsType<TensorNode>,
{
type Txn = State::Txn;
type View = HistoryView<'en>;
Expand Down Expand Up @@ -812,9 +815,8 @@ async fn parse_block_state<State>(
block_data: Map<Tuple<State>>,
) -> TCResult<BTreeMap<TxnId, Vec<MutationRecord>>>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
State: StateInstance + From<Scalar>,
State::FE: for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Value: TryCastFrom<State>,
Expand Down Expand Up @@ -859,7 +861,11 @@ async fn replay_and_save<State, T>(
) -> TCResult<()>
where
State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
State::FE: DenseCacheFile + AsType<ChainBlock> + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<ChainBlock>
+ AsType<BTreeNode>
+ AsType<TensorNode>,
T: Route<State> + fmt::Debug,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Expand Down Expand Up @@ -908,7 +914,7 @@ async fn load_history<'en, State>(
) -> TCResult<MutationView<'en>>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
{
match op {
MutationRecord::Delete(key) => Ok(MutationView::Delete(key)),
Expand Down
7 changes: 5 additions & 2 deletions host/chain/src/data/mod.rs
@@ -1,5 +1,6 @@
use std::fmt;

use freqfs::FileSave;
use futures::TryFutureExt;
use safecast::{AsType, TryCastFrom};

Expand All @@ -26,7 +27,8 @@ pub(super) async fn replay_all<State, T>(
) -> TCResult<()>
where
State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
State::FE:
for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
T: Route<State> + fmt::Debug,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Expand All @@ -49,7 +51,8 @@ async fn replay<State, T>(
) -> TCResult<()>
where
State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
State::FE:
for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
T: Route<State> + fmt::Debug,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Expand Down
12 changes: 7 additions & 5 deletions host/chain/src/data/store.rs
Expand Up @@ -3,6 +3,7 @@ use std::marker::PhantomData;

use async_trait::async_trait;
use destream::en;
use freqfs::FileSave;
use futures::future::TryFutureExt;
use log::debug;
use safecast::*;
Expand All @@ -13,8 +14,9 @@ use tc_collection::{btree, Collection, CollectionBase, CollectionView, Schema};
use tc_error::*;
use tc_scalar::{OpRef, Scalar, TCRef};
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, Hash, Output, Sha256};
use tc_transact::public::StateInstance;
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::{Id, Instance, NativeClass, ThreadSafe};

Expand Down Expand Up @@ -75,12 +77,12 @@ where
FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
Txn: Transaction<FE>,
Collection<Txn, FE>: AsyncHash,
Scalar: async_hash::Hash<async_hash::Sha256>,
Scalar: Hash<Sha256>,
{
async fn hash(self, txn_id: TxnId) -> TCResult<async_hash::Output<async_hash::Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
match self {
StoreEntry::Collection(collection) => collection.clone().hash(txn_id).await,
StoreEntry::Scalar(scalar) => Ok(async_hash::Hash::<async_hash::Sha256>::hash(scalar)),
StoreEntry::Scalar(scalar) => Ok(Hash::<Sha256>::hash(scalar)),
}
}
}
Expand Down Expand Up @@ -156,7 +158,7 @@ impl<Txn, FE> Store<Txn, FE> {
impl<Txn, FE> Store<Txn, FE>
where
Txn: Transaction<FE>,
FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
FE: for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
BTreeNode: freqfs::FileLoad,
{
pub async fn resolve(&self, txn_id: TxnId, scalar: Scalar) -> TCResult<StoreEntry<Txn, FE>> {
Expand Down
11 changes: 6 additions & 5 deletions host/chain/src/lib.rs
Expand Up @@ -4,11 +4,10 @@ use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;

use async_hash::generic_array::GenericArray;
use async_hash::{Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::{de, en};
use freqfs::FileSave;
use futures::future::TryFutureExt;
use safecast::{AsType, TryCastFrom};

Expand All @@ -18,9 +17,10 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, GenericArray, Output, Sha256};
use tc_transact::lock::TxnTaskQueue;
use tc_transact::public::{Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::*;

Expand Down Expand Up @@ -190,7 +190,7 @@ where
State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a> + ThreadSafe,
T: AsyncHash + Send + Sync,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
match self {
Self::Block(chain) => chain.hash(txn_id).await,
Self::Sync(chain) => chain.hash(txn_id).await,
Expand Down Expand Up @@ -550,7 +550,8 @@ fn new_queue<State>(
) -> TxnTaskQueue<MutationPending<State::Txn, State::FE>, TCResult<MutationRecord>>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
State::FE:
for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
{
TxnTaskQueue::new(Arc::pin(move |mutation| {
let store = store.clone();
Expand Down
14 changes: 9 additions & 5 deletions host/chain/src/sync.rs
Expand Up @@ -4,10 +4,9 @@
use std::fmt;
use std::marker::PhantomData;

use async_hash::{Output, Sha256};
use async_trait::async_trait;
use destream::{de, FromStream};
use freqfs::{FileLock, FileWriteGuard};
use freqfs::{FileLock, FileSave, FileWriteGuard};
use futures::TryFutureExt;
use get_size::GetSize;
use log::{debug, trace};
Expand All @@ -19,9 +18,10 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, Output, Sha256};
use tc_transact::lock::TxnTaskQueue;
use tc_transact::public::{Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, RPCClient, Transact, Transaction, TxnId};
use tc_transact::{Gateway, IntoView, Transact, Transaction, TxnId};
use tc_value::{Link, Value};
use tcgeneric::{label, Label};

Expand Down Expand Up @@ -145,7 +145,7 @@ where
State: StateInstance,
T: AsyncHash + Send + Sync,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
self.subject.hash(txn_id).await
}
}
Expand Down Expand Up @@ -350,7 +350,11 @@ where
impl<State, T> de::FromStream for SyncChain<State, State::Txn, State::FE, T>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<ChainBlock> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<BTreeNode>
+ AsType<ChainBlock>
+ AsType<TensorNode>,
T: FromStream<Context = State::Txn>,
{
type Context = State::Txn;
Expand Down
9 changes: 4 additions & 5 deletions host/collection/Cargo.toml
Expand Up @@ -15,15 +15,14 @@ categories = ["database", "database-implementations", "data-structures"]
opencl = ["ha-ndarray/opencl"]

[dependencies]
async-hash = "0.5"
async-trait = "0.1"
b-table = { version = "0.2", features = ["all"] }
b-tree = { version = "0.3", features = ["all"] }
collate = { version = "0.4", features = ["stream"] }
destream = "0.7"
ds-ext = "~0.1.3"
ds-ext = { path = "../../../ds-ext" }
futures = "0.3"
freqfs = { version = "0.9", features = ["logging"] }
freqfs = { version = "~0.9.1", features = ["logging"] }
ha-ndarray = { version = "0.3", features = ["freqfs", "stream"] }
itertools = "0.12"
log = { version = "0.4", features = ["release_max_level_info"] }
Expand All @@ -32,9 +31,9 @@ smallvec = "1.11"
pin-project = "1.1"
rayon = "1.8"
safecast = "0.2"
tc-error = { path = "../error" }
tc-error = { path = "../error", features = ["ha-ndarray"] }
tc-scalar = { path = "../scalar" }
tc-transact = { path = "../transact" }
tc-value = { path = "../value" }
tcgeneric = { path = "../generic" }
tokio = { version = "1.35", features = ["sync"] }
tokio = { version = "1.36", features = ["sync"] }

0 comments on commit 101f275

Please sign in to comment.