diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 980082b5802..b7c0e1cb535 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -49,6 +49,8 @@ use vortex_layout::layouts::collect::CollectStrategy; use vortex_layout::layouts::compressed::CompressingStrategy; use vortex_layout::layouts::compressed::CompressorPlugin; use vortex_layout::layouts::dict::writer::DictStrategy; +#[cfg(feature = "unstable_encodings")] +use vortex_layout::layouts::fixed_size_list::writer::FixedSizeListLayoutStrategy; use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; use vortex_layout::layouts::repartition::RepartitionStrategy; use vortex_layout::layouts::repartition::RepartitionWriterOptions; @@ -242,8 +244,20 @@ impl WriteStrategyBuilder { Arc::new(FlatLayoutStrategy::default()) }; - // 7. for each chunk create a flat layout - let chunked = ChunkedLayoutStrategy::new(Arc::clone(&flat)); + // 7. for each chunk create a leaf layout. Under the `unstable_encodings` feature, + // fixed-size-list chunks route through `FixedSizeListLayoutStrategy`, which stores + // elements and list validity as separately-addressable child layouts. Other chunks fall + // through to the flat strategy. + #[cfg(feature = "unstable_encodings")] + let leaf: Arc = Arc::new( + FixedSizeListLayoutStrategy::default() + .with_elements(Arc::clone(&flat)) + .with_validity(Arc::clone(&flat)) + .with_fallback(Arc::clone(&flat)), + ); + #[cfg(not(feature = "unstable_encodings"))] + let leaf: Arc = Arc::clone(&flat); + let chunked = ChunkedLayoutStrategy::new(leaf); // 6. buffer chunks so they end up with closer segment ids physically let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB diff --git a/vortex-layout/src/layouts/fixed_size_list/mod.rs b/vortex-layout/src/layouts/fixed_size_list/mod.rs new file mode 100644 index 00000000000..291558906ad --- /dev/null +++ b/vortex-layout/src/layouts/fixed_size_list/mod.rs @@ -0,0 +1,255 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod reader; +pub mod writer; + +use std::sync::Arc; + +use reader::FixedSizeListReader; +use vortex_array::DeserializeMetadata; +use vortex_array::EmptyMetadata; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; + +use crate::LayoutBuildContext; +use crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderContext; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +/// Child index of the `elements` layout. +pub const ELEMENTS_CHILD_INDEX: usize = 0; +/// Child index of the `validity` layout, only present when the fixed-size list dtype is nullable. +pub const VALIDITY_CHILD_INDEX: usize = 1; + +vtable!(FixedSizeList); + +impl VTable for FixedSizeList { + type Layout = FixedSizeListLayout; + type Encoding = FixedSizeListLayoutEncoding; + type Metadata = EmptyMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new("vortex.fixed_size_list") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(FixedSizeListLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.row_count + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + fn metadata(_layout: &Self::Layout) -> Self::Metadata { + EmptyMetadata + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(layout: &Self::Layout) -> usize { + 1 + usize::from(layout.dtype.is_nullable()) + } + + fn child(layout: &Self::Layout, idx: usize) -> VortexResult { + match (idx, layout.validity.as_ref()) { + (ELEMENTS_CHILD_INDEX, _) => Ok(Arc::clone(&layout.elements)), + (VALIDITY_CHILD_INDEX, Some(validity)) => Ok(Arc::clone(validity)), + _ => vortex_bail!("Invalid child index {idx} for FixedSizeListLayout"), + } + } + + fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType { + match (idx, layout.validity.is_some()) { + (ELEMENTS_CHILD_INDEX, _) => LayoutChildType::Auxiliary("elements".into()), + (VALIDITY_CHILD_INDEX, true) => LayoutChildType::Auxiliary("validity".into()), + _ => vortex_panic!("Invalid child index {idx} for FixedSizeListLayout"), + } + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ctx: &LayoutReaderContext, + ) -> VortexResult { + Ok(Arc::new(FixedSizeListReader::try_new( + layout.clone(), + name, + segment_source, + session.clone(), + ctx, + )?)) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + row_count: u64, + _metadata: &::Output, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _ctx: &LayoutBuildContext<'_>, + ) -> VortexResult { + validate_children(dtype, row_count, children)?; + + let element_dtype = dtype + .as_fixed_size_list_element_opt() + .ok_or_else(|| vortex_err!("FixedSizeListLayout requires a FixedSizeList dtype"))?; + let elements = children.child(ELEMENTS_CHILD_INDEX, element_dtype)?; + let validity = dtype + .is_nullable() + .then(|| children.child(VALIDITY_CHILD_INDEX, &DType::Bool(Nullability::NonNullable))) + .transpose()?; + + Ok(FixedSizeListLayout { + row_count, + dtype: dtype.clone(), + elements, + validity, + }) + } + + fn with_children(layout: &mut Self::Layout, children: Vec) -> VortexResult<()> { + validate_child_count(layout.dtype(), children.len())?; + + let mut iter = children.into_iter(); + layout.elements = iter + .next() + .ok_or_else(|| vortex_err!("missing elements child"))?; + layout.validity = layout + .dtype + .is_nullable() + .then(|| { + iter.next() + .ok_or_else(|| vortex_err!("missing validity child")) + }) + .transpose()?; + Ok(()) + } +} + +fn validate_child_count(dtype: &DType, nchildren: usize) -> VortexResult<()> { + let expected = 1 + usize::from(dtype.is_nullable()); + vortex_ensure!( + nchildren == expected, + "FixedSizeListLayout expects {expected} children, got {nchildren}" + ); + Ok(()) +} + +fn validate_children( + dtype: &DType, + row_count: u64, + children: &dyn LayoutChildren, +) -> VortexResult<()> { + validate_child_count(dtype, children.nchildren())?; + let DType::FixedSizeList(_, list_size, _) = dtype else { + vortex_bail!("FixedSizeListLayout requires a FixedSizeList dtype, got {dtype}"); + }; + let expected_elements = row_count + .checked_mul(u64::from(*list_size)) + .ok_or_else(|| vortex_err!("fixed-size list elements row count overflow"))?; + let actual_elements = children.child_row_count(ELEMENTS_CHILD_INDEX); + vortex_ensure!( + actual_elements == expected_elements, + "FixedSizeListLayout elements row count {actual_elements} does not match expected {expected_elements}" + ); + if dtype.is_nullable() { + let validity_rows = children.child_row_count(VALIDITY_CHILD_INDEX); + vortex_ensure!( + validity_rows == row_count, + "FixedSizeListLayout validity row count {validity_rows} does not match row count {row_count}" + ); + } + Ok(()) +} + +#[derive(Debug)] +pub struct FixedSizeListLayoutEncoding; + +/// Stores a fixed-size list by shredding elements and optional list validity into child layouts. +#[derive(Clone, Debug)] +pub struct FixedSizeListLayout { + row_count: u64, + dtype: DType, + elements: LayoutRef, + validity: Option, +} + +impl FixedSizeListLayout { + /// Construct a fixed-size-list layout from its components. + /// + /// # Invariants + /// + /// - `dtype` must be a [`DType::FixedSizeList`]. + /// - `elements.row_count() == row_count * list_size`. + /// - `validity` is present iff `dtype.is_nullable()`. + pub fn new( + row_count: u64, + dtype: DType, + elements: LayoutRef, + validity: Option, + ) -> Self { + Self { + row_count, + dtype, + elements, + validity, + } + } + + /// Number of fixed-size-list rows in this layout. + #[inline] + pub fn row_count(&self) -> u64 { + self.row_count + } + + #[inline] + pub fn elements(&self) -> &LayoutRef { + &self.elements + } + + #[inline] + pub fn validity(&self) -> Option<&LayoutRef> { + self.validity.as_ref() + } + + /// The fixed number of elements in each list row. + #[inline] + pub fn list_size(&self) -> u32 { + match &self.dtype { + DType::FixedSizeList(_, list_size, _) => *list_size, + _ => vortex_panic!("FixedSizeListLayout dtype must be FixedSizeList"), + } + } + + /// The dtype of the inner elements column. + pub fn elements_dtype(&self) -> &DType { + self.dtype + .as_fixed_size_list_element_opt() + .vortex_expect("FixedSizeListLayout dtype must be FixedSizeList") + } +} diff --git a/vortex-layout/src/layouts/fixed_size_list/reader.rs b/vortex-layout/src/layouts/fixed_size_list/reader.rs new file mode 100644 index 00000000000..4e2fcbec698 --- /dev/null +++ b/vortex-layout/src/layouts/fixed_size_list/reader.rs @@ -0,0 +1,621 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; +use std::sync::Arc; + +use futures::FutureExt; +use futures::future::BoxFuture; +use futures::try_join; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::MaskFuture; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::dtype::DType; +use vortex_array::dtype::FieldMask; +use vortex_array::dtype::Nullability; +use vortex_array::expr::Expression; +use vortex_array::expr::is_root; +use vortex_array::expr::not; +use vortex_array::expr::root; +use vortex_array::scalar_fn::fns::is_not_null::IsNotNull; +use vortex_array::scalar_fn::fns::is_null::IsNull; +use vortex_array::validity::Validity; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_mask::AllOr; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::ArrayFuture; +use crate::LayoutReader; +use crate::LayoutReaderContext; +use crate::LayoutReaderRef; +use crate::RowSplits; +use crate::SplitRange; +use crate::layouts::fixed_size_list::FixedSizeListLayout; +use crate::segments::SegmentSource; + +type OptionalArrayFuture = BoxFuture<'static, VortexResult>>; + +#[derive(Clone)] +pub(super) struct FixedSizeListReader { + layout: FixedSizeListLayout, + name: Arc, + session: VortexSession, + elements: LayoutReaderRef, + validity: Option, +} + +impl FixedSizeListReader { + pub(super) fn try_new( + layout: FixedSizeListLayout, + name: Arc, + segment_source: Arc, + session: VortexSession, + ctx: &LayoutReaderContext, + ) -> VortexResult { + let elements = layout.elements().new_reader( + format!("{name}.elements").into(), + Arc::clone(&segment_source), + &session, + ctx, + )?; + let validity = layout + .validity() + .map(|v| { + v.new_reader( + format!("{name}.validity").into(), + Arc::clone(&segment_source), + &session, + ctx, + ) + }) + .transpose()?; + + Ok(Self { + layout, + name, + session, + elements, + validity, + }) + } + + fn project_validity( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let validity_reader = self.validity.clone(); + let nullability = self.layout.dtype().nullability(); + let row_range = row_range.clone(); + let rewritten = rewrite_validity_expr(expr)?; + + Ok(async move { + let mask = mask.await?; + let row_count = usize::try_from(row_range.end - row_range.start)?; + let out_len = if mask.all_true() { + row_count + } else { + mask.true_count() + }; + + let validity_array = match validity_reader.as_ref() { + Some(v) => Some( + v.projection_evaluation(&row_range, &root(), MaskFuture::ready(mask))? + .await?, + ), + None => None, + }; + + create_validity(validity_array, nullability) + .to_array(out_len) + .apply(&rewritten) + } + .boxed()) + } + + fn project_elements( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let projection = ElementsProjection { + reader: self.clone(), + expr: expr.clone(), + row_range: row_range.clone(), + }; + + Ok(async move { + let mask = mask.await?; + if mask.all_true() { + projection.project_full_range().await + } else { + projection.project_sparse(mask).await + } + } + .boxed()) + } +} + +impl LayoutReader for FixedSizeListReader { + fn name(&self) -> &Arc { + &self.name + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn dtype(&self) -> &DType { + self.layout.dtype() + } + + fn row_count(&self) -> u64 { + self.layout.row_count() + } + + fn register_splits( + &self, + field_mask: &[FieldMask], + split_range: &SplitRange, + splits: &mut RowSplits, + ) -> VortexResult<()> { + self.elements.register_splits( + field_mask, + &element_split_range(split_range, self.layout.list_size())?, + splits, + )?; + if let Some(validity) = &self.validity { + validity.register_splits(field_mask, split_range, splits)?; + } + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let len = mask.len(); + let reader = self.clone(); + let row_range = row_range.clone(); + let expr = expr.clone(); + let session = self.session.clone(); + + Ok(MaskFuture::new(len, async move { + let mask = mask.await?; + if mask.all_false() { + return Ok(mask); + } + + let predicate = reader + .projection_evaluation(&row_range, &expr, MaskFuture::ready(mask.clone()))? + .await?; + let mut ctx = session.create_execution_ctx(); + let predicate_mask = predicate.null_as_false().execute(&mut ctx)?; + + if mask.all_true() { + Ok(predicate_mask) + } else { + Ok(mask.intersect_by_rank(&predicate_mask)) + } + })) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + match classify(expr) { + ExprClass::Validity => self.project_validity(row_range, expr, mask), + ExprClass::Elements => self.project_elements(row_range, expr, mask), + } + } +} + +fn element_split_range(split_range: &SplitRange, list_size: u32) -> VortexResult { + let list_size = u64::from(list_size); + let row_range = element_range(split_range.row_range(), list_size)?; + let row_offset = split_range + .row_offset() + .checked_mul(list_size) + .ok_or_else(|| vortex_err!("fixed-size-list split offset overflow"))?; + SplitRange::try_new(row_offset, row_range) +} + +fn element_range(row_range: &Range, list_size: u64) -> VortexResult> { + let start = row_range + .start + .checked_mul(list_size) + .ok_or_else(|| vortex_err!("fixed-size-list element range overflow"))?; + let end = row_range + .end + .checked_mul(list_size) + .ok_or_else(|| vortex_err!("fixed-size-list element range overflow"))?; + Ok(start..end) +} + +fn fetch_validity( + validity: Option<&LayoutReaderRef>, + row_range: &Range, + mask: MaskFuture, +) -> VortexResult { + let fut = validity + .map(|v| v.projection_evaluation(row_range, &root(), mask)) + .transpose()?; + Ok(async move { + match fut { + Some(f) => f.await.map(Some), + None => Ok(None), + } + } + .boxed()) +} + +fn create_validity(validity_array: Option, nullability: Nullability) -> Validity { + match validity_array { + Some(arr) => Validity::Array(arr), + None => match nullability { + Nullability::Nullable => Validity::AllValid, + Nullability::NonNullable => Validity::NonNullable, + }, + } +} + +fn build_fixed_size_list( + elements: ArrayRef, + validity_array: Option, + dtype: &DType, + len: usize, + expr: &Expression, +) -> VortexResult { + let DType::FixedSizeList(_, list_size, nullability) = dtype else { + return Err(vortex_err!( + "FixedSizeListLayout requires FixedSizeList dtype, got {dtype}" + )); + }; + let validity = create_validity(validity_array, *nullability); + FixedSizeListArray::try_new(elements, *list_size, validity, len)? + .into_array() + .apply(expr) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +enum ExprClass { + Validity, + Elements, +} + +fn classify(expr: &Expression) -> ExprClass { + if (expr.is::() || expr.is::()) + && expr.children().len() == 1 + && is_root(expr.child(0)) + { + return ExprClass::Validity; + } + + if is_root(expr) { + return ExprClass::Elements; + } + + expr.children() + .iter() + .map(classify) + .max() + .unwrap_or(ExprClass::Validity) +} + +fn rewrite_validity_expr(expr: &Expression) -> VortexResult { + if expr.is::() && expr.children().len() == 1 && is_root(expr.child(0)) { + return Ok(root()); + } + if expr.is::() && expr.children().len() == 1 && is_root(expr.child(0)) { + return Ok(not(root())); + } + let children = expr + .children() + .iter() + .map(rewrite_validity_expr) + .collect::>>()?; + expr.clone().with_children(children) +} + +struct SparseElementPlan { + elements_range: Range, + element_mask: Mask, + kept_count: usize, +} + +fn sparse_element_plan( + row_range: &Range, + mask: &Mask, + list_size: u64, +) -> VortexResult { + let kept_count = mask.true_count(); + if kept_count == 0 || list_size == 0 { + return Ok(SparseElementPlan { + elements_range: 0..0, + element_mask: Mask::new_true(0), + kept_count, + }); + } + + let first = mask + .first() + .ok_or_else(|| vortex_err!("sparse mask has no true values"))?; + let last = mask + .last() + .ok_or_else(|| vortex_err!("sparse mask has no true values"))?; + let first_row = row_range + .start + .checked_add(u64::try_from(first)?) + .ok_or_else(|| vortex_err!("fixed-size-list row range overflow"))?; + let last_row_exclusive = row_range + .start + .checked_add(u64::try_from(last + 1)?) + .ok_or_else(|| vortex_err!("fixed-size-list row range overflow"))?; + let elements_range = element_range(&(first_row..last_row_exclusive), list_size)?; + let element_mask_len = usize::try_from(elements_range.end - elements_range.start)?; + + let list_size_usize = usize::try_from(list_size)?; + let mut element_slices = Vec::with_capacity(kept_count); + match mask.indices() { + AllOr::All => { + return Ok(SparseElementPlan { + elements_range, + element_mask: Mask::new_true(element_mask_len), + kept_count, + }); + } + AllOr::None => {} + AllOr::Some(indices) => { + for &idx in indices { + let relative = idx - first; + let start = relative + .checked_mul(list_size_usize) + .ok_or_else(|| vortex_err!("fixed-size-list element mask overflow"))?; + element_slices.push((start, start + list_size_usize)); + } + } + } + + Ok(SparseElementPlan { + elements_range, + element_mask: Mask::from_slices(element_mask_len, element_slices), + kept_count, + }) +} + +struct ElementsProjection { + reader: FixedSizeListReader, + expr: Expression, + row_range: Range, +} + +impl ElementsProjection { + async fn project_full_range(self) -> VortexResult { + let Self { + reader, + expr, + row_range, + } = self; + let len = usize::try_from(row_range.end - row_range.start)?; + let list_size = u64::from(reader.layout.list_size()); + let elements_range = element_range(&row_range, list_size)?; + let elements_len = usize::try_from(elements_range.end - elements_range.start)?; + let elements_fut = reader.elements.projection_evaluation( + &elements_range, + &root(), + MaskFuture::new_true(elements_len), + )?; + let validity_fut = fetch_validity( + reader.validity.as_ref(), + &row_range, + MaskFuture::new_true(len), + )?; + let (elements, validity) = try_join!(elements_fut, validity_fut)?; + build_fixed_size_list(elements, validity, reader.layout.dtype(), len, &expr) + } + + async fn project_sparse(self, mask: Mask) -> VortexResult { + let Self { + reader, + expr, + row_range, + } = self; + let plan = sparse_element_plan(&row_range, &mask, u64::from(reader.layout.list_size()))?; + let elements_fut = reader.elements.projection_evaluation( + &plan.elements_range, + &root(), + MaskFuture::ready(plan.element_mask), + )?; + let validity_fut = fetch_validity( + reader.validity.as_ref(), + &row_range, + MaskFuture::ready(mask), + )?; + let (elements, validity) = try_join!(elements_fut, validity_fut)?; + build_fixed_size_list( + elements, + validity, + reader.layout.dtype(), + plan.kept_count, + &expr, + ) + } +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + use vortex_array::ArrayContext; + use vortex_array::arrays::BoolArray; + use vortex_array::arrays::FixedSizeListArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::assert_arrays_eq; + use vortex_array::expr::is_not_null; + use vortex_array::expr::is_null; + use vortex_array::validity::Validity; + + use super::*; + use crate::LayoutStrategy; + use crate::layouts::fixed_size_list::writer::FixedSizeListLayoutStrategy; + use crate::segments::SegmentSource; + use crate::segments::TestSegments; + use crate::sequence::SequenceId; + use crate::sequence::SequentialArrayStreamExt; + use crate::test::SESSION; + + async fn write_layout( + array: ArrayRef, + ) -> VortexResult<(Arc, crate::LayoutRef)> { + let segments = Arc::new(TestSegments::default()); + let segments_ref: Arc = Arc::::clone(&segments); + let (ptr, eof) = SequenceId::root().split(); + let stream = array.to_array_stream().sequenced(ptr); + let layout = FixedSizeListLayoutStrategy::default() + .write_stream(ArrayContext::empty(), segments, stream, eof, &SESSION) + .await?; + Ok((segments_ref, layout)) + } + + fn create_fsl(nullable: bool) -> ArrayRef { + let validity = if nullable { + Validity::Array(BoolArray::from_iter([true, false, true, true]).into_array()) + } else { + Validity::NonNullable + }; + FixedSizeListArray::new( + PrimitiveArray::from_iter(0i32..8).into_array(), + 2, + validity, + 4, + ) + .into_array() + } + + #[rstest] + #[case::non_nullable(false)] + #[case::nullable(true)] + #[tokio::test] + async fn projection_full_range(#[case] nullable: bool) -> VortexResult<()> { + let fsl = create_fsl(nullable); + let ctx = LayoutReaderContext::new(); + let (segments, layout) = write_layout(fsl.clone()).await?; + let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?; + + let result = reader + .projection_evaluation(&(0..4), &root(), MaskFuture::new_true(4))? + .await?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + assert_arrays_eq!(result, fsl, &mut exec_ctx); + Ok(()) + } + + #[tokio::test] + async fn projection_partial_range() -> VortexResult<()> { + let fsl = create_fsl(true); + let ctx = LayoutReaderContext::new(); + let (segments, layout) = write_layout(fsl.clone()).await?; + let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?; + + let result = reader + .projection_evaluation(&(1..4), &root(), MaskFuture::new_true(3))? + .await?; + let expected = fsl.slice(1..4)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + assert_arrays_eq!(result, expected, &mut exec_ctx); + Ok(()) + } + + #[tokio::test] + async fn projection_sparse_mask() -> VortexResult<()> { + let fsl = create_fsl(true); + let ctx = LayoutReaderContext::new(); + let (segments, layout) = write_layout(fsl.clone()).await?; + let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?; + let mask = Mask::from_iter([true, false, false, true]); + + let result = reader + .projection_evaluation(&(0..4), &root(), MaskFuture::ready(mask.clone()))? + .await?; + let expected = fsl.filter(mask)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + assert_arrays_eq!(result, expected, &mut exec_ctx); + Ok(()) + } + + #[tokio::test] + async fn projection_degenerate_list_size_zero() -> VortexResult<()> { + let fsl = FixedSizeListArray::new( + PrimitiveArray::empty::(Nullability::NonNullable).into_array(), + 0, + Validity::Array(BoolArray::from_iter([true, false, true]).into_array()), + 3, + ) + .into_array(); + let ctx = LayoutReaderContext::new(); + let (segments, layout) = write_layout(fsl.clone()).await?; + let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?; + let mask = Mask::from_iter([false, true, true]); + + let result = reader + .projection_evaluation(&(0..3), &root(), MaskFuture::ready(mask.clone()))? + .await?; + let expected = fsl.filter(mask)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + assert_arrays_eq!(result, expected, &mut exec_ctx); + Ok(()) + } + + #[rstest] + #[case::nullable(true, vec![true, false, true, true])] + #[case::non_nullable(false, vec![true, true, true, true])] + #[tokio::test] + async fn projection_validity_class( + #[case] nullable: bool, + #[case] valid: Vec, + ) -> VortexResult<()> { + let fsl = create_fsl(nullable); + let ctx = LayoutReaderContext::new(); + let (segments, layout) = write_layout(fsl).await?; + let reader = layout.new_reader("".into(), segments, &SESSION, &ctx)?; + + let not_null = reader + .projection_evaluation(&(0..4), &is_not_null(root()), MaskFuture::new_true(4))? + .await?; + let mut exec_ctx = SESSION.create_execution_ctx(); + assert_arrays_eq!(not_null, BoolArray::from_iter(valid.clone()), &mut exec_ctx); + + let null = reader + .projection_evaluation(&(0..4), &is_null(root()), MaskFuture::new_true(4))? + .await?; + assert_arrays_eq!( + null, + BoolArray::from_iter(valid.iter().map(|v| !v).collect::>()), + &mut exec_ctx + ); + Ok(()) + } +} diff --git a/vortex-layout/src/layouts/fixed_size_list/writer.rs b/vortex-layout/src/layouts/fixed_size_list/writer.rs new file mode 100644 index 00000000000..f6775687e34 --- /dev/null +++ b/vortex-layout/src/layouts/fixed_size_list/writer.rs @@ -0,0 +1,269 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::stream; +use vortex_array::ArrayContext; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::fixed_size_list::FixedSizeListDataParts; +use vortex_array::dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_io::session::RuntimeSessionExt; +use vortex_session::VortexSession; + +use crate::IntoLayout; +use crate::LayoutRef; +use crate::LayoutStrategy; +use crate::layouts::fixed_size_list::FixedSizeListLayout; +use crate::layouts::flat::writer::FlatLayoutStrategy; +use crate::segments::SegmentSinkRef; +use crate::sequence::SendableSequentialStream; +use crate::sequence::SequenceId; +use crate::sequence::SequencePointer; +use crate::sequence::SequentialStream; +use crate::sequence::SequentialStreamAdapter; +use crate::sequence::SequentialStreamExt; + +/// Strategy for writing fixed-size-list arrays, with a fallback for other dtypes. +/// +/// Single-chunk only. For fixed-size-list input the strategy canonicalizes the chunk into a +/// [`FixedSizeListArray`] and writes the `elements` and optional `validity` columns into +/// independently configurable child strategies. +#[derive(Clone)] +pub struct FixedSizeListLayoutStrategy { + elements: Arc, + validity: Arc, + fallback: Arc, +} + +impl Default for FixedSizeListLayoutStrategy { + fn default() -> Self { + let flat: Arc = Arc::new(FlatLayoutStrategy::default()); + Self { + elements: Arc::clone(&flat), + validity: Arc::clone(&flat), + fallback: flat, + } + } +} + +impl FixedSizeListLayoutStrategy { + /// Strategy for the `elements` child. + pub fn with_elements(mut self, elements: Arc) -> Self { + self.elements = elements; + self + } + + /// Strategy for the `validity` child, written only when the list dtype is nullable. + pub fn with_validity(mut self, validity: Arc) -> Self { + self.validity = validity; + self + } + + /// Strategy for non-fixed-size-list input, which is forwarded unchanged. + pub fn with_fallback(mut self, fallback: Arc) -> Self { + self.fallback = fallback; + self + } +} + +#[async_trait] +impl LayoutStrategy for FixedSizeListLayoutStrategy { + async fn write_stream( + &self, + ctx: ArrayContext, + segment_sink: SegmentSinkRef, + mut stream: SendableSequentialStream, + mut eof: SequencePointer, + session: &VortexSession, + ) -> VortexResult { + let dtype = stream.dtype().clone(); + if !dtype.is_fixed_size_list() { + return self + .fallback + .write_stream(ctx, segment_sink, stream, eof, session) + .await; + } + + let Some(chunk) = stream.next().await else { + vortex_bail!("FixedSizeListLayoutStrategy needs a single chunk"); + }; + let (sequence_id, array) = chunk?; + + let mut exec_ctx = session.create_execution_ctx(); + let row_count = array.len(); + let FixedSizeListDataParts { + elements, validity, .. + } = array + .execute::(&mut exec_ctx)? + .into_data_parts(); + let validity_array = dtype + .is_nullable() + .then(|| { + validity + .execute_mask(row_count, &mut exec_ctx) + .map(|m| m.into_array()) + }) + .transpose()?; + + let handle = session.handle(); + let (elements_task, validity_task) = { + let mut sp = sequence_id.descend(); + let mut spawn_layout_writer = |strategy: Arc, array: ArrayRef| { + let stream = single_chunk_stream(array.dtype().clone(), sp.advance(), array); + let child_eof = eof.split_off(); + let ctx = ctx.clone(); + let segment_sink = Arc::clone(&segment_sink); + let session = session.clone(); + handle.spawn_nested(move |h| async move { + let session = session.with_handle(h); + strategy + .write_stream(ctx, segment_sink, stream, child_eof, &session) + .await + }) + }; + ( + spawn_layout_writer(Arc::clone(&self.elements), elements), + validity_array.map(|arr| spawn_layout_writer(Arc::clone(&self.validity), arr)), + ) + }; + + if stream.next().await.is_some() { + vortex_bail!("FixedSizeListLayoutStrategy received more than a single chunk"); + } + + let (elements_layout, validity_layout) = futures::try_join!(elements_task, async move { + match validity_task { + Some(t) => t.await.map(Some), + None => Ok(None), + } + },)?; + + Ok( + FixedSizeListLayout::new(row_count as u64, dtype, elements_layout, validity_layout) + .into_layout(), + ) + } + + fn buffered_bytes(&self) -> u64 { + let fsl_bytes = self.elements.buffered_bytes() + self.validity.buffered_bytes(); + fsl_bytes.max(self.fallback.buffered_bytes()) + } +} + +fn single_chunk_stream( + dtype: DType, + sequence_id: SequenceId, + array: ArrayRef, +) -> SendableSequentialStream { + SequentialStreamAdapter::new( + dtype, + stream::once(async move { Ok((sequence_id, array)) }).boxed(), + ) + .sendable() +} + +#[cfg(test)] +mod tests { + use vortex_array::arrays::BoolArray; + use vortex_array::arrays::FixedSizeListArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::validity::Validity; + use vortex_buffer::buffer; + + use super::*; + use crate::segments::TestSegments; + use crate::sequence::SequentialArrayStreamExt; + use crate::test::SESSION; + + async fn write(strategy: &S, array: ArrayRef) -> VortexResult { + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + let stream = array.to_array_stream().sequenced(ptr); + strategy + .write_stream(ArrayContext::empty(), segments, stream, eof, &SESSION) + .await + } + + fn create_fsl(validity: Validity) -> ArrayRef { + FixedSizeListArray::new(buffer![1i32, 2, 3, 4, 5, 6].into_array(), 2, validity, 3) + .into_array() + } + + #[tokio::test] + async fn basic_non_nullable_input() -> VortexResult<()> { + let layout = write( + &FixedSizeListLayoutStrategy::default(), + create_fsl(Validity::NonNullable), + ) + .await?; + + assert_eq!( + layout.display_tree().to_string(), + "vortex.fixed_size_list, dtype: fixed_size_list(i32)[2], children: 1\n\ + └── elements: vortex.flat, dtype: i32, segment: 0\n" + ); + Ok(()) + } + + #[tokio::test] + async fn basic_nullable_input() -> VortexResult<()> { + let layout = write( + &FixedSizeListLayoutStrategy::default(), + create_fsl(Validity::Array( + BoolArray::from_iter([true, false, true]).into_array(), + )), + ) + .await?; + + assert_eq!( + layout.display_tree().to_string(), + "vortex.fixed_size_list, dtype: fixed_size_list(i32)[2]?, children: 2\n\ + ├── elements: vortex.flat, dtype: i32, segment: 0\n\ + └── validity: vortex.flat, dtype: bool, segment: 1\n" + ); + Ok(()) + } + + #[tokio::test] + async fn non_fixed_size_list_input_routes_to_fallback() -> VortexResult<()> { + let primitive = PrimitiveArray::from_iter([1i32, 2, 3]).into_array(); + let layout = write(&FixedSizeListLayoutStrategy::default(), primitive).await?; + assert_eq!( + layout.display_tree().to_string(), + "vortex.flat, dtype: i32, segment: 0\n" + ); + Ok(()) + } + + #[tokio::test] + async fn empty_stream_errors() { + let segments = Arc::new(TestSegments::default()); + let (_, eof) = SequenceId::root().split(); + let empty = stream::empty::>().boxed(); + let stream = SequentialStreamAdapter::new( + DType::FixedSizeList( + Arc::new(DType::Primitive( + vortex_array::dtype::PType::I32, + vortex_array::dtype::Nullability::NonNullable, + )), + 2, + vortex_array::dtype::Nullability::NonNullable, + ), + empty, + ) + .sendable(); + + let res = FixedSizeListLayoutStrategy::default() + .write_stream(ArrayContext::empty(), segments, stream, eof, &SESSION) + .await; + assert!(res.is_err()); + } +} diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 18df5b8f347..1c20ef873f3 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -14,6 +14,7 @@ pub mod collect; pub mod compressed; pub mod dict; pub mod file_stats; +pub mod fixed_size_list; pub mod flat; pub(crate) mod foreign; pub(crate) mod partitioned; diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs index 42c8906d107..7aacb918a02 100644 --- a/vortex-layout/src/session.rs +++ b/vortex-layout/src/session.rs @@ -10,6 +10,7 @@ use vortex_session::registry::Registry; use crate::LayoutEncodingRef; use crate::layouts::chunked::ChunkedLayoutEncoding; use crate::layouts::dict::DictLayoutEncoding; +use crate::layouts::fixed_size_list::FixedSizeListLayoutEncoding; use crate::layouts::flat::FlatLayoutEncoding; use crate::layouts::struct_::StructLayoutEncoding; use crate::layouts::zoned::LegacyStatsLayoutEncoding; @@ -48,6 +49,10 @@ impl Default for LayoutSession { // Register the built-in layout encodings. layouts.register(ChunkedLayoutEncoding.id(), ChunkedLayoutEncoding.as_ref()); + layouts.register( + FixedSizeListLayoutEncoding.id(), + FixedSizeListLayoutEncoding.as_ref(), + ); layouts.register(FlatLayoutEncoding.id(), FlatLayoutEncoding.as_ref()); layouts.register(StructLayoutEncoding.id(), StructLayoutEncoding.as_ref()); layouts.register(ZonedLayoutEncoding.id(), ZonedLayoutEncoding.as_ref());