Zookeeper ha #3971

Draft
wants to merge 71 commits into main
Changes from all commits (71 commits)
2ce8b42
REMOVE: add docker compose for tests
ManyTheFish Aug 2, 2023
dc38da9
WIP
ManyTheFish Aug 2, 2023
97e3dfd
makes zk available inside the auth-controller with config coming from…
irevoire Aug 2, 2023
84d56f3
send the creation of api-key to zookeeper
irevoire Aug 2, 2023
49f976c
fix analytics compilation
ManyTheFish Aug 2, 2023
3eb6f4b
Create api keys
ManyTheFish Aug 2, 2023
b0ff595
Event Listener: delete local key if deleted on ZK
ManyTheFish Aug 2, 2023
0cd8157
Forward the keys update to zookeeper
irevoire Aug 3, 2023
a325ddf
Forward the key deletions to zookeeper
irevoire Aug 3, 2023
57dc4b1
implement the watcher for all kinds of operations
irevoire Aug 3, 2023
fe7a312
Import the already existing api keys on startup
irevoire Aug 3, 2023
d5523cc
fix the tests
irevoire Aug 3, 2023
5ce01bc
add logs
irevoire Aug 3, 2023
ad7f8ed
fix auto-synchronization with zk
ManyTheFish Aug 3, 2023
3d46e84
update docker compose as an example
ManyTheFish Aug 3, 2023
b311089
Update zookeeper client
ManyTheFish Aug 7, 2023
b2f36b9
Comment Meilisearch container by default
ManyTheFish Aug 7, 2023
b66bf04
Create a task on zookeeper side when a task is created locally
ManyTheFish Aug 7, 2023
0d20d08
fix a few warnings
irevoire Aug 8, 2023
1191ec5
fix the register task watcher
irevoire Aug 8, 2023
8e437ed
Start leader election and task processing (WIP)
ManyTheFish Aug 9, 2023
8c20d6e
fix the leader election
irevoire Aug 9, 2023
f0c4d36
implement the deletion of tasks after processing a batch
irevoire Aug 9, 2023
61ccfaf
wake up after registering a task
irevoire Aug 10, 2023
777eebb
starts creating snapshot, the import is still missing
irevoire Aug 10, 2023
854745c
wip: starts working on importing the snapshots
irevoire Aug 16, 2023
0c7d7c6
WIP moving to the sync zookeeper API
Kerollmops Aug 29, 2023
c488a4a
Fixup a lot of small issues on the ZK config
Kerollmops Aug 30, 2023
2d1434d
Keep the ZK flow when enqueuing tasks
Kerollmops Aug 30, 2023
8c3ad57
React to changes towards the cluster members
Kerollmops Aug 30, 2023
9dd4423
Fix the watcher ordering of the auth/ node
Kerollmops Aug 30, 2023
e257710
WIP fix the tests
Kerollmops Aug 30, 2023
95a011a
Wrap the IndexScheduler fields into an inner struct
Kerollmops Aug 31, 2023
d7233ec
Make things compile again
Kerollmops Aug 31, 2023
0c68b9e
WIP making the final snapshot swap
Kerollmops Aug 31, 2023
966cbda
make the tests compile again
irevoire Sep 4, 2023
76657af
Add the options into the IndexScheduler
Kerollmops Sep 4, 2023
7d85753
Make the snapshot download work
Kerollmops Sep 4, 2023
41697c4
Introduce the zk-tasks folder
Kerollmops Sep 4, 2023
5b89276
starts using s3
irevoire Sep 5, 2023
01c13c9
Mastering minio
Kerollmops Sep 6, 2023
719fdd7
Fix and crash when the tasks path is unknown
Kerollmops Sep 7, 2023
a53a0fd
Store content files into the S3
Kerollmops Sep 11, 2023
b7109c0
start a script to run everything
irevoire Sep 12, 2023
c158d03
Fix internal error
Kerollmops Sep 12, 2023
f544cfa
Remove tasks and content file on the s3
Kerollmops Sep 12, 2023
f37fdce
Use slashes instead of dots for the s3 paths separators
Kerollmops Sep 12, 2023
9b01506
Move the load snapshot step into a function
Kerollmops Sep 12, 2023
309c33a
Fix again the dots
Kerollmops Sep 12, 2023
8a2e8a8
Load the latest snapshot when we start the engine
Kerollmops Sep 12, 2023
ecd36b1
exposes all the s3 arguments
irevoire Sep 13, 2023
c71ba72
fix build in release mode
irevoire Sep 21, 2023
6325cda
bump charabia
irevoire Sep 21, 2023
98b67f2
move to our new S3 lib
irevoire Sep 28, 2023
dfb84f8
bump strois version
irevoire Oct 10, 2023
b22f126
bump strois to a temporary version while we wait for rusty_s3 to do a…
irevoire Oct 13, 2023
b9983a4
No longer create the S3 Bucket
Kerollmops Oct 11, 2023
8f04353
bump strois to a temporary version while we wait for rusty_s3 to do a…
Kerollmops Oct 17, 2023
c573261
WIP: start updating the zookeeper client => leader election is missing
irevoire Oct 31, 2023
c3a8d4b
ICE
irevoire Oct 31, 2023
03b5109
ICE: use a git version of my crate so rust team can pull the repo
irevoire Oct 31, 2023
3661441
WIP: it compiles but the processing of tasks and loading of snapshots…
irevoire Nov 2, 2023
0c18962
it compiles, now tokio is complaining that we block the main thread (…
irevoire Nov 2, 2023
d0a3582
no deadlock on start
irevoire Nov 2, 2023
c5ec817
WIP fix a lot of bugs: The follower does not wake up when a new snapsho…
irevoire Nov 2, 2023
d1bc7ec
fix all the bugs on the snapshots export/import pipeline
irevoire Nov 6, 2023
41178e5
fix the leader election
irevoire Nov 6, 2023
c48f72e
fix a few warnings
irevoire Nov 6, 2023
1585b3e
re-implement the snapshot import at startup
irevoire Nov 7, 2023
e4adf7f
fix the path of the deps
irevoire Nov 7, 2023
a6863a7
use the zookeeper-client-sync from meilisearch instead of tamo
irevoire Nov 8, 2023
411 changes: 326 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 2 additions & 20 deletions Cargo.toml
@@ -37,23 +37,5 @@ opt-level = 3
[profile.dev.package.roaring]
opt-level = 3

[profile.dev.package.lindera-ipadic-builder]
opt-level = 3
[profile.dev.package.encoding]
opt-level = 3
[profile.dev.package.yada]
opt-level = 3

[profile.release.package.lindera-ipadic-builder]
opt-level = 3
[profile.release.package.encoding]
opt-level = 3
[profile.release.package.yada]
opt-level = 3

[profile.bench.package.lindera-ipadic-builder]
opt-level = 3
[profile.bench.package.encoding]
opt-level = 3
[profile.bench.package.yada]
opt-level = 3
[patch.crates-io]
strois = { git = "https://github.com/meilisearch/strois", rev = "eeb945c" }
59 changes: 59 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,59 @@
version: "3.9"
services:
zk1:
container_name: zk1
hostname: zk1
image: bitnami/zookeeper:3.7.1
ports:
- 21811:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOO_SERVER_ID=1
- ZOO_SERVERS=0.0.0.0:2888:3888,zk2:2888:3888,zk3:2888:3888
zk2:
container_name: zk2
hostname: zk2
image: bitnami/zookeeper:3.7.1
ports:
- 21812:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOO_SERVER_ID=2
- ZOO_SERVERS=zk1:2888:3888,0.0.0.0:2888:3888,zk3:2888:3888
zk3:
container_name: zk3
hostname: zk3
image: bitnami/zookeeper:3.7.1
ports:
- 21813:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOO_SERVER_ID=3
- ZOO_SERVERS=zk1:2888:3888,zk2:2888:3888,0.0.0.0:2888:3888
zoonavigator:
container_name: zoonavigator
image: elkozmon/zoonavigator
ports:
- 9000:9000

# Meilisearch instances
# m1:
# container_name: m1
# hostname: m1
# image: getmeili/meilisearch:prototype-zookeeper-ha-0
# ports:
# - 7700:7700
# environment:
# - MEILI_ZK_URL=zk1:2181
# - MEILI_MASTER_KEY=masterkey
# restart: always
# m2:
# container_name: m2
# hostname: m2
# image: getmeili/meilisearch:prototype-zookeeper-ha-0
# ports:
# - 7701:7700
# environment:
# - MEILI_ZK_URL=zk2:2181
# - MEILI_MASTER_KEY=masterkey
# restart: always
28 changes: 14 additions & 14 deletions file-store/src/lib.rs
@@ -22,20 +22,6 @@ pub enum Error {

pub type Result<T> = std::result::Result<T, Error>;

impl Deref for File {
    type Target = NamedTempFile;

    fn deref(&self) -> &Self::Target {
        &self.file
    }
}

impl DerefMut for File {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.file
    }
}

#[derive(Clone, Debug)]
pub struct FileStore {
path: PathBuf,
@@ -146,6 +132,20 @@ impl File {
}
}

impl Deref for File {
    type Target = NamedTempFile;

    fn deref(&self) -> &Self::Target {
        &self.file
    }
}

impl DerefMut for File {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.file
    }
}

#[cfg(test)]
mod test {
use std::io::Write;
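The hunks above only move the Deref/DerefMut impls below the impl File block; the pattern itself is unchanged. As a minimal, self-contained sketch of what that pattern buys (assuming only the tempfile crate as a dependency), a wrapper that derefs to NamedTempFile exposes the inner file's Write methods without re-implementing them:

use std::io::Write;
use std::ops::{Deref, DerefMut};

use tempfile::NamedTempFile;

// A wrapper that exposes the inner NamedTempFile through Deref/DerefMut.
struct File {
    file: NamedTempFile,
}

impl Deref for File {
    type Target = NamedTempFile;

    fn deref(&self) -> &Self::Target {
        &self.file
    }
}

impl DerefMut for File {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.file
    }
}

fn main() -> std::io::Result<()> {
    let mut file = File { file: NamedTempFile::new()? };
    // Deref coercion resolves `write_all` on the inner NamedTempFile's Write impl.
    file.write_all(b"hello")?;
    Ok(())
}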
61 changes: 61 additions & 0 deletions ha_test/run.sh
@@ -0,0 +1,61 @@
#!/bin/bash

function is_everything_installed {
    everything_ok=yes

    if hash zkli 2>/dev/null; then
        echo "✅ zkli installed"
    else
        everything_ok=no
        echo "🥺 zkli is missing, please run \`cargo install zkli\`"
    fi

    if hash s3cmd 2>/dev/null; then
        echo "✅ s3cmd installed"
    else
        everything_ok=no
        echo "🥺 s3cmd is missing, see how to install it here https://s3tools.org/s3cmd"
    fi

    if [ "$everything_ok" = "no" ]; then
        echo "Exiting..."
        exit 1
    fi
}

# param: addr of zookeeper
function connect_to_zookeeper {
    if ! zkli -a "$1" ls > /dev/null; then
        echo "zkli can't connect"
        return 1
    fi
}

# reads the S3 connection settings from the environment:
#   S3_SECRET_KEY
#   S3_ACCESS_KEY
#   S3_HOST
#   S3_BUCKET
function connect_to_s3 {
    s3cmd --host="$S3_HOST" --host-bucket="$S3_BUCKET" --access_key="$S3_ACCESS_KEY" --secret_key="$S3_SECRET_KEY" ls

    if [ $? -ne 0 ]; then
        echo "s3cmd can't connect"
        return 1
    fi
}

is_everything_installed

ZOOKEEPER_ADDR="localhost:2181"
if ! connect_to_zookeeper "$ZOOKEEPER_ADDR"; then
    ZOOKEEPER_ADDR="localhost:21811"
    if ! connect_to_zookeeper "$ZOOKEEPER_ADDR"; then
        echo "Can't connect to zookeeper"
        exit 1
    fi
fi

connect_to_s3
4 changes: 4 additions & 0 deletions index-scheduler/Cargo.toml
@@ -31,6 +31,10 @@ tempfile = "3.5.0"
thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.3.1", features = ["serde", "v4"] }
tokio = { version = "1.27.0", features = ["full"] }
zookeeper-client-sync = { git = "https://github.com/meilisearch/zookeeper-client-sync.git" }
parking_lot = "0.12.1"
strois = "0.0.4"

[dev-dependencies]
big_s = "1.0.2"
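The new parking_lot dependency is what allows the later hunks to drop the `.unwrap()` calls on `processing_tasks.read()`: unlike std::sync::RwLock, parking_lot::RwLock has no lock poisoning, so its guards are returned directly rather than wrapped in a Result. A minimal sketch of the difference, assuming only parking_lot as an extra dependency:

use parking_lot::RwLock;
use std::sync::RwLock as StdRwLock;

fn main() {
    let std_lock = StdRwLock::new(0u32);
    // std's read() returns a Result because a panic while the lock is held
    // "poisons" it, hence the `.unwrap()` calls removed elsewhere in this PR.
    let value = *std_lock.read().unwrap();

    let pl_lock = RwLock::new(value);
    // parking_lot has no poisoning: read() hands back the guard directly.
    assert_eq!(*pl_lock.read(), 0);
}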
38 changes: 33 additions & 5 deletions index-scheduler/src/batch.rs
@@ -43,7 +43,7 @@ use uuid::Uuid;

use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
use crate::{Error, IndexSchedulerInner, ProcessingTasks, Result, TaskId};

/// Represents a combination of tasks that can all be processed at the same time.
///
@@ -198,6 +198,35 @@ impl Batch {
            | IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
        }
    }

    /// Return the content file uuids associated with this batch.
    pub fn content_uuids(&self) -> Vec<Uuid> {
        match self {
            Batch::TaskCancelation { .. }
            | Batch::TaskDeletion(_)
            | Batch::Dump(_)
            | Batch::IndexCreation { .. }
            | Batch::IndexDocumentDeletionByFilter { .. }
            | Batch::IndexUpdate { .. }
            | Batch::SnapshotCreation(_)
            | Batch::IndexDeletion { .. }
            | Batch::IndexSwap { .. } => vec![],
            Batch::IndexOperation { op, .. } => match op {
                IndexOperation::DocumentOperation { operations, .. } => operations
                    .iter()
                    .flat_map(|op| match op {
                        DocumentOperation::Add(uuid) => Some(*uuid),
                        DocumentOperation::Delete(_) => None,
                    })
                    .collect(),
                IndexOperation::DocumentDeletion { .. }
                | IndexOperation::Settings { .. }
                | IndexOperation::DocumentClear { .. }
                | IndexOperation::SettingsAndDocumentOperation { .. }
                | IndexOperation::DocumentClearAndSetting { .. } => vec![],
            },
        }
    }
}

impl IndexOperation {
@@ -213,7 +242,7 @@ impl IndexOperation {
}
}

impl IndexScheduler {
impl IndexSchedulerInner {
/// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`].
///
/// ## Arguments
@@ -480,8 +509,7 @@ impl IndexScheduler {
if let Some(task_id) = to_cancel.max() {
// We retrieve the tasks that were processing before this tasks cancelation started.
// We must *not* reset the processing tasks before calling this method.
let ProcessingTasks { started_at, processing } =
&*self.processing_tasks.read().unwrap();
let ProcessingTasks { started_at, processing, .. } = &*self.processing_tasks.read();
return Ok(Some(Batch::TaskCancelation {
task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
previous_started_at: *started_at,
@@ -1392,7 +1420,7 @@ impl IndexScheduler {
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
// 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
let processing_tasks = &self.processing_tasks.read().processing.clone();

let all_task_ids = self.all_task_ids(wtxn)?;
let mut to_delete_tasks = all_task_ids & matched_tasks;
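The new Batch::content_uuids method above supports the S3 cleanup described in the commit list ("Remove tasks and content file on the s3", "Use slashes instead of dots for the s3 paths separators"): once a batch has been processed, the content files backing its document additions can be deleted from the bucket. A hedged sketch of that mapping, assuming the uuid crate with the v4 feature (already a workspace dependency); the "content-files/" prefix is an assumed key layout, not necessarily the one this PR uses:

use uuid::Uuid;

// Map a batch's content-file uuids to S3 object keys. The "content-files/"
// prefix is a placeholder, not necessarily the layout used by the PR.
fn content_file_keys(content_uuids: &[Uuid]) -> Vec<String> {
    content_uuids.iter().map(|uuid| format!("content-files/{uuid}")).collect()
}

fn main() {
    let uuids = vec![Uuid::new_v4(), Uuid::new_v4()];
    for key in content_file_keys(&uuids) {
        // In the scheduler this would become an S3 delete once the batch succeeds.
        println!("would delete: {key}");
    }
}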
8 changes: 7 additions & 1 deletion index-scheduler/src/index_mapper/index_map.rs
@@ -295,6 +295,11 @@ impl IndexMap {
"Attempt to finish deletion of an index that was being closed"
);
}

/// Returns the indexes that were opened by the `IndexMap`.
pub fn clear(&mut self) -> Vec<Index> {
self.available.clear().into_iter().map(|(_, (_, index))| index).collect()
}
}

/// Create or open an index in the specified path.
@@ -335,7 +340,8 @@ mod tests {
impl IndexMapper {
fn test() -> (Self, Env, IndexSchedulerHandle) {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
(index_scheduler.index_mapper, index_scheduler.env, handle)
let index_scheduler = index_scheduler.inner();
(index_scheduler.index_mapper.clone(), index_scheduler.env.clone(), handle)
}
}

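IndexMap::clear above drains the map of open indexes and hands the Index handles back to the caller, which is useful for closing everything before an imported snapshot is swapped in. A dependency-free sketch of the same "drain and return the values" pattern, with a plain BTreeMap standing in for the internal LRU map:

use std::collections::BTreeMap;

// Empty the map and return the values it held, mirroring what
// `IndexMap::clear` does with the open Index handles.
fn clear_map<K: Ord, V>(map: &mut BTreeMap<K, V>) -> Vec<V> {
    std::mem::take(map).into_values().collect()
}

fn main() {
    let mut open_indexes = BTreeMap::from([("movies", 1), ("products", 2)]);
    let closed = clear_map(&mut open_indexes);
    assert!(open_indexes.is_empty());
    assert_eq!(closed.len(), 2);
}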
11 changes: 8 additions & 3 deletions index-scheduler/src/index_mapper/mod.rs
@@ -61,7 +61,7 @@ pub struct IndexMapper {
pub(crate) index_stats: Database<UuidCodec, SerdeJson<IndexStats>>,

/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
pub(crate) base_path: PathBuf,
/// The map size an index is opened with on the first time.
index_base_map_size: usize,
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
@@ -135,7 +135,7 @@ impl IndexMapper {
index_growth_amount: usize,
index_count: usize,
enable_mdb_writemap: bool,
indexer_config: IndexerConfig,
indexer_config: Arc<IndexerConfig>,
) -> Result<Self> {
let mut wtxn = env.write_txn()?;
let index_mapping = env.create_database(&mut wtxn, Some(INDEX_MAPPING))?;
@@ -150,7 +150,7 @@ index_base_map_size,
index_base_map_size,
index_growth_amount,
enable_mdb_writemap,
indexer_config: Arc::new(indexer_config),
indexer_config,
})
}

@@ -428,6 +428,11 @@ impl IndexMapper {
        Ok(())
    }

    /// Returns the indexes that were opened by the `IndexMapper`.
    pub fn clear(&mut self) -> Vec<Index> {
        self.index_map.write().unwrap().clear()
    }

/// The stats of an index.
///
/// If available in the cache, they are directly returned.
12 changes: 8 additions & 4 deletions index-scheduler/src/insta_snapshot.rs
@@ -1,5 +1,6 @@
use std::collections::BTreeSet;
use std::fmt::Write;
use std::ops::Deref;

use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn};
@@ -8,12 +9,13 @@ use meilisearch_types::tasks::{Details, Task};
use roaring::RoaringBitmap;

use crate::index_mapper::IndexMapper;
use crate::{IndexScheduler, Kind, Status, BEI128};
use crate::{IndexScheduler, IndexSchedulerInner, Kind, Status, BEI128};

pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
scheduler.assert_internally_consistent();

let IndexScheduler {
let inner = scheduler.inner();
let IndexSchedulerInner {
autobatching_enabled,
must_stop_processing: _,
processing_tasks,
@@ -38,13 +40,15 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,
} = scheduler;
zookeeper: _,
options: _,
} = inner.deref();

let rtxn = env.read_txn().unwrap();

let mut snap = String::new();

let processing_tasks = processing_tasks.read().unwrap().processing.clone();
let processing_tasks = processing_tasks.read().processing.clone();
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
snap.push_str("### Processing Tasks:\n");
snap.push_str(&snapshot_bitmap(&processing_tasks));
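The snapshot function destructures IndexSchedulerInner field by field, now listing `zookeeper: _` and `options: _` explicitly. Because the pattern has no `..`, adding a field to the struct is a compile error at this spot until the snapshot handles or deliberately ignores it. A tiny sketch of that exhaustive-destructuring trick:

struct Inner {
    autobatching_enabled: bool,
    run_loop_iteration: u32,
}

fn snapshot(inner: &Inner) -> String {
    // No `..` in the pattern: a new field on Inner breaks compilation here
    // until it is listed, so the snapshot can never silently miss it.
    let Inner { autobatching_enabled, run_loop_iteration } = inner;
    format!("autobatching={autobatching_enabled} loop={run_loop_iteration}")
}

fn main() {
    let inner = Inner { autobatching_enabled: true, run_loop_iteration: 3 };
    println!("{}", snapshot(&inner));
}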