Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vortex-array/src/arrays/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub(crate) fn initialize(session: &vortex_session::VortexSession) {
listview::initialize(session);
patched::initialize(session);
primitive::initialize(session);
shared::initialize(session);
struct_::initialize(session);
varbin::initialize(session);
varbinview::initialize(session);
Expand Down
42 changes: 42 additions & 0 deletions vortex-array/src/arrays/shared/kernel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_error::VortexResult;
use vortex_session::VortexSession;

use crate::ArrayRef;
use crate::ArrayVTable;
use crate::ExecutionCtx;
use crate::arrays::Shared;
use crate::arrays::shared::SharedArrayExt;
use crate::executor::execute_parent_for_child;
use crate::optimizer::kernels::ArrayKernelsExt;
use crate::optimizer::kernels::ExecuteParentFn;

pub(crate) fn initialize(session: &VortexSession) {
session
.kernels()
.register_execute_parent_for_any_parent(Shared.id(), &[execute_parent as ExecuteParentFn]);
}

fn execute_parent(
child: &ArrayRef,
parent: &ArrayRef,
child_idx: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
let Some(shared) = child.as_opt::<Shared>() else {
return Ok(None);
};

let mut current = shared.current_array_ref().clone();
while let Some(source) = current
.as_opt::<Shared>()
.map(|shared| shared.current_array_ref().clone())
{
current = source;
}

let kernels = ctx.execute_parent_kernels();
execute_parent_for_child(parent, &current, child_idx, kernels.as_ref(), ctx)
}
5 changes: 5 additions & 0 deletions vortex-array/src/arrays/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

mod array;
mod kernel;
mod vtable;

pub use array::SharedArrayExt;
pub use array::SharedData;
pub use vtable::Shared;
pub use vtable::SharedArray;

pub(crate) fn initialize(session: &vortex_session::VortexSession) {
kernel::initialize(session);
}

#[cfg(test)]
mod tests;
8 changes: 8 additions & 0 deletions vortex-array/src/arrays/shared/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ impl VTable for Shared {
.get_or_compute(|source| source.clone().execute::<Canonical>(ctx))
.map(ExecutionResult::done)
}

fn reduce_parent(
array: ArrayView<'_, Self>,
parent: &ArrayRef,
child_idx: usize,
) -> VortexResult<Option<ArrayRef>> {
array.current_array_ref().reduce_parent(parent, child_idx)
}
}
impl OperationsVTable<Shared> for Shared {
fn scalar_at(
Expand Down
117 changes: 93 additions & 24 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use crate::memory::HostAllocatorRef;
use crate::memory::MemorySessionExt;
use crate::optimizer::ArrayOptimizer;
use crate::optimizer::kernels::ArrayKernelsExt;
use crate::optimizer::kernels::ExecuteParentKernelRef;
use crate::optimizer::kernels::ParentExecutionKernels;
use crate::optimizer::kernels::execute_parent_key;
use crate::stats::ArrayStats;
use crate::stats::StatsSet;

Expand Down Expand Up @@ -360,6 +360,10 @@ impl ExecutionCtx {
self.session.allocator()
}

pub(crate) fn execute_parent_kernels(&self) -> Arc<ParentExecutionKernels> {
Arc::clone(&self.execute_parent_kernels)
}

/// Log an execution step at the current depth.
///
/// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
Expand Down Expand Up @@ -562,29 +566,50 @@ fn finalize_done(
Ok((output, None))
}

fn execute_parent_for_child(
pub(crate) fn execute_parent_for_child(
parent: &ArrayRef,
child: &ArrayRef,
slot_idx: usize,
kernels: &ParentExecutionKernels,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
let key = execute_parent_key(parent.encoding_id(), child.encoding_id());
if let Some(plugins) = kernels.get(&key) {
for plugin in plugins.as_ref() {
if let Some(result) = plugin.execute_parent(child, parent, slot_idx, ctx)? {
if cfg!(debug_assertions) {
vortex_ensure!(
result.len() == parent.len(),
"Executed parent canonical length mismatch"
);
vortex_ensure!(
result.dtype() == parent.dtype(),
"Executed parent canonical dtype mismatch"
);
}
return Ok(Some(result));
if let Some(plugins) = kernels.exact(parent.encoding_id(), child.encoding_id())
&& let Some(result) =
execute_parent_with_plugins(plugins.as_ref(), parent, child, slot_idx, ctx)?
{
return Ok(Some(result));
}
Comment thread
gatesn marked this conversation as resolved.

if let Some(plugins) = kernels.any_parent(child.encoding_id())
&& let Some(result) =
execute_parent_with_plugins(plugins.as_ref(), parent, child, slot_idx, ctx)?
{
return Ok(Some(result));
}

Ok(None)
}

fn execute_parent_with_plugins(
plugins: &[ExecuteParentKernelRef],
parent: &ArrayRef,
child: &ArrayRef,
slot_idx: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
for plugin in plugins {
if let Some(result) = plugin.execute_parent(child, parent, slot_idx, ctx)? {
if cfg!(debug_assertions) {
vortex_ensure!(
result.len() == parent.len(),
"Executed parent canonical length mismatch"
);
vortex_ensure!(
result.dtype() == parent.dtype(),
"Executed parent canonical dtype mismatch"
);
}
return Ok(Some(result));
}
}

Expand Down Expand Up @@ -860,10 +885,13 @@ mod tests {
use crate::VTable as _;
use crate::VortexSessionExecute;
use crate::arrays::Bool;
use crate::arrays::BoolArray;
use crate::arrays::Primitive;
use crate::arrays::PrimitiveArray;
use crate::arrays::SharedArray;
use crate::arrays::shared;
use crate::optimizer::kernels::ExecuteParentFn;
use crate::optimizer::kernels::KernelSession;
use crate::optimizer::kernels::execute_parent_key;

fn noop_execute_parent(
_child: &ArrayRef,
Expand All @@ -877,13 +905,13 @@ mod tests {
#[test]
fn execution_ctx_snapshots_execute_parent_kernels_at_creation() {
let session = VortexSession::empty().with_some(KernelSession::empty());
let key = execute_parent_key(Bool.id(), Primitive.id());

let before_registration = session.create_execution_ctx();
assert!(
!before_registration
before_registration
.execute_parent_kernels
.contains_key(&key)
.exact(Bool.id(), Primitive.id())
.is_none()
);

session.kernels().register_execute_parent(
Expand All @@ -893,12 +921,53 @@ mod tests {
);

assert!(
!before_registration
before_registration
.execute_parent_kernels
.contains_key(&key)
.exact(Bool.id(), Primitive.id())
.is_none()
);

let after_registration = session.create_execution_ctx();
assert!(after_registration.execute_parent_kernels.contains_key(&key));
assert!(
after_registration
.execute_parent_kernels
.exact(Bool.id(), Primitive.id())
.is_some()
);
}

fn primitive_execute_parent(
child: &ArrayRef,
parent: &ArrayRef,
child_idx: usize,
_ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
assert!(child.as_opt::<Primitive>().is_some());
assert!(parent.as_opt::<Bool>().is_some());
assert_eq!(child_idx, 1);
Ok(Some(parent.clone()))
}

#[test]
fn shared_child_uses_any_parent_execute_kernel() -> VortexResult<()> {
let session = VortexSession::empty().with_some(KernelSession::empty());
session.kernels().register_execute_parent(
Bool.id(),
Primitive.id(),
&[primitive_execute_parent as ExecuteParentFn],
);
shared::initialize(&session);

let mut ctx = session.create_execution_ctx();
let kernels = ctx.execute_parent_kernels();
let parent = BoolArray::from_iter([true, false]).into_array();
let child =
SharedArray::new(PrimitiveArray::from_iter([1i32, 2]).into_array()).into_array();

let result = execute_parent_for_child(&parent, &child, 1, kernels.as_ref(), &mut ctx)?
.expect("shared child should dispatch to wrapped primitive kernel");

assert_eq!(result.encoding_id(), Bool.id());
Ok(())
}
}
Loading
Loading