Skip to content

Commit

Permalink
WIP: progress on the snapshot import
Browse files Browse the repository at this point in the history
  • Loading branch information
irevoire committed Aug 17, 2023
1 parent 854745c commit 5cf56cf
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 67 deletions.
41 changes: 27 additions & 14 deletions index-scheduler/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ impl IndexScheduler {
unreachable!()
};

let mut wtxn = self.env.write_txn()?;
let env = self.env();
let mut wtxn = env.write_txn()?;
let canceled_tasks_content_uuids = self.cancel_matched_tasks(
&mut wtxn,
task.uid,
Expand Down Expand Up @@ -640,7 +641,8 @@ impl IndexScheduler {
unreachable!()
};

let mut wtxn = self.env.write_txn()?;
let env = self.env();
let mut wtxn = env.write_txn()?;
let deleted_tasks_count = self.delete_matched_tasks(&mut wtxn, matched_tasks)?;

task.status = Status::Succeeded;
Expand Down Expand Up @@ -678,12 +680,13 @@ impl IndexScheduler {
// two read operations as the task processing is synchronous.

// 2.1 First copy the LMDB env of the index-scheduler
let env = self.env();
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;

// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
let rtxn = env.read_txn()?;

// 2.3 Create the update files directory
let update_files_dir = temp_snapshot_dir.path().join("update_files");
Expand Down Expand Up @@ -769,7 +772,8 @@ impl IndexScheduler {
}
dump_keys.flush()?;

let rtxn = self.env.read_txn()?;
let env = self.env();
let rtxn = env.read_txn()?;

// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
Expand Down Expand Up @@ -865,10 +869,12 @@ impl IndexScheduler {
let index_uid = op.index_uid().to_string();
let index = if must_create_index {
// create the index if it doesn't already exist
let wtxn = self.env.write_txn()?;
let env = self.env();
let wtxn = env.write_txn()?;
self.index_mapper.create_index(wtxn, &index_uid, None)?
} else {
let rtxn = self.env.read_txn()?;
let env = self.env();
let rtxn = env.read_txn()?;
self.index_mapper.index(&rtxn, &index_uid)?
};

Expand All @@ -883,7 +889,8 @@ impl IndexScheduler {
let res = || -> Result<()> {
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
let mut wtxn = self.env.write_txn()?;
let env = self.env();
let mut wtxn = env.write_txn()?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
wtxn.commit()?;
Ok(())
Expand All @@ -906,7 +913,8 @@ impl IndexScheduler {
unreachable!()
};
let index = {
let rtxn = self.env.read_txn()?;
let env = self.env();
let rtxn = env.read_txn()?;
self.index_mapper.index(&rtxn, index_uid)?
};
let deleted_documents = delete_document_by_filter(filter, index);
Expand Down Expand Up @@ -942,7 +950,8 @@ impl IndexScheduler {
Ok(vec![task])
}
Batch::IndexCreation { index_uid, primary_key, task } => {
let wtxn = self.env.write_txn()?;
let env = self.env();
let wtxn = env.write_txn()?;
if self.index_mapper.exists(&wtxn, &index_uid)? {
return Err(Error::IndexAlreadyExists(index_uid));
}
Expand All @@ -951,7 +960,8 @@ impl IndexScheduler {
self.process_batch(Batch::IndexUpdate { index_uid, primary_key, task })
}
Batch::IndexUpdate { index_uid, primary_key, mut task } => {
let rtxn = self.env.read_txn()?;
let env = self.env();
let rtxn = env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?;

if let Some(primary_key) = primary_key.clone() {
Expand Down Expand Up @@ -981,7 +991,8 @@ impl IndexScheduler {
// this is a non-critical operation. If it fails, we should not fail
// the entire batch.
let res = || -> Result<()> {
let mut wtxn = self.env.write_txn()?;
let env = self.env();
let mut wtxn = env.write_txn()?;
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
Expand All @@ -997,7 +1008,8 @@ impl IndexScheduler {
Ok(vec![task])
}
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
let wtxn = self.env.write_txn()?;
let env = self.env();
let wtxn = env.write_txn()?;

// it's possible that the index doesn't exist
let number_of_documents = || -> Result<u64> {
Expand Down Expand Up @@ -1028,7 +1040,8 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::IndexSwap { mut task } => {
let mut wtxn = self.env.write_txn()?;
let env = self.env();
let mut wtxn = env.write_txn()?;
let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind {
swaps
} else {
Expand Down

0 comments on commit 5cf56cf

Please sign in to comment.