Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed May 8, 2024
1 parent 89e8eb6 commit 48d12cf
Show file tree
Hide file tree
Showing 23 changed files with 338 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ http = "0.2.9"
itertools = "0.12"
lazy_static = "1"
log = "0.4"
mockall = { version = "0.12.1" }
mock_instant = { version = "0.3.1", features = ["sync"] }
moka = "0.11"
num-traits = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use lance_file::v2::{
reader::{BufferDescriptor, CachedFileMetadata, FileReader},
writer::{FileWriter, FileWriterOptions},
};
use lance_io::{scheduler::StoreScheduler, ReadBatchParams};
use lance_io::{scheduler::ScanScheduler, ReadBatchParams};
use object_store::path::Path;
use pyo3::{
exceptions::{PyIOError, PyRuntimeError, PyValueError},
Expand Down Expand Up @@ -267,7 +267,7 @@ impl LanceFileReader {
let io_parallelism = std::env::var("IO_THREADS")
.map(|val| val.parse::<u32>().unwrap_or(8))
.unwrap_or(8);
let scheduler = StoreScheduler::new(Arc::new(object_store), io_parallelism);
let scheduler = ScanScheduler::new(Arc::new(object_store), io_parallelism);
let file = scheduler.open_file(&path).await.infer_error()?;
let inner = FileReader::try_open(file, None).await.infer_error()?;
Ok(Self {
Expand Down
18 changes: 15 additions & 3 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,12 @@ impl DecodeBatchScheduler {

let range = range.start as u32..range.end as u32;

self.root_scheduler
.schedule_ranges(&[range.clone()], scheduler, &sink)?;
self.root_scheduler.schedule_ranges(
&[range.clone()],
scheduler,
&sink,
range.start as u64,
)?;

trace!("Finished scheduling of range {:?}", range);
Ok(())
Expand Down Expand Up @@ -567,8 +571,11 @@ impl DecodeBatchScheduler {
format!("{}, ..., {}", indices[0], indices[indices.len() - 1])
}
);
if indices.is_empty() {
return Ok(());
}
self.root_scheduler
.schedule_take(indices, scheduler, &sink)?;
.schedule_take(indices, scheduler, &sink, indices[0] as u64)?;
trace!("Finished scheduling take of {} rows", indices.len());
Ok(())
}
Expand Down Expand Up @@ -740,10 +747,13 @@ pub trait PhysicalPageScheduler: Send + Sync + std::fmt::Debug {
/// * `range` - the range of row offsets (relative to start of page) requested
/// these must be ordered and must not overlap
/// * `scheduler` - a scheduler to submit the I/O request to
/// * `top_level_row` - the row offset of the top level field currently being
/// scheduled. This can be used to assign priority to I/O requests
fn schedule_ranges(
&self,
ranges: &[Range<u32>],
scheduler: &dyn EncodingsIo,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>;
}

Expand Down Expand Up @@ -780,6 +790,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()>;
/// Schedules I/O for the requested rows (identified by row offsets from start of page)
/// TODO: implement this using schedule_ranges
Expand All @@ -788,6 +799,7 @@ pub trait LogicalPageScheduler: Send + Sync + std::fmt::Debug {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()>;
/// The number of rows covered by this page
fn num_rows(&self) -> u32;
Expand Down
5 changes: 4 additions & 1 deletion rust/lance-encoding/src/encodings/logical/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ impl LogicalPageScheduler for BinaryPageScheduler {
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} ranges", ranges.len());
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
self.varbin_scheduler
.schedule_ranges(ranges, scheduler, &tx)?;
.schedule_ranges(ranges, scheduler, &tx, top_level_row)?;

while let Some(decoder) = rx.recv().now_or_never() {
let wrapped = BinaryPageDecoder {
Expand All @@ -69,6 +70,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
indices: &[u32],
scheduler: &Arc<dyn crate::EncodingsIo>,
sink: &tokio::sync::mpsc::UnboundedSender<Box<dyn crate::decoder::LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling binary for {} indices", indices.len());
self.schedule_ranges(
Expand All @@ -78,6 +80,7 @@ impl LogicalPageScheduler for BinaryPageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}

Expand Down
5 changes: 4 additions & 1 deletion rust/lance-encoding/src/encodings/logical/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl LogicalPageScheduler for FslPageScheduler {
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
let expanded_ranges = ranges
.iter()
Expand All @@ -64,7 +65,7 @@ impl LogicalPageScheduler for FslPageScheduler {
);
let (tx, mut rx) = mpsc::unbounded_channel();
self.items_scheduler
.schedule_ranges(&expanded_ranges, scheduler, &tx)?;
.schedule_ranges(&expanded_ranges, scheduler, &tx, top_level_row)?;
let inner_page_decoder = rx.blocking_recv().unwrap();
sink.send(Box::new(FslPageDecoder {
inner: inner_page_decoder,
Expand All @@ -79,6 +80,7 @@ impl LogicalPageScheduler for FslPageScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
self.schedule_ranges(
&indices
Expand All @@ -87,6 +89,7 @@ impl LogicalPageScheduler for FslPageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}

Expand Down
31 changes: 27 additions & 4 deletions rust/lance-encoding/src/encodings/logical/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl LogicalPageScheduler for ListPageScheduler {
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
// TODO: Shortcut here if the request covers the entire range (can be determined by
// the first_invalid_offset). If this is the case we don't need any indirect I/O. We
Expand Down Expand Up @@ -258,7 +259,7 @@ impl LogicalPageScheduler for ListPageScheduler {
// to this page.
let (tx, mut rx) = mpsc::unbounded_channel();
self.offsets_scheduler
.schedule_ranges(&offsets_ranges, scheduler, &tx)?;
.schedule_ranges(&offsets_ranges, scheduler, &tx, top_level_row)?;
let mut scheduled_offsets = rx.try_recv().unwrap();
let items_schedulers = self.items_schedulers.clone();
let ranges = ranges.to_vec();
Expand Down Expand Up @@ -319,7 +320,17 @@ impl LogicalPageScheduler for ListPageScheduler {
// All requested items are past this page, continue
row_offset += next_scheduler.num_rows() as u64;
if !next_item_ranges.is_empty() {
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
// Note: we are providing the same top_level_row to ALL items pages referenced by
// this offsets page. This gives them higher priority.
// TODO: Ideally we would ALSO have a guarantee from the scheduler that items with
// the same top_level_row are scheduled in FCFS order but I don't think it works
// that way. Still, this is probably good enough for a while
next_scheduler.schedule_ranges(
&next_item_ranges,
&scheduler,
&tx,
top_level_row,
)?;
next_item_ranges.clear();
}
next_scheduler = item_schedulers.pop_front().unwrap();
Expand All @@ -342,14 +353,24 @@ impl LogicalPageScheduler for ListPageScheduler {
next_item_ranges.push(page_range);
row_offset += next_scheduler.num_rows() as u64;
if !next_item_ranges.is_empty() {
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
next_scheduler.schedule_ranges(
&next_item_ranges,
&scheduler,
&tx,
top_level_row,
)?;
next_item_ranges.clear();
}
next_scheduler = item_schedulers.pop_front().unwrap();
}
}
if !next_item_ranges.is_empty() {
next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
next_scheduler.schedule_ranges(
&next_item_ranges,
&scheduler,
&tx,
top_level_row,
)?;
}
let mut item_decoders = Vec::new();
drop(tx);
Expand Down Expand Up @@ -388,6 +409,7 @@ impl LogicalPageScheduler for ListPageScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling list offsets for {} indices", indices.len());
self.schedule_ranges(
Expand All @@ -397,6 +419,7 @@ impl LogicalPageScheduler for ListPageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}
}
Expand Down
9 changes: 6 additions & 3 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
trace!("Scheduling ranges {:?} from physical page", ranges);
let physical_decoder = self
.physical_decoder
.schedule_ranges(ranges, scheduler.as_ref());
let physical_decoder =
self.physical_decoder
.schedule_ranges(ranges, scheduler.as_ref(), top_level_row);

let logical_decoder = PrimitiveFieldDecoder {
data_type: self.data_type.clone(),
Expand All @@ -104,6 +105,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!(
"Scheduling take of {} indices from physical page",
Expand All @@ -116,6 +118,7 @@ impl LogicalPageScheduler for PrimitivePageScheduler {
.collect::<Vec<_>>(),
scheduler,
sink,
top_level_row,
)
}
}
Expand Down
22 changes: 19 additions & 3 deletions rust/lance-encoding/src/encodings/logical/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
ranges: &[Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
for range in ranges.iter().cloned() {
let mut rows_to_read = range.end - range.start;
Expand Down Expand Up @@ -156,6 +157,8 @@ impl LogicalPageScheduler for SimpleStructScheduler {
// The downside of the current algorithm is that many tiny I/O batches means less opportunity for in-batch coalescing.
// Then again, if our outer batch coalescing is super good then maybe we don't bother

let mut current_top_level_row = top_level_row;

while rows_to_read > 0 {
let mut min_rows_added = u32::MAX;
for (col_idx, field_scheduler) in self.children.iter().enumerate() {
Expand Down Expand Up @@ -183,7 +186,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
page_range_start,
next_page
);
next_page.schedule_ranges(&[page_range], scheduler, sink)?;
next_page.schedule_ranges(
&[page_range],
scheduler,
sink,
current_top_level_row,
)?;

status.rows_queued += rows_to_take;
status.rows_to_take -= rows_to_take;
Expand All @@ -199,6 +207,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
panic!("Error in scheduling logic, panic to avoid infinite loop");
}
rows_to_read -= min_rows_added;
current_top_level_row += min_rows_added as u64;
for field_status in &mut field_status {
field_status.rows_queued -= min_rows_added;
}
Expand All @@ -216,6 +225,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
top_level_row: u64,
) -> Result<()> {
trace!("Scheduling struct decode of {} indices", indices.len());

Expand All @@ -236,7 +246,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
let mut rows_to_read = indices.len() as u32;

// NOTE: See schedule_range for a description of the scheduling algorithm

let mut current_top_level_row = top_level_row;
while rows_to_read > 0 {
let mut min_rows_added = u32::MAX;
for (col_idx, field_scheduler) in self.children.iter().enumerate() {
Expand Down Expand Up @@ -269,7 +279,12 @@ impl LogicalPageScheduler for SimpleStructScheduler {
// We should be guaranteed to get at least one page
let next_page = next_page.unwrap();

next_page.schedule_take(&indices_in_page, scheduler, sink)?;
next_page.schedule_take(
&indices_in_page,
scheduler,
sink,
current_top_level_row,
)?;

let rows_scheduled = indices_in_page.len() as u32;
status.rows_queued += rows_scheduled;
Expand All @@ -281,6 +296,7 @@ impl LogicalPageScheduler for SimpleStructScheduler {
panic!("Error in scheduling logic, panic to avoid infinite loop");
}
rows_to_read -= min_rows_added;
current_top_level_row += min_rows_added as u64;
for field_status in &mut field_status {
field_status.rows_queued -= min_rows_added;
}
Expand Down
13 changes: 11 additions & 2 deletions rust/lance-encoding/src/encodings/physical/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,27 @@ impl PhysicalPageScheduler for BasicPageScheduler {
&self,
ranges: &[std::ops::Range<u32>],
scheduler: &dyn EncodingsIo,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
let validity_future = match &self.mode {
SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
SchedulerNullStatus::Some(schedulers) => {
trace!("Scheduling ranges {:?} from validity", ranges);
Some(schedulers.validity.schedule_ranges(ranges, scheduler))
Some(
schedulers
.validity
.schedule_ranges(ranges, scheduler, top_level_row),
)
}
};

let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() {
trace!("Scheduling range {:?} from values", ranges);
Some(values_scheduler.schedule_ranges(ranges, scheduler).boxed())
Some(
values_scheduler
.schedule_ranges(ranges, scheduler, top_level_row)
.boxed(),
)
} else {
trace!("No values fetch needed since values all null");
None
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-encoding/src/encodings/physical/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
&self,
ranges: &[Range<u32>],
scheduler: &dyn EncodingsIo,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>> {
let mut min = u64::MAX;
let mut max = 0;
Expand Down Expand Up @@ -62,7 +63,7 @@ impl PhysicalPageScheduler for DenseBitmapScheduler {
min,
max
);
let bytes = scheduler.submit_request(byte_ranges);
let bytes = scheduler.submit_request(byte_ranges, top_level_row);

async move {
let bytes = bytes.await?;
Expand Down

0 comments on commit 48d12cf

Please sign in to comment.