Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions bench-orchestrator/bench_orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ def run_ref_auto_complete() -> list[str]:
return list(map(lambda x: x.run_id, ResultStore().list_runs(limit=None)))


def targets_from_axes(engine: str, format: str) -> tuple[list[BenchmarkTarget], list[str]]:
def targets_from_axes(
engine: str, format: str, benchmark: Benchmark | None = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know much about this, but why is this needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid running SpatialBench on engines (e.g. DataFusion) that can't execute its spatial queries yet, and to fail early and clearly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we could remove this function, though users would then need to be aware that DataFusion and other engines cannot run SpatialBench yet. That doesn't seem like a big problem.

) -> tuple[list[BenchmarkTarget], list[str]]:
"""Resolve legacy engine/format axes into explicit benchmark targets."""
return resolve_axis_targets(parse_engines(engine), parse_formats(format))
return resolve_axis_targets(parse_engines(engine), parse_formats(format), benchmark)


def backends_for_engines(engines: list[Engine]) -> list[Engine]:
Expand Down Expand Up @@ -260,7 +262,7 @@ def run(
targets = parse_targets_json(targets_json)
warnings: list[str] = []
else:
targets, warnings = targets_from_axes(engine, format)
targets, warnings = targets_from_axes(engine, format, benchmark)
except ValueError as exc:
console.print(f"[red]{exc}[/red]")
raise typer.Exit(1) from exc
Expand Down
27 changes: 24 additions & 3 deletions bench-orchestrator/bench_orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Benchmark(Enum):
POLARSIGNALS = "polarsignals"
PUBLIC_BI = "public-bi"
STATPOPGEN = "statpopgen"
SPATIALBENCH = "spatialbench"


# Engine to supported formats mapping.
Expand All @@ -72,6 +73,19 @@ class Benchmark(Enum):
Engine.LANCE: [Format.LANCE],
}

# Engines each benchmark is allowed to run on. A benchmark that is absent from this mapping may run
# on *every* engine; add an entry only to restrict it. SpatialBench is restricted to DuckDB because
# its queries use DuckDB-specific `ST_*` spatial SQL that DataFusion has no functions for yet.
BENCHMARK_ENGINES: dict[Benchmark, frozenset[Engine]] = {
Benchmark.SPATIALBENCH: frozenset({Engine.DUCKDB}),
}


def engines_for_benchmark(benchmark: Benchmark) -> frozenset[Engine]:
    """Look up the engines that `benchmark` can run on.

    A benchmark without an entry in `BENCHMARK_ENGINES` is unrestricted, so the set of all
    `Engine` members is returned for it.
    """
    if benchmark in BENCHMARK_ENGINES:
        return BENCHMARK_ENGINES[benchmark]
    return frozenset(Engine)


T = TypeVar("T")


Expand Down Expand Up @@ -175,13 +189,16 @@ def parse_formats_json(value: str) -> list[Format]:


def resolve_axis_targets(
engines: Iterable[Engine], formats: Iterable[Format]
engines: Iterable[Engine], formats: Iterable[Format], benchmark: Benchmark | None = None
) -> tuple[list[BenchmarkTarget], list[str]]:
"""Expand engine/format axes into supported explicit targets."""
warnings: list[str] = []
targets: list[BenchmarkTarget] = []

for engine in engines:
if benchmark is not None and engine not in engines_for_benchmark(benchmark):
warnings.append(f"Benchmark {benchmark.value} does not support engine {engine.value}")
continue
for fmt in formats:
target = BenchmarkTarget(engine=engine, format=fmt).normalized()
if not target.is_supported():
Expand All @@ -200,14 +217,18 @@ def group_targets_by_backend(targets: Iterable[BenchmarkTarget]) -> dict[Engine,
return groups


def validate_targets(targets: Iterable[BenchmarkTarget], options: dict[str, str]) -> list[str]:
def validate_targets(
targets: Iterable[BenchmarkTarget], options: dict[str, str], benchmark: Benchmark | None = None
) -> list[str]:
"""Validate explicit targets against benchmark runner constraints."""
errors: list[str] = []

normalized_targets = [target.normalized() for target in targets]
for target in normalized_targets:
if not target.is_supported():
errors.append(f"Format {target.format.value} is not supported by engine {target.engine.value}")
if benchmark is not None and target.engine not in engines_for_benchmark(benchmark):
errors.append(f"Benchmark {benchmark.value} does not support engine {target.engine.value}")

if options.get("remote-data-dir") and any(target.format == Format.LANCE for target in normalized_targets):
errors.append("Lance format is not supported for remote storage benchmarks.")
Expand Down Expand Up @@ -242,7 +263,7 @@ def backends(self) -> list[Engine]:

def validate(self) -> list[str]:
"""Validate the configuration and return any errors."""
return validate_targets(self.targets, self.options)
return validate_targets(self.targets, self.options, self.benchmark)


@dataclass
Expand Down
26 changes: 26 additions & 0 deletions bench-orchestrator/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright the Vortex contributors

from bench_orchestrator.config import (
Benchmark,
BenchmarkTarget,
Engine,
Format,
Expand Down Expand Up @@ -39,6 +40,31 @@ def test_resolve_axis_targets_filters_unsupported_combinations() -> None:
assert warnings == ["Format arrow is not supported by engine duckdb"]


def test_resolve_axis_targets_skips_engines_a_benchmark_cannot_run() -> None:
    """SpatialBench is DuckDB-only (ST_* spatial SQL): the DataFusion axis is dropped with a warning."""
    requested_engines = [Engine.DATAFUSION, Engine.DUCKDB]
    requested_formats = [Format.PARQUET, Format.VORTEX]

    targets, warnings = resolve_axis_targets(requested_engines, requested_formats, Benchmark.SPATIALBENCH)

    # Only the DuckDB engine survives, paired with each requested format in order.
    expected_targets = [BenchmarkTarget(engine=Engine.DUCKDB, format=fmt) for fmt in requested_formats]
    assert targets == expected_targets
    assert warnings == ["Benchmark spatialbench does not support engine datafusion"]


def test_validate_targets_rejects_engine_a_benchmark_cannot_run() -> None:
    """An explicit DataFusion target is reported as an error when validated against SpatialBench."""
    datafusion_target = BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.PARQUET)

    errors = validate_targets([datafusion_target], {}, Benchmark.SPATIALBENCH)

    assert errors == ["Benchmark spatialbench does not support engine datafusion"]


def test_validate_targets_rejects_remote_lance() -> None:
errors = validate_targets(
[BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.LANCE)],
Expand Down
1 change: 1 addition & 0 deletions vortex-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ tracing-subscriber = { workspace = true, features = [
] }
url = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
wkb = { workspace = true }

[dev-dependencies]
insta = { workspace = true }
Expand Down
127 changes: 112 additions & 15 deletions vortex-bench/src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use futures::StreamExt;
use futures::TryStreamExt;
Expand All @@ -22,29 +23,43 @@ use tracing::info;
use tracing::trace;
use vortex::VortexSessionDefault;
use vortex::array::ArrayRef;
use vortex::array::ExecutionCtx;
use vortex::array::IntoArray;
use vortex::array::VortexSessionExecute;
use vortex::array::arrays::ChunkedArray;
use vortex::array::arrays::ExtensionArray;
use vortex::array::arrays::Struct;
use vortex::array::arrays::StructArray;
use vortex::array::arrays::VarBinViewArray;
use vortex::array::arrays::struct_::StructArrayExt;
use vortex::array::arrow::FromArrowArray;
use vortex::array::builders::builder_with_capacity;
use vortex::array::stream::ArrayStreamAdapter;
use vortex::array::stream::ArrayStreamExt;
use vortex::compressor::BtrBlocksCompressorBuilder;
use vortex::dtype::DType;
use vortex::dtype::FieldPath;
use vortex::dtype::StructFields;
use vortex::dtype::arrow::FromArrowType;
use vortex::dtype::extension::ExtDType;
use vortex::dtype::extension::ExtDTypeRef;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::file::VortexWriteOptions;
use vortex::file::WriteOptionsSessionExt;
use vortex::file::WriteStrategyBuilder;
use vortex::layout::LayoutStrategy;
use vortex::layout::layouts::chunked::writer::ChunkedLayoutStrategy;
use vortex::layout::layouts::compressed::CompressingStrategy;
use vortex::layout::layouts::flat::writer::FlatLayoutStrategy;
use vortex::session::VortexSession;
use vortex::utils::aliases::hash_set::HashSet;
use vortex_geo::extension::GeoMetadata;
use vortex_geo::extension::WellKnownBinary;
use wkb::Endianness;
use wkb::reader::read_wkb;
use wkb::writer::WriteOptions;
use wkb::writer::write_geometry;

use crate::CompactionStrategy;
use crate::Format;
Expand Down Expand Up @@ -142,8 +157,7 @@ pub async fn convert_parquet_file_to_vortex(
.open(output_path)
.await?;

compaction
.apply_options(SESSION.write_options())
write_options_for(compaction, &dtype, is_spatialbench(parquet_path))
.write(
&mut output_file,
ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)),
Expand All @@ -153,6 +167,54 @@ pub async fn convert_parquet_file_to_vortex(
Ok(())
}

/// Returns `true` when any component of `path` is exactly `spatialbench`.
///
/// This is how SpatialBench data is recognized: by a directory (or file) of that name somewhere
/// in its path. The comparison is an exact, case-sensitive match on a whole component.
fn is_spatialbench(path: &Path) -> bool {
    for part in path.iter() {
        if part == "spatialbench" {
            return true;
        }
    }
    false
}

/// Vortex write options for converting `dtype`-shaped data.
///
/// For SpatialBench (`skip_binary_dict`), the geometry blobs are large and
/// unique, so the dictionary builder balloons memory (tens of GB) for zero gain.
fn write_options_for(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clunky, but I'm not sure I have a better way of doing that right now :/

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is the cleanest way I could think of for now; otherwise people would need to wait about an hour for dictionary compaction of the binary columns at SF=1.

compaction: CompactionStrategy,
dtype: &DType,
skip_binary_dict: bool,
) -> VortexWriteOptions {
let binary_fields: Vec<_> = match dtype {
DType::Struct(fields, _) if skip_binary_dict => fields
.names()
.iter()
.zip(fields.fields())
.filter(|(_, field)| matches!(field, DType::Binary(_)))
.map(|(name, _)| name.clone())
.collect(),
_ => Vec::new(),
};
if binary_fields.is_empty() {
return compaction.apply_options(SESSION.write_options());
}

let mut builder = WriteStrategyBuilder::default();
if matches!(compaction, CompactionStrategy::Compact) {
builder =
builder.with_btrblocks_builder(BtrBlocksCompressorBuilder::default().with_compact());
}
for name in binary_fields {
builder = builder.with_field_writer(FieldPath::from_name(name), no_dict_layout());
}
SESSION.write_options().with_strategy(builder.build())
}

/// Layout strategy for opaque `Binary` blob columns that skips dictionary encoding.
///
/// Values are written as chunked flat layouts and compressed with a default BtrBlocks
/// compressor.
fn no_dict_layout() -> Arc<dyn LayoutStrategy> {
    let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
    let compressor = BtrBlocksCompressorBuilder::default().build();
    Arc::new(CompressingStrategy::new(chunked, compressor))
}

/// Convert all Parquet files in a directory to Vortex format.
///
/// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to
Expand Down Expand Up @@ -251,7 +313,7 @@ pub async fn add_geoparquet_metadata(parquet_path: &Path, geo_json: &str) -> any
return Ok(());
}

let schema = std::sync::Arc::clone(builder.schema());
let schema = Arc::clone(builder.schema());
let mut reader = builder.build()?;

let tmp_path = parquet_path.with_extension("parquet.tmp");
Expand Down Expand Up @@ -314,7 +376,8 @@ fn tag_geo_dtype(dtype: DType, geo_columns: &HashSet<String>) -> VortexResult<DT
))
}

/// Wrap the named binary columns of a struct `chunk` as `vortex.geo.wkb` extension arrays.
/// Wrap the named binary columns of a struct `chunk` as `vortex.geo.wkb` extension arrays, storing
/// their WKB little-endian.
fn tag_geo_array(chunk: ArrayRef, geo_columns: &HashSet<String>) -> VortexResult<ArrayRef> {
if geo_columns.is_empty() {
return Ok(chunk);
Expand All @@ -325,17 +388,51 @@ fn tag_geo_array(chunk: ArrayRef, geo_columns: &HashSet<String>) -> VortexResult
let names = struct_array.names().clone();
let validity = struct_array.struct_validity();
let len = struct_array.len();
let tagged = names
.iter()
.zip(struct_array.iter_unmasked_fields())
.map(|(name, field)| {
if geo_columns.contains(name.as_ref()) && field.dtype().is_binary() {
let ext = wkb_ext_dtype(field.dtype())?;
Ok(ExtensionArray::try_new(ext, field.clone())?.into_array())
} else {
Ok(field.clone())
let mut ctx = SESSION.create_execution_ctx();
let mut tagged = Vec::with_capacity(names.len());
for (name, field) in names.iter().zip(struct_array.iter_unmasked_fields()) {
if geo_columns.contains(name.as_ref()) && field.dtype().is_binary() {
let ext = wkb_ext_dtype(field.dtype())?;
let little_endian = wkb_field_to_little_endian(field, &mut ctx)?;
tagged.push(ExtensionArray::try_new(ext, little_endian)?.into_array());
} else {
tagged.push(field.clone());
}
}
Ok(StructArray::try_new(names, tagged, len, validity)?.into_array())
}

/// Re-encode a binary `field`'s WKB values as little-endian (NDR), the only byte order DuckDB's
/// `GEOMETRY` accepts.
///
/// Every WKB value starts with its own byte-order marker (`0x00` = big-endian/XDR, `0x01` =
/// little-endian/NDR), so a column is not guaranteed to be uniform. The marker of every non-null
/// value is therefore inspected: if none is big-endian the input array is returned untouched (no
/// copy); otherwise every non-null value is parsed and re-written as little-endian.
///
/// # Errors
///
/// Returns an error if `field` cannot be executed into a `VarBinViewArray`, if its validity
/// cannot be materialized, or if re-encoding is needed and a non-null value is not valid WKB.
fn wkb_field_to_little_endian(field: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
    let values = field.clone().execute::<VarBinViewArray>(ctx)?;
    let len = values.len();
    let validity = values.validity()?.execute_mask(len, ctx)?;

    // Sampling only the first value would silently let big-endian values through in a
    // mixed-endianness column, so look at the leading marker byte of every non-null value.
    let has_big_endian = (0..len)
        .filter(|&i| validity.value(i))
        .any(|i| values.bytes_at(i).as_slice().first() == Some(&0x00));
    if !has_big_endian {
        return Ok(field.clone());
    }

    let little_endian: Vec<Option<Vec<u8>>> = (0..len)
        .map(|i| {
            if !validity.value(i) {
                return Ok(None);
            }
            let wkb = values.bytes_at(i);
            let geometry = read_wkb(wkb.as_slice()).map_err(|e| vortex_err!("invalid WKB: {e}"))?;
            // The re-encoded value is expected to be about as large as the input.
            let mut encoded = Vec::with_capacity(wkb.len());
            write_geometry(
                &mut encoded,
                &geometry,
                &WriteOptions {
                    endianness: Endianness::LittleEndian,
                },
            )
            .map_err(|e| vortex_err!("re-encoding WKB as little-endian: {e}"))?;
            Ok(Some(encoded))
        })
        .collect::<VortexResult<_>>()?;
    // NOTE(review): presumably `from_iter_nullable_bin` yields a nullable binary array; if `field`
    // can be non-nullable, confirm the caller's extension dtype still matches this storage.
    Ok(VarBinViewArray::from_iter_nullable_bin(little_endian).into_array())
}
Loading
Loading