Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions bindings/python/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,28 @@ async def main():
except Exception as e:
print(f"Error during scanning: {e}")

# Demo: Column projection
print("\n--- Testing Column Projection ---")
try:
# Project specific columns by index
print("\n1. Projection by index [0, 1] (id, name):")
scanner_index = await table.new_log_scanner(project=[0, 1])
scanner_index.subscribe(None, None)
df_projected = scanner_index.to_pandas()
print(df_projected.head())
print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}")

# Project specific columns by name (Pythonic!)
print("\n2. Projection by name ['name', 'score'] (Pythonic):")
scanner_names = await table.new_log_scanner(columns=["name", "score"])
scanner_names.subscribe(None, None)
df_named = scanner_names.to_pandas()
print(df_named.head())
print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}")

except Exception as e:
print(f"Error during projection: {e}")

# Close connection
conn.close()
print("\nConnection closed")
Expand Down
112 changes: 87 additions & 25 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ pub struct FlussTable {
has_primary_key: bool,
}

/// Internal enum to represent different projection types
enum ProjectionType {
Indices(Vec<usize>),
Names(Vec<String>),
}

#[pymethods]
impl FlussTable {
/// Create a new append writer for the table
Expand All @@ -57,32 +63,39 @@ impl FlussTable {
})
}

/// Create a new log scanner for the table
fn new_log_scanner<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let conn = self.connection.clone();
let metadata = self.metadata.clone();
let table_info = self.table_info.clone();

future_into_py(py, async move {
let fluss_table =
fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone());

let table_scan = fluss_table.new_scan();

let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
"Failed to create log scanner: {e:?}"
))
})?;

let admin = conn
.get_admin()
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;
/// Create a new log scanner for the table.
///
/// Args:
/// project: Optional list of column indices (0-based) to include in the scan.
/// columns: Optional list of column names to include in the scan.
///
/// Returns:
/// LogScanner, optionally with projection applied
///
/// Note:
/// Specify only one of 'project' or 'columns'.
/// If neither is specified, all columns are included.
/// Rust side will validate the projection parameters.
///
#[pyo3(signature = (project=None, columns=None))]
pub fn new_log_scanner<'py>(
&self,
py: Python<'py>,
project: Option<Vec<usize>>,
columns: Option<Vec<String>>,
) -> PyResult<Bound<'py, PyAny>> {
let projection = match (project, columns) {
(Some(_), Some(_)) => {
return Err(FlussError::new_err(
"Specify only one of 'project' or 'columns'".to_string(),
));
}
(Some(indices), None) => Some(ProjectionType::Indices(indices)),
(None, Some(names)) => Some(ProjectionType::Names(names)),
(None, None) => None,
};

let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
Python::attach(|py| Py::new(py, py_scanner))
})
self.create_log_scanner_internal(py, projection)
}

/// Get table information
Expand Down Expand Up @@ -126,6 +139,55 @@ impl FlussTable {
has_primary_key,
}
}

/// Internal helper to create log scanner with optional projection
fn create_log_scanner_internal<'py>(
&self,
py: Python<'py>,
projection: Option<ProjectionType>,
) -> PyResult<Bound<'py, PyAny>> {
let conn = self.connection.clone();
let metadata = self.metadata.clone();
let table_info = self.table_info.clone();

future_into_py(py, async move {
let fluss_table =
fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone());

let mut table_scan = fluss_table.new_scan();

// Apply projection if specified
if let Some(proj) = projection {
table_scan = match proj {
ProjectionType::Indices(indices) => {
table_scan.project(&indices).map_err(|e| {
FlussError::new_err(format!("Failed to project columns: {e}"))
})?
}
ProjectionType::Names(names) => {
// Convert Vec<String> to Vec<&str> for the API
let column_name_refs: Vec<&str> =
names.iter().map(|s| s.as_str()).collect();
table_scan.project_by_name(&column_name_refs).map_err(|e| {
FlussError::new_err(format!("Failed to project columns: {e}"))
})?
}
};
}

let rust_scanner = table_scan
.create_log_scanner()
.map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?;

let admin = conn
.get_admin()
.await
.map_err(|e| FlussError::new_err(e.to_string()))?;

let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
Python::attach(|py| Py::new(py, py_scanner))
})
}
}

/// Writer for appending data to a Fluss table
Expand Down
Loading