diff --git a/Cargo.lock b/Cargo.lock index 173843bbaad..0f70678a143 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9919,6 +9919,7 @@ dependencies = [ "vortex", "vortex-geo", "vortex-tensor", + "wkb", ] [[package]] diff --git a/bench-orchestrator/bench_orchestrator/cli.py b/bench-orchestrator/bench_orchestrator/cli.py index a9e37e70309..b9d7f9bb2ef 100644 --- a/bench-orchestrator/bench_orchestrator/cli.py +++ b/bench-orchestrator/bench_orchestrator/cli.py @@ -85,9 +85,11 @@ def run_ref_auto_complete() -> list[str]: return list(map(lambda x: x.run_id, ResultStore().list_runs(limit=None))) -def targets_from_axes(engine: str, format: str) -> tuple[list[BenchmarkTarget], list[str]]: +def targets_from_axes( + engine: str, format: str, benchmark: Benchmark | None = None +) -> tuple[list[BenchmarkTarget], list[str]]: """Resolve legacy engine/format axes into explicit benchmark targets.""" - return resolve_axis_targets(parse_engines(engine), parse_formats(format)) + return resolve_axis_targets(parse_engines(engine), parse_formats(format), benchmark) def backends_for_engines(engines: list[Engine]) -> list[Engine]: @@ -260,7 +262,7 @@ def run( targets = parse_targets_json(targets_json) warnings: list[str] = [] else: - targets, warnings = targets_from_axes(engine, format) + targets, warnings = targets_from_axes(engine, format, benchmark) except ValueError as exc: console.print(f"[red]{exc}[/red]") raise typer.Exit(1) from exc diff --git a/bench-orchestrator/bench_orchestrator/config.py b/bench-orchestrator/bench_orchestrator/config.py index c597e84c6be..e358bf18f01 100644 --- a/bench-orchestrator/bench_orchestrator/config.py +++ b/bench-orchestrator/bench_orchestrator/config.py @@ -52,6 +52,7 @@ class Benchmark(Enum): POLARSIGNALS = "polarsignals" PUBLIC_BI = "public-bi" STATPOPGEN = "statpopgen" + SPATIALBENCH = "spatialbench" # Engine to supported formats mapping. @@ -72,6 +73,19 @@ class Benchmark(Enum): Engine.LANCE: [Format.LANCE], } +# Engines each benchmark can run on. Benchmarks default to *every* engine; list one here only to +# restrict it. SpatialBench's queries use DuckDB-specific `ST_*` spatial SQL that DataFusion has no +# functions for yet. +BENCHMARK_ENGINES: dict[Benchmark, frozenset[Engine]] = { + Benchmark.SPATIALBENCH: frozenset({Engine.DUCKDB}), +} + + +def engines_for_benchmark(benchmark: Benchmark) -> frozenset[Engine]: + """Return the engines `benchmark` supports, defaulting to every engine when unrestricted.""" + return BENCHMARK_ENGINES.get(benchmark, frozenset(Engine)) + + T = TypeVar("T") @@ -175,13 +189,16 @@ def parse_formats_json(value: str) -> list[Format]: def resolve_axis_targets( - engines: Iterable[Engine], formats: Iterable[Format] + engines: Iterable[Engine], formats: Iterable[Format], benchmark: Benchmark | None = None ) -> tuple[list[BenchmarkTarget], list[str]]: """Expand engine/format axes into supported explicit targets.""" warnings: list[str] = [] targets: list[BenchmarkTarget] = [] for engine in engines: + if benchmark is not None and engine not in engines_for_benchmark(benchmark): + warnings.append(f"Benchmark {benchmark.value} does not support engine {engine.value}") + continue for fmt in formats: target = BenchmarkTarget(engine=engine, format=fmt).normalized() if not target.is_supported(): @@ -200,7 +217,9 @@ def group_targets_by_backend(targets: Iterable[BenchmarkTarget]) -> dict[Engine, return groups -def validate_targets(targets: Iterable[BenchmarkTarget], options: dict[str, str]) -> list[str]: +def validate_targets( + targets: Iterable[BenchmarkTarget], options: dict[str, str], benchmark: Benchmark | None = None +) -> list[str]: """Validate explicit targets against benchmark runner constraints.""" errors: list[str] = [] @@ -208,6 +227,8 @@ def validate_targets(targets: Iterable[BenchmarkTarget], options: dict[str, str] for target in normalized_targets: if not target.is_supported(): errors.append(f"Format {target.format.value} is not supported by engine {target.engine.value}") + if benchmark is not None and target.engine not in engines_for_benchmark(benchmark): + errors.append(f"Benchmark {benchmark.value} does not support engine {target.engine.value}") if options.get("remote-data-dir") and any(target.format == Format.LANCE for target in normalized_targets): errors.append("Lance format is not supported for remote storage benchmarks.") @@ -242,7 +263,7 @@ def backends(self) -> list[Engine]: def validate(self) -> list[str]: """Validate the configuration and return any errors.""" - return validate_targets(self.targets, self.options) + return validate_targets(self.targets, self.options, self.benchmark) @dataclass diff --git a/bench-orchestrator/tests/test_config.py b/bench-orchestrator/tests/test_config.py index c7e2d6bb291..f900048f87b 100644 --- a/bench-orchestrator/tests/test_config.py +++ b/bench-orchestrator/tests/test_config.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright the Vortex contributors from bench_orchestrator.config import ( + Benchmark, BenchmarkTarget, Engine, Format, @@ -39,6 +40,31 @@ def test_resolve_axis_targets_filters_unsupported_combinations() -> None: assert warnings == ["Format arrow is not supported by engine duckdb"] +def test_resolve_axis_targets_skips_engines_a_benchmark_cannot_run() -> None: + # SpatialBench is DuckDB-only (ST_* spatial SQL), so the DataFusion axis is dropped with a warning. + targets, warnings = resolve_axis_targets( + [Engine.DATAFUSION, Engine.DUCKDB], + [Format.PARQUET, Format.VORTEX], + Benchmark.SPATIALBENCH, + ) + + assert targets == [ + BenchmarkTarget(engine=Engine.DUCKDB, format=Format.PARQUET), + BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX), + ] + assert warnings == ["Benchmark spatialbench does not support engine datafusion"] + + +def test_validate_targets_rejects_engine_a_benchmark_cannot_run() -> None: + errors = validate_targets( + [BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.PARQUET)], + {}, + Benchmark.SPATIALBENCH, + ) + + assert errors == ["Benchmark spatialbench does not support engine datafusion"] + + def test_validate_targets_rejects_remote_lance() -> None: errors = validate_targets( [BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.LANCE)], diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml index 9cd7fc92d30..f655b5213bb 100644 --- a/vortex-bench/Cargo.toml +++ b/vortex-bench/Cargo.toml @@ -69,6 +69,7 @@ tracing-subscriber = { workspace = true, features = [ ] } url = { workspace = true } uuid = { workspace = true, features = ["v4"] } +wkb = { workspace = true } [dev-dependencies] insta = { workspace = true } diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs index ad84ef61b1d..b7c367ef8f6 100644 --- a/vortex-bench/src/conversions.rs +++ b/vortex-bench/src/conversions.rs @@ -4,6 +4,7 @@ use std::fs; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use futures::StreamExt; use futures::TryStreamExt; @@ -22,29 +23,43 @@ use tracing::info; use tracing::trace; use vortex::VortexSessionDefault; use vortex::array::ArrayRef; +use vortex::array::ExecutionCtx; use vortex::array::IntoArray; use vortex::array::VortexSessionExecute; use vortex::array::arrays::ChunkedArray; use vortex::array::arrays::ExtensionArray; use vortex::array::arrays::Struct; use vortex::array::arrays::StructArray; +use vortex::array::arrays::VarBinViewArray; use vortex::array::arrays::struct_::StructArrayExt; use vortex::array::arrow::FromArrowArray; use vortex::array::builders::builder_with_capacity; use vortex::array::stream::ArrayStreamAdapter; use vortex::array::stream::ArrayStreamExt; +use vortex::compressor::BtrBlocksCompressorBuilder; use vortex::dtype::DType; +use vortex::dtype::FieldPath; use vortex::dtype::StructFields; use vortex::dtype::arrow::FromArrowType; use vortex::dtype::extension::ExtDType; use vortex::dtype::extension::ExtDTypeRef; use vortex::error::VortexResult; use vortex::error::vortex_err; +use vortex::file::VortexWriteOptions; use vortex::file::WriteOptionsSessionExt; +use vortex::file::WriteStrategyBuilder; +use vortex::layout::LayoutStrategy; +use vortex::layout::layouts::chunked::writer::ChunkedLayoutStrategy; +use vortex::layout::layouts::compressed::CompressingStrategy; +use vortex::layout::layouts::flat::writer::FlatLayoutStrategy; use vortex::session::VortexSession; use vortex::utils::aliases::hash_set::HashSet; use vortex_geo::extension::GeoMetadata; use vortex_geo::extension::WellKnownBinary; +use wkb::Endianness; +use wkb::reader::read_wkb; +use wkb::writer::WriteOptions; +use wkb::writer::write_geometry; use crate::CompactionStrategy; use crate::Format; @@ -142,8 +157,7 @@ pub async fn convert_parquet_file_to_vortex( .open(output_path) .await?; - compaction - .apply_options(SESSION.write_options()) + write_options_for(compaction, &dtype, is_spatialbench(parquet_path)) .write( &mut output_file, ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)), @@ -153,6 +167,54 @@ pub async fn convert_parquet_file_to_vortex( Ok(()) } +/// Whether `path` points at SpatialBench data. +fn is_spatialbench(path: &Path) -> bool { + path.components() + .any(|component| component.as_os_str() == "spatialbench") +} + +/// Vortex write options for converting `dtype`-shaped data. +/// +/// For SpatialBench (`skip_binary_dict`), the geometry blobs are large and +/// unique, so the dictionary builder balloons memory (tens of GB) for zero gain. +fn write_options_for( + compaction: CompactionStrategy, + dtype: &DType, + skip_binary_dict: bool, +) -> VortexWriteOptions { + let binary_fields: Vec<_> = match dtype { + DType::Struct(fields, _) if skip_binary_dict => fields + .names() + .iter() + .zip(fields.fields()) + .filter(|(_, field)| matches!(field, DType::Binary(_))) + .map(|(name, _)| name.clone()) + .collect(), + _ => Vec::new(), + }; + if binary_fields.is_empty() { + return compaction.apply_options(SESSION.write_options()); + } + + let mut builder = WriteStrategyBuilder::default(); + if matches!(compaction, CompactionStrategy::Compact) { + builder = + builder.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact()); + } + for name in binary_fields { + builder = builder.with_field_writer(FieldPath::from_name(name), no_dict_layout()); + } + SESSION.write_options().with_strategy(builder.build()) +} + +/// A chunked + compressed layout that skips dictionary encoding for opaque `Binary` blobs. +fn no_dict_layout() -> Arc { + Arc::new(CompressingStrategy::new( + ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()), + BtrBlocksCompressorBuilder::default().build(), + )) +} + /// Convert all Parquet files in a directory to Vortex format. /// /// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to @@ -251,7 +313,7 @@ pub async fn add_geoparquet_metadata(parquet_path: &Path, geo_json: &str) -> any return Ok(()); } - let schema = std::sync::Arc::clone(builder.schema()); + let schema = Arc::clone(builder.schema()); let mut reader = builder.build()?; let tmp_path = parquet_path.with_extension("parquet.tmp"); @@ -314,7 +376,8 @@ fn tag_geo_dtype(dtype: DType, geo_columns: &HashSet) -> VortexResult
) -> VortexResult { if geo_columns.is_empty() { return Ok(chunk); @@ -325,17 +388,51 @@ fn tag_geo_array(chunk: ArrayRef, geo_columns: &HashSet) -> VortexResult let names = struct_array.names().clone(); let validity = struct_array.struct_validity(); let len = struct_array.len(); - let tagged = names - .iter() - .zip(struct_array.iter_unmasked_fields()) - .map(|(name, field)| { - if geo_columns.contains(name.as_ref()) && field.dtype().is_binary() { - let ext = wkb_ext_dtype(field.dtype())?; - Ok(ExtensionArray::try_new(ext, field.clone())?.into_array()) - } else { - Ok(field.clone()) + let mut ctx = SESSION.create_execution_ctx(); + let mut tagged = Vec::with_capacity(names.len()); + for (name, field) in names.iter().zip(struct_array.iter_unmasked_fields()) { + if geo_columns.contains(name.as_ref()) && field.dtype().is_binary() { + let ext = wkb_ext_dtype(field.dtype())?; + let little_endian = wkb_field_to_little_endian(field, &mut ctx)?; + tagged.push(ExtensionArray::try_new(ext, little_endian)?.into_array()); + } else { + tagged.push(field.clone()); + } + } + Ok(StructArray::try_new(names, tagged, len, validity)?.into_array()) +} + +/// Re-encode a binary `field`'s WKB values as little-endian (NDR), the only byte order DuckDB's +/// `GEOMETRY` accepts. Byte order is uniform within a column, so the array is returned untouched (no +/// copy) unless its first non-null value is big-endian; only then is each value re-encoded. +fn wkb_field_to_little_endian(field: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let values = field.clone().execute::(ctx)?; + let len = values.len(); + let validity = values.validity()?.execute_mask(len, ctx)?; + let is_big_endian = (0..len) + .find(|&i| validity.value(i)) + .is_some_and(|i| values.bytes_at(i).as_slice().first() == Some(&0x00)); + if !is_big_endian { + return Ok(field.clone()); + } + let little_endian: Vec>> = (0..len) + .map(|i| { + if !validity.value(i) { + return Ok(None); } + let wkb = values.bytes_at(i); + let geometry = read_wkb(wkb.as_slice()).map_err(|e| vortex_err!("invalid WKB: {e}"))?; + let mut encoded = Vec::with_capacity(wkb.len()); + write_geometry( + &mut encoded, + &geometry, + &WriteOptions { + endianness: Endianness::LittleEndian, + }, + ) + .map_err(|e| vortex_err!("re-encoding WKB as little-endian: {e}"))?; + Ok(Some(encoded)) }) - .collect::>>()?; - Ok(StructArray::try_new(names, tagged, len, validity)?.into_array()) + .collect::>()?; + Ok(VarBinViewArray::from_iter_nullable_bin(little_endian).into_array()) } diff --git a/vortex-bench/src/spatialbench/datagen/wkb.rs b/vortex-bench/src/spatialbench/datagen/wkb.rs index 422505a3623..e0160ea6f9c 100644 --- a/vortex-bench/src/spatialbench/datagen/wkb.rs +++ b/vortex-bench/src/spatialbench/datagen/wkb.rs @@ -5,9 +5,11 @@ //! Geometry is emitted as WKB, which DuckDB reads directly as `GEOMETRY` via `ST_GeomFromWKB`. use std::fs; +use std::path::Path; use std::path::PathBuf; use std::sync::Arc; +use anyhow::Context; use anyhow::Result; // spatialbench emits arrow-56 batches, so they must be written with its matching arrow-56 // parquet crate, not the workspace's arrow-58 one. The parquet file itself is version-neutral. @@ -23,6 +25,7 @@ use spatialbench_parquet::basic::Compression; use spatialbench_parquet::file::properties::WriterProperties; use spatialbench_parquet::format::KeyValue; use tokio::fs::File as TokioFile; +use tokio::process::Command; use tracing::info; use super::table::Table; @@ -109,6 +112,52 @@ pub async fn generate_tables(scale_factor: &str, output_dir: PathBuf) -> Result< } } + // `zone` isn't generated in-process (`Table::is_generated` is false); it comes from the upstream + // `spatialbench-cli` (install it, or set `SPATIALBENCH_CLI` to its binary). + generate_zone(scale_factor, &parquet_dir).await?; + + Ok(()) +} + +/// Generate the externally-sourced `zone` table by shelling out to the upstream +/// `spatialbench-cli`. +async fn generate_zone(scale_factor: f64, parquet_dir: &Path) -> Result<()> { + let cli = std::env::var("SPATIALBENCH_CLI").unwrap_or_else(|_| "spatialbench-cli".to_string()); + idempotent_async(parquet_dir.join("zone_0.parquet"), |zone_path| async move { + info!( + scale_factor, + cli, "Generating SpatialBench zone table via spatialbench-cli" + ); + // The CLI writes a fixed `zone.parquet` name into an output directory, so + // generate into a scratch dir and move the produced parquet onto `zone_path`. + let scratch = zone_path.with_extension("zone-scratch"); + fs::create_dir_all(&scratch)?; + let status = Command::new(&cli) + .arg("-s") + .arg(scale_factor.to_string()) + .args(["-T", "zone", "-f", "parquet", "-o"]) + .arg(&scratch) + .status() + .await + .with_context(|| format!("failed to spawn `{cli}` (is it installed / on PATH?)"))?; + anyhow::ensure!( + status.success(), + "`{cli}` exited with {status} while generating zone" + ); + let produced = glob::glob(&scratch.join("**/*.parquet").to_string_lossy())? + .flatten() + .next() + .with_context(|| { + format!( + "`{cli}` produced no zone parquet under {}", + scratch.display() + ) + })?; + fs::rename(&produced, &zone_path)?; + fs::remove_dir_all(&scratch).ok(); + Ok(()) + }) + .await?; Ok(()) }