Skip to content

Commit

Permalink
serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed May 4, 2024
1 parent b0e8b90 commit cf0abfa
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 58 deletions.
2 changes: 1 addition & 1 deletion rust/lance-table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

pub mod format;
pub mod io;
pub mod rowids;
pub mod utils;
pub mod rowids;
81 changes: 38 additions & 43 deletions rust/lance-table/src/rowids.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::HashSet, ops::{Range, RangeInclusive}};
use std::{
collections::HashSet,

Check warning on line 5 in rust/lance-table/src/rowids.rs

View workflow job for this annotation

GitHub Actions / linux-build (stable)

unused import: `collections::HashSet`

Check warning on line 5 in rust/lance-table/src/rowids.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

unused import: `collections::HashSet`
ops::{Range, RangeInclusive},
};

use snafu::{location, Location};

Expand All @@ -10,6 +13,7 @@ use lance_core::{Error, Result};
use self::encoded_array::EncodedU64Array;

mod encoded_array;
pub mod serde;

/// A row ID is a unique identifier for a row in a table.
///
Expand Down Expand Up @@ -38,16 +42,16 @@ impl RowId {
}

/// A sequence of row ids.
///
///
/// Row ids are u64s that:
///
///
/// 1. Are **unique** within a table (except for tombstones)
/// 2. Are *often* but not always sorted and/or contiguous.
///
/// This sequence of row ids is optimized to be compact when the row ids are
/// contiguous and sorted. However, it does not require that the row ids are
/// contiguous or sorted.
///
///
/// We can make optimizations that assume uniqueness.
#[derive(Debug)]
pub struct RowIdSequence(Vec<U64Segment>);
Expand Down Expand Up @@ -148,21 +152,22 @@ impl RowIdSequence {
loop {
// Add all segments up to the segment containing the row id.
let segments_handled = old_segments.len() - remaining_segments.len();
let segments_to_add = position.1.0 - segments_handled;
self.0.extend_from_slice(&remaining_segments[..segments_to_add]);
let segments_to_add = position.1 .0 - segments_handled;
self.0
.extend_from_slice(&remaining_segments[..segments_to_add]);
remaining_segments = &remaining_segments[segments_to_add..];

let segment;
(segment, remaining_segments) = remaining_segments.split_first().unwrap();

// Handle all positions for this segment now.
position_batch.push(position.1.1);
position_batch.push(position.1 .1);
while let Some(next_position) = positions_iter.peek() {
if next_position.1.0 != position.1.0 {
if next_position.1 .0 != position.1 .0 {
position = positions_iter.next().unwrap();
break;
}
position_batch.push(next_position.1.1);
position_batch.push(next_position.1 .1);
position = positions_iter.next().unwrap();
}

Expand All @@ -179,7 +184,7 @@ impl RowIdSequence {
}

/// Find the row ids in the sequence.
///
///
/// Returns the row ids and, if found, the position of the segment containing
/// it and the offset within the segment.
fn find_ids(&self, row_ids: impl IntoIterator<Item = u64>) -> Vec<(u64, (usize, usize))> {
Expand All @@ -188,22 +193,24 @@ impl RowIdSequence {
// restarting the search from the beginning each time.
let mut segment_iter = self.0.iter().enumerate().cycle();

row_ids.into_iter()
row_ids
.into_iter()
.filter_map(|row_id| {
let mut i = 0;
// If we've cycled through all segments, we know the row id is not in the sequence.
while i < self.0.len() {
let (segment_idx, segment) = segment_iter.next().unwrap();
if segment.range().map_or(false, |range| range.contains(&row_id)) {
if segment
.range()
.map_or(false, |range| range.contains(&row_id))
{
let offset = match segment {
U64Segment::Tombstones(_) => unreachable!("Tombstones should not be in the sequence"),
U64Segment::Tombstones(_) => {
unreachable!("Tombstones should not be in the sequence")
}
U64Segment::Range(range) => Some((row_id - range.start) as usize),
U64Segment::SortedArray(array) => {
array.binary_search(row_id).ok()
},
U64Segment::Array(array) => {
array.iter().position(|v| v == row_id)
},
U64Segment::SortedArray(array) => array.binary_search(row_id).ok(),
U64Segment::Array(array) => array.iter().position(|v| v == row_id),
};
if let Some(offset) = offset {
return Some((row_id, (segment_idx, offset)));
Expand All @@ -219,22 +226,18 @@ impl RowIdSequence {

/// Replace the positions in the segment with tombstones, pushing the new
/// segments onto the destination vector.
///
///
/// This might involve splitting the segment into multiple segments.
/// It also might increment the tombstone count of the previous segment.
fn delete_from_segment(
dest: &mut Vec<U64Segment>,
segment: &U64Segment,
positions: &[usize],
) {
fn delete_from_segment(dest: &mut Vec<U64Segment>, segment: &U64Segment, positions: &[usize]) {
// Offset to the first position in segment that we haven't added to dest.
let mut offset = 0;
for &position in positions {
// Add the portion of the segment up to the position.
if position > offset {
dest.push(segment.slice(offset, position - offset));
}

// Add the tombstone. If the last segment is a tombstone, increment the count
// instead of appending a new tombstone segment.
match dest.last_mut() {
Expand All @@ -244,7 +247,7 @@ impl RowIdSequence {

offset = position + 1;
}

// Add the remaining slice of the segment.
if offset < segment.len() {
dest.push(segment.slice(offset, segment.len() - offset));
Expand Down Expand Up @@ -298,12 +301,12 @@ impl U64Segment {
let min_value = array.first().unwrap();
let max_value = array.last().unwrap();
Some(min_value..=max_value)
},
}
U64Segment::Array(array) => {
let min_value = array.min().unwrap();
let max_value = array.max().unwrap();
Some(min_value..=max_value)
},
}
}
}

Expand All @@ -313,14 +316,12 @@ impl U64Segment {
U64Segment::Range(range) => {
let start = range.start + offset as u64;
U64Segment::Range(start..(start + len as u64))
},
}
U64Segment::SortedArray(array) => {
// TODO: this could be optimized.
U64Segment::SortedArray(array.slice(offset, len))
},
U64Segment::Array(array) => {
U64Segment::Array(array.slice(offset, len))
},
}
U64Segment::Array(array) => U64Segment::Array(array.slice(offset, len)),
}
}
}
Expand Down Expand Up @@ -408,16 +409,10 @@ mod test {

let mut sequence = RowIdSequence::from(0..10);
sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(
sequence.0,
vec![U64Segment::Tombstones(10)]
);
assert_eq!(sequence.0, vec![U64Segment::Tombstones(10)]);

let mut sequence = RowIdSequence::from(0..10);
sequence.delete(vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]);
assert_eq!(
sequence.0,
vec![U64Segment::Tombstones(10)]
);
assert_eq!(sequence.0, vec![U64Segment::Tombstones(10)]);
}
}
}
34 changes: 20 additions & 14 deletions rust/lance-table/src/rowids/encoded_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,14 @@ impl EncodedU64Array {

pub fn slice(&self, offset: usize, len: usize) -> Self {
match self {
EncodedU64Array::U16 { base, offsets } => {
offsets[offset..(offset + len)].iter().map(|o| *base + *o as u64).collect()
}
EncodedU64Array::U32 { base, offsets } => {
offsets[offset..(offset + len)].iter().map(|o| *base + *o as u64).collect()
}
EncodedU64Array::U16 { base, offsets } => offsets[offset..(offset + len)]
.iter()
.map(|o| *base + *o as u64)
.collect(),
EncodedU64Array::U32 { base, offsets } => offsets[offset..(offset + len)]
.iter()
.map(|o| *base + *o as u64)
.collect(),
EncodedU64Array::U64(values) => {
let values = values[offset..(offset + len)].to_vec();
EncodedU64Array::U64(values)
Expand Down Expand Up @@ -291,13 +293,17 @@ mod test {
let range = (2 * u16::MAX as u64)..(40 + 2 * u16::MAX as u64);
let encoded = EncodedU64Array::from(range.clone());
let expected_base = 2 * u16::MAX as u64;
assert!(matches!(
encoded,
EncodedU64Array::U16 {
base,
..
} if base == expected_base
), "{:?}", encoded);
assert!(
matches!(
encoded,
EncodedU64Array::U16 {
base,
..
} if base == expected_base
),
"{:?}",
encoded
);
let roundtripped = encoded.into_iter().collect::<Vec<_>>();
assert_eq!(range.collect::<Vec<_>>(), roundtripped);

Expand All @@ -316,7 +322,7 @@ mod test {
assert_eq!(range.collect::<Vec<_>>(), roundtripped);

// We'll skip u64 since it would take a lot of memory.

// Empty one
let range = 42..42;
let encoded = EncodedU64Array::from(range.clone());
Expand Down

0 comments on commit cf0abfa

Please sign in to comment.