
Commit

Disconnect dangling clients
When replaying the commitlog, keep track of unpaired connect/disconnect
calls and call disconnect when instantiating the module.
kim committed Apr 22, 2024
1 parent c9381af commit 31b332d
Showing 8 changed files with 128 additions and 32 deletions.
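For orientation before the diff: a minimal, self-contained sketch of the bookkeeping this commit implements, assuming simplified stand-ins for SpacetimeDB's Identity and Address types. The ConnectionTracker name is illustrative only; the real code threads a HashSet through the commitlog Replay decoder and hands the leftovers to the host via ModuleHostContext.

use std::collections::HashSet;

// Simplified stand-ins for the real `Identity` and `Address` types.
type Identity = u64;
type Address = u64;

#[derive(Default)]
struct ConnectionTracker {
    connected: HashSet<(Identity, Address)>,
}

impl ConnectionTracker {
    // Feed one replayed reducer invocation into the tracker.
    fn visit_reducer(&mut self, reducer_name: &str, caller: (Identity, Address)) {
        match reducer_name {
            "__identity_connected__" => {
                self.connected.insert(caller);
            }
            "__identity_disconnected__" => {
                self.connected.remove(&caller);
            }
            _ => {}
        }
    }

    // Whatever is left after replay never disconnected: these are the
    // "dangling" clients that need a synthetic disconnect call.
    fn into_dangling(self) -> HashSet<(Identity, Address)> {
        self.connected
    }
}

fn main() {
    let mut tracker = ConnectionTracker::default();

    // Replay: client (1, 1) connects and disconnects cleanly, while
    // client (2, 2) only connects (e.g. the host stopped before the
    // disconnect reached the commitlog).
    tracker.visit_reducer("__identity_connected__", (1, 1));
    tracker.visit_reducer("__identity_disconnected__", (1, 1));
    tracker.visit_reducer("__identity_connected__", (2, 2));

    // When the module is instantiated, call the disconnect reducer once
    // per dangling client before serving new requests.
    for (identity, address) in tracker.into_dangling() {
        println!("would call __identity_disconnected__ for {identity} {address}");
    }
}

The same pairing logic appears below in ReplayVisitor::visit_inputs, and the leftover set is drained in HostController::setup_module_host by calling the disconnect reducer for each entry.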
17 changes: 10 additions & 7 deletions crates/core/src/database_instance_context.rs
@@ -1,5 +1,5 @@
 use super::database_logger::DatabaseLogger;
-use crate::db::relational_db::RelationalDB;
+use crate::db::relational_db::{ConnectedClients, RelationalDB};
 use crate::db::{Config, Storage};
 use crate::error::DBError;
 use crate::messages::control_db::Database;
@@ -25,23 +25,26 @@ impl DatabaseInstanceContext {
         instance_id: u64,
         root_db_path: PathBuf,
         rt: tokio::runtime::Handle,
-    ) -> Result<Self> {
+    ) -> Result<(Self, Option<ConnectedClients>)> {
         let mut db_path = root_db_path;
         db_path.extend([&*database.address.to_hex(), &*instance_id.to_string()]);
         db_path.push("database");
 
         let log_path = DatabaseLogger::filepath(&database.address, instance_id);
-        let relational_db = match config.storage {
-            Storage::Memory => RelationalDB::open(db_path, database.address, None)?,
-            Storage::Disk => RelationalDB::local(db_path, rt, database.address)?,
+        let (relational_db, dangling_connections) = match config.storage {
+            Storage::Memory => RelationalDB::open(db_path, database.address, None).map(|db| (db, None))?,
+            Storage::Disk => RelationalDB::local(db_path, rt, database.address)
+                .map(|(db, connected_clients)| (db, (!connected_clients.is_empty()).then_some(connected_clients)))?,
         };
 
-        Ok(Self {
+        let dbic = Self {
             database,
             database_instance_id: instance_id,
             logger: Arc::new(DatabaseLogger::open(log_path)),
             relational_db: Arc::new(relational_db),
-        })
+        };
+
+        Ok((dbic, dangling_connections))
     }
 
     pub fn scheduler_db_path(&self, root_db_path: PathBuf) -> PathBuf {
60 changes: 55 additions & 5 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
@@ -25,14 +25,16 @@ use crate::{
 };
 use anyhow::{anyhow, Context};
 use parking_lot::{Mutex, RwLock};
+use spacetimedb_commitlog::payload::txdata;
+use spacetimedb_lib::Identity;
 use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
 use spacetimedb_sats::db::def::{IndexDef, SequenceDef, TableDef, TableSchema};
 use spacetimedb_sats::{bsatn, buffer::BufReader, hash::Hash, AlgebraicValue, ProductValue};
 use spacetimedb_table::{indexes::RowPointer, table::RowRef};
 use std::sync::Arc;
 use std::time::Instant;
 use std::{borrow::Cow, time::Duration};
-use std::{cell::RefCell, ops::RangeBounds};
+use std::{cell::RefCell, collections::HashSet, ops::RangeBounds};
 use thiserror::Error;
 
 pub type Result<T> = std::result::Result<T, DBError>;
@@ -122,6 +124,7 @@ impl Locking {
             database_address: self.database_address,
             committed_state: self.committed_state.clone(),
             progress: RefCell::new(progress),
+            connected_clients: RefCell::new(HashSet::new()),
         }
     }
 }
@@ -561,11 +564,18 @@ pub struct Replay<F> {
     database_address: Address,
     committed_state: Arc<RwLock<CommittedState>>,
     progress: RefCell<F>,
+    connected_clients: RefCell<HashSet<(Identity, Address)>>,
 }
 
+impl<F> Replay<F> {
+    pub fn into_connected_clients(self) -> HashSet<(Identity, Address)> {
+        self.connected_clients.into_inner()
+    }
+}
+
 impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for Replay<F> {
-    type Record = spacetimedb_commitlog::payload::Txdata<ProductValue>;
-    type Error = spacetimedb_commitlog::payload::txdata::DecoderError<ReplayError>;
+    type Record = txdata::Txdata<ProductValue>;
+    type Error = txdata::DecoderError<ReplayError>;
 
     fn decode_record<'a, R: BufReader<'a>>(
         &self,
@@ -578,8 +588,24 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for Replay<F> {
             database_address: &self.database_address,
             committed_state: &mut committed_state,
             progress: &mut *self.progress.borrow_mut(),
+            connected_clients: &mut self.connected_clients.borrow_mut(),
         };
-        spacetimedb_commitlog::payload::txdata::decode_record_fn(&mut visitor, version, tx_offset, reader)
+        txdata::decode_record_fn(&mut visitor, version, tx_offset, reader)
     }
 }
 
+impl<F: FnMut(u64)> spacetimedb_commitlog::Decoder for &mut Replay<F> {
+    type Record = txdata::Txdata<ProductValue>;
+    type Error = txdata::DecoderError<ReplayError>;
+
+    #[inline]
+    fn decode_record<'a, R: BufReader<'a>>(
+        &self,
+        version: u8,
+        tx_offset: u64,
+        reader: &mut R,
+    ) -> std::result::Result<Self::Record, Self::Error> {
+        spacetimedb_commitlog::Decoder::decode_record(&**self, version, tx_offset, reader)
+    }
+}
+
@@ -623,6 +649,7 @@ struct ReplayVisitor<'a, F> {
     database_address: &'a Address,
     committed_state: &'a mut CommittedState,
     progress: &'a mut F,
+    connected_clients: &'a mut HashSet<(Identity, Address)>,
 }
 
 impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
@@ -712,11 +739,34 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
         Ok(())
     }
 
-    fn visit_tx_end(&mut self) -> std::prelude::v1::Result<(), Self::Error> {
+    fn visit_tx_end(&mut self) -> std::result::Result<(), Self::Error> {
         self.committed_state.next_tx_offset += 1;
 
         Ok(())
     }
+
+    fn visit_inputs(&mut self, inputs: &txdata::Inputs) -> std::result::Result<(), Self::Error> {
+        let decode_caller = || {
+            let buf = &mut inputs.reducer_args.as_ref();
+            let caller_identity: Identity = bsatn::from_reader(buf).context("Could not decode caller identity")?;
+            let caller_address: Address = bsatn::from_reader(buf).context("Could not decode caller address")?;
+            anyhow::Ok((caller_identity, caller_address))
+        };
+        if let Some(action) = inputs.reducer_name.strip_prefix("__identity_") {
+            let caller = decode_caller()?;
+            match action {
+                "connected__" => {
+                    self.connected_clients.insert(caller);
+                }
+                "disconnected__" => {
+                    self.connected_clients.remove(&caller);
+                }
+                _ => {}
+            }
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
43 changes: 32 additions & 11 deletions crates/core/src/db/relational_db.rs
@@ -20,13 +20,15 @@ use fs2::FileExt;
 use parking_lot::RwLock;
 use spacetimedb_commitlog as commitlog;
 use spacetimedb_durability::{self as durability, Durability};
+use spacetimedb_lib::Identity;
 use spacetimedb_primitives::*;
 use spacetimedb_sats::db::auth::{StAccess, StTableType};
 use spacetimedb_sats::db::def::{ColumnDef, IndexDef, SequenceDef, TableDef, TableSchema};
 use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
 use spacetimedb_table::indexes::RowPointer;
 use spacetimedb_vm::errors::ErrorVm;
 use std::borrow::Cow;
+use std::collections::HashSet;
 use std::fs::{create_dir_all, File};
 use std::io;
 use std::ops::RangeBounds;
@@ -49,6 +51,10 @@ type DiskSizeFn = Arc<dyn Fn() -> io::Result<u64> + Send + Sync>;

 pub type Txdata = commitlog::payload::Txdata<ProductValue>;
 
+/// Clients for which a connect reducer call was found in the [`History`], but
+/// no corresponding disconnect.
+pub type ConnectedClients = HashSet<(Identity, Address)>;
+
 #[derive(Clone)]
 pub struct RelationalDB {
     // TODO(cloutiertyler): This should not be public
@@ -61,9 +67,12 @@ pub struct RelationalDB {
     /// is `Some`, `None` otherwise.
     disk_size_fn: Option<DiskSizeFn>,
 
-    // Release file lock last when dropping.
-    _lock: Arc<File>,
     config: Arc<RwLock<DatabaseConfig>>,
 
+    // DO NOT ADD FIELDS AFTER THIS.
+    // By default, fields are dropped in declaration order.
+    // We want to release the file lock last.
+    _lock: Arc<File>,
 }
 
 impl std::fmt::Debug for RelationalDB {
@@ -84,7 +93,11 @@ impl RelationalDB {
     ///
     /// The [`tokio::runtime::Handle`] is used to spawn background tasks which
     /// take care of flushing and syncing the log.
-    pub fn local(root: impl AsRef<Path>, rt: tokio::runtime::Handle, address: Address) -> Result<Self, DBError> {
+    pub fn local(
+        root: impl AsRef<Path>,
+        rt: tokio::runtime::Handle,
+        address: Address,
+    ) -> Result<(Self, ConnectedClients), DBError> {
         let log_dir = root.as_ref().join("clog");
         create_dir_all(&log_dir)?;
         let durability = durability::Local::open(
@@ -140,10 +153,11 @@ impl RelationalDB {
             row_count_fn,
             disk_size_fn,
 
-            _lock: Arc::new(lock),
             config: Arc::new(RwLock::new(DatabaseConfig::with_slow_query(
                 SlowQueryConfig::with_defaults(),
             ))),
+
+            _lock: Arc::new(lock),
         })
     }

@@ -154,7 +168,7 @@ impl RelationalDB {
     /// of the database in case of an incomplete replay.
     ///
     /// This restriction may be lifted in the future to allow for "live" followers.
-    pub fn apply<T>(self, history: T) -> Result<Self, DBError>
+    pub fn apply<T>(self, history: T) -> Result<(Self, ConnectedClients), DBError>
     where
         T: durability::History<TxData = Txdata>,
     {
@@ -187,14 +201,16 @@ impl RelationalDB {
             }
         };
 
+        let mut replay = self.inner.replay(progress);
         history
-            .fold_transactions_from(0, self.inner.replay(progress))
+            .fold_transactions_from(0, &mut replay)
             .map_err(anyhow::Error::from)?;
         log::info!("[{}] DATABASE: applied transaction history", self.address);
         self.inner.rebuild_state_after_replay()?;
         log::info!("[{}] DATABASE: rebuilt state after replay", self.address);
+        let connected_clients = replay.into_connected_clients();
 
-        Ok(self)
+        Ok((self, connected_clients))
     }
 
     /// Returns an approximate row count for a particular table.
@@ -313,6 +329,7 @@ impl RelationalDB {

         log::trace!("COMMIT MUT TX");
 
+        // TODO: Never returns `None` -- should it?
         let Some(tx_data) = self.inner.commit_mut_tx(ctx, tx)? else {
             return Ok(None);
         };
@@ -991,11 +1008,15 @@ pub mod tests_utils {
                 move || handle.size_on_disk()
             });
 
-            let rdb = RelationalDB::open(root, Self::ADDRESS, Some((handle.clone(), disk_size_fn)))?
-                .apply(handle.clone())?
-                .with_row_count(Self::row_count_fn());
+            let (db, connected_clients) = {
+                let db = RelationalDB::open(root, Self::ADDRESS, Some((handle.clone(), disk_size_fn)))?;
+                db.apply(handle.clone())?
+            };
+            // TODO: Should we be able to handle the non-empty case?
+            // `RelationalDB` cannot exist on its own then.
+            debug_assert!(connected_clients.is_empty());
 
-            Ok((rdb, handle))
+            Ok((db.with_row_count(Self::row_count_fn()), handle))
         }
 
         // NOTE: This is important to make compiler tests work.
18 changes: 16 additions & 2 deletions crates/core/src/host/host_controller.rs
@@ -8,7 +8,7 @@ use crate::host;
 use crate::messages::control_db::HostType;
 use crate::module_host_context::{ModuleCreationContext, ModuleHostContext};
 use crate::util::spawn_rayon;
-use anyhow::ensure;
+use anyhow::{ensure, Context};
 use futures::TryFutureExt;
 use parking_lot::Mutex;
 use serde::Serialize;
@@ -224,7 +224,7 @@ impl HostController {
     ///
     /// In the `Err` case, `F` **MUST** roll back any modifications it has made
     /// to the database passed in the [`ModuleHostContext`].
-    async fn setup_module_host<F, Fut, T>(&self, mhc: ModuleHostContext, f: F) -> anyhow::Result<T>
+    async fn setup_module_host<F, Fut, T>(&self, mut mhc: ModuleHostContext, f: F) -> anyhow::Result<T>
     where
         F: FnOnce(ModuleHost) -> Fut,
         Fut: Future<Output = anyhow::Result<T>>,
@@ -242,6 +242,20 @@ impl HostController {
         let module_host = spawn_rayon(move || Self::make_module_host(mhc.host_type, mcc)).await?;
         module_host.start();
 
+        if let Some(dangling) = mhc.dangling_client_connections.take() {
+            for (identity, address) in dangling {
+                module_host
+                    .call_identity_connected_disconnected(identity, address, false)
+                    .await
+                    .with_context(|| {
+                        format!(
+                            "Error calling disconnect for {} {} on {}",
+                            identity, address, mhc.dbic.database.address
+                        )
+                    })?;
+            }
+        }
+
         let res = f(module_host.clone())
             .or_else(|e| async {
                 module_host.exit().await;
7 changes: 3 additions & 4 deletions crates/core/src/host/module_host.rs
@@ -1,3 +1,4 @@
+use super::wasm_common::{CLIENT_CONNECTED_DUNDER, CLIENT_DISCONNECTED_DUNDER};
 use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Timestamp};
 use crate::client::{ClientActorId, ClientConnectionSender};
 use crate::database_instance_context::DatabaseInstanceContext;
@@ -678,12 +679,10 @@ impl ModuleHost {
         caller_address: Address,
         connected: bool,
     ) -> Result<(), ReducerCallError> {
-        // TODO: DUNDER consts are in wasm_common, so seems weird to use them
-        // here. But maybe there should be dunders for this?
         let reducer_name = if connected {
-            "__identity_connected__"
+            CLIENT_CONNECTED_DUNDER
         } else {
-            "__identity_disconnected__"
+            CLIENT_DISCONNECTED_DUNDER
         };
 
         self.call_reducer_inner(
4 changes: 4 additions & 0 deletions crates/core/src/host/wasm_common.rs
@@ -21,6 +21,10 @@ pub const SETUP_DUNDER: &str = "__setup__";
 pub const INIT_DUNDER: &str = "__init__";
 /// the reducer with this name is invoked when updating the database
 pub const UPDATE_DUNDER: &str = "__update__";
+/// The reducer with this name is invoked when a client connects.
+pub const CLIENT_CONNECTED_DUNDER: &str = "__identity_connected__";
+/// The reducer with this name is invoked when a client disconnects.
+pub const CLIENT_DISCONNECTED_DUNDER: &str = "__identity_disconnected__";
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 #[allow(unused)]
2 changes: 2 additions & 0 deletions crates/core/src/module_host_context.rs
@@ -1,6 +1,7 @@
 use spacetimedb_lib::Hash;
 
 use crate::database_instance_context::DatabaseInstanceContext;
+use crate::db::relational_db::ConnectedClients;
 use crate::energy::EnergyMonitor;
 use crate::host::scheduler::{Scheduler, SchedulerStarter};
 use crate::messages::control_db::HostType;
@@ -13,6 +14,7 @@ pub struct ModuleHostContext {
     pub scheduler_starter: SchedulerStarter,
     pub host_type: HostType,
     pub program_bytes: AnyBytes,
+    pub dangling_client_connections: Option<ConnectedClients>,
 }
 
 pub struct ModuleCreationContext {
