
Commit

WIP moving to the sync zookeeper API
Kerollmops committed Aug 29, 2023
1 parent 854745c commit 61e537f
Showing 10 changed files with 512 additions and 598 deletions.
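
In short, this commit swaps the async zookeeper-client crate for the synchronous zookeeper crate across the workspace, and the functions that only awaited ZooKeeper calls become plain blocking functions. The sketch below is not part of the commit; it only illustrates, assuming the zookeeper 0.8 API plus an illustrative address and timeout, the blocking connect-and-create pattern the diff relies on.

use std::sync::Arc;
use std::time::Duration;

use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher, ZkError, ZooKeeper};

// No-op session watcher; the real code reacts to node changes through a
// persistent watch instead (see meilisearch-auth/src/lib.rs below).
struct LoggingWatcher;

impl Watcher for LoggingWatcher {
    fn handle(&self, event: WatchedEvent) {
        println!("zookeeper session event: {event:?}");
    }
}

// Sketch only: connect synchronously and make sure the persistent `/auth`
// node exists, treating NodeExists as success.
fn ensure_auth_node(addr: &str) -> Result<Arc<ZooKeeper>, ZkError> {
    let zookeeper = Arc::new(ZooKeeper::connect(addr, Duration::from_secs(5), LoggingWatcher)?);
    match zookeeper.create("/auth", vec![], Acl::open_unsafe().clone(), CreateMode::Persistent) {
        Ok(_) | Err(ZkError::NodeExists) => Ok(zookeeper),
        Err(e) => Err(e),
    }
}

Unlike zookeeper-client, no executor is involved: every call blocks the current thread, which is why the async wrappers and the tokio::task::spawn_blocking hops disappear throughout the diff.
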
411 changes: 211 additions & 200 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 0 additions & 21 deletions Cargo.toml
@@ -36,24 +36,3 @@ opt-level = 3
opt-level = 3
[profile.dev.package.roaring]
opt-level = 3

[profile.dev.package.lindera-ipadic-builder]
opt-level = 3
[profile.dev.package.encoding]
opt-level = 3
[profile.dev.package.yada]
opt-level = 3

[profile.release.package.lindera-ipadic-builder]
opt-level = 3
[profile.release.package.encoding]
opt-level = 3
[profile.release.package.yada]
opt-level = 3

[profile.bench.package.lindera-ipadic-builder]
opt-level = 3
[profile.bench.package.encoding]
opt-level = 3
[profile.bench.package.yada]
opt-level = 3
2 changes: 1 addition & 1 deletion index-scheduler/Cargo.toml
@@ -32,7 +32,7 @@ thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.3.1", features = ["serde", "v4"] }
tokio = { version = "1.27.0", features = ["full"] }
zookeeper-client = "0.5.0"
zookeeper = "0.8.0"

[dev-dependencies]
big_s = "1.0.2"
482 changes: 206 additions & 276 deletions index-scheduler/src/lib.rs

Large diffs are not rendered by default.
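
The index-scheduler diff itself is collapsed, but its Cargo.toml above makes the same crate swap while keeping tokio. Where a blocking ZooKeeper call still has to run inside async code, one common bridge is tokio::task::spawn_blocking; the snippet below is a generic sketch of that pattern under that assumption, not code from this commit.

use std::sync::Arc;

use zookeeper::ZooKeeper;

// Generic sketch: run a blocking `zookeeper` call off the async runtime's
// worker threads. `spawn_blocking` returns a JoinHandle, hence the double `?`
// (join error first, then the ZooKeeper error).
async fn read_node(zookeeper: Arc<ZooKeeper>, path: String) -> anyhow::Result<Vec<u8>> {
    let (data, _stat) =
        tokio::task::spawn_blocking(move || zookeeper.get_data(&path, false)).await??;
    Ok(data)
}
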

3 changes: 1 addition & 2 deletions meilisearch-auth/Cargo.toml
@@ -24,6 +24,5 @@ serde_json = { version = "1.0.95", features = ["preserve_order"] }
sha2 = "0.10.6"
thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
tokio = { version = "1.27.0", features = ["full"] }
uuid = { version = "1.3.1", features = ["serde", "v4"] }
zookeeper-client = "0.5.0"
zookeeper = "0.8.0"
4 changes: 1 addition & 3 deletions meilisearch-auth/src/error.rs
@@ -2,7 +2,6 @@ use std::error::Error;

use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::internal_error;
use zookeeper_client as zk;

pub type Result<T> = std::result::Result<T, AuthControllerError>;

@@ -20,8 +19,7 @@ internal_error!(
AuthControllerError: meilisearch_types::milli::heed::Error,
std::io::Error,
serde_json::Error,
tokio::task::JoinError,
zk::Error,
zookeeper::ZkError,
std::str::Utf8Error
);

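The error plumbing now maps zookeeper::ZkError instead of zk::Error, and drops tokio::task::JoinError since nothing is spawned anymore. Assuming internal_error! expands, as is conventional here, to one From impl per listed type that wraps the source as an internal error, the new entry is roughly equivalent to the hand-written sketch below; the Internal variant shape is an assumption for illustration.

use std::error::Error;

// Illustrative stand-in for the real enum defined in this file.
pub enum AuthControllerError {
    Internal(Box<dyn Error>),
}

// Assumed expansion of the `zookeeper::ZkError` entry of `internal_error!`:
// any ZooKeeper failure becomes an opaque internal error.
impl From<zookeeper::ZkError> for AuthControllerError {
    fn from(error: zookeeper::ZkError) -> Self {
        AuthControllerError::Internal(Box::new(error))
    }
}
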
157 changes: 77 additions & 80 deletions meilisearch-auth/src/lib.rs
@@ -16,71 +16,69 @@ pub use store::open_auth_store_env
use store::{generate_key_as_hexa, HeedAuthStore};
use time::OffsetDateTime;
use uuid::Uuid;
use zookeeper_client as zk;
use zookeeper::{
Acl, AddWatchMode, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZooKeeper,
};

#[derive(Clone)]
pub struct AuthController {
store: Arc<HeedAuthStore>,
master_key: Option<String>,
zk: Option<zk::Client>,
zookeeper: Option<Arc<ZooKeeper>>,
}

impl AuthController {
pub async fn new(
pub fn new(
db_path: impl AsRef<Path>,
master_key: &Option<String>,
zk: Option<zk::Client>,
zookeeper: Option<Arc<ZooKeeper>>,
) -> Result<Self> {
let store = HeedAuthStore::new(db_path)?;
let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zk };
let controller = Self { store: Arc::new(store), master_key: master_key.clone(), zookeeper };

match controller.zk {
match controller.zookeeper {
// set up the auth zk environment, the `auth` node
Some(ref zk) => {
let options =
zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
Some(ref zookeeper) => {
// TODO: we should catch the potential unexpected errors here https://docs.rs/zookeeper-client/latest/zookeeper_client/struct.Client.html#method.create
// for the moment we consider that `create` only returns Error::NodeExists.
match zk.create("/auth", &[], &options).await {
match zookeeper.create(
"/auth",
vec![],
Acl::open_unsafe().clone(),
CreateMode::Persistent,
) {
// If the store is empty, we must generate and push the default api-keys.
Ok(_) => generate_default_keys(&controller).await?,
Ok(_) => generate_default_keys(&controller)?,
// If the node exists we should clear our DB and download all the existing api-keys
Err(zk::Error::NodeExists) => {
Err(ZkError::NodeExists) => {
log::warn!("Auth directory already exists, we need to clear our keys + import the one in zookeeper");

let store = controller.store.clone();
tokio::task::spawn_blocking(move || store.delete_all_keys()).await??;
let children = zk
.list_children("/auth")
.await
store.delete_all_keys()?;
let children = zookeeper
.get_children("/auth", false)
.expect("Internal, the auth directory was deleted during execution.");

log::info!("Importing {} api-keys", children.len());
for path in children {
log::info!(" Importing {}", path);
match zk.get_data(&format!("/auth/{}", &path)).await {
match zookeeper.get_data(&format!("/auth/{}", &path), false) {
Ok((key, _stat)) => {
let key = serde_json::from_slice(&key).unwrap();
let store = controller.store.clone();
tokio::task::spawn_blocking(move || store.put_api_key(key))
.await??;

},
Err(e) => panic!("{e}")
let key = serde_json::from_slice(&key).unwrap();
let store = controller.store.clone();
store.put_api_key(key)?;
}
Err(e) => panic!("{e}"),
}
// else the file was deleted while we were inserting the key. We ignore it.
// TODO: What happens if someone updates the files before we have the time
// to set up the watcher?
}
}
e @ Err(
zk::Error::NoNode
| zk::Error::NoChildrenForEphemerals
| zk::Error::InvalidAcl,
ZkError::NoNode | ZkError::NoChildrenForEphemerals | ZkError::InvalidACL,
) => unreachable!("{e:?}"),
Err(e) => {
panic!("{e}")
}
Err(e) => panic!("{e}"),
}
// TODO: Race condition above:
// What happens if two nodes join at exactly the same moment?
@@ -91,39 +89,34 @@ impl AuthController {

// Zookeeper Event listener loop
let controller_clone = controller.clone();
let mut watcher = zk.watch("/auth", zk::AddWatchMode::PersistentRecursive).await?;
let czk = zk.clone();
tokio::spawn(async move {
let zk = czk;
loop {
let zk::WatchedEvent { event_type, session_state, path } =
dbg!(watcher.changed().await);

match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
// a key is deleted from zk
zk::EventType::NodeDeleted => {
// TODO: ugly unwraps
let uuid = path.strip_prefix("/auth/").unwrap();
let uuid = Uuid::parse_str(&uuid).unwrap();
log::info!("The key {} has been deleted", uuid);
dbg!(controller_clone.store.delete_api_key(uuid).unwrap());
}
zk::EventType::NodeCreated | zk::EventType::NodeDataChanged => {
let (key, _stat) = zk.get_data(&path).await.unwrap();
let key: Key = serde_json::from_slice(&key).unwrap();
log::info!("The key {} has been deleted", key.uid);

dbg!(controller_clone.store.put_api_key(key).unwrap());
}
zk::EventType::NodeChildrenChanged => panic!("Got the unexpected NodeChildrenChanged event, what is it used for?"),
let zkk = zookeeper.clone();
zookeeper.add_watch("/auth", AddWatchMode::PersistentRecursive, move |event| {
let WatchedEvent { event_type, path, keeper_state: _ } = dbg!(event);

match event_type {
WatchedEventType::NodeDeleted => {
// TODO: ugly unwraps
let path = path.unwrap();
let uuid = path.strip_prefix("/auth/").unwrap();
let uuid = Uuid::parse_str(&uuid).unwrap();
log::info!("The key {} has been deleted", uuid);
dbg!(controller_clone.store.delete_api_key(uuid).unwrap());
}
WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => {
let path = path.unwrap();
let (key, _stat) = zkk.get_data(&path, false).unwrap();
let key: Key = serde_json::from_slice(&key).unwrap();
log::info!("The key {} has been deleted", key.uid);

dbg!(controller_clone.store.put_api_key(key).unwrap());
}
otherwise => panic!("Got the unexpected `{otherwise:?}` event!"),
}
});
})?;
}
None => {
if controller.store.is_empty()? {
generate_default_keys(&controller).await?;
generate_default_keys(&controller)?;
}
}
}
@@ -147,27 +140,29 @@ impl AuthController {
self.store.used_size()
}

pub async fn create_key(&self, create_key: CreateApiKey) -> Result<Key> {
pub fn create_key(&self, create_key: CreateApiKey) -> Result<Key> {
match self.store.get_api_key(create_key.uid)? {
Some(_) => Err(AuthControllerError::ApiKeyAlreadyExists(create_key.uid.to_string())),
None => self.put_key(create_key.to_key()).await,
None => self.put_key(create_key.to_key()),
}
}

pub async fn put_key(&self, key: Key) -> Result<Key> {
pub fn put_key(&self, key: Key) -> Result<Key> {
let store = self.store.clone();
// TODO: we may commit only after zk persisted the keys
let key = tokio::task::spawn_blocking(move || store.put_api_key(key)).await??;
if let Some(ref zk) = self.zk {
let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());

zk.create(&format!("/auth/{}", key.uid), &serde_json::to_vec_pretty(&key)?, &options)
.await?;
let key = store.put_api_key(key)?;
if let Some(zookeeper) = &self.zookeeper {
zookeeper.create(
&format!("/auth/{}", key.uid),
serde_json::to_vec_pretty(&key)?,
Acl::open_unsafe().clone(),
CreateMode::Persistent,
)?;
}
Ok(key)
}

pub async fn update_key(&self, uid: Uuid, patch: PatchApiKey) -> Result<Key> {
pub fn update_key(&self, uid: Uuid, patch: PatchApiKey) -> Result<Key> {
let mut key = self.get_key(uid)?;
match patch.description {
Setting::NotSet => (),
@@ -180,10 +175,13 @@ key.updated_at = OffsetDateTime::now_utc();
key.updated_at = OffsetDateTime::now_utc();
let store = self.store.clone();
// TODO: we may commit only after zk persisted the keys
let key = tokio::task::spawn_blocking(move || store.put_api_key(key)).await??;
if let Some(ref zk) = self.zk {
zk.set_data(&format!("/auth/{}", key.uid), &serde_json::to_vec_pretty(&key)?, None)
.await?;
let key = store.put_api_key(key)?;
if let Some(zookeeper) = &self.zookeeper {
zookeeper.set_data(
&format!("/auth/{}", key.uid),
serde_json::to_vec_pretty(&key)?,
None,
)?;
}
Ok(key)
}
@@ -226,12 +224,12 @@ impl AuthController {
self.store.list_api_keys()
}

pub async fn delete_key(&self, uid: Uuid) -> Result<()> {
pub fn delete_key(&self, uid: Uuid) -> Result<()> {
let store = self.store.clone();
let deleted = tokio::task::spawn_blocking(move || store.delete_api_key(uid)).await??;
let deleted = store.delete_api_key(uid)?;
if deleted {
if let Some(ref zk) = self.zk {
zk.delete(&format!("/auth/{}", uid), None).await?;
if let Some(zookeeper) = &self.zookeeper {
zookeeper.delete(&format!("/auth/{}", uid), None)?;
}
Ok(())
} else {
@@ -426,10 +424,9 @@ pub struct IndexSearchRules {
pub filter: Option<serde_json::Value>,
}

async fn generate_default_keys(controller: &AuthController) -> Result<()> {
controller.put_key(Key::default_admin()).await?;
controller.put_key(Key::default_search()).await?;

fn generate_default_keys(controller: &AuthController) -> Result<()> {
controller.put_key(Key::default_admin())?;
controller.put_key(Key::default_search())?;
Ok(())
}

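Condensed, the new AuthController::new above does the following with plain blocking calls: create /auth; if it already exists, wipe the local key store and import every child; then register a persistent recursive watch to stay in sync. The sketch below restates the create-or-import half using only the sync calls that appear in the diff. HeedAuthStore, Key and AuthControllerError are this crate's types and the error conversions lean on the internal_error! From impls, so treat it as an illustration rather than a drop-in replacement.

use zookeeper::{Acl, CreateMode, ZkError, ZooKeeper};

// Sketch of the create-or-import flow shown above.
fn sync_keys_from_zookeeper(
    zookeeper: &ZooKeeper,
    store: &HeedAuthStore,
) -> Result<(), AuthControllerError> {
    match zookeeper.create("/auth", vec![], Acl::open_unsafe().clone(), CreateMode::Persistent) {
        // Fresh cluster: nothing to import, the default keys get pushed elsewhere.
        Ok(_) => Ok(()),
        // The node already exists: replace the local keys with the shared ones.
        Err(ZkError::NodeExists) => {
            store.delete_all_keys()?;
            for child in zookeeper.get_children("/auth", false)? {
                let (bytes, _stat) = zookeeper.get_data(&format!("/auth/{child}"), false)?;
                let key: Key = serde_json::from_slice(&bytes)?;
                store.put_api_key(key)?;
            }
            Ok(())
        }
        Err(e) => Err(e.into()),
    }
}

The add_watch call with AddWatchMode::PersistentRecursive in the real code then keeps the local store up to date with later creations, updates and deletions.
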
2 changes: 1 addition & 1 deletion meilisearch/Cargo.toml
@@ -105,7 +105,7 @@ walkdir = "2.3.3"
yaup = "0.2.1"
serde_urlencoded = "0.7.1"
termcolor = "1.2.0"
zookeeper-client = "0.5.0"
zookeeper = "0.8.0"

[dev-dependencies]
actix-rt = "2.8.0"
17 changes: 8 additions & 9 deletions meilisearch/src/lib.rs
@@ -39,7 +39,7 @@ use meilisearch_types::versioning::{check_version_file, create_version_file};
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
pub use option::Opt;
use option::ScheduleSnapshot;
use zookeeper_client as zk;
use zookeeper::ZooKeeper;

use crate::error::MeilisearchHttpError;

@@ -219,14 +219,14 @@ pub async fn setup_meilisearch(
}

/// Try to start the IndexScheduler and AuthController without checking the VERSION file or anything.
async fn open_or_create_database_unchecked(
fn open_or_create_database_unchecked(
opt: &Opt,
on_failure: OnFailure,
zk: Option<zk::Client>,
zookeeper: Option<Arc<ZooKeeper>>,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zk.clone());
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zookeeper.clone());
let instance_features = opt.to_instance_features();
let index_scheduler = IndexScheduler::new(IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
@@ -245,14 +245,13 @@ async fn open_or_create_database_unchecked(
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,
instance_features,
zk: zk.clone(),
zookeeper: zookeeper.clone(),
})
.await
.map_err(anyhow::Error::from);

match (
index_scheduler,
auth_controller.await.map_err(anyhow::Error::from),
auth_controller.map_err(anyhow::Error::from),
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
) {
(Ok(i), Ok(a), Ok(())) => Ok((i, a)),
@@ -269,13 +268,13 @@ async fn open_or_create_database_unchecked(
async fn open_or_create_database(
opt: &Opt,
empty_db: bool,
zk: Option<zk::Client>,
zookeeper: Option<Arc<ZooKeeper>>,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
if !empty_db {
check_version_file(&opt.db_path)?;
}

open_or_create_database_unchecked(opt, OnFailure::KeepDb, zk).await
open_or_create_database_unchecked(opt, OnFailure::KeepDb, zookeeper).await
}

fn import_dump(
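
open_or_create_database and its unchecked variant now accept an Option<Arc<ZooKeeper>> instead of an Option<zk::Client>. How that handle is built is outside this diff; the sketch below shows one hedged possibility, with an invented zk_url option name and a minimal session watcher, feeding the option that the functions above expect.

use std::sync::Arc;
use std::time::Duration;

use zookeeper::{WatchedEvent, Watcher, ZooKeeper};

struct SessionWatcher;

impl Watcher for SessionWatcher {
    fn handle(&self, event: WatchedEvent) {
        println!("zookeeper session event: {event:?}");
    }
}

// Hypothetical wiring, not part of this diff: connect once at startup when a
// ZooKeeper URL is configured and hand clones of the Arc to the auth
// controller and the index scheduler.
fn connect_if_configured(zk_url: Option<&str>) -> anyhow::Result<Option<Arc<ZooKeeper>>> {
    match zk_url {
        None => Ok(None),
        Some(url) => {
            let client = ZooKeeper::connect(url, Duration::from_secs(5), SessionWatcher)?;
            Ok(Some(Arc::new(client)))
        }
    }
}
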
