Skip to content

Commit

Permalink
serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed May 6, 2024
1 parent b0e8b90 commit 4eb6b8f
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 61 deletions.
1 change: 1 addition & 0 deletions rust/lance-table/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ object_store.workspace = true
prost.workspace = true
prost-types.workspace = true
rand.workspace = true
rangemap.workspace = true
roaring.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

pub mod format;
pub mod io;
pub mod rowids;
pub mod utils;
pub mod rowids;
84 changes: 38 additions & 46 deletions rust/lance-table/src/rowids.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::HashSet, ops::{Range, RangeInclusive}};
use std::ops::{Range, RangeInclusive};

use snafu::{location, Location};

Expand All @@ -10,6 +10,10 @@ use lance_core::{Error, Result};
use self::encoded_array::EncodedU64Array;

mod encoded_array;
pub mod serde;
mod index;

pub use index::RowIdIndex;

/// A row ID is a unique identifier for a row in a table.
///
Expand Down Expand Up @@ -38,16 +42,16 @@ impl RowId {
}

/// A sequence of row ids.
///
///
/// Row ids are u64s that:
///
///
/// 1. Are **unique** within a table (except for tombstones)
/// 2. Are *often* but not always sorted and/or contiguous.
///
/// This sequence of row ids is optimized to be compact when the row ids are
/// contiguous and sorted. However, it does not require that the row ids are
/// contiguous or sorted.
///
///
/// We can make optimizations that assume uniqueness.
#[derive(Debug)]
pub struct RowIdSequence(Vec<U64Segment>);
Expand Down Expand Up @@ -148,21 +152,22 @@ impl RowIdSequence {
loop {
// Add all segments up to the segment containing the row id.
let segments_handled = old_segments.len() - remaining_segments.len();
let segments_to_add = position.1.0 - segments_handled;
self.0.extend_from_slice(&remaining_segments[..segments_to_add]);
let segments_to_add = position.1 .0 - segments_handled;
self.0
.extend_from_slice(&remaining_segments[..segments_to_add]);
remaining_segments = &remaining_segments[segments_to_add..];

let segment;
(segment, remaining_segments) = remaining_segments.split_first().unwrap();

// Handle all positions for this segment now.
position_batch.push(position.1.1);
position_batch.push(position.1 .1);
while let Some(next_position) = positions_iter.peek() {
if next_position.1.0 != position.1.0 {
if next_position.1 .0 != position.1 .0 {
position = positions_iter.next().unwrap();
break;
}
position_batch.push(next_position.1.1);
position_batch.push(next_position.1 .1);
position = positions_iter.next().unwrap();
}

Expand All @@ -179,7 +184,7 @@ impl RowIdSequence {
}

/// Find the row ids in the sequence.
///
///
/// Returns the row ids and, if found, the position of the segment containing
/// it and the offset within the segment.
fn find_ids(&self, row_ids: impl IntoIterator<Item = u64>) -> Vec<(u64, (usize, usize))> {
Expand All @@ -188,22 +193,24 @@ impl RowIdSequence {
// restarting the search from the beginning each time.
let mut segment_iter = self.0.iter().enumerate().cycle();

row_ids.into_iter()
row_ids
.into_iter()
.filter_map(|row_id| {
let mut i = 0;
// If we've cycled through all segments, we know the row id is not in the sequence.
while i < self.0.len() {
let (segment_idx, segment) = segment_iter.next().unwrap();
if segment.range().map_or(false, |range| range.contains(&row_id)) {
if segment
.range()
.map_or(false, |range| range.contains(&row_id))
{
let offset = match segment {
U64Segment::Tombstones(_) => unreachable!("Tombstones should not be in the sequence"),
U64Segment::Tombstones(_) => {
unreachable!("Tombstones should not be in the sequence")
}
U64Segment::Range(range) => Some((row_id - range.start) as usize),
U64Segment::SortedArray(array) => {
array.binary_search(row_id).ok()
},
U64Segment::Array(array) => {
array.iter().position(|v| v == row_id)
},
U64Segment::SortedArray(array) => array.binary_search(row_id).ok(),
U64Segment::Array(array) => array.iter().position(|v| v == row_id),
};
if let Some(offset) = offset {
return Some((row_id, (segment_idx, offset)));
Expand All @@ -219,22 +226,18 @@ impl RowIdSequence {

/// Replace the positions in the segment with tombstones, pushing the new
/// segments onto the destination vector.
///
///
/// This might involve splitting the segment into multiple segments.
/// It also might increment the tombstone count of the previous segment.
fn delete_from_segment(
dest: &mut Vec<U64Segment>,
segment: &U64Segment,
positions: &[usize],
) {
fn delete_from_segment(dest: &mut Vec<U64Segment>, segment: &U64Segment, positions: &[usize]) {
// Offset to the first position in segment that we haven't added to dest.
let mut offset = 0;
for &position in positions {
// Add portio of segment up to the position.
if position > offset {
dest.push(segment.slice(offset, position - offset));
}

// Add the tombstone. If the last segment is a tombstone, increment the count
// instead of appending a new tombstone segment.
match dest.last_mut() {
Expand All @@ -244,17 +247,14 @@ impl RowIdSequence {

offset = position + 1;
}

// Add the remaining slice of the segment.
if offset < segment.len() {
dest.push(segment.slice(offset, segment.len() - offset));
}
}
}

/// An index of row ids
pub struct RowIdIndex {}

/// Different ways to represent a sequence of u64s.
///
/// This is designed to be especially efficient for sequences that are sorted,
Expand Down Expand Up @@ -298,12 +298,12 @@ impl U64Segment {
let min_value = array.first().unwrap();
let max_value = array.last().unwrap();
Some(min_value..=max_value)
},
}
U64Segment::Array(array) => {
let min_value = array.min().unwrap();
let max_value = array.max().unwrap();
Some(min_value..=max_value)
},
}
}
}

Expand All @@ -313,14 +313,12 @@ impl U64Segment {
U64Segment::Range(range) => {
let start = range.start + offset as u64;
U64Segment::Range(start..(start + len as u64))
},
}
U64Segment::SortedArray(array) => {
// TODO: this could be optimized.
U64Segment::SortedArray(array.slice(offset, len))
},
U64Segment::Array(array) => {
U64Segment::Array(array.slice(offset, len))
},
}
U64Segment::Array(array) => U64Segment::Array(array.slice(offset, len)),
}
}
}
Expand Down Expand Up @@ -408,16 +406,10 @@ mod test {

let mut sequence = RowIdSequence::from(0..10);
sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(
sequence.0,
vec![U64Segment::Tombstones(10)]
);
assert_eq!(sequence.0, vec![U64Segment::Tombstones(10)]);

let mut sequence = RowIdSequence::from(0..10);
sequence.delete(vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]);
assert_eq!(
sequence.0,
vec![U64Segment::Tombstones(10)]
);
assert_eq!(sequence.0, vec![U64Segment::Tombstones(10)]);
}
}
}
34 changes: 20 additions & 14 deletions rust/lance-table/src/rowids/encoded_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,14 @@ impl EncodedU64Array {

pub fn slice(&self, offset: usize, len: usize) -> Self {
match self {
EncodedU64Array::U16 { base, offsets } => {
offsets[offset..(offset + len)].iter().map(|o| *base + *o as u64).collect()
}
EncodedU64Array::U32 { base, offsets } => {
offsets[offset..(offset + len)].iter().map(|o| *base + *o as u64).collect()
}
EncodedU64Array::U16 { base, offsets } => offsets[offset..(offset + len)]
.iter()
.map(|o| *base + *o as u64)
.collect(),
EncodedU64Array::U32 { base, offsets } => offsets[offset..(offset + len)]
.iter()
.map(|o| *base + *o as u64)
.collect(),
EncodedU64Array::U64(values) => {
let values = values[offset..(offset + len)].to_vec();
EncodedU64Array::U64(values)
Expand Down Expand Up @@ -291,13 +293,17 @@ mod test {
let range = (2 * u16::MAX as u64)..(40 + 2 * u16::MAX as u64);
let encoded = EncodedU64Array::from(range.clone());
let expected_base = 2 * u16::MAX as u64;
assert!(matches!(
encoded,
EncodedU64Array::U16 {
base,
..
} if base == expected_base
), "{:?}", encoded);
assert!(
matches!(
encoded,
EncodedU64Array::U16 {
base,
..
} if base == expected_base
),
"{:?}",
encoded
);
let roundtripped = encoded.into_iter().collect::<Vec<_>>();
assert_eq!(range.collect::<Vec<_>>(), roundtripped);

Expand All @@ -316,7 +322,7 @@ mod test {
assert_eq!(range.collect::<Vec<_>>(), roundtripped);

// We'll skip u64 since it would take a lot of memory.

// Empty one
let range = 42..42;
let encoded = EncodedU64Array::from(range.clone());
Expand Down
49 changes: 49 additions & 0 deletions rust/lance-table/src/rowids/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::BTreeMap, ops::{Range, RangeInclusive}};

use lance_core::utils::address::RowAddress;
use rangemap::{RangeInclusiveMap, RangeMap};

use super::{RowIdSequence, U64Segment};

/// An index of row ids
///
/// This index is used to map row ids to their corresponding addresses.
///
/// This is structured as a B tree, where the keys and values can either
/// be a single value or a range.
pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);

impl RowIdIndex {
/// Create a new index from a list of fragment ids and their corresponding row id sequences.
pub fn new(fragment_indices: &[(u32, RowIdSequence)]) -> Self {
let mut pieces = fragment_indices
.iter()
.flat_map(|(fragment_id, sequence)| decompose_sequence(*fragment_id, sequence))
.collect::<Vec<_>>();
pieces.sort_by_key(|(range, _)| *range.start());
todo!("Handle overlapping ranges")

}
}

fn decompose_sequence(
fragment_id: u32,
sequence: &RowIdSequence,
) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
let mut start_address: u64 = RowAddress::first_row(fragment_id).into();
sequence.0
.iter()
.filter_map(|segment| {
let segment_len = segment.len() as u64;
let address_segment = U64Segment::Range(start_address..(start_address as u64 + segment_len));
start_address += segment_len as u64;

let coverage = segment.range()?;

Some((coverage, (segment.clone(), address_segment)))
})
.collect()
}

0 comments on commit 4eb6b8f

Please sign in to comment.