diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 730416bb..5d0302ef 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -224,6 +224,28 @@ async def main(): # TODO: support to_duckdb() + # Test the new poll() method for incremental reading + print("\n--- Testing poll() method ---") + # Reset subscription to start from the beginning + log_scanner.subscribe(None, None) + + # Poll with a timeout of 5000ms (5 seconds) + # Note: poll() returns an empty table (not an error) on timeout + try: + poll_result = log_scanner.poll(5000) + print(f"Number of rows: {poll_result.num_rows}") + + if poll_result.num_rows > 0: + poll_df = poll_result.to_pandas() + print(f"Polled data:\n{poll_df}") + else: + print("Empty result (no records available)") + # Empty table still has schema + print(f"Schema: {poll_result.schema}") + + except Exception as e: + print(f"Error during poll: {e}") + except Exception as e: print(f"Error during scanning: {e}") diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index b56a29db..9e666455 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -20,10 +20,13 @@ use crate::*; use arrow::array::RecordBatch; use arrow_pyarrow::{FromPyArrow, ToPyArrow}; use fluss::client::EARLIEST_OFFSET; +use fluss::record::to_arrow_schema; use fluss::rpc::message::OffsetSpec; use pyo3::types::IntoPyDict; use pyo3_async_runtimes::tokio::future_into_py; +use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; // Time conversion constants const MILLIS_PER_SECOND: i64 = 1_000; @@ -186,7 +189,7 @@ impl FlussTable { } let rust_scanner = table_scan - .create_log_scanner() + .create_record_batch_log_scanner() .map_err(|e| FlussError::new_err(format!("Failed to create log scanner: {e}")))?; let admin = conn @@ -888,7 +891,7 @@ fn get_type_name(value: &Bound) -> String { /// Scanner for reading log data from a Fluss table #[pyclass] pub struct LogScanner { - inner: fcore::client::LogScanner, + inner: fcore::client::RecordBatchLogScanner, admin: fcore::client::FlussAdmin, table_info: fcore::metadata::TableInfo, #[allow(dead_code)] @@ -933,63 +936,78 @@ impl LogScanner { /// Convert all data to Arrow Table fn to_arrow(&self, py: Python) -> PyResult> { - use std::collections::HashMap; - use std::time::Duration; - let mut all_batches = Vec::new(); let num_buckets = self.table_info.get_num_buckets(); let bucket_ids: Vec = (0..num_buckets).collect(); // todo: after supporting list_offsets with timestamp, we can use start_timestamp and end_timestamp here - let mut stopping_offsets: HashMap = TOKIO_RUNTIME - .block_on(async { - self.admin - .list_offsets( - &self.table_info.table_path, - bucket_ids.as_slice(), - OffsetSpec::Latest, - ) - .await + let mut stopping_offsets: HashMap = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { + self.admin + .list_offsets( + &self.table_info.table_path, + bucket_ids.as_slice(), + OffsetSpec::Latest, + ) + .await + }) }) .map_err(|e| FlussError::new_err(e.to_string()))?; - if !stopping_offsets.is_empty() { - loop { - let batch_result = TOKIO_RUNTIME - .block_on(async { self.inner.poll(Duration::from_millis(500)).await }); - - match batch_result { - Ok(scan_records) => { - let mut result_records: Vec = vec![]; - for (bucket, records) in scan_records.into_records_by_buckets() { - let stopping_offset = stopping_offsets.get(&bucket.bucket_id()); - - if stopping_offset.is_none() { - // not to include this bucket, skip records for this bucket - // since we already reach end offset for this bucket - continue; - } - if let Some(last_record) = records.last() { - let offset = last_record.offset(); - result_records.extend(records); - if offset >= stopping_offset.unwrap() - 1 { - stopping_offsets.remove(&bucket.bucket_id()); - } - } - } - - if !result_records.is_empty() { - let arrow_batch = Utils::convert_scan_records_to_arrow(result_records); - all_batches.extend(arrow_batch); - } - - // we have reach end offsets of all bucket - if stopping_offsets.is_empty() { - break; - } - } - Err(e) => return Err(FlussError::new_err(e.to_string())), + // Filter out buckets with no records to read (stop_at <= 0) + stopping_offsets.retain(|_, &mut v| v > 0); + + while !stopping_offsets.is_empty() { + let scan_batches = py + .detach(|| { + TOKIO_RUNTIME + .block_on(async { self.inner.poll(Duration::from_millis(500)).await }) + }) + .map_err(|e| FlussError::new_err(e.to_string()))?; + + if scan_batches.is_empty() { + continue; + } + + for scan_batch in scan_batches { + let bucket_id = scan_batch.bucket().bucket_id(); + + // Check if this bucket is still being tracked; if not, ignore the batch + let Some(&stop_at) = stopping_offsets.get(&bucket_id) else { + continue; + }; + + let base_offset = scan_batch.base_offset(); + let last_offset = scan_batch.last_offset(); + + // If the batch starts at or after the stop_at offset, the bucket is exhausted + if base_offset >= stop_at { + stopping_offsets.remove(&bucket_id); + continue; + } + + let batch = if last_offset >= stop_at { + // This batch contains the target offset; slice it to keep only records + // where offset < stop_at. + let num_to_keep = (stop_at - base_offset) as usize; + let b = scan_batch.into_batch(); + + // Safety check: ensure we don't attempt to slice more rows than the batch contains + let limit = num_to_keep.min(b.num_rows()); + b.slice(0, limit) + } else { + // The entire batch is within the desired range (all offsets < stop_at) + scan_batch.into_batch() + }; + + all_batches.push(Arc::new(batch)); + + // If the batch's last offset reached or passed the inclusive limit (stop_at - 1), + // we are done with this bucket. + if last_offset >= stop_at - 1 { + stopping_offsets.remove(&bucket_id); } } } @@ -1006,15 +1024,68 @@ impl LogScanner { Ok(df) } + /// Poll for new records with the specified timeout + /// + /// Args: + /// timeout_ms: Timeout in milliseconds to wait for records + /// + /// Returns: + /// PyArrow Table containing the polled records + /// + /// Note: + /// - Returns an empty table (with correct schema) if no records are available + /// - When timeout expires, returns an empty table (NOT an error) + fn poll(&self, py: Python, timeout_ms: i64) -> PyResult> { + if timeout_ms < 0 { + return Err(FlussError::new_err(format!( + "timeout_ms must be non-negative, got: {timeout_ms}" + ))); + } + + let timeout = Duration::from_millis(timeout_ms as u64); + let scan_batches = py + .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await })) + .map_err(|e| FlussError::new_err(e.to_string()))?; + + // Convert ScanBatch to Arrow batches + if scan_batches.is_empty() { + return self.create_empty_table(py); + } + + let arrow_batches: Vec<_> = scan_batches + .into_iter() + .map(|scan_batch| Arc::new(scan_batch.into_batch())) + .collect(); + + Utils::combine_batches_to_table(py, arrow_batches) + } + + /// Create an empty PyArrow table with the correct schema + fn create_empty_table(&self, py: Python) -> PyResult> { + let arrow_schema = to_arrow_schema(self.table_info.get_row_type()) + .map_err(|e| FlussError::new_err(format!("Failed to get arrow schema: {e}")))?; + let py_schema = arrow_schema + .as_ref() + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; + + let pyarrow = py.import("pyarrow")?; + let empty_table = pyarrow + .getattr("Table")? + .call_method1("from_batches", (vec![] as Vec>, py_schema))?; + + Ok(empty_table.into()) + } + fn __repr__(&self) -> String { format!("LogScanner(table={})", self.table_info.table_path) } } impl LogScanner { - /// Create LogScanner from core LogScanner + /// Create LogScanner from core RecordBatchLogScanner pub fn from_core( - inner_scanner: fcore::client::LogScanner, + inner_scanner: fcore::client::RecordBatchLogScanner, admin: fcore::client::FlussAdmin, table_info: fcore::metadata::TableInfo, ) -> Self {