Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move startup and replication logic into a new server sub-crate #295

Merged
merged 66 commits into from Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
1d7d865
add new server sub-crate
haydnv Feb 7, 2024
368ae7e
stub out examples dir
haydnv Feb 7, 2024
7b216c9
test characterizing local network interfaces
haydnv Feb 8, 2024
5fa1e4e
test advertising an mDNS service
haydnv Feb 8, 2024
dd0743a
test mDNS peer discovery
haydnv Feb 8, 2024
f311f56
stub out a ServerBuilder struct
haydnv Feb 8, 2024
8c72574
move ServerBuilder into its own module
haydnv Feb 10, 2024
ed90f86
stub out Cluster and Txn structs
haydnv Feb 10, 2024
7e5102a
impl TxnServer
haydnv Feb 10, 2024
0f87d8a
implement ServerBuilder::build
haydnv Feb 10, 2024
7efb24d
move the garbage collection thread into the gateway module
haydnv Feb 10, 2024
fa972ab
move garbage collection logic into the new server module
haydnv Feb 10, 2024
25e02fa
temporarily delete the gateway module
haydnv Feb 12, 2024
d7cbe3e
impl Deserialize + Serialize for Claim
haydnv Feb 12, 2024
f5d453b
impl ReplicaHandler
haydnv Feb 13, 2024
0ddb55c
stub out impl RPCClient for Txn
haydnv Feb 13, 2024
3318254
impl Server::authorize_and_route
haydnv Feb 13, 2024
4009ee6
first draft Kernel::authorize_and_route
haydnv Feb 13, 2024
5e401e5
stub out Txn claim methods
haydnv Feb 13, 2024
01c1db7
impl Txn::grant
haydnv Feb 14, 2024
b4b1758
add a generic State type parameter to Kernel
haydnv Feb 14, 2024
93b5f2b
remove the gateway module and add a generic State type parameter to Txn
haydnv Feb 14, 2024
44cdfcf
rename RPCClient -> Gateway and add a remote-only RPCClient to server…
haydnv Feb 15, 2024
1325357
test starting a Server with a mock State
haydnv Feb 15, 2024
b441bb9
impl Hash for Value
haydnv Feb 15, 2024
4c0ed70
stub out a Cluster keyring type
haydnv Feb 15, 2024
bbebdb2
first draft of Txn::claim
haydnv Feb 15, 2024
11d779a
accept a Borrow<str> in Map::option, Map::require, and Map::remove
haydnv Feb 15, 2024
7ad4b17
test executing a request to /transact/hypothetical
haydnv Feb 15, 2024
a72f0f0
move CacheBlock into the state sub-crate
haydnv Feb 16, 2024
ae4a9c8
update the server sub-crate to depend on the state sub-crate
haydnv Feb 16, 2024
267aef7
add feature flags to the error crate
haydnv Feb 16, 2024
673d7b5
add chain and collection feature flags to the state sub-crate
haydnv Feb 16, 2024
ad88ff5
copy cluster::Dir
haydnv Feb 16, 2024
0da3bcc
copy cluster::Class
haydnv Feb 16, 2024
6be4f64
stub out a root /class endpoint
haydnv Feb 16, 2024
7962242
test creating a new cluster directory at runtime
haydnv Feb 17, 2024
f9ba16c
update inline documentation to include the sequence of events to boot…
haydnv Feb 17, 2024
39b16ec
implement a GET handler for cluster::Dir
haydnv Feb 20, 2024
f8a719e
only claim write transactions & correctly route cluster GET requests …
haydnv Feb 20, 2024
ec1dd22
impl RPCClient for Client & move mDNS service logic into Server::make…
haydnv Feb 20, 2024
1ea98bc
split Builder into two stages
haydnv Feb 20, 2024
c0d4f34
stub out Kernel::replicate_and_join
haydnv Feb 21, 2024
15d8a29
implement Kernel::replicate_and_join
haydnv Feb 21, 2024
3846380
require RPC clients to implement rjwt::Resolve
haydnv Feb 22, 2024
2716839
impl Gateway for Txn
haydnv Feb 22, 2024
3cd9a08
don't use localhost for networking since it does not support mDNS dis…
haydnv Feb 22, 2024
fd67015
impl rjwt::Resolve for Client
haydnv Feb 22, 2024
39bdc5e
remove impl fmt::Display for Value
haydnv Feb 23, 2024
f38f01d
implement transaction claim management for a distributed commit
haydnv Feb 23, 2024
4241cf6
implement Cluster::replicate_commit
haydnv Feb 27, 2024
946fd21
test adding a host to a replica set
haydnv Feb 27, 2024
a4f7c27
first draft of ReplicaSetHandler
haydnv Feb 27, 2024
fabb1e6
use a consistent txn id when fetching a public key
haydnv Feb 28, 2024
d1b6526
remove async_hash dependency for sub-crates which depend on the trans…
haydnv Feb 28, 2024
15064d4
validate the hash of a new cluster replica before allowing it to join…
haydnv Feb 29, 2024
6100135
add egress authorization & tracking to Client
haydnv Feb 29, 2024
dd33011
test adding a new host to a running cluster & verify that the cluster…
haydnv Mar 1, 2024
db2d39e
test adding a new class set version to a running cluster
haydnv Mar 1, 2024
9f8f66b
implement a versioned Library & its handlers
haydnv Mar 5, 2024
aa96a89
implement Cluster::replicate_rollback
haydnv Mar 5, 2024
0a2915a
update to the latest txfs
haydnv Mar 5, 2024
f8fb633
provide an improved error message if a duplicate directory entry is r…
haydnv Mar 5, 2024
47b731c
remove unused code
haydnv Mar 5, 2024
1d34cd8
update trait bounds in the chain and collection sub-crates, and add t…
haydnv Mar 6, 2024
3db6717
impl Service
haydnv Mar 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions host/Cargo.toml
Expand Up @@ -11,6 +11,19 @@ repository = "https://github.com/haydnv/tinychain.git"
keywords = ["distributed", "transactional", "host", "platform", "runtime"]
categories = ["concurrency", "database-implementations", "data-structures", "hardware-support", "web-programming"]

[workspace]
members = [
"chain",
"collection",
"error",
"generic",
"scalar",
"server",
"state",
"transact",
"value",
]

[[bin]]
name = "tinychain"
path = "src/main.rs"
Expand Down
3 changes: 1 addition & 2 deletions host/chain/Cargo.toml
Expand Up @@ -9,12 +9,11 @@ readme = "README.md"
repository = "https://github.com/haydnv/tinychain.git"

[dependencies]
async-hash = "0.5"
async-trait = "0.1"
bytes = "1.3"
destream = "0.7"
futures = "0.3"
freqfs = "0.9"
freqfs = "~0.9.1"
get-size = "0.1"
hex = "0.4"
log = { version = "0.4" }
Expand Down
13 changes: 9 additions & 4 deletions host/chain/src/block.rs
Expand Up @@ -5,10 +5,10 @@
use std::fmt;
use std::marker::PhantomData;

use async_hash::{Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::de;
use freqfs::FileSave;
use futures::future::TryFutureExt;
use futures::join;
use log::debug;
Expand All @@ -20,8 +20,9 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, Output, Sha256};
use tc_transact::public::{Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::{Map, ThreadSafe, Tuple};

Expand Down Expand Up @@ -203,7 +204,7 @@ where
State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
T: Send + Sync,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
self.history.hash(txn_id).await
}
}
Expand Down Expand Up @@ -271,7 +272,11 @@ where
impl<'en, State, T> IntoView<'en, State::FE> for BlockChain<State, State::Txn, State::FE, T>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<ChainBlock> + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<ChainBlock>
+ AsType<BTreeNode>
+ AsType<TensorNode>,
T: IntoView<'en, State::FE, Txn = State::Txn> + Send + Sync,
{
type Txn = State::Txn;
Expand Down
2 changes: 1 addition & 1 deletion host/chain/src/data/block.rs
@@ -1,7 +1,6 @@
use std::collections::btree_map::{BTreeMap, Entry};
use std::fmt;

use async_hash::{Digest, Hash, Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::{de, en};
Expand All @@ -11,6 +10,7 @@ use log::debug;

use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::hash::{Digest, Hash, Output, Sha256};
use tc_transact::TxnId;
use tc_value::Value;

Expand Down
26 changes: 16 additions & 10 deletions host/chain/src/data/history.rs
Expand Up @@ -3,12 +3,10 @@ use std::fmt;
use std::iter;
use std::marker::PhantomData;

use async_hash::generic_array::GenericArray;
use async_hash::{Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::{de, en};
use freqfs::{DirLock, DirWriteGuard, FileLock, FileReadGuard, FileReadGuardOwned, FileWriteGuard};
use freqfs::*;
use futures::stream::{self, StreamExt};
use futures::{try_join, TryFutureExt, TryStreamExt};
use get_size::GetSize;
Expand All @@ -21,9 +19,10 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, GenericArray, Output, Sha256};
use tc_transact::lock::{TxnLock, TxnTaskQueue};
use tc_transact::public::{Public, Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::{label, Label, Map, TCBoxStream, TCBoxTryStream, ThreadSafe, Tuple};

Expand Down Expand Up @@ -430,7 +429,7 @@ where
State: StateInstance,
State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a>,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
let latest_block_id = self.latest.read(txn_id).await?;
let latest_block = self.read_block(*latest_block_id).await?;

Expand Down Expand Up @@ -617,7 +616,11 @@ where
impl<'en, State> IntoView<'en, State::FE> for History<State, State::Txn, State::FE>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<ChainBlock> + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<ChainBlock>
+ AsType<BTreeNode>
+ AsType<TensorNode>,
{
type Txn = State::Txn;
type View = HistoryView<'en>;
Expand Down Expand Up @@ -812,9 +815,8 @@ async fn parse_block_state<State>(
block_data: Map<Tuple<State>>,
) -> TCResult<BTreeMap<TxnId, Vec<MutationRecord>>>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
State: StateInstance + From<Scalar>,
State::FE: for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Value: TryCastFrom<State>,
Expand Down Expand Up @@ -859,7 +861,11 @@ async fn replay_and_save<State, T>(
) -> TCResult<()>
where
State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
State::FE: DenseCacheFile + AsType<ChainBlock> + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<ChainBlock>
+ AsType<BTreeNode>
+ AsType<TensorNode>,
T: Route<State> + fmt::Debug,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Expand Down Expand Up @@ -908,7 +914,7 @@ async fn load_history<'en, State>(
) -> TCResult<MutationView<'en>>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode>,
{
match op {
MutationRecord::Delete(key) => Ok(MutationView::Delete(key)),
Expand Down
7 changes: 5 additions & 2 deletions host/chain/src/data/mod.rs
@@ -1,5 +1,6 @@
use std::fmt;

use freqfs::FileSave;
use futures::TryFutureExt;
use safecast::{AsType, TryCastFrom};

Expand All @@ -26,7 +27,8 @@ pub(super) async fn replay_all<State, T>(
) -> TCResult<()>
where
State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
State::FE:
for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
T: Route<State> + fmt::Debug,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Expand All @@ -49,7 +51,8 @@ async fn replay<State, T>(
) -> TCResult<()>
where
State: StateInstance + From<Collection<State::Txn, State::FE>> + From<Scalar>,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
State::FE:
for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
T: Route<State> + fmt::Debug,
Collection<State::Txn, State::FE>: TryCastFrom<State>,
Scalar: TryCastFrom<State>,
Expand Down
12 changes: 7 additions & 5 deletions host/chain/src/data/store.rs
Expand Up @@ -3,6 +3,7 @@ use std::marker::PhantomData;

use async_trait::async_trait;
use destream::en;
use freqfs::FileSave;
use futures::future::TryFutureExt;
use log::debug;
use safecast::*;
Expand All @@ -13,8 +14,9 @@ use tc_collection::{btree, Collection, CollectionBase, CollectionView, Schema};
use tc_error::*;
use tc_scalar::{OpRef, Scalar, TCRef};
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, Hash, Output, Sha256};
use tc_transact::public::StateInstance;
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::{Id, Instance, NativeClass, ThreadSafe};

Expand Down Expand Up @@ -75,12 +77,12 @@ where
FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
Txn: Transaction<FE>,
Collection<Txn, FE>: AsyncHash,
Scalar: async_hash::Hash<async_hash::Sha256>,
Scalar: Hash<Sha256>,
{
async fn hash(self, txn_id: TxnId) -> TCResult<async_hash::Output<async_hash::Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
match self {
StoreEntry::Collection(collection) => collection.clone().hash(txn_id).await,
StoreEntry::Scalar(scalar) => Ok(async_hash::Hash::<async_hash::Sha256>::hash(scalar)),
StoreEntry::Scalar(scalar) => Ok(Hash::<Sha256>::hash(scalar)),
}
}
}
Expand Down Expand Up @@ -156,7 +158,7 @@ impl<Txn, FE> Store<Txn, FE> {
impl<Txn, FE> Store<Txn, FE>
where
Txn: Transaction<FE>,
FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
FE: for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
BTreeNode: freqfs::FileLoad,
{
pub async fn resolve(&self, txn_id: TxnId, scalar: Scalar) -> TCResult<StoreEntry<Txn, FE>> {
Expand Down
11 changes: 6 additions & 5 deletions host/chain/src/lib.rs
Expand Up @@ -4,11 +4,10 @@ use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;

use async_hash::generic_array::GenericArray;
use async_hash::{Output, Sha256};
use async_trait::async_trait;
use bytes::Bytes;
use destream::{de, en};
use freqfs::FileSave;
use futures::future::TryFutureExt;
use safecast::{AsType, TryCastFrom};

Expand All @@ -18,9 +17,10 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, GenericArray, Output, Sha256};
use tc_transact::lock::TxnTaskQueue;
use tc_transact::public::{Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, Transact, Transaction, TxnId};
use tc_transact::{IntoView, Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::*;

Expand Down Expand Up @@ -190,7 +190,7 @@ where
State::FE: AsType<ChainBlock> + for<'a> fs::FileSave<'a> + ThreadSafe,
T: AsyncHash + Send + Sync,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
match self {
Self::Block(chain) => chain.hash(txn_id).await,
Self::Sync(chain) => chain.hash(txn_id).await,
Expand Down Expand Up @@ -550,7 +550,8 @@ fn new_queue<State>(
) -> TxnTaskQueue<MutationPending<State::Txn, State::FE>, TCResult<MutationRecord>>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
State::FE:
for<'a> FileSave<'a> + DenseCacheFile + AsType<BTreeNode> + AsType<TensorNode> + Clone,
{
TxnTaskQueue::new(Arc::pin(move |mutation| {
let store = store.clone();
Expand Down
14 changes: 9 additions & 5 deletions host/chain/src/sync.rs
Expand Up @@ -4,10 +4,9 @@
use std::fmt;
use std::marker::PhantomData;

use async_hash::{Output, Sha256};
use async_trait::async_trait;
use destream::{de, FromStream};
use freqfs::{FileLock, FileWriteGuard};
use freqfs::{FileLock, FileSave, FileWriteGuard};
use futures::TryFutureExt;
use get_size::GetSize;
use log::{debug, trace};
Expand All @@ -19,9 +18,10 @@ use tc_collection::Collection;
use tc_error::*;
use tc_scalar::Scalar;
use tc_transact::fs;
use tc_transact::hash::{AsyncHash, Output, Sha256};
use tc_transact::lock::TxnTaskQueue;
use tc_transact::public::{Route, StateInstance};
use tc_transact::{AsyncHash, IntoView, RPCClient, Transact, Transaction, TxnId};
use tc_transact::{Gateway, IntoView, Transact, Transaction, TxnId};
use tc_value::{Link, Value};
use tcgeneric::{label, Label};

Expand Down Expand Up @@ -145,7 +145,7 @@ where
State: StateInstance,
T: AsyncHash + Send + Sync,
{
async fn hash(self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
async fn hash(&self, txn_id: TxnId) -> TCResult<Output<Sha256>> {
self.subject.hash(txn_id).await
}
}
Expand Down Expand Up @@ -350,7 +350,11 @@ where
impl<State, T> de::FromStream for SyncChain<State, State::Txn, State::FE, T>
where
State: StateInstance,
State::FE: DenseCacheFile + AsType<BTreeNode> + AsType<ChainBlock> + AsType<TensorNode>,
State::FE: for<'a> FileSave<'a>
+ DenseCacheFile
+ AsType<BTreeNode>
+ AsType<ChainBlock>
+ AsType<TensorNode>,
T: FromStream<Context = State::Txn>,
{
type Context = State::Txn;
Expand Down
9 changes: 4 additions & 5 deletions host/collection/Cargo.toml
Expand Up @@ -15,15 +15,14 @@ categories = ["database", "database-implementations", "data-structures"]
opencl = ["ha-ndarray/opencl"]

[dependencies]
async-hash = "0.5"
async-trait = "0.1"
b-table = { version = "0.2", features = ["all"] }
b-tree = { version = "0.3", features = ["all"] }
collate = { version = "0.4", features = ["stream"] }
destream = "0.7"
ds-ext = "~0.1.3"
ds-ext = { path = "../../../ds-ext" }
futures = "0.3"
freqfs = { version = "0.9", features = ["logging"] }
freqfs = { version = "~0.9.1", features = ["logging"] }
ha-ndarray = { version = "0.3", features = ["freqfs", "stream"] }
itertools = "0.12"
log = { version = "0.4", features = ["release_max_level_info"] }
Expand All @@ -32,9 +31,9 @@ smallvec = "1.11"
pin-project = "1.1"
rayon = "1.8"
safecast = "0.2"
tc-error = { path = "../error" }
tc-error = { path = "../error", features = ["ha-ndarray"] }
tc-scalar = { path = "../scalar" }
tc-transact = { path = "../transact" }
tc-value = { path = "../value" }
tcgeneric = { path = "../generic" }
tokio = { version = "1.35", features = ["sync"] }
tokio = { version = "1.36", features = ["sync"] }