Skip to content

Commit

Permalink
feat: add priority to I/O scheduler (#2315)
Browse files Browse the repository at this point in the history
This also renames the store scheduler to the scan scheduler. I don't
want to get into the thorny issue of how to prioritize I/O requests
across different scans. So, with this approach, if multiple scans are
running at the same time, they will overschedule (and let the OS deal
with the contention). This can be revisited in the future.

Closes #1958
  • Loading branch information
westonpace committed May 13, 2024
1 parent c6f82fe commit fe2d874
Show file tree
Hide file tree
Showing 23 changed files with 339 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ http = "0.2.9"
itertools = "0.12"
lazy_static = "1"
log = "0.4"
mockall = { version = "0.12.1" }
mock_instant = { version = "0.3.1", features = ["sync"] }
moka = "0.11"
num-traits = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use lance_file::v2::{
reader::{BufferDescriptor, CachedFileMetadata, FileReader},
writer::{FileWriter, FileWriterOptions},
};
use lance_io::{scheduler::StoreScheduler, ReadBatchParams};
use lance_io::{scheduler::ScanScheduler, ReadBatchParams};
use object_store::path::Path;
use pyo3::{
exceptions::{PyIOError, PyRuntimeError, PyValueError},
Expand Down Expand Up @@ -267,7 +267,7 @@ impl LanceFileReader {
let io_parallelism = std::env::var("IO_THREADS")
.map(|val| val.parse::<u32>().unwrap_or(8))
.unwrap_or(8);
let scheduler = StoreScheduler::new(Arc::new(object_store), io_parallelism);
let scheduler = ScanScheduler::new(Arc::new(object_store), io_parallelism);
let file = scheduler.open_file(&path).await.infer_error()?;
let inner = FileReader::try_open(file, None).await.infer_error()?;
Ok(Self {
Expand Down
18 changes: 15 additions & 3 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,12 @@ impl DecodeBatchScheduler {

let range = range.start as u32..range.end as u32;

self.root_scheduler
.schedule_ranges(&[range.clone()], scheduler, &sink)?;
self.root_scheduler.schedule_ranges(
&[range.clone()],
scheduler,
&sink,
range.start as u64,
)?;

trace!("Finished scheduling of range {:?}", range);
Ok(())
Expand Down Expand Up @@ -567,8 +571,11 @@ impl DecodeBatchScheduler {
format!("{}, ..., {}", indices[0], indices[indices.len() - 1])
}
);
if indices.is_empty() {
return Ok(());
}
self.root_scheduler
.schedule_take(indices, scheduler, &sink)?;
.schedule_take(indices, scheduler, &sink, indices[0] as u64)?;
trace!("Finished scheduling take of {} rows", indices.len());
Ok(())
}
Expand Down Expand Up @@ -740,10 +747,13 @@ pub trait PhysicalPageScheduler: Send + Sync + std::fmt::Debug {
/// * `range` - the range of row offsets (relative to start of page) requested
/// these must be ordered and must not overlap
/// * `scheduler` - a scheduler to submit the I/O request to
/// * `top_level_row` - the row offset of the top level field currently being
/// scheduled. This can be used to assign priority to I/O requests
fn schedule_ranges(
&self,
ranges: &[Range<u32>],
scheduler: &dyn EncodingsIo,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>;
}

Expand Down Expand Up @@ -780,6 +790,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()>;
/// Schedules I/O for the requested rows (identified by row offsets from start of page)
/// TODO: implement this using schedule_ranges
Expand All @@ -788,6 +799,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()>;
/// The number of rows covered by this page
fn num_rows(&self) -> u32;
Expand Down
5 changes: 4 additions & 1 deletion rust/lance-encoding/src/encodings/logical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ impl LogicalPageScheduler for BinaryPageScheduler {
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} ranges", ranges.len());
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
self.varbin_scheduler
.schedule_ranges(ranges, scheduler, &tx)?;
.schedule_ranges(ranges, scheduler, &tx, top_level_row)?;

while let Some(decoder) = rx.recv().now_or_never() {
let wrapped = BinaryPageDecoder {
Expand All @@ -69,6 +70,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
indices: &[u32],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} indices", indices.len());
self.schedule_ranges(
Expand All @@ -78,6 +80,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}

Expand Down
5 changes: 4 additions & 1 deletion rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl LogicalPageScheduler for FslPageScheduler {
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
let expanded_ranges = ranges
.iter()
Expand All @@ -64,7 +65,7 @@ impl LogicalPageScheduler for FslPageScheduler {
);
let (tx, mut rx) = mpsc::unbounded_channel();
self.items_scheduler
.schedule_ranges(&expanded_ranges, scheduler, &tx)?;
.schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
let inner_page_decoder = rx.blocking_recv().unwrap();
sink.send(Box::new(FslPageDecoder {
inner: inner_page_decoder,
Expand All @@ -79,6 +80,7 @@ impl LogicalPageScheduler for FslPageScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
self.schedule_ranges(
&indices
Expand All @@ -87,6 +89,7 @@ impl LogicalPageScheduler for FslPageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}

Expand Down
31 changes: 27 additions & 4 deletions rust/lance-encoding/src/encodings/logical/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl LogicalPageScheduler for ListPageScheduler {
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
// TODO: Shortcut here if the request covers the entire range (can be determined by
// the first_invalid_offset). If this is the case we don't need any indirect I/O. We
Expand Down Expand Up @@ -258,7 +259,7 @@ impl LogicalPageScheduler for ListPageScheduler {
// to this page.
let (tx, mut rx) = mpsc::unbounded_channel();
self.offsets_scheduler
.schedule_ranges(&offsets_ranges, scheduler, &tx)?;
.schedule_ranges(&offsets_ranges, scheduler, &tx, top_level_row)?;
let mut scheduled_offsets = rx.try_recv().unwrap();
let items_schedulers = self.items_schedulers.clone();
let ranges = ranges.to_vec();
Expand Down Expand Up @@ -319,7 +320,17 @@ impl LogicalPageScheduler for ListPageScheduler {
// All requested items are past this page, continue
row_offset += next_scheduler.num_rows() as u64;
if !next_item_ranges.is_empty() {
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
// Note: we are providing the same top_level_row to ALL items pages referenced by
// this offsets page. This gives them higher priority.
// TODO: Ideally we would ALSO have a guarantee from the scheduler that items with
// the same top_level_row are scheduled in FCFS order but I don't think it works
// that way. Still, this is probably good enough for a while
next_scheduler.schedule_ranges(
&next_item_ranges,
&scheduler,
&tx,
top_level_row,
)?;
next_item_ranges.clear();
}
next_scheduler = item_schedulers.pop_front().unwrap();
Expand All @@ -342,14 +353,24 @@ impl LogicalPageScheduler for ListPageScheduler {
next_item_ranges.push(page_range);
row_offset += next_scheduler.num_rows() as u64;
if !next_item_ranges.is_empty() {
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
next_scheduler.schedule_ranges(
&next_item_ranges,
&scheduler,
&tx,
top_level_row,
)?;
next_item_ranges.clear();
}
next_scheduler = item_schedulers.pop_front().unwrap();
}
}
if !next_item_ranges.is_empty() {
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
next_scheduler.schedule_ranges(
&next_item_ranges,
&scheduler,
&tx,
top_level_row,
)?;
}
let mut item_decoders = Vec::new();
drop(tx);
Expand Down Expand Up @@ -388,6 +409,7 @@ impl LogicalPageScheduler for ListPageScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling list offsets for {} indices", indices.len());
self.schedule_ranges(
Expand All @@ -397,6 +419,7 @@ impl LogicalPageScheduler for ListPageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}
}
Expand Down
9 changes: 6 additions & 3 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
trace!("Scheduling ranges {:?} from physical page", ranges);
let physical_decoder = self
.physical_decoder
.schedule_ranges(ranges, scheduler.as_ref());
let physical_decoder =
self.physical_decoder
.schedule_ranges(ranges, scheduler.as_ref(), top_level_row);

let logical_decoder = PrimitiveFieldDecoder {
data_type: self.data_type.clone(),
Expand All @@ -104,6 +105,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!(
"Scheduling take of {} indices from physical page",
Expand All @@ -116,6 +118,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}
}
Expand Down
22 changes: 19 additions & 3 deletions rust/lance-encoding/src/encodings/logical/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
for range in ranges.iter().cloned() {
let mut rows_to_read = range.end - range.start;
Expand Down Expand Up @@ -156,6 +157,8 @@ impl LogicalPageScheduler for SimpleStructScheduler {
// The downside of the current algorithm is that many tiny I/O batches means less opportunity for in-batch coalescing.
// Then again, if our outer batch coalescing is super good then maybe we don't bother

let mut current_top_level_row = top_level_row;

while rows_to_read > 0 {
let mut min_rows_added = u32::MAX;
for (col_idx, field_scheduler) in self.children.iter().enumerate() {
Expand Down Expand Up @@ -183,7 +186,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
page_range_start,
next_page
);
next_page.schedule_ranges(&[page_range], scheduler, sink)?;
next_page.schedule_ranges(
&[page_range],
scheduler,
sink,
current_top_level_row,
)?;

status.rows_queued += rows_to_take;
status.rows_to_take -= rows_to_take;
Expand All @@ -199,6 +207,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
panic!("Error in scheduling logic, panic to avoid infinite loop");
}
rows_to_read -= min_rows_added;
current_top_level_row += min_rows_added as u64;
for field_status in &mut field_status {
field_status.rows_queued -= min_rows_added;
}
Expand All @@ -216,6 +225,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling struct decode of {} indices", indices.len());

Expand All @@ -236,7 +246,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
let mut rows_to_read = indices.len() as u32;

// NOTE: See schedule_range for a description of the scheduling algorithm

let mut current_top_level_row = top_level_row;
while rows_to_read > 0 {
let mut min_rows_added = u32::MAX;
for (col_idx, field_scheduler) in self.children.iter().enumerate() {
Expand Down Expand Up @@ -269,7 +279,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
// We should be guaranteed to get at least one page
let next_page = next_page.unwrap();

next_page.schedule_take(&indices_in_page, scheduler, sink)?;
next_page.schedule_take(
&indices_in_page,
scheduler,
sink,
current_top_level_row,
)?;

let rows_scheduled = indices_in_page.len() as u32;
status.rows_queued += rows_scheduled;
Expand All @@ -281,6 +296,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
panic!("Error in scheduling logic, panic to avoid infinite loop");
}
rows_to_read -= min_rows_added;
current_top_level_row += min_rows_added as u64;
for field_status in &mut field_status {
field_status.rows_queued -= min_rows_added;
}
Expand Down
13 changes: 11 additions & 2 deletions rust/lance-encoding/src/encodings/physical/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,27 @@ impl PhysicalPageScheduler for BasicPageScheduler {
&self,
ranges: &[std::ops::Range<u32>],
scheduler: &dyn EncodingsIo,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
let validity_future = match &self.mode {
SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
SchedulerNullStatus::Some(schedulers) => {
trace!("Scheduling ranges {:?} from validity", ranges);
Some(schedulers.validity.schedule_ranges(ranges, scheduler))
Some(
schedulers
.validity
.schedule_ranges(ranges, scheduler, top_level_row),
)
}
};

let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() {
trace!("Scheduling range {:?} from values", ranges);
Some(values_scheduler.schedule_ranges(ranges, scheduler).boxed())
Some(
values_scheduler
.schedule_ranges(ranges, scheduler, top_level_row)
.boxed(),
)
} else {
trace!("No values fetch needed since values all null");
None
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-encoding/src/encodings/physical/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
&self,
ranges: &[Range<u32>],
scheduler: &dyn EncodingsIo,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
let mut min = u64::MAX;
let mut max = 0;
Expand Down Expand Up @@ -62,7 +63,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
min,
max
);
let bytes = scheduler.submit_request(byte_ranges);
let bytes = scheduler.submit_request(byte_ranges, top_level_row);

async move {
let bytes = bytes.await?;
Expand Down

0 comments on commit fe2d874

Please sign in to comment.