Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect dangling clients #1132

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 25 additions & 8 deletions crates/core/src/database_instance_context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::database_logger::DatabaseLogger;
use crate::db::relational_db::RelationalDB;
use crate::db::relational_db::{ConnectedClients, RelationalDB};
use crate::db::{Config, Storage};
use crate::error::DBError;
use crate::messages::control_db::Database;
Expand All @@ -11,6 +11,7 @@ use std::sync::Arc;

pub type Result<T> = anyhow::Result<T>;

/// A "live" database.
#[derive(Clone)]
pub struct DatabaseInstanceContext {
pub database: Database,
Expand All @@ -21,31 +22,47 @@ pub struct DatabaseInstanceContext {
}

impl DatabaseInstanceContext {
/// Construct a [`DatabaseInstanceContext`] from a [`Database`] and
/// additional configuration.
///
/// Alongside `Self`, the set of clients who were connected as of the most
/// recent transaction is returned as a [`ConnectedClients`]. If the value
/// is `Some`, the set is non-empty. `__disconnect__` should be called for
/// each entry.
pub fn from_database(
config: Config,
database: Database,
instance_id: u64,
root_db_path: PathBuf,
rt: tokio::runtime::Handle,
) -> Result<Self> {
) -> Result<(Self, Option<ConnectedClients>)> {
let mut db_path = root_db_path;
db_path.extend([&*database.address.to_hex(), &*instance_id.to_string()]);
db_path.push("database");

let log_path = DatabaseLogger::filepath(&database.address, instance_id);
let relational_db = Arc::new(match config.storage {
Storage::Memory => RelationalDB::open(db_path, database.address, None)?,
Storage::Disk => RelationalDB::local(db_path, rt, database.address)?,
});
let (relational_db, dangling_connections) = match config.storage {
Storage::Memory => {
let db = RelationalDB::open(db_path, database.address, None)?;
(Arc::new(db), None)
}
Storage::Disk => {
let (db, connected_clients) = RelationalDB::local(db_path, rt, database.address)?;
let connected_clients = (!connected_clients.is_empty()).then_some(connected_clients);
(Arc::new(db), connected_clients)
}
};
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), database.identity);

Ok(Self {
let dbic = Self {
database,
database_instance_id: instance_id,
logger: Arc::new(DatabaseLogger::open(log_path)),
subscriptions,
relational_db,
})
};

Ok((dbic, dangling_connections))
}

pub fn scheduler_db_path(&self, root_db_path: PathBuf) -> PathBuf {
Expand Down
57 changes: 55 additions & 2 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ use crate::{
use anyhow::{anyhow, Context};
use parking_lot::{Mutex, RwLock};
use spacetimedb_commitlog::payload::{txdata, Txdata};
use spacetimedb_lib::Identity;
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
use spacetimedb_sats::db::def::{IndexDef, SequenceDef, TableDef, TableSchema};
use spacetimedb_sats::{bsatn, buffer::BufReader, hash::Hash, AlgebraicValue, ProductValue};
use spacetimedb_table::{indexes::RowPointer, table::RowRef};
use std::sync::Arc;
use std::time::Instant;
use std::{borrow::Cow, time::Duration};
use std::{cell::RefCell, ops::RangeBounds};
use std::{cell::RefCell, collections::HashSet, ops::RangeBounds};
use thiserror::Error;

pub type Result<T> = std::result::Result<T, DBError>;
Expand Down Expand Up @@ -123,6 +124,7 @@ impl Locking {
database_address: self.database_address,
committed_state: self.committed_state.clone(),
progress: RefCell::new(progress),
connected_clients: RefCell::new(HashSet::new()),
}
}
}
Expand Down Expand Up @@ -589,6 +591,17 @@ pub struct Replay<F> {
database_address: Address,
committed_state: Arc<RwLock<CommittedState>>,
progress: RefCell<F>,
/// Tracks the connect / disconnect calls recorded in the transaction history.
///
/// If non-empty after a replay, the remaining entries were not gracefully
/// disconnected. A disconnect call should be performed for each.
connected_clients: RefCell<HashSet<(Identity, Address)>>,
}

impl<F> Replay<F> {
pub fn into_connected_clients(self) -> HashSet<(Identity, Address)> {
self.connected_clients.into_inner()
}
}

impl<F> Replay<F> {
Expand All @@ -598,6 +611,7 @@ impl<F> Replay<F> {
database_address: &self.database_address,
committed_state: &mut committed_state,
progress: &mut *self.progress.borrow_mut(),
connected_clients: &mut self.connected_clients.borrow_mut(),
};
f(&mut visitor)
}
Expand Down Expand Up @@ -626,6 +640,21 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for Replay<F> {
}
}

impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
type Record = txdata::Txdata<ProductValue>;
type Error = txdata::DecoderError<ReplayError>;

#[inline]
fn decode_record<'a, R: BufReader<'a>>(
&self,
version: u8,
tx_offset: u64,
reader: &mut R,
) -> std::result::Result<Self::Record, Self::Error> {
spacetimedb_commitlog::Decoder::decode_record(&**self, version, tx_offset, reader)
}
}

// n.b. (Tyler) We actually **do not** want to check constraints at replay
// time because not only is it a pain, but actually **subtly wrong** the
// way we have it implemented. It's wrong because the actual constraints of
Expand Down Expand Up @@ -666,6 +695,7 @@ struct ReplayVisitor<'a, F> {
database_address: &'a Address,
committed_state: &'a mut CommittedState,
progress: &'a mut F,
connected_clients: &'a mut HashSet<(Identity, Address)>,
}

impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
Expand Down Expand Up @@ -759,11 +789,34 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
Ok(())
}

fn visit_tx_end(&mut self) -> std::prelude::v1::Result<(), Self::Error> {
fn visit_tx_end(&mut self) -> std::result::Result<(), Self::Error> {
self.committed_state.next_tx_offset += 1;

Ok(())
}

fn visit_inputs(&mut self, inputs: &txdata::Inputs) -> std::result::Result<(), Self::Error> {
let decode_caller = || {
let buf = &mut inputs.reducer_args.as_ref();
let caller_identity: Identity = bsatn::from_reader(buf).context("Could not decode caller identity")?;
let caller_address: Address = bsatn::from_reader(buf).context("Could not decode caller address")?;
anyhow::Ok((caller_identity, caller_address))
};
if let Some(action) = inputs.reducer_name.strip_prefix("__identity_") {
let caller = decode_caller()?;
match action {
"connected__" => {
self.connected_clients.insert(caller);
}
"disconnected__" => {
self.connected_clients.remove(&caller);
}
_ => {}
}
}

Ok(())
}
}

#[cfg(test)]
Expand Down
52 changes: 40 additions & 12 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ use fs2::FileExt;
use parking_lot::RwLock;
use spacetimedb_commitlog as commitlog;
use spacetimedb_durability::{self as durability, Durability};
use spacetimedb_lib::Identity;
use spacetimedb_primitives::*;
use spacetimedb_sats::db::auth::{StAccess, StTableType};
use spacetimedb_sats::db::def::{ColumnDef, IndexDef, SequenceDef, TableDef, TableSchema};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_vm::errors::ErrorVm;
use std::borrow::Cow;
use std::collections::HashSet;
use std::fs::{create_dir_all, File};
use std::io;
use std::ops::RangeBounds;
Expand All @@ -49,6 +51,10 @@ type DiskSizeFn = Arc<dyn Fn() -> io::Result<u64> + Send + Sync>;

pub type Txdata = commitlog::payload::Txdata<ProductValue>;

/// Clients for which a connect reducer call was found in the [`History`], but
/// no corresponding disconnect.
pub type ConnectedClients = HashSet<(Identity, Address)>;

#[derive(Clone)]
pub struct RelationalDB {
// TODO(cloutiertyler): This should not be public
Expand All @@ -61,9 +67,12 @@ pub struct RelationalDB {
/// is `Some`, `None` otherwise.
disk_size_fn: Option<DiskSizeFn>,

// Release file lock last when dropping.
_lock: Arc<File>,
config: Arc<RwLock<DatabaseConfig>>,

// DO NOT ADD FIELDS AFTER THIS.
// By default, fields are dropped in declaration order.
// We want to release the file lock last.
_lock: Arc<File>,
gefjon marked this conversation as resolved.
Show resolved Hide resolved
}

impl std::fmt::Debug for RelationalDB {
Expand All @@ -84,7 +93,15 @@ impl RelationalDB {
///
/// The [`tokio::runtime::Handle`] is used to spawn background tasks which
/// take care of flushing and syncing the log.
pub fn local(root: impl AsRef<Path>, rt: tokio::runtime::Handle, address: Address) -> Result<Self, DBError> {
///
/// Alongside `Self`, the set of clients who were connected as of the most
/// recent transaction is returned as a [`ConnectedClients`].
/// `__disconnect__` should be called for each entry.
pub fn local(
root: impl AsRef<Path>,
rt: tokio::runtime::Handle,
address: Address,
) -> Result<(Self, ConnectedClients), DBError> {
kim marked this conversation as resolved.
Show resolved Hide resolved
let log_dir = root.as_ref().join("clog");
create_dir_all(&log_dir)?;
let durability = durability::Local::open(
Expand Down Expand Up @@ -140,10 +157,11 @@ impl RelationalDB {
row_count_fn,
disk_size_fn,

_lock: Arc::new(lock),
config: Arc::new(RwLock::new(DatabaseConfig::with_slow_query(
SlowQueryConfig::with_defaults(),
))),

_lock: Arc::new(lock),
})
}

Expand All @@ -152,9 +170,12 @@ impl RelationalDB {
///
/// Consumes `self` in order to ensure exclusive access, and to prevent use
/// of the database in case of an incomplete replay.
///
/// This restriction may be lifted in the future to allow for "live" followers.
pub fn apply<T>(self, history: T) -> Result<Self, DBError>
///
/// Alongside `Self`, the set of clients who were connected as of the most
/// recent transaction is returned as a [`ConnectedClients`].
/// `__disconnect__` should be called for each entry.
pub fn apply<T>(self, history: T) -> Result<(Self, ConnectedClients), DBError>
kim marked this conversation as resolved.
Show resolved Hide resolved
where
T: durability::History<TxData = Txdata>,
{
Expand Down Expand Up @@ -187,14 +208,16 @@ impl RelationalDB {
}
};

let mut replay = self.inner.replay(progress);
history
.fold_transactions_from(0, self.inner.replay(progress))
.fold_transactions_from(0, &mut replay)
.map_err(anyhow::Error::from)?;
log::info!("[{}] DATABASE: applied transaction history", self.address);
self.inner.rebuild_state_after_replay()?;
log::info!("[{}] DATABASE: rebuilt state after replay", self.address);
let connected_clients = replay.into_connected_clients();

Ok(self)
Ok((self, connected_clients))
}

/// Returns an approximate row count for a particular table.
Expand Down Expand Up @@ -314,6 +337,7 @@ impl RelationalDB {
pub fn commit_tx(&self, ctx: &ExecutionContext, tx: MutTx) -> Result<Option<TxData>, DBError> {
log::trace!("COMMIT MUT TX");

// TODO: Never returns `None` -- should it?
let Some(tx_data) = self.inner.commit_mut_tx(ctx, tx)? else {
return Ok(None);
};
Expand Down Expand Up @@ -1010,11 +1034,15 @@ pub mod tests_utils {
move || handle.size_on_disk()
});

let rdb = RelationalDB::open(root, Self::ADDRESS, Some((handle.clone(), disk_size_fn)))?
.apply(handle.clone())?
.with_row_count(Self::row_count_fn());
let (db, connected_clients) = {
let db = RelationalDB::open(root, Self::ADDRESS, Some((handle.clone(), disk_size_fn)))?;
db.apply(handle.clone())?
};
// TODO: Should we be able to handle the non-empty case?
// `RelationalDB` cannot exist on its own then.
debug_assert!(connected_clients.is_empty());

Ok((rdb, handle))
Ok((db.with_row_count(Self::row_count_fn()), handle))
}

// NOTE: This is important to make compiler tests work.
Expand Down
21 changes: 18 additions & 3 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::module_host::{EntityDef, EventStatus, ModuleHost, NoSuchModule, Updat
use super::{ReducerArgs, Scheduler};
use crate::database_instance_context::DatabaseInstanceContext;
use crate::db::db_metrics::DB_METRICS;
use crate::db::relational_db::RelationalDB;
use crate::db::relational_db::{ConnectedClients, RelationalDB};
use crate::energy::{EnergyMonitor, EnergyQuanta};
use crate::execution_context::ExecutionContext;
use crate::messages::control_db::{Database, HostType};
Expand Down Expand Up @@ -621,7 +621,7 @@ async fn make_dbic(
config: db::Config,
database: Database,
instance_id: u64,
) -> anyhow::Result<DatabaseInstanceContext> {
) -> anyhow::Result<(DatabaseInstanceContext, Option<ConnectedClients>)> {
let root_dir = root_dir.to_path_buf();
let rt = tokio::runtime::Handle::current();
spawn_rayon(move || {
Expand Down Expand Up @@ -717,7 +717,8 @@ impl Host {
) -> anyhow::Result<Self> {
let host_type = database.host_type;
let program_hash = database.program_bytes_address;
let dbic = make_dbic(root_dir, config, database, instance_id).await.map(Arc::new)?;
let (dbic, connected_clients) = make_dbic(root_dir, config, database, instance_id).await?;
let dbic = Arc::new(dbic);

let program_bytes = load_program(&dbic.relational_db, &program_storage, &program_hash)?;
let (scheduler, scheduler_starter) = Scheduler::open(dbic.scheduler_db_path(root_dir.to_path_buf()))?;
Expand All @@ -734,6 +735,20 @@ impl Host {
)
.await?;

if let Some(connected_clients) = connected_clients {
for (identity, address) in connected_clients {
module_host
.call_identity_connected_disconnected(identity, address, false)
.await
.with_context(|| {
format!(
"Error calling disconnect for {} {} on {}",
identity, address, dbic.address
)
})?;
}
}

scheduler_starter.start(&module_host)?;
let metrics_task = tokio::spawn(disk_monitor(dbic.clone(), energy_monitor.clone())).abort_handle();

Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::wasm_common::{CLIENT_CONNECTED_DUNDER, CLIENT_DISCONNECTED_DUNDER};
use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Timestamp};
use crate::client::{ClientActorId, ClientConnectionSender};
use crate::database_instance_context::DatabaseInstanceContext;
Expand Down Expand Up @@ -661,12 +662,10 @@ impl ModuleHost {
caller_address: Address,
connected: bool,
) -> Result<(), ReducerCallError> {
// TODO: DUNDER consts are in wasm_common, so seems weird to use them
// here. But maybe there should be dunders for this?
let reducer_name = if connected {
"__identity_connected__"
CLIENT_CONNECTED_DUNDER
} else {
"__identity_disconnected__"
CLIENT_DISCONNECTED_DUNDER
};

self.call_reducer_inner(
Expand Down