diff --git a/Cargo.lock b/Cargo.lock
index 6e2414ba4..bf98e249e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4029,6 +4029,7 @@ dependencies = [
 name = "datasets-derived"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
  "common",
  "datafusion",
  "datasets-common",
diff --git a/crates/core/common/src/catalog.rs b/crates/core/common/src/catalog.rs
index 0d12e6724..68ff23e09 100644
--- a/crates/core/common/src/catalog.rs
+++ b/crates/core/common/src/catalog.rs
@@ -3,4 +3,5 @@ pub mod errors;
 pub mod logical;
 pub mod physical;
 pub mod reader;
+pub mod resolve;
 pub mod sql;
diff --git a/crates/core/common/src/catalog/dataset_access.rs b/crates/core/common/src/catalog/dataset_access.rs
index 62d28a9bf..c1b96d3f7 100644
--- a/crates/core/common/src/catalog/dataset_access.rs
+++ b/crates/core/common/src/catalog/dataset_access.rs
@@ -9,7 +9,7 @@ use crate::{BoxError, Dataset};
 ///
 /// This trait provides the minimal interface required for SQL catalog building,
 /// abstracting over the dataset store implementation.
-pub trait DatasetAccess {
+pub trait DatasetAccess: Send + Sync + 'static {
     /// Resolve a dataset reference to a hash reference.
     ///
    /// This method resolves a dataset reference (which may contain a version, "latest", etc.)
diff --git a/crates/core/common/src/catalog/errors.rs b/crates/core/common/src/catalog/errors.rs
index dc9ee1d17..4d72fc6a5 100644
--- a/crates/core/common/src/catalog/errors.rs
+++ b/crates/core/common/src/catalog/errors.rs
@@ -57,6 +57,12 @@ pub enum CatalogForSqlError {
     /// exists in the dataset definition but has no physical parquet files.
     #[error("Table '{table}' has not been synced")]
     TableNotSynced { table: String },
+
+    /// Failed during catalog resolution.
+    ///
+    /// This wraps errors from the core resolution logic.
+    #[error("Catalog resolution failed")]
+    Resolution(#[source] BoxError),
 }
 
 impl CatalogForSqlError {
@@ -174,11 +180,17 @@ pub enum PlanningCtxForSqlError {
     /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured).
     #[error("Function 'eth_call' not available for dataset '{reference}'")]
     EthCallNotAvailable { reference: HashReference },
+
+    /// Failed during catalog resolution.
+    ///
+    /// This wraps errors from the core resolution logic.
+    #[error("Catalog resolution failed")]
+    Resolution(#[source] BoxError),
 }
 
-/// Errors specific to catalog_for_sql_with_deps operations
+/// Errors specific to catalog_for_derived_table operations
 ///
-/// This error type is used exclusively by `catalog_for_sql_with_deps()` to create
+/// This error type is used exclusively by `catalog_for_derived_table()` to create
 /// a physical catalog for SQL query execution with pre-resolved dependencies.
 #[derive(Debug, thiserror::Error)]
 #[allow(clippy::large_enum_variant)]
diff --git a/crates/core/common/src/catalog/resolve.rs b/crates/core/common/src/catalog/resolve.rs
new file mode 100644
index 000000000..fc4f9eb8a
--- /dev/null
+++ b/crates/core/common/src/catalog/resolve.rs
@@ -0,0 +1,425 @@
+//! Core name resolution for SQL catalogs.
+//!
+//! This module provides a unified implementation for resolving table and function
+//! references from SQL queries into logical catalogs. It abstracts over different
+//! schema resolution strategies (dynamic vs pre-resolved) via the [`SchemaResolver`] trait.
+//!
+//! # Schema Resolvers
+//!
+//! Two resolver implementations are provided:
+//!
+//! - [`RegistrySchemaResolver`]: Resolves dataset references dynamically via a store.
+//!   Used for user queries where datasets are referenced by name/version.
+//!
+//! - For pre-resolved dependencies (derived datasets), see `PreResolvedResolver` in the
+//!   `datasets-derived` crate, which provides the locked-alias resolver implementation.
+
+use std::{
+    collections::{BTreeMap, btree_map::Entry},
+    sync::Arc,
+};
+
+use async_trait::async_trait;
+use datafusion::logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF};
+use datasets_common::{
+    func_name::{ETH_CALL_FUNCTION_NAME, FuncName},
+    hash::Hash,
+    hash_reference::HashReference,
+    partial_reference::{PartialReference, PartialReferenceError},
+    reference::Reference,
+    table_name::TableName,
+};
+use js_runtime::isolate_pool::IsolatePool;
+
+use super::{
+    dataset_access::DatasetAccess,
+    logical::{Dataset, Function, LogicalCatalog},
+};
+use crate::{BoxError, ResolvedTable, js_udf::JsUdf, sql::TableReference};
+
+/// The schema name used for self-references in derived datasets.
+pub const SELF_SCHEMA: &str = "self";
+
+// ============================================================================
+// Self References
+// ============================================================================
+
+/// Functions available for self-reference resolution in derived datasets.
+///
+/// This type contains the functions needed to resolve `self.function_name()` references
+/// in SQL queries. It can be created from a `Dataset` or directly from function definitions.
+#[derive(Clone)]
+pub struct SelfReferences {
+    functions: Vec<Function>,
+}
+
+impl SelfReferences {
+    /// Creates a new `SelfReferences` from a list of functions.
+    pub fn new(functions: Vec<Function>) -> Self {
+        Self { functions }
+    }
+
+    /// Creates an empty `SelfReferences` with no functions.
+    pub fn empty() -> Self {
+        Self {
+            functions: Vec::new(),
+        }
+    }
+
+    /// Returns true if there are no functions available for self-reference.
+    pub fn is_empty(&self) -> bool {
+        self.functions.is_empty()
+    }
+
+    /// Returns a specific JS function by name.
+    ///
+    /// This implements lazy loading by only instantiating the requested function.
+    pub fn function_by_name(&self, name: &str, isolate_pool: IsolatePool) -> Option<ScalarUDF> {
+        self.functions.iter().find(|f| f.name == name).map(|f| {
+            AsyncScalarUDF::new(Arc::new(JsUdf::new(
+                isolate_pool,
+                SELF_SCHEMA.to_string(),
+                f.source.source.clone(),
+                f.source.filename.clone().into(),
+                f.name.clone().into(),
+                f.input_types.clone(),
+                f.output_type.clone(),
+            )))
+            .into_scalar_udf()
+        })
+    }
+}
+
+impl From<&Dataset> for SelfReferences {
+    fn from(dataset: &Dataset) -> Self {
+        Self {
+            functions: dataset.functions.clone(),
+        }
+    }
+}
+
+impl From<&Arc<Dataset>> for SelfReferences {
+    fn from(dataset: &Arc<Dataset>) -> Self {
+        Self {
+            functions: dataset.functions.clone(),
+        }
+    }
+}
+
+/// Trait for resolving schema strings to hash references.
+///
+/// This trait abstracts over different resolution strategies:
+/// - **Dynamic resolution**: Resolve dataset names/versions via a store (for user queries)
+/// - **Pre-resolved**: Lookup aliases in a dependencies map (for derived datasets)
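+///
+/// # Example
+///
+/// A minimal sketch of a resolver over a fixed map. `FixedResolver` is a
+/// hypothetical type shown only to illustrate the contract, so the block is
+/// not compiled as a doctest:
+///
+/// ```ignore
+/// struct FixedResolver(std::collections::BTreeMap<String, HashReference>);
+///
+/// #[async_trait]
+/// impl SchemaResolver for FixedResolver {
+///     // Stand-in error type for the sketch.
+///     type Error = std::io::Error;
+///
+///     async fn resolve(&self, schema: &str) -> Result<HashReference, Self::Error> {
+///         self.0
+///             .get(schema)
+///             .cloned()
+///             .ok_or_else(|| std::io::Error::other(format!("unknown schema '{schema}'")))
+///     }
+/// }
+/// ```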
+#[async_trait]
+pub trait SchemaResolver: Send + Sync {
+    /// Error type for resolution failures.
+    type Error: std::error::Error + Send + Sync + 'static;
+
+    /// Resolve a schema string to a [`HashReference`].
+    ///
+    /// The schema string format depends on the resolver implementation:
+    /// - For dynamic resolution: `"namespace/name"` or `"namespace/name@version"`
+    /// - For pre-resolved: dependency alias like `"eth"` or `"uniswap"`
+    async fn resolve(&self, schema: &str) -> Result<HashReference, Self::Error>;
+}
+
+// ============================================================================
+// Resolve Errors
+// ============================================================================
+
+/// Errors that can occur during catalog resolution.
+#[derive(Debug, thiserror::Error)]
+pub enum ResolveError<SE> {
+    /// Schema resolution failed.
+    #[error("schema resolution failed: {0}")]
+    SchemaResolution(SE),
+
+    /// Failed to load dataset from store.
+    #[error("failed to load dataset {hash}")]
+    DatasetLoad {
+        hash: HashReference,
+        #[source]
+        source: BoxError,
+    },
+
+    /// Table not found in dataset.
+    #[error("table '{table}' not found in dataset {hash}")]
+    TableNotFound {
+        table: TableName,
+        hash: HashReference,
+    },
+
+    /// Function not found in dataset.
+    #[error("function '{schema}.{function}' not found in dataset {hash}")]
+    FunctionNotFound {
+        schema: String,
+        function: FuncName,
+        hash: HashReference,
+    },
+
+    /// Self-referenced function not found in self dataset.
+    #[error("self-referenced function '{function}' not found in self dataset")]
+    SelfFunctionNotFound { function: FuncName },
+
+    /// Self-reference used but no self dataset provided.
+    #[error("self-reference to function '{function}' but no self dataset provided")]
+    NoSelfDataset { function: FuncName },
+
+    /// eth_call function not available for dataset.
+    #[error("eth_call not available for dataset {hash}")]
+    EthCallNotAvailable { hash: HashReference },
+
+    /// Failed to create eth_call UDF.
+    #[error("failed to create eth_call UDF for dataset {hash}")]
+    EthCallCreation {
+        hash: HashReference,
+        #[source]
+        source: BoxError,
+    },
+}
+
+/// Resolves table and function references into a logical catalog.
+///
+/// This is the core resolution function that handles:
+/// - Table resolution: schema + table name → dataset → table schema
+/// - Function resolution: schema + function name → dataset → UDF
+/// - Self-function resolution: `self.function` → self_dataset → UDF
+/// - Deduplication: Multiple references to the same table/function share one instance
+///
+/// # Parameters
+///
+/// - `store`: Dataset access for loading datasets
+/// - `resolver`: Schema resolver for converting schema strings to hash references
+/// - `table_refs`: Iterator of (schema, table_name) tuples for qualified table references
+/// - `func_refs`: Iterator of (schema?, function_name) tuples. `None` schema means bare
+///   function (built-in DataFusion function)
+/// - `self_refs`: Functions for resolving `self.function` references (for derived datasets)
+/// - `isolate_pool`: JavaScript isolate pool for creating UDFs
+///
+/// # Errors
+///
+/// Returns [`ResolveError`] if schema resolution, dataset loading, or entity lookup fails.
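+///
+/// # Example
+///
+/// An illustrative call shape using the registry resolver; `store`,
+/// `isolate_pool`, and `blocks_table_name` are assumed to be in scope, so the
+/// block is not compiled as a doctest:
+///
+/// ```ignore
+/// let resolver = RegistrySchemaResolver::new(&store);
+/// let catalog = resolve_logical_catalog(
+///     &store,
+///     &resolver,
+///     // One qualified table reference: `"ns/eth".blocks`.
+///     [("ns/eth".to_string(), blocks_table_name)],
+///     // No function references in this query.
+///     std::iter::empty::<(Option<String>, FuncName)>(),
+///     SelfReferences::empty(),
+///     &isolate_pool,
+/// )
+/// .await?;
+/// assert_eq!(catalog.tables.len(), 1);
+/// ```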
+pub async fn resolve_logical_catalog<R>(
+    store: &impl DatasetAccess,
+    resolver: &R,
+    table_refs: impl IntoIterator<Item = (String, TableName)>,
+    func_refs: impl IntoIterator<Item = (Option<String>, FuncName)>,
+    self_refs: SelfReferences,
+    isolate_pool: &IsolatePool,
+) -> Result<LogicalCatalog, ResolveError<R::Error>>
+where
+    R: SchemaResolver,
+{
+    // Use hash-based map to deduplicate datasets and collect resolved tables
+    // Outer key: dataset hash, Inner key: "schema.table" string
+    let mut tables: BTreeMap<Hash, BTreeMap<String, ResolvedTable>> = BTreeMap::new();
+    // Track UDFs from external datasets
+    // Outer key: dataset hash, Inner key: "schema.function" string
+    let mut udfs: BTreeMap<Hash, BTreeMap<String, ScalarUDF>> = BTreeMap::new();
+    // Track self-referenced UDFs separately (deduplicated by function name)
+    let mut self_udfs: BTreeMap<FuncName, ScalarUDF> = BTreeMap::new();
+
+    // === Table Resolution ===
+    for (schema, table) in table_refs {
+        let hash_ref = resolver
+            .resolve(&schema)
+            .await
+            .map_err(ResolveError::SchemaResolution)?;
+
+        let key = format!("{}.{}", schema, table);
+
+        // Skip if already resolved (deduplication)
+        if let Entry::Occupied(_) = tables
+            .entry(hash_ref.hash().clone())
+            .or_default()
+            .entry(key.clone())
+        {
+            continue;
+        }
+
+        let dataset =
+            store
+                .get_dataset(&hash_ref)
+                .await
+                .map_err(|e| ResolveError::DatasetLoad {
+                    hash: hash_ref.clone(),
+                    source: e,
+                })?;
+
+        let dataset_table = dataset
+            .tables
+            .iter()
+            .find(|t| t.name() == &table)
+            .ok_or_else(|| ResolveError::TableNotFound {
+                table: table.clone(),
+                hash: hash_ref.clone(),
+            })?;
+
+        let resolved = ResolvedTable::new(
+            TableReference::partial(schema.clone(), table),
+            dataset_table.clone(),
+            dataset,
+        );
+
+        tables
+            .entry(hash_ref.hash().clone())
+            .or_default()
+            .insert(key, resolved);
+    }
+
+    // === Function Resolution ===
+    for (schema, function) in func_refs {
+        match schema {
+            // Bare function - skip (built-in DataFusion function)
+            None => continue,
+
+            // Self-reference - resolve from self_refs
+            Some(ref s) if s == SELF_SCHEMA => {
+                // Skip if already resolved (deduplication)
+                if self_udfs.contains_key(&function) {
+                    continue;
+                }
+
+                // Check if self-references are available
+                if self_refs.is_empty() {
+                    return Err(ResolveError::NoSelfDataset {
+                        function: function.clone(),
+                    });
+                }
+
+                let udf = self_refs
+                    .function_by_name(function.as_ref(), isolate_pool.clone())
+                    .ok_or_else(|| ResolveError::SelfFunctionNotFound {
+                        function: function.clone(),
+                    })?;
+
+                self_udfs.insert(function, udf);
+            }
+
+            // External dataset reference
+            Some(schema) => {
+                let hash_ref = resolver
+                    .resolve(&schema)
+                    .await
+                    .map_err(ResolveError::SchemaResolution)?;
+
+                let key = format!("{}.{}", schema, function);
+
+                // Skip if already resolved (deduplication)
+                if let Entry::Occupied(_) = udfs
+                    .entry(hash_ref.hash().clone())
+                    .or_default()
+                    .entry(key.clone())
+                {
+                    continue;
+                }
+
+                let dataset =
+                    store
+                        .get_dataset(&hash_ref)
+                        .await
+                        .map_err(|e| ResolveError::DatasetLoad {
+                            hash: hash_ref.clone(),
+                            source: e,
+                        })?;
+
+                let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME {
+                    store
+                        .eth_call_for_dataset(&schema, &dataset)
+                        .await
+                        .map_err(|e| ResolveError::EthCallCreation {
+                            hash: hash_ref.clone(),
+                            source: e,
+                        })?
+                        .ok_or_else(|| ResolveError::EthCallNotAvailable {
+                            hash: hash_ref.clone(),
+                        })?
+                } else {
+                    dataset
+                        .function_by_name(schema.clone(), function.as_ref(), isolate_pool.clone())
+                        .ok_or_else(|| ResolveError::FunctionNotFound {
+                            schema: schema.clone(),
+                            function: function.clone(),
+                            hash: hash_ref.clone(),
+                        })?
+                };
+
+                udfs.entry(hash_ref.hash().clone())
+                    .or_default()
+                    .insert(key, udf);
+            }
+        }
+    }
+
+    Ok(LogicalCatalog {
+        tables: tables.into_values().flat_map(|m| m.into_values()).collect(),
+        udfs: self_udfs
+            .into_values()
+            .chain(udfs.into_values().flat_map(|m| m.into_values()))
+            .collect(),
+    })
+}
+
+// ============================================================================
+// Registry Schema Resolver
+// ============================================================================
+
+/// Resolves schema strings dynamically via a dataset store.
+///
+/// This resolver is used for user queries where datasets are referenced by
+/// name and optionally version (e.g., `"namespace/dataset"` or `"namespace/dataset@v1.0.0"`).
+///
+/// The schema string is parsed as a [`PartialReference`] and then resolved
+/// via the store's `resolve_revision` method.
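+///
+/// # Example
+///
+/// A short sketch, assuming a `store` implementing [`DatasetAccess`] is in
+/// scope (not compiled as a doctest):
+///
+/// ```ignore
+/// let resolver = RegistrySchemaResolver::new(&store);
+/// // Resolves name/version to the pinned hash reference via the store.
+/// let hash_ref = resolver.resolve("namespace/dataset@v1.0.0").await?;
+/// ```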
+ }; + + udfs.entry(hash_ref.hash().clone()) + .or_default() + .insert(key, udf); + } + } + } + + Ok(LogicalCatalog { + tables: tables.into_values().flat_map(|m| m.into_values()).collect(), + udfs: self_udfs + .into_values() + .chain(udfs.into_values().flat_map(|m| m.into_values())) + .collect(), + }) +} + +// ============================================================================ +// Registry Schema Resolver +// ============================================================================ + +/// Resolves schema strings dynamically via a dataset store. +/// +/// This resolver is used for user queries where datasets are referenced by +/// name and optionally version (e.g., `"namespace/dataset"` or `"namespace/dataset@v1.0.0"`). +/// +/// The schema string is parsed as a [`PartialReference`] and then resolved +/// via the store's `resolve_revision` method. +pub struct RegistrySchemaResolver<'a, S> { + store: &'a S, +} + +impl<'a, S> RegistrySchemaResolver<'a, S> { + /// Creates a new registry schema resolver using the given store. + pub fn new(store: &'a S) -> Self { + Self { store } + } +} + +#[async_trait] +impl SchemaResolver for RegistrySchemaResolver<'_, S> +where + S: DatasetAccess + Sync, +{ + type Error = RegistryResolveError; + + async fn resolve(&self, schema: &str) -> Result { + let partial: PartialReference = schema + .parse() + .map_err(RegistryResolveError::InvalidReference)?; + let reference: Reference = partial.into(); + self.store + .resolve_revision(&reference) + .await + .map_err(RegistryResolveError::StoreError)? + .ok_or_else(|| RegistryResolveError::NotFound(reference)) + } +} + +/// Errors from registry schema resolution. +#[derive(Debug, thiserror::Error)] +pub enum RegistryResolveError { + /// Schema string could not be parsed as a valid reference. + #[error("invalid reference format")] + InvalidReference(#[source] PartialReferenceError), + + /// Store returned an error during resolution. + #[error("store error")] + StoreError(#[source] BoxError), + + /// Dataset was not found. + #[error("dataset not found: {0}")] + NotFound(Reference), +} + +#[cfg(test)] +mod tests { + // Unit tests can be added here as needed +} diff --git a/crates/core/common/src/catalog/sql.rs b/crates/core/common/src/catalog/sql.rs index cf362629a..aefe8c7bf 100644 --- a/crates/core/common/src/catalog/sql.rs +++ b/crates/core/common/src/catalog/sql.rs @@ -4,90 +4,48 @@ //! Each function serves a specific data path in the Amp architecture, with clear separation //! between validation, planning, and execution operations. //! -//! # Function-to-Data-Path Mapping -//! -//! | Function | Schema Endpoint | Manifest Validation | Query Planning | Query Execution | Derived Dataset | Raw Dataset | -//! |-----------------------------------------------------|------------------|---------------------|------------------|------------------|------------------|-------------| -//! | [`planning_ctx_for_sql_tables_with_deps_and_funcs`] | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | -//! | [`planning_ctx_for_sql`] | ❌ | ❌ | ✅ **EXCLUSIVE** | ❌ | ❌ | ❌ | -//! | [`catalog_for_sql`] | ❌ | ❌ | ❌ | ✅ **EXCLUSIVE** | ❌ | ❌ | -//! | [`catalog_for_sql_with_deps`] | ❌ | ❌ | ❌ | ❌ | ✅ **EXCLUSIVE** | ❌ | -//! | [`get_logical_catalog`] | ❌ | ❌ | ❌ | ✅ (indirect) | ❌ | ❌ | -//! | [`get_logical_catalog_with_deps_and_funcs`] | ❌ | ❌ | ❌ | ❌ | ✅ (indirect) | ❌ | -//! //! # Data Paths //! -//! ## 1. Manifest Validation Path -//! -//! - **Purpose**: Validate dataset manifests without data access -//! 
-//! - **Function**: [`planning_ctx_for_sql_tables_with_deps`]
-//! - **Entry Points**:
-//!   - `POST /schema` endpoint (`crates/services/admin-api/src/handlers/schema.rs`)
-//!   - `POST /manifests` endpoint via manifest validation (`crates/services/admin-api/src/handlers/manifests/register.rs`)
-//!   - `POST /datasets` endpoint via manifest validation (`crates/services/admin-api/src/handlers/datasets/register.rs`)
-//! - **Characteristics**: Multi-table validation, pre-resolved dependencies, no physical data
-//!
-//! ## 2. Query Planning Path
+//! ## 1. Query Planning Path
 //!
 //! - **Purpose**: Generate query plans and schemas without execution
 //! - **Function**: [`planning_ctx_for_sql`]
 //! - **Entry**: Arrow Flight `GetFlightInfo` (`crates/services/server/src/flight.rs`)
 //! - **Characteristics**: Fast schema response, logical catalog only, dynamic dataset resolution
 //!
-//! ## 3. Query Execution Path (Arrow Flight)
+//! ## 2. Query Execution Path (Arrow Flight)
 //!
 //! - **Purpose**: Execute user queries via Arrow Flight
-//! - **Function**: [`catalog_for_sql`] (calls [`get_logical_catalog`] internally)
+//! - **Function**: [`catalog_for_sql`]
 //! - **Entry**: Arrow Flight `DoGet` (`crates/services/server/src/flight.rs`)
 //! - **Characteristics**: Full catalog with physical parquet locations, dynamic dataset resolution, streaming results
 //! - **Resolution Strategy**: Resolves dataset references to hashes at query time (supports "latest" and version tags)
 //!
-//! ## 4. Derived Dataset Execution Path
-//!
-//! - **Purpose**: Execute SQL to create derived datasets during extraction
-//! - **Function**: [`catalog_for_sql_with_deps`] (calls [`get_logical_catalog_with_deps_and_funcs`] internally)
-//! - **Entry**: Worker-based extraction for SQL datasets (`crates/core/dump/src/core/sql_dump.rs`)
-//! - **Characteristics**: Full catalog with physical parquet locations, pre-resolved dependencies, writes parquet files
-//! - **Resolution Strategy**: Uses locked dataset hashes from manifest dependencies (deterministic, reproducible)
-//!
 //! # Key Insights
 //!
 //! - **Clean separation**: Each public function serves exactly one primary data path
 //! - **Dependency handling**: Two parallel execution paths with different resolution strategies:
 //!   - **Dynamic resolution** (`catalog_for_sql`): For user queries that reference datasets by name/version
-//!   - **Pre-resolved dependencies** (`catalog_for_sql_with_deps`): For derived datasets with locked dependencies
-//! - **Function duplication**: `*_with_deps` variants avoid dual-mode logic and maintain clear boundaries
-//! - **Lazy UDF loading**: All functions implement lazy UDF loading for optimal performance
-//! - **No raw dataset overlap**: Raw dataset dumps don't use these planning functions
-//! - **Two-phase resolution**: Functions without deps use a two-phase pattern:
-//!   1. **Resolution phase**: `Reference` → `resolve_revision()` → `HashReference`
-//!   2. **Retrieval phase**: `HashReference` → `get_dataset()` → `Dataset`
-//! - **Pre-resolved deps**: Functions with deps receive `HashReference` and skip phase 1
+//!   - **Pre-resolved dependencies** (in `datasets-derived`): For derived datasets with locked dependencies
+//! - **Core resolution**: All functions delegate to [`resolve::resolve_logical_catalog`] for the actual
+//!   table and function resolution logic
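+//!
+//! # Example
+//!
+//! A sketch of the two user-query entry points; `store`, `data_store`,
+//! `statement`, and `env` are assumed to be in scope (not compiled as a
+//! doctest):
+//!
+//! ```ignore
+//! // Planning: logical catalog only, fast schema response.
+//! let ctx = planning_ctx_for_sql(&store, &statement).await?;
+//! // Execution: full catalog with physical parquet locations.
+//! let catalog = catalog_for_sql(&store, &data_store, &statement, env).await?;
+//! ```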
 
-use std::{
-    collections::{BTreeMap, btree_map::Entry},
-    sync::Arc,
-};
+use std::sync::Arc;
 
-use datafusion::{logical_expr::ScalarUDF, sql::parser::Statement};
-use datasets_common::{
-    func_name::ETH_CALL_FUNCTION_NAME, hash::Hash, partial_reference::PartialReference,
-    reference::Reference,
-};
+use datafusion::sql::parser::Statement;
+use datasets_common::partial_reference::PartialReference;
 use js_runtime::isolate_pool::IsolatePool;
 
 use super::{
     dataset_access::DatasetAccess,
-    errors::{CatalogForSqlError, GetLogicalCatalogError, PlanningCtxForSqlError},
-    logical::LogicalCatalog,
+    errors::{CatalogForSqlError, PlanningCtxForSqlError},
     physical::{Catalog, PhysicalTable},
+    resolve::{RegistrySchemaResolver, SelfReferences, resolve_logical_catalog},
 };
 use crate::{
-    PlanningContext, ResolvedTable, Store,
+    PlanningContext, Store,
     query_context::QueryEnv,
-    sql::{
-        FunctionReference, TableReference, resolve_function_references, resolve_table_references,
-    },
+    sql::{resolve_function_references, resolve_table_references},
 };
 
 /// Creates a full catalog with physical data access for SQL query execution.
 ///
@@ -102,13 +60,13 @@ use crate::{
 /// - Called during Arrow Flight `DoGet` phase to execute user queries
 /// - Provides physical catalog for streaming query results to clients
 ///
-/// For derived dataset execution, use [`catalog_for_sql_with_deps`] instead.
+/// For derived dataset execution, use `catalog_for_derived_table` from the
+/// `datasets-derived` crate instead.
 ///
 /// ## Implementation
 ///
 /// The function:
 /// 1. Extracts table references and function names from the query
-/// 2. Calls [`get_logical_catalog`] to resolve datasets and build the logical catalog
+/// 2. Calls [`resolve_logical_catalog`] to resolve datasets and build the logical catalog
 /// 3. Queries metadata database for physical parquet locations
 /// 4. Constructs physical catalog for query execution
 pub async fn catalog_for_sql(
@@ -117,16 +75,33 @@ pub async fn catalog_for_sql(
     dataset_store: &impl DatasetAccess,
     data_store: &Store,
     query: &Statement,
     env: QueryEnv,
 ) -> Result<Catalog, CatalogForSqlError> {
-    let table_refs = resolve_table_references::<PartialReference>(query)
-        .map_err(CatalogForSqlError::TableReferenceResolution)?;
-    let func_refs = resolve_function_references::<PartialReference>(query)
-        .map_err(CatalogForSqlError::FunctionReferenceResolution)?;
-
-    let logical_catalog =
-        get_logical_catalog(dataset_store, table_refs, func_refs, &env.isolate_pool)
-            .await
-            .map_err(CatalogForSqlError::GetLogicalCatalog)?;
-
+    // Extract table and function references from SQL
+    let table_refs: Vec<_> = resolve_table_references::<PartialReference>(query)
+        .map_err(CatalogForSqlError::TableReferenceResolution)?
+        .into_iter()
+        .filter_map(|r| r.into_parts())
+        .collect();
+
+    let func_refs: Vec<_> = resolve_function_references::<PartialReference>(query)
+        .map_err(CatalogForSqlError::FunctionReferenceResolution)?
+        .into_iter()
+        .map(|r| r.into_parts())
+        .collect();
+
+    // Resolve using the registry schema resolver
+    let resolver = RegistrySchemaResolver::new(dataset_store);
+    let logical_catalog = resolve_logical_catalog(
+        dataset_store,
+        &resolver,
+        table_refs,
+        func_refs,
+        SelfReferences::empty(), // No self-references for user queries
+        &env.isolate_pool,
+    )
+    .await
+    .map_err(|e| CatalogForSqlError::Resolution(Box::new(e)))?;
+
+    // Resolve physical table locations
     let mut tables = Vec::new();
     for table in &logical_catalog.tables {
         let physical_table = PhysicalTable::get_active(data_store.clone(), table.clone())
             .await
@@ -207,338 +182,31 @@ pub async fn planning_ctx_for_sql(
     store: &impl DatasetAccess,
     query: &Statement,
 ) -> Result<PlanningContext, PlanningCtxForSqlError> {
-    // Get table and function references from the SQL query
-    let table_refs = resolve_table_references::<PartialReference>(query)
-        .map_err(PlanningCtxForSqlError::TableReferenceResolution)?;
-    let func_refs = resolve_function_references::<PartialReference>(query)
-        .map_err(PlanningCtxForSqlError::FunctionReferenceResolution)?;
-
-    // Use hash-based map to deduplicate datasets and collect resolved tables
-    // Inner map: table_ref -> ResolvedTable (deduplicates table references)
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference<String>, ResolvedTable>> = BTreeMap::new();
-    // Track UDFs separately from datasets - outer key: dataset hash, inner key: function reference
-    // Inner map ensures deduplication: multiple function references to the same UDF share one instance
-    let mut udfs: BTreeMap<Hash, BTreeMap<FunctionReference<String>, ScalarUDF>> = BTreeMap::new();
-
-    // Part 1: Process table references
-    for table_ref in table_refs {
-        match &table_ref {
-            TableReference::Bare { .. } => {
-                return Err(PlanningCtxForSqlError::UnqualifiedTable {
-                    table_ref: table_ref.to_string(),
-                });
-            }
-            TableReference::Partial { schema, table } => {
-                // Schema is already parsed as PartialReference, convert to Reference
-                let reference: Reference = schema.as_ref().clone().into();
-
-                // Resolve reference to hash reference
-                let hash_ref = store
-                    .resolve_revision(&reference)
-                    .await
-                    .map_err(|err| PlanningCtxForSqlError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: err,
-                    })?
-                    .ok_or_else(|| PlanningCtxForSqlError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: format!("Dataset '{}' not found", reference).into(),
-                    })?;
-
-                // Convert table_ref to use String schema for internal data structures
-                let table_ref_string =
-                    TableReference::partial(schema.to_string(), table.as_ref().clone());
-
-                // Skip if table reference is already resolved (optimization to avoid redundant dataset loading)
-                let Entry::Vacant(entry) = tables
-                    .entry(hash_ref.hash().clone())
-                    .or_default()
-                    .entry(table_ref_string.clone())
-                else {
-                    continue;
-                };
-
-                // Load dataset by hash reference (cached by store)
-                let dataset = store.get_dataset(&hash_ref).await.map_err(|err| {
-                    PlanningCtxForSqlError::LoadDataset {
-                        reference: hash_ref.clone(),
-                        source: err,
-                    }
-                })?;
-
-                // Find table in dataset
-                let dataset_table = dataset
-                    .tables
-                    .iter()
-                    .find(|t| t.name() == table)
-                    .ok_or_else(|| PlanningCtxForSqlError::TableNotFoundInDataset {
-                        table_name: table.as_ref().clone(),
-                        reference: hash_ref,
-                    })?;
-
-                // Create ResolvedTable with the converted string-based table reference
-                let resolved_table = ResolvedTable::new(
-                    table_ref_string.clone(),
-                    dataset_table.clone(),
-                    dataset.clone(),
-                );
-
-                // Insert into vacant entry
-                entry.insert(resolved_table);
-            }
-        }
-    }
-
-    // Part 2: Process function names (load datasets for UDFs only)
-    for func_ref in func_refs {
-        match &func_ref {
-            FunctionReference::Bare { .. } => continue, // Built-in DataFusion function
-            FunctionReference::Qualified { schema, function } => {
-                // Schema is already parsed as PartialReference, convert to Reference
-                let reference: Reference = schema.as_ref().clone().into();
-
-                // Resolve reference to hash reference
-                let hash_ref = store
-                    .resolve_revision(&reference)
-                    .await
-                    .map_err(|err| PlanningCtxForSqlError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: err,
-                    })?
-                    .ok_or_else(|| PlanningCtxForSqlError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: format!("Dataset '{}' not found", reference).into(),
-                    })?;
-
-                // Load dataset by hash reference (cached by store)
-                let dataset = store.get_dataset(&hash_ref).await.map_err(|err| {
-                    PlanningCtxForSqlError::LoadDataset {
-                        reference: hash_ref.clone(),
-                        source: err,
-                    }
-                })?;
-
-                // Convert func_ref to use String schema for internal data structures
-                let func_ref_string =
-                    FunctionReference::qualified(schema.to_string(), function.as_ref().clone());
-
-                // Skip if function reference is already resolved (optimization to avoid redundant UDF creation)
-                let Entry::Vacant(entry) = udfs
-                    .entry(hash_ref.hash().clone())
-                    .or_default()
-                    .entry(func_ref_string)
-                else {
-                    continue;
-                };
-
-                // Get the UDF for this function reference
-                let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME {
-                    store
-                        .eth_call_for_dataset(&schema.to_string(), &dataset)
-                        .await
-                        .map_err(|err| PlanningCtxForSqlError::EthCallUdfCreation {
-                            reference: hash_ref.clone(),
-                            source: err,
-                        })?
-                        .ok_or_else(|| PlanningCtxForSqlError::EthCallNotAvailable {
-                            reference: hash_ref.clone(),
-                        })?
-                } else {
-                    dataset
-                        .function_by_name(schema.to_string(), function, IsolatePool::dummy())
-                        .ok_or_else(|| PlanningCtxForSqlError::FunctionNotFoundInDataset {
-                            function_name: func_ref.to_string(),
-                            reference: hash_ref,
-                        })?
-                };
-
-                entry.insert(udf);
-            }
-        }
-    }
-
-    Ok(PlanningContext::new(LogicalCatalog {
-        tables: tables
-            .into_values()
-            .flat_map(|map| map.into_values())
-            .collect(),
-        udfs: udfs
-            .into_values()
-            .flat_map(|map| map.into_values())
-            .collect(),
-    }))
-}
-
-/// Internal helper that builds a logical catalog from table and function references.
-///
-/// This function resolves dataset references, loads datasets, and constructs a logical
-/// catalog containing resolved tables and UDFs for query planning and execution.
-///
-/// ## Where Used
-///
-/// Called internally by [`catalog_for_sql`] to build the logical catalog before
-/// retrieving physical parquet locations from the metadata database.
-async fn get_logical_catalog(
-    store: &impl DatasetAccess,
-    table_refs: impl IntoIterator<Item = TableReference<PartialReference>>,
-    func_refs: impl IntoIterator<Item = FunctionReference<PartialReference>>,
-    isolate_pool: &IsolatePool,
-) -> Result<LogicalCatalog, GetLogicalCatalogError> {
-    let table_refs = table_refs.into_iter().collect::<Vec<_>>();
-    let func_refs = func_refs.into_iter().collect::<Vec<_>>();
-
-    // Use hash-based map to deduplicate datasets and collect resolved tables
-    // Inner map: table_ref -> ResolvedTable (deduplicates table references)
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference<String>, ResolvedTable>> = BTreeMap::new();
-    // Track UDFs separately from datasets - outer key: dataset hash, inner key: function reference
-    // Inner map ensures deduplication: multiple function references to the same UDF share one instance
-    let mut udfs: BTreeMap<Hash, BTreeMap<FunctionReference<String>, ScalarUDF>> = BTreeMap::new();
-
-    // Part 1: Process table references
-    for table_ref in table_refs {
-        match &table_ref {
-            TableReference::Bare { .. } => {
-                return Err(GetLogicalCatalogError::UnqualifiedTable {
-                    table_ref: table_ref.to_string(),
-                });
-            }
-            TableReference::Partial { schema, table } => {
-                // Schema is already parsed as PartialReference, convert to Reference
-                let reference: Reference = schema.as_ref().clone().into();
-
-                // Resolve reference to hash reference
-                let hash_ref = store
-                    .resolve_revision(&reference)
-                    .await
-                    .map_err(|err| GetLogicalCatalogError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: err,
-                    })?
-                    .ok_or_else(|| GetLogicalCatalogError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: format!("Dataset '{}' not found", reference).into(),
-                    })?;
-
-                // Convert table_ref to use String schema for internal data structures
-                let table_ref_string =
-                    TableReference::partial(schema.to_string(), table.as_ref().clone());
-
-                // Skip if table reference is already resolved (optimization to avoid redundant dataset loading)
-                let Entry::Vacant(entry) = tables
-                    .entry(hash_ref.hash().clone())
-                    .or_default()
-                    .entry(table_ref_string.clone())
-                else {
-                    continue;
-                };
-
-                // Load dataset by hash reference (cached by store)
-                let dataset = store.get_dataset(&hash_ref).await.map_err(|err| {
-                    GetLogicalCatalogError::LoadDataset {
-                        reference: hash_ref.clone(),
-                        source: err,
-                    }
-                })?;
-
-                // Find table in dataset
-                let dataset_table = dataset
-                    .tables
-                    .iter()
-                    .find(|t| t.name() == table)
-                    .ok_or_else(|| GetLogicalCatalogError::TableNotFoundInDataset {
-                        table_name: table.as_ref().clone(),
-                        reference: hash_ref,
-                    })?;
-
-                // Create ResolvedTable with the converted string-based table reference
-                let resolved_table = ResolvedTable::new(
-                    table_ref_string.clone(),
-                    dataset_table.clone(),
-                    dataset.clone(),
-                );
-
-                // Insert into vacant entry
-                entry.insert(resolved_table);
-            }
-        }
-    }
-
-    // Part 2: Process function names (load datasets for UDFs only)
-    for func_ref in func_refs {
-        match &func_ref {
-            FunctionReference::Bare { .. } => continue, // Built-in DataFusion function
-            FunctionReference::Qualified { schema, function } => {
-                // Schema is already parsed as PartialReference, convert to Reference
-                let reference: Reference = schema.as_ref().clone().into();
-
-                // Resolve reference to hash reference
-                let hash_ref = store
-                    .resolve_revision(&reference)
-                    .await
-                    .map_err(|err| GetLogicalCatalogError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: err,
-                    })?
-                    .ok_or_else(|| GetLogicalCatalogError::ResolveDatasetReference {
-                        reference: reference.clone(),
-                        source: format!("Dataset '{}' not found", reference).into(),
-                    })?;
-
-                // Load dataset by hash reference (cached by store)
-                let dataset = store.get_dataset(&hash_ref).await.map_err(|err| {
-                    GetLogicalCatalogError::LoadDataset {
-                        reference: hash_ref.clone(),
-                        source: err,
-                    }
-                })?;
-
-                // Convert func_ref to use String schema for internal data structures
-                let func_ref_string =
-                    FunctionReference::qualified(schema.to_string(), function.as_ref().clone());
-
-                // Skip if function reference is already resolved (optimization to avoid redundant UDF creation)
-                let Entry::Vacant(entry) = udfs
-                    .entry(hash_ref.hash().clone())
-                    .or_default()
-                    .entry(func_ref_string)
-                else {
-                    continue;
-                };
-
-                // Get the UDF for this function reference
-                let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME {
-                    store
-                        .eth_call_for_dataset(&schema.to_string(), &dataset)
-                        .await
-                        .map_err(|err| GetLogicalCatalogError::EthCallUdfCreation {
-                            reference: hash_ref.clone(),
-                            source: err,
-                        })?
-                        .ok_or_else(|| GetLogicalCatalogError::EthCallNotAvailable {
-                            reference: hash_ref.clone(),
-                        })?
-                } else {
-                    dataset
-                        .function_by_name(schema.to_string(), function, isolate_pool.clone())
-                        .ok_or_else(|| GetLogicalCatalogError::FunctionNotFoundInDataset {
-                            function_name: func_ref.to_string(),
-                            reference: hash_ref,
-                        })?
-                };
-
-                entry.insert(udf);
-            }
-        }
-    }
-
-    Ok(LogicalCatalog {
-        tables: tables
-            .into_values()
-            .flat_map(|map| map.into_values())
-            .collect(),
-        udfs: udfs
-            .into_values()
-            .flat_map(|map| map.into_values())
-            .collect(),
-    })
+    // Extract table and function references from SQL
+    let table_refs: Vec<_> = resolve_table_references::<PartialReference>(query)
+        .map_err(PlanningCtxForSqlError::TableReferenceResolution)?
+        .into_iter()
+        .filter_map(|r| r.into_parts())
+        .collect();
+
+    let func_refs: Vec<_> = resolve_function_references::<PartialReference>(query)
+        .map_err(PlanningCtxForSqlError::FunctionReferenceResolution)?
+        .into_iter()
+        .map(|r| r.into_parts())
+        .collect();
+
+    // Resolve using the registry schema resolver
+    let resolver = RegistrySchemaResolver::new(store);
+    let catalog = resolve_logical_catalog(
+        store,
+        &resolver,
+        table_refs,
+        func_refs,
+        SelfReferences::empty(), // No self-references for user queries
+        &IsolatePool::dummy(),
+    )
+    .await
+    .map_err(|e| PlanningCtxForSqlError::Resolution(Box::new(e)))?;
+
+    Ok(PlanningContext::new(catalog))
 }
diff --git a/crates/core/common/src/sql.rs b/crates/core/common/src/sql.rs
index d4d939202..9f0351f88 100644
--- a/crates/core/common/src/sql.rs
+++ b/crates/core/common/src/sql.rs
@@ -234,6 +234,17 @@ where
             },
         }
     }
+
+    /// Converts to (schema, table) tuple. Returns None for bare references.
+    ///
+    /// This is useful for passing table references to resolution functions
+    /// that work with string-based schemas.
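+    ///
+    /// # Example
+    ///
+    /// A sketch of the intended use; constructing a reference is elided, so
+    /// the block is not compiled as a doctest:
+    ///
+    /// ```ignore
+    /// // `eth.blocks` yields Some(("eth", blocks)); a bare `blocks` yields None.
+    /// if let Some((schema, table)) = table_ref.into_parts() {
+    ///     println!("qualified: {schema}.{table}");
+    /// }
+    /// ```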
+    pub fn into_parts(self) -> Option<(String, TableName)> {
+        match self {
+            Self::Bare { .. } => None,
+            Self::Partial { schema, table } => Some((schema.to_string(), (*table).clone())),
+        }
+    }
 }
 
 impl TableReference
 where
@@ -626,6 +637,18 @@ where
             },
         }
     }
+
+    /// Converts to (schema?, function) tuple.
+    ///
+    /// Bare functions return None for schema, qualified functions return Some(schema).
+    /// This is useful for passing function references to resolution functions
+    /// that work with string-based schemas.
+    pub fn into_parts(self) -> (Option<String>, FuncName) {
+        match self {
+            Self::Bare { function } => (None, (*function).clone()),
+            Self::Qualified { schema, function } => (Some(schema.to_string()), (*function).clone()),
+        }
+    }
 }
 
 impl std::fmt::Display for FunctionReference
diff --git a/crates/core/datasets-derived/Cargo.toml b/crates/core/datasets-derived/Cargo.toml
index 5c7593fc4..3f34c72c3 100644
--- a/crates/core/datasets-derived/Cargo.toml
+++ b/crates/core/datasets-derived/Cargo.toml
@@ -8,6 +8,7 @@ license-file.workspace = true
 schemars = ["dep:schemars", "datasets-common/schemars", "common/schemars", "dep:serde_json"]
 
 [dependencies]
+async-trait.workspace = true
 common = { version = "0.1.0", path = "../common" }
 datafusion.workspace = true
 datasets-common = { path = "../datasets-common" }
diff --git a/crates/core/datasets-derived/src/catalog.rs b/crates/core/datasets-derived/src/catalog.rs
index f62619fb8..5e9bdc62d 100644
--- a/crates/core/datasets-derived/src/catalog.rs
+++ b/crates/core/datasets-derived/src/catalog.rs
@@ -10,33 +10,27 @@
 //! dataset dependencies (DepAlias → Hash mappings) for deterministic, reproducible
 //! derived dataset execution.
 
-use std::{
-    collections::{BTreeMap, btree_map::Entry},
-    sync::Arc,
-};
+use std::collections::BTreeMap;
 
+use async_trait::async_trait;
 use common::{
-    BoxError, PlanningContext, ResolvedTable, Store,
+    BoxError, PlanningContext, Store,
     catalog::{
         dataset_access::DatasetAccess,
-        logical::LogicalCatalog,
+        logical::{Function as LogicalFunction, FunctionSource as LogicalFunctionSource},
         physical::{Catalog, PhysicalTable},
+        resolve::{SchemaResolver, SelfReferences, resolve_logical_catalog},
     },
-    js_udf::JsUdf,
     query_context::QueryEnv,
     sql::{
         FunctionReference, ResolveFunctionReferencesError, ResolveTableReferencesError,
         TableReference, resolve_function_references, resolve_table_references,
    },
 };
-use datafusion::{
-    logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF},
-    sql::parser::Statement,
-};
+use datafusion::sql::parser::Statement;
 use datasets_common::{
     deps::alias::{DepAlias, DepAliasError, DepAliasOrSelfRef, DepAliasOrSelfRefError},
-    func_name::{ETH_CALL_FUNCTION_NAME, FuncName},
-    hash::Hash,
+    func_name::FuncName,
     hash_reference::HashReference,
     table_name::TableName,
 };
@@ -44,275 +38,130 @@ use js_runtime::isolate_pool::IsolatePool;
 
 use crate::manifest::Function;
 
-pub async fn catalog_for_sql_with_deps(
+// ============================================================================
+// Self References Helper
+// ============================================================================
+
+/// Creates `SelfReferences` from a derived dataset manifest.
+///
+/// This is used when derived dataset functions need to be resolved via `self.function_name()`
+/// references in SQL.
+pub fn self_refs_from_manifest(manifest: &crate::Manifest) -> SelfReferences {
+    self_refs_from_functions(&manifest.functions)
+}
+
+/// Creates `SelfReferences` from manifest function definitions.
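+///
+/// # Example
+///
+/// A sketch assuming a parsed manifest is in scope (not compiled as a doctest):
+///
+/// ```ignore
+/// let self_refs = self_refs_from_functions(&manifest.functions);
+/// assert_eq!(self_refs.is_empty(), manifest.functions.is_empty());
+/// ```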
+pub fn self_refs_from_functions(functions: &BTreeMap<FuncName, Function>) -> SelfReferences {
+    let logical_functions: Vec<LogicalFunction> = functions
+        .iter()
+        .map(|(name, f)| LogicalFunction {
+            name: name.as_ref().to_string(),
+            input_types: f.input_types.iter().map(|dt| dt.0.clone()).collect(),
+            output_type: f.output_type.0.clone(),
+            source: LogicalFunctionSource {
+                source: f.source.source.clone(),
+                filename: f.source.filename.clone(),
+            },
+        })
+        .collect();
+
+    SelfReferences::new(logical_functions)
+}
+
+// ============================================================================
+// Pre-Resolved Resolver
+// ============================================================================
+
+/// Resolves schema strings using a pre-resolved dependencies map.
+///
+/// This resolver is used for derived datasets where dependencies are locked
+/// and referenced by alias (e.g., `"eth"` → `HashReference`).
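+///
+/// # Example
+///
+/// A sketch with a locked dependency map assumed to be in scope (not compiled
+/// as a doctest):
+///
+/// ```ignore
+/// let resolver = PreResolvedResolver::new(&dependencies);
+/// // Aliases resolve from the map without touching the registry.
+/// let eth = resolver.resolve("eth").await?;
+/// ```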
+pub struct PreResolvedResolver<'a> {
+    dependencies: &'a BTreeMap<DepAlias, HashReference>,
+}
+
+impl<'a> PreResolvedResolver<'a> {
+    /// Creates a new pre-resolved resolver using the given dependencies map.
+    pub fn new(dependencies: &'a BTreeMap<DepAlias, HashReference>) -> Self {
+        Self { dependencies }
+    }
+}
+
+#[async_trait]
+impl SchemaResolver for PreResolvedResolver<'_> {
+    type Error = PreResolvedError;
+
+    async fn resolve(&self, schema: &str) -> Result<HashReference, Self::Error> {
+        let alias: DepAlias = schema.parse().map_err(PreResolvedError::InvalidAlias)?;
+        self.dependencies
+            .get(&alias)
+            .cloned()
+            .ok_or_else(|| PreResolvedError::AliasNotFound(alias))
+    }
+}
+
+/// Errors from pre-resolved schema resolution.
+#[derive(Debug, thiserror::Error)]
+pub enum PreResolvedError {
+    /// Schema string could not be parsed as a valid alias.
+    #[error("invalid alias format")]
+    InvalidAlias(#[source] DepAliasError),
+
+    /// Dependency alias was not found in the dependencies map.
+    #[error("dependency alias not found: {0}")]
+    AliasNotFound(DepAlias),
+}
+
+pub async fn catalog_for_derived_table(
     store: &impl DatasetAccess,
     data_store: &Store,
     query: &Statement,
     env: &QueryEnv,
     dependencies: &BTreeMap<DepAlias, HashReference>,
-    functions: &BTreeMap<FuncName, Function>,
-) -> Result<Catalog, CatalogForSqlWithDepsError> {
-    let table_refs = resolve_table_references::<DepAlias>(query)
-        .map_err(CatalogForSqlWithDepsError::TableReferenceResolution)?;
-    let func_refs = resolve_function_references::<DepAliasOrSelfRef>(query)
-        .map_err(CatalogForSqlWithDepsError::FunctionReferenceResolution)?;
-
-    get_physical_catalog_with_deps(
+    self_refs: SelfReferences,
+) -> Result<Catalog, CatalogForDerivedTableError> {
+    // Extract table and function references from SQL
+    let table_refs: Vec<_> = resolve_table_references::<DepAlias>(query)
+        .map_err(CatalogForDerivedTableError::TableReferenceResolution)?
+        .into_iter()
+        .filter_map(|r| r.into_parts())
+        .collect();
+
+    let func_refs: Vec<_> = resolve_function_references::<DepAliasOrSelfRef>(query)
+        .map_err(CatalogForDerivedTableError::FunctionReferenceResolution)?
+        .into_iter()
+        .map(|r| r.into_parts())
+        .collect();
+
+    // Resolve using the pre-resolved resolver
+    let resolver = PreResolvedResolver::new(dependencies);
+    let logical_catalog = resolve_logical_catalog(
         store,
-        data_store,
+        &resolver,
         table_refs,
         func_refs,
-        env,
-        dependencies,
-        functions,
-    )
-    .await
-    .map_err(CatalogForSqlWithDepsError::GetPhysicalCatalogWithDeps)
-}
-
-async fn get_physical_catalog_with_deps(
-    dataset_store: &impl DatasetAccess,
-    data_store: &Store,
-    table_refs: impl IntoIterator<Item = TableReference<DepAlias>>,
-    function_refs: impl IntoIterator<Item = FunctionReference<DepAliasOrSelfRef>>,
-    env: &QueryEnv,
-    dependencies: &BTreeMap<DepAlias, HashReference>,
-    functions: &BTreeMap<FuncName, Function>,
-) -> Result<Catalog, GetPhysicalCatalogWithDepsError> {
-    let logical_catalog = get_logical_catalog_with_deps_and_funcs(
-        dataset_store,
-        table_refs,
-        function_refs,
+        self_refs,
         &env.isolate_pool,
-        dependencies,
-        functions,
     )
     .await
-    .map_err(GetPhysicalCatalogWithDepsError::GetLogicalCatalogWithDepsAndFuncs)?;
+    .map_err(|e| CatalogForDerivedTableError::Resolution(Box::new(e)))?;
 
+    // Resolve physical table locations
     let mut tables = Vec::new();
     for table in &logical_catalog.tables {
         let physical_table = PhysicalTable::get_active(data_store.clone(), table.clone())
             .await
-            .map_err(
-                |err| GetPhysicalCatalogWithDepsError::PhysicalTableRetrieval {
-                    table: table.to_string(),
-                    source: err,
-                },
-            )?
-            .ok_or(GetPhysicalCatalogWithDepsError::TableNotSynced {
+            .map_err(|err| CatalogForDerivedTableError::PhysicalTableRetrieval {
+                table: table.to_string(),
+                source: err,
+            })?
+            .ok_or_else(|| CatalogForDerivedTableError::TableNotSynced {
                 table: table.to_string(),
             })?;
         tables.push(physical_table.into());
     }
 
-    Ok(Catalog::new(tables, logical_catalog))
-}
-
-/// Internal helper that builds a logical catalog from table references and function names.
-///
-/// This function resolves dataset references, loads dataset metadata, and creates UDFs
-/// for the referenced datasets. It builds the logical layer of the catalog without
-/// accessing physical data locations.
-///
-/// ## Where Used
-///
-/// Called internally by:
-/// - `get_physical_catalog` (which is called by `catalog_for_sql`)
-async fn get_logical_catalog_with_deps_and_funcs(
-    store: &impl DatasetAccess,
-    table_refs: impl IntoIterator<Item = TableReference<DepAlias>>,
-    func_refs: impl IntoIterator<Item = FunctionReference<DepAliasOrSelfRef>>,
-    isolate_pool: &IsolatePool,
-    dependencies: &BTreeMap<DepAlias, HashReference>,
-    functions: &BTreeMap<FuncName, Function>,
-) -> Result<LogicalCatalog, GetLogicalCatalogWithDepsAndFuncsError> {
-    let table_refs = table_refs.into_iter().collect::<Vec<_>>();
-    let func_refs = func_refs.into_iter().collect::<Vec<_>>();
-
-    // Use hash-based map to deduplicate datasets and collect resolved tables
-    // Inner map: table_ref -> ResolvedTable (deduplicates table references)
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference<String>, ResolvedTable>> = BTreeMap::new();
-    // Track UDFs from external dependencies - outer key: dataset hash, inner key: function reference
-    // Inner map ensures deduplication: multiple function references to the same UDF share one instance
-    let mut udfs: BTreeMap<Hash, BTreeMap<FunctionReference<String>, ScalarUDF>> = BTreeMap::new();
-    // Track UDFs defined in this manifest (bare functions and self-references) - separate from dependency functions
-    // Ensures deduplication: multiple references to the same function share one instance
-    let mut self_udfs: BTreeMap<FunctionReference<DepAliasOrSelfRef>, ScalarUDF> = BTreeMap::new();
-
-    // Part 1: Process table references
-    for table_ref in table_refs {
-        match &table_ref {
-            TableReference::Bare { .. } => {
-                return Err(GetLogicalCatalogWithDepsAndFuncsError::UnqualifiedTable {
-                    table_ref: table_ref.to_string(),
-                });
-            }
-            TableReference::Partial { schema, table } => {
-                // Schema is already parsed as DepAlias, lookup in dependencies map
-                let hash_ref = dependencies.get(schema.as_ref()).ok_or_else(|| {
-                    GetLogicalCatalogWithDepsAndFuncsError::DependencyAliasNotFoundForTableRef {
-                        alias: schema.as_ref().clone(),
-                    }
-                })?;
-
-                // Convert table_ref to use String schema for internal data structures
-                let table_ref_string =
-                    TableReference::partial(schema.to_string(), table.as_ref().clone());
-
-                // Skip if table reference is already resolved (optimization to avoid redundant dataset loading)
-                let Entry::Vacant(entry) = tables
-                    .entry(hash_ref.hash().clone())
-                    .or_default()
-                    .entry(table_ref_string.clone())
-                else {
-                    continue;
-                };
-
-                // Load dataset by hash (cached by store)
-                let dataset = store.get_dataset(hash_ref).await.map_err(|err| {
-                    GetLogicalCatalogWithDepsAndFuncsError::GetDatasetForTableRef {
-                        reference: hash_ref.clone(),
-                        source: err,
-                    }
-                })?;
-
-                // Find table in dataset
-                let dataset_table = dataset
-                    .tables
-                    .iter()
-                    .find(|t| t.name() == table)
-                    .ok_or_else(|| {
-                        GetLogicalCatalogWithDepsAndFuncsError::TableNotFoundInDataset {
-                            table_name: table.as_ref().clone(),
-                            reference: hash_ref.clone(),
-                        }
-                    })?;
-
-                // Create ResolvedTable with the converted string-based table reference
-                let resolved_table = ResolvedTable::new(
-                    table_ref_string.clone(),
-                    dataset_table.clone(),
-                    dataset.clone(),
-                );
-
-                // Insert into vacant entry
-                entry.insert(resolved_table);
-            }
-        }
-    }
-
-    // Part 2: Process function references (load datasets for qualified UDFs, create UDFs for bare functions)
-    for func_ref in func_refs {
-        match &func_ref {
-            // Skip bare functions - they are assumed to be built-in functions (Amp or DataFusion)
-            FunctionReference::Bare { function: _ } => continue,
-            FunctionReference::Qualified { schema, function } => {
-                // Match on schema type: DepAlias (external dependency) or SelfRef (same-dataset function)
-                match schema.as_ref() {
-                    DepAliasOrSelfRef::DepAlias(dep_alias) => {
-                        // External dependency reference - lookup in dependencies map
-                        let hash_ref = dependencies.get(dep_alias).ok_or_else(|| {
-                            GetLogicalCatalogWithDepsAndFuncsError::DependencyAliasNotFoundForFunctionRef {
-                                alias: dep_alias.clone(),
-                            }
-                        })?;
-
-                        // Load dataset by hash (cached by store)
-                        let dataset = store.get_dataset(hash_ref).await.map_err(|err| {
-                            GetLogicalCatalogWithDepsAndFuncsError::GetDatasetForFunction {
-                                reference: hash_ref.clone(),
-                                source: err,
-                            }
-                        })?;
-
-                        // Convert func_ref to use String schema for internal data structures
-                        let func_ref_string = FunctionReference::qualified(
-                            schema.to_string(),
-                            function.as_ref().clone(),
-                        );
-
-                        // Skip if function reference is already resolved (optimization to avoid redundant UDF creation)
-                        let Entry::Vacant(entry) = udfs
-                            .entry(hash_ref.hash().clone())
-                            .or_default()
-                            .entry(func_ref_string)
-                        else {
-                            continue;
-                        };
-
-                        // Get the UDF for this function reference
-                        let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME {
-                            store
-                                .eth_call_for_dataset(&schema.to_string(), &dataset)
-                                .await
-                                .map_err(|err| {
-                                    GetLogicalCatalogWithDepsAndFuncsError::EthCallUdfCreationForFunction {
-                                        reference: hash_ref.clone(),
-                                        source: err,
-                                    }
-                                })?
-                                .ok_or_else(|| GetLogicalCatalogWithDepsAndFuncsError::EthCallNotAvailable {
-                                    reference: hash_ref.clone(),
-                                })?
-                        } else {
-                            dataset
-                                .function_by_name(schema.to_string(), function, isolate_pool.clone())
-                                .ok_or_else(|| {
-                                    GetLogicalCatalogWithDepsAndFuncsError::FunctionNotFoundInDataset {
-                                        function_name: func_ref.to_string(),
-                                        reference: hash_ref.clone(),
-                                    }
-                                })?
-                        };
-
-                        entry.insert(udf);
-                    }
-                    DepAliasOrSelfRef::SelfRef => {
-                        // Same-dataset function reference (self.function_name)
-                        // Look up function in the functions map (defined in this dataset)
-                        if let Some(func_def) = functions.get(function) {
-                            // Skip if function reference is already resolved (optimization)
-                            let Entry::Vacant(entry) = self_udfs.entry(func_ref.clone()) else {
-                                continue;
-                            };
-
-                            // Create UDF from Function definition using JsUdf
-                            // Use "self" as schema qualifier to preserve case sensitivity
-                            let udf = AsyncScalarUDF::new(Arc::new(JsUdf::new(
-                                isolate_pool.clone(),
-                                Some(datasets_common::deps::alias::SELF_REF_KEYWORD.to_string()), // Schema = "self"
-                                func_def.source.source.clone(),
-                                func_def.source.filename.clone().into(),
-                                Arc::from(function.as_ref().as_str()),
-                                func_def
-                                    .input_types
-                                    .iter()
-                                    .map(|dt| dt.clone().into_arrow())
-                                    .collect(),
-                                func_def.output_type.clone().into_arrow(),
-                            )))
-                            .into_scalar_udf();
-
-                            entry.insert(udf);
-                        }
-                        // If function not in functions map, it's an error (self.function should always be defined)
-                        // TODO: Add proper error variant for this case
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(LogicalCatalog {
-        tables: tables
-            .into_values()
-            .flat_map(|map| map.into_values())
-            .collect(),
-        udfs: self_udfs
-            .into_values()
-            .chain(udfs.into_values().flat_map(|map| map.into_values()))
-            .collect(),
-    })
+    Ok(Catalog::new(tables, logical_catalog))
 }
 
 /// Type alias for the table references map used in multi-table validation
 type TableReferencesMap = BTreeMap<
@@ -368,216 +217,51 @@ type TableReferencesMap = BTreeMap<
 /// ## Function Handling
 ///
 /// Bare (unqualified) function references are handled as follows:
-/// - If the function is defined in the `functions` parameter, a UDF is created for it
+/// - If the function is defined in the `self_refs` parameter, a UDF is created for it
 /// - If the function is not defined, it's assumed to be a built-in function (logged as debug)
 /// - TODO: Add validation against DataFusion built-in functions to catch typos
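+///
+/// # Example
+///
+/// A sketch of validating a manifest's tables; `references`, `dependencies`,
+/// `manifest`, and `isolate_pool` are assumed to be in scope (not compiled as
+/// a doctest):
+///
+/// ```ignore
+/// let ctx = planning_ctx_for_sql_tables_with_deps_and_funcs(
+///     &store,
+///     references,                         // TableName -> (table refs, func refs)
+///     dependencies,                       // DepAlias -> HashReference (locked)
+///     self_refs_from_manifest(&manifest), // functions defined by this manifest
+///     isolate_pool,
+/// )
+/// .await?;
+/// ```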
 pub async fn planning_ctx_for_sql_tables_with_deps_and_funcs(
     store: &impl DatasetAccess,
     references: TableReferencesMap,
     dependencies: BTreeMap<DepAlias, HashReference>,
-    functions: BTreeMap<FuncName, Function>,
+    self_refs: SelfReferences,
     isolate_pool: IsolatePool,
 ) -> Result<PlanningContext, PlanningCtxForSqlTablesWithDepsError> {
-    // Use hash-based map to deduplicate datasets across ALL tables
-    // Inner map: table_ref -> ResolvedTable (deduplicates table references)
-    let mut tables: BTreeMap<Hash, BTreeMap<TableReference<String>, ResolvedTable>> = BTreeMap::new();
-    // Track UDFs from external dependencies - outer key: dataset hash, inner key: function reference
-    // Inner map ensures deduplication: multiple function references to the same UDF share one instance
-    let mut udfs: BTreeMap<Hash, BTreeMap<FunctionReference<String>, ScalarUDF>> = BTreeMap::new();
-    // Track UDFs defined in this manifest (bare functions and self-references) - separate from dependency functions
-    // Ensures deduplication: multiple references to the same function share one instance
-    let mut self_udfs: BTreeMap<FunctionReference<DepAliasOrSelfRef>, ScalarUDF> = BTreeMap::new();
-
-    // Process all tables - fail fast on first error
-    for (table_name, (table_refs, func_refs)) in references {
-        // Part 1: Process table references for this table
-        for table_ref in table_refs {
-            match &table_ref {
-                TableReference::Bare { .. } => {
+    // Check for unqualified tables and flatten refs
+    let mut table_refs = Vec::new();
+    let mut func_refs = Vec::new();
+
+    for (table_name, (t_refs, f_refs)) in &references {
+        for t_ref in t_refs {
+            match t_ref.clone().into_parts() {
+                Some(parts) => table_refs.push(parts),
+                None => {
                     return Err(PlanningCtxForSqlTablesWithDepsError::UnqualifiedTable {
                         table_name: table_name.clone(),
-                        table_ref: table_ref.to_string(),
+                        table_ref: t_ref.to_string(),
                     });
                 }
-                TableReference::Partial { schema, table } => {
-                    // Schema is already parsed as DepAlias, lookup in dependencies map
-                    let hash_ref = dependencies.get(schema.as_ref()).ok_or_else(|| {
-                        PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForTableRef {
-                            table_name: table_name.clone(),
-                            alias: schema.as_ref().clone(),
-                        }
-                    })?;
-
-                    // Skip if table reference is already resolved (optimization to avoid redundant dataset loading)
-                    let Entry::Vacant(entry) = tables
-                        .entry(hash_ref.hash().clone())
-                        .or_default()
-                        .entry(table_ref.to_string_reference())
-                    else {
-                        continue;
-                    };
-
-                    // Load dataset by hash (cached by store)
-                    let dataset = store.get_dataset(hash_ref).await.map_err(|err| {
-                        PlanningCtxForSqlTablesWithDepsError::GetDatasetForTableRef {
-                            table_name: table_name.clone(),
-                            reference: hash_ref.clone(),
-                            source: err,
-                        }
-                    })?;
-
-                    // Find table in dataset
-                    let dataset_table = dataset
-                        .tables
-                        .iter()
-                        .find(|t| t.name() == table)
-                        .ok_or_else(|| {
-                            PlanningCtxForSqlTablesWithDepsError::TableNotFoundInDataset {
-                                table_name: table_name.clone(),
-                                referenced_table_name: table.as_ref().clone(),
-                                reference: hash_ref.clone(),
-                            }
-                        })?;
-
-                    // Create ResolvedTable with the converted string-based table reference
-                    let resolved_table = ResolvedTable::new(
-                        table_ref.into_string_reference(),
-                        dataset_table.clone(),
-                        dataset.clone(),
-                    );
-
-                    // Insert into vacant entry
-                    entry.insert(resolved_table);
-                }
             }
         }
-
-        // Part 2: Process function references for this table (load datasets for qualified UDFs only)
-        for func_ref in func_refs {
-            match &func_ref {
-                // Skip bare functions - they are assumed to be built-in functions (Amp or DataFusion)
-                FunctionReference::Bare { function: _ } => {
-                    continue;
-                }
-                FunctionReference::Qualified { schema, function } => {
-                    // Match on schema type: DepAlias (external dependency) or SelfRef (same-dataset function)
-                    match schema.as_ref() {
-                        DepAliasOrSelfRef::DepAlias(dep_alias) => {
-                            // External dependency reference - lookup in dependencies map
-                            let hash_ref = dependencies.get(dep_alias).ok_or_else(|| {
-                                PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForFunctionRef {
-                                    table_name: table_name.clone(),
-                                    alias: dep_alias.clone(),
-                                }
-                            })?;
-
-                            // Load dataset by hash (cached by store)
-                            let dataset = store.get_dataset(hash_ref).await.map_err(|err| {
-                                PlanningCtxForSqlTablesWithDepsError::GetDatasetForFunction {
-                                    table_name: table_name.clone(),
-                                    reference: hash_ref.clone(),
-                                    source: err,
-                                }
-                            })?;
-
-                            // Skip if function reference is already resolved (optimization to avoid redundant UDF creation)
-                            let Entry::Vacant(entry) = udfs
-                                .entry(hash_ref.hash().clone())
-                                .or_default()
-                                .entry(func_ref.to_string_reference())
-                            else {
-                                continue;
-                            };
-
-                            // Get the UDF for this function reference
-                            let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME {
-                                store
-                                    .eth_call_for_dataset(&schema.to_string(), &dataset)
-                                    .await
-                                    .map_err(|err| {
-                                        PlanningCtxForSqlTablesWithDepsError::EthCallUdfCreationForFunction {
-                                            table_name: table_name.clone(),
-                                            reference: hash_ref.clone(),
-                                            source: err,
-                                        }
-                                    })?
-                                    .ok_or_else(|| {
-                                        PlanningCtxForSqlTablesWithDepsError::EthCallNotAvailable {
-                                            table_name: table_name.clone(),
-                                            reference: hash_ref.clone(),
-                                        }
-                                    })?
-                            } else {
-                                dataset
-                                    .function_by_name(schema.to_string(), function, IsolatePool::dummy())
-                                    .ok_or_else(|| {
-                                        PlanningCtxForSqlTablesWithDepsError::FunctionNotFoundInDataset {
-                                            table_name: table_name.clone(),
-                                            function_name: func_ref.to_string(),
-                                            reference: hash_ref.clone(),
-                                        }
-                                    })?
-                            };
-
-                            entry.insert(udf);
-                        }
-                        DepAliasOrSelfRef::SelfRef => {
-                            // Same-dataset function reference (self.function_name)
-                            // Look up function in the functions map (defined in this dataset)
-                            if let Some(func_def) = functions.get(function) {
-                                // Skip if function reference is already resolved (optimization)
-                                let Entry::Vacant(entry) = self_udfs.entry(func_ref.clone()) else {
-                                    continue;
-                                };
-
-                                // Create UDF from Function definition using JsUdf
-                                // Use "self" as schema qualifier to preserve case sensitivity
-                                let udf = AsyncScalarUDF::new(Arc::new(JsUdf::new(
-                                    isolate_pool.clone(),
-                                    Some(
-                                        datasets_common::deps::alias::SELF_REF_KEYWORD.to_string(),
-                                    ), // Schema = "self"
-                                    func_def.source.source.clone(),
-                                    func_def.source.filename.clone().into(),
-                                    Arc::from(function.as_ref().as_str()),
-                                    func_def
-                                        .input_types
-                                        .iter()
-                                        .map(|dt| dt.clone().into_arrow())
-                                        .collect(),
-                                    func_def.output_type.clone().into_arrow(),
-                                )))
-                                .into_scalar_udf();
-
-                                entry.insert(udf);
-                            } else {
-                                // Function not in functions map - this is an error for self-references
-                                tracing::error!(
-                                    table=%table_name,
-                                    function=%function.as_ref(),
-                                    "Self-referenced function not defined in functions map"
-                                );
-                                // TODO: Add proper error variant for this case
-                            }
-                        }
-                    }
-                }
-            }
+        for f_ref in f_refs {
+            func_refs.push(f_ref.clone().into_parts());
+        }
     }
 
-    // Flatten to Vec and create single unified planning context
-    // Extract values from nested BTreeMap structure
-    Ok(PlanningContext::new(LogicalCatalog {
-        tables: tables
-            .into_values()
-            .flat_map(|map| map.into_values())
-            .collect(),
-        udfs: self_udfs
-            .into_values()
-            .chain(udfs.into_values().flat_map(|map| map.into_values()))
-            .collect(),
-    }))
+    // Resolve using the pre-resolved resolver
+    let resolver = PreResolvedResolver::new(&dependencies);
+    let catalog = resolve_logical_catalog(
+        store,
+        &resolver,
+        table_refs,
+        func_refs,
+        self_refs,
+        &isolate_pool,
+    )
+    .await
+    .map_err(PlanningCtxForSqlTablesWithDepsError::Resolution)?;
+
+    Ok(PlanningContext::new(catalog))
 }
 
 // ================================================================================================
@@ -707,15 +391,26 @@ pub enum PlanningCtxForSqlTablesWithDepsError {
         referenced_table_name: TableName,
         reference: HashReference,
     },
+
+    /// Failed during catalog resolution.
+    ///
+    /// This wraps errors from the core resolution logic when processing
+    /// flattened references where we cannot determine which derived table's
+    /// SQL caused the error.
+    #[error("Catalog resolution failed")]
+    Resolution(#[source] ResolveError),
 }
 
+/// Type alias for resolve errors with pre-resolved dependencies.
+pub type ResolveError = common::catalog::resolve::ResolveError<PreResolvedError>;
 
-/// Errors specific to planning_ctx_for_sql operations
+/// Errors specific to catalog_for_derived_table operations
 ///
-/// This error type is used exclusively by `planning_ctx_for_sql()` to create
-/// a planning context for SQL queries without requiring physical data to exist.
+/// This error type is used exclusively by `catalog_for_derived_table()` to create
+/// a physical catalog for derived dataset SQL execution with pre-resolved dependencies.
 #[derive(Debug, thiserror::Error)]
-pub enum CatalogForSqlWithDepsError {
+pub enum CatalogForDerivedTableError {
     /// Failed to resolve table references from the SQL statement.
     ///
     /// This occurs when:
@@ -735,142 +430,21 @@ pub enum CatalogForSqlWithDepsError {
     #[error("Failed to resolve function references from SQL")]
     FunctionReferenceResolution(#[source] ResolveFunctionReferencesError),
 
-    /// Failed to get the physical catalog for the resolved tables and functions.
-    ///
-    /// This wraps errors from `get_physical_catalog_with_deps`, which can occur when:
-    /// - Dataset retrieval fails
-    /// - Physical table metadata cannot be retrieved
-    /// - Tables have not been synced
-    /// - Dependency aliases are invalid or not found
-    #[error("Failed to get physical catalog with dependencies: {0}")]
-    GetPhysicalCatalogWithDeps(#[source] GetPhysicalCatalogWithDepsError),
-}
-
-/// Errors specific to get_physical_catalog_with_deps operations
-
-#[derive(Debug, thiserror::Error)]
-pub enum GetPhysicalCatalogWithDepsError {
-    /// Failed to get the logical catalog with dependencies and functions.
+    /// Failed during catalog resolution.
     ///
-    /// This wraps errors from `get_logical_catalog_with_deps_and_funcs`, which can occur when:
-    /// - Dataset names cannot be extracted from table references or function names
-    /// - Dataset retrieval fails
-    /// - UDF creation fails
-    /// - Dependency aliases are invalid or not found
-    #[error("Failed to get logical catalog with dependencies and functions: {0}")]
-    GetLogicalCatalogWithDepsAndFuncs(#[source] GetLogicalCatalogWithDepsAndFuncsError),
+    /// This wraps errors from the core resolution logic.
+    #[error("Catalog resolution failed")]
+    Resolution(#[source] BoxError),
 
     /// Failed to retrieve physical table metadata from the metadata database.
-    ///
-    /// This occurs when querying the metadata database for the active physical
-    /// location of a table fails due to database connection issues, query errors,
-    /// or other database-related problems.
-    #[error("Failed to retrieve physical table metadata for table '{table}': {source}")]
-    PhysicalTableRetrieval { table: String, source: BoxError },
-
-    /// Table has not been synced and no physical location exists.
-    ///
-    /// This occurs when attempting to load a physical catalog for a table that
-    /// has been defined but has not yet been dumped/synced to storage. The table
-    /// exists in the dataset definition but has no physical parquet files.
-    #[error("Table '{table}' has not been synced")]
-    TableNotSynced { table: String },
-}
-
-/// Errors specific to get_logical_catalog_with_deps_and_funcs operations
-#[derive(Debug, thiserror::Error)]
-#[allow(clippy::large_enum_variant)]
-pub enum GetLogicalCatalogWithDepsAndFuncsError {
-    /// Table is not qualified with a schema/dataset name.
-    ///
-    /// All tables must be qualified with a dataset reference in the schema portion.
-    /// Unqualified tables (e.g., just `table_name`) are not allowed.
-    #[error("Unqualified table '{table_ref}', all tables must be qualified with a dataset")]
-    UnqualifiedTable { table_ref: String },
-
-    /// Dependency alias not found when processing table reference.
-    ///
-    /// This occurs when a table reference uses an alias that was not provided
-    /// in the dependencies map.
-    #[error(
-        "Dependency alias '{alias}' referenced in table reference but not provided in dependencies"
-    )]
-    DependencyAliasNotFoundForTableRef { alias: DepAlias },
-
-    /// Failed to retrieve dataset from store when loading dataset for table reference.
-    ///
-    /// This occurs when loading a dataset definition fails:
-    /// - Dataset not found in the store
-    /// - Dataset manifest is invalid or corrupted
-    /// - Unsupported dataset kind
-    /// - Storage backend errors when reading the dataset
-    #[error("Failed to retrieve dataset '{reference}' for table reference")]
-    GetDatasetForTableRef {
-        reference: HashReference,
-        #[source]
-        source: BoxError,
-    },
-
-    /// Dependency alias not found when processing function reference.
-    ///
-    /// This occurs when a function reference uses an alias that was not provided
-    /// in the dependencies map.
-    #[error(
-        "Dependency alias '{alias}' referenced in function reference but not provided in dependencies"
-    )]
-    DependencyAliasNotFoundForFunctionRef { alias: DepAlias },
-
-    /// Failed to retrieve dataset from store when loading dataset for function.
-    ///
-    /// This occurs when loading a dataset definition for a function fails:
-    /// - Dataset not found in the store
-    /// - Dataset manifest is invalid or corrupted
-    /// - Unsupported dataset kind
-    /// - Storage backend errors when reading the dataset
-    #[error("Failed to retrieve dataset '{reference}' for function reference")]
-    GetDatasetForFunction {
-        reference: HashReference,
-        #[source]
-        source: BoxError,
-    },
-
-    /// Failed to create ETH call UDF for dataset referenced in function name.
-    ///
-    /// This occurs when creating the eth_call user-defined function for a function fails:
-    /// - Invalid provider configuration for the dataset
-    /// - Provider connection issues
-    /// - Dataset is not an EVM RPC dataset but eth_call was requested
-    #[error("Failed to create ETH call UDF for dataset '{reference}' for function reference")]
-    EthCallUdfCreationForFunction {
-        reference: HashReference,
+    #[error("Failed to retrieve physical table metadata for table '{table}'")]
+    PhysicalTableRetrieval {
+        table: String,
         #[source]
         source: BoxError,
     },
 
-    /// eth_call function not available for dataset.
-    ///
-    /// This occurs when the eth_call function is referenced in SQL but the
-    /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured).
-    #[error("Function 'eth_call' not available for dataset '{reference}'")]
-    EthCallNotAvailable { reference: HashReference },
-
-    /// Function not found in dataset.
-    ///
-    /// This occurs when a function is referenced in the SQL query but the
-    /// dataset does not contain a function with that name.
-    #[error("Function '{function_name}' not found in dataset '{reference}'")]
-    FunctionNotFoundInDataset {
-        function_name: String,
-        reference: HashReference,
-    },
-
-    /// Table not found in dataset.
-    ///
-    /// This occurs when the table name is referenced in the SQL query but the
-    /// dataset does not contain a table with that name.
-    #[error("Table '{table_name}' not found in dataset '{reference}'")]
-    TableNotFoundInDataset {
-        table_name: TableName,
-        reference: HashReference,
-    },
+    /// Table has not been synced and no physical location exists.
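+    ///
+    /// The table exists in the dataset definition but has not yet been
+    /// dumped/synced, so no physical parquet files exist for it.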
+    #[error("Table '{table}' has not been synced")]
+    TableNotSynced { table: String },
 }
diff --git a/crates/core/datasets-derived/src/logical.rs b/crates/core/datasets-derived/src/logical.rs
index 9ddf2147b..8932ba824 100644
--- a/crates/core/datasets-derived/src/logical.rs
+++ b/crates/core/datasets-derived/src/logical.rs
@@ -38,6 +38,7 @@ use crate::{
     DerivedDatasetKind, Manifest,
     catalog::{
         PlanningCtxForSqlTablesWithDepsError, planning_ctx_for_sql_tables_with_deps_and_funcs,
+        self_refs_from_manifest,
     },
     manifest::{TableInput, View},
 };
@@ -411,7 +412,7 @@ pub async fn validate(
         store,
         references,
         dependencies,
-        manifest.functions.clone(),
+        self_refs_from_manifest(manifest),
         IsolatePool::dummy(), // For manifest validation only (no JS execution)
     )
     .await
@@ -443,6 +444,31 @@ pub async fn validate(
         PlanningCtxForSqlTablesWithDepsError::EthCallNotAvailable { .. } => {
             ManifestValidationError::EthCallNotAvailable(err)
         }
+        PlanningCtxForSqlTablesWithDepsError::Resolution(resolve_err) => {
+            use common::catalog::resolve::ResolveError;
+
+            use crate::catalog::PreResolvedError;
+            match resolve_err {
+                ResolveError::SchemaResolution(PreResolvedError::AliasNotFound(_)) => {
+                    ManifestValidationError::DependencyAliasNotFound(err)
+                }
+                ResolveError::FunctionNotFound { .. }
+                | ResolveError::SelfFunctionNotFound { .. } => {
+                    ManifestValidationError::FunctionNotFoundInDataset(err)
+                }
+                ResolveError::TableNotFound { .. } => {
+                    ManifestValidationError::TableNotFoundInDataset(err)
+                }
+                ResolveError::EthCallNotAvailable { .. } => {
+                    ManifestValidationError::EthCallNotAvailable(err)
+                }
+                ResolveError::DatasetLoad { .. } => ManifestValidationError::GetDataset(err),
+                ResolveError::EthCallCreation { .. } => {
+                    ManifestValidationError::EthCallUdfCreation(err)
+                }
+                _ => ManifestValidationError::CatalogResolution(err),
+            }
+        }
     })?;
 
     // Step 4: Validate that all table SQL queries are incremental
@@ -614,6 +640,12 @@ pub enum ManifestValidationError {
     #[error("Dependency alias not found: {0}")]
     DependencyAliasNotFound(#[source] PlanningCtxForSqlTablesWithDepsError),
 
+    /// Catalog resolution failed
+    ///
+    /// This occurs when the core catalog resolution logic fails.
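+    ///
+    /// Only `ResolveError` variants without a more specific mapping fall
+    /// through to this catch-all (see the `_ =>` arm in `validate` above).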
+    #[error("Catalog resolution failed")]
+    CatalogResolution(#[source] PlanningCtxForSqlTablesWithDepsError),
+
     /// Non-incremental SQL operation in table query
     ///
     /// This occurs when a table's SQL query contains operations that cannot be
diff --git a/crates/core/dump/src/derived_dataset.rs b/crates/core/dump/src/derived_dataset.rs
index b6a6b7686..ad36f2a72 100644
--- a/crates/core/dump/src/derived_dataset.rs
+++ b/crates/core/dump/src/derived_dataset.rs
@@ -104,7 +104,9 @@ use common::{
 };
 use datasets_common::{deps::alias::DepAlias, hash_reference::HashReference};
 use datasets_derived::{
-    Manifest as DerivedManifest, catalog::catalog_for_sql_with_deps, manifest::TableInput,
+    Manifest as DerivedManifest,
+    catalog::{catalog_for_derived_table, self_refs_from_manifest},
+    manifest::TableInput,
 };
 use futures::StreamExt as _;
 use metadata_db::NotificationMultiplexerHandle;
@@ -355,13 +357,13 @@ async fn dump_table(
     let mut join_set = tasks::FailFastJoinSet::>::new();
 
-    let catalog = catalog_for_sql_with_deps(
+    let catalog = catalog_for_derived_table(
         &ctx.dataset_store,
         &ctx.data_store,
         &query,
         &env,
         &dependencies,
-        &manifest.functions,
+        self_refs_from_manifest(manifest),
     )
     .await?;
     let planning_ctx = PlanningContext::new(catalog.logical().clone());
diff --git a/crates/services/admin-api/src/handlers/error.rs b/crates/services/admin-api/src/handlers/error.rs
index edf03027f..0862d51db 100644
--- a/crates/services/admin-api/src/handlers/error.rs
+++ b/crates/services/admin-api/src/handlers/error.rs
@@ -1,6 +1,7 @@
 //! Error handling types for HTTP handlers
 
 use axum::{Json, http::StatusCode};
+use common::utils::error_with_causes;
 
 /// Standard error response returned by the API
 ///
@@ -47,7 +48,7 @@ pub struct ErrorResponse {
 ///
 /// This trait must be implemented by all handler-specific error enums to enable
 /// automatic conversion into `ErrorResponse`.
-pub trait IntoErrorResponse: std::fmt::Display + Send + Sync + 'static {
+pub trait IntoErrorResponse: std::error::Error + Send + Sync + 'static {
     /// Returns a stable, machine-readable error code
     ///
     /// Error codes should use SCREAMING_SNAKE_CASE and remain stable across versions.
@@ -65,7 +66,7 @@ where
         ErrorResponse {
             status_code: error.status_code(),
             error_code: error.error_code().to_string(),
-            error_message: error.to_string(),
+            error_message: error_with_causes(&error),
         }
     }
 }
diff --git a/crates/services/admin-api/src/handlers/schema.rs b/crates/services/admin-api/src/handlers/schema.rs
index 12282b85f..78751d327 100644
--- a/crates/services/admin-api/src/handlers/schema.rs
+++ b/crates/services/admin-api/src/handlers/schema.rs
@@ -29,6 +29,7 @@ use datasets_common::{
 use datasets_derived::{
     catalog::{
         PlanningCtxForSqlTablesWithDepsError, planning_ctx_for_sql_tables_with_deps_and_funcs,
+        self_refs_from_functions,
     },
     manifest::{Function, TableSchema},
 };
@@ -272,7 +273,7 @@ pub async fn handler(
         &ctx.dataset_store,
         references,
         dependencies,
-        functions,
+        self_refs_from_functions(&functions),
         IsolatePool::dummy(), // For schema validation only (no JS execution)
     )
     .await
@@ -326,6 +327,33 @@ pub async fn handler(
         PlanningCtxForSqlTablesWithDepsError::EthCallNotAvailable { .. } => {
             Error::EthCallNotAvailable(err)
         }
+        PlanningCtxForSqlTablesWithDepsError::Resolution(resolve_err) => {
+            use common::catalog::resolve::ResolveError;
+            use datasets_derived::catalog::PreResolvedError;
+            match resolve_err {
+                ResolveError::SchemaResolution(PreResolvedError::AliasNotFound(_)) => {
+                    Error::DependencyAliasNotFound(err)
+                }
+                ResolveError::FunctionNotFound { .. }
+                | ResolveError::SelfFunctionNotFound { .. } => {
+                    Error::FunctionNotFoundInDataset(err)
+                }
+                ResolveError::TableNotFound { .. } => Error::TableNotFoundInDataset(err),
+                ResolveError::EthCallNotAvailable { .. } => Error::EthCallNotAvailable(err),
+                ResolveError::DatasetLoad { source, .. } => {
+                    if source
+                        .downcast_ref::<GetDatasetError>()
+                        .is_some_and(|e| matches!(e, GetDatasetError::DatasetNotFound(_)))
+                    {
+                        Error::DatasetNotFound(err)
+                    } else {
+                        Error::GetDataset(err)
+                    }
+                }
+                ResolveError::EthCallCreation { .. } => Error::EthCallUdfCreation(err),
+                _ => Error::CatalogResolution(err),
+            }
+        }
     })?;
 
     // Infer schema for each table and extract networks
@@ -663,6 +691,12 @@ enum Error {
     #[error(transparent)]
     DependencyAliasNotFound(PlanningCtxForSqlTablesWithDepsError),
 
+    /// Catalog resolution failed
+    ///
+    /// This occurs when the core catalog resolution logic fails.
+    #[error(transparent)]
+    CatalogResolution(PlanningCtxForSqlTablesWithDepsError),
+
     /// Failed to infer schema for table
     ///
     /// This occurs when:
@@ -726,6 +760,7 @@ impl IntoErrorResponse for Error {
             Error::FunctionNotFoundInDataset(_) => "FUNCTION_NOT_FOUND_IN_DATASET",
             Error::EthCallNotAvailable(_) => "ETH_CALL_NOT_AVAILABLE",
             Error::DependencyAliasNotFound(_) => "DEPENDENCY_ALIAS_NOT_FOUND",
+            Error::CatalogResolution(_) => "CATALOG_RESOLUTION",
             Error::SchemaInference { .. } => "SCHEMA_INFERENCE",
         }
     }
@@ -753,6 +788,7 @@ impl IntoErrorResponse for Error {
             Error::FunctionNotFoundInDataset(_) => StatusCode::NOT_FOUND,
             Error::EthCallNotAvailable(_) => StatusCode::NOT_FOUND,
             Error::DependencyAliasNotFound(_) => StatusCode::BAD_REQUEST,
+            Error::CatalogResolution(_) => StatusCode::INTERNAL_SERVER_ERROR,
             Error::SchemaInference { .. } => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
diff --git a/tests/src/tests/it_admin_api_datasets_register.rs b/tests/src/tests/it_admin_api_datasets_register.rs
index 0015cc38a..618800bd1 100644
--- a/tests/src/tests/it_admin_api_datasets_register.rs
+++ b/tests/src/tests/it_admin_api_datasets_register.rs
@@ -66,9 +66,12 @@ async fn register_with_missing_dependency_fails() {
     match err {
         RegisterError::ManifestValidationError(api_err) => {
             assert_eq!(api_err.error_code, "MANIFEST_VALIDATION_ERROR");
-            assert_eq!(
-                api_err.error_message,
-                r#"Manifest validation error: Dependency alias not found: In table 'test_table': Dependency alias 'eth_firehose' referenced in table but not provided in dependencies"#,
+            assert!(
+                api_err
+                    .error_message
+                    .contains("dependency alias not found: eth_firehose"),
+                "error message should indicate the missing dependency alias, got: {}",
+                api_err.error_message
             );
         }
         _ => panic!("Expected ManifestValidationError, got: {:?}", err),
diff --git a/tests/src/tests/it_admin_api_schema.rs b/tests/src/tests/it_admin_api_schema.rs
index 7ebfb3c72..7bac2bef8 100644
--- a/tests/src/tests/it_admin_api_schema.rs
+++ b/tests/src/tests/it_admin_api_schema.rs
@@ -1471,11 +1471,12 @@ async fn multiple_tables_with_missing_function_fails_on_first() {
         response.error_code, "FUNCTION_NOT_FOUND_IN_DATASET",
         "should return FUNCTION_NOT_FOUND_IN_DATASET error"
     );
-    // Should fail on table2 which references fake_decode
+    // Should mention the missing function and the dataset
     assert!(
-        response.error_message.contains("table2")
-            && response.error_message.contains("eth.fake_decode"),
-        "error message should reference the failing table and function, got: {}",
+        response
+            .error_message
+            .contains("function 'eth.fake_decode' not found in dataset _/eth_firehose"),
+        "error message should reference the missing function and dataset, got: {}",
         response.error_message
     );
 }