-
Notifications
You must be signed in to change notification settings - Fork 180
feat(vortex-bench): wire SpatialBench into the bench orchestrator #8607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
b362551
1a8001b
8fa6ee0
c282925
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| use std::fs; | ||
| use std::path::Path; | ||
| use std::path::PathBuf; | ||
| use std::sync::Arc; | ||
|
|
||
| use futures::StreamExt; | ||
| use futures::TryStreamExt; | ||
|
|
@@ -22,29 +23,43 @@ use tracing::info; | |
| use tracing::trace; | ||
| use vortex::VortexSessionDefault; | ||
| use vortex::array::ArrayRef; | ||
| use vortex::array::ExecutionCtx; | ||
| use vortex::array::IntoArray; | ||
| use vortex::array::VortexSessionExecute; | ||
| use vortex::array::arrays::ChunkedArray; | ||
| use vortex::array::arrays::ExtensionArray; | ||
| use vortex::array::arrays::Struct; | ||
| use vortex::array::arrays::StructArray; | ||
| use vortex::array::arrays::VarBinViewArray; | ||
| use vortex::array::arrays::struct_::StructArrayExt; | ||
| use vortex::array::arrow::FromArrowArray; | ||
| use vortex::array::builders::builder_with_capacity; | ||
| use vortex::array::stream::ArrayStreamAdapter; | ||
| use vortex::array::stream::ArrayStreamExt; | ||
| use vortex::compressor::BtrBlocksCompressorBuilder; | ||
| use vortex::dtype::DType; | ||
| use vortex::dtype::FieldPath; | ||
| use vortex::dtype::StructFields; | ||
| use vortex::dtype::arrow::FromArrowType; | ||
| use vortex::dtype::extension::ExtDType; | ||
| use vortex::dtype::extension::ExtDTypeRef; | ||
| use vortex::error::VortexResult; | ||
| use vortex::error::vortex_err; | ||
| use vortex::file::VortexWriteOptions; | ||
| use vortex::file::WriteOptionsSessionExt; | ||
| use vortex::file::WriteStrategyBuilder; | ||
| use vortex::layout::LayoutStrategy; | ||
| use vortex::layout::layouts::chunked::writer::ChunkedLayoutStrategy; | ||
| use vortex::layout::layouts::compressed::CompressingStrategy; | ||
| use vortex::layout::layouts::flat::writer::FlatLayoutStrategy; | ||
| use vortex::session::VortexSession; | ||
| use vortex::utils::aliases::hash_set::HashSet; | ||
| use vortex_geo::extension::GeoMetadata; | ||
| use vortex_geo::extension::WellKnownBinary; | ||
| use wkb::Endianness; | ||
| use wkb::reader::read_wkb; | ||
| use wkb::writer::WriteOptions; | ||
| use wkb::writer::write_geometry; | ||
|
|
||
| use crate::CompactionStrategy; | ||
| use crate::Format; | ||
|
|
@@ -142,8 +157,7 @@ pub async fn convert_parquet_file_to_vortex( | |
| .open(output_path) | ||
| .await?; | ||
|
|
||
| compaction | ||
| .apply_options(SESSION.write_options()) | ||
| write_options_for(compaction, &dtype, is_spatialbench(parquet_path)) | ||
| .write( | ||
| &mut output_file, | ||
| ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)), | ||
|
|
@@ -153,6 +167,54 @@ pub async fn convert_parquet_file_to_vortex( | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Whether `path` points at SpatialBench data. | ||
| fn is_spatialbench(path: &Path) -> bool { | ||
| path.components() | ||
| .any(|component| component.as_os_str() == "spatialbench") | ||
| } | ||
|
|
||
| /// Vortex write options for converting `dtype`-shaped data. | ||
| /// | ||
| /// For SpatialBench (`skip_binary_dict`), the geometry blobs are large and | ||
| /// unique, so the dictionary builder balloons memory (tens of GB) for zero gain. | ||
| fn write_options_for( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is clunky, not sure I have a better way if doing that right now :/
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is a way I thought is clean for now, otherwise people need to wait for like 1 hour of dictionary compact on binary for SF=1. |
||
| compaction: CompactionStrategy, | ||
| dtype: &DType, | ||
| skip_binary_dict: bool, | ||
| ) -> VortexWriteOptions { | ||
| let binary_fields: Vec<_> = match dtype { | ||
| DType::Struct(fields, _) if skip_binary_dict => fields | ||
| .names() | ||
| .iter() | ||
| .zip(fields.fields()) | ||
| .filter(|(_, field)| matches!(field, DType::Binary(_))) | ||
| .map(|(name, _)| name.clone()) | ||
| .collect(), | ||
| _ => Vec::new(), | ||
| }; | ||
| if binary_fields.is_empty() { | ||
| return compaction.apply_options(SESSION.write_options()); | ||
| } | ||
|
|
||
| let mut builder = WriteStrategyBuilder::default(); | ||
| if matches!(compaction, CompactionStrategy::Compact) { | ||
| builder = | ||
| builder.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact()); | ||
| } | ||
| for name in binary_fields { | ||
| builder = builder.with_field_writer(FieldPath::from_name(name), no_dict_layout()); | ||
| } | ||
| SESSION.write_options().with_strategy(builder.build()) | ||
| } | ||
|
|
||
| /// A chunked + compressed layout that skips dictionary encoding for opaque `Binary` blobs. | ||
| fn no_dict_layout() -> Arc<dyn LayoutStrategy> { | ||
| Arc::new(CompressingStrategy::new( | ||
| ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()), | ||
| BtrBlocksCompressorBuilder::default().build(), | ||
| )) | ||
| } | ||
|
|
||
| /// Convert all Parquet files in a directory to Vortex format. | ||
| /// | ||
| /// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to | ||
|
|
@@ -251,7 +313,7 @@ pub async fn add_geoparquet_metadata(parquet_path: &Path, geo_json: &str) -> any | |
| return Ok(()); | ||
| } | ||
|
|
||
| let schema = std::sync::Arc::clone(builder.schema()); | ||
| let schema = Arc::clone(builder.schema()); | ||
| let mut reader = builder.build()?; | ||
|
|
||
| let tmp_path = parquet_path.with_extension("parquet.tmp"); | ||
|
|
@@ -314,7 +376,8 @@ fn tag_geo_dtype(dtype: DType, geo_columns: &HashSet<String>) -> VortexResult<DT | |
| )) | ||
| } | ||
|
|
||
| /// Wrap the named binary columns of a struct `chunk` as `vortex.geo.wkb` extension arrays. | ||
| /// Wrap the named binary columns of a struct `chunk` as `vortex.geo.wkb` extension arrays, storing | ||
| /// their WKB little-endian. | ||
| fn tag_geo_array(chunk: ArrayRef, geo_columns: &HashSet<String>) -> VortexResult<ArrayRef> { | ||
| if geo_columns.is_empty() { | ||
| return Ok(chunk); | ||
|
|
@@ -325,17 +388,51 @@ fn tag_geo_array(chunk: ArrayRef, geo_columns: &HashSet<String>) -> VortexResult | |
| let names = struct_array.names().clone(); | ||
| let validity = struct_array.struct_validity(); | ||
| let len = struct_array.len(); | ||
| let tagged = names | ||
| .iter() | ||
| .zip(struct_array.iter_unmasked_fields()) | ||
| .map(|(name, field)| { | ||
| if geo_columns.contains(name.as_ref()) && field.dtype().is_binary() { | ||
| let ext = wkb_ext_dtype(field.dtype())?; | ||
| Ok(ExtensionArray::try_new(ext, field.clone())?.into_array()) | ||
| } else { | ||
| Ok(field.clone()) | ||
| let mut ctx = SESSION.create_execution_ctx(); | ||
| let mut tagged = Vec::with_capacity(names.len()); | ||
| for (name, field) in names.iter().zip(struct_array.iter_unmasked_fields()) { | ||
| if geo_columns.contains(name.as_ref()) && field.dtype().is_binary() { | ||
| let ext = wkb_ext_dtype(field.dtype())?; | ||
| let little_endian = wkb_field_to_little_endian(field, &mut ctx)?; | ||
| tagged.push(ExtensionArray::try_new(ext, little_endian)?.into_array()); | ||
| } else { | ||
| tagged.push(field.clone()); | ||
| } | ||
| } | ||
| Ok(StructArray::try_new(names, tagged, len, validity)?.into_array()) | ||
| } | ||
|
|
||
| /// Re-encode a binary `field`'s WKB values as little-endian (NDR), the only byte order DuckDB's | ||
| /// `GEOMETRY` accepts. Byte order is uniform within a column, so the array is returned untouched (no | ||
| /// copy) unless its first non-null value is big-endian; only then is each value re-encoded. | ||
| fn wkb_field_to_little_endian(field: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> { | ||
| let values = field.clone().execute::<VarBinViewArray>(ctx)?; | ||
| let len = values.len(); | ||
| let validity = values.validity()?.execute_mask(len, ctx)?; | ||
| let is_big_endian = (0..len) | ||
| .find(|&i| validity.value(i)) | ||
| .is_some_and(|i| values.bytes_at(i).as_slice().first() == Some(&0x00)); | ||
| if !is_big_endian { | ||
| return Ok(field.clone()); | ||
| } | ||
| let little_endian: Vec<Option<Vec<u8>>> = (0..len) | ||
| .map(|i| { | ||
| if !validity.value(i) { | ||
| return Ok(None); | ||
| } | ||
| let wkb = values.bytes_at(i); | ||
| let geometry = read_wkb(wkb.as_slice()).map_err(|e| vortex_err!("invalid WKB: {e}"))?; | ||
| let mut encoded = Vec::with_capacity(wkb.len()); | ||
| write_geometry( | ||
| &mut encoded, | ||
| &geometry, | ||
| &WriteOptions { | ||
| endianness: Endianness::LittleEndian, | ||
| }, | ||
| ) | ||
| .map_err(|e| vortex_err!("re-encoding WKB as little-endian: {e}"))?; | ||
| Ok(Some(encoded)) | ||
| }) | ||
| .collect::<VortexResult<Vec<_>>>()?; | ||
| Ok(StructArray::try_new(names, tagged, len, validity)?.into_array()) | ||
| .collect::<VortexResult<_>>()?; | ||
| Ok(VarBinViewArray::from_iter_nullable_bin(little_endian).into_array()) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't about this much, but why is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid running SpatialBench on engines (DataFusion) that can't execute its spatial queries for now, and fail early and clearly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we can remove this function, though users may need a bit aware of Datafusion and other engine cannot run SpatialBench now? seems not a big problem.