Skip to content

Commit

Permalink
implement the deletion of tasks after processing a batch
Browse files Browse the repository at this point in the history
add a lot of comments and logs
  • Loading branch information
irevoire committed Aug 10, 2023
1 parent 8c20d6e commit f0c4d36
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 36 deletions.
2 changes: 1 addition & 1 deletion index-scheduler/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl IndexScheduler {
if let Some(task_id) = to_cancel.max() {
// We retrieve the tasks that were processing before this tasks cancelation started.
// We must *not* reset the processing tasks before calling this method.
let ProcessingTasks { started_at, processing } =
let ProcessingTasks { started_at, processing, .. } =
&*self.processing_tasks.read().unwrap();
return Ok(Some(Batch::TaskCancelation {
task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
Expand Down
125 changes: 90 additions & 35 deletions index-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,34 @@ struct ProcessingTasks {
started_at: OffsetDateTime,
/// The list of tasks ids that are currently running.
processing: RoaringBitmap,
/// The list of tasks ids that were processed in the last batch
processed_previously: RoaringBitmap,
}

impl ProcessingTasks {
/// Creates an empty `ProcessingAt` struct.
fn new() -> ProcessingTasks {
ProcessingTasks { started_at: OffsetDateTime::now_utc(), processing: RoaringBitmap::new() }
ProcessingTasks {
started_at: OffsetDateTime::now_utc(),
processing: RoaringBitmap::new(),
processed_previously: RoaringBitmap::new(),
}
}

/// Stores the currently processing tasks, and the date time at which it started.
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
self.started_at = started_at;
self.processing = processing;
self.processed_previously = std::mem::replace(&mut self.processing, processing);
}

/// Set the processing tasks to an empty list
fn stop_processing(&mut self) {
self.processing = RoaringBitmap::new();
self.processed_previously = std::mem::take(&mut self.processing);
}

/// Returns the tasks that were processed in the previous tick.
fn processed_previously(&self) -> &RoaringBitmap {
&self.processed_previously
}

/// Returns `true` if there, at least, is one task that is currently processing that we must stop.
Expand Down Expand Up @@ -480,6 +491,19 @@ impl IndexScheduler {
features,
};

// initialize the directories we need to process batches.
if let Some(ref zk) = this.zk {
let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
match zk.create("/election", &[], &options).await {
Ok(_) | Err(zk::Error::NodeExists) => (),
Err(e) => panic!("{e}"),
}

match zk.create("/snapshots", &[], &options).await {
Ok(_) | Err(zk::Error::NodeExists) => (),
Err(e) => panic!("{e}"),
}
}
this.run().await;
Ok(this)
}
Expand Down Expand Up @@ -576,23 +600,12 @@ impl IndexScheduler {
#[cfg(test)]
run.breakpoint(Breakpoint::Init);

// potentialy create /leader-q folder
// join the leader q
// subscribe a watcher to the node-1 in the leader q
// Join the potential leaders list.
// The lowest in the list is the leader. And if we're not the leader
// we watch the node right before us to be notified if he dies.
// See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
let mut watchers = if let Some(ref zk) = zk {
let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
match zk.create("/election", &[], &options).await {
Ok(_) | Err(zk::Error::NodeExists) => (),
Err(e) => panic!("{e}"),
}

match zk.create("/snapshots", &[], &options).await {
Ok(_) | Err(zk::Error::NodeExists) => (),
Err(e) => panic!("{e}"),
}

let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
// TODO: ugly unwrap
let (_stat, id) = zk.create("/election/node-", &[], &options).await.unwrap();
self_node_id = id;
let previous_path = {
Expand All @@ -601,18 +614,20 @@ impl IndexScheduler {

let self_node_path = format!("node-{}", self_node_id);
let previous_path =
list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last();
list.into_iter().take_while(|path| path < &self_node_path).last();
previous_path.map(|path| format!("/election/{}", path))
};

if let Some(previous_path) = previous_path {
log::warn!("I am the follower {}", self_node_id);
Some((
zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
.await
.unwrap(),
))
} else {
// if there was no node before ourselves, then we're the leader.
log::warn!("I'm the leader");
None
}
Expand All @@ -623,40 +638,42 @@ impl IndexScheduler {

loop {
match watchers.as_mut() {
Some((lw, sw)) => {
Some((leader_watcher, snapshot_watcher)) => {
// We wait for a new batch processed by the leader OR a disconnection from the leader.
tokio::select! {
zk::WatchedEvent { event_type, session_state, .. } = lw.changed() => match event_type {
zk::WatchedEvent { event_type, session_state, .. } = leader_watcher.changed() => match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
zk::EventType::NodeDeleted => {
// The node behind us has been disconnected,
// am I the leader or is there someone before me.
let zk = zk.as_ref().unwrap();
let previous_path = {
let mut list = zk.list_children("/election").await.unwrap();
list.sort();

let self_node_path = format!("node-{}", self_node_id);
let previous_path =
list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last();
list.into_iter().take_while(|path| path < &self_node_path).last();
previous_path.map(|path| format!("/election/{}", path))
};

let (lw, sw) = watchers.take().unwrap();
lw.remove().await.unwrap();
let (leader_watcher, snapshot_watcher) = watchers.take().unwrap();
leader_watcher.remove().await.unwrap();
watchers = if let Some(previous_path) = previous_path {
log::warn!("I stay a follower {}", self_node_id);
Some((
zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
.await
.unwrap(),
snapshot_watcher,
))
} else {
log::warn!("I'm the new leader");
sw.remove().await.unwrap();
snapshot_watcher.remove().await.unwrap();
None
}
}
_ => (),
},
zk::WatchedEvent { event_type, session_state, path } = sw.changed() => match event_type {
zk::WatchedEvent { event_type, session_state, path } = snapshot_watcher.changed() => match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
zk::EventType::NodeCreated => {
println!("I should load a snapshot - {}", path);
Expand All @@ -667,18 +684,56 @@ impl IndexScheduler {
}
}
None => {
// we're either a leader or not running in a cluster,
// either way we should wait until we receive a task.
let wake_up = run.wake_up.clone();
let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await;

match run.tick().await {
Ok(TickOutcome::TickAgain(_)) => {
Ok(TickOutcome::TickAgain(n)) => {
// We must tick again.
run.wake_up.signal();
// TODO:
// - create a new snapshot
// - create snapshot in ZK
// - delete task in ZK

println!("I should create a snapshot");
// if we're in a cluster that means we're the leader
// and should share a snapshot of what we've done.
if let Some(ref zk) = run.zk {
// if nothing was processed we have nothing to do.
if n == 0 {
continue;
}

// TODO:
// - create a new snapshot on disk/s3

// we must notify everyone that we dropped a new snapshot on the s3
let options = zk::CreateMode::EphemeralSequential
.with_acls(zk::Acls::anyone_all());
let (_stat, id) = zk
.create("/snapshots/snapshot-", &[], &options)
.await
.unwrap();
log::info!("Notified that there was a new snapshot {id}");

// We can now delete all the tasks that has been processed
let processed = run
.processing_tasks
.read()
.unwrap()
.processed_previously()
.clone(); // we don't want to hold the mutex
log::info!("Deleting {} processed tasks", processed.len());
for task in processed {
let _ = zk // we don't want to crash if we can't delete an update file.
.delete(
&format!(
"/tasks/task-{}",
zk::CreateSequence(task as i32)
),
None,
)
.await;
}
}
}
Ok(TickOutcome::WaitForSignal) => (),
Err(e) => {
Expand Down

0 comments on commit f0c4d36

Please sign in to comment.