Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions vortex-web/crate/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ use futures::TryStreamExt;
use futures::future::BoxFuture;
use serde::Serialize;
use vortex::array::ArrayRef;
use vortex::array::LEGACY_SESSION;
use vortex::array::VortexSessionExecute;
use vortex::array::arrow::ArrowArrayExecutor;
use vortex::array::arrow::ArrowSessionExt;
use vortex::array::buffer::BufferHandle;
use vortex::array::dtype::DType;
use vortex::array::serde::SerializedArray;
Expand Down Expand Up @@ -312,8 +311,8 @@ impl VortexFileHandle {
.await
.map_err(|e| JsValue::from_str(&e.to_string()))?;

let schema =
dtype_to_schema(&dtype, "value").map_err(|e| JsValue::from_str(&e.to_string()))?;
let schema = dtype_to_schema(&self.session, &dtype, "value")
.map_err(|e| JsValue::from_str(&e.to_string()))?;
let arrow_schema = Arc::new(schema);

let mut buf = Vec::new();
Expand All @@ -322,7 +321,7 @@ impl VortexFileHandle {
.map_err(|e| JsValue::from_str(&e.to_string()))?;

for chunk in chunks {
let batch = array_to_record_batch(chunk, &dtype, &arrow_schema)
let batch = array_to_record_batch(&self.session, chunk, &dtype, &arrow_schema)
.map_err(|e| JsValue::from_str(&e.to_string()))?;
writer
.write(&batch)
Expand Down Expand Up @@ -511,11 +510,11 @@ impl VortexFileHandle {

// Convert to Arrow IPC.
let array_dtype = current.dtype().clone();
let schema = dtype_to_schema(&array_dtype, "value")
let schema = dtype_to_schema(&self.session, &array_dtype, "value")
.map_err(|e| JsValue::from_str(&e.to_string()))?;
let arrow_schema = Arc::new(schema);

let batch = array_to_record_batch(current, &array_dtype, &arrow_schema)
let batch = array_to_record_batch(&self.session, current, &array_dtype, &arrow_schema)
.map_err(|e| JsValue::from_str(&e.to_string()))?;

let mut ipc_buf = Vec::new();
Expand Down Expand Up @@ -743,14 +742,14 @@ fn downgrade_arrow_type(dt: DataType) -> DataType {
}

/// Create an Arrow Schema from a Vortex DType, with view types downgraded.
fn dtype_to_schema(dtype: &DType, default_name: &str) -> VortexResult<Schema> {
fn dtype_to_schema(
session: &VortexSession,
dtype: &DType,
default_name: &str,
) -> VortexResult<Schema> {
let schema = match dtype {
DType::Struct(..) => dtype.to_arrow_schema()?,
other => {
let arrow_dt = other.to_arrow_dtype()?;
let nullable = other.is_nullable();
Schema::new(vec![Field::new(default_name, arrow_dt, nullable)])
}
DType::Struct(..) => session.arrow().to_arrow_schema(dtype)?,
other => Schema::new(vec![session.arrow().to_arrow_field(default_name, other)?]),
};
// Downgrade view types in all fields.
Ok(Schema::new(
Expand All @@ -770,8 +769,9 @@ fn dtype_to_schema(dtype: &DType, default_name: &str) -> VortexResult<Schema> {

/// Convert a Vortex ArrayRef into an Arrow RecordBatch using the given schema.
///
/// Always uses `execute_arrow` with explicit types to ensure view types are avoided.
/// Always executes against an explicit target type to ensure view types are avoided.
fn array_to_record_batch(
session: &VortexSession,
array: ArrayRef,
dtype: &DType,
schema: &Arc<Schema>,
Expand All @@ -780,8 +780,11 @@ fn array_to_record_batch(
DType::Struct(..) => DataType::Struct(schema.fields().clone()),
_ => schema.field(0).data_type().clone(),
};
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let arrow = array.execute_arrow(Some(&data_type), &mut ctx)?;
let target = Field::new("", data_type, array.dtype().is_nullable());
let mut ctx = session.create_execution_ctx();
let arrow = session
.arrow()
.execute_arrow(array, Some(&target), &mut ctx)?;
match dtype {
DType::Struct(..) => Ok(RecordBatch::from(arrow.as_struct().clone())),
_ => Ok(RecordBatch::try_new(schema.clone(), vec![arrow])?),
Expand Down
Loading