implement the deletion of tasks after processing a batch
irevoire committed Aug 9, 2023
1 parent 8c20d6e commit 83806d1
Showing 1 changed file with 55 additions and 8 deletions.
63 changes: 55 additions & 8 deletions index-scheduler/src/lib.rs
@@ -153,25 +153,38 @@ struct ProcessingTasks
started_at: OffsetDateTime,
/// The list of task ids that are currently running.
processing: RoaringBitmap,
/// The list of task ids that were processed in the last batch.
processed_previously: RoaringBitmap,
}

impl ProcessingTasks {
/// Creates an empty `ProcessingTasks` struct.
fn new() -> ProcessingTasks {
ProcessingTasks { started_at: OffsetDateTime::now_utc(), processing: RoaringBitmap::new() }
ProcessingTasks {
started_at: OffsetDateTime::now_utc(),
processing: RoaringBitmap::new(),
processed_previously: RoaringBitmap::new(),
}
}

/// Stores the currently processing tasks, and the date time at which it started.
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
self.started_at = started_at;
self.processed_previously = std::mem::take(&mut self.processing);
self.processing = processing;
}

/// Set the processing tasks to an empty list
fn stop_processing(&mut self) {
self.processed_previously = std::mem::take(&mut self.processing);
self.processing = RoaringBitmap::new();
}

/// Returns the tasks that were processed in the previous tick.
fn processed_previously(&self) -> &RoaringBitmap {
&self.processed_previously
}

/// Returns `true` if at least one of the currently processing tasks must be canceled.
fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
!self.processing.is_disjoint(canceled_tasks)
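
Taken together, these members give each tick a one-batch memory: `start_processing_at` archives whatever was running into `processed_previously`, and `stop_processing` does the same when the batch ends, so the next tick knows which task nodes it can clean up. A minimal illustrative sketch of that lifecycle (not part of the commit; it assumes the `roaring` and `time` crates and that the snippet lives in the same module as `ProcessingTasks`):

use roaring::RoaringBitmap;
use time::OffsetDateTime;

let mut state = ProcessingTasks::new();

// A batch containing tasks 1, 2 and 3 starts processing.
let batch: RoaringBitmap = (1u32..=3).collect();
state.start_processing_at(OffsetDateTime::now_utc(), batch);

// The batch finishes: the processing set moves into `processed_previously`,
// where the next tick can read it to delete the matching ZooKeeper nodes.
state.stop_processing();
assert_eq!(state.processed_previously().len(), 3);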
@@ -606,6 +619,7 @@ impl IndexScheduler
};

if let Some(previous_path) = previous_path {
log::warn!("I am the follower {}", self_node_id);
Some((
zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
@@ -635,10 +649,11 @@ impl IndexScheduler

let self_node_path = format!("node-{}", self_node_id);
let previous_path =
list.into_iter().take_while(|path| dbg!(path) < &self_node_path).last();
list.into_iter().take_while(|path| dbg!(path) < &self_node_id).last();
previous_path.map(|path| format!("/election/{}", path))
};

log::warn!("I stay follower {}", self_node_path);
let (lw, sw) = watchers.take().unwrap();
lw.remove().await.unwrap();
watchers = if let Some(previous_path) = previous_path {
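
The `take_while(...).last()` chain in this hunk is the usual ZooKeeper-style election step: among the sorted children of `/election`, a follower watches the entry that sorts immediately before its own. A self-contained sketch with made-up sequence numbers (the names are hypothetical, not taken from the commit):

let list = vec![
    "node-0000000001".to_string(),
    "node-0000000002".to_string(),
    "node-0000000003".to_string(),
];
let self_node_path = "node-0000000002".to_string();

// Keep every child that sorts before our own node, then take the last one:
// that is the immediate predecessor this follower must watch.
let previous_path = list
    .into_iter()
    .take_while(|path| path < &self_node_path)
    .last()
    .map(|path| format!("/election/{}", path));

assert_eq!(previous_path.as_deref(), Some("/election/node-0000000001"));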
@@ -671,14 +686,46 @@ impl IndexScheduler
let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await;

match run.tick().await {
Ok(TickOutcome::TickAgain(_)) => {
Ok(TickOutcome::TickAgain(n)) => {
// We must tick again.
run.wake_up.signal();
// TODO:
// - create a new snapshot
// - create snapshot in ZK
// - delete task in ZK

println!("I should create a snapshot");
if n == 0 {
// if no tasks were processed, there is nothing to share with the rest of the cluster.
continue;
}

// TODO:
// - create a new snapshot on disk/s3

if let Some(ref zk) = self.zk {
// we must notify everyone that we dropped a new snapshot on S3
let options = zk::CreateMode::EphemeralSequential
.with_acls(zk::Acls::anyone_all());
let (_stat, id) = zk
.create("/snapshots/snapshot-", &[], &options)
.await
.unwrap();
// We can now delete all the tasks that have been processed

let processed = self
.processing_tasks
.read()
.unwrap()
.processed_previously()
.clone(); // we don't want to hold the mutex
for task in processed {
// ignore the error: we don't want to crash if a task node can't be deleted.
let _ = zk
.delete(
&format!(
"/tasks/task-{}",
CreateSequence(task as i32)
),
None,
)
.await;
}
}
}
Ok(TickOutcome::WaitForSignal) => (),
Err(e) => {
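One detail worth noting in the deletion loop: the processed set is cloned out of the `processing_tasks` lock before any `await`, so the read guard is never held across an asynchronous ZooKeeper call. A minimal sketch of that pattern, with a hypothetical `Shared` type standing in for the scheduler state and the ZooKeeper call elided:

use std::sync::RwLock;
use roaring::RoaringBitmap;

struct Shared {
    processed_previously: RwLock<RoaringBitmap>,
}

async fn prune(shared: &Shared) {
    // Clone the ids while holding the read guard, then drop it immediately,
    // so no lock is held across the `.await` points below.
    let processed: RoaringBitmap =
        shared.processed_previously.read().unwrap().clone();

    for task in processed {
        // ... await the per-task ZooKeeper deletion here, without the lock ...
        let _ = task;
    }
}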
